123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- import re
- from pathlib import Path
- import pandas as pd
- from utils.db_helper import DBHelper
- from utils.constants import DOWNLOAD_DIR
- from utils.parse_utils import convert_wan_to_yuan, extract_year_month_from_path, traverse_and_process
# Fujian city/region names as they appear in the workbooks, mapped to the
# six-digit codes stored in the ``city_code`` column.
FUJIAN_CITY = {
    "福州市": "350100",
    "厦门市": "350200",
    "莆田市": "350300",
    "三明市": "350400",
    "泉州市": "350500",
    "漳州市": "350600",
    "南平市": "350700",
    "宁德市": "350900",
    "龙岩市": "350800",
    "平潭地区": "350128",
}

# Constant configuration (includes regex validation for path components).
PROV_CODE = "350000"
PROV_NAME = "福建省"
YEAR_PATTERN = re.compile(r"^\d{4}$")  # exactly four digits
MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")  # two-digit month, 01-12

# Root directory that the script entry point walks for Fujian downloads.
download_dir = DOWNLOAD_DIR / "fujian"
def parse_excel(current_dir):
    """Parse one month's data directory and load it into the database.

    Args:
        current_dir (str): Directory holding the current month's files
            (documented layout: /year/province/month).

    Raises:
        FileNotFoundError: If no file whose name contains ``分地市`` exists
            in ``current_dir``.
        Exception: Any error raised while processing is reported on stdout
            and then re-raised unchanged.
    """
    current_path = Path(current_dir)
    year, month = extract_year_month_from_path(current_path)
    try:
        # Process the per-city trade workbook. Taking the smallest matching
        # path keeps the choice deterministic when several files match.
        current_file_path = min(current_path.glob("*分地市*"), default=None)
        if current_file_path is None:
            # Fail with a clear message instead of handing None to pandas.
            raise FileNotFoundError(
                f"未找到匹配 '*分地市*' 的文件: {current_path}"
            )
        process_region_trade(current_file_path, year, month)
        print(f"{current_dir}数据已全部成功处理")
    except Exception as e:
        print(f"处理失败:{current_dir},错误:{str(e)}")
        raise
def process_region_trade(current_file_path, year, month):
    """Load the per-city trade workbook for one month and upsert it.

    Workbooks for 2023 with month <= 5 use a different layout: the
    single-month figures sit on the second sheet in the first seven columns.
    All other workbooks are read from the first sheet using columns
    0, 3, 4, 7, 8, 11 and 12.

    When ``month`` is 2, January rows are additionally derived from the same
    February workbook.

    Args:
        current_file_path: Path of the workbook to read.
        year: Year of the data.
        month: Month of the data.
    """
    # Pick the column set and sheet according to the workbook layout.
    legacy_layout = year == 2023 and month <= 5
    if legacy_layout:
        usecols = list(range(7))
        sheet_index = 1
    else:
        usecols = [0, 3, 4, 7, 8, 11, 12]
        sheet_index = 0

    # Read and clean the main data table.
    current_df = load_and_process_data(
        current_file_path, year, month, usecols, sheet_index
    )

    # Write to the database, updating the value columns on key conflicts.
    db = DBHelper()
    key_columns = ['crossborder_year_month', 'city_code']
    value_columns = [
        'monthly_total', 'monthly_import', 'monthly_export',
        'yoy_import_export', 'yoy_import', 'yoy_export'
    ]
    bulk_insert_data(
        db, current_df,
        conflict_cols=key_columns,
        update_cols=value_columns
    )

    # February special case: also generate the January rows.
    if month == 2:
        print(f"根据2月表格生成{year}年1月数据...")
        handle_february_special_case(db, current_file_path, year, usecols)
def load_and_process_data(file_path, year, month, usecols, sheet_index=0):
    """Read one sheet of a trade workbook and run the standard cleaning steps.

    Args:
        file_path: Workbook to read.
        year: Year written into the metadata columns.
        month: Month written into the metadata columns.
        usecols: Column positions to read; they are renamed, in order, to the
            seven standard column names.
        sheet_index: Sheet to read (defaults to the first sheet).

    Returns:
        DataFrame with cleaned city names, city codes, metadata columns and
        converted amount columns.
    """
    column_names = [
        'city_name', 'monthly_total', 'yoy_import_export',
        'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
    ]
    raw = pd.read_excel(
        file_path,
        header=4,
        sheet_name=sheet_index,
        usecols=usecols,
        names=column_names
    )

    # Cleaning pipeline, applied one step at a time.
    named = clean_city_names(raw)
    coded = map_city_codes(named)
    tagged = add_metadata(coded, year, month)
    return convert_units(tagged)
