water_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. import os
  2. import geopandas as gpd
  3. import pandas as pd
  4. from pyproj import Transformer
  5. from shapely.geometry import Point
  6. import rasterio
  7. from rasterio.features import rasterize
  8. from typing import Optional, Dict, Any
  9. from datetime import datetime
  10. import numpy as np
  11. import json
  12. import logging
  13. import shutil
  14. import tempfile
  15. from pathlib import Path
  16. import matplotlib.pyplot as plt
  17. from matplotlib.colors import ListedColormap, BoundaryNorm
  18. from rasterio.mask import mask
  19. from rasterio.plot import show
  20. import seaborn as sns
  21. import sys
  22. # 配置日志
  23. logger = logging.getLogger(__name__)
  24. logger.setLevel(logging.INFO)
  25. handler = logging.StreamHandler()
  26. formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  27. handler.setFormatter(formatter)
  28. logger.addHandler(handler)
  29. # 设置全局字体
  30. plt.rcParams['font.family'] = 'Arial'
  31. plt.rcParams['axes.unicode_minus'] = False
  32. plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
  33. # 设定常用的colormap
  34. COLORMAPS = {
  35. "黄-橙-棕": ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'],
  36. "蓝色系": ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'],
  37. "淡黄-草绿": ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'],
  38. "绿色-棕色": ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'],
  39. "黄-粉-紫": ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'],
  40. "绿-黄-红-紫": ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F']
  41. }
  42. # 主路径配置
  43. def get_base_dir():
  44. """获取基础目录路径"""
  45. if getattr(sys, 'frozen', False):
  46. # 打包后的可执行文件
  47. return os.path.dirname(sys.executable)
  48. else:
  49. # 脚本运行模式
  50. return os.path.dirname(os.path.abspath(__file__))
  51. # 土地数据处理函数
  52. def process_land_data(land_type, coefficient_params=None):
  53. """处理土地类型数据并生成清洗后的简化数据"""
  54. base_dir = get_base_dir()
  55. shp_file = os.path.join(base_dir, "..", "static", "water", "Raster", "四县三种用电.shp")
  56. xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
  57. logger.info(f"处理土地类型: {land_type}")
  58. logger.info(f"SHP文件: {shp_file}")
  59. logger.info(f"Excel文件: {xls_file}")
  60. # 读取和处理SHP数据
  61. gdf_shp = gpd.read_file(shp_file)
  62. gdf_shp = gdf_shp[gdf_shp['DLMC'] == land_type]
  63. if gdf_shp.empty:
  64. logger.warning(f"没有找到 '{land_type}' 类型的要素")
  65. return None, None
  66. # 坐标系转换器
  67. transformer = Transformer.from_crs(gdf_shp.crs, "EPSG:4326", always_xy=True)
  68. # 读取Excel采样点数据
  69. df_xls = pd.read_excel(xls_file)
  70. # 设置土地类型系数
  71. default_params = {
  72. "水田": (711, 0.524),
  73. "水浇地": (427, 0.599),
  74. "旱地": (200, 0.7)
  75. }
  76. params = coefficient_params or default_params
  77. param1, param2 = params.get(land_type, (200, 0.7))
  78. Num = param1 * param2
  79. logger.info(f"系数: {param1} * {param2} = {Num}")
  80. # 处理每个面要素
  81. cd_values = []
  82. centers = []
  83. for index, row in gdf_shp.iterrows():
  84. center_original = row['geometry'].centroid
  85. center_lon, center_lat = transformer.transform(center_original.x, center_original.y)
  86. centers.append((center_lon, center_lat))
  87. distances = df_xls.apply(
  88. lambda x: Point(center_lon, center_lat).distance(Point(x['经度'], x['纬度'])),
  89. axis=1
  90. )
  91. min_idx = distances.idxmin()
  92. nearest = df_xls.loc[min_idx]
  93. # 只计算Cd含量值
  94. cd_value = nearest['Cd (ug/L)'] * Num
  95. cd_values.append(cd_value)
  96. # 创建输出目录
  97. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  98. os.makedirs(data_dir, exist_ok=True)
  99. logger.info(f"数据目录: {data_dir}")
  100. # 创建简化数据DataFrame
  101. simplified_data = pd.DataFrame({
  102. 'lon': [c[0] for c in centers],
  103. 'lat': [c[1] for c in centers],
  104. 'Prediction': cd_values
  105. })
  106. # 应用3σ原则检测异常值
  107. mean_value = simplified_data['Prediction'].mean()
  108. std_value = simplified_data['Prediction'].std()
  109. lower_bound = mean_value - 3 * std_value
  110. upper_bound = mean_value + 3 * std_value
  111. logger.info(f"Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
  112. logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
  113. # 识别异常值
  114. outliers = simplified_data[
  115. (simplified_data['Prediction'] < lower_bound) |
  116. (simplified_data['Prediction'] > upper_bound)
  117. ]
  118. logger.info(f"检测到异常值数量: {len(outliers)}")
  119. # 创建清洗后的数据
  120. cleaned_data = simplified_data[
  121. (simplified_data['Prediction'] >= lower_bound) &
  122. (simplified_data['Prediction'] <= upper_bound)
  123. ]
  124. logger.info(f"清洗后数据点数: {len(cleaned_data)}")
  125. # 保存清洗后的简化数据CSV
  126. cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  127. cleaned_data.to_csv(cleaned_csv, index=False, encoding='utf-8-sig')
  128. logger.info(f"保存CSV: {cleaned_csv}")
  129. return cleaned_csv, Num
  130. # 格式转换函数
  131. def csv_to_shapefile(csv_file, shapefile_output):
  132. """将CSV文件转换为Shapefile文件"""
  133. df = pd.read_csv(csv_file)
  134. lon = df.iloc[:, 0]
  135. lat = df.iloc[:, 1]
  136. val = df.iloc[:, 2]
  137. geometry = [Point(xy) for xy in zip(lon, lat)]
  138. gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")
  139. os.makedirs(os.path.dirname(shapefile_output), exist_ok=True)
  140. gdf.to_file(shapefile_output, driver="ESRI Shapefile")
  141. logger.info(f"保存Shapefile: {shapefile_output}")
  142. def vector_to_raster(input_shapefile, template_tif, output_tif, field="Prediction"):
  143. """将点矢量数据转换为栅格数据"""
  144. gdf = gpd.read_file(input_shapefile)
  145. with rasterio.open(template_tif) as src:
  146. template_meta = src.meta.copy()
  147. transform = src.transform
  148. width = src.width
  149. height = src.height
  150. crs = src.crs
  151. if gdf.crs != crs:
  152. gdf = gdf.to_crs(crs)
  153. shapes = ((geom, value) for geom, value in zip(gdf.geometry, gdf[field]))
  154. raster = rasterize(
  155. shapes=shapes,
  156. out_shape=(height, width),
  157. transform=transform,
  158. fill=np.nan,
  159. dtype='float32'
  160. )
  161. os.makedirs(os.path.dirname(output_tif), exist_ok=True)
  162. template_meta.update({
  163. "count": 1,
  164. "dtype": 'float32',
  165. "nodata": np.nan
  166. })
  167. with rasterio.open(output_tif, 'w', ** template_meta) as dst:
  168. dst.write(raster, 1)
  169. logger.info(f"保存GeoTIFF: {output_tif}")
  170. # 可视化函数
  171. def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8):
  172. """生成栅格地图可视化"""
  173. try:
  174. logger.info(f"生成栅格地图: {title_name}")
  175. # 确保输出目录存在
  176. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  177. # 读取矢量边界
  178. gdf = gpd.read_file(shp_path)
  179. if gdf.empty:
  180. raise ValueError("矢量数据为空")
  181. # 读取并裁剪栅格数据
  182. with rasterio.open(tif_path) as src:
  183. if gdf.crs != src.crs:
  184. gdf = gdf.to_crs(src.crs)
  185. geoms = [json.loads(gdf.to_json())["features"][0]["geometry"]]
  186. out_image, out_transform = mask(src, geoms, crop=True)
  187. # 处理数据
  188. raster = out_image[0].astype('float16')
  189. if np.all(np.isnan(raster)):
  190. raise ValueError("裁剪后的栅格不包含有效数据")
  191. # 根据分位数分为6个等级
  192. bounds = np.nanpercentile(raster, [0, 20, 40, 60, 80, 90, 100])
  193. norm = BoundaryNorm(bounds, ncolors=len(bounds) - 1)
  194. # 自定义颜色设定
  195. cmap = ListedColormap(color_map_name)
  196. # 绘图
  197. fig, ax = plt.subplots(figsize=(output_size, output_size))
  198. show(raster, transform=out_transform, ax=ax, cmap=cmap, norm=norm)
  199. gdf.boundary.plot(ax=ax, color='black', linewidth=1)
  200. # 设置标题和标签
  201. ax.set_title(title_name, fontsize=20)
  202. ax.set_xlabel("Longitude", fontsize=18)
  203. ax.set_ylabel("Latitude", fontsize=18)
  204. # 添加经纬网
  205. ax.grid(True, linestyle='--', color='gray', alpha=0.5)
  206. ax.tick_params(axis='y', labelrotation=90)
  207. # 创建图例
  208. tick_labels = [f"{bounds[i]:.1f}" for i in range(len(bounds) - 1)]
  209. cbar = plt.colorbar(
  210. plt.cm.ScalarMappable(norm=norm, cmap=cmap),
  211. ax=ax,
  212. ticks=[(bounds[i] + bounds[i + 1]) / 2 for i in range(len(bounds) - 1)],
  213. shrink=0.6,
  214. aspect=15
  215. )
  216. cbar.ax.set_yticklabels(tick_labels)
  217. cbar.set_label("Cd(ug/L)")
  218. # 保存图像
  219. plt.tight_layout()
  220. plt.savefig(output_path, dpi=300)
  221. plt.close(fig)
  222. logger.info(f"保存地图图片: {output_path}")
  223. return output_path
  224. except Exception as e:
  225. logger.error(f"栅格地图生成失败: {str(e)}")
  226. return None
  227. def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
  228. xlabel='Cd(ug/L)', ylabel='frequency density',
  229. title='Irrigation water input flux'):
  230. """生成TIFF文件的直方图"""
  231. try:
  232. logger.info(f"生成直方图: {file_path}")
  233. with rasterio.open(file_path) as src:
  234. band = src.read(1)
  235. nodata = src.nodata
  236. if nodata is not None:
  237. band = np.where(band == nodata, np.nan, band)
  238. band_flat = band.flatten()
  239. band_flat = band_flat[~np.isnan(band_flat)]
  240. plt.figure(figsize=figsize)
  241. sns.histplot(band_flat, bins=100, color='steelblue', alpha=0.7, edgecolor='black', stat='density')
  242. sns.kdeplot(band_flat, color='red', linewidth=2)
  243. plt.xlabel(xlabel, fontsize=14)
  244. plt.ylabel(ylabel, fontsize=14)
  245. plt.title(title, fontsize=16)
  246. plt.grid(True, linestyle='--', alpha=0.5)
  247. plt.tight_layout()
  248. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  249. plt.savefig(output_path, dpi=300, format='jpg', bbox_inches='tight')
  250. plt.close()
  251. logger.info(f"保存直方图: {output_path}")
  252. return output_path
  253. except Exception as e:
  254. logger.error(f"直方图生成失败: {str(e)}")
  255. return None
  256. # 完整的处理流程
  257. def process_land_to_visualization(land_type, coefficient_params=None,
  258. color_map_name="绿-黄-红-紫",
  259. output_size=8):
  260. """
  261. 完整的土地数据处理可视化流程:
  262. 1. 生成清洗后CSV
  263. 2. 转换为Shapefile
  264. 3. 转换为GeoTIFF
  265. 4. 生成栅格地图
  266. 5. 生成直方图
  267. 返回所有生成文件的路径
  268. """
  269. base_dir = get_base_dir()
  270. logger.info(f"开始处理: {land_type}")
  271. # 1. 生成清洗后的CSV
  272. cleaned_csv, used_coeff = process_land_data(land_type, coefficient_params)
  273. if not cleaned_csv:
  274. logger.error(f"处理土地数据失败: {land_type}")
  275. return None, None, None, None, None, None
  276. # 2. 转换为Shapefile
  277. shapefile_output = os.path.join(
  278. os.path.dirname(cleaned_csv),
  279. f"中心点经纬度与预测值&{land_type}_清洗.shp"
  280. )
  281. csv_to_shapefile(cleaned_csv, shapefile_output)
  282. # 3. 转换为GeoTIFF
  283. raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
  284. template_tif = os.path.join(raster_dir, "meanTemp.tif")
  285. output_tif = os.path.join(raster_dir, f"{land_type}_Cd含量.tif")
  286. vector_to_raster(shapefile_output, template_tif, output_tif)
  287. # 4. 生成栅格地图
  288. map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
  289. county_shp = os.path.join(raster_dir, "Lechang.shp") # 县界SHP
  290. mapping_raster(
  291. shp_path=county_shp,
  292. tif_path=output_tif,
  293. color_map_name=COLORMAPS.get(color_map_name, COLORMAPS["绿-黄-红-紫"]),
  294. title_name=f"Irrigation water input flux",
  295. output_path=map_output,
  296. output_size=output_size
  297. )
  298. # 5. 生成直方图
  299. hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
  300. plot_tif_histogram(
  301. file_path=output_tif,
  302. output_path=hist_output,
  303. title=f"Irrigation water input flux"
  304. )
  305. return cleaned_csv, shapefile_output, output_tif, map_output, hist_output, used_coeff
  306. def get_land_statistics(land_type: str) -> Optional[Dict[str, Any]]:
  307. """
  308. 获取指定土地类型的Cd预测结果统计信息
  309. @param {str} land_type - 土地类型(水田、旱地或水浇地)
  310. @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
  311. """
  312. try:
  313. logger.info(f"获取土地类型统计信息: {land_type}")
  314. # 获取基础目录
  315. base_dir = get_base_dir()
  316. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  317. # 构建数据文件路径
  318. data_file = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  319. logger.info(f"数据文件路径: {data_file}")
  320. if not os.path.exists(data_file):
  321. logger.warning(f"未找到土地类型数据文件: {data_file}")
  322. return None
  323. # 读取预测数据
  324. df = pd.read_csv(data_file)
  325. logger.info(f"成功读取数据文件,包含 {len(df)} 行数据")
  326. # 检查必要的列是否存在
  327. if 'Prediction' not in df.columns:
  328. logger.warning("数据文件中缺少'Prediction'列")
  329. return None
  330. predictions = df['Prediction']
  331. # 计算基础统计信息
  332. stats = {
  333. "土地类型": land_type,
  334. "数据更新时间": datetime.fromtimestamp(os.path.getmtime(data_file)).strftime('%Y-%m-%d %H:%M:%S'),
  335. "数据点总数": len(predictions),
  336. "均值": float(predictions.mean()),
  337. "中位数": float(predictions.median()),
  338. "标准差": float(predictions.std()),
  339. "最小值": float(predictions.min()),
  340. "最大值": float(predictions.max()),
  341. "25%分位数": float(predictions.quantile(0.25)),
  342. "75%分位数": float(predictions.quantile(0.75)),
  343. "偏度": float(predictions.skew()),
  344. "峰度": float(predictions.kurtosis())
  345. }
  346. return stats
  347. except Exception as e:
  348. logger.error(f"获取土地类型统计信息失败: {str(e)}")
  349. return None
  350. # 主函数
  351. def main():
  352. """主函数用于测试完整的处理流程"""
  353. logger.info("=" * 50)
  354. logger.info("土地数据处理与可视化系统")
  355. logger.info("=" * 50)
  356. try:
  357. # 处理水田数据
  358. logger.info("\n===== 处理水田数据 =====")
  359. results = process_land_to_visualization("水田")
  360. if results and all(results):
  361. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  362. logger.info(f"清洗后CSV: {cleaned_csv}")
  363. logger.info(f"Shapefile: {shapefile}")
  364. logger.info(f"GeoTIFF文件: {tif_file}")
  365. logger.info(f"地图图片: {map_file}")
  366. logger.info(f"直方图: {hist_file}")
  367. logger.info(f"使用系数: {used_coeff}")
  368. else:
  369. logger.error("水田数据处理失败")
  370. # 处理旱地数据(使用自定义参数)
  371. logger.info("\n===== 处理旱地数据 =====")
  372. custom_params = {"旱地": (220, 0.65)}
  373. results = process_land_to_visualization(
  374. "旱地",
  375. coefficient_params=custom_params,
  376. color_map_name="蓝色系"
  377. )
  378. if results and all(results):
  379. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  380. logger.info(f"清洗后CSV: {cleaned_csv}")
  381. logger.info(f"Shapefile: {shapefile}")
  382. logger.info(f"GeoTIFF文件: {tif_file}")
  383. logger.info(f"地图图片: {map_file}")
  384. logger.info(f"直方图: {hist_file}")
  385. logger.info(f"使用系数: {used_coeff}")
  386. else:
  387. logger.error("旱地数据处理失败")
  388. except Exception as e:
  389. logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
  390. finally:
  391. logger.info("处理完成")
  392. if __name__ == "__main__":
  393. main()