"""Windows helper script for PostgreSQL base backups, restores and PITR.

The script drives the PostgreSQL Windows service through ``net``/``sc``,
takes base backups with ``pg_basebackup`` and restores a data directory
either to a plain base backup or to a point in time (PITR).
"""

import logging
import os
import shutil
import subprocess
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import psycopg2
from psycopg2 import sql  # noqa: F401  (kept: may be used by other code)

# ``CREATE_NO_WINDOW`` only exists on Windows; resolve it once so that merely
# referencing it never raises AttributeError on other platforms.
_NO_WINDOW = getattr(subprocess, "CREATE_NO_WINDOW", 0)

# Configuration files left in place when the data directory is emptied
# before a restore.
_PRESERVED_CONFIG_FILES = frozenset(
    {"postgresql.conf", "pg_hba.conf", "pg_ident.conf"}
)

# Settings that steer archive recovery; they are stripped from
# postgresql.auto.conf whenever the server must start without recovering.
_RECOVERY_SETTING_PREFIXES = ("restore_command", "recovery_target")

# Name of the service most recently seen RUNNING.  Once the service has been
# stopped no candidate reports RUNNING any more, so this is what lets
# start_postgres_service() restart the same service that was stopped.
_last_running_service = None


# -------------------------
# Configuration and logging
# -------------------------
def setup_logger():
    """Return the shared ``pitr_recovery`` logger.

    On first use a UTF-8 file handler (``logs/postgres_recovery.log``,
    relative to the current working directory) and a console handler are
    attached.  Later calls reuse the already configured logger.
    """
    logger = logging.getLogger("pitr_recovery")
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        fh = logging.FileHandler(
            log_dir / "postgres_recovery.log", encoding="utf-8"
        )
        ch = logging.StreamHandler()
        fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        fh.setFormatter(fmt)
        ch.setFormatter(fmt)
        logger.addHandler(fh)
        logger.addHandler(ch)
    return logger


logger = setup_logger()


# -------------------------
# Basic path / service detection
# -------------------------
def get_postgres_data_dir():
    """Return the PostgreSQL data directory.

    ``PGDATA`` is honoured first (when it points to an existing path),
    then a list of common installation paths is probed.

    Raises:
        FileNotFoundError: if no candidate directory exists.
    """
    possible_paths = [
        Path(r"D:/app/postgresql/data"),
        Path(r"C:/Program Files/PostgreSQL/16/data"),
        Path(r"C:/Program Files/PostgreSQL/15/data"),
        Path(r"C:/Program Files/PostgreSQL/14/data"),
        Path(r"C:/Program Files/PostgreSQL/13/data"),
        Path(r"C:/Program Files/PostgreSQL/12/data"),
    ]
    env_path = os.environ.get('PGDATA')
    if env_path:
        p = Path(env_path)
        if p.exists():
            return p
    for p in possible_paths:
        if p.exists():
            return p
    raise FileNotFoundError("无法找到 PostgreSQL 数据目录,请设置 PGDATA 或检查常见路径。")


def get_postgres_bin_path():
    """Return the PostgreSQL ``bin`` directory, or ``None`` if not found.

    Lookup order: ``PG_BIN_PATH`` environment variable, common installation
    paths, and finally the directory of a ``pg_basebackup`` executable
    found on ``PATH``.
    """
    possible_paths = [
        Path(r"D:/app/postgresql/bin"),
        Path(r"C:/Program Files/PostgreSQL/16/bin"),
        Path(r"C:/Program Files/PostgreSQL/15/bin"),
        Path(r"C:/Program Files/PostgreSQL/14/bin"),
        Path(r"C:/Program Files/PostgreSQL/13/bin"),
        Path(r"C:/Program Files/PostgreSQL/12/bin"),
    ]
    env_path = os.environ.get('PG_BIN_PATH')
    if env_path:
        p = Path(env_path)
        if p.exists():
            return p
    for p in possible_paths:
        if p.exists():
            return p
    # Fall back to PATH (the original only announced this in a comment).
    found = shutil.which("pg_basebackup")
    if found:
        return Path(found).parent
    return None


