import re
from pathlib import Path

import numpy as np
import pandas as pd

from utils.db_helper import DBHelper
from quanguo.CountryTrade import COUNTRY_CODE_MAPPING
from utils.constants import DOWNLOAD_DIR
from utils.log import log
from utils.parse_utils import (
    clean_county_name,
    clean_commodity_name,
    convert_wan_to_yuan,
    extract_year_month_from_path,
    get_previous_month_dir,
    find_unmatched_countries,
    traverse_and_process,
)

# Constant configuration
PROV_CODE = "370000"
PROV_NAME = "山东省"
SHANDONG_CITY = {
    "济南": "370100",
    "青岛": "370200",
    "淄博": "370300",
    "枣庄": "370400",
    "东营": "370500",
    "烟台": "370600",
    "潍坊": "370700",
    "济宁": "370800",
    "泰安": "370900",
    "威海": "371000",
    "日照": "371100",
    "临沂": "371300",
    "德州": "371400",
    "聊城": "371500",
    "滨州": "371600",
    "菏泽": "371700"
}

download_dir = DOWNLOAD_DIR / "shandong"
YEAR_PATTERN = re.compile(r"^\d{4}$")
MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")

# Value columns shared by the city-level and country-level sheets, in the
# order in which they appear after the leading name column.
_VALUE_COLUMNS = [
    'monthly_total', 'yoy_import_export',
    'monthly_export', 'yoy_export',
    'monthly_import', 'yoy_import',
]


def parse_excel(current_dir):
    """Parse every trade workbook found in one month's data directory.

    Processes, in order, the commodity import/export pair, the city-level
    sheet (file name containing "地市") and the country-level sheet (file
    name containing "国别"). The city and country sheets are skipped when
    no matching file exists in the current directory.

    Args:
        current_dir (str): Directory holding the current month's files.
            The year and month are derived from this path by
            ``extract_year_month_from_path``.

    Raises:
        Exception: Any error raised while processing is logged and re-raised.
    """
    current_path = Path(current_dir)
    year, month = extract_year_month_from_path(current_path)

    try:
        # January has no previous month to subtract, so no lookup is needed.
        prev_dir = get_previous_month_dir(current_path) if month != 1 else None

        # Commodity trade data
        process_combined_trade(current_path, year, month, prev_dir)

        # City-level trade data
        current_file_path = next(current_path.glob("*地市*"), None)
        prev_file_path = next(Path(prev_dir).glob("*地市*"), None) if prev_dir else None
        if current_file_path:
            process_region_trade(current_file_path, prev_file_path, year, month)

        # Country-level trade data
        country_file = next(current_path.glob("*国别*"), None)
        prev_country_file = next(Path(prev_dir).glob("*国别*"), None) if prev_dir else None
        if country_file:
            process_country_trade(country_file, prev_country_file, year, month)

        log.info(f"{current_dir}数据已全部成功处理")
    except Exception as e:
        log.error(f"处理失败:{current_dir},错误:{str(e)}")
        raise


