"""Parse Henan province monthly cross-border trade workbooks and upsert them into the database.

Each month directory is expected to hold an import and an export "main
commodities" workbook plus a per-country workbook; see ``parse_excel``.
"""
import re
from pathlib import Path

import pandas as pd

from utils.db_helper import DBHelper
from utils.constants import COUNTRY_CODE_MAPPING, EXCLUDE_REGIONS, DOWNLOAD_DIR
from utils.parse_utils import (
    clean_county_name,
    clean_commodity_name,
    convert_wan_to_yuan,
    find_unmatched_countries,
    extract_year_month_from_path,
    traverse_and_process,
)

# Province identity written into every output row.
PROV_CODE = "410000"
PROV_NAME = "河南省"
# Validators for year ("2024") and month ("01".."12") path components.
# NOTE(review): not referenced in this module; kept in case other code imports them.
YEAR_PATTERN = re.compile(r"^\d{4}$")
MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")

download_dir = DOWNLOAD_DIR / "henan"

# Target tables and shared column groups.
COMMODITY_TABLE = 't_yujin_crossborder_prov_commodity_trade'
COUNTRY_TABLE = 't_yujin_crossborder_prov_country_trade'
MONTHLY_VALUE_COLS = ['monthly_total', 'monthly_import', 'monthly_export']
YOY_COLS = ['yoy_import_export', 'yoy_import', 'yoy_export']


def parse_excel(current_dir):
    """Parse one month's data directory and upsert it into the database.

    Args:
        current_dir (str | Path): Month data directory (layout: /<year>/<province>/<month>).

    Raises:
        Exception: any parsing or DB error is printed together with the
            directory and then re-raised unchanged.
    """
    current_path = Path(current_dir)
    year, month = extract_year_month_from_path(current_path)

    try:
        # Commodity trade: import and export workbooks merged into one table.
        process_combined_trade(current_path, year, month)

        # Country trade: None when no matching workbook exists (rejected downstream).
        country_file = next(current_path.glob("*国别*"), None)
        process_country_trade(country_file, year, month)

        print(f"{current_dir}数据已全部成功处理")
    except Exception as e:
        print(f"处理失败:{current_dir},错误:{str(e)}")
        raise


def _simulate_january(feb_df, year, value_cols, zeroed_cols=()):
    """Return a copy of February rows relabelled as January of ``year``.

    ``value_cols`` are halved (an average over Jan-Feb); ``zeroed_cols`` are
    set to 0.0 because simulated rows have no meaningful year-on-year figure.
    """
    jan_df = feb_df.copy()
    jan_df['crossborder_year_month'] = f"{year}-01"
    jan_df[value_cols] = jan_df[value_cols] / 2
    if zeroed_cols:
        jan_df[list(zeroed_cols)] = 0.0
    return jan_df