def get_postgres_service_name():
    """Detect the name of the PostgreSQL Windows service.

    Preference order:
      1. a candidate that ``sc query`` reports as RUNNING;
      2. the service last seen RUNNING by this process (so that a service
         stopped by this script is the one that gets started again);
      3. the first candidate that is installed (``sc query`` exits with 0);
      4. the first candidate name as a last resort.
    """
    global _last_running_service
    candidates = [
        "postgresql-x64-16",
        "postgresql-x64-15",
        "postgresql-x64-14",
        "postgresql-x64-13",
        "postgresql-x64-12",
        "postgresql",
    ]
    first_installed = None
    for svc in candidates:
        try:
            result = subprocess.run(
                ["sc", "query", svc],
                capture_output=True,
                text=True,
                creationflags=_NO_WINDOW,
            )
        except Exception:
            continue
        if "RUNNING" in result.stdout:
            _last_running_service = svc
            return svc
        if first_installed is None and result.returncode == 0:
            first_installed = svc
    return _last_running_service or first_installed or candidates[0]


def is_postgres_service_running():
    """Return True if ``sc query`` reports the detected service as RUNNING."""
    svc = get_postgres_service_name()
    try:
        result = subprocess.run(
            ["sc", "query", svc],
            capture_output=True,
            text=True,
            creationflags=_NO_WINDOW,
        )
        return "RUNNING" in result.stdout
    except Exception:
        return False