def process_combined_trade(current_dir, year, month, previous_dir=None):
    """Load the commodity import/export pair and upsert per-month values.

    For January the values read from the files are stored as-is. For any
    other month the previous month's values are subtracted first (the
    source files are presumably cumulative year-to-date - confirm against
    the data provider).

    Args:
        current_dir (Path): Directory with the current month's files.
        year: Year written to ``crossborder_year``.
        month (int): Month number; ``1`` disables the subtraction.
        previous_dir: Directory with the previous month's files, or None.

    Raises:
        FileNotFoundError: If the current import/export pair is missing, or
            if ``month`` is not 1 and the previous month's pair is missing.
    """
    import_file = next(current_dir.glob("*进口20位主要商品总值*"), None)
    export_file = next(current_dir.glob("*出口20位主要商品总值*"), None)
    if not (import_file and export_file):
        raise FileNotFoundError("缺少进口或出口文件")

    current_data = read_trade_pair(import_file, export_file)

    if month == 1:
        merged_data = current_data
    else:
        prev_import = prev_export = None
        if previous_dir:
            prev_import = next(Path(previous_dir).glob("*进口20位主要商品总值*"), None)
            prev_export = next(Path(previous_dir).glob("*出口20位主要商品总值*"), None)
        if not (prev_import and prev_export):
            # Without the previous month the subtraction cannot be done;
            # fail explicitly instead of hitting a KeyError inside the merge
            # against an empty frame.
            raise FileNotFoundError("缺少上月进口或出口文件")
        prev_data = read_trade_pair(prev_import, prev_export)
        merged_data = calculate_monthly_values(current_data, prev_data)

    # Resolve commodity ids; rows without a match are not stored.
    db = DBHelper()
    merged_data['commodity_code'] = merged_data['commodity_name'].apply(db.get_commodity_id)
    valid_data = merged_data[merged_data['commodity_code'].notnull()].copy()

    valid_data['crossborder_year'] = year
    valid_data['crossborder_year_month'] = f"{year}-{month:02d}"
    valid_data['prov_code'] = PROV_CODE
    valid_data['prov_name'] = PROV_NAME

    # When only one of monthly_import / monthly_export has a value the total
    # is that value; when both have values they are added. A total of 0
    # (including the "both missing" case) is stored as NULL.
    valid_data['monthly_total'] = (
        valid_data['monthly_import'].fillna(0) + valid_data['monthly_export'].fillna(0)
    )
    valid_data['monthly_total'] = valid_data['monthly_total'].replace(0, np.nan)
    valid_data = valid_data.replace({np.nan: None})

    target_cols = [
        'crossborder_year', 'crossborder_year_month',
        'prov_code', 'prov_name',
        'commodity_code', 'commodity_name',
        'monthly_total', 'monthly_import', 'monthly_export'
    ]
    db.bulk_insert(
        valid_data[target_cols],
        't_yujin_crossborder_prov_commodity_trade',
        conflict_columns=['crossborder_year_month', 'prov_code', 'commodity_code'],
        update_columns=['monthly_total', 'monthly_import', 'monthly_export']
    )


def _read_region_sheet(file_path):
    """Read a city-level workbook into a frame with the canonical columns."""
    return pd.read_excel(
        file_path,
        skipfooter=1,
        header=4,
        names=['city_name'] + _VALUE_COLUMNS
    )


def process_region_trade(current_file_path, prev_file_path, year, month):
    """Load the city-level sheet and upsert per-month values.

    For January the sheet values are stored as-is. For other months the
    three amount columns are reduced by the previous month's values; the
    year-on-year columns are always taken from the current sheet. Cities
    absent from either sheet are dropped by the inner merge.

    Args:
        current_file_path: Workbook for the current month.
        prev_file_path: Workbook for the previous month, or None.
        year: Year written to ``crossborder_year``.
        month (int): Month number; ``1`` disables the subtraction.

    Raises:
        FileNotFoundError: If ``month`` is not 1 and ``prev_file_path`` is
            missing.
    """
    current_df = _read_region_sheet(current_file_path)

    if month == 1:
        df = current_df[[
            'city_name', 'monthly_total', 'monthly_export', 'monthly_import',
            'yoy_import_export', 'yoy_export', 'yoy_import'
        ]].copy()
    else:
        if not prev_file_path:
            # Merging against an empty frame would raise a KeyError on
            # 'city_name'; report the real cause instead.
            raise FileNotFoundError("缺少上月地市文件")
        prev_df = _read_region_sheet(prev_file_path)
        merged_df = pd.merge(
            current_df, prev_df,
            on='city_name',
            suffixes=('_current', '_prev')
        )
        df = pd.DataFrame({
            'city_name': merged_df['city_name'],
            'monthly_total': merged_df['monthly_total_current'] - merged_df['monthly_total_prev'],
            'yoy_import_export': merged_df['yoy_import_export_current'],
            'monthly_export': merged_df['monthly_export_current'] - merged_df['monthly_export_prev'],
            'yoy_export': merged_df['yoy_export_current'],
            'monthly_import': merged_df['monthly_import_current'] - merged_df['monthly_import_prev'],
            'yoy_import': merged_df['yoy_import_current']
        })

    # Names not present in SHANDONG_CITY end up with a NULL city_code.
    df['city_code'] = df['city_name'].map(SHANDONG_CITY)
    df['crossborder_year'] = year
    df['crossborder_year_month'] = f"{year}-{month:02d}"
    df['prov_code'] = PROV_CODE
    df['prov_name'] = PROV_NAME

    # Unit conversion
    for col in ['monthly_total', 'monthly_import', 'monthly_export']:
        df[col] = df[col].apply(convert_wan_to_yuan)

    db = DBHelper()
    df = df.replace({np.nan: None})
    db.bulk_insert(
        df,
        't_yujin_crossborder_prov_region_trade',
        conflict_columns=['crossborder_year_month', 'city_code'],
        update_columns=['monthly_total', 'monthly_import', 'monthly_export',
                        'yoy_import_export', 'yoy_import', 'yoy_export']
    )


