water_service.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. import os
  2. import geopandas as gpd
  3. import pandas as pd
  4. from pyproj import Transformer
  5. from shapely.geometry import Point
  6. import rasterio
  7. from typing import Optional, Dict, Any
  8. from datetime import datetime
  9. import numpy as np
  10. import logging
  11. import shutil
  12. import sys
  13. # 导入MappingUtils
  14. from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow, dataframe_to_raster_workflow
  15. # 导入土地利用服务
  16. from .land_use_service import land_use_service
  17. # 导入边界服务
  18. from .admin_boundary_service import get_boundary_gdf_by_name
  19. from ..database import SessionLocal
  20. import geopandas as gpd
  21. import tempfile
  22. # 配置日志
  23. logger = logging.getLogger(__name__)
  24. # 设置全局字体
  25. import matplotlib.pyplot as plt
  26. plt.rcParams['font.family'] = 'Arial'
  27. plt.rcParams['axes.unicode_minus'] = False
  28. plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
  29. # 设定常用的colormap(与mapping_utils保持一致)
  30. COLORMAPS = {
  31. "yellow_orange_brown": ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'],
  32. "blue_series": ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'],
  33. "yellow_green": ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'],
  34. "green_brown": ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'],
  35. "yellow_pink_purple": ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'],
  36. "green_yellow_red_purple": ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F']
  37. }
  38. # 建立中文到英文的colormap映射
  39. COLORMAP_MAPPING = {
  40. "黄-橙-棕": "yellow_orange_brown",
  41. "蓝色系": "blue_series",
  42. "淡黄-草绿": "yellow_green",
  43. "绿色-棕色": "green_brown",
  44. "黄-粉-紫": "yellow_pink_purple",
  45. "绿-黄-红-紫": "green_yellow_red_purple"
  46. }
  47. # 主路径配置
  48. def get_base_dir():
  49. """获取基础目录路径"""
  50. if getattr(sys, 'frozen', False):
  51. # 打包后的可执行文件
  52. return os.path.dirname(sys.executable)
  53. else:
  54. # 脚本运行模式
  55. return os.path.dirname(os.path.abspath(__file__))
  56. def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
  57. """
  58. 直接从数据库获取边界数据作为GeoDataFrame
  59. @description: 与其他模块保持一致的边界获取方法
  60. @param area: 地区名称
  61. @param level: 行政层级(county | city | province)
  62. @returns: 边界GeoDataFrame或None
  63. """
  64. try:
  65. with SessionLocal() as db:
  66. norm_area = area.strip()
  67. boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
  68. if boundary_gdf is not None:
  69. logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
  70. return boundary_gdf
  71. except Exception as e:
  72. logger.warning(f"从数据库获取边界数据失败: {str(e)}")
  73. return None
  74. # 土地数据处理函数
  75. def process_land_data(land_type, coefficient_params=None, save_csv=True):
  76. """处理土地类型数据并生成清洗后的简化数据"""
  77. base_dir = get_base_dir()
  78. xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
  79. logger.info(f"处理土地类型: {land_type}")
  80. logger.info(f"Excel文件: {xls_file}")
  81. # 从数据库获取土地利用数据,替代原来的SHP文件读取
  82. logger.info(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
  83. land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
  84. if land_centers_df is None or land_centers_df.empty:
  85. logger.warning(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
  86. return None, None, None
  87. logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
  88. # 读取Excel采样点数据
  89. if not os.path.exists(xls_file):
  90. logger.error(f"采样点Excel文件不存在: {xls_file}")
  91. return None, None, None
  92. df_xls = pd.read_excel(xls_file)
  93. logger.info(f"读取到 {len(df_xls)} 个采样点数据")
  94. # 设置土地类型系数
  95. default_params = {
  96. "水田": (711, 0.524),
  97. "水浇地": (427, 0.599),
  98. "旱地": (200, 0.7)
  99. }
  100. params = coefficient_params or default_params
  101. param1, param2 = params.get(land_type, (200, 0.7))
  102. Num = param1 * param2
  103. logger.info(f"系数: {param1} * {param2} = {Num}")
  104. # 处理每个面要素,使用数据库中的中心点坐标
  105. cd_values = []
  106. centers = []
  107. for index, row in land_centers_df.iterrows():
  108. center_lon = row['center_lon']
  109. center_lat = row['center_lat']
  110. centers.append((center_lon, center_lat))
  111. # 计算到所有采样点的距离
  112. distances = df_xls.apply(
  113. lambda x: Point(center_lon, center_lat).distance(Point(x['经度'], x['纬度'])),
  114. axis=1
  115. )
  116. min_idx = distances.idxmin()
  117. nearest = df_xls.loc[min_idx]
  118. # 计算Cd含量值
  119. cd_value = nearest['Cd (ug/L)'] * Num
  120. cd_values.append(cd_value)
  121. # 创建简化数据DataFrame
  122. simplified_data = pd.DataFrame({
  123. 'lon': [c[0] for c in centers],
  124. 'lat': [c[1] for c in centers],
  125. 'Prediction': cd_values
  126. })
  127. # 应用3σ原则检测异常值
  128. mean_value = simplified_data['Prediction'].mean()
  129. std_value = simplified_data['Prediction'].std()
  130. lower_bound = mean_value - 3 * std_value
  131. upper_bound = mean_value + 3 * std_value
  132. logger.info(f"Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
  133. logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
  134. # 识别异常值
  135. outliers = simplified_data[
  136. (simplified_data['Prediction'] < lower_bound) |
  137. (simplified_data['Prediction'] > upper_bound)
  138. ]
  139. logger.info(f"检测到异常值数量: {len(outliers)}")
  140. # 创建清洗后的数据
  141. cleaned_data = simplified_data[
  142. (simplified_data['Prediction'] >= lower_bound) &
  143. (simplified_data['Prediction'] <= upper_bound)
  144. ]
  145. logger.info(f"清洗后数据点数: {len(cleaned_data)}")
  146. # 可选:保存CSV文件用于兼容性和调试
  147. cleaned_csv_path = None
  148. if save_csv:
  149. # 创建输出目录
  150. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  151. os.makedirs(data_dir, exist_ok=True)
  152. logger.info(f"数据目录: {data_dir}")
  153. cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  154. cleaned_data.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
  155. logger.info(f"保存CSV: {cleaned_csv_path}")
  156. else:
  157. logger.info("跳过CSV文件生成(内存处理优化)")
  158. return cleaned_data, Num, cleaned_csv_path # 返回DataFrame, 系数, 和CSV路径(如果生成了)
  159. # 可视化函数(完全使用统一的MappingUtils接口)
  160. def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8):
  161. """
  162. 生成栅格地图可视化
  163. @description: 使用统一的MappingUtils接口生成栅格地图
  164. @param shp_path: 边界shapefile路径
  165. @param tif_path: 栅格数据文件路径
  166. @param color_map_name: 色彩方案名称(中文)
  167. @param title_name: 地图标题
  168. @param output_path: 输出图片路径
  169. @param output_size: 图片尺寸
  170. @returns: 输出图片路径或None(失败时)
  171. """
  172. try:
  173. logger.info(f"生成栅格地图: {title_name}")
  174. # 确保输出目录存在
  175. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  176. # 创建MappingUtils实例
  177. mapper = MappingUtils()
  178. # 转换颜色方案名称
  179. colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
  180. # 直接调用统一的绘图接口,无需额外包装
  181. generated_path = mapper.create_raster_map(
  182. shp_path=shp_path,
  183. tif_path=tif_path,
  184. output_path=os.path.splitext(output_path)[0], # 去掉扩展名,MappingUtils会自动添加.jpg
  185. colormap=colormap_key,
  186. title=title_name,
  187. output_size=output_size,
  188. figsize=None,
  189. dpi=300,
  190. enable_interpolation=False,
  191. interpolation_method='linear'
  192. )
  193. # 如果生成的路径与期望路径不同,则重命名
  194. if generated_path != output_path and os.path.exists(generated_path):
  195. shutil.move(generated_path, output_path)
  196. logger.info(f"重命名图片: {generated_path} -> {output_path}")
  197. return output_path
  198. return generated_path
  199. except Exception as e:
  200. logger.error(f"栅格地图生成失败: {str(e)}")
  201. return None
  202. def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
  203. xlabel='Cd(ug/L)', ylabel='frequency density',
  204. title='Irrigation water input flux'):
  205. """
  206. 生成TIFF文件的直方图
  207. @description: 使用统一的MappingUtils接口生成直方图
  208. @param file_path: TIFF文件路径
  209. @param output_path: 输出图片路径
  210. @param figsize: 图片尺寸元组
  211. @param xlabel: X轴标签
  212. @param ylabel: Y轴标签
  213. @param title: 图表标题
  214. @returns: 输出图片路径或None(失败时)
  215. """
  216. try:
  217. logger.info(f"生成直方图: {file_path}")
  218. # 创建MappingUtils实例
  219. mapper = MappingUtils()
  220. # 直接调用统一的绘图接口,无需额外包装
  221. generated_path = mapper.create_histogram(
  222. file_path=file_path,
  223. save_path=output_path,
  224. figsize=figsize,
  225. xlabel=xlabel,
  226. ylabel=ylabel,
  227. title=title,
  228. bins=100,
  229. dpi=300
  230. )
  231. return generated_path
  232. except Exception as e:
  233. logger.error(f"直方图生成失败: {str(e)}")
  234. return None
  235. def process_land_to_visualization(land_type, coefficient_params=None,
  236. color_map_name="绿-黄-红-紫",
  237. output_size=8,
  238. area: Optional[str] = None,
  239. level: Optional[str] = None,
  240. enable_interpolation: Optional[bool] = True,
  241. interpolation_method: Optional[str] = "linear",
  242. resolution_factor: Optional[float] = 4.0,
  243. save_csv: Optional[bool] = True):
  244. """
  245. 完整的土地数据处理可视化流程(使用统一的MappingUtils接口,支持动态边界和插值控制)
  246. @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
  247. 支持通过area和level参数动态控制地图边界,支持插值控制参数
  248. 工作流程:
  249. 1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量
  250. 2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
  251. 3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
  252. 4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
  253. 5. 生成直方图 - 直接调用MappingUtils.create_histogram()
  254. @param land_type: 土地类型(水田、旱地、水浇地)
  255. @param coefficient_params: 可选的系数参数字典
  256. @param color_map_name: 色彩方案名称(中文)
  257. @param output_size: 输出图片尺寸
  258. @param area: 可选的地区名称,如"乐昌市",用于动态控制地图边界
  259. @param level: 可选的行政层级,county | city | province
  260. @param enable_interpolation: 是否启用空间插值,默认True
  261. @param interpolation_method: 插值方法,nearest | linear | cubic,默认linear
  262. @param resolution_factor: 分辨率因子,默认4.0,越大分辨率越高
  263. @param save_csv: 是否生成CSV文件,默认True
  264. @returns: 包含所有生成文件路径的元组
  265. """
  266. base_dir = get_base_dir()
  267. logger.info(f"开始处理: {land_type}")
  268. # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
  269. cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
  270. land_type,
  271. coefficient_params,
  272. save_csv=save_csv # 根据参数决定是否生成CSV文件
  273. )
  274. if cleaned_data is None:
  275. logger.error(f"处理土地数据失败: {land_type}")
  276. return None, None, None, None, None, None
  277. # 2. 获取动态边界数据(提前获取用于栅格转换)
  278. boundary_gdf = None
  279. if area and level:
  280. logger.info(f"尝试获取动态边界: area='{area}', level='{level}'")
  281. boundary_gdf = get_boundary_gdf_from_database(area, level)
  282. if boundary_gdf is not None:
  283. logger.info(f"成功获取动态边界用于栅格转换: {area} ({level}) - 包含 {len(boundary_gdf)} 个几何体")
  284. logger.info(f"边界数据范围: {boundary_gdf.bounds.to_dict('records')[0] if len(boundary_gdf) > 0 else 'N/A'}")
  285. else:
  286. logger.warning(f"未能获取动态边界,栅格转换将使用完整范围: {area} ({level})")
  287. logger.warning("可能的原因: 1) 数据库中没有该地区数据 2) 地区名称拼写错误 3) level参数不正确")
  288. else:
  289. logger.info("未指定area和level参数,尝试获取默认边界(乐昌市)")
  290. boundary_gdf = get_boundary_gdf_from_database("乐昌市", "county")
  291. if boundary_gdf is not None:
  292. logger.info(f"成功获取默认边界(乐昌市)用于栅格转换 - 包含 {len(boundary_gdf)} 个几何体")
  293. else:
  294. logger.info("未能获取默认边界,栅格转换将使用数据点范围")
  295. # 3. 使用dataframe_to_raster_workflow转换为GeoTIFF(内存处理,支持动态边界)
  296. raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
  297. template_tif = os.path.join(raster_dir, "meanTemp.tif")
  298. output_dir = raster_dir # 直接使用raster目录作为输出目录
  299. # 调用dataframe_to_raster_workflow,支持动态边界和插值控制(内存处理优化)
  300. workflow_result = dataframe_to_raster_workflow(
  301. df=cleaned_data, # 直接使用DataFrame
  302. template_tif=template_tif,
  303. output_dir=output_dir,
  304. boundary_gdf=boundary_gdf, # 直接使用GeoDataFrame
  305. resolution_factor=resolution_factor, # 可配置的分辨率因子
  306. interpolation_method=interpolation_method, # 可配置的插值方法
  307. field_name='Prediction',
  308. lon_col=0, # DataFrame中经度列索引
  309. lat_col=1, # DataFrame中纬度列索引
  310. value_col=2, # DataFrame中数值列索引
  311. enable_interpolation=enable_interpolation # 可配置的插值开关
  312. )
  313. # 获取输出的栅格文件路径
  314. output_tif = workflow_result['raster']
  315. logger.info(f"生成栅格文件: {output_tif}")
  316. # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
  317. map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
  318. try:
  319. # 创建MappingUtils实例并直接调用
  320. mapper = MappingUtils()
  321. # 转换颜色方案名称
  322. colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
  323. # 统一使用boundary_gdf,不再依赖Shapefile
  324. map_result = mapper.create_raster_map(
  325. shp_path=None, # 不使用Shapefile
  326. tif_path=output_tif,
  327. output_path=os.path.splitext(map_output)[0], # 去掉扩展名
  328. colormap=colormap_key,
  329. title="Irrigation water input flux",
  330. output_size=output_size,
  331. dpi=300,
  332. enable_interpolation=enable_interpolation, # 可配置的插值开关
  333. interpolation_method=interpolation_method, # 可配置的插值方法
  334. boundary_gdf=boundary_gdf # 使用从数据库获取的边界数据,如果为None则使用数据点范围
  335. )
  336. # 如果生成的路径与期望不同,则重命名
  337. if map_result != map_output and os.path.exists(map_result):
  338. shutil.move(map_result, map_output)
  339. logger.info(f"栅格地图已生成: {map_output}")
  340. except Exception as e:
  341. logger.error(f"栅格地图生成失败: {str(e)}")
  342. # 5. 生成直方图(直接使用统一的MappingUtils接口)
  343. hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
  344. try:
  345. # 直接调用统一绘图接口生成直方图
  346. hist_result = mapper.create_histogram(
  347. file_path=output_tif,
  348. save_path=hist_output,
  349. figsize=(8, 8),
  350. xlabel='Cd(ug/L)',
  351. ylabel='frequency density',
  352. title='Irrigation water input flux',
  353. bins=100,
  354. dpi=300
  355. )
  356. if hist_result:
  357. logger.info(f"直方图已生成: {hist_output}")
  358. except Exception as e:
  359. logger.error(f"直方图生成失败: {str(e)}")
  360. # 使用实际的CSV路径(如果生成了)或生成兼容性路径
  361. if cleaned_csv_path:
  362. cleaned_csv = cleaned_csv_path
  363. else:
  364. # 为了兼容性,生成路径(即使文件不存在)
  365. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  366. cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  367. return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
  368. def get_land_statistics(land_type: str) -> Optional[Dict[str, Any]]:
  369. """
  370. 获取指定土地类型的Cd预测结果统计信息
  371. @param {str} land_type - 土地类型(水田、旱地或水浇地)
  372. @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
  373. """
  374. try:
  375. logger.info(f"获取土地类型统计信息: {land_type}")
  376. # 获取基础目录
  377. base_dir = get_base_dir()
  378. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  379. # 构建数据文件路径
  380. data_file = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  381. logger.info(f"数据文件路径: {data_file}")
  382. if not os.path.exists(data_file):
  383. logger.warning(f"未找到土地类型数据文件: {data_file}")
  384. return None
  385. # 读取预测数据
  386. df = pd.read_csv(data_file)
  387. logger.info(f"成功读取数据文件,包含 {len(df)} 行数据")
  388. # 检查必要的列是否存在
  389. if 'Prediction' not in df.columns:
  390. logger.warning("数据文件中缺少'Prediction'列")
  391. return None
  392. predictions = df['Prediction']
  393. # 计算基础统计信息
  394. stats = {
  395. "土地类型": land_type,
  396. "数据更新时间": datetime.fromtimestamp(os.path.getmtime(data_file)).strftime('%Y-%m-%d %H:%M:%S'),
  397. "数据点总数": len(predictions),
  398. "均值": float(predictions.mean()),
  399. "中位数": float(predictions.median()),
  400. "标准差": float(predictions.std()),
  401. "最小值": float(predictions.min()),
  402. "最大值": float(predictions.max()),
  403. "25%分位数": float(predictions.quantile(0.25)),
  404. "75%分位数": float(predictions.quantile(0.75)),
  405. "偏度": float(predictions.skew()),
  406. "峰度": float(predictions.kurtosis())
  407. }
  408. return stats
  409. except Exception as e:
  410. logger.error(f"获取土地类型统计信息失败: {str(e)}")
  411. return None
  412. # 主函数
  413. def main():
  414. """主函数用于测试完整的处理流程"""
  415. logger.info("=" * 50)
  416. logger.info("土地数据处理与可视化系统")
  417. logger.info("=" * 50)
  418. try:
  419. # 处理水田数据
  420. logger.info("\n===== 处理水田数据 =====")
  421. results = process_land_to_visualization("水田")
  422. if results and all(results):
  423. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  424. logger.info(f"清洗后CSV: {cleaned_csv}")
  425. logger.info(f"Shapefile: {shapefile}")
  426. logger.info(f"GeoTIFF文件: {tif_file}")
  427. logger.info(f"地图图片: {map_file}")
  428. logger.info(f"直方图: {hist_file}")
  429. logger.info(f"使用系数: {used_coeff}")
  430. else:
  431. logger.error("水田数据处理失败")
  432. # 处理旱地数据(使用自定义参数)
  433. logger.info("\n===== 处理旱地数据 =====")
  434. custom_params = {"旱地": (220, 0.65)}
  435. results = process_land_to_visualization(
  436. "旱地",
  437. coefficient_params=custom_params,
  438. color_map_name="蓝色系"
  439. )
  440. if results and all(results):
  441. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  442. logger.info(f"清洗后CSV: {cleaned_csv}")
  443. logger.info(f"Shapefile: {shapefile}")
  444. logger.info(f"GeoTIFF文件: {tif_file}")
  445. logger.info(f"地图图片: {map_file}")
  446. logger.info(f"直方图: {hist_file}")
  447. logger.info(f"使用系数: {used_coeff}")
  448. else:
  449. logger.error("旱地数据处理失败")
  450. except Exception as e:
  451. logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
  452. finally:
  453. logger.info("处理完成")
  454. if __name__ == "__main__":
  455. main()