123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- """
- 数据库健康检查工具
- @description: 检查数据库连接状态、迁移状态和表结构完整性
- @author: AcidMap Team
- @version: 1.0.0
- """
- import os
- import sys
- import logging
- from datetime import datetime
- # 添加项目根目录到Python路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from app.database import engine, Base
- from alembic.config import Config
- from alembic.runtime.migration import MigrationContext
- from alembic.script import ScriptDirectory
- from sqlalchemy import text
- import traceback
- # 设置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger(__name__)
- class DatabaseHealthChecker:
- """
- 数据库健康检查器
-
- @description: 全面检查数据库状态,包括连接、迁移、表结构等
- """
-
- def __init__(self):
- self.engine = engine
- self.alembic_cfg = Config(os.path.join(os.path.dirname(os.path.dirname(__file__)), "alembic.ini"))
- self.results = {
- "timestamp": datetime.now().isoformat(),
- "connection": False,
- "migration_status": "unknown",
- "tables_exist": False,
- "issues": [],
- "recommendations": []
- }
-
- def check_database_connection(self):
- """
- 检查数据库连接
-
- @returns: bool 连接是否成功
- """
- try:
- with self.engine.connect() as conn:
- # 执行简单查询测试连接
- result = conn.execute(text("SELECT 1"))
- result.fetchone()
-
- self.results["connection"] = True
- logger.info("✓ 数据库连接正常")
- return True
-
- except Exception as e:
- self.results["connection"] = False
- self.results["issues"].append(f"数据库连接失败: {str(e)}")
- logger.error(f"✗ 数据库连接失败: {str(e)}")
- return False
-
- def check_migration_status(self):
- """
- 检查数据库迁移状态
-
- @returns: dict 迁移状态信息
- """
- try:
- with self.engine.connect() as connection:
- context = MigrationContext.configure(connection)
- current_rev = context.get_current_revision()
-
- script_dir = ScriptDirectory.from_config(self.alembic_cfg)
- head_rev = script_dir.get_current_head()
-
- migration_info = {
- "current_revision": current_rev,
- "head_revision": head_rev,
- "is_up_to_date": current_rev == head_rev
- }
-
- if migration_info["is_up_to_date"]:
- self.results["migration_status"] = "up_to_date"
- logger.info(f"✓ 数据库迁移状态正常 (版本: {current_rev})")
- else:
- self.results["migration_status"] = "outdated"
- self.results["issues"].append(f"数据库版本过时: 当前 {current_rev}, 最新 {head_rev}")
- self.results["recommendations"].append("执行 'python db_migrate.py upgrade' 升级数据库")
- logger.warning(f"⚠ 数据库需要升级: {current_rev} -> {head_rev}")
-
- return migration_info
-
- except Exception as e:
- self.results["migration_status"] = "error"
- self.results["issues"].append(f"迁移状态检查失败: {str(e)}")
- logger.error(f"✗ 迁移状态检查失败: {str(e)}")
- return {"error": str(e)}
-
- def check_table_structure(self):
- """
- 检查表结构完整性
-
- @returns: dict 表结构检查结果
- """
- try:
- with self.engine.connect() as conn:
- # 获取所有表名
- tables_query = text("""
- SELECT table_name
- FROM information_schema.tables
- WHERE table_schema = 'public'
- """)
- existing_tables = [row[0] for row in conn.execute(tables_query).fetchall()]
-
- # 获取模型定义的表名
- model_tables = [table.name for table in Base.metadata.tables.values()]
-
- # 检查缺失的表
- missing_tables = set(model_tables) - set(existing_tables)
- extra_tables = set(existing_tables) - set(model_tables) - {'alembic_version'}
-
- table_info = {
- "existing_tables": existing_tables,
- "model_tables": model_tables,
- "missing_tables": list(missing_tables),
- "extra_tables": list(extra_tables)
- }
-
- if missing_tables:
- self.results["tables_exist"] = False
- self.results["issues"].append(f"缺失表: {', '.join(missing_tables)}")
- self.results["recommendations"].append("执行数据库升级或重新创建表结构")
- logger.warning(f"⚠ 缺失表: {', '.join(missing_tables)}")
- else:
- self.results["tables_exist"] = True
- logger.info("✓ 所有必需的表都存在")
-
- if extra_tables:
- logger.info(f"发现额外表: {', '.join(extra_tables)}")
-
- return table_info
-
- except Exception as e:
- self.results["tables_exist"] = False
- self.results["issues"].append(f"表结构检查失败: {str(e)}")
- logger.error(f"✗ 表结构检查失败: {str(e)}")
- return {"error": str(e)}
-
- def check_spatial_extensions(self):
- """
- 检查空间扩展 (PostGIS)
-
- @returns: dict 空间扩展状态
- """
- try:
- with self.engine.connect() as conn:
- # 检查PostGIS扩展
- postgis_query = text("""
- SELECT extname, extversion
- FROM pg_extension
- WHERE extname = 'postgis'
- """)
- postgis_result = conn.execute(postgis_query).fetchall()
-
- if postgis_result:
- version = postgis_result[0][1]
- logger.info(f"✓ PostGIS 扩展已安装 (版本: {version})")
- return {"installed": True, "version": version}
- else:
- self.results["issues"].append("PostGIS 扩展未安装")
- self.results["recommendations"].append("安装 PostGIS 扩展: CREATE EXTENSION postgis;")
- logger.warning("⚠ PostGIS 扩展未安装")
- return {"installed": False}
-
- except Exception as e:
- self.results["issues"].append(f"空间扩展检查失败: {str(e)}")
- logger.error(f"✗ 空间扩展检查失败: {str(e)}")
- return {"error": str(e)}
-
- def run_full_check(self):
- """
- 执行完整的健康检查
-
- @returns: dict 完整的检查结果
- """
- logger.info("开始数据库健康检查...")
- logger.info("=" * 50)
-
- # 检查数据库连接
- if not self.check_database_connection():
- logger.error("数据库连接失败,跳过后续检查")
- return self.results
-
- # 检查迁移状态
- migration_info = self.check_migration_status()
-
- # 检查表结构
- table_info = self.check_table_structure()
-
- # 检查空间扩展
- spatial_info = self.check_spatial_extensions()
-
- # 汇总结果
- self.results.update({
- "migration_info": migration_info,
- "table_info": table_info,
- "spatial_info": spatial_info
- })
-
- logger.info("=" * 50)
- logger.info("健康检查完成")
-
- # 打印总结
- if self.results["issues"]:
- logger.warning(f"发现 {len(self.results['issues'])} 个问题:")
- for issue in self.results["issues"]:
- logger.warning(f" - {issue}")
-
- if self.results["recommendations"]:
- logger.info(f"建议执行 {len(self.results['recommendations'])} 项操作:")
- for rec in self.results["recommendations"]:
- logger.info(f" - {rec}")
-
- if not self.results["issues"]:
- logger.info("✓ 数据库状态良好")
-
- return self.results
- def main():
- """
- 主函数
- """
- try:
- checker = DatabaseHealthChecker()
- results = checker.run_full_check()
-
- # 根据结果设置退出码
- if results["issues"]:
- sys.exit(1) # 有问题时退出码为1
- else:
- sys.exit(0) # 正常时退出码为0
-
- except Exception as e:
- logger.error(f"健康检查执行失败: {str(e)}")
- logger.error(f"错误详情: {traceback.format_exc()}")
- sys.exit(2) # 检查失败时退出码为2
- if __name__ == "__main__":
- main()
|