import logging
import os
import shutil
import subprocess
import time
from contextlib import closing
from datetime import datetime, timedelta, timezone
from pathlib import Path

import psycopg2
from psycopg2 import sql  # noqa: F401  (currently unused in this module)


# -------------------------
# Configuration and logging
# -------------------------

# NOTE(review): hard-coded default credentials; prefer passing them explicitly
# or relying on PGPASSWORD / a pgpass file in production.
_DEFAULT_PASSWORD = 'abc@1234'
_DEFAULT_BASE_BACKUP_ROOT = "E:/code/backup/postgres/base_backup"

# Top-level data-directory files that survive a restore: the current copies
# are kept instead of the ones stored in the base backup.
_CONFIG_FILES_TO_KEEP = frozenset({"postgresql.conf", "pg_hba.conf", "pg_ident.conf"})

# Files whose presence makes PostgreSQL enter (or refuse) recovery on startup.
_RECOVERY_MARKER_FILES = ("recovery.signal", "standby.signal", "recovery.conf")

# Files a restore may create/modify; used to clean leftovers during rollback.
_RESTORE_ARTIFACTS = ("recovery.signal", "standby.signal", "postgresql.auto.conf")

# postgresql.auto.conf lines starting with these prefixes are recovery settings
# (restore_command, recovery_target, recovery_target_time, _timeline, _action, ...).
_RECOVERY_SETTING_PREFIXES = ("restore_command", "recovery_target")

_TARGET_TIME_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d")


def setup_logger():
    """Return the shared "pitr_recovery" logger, attaching handlers only once.

    Records go to ``logs/postgres_recovery.log`` (UTF-8, relative to the
    current working directory) and to the console.
    """
    log = logging.getLogger("pitr_recovery")
    log.setLevel(logging.INFO)
    if not log.handlers:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        for handler in (
            logging.FileHandler(log_dir / "postgres_recovery.log", encoding="utf-8"),
            logging.StreamHandler(),
        ):
            handler.setFormatter(fmt)
            log.addHandler(handler)
    return log


logger = setup_logger()


# -------------------------
# Paths, subprocess and service detection
# -------------------------

# Searched in this order after the corresponding environment variable.
_INSTALL_ROOTS = (
    Path(r"D:/app/postgresql"),
    *(Path(f"C:/Program Files/PostgreSQL/{version}") for version in (16, 15, 14, 13, 12)),
)

_SERVICE_CANDIDATES = (
    "postgresql-x64-16",
    "postgresql-x64-15",
    "postgresql-x64-14",
    "postgresql-x64-13",
    "postgresql-x64-12",
    "postgresql",
)

# CREATE_NO_WINDOW exists only on Windows; 0 keeps subprocess.run usable elsewhere.
_NO_WINDOW = getattr(subprocess, "CREATE_NO_WINDOW", 0)


def _run_quiet(cmd, **kwargs):
    """Run *cmd* without a console window and capture its text output.

    Undecodable bytes are replaced instead of raising, since Windows tools
    print in the console code page.
    """
    return subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        errors="replace",
        creationflags=_NO_WINDOW,
        **kwargs,
    )


def _locate_install_subdir(env_var, subdir):
    """Return the existing path in *env_var*, else the first existing ``<root>/<subdir>``, else None."""
    env_path = os.environ.get(env_var)
    if env_path and Path(env_path).exists():
        return Path(env_path)
    for root in _INSTALL_ROOTS:
        candidate = root / subdir
        if candidate.exists():
            return candidate
    return None


def get_postgres_data_dir():
    """Return the PostgreSQL data directory ($PGDATA first, then common install paths).

    Raises:
        FileNotFoundError: if no candidate directory exists.
    """
    data_dir = _locate_install_subdir("PGDATA", "data")
    if data_dir is None:
        raise FileNotFoundError("无法找到 PostgreSQL 数据目录,请设置 PGDATA 或检查常见路径。")
    return data_dir


