||
- import os
- import subprocess
- import logging
- from datetime import datetime, timedelta, timezone
- from pathlib import Path
- import shutil
- import time
- import psycopg2
- from psycopg2 import sql
- # -------------------------
- # 配置与日志
- # -------------------------
def setup_logger():
    """Return the shared "pitr_recovery" logger, attaching handlers only once.

    The level is (re)set to INFO on every call. When the logger has no handlers
    yet, a ``logs`` directory is created relative to the current working
    directory and two handlers sharing one timestamped format are attached:
    a UTF-8 file handler writing ``logs/postgres_recovery.log`` and a console
    (stream) handler. Later calls see the existing handlers and return the
    logger without adding duplicates.
    """
    log = logging.getLogger("pitr_recovery")
    log.setLevel(logging.INFO)
    if log.handlers:
        return log

    folder = Path("logs")
    folder.mkdir(exist_ok=True)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    outputs = (
        logging.FileHandler(folder / "postgres_recovery.log", encoding="utf-8"),
        logging.StreamHandler(),
    )
    for handler in outputs:
        handler.setFormatter(formatter)
        log.addHandler(handler)
    return log


# Module-wide logger used by every function in this script.
logger = setup_logger()
- # -------------------------
- # 基本路径/服务检测
- # -------------------------
def get_postgres_data_dir():
    """Locate the PostgreSQL data directory.

    ``PGDATA`` wins when it is set and points at an existing path. Otherwise the
    first existing entry among ``D:/app/postgresql/data`` and
    ``C:/Program Files/PostgreSQL/<major>/data`` (major 16 down to 12) is used.

    Returns:
        Path: the data directory.

    Raises:
        FileNotFoundError: neither ``PGDATA`` nor any known location exists.
    """
    pgdata = os.environ.get('PGDATA')
    if pgdata and Path(pgdata).exists():
        return Path(pgdata)

    known_locations = [Path(r"D:/app/postgresql/data")]
    known_locations += [
        Path(f"C:/Program Files/PostgreSQL/{major}/data") for major in (16, 15, 14, 13, 12)
    ]
    found = next((loc for loc in known_locations if loc.exists()), None)
    if found is None:
        raise FileNotFoundError("无法找到 PostgreSQL 数据目录,请设置 PGDATA 或检查常见路径。")
    return found
def get_postgres_bin_path():
    """Locate the PostgreSQL ``bin`` directory.

    Lookup order:
      1. ``PG_BIN_PATH`` environment variable, if it names an existing path.
      2. ``D:/app/postgresql/bin``, then ``C:/Program Files/PostgreSQL/<major>/bin``
         for major versions 16 down to 12.
      3. The directory containing the first of ``pg_basebackup``, ``pg_ctl`` or
         ``psql`` found on ``PATH`` (``shutil.which`` also honours ``PATHEXT``
         on Windows, so ``pg_basebackup.exe`` matches).

    Returns:
        Path | None: the directory, or None when nothing was found.
    """
    env_path = os.environ.get('PG_BIN_PATH')
    if env_path and Path(env_path).exists():
        return Path(env_path)

    possible_paths = [Path(r"D:/app/postgresql/bin")] + [
        Path(f"C:/Program Files/PostgreSQL/{major}/bin") for major in (16, 15, 14, 13, 12)
    ]
    for p in possible_paths:
        if p.exists():
            return p

    # Fall back to searching PATH for a PostgreSQL client tool.
    for tool in ("pg_basebackup", "pg_ctl", "psql"):
        found = shutil.which(tool)
        if found:
            return Path(found).parent
    return None
