import re
from pathlib import Path

import pandas as pd

from utils.db_helper import DBHelper
from utils.constants import DOWNLOAD_DIR
from utils.parse_utils import convert_wan_to_yuan, extract_year_month_from_path, traverse_and_process

# City name (as it appears in the workbook after cleaning) -> division code.
FUJIAN_CITY = {
    "福州市": "350100",
    "厦门市": "350200",
    "莆田市": "350300",
    "三明市": "350400",
    "泉州市": "350500",
    "漳州市": "350600",
    "南平市": "350700",
    "宁德市": "350900",
    "龙岩市": "350800",
    "平潭地区": "350128",
}

# Constants. The year/month regexes are intended for path validation; they are
# not referenced elsewhere in this module.
PROV_CODE = "350000"
PROV_NAME = "福建省"
YEAR_PATTERN = re.compile(r"^\d{4}$")
MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
download_dir = DOWNLOAD_DIR / "fujian"

# Target table and upsert configuration shared by every insert in this module.
TRADE_TABLE = "t_yujin_crossborder_prov_region_trade"
CONFLICT_COLS = ("crossborder_year_month", "city_code")
UPDATE_COLS = (
    "monthly_total", "monthly_import", "monthly_export",
    "yoy_import_export", "yoy_import", "yoy_export",
)

# Column names for a 7-column single-period read: city + 3 amounts + 3 YoY rates.
TRADE_COLUMN_NAMES = (
    "city_name", "monthly_total", "yoy_import_export",
    "monthly_export", "yoy_export", "monthly_import", "yoy_import",
)
# Column positions of the single-month figures in workbooks after May 2023.
# (Positions 1/5/9 hold year-to-date amounts; see process_regular_february.)
CURRENT_LAYOUT_USECOLS = (0, 3, 4, 7, 8, 11, 12)

# Amount columns published in 万 (10k) units that must be converted to yuan.
AMOUNT_COLS = ("monthly_total", "monthly_import", "monthly_export")
YTD_AMOUNT_COLS = ("ytd_monthly_total", "ytd_monthly_export", "ytd_monthly_import")


def _uses_legacy_layout(year, month):
    """Return True for workbooks up to May 2023.

    Those workbooks keep the 7 columns contiguously and put the single-month
    figures on the second sheet.
    """
    return year == 2023 and month <= 5


def _find_region_file(current_path):
    """Return the '*分地市*' workbook in ``current_path``, or None if there is none.

    Excel lock files ("~$...") are skipped. Candidates are sorted so the choice
    is deterministic when several files match.
    """
    candidates = sorted(
        p for p in current_path.glob("*分地市*")
        if p.is_file() and not p.name.startswith("~$")
    )
    return candidates[0] if candidates else None


def parse_excel(current_dir):
    """Main entry point: parse and store one month directory of Fujian data.

    Args:
        current_dir (str | Path): month data directory (layout: /year/province/month).

    Raises:
        FileNotFoundError: if the directory contains no "分地市" workbook.
        Exception: any processing error is printed and re-raised.
    """
    current_path = Path(current_dir)
    year, month = extract_year_month_from_path(current_path)
    try:
        # Per-city trade workbook; fail loudly instead of handing None to pandas.
        current_file_path = _find_region_file(current_path)
        if current_file_path is None:
            raise FileNotFoundError(f"未找到分地市数据文件:{current_path}")
        process_region_trade(current_file_path, year, month)
        print(f"{current_dir}数据已全部成功处理")
    except Exception as e:
        print(f"处理失败:{current_dir},错误:{str(e)}")
        raise


