|
|
@@ -1,47 +1,649 @@
|
|
|
-# 数据库备份核心模块:backup_utils.py
|
|
|
import os
|
|
|
-import shutil
|
|
|
-from datetime import datetime
|
|
|
-from django.conf import settings
|
|
|
-import sqlite3
|
|
|
-from pathlib import Path
|
|
|
+import subprocess
|
|
|
import logging
|
|
|
+from datetime import datetime, timedelta, timezone
|
|
|
+from pathlib import Path
|
|
|
+import shutil
|
|
|
+import time
|
|
|
+import psycopg2
|
|
|
+from psycopg2 import sql
|
|
|
+
|
|
|
+# -------------------------
|
|
|
+# 配置与日志
|
|
|
+# -------------------------
|
|
|
def setup_logger():
    """Return the shared ``pitr_recovery`` logger, attaching handlers once.

    On the first call a ``logs`` directory is created relative to the current
    working directory and two handlers are attached: a UTF-8 file handler
    writing ``logs/postgres_recovery.log`` and a stream handler. Later calls
    find the handlers already present and return the logger untouched.
    """
    recovery_logger = logging.getLogger("pitr_recovery")
    recovery_logger.setLevel(logging.INFO)

    # Handlers already attached: nothing left to configure.
    if recovery_logger.handlers:
        return recovery_logger

    logs_path = Path("logs")
    logs_path.mkdir(exist_ok=True)

    file_handler = logging.FileHandler(
        logs_path / "postgres_recovery.log", encoding="utf-8"
    )
    stream_handler = logging.StreamHandler()
    shared_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # Same formatter for both sinks; the file handler is registered first.
    for sink in (file_handler, stream_handler):
        sink.setFormatter(shared_format)
        recovery_logger.addHandler(sink)

    return recovery_logger
|
|
|
+
|
|
|
# Module-wide logger used by every function below; configured at import time.
logger = setup_logger()
|
|
|
+
|
|
|
+# -------------------------
|
|
|
+# 基本路径/服务检测
|
|
|
+# -------------------------
|
|
|
def get_postgres_data_dir():
    """Return the PostgreSQL data directory as a ``Path``.

    The ``PGDATA`` environment variable wins when it names an existing path;
    otherwise a fixed list of common Windows install locations is probed in
    order.

    Raises:
        FileNotFoundError: if neither ``PGDATA`` nor any known location exists.
    """
    configured = os.environ.get('PGDATA')
    if configured:
        configured_dir = Path(configured)
        if configured_dir.exists():
            return configured_dir

    well_known_locations = (
        "D:/app/postgresql/data",
        "C:/Program Files/PostgreSQL/16/data",
        "C:/Program Files/PostgreSQL/15/data",
        "C:/Program Files/PostgreSQL/14/data",
        "C:/Program Files/PostgreSQL/13/data",
        "C:/Program Files/PostgreSQL/12/data",
    )
    for location in well_known_locations:
        candidate = Path(location)
        if candidate.exists():
            return candidate

    raise FileNotFoundError("无法找到 PostgreSQL 数据目录,请设置 PGDATA 或检查常见路径。")
|
|
|
+
|
|
|
def get_postgres_bin_path():
    """Locate the PostgreSQL ``bin`` directory.

    Lookup order:
      1. the ``PG_BIN_PATH`` environment variable, if it names an existing path;
      2. a fixed list of common Windows install locations;
      3. the directory of a PostgreSQL executable found on ``PATH``.

    Returns:
        A ``Path`` to the directory, or ``None`` when nothing was found.
    """
    possible_paths = [
        Path(r"D:/app/postgresql/bin"),
        Path(r"C:/Program Files/PostgreSQL/16/bin"),
        Path(r"C:/Program Files/PostgreSQL/15/bin"),
        Path(r"C:/Program Files/PostgreSQL/14/bin"),
        Path(r"C:/Program Files/PostgreSQL/13/bin"),
        Path(r"C:/Program Files/PostgreSQL/12/bin"),
    ]
    env_path = os.environ.get('PG_BIN_PATH')
    if env_path:
        p = Path(env_path)
        if p.exists():
            return p
    for p in possible_paths:
        if p.exists():
            return p

    # Fall back to PATH: the original comment promised this lookup but the
    # code returned None unconditionally.
    for tool in ("pg_basebackup", "pg_ctl", "postgres"):
        found = shutil.which(tool)
        if found:
            return Path(found).parent
    return None
