backup_utils.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. import os
  2. import subprocess
  3. import logging
  4. from datetime import datetime, timedelta, timezone
  5. from pathlib import Path
  6. import shutil
  7. import time
  8. import psycopg2
  9. from psycopg2 import sql
  10. # -------------------------
  11. # 配置与日志
  12. # -------------------------
  13. def setup_logger():
  14. logger = logging.getLogger("pitr_recovery")
  15. logger.setLevel(logging.INFO)
  16. if not logger.handlers:
  17. log_dir = Path("logs")
  18. log_dir.mkdir(exist_ok=True)
  19. fh = logging.FileHandler(log_dir / "postgres_recovery.log", encoding="utf-8")
  20. ch = logging.StreamHandler()
  21. fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
  22. fh.setFormatter(fmt)
  23. ch.setFormatter(fmt)
  24. logger.addHandler(fh)
  25. logger.addHandler(ch)
  26. return logger
  27. logger = setup_logger()
  28. # -------------------------
  29. # 基本路径/服务检测
  30. # -------------------------
  31. def get_postgres_data_dir():
  32. """获取 PostgreSQL 数据目录"""
  33. possible_paths = [
  34. Path(r"D:/app/postgresql/data"),
  35. Path(r"C:/Program Files/PostgreSQL/16/data"),
  36. Path(r"C:/Program Files/PostgreSQL/15/data"),
  37. Path(r"C:/Program Files/PostgreSQL/14/data"),
  38. Path(r"C:/Program Files/PostgreSQL/13/data"),
  39. Path(r"C:/Program Files/PostgreSQL/12/data"),
  40. ]
  41. env_path = os.environ.get('PGDATA')
  42. if env_path:
  43. p = Path(env_path)
  44. if p.exists():
  45. return p
  46. for p in possible_paths:
  47. if p.exists():
  48. return p
  49. raise FileNotFoundError("无法找到 PostgreSQL 数据目录,请设置 PGDATA 或检查常见路径。")
  50. def get_postgres_bin_path():
  51. """获取 PostgreSQL bin 目录"""
  52. possible_paths = [
  53. Path(r"D:/app/postgresql/bin"),
  54. Path(r"C:/Program Files/PostgreSQL/16/bin"),
  55. Path(r"C:/Program Files/PostgreSQL/15/bin"),
  56. Path(r"C:/Program Files/PostgreSQL/14/bin"),
  57. Path(r"C:/Program Files/PostgreSQL/13/bin"),
  58. Path(r"C:/Program Files/PostgreSQL/12/bin"),
  59. ]
  60. env_path = os.environ.get('PG_BIN_PATH')
  61. if env_path:
  62. p = Path(env_path)
  63. if p.exists():
  64. return p
  65. for p in possible_paths:
  66. if p.exists():
  67. return p
  68. # 尝试从 PATH 中查找
  69. return None
  70. def get_postgres_service_name():
  71. """检测 PostgreSQL 服务名称"""
  72. candidates = [
  73. "postgresql-x64-16",
  74. "postgresql-x64-15",
  75. "postgresql-x64-14",
  76. "postgresql-x64-13",
  77. "postgresql-x64-12",
  78. "postgresql"
  79. ]
  80. for svc in candidates:
  81. try:
  82. result = subprocess.run(
  83. ["sc", "query", svc],
  84. capture_output=True,
  85. text=True,
  86. creationflags=subprocess.CREATE_NO_WINDOW
  87. )
  88. if "RUNNING" in result.stdout:
  89. return svc
  90. except Exception:
  91. continue
  92. return candidates[0]
  93. def is_postgres_service_running():
  94. """检查 PostgreSQL 服务是否运行"""
  95. svc = get_postgres_service_name()
  96. try:
  97. result = subprocess.run(
  98. ["sc", "query", svc],
  99. capture_output=True,
  100. text=True,
  101. creationflags=subprocess.CREATE_NO_WINDOW
  102. )
  103. return "RUNNING" in result.stdout
  104. except Exception:
  105. return False
  106. def stop_postgres_service():
  107. """停止 PostgreSQL 服务"""
  108. svc = get_postgres_service_name()
  109. logger.info(f"停止 PostgreSQL 服务: {svc}")
  110. # 尝试 net stop
  111. r = subprocess.run(
  112. ["net", "stop", svc],
  113. capture_output=True,
  114. text=True,
  115. creationflags=subprocess.CREATE_NO_WINDOW
  116. )
  117. if r.returncode == 0:
  118. logger.info("服务已停止")
  119. return True
  120. # 尝试 sc stop
  121. r = subprocess.run(
  122. ["sc", "stop", svc],
  123. capture_output=True,
  124. text=True,
  125. creationflags=subprocess.CREATE_NO_WINDOW
  126. )
  127. if r.returncode == 0:
  128. logger.info("服务已停止 (sc stop)")
  129. return True
  130. # 尝试 taskkill
  131. logger.warning("通过服务接口无法停止,尝试 taskkill 强制结束 postgres.exe")
  132. subprocess.run(
  133. ["taskkill", "/F", "/IM", "postgres.exe"],
  134. capture_output=True,
  135. text=True,
  136. creationflags=subprocess.CREATE_NO_WINDOW
  137. )
  138. # 确认服务已停止
  139. max_wait = 60 # 最大等待60秒
  140. for _ in range(max_wait // 5):
  141. if not is_postgres_service_running():
  142. logger.info("服务已确认停止")
  143. return True
  144. time.sleep(5)
  145. logger.error("服务停止超时")
  146. return False
  147. def start_postgres_service():
  148. """启动 PostgreSQL 服务"""
  149. svc = get_postgres_service_name()
  150. logger.info(f"启动 PostgreSQL 服务: {svc}")
  151. try:
  152. r = subprocess.run(
  153. ["net", "start", svc],
  154. capture_output=True,
  155. text=True,
  156. creationflags=subprocess.CREATE_NO_WINDOW
  157. )
  158. if r.returncode == 0:
  159. logger.info("服务已启动")
  160. return True
  161. r = subprocess.run(
  162. ["sc", "start", svc],
  163. capture_output=True,
  164. text=True,
  165. creationflags=subprocess.CREATE_NO_WINDOW
  166. )
  167. if r.returncode == 0:
  168. logger.info("服务已启动 (sc start)")
  169. return True
  170. logger.error(f"启动服务失败: {r.stderr}")
  171. return False
  172. except Exception as e:
  173. logger.error(f"启动服务异常: {e}")
  174. return False
  175. # -------------------------
  176. # 基础备份(pg_basebackup)
  177. # -------------------------
  178. def perform_base_backup(pg_superuser='postgres', pg_password='abc@1234'):
  179. """执行基础备份"""
  180. pg_bin = get_postgres_bin_path()
  181. if not pg_bin:
  182. raise RuntimeError("无法找到 PostgreSQL bin 目录")
  183. pg_basebackup = pg_bin / "pg_basebackup.exe"
  184. now = datetime.now()
  185. dest = Path(f"E:/code/backup/postgres/base_backup/{now.strftime('%Y%m%d_%H%M%S')}")
  186. dest.mkdir(parents=True, exist_ok=True)
  187. cmd = [
  188. str(pg_basebackup),
  189. "-D", str(dest),
  190. "-F", "p",
  191. "-X", "f",
  192. "-P",
  193. "-U", pg_superuser
  194. ]
  195. env = os.environ.copy()
  196. env['PGPASSWORD'] = pg_password
  197. logger.info("开始基础备份: " + " ".join(cmd))
  198. r = subprocess.run(
  199. cmd,
  200. env=env,
  201. capture_output=True,
  202. text=True,
  203. creationflags=subprocess.CREATE_NO_WINDOW
  204. )
  205. if r.returncode != 0:
  206. logger.error(r.stderr)
  207. raise RuntimeError("pg_basebackup 失败: " + r.stderr)
  208. # 验证备份完整性
  209. if not (dest / "backup_label").exists():
  210. raise RuntimeError("基础备份不完整,缺少 backup_label 文件")
  211. logger.info(f"基础备份完成: {dest}")
  212. return str(dest)
  213. # -------------------------
  214. # 检查数据库状态
  215. # -------------------------
  216. def check_database_status(dbname='postgres', user='postgres', password='abc@1234',
  217. host='localhost', port=5432):
  218. """
  219. 检查数据库状态
  220. 返回 (is_running, is_in_recovery)
  221. """
  222. try:
  223. conn = psycopg2.connect(
  224. dbname=dbname,
  225. user=user,
  226. password=password,
  227. host=host,
  228. port=port,
  229. connect_timeout=5
  230. )
  231. conn.autocommit = True
  232. cur = conn.cursor()
  233. # 检查是否在恢复模式
  234. cur.execute("SELECT pg_is_in_recovery()")
  235. in_recovery = cur.fetchone()[0]
  236. cur.close()
  237. conn.close()
  238. return True, in_recovery
  239. except Exception:
  240. return False, False
  241. # -------------------------
  242. # 主恢复函数:恢复到基础备份
  243. # -------------------------
  244. def restore_to_base_backup(base_backup_dir):
  245. """
  246. 执行基础备份恢复
  247. base_backup_dir: 基础备份目录路径
  248. """
  249. data_dir = get_postgres_data_dir()
  250. logger.info(f"开始恢复到基础备份: {base_backup_dir}")
  251. backup_data_dir = None
  252. try:
  253. # 1. 停止服务
  254. if not stop_postgres_service():
  255. raise RuntimeError("无法停止 PostgreSQL 服务")
  256. time.sleep(5) # 等待服务完全停止
  257. # 2. 备份当前数据目录
  258. backup_data_dir = data_dir.with_name(
  259. data_dir.name + "_backup_" + datetime.now().strftime("%Y%m%d_%H%M%S")
  260. )
  261. logger.info(f"备份当前数据目录到: {backup_data_dir}")
  262. if backup_data_dir.exists():
  263. shutil.rmtree(backup_data_dir)
  264. shutil.copytree(data_dir, backup_data_dir)
  265. logger.info("备份完成")
  266. # 3. 安全清空数据目录(保留关键配置文件)
  267. logger.info("安全清空数据目录...")
  268. exclude_files = {"postgresql.conf", "pg_hba.conf", "pg_ident.conf"}
  269. for item in data_dir.iterdir():
  270. if item.name in exclude_files:
  271. logger.info(f"保留配置文件: {item.name}")
  272. continue
  273. try:
  274. if item.is_dir():
  275. shutil.rmtree(item)
  276. else:
  277. item.unlink()
  278. except Exception as e:
  279. logger.warning("删除 %s 失败: %s", item, e)
  280. logger.info("数据目录已清空")
  281. # 4. 恢复基础备份
  282. logger.info("恢复基础备份到数据目录...")
  283. shutil.copytree(base_backup_dir, data_dir, dirs_exist_ok=True)
  284. logger.info("基础备份已恢复")
  285. # 5. 删除恢复相关文件(确保不进入恢复模式)
  286. for recovery_file in ["recovery.signal", "standby.signal", "recovery.conf"]:
  287. file_path = data_dir / recovery_file
  288. if file_path.exists():
  289. logger.info(f"删除恢复文件: {recovery_file}")
  290. file_path.unlink()
  291. # 6. 清理postgresql.auto.conf中的恢复设置
  292. auto_conf = data_dir / "postgresql.auto.conf"
  293. if auto_conf.exists():
  294. logger.info("清理postgresql.auto.conf中的恢复设置")
  295. with open(auto_conf, "r", encoding="utf-8") as f:
  296. lines = f.readlines()
  297. # 过滤掉恢复相关设置
  298. new_lines = [
  299. line for line in lines
  300. if not line.strip().startswith((
  301. "restore_command",
  302. "recovery_target_time",
  303. "recovery_target_timeline",
  304. "recovery_target_action"
  305. ))
  306. ]
  307. with open(auto_conf, "w", encoding="utf-8") as f:
  308. f.writelines(new_lines)
  309. # 7. 启动服务
  310. if not start_postgres_service():
  311. raise RuntimeError("无法启动 PostgreSQL 服务")
  312. # 8. 验证服务状态
  313. max_retries = 10
  314. for i in range(max_retries):
  315. try:
  316. is_running, in_recovery = check_database_status()
  317. if is_running and not in_recovery:
  318. logger.info("数据库已成功启动,不在恢复模式")
  319. return True
  320. time.sleep(2)
  321. except Exception:
  322. if i == max_retries - 1:
  323. logger.error("无法验证数据库状态")
  324. time.sleep(2)
  325. return True
  326. except Exception as e:
  327. logger.error(f"恢复到基础备份失败: {e}")
  328. # 回滚:恢复原始数据目录
  329. try:
  330. if backup_data_dir and backup_data_dir.exists():
  331. logger.info("开始回滚:恢复原始数据目录...")
  332. try:
  333. stop_postgres_service()
  334. except:
  335. pass
  336. # 清空当前数据目录
  337. for item in data_dir.iterdir():
  338. try:
  339. if item.is_dir():
  340. shutil.rmtree(item)
  341. else:
  342. item.unlink()
  343. except Exception as ex:
  344. logger.warning("删除 %s 失败: %s", item, ex)
  345. # 复制备份回去
  346. shutil.copytree(backup_data_dir, data_dir, dirs_exist_ok=True)
  347. # 启动服务
  348. if start_postgres_service():
  349. logger.info("回滚完成并已启动服务(原始数据已恢复)")
  350. else:
  351. logger.error("回滚后启动服务失败,请手动检查")
  352. except Exception as roll_err:
  353. logger.error(f"回滚失败: {roll_err}")
  354. raise
  355. # -------------------------
  356. # 辅助:解析时间字符串
  357. # 下面的代码用不了,直接基础备份恢复
  358. # -------------------------
  359. def parse_target_time(t):
  360. """解析目标时间并转换为 UTC"""
  361. if isinstance(t, datetime):
  362. return t.astimezone(timezone.utc)
  363. # 尝试多种格式
  364. for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
  365. try:
  366. dt = datetime.strptime(t, fmt)
  367. return dt.astimezone(timezone.utc)
  368. except Exception:
  369. continue
  370. raise ValueError("无法解析目标时间,请使用 'YYYY-MM-DD HH:MM:SS' 格式")
  371. # -------------------------
  372. # 检查恢复进度(通过数据库)
  373. # -------------------------
  374. def check_recovery_progress(dbname='postgres', user='postgres', password='abc@1234',
  375. host='localhost', port=5432, target_dt=None):
  376. """
  377. 检查恢复进度
  378. 返回 (in_recovery_bool, last_replay_timestamp_or_None, meets_target)
  379. """
  380. try:
  381. conn = psycopg2.connect(
  382. dbname=dbname,
  383. user=user,
  384. password=password,
  385. host=host,
  386. port=port,
  387. connect_timeout=5
  388. )
  389. conn.autocommit = True
  390. cur = conn.cursor()
  391. # 检查是否在恢复模式
  392. cur.execute("SELECT pg_is_in_recovery()")
  393. in_recovery = cur.fetchone()[0]
  394. last_ts = None
  395. try:
  396. # 获取最后回放时间
  397. cur.execute("SELECT pg_last_xact_replay_timestamp()")
  398. res = cur.fetchone()[0]
  399. if res:
  400. last_ts = res.astimezone(timezone.utc)
  401. except Exception as e:
  402. logger.debug("查询 pg_last_xact_replay_timestamp 失败: %s", e)
  403. cur.close()
  404. conn.close()
  405. # 检查是否达到目标时间
  406. ok = False
  407. if target_dt and last_ts:
  408. # 允许30秒的时间差
  409. time_diff = (target_dt - last_ts).total_seconds()
  410. ok = time_diff <= 30
  411. return in_recovery, last_ts, ok
  412. except Exception as e:
  413. logger.debug("数据库连接/查询失败: %s", e)
  414. return None, None, False
  415. # -------------------------
  416. # 主恢复函数:PITR 恢复到指定时间点
  417. # -------------------------
  418. def restore_to_point_in_time(target_time, base_backup_dir,
  419. wal_archive_dir=r"E:\code\backup\postgres\wal_archive",
  420. pg_superuser='postgres', pg_password='abc@1234',
  421. db_check_name='postgres', db_check_user='postgres',
  422. db_check_password='abc@1234', db_host='localhost',
  423. db_port=5432, max_wait_minutes=30):
  424. """
  425. 执行时间点恢复 (PITR)
  426. target_time: 'YYYY-MM-DD HH:MM:SS' 或 datetime
  427. base_backup_dir: 基础备份目录
  428. wal_archive_dir: WAL归档目录
  429. """
  430. target_dt = parse_target_time(target_time)
  431. data_dir = get_postgres_data_dir()
  432. logger.info(f"PITR 开始: 目标时间 {target_dt}, 数据目录: {data_dir}, 基础备份: {base_backup_dir}")
  433. backup_data_dir = None
  434. try:
  435. # 1. 停止服务
  436. if not stop_postgres_service():
  437. raise RuntimeError("无法停止 PostgreSQL 服务")
  438. time.sleep(5) # 等待服务完全停止
  439. # 2. 备份当前数据目录
  440. backup_data_dir = data_dir.with_name(
  441. data_dir.name + "_backup_" + datetime.now().strftime("%Y%m%d_%H%M%S")
  442. )
  443. logger.info(f"备份当前数据目录到: {backup_data_dir}")
  444. if backup_data_dir.exists():
  445. shutil.rmtree(backup_data_dir)
  446. shutil.copytree(data_dir, backup_data_dir)
  447. logger.info("备份完成")
  448. # 3. 安全清空数据目录(保留关键配置文件)
  449. logger.info("安全清空数据目录...")
  450. exclude_files = {"postgresql.conf", "pg_hba.conf", "pg_ident.conf"}
  451. for item in data_dir.iterdir():
  452. if item.name in exclude_files:
  453. logger.info(f"保留配置文件: {item.name}")
  454. continue
  455. try:
  456. if item.is_dir():
  457. shutil.rmtree(item)
  458. else:
  459. item.unlink()
  460. except Exception as e:
  461. logger.warning("删除 %s 失败: %s", item, e)
  462. logger.info("数据目录已清空")
  463. # 4. 恢复基础备份
  464. logger.info("恢复基础备份到数据目录...")
  465. shutil.copytree(base_backup_dir, data_dir, dirs_exist_ok=True)
  466. logger.info("基础备份已恢复")
  467. # 5. 创建恢复信号文件 (PostgreSQL 12+)
  468. recovery_signal = data_dir / "recovery.signal"
  469. logger.info(f"创建恢复信号: {recovery_signal}")
  470. recovery_signal.touch()
  471. # 6. 配置恢复参数
  472. auto_conf = data_dir / "postgresql.auto.conf"
  473. safe_wal_dir = wal_archive_dir.replace("\\", "/")
  474. restore_command = f'copy "{wal_archive_dir}\\%f" "%p"'
  475. logger.info(f"写入恢复配置到 {auto_conf}")
  476. with open(auto_conf, "w", encoding="utf-8") as f:
  477. f.write(f"restore_command = '{restore_command}'\n")
  478. f.write(f"recovery_target_time = '{target_dt.isoformat()}'\n")
  479. f.write("recovery_target_timeline = 'latest'\n")
  480. logger.info("恢复配置写入完成")
  481. # 7. 启动服务
  482. if not start_postgres_service():
  483. raise RuntimeError("无法启动 PostgreSQL 服务(恢复阶段)")
  484. logger.info("服务已启动,开始轮询恢复进度...")
  485. # 8. 等待恢复完成
  486. timeout = timedelta(minutes=max_wait_minutes)
  487. start_time = datetime.now(timezone.utc)
  488. last_logged = None
  489. recovery_completed = False
  490. recovery_signal_file = data_dir / "recovery.signal"
  491. while True:
  492. elapsed = datetime.now(timezone.utc) - start_time
  493. if elapsed > timeout:
  494. raise RuntimeError(f"恢复超时(超过 {max_wait_minutes} 分钟)")
  495. try:
  496. in_recovery, last_replay_ts, meets_target = check_recovery_progress(
  497. dbname=db_check_name,
  498. user=db_check_user,
  499. password=db_check_password,
  500. host=db_host,
  501. port=db_port,
  502. target_dt=target_dt
  503. )
  504. except Exception as e:
  505. logger.error(f"检查恢复进度时出错: {e}")
  506. in_recovery, last_replay_ts, meets_target = None, None, False
  507. # 可能无法连接(服务刚启动)
  508. if in_recovery is None:
  509. logger.info("尚未能连接到数据库,等待 5 秒后重试...")
  510. time.sleep(5)
  511. continue
  512. # 每分钟打印一次恢复进度
  513. current_time = datetime.now(timezone.utc).strftime("%H:%M:%S")
  514. if last_logged != current_time:
  515. if last_replay_ts:
  516. time_diff = (target_dt - last_replay_ts).total_seconds()
  517. logger.info(f"恢复进度: {last_replay_ts} (差 {time_diff:.1f} 秒)")
  518. else:
  519. logger.info(f"恢复中... pg_is_in_recovery={in_recovery}")
  520. last_logged = current_time
  521. # 检查恢复完成条件(PostgreSQL 12+)
  522. if not in_recovery and not recovery_signal_file.exists():
  523. logger.info("恢复完成:数据库已退出恢复模式。")
  524. recovery_completed = True
  525. break
  526. elif meets_target:
  527. logger.info("已满足目标时间条件,等待最终完成...")
  528. time.sleep(5)
  529. continue
  530. # 继续等待
  531. time.sleep(5)
  532. if recovery_completed:
  533. logger.info(f"PITR 成功:数据库已恢复到目标时间 {target_dt}")
  534. return True
  535. else:
  536. logger.warning("恢复过程未正常完成")
  537. return True
  538. except Exception as e:
  539. logger.error(f"PITR 恢复失败: {e}")
  540. # 回滚:恢复原始数据目录
  541. try:
  542. if backup_data_dir and backup_data_dir.exists():
  543. logger.info("开始回滚:恢复原始数据目录...")
  544. try:
  545. stop_postgres_service()
  546. except:
  547. pass
  548. # 清空当前数据目录
  549. for item in data_dir.iterdir():
  550. try:
  551. if item.is_dir():
  552. shutil.rmtree(item)
  553. else:
  554. item.unlink()
  555. except Exception as ex:
  556. logger.warning("删除 %s 失败: %s", item, ex)
  557. # 复制备份回去
  558. shutil.copytree(backup_data_dir, data_dir, dirs_exist_ok=True)
  559. # 清理恢复标识文件
  560. for fn in ("recovery.signal", "postgresql.auto.conf"):
  561. p = data_dir / fn
  562. if p.exists():
  563. try:
  564. p.unlink()
  565. except Exception:
  566. pass
  567. # 启动服务
  568. if start_postgres_service():
  569. logger.info("回滚完成并已启动服务(原始数据已恢复)")
  570. else:
  571. logger.error("回滚后启动服务失败,请手动检查")
  572. except Exception as roll_err:
  573. logger.error(f"回滚失败: {roll_err}")
  574. raise