point_match.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import csv
  2. import math
  3. from collections import defaultdict
  4. def read_csv(file_path):
  5. """读取CSV文件并返回数据列表"""
  6. data = []
  7. try:
  8. with open(file_path, 'r') as file:
  9. reader = csv.DictReader(file)
  10. for row in reader:
  11. data.append(row)
  12. return data
  13. except FileNotFoundError:
  14. print(f"错误: 文件 {file_path} 未找到!")
  15. return []
  16. except Exception as e:
  17. print(f"错误: 读取文件 {file_path} 时发生错误: {e}")
  18. return []
  19. def calculate_distance(lat1, lon1, lat2, lon2):
  20. """计算两点之间的欧几里得距离(经纬度距离)"""
  21. # 将经纬度转换为浮点数
  22. try:
  23. lat1 = float(lat1)
  24. lon1 = float(lon1)
  25. lat2 = float(lat2)
  26. lon2 = float(lon2)
  27. except ValueError:
  28. print("错误: 经纬度转换为浮点数失败")
  29. return float('inf')
  30. # 简单的欧几里得距离计算,适用于小范围区域
  31. # 如需更精确的距离计算(考虑地球曲率),可使用Haversine公式
  32. dx = lat2 - lat1
  33. dy = lon2 - lon1
  34. return math.sqrt(dx*dx + dy*dy)
  35. def find_nearest_sample(farmland_point, sample_points, farmland_id=None):
  36. """为给定的耕地中心点找到最近的采样点并返回其cd值"""
  37. min_distance = float('inf')
  38. nearest_cd = None
  39. farm_lat = farmland_point.get('中心点经度')
  40. farm_lon = farmland_point.get('中心点纬度')
  41. farm_style = farmland_point.get('DLMC')
  42. if not farm_lat or not farm_lon:
  43. print(f"警告: 耕地 ID {farmland_id} 缺少经纬度信息")
  44. return None
  45. for sample in sample_points:
  46. sample_lat = sample.get('经度')
  47. sample_lon = sample.get('纬度')
  48. sample_cd = sample.get('Cd (ug/L)')
  49. if not sample_lat or not sample_lon or not sample_cd:
  50. continue
  51. distance = calculate_distance(farm_lat, farm_lon, sample_lat, sample_lon)
  52. if distance < min_distance:
  53. min_distance = distance
  54. nearest_cd = sample_cd
  55. nearest_cd = float(nearest_cd)
  56. A = 711 * 0.524
  57. B = 427 * 0.599
  58. C = 200 * 0.7
  59. if farm_style == "水浇地":
  60. nearest_cd = B*nearest_cd
  61. elif farm_style == "水田":
  62. nearest_cd = A*nearest_cd
  63. else:
  64. nearest_cd = C*nearest_cd
  65. return nearest_cd
  66. def merge_data(farmland_data, sample_data):
  67. """合并耕地数据和最近的采样点cd数据"""
  68. merged_data = []
  69. unmatched_count = 0
  70. for farmland in farmland_data:
  71. farmland_id = farmland.get('id', '未知ID')
  72. nearest_cd = find_nearest_sample(farmland, sample_data, farmland_id)
  73. # 创建新的合并记录,只包含中心点坐标和cd值
  74. merged_record = {
  75. 'lon': farmland.get('中心点经度'),
  76. 'lan': farmland.get('中心点纬度'),
  77. 'Prediction': nearest_cd
  78. }
  79. if nearest_cd is None:
  80. unmatched_count += 1
  81. merged_data.append(merged_record)
  82. if unmatched_count > 0:
  83. print(f"警告: {unmatched_count} 个耕地没有找到匹配的采样点cd值")
  84. return merged_data
  85. def write_csv(data, output_file):
  86. """将数据写入CSV文件并指定字段顺序"""
  87. if not data:
  88. print("错误: 没有数据可写入")
  89. return
  90. # 显式指定字段顺序
  91. fieldnames = ['lon', 'lan', 'Prediction']
  92. try:
  93. with open(output_file, 'w', newline='', encoding='utf-8') as file:
  94. writer = csv.DictWriter(file, fieldnames=fieldnames)
  95. writer.writeheader()
  96. writer.writerows(data)
  97. print(f"成功将数据写入 {output_file},字段顺序: {', '.join(fieldnames)}")
  98. except Exception as e:
  99. print(f"错误: 写入文件 {output_file} 时发生错误: {e}")
  100. def main():
  101. # 配置文件路径
  102. farmland_file = r'D:\17417\Documents\backend\Water\Data\四县三用地&水浇地.csv' # 耕地中心点数据文件
  103. sample_file = r'D:\17417\Documents\backend\Water\Data\SamplingPoint.csv' # 采样点数据文件
  104. output_file = r'D:\17417\Documents\backend\Water\Data\matched_data.csv' # 输出文件
  105. # 读取数据
  106. print("正在读取耕地数据...")
  107. farmland_data = read_csv(farmland_file)
  108. print(f"已读取 {len(farmland_data)} 条耕地记录")
  109. print("正在读取采样点数据...")
  110. sample_data = read_csv(sample_file)
  111. print(f"已读取 {len(sample_data)} 条采样点记录")
  112. if not farmland_data or not sample_data:
  113. print("错误: 缺少必要的数据")
  114. return
  115. # 合并数据
  116. print("正在匹配最近的采样点...")
  117. merged_data = merge_data(farmland_data, sample_data)
  118. # 写入结果
  119. print("正在写入结果...")
  120. write_csv(merged_data, output_file)
  121. if __name__ == "__main__":
  122. main()