|
|
|
+
|
|
|
def get_postgres_service_name():
    """Detect the name of the PostgreSQL Windows service.

    Each candidate name is queried with ``sc query``. A service reported as
    RUNNING is returned immediately. If none is running, the first candidate
    for which ``sc query`` succeeds (exit code 0) is returned, so a stopped
    service can still be found and restarted. Only when no candidate can be
    queried successfully does the function fall back to the first name in
    the list.

    Returns:
        The service name as a string (never ``None``).
    """
    candidates = [
        "postgresql-x64-16",
        "postgresql-x64-15",
        "postgresql-x64-14",
        "postgresql-x64-13",
        "postgresql-x64-12",
        "postgresql"
    ]
    # CREATE_NO_WINDOW only exists on Windows; 0 means "no special flags".
    no_window = getattr(subprocess, "CREATE_NO_WINDOW", 0)

    first_installed = None
    for svc in candidates:
        try:
            result = subprocess.run(
                ["sc", "query", svc],
                capture_output=True,
                text=True,
                creationflags=no_window
            )
        except (OSError, ValueError, subprocess.SubprocessError):
            # Best effort: `sc` missing or its output undecodable.
            continue
        if "RUNNING" in result.stdout:
            return svc
        # Exit code 0 means the query succeeded, i.e. the service exists
        # even though it is not running right now.
        if result.returncode == 0 and first_installed is None:
            first_installed = svc
    return first_installed or candidates[0]
|
|
|
+
|
|
|
def is_postgres_service_running():
    """Report whether the detected PostgreSQL service is in the RUNNING state.

    Runs ``sc query`` for the name returned by ``get_postgres_service_name``
    and looks for the word ``RUNNING`` in its output. Any failure while
    running the query is treated as "not running".
    """
    service = get_postgres_service_name()
    try:
        query = subprocess.run(
            ["sc", "query", service],
            capture_output=True,
            text=True,
            creationflags=subprocess.CREATE_NO_WINDOW,
        )
        running = "RUNNING" in query.stdout
    except Exception:
        running = False
    return running