def get_postgres_service_name():
    """Return the Windows service name of the local PostgreSQL server.

    Resolution order:
      1. ``PG_SERVICE_NAME`` environment variable, when set.
      2. The first candidate whose ``sc query`` output contains "RUNNING".
      3. The first candidate that exists as an installed service (``sc query``
         exits with code 0) even though it is not running. Without this step a
         service that was just stopped would be reported under the default
         name, and a subsequent start would target the wrong service.
      4. ``"postgresql-x64-16"`` as a last-resort default.

    Candidates whose query raises (e.g. ``sc`` unavailable) are skipped.
    """
    override = os.environ.get('PG_SERVICE_NAME')
    if override:
        return override

    candidates = [
        "postgresql-x64-16",
        "postgresql-x64-15",
        "postgresql-x64-14",
        "postgresql-x64-13",
        "postgresql-x64-12",
        "postgresql",
    ]
    installed = []
    for svc in candidates:
        try:
            result = subprocess.run(
                ["sc", "query", svc],
                capture_output=True,
                text=True,
                creationflags=subprocess.CREATE_NO_WINDOW
            )
            if "RUNNING" in result.stdout:
                return svc
            if result.returncode == 0:
                # Service exists but is stopped/pending; remember it as a fallback.
                installed.append(svc)
        except Exception:
            continue
    return installed[0] if installed else candidates[0]
def is_postgres_service_running():
    """Report whether the detected PostgreSQL service is in the RUNNING state.

    The service name comes from ``get_postgres_service_name``; ``sc query``
    output is searched for the word "RUNNING". Any error while running the
    query is treated as "not running".
    """
    service = get_postgres_service_name()
    query = ["sc", "query", service]
    try:
        output = subprocess.run(
            query,
            capture_output=True,
            text=True,
            creationflags=subprocess.CREATE_NO_WINDOW,
        ).stdout
        return "RUNNING" in output
    except Exception:
        return False
def stop_postgres_service(max_wait=60, poll_interval=5):
    """Stop the PostgreSQL Windows service and confirm it is no longer running.

    Escalation order:
      1. ``net stop`` — exit code 0 is taken as "stopped" and returns at once.
      2. ``sc stop`` — this only sends the stop control and returns while the
         service may still be STOP_PENDING, so a successful request falls
         through to the confirmation loop instead of returning immediately.
      3. ``taskkill /F /IM postgres.exe`` — forcibly ends every postgres.exe
         (only when ``sc stop`` also failed).

    Args:
        max_wait: seconds to keep polling ``is_postgres_service_running`` after
            step 2 or 3.
        poll_interval: seconds between polls.

    Returns:
        bool: True once the service is confirmed stopped, False on timeout.
    """
    svc = get_postgres_service_name()
    logger.info(f"停止 PostgreSQL 服务: {svc}")

    def _run(args):
        return subprocess.run(
            args,
            capture_output=True,
            text=True,
            creationflags=subprocess.CREATE_NO_WINDOW
        )

    if _run(["net", "stop", svc]).returncode == 0:
        logger.info("服务已停止")
        return True

    if _run(["sc", "stop", svc]).returncode == 0:
        # sc stop is asynchronous: confirm below before reporting success.
        logger.info("已通过 sc stop 发送停止请求,等待服务停止...")
    else:
        logger.warning("通过服务接口无法停止,尝试 taskkill 强制结束 postgres.exe")
        _run(["taskkill", "/F", "/IM", "postgres.exe"])

    deadline = time.monotonic() + max_wait
    while True:
        if not is_postgres_service_running():
            logger.info("服务已确认停止")
            return True
        if time.monotonic() >= deadline:
            break
        time.sleep(poll_interval)

    logger.error("服务停止超时")
    return False
def start_postgres_service(max_wait=60, poll_interval=2):
    """Start the PostgreSQL Windows service.

    ``net start`` is tried first; exit code 0 counts as started. Otherwise
    ``sc start`` is tried. Because ``sc start`` only issues the start request
    and returns while the service may still be START_PENDING, its success is
    confirmed by polling ``is_postgres_service_running`` for up to *max_wait*
    seconds.

    Args:
        max_wait: seconds to wait for RUNNING after a successful ``sc start``.
        poll_interval: seconds between polls.

    Returns:
        bool: True when the service was started, False on failure, timeout or
        any exception (which is logged).
    """
    svc = get_postgres_service_name()
    logger.info(f"启动 PostgreSQL 服务: {svc}")
    try:
        r = subprocess.run(
            ["net", "start", svc],
            capture_output=True,
            text=True,
            creationflags=subprocess.CREATE_NO_WINDOW
        )
        if r.returncode == 0:
            logger.info("服务已启动")
            return True

        r = subprocess.run(
            ["sc", "start", svc],
            capture_output=True,
            text=True,
            creationflags=subprocess.CREATE_NO_WINDOW
        )
        if r.returncode != 0:
            # sc/net usually print their error text on stdout, not stderr.
            detail = (r.stderr or "").strip() or (r.stdout or "").strip()
            logger.error(f"启动服务失败: {detail}")
            return False

        deadline = time.monotonic() + max_wait
        while not is_postgres_service_running():
            if time.monotonic() >= deadline:
                logger.error("服务启动超时 (sc start 后服务未进入 RUNNING 状态)")
                return False
            time.sleep(poll_interval)
        logger.info("服务已启动 (sc start)")
        return True
    except Exception as e:
        logger.error(f"启动服务异常: {e}")
        return False
