"""Parse monthly Guangdong customs Excel reports into per-city trade rows.

Each customs office publishes its own workbook layout; one ``process_*``
function handles each layout and returns a DataFrame with the columns
``city_name``, ``monthly_total``, ``monthly_import``, ``monthly_export``,
``yoy_import_export``, ``yoy_import`` and ``yoy_export``.
"""
import re
from decimal import Decimal
from pathlib import Path

import pandas as pd
from openpyxl import load_workbook

from db_helper import DBHelper
from utils.constants import DOWNLOAD_DIR, GUANGDONG_CITY
from utils.log import log
from utils.parse_utils import traverse_and_process, extract_year_month_from_path, get_previous_month_dir

PROV_CODE = "440000"
PROV_NAME = "广东省"

download_dir = DOWNLOAD_DIR / "guangdong"

db = DBHelper()

# Keyword that must appear in a file name for it to belong to a customs office.
_CUSTOMS_FILE_KEYWORDS = {
    "广州海关": "7地市进出口综合统计",
    "深圳海关": "深圳海关综合统计资料",
    "汕头海关": "5市报表",
    "黄埔海关": "东莞市进出口企业性质总值表",
    "江门海关": "外贸进出口有关情况统计表",
    "湛江海关": "外贸进出口数据",
}

# Offices whose files must additionally name one of these cities.
_CUSTOMS_FILE_CITIES = {
    "江门海关": ("江门市", "阳江市"),
    "湛江海关": ("湛江市", "茂名市"),
}

# City rows extracted from the Guangzhou-style multi-city report.
_GUANGZHOU_TARGET_CITIES = ("广州市", "深圳市", "东莞市", "汕头市", "江门市", "湛江市", "茂名市")

# (city, sheet name) pairs read from the Shenzhen workbook.
_SHENZHEN_SHEETS = (
    ("深圳市", "深圳市进出口(贸易方式)"),
    ("惠州市", "惠州市进出口(贸易方式)"),
)

# Offices whose February figure is split evenly between January and February.
_FEB_SPLIT_CUSTOMS = ("深圳海关", "江门海关")

_AMOUNT_COLUMNS = ("monthly_total", "monthly_import", "monthly_export")

# Factor applied to Shenzhen/Jiangmen amounts (original note: 亿元 -> 万元).
_YI_TO_WAN = Decimal("10000")

# Plain decimal literal with optional sign: "12", "-3.5", "+.7", "12."
_NUMERIC_TEXT = re.compile(r"[+-]?(?:\d+(?:\.\d*)?|\.\d+)")


def _to_decimal(val):
    """Convert a worksheet cell value to ``Decimal``; non-numeric input gives 0.

    Numbers are converted through ``str`` so floats keep their printed value.
    Strings are stripped and accepted when they are a plain, optionally signed
    decimal literal, so negative year-over-year figures stored as text are
    kept instead of being zeroed. Anything else (``None``, booleans, free
    text, malformed numbers such as ``"1.2.3"``) yields ``Decimal(0)``.
    """
    if isinstance(val, bool):
        # bool is an int subclass, but Decimal("True") would raise.
        return Decimal(0)
    if isinstance(val, (int, float)):
        return Decimal(str(val))
    if isinstance(val, str):
        text = val.strip()
        if _NUMERIC_TEXT.fullmatch(text):
            return Decimal(text)
    return Decimal(0)


def _find_month_columns(sheet, month_str, keyword=None, header_rows=3):
    """Locate the header row and the columns whose header mentions the month.

    Scans the first ``header_rows`` rows and stops at the first row that has
    at least one cell containing ``month_str`` (and ``keyword`` when given).

    Returns:
        ``(row_index, column_indexes)`` with a 1-based row index and 0-based
        column indexes, or ``(None, [])`` when no row matches.
    """
    for row_idx in range(1, header_rows + 1):
        texts = [str(cell.value) if cell.value else "" for cell in sheet[row_idx]]
        cols = [
            idx
            for idx, text in enumerate(texts)
            if month_str in text and (keyword is None or keyword in text)
        ]
        if cols:
            return row_idx, cols
    return None, []