def get_postgres_bin_path():
    """Return the PostgreSQL bin directory, or None if it cannot be found.

    Checks $PG_BIN_PATH, then common install paths, then the directory that
    contains ``pg_basebackup`` on PATH.
    """
    bin_dir = _locate_install_subdir("PG_BIN_PATH", "bin")
    if bin_dir is not None:
        return bin_dir
    on_path = shutil.which("pg_basebackup")
    return Path(on_path).parent if on_path else None


def _query_service(svc):
    """Return the ``sc query <svc>`` result, or None if the command could not run."""
    try:
        return _run_quiet(["sc", "query", svc])
    except (OSError, subprocess.SubprocessError):
        return None


def get_postgres_service_name():
    """Return the Windows service name of the PostgreSQL server.

    Resolution order: $PG_SERVICE_NAME, a RUNNING candidate, the first
    installed candidate, then the first candidate as a last resort.
    Falling back to an *installed* service (rather than blindly to the first
    candidate) matters because this is called again after the service has been
    stopped, e.g. by start_postgres_service().
    """
    override = os.environ.get("PG_SERVICE_NAME")
    if override:
        return override
    first_installed = None
    for svc in _SERVICE_CANDIDATES:
        result = _query_service(svc)
        if result is None:
            continue
        if "RUNNING" in result.stdout:
            return svc
        # sc exits with 0 for an existing service (1060 = service does not exist).
        if result.returncode == 0 and first_installed is None:
            first_installed = svc
    return first_installed or _SERVICE_CANDIDATES[0]


def is_postgres_service_running(svc=None):
    """Return True if the service (*svc*, default: auto-detected) reports RUNNING."""
    result = _query_service(svc or get_postgres_service_name())
    return result is not None and "RUNNING" in result.stdout


def _is_service_stopped(svc):
    """Return True when *svc* is neither RUNNING nor in a *_PENDING transition.

    An unqueryable state (sc could not run) counts as "not stopped".
    """
    result = _query_service(svc)
    if result is None:
        return False
    return "RUNNING" not in result.stdout and "PENDING" not in result.stdout


def _wait_for_service_stopped(svc, timeout, interval=5):
    """Poll until *svc* is stopped or *timeout* seconds elapse; return whether it stopped."""
    deadline = time.monotonic() + timeout
    while True:
        if _is_service_stopped(svc):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


def stop_postgres_service(timeout=60):
    """Stop the PostgreSQL service; return True once it is confirmed stopped.

    Tries ``net stop`` (blocks until stopped), then ``sc stop`` (only requests
    the stop, so the STOPPED state is awaited), and finally force-kills every
    ``postgres.exe`` process.

    Args:
        timeout: seconds to wait for the stopped state after sc stop / taskkill.
    """
    svc = get_postgres_service_name()
    logger.info(f"停止 PostgreSQL 服务: {svc}")

    r = _run_quiet(["net", "stop", svc])
    if r.returncode == 0:
        logger.info("服务已停止")
        return True

    # `sc stop` returns 0 as soon as the stop is *requested* (STOP_PENDING);
    # copying the data directory before the server has exited would be unsafe.
    r = _run_quiet(["sc", "stop", svc])
    if r.returncode == 0 and _wait_for_service_stopped(svc, timeout):
        logger.info("服务已停止 (sc stop)")
        return True

    logger.warning("通过服务接口无法停止,尝试 taskkill 强制结束 postgres.exe")
    _run_quiet(["taskkill", "/F", "/IM", "postgres.exe"])

    if _wait_for_service_stopped(svc, timeout):
        logger.info("服务已确认停止")
        return True
    logger.error("服务停止超时")
    return False