- # -------------------------
- # 基础备份(pg_basebackup)
- # -------------------------
def perform_base_backup(pg_superuser='postgres', pg_password='abc@1234',
                        backup_root="E:/code/backup/postgres/base_backup"):
    """Take a plain-format base backup with ``pg_basebackup``.

    The backup goes into a new ``YYYYmmdd_HHMMSS`` sub-directory of
    *backup_root* and is run with ``-F p`` (plain files), ``-X f`` (fetch WAL)
    and ``-P`` (progress).

    Args:
        pg_superuser: role passed to ``-U``.
        pg_password: exported as ``PGPASSWORD`` to the child process only.
        backup_root: parent directory for backups (default is the previously
            hard-coded location).

    Returns:
        str: path of the completed backup directory.

    Raises:
        RuntimeError: the bin directory was not found, pg_basebackup exited
            non-zero, or the result lacks ``backup_label``. On any failure the
            backup directory is removed if this call created it, so a partial
            backup cannot later be mistaken for a valid one.
    """
    pg_bin = get_postgres_bin_path()
    if not pg_bin:
        raise RuntimeError("无法找到 PostgreSQL bin 目录")

    pg_basebackup = pg_bin / "pg_basebackup.exe"
    dest = Path(backup_root) / datetime.now().strftime('%Y%m%d_%H%M%S')
    # Only clean up a directory we created ourselves, never a pre-existing one.
    created_here = not dest.exists()
    dest.mkdir(parents=True, exist_ok=True)

    cmd = [
        str(pg_basebackup),
        "-D", str(dest),
        "-F", "p",
        "-X", "f",
        "-P",
        "-U", pg_superuser
    ]

    env = os.environ.copy()
    env['PGPASSWORD'] = pg_password

    logger.info("开始基础备份: " + " ".join(cmd))
    try:
        r = subprocess.run(
            cmd,
            env=env,
            capture_output=True,
            text=True,
            creationflags=subprocess.CREATE_NO_WINDOW
        )
        if r.returncode != 0:
            logger.error(r.stderr)
            raise RuntimeError("pg_basebackup 失败: " + r.stderr)

        # A plain-format base backup always contains backup_label.
        if not (dest / "backup_label").exists():
            raise RuntimeError("基础备份不完整,缺少 backup_label 文件")
    except BaseException:
        if created_here:
            shutil.rmtree(dest, ignore_errors=True)
        raise

    logger.info(f"基础备份完成: {dest}")
    return str(dest)
- # -------------------------
- # 检查数据库状态
- # -------------------------
def check_database_status(dbname='postgres', user='postgres', password='abc@1234',
                          host='localhost', port=5432):
    """Probe the server with a short-lived connection.

    Runs ``SELECT pg_is_in_recovery()`` over an autocommit connection with a
    5-second connect timeout. The connection is always closed, including when
    the query fails.

    Returns:
        tuple[bool, bool]: ``(is_running, is_in_recovery)``; ``(False, False)``
        when connecting or querying fails for any reason.
    """
    try:
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5
        )
    except Exception:
        return False, False

    try:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT pg_is_in_recovery()")
            in_recovery = cur.fetchone()[0]
        return True, in_recovery
    except Exception:
        return False, False
    finally:
        try:
            conn.close()
        except Exception:
            # Closing is best effort; the status has already been decided.
            pass
