|
@@ -14,6 +14,7 @@ from typing import Dict, Any, List, Optional
|
|
|
from sqlalchemy.orm import sessionmaker, Session
|
|
|
from sqlalchemy import create_engine, and_
|
|
|
from ..database import SessionLocal, engine
|
|
|
+from ..models import FluxCdInputData
|
|
|
from ..models.parameters import Parameters
|
|
|
from ..models.CropCd_output import CropCdOutputData
|
|
|
from ..models.farmland import FarmlandData
|
|
@@ -609,5 +610,408 @@ class CdFluxRemovalService:
|
|
|
self.logger.warning(f"从数据库创建边界文件失败: {str(e)}")
|
|
|
|
|
|
return None
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
+ def create_surface_runoff_visualization(self, area: str, level: str,
|
|
|
+ output_dir: str = "app/static/surface_runoff",
|
|
|
+ template_raster: str = "app/static/cd_flux/meanTemp.tif",
|
|
|
+ boundary_shp: str = None,
|
|
|
+ colormap: str = "green_yellow_red_purple",
|
|
|
+ resolution_factor: float = 4.0,
|
|
|
+ enable_interpolation: bool = False,
|
|
|
+ cleanup_intermediate: bool = True) -> Dict[str, str]:
|
|
|
+ """
|
|
|
+ 创建地表径流Cd通量可视化图表
|
|
|
+
|
|
|
+ @param area: 地区名称
|
|
|
+ @param level: 行政层级
|
|
|
+ @param output_dir: 输出目录
|
|
|
+ @param template_raster: 模板栅格文件路径
|
|
|
+ @param boundary_shp: 边界shapefile路径
|
|
|
+ @param colormap: 色彩方案
|
|
|
+ @param resolution_factor: 分辨率因子
|
|
|
+ @param enable_interpolation: 是否启用空间插值
|
|
|
+ @param cleanup_intermediate: 是否清理中间文件
|
|
|
+ @returns: 生成的图片文件路径字典
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 从数据库获取地表径流数据
|
|
|
+ results_with_coords = []
|
|
|
+ with SessionLocal() as db:
|
|
|
+ # 查询地表径流数据(DB_Cd)及对应坐标
|
|
|
+ runoff_data = db.query(
|
|
|
+ FarmlandData.farmland_id,
|
|
|
+ FarmlandData.sample_id,
|
|
|
+ FarmlandData.lon,
|
|
|
+ FarmlandData.lan,
|
|
|
+ FluxCdInputData.surface_runoff
|
|
|
+ ).join(
|
|
|
+ FluxCdInputData,
|
|
|
+ (FluxCdInputData.farmland_id == FarmlandData.farmland_id) &
|
|
|
+ (FluxCdInputData.sample_id == FarmlandData.sample_id)
|
|
|
+ ).all()
|
|
|
+
|
|
|
+ if not runoff_data:
|
|
|
+ self.logger.warning("未找到地表径流数据")
|
|
|
+ return {}
|
|
|
+
|
|
|
+ for data in runoff_data:
|
|
|
+ results_with_coords.append({
|
|
|
+ "farmland_id": data.farmland_id,
|
|
|
+ "sample_id": data.sample_id,
|
|
|
+ "longitude": data.lon,
|
|
|
+ "latitude": data.lan,
|
|
|
+ "flux_value": data.surface_runoff
|
|
|
+ })
|
|
|
+
|
|
|
+ # 确保输出目录存在
|
|
|
+ os.makedirs(output_dir, exist_ok=True)
|
|
|
+ generated_files: Dict[str, str] = {}
|
|
|
+
|
|
|
+ # 生成时间戳
|
|
|
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
+
|
|
|
+ # 创建CSV文件用于绘图
|
|
|
+ csv_filename = f"surface_runoff_{area}_temp_{timestamp}.csv"
|
|
|
+ csv_path = os.path.join(output_dir, csv_filename)
|
|
|
+
|
|
|
+ # 准备绘图数据
|
|
|
+ plot_data = []
|
|
|
+ for result in results_with_coords:
|
|
|
+ plot_data.append({
|
|
|
+ "longitude": result["longitude"],
|
|
|
+ "latitude": result["latitude"],
|
|
|
+ "flux_value": result["flux_value"]
|
|
|
+ })
|
|
|
+
|
|
|
+ # 保存为CSV
|
|
|
+ df = pd.DataFrame(plot_data)
|
|
|
+ df.to_csv(csv_path, index=False, encoding='utf-8-sig')
|
|
|
+
|
|
|
+ # 初始化绘图工具
|
|
|
+ mapper = MappingUtils()
|
|
|
+
|
|
|
+ # 生成输出文件路径
|
|
|
+ map_output = os.path.join(output_dir, f"surface_runoff_{area}_map_{timestamp}")
|
|
|
+ histogram_output = os.path.join(output_dir, f"surface_runoff_{area}_histogram_{timestamp}")
|
|
|
+
|
|
|
+ # 检查模板文件是否存在
|
|
|
+ if not os.path.exists(template_raster):
|
|
|
+ self.logger.warning(f"模板栅格文件不存在: {template_raster}")
|
|
|
+ template_raster = None
|
|
|
+
|
|
|
+ # 动态获取边界数据(严格使用指定层级)
|
|
|
+ if not level:
|
|
|
+ raise ValueError("必须提供行政层级 level:county | city | province")
|
|
|
+
|
|
|
+ # 直接从数据库获取边界GeoDataFrame
|
|
|
+ boundary_gdf = self._get_boundary_gdf_from_database(area, level)
|
|
|
+ boundary_shp = None # 不再需要临时边界文件
|
|
|
+
|
|
|
+ if boundary_gdf is None:
|
|
|
+ self.logger.warning(f"未找到地区 '{area}' 的边界数据,将不使用边界裁剪")
|
|
|
+ else:
|
|
|
+ # 在绘图前进行样点边界包含性统计
|
|
|
+ try:
|
|
|
+ if boundary_gdf is not None and len(boundary_gdf) > 0:
|
|
|
+ boundary_union = boundary_gdf.unary_union
|
|
|
+ total_points = len(results_with_coords)
|
|
|
+ inside_count = 0
|
|
|
+ outside_points: List[Dict[str, Any]] = []
|
|
|
+ for r in results_with_coords:
|
|
|
+ pt = Point(float(r["longitude"]), float(r["latitude"]))
|
|
|
+ if boundary_union.contains(pt) or boundary_union.touches(pt):
|
|
|
+ inside_count += 1
|
|
|
+ else:
|
|
|
+ outside_points.append({
|
|
|
+ "farmland_id": r.get("farmland_id"),
|
|
|
+ "sample_id": r.get("sample_id"),
|
|
|
+ "longitude": r.get("longitude"),
|
|
|
+ "latitude": r.get("latitude"),
|
|
|
+ "flux_value": r.get("flux_value")
|
|
|
+ })
|
|
|
+
|
|
|
+ outside_count = total_points - inside_count
|
|
|
+ inside_pct = (inside_count / total_points * 100.0) if total_points > 0 else 0.0
|
|
|
+
|
|
|
+ self.logger.info(
|
|
|
+ f"样点边界检查 - 总数: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), 边界外: {outside_count}")
|
|
|
+ if outside_count > 0:
|
|
|
+ sample_preview = outside_points[:10]
|
|
|
+ self.logger.warning(
|
|
|
+ f"存在 {outside_count} 个样点位于边界之外,绘图时将被掩膜隐藏。示例(最多10条): {sample_preview}")
|
|
|
+ except Exception as check_err:
|
|
|
+ self.logger.warning(f"样点边界包含性检查失败: {str(check_err)}")
|
|
|
+
|
|
|
+ # 创建shapefile
|
|
|
+ shapefile_path = csv_path.replace('.csv', '_points.shp')
|
|
|
+ mapper.csv_to_shapefile(csv_path, shapefile_path,
|
|
|
+ lon_col='longitude', lat_col='latitude', value_col='flux_value')
|
|
|
+
|
|
|
+ # 合并已生成文件映射
|
|
|
+ generated_files.update({"csv": csv_path, "shapefile": shapefile_path})
|
|
|
+
|
|
|
+ # 如果有模板栅格文件,创建栅格地图
|
|
|
+ if template_raster:
|
|
|
+ try:
|
|
|
+ # 创建栅格
|
|
|
+ raster_path = csv_path.replace('.csv', '_raster.tif')
|
|
|
+ raster_path, stats = mapper.vector_to_raster(
|
|
|
+ shapefile_path, template_raster, raster_path, 'flux_value',
|
|
|
+ resolution_factor=resolution_factor, boundary_shp=boundary_shp, boundary_gdf=boundary_gdf,
|
|
|
+ interpolation_method='nearest', enable_interpolation=enable_interpolation
|
|
|
+ )
|
|
|
+ generated_files["raster"] = raster_path
|
|
|
+
|
|
|
+ # 创建栅格地图 - 使用英文标题避免中文乱码
|
|
|
+ map_title = "Surface Runoff Cd Flux"
|
|
|
+
|
|
|
+ map_file = mapper.create_raster_map(
|
|
|
+ boundary_shp if boundary_shp else None,
|
|
|
+ raster_path,
|
|
|
+ map_output,
|
|
|
+ colormap=colormap,
|
|
|
+ title=map_title,
|
|
|
+ output_size=12,
|
|
|
+ dpi=300,
|
|
|
+ resolution_factor=4.0,
|
|
|
+ enable_interpolation=False,
|
|
|
+ interpolation_method='nearest',
|
|
|
+ boundary_gdf=boundary_gdf
|
|
|
+ )
|
|
|
+ generated_files["map"] = map_file
|
|
|
+
|
|
|
+ # 创建直方图 - 使用英文标题避免中文乱码
|
|
|
+ histogram_title = "Surface Runoff Cd Flux Distribution"
|
|
|
+
|
|
|
+ histogram_file = mapper.create_histogram(
|
|
|
+ raster_path,
|
|
|
+ f"{histogram_output}.jpg",
|
|
|
+ title=histogram_title,
|
|
|
+ xlabel='Cd Flux (g/ha/a)',
|
|
|
+ ylabel='Frequency Density'
|
|
|
+ )
|
|
|
+ generated_files["histogram"] = histogram_file
|
|
|
+
|
|
|
+ except Exception as viz_error:
|
|
|
+ self.logger.warning(f"栅格可视化创建失败: {str(viz_error)}")
|
|
|
+ # 即使栅格可视化失败,也返回已生成的文件
|
|
|
+
|
|
|
+ # 清理中间文件(默认开启,仅保留最终可视化)
|
|
|
+ if cleanup_intermediate:
|
|
|
+ try:
|
|
|
+ self._cleanup_intermediate_files(generated_files, None)
|
|
|
+ except Exception as cleanup_err:
|
|
|
+ self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
|
|
|
+
|
|
|
+ self.logger.info(f"✓ 成功创建地表径流Cd通量可视化,生成文件: {list(generated_files.keys())}")
|
|
|
+ return generated_files
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"创建地表径流可视化失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+
|
|
|
+ def get_groundwater_leaching_data(self, area: str) -> List[Dict[str, Any]]:
|
|
|
+ """
|
|
|
+ 获取地下渗流(地下水渗漏)通量数据及坐标
|
|
|
+
|
|
|
+ @param area: 地区名称
|
|
|
+ @returns: 包含坐标和通量值的结果列表
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ with SessionLocal() as db:
|
|
|
+ # 查询地下渗流数据及对应坐标
|
|
|
+ leaching_data = db.query(
|
|
|
+ FluxCdInputData.groundwater_leaching,
|
|
|
+ FarmlandData.farmland_id,
|
|
|
+ FarmlandData.sample_id,
|
|
|
+ FarmlandData.lon,
|
|
|
+ FarmlandData.lan
|
|
|
+ ).join(
|
|
|
+ FarmlandData,
|
|
|
+ and_(
|
|
|
+ FluxCdInputData.farmland_id == FarmlandData.farmland_id,
|
|
|
+ FluxCdInputData.sample_id == FarmlandData.sample_id
|
|
|
+ )
|
|
|
+ ).all()
|
|
|
+
|
|
|
+ if not leaching_data:
|
|
|
+ self.logger.warning(f"未找到地区 '{area}' 的地下渗流数据")
|
|
|
+ return []
|
|
|
+
|
|
|
+ results = []
|
|
|
+ for data in leaching_data:
|
|
|
+ results.append({
|
|
|
+ "farmland_id": data.farmland_id,
|
|
|
+ "sample_id": data.sample_id,
|
|
|
+ "longitude": data.lon,
|
|
|
+ "latitude": data.lan,
|
|
|
+ "flux_value": data.groundwater_leaching
|
|
|
+ })
|
|
|
+
|
|
|
+ self.logger.info(f"成功获取 {len(results)} 个样点的地下渗流数据")
|
|
|
+ return results
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"获取地下渗流数据失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+
|
|
|
+ def create_groundwater_leaching_visualization(self, area: str, level: str,
|
|
|
+ output_dir: str = "app/static/groundwater_leaching",
|
|
|
+ template_raster: str = "app/static/cd_flux/meanTemp.tif",
|
|
|
+ colormap: str = "blues",
|
|
|
+ resolution_factor: float = 4.0,
|
|
|
+ enable_interpolation: bool = False,
|
|
|
+ cleanup_intermediate: bool = True) -> Dict[str, str]:
|
|
|
+ """
|
|
|
+ 创建地下渗流(地下水渗漏)通量可视化图表
|
|
|
+
|
|
|
+ @param area: 地区名称
|
|
|
+ @param level: 行政层级
|
|
|
+ @param output_dir: 输出目录
|
|
|
+ @param template_raster: 模板栅格文件路径
|
|
|
+ @param colormap: 色彩方案
|
|
|
+ @param resolution_factor: 分辨率因子
|
|
|
+ @param enable_interpolation: 是否启用空间插值
|
|
|
+ @param cleanup_intermediate: 是否清理中间文件
|
|
|
+ @returns: 生成的图片文件路径字典
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 获取地下渗流数据
|
|
|
+ leaching_results = self.get_groundwater_leaching_data(area)
|
|
|
+
|
|
|
+ if not leaching_results:
|
|
|
+ return {
|
|
|
+ "success": False,
|
|
|
+ "message": f"没有找到地区 '{area}' 的地下渗流数据",
|
|
|
+ "data": None
|
|
|
+ }
|
|
|
+
|
|
|
+ # 确保输出目录存在
|
|
|
+ os.makedirs(output_dir, exist_ok=True)
|
|
|
+ generated_files: Dict[str, str] = {}
|
|
|
+
|
|
|
+ # 生成时间戳
|
|
|
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
+
|
|
|
+ # 创建CSV文件用于绘图
|
|
|
+ csv_filename = f"groundwater_leaching_{area}_temp_{timestamp}.csv"
|
|
|
+ csv_path = os.path.join(output_dir, csv_filename)
|
|
|
+
|
|
|
+ # 准备绘图数据
|
|
|
+ plot_data = []
|
|
|
+ for result in leaching_results:
|
|
|
+ plot_data.append({
|
|
|
+ "longitude": result["longitude"],
|
|
|
+ "latitude": result["latitude"],
|
|
|
+ "flux_value": result["flux_value"]
|
|
|
+ })
|
|
|
+
|
|
|
+ # 保存为CSV
|
|
|
+ df = pd.DataFrame(plot_data)
|
|
|
+ df.to_csv(csv_path, index=False, encoding='utf-8-sig')
|
|
|
+
|
|
|
+ # 初始化绘图工具
|
|
|
+ mapper = MappingUtils()
|
|
|
+
|
|
|
+ # 生成输出文件路径
|
|
|
+ map_output = os.path.join(output_dir, f"groundwater_leaching_{area}_map_{timestamp}")
|
|
|
+ histogram_output = os.path.join(output_dir, f"groundwater_leaching_{area}_histogram_{timestamp}")
|
|
|
+
|
|
|
+ # 检查模板文件是否存在
|
|
|
+ if not os.path.exists(template_raster):
|
|
|
+ self.logger.warning(f"模板栅格文件不存在: {template_raster}")
|
|
|
+ template_raster = None
|
|
|
+
|
|
|
+ # 直接从数据库获取边界GeoDataFrame
|
|
|
+ boundary_gdf = self._get_boundary_gdf_from_database(area, level)
|
|
|
+ boundary_shp = None # 不再需要临时边界文件
|
|
|
+
|
|
|
+ # 在绘图前进行样点边界包含性统计
|
|
|
+ if boundary_gdf is not None and len(boundary_gdf) > 0:
|
|
|
+ boundary_union = boundary_gdf.unary_union
|
|
|
+ total_points = len(leaching_results)
|
|
|
+ inside_count = 0
|
|
|
+ for r in leaching_results:
|
|
|
+ pt = Point(float(r["longitude"]), float(r["latitude"]))
|
|
|
+ if boundary_union.contains(pt) or boundary_union.touches(pt):
|
|
|
+ inside_count += 1
|
|
|
+
|
|
|
+ outside_count = total_points - inside_count
|
|
|
+ inside_pct = (inside_count / total_points * 100.0) if total_points > 0 else 0.0
|
|
|
+
|
|
|
+ self.logger.info(
|
|
|
+ f"样点边界检查 - 总数: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), 边界外: {outside_count}")
|
|
|
+
|
|
|
+ # 创建shapefile
|
|
|
+ shapefile_path = csv_path.replace('.csv', '_points.shp')
|
|
|
+ mapper.csv_to_shapefile(csv_path, shapefile_path,
|
|
|
+ lon_col='longitude', lat_col='latitude', value_col='flux_value')
|
|
|
+
|
|
|
+ # 合并已生成文件映射
|
|
|
+ generated_files.update({"csv": csv_path, "shapefile": shapefile_path})
|
|
|
+
|
|
|
+ # 如果有模板栅格文件,创建栅格地图
|
|
|
+ if template_raster:
|
|
|
+ try:
|
|
|
+ # 创建栅格
|
|
|
+ raster_path = csv_path.replace('.csv', '_raster.tif')
|
|
|
+ raster_path, stats = mapper.vector_to_raster(
|
|
|
+ shapefile_path, template_raster, raster_path, 'flux_value',
|
|
|
+ resolution_factor=resolution_factor, boundary_gdf=boundary_gdf,
|
|
|
+ interpolation_method='nearest', enable_interpolation=enable_interpolation
|
|
|
+ )
|
|
|
+ generated_files["raster"] = raster_path
|
|
|
+
|
|
|
+ # 创建栅格地图 - 使用特定标题
|
|
|
+ map_title = "Groundwater Leaching Cd Flux"
|
|
|
+
|
|
|
+ map_file = mapper.create_raster_map(
|
|
|
+ boundary_shp,
|
|
|
+ raster_path,
|
|
|
+ map_output,
|
|
|
+ colormap=colormap,
|
|
|
+ title=map_title,
|
|
|
+ output_size=12,
|
|
|
+ dpi=300,
|
|
|
+ resolution_factor=4.0,
|
|
|
+ enable_interpolation=False,
|
|
|
+ interpolation_method='nearest',
|
|
|
+ boundary_gdf=boundary_gdf
|
|
|
+ )
|
|
|
+ generated_files["map"] = map_file
|
|
|
+
|
|
|
+ # 创建直方图 - 使用特定标题
|
|
|
+ histogram_title = "Groundwater Leaching Cd Flux Distribution"
|
|
|
+
|
|
|
+ histogram_file = mapper.create_histogram(
|
|
|
+ raster_path,
|
|
|
+ f"{histogram_output}.jpg",
|
|
|
+ title=histogram_title,
|
|
|
+ xlabel='Cd Flux (g/ha/a)',
|
|
|
+ ylabel='Frequency Density'
|
|
|
+ )
|
|
|
+ generated_files["histogram"] = histogram_file
|
|
|
+
|
|
|
+ except Exception as viz_error:
|
|
|
+ self.logger.warning(f"栅格可视化创建失败: {str(viz_error)}")
|
|
|
+
|
|
|
+ # 清理中间文件
|
|
|
+ if cleanup_intermediate:
|
|
|
+ try:
|
|
|
+ self._cleanup_intermediate_files(generated_files, None)
|
|
|
+ except Exception as cleanup_err:
|
|
|
+ self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
|
|
|
+
|
|
|
+ self.logger.info(f"✓ 成功创建地下渗流通量可视化,生成文件: {list(generated_files.keys())}")
|
|
|
+ return generated_files
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"创建地下渗流可视化失败: {str(e)}")
|
|
|
+ return {
|
|
|
+ "success": False,
|
|
|
+ "message": f"可视化创建失败: {str(e)}",
|
|
|
+ "data": None
|
|
|
+ }
|