def _find_row_by_label(sheet, label):
    """Return the 1-based index of the first row whose first cell contains ``label``."""
    for row_idx, row in enumerate(sheet.iter_rows(values_only=True), 1):
        if row and label in str(row[0]):
            return row_idx
    return None


def match_customs_file(filename, customs_name, year, month):
    """Return True when ``filename`` looks like a report of ``customs_name``.

    ``year`` and ``month`` are accepted for interface compatibility and are
    not used by the matching rules.
    """
    filename_lower = filename.lower()
    keyword = _CUSTOMS_FILE_KEYWORDS.get(customs_name)
    if keyword is None:
        return False
    cities = _CUSTOMS_FILE_CITIES.get(customs_name)
    if cities is not None and not any(city in filename_lower for city in cities):
        return False
    return keyword in filename_lower


def process_guangzhou_customs(file_path, year, month):
    """Extract the target city rows from a Guangzhou-style multi-city report.

    Reads the first worksheet, finds the header row mentioning
    ``{year}年{month}月`` and uses the first six matching columns as total,
    export, import, total YoY, export YoY and import YoY.

    Returns:
        A DataFrame with one row per matched city; empty on any failure
        (the error is logged, not raised).
    """
    try:
        wb = load_workbook(file_path, data_only=True)
        sheet = wb.worksheets[0]

        month_str = f"{year}年{month}月"
        header_row, data_cols = _find_month_columns(sheet, month_str)
        if header_row is None:
            log.error(f"未找到 {month_str} 的表头")
            return pd.DataFrame()
        if len(data_cols) < 6:
            log.error(f"未找到足够的 {month_str} 数据列")
            return pd.DataFrame()

        results = []
        for row in sheet.iter_rows(min_row=header_row + 1):
            label = row[0].value
            if not label or "广东省" not in str(label):
                continue
            city_name = str(label).replace("广东省", "").strip()
            if city_name not in _GUANGZHOU_TARGET_CITIES:
                continue
            try:
                total, export, import_val, yoy_total, yoy_export, yoy_import = (
                    row[col].value for col in data_cols[:6]
                )
                results.append({
                    "city_name": city_name,
                    "monthly_total": _to_decimal(total),
                    "monthly_import": _to_decimal(import_val),
                    "monthly_export": _to_decimal(export),
                    "yoy_import_export": _to_decimal(yoy_total),
                    "yoy_import": _to_decimal(yoy_import),
                    "yoy_export": _to_decimal(yoy_export),
                })
            except Exception as e:
                # Best effort: one bad city row must not drop the others.
                log.error(f"处理城市 {city_name} 出错: {e}")

        return pd.DataFrame(results)
    except Exception as e:
        log.error(f"处理广州海关文件出错: {str(e)}")
        return pd.DataFrame()