- # -------------------------
- # 主恢复函数:恢复到基础备份
- # -------------------------
def restore_to_base_backup(base_backup_dir, verify_retries=10, verify_interval=2):
    """Replace the live data directory with a plain-format base backup.

    Procedure:
      1. Stop the PostgreSQL service.
      2. Copy the current data directory to a sibling
         ``<name>_backup_<YYYYmmdd_HHMMSS>`` snapshot.
      3. Delete everything in the data directory except ``postgresql.conf``,
         ``pg_hba.conf`` and ``pg_ident.conf``.
      4. Copy *base_backup_dir* into the data directory; top-level copies of the
         preserved configuration files are skipped so the current server
         configuration actually survives the restore.
      5. Remove ``recovery.signal`` / ``standby.signal`` / ``recovery.conf``.
      6. Strip ``restore_command`` and ``recovery_target*`` lines from
         ``postgresql.auto.conf``.
      7. Start the service.
      8. Probe ``check_database_status`` until the server answers and is not in
         recovery.

    On an exception after the snapshot is complete, the data directory is
    rebuilt from the snapshot and the service restarted. If the failure
    happened before the snapshot finished, the data directory was never
    touched, so only the service is restarted. The original exception is then
    re-raised.

    Args:
        base_backup_dir: directory of the base backup to restore.
        verify_retries: number of status probes in step 8.
        verify_interval: seconds between probes.

    Returns:
        bool: True when step 8 confirmed the server is up and out of recovery;
        False when it could not be confirmed within the probe budget (the
        restored data is left in place).
    """
    preserved_names = {"postgresql.conf", "pg_hba.conf", "pg_ident.conf"}

    def _clear_directory(directory, keep=()):
        """Delete every entry of *directory* not named in *keep*; return the kept names."""
        kept_names = set()
        for item in directory.iterdir():
            if item.name in keep:
                logger.info(f"保留配置文件: {item.name}")
                kept_names.add(item.name)
                continue
            try:
                if item.is_dir():
                    shutil.rmtree(item)
                else:
                    item.unlink()
            except Exception as ex:
                # Best effort: log and continue with the remaining entries.
                logger.warning("删除 %s 失败: %s", item, ex)
        return kept_names

    data_dir = get_postgres_data_dir()
    logger.info(f"开始恢复到基础备份: {base_backup_dir}")
    backup_data_dir = None
    service_stopped = False
    snapshot_ready = False
    try:
        # 1. Stop the service.
        if not stop_postgres_service():
            raise RuntimeError("无法停止 PostgreSQL 服务")
        service_stopped = True
        time.sleep(5)  # extra grace period before touching the data files

        # 2. Snapshot the current data directory.
        backup_data_dir = data_dir.with_name(
            data_dir.name + "_backup_" + datetime.now().strftime("%Y%m%d_%H%M%S")
        )
        logger.info(f"备份当前数据目录到: {backup_data_dir}")
        if backup_data_dir.exists():
            shutil.rmtree(backup_data_dir)
        shutil.copytree(data_dir, backup_data_dir)
        # Only a fully copied snapshot may be used for rollback.
        snapshot_ready = True
        logger.info("备份完成")

        # 3. Clear the data directory, keeping the key configuration files.
        logger.info("安全清空数据目录...")
        kept = _clear_directory(data_dir, keep=preserved_names)
        logger.info("数据目录已清空")

        # 4. Copy the base backup in without overwriting the kept config files.
        logger.info("恢复基础备份到数据目录...")
        source_root = Path(base_backup_dir)

        def _skip_preserved(directory, names):
            # copytree calls this for every directory; filter the top level only.
            if Path(directory) != source_root:
                return set()
            return {name for name in names if name in kept}

        shutil.copytree(source_root, data_dir, dirs_exist_ok=True, ignore=_skip_preserved)
        logger.info("基础备份已恢复")

        # 5. Remove recovery trigger files so the server starts as a primary.
        for recovery_file in ["recovery.signal", "standby.signal", "recovery.conf"]:
            file_path = data_dir / recovery_file
            if file_path.exists():
                logger.info(f"删除恢复文件: {recovery_file}")
                file_path.unlink()

        # 6. Strip recovery settings from postgresql.auto.conf.
        auto_conf = data_dir / "postgresql.auto.conf"
        if auto_conf.exists():
            logger.info("清理postgresql.auto.conf中的恢复设置")
            with open(auto_conf, "r", encoding="utf-8") as f:
                lines = f.readlines()
            new_lines = [
                line for line in lines
                if not line.strip().startswith(("restore_command", "recovery_target"))
            ]
            with open(auto_conf, "w", encoding="utf-8") as f:
                f.writelines(new_lines)

        # 7. Start the service.
        if not start_postgres_service():
            raise RuntimeError("无法启动 PostgreSQL 服务")

        # 8. Verify the server is reachable and out of recovery.
        for _ in range(verify_retries):
            is_running, in_recovery = check_database_status()
            if is_running and not in_recovery:
                logger.info("数据库已成功启动,不在恢复模式")
                return True
            time.sleep(verify_interval)
        logger.error("无法验证数据库状态")
        return False
    except Exception as e:
        logger.error(f"恢复到基础备份失败: {e}")
        try:
            if snapshot_ready:
                logger.info("开始回滚:恢复原始数据目录...")
                try:
                    stop_postgres_service()
                except Exception as stop_err:
                    logger.warning("回滚前停止服务失败: %s", stop_err)
                _clear_directory(data_dir)
                shutil.copytree(backup_data_dir, data_dir, dirs_exist_ok=True)
                if start_postgres_service():
                    logger.info("回滚完成并已启动服务(原始数据已恢复)")
                else:
                    logger.error("回滚后启动服务失败,请手动检查")
            elif service_stopped:
                # The data directory was never modified; just bring the server back.
                logger.info("数据目录未被修改,重新启动服务...")
                if not start_postgres_service():
                    logger.error("回滚后启动服务失败,请手动检查")
        except Exception as roll_err:
            logger.error(f"回滚失败: {roll_err}")
        raise