def process_region_trade(current_file_path, year, month):
    """Load a month's per-city workbook, upsert it, and back-fill January from February.

    Args:
        current_file_path: path to the "分地市" workbook.
        year (int): data year.
        month (int): data month (formatted with ``:02d`` downstream, so must be int).
    """
    legacy = _uses_legacy_layout(year, month)
    # Pick the column layout and the sheet that holds single-month figures.
    usecols = list(range(7)) if legacy else list(CURRENT_LAYOUT_USECOLS)
    sheet_index = 1 if legacy else 0

    current_df = load_and_process_data(
        current_file_path, year, month, usecols, sheet_index
    )

    db = DBHelper()
    bulk_insert_data(
        db,
        current_df,
        conflict_cols=list(CONFLICT_COLS),
        update_cols=list(UPDATE_COLS),
    )

    # The February workbook carries year-to-date figures, so January = YTD - February.
    if month == 2:
        print(f"根据2月表格生成{year}年1月数据...")
        handle_february_special_case(db, current_file_path, year, usecols)


def load_and_process_data(file_path, year, month, usecols, sheet_index=0):
    """Read one sheet, clean it, and tag each row with year/month/province metadata."""
    return load_sheet_data(file_path, sheet_index, usecols).pipe(add_metadata, year, month)


def handle_february_special_case(db, file_path, year, usecols):
    """Derive and store January rows from a February workbook (best effort).

    Failures are printed and swallowed, so a bad January back-fill does not
    abort the February import that already succeeded.
    """
    try:
        if year == 2023:
            process_2023_february(db, file_path, usecols)
        else:
            process_regular_february(db, file_path, year)
    except Exception as e:
        print(f"生成模拟1月数据失败: {str(e)}")


# ---------- Helpers ----------
def clean_city_names(df):
    """Remove ASCII-parenthesised annotations and surrounding whitespace from city names.

    The column is cast to ``str`` first so the ``.str`` accessor works whatever
    dtype pandas inferred. Non-name cells (e.g. NaN -> "nan") match no entry in
    FUJIAN_CITY and are dropped by map_city_codes.
    """
    df['city_name'] = (
        df['city_name']
        .astype(str)
        .str.replace(r'[(].*?[)]', '', regex=True)
        .str.strip()
    )
    return df


def map_city_codes(df):
    """Attach ``city_code`` and keep only rows whose name is a known Fujian city."""
    df['city_code'] = df['city_name'].map(FUJIAN_CITY)
    return df[df['city_code'].notnull()].copy()


def add_metadata(df, year, month):
    """Append year, "YYYY-MM" period, and province code/name columns."""
    return df.assign(
        crossborder_year=year,
        crossborder_year_month=f"{year}-{month:02d}",
        prov_code=PROV_CODE,
        prov_name=PROV_NAME,
    )


def _convert_columns(df, columns):
    """Convert the given amount columns in place from 万 to yuan."""
    for col in columns:
        df[col] = df[col].apply(convert_wan_to_yuan)
    return df


def convert_units(df):
    """Unit conversion (万 -> yuan) for the single-period amount columns."""
    return _convert_columns(df, AMOUNT_COLS)


def bulk_insert_data(db, df, conflict_cols, update_cols):
    """Upsert ``df`` into the region-trade table via DBHelper.bulk_insert."""
    db.bulk_insert(
        df,
        TRADE_TABLE,
        conflict_columns=conflict_cols,
        update_columns=update_cols,
    )


# ---------- February -> January back-fill ----------
def process_2023_february(db, file_path, usecols):
    """2023 layout: sheet 0 holds year-to-date totals, sheet 1 holds February alone."""
    ytd_df = load_sheet_data(file_path, sheet_index=0, usecols=usecols)
    current_df = load_sheet_data(file_path, sheet_index=1, usecols=usecols)

    # Inner join per city; overlapping columns get _ytd / _current suffixes.
    merged = ytd_df.merge(
        current_df,
        on='city_code',
        suffixes=('_ytd', '_current'),
    )

    january_df = create_january_data(merged, year=2023)

    bulk_insert_data(
        db,
        january_df,
        conflict_cols=list(CONFLICT_COLS),
        update_cols=list(UPDATE_COLS),
    )


