123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765 |
- """
- 农业投入Cd通量计算服务
- @description: 根据Parameters表中的数据计算各个地区的农业投入输入Cd通量预测
- @author: AcidMap Team
- @version: 1.0.0
- """
- import logging
- import math
- import os
- import pandas as pd
- import numpy as np
- from datetime import datetime
- from typing import Dict, Any, List, Optional
- from sqlalchemy.orm import sessionmaker, Session
- from sqlalchemy import create_engine, and_
- from ..database import SessionLocal, engine
- from ..models.parameters import Parameters
- from ..models.farmland import FarmlandData
- from ..utils.mapping_utils import MappingUtils
- from .admin_boundary_service import get_boundary_geojson_by_name, get_boundary_gdf_by_name
- import geopandas as gpd
- from shapely.geometry import shape, Point
- import tempfile
- import json
- class AgriculturalInputService:
- """
- 农业投入Cd通量计算服务类
-
- @description: 提供基于Parameters表数据的农业投入输入Cd通量预测计算功能
- """
-
- def __init__(self):
- """
- 初始化农业投入服务
- """
- self.logger = logging.getLogger(__name__)
-
- def calculate_cd_flux_by_area(self, area: str) -> Dict[str, Any]:
- """
- 根据地区计算农业投入输入Cd通量
-
- @param area: 地区名称
- @returns: 计算结果
- """
- try:
- with SessionLocal() as db:
- # 查询指定地区的参数
- parameter = db.query(Parameters).filter(Parameters.area == area).first()
-
- if not parameter:
- return {
- "success": False,
- "message": f"未找到地区 '{area}' 的参数数据",
- "data": None
- }
-
- # 计算农业投入输入Cd通量
- # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
- cd_flux = (
- parameter.f3 * parameter.nf + # 氮肥
- parameter.f4 * parameter.pf + # 磷肥
- parameter.f5 * parameter.kf + # 钾肥
- parameter.f6 * parameter.cf + # 复合肥
- parameter.f7 * parameter.of + # 有机肥
- parameter.f8 * parameter.p + # 农药
- parameter.f9 * parameter.ff + # 农家肥
- parameter.f10 * parameter.af # 农膜
- )
-
- # 计算各项明细
- details = {
- "nitrogen_fertilizer": parameter.f3 * parameter.nf, # 氮肥贡献
- "phosphorus_fertilizer": parameter.f4 * parameter.pf, # 磷肥贡献
- "potassium_fertilizer": parameter.f5 * parameter.kf, # 钾肥贡献
- "compound_fertilizer": parameter.f6 * parameter.cf, # 复合肥贡献
- "organic_fertilizer": parameter.f7 * parameter.of, # 有机肥贡献
- "pesticide": parameter.f8 * parameter.p, # 农药贡献
- "farmyard_manure": parameter.f9 * parameter.ff, # 农家肥贡献
- "agricultural_film": parameter.f10 * parameter.af # 农膜贡献
- }
-
- return {
- "success": True,
- "message": f"成功计算地区 '{area}' 的农业投入输入Cd通量",
- "data": {
- "area": area,
- "total_cd_flux": round(cd_flux, 6),
- "unit": "g/ha/a",
- "details": {key: round(value, 6) for key, value in details.items()},
- "parameters_used": {
- "f3_nitrogen_cd_content": parameter.f3,
- "f4_phosphorus_cd_content": parameter.f4,
- "f5_potassium_cd_content": parameter.f5,
- "f6_compound_cd_content": parameter.f6,
- "f7_organic_cd_content": parameter.f7,
- "f8_pesticide_cd_content": parameter.f8,
- "f9_farmyard_cd_content": parameter.f9,
- "f10_film_cd_content": parameter.f10,
- "nf_nitrogen_usage": parameter.nf,
- "pf_phosphorus_usage": parameter.pf,
- "kf_potassium_usage": parameter.kf,
- "cf_compound_usage": parameter.cf,
- "of_organic_usage": parameter.of,
- "p_pesticide_usage": parameter.p,
- "ff_farmyard_usage": parameter.ff,
- "af_film_usage": parameter.af
- }
- }
- }
-
- except Exception as e:
- self.logger.error(f"计算地区 '{area}' 的Cd通量时发生错误: {str(e)}")
- return {
- "success": False,
- "message": f"计算失败: {str(e)}",
- "data": None
- }
-
- def calculate_all_areas_cd_flux(self) -> Dict[str, Any]:
- """
- 计算所有地区的农业投入输入Cd通量
-
- @returns: 所有地区的计算结果
- """
- try:
- with SessionLocal() as db:
- # 查询所有参数记录
- parameters = db.query(Parameters).all()
-
- if not parameters:
- return {
- "success": False,
- "message": "数据库中未找到任何参数数据",
- "data": None
- }
-
- results = []
-
- for param in parameters:
- # 计算每个地区的Cd通量
- cd_flux = (
- param.f3 * param.nf +
- param.f4 * param.pf +
- param.f5 * param.kf +
- param.f6 * param.cf +
- param.f7 * param.of +
- param.f8 * param.p +
- param.f9 * param.ff +
- param.f10 * param.af
- )
-
- details = {
- "nitrogen_fertilizer": param.f3 * param.nf,
- "phosphorus_fertilizer": param.f4 * param.pf,
- "potassium_fertilizer": param.f5 * param.kf,
- "compound_fertilizer": param.f6 * param.cf,
- "organic_fertilizer": param.f7 * param.of,
- "pesticide": param.f8 * param.p,
- "farmyard_manure": param.f9 * param.ff,
- "agricultural_film": param.f10 * param.af
- }
-
- result_item = {
- "area": param.area,
- "total_cd_flux": round(cd_flux, 6),
- "details": {key: round(value, 6) for key, value in details.items()}
- }
-
- results.append(result_item)
-
- # 按总通量排序
- results.sort(key=lambda x: x["total_cd_flux"], reverse=True)
-
- return {
- "success": True,
- "message": f"成功计算所有 {len(results)} 个地区的农业投入输入Cd通量",
- "data": {
- "total_areas": len(results),
- "unit": "g/ha/a",
- "results": results,
- "summary": {
- "max_cd_flux": max(results, key=lambda x: x["total_cd_flux"]) if results else None,
- "min_cd_flux": min(results, key=lambda x: x["total_cd_flux"]) if results else None,
- "average_cd_flux": round(sum(r["total_cd_flux"] for r in results) / len(results), 6) if results else 0
- }
- }
- }
-
- except Exception as e:
- self.logger.error(f"计算所有地区Cd通量时发生错误: {str(e)}")
- return {
- "success": False,
- "message": f"计算失败: {str(e)}",
- "data": None
- }
-
- def get_available_areas(self) -> Dict[str, Any]:
- """
- 获取数据库中所有可用的地区列表
-
- @returns: 可用地区列表
- """
- try:
- with SessionLocal() as db:
- # 查询所有不重复的地区
- areas = db.query(Parameters.area).distinct().all()
- area_list = [area[0] for area in areas if area[0]]
-
- return {
- "success": True,
- "message": f"成功获取 {len(area_list)} 个可用地区",
- "data": {
- "areas": sorted(area_list),
- "total_count": len(area_list)
- }
- }
-
- except Exception as e:
- self.logger.error(f"获取可用地区列表时发生错误: {str(e)}")
- return {
- "success": False,
- "message": f"获取失败: {str(e)}",
- "data": None
- }
-
- def calculate_cd_flux_with_custom_data(self, request) -> Dict[str, Any]:
- """
- 使用用户提供的自定义数据计算农业投入输入Cd通量
-
- @param request: 包含计算参数的请求对象
- @returns: 计算结果
- """
- try:
- # 进行参数验证
- if not self._validate_calculation_parameters(request):
- return {
- "success": False,
- "message": "提供的参数数据不完整或无效",
- "data": None
- }
-
- # 计算农业投入输入Cd通量
- # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
- cd_flux = (
- request.f3_nitrogen_cd_content * request.nf_nitrogen_usage + # 氮肥
- request.f4_phosphorus_cd_content * request.pf_phosphorus_usage + # 磷肥
- request.f5_potassium_cd_content * request.kf_potassium_usage + # 钾肥
- request.f6_compound_cd_content * request.cf_compound_usage + # 复合肥
- request.f7_organic_cd_content * request.of_organic_usage + # 有机肥
- request.f8_pesticide_cd_content * request.p_pesticide_usage + # 农药
- request.f9_farmyard_cd_content * request.ff_farmyard_usage + # 农家肥
- request.f10_film_cd_content * request.af_film_usage # 农膜
- )
-
- # 计算各项明细贡献
- details = {
- "nitrogen_fertilizer": request.f3_nitrogen_cd_content * request.nf_nitrogen_usage,
- "phosphorus_fertilizer": request.f4_phosphorus_cd_content * request.pf_phosphorus_usage,
- "potassium_fertilizer": request.f5_potassium_cd_content * request.kf_potassium_usage,
- "compound_fertilizer": request.f6_compound_cd_content * request.cf_compound_usage,
- "organic_fertilizer": request.f7_organic_cd_content * request.of_organic_usage,
- "pesticide": request.f8_pesticide_cd_content * request.p_pesticide_usage,
- "farmyard_manure": request.f9_farmyard_cd_content * request.ff_farmyard_usage,
- "agricultural_film": request.f10_film_cd_content * request.af_film_usage
- }
-
- # 构建返回结果
- result_data = {
- "total_cd_flux": round(cd_flux, 6),
- "unit": "g/ha/a",
- "details": {key: round(value, 6) for key, value in details.items()},
- "input_parameters": {
- "cd_contents": {
- "f3_nitrogen_cd_content": request.f3_nitrogen_cd_content,
- "f4_phosphorus_cd_content": request.f4_phosphorus_cd_content,
- "f5_potassium_cd_content": request.f5_potassium_cd_content,
- "f6_compound_cd_content": request.f6_compound_cd_content,
- "f7_organic_cd_content": request.f7_organic_cd_content,
- "f8_pesticide_cd_content": request.f8_pesticide_cd_content,
- "f9_farmyard_cd_content": request.f9_farmyard_cd_content,
- "f10_film_cd_content": request.f10_film_cd_content
- },
- "usage_amounts": {
- "nf_nitrogen_usage": request.nf_nitrogen_usage,
- "pf_phosphorus_usage": request.pf_phosphorus_usage,
- "kf_potassium_usage": request.kf_potassium_usage,
- "cf_compound_usage": request.cf_compound_usage,
- "of_organic_usage": request.of_organic_usage,
- "p_pesticide_usage": request.p_pesticide_usage,
- "ff_farmyard_usage": request.ff_farmyard_usage,
- "af_film_usage": request.af_film_usage
- }
- }
- }
-
- # 如果有描述信息,添加到结果中
- if request.description:
- result_data["description"] = request.description
-
- return {
- "success": True,
- "message": "成功计算农业投入输入Cd通量",
- "data": result_data
- }
-
- except Exception as e:
- self.logger.error(f"使用自定义数据计算Cd通量时发生错误: {str(e)}")
- return {
- "success": False,
- "message": f"计算失败: {str(e)}",
- "data": None
- }
-
- def _validate_calculation_parameters(self, request) -> bool:
- """
- 验证计算参数的有效性
-
- @param request: 请求对象
- @returns: 验证是否通过
- """
- try:
- # 检查所有必需的参数是否存在且为非负数
- required_attrs = [
- 'f3_nitrogen_cd_content', 'f4_phosphorus_cd_content', 'f5_potassium_cd_content',
- 'f6_compound_cd_content', 'f7_organic_cd_content', 'f8_pesticide_cd_content',
- 'f9_farmyard_cd_content', 'f10_film_cd_content', 'nf_nitrogen_usage',
- 'pf_phosphorus_usage', 'kf_potassium_usage', 'cf_compound_usage',
- 'of_organic_usage', 'p_pesticide_usage', 'ff_farmyard_usage', 'af_film_usage'
- ]
-
- for attr in required_attrs:
- if not hasattr(request, attr):
- self.logger.warning(f"缺少必需参数: {attr}")
- return False
-
- value = getattr(request, attr)
- if value is None or value < 0:
- self.logger.warning(f"参数 {attr} 无效: {value}")
- return False
-
- return True
-
- except Exception as e:
- self.logger.error(f"验证参数时发生错误: {str(e)}")
- return False
-
- def calculate_cd_flux_for_visualization(self, area: str) -> Dict[str, Any]:
- """
- 专门用于可视化的农业投入Cd通量计算(参数固定使用韶关)
-
- @param area: 地区名称(仅用于地图边界,参数固定使用"韶关")
- @returns: 计算结果
- """
- try:
- with SessionLocal() as db:
- # 参数固定使用"韶关",area参数仅用于地图边界
- parameter = db.query(Parameters).filter(Parameters.area == "韶关").first()
-
- if not parameter:
- return {
- "success": False,
- "message": f"未找到韶关地区的参数数据",
- "data": None
- }
-
- # 计算农业投入输入Cd通量
- # 公式:F3*NF + F4*PF + F5*KF + F6*CF + F7*OF + F8*P + F9*FF + F10*AF
- cd_flux = (
- parameter.f3 * parameter.nf + # 氮肥
- parameter.f4 * parameter.pf + # 磷肥
- parameter.f5 * parameter.kf + # 钾肥
- parameter.f6 * parameter.cf + # 复合肥
- parameter.f7 * parameter.of + # 有机肥
- parameter.f8 * parameter.p + # 农药
- parameter.f9 * parameter.ff + # 农家肥
- parameter.f10 * parameter.af # 农膜
- )
-
- # 计算各项明细
- details = {
- "nitrogen_fertilizer": parameter.f3 * parameter.nf, # 氮肥贡献
- "phosphorus_fertilizer": parameter.f4 * parameter.pf, # 磷肥贡献
- "potassium_fertilizer": parameter.f5 * parameter.kf, # 钾肥贡献
- "compound_fertilizer": parameter.f6 * parameter.cf, # 复合肥贡献
- "organic_fertilizer": parameter.f7 * parameter.of, # 有机肥贡献
- "pesticide": parameter.f8 * parameter.p, # 农药贡献
- "farmyard_manure": parameter.f9 * parameter.ff, # 农家肥贡献
- "agricultural_film": parameter.f10 * parameter.af # 农膜贡献
- }
-
- return {
- "success": True,
- "message": f"成功计算地区 '{area}' 的农业投入输入Cd通量(使用韶关参数)",
- "data": {
- "area": area,
- "total_cd_flux": round(cd_flux, 6),
- "unit": "g/ha/a",
- "details": {key: round(value, 6) for key, value in details.items()},
- "parameters_used": {
- "f3_nitrogen_cd_content": parameter.f3,
- "f4_phosphorus_cd_content": parameter.f4,
- "f5_potassium_cd_content": parameter.f5,
- "f6_compound_cd_content": parameter.f6,
- "f7_organic_cd_content": parameter.f7,
- "f8_pesticide_cd_content": parameter.f8,
- "f9_farmyard_cd_content": parameter.f9,
- "f10_film_cd_content": parameter.f10,
- "nf_nitrogen_usage": parameter.nf,
- "pf_phosphorus_usage": parameter.pf,
- "kf_potassium_usage": parameter.kf,
- "cf_compound_usage": parameter.cf,
- "of_organic_usage": parameter.of,
- "p_pesticide_usage": parameter.p,
- "ff_farmyard_usage": parameter.ff,
- "af_film_usage": parameter.af
- }
- }
- }
-
- except Exception as e:
- self.logger.error(f"计算地区 '{area}' 的Cd通量时发生错误: {str(e)}")
- return {
- "success": False,
- "message": f"计算失败: {str(e)}",
- "data": None
- }
-
- def get_coordinates_for_results(self, results_data: Dict[str, Any], area: str) -> List[Dict[str, Any]]:
- """
- 获取农业投入计算结果对应的坐标信息
-
- @param results_data: 计算结果数据
- @param area: 地区名称
- @returns: 包含坐标的结果列表
- """
- try:
- # 农业投入计算只有一个结果值,需要与所有农田数据关联
- total_cd_flux = results_data.get("total_cd_flux", 0)
-
- with SessionLocal() as db:
- # 查询所有农田数据获取坐标
- farmland_data = db.query(FarmlandData).all()
-
- if not farmland_data:
- self.logger.warning("未找到农田坐标数据")
- return []
-
- coordinates_results = []
- for farmland in farmland_data:
- coord_result = {
- "farmland_id": farmland.farmland_id,
- "sample_id": farmland.sample_id,
- "longitude": farmland.lon,
- "latitude": farmland.lan,
- "flux_value": total_cd_flux, # 所有点使用相同的农业投入Cd通量值
- "area": area
- }
- coordinates_results.append(coord_result)
-
- self.logger.info(f"✓ 成功获取 {len(coordinates_results)} 个农田样点的坐标信息,农业投入Cd通量: {total_cd_flux}")
- return coordinates_results
-
- except Exception as e:
- self.logger.error(f"获取坐标信息失败: {str(e)}")
- raise
-
- def export_results_to_csv(self, results_data: Dict[str, Any], output_dir: str = "app/static/agricultural_input") -> str:
- """
- 将农业投入计算结果导出为CSV文件
-
- @param results_data: 计算结果数据
- @param output_dir: 输出目录
- @returns: CSV文件路径
- """
- try:
- # 确保输出目录存在
- os.makedirs(output_dir, exist_ok=True)
-
- # 生成时间戳
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # 生成文件名
- area = results_data.get("area", "unknown")
- filename = f"agricultural_input_{area}_{timestamp}.csv"
- csv_path = os.path.join(output_dir, filename)
-
- # 准备导出数据
- export_data = {
- "area": results_data.get("area"),
- "total_cd_flux": results_data.get("total_cd_flux"),
- "unit": results_data.get("unit"),
- "nitrogen_fertilizer": results_data["details"]["nitrogen_fertilizer"],
- "phosphorus_fertilizer": results_data["details"]["phosphorus_fertilizer"],
- "potassium_fertilizer": results_data["details"]["potassium_fertilizer"],
- "compound_fertilizer": results_data["details"]["compound_fertilizer"],
- "organic_fertilizer": results_data["details"]["organic_fertilizer"],
- "pesticide": results_data["details"]["pesticide"],
- "farmyard_manure": results_data["details"]["farmyard_manure"],
- "agricultural_film": results_data["details"]["agricultural_film"]
- }
-
- # 转换为DataFrame
- df = pd.DataFrame([export_data])
-
- # 保存CSV文件
- df.to_csv(csv_path, index=False, encoding='utf-8-sig')
-
- self.logger.info(f"✓ 成功导出农业投入结果到: {csv_path}")
- return csv_path
-
- except Exception as e:
- self.logger.error(f"导出CSV文件失败: {str(e)}")
- raise
-
- def create_agricultural_input_visualization(self, area: str, calculation_type: str,
- results_with_coords: List[Dict[str, Any]],
- level: str = None,
- output_dir: str = "app/static/agricultural_input",
- template_raster: str = "app/static/cd_flux/meanTemp.tif",
- boundary_shp: str = None,
- colormap: str = "green_yellow_red_purple",
- resolution_factor: float = 4.0,
- enable_interpolation: bool = False,
- cleanup_intermediate: bool = True) -> Dict[str, str]:
- """
- 创建农业投入Cd通量可视化图表
-
- @param area: 地区名称
- @param calculation_type: 计算类型(agricultural_input)
- @param results_with_coords: 包含坐标的结果数据
- @param level: 行政层级
- @param output_dir: 输出目录
- @param template_raster: 模板栅格文件路径
- @param boundary_shp: 边界shapefile路径
- @param colormap: 色彩方案
- @param resolution_factor: 分辨率因子
- @param enable_interpolation: 是否启用空间插值
- @param cleanup_intermediate: 是否清理中间文件
- @returns: 生成的图片文件路径字典
- """
- try:
- if not results_with_coords:
- raise ValueError("没有包含坐标的结果数据")
-
- # 确保输出目录存在
- os.makedirs(output_dir, exist_ok=True)
- generated_files: Dict[str, str] = {}
-
- # 生成时间戳
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # 创建CSV文件用于绘图
- csv_filename = f"agricultural_input_{area}_temp_{timestamp}.csv"
- csv_path = os.path.join(output_dir, csv_filename)
-
- # 准备绘图数据
- plot_data = []
- for result in results_with_coords:
- plot_data.append({
- "longitude": result["longitude"],
- "latitude": result["latitude"],
- "flux_value": result["flux_value"]
- })
-
- # 保存为CSV
- df = pd.DataFrame(plot_data)
- df.to_csv(csv_path, index=False, encoding='utf-8-sig')
-
- # 初始化绘图工具
- mapper = MappingUtils()
-
- # 生成输出文件路径
- map_output = os.path.join(output_dir, f"agricultural_input_{area}_map_{timestamp}")
- histogram_output = os.path.join(output_dir, f"agricultural_input_{area}_histogram_{timestamp}")
-
- # 检查模板文件是否存在
- if not os.path.exists(template_raster):
- self.logger.warning(f"模板栅格文件不存在: {template_raster}")
- template_raster = None
-
- # 动态获取边界数据(严格使用指定层级)
- if not level:
- raise ValueError("必须提供行政层级 level:county | city | province")
-
- # 直接从数据库获取边界GeoDataFrame
- boundary_gdf = self._get_boundary_gdf_from_database(area, level)
- boundary_shp = None # 不再需要临时边界文件
-
- if boundary_gdf is None:
- self.logger.warning(f"未找到地区 '{area}' 的边界数据,将不使用边界裁剪")
- else:
- # 在绘图前进行样点边界包含性统计
- try:
- if boundary_gdf is not None and len(boundary_gdf) > 0:
- boundary_union = boundary_gdf.unary_union
- total_points = len(results_with_coords)
- inside_count = 0
- outside_points: List[Dict[str, Any]] = []
- for r in results_with_coords:
- pt = Point(float(r["longitude"]), float(r["latitude"]))
- if boundary_union.contains(pt) or boundary_union.touches(pt):
- inside_count += 1
- else:
- outside_points.append({
- "farmland_id": r.get("farmland_id"),
- "sample_id": r.get("sample_id"),
- "longitude": r.get("longitude"),
- "latitude": r.get("latitude"),
- "flux_value": r.get("flux_value")
- })
- outside_count = total_points - inside_count
- inside_pct = (inside_count / total_points * 100.0) if total_points > 0 else 0.0
- self.logger.info(
- f"样点边界检查 - 总数: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), 边界外: {outside_count}")
- if outside_count > 0:
- sample_preview = outside_points[:10]
- self.logger.warning(
- f"存在 {outside_count} 个样点位于边界之外,绘图时将被掩膜隐藏。示例(最多10条): {sample_preview}")
- # 在日志中打印边界检查统计结果
- self.logger.info(
- f"边界检查统计 - 地区: {area}, 层级: {level}, 计算类型: {calculation_type}, "
- f"总样点: {total_points}, 边界内: {inside_count} ({inside_pct:.2f}%), "
- f"边界外: {outside_count}"
- )
- if outside_count > 0 and len(outside_points) > 0:
- sample_preview = outside_points[:5] # 只显示前5个样本
- self.logger.info(f"边界外样点示例(前5个): {sample_preview}")
- else:
- generated_files = {}
- except Exception as check_err:
- self.logger.warning(f"样点边界包含性检查失败: {str(check_err)}")
-
- # 创建shapefile
- shapefile_path = csv_path.replace('.csv', '_points.shp')
- mapper.csv_to_shapefile(csv_path, shapefile_path,
- lon_col='longitude', lat_col='latitude', value_col='flux_value')
-
- # 合并已生成文件映射
- generated_files.update({"csv": csv_path, "shapefile": shapefile_path})
-
- # 如果有模板栅格文件,创建栅格地图
- if template_raster:
- try:
- # 创建栅格
- raster_path = csv_path.replace('.csv', '_raster.tif')
- raster_path, stats = mapper.vector_to_raster(
- shapefile_path, template_raster, raster_path, 'flux_value',
- resolution_factor=resolution_factor, boundary_shp=boundary_shp, boundary_gdf=boundary_gdf,
- interpolation_method='nearest', enable_interpolation=enable_interpolation
- )
- generated_files["raster"] = raster_path
-
- # 创建栅格地图 - 使用英文标题避免中文乱码
- map_title = "Agricultural Input Cd Flux"
-
- map_file = mapper.create_raster_map(
- boundary_shp if boundary_shp else None,
- raster_path,
- map_output,
- colormap=colormap,
- title=map_title,
- output_size=12,
- dpi=300,
- resolution_factor=4.0,
- enable_interpolation=False,
- interpolation_method='nearest',
- boundary_gdf=boundary_gdf
- )
- generated_files["map"] = map_file
-
- # 创建直方图 - 使用英文标题避免中文乱码
- histogram_title = "Agricultural Input Cd Flux Distribution"
-
- histogram_file = mapper.create_histogram(
- raster_path,
- f"{histogram_output}.jpg",
- title=histogram_title,
- xlabel='Cd Flux (g/ha/a)',
- ylabel='Frequency Density'
- )
- generated_files["histogram"] = histogram_file
-
- except Exception as viz_error:
- self.logger.warning(f"栅格可视化创建失败: {str(viz_error)}")
- # 即使栅格可视化失败,也返回已生成的文件
-
- # 清理中间文件(默认开启,仅保留最终可视化)
- if cleanup_intermediate:
- try:
- self._cleanup_intermediate_files(generated_files, None)
- except Exception as cleanup_err:
- self.logger.warning(f"中间文件清理失败: {str(cleanup_err)}")
- self.logger.info(f"✓ 成功创建农业投入Cd通量可视化,生成文件: {list(generated_files.keys())}")
- return generated_files
-
- except Exception as e:
- self.logger.error(f"创建农业投入可视化失败: {str(e)}")
- raise
- def _cleanup_intermediate_files(self, generated_files: Dict[str, str], boundary_shp: Optional[str]) -> None:
- """
- 清理中间文件:CSV、Shapefile 及其配套文件、栅格TIFF;若边界为临时目录,则一并删除
- """
- import shutil
- import tempfile
- def _safe_remove(path: str) -> None:
- try:
- if path and os.path.exists(path) and os.path.isfile(path):
- os.remove(path)
- except Exception:
- pass
- # 删除 CSV
- _safe_remove(generated_files.get("csv"))
- # 删除栅格
- _safe_remove(generated_files.get("raster"))
- # 删除 Shapefile 全家桶
- shp_path = generated_files.get("shapefile")
- if shp_path:
- base, _ = os.path.splitext(shp_path)
- for ext in (".shp", ".shx", ".dbf", ".prj", ".cpg"):
- _safe_remove(base + ext)
- # 如果边界文件来自系统临时目录,删除其所在目录
- if boundary_shp:
- temp_root = tempfile.gettempdir()
- try:
- if os.path.commonprefix([os.path.abspath(boundary_shp), temp_root]) == temp_root:
- temp_dir = os.path.dirname(os.path.abspath(boundary_shp))
- if os.path.isdir(temp_dir):
- shutil.rmtree(temp_dir, ignore_errors=True)
- except Exception:
- pass
-
- def _get_boundary_gdf_from_database(self, area: str, level: str) -> Optional[gpd.GeoDataFrame]:
- """
- 直接从数据库获取边界数据作为GeoDataFrame
-
- @param area: 地区名称
- @param level: 行政层级
- @returns: 边界GeoDataFrame或None
- """
- try:
- with SessionLocal() as db:
- norm_area = area.strip()
- boundary_gdf = get_boundary_gdf_by_name(db, norm_area, level=level)
- if boundary_gdf is not None:
- self.logger.info(f"从数据库获取边界数据: {norm_area} ({level})")
- return boundary_gdf
-
- except Exception as e:
- self.logger.warning(f"从数据库获取边界数据失败: {str(e)}")
-
- return None
-
-
|