water_service.py 14 KB


  1. import os
  2. import geopandas as gpd
  3. import pandas as pd
  4. from pyproj import Transformer
  5. from shapely.geometry import Point
  6. import rasterio
  7. from typing import Optional, Dict, Any
  8. from datetime import datetime
  9. import numpy as np
  10. import logging
  11. import shutil
  12. import sys
  13. # 导入MappingUtils
  14. from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow
  15. # 配置日志
  16. logger = logging.getLogger(__name__)
  17. logger.setLevel(logging.INFO)
  18. handler = logging.StreamHandler()
  19. formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  20. handler.setFormatter(formatter)
  21. logger.addHandler(handler)
  22. # 设置全局字体
  23. import matplotlib.pyplot as plt
  24. plt.rcParams['font.family'] = 'Arial'
  25. plt.rcParams['axes.unicode_minus'] = False
  26. plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
  27. # 设定常用的colormap(与mapping_utils保持一致)
  28. COLORMAPS = {
  29. "yellow_orange_brown": ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'],
  30. "blue_series": ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'],
  31. "yellow_green": ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'],
  32. "green_brown": ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'],
  33. "yellow_pink_purple": ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'],
  34. "green_yellow_red_purple": ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F']
  35. }
  36. # 建立中文到英文的colormap映射
  37. COLORMAP_MAPPING = {
  38. "黄-橙-棕": "yellow_orange_brown",
  39. "蓝色系": "blue_series",
  40. "淡黄-草绿": "yellow_green",
  41. "绿色-棕色": "green_brown",
  42. "黄-粉-紫": "yellow_pink_purple",
  43. "绿-黄-红-紫": "green_yellow_red_purple"
  44. }
  45. # 主路径配置
  46. def get_base_dir():
  47. """获取基础目录路径"""
  48. if getattr(sys, 'frozen', False):
  49. # 打包后的可执行文件
  50. return os.path.dirname(sys.executable)
  51. else:
  52. # 脚本运行模式
  53. return os.path.dirname(os.path.abspath(__file__))
  54. # 土地数据处理函数
  55. def process_land_data(land_type, coefficient_params=None):
  56. """处理土地类型数据并生成清洗后的简化数据"""
  57. base_dir = get_base_dir()
  58. shp_file = os.path.join(base_dir, "..", "static", "water", "Raster", "四县三种用电.shp")
  59. xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
  60. logger.info(f"处理土地类型: {land_type}")
  61. logger.info(f"SHP文件: {shp_file}")
  62. logger.info(f"Excel文件: {xls_file}")
  63. # 读取和处理SHP数据
  64. gdf_shp = gpd.read_file(shp_file)
  65. gdf_shp = gdf_shp[gdf_shp['DLMC'] == land_type]
  66. if gdf_shp.empty:
  67. logger.warning(f"没有找到 '{land_type}' 类型的要素")
  68. return None, None
  69. # 坐标系转换器
  70. transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True)
  71. # 读取Excel采样点数据
  72. df_xls = pd.read_excel(xls_file)
  73. # 设置土地类型系数
  74. default_params = {
  75. "水田": (711, 0.524),
  76. "水浇地": (427, 0.599),
  77. "旱地": (200, 0.7)
  78. }
  79. params = coefficient_params or default_params
  80. param1, param2 = params.get(land_type, (200, 0.7))
  81. Num = param1 * param2
  82. logger.info(f"系数: {param1} * {param2} = {Num}")
  83. # 处理每个面要素
  84. cd_values = []
  85. centers = []
  86. for index, row in gdf_shp.iterrows():
  87. center_original = row['geometry'].centroid
  88. center_lon, center_lat = transformer.transform(center_original.x, center_original.y)
  89. centers.append((center_lon, center_lat))
  90. distances = df_xls.apply(
  91. lambda x: Point(center_lon, center_lat).distance(Point(x['经度'], x['纬度'])),
  92. axis=1
  93. )
  94. min_idx = distances.idxmin()
  95. nearest = df_xls.loc[min_idx]
  96. # 只计算Cd含量值
  97. cd_value = nearest['Cd (ug/L)'] * Num
  98. cd_values.append(cd_value)
  99. # 创建输出目录
  100. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  101. os.makedirs(data_dir, exist_ok=True)
  102. logger.info(f"数据目录: {data_dir}")
  103. # 创建简化数据DataFrame
  104. simplified_data = pd.DataFrame({
  105. 'lon': [c[0] for c in centers],
  106. 'lat': [c[1] for c in centers],
  107. 'Prediction': cd_values
  108. })
  109. # 应用3σ原则检测异常值
  110. mean_value = simplified_data['Prediction'].mean()
  111. std_value = simplified_data['Prediction'].std()
  112. lower_bound = mean_value - 3 * std_value
  113. upper_bound = mean_value + 3 * std_value
  114. logger.info(f"Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
  115. logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
  116. # 识别异常值
  117. outliers = simplified_data[
  118. (simplified_data['Prediction'] < lower_bound) |
  119. (simplified_data['Prediction'] > upper_bound)
  120. ]
  121. logger.info(f"检测到异常值数量: {len(outliers)}")
  122. # 创建清洗后的数据
  123. cleaned_data = simplified_data[
  124. (simplified_data['Prediction'] >= lower_bound) &
  125. (simplified_data['Prediction'] <= upper_bound)
  126. ]
  127. logger.info(f"清洗后数据点数: {len(cleaned_data)}")
  128. # 保存清洗后的简化数据CSV
  129. cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  130. cleaned_data.to_csv(cleaned_csv, index=False, encoding='utf-8-sig')
  131. logger.info(f"保存CSV: {cleaned_csv}")
  132. return cleaned_csv, Num
  133. # 可视化函数(使用MappingUtils)
  134. def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8):
  135. """生成栅格地图可视化"""
  136. try:
  137. logger.info(f"生成栅格地图: {title_name}")
  138. # 确保输出目录存在
  139. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  140. # 创建MappingUtils实例
  141. mapper = MappingUtils()
  142. # 转换颜色方案名称
  143. colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
  144. # 调用MappingUtils的高级绘图功能
  145. mapper.create_raster_map(
  146. shp_path=shp_path,
  147. tif_path=tif_path,
  148. output_path=os.path.splitext(output_path)[0], # 去掉扩展名
  149. colormap=colormap_key,
  150. title=title_name,
  151. output_size=output_size,
  152. figsize=None,
  153. dpi=300,
  154. enable_interpolation=False,
  155. interpolation_method='linear'
  156. )
  157. # MappingUtils会自动添加.jpg扩展名,重命名文件以匹配原始输出路径
  158. generated_path = os.path.splitext(output_path)[0] + ".jpg"
  159. if generated_path != output_path and os.path.exists(generated_path):
  160. shutil.move(generated_path, output_path)
  161. logger.info(f"重命名图片: {generated_path} -> {output_path}")
  162. return output_path
  163. except Exception as e:
  164. logger.error(f"栅格地图生成失败: {str(e)}")
  165. return None
  166. def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
  167. xlabel='Cd(ug/L)', ylabel='frequency density',
  168. title='Irrigation water input flux'):
  169. """生成TIFF文件的直方图"""
  170. try:
  171. logger.info(f"生成直方图: {file_path}")
  172. # 创建MappingUtils实例
  173. mapper = MappingUtils()
  174. # 调用MappingUtils的高级直方图功能
  175. mapper.create_histogram(
  176. file_path=file_path,
  177. save_path=output_path,
  178. figsize=figsize,
  179. xlabel=xlabel,
  180. ylabel=ylabel,
  181. title=title,
  182. bins=100,
  183. dpi=300
  184. )
  185. return output_path
  186. except Exception as e:
  187. logger.error(f"直方图生成失败: {str(e)}")
  188. return None
  189. # 完整的处理流程
  190. def process_land_to_visualization(land_type, coefficient_params=None,
  191. color_map_name="绿-黄-红-紫",
  192. output_size=8):
  193. """
  194. 完整的土地数据处理可视化流程:
  195. 1. 生成清洗后CSV
  196. 2. 使用csv_to_raster_workflow转换为GeoTIFF
  197. 3. 生成栅格地图
  198. 4. 生成直方图
  199. 返回所有生成文件的路径
  200. """
  201. base_dir = get_base_dir()
  202. logger.info(f"开始处理: {land_type}")
  203. # 1. 生成清洗后的CSV
  204. cleaned_csv, used_coeff = process_land_data(land_type, coefficient_params)
  205. if not cleaned_csv:
  206. logger.error(f"处理土地数据失败: {land_type}")
  207. return None, None, None, None, None, None
  208. # 2. 使用csv_to_raster_workflow转换为GeoTIFF
  209. raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
  210. template_tif = os.path.join(raster_dir, "meanTemp.tif")
  211. output_dir = os.path.dirname(cleaned_csv) # 使用CSV所在目录作为输出目录
  212. # 调用csv_to_raster_workflow
  213. workflow_result = csv_to_raster_workflow(
  214. csv_file=cleaned_csv,
  215. template_tif=template_tif,
  216. output_dir=output_dir,
  217. boundary_shp=None,
  218. resolution_factor=4.0,
  219. interpolation_method='linear',
  220. field_name='Prediction',
  221. lon_col=0, # CSV中经度列索引
  222. lat_col=1, # CSV中纬度列索引
  223. value_col=2, # CSV中数值列索引
  224. enable_interpolation=True
  225. )
  226. # 获取输出的栅格文件路径
  227. output_tif = workflow_result['raster']
  228. logger.info(f"生成栅格文件: {output_tif}")
  229. # 3. 生成栅格地图
  230. map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
  231. county_shp = os.path.join(raster_dir, "Lechang.shp") # 县界SHP
  232. mapping_raster(
  233. shp_path=county_shp,
  234. tif_path=output_tif,
  235. color_map_name=color_map_name,
  236. title_name=f"Irrigation water input flux",
  237. output_path=map_output,
  238. output_size=output_size
  239. )
  240. # 4. 生成直方图
  241. hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
  242. plot_tif_histogram(
  243. file_path=output_tif,
  244. output_path=hist_output,
  245. title=f"Irrigation water input flux"
  246. )
  247. return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
  248. def get_land_statistics(land_type: str) -> Optional[Dict[str, Any]]:
  249. """
  250. 获取指定土地类型的Cd预测结果统计信息
  251. @param {str} land_type - 土地类型(水田、旱地或水浇地)
  252. @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
  253. """
  254. try:
  255. logger.info(f"获取土地类型统计信息: {land_type}")
  256. # 获取基础目录
  257. base_dir = get_base_dir()
  258. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  259. # 构建数据文件路径
  260. data_file = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  261. logger.info(f"数据文件路径: {data_file}")
  262. if not os.path.exists(data_file):
  263. logger.warning(f"未找到土地类型数据文件: {data_file}")
  264. return None
  265. # 读取预测数据
  266. df = pd.read_csv(data_file)
  267. logger.info(f"成功读取数据文件,包含 {len(df)} 行数据")
  268. # 检查必要的列是否存在
  269. if 'Prediction' not in df.columns:
  270. logger.warning("数据文件中缺少'Prediction'列")
  271. return None
  272. predictions = df['Prediction']
  273. # 计算基础统计信息
  274. stats = {
  275. "土地类型": land_type,
  276. "数据更新时间": datetime.fromtimestamp(os.path.getmtime(data_file)).strftime('%Y-%m-%d %H:%M:%S'),
  277. "数据点总数": len(predictions),
  278. "均值": float(predictions.mean()),
  279. "中位数": float(predictions.median()),
  280. "标准差": float(predictions.std()),
  281. "最小值": float(predictions.min()),
  282. "最大值": float(predictions.max()),
  283. "25%分位数": float(predictions.quantile(0.25)),
  284. "75%分位数": float(predictions.quantile(0.75)),
  285. "偏度": float(predictions.skew()),
  286. "峰度": float(predictions.kurtosis())
  287. }
  288. return stats
  289. except Exception as e:
  290. logger.error(f"获取土地类型统计信息失败: {str(e)}")
  291. return None
  292. # 主函数
  293. def main():
  294. """主函数用于测试完整的处理流程"""
  295. logger.info("=" * 50)
  296. logger.info("土地数据处理与可视化系统")
  297. logger.info("=" * 50)
  298. try:
  299. # 处理水田数据
  300. logger.info("\n===== 处理水田数据 =====")
  301. results = process_land_to_visualization("水田")
  302. if results and all(results):
  303. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  304. logger.info(f"清洗后CSV: {cleaned_csv}")
  305. logger.info(f"Shapefile: {shapefile}")
  306. logger.info(f"GeoTIFF文件: {tif_file}")
  307. logger.info(f"地图图片: {map_file}")
  308. logger.info(f"直方图: {hist_file}")
  309. logger.info(f"使用系数: {used_coeff}")
  310. else:
  311. logger.error("水田数据处理失败")
  312. # 处理旱地数据(使用自定义参数)
  313. logger.info("\n===== 处理旱地数据 =====")
  314. custom_params = {"旱地": (220, 0.65)}
  315. results = process_land_to_visualization(
  316. "旱地",
  317. coefficient_params=custom_params,
  318. color_map_name="蓝色系"
  319. )
  320. if results and all(results):
  321. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  322. logger.info(f"清洗后CSV: {cleaned_csv}")
  323. logger.info(f"Shapefile: {shapefile}")
  324. logger.info(f"GeoTIFF文件: {tif_file}")
  325. logger.info(f"地图图片: {map_file}")
  326. logger.info(f"直方图: {hist_file}")
  327. logger.info(f"使用系数: {used_coeff}")
  328. else:
  329. logger.error("旱地数据处理失败")
  330. except Exception as e:
  331. logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
  332. finally:
  333. logger.info("处理完成")
  334. if __name__ == "__main__":
  335. main()