water_service.py 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. import os
  2. import geopandas as gpd
  3. import pandas as pd
  4. from pyproj import Transformer
  5. from shapely.geometry import Point
  6. import rasterio
  7. from typing import Optional, Dict, Any, List
  8. from datetime import datetime
  9. import numpy as np
  10. import logging
  11. import shutil
  12. import sys
  13. from sklearn.neighbors import BallTree
  14. from time import time
  15. # 导入MappingUtils
  16. from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow, dataframe_to_raster_workflow
  17. # 导入土地利用服务
  18. from .land_use_service import land_use_service
  19. # 导入边界服务
  20. from .admin_boundary_service import get_boundary_gdf_by_name
  21. from ..database import SessionLocal
  22. import geopandas as gpd
  23. import tempfile
  24. # 配置日志
  25. logger = logging.getLogger(__name__)
  26. # 设置全局字体
  27. import matplotlib.pyplot as plt
  28. plt.rcParams['font.family'] = 'Arial'
  29. plt.rcParams['axes.unicode_minus'] = False
  30. plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
  31. # 设定常用的colormap(与mapping_utils保持一致)
  32. COLORMAPS = {
  33. "yellow_orange_brown": ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'],
  34. "blue_series": ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'],
  35. "yellow_green": ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'],
  36. "green_brown": ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'],
  37. "yellow_pink_purple": ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'],
  38. "green_yellow_red_purple": ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F']
  39. }
  40. # 建立中文到英文的colormap映射
  41. COLORMAP_MAPPING = {
  42. "黄-橙-棕": "yellow_orange_brown",
  43. "蓝色系": "blue_series",
  44. "淡黄-草绿": "yellow_green",
  45. "绿色-棕色": "green_brown",
  46. "黄-粉-紫": "yellow_pink_purple",
  47. "绿-黄-红-紫": "green_yellow_red_purple"
  48. }
  49. # 主路径配置
  50. def get_base_dir():
  51. """获取基础目录路径"""
  52. if getattr(sys, 'frozen', False):
  53. # 打包后的可执行文件
  54. return os.path.dirname(sys.executable)
  55. else:
  56. # 脚本运行模式
  57. return os.path.dirname(os.path.abspath(__file__))
  58. def standardize_land_types_order(land_types: List[str]) -> List[str]:
  59. """
  60. 标准化土地类型顺序,确保文件名一致性
  61. @param land_types: 土地类型列表
  62. @returns: 按标准顺序排序的土地类型列表
  63. """
  64. # 定义标准顺序
  65. standard_order = ["水田", "旱地", "水浇地"]
  66. # 清理并标准化
  67. cleaned_types = [lt.strip() for lt in land_types if lt.strip()]
  68. # 按标准顺序排序
  69. standardized_types = sorted(
  70. cleaned_types,
  71. key=lambda x: standard_order.index(x) if x in standard_order else 999
  72. )
  73. return standardized_types
  74. def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
  75. """
  76. 直接从数据库获取边界数据作为GeoDataFrame
  77. @description: 与其他模块保持一致的边界获取方法
  78. @param area: 地区名称
  79. @param level: 行政层级(county | city | province)
  80. @returns: 边界GeoDataFrame或None
  81. """
  82. try:
  83. with SessionLocal() as db:
  84. norm_area = area.strip()
  85. boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
  86. if boundary_gdf is not None:
  87. logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
  88. return boundary_gdf
  89. except Exception as e:
  90. logger.warning(f"从数据库获取边界数据失败: {str(e)}")
  91. return None
  92. def find_nearest_sampling_points_optimized(land_centers_df: pd.DataFrame,
  93. sampling_points_df: pd.DataFrame) -> np.ndarray:
  94. """
  95. 使用BallTree高效计算每个土地中心点的最近采样点
  96. @description: 使用空间索引优化最近邻搜索,将O(n×m)复杂度降低到O(n×log(m))
  97. @param land_centers_df: 土地中心点数据,包含center_lon和center_lat列
  98. @param sampling_points_df: 采样点数据,包含经度和纬度列
  99. @returns: 每个土地中心点对应的最近采样点索引数组
  100. """
  101. logger.info("开始构建空间索引优化最近邻搜索...")
  102. start_time = time()
  103. # 1. 准备采样点坐标数据(转换为弧度用于BallTree)
  104. sampling_coords = np.radians(sampling_points_df[['经度', '纬度']].values)
  105. # 2. 构建BallTree空间索引
  106. logger.info(f"构建BallTree索引,采样点数量: {len(sampling_coords)}")
  107. tree = BallTree(sampling_coords, metric='haversine')
  108. # 3. 准备土地中心点坐标数据
  109. land_coords = np.radians(land_centers_df[['center_lon', 'center_lat']].values)
  110. # 4. 批量查询最近邻(k=1表示只找最近的一个点)
  111. logger.info(f"批量查询最近邻,土地中心点数量: {len(land_coords)}")
  112. distances, indices = tree.query(land_coords, k=1)
  113. # 5. 提取索引(indices是二维数组,我们只需要第一列)
  114. nearest_indices = indices.flatten()
  115. elapsed_time = time() - start_time
  116. logger.info(f"空间索引搜索完成,耗时: {elapsed_time:.2f}秒")
  117. logger.info(f"平均每个点查询时间: {elapsed_time/len(land_coords)*1000:.2f}毫秒")
  118. return nearest_indices
  119. def cleanup_temporary_files(*file_paths):
  120. """
  121. 清理临时文件
  122. @description: 安全地删除指定的临时文件,支持多种文件类型
  123. @param file_paths: 要删除的文件路径(可变参数)
  124. """
  125. import tempfile
  126. for file_path in file_paths:
  127. if not file_path:
  128. continue
  129. try:
  130. if os.path.exists(file_path) and os.path.isfile(file_path):
  131. os.remove(file_path)
  132. logger.info(f"已清理临时文件: {os.path.basename(file_path)}")
  133. # 如果是shapefile,也删除相关的配套文件
  134. if file_path.endswith('.shp'):
  135. base_path = os.path.splitext(file_path)[0]
  136. for ext in ['.shx', '.dbf', '.prj', '.cpg']:
  137. related_file = base_path + ext
  138. if os.path.exists(related_file):
  139. os.remove(related_file)
  140. logger.info(f"已清理相关文件: {os.path.basename(related_file)}")
  141. except Exception as e:
  142. logger.warning(f"清理文件失败 {file_path}: {str(e)}")
  143. def cleanup_temp_files_in_directory(directory: str, patterns: List[str] = None) -> int:
  144. """
  145. 清理指定目录下的临时文件
  146. @description: 根据文件名模式清理目录中的临时文件
  147. @param directory: 要清理的目录路径
  148. @param patterns: 文件名模式列表,默认为['memory_raster_', 'temp_', 'tmp_']
  149. @returns: 清理的文件数量
  150. """
  151. if patterns is None:
  152. patterns = ['memory_raster_', 'temp_', 'tmp_']
  153. if not os.path.exists(directory) or not os.path.isdir(directory):
  154. logger.warning(f"目录不存在或不是有效目录: {directory}")
  155. return 0
  156. cleaned_count = 0
  157. try:
  158. for filename in os.listdir(directory):
  159. file_path = os.path.join(directory, filename)
  160. # 检查是否是文件
  161. if not os.path.isfile(file_path):
  162. continue
  163. # 检查文件名是否匹配任何模式
  164. should_clean = any(pattern in filename for pattern in patterns)
  165. if should_clean:
  166. try:
  167. os.remove(file_path)
  168. logger.info(f"已清理临时文件: {filename}")
  169. cleaned_count += 1
  170. except Exception as e:
  171. logger.warning(f"清理文件失败 {filename}: {str(e)}")
  172. except Exception as e:
  173. logger.error(f"清理目录失败 {directory}: {str(e)}")
  174. return cleaned_count
  175. # 土地数据处理函数
  176. def process_land_data(land_type, coefficient_params=None, save_csv=True):
  177. """处理单个土地类型数据并生成清洗后的简化数据"""
  178. base_dir = get_base_dir()
  179. xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
  180. logger.info(f"处理土地类型: {land_type}")
  181. logger.info(f"Excel文件: {xls_file}")
  182. # 从数据库获取土地利用数据,替代原来的SHP文件读取
  183. logger.info(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
  184. land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
  185. if land_centers_df is None or land_centers_df.empty:
  186. logger.warning(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
  187. return None, None, None
  188. logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
  189. logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
  190. # 调用辅助函数完成处理
  191. return complete_process_land_data(land_type, land_centers_df, coefficient_params, save_csv, base_dir)
  192. def process_multiple_land_data(land_types, coefficient_params=None, save_csv=True):
  193. """
  194. 处理多个土地类型数据并生成合并的清洗后简化数据
  195. @param land_types: 土地类型列表
  196. @param coefficient_params: 系数参数字典,格式为 {land_type: (param1, param2)}
  197. @param save_csv: 是否保存CSV文件
  198. @returns: (cleaned_data, combined_coeff_info, cleaned_csv_path)
  199. """
  200. base_dir = get_base_dir()
  201. xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
  202. logger.info(f"处理多个土地类型: {', '.join(land_types)}")
  203. logger.info(f"Excel文件: {xls_file}")
  204. # 从数据库获取多个土地类型的合并数据
  205. logger.info(f"从数据库获取多个土地类型的土地利用数据...")
  206. land_centers_df = land_use_service.get_multiple_land_centers_for_processing(land_types)
  207. if land_centers_df is None or land_centers_df.empty:
  208. logger.warning(f"数据库中没有找到任何指定土地类型的数据: {land_types}")
  209. return None, None, None
  210. logger.info(f"从数据库获取到 {len(land_centers_df)} 个合并的土地数据点")
  211. logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
  212. # 读取Excel采样点数据
  213. if not os.path.exists(xls_file):
  214. logger.error(f"采样点Excel文件不存在: {xls_file}")
  215. return None, None, None
  216. df_xls = pd.read_excel(xls_file)
  217. logger.info(f"读取到 {len(df_xls)} 个采样点数据")
  218. # 设置默认土地类型系数
  219. default_params = {
  220. "水田": (711, 0.524),
  221. "水浇地": (427, 0.599),
  222. "旱地": (200, 0.7)
  223. }
  224. params = coefficient_params or default_params
  225. # 为多个土地类型构建系数信息
  226. combined_coeff_info = {}
  227. # 高效处理:使用空间索引查找最近采样点
  228. logger.info("开始高效距离计算和Cd值计算...")
  229. start_time = time()
  230. # 使用优化的空间索引方法查找最近采样点
  231. nearest_indices = find_nearest_sampling_points_optimized(land_centers_df, df_xls)
  232. # 批量计算Cd含量值,按土地类型应用不同系数
  233. centers = list(zip(land_centers_df['center_lon'], land_centers_df['center_lat']))
  234. cd_values = []
  235. for i, (_, row) in enumerate(land_centers_df.iterrows()):
  236. land_type = row['land_type']
  237. param1, param2 = params.get(land_type, (200, 0.7))
  238. Num = param1 * param2
  239. # 记录每种土地类型使用的系数
  240. if land_type not in combined_coeff_info:
  241. combined_coeff_info[land_type] = {
  242. 'param1': param1,
  243. 'param2': param2,
  244. 'multiplier': Num,
  245. 'count': 0
  246. }
  247. combined_coeff_info[land_type]['count'] += 1
  248. # 计算该点的Cd值
  249. cd_value = df_xls.iloc[nearest_indices[i]]['Cd (ug/L)'] * Num
  250. cd_values.append(cd_value)
  251. calculation_time = time() - start_time
  252. logger.info(f"Cd值计算完成,耗时: {calculation_time:.2f}秒")
  253. logger.info(f"处理了 {len(centers)} 个土地中心点")
  254. # 记录各土地类型的系数使用情况
  255. for land_type, info in combined_coeff_info.items():
  256. logger.info(f"{land_type}: 系数 {info['param1']} * {info['param2']} = {info['multiplier']}, 应用于 {info['count']} 个点")
  257. # 创建简化数据DataFrame
  258. simplified_data = pd.DataFrame({
  259. 'lon': [c[0] for c in centers],
  260. 'lat': [c[1] for c in centers],
  261. 'Prediction': cd_values,
  262. 'land_type': land_centers_df['land_type'].values # 保留土地类型信息用于分析
  263. })
  264. # 应用3σ原则检测异常值
  265. mean_value = simplified_data['Prediction'].mean()
  266. std_value = simplified_data['Prediction'].std()
  267. lower_bound = mean_value - 3 * std_value
  268. upper_bound = mean_value + 3 * std_value
  269. logger.info(f"合并数据Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
  270. logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
  271. # 识别异常值
  272. outliers = simplified_data[
  273. (simplified_data['Prediction'] < lower_bound) |
  274. (simplified_data['Prediction'] > upper_bound)
  275. ]
  276. logger.info(f"检测到异常值数量: {len(outliers)}")
  277. # 创建清洗后的数据
  278. cleaned_data = simplified_data[
  279. (simplified_data['Prediction'] >= lower_bound) &
  280. (simplified_data['Prediction'] <= upper_bound)
  281. ]
  282. logger.info(f"清洗后数据点数: {len(cleaned_data)}")
  283. # 可选:保存CSV文件用于兼容性和调试
  284. cleaned_csv_path = None
  285. if save_csv:
  286. # 创建输出目录
  287. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  288. os.makedirs(data_dir, exist_ok=True)
  289. logger.info(f"数据目录: {data_dir}")
  290. # 使用组合的土地类型名称
  291. combined_type_name = '_'.join(land_types)
  292. cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{combined_type_name}_清洗.csv")
  293. # 移除land_type列(仅用于内部处理),保持与原始格式兼容
  294. cleaned_data_for_csv = cleaned_data[['lon', 'lat', 'Prediction']].copy()
  295. cleaned_data_for_csv.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
  296. logger.info(f"保存CSV: {cleaned_csv_path}")
  297. else:
  298. logger.info("跳过CSV文件生成(内存处理优化)")
  299. # 返回清洗后的数据(移除land_type列保持兼容性)
  300. final_cleaned_data = cleaned_data[['lon', 'lat', 'Prediction']].copy()
  301. return final_cleaned_data, combined_coeff_info, cleaned_csv_path
  302. # 继续完成原始的process_land_data函数
  303. def complete_process_land_data(land_type, land_centers_df, coefficient_params, save_csv, base_dir):
  304. """完成单个土地类型的数据处理"""
  305. xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
  306. # 读取Excel采样点数据
  307. if not os.path.exists(xls_file):
  308. logger.error(f"采样点Excel文件不存在: {xls_file}")
  309. return None, None, None
  310. df_xls = pd.read_excel(xls_file)
  311. logger.info(f"读取到 {len(df_xls)} 个采样点数据")
  312. # 设置土地类型系数
  313. default_params = {
  314. "水田": (711, 0.524),
  315. "水浇地": (427, 0.599),
  316. "旱地": (200, 0.7)
  317. }
  318. params = coefficient_params or default_params
  319. param1, param2 = params.get(land_type, (200, 0.7))
  320. Num = param1 * param2
  321. logger.info(f"系数: {param1} * {param2} = {Num}")
  322. # 高效处理:使用空间索引查找最近采样点
  323. logger.info("开始高效距离计算和Cd值计算...")
  324. start_time = time()
  325. # 使用优化的空间索引方法查找最近采样点
  326. nearest_indices = find_nearest_sampling_points_optimized(land_centers_df, df_xls)
  327. # 批量计算Cd含量值
  328. centers = list(zip(land_centers_df['center_lon'], land_centers_df['center_lat']))
  329. cd_values = df_xls.iloc[nearest_indices]['Cd (ug/L)'].values * Num
  330. calculation_time = time() - start_time
  331. logger.info(f"Cd值计算完成,耗时: {calculation_time:.2f}秒")
  332. logger.info(f"处理了 {len(centers)} 个土地中心点")
  333. # 创建简化数据DataFrame
  334. simplified_data = pd.DataFrame({
  335. 'lon': [c[0] for c in centers],
  336. 'lat': [c[1] for c in centers],
  337. 'Prediction': cd_values
  338. })
  339. # 应用3σ原则检测异常值
  340. mean_value = simplified_data['Prediction'].mean()
  341. std_value = simplified_data['Prediction'].std()
  342. lower_bound = mean_value - 3 * std_value
  343. upper_bound = mean_value + 3 * std_value
  344. logger.info(f"Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
  345. logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
  346. # 识别异常值
  347. outliers = simplified_data[
  348. (simplified_data['Prediction'] < lower_bound) |
  349. (simplified_data['Prediction'] > upper_bound)
  350. ]
  351. logger.info(f"检测到异常值数量: {len(outliers)}")
  352. # 创建清洗后的数据
  353. cleaned_data = simplified_data[
  354. (simplified_data['Prediction'] >= lower_bound) &
  355. (simplified_data['Prediction'] <= upper_bound)
  356. ]
  357. logger.info(f"清洗后数据点数: {len(cleaned_data)}")
  358. # 可选:保存CSV文件用于兼容性和调试
  359. cleaned_csv_path = None
  360. if save_csv:
  361. # 创建输出目录
  362. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  363. os.makedirs(data_dir, exist_ok=True)
  364. logger.info(f"数据目录: {data_dir}")
  365. cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  366. cleaned_data.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
  367. logger.info(f"保存CSV: {cleaned_csv_path}")
  368. else:
  369. logger.info("跳过CSV文件生成(内存处理优化)")
  370. return cleaned_data, Num, cleaned_csv_path # 返回DataFrame, 系数, 和CSV路径(如果生成了)
  371. # 可视化函数(完全使用统一的MappingUtils接口)
  372. def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8):
  373. """
  374. 生成栅格地图可视化
  375. @description: 使用统一的MappingUtils接口生成栅格地图
  376. @param shp_path: 边界shapefile路径
  377. @param tif_path: 栅格数据文件路径
  378. @param color_map_name: 色彩方案名称(中文)
  379. @param title_name: 地图标题
  380. @param output_path: 输出图片路径
  381. @param output_size: 图片尺寸
  382. @returns: 输出图片路径或None(失败时)
  383. """
  384. try:
  385. logger.info(f"生成栅格地图: {title_name}")
  386. # 确保输出目录存在
  387. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  388. # 创建MappingUtils实例
  389. mapper = MappingUtils()
  390. # 转换颜色方案名称
  391. colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
  392. # 直接调用统一的绘图接口,无需额外包装
  393. generated_path = mapper.create_raster_map(
  394. shp_path=shp_path,
  395. tif_path=tif_path,
  396. output_path=os.path.splitext(output_path)[0], # 去掉扩展名,MappingUtils会自动添加.jpg
  397. colormap=colormap_key,
  398. title=title_name,
  399. output_size=output_size,
  400. figsize=None,
  401. dpi=300,
  402. enable_interpolation=False,
  403. interpolation_method='linear'
  404. )
  405. # 如果生成的路径与期望路径不同,则重命名
  406. if generated_path != output_path and os.path.exists(generated_path):
  407. shutil.move(generated_path, output_path)
  408. logger.info(f"重命名图片: {generated_path} -> {output_path}")
  409. return output_path
  410. return generated_path
  411. except Exception as e:
  412. logger.error(f"栅格地图生成失败: {str(e)}")
  413. return None
  414. def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
  415. xlabel='Cd(ug/L)', ylabel='frequency density',
  416. title='Irrigation water input flux'):
  417. """
  418. 生成TIFF文件的直方图
  419. @description: 使用统一的MappingUtils接口生成直方图
  420. @param file_path: TIFF文件路径
  421. @param output_path: 输出图片路径
  422. @param figsize: 图片尺寸元组
  423. @param xlabel: X轴标签
  424. @param ylabel: Y轴标签
  425. @param title: 图表标题
  426. @returns: 输出图片路径或None(失败时)
  427. """
  428. try:
  429. logger.info(f"生成直方图: {file_path}")
  430. # 创建MappingUtils实例
  431. mapper = MappingUtils()
  432. # 直接调用统一的绘图接口,无需额外包装
  433. generated_path = mapper.create_histogram(
  434. file_path=file_path,
  435. save_path=output_path,
  436. figsize=figsize,
  437. xlabel=xlabel,
  438. ylabel=ylabel,
  439. title=title,
  440. bins=100,
  441. dpi=300
  442. )
  443. return generated_path
  444. except Exception as e:
  445. logger.error(f"直方图生成失败: {str(e)}")
  446. return None
  447. def process_land_to_visualization(land_types=None, land_type=None, coefficient_params=None,
  448. color_map_name="绿-黄-红-紫",
  449. output_size=8,
  450. area: Optional[str] = None,
  451. level: Optional[str] = None,
  452. enable_interpolation: Optional[bool] = True,
  453. interpolation_method: Optional[str] = "linear",
  454. resolution_factor: Optional[float] = 4.0,
  455. save_csv: Optional[bool] = True,
  456. cleanup_temp_files: Optional[bool] = True):
  457. """
  458. 完整的土地数据处理可视化流程(支持单个或多个土地类型,使用统一的MappingUtils接口,支持动态边界和插值控制)
  459. @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
  460. 支持通过area和level参数动态控制地图边界,支持插值控制参数,支持多个土地类型合并处理
  461. 工作流程:
  462. 1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量(支持多个类型合并)
  463. 2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
  464. 3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
  465. 4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
  466. 5. 生成直方图 - 直接调用MappingUtils.create_histogram()
  467. @param land_types: 土地类型列表(支持多个,如['水田', '旱地']),优先使用此参数
  468. @param land_type: 单个土地类型(向后兼容,如果land_types为None则使用此参数)
  469. @param coefficient_params: 可选的系数参数字典
  470. @param color_map_name: 色彩方案名称(中文)
  471. @param output_size: 输出图片尺寸
  472. @param area: 可选的地区名称,如"乐昌市",用于动态控制地图边界
  473. @param level: 可选的行政层级,county | city | province
  474. @param enable_interpolation: 是否启用空间插值,默认True
  475. @param interpolation_method: 插值方法,nearest | linear | cubic,默认linear
  476. @param resolution_factor: 分辨率因子,默认4.0,越大分辨率越高
  477. @param save_csv: 是否生成CSV文件,默认True
  478. @param cleanup_temp_files: 是否清理临时文件,默认True
  479. @returns: 包含所有生成文件路径的元组
  480. """
  481. base_dir = get_base_dir()
  482. # 处理参数兼容性 - 支持单个和多个土地类型
  483. if land_types is not None:
  484. # 使用新的多类型参数,并标准化顺序
  485. input_types = land_types if isinstance(land_types, list) else [land_types]
  486. actual_land_types = standardize_land_types_order(input_types)
  487. logger.info(f"开始处理多个土地类型: {', '.join(actual_land_types)}")
  488. is_multiple_types = len(actual_land_types) > 1
  489. elif land_type is not None:
  490. # 向后兼容单个类型参数
  491. actual_land_types = [land_type.strip()]
  492. logger.info(f"开始处理单个土地类型: {land_type}")
  493. is_multiple_types = False
  494. else:
  495. raise ValueError("必须提供land_types或land_type参数")
  496. # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
  497. if is_multiple_types:
  498. # 使用多类型处理函数
  499. cleaned_data, used_coeff, cleaned_csv_path = process_multiple_land_data(
  500. actual_land_types,
  501. coefficient_params,
  502. save_csv=save_csv
  503. )
  504. else:
  505. # 使用单类型处理函数
  506. cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
  507. actual_land_types[0],
  508. coefficient_params,
  509. save_csv=save_csv
  510. )
  511. if cleaned_data is None:
  512. logger.error(f"处理土地数据失败: {', '.join(actual_land_types)}")
  513. return None, None, None, None, None, None
  514. # 2. 获取动态边界数据(提前获取用于栅格转换)
  515. boundary_gdf = None
  516. if area and level:
  517. logger.info(f"尝试获取动态边界: area='{area}', level='{level}'")
  518. boundary_gdf = get_boundary_gdf_from_database(area, level)
  519. if boundary_gdf is not None:
  520. logger.info(f"成功获取动态边界用于栅格转换: {area} ({level}) - 包含 {len(boundary_gdf)} 个几何体")
  521. logger.info(f"边界数据范围: {boundary_gdf.bounds.to_dict('records')[0] if len(boundary_gdf) > 0 else 'N/A'}")
  522. else:
  523. logger.warning(f"未能获取动态边界,栅格转换将使用完整范围: {area} ({level})")
  524. logger.warning("可能的原因: 1) 数据库中没有该地区数据 2) 地区名称拼写错误 3) level参数不正确")
  525. else:
  526. logger.info("未指定area和level参数,尝试获取默认边界(乐昌市)")
  527. boundary_gdf = get_boundary_gdf_from_database("乐昌市", "county")
  528. if boundary_gdf is not None:
  529. logger.info(f"成功获取默认边界(乐昌市)用于栅格转换 - 包含 {len(boundary_gdf)} 个几何体")
  530. else:
  531. logger.info("未能获取默认边界,栅格转换将使用数据点范围")
  532. # 3. 使用dataframe_to_raster_workflow转换为GeoTIFF(内存处理,支持动态边界)
  533. raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
  534. template_tif = os.path.join(raster_dir, "meanTemp.tif")
  535. output_dir = raster_dir # 直接使用raster目录作为输出目录
  536. # 调用dataframe_to_raster_workflow,支持动态边界和插值控制(内存处理优化)
  537. workflow_result = dataframe_to_raster_workflow(
  538. df=cleaned_data, # 直接使用DataFrame
  539. template_tif=template_tif,
  540. output_dir=output_dir,
  541. boundary_gdf=boundary_gdf, # 直接使用GeoDataFrame
  542. resolution_factor=resolution_factor, # 可配置的分辨率因子
  543. interpolation_method=interpolation_method, # 可配置的插值方法
  544. field_name='Prediction',
  545. lon_col=0, # DataFrame中经度列索引
  546. lat_col=1, # DataFrame中纬度列索引
  547. value_col=2, # DataFrame中数值列索引
  548. enable_interpolation=enable_interpolation # 可配置的插值开关
  549. )
  550. # 获取输出的栅格文件路径
  551. output_tif = workflow_result['raster']
  552. logger.info(f"生成栅格文件: {output_tif}")
  553. # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
  554. # 生成输出文件名 - 支持多个土地类型
  555. combined_type_name = '_'.join(actual_land_types)
  556. map_output = os.path.join(raster_dir, f"{combined_type_name}_Cd含量地图.jpg")
  557. try:
  558. # 创建MappingUtils实例并直接调用
  559. mapper = MappingUtils()
  560. # 转换颜色方案名称
  561. colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
  562. # 统一使用boundary_gdf,不再依赖Shapefile
  563. map_result = mapper.create_raster_map(
  564. shp_path=None, # 不使用Shapefile
  565. tif_path=output_tif,
  566. output_path=os.path.splitext(map_output)[0], # 去掉扩展名
  567. colormap=colormap_key,
  568. title="Irrigation water input flux",
  569. output_size=output_size,
  570. dpi=300,
  571. enable_interpolation=enable_interpolation, # 可配置的插值开关
  572. interpolation_method=interpolation_method, # 可配置的插值方法
  573. boundary_gdf=boundary_gdf # 使用从数据库获取的边界数据,如果为None则使用数据点范围
  574. )
  575. # 如果生成的路径与期望不同,则重命名
  576. if map_result != map_output and os.path.exists(map_result):
  577. shutil.move(map_result, map_output)
  578. logger.info(f"栅格地图已生成: {map_output}")
  579. except Exception as e:
  580. logger.error(f"栅格地图生成失败: {str(e)}")
  581. # 5. 生成直方图(直接使用统一的MappingUtils接口)
  582. hist_output = os.path.join(raster_dir, f"{combined_type_name}_Cd含量直方图.jpg")
  583. try:
  584. # 直接调用统一绘图接口生成直方图
  585. hist_result = mapper.create_histogram(
  586. file_path=output_tif,
  587. save_path=hist_output,
  588. figsize=(8, 8),
  589. xlabel='Cd(ug/L)',
  590. ylabel='frequency density',
  591. title='Irrigation water input flux',
  592. bins=100,
  593. dpi=300
  594. )
  595. if hist_result:
  596. logger.info(f"直方图已生成: {hist_output}")
  597. except Exception as e:
  598. logger.error(f"直方图生成失败: {str(e)}")
  599. # 使用实际的CSV路径(如果生成了)或生成兼容性路径
  600. if cleaned_csv_path:
  601. cleaned_csv = cleaned_csv_path
  602. else:
  603. # 为了兼容性,生成路径(即使文件不存在)
  604. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  605. cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{combined_type_name}_清洗.csv")
  606. # 清理临时文件(如果启用)
  607. if cleanup_temp_files:
  608. logger.info("开始清理临时文件...")
  609. # 要清理的临时文件列表
  610. temp_files_to_cleanup = []
  611. # 添加临时栅格文件(如果是memory_raster_开头的)
  612. if output_tif and 'memory_raster_' in os.path.basename(output_tif):
  613. temp_files_to_cleanup.append(output_tif)
  614. # 添加临时shapefile(如果存在且是临时文件)
  615. temp_shapefile = workflow_result.get('shapefile')
  616. if temp_shapefile and ('temp' in temp_shapefile.lower() or 'memory' in temp_shapefile.lower()):
  617. temp_files_to_cleanup.append(temp_shapefile)
  618. # 如果不保存CSV,也清理CSV文件
  619. if not save_csv and cleaned_csv_path and os.path.exists(cleaned_csv_path):
  620. temp_files_to_cleanup.append(cleaned_csv_path)
  621. # 执行清理
  622. if temp_files_to_cleanup:
  623. cleanup_temporary_files(*temp_files_to_cleanup)
  624. logger.info(f"已清理 {len(temp_files_to_cleanup)} 个临时文件")
  625. # 如果清理了栅格文件,将返回路径设为None以避免引用已删除的文件
  626. if output_tif in temp_files_to_cleanup:
  627. output_tif = None
  628. logger.info("注意:临时栅格文件已被清理,返回的栅格路径为None")
  629. else:
  630. logger.info("没有临时文件需要清理")
  631. return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
  632. def get_land_statistics(land_type: str) -> Optional[Dict[str, Any]]:
  633. """
  634. 获取指定土地类型的Cd预测结果统计信息
  635. @param {str} land_type - 土地类型(水田、旱地或水浇地)
  636. @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
  637. """
  638. try:
  639. logger.info(f"获取土地类型统计信息: {land_type}")
  640. # 获取基础目录
  641. base_dir = get_base_dir()
  642. data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
  643. # 构建数据文件路径
  644. data_file = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
  645. logger.info(f"数据文件路径: {data_file}")
  646. if not os.path.exists(data_file):
  647. logger.warning(f"未找到土地类型数据文件: {data_file}")
  648. return None
  649. # 读取预测数据
  650. df = pd.read_csv(data_file)
  651. logger.info(f"成功读取数据文件,包含 {len(df)} 行数据")
  652. # 检查必要的列是否存在
  653. if 'Prediction' not in df.columns:
  654. logger.warning("数据文件中缺少'Prediction'列")
  655. return None
  656. predictions = df['Prediction']
  657. # 计算基础统计信息
  658. stats = {
  659. "土地类型": land_type,
  660. "数据更新时间": datetime.fromtimestamp(os.path.getmtime(data_file)).strftime('%Y-%m-%d %H:%M:%S'),
  661. "数据点总数": len(predictions),
  662. "均值": float(predictions.mean()),
  663. "中位数": float(predictions.median()),
  664. "标准差": float(predictions.std()),
  665. "最小值": float(predictions.min()),
  666. "最大值": float(predictions.max()),
  667. "25%分位数": float(predictions.quantile(0.25)),
  668. "75%分位数": float(predictions.quantile(0.75)),
  669. "偏度": float(predictions.skew()),
  670. "峰度": float(predictions.kurtosis())
  671. }
  672. return stats
  673. except Exception as e:
  674. logger.error(f"获取土地类型统计信息失败: {str(e)}")
  675. return None
  676. # 主函数
  677. def main():
  678. """主函数用于测试完整的处理流程"""
  679. logger.info("=" * 50)
  680. logger.info("土地数据处理与可视化系统")
  681. logger.info("=" * 50)
  682. try:
  683. # 处理水田数据
  684. logger.info("\n===== 处理水田数据 =====")
  685. results = process_land_to_visualization("水田")
  686. if results and all(results):
  687. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  688. logger.info(f"清洗后CSV: {cleaned_csv}")
  689. logger.info(f"Shapefile: {shapefile}")
  690. logger.info(f"GeoTIFF文件: {tif_file}")
  691. logger.info(f"地图图片: {map_file}")
  692. logger.info(f"直方图: {hist_file}")
  693. logger.info(f"使用系数: {used_coeff}")
  694. else:
  695. logger.error("水田数据处理失败")
  696. # 处理旱地数据(使用自定义参数)
  697. logger.info("\n===== 处理旱地数据 =====")
  698. custom_params = {"旱地": (220, 0.65)}
  699. results = process_land_to_visualization(
  700. "旱地",
  701. coefficient_params=custom_params,
  702. color_map_name="蓝色系"
  703. )
  704. if results and all(results):
  705. cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
  706. logger.info(f"清洗后CSV: {cleaned_csv}")
  707. logger.info(f"Shapefile: {shapefile}")
  708. logger.info(f"GeoTIFF文件: {tif_file}")
  709. logger.info(f"地图图片: {map_file}")
  710. logger.info(f"直方图: {hist_file}")
  711. logger.info(f"使用系数: {used_coeff}")
  712. else:
  713. logger.error("旱地数据处理失败")
  714. except Exception as e:
  715. logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
  716. finally:
  717. # 清理临时文件
  718. base_dir = get_base_dir()
  719. raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
  720. cleaned_count = cleanup_temp_files_in_directory(raster_dir)
  721. if cleaned_count > 0:
  722. logger.info(f"已清理 {cleaned_count} 个临时文件")
  723. logger.info("处理完成")
  724. if __name__ == "__main__":
  725. main()