| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- import sqlite3
- import psycopg2
- import os
- import sys
- import datetime
- from dotenv import load_dotenv
- from psycopg2 import sql
- from collections import defaultdict
# Populate os.environ from a .env file (when one is found) before the
# migrator reads its SQLITE_* / PG_* settings; variables that are already
# set in the environment are not overwritten.
load_dotenv(override=False)
class SQLiteToPostgresMigrator:
    """Interactively copy table data from a SQLite database into PostgreSQL.

    The PostgreSQL schema must already exist. Column types are read from
    ``information_schema`` and rows are inserted into the existing tables;
    no tables are created here. Tables are processed parents-first according
    to the SQLite foreign keys. Tables whose parents are not migrated yet are
    retried in later rounds.

    Settings are read from the environment: ``SQLITE_DB_PATH``,
    ``PG_DB_NAME``, ``PG_DB_USER``, ``PG_DB_PASSWORD``, ``PG_DB_HOST`` and
    ``PG_DB_PORT``.
    """

    # ``information_schema.columns.data_type`` spellings handled by
    # transform_data(); the short aliases are accepted for robustness.
    _BOOL_TYPES = ('boolean', 'bool')
    _STRING_TYPES = ('character varying', 'varchar', 'character', 'char', 'text')
    # Integer types that handle_integer_overflow() can widen to BIGINT,
    # mapped to their inclusive value ranges.
    _WIDENABLE_INT_RANGES = {
        'smallint': (-2 ** 15, 2 ** 15 - 1),
        'int2': (-2 ** 15, 2 ** 15 - 1),
        'integer': (-2 ** 31, 2 ** 31 - 1),
        'int': (-2 ** 31, 2 ** 31 - 1),
        'int4': (-2 ** 31, 2 ** 31 - 1),
    }
    _BIGINT_TYPES = ('bigint', 'int8')
    _BIGINT_RANGE = (-2 ** 63, 2 ** 63 - 1)
    # SQLSTATE numeric_value_out_of_range (e.g. "integer out of range").
    _OUT_OF_RANGE_SQLSTATE = '22003'

    def __init__(self):
        # SQLite source database
        self.sqlite_db_path = os.getenv('SQLITE_DB_PATH', 'db.sqlite3')

        # PostgreSQL target database.
        # NOTE(review): the password fallback is a hard-coded credential;
        # prefer setting PG_DB_PASSWORD explicitly.
        self.pg_config = {
            'dbname': os.getenv('PG_DB_NAME', 'wmsdb'),
            'user': os.getenv('PG_DB_USER', 'wmsuser'),
            'password': os.getenv('PG_DB_PASSWORD', 'abc@1234'),
            'host': os.getenv('PG_DB_HOST', 'localhost'),
            'port': os.getenv('PG_DB_PORT', '5432')
        }

        # Connections: opened in connect(), released in close()
        self.sqlite_conn = None
        self.sqlite_cursor = None
        self.pg_conn = None
        self.pg_cursor = None

        # Foreign-key graph built by analyze_dependencies():
        #   table_dependencies[parent]  -> tables that reference ``parent``
        #   reverse_dependencies[child] -> tables that ``child`` references
        self.table_dependencies = defaultdict(list)
        self.reverse_dependencies = defaultdict(list)
        self.migration_order = []

        # System tables that are neither migrated nor truncated
        self.skip_tables = [
            'auth_permission',
            'django_content_type',
            'django_migrations',
            'auth_group_permissions',
            'auth_user_groups',
            'auth_user_user_permissions'
        ]

        # Migration state
        self.migrated_tables = set()
        self.failed_tables = set()
        # Tables the user skipped interactively. They count as satisfied
        # dependencies, so their dependents are not blocked forever.
        self.skipped_tables = set()

        # Per-table cache: SQLite primary-key columns
        self.primary_keys = {}

        # Per-table cache: PostgreSQL column -> character_maximum_length
        self.column_lengths = {}

        # Log file (opened in connect())
        self.log_file = None
        self.log_file_path = f"migration_log_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

    def log(self, message):
        """Print *message* and append it to the log file while that file is open."""
        print(message)
        if self.log_file is not None and not self.log_file.closed:
            self.log_file.write(message + "\n")
            self.log_file.flush()

    @staticmethod
    def _quote_sqlite(name):
        """Return *name* as a double-quoted SQLite identifier."""
        return '"' + name.replace('"', '""') + '"'

    def connect(self):
        """Open the log file and both database connections.

        Returns:
            True on success, or False if any step failed (the error is logged).
            Whatever was opened before the failure stays assigned on ``self``
            so that close() can release it.
        """
        try:
            self.log_file = open(self.log_file_path, "w", encoding="utf-8")
            self.log(f"迁移日志文件: {self.log_file_path}")

            # sqlite3.connect() silently creates an empty database for a
            # missing path, which would "migrate" zero tables; fail instead.
            if not os.path.isfile(self.sqlite_db_path):
                raise FileNotFoundError(f"SQLite 数据库文件不存在: {self.sqlite_db_path}")

            self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
            self.sqlite_cursor = self.sqlite_conn.cursor()

            self.pg_conn = psycopg2.connect(**self.pg_config)
            self.pg_cursor = self.pg_conn.cursor()
            self.log(f"连接成功: SQLite({self.sqlite_db_path}) → PostgreSQL({self.pg_config['dbname']})")
            return True
        except Exception as e:
            self.log(f"连接失败: {str(e)}")
            return False

    def close(self):
        """Release cursors, connections and the log file; safe to call more than once."""
        for cursor in (self.sqlite_cursor, self.pg_cursor):
            if cursor is not None:
                cursor.close()
        for conn in (self.sqlite_conn, self.pg_conn):
            if conn is not None:
                conn.close()
        self.sqlite_cursor = self.pg_cursor = None
        self.sqlite_conn = self.pg_conn = None

        if self.log_file is not None:
            # Log before closing: writing to a closed file raises ValueError.
            self.log(f"日志文件已保存: {self.log_file_path}")
            self.log_file.close()
            self.log_file = None

    def clear_postgres_data(self):
        """Truncate the base tables of the PostgreSQL ``public`` schema after confirmation.

        Tables named in ``skip_tables`` are kept. This migrator never
        re-populates them, so emptying them would lose their rows for good.
        ``TRUNCATE ... CASCADE`` still empties any table that references a
        truncated one. Each TRUNCATE runs in its own savepoint, so one failing
        table does not abort the statements that follow it.

        Returns:
            True if the truncation was committed; False if the user cancelled
            or an error rolled the transaction back.
        """
        self.log("\n警告: 此操作将清空 PostgreSQL 数据库中的所有数据!")
        confirm = input("确定要清空 PostgreSQL 数据库吗? (y/n): ")

        if confirm.strip().lower() != 'y':
            self.log("取消清空操作")
            return False

        try:
            # Views are listed in information_schema.tables too, but cannot be truncated.
            self.pg_cursor.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
            """)
            tables = [row[0] for row in self.pg_cursor.fetchall()]

            for table in tables:
                if table in self.skip_tables:
                    self.log(f"保留系统表: {table}")
                    continue
                self.pg_cursor.execute("SAVEPOINT truncate_table")
                try:
                    self.pg_cursor.execute(
                        sql.SQL("TRUNCATE TABLE public.{} CASCADE").format(sql.Identifier(table))
                    )
                except psycopg2.Error as e:
                    # Undo only this statement so the transaction stays usable.
                    self.pg_cursor.execute("ROLLBACK TO SAVEPOINT truncate_table")
                    self.log(f"清空表 {table} 时出错: {str(e)}")
                else:
                    self.log(f"已清空表: {table}")
                self.pg_cursor.execute("RELEASE SAVEPOINT truncate_table")

            self.pg_conn.commit()
            self.log("成功清空 PostgreSQL 数据库中的所有数据")
            return True
        except Exception as e:
            self.pg_conn.rollback()
            self.log(f"清空数据库时出错: {str(e)}")
            return False

    def analyze_dependencies(self):
        """Build the SQLite foreign-key graph and a parents-first ``migration_order``.

        ``sqlite_sequence`` and ``skip_tables`` are excluded. Self-references
        and references to excluded tables are ignored. In a reference cycle,
        the members keep an arbitrary relative order.
        """
        self.sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        excluded = {'sqlite_sequence', *self.skip_tables}
        tables = [row[0] for row in self.sqlite_cursor.fetchall() if row[0] not in excluded]
        known_tables = set(tables)

        # Rebuild from scratch so repeated calls do not duplicate edges.
        self.table_dependencies = defaultdict(list)
        self.reverse_dependencies = defaultdict(list)

        for table in tables:
            self.sqlite_cursor.execute(f"PRAGMA foreign_key_list({self._quote_sqlite(table)})")
            for fk in self.sqlite_cursor.fetchall():
                ref_table = fk[2]  # row layout: (id, seq, table, from, to, ...)
                # Another table can never satisfy a self-reference, so drop it.
                if ref_table == table or ref_table not in known_tables:
                    continue
                # Composite FKs yield one PRAGMA row per column; record each edge once.
                if ref_table not in self.reverse_dependencies[table]:
                    self.table_dependencies[ref_table].append(table)
                    self.reverse_dependencies[table].append(ref_table)

        # Depth-first post-order over "references" edges: every referenced
        # table is appended before any table that references it.
        visited = set()
        order = []

        def visit(table):
            if table in visited:
                return
            visited.add(table)
            for parent in self.reverse_dependencies.get(table, []):
                visit(parent)
            order.append(table)

        for table in tables:
            visit(table)

        self.migration_order = order
        self.log(f"分析完成: 发现 {len(tables)} 个表, 依赖顺序已确定")

    def get_pg_table_structure(self, table_name):
        """Describe the columns of a PostgreSQL table.

        The table name is lower-cased before the lookup. No schema filter is
        applied.

        Returns:
            ``{column_name: {'data_type': str, 'max_length': int | None}}``,
            or an empty dict if no columns are visible or the query fails.
            A failure is logged.
        """
        try:
            self.pg_cursor.execute("""
                SELECT column_name, data_type, character_maximum_length
                FROM information_schema.columns
                WHERE table_name = %s
            """, (table_name.lower(),))
            return {
                column_name: {'data_type': data_type, 'max_length': max_length}
                for column_name, data_type, max_length in self.pg_cursor.fetchall()
            }
        except Exception as e:
            self.log(f" 读取 PostgreSQL 表结构失败: {str(e)}")
            return {}

    def get_column_lengths(self, table_name):
        """Return ``{column_name: max_length}`` for the table's length-limited columns.

        Results of successful lookups are cached in ``self.column_lengths``.
        """
        if table_name in self.column_lengths:
            return self.column_lengths[table_name]

        structure = self.get_pg_table_structure(table_name)
        lengths = {name: info['max_length'] for name, info in structure.items() if info['max_length']}
        if structure:
            # Only cache successful lookups, so a failed query can be retried later.
            self.column_lengths[table_name] = lengths
        return lengths

    def transform_data(self, row, columns, pg_structure):
        """Convert one SQLite row into values suitable for the target PostgreSQL columns.

        Args:
            row: Values in the same order as ``columns``.
            columns: SQLite column names.
            pg_structure: Mapping as returned by get_pg_table_structure().

        Returns:
            A new list of values. Conversions:
            - 0/1 become booleans for boolean columns.
            - Over-long strings are truncated to the column length.
            - Values outside the bigint range become None.
            Everything else passes through unchanged.
        """
        transformed = []
        for col_name, value in zip(columns, row):
            col_info = pg_structure.get(col_name.lower(), {})
            pg_type = col_info.get('data_type')
            max_length = col_info.get('max_length')

            if pg_type in self._BOOL_TYPES:
                # SQLite stores booleans as 0/1.
                if value in (0, '0', False):
                    transformed.append(False)
                elif value in (1, '1', True):
                    transformed.append(True)
                else:
                    # NULL or an unexpected value: pass it through and let PostgreSQL decide.
                    transformed.append(value)
            elif pg_type in self._STRING_TYPES and max_length and isinstance(value, str):
                if len(value) > max_length:
                    self.log(f" 警告: 字段 '{col_name}' 值被截断 ({len(value)} -> {max_length})")
                    transformed.append(value[:max_length])
                else:
                    transformed.append(value)
            elif pg_type in self._WIDENABLE_INT_RANGES:
                low, high = self._WIDENABLE_INT_RANGES[pg_type]
                if isinstance(value, int) and not low <= value <= high:
                    # Passed through unchanged. The INSERT then fails with
                    # "out of range", and the caller widens the column to BIGINT.
                    self.log(f" 警告: 字段 '{col_name}' 值超出整数范围 ({value}),尝试转换为bigint")
                transformed.append(value)
            elif pg_type in self._BIGINT_TYPES:
                low, high = self._BIGINT_RANGE
                if isinstance(value, int) and not low <= value <= high:
                    self.log(f" 错误: 字段 '{col_name}' 值超出bigint范围 ({value})")
                    transformed.append(None)
                else:
                    transformed.append(value)
            else:
                transformed.append(value)
        return transformed

    def get_primary_keys(self, table):
        """Return the SQLite primary-key columns of *table* in key order (cached)."""
        if table in self.primary_keys:
            return self.primary_keys[table]

        self.sqlite_cursor.execute(f"PRAGMA table_info({self._quote_sqlite(table)})")
        # Row layout: (cid, name, type, notnull, dflt_value, pk). pk is the
        # 1-based position within the primary key, or 0 for other columns.
        key_columns = sorted(
            (info[5], info[1]) for info in self.sqlite_cursor.fetchall() if info[5]
        )
        primary_keys = [name for _, name in key_columns]

        self.primary_keys[table] = primary_keys
        return primary_keys

    def check_dependencies_migrated(self, table):
        """Return True when every table that *table* references is ready.

        A dependency is ready once it has been migrated or the user skipped
        it. Self-references are ignored. The state of each dependency is
        logged.
        """
        dependencies = [dep for dep in self.reverse_dependencies.get(table, []) if dep != table]
        if not dependencies:
            return True

        self.log(f" 检查 {table} 的依赖表迁移状态...")
        all_ready = True
        for dep_table in dependencies:
            if dep_table in self.migrated_tables:
                self.log(f" ✓ {dep_table} 已迁移")
            elif dep_table in self.skipped_tables:
                self.log(f" - {dep_table} 已被跳过")
            elif dep_table in self.failed_tables:
                self.log(f" ✗ {dep_table} 迁移失败")
                all_ready = False
            else:
                self.log(f" ? {dep_table} 尚未迁移")
                all_ready = False
        return all_ready

    def handle_integer_overflow(self, table, columns, pg_structure, row):
        """Widen integer columns of *table* whose value in *row* does not fit.

        Every smallint/integer column whose value in ``row`` is out of range
        is altered to BIGINT. Each ALTER runs in its own savepoint, so a
        failure leaves the surrounding transaction usable.

        Nothing is committed here: the change becomes permanent with the
        caller's next commit. The current transaction must not be in an
        aborted state when this is called.

        Returns:
            True if at least one column was widened, False otherwise.
        """
        widened = False
        for col_name, value in zip(columns, row):
            col_info = pg_structure.get(col_name.lower(), {})
            bounds = self._WIDENABLE_INT_RANGES.get(col_info.get('data_type'))
            if bounds is None or not isinstance(value, int):
                continue
            low, high = bounds
            if low <= value <= high:
                continue

            self.log(f" 检测到字段 '{col_name}' 值超出范围 ({value})")
            self.log(f" 尝试将字段 '{col_name}' 类型改为 bigint")
            alter_query = sql.SQL("ALTER TABLE {} ALTER COLUMN {} TYPE BIGINT").format(
                sql.Identifier(table.lower()),
                sql.Identifier(col_name)
            )
            self.pg_cursor.execute("SAVEPOINT widen_column")
            try:
                self.pg_cursor.execute(alter_query)
            except psycopg2.Error as e:
                self.pg_cursor.execute("ROLLBACK TO SAVEPOINT widen_column")
                self.log(f" 修改字段类型失败: {str(e)}")
            else:
                widened = True
                self.log(f" 成功将字段 '{col_name}' 类型改为 bigint")
            self.pg_cursor.execute("RELEASE SAVEPOINT widen_column")

        if widened:
            # Column metadata for this table is now stale.
            self.column_lengths.pop(table, None)
        return widened

    @staticmethod
    def _build_insert_query(table, columns, primary_keys):
        """Compose the INSERT for *table*; with a primary key, conflicting rows are ignored."""
        query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
            sql.Identifier(table.lower()),
            sql.SQL(', ').join([sql.Identifier(col) for col in columns]),
            sql.SQL(', ').join(sql.Placeholder() * len(columns))
        )
        if primary_keys:
            query = query + sql.SQL(" ON CONFLICT ({}) DO NOTHING").format(
                sql.SQL(', ').join([sql.Identifier(pk) for pk in primary_keys])
            )
        return query

    @classmethod
    def _is_out_of_range(cls, error):
        """Return True for PostgreSQL "value out of range" errors (SQLSTATE 22003)."""
        return isinstance(error, psycopg2.DataError) and (
            getattr(error, 'pgcode', None) == cls._OUT_OF_RANGE_SQLSTATE
            or 'out of range' in str(error)
        )

    def _log_row_error(self, error):
        """Log why a single row could not be inserted."""
        message = str(error)
        if isinstance(error, psycopg2.IntegrityError):
            if 'foreign key constraint' in message:
                self.log(f" 外键约束错误: {message}")
            else:
                self.log(f" 完整性错误: {message}")
        elif self._is_out_of_range(error):
            self.log(f" 整数超出范围错误: {message}")
        else:
            self.log(f" 行插入失败: {message}")

    def _insert_rows(self, table, columns, rows, insert_query, pg_structure):
        """Insert *rows* one at a time, each inside its own savepoint.

        A failing row is rolled back to its savepoint only, so rows inserted
        earlier in the same transaction survive. A plain rollback() would
        discard them. A row rejected because an integer column is too narrow
        triggers handle_integer_overflow() and is retried once.

        Returns:
            ``(inserted, conflicts, errors)`` row counts. The caller commits.
        """
        inserted = conflicts = errors = 0
        for row in rows:
            retried = False
            while True:
                self.pg_cursor.execute("SAVEPOINT migrate_row")
                try:
                    values = self.transform_data(row, columns, pg_structure)
                    self.pg_cursor.execute(insert_query, values)
                    affected = self.pg_cursor.rowcount
                except Exception as e:
                    self.pg_cursor.execute("ROLLBACK TO SAVEPOINT migrate_row")
                    self.pg_cursor.execute("RELEASE SAVEPOINT migrate_row")
                    self._log_row_error(e)
                    if (not retried and self._is_out_of_range(e)
                            and self.handle_integer_overflow(table, columns, pg_structure, row)):
                        retried = True
                        pg_structure = self.get_pg_table_structure(table)
                        continue
                    errors += 1
                    break
                self.pg_cursor.execute("RELEASE SAVEPOINT migrate_row")
                # ON CONFLICT DO NOTHING reports 0 affected rows for an existing key.
                if affected == 0:
                    conflicts += 1
                else:
                    inserted += 1
                break
        return inserted, conflicts, errors

    def _reset_sequence(self, table, primary_keys):
        """Move the sequence behind a single-column primary key past the migrated ids.

        Inserting explicit key values does not advance a serial/identity
        sequence. Without this step, the next insert that relies on the
        sequence could reuse an existing id. Tables without such a sequence
        are left alone. Failures are logged and rolled back.
        """
        if len(primary_keys) != 1:
            return
        pk = primary_keys[0]
        try:
            # The table argument is parsed as an identifier: quote it. The
            # column argument is taken literally.
            quoted_table = '"' + table.lower().replace('"', '""') + '"'
            self.pg_cursor.execute("SELECT pg_get_serial_sequence(%s, %s)", (quoted_table, pk))
            sequence = self.pg_cursor.fetchone()[0]
            if sequence is None:
                return
            self.pg_cursor.execute(
                sql.SQL("SELECT setval(%s, COALESCE(MAX({col}), 1), MAX({col}) IS NOT NULL) FROM {tbl}").format(
                    col=sql.Identifier(pk),
                    tbl=sql.Identifier(table.lower())
                ),
                (sequence,)
            )
            self.pg_conn.commit()
            self.log(f" 已重置序列 {sequence}")
        except psycopg2.Error as e:
            self.pg_conn.rollback()
            self.log(f" 重置序列失败: {str(e)}")

    def migrate_table(self, table):
        """Copy every row of *table* from SQLite into the same-named PostgreSQL table.

        Rows are inserted one at a time, each in its own savepoint, and the
        table is committed at the end. When the table has a primary key, rows
        whose key already exists are ignored (``ON CONFLICT DO NOTHING``).

        Returns:
            True when the table needs no further attempts: it was migrated,
            is empty, is a system table, or was skipped by the user.
            False when any of these holds:
            - its dependencies are not migrated yet;
            - the PostgreSQL table could not be inspected;
            - at least one row failed;
            - the commit failed.
        """
        self.log(f"\n准备迁移表: {table}")

        if table in self.skip_tables:
            self.log(f" 跳过系统表: {table}")
            return True

        if not self.check_dependencies_migrated(table):
            self.log(f" 依赖表未完全迁移,暂时跳过 {table}")
            return False

        quoted_table = self._quote_sqlite(table)
        self.sqlite_cursor.execute(f"PRAGMA table_info({quoted_table})")
        columns = [col_info[1] for col_info in self.sqlite_cursor.fetchall()]
        primary_keys = self.get_primary_keys(table)

        # Name the columns explicitly so the value order always matches ``columns``.
        select_list = ', '.join(self._quote_sqlite(col) for col in columns)
        self.sqlite_cursor.execute(f"SELECT {select_list} FROM {quoted_table}")
        rows = self.sqlite_cursor.fetchall()

        if not rows:
            self.log(f" 表 {table} 为空,跳过")
            self.migrated_tables.add(table)
            self.failed_tables.discard(table)
            return True

        pg_structure = self.get_pg_table_structure(table)
        if not pg_structure:
            self.log(f" PostgreSQL 中未找到表 {table.lower()} 的字段信息,无法迁移")
            self.failed_tables.add(table)
            return False
        column_lengths = self.get_column_lengths(table)
        insert_query = self._build_insert_query(table, columns, primary_keys)

        self.log(f" 表 {table} 有 {len(rows)} 行数据需要迁移")
        if primary_keys:
            self.log(f" 使用主键 ({', '.join(primary_keys)}) 处理冲突")
        else:
            self.log(" 警告: 表没有主键,可能产生重复数据")

        if column_lengths:
            self.log(" 字段长度限制:")
            for col, max_len in column_lengths.items():
                self.log(f" {col}: {max_len} 字符")

        confirm = input(" 按 Enter 键开始迁移,或输入 's' 跳过此表: ")
        if confirm.strip().lower() == 's':
            self.log(f" 已跳过表 {table}")
            self.skipped_tables.add(table)
            return True

        try:
            inserted, conflicts, error_count = self._insert_rows(
                table, columns, rows, insert_query, pg_structure
            )
            # Deferred constraints are checked here, so the commit itself can fail.
            self.pg_conn.commit()
        except Exception as e:
            self.pg_conn.rollback()
            self.log(f" 迁移表 {table} 时出错: {str(e)}")
            self.failed_tables.add(table)
            return False

        self._reset_sequence(table, primary_keys)

        success_count = inserted + conflicts
        if error_count == 0:
            self.log(f" 成功迁移 {success_count} 行数据")
        else:
            self.log(f" 部分迁移: {success_count} 行成功, {error_count} 行失败")
        if conflicts:
            self.log(f" 其中 {conflicts} 行因主键已存在而被忽略")

        if error_count == 0:
            self.migrated_tables.add(table)
            self.failed_tables.discard(table)
            return True
        self.failed_tables.add(table)
        return False

    def migrate(self):
        """Run the interactive migration end to end.

        Returns:
            False in any of these cases:
            - connecting failed;
            - the user chose to exit;
            - clearing the target failed or was cancelled;
            - an unexpected error occurred.
            Otherwise True, once the migration rounds have finished. Tables
            that still failed are listed in the log. Connections and the log
            file are always closed.
        """
        if not self.connect():
            # connect() may have opened the log file and/or SQLite before failing.
            self.close()
            return False

        try:
            self.log("\n是否在迁移前清空 PostgreSQL 数据库?")
            self.log("1. 清空所有数据 (确保无重复)")
            self.log("2. 不清空 (可能产生重复数据)")
            self.log("3. 退出")

            choice = input("请选择 (1/2/3): ").strip()
            while choice not in ('1', '2', '3'):
                self.log("无效的选择,请输入 1、2 或 3")
                choice = input("请选择 (1/2/3): ").strip()

            if choice == '3':
                self.log("退出迁移")
                return False

            if choice == '1' and not self.clear_postgres_data():
                self.log("清空操作失败,退出迁移")
                return False

            self.analyze_dependencies()
            self.log(f"\n开始迁移 {len(self.migration_order)} 个表")

            remaining_tables = list(self.migration_order)
            max_attempts = 10
            attempt = 0

            while remaining_tables and attempt < max_attempts:
                attempt += 1
                self.log(f"\n第 {attempt} 轮迁移尝试")

                still_pending = []
                for table in remaining_tables:
                    if not self.migrate_table(table):
                        still_pending.append(table)
                made_progress = len(still_pending) < len(remaining_tables)
                remaining_tables = still_pending

                if remaining_tables:
                    self.log(f"\n本轮迁移后仍有 {len(remaining_tables)} 个表未成功迁移")
                    if not made_progress:
                        # Nothing changed this round, so another round would fail the same way.
                        self.log("本轮没有任何表迁移成功,停止重试")
                        break

            if remaining_tables:
                self.log("\n迁移完成,但以下表迁移失败:")
                for table in remaining_tables:
                    self.log(f" - {table}")
            else:
                self.log("\n所有表迁移成功!")

            return True
        except Exception as e:
            self.log(f"迁移过程中发生错误: {str(e)}")
            return False
        finally:
            self.close()
if __name__ == "__main__":
    migrator = SQLiteToPostgresMigrator()
    # Report the outcome as the process exit status (0 = success, 1 = failed
    # or aborted) so that calling scripts can detect a failed migration.
    sys.exit(0 if migrator.migrate() else 1)
|