def start_postgres_service():
    """Start the PostgreSQL service via ``net start``, falling back to ``sc start``.

    Returns True when either command reports success, False otherwise
    (errors are logged, never raised).
    """
    svc = get_postgres_service_name()
    logger.info(f"启动 PostgreSQL 服务: {svc}")
    try:
        r = _run_quiet(["net", "start", svc])
        if r.returncode == 0:
            logger.info("服务已启动")
            return True

        r = _run_quiet(["sc", "start", svc])
        if r.returncode == 0:
            logger.info("服务已启动 (sc start)")
            return True

        # sc prints its errors on stdout, so fall back to it when stderr is empty.
        logger.error(f"启动服务失败: {r.stderr or r.stdout}")
        return False
    except Exception as e:
        logger.error(f"启动服务异常: {e}")
        return False


# -------------------------
# Base backup (pg_basebackup)
# -------------------------
def perform_base_backup(pg_superuser='postgres', pg_password=_DEFAULT_PASSWORD,
                        backup_root=_DEFAULT_BASE_BACKUP_ROOT):
    """Take a plain-format base backup with pg_basebackup.

    The backup is written to ``<backup_root>/<YYYYmmdd_HHMMSS>`` with WAL
    fetched at the end (``-X f``). A directory created by this call is removed
    again if the backup fails.

    Returns:
        The backup directory path as a string.

    Raises:
        RuntimeError: if the bin directory is not found, pg_basebackup fails,
            or the result lacks ``backup_label``.
    """
    pg_bin = get_postgres_bin_path()
    if not pg_bin:
        raise RuntimeError("无法找到 PostgreSQL bin 目录")

    pg_basebackup = pg_bin / "pg_basebackup.exe"
    dest = Path(backup_root) / datetime.now().strftime('%Y%m%d_%H%M%S')
    created_dest = not dest.exists()
    dest.mkdir(parents=True, exist_ok=True)

    cmd = [
        str(pg_basebackup),
        "-D", str(dest),
        "-F", "p",
        "-X", "f",
        "-P",
        "-U", pg_superuser,
    ]
    env = os.environ.copy()
    env['PGPASSWORD'] = pg_password

    logger.info("开始基础备份: " + " ".join(cmd))
    r = _run_quiet(cmd, env=env)

    try:
        if r.returncode != 0:
            logger.error(r.stderr)
            raise RuntimeError("pg_basebackup 失败: " + r.stderr)
        # A complete plain-format base backup always contains backup_label.
        if not (dest / "backup_label").exists():
            raise RuntimeError("基础备份不完整,缺少 backup_label 文件")
    except RuntimeError:
        # Never leave a half-written backup that a later restore could pick up;
        # only delete the directory if this call created it.
        if created_dest:
            shutil.rmtree(dest, ignore_errors=True)
        raise

    logger.info(f"基础备份完成: {dest}")
    return str(dest)


# -------------------------
# Database status
# -------------------------
def check_database_status(dbname='postgres', user='postgres', password=_DEFAULT_PASSWORD,
                          host='localhost', port=5432):
    """Probe the server.

    Returns:
        ``(is_running, is_in_recovery)``; ``(False, False)`` when the server
        cannot be reached or queried (this probe never raises).
    """
    try:
        with closing(psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5,
        )) as conn:
            conn.autocommit = True
            with conn.cursor() as cur:
                cur.execute("SELECT pg_is_in_recovery()")
                in_recovery = cur.fetchone()[0]
        return True, in_recovery
    except Exception:
        # Deliberate best-effort probe: any failure means "not usable yet".
        return False, False


# -------------------------
# Shared restore helpers
# -------------------------
def _require_dir(path, label):
    """Raise FileNotFoundError unless *path* is an existing directory."""
    if not Path(path).is_dir():
        raise FileNotFoundError(f"{label}不存在: {path}")


def _stop_service_for_restore():
    """Stop the service or raise RuntimeError; then give it a few seconds to release files."""
    if not stop_postgres_service():
        raise RuntimeError("无法停止 PostgreSQL 服务")
    time.sleep(5)


