water_service.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. import os
  2. import geopandas as gpd
  3. import pandas as pd
  4. from pyproj import Transformer
  5. from shapely.geometry import Point
  6. import rasterio
  7. from typing import Optional, Dict, Any, List
  8. from datetime import datetime
  9. import numpy as np
  10. import logging
  11. import shutil
  12. import sys
  13. from sklearn.neighbors import BallTree
  14. from time import time
  15. # 导入MappingUtils
  16. from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow, dataframe_to_raster_workflow
  17. # 导入土地利用服务
  18. from .land_use_service import land_use_service
  19. # 导入边界服务
  20. from .admin_boundary_service import get_boundary_gdf_by_name
  21. from ..database import SessionLocal
  22. import geopandas as gpd
  23. import tempfile
  24. # 配置日志
  25. logger = logging.getLogger(__name__)
  26. # 设置全局字体
  27. import matplotlib.pyplot as plt
  28. plt.rcParams['font.family'] = 'Arial'
  29. plt.rcParams['axes.unicode_minus'] = False
  30. plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
  31. # 设定常用的colormap(与mapping_utils保持一致)
  32. COLORMAPS = {
  33. "yellow_orange_brown": ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'],
  34. "blue_series": ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'],
  35. "yellow_green": ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'],
  36. "green_brown": ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'],
  37. "yellow_pink_purple": ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'],
  38. "green_yellow_red_purple": ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F']
  39. }
  40. # 建立中文到英文的colormap映射
  41. COLORMAP_MAPPING = {
  42. "黄-橙-棕": "yellow_orange_brown",
  43. "蓝色系": "blue_series",
  44. "淡黄-草绿": "yellow_green",
  45. "绿色-棕色": "green_brown",
  46. "黄-粉-紫": "yellow_pink_purple",
  47. "绿-黄-红-紫": "green_yellow_red_purple"
  48. }
  49. # 主路径配置
  50. def get_base_dir():
  51. """获取基础目录路径"""
  52. if getattr(sys, 'frozen', False):
  53. # 打包后的可执行文件
  54. return os.path.dirname(sys.executable)
  55. else:
  56. # 脚本运行模式
  57. return os.path.dirname(os.path.abspath(__file__))
  58. def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
  59. """
  60. 直接从数据库获取边界数据作为GeoDataFrame
  61. @description: 与其他模块保持一致的边界获取方法
  62. @param area: 地区名称
  63. @param level: 行政层级(county | city | province)
  64. @returns: 边界GeoDataFrame或None
  65. """
  66. try:
  67. with SessionLocal() as db:
  68. norm_area = area.strip()
  69. boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
  70. if boundary_gdf is not None:
  71. logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
  72. return boundary_gdf
  73. except Exception as e:
  74. logger.warning(f"从数据库获取边界数据失败: {str(e)}")
  75. return None
  76. def find_nearest_sampling_points_optimized(land_centers_df: pd.DataFrame,
  77. sampling_points_df: pd.DataFrame) -> np.ndarray:
  78. """
  79. 使用BallTree高效计算每个土地中心点的最近采样点
  80. @description: 使用空间索引优化最近邻搜索,将O(n×m)复杂度降低到O(n×log(m))
  81. @param land_centers_df: 土地中心点数据,包含center_lon和center_lat列
  82. @param sampling_points_df: 采样点数据,包含经度和纬度列
  83. @returns: 每个土地中心点对应的最近采样点索引数组
  84. """
  85. logger.info("开始构建空间索引优化最近邻搜索...")
  86. start_time = time()
  87. # 1. 准备采样点坐标数据(转换为弧度用于BallTree)
  88. sampling_coords = np.radians(sampling_points_df[['经度', '纬度']].values)
  89. # 2. 构建BallTree空间索引
  90. logger.info(f"构建BallTree索引,采样点数量: {len(sampling_coords)}")
  91. tree = BallTree(sampling_coords, metric='haversine')
  92. # 3. 准备土地中心点坐标数据
  93. land_coords = np.radians(land_centers_df[['center_lon', 'center_lat']].values)
  94. # 4. 批量查询最近邻(k=1表示只找最近的一个点)
  95. logger.info(f"批量查询最近邻,土地中心点数量: {len(land_coords)}")
  96. distances, indices = tree.query(land_coords, k=1)
  97. # 5. 提取索引(indices是二维数组,我们只需要第一列)
  98. nearest_indices = indices.flatten()
  99. elapsed_time = time() - start_time
  100. logger.info(f"空间索引搜索完成,耗时: {elapsed_time:.2f}秒")
  101. logger.info(f"平均每个点查询时间: {elapsed_time/len(land_coords)*1000:.2f}毫秒")
  102. return nearest_indices
  103. def cleanup_temporary_files(*file_paths):
  104. """
  105. 清理临时文件
  106. @description: 安全地删除指定的临时文件,支持多种文件类型
  107. @param file_paths: 要删除的文件路径(可变参数)
  108. """
  109. import tempfile
  110. for file_path in file_paths:
  111. if not file_path:
  112. continue
  113. try:
  114. if os.path.exists(file_path) and os.path.isfile(file_path):
  115. os.remove(file_path)
  116. logger.info(f"已清理临时文件: {os.path.basename(file_path)}")
  117. # 如果是shapefile,也删除相关的配套文件
  118. if file_path.endswith('.shp'):
  119. base_path = os.path.splitext(file_path)[0]
  120. for ext in ['.shx', '.dbf', '.prj', '.cpg']:
  121. related_file = base_path + ext
  122. if os.path.exists(related_file):
  123. os.remove(related_file)
  124. logger.info(f"已清理相关文件: {os.path.basename(related_file)}")
  125. except Exception as e:
  126. logger.warning(f"清理文件失败 {file_path}: {str(e)}")
  127. def cleanup_temp_files_in_directory(directory: str, patterns: List[str] = None) -> int:
  128. """
  129. 清理指定目录下的临时文件
  130. @description: 根据文件名模式清理目录中的临时文件
  131. @param directory: 要清理的目录路径
  132. @param patterns: 文件名模式列表,默认为['memory_raster_', 'temp_', 'tmp_']
  133. @returns: 清理的文件数量
  134. """
  135. if patterns is None:
  136. patterns = ['memory_raster_', 'temp_', 'tmp_']
  137. if not os.path.exists(directory) or not os.path.isdir(directory):
  138. logger.warning(f"目录不存在或不是有效目录: {directory}")
  139. return 0
  140. cleaned_count = 0
  141. try:
  142. for filename in os.listdir(directory):
  143. file_path = os.path.join(directory, filename)
  144. # 检查是否是文件
  145. if not os.path.isfile(file_path):
  146. continue
  147. # 检查文件名是否匹配任何模式
  148. should_clean = any(pattern in filename for pattern in patterns)
  149. if should_clean:
  150. try:
  151. os.remove(file_path)
  152. logger.info(f"已清理临时文件: {filename}")
  153. cleaned_count += 1
  154. except Exception as e:
  155. logger.warning(f"清理文件失败 {filename}: {str(e)}")
  156. except Exception as e:
  157. logger.error(f"清理目录失败 {directory}: {str(e)}")
  158. return cleaned_count
  159. # 土地数据处理函数
  160. def process_land_data(land_type, coefficient_params=None, save_csv=True):
  161. """处理土地类型数据并生成清洗后的简化数据"""
  162. base_dir = get_base_dir()
  163. xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
  164. logger.info(f"处理土地类型: {land_type}")
  165. logger.info(f"Excel文件: {xls_file}")
  166. # 从数据库获取土地利用数据,替代原来的SHP文件读取
  167. logger.info(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
  168. land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
  169. if land_centers_df is None or land_centers_df.empty:
  170. logger.warning(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
  171. return None, None, None
  172. logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
  173. logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
  174. # 读取Excel采样点数据
  175. if not os.path.exists(xls_file):
  176. logger.error(f"采样点Excel文件不存在: {xls_file}")
  177. return None, None, None
  178. df_xls = pd.read_excel(xls_file)
  179. logger.info(f"读取到 {len(df_xls)} 个采样点数据")
  180. # 设置土地类型系数
  181. default_params = {
  182. "水田": (711, 0.524),
  183. "水浇地": (427, 0.599),
  184. "旱地": (200, 0.7)
  185. }
  186. params = coefficient_params or default_params
  187. param1, param2 = params.get(land_type, (200, 0.7))
  188. Num = param1 * param2
  189. logger.info(f"系数: {param1} * {param2} = {Num}")
  190. # 高效处理:使用空间索引查找最近采样点
  191. logger.info("开始高效距离计算和Cd值计算...")
  192. start_time = time()
  193. # 使用优化的空间索引方法查找最近采样点
  194. nearest_indices = find_nearest_sampling_points_optimized(land_centers_df, df_xls)
  195. # 批量计算Cd含量值
  196. centers = list(zip(land_centers_df['center_lon'], land_centers_df['center_lat']))
  197. cd_values = df_xls.iloc[nearest_indices]['Cd (ug/L)'].values * Num
  198. calculation_time = time() - start_time
  199. logger.info(f"Cd值计算完成,耗时: {calculation_time:.2f}秒")
  200. logger.info(f"处理了 {len(centers)} 个土地中心点")
  201. # 创建简化数据DataFrame
  202. simplified_data = pd.DataFrame({
  203. 'lon': [c[0] for c in centers],
  204. 'lat': [c[1] for c in centers],
  205. 'Prediction': cd_values
  206. })
  207. # 应用3σ原则检测异常值
  208. mean_value = simplified_data['Prediction'].mean()
  209. std_value = simplified_data['Prediction'].std()
  210. lower_bound = mean_value - 3 * std_value
  211. upper_bound = mean_value + 3 * std_value
  212. logger.info(f"Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
  213. logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
  214. # 识别异常值
  215. outliers = simplified_data[
  216. (simplified_data['Prediction'] < lower_bound) |
  217. (simplified_data['Prediction'] > upper_bound)
  218. ]
  219. logger.info(f"检测到异常值数量: {len(outliers)}")
  220. # 创建清洗后的数据
  221. cleaned_data = simplified_data[
  222. (simplified_data['Prediction'] >= lower_bound) &
  223. (simplified_data['Prediction'] <= upper_bound)
  224. ]
  225. logger.info(f"清洗后数据点数: {len(cleaned_data)}")
  226. # 可选:保存CSV文件用于兼容性和调试
  227. cleaned_csv_path = None
  228. if save_csv:
  229. # 创建输出目录
  230. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  231. os.makedirs(data_dir, exist_ok=True)
  232. logger.info(f"数据目录: {data_dir}")
  233. cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  234. cleaned_data.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
  235. logger.info(f"保存CSV: {cleaned_csv_path}")
  236. else:
  237. logger.info("跳过CSV文件生成(内存处理优化)")
  238. return cleaned_data, Num, cleaned_csv_path # 返回DataFrame, 系数, 和CSV路径(如果生成了)
  239. # 可视化函数(完全使用统一的MappingUtils接口)
  240. def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8):
  241. """
  242. 生成栅格地图可视化
  243. @description: 使用统一的MappingUtils接口生成栅格地图
  244. @param shp_path: 边界shapefile路径
  245. @param tif_path: 栅格数据文件路径
  246. @param color_map_name: 色彩方案名称(中文)
  247. @param title_name: 地图标题
  248. @param output_path: 输出图片路径
  249. @param output_size: 图片尺寸
  250. @returns: 输出图片路径或None(失败时)
  251. """
  252. try:
  253. logger.info(f"生成栅格地图: {title_name}")
  254. # 确保输出目录存在
  255. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  256. # 创建MappingUtils实例
  257. mapper = MappingUtils()
  258. # 转换颜色方案名称
  259. colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
  260. # 直接调用统一的绘图接口,无需额外包装
  261. generated_path = mapper.create_raster_map(
  262. shp_path=shp_path,
  263. tif_path=tif_path,
  264. output_path=os.path.splitext(output_path)[0], # 去掉扩展名,MappingUtils会自动添加.jpg
  265. colormap=colormap_key,
  266. title=title_name,
  267. output_size=output_size,
  268. figsize=None,
  269. dpi=300,
  270. enable_interpolation=False,
  271. interpolation_method='linear'
  272. )
  273. # 如果生成的路径与期望路径不同,则重命名
  274. if generated_path != output_path and os.path.exists(generated_path):
  275. shutil.move(generated_path, output_path)
  276. logger.info(f"重命名图片: {generated_path} -> {output_path}")
  277. return output_path
  278. return generated_path
  279. except Exception as e:
  280. logger.error(f"栅格地图生成失败: {str(e)}")
  281. return None
  282. def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
  283. xlabel='Cd(ug/L)', ylabel='frequency density',
  284. title='Irrigation water input flux'):
  285. """
  286. 生成TIFF文件的直方图
  287. @description: 使用统一的MappingUtils接口生成直方图
  288. @param file_path: TIFF文件路径
  289. @param output_path: 输出图片路径
  290. @param figsize: 图片尺寸元组
  291. @param xlabel: X轴标签
  292. @param ylabel: Y轴标签
  293. @param title: 图表标题
  294. @returns: 输出图片路径或None(失败时)
  295. """
  296. try:
  297. logger.info(f"生成直方图: {file_path}")
  298. # 创建MappingUtils实例
  299. mapper = MappingUtils()
  300. # 直接调用统一的绘图接口,无需额外包装
  301. generated_path = mapper.create_histogram(
  302. file_path=file_path,
  303. save_path=output_path,
  304. figsize=figsize,
  305. xlabel=xlabel,
  306. ylabel=ylabel,
  307. title=title,
  308. bins=100,
  309. dpi=300
  310. )
  311. return generated_path
  312. except Exception as e:
  313. logger.error(f"直方图生成失败: {str(e)}")
  314. return None
  315. def process_land_to_visualization(land_type, coefficient_params=None,
  316. color_map_name="绿-黄-红-紫",
  317. output_size=8,
  318. area: Optional[str] = None,
  319. level: Optional[str] = None,
  320. enable_interpolation: Optional[bool] = True,
  321. interpolation_method: Optional[str] = "linear",
  322. resolution_factor: Optional[float] = 4.0,
  323. save_csv: Optional[bool] = True,
  324. cleanup_temp_files: Optional[bool] = True):
  325. """
  326. 完整的土地数据处理可视化流程(使用统一的MappingUtils接口,支持动态边界和插值控制)
  327. @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
  328. 支持通过area和level参数动态控制地图边界,支持插值控制参数
  329. 工作流程:
  330. 1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量
  331. 2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
  332. 3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
  333. 4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
  334. 5. 生成直方图 - 直接调用MappingUtils.create_histogram()
  335. @param land_type: 土地类型(水田、旱地、水浇地)
  336. @param coefficient_params: 可选的系数参数字典
  337. @param color_map_name: 色彩方案名称(中文)
  338. @param output_size: 输出图片尺寸
  339. @param area: 可选的地区名称,如"乐昌市",用于动态控制地图边界
  340. @param level: 可选的行政层级,county | city | province
  341. @param enable_interpolation: 是否启用空间插值,默认True
  342. @param interpolation_method: 插值方法,nearest | linear | cubic,默认linear
  343. @param resolution_factor: 分辨率因子,默认4.0,越大分辨率越高
  344. @param save_csv: 是否生成CSV文件,默认True
  345. @param cleanup_temp_files: 是否清理临时文件,默认True
  346. @returns: 包含所有生成文件路径的元组
  347. """
  348. base_dir = get_base_dir()
  349. logger.info(f"开始处理: {land_type}")
  350. # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
  351. cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
  352. land_type,
  353. coefficient_params,
  354. save_csv=save_csv # 根据参数决定是否生成CSV文件
  355. )
  356. if cleaned_data is None:
  357. logger.error(f"处理土地数据失败: {land_type}")
  358. return None, None, None, None, None, None
  359. # 2. 获取动态边界数据(提前获取用于栅格转换)
  360. boundary_gdf = None
  361. if area and level:
  362. logger.info(f"尝试获取动态边界: area='{area}', level='{level}'")
  363. boundary_gdf = get_boundary_gdf_from_database(area, level)
  364. if boundary_gdf is not None:
  365. logger.info(f"成功获取动态边界用于栅格转换: {area} ({level}) - 包含 {len(boundary_gdf)} 个几何体")
  366. logger.info(f"边界数据范围: {boundary_gdf.bounds.to_dict('records')[0] if len(boundary_gdf) > 0 else 'N/A'}")
  367. else:
  368. logger.warning(f"未能获取动态边界,栅格转换将使用完整范围: {area} ({level})")
  369. logger.warning("可能的原因: 1) 数据库中没有该地区数据 2) 地区名称拼写错误 3) level参数不正确")
  370. else:
  371. logger.info("未指定area和level参数,尝试获取默认边界(乐昌市)")
  372. boundary_gdf = get_boundary_gdf_from_database("乐昌市", "county")
  373. if boundary_gdf is not None:
  374. logger.info(f"成功获取默认边界(乐昌市)用于栅格转换 - 包含 {len(boundary_gdf)} 个几何体")
  375. else:
  376. logger.info("未能获取默认边界,栅格转换将使用数据点范围")
  377. # 3. 使用dataframe_to_raster_workflow转换为GeoTIFF(内存处理,支持动态边界)
  378. raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
  379. template_tif = os.path.join(raster_dir, "meanTemp.tif")
  380. output_dir = raster_dir # 直接使用raster目录作为输出目录
  381. # 调用dataframe_to_raster_workflow,支持动态边界和插值控制(内存处理优化)
  382. workflow_result = dataframe_to_raster_workflow(
  383. df=cleaned_data, # 直接使用DataFrame
  384. template_tif=template_tif,
  385. output_dir=output_dir,
  386. boundary_gdf=boundary_gdf, # 直接使用GeoDataFrame
  387. resolution_factor=resolution_factor, # 可配置的分辨率因子
  388. interpolation_method=interpolation_method, # 可配置的插值方法
  389. field_name='Prediction',
  390. lon_col=0, # DataFrame中经度列索引
  391. lat_col=1, # DataFrame中纬度列索引
  392. value_col=2, # DataFrame中数值列索引
  393. enable_interpolation=enable_interpolation # 可配置的插值开关
  394. )
  395. # 获取输出的栅格文件路径
  396. output_tif = workflow_result['raster']
  397. logger.info(f"生成栅格文件: {output_tif}")
  398. # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
  399. map_output = os.path.join(raster_dir, f"{land_type}_Cd含量地图.jpg")
  400. try:
  401. # 创建MappingUtils实例并直接调用
  402. mapper = MappingUtils()
  403. # 转换颜色方案名称
  404. colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
  405. # 统一使用boundary_gdf,不再依赖Shapefile
  406. map_result = mapper.create_raster_map(
  407. shp_path=None, # 不使用Shapefile
  408. tif_path=output_tif,
  409. output_path=os.path.splitext(map_output)[0], # 去掉扩展名
  410. colormap=colormap_key,
  411. title="Irrigation water input flux",
  412. output_size=output_size,
  413. dpi=300,
  414. enable_interpolation=enable_interpolation, # 可配置的插值开关
  415. interpolation_method=interpolation_method, # 可配置的插值方法
  416. boundary_gdf=boundary_gdf # 使用从数据库获取的边界数据,如果为None则使用数据点范围
  417. )
  418. # 如果生成的路径与期望不同,则重命名
  419. if map_result != map_output and os.path.exists(map_result):
  420. shutil.move(map_result, map_output)
  421. logger.info(f"栅格地图已生成: {map_output}")
  422. except Exception as e:
  423. logger.error(f"栅格地图生成失败: {str(e)}")
  424. # 5. 生成直方图(直接使用统一的MappingUtils接口)
  425. hist_output = os.path.join(raster_dir, f"{land_type}_Cd含量直方图.jpg")
  426. try:
  427. # 直接调用统一绘图接口生成直方图
  428. hist_result = mapper.create_histogram(
  429. file_path=output_tif,
  430. save_path=hist_output,
  431. figsize=(8, 8),
  432. xlabel='Cd(ug/L)',
  433. ylabel='frequency density',
  434. title='Irrigation water input flux',
  435. bins=100,
  436. dpi=300
  437. )
  438. if hist_result:
  439. logger.info(f"直方图已生成: {hist_output}")
  440. except Exception as e:
  441. logger.error(f"直方图生成失败: {str(e)}")
  442. # 使用实际的CSV路径(如果生成了)或生成兼容性路径
  443. if cleaned_csv_path:
  444. cleaned_csv = cleaned_csv_path
  445. else:
  446. # 为了兼容性,生成路径(即使文件不存在)
  447. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  448. cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  449. # 清理临时文件(如果启用)
  450. if cleanup_temp_files:
  451. logger.info("开始清理临时文件...")
  452. # 要清理的临时文件列表
  453. temp_files_to_cleanup = []
  454. # 添加临时栅格文件(如果是memory_raster_开头的)
  455. if output_tif and 'memory_raster_' in os.path.basename(output_tif):
  456. temp_files_to_cleanup.append(output_tif)
  457. # 添加临时shapefile(如果存在且是临时文件)
  458. temp_shapefile = workflow_result.get('shapefile')
  459. if temp_shapefile and ('temp' in temp_shapefile.lower() or 'memory' in temp_shapefile.lower()):
  460. temp_files_to_cleanup.append(temp_shapefile)
  461. # 如果不保存CSV,也清理CSV文件
  462. if not save_csv and cleaned_csv_path and os.path.exists(cleaned_csv_path):
  463. temp_files_to_cleanup.append(cleaned_csv_path)
  464. # 执行清理
  465. if temp_files_to_cleanup:
  466. cleanup_temporary_files(*temp_files_to_cleanup)
  467. logger.info(f"已清理 {len(temp_files_to_cleanup)} 个临时文件")
  468. # 如果清理了栅格文件,将返回路径设为None以避免引用已删除的文件
  469. if output_tif in temp_files_to_cleanup:
  470. output_tif = None
  471. logger.info("注意:临时栅格文件已被清理,返回的栅格路径为None")
  472. else:
  473. logger.info("没有临时文件需要清理")
  474. return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
  475. def get_land_statistics(land_type: str) -> Optional[Dict[str, Any]]:
  476. """
  477. 获取指定土地类型的Cd预测结果统计信息
  478. @param {str} land_type - 土地类型(水田、旱地或水浇地)
  479. @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
  480. """
  481. try:
  482. logger.info(f"获取土地类型统计信息: {land_type}")
  483. # 获取基础目录
  484. base_dir = get_base_dir()
  485. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  486. # 构建数据文件路径
  487. data_file = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  488. logger.info(f"数据文件路径: {data_file}")
  489. if not os.path.exists(data_file):
  490. logger.warning(f"未找到土地类型数据文件: {data_file}")
  491. return None
  492. # 读取预测数据
  493. df = pd.read_csv(data_file)
  494. logger.info(f"成功读取数据文件,包含 {len(df)} 行数据")
  495. # 检查必要的列是否存在
  496. if 'Prediction' not in df.columns:
  497. logger.warning("数据文件中缺少'Prediction'列")
  498. return None
  499. predictions = df['Prediction']
  500. # 计算基础统计信息
  501. stats = {
  502. "土地类型": land_type,
  503. "数据更新时间": datetime.fromtimestamp(os.path.getmtime(data_file)).strftime('%Y-%m-%d %H:%M:%S'),
  504. "数据点总数": len(predictions),
  505. "均值": float(predictions.mean()),
  506. "中位数": float(predictions.median()),
  507. "标准差": float(predictions.std()),
  508. "最小值": float(predictions.min()),
  509. "最大值": float(predictions.max()),
  510. "25%分位数": float(predictions.quantile(0.25)),
  511. "75%分位数": float(predictions.quantile(0.75)),
  512. "偏度": float(predictions.skew()),
  513. "峰度": float(predictions.kurtosis())
  514. }
  515. return stats
  516. except Exception as e:
  517. logger.error(f"获取土地类型统计信息失败: {str(e)}")
  518. return None
  519. # 主函数
  520. def main():
  521. """主函数用于测试完整的处理流程"""
  522. logger.info("=" * 50)
  523. logger.info("土地数据处理与可视化系统")
  524. logger.info("=" * 50)
  525. try:
  526. # 处理水田数据
  527. logger.info("\n===== 处理水田数据 =====")
  528. results = process_land_to_visualization("水田")
  529. if results and all(results):
  530. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  531. logger.info(f"清洗后CSV: {cleaned_csv}")
  532. logger.info(f"Shapefile: {shapefile}")
  533. logger.info(f"GeoTIFF文件: {tif_file}")
  534. logger.info(f"地图图片: {map_file}")
  535. logger.info(f"直方图: {hist_file}")
  536. logger.info(f"使用系数: {used_coeff}")
  537. else:
  538. logger.error("水田数据处理失败")
  539. # 处理旱地数据(使用自定义参数)
  540. logger.info("\n===== 处理旱地数据 =====")
  541. custom_params = {"旱地": (220, 0.65)}
  542. results = process_land_to_visualization(
  543. "旱地",
  544. coefficient_params=custom_params,
  545. color_map_name="蓝色系"
  546. )
  547. if results and all(results):
  548. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  549. logger.info(f"清洗后CSV: {cleaned_csv}")
  550. logger.info(f"Shapefile: {shapefile}")
  551. logger.info(f"GeoTIFF文件: {tif_file}")
  552. logger.info(f"地图图片: {map_file}")
  553. logger.info(f"直方图: {hist_file}")
  554. logger.info(f"使用系数: {used_coeff}")
  555. else:
  556. logger.error("旱地数据处理失败")
  557. except Exception as e:
  558. logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
  559. finally:
  560. # 清理临时文件
  561. base_dir = get_base_dir()
  562. raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
  563. cleaned_count = cleanup_temp_files_in_directory(raster_dir)
  564. if cleaned_count > 0:
  565. logger.info(f"已清理 {cleaned_count} 个临时文件")
  566. logger.info("处理完成")
  567. if __name__ == "__main__":
  568. main()