|
|
|
+
|
|
|
def stop_postgres_service():
    """Stop the PostgreSQL service and confirm that it is down.

    Three escalating strategies are tried:
      1. ``net stop``;
      2. ``sc stop`` followed by polling, because ``sc stop`` only submits
         the stop request and returns before the service has stopped;
      3. ``taskkill /F`` on ``postgres.exe`` followed by polling.

    Returns:
        True once the service is confirmed stopped, False on timeout.
    """
    svc = get_postgres_service_name()
    logger.info(f"停止 PostgreSQL 服务: {svc}")
    # CREATE_NO_WINDOW only exists on Windows; 0 means "no special flags".
    no_window = getattr(subprocess, "CREATE_NO_WINDOW", 0)

    def _run(cmd):
        """Run *cmd* quietly; return the CompletedProcess or None on launch failure."""
        try:
            return subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                creationflags=no_window
            )
        except (OSError, ValueError, subprocess.SubprocessError) as exc:
            logger.warning("执行 %s 失败: %s", " ".join(cmd), exc)
            return None

    def _wait_until_stopped(max_wait=60, interval=5):
        """Poll the service state for up to *max_wait* seconds."""
        for _ in range(max_wait // interval):
            if not is_postgres_service_running():
                return True
            time.sleep(interval)
        return False

    # 1. net stop
    r = _run(["net", "stop", svc])
    if r is not None and r.returncode == 0:
        logger.info("服务已停止")
        return True

    # 2. sc stop: exit code 0 only means the request was accepted, so the
    #    actual state must be confirmed before callers touch the data directory.
    r = _run(["sc", "stop", svc])
    if r is not None and r.returncode == 0:
        if _wait_until_stopped():
            logger.info("服务已停止 (sc stop)")
            return True

    # 3. Last resort: kill the server processes.
    logger.warning("通过服务接口无法停止,尝试 taskkill 强制结束 postgres.exe")
    _run(["taskkill", "/F", "/IM", "postgres.exe"])

    if _wait_until_stopped():
        logger.info("服务已确认停止")
        return True

    logger.error("服务停止超时")
    return False
|
|
|
|
|
|
-logger = logging.getLogger(__name__)
|
|
|
def start_postgres_service():
    """Start the PostgreSQL service, trying ``net start`` then ``sc start``.

    Returns:
        True when either command exits with code 0, False otherwise
        (including when running a command raises).
    """
    svc = get_postgres_service_name()
    logger.info(f"启动 PostgreSQL 服务: {svc}")
    # CREATE_NO_WINDOW only exists on Windows; 0 means "no special flags".
    no_window = getattr(subprocess, "CREATE_NO_WINDOW", 0)
    attempts = (
        ("net", "服务已启动"),
        ("sc", "服务已启动 (sc start)"),
    )
    try:
        r = None
        for tool, success_message in attempts:
            r = subprocess.run(
                [tool, "start", svc],
                capture_output=True,
                text=True,
                creationflags=no_window
            )
            if r.returncode == 0:
                logger.info(success_message)
                return True

        # These tools usually report failures on stdout, so fall back to it
        # when stderr is empty; otherwise the log line carries no detail.
        detail = (r.stderr or r.stdout or "").strip()
        logger.error(f"启动服务失败: {detail}")
        return False
    except Exception as e:
        logger.error(f"启动服务异常: {e}")
        return False
|
|
|
|
|
|
-def backup_database():
|
|
|
- """执行数据库备份,返回备份路径"""
|
|
|
+# -------------------------
|
|
|
+# 基础备份(pg_basebackup)
|
|
|
+# -------------------------
|
|
|
def perform_base_backup(pg_superuser='postgres', pg_password='zhanglei',
                        backup_root="E:/code/backup/postgres/base_backup"):
    """Take a physical base backup with ``pg_basebackup``.

    The backup is written in plain format with the required WAL fetched at
    the end (``-F p -X f``) into ``<backup_root>/<YYYYmmdd_HHMMSS>``.

    Args:
        pg_superuser: database role used for the backup connection.
        pg_password: password handed to the tool through ``PGPASSWORD``;
            a falsy value leaves the environment untouched.
            NOTE(review): the default embeds a real-looking credential in
            source code; consider supplying it from configuration instead.
        backup_root: directory under which the timestamped backup directory
            is created. Defaults to the previously hard-coded location.

    Returns:
        The backup directory as a string.

    Raises:
        RuntimeError: if the bin directory or executable cannot be found,
            ``pg_basebackup`` fails, or the result lacks ``backup_label``.
    """
    pg_bin = get_postgres_bin_path()
    if not pg_bin:
        raise RuntimeError("无法找到 PostgreSQL bin 目录")

    exe_name = "pg_basebackup.exe" if os.name == "nt" else "pg_basebackup"
    pg_basebackup = pg_bin / exe_name
    if not pg_basebackup.exists():
        raise RuntimeError(f"未找到 pg_basebackup: {pg_basebackup}")

    now = datetime.now()
    dest = Path(backup_root) / now.strftime('%Y%m%d_%H%M%S')
    dest.mkdir(parents=True, exist_ok=True)

    cmd = [
        str(pg_basebackup),
        "-D", str(dest),
        "-F", "p",
        "-X", "f",
        "-P",
        "-U", pg_superuser,
        # Never prompt: with captured output a password prompt would hang.
        "-w",
    ]

    env = os.environ.copy()
    if pg_password:
        env['PGPASSWORD'] = pg_password

    # The command line never contains the password, so it is safe to log.
    logger.info("开始基础备份: " + " ".join(cmd))
    r = subprocess.run(
        cmd,
        env=env,
        capture_output=True,
        text=True,
        creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0)
    )

    if r.returncode != 0:
        logger.error(r.stderr)
        # Do not leave a partial directory that could be mistaken for a backup.
        shutil.rmtree(dest, ignore_errors=True)
        raise RuntimeError("pg_basebackup 失败: " + r.stderr)

    # Sanity check: the backup directory must contain backup_label.
    if not (dest / "backup_label").exists():
        shutil.rmtree(dest, ignore_errors=True)
        raise RuntimeError("基础备份不完整,缺少 backup_label 文件")

    logger.info(f"基础备份完成: {dest}")
    return str(dest)