def process_combined_trade(current_dir, year, month):
    """Load the month's commodity import/export workbooks into the commodity table.

    Rows whose commodity name cannot be resolved to an ID by
    ``DBHelper.get_commodity_id`` are dropped. When ``month == 2`` an extra
    set of simulated January rows (February values halved) is also upserted.

    Args:
        current_dir (Path): Month data directory.
        year: Year value written to ``crossborder_year``.
        month (int): Month number; formatted with ``:02d``.

    Raises:
        FileNotFoundError: if either the import or the export workbook is missing.
    """
    import_file = next(current_dir.glob("*进口主要商品量值表*"), None)
    export_file = next(current_dir.glob("*出口主要商品量值表*"), None)
    if not (import_file and export_file):
        raise FileNotFoundError("缺少进口或出口文件")

    current_data = read_trade_pair(import_file, export_file)

    db = DBHelper()
    current_data['commodity_code'] = current_data['commodity_name'].apply(db.get_commodity_id)
    valid_data = current_data[current_data['commodity_code'].notnull()].copy()

    # Row metadata for the current month.
    valid_data['crossborder_year'] = year
    valid_data['crossborder_year_month'] = f"{year}-{month:02d}"
    valid_data['prov_code'] = PROV_CODE
    valid_data['prov_name'] = PROV_NAME
    valid_data['monthly_total'] = valid_data['monthly_import'] + valid_data['monthly_export']

    target_cols = [
        'crossborder_year', 'crossborder_year_month', 'prov_code', 'prov_name',
        'commodity_code', 'commodity_name',
        'monthly_total', 'monthly_import', 'monthly_export',
    ]
    conflict_cols = ['crossborder_year_month', 'prov_code', 'commodity_code']

    db.bulk_insert(
        valid_data[target_cols],
        COMMODITY_TABLE,
        conflict_columns=list(conflict_cols),
        update_columns=list(MONTHLY_VALUE_COLS),
    )

    # February processing also emits simulated January rows.
    if month == 2:
        january_data = _simulate_january(valid_data, year, MONTHLY_VALUE_COLS)
        db.bulk_insert(
            january_data[target_cols],
            COMMODITY_TABLE,
            conflict_columns=list(conflict_cols),
            update_columns=list(MONTHLY_VALUE_COLS),  # only numeric fields are updated
        )


def process_country_trade(current_file_path, year, month):
    """Load the month's per-country workbook into the country table.

    Regions listed in ``EXCLUDE_REGIONS`` and names containing "(地区)"
    (ASCII or full-width parentheses) are dropped, as are names without an
    entry in ``COUNTRY_CODE_MAPPING``. When ``month == 2`` simulated January
    rows (values halved, YoY set to 0) are also upserted.

    Args:
        current_file_path (Path | None): Country workbook; ``None`` if none was found.
        year: Year value written to ``crossborder_year``.
        month (int): Month number; formatted with ``:02d``.

    Raises:
        FileNotFoundError: if ``current_file_path`` is None.
    """
    if current_file_path is None:
        raise FileNotFoundError("缺少国别贸易文件")

    final_df = read_with_header4(current_file_path, month)

    names = final_df['country_name']
    is_excluded = names.isin(EXCLUDE_REGIONS)
    # na=False keeps the mask strictly boolean even for blank/NaN names;
    # otherwise `~` on an object-dtype mask raises or yields -1/-2.
    is_region = names.str.contains(r'[((]地区[))]', regex=True, na=False)
    final_df = final_df[~is_excluded & ~is_region].copy()

    final_df['country_code'] = final_df['country_name'].map(COUNTRY_CODE_MAPPING)
    find_unmatched_countries(final_df)
    # Keep only rows that resolved to a country code.
    final_df = final_df[final_df['country_code'].notnull()].copy()

    final_df['crossborder_year'] = year
    final_df['crossborder_year_month'] = f"{year}-{month:02d}"
    final_df['prov_code'] = PROV_CODE
    final_df['prov_name'] = PROV_NAME

    conflict_cols = ['crossborder_year_month', 'prov_code', 'country_code']

    db = DBHelper()
    db.bulk_insert(
        final_df,
        COUNTRY_TABLE,
        conflict_columns=list(conflict_cols),
        update_columns=MONTHLY_VALUE_COLS + YOY_COLS,
    )

    # February processing also emits simulated January rows (no YoY data).
    if month == 2:
        january_df = _simulate_january(final_df, year, MONTHLY_VALUE_COLS, zeroed_cols=YOY_COLS)
        db.bulk_insert(
            january_df,
            COUNTRY_TABLE,
            conflict_columns=list(conflict_cols),
            update_columns=MONTHLY_VALUE_COLS + YOY_COLS,  # only numeric fields are updated
        )


