cd_flux_removal_service.py 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017
  1. """
  2. Cd通量移除计算服务
  3. @description: 提供籽粒移除和秸秆移除的Cd通量计算功能
  4. @author: AcidMap Team
  5. @version: 1.0.0
  6. """
  7. import logging
  8. import math
  9. import os
  10. import pandas as pd
  11. from datetime import datetime
  12. from typing import Dict, Any, List, Optional
  13. from sqlalchemy.orm import sessionmaker, Session
  14. from sqlalchemy import create_engine, and_
  15. from ..database import SessionLocal, engine
  16. from ..models import FluxCdInputData
  17. from ..models.parameters import Parameters
  18. from ..models.CropCd_output import CropCdOutputData
  19. from ..models.farmland import FarmlandData
  20. from ..utils.mapping_utils import MappingUtils
  21. from .admin_boundary_service import get_boundary_geojson_by_name, get_boundary_gdf_by_name
  22. import geopandas as gpd
  23. from shapely.geometry import shape, Point
  24. import tempfile
  25. import json
  26. class CdFluxRemovalService:
  27. """
  28. Cd通量移除计算服务类
  29. @description: 提供基于CropCd_output_data和Parameters表数据的籽粒移除和秸秆移除Cd通量计算功能
  30. """
  31. def __init__(self):
  32. """
  33. 初始化Cd通量移除服务
  34. """
  35. self.logger = logging.getLogger(__name__)
  36. # 严格匹配策略:不做名称变体或后缀映射
  37. def calculate_grain_removal_by_area(self, area: str, level: Optional[str] = None) -> Dict[str, Any]:
  38. """
  39. 根据地区计算籽粒移除Cd通量
  40. @param area: 地区名称(仅用于地图边界,参数固定使用"韶关")
  41. @returns: 计算结果字典
  42. 计算公式:籽粒移除(g/ha/a) = EXP(LnCropCd) * F11 * 0.5 * 15 / 1000
  43. """
  44. try:
  45. with SessionLocal() as db:
  46. # 参数固定使用"韶关",area参数仅用于地图边界
  47. parameter = db.query(Parameters).filter(Parameters.area == "韶关").first()
  48. if not parameter:
  49. return {
  50. "success": False,
  51. "message": f"未找到韶关地区的参数数据",
  52. "data": None
  53. }
  54. # 查询CropCd输出数据
  55. crop_cd_outputs = db.query(CropCdOutputData).all()
  56. if not crop_cd_outputs:
  57. return {
  58. "success": False,
  59. "message": f"未找到CropCd输出数据",
  60. "data": None
  61. }
  62. # 计算每个样点的籽粒移除Cd通量
  63. results = []
  64. for output in crop_cd_outputs:
  65. crop_cd_value = math.exp(output.ln_crop_cd) # EXP(LnCropCd)
  66. grain_removal = crop_cd_value * parameter.f11 * 0.5 * 15 / 1000
  67. results.append({
  68. "farmland_id": output.farmland_id,
  69. "sample_id": output.sample_id,
  70. "ln_crop_cd": output.ln_crop_cd,
  71. "crop_cd_value": crop_cd_value,
  72. "f11_yield": parameter.f11,
  73. "grain_removal_flux": grain_removal
  74. })
  75. # 计算统计信息
  76. flux_values = [r["grain_removal_flux"] for r in results]
  77. statistics = {
  78. "total_samples": len(results),
  79. "mean_flux": sum(flux_values) / len(flux_values),
  80. "max_flux": max(flux_values),
  81. "min_flux": min(flux_values)
  82. }
  83. return {
  84. "success": True,
  85. "message": f"地区 '{area}' 的籽粒移除Cd通量计算成功",
  86. "data": {
  87. "area": area,
  88. "calculation_type": "grain_removal",
  89. "formula": "EXP(LnCropCd) * F11 * 0.5 * 15 / 1000",
  90. "unit": "g/ha/a",
  91. "results": results,
  92. "statistics": statistics
  93. }
  94. }
  95. except Exception as e:
  96. self.logger.error(f"计算地区 '{area}' 的籽粒移除Cd通量失败: {str(e)}")
  97. return {
  98. "success": False,
  99. "message": f"计算失败: {str(e)}",
  100. "data": None
  101. }
  102. def calculate_straw_removal_by_area(self, area: str, level: Optional[str] = None) -> Dict[str, Any]:
  103. """
  104. 根据地区计算秸秆移除Cd通量
  105. @param area: 地区名称(仅用于地图边界,参数固定使用"韶关")
  106. @returns: 计算结果字典
  107. 计算公式:秸秆移除(g/ha/a) = [EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000
  108. """
  109. try:
  110. with SessionLocal() as db:
  111. # 参数固定使用"韶关",area参数仅用于地图边界
  112. parameter = db.query(Parameters).filter(Parameters.area == "韶关").first()
  113. if not parameter:
  114. return {
  115. "success": False,
  116. "message": f"未找到韶关地区的参数数据",
  117. "data": None
  118. }
  119. # 查询CropCd输出数据
  120. crop_cd_outputs = db.query(CropCdOutputData).all()
  121. if not crop_cd_outputs:
  122. return {
  123. "success": False,
  124. "message": f"未找到CropCd输出数据",
  125. "data": None
  126. }
  127. # 计算每个样点的秸秆移除Cd通量
  128. results = []
  129. for output in crop_cd_outputs:
  130. crop_cd_value = math.exp(output.ln_crop_cd) # EXP(LnCropCd)
  131. # 计算分母:EXP(LnCropCd)*0.76-0.0034
  132. denominator = crop_cd_value * 0.76 - 0.0034
  133. # 检查分母是否为零或负数,避免除零错误
  134. if denominator <= 0:
  135. self.logger.warning(f"样点 {output.farmland_id}-{output.sample_id} 的分母值为 {denominator},跳过计算")
  136. continue
  137. # 计算秸秆移除Cd通量
  138. straw_removal = (crop_cd_value / denominator) * parameter.f11 * 0.5 * 15 / 1000
  139. results.append({
  140. "farmland_id": output.farmland_id,
  141. "sample_id": output.sample_id,
  142. "ln_crop_cd": output.ln_crop_cd,
  143. "crop_cd_value": crop_cd_value,
  144. "denominator": denominator,
  145. "f11_yield": parameter.f11,
  146. "straw_removal_flux": straw_removal
  147. })
  148. if not results:
  149. return {
  150. "success": False,
  151. "message": "所有样点的计算都因分母值无效而失败",
  152. "data": None
  153. }
  154. # 计算统计信息
  155. flux_values = [r["straw_removal_flux"] for r in results]
  156. statistics = {
  157. "total_samples": len(results),
  158. "mean_flux": sum(flux_values) / len(flux_values),
  159. "max_flux": max(flux_values),
  160. "min_flux": min(flux_values)
  161. }
  162. return {
  163. "success": True,
  164. "message": f"地区 '{area}' 的秸秆移除Cd通量计算成功",
  165. "data": {
  166. "area": area,
  167. "calculation_type": "straw_removal",
  168. "formula": "[EXP(LnCropCd)/(EXP(LnCropCd)*0.76-0.0034)] * F11 * 0.5 * 15 / 1000",
  169. "unit": "g/ha/a",
  170. "results": results,
  171. "statistics": statistics
  172. }
  173. }
  174. except Exception as e:
  175. self.logger.error(f"计算地区 '{area}' 的秸秆移除Cd通量失败: {str(e)}")
  176. return {
  177. "success": False,
  178. "message": f"计算失败: {str(e)}",
  179. "data": None
  180. }
  181. def export_results_to_csv(self, results_data: Dict[str, Any], output_dir: str = "app/static/cd_flux") -> str:
  182. """
  183. 将计算结果导出为CSV文件
  184. @param results_data: 计算结果数据
  185. @param output_dir: 输出目录
  186. @returns: CSV文件路径
  187. """
  188. try:
  189. # 确保输出目录存在
  190. os.makedirs(output_dir, exist_ok=True)
  191. # 生成时间戳
  192. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  193. # 生成文件名
  194. calculation_type = results_data.get("calculation_type", "flux_removal")
  195. area = results_data.get("area", "unknown")
  196. filename = f"{calculation_type}_{area}_{timestamp}.csv"
  197. csv_path = os.path.join(output_dir, filename)
  198. # 转换为DataFrame
  199. results = results_data.get("results", [])
  200. if not results:
  201. raise ValueError("没有结果数据可导出")
  202. df = pd.DataFrame(results)
  203. # 保存CSV文件
  204. df.to_csv(csv_path, index=False, encoding='utf-8-sig')
  205. self.logger.info(f"✓ 成功导出结果到: {csv_path}")
  206. return csv_path
  207. except Exception as e:
  208. self.logger.error(f"导出CSV文件失败: {str(e)}")
  209. raise
  210. def get_coordinates_for_results(self, results_data: Dict[str, Any]) -> List[Dict[str, Any]]:
  211. """
  212. 获取结果数据对应的坐标信息
  213. @param results_data: 计算结果数据
  214. @returns: 包含坐标的结果列表
  215. """
  216. try:
  217. results = results_data.get("results", [])
  218. if not results:
  219. return []
  220. # 提取成对键,避免 N 次数据库查询
  221. farmland_sample_pairs = [(r["farmland_id"], r["sample_id"]) for r in results]
  222. with SessionLocal() as db:
  223. # 使用 farmland_id 分片查询,避免复合 IN 导致的兼容性与参数数量问题
  224. wanted_pairs = set(farmland_sample_pairs)
  225. unique_farmland_ids = sorted({fid for fid, _ in wanted_pairs})
  226. def chunk_list(items: List[int], chunk_size: int = 500) -> List[List[int]]:
  227. return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
  228. rows: List[FarmlandData] = []
  229. for id_chunk in chunk_list(unique_farmland_ids, 500):
  230. rows.extend(
  231. db.query(FarmlandData)
  232. .filter(FarmlandData.farmland_id.in_(id_chunk))
  233. .all()
  234. )
  235. pair_to_farmland = {
  236. (row.farmland_id, row.sample_id): row for row in rows
  237. }
  238. coordinates_results: List[Dict[str, Any]] = []
  239. for r in results:
  240. key = (r["farmland_id"], r["sample_id"])
  241. farmland = pair_to_farmland.get(key)
  242. if farmland is None:
  243. continue
  244. coord_result = {
  245. "farmland_id": r["farmland_id"],
  246. "sample_id": r["sample_id"],
  247. "longitude": farmland.lon,
  248. "latitude": farmland.lan,
  249. "flux_value": r.get("grain_removal_flux") or r.get("straw_removal_flux")
  250. }
  251. coord_result.update(r)
  252. coordinates_results.append(coord_result)
  253. self.logger.info(f"✓ 成功获取 {len(coordinates_results)} 个样点的坐标信息(分片批量查询)")
  254. return coordinates_results
  255. except Exception as e:
  256. self.logger.error(f"获取坐标信息失败: {str(e)}")
  257. raise
  258. def create_flux_visualization(self, area: str, calculation_type: str,
  259. results_with_coords: List[Dict[str, Any]],
  260. level: str = None,
  261. output_dir: str = "app/static/cd_flux",
  262. template_raster: str = "app/static/cd_flux/meanTemp.tif",
  263. boundary_shp: str = None,
  264. colormap: str = "green_yellow_red_purple",
  265. resolution_factor: float = 4.0,
  266. enable_interpolation: bool = False,
  267. cleanup_intermediate: bool = True) -> Dict[str, str]:
  268. """
  269. 创建Cd通量移除可视化图表
  270. @param area: 地区名称
  271. @param calculation_type: 计算类型(grain_removal 或 straw_removal)
  272. @param results_with_coords: 包含坐标的结果数据
  273. @param output_dir: 输出目录
  274. @param template_raster: 模板栅格文件路径
  275. @param boundary_shp: 边界shapefile路径
  276. @param colormap: 色彩方案
  277. @param resolution_factor: 分辨率因子
  278. @param enable_interpolation: 是否启用空间插值
  279. @returns: 生成的图片文件路径字典
  280. """
  281. try:
  282. if not results_with_coords:
  283. raise ValueError("没有包含坐标的结果数据")
  284. # 确保输出目录存在
  285. os.makedirs(output_dir, exist_ok=True)
  286. generated_files: Dict[str, str] = {}
  287. # 生成时间戳
  288. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  289. # 创建CSV文件用于绘图
  290. csv_filename = f"{calculation_type}_{area}_temp_{timestamp}.csv"
  291. csv_path = os.path.join(output_dir, csv_filename)
  292. # 准备绘图数据
  293. plot_data = []
  294. for result in results_with_coords:
  295. plot_data.append({
  296. "longitude": result["longitude"],
  297. "latitude": result["latitude"],
  298. "flux_value": result["flux_value"]
  299. })
  300. # 保存为CSV
  301. df = pd.DataFrame(plot_data)
  302. df.to_csv(csv_path, index=False, encoding='utf-8-sig')
  303. # 初始化绘图工具
  304. mapper = MappingUtils()
  305. # 生成输出文件路径
  306. map_output = os.path.join(output_dir, f"{calculation_type}_{area}_map_{timestamp}")
  307. histogram_output = os.path.join(output_dir, f"{calculation_type}_{area}_histogram_{timestamp}")
  308. # 检查模板文件是否存在
  309. if not os.path.exists(template_raster):
  310. self.logger.warning(f"模板栅格文件不存在: {template_raster}")
  311. template_raster = None
  312. # 动态获取边界数据(严格使用指定层级)
  313. if not level:
  314. raise ValueError("必须提供行政层级 level:county | city | province")
  315. # 优化:直接从数据库获取边界GeoDataFrame,避免创建临时shapefile文件
  316. # 这样可以减少磁盘I/O操作和临时文件管理的开销
  317. boundary_gdf = self._get_boundary_gdf_from_database(area, level)
  318. boundary_shp = None # 不再需要临时边界文件
  319. if boundary_gdf is None:
  320. self.logger.warning(f"未找到地区 '{area}' 的边界数据,将不使用边界裁剪")
  321. else:
  322. # 在绘图前进行样点边界包含性统计
  323. try:
  324. if boundary_gdf is not None and len(boundary_gdf) > 0:
  325. boundary_union = boundary_gdf.unary_union
  326. total_points = len(results_with_coords)
  327. inside_count = 0
  328. outside_points: List[Dict[str, Any]] = []
  329. for r in results_with_coords:
  330. pt = Point(float(r["longitude"]), float(r["latitude"]))
  331. if boundary_union.contains(pt) or boundary_union.touches(pt):
  332. inside_count += 1
  333. else:
  334. outside_points.append({
  335. "farmland_id": r.get("farmland_id"),
  336. "sample_id": r.get("sample_id"),
  337. "longitude": r.get("longitude"),
  338. "latitude": r.get("latitude"),
  339. "flux_value": r.get("flux_value")
  340. })
  341. outside_count = total_points - inside_count
  342. inside_pct = (inside_count / total_points * 100.0) if total_points > 0 else 0.0
  343. self.logger.info(
  344. f"样点边界检查 - 总数: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), 边界外: {outside_count}")
  345. if outside_count > 0:
  346. sample_preview = outside_points[:10]
  347. self.logger.warning(
  348. f"存在 {outside_count} 个样点位于边界之外,绘图时将被掩膜隐藏。示例(最多10条): {sample_preview}")
  349. # 在日志中打印边界检查统计结果
  350. self.logger.info(
  351. f"边界检查统计 - 地区: {area}, 层级: {level}, 计算类型: {calculation_type}, "
  352. f"总样点: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), "
  353. f"边界外: {outside_count}"
  354. )
  355. if outside_count > 0 and len(outside_points) > 0:
  356. sample_preview = outside_points[:5] # 只显示前5个样本
  357. self.logger.info(f"边界外样点示例(前5个): {sample_preview}")
  358. else:
  359. generated_files = {}
  360. except Exception as check_err:
  361. self.logger.warning(f"样点边界包含性检查失败: {str(check_err)}")
  362. # 保持已有 generated_files,不覆盖
  363. # 创建shapefile
  364. shapefile_path = csv_path.replace('.csv', '_points.shp')
  365. mapper.csv_to_shapefile(csv_path, shapefile_path,
  366. lon_col='longitude', lat_col='latitude', value_col='flux_value')
  367. # 合并已生成文件映射
  368. generated_files.update({"csv": csv_path, "shapefile": shapefile_path})
  369. # 如果有模板栅格文件,创建栅格地图
  370. if template_raster:
  371. try:
  372. # 创建栅格
  373. raster_path = csv_path.replace('.csv', '_raster.tif')
  374. raster_path, stats = mapper.vector_to_raster(
  375. shapefile_path, template_raster, raster_path, 'flux_value',
  376. resolution_factor=resolution_factor, boundary_shp=boundary_shp, boundary_gdf=boundary_gdf,
  377. interpolation_method='nearest', enable_interpolation=enable_interpolation
  378. )
  379. generated_files["raster"] = raster_path
  380. # 创建栅格地图 - 使用英文标题避免中文乱码
  381. title_mapping = {
  382. "grain_removal": "Grain Removal Cd Flux",
  383. "straw_removal": "Straw Removal Cd Flux"
  384. }
  385. map_title = title_mapping.get(calculation_type, "Cd Flux Removal")
  386. map_file = mapper.create_raster_map(
  387. boundary_shp if boundary_shp else None,
  388. raster_path,
  389. map_output,
  390. colormap=colormap,
  391. title=map_title,
  392. output_size=12,
  393. dpi=300,
  394. resolution_factor=4.0,
  395. enable_interpolation=False,
  396. interpolation_method='nearest',
  397. boundary_gdf=boundary_gdf
  398. )
  399. generated_files["map"] = map_file
  400. # 创建直方图 - 使用英文标题避免中文乱码
  401. histogram_title_mapping = {
  402. "grain_removal": "Grain Removal Cd Flux Distribution",
  403. "straw_removal": "Straw Removal Cd Flux Distribution"
  404. }
  405. histogram_title = histogram_title_mapping.get(calculation_type, "Cd Flux Distribution")
  406. histogram_file = mapper.create_histogram(
  407. raster_path,
  408. f"{histogram_output}.jpg",
  409. title=histogram_title,
  410. xlabel='Cd Flux (g/ha/a)',
  411. ylabel='Frequency Density'
  412. )
  413. generated_files["histogram"] = histogram_file
  414. except Exception as viz_error:
  415. self.logger.warning(f"栅格可视化创建失败: {str(viz_error)}")
  416. # 即使栅格可视化失败,也返回已生成的文件
  417. # 清理中间文件(默认开启,仅保留最终可视化)
  418. if cleanup_intermediate:
  419. try:
  420. # 由于不再创建临时边界文件,所以传递None
  421. self._cleanup_intermediate_files(generated_files, None)
  422. except Exception as cleanup_err:
  423. self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
  424. self.logger.info(f"✓ 成功创建 {calculation_type} 可视化,生成文件: {list(generated_files.keys())}")
  425. return generated_files
  426. except Exception as e:
  427. self.logger.error(f"创建可视化失败: {str(e)}")
  428. raise
  429. def _cleanup_intermediate_files(self, generated_files: Dict[str, str], boundary_shp: Optional[str]) -> None:
  430. """
  431. 清理中间文件:CSV、Shapefile 及其配套文件、栅格TIFF;若边界为临时目录,则一并删除
  432. """
  433. import shutil
  434. import tempfile
  435. def _safe_remove(path: str) -> None:
  436. try:
  437. if path and os.path.exists(path) and os.path.isfile(path):
  438. os.remove(path)
  439. except Exception:
  440. pass
  441. # 删除 CSV
  442. _safe_remove(generated_files.get("csv"))
  443. # 删除栅格
  444. _safe_remove(generated_files.get("raster"))
  445. # 删除 Shapefile 全家桶
  446. shp_path = generated_files.get("shapefile")
  447. if shp_path:
  448. base, _ = os.path.splitext(shp_path)
  449. for ext in (".shp", ".shx", ".dbf", ".prj", ".cpg"):
  450. _safe_remove(base + ext)
  451. # 如果边界文件来自系统临时目录,删除其所在目录
  452. if boundary_shp:
  453. temp_root = tempfile.gettempdir()
  454. try:
  455. if os.path.commonprefix([os.path.abspath(boundary_shp), temp_root]) == temp_root:
  456. temp_dir = os.path.dirname(os.path.abspath(boundary_shp))
  457. if os.path.isdir(temp_dir):
  458. shutil.rmtree(temp_dir, ignore_errors=True)
  459. except Exception:
  460. pass
  461. def _get_boundary_file_for_area(self, area: str, level: str) -> Optional[str]:
  462. """
  463. 为指定地区获取边界文件
  464. @param area: 地区名称
  465. @returns: 边界文件路径或None
  466. """
  467. try:
  468. # 仅从数据库严格获取边界(按指定层级精确匹配)
  469. boundary_path = self._create_boundary_from_database(area, level)
  470. if boundary_path:
  471. return boundary_path
  472. # 如果都没有找到,记录警告但不使用默认文件
  473. self.logger.warning(f"未找到地区 '{area}' 的专用边界文件,也无法从数据库获取")
  474. return None
  475. except Exception as e:
  476. self.logger.error(f"获取边界文件失败: {str(e)}")
  477. return None
  478. def _get_boundary_gdf_from_database(self, area: str, level: str) -> Optional[gpd.GeoDataFrame]:
  479. """
  480. 直接从数据库获取边界数据作为GeoDataFrame
  481. @param area: 地区名称
  482. @param level: 行政层级
  483. @returns: 边界GeoDataFrame或None
  484. """
  485. try:
  486. with SessionLocal() as db:
  487. norm_area = area.strip()
  488. boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
  489. if boundary_gdf is not None:
  490. self.logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
  491. return boundary_gdf
  492. except Exception as e:
  493. self.logger.warning(f"从数据库获取边界数据失败: {str(e)}")
  494. return None
  495. def _create_boundary_from_database(self, area: str, level: str) -> Optional[str]:
  496. """
  497. 从数据库获取边界数据并创建临时shapefile
  498. @param area: 地区名称
  499. @returns: 临时边界文件路径或None
  500. @deprecated: 建议使用 _get_boundary_gdf_from_database 方法直接获取 GeoDataFrame
  501. """
  502. try:
  503. with SessionLocal() as db:
  504. norm_area = area.strip()
  505. boundary_geojson = get_boundary_geojson_by_name(db, norm_area, level=level)
  506. if boundary_geojson:
  507. geometry_obj = shape(boundary_geojson["geometry"])
  508. gdf = gpd.GeoDataFrame([boundary_geojson["properties"]], geometry=[geometry_obj], crs="EPSG:4326")
  509. temp_dir = tempfile.mkdtemp()
  510. boundary_path = os.path.join(temp_dir, f"{norm_area}_boundary.shp")
  511. gdf.to_file(boundary_path, driver="ESRI Shapefile")
  512. self.logger.info(f"从数据库创建边界文件: {boundary_path}")
  513. return boundary_path
  514. except Exception as e:
  515. self.logger.warning(f"从数据库创建边界文件失败: {str(e)}")
  516. return None
  517. def create_surface_runoff_visualization(self, area: str, level: str,
  518. output_dir: str = "app/static/surface_runoff",
  519. template_raster: str = "app/static/cd_flux/meanTemp.tif",
  520. boundary_shp: str = None,
  521. colormap: str = "green_yellow_red_purple",
  522. resolution_factor: float = 4.0,
  523. enable_interpolation: bool = False,
  524. cleanup_intermediate: bool = True) -> Dict[str, str]:
  525. """
  526. 创建地表径流Cd通量可视化图表
  527. @param area: 地区名称
  528. @param level: 行政层级
  529. @param output_dir: 输出目录
  530. @param template_raster: 模板栅格文件路径
  531. @param boundary_shp: 边界shapefile路径
  532. @param colormap: 色彩方案
  533. @param resolution_factor: 分辨率因子
  534. @param enable_interpolation: 是否启用空间插值
  535. @param cleanup_intermediate: 是否清理中间文件
  536. @returns: 生成的图片文件路径字典
  537. """
  538. try:
  539. # 从数据库获取地表径流数据
  540. results_with_coords = []
  541. with SessionLocal() as db:
  542. # 查询地表径流数据(DB_Cd)及对应坐标
  543. runoff_data = db.query(
  544. FarmlandData.farmland_id,
  545. FarmlandData.sample_id,
  546. FarmlandData.lon,
  547. FarmlandData.lan,
  548. FluxCdInputData.surface_runoff
  549. ).join(
  550. FluxCdInputData,
  551. (FluxCdInputData.farmland_id == FarmlandData.farmland_id) &
  552. (FluxCdInputData.sample_id == FarmlandData.sample_id)
  553. ).all()
  554. if not runoff_data:
  555. self.logger.warning("未找到地表径流数据")
  556. return {}
  557. for data in runoff_data:
  558. results_with_coords.append({
  559. "farmland_id": data.farmland_id,
  560. "sample_id": data.sample_id,
  561. "longitude": data.lon,
  562. "latitude": data.lan,
  563. "flux_value": data.surface_runoff
  564. })
  565. # 确保输出目录存在
  566. os.makedirs(output_dir, exist_ok=True)
  567. generated_files: Dict[str, str] = {}
  568. # 生成时间戳
  569. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  570. # 创建CSV文件用于绘图
  571. csv_filename = f"surface_runoff_{area}_temp_{timestamp}.csv"
  572. csv_path = os.path.join(output_dir, csv_filename)
  573. # 准备绘图数据
  574. plot_data = []
  575. for result in results_with_coords:
  576. plot_data.append({
  577. "longitude": result["longitude"],
  578. "latitude": result["latitude"],
  579. "flux_value": result["flux_value"]
  580. })
  581. # 保存为CSV
  582. df = pd.DataFrame(plot_data)
  583. df.to_csv(csv_path, index=False, encoding='utf-8-sig')
  584. # 初始化绘图工具
  585. mapper = MappingUtils()
  586. # 生成输出文件路径
  587. map_output = os.path.join(output_dir, f"surface_runoff_{area}_map_{timestamp}")
  588. histogram_output = os.path.join(output_dir, f"surface_runoff_{area}_histogram_{timestamp}")
  589. # 检查模板文件是否存在
  590. if not os.path.exists(template_raster):
  591. self.logger.warning(f"模板栅格文件不存在: {template_raster}")
  592. template_raster = None
  593. # 动态获取边界数据(严格使用指定层级)
  594. if not level:
  595. raise ValueError("必须提供行政层级 level:county | city | province")
  596. # 直接从数据库获取边界GeoDataFrame
  597. boundary_gdf = self._get_boundary_gdf_from_database(area, level)
  598. boundary_shp = None # 不再需要临时边界文件
  599. if boundary_gdf is None:
  600. self.logger.warning(f"未找到地区 '{area}' 的边界数据,将不使用边界裁剪")
  601. else:
  602. # 在绘图前进行样点边界包含性统计
  603. try:
  604. if boundary_gdf is not None and len(boundary_gdf) > 0:
  605. boundary_union = boundary_gdf.unary_union
  606. total_points = len(results_with_coords)
  607. inside_count = 0
  608. outside_points: List[Dict[str, Any]] = []
  609. for r in results_with_coords:
  610. pt = Point(float(r["longitude"]), float(r["latitude"]))
  611. if boundary_union.contains(pt) or boundary_union.touches(pt):
  612. inside_count += 1
  613. else:
  614. outside_points.append({
  615. "farmland_id": r.get("farmland_id"),
  616. "sample_id": r.get("sample_id"),
  617. "longitude": r.get("longitude"),
  618. "latitude": r.get("latitude"),
  619. "flux_value": r.get("flux_value")
  620. })
  621. outside_count = total_points - inside_count
  622. inside_pct = (inside_count / total_points * 100.0) if total_points > 0 else 0.0
  623. self.logger.info(
  624. f"样点边界检查 - 总数: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), 边界外: {outside_count}")
  625. if outside_count > 0:
  626. sample_preview = outside_points[:10]
  627. self.logger.warning(
  628. f"存在 {outside_count} 个样点位于边界之外,绘图时将被掩膜隐藏。示例(最多10条): {sample_preview}")
  629. except Exception as check_err:
  630. self.logger.warning(f"样点边界包含性检查失败: {str(check_err)}")
  631. # 创建shapefile
  632. shapefile_path = csv_path.replace('.csv', '_points.shp')
  633. mapper.csv_to_shapefile(csv_path, shapefile_path,
  634. lon_col='longitude', lat_col='latitude', value_col='flux_value')
  635. # 合并已生成文件映射
  636. generated_files.update({"csv": csv_path, "shapefile": shapefile_path})
  637. # 如果有模板栅格文件,创建栅格地图
  638. if template_raster:
  639. try:
  640. # 创建栅格
  641. raster_path = csv_path.replace('.csv', '_raster.tif')
  642. raster_path, stats = mapper.vector_to_raster(
  643. shapefile_path, template_raster, raster_path, 'flux_value',
  644. resolution_factor=resolution_factor, boundary_shp=boundary_shp, boundary_gdf=boundary_gdf,
  645. interpolation_method='nearest', enable_interpolation=enable_interpolation
  646. )
  647. generated_files["raster"] = raster_path
  648. # 创建栅格地图 - 使用英文标题避免中文乱码
  649. map_title = "Surface Runoff Cd Flux"
  650. map_file = mapper.create_raster_map(
  651. boundary_shp if boundary_shp else None,
  652. raster_path,
  653. map_output,
  654. colormap=colormap,
  655. title=map_title,
  656. output_size=12,
  657. dpi=300,
  658. resolution_factor=4.0,
  659. enable_interpolation=False,
  660. interpolation_method='nearest',
  661. boundary_gdf=boundary_gdf
  662. )
  663. generated_files["map"] = map_file
  664. # 创建直方图 - 使用英文标题避免中文乱码
  665. histogram_title = "Surface Runoff Cd Flux Distribution"
  666. histogram_file = mapper.create_histogram(
  667. raster_path,
  668. f"{histogram_output}.jpg",
  669. title=histogram_title,
  670. xlabel='Cd Flux (g/ha/a)',
  671. ylabel='Frequency Density'
  672. )
  673. generated_files["histogram"] = histogram_file
  674. except Exception as viz_error:
  675. self.logger.warning(f"栅格可视化创建失败: {str(viz_error)}")
  676. # 即使栅格可视化失败,也返回已生成的文件
  677. # 清理中间文件(默认开启,仅保留最终可视化)
  678. if cleanup_intermediate:
  679. try:
  680. self._cleanup_intermediate_files(generated_files, None)
  681. except Exception as cleanup_err:
  682. self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
  683. self.logger.info(f"✓ 成功创建地表径流Cd通量可视化,生成文件: {list(generated_files.keys())}")
  684. return generated_files
  685. except Exception as e:
  686. self.logger.error(f"创建地表径流可视化失败: {str(e)}")
  687. raise
  688. def get_groundwater_leaching_data(self, area: str) -> List[Dict[str, Any]]:
  689. """
  690. 获取地下渗流(地下水渗漏)通量数据及坐标
  691. @param area: 地区名称
  692. @returns: 包含坐标和通量值的结果列表
  693. """
  694. try:
  695. with SessionLocal() as db:
  696. # 查询地下渗流数据及对应坐标
  697. leaching_data = db.query(
  698. FluxCdInputData.groundwater_leaching,
  699. FarmlandData.farmland_id,
  700. FarmlandData.sample_id,
  701. FarmlandData.lon,
  702. FarmlandData.lan
  703. ).join(
  704. FarmlandData,
  705. and_(
  706. FluxCdInputData.farmland_id == FarmlandData.farmland_id,
  707. FluxCdInputData.sample_id == FarmlandData.sample_id
  708. )
  709. ).all()
  710. if not leaching_data:
  711. self.logger.warning(f"未找到地区 '{area}' 的地下渗流数据")
  712. return []
  713. results = []
  714. for data in leaching_data:
  715. results.append({
  716. "farmland_id": data.farmland_id,
  717. "sample_id": data.sample_id,
  718. "longitude": data.lon,
  719. "latitude": data.lan,
  720. "flux_value": data.groundwater_leaching
  721. })
  722. self.logger.info(f"成功获取 {len(results)} 个样点的地下渗流数据")
  723. return results
  724. except Exception as e:
  725. self.logger.error(f"获取地下渗流数据失败: {str(e)}")
  726. raise
  727. def create_groundwater_leaching_visualization(self, area: str, level: str,
  728. output_dir: str = "app/static/groundwater_leaching",
  729. template_raster: str = "app/static/cd_flux/meanTemp.tif",
  730. colormap: str = "blues",
  731. resolution_factor: float = 4.0,
  732. enable_interpolation: bool = False,
  733. cleanup_intermediate: bool = True) -> Dict[str, str]:
  734. """
  735. 创建地下渗流(地下水渗漏)通量可视化图表
  736. @param area: 地区名称
  737. @param level: 行政层级
  738. @param output_dir: 输出目录
  739. @param template_raster: 模板栅格文件路径
  740. @param colormap: 色彩方案
  741. @param resolution_factor: 分辨率因子
  742. @param enable_interpolation: 是否启用空间插值
  743. @param cleanup_intermediate: 是否清理中间文件
  744. @returns: 生成的图片文件路径字典
  745. """
  746. try:
  747. # 获取地下渗流数据
  748. leaching_results = self.get_groundwater_leaching_data(area)
  749. if not leaching_results:
  750. return {
  751. "success": False,
  752. "message": f"没有找到地区 '{area}' 的地下渗流数据",
  753. "data": None
  754. }
  755. # 确保输出目录存在
  756. os.makedirs(output_dir, exist_ok=True)
  757. generated_files: Dict[str, str] = {}
  758. # 生成时间戳
  759. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  760. # 创建CSV文件用于绘图
  761. csv_filename = f"groundwater_leaching_{area}_temp_{timestamp}.csv"
  762. csv_path = os.path.join(output_dir, csv_filename)
  763. # 准备绘图数据
  764. plot_data = []
  765. for result in leaching_results:
  766. plot_data.append({
  767. "longitude": result["longitude"],
  768. "latitude": result["latitude"],
  769. "flux_value": result["flux_value"]
  770. })
  771. # 保存为CSV
  772. df = pd.DataFrame(plot_data)
  773. df.to_csv(csv_path, index=False, encoding='utf-8-sig')
  774. # 初始化绘图工具
  775. mapper = MappingUtils()
  776. # 生成输出文件路径
  777. map_output = os.path.join(output_dir, f"groundwater_leaching_{area}_map_{timestamp}")
  778. histogram_output = os.path.join(output_dir, f"groundwater_leaching_{area}_histogram_{timestamp}")
  779. # 检查模板文件是否存在
  780. if not os.path.exists(template_raster):
  781. self.logger.warning(f"模板栅格文件不存在: {template_raster}")
  782. template_raster = None
  783. # 直接从数据库获取边界GeoDataFrame
  784. boundary_gdf = self._get_boundary_gdf_from_database(area, level)
  785. boundary_shp = None # 不再需要临时边界文件
  786. # 在绘图前进行样点边界包含性统计
  787. if boundary_gdf is not None and len(boundary_gdf) > 0:
  788. boundary_union = boundary_gdf.unary_union
  789. total_points = len(leaching_results)
  790. inside_count = 0
  791. for r in leaching_results:
  792. pt = Point(float(r["longitude"]), float(r["latitude"]))
  793. if boundary_union.contains(pt) or boundary_union.touches(pt):
  794. inside_count += 1
  795. outside_count = total_points - inside_count
  796. inside_pct = (inside_count / total_points * 100.0) if total_points > 0 else 0.0
  797. self.logger.info(
  798. f"样点边界检查 - 总数: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), 边界外: {outside_count}")
  799. # 创建shapefile
  800. shapefile_path = csv_path.replace('.csv', '_points.shp')
  801. mapper.csv_to_shapefile(csv_path, shapefile_path,
  802. lon_col='longitude', lat_col='latitude', value_col='flux_value')
  803. # 合并已生成文件映射
  804. generated_files.update({"csv": csv_path, "shapefile": shapefile_path})
  805. # 如果有模板栅格文件,创建栅格地图
  806. if template_raster:
  807. try:
  808. # 创建栅格
  809. raster_path = csv_path.replace('.csv', '_raster.tif')
  810. raster_path, stats = mapper.vector_to_raster(
  811. shapefile_path, template_raster, raster_path, 'flux_value',
  812. resolution_factor=resolution_factor, boundary_gdf=boundary_gdf,
  813. interpolation_method='nearest', enable_interpolation=enable_interpolation
  814. )
  815. generated_files["raster"] = raster_path
  816. # 创建栅格地图 - 使用特定标题
  817. map_title = "Groundwater Leaching Cd Flux"
  818. map_file = mapper.create_raster_map(
  819. boundary_shp,
  820. raster_path,
  821. map_output,
  822. colormap=colormap,
  823. title=map_title,
  824. output_size=12,
  825. dpi=300,
  826. resolution_factor=4.0,
  827. enable_interpolation=False,
  828. interpolation_method='nearest',
  829. boundary_gdf=boundary_gdf
  830. )
  831. generated_files["map"] = map_file
  832. # 创建直方图 - 使用特定标题
  833. histogram_title = "Groundwater Leaching Cd Flux Distribution"
  834. histogram_file = mapper.create_histogram(
  835. raster_path,
  836. f"{histogram_output}.jpg",
  837. title=histogram_title,
  838. xlabel='Cd Flux (g/ha/a)',
  839. ylabel='Frequency Density'
  840. )
  841. generated_files["histogram"] = histogram_file
  842. except Exception as viz_error:
  843. self.logger.warning(f"栅格可视化创建失败: {str(viz_error)}")
  844. # 清理中间文件
  845. if cleanup_intermediate:
  846. try:
  847. self._cleanup_intermediate_files(generated_files, None)
  848. except Exception as cleanup_err:
  849. self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
  850. self.logger.info(f"✓ 成功创建地下渗流通量可视化,生成文件: {list(generated_files.keys())}")
  851. return generated_files
  852. except Exception as e:
  853. self.logger.error(f"创建地下渗流可视化失败: {str(e)}")
  854. return {
  855. "success": False,
  856. "message": f"可视化创建失败: {str(e)}",
  857. "data": None
  858. }