migrate_sqlite_to_postgres.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. import sqlite3
  2. import psycopg2
  3. import os
  4. import sys
  5. import datetime
  6. from dotenv import load_dotenv
  7. from psycopg2 import sql
  8. from collections import defaultdict
  9. # 加载环境变量
  10. load_dotenv()
  11. class SQLiteToPostgresMigrator:
  12. def __init__(self):
  13. # SQLite 配置
  14. self.sqlite_db_path = os.getenv('SQLITE_DB_PATH', 'db.sqlite3')
  15. # PostgreSQL 配置
  16. self.pg_config = {
  17. 'dbname': os.getenv('PG_DB_NAME', 'wmsdb'),
  18. 'user': os.getenv('PG_DB_USER', 'wmsuser'),
  19. 'password': os.getenv('PG_DB_PASSWORD', 'abc@1234'),
  20. 'host': os.getenv('PG_DB_HOST', 'localhost'),
  21. 'port': os.getenv('PG_DB_PORT', '5432')
  22. }
  23. # 连接
  24. self.sqlite_conn = None
  25. self.sqlite_cursor = None
  26. self.pg_conn = None
  27. self.pg_cursor = None
  28. # 表依赖关系
  29. self.table_dependencies = defaultdict(list)
  30. self.reverse_dependencies = defaultdict(list)
  31. self.migration_order = []
  32. # 需要跳过的系统表
  33. self.skip_tables = [
  34. 'auth_permission',
  35. 'django_content_type',
  36. 'django_migrations',
  37. 'auth_group_permissions',
  38. 'auth_user_groups',
  39. 'auth_user_user_permissions'
  40. ]
  41. # 迁移状态
  42. self.migrated_tables = set()
  43. self.failed_tables = set()
  44. # 存储表的主键信息
  45. self.primary_keys = {}
  46. # 存储字段长度限制
  47. self.column_lengths = {}
  48. # 日志文件
  49. self.log_file = None
  50. self.log_file_path = f"migration_log_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
  51. def log(self, message):
  52. """记录日志到控制台和文件"""
  53. print(message)
  54. if self.log_file:
  55. self.log_file.write(message + "\n")
  56. self.log_file.flush()
  57. def connect(self):
  58. """连接到数据库"""
  59. try:
  60. # 打开日志文件
  61. self.log_file = open(self.log_file_path, "w", encoding="utf-8")
  62. self.log(f"迁移日志文件: {self.log_file_path}")
  63. # SQLite 连接
  64. self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
  65. self.sqlite_cursor = self.sqlite_conn.cursor()
  66. # PostgreSQL 连接
  67. self.pg_conn = psycopg2.connect(**self.pg_config)
  68. self.pg_cursor = self.pg_conn.cursor()
  69. self.log(f"连接成功: SQLite({self.sqlite_db_path}) → PostgreSQL({self.pg_config['dbname']})")
  70. return True
  71. except Exception as e:
  72. self.log(f"连接失败: {str(e)}")
  73. return False
  74. def close(self):
  75. """关闭数据库连接"""
  76. for cursor in [self.sqlite_cursor, self.pg_cursor]:
  77. if cursor: cursor.close()
  78. for conn in [self.sqlite_conn, self.pg_conn]:
  79. if conn: conn.close()
  80. if self.log_file:
  81. self.log_file.close()
  82. self.log(f"日志文件已保存: {self.log_file_path}")
  83. def clear_postgres_data(self):
  84. """清空 PostgreSQL 数据库中的所有数据"""
  85. self.log("\n警告: 此操作将清空 PostgreSQL 数据库中的所有数据!")
  86. confirm = input("确定要清空 PostgreSQL 数据库吗? (y/n): ")
  87. if confirm.lower() != 'y':
  88. self.log("取消清空操作")
  89. return False
  90. try:
  91. # 获取所有表名
  92. self.pg_cursor.execute("""
  93. SELECT table_name
  94. FROM information_schema.tables
  95. WHERE table_schema = 'public'
  96. """)
  97. tables = [row[0] for row in self.pg_cursor.fetchall()]
  98. # 使用 TRUNCATE TABLE ... CASCADE 替代 session_replication_role
  99. for table in tables:
  100. try:
  101. self.pg_cursor.execute(f"TRUNCATE TABLE {table} CASCADE;")
  102. self.log(f"已清空表: {table}")
  103. except Exception as e:
  104. self.log(f"清空表 {table} 时出错: {str(e)}")
  105. continue
  106. self.pg_conn.commit()
  107. self.log("成功清空 PostgreSQL 数据库中的所有数据")
  108. return True
  109. except Exception as e:
  110. self.pg_conn.rollback()
  111. self.log(f"清空数据库时出错: {str(e)}")
  112. return False
  113. def analyze_dependencies(self):
  114. """分析表依赖关系"""
  115. # 获取 SQLite 中的所有表
  116. self.sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
  117. tables = [row[0] for row in self.sqlite_cursor.fetchall()]
  118. # 排除系统表
  119. excluded_tables = ['sqlite_sequence'] + self.skip_tables
  120. tables = [t for t in tables if t not in excluded_tables]
  121. # 分析外键依赖
  122. for table in tables:
  123. self.sqlite_cursor.execute(f"PRAGMA foreign_key_list({table})")
  124. for fk in self.sqlite_cursor.fetchall():
  125. ref_table = fk[2]
  126. if ref_table in tables:
  127. self.table_dependencies[ref_table].append(table)
  128. self.reverse_dependencies[table].append(ref_table)
  129. # 创建迁移顺序 (拓扑排序)
  130. visited = set()
  131. order = []
  132. def visit(table):
  133. if table in visited:
  134. return
  135. visited.add(table)
  136. for dependent in self.table_dependencies.get(table, []):
  137. visit(dependent)
  138. order.append(table)
  139. for table in tables:
  140. if table not in visited:
  141. visit(table)
  142. self.migration_order = order
  143. self.log(f"分析完成: 发现 {len(tables)} 个表, 依赖顺序已确定")
  144. def get_pg_table_structure(self, table_name):
  145. """获取 PostgreSQL 表结构"""
  146. try:
  147. self.pg_cursor.execute("""
  148. SELECT column_name, data_type, character_maximum_length
  149. FROM information_schema.columns
  150. WHERE table_name = %s
  151. """, (table_name.lower(),))
  152. structure = {}
  153. for row in self.pg_cursor.fetchall():
  154. column_name, data_type, max_length = row
  155. structure[column_name] = {
  156. 'data_type': data_type,
  157. 'max_length': max_length
  158. }
  159. return structure
  160. except Exception:
  161. return {}
  162. def get_column_lengths(self, table_name):
  163. """获取 PostgreSQL 表的字段长度限制"""
  164. if table_name in self.column_lengths:
  165. return self.column_lengths[table_name]
  166. try:
  167. self.pg_cursor.execute("""
  168. SELECT column_name, character_maximum_length
  169. FROM information_schema.columns
  170. WHERE table_name = %s
  171. """, (table_name.lower(),))
  172. lengths = {}
  173. for row in self.pg_cursor.fetchall():
  174. column_name, max_length = row
  175. if max_length:
  176. lengths[column_name] = max_length
  177. self.column_lengths[table_name] = lengths
  178. return lengths
  179. except Exception:
  180. return {}
  181. def transform_data(self, row, columns, pg_structure):
  182. """根据目标类型转换数据"""
  183. transformed = []
  184. for i, value in enumerate(row):
  185. col_name = columns[i]
  186. col_info = pg_structure.get(col_name.lower(), {})
  187. pg_type = col_info.get('data_type')
  188. max_length = col_info.get('max_length')
  189. # 处理布尔字段转换
  190. if pg_type in ('boolean', 'bool'):
  191. # 将整数0/1转换为布尔值
  192. if value in (0, '0', False):
  193. transformed.append(False)
  194. elif value in (1, '1', True):
  195. transformed.append(True)
  196. else:
  197. # 无法转换的值保持原样(可能会出错)
  198. transformed.append(value)
  199. # 处理字符串长度限制
  200. elif pg_type in ('character varying', 'varchar', 'char', 'text') and max_length and isinstance(value, str):
  201. if len(value) > max_length:
  202. # 截断超过长度的字符串
  203. truncated = value[:max_length]
  204. self.log(f" 警告: 字段 '{col_name}' 值被截断 ({len(value)} -> {max_length})")
  205. transformed.append(truncated)
  206. else:
  207. transformed.append(value)
  208. # 处理整数范围限制
  209. elif pg_type in ('integer', 'int', 'int4'):
  210. # 检查值是否在PostgreSQL整数范围内
  211. if isinstance(value, int) and (value < -2147483648 or value > 2147483647):
  212. # 尝试转换为bigint
  213. try:
  214. self.log(f" 警告: 字段 '{col_name}' 值超出整数范围 ({value}),尝试转换为bigint")
  215. transformed.append(value)
  216. except Exception:
  217. self.log(f" 错误: 无法转换字段 '{col_name}' 的值 ({value})")
  218. transformed.append(None)
  219. else:
  220. transformed.append(value)
  221. # 处理bigint范围限制
  222. elif pg_type in ('bigint', 'int8'):
  223. # 检查值是否在bigint范围内
  224. bigint_min = -9223372036854775808
  225. bigint_max = 9223372036854775807
  226. if isinstance(value, int) and (value < bigint_min or value > bigint_max):
  227. self.log(f" 错误: 字段 '{col_name}' 值超出bigint范围 ({value})")
  228. transformed.append(None)
  229. else:
  230. transformed.append(value)
  231. else:
  232. transformed.append(value)
  233. return transformed
  234. def get_primary_keys(self, table):
  235. """获取表的主键列"""
  236. if table in self.primary_keys:
  237. return self.primary_keys[table]
  238. # 查询SQLite表的主键
  239. self.sqlite_cursor.execute(f"PRAGMA table_info({table})")
  240. columns_info = self.sqlite_cursor.fetchall()
  241. # 主键列是那些pk值不为0的列
  242. primary_keys = []
  243. for col_info in columns_info:
  244. # col_info格式: (cid, name, type, notnull, dflt_value, pk)
  245. if col_info[5] != 0: # pk列不为0表示是主键
  246. primary_keys.append(col_info[1])
  247. self.primary_keys[table] = primary_keys
  248. return primary_keys
  249. def check_dependencies_migrated(self, table):
  250. """检查依赖的表是否已经成功迁移"""
  251. dependencies = self.reverse_dependencies.get(table, [])
  252. if not dependencies:
  253. return True
  254. self.log(f" 检查 {table} 的依赖表迁移状态...")
  255. all_migrated = True
  256. for dep_table in dependencies:
  257. if dep_table in self.migrated_tables:
  258. self.log(f" ✓ {dep_table} 已迁移")
  259. elif dep_table in self.failed_tables:
  260. self.log(f" ✗ {dep_table} 迁移失败")
  261. all_migrated = False
  262. else:
  263. self.log(f" ? {dep_table} 尚未迁移")
  264. all_migrated = False
  265. return all_migrated
  266. def handle_integer_overflow(self, table, columns, pg_structure, row):
  267. """处理整数超出范围错误"""
  268. # 找出导致问题的字段
  269. for i, value in enumerate(row):
  270. col_name = columns[i]
  271. col_info = pg_structure.get(col_name.lower(), {})
  272. pg_type = col_info.get('data_type')
  273. if pg_type in ('integer', 'int', 'int4') and isinstance(value, int):
  274. if value < -2147483648 or value > 2147483647:
  275. self.log(f" 检测到字段 '{col_name}' 值超出范围 ({value})")
  276. # 尝试将字段类型改为bigint
  277. try:
  278. self.log(f" 尝试将字段 '{col_name}' 类型改为 bigint")
  279. alter_query = sql.SQL("""
  280. ALTER TABLE {} ALTER COLUMN {} TYPE BIGINT;
  281. """).format(
  282. sql.Identifier(table.lower()),
  283. sql.Identifier(col_name)
  284. )
  285. self.pg_cursor.execute(alter_query)
  286. self.pg_conn.commit()
  287. self.log(f" 成功将字段 '{col_name}' 类型改为 bigint")
  288. # 更新表结构信息
  289. pg_structure = self.get_pg_table_structure(table)
  290. return True
  291. except Exception as e:
  292. self.log(f" 修改字段类型失败: {str(e)}")
  293. self.pg_conn.rollback()
  294. return False
  295. return False
  296. def migrate_table(self, table):
  297. """迁移单个表"""
  298. self.log(f"\n准备迁移表: {table}")
  299. # 检查是否需要跳过
  300. if table in self.skip_tables:
  301. self.log(f" 跳过系统表: {table}")
  302. return True
  303. # 检查依赖表是否已迁移
  304. if not self.check_dependencies_migrated(table):
  305. self.log(f" 依赖表未完全迁移,暂时跳过 {table}")
  306. return False
  307. # 获取 SQLite 表结构
  308. self.sqlite_cursor.execute(f"PRAGMA table_info({table})")
  309. columns_info = self.sqlite_cursor.fetchall()
  310. columns = [col[1] for col in columns_info]
  311. # 获取主键
  312. primary_keys = self.get_primary_keys(table)
  313. # 获取数据
  314. self.sqlite_cursor.execute(f"SELECT * FROM {table}")
  315. rows = self.sqlite_cursor.fetchall()
  316. if not rows:
  317. self.log(f" 表 {table} 为空,跳过")
  318. self.migrated_tables.add(table)
  319. return True
  320. # 获取 PostgreSQL 表结构
  321. pg_structure = self.get_pg_table_structure(table)
  322. # 获取字段长度限制
  323. column_lengths = self.get_column_lengths(table)
  324. # 创建 PostgreSQL 插入语句
  325. placeholders = ', '.join(['%s'] * len(columns))
  326. column_names = ', '.join([f'"{col}"' for col in columns])
  327. # 构建插入语句,根据主键是否存在添加ON CONFLICT子句
  328. if primary_keys:
  329. # 使用主键处理冲突
  330. conflict_columns = ', '.join([f'"{pk}"' for pk in primary_keys])
  331. insert_query = sql.SQL("""
  332. INSERT INTO {} ({})
  333. VALUES ({})
  334. ON CONFLICT ({}) DO NOTHING
  335. """).format(
  336. sql.Identifier(table.lower()),
  337. sql.SQL(column_names),
  338. sql.SQL(placeholders),
  339. sql.SQL(conflict_columns)
  340. )
  341. else:
  342. # 没有主键,使用普通插入语句
  343. insert_query = sql.SQL("""
  344. INSERT INTO {} ({})
  345. VALUES ({})
  346. """).format(
  347. sql.Identifier(table.lower()),
  348. sql.SQL(column_names),
  349. sql.SQL(placeholders)
  350. )
  351. # 用户确认
  352. self.log(f" 表 {table} 有 {len(rows)} 行数据需要迁移")
  353. if primary_keys:
  354. self.log(f" 使用主键 ({', '.join(primary_keys)}) 处理冲突")
  355. else:
  356. self.log(" 警告: 表没有主键,可能产生重复数据")
  357. # 显示字段长度限制
  358. if column_lengths:
  359. self.log(" 字段长度限制:")
  360. for col, max_len in column_lengths.items():
  361. self.log(f" {col}: {max_len} 字符")
  362. confirm = input(" 按 Enter 键开始迁移,或输入 's' 跳过此表: ")
  363. if confirm.lower() == 's':
  364. self.log(f" 已跳过表 {table}")
  365. return True
  366. # 迁移数据
  367. try:
  368. success_count = 0
  369. error_count = 0
  370. for row in rows:
  371. try:
  372. # 转换数据类型
  373. transformed_row = self.transform_data(row, columns, pg_structure)
  374. self.pg_cursor.execute(insert_query, transformed_row)
  375. success_count += 1
  376. except psycopg2.IntegrityError as e:
  377. error_count += 1
  378. if 'foreign key constraint' in str(e):
  379. self.log(f" 外键约束错误: {str(e)}")
  380. elif 'integer out of range' in str(e):
  381. # 处理整数超出范围错误
  382. self.log(f" 整数超出范围错误: {str(e)}")
  383. # 尝试将字段类型改为bigint
  384. self.handle_integer_overflow(table, columns, pg_structure, row)
  385. else:
  386. self.log(f" 完整性错误: {str(e)}")
  387. self.pg_conn.rollback()
  388. except Exception as e:
  389. error_count += 1
  390. self.log(f" 行插入失败: {str(e)}")
  391. self.pg_conn.rollback()
  392. self.pg_conn.commit()
  393. if error_count == 0:
  394. self.log(f" 成功迁移 {success_count} 行数据")
  395. self.migrated_tables.add(table)
  396. return True
  397. else:
  398. self.log(f" 部分迁移: {success_count} 行成功, {error_count} 行失败")
  399. self.failed_tables.add(table)
  400. return False
  401. except Exception as e:
  402. self.pg_conn.rollback()
  403. self.log(f" 迁移表 {table} 时出错: {str(e)}")
  404. self.failed_tables.add(table)
  405. return False
  406. def migrate(self):
  407. """执行迁移"""
  408. if not self.connect():
  409. return False
  410. try:
  411. # 询问是否清空 PostgreSQL 数据
  412. self.log("\n是否在迁移前清空 PostgreSQL 数据库?")
  413. self.log("1. 清空所有数据 (确保无重复)")
  414. self.log("2. 不清空 (可能产生重复数据)")
  415. self.log("3. 退出")
  416. choice = input("请选择 (1/2/3): ")
  417. if choice == '3':
  418. self.log("退出迁移")
  419. return False
  420. if choice == '1':
  421. if not self.clear_postgres_data():
  422. self.log("清空操作失败,退出迁移")
  423. return False
  424. self.analyze_dependencies()
  425. self.log(f"\n开始迁移 {len(self.migration_order)} 个表")
  426. # 按依赖顺序迁移表
  427. remaining_tables = self.migration_order.copy()
  428. max_attempts = 10 # 最大尝试次数
  429. attempt = 0
  430. while remaining_tables and attempt < max_attempts:
  431. attempt += 1
  432. self.log(f"\n第 {attempt} 轮迁移尝试")
  433. # 复制列表以便在迭代时修改
  434. tables_to_migrate = remaining_tables.copy()
  435. remaining_tables = []
  436. for table in tables_to_migrate:
  437. success = self.migrate_table(table)
  438. if not success:
  439. remaining_tables.append(table)
  440. if remaining_tables:
  441. self.log(f"\n本轮迁移后仍有 {len(remaining_tables)} 个表未成功迁移")
  442. # 检查迁移结果
  443. if remaining_tables:
  444. self.log(f"\n迁移完成,但以下表迁移失败:")
  445. for table in remaining_tables:
  446. self.log(f" - {table}")
  447. else:
  448. self.log("\n所有表迁移成功!")
  449. return True
  450. except Exception as e:
  451. self.log(f"迁移过程中发生错误: {str(e)}")
  452. return False
  453. finally:
  454. self.close()
  455. if __name__ == "__main__":
  456. migrator = SQLiteToPostgresMigrator()
  457. migrator.migrate()