def _snapshot_data_dir(data_dir):
    """Copy *data_dir* to a timestamped sibling and return that path.

    A partially written snapshot is removed before the error propagates, so
    callers only ever receive a complete copy.
    """
    snapshot = data_dir.with_name(
        data_dir.name + "_backup_" + datetime.now().strftime("%Y%m%d_%H%M%S")
    )
    logger.info(f"备份当前数据目录到: {snapshot}")
    if snapshot.exists():
        shutil.rmtree(snapshot)
    try:
        shutil.copytree(data_dir, snapshot)
    except Exception:
        shutil.rmtree(snapshot, ignore_errors=True)
        raise
    logger.info("备份完成")
    return snapshot


def _clear_data_dir(data_dir, keep=frozenset()):
    """Delete every top-level entry of *data_dir* except the names in *keep*.

    Individual deletion failures are logged and skipped.
    """
    for item in data_dir.iterdir():
        if item.name in keep:
            logger.info(f"保留配置文件: {item.name}")
            continue
        try:
            if item.is_dir():
                shutil.rmtree(item)
            else:
                item.unlink()
        except OSError as e:
            logger.warning("删除 %s 失败: %s", item, e)


def _replace_with_base_backup(data_dir, base_backup_dir):
    """Clear *data_dir* (keeping the current config files) and copy the base backup in."""
    logger.info("安全清空数据目录...")
    _clear_data_dir(data_dir, keep=_CONFIG_FILES_TO_KEEP)
    logger.info("数据目录已清空")

    logger.info("恢复基础备份到数据目录...")
    kept = {name for name in _CONFIG_FILES_TO_KEEP if (data_dir / name).exists()}
    src_root = os.fspath(base_backup_dir)

    def _skip_kept_configs(directory, names):
        # copytree passes the source root unchanged for the top level only;
        # skipping there stops the backup's copies from overwriting the kept files.
        if directory == src_root:
            return [name for name in names if name in kept]
        return []

    shutil.copytree(src_root, data_dir, ignore=_skip_kept_configs, dirs_exist_ok=True)
    logger.info("基础备份已恢复")


def _remove_recovery_markers(data_dir):
    """Delete recovery.signal / standby.signal / recovery.conf from *data_dir* if present."""
    for name in _RECOVERY_MARKER_FILES:
        path = data_dir / name
        if path.exists():
            logger.info(f"删除恢复文件: {name}")
            path.unlink()


def _without_recovery_settings(lines):
    """Return *lines* minus any postgresql.auto.conf recovery settings."""
    return [line for line in lines if not line.strip().startswith(_RECOVERY_SETTING_PREFIXES)]


def _read_lines(path):
    """Return the lines of a UTF-8 text file (keeping line endings), or [] if it is missing."""
    if not path.exists():
        return []
    return path.read_text(encoding="utf-8").splitlines(keepends=True)


def _conf_quote(value):
    """Quote *value* as a postgresql.conf string literal.

    Backslashes are escape characters inside such literals, so they are
    doubled; embedded single quotes are doubled as well.
    """
    return "'" + value.replace("\\", "\\\\").replace("'", "''") + "'"


def _rollback_data_dir(data_dir, backup_data_dir):
    """Put the snapshot back into *data_dir* and restart the service."""
    logger.info("开始回滚:恢复原始数据目录...")
    try:
        stopped = stop_postgres_service()
    except Exception as ex:
        logger.warning("回滚前停止服务失败: %s", ex)
        stopped = False
    if not stopped:
        # Overwriting the directory under a live server would corrupt it; leave
        # the (intact) snapshot for a manual restore instead.
        logger.error(f"回滚中止:无法停止服务,原始数据保存在 {backup_data_dir}")
        return

    _clear_data_dir(data_dir)
    shutil.copytree(backup_data_dir, data_dir, dirs_exist_ok=True)

    # If deleting a restore artifact failed above, the copy did not remove it;
    # drop it unless the original data directory contained it as well.
    for name in _RESTORE_ARTIFACTS:
        leftover = data_dir / name
        if leftover.exists() and not (backup_data_dir / name).exists():
            try:
                leftover.unlink()
            except OSError as ex:
                logger.warning("删除 %s 失败: %s", leftover, ex)

    if start_postgres_service():
        logger.info("回滚完成并已启动服务(原始数据已恢复)")
    else:
        logger.error("回滚后启动服务失败,请手动检查")


