import os
import re
import subprocess
import logging
from datetime import datetime, timedelta
from pathlib import Path
import psycopg2
import sys


# -------------------------
# Configuration and logging
# -------------------------
def setup_logger():
    """Return the "wal_archive_check" logger, configuring it on first use.

    Records go to ``logs/wal_archive_check.log`` (relative to the current
    working directory; the directory is created if missing) and to a
    ``StreamHandler``. The ``if not logger.handlers`` guard prevents duplicate
    handlers when this function is called more than once.
    """
    logger = logging.getLogger("wal_archive_check")
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        fh = logging.FileHandler(log_dir / "wal_archive_check.log", encoding="utf-8")
        ch = logging.StreamHandler()
        fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        fh.setFormatter(fmt)
        ch.setFormatter(fmt)
        logger.addHandler(fh)
        logger.addHandler(ch)
    return logger


logger = setup_logger()


# -------------------------
# Database connection helper
# -------------------------
def connect_to_db(dbname='postgres', user='postgres', password='abc@1234',
                  host='localhost', port=5432):
    """Open a psycopg2 connection with a 5-second connect timeout.

    Returns the connection, or None after logging the error if connecting
    fails. NOTE(review): the default password is hard-coded; prefer passing
    it in (the ``__main__`` block reads PGPASSWORD) or using ~/.pgpass.
    """
    try:
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5,
        )
        return conn
    except Exception as e:
        logger.error(f"数据库连接失败: {e}")
        return None


# -------------------------
# Current WAL file name
# -------------------------
def get_current_wal_file(conn):
    """Return the name of the WAL segment currently being written, or None.

    Uses ``pg_walfile_name(pg_current_wal_lsn())`` in SQL. That function is
    not available while the server is in recovery (i.e. on a standby), in
    which case the error is logged and None is returned.
    """
    try:
        # The cursor context manager closes the cursor even if execute() raises.
        with conn.cursor() as cur:
            cur.execute("SELECT pg_walfile_name(pg_current_wal_lsn())")
            return cur.fetchone()[0]
    except Exception as e:
        logger.error(f"获取当前WAL文件名失败: {e}")
        return None


# -------------------------
# Archiver statistics
# -------------------------
def get_archive_status(conn):
    """Return the single ``pg_stat_archiver`` row as a column->value dict.

    Returns None if the view yields no row or the query fails (the error is
    logged).
    """
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM pg_stat_archiver")
            columns = [desc[0] for desc in cur.description]
            row = cur.fetchone()
        if row:
            return dict(zip(columns, row))
        return None
    except Exception as e:
        logger.error(f"获取归档状态失败: {e}")
        return None


# -------------------------
# Single-file archive check
# -------------------------
def check_wal_archived(wal_name, archive_dir):
    """Return True if a file named exactly *wal_name* exists in *archive_dir*."""
    archive_path = Path(archive_dir) / wal_name
    return archive_path.exists()


# -------------------------
# WAL segment name arithmetic
# -------------------------
# A WAL segment file name is 24 upper-case hex digits: 8 for the timeline ID,
# 8 for the "log" number and 8 for the segment within that log. Files such as
# *.history, *.backup or *.partial do not match.
_WAL_NAME_RE = re.compile(r"[0-9A-F]{24}")


def _wal_segno(wal_name, segs_per_xlogid):
    """Return the absolute segment number encoded in a 24-char WAL name."""
    return int(wal_name[8:16], 16) * segs_per_xlogid + int(wal_name[16:24], 16)


def _wal_name(timeline, segno, segs_per_xlogid):
    """Build a WAL name from an 8-hex-digit *timeline* string and a segment number."""
    return f"{timeline}{segno // segs_per_xlogid:08X}{segno % segs_per_xlogid:08X}"


