123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- """
- 数据匹配接口
- @description: 提供耕地数据和采样点数据的匹配与合并功能
- """
- import os
- import shutil
- import tempfile
- import zipfile
- from typing import Any
- from fastapi import APIRouter, Form, HTTPException, UploadFile, File, BackgroundTasks
- from fastapi.responses import FileResponse
- import logging
- import geopandas as gpd
- import rasterio
- from rasterio.mask import mask
- import numpy as np
- import matplotlib.pyplot as plt
- from matplotlib.colors import ListedColormap, BoundaryNorm
- import seaborn as sns
- import json
- import rasterio.plot
- from Water.Python_codes import point_match, Figure_raster_mapping, Transfer_csv_to_geotif, Into_Geo
- router = APIRouter()
- # 配置基础路径
- BASE_DATA_DIR = "Water/Raster/" # 根据实际情况修改为您的数据目录
- SHP_DIR = os.path.join(BASE_DATA_DIR)
- TIF_DIR = os.path.join(BASE_DATA_DIR)
- # 设置日志
- logger = logging.getLogger(__name__)
- async def save_shapefile_zip(upload_file: UploadFile) -> str:
- """保存并解压 Shapefile ZIP 文件到临时目录"""
- try:
- # 创建临时目录
- temp_dir = tempfile.mkdtemp()
- # 保存 ZIP 文件
- zip_path = os.path.join(temp_dir, "shapefile.zip")
- with open(zip_path, "wb") as f:
- content = await upload_file.read()
- f.write(content)
- # 解压 ZIP 文件
- with zipfile.ZipFile(zip_path, 'r') as zip_ref:
- zip_ref.extractall(temp_dir)
- # 删除 ZIP 文件
- os.remove(zip_path)
- return temp_dir
- except Exception as e:
- logger.error(f"Shapefile ZIP 处理失败: {str(e)}")
- raise RuntimeError(f"Shapefile ZIP 处理失败: {str(e)}")
- async def save_upload_file(upload_file: UploadFile) -> str:
- """保存上传文件到临时位置(通用文件)"""
- try:
- suffix = os.path.splitext(upload_file.filename)[1]
- with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
- content = await upload_file.read()
- tmp.write(content)
- return tmp.name
- except Exception as e:
- logger.error(f"文件保存失败: {str(e)}")
- raise RuntimeError(f"文件保存失败: {str(e)}")
- def cleanup_temp_path(path: str) -> None:
- """删除临时文件或整个临时目录"""
- try:
- if os.path.exists(path):
- if os.path.isfile(path):
- os.unlink(path)
- elif os.path.isdir(path):
- shutil.rmtree(path)
- except Exception as e:
- logger.warning(f"清理临时路径失败: {path}, 错误: {e}")
- def find_shapefile_in_directory(directory: str) -> str:
- """在目录中查找 .shp 文件"""
- for file in os.listdir(directory):
- if file.endswith('.shp'):
- return os.path.join(directory, file)
- return None
- @router.post("/point-match",
- summary="合并耕地数据和采样点数据",
- description="根据上传的耕地数据文件和采样点数据文件,合并数据并返回合并后的CSV文件")
- async def point_match(
- farmland_file: UploadFile = File(..., description="耕地中心点数据文件,CSV格式"),
- sample_file: UploadFile = File(..., description="采样点数据文件,CSV格式")
- ):
- """
- 合并耕地数据和采样点数据
- @param farmland_file: 耕地中心点数据文件
- @param sample_file: 采样点数据文件
- @returns {FileResponse} 合并后的数据文件
- """
- try:
- logger.info("开始合并耕地数据和采样点数据")
- # 验证文件格式
- if not farmland_file.filename.endswith('.csv') or not sample_file.filename.endswith('.csv'):
- raise HTTPException(status_code=400, detail="仅支持CSV格式文件")
- # 读取CSV数据
- farmland_content = await farmland_file.read()
- sample_content = await sample_file.read()
- farmland_data = point_match.read_csv(farmland_content)
- sample_data = point_match.read_csv(sample_content)
- if not farmland_data or not sample_data:
- raise HTTPException(status_code=400, detail="错误: 缺少必要的数据")
- # 合并数据
- merged_data = point_match.merge_data(farmland_data, sample_data)
- # 保存临时数据文件
- temp_file_path = "temp_matched_data.csv"
- output_file = point_match.write_csv(merged_data, temp_file_path)
- return FileResponse(
- path=output_file,
- filename="matched_data.csv",
- media_type="text/csv"
- )
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"合并耕地数据和采样点数据失败: {str(e)}")
- raise HTTPException(
- status_code=500,
- detail=f"合并耕地数据和采样点数据失败: {str(e)}"
- )
- # =============================================================================
- # 一键生成并获取地图接口
- # =============================================================================
- # 设定一些常用的colormap
- colormap1 = ['#FFFECE', '#FFF085', '#FEBA17', '#BE3D2A', '#74512D', '#4E1F00'] # 黄-橙-棕
- colormap2 = ['#F6F8D5', '#98D2C0', '#4F959D', '#205781', '#143D60', '#2A3335'] # 蓝色系
- colormap3 = ['#FFEFC8', '#F8ED8C', '#D3E671', '#89AC46', '#5F8B4C', '#355F2E'] # 淡黄-草绿
- colormap4 = ['#F0F1C5', '#BBD8A3', '#6F826A', '#BF9264', '#735557', '#604652'] # 绿色-棕色
- colormap5 = ['#FCFAEE', '#FBF3B9', '#FFDCCC', '#FDB7EA', '#B7B1F2', '#8D77AB'] # 黄-粉-紫
- colormap6 = ['#15B392', '#73EC8B', '#FFEB55', '#EE66A6', '#D91656', '#640D5F'] # 绿-黄-红-紫
- # 对应 Figure_raster_mapping.py 的 mapping_raster 函数的接口
- # 对应 Figure_raster_mapping.py 的 mapping_raster 函数的接口
- @router.post("/generate-raster-map",
- summary="生成栅格地图",
- description="根据矢量数据(ZIP格式)、栅格数据和颜色方案生成栅格地图并返回图片文件")
- async def generate_raster_map(
- background_tasks: BackgroundTasks,
- shp_base_name: str = Form(..., description="矢量数据ZIP文件基本名称,如:'lechang'"),
- tif_type: str = Form(..., description="栅格数据类型,如:'水田'、'旱地'或'水浇地'"),
- color_map_name: str = Form(..., description="使用的色彩方案,如colormap1"),
- title_name: str = Form(..., description="输出数据的图的名称"),
- output_size: int = Form(..., description="输出图片的尺寸")
- ):
- try:
- logger.info("开始生成栅格地图")
- # 拼接完整文件路径
- shp_zip = os.path.join(SHP_DIR, f"{shp_base_name}.zip")
- tif_file = os.path.join(TIF_DIR, f"{tif_type}.tif")
- logger.info(f"shp_zip路径: {shp_zip}")
- logger.info(f"tif_file路径: {tif_file}")
- # 获取颜色方案
- color_map = {
- "colormap1": colormap1,
- "colormap2": colormap2,
- "colormap3": colormap3,
- "colormap4": colormap4,
- "colormap5": colormap5,
- "colormap6": colormap6
- }.get(color_map_name)
- if not color_map:
- raise HTTPException(status_code=400, detail="无效的颜色方案")
- # 检查文件路径是否存在
- if not os.path.exists(shp_zip):
- raise HTTPException(status_code=400, detail=f"shp_zip文件不存在: {shp_zip}")
- if not os.path.exists(tif_file):
- raise HTTPException(status_code=400, detail=f"tif_file文件不存在: {tif_file}")
- # 创建临时目录用于处理文件
- temp_dir = tempfile.mkdtemp()
- background_tasks.add_task(cleanup_temp_path, temp_dir)
- # 解压 Shapefile ZIP 文件
- shp_dir = os.path.join(temp_dir, "shp")
- os.makedirs(shp_dir, exist_ok=True)
- try:
- with zipfile.ZipFile(shp_zip, 'r') as zip_ref:
- zip_ref.extractall(shp_dir)
- except Exception as e:
- logger.error(f"Shapefile ZIP 解压失败: {str(e)}")
- raise HTTPException(
- status_code=500,
- detail=f"Shapefile ZIP 解压失败: {str(e)}"
- )
- # 在解压目录中查找 .shp 文件
- shp_path = find_shapefile_in_directory(shp_dir)
- if not shp_path:
- raise HTTPException(status_code=400, detail="未找到 .shp 文件")
- # 创建输出目录
- output_dir = os.path.join(temp_dir, "output")
- os.makedirs(output_dir, exist_ok=True)
- output_path = os.path.join(output_dir, "raster_map.jpg")
- # 生成栅格地图
- Figure_raster_mapping.mapping_raster(shp_path, tif_file, color_map, title_name, output_path, output_size)
- return FileResponse(
- path=output_path,
- filename="raster_map.jpg",
- media_type="image/jpeg"
- )
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"生成栅格地图失败: {str(e)}")
- raise HTTPException(
- status_code=500,
- detail=f"生成栅格地图失败: {str(e)}"
- )
- # 对应 Transfer_csv_to_geotif.py 的 csv_to_shapefile 和 vector_to_raster 函数的接口
- @router.post("/csv-to-geotif",
- summary="将CSV文件转换为GeoTIFF文件",
- description="根据CSV文件和模板GeoTIFF文件生成新的GeoTIFF文件并返回")
- async def csv_to_geotif(
- csv_file: UploadFile = File(..., description="CSV文件,第一列为经度,第二列为纬度,第三列为数值"),
- template_tif_file: UploadFile = File(..., description="用作模板的GeoTIFF文件"),
- field: str = Form(..., description="用于栅格化的属性字段名")
- ):
- try:
- logger.info("开始将CSV文件转换为GeoTIFF文件")
- # 保存上传的文件到临时目录
- csv_path = f"temp_{csv_file.filename}"
- template_tif_path = f"temp_{template_tif_file.filename}"
- with open(csv_path, "wb") as f:
- f.write(await csv_file.read())
- with open(template_tif_path, "wb") as f:
- f.write(await template_tif_file.read())
- shapefile_output = "temp_shapefile.shp"
- Transfer_csv_to_geotif.csv_to_shapefile(csv_path, shapefile_output, os.getcwd())
- output_tif = "output_geotif.tif"
- Transfer_csv_to_geotif.vector_to_raster(shapefile_output, template_tif_path, output_tif, field)
- # 删除临时文件
- os.remove(csv_path)
- os.remove(template_tif_path)
- os.remove(shapefile_output)
- return FileResponse(
- path=output_tif,
- filename="output_geotif.tif",
- media_type="image/tiff"
- )
- except Exception as e:
- logger.error(f"将CSV文件转换为GeoTIFF文件失败: {str(e)}")
- raise HTTPException(
- status_code=500,
- detail=f"将CSV文件转换为GeoTIFF文件失败: {str(e)}"
- )
- # 对应 Figure_raster_mapping.py 的 plot_tif_histogram 函数的接口
- @router.post("/generate-tif-histogram",
- summary="生成GeoTIFF文件的直方图",
- description="根据GeoTIFF文件生成其波段值分布的直方图并返回图片文件")
- async def generate_tif_histogram(
- tif_type: str = Form(..., description="栅格数据类型,如:'水田'、'旱地'或'水浇地'"),
- figsize_width: int = Form(10, description="图像宽度,默认10"),
- figsize_height: int = Form(6, description="图像高度,默认6"),
- xlabel: str = Form('像元值', description="横坐标标签,默认'像元值'"),
- ylabel: str = Form('频率密度', description="纵坐标标签,默认'频率密度'"),
- title: str = Form('GeoTIFF 波段值分布图', description="图标题,默认'GeoTIFF 波段值分布图'")
- ):
- try:
- logger.info("开始生成GeoTIFF文件的直方图")
- # 拼接完整文件路径
- tif_file = os.path.join(TIF_DIR, f"{tif_type}.tif")
- logger.info(f"tif_file路径: {tif_file}")
- # 检查文件路径是否存在
- if not os.path.exists(tif_file):
- raise HTTPException(status_code=400, detail=f"tif_file文件不存在: {tif_file}")
- # 创建临时目录
- temp_dir = tempfile.mkdtemp()
- output_path = os.path.join(temp_dir, "histogram.jpg")
- # 生成直方图
- Figure_raster_mapping.plot_tif_histogram(
- tif_file,
- figsize=(figsize_width, figsize_height),
- xlabel=xlabel,
- ylabel=ylabel,
- title=title,
- save_path=output_path
- )
- return FileResponse(
- path=output_path,
- filename="tif_histogram.jpg",
- media_type="image/jpeg"
- )
- except Exception as e:
- logger.error(f"生成GeoTIFF文件的直方图失败: {str(e)}")
- raise HTTPException(
- status_code=500,
- detail=f"生成GeoTIFF文件的直方图失败: {str(e)}"
- )
- # =============================================================================
- # TIFF 转 GeoJSON 接口
- # =============================================================================
- @router.post("/tif-to-geojson",
- summary="将 TIFF 文件转换为 GeoJSON 文件",
- description="根据上传的 TIFF 文件和指定的输出文件名生成 GeoJSON 文件并返回")
- async def tif_to_geojson_api(
- output_filename: str = Form(..., description="输出的 GeoJSON 文件名,如:output.geojson"),
- input_file: UploadFile = File(..., description="TIFF 格式的栅格数据文件"),
- band: int = Form(1, description="要读取的波段,默认为 1"),
- mask_value: int = Form(None, description="可选:要忽略的值,如背景值,默认为 None")
- ):
- """
- 将 TIFF 文件转换为 GeoJSON 文件
- @param output_filename: 输出的 GeoJSON 文件名
- @param input_file: TIFF 数据文件
- @param band: 要读取的波段,默认为 1
- @param mask_value: 可选:要忽略的值,如背景值,默认为 None
- @returns {FileResponse} 转换后的 GeoJSON 文件
- """
- try:
- logger.info(f"开始将 {input_file.filename} 转换为 {output_filename}")
- # 验证文件格式
- if not input_file.filename.endswith('.tif'):
- raise HTTPException(status_code=400, detail="仅支持 TIFF 格式文件")
- # 保存临时 TIFF 文件
- temp_tif_path = f"temp_{input_file.filename}"
- with open(temp_tif_path, "wb") as f:
- f.write(await input_file.read())
- # 调用 Into_Geo.py 中的函数进行转换
- Into_Geo.tif_to_geojson(temp_tif_path, output_filename, band=band, mask_value=mask_value)
- # 删除临时 TIFF 文件
- os.remove(temp_tif_path)
- if not os.path.exists(output_filename):
- raise HTTPException(status_code=500, detail="GeoJSON 文件生成失败")
- return FileResponse(
- path=output_filename,
- filename=output_filename,
- media_type="application/geo+json"
- )
- except HTTPException:
- raise
- except Exception as e:
- logger.error(f"将 {input_file.filename} 转换为 {output_filename} 失败: {str(e)}")
- raise HTTPException(
- status_code=500,
- detail=f"将 {input_file.filename} 转换为 {output_filename} 失败: {str(e)}"
- )
|