# 数据记录文件转换 - Python 代码 (data record file to CSV converter)

import tkinter as tk
from tkinter import filedialog, messagebox
import os
import struct

class DataToCsvConverter:
    """Tkinter front end that converts a fixed-length-record ``.data`` file to CSV.

    File layout, as read by this class (integers are unsigned, little-endian,
    4 bytes each):

    * ``0x0208`` - number of rows actually stored
    * ``0x020c`` - maximum number of bytes per record
    * ``0x0214`` - number of fields per record
    * ``0x0218`` - maximum number of rows
    * ``0x0400`` - start of the first record

    Every record is split into ``field_count`` equally sized fields. Each
    field is cut at its first NUL byte and decoded with the encoding selected
    in the UI; ``^`` characters inside a field are turned into commas.
    """

    # Header layout of the .data file.
    _ACTUAL_ROWS_OFFSET = 0x0208
    _RECORD_LENGTH_OFFSET = 0x020C
    _FIELD_COUNT_OFFSET = 0x0214
    _MAX_ROWS_OFFSET = 0x0218
    _HEADER_INT_SIZE = 4
    _FIRST_RECORD_OFFSET = 0x0400

    # The status label is refreshed once every this many rows.
    _PROGRESS_INTERVAL = 100

    def __init__(self, root):
        """Build all widgets inside ``root`` (the Tk top-level window)."""
        self.root = root
        self.root.title("Data转CSV转换器")
        self.root.geometry("700x400")

        # Encoding selector.
        tk.Label(root, text="选择编码:").grid(row=0, column=0, padx=10, pady=5, sticky="w")
        self.encoding_var = tk.StringVar(value="GB2312")
        encoding_frame = tk.Frame(root)
        encoding_frame.grid(row=0, column=1, padx=10, pady=5, sticky="we")

        encodings = ["GB2312", "utf-8", "GBK", "big5"]
        encoding_menu = tk.OptionMenu(encoding_frame, self.encoding_var, *encodings)
        encoding_menu.pack(fill="x")

        # Input file entry with a browse button.
        tk.Label(root, text="输入文件:").grid(row=1, column=0, padx=10, pady=5, sticky="w")

        input_frame = tk.Frame(root)
        input_frame.grid(row=1, column=1, padx=10, pady=5, sticky="we")

        self.input_path_var = tk.StringVar()
        input_entry = tk.Entry(input_frame, textvariable=self.input_path_var)
        input_entry.pack(side="left", fill="x", expand=True, padx=(0, 10))

        browse_btn = tk.Button(input_frame, text="浏览", command=self.browse_input_file)
        browse_btn.pack(side="right")

        # Output file entry.
        tk.Label(root, text="输出文件:").grid(row=2, column=0, padx=10, pady=5, sticky="w")

        output_frame = tk.Frame(root)
        output_frame.grid(row=2, column=1, padx=10, pady=5, sticky="we")

        self.output_path_var = tk.StringVar()
        output_entry = tk.Entry(output_frame, textvariable=self.output_path_var)
        output_entry.pack(fill="x", expand=True)

        # Read-only panel showing the values parsed from the file header.
        tk.Label(root, text="文件信息:").grid(row=3, column=0, padx=10, pady=5, sticky="nw")
        info_frame = tk.Frame(root, bd=1, relief=tk.SUNKEN)
        info_frame.grid(row=3, column=1, padx=10, pady=5, sticky="we")

        self.length_var = tk.StringVar(value="每条记录最大字节数: --")
        self.dez_var = tk.StringVar(value="字段数量: --")
        self.maxval_var = tk.StringVar(value="最大行数: --")
        self.actual_rows_var = tk.StringVar(value="实际行数: --")

        for info_var in (self.length_var, self.dez_var,
                         self.maxval_var, self.actual_rows_var):
            tk.Label(info_frame, textvariable=info_var).pack(anchor="w", padx=5, pady=2)

        # Convert button.
        convert_btn = tk.Button(root, text="开始转换", command=self.convert_file, height=2, width=15)
        convert_btn.grid(row=4, column=0, columnspan=2, pady=10)

        # Status line.
        self.status_var = tk.StringVar(value="就绪")
        status_label = tk.Label(root, textvariable=self.status_var, fg="blue")
        status_label.grid(row=5, column=0, columnspan=2, sticky="w", padx=10)

        # Give the second column and every row weight so the layout resizes.
        root.grid_columnconfigure(1, weight=1)
        for i in range(6):
            root.grid_rowconfigure(i, weight=1)

    def browse_input_file(self):
        """Ask for an input file, derive the output path and show header info."""
        file_path = filedialog.askopenfilename(
            title="选择data文件",
            filetypes=[("Data files", "*.data"), ("All files", "*.*")]
        )
        if not file_path:
            # Dialog was cancelled; keep the current selection.
            return

        self.input_path_var.set(file_path)

        # Default output: same directory and base name, ".csv" extension.
        dir_name, file_name = os.path.split(file_path)
        base_name = os.path.splitext(file_name)[0]
        self.output_path_var.set(os.path.join(dir_name, f"{base_name}.csv"))

        self.display_file_info(file_path)

    def read_bytes_as_int(self, file, offset, length):
        """Read ``length`` bytes at ``offset`` as an unsigned little-endian int.

        Args:
            file: Seekable binary file object.
            offset: Absolute byte position to read from.
            length: Number of bytes making up the integer.

        Returns:
            The decoded non-negative integer.

        Raises:
            ValueError: If fewer than ``length`` bytes are available at
                ``offset`` (file too short or truncated).
        """
        file.seek(offset)
        bytes_data = file.read(length)
        if len(bytes_data) != length:
            raise ValueError(
                f"文件过短: 无法在偏移 0x{offset:04x} 处读取 {length} 字节"
            )
        # int.from_bytes honours the requested width; for 4 bytes it yields
        # the same value as struct.unpack('<I', ...).
        return int.from_bytes(bytes_data, 'little')

    def _read_header(self, data_file):
        """Return ``(record_length, field_count, max_rows, actual_rows)``."""
        size = self._HEADER_INT_SIZE
        record_length = self.read_bytes_as_int(data_file, self._RECORD_LENGTH_OFFSET, size)
        field_count = self.read_bytes_as_int(data_file, self._FIELD_COUNT_OFFSET, size)
        max_rows = self.read_bytes_as_int(data_file, self._MAX_ROWS_OFFSET, size)
        actual_rows = self.read_bytes_as_int(data_file, self._ACTUAL_ROWS_OFFSET, size)
        return record_length, field_count, max_rows, actual_rows

    @staticmethod
    def _field_length(record_length, field_count):
        """Return the per-field byte width, rejecting unusable header values.

        Raises:
            ValueError: If ``field_count`` is zero or the record is too short
                to give every field at least one byte.
        """
        if field_count <= 0:
            raise ValueError("文件头无效: 字段数量为 0")
        field_length = record_length // field_count
        if field_length <= 0:
            raise ValueError("文件头无效: 记录长度小于字段数量")
        return field_length

    @staticmethod
    def _decode_field(raw, encoding):
        """Decode one raw field: cut at the first NUL, decode, ``^`` -> ``,``, strip."""
        null_pos = raw.find(b'\x00')
        if null_pos != -1:
            raw = raw[:null_pos]
        # errors='replace' never raises; undecodable bytes become U+FFFD.
        text = raw.decode(encoding, errors='replace')
        return text.replace('^', ',').strip()

    @staticmethod
    def _same_path(first, second):
        """Return True when both paths resolve to the same file system entry."""
        def canonical(path):
            return os.path.normcase(os.path.realpath(path))
        return canonical(first) == canonical(second)

    def _write_records(self, data_file, csv_file, encoding, record_length,
                       field_length, field_count, row_count, on_progress=None):
        """Copy up to ``row_count`` records from ``data_file`` to ``csv_file``.

        Args:
            data_file: Seekable binary file object holding the records.
            csv_file: Text file object receiving one line per record.
            encoding: Codec name used to decode field bytes.
            record_length: Distance in bytes between consecutive records.
            field_length: Width in bytes of one field (must be > 0).
            field_count: Number of fields per record.
            row_count: Number of rows the header claims are stored.
            on_progress: Optional callable receiving the 1-based row number,
                invoked on every ``_PROGRESS_INTERVAL``-th row.

        Returns:
            The number of rows written. This is smaller than ``row_count``
            when the file ends early: rows lying entirely past the end of the
            file are not emitted, so a corrupt row count cannot produce an
            unbounded run of empty lines.
        """
        payload_length = field_length * field_count
        rows_written = 0
        for row in range(row_count):
            data_file.seek(self._FIRST_RECORD_OFFSET + row * record_length)
            record = data_file.read(payload_length)
            if not record:
                # Nothing left in the file; later rows cannot exist either.
                break

            fields = [
                self._decode_field(record[start:start + field_length], encoding)
                for start in range(0, payload_length, field_length)
            ]
            # Fields are joined verbatim (no CSV quoting) so that the commas
            # produced from '^' act as column separators in the output.
            csv_file.write(','.join(fields) + '\n')
            rows_written += 1

            if on_progress is not None and row % self._PROGRESS_INTERVAL == 0:
                on_progress(row + 1)
        return rows_written

    def display_file_info(self, file_path):
        """Parse the header of ``file_path`` and show its values in the UI.

        Failures are reported through the status line and an error dialog
        rather than raised.
        """
        try:
            with open(file_path, 'rb') as data_file:
                record_length, field_count, max_rows, actual_rows = self._read_header(data_file)
        except Exception as e:  # UI boundary: report instead of crashing Tk.
            self.status_var.set(f"文件信息解析失败: {str(e)}")
            messagebox.showerror("错误", f"解析文件信息时发生错误:\n{str(e)}")
            return

        self.length_var.set(f"每条记录最大字节数: {record_length}")
        self.dez_var.set(f"字段数量: {field_count}")
        self.maxval_var.set(f"最大行数: {max_rows}")
        self.actual_rows_var.set(f"实际行数: {actual_rows}")
        self.status_var.set("文件信息解析成功")

    def convert_file(self):
        """Convert the selected ``.data`` file to CSV, replacing ``^`` with commas.

        Reads the paths and encoding from the UI variables, validates them,
        then writes one CSV line per stored record. Errors are shown in a
        dialog and in the status line rather than raised.
        """
        input_path = self.input_path_var.get()
        output_path = self.output_path_var.get()
        encoding = self.encoding_var.get()

        # Validate the user input before touching any file.
        if not input_path or not os.path.isfile(input_path):
            messagebox.showerror("错误", "请选择有效的输入文件")
            return

        if not output_path:
            messagebox.showerror("错误", "请指定输出文件路径")
            return

        if self._same_path(input_path, output_path):
            # Opening the output for writing would truncate the input.
            messagebox.showerror("错误", "输出文件不能与输入文件相同")
            return

        try:
            self.status_var.set("正在转换...")
            self.root.update()

            with open(input_path, 'rb') as data_file:
                record_length, field_count, _max_rows, actual_rows = self._read_header(data_file)
                # Validate the header before creating the output file so an
                # invalid input does not leave an empty CSV behind.
                field_length = self._field_length(record_length, field_count)

                def report_progress(done):
                    self.status_var.set(f"正在转换... {done}/{actual_rows}")
                    self.root.update()

                # errors='replace': undecodable input bytes become U+FFFD,
                # which GB2312/GBK/big5 cannot encode; without this the write
                # would raise UnicodeEncodeError and abort the conversion.
                with open(output_path, 'w', encoding=encoding,
                          errors='replace', newline='') as csv_file:
                    self._write_records(
                        data_file, csv_file, encoding, record_length,
                        field_length, field_count, actual_rows,
                        on_progress=report_progress,
                    )

            self.status_var.set("转换完成!")
            messagebox.showinfo("成功", f"文件已成功转换为:\n{output_path}")

        except Exception as e:  # UI boundary: report instead of crashing Tk.
            self.status_var.set(f"转换失败: {str(e)}")
            messagebox.showerror("错误", f"转换过程中发生错误:\n{str(e)}")

if __name__ == "__main__":
    # Script entry point: create the Tk top-level window, attach the
    # converter UI to it and hand control to the Tk event loop.
    window = tk.Tk()
    converter = DataToCsvConverter(window)
    window.mainloop()