from decimal import Decimal, InvalidOperation from pathlib import Path import pandas as pd from utils.db_helper import DBHelper from utils.constants import DOWNLOAD_DIR, GUANGDONG_CITY from utils.log import log from utils.parse_utils import traverse_and_process, extract_year_month_from_path, get_previous_month_dir # 配置日志 PROV_CODE = "440000" PROV_NAME = "广东省" download_dir = DOWNLOAD_DIR / "guangdong" db = DBHelper() _zhanjiang_first_month = None # 广州海关:万元 # 深圳海关:亿元 # 汕头海关:万元 # 黄埔海关:万元 # 江门海关:亿元 # 湛江海关:万元 def match_customs_file(filename, customs_name, year, month): """匹配海关文件""" filename_lower = filename.lower() if customs_name == "广州海关": return "7地市进出口综合统计" in filename_lower elif customs_name == "深圳海关": return "深圳海关综合统计资料" in filename_lower elif customs_name == "汕头海关": return "5市报表" in filename_lower elif customs_name == "黄埔海关": return "东莞市进出口企业性质总值表" in filename_lower elif customs_name == "江门海关": if "江门市" in filename_lower or "阳江市" in filename_lower: return "外贸进出口有关情况统计表" in filename_lower elif customs_name == "湛江海关": if "湛江市" in filename_lower or "茂名市" in filename_lower: return "外贸进出口数据" in filename_lower return False def find_header_and_columns(df, year, month): """ 查找匹配月份的表头行并定位对应的列索引。 支持三种基础格式: - "2024年12月" - "2024年12月-2024年12月" - "2024-12-01 00:00:00" 如果都未找到,则尝试匹配特殊格式:"2023年01月-2023年02月" """ candidate_month_strs = [ f"{year}年{month:02d}月", f"{year}年{month:02d}月-{year}年{month:02d}月", f"{year}-{month:02d}-01 00:00:00" ] header_row = None for i in range(min(3, len(df))): row_cells = [str(cell).strip() for cell in df.iloc[i]] for cell_val in row_cells: if any(s == cell_val for s in candidate_month_strs): header_row = i break if header_row is not None: break # 如果没找到常规格式,尝试特殊格式:2023年01月-2023年02月 special_format = "2023年01月-2023年02月" if header_row is None: log.warning(f"未找到常规格式,尝试匹配特殊格式: {special_format}") for i in range(min(3, len(df))): row_cells = [str(cell).strip() for cell in df.iloc[i]] for cell_val in row_cells: if cell_val == special_format: header_row = i log.info(f"成功匹配特殊格式: {special_format} 行号={i}") break if header_row is not None: break if header_row is None: log.error("未找到任何支持的表头格式") return None, [] # 确定数据列位置(包含所有候选格式) data_cols = [] for col in range(len(df.columns)): cell_val = str(df.iloc[header_row, col]).strip() if cell_val in candidate_month_strs: data_cols.append(col) if not data_cols: for col in range(len(df.columns)): cell_val = str(df.iloc[header_row, col]).strip() if cell_val in [special_format]: data_cols.append(col) if not data_cols: log.error("未找到对应的数据列") return header_row, [] return header_row, data_cols def process_guangzhou_customs(file_path, year, month,customs_type='guangzhou'): """处理广州海关数据""" try: # 读取Excel文件 df = pd.read_excel(file_path, sheet_name=0, header=None) log.info(f"处理广州海关文件: {file_path.name}") header_row,data_cols = find_header_and_columns(df, year, month) # 提取7地市数据 results = [] target_cities = ["广州市", "韶关市", "佛山市", "肇庆市", "河源市", "清远市", "汕头市", "梅州市", "汕尾市", "潮州市", "揭阳市", "云浮市"] for idx in range(header_row + 1, len(df)): row = df.iloc[idx] city_cell = str(row[0]) if "广东省" in city_cell: city_name = city_cell.replace("广东省", "").strip() if city_name in target_cities: try: if len(data_cols)>3: monthly_total = Decimal(str(row[data_cols[0]])) # 进出口 monthly_export = Decimal(str(row[data_cols[4]])) # 出口 monthly_import = Decimal(str(row[data_cols[8]])) # 进口 yoy_import_export = Decimal(str(row[data_cols[1]])) # 进出口同比 yoy_export = Decimal(str(row[data_cols[5]])) # 出口同比 yoy_import = Decimal(str(row[data_cols[9]])) # 进口同比 else: monthly_total = Decimal(str(row[data_cols[0]])) monthly_export = Decimal(str(row[data_cols[1]])) monthly_import = Decimal(str(row[data_cols[2]])) yoy_import_export = Decimal(str(row[data_cols[0]+1])) # 进出口同比 yoy_export = Decimal(str(row[data_cols[1]+1])) # 出口同比 yoy_import = Decimal(str(row[data_cols[2]+1])) # 进口同比 results.append({ "city_name": city_name, "monthly_total": monthly_total, "monthly_import": monthly_import, "monthly_export": monthly_export, "yoy_import_export": yoy_import_export, "yoy_import": yoy_import, "yoy_export": yoy_export }) except Exception as e: log.error(f"处理行 {idx} 出错: {e}") return pd.DataFrame(results) except Exception as e: log.error(f"处理广州海关文件出错: {str(e)}") return pd.DataFrame() def process_shenzhen_customs(file_path, year, month): """处理深圳海关数据(完整6指标版)""" try: log.info(f"处理深圳海关文件: {file_path.name}") results = [] for city, sheet_name in [("深圳市", "深圳市进出口(贸易方式)"), ("惠州市", "惠州市进出口(贸易方式)")]: try: df = pd.read_excel(file_path, sheet_name=sheet_name, header=None) except: log.warning(f"未找到sheet: {sheet_name}") continue # 查找总值行 total_row_idx = None for idx in range(len(df)): if "总值" in str(df.iloc[idx, 0]): total_row_idx = idx break if total_row_idx is None: log.error(f"未找到总值行: {sheet_name}") continue try: # 列索引映射(基于您提供的完整数据结构) # 进出口 | 出口 | 进口 的数值和同比 monthly_total = convert_unit(str(df.iloc[total_row_idx, 1])) yoy_total = Decimal(str(df.iloc[total_row_idx, 2])) monthly_export = convert_unit(str(df.iloc[total_row_idx, 3])) yoy_export = Decimal(str(df.iloc[total_row_idx, 4])) monthly_import = convert_unit(str(df.iloc[total_row_idx, 5])) yoy_import = Decimal(str(df.iloc[total_row_idx, 6])) results.append({ "city_name": city, "monthly_total": monthly_total, "monthly_export": monthly_export, "monthly_import": monthly_import, "yoy_import_export": yoy_total, # 进出口同比 "yoy_export": yoy_export, "yoy_import": yoy_import }) except Exception as e: log.error(f"处理 {city} 数据出错: {e}") # 尝试部分提取(回退方案) try: monthly_total = Decimal(str(df.iloc[total_row_idx, 1])) * Decimal('10000') results.append({ "city_name": city, "monthly_total": monthly_total, "monthly_export": None, "monthly_import": None, "yoy_import_export": Decimal('0'), "yoy_export": Decimal('0'), "yoy_import": Decimal('0') }) except: log.error(f"连基础进出口总值都无法提取: {sheet_name}") return pd.DataFrame(results) except Exception as e: log.error(f"处理深圳海关文件出错: {str(e)}") return pd.DataFrame() def process_shantou_customs(file_path, year, month): """处理汕头海关数据 (逻辑同广州海关)""" log.info(f"处理汕头海关文件: {file_path.name}") return process_guangzhou_customs(file_path, year, month,customs_type='shantou') def process_huangpu_customs(file_path, year, month): """处理黄埔海关数据""" try: df = pd.read_excel(file_path, sheet_name=0, header=None) log.info(f"处理黄埔海关文件: {file_path.name}") # 查找合计行 total_row_idx = None for idx in range(len(df)): if "合计" in str(df.iloc[idx, 0]): total_row_idx = idx break if total_row_idx is None: log.error("未找到合计行") return pd.DataFrame() # 查找包含月份的表头,匹配23年1月-23年多种格式 if year == 2024 and month == 12: month_str = '45627' elif year == 2023 and month == 12: month_str = '45261' elif year == 2023 and month == 3: month_str = f"{year}年{month:02d}月-{year}年{month:02d}月" else: month_str = f'{year}-{month:02d}-01 00:00:00' header_row = None for i in range(min(3, len(df))): row_cells = [str(cell).strip() for cell in df.iloc[i]] if any(month_str in cell in cell for cell in row_cells): header_row = i break if header_row is None: log.error(f"未找到 {month_str} 人民币表头") return pd.DataFrame() # 确定数据列位置 data_cols = [] for col in range(len(df.columns)): cell_val = str(df.iloc[header_row, col]) if month_str in cell_val : data_cols.append(col) if len(data_cols) < 3: log.error(f"未找到足够的 {month_str} 人民币数据列") return pd.DataFrame() try: result = [] # 提取数据 row = df.iloc[total_row_idx] monthly_total = Decimal(str(row[data_cols[0]])) # 进出口 monthly_export = Decimal(str(row[data_cols[1]])) # 出口 monthly_import = Decimal(str(row[data_cols[2]])) # 进口 yoy_import_export = str(row[data_cols[0]+1]) # 进出口同比 yoy_export = str(row[data_cols[1]+1]) # 出口同比 yoy_import = str(row[data_cols[2]+1]) # 进口同比 result.append({ "crossborder_year_month": f'{year}-{month:02d}', "city_name": "东莞市", "monthly_total": monthly_total, "monthly_import": monthly_import, "monthly_export": monthly_export, "yoy_import_export": yoy_import_export, "yoy_import": yoy_import, "yoy_export": yoy_export }) #东莞市一月数据比较特殊 if month == 2: monthly_total_sum = Decimal(str(row[data_cols[0]+4])) # 进出口 monthly_export_sum = Decimal(str(row[data_cols[1]+4])) # 出口 monthly_import_sum = Decimal(str(row[data_cols[2]+4])) # 进口 january_monthly_total = monthly_total_sum - monthly_total january_monthly_export = monthly_export_sum - monthly_export january_monthly_import = monthly_import_sum - monthly_import result.append({ "crossborder_year_month": f'{year}-01', "city_name": "东莞市", "monthly_total": january_monthly_total, "monthly_import": january_monthly_export, "monthly_export": january_monthly_import, }) return pd.DataFrame(result) except Exception as e: log.error(f"提取数据出错: {e}") return pd.DataFrame() except Exception as e: log.error(f"处理黄埔海关文件出错: {str(e)}") return pd.DataFrame() def process_jiangmen_customs(file_path, year, month): """处理江门海关数据""" try: df = pd.read_excel(file_path, sheet_name=0, header=None) log.info(f"处理江门海关文件: {file_path.name}") # 从文件名确定城市 city_name = "江门市" if "江门市" in file_path.name else "阳江市" target_row_name = "江门市进出口商品总值" if city_name == "江门市" else "阳江市进出口商品总值" # 查找目标行 target_row_idx = None for idx in range(len(df)): if target_row_name in str(df.iloc[idx, 0]): target_row_idx = idx break if target_row_idx is None: log.error(f"未找到 {target_row_name} 行") return pd.DataFrame() # 查找包含月份的表头 if month == 2: month_str = f"1-{month}月" else: month_str = f"{month}月" header_row = None for i in range(min(6, len(df))): if any(month_str == str(cell).strip() for cell in df.iloc[i]): header_row = i break if header_row is None: log.error(f"未找到 {month_str} 表头") return pd.DataFrame() # 确定数据列位置 data_cols = [] for col in range(len(df.columns)): cell_val = str(df.iloc[header_row, col]) if cell_val.strip() == month_str: data_cols.append(col) if len(data_cols) < 3: log.error(f"未找到足够的 {month_str} 数据列") return pd.DataFrame() # 提取数据 (亿元转换为万元) row = df.iloc[target_row_idx] monthly_total = convert_unit(str(row[data_cols[0]])) monthly_export = convert_unit(str(row[data_cols[1]])) monthly_import = convert_unit(str(row[data_cols[2]])) yoy_import_export = str(row[data_cols[0]+1]) yoy_export = str(row[data_cols[1]+1]) yoy_import = str(row[data_cols[2]+1]) return pd.DataFrame([{ "city_name": city_name, "monthly_total": monthly_total, "monthly_import": monthly_import, "monthly_export": monthly_export, "yoy_import_export": yoy_import_export, "yoy_import": yoy_import, "yoy_export": yoy_export }]) except Exception as e: log.error(f"处理江门海关文件出错: {str(e)}") return pd.DataFrame() def process_zhanjiang_customs(file_path, year, month): """处理湛江海关数据 满足「是第一次调用」或者「month == 12」任意一个条件""" global _zhanjiang_first_month # 判断是否应执行核心逻辑 if _zhanjiang_first_month is None: # 第一次调用,记录初始月份 _zhanjiang_first_month = month should_execute = True else: # 后续调用仅在以下情况下执行: # - 与初次调用的 month 相同(允许多城市同时处理) # - 或者 month == 12 should_execute = (month == _zhanjiang_first_month) or (month == 12) if not should_execute: log.warning(f"跳过湛江海关{year}年{month}文件: {file_path.name}") return pd.DataFrame() try: df = pd.read_excel(file_path, sheet_name=0, header=None) log.info(f"处理湛江海关文件: {file_path.name}") # 从文件名确定城市 city_name = "湛江市" if "湛江市" in file_path.name else "茂名市" # 查找月度数据表格 month_str = f"{year}年前{month}个月{city_name}进出口数据(月度)" # target_header_row = None # # # 查找表头行 # for i in range(min(3, len(df))): # 在前5行找表头 # if any(month_str in str(cell) for cell in df.iloc[i]): # target_header_row = i # break # # if target_header_row is None: # log.error(f"未找到 {month_str} 表头") # return pd.DataFrame() target_header_row =1 # 确定数据列位置 data_cols = {} for col in range(len(df.columns)): cell_val = str(df.iloc[target_header_row+1, col]) data_cols["year_month"] = 0 if "进出口" in cell_val : data_cols["total"] = col elif "出口" in cell_val : data_cols["export"] = col elif "进口" in cell_val : data_cols["import"] = col if len(data_cols) < 1: log.error(f"未找到足够的 {month_str} 数据列") return pd.DataFrame() start_row = target_header_row + 4 end_row = start_row + month # 提取多行数据 rows = df.iloc[start_row:end_row] results = [] for _, row in rows.iterrows(): try: year_month = str(row[data_cols["year_month"]]) formatted_year_month = f"{year_month[:4]}-{year_month[4:]}" monthly_total = Decimal(str(row[data_cols["total"]])) # 进出口 monthly_export = Decimal(str(row[data_cols["export"]])) # 出口 monthly_import = Decimal(str(row[data_cols["import"]])) # 进口 yoy_import_export = Decimal(str(row[data_cols["total"] + 1])) # 进出口同比 yoy_export = Decimal(str(row[data_cols["export"] + 1])) # 出口同比 yoy_import = Decimal(str(row[data_cols["import"] + 1])) # 进口同比 results.append({ "crossborder_year_month":formatted_year_month, "city_name": city_name, "monthly_total": monthly_total, "monthly_import": monthly_import, "monthly_export": monthly_export, "yoy_import_export": yoy_import_export, "yoy_import": yoy_import, "yoy_export": yoy_export }) except Exception as e: log.error(f"解析某一行数据出错: {e}") continue # 单行错误不影响整体处理 return pd.DataFrame(results) except Exception as e: log.error(f"处理湛江海关文件出错: {str(e)}") return pd.DataFrame() def get_customs_processor(customs_name): """获取不同海关的处理函数""" processors = { "广州海关": process_guangzhou_customs, "深圳海关": process_shenzhen_customs, "汕头海关": process_shantou_customs, "黄埔海关": process_huangpu_customs, "江门海关": process_jiangmen_customs, "湛江海关": process_zhanjiang_customs } return processors.get(customs_name) def parse_excel(current_dir): """主解析入口(优化为单参数模式) Args: current_dir (str): 当前月份数据目录(格式:/年份/省份/月份) """ current_path = Path(current_dir) year, month = extract_year_month_from_path(current_path) log.info(f"开始处理 {year}年{month}月 数据: {current_dir}") try: # 动态获取前月目录 prev_dir = get_previous_month_dir(current_path) if month != 1 else None if prev_dir: log.info(f"上个月数据目录: {prev_dir}") # 获取当前目录下所有Excel文件 excel_files = list(current_path.glob("*.xls*")) if not excel_files: log.warning(f"当前目录下未找到Excel文件: {current_dir}") return # 按海关处理每个文件 all_results = pd.DataFrame() customs_map = { "广州海关": [], "深圳海关": [], "汕头海关": [], "黄埔海关": [], "江门海关": [], "湛江海关": [] } # 组织文件到不同的海关 for file_path in excel_files: for customs_name in customs_map.keys(): if match_customs_file(file_path.name, customs_name, year, month): customs_map[customs_name].append(file_path) log.info(f"匹配到 {customs_name} 文件: {file_path.name}") break # 处理每个海关的文件 for customs_name, file_list in customs_map.items(): processor = get_customs_processor(customs_name) if not processor: continue for file_path in file_list: # 特殊处理深圳海关和江门海关的2月数据(缺少1月数据) if customs_name in ["深圳海关", "江门海关"] and month == 2: # 获取2月份完整数据 df_full = processor(file_path, year, month) if df_full.empty: continue # 创建1月份数据 (取2月份数据的一半) df_half = df_full.copy() for col in ['monthly_total', 'monthly_import', 'monthly_export']: # 注意:只有数值列才进行减半操作,避免对字符串操作 if col in df_half.columns: df_half[col] = df_half[col] / 2 # 设置1月份 df_half['crossborder_year_month'] = f'{year}-01' # 设置2月份数据 (取2月份数据的一半) df_full['crossborder_year_month'] = f'{year}-02' for col in ['monthly_total', 'monthly_import', 'monthly_export']: if col in df_full.columns: df_full[col] = df_full[col] / 2 # 合并数据 df_customs = pd.concat([df_half, df_full]) else: # 正常处理数据 df_customs = processor(file_path, year, month) if not df_customs.empty: df_customs['month'] = month if not df_customs.empty: all_results = pd.concat([all_results, df_customs]) # 如果没有获取到数据 if all_results.empty: log.warning(f"未处理到有效数据: {current_dir}") return # 添加公共字段 all_results['prov_code'] = PROV_CODE all_results['prov_name'] = PROV_NAME all_results['crossborder_year'] = year all_results['city_code'] = all_results['city_name'].astype(str).map(GUANGDONG_CITY).fillna('0000') all_results['month'] = all_results.get('month', month) if 'crossborder_year_month' in all_results.columns: all_results['crossborder_year_month'] = ( all_results['crossborder_year_month'] .replace('', pd.NA) .fillna(f'{year}-{month:02d}') ) else: all_results['crossborder_year_month'] = f'{year}-{month:02d}' # 排序并删除重复项 # all_results = all_results.sort_values(by=['city_code', 'crossborder_year_month']) # all_results = all_results.drop_duplicates(subset=['crossborder_year_month', 'city_code'], keep='last') # 打印处理结果 log.info(f"处理完成,共获得广东省 {len(all_results)} 条地级市数据") # 选择入库字段 final_df = all_results[[ 'crossborder_year_month', 'prov_code', 'prov_name', 'crossborder_year','city_code', 'city_name', 'monthly_total','monthly_import', 'monthly_export', 'yoy_import_export','yoy_import', 'yoy_export' ]].copy() final_df = final_df.where(pd.notna(final_df), None) # 打印前几条数据 # log.debug(f"处理后数据示例:\n{final_df.head()}") # 这里调用DBHelper入库(实际使用时请取消注释) from utils.db_helper import DBHelper db = DBHelper() db.bulk_insert( final_df, 't_yujin_crossborder_prov_region_trade', conflict_columns=['crossborder_year_month', 'city_code'], update_columns=['monthly_total', 'monthly_import', 'monthly_export', 'yoy_import_export', 'yoy_import', 'yoy_export'] ) log.info(f"{current_dir}数据已全部成功处理") except Exception as e: log.error(f"处理失败:{current_dir},错误:{str(e)}") raise def convert_unit(value): """亿元转万元,处理空值""" try: # 如果 value 不是特殊的无效值,进行转换并保留4位小数 return round(Decimal(value) * 10000, 4) if value not in ['-', ''] else None except (InvalidOperation, ValueError): # 捕获异常,返回 None return None # 测试入口 if __name__ == "__main__": traverse_and_process(download_dir, parse_excel, province_name="guangdong") db_helper = DBHelper() db_helper.update_prov_yoy("广东省")