|
|
|
+
|
|
|
+# -------------------------
|
|
|
+# 检查数据库状态
|
|
|
+# -------------------------
|
|
|
def check_database_status(dbname='postgres', user='postgres', password='zhanglei',
                          host='localhost', port=5432):
    """Probe the database and report whether it is up and in recovery.

    Connects with a 5 second timeout and runs ``SELECT pg_is_in_recovery()``.

    Returns:
        ``(is_running, is_in_recovery)``. Any connection or query failure is
        reported as ``(False, False)``; this function does not raise.
    """
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5
        )
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT pg_is_in_recovery()")
            in_recovery = cur.fetchone()[0]
        return True, in_recovery
    except Exception as e:
        # Expected while the server is still starting; keep it at debug level.
        logger.debug("数据库状态检查失败: %s", e)
        return False, False
    finally:
        # Always release the connection, including when the query failed.
        if conn is not None:
            try:
                conn.close()
            except Exception as close_err:
                logger.debug("关闭数据库连接失败: %s", close_err)
|
|
|
+
|
|
|
+# -------------------------
|
|
|
+# 主恢复函数:恢复到基础备份
|
|
|
+# -------------------------
|
|
|
def restore_to_base_backup(base_backup_dir):
    """Replace the live data directory with the contents of a base backup.

    Steps: stop the service, take a safety copy of the current data
    directory, clear the data directory (keeping ``postgresql.conf``,
    ``pg_hba.conf`` and ``pg_ident.conf``), copy the base backup in, remove
    recovery trigger files and recovery settings, start the service and
    verify that the database accepts connections outside recovery mode.

    On any failure the original data directory is copied back from the
    safety copy (when one was completed), the service is restarted, and the
    original exception is re-raised.

    Args:
        base_backup_dir: path of the base backup directory to restore.

    Returns:
        True when the database is verified up and not in recovery.

    Raises:
        FileNotFoundError: if ``base_backup_dir`` is not a directory.
        RuntimeError: if the service cannot be stopped or started, the data
            directory cannot be cleared, or verification never succeeds.
    """
    data_dir = get_postgres_data_dir()
    base_backup_path = Path(base_backup_dir)
    # Validate before stopping anything: a bad path must not cause downtime.
    if not base_backup_path.is_dir():
        raise FileNotFoundError(f"基础备份目录不存在: {base_backup_dir}")

    logger.info(f"开始恢复到基础备份: {base_backup_dir}")
    backup_data_dir = None   # set only once the safety copy is complete
    service_stopped = False

    def _remove(item):
        """Delete one entry of the data directory (file, link or tree)."""
        if item.is_dir() and not item.is_symlink():
            shutil.rmtree(item)
        else:
            item.unlink()

    try:
        # 1. Stop the service.
        if not stop_postgres_service():
            raise RuntimeError("无法停止 PostgreSQL 服务")
        service_stopped = True
        time.sleep(5)  # grace period for file handles to be released

        # 2. Safety copy of the current data directory.
        snapshot = data_dir.with_name(
            data_dir.name + "_backup_" + datetime.now().strftime("%Y%m%d_%H%M%S")
        )
        logger.info(f"备份当前数据目录到: {snapshot}")
        if snapshot.exists():
            shutil.rmtree(snapshot)
        try:
            shutil.copytree(data_dir, snapshot)
        except Exception:
            # A partial copy must never be used as a rollback source.
            shutil.rmtree(snapshot, ignore_errors=True)
            raise
        backup_data_dir = snapshot
        logger.info("备份完成")

        # 3. Clear the data directory, keeping the key configuration files.
        logger.info("安全清空数据目录...")
        exclude_files = {"postgresql.conf", "pg_hba.conf", "pg_ident.conf"}
        kept = set()
        failed = []
        for item in data_dir.iterdir():
            if item.name in exclude_files:
                logger.info(f"保留配置文件: {item.name}")
                kept.add(item.name)
                continue
            try:
                _remove(item)
            except Exception as e:
                logger.warning("删除 %s 失败: %s", item, e)
                failed.append(item.name)
        if failed:
            # Restoring on top of leftovers would mix two clusters' files.
            raise RuntimeError("无法清空数据目录,以下条目删除失败: " + ", ".join(failed))
        logger.info("数据目录已清空")

        # 4. Copy the base backup in without overwriting the files kept above.
        def _skip_kept(src, names):
            if Path(src) == base_backup_path:
                return [n for n in names if n in kept]
            return []

        logger.info("恢复基础备份到数据目录...")
        shutil.copytree(base_backup_path, data_dir, dirs_exist_ok=True,
                        ignore=_skip_kept)
        logger.info("基础备份已恢复")

        # 5. Remove recovery trigger files so the server starts normally.
        for recovery_file in ["recovery.signal", "standby.signal", "recovery.conf"]:
            file_path = data_dir / recovery_file
            if file_path.exists():
                logger.info(f"删除恢复文件: {recovery_file}")
                file_path.unlink()

        # 6. Strip recovery settings from postgresql.auto.conf.
        auto_conf = data_dir / "postgresql.auto.conf"
        if auto_conf.exists():
            logger.info("清理postgresql.auto.conf中的恢复设置")
            recovery_keys = (
                "restore_command",
                "recovery_target_time",
                "recovery_target_timeline",
                "recovery_target_action",
            )
            with open(auto_conf, "r", encoding="utf-8") as f:
                lines = f.readlines()
            new_lines = [
                line for line in lines
                if not line.strip().startswith(recovery_keys)
            ]
            with open(auto_conf, "w", encoding="utf-8") as f:
                f.writelines(new_lines)

        # 7. Start the service.
        if not start_postgres_service():
            raise RuntimeError("无法启动 PostgreSQL 服务")

        # 8. Verify. check_database_status() reports failures through its
        #    return value, so polling that value is sufficient.
        max_retries = 10
        for _ in range(max_retries):
            is_running, in_recovery = check_database_status()
            if is_running and not in_recovery:
                logger.info("数据库已成功启动,不在恢复模式")
                return True
            time.sleep(2)

        # Previously this fell through to `return True`, reporting success
        # for a database that never came up.
        logger.error("无法验证数据库状态")
        raise RuntimeError("数据库启动后未能通过状态验证")

    except Exception as e:
        logger.error(f"恢复到基础备份失败: {e}")
        # Roll back to the original data directory.
        try:
            if backup_data_dir and backup_data_dir.exists():
                logger.info("开始回滚:恢复原始数据目录...")
                try:
                    stop_postgres_service()
                except Exception as stop_err:
                    logger.warning("回滚前停止服务失败: %s", stop_err)

                # Empty the data directory (best effort).
                for item in data_dir.iterdir():
                    try:
                        _remove(item)
                    except Exception as ex:
                        logger.warning("删除 %s 失败: %s", item, ex)

                # Copy the safety copy back.
                shutil.copytree(backup_data_dir, data_dir, dirs_exist_ok=True)

                if start_postgres_service():
                    logger.info("回滚完成并已启动服务(原始数据已恢复)")
                else:
                    logger.error("回滚后启动服务失败,请手动检查")
            elif service_stopped:
                # The data directory was never modified; just resume service.
                logger.info("数据目录未被修改,重新启动服务")
                start_postgres_service()
        except Exception as roll_err:
            logger.error(f"回滚失败: {roll_err}")
        raise
