"""Check the PostgreSQL WAL archiving status against an archive directory.

The script connects to a PostgreSQL server, reports the contents of
``pg_stat_archiver`` and looks for gaps in the sequence of WAL segment files
found in the archive directory.
"""

import os
import subprocess
import logging
import re
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
import sys

try:
    import psycopg2
except ImportError:
    # Keep the module importable (e.g. for the directory-only helpers) when the
    # driver is missing; connect_to_db() reports the problem explicitly.
    psycopg2 = None

# A WAL segment file name is exactly 24 upper-case hex digits:
# 8 for the timeline, 8 for the "log" id and 8 for the segment within that log.
_WAL_NAME_RE = re.compile(r"[0-9A-F]{24}")

# Number of bytes addressed by one "log" id (the middle 8 hex digits).
_XLOG_ID_SPAN = 0x100000000


# -------------------------
# Configuration and logging
# -------------------------
def setup_logger():
    """Return the ``wal_archive_check`` logger, configuring it on first use.

    The logger writes INFO and above both to ``logs/wal_archive_check.log``
    (relative to the current working directory, created if needed) and to the
    console. Handlers are attached only once, so repeated calls are safe.
    """
    logger = logging.getLogger("wal_archive_check")
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        fh = logging.FileHandler(log_dir / "wal_archive_check.log", encoding="utf-8")
        ch = logging.StreamHandler()
        fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        fh.setFormatter(fmt)
        ch.setFormatter(fmt)
        logger.addHandler(fh)
        logger.addHandler(ch)
    return logger


logger = setup_logger()


# -------------------------
# Database helpers
# -------------------------
def connect_to_db(dbname='postgres', user='postgres', password='abc@1234',
                  host='localhost', port=5432):
    """Open a psycopg2 connection with a 5 second connect timeout.

    Returns:
        The open connection, or ``None`` when psycopg2 is not installed or the
        connection attempt fails (the failure is logged).
    """
    # NOTE(review): the default password is a hard-coded credential kept only
    # for backward compatibility; callers should pass their own.
    if psycopg2 is None:
        logger.error("数据库连接失败: psycopg2 is not installed")
        return None
    try:
        return psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=5,
        )
    except Exception as e:
        logger.error(f"数据库连接失败: {e}")
        return None


def _rollback_quietly(conn):
    """Best-effort rollback so a failed query does not poison later queries."""
    try:
        conn.rollback()
    except Exception as e:  # the connection may already be unusable
        logger.debug(f"rollback failed: {e}")


# -------------------------
# Current WAL file name
# -------------------------
def get_current_wal_file(conn):
    """Return the name of the WAL segment currently being written.

    Returns:
        The file name as a string, or ``None`` if the query fails or returns
        no row (the failure is logged).
    """
    try:
        # closing() releases the cursor even when execute() raises and does
        # not depend on cursor context-manager support in the driver.
        with closing(conn.cursor()) as cur:
            cur.execute("SELECT pg_walfile_name(pg_current_wal_lsn())")
            row = cur.fetchone()
        return row[0] if row else None
    except Exception as e:
        logger.error(f"获取当前WAL文件名失败: {e}")
        _rollback_quietly(conn)
        return None


# -------------------------
# Archiver statistics
# -------------------------
def get_archive_status(conn):
    """Return the single row of ``pg_stat_archiver`` as a dict.

    Returns:
        A ``{column_name: value}`` dict, or ``None`` if the view returned no
        row or the query failed (the failure is logged).
    """
    try:
        with closing(conn.cursor()) as cur:
            cur.execute("SELECT * FROM pg_stat_archiver")
            columns = [desc[0] for desc in cur.description]
            row = cur.fetchone()
        if row:
            return dict(zip(columns, row))
        return None
    except Exception as e:
        logger.error(f"获取归档状态失败: {e}")
        _rollback_quietly(conn)
        return None


# -------------------------
# Single-file archive check
# -------------------------
def check_wal_archived(wal_name, archive_dir):
    """Return True if a file named ``wal_name`` exists in ``archive_dir``."""
    return (Path(archive_dir) / wal_name).exists()