def _recover_from_failed_restore(data_dir, backup_data_dir, service_stopped):
    """Best-effort cleanup after a failed restore; logs instead of raising.

    With a complete snapshot the data directory is rolled back. Without one the
    data directory was never modified, so only the stopped service is restarted.
    """
    try:
        if backup_data_dir is not None and backup_data_dir.exists():
            _rollback_data_dir(data_dir, backup_data_dir)
        elif service_stopped:
            logger.info("数据目录未被修改,尝试重新启动服务...")
            if not start_postgres_service():
                logger.error("回滚后启动服务失败,请手动检查")
    except Exception as roll_err:
        logger.error(f"回滚失败: {roll_err}")


def _wait_until_primary_running(retries=10, interval=2):
    """Poll until the server accepts connections outside recovery; return whether it did."""
    for _ in range(retries):
        is_running, in_recovery = check_database_status()
        if is_running and not in_recovery:
            logger.info("数据库已成功启动,不在恢复模式")
            return True
        time.sleep(interval)
    logger.error("无法验证数据库状态")
    return False


# -------------------------
# Main restore: back to a base backup
# -------------------------
def restore_to_base_backup(base_backup_dir):
    """Replace the data directory with *base_backup_dir* and start without recovery.

    Steps: stop the service, snapshot the data directory to a sibling
    ``<data>_backup_<timestamp>``, replace its contents with the base backup
    (keeping the current postgresql.conf / pg_hba.conf / pg_ident.conf), remove
    recovery marker files and recovery settings, start the service and poll
    until it answers outside recovery.

    Returns:
        True once the database is verified running and not in recovery; False
        if the restore finished but verification did not succeed in time.

    Raises:
        Any error from the steps above, after a best-effort rollback from the
        snapshot.
    """
    data_dir = get_postgres_data_dir()
    logger.info(f"开始恢复到基础备份: {base_backup_dir}")

    backup_data_dir = None
    service_stopped = False
    try:
        # Validate before any downtime.
        _require_dir(base_backup_dir, "基础备份目录")

        _stop_service_for_restore()
        service_stopped = True

        # Only set once the snapshot is complete, so a rollback never copies a
        # partial snapshot over the data directory.
        backup_data_dir = _snapshot_data_dir(data_dir)

        _replace_with_base_backup(data_dir, base_backup_dir)

        # Make sure the server starts as a primary instead of entering recovery.
        _remove_recovery_markers(data_dir)
        auto_conf = data_dir / "postgresql.auto.conf"
        if auto_conf.exists():
            logger.info("清理postgresql.auto.conf中的恢复设置")
            lines = _without_recovery_settings(_read_lines(auto_conf))
            auto_conf.write_text("".join(lines), encoding="utf-8")

        if not start_postgres_service():
            raise RuntimeError("无法启动 PostgreSQL 服务")

        return _wait_until_primary_running()

    except Exception as e:
        logger.error(f"恢复到基础备份失败: {e}")
        _recover_from_failed_restore(data_dir, backup_data_dir, service_stopped)
        raise


# -------------------------
# Helper: parse the target time.
# NOTE(review): the original author marked the PITR path below as unusable
# ("restore from the base backup directly"). The restore_command escaping and
# recovery_target_action fixes address likely causes; verify on a test
# cluster before relying on it.
# -------------------------
def parse_target_time(t):
    """Parse *t* into an aware UTC datetime.

    Accepts a datetime or a string in one of ``_TARGET_TIME_FORMATS``, or any
    ISO 8601 form understood by ``datetime.fromisoformat`` (e.g. with a UTC
    offset). Naive values are interpreted in the local time zone.

    Raises:
        ValueError: if *t* cannot be parsed.
    """
    if isinstance(t, datetime):
        return t.astimezone(timezone.utc)

    if isinstance(t, str):
        text = t.strip()
        for fmt in _TARGET_TIME_FORMATS:
            try:
                return datetime.strptime(text, fmt).astimezone(timezone.utc)
            except ValueError:
                continue
        try:
            return datetime.fromisoformat(text).astimezone(timezone.utc)
        except ValueError:
            pass

    raise ValueError("无法解析目标时间,请使用 'YYYY-MM-DD HH:MM:SS' 格式")