|
|
|
+
|
|
|
+# -------------------------
|
|
|
+# 辅助:解析时间字符串
|
|
|
+# 下面的代码用不了,直接基础备份恢复
|
|
|
+# -------------------------
|
|
|
def parse_target_time(t):
    """Convert a target time to a timezone-aware UTC ``datetime``.

    ``datetime`` inputs are converted with ``astimezone``. Other inputs are
    parsed against these formats, in order: ``YYYY-MM-DD HH:MM:SS``,
    ``YYYY-MM-DDTHH:MM:SS``, ``YYYY-MM-DD HH:MM`` and ``YYYY-MM-DD``; the
    parsed value is then converted with ``astimezone`` as well.

    Raises:
        ValueError: if no format matches.
    """
    if isinstance(t, datetime):
        return t.astimezone(timezone.utc)

    accepted_formats = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d",
    )

    def _attempt(pattern):
        # None signals "this format did not apply"; try the next one.
        try:
            return datetime.strptime(t, pattern).astimezone(timezone.utc)
        except Exception:
            return None

    for pattern in accepted_formats:
        converted = _attempt(pattern)
        if converted is not None:
            return converted

    raise ValueError("无法解析目标时间,请使用 'YYYY-MM-DD HH:MM:SS' 格式")
|
|
|
+
|
|
|
+# -------------------------
|
|
|
+# 检查恢复进度(通过数据库)
|
|
|
+# -------------------------
|
|
|
def check_recovery_progress(dbname='postgres', user='postgres', password='zhanglei',
                            host='localhost', port=5432, target_dt=None):
    """Query the server for its recovery state and replay position.

    Args:
        dbname, user, password, host, port: connection parameters.
        target_dt: optional recovery target as a ``datetime``. A naive value
            is converted with ``astimezone`` before it is compared.

    Returns:
        ``(in_recovery, last_replay_ts, meets_target)`` where

        * ``in_recovery`` is the result of ``pg_is_in_recovery()``, or
          ``None`` when the database could not be reached or queried;
        * ``last_replay_ts`` is ``pg_last_xact_replay_timestamp()`` in UTC,
          or ``None`` when unavailable;
        * ``meets_target`` is True when the replay timestamp is no more than
          30 seconds short of ``target_dt`` (or is already past it).
    """
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5
        )
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT pg_is_in_recovery()")
            in_recovery = cur.fetchone()[0]

            last_ts = None
            try:
                cur.execute("SELECT pg_last_xact_replay_timestamp()")
                res = cur.fetchone()[0]
                if res:
                    last_ts = res.astimezone(timezone.utc)
            except Exception as e:
                logger.debug("查询 pg_last_xact_replay_timestamp 失败: %s", e)
    except Exception as e:
        logger.debug("数据库连接/查询失败: %s", e)
        return None, None, False
    finally:
        # Release the connection on every path, including failures.
        if conn is not None:
            try:
                conn.close()
            except Exception as close_err:
                logger.debug("关闭数据库连接失败: %s", close_err)

    # Compare outside the connection handling so that a bad target_dt is not
    # misreported as "database unreachable".
    ok = False
    if target_dt and last_ts:
        if target_dt.tzinfo is None:
            target_dt = target_dt.astimezone(timezone.utc)
        # Allow up to 30 seconds of remaining gap.
        time_diff = (target_dt - last_ts).total_seconds()
        ok = time_diff <= 30

    return in_recovery, last_ts, ok
