agricultural_input_service.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765
  1. """
  2. 农业投入Cd通量计算服务
  3. @description: 根据Parameters表中的数据计算各个地区的农业投入输入Cd通量预测
  4. @author: AcidMap Team
  5. @version: 1.0.0
  6. """
  7. import logging
  8. import math
  9. import os
  10. import pandas as pd
  11. import numpy as np
  12. from datetime import datetime
  13. from typing import Dict, Any, List, Optional
  14. from sqlalchemy.orm import sessionmaker, Session
  15. from sqlalchemy import create_engine, and_
  16. from ..database import SessionLocal, engine
  17. from ..models.parameters import Parameters
  18. from ..models.farmland import FarmlandData
  19. from ..utils.mapping_utils import MappingUtils
  20. from .admin_boundary_service import get_boundary_geojson_by_name, get_boundary_gdf_by_name
  21. import geopandas as gpd
  22. from shapely.geometry import shape, Point
  23. import tempfile
  24. import json
  25. class AgriculturalInputService:
  26. """
  27. 农业投入Cd通量计算服务类
  28. @description: 提供基于Parameters表数据的农业投入输入Cd通量预测计算功能
  29. """
  30. def __init__(self):
  31. """
  32. 初始化农业投入服务
  33. """
  34. self.logger = logging.getLogger(__name__)
  35. def calculate_cd_flux_by_area(self, area: str) -> Dict[str, Any]:
  36. """
  37. 根据地区计算农业投入输入Cd通量
  38. @param area: 地区名称
  39. @returns: 计算结果
  40. """
  41. try:
  42. with SessionLocal() as db:
  43. # 查询指定地区的参数
  44. parameter = db.query(Parameters).filter(Parameters.area == area).first()
  45. if not parameter:
  46. return {
  47. "success": False,
  48. "message": f"未找到地区 '{area}' 的参数数据",
  49. "data": None
  50. }
  51. # 计算农业投入输入Cd通量
  52. # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
  53. cd_flux = (
  54. parameter.f3 * parameter.nf + # 氮肥
  55. parameter.f4 * parameter.pf + # 磷肥
  56. parameter.f5 * parameter.kf + # 钾肥
  57. parameter.f6 * parameter.cf + # 复合肥
  58. parameter.f7 * parameter.of + # 有机肥
  59. parameter.f8 * parameter.p + # 农药
  60. parameter.f9 * parameter.ff + # 农家肥
  61. parameter.f10 * parameter.af # 农膜
  62. )
  63. # 计算各项明细
  64. details = {
  65. "nitrogen_fertilizer": parameter.f3 * parameter.nf, # 氮肥贡献
  66. "phosphorus_fertilizer": parameter.f4 * parameter.pf, # 磷肥贡献
  67. "potassium_fertilizer": parameter.f5 * parameter.kf, # 钾肥贡献
  68. "compound_fertilizer": parameter.f6 * parameter.cf, # 复合肥贡献
  69. "organic_fertilizer": parameter.f7 * parameter.of, # 有机肥贡献
  70. "pesticide": parameter.f8 * parameter.p, # 农药贡献
  71. "farmyard_manure": parameter.f9 * parameter.ff, # 农家肥贡献
  72. "agricultural_film": parameter.f10 * parameter.af # 农膜贡献
  73. }
  74. return {
  75. "success": True,
  76. "message": f"成功计算地区 '{area}' 的农业投入输入Cd通量",
  77. "data": {
  78. "area": area,
  79. "total_cd_flux": round(cd_flux, 6),
  80. "unit": "g/ha/a",
  81. "details": {key: round(value, 6) for key, value in details.items()},
  82. "parameters_used": {
  83. "f3_nitrogen_cd_content": parameter.f3,
  84. "f4_phosphorus_cd_content": parameter.f4,
  85. "f5_potassium_cd_content": parameter.f5,
  86. "f6_compound_cd_content": parameter.f6,
  87. "f7_organic_cd_content": parameter.f7,
  88. "f8_pesticide_cd_content": parameter.f8,
  89. "f9_farmyard_cd_content": parameter.f9,
  90. "f10_film_cd_content": parameter.f10,
  91. "nf_nitrogen_usage": parameter.nf,
  92. "pf_phosphorus_usage": parameter.pf,
  93. "kf_potassium_usage": parameter.kf,
  94. "cf_compound_usage": parameter.cf,
  95. "of_organic_usage": parameter.of,
  96. "p_pesticide_usage": parameter.p,
  97. "ff_farmyard_usage": parameter.ff,
  98. "af_film_usage": parameter.af
  99. }
  100. }
  101. }
  102. except Exception as e:
  103. self.logger.error(f"计算地区 '{area}' 的Cd通量时发生错误: {str(e)}")
  104. return {
  105. "success": False,
  106. "message": f"计算失败: {str(e)}",
  107. "data": None
  108. }
  109. def calculate_all_areas_cd_flux(self) -> Dict[str, Any]:
  110. """
  111. 计算所有地区的农业投入输入Cd通量
  112. @returns: 所有地区的计算结果
  113. """
  114. try:
  115. with SessionLocal() as db:
  116. # 查询所有参数记录
  117. parameters = db.query(Parameters).all()
  118. if not parameters:
  119. return {
  120. "success": False,
  121. "message": "数据库中未找到任何参数数据",
  122. "data": None
  123. }
  124. results = []
  125. for param in parameters:
  126. # 计算每个地区的Cd通量
  127. cd_flux = (
  128. param.f3 * param.nf +
  129. param.f4 * param.pf +
  130. param.f5 * param.kf +
  131. param.f6 * param.cf +
  132. param.f7 * param.of +
  133. param.f8 * param.p +
  134. param.f9 * param.ff +
  135. param.f10 * param.af
  136. )
  137. details = {
  138. "nitrogen_fertilizer": param.f3 * param.nf,
  139. "phosphorus_fertilizer": param.f4 * param.pf,
  140. "potassium_fertilizer": param.f5 * param.kf,
  141. "compound_fertilizer": param.f6 * param.cf,
  142. "organic_fertilizer": param.f7 * param.of,
  143. "pesticide": param.f8 * param.p,
  144. "farmyard_manure": param.f9 * param.ff,
  145. "agricultural_film": param.f10 * param.af
  146. }
  147. result_item = {
  148. "area": param.area,
  149. "total_cd_flux": round(cd_flux, 6),
  150. "details": {key: round(value, 6) for key, value in details.items()}
  151. }
  152. results.append(result_item)
  153. # 按总通量排序
  154. results.sort(key=lambda x: x["total_cd_flux"], reverse=True)
  155. return {
  156. "success": True,
  157. "message": f"成功计算所有 {len(results)} 个地区的农业投入输入Cd通量",
  158. "data": {
  159. "total_areas": len(results),
  160. "unit": "g/ha/a",
  161. "results": results,
  162. "summary": {
  163. "max_cd_flux": max(results, key=lambda x: x["total_cd_flux"]) if results else None,
  164. "min_cd_flux": min(results, key=lambda x: x["total_cd_flux"]) if results else None,
  165. "average_cd_flux": round(sum(r["total_cd_flux"] for r in results) / len(results), 6) if results else 0
  166. }
  167. }
  168. }
  169. except Exception as e:
  170. self.logger.error(f"计算所有地区Cd通量时发生错误: {str(e)}")
  171. return {
  172. "success": False,
  173. "message": f"计算失败: {str(e)}",
  174. "data": None
  175. }
  176. def get_available_areas(self) -> Dict[str, Any]:
  177. """
  178. 获取数据库中所有可用的地区列表
  179. @returns: 可用地区列表
  180. """
  181. try:
  182. with SessionLocal() as db:
  183. # 查询所有不重复的地区
  184. areas = db.query(Parameters.area).distinct().all()
  185. area_list = [area[0] for area in areas if area[0]]
  186. return {
  187. "success": True,
  188. "message": f"成功获取 {len(area_list)} 个可用地区",
  189. "data": {
  190. "areas": sorted(area_list),
  191. "total_count": len(area_list)
  192. }
  193. }
  194. except Exception as e:
  195. self.logger.error(f"获取可用地区列表时发生错误: {str(e)}")
  196. return {
  197. "success": False,
  198. "message": f"获取失败: {str(e)}",
  199. "data": None
  200. }
  201. def calculate_cd_flux_with_custom_data(self, request) -> Dict[str, Any]:
  202. """
  203. 使用用户提供的自定义数据计算农业投入输入Cd通量
  204. @param request: 包含计算参数的请求对象
  205. @returns: 计算结果
  206. """
  207. try:
  208. # 进行参数验证
  209. if not self._validate_calculation_parameters(request):
  210. return {
  211. "success": False,
  212. "message": "提供的参数数据不完整或无效",
  213. "data": None
  214. }
  215. # 计算农业投入输入Cd通量
  216. # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
  217. cd_flux = (
  218. request.f3_nitrogen_cd_content * request.nf_nitrogen_usage + # 氮肥
  219. request.f4_phosphorus_cd_content * request.pf_phosphorus_usage + # 磷肥
  220. request.f5_potassium_cd_content * request.kf_potassium_usage + # 钾肥
  221. request.f6_compound_cd_content * request.cf_compound_usage + # 复合肥
  222. request.f7_organic_cd_content * request.of_organic_usage + # 有机肥
  223. request.f8_pesticide_cd_content * request.p_pesticide_usage + # 农药
  224. request.f9_farmyard_cd_content * request.ff_farmyard_usage + # 农家肥
  225. request.f10_film_cd_content * request.af_film_usage # 农膜
  226. )
  227. # 计算各项明细贡献
  228. details = {
  229. "nitrogen_fertilizer": request.f3_nitrogen_cd_content * request.nf_nitrogen_usage,
  230. "phosphorus_fertilizer": request.f4_phosphorus_cd_content * request.pf_phosphorus_usage,
  231. "potassium_fertilizer": request.f5_potassium_cd_content * request.kf_potassium_usage,
  232. "compound_fertilizer": request.f6_compound_cd_content * request.cf_compound_usage,
  233. "organic_fertilizer": request.f7_organic_cd_content * request.of_organic_usage,
  234. "pesticide": request.f8_pesticide_cd_content * request.p_pesticide_usage,
  235. "farmyard_manure": request.f9_farmyard_cd_content * request.ff_farmyard_usage,
  236. "agricultural_film": request.f10_film_cd_content * request.af_film_usage
  237. }
  238. # 构建返回结果
  239. result_data = {
  240. "total_cd_flux": round(cd_flux, 6),
  241. "unit": "g/ha/a",
  242. "details": {key: round(value, 6) for key, value in details.items()},
  243. "input_parameters": {
  244. "cd_contents": {
  245. "f3_nitrogen_cd_content": request.f3_nitrogen_cd_content,
  246. "f4_phosphorus_cd_content": request.f4_phosphorus_cd_content,
  247. "f5_potassium_cd_content": request.f5_potassium_cd_content,
  248. "f6_compound_cd_content": request.f6_compound_cd_content,
  249. "f7_organic_cd_content": request.f7_organic_cd_content,
  250. "f8_pesticide_cd_content": request.f8_pesticide_cd_content,
  251. "f9_farmyard_cd_content": request.f9_farmyard_cd_content,
  252. "f10_film_cd_content": request.f10_film_cd_content
  253. },
  254. "usage_amounts": {
  255. "nf_nitrogen_usage": request.nf_nitrogen_usage,
  256. "pf_phosphorus_usage": request.pf_phosphorus_usage,
  257. "kf_potassium_usage": request.kf_potassium_usage,
  258. "cf_compound_usage": request.cf_compound_usage,
  259. "of_organic_usage": request.of_organic_usage,
  260. "p_pesticide_usage": request.p_pesticide_usage,
  261. "ff_farmyard_usage": request.ff_farmyard_usage,
  262. "af_film_usage": request.af_film_usage
  263. }
  264. }
  265. }
  266. # 如果有描述信息,添加到结果中
  267. if request.description:
  268. result_data["description"] = request.description
  269. return {
  270. "success": True,
  271. "message": "成功计算农业投入输入Cd通量",
  272. "data": result_data
  273. }
  274. except Exception as e:
  275. self.logger.error(f"使用自定义数据计算Cd通量时发生错误: {str(e)}")
  276. return {
  277. "success": False,
  278. "message": f"计算失败: {str(e)}",
  279. "data": None
  280. }
  281. def _validate_calculation_parameters(self, request) -> bool:
  282. """
  283. 验证计算参数的有效性
  284. @param request: 请求对象
  285. @returns: 验证是否通过
  286. """
  287. try:
  288. # 检查所有必需的参数是否存在且为非负数
  289. required_attrs = [
  290. 'f3_nitrogen_cd_content', 'f4_phosphorus_cd_content', 'f5_potassium_cd_content',
  291. 'f6_compound_cd_content', 'f7_organic_cd_content', 'f8_pesticide_cd_content',
  292. 'f9_farmyard_cd_content', 'f10_film_cd_content', 'nf_nitrogen_usage',
  293. 'pf_phosphorus_usage', 'kf_potassium_usage', 'cf_compound_usage',
  294. 'of_organic_usage', 'p_pesticide_usage', 'ff_farmyard_usage', 'af_film_usage'
  295. ]
  296. for attr in required_attrs:
  297. if not hasattr(request, attr):
  298. self.logger.warning(f"缺少必需参数: {attr}")
  299. return False
  300. value = getattr(request, attr)
  301. if value is None or value < 0:
  302. self.logger.warning(f"参数 {attr} 无效: {value}")
  303. return False
  304. return True
  305. except Exception as e:
  306. self.logger.error(f"验证参数时发生错误: {str(e)}")
  307. return False
  308. def calculate_cd_flux_for_visualization(self, area: str) -> Dict[str, Any]:
  309. """
  310. 专门用于可视化的农业投入Cd通量计算(参数固定使用韶关)
  311. @param area: 地区名称(仅用于地图边界,参数固定使用"韶关")
  312. @returns: 计算结果
  313. """
  314. try:
  315. with SessionLocal() as db:
  316. # 参数固定使用"韶关",area参数仅用于地图边界
  317. parameter = db.query(Parameters).filter(Parameters.area == "韶关").first()
  318. if not parameter:
  319. return {
  320. "success": False,
  321. "message": f"未找到韶关地区的参数数据",
  322. "data": None
  323. }
  324. # 计算农业投入输入Cd通量
  325. # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
  326. cd_flux = (
  327. parameter.f3 * parameter.nf + # 氮肥
  328. parameter.f4 * parameter.pf + # 磷肥
  329. parameter.f5 * parameter.kf + # 钾肥
  330. parameter.f6 * parameter.cf + # 复合肥
  331. parameter.f7 * parameter.of + # 有机肥
  332. parameter.f8 * parameter.p + # 农药
  333. parameter.f9 * parameter.ff + # 农家肥
  334. parameter.f10 * parameter.af # 农膜
  335. )
  336. # 计算各项明细
  337. details = {
  338. "nitrogen_fertilizer": parameter.f3 * parameter.nf, # 氮肥贡献
  339. "phosphorus_fertilizer": parameter.f4 * parameter.pf, # 磷肥贡献
  340. "potassium_fertilizer": parameter.f5 * parameter.kf, # 钾肥贡献
  341. "compound_fertilizer": parameter.f6 * parameter.cf, # 复合肥贡献
  342. "organic_fertilizer": parameter.f7 * parameter.of, # 有机肥贡献
  343. "pesticide": parameter.f8 * parameter.p, # 农药贡献
  344. "farmyard_manure": parameter.f9 * parameter.ff, # 农家肥贡献
  345. "agricultural_film": parameter.f10 * parameter.af # 农膜贡献
  346. }
  347. return {
  348. "success": True,
  349. "message": f"成功计算地区 '{area}' 的农业投入输入Cd通量(使用韶关参数)",
  350. "data": {
  351. "area": area,
  352. "total_cd_flux": round(cd_flux, 6),
  353. "unit": "g/ha/a",
  354. "details": {key: round(value, 6) for key, value in details.items()},
  355. "parameters_used": {
  356. "f3_nitrogen_cd_content": parameter.f3,
  357. "f4_phosphorus_cd_content": parameter.f4,
  358. "f5_potassium_cd_content": parameter.f5,
  359. "f6_compound_cd_content": parameter.f6,
  360. "f7_organic_cd_content": parameter.f7,
  361. "f8_pesticide_cd_content": parameter.f8,
  362. "f9_farmyard_cd_content": parameter.f9,
  363. "f10_film_cd_content": parameter.f10,
  364. "nf_nitrogen_usage": parameter.nf,
  365. "pf_phosphorus_usage": parameter.pf,
  366. "kf_potassium_usage": parameter.kf,
  367. "cf_compound_usage": parameter.cf,
  368. "of_organic_usage": parameter.of,
  369. "p_pesticide_usage": parameter.p,
  370. "ff_farmyard_usage": parameter.ff,
  371. "af_film_usage": parameter.af
  372. }
  373. }
  374. }
  375. except Exception as e:
  376. self.logger.error(f"计算地区 '{area}' 的Cd通量时发生错误: {str(e)}")
  377. return {
  378. "success": False,
  379. "message": f"计算失败: {str(e)}",
  380. "data": None
  381. }
  382. def get_coordinates_for_results(self, results_data: Dict[str, Any], area: str) -> List[Dict[str, Any]]:
  383. """
  384. 获取农业投入计算结果对应的坐标信息
  385. @param results_data: 计算结果数据
  386. @param area: 地区名称
  387. @returns: 包含坐标的结果列表
  388. """
  389. try:
  390. # 农业投入计算只有一个结果值,需要与所有农田数据关联
  391. total_cd_flux = results_data.get("total_cd_flux", 0)
  392. with SessionLocal() as db:
  393. # 查询所有农田数据获取坐标
  394. farmland_data = db.query(FarmlandData).all()
  395. if not farmland_data:
  396. self.logger.warning("未找到农田坐标数据")
  397. return []
  398. coordinates_results = []
  399. for farmland in farmland_data:
  400. coord_result = {
  401. "farmland_id": farmland.farmland_id,
  402. "sample_id": farmland.sample_id,
  403. "longitude": farmland.lon,
  404. "latitude": farmland.lan,
  405. "flux_value": total_cd_flux, # 所有点使用相同的农业投入Cd通量值
  406. "area": area
  407. }
  408. coordinates_results.append(coord_result)
  409. self.logger.info(f"✓ 成功获取 {len(coordinates_results)} 个农田样点的坐标信息,农业投入Cd通量: {total_cd_flux}")
  410. return coordinates_results
  411. except Exception as e:
  412. self.logger.error(f"获取坐标信息失败: {str(e)}")
  413. raise
  414. def export_results_to_csv(self, results_data: Dict[str, Any], output_dir: str = "app/static/agricultural_input") -> str:
  415. """
  416. 将农业投入计算结果导出为CSV文件
  417. @param results_data: 计算结果数据
  418. @param output_dir: 输出目录
  419. @returns: CSV文件路径
  420. """
  421. try:
  422. # 确保输出目录存在
  423. os.makedirs(output_dir, exist_ok=True)
  424. # 生成时间戳
  425. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  426. # 生成文件名
  427. area = results_data.get("area", "unknown")
  428. filename = f"agricultural_input_{area}_{timestamp}.csv"
  429. csv_path = os.path.join(output_dir, filename)
  430. # 准备导出数据
  431. export_data = {
  432. "area": results_data.get("area"),
  433. "total_cd_flux": results_data.get("total_cd_flux"),
  434. "unit": results_data.get("unit"),
  435. "nitrogen_fertilizer": results_data["details"]["nitrogen_fertilizer"],
  436. "phosphorus_fertilizer": results_data["details"]["phosphorus_fertilizer"],
  437. "potassium_fertilizer": results_data["details"]["potassium_fertilizer"],
  438. "compound_fertilizer": results_data["details"]["compound_fertilizer"],
  439. "organic_fertilizer": results_data["details"]["organic_fertilizer"],
  440. "pesticide": results_data["details"]["pesticide"],
  441. "farmyard_manure": results_data["details"]["farmyard_manure"],
  442. "agricultural_film": results_data["details"]["agricultural_film"]
  443. }
  444. # 转换为DataFrame
  445. df = pd.DataFrame([export_data])
  446. # 保存CSV文件
  447. df.to_csv(csv_path, index=False, encoding='utf-8-sig')
  448. self.logger.info(f"✓ 成功导出农业投入结果到: {csv_path}")
  449. return csv_path
  450. except Exception as e:
  451. self.logger.error(f"导出CSV文件失败: {str(e)}")
  452. raise
  453. def create_agricultural_input_visualization(self, area: str, calculation_type: str,
  454. results_with_coords: List[Dict[str, Any]],
  455. level: str = None,
  456. output_dir: str = "app/static/agricultural_input",
  457. template_raster: str = "app/static/cd_flux/meanTemp.tif",
  458. boundary_shp: str = None,
  459. colormap: str = "green_yellow_red_purple",
  460. resolution_factor: float = 4.0,
  461. enable_interpolation: bool = False,
  462. cleanup_intermediate: bool = True) -> Dict[str, str]:
  463. """
  464. 创建农业投入Cd通量可视化图表
  465. @param area: 地区名称
  466. @param calculation_type: 计算类型(agricultural_input)
  467. @param results_with_coords: 包含坐标的结果数据
  468. @param level: 行政层级
  469. @param output_dir: 输出目录
  470. @param template_raster: 模板栅格文件路径
  471. @param boundary_shp: 边界shapefile路径
  472. @param colormap: 色彩方案
  473. @param resolution_factor: 分辨率因子
  474. @param enable_interpolation: 是否启用空间插值
  475. @param cleanup_intermediate: 是否清理中间文件
  476. @returns: 生成的图片文件路径字典
  477. """
  478. try:
  479. if not results_with_coords:
  480. raise ValueError("没有包含坐标的结果数据")
  481. # 确保输出目录存在
  482. os.makedirs(output_dir, exist_ok=True)
  483. generated_files: Dict[str, str] = {}
  484. # 生成时间戳
  485. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  486. # 创建CSV文件用于绘图
  487. csv_filename = f"agricultural_input_{area}_temp_{timestamp}.csv"
  488. csv_path = os.path.join(output_dir, csv_filename)
  489. # 准备绘图数据
  490. plot_data = []
  491. for result in results_with_coords:
  492. plot_data.append({
  493. "longitude": result["longitude"],
  494. "latitude": result["latitude"],
  495. "flux_value": result["flux_value"]
  496. })
  497. # 保存为CSV
  498. df = pd.DataFrame(plot_data)
  499. df.to_csv(csv_path, index=False, encoding='utf-8-sig')
  500. # 初始化绘图工具
  501. mapper = MappingUtils()
  502. # 生成输出文件路径
  503. map_output = os.path.join(output_dir, f"agricultural_input_{area}_map_{timestamp}")
  504. histogram_output = os.path.join(output_dir, f"agricultural_input_{area}_histogram_{timestamp}")
  505. # 检查模板文件是否存在
  506. if not os.path.exists(template_raster):
  507. self.logger.warning(f"模板栅格文件不存在: {template_raster}")
  508. template_raster = None
  509. # 动态获取边界数据(严格使用指定层级)
  510. if not level:
  511. raise ValueError("必须提供行政层级 level:county | city | province")
  512. # 直接从数据库获取边界GeoDataFrame
  513. boundary_gdf = self._get_boundary_gdf_from_database(area, level)
  514. boundary_shp = None # 不再需要临时边界文件
  515. if boundary_gdf is None:
  516. self.logger.warning(f"未找到地区 '{area}' 的边界数据,将不使用边界裁剪")
  517. else:
  518. # 在绘图前进行样点边界包含性统计
  519. try:
  520. if boundary_gdf is not None and len(boundary_gdf) > 0:
  521. boundary_union = boundary_gdf.unary_union
  522. total_points = len(results_with_coords)
  523. inside_count = 0
  524. outside_points: List[Dict[str, Any]] = []
  525. for r in results_with_coords:
  526. pt = Point(float(r["longitude"]), float(r["latitude"]))
  527. if boundary_union.contains(pt) or boundary_union.touches(pt):
  528. inside_count += 1
  529. else:
  530. outside_points.append({
  531. "farmland_id": r.get("farmland_id"),
  532. "sample_id": r.get("sample_id"),
  533. "longitude": r.get("longitude"),
  534. "latitude": r.get("latitude"),
  535. "flux_value": r.get("flux_value")
  536. })
  537. outside_count = total_points - inside_count
  538. inside_pct = (inside_count / total_points * 100.0) if total_points > 0 else 0.0
  539. self.logger.info(
  540. f"样点边界检查 - 总数: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), 边界外: {outside_count}")
  541. if outside_count > 0:
  542. sample_preview = outside_points[:10]
  543. self.logger.warning(
  544. f"存在 {outside_count} 个样点位于边界之外,绘图时将被掩膜隐藏。示例(最多10条): {sample_preview}")
  545. # 在日志中打印边界检查统计结果
  546. self.logger.info(
  547. f"边界检查统计 - 地区: {area}, 层级: {level}, 计算类型: {calculation_type}, "
  548. f"总样点: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), "
  549. f"边界外: {outside_count}"
  550. )
  551. if outside_count > 0 and len(outside_points) > 0:
  552. sample_preview = outside_points[:5] # 只显示前5个样本
  553. self.logger.info(f"边界外样点示例(前5个): {sample_preview}")
  554. else:
  555. generated_files = {}
  556. except Exception as check_err:
  557. self.logger.warning(f"样点边界包含性检查失败: {str(check_err)}")
  558. # 创建shapefile
  559. shapefile_path = csv_path.replace('.csv', '_points.shp')
  560. mapper.csv_to_shapefile(csv_path, shapefile_path,
  561. lon_col='longitude', lat_col='latitude', value_col='flux_value')
  562. # 合并已生成文件映射
  563. generated_files.update({"csv": csv_path, "shapefile": shapefile_path})
  564. # 如果有模板栅格文件,创建栅格地图
  565. if template_raster:
  566. try:
  567. # 创建栅格
  568. raster_path = csv_path.replace('.csv', '_raster.tif')
  569. raster_path, stats = mapper.vector_to_raster(
  570. shapefile_path, template_raster, raster_path, 'flux_value',
  571. resolution_factor=resolution_factor, boundary_shp=boundary_shp, boundary_gdf=boundary_gdf,
  572. interpolation_method='nearest', enable_interpolation=enable_interpolation
  573. )
  574. generated_files["raster"] = raster_path
  575. # 创建栅格地图 - 使用英文标题避免中文乱码
  576. map_title = "Agricultural Input Cd Flux"
  577. map_file = mapper.create_raster_map(
  578. boundary_shp if boundary_shp else None,
  579. raster_path,
  580. map_output,
  581. colormap=colormap,
  582. title=map_title,
  583. output_size=12,
  584. dpi=300,
  585. resolution_factor=4.0,
  586. enable_interpolation=False,
  587. interpolation_method='nearest',
  588. boundary_gdf=boundary_gdf
  589. )
  590. generated_files["map"] = map_file
  591. # 创建直方图 - 使用英文标题避免中文乱码
  592. histogram_title = "Agricultural Input Cd Flux Distribution"
  593. histogram_file = mapper.create_histogram(
  594. raster_path,
  595. f"{histogram_output}.jpg",
  596. title=histogram_title,
  597. xlabel='Cd Flux (g/ha/a)',
  598. ylabel='Frequency Density'
  599. )
  600. generated_files["histogram"] = histogram_file
  601. except Exception as viz_error:
  602. self.logger.warning(f"栅格可视化创建失败: {str(viz_error)}")
  603. # 即使栅格可视化失败,也返回已生成的文件
  604. # 清理中间文件(默认开启,仅保留最终可视化)
  605. if cleanup_intermediate:
  606. try:
  607. self._cleanup_intermediate_files(generated_files, None)
  608. except Exception as cleanup_err:
  609. self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
  610. self.logger.info(f"✓ 成功创建农业投入Cd通量可视化,生成文件: {list(generated_files.keys())}")
  611. return generated_files
  612. except Exception as e:
  613. self.logger.error(f"创建农业投入可视化失败: {str(e)}")
  614. raise
  615. def _cleanup_intermediate_files(self, generated_files: Dict[str, str], boundary_shp: Optional[str]) -> None:
  616. """
  617. 清理中间文件:CSV、Shapefile 及其配套文件、栅格TIFF;若边界为临时目录,则一并删除
  618. """
  619. import shutil
  620. import tempfile
  621. def _safe_remove(path: str) -> None:
  622. try:
  623. if path and os.path.exists(path) and os.path.isfile(path):
  624. os.remove(path)
  625. except Exception:
  626. pass
  627. # 删除 CSV
  628. _safe_remove(generated_files.get("csv"))
  629. # 删除栅格
  630. _safe_remove(generated_files.get("raster"))
  631. # 删除 Shapefile 全家桶
  632. shp_path = generated_files.get("shapefile")
  633. if shp_path:
  634. base, _ = os.path.splitext(shp_path)
  635. for ext in (".shp", ".shx", ".dbf", ".prj", ".cpg"):
  636. _safe_remove(base + ext)
  637. # 如果边界文件来自系统临时目录,删除其所在目录
  638. if boundary_shp:
  639. temp_root = tempfile.gettempdir()
  640. try:
  641. if os.path.commonprefix([os.path.abspath(boundary_shp), temp_root]) == temp_root:
  642. temp_dir = os.path.dirname(os.path.abspath(boundary_shp))
  643. if os.path.isdir(temp_dir):
  644. shutil.rmtree(temp_dir, ignore_errors=True)
  645. except Exception:
  646. pass
  647. def _get_boundary_gdf_from_database(self, area: str, level: str) -> Optional[gpd.GeoDataFrame]:
  648. """
  649. 直接从数据库获取边界数据作为GeoDataFrame
  650. @param area: 地区名称
  651. @param level: 行政层级
  652. @returns: 边界GeoDataFrame或None
  653. """
  654. try:
  655. with SessionLocal() as db:
  656. norm_area = area.strip()
  657. boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
  658. if boundary_gdf is not None:
  659. self.logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
  660. return boundary_gdf
  661. except Exception as e:
  662. self.logger.warning(f"从数据库获取边界数据失败: {str(e)}")
  663. return None