def read_with_header4(file_path, month):
    """Read a per-country workbook into a DataFrame with normalised columns.

    February workbooks are read from fixed columns 0-6. For other months the
    layout is wider and the data may start in column A or column C; this is
    detected by checking whether the first two columns are entirely empty.

    Returns:
        DataFrame with columns country_name, monthly_total, yoy_import_export,
        monthly_export, yoy_export, monthly_import, yoy_import. The YoY
        columns are numeric float64: non-numeric placeholders (e.g. "--",
        "", "N/A", "不详") become 0, and values are rounded to 2 decimals.
    """
    if month == 2:
        # February workbooks use a different, narrower layout.
        target_cols = [0, 1, 2, 3, 4, 5, 6]
    else:
        # Probe the first 11 columns (A..K) without a header to find the offset.
        probe_df = pd.read_excel(
            file_path,
            usecols="A:K",
            header=None,
            skiprows=5,
            skipfooter=1,
        )
        col_offset = 2 if probe_df.iloc[:, 0:2].isnull().all().all() else 0
        target_cols = [col + col_offset for col in (0, 1, 2, 5, 6, 9, 10)]

    final_df = pd.read_excel(
        file_path,
        usecols=target_cols,
        header=4,
        skipfooter=1,
    )

    # Positional rename: order must match target_cols above.
    final_df.columns = [
        'country_name', 'monthly_total', 'yoy_import_export',
        'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import',
    ]
    final_df['country_name'] = final_df['country_name'].apply(clean_county_name)

    # errors='coerce' turns every non-numeric placeholder into NaN, which is then 0.
    # No float32 downcast: rounding float32 values to 2 dp leaves artefacts
    # such as 12.34000015 that would be written to the database.
    final_df[YOY_COLS] = (
        final_df[YOY_COLS]
        .apply(pd.to_numeric, errors='coerce')
        .fillna(0)
        .round(2)
    )
    return final_df


def _read_trade_sheet(path, value_col):
    """Read column A (commodity name) and column E (value) from one commodity workbook."""
    sheet = pd.read_excel(
        path,
        skiprows=3,
        skipfooter=1,
        usecols=[0, 4],
        names=["commodity_name", value_col],
        header=None,
    )
    sheet["commodity_name"] = sheet["commodity_name"].apply(clean_commodity_name)
    return sheet


def read_trade_pair(import_path, export_path):
    """Merge the import and export commodity workbooks into one DataFrame.

    Reads column A (commodity name) and column E (0-based index 4) of each
    workbook. An outer join on the cleaned commodity name is used, so a
    commodity present on only one side gets 0 on the other. Values are passed
    through ``convert_wan_to_yuan`` (presumably 万元 -> 元, per its name).

    Returns:
        DataFrame with columns commodity_name, monthly_import, monthly_export.
    """
    df_import = _read_trade_sheet(import_path, "monthly_import")
    df_export = _read_trade_sheet(export_path, "monthly_export")

    merged = pd.merge(df_import, df_export, on="commodity_name", how="outer").fillna(0)
    merged["monthly_import"] = merged["monthly_import"].apply(convert_wan_to_yuan)
    merged["monthly_export"] = merged["monthly_export"].apply(convert_wan_to_yuan)
    return merged


def calculate_monthly_values(current_data, prev_data):
    """Derive single-month values as current minus previous, per commodity.

    A left join on ``commodity_name`` is used, so commodities missing from
    ``prev_data`` are treated as having 0 previous value. Not called within
    this module.

    Returns:
        DataFrame with columns commodity_name, monthly_import, monthly_export.
    """
    merged = pd.merge(
        current_data, prev_data,
        on="commodity_name", how="left", suffixes=("_current", "_prev"),
    ).fillna(0)
    merged["monthly_import"] = merged["monthly_import_current"] - merged["monthly_import_prev"]
    merged["monthly_export"] = merged["monthly_export_current"] - merged["monthly_export_prev"]
    return merged[["commodity_name", "monthly_import", "monthly_export"]]


if __name__ == "__main__":
    traverse_and_process(download_dir, parse_excel, province_name="henan")