"""Interactive migration of a Django SQLite database into PostgreSQL.

The script copies the rows of every non-system table from a SQLite file into
an already-created PostgreSQL schema, then records the Django migrations as
applied and realigns the PostgreSQL sequences with the copied data.

Connection settings are read from environment variables (optionally loaded
from a ``.env`` file): ``SQLITE_DB_PATH``, ``PG_DB_NAME``, ``PG_DB_USER``,
``PG_DB_PASSWORD``, ``PG_DB_HOST`` and ``PG_DB_PORT``.
"""

import datetime
import os
import sqlite3
import sys
from collections import defaultdict

import psycopg2
from dotenv import load_dotenv
from psycopg2 import sql

# Load environment variables from a .env file, if one is present.
load_dotenv()


def _quote_sqlite_identifier(name):
    """Return *name* double-quoted so it can be embedded in a SQLite statement."""
    return '"' + name.replace('"', '""') + '"'


class SQLiteToPostgresMigrator:
    """Copy table data from a SQLite database into an existing PostgreSQL schema.

    The target tables must already exist in PostgreSQL; this class only moves
    rows. Progress is printed to the console and mirrored into a timestamped
    log file.
    """

    # PostgreSQL type names (as reported by information_schema) per family.
    _BOOLEAN_TYPES = ('boolean', 'bool')
    _STRING_TYPES = ('character varying', 'varchar', 'char', 'text')
    _INT4_TYPES = ('integer', 'int', 'int4')
    _INT8_TYPES = ('bigint', 'int8')

    # Inclusive value ranges of the PostgreSQL integer types.
    _INT4_MIN, _INT4_MAX = -2147483648, 2147483647
    _INT8_MIN, _INT8_MAX = -9223372036854775808, 9223372036854775807

    # Savepoint name used to isolate single statements inside a transaction.
    _SAVEPOINT = "migrator_stmt"

    def __init__(self):
        """Read the configuration from the environment and reset all state."""
        # SQLite configuration.
        self.sqlite_db_path = os.getenv('SQLITE_DB_PATH', 'db.sqlite3')

        # PostgreSQL configuration.
        # NOTE(review): the fallback password is hard-coded in source; prefer
        # supplying PG_DB_PASSWORD through the environment.
        self.pg_config = {
            'dbname': os.getenv('PG_DB_NAME', 'wmsdb'),
            'user': os.getenv('PG_DB_USER', 'wmsuser'),
            'password': os.getenv('PG_DB_PASSWORD', 'abc@1234'),
            'host': os.getenv('PG_DB_HOST', 'localhost'),
            'port': os.getenv('PG_DB_PORT', '5432')
        }

        # Connections and cursors (created by connect()).
        self.sqlite_conn = None
        self.sqlite_cursor = None
        self.pg_conn = None
        self.pg_cursor = None

        # table_dependencies: referenced table -> tables that reference it.
        # reverse_dependencies: table -> tables it references.
        self.table_dependencies = defaultdict(list)
        self.reverse_dependencies = defaultdict(list)
        self.migration_order = []

        # System tables that are never copied.
        self.skip_tables = [
            'auth_permission',
            'django_content_type',
            'django_migrations',
            'auth_group_permissions',
            'auth_user_groups',
            'auth_user_user_permissions',
            'django_session',
            'django_admin_log'
        ]

        # Migration state.
        self.migrated_tables = set()
        self.failed_tables = set()

        # Cache: table -> list of primary-key column names.
        self.primary_keys = {}

        # Cache: table -> {column: maximum character length}.
        self.column_lengths = {}

        # Log file handle and path.
        self.log_file = None
        self.log_file_path = f"migration_log_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

    # ------------------------------------------------------------------
    # Logging and connection management
    # ------------------------------------------------------------------

    def log(self, message):
        """Print *message* and append it to the log file when one is open."""
        print(message)
        if self.log_file and not self.log_file.closed:
            self.log_file.write(message + "\n")
            self.log_file.flush()

    def _ensure_log_open(self):
        """Reopen the log file in append mode if it is not currently open.

        migrate() closes the log file when it finishes, so the follow-up steps
        call this to keep writing to the same file. Failing to open the file
        is reported on the console and otherwise ignored.
        """
        if self.log_file is not None and not self.log_file.closed:
            return
        try:
            self.log_file = open(self.log_file_path, "a", encoding="utf-8")
        except OSError as e:
            print(f"无法打开日志文件: {str(e)}")

    def _close_log(self):
        """Close the log file if it is open."""
        if self.log_file and not self.log_file.closed:
            self.log_file.close()

    def _close_handles(self, *attr_names):
        """Close the named cursor/connection attributes and reset them to None.

        Each handle is closed independently so that one failing close() does
        not leak the remaining handles.
        """
        for attr_name in attr_names:
            handle = getattr(self, attr_name)
            if handle is None:
                continue
            try:
                handle.close()
            except Exception as e:
                self.log(f"关闭连接时出错: {str(e)}")
            setattr(self, attr_name, None)

    def _safe_rollback(self):
        """Roll back the PostgreSQL transaction, logging instead of raising."""
        try:
            self.pg_conn.rollback()
        except Exception as e:
            self.log(f"回滚事务失败: {str(e)}")

    def _execute_in_savepoint(self, query, params=None):
        """Execute one PostgreSQL statement inside a savepoint.

        A failing statement normally aborts the whole transaction, which would
        make every later statement fail and force a rollback of earlier work.
        Wrapping the statement in a savepoint confines the failure to that
        statement.

        Returns:
            The cursor's ``rowcount`` for the statement.

        Raises:
            Whatever the statement raised, after the transaction has been
            restored to the savepoint.
        """
        savepoint = self._SAVEPOINT
        self.pg_cursor.execute(f"SAVEPOINT {savepoint}")
        try:
            self.pg_cursor.execute(query, params)
            affected = self.pg_cursor.rowcount
        except Exception:
            self.pg_cursor.execute(f"ROLLBACK TO SAVEPOINT {savepoint}")
            self.pg_cursor.execute(f"RELEASE SAVEPOINT {savepoint}")
            raise
        self.pg_cursor.execute(f"RELEASE SAVEPOINT {savepoint}")
        return affected

    def connect(self):
        """Open the log file and both database connections.

        Returns:
            True on success. On failure the error is logged, everything opened
            so far is closed again, and False is returned.
        """
        try:
            # Open the log file.
            self.log_file = open(self.log_file_path, "w", encoding="utf-8")
            self.log(f"迁移日志文件: {self.log_file_path}")

            # SQLite connection.
            self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
            self.sqlite_cursor = self.sqlite_conn.cursor()

            # PostgreSQL connection.
            self.pg_conn = psycopg2.connect(**self.pg_config)
            self.pg_cursor = self.pg_conn.cursor()

            self.log(f"连接成功: SQLite({self.sqlite_db_path}) → PostgreSQL({self.pg_config['dbname']})")
            return True
        except Exception as e:
            self.log(f"连接失败: {str(e)}")
            # migrate() returns before reaching its try/finally when connect()
            # fails, so release partially opened resources here.
            self.close()
            return False

    def close(self):
        """Close the database connections and then the log file."""
        # Record the closing message first, while the file is still open.
        if self.log_file and not self.log_file.closed:
            self.log_file.write(f"日志文件已保存: {self.log_file_path}\n")
            self.log_file.flush()

        # Cursors first, then the connections they belong to.
        self._close_handles('sqlite_cursor', 'pg_cursor', 'sqlite_conn', 'pg_conn')

        # Close the log file last.
        self._close_log()

    # ------------------------------------------------------------------
    # Preparation
    # ------------------------------------------------------------------

    def clear_postgres_data(self):
        """Truncate every table of the ``public`` schema after confirmation.

        Returns:
            True when the truncation was committed, False when the user
            declined or an error occurred. A failure on an individual table is
            logged and does not stop the remaining tables.
        """
        self.log("\n警告: 此操作将清空 PostgreSQL 数据库中的所有数据!")
        confirm = input("确定要清空 PostgreSQL 数据库吗? (y/n): ")
        if confirm.lower() != 'y':
            self.log("取消清空操作")
            return False

        try:
            # Only base tables: TRUNCATE cannot be applied to views.
            self.pg_cursor.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public'
                  AND table_type = 'BASE TABLE'
            """)
            tables = [row[0] for row in self.pg_cursor.fetchall()]

            # TRUNCATE ... CASCADE is used instead of session_replication_role.
            for table in tables:
                truncate_query = sql.SQL("TRUNCATE TABLE {} CASCADE").format(
                    sql.Identifier(table)
                )
                try:
                    # The savepoint keeps one failing table from aborting the
                    # transaction for all the tables that follow.
                    self._execute_in_savepoint(truncate_query)
                    self.log(f"已清空表: {table}")
                except psycopg2.Error as e:
                    self.log(f"清空表 {table} 时出错: {str(e)}")
                    continue

            self.pg_conn.commit()
            self.log("成功清空 PostgreSQL 数据库中的所有数据")
            return True
        except Exception as e:
            self._safe_rollback()
            self.log(f"清空数据库时出错: {str(e)}")
            return False

    def analyze_dependencies(self):
        """Read the SQLite foreign keys and compute the table migration order.

        Fills ``table_dependencies``, ``reverse_dependencies`` and
        ``migration_order``. The order lists every referenced table before the
        tables that reference it. Foreign keys pointing at the table itself
        are ignored, because a table can never be migrated "before itself".
        """
        # All tables present in SQLite.
        self.sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        all_tables = [row[0] for row in self.sqlite_cursor.fetchall()]

        # Drop SQLite-internal tables and the configured system tables.
        excluded = set(self.skip_tables)
        tables = [
            name for name in all_tables
            if name not in excluded and not name.startswith('sqlite_')
        ]
        known_tables = set(tables)

        # Start from a clean slate so repeated calls do not accumulate edges.
        self.table_dependencies.clear()
        self.reverse_dependencies.clear()

        # Collect foreign-key edges.
        for table in tables:
            self.sqlite_cursor.execute(
                f"PRAGMA foreign_key_list({_quote_sqlite_identifier(table)})"
            )
            for fk in self.sqlite_cursor.fetchall():
                ref_table = fk[2]  # third column of the PRAGMA row: referenced table
                if ref_table == table or ref_table not in known_tables:
                    continue
                if ref_table in self.reverse_dependencies[table]:
                    continue  # several FKs to the same table: record it once
                self.table_dependencies[ref_table].append(table)
                self.reverse_dependencies[table].append(ref_table)

        # Topological sort: emit a table only after everything it references.
        visited = set()
        order = []

        def visit(table):
            if table in visited:
                return
            visited.add(table)  # marking first also stops recursion on cycles
            for referenced in self.reverse_dependencies.get(table, []):
                visit(referenced)
            order.append(table)

        for table in tables:
            visit(table)

        self.migration_order = order
        self.log(f"分析完成: 发现 {len(tables)} 个表, 依赖顺序已确定")

    # ------------------------------------------------------------------
    # Schema inspection
    # ------------------------------------------------------------------

    def _fetch_pg_columns(self, table_name):
        """Return (column_name, data_type, max_length) rows for a table.

        Returns None when the lookup fails; the failure is logged and the
        transaction is rolled back so the connection stays usable.
        """
        # NOTE(review): the lookup is not restricted to one schema, so tables
        # with the same name in different schemas would be merged.
        try:
            self.pg_cursor.execute("""
                SELECT column_name, data_type, character_maximum_length
                FROM information_schema.columns
                WHERE table_name = %s
            """, (table_name.lower(),))
            return self.pg_cursor.fetchall()
        except Exception as e:
            self.log(f" 读取表 {table_name} 结构失败: {str(e)}")
            self._safe_rollback()
            return None

    def get_pg_table_structure(self, table_name):
        """Return the PostgreSQL column metadata for *table_name*.

        Returns:
            ``{column_name: {'data_type': ..., 'max_length': ...}}``, or an
            empty dict when the lookup fails.
        """
        column_rows = self._fetch_pg_columns(table_name)
        if column_rows is None:
            return {}
        return {
            column_name: {'data_type': data_type, 'max_length': max_length}
            for column_name, data_type, max_length in column_rows
        }

    def get_column_lengths(self, table_name):
        """Return ``{column_name: max_length}`` for length-limited columns.

        Successful results are cached per table; a failed lookup returns an
        empty dict without caching it.
        """
        if table_name in self.column_lengths:
            return self.column_lengths[table_name]

        column_rows = self._fetch_pg_columns(table_name)
        if column_rows is None:
            return {}

        lengths = {
            column_name: max_length
            for column_name, _data_type, max_length in column_rows
            if max_length
        }
        self.column_lengths[table_name] = lengths
        return lengths

    def get_primary_keys(self, table):
        """Return the primary-key column names of a SQLite table (cached)."""
        if table in self.primary_keys:
            return self.primary_keys[table]

        self.sqlite_cursor.execute(
            f"PRAGMA table_info({_quote_sqlite_identifier(table)})"
        )
        # Row layout: (cid, name, type, notnull, dflt_value, pk); a non-zero
        # pk marks a primary-key column.
        primary_keys = [
            col_info[1]
            for col_info in self.sqlite_cursor.fetchall()
            if col_info[5] != 0
        ]

        self.primary_keys[table] = primary_keys
        return primary_keys

    # ------------------------------------------------------------------
    # Row conversion
    # ------------------------------------------------------------------

    def transform_data(self, row, columns, pg_structure):
        """Convert one SQLite row to values suitable for the target columns.

        Args:
            row: Sequence of values as read from SQLite.
            columns: Column names, in the same order as *row*.
            pg_structure: Result of get_pg_table_structure() for the table.

        Returns:
            A list with one value per column: 0/1 become booleans for boolean
            columns, over-long strings are truncated to the column limit,
            integers outside the bigint range become None, and everything else
            is passed through unchanged.
        """
        transformed = []

        for col_name, value in zip(columns, row):
            col_info = pg_structure.get(col_name.lower(), {})
            pg_type = col_info.get('data_type')
            max_length = col_info.get('max_length')

            if pg_type in self._BOOLEAN_TYPES:
                # SQLite stores booleans as integers 0/1.
                if value in (0, '0', False):
                    transformed.append(False)
                elif value in (1, '1', True):
                    transformed.append(True)
                else:
                    # Unconvertible values are passed through (may fail on insert).
                    transformed.append(value)

            elif pg_type in self._STRING_TYPES and max_length and isinstance(value, str):
                if len(value) > max_length:
                    self.log(f" 警告: 字段 '{col_name}' 值被截断 ({len(value)} -> {max_length})")
                    transformed.append(value[:max_length])
                else:
                    transformed.append(value)

            elif pg_type in self._INT4_TYPES:
                # The value is forwarded unchanged; an out-of-range value makes
                # the insert fail, which triggers handle_integer_overflow().
                if isinstance(value, int) and not self._INT4_MIN <= value <= self._INT4_MAX:
                    self.log(f" 警告: 字段 '{col_name}' 值超出整数范围 ({value}),尝试转换为bigint")
                transformed.append(value)

            elif pg_type in self._INT8_TYPES:
                if isinstance(value, int) and not self._INT8_MIN <= value <= self._INT8_MAX:
                    self.log(f" 错误: 字段 '{col_name}' 值超出bigint范围 ({value})")
                    transformed.append(None)
                else:
                    transformed.append(value)

            else:
                transformed.append(value)

        return transformed

    def check_dependencies_migrated(self, table):
        """Return True when every table referenced by *table* has been migrated."""
        dependencies = self.reverse_dependencies.get(table, [])
        if not dependencies:
            return True

        self.log(f" 检查 {table} 的依赖表迁移状态...")
        all_migrated = True
        for dep_table in dependencies:
            if dep_table in self.migrated_tables:
                self.log(f" ✓ {dep_table} 已迁移")
            elif dep_table in self.failed_tables:
                self.log(f" ✗ {dep_table} 迁移失败")
                all_migrated = False
            else:
                self.log(f" ? {dep_table} 尚未迁移")
                all_migrated = False

        return all_migrated

    def handle_integer_overflow(self, table, columns, pg_structure, row):
        """Widen every ``integer`` column that *row* overflows to ``bigint``.

        Each successful ALTER TABLE is committed and reflected in
        *pg_structure* (updated in place, so the caller sees the new type).
        The caller must make sure the transaction is not in the aborted state
        and holds no uncommitted work it wants to keep: a failed ALTER TABLE
        rolls the whole transaction back.

        Returns:
            True when at least one column was widened and none failed; False
            when no overflowing column was found or an ALTER TABLE failed.
        """
        widened_any = False

        for col_name, value in zip(columns, row):
            col_info = pg_structure.get(col_name.lower(), {})
            if col_info.get('data_type') not in self._INT4_TYPES:
                continue
            if not isinstance(value, int) or self._INT4_MIN <= value <= self._INT4_MAX:
                continue

            self.log(f" 检测到字段 '{col_name}' 值超出范围 ({value})")
            try:
                self.log(f" 尝试将字段 '{col_name}' 类型改为 bigint")
                alter_query = sql.SQL(
                    "ALTER TABLE {} ALTER COLUMN {} TYPE BIGINT"
                ).format(
                    sql.Identifier(table.lower()),
                    sql.Identifier(col_name)
                )
                self.pg_cursor.execute(alter_query)
                self.pg_conn.commit()
                self.log(f" 成功将字段 '{col_name}' 类型改为 bigint")
            except Exception as e:
                self.log(f" 修改字段类型失败: {str(e)}")
                self._safe_rollback()
                return False

            # Keep the cached structure in sync with the altered column.
            col_info['data_type'] = 'bigint'
            widened_any = True

        return widened_any

    # ------------------------------------------------------------------
    # Table migration
    # ------------------------------------------------------------------

    def _build_insert_query(self, table, columns, primary_keys):
        """Compose the INSERT statement for *table*.

        When the table has a primary key, conflicting rows are skipped with
        ``ON CONFLICT (...) DO NOTHING``.
        """
        table_ident = sql.Identifier(table.lower())
        column_list = sql.SQL(', ').join(sql.Identifier(col) for col in columns)
        placeholders = sql.SQL(', ').join(sql.Placeholder() * len(columns))

        if not primary_keys:
            return sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                table_ident, column_list, placeholders
            )

        conflict_columns = sql.SQL(', ').join(
            sql.Identifier(pk) for pk in primary_keys
        )
        return sql.SQL(
            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO NOTHING"
        ).format(table_ident, column_list, placeholders, conflict_columns)

    def _log_integrity_error(self, error, row):
        """Log an integrity error raised while inserting *row*."""
        message = str(error)
        if 'foreign key constraint' in message:
            self.log(f" 外键约束错误: {message}")
        elif 'duplicate key value violates unique constraint' in message:
            self.log(f" 主键冲突错误: {message}")
            self.log(f" 行数据: {row}")
            self.log(f" 尝试跳过此行...")
        else:
            self.log(f" 完整性错误: {message}")

    def _recover_from_data_error(self, table, columns, pg_structure, row,
                                 insert_query, error):
        """Try to rescue a row rejected with a data error.

        Only "integer out of range" is recoverable: the overflowing columns
        are widened to bigint and the row is inserted again.

        Returns:
            True when the row was inserted after recovery, False otherwise.
        """
        message = str(error)
        if 'integer out of range' not in message:
            self.log(f" 行插入失败: {message}")
            return False

        self.log(f" 整数超出范围错误: {message}")
        # handle_integer_overflow() commits on success and rolls the whole
        # transaction back on failure, so persist the rows copied so far.
        self.pg_conn.commit()
        if not self.handle_integer_overflow(table, columns, pg_structure, row):
            return False

        try:
            retry_values = self.transform_data(row, columns, pg_structure)
            self._execute_in_savepoint(insert_query, retry_values)
        except psycopg2.Error as retry_error:
            self.log(f" 行插入失败: {str(retry_error)}")
            return False
        return True

    def _copy_rows(self, table, columns, rows, pg_structure, insert_query):
        """Insert *rows* one by one and return ``(success_count, error_count)``.

        Every insert runs inside a savepoint, so a rejected row does not undo
        the rows inserted before it.
        """
        success_count = 0
        error_count = 0

        for row in rows:
            try:
                values = self.transform_data(row, columns, pg_structure)
                self._execute_in_savepoint(insert_query, values)
                success_count += 1
            except psycopg2.IntegrityError as e:
                error_count += 1
                self._log_integrity_error(e, row)
            except psycopg2.DataError as e:
                # PostgreSQL reports "integer out of range" as a data error,
                # not as an integrity error.
                if self._recover_from_data_error(
                        table, columns, pg_structure, row, insert_query, e):
                    success_count += 1
                else:
                    error_count += 1
            except Exception as e:
                error_count += 1
                self.log(f" 行插入失败: {str(e)}")

        return success_count, error_count

    def migrate_table(self, table):
        """Copy all rows of one table from SQLite to PostgreSQL.

        The user is asked for confirmation before the copy starts and may skip
        the table.

        Returns:
            True when the table was migrated, was empty, is a system table or
            was skipped by the user; False when its dependencies are not
            migrated yet or at least one row failed.
        """
        self.log(f"\n准备迁移表: {table}")

        # System tables are never copied.
        if table.lower() in {name.lower() for name in self.skip_tables}:
            self.log(f" 跳过系统表: {table}")
            return True

        # Referenced tables must be in place first.
        if not self.check_dependencies_migrated(table):
            self.log(f" 依赖表未完全迁移,暂时跳过 {table}")
            return False

        # SQLite column names, primary key and data.
        quoted_table = _quote_sqlite_identifier(table)
        self.sqlite_cursor.execute(f"PRAGMA table_info({quoted_table})")
        columns = [col[1] for col in self.sqlite_cursor.fetchall()]
        primary_keys = self.get_primary_keys(table)

        self.sqlite_cursor.execute(f"SELECT * FROM {quoted_table}")
        rows = self.sqlite_cursor.fetchall()

        if not rows:
            self.log(f" 表 {table} 为空,跳过")
            self.migrated_tables.add(table)
            return True

        # PostgreSQL column metadata and the INSERT statement.
        pg_structure = self.get_pg_table_structure(table)
        column_lengths = self.get_column_lengths(table)
        insert_query = self._build_insert_query(table, columns, primary_keys)

        # Summary and user confirmation.
        self.log(f" 表 {table} 有 {len(rows)} 行数据需要迁移")
        if primary_keys:
            self.log(f" 使用主键 ({', '.join(primary_keys)}) 处理冲突")
        else:
            self.log(" 警告: 表没有主键,可能产生重复数据")

        if column_lengths:
            self.log(" 字段长度限制:")
            for col, max_len in column_lengths.items():
                self.log(f" {col}: {max_len} 字符")

        confirm = input(" 按 Enter 键开始迁移,或输入 's' 跳过此表: ")
        if confirm.lower() == 's':
            self.log(f" 已跳过表 {table}")
            return True

        # Copy the data.
        try:
            success_count, error_count = self._copy_rows(
                table, columns, rows, pg_structure, insert_query
            )
            self.pg_conn.commit()
        except Exception as e:
            self._safe_rollback()
            self.log(f" 迁移表 {table} 时出错: {str(e)}")
            self.failed_tables.add(table)
            return False

        if error_count == 0:
            self.log(f" 成功迁移 {success_count} 行数据")
            self.migrated_tables.add(table)
            # A table that failed in an earlier round has now succeeded.
            self.failed_tables.discard(table)
            return True

        self.log(f" 部分迁移: {success_count} 行成功, {error_count} 行失败")
        self.failed_tables.add(table)
        return False

    def migrate(self):
        """Run the interactive migration of all tables.

        Returns:
            False when connecting failed, the user quit, clearing the target
            failed or an unexpected error occurred; True otherwise (including
            when some tables could not be migrated, which is reported in the
            log). The connections and the log file are closed on exit.
        """
        if not self.connect():
            return False

        try:
            # Ask whether the PostgreSQL data should be cleared first.
            self.log("\n是否在迁移前清空 PostgreSQL 数据库?")
            self.log("1. 清空所有数据 (确保无重复)")
            self.log("2. 不清空 (可能产生重复数据)")
            self.log("3. 退出")
            choice = input("请选择 (1/2/3): ")

            if choice == '3':
                self.log("退出迁移")
                return False

            if choice == '1':
                if not self.clear_postgres_data():
                    self.log("清空操作失败,退出迁移")
                    return False

            self.analyze_dependencies()
            self.log(f"\n开始迁移 {len(self.migration_order)} 个表")

            # Migrate in dependency order; tables that fail are retried in
            # later rounds.
            remaining_tables = list(self.migration_order)
            max_attempts = 10  # upper bound on retry rounds
            attempt = 0

            while remaining_tables and attempt < max_attempts:
                attempt += 1
                self.log(f"\n第 {attempt} 轮迁移尝试")

                tables_to_migrate = remaining_tables
                remaining_tables = [
                    table for table in tables_to_migrate
                    if not self.migrate_table(table)
                ]

                if remaining_tables:
                    self.log(f"\n本轮迁移后仍有 {len(remaining_tables)} 个表未成功迁移")

            # Report the outcome.
            if remaining_tables:
                self.log(f"\n迁移完成,但以下表迁移失败:")
                for table in remaining_tables:
                    self.log(f" - {table}")
            else:
                self.log("\n所有表迁移成功!")

            return True
        except Exception as e:
            self.log(f"迁移过程中发生错误: {str(e)}")
            return False
        finally:
            self.close()

    # ------------------------------------------------------------------
    # Post-migration steps
    # ------------------------------------------------------------------

    def mark_django_migrations_as_applied(self):
        """Copy the Django migration records from SQLite into PostgreSQL.

        Records that already exist in PostgreSQL are left untouched; new ones
        are inserted with the current time as ``applied``. Opens its own
        connections and closes them when done.

        Returns:
            True on success (also when SQLite holds no records), False when an
            error prevented the operation.
        """
        self._ensure_log_open()
        try:
            self.log("\n检查 Django 迁移状态...")

            # Reconnect: migrate() closed the previous connections.
            self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
            self.sqlite_cursor = self.sqlite_conn.cursor()
            self.pg_conn = psycopg2.connect(**self.pg_config)
            self.pg_cursor = self.pg_conn.cursor()

            # All migrations recorded in SQLite.
            self.sqlite_cursor.execute("SELECT app, name FROM django_migrations")
            migrations = self.sqlite_cursor.fetchall()

            if not migrations:
                self.log(" 未找到 Django 迁移记录")
                return True

            # Insert only the records PostgreSQL does not have yet.
            insert_missing = """
                INSERT INTO django_migrations (app, name, applied)
                SELECT %s, %s, NOW()
                WHERE NOT EXISTS (
                    SELECT 1 FROM django_migrations
                    WHERE app = %s AND name = %s
                )
            """
            for app, name in migrations:
                try:
                    # The savepoint keeps one failing record from aborting the
                    # transaction for the remaining records.
                    inserted = self._execute_in_savepoint(
                        insert_missing, (app, name, app, name)
                    )
                    if inserted:
                        self.log(f" 标记迁移: {app}.{name}")
                    else:
                        self.log(f" 迁移已存在: {app}.{name}")
                except psycopg2.IntegrityError as e:
                    if 'unique constraint' in str(e):
                        self.log(f" 迁移已存在: {app}.{name}")
                    else:
                        self.log(f" 标记迁移失败: {app}.{name} - {str(e)}")
                except Exception as e:
                    self.log(f" 标记迁移失败: {app}.{name} - {str(e)}")

            self.pg_conn.commit()
            self.log("成功标记 Django 迁移为已应用状态")
            return True
        except Exception as e:
            self.log(f"标记迁移状态时出错: {str(e)}")
            return False
        finally:
            # Close the temporary connections.
            self._close_handles('sqlite_cursor', 'sqlite_conn', 'pg_cursor', 'pg_conn')
            self._close_log()

    def reset_sequences(self):
        """Align every PostgreSQL sequence with the data of its column.

        Each sequence is handled in its own transaction, so a failure is
        logged and the remaining sequences are still processed. For an empty
        table the sequence is reset so that the next generated value is 1.

        Returns:
            True when the sequences were processed (individual failures are
            only logged), False when an error stopped the whole operation.
        """
        self._ensure_log_open()
        try:
            self.log("\n重置 PostgreSQL 序列(稳健模式)...")

            # Reconnect to start from a clean cursor.
            self._close_handles('pg_cursor', 'pg_conn')
            self.pg_conn = psycopg2.connect(**self.pg_config)
            self.pg_conn.autocommit = False
            self.pg_cursor = self.pg_conn.cursor()

            # Columns of the public schema that are fed by a sequence: serial
            # style defaults and identity columns.
            self.pg_cursor.execute("""
                SELECT table_name, column_name
                FROM information_schema.columns
                WHERE table_schema = 'public'
                  AND (column_default LIKE 'nextval(%' OR is_identity = 'YES')
            """)
            sequence_columns = self.pg_cursor.fetchall()

            if not sequence_columns:
                self.log(" 未发现任何使用 sequence 的列(可能无需重置)")
                return True

            for table_name, column_name in sequence_columns:
                try:
                    # pg_get_serial_sequence parses its first argument as an
                    # identifier, so quote the table name to keep its case.
                    qualified_table = 'public."{}"'.format(
                        table_name.replace('"', '""')
                    )
                    self.pg_cursor.execute(
                        "SELECT pg_get_serial_sequence(%s, %s)",
                        (qualified_table, column_name)
                    )
                    seq_row = self.pg_cursor.fetchone()
                    # NULL is returned when no sequence is owned by the column.
                    sequence_name = seq_row[0] if seq_row else None

                    if not sequence_name:
                        self.log(f" 跳过: {table_name}.{column_name} 没有可识别的 sequence")
                        continue

                    # With data: the next value is MAX + 1. Without data:
                    # is_called = false makes the next value 1.
                    setval_query = sql.SQL(
                        "SELECT setval(%s, COALESCE(MAX({col}), 1), "
                        "MAX({col}) IS NOT NULL) FROM {tbl}"
                    ).format(
                        col=sql.Identifier(column_name),
                        tbl=sql.Identifier(table_name)
                    )
                    self.pg_cursor.execute(setval_query, (sequence_name,))
                    self.pg_conn.commit()
                    self.log(f" 已重置序列: {sequence_name} (表 {table_name}.{column_name})")
                except Exception as e:
                    # Undo only this sequence's transaction and carry on.
                    self._safe_rollback()
                    self.log(f" 重置序列失败: {table_name}.{column_name} - {str(e)}")
                    continue

            self.log("成功尝试重置所有发现的序列")
            return True

        except Exception as e:
            self.log(f"重置序列时出错: {str(e)}")
            return False
        finally:
            self._close_handles('pg_cursor', 'pg_conn')
            self._close_log()


if __name__ == "__main__":
    migrator = SQLiteToPostgresMigrator()
    if migrator.migrate():
        # Record the Django migrations as applied.
        migrator.mark_django_migrations_as_applied()
        # Realign the sequences with the copied data.
        migrator.reset_sequences()