"""Interactive one-shot copy of a (Django) SQLite database into PostgreSQL.

Only row data is copied: the target PostgreSQL tables must already exist,
because this script never creates tables. After the data copy, the SQLite
``django_migrations`` records are copied over and PostgreSQL sequences are
reset to the current maximum key values.
"""
import contextlib
import datetime
import os
import sqlite3
import sys
from collections import defaultdict

import psycopg2
from dotenv import load_dotenv
from psycopg2 import sql

# Load environment variables (SQLITE_DB_PATH, PG_DB_*) from a .env file, if any.
load_dotenv()

# PostgreSQL integer ranges and the type names information_schema may report.
_INT4_MIN, _INT4_MAX = -2**31, 2**31 - 1
_INT8_MIN, _INT8_MAX = -2**63, 2**63 - 1
_INT4_TYPES = ('integer', 'int', 'int4')
_INT8_TYPES = ('bigint', 'int8')
_BOOL_TYPES = ('boolean', 'bool')
# information_schema reports char(n) as 'character'.
_TEXT_TYPES = ('character varying', 'varchar', 'character', 'char', 'text')


class SQLiteToPostgresMigrator:
    """Copy rows from a SQLite database into an existing PostgreSQL schema.

    Tables are migrated in foreign-key dependency order, one transaction per
    table, with a savepoint per row so that a bad row is skipped without
    discarding the rows already inserted for that table.
    """

    def __init__(self):
        # SQLite source database.
        self.sqlite_db_path = os.getenv('SQLITE_DB_PATH', 'db.sqlite3')

        # PostgreSQL target.
        # NOTE(review): the password fallback is a hard-coded credential;
        # prefer always setting PG_DB_PASSWORD in the environment.
        self.pg_config = {
            'dbname': os.getenv('PG_DB_NAME', 'wmsdb'),
            'user': os.getenv('PG_DB_USER', 'wmsuser'),
            'password': os.getenv('PG_DB_PASSWORD', 'abc@1234'),
            'host': os.getenv('PG_DB_HOST', 'localhost'),
            'port': os.getenv('PG_DB_PORT', '5432'),
        }

        # Live connections/cursors (None when closed).
        self.sqlite_conn = None
        self.sqlite_cursor = None
        self.pg_conn = None
        self.pg_cursor = None

        # referenced table -> tables whose foreign keys point at it
        self.table_dependencies = defaultdict(list)
        # table -> tables it references (self-references excluded)
        self.reverse_dependencies = defaultdict(list)
        self.migration_order = []

        # Django framework tables that are never copied row by row.
        self.skip_tables = [
            'auth_permission',
            'django_content_type',
            'django_migrations',
            'auth_group_permissions',
            'auth_user_groups',
            'auth_user_user_permissions',
            'django_session',
            'django_admin_log'
        ]

        # Migration status.
        self.migrated_tables = set()
        self.failed_tables = set()
        # Tables the user skipped at the prompt; treated as satisfied
        # dependencies so that their dependents are still attempted.
        self.skipped_tables = set()

        # Per-table caches: SQLite primary-key columns, PG varchar lengths.
        self.primary_keys = {}
        self.column_lengths = {}

        # Log file (timestamped, so each run gets its own file).
        self.log_file = None
        self.log_file_path = f"migration_log_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

    # ------------------------------------------------------------------ #
    # Logging and connection management
    # ------------------------------------------------------------------ #
    def log(self, message):
        """Print *message* and append it to the log file when it is open."""
        print(message)
        if self.log_file and not self.log_file.closed:
            self.log_file.write(message + "\n")
            self.log_file.flush()

    def _open_log(self):
        """(Re)open the log file in append mode unless it is already open.

        Append mode lets the later phases (marking migrations, resetting
        sequences) keep writing to the same file after close() has run.
        """
        if self.log_file is None or self.log_file.closed:
            self.log_file = open(self.log_file_path, "a", encoding="utf-8")

    def _close_connections(self):
        """Close all cursors, then all connections, and forget them."""
        for resource in (self.sqlite_cursor, self.pg_cursor,
                         self.sqlite_conn, self.pg_conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                # Best effort: a close failure must not mask the real outcome.
                self.log(f"关闭连接时出错: {str(e)}")
        self.sqlite_cursor = self.pg_cursor = None
        self.sqlite_conn = self.pg_conn = None

    @contextlib.contextmanager
    def _savepoint(self, name):
        """Run the enclosed PostgreSQL statements inside savepoint *name*.

        On an exception the savepoint is rolled back (keeping the rest of the
        transaction usable) and the exception is re-raised.
        *name* must be a fixed, trusted identifier.
        """
        self.pg_cursor.execute(f"SAVEPOINT {name}")
        try:
            yield
        except Exception:
            self.pg_cursor.execute(f"ROLLBACK TO SAVEPOINT {name}")
            self.pg_cursor.execute(f"RELEASE SAVEPOINT {name}")
            raise
        else:
            self.pg_cursor.execute(f"RELEASE SAVEPOINT {name}")

    @staticmethod
    def _quote_sqlite_identifier(name):
        """Return *name* as a double-quoted SQLite identifier."""
        return '"' + name.replace('"', '""') + '"'

    def connect(self):
        """Open the log file and both database connections.

        Returns:
            True on success; False on failure, after logging the error and
            closing any connection that had already been opened.
        """
        try:
            self._open_log()
            self.log(f"迁移日志文件: {self.log_file_path}")

            # sqlite3.connect() silently creates a missing file, which would
            # "migrate" an empty database; fail loudly instead.
            if not os.path.isfile(self.sqlite_db_path):
                raise FileNotFoundError(f"SQLite 数据库文件不存在: {self.sqlite_db_path}")
            self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
            self.sqlite_cursor = self.sqlite_conn.cursor()

            self.pg_conn = psycopg2.connect(**self.pg_config)
            self.pg_cursor = self.pg_conn.cursor()
            self.log(f"连接成功: SQLite({self.sqlite_db_path}) → PostgreSQL({self.pg_config['dbname']})")
            return True
        except Exception as e:
            self.log(f"连接失败: {str(e)}")
            self._close_connections()
            return False

    def close(self):
        """Close the database connections, then the log file."""
        # Record the closing message before the file goes away.
        if self.log_file and not self.log_file.closed:
            self.log_file.write(f"日志文件已保存: {self.log_file_path}\n")
            self.log_file.flush()

        self._close_connections()

        if self.log_file and not self.log_file.closed:
            self.log_file.close()

    # ------------------------------------------------------------------ #
    # Preparation
    # ------------------------------------------------------------------ #
    def clear_postgres_data(self):
        """Interactively TRUNCATE every base table in the ``public`` schema.

        Each TRUNCATE runs in its own savepoint, so one failing table does not
        abort the transaction for the others; all changes are committed at
        the end.

        Returns:
            True if the user confirmed and the commit succeeded; False if the
            user declined or the operation failed (changes rolled back).
        """
        self.log("\n警告: 此操作将清空 PostgreSQL 数据库中的所有数据!")
        confirm = input("确定要清空 PostgreSQL 数据库吗? (y/n): ")

        if confirm.strip().lower() != 'y':
            self.log("取消清空操作")
            return False

        try:
            # Views cannot be truncated, so only list real tables.
            self.pg_cursor.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
            """)
            tables = [row[0] for row in self.pg_cursor.fetchall()]

            # NOTE(review): this also empties tables listed in skip_tables
            # (e.g. django_content_type, auth_permission), which migrate_table
            # never refills — confirm that is intended.
            for table in tables:
                try:
                    with self._savepoint("truncate_table"):
                        self.pg_cursor.execute(
                            sql.SQL("TRUNCATE TABLE {} CASCADE").format(
                                sql.Identifier('public', table)
                            )
                        )
                except Exception as e:
                    self.log(f"清空表 {table} 时出错: {str(e)}")
                    continue
                self.log(f"已清空表: {table}")

            self.pg_conn.commit()
            self.log("成功清空 PostgreSQL 数据库中的所有数据")
            return True
        except Exception as e:
            self.pg_conn.rollback()
            self.log(f"清空数据库时出错: {str(e)}")
            return False

    def analyze_dependencies(self):
        """Build the foreign-key graph and a parents-first migration order.

        Fills ``table_dependencies``, ``reverse_dependencies`` and
        ``migration_order``. Self-references are ignored because they can
        never be satisfied by migrating another table first.
        """
        self.table_dependencies.clear()
        self.reverse_dependencies.clear()

        self.sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        excluded = {'sqlite_sequence', *self.skip_tables}
        tables = [
            row[0] for row in self.sqlite_cursor.fetchall()
            if row[0] not in excluded and not row[0].startswith('sqlite_')
        ]
        table_set = set(tables)

        for table in tables:
            self.sqlite_cursor.execute(
                f"PRAGMA foreign_key_list({self._quote_sqlite_identifier(table)})"
            )
            for fk in self.sqlite_cursor.fetchall():
                ref_table = fk[2]  # (id, seq, table, from, to, ...)
                if ref_table == table or ref_table not in table_set:
                    continue
                if table not in self.table_dependencies[ref_table]:
                    self.table_dependencies[ref_table].append(table)
                    self.reverse_dependencies[table].append(ref_table)

        # Depth-first topological sort: emit referenced tables before the
        # tables that reference them.
        visited = set()
        order = []

        def visit(table):
            if table in visited:
                return
            visited.add(table)
            for referenced in self.reverse_dependencies.get(table, []):
                visit(referenced)
            order.append(table)

        for table in tables:
            visit(table)

        self.migration_order = order
        self.log(f"分析完成: 发现 {len(tables)} 个表, 依赖顺序已确定")

    # ------------------------------------------------------------------ #
    # Schema inspection
    # ------------------------------------------------------------------ #
    def get_pg_table_structure(self, table_name):
        """Return ``{column: {'data_type': ..., 'max_length': ...}}`` for a PG table.

        Looks in ``current_schema()``, which is where the unqualified INSERT
        statements resolve. Returns {} if the lookup fails.
        """
        try:
            self.pg_cursor.execute("""
                SELECT column_name, data_type, character_maximum_length
                FROM information_schema.columns
                WHERE table_name = %s AND table_schema = current_schema()
            """, (table_name.lower(),))
            return {
                column_name: {'data_type': data_type, 'max_length': max_length}
                for column_name, data_type, max_length in self.pg_cursor.fetchall()
            }
        except Exception as e:
            self.log(f"  获取表结构失败: {table_name} - {str(e)}")
            return {}

    def get_column_lengths(self, table_name):
        """Return ``{column: max_length}`` for length-limited PG columns (cached)."""
        if table_name in self.column_lengths:
            return self.column_lengths[table_name]

        try:
            self.pg_cursor.execute("""
                SELECT column_name, character_maximum_length
                FROM information_schema.columns
                WHERE table_name = %s AND table_schema = current_schema()
            """, (table_name.lower(),))
            lengths = {
                column_name: max_length
                for column_name, max_length in self.pg_cursor.fetchall()
                if max_length
            }
            self.column_lengths[table_name] = lengths
            return lengths
        except Exception:
            return {}

    def transform_data(self, row, columns, pg_structure):
        """Adapt one SQLite row to the target PostgreSQL column types.

        - boolean columns: 0/'0'/False -> False, 1/'1'/True -> True; other
          values pass through unchanged (PostgreSQL may reject them).
        - length-limited text columns: over-long strings are truncated.
        - integer columns: out-of-range ints are logged and passed through;
          the insert then fails and the column is widened to bigint.
        - bigint columns: out-of-range ints become None.

        Returns:
            A list of values in the same order as *columns*.
        """
        transformed = []
        for col_name, value in zip(columns, row):
            col_info = pg_structure.get(col_name.lower(), {})
            pg_type = col_info.get('data_type')
            max_length = col_info.get('max_length')

            if pg_type in _BOOL_TYPES:
                if value in (0, '0', False):
                    transformed.append(False)
                elif value in (1, '1', True):
                    transformed.append(True)
                else:
                    # Unconvertible value: keep it and let PostgreSQL decide.
                    transformed.append(value)
            elif (pg_type in _TEXT_TYPES and max_length
                  and isinstance(value, str) and len(value) > max_length):
                self.log(f"  警告: 字段 '{col_name}' 值被截断 ({len(value)} -> {max_length})")
                transformed.append(value[:max_length])
            elif (pg_type in _INT4_TYPES and isinstance(value, int)
                  and not _INT4_MIN <= value <= _INT4_MAX):
                self.log(f"  警告: 字段 '{col_name}' 值超出整数范围 ({value}),尝试转换为bigint")
                transformed.append(value)
            elif (pg_type in _INT8_TYPES and isinstance(value, int)
                  and not _INT8_MIN <= value <= _INT8_MAX):
                self.log(f"  错误: 字段 '{col_name}' 值超出bigint范围 ({value})")
                transformed.append(None)
            else:
                transformed.append(value)
        return transformed

    def get_primary_keys(self, table):
        """Return the SQLite primary-key column names of *table* (cached), in key order."""
        if table in self.primary_keys:
            return self.primary_keys[table]

        self.sqlite_cursor.execute(f"PRAGMA table_info({self._quote_sqlite_identifier(table)})")
        # Row format: (cid, name, type, notnull, dflt_value, pk); pk is the
        # 1-based position within the primary key, 0 for non-key columns.
        key_columns = sorted(
            (info[5], info[1]) for info in self.sqlite_cursor.fetchall() if info[5]
        )
        primary_keys = [name for _, name in key_columns]

        self.primary_keys[table] = primary_keys
        return primary_keys

    def check_dependencies_migrated(self, table):
        """Return True when every table referenced by *table* is migrated or skipped."""
        dependencies = self.reverse_dependencies.get(table, [])

        if not dependencies:
            return True

        self.log(f"  检查 {table} 的依赖表迁移状态...")
        all_migrated = True

        for dep_table in dependencies:
            if dep_table in self.migrated_tables:
                self.log(f"    ✓ {dep_table} 已迁移")
            elif dep_table in self.skipped_tables:
                self.log(f"    - {dep_table} 已跳过")
            elif dep_table in self.failed_tables:
                self.log(f"    ✗ {dep_table} 迁移失败")
                all_migrated = False
            else:
                self.log(f"    ? {dep_table} 尚未迁移")
                all_migrated = False

        return all_migrated

    def handle_integer_overflow(self, table, columns, pg_structure, row):
        """Widen to BIGINT every INTEGER column whose value in *row* overflows int4.

        Runs inside the caller's open transaction (no commit), so the ALTERs
        are committed or rolled back together with the table's rows. Each
        ALTER uses its own savepoint so a failure leaves the transaction
        usable. On success *pg_structure* is refreshed in place.

        Returns:
            True if at least one column was widened and none failed.
        """
        overflowing = [
            (col_name, value) for col_name, value in zip(columns, row)
            if pg_structure.get(col_name.lower(), {}).get('data_type') in _INT4_TYPES
            and isinstance(value, int)
            and not _INT4_MIN <= value <= _INT4_MAX
        ]
        if not overflowing:
            return False

        for col_name, value in overflowing:
            self.log(f"  检测到字段 '{col_name}' 值超出范围 ({value})")
            self.log(f"  尝试将字段 '{col_name}' 类型改为 bigint")
            alter_query = sql.SQL("ALTER TABLE {} ALTER COLUMN {} TYPE BIGINT").format(
                sql.Identifier(table.lower()),
                # pg_structure was keyed by the lower-cased name above.
                sql.Identifier(col_name.lower()),
            )
            try:
                with self._savepoint("widen_column"):
                    self.pg_cursor.execute(alter_query)
            except Exception as e:
                self.log(f"  修改字段类型失败: {str(e)}")
                return False
            self.log(f"  成功将字段 '{col_name}' 类型改为 bigint")

        refreshed = self.get_pg_table_structure(table)
        if refreshed:
            pg_structure.clear()
            pg_structure.update(refreshed)
        return True

    # ------------------------------------------------------------------ #
    # Data copy
    # ------------------------------------------------------------------ #
    @staticmethod
    def _build_insert_query(table, columns, primary_keys):
        """Compose the INSERT for *table*; with a primary key, conflicts are ignored."""
        target = sql.Identifier(table.lower())
        column_list = sql.SQL(', ').join([sql.Identifier(col) for col in columns])
        values = sql.SQL(', ').join(sql.Placeholder() * len(columns))
        if primary_keys:
            return sql.SQL("INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO NOTHING").format(
                target,
                column_list,
                values,
                sql.SQL(', ').join([sql.Identifier(pk) for pk in primary_keys]),
            )
        return sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(target, column_list, values)

    def _insert_row(self, table, insert_query, columns, pg_structure, row):
        """Insert one row inside a savepoint; return True on success.

        On 'integer out of range' the offending columns are widened once and
        the row is retried. Failures are logged and never leave the
        surrounding transaction aborted.
        """
        widened = False
        while True:
            values = self.transform_data(row, columns, pg_structure)
            try:
                with self._savepoint("migrate_row"):
                    self.pg_cursor.execute(insert_query, values)
                return True
            except psycopg2.DataError as e:
                # "integer out of range" is a DataError, not an IntegrityError.
                if 'integer out of range' in str(e) and not widened:
                    self.log(f"  整数超出范围错误: {str(e)}")
                    widened = True
                    if self.handle_integer_overflow(table, columns, pg_structure, row):
                        continue
                self.log(f"  行插入失败: {str(e)}")
                return False
            except psycopg2.IntegrityError as e:
                message = str(e)
                if 'foreign key constraint' in message:
                    self.log(f"  外键约束错误: {message}")
                elif 'duplicate key value violates unique constraint' in message:
                    self.log(f"  主键冲突错误: {message}")
                    self.log(f"  行数据: {row}")
                    self.log("  尝试跳过此行...")
                else:
                    self.log(f"  完整性错误: {message}")
                return False
            except Exception as e:
                self.log(f"  行插入失败: {str(e)}")
                return False

    def migrate_table(self, table):
        """Copy all rows of one table after interactive confirmation.

        Returns:
            True if the table was migrated, was empty, or was skipped;
            False if it is blocked by unmigrated dependencies or had failures
            (in the latter case it is added to ``failed_tables``).
        """
        self.log(f"\n准备迁移表: {table}")

        if table.lower() in {t.lower() for t in self.skip_tables}:
            self.log(f"  跳过系统表: {table}")
            return True

        if not self.check_dependencies_migrated(table):
            self.log(f"  依赖表未完全迁移,暂时跳过 {table}")
            return False

        quoted = self._quote_sqlite_identifier(table)
        self.sqlite_cursor.execute(f"PRAGMA table_info({quoted})")
        columns = [col[1] for col in self.sqlite_cursor.fetchall()]
        primary_keys = self.get_primary_keys(table)

        self.sqlite_cursor.execute(f"SELECT * FROM {quoted}")
        rows = self.sqlite_cursor.fetchall()

        if not rows:
            self.log(f"  表 {table} 为空,跳过")
            self.migrated_tables.add(table)
            return True

        pg_structure = self.get_pg_table_structure(table)
        column_lengths = self.get_column_lengths(table)
        insert_query = self._build_insert_query(table, columns, primary_keys)

        self.log(f"  表 {table} 有 {len(rows)} 行数据需要迁移")
        if primary_keys:
            self.log(f"  使用主键 ({', '.join(primary_keys)}) 处理冲突")
        else:
            self.log("  警告: 表没有主键,可能产生重复数据")

        if column_lengths:
            self.log("  字段长度限制:")
            for col, max_len in column_lengths.items():
                self.log(f"    {col}: {max_len} 字符")

        confirm = input("  按 Enter 键开始迁移,或输入 's' 跳过此表: ")
        if confirm.strip().lower() == 's':
            self.log(f"  已跳过表 {table}")
            self.skipped_tables.add(table)
            return True

        try:
            success_count = 0
            error_count = 0
            for row in rows:
                if self._insert_row(table, insert_query, columns, pg_structure, row):
                    success_count += 1
                else:
                    error_count += 1
            self.pg_conn.commit()
        except Exception as e:
            # Covers commit-time failures such as deferred constraint checks.
            self.pg_conn.rollback()
            self.log(f"  迁移表 {table} 时出错: {str(e)}")
            self.failed_tables.add(table)
            return False

        if error_count == 0:
            self.log(f"  成功迁移 {success_count} 行数据")
            self.migrated_tables.add(table)
            self.failed_tables.discard(table)
            return True

        self.log(f"  部分迁移: {success_count} 行成功, {error_count} 行失败")
        self.failed_tables.add(table)
        return False

    def migrate(self):
        """Run the interactive migration.

        Returns:
            False if connecting failed, the user quit, clearing failed, or an
            unexpected error occurred; True otherwise, even when some tables
            could not be migrated (those are listed in the log).
        """
        if not self.connect():
            self.close()
            return False

        try:
            self.log("\n是否在迁移前清空 PostgreSQL 数据库?")
            self.log("1. 清空所有数据 (确保无重复)")
            self.log("2. 不清空 (可能产生重复数据)")
            self.log("3. 退出")

            choice = input("请选择 (1/2/3): ").strip()

            if choice == '3':
                self.log("退出迁移")
                return False

            if choice == '1':
                if not self.clear_postgres_data():
                    self.log("清空操作失败,退出迁移")
                    return False

            self.analyze_dependencies()
            self.log(f"\n开始迁移 {len(self.migration_order)} 个表")

            pending = list(self.migration_order)
            max_attempts = 10
            attempt = 0

            while pending and attempt < max_attempts:
                attempt += 1
                self.log(f"\n第 {attempt} 轮迁移尝试")

                deferred = []
                progressed = False
                for table in pending:
                    if self.migrate_table(table):
                        progressed = True
                    elif table not in self.failed_tables:
                        # Only dependency-blocked tables are retried. Failed
                        # tables may already have committed rows, and
                        # re-running a table without a primary key would
                        # duplicate them.
                        deferred.append(table)
                pending = deferred

                unfinished = len(pending) + len(self.failed_tables)
                if unfinished:
                    self.log(f"\n本轮迁移后仍有 {unfinished} 个表未成功迁移")
                if not progressed:
                    # Nothing succeeded this round (e.g. a dependency cycle or
                    # a failed parent table), so another round cannot help.
                    break

            unresolved = [
                t for t in self.migration_order
                if t in self.failed_tables or t in pending
            ]
            if unresolved:
                self.log("\n迁移完成,但以下表迁移失败:")
                for table in unresolved:
                    self.log(f"  - {table}")
            else:
                self.log("\n所有表迁移成功!")

            return True
        except Exception as e:
            self.log(f"迁移过程中发生错误: {str(e)}")
            return False
        finally:
            self.close()

    # ------------------------------------------------------------------ #
    # Post-migration steps
    # ------------------------------------------------------------------ #
    def mark_django_migrations_as_applied(self):
        """Copy the SQLite ``django_migrations`` records into PostgreSQL.

        Opens fresh connections, because migrate() closes its own. It closes
        them and the log file before returning. Each record is handled in its
        own savepoint so one failure does not discard the others.

        Returns:
            True when all records were processed (individual failures are
            logged); False if the step could not run at all.
        """
        self._open_log()
        try:
            self.log("\n检查 Django 迁移状态...")

            self._close_connections()
            self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
            self.sqlite_cursor = self.sqlite_conn.cursor()
            self.pg_conn = psycopg2.connect(**self.pg_config)
            self.pg_cursor = self.pg_conn.cursor()

            self.sqlite_cursor.execute("SELECT app, name FROM django_migrations")
            migrations = self.sqlite_cursor.fetchall()

            if not migrations:
                self.log("  未找到 Django 迁移记录")
                return True

            failures = 0
            for app, name in migrations:
                try:
                    with self._savepoint("mark_migration"):
                        self.pg_cursor.execute("""
                            SELECT 1 FROM django_migrations
                            WHERE app = %s AND name = %s
                        """, (app, name))
                        if self.pg_cursor.fetchone():
                            self.log(f"  迁移已存在: {app}.{name}")
                        else:
                            self.pg_cursor.execute("""
                                INSERT INTO django_migrations (app, name, applied)
                                VALUES (%s, %s, NOW())
                            """, (app, name))
                            self.log(f"  标记迁移: {app}.{name}")
                except psycopg2.IntegrityError as e:
                    if 'unique constraint' in str(e):
                        self.log(f"  迁移已存在: {app}.{name}")
                    else:
                        failures += 1
                        self.log(f"  标记迁移失败: {app}.{name} - {str(e)}")
                except Exception as e:
                    failures += 1
                    self.log(f"  标记迁移失败: {app}.{name} - {str(e)}")

            self.pg_conn.commit()
            if failures:
                self.log(f"  {failures} 条迁移记录标记失败")
            self.log("成功标记 Django 迁移为已应用状态")
            return True
        except Exception as e:
            self.log(f"标记迁移状态时出错: {str(e)}")
            return False
        finally:
            self.close()

    def reset_sequences(self):
        """Reset every serial/identity sequence in ``public`` to its column's MAX.

        Mirrors Django's ``sqlsequencereset``:
        ``setval(seq, COALESCE(MAX(col), 1), MAX(col) IS NOT NULL)``, so the
        next value is MAX+1, or 1 for an empty table. Each sequence is
        committed separately; a failure is logged and the loop continues.

        Returns:
            True if the sequences could be listed (individual failures are
            logged); False otherwise.
        """
        self._open_log()
        try:
            self.log("\n重置 PostgreSQL 序列(稳健模式)...")

            # Reconnect to get a clean transaction state.
            self._close_connections()
            self.pg_conn = psycopg2.connect(**self.pg_config)
            self.pg_conn.autocommit = False
            self.pg_cursor = self.pg_conn.cursor()

            # Serial columns have a nextval() default; identity columns have
            # no default but are flagged by is_identity. No query parameters
            # are passed, so psycopg2 leaves the literal '%' characters alone.
            self.pg_cursor.execute("""
                SELECT table_name,
                       column_name,
                       pg_get_serial_sequence(format('%I.%I', table_schema, table_name),
                                              column_name)
                FROM information_schema.columns
                WHERE table_schema = 'public'
                  AND (column_default LIKE 'nextval(%' OR is_identity = 'YES')
            """)
            rows = self.pg_cursor.fetchall()

            if not rows:
                self.log("  未发现任何使用 sequence 的列(可能无需重置)")
                return True

            for table_name, column_name, sequence_name in rows:
                if not sequence_name:
                    self.log(f"  跳过: {table_name}.{column_name} 没有可识别的 sequence")
                    continue

                setval_stmt = sql.SQL(
                    "SELECT setval(%s, COALESCE(MAX({col}), 1), MAX({col}) IS NOT NULL) FROM {tbl}"
                ).format(
                    col=sql.Identifier(column_name),
                    tbl=sql.Identifier('public', table_name),
                )
                try:
                    self.pg_cursor.execute(setval_stmt, (sequence_name,))
                    self.pg_conn.commit()
                    self.log(f"  已重置序列: {sequence_name} (表 {table_name}.{column_name})")
                except Exception as e:
                    try:
                        self.pg_conn.rollback()
                    except Exception as rollback_error:
                        self.log(f"  回滚失败: {str(rollback_error)}")
                    self.log(f"  重置序列失败: {table_name}.{column_name} - {str(e)}")

            self.log("成功尝试重置所有发现的序列")
            return True
        except Exception as e:
            self.log(f"重置序列时出错: {str(e)}")
            return False
        finally:
            self.close()


if __name__ == "__main__":
    migrator = SQLiteToPostgresMigrator()
    if not migrator.migrate():
        sys.exit(1)

    # Record Django migration state, then align sequences with the copied keys.
    migrator.mark_django_migrations_as_applied()
    migrator.reset_sequences()