def _drop_non_country_rows(df):
    """Remove footnote rows ("注:") and rows with a missing or blank name."""
    df = df[~df['country_name'].str.contains('注:', na=False)]
    df = df.dropna(subset=['country_name'])
    return df[df['country_name'].str.strip() != '']


def process_country_trade(current_file_path, prev_file_path, year, month):
    """Load the country-level sheet and upsert per-month values.

    For January the sheet values are stored as-is. For other months the
    three amount columns are reduced by the previous month's values (inner
    merge on country name); the year-on-year columns are always taken from
    the current sheet. Rows whose name contains "东盟" or "欧盟" are
    excluded before the country code lookup.

    Args:
        current_file_path: Workbook for the current month.
        prev_file_path: Workbook for the previous month, or None.
        year: Year written to ``crossborder_year``.
        month (int): Month number; ``1`` disables the subtraction.

    Raises:
        FileNotFoundError: If ``month`` is not 1 and ``prev_file_path`` is
            missing.
    """
    current_df = _drop_non_country_rows(read_with_header4(current_file_path))

    if month == 1:
        final_df = current_df.copy()
    else:
        if not prev_file_path:
            raise FileNotFoundError("缺少上月国别文件")
        prev_df = _drop_non_country_rows(read_with_header4(prev_file_path))

        merged_df = pd.merge(
            current_df, prev_df,
            on='country_name',
            suffixes=('_current', '_prev'),
            how='inner'
        )
        merged_df['monthly_total'] = merged_df['monthly_total_current'] - merged_df['monthly_total_prev']
        merged_df['monthly_export'] = merged_df['monthly_export_current'] - merged_df['monthly_export_prev']
        merged_df['monthly_import'] = merged_df['monthly_import_current'] - merged_df['monthly_import_prev']
        merged_df['yoy_import_export'] = merged_df['yoy_import_export_current']
        merged_df['yoy_export'] = merged_df['yoy_export_current']
        merged_df['yoy_import'] = merged_df['yoy_import_current']

        final_df = merged_df[[
            'country_name', 'monthly_total', 'monthly_import', 'monthly_export',
            'yoy_import_export', 'yoy_import', 'yoy_export'
        ]]

    # Exclude the regional-bloc rows. The explicit copy makes the result an
    # independent frame so the column assignments below do not write into a
    # slice of another frame (SettingWithCopyWarning).
    final_df = final_df[
        ~final_df['country_name'].str.contains('东盟|欧盟', na=False, regex=True)
    ].copy()

    final_df['country_code'] = final_df['country_name'].map(COUNTRY_CODE_MAPPING)
    find_unmatched_countries(final_df)
    final_df['crossborder_year'] = year
    final_df['crossborder_year_month'] = f"{year}-{month:02d}"
    final_df['prov_code'] = PROV_CODE
    final_df['prov_name'] = PROV_NAME

    # Unit conversion
    for col in ['monthly_total', 'monthly_import', 'monthly_export']:
        final_df[col] = final_df[col].apply(convert_wan_to_yuan)

    final_df = final_df.replace({np.nan: None})
    db = DBHelper()
    db.bulk_insert(
        final_df,
        't_yujin_crossborder_prov_country_trade',
        conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
        update_columns=['monthly_total', 'monthly_import', 'monthly_export',
                        'yoy_import_export', 'yoy_import', 'yoy_export']
    )