- # -------------------------
- # 辅助:解析时间字符串
- # 下面的代码用不了,直接基础备份恢复
- # -------------------------
def parse_target_time(t):
    """Parse a recovery target time and return it as an aware UTC datetime.

    Accepts:
      * a ``datetime``: aware values are converted to UTC; naive values are
        interpreted as local time (``datetime.astimezone`` semantics);
      * an ISO-8601 string accepted by ``datetime.fromisoformat``, including
        ones with a UTC offset such as ``"2024-01-02T03:04:05+08:00"``;
      * ``"YYYY-MM-DD HH:MM:SS"``, ``"YYYY-MM-DDTHH:MM:SS"``,
        ``"YYYY-MM-DD HH:MM"`` or ``"YYYY-MM-DD"`` (naive, so local time).

    Surrounding whitespace in string input is ignored.

    Raises:
        ValueError: *t* is not a datetime and matches none of the formats.
    """
    if isinstance(t, datetime):
        return t.astimezone(timezone.utc)

    text = t.strip() if isinstance(t, str) else t
    parsed = None
    try:
        parsed = datetime.fromisoformat(text)
    except (TypeError, ValueError):
        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
            try:
                parsed = datetime.strptime(text, fmt)
                break
            except (TypeError, ValueError):
                continue

    if parsed is None:
        raise ValueError("无法解析目标时间,请使用 'YYYY-MM-DD HH:MM:SS' 格式")
    return parsed.astimezone(timezone.utc)