def process_shenzhen_customs(file_path, year, month):
    """Extract Shenzhen and Huizhou totals from the Shenzhen customs workbook.

    For each city sheet, takes the "总值" row and reads the total from the
    first column whose header mentions the month and the YoY figure from the
    column immediately to its right. The total is multiplied by 10000.
    Import/export amounts are not available in this layout and are ``None``.

    Returns:
        A DataFrame with up to two rows; empty on workbook-level failure.
    """
    try:
        wb = load_workbook(file_path, data_only=True)
        results = []

        for city, sheet_name in _SHENZHEN_SHEETS:
            try:
                if sheet_name not in wb.sheetnames:
                    log.warning(f"未找到sheet: {sheet_name}")
                    continue
                sheet = wb[sheet_name]

                total_row_idx = _find_row_by_label(sheet, "总值")
                if total_row_idx is None:
                    log.error(f"未找到总值行: {sheet_name}")
                    continue

                month_str = f"{year}年{month}月"
                _, month_cols = _find_month_columns(sheet, month_str)
                if not month_cols:
                    log.error(f"未找到 {month_str} 列")
                    continue
                data_col = month_cols[0]

                # data_col is 0-based, sheet.cell() is 1-based.
                total_value = sheet.cell(row=total_row_idx, column=data_col + 1).value
                yoy_value = sheet.cell(row=total_row_idx, column=data_col + 2).value
                if total_value is None or yoy_value is None:
                    log.error(f"{city} 数据为空")
                    continue

                results.append({
                    "city_name": city,
                    "monthly_total": _to_decimal(total_value) * _YI_TO_WAN,
                    "monthly_import": None,  # no separate import/export figures
                    "monthly_export": None,
                    "yoy_import_export": _to_decimal(yoy_value),
                    "yoy_import": Decimal(0),
                    "yoy_export": Decimal(0),
                })
            except Exception as e:
                # Best effort: a broken sheet must not drop the other city.
                log.error(f"处理 {city} 数据出错: {str(e)}")

        return pd.DataFrame(results)
    except Exception as e:
        log.error(f"处理深圳海关文件出错: {str(e)}")
        return pd.DataFrame()


def process_shantou_customs(file_path, year, month):
    """Process a Shantou customs file; the layout is handled like Guangzhou's."""
    log.info(f"处理汕头海关文件: {file_path.name}")
    return process_guangzhou_customs(file_path, year, month)


def process_huangpu_customs(file_path, year, month):
    """Extract the Dongguan totals from the Huangpu customs workbook.

    Uses the "合计" row of the active sheet and the first six header columns
    that mention both the month and "人民币", in the order total, export,
    import, total YoY, export YoY, import YoY.

    Returns:
        A one-row DataFrame for 东莞市; empty on any failure.
    """
    try:
        wb = load_workbook(file_path, data_only=True)
        sheet = wb.active

        total_row_idx = _find_row_by_label(sheet, "合计")
        if total_row_idx is None:
            log.error("未找到合计行")
            return pd.DataFrame()

        month_str = f"{year}年{month}月"
        _, data_cols = _find_month_columns(sheet, month_str, keyword="人民币")
        if len(data_cols) < 6:
            log.error(f"未找到足够的 {month_str} 人民币数据列")
            return pd.DataFrame()

        row_values = [cell.value for cell in sheet[total_row_idx]]
        total, export, import_val, yoy_total, yoy_export, yoy_import = (
            row_values[col] for col in data_cols[:6]
        )

        return pd.DataFrame([{
            "city_name": "东莞市",
            "monthly_total": _to_decimal(total),
            "monthly_export": _to_decimal(export),
            "monthly_import": _to_decimal(import_val),
            "yoy_import_export": _to_decimal(yoy_total),
            "yoy_export": _to_decimal(yoy_export),
            "yoy_import": _to_decimal(yoy_import),
        }])
    except Exception as e:
        log.error(f"处理黄埔海关文件出错: {str(e)}")
        return pd.DataFrame()


def process_jiangmen_customs(file_path, year, month):
    """Extract the Jiangmen or Yangjiang totals from a Jiangmen customs file.

    The city is chosen from the file name (江门市 if present, else 阳江市).
    Amounts are multiplied by 10000; YoY figures are taken as-is. Columns are
    the first six headers mentioning the month, in the order total, export,
    import, total YoY, export YoY, import YoY.

    Returns:
        A one-row DataFrame; empty on any failure.
    """
    try:
        wb = load_workbook(file_path, data_only=True)
        sheet = wb.active

        if "江门市" in file_path.name:
            city_name = "江门市"
            target_row_name = "江门市进出口商品"
        else:
            city_name = "阳江市"
            target_row_name = "阳江市进出口商品总值"

        target_row_idx = _find_row_by_label(sheet, target_row_name)
        if target_row_idx is None:
            log.error(f"未找到 {target_row_name} 行")
            return pd.DataFrame()

        month_str = f"{year}年{month}月"
        _, data_cols = _find_month_columns(sheet, month_str)
        if len(data_cols) < 6:
            log.error(f"未找到足够的 {month_str} 数据列")
            return pd.DataFrame()

        row_values = [cell.value for cell in sheet[target_row_idx]]
        total, export, import_val, yoy_total, yoy_export, yoy_import = (
            row_values[col] for col in data_cols[:6]
        )

        return pd.DataFrame([{
            "city_name": city_name,
            "monthly_total": _to_decimal(total) * _YI_TO_WAN,
            "monthly_export": _to_decimal(export) * _YI_TO_WAN,
            "monthly_import": _to_decimal(import_val) * _YI_TO_WAN,
            "yoy_import_export": _to_decimal(yoy_total),
            "yoy_export": _to_decimal(yoy_export),
            "yoy_import": _to_decimal(yoy_import),
        }])
    except Exception as e:
        log.error(f"处理江门海关文件出错: {str(e)}")
        return pd.DataFrame()


