""" 数据匹配接口 @description: 提供耕地数据和采样点数据的匹配与合并功能 """ import json from fastapi import APIRouter, Form, HTTPException, UploadFile, File from fastapi.responses import FileResponse from typing import Dict, Any import os import logging import io import csv import math from collections import defaultdict import geopandas as gpd from flask_migrate import show from matplotlib import pyplot as plt from matplotlib.colors import BoundaryNorm, ListedColormap import numpy as np import pandas as pd import rasterio from ...Water.Python_codes import point_match, Figure_raster_mapping, Transfer_csv_to_geotif, Into_Geo router = APIRouter() # 设置日志 logger = logging.getLogger(__name__) @router.post("/water/point-match", summary="合并耕地数据和采样点数据", description="根据上传的耕地数据文件和采样点数据文件,合并数据并返回合并后的CSV文件") async def point_match( farmland_file: UploadFile = File(..., description="耕地中心点数据文件,CSV格式"), sample_file: UploadFile = File(..., description="采样点数据文件,CSV格式") ): """ 合并耕地数据和采样点数据 @param farmland_file: 耕地中心点数据文件 @param sample_file: 采样点数据文件 @returns {FileResponse} 合并后的数据文件 """ try: logger.info("开始合并耕地数据和采样点数据") # 验证文件格式 if not farmland_file.filename.endswith('.csv') or not sample_file.filename.endswith('.csv'): raise HTTPException(status_code=400, detail="仅支持CSV格式文件") # 读取CSV数据 farmland_content = await farmland_file.read() sample_content = await sample_file.read() farmland_data = point_match.read_csv(farmland_content) sample_data = point_match.read_csv(sample_content) if not farmland_data or not sample_data: raise HTTPException(status_code=400, detail="错误: 缺少必要的数据") # 合并数据 merged_data = point_match.merge_data(farmland_data, sample_data) # 保存临时数据文件 temp_file_path = "temp_matched_data.csv" output_file = point_match.write_csv(merged_data, temp_file_path) return FileResponse( path=output_file, filename="matched_data.csv", media_type="text/csv" ) except HTTPException: raise except Exception as e: logger.error(f"合并耕地数据和采样点数据失败: {str(e)}") raise HTTPException( status_code=500, detail=f"合并耕地数据和采样点数据失败: {str(e)}" ) # ============================================================================= # 一键生成并获取地图接口 # ============================================================================= # 设定一些常用的colormap colormap1 = ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'] # 黄-橙-棕 colormap2 = ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'] # 蓝色系 colormap3 = ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'] # 淡黄-草绿 colormap4 = ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'] # 绿色-棕色 colormap5 = ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'] # 黄-粉-紫 colormap6 = ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F'] # 绿-黄-红-紫 # 对应 Figure_raster_mapping.py 的 mapping_raster 函数的接口 @router.post("/generate-raster-map", summary="生成栅格地图", description="根据矢量数据、栅格数据和颜色方案生成栅格地图并返回图片文件") async def generate_raster_map( shp_file: UploadFile = File(..., description="矢量数据文件,如.shp格式"), tif_file: UploadFile = File(..., description="栅格数据文件,如.tif格式"), color_map_name: str = Form(..., description="使用的色彩方案,如colormap1"), title_name: str = Form(..., description="输出数据的图的名称"), output_size: int = Form(..., description="输出图片的尺寸") ): try: logger.info("开始生成栅格地图") # 保存上传的文件到临时目录 shp_path = f"temp_{shp_file.filename}" tif_path = f"temp_{tif_file.filename}" with open(shp_path, "wb") as f: f.write(await shp_file.read()) with open(tif_path, "wb") as f: f.write(await tif_file.read()) # 获取颜色方案 color_map = { "colormap1": colormap1, "colormap2": colormap2, "colormap3": colormap3, "colormap4": colormap4, "colormap5": colormap5, "colormap6": colormap6 }.get(color_map_name) if color_map is None: raise HTTPException(status_code=400, detail="无效的颜色方案") output_path = "output_raster_map" Figure_raster_mapping.mapping_raster(shp_path, tif_path, color_map, title_name, output_path, output_size) # 删除临时文件 os.remove(shp_path) os.remove(tif_path) output_image_path = f"{output_path}.jpg" return FileResponse( path=output_image_path, filename="raster_map.jpg", media_type="image/jpeg" ) except Exception as e: logger.error(f"生成栅格地图失败: {str(e)}") raise HTTPException( status_code=500, detail=f"生成栅格地图失败: {str(e)}" ) # 对应 Transfer_csv_to_geotif.py 的 csv_to_shapefile 和 vector_to_raster 函数的接口 @router.post("/csv-to-geotif", summary="将CSV文件转换为GeoTIFF文件", description="根据CSV文件和模板GeoTIFF文件生成新的GeoTIFF文件并返回") async def csv_to_geotif( csv_file: UploadFile = File(..., description="CSV文件,第一列为经度,第二列为纬度,第三列为数值"), template_tif_file: UploadFile = File(..., description="用作模板的GeoTIFF文件"), field: str = Form(..., description="用于栅格化的属性字段名") ): try: logger.info("开始将CSV文件转换为GeoTIFF文件") # 保存上传的文件到临时目录 csv_path = f"temp_{csv_file.filename}" template_tif_path = f"temp_{template_tif_file.filename}" with open(csv_path, "wb") as f: f.write(await csv_file.read()) with open(template_tif_path, "wb") as f: f.write(await template_tif_file.read()) shapefile_output = "temp_shapefile.shp" Transfer_csv_to_geotif.csv_to_shapefile(csv_path, shapefile_output, os.getcwd()) output_tif = "output_geotif.tif" Transfer_csv_to_geotif.vector_to_raster(shapefile_output, template_tif_path, output_tif, field) # 删除临时文件 os.remove(csv_path) os.remove(template_tif_path) os.remove(shapefile_output) return FileResponse( path=output_tif, filename="output_geotif.tif", media_type="image/tiff" ) except Exception as e: logger.error(f"将CSV文件转换为GeoTIFF文件失败: {str(e)}") raise HTTPException( status_code=500, detail=f"将CSV文件转换为GeoTIFF文件失败: {str(e)}" ) # 对应 Figure_raster_mapping.py 的 plot_tif_histogram 函数的接口 @router.post("/generate-tif-histogram", summary="生成GeoTIFF文件的直方图", description="根据GeoTIFF文件生成其波段值分布的直方图并返回图片文件") async def generate_tif_histogram( tif_file: UploadFile = File(..., description="GeoTIFF文件"), figsize_width: int = Form(10, description="图像宽度,默认10"), figsize_height: int = Form(6, description="图像高度,默认6"), xlabel: str = Form('像元值', description="横坐标标签,默认'像元值'"), ylabel: str = Form('频率密度', description="纵坐标标签,默认'频率密度'"), title: str = Form('GeoTIFF 波段值分布图', description="图标题,默认'GeoTIFF 波段值分布图'") ): try: logger.info("开始生成GeoTIFF文件的直方图") # 保存上传的文件到临时目录 tif_path = f"temp_{tif_file.filename}" with open(tif_path, "wb") as f: f.write(await tif_file.read()) save_path = "output_tif_histogram.jpg" Figure_raster_mapping.plot_tif_histogram(tif_path, figsize=(figsize_width, figsize_height), xlabel=xlabel, ylabel=ylabel, title=title, save_path=save_path) # 删除临时文件 os.remove(tif_path) return FileResponse( path=save_path, filename="tif_histogram.jpg", media_type="image/jpeg" ) except Exception as e: logger.error(f"生成GeoTIFF文件的直方图失败: {str(e)}") raise HTTPException( status_code=500, detail=f"生成GeoTIFF文件的直方图失败: {str(e)}" ) # ============================================================================= # TIFF 转 GeoJSON 接口 # ============================================================================= @router.post("/tif-to-geojson", summary="将 TIFF 文件转换为 GeoJSON 文件", description="根据上传的 TIFF 文件和指定的输出文件名生成 GeoJSON 文件并返回") async def tif_to_geojson_api( output_filename: str = Form(..., description="输出的 GeoJSON 文件名,如:output.geojson"), input_file: UploadFile = File(..., description="TIFF 格式的栅格数据文件"), band: int = Form(1, description="要读取的波段,默认为 1"), mask_value: int = Form(None, description="可选:要忽略的值,如背景值,默认为 None") ): """ 将 TIFF 文件转换为 GeoJSON 文件 @param output_filename: 输出的 GeoJSON 文件名 @param input_file: TIFF 数据文件 @param band: 要读取的波段,默认为 1 @param mask_value: 可选:要忽略的值,如背景值,默认为 None @returns {FileResponse} 转换后的 GeoJSON 文件 """ try: logger.info(f"开始将 {input_file.filename} 转换为 {output_filename}") # 验证文件格式 if not input_file.filename.endswith('.tif'): raise HTTPException(status_code=400, detail="仅支持 TIFF 格式文件") # 保存临时 TIFF 文件 temp_tif_path = f"temp_{input_file.filename}" with open(temp_tif_path, "wb") as f: f.write(await input_file.read()) # 调用 Into_Geo.py 中的函数进行转换 Into_Geo.tif_to_geojson(temp_tif_path, output_filename, band=band, mask_value=mask_value) # 删除临时 TIFF 文件 os.remove(temp_tif_path) if not os.path.exists(output_filename): raise HTTPException(status_code=500, detail="GeoJSON 文件生成失败") return FileResponse( path=output_filename, filename=output_filename, media_type="application/geo+json" ) except HTTPException: raise except Exception as e: logger.error(f"将 {input_file.filename} 转换为 {output_filename} 失败: {str(e)}") raise HTTPException( status_code=500, detail=f"将 {input_file.filename} 转换为 {output_filename} 失败: {str(e)}" )