- # -------------------------
- # 检查恢复进度(通过数据库)
- # -------------------------
def check_recovery_progress(dbname='postgres', user='postgres', password='abc@1234',
                            host='localhost', port=5432, target_dt=None,
                            tolerance_seconds=30):
    """Report the server's recovery state and replay progress.

    Queries ``pg_is_in_recovery()`` and ``pg_last_xact_replay_timestamp()`` over
    a short-lived autocommit connection. The connection is always closed.

    Args:
        target_dt: recovery target; a naive datetime is interpreted as local
            time and converted to UTC, matching ``parse_target_time``.
        tolerance_seconds: how far the last replayed transaction may lag behind
            *target_dt* and still count as meeting the target. Replay past the
            target (negative lag) also counts.

    Returns:
        tuple: ``(in_recovery, last_replay_ts, meets_target)``, where
        ``last_replay_ts`` is the UTC replay timestamp or None. Returns
        ``(None, None, False)`` when connecting or the recovery-state query
        fails.
    """
    if isinstance(target_dt, datetime) and target_dt.tzinfo is None:
        # Avoid naive-minus-aware TypeError below.
        target_dt = target_dt.astimezone(timezone.utc)

    try:
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5
        )
    except Exception as e:
        logger.debug("数据库连接/查询失败: %s", e)
        return None, None, False

    try:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT pg_is_in_recovery()")
            in_recovery = cur.fetchone()[0]

            last_ts = None
            try:
                cur.execute("SELECT pg_last_xact_replay_timestamp()")
                res = cur.fetchone()[0]
                if res:
                    last_ts = res.astimezone(timezone.utc)
            except Exception as e:
                # Progress is optional information; keep the recovery state.
                logger.debug("查询 pg_last_xact_replay_timestamp 失败: %s", e)

        ok = False
        if target_dt and last_ts:
            ok = (target_dt - last_ts).total_seconds() <= tolerance_seconds
        return in_recovery, last_ts, ok
    except Exception as e:
        logger.debug("数据库连接/查询失败: %s", e)
        return None, None, False
    finally:
        try:
            conn.close()
        except Exception:
            # Closing is best effort; the result has already been decided.
            pass
- # -------------------------
- # 主恢复函数:PITR 恢复到指定时间点
- # -------------------------
def _pitr_conf_quote(value):
    """Return *value* as a single-quoted postgresql.conf string literal.

    Inside quoted configuration values PostgreSQL treats a backslash as an
    escape character and a doubled single quote as a literal quote, so both
    are escaped here (Windows paths would otherwise be corrupted).
    """
    return "'" + str(value).replace("\\", "\\\\").replace("'", "''") + "'"


def _pitr_strip_recovery_settings(lines):
    """Return *lines* without ``restore_command`` / ``recovery_target*`` entries."""
    return [
        line for line in lines
        if not line.strip().startswith(("restore_command", "recovery_target"))
    ]


def _pitr_clear_dir(directory, keep=()):
    """Delete every entry of *directory* not named in *keep*; return the kept names.

    Deletion failures are logged and skipped (best effort).
    """
    kept_names = set()
    for item in directory.iterdir():
        if item.name in keep:
            logger.info(f"保留配置文件: {item.name}")
            kept_names.add(item.name)
            continue
        try:
            if item.is_dir():
                shutil.rmtree(item)
            else:
                item.unlink()
        except Exception as ex:
            logger.warning("删除 %s 失败: %s", item, ex)
    return kept_names


def _pitr_rollback(data_dir, snapshot_dir):
    """Put the pre-restore snapshot back into *data_dir* and restart the service.

    ``recovery.signal`` is removed and recovery settings are stripped from
    ``postgresql.auto.conf`` while its other (ALTER SYSTEM) settings are kept,
    so the restored cluster starts normally instead of re-entering recovery.
    """
    logger.info("开始回滚:恢复原始数据目录...")
    try:
        stop_postgres_service()
    except Exception as ex:
        logger.warning("回滚前停止服务失败: %s", ex)

    _pitr_clear_dir(data_dir)
    shutil.copytree(snapshot_dir, data_dir, dirs_exist_ok=True)

    try:
        signal_file = data_dir / "recovery.signal"
        if signal_file.exists():
            signal_file.unlink()
        auto_conf = data_dir / "postgresql.auto.conf"
        if auto_conf.exists():
            lines = auto_conf.read_text(encoding="utf-8").splitlines(keepends=True)
            auto_conf.write_text("".join(_pitr_strip_recovery_settings(lines)), encoding="utf-8")
    except (OSError, ValueError) as ex:
        logger.warning("清理恢复配置失败: %s", ex)

    if start_postgres_service():
        logger.info("回滚完成并已启动服务(原始数据已恢复)")
    else:
        logger.error("回滚后启动服务失败,请手动检查")