def process_zhanjiang_customs(file_path, year, month):
    """Extract the Zhanjiang or Maoming totals from a Zhanjiang customs file.

    The city is chosen from the file name (湛江市 if present, else 茂名市).
    Finds the first row mentioning the month, then searches the 20 rows from
    there for the city name in column 1 and reads columns 2-7 as total,
    export, import, total YoY, export YoY, import YoY.

    Returns:
        A one-row DataFrame; empty on any failure.
    """
    try:
        wb = load_workbook(file_path, data_only=True)
        sheet = wb.worksheets[0]

        city_name = "湛江市" if "湛江市" in file_path.name else "茂名市"

        month_str = f"{year}年{month}月"
        table_start_row = None
        for row_idx, row in enumerate(sheet.iter_rows(values_only=True), 1):
            if row and any(month_str in str(cell) for cell in row if cell):
                table_start_row = row_idx
                break
        if table_start_row is None:
            log.error(f"未找到 {month_str} 月度数据表")
            return pd.DataFrame()

        target_row_idx = None
        for row_idx in range(table_start_row, table_start_row + 20):
            first_cell = sheet.cell(row=row_idx, column=1).value
            if first_cell and city_name in str(first_cell):
                target_row_idx = row_idx
                break
        if target_row_idx is None:
            log.error(f"未找到 {city_name} 数据行")
            return pd.DataFrame()

        total, export, import_val, yoy_total, yoy_export, yoy_import = (
            sheet.cell(row=target_row_idx, column=col).value for col in range(2, 8)
        )

        return pd.DataFrame([{
            "city_name": city_name,
            "monthly_total": _to_decimal(total),
            "monthly_export": _to_decimal(export),
            "monthly_import": _to_decimal(import_val),
            "yoy_import_export": _to_decimal(yoy_total),
            "yoy_export": _to_decimal(yoy_export),
            "yoy_import": _to_decimal(yoy_import),
        }])
    except Exception as e:
        log.error(f"处理湛江海关文件出错: {str(e)}")
        return pd.DataFrame()


# Processing order matters: rows from later offices come later in the combined
# frame, which decides the survivor of drop_duplicates(keep='last') in
# parse_excel (assuming the intermediate sort is stable - TODO confirm).
_CUSTOMS_PROCESSORS = {
    "广州海关": process_guangzhou_customs,
    "深圳海关": process_shenzhen_customs,
    "汕头海关": process_shantou_customs,
    "黄埔海关": process_huangpu_customs,
    "江门海关": process_jiangmen_customs,
    "湛江海关": process_zhanjiang_customs,
}


def get_customs_processor(customs_name):
    """Return the processing function for ``customs_name``, or None if unknown."""
    return _CUSTOMS_PROCESSORS.get(customs_name)