# -------------------------
# Recovery progress (queried through the database)
# -------------------------
def check_recovery_progress(dbname='postgres', user='postgres', password=_DEFAULT_PASSWORD,
                            host='localhost', port=5432, target_dt=None):
    """Query recovery progress.

    Returns:
        ``(in_recovery, last_replay_timestamp_utc_or_None, meets_target)``.
        ``meets_target`` is True when the last replayed transaction is at most
        30 s before *target_dt* (or after it). ``(None, None, False)`` means the
        server could not be reached or queried.
    """
    try:
        with closing(psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5,
        )) as conn:
            conn.autocommit = True
            with conn.cursor() as cur:
                cur.execute("SELECT pg_is_in_recovery()")
                in_recovery = cur.fetchone()[0]

                last_ts = None
                try:
                    cur.execute("SELECT pg_last_xact_replay_timestamp()")
                    res = cur.fetchone()[0]
                    if res:
                        last_ts = res.astimezone(timezone.utc)
                except Exception as e:
                    logger.debug("查询 pg_last_xact_replay_timestamp 失败: %s", e)

        ok = False
        if target_dt and last_ts:
            # Allow up to 30 seconds between the last replayed commit and the target.
            ok = (target_dt - last_ts).total_seconds() <= 30

        return in_recovery, last_ts, ok
    except Exception as e:
        logger.debug("数据库连接/查询失败: %s", e)
        return None, None, False


def _write_pitr_settings(data_dir, wal_archive_dir, target_dt):
    """Append PITR settings to postgresql.auto.conf, replacing older recovery settings.

    Existing non-recovery lines (e.g. ALTER SYSTEM settings) are kept.
    ``recovery_target_action = 'promote'`` is required: the default ``pause``
    leaves the server in recovery forever, so completion could never be seen.
    """
    auto_conf = data_dir / "postgresql.auto.conf"
    wal_dir = str(wal_archive_dir).replace("/", "\\").rstrip("\\")
    restore_command = f'copy "{wal_dir}\\%f" "%p"'

    logger.info(f"写入恢复配置到 {auto_conf}")
    lines = _without_recovery_settings(_read_lines(auto_conf))
    if lines and not lines[-1].endswith("\n"):
        lines[-1] += "\n"
    lines += [
        f"restore_command = {_conf_quote(restore_command)}\n",
        f"recovery_target_time = {_conf_quote(target_dt.isoformat())}\n",
        "recovery_target_timeline = 'latest'\n",
        "recovery_target_action = 'promote'\n",
    ]
    auto_conf.write_text("".join(lines), encoding="utf-8")
    logger.info("恢复配置写入完成")


