| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- import os
- import subprocess
- import logging
- from datetime import datetime, timedelta
- from pathlib import Path
- import psycopg2
- import sys
- # -------------------------
- # 配置与日志
- # -------------------------
def setup_logger():
    """Return the ``wal_archive_check`` logger, configuring it on first use.

    On the first call the logger is set to INFO and given two handlers that
    share one format: a UTF-8 file handler writing to
    ``logs/wal_archive_check.log`` (the ``logs`` directory is created relative
    to the current working directory if needed) and a stream handler.
    Later calls find the handlers already attached and return the same logger
    without adding duplicates.

    Returns:
        logging.Logger: The configured logger.
    """
    log = logging.getLogger("wal_archive_check")
    log.setLevel(logging.INFO)
    if log.handlers:
        # Already configured by an earlier call; attaching again would
        # duplicate every message.
        return log

    target_dir = Path("logs")
    target_dir.mkdir(exist_ok=True)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handlers = (
        logging.FileHandler(target_dir / "wal_archive_check.log", encoding="utf-8"),
        logging.StreamHandler(),
    )
    for handler in handlers:
        handler.setFormatter(formatter)
        log.addHandler(handler)
    return log


# Module-wide logger shared by every function in this script.
logger = setup_logger()
- # -------------------------
- # 数据库连接辅助函数
- # -------------------------
def connect_to_db(dbname='postgres', user='postgres', password='abc@1234',
                  host='localhost', port=5432):
    """Open a psycopg2 connection with a 5-second connect timeout.

    Args:
        dbname: Database name.
        user: Login role.
        password: Password for ``user``.
        host: Server host.
        port: Server port.

    Returns:
        The open connection, or None when connecting fails for any reason
        (the error is logged rather than raised).
    """
    # NOTE(review): the default password is hard-coded in the signature;
    # callers should pass real credentials explicitly.
    settings = {
        "dbname": dbname,
        "user": user,
        "password": password,
        "host": host,
        "port": port,
        "connect_timeout": 5,
    }
    try:
        return psycopg2.connect(**settings)
    except Exception as exc:
        logger.error(f"数据库连接失败: {exc}")
    return None
- # -------------------------
- # 获取当前WAL文件名(兼容所有psycopg2版本)
- # -------------------------
def get_current_wal_file(conn):
    """Return the name of the WAL file holding the current write position.

    Runs ``SELECT pg_walfile_name(pg_current_wal_lsn())`` on ``conn``.

    Args:
        conn: An open psycopg2 connection.

    Returns:
        The WAL file name, or None when the query fails or returns no row.
        Failures are logged, not raised.
    """
    cur = None
    try:
        cur = conn.cursor()
        cur.execute("SELECT pg_walfile_name(pg_current_wal_lsn())")
        row = cur.fetchone()
        return row[0] if row else None
    except Exception as e:
        logger.error(f"获取当前WAL文件名失败: {e}")
        # A failed statement leaves the transaction aborted; without a
        # rollback every later query on this connection would fail too.
        try:
            conn.rollback()
        except Exception as rollback_error:
            logger.warning(f"回滚事务失败: {rollback_error}")
        return None
    finally:
        # Explicit close (instead of ``with conn.cursor()``) keeps this
        # working on psycopg2 versions without cursor context managers.
        if cur is not None:
            cur.close()
- # -------------------------
- # 获取归档状态
- # -------------------------
def get_archive_status(conn):
    """Fetch the single ``pg_stat_archiver`` row as a dictionary.

    Args:
        conn: An open psycopg2 connection.

    Returns:
        A dict mapping column name to value, or None when the view returns
        no row or the query fails. Failures are logged, not raised.
    """
    cur = None
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM pg_stat_archiver")
        row = cur.fetchone()
        if not row:
            return None
        columns = [desc[0] for desc in cur.description]
        return dict(zip(columns, row))
    except Exception as e:
        logger.error(f"获取归档状态失败: {e}")
        # A failed statement leaves the transaction aborted; roll back so
        # the caller can keep using the connection.
        try:
            conn.rollback()
        except Exception as rollback_error:
            logger.warning(f"回滚事务失败: {rollback_error}")
        return None
    finally:
        if cur is not None:
            cur.close()
- # -------------------------
- # 检查WAL文件是否归档
- # -------------------------
def check_wal_archived(wal_name, archive_dir):
    """Tell whether a WAL file is present in the archive directory.

    Args:
        wal_name: File name of the WAL segment to look for.
        archive_dir: Path of the archive directory.

    Returns:
        True only if ``archive_dir/wal_name`` is an existing regular file.
        An empty or None name yields False.
    """
    if not wal_name:
        # An empty name would resolve to the archive directory itself and
        # be reported as "archived".
        return False
    # is_file() rather than exists(): a directory with a WAL-like name is
    # not an archived segment.
    return (Path(archive_dir) / wal_name).is_file()
- # -------------------------
- # 获取未归档的WAL文件列表(优化性能)
- # -------------------------
_WAL_NAME_HEX_DIGITS = frozenset("0123456789ABCDEF")