def parse_excel(current_dir):
    """Parse every customs workbook in one month directory.

    Args:
        current_dir: Directory holding the month's Excel files; the year and
            month are derived from it via ``extract_year_month_from_path``.

    Returns:
        None. Results are logged; the database insert is currently disabled.

    Raises:
        Exception: any error outside the per-file processors is logged and
            re-raised.
    """
    current_path = Path(current_dir)
    year, month = extract_year_month_from_path(current_path)
    log.info(f"开始处理 {year}年{month}月 数据: {current_dir}")

    try:
        prev_dir = get_previous_month_dir(current_path) if month != 1 else None
        if prev_dir:
            log.info(f"上个月数据目录: {prev_dir}")

        excel_files = list(current_path.glob("*.xls*"))
        if not excel_files:
            log.warning(f"当前目录下未找到Excel文件: {current_dir}")
            return

        # Assign each file to the first customs office whose rule matches.
        customs_map = {name: [] for name in _CUSTOMS_PROCESSORS}
        for file_path in excel_files:
            for customs_name in customs_map:
                if match_customs_file(file_path.name, customs_name, year, month):
                    customs_map[customs_name].append(file_path)
                    log.info(f"匹配到 {customs_name} 文件: {file_path.name}")
                    break

        frames = []
        for customs_name, file_list in customs_map.items():
            processor = get_customs_processor(customs_name)
            if not processor:
                continue

            for file_path in file_list:
                if customs_name in _FEB_SPLIT_CUSTOMS and month == 2:
                    # These offices publish no January file: the February
                    # figure is halved and reported for both months.
                    df_feb = processor(file_path, year, month)
                    if df_feb.empty:
                        continue
                    for col in _AMOUNT_COLUMNS:
                        df_feb[col] = df_feb[col] / 2
                    df_jan = df_feb.copy()
                    df_jan['month'] = 1
                    df_feb['month'] = 2
                    df_customs = pd.concat([df_jan, df_feb])
                else:
                    df_customs = processor(file_path, year, month)
                    if not df_customs.empty:
                        df_customs['month'] = month

                if not df_customs.empty:
                    frames.append(df_customs)

        if not frames:
            log.warning(f"未处理到有效数据: {current_dir}")
            return

        # Concatenate once; ignore_index avoids duplicate index labels.
        # Every collected frame already carries its own 'month' column.
        all_results = pd.concat(frames, ignore_index=True)

        all_results['prov_code'] = PROV_CODE
        all_results['prov_name'] = PROV_NAME
        all_results['year'] = year
        all_results['crossborder_year_month'] = (
            all_results['year'].astype(str)
            + '-'
            + all_results['month'].astype(str).str.zfill(2)
        )

        # Unknown city names fall back to the placeholder code '0000'.
        all_results['city_code'] = all_results['city_name'].map(
            lambda name: GUANGDONG_CITY.get(name, '0000')
        )

        all_results = all_results.sort_values(by=['city_code', 'crossborder_year_month'])
        all_results = all_results.drop_duplicates(
            subset=['crossborder_year_month', 'city_code'], keep='last'
        )

        log.info(f"处理完成,共获得 {len(all_results)} 条数据")

        final_df = all_results[[
            'crossborder_year_month', 'prov_code', 'prov_name',
            'city_code', 'city_name', 'monthly_total', 'monthly_import',
            'monthly_export', 'yoy_import_export', 'yoy_import', 'yoy_export'
        ]].copy()

        log.info(f"处理后数据示例:\n{final_df.head()}")

        # NOTE: database insert is disabled; enable for real runs.
        # db.bulk_insert(
        #     final_df,
        #     't_yujin_crossborder_prov_region_trade',
        #     conflict_columns=['crossborder_year_month', 'city_code'],
        #     update_columns=['monthly_total', 'monthly_import', 'monthly_export',
        #                     'yoy_import_export', 'yoy_import', 'yoy_export']
        # )

        log.info(f"{current_dir}数据已全部成功处理")

    except Exception as e:
        log.error(f"处理失败:{current_dir},错误:{str(e)}")
        raise


# Directory traversal is delegated to utils.parse_utils.traverse_and_process.

# Script entry point
if __name__ == "__main__":
    traverse_and_process(download_dir, parse_excel, province_name="guangdong")