point_match.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. import csv
  2. import math
  3. from collections import defaultdict
  4. def read_csv(file_path):
  5. """读取CSV文件并返回数据列表"""
  6. data = []
  7. try:
  8. with open(file_path, 'r') as file:
  9. reader = csv.DictReader(file)
  10. for row in reader:
  11. data.append(row)
  12. return data
  13. except FileNotFoundError:
  14. print(f"错误: 文件 {file_path} 未找到!")
  15. return []
  16. except Exception as e:
  17. print(f"错误: 读取文件 {file_path} 时发生错误: {e}")
  18. return []
  19. def calculate_distance(lat1, lon1, lat2, lon2):
  20. """计算两点之间的欧几里得距离(经纬度距离)"""
  21. # 将经纬度转换为浮点数
  22. try:
  23. lat1 = float(lat1)
  24. lon1 = float(lon1)
  25. lat2 = float(lat2)
  26. lon2 = float(lon2)
  27. except ValueError:
  28. print("错误: 经纬度转换为浮点数失败")
  29. return float('inf')
  30. # 简单的欧几里得距离计算,适用于小范围区域
  31. # 如需更精确的距离计算(考虑地球曲率),可使用Haversine公式
  32. dx = lat2 - lat1
  33. dy = lon2 - lon1
  34. return math.sqrt(dx*dx + dy*dy)
  35. def find_nearest_sample(farmland_point, sample_points, farmland_id=None):
  36. """为给定的耕地中心点找到最近的采样点并返回其cd值"""
  37. min_distance = float('inf')
  38. nearest_cd = None
  39. farm_lat = farmland_point.get('中心点经度')
  40. farm_lon = farmland_point.get('中心点纬度')
  41. farm_style = farmland_point.get('DLMC')
  42. if not farm_lat or not farm_lon:
  43. print(f"警告: 耕地 ID {farmland_id} 缺少经纬度信息")
  44. return None
  45. for sample in sample_points:
  46. sample_lat = sample.get('经度')
  47. sample_lon = sample.get('纬度')
  48. sample_cd = sample.get('Cd (ug/L)')
  49. if not sample_lat or not sample_lon or not sample_cd:
  50. continue
  51. distance = calculate_distance(farm_lat, farm_lon, sample_lat, sample_lon)
  52. if distance < min_distance:
  53. min_distance = distance
  54. nearest_cd = sample_cd
  55. nearest_cd = float(nearest_cd)
  56. A = 711 * 0.524
  57. B = 427 * 0.599
  58. C = 200 * 0.7
  59. if farm_style == "水浇地":
  60. nearest_cd = B*nearest_cd
  61. elif farm_style == "水田":
  62. nearest_cd = A*nearest_cd
  63. else:
  64. nearest_cd = C*nearest_cd
  65. return nearest_cd
  66. def merge_data(farmland_data, sample_data):
  67. """合并耕地数据和最近的采样点cd数据"""
  68. merged_data = []
  69. unmatched_count = 0
  70. for farmland in farmland_data:
  71. farmland_id = farmland.get('id', '未知ID')
  72. nearest_cd = find_nearest_sample(farmland, sample_data, farmland_id)
  73. # 创建新的合并记录,只包含中心点坐标和cd值
  74. merged_record = {
  75. 'lon': farmland.get('中心点经度'),
  76. 'lan': farmland.get('中心点纬度'),
  77. 'Prediction': nearest_cd
  78. }
  79. if nearest_cd is None:
  80. unmatched_count += 1
  81. merged_data.append(merged_record)
  82. if unmatched_count > 0:
  83. print(f"警告: {unmatched_count} 个耕地没有找到匹配的采样点cd值")
  84. return merged_data
  85. def write_csv(data, output_file):
  86. """将数据写入CSV文件并指定字段顺序"""
  87. if not data:
  88. print("错误: 没有数据可写入")
  89. return
  90. # 显式指定字段顺序
  91. fieldnames = ['lon', 'lan', 'Prediction']
  92. try:
  93. with open(output_file, 'w', newline='', encoding='utf-8') as file:
  94. writer = csv.DictWriter(file, fieldnames=fieldnames)
  95. writer.writeheader()
  96. writer.writerows(data)
  97. print(f"成功将数据写入 {output_file},字段顺序: {', '.join(fieldnames)}")
  98. except Exception as e:
  99. print(f"错误: 写入文件 {output_file} 时发生错误: {e}")
  100. def main():
  101. """主函数 - 使用数据库数据进行点匹配"""
  102. import sys
  103. from pathlib import Path
  104. # 添加项目根目录到Python路径
  105. project_root = Path(__file__).parent.parent.parent
  106. sys.path.append(str(project_root))
  107. # 导入数据库服务
  108. from app.services.land_use_service import land_use_service
  109. # 设置要处理的土地类型
  110. land_type = "水浇地"
  111. # 从数据库获取土地利用数据
  112. print(f"正在从数据库读取 {land_type} 数据...")
  113. land_centers_df = land_use_service.get_land_centers_for_processing(land_type)
  114. if land_centers_df is None or land_centers_df.empty:
  115. print(f"数据库中没有找到 '{land_type}' 类型的土地利用数据")
  116. return
  117. print(f"已从数据库读取 {len(land_centers_df)} 条 {land_type} 记录")
  118. # 转换为字典格式以兼容现有函数
  119. farmland_data = []
  120. for _, row in land_centers_df.iterrows():
  121. farmland_data.append({
  122. 'id': row['gid'],
  123. '中心点经度': row['center_lon'],
  124. '中心点纬度': row['center_lat'],
  125. 'DLMC': row['dlmc']
  126. })
  127. # 配置文件路径
  128. sample_file = Path(__file__).parent.parent / "Data" / "SamplingPoint.csv"
  129. output_dir = Path(__file__).parent.parent / "Data"
  130. output_dir.mkdir(exist_ok=True)
  131. output_file = output_dir / f"matched_data_{land_type}_DB.csv"
  132. print("正在读取采样点数据...")
  133. sample_data = read_csv(str(sample_file))
  134. print(f"已读取 {len(sample_data)} 条采样点记录")
  135. if not farmland_data or not sample_data:
  136. print("错误: 缺少必要的数据")
  137. return
  138. # 合并数据
  139. print("正在匹配最近的采样点...")
  140. merged_data = merge_data(farmland_data, sample_data)
  141. # 写入结果
  142. print("正在写入结果...")
  143. write_csv(merged_data, str(output_file))
  144. print(f"总共处理了 {len(merged_data)} 个 {land_type} 面要素")
  145. if __name__ == "__main__":
  146. main()