# -------------------------
# WAL file name arithmetic
# -------------------------
def _parse_wal_name(name, segments_per_id):
    """Split a 24-hex-digit WAL name into ``(timeline, absolute_segment_no)``."""
    if not _WAL_NAME_RE.fullmatch(name):
        raise ValueError(f"not a WAL segment file name: {name!r}")
    timeline = int(name[:8], 16)
    log_id = int(name[8:16], 16)
    segment = int(name[16:24], 16)
    return timeline, log_id * segments_per_id + segment


def _format_wal_name(timeline, segment_no, segments_per_id):
    """Build the WAL file name for an absolute segment number on a timeline."""
    log_id, segment = divmod(segment_no, segments_per_id)
    return f"{timeline:08X}{log_id:08X}{segment:08X}"


def _find_missing_wal_files(conn, archive_dir):
    """Return the WAL segments missing from the archive; raise on any failure.

    The checked range runs from the oldest segment present in the archive (on
    the server's current timeline) up to the segment just before the one the
    server is currently writing, because the in-progress segment cannot have
    been archived yet and segments older than the oldest archived one may have
    been pruned deliberately.
    """
    with closing(conn.cursor()) as cur:
        cur.execute(
            "SELECT pg_walfile_name(pg_current_wal_lsn()), "
            "pg_size_bytes(current_setting('wal_segment_size'))"
        )
        row = cur.fetchone()
    if not row or not row[0]:
        return []

    current_wal = row[0]
    segment_size = int(row[1] or 0)
    if segment_size <= 0:
        raise ValueError(f"invalid wal_segment_size: {row[1]!r}")
    # The low 8 hex digits of a WAL name wrap after this many segments
    # (256 for the default 16MB segment size).
    segments_per_id = _XLOG_ID_SPAN // segment_size

    timeline, current_segno = _parse_wal_name(current_wal, segments_per_id)

    # Only exact WAL segment names count; *.history, *.backup and *.partial
    # files in the archive are ignored.
    archived_segnos = set()
    for entry in Path(archive_dir).iterdir():
        if not _WAL_NAME_RE.fullmatch(entry.name):
            continue
        entry_timeline, segno = _parse_wal_name(entry.name, segments_per_id)
        if entry_timeline == timeline and segno < current_segno:
            archived_segnos.add(segno)

    if not archived_segnos:
        logger.warning(f"归档目录中没有WAL文件: {archive_dir}")
        return []

    first_segno = min(archived_segnos)
    last_segno = current_segno - 1
    first_wal = _format_wal_name(timeline, first_segno, segments_per_id)
    last_wal = _format_wal_name(timeline, last_segno, segments_per_id)
    logger.info(f"检查WAL文件范围: {first_wal} 到 {last_wal}")

    return [
        _format_wal_name(timeline, segno, segments_per_id)
        for segno in range(first_segno, last_segno + 1)
        if segno not in archived_segnos
    ]


# -------------------------
# Missing WAL files
# -------------------------
def get_missing_wal_files(conn, archive_dir):
    """Return the sorted names of WAL segments missing from ``archive_dir``.

    See ``_find_missing_wal_files`` for the range that is checked.

    Returns:
        A list of WAL file names in ascending order. An empty list is also
        returned when the check itself fails (the failure is logged).
    """
    try:
        return _find_missing_wal_files(conn, archive_dir)
    except Exception as e:
        logger.error(f"获取未归档WAL文件失败: {e}")
        _rollback_quietly(conn)
        return []


# -------------------------
# Archive directory sanity check
# -------------------------
def check_archive_directory(archive_dir):
    """Return True if ``archive_dir`` is a directory holding at least one WAL file.

    A WAL file is any entry whose name starts with 24 hex digits, so
    compressed segments match while timeline ``*.history`` files do not.
    Every failure reason is logged.
    """
    archive_path = Path(archive_dir)

    if not archive_path.exists():
        logger.error(f"归档目录不存在: {archive_dir}")
        return False

    if not archive_path.is_dir():
        logger.error(f"归档路径不是目录: {archive_dir}")
        return False

    if not any(_WAL_NAME_RE.match(entry.name) for entry in archive_path.iterdir()):
        logger.warning(f"归档目录中没有WAL文件: {archive_dir}")
        return False

    return True


