# Match each farmland centre point to its nearest sampling point and write the scaled Cd value to a CSV file.
- import csv
- import math
- from collections import defaultdict
def read_csv(file_path, encoding=None):
    """Read a CSV file and return its rows as a list of dicts.

    The first row is used as the header (``csv.DictReader``), so each
    returned row maps a column name to that cell's string value.

    Args:
        file_path: Path of the CSV file to read.
        encoding: Text encoding handed to ``open``. ``None`` (the default)
            keeps the platform's default encoding.

    Returns:
        The list of rows. An empty list is returned, after printing an
        error message, when the file is missing or cannot be read.
    """
    try:
        # newline='' lets the csv module do its own line-ending handling,
        # so line breaks embedded in quoted fields come through unchanged.
        with open(file_path, 'r', newline='', encoding=encoding) as file:
            return list(csv.DictReader(file))
    except FileNotFoundError:
        print(f"错误: 文件 {file_path} 未找到!")
        return []
    except Exception as e:
        # Deliberately broad: reading is best-effort and the caller treats
        # an empty list as "no data".
        print(f"错误: 读取文件 {file_path} 时发生错误: {e}")
        return []
def calculate_distance(lat1, lon1, lat2, lon2):
    """Return the Euclidean distance between two coordinate pairs.

    The coordinates are treated as plain planar values; no correction for
    the Earth's curvature is applied, which is adequate only over a small
    area. Use the Haversine formula if real-world distances are needed.

    Args:
        lat1, lon1: Coordinates of the first point (numbers or numeric
            strings).
        lat2, lon2: Coordinates of the second point (numbers or numeric
            strings).

    Returns:
        The distance as a float, or ``float('inf')`` (after printing an
        error message) when any coordinate cannot be converted to float.
    """
    try:
        lat1, lon1, lat2, lon2 = (float(v) for v in (lat1, lon1, lat2, lon2))
    except (TypeError, ValueError):
        # TypeError covers None and other non-string, non-numeric values;
        # ValueError covers strings that are not numbers.
        print("错误: 经纬度转换为浮点数失败")
        return float('inf')

    return math.hypot(lat2 - lat1, lon2 - lon1)
def _is_blank(value):
    """Return True when a cell is absent (None) or a whitespace-only string."""
    return value is None or (isinstance(value, str) and not value.strip())


def find_nearest_sample(farmland_point, sample_points, farmland_id=None):
    """Find the sampling point nearest to a farmland centre and scale its Cd.

    Args:
        farmland_point: Mapping with the keys '中心点经度', '中心点纬度' and
            'DLMC' (the land-use label).
        sample_points: Iterable of mappings with the keys '经度', '纬度' and
            'Cd (ug/L)'. Samples with a blank coordinate or Cd value, or a
            Cd value that is not a number, are ignored.
        farmland_id: Identifier used only in the warning message.

    Returns:
        The Cd value of the nearest usable sample multiplied by a factor
        chosen from the 'DLMC' label, or ``None`` when the farmland has no
        coordinates or no usable sample exists.
    """
    # '经度' is longitude and '纬度' is latitude; the distance helper treats
    # both axes symmetrically, so only consistent ordering matters here.
    farm_lon = farmland_point.get('中心点经度')
    farm_lat = farmland_point.get('中心点纬度')
    farm_style = farmland_point.get('DLMC')

    if _is_blank(farm_lon) or _is_blank(farm_lat):
        print(f"警告: 耕地 ID {farmland_id} 缺少经纬度信息")
        return None

    min_distance = float('inf')
    nearest_cd = None

    for sample in sample_points:
        sample_lon = sample.get('经度')
        sample_lat = sample.get('纬度')
        sample_cd = sample.get('Cd (ug/L)')

        if _is_blank(sample_lon) or _is_blank(sample_lat) or _is_blank(sample_cd):
            continue

        distance = calculate_distance(farm_lon, farm_lat, sample_lon, sample_lat)
        # Unparseable coordinates yield inf and therefore never win.
        if not distance < min_distance:
            continue

        try:
            cd_value = float(sample_cd)
        except (TypeError, ValueError):
            # A non-numeric Cd reading cannot be used as a prediction.
            continue

        min_distance = distance
        nearest_cd = cd_value

    if nearest_cd is None:
        # No usable sample: report "unmatched" instead of crashing.
        return None

    # Scaling factors per land-use label, kept exactly as in the original
    # code; their derivation is not documented in this file.
    factors = {
        "水田": 711 * 0.524,
        "水浇地": 427 * 0.599,
    }
    return factors.get(farm_style, 200 * 0.7) * nearest_cd