def _wait_for_pitr_completion(data_dir, target_dt, max_wait_minutes, db_kwargs):
    """Poll every 5 s until recovery has ended (promoted and recovery.signal removed).

    Progress is logged at most once per minute.

    Raises:
        RuntimeError: after *max_wait_minutes* without completion.
    """
    timeout = timedelta(minutes=max_wait_minutes)
    deadline = time.monotonic() + timeout.total_seconds()
    recovery_signal = data_dir / "recovery.signal"
    last_logged_minute = None

    while True:
        if time.monotonic() > deadline:
            raise RuntimeError(f"恢复超时(超过 {max_wait_minutes} 分钟)")

        in_recovery, last_replay_ts, meets_target = check_recovery_progress(
            target_dt=target_dt, **db_kwargs
        )

        # The server may not accept connections yet (still starting up).
        if in_recovery is None:
            logger.info("尚未能连接到数据库,等待 5 秒后重试...")
            time.sleep(5)
            continue

        minute = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
        if minute != last_logged_minute:
            if last_replay_ts:
                time_diff = (target_dt - last_replay_ts).total_seconds()
                logger.info(f"恢复进度: {last_replay_ts} (差 {time_diff:.1f} 秒)")
            else:
                logger.info(f"恢复中... pg_is_in_recovery={in_recovery}")
            last_logged_minute = minute

        # PostgreSQL 12+: promotion ends recovery and removes recovery.signal.
        if not in_recovery and not recovery_signal.exists():
            logger.info("恢复完成:数据库已退出恢复模式。")
            return

        if meets_target:
            logger.info("已满足目标时间条件,等待最终完成...")
        time.sleep(5)


# -------------------------
# Main restore: PITR to a point in time
# -------------------------
def restore_to_point_in_time(target_time, base_backup_dir,
                             wal_archive_dir=r"E:\code\backup\postgres\wal_archive",
                             pg_superuser='postgres', pg_password=_DEFAULT_PASSWORD,
                             db_check_name='postgres', db_check_user='postgres',
                             db_check_password=_DEFAULT_PASSWORD, db_host='localhost',
                             db_port=5432, max_wait_minutes=30):
    """Perform point-in-time recovery (PostgreSQL 12+ signal-file style).

    Args:
        target_time: 'YYYY-MM-DD HH:MM:SS' string (local time), ISO 8601
            string, or datetime.
        base_backup_dir: base backup to start from.
        wal_archive_dir: Windows directory holding archived WAL segments.
        pg_superuser, pg_password: currently unused; kept for compatibility.
        db_check_*, db_host, db_port: connection used to poll progress.
        max_wait_minutes: how long to wait for recovery to finish.

    Returns:
        True once recovery has completed and the server has been promoted.

    Raises:
        Any error from the restore steps (including a timeout RuntimeError),
        after a best-effort rollback from the data-directory snapshot.
    """
    target_dt = parse_target_time(target_time)
    data_dir = get_postgres_data_dir()
    logger.info(f"PITR 开始: 目标时间 {target_dt}, 数据目录: {data_dir}, 基础备份: {base_backup_dir}")

    backup_data_dir = None
    service_stopped = False
    try:
        # Validate inputs before any downtime.
        _require_dir(base_backup_dir, "基础备份目录")
        _require_dir(wal_archive_dir, "WAL 归档目录")

        _stop_service_for_restore()
        service_stopped = True

        # Only set once the snapshot is complete (see restore_to_base_backup).
        backup_data_dir = _snapshot_data_dir(data_dir)

        _replace_with_base_backup(data_dir, base_backup_dir)

        # A stale standby.signal would take precedence over recovery.signal,
        # and recovery.conf is rejected by PostgreSQL 12+.
        _remove_recovery_markers(data_dir)
        recovery_signal = data_dir / "recovery.signal"
        logger.info(f"创建恢复信号: {recovery_signal}")
        recovery_signal.touch()

        _write_pitr_settings(data_dir, wal_archive_dir, target_dt)

        if not start_postgres_service():
            raise RuntimeError("无法启动 PostgreSQL 服务(恢复阶段)")
        logger.info("服务已启动,开始轮询恢复进度...")

        db_kwargs = dict(
            dbname=db_check_name,
            user=db_check_user,
            password=db_check_password,
            host=db_host,
            port=db_port,
        )
        _wait_for_pitr_completion(data_dir, target_dt, max_wait_minutes, db_kwargs)

        logger.info(f"PITR 成功:数据库已恢复到目标时间 {target_dt}")
        return True

    except Exception as e:
        logger.error(f"PITR 恢复失败: {e}")
        _recover_from_failed_restore(data_dir, backup_data_dir, service_stopped)
        raise