# -------------------------
# Missing (not yet archived) WAL files
# -------------------------
def _find_missing_wal_files(conn, archive_dir):
    """Return completed segments of the current timeline absent from *archive_dir*.

    The range checked runs from the oldest segment of the current timeline
    already present in the archive up to the segment just before the one
    currently being written (that one is still open, so it cannot have been
    archived yet). The result is in segment order.

    Returns None when the list cannot be determined: query or filesystem
    error, or no segment of the current timeline in the archive to anchor
    the start of the range.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT pg_walfile_name(pg_current_wal_lsn()), "
                "pg_size_bytes(current_setting('wal_segment_size'))"
            )
            current_wal, seg_size = cur.fetchone()

        # Segment numbers wrap into the next "log" number after this many
        # segments (256 for the default 16MB segment size).
        segs_per_xlogid = 0x100000000 // int(seg_size)
        timeline = current_wal[:8]

        archived = {
            f.name
            for f in Path(archive_dir).iterdir()
            if _WAL_NAME_RE.fullmatch(f.name) and f.name.startswith(timeline)
        }
        if not archived:
            logger.warning(f"归档目录中没有当前时间线 {timeline} 的WAL文件, 无法确定检查范围")
            return None

        first_segno = min(_wal_segno(name, segs_per_xlogid) for name in archived)
        last_segno = _wal_segno(current_wal, segs_per_xlogid) - 1
        if last_segno < first_segno:
            return []

        first_wal = _wal_name(timeline, first_segno, segs_per_xlogid)
        last_wal = _wal_name(timeline, last_segno, segs_per_xlogid)
        logger.info(f"检查WAL文件范围: {first_wal} 到 {last_wal}")

        missing_files = []
        for segno in range(first_segno, last_segno + 1):
            name = _wal_name(timeline, segno, segs_per_xlogid)
            if name not in archived:
                missing_files.append(name)
        return missing_files
    except Exception as e:
        logger.error(f"获取未归档WAL文件失败: {e}")
        return None


def get_missing_wal_files(conn, archive_dir):
    """Return the sorted list of completed WAL segments missing from the archive.

    Kept for backward compatibility: returns ``[]`` both when nothing is
    missing and when the list could not be determined (the reason is
    logged). Use ``_find_missing_wal_files`` to distinguish those cases.
    """
    missing_files = _find_missing_wal_files(conn, archive_dir)
    return missing_files if missing_files is not None else []


# -------------------------
# Archive directory sanity check
# -------------------------
def check_archive_directory(archive_dir):
    """Return True if *archive_dir* is an existing directory containing WAL files.

    "WAL files" here means names matching the glob ``0000000*``. Each failing
    condition is logged.
    """
    archive_path = Path(archive_dir)
    if not archive_path.exists():
        logger.error(f"归档目录不存在: {archive_dir}")
        return False
    if not archive_path.is_dir():
        logger.error(f"归档路径不是目录: {archive_dir}")
        return False

    wal_files = list(archive_path.glob("0000000*"))
    if not wal_files:
        logger.warning(f"归档目录中没有WAL文件: {archive_dir}")
        return False
    return True


# -------------------------
# Main check
# -------------------------
# pg_stat_archiver columns worth logging, in the view's column order.
_ARCHIVER_KEYS_TO_LOG = (
    'archived_count', 'last_archived_wal', 'last_archived_time',
    'failed_count', 'last_failed_wal', 'last_failed_time', 'stats_reset',
)

# At most this many missing file names are listed individually.
_MAX_LISTED_MISSING = 10


def _report_archiver_status(archive_status, archive_dir):
    """Log pg_stat_archiver values and flag the problems they reveal."""
    logger.info("归档状态:")
    for key in _ARCHIVER_KEYS_TO_LOG:
        if key in archive_status:
            logger.info(f"  {key}: {archive_status[key]}")

    last_archived_wal = archive_status.get("last_archived_wal")
    if last_archived_wal:
        logger.info(f"最后归档的WAL文件: {last_archived_wal}")
        if check_wal_archived(last_archived_wal, archive_dir):
            logger.info("最后归档的文件存在于归档目录中")
        else:
            logger.error("最后归档的文件不存在于归档目录中!")

    last_failed_wal = archive_status.get("last_failed_wal")
    if last_failed_wal:
        last_failed_time = archive_status.get("last_failed_time")
        last_archived_time = archive_status.get("last_archived_time")
        # pg_stat_archiver keeps the last failure until stats are reset; if a
        # later archive attempt succeeded, the failure is historical only.
        if last_archived_time and last_failed_time and last_archived_time > last_failed_time:
            logger.warning(
                f"曾经归档失败的文件: {last_failed_wal} (失败时间: {last_failed_time}), 之后已有成功归档"
            )
        else:
            logger.error(f"最后归档失败的文件: {last_failed_wal}")
            logger.error(
                f"失败时间: {last_failed_time}, 累计失败次数: {archive_status.get('failed_count')}"
            )


def _report_missing_files(missing_files):
    """Log the outcome of the missing-segment scan (None = undetermined)."""
    if missing_files is None:
        logger.warning("无法确定未归档的WAL文件列表")
    elif missing_files:
        logger.warning(f"发现 {len(missing_files)} 个未归档的WAL文件:")
        for i, file in enumerate(missing_files[:_MAX_LISTED_MISSING], start=1):
            logger.warning(f"  {i}. {file}")
        if len(missing_files) > _MAX_LISTED_MISSING:
            logger.warning(f"  还有 {len(missing_files) - _MAX_LISTED_MISSING} 个未显示...")
    else:
        logger.info("所有WAL文件均已归档")


def check_wal_archive_status(archive_dir, dbname='postgres', user='postgres',
                             password='abc@1234', host='localhost', port=5432):
    """Check PostgreSQL WAL archiving against the archive directory and log findings.

    Args:
        archive_dir: path of the WAL archive directory.
        dbname, user, password, host, port: connection parameters.

    Returns:
        True if the check ran to completion (problems found are logged, not
        reflected in the return value); False if the archive directory check,
        the DB connection, or reading the current WAL file name failed.
    """
    logger.info(f"开始检查WAL归档状态: {archive_dir}")

    # 1. Archive directory sanity
    if not check_archive_directory(archive_dir):
        return False

    # 2. Database connection
    conn = connect_to_db(dbname, user, password, host, port)
    if not conn:
        return False

    try:
        # 3. Current WAL segment
        current_wal_file = get_current_wal_file(conn)
        if not current_wal_file:
            logger.error("无法获取当前WAL文件名")
            return False
        logger.info(f"当前WAL文件: {current_wal_file}")

        # 4. Archiver statistics
        archive_status = get_archive_status(conn)
        if archive_status:
            _report_archiver_status(archive_status, archive_dir)

        # 5. Gaps in the archive
        _report_missing_files(_find_missing_wal_files(conn, archive_dir))
        return True
    finally:
        conn.close()


# -------------------------
# Entry point
# -------------------------
if __name__ == "__main__":
    # Environment variables override the defaults below (which match the
    # previous hard-coded configuration).
    archive_dir = os.environ.get("WAL_ARCHIVE_DIR", r"E:\code\backup\postgres\wal_archive")
    db_params = {
        'dbname': os.environ.get('PGDATABASE', 'postgres'),
        'user': os.environ.get('PGUSER', 'postgres'),
        'password': os.environ.get('PGPASSWORD', 'abc@1234'),
        'host': os.environ.get('PGHOST', 'localhost'),
        'port': int(os.environ.get('PGPORT', '5432')),
    }

    success = check_wal_archive_status(archive_dir, **db_params)
    if success:
        logger.info("WAL归档状态检查完成")
        sys.exit(0)
    else:
        logger.error("WAL归档状态检查失败")
        sys.exit(1)