testpostgresql.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. import os
  2. import subprocess
  3. import logging
  4. from datetime import datetime, timedelta
  5. from pathlib import Path
  6. import psycopg2
  7. import sys
  # -------------------------
# Configuration and logging
# -------------------------
def setup_logger(name="wal_archive_check", log_dir="logs", level=logging.INFO):
    """Return the script's logger, attaching file and console handlers once.

    Args:
        name: logger name; also used for the log file name ``<name>.log``.
        log_dir: directory that receives the log file. It is created (with any
            missing parents) on first configuration. A relative path resolves
            against the current working directory.
        level: level set on the logger on every call.

    Handlers are attached only when the logger has none yet, so calling this
    again (or re-importing the module) does not duplicate log lines.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:
        log_path = Path(log_dir)
        log_path.mkdir(parents=True, exist_ok=True)
        fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler = logging.FileHandler(log_path / f"{name}.log", encoding="utf-8")
        console_handler = logging.StreamHandler()
        for handler in (file_handler, console_handler):
            handler.setFormatter(fmt)
            logger.addHandler(handler)
    return logger


# Module-level logger used by every helper in this script.
logger = setup_logger()
  # -------------------------
# Database connection helper
# -------------------------
def connect_to_db(dbname='postgres', user='postgres', password='abc@1234',
                  host='localhost', port=5432, connect_timeout=5, **extra_params):
    """Open a PostgreSQL connection with psycopg2.

    Args:
        dbname, user, password, host, port: connection parameters passed
            straight to ``psycopg2.connect``.
        connect_timeout: seconds to wait while connecting (default 5, the
            value that used to be hard-coded).
        **extra_params: any further connection keywords, forwarded unchanged
            to ``psycopg2.connect`` (for example ``sslmode``).

    Returns:
        The open connection, or None if connecting failed (the error is logged).
    """
    # NOTE(review): the default password is a credential embedded in source;
    # callers should pass the real password explicitly instead of relying on it.
    try:
        return psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
            connect_timeout=connect_timeout,
            **extra_params
        )
    except Exception as e:
        # Best-effort: report the failure and let the caller check for None.
        logger.error(f"数据库连接失败: {e}")
        return None
  # -------------------------
# Current WAL file name (works with every psycopg2 version)
# -------------------------
def get_current_wal_file(conn):
    """Return the name of the WAL segment the server is currently writing.

    The name comes from the SQL expression
    ``pg_walfile_name(pg_current_wal_lsn())`` rather than any psycopg2-specific
    API, so it works with every psycopg2 version.

    Returns:
        The WAL file name, or None if the query failed (error logged) or
        returned no row. pg_current_wal_lsn() cannot run while the server is in
        recovery, so on a standby this returns None.
    """
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT pg_walfile_name(pg_current_wal_lsn())")
            row = cur.fetchone()
        finally:
            # Close the cursor on the error path too, not only after success.
            cur.close()
        return row[0] if row else None
    except Exception as e:
        logger.error(f"获取当前WAL文件名失败: {e}")
        return None
  # -------------------------
# Archiver status
# -------------------------
def get_archive_status(conn):
    """Return the pg_stat_archiver row as a ``{column_name: value}`` dict.

    Returns:
        The dict, or None if the view returned no row or the query failed
        (the error is logged).
    """
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT * FROM pg_stat_archiver")
            columns = [desc[0] for desc in cur.description]
            row = cur.fetchone()
        finally:
            # Close the cursor on the error path too, not only after success.
            cur.close()
    except Exception as e:
        logger.error(f"获取归档状态失败: {e}")
        return None
    return dict(zip(columns, row)) if row else None
  # -------------------------
# Single-file archive lookup
# -------------------------
def check_wal_archived(wal_name, archive_dir):
    """Tell whether ``wal_name`` is present directly inside ``archive_dir``.

    Returns True when a filesystem entry with exactly that name exists in the
    directory, False otherwise.
    """
    candidate = Path(archive_dir).joinpath(wal_name)
    return candidate.exists()
  # -------------------------
# WAL segments missing from the archive
# -------------------------
def _wal_segment_number(wal_name, segments_per_xlogid):
    """Split a 24-hex-digit WAL segment name into (timeline, absolute segment number)."""
    timeline = int(wal_name[0:8], 16)
    xlogid = int(wal_name[8:16], 16)
    segment = int(wal_name[16:24], 16)
    return timeline, xlogid * segments_per_xlogid + segment


def _wal_segment_name(timeline, segment_number, segments_per_xlogid):
    """Build the 24-hex-digit WAL segment name (inverse of _wal_segment_number)."""
    xlogid, segment = divmod(segment_number, segments_per_xlogid)
    return f"{timeline:08X}{xlogid:08X}{segment:08X}"


def _is_wal_segment_file_name(name):
    """Return True for names made of exactly 24 uppercase hex digits (plain WAL segments)."""
    return len(name) == 24 and all(ch in "0123456789ABCDEF" for ch in name)


def get_missing_wal_files(conn, archive_dir):
    """Return the sorted names of WAL segments absent from ``archive_dir``.

    Only the server's current timeline is checked. The expected range starts
    at the oldest segment of that timeline already present in the archive
    (older WAL may have been pruned or may predate archiving, so it cannot be
    judged). It ends just before the segment the server is currently writing,
    because that segment is archived only once it is complete.

    Only plain 24-hex-digit segment names count as archived. Compressed or
    renamed copies (e.g. ``.gz``) are not recognised.

    Returns:
        Missing segment names in ascending order. Also returns [] when the
        range cannot be determined and after an error; the error is logged.
    """
    try:
        cur = conn.cursor()
        try:
            # Current segment name plus the segment size in bytes; the size is
            # not always 16 MB (initdb --wal-segsize, PostgreSQL 11+).
            cur.execute(
                "SELECT pg_walfile_name(pg_current_wal_lsn()), "
                "pg_size_bytes(current_setting('wal_segment_size'))"
            )
            row = cur.fetchone()
        finally:
            cur.close()
        if not row:
            return []
        current_wal, segment_size = row

        # The low 8 hex digits wrap into the middle field after this many
        # segments (0x100000000 bytes per xlogid; 256 segments at 16 MB).
        segments_per_xlogid = 0x100000000 // int(segment_size)
        timeline, current_segno = _wal_segment_number(current_wal, segments_per_xlogid)

        timeline_prefix = current_wal[:8]
        archived = {
            entry.name
            for entry in Path(archive_dir).iterdir()
            if _is_wal_segment_file_name(entry.name)
            and entry.name.startswith(timeline_prefix)
        }
        if not archived:
            logger.warning(f"归档目录中没有时间线 {timeline_prefix} 的WAL段文件, 无法确定检查范围")
            return []

        first_wal = min(archived)  # fixed-width hex: lexical min == lowest segment
        _, first_segno = _wal_segment_number(first_wal, segments_per_xlogid)
        if first_segno >= current_segno:
            return []
        last_wal = _wal_segment_name(timeline, current_segno - 1, segments_per_xlogid)
        logger.info(f"检查WAL文件范围: {first_wal} 到 {last_wal}")

        # Generated in ascending segment order, so the result is already sorted.
        expected = (
            _wal_segment_name(timeline, segno, segments_per_xlogid)
            for segno in range(first_segno, current_segno)
        )
        return [name for name in expected if name not in archived]
    except Exception as e:
        logger.error(f"获取未归档WAL文件失败: {e}")
        return []
  # -------------------------
# Archive directory sanity check
# -------------------------
def check_archive_directory(archive_dir):
    """Run basic sanity checks on the WAL archive directory.

    A file counts as WAL-related when its name starts with an 8-hex-digit
    timeline ID. That covers segments and ``.history``, ``.backup`` and
    ``.partial`` files. Testing the whole timeline field instead of the literal
    prefix ``0000000`` keeps timelines 0x10 and above from being missed.

    Returns:
        True if ``archive_dir`` exists, is a readable directory and holds at
        least one WAL-related file. Otherwise False, with the reason logged.
    """
    hex_digits = "0123456789ABCDEFabcdef"

    def _is_wal_related(name):
        # Leading 8 characters must be the hex timeline ID.
        return len(name) >= 8 and all(ch in hex_digits for ch in name[:8])

    archive_path = Path(archive_dir)
    if not archive_path.exists():
        logger.error(f"归档目录不存在: {archive_dir}")
        return False
    if not archive_path.is_dir():
        logger.error(f"归档路径不是目录: {archive_dir}")
        return False
    try:
        # any() stops at the first match instead of listing the whole directory.
        has_wal = any(_is_wal_related(entry.name) for entry in archive_path.iterdir())
    except OSError as e:
        logger.error(f"无法读取归档目录 {archive_dir}: {e}")
        return False
    if not has_wal:
        logger.warning(f"归档目录中没有WAL文件: {archive_dir}")
        return False
    return True
  # -------------------------
# Main check routine
# -------------------------
def _report_archiver_status(archive_status, archive_dir):
    """Log the relevant pg_stat_archiver fields and flag archiving problems."""
    logger.info("归档状态:")
    # pg_stat_archiver has no failure-message column; the reason for a failed
    # archive attempt is only available in the server log.
    for key in ("archived_count", "last_archived_wal", "last_archived_time",
                "failed_count", "last_failed_wal", "last_failed_time"):
        if key in archive_status:
            logger.info(f" {key}: {archive_status[key]}")

    last_archived_wal = archive_status.get("last_archived_wal")
    if last_archived_wal:
        logger.info(f"最后归档的WAL文件: {last_archived_wal}")
        if check_wal_archived(last_archived_wal, archive_dir):
            logger.info("最后归档的文件存在于归档目录中")
        else:
            logger.error("最后归档的文件不存在于归档目录中!")

    last_failed_wal = archive_status.get("last_failed_wal")
    if not last_failed_wal:
        return
    last_failed_time = archive_status.get("last_failed_time")
    last_archived_time = archive_status.get("last_archived_time")
    # last_failed_* keep their values after later successful archives (until
    # the statistics are reset). Only a failure at or after the last success
    # means archiving is currently broken.
    if (last_archived_time is None or last_failed_time is None
            or last_failed_time >= last_archived_time):
        logger.error(f"最后归档失败的文件: {last_failed_wal}")
        logger.error(f"失败时间: {last_failed_time}, 累计失败次数: {archive_status.get('failed_count')}")
        logger.error("失败原因请查看PostgreSQL服务器日志")
    else:
        logger.warning(f"历史归档失败: {last_failed_wal} ({last_failed_time}), 之后已成功归档")


def _report_missing_files(missing_files, max_listed=10):
    """Log the un-archived WAL files, listing at most ``max_listed`` names."""
    if not missing_files:
        logger.info("所有WAL文件均已归档")
        return
    logger.warning(f"发现 {len(missing_files)} 个未归档的WAL文件:")
    for i, name in enumerate(missing_files[:max_listed], start=1):
        logger.warning(f" {i}. {name}")
    if len(missing_files) > max_listed:
        logger.warning(f" 还有 {len(missing_files) - max_listed} 个未显示...")


def check_wal_archive_status(archive_dir, dbname='postgres', user='postgres',
                             password='abc@1234', host='localhost', port=5432):
    """Run the WAL archive health check and log the findings.

    Args:
        archive_dir: path of the WAL archive directory.
        dbname, user, password, host, port: connection parameters for
            connect_to_db.

    Returns:
        False when the check could not run: the archive directory is unusable,
        the connection failed, or the current WAL name was unavailable.
        Otherwise True once the check completed. Problems found along the way
        (missing files, recent archive failures) are logged; they are not
        reflected in the return value.
    """
    logger.info(f"开始检查WAL归档状态: {archive_dir}")
    if not check_archive_directory(archive_dir):
        return False
    conn = connect_to_db(dbname, user, password, host, port)
    if not conn:
        return False
    try:
        # Run each monitoring query in its own transaction. Otherwise one failed
        # query leaves the session in an aborted transaction, and every later
        # query in this check fails too.
        conn.autocommit = True

        current_wal_file = get_current_wal_file(conn)
        if not current_wal_file:
            logger.error("无法获取当前WAL文件名")
            return False
        logger.info(f"当前WAL文件: {current_wal_file}")

        archive_status = get_archive_status(conn)
        if archive_status:
            _report_archiver_status(archive_status, archive_dir)

        _report_missing_files(get_missing_wal_files(conn, archive_dir))
        return True
    finally:
        conn.close()
  # -------------------------
# Entry point
# -------------------------
if __name__ == "__main__":
    # Archive directory: first command-line argument, otherwise the built-in
    # default (adjust it to your setup).
    archive_dir = sys.argv[1] if len(sys.argv) > 1 else r"E:\code\backup\postgres\wal_archive"
    # The standard libpq environment variables override the built-in defaults,
    # so the password does not have to be edited into the source.
    db_params = {
        'dbname': os.environ.get('PGDATABASE', 'postgres'),
        'user': os.environ.get('PGUSER', 'postgres'),
        'password': os.environ.get('PGPASSWORD', 'abc@1234'),
        'host': os.environ.get('PGHOST', 'localhost'),
        'port': int(os.environ.get('PGPORT', 5432)),
    }
    success = check_wal_archive_status(archive_dir, **db_params)
    if success:
        logger.info("WAL归档状态检查完成")
        sys.exit(0)
    else:
        logger.error("WAL归档状态检查失败")
        sys.exit(1)