123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- from decimal import Decimal
- from pathlib import Path
- import pandas as pd
- from openpyxl import load_workbook
- from db_helper import DBHelper
- from utils.constants import DOWNLOAD_DIR, GUANGDONG_CITY
- from utils.log import log
- from utils.parse_utils import traverse_and_process, extract_year_month_from_path, get_previous_month_dir
- # 配置日志
- PROV_CODE = "440000"
- PROV_NAME = "广东省"
- download_dir = DOWNLOAD_DIR / "guangdong"
- db = DBHelper()
- def match_customs_file(filename, customs_name, year, month):
- """匹配海关文件"""
- filename_lower = filename.lower()
- if customs_name == "广州海关":
- return "7地市进出口综合统计" in filename_lower
- elif customs_name == "深圳海关":
- return "深圳海关综合统计资料" in filename_lower
- elif customs_name == "汕头海关":
- return "5市报表" in filename_lower
- elif customs_name == "黄埔海关":
- return "东莞市进出口企业性质总值表" in filename_lower
- elif customs_name == "江门海关":
- if "江门市" in filename_lower or "阳江市" in filename_lower:
- return "外贸进出口有关情况统计表" in filename_lower
- elif customs_name == "湛江海关":
- if "湛江市" in filename_lower or "茂名市" in filename_lower:
- return "外贸进出口数据" in filename_lower
- return False
- def process_guangzhou_customs(file_path, year, month):
- """处理广州海关数据"""
- try:
- # 读取Excel文件
- wb = load_workbook(file_path, data_only=True)
- sheet = wb.worksheets[0]
- # 查找包含月份的表头行
- month_str = f"{year}年{month}月"
- header_row = None
- for i in range(1, 4): # 检查前3行
- row_values = [str(cell.value).strip() if cell.value else "" for cell in sheet[i]]
- if any(month_str in val for val in row_values):
- header_row = i
- break
- if header_row is None:
- log.error(f"未找到 {month_str} 的表头")
- return pd.DataFrame()
- # 确定数据列位置
- data_cols = []
- for cell in sheet[header_row]:
- if cell.value and month_str in str(cell.value):
- data_cols.append(cell.column - 1) # 转换为0-based索引
- if len(data_cols) < 6:
- log.error(f"未找到足够的 {month_str} 数据列")
- return pd.DataFrame()
- # 提取7地市数据
- results = []
- target_cities = ["广州市", "深圳市", "东莞市", "汕头市", "江门市", "湛江市", "茂名市"]
- for row in sheet.iter_rows(min_row=header_row + 1):
- city_cell = row[0].value
- if city_cell and "广东省" in str(city_cell):
- city_name = str(city_cell).replace("广东省", "").strip()
- if city_name in target_cities:
- try:
- # 获取各列值
- total = row[data_cols[0]].value
- export = row[data_cols[1]].value
- import_val = row[data_cols[2]].value
- yoy_total = row[data_cols[3]].value
- yoy_export = row[data_cols[4]].value
- yoy_import = row[data_cols[5]].value
- # 转换数据类型
- def convert_value(val):
- if isinstance(val, (int, float)):
- return Decimal(str(val))
- elif isinstance(val, str) and val.replace(".", "").isdigit():
- return Decimal(val)
- return Decimal(0)
- # 添加到结果
- results.append({
- "city_name": city_name,
- "monthly_total": convert_value(total),
- "monthly_import": convert_value(import_val),
- "monthly_export": convert_value(export),
- "yoy_import_export": convert_value(yoy_total),
- "yoy_import": convert_value(yoy_import),
- "yoy_export": convert_value(yoy_export)
- })
- except Exception as e:
- log.error(f"处理城市 {city_name} 出错: {e}")
- return pd.DataFrame(results)
- except Exception as e:
- log.error(f"处理广州海关文件出错: {str(e)}")
- return pd.DataFrame()
- def process_shenzhen_customs(file_path, year, month):
- """处理深圳海关数据"""
- try:
- wb = load_workbook(file_path, data_only=True)
- results = []
- # 处理深圳和惠州两个sheet
- for city, sheet_name in [("深圳市", "深圳市进出口(贸易方式)"),
- ("惠州市", "惠州市进出口(贸易方式)")]:
- try:
- if sheet_name in wb.sheetnames:
- sheet = wb[sheet_name]
- else:
- log.warning(f"未找到sheet: {sheet_name}")
- continue
- # 查找总值行
- total_row_idx = None
- for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
- if row and "总值" in str(row[0]):
- total_row_idx = i
- break
- if total_row_idx is None:
- log.error(f"未找到总值行: {sheet_name}")
- continue
- # 查找包含月份的表头
- month_str = f"{year}年{month}月"
- header_row = None
- data_col = None
- for i, row in enumerate(sheet.iter_rows(max_row=3, values_only=True), 1):
- if any(month_str in str(cell) for cell in row if cell):
- header_row = i
- for col_idx, cell_val in enumerate(row):
- if cell_val and month_str in str(cell_val):
- data_col = col_idx
- break
- break
- if data_col is None:
- log.error(f"未找到 {month_str} 列")
- continue
- # 获取数据值 (亿元转换为万元)
- total_value = sheet.cell(row=total_row_idx, column=data_col + 1).value
- yoy_value = sheet.cell(row=total_row_idx, column=data_col + 2).value
- if total_value is None or yoy_value is None:
- log.error(f"{city} 数据为空")
- continue
- # 转换数据类型
- def convert_value(val):
- if isinstance(val, (int, float)):
- return Decimal(str(val))
- elif isinstance(val, str) and val.replace(".", "").isdigit():
- return Decimal(val)
- return Decimal(0)
- # 添加到结果
- results.append({
- "city_name": city,
- "monthly_total": convert_value(total_value) * Decimal('10000'),
- "monthly_import": None, # 没有单独的进口/出口数据
- "monthly_export": None,
- "yoy_import_export": convert_value(yoy_value),
- "yoy_import": Decimal(0),
- "yoy_export": Decimal(0)
- })
- except Exception as e:
- log.error(f"处理 {city} 数据出错: {str(e)}")
- return pd.DataFrame(results)
- except Exception as e:
- log.error(f"处理深圳海关文件出错: {str(e)}")
- return pd.DataFrame()
- def process_shantou_customs(file_path, year, month):
- """处理汕头海关数据 (逻辑同广州海关)"""
- log.info(f"处理汕头海关文件: {file_path.name}")
- return process_guangzhou_customs(file_path, year, month)
- def process_huangpu_customs(file_path, year, month):
- """处理黄埔海关数据"""
- try:
- wb = load_workbook(file_path, data_only=True)
- sheet = wb.active
- # 查找合计行
- total_row_idx = None
- for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
- if row and "合计" in str(row[0]):
- total_row_idx = i
- break
- if total_row_idx is None:
- log.error("未找到合计行")
- return pd.DataFrame()
- # 查找包含月份的表头
- month_str = f"{year}年{month}月"
- header_row = None
- data_cols = []
- for i in range(1, 4): # 检查前3行
- row_values = [str(cell.value) if cell.value else "" for cell in sheet[i]]
- if any(month_str in val and "人民币" in val for val in row_values):
- header_row = i
- for col_idx, val in enumerate(row_values):
- if val and month_str in val and "人民币" in val:
- data_cols.append(col_idx)
- break
- if len(data_cols) < 6:
- log.error(f"未找到足够的 {month_str} 人民币数据列")
- return pd.DataFrame()
- # 获取合计行数据
- row_values = [cell.value for cell in sheet[total_row_idx]]
- # 转换数据类型
- def convert_value(val):
- if isinstance(val, (int, float)):
- return Decimal(str(val))
- elif isinstance(val, str) and val.replace(".", "").isdigit():
- return Decimal(val)
- return Decimal(0)
- # 提取数据
- results = [{
- "city_name": "东莞市",
- "monthly_total": convert_value(row_values[data_cols[0]]), # 进出口
- "monthly_export": convert_value(row_values[data_cols[1]]), # 出口
- "monthly_import": convert_value(row_values[data_cols[2]]), # 进口
- "yoy_import_export": convert_value(row_values[data_cols[3]]), # 进出口同比
- "yoy_export": convert_value(row_values[data_cols[4]]), # 出口同比
- "yoy_import": convert_value(row_values[data_cols[5]]) # 进口同比
- }]
- return pd.DataFrame(results)
- except Exception as e:
- log.error(f"处理黄埔海关文件出错: {str(e)}")
- return pd.DataFrame()
- def process_jiangmen_customs(file_path, year, month):
- """处理江门海关数据"""
- try:
- wb = load_workbook(file_path, data_only=True)
- sheet = wb.active
- # 从文件名确定城市
- city_name = "江门市" if "江门市" in file_path.name else "阳江市"
- target_row_name = "江门市进出口商品" if city_name == "江门市" else "阳江市进出口商品总值"
- # 查找目标行
- target_row_idx = None
- for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
- if row and target_row_name in str(row[0]):
- target_row_idx = i
- break
- if target_row_idx is None:
- log.error(f"未找到 {target_row_name} 行")
- return pd.DataFrame()
- # 查找包含月份的表头
- month_str = f"{year}年{month}月"
- header_row = None
- data_cols = []
- for i in range(1, 4): # 检查前3行
- row_values = [str(cell.value) if cell.value else "" for cell in sheet[i]]
- if any(month_str in val for val in row_values):
- header_row = i
- for col_idx, val in enumerate(row_values):
- if val and month_str in val:
- data_cols.append(col_idx)
- break
- if len(data_cols) < 6:
- log.error(f"未找到足够的 {month_str} 数据列")
- return pd.DataFrame()
- # 获取目标行数据
- row_values = [cell.value for cell in sheet[target_row_idx]]
- # 转换数据类型
- def convert_value(val):
- if isinstance(val, (int, float)):
- return Decimal(str(val))
- elif isinstance(val, str) and val.replace(".", "").isdigit():
- return Decimal(val)
- return Decimal(0)
- # 提取数据 (亿元转换为万元)
- return pd.DataFrame([{
- "city_name": city_name,
- "monthly_total": convert_value(row_values[data_cols[0]]) * Decimal('10000'), # 进出口
- "monthly_export": convert_value(row_values[data_cols[1]]) * Decimal('10000'), # 出口
- "monthly_import": convert_value(row_values[data_cols[2]]) * Decimal('10000'), # 进口
- "yoy_import_export": convert_value(row_values[data_cols[3]]), # 进出口同比
- "yoy_export": convert_value(row_values[data_cols[4]]), # 出口同比
- "yoy_import": convert_value(row_values[data_cols[5]]) # 进口同比
- }])
- except Exception as e:
- log.error(f"处理江门海关文件出错: {str(e)}")
- return pd.DataFrame()
- def process_zhanjiang_customs(file_path, year, month):
- """处理湛江海关数据"""
- try:
- wb = load_workbook(file_path, data_only=True)
- sheet = wb.worksheets[0]
- # 从文件名确定城市
- city_name = "湛江市" if "湛江市" in file_path.name else "茂名市"
- # 查找月度数据表格
- table_start_row = None
- month_str = f"{year}年{month}月"
- for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
- if row and any(month_str in str(cell) for cell in row if cell):
- table_start_row = i
- break
- if table_start_row is None:
- log.error(f"未找到 {month_str} 月度数据表")
- return pd.DataFrame()
- # 查找目标行(城市名所在行)
- target_row_idx = None
- for i in range(table_start_row, table_start_row + 20): # 在后续行中查找
- row_val = sheet.cell(row=i, column=1).value
- if row_val and city_name in str(row_val):
- target_row_idx = i
- break
- if target_row_idx is None:
- log.error(f"未找到 {city_name} 数据行")
- return pd.DataFrame()
- # 提取数据
- results = []
- for col in [2, 3, 4, 5, 6, 7]: # 依次为进出口、出口、进口、进出口同比、出口同比、进口同比
- cell_value = sheet.cell(row=target_row_idx, column=col).value
- results.append(cell_value)
- # 转换数据类型
- def convert_value(val):
- if isinstance(val, (int, float)):
- return Decimal(str(val))
- elif isinstance(val, str) and val.replace(".", "").isdigit():
- return Decimal(val)
- return Decimal(0)
- return pd.DataFrame([{
- "city_name": city_name,
- "monthly_total": convert_value(results[0]),
- "monthly_export": convert_value(results[1]),
- "monthly_import": convert_value(results[2]),
- "yoy_import_export": convert_value(results[3]),
- "yoy_export": convert_value(results[4]),
- "yoy_import": convert_value(results[5])
- }])
- except Exception as e:
- log.error(f"处理湛江海关文件出错: {str(e)}")
- return pd.DataFrame()
- def get_customs_processor(customs_name):
- """获取不同海关的处理函数"""
- processors = {
- "广州海关": process_guangzhou_customs,
- "深圳海关": process_shenzhen_customs,
- "汕头海关": process_shantou_customs,
- "黄埔海关": process_huangpu_customs,
- "江门海关": process_jiangmen_customs,
- "湛江海关": process_zhanjiang_customs
- }
- return processors.get(customs_name)
- def parse_excel(current_dir):
- """主解析入口(优化为单参数模式)
- Args:
- current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
- """
- current_path = Path(current_dir)
- year, month = extract_year_month_from_path(current_path)
- log.info(f"开始处理 {year}年{month}月 数据: {current_dir}")
- try:
- # 动态获取前月目录
- prev_dir = get_previous_month_dir(current_path) if month != 1 else None
- if prev_dir:
- log.info(f"上个月数据目录: {prev_dir}")
- # 获取当前目录下所有Excel文件
- excel_files = list(current_path.glob("*.xls*"))
- if not excel_files:
- log.warning(f"当前目录下未找到Excel文件: {current_dir}")
- return
- # 按海关处理每个文件
- all_results = pd.DataFrame()
- customs_map = {
- "广州海关": [],
- "深圳海关": [],
- "汕头海关": [],
- "黄埔海关": [],
- "江门海关": [],
- "湛江海关": []
- }
- # 组织文件到不同的海关
- for file_path in excel_files:
- for customs_name in customs_map.keys():
- if match_customs_file(file_path.name, customs_name, year, month):
- customs_map[customs_name].append(file_path)
- log.info(f"匹配到 {customs_name} 文件: {file_path.name}")
- break
- # 处理每个海关的文件
- for customs_name, file_list in customs_map.items():
- processor = get_customs_processor(customs_name)
- if not processor:
- continue
- for file_path in file_list:
- # 特殊处理深圳海关和江门海关的2月数据(缺少1月数据)
- if customs_name in ["深圳海关", "江门海关"] and month == 2:
- # 获取2月份完整数据
- df_full = processor(file_path, year, month)
- if df_full.empty:
- continue
- # 创建1月份数据 (取2月份数据的一半)
- df_half = df_full.copy()
- for col in ['monthly_total', 'monthly_import', 'monthly_export']:
- df_half[col] = df_half[col] / 2
- # 设置1月份数据
- df_half['month'] = 1
- # 设置2月份数据 (取2月份数据的一半)
- df_full['month'] = 2
- for col in ['monthly_total', 'monthly_import', 'monthly_export']:
- df_full[col] = df_full[col] / 2
- # 合并数据
- df_customs = pd.concat([df_half, df_full])
- else:
- # 正常处理数据
- df_customs = processor(file_path, year, month)
- if not df_customs.empty:
- df_customs['month'] = month
- if not df_customs.empty:
- all_results = pd.concat([all_results, df_customs])
- # 如果没有获取到数据
- if all_results.empty:
- log.warning(f"未处理到有效数据: {current_dir}")
- return
- # 添加公共字段
- all_results['prov_code'] = PROV_CODE
- all_results['prov_name'] = PROV_NAME
- all_results['year'] = year
- all_results['month'] = all_results.get('month', month)
- all_results['crossborder_year_month'] = all_results['year'].astype(str) + '-' + all_results['month'].astype(
- str).str.zfill(2)
- # 添加城市编码
- def get_city_code(row):
- return GUANGDONG_CITY.get(row['city_name'], '0000')
- all_results['city_code'] = all_results.apply(get_city_code, axis=1)
- # 排序并删除重复项
- all_results = all_results.sort_values(by=['city_code', 'crossborder_year_month'])
- all_results = all_results.drop_duplicates(subset=['crossborder_year_month', 'city_code'], keep='last')
- # 打印处理结果
- log.info(f"处理完成,共获得 {len(all_results)} 条数据")
- # 选择入库字段
- final_df = all_results[[
- 'crossborder_year_month', 'prov_code', 'prov_name',
- 'city_code', 'city_name', 'monthly_total',
- 'monthly_import', 'monthly_export', 'yoy_import_export',
- 'yoy_import', 'yoy_export'
- ]].copy()
- # 打印前几条数据
- log.info(f"处理后数据示例:\n{final_df.head()}")
- # 这里调用DBHelper入库(实际使用时请取消注释)
- """
- from db_helper import DBHelper
- db = DBHelper()
- db.bulk_insert(
- final_df,
- 't_yujin_crossborder_prov_region_trade',
- conflict_columns=['crossborder_year_month', 'city_code'],
- update_columns=['monthly_total', 'monthly_import', 'monthly_export',
- 'yoy_import_export', 'yoy_import', 'yoy_export']
- )
- """
- log.info(f"{current_dir}数据已全部成功处理")
- except Exception as e:
- log.error(f"处理失败:{current_dir},错误:{str(e)}")
- raise
- # 遍历目录的函数(原样保留)
- # 测试入口
- if __name__ == "__main__":
- traverse_and_process(download_dir, parse_excel, province_name="guangdong")
|