cd_prediction_database_service.py 30 KB


  1. """
  2. 基于数据库的Cd预测服务类
  3. @description: 从数据库表中读取数据进行预测并更新结果表
  4. @author: AcidMap Team
  5. """
  6. import os
  7. import logging
  8. import asyncio
  9. from datetime import datetime
  10. from typing import Dict, Any, Optional, List, Tuple
  11. import pandas as pd
  12. import numpy as np
  13. from sqlalchemy.orm import Session
  14. from sqlalchemy import and_
  15. from ..database import SessionLocal
  16. from ..models.farmland import FarmlandData
  17. from ..models.EffCd_input import EffCdInputData
  18. from ..models.EffCd_output import EffCdOutputData
  19. from ..models.CropCd_input import CropCdInputData
  20. from ..models.CropCd_output import CropCdOutputData
  21. from .cd_prediction_service_v3 import CdPredictionServiceV3
  22. from .admin_boundary_service import get_boundary_geojson_by_name
  23. from ..log.logger import get_logger
  24. import tempfile
  25. import json
  26. class CdPredictionDatabaseService:
  27. """
  28. 基于数据库的Cd预测服务类
  29. 从数据库表读取输入数据,执行预测,并将结果保存回数据库
  30. """
  31. def __init__(self):
  32. """初始化数据库预测服务"""
  33. self.logger = get_logger(__name__)
  34. self.prediction_service = CdPredictionServiceV3()
  35. self.logger.info("数据库Cd预测服务初始化完成")
  36. def _get_database_session(self) -> Session:
  37. """获取数据库会话"""
  38. return SessionLocal()
  39. def _query_effective_cd_input_data(self, db: Session) -> pd.DataFrame:
  40. """
  41. 从EffCd_input_data和Farmland_data表查询输入数据
  42. @param db: 数据库会话
  43. @returns: 包含经纬度和环境因子的DataFrame
  44. """
  45. try:
  46. # 构建查询,联接EffCd_input_data和Farmland_data表
  47. query = db.query(
  48. FarmlandData.lon,
  49. FarmlandData.lan,
  50. EffCdInputData.oc_fe_0_30,
  51. EffCdInputData.silt_content,
  52. EffCdInputData.sand_content,
  53. EffCdInputData.gravel_content,
  54. EffCdInputData.available_potassium,
  55. EffCdInputData.available_phosphorus,
  56. EffCdInputData.electrical_conductivity,
  57. EffCdInputData.slow_available_potassium,
  58. EffCdInputData.total_aluminum,
  59. EffCdInputData.total_calcium,
  60. EffCdInputData.total_cadmium, # 新增
  61. EffCdInputData.soluble_salts, # 新增
  62. EffCdInputData.exchangeable_acidity, # 新增
  63. EffCdInputData.total_iron,
  64. EffCdInputData.total_potassium, # 新增
  65. EffCdInputData.total_magnesium,
  66. EffCdInputData.total_manganese,
  67. EffCdInputData.total_nitrogen,
  68. EffCdInputData.total_phosphorus, # 新增
  69. EffCdInputData.total_sulfur,
  70. EffCdInputData.cd_solution,
  71. EffCdInputData.farmland_id,
  72. EffCdInputData.sample_id
  73. ).join(
  74. FarmlandData,
  75. and_(
  76. EffCdInputData.farmland_id == FarmlandData.farmland_id,
  77. EffCdInputData.sample_id == FarmlandData.sample_id
  78. )
  79. )
  80. # 执行查询并转换为DataFrame
  81. results = query.all()
  82. if not results:
  83. raise ValueError("未找到符合条件的EffCd输入数据")
  84. # 转换为DataFrame
  85. df = pd.DataFrame([
  86. {
  87. 'lon': row.lon,
  88. 'lan': row.lan,
  89. 'OC-Fe_0-30': row.oc_fe_0_30,
  90. '002_0002IDW': row.silt_content,
  91. '02_002IDW': row.sand_content,
  92. '2_02IDW': row.gravel_content,
  93. 'AvaK_IDW': row.available_potassium,
  94. 'AvaP_IDW': row.available_phosphorus,
  95. 'EC_IDW': row.electrical_conductivity,
  96. 'SAvaK_IDW': row.slow_available_potassium,
  97. 'TAl_IDW': row.total_aluminum,
  98. 'TCa_IDW': row.total_calcium,
  99. 'TCd_IDW': row.total_cadmium, # 新增
  100. 'TEB_IDW': row.soluble_salts, # 新增
  101. 'TExH_IDW': row.exchangeable_acidity, # 新增
  102. 'TFe_IDW': row.total_iron,
  103. 'TK_IDW': row.total_potassium, # 新增
  104. 'TMg_IDW': row.total_magnesium,
  105. 'TMn_IDW': row.total_manganese,
  106. 'TN_IDW': row.total_nitrogen,
  107. 'TP_IDW': row.total_phosphorus, # 新增
  108. 'TS_IDW': row.total_sulfur,
  109. 'Cdsolution': row.cd_solution, # 修正列名
  110. 'farmland_id': row.farmland_id,
  111. 'sample_id': row.sample_id
  112. }
  113. for row in results
  114. ])
  115. self.logger.info(f"成功查询到{len(df)}条EffCd输入数据记录")
  116. return df
  117. except Exception as e:
  118. self.logger.error(f"查询EffCd输入数据失败: {str(e)}")
  119. raise
  120. def _query_crop_cd_input_data(self, db: Session) -> pd.DataFrame:
  121. """
  122. 从CropCd_input_data和Farmland_data表查询输入数据
  123. @param db: 数据库会话
  124. @returns: 包含经纬度和环境因子的DataFrame
  125. """
  126. try:
  127. # 构建查询,联接CropCd_input_data和Farmland_data表
  128. query = db.query(
  129. FarmlandData.lon,
  130. FarmlandData.lan,
  131. CropCdInputData.silt_content, # 002_0002IDW
  132. CropCdInputData.sand_content, # 02_002IDW
  133. CropCdInputData.gravel_content, # 2_02IDW
  134. CropCdInputData.available_phosphorus, # AvaP_IDW (注意:这里对应CSV中的AvaP)
  135. CropCdInputData.available_potassium, # AvaK_IDW (注意:这里对应CSV中的AvaK)
  136. CropCdInputData.slow_available_potassium, # SAvaK_IDW
  137. CropCdInputData.total_aluminum, # TAl_IDW
  138. CropCdInputData.total_calcium, # TCa_IDW
  139. CropCdInputData.total_iron, # TFe_IDW
  140. CropCdInputData.total_magnesium, # TMg_IDW
  141. CropCdInputData.total_manganese, # TMn_IDW
  142. CropCdInputData.total_nitrogen, # TN_IDW
  143. CropCdInputData.total_sulfur, # TS_IDW
  144. CropCdInputData.ln_cd_solution, # solution
  145. CropCdInputData.farmland_id,
  146. CropCdInputData.sample_id
  147. ).join(
  148. FarmlandData,
  149. and_(
  150. CropCdInputData.farmland_id == FarmlandData.farmland_id,
  151. CropCdInputData.sample_id == FarmlandData.sample_id
  152. )
  153. )
  154. # 执行查询并转换为DataFrame
  155. results = query.all()
  156. if not results:
  157. raise ValueError("未找到符合条件的CropCd输入数据")
  158. # 转换为DataFrame
  159. df = pd.DataFrame([
  160. {
  161. 'lon': row.lon,
  162. 'lan': row.lan,
  163. '002_0002IDW': row.silt_content,
  164. '02_002IDW': row.sand_content,
  165. '2_02IDW': row.gravel_content,
  166. 'AvaP': row.available_phosphorus, # 注意:CSV中的AvaP对应数据库的AvaP_IDW字段
  167. 'AvaK_IDW': row.available_potassium,
  168. 'SAvaK_IDW': row.slow_available_potassium,
  169. 'TAl_IDW': row.total_aluminum,
  170. 'TCa_IDW': row.total_calcium,
  171. 'TFe_IDW': row.total_iron,
  172. 'TMg_IDW': row.total_magnesium,
  173. 'TMn_IDW': row.total_manganese,
  174. 'TN_IDW': row.total_nitrogen,
  175. 'TS_IDW': row.total_sulfur,
  176. 'solution': row.ln_cd_solution,
  177. 'farmland_id': row.farmland_id,
  178. 'sample_id': row.sample_id
  179. }
  180. for row in results
  181. ])
  182. self.logger.info(f"成功查询到{len(df)}条CropCd输入数据记录")
  183. return df
  184. except Exception as e:
  185. self.logger.error(f"查询CropCd输入数据失败: {str(e)}")
  186. raise
  187. def _prepare_prediction_data(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
  188. """
  189. 准备预测数据,分离坐标和环境因子
  190. @param df: 包含所有数据的DataFrame
  191. @returns: (坐标DataFrame, 环境因子DataFrame)
  192. """
  193. try:
  194. # 提取坐标数据 (前两列)
  195. coordinates_df = df[['lon', 'lan']].copy()
  196. coordinates_df.columns = ['longitude', 'latitude']
  197. # 提取环境因子数据 (去除坐标和ID列)
  198. exclude_cols = ['lon', 'lan', 'farmland_id', 'sample_id']
  199. environmental_cols = [col for col in df.columns if col not in exclude_cols]
  200. environmental_df = df[environmental_cols].copy()
  201. # 检查数据完整性
  202. missing_coords = coordinates_df.isnull().sum().sum()
  203. missing_env = environmental_df.isnull().sum().sum()
  204. if missing_coords > 0:
  205. self.logger.warning(f"坐标数据中有{missing_coords}个缺失值")
  206. if missing_env > 0:
  207. self.logger.warning(f"环境因子数据中有{missing_env}个缺失值")
  208. self.logger.info(f"准备预测数据完成: 坐标{coordinates_df.shape}, 环境因子{environmental_df.shape}")
  209. return coordinates_df, environmental_df
  210. except Exception as e:
  211. self.logger.error(f"准备预测数据失败: {str(e)}")
  212. raise
  213. def _save_predictions_to_database(self, db: Session, df: pd.DataFrame, predictions: np.ndarray) -> int:
  214. """
  215. 将预测结果批量保存到EffCd_output_data表(使用UPSERT操作)
  216. @param db: 数据库会话
  217. @param df: 原始数据DataFrame(包含farmland_id和sample_id)
  218. @param predictions: 预测结果数组
  219. @returns: 更新的记录数
  220. """
  221. try:
  222. self.logger.info(f"开始批量保存{len(predictions)}条预测结果...")
  223. # 首先删除所有现有记录(简单粗暴但高效)
  224. farmland_ids = df['farmland_id'].unique().tolist()
  225. delete_count = db.query(EffCdOutputData).filter(
  226. EffCdOutputData.farmland_id.in_(farmland_ids)
  227. ).delete(synchronize_session=False)
  228. if delete_count > 0:
  229. self.logger.info(f"清理了{delete_count}条旧记录")
  230. # 准备批量插入数据(向量化操作,避免逐行循环)
  231. batch_data = [
  232. {
  233. 'farmland_id': int(farmland_id),
  234. 'sample_id': int(sample_id),
  235. 'ln_eff_cd': float(prediction)
  236. }
  237. for farmland_id, sample_id, prediction in zip(
  238. df['farmland_id'].values,
  239. df['sample_id'].values,
  240. predictions
  241. )
  242. ]
  243. # 分批批量插入新记录(避免单次插入过多数据)
  244. batch_size = 5000 # 每批处理5000条记录
  245. total_inserted = 0
  246. for i in range(0, len(batch_data), batch_size):
  247. batch_chunk = batch_data[i:i + batch_size]
  248. db.bulk_insert_mappings(EffCdOutputData, batch_chunk)
  249. total_inserted += len(batch_chunk)
  250. self.logger.info(f"已插入 {total_inserted}/{len(batch_data)} 条记录")
  251. # 提交事务
  252. db.commit()
  253. self.logger.info(f"成功批量保存{total_inserted}条预测结果到EffCd_output_data表")
  254. return total_inserted
  255. except Exception as e:
  256. db.rollback()
  257. self.logger.error(f"批量保存预测结果到数据库失败: {str(e)}")
  258. raise
  259. def _save_crop_cd_predictions_to_database(self, db: Session, df: pd.DataFrame, predictions: np.ndarray) -> int:
  260. """
  261. 将作物Cd预测结果批量保存到CropCd_output_data表
  262. @param db: 数据库会话
  263. @param df: 原始数据DataFrame(包含farmland_id和sample_id)
  264. @param predictions: 预测结果数组
  265. @returns: 更新的记录数
  266. """
  267. try:
  268. self.logger.info(f"开始批量保存{len(predictions)}条作物Cd预测结果...")
  269. # 首先删除所有现有记录(简单粗暴但高效)
  270. farmland_ids = df['farmland_id'].unique().tolist()
  271. delete_count = db.query(CropCdOutputData).filter(
  272. CropCdOutputData.farmland_id.in_(farmland_ids)
  273. ).delete(synchronize_session=False)
  274. if delete_count > 0:
  275. self.logger.info(f"清理了{delete_count}条旧的作物Cd记录")
  276. # 准备批量插入数据(向量化操作,避免逐行循环)
  277. batch_data = [
  278. {
  279. 'farmland_id': int(farmland_id),
  280. 'sample_id': int(sample_id),
  281. 'ln_crop_cd': float(prediction)
  282. }
  283. for farmland_id, sample_id, prediction in zip(
  284. df['farmland_id'].values,
  285. df['sample_id'].values,
  286. predictions
  287. )
  288. ]
  289. # 分批批量插入新记录(避免单次插入过多数据)
  290. batch_size = 5000 # 每批处理5000条记录
  291. total_inserted = 0
  292. for i in range(0, len(batch_data), batch_size):
  293. batch_chunk = batch_data[i:i + batch_size]
  294. db.bulk_insert_mappings(CropCdOutputData, batch_chunk)
  295. total_inserted += len(batch_chunk)
  296. self.logger.info(f"已插入作物Cd预测结果 {total_inserted}/{len(batch_data)} 条记录")
  297. # 提交事务
  298. db.commit()
  299. self.logger.info(f"成功批量保存{total_inserted}条作物Cd预测结果到CropCd_output_data表")
  300. return total_inserted
  301. except Exception as e:
  302. db.rollback()
  303. self.logger.error(f"批量保存作物Cd预测结果到数据库失败: {str(e)}")
  304. raise
  305. async def generate_effective_cd_prediction_from_database(
  306. self,
  307. area: str,
  308. level: str,
  309. raster_config_override: Optional[Dict[str, Any]] = None
  310. ) -> Dict[str, Any]:
  311. """
  312. 基于数据库数据生成有效态Cd预测
  313. @param area: 地区名称
  314. @param level: 行政级别
  315. @param raster_config_override: 栅格配置覆盖参数
  316. @returns: 预测结果信息
  317. """
  318. db = None
  319. try:
  320. self.logger.info(f"开始基于数据库数据生成有效态Cd预测: {area} ({level})")
  321. # 获取数据库会话
  322. db = self._get_database_session()
  323. # 查询输入数据
  324. input_df = self._query_effective_cd_input_data(db)
  325. if len(input_df) == 0:
  326. raise ValueError("未找到有效的输入数据")
  327. # 准备预测数据
  328. coordinates_df, environmental_df = self._prepare_prediction_data(input_df)
  329. # 合并坐标和环境因子用于预测服务
  330. prediction_input_df = pd.concat([coordinates_df, environmental_df], axis=1)
  331. # 保存临时数据文件用于预测服务
  332. temp_file_path = self.prediction_service.save_temp_data(prediction_input_df, area)
  333. # 获取边界数据
  334. boundary_gdf = self._get_boundary_geojson(area, level)
  335. # 直接使用预测引擎进行预测和可视化
  336. prediction_result = await self._run_effective_cd_prediction_with_boundary(
  337. prediction_input_df, area, boundary_gdf, raster_config_override
  338. )
  339. # 从预测结果中提取预测值
  340. # 可以直接从返回的DataFrame中获取预测结果
  341. final_data_df = prediction_result.get('final_data_df')
  342. if final_data_df is not None and 'Prediction' in final_data_df.columns:
  343. predictions = final_data_df['Prediction'].values
  344. else:
  345. # 如果没有直接的DataFrame,尝试从文件读取
  346. final_data_file = prediction_result.get('final_data_file')
  347. if final_data_file and os.path.exists(final_data_file):
  348. final_df = pd.read_csv(final_data_file)
  349. predictions = final_df['Prediction'].values
  350. else:
  351. raise ValueError("无法获取预测结果数据")
  352. # 保存预测结果到数据库
  353. updated_count = self._save_predictions_to_database(db, input_df, predictions)
  354. result = {
  355. 'success': True,
  356. 'area': area,
  357. 'level': level,
  358. 'processed_records': len(input_df),
  359. 'updated_records': updated_count,
  360. 'map_path': prediction_result.get('map_path'),
  361. 'histogram_path': prediction_result.get('histogram_path'),
  362. 'raster_path': prediction_result.get('raster_path'),
  363. 'timestamp': prediction_result.get('timestamp'),
  364. 'validation': prediction_result.get('validation', {})
  365. }
  366. self.logger.info(f"基于数据库数据的有效态Cd预测完成: {area} ({level}), 处理{len(input_df)}条记录")
  367. return result
  368. except Exception as e:
  369. self.logger.error(f"基于数据库数据生成有效态Cd预测失败: {str(e)}")
  370. raise
  371. finally:
  372. if db:
  373. db.close()
  374. def get_effective_cd_results_from_database(
  375. self,
  376. limit: Optional[int] = None
  377. ) -> pd.DataFrame:
  378. """
  379. 从数据库获取有效态Cd预测结果
  380. @param limit: 可选的结果数量限制
  381. @returns: 包含预测结果的DataFrame
  382. """
  383. db = None
  384. try:
  385. db = self._get_database_session()
  386. # 构建查询,联接结果表和农田数据表获取坐标
  387. query = db.query(
  388. EffCdOutputData.farmland_id,
  389. EffCdOutputData.sample_id,
  390. EffCdOutputData.ln_eff_cd,
  391. FarmlandData.lon,
  392. FarmlandData.lan
  393. ).join(
  394. FarmlandData,
  395. and_(
  396. EffCdOutputData.farmland_id == FarmlandData.farmland_id,
  397. EffCdOutputData.sample_id == FarmlandData.sample_id
  398. )
  399. )
  400. # 添加数量限制
  401. if limit:
  402. query = query.limit(limit)
  403. # 执行查询
  404. results = query.all()
  405. if not results:
  406. return pd.DataFrame()
  407. # 转换为DataFrame
  408. df = pd.DataFrame([
  409. {
  410. 'farmland_id': row.farmland_id,
  411. 'sample_id': row.sample_id,
  412. 'longitude': row.lon,
  413. 'latitude': row.lan,
  414. 'LnEffCd': row.ln_eff_cd,
  415. 'EffCd': np.exp(row.ln_eff_cd) # 转换为实际有效态镉浓度
  416. }
  417. for row in results
  418. ])
  419. self.logger.info(f"成功查询到{len(df)}条有效态Cd预测结果")
  420. return df
  421. except Exception as e:
  422. self.logger.error(f"查询有效态Cd预测结果失败: {str(e)}")
  423. raise
  424. finally:
  425. if db:
  426. db.close()
  427. async def generate_crop_cd_prediction_from_database(
  428. self,
  429. area: str,
  430. level: str,
  431. raster_config_override: Optional[Dict[str, Any]] = None
  432. ) -> Dict[str, Any]:
  433. """
  434. 基于数据库数据生成作物Cd预测
  435. @param area: 地区名称
  436. @param level: 行政级别
  437. @param raster_config_override: 栅格配置覆盖参数
  438. @returns: 预测结果信息
  439. """
  440. db = None
  441. try:
  442. self.logger.info(f"开始基于数据库数据生成作物Cd预测: {area} ({level})")
  443. # 获取数据库会话
  444. db = self._get_database_session()
  445. # 查询输入数据
  446. input_df = self._query_crop_cd_input_data(db)
  447. if len(input_df) == 0:
  448. raise ValueError("未找到有效的作物Cd输入数据")
  449. # 准备预测数据
  450. coordinates_df, environmental_df = self._prepare_prediction_data(input_df)
  451. # 合并坐标和环境因子用于预测服务
  452. prediction_input_df = pd.concat([coordinates_df, environmental_df], axis=1)
  453. # 保存临时数据文件用于预测服务
  454. temp_file_path = self.prediction_service.save_temp_data(prediction_input_df, area)
  455. # 获取边界数据
  456. boundary_gdf = self._get_boundary_geojson(area, level)
  457. # 直接使用预测引擎进行预测和可视化
  458. prediction_result = await self._run_crop_cd_prediction_with_boundary(
  459. prediction_input_df, area, boundary_gdf, raster_config_override
  460. )
  461. # 从预测结果中提取预测值
  462. # 可以直接从返回的DataFrame中获取预测结果
  463. final_data_df = prediction_result.get('final_data_df')
  464. if final_data_df is not None and 'Prediction' in final_data_df.columns:
  465. predictions = final_data_df['Prediction'].values
  466. else:
  467. # 如果没有直接的DataFrame,尝试从文件读取
  468. final_data_file = prediction_result.get('final_data_file')
  469. if final_data_file and os.path.exists(final_data_file):
  470. final_df = pd.read_csv(final_data_file)
  471. predictions = final_df['Prediction'].values
  472. else:
  473. raise ValueError("无法获取作物Cd预测结果数据")
  474. # 保存预测结果到数据库
  475. updated_count = self._save_crop_cd_predictions_to_database(db, input_df, predictions)
  476. result = {
  477. 'success': True,
  478. 'area': area,
  479. 'level': level,
  480. 'processed_records': len(input_df),
  481. 'updated_records': updated_count,
  482. 'map_path': prediction_result.get('map_path'),
  483. 'histogram_path': prediction_result.get('histogram_path'),
  484. 'raster_path': prediction_result.get('raster_path'),
  485. 'timestamp': prediction_result.get('timestamp'),
  486. 'validation': prediction_result.get('validation', {})
  487. }
  488. self.logger.info(f"基于数据库数据的作物Cd预测完成: {area} ({level}), 处理{len(input_df)}条记录")
  489. return result
  490. except Exception as e:
  491. self.logger.error(f"基于数据库数据生成作物Cd预测失败: {str(e)}")
  492. raise
  493. finally:
  494. if db:
  495. db.close()
  496. async def _run_crop_cd_prediction_with_boundary(
  497. self,
  498. input_data: pd.DataFrame,
  499. area: str,
  500. boundary_gdf,
  501. raster_config_override: Optional[Dict[str, Any]] = None
  502. ) -> Dict[str, Any]:
  503. """
  504. 使用边界数据执行作物Cd预测
  505. @param input_data: 输入数据DataFrame
  506. @param area: 地区名称
  507. @param boundary_gdf: 边界GeoDataFrame
  508. @param raster_config_override: 栅格配置覆盖参数
  509. @returns: 预测结果
  510. """
  511. try:
  512. # 在线程池中运行预测
  513. loop = asyncio.get_event_loop()
  514. result = await loop.run_in_executor(
  515. None,
  516. self._run_crop_cd_prediction_sync,
  517. input_data, area, boundary_gdf, raster_config_override
  518. )
  519. return result
  520. except Exception as e:
  521. self.logger.error(f"执行作物Cd预测失败: {str(e)}")
  522. raise
  523. def _run_crop_cd_prediction_sync(
  524. self,
  525. input_data: pd.DataFrame,
  526. area: str,
  527. boundary_gdf,
  528. raster_config_override: Optional[Dict[str, Any]] = None
  529. ) -> Dict[str, Any]:
  530. """
  531. 同步执行作物Cd预测(用于线程池)
  532. @param input_data: 输入数据DataFrame
  533. @param area: 地区名称
  534. @param boundary_gdf: 边界GeoDataFrame
  535. @param raster_config_override: 栅格配置覆盖参数
  536. @returns: 预测结果
  537. """
  538. try:
  539. # 使用预测引擎的predict_and_visualize方法
  540. result = self.prediction_service.engine.predict_and_visualize(
  541. input_data=input_data,
  542. model_type="crop_cd", # 使用作物Cd模型
  543. county_name=area, # 使用area作为county_name
  544. boundary_gdf=boundary_gdf,
  545. raster_config_override=raster_config_override,
  546. save_raster=False # 不保存栅格文件,节省存储空间
  547. )
  548. return result
  549. except Exception as e:
  550. self.logger.error(f"同步作物Cd预测执行失败: {str(e)}")
  551. raise
  552. def _get_boundary_geojson(self, area: str, level: str) -> Optional[object]:
  553. """
  554. 获取指定区域的边界GeoDataFrame
  555. @param area: 地区名称
  556. @param level: 行政级别
  557. @returns: GeoDataFrame对象或None
  558. """
  559. try:
  560. db = self._get_database_session()
  561. feature = get_boundary_geojson_by_name(db, area, level)
  562. if feature:
  563. # 将feature转换为GeoDataFrame
  564. import geopandas as gpd
  565. # 创建临时GeoJSON文件
  566. tmp_dir = tempfile.mkdtemp()
  567. tmp_geojson = os.path.join(tmp_dir, "boundary.geojson")
  568. fc = {"type": "FeatureCollection", "features": [feature]}
  569. with open(tmp_geojson, 'w', encoding='utf-8') as f:
  570. json.dump(fc, f, ensure_ascii=False)
  571. # 读取为GeoDataFrame
  572. boundary_gdf = gpd.read_file(tmp_geojson)
  573. # 清理临时文件
  574. import shutil
  575. shutil.rmtree(tmp_dir, ignore_errors=True)
  576. self.logger.info(f"成功获取边界数据: {area} ({level})")
  577. return boundary_gdf
  578. return None
  579. except Exception as e:
  580. self.logger.warning(f"获取边界数据失败: {str(e)}")
  581. return None
  582. finally:
  583. try:
  584. db.close()
  585. except Exception:
  586. pass
  587. async def _run_effective_cd_prediction_with_boundary(
  588. self,
  589. input_data: pd.DataFrame,
  590. area: str,
  591. boundary_gdf,
  592. raster_config_override: Optional[Dict[str, Any]] = None
  593. ) -> Dict[str, Any]:
  594. """
  595. 使用边界数据执行有效态Cd预测
  596. @param input_data: 输入数据DataFrame
  597. @param area: 地区名称
  598. @param boundary_gdf: 边界GeoDataFrame
  599. @param raster_config_override: 栅格配置覆盖参数
  600. @returns: 预测结果
  601. """
  602. try:
  603. # 在线程池中运行预测
  604. loop = asyncio.get_event_loop()
  605. result = await loop.run_in_executor(
  606. None,
  607. self._run_prediction_sync,
  608. input_data, area, boundary_gdf, raster_config_override
  609. )
  610. return result
  611. except Exception as e:
  612. self.logger.error(f"执行有效态Cd预测失败: {str(e)}")
  613. raise
  614. def _run_prediction_sync(
  615. self,
  616. input_data: pd.DataFrame,
  617. area: str,
  618. boundary_gdf,
  619. raster_config_override: Optional[Dict[str, Any]] = None
  620. ) -> Dict[str, Any]:
  621. """
  622. 同步执行预测(用于线程池)
  623. @param input_data: 输入数据DataFrame
  624. @param area: 地区名称
  625. @param boundary_gdf: 边界GeoDataFrame
  626. @param raster_config_override: 栅格配置覆盖参数
  627. @returns: 预测结果
  628. """
  629. try:
  630. # 使用预测引擎的predict_and_visualize方法
  631. result = self.prediction_service.engine.predict_and_visualize(
  632. input_data=input_data,
  633. model_type="effective_cd",
  634. county_name=area, # 使用area作为county_name
  635. boundary_gdf=boundary_gdf,
  636. raster_config_override=raster_config_override,
  637. save_raster=False # 不保存栅格文件,节省存储空间
  638. )
  639. return result
  640. except Exception as e:
  641. self.logger.error(f"同步预测执行失败: {str(e)}")
  642. raise