# migrate_sqlite_to_postgres.py
# Copy table data from a SQLite database into an existing PostgreSQL schema.
  1. import sqlite3
  2. import psycopg2
  3. import os
  4. import sys
  5. from dotenv import load_dotenv
  6. from psycopg2 import sql
  7. from collections import defaultdict
  8. # 加载环境变量
  9. load_dotenv()
  10. class SQLiteToPostgresMigrator:
  11. def __init__(self):
  12. # SQLite 配置
  13. self.sqlite_db_path = os.getenv('SQLITE_DB_PATH', 'db.sqlite3')
  14. # PostgreSQL 配置
  15. self.pg_config = {
  16. 'dbname': os.getenv('PG_DB_NAME', 'wmsdb'),
  17. 'user': os.getenv('PG_DB_USER', 'wmsuser'),
  18. 'password': os.getenv('PG_DB_PASSWORD', 'abc@1234'),
  19. 'host': os.getenv('PG_DB_HOST', 'localhost'),
  20. 'port': os.getenv('PG_DB_PORT', '5432')
  21. }
  22. # 连接
  23. self.sqlite_conn = None
  24. self.sqlite_cursor = None
  25. self.pg_conn = None
  26. self.pg_cursor = None
  27. # 表依赖关系
  28. self.table_dependencies = defaultdict(list)
  29. self.reverse_dependencies = defaultdict(list)
  30. self.migration_order = []
  31. # 需要跳过的系统表
  32. self.skip_tables = [
  33. 'auth_permission',
  34. 'django_content_type',
  35. 'django_migrations',
  36. 'auth_group_permissions',
  37. 'auth_user_groups',
  38. 'auth_user_user_permissions'
  39. ]
  40. # 迁移状态
  41. self.migrated_tables = set()
  42. self.failed_tables = set()
  43. # 存储表的主键信息
  44. self.primary_keys = {}
  45. # 存储字段长度限制
  46. self.column_lengths = {}
  47. def connect(self):
  48. """连接到数据库"""
  49. try:
  50. # SQLite 连接
  51. self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
  52. self.sqlite_cursor = self.sqlite_conn.cursor()
  53. # PostgreSQL 连接
  54. self.pg_conn = psycopg2.connect(**self.pg_config)
  55. self.pg_cursor = self.pg_conn.cursor()
  56. print(f"连接成功: SQLite({self.sqlite_db_path}) → PostgreSQL({self.pg_config['dbname']})")
  57. return True
  58. except Exception as e:
  59. print(f"连接失败: {str(e)}")
  60. return False
  61. def close(self):
  62. """关闭数据库连接"""
  63. for cursor in [self.sqlite_cursor, self.pg_cursor]:
  64. if cursor: cursor.close()
  65. for conn in [self.sqlite_conn, self.pg_conn]:
  66. if conn: conn.close()
  67. def clear_postgres_data(self):
  68. """清空 PostgreSQL 数据库中的所有数据"""
  69. print("\n警告: 此操作将清空 PostgreSQL 数据库中的所有数据!")
  70. confirm = input("确定要清空 PostgreSQL 数据库吗? (y/n): ")
  71. if confirm.lower() != 'y':
  72. print("取消清空操作")
  73. return False
  74. try:
  75. # 获取所有表名
  76. self.pg_cursor.execute("""
  77. SELECT table_name
  78. FROM information_schema.tables
  79. WHERE table_schema = 'public'
  80. """)
  81. tables = [row[0] for row in self.pg_cursor.fetchall()]
  82. # 使用 TRUNCATE TABLE ... CASCADE 替代 session_replication_role
  83. for table in tables:
  84. try:
  85. self.pg_cursor.execute(f"TRUNCATE TABLE {table} CASCADE;")
  86. print(f"已清空表: {table}")
  87. except Exception as e:
  88. print(f"清空表 {table} 时出错: {str(e)}")
  89. continue
  90. self.pg_conn.commit()
  91. print("成功清空 PostgreSQL 数据库中的所有数据")
  92. return True
  93. except Exception as e:
  94. self.pg_conn.rollback()
  95. print(f"清空数据库时出错: {str(e)}")
  96. return False
  97. def analyze_dependencies(self):
  98. """分析表依赖关系"""
  99. # 获取 SQLite 中的所有表
  100. self.sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
  101. tables = [row[0] for row in self.sqlite_cursor.fetchall()]
  102. # 排除系统表
  103. excluded_tables = ['sqlite_sequence'] + self.skip_tables
  104. tables = [t for t in tables if t not in excluded_tables]
  105. # 分析外键依赖
  106. for table in tables:
  107. self.sqlite_cursor.execute(f"PRAGMA foreign_key_list({table})")
  108. for fk in self.sqlite_cursor.fetchall():
  109. ref_table = fk[2]
  110. if ref_table in tables:
  111. self.table_dependencies[ref_table].append(table)
  112. self.reverse_dependencies[table].append(ref_table)
  113. # 创建迁移顺序 (拓扑排序)
  114. visited = set()
  115. order = []
  116. def visit(table):
  117. if table in visited:
  118. return
  119. visited.add(table)
  120. for dependent in self.table_dependencies.get(table, []):
  121. visit(dependent)
  122. order.append(table)
  123. for table in tables:
  124. if table not in visited:
  125. visit(table)
  126. self.migration_order = order
  127. print(f"分析完成: 发现 {len(tables)} 个表, 依赖顺序已确定")
  128. def get_pg_table_structure(self, table_name):
  129. """获取 PostgreSQL 表结构"""
  130. try:
  131. self.pg_cursor.execute("""
  132. SELECT column_name, data_type, character_maximum_length
  133. FROM information_schema.columns
  134. WHERE table_name = %s
  135. """, (table_name.lower(),))
  136. structure = {}
  137. for row in self.pg_cursor.fetchall():
  138. column_name, data_type, max_length = row
  139. structure[column_name] = {
  140. 'data_type': data_type,
  141. 'max_length': max_length
  142. }
  143. return structure
  144. except Exception:
  145. return {}
  146. def get_column_lengths(self, table_name):
  147. """获取 PostgreSQL 表的字段长度限制"""
  148. if table_name in self.column_lengths:
  149. return self.column_lengths[table_name]
  150. try:
  151. self.pg_cursor.execute("""
  152. SELECT column_name, character_maximum_length
  153. FROM information_schema.columns
  154. WHERE table_name = %s
  155. """, (table_name.lower(),))
  156. lengths = {}
  157. for row in self.pg_cursor.fetchall():
  158. column_name, max_length = row
  159. if max_length:
  160. lengths[column_name] = max_length
  161. self.column_lengths[table_name] = lengths
  162. return lengths
  163. except Exception:
  164. return {}
  165. def transform_data(self, row, columns, pg_structure):
  166. """根据目标类型转换数据"""
  167. transformed = []
  168. for i, value in enumerate(row):
  169. col_name = columns[i]
  170. col_info = pg_structure.get(col_name.lower(), {})
  171. pg_type = col_info.get('data_type')
  172. max_length = col_info.get('max_length')
  173. # 处理布尔字段转换
  174. if pg_type in ('boolean', 'bool'):
  175. # 将整数0/1转换为布尔值
  176. if value in (0, '0', False):
  177. transformed.append(False)
  178. elif value in (1, '1', True):
  179. transformed.append(True)
  180. else:
  181. # 无法转换的值保持原样(可能会出错)
  182. transformed.append(value)
  183. # 处理字符串长度限制
  184. elif pg_type in ('character varying', 'varchar', 'char', 'text') and max_length and isinstance(value, str):
  185. if len(value) > max_length:
  186. # 截断超过长度的字符串
  187. truncated = value[:max_length]
  188. print(f" 警告: 字段 '{col_name}' 值被截断 ({len(value)} -> {max_length})")
  189. transformed.append(truncated)
  190. else:
  191. transformed.append(value)
  192. else:
  193. transformed.append(value)
  194. return transformed
  195. def get_primary_keys(self, table):
  196. """获取表的主键列"""
  197. if table in self.primary_keys:
  198. return self.primary_keys[table]
  199. # 查询SQLite表的主键
  200. self.sqlite_cursor.execute(f"PRAGMA table_info({table})")
  201. columns_info = self.sqlite_cursor.fetchall()
  202. # 主键列是那些pk值不为0的列
  203. primary_keys = []
  204. for col_info in columns_info:
  205. # col_info格式: (cid, name, type, notnull, dflt_value, pk)
  206. if col_info[5] != 0: # pk列不为0表示是主键
  207. primary_keys.append(col_info[1])
  208. self.primary_keys[table] = primary_keys
  209. return primary_keys
  210. def check_dependencies_migrated(self, table):
  211. """检查依赖的表是否已经成功迁移"""
  212. dependencies = self.reverse_dependencies.get(table, [])
  213. if not dependencies:
  214. return True
  215. print(f" 检查 {table} 的依赖表迁移状态...")
  216. all_migrated = True
  217. for dep_table in dependencies:
  218. if dep_table in self.migrated_tables:
  219. print(f" ✓ {dep_table} 已迁移")
  220. elif dep_table in self.failed_tables:
  221. print(f" ✗ {dep_table} 迁移失败")
  222. all_migrated = False
  223. else:
  224. print(f" ? {dep_table} 尚未迁移")
  225. all_migrated = False
  226. return all_migrated
  227. def migrate_table(self, table):
  228. """迁移单个表"""
  229. print(f"\n准备迁移表: {table}")
  230. # 检查是否需要跳过
  231. if table in self.skip_tables:
  232. print(f" 跳过系统表: {table}")
  233. return True
  234. # 检查依赖表是否已迁移
  235. if not self.check_dependencies_migrated(table):
  236. print(f" 依赖表未完全迁移,暂时跳过 {table}")
  237. return False
  238. # 获取 SQLite 表结构
  239. self.sqlite_cursor.execute(f"PRAGMA table_info({table})")
  240. columns_info = self.sqlite_cursor.fetchall()
  241. columns = [col[1] for col in columns_info]
  242. # 获取主键
  243. primary_keys = self.get_primary_keys(table)
  244. # 获取数据
  245. self.sqlite_cursor.execute(f"SELECT * FROM {table}")
  246. rows = self.sqlite_cursor.fetchall()
  247. if not rows:
  248. print(f" 表 {table} 为空,跳过")
  249. self.migrated_tables.add(table)
  250. return True
  251. # 获取 PostgreSQL 表结构
  252. pg_structure = self.get_pg_table_structure(table)
  253. # 获取字段长度限制
  254. column_lengths = self.get_column_lengths(table)
  255. # 创建 PostgreSQL 插入语句
  256. placeholders = ', '.join(['%s'] * len(columns))
  257. column_names = ', '.join([f'"{col}"' for col in columns])
  258. # 构建插入语句,根据主键是否存在添加ON CONFLICT子句
  259. if primary_keys:
  260. # 使用主键处理冲突
  261. conflict_columns = ', '.join([f'"{pk}"' for pk in primary_keys])
  262. insert_query = sql.SQL("""
  263. INSERT INTO {} ({})
  264. VALUES ({})
  265. ON CONFLICT ({}) DO NOTHING
  266. """).format(
  267. sql.Identifier(table.lower()),
  268. sql.SQL(column_names),
  269. sql.SQL(placeholders),
  270. sql.SQL(conflict_columns)
  271. )
  272. else:
  273. # 没有主键,使用普通插入语句
  274. insert_query = sql.SQL("""
  275. INSERT INTO {} ({})
  276. VALUES ({})
  277. """).format(
  278. sql.Identifier(table.lower()),
  279. sql.SQL(column_names),
  280. sql.SQL(placeholders)
  281. )
  282. # 用户确认
  283. print(f" 表 {table} 有 {len(rows)} 行数据需要迁移")
  284. if primary_keys:
  285. print(f" 使用主键 ({', '.join(primary_keys)}) 处理冲突")
  286. else:
  287. print(" 警告: 表没有主键,可能产生重复数据")
  288. # 显示字段长度限制
  289. if column_lengths:
  290. print(" 字段长度限制:")
  291. for col, max_len in column_lengths.items():
  292. print(f" {col}: {max_len} 字符")
  293. confirm = input(" 按 Enter 键开始迁移,或输入 's' 跳过此表: ")
  294. if confirm.lower() == 's':
  295. print(f" 已跳过表 {table}")
  296. return True
  297. # 迁移数据
  298. try:
  299. success_count = 0
  300. error_count = 0
  301. for row in rows:
  302. try:
  303. # 转换数据类型
  304. transformed_row = self.transform_data(row, columns, pg_structure)
  305. self.pg_cursor.execute(insert_query, transformed_row)
  306. success_count += 1
  307. except psycopg2.IntegrityError as e:
  308. error_count += 1
  309. if 'foreign key constraint' in str(e):
  310. print(f" 外键约束错误: {str(e)}")
  311. else:
  312. print(f" 完整性错误: {str(e)}")
  313. self.pg_conn.rollback()
  314. except Exception as e:
  315. error_count += 1
  316. print(f" 行插入失败: {str(e)}")
  317. self.pg_conn.rollback()
  318. self.pg_conn.commit()
  319. if error_count == 0:
  320. print(f" 成功迁移 {success_count} 行数据")
  321. self.migrated_tables.add(table)
  322. return True
  323. else:
  324. print(f" 部分迁移: {success_count} 行成功, {error_count} 行失败")
  325. self.failed_tables.add(table)
  326. return False
  327. except Exception as e:
  328. self.pg_conn.rollback()
  329. print(f" 迁移表 {table} 时出错: {str(e)}")
  330. self.failed_tables.add(table)
  331. return False
  332. def migrate(self):
  333. """执行迁移"""
  334. if not self.connect():
  335. return False
  336. try:
  337. # 询问是否清空 PostgreSQL 数据
  338. print("\n是否在迁移前清空 PostgreSQL 数据库?")
  339. print("1. 清空所有数据 (确保无重复)")
  340. print("2. 不清空 (可能产生重复数据)")
  341. print("3. 退出")
  342. choice = input("请选择 (1/2/3): ")
  343. if choice == '3':
  344. print("退出迁移")
  345. return False
  346. if choice == '1':
  347. if not self.clear_postgres_data():
  348. print("清空操作失败,退出迁移")
  349. return False
  350. self.analyze_dependencies()
  351. print(f"\n开始迁移 {len(self.migration_order)} 个表")
  352. # 按依赖顺序迁移表
  353. remaining_tables = self.migration_order.copy()
  354. max_attempts = 10 # 最大尝试次数
  355. attempt = 0
  356. while remaining_tables and attempt < max_attempts:
  357. attempt += 1
  358. print(f"\n第 {attempt} 轮迁移尝试")
  359. # 复制列表以便在迭代时修改
  360. tables_to_migrate = remaining_tables.copy()
  361. remaining_tables = []
  362. for table in tables_to_migrate:
  363. success = self.migrate_table(table)
  364. if not success:
  365. remaining_tables.append(table)
  366. if remaining_tables:
  367. print(f"\n本轮迁移后仍有 {len(remaining_tables)} 个表未成功迁移")
  368. # 检查迁移结果
  369. if remaining_tables:
  370. print(f"\n迁移完成,但以下表迁移失败:")
  371. for table in remaining_tables:
  372. print(f" - {table}")
  373. else:
  374. print("\n所有表迁移成功!")
  375. return True
  376. except Exception as e:
  377. print(f"迁移过程中发生错误: {str(e)}")
  378. return False
  379. finally:
  380. self.close()
  381. if __name__ == "__main__":
  382. migrator = SQLiteToPostgresMigrator()
  383. migrator.migrate()