123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895 |
- import os
- import geopandas as gpd
- import pandas as pd
- from pyproj import Transformer
- from shapely.geometry import Point
- import rasterio
- from typing import Optional, Dict, Any, List
- from datetime import datetime
- import numpy as np
- import logging
- import shutil
- import sys
- from sklearn.neighbors import BallTree
- from time import time
- # 导入MappingUtils
- from ..utils.mapping_utils import MappingUtils, csv_to_raster_workflow, dataframe_to_raster_workflow
- # 导入土地利用服务
- from .land_use_service import land_use_service
- # 导入边界服务
- from .admin_boundary_service import get_boundary_gdf_by_name
- from ..database import SessionLocal
- import geopandas as gpd
- import tempfile
- # 配置日志
- logger = logging.getLogger(__name__)
- # 设置全局字体
- import matplotlib.pyplot as plt
- plt.rcParams['font.family'] = 'Arial'
- plt.rcParams['axes.unicode_minus'] = False
- plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
- # 设定常用的colormap(与mapping_utils保持一致)
- COLORMAPS = {
- "yellow_orange_brown": ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'],
- "blue_series": ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'],
- "yellow_green": ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'],
- "green_brown": ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'],
- "yellow_pink_purple": ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'],
- "green_yellow_red_purple": ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F']
- }
- # 建立中文到英文的colormap映射
- COLORMAP_MAPPING = {
- "黄-橙-棕": "yellow_orange_brown",
- "蓝色系": "blue_series",
- "淡黄-草绿": "yellow_green",
- "绿色-棕色": "green_brown",
- "黄-粉-紫": "yellow_pink_purple",
- "绿-黄-红-紫": "green_yellow_red_purple"
- }
- # 主路径配置
- def get_base_dir():
- """获取基础目录路径"""
- if getattr(sys, 'frozen', False):
- # 打包后的可执行文件
- return os.path.dirname(sys.executable)
- else:
- # 脚本运行模式
- return os.path.dirname(os.path.abspath(__file__))
- def standardize_land_types_order(land_types: List[str]) -> List[str]:
- """
- 标准化土地类型顺序,确保文件名一致性
-
- @param land_types: 土地类型列表
- @returns: 按标准顺序排序的土地类型列表
- """
- # 定义标准顺序
- standard_order = ["水田", "旱地", "水浇地"]
-
- # 清理并标准化
- cleaned_types = [lt.strip() for lt in land_types if lt.strip()]
-
- # 按标准顺序排序
- standardized_types = sorted(
- cleaned_types,
- key=lambda x: standard_order.index(x) if x in standard_order else 999
- )
-
- return standardized_types
- def get_boundary_gdf_from_database(area: str, level: str) -> Optional[gpd.GeoDataFrame]:
- """
- 直接从数据库获取边界数据作为GeoDataFrame
-
- @description: 与其他模块保持一致的边界获取方法
- @param area: 地区名称
- @param level: 行政层级(county | city | province)
- @returns: 边界GeoDataFrame或None
- """
- try:
- with SessionLocal() as db:
- norm_area = area.strip()
- boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
- if boundary_gdf is not None:
- logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
- return boundary_gdf
-
- except Exception as e:
- logger.warning(f"从数据库获取边界数据失败: {str(e)}")
-
- return None
- def find_nearest_sampling_points_optimized(land_centers_df: pd.DataFrame,
- sampling_points_df: pd.DataFrame) -> np.ndarray:
- """
- 使用BallTree高效计算每个土地中心点的最近采样点
-
- @description: 使用空间索引优化最近邻搜索,将O(n×m)复杂度降低到O(n×log(m))
-
- @param land_centers_df: 土地中心点数据,包含center_lon和center_lat列
- @param sampling_points_df: 采样点数据,包含经度和纬度列
- @returns: 每个土地中心点对应的最近采样点索引数组
- """
- logger.info("开始构建空间索引优化最近邻搜索...")
-
- start_time = time()
-
- # 1. 准备采样点坐标数据(转换为弧度用于BallTree)
- sampling_coords = np.radians(sampling_points_df[['经度', '纬度']].values)
-
- # 2. 构建BallTree空间索引
- logger.info(f"构建BallTree索引,采样点数量: {len(sampling_coords)}")
- tree = BallTree(sampling_coords, metric='haversine')
-
- # 3. 准备土地中心点坐标数据
- land_coords = np.radians(land_centers_df[['center_lon', 'center_lat']].values)
-
- # 4. 批量查询最近邻(k=1表示只找最近的一个点)
- logger.info(f"批量查询最近邻,土地中心点数量: {len(land_coords)}")
- distances, indices = tree.query(land_coords, k=1)
-
- # 5. 提取索引(indices是二维数组,我们只需要第一列)
- nearest_indices = indices.flatten()
-
- elapsed_time = time() - start_time
- logger.info(f"空间索引搜索完成,耗时: {elapsed_time:.2f}秒")
- logger.info(f"平均每个点查询时间: {elapsed_time/len(land_coords)*1000:.2f}毫秒")
-
- return nearest_indices
- def cleanup_temporary_files(*file_paths):
- """
- 清理临时文件
-
- @description: 安全地删除指定的临时文件,支持多种文件类型
- @param file_paths: 要删除的文件路径(可变参数)
- """
- import tempfile
-
- for file_path in file_paths:
- if not file_path:
- continue
-
- try:
- if os.path.exists(file_path) and os.path.isfile(file_path):
- os.remove(file_path)
- logger.info(f"已清理临时文件: {os.path.basename(file_path)}")
-
- # 如果是shapefile,也删除相关的配套文件
- if file_path.endswith('.shp'):
- base_path = os.path.splitext(file_path)[0]
- for ext in ['.shx', '.dbf', '.prj', '.cpg']:
- related_file = base_path + ext
- if os.path.exists(related_file):
- os.remove(related_file)
- logger.info(f"已清理相关文件: {os.path.basename(related_file)}")
-
- except Exception as e:
- logger.warning(f"清理文件失败 {file_path}: {str(e)}")
- def cleanup_temp_files_in_directory(directory: str, patterns: List[str] = None) -> int:
- """
- 清理指定目录下的临时文件
-
- @description: 根据文件名模式清理目录中的临时文件
- @param directory: 要清理的目录路径
- @param patterns: 文件名模式列表,默认为['memory_raster_', 'temp_', 'tmp_']
- @returns: 清理的文件数量
- """
- if patterns is None:
- patterns = ['memory_raster_', 'temp_', 'tmp_']
-
- if not os.path.exists(directory) or not os.path.isdir(directory):
- logger.warning(f"目录不存在或不是有效目录: {directory}")
- return 0
-
- cleaned_count = 0
-
- try:
- for filename in os.listdir(directory):
- file_path = os.path.join(directory, filename)
-
- # 检查是否是文件
- if not os.path.isfile(file_path):
- continue
-
- # 检查文件名是否匹配任何模式
- should_clean = any(pattern in filename for pattern in patterns)
-
- if should_clean:
- try:
- os.remove(file_path)
- logger.info(f"已清理临时文件: {filename}")
- cleaned_count += 1
- except Exception as e:
- logger.warning(f"清理文件失败 {filename}: {str(e)}")
-
- except Exception as e:
- logger.error(f"清理目录失败 {directory}: {str(e)}")
-
- return cleaned_count
- # 土地数据处理函数
- def process_land_data(land_type, coefficient_params=None, save_csv=True):
- """处理单个土地类型数据并生成清洗后的简化数据"""
- base_dir = get_base_dir()
- xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
- logger.info(f"处理土地类型: {land_type}")
- logger.info(f"Excel文件: {xls_file}")
- # 从数据库获取土地利用数据,替代原来的SHP文件读取
- logger.info(f"从数据库获取 '{land_type}' 类型的土地利用数据...")
- land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
-
- if land_centers_df is None or land_centers_df.empty:
- logger.warning(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
- return None, None, None
- logger.info(f"从数据库获取到 {len(land_centers_df)} 个 '{land_type}' 类型的土地数据")
- logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
- # 调用辅助函数完成处理
- return complete_process_land_data(land_type, land_centers_df, coefficient_params, save_csv, base_dir)
- def process_multiple_land_data(land_types, coefficient_params=None, save_csv=True):
- """
- 处理多个土地类型数据并生成合并的清洗后简化数据
-
- @param land_types: 土地类型列表
- @param coefficient_params: 系数参数字典,格式为 {land_type: (param1, param2)}
- @param save_csv: 是否保存CSV文件
- @returns: (cleaned_data, combined_coeff_info, cleaned_csv_path)
- """
- base_dir = get_base_dir()
- xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
- logger.info(f"处理多个土地类型: {', '.join(land_types)}")
- logger.info(f"Excel文件: {xls_file}")
- # 从数据库获取多个土地类型的合并数据
- logger.info(f"从数据库获取多个土地类型的土地利用数据...")
- land_centers_df = land_use_service.get_multiple_land_centers_for_processing(land_types)
-
- if land_centers_df is None or land_centers_df.empty:
- logger.warning(f"数据库中没有找到任何指定土地类型的数据: {land_types}")
- return None, None, None
- logger.info(f"从数据库获取到 {len(land_centers_df)} 个合并的土地数据点")
- logger.info(f"预计需要进行 {len(land_centers_df)} 次最近邻搜索,使用高性能算法处理...")
- # 读取Excel采样点数据
- if not os.path.exists(xls_file):
- logger.error(f"采样点Excel文件不存在: {xls_file}")
- return None, None, None
-
- df_xls = pd.read_excel(xls_file)
- logger.info(f"读取到 {len(df_xls)} 个采样点数据")
- # 设置默认土地类型系数
- default_params = {
- "水田": (711, 0.524),
- "水浇地": (427, 0.599),
- "旱地": (200, 0.7)
- }
- params = coefficient_params or default_params
-
- # 为多个土地类型构建系数信息
- combined_coeff_info = {}
-
- # 高效处理:使用空间索引查找最近采样点
- logger.info("开始高效距离计算和Cd值计算...")
- start_time = time()
-
- # 使用优化的空间索引方法查找最近采样点
- nearest_indices = find_nearest_sampling_points_optimized(land_centers_df, df_xls)
-
- # 批量计算Cd含量值,按土地类型应用不同系数
- centers = list(zip(land_centers_df['center_lon'], land_centers_df['center_lat']))
- cd_values = []
-
- for i, (_, row) in enumerate(land_centers_df.iterrows()):
- land_type = row['land_type']
- param1, param2 = params.get(land_type, (200, 0.7))
- Num = param1 * param2
-
- # 记录每种土地类型使用的系数
- if land_type not in combined_coeff_info:
- combined_coeff_info[land_type] = {
- 'param1': param1,
- 'param2': param2,
- 'multiplier': Num,
- 'count': 0
- }
- combined_coeff_info[land_type]['count'] += 1
-
- # 计算该点的Cd值
- cd_value = df_xls.iloc[nearest_indices[i]]['Cd (ug/L)'] * Num
- cd_values.append(cd_value)
-
- calculation_time = time() - start_time
- logger.info(f"Cd值计算完成,耗时: {calculation_time:.2f}秒")
- logger.info(f"处理了 {len(centers)} 个土地中心点")
-
- # 记录各土地类型的系数使用情况
- for land_type, info in combined_coeff_info.items():
- logger.info(f"{land_type}: 系数 {info['param1']} * {info['param2']} = {info['multiplier']}, 应用于 {info['count']} 个点")
- # 创建简化数据DataFrame
- simplified_data = pd.DataFrame({
- 'lon': [c[0] for c in centers],
- 'lat': [c[1] for c in centers],
- 'Prediction': cd_values,
- 'land_type': land_centers_df['land_type'].values # 保留土地类型信息用于分析
- })
- # 应用3σ原则检测异常值
- mean_value = simplified_data['Prediction'].mean()
- std_value = simplified_data['Prediction'].std()
- lower_bound = mean_value - 3 * std_value
- upper_bound = mean_value + 3 * std_value
- logger.info(f"合并数据Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
- logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
- # 识别异常值
- outliers = simplified_data[
- (simplified_data['Prediction'] < lower_bound) |
- (simplified_data['Prediction'] > upper_bound)
- ]
- logger.info(f"检测到异常值数量: {len(outliers)}")
- # 创建清洗后的数据
- cleaned_data = simplified_data[
- (simplified_data['Prediction'] >= lower_bound) &
- (simplified_data['Prediction'] <= upper_bound)
- ]
- logger.info(f"清洗后数据点数: {len(cleaned_data)}")
- # 可选:保存CSV文件用于兼容性和调试
- cleaned_csv_path = None
- if save_csv:
- # 创建输出目录
- data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
- os.makedirs(data_dir, exist_ok=True)
- logger.info(f"数据目录: {data_dir}")
-
- # 使用组合的土地类型名称
- combined_type_name = '_'.join(land_types)
- cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{combined_type_name}_清洗.csv")
-
- # 移除land_type列(仅用于内部处理),保持与原始格式兼容
- cleaned_data_for_csv = cleaned_data[['lon', 'lat', 'Prediction']].copy()
- cleaned_data_for_csv.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
- logger.info(f"保存CSV: {cleaned_csv_path}")
- else:
- logger.info("跳过CSV文件生成(内存处理优化)")
- # 返回清洗后的数据(移除land_type列保持兼容性)
- final_cleaned_data = cleaned_data[['lon', 'lat', 'Prediction']].copy()
-
- return final_cleaned_data, combined_coeff_info, cleaned_csv_path
- # 继续完成原始的process_land_data函数
- def complete_process_land_data(land_type, land_centers_df, coefficient_params, save_csv, base_dir):
- """完成单个土地类型的数据处理"""
- xls_file = os.path.join(base_dir, "..", "static", "water", "Data", "Irrigation_water_SamplingPoint.xlsx")
-
- # 读取Excel采样点数据
- if not os.path.exists(xls_file):
- logger.error(f"采样点Excel文件不存在: {xls_file}")
- return None, None, None
-
- df_xls = pd.read_excel(xls_file)
- logger.info(f"读取到 {len(df_xls)} 个采样点数据")
- # 设置土地类型系数
- default_params = {
- "水田": (711, 0.524),
- "水浇地": (427, 0.599),
- "旱地": (200, 0.7)
- }
- params = coefficient_params or default_params
- param1, param2 = params.get(land_type, (200, 0.7))
- Num = param1 * param2
- logger.info(f"系数: {param1} * {param2} = {Num}")
- # 高效处理:使用空间索引查找最近采样点
- logger.info("开始高效距离计算和Cd值计算...")
- start_time = time()
-
- # 使用优化的空间索引方法查找最近采样点
- nearest_indices = find_nearest_sampling_points_optimized(land_centers_df, df_xls)
-
- # 批量计算Cd含量值
- centers = list(zip(land_centers_df['center_lon'], land_centers_df['center_lat']))
- cd_values = df_xls.iloc[nearest_indices]['Cd (ug/L)'].values * Num
-
- calculation_time = time() - start_time
- logger.info(f"Cd值计算完成,耗时: {calculation_time:.2f}秒")
- logger.info(f"处理了 {len(centers)} 个土地中心点")
- # 创建简化数据DataFrame
- simplified_data = pd.DataFrame({
- 'lon': [c[0] for c in centers],
- 'lat': [c[1] for c in centers],
- 'Prediction': cd_values
- })
- # 应用3σ原则检测异常值
- mean_value = simplified_data['Prediction'].mean()
- std_value = simplified_data['Prediction'].std()
- lower_bound = mean_value - 3 * std_value
- upper_bound = mean_value + 3 * std_value
- logger.info(f"Cd含量 - 平均值: {mean_value:.4f}, 标准差: {std_value:.4f}")
- logger.info(f"检测范围: 下限 = {lower_bound:.4f}, 上限 = {upper_bound:.4f}")
- # 识别异常值
- outliers = simplified_data[
- (simplified_data['Prediction'] < lower_bound) |
- (simplified_data['Prediction'] > upper_bound)
- ]
- logger.info(f"检测到异常值数量: {len(outliers)}")
- # 创建清洗后的数据
- cleaned_data = simplified_data[
- (simplified_data['Prediction'] >= lower_bound) &
- (simplified_data['Prediction'] <= upper_bound)
- ]
- logger.info(f"清洗后数据点数: {len(cleaned_data)}")
- # 可选:保存CSV文件用于兼容性和调试
- cleaned_csv_path = None
- if save_csv:
- # 创建输出目录
- data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
- os.makedirs(data_dir, exist_ok=True)
- logger.info(f"数据目录: {data_dir}")
-
- cleaned_csv_path = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
- cleaned_data.to_csv(cleaned_csv_path, index=False, encoding='utf-8-sig')
- logger.info(f"保存CSV: {cleaned_csv_path}")
- else:
- logger.info("跳过CSV文件生成(内存处理优化)")
- return cleaned_data, Num, cleaned_csv_path # 返回DataFrame, 系数, 和CSV路径(如果生成了)
- # 可视化函数(完全使用统一的MappingUtils接口)
- def mapping_raster(shp_path, tif_path, color_map_name, title_name, output_path, output_size=8):
- """
- 生成栅格地图可视化
-
- @description: 使用统一的MappingUtils接口生成栅格地图
- @param shp_path: 边界shapefile路径
- @param tif_path: 栅格数据文件路径
- @param color_map_name: 色彩方案名称(中文)
- @param title_name: 地图标题
- @param output_path: 输出图片路径
- @param output_size: 图片尺寸
- @returns: 输出图片路径或None(失败时)
- """
- try:
- logger.info(f"生成栅格地图: {title_name}")
- # 确保输出目录存在
- os.makedirs(os.path.dirname(output_path), exist_ok=True)
- # 创建MappingUtils实例
- mapper = MappingUtils()
- # 转换颜色方案名称
- colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
- # 直接调用统一的绘图接口,无需额外包装
- generated_path = mapper.create_raster_map(
- shp_path=shp_path,
- tif_path=tif_path,
- output_path=os.path.splitext(output_path)[0], # 去掉扩展名,MappingUtils会自动添加.jpg
- colormap=colormap_key,
- title=title_name,
- output_size=output_size,
- figsize=None,
- dpi=300,
- enable_interpolation=False,
- interpolation_method='linear'
- )
- # 如果生成的路径与期望路径不同,则重命名
- if generated_path != output_path and os.path.exists(generated_path):
- shutil.move(generated_path, output_path)
- logger.info(f"重命名图片: {generated_path} -> {output_path}")
- return output_path
-
- return generated_path
- except Exception as e:
- logger.error(f"栅格地图生成失败: {str(e)}")
- return None
- def plot_tif_histogram(file_path, output_path, figsize=(8, 8),
- xlabel='Cd(ug/L)', ylabel='frequency density',
- title='Irrigation water input flux'):
- """
- 生成TIFF文件的直方图
-
- @description: 使用统一的MappingUtils接口生成直方图
- @param file_path: TIFF文件路径
- @param output_path: 输出图片路径
- @param figsize: 图片尺寸元组
- @param xlabel: X轴标签
- @param ylabel: Y轴标签
- @param title: 图表标题
- @returns: 输出图片路径或None(失败时)
- """
- try:
- logger.info(f"生成直方图: {file_path}")
- # 创建MappingUtils实例
- mapper = MappingUtils()
- # 直接调用统一的绘图接口,无需额外包装
- generated_path = mapper.create_histogram(
- file_path=file_path,
- save_path=output_path,
- figsize=figsize,
- xlabel=xlabel,
- ylabel=ylabel,
- title=title,
- bins=100,
- dpi=300
- )
- return generated_path
- except Exception as e:
- logger.error(f"直方图生成失败: {str(e)}")
- return None
- def process_land_to_visualization(land_types=None, land_type=None, coefficient_params=None,
- color_map_name="绿-黄-红-紫",
- output_size=8,
- area: Optional[str] = None,
- level: Optional[str] = None,
- enable_interpolation: Optional[bool] = True,
- interpolation_method: Optional[str] = "linear",
- resolution_factor: Optional[float] = 4.0,
- save_csv: Optional[bool] = True,
- cleanup_temp_files: Optional[bool] = True):
- """
- 完整的土地数据处理可视化流程(支持单个或多个土地类型,使用统一的MappingUtils接口,支持动态边界和插值控制)
-
- @description: 水样数据处理与可视化的完整工作流程,已优化为完全使用统一的绘图接口,
- 支持通过area和level参数动态控制地图边界,支持插值控制参数,支持多个土地类型合并处理
-
- 工作流程:
- 1. 生成清洗后CSV - 从数据库获取土地利用数据并计算Cd含量(支持多个类型合并)
- 2. 获取动态边界 - 根据area和level参数从数据库获取边界数据
- 3. 使用csv_to_raster_workflow转换为GeoTIFF - 调用统一的栅格转换工具,支持插值控制
- 4. 生成栅格地图 - 直接调用MappingUtils.create_raster_map(),支持动态边界
- 5. 生成直方图 - 直接调用MappingUtils.create_histogram()
- @param land_types: 土地类型列表(支持多个,如['水田', '旱地']),优先使用此参数
- @param land_type: 单个土地类型(向后兼容,如果land_types为None则使用此参数)
- @param coefficient_params: 可选的系数参数字典
- @param color_map_name: 色彩方案名称(中文)
- @param output_size: 输出图片尺寸
- @param area: 可选的地区名称,如"乐昌市",用于动态控制地图边界
- @param level: 可选的行政层级,county | city | province
- @param enable_interpolation: 是否启用空间插值,默认True
- @param interpolation_method: 插值方法,nearest | linear | cubic,默认linear
- @param resolution_factor: 分辨率因子,默认4.0,越大分辨率越高
- @param save_csv: 是否生成CSV文件,默认True
- @param cleanup_temp_files: 是否清理临时文件,默认True
- @returns: 包含所有生成文件路径的元组
- """
- base_dir = get_base_dir()
-
- # 处理参数兼容性 - 支持单个和多个土地类型
- if land_types is not None:
- # 使用新的多类型参数,并标准化顺序
- input_types = land_types if isinstance(land_types, list) else [land_types]
- actual_land_types = standardize_land_types_order(input_types)
- logger.info(f"开始处理多个土地类型: {', '.join(actual_land_types)}")
- is_multiple_types = len(actual_land_types) > 1
- elif land_type is not None:
- # 向后兼容单个类型参数
- actual_land_types = [land_type.strip()]
- logger.info(f"开始处理单个土地类型: {land_type}")
- is_multiple_types = False
- else:
- raise ValueError("必须提供land_types或land_type参数")
- # 1. 生成清洗后的数据(内存处理,可选择是否保存CSV)
- if is_multiple_types:
- # 使用多类型处理函数
- cleaned_data, used_coeff, cleaned_csv_path = process_multiple_land_data(
- actual_land_types,
- coefficient_params,
- save_csv=save_csv
- )
- else:
- # 使用单类型处理函数
- cleaned_data, used_coeff, cleaned_csv_path = process_land_data(
- actual_land_types[0],
- coefficient_params,
- save_csv=save_csv
- )
- if cleaned_data is None:
- logger.error(f"处理土地数据失败: {', '.join(actual_land_types)}")
- return None, None, None, None, None, None
- # 2. 获取动态边界数据(提前获取用于栅格转换)
- boundary_gdf = None
- if area and level:
- logger.info(f"尝试获取动态边界: area='{area}', level='{level}'")
- boundary_gdf = get_boundary_gdf_from_database(area, level)
- if boundary_gdf is not None:
- logger.info(f"成功获取动态边界用于栅格转换: {area} ({level}) - 包含 {len(boundary_gdf)} 个几何体")
- logger.info(f"边界数据范围: {boundary_gdf.bounds.to_dict('records')[0] if len(boundary_gdf) > 0 else 'N/A'}")
- else:
- logger.warning(f"未能获取动态边界,栅格转换将使用完整范围: {area} ({level})")
- logger.warning("可能的原因: 1) 数据库中没有该地区数据 2) 地区名称拼写错误 3) level参数不正确")
- else:
- logger.info("未指定area和level参数,尝试获取默认边界(乐昌市)")
- boundary_gdf = get_boundary_gdf_from_database("乐昌市", "county")
- if boundary_gdf is not None:
- logger.info(f"成功获取默认边界(乐昌市)用于栅格转换 - 包含 {len(boundary_gdf)} 个几何体")
- else:
- logger.info("未能获取默认边界,栅格转换将使用数据点范围")
-
- # 3. 使用dataframe_to_raster_workflow转换为GeoTIFF(内存处理,支持动态边界)
- raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
- template_tif = os.path.join(raster_dir, "meanTemp.tif")
- output_dir = raster_dir # 直接使用raster目录作为输出目录
- # 调用dataframe_to_raster_workflow,支持动态边界和插值控制(内存处理优化)
- workflow_result = dataframe_to_raster_workflow(
- df=cleaned_data, # 直接使用DataFrame
- template_tif=template_tif,
- output_dir=output_dir,
- boundary_gdf=boundary_gdf, # 直接使用GeoDataFrame
- resolution_factor=resolution_factor, # 可配置的分辨率因子
- interpolation_method=interpolation_method, # 可配置的插值方法
- field_name='Prediction',
- lon_col=0, # DataFrame中经度列索引
- lat_col=1, # DataFrame中纬度列索引
- value_col=2, # DataFrame中数值列索引
- enable_interpolation=enable_interpolation # 可配置的插值开关
- )
- # 获取输出的栅格文件路径
- output_tif = workflow_result['raster']
- logger.info(f"生成栅格文件: {output_tif}")
- # 4. 生成栅格地图(直接使用统一的MappingUtils接口,支持动态边界)
- # 生成输出文件名 - 支持多个土地类型
- combined_type_name = '_'.join(actual_land_types)
- map_output = os.path.join(raster_dir, f"{combined_type_name}_Cd含量地图.jpg")
-
- try:
- # 创建MappingUtils实例并直接调用
- mapper = MappingUtils()
-
- # 转换颜色方案名称
- colormap_key = COLORMAP_MAPPING.get(color_map_name, "green_yellow_red_purple")
-
- # 统一使用boundary_gdf,不再依赖Shapefile
- map_result = mapper.create_raster_map(
- shp_path=None, # 不使用Shapefile
- tif_path=output_tif,
- output_path=os.path.splitext(map_output)[0], # 去掉扩展名
- colormap=colormap_key,
- title="Irrigation water input flux",
- output_size=output_size,
- dpi=300,
- enable_interpolation=enable_interpolation, # 可配置的插值开关
- interpolation_method=interpolation_method, # 可配置的插值方法
- boundary_gdf=boundary_gdf # 使用从数据库获取的边界数据,如果为None则使用数据点范围
- )
-
- # 如果生成的路径与期望不同,则重命名
- if map_result != map_output and os.path.exists(map_result):
- shutil.move(map_result, map_output)
- logger.info(f"栅格地图已生成: {map_output}")
-
- except Exception as e:
- logger.error(f"栅格地图生成失败: {str(e)}")
- # 5. 生成直方图(直接使用统一的MappingUtils接口)
- hist_output = os.path.join(raster_dir, f"{combined_type_name}_Cd含量直方图.jpg")
-
- try:
- # 直接调用统一绘图接口生成直方图
- hist_result = mapper.create_histogram(
- file_path=output_tif,
- save_path=hist_output,
- figsize=(8, 8),
- xlabel='Cd(ug/L)',
- ylabel='frequency density',
- title='Irrigation water input flux',
- bins=100,
- dpi=300
- )
-
- if hist_result:
- logger.info(f"直方图已生成: {hist_output}")
-
- except Exception as e:
- logger.error(f"直方图生成失败: {str(e)}")
- # 使用实际的CSV路径(如果生成了)或生成兼容性路径
- if cleaned_csv_path:
- cleaned_csv = cleaned_csv_path
- else:
- # 为了兼容性,生成路径(即使文件不存在)
- data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
- cleaned_csv = os.path.join(data_dir, f"中心点经纬度与预测值&{combined_type_name}_清洗.csv")
-
- # 清理临时文件(如果启用)
- if cleanup_temp_files:
- logger.info("开始清理临时文件...")
-
- # 要清理的临时文件列表
- temp_files_to_cleanup = []
-
- # 添加临时栅格文件(如果是memory_raster_开头的)
- if output_tif and 'memory_raster_' in os.path.basename(output_tif):
- temp_files_to_cleanup.append(output_tif)
-
- # 添加临时shapefile(如果存在且是临时文件)
- temp_shapefile = workflow_result.get('shapefile')
- if temp_shapefile and ('temp' in temp_shapefile.lower() or 'memory' in temp_shapefile.lower()):
- temp_files_to_cleanup.append(temp_shapefile)
-
- # 如果不保存CSV,也清理CSV文件
- if not save_csv and cleaned_csv_path and os.path.exists(cleaned_csv_path):
- temp_files_to_cleanup.append(cleaned_csv_path)
-
- # 执行清理
- if temp_files_to_cleanup:
- cleanup_temporary_files(*temp_files_to_cleanup)
- logger.info(f"已清理 {len(temp_files_to_cleanup)} 个临时文件")
-
- # 如果清理了栅格文件,将返回路径设为None以避免引用已删除的文件
- if output_tif in temp_files_to_cleanup:
- output_tif = None
- logger.info("注意:临时栅格文件已被清理,返回的栅格路径为None")
- else:
- logger.info("没有临时文件需要清理")
-
- return cleaned_csv, workflow_result['shapefile'], output_tif, map_output, hist_output, used_coeff
- def get_land_statistics(land_type: str) -> Optional[Dict[str, Any]]:
- """
- 获取指定土地类型的Cd预测结果统计信息
- @param {str} land_type - 土地类型(水田、旱地或水浇地)
- @returns {Optional[Dict[str, Any]]} 统计信息,如果没有数据则返回None
- """
- try:
- logger.info(f"获取土地类型统计信息: {land_type}")
- # 获取基础目录
- base_dir = get_base_dir()
- data_dir = os.path.join(base_dir, "..", "static", "water", "Data")
- # 构建数据文件路径
- data_file = os.path.join(data_dir, f"中心点经纬度与预测值&{land_type}_清洗.csv")
- logger.info(f"数据文件路径: {data_file}")
- if not os.path.exists(data_file):
- logger.warning(f"未找到土地类型数据文件: {data_file}")
- return None
- # 读取预测数据
- df = pd.read_csv(data_file)
- logger.info(f"成功读取数据文件,包含 {len(df)} 行数据")
- # 检查必要的列是否存在
- if 'Prediction' not in df.columns:
- logger.warning("数据文件中缺少'Prediction'列")
- return None
- predictions = df['Prediction']
- # 计算基础统计信息
- stats = {
- "土地类型": land_type,
- "数据更新时间": datetime.fromtimestamp(os.path.getmtime(data_file)).strftime('%Y-%m-%d %H:%M:%S'),
- "数据点总数": len(predictions),
- "均值": float(predictions.mean()),
- "中位数": float(predictions.median()),
- "标准差": float(predictions.std()),
- "最小值": float(predictions.min()),
- "最大值": float(predictions.max()),
- "25%分位数": float(predictions.quantile(0.25)),
- "75%分位数": float(predictions.quantile(0.75)),
- "偏度": float(predictions.skew()),
- "峰度": float(predictions.kurtosis())
- }
- return stats
- except Exception as e:
- logger.error(f"获取土地类型统计信息失败: {str(e)}")
- return None
- # 主函数
- def main():
- """主函数用于测试完整的处理流程"""
- logger.info("=" * 50)
- logger.info("土地数据处理与可视化系统")
- logger.info("=" * 50)
- try:
- # 处理水田数据
- logger.info("\n===== 处理水田数据 =====")
- results = process_land_to_visualization("水田")
- if results and all(results):
- cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
- logger.info(f"清洗后CSV: {cleaned_csv}")
- logger.info(f"Shapefile: {shapefile}")
- logger.info(f"GeoTIFF文件: {tif_file}")
- logger.info(f"地图图片: {map_file}")
- logger.info(f"直方图: {hist_file}")
- logger.info(f"使用系数: {used_coeff}")
- else:
- logger.error("水田数据处理失败")
- # 处理旱地数据(使用自定义参数)
- logger.info("\n===== 处理旱地数据 =====")
- custom_params = {"旱地": (220, 0.65)}
- results = process_land_to_visualization(
- "旱地",
- coefficient_params=custom_params,
- color_map_name="蓝色系"
- )
- if results and all(results):
- cleaned_csv, shapefile, tif_file, map_file, hist_file, used_coeff = results
- logger.info(f"清洗后CSV: {cleaned_csv}")
- logger.info(f"Shapefile: {shapefile}")
- logger.info(f"GeoTIFF文件: {tif_file}")
- logger.info(f"地图图片: {map_file}")
- logger.info(f"直方图: {hist_file}")
- logger.info(f"使用系数: {used_coeff}")
- else:
- logger.error("旱地数据处理失败")
- except Exception as e:
- logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
- finally:
- # 清理临时文件
- base_dir = get_base_dir()
- raster_dir = os.path.join(base_dir, "..", "static", "water", "Raster")
- cleaned_count = cleanup_temp_files_in_directory(raster_dir)
- if cleaned_count > 0:
- logger.info(f"已清理 {cleaned_count} 个临时文件")
-
- logger.info("处理完成")
- if __name__ == "__main__":
- main()
|