123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 |
- import re
- from pathlib import Path
- import numpy as np
- import pandas as pd
- from utils.db_helper import DBHelper
- from quanguo.CountryTrade import COUNTRY_CODE_MAPPING
- from utils.constants import DOWNLOAD_DIR
- from utils.log import log
- from utils.parse_utils import clean_county_name, clean_commodity_name, convert_wan_to_yuan, \
- extract_year_month_from_path, get_previous_month_dir, find_unmatched_countries, traverse_and_process
- # 常量配置
- PROV_CODE = "370000"
- PROV_NAME = "山东省"
- SHANDONG_CITY = {
- "济南": "370100", "青岛": "370200", "淄博": "370300", "枣庄": "370400",
- "东营": "370500", "烟台": "370600", "潍坊": "370700", "济宁": "370800",
- "泰安": "370900", "威海": "371000", "日照": "371100", "临沂": "371300",
- "德州": "371400", "聊城": "371500", "滨州": "371600", "菏泽": "371700"
- }
- download_dir = DOWNLOAD_DIR / "shandong"
- YEAR_PATTERN = re.compile(r"^\d{4}$")
- MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
- def parse_excel(current_dir):
- """主解析入口(优化为单参数模式)
- Args:
- current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
- """
- current_path = Path(current_dir)
- year, month = extract_year_month_from_path(current_path)
- try:
- # 动态获取前月目录
- prev_dir = get_previous_month_dir(current_path) if month != 1 else None
- # 处理商品贸易数据
- process_combined_trade(current_path, year, month, prev_dir)
- # 处理地市贸易数据
- current_file_path = next(current_path.glob("*地市*"), None)
- prev_file_path = next(Path(prev_dir).glob("*地市*"), None) if prev_dir else None
- if current_file_path:
- process_region_trade(current_file_path, prev_file_path, year, month)
- # 处理国别贸易数据(保持原有逻辑结构)
- country_file = next(current_path.glob("*国别*"), None)
- prev_country_file = next(Path(prev_dir).glob("*国别*"), None) if prev_dir else None
- if country_file:
- process_country_trade(country_file, prev_country_file, year, month)
- log.info(f"{current_dir}数据已全部成功处理")
- except Exception as e:
- log.error(f"处理失败:{current_dir},错误:{str(e)}")
- raise
- def process_combined_trade(current_dir, year, month, previous_dir=None):
- """处理合并商品贸易数据(增强1月逻辑)"""
- import_file = next(current_dir.glob("*进口20位主要商品总值*"), None)
- export_file = next(current_dir.glob("*出口20位主要商品总值*"), None)
- if not (import_file and export_file):
- raise FileNotFoundError("缺少进口或出口文件")
- # 读取当前月数据(保持原有逻辑)
- current_data = read_trade_pair(import_file, export_file)
- # 处理历史数据
- prev_data = pd.DataFrame()
- if previous_dir and month != 1:
- prev_import = next(Path(previous_dir).glob("*进口20位主要商品总值*"), None)
- prev_export = next(Path(previous_dir).glob("*出口20位主要商品总值*"), None)
- if prev_import and prev_export:
- prev_data = read_trade_pair(prev_import, prev_export)
- # 计算逻辑优化
- merged_data = current_data if month == 1 else calculate_monthly_values(current_data, prev_data)
- # 保留原有数据库交互逻辑
- db = DBHelper()
- merged_data['commodity_code'] = merged_data['commodity_name'].apply(db.get_commodity_id)
- valid_data = merged_data[merged_data['commodity_code'].notnull()].copy()
- # 构建入库数据(保持原有字段结构)
- valid_data['crossborder_year'] = year
- valid_data['crossborder_year_month'] = f"{year}-{month:02d}"
- valid_data['prov_code'] = PROV_CODE
- valid_data['prov_name'] = PROV_NAME
- #当 monthly_import 和 monthly_export 中只有一个有值时,monthly_total 取不为空的那个值,
- # 而两者都有值时相加
- valid_data['monthly_total'] = valid_data['monthly_import'].fillna(0) + valid_data['monthly_export'].fillna(0)
- valid_data['monthly_total'] = valid_data['monthly_total'].replace(0, np.nan)
- valid_data = valid_data.replace({np.nan: None})
- # 入库逻辑保持不变
- target_cols = [
- 'crossborder_year', 'crossborder_year_month', 'prov_code', 'prov_name',
- 'commodity_code', 'commodity_name', 'monthly_total', 'monthly_import', 'monthly_export'
- ]
- db.bulk_insert(
- valid_data[target_cols],
- 't_yujin_crossborder_prov_commodity_trade',
- conflict_columns=['crossborder_year_month', 'prov_code', 'commodity_code'],
- update_columns=['monthly_total', 'monthly_import', 'monthly_export']
- )
- def process_region_trade(current_file_path, prev_file_path, year, month):
- """处理地市贸易数据(增强1月逻辑)"""
- # 读取当前数据
- current_df = pd.read_excel(
- current_file_path,
- skipfooter=1,
- header=4,
- names=['city_name', 'monthly_total', 'yoy_import_export',
- 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import']
- )
- # 1月特殊处理
- if month == 1:
- df = current_df[['city_name', 'monthly_total',
- 'monthly_export', 'monthly_import']].copy()
- df['yoy_import_export'] = current_df['yoy_import_export']
- df['yoy_export'] = current_df['yoy_export']
- df['yoy_import'] = current_df['yoy_import']
- else:
- prev_df = pd.read_excel(
- prev_file_path,
- skipfooter=1,
- header=4,
- names=['city_name', 'monthly_total', 'yoy_import_export',
- 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import']
- ) if prev_file_path else pd.DataFrame()
- merged_df = pd.merge(
- current_df,
- prev_df,
- on='city_name',
- suffixes=('_current', '_prev')
- )
- df = pd.DataFrame({
- 'city_name': merged_df['city_name'],
- 'monthly_total': merged_df['monthly_total_current'] - merged_df['monthly_total_prev'],
- 'yoy_import_export': merged_df['yoy_import_export_current'],
- 'monthly_export': merged_df['monthly_export_current'] - merged_df['monthly_export_prev'],
- 'yoy_export': merged_df['yoy_export_current'],
- 'monthly_import': merged_df['monthly_import_current'] - merged_df['monthly_import_prev'],
- 'yoy_import': merged_df['yoy_import_current']
- })
- # 保留原有处理逻辑
- df['city_code'] = df['city_name'].map(SHANDONG_CITY)
- df['crossborder_year'] = year
- df['crossborder_year_month'] = f"{year}-{month:02d}"
- df['prov_code'] = PROV_CODE
- df['prov_name'] = PROV_NAME
- # 单位转换
- for col in ['monthly_total', 'monthly_import', 'monthly_export']:
- df[col] = df[col].apply(convert_wan_to_yuan)
- db = DBHelper()
- df = df.replace({np.nan: None})
- db.bulk_insert(
- df,
- 't_yujin_crossborder_prov_region_trade',
- conflict_columns=['crossborder_year_month', 'city_code'],
- update_columns=['monthly_total', 'monthly_import', 'monthly_export',
- 'yoy_import_export', 'yoy_import', 'yoy_export']
- )
- def process_country_trade(current_file_path, prev_file_path, year, month):
- """处理国别贸易数据(增强1月逻辑)"""
- # 读取当前数据
- current_df = read_with_header4(current_file_path)
- current_df = current_df[~current_df['country_name'].str.contains('注:', na=False)]
- current_df = current_df.dropna(subset=['country_name'])
- current_df = current_df[current_df['country_name'].str.strip() != '']
- # 1月特殊处理
- if month == 1:
- final_df = current_df.copy()
- final_df[['monthly_total', 'monthly_export', 'monthly_import']] = \
- current_df[['monthly_total', 'monthly_export', 'monthly_import']]
- else:
- prev_df = read_with_header4(prev_file_path)
- prev_df = prev_df[~prev_df['country_name'].str.contains('注:', na=False)]
- prev_df = prev_df.dropna(subset=['country_name'])
- prev_df = prev_df[prev_df['country_name'].str.strip() != '']
- merged_df = pd.merge(
- current_df,
- prev_df,
- on='country_name',
- suffixes=('_current', '_prev'),
- how='inner'
- )
- merged_df['monthly_total'] = merged_df['monthly_total_current'] - merged_df['monthly_total_prev']
- merged_df['monthly_export'] = merged_df['monthly_export_current'] - merged_df['monthly_export_prev']
- merged_df['monthly_import'] = merged_df['monthly_import_current'] - merged_df['monthly_import_prev']
- merged_df['yoy_import_export'] = merged_df['yoy_import_export_current']
- merged_df['yoy_export'] = merged_df['yoy_export_current']
- merged_df['yoy_import'] = merged_df['yoy_import_current']
- final_df = merged_df[[
- 'country_name','monthly_total', 'monthly_import', 'monthly_export',
- 'yoy_import_export', 'yoy_import', 'yoy_export'
- ]]
- # 排除特殊国家(新增过滤逻辑)
- final_df = final_df[
- ~final_df['country_name'].str.contains('东盟|欧盟', na=False, regex=True)
- ]
- final_df['country_code'] = final_df['country_name'].map(COUNTRY_CODE_MAPPING)
- find_unmatched_countries(final_df)
- final_df['crossborder_year'] = year
- final_df['crossborder_year_month'] = f"{year}-{month:02d}"
- final_df['prov_code'] = PROV_CODE
- final_df['prov_name'] = PROV_NAME
- # 单位转换
- for col in ['monthly_total', 'monthly_import', 'monthly_export']:
- final_df[col] = final_df[col].apply(convert_wan_to_yuan)
- final_df = final_df.replace({np.nan: None})
- db = DBHelper()
- db.bulk_insert(
- final_df,
- 't_yujin_crossborder_prov_country_trade',
- conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
- update_columns=['monthly_total', 'monthly_import', 'monthly_export',
- 'yoy_import_export', 'yoy_import', 'yoy_export']
- )
- def read_with_header4(file_path):
- # 第一阶段:读取原始数据(固定列范围)
- raw_df = pd.read_excel(
- file_path,
- usecols="A:G", # 强制读取前7列
- header=None, # 禁用自动表头识别
- skipfooter=1
- )
- # 第二阶段:计算列偏移量
- if raw_df.iloc[:, 0:2].isnull().all().all(): # 前两列全为空
- col_offset = 2 # 从第三列开始(A3起始)
- else:
- col_offset = 0 # 默认从第一列开始(A1起始)
- # 第三阶段:应用header=4逻辑
- header_row = 4 # 保持原有header行位置
- data_start_row = header_row + 1 # 数据起始行
- # 重新读取有效数据
- final_df = pd.read_excel(
- file_path,
- usecols=raw_df.columns[col_offset:col_offset + 7], # 动态列范围
- header=header_row,
- skipfooter=1
- )
- # 第四阶段:强制列名对齐
- final_df.columns = [
- 'country_name', 'monthly_total', 'yoy_import_export',
- 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
- ]
- # 清洗国家名称
- final_df['country_name'] = final_df['country_name'].apply(clean_county_name)
- return final_df
- def read_trade_pair(import_path, export_path):
- """进/出口表格合并"""
- df_import = pd.read_excel(import_path, skiprows=3, skipfooter=1,
- usecols=[0, 1], names=["commodity_name", "monthly_import"]).pipe(lambda df: df.assign(
- commodity_name=df["commodity_name"].apply(clean_commodity_name)
- ))
- df_export = pd.read_excel(export_path, skiprows=3, skipfooter=1,
- usecols=[0, 1], names=["commodity_name", "monthly_export"]).pipe(lambda df: df.assign(
- commodity_name=df["commodity_name"].apply(clean_commodity_name)
- ))
- merged = pd.merge(df_import, df_export, on="commodity_name", how="outer").fillna(pd.NA)
- merged["monthly_import"] = merged["monthly_import"].apply(convert_wan_to_yuan)
- merged["monthly_export"] = merged["monthly_export"].apply(convert_wan_to_yuan)
- return merged
- def calculate_monthly_values(current_data, prev_data):
- """根据上个月进出口数据计算当月数据"""
- merged = pd.merge(current_data, prev_data, on="commodity_name",
- how="left", suffixes=("_current", "_prev")).fillna(pd.NA)
- merged["monthly_import"] = merged["monthly_import_current"] - merged["monthly_import_prev"]
- merged["monthly_export"] = merged["monthly_export_current"] - merged["monthly_export_prev"]
- return merged[["commodity_name", "monthly_import", "monthly_export"]]
- if __name__ == "__main__":
- traverse_and_process(download_dir, parse_excel, province_name="shandong")
- log.info("\n山东省地级市数据同比更新中...")
- db_helper = DBHelper()
- db_helper.update_shandong_yoy()
|