def stop_postgres_service():
    """Stop the PostgreSQL service.

    Tries ``net stop``, then ``sc stop``, then force-kills ``postgres.exe``
    and polls (up to 60 seconds) until the service no longer reports
    RUNNING.

    Returns:
        True if the service was stopped, False if it is still running
        after the timeout.
    """
    svc = get_postgres_service_name()
    logger.info(f"停止 PostgreSQL 服务: {svc}")

    # First attempt: net stop
    r = subprocess.run(
        ["net", "stop", svc],
        capture_output=True,
        text=True,
        creationflags=_NO_WINDOW,
    )
    if r.returncode == 0:
        logger.info("服务已停止")
        return True

    # Second attempt: sc stop
    r = subprocess.run(
        ["sc", "stop", svc],
        capture_output=True,
        text=True,
        creationflags=_NO_WINDOW,
    )
    if r.returncode == 0:
        logger.info("服务已停止 (sc stop)")
        return True

    # Last resort: kill the server processes
    logger.warning("通过服务接口无法停止,尝试 taskkill 强制结束 postgres.exe")
    subprocess.run(
        ["taskkill", "/F", "/IM", "postgres.exe"],
        capture_output=True,
        text=True,
        creationflags=_NO_WINDOW,
    )

    # Confirm the service is really down
    max_wait = 60  # seconds
    for _ in range(max_wait // 5):
        if not is_postgres_service_running():
            logger.info("服务已确认停止")
            return True
        time.sleep(5)
    logger.error("服务停止超时")
    return False


def start_postgres_service():
    """Start the PostgreSQL service via ``net start`` or ``sc start``.

    Returns:
        True if one of the commands succeeded, False otherwise (including
        when the command could not be executed at all).
    """
    svc = get_postgres_service_name()
    logger.info(f"启动 PostgreSQL 服务: {svc}")
    try:
        r = subprocess.run(
            ["net", "start", svc],
            capture_output=True,
            text=True,
            creationflags=_NO_WINDOW,
        )
        if r.returncode == 0:
            logger.info("服务已启动")
            return True
        r = subprocess.run(
            ["sc", "start", svc],
            capture_output=True,
            text=True,
            creationflags=_NO_WINDOW,
        )
        if r.returncode == 0:
            logger.info("服务已启动 (sc start)")
            return True
        # sc/net usually report failures on stdout, so fall back to it.
        logger.error(f"启动服务失败: {r.stderr or r.stdout}")
        return False
    except Exception as e:
        logger.error(f"启动服务异常: {e}")
        return False


# -------------------------
# Base backup (pg_basebackup)
# -------------------------
def perform_base_backup(pg_superuser='postgres', pg_password='abc@1234'):
    """Take a plain-format base backup with ``pg_basebackup``.

    The backup is written to a timestamped directory below
    ``E:/code/backup/postgres/base_backup`` and includes the required WAL
    (``-X f``).  The password is handed to the tool via ``PGPASSWORD``.

    NOTE(review): the default password is hard-coded in the signature;
    callers should pass real credentials explicitly.

    Returns:
        The backup directory as a string.

    Raises:
        RuntimeError: if the bin directory cannot be found, the tool exits
            with a non-zero status, or ``backup_label`` is missing.
    """
    pg_bin = get_postgres_bin_path()
    if not pg_bin:
        raise RuntimeError("无法找到 PostgreSQL bin 目录")
    pg_basebackup = pg_bin / "pg_basebackup.exe"
    now = datetime.now()
    dest = Path(
        f"E:/code/backup/postgres/base_backup/{now.strftime('%Y%m%d_%H%M%S')}"
    )
    dest.mkdir(parents=True, exist_ok=True)
    cmd = [
        str(pg_basebackup),
        "-D", str(dest),
        "-F", "p",
        "-X", "f",
        "-P",
        "-U", pg_superuser,
    ]
    env = os.environ.copy()
    env['PGPASSWORD'] = pg_password
    logger.info("开始基础备份: " + " ".join(cmd))
    r = subprocess.run(
        cmd,
        env=env,
        capture_output=True,
        text=True,
        creationflags=_NO_WINDOW,
    )
    if r.returncode != 0:
        logger.error(r.stderr)
        raise RuntimeError("pg_basebackup 失败: " + r.stderr)

    # Sanity check: a plain-format backup must contain backup_label
    if not (dest / "backup_label").exists():
        raise RuntimeError("基础备份不完整,缺少 backup_label 文件")
    logger.info(f"基础备份完成: {dest}")
    return str(dest)


# -------------------------
# Database status checks
# -------------------------
def _close_quietly(conn):
    """Close a DB connection (if any), logging instead of raising."""
    if conn is None:
        return
    try:
        conn.close()
    except Exception as e:
        logger.debug("关闭数据库连接失败: %s", e)


def check_database_status(dbname='postgres', user='postgres',
                          password='abc@1234', host='localhost', port=5432):
    """Check whether the database accepts connections.

    Returns:
        ``(is_running, is_in_recovery)``.  Any connection or query error
        yields ``(False, False)``; this function never raises.
    """
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5,
        )
        conn.autocommit = True
        cur = conn.cursor()
        cur.execute("SELECT pg_is_in_recovery()")
        in_recovery = cur.fetchone()[0]
        cur.close()
        return True, in_recovery
    except Exception:
        return False, False
    finally:
        # Close even when the query failed, so no connection is leaked.
        _close_quietly(conn)


# -------------------------
# Shared restore helpers
# -------------------------
def _clear_directory(directory, keep=frozenset()):
    """Delete every entry of *directory* except names listed in *keep*.

    Deletion is best effort: failures are logged as warnings and skipped.
    """
    for item in directory.iterdir():
        if item.name in keep:
            logger.info(f"保留配置文件: {item.name}")
            continue
        try:
            if item.is_dir():
                shutil.rmtree(item)
            else:
                item.unlink()
        except Exception as e:
            logger.warning("删除 %s 失败: %s", item, e)


