import os
import sqlite3
import sys
from collections import defaultdict

import psycopg2
from dotenv import load_dotenv
from psycopg2 import sql

# Load environment variables (a .env file is honoured when present).
load_dotenv()


class SQLiteToPostgresMigrator:
    """Interactively copy table data from a SQLite file into PostgreSQL.

    The target PostgreSQL tables must already exist; this class only copies
    rows.  Tables are processed in foreign-key order (referenced tables
    first), and the operator is prompted before every table.
    """

    def __init__(self):
        """Read connection settings from the environment and reset state."""
        # SQLite settings.
        self.sqlite_db_path = os.getenv('SQLITE_DB_PATH', 'db.sqlite3')

        # PostgreSQL settings.
        # NOTE(review): the fallback password is hard-coded; consider
        # requiring PG_DB_PASSWORD to be set explicitly.
        self.pg_config = {
            'dbname': os.getenv('PG_DB_NAME', 'wmsdb'),
            'user': os.getenv('PG_DB_USER', 'wmsuser'),
            'password': os.getenv('PG_DB_PASSWORD', 'abc@1234'),
            'host': os.getenv('PG_DB_HOST', 'localhost'),
            'port': os.getenv('PG_DB_PORT', '5432')
        }

        # Connections and cursors (populated by connect()).
        self.sqlite_conn = None
        self.sqlite_cursor = None
        self.pg_conn = None
        self.pg_cursor = None

        # table_dependencies:   referenced table -> tables that reference it
        # reverse_dependencies: table -> tables it references (prerequisites)
        self.table_dependencies = defaultdict(list)
        self.reverse_dependencies = defaultdict(list)
        self.migration_order = []

        # System tables that are never migrated.
        self.skip_tables = [
            'auth_permission',
            'django_content_type',
            'django_migrations',
            'auth_group_permissions',
            'auth_user_groups',
            'auth_user_user_permissions'
        ]

        # Migration status.
        self.migrated_tables = set()
        self.failed_tables = set()

        # Cache: table -> list of primary-key column names.
        self.primary_keys = {}

        # Cache: table -> {column: character_maximum_length}.
        self.column_lengths = {}

    @staticmethod
    def _quote_sqlite_identifier(name):
        """Return *name* double-quoted for safe use in a SQLite statement."""
        return '"' + str(name).replace('"', '""') + '"'

    def connect(self):
        """Open both database connections.

        Returns:
            True when both connections were opened, False otherwise.  On
            failure any connection that was already opened is closed again.
        """
        try:
            # SQLite connection.
            self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
            self.sqlite_cursor = self.sqlite_conn.cursor()

            # PostgreSQL connection.
            self.pg_conn = psycopg2.connect(**self.pg_config)
            self.pg_cursor = self.pg_conn.cursor()

            print(f"连接成功: SQLite({self.sqlite_db_path}) → PostgreSQL({self.pg_config['dbname']})")
            return True
        except Exception as e:
            print(f"连接失败: {str(e)}")
            # Do not leak a half-opened pair (e.g. SQLite open, PostgreSQL
            # refused): the caller returns before reaching its own cleanup.
            self.close()
            return False

    def close(self):
        """Close all cursors and connections; safe to call more than once."""
        handles = [self.sqlite_cursor, self.pg_cursor, self.sqlite_conn, self.pg_conn]
        for handle in handles:
            if not handle:
                continue
            try:
                handle.close()
            except Exception as e:
                # Keep going so one broken handle cannot keep the others open.
                print(f"关闭连接时出错: {str(e)}")

        self.sqlite_cursor = None
        self.pg_cursor = None
        self.sqlite_conn = None
        self.pg_conn = None

    def clear_postgres_data(self):
        """Truncate every base table in the PostgreSQL ``public`` schema.

        Asks for confirmation first.

        Returns:
            True when the truncation pass was committed, False when the
            operator declined or the pass failed as a whole.
        """
        print("\n警告: 此操作将清空 PostgreSQL 数据库中的所有数据!")
        confirm = input("确定要清空 PostgreSQL 数据库吗? (y/n): ")
        if confirm.lower() != 'y':
            print("取消清空操作")
            return False

        try:
            # Only real tables: TRUNCATE cannot be applied to views.
            # NOTE(review): this also truncates the tables listed in
            # self.skip_tables, which migrate_table() never repopulates --
            # confirm that is intended.
            self.pg_cursor.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public'
                  AND table_type = 'BASE TABLE'
            """)
            tables = [row[0] for row in self.pg_cursor.fetchall()]

            # TRUNCATE ... CASCADE is used instead of session_replication_role.
            for table in tables:
                # A failed statement aborts the whole PostgreSQL transaction;
                # the savepoint confines the failure to this one table.
                self.pg_cursor.execute("SAVEPOINT truncate_table")
                try:
                    self.pg_cursor.execute(
                        sql.SQL("TRUNCATE TABLE {} CASCADE").format(sql.Identifier(table))
                    )
                except Exception as e:
                    self.pg_cursor.execute("ROLLBACK TO SAVEPOINT truncate_table")
                    print(f"清空表 {table} 时出错: {str(e)}")
                    continue
                self.pg_cursor.execute("RELEASE SAVEPOINT truncate_table")
                print(f"已清空表: {table}")

            self.pg_conn.commit()
            print("成功清空 PostgreSQL 数据库中的所有数据")
            return True
        except Exception as e:
            self.pg_conn.rollback()
            print(f"清空数据库时出错: {str(e)}")
            return False

    def analyze_dependencies(self):
        """Build the foreign-key graph and derive ``self.migration_order``.

        The resulting order lists every referenced table before the tables
        that reference it, so a single pass can satisfy all foreign keys.
        """
        # All tables present in SQLite.
        self.sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = [row[0] for row in self.sqlite_cursor.fetchall()]

        # Drop SQLite bookkeeping and the skipped system tables.
        excluded_tables = set(['sqlite_sequence'] + list(self.skip_tables))
        tables = [t for t in tables if t not in excluded_tables]
        known_tables = set(tables)

        # Start from a clean graph so repeated calls do not accumulate edges.
        self.table_dependencies.clear()
        self.reverse_dependencies.clear()

        # Collect foreign-key edges.
        for table in tables:
            quoted = self._quote_sqlite_identifier(table)
            self.sqlite_cursor.execute(f"PRAGMA foreign_key_list({quoted})")
            for fk in self.sqlite_cursor.fetchall():
                ref_table = fk[2]  # column 2 of foreign_key_list is the parent table
                if ref_table not in known_tables:
                    continue
                # A self-reference must not make a table wait for itself.
                if ref_table == table:
                    continue
                # Several FKs to the same parent still form a single edge.
                if ref_table in self.reverse_dependencies.get(table, []):
                    continue
                self.table_dependencies[ref_table].append(table)
                self.reverse_dependencies[table].append(ref_table)

        # Topological sort: depth-first over prerequisites, appending a table
        # only after everything it references has been appended.
        visited = set()
        order = []

        def visit(table):
            if table in visited:
                return
            visited.add(table)
            for prerequisite in self.reverse_dependencies.get(table, []):
                visit(prerequisite)
            order.append(table)

        for table in tables:
            visit(table)

        self.migration_order = order
        print(f"分析完成: 发现 {len(tables)} 个表, 依赖顺序已确定")

    def get_pg_table_structure(self, table_name):
        """Return ``{column: {'data_type', 'max_length'}}`` for a PostgreSQL table.

        Returns an empty dict when the lookup fails.
        """
        try:
            self.pg_cursor.execute("""
                SELECT column_name, data_type, character_maximum_length
                FROM information_schema.columns
                WHERE table_name = %s
            """, (table_name.lower(),))

            structure = {}
            for column_name, data_type, max_length in self.pg_cursor.fetchall():
                structure[column_name] = {
                    'data_type': data_type,
                    'max_length': max_length
                }
            return structure
        except Exception as e:
            print(f" 警告: 无法读取表 {table_name} 的结构: {str(e)}")
            # A failed statement leaves the transaction aborted; reset it so
            # later statements on this connection can run.
            if self.pg_conn:
                self.pg_conn.rollback()
            return {}

    def get_column_lengths(self, table_name):
        """Return ``{column: max_length}`` for length-limited PostgreSQL columns.

        Results are cached per table.  Returns an empty dict (uncached) when
        the lookup fails.
        """
        if table_name in self.column_lengths:
            return self.column_lengths[table_name]

        try:
            self.pg_cursor.execute("""
                SELECT column_name, character_maximum_length
                FROM information_schema.columns
                WHERE table_name = %s
            """, (table_name.lower(),))

            lengths = {}
            for column_name, max_length in self.pg_cursor.fetchall():
                if max_length:
                    lengths[column_name] = max_length

            self.column_lengths[table_name] = lengths
            return lengths
        except Exception as e:
            print(f" 警告: 无法读取表 {table_name} 的字段长度: {str(e)}")
            # See get_pg_table_structure: clear the aborted transaction.
            if self.pg_conn:
                self.pg_conn.rollback()
            return {}

    def transform_data(self, row, columns, pg_structure):
        """Convert one SQLite row to values suitable for the target columns.

        Args:
            row: sequence of values, positionally aligned with *columns*.
            columns: SQLite column names.
            pg_structure: mapping as returned by get_pg_table_structure().

        Returns:
            A list with booleans converted from 0/1 and over-long strings
            truncated to the target column length; other values unchanged.
        """
        transformed = []

        for i, value in enumerate(row):
            col_name = columns[i]
            col_info = pg_structure.get(col_name.lower(), {})
            pg_type = col_info.get('data_type')
            max_length = col_info.get('max_length')

            if pg_type in ('boolean', 'bool'):
                # SQLite stores booleans as integers 0/1.
                if value in (0, '0', False):
                    transformed.append(False)
                elif value in (1, '1', True):
                    transformed.append(True)
                else:
                    # Not convertible: pass through (the insert may fail).
                    transformed.append(value)
            elif (pg_type in ('character varying', 'varchar', 'char', 'text')
                    and max_length and isinstance(value, str)):
                if len(value) > max_length:
                    # Truncate strings that exceed the column limit.
                    truncated = value[:max_length]
                    print(f" 警告: 字段 '{col_name}' 值被截断 ({len(value)} -> {max_length})")
                    transformed.append(truncated)
                else:
                    transformed.append(value)
            else:
                transformed.append(value)

        return transformed

    def get_primary_keys(self, table):
        """Return the primary-key column names of a SQLite table (cached)."""
        if table in self.primary_keys:
            return self.primary_keys[table]

        quoted = self._quote_sqlite_identifier(table)
        self.sqlite_cursor.execute(f"PRAGMA table_info({quoted})")
        columns_info = self.sqlite_cursor.fetchall()

        # Row layout: (cid, name, type, notnull, dflt_value, pk);
        # a non-zero pk marks a primary-key column.
        primary_keys = [col_info[1] for col_info in columns_info if col_info[5] != 0]

        self.primary_keys[table] = primary_keys
        return primary_keys

    def check_dependencies_migrated(self, table):
        """Return True when every table referenced by *table* has been migrated."""
        dependencies = self.reverse_dependencies.get(table, [])
        if not dependencies:
            return True

        print(f" 检查 {table} 的依赖表迁移状态...")
        all_migrated = True

        for dep_table in dependencies:
            if dep_table in self.migrated_tables:
                print(f" ✓ {dep_table} 已迁移")
            elif dep_table in self.failed_tables:
                print(f" ✗ {dep_table} 迁移失败")
                all_migrated = False
            else:
                print(f" ? {dep_table} 尚未迁移")
                all_migrated = False

        return all_migrated

    def migrate_table(self, table):
        """Copy all rows of one table from SQLite to PostgreSQL.

        Returns:
            True when the table needs no further attempt (migrated, empty,
            a skipped system table, or skipped by the operator); False when
            it should be retried (prerequisites missing or rows failed).
        """
        print(f"\n准备迁移表: {table}")

        # System tables are never migrated.
        if table in self.skip_tables:
            print(f" 跳过系统表: {table}")
            return True

        # Referenced tables must be migrated first.
        if not self.check_dependencies_migrated(table):
            print(f" 依赖表未完全迁移,暂时跳过 {table}")
            return False

        quoted_table = self._quote_sqlite_identifier(table)

        # SQLite column list.
        self.sqlite_cursor.execute(f"PRAGMA table_info({quoted_table})")
        columns_info = self.sqlite_cursor.fetchall()
        columns = [col[1] for col in columns_info]

        primary_keys = self.get_primary_keys(table)

        # Source rows.
        self.sqlite_cursor.execute(f"SELECT * FROM {quoted_table}")
        rows = self.sqlite_cursor.fetchall()

        if not rows:
            print(f" 表 {table} 为空,跳过")
            self.migrated_tables.add(table)
            return True

        pg_structure = self.get_pg_table_structure(table)
        column_lengths = self.get_column_lengths(table)

        # Build the INSERT statement with properly quoted identifiers.
        placeholders = sql.SQL(', ').join([sql.Placeholder()] * len(columns))
        column_names = sql.SQL(', ').join([sql.Identifier(col) for col in columns])

        if primary_keys:
            # Existing rows with the same primary key are left untouched.
            conflict_columns = sql.SQL(', ').join(
                [sql.Identifier(pk) for pk in primary_keys]
            )
            insert_query = sql.SQL(
                "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO NOTHING"
            ).format(
                sql.Identifier(table.lower()),
                column_names,
                placeholders,
                conflict_columns
            )
        else:
            # No primary key: plain insert.
            insert_query = sql.SQL(
                "INSERT INTO {} ({}) VALUES ({})"
            ).format(
                sql.Identifier(table.lower()),
                column_names,
                placeholders
            )

        # Operator confirmation.
        print(f" 表 {table} 有 {len(rows)} 行数据需要迁移")
        if primary_keys:
            print(f" 使用主键 ({', '.join(primary_keys)}) 处理冲突")
        else:
            print(" 警告: 表没有主键,可能产生重复数据")

        # Show the length limits that may cause truncation.
        if column_lengths:
            print(" 字段长度限制:")
            for col, max_len in column_lengths.items():
                print(f" {col}: {max_len} 字符")

        confirm = input(" 按 Enter 键开始迁移,或输入 's' 跳过此表: ")
        if confirm.lower() == 's':
            print(f" 已跳过表 {table}")
            return True

        try:
            success_count = 0
            error_count = 0

            for row in rows:
                # Per-row savepoint: a failing row is undone on its own.  A
                # full rollback here would silently discard every row that
                # was inserted earlier in this table's transaction.
                self.pg_cursor.execute("SAVEPOINT migrate_row")
                try:
                    transformed_row = self.transform_data(row, columns, pg_structure)
                    self.pg_cursor.execute(insert_query, transformed_row)
                except psycopg2.IntegrityError as e:
                    error_count += 1
                    if 'foreign key constraint' in str(e):
                        print(f" 外键约束错误: {str(e)}")
                    else:
                        print(f" 完整性错误: {str(e)}")
                    self.pg_cursor.execute("ROLLBACK TO SAVEPOINT migrate_row")
                except Exception as e:
                    error_count += 1
                    print(f" 行插入失败: {str(e)}")
                    self.pg_cursor.execute("ROLLBACK TO SAVEPOINT migrate_row")
                else:
                    self.pg_cursor.execute("RELEASE SAVEPOINT migrate_row")
                    success_count += 1

            self.pg_conn.commit()

            if error_count == 0:
                print(f" 成功迁移 {success_count} 行数据")
                self.migrated_tables.add(table)
                # A successful retry supersedes an earlier failure.
                self.failed_tables.discard(table)
                return True

            print(f" 部分迁移: {success_count} 行成功, {error_count} 行失败")
            self.failed_tables.add(table)
            return False

        except Exception as e:
            self.pg_conn.rollback()
            print(f" 迁移表 {table} 时出错: {str(e)}")
            self.failed_tables.add(table)
            return False

    def migrate(self):
        """Run the whole interactive migration.

        Returns:
            False when connecting failed, the operator chose to exit, the
            clear step failed, or an unexpected error occurred; True once
            the migration rounds have run (even if some tables remain).
        """
        if not self.connect():
            return False

        try:
            # Ask whether PostgreSQL should be emptied first.
            print("\n是否在迁移前清空 PostgreSQL 数据库?")
            print("1. 清空所有数据 (确保无重复)")
            print("2. 不清空 (可能产生重复数据)")
            print("3. 退出")
            choice = input("请选择 (1/2/3): ")

            if choice == '3':
                print("退出迁移")
                return False

            if choice == '1':
                if not self.clear_postgres_data():
                    print("清空操作失败,退出迁移")
                    return False

            self.analyze_dependencies()

            print(f"\n开始迁移 {len(self.migration_order)} 个表")

            # Migrate in dependency order; tables that could not be handled
            # are retried in later rounds.
            remaining_tables = self.migration_order.copy()
            max_attempts = 10  # upper bound on retry rounds
            attempt = 0

            while remaining_tables and attempt < max_attempts:
                attempt += 1
                print(f"\n第 {attempt} 轮迁移尝试")

                # Work on a snapshot; collect what is still outstanding.
                tables_to_migrate = remaining_tables.copy()
                remaining_tables = []

                for table in tables_to_migrate:
                    success = self.migrate_table(table)
                    if not success:
                        remaining_tables.append(table)

                if remaining_tables:
                    print(f"\n本轮迁移后仍有 {len(remaining_tables)} 个表未成功迁移")

            # Final report.
            if remaining_tables:
                print("\n迁移完成,但以下表迁移失败:")
                for table in remaining_tables:
                    print(f" - {table}")
            else:
                print("\n所有表迁移成功!")

            return True
        except Exception as e:
            print(f"迁移过程中发生错误: {str(e)}")
            return False
        finally:
            self.close()


if __name__ == "__main__":
    migrator = SQLiteToPostgresMigrator()
    migrator.migrate()