import sqlite3 import csv import time import os # 记录开始时间 start_time = time.time() # 连接数据库(动态获取数据库路径) db_path = os.path.abspath('../db.sqlite3') print(f"[{time.strftime('%H:%M:%S')}] 正在连接数据库: {db_path}") try: conn = sqlite3.connect(db_path) cursor = conn.cursor() # 在插入前清空表 cursor.execute("DELETE FROM product;") print(f"[{time.strftime('%H:%M:%S')}] ✅ 数据库连接成功") # 读取CSV文件 csv_file = 'product.csv' print(f"\n[{time.strftime('%H:%M:%S')}] 开始导入文件: {csv_file}") with open(csv_file, 'r', encoding='utf-8') as f: reader = csv.reader(f) # 读取标题行 try: header = next(reader) num_columns = len(header) print(f"[{time.strftime('%H:%M:%S')}] 检测到 {num_columns} 列 | 标题: {', '.join(header)}") except StopIteration: print("❌ 错误:CSV文件为空或格式不正确") exit() # 准备SQL语句 placeholders = ', '.join(['?'] * num_columns) sql = f"INSERT INTO product VALUES ({placeholders})" print(f"[{time.strftime('%H:%M:%S')}] 生成SQL语句: {sql}\n") # 插入数据 total_rows = 0 for i, row in enumerate(reader, 1): # 关键修改:将空白值转换为0 processed_row = [ 0 if str(cell).strip() == '' else cell # 空白转0,保留非空值 for cell in row ] cursor.execute(sql, processed_row) total_rows += 1 # 每100行提示进度 if i % 10 == 0: print(f"[{time.strftime('%H:%M:%S')}] 已插入 {i} 行...", end='\r') # 提交事务 conn.commit() print(f"\n[{time.strftime('%H:%M:%S')}] ✅ 数据插入完成,共 {total_rows} 行") except FileNotFoundError: print(f"❌ 错误:CSV文件不存在,当前搜索路径: {os.path.abspath(csv_file)}") except sqlite3.Error as e: print(f"❌ 数据库错误: {str(e)}") conn.rollback() except Exception as e: print(f"❌ 发生异常: {str(e)}") finally: if 'conn' in locals(): conn.close() print(f"[{time.strftime('%H:%M:%S')}] 数据库连接已关闭") # 计算总耗时 end_time = time.time() print(f"\n总耗时: {end_time - start_time:.2f} 秒")