def _snapshot_data_dir(data_dir):
    """Copy *data_dir* to a timestamped sibling and return that path.

    The path is returned only after the copy completed, so callers never
    treat a partial copy as a usable rollback source.  A partial copy is
    removed before the error is re-raised.
    """
    snapshot = data_dir.with_name(
        data_dir.name + "_backup_" + datetime.now().strftime("%Y%m%d_%H%M%S")
    )
    logger.info(f"备份当前数据目录到: {snapshot}")
    if snapshot.exists():
        shutil.rmtree(snapshot)
    try:
        shutil.copytree(data_dir, snapshot)
    except Exception:
        shutil.rmtree(snapshot, ignore_errors=True)
        raise
    logger.info("备份完成")
    return snapshot


def _read_non_recovery_lines(auto_conf):
    """Return the lines of *auto_conf* that are not recovery settings."""
    with open(auto_conf, "r", encoding="utf-8") as f:
        return [
            line for line in f
            if not line.strip().startswith(_RECOVERY_SETTING_PREFIXES)
        ]


def _strip_recovery_settings(auto_conf):
    """Rewrite *auto_conf* without restore_command / recovery_target* lines."""
    kept = _read_non_recovery_lines(auto_conf)
    with open(auto_conf, "w", encoding="utf-8") as f:
        f.writelines(kept)


def _quote_conf_value(value):
    """Quote *value* for a PostgreSQL configuration file.

    Inside single-quoted values the parser treats backslashes as escapes,
    so Windows paths need doubled backslashes; embedded single quotes are
    doubled as well.
    """
    escaped = value.replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def _rollback_data_dir(data_dir, backup_data_dir, clear_recovery_state=False):
    """Restore *data_dir* from *backup_data_dir* and restart the service.

    When no complete snapshot exists the data directory has not been
    touched yet, so the service is simply started again if it is down.
    Never raises: failures are logged so the caller can re-raise the
    original error.

    Args:
        data_dir: live PostgreSQL data directory.
        backup_data_dir: snapshot made by ``_snapshot_data_dir`` or None.
        clear_recovery_state: also remove ``recovery.signal`` and strip
            recovery settings from ``postgresql.auto.conf`` afterwards.
    """
    try:
        if not (backup_data_dir and backup_data_dir.exists()):
            if not is_postgres_service_running():
                logger.info("数据目录未被修改,尝试重新启动服务")
                start_postgres_service()
            return

        logger.info("开始回滚:恢复原始数据目录...")
        try:
            stop_postgres_service()
        except Exception as stop_err:
            logger.warning("回滚前停止服务失败: %s", stop_err)

        # Empty the data directory, then copy the snapshot back
        _clear_directory(data_dir)
        shutil.copytree(backup_data_dir, data_dir, dirs_exist_ok=True)

        if clear_recovery_state:
            signal_file = data_dir / "recovery.signal"
            auto_conf = data_dir / "postgresql.auto.conf"
            try:
                if signal_file.exists():
                    signal_file.unlink()
                # Keep the original ALTER SYSTEM settings; only drop
                # recovery-related lines instead of deleting the file.
                if auto_conf.exists():
                    _strip_recovery_settings(auto_conf)
            except Exception as ex:
                logger.warning("删除 %s 失败: %s", signal_file, ex)

        if start_postgres_service():
            logger.info("回滚完成并已启动服务(原始数据已恢复)")
        else:
            logger.error("回滚后启动服务失败,请手动检查")
    except Exception as roll_err:
        logger.error(f"回滚失败: {roll_err}")