def _parse_wal_segment_name(name, segments_per_id):
    """Return ``(timeline, segment_number)`` for a WAL segment name, else None."""
    if len(name) != 24 or not _WAL_NAME_HEX_DIGITS.issuperset(name):
        return None
    timeline = int(name[:8], 16)
    log_id = int(name[8:16], 16)
    offset = int(name[16:], 16)
    if offset >= segments_per_id:
        # Not a valid segment name for this server's segment size.
        return None
    return timeline, log_id * segments_per_id + offset


def _format_wal_segment_name(timeline, segment_number, segments_per_id):
    """Build the 24-hex-digit WAL file name for a timeline and segment number."""
    log_id, offset = divmod(segment_number, segments_per_id)
    return f"{timeline:08X}{log_id:08X}{offset:08X}"


def get_missing_wal_files(conn, archive_dir):
    """List WAL segments that should be in the archive but are not.

    The checked range starts at the oldest segment present in ``archive_dir``
    on the server's current timeline (older ones may have been pruned on
    purpose) and ends just before the segment the server is currently
    writing (which cannot be archived yet). Only files whose name is exactly
    a 24-hex-digit segment name count as archived.

    WAL file names are ``TTTTTTTTXXXXXXXXYYYYYYYY``: timeline, log id, and
    segment within the log id. The last field wraps at
    ``0x100000000 / wal_segment_size`` (256 for 16 MB segments), so the
    segment size is read from the server rather than assumed.

    Args:
        conn: An open psycopg2 connection.
        archive_dir: Path of the archive directory.

    Returns:
        Sorted list of missing WAL file names. An empty list is also returned
        when the range cannot be determined or an error occurs (the reason
        is logged).
    """
    cur = None
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT pg_walfile_name(pg_current_wal_lsn()),
                   pg_size_bytes(current_setting('wal_segment_size'))
        """)
        row = cur.fetchone()
        if not row or not row[0] or not row[1]:
            return []

        current_wal = row[0]
        segments_per_id = 0x100000000 // int(row[1])
        current = _parse_wal_segment_name(current_wal, segments_per_id)
        if current is None:
            logger.error(f"获取未归档WAL文件失败: 无法解析WAL文件名 {current_wal}")
            return []
        timeline, current_segment = current

        # Segment numbers already archived on the current timeline.
        archived = set()
        for entry in Path(archive_dir).iterdir():
            parsed = _parse_wal_segment_name(entry.name, segments_per_id)
            if parsed and parsed[0] == timeline and parsed[1] < current_segment:
                archived.add(parsed[1])

        if not archived:
            logger.warning("归档目录中没有当前时间线的已完成WAL文件, 无法确定检查范围")
            return []

        first_segment = min(archived)
        first_wal = _format_wal_segment_name(timeline, first_segment, segments_per_id)
        last_wal = _format_wal_segment_name(timeline, current_segment - 1, segments_per_id)
        logger.info(f"检查WAL文件范围: {first_wal} 到 {last_wal}")

        # Ascending segment numbers on one timeline are already in name order.
        return [
            _format_wal_segment_name(timeline, segment, segments_per_id)
            for segment in range(first_segment, current_segment)
            if segment not in archived
        ]
    except Exception as e:
        logger.error(f"获取未归档WAL文件失败: {e}")
        # A failed statement leaves the transaction aborted; roll back so
        # the caller can keep using the connection.
        try:
            conn.rollback()
        except Exception as rollback_error:
            logger.warning(f"回滚事务失败: {rollback_error}")
        return []
    finally:
        if cur is not None:
            cur.close()
- # -------------------------
- # 检查归档目录状态
- # -------------------------
def check_archive_directory(archive_dir):
    """Verify that the archive directory exists and holds WAL files.

    An entry counts as a WAL file when its name starts with 24 upper-case
    hex digits, so segments on any timeline match, as do suffixed variants
    such as ``.gz`` or ``.partial``. Timeline history files
    (``XXXXXXXX.history``) do not count.

    Args:
        archive_dir: Path of the archive directory.

    Returns:
        True if the path is a directory containing at least one WAL file;
        False otherwise (the reason is logged).
    """
    archive_path = Path(archive_dir)

    if not archive_path.exists():
        logger.error(f"归档目录不存在: {archive_dir}")
        return False

    if not archive_path.is_dir():
        logger.error(f"归档路径不是目录: {archive_dir}")
        return False

    hex_digits = frozenset("0123456789ABCDEF")
    # any() stops at the first match instead of listing the whole directory.
    has_wal_file = any(
        len(entry.name) >= 24 and hex_digits.issuperset(entry.name[:24])
        for entry in archive_path.iterdir()
    )
    if not has_wal_file:
        logger.warning(f"归档目录中没有WAL文件: {archive_dir}")
        return False

    return True
- # -------------------------
- # 主检查函数
- # -------------------------
def _report_archiver_stats(archive_status, archive_dir):
    """Log the key pg_stat_archiver fields and flag archiving problems."""
    reported_keys = ('last_archived_wal', 'last_failed_wal', 'last_archived_time',
                     'last_failed_time', 'failed_count')
    logger.info("归档状态:")
    for key, value in archive_status.items():
        if key in reported_keys:
            logger.info(f" {key}: {value}")

    # The file the server says it archived last must really be in the archive.
    last_archived_wal = archive_status.get("last_archived_wal")
    if last_archived_wal:
        logger.info(f"最后归档的WAL文件: {last_archived_wal}")
        if check_wal_archived(last_archived_wal, archive_dir):
            logger.info("最后归档的文件存在于归档目录中")
        else:
            logger.error("最后归档的文件不存在于归档目录中!")

    last_failed_wal = archive_status.get("last_failed_wal")
    if not last_failed_wal:
        return

    last_failed_time = archive_status.get("last_failed_time")
    last_archived_time = archive_status.get("last_archived_time")
    # pg_stat_archiver keeps the last failure until the statistics are
    # reset, so a success after the failure means it has been resolved.
    recovered = (
        last_failed_time is not None
        and last_archived_time is not None
        and last_archived_time > last_failed_time
    )
    report = logger.warning if recovered else logger.error
    report(f"最后归档失败的文件: {last_failed_wal}")
    # The view carries no failure message column; only time and count.
    report(f"失败时间: {last_failed_time}, 累计失败次数: {archive_status.get('failed_count')}")
    if recovered:
        logger.info("该失败之后已有WAL文件归档成功")
    else:
        logger.error("失败原因请查看PostgreSQL服务器日志")


def check_wal_archive_status(archive_dir, dbname='postgres', user='postgres',
                             password='abc@1234', host='localhost', port=5432):
    """Run the full WAL archive check and log the findings.

    Steps: validate the archive directory, connect to the database, read the
    current WAL file name, report ``pg_stat_archiver`` statistics, then list
    WAL files missing from the archive (at most the first ten are printed).

    Args:
        archive_dir: Path of the WAL archive directory.
        dbname: Database name.
        user: Login role.
        password: Password for ``user``.
        host: Server host.
        port: Server port.

    Returns:
        True when the check ran to completion (even if it reported problems);
        False when the directory check, the connection, or reading the
        current WAL file name failed.
    """
    logger.info(f"开始检查WAL归档状态: {archive_dir}")

    # 1. Archive directory must exist and contain WAL files.
    if not check_archive_directory(archive_dir):
        return False

    # 2. Connect to the database.
    conn = connect_to_db(dbname, user, password, host, port)
    if not conn:
        return False

    try:
        # 3. Current WAL file name.
        current_wal_file = get_current_wal_file(conn)
        if not current_wal_file:
            logger.error("无法获取当前WAL文件名")
            return False
        logger.info(f"当前WAL文件: {current_wal_file}")

        # 4. Archiver statistics.
        archive_status = get_archive_status(conn)
        if archive_status:
            _report_archiver_stats(archive_status, archive_dir)

        # 5. Files missing from the archive.
        # NOTE: get_missing_wal_files also returns [] when its own check
        # fails; it logs that error itself.
        missing_files = get_missing_wal_files(conn, archive_dir)
        if missing_files:
            logger.warning(f"发现 {len(missing_files)} 个未归档的WAL文件:")
            for position, wal_name in enumerate(missing_files[:10], start=1):
                logger.warning(f" {position}. {wal_name}")
            if len(missing_files) > 10:
                logger.warning(f" 还有 {len(missing_files)-10} 个未显示...")
        else:
            logger.info("所有WAL文件均已归档")

        return True
    finally:
        conn.close()
- # -------------------------
- # 主函数
- # -------------------------
def main(argv=None):
    """Script entry point: run the WAL archive check and return an exit code.

    The archive directory is taken from the first command-line argument when
    given. Connection settings come from the standard libpq environment
    variables (``PGDATABASE``, ``PGUSER``, ``PGPASSWORD``, ``PGHOST``,
    ``PGPORT``). Every setting falls back to the previous built-in default,
    so running the script with no arguments and no variables behaves as
    before.

    Args:
        argv: Argument list without the program name; defaults to
            ``sys.argv[1:]``.

    Returns:
        0 when the check completed, 1 when it failed.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    # Fallback path is machine-specific; pass the real directory as argument.
    archive_dir = args[0] if args else r"E:\code\backup\postgres\wal_archive"

    # NOTE(review): the fallback password is kept only for backward
    # compatibility; prefer setting PGPASSWORD instead of editing the source.
    db_params = {
        'dbname': os.environ.get('PGDATABASE', 'postgres'),
        'user': os.environ.get('PGUSER', 'postgres'),
        'password': os.environ.get('PGPASSWORD', 'abc@1234'),
        'host': os.environ.get('PGHOST', 'localhost'),
        'port': int(os.environ.get('PGPORT', '5432')),
    }

    if check_wal_archive_status(archive_dir, **db_params):
        logger.info("WAL归档状态检查完成")
        return 0

    logger.error("WAL归档状态检查失败")
    return 1


if __name__ == "__main__":
    sys.exit(main())
|