from pykrige import OrdinaryKriging
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, select, create_engine
import uuid
from datetime import datetime, timezone
import pandas as pd
from .database_models import CurrentReduce, CurrentReflux
from sqlalchemy.schema import MetaData, Table
import geopandas as gpd

Base = declarative_base()


def create_dynamic_table(dataset_id, columns):
    """Dynamically create an ORM model (and its table) named ``dataset_{dataset_id}``.

    Args:
        dataset_id: Identifier appended to the table name.
        columns: Mapping of column name -> simple type name
            ('str' | 'int' | 'float' | 'datetime'). Unknown type names are skipped.

    Returns:
        The dynamically generated declarative model class.
    """
    # Every dynamic table gets a surrogate auto-increment primary key.
    dynamic_columns = {
        'id': Column(Integer, primary_key=True, autoincrement=True)
    }

    # Translate the simple type names into SQLAlchemy column types.
    type_factories = {
        'str': lambda: Column(String(255)),
        'int': lambda: Column(Integer),
        'float': lambda: Column(Float),
        'datetime': lambda: Column(DateTime),
    }
    for col_name, col_type in columns.items():
        factory = type_factories.get(col_type)
        if factory is not None:
            dynamic_columns[col_name] = factory()

    # Table name follows the dataset_{dataset_id} convention.
    table_name = f"dataset_{dataset_id}"
    dynamic_columns['__tablename__'] = table_name

    # Build the declarative class on the fly.
    dynamic_class = type(table_name, (Base,), dynamic_columns)

    # Debug output (kept from the original implementation).
    print("table_name:", table_name)
    print("dynamic_columns:", dynamic_columns)

    # NOTE(review): engine is hard-coded here — replace with your own engine/config.
    engine = create_engine('sqlite:///SoilAcidification.db')
    Base.metadata.create_all(engine)  # create any tables not yet present

    return dynamic_class


def allowed_file(filename):
    """Return True if *filename* has an allowed Excel extension (xlsx/xls)."""
    ALLOWED_EXTENSIONS = {'xlsx', 'xls'}
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def generate_unique_filename(filename):
    """Generate a collision-resistant filename preserving the original extension.

    Combines a random UUID hex with a UTC timestamp, e.g.
    ``<32 hex chars>_20240101120000.xlsx``.
    """
    ext = filename.rsplit('.', 1)[1].lower()
    # UUID + UTC timestamp makes accidental collisions practically impossible.
    unique_filename = f"{uuid.uuid4().hex}_{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}.{ext}"
    return unique_filename


def infer_column_types(df):
    """Map each DataFrame column's pandas dtype to a simple type name.

    Returns a dict of column name -> 'str' | 'int' | 'float' | 'datetime';
    any dtype not in the map falls back to 'str'.
    """
    type_map = {
        'object': 'str',
        'int64': 'int',
        'float64': 'float',
        'datetime64[ns]': 'datetime'  # pandas datetime dtype
    }
    return {col: type_map.get(str(df[col].dtype), 'str') for col in df.columns}


def clean_column_names(dataframe):
    """Strip whitespace and remove non-breaking spaces from column names."""
    dataframe.columns = [col.strip().replace('\xa0', '') for col in dataframe.columns]
    return dataframe


def rename_columns_for_model(dataframe, dataset_type):
    """Rename Excel column headers to the database model field names.

    Args:
        dataframe: Input DataFrame with raw Excel headers.
        dataset_type: Either 'reduce' or 'reflux'.

    Raises:
        ValueError: If *dataset_type* is not one of the supported values.
        (The original code raised an accidental NameError here instead.)
    """
    if dataset_type == 'reduce':
        rename_map = {
            '1/b': 'Q_over_b',
            'pH': 'pH',
            'OM': 'OM',
            'CL': 'CL',
            'H': 'H',
            'Al': 'Al'
        }
    elif dataset_type == 'reflux':
        rename_map = {
            'OM': 'OM',
            'CL': 'CL',
            'CEC': 'CEC',
            'H+': 'H_plus',
            'N': 'N',
            'Al3+': 'Al3_plus',
            'ΔpH': 'Delta_pH'
        }
    else:
        # Fail explicitly instead of letting an unbound `rename_map` raise NameError.
        raise ValueError("Invalid data type provided. Choose 'reduce' or 'reflux'.")

    dataframe = dataframe.rename(columns=rename_map)
    return dataframe


def rename_columns_for_model_predict(dataframe, dataset_type):
    """Rename front-end parameter names to the model's prediction field names.

    Args:
        dataframe: Input DataFrame with front-end parameter headers.
        dataset_type: Either 'reduce' or 'reflux'.

    Raises:
        ValueError: If *dataset_type* is not one of the supported values.
        (The original code raised an accidental NameError here instead.)
    """
    if dataset_type == 'reduce':
        rename_map = {
            'init_pH': 'pH',
            'OM': 'OM',
            'CL': 'CL',
            'H': 'H',
            'Al': 'Al'
        }
    elif dataset_type == 'reflux':
        rename_map = {
            "OM": "OM",
            "CL": "CL",
            "CEC": "CEC",
            "H+": "H_plus",
            "N": "N",
            "Al3+": "Al3_plus"
        }
    else:
        # Fail explicitly instead of letting an unbound `rename_map` raise NameError.
        raise ValueError("Invalid data type provided. Choose 'reduce' or 'reflux'.")

    dataframe = dataframe.rename(columns=rename_map)
    return dataframe


