|
- from decimal import Decimal, InvalidOperation
- from pathlib import Path
- import pandas as pd
- from db_helper import DBHelper
- from utils.constants import DOWNLOAD_DIR, GUANGDONG_CITY
- from utils.log import log
- from utils.parse_utils import traverse_and_process, extract_year_month_from_path, get_previous_month_dir
- # 配置日志
- PROV_CODE = "440000"
- PROV_NAME = "广东省"
- download_dir = DOWNLOAD_DIR / "guangdong"
- db = DBHelper()
- _zhanjiang_first_month = None
- # 广州海关:万元
- # 深圳海关:亿元
- # 汕头海关:万元
- # 黄埔海关:万元
- # 江门海关:亿元
- # 湛江海关:万元
- def match_customs_file(filename, customs_name, year, month):
- """匹配海关文件"""
- filename_lower = filename.lower()
- if customs_name == "广州海关":
- return "7地市进出口综合统计" in filename_lower
- elif customs_name == "深圳海关":
- return "深圳海关综合统计资料" in filename_lower
- elif customs_name == "汕头海关":
- return "5市报表" in filename_lower
- elif customs_name == "黄埔海关":
- return "东莞市进出口企业性质总值表" in filename_lower
- elif customs_name == "江门海关":
- if "江门市" in filename_lower or "阳江市" in filename_lower:
- return "外贸进出口有关情况统计表" in filename_lower
- elif customs_name == "湛江海关":
- if "湛江市" in filename_lower or "茂名市" in filename_lower:
- return "外贸进出口数据" in filename_lower
- return False
- def find_header_and_columns(df, year, month):
- """
- 查找匹配月份的表头行并定位对应的列索引。
- 支持三种基础格式:
- - "2024年12月"
- - "2024年12月-2024年12月"
- - "2024-12-01 00:00:00"
- 如果都未找到,则尝试匹配特殊格式:"2023年01月-2023年02月"
- """
- candidate_month_strs = [
- f"{year}年{month:02d}月",
- f"{year}年{month:02d}月-{year}年{month:02d}月",
- f"{year}-{month:02d}-01 00:00:00"
- ]
- header_row = None
- for i in range(min(3, len(df))):
- row_cells = [str(cell).strip() for cell in df.iloc[i]]
- for cell_val in row_cells:
- if any(s == cell_val for s in candidate_month_strs):
- header_row = i
- break
- if header_row is not None:
- break
- # 如果没找到常规格式,尝试特殊格式:2023年01月-2023年02月
- special_format = "2023年01月-2023年02月"
- if header_row is None:
- log.warning(f"未找到常规格式,尝试匹配特殊格式: {special_format}")
- for i in range(min(3, len(df))):
- row_cells = [str(cell).strip() for cell in df.iloc[i]]
- for cell_val in row_cells:
- if cell_val == special_format:
- header_row = i
- log.info(f"成功匹配特殊格式: {special_format} 行号={i}")
- break
- if header_row is not None:
- break
- if header_row is None:
- log.error("未找到任何支持的表头格式")
- return None, []
- # 确定数据列位置(包含所有候选格式)
- data_cols = []
- for col in range(len(df.columns)):
- cell_val = str(df.iloc[header_row, col]).strip()
- if cell_val in candidate_month_strs:
- data_cols.append(col)
- if not data_cols:
- for col in range(len(df.columns)):
- cell_val = str(df.iloc[header_row, col]).strip()
- if cell_val in [special_format]:
- data_cols.append(col)
- if not data_cols:
- log.error("未找到对应的数据列")
- return header_row, []
- return header_row, data_cols
- def process_guangzhou_customs(file_path, year, month,customs_type='guangzhou'):
- """处理广州海关数据"""
- try:
- # 读取Excel文件
- df = pd.read_excel(file_path, sheet_name=0, header=None)
- log.info(f"处理广州海关文件: {file_path.name}")
- header_row,data_cols = find_header_and_columns(df, year, month)
- # 提取7地市数据
- results = []
- target_cities = ["广州市", "韶关市", "佛山市", "肇庆市", "河源市",
- "清远市", "汕头市", "梅州市", "汕尾市", "潮州市", "揭阳市", "云浮市"]
- for idx in range(header_row + 1, len(df)):
- row = df.iloc[idx]
- city_cell = str(row[0])
- if "广东省" in city_cell:
- city_name = city_cell.replace("广东省", "").strip()
- if city_name in target_cities:
- try:
- if len(data_cols)>3:
- monthly_total = Decimal(str(row[data_cols[0]])) # 进出口
- monthly_export = Decimal(str(row[data_cols[4]])) # 出口
- monthly_import = Decimal(str(row[data_cols[8]])) # 进口
- yoy_import_export = Decimal(str(row[data_cols[1]])) # 进出口同比
- yoy_export = Decimal(str(row[data_cols[5]])) # 出口同比
- yoy_import = Decimal(str(row[data_cols[9]])) # 进口同比
- else:
- monthly_total = Decimal(str(row[data_cols[0]]))
- monthly_export = Decimal(str(row[data_cols[1]]))
- monthly_import = Decimal(str(row[data_cols[2]]))
- yoy_import_export = Decimal(str(row[data_cols[0]+1])) # 进出口同比
- yoy_export = Decimal(str(row[data_cols[1]+1])) # 出口同比
- yoy_import = Decimal(str(row[data_cols[2]+1])) # 进口同比
- results.append({
- "city_name": city_name,
- "monthly_total": monthly_total,
- "monthly_import": monthly_import,
- "monthly_export": monthly_export,
- "yoy_import_export": yoy_import_export,
- "yoy_import": yoy_import,
- "yoy_export": yoy_export
- })
- except Exception as e:
- log.error(f"处理行 {idx} 出错: {e}")
- return pd.DataFrame(results)
- except Exception as e:
- log.error(f"处理广州海关文件出错: {str(e)}")
- return pd.DataFrame()
- def process_shenzhen_customs(file_path, year, month):
- """处理深圳海关数据(完整6指标版)"""
- try:
- log.info(f"处理深圳海关文件: {file_path.name}")
- results = []
- for city, sheet_name in [("深圳市", "深圳市进出口(贸易方式)"),
- ("惠州市", "惠州市进出口(贸易方式)")]:
- try:
- df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
- except:
- log.warning(f"未找到sheet: {sheet_name}")
- continue
- # 查找总值行
- total_row_idx = None
- for idx in range(len(df)):
- if "总值" in str(df.iloc[idx, 0]):
- total_row_idx = idx
- break
- if total_row_idx is None:
- log.error(f"未找到总值行: {sheet_name}")
- continue
- try:
- # 列索引映射(基于您提供的完整数据结构)
- # 进出口 | 出口 | 进口 的数值和同比
- monthly_total = convert_unit(str(df.iloc[total_row_idx, 1]))
- yoy_total = Decimal(str(df.iloc[total_row_idx, 2]))
- monthly_export = convert_unit(str(df.iloc[total_row_idx, 3]))
- yoy_export = Decimal(str(df.iloc[total_row_idx, 4]))
- monthly_import = convert_unit(str(df.iloc[total_row_idx, 5]))
- yoy_import = Decimal(str(df.iloc[total_row_idx, 6]))
- results.append({
- "city_name": city,
- "monthly_total": monthly_total,
- "monthly_export": monthly_export,
- "monthly_import": monthly_import,
- "yoy_import_export": yoy_total, # 进出口同比
- "yoy_export": yoy_export,
- "yoy_import": yoy_import
- })
- except Exception as e:
- log.error(f"处理 {city} 数据出错: {e}")
- # 尝试部分提取(回退方案)
- try:
- monthly_total = Decimal(str(df.iloc[total_row_idx, 1])) * Decimal('10000')
- results.append({
- "city_name": city,
- "monthly_total": monthly_total,
- "monthly_export": None,
- "monthly_import": None,
- "yoy_import_export": Decimal('0'),
- "yoy_export": Decimal('0'),
- "yoy_import": Decimal('0')
- })
- except:
- log.error(f"连基础进出口总值都无法提取: {sheet_name}")
- return pd.DataFrame(results)
- except Exception as e:
- log.error(f"处理深圳海关文件出错: {str(e)}")
- return pd.DataFrame()
- def process_shantou_customs(file_path, year, month):
- """处理汕头海关数据 (逻辑同广州海关)"""
- log.info(f"处理汕头海关文件: {file_path.name}")
- return process_guangzhou_customs(file_path, year, month,customs_type='shantou')
- def process_huangpu_customs(file_path, year, month):
- """处理黄埔海关数据"""
- try:
- df = pd.read_excel(file_path, sheet_name=0, header=None)
- log.info(f"处理黄埔海关文件: {file_path.name}")
- # 查找合计行
- total_row_idx = None
- for idx in range(len(df)):
- if "合计" in str(df.iloc[idx, 0]):
- total_row_idx = idx
- break
- if total_row_idx is None:
- log.error("未找到合计行")
- return pd.DataFrame()
- # 查找包含月份的表头,匹配23年1月-23年多种格式
- if year == 2024 and month == 12:
- month_str = '45627'
- elif year == 2023 and month == 12:
- month_str = '45261'
- elif year == 2023 and month == 3:
- month_str = f"{year}年{month:02d}月-{year}年{month:02d}月"
- else:
- month_str = f'{year}-{month:02d}-01 00:00:00'
- header_row = None
- for i in range(min(3, len(df))):
- row_cells = [str(cell).strip() for cell in df.iloc[i]]
- if any(month_str in cell in cell for cell in row_cells):
- header_row = i
- break
- if header_row is None:
- log.error(f"未找到 {month_str} 人民币表头")
- return pd.DataFrame()
- # 确定数据列位置
- data_cols = []
- for col in range(len(df.columns)):
- cell_val = str(df.iloc[header_row, col])
- if month_str in cell_val :
- data_cols.append(col)
- if len(data_cols) < 3:
- log.error(f"未找到足够的 {month_str} 人民币数据列")
- return pd.DataFrame()
- try:
- result = []
- # 提取数据
- row = df.iloc[total_row_idx]
- monthly_total = Decimal(str(row[data_cols[0]])) # 进出口
- monthly_export = Decimal(str(row[data_cols[1]])) # 出口
- monthly_import = Decimal(str(row[data_cols[2]])) # 进口
- yoy_import_export = str(row[data_cols[0]+1]) # 进出口同比
- yoy_export = str(row[data_cols[1]+1]) # 出口同比
- yoy_import = str(row[data_cols[2]+1]) # 进口同比
- result.append({
- "crossborder_year_month": f'{year}-{month:02d}',
- "city_name": "东莞市",
- "monthly_total": monthly_total,
- "monthly_import": monthly_import,
- "monthly_export": monthly_export,
- "yoy_import_export": yoy_import_export,
- "yoy_import": yoy_import,
- "yoy_export": yoy_export
- })
- #东莞市一月数据比较特殊
- if month == 2:
- monthly_total_sum = Decimal(str(row[data_cols[0]+4])) # 进出口
- monthly_export_sum = Decimal(str(row[data_cols[1]+4])) # 出口
- monthly_import_sum = Decimal(str(row[data_cols[2]+4])) # 进口
- january_monthly_total = monthly_total_sum - monthly_total
- january_monthly_export = monthly_export_sum - monthly_export
- january_monthly_import = monthly_import_sum - monthly_import
- result.append({
- "crossborder_year_month": f'{year}-01',
- "city_name": "东莞市",
- "monthly_total": january_monthly_total,
- "monthly_import": january_monthly_export,
- "monthly_export": january_monthly_import,
- })
- return pd.DataFrame(result)
- except Exception as e:
- log.error(f"提取数据出错: {e}")
- return pd.DataFrame()
- except Exception as e:
- log.error(f"处理黄埔海关文件出错: {str(e)}")
- return pd.DataFrame()
- def process_jiangmen_customs(file_path, year, month):
- """处理江门海关数据"""
- try:
- df = pd.read_excel(file_path, sheet_name=0, header=None)
- log.info(f"处理江门海关文件: {file_path.name}")
- # 从文件名确定城市
- city_name = "江门市" if "江门市" in file_path.name else "阳江市"
- target_row_name = "江门市进出口商品总值" if city_name == "江门市" else "阳江市进出口商品总值"
- # 查找目标行
- target_row_idx = None
- for idx in range(len(df)):
- if target_row_name in str(df.iloc[idx, 0]):
- target_row_idx = idx
- break
- if target_row_idx is None:
- log.error(f"未找到 {target_row_name} 行")
- return pd.DataFrame()
- # 查找包含月份的表头
- if month == 2:
- month_str = f"1-{month}月"
- else:
- month_str = f"{month}月"
- header_row = None
- for i in range(min(6, len(df))):
- if any(month_str == str(cell).strip() for cell in df.iloc[i]):
- header_row = i
- break
- if header_row is None:
- log.error(f"未找到 {month_str} 表头")
- return pd.DataFrame()
- # 确定数据列位置
- data_cols = []
- for col in range(len(df.columns)):
- cell_val = str(df.iloc[header_row, col])
- if cell_val.strip() == month_str:
- data_cols.append(col)
- if len(data_cols) < 3:
- log.error(f"未找到足够的 {month_str} 数据列")
- return pd.DataFrame()
- # 提取数据 (亿元转换为万元)
- row = df.iloc[target_row_idx]
- monthly_total = convert_unit(str(row[data_cols[0]]))
- monthly_export = convert_unit(str(row[data_cols[1]]))
- monthly_import = convert_unit(str(row[data_cols[2]]))
- yoy_import_export = str(row[data_cols[0]+1])
- yoy_export = str(row[data_cols[1]+1])
- yoy_import = str(row[data_cols[2]+1])
- return pd.DataFrame([{
- "city_name": city_name,
- "monthly_total": monthly_total,
- "monthly_import": monthly_import,
- "monthly_export": monthly_export,
- "yoy_import_export": yoy_import_export,
- "yoy_import": yoy_import,
- "yoy_export": yoy_export
- }])
- except Exception as e:
- log.error(f"处理江门海关文件出错: {str(e)}")
- return pd.DataFrame()
- def process_zhanjiang_customs(file_path, year, month):
- """处理湛江海关数据 满足「是第一次调用」或者「month == 12」任意一个条件"""
- global _zhanjiang_first_month
- # 判断是否应执行核心逻辑
- if _zhanjiang_first_month is None:
- # 第一次调用,记录初始月份
- _zhanjiang_first_month = month
- should_execute = True
- else:
- # 后续调用仅在以下情况下执行:
- # - 与初次调用的 month 相同(允许多城市同时处理)
- # - 或者 month == 12
- should_execute = (month == _zhanjiang_first_month) or (month == 12)
- if not should_execute:
- log.warning(f"跳过湛江海关{year}年{month}文件: {file_path.name}")
- return pd.DataFrame()
- try:
- df = pd.read_excel(file_path, sheet_name=0, header=None)
- log.info(f"处理湛江海关文件: {file_path.name}")
- # 从文件名确定城市
- city_name = "湛江市" if "湛江市" in file_path.name else "茂名市"
- # 查找月度数据表格
- month_str = f"{year}年前{month}个月{city_name}进出口数据(月度)"
- # target_header_row = None
- #
- # # 查找表头行
- # for i in range(min(3, len(df))): # 在前5行找表头
- # if any(month_str in str(cell) for cell in df.iloc[i]):
- # target_header_row = i
- # break
- #
- # if target_header_row is None:
- # log.error(f"未找到 {month_str} 表头")
- # return pd.DataFrame()
- target_header_row =1
- # 确定数据列位置
- data_cols = {}
- for col in range(len(df.columns)):
- cell_val = str(df.iloc[target_header_row+1, col])
- data_cols["year_month"] = 0
- if "进出口" in cell_val :
- data_cols["total"] = col
- elif "出口" in cell_val :
- data_cols["export"] = col
- elif "进口" in cell_val :
- data_cols["import"] = col
- if len(data_cols) < 1:
- log.error(f"未找到足够的 {month_str} 数据列")
- return pd.DataFrame()
- start_row = target_header_row + 4
- end_row = start_row + month
- # 提取多行数据
- rows = df.iloc[start_row:end_row]
- results = []
- for _, row in rows.iterrows():
- try:
- year_month = str(row[data_cols["year_month"]])
- formatted_year_month = f"{year_month[:4]}-{year_month[4:]}"
- monthly_total = Decimal(str(row[data_cols["total"]])) # 进出口
- monthly_export = Decimal(str(row[data_cols["export"]])) # 出口
- monthly_import = Decimal(str(row[data_cols["import"]])) # 进口
- yoy_import_export = Decimal(str(row[data_cols["total"] + 1])) # 进出口同比
- yoy_export = Decimal(str(row[data_cols["export"] + 1])) # 出口同比
- yoy_import = Decimal(str(row[data_cols["import"] + 1])) # 进口同比
- results.append({
- "crossborder_year_month":formatted_year_month,
- "city_name": city_name,
- "monthly_total": monthly_total,
- "monthly_import": monthly_import,
- "monthly_export": monthly_export,
- "yoy_import_export": yoy_import_export,
- "yoy_import": yoy_import,
- "yoy_export": yoy_export
- })
- except Exception as e:
- log.error(f"解析某一行数据出错: {e}")
- continue # 单行错误不影响整体处理
- return pd.DataFrame(results)
- except Exception as e:
- log.error(f"处理湛江海关文件出错: {str(e)}")
- return pd.DataFrame()
- def get_customs_processor(customs_name):
- """获取不同海关的处理函数"""
- processors = {
- "广州海关": process_guangzhou_customs,
- "深圳海关": process_shenzhen_customs,
- "汕头海关": process_shantou_customs,
- "黄埔海关": process_huangpu_customs,
- "江门海关": process_jiangmen_customs,
- "湛江海关": process_zhanjiang_customs
- }
- return processors.get(customs_name)
- def parse_excel(current_dir):
- """主解析入口(优化为单参数模式)
- Args:
- current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
- """
- current_path = Path(current_dir)
- year, month = extract_year_month_from_path(current_path)
- log.info(f"开始处理 {year}年{month}月 数据: {current_dir}")
- try:
- # 动态获取前月目录
- prev_dir = get_previous_month_dir(current_path) if month != 1 else None
- if prev_dir:
- log.info(f"上个月数据目录: {prev_dir}")
- # 获取当前目录下所有Excel文件
- excel_files = list(current_path.glob("*.xls*"))
- if not excel_files:
- log.warning(f"当前目录下未找到Excel文件: {current_dir}")
- return
- # 按海关处理每个文件
- all_results = pd.DataFrame()
- customs_map = {
- "广州海关": [],
- "深圳海关": [],
- "汕头海关": [],
- "黄埔海关": [],
- "江门海关": [],
- "湛江海关": []
- }
- # 组织文件到不同的海关
- for file_path in excel_files:
- for customs_name in customs_map.keys():
- if match_customs_file(file_path.name, customs_name, year, month):
- customs_map[customs_name].append(file_path)
- log.info(f"匹配到 {customs_name} 文件: {file_path.name}")
- break
- # 处理每个海关的文件
- for customs_name, file_list in customs_map.items():
- processor = get_customs_processor(customs_name)
- if not processor:
- continue
- for file_path in file_list:
- # 特殊处理深圳海关和江门海关的2月数据(缺少1月数据)
- if customs_name in ["深圳海关", "江门海关"] and month == 2:
- # 获取2月份完整数据
- df_full = processor(file_path, year, month)
- if df_full.empty:
- continue
- # 创建1月份数据 (取2月份数据的一半)
- df_half = df_full.copy()
- for col in ['monthly_total', 'monthly_import', 'monthly_export']:
- # 注意:只有数值列才进行减半操作,避免对字符串操作
- if col in df_half.columns:
- df_half[col] = df_half[col] / 2
- # 设置1月份
- df_half['crossborder_year_month'] = f'{year}-01'
- # 设置2月份数据 (取2月份数据的一半)
- df_full['crossborder_year_month'] = f'{year}-02'
- for col in ['monthly_total', 'monthly_import', 'monthly_export']:
- if col in df_full.columns:
- df_full[col] = df_full[col] / 2
- # 合并数据
- df_customs = pd.concat([df_half, df_full])
- else:
- # 正常处理数据
- df_customs = processor(file_path, year, month)
- if not df_customs.empty:
- df_customs['month'] = month
- if not df_customs.empty:
- all_results = pd.concat([all_results, df_customs])
- # 如果没有获取到数据
- if all_results.empty:
- log.warning(f"未处理到有效数据: {current_dir}")
- return
- # 添加公共字段
- all_results['prov_code'] = PROV_CODE
- all_results['prov_name'] = PROV_NAME
- all_results['crossborder_year'] = year
- all_results['city_code'] = all_results['city_name'].astype(str).map(GUANGDONG_CITY).fillna('0000')
- all_results['month'] = all_results.get('month', month)
- if 'crossborder_year_month' in all_results.columns:
- all_results['crossborder_year_month'] = (
- all_results['crossborder_year_month']
- .replace('', pd.NA)
- .fillna(f'{year}-{month:02d}')
- )
- else:
- all_results['crossborder_year_month'] = f'{year}-{month:02d}'
- # 排序并删除重复项
- # all_results = all_results.sort_values(by=['city_code', 'crossborder_year_month'])
- # all_results = all_results.drop_duplicates(subset=['crossborder_year_month', 'city_code'], keep='last')
- # 打印处理结果
- log.info(f"处理完成,共获得广东省 {len(all_results)} 条地级市数据")
- # 选择入库字段
- final_df = all_results[[
- 'crossborder_year_month', 'prov_code', 'prov_name',
- 'crossborder_year','city_code', 'city_name',
- 'monthly_total','monthly_import', 'monthly_export',
- 'yoy_import_export','yoy_import', 'yoy_export'
- ]].copy()
- final_df = final_df.where(pd.notna(final_df), None)
- # 打印前几条数据
- # log.debug(f"处理后数据示例:\n{final_df.head()}")
- # 这里调用DBHelper入库(实际使用时请取消注释)
- from db_helper import DBHelper
- db = DBHelper()
- db.bulk_insert(
- final_df,
- 't_yujin_crossborder_prov_region_trade',
- conflict_columns=['crossborder_year_month', 'city_code'],
- update_columns=['monthly_total', 'monthly_import', 'monthly_export',
- 'yoy_import_export', 'yoy_import', 'yoy_export']
- )
- log.info(f"{current_dir}数据已全部成功处理")
- except Exception as e:
- log.error(f"处理失败:{current_dir},错误:{str(e)}")
- raise
- def convert_unit(value):
- """亿元转万元,处理空值"""
- try:
- # 如果 value 不是特殊的无效值,进行转换并保留4位小数
- return round(Decimal(value) * 10000, 4) if value not in ['-', ''] else None
- except (InvalidOperation, ValueError):
- # 捕获异常,返回 None
- return None
- # 测试入口
- if __name__ == "__main__":
- traverse_and_process(download_dir, parse_excel, province_name="guangdong")
- db_helper = DBHelper()
- db_helper.update_prov_yoy("广东省")
|