utils.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. from pykrige import OrdinaryKriging
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy import Column, Integer, String, Float, DateTime, select, create_engine
  4. import uuid
  5. from datetime import datetime, timezone
  6. import pandas as pd
  7. from .database_models import CurrentReduce, CurrentReflux
  8. from sqlalchemy.schema import MetaData, Table
  9. import geopandas as gpd
# Shared declarative base: every dynamically generated dataset model registers on it.
Base = declarative_base()
  11. def create_dynamic_table(dataset_id, columns):
  12. """动态创建数据表"""
  13. # 动态构建列
  14. dynamic_columns = {
  15. 'id': Column(Integer, primary_key=True, autoincrement=True) # 为每个表添加一个主键
  16. }
  17. # 根据 columns 字典动态创建字段
  18. for col_name, col_type in columns.items():
  19. if col_type == 'str':
  20. dynamic_columns[col_name] = Column(String(255))
  21. elif col_type == 'int':
  22. dynamic_columns[col_name] = Column(Integer)
  23. elif col_type == 'float':
  24. dynamic_columns[col_name] = Column(Float)
  25. elif col_type == 'datetime':
  26. dynamic_columns[col_name] = Column(DateTime)
  27. # 动态生成模型类,表名使用 dataset_{dataset_id}
  28. table_name = f"dataset_{dataset_id}"
  29. # 在生成的类中添加 `__tablename__`
  30. dynamic_columns['__tablename__'] = table_name
  31. # 动态创建类
  32. dynamic_class = type(table_name, (Base,), dynamic_columns)
  33. # 打印调试信息
  34. print("table_name:", table_name)
  35. print("dynamic_columns:", dynamic_columns)
  36. # 创建数据库引擎
  37. engine = create_engine('sqlite:///SoilAcidification.db') # 这里需要替换为你的数据库引擎
  38. Base.metadata.create_all(engine) # 创建所有表格
  39. return dynamic_class
  40. # 判断文件类型是否允许
  41. def allowed_file(filename):
  42. ALLOWED_EXTENSIONS = {'xlsx', 'xls'}
  43. return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
  44. # 生成唯一文件名
  45. def generate_unique_filename(filename):
  46. # 获取文件的扩展名
  47. ext = filename.rsplit('.', 1)[1].lower()
  48. # 使用 UUID 和当前时间戳生成唯一文件名(使用 UTC 时区)
  49. unique_filename = f"{uuid.uuid4().hex}_{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}.{ext}"
  50. return unique_filename
  51. def infer_column_types(df):
  52. type_map = {
  53. 'object': 'str',
  54. 'int64': 'int',
  55. 'float64': 'float',
  56. 'datetime64[ns]': 'datetime' # 适应Pandas datetime类型
  57. }
  58. # 提取列和其数据类型
  59. return {col: type_map.get(str(df[col].dtype), 'str') for col in df.columns}
  60. def clean_column_names(dataframe):
  61. # Strip whitespace and replace non-breaking spaces and other non-printable characters
  62. dataframe.columns = [col.strip().replace('\xa0', '') for col in dataframe.columns]
  63. return dataframe
  64. # 建立excel文件的列名和数据库模型字段之间的映射
  65. def rename_columns_for_model(dataframe, dataset_type):
  66. if dataset_type == 'reduce':
  67. rename_map = {
  68. '1/b': 'Q_over_b',
  69. 'pH': 'pH',
  70. 'OM': 'OM',
  71. 'CL': 'CL',
  72. 'H': 'H',
  73. 'Al': 'Al'
  74. }
  75. elif dataset_type == 'reflux':
  76. rename_map = {
  77. 'OM': 'OM',
  78. 'CL': 'CL',
  79. 'CEC': 'CEC',
  80. 'H+': 'H_plus',
  81. 'N': 'N',
  82. 'Al3+': 'Al3_plus',
  83. 'ΔpH': 'Delta_pH'
  84. }
  85. # 使用 rename() 方法更新列名
  86. dataframe = dataframe.rename(columns=rename_map)
  87. return dataframe
  88. # 建立前端参数和模型预测字段之间的映射
  89. def rename_columns_for_model_predict(dataframe, dataset_type):
  90. if dataset_type == 'reduce':
  91. rename_map = {
  92. 'init_pH': 'pH',
  93. 'OM': 'OM',
  94. 'CL': 'CL',
  95. 'H': 'H',
  96. 'Al': 'Al'
  97. }
  98. elif dataset_type == 'reflux':
  99. rename_map = {
  100. "OM": "OM",
  101. "CL": "CL",
  102. "CEC": "CEC",
  103. "H+": "H_plus",
  104. "N": "N",
  105. "Al3+": "Al3_plus"
  106. }
  107. # 使用 rename() 方法更新列名
  108. dataframe = dataframe.rename(columns=rename_map)
  109. return dataframe
  110. def insert_data_into_existing_table(session, dataframe, model_class):
  111. """Insert data from a DataFrame into an existing SQLAlchemy model table."""
  112. for index, row in dataframe.iterrows():
  113. record = model_class(**row.to_dict())
  114. session.add(record)
  115. def insert_data_into_dynamic_table(session, dataset_df, dynamic_table_class):
  116. for _, row in dataset_df.iterrows():
  117. record_data = row.to_dict()
  118. session.execute(dynamic_table_class.__table__.insert(), [record_data])
  119. def insert_data_by_type(session, dataset_df, dataset_type):
  120. if dataset_type == 'reduce':
  121. for _, row in dataset_df.iterrows():
  122. record = CurrentReduce(**row.to_dict())
  123. session.add(record)
  124. elif dataset_type == 'reflux':
  125. for _, row in dataset_df.iterrows():
  126. record = CurrentReflux(**row.to_dict())
  127. session.add(record)
  128. def get_current_data(session, data_type):
  129. # 根据数据类型选择相应的表模型
  130. if data_type == 'reduce':
  131. model = CurrentReduce
  132. elif data_type == 'reflux':
  133. model = CurrentReflux
  134. else:
  135. raise ValueError("Invalid data type provided. Choose 'reduce' or 'reflux'.")
  136. # 从数据库中查询所有记录
  137. result = session.execute(select(model))
  138. # 将结果转换为DataFrame
  139. dataframe = pd.DataFrame([dict(row) for row in result])
  140. return dataframe
  141. def get_dataset_by_id(session, dataset_id):
  142. # 动态获取表的元数据
  143. metadata = MetaData(bind=session.bind)
  144. dataset_table = Table(dataset_id, metadata, autoload=True, autoload_with=session.bind)
  145. # 从数据库中查询整个表的数据
  146. query = select(dataset_table)
  147. result = session.execute(query).fetchall()
  148. # 检查是否有数据返回
  149. if not result:
  150. raise ValueError(f"No data found for dataset {dataset_id}.")
  151. # 将结果转换为DataFrame
  152. dataframe = pd.DataFrame(result, columns=[column.name for column in dataset_table.columns])
  153. return dataframe
  154. def predict_to_Q(predictions, init_ph, target_ph):
  155. # 将预测结果转换为Q
  156. Q = predictions * (target_ph - init_ph)
  157. return Q
  158. # 说明:Q指生石灰投加量,单位是%,例如1%代表100g土壤中施加1g生石灰。
  159. # 其中,土壤是指表层20cm土壤。# 如果Q的单位换算为吨/公顷,即t/ha,则需要乘以25。
  160. # ΔpH=目标pH-初始pH
  161. def Q_to_t_ha(Q):
  162. return Q * 25
  163. def create_kriging(file_name, emission_column, points):
  164. # 从 Excel 读取数据
  165. df = pd.read_excel(file_name)
  166. print(df)
  167. # 转换为 GeoDataFrame
  168. gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df['longitude'], df['latitude']))
  169. print(gdf)
  170. # 初始化并运行克里金插值
  171. OK = OrdinaryKriging(
  172. gdf.geometry.x,
  173. gdf.geometry.y,
  174. gdf[emission_column],
  175. variogram_model='spherical',
  176. verbose=True,
  177. enable_plotting=False
  178. )
  179. # 提取输入点的经度和纬度
  180. input_lons = [point[0] for point in points]
  181. input_lats = [point[1] for point in points]
  182. # 对输入的点进行插值
  183. z, ss = OK.execute('points', input_lons, input_lats)
  184. result = {
  185. "message": "Kriging interpolation for points completed successfully",
  186. "interpolated_concentrations": z.tolist()
  187. }
  188. return result