|
|
|
+
|
|
|
+# -------------------------
|
|
|
+# 主恢复函数:PITR 恢复到指定时间点
|
|
|
+# -------------------------
|
|
|
+
|
|
|
def restore_to_point_in_time(target_time, base_backup_dir,
                             wal_archive_dir=r"E:\code\backup\postgres\wal_archive",
                             pg_superuser='postgres', pg_password='zhanglei',
                             db_check_name='postgres', db_check_user='postgres',
                             db_check_password='zhanglei', db_host='localhost',
                             db_port=5432, max_wait_minutes=30):
    """Perform a point-in-time recovery (PITR) on PostgreSQL 12+.

    Steps: stop the service, take a safety copy of the current data
    directory, clear it (keeping ``postgresql.conf``, ``pg_hba.conf`` and
    ``pg_ident.conf``), copy the base backup in, create ``recovery.signal``,
    write the recovery settings into ``postgresql.auto.conf``, start the
    service and poll until the server leaves recovery mode.

    On any failure the original data directory is copied back from the
    safety copy (when one was completed), the service is restarted, and the
    original exception is re-raised.

    Args:
        target_time: ``'YYYY-MM-DD HH:MM:SS'`` string or ``datetime``.
        base_backup_dir: base backup directory to restore from.
        wal_archive_dir: directory holding the archived WAL segments.
        pg_superuser, pg_password: accepted for interface compatibility;
            not used by this function.
        db_check_name, db_check_user, db_check_password, db_host, db_port:
            connection parameters used to poll recovery progress.
        max_wait_minutes: how long to wait for recovery to finish.

    Returns:
        True when recovery completed.

    Raises:
        FileNotFoundError: if the base backup or WAL archive directory is missing.
        RuntimeError: if the service cannot be stopped or started, the data
            directory cannot be cleared, or recovery times out.
    """
    target_dt = parse_target_time(target_time)
    data_dir = get_postgres_data_dir()
    base_backup_path = Path(base_backup_dir)
    # Validate before stopping anything: bad paths must not cause downtime.
    if not base_backup_path.is_dir():
        raise FileNotFoundError(f"基础备份目录不存在: {base_backup_dir}")
    if not Path(wal_archive_dir).is_dir():
        raise FileNotFoundError(f"WAL 归档目录不存在: {wal_archive_dir}")

    logger.info(f"PITR 开始: 目标时间 {target_dt}, 数据目录: {data_dir}, 基础备份: {base_backup_dir}")
    backup_data_dir = None   # set only once the safety copy is complete
    service_stopped = False
    recovery_keys = (
        "restore_command",
        "recovery_target_time",
        "recovery_target_timeline",
        "recovery_target_action",
    )

    def _remove(item):
        """Delete one entry of the data directory (file, link or tree)."""
        if item.is_dir() and not item.is_symlink():
            shutil.rmtree(item)
        else:
            item.unlink()

    try:
        # 1. Stop the service.
        if not stop_postgres_service():
            raise RuntimeError("无法停止 PostgreSQL 服务")
        service_stopped = True
        time.sleep(5)  # grace period for file handles to be released

        # 2. Safety copy of the current data directory.
        snapshot = data_dir.with_name(
            data_dir.name + "_backup_" + datetime.now().strftime("%Y%m%d_%H%M%S")
        )
        logger.info(f"备份当前数据目录到: {snapshot}")
        if snapshot.exists():
            shutil.rmtree(snapshot)
        try:
            shutil.copytree(data_dir, snapshot)
        except Exception:
            # A partial copy must never be used as a rollback source.
            shutil.rmtree(snapshot, ignore_errors=True)
            raise
        backup_data_dir = snapshot
        logger.info("备份完成")

        # 3. Clear the data directory, keeping the key configuration files.
        logger.info("安全清空数据目录...")
        exclude_files = {"postgresql.conf", "pg_hba.conf", "pg_ident.conf"}
        kept = set()
        failed = []
        for item in data_dir.iterdir():
            if item.name in exclude_files:
                logger.info(f"保留配置文件: {item.name}")
                kept.add(item.name)
                continue
            try:
                _remove(item)
            except Exception as e:
                logger.warning("删除 %s 失败: %s", item, e)
                failed.append(item.name)
        if failed:
            # Restoring on top of leftovers would mix two clusters' files.
            raise RuntimeError("无法清空数据目录,以下条目删除失败: " + ", ".join(failed))
        logger.info("数据目录已清空")

        # 4. Copy the base backup in without overwriting the files kept above.
        def _skip_kept(src, names):
            if Path(src) == base_backup_path:
                return [n for n in names if n in kept]
            return []

        logger.info("恢复基础备份到数据目录...")
        shutil.copytree(base_backup_path, data_dir, dirs_exist_ok=True,
                        ignore=_skip_kept)
        logger.info("基础备份已恢复")

        # 5. Create the recovery signal file (PostgreSQL 12+).
        recovery_signal = data_dir / "recovery.signal"
        logger.info(f"创建恢复信号: {recovery_signal}")
        recovery_signal.touch()

        # 6. Write the recovery settings.
        auto_conf = data_dir / "postgresql.auto.conf"
        # `copy` is a cmd.exe builtin and needs backslash separators.
        archive = str(wal_archive_dir).replace("/", "\\").rstrip("\\")
        restore_command = f'copy "{archive}\\%f" "%p"'
        # Inside a quoted configuration value a backslash starts an escape
        # sequence and a single quote ends the string, so both are doubled.
        conf_value = restore_command.replace("\\", "\\\\").replace("'", "''")

        # Keep unrelated settings that came with the base backup; only the
        # recovery-related keys are replaced.
        existing = []
        if auto_conf.exists():
            with open(auto_conf, "r", encoding="utf-8") as f:
                existing = [
                    line for line in f
                    if not line.strip().startswith(recovery_keys)
                ]
        if existing and not existing[-1].endswith("\n"):
            existing[-1] += "\n"

        logger.info(f"写入恢复配置到 {auto_conf}")
        with open(auto_conf, "w", encoding="utf-8") as f:
            f.writelines(existing)
            f.write(f"restore_command = '{conf_value}'\n")
            f.write(f"recovery_target_time = '{target_dt.isoformat()}'\n")
            f.write("recovery_target_timeline = 'latest'\n")
            # Without this the server may pause at the target and never
            # leave recovery mode, so the wait loop below would time out.
            f.write("recovery_target_action = 'promote'\n")
        logger.info("恢复配置写入完成")

        # 7. Start the service.
        if not start_postgres_service():
            raise RuntimeError("无法启动 PostgreSQL 服务(恢复阶段)")
        logger.info("服务已启动,开始轮询恢复进度...")

        # 8. Wait for recovery to finish.
        deadline = time.monotonic() + max_wait_minutes * 60
        last_logged = None

        while True:
            if time.monotonic() > deadline:
                raise RuntimeError(f"恢复超时(超过 {max_wait_minutes} 分钟)")

            try:
                in_recovery, last_replay_ts, meets_target = check_recovery_progress(
                    dbname=db_check_name,
                    user=db_check_user,
                    password=db_check_password,
                    host=db_host,
                    port=db_port,
                    target_dt=target_dt
                )
            except Exception as e:
                logger.error(f"检查恢复进度时出错: {e}")
                in_recovery, last_replay_ts, meets_target = None, None, False

            # The server may not accept connections yet.
            if in_recovery is None:
                logger.info("尚未能连接到数据库,等待 5 秒后重试...")
                time.sleep(5)
                continue

            # Log progress once per minute (minute resolution stamp).
            minute_stamp = datetime.now(timezone.utc).strftime("%H:%M")
            if last_logged != minute_stamp:
                if last_replay_ts:
                    time_diff = (target_dt - last_replay_ts).total_seconds()
                    logger.info(f"恢复进度: {last_replay_ts} (差 {time_diff:.1f} 秒)")
                else:
                    logger.info(f"恢复中... pg_is_in_recovery={in_recovery}")
                last_logged = minute_stamp

            # Done when the server left recovery and the signal file is gone.
            if not in_recovery and not recovery_signal.exists():
                logger.info("恢复完成:数据库已退出恢复模式。")
                logger.info(f"PITR 成功:数据库已恢复到目标时间 {target_dt}")
                return True

            if meets_target:
                logger.info("已满足目标时间条件,等待最终完成...")
            time.sleep(5)

    except Exception as e:
        logger.error(f"PITR 恢复失败: {e}")
        # Roll back to the original data directory.
        try:
            if backup_data_dir and backup_data_dir.exists():
                logger.info("开始回滚:恢复原始数据目录...")
                try:
                    stop_postgres_service()
                except Exception as stop_err:
                    logger.warning("回滚前停止服务失败: %s", stop_err)

                # Empty the data directory (best effort).
                for item in data_dir.iterdir():
                    try:
                        _remove(item)
                    except Exception as ex:
                        logger.warning("删除 %s 失败: %s", item, ex)

                # Copy the safety copy back.
                shutil.copytree(backup_data_dir, data_dir, dirs_exist_ok=True)

                # Drop a leftover recovery.signal unless the original cluster
                # had one. postgresql.auto.conf now holds the original
                # settings again and must not be deleted.
                leftover = data_dir / "recovery.signal"
                if leftover.exists() and not (backup_data_dir / "recovery.signal").exists():
                    try:
                        leftover.unlink()
                    except OSError as ex:
                        logger.warning("删除 %s 失败: %s", leftover, ex)

                if start_postgres_service():
                    logger.info("回滚完成并已启动服务(原始数据已恢复)")
                else:
                    logger.error("回滚后启动服务失败,请手动检查")
            elif service_stopped:
                # The data directory was never modified; just resume service.
                logger.info("数据目录未被修改,重新启动服务")
                start_postgres_service()
        except Exception as roll_err:
            logger.error(f"回滚失败: {roll_err}")
        raise
|