def merge_data(farmland_data, sample_data):
    """Build one output record per farmland row with its nearest-sample value.

    Args:
        farmland_data: Iterable of farmland rows (mappings) holding the
            keys '中心点经度' and '中心点纬度' and optionally 'id'.
        sample_data: Sampling-point rows passed on to
            ``find_nearest_sample``.

    Returns:
        A list of dicts with the keys 'lon', 'lan' and 'Prediction'.
        'Prediction' is ``None`` for rows without a match; the number of
        such rows is reported in a printed warning.
    """
    records = []
    for plot in farmland_data:
        prediction = find_nearest_sample(
            plot, sample_data, plot.get('id', '未知ID')
        )
        # Only the centre coordinates and the predicted value are kept.
        # The 'lan' spelling is part of the output schema.
        records.append({
            'lon': plot.get('中心点经度'),
            'lan': plot.get('中心点纬度'),
            'Prediction': prediction,
        })

    missing = sum(1 for record in records if record['Prediction'] is None)
    if missing:
        print(f"警告: {missing} 个耕地没有找到匹配的采样点cd值")

    return records
def write_csv(data, output_file):
    """Write records to a UTF-8 CSV file with a fixed column order.

    Args:
        data: Sequence of dicts with the keys 'lon', 'lan' and
            'Prediction'. Nothing is written when it is empty.
        output_file: Destination path; an existing file is overwritten.

    Returns:
        None. Success and failure are both reported via ``print``.
    """
    if not data:
        print("错误: 没有数据可写入")
        return

    # The column order is fixed explicitly rather than taken from the data.
    columns = ['lon', 'lan', 'Prediction']
    column_list = ', '.join(columns)

    try:
        with open(output_file, 'w', newline='', encoding='utf-8') as out:
            table = csv.DictWriter(out, fieldnames=columns)
            table.writeheader()
            table.writerows(data)
        print(f"成功将数据写入 {output_file},字段顺序: {column_list}")
    except Exception as err:
        print(f"错误: 写入文件 {output_file} 时发生错误: {err}")
def main(farmland_file=None, sample_file=None, output_file=None):
    """Run the pipeline: read both CSV files, match them, write the result.

    Args:
        farmland_file: Path of the farmland centre-point CSV. Defaults to
            the original hard-coded location.
        sample_file: Path of the sampling-point CSV. Defaults to the
            original hard-coded location.
        output_file: Path of the CSV to write. Defaults to the original
            hard-coded location.

    Returns:
        None. The function stops early, after printing an error, when
        either input yields no rows.
    """
    # Fall back to the original fixed paths when no override is given.
    if farmland_file is None:
        farmland_file = r'D:\17417\Documents\backend\Water\Data\四县三用地&水浇地.csv'
    if sample_file is None:
        sample_file = r'D:\17417\Documents\backend\Water\Data\SamplingPoint.csv'
    if output_file is None:
        output_file = r'D:\17417\Documents\backend\Water\Data\matched_data.csv'

    # Load both inputs.
    print("正在读取耕地数据...")
    farmland_data = read_csv(farmland_file)
    print(f"已读取 {len(farmland_data)} 条耕地记录")

    print("正在读取采样点数据...")
    sample_data = read_csv(sample_file)
    print(f"已读取 {len(sample_data)} 条采样点记录")

    if not farmland_data or not sample_data:
        print("错误: 缺少必要的数据")
        return

    # Match every farmland row to its nearest sampling point.
    print("正在匹配最近的采样点...")
    merged_data = merge_data(farmland_data, sample_data)

    # Persist the result.
    print("正在写入结果...")
    write_csv(merged_data, output_file)
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|