# -------------------------
# Main restore routine: restore to a base backup
# -------------------------
def restore_to_base_backup(base_backup_dir):
    """Replace the data directory with the content of a base backup.

    Steps: stop the service, snapshot the current data directory, empty
    it, copy the base backup in, remove recovery marker files and recovery
    settings, start the service and verify the database is up and not in
    recovery.  On any exception the original data directory is rolled
    back and the exception is re-raised.

    Args:
        base_backup_dir: path of the base backup directory.

    Returns:
        True if the database was verified as running outside recovery;
        False if the service started but the database could not be
        verified within the retry window (no rollback is performed then).

    Raises:
        RuntimeError: if the service cannot be stopped or started.
        Exception: any filesystem error raised while restoring.
    """
    data_dir = get_postgres_data_dir()
    logger.info(f"开始恢复到基础备份: {base_backup_dir}")
    backup_data_dir = None
    try:
        # 1. Stop the service
        if not stop_postgres_service():
            raise RuntimeError("无法停止 PostgreSQL 服务")
        time.sleep(5)  # give the service time to shut down completely

        # 2. Snapshot the current data directory (set only once complete)
        backup_data_dir = _snapshot_data_dir(data_dir)

        # 3. Empty the data directory, keeping the key configuration files
        logger.info("安全清空数据目录...")
        _clear_directory(data_dir, keep=_PRESERVED_CONFIG_FILES)
        logger.info("数据目录已清空")

        # 4. Copy the base backup into place
        logger.info("恢复基础备份到数据目录...")
        shutil.copytree(base_backup_dir, data_dir, dirs_exist_ok=True)
        logger.info("基础备份已恢复")

        # 5. Remove recovery marker files so the server does not enter
        #    archive recovery / standby mode
        for recovery_file in ["recovery.signal", "standby.signal",
                              "recovery.conf"]:
            file_path = data_dir / recovery_file
            if file_path.exists():
                logger.info(f"删除恢复文件: {recovery_file}")
                file_path.unlink()

        # 6. Drop recovery settings from postgresql.auto.conf
        auto_conf = data_dir / "postgresql.auto.conf"
        if auto_conf.exists():
            logger.info("清理postgresql.auto.conf中的恢复设置")
            _strip_recovery_settings(auto_conf)

        # 7. Start the service
        if not start_postgres_service():
            raise RuntimeError("无法启动 PostgreSQL 服务")

        # 8. Verify the database is up and not in recovery.
        #    check_database_status() never raises, so poll its result.
        max_retries = 10
        for _ in range(max_retries):
            is_running, in_recovery = check_database_status()
            if is_running and not in_recovery:
                logger.info("数据库已成功启动,不在恢复模式")
                return True
            time.sleep(2)
        logger.error("无法验证数据库状态")
        return False
    except Exception as e:
        logger.error(f"恢复到基础备份失败: {e}")
        # Roll back to the original data directory, then re-raise
        _rollback_data_dir(data_dir, backup_data_dir)
        raise


# -------------------------
# Helper: parse a time string
# The original author noted that the code below did not work and that the
# base-backup restore above should be used directly.
# NOTE(review): the restore_command escaping and the missing
# recovery_target_action have been fixed here; verify PITR end-to-end
# before relying on it.
# -------------------------
def parse_target_time(t):
    """Parse the recovery target time and convert it to UTC.

    Args:
        t: a ``datetime`` or a string in one of the formats
            ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DDTHH:MM:SS``,
            ``YYYY-MM-DD HH:MM`` or ``YYYY-MM-DD``.  Naive values are
            interpreted as local time (``datetime.astimezone`` semantics).

    Returns:
        A timezone-aware ``datetime`` in UTC.

    Raises:
        ValueError: if a non-datetime value matches none of the formats.
    """
    if isinstance(t, datetime):
        return t.astimezone(timezone.utc)
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S",
                "%Y-%m-%d %H:%M", "%Y-%m-%d"):
        try:
            dt = datetime.strptime(t, fmt)
            return dt.astimezone(timezone.utc)
        except (ValueError, TypeError, OSError, OverflowError):
            continue
    raise ValueError("无法解析目标时间,请使用 'YYYY-MM-DD HH:MM:SS' 格式")


