db_health_check.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. """
  2. 数据库健康检查工具
  3. @description: 检查数据库连接状态、迁移状态和表结构完整性
  4. @author: AcidMap Team
  5. @version: 1.0.0
  6. """
  7. import os
  8. import sys
  9. import logging
  10. from datetime import datetime
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. from app.database import engine, Base
  14. from alembic.config import Config
  15. from alembic.runtime.migration import MigrationContext
  16. from alembic.script import ScriptDirectory
  17. from sqlalchemy import text
  18. import traceback
  19. # 设置日志
  20. logging.basicConfig(
  21. level=logging.INFO,
  22. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  23. )
  24. logger = logging.getLogger(__name__)
  25. class DatabaseHealthChecker:
  26. """
  27. 数据库健康检查器
  28. @description: 全面检查数据库状态,包括连接、迁移、表结构等
  29. """
  30. def __init__(self):
  31. self.engine = engine
  32. self.alembic_cfg = Config(os.path.join(os.path.dirname(os.path.dirname(__file__)), "alembic.ini"))
  33. self.results = {
  34. "timestamp": datetime.now().isoformat(),
  35. "connection": False,
  36. "migration_status": "unknown",
  37. "tables_exist": False,
  38. "issues": [],
  39. "recommendations": []
  40. }
  41. def check_database_connection(self):
  42. """
  43. 检查数据库连接
  44. @returns: bool 连接是否成功
  45. """
  46. try:
  47. with self.engine.connect() as conn:
  48. # 执行简单查询测试连接
  49. result = conn.execute(text("SELECT 1"))
  50. result.fetchone()
  51. self.results["connection"] = True
  52. logger.info("✓ 数据库连接正常")
  53. return True
  54. except Exception as e:
  55. self.results["connection"] = False
  56. self.results["issues"].append(f"数据库连接失败: {str(e)}")
  57. logger.error(f"✗ 数据库连接失败: {str(e)}")
  58. return False
  59. def check_migration_status(self):
  60. """
  61. 检查数据库迁移状态
  62. @returns: dict 迁移状态信息
  63. """
  64. try:
  65. with self.engine.connect() as connection:
  66. context = MigrationContext.configure(connection)
  67. current_rev = context.get_current_revision()
  68. script_dir = ScriptDirectory.from_config(self.alembic_cfg)
  69. head_rev = script_dir.get_current_head()
  70. migration_info = {
  71. "current_revision": current_rev,
  72. "head_revision": head_rev,
  73. "is_up_to_date": current_rev == head_rev
  74. }
  75. if migration_info["is_up_to_date"]:
  76. self.results["migration_status"] = "up_to_date"
  77. logger.info(f"✓ 数据库迁移状态正常 (版本: {current_rev})")
  78. else:
  79. self.results["migration_status"] = "outdated"
  80. self.results["issues"].append(f"数据库版本过时: 当前 {current_rev}, 最新 {head_rev}")
  81. self.results["recommendations"].append("执行 'python db_migrate.py upgrade' 升级数据库")
  82. logger.warning(f"⚠ 数据库需要升级: {current_rev} -> {head_rev}")
  83. return migration_info
  84. except Exception as e:
  85. self.results["migration_status"] = "error"
  86. self.results["issues"].append(f"迁移状态检查失败: {str(e)}")
  87. logger.error(f"✗ 迁移状态检查失败: {str(e)}")
  88. return {"error": str(e)}
  89. def check_table_structure(self):
  90. """
  91. 检查表结构完整性
  92. @returns: dict 表结构检查结果
  93. """
  94. try:
  95. with self.engine.connect() as conn:
  96. # 获取所有表名
  97. tables_query = text("""
  98. SELECT table_name
  99. FROM information_schema.tables
  100. WHERE table_schema = 'public'
  101. """)
  102. existing_tables = [row[0] for row in conn.execute(tables_query).fetchall()]
  103. # 获取模型定义的表名
  104. model_tables = [table.name for table in Base.metadata.tables.values()]
  105. # 检查缺失的表
  106. missing_tables = set(model_tables) - set(existing_tables)
  107. extra_tables = set(existing_tables) - set(model_tables) - {'alembic_version'}
  108. table_info = {
  109. "existing_tables": existing_tables,
  110. "model_tables": model_tables,
  111. "missing_tables": list(missing_tables),
  112. "extra_tables": list(extra_tables)
  113. }
  114. if missing_tables:
  115. self.results["tables_exist"] = False
  116. self.results["issues"].append(f"缺失表: {', '.join(missing_tables)}")
  117. self.results["recommendations"].append("执行数据库升级或重新创建表结构")
  118. logger.warning(f"⚠ 缺失表: {', '.join(missing_tables)}")
  119. else:
  120. self.results["tables_exist"] = True
  121. logger.info("✓ 所有必需的表都存在")
  122. if extra_tables:
  123. logger.info(f"发现额外表: {', '.join(extra_tables)}")
  124. return table_info
  125. except Exception as e:
  126. self.results["tables_exist"] = False
  127. self.results["issues"].append(f"表结构检查失败: {str(e)}")
  128. logger.error(f"✗ 表结构检查失败: {str(e)}")
  129. return {"error": str(e)}
  130. def check_spatial_extensions(self):
  131. """
  132. 检查空间扩展 (PostGIS)
  133. @returns: dict 空间扩展状态
  134. """
  135. try:
  136. with self.engine.connect() as conn:
  137. # 检查PostGIS扩展
  138. postgis_query = text("""
  139. SELECT extname, extversion
  140. FROM pg_extension
  141. WHERE extname = 'postgis'
  142. """)
  143. postgis_result = conn.execute(postgis_query).fetchall()
  144. if postgis_result:
  145. version = postgis_result[0][1]
  146. logger.info(f"✓ PostGIS 扩展已安装 (版本: {version})")
  147. return {"installed": True, "version": version}
  148. else:
  149. self.results["issues"].append("PostGIS 扩展未安装")
  150. self.results["recommendations"].append("安装 PostGIS 扩展: CREATE EXTENSION postgis;")
  151. logger.warning("⚠ PostGIS 扩展未安装")
  152. return {"installed": False}
  153. except Exception as e:
  154. self.results["issues"].append(f"空间扩展检查失败: {str(e)}")
  155. logger.error(f"✗ 空间扩展检查失败: {str(e)}")
  156. return {"error": str(e)}
  157. def run_full_check(self):
  158. """
  159. 执行完整的健康检查
  160. @returns: dict 完整的检查结果
  161. """
  162. logger.info("开始数据库健康检查...")
  163. logger.info("=" * 50)
  164. # 检查数据库连接
  165. if not self.check_database_connection():
  166. logger.error("数据库连接失败,跳过后续检查")
  167. return self.results
  168. # 检查迁移状态
  169. migration_info = self.check_migration_status()
  170. # 检查表结构
  171. table_info = self.check_table_structure()
  172. # 检查空间扩展
  173. spatial_info = self.check_spatial_extensions()
  174. # 汇总结果
  175. self.results.update({
  176. "migration_info": migration_info,
  177. "table_info": table_info,
  178. "spatial_info": spatial_info
  179. })
  180. logger.info("=" * 50)
  181. logger.info("健康检查完成")
  182. # 打印总结
  183. if self.results["issues"]:
  184. logger.warning(f"发现 {len(self.results['issues'])} 个问题:")
  185. for issue in self.results["issues"]:
  186. logger.warning(f" - {issue}")
  187. if self.results["recommendations"]:
  188. logger.info(f"建议执行 {len(self.results['recommendations'])} 项操作:")
  189. for rec in self.results["recommendations"]:
  190. logger.info(f" - {rec}")
  191. if not self.results["issues"]:
  192. logger.info("✓ 数据库状态良好")
  193. return self.results
  194. def main():
  195. """
  196. 主函数
  197. """
  198. try:
  199. checker = DatabaseHealthChecker()
  200. results = checker.run_full_check()
  201. # 根据结果设置退出码
  202. if results["issues"]:
  203. sys.exit(1) # 有问题时退出码为1
  204. else:
  205. sys.exit(0) # 正常时退出码为0
  206. except Exception as e:
  207. logger.error(f"健康检查执行失败: {str(e)}")
  208. logger.error(f"错误详情: {traceback.format_exc()}")
  209. sys.exit(2) # 检查失败时退出码为2
  210. if __name__ == "__main__":
  211. main()