migrate_sqlite_to_postgres.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. import sqlite3
  2. import psycopg2
  3. import os
  4. import sys
  5. import datetime
  6. from dotenv import load_dotenv
  7. from psycopg2 import sql
  8. from collections import defaultdict
  9. # 加载环境变量
  10. load_dotenv()
  11. class SQLiteToPostgresMigrator:
  12. def __init__(self):
  13. # SQLite 配置
  14. self.sqlite_db_path = os.getenv('SQLITE_DB_PATH', 'db.sqlite3')
  15. # PostgreSQL 配置
  16. self.pg_config = {
  17. 'dbname': os.getenv('PG_DB_NAME', 'wmsdb'),
  18. 'user': os.getenv('PG_DB_USER', 'wmsuser'),
  19. 'password': os.getenv('PG_DB_PASSWORD', 'abc@1234'),
  20. 'host': os.getenv('PG_DB_HOST', 'localhost'),
  21. 'port': os.getenv('PG_DB_PORT', '5432')
  22. }
  23. # 连接
  24. self.sqlite_conn = None
  25. self.sqlite_cursor = None
  26. self.pg_conn = None
  27. self.pg_cursor = None
  28. # 表依赖关系
  29. self.table_dependencies = defaultdict(list)
  30. self.reverse_dependencies = defaultdict(list)
  31. self.migration_order = []
  32. # 需要跳过的系统表
  33. self.skip_tables = [
  34. 'auth_permission',
  35. 'django_content_type',
  36. 'django_migrations',
  37. 'auth_group_permissions',
  38. 'auth_user_groups',
  39. 'auth_user_user_permissions',
  40. 'django_session',
  41. 'django_admin_log'
  42. ]
  43. # 迁移状态
  44. self.migrated_tables = set()
  45. self.failed_tables = set()
  46. # 存储表的主键信息
  47. self.primary_keys = {}
  48. # 存储字段长度限制
  49. self.column_lengths = {}
  50. # 日志文件
  51. self.log_file = None
  52. self.log_file_path = f"migration_log_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
  53. def log(self, message):
  54. """记录日志到控制台和文件"""
  55. print(message)
  56. if self.log_file and not self.log_file.closed:
  57. self.log_file.write(message + "\n")
  58. self.log_file.flush()
  59. def connect(self):
  60. """连接到数据库"""
  61. try:
  62. # 打开日志文件
  63. self.log_file = open(self.log_file_path, "w", encoding="utf-8")
  64. self.log(f"迁移日志文件: {self.log_file_path}")
  65. # SQLite 连接
  66. self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
  67. self.sqlite_cursor = self.sqlite_conn.cursor()
  68. # PostgreSQL 连接
  69. self.pg_conn = psycopg2.connect(**self.pg_config)
  70. self.pg_cursor = self.pg_conn.cursor()
  71. self.log(f"连接成功: SQLite({self.sqlite_db_path}) → PostgreSQL({self.pg_config['dbname']})")
  72. return True
  73. except Exception as e:
  74. self.log(f"连接失败: {str(e)}")
  75. return False
  76. def close(self):
  77. """关闭数据库连接"""
  78. # 先记录关闭消息
  79. if self.log_file and not self.log_file.closed:
  80. self.log_file.write(f"日志文件已保存: {self.log_file_path}\n")
  81. self.log_file.flush()
  82. # 关闭数据库连接
  83. for cursor in [self.sqlite_cursor, self.pg_cursor]:
  84. if cursor: cursor.close()
  85. for conn in [self.sqlite_conn, self.pg_conn]:
  86. if conn: conn.close()
  87. # 最后关闭日志文件
  88. if self.log_file and not self.log_file.closed:
  89. self.log_file.close()
  90. def clear_postgres_data(self):
  91. """清空 PostgreSQL 数据库中的所有数据"""
  92. self.log("\n警告: 此操作将清空 PostgreSQL 数据库中的所有数据!")
  93. confirm = input("确定要清空 PostgreSQL 数据库吗? (y/n): ")
  94. if confirm.lower() != 'y':
  95. self.log("取消清空操作")
  96. return False
  97. try:
  98. # 获取所有表名
  99. self.pg_cursor.execute("""
  100. SELECT table_name
  101. FROM information_schema.tables
  102. WHERE table_schema = 'public'
  103. """)
  104. tables = [row[0] for row in self.pg_cursor.fetchall()]
  105. # 使用 TRUNCATE TABLE ... CASCADE 替代 session_replication_role
  106. for table in tables:
  107. try:
  108. self.pg_cursor.execute(f"TRUNCATE TABLE {table} CASCADE;")
  109. self.log(f"已清空表: {table}")
  110. except Exception as e:
  111. self.log(f"清空表 {table} 时出错: {str(e)}")
  112. continue
  113. self.pg_conn.commit()
  114. self.log("成功清空 PostgreSQL 数据库中的所有数据")
  115. return True
  116. except Exception as e:
  117. self.pg_conn.rollback()
  118. self.log(f"清空数据库时出错: {str(e)}")
  119. return False
  120. def analyze_dependencies(self):
  121. """分析表依赖关系"""
  122. # 获取 SQLite 中的所有表
  123. self.sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
  124. tables = [row[0] for row in self.sqlite_cursor.fetchall()]
  125. # 排除系统表
  126. excluded_tables = ['sqlite_sequence'] + self.skip_tables
  127. tables = [t for t in tables if t not in excluded_tables]
  128. # 分析外键依赖
  129. for table in tables:
  130. self.sqlite_cursor.execute(f"PRAGMA foreign_key_list({table})")
  131. for fk in self.sqlite_cursor.fetchall():
  132. ref_table = fk[2]
  133. if ref_table in tables:
  134. self.table_dependencies[ref_table].append(table)
  135. self.reverse_dependencies[table].append(ref_table)
  136. # 创建迁移顺序 (拓扑排序)
  137. visited = set()
  138. order = []
  139. def visit(table):
  140. if table in visited:
  141. return
  142. visited.add(table)
  143. for dependent in self.table_dependencies.get(table, []):
  144. visit(dependent)
  145. order.append(table)
  146. for table in tables:
  147. if table not in visited:
  148. visit(table)
  149. self.migration_order = order
  150. self.log(f"分析完成: 发现 {len(tables)} 个表, 依赖顺序已确定")
  151. def get_pg_table_structure(self, table_name):
  152. """获取 PostgreSQL 表结构"""
  153. try:
  154. self.pg_cursor.execute("""
  155. SELECT column_name, data_type, character_maximum_length
  156. FROM information_schema.columns
  157. WHERE table_name = %s
  158. """, (table_name.lower(),))
  159. structure = {}
  160. for row in self.pg_cursor.fetchall():
  161. column_name, data_type, max_length = row
  162. structure[column_name] = {
  163. 'data_type': data_type,
  164. 'max_length': max_length
  165. }
  166. return structure
  167. except Exception:
  168. return {}
  169. def get_column_lengths(self, table_name):
  170. """获取 PostgreSQL 表的字段长度限制"""
  171. if table_name in self.column_lengths:
  172. return self.column_lengths[table_name]
  173. try:
  174. self.pg_cursor.execute("""
  175. SELECT column_name, character_maximum_length
  176. FROM information_schema.columns
  177. WHERE table_name = %s
  178. """, (table_name.lower(),))
  179. lengths = {}
  180. for row in self.pg_cursor.fetchall():
  181. column_name, max_length = row
  182. if max_length:
  183. lengths[column_name] = max_length
  184. self.column_lengths[table_name] = lengths
  185. return lengths
  186. except Exception:
  187. return {}
  188. def transform_data(self, row, columns, pg_structure):
  189. """根据目标类型转换数据"""
  190. transformed = []
  191. for i, value in enumerate(row):
  192. col_name = columns[i]
  193. col_info = pg_structure.get(col_name.lower(), {})
  194. pg_type = col_info.get('data_type')
  195. max_length = col_info.get('max_length')
  196. # 处理布尔字段转换
  197. if pg_type in ('boolean', 'bool'):
  198. # 将整数0/1转换为布尔值
  199. if value in (0, '0', False):
  200. transformed.append(False)
  201. elif value in (1, '1', True):
  202. transformed.append(True)
  203. else:
  204. # 无法转换的值保持原样(可能会出错)
  205. transformed.append(value)
  206. # 处理字符串长度限制
  207. elif pg_type in ('character varying', 'varchar', 'char', 'text') and max_length and isinstance(value, str):
  208. if len(value) > max_length:
  209. # 截断超过长度的字符串
  210. truncated = value[:max_length]
  211. self.log(f" 警告: 字段 '{col_name}' 值被截断 ({len(value)} -> {max_length})")
  212. transformed.append(truncated)
  213. else:
  214. transformed.append(value)
  215. # 处理整数范围限制
  216. elif pg_type in ('integer', 'int', 'int4'):
  217. # 检查值是否在PostgreSQL整数范围内
  218. if isinstance(value, int) and (value < -2147483648 or value > 2147483647):
  219. # 尝试转换为bigint
  220. try:
  221. self.log(f" 警告: 字段 '{col_name}' 值超出整数范围 ({value}),尝试转换为bigint")
  222. transformed.append(value)
  223. except Exception:
  224. self.log(f" 错误: 无法转换字段 '{col_name}' 的值 ({value})")
  225. transformed.append(None)
  226. else:
  227. transformed.append(value)
  228. # 处理bigint范围限制
  229. elif pg_type in ('bigint', 'int8'):
  230. # 检查值是否在bigint范围内
  231. bigint_min = -9223372036854775808
  232. bigint_max = 9223372036854775807
  233. if isinstance(value, int) and (value < bigint_min or value > bigint_max):
  234. self.log(f" 错误: 字段 '{col_name}' 值超出bigint范围 ({value})")
  235. transformed.append(None)
  236. else:
  237. transformed.append(value)
  238. else:
  239. transformed.append(value)
  240. return transformed
  241. def get_primary_keys(self, table):
  242. """获取表的主键列"""
  243. if table in self.primary_keys:
  244. return self.primary_keys[table]
  245. # 查询SQLite表的主键
  246. self.sqlite_cursor.execute(f"PRAGMA table_info({table})")
  247. columns_info = self.sqlite_cursor.fetchall()
  248. # 主键列是那些pk值不为0的列
  249. primary_keys = []
  250. for col_info in columns_info:
  251. # col_info格式: (cid, name, type, notnull, dflt_value, pk)
  252. if col_info[5] != 0: # pk列不为0表示是主键
  253. primary_keys.append(col_info[1])
  254. self.primary_keys[table] = primary_keys
  255. return primary_keys
  256. def check_dependencies_migrated(self, table):
  257. """检查依赖的表是否已经成功迁移"""
  258. dependencies = self.reverse_dependencies.get(table, [])
  259. if not dependencies:
  260. return True
  261. self.log(f" 检查 {table} 的依赖表迁移状态...")
  262. all_migrated = True
  263. for dep_table in dependencies:
  264. if dep_table in self.migrated_tables:
  265. self.log(f" ✓ {dep_table} 已迁移")
  266. elif dep_table in self.failed_tables:
  267. self.log(f" ✗ {dep_table} 迁移失败")
  268. all_migrated = False
  269. else:
  270. self.log(f" ? {dep_table} 尚未迁移")
  271. all_migrated = False
  272. return all_migrated
  273. def handle_integer_overflow(self, table, columns, pg_structure, row):
  274. """处理整数超出范围错误"""
  275. # 找出导致问题的字段
  276. for i, value in enumerate(row):
  277. col_name = columns[i]
  278. col_info = pg_structure.get(col_name.lower(), {})
  279. pg_type = col_info.get('data_type')
  280. if pg_type in ('integer', 'int', 'int4') and isinstance(value, int):
  281. if value < -2147483648 or value > 2147483647:
  282. self.log(f" 检测到字段 '{col_name}' 值超出范围 ({value})")
  283. # 尝试将字段类型改为bigint
  284. try:
  285. self.log(f" 尝试将字段 '{col_name}' 类型改为 bigint")
  286. alter_query = sql.SQL("""
  287. ALTER TABLE {} ALTER COLUMN {} TYPE BIGINT;
  288. """).format(
  289. sql.Identifier(table.lower()),
  290. sql.Identifier(col_name)
  291. )
  292. self.pg_cursor.execute(alter_query)
  293. self.pg_conn.commit()
  294. self.log(f" 成功将字段 '{col_name}' 类型改为 bigint")
  295. # 更新表结构信息
  296. pg_structure = self.get_pg_table_structure(table)
  297. return True
  298. except Exception as e:
  299. self.log(f" 修改字段类型失败: {str(e)}")
  300. self.pg_conn.rollback()
  301. return False
  302. return False
  303. def migrate_table(self, table):
  304. """迁移单个表"""
  305. self.log(f"\n准备迁移表: {table}")
  306. # 检查是否需要跳过
  307. if table.lower() in [t.lower() for t in self.skip_tables]:
  308. self.log(f" 跳过系统表: {table}")
  309. return True
  310. # 检查依赖表是否已迁移
  311. if not self.check_dependencies_migrated(table):
  312. self.log(f" 依赖表未完全迁移,暂时跳过 {table}")
  313. return False
  314. # 获取 SQLite 表结构
  315. self.sqlite_cursor.execute(f"PRAGMA table_info({table})")
  316. columns_info = self.sqlite_cursor.fetchall()
  317. columns = [col[1] for col in columns_info]
  318. # 获取主键
  319. primary_keys = self.get_primary_keys(table)
  320. # 获取数据
  321. self.sqlite_cursor.execute(f"SELECT * FROM {table}")
  322. rows = self.sqlite_cursor.fetchall()
  323. if not rows:
  324. self.log(f" 表 {table} 为空,跳过")
  325. self.migrated_tables.add(table)
  326. return True
  327. # 获取 PostgreSQL 表结构
  328. pg_structure = self.get_pg_table_structure(table)
  329. # 获取字段长度限制
  330. column_lengths = self.get_column_lengths(table)
  331. # 创建 PostgreSQL 插入语句
  332. placeholders = ', '.join(['%s'] * len(columns))
  333. column_names = ', '.join([f'"{col}"' for col in columns])
  334. # 构建插入语句,根据主键是否存在添加ON CONFLICT子句
  335. if primary_keys:
  336. # 使用主键处理冲突
  337. conflict_columns = ', '.join([f'"{pk}"' for pk in primary_keys])
  338. insert_query = sql.SQL("""
  339. INSERT INTO {} ({})
  340. VALUES ({})
  341. ON CONFLICT ({}) DO NOTHING
  342. """).format(
  343. sql.Identifier(table.lower()),
  344. sql.SQL(column_names),
  345. sql.SQL(placeholders),
  346. sql.SQL(conflict_columns)
  347. )
  348. else:
  349. # 没有主键,使用普通插入语句
  350. insert_query = sql.SQL("""
  351. INSERT INTO {} ({})
  352. VALUES ({})
  353. """).format(
  354. sql.Identifier(table.lower()),
  355. sql.SQL(column_names),
  356. sql.SQL(placeholders)
  357. )
  358. # 用户确认
  359. self.log(f" 表 {table} 有 {len(rows)} 行数据需要迁移")
  360. if primary_keys:
  361. self.log(f" 使用主键 ({', '.join(primary_keys)}) 处理冲突")
  362. else:
  363. self.log(" 警告: 表没有主键,可能产生重复数据")
  364. # 显示字段长度限制
  365. if column_lengths:
  366. self.log(" 字段长度限制:")
  367. for col, max_len in column_lengths.items():
  368. self.log(f" {col}: {max_len} 字符")
  369. confirm = input(" 按 Enter 键开始迁移,或输入 's' 跳过此表: ")
  370. if confirm.lower() == 's':
  371. self.log(f" 已跳过表 {table}")
  372. return True
  373. # 迁移数据
  374. try:
  375. success_count = 0
  376. error_count = 0
  377. for row in rows:
  378. try:
  379. # 转换数据类型
  380. transformed_row = self.transform_data(row, columns, pg_structure)
  381. self.pg_cursor.execute(insert_query, transformed_row)
  382. success_count += 1
  383. except psycopg2.IntegrityError as e:
  384. error_count += 1
  385. if 'foreign key constraint' in str(e):
  386. self.log(f" 外键约束错误: {str(e)}")
  387. elif 'integer out of range' in str(e):
  388. # 处理整数超出范围错误
  389. self.log(f" 整数超出范围错误: {str(e)}")
  390. # 尝试将字段类型改为bigint
  391. self.handle_integer_overflow(table, columns, pg_structure, row)
  392. elif 'duplicate key value violates unique constraint' in str(e):
  393. # 处理主键冲突错误
  394. self.log(f" 主键冲突错误: {str(e)}")
  395. self.log(f" 行数据: {row}")
  396. self.log(f" 尝试跳过此行...")
  397. else:
  398. self.log(f" 完整性错误: {str(e)}")
  399. self.pg_conn.rollback()
  400. except Exception as e:
  401. error_count += 1
  402. self.log(f" 行插入失败: {str(e)}")
  403. self.pg_conn.rollback()
  404. self.pg_conn.commit()
  405. if error_count == 0:
  406. self.log(f" 成功迁移 {success_count} 行数据")
  407. self.migrated_tables.add(table)
  408. return True
  409. else:
  410. self.log(f" 部分迁移: {success_count} 行成功, {error_count} 行失败")
  411. self.failed_tables.add(table)
  412. return False
  413. except Exception as e:
  414. self.pg_conn.rollback()
  415. self.log(f" 迁移表 {table} 时出错: {str(e)}")
  416. self.failed_tables.add(table)
  417. return False
  418. def migrate(self):
  419. """执行迁移"""
  420. if not self.connect():
  421. return False
  422. try:
  423. # 询问是否清空 PostgreSQL 数据
  424. self.log("\n是否在迁移前清空 PostgreSQL 数据库?")
  425. self.log("1. 清空所有数据 (确保无重复)")
  426. self.log("2. 不清空 (可能产生重复数据)")
  427. self.log("3. 退出")
  428. choice = input("请选择 (1/2/3): ")
  429. if choice == '3':
  430. self.log("退出迁移")
  431. return False
  432. if choice == '1':
  433. if not self.clear_postgres_data():
  434. self.log("清空操作失败,退出迁移")
  435. return False
  436. self.analyze_dependencies()
  437. self.log(f"\n开始迁移 {len(self.migration_order)} 个表")
  438. # 按依赖顺序迁移表
  439. remaining_tables = self.migration_order.copy()
  440. max_attempts = 10 # 最大尝试次数
  441. attempt = 0
  442. while remaining_tables and attempt < max_attempts:
  443. attempt += 1
  444. self.log(f"\n第 {attempt} 轮迁移尝试")
  445. # 复制列表以便在迭代时修改
  446. tables_to_migrate = remaining_tables.copy()
  447. remaining_tables = []
  448. for table in tables_to_migrate:
  449. success = self.migrate_table(table)
  450. if not success:
  451. remaining_tables.append(table)
  452. if remaining_tables:
  453. self.log(f"\n本轮迁移后仍有 {len(remaining_tables)} 个表未成功迁移")
  454. # 检查迁移结果
  455. if remaining_tables:
  456. self.log(f"\n迁移完成,但以下表迁移失败:")
  457. for table in remaining_tables:
  458. self.log(f" - {table}")
  459. else:
  460. self.log("\n所有表迁移成功!")
  461. return True
  462. except Exception as e:
  463. self.log(f"迁移过程中发生错误: {str(e)}")
  464. return False
  465. finally:
  466. self.close()
  467. def mark_django_migrations_as_applied(self):
  468. """标记 Django 迁移为已应用状态"""
  469. try:
  470. self.log("\n检查 Django 迁移状态...")
  471. # 重新连接数据库
  472. self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
  473. self.sqlite_cursor = self.sqlite_conn.cursor()
  474. self.pg_conn = psycopg2.connect(**self.pg_config)
  475. self.pg_cursor = self.pg_conn.cursor()
  476. # 获取所有迁移
  477. self.sqlite_cursor.execute("SELECT app, name FROM django_migrations")
  478. migrations = self.sqlite_cursor.fetchall()
  479. if not migrations:
  480. self.log(" 未找到 Django 迁移记录")
  481. return True
  482. # 在 PostgreSQL 中标记迁移为已应用
  483. for app, name in migrations:
  484. try:
  485. # 先检查迁移是否已存在
  486. self.pg_cursor.execute("""
  487. SELECT 1 FROM django_migrations
  488. WHERE app = %s AND name = %s
  489. """, (app, name))
  490. exists = self.pg_cursor.fetchone()
  491. if exists:
  492. self.log(f" 迁移已存在: {app}.{name}")
  493. continue
  494. # 插入迁移记录
  495. self.pg_cursor.execute("""
  496. INSERT INTO django_migrations (app, name, applied)
  497. VALUES (%s, %s, NOW())
  498. """, (app, name))
  499. self.log(f" 标记迁移: {app}.{name}")
  500. except psycopg2.IntegrityError as e:
  501. if 'unique constraint' in str(e):
  502. self.log(f" 迁移已存在: {app}.{name}")
  503. else:
  504. self.log(f" 标记迁移失败: {app}.{name} - {str(e)}")
  505. except Exception as e:
  506. self.log(f" 标记迁移失败: {app}.{name} - {str(e)}")
  507. self.pg_conn.commit()
  508. self.log("成功标记 Django 迁移为已应用状态")
  509. return True
  510. except Exception as e:
  511. self.log(f"标记迁移状态时出错: {str(e)}")
  512. return False
  513. finally:
  514. # 关闭临时连接
  515. if self.sqlite_cursor: self.sqlite_cursor.close()
  516. if self.sqlite_conn: self.sqlite_conn.close()
  517. if self.pg_cursor: self.pg_cursor.close()
  518. if self.pg_conn: self.pg_conn.close()
  519. def reset_sequences(self):
  520. """稳健重置 PostgreSQL 序列(每个序列单独处理,出现错误继续)"""
  521. try:
  522. self.log("\n重置 PostgreSQL 序列(稳健模式)...")
  523. # 重新连接数据库,确保干净游标
  524. if self.pg_cursor:
  525. self.pg_cursor.close()
  526. if self.pg_conn:
  527. self.pg_conn.close()
  528. self.pg_conn = psycopg2.connect(**self.pg_config)
  529. self.pg_conn.autocommit = False
  530. self.pg_cursor = self.pg_conn.cursor()
  531. # 找出 public schema 中所有使用 sequence 的列
  532. self.pg_cursor.execute("""
  533. SELECT table_name, column_name
  534. FROM information_schema.columns
  535. WHERE column_default LIKE 'nextval(%' AND table_schema = 'public'
  536. """)
  537. rows = self.pg_cursor.fetchall()
  538. if not rows:
  539. self.log(" 未发现任何使用 sequence 的列(可能无需重置)")
  540. return True
  541. for table_name, column_name in rows:
  542. try:
  543. # 获取序列名(如果不是 serial 列,pg_get_serial_sequence 可能返回 NULL)
  544. self.pg_cursor.execute(
  545. "SELECT pg_get_serial_sequence(%s, %s)",
  546. (f"public.{table_name}", column_name)
  547. )
  548. seq_row = self.pg_cursor.fetchone()
  549. sequence_name = seq_row[0] if seq_row else None
  550. if not sequence_name:
  551. self.log(f" 跳过: {table_name}.{column_name} 没有可识别的 sequence")
  552. continue
  553. # 构造并执行 setval,将 sequence 设置为表中 max(id) 或 1,然后提交
  554. setval_sql = sql.SQL(
  555. "SELECT setval(%s, COALESCE((SELECT MAX({col}) FROM {tbl}), 1), true)"
  556. ).format(
  557. col=sql.Identifier(column_name),
  558. tbl=sql.Identifier('public', table_name) if False else sql.Identifier(table_name)
  559. )
  560. # 直接用 sequence_name 作为第一个参数
  561. self.pg_cursor.execute("SELECT setval(%s, COALESCE((SELECT MAX(%s) FROM %s), 1), true)",
  562. (sequence_name, sql.Identifier(column_name).string, sql.Identifier(table_name).string))
  563. # 上面的字符串化参数不好控制,改为更可靠的方式:
  564. # 使用动态 SQL:
  565. setval_stmt = f"SELECT setval('{sequence_name}', COALESCE((SELECT MAX(\"{column_name}\") FROM \"{table_name}\"), 1), true);"
  566. self.pg_cursor.execute(setval_stmt)
  567. self.pg_conn.commit()
  568. self.log(f" 已重置序列: {sequence_name} (表 {table_name}.{column_name})")
  569. except Exception as e:
  570. # 单个序列失败时回滚该序列的事务并继续
  571. try:
  572. self.pg_conn.rollback()
  573. except Exception:
  574. pass
  575. self.log(f" 重置序列失败: {table_name}.{column_name} - {str(e)}")
  576. continue
  577. self.log("成功尝试重置所有发现的序列")
  578. return True
  579. except Exception as e:
  580. self.log(f"重置序列时出错: {str(e)}")
  581. return False
  582. finally:
  583. if self.pg_cursor:
  584. self.pg_cursor.close()
  585. if self.pg_conn:
  586. self.pg_conn.close()
  587. if __name__ == "__main__":
  588. migrator = SQLiteToPostgresMigrator()
  589. if migrator.migrate():
  590. # 标记 Django 迁移状态
  591. migrator.mark_django_migrations_as_applied()
  592. # 重置序列
  593. migrator.reset_sequences()