def _report_archive_status(archive_status, archive_dir):
    """Log the interesting ``pg_stat_archiver`` columns and what they imply."""
    logger.info("归档状态:")
    for key in ('archived_count', 'last_archived_wal', 'last_archived_time',
                'failed_count', 'last_failed_wal', 'last_failed_time'):
        if key in archive_status:
            logger.info(f"  {key}: {archive_status[key]}")

    last_archived_wal = archive_status.get("last_archived_wal")
    if last_archived_wal:
        logger.info(f"最后归档的WAL文件: {last_archived_wal}")
        if check_wal_archived(last_archived_wal, archive_dir):
            logger.info("最后归档的文件存在于归档目录中")
        else:
            logger.error("最后归档的文件不存在于归档目录中!")

    last_failed_wal = archive_status.get("last_failed_wal")
    if last_failed_wal:
        logger.error(f"最后归档失败的文件: {last_failed_wal}")
        # pg_stat_archiver has no failure-message column; report the columns
        # that do exist instead of a value that is always None.
        logger.error(
            f"失败时间: {archive_status.get('last_failed_time')}, "
            f"累计失败次数: {archive_status.get('failed_count')}"
        )


# -------------------------
# Main check
# -------------------------
def check_wal_archive_status(archive_dir, dbname='postgres', user='postgres',
                             password='abc@1234', host='localhost', port=5432):
    """Run the full WAL archive check and log the findings.

    Args:
        archive_dir: Path of the WAL archive directory.
        dbname, user, password, host, port: Connection parameters passed to
            ``connect_to_db``.

    Returns:
        True when every step of the check could be carried out (findings such
        as missing files are logged, not reflected in the return value);
        False when the directory is unusable, the connection fails, or a
        query needed for the check fails.
    """
    logger.info(f"开始检查WAL归档状态: {archive_dir}")

    # 1. Archive directory
    if not check_archive_directory(archive_dir):
        return False

    # 2. Database connection
    conn = connect_to_db(dbname, user, password, host, port)
    if conn is None:
        return False

    try:
        # 3. Current WAL file
        current_wal_file = get_current_wal_file(conn)
        if not current_wal_file:
            logger.error("无法获取当前WAL文件名")
            return False
        logger.info(f"当前WAL文件: {current_wal_file}")

        # 4. Archiver statistics
        archive_status = get_archive_status(conn)
        if archive_status:
            _report_archive_status(archive_status, archive_dir)

        # 5. Missing files. Use the raising helper so that a failed check is
        # not mistaken for "nothing is missing".
        try:
            missing_files = _find_missing_wal_files(conn, archive_dir)
        except Exception as e:
            logger.error(f"获取未归档WAL文件失败: {e}")
            return False

        if missing_files:
            logger.warning(f"发现 {len(missing_files)} 个未归档的WAL文件:")
            for i, file in enumerate(missing_files[:10]):  # show at most 10
                logger.warning(f"  {i+1}. {file}")
            if len(missing_files) > 10:
                logger.warning(f"  还有 {len(missing_files)-10} 个未显示...")
        else:
            logger.info("所有WAL文件均已归档")

        return True
    finally:
        conn.close()


# -------------------------
# Entry point
# -------------------------
def main():
    """Run the check and exit with status 0 on success, 1 on failure.

    The archive directory may be given as the first command-line argument,
    and the standard libpq environment variables (PGDATABASE, PGUSER,
    PGPASSWORD, PGHOST, PGPORT) override the built-in connection defaults.
    """
    default_archive_dir = r"E:\code\backup\postgres\wal_archive"
    archive_dir = sys.argv[1] if len(sys.argv) > 1 else default_archive_dir
    db_params = {
        'dbname': os.environ.get('PGDATABASE', 'postgres'),
        'user': os.environ.get('PGUSER', 'postgres'),
        'password': os.environ.get('PGPASSWORD', 'abc@1234'),
        'host': os.environ.get('PGHOST', 'localhost'),
        'port': int(os.environ.get('PGPORT', 5432)),
    }

    success = check_wal_archive_status(archive_dir, **db_params)

    if success:
        logger.info("WAL归档状态检查完成")
        sys.exit(0)
    else:
        logger.error("WAL归档状态检查失败")
        sys.exit(1)


if __name__ == "__main__":
    main()