database.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. from sqlalchemy import create_engine
  2. from sqlalchemy.orm import sessionmaker
  3. from sqlalchemy.ext.declarative import declarative_base
  4. import os
  5. from dotenv import load_dotenv
  6. import logging
  7. from sqlalchemy.exc import SQLAlchemyError
  8. # 配置日志
  9. logging.basicConfig(level=logging.INFO)
  10. logger = logging.getLogger(__name__)
  11. # 创建Base类
  12. Base = declarative_base()
  13. # 加载环境变量
  14. load_dotenv("config.env")
  15. # 从环境变量获取数据库连接信息
  16. DB_USER = os.getenv("DB_USER", "postgres")
  17. DB_PASSWORD = os.getenv("DB_PASSWORD", "root")
  18. DB_HOST = os.getenv("DB_HOST", "localhost")
  19. DB_PORT = os.getenv("DB_PORT", "5432")
  20. DB_NAME = os.getenv("DB_NAME", "testdb")
  21. print(DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME)
  22. # 构建数据库连接URL
  23. SQLALCHEMY_DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
  24. def create_database_engine():
  25. """创建并配置数据库引擎"""
  26. try:
  27. engine = create_engine(
  28. SQLALCHEMY_DATABASE_URL,
  29. pool_size=5,
  30. max_overflow=10,
  31. pool_timeout=30,
  32. pool_recycle=1800
  33. )
  34. return engine
  35. except Exception as e:
  36. logger.error(f"创建数据库引擎失败: {str(e)}")
  37. raise
  38. def test_database_connection(engine):
  39. """测试数据库连接"""
  40. try:
  41. with engine.connect() as conn:
  42. logger.info("数据库连接测试成功")
  43. return True
  44. except SQLAlchemyError as e:
  45. logger.error(f"数据库连接测试失败: {str(e)}")
  46. return False
  47. # 创建数据库引擎
  48. engine = create_database_engine()
  49. # 测试数据库连接
  50. if not test_database_connection(engine):
  51. raise Exception("无法连接到数据库,请检查数据库配置")
  52. # 创建会话工厂
  53. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  54. def get_db():
  55. """获取数据库会话"""
  56. db = SessionLocal()
  57. try:
  58. yield db
  59. except SQLAlchemyError as e:
  60. logger.error(f"数据库操作错误: {str(e)}")
  61. raise
  62. finally:
  63. db.close()
  64. def execute_sql(sql_statement):
  65. """
  66. 执行原始SQL语句
  67. Args:
  68. sql_statement: 要执行的SQL语句
  69. Returns:
  70. 执行结果
  71. """
  72. try:
  73. with engine.begin() as connection:
  74. result = connection.execute(sql_statement)
  75. return result
  76. except SQLAlchemyError as e:
  77. logger.error(f"执行SQL语句失败: {str(e)}")
  78. raise