import csv import math from collections import defaultdict def read_csv(file_path): """读取CSV文件并返回数据列表""" data = [] try: with open(file_path, 'r') as file: reader = csv.DictReader(file) for row in reader: data.append(row) return data except FileNotFoundError: print(f"错误: 文件 {file_path} 未找到!") return [] except Exception as e: print(f"错误: 读取文件 {file_path} 时发生错误: {e}") return [] def calculate_distance(lat1, lon1, lat2, lon2): """计算两点之间的欧几里得距离(经纬度距离)""" # 将经纬度转换为浮点数 try: lat1 = float(lat1) lon1 = float(lon1) lat2 = float(lat2) lon2 = float(lon2) except ValueError: print("错误: 经纬度转换为浮点数失败") return float('inf') # 简单的欧几里得距离计算,适用于小范围区域 # 如需更精确的距离计算(考虑地球曲率),可使用Haversine公式 dx = lat2 - lat1 dy = lon2 - lon1 return math.sqrt(dx*dx + dy*dy) def find_nearest_sample(farmland_point, sample_points, farmland_id=None): """为给定的耕地中心点找到最近的采样点并返回其cd值""" min_distance = float('inf') nearest_cd = None farm_lat = farmland_point.get('中心点经度') farm_lon = farmland_point.get('中心点纬度') farm_style = farmland_point.get('DLMC') if not farm_lat or not farm_lon: print(f"警告: 耕地 ID {farmland_id} 缺少经纬度信息") return None for sample in sample_points: sample_lat = sample.get('经度') sample_lon = sample.get('纬度') sample_cd = sample.get('Cd (ug/L)') if not sample_lat or not sample_lon or not sample_cd: continue distance = calculate_distance(farm_lat, farm_lon, sample_lat, sample_lon) if distance < min_distance: min_distance = distance nearest_cd = sample_cd nearest_cd = float(nearest_cd) A = 711 * 0.524 B = 427 * 0.599 C = 200 * 0.7 if farm_style == "水浇地": nearest_cd = B*nearest_cd elif farm_style == "水田": nearest_cd = A*nearest_cd else: nearest_cd = C*nearest_cd return nearest_cd def merge_data(farmland_data, sample_data): """合并耕地数据和最近的采样点cd数据""" merged_data = [] unmatched_count = 0 for farmland in farmland_data: farmland_id = farmland.get('id', '未知ID') nearest_cd = find_nearest_sample(farmland, sample_data, farmland_id) # 创建新的合并记录,只包含中心点坐标和cd值 merged_record = { 'lon': farmland.get('中心点经度'), 'lan': farmland.get('中心点纬度'), 'Prediction': nearest_cd } if nearest_cd is None: unmatched_count += 1 merged_data.append(merged_record) if unmatched_count > 0: print(f"警告: {unmatched_count} 个耕地没有找到匹配的采样点cd值") return merged_data def write_csv(data, output_file): """将数据写入CSV文件并指定字段顺序""" if not data: print("错误: 没有数据可写入") return # 显式指定字段顺序 fieldnames = ['lon', 'lan', 'Prediction'] try: with open(output_file, 'w', newline='', encoding='utf-8') as file: writer = csv.DictWriter(file, fieldnames=fieldnames) writer.writeheader() writer.writerows(data) print(f"成功将数据写入 {output_file},字段顺序: {', '.join(fieldnames)}") except Exception as e: print(f"错误: 写入文件 {output_file} 时发生错误: {e}") def main(): # 配置文件路径 farmland_file = r'D:\17417\Documents\backend\Water\Data\四县三用地&水浇地.csv' # 耕地中心点数据文件 sample_file = r'D:\17417\Documents\backend\Water\Data\SamplingPoint.csv' # 采样点数据文件 output_file = r'D:\17417\Documents\backend\Water\Data\matched_data.csv' # 输出文件 # 读取数据 print("正在读取耕地数据...") farmland_data = read_csv(farmland_file) print(f"已读取 {len(farmland_data)} 条耕地记录") print("正在读取采样点数据...") sample_data = read_csv(sample_file) print(f"已读取 {len(sample_data)} 条采样点记录") if not farmland_data or not sample_data: print("错误: 缺少必要的数据") return # 合并数据 print("正在匹配最近的采样点...") merged_data = merge_data(farmland_data, sample_data) # 写入结果 print("正在写入结果...") write_csv(merged_data, output_file) if __name__ == "__main__": main()