# -------------------------
# Recovery progress (queried through the database)
# -------------------------
def check_recovery_progress(dbname='postgres', user='postgres',
                            password='abc@1234', host='localhost',
                            port=5432, target_dt=None):
    """Query the server for its recovery progress.

    Returns:
        ``(in_recovery, last_replay_timestamp, meets_target)`` where
        ``last_replay_timestamp`` is a UTC datetime or None and
        ``meets_target`` is True when the last replayed transaction is at
        most 30 seconds before ``target_dt`` (or later).  If the database
        cannot be reached or queried, ``(None, None, False)`` is returned;
        this function never raises.
    """
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5,
        )
        conn.autocommit = True
        cur = conn.cursor()

        cur.execute("SELECT pg_is_in_recovery()")
        in_recovery = cur.fetchone()[0]

        last_ts = None
        try:
            # Timestamp of the last transaction replayed during recovery
            cur.execute("SELECT pg_last_xact_replay_timestamp()")
            res = cur.fetchone()[0]
            if res:
                last_ts = res.astimezone(timezone.utc)
        except Exception as e:
            logger.debug("查询 pg_last_xact_replay_timestamp 失败: %s", e)
        cur.close()

        # Target reached?  A gap of up to 30 seconds is tolerated.
        ok = False
        if target_dt and last_ts:
            time_diff = (target_dt - last_ts).total_seconds()
            ok = time_diff <= 30
        return in_recovery, last_ts, ok
    except Exception as e:
        logger.debug("数据库连接/查询失败: %s", e)
        return None, None, False
    finally:
        # Close even when a query failed, so no connection is leaked.
        _close_quietly(conn)


# -------------------------
# Main restore routine: PITR to a given point in time
# -------------------------
def restore_to_point_in_time(target_time, base_backup_dir,
                             wal_archive_dir=r"E:\code\backup\postgres\wal_archive",
                             pg_superuser='postgres', pg_password='abc@1234',
                             db_check_name='postgres',
                             db_check_user='postgres',
                             db_check_password='abc@1234',
                             db_host='localhost', db_port=5432,
                             max_wait_minutes=30):
    """Perform a point-in-time recovery (PITR).

    The data directory is replaced by the base backup, ``recovery.signal``
    is created and the recovery settings are written to
    ``postgresql.auto.conf`` (existing non-recovery settings are kept).
    The server is then started and polled until it has left recovery.  On
    any failure the original data directory is rolled back and the
    exception is re-raised.

    Args:
        target_time: ``'YYYY-MM-DD HH:MM:SS'`` string or ``datetime``.
        base_backup_dir: base backup directory.
        wal_archive_dir: directory holding the archived WAL files.
        pg_superuser, pg_password: accepted for interface compatibility;
            not used by this function.
        db_check_name, db_check_user, db_check_password, db_host, db_port:
            connection parameters used to poll recovery progress.
        max_wait_minutes: maximum time to wait for recovery to finish.

    Returns:
        True once the database has left recovery mode.

    Raises:
        ValueError: if ``target_time`` cannot be parsed.
        RuntimeError: if the service cannot be stopped/started or the
            recovery does not finish within ``max_wait_minutes``.
    """
    target_dt = parse_target_time(target_time)
    data_dir = get_postgres_data_dir()
    logger.info(
        f"PITR 开始: 目标时间 {target_dt}, 数据目录: {data_dir}, 基础备份: {base_backup_dir}"
    )
    backup_data_dir = None
    try:
        # 1. Stop the service
        if not stop_postgres_service():
            raise RuntimeError("无法停止 PostgreSQL 服务")
        time.sleep(5)  # give the service time to shut down completely

        # 2. Snapshot the current data directory (set only once complete)
        backup_data_dir = _snapshot_data_dir(data_dir)

        # 3. Empty the data directory, keeping the key configuration files
        logger.info("安全清空数据目录...")
        _clear_directory(data_dir, keep=_PRESERVED_CONFIG_FILES)
        logger.info("数据目录已清空")

        # 4. Copy the base backup into place
        logger.info("恢复基础备份到数据目录...")
        shutil.copytree(base_backup_dir, data_dir, dirs_exist_ok=True)
        logger.info("基础备份已恢复")

        # 5. Create the recovery signal file (PostgreSQL 12+)
        recovery_signal = data_dir / "recovery.signal"
        logger.info(f"创建恢复信号: {recovery_signal}")
        recovery_signal.touch()

        # 6. Write the recovery parameters.
        auto_conf = data_dir / "postgresql.auto.conf"
        # cmd's ``copy`` expects backslash-separated paths.
        wal_dir = str(wal_archive_dir).replace("/", "\\").rstrip("\\")
        restore_command = f'copy "{wal_dir}\\%f" "%p"'
        logger.info(f"写入恢复配置到 {auto_conf}")
        # Keep the ALTER SYSTEM settings shipped with the base backup.
        kept = _read_non_recovery_lines(auto_conf) if auto_conf.exists() else []
        if kept and not kept[-1].endswith("\n"):
            kept[-1] += "\n"
        recovery_lines = [
            f"restore_command = {_quote_conf_value(restore_command)}\n",
            f"recovery_target_time = {_quote_conf_value(target_dt.isoformat())}\n",
            "recovery_target_timeline = 'latest'\n",
            # The default action 'pause' would keep the server in recovery
            # forever and make the wait loop below time out.
            "recovery_target_action = 'promote'\n",
        ]
        with open(auto_conf, "w", encoding="utf-8") as f:
            f.writelines(kept + recovery_lines)
        logger.info("恢复配置写入完成")

        # 7. Start the service
        if not start_postgres_service():
            raise RuntimeError("无法启动 PostgreSQL 服务(恢复阶段)")
        logger.info("服务已启动,开始轮询恢复进度...")

        # 8. Wait for recovery to finish
        timeout = timedelta(minutes=max_wait_minutes)
        start_time = datetime.now(timezone.utc)
        last_logged = None
        while True:
            elapsed = datetime.now(timezone.utc) - start_time
            if elapsed > timeout:
                raise RuntimeError(f"恢复超时(超过 {max_wait_minutes} 分钟)")
            try:
                in_recovery, last_replay_ts, meets_target = check_recovery_progress(
                    dbname=db_check_name,
                    user=db_check_user,
                    password=db_check_password,
                    host=db_host,
                    port=db_port,
                    target_dt=target_dt,
                )
            except Exception as e:
                logger.error(f"检查恢复进度时出错: {e}")
                in_recovery, last_replay_ts, meets_target = None, None, False

            # The server may not accept connections yet (just started)
            if in_recovery is None:
                logger.info("尚未能连接到数据库,等待 5 秒后重试...")
                time.sleep(5)
                continue

            # Log progress once per minute (minute resolution, so the
            # message is not repeated on every 5-second poll)
            current_minute = datetime.now(timezone.utc).strftime("%H:%M")
            if last_logged != current_minute:
                if last_replay_ts:
                    time_diff = (target_dt - last_replay_ts).total_seconds()
                    logger.info(
                        f"恢复进度: {last_replay_ts} (差 {time_diff:.1f} 秒)"
                    )
                else:
                    logger.info(f"恢复中... pg_is_in_recovery={in_recovery}")
                last_logged = current_minute

            # Done once the server left recovery and removed the signal
            # file (PostgreSQL 12+)
            if not in_recovery and not recovery_signal.exists():
                logger.info("恢复完成:数据库已退出恢复模式。")
                break
            if meets_target:
                logger.info("已满足目标时间条件,等待最终完成...")
            time.sleep(5)

        logger.info(f"PITR 成功:数据库已恢复到目标时间 {target_dt}")
        return True
    except Exception as e:
        logger.error(f"PITR 恢复失败: {e}")
        # Roll back to the original data directory, then re-raise
        _rollback_data_dir(data_dir, backup_data_dir, clear_recovery_state=True)
        raise