def read_with_header4(file_path):
    """Read a country-level workbook, tolerating two empty leading columns.

    The sheet is first probed without a header. If the first two columns
    of A:G are entirely empty the table is taken to start at the third
    column; otherwise it starts at the first. Seven columns from that
    offset are then read with row index 4 as the header and renamed to the
    canonical names.

    Args:
        file_path: Path of the workbook to read.

    Returns:
        pandas.DataFrame: Columns ``country_name`` followed by the six
        value columns, with ``country_name`` passed through
        ``clean_county_name``.
    """
    column_names = ['country_name'] + _VALUE_COLUMNS

    # Stage 1: probe the raw layout (no header detection).
    probe_df = pd.read_excel(
        file_path,
        usecols="A:G",
        header=None,
        skipfooter=1
    )

    # Stage 2: determine the column offset.
    col_offset = 2 if probe_df.iloc[:, 0:2].isnull().all().all() else 0

    # Stage 3: read the table itself. Column positions are built directly
    # from the offset: slicing the probe frame's columns would yield only
    # five columns when the offset is 2 (the probe stops at column G), and
    # the rename below would then fail with a length mismatch.
    final_df = pd.read_excel(
        file_path,
        usecols=list(range(col_offset, col_offset + len(column_names))),
        header=4,
        skipfooter=1
    )

    # Stage 4: force the canonical column names.
    final_df.columns = column_names

    # Normalise country names.
    final_df['country_name'] = final_df['country_name'].apply(clean_county_name)
    return final_df


def read_trade_pair(import_path, export_path):
    """Read the import and export commodity sheets and outer-join them.

    Args:
        import_path: Workbook with the import values.
        export_path: Workbook with the export values.

    Returns:
        pandas.DataFrame: Columns ``commodity_name``, ``monthly_import``
        and ``monthly_export``; the two amount columns have been passed
        through ``convert_wan_to_yuan``.
    """
    def _read_side(path, value_column):
        # First two columns only: commodity name and its amount.
        side = pd.read_excel(
            path, skiprows=3, skipfooter=1, usecols=[0, 1],
            names=["commodity_name", value_column]
        )
        return side.assign(
            commodity_name=side["commodity_name"].apply(clean_commodity_name)
        )

    df_import = _read_side(import_path, "monthly_import")
    df_export = _read_side(export_path, "monthly_export")

    merged = pd.merge(df_import, df_export, on="commodity_name", how="outer").fillna(pd.NA)
    merged["monthly_import"] = merged["monthly_import"].apply(convert_wan_to_yuan)
    merged["monthly_export"] = merged["monthly_export"].apply(convert_wan_to_yuan)
    return merged


def calculate_monthly_values(current_data, prev_data):
    """Subtract the previous month's import/export values from the current ones.

    Rows are matched on ``commodity_name`` with a left join, so every
    commodity in ``current_data`` is kept; a commodity with no match in
    ``prev_data`` gets missing values for both differences.

    Args:
        current_data (pandas.DataFrame): Current month's trade pair.
        prev_data (pandas.DataFrame): Previous month's trade pair.

    Returns:
        pandas.DataFrame: Columns ``commodity_name``, ``monthly_import``
        and ``monthly_export``.
    """
    merged = pd.merge(
        current_data, prev_data,
        on="commodity_name", how="left",
        suffixes=("_current", "_prev")
    ).fillna(pd.NA)
    merged["monthly_import"] = merged["monthly_import_current"] - merged["monthly_import_prev"]
    merged["monthly_export"] = merged["monthly_export_current"] - merged["monthly_export_prev"]
    return merged[["commodity_name", "monthly_import", "monthly_export"]]


if __name__ == "__main__":
    traverse_and_process(download_dir, parse_excel, province_name="shandong")
    log.info("\n山东省地级市数据同比更新中...")
    db_helper = DBHelper()
    db_helper.update_shandong_yoy()