|
|
@@ -0,0 +1,455 @@
|
|
|
+import sqlite3
|
|
|
+import psycopg2
|
|
|
+import os
|
|
|
+import sys
|
|
|
+from dotenv import load_dotenv
|
|
|
+from psycopg2 import sql
|
|
|
+from collections import defaultdict
|
|
|
+
|
|
|
+# 加载环境变量
|
|
|
+load_dotenv()
|
|
|
+
|
|
|
+class SQLiteToPostgresMigrator:
|
|
|
+ def __init__(self):
|
|
|
+ # SQLite 配置
|
|
|
+ self.sqlite_db_path = os.getenv('SQLITE_DB_PATH', 'db.sqlite3')
|
|
|
+
|
|
|
+ # PostgreSQL 配置
|
|
|
+ self.pg_config = {
|
|
|
+ 'dbname': os.getenv('PG_DB_NAME', 'wmsdb'),
|
|
|
+ 'user': os.getenv('PG_DB_USER', 'wmsuser'),
|
|
|
+ 'password': os.getenv('PG_DB_PASSWORD', 'abc@1234'),
|
|
|
+ 'host': os.getenv('PG_DB_HOST', 'localhost'),
|
|
|
+ 'port': os.getenv('PG_DB_PORT', '5432')
|
|
|
+ }
|
|
|
+
|
|
|
+ # 连接
|
|
|
+ self.sqlite_conn = None
|
|
|
+ self.sqlite_cursor = None
|
|
|
+ self.pg_conn = None
|
|
|
+ self.pg_cursor = None
|
|
|
+
|
|
|
+ # 表依赖关系
|
|
|
+ self.table_dependencies = defaultdict(list)
|
|
|
+ self.reverse_dependencies = defaultdict(list)
|
|
|
+ self.migration_order = []
|
|
|
+
|
|
|
+ # 需要跳过的系统表
|
|
|
+ self.skip_tables = [
|
|
|
+ 'auth_permission',
|
|
|
+ 'django_content_type',
|
|
|
+ 'django_migrations',
|
|
|
+ 'auth_group_permissions',
|
|
|
+ 'auth_user_groups',
|
|
|
+ 'auth_user_user_permissions'
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 迁移状态
|
|
|
+ self.migrated_tables = set()
|
|
|
+ self.failed_tables = set()
|
|
|
+
|
|
|
+ # 存储表的主键信息
|
|
|
+ self.primary_keys = {}
|
|
|
+
|
|
|
+ # 存储字段长度限制
|
|
|
+ self.column_lengths = {}
|
|
|
+
|
|
|
+ def connect(self):
|
|
|
+ """连接到数据库"""
|
|
|
+ try:
|
|
|
+ # SQLite 连接
|
|
|
+ self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
|
|
|
+ self.sqlite_cursor = self.sqlite_conn.cursor()
|
|
|
+
|
|
|
+ # PostgreSQL 连接
|
|
|
+ self.pg_conn = psycopg2.connect(**self.pg_config)
|
|
|
+ self.pg_cursor = self.pg_conn.cursor()
|
|
|
+ print(f"连接成功: SQLite({self.sqlite_db_path}) → PostgreSQL({self.pg_config['dbname']})")
|
|
|
+ return True
|
|
|
+ except Exception as e:
|
|
|
+ print(f"连接失败: {str(e)}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def close(self):
|
|
|
+ """关闭数据库连接"""
|
|
|
+ for cursor in [self.sqlite_cursor, self.pg_cursor]:
|
|
|
+ if cursor: cursor.close()
|
|
|
+ for conn in [self.sqlite_conn, self.pg_conn]:
|
|
|
+ if conn: conn.close()
|
|
|
+
|
|
|
+ def clear_postgres_data(self):
|
|
|
+ """清空 PostgreSQL 数据库中的所有数据"""
|
|
|
+ print("\n警告: 此操作将清空 PostgreSQL 数据库中的所有数据!")
|
|
|
+ confirm = input("确定要清空 PostgreSQL 数据库吗? (y/n): ")
|
|
|
+
|
|
|
+ if confirm.lower() != 'y':
|
|
|
+ print("取消清空操作")
|
|
|
+ return False
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 获取所有表名
|
|
|
+ self.pg_cursor.execute("""
|
|
|
+ SELECT table_name
|
|
|
+ FROM information_schema.tables
|
|
|
+ WHERE table_schema = 'public'
|
|
|
+ """)
|
|
|
+ tables = [row[0] for row in self.pg_cursor.fetchall()]
|
|
|
+
|
|
|
+ # 使用 TRUNCATE TABLE ... CASCADE 替代 session_replication_role
|
|
|
+ for table in tables:
|
|
|
+ try:
|
|
|
+ self.pg_cursor.execute(f"TRUNCATE TABLE {table} CASCADE;")
|
|
|
+ print(f"已清空表: {table}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"清空表 {table} 时出错: {str(e)}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ self.pg_conn.commit()
|
|
|
+ print("成功清空 PostgreSQL 数据库中的所有数据")
|
|
|
+ return True
|
|
|
+ except Exception as e:
|
|
|
+ self.pg_conn.rollback()
|
|
|
+ print(f"清空数据库时出错: {str(e)}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def analyze_dependencies(self):
|
|
|
+ """分析表依赖关系"""
|
|
|
+ # 获取 SQLite 中的所有表
|
|
|
+ self.sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
|
|
|
+ tables = [row[0] for row in self.sqlite_cursor.fetchall()]
|
|
|
+
|
|
|
+ # 排除系统表
|
|
|
+ excluded_tables = ['sqlite_sequence'] + self.skip_tables
|
|
|
+ tables = [t for t in tables if t not in excluded_tables]
|
|
|
+
|
|
|
+ # 分析外键依赖
|
|
|
+ for table in tables:
|
|
|
+ self.sqlite_cursor.execute(f"PRAGMA foreign_key_list({table})")
|
|
|
+ for fk in self.sqlite_cursor.fetchall():
|
|
|
+ ref_table = fk[2]
|
|
|
+ if ref_table in tables:
|
|
|
+ self.table_dependencies[ref_table].append(table)
|
|
|
+ self.reverse_dependencies[table].append(ref_table)
|
|
|
+
|
|
|
+ # 创建迁移顺序 (拓扑排序)
|
|
|
+ visited = set()
|
|
|
+ order = []
|
|
|
+
|
|
|
+ def visit(table):
|
|
|
+ if table in visited:
|
|
|
+ return
|
|
|
+ visited.add(table)
|
|
|
+ for dependent in self.table_dependencies.get(table, []):
|
|
|
+ visit(dependent)
|
|
|
+ order.append(table)
|
|
|
+
|
|
|
+ for table in tables:
|
|
|
+ if table not in visited:
|
|
|
+ visit(table)
|
|
|
+
|
|
|
+ self.migration_order = order
|
|
|
+ print(f"分析完成: 发现 {len(tables)} 个表, 依赖顺序已确定")
|
|
|
+
|
|
|
+ def get_pg_table_structure(self, table_name):
|
|
|
+ """获取 PostgreSQL 表结构"""
|
|
|
+ try:
|
|
|
+ self.pg_cursor.execute("""
|
|
|
+ SELECT column_name, data_type, character_maximum_length
|
|
|
+ FROM information_schema.columns
|
|
|
+ WHERE table_name = %s
|
|
|
+ """, (table_name.lower(),))
|
|
|
+
|
|
|
+ structure = {}
|
|
|
+ for row in self.pg_cursor.fetchall():
|
|
|
+ column_name, data_type, max_length = row
|
|
|
+ structure[column_name] = {
|
|
|
+ 'data_type': data_type,
|
|
|
+ 'max_length': max_length
|
|
|
+ }
|
|
|
+ return structure
|
|
|
+ except Exception:
|
|
|
+ return {}
|
|
|
+
|
|
|
+ def get_column_lengths(self, table_name):
|
|
|
+ """获取 PostgreSQL 表的字段长度限制"""
|
|
|
+ if table_name in self.column_lengths:
|
|
|
+ return self.column_lengths[table_name]
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.pg_cursor.execute("""
|
|
|
+ SELECT column_name, character_maximum_length
|
|
|
+ FROM information_schema.columns
|
|
|
+ WHERE table_name = %s
|
|
|
+ """, (table_name.lower(),))
|
|
|
+
|
|
|
+ lengths = {}
|
|
|
+ for row in self.pg_cursor.fetchall():
|
|
|
+ column_name, max_length = row
|
|
|
+ if max_length:
|
|
|
+ lengths[column_name] = max_length
|
|
|
+ self.column_lengths[table_name] = lengths
|
|
|
+ return lengths
|
|
|
+ except Exception:
|
|
|
+ return {}
|
|
|
+
|
|
|
+ def transform_data(self, row, columns, pg_structure):
|
|
|
+ """根据目标类型转换数据"""
|
|
|
+ transformed = []
|
|
|
+ for i, value in enumerate(row):
|
|
|
+ col_name = columns[i]
|
|
|
+ col_info = pg_structure.get(col_name.lower(), {})
|
|
|
+ pg_type = col_info.get('data_type')
|
|
|
+ max_length = col_info.get('max_length')
|
|
|
+
|
|
|
+ # 处理布尔字段转换
|
|
|
+ if pg_type in ('boolean', 'bool'):
|
|
|
+ # 将整数0/1转换为布尔值
|
|
|
+ if value in (0, '0', False):
|
|
|
+ transformed.append(False)
|
|
|
+ elif value in (1, '1', True):
|
|
|
+ transformed.append(True)
|
|
|
+ else:
|
|
|
+ # 无法转换的值保持原样(可能会出错)
|
|
|
+ transformed.append(value)
|
|
|
+ # 处理字符串长度限制
|
|
|
+ elif pg_type in ('character varying', 'varchar', 'char', 'text') and max_length and isinstance(value, str):
|
|
|
+ if len(value) > max_length:
|
|
|
+ # 截断超过长度的字符串
|
|
|
+ truncated = value[:max_length]
|
|
|
+ print(f" 警告: 字段 '{col_name}' 值被截断 ({len(value)} -> {max_length})")
|
|
|
+ transformed.append(truncated)
|
|
|
+ else:
|
|
|
+ transformed.append(value)
|
|
|
+ else:
|
|
|
+ transformed.append(value)
|
|
|
+ return transformed
|
|
|
+
|
|
|
+ def get_primary_keys(self, table):
|
|
|
+ """获取表的主键列"""
|
|
|
+ if table in self.primary_keys:
|
|
|
+ return self.primary_keys[table]
|
|
|
+
|
|
|
+ # 查询SQLite表的主键
|
|
|
+ self.sqlite_cursor.execute(f"PRAGMA table_info({table})")
|
|
|
+ columns_info = self.sqlite_cursor.fetchall()
|
|
|
+
|
|
|
+ # 主键列是那些pk值不为0的列
|
|
|
+ primary_keys = []
|
|
|
+ for col_info in columns_info:
|
|
|
+ # col_info格式: (cid, name, type, notnull, dflt_value, pk)
|
|
|
+ if col_info[5] != 0: # pk列不为0表示是主键
|
|
|
+ primary_keys.append(col_info[1])
|
|
|
+
|
|
|
+ self.primary_keys[table] = primary_keys
|
|
|
+ return primary_keys
|
|
|
+
|
|
|
+ def check_dependencies_migrated(self, table):
|
|
|
+ """检查依赖的表是否已经成功迁移"""
|
|
|
+ dependencies = self.reverse_dependencies.get(table, [])
|
|
|
+
|
|
|
+ if not dependencies:
|
|
|
+ return True
|
|
|
+
|
|
|
+ print(f" 检查 {table} 的依赖表迁移状态...")
|
|
|
+ all_migrated = True
|
|
|
+
|
|
|
+ for dep_table in dependencies:
|
|
|
+ if dep_table in self.migrated_tables:
|
|
|
+ print(f" ✓ {dep_table} 已迁移")
|
|
|
+ elif dep_table in self.failed_tables:
|
|
|
+ print(f" ✗ {dep_table} 迁移失败")
|
|
|
+ all_migrated = False
|
|
|
+ else:
|
|
|
+ print(f" ? {dep_table} 尚未迁移")
|
|
|
+ all_migrated = False
|
|
|
+
|
|
|
+ return all_migrated
|
|
|
+
|
|
|
+ def migrate_table(self, table):
|
|
|
+ """迁移单个表"""
|
|
|
+ print(f"\n准备迁移表: {table}")
|
|
|
+
|
|
|
+ # 检查是否需要跳过
|
|
|
+ if table in self.skip_tables:
|
|
|
+ print(f" 跳过系统表: {table}")
|
|
|
+ return True
|
|
|
+
|
|
|
+ # 检查依赖表是否已迁移
|
|
|
+ if not self.check_dependencies_migrated(table):
|
|
|
+ print(f" 依赖表未完全迁移,暂时跳过 {table}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 获取 SQLite 表结构
|
|
|
+ self.sqlite_cursor.execute(f"PRAGMA table_info({table})")
|
|
|
+ columns_info = self.sqlite_cursor.fetchall()
|
|
|
+ columns = [col[1] for col in columns_info]
|
|
|
+
|
|
|
+ # 获取主键
|
|
|
+ primary_keys = self.get_primary_keys(table)
|
|
|
+
|
|
|
+ # 获取数据
|
|
|
+ self.sqlite_cursor.execute(f"SELECT * FROM {table}")
|
|
|
+ rows = self.sqlite_cursor.fetchall()
|
|
|
+
|
|
|
+ if not rows:
|
|
|
+ print(f" 表 {table} 为空,跳过")
|
|
|
+ self.migrated_tables.add(table)
|
|
|
+ return True
|
|
|
+
|
|
|
+ # 获取 PostgreSQL 表结构
|
|
|
+ pg_structure = self.get_pg_table_structure(table)
|
|
|
+
|
|
|
+ # 获取字段长度限制
|
|
|
+ column_lengths = self.get_column_lengths(table)
|
|
|
+
|
|
|
+ # 创建 PostgreSQL 插入语句
|
|
|
+ placeholders = ', '.join(['%s'] * len(columns))
|
|
|
+ column_names = ', '.join([f'"{col}"' for col in columns])
|
|
|
+
|
|
|
+ # 构建插入语句,根据主键是否存在添加ON CONFLICT子句
|
|
|
+ if primary_keys:
|
|
|
+ # 使用主键处理冲突
|
|
|
+ conflict_columns = ', '.join([f'"{pk}"' for pk in primary_keys])
|
|
|
+ insert_query = sql.SQL("""
|
|
|
+ INSERT INTO {} ({})
|
|
|
+ VALUES ({})
|
|
|
+ ON CONFLICT ({}) DO NOTHING
|
|
|
+ """).format(
|
|
|
+ sql.Identifier(table.lower()),
|
|
|
+ sql.SQL(column_names),
|
|
|
+ sql.SQL(placeholders),
|
|
|
+ sql.SQL(conflict_columns)
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # 没有主键,使用普通插入语句
|
|
|
+ insert_query = sql.SQL("""
|
|
|
+ INSERT INTO {} ({})
|
|
|
+ VALUES ({})
|
|
|
+ """).format(
|
|
|
+ sql.Identifier(table.lower()),
|
|
|
+ sql.SQL(column_names),
|
|
|
+ sql.SQL(placeholders)
|
|
|
+ )
|
|
|
+
|
|
|
+ # 用户确认
|
|
|
+ print(f" 表 {table} 有 {len(rows)} 行数据需要迁移")
|
|
|
+ if primary_keys:
|
|
|
+ print(f" 使用主键 ({', '.join(primary_keys)}) 处理冲突")
|
|
|
+ else:
|
|
|
+ print(" 警告: 表没有主键,可能产生重复数据")
|
|
|
+
|
|
|
+ # 显示字段长度限制
|
|
|
+ if column_lengths:
|
|
|
+ print(" 字段长度限制:")
|
|
|
+ for col, max_len in column_lengths.items():
|
|
|
+ print(f" {col}: {max_len} 字符")
|
|
|
+
|
|
|
+ confirm = input(" 按 Enter 键开始迁移,或输入 's' 跳过此表: ")
|
|
|
+
|
|
|
+ if confirm.lower() == 's':
|
|
|
+ print(f" 已跳过表 {table}")
|
|
|
+ return True
|
|
|
+
|
|
|
+ # 迁移数据
|
|
|
+ try:
|
|
|
+ success_count = 0
|
|
|
+ error_count = 0
|
|
|
+ for row in rows:
|
|
|
+ try:
|
|
|
+ # 转换数据类型
|
|
|
+ transformed_row = self.transform_data(row, columns, pg_structure)
|
|
|
+ self.pg_cursor.execute(insert_query, transformed_row)
|
|
|
+ success_count += 1
|
|
|
+ except psycopg2.IntegrityError as e:
|
|
|
+ error_count += 1
|
|
|
+ if 'foreign key constraint' in str(e):
|
|
|
+ print(f" 外键约束错误: {str(e)}")
|
|
|
+ else:
|
|
|
+ print(f" 完整性错误: {str(e)}")
|
|
|
+ self.pg_conn.rollback()
|
|
|
+ except Exception as e:
|
|
|
+ error_count += 1
|
|
|
+ print(f" 行插入失败: {str(e)}")
|
|
|
+ self.pg_conn.rollback()
|
|
|
+
|
|
|
+ self.pg_conn.commit()
|
|
|
+
|
|
|
+ if error_count == 0:
|
|
|
+ print(f" 成功迁移 {success_count} 行数据")
|
|
|
+ self.migrated_tables.add(table)
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ print(f" 部分迁移: {success_count} 行成功, {error_count} 行失败")
|
|
|
+ self.failed_tables.add(table)
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ self.pg_conn.rollback()
|
|
|
+ print(f" 迁移表 {table} 时出错: {str(e)}")
|
|
|
+ self.failed_tables.add(table)
|
|
|
+ return False
|
|
|
+
|
|
|
+ def migrate(self):
|
|
|
+ """执行迁移"""
|
|
|
+ if not self.connect():
|
|
|
+ return False
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 询问是否清空 PostgreSQL 数据
|
|
|
+ print("\n是否在迁移前清空 PostgreSQL 数据库?")
|
|
|
+ print("1. 清空所有数据 (确保无重复)")
|
|
|
+ print("2. 不清空 (可能产生重复数据)")
|
|
|
+ print("3. 退出")
|
|
|
+
|
|
|
+ choice = input("请选择 (1/2/3): ")
|
|
|
+
|
|
|
+ if choice == '3':
|
|
|
+ print("退出迁移")
|
|
|
+ return False
|
|
|
+
|
|
|
+ if choice == '1':
|
|
|
+ if not self.clear_postgres_data():
|
|
|
+ print("清空操作失败,退出迁移")
|
|
|
+ return False
|
|
|
+
|
|
|
+ self.analyze_dependencies()
|
|
|
+ print(f"\n开始迁移 {len(self.migration_order)} 个表")
|
|
|
+
|
|
|
+ # 按依赖顺序迁移表
|
|
|
+ remaining_tables = self.migration_order.copy()
|
|
|
+ max_attempts = 10 # 最大尝试次数
|
|
|
+ attempt = 0
|
|
|
+
|
|
|
+ while remaining_tables and attempt < max_attempts:
|
|
|
+ attempt += 1
|
|
|
+ print(f"\n第 {attempt} 轮迁移尝试")
|
|
|
+
|
|
|
+ # 复制列表以便在迭代时修改
|
|
|
+ tables_to_migrate = remaining_tables.copy()
|
|
|
+ remaining_tables = []
|
|
|
+
|
|
|
+ for table in tables_to_migrate:
|
|
|
+ success = self.migrate_table(table)
|
|
|
+ if not success:
|
|
|
+ remaining_tables.append(table)
|
|
|
+
|
|
|
+ if remaining_tables:
|
|
|
+ print(f"\n本轮迁移后仍有 {len(remaining_tables)} 个表未成功迁移")
|
|
|
+
|
|
|
+ # 检查迁移结果
|
|
|
+ if remaining_tables:
|
|
|
+ print(f"\n迁移完成,但以下表迁移失败:")
|
|
|
+ for table in remaining_tables:
|
|
|
+ print(f" - {table}")
|
|
|
+ else:
|
|
|
+ print("\n所有表迁移成功!")
|
|
|
+
|
|
|
+ return True
|
|
|
+ except Exception as e:
|
|
|
+ print(f"迁移过程中发生错误: {str(e)}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ self.close()
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ migrator = SQLiteToPostgresMigrator()
|
|
|
+ migrator.migrate()
|