123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- import re
- from pathlib import Path
- import pandas as pd
- from utils.db_helper import DBHelper
- from utils.constants import COUNTRY_CODE_MAPPING, EXCLUDE_REGIONS, DOWNLOAD_DIR
- from utils.parse_utils import clean_county_name, clean_commodity_name, convert_wan_to_yuan, find_unmatched_countries, \
- extract_year_month_from_path, traverse_and_process
- # 常量配置(新增路径正则校验)
- PROV_CODE = "410000"
- PROV_NAME = "河南省"
- YEAR_PATTERN = re.compile(r"^\d{4}$")
- MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
- download_dir = DOWNLOAD_DIR / "henan"
- def parse_excel(current_dir):
- """主解析入口(优化为单参数模式)
- Args:
- current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
- """
- current_path = Path(current_dir)
- year, month = extract_year_month_from_path(current_path)
- try:
- # 处理商品贸易数据
- process_combined_trade(current_path, year, month)
- # 处理国别贸易数据(保持原有逻辑结构)
- country_file = next(current_path.glob("*国别*"), None)
- process_country_trade(country_file, year, month)
- print(f"{current_dir}数据已全部成功处理")
- except Exception as e:
- print(f"处理失败:{current_dir},错误:{str(e)}")
- raise
- def process_combined_trade(current_dir, year, month):
- """处理合并商品贸易数据(支持1月数据模拟)"""
- import_file = next(current_dir.glob("*进口主要商品量值表*"), None)
- export_file = next(current_dir.glob("*出口主要商品量值表*"), None)
- if not (import_file and export_file):
- raise FileNotFoundError("缺少进口或出口文件")
- # 读取当前月数据
- current_data = read_trade_pair(import_file, export_file)
- db = DBHelper()
- current_data['commodity_code'] = current_data['commodity_name'].apply(db.get_commodity_id)
- valid_data = current_data[current_data['commodity_code'].notnull()].copy()
- # 构建当前月数据
- valid_data['crossborder_year'] = year
- valid_data['crossborder_year_month'] = f"{year}-{month:02d}"
- valid_data['prov_code'] = PROV_CODE
- valid_data['prov_name'] = PROV_NAME
- valid_data['monthly_total'] = valid_data['monthly_import'] + valid_data['monthly_export']
- # 定义目标字段
- target_cols = [
- 'crossborder_year', 'crossborder_year_month', 'prov_code', 'prov_name',
- 'commodity_code', 'commodity_name', 'monthly_total', 'monthly_import', 'monthly_export'
- ]
- # 写入当前月数据
- db.bulk_insert(
- valid_data[target_cols],
- 't_yujin_crossborder_prov_commodity_trade',
- conflict_columns=['crossborder_year_month', 'prov_code', 'commodity_code'],
- update_columns=['monthly_total', 'monthly_import', 'monthly_export']
- )
- # 当处理2月数据时,生成模拟1月数据
- if month == 2:
- # 克隆当前数据并调整月份
- january_data = valid_data.copy()
- january_data['crossborder_year_month'] = f"{year}-01"
- # 数值处理:月指标除以2(模拟1-2月均值)
- numeric_cols = ['monthly_total', 'monthly_import', 'monthly_export']
- january_data[numeric_cols] = january_data[numeric_cols] / 2
- # 写入模拟1月数据
- db.bulk_insert(
- january_data[target_cols],
- 't_yujin_crossborder_prov_commodity_trade',
- conflict_columns=['crossborder_year_month', 'prov_code', 'commodity_code'],
- update_columns=numeric_cols # 仅更新数值字段
- )
- def process_country_trade(current_file_path, year, month):
- """处理国别贸易数据(支持1月数据模拟)"""
- # 读取原始数据
- final_df = read_with_header4(current_file_path, month)
- # 数据清洗:剔除指定区域
- final_df = final_df[
- ~final_df['country_name'].isin(EXCLUDE_REGIONS) &
- ~final_df['country_name'].str.contains(r'[((]地区[))]', regex=True) # 修正正则表达式
- ]
- # 生成基础字段
- final_df['country_code'] = final_df['country_name'].map(COUNTRY_CODE_MAPPING)
- find_unmatched_countries(final_df)
- # 过滤掉没有匹配到 country_code 的行
- final_df = final_df[final_df['country_code'].notnull()].copy()
- final_df['crossborder_year'] = year
- final_df['crossborder_year_month'] = f"{year}-{month:02d}"
- final_df['prov_code'] = PROV_CODE
- final_df['prov_name'] = PROV_NAME
- # 主数据写入
- db = DBHelper()
- db.bulk_insert(
- final_df,
- 't_yujin_crossborder_prov_country_trade',
- conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
- update_columns=['monthly_total', 'monthly_import', 'monthly_export',
- 'yoy_import_export', 'yoy_import', 'yoy_export']
- )
- # 当处理2月数据时,生成模拟1月数据
- if month == 2:
- # 克隆数据并调整月份
- january_df = final_df.copy()
- january_df['crossborder_year_month'] = f"{year}-01"
- # 数值处理:月指标除以2,同比指标清零
- numeric_cols = ['monthly_total', 'monthly_import', 'monthly_export']
- january_df[numeric_cols] = january_df[numeric_cols] / 2 # 均摊为1-2月均值
- yoy_cols = ['yoy_import_export', 'yoy_import', 'yoy_export']
- january_df[yoy_cols] = 0.0 # 模拟数据无同比
- # 模拟数据写入(增加注释说明)
- db.bulk_insert(
- january_df,
- 't_yujin_crossborder_prov_country_trade',
- conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
- update_columns=numeric_cols + yoy_cols # 仅更新数值字段
- )
- def read_with_header4(file_path, month):
- # 第一阶段:读取原始数据(固定列范围)
- # 2月份数据和其他月份表格数据不同
- if month == 2:
- target_cols = [0, 1, 2, 3, 4, 5, 6]
- else:
- raw_df = pd.read_excel(
- file_path,
- usecols="A:K", # 强制读取前11列(A到K)
- header=None, # 禁用自动表头识别
- skiprows=5,
- skipfooter=1
- )
- # 第二阶段:计算列偏移量
- if raw_df.iloc[:, 0:2].isnull().all().all(): # 前两列全为空
- col_offset = 2 # 从第三列开始(A3起始)
- else:
- col_offset = 0 # 默认从第一列开始(A1起始)
- # 第三阶段:确定目标列索引(基于偏移后的位置)
- target_cols = [0 + col_offset, 1 + col_offset, 2 + col_offset, 5 + col_offset, 6 + col_offset, 9 + col_offset,
- 10 + col_offset]
- # 第四阶段:应用header=4逻辑并选择目标列
- final_df = pd.read_excel(
- file_path,
- usecols=target_cols, # 动态选择的目标列
- header=4, # 保持原有header行位置
- skipfooter=1
- )
- # 第五阶段:强制列名对齐
- final_df.columns = [
- 'country_name', 'monthly_total', 'yoy_import_export',
- 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
- ]
- # 清洗国家名称
- final_df['country_name'] = final_df['country_name'].apply(clean_county_name)
- # 替换 "--" 为 0,并转换为 float 类型
- yoy_columns = ['yoy_import_export', 'yoy_export', 'yoy_import']
- # 优化后的稳健类型转换方案
- final_df[yoy_columns] = (
- final_df[yoy_columns]
- # 阶段1:清理非常规占位符
- .replace({
- '--': None, # 处理横杠
- '': None, # 处理空字符串
- 'N/A': None, # 处理英文占位符
- '不详': None # 处理中文占位符
- })
- # 阶段2:安全类型转换
- .apply(pd.to_numeric, errors='coerce', downcast='float')
- # 阶段3:空值处理
- .fillna(0)
- # 阶段4:精度控制
- .round(2)
- )
- return final_df
- # 进出口数据合并为一张表
- def read_trade_pair(import_path, export_path):
- """进出口合并,读取第一列和第4列"""
- df_import = pd.read_excel(
- import_path,
- skiprows=3,
- skipfooter=1,
- usecols=[0, 4],
- names=["commodity_name", "monthly_import"],
- header=None
- ).pipe(lambda df: df.assign(
- commodity_name=df["commodity_name"].apply(clean_commodity_name)
- ))
- df_export = pd.read_excel(
- export_path,
- skiprows=3,
- skipfooter=1,
- usecols=[0, 4],
- names=["commodity_name", "monthly_export"],
- header=None
- ).pipe(lambda df: df.assign(
- commodity_name=df["commodity_name"].apply(clean_commodity_name)
- ))
- merged = pd.merge(df_import, df_export, on="commodity_name", how="outer").fillna(0)
- merged["monthly_import"] = merged["monthly_import"].apply(convert_wan_to_yuan)
- merged["monthly_export"] = merged["monthly_export"].apply(convert_wan_to_yuan)
- return merged
- def calculate_monthly_values(current_data, prev_data):
- """"""
- merged = pd.merge(current_data, prev_data, on="commodity_name",
- how="left", suffixes=("_current", "_prev")).fillna(0)
- merged["monthly_import"] = merged["monthly_import_current"] - merged["monthly_import_prev"]
- merged["monthly_export"] = merged["monthly_export_current"] - merged["monthly_export_prev"]
- return merged[["commodity_name", "monthly_import", "monthly_export"]]
- # def clean_commodity_name(name):
- # return re.sub(r'[^\w\u4e00-\u9fa5]', '', str(name)).strip()
- if __name__ == "__main__":
- traverse_and_process(download_dir, parse_excel, province_name="henan")
|