db_migrate.py 6.0 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 数据库迁移辅助脚本
  5. 此脚本提供简单的命令行接口来执行常见的数据库迁移操作。
  6. """
  7. import argparse
  8. import os
  9. import sys
  10. import importlib
  11. import traceback
  12. from alembic.config import Config
  13. from alembic import command
  14. # 直接设置环境变量,确保所有地方使用相同的数据库连接信息
  15. os.environ["DB_USER"] = "postgres"
  16. os.environ["DB_PASSWORD"] = "123456789Qq"
  17. os.environ["DB_HOST"] = "localhost"
  18. os.environ["DB_PORT"] = "5432"
  19. os.environ["DB_NAME"] = "soilgd"
  20. def get_alembic_config():
  21. """
  22. 获取 Alembic 配置
  23. Returns:
  24. Alembic 配置对象
  25. """
  26. # 使用相对路径获取配置文件
  27. script_dir = os.path.dirname(os.path.abspath(__file__))
  28. alembic_ini_path = os.path.join(script_dir, 'alembic.ini')
  29. print(f"使用配置文件: {alembic_ini_path}")
  30. # 创建配置对象
  31. alembic_cfg = Config(alembic_ini_path)
  32. return alembic_cfg
  33. def create_migration(message, auto=True):
  34. """
  35. 创建新的迁移
  36. Args:
  37. message: 迁移说明
  38. auto: 是否自动生成迁移内容
  39. """
  40. cfg = get_alembic_config()
  41. try:
  42. print(f"开始创建迁移: {message}, 自动生成: {auto}")
  43. if auto:
  44. command.revision(cfg, message=message, autogenerate=True)
  45. else:
  46. command.revision(cfg, message=message, autogenerate=False)
  47. print(f"成功创建迁移 '{message}'")
  48. return 0
  49. except Exception as e:
  50. print(f"创建迁移失败: {str(e)}")
  51. print("详细错误信息:")
  52. traceback.print_exc()
  53. return 1
  54. def upgrade(target="head"):
  55. """
  56. 升级数据库
  57. Args:
  58. target: 目标版本,默认为最新版本
  59. """
  60. cfg = get_alembic_config()
  61. try:
  62. command.upgrade(cfg, target)
  63. print(f"成功升级数据库到 '{target}'")
  64. return 0
  65. except Exception as e:
  66. print(f"升级数据库失败: {str(e)}")
  67. return 1
  68. def downgrade(target="-1"):
  69. """
  70. 降级数据库
  71. Args:
  72. target: 目标版本,默认为上一个版本
  73. """
  74. cfg = get_alembic_config()
  75. try:
  76. command.downgrade(cfg, target)
  77. print(f"成功降级数据库到 '{target}'")
  78. return 0
  79. except Exception as e:
  80. print(f"降级数据库失败: {str(e)}")
  81. return 1
  82. def show_history():
  83. """显示迁移历史"""
  84. cfg = get_alembic_config()
  85. try:
  86. command.history(cfg)
  87. return 0
  88. except Exception as e:
  89. print(f"显示历史失败: {str(e)}")
  90. return 1
  91. def show_current():
  92. """显示当前版本"""
  93. cfg = get_alembic_config()
  94. try:
  95. command.current(cfg)
  96. return 0
  97. except Exception as e:
  98. print(f"显示当前版本失败: {str(e)}")
  99. return 1
  100. def stamp_revision(target="head"):
  101. """
  102. 将数据库版本标记为指定版本,而不运行任何迁移
  103. Args:
  104. target: 目标版本,默认为最新版本
  105. """
  106. cfg = get_alembic_config()
  107. try:
  108. command.stamp(cfg, target)
  109. print(f"成功将数据库版本标记为 '{target}'")
  110. return 0
  111. except Exception as e:
  112. print(f"标记数据库版本失败: {str(e)}")
  113. return 1
  114. def execute_sql(sql_statement):
  115. """
  116. 直接执行 SQL 语句
  117. Args:
  118. sql_statement: 要执行的 SQL 语句
  119. """
  120. try:
  121. from app.database import execute_sql
  122. execute_sql(sql_statement)
  123. print(f"成功执行 SQL: {sql_statement}")
  124. return 0
  125. except Exception as e:
  126. print(f"执行 SQL 失败: {str(e)}")
  127. return 1
  128. def main():
  129. """主函数"""
  130. parser = argparse.ArgumentParser(description='数据库迁移辅助工具')
  131. subparsers = parser.add_subparsers(dest='command', help='命令')
  132. # 检查 alembic 是否已安装
  133. try:
  134. importlib.import_module('alembic')
  135. except ImportError:
  136. print("错误: 未找到 alembic 模块,请先安装: pip install alembic")
  137. return 1
  138. # 创建迁移
  139. create_parser = subparsers.add_parser('create', help='创建新的迁移')
  140. create_parser.add_argument('message', help='迁移说明')
  141. create_parser.add_argument('--manual', action='store_true', help='手动创建迁移(不自动生成)')
  142. # 升级数据库
  143. upgrade_parser = subparsers.add_parser('upgrade', help='升级数据库')
  144. upgrade_parser.add_argument('target', nargs='?', default='head', help='目标版本,默认为最新版本')
  145. # 降级数据库
  146. downgrade_parser = subparsers.add_parser('downgrade', help='降级数据库')
  147. downgrade_parser.add_argument('target', nargs='?', default='-1', help='目标版本,默认为上一个版本')
  148. # 标记数据库版本
  149. stamp_parser = subparsers.add_parser('stamp', help='标记数据库版本,不执行迁移')
  150. stamp_parser.add_argument('target', nargs='?', default='head', help='目标版本,默认为最新版本')
  151. # 执行 SQL
  152. sql_parser = subparsers.add_parser('sql', help='执行 SQL 语句')
  153. sql_parser.add_argument('statement', help='要执行的 SQL 语句')
  154. # 显示历史
  155. subparsers.add_parser('history', help='显示迁移历史')
  156. # 显示当前版本
  157. subparsers.add_parser('current', help='显示当前版本')
  158. args = parser.parse_args()
  159. if args.command == 'create':
  160. return create_migration(args.message, not args.manual)
  161. elif args.command == 'upgrade':
  162. return upgrade(args.target)
  163. elif args.command == 'downgrade':
  164. return downgrade(args.target)
  165. elif args.command == 'stamp':
  166. return stamp_revision(args.target)
  167. elif args.command == 'sql':
  168. return execute_sql(args.statement)
  169. elif args.command == 'history':
  170. return show_history()
  171. elif args.command == 'current':
  172. return show_current()
  173. else:
  174. parser.print_help()
  175. return 0
  176. if __name__ == '__main__':
  177. sys.exit(main())