def handle_february_special_case(db, file_path, year, usecols):
    """Derive January rows from a February workbook (best effort).

    2023 workbooks are handled by ``process_2023_february``; every other year
    goes through ``process_regular_february``. Any error is reported on
    stdout and not re-raised.

    Args:
        db: Database helper used for the insert.
        file_path: February workbook.
        year: Year of the data.
        usecols: Column positions, used only on the 2023 path.
    """
    try:
        if year != 2023:
            process_regular_february(db, file_path, year)
        else:
            process_2023_february(db, file_path, usecols)
    except Exception as e:
        print(f"生成模拟1月数据失败: {str(e)}")
# ---------- Utility functions ----------
def clean_city_names(df):
    """Return a copy of ``df`` with normalized ``city_name`` values.

    Parenthesized annotations are removed and surrounding whitespace is
    stripped. Both ASCII ``(...)`` and full-width ``（...）`` parentheses are
    treated as delimiters, so annotated names still match the city lookup.

    Args:
        df: DataFrame with a string ``city_name`` column.

    Returns:
        A new DataFrame; the input frame is left unmodified.
    """
    cleaned = (
        df['city_name']
        # Non-greedy so each annotation is removed on its own.
        .str.replace(r'[（(].*?[）)]', '', regex=True)
        .str.strip()
    )
    # assign() builds a new frame instead of writing into the caller's.
    return df.assign(city_name=cleaned)
def map_city_codes(df):
    """Attach ``city_code`` and keep only rows with a known city.

    The ``city_code`` column is written onto ``df`` itself; the returned
    frame is an independent copy containing only the rows whose
    ``city_name`` was found in ``FUJIAN_CITY``.
    """
    codes = df['city_name'].map(FUJIAN_CITY)
    df['city_code'] = codes
    has_code = df['city_code'].notnull()
    return df[has_code].copy()
def add_metadata(df, year, month):
    """Return ``df`` with year, year-month and province columns added.

    ``crossborder_year_month`` is formatted as ``YYYY-MM`` with the month
    zero-padded to two digits.
    """
    year_month = f"{year}-{month:02d}"
    metadata = {
        'crossborder_year': year,
        'crossborder_year_month': year_month,
        'prov_code': PROV_CODE,
        'prov_name': PROV_NAME,
    }
    return df.assign(**metadata)
def convert_units(df):
    """Convert the three monthly amount columns in place (万 -> 元).

    Each value is passed through ``convert_wan_to_yuan``; the same frame is
    returned after modification.
    """
    amount_columns = ('monthly_total', 'monthly_import', 'monthly_export')
    for name in amount_columns:
        converted = df[name].apply(convert_wan_to_yuan)
        df[name] = converted
    return df
def bulk_insert_data(db, df, conflict_cols, update_cols):
    """Bulk-insert ``df`` into the province/region trade table.

    Args:
        db: Helper object exposing ``bulk_insert``.
        df: Rows to write.
        conflict_cols: Forwarded as ``conflict_columns``.
        update_cols: Forwarded as ``update_columns``.
    """
    target_table = 't_yujin_crossborder_prov_region_trade'
    options = {
        'conflict_columns': conflict_cols,
        'update_columns': update_cols,
    }
    db.bulk_insert(df, target_table, **options)
# ---------- February special-case handling ----------
def process_2023_february(db, file_path, usecols):
    """Derive and store January 2023 rows from the February 2023 workbook.

    Sheet 0 is loaded as the year-to-date figures and sheet 1 as the
    current-month figures. The two are joined on ``city_code`` and January is
    computed as year-to-date minus current month.
    """
    # Read both sheets with the same column selection.
    cumulative = load_sheet_data(file_path, sheet_index=0, usecols=usecols)
    single_month = load_sheet_data(file_path, sheet_index=1, usecols=usecols)

    # Join so each city has its *_ytd and *_current values side by side.
    joined = cumulative.merge(
        single_month,
        on='city_code',
        suffixes=('_ytd', '_current')
    )

    # Build the January rows and upsert them.
    january_rows = create_january_data(joined, year=2023)
    key_columns = ['crossborder_year_month', 'city_code']
    value_columns = [
        'monthly_total', 'monthly_import', 'monthly_export',
        'yoy_import_export', 'yoy_import', 'yoy_export'
    ]
    bulk_insert_data(
        db, january_rows,
        conflict_cols=key_columns,
        update_cols=value_columns
    )