def insert_data_into_existing_table(session, dataframe, model_class):
    """Insert data from a DataFrame into an existing SQLAlchemy model table."""
    for index, row in dataframe.iterrows():
        record = model_class(**row.to_dict())
        session.add(record)


def insert_data_into_dynamic_table(session, dataset_df, dynamic_table_class):
    """Insert DataFrame rows into a dynamically created table via Core inserts."""
    for _, row in dataset_df.iterrows():
        record_data = row.to_dict()
        session.execute(dynamic_table_class.__table__.insert(), [record_data])


def insert_data_by_type(session, dataset_df, dataset_type):
    """Insert DataFrame rows into CurrentReduce or CurrentReflux by dataset type.

    Rows with an unrecognized *dataset_type* are silently ignored
    (kept from the original behavior).
    """
    if dataset_type == 'reduce':
        for _, row in dataset_df.iterrows():
            record = CurrentReduce(**row.to_dict())
            session.add(record)
    elif dataset_type == 'reflux':
        for _, row in dataset_df.iterrows():
            record = CurrentReflux(**row.to_dict())
            session.add(record)


def get_current_data(session, data_type):
    """Fetch all rows of the current reduce/reflux table as a DataFrame.

    Args:
        session: SQLAlchemy session.
        data_type: Either 'reduce' or 'reflux'.

    Raises:
        ValueError: For any other *data_type*.
    """
    # Pick the ORM model matching the requested data type.
    if data_type == 'reduce':
        model = CurrentReduce
    elif data_type == 'reflux':
        model = CurrentReflux
    else:
        raise ValueError("Invalid data type provided. Choose 'reduce' or 'reflux'.")

    # select(model) yields rows that each wrap one ORM entity; use .scalars()
    # to unwrap them. The original `dict(row)` raised TypeError on SQLAlchemy 1.4+
    # because a Row holding an entity is a 1-tuple, not a mapping.
    entities = session.execute(select(model)).scalars().all()
    records = [
        {column.name: getattr(entity, column.name) for column in model.__table__.columns}
        for entity in entities
    ]
    return pd.DataFrame(records)


def get_dataset_by_id(session, dataset_id):
    """Load an entire dynamically created dataset table into a DataFrame.

    Args:
        session: SQLAlchemy session (its bind is used for table reflection).
        dataset_id: Full table name of the dataset (e.g. 'dataset_42').

    Raises:
        ValueError: If the table contains no rows.
    """
    # Reflect the table from the database. `MetaData(bind=...)` and
    # `autoload=True` were removed in SQLAlchemy 2.0; `autoload_with`
    # is the 1.4/2.0-compatible reflection API.
    metadata = MetaData()
    dataset_table = Table(dataset_id, metadata, autoload_with=session.bind)

    # Pull every row of the table.
    query = select(dataset_table)
    result = session.execute(query).fetchall()

    if not result:
        raise ValueError(f"No data found for dataset {dataset_id}.")

    dataframe = pd.DataFrame(result, columns=[column.name for column in dataset_table.columns])
    return dataframe


def predict_to_Q(predictions, init_ph, target_ph):
    """Convert model predictions to the lime dose Q.

    Q is the quicklime application rate in %, i.e. 1% means 1 g of quicklime
    per 100 g of (top 20 cm) soil. ΔpH = target pH - initial pH.
    """
    Q = predictions * (target_ph - init_ph)
    return Q


def Q_to_t_ha(Q):
    """Convert Q from % (g lime / 100 g soil) to tonnes per hectare (t/ha)."""
    return Q * 25


def create_kriging(file_name, emission_column, points):
    """Run ordinary kriging over Excel point data and interpolate at *points*.

    Args:
        file_name: Excel file with 'longitude' and 'latitude' columns plus
            the value column to interpolate.
        emission_column: Name of the value column to interpolate.
        points: Iterable of (longitude, latitude) pairs to evaluate.

    Returns:
        Dict with a status message and the interpolated concentrations list.
    """
    # Read the sample points from Excel.
    df = pd.read_excel(file_name)
    print(df)

    # Convert to a GeoDataFrame so coordinates are available as geometry.
    gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df['longitude'], df['latitude']))
    print(gdf)

    # Fit the ordinary-kriging model with a spherical variogram.
    OK = OrdinaryKriging(
        gdf.geometry.x,
        gdf.geometry.y,
        gdf[emission_column],
        variogram_model='spherical',
        verbose=True,
        enable_plotting=False
    )

    # Split the query points into longitude/latitude sequences.
    input_lons = [point[0] for point in points]
    input_lats = [point[1] for point in points]

    # Interpolate at the requested points (ss = kriging variance, unused).
    z, ss = OK.execute('points', input_lons, input_lats)

    result = {
        "message": "Kriging interpolation for points completed successfully",
        "interpolated_concentrations": z.tolist()
    }
    return result