def process_regular_february(db, file_path, year):
    """Post-2023 layout: year-to-date and February columns sit side by side on sheet 0."""
    df = pd.read_excel(
        file_path,
        sheet_name=0,
        header=4,
        usecols=[0, 1, 3, 4, 5, 7, 8, 9, 11, 12],
        names=[
            'city_name',
            'ytd_monthly_total', 'monthly_total', 'yoy_import_export',
            'ytd_monthly_export', 'monthly_export', 'yoy_export',
            'ytd_monthly_import', 'monthly_import', 'yoy_import',
        ],
    )

    processed_df = (
        df.pipe(clean_city_names)
        .pipe(map_city_codes)
        .pipe(convert_special_units)
        .pipe(calculate_january_values)
        .pipe(add_metadata, year=year, month=1)
    )

    bulk_insert_data(
        db,
        processed_df,
        conflict_cols=list(CONFLICT_COLS),
        update_cols=list(UPDATE_COLS),
    )


def load_sheet_data(file_path, sheet_index, usecols):
    """Read one sheet with the 7-column layout, clean city names, map codes, convert units."""
    df = pd.read_excel(
        file_path,
        sheet_name=sheet_index,
        header=4,
        usecols=usecols,
        names=list(TRADE_COLUMN_NAMES),
    )
    return (
        df.pipe(clean_city_names)
        .pipe(map_city_codes)
        .pipe(convert_units)
    )


def create_january_data(merged_df, year):
    """Build January rows as (year-to-date - February) with an explicit output column set.

    YoY rates are set to 0.0 here. The __main__ block later calls
    DBHelper.update_january_yoy, presumably to fill them in (confirm in DBHelper).
    """
    return (
        merged_df
        # Step 1: derive January amounts and placeholder YoY rates.
        .assign(
            monthly_total=lambda x: x['monthly_total_ytd'] - x['monthly_total_current'],
            monthly_export=lambda x: x['monthly_export_ytd'] - x['monthly_export_current'],
            monthly_import=lambda x: x['monthly_import_ytd'] - x['monthly_import_current'],
            yoy_import_export=0.0,
            yoy_export=0.0,
            yoy_import=0.0,
            crossborder_year_month=f"{year}-01",
            city_name=lambda x: x['city_name_current'],
        )
        # Step 2: keep only the target columns (drops the suffixed merge columns).
        .reindex(columns=[
            'city_code', 'city_name',
            'monthly_total', 'monthly_export', 'monthly_import',
            'yoy_import_export', 'yoy_export', 'yoy_import',
            'crossborder_year_month',
        ])
        # Step 3: add the remaining metadata columns.
        .assign(
            crossborder_year=year,
            prov_code=PROV_CODE,
            prov_name=PROV_NAME,
        )
    )


def convert_special_units(df):
    """Unit conversion (万 -> yuan) for both the year-to-date and February amount columns."""
    return _convert_columns(
        df,
        (
            'ytd_monthly_total', 'monthly_total',
            'ytd_monthly_export', 'monthly_export',
            'ytd_monthly_import', 'monthly_import',
        ),
    )


def calculate_january_values(df):
    """Replace February amounts with January (= YTD - February) and drop the YTD columns."""
    return df.assign(
        monthly_total=lambda x: x['ytd_monthly_total'] - x['monthly_total'],
        monthly_export=lambda x: x['ytd_monthly_export'] - x['monthly_export'],
        monthly_import=lambda x: x['ytd_monthly_import'] - x['monthly_import'],
        yoy_import_export=0.0,
        yoy_export=0.0,
        yoy_import=0.0,
    ).drop(columns=list(YTD_AMOUNT_COLS))


if __name__ == "__main__":
    traverse_and_process(download_dir, parse_excel, province_name="fujian")
    print("更新同比数据……")
    db_helper = DBHelper()
    db_helper.update_january_yoy()
    # To re-run a single month while debugging:
    # parse_excel(download_dir / "2023" / "02")