def process_regular_february(db, file_path, year):
    """Derive and store January rows from a February workbook (non-2023).

    The year-to-date and current-month columns are read from the default
    sheet. January values are computed as year-to-date minus February and
    written with month 1 metadata.
    """
    column_positions = [0, 1, 3, 4, 5, 7, 8, 9, 11, 12]
    column_names = [
        'city_name', 'ytd_monthly_total', 'monthly_total', 'yoy_import_export',
        'ytd_monthly_export', 'monthly_export', 'yoy_export',
        'ytd_monthly_import', 'monthly_import', 'yoy_import'
    ]
    raw = pd.read_excel(
        file_path,
        header=4,
        usecols=column_positions,
        names=column_names
    )

    # Full processing pipeline, applied one step at a time.
    named = clean_city_names(raw)
    coded = map_city_codes(named)
    in_yuan = convert_special_units(coded)
    january = calculate_january_values(in_yuan)
    processed_df = add_metadata(january, year=year, month=1)

    key_columns = ['crossborder_year_month', 'city_code']
    value_columns = [
        'monthly_total', 'monthly_import', 'monthly_export',
        'yoy_import_export', 'yoy_import', 'yoy_export'
    ]
    bulk_insert_data(
        db, processed_df,
        conflict_cols=key_columns,
        update_cols=value_columns
    )
def load_sheet_data(file_path, sheet_index, usecols):
    """Load one sheet and clean it, without adding metadata columns.

    Args:
        file_path: Workbook to read.
        sheet_index: Sheet to read.
        usecols: Column positions, renamed in order to the seven standard
            column names.

    Returns:
        DataFrame with cleaned city names, city codes and converted amounts.
    """
    column_names = [
        'city_name', 'monthly_total', 'yoy_import_export',
        'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
    ]
    sheet = pd.read_excel(
        file_path,
        sheet_name=sheet_index,
        header=4,
        usecols=usecols,
        names=column_names
    )
    named = clean_city_names(sheet)
    coded = map_city_codes(named)
    return convert_units(coded)
def create_january_data(merged_df, year):
    """Build January rows from a merged year-to-date / current-month frame.

    Args:
        merged_df: Frame holding ``*_ytd`` and ``*_current`` columns plus
            ``city_code``.
        year: Year used for ``crossborder_year`` and ``crossborder_year_month``.

    Returns:
        DataFrame restricted to the output columns, with the year-over-year
        columns set to 0.0 and the province metadata attached.
    """
    # Step 1: compute the January values (year-to-date minus current month).
    computed = merged_df.assign(
        monthly_total=merged_df['monthly_total_ytd'] - merged_df['monthly_total_current'],
        monthly_export=merged_df['monthly_export_ytd'] - merged_df['monthly_export_current'],
        monthly_import=merged_df['monthly_import_ytd'] - merged_df['monthly_import_current'],
        yoy_import_export=0.0,
        yoy_export=0.0,
        yoy_import=0.0,
        crossborder_year_month=f"{year}-01",
        city_name=merged_df['city_name_current']
    )

    # Step 2: keep exactly the output columns, in this order.
    output_columns = [
        'city_code', 'city_name',
        'monthly_total', 'monthly_export', 'monthly_import',
        'yoy_import_export', 'yoy_export', 'yoy_import',
        'crossborder_year_month'
    ]
    selected = computed.reindex(columns=output_columns)

    # Step 3: attach the remaining metadata so the row is complete.
    return selected.assign(
        crossborder_year=year,
        prov_code=PROV_CODE,
        prov_name=PROV_NAME
    )
def convert_special_units(df):
    """Convert the year-to-date and monthly amount columns in place.

    Each value in the six amount columns is passed through
    ``convert_wan_to_yuan``; the same frame is returned after modification.
    """
    amount_columns = (
        'ytd_monthly_total', 'monthly_total',
        'ytd_monthly_export', 'monthly_export',
        'ytd_monthly_import', 'monthly_import',
    )
    for name in amount_columns:
        converted = df[name].apply(convert_wan_to_yuan)
        df[name] = converted
    return df
def calculate_january_values(df):
    """Replace the monthly amounts with January values and drop the YTD columns.

    January is computed as the year-to-date amount minus the current-month
    amount for total, export and import. The three year-over-year columns
    are set to 0.0. The input frame is not modified.
    """
    ytd_columns = [
        'ytd_monthly_total', 'ytd_monthly_export',
        'ytd_monthly_import'
    ]
    january = df.assign(
        monthly_total=df['ytd_monthly_total'] - df['monthly_total'],
        monthly_export=df['ytd_monthly_export'] - df['monthly_export'],
        monthly_import=df['ytd_monthly_import'] - df['monthly_import'],
        yoy_import_export=0.0,
        yoy_export=0.0,
        yoy_import=0.0
    )
    return january.drop(columns=ytd_columns)
# def clean_commodity_name(name):
#     return re.sub(r'[^\w\u4e00-\u9fa5]', '', str(name)).strip()
if __name__ == "__main__":
    # Walk the Fujian download directory and parse every directory found.
    traverse_and_process(download_dir, parse_excel, province_name="fujian")

    # After all months are loaded, let the DB helper refresh the January
    # year-over-year figures (presumably the 0.0 placeholders written for
    # derived January rows -- confirm against DBHelper).
    print("更新同比数据……")
    DBHelper().update_january_yoy()

    # To process a single month while debugging:
    # parse_excel(download_dir/"2023"/"02")
|