def restore_to_point_in_time(target_time, base_backup_dir,
                             wal_archive_dir=r"E:\code\backup\postgres\wal_archive",
                             pg_superuser='postgres', pg_password='abc@1234',
                             db_check_name='postgres', db_check_user='postgres',
                             db_check_password='abc@1234', db_host='localhost',
                             db_port=5432, max_wait_minutes=30,
                             recovery_target_action='promote'):
    """Perform point-in-time recovery (PITR) from a base backup plus WAL archive.

    Procedure:
      1. Stop the service.
      2. Snapshot the current data directory to ``<name>_backup_<timestamp>``.
      3. Clear the data directory except ``postgresql.conf`` / ``pg_hba.conf`` /
         ``pg_ident.conf``.
      4. Copy the base backup in without overwriting those kept files.
      5. Create ``recovery.signal`` (PostgreSQL 12+ targeted recovery).
      6. Merge recovery settings into ``postgresql.auto.conf``. Existing
         non-recovery lines are kept. The settings written are
         ``restore_command`` (a Windows ``copy`` from *wal_archive_dir*,
         escaped for postgresql.conf), ``recovery_target_time``,
         ``recovery_target_timeline = 'latest'`` and ``recovery_target_action``.
      7. Start the service.
      8. Poll ``check_recovery_progress`` every 5 s until the server has left
         recovery and ``recovery.signal`` is gone, or *max_wait_minutes* expire.

    On failure after a complete snapshot, the snapshot is restored and the
    service restarted. If the failure happened before the snapshot finished,
    only the service is restarted. The original exception is re-raised.

    Args:
        target_time: 'YYYY-MM-DD HH:MM:SS', an ISO-8601 string or a datetime
            (see ``parse_target_time``).
        base_backup_dir: plain-format base backup directory.
        wal_archive_dir: directory holding archived WAL segments.
        pg_superuser, pg_password: accepted for interface compatibility; unused.
        db_check_*, db_host, db_port: connection used to poll progress.
        max_wait_minutes: overall timeout for step 8.
        recovery_target_action: 'promote' (default), 'pause' or 'shutdown'.
            PostgreSQL's own default is 'pause', which keeps the server in
            recovery, so step 8 would never see it finish.

    Returns:
        bool: True when recovery completed.

    Raises:
        RuntimeError: stop/start failure or timeout (after rollback).
        ValueError: *target_time* could not be parsed.
    """
    target_dt = parse_target_time(target_time)
    data_dir = get_postgres_data_dir()
    logger.info(f"PITR 开始: 目标时间 {target_dt}, 数据目录: {data_dir}, 基础备份: {base_backup_dir}")
    backup_data_dir = None
    service_stopped = False
    snapshot_ready = False
    try:
        # 1. Stop the service.
        if not stop_postgres_service():
            raise RuntimeError("无法停止 PostgreSQL 服务")
        service_stopped = True
        time.sleep(5)  # extra grace period before touching the data files

        # 2. Snapshot the current data directory.
        backup_data_dir = data_dir.with_name(
            data_dir.name + "_backup_" + datetime.now().strftime("%Y%m%d_%H%M%S")
        )
        logger.info(f"备份当前数据目录到: {backup_data_dir}")
        if backup_data_dir.exists():
            shutil.rmtree(backup_data_dir)
        shutil.copytree(data_dir, backup_data_dir)
        # Only a fully copied snapshot may be used for rollback.
        snapshot_ready = True
        logger.info("备份完成")

        # 3. Clear the data directory, keeping the key configuration files.
        logger.info("安全清空数据目录...")
        kept = _pitr_clear_dir(data_dir, keep={"postgresql.conf", "pg_hba.conf", "pg_ident.conf"})
        logger.info("数据目录已清空")

        # 4. Copy the base backup in without overwriting the kept config files.
        logger.info("恢复基础备份到数据目录...")
        source_root = Path(base_backup_dir)
        shutil.copytree(
            source_root,
            data_dir,
            dirs_exist_ok=True,
            # copytree calls ignore for every directory; filter the top level only.
            ignore=lambda d, names: {n for n in names if n in kept} if Path(d) == source_root else set(),
        )
        logger.info("基础备份已恢复")

        # 5. Request targeted recovery (PostgreSQL 12+).
        recovery_signal = data_dir / "recovery.signal"
        logger.info(f"创建恢复信号: {recovery_signal}")
        recovery_signal.touch()

        # 6. Merge recovery settings into postgresql.auto.conf.
        auto_conf = data_dir / "postgresql.auto.conf"
        # `copy` is a cmd.exe built-in: use native separators.
        wal_dir = str(wal_archive_dir).replace("/", "\\").rstrip("\\")
        restore_command = f'copy "{wal_dir}\\%f" "%p"'
        logger.info(f"写入恢复配置到 {auto_conf}")
        existing = []
        if auto_conf.exists():
            existing = _pitr_strip_recovery_settings(
                auto_conf.read_text(encoding="utf-8").splitlines(keepends=True)
            )
            if existing and not existing[-1].endswith("\n"):
                existing[-1] += "\n"
        settings = [
            f"restore_command = {_pitr_conf_quote(restore_command)}\n",
            f"recovery_target_time = {_pitr_conf_quote(target_dt.isoformat())}\n",
            "recovery_target_timeline = 'latest'\n",
            f"recovery_target_action = {_pitr_conf_quote(recovery_target_action)}\n",
        ]
        auto_conf.write_text("".join(existing + settings), encoding="utf-8")
        logger.info("恢复配置写入完成")

        # 7. Start the service.
        if not start_postgres_service():
            raise RuntimeError("无法启动 PostgreSQL 服务(恢复阶段)")
        logger.info("服务已启动,开始轮询恢复进度...")

        # 8. Wait for recovery to finish.
        deadline = datetime.now(timezone.utc) + timedelta(minutes=max_wait_minutes)
        last_logged_minute = None
        while True:
            if datetime.now(timezone.utc) > deadline:
                raise RuntimeError(f"恢复超时(超过 {max_wait_minutes} 分钟)")
            try:
                in_recovery, last_replay_ts, meets_target = check_recovery_progress(
                    dbname=db_check_name,
                    user=db_check_user,
                    password=db_check_password,
                    host=db_host,
                    port=db_port,
                    target_dt=target_dt
                )
            except Exception as e:
                logger.error(f"检查恢复进度时出错: {e}")
                in_recovery, last_replay_ts, meets_target = None, None, False

            # The server may not accept connections yet.
            if in_recovery is None:
                logger.info("尚未能连接到数据库,等待 5 秒后重试...")
                time.sleep(5)
                continue

            # Log progress at most once per minute.
            current_minute = datetime.now(timezone.utc).strftime("%H:%M")
            if current_minute != last_logged_minute:
                if last_replay_ts:
                    time_diff = (target_dt - last_replay_ts).total_seconds()
                    logger.info(f"恢复进度: {last_replay_ts} (差 {time_diff:.1f} 秒)")
                else:
                    logger.info(f"恢复中... pg_is_in_recovery={in_recovery}")
                last_logged_minute = current_minute

            # Recovery is done once the server is out of recovery and the signal file is gone.
            if not in_recovery and not recovery_signal.exists():
                logger.info("恢复完成:数据库已退出恢复模式。")
                break
            if meets_target:
                logger.info("已满足目标时间条件,等待最终完成...")
            time.sleep(5)

        logger.info(f"PITR 成功:数据库已恢复到目标时间 {target_dt}")
        return True
    except Exception as e:
        logger.error(f"PITR 恢复失败: {e}")
        try:
            if snapshot_ready:
                _pitr_rollback(data_dir, backup_data_dir)
            elif service_stopped:
                # The data directory was never modified; just bring the server back.
                logger.info("数据目录未被修改,重新启动服务...")
                if not start_postgres_service():
                    logger.error("回滚后启动服务失败,请手动检查")
        except Exception as roll_err:
            logger.error(f"回滚失败: {roll_err}")
        raise
|