Procházet zdrojové kódy

广东省-6个分属海关地市数据清洗

01495251 před 2 dny
rodič
revize
3264c8dc7d

+ 4 - 4
db_helper.py

@@ -141,7 +141,7 @@ class DBHelper:
             # 步骤2:计算新数据
             updated = self._update_prov_new_yoy(prov_name)
 
-            log.info(f"山东省同比处理完成 | 清零:{cleared} 更新:{updated}")
+            log.info(f"{prov_name}同比处理完成 | 清零:{cleared} 更新:{updated}")
             return {'cleared': cleared, 'updated': updated}
         except Exception as e:
             log.error(f"{prov_name}数据处理失败", exc_info=True)
@@ -153,9 +153,9 @@ class DBHelper:
         """
         clear_sql = text("""
                          UPDATE t_yujin_crossborder_prov_region_trade
-                         SET yoy_import_export = 0.0000,
-                             yoy_export        = 0.0000,
-                             yoy_import        = 0.0000
+                         SET yoy_import_export = null,
+                             yoy_export        = null,
+                             yoy_import        = null
                          WHERE prov_name = :prov_name
                            AND crossborder_year_month < '2024-01'
                            AND (yoy_import_export != 0 

+ 382 - 260
guangdong/guangdong_sub_customs_parse_excel.py

@@ -1,8 +1,7 @@
-from decimal import Decimal
+from decimal import Decimal, InvalidOperation
 from pathlib import Path
 
 import pandas as pd
-from openpyxl import load_workbook
 
 from db_helper import DBHelper
 from utils.constants import DOWNLOAD_DIR, GUANGDONG_CITY
@@ -16,9 +15,16 @@ download_dir = DOWNLOAD_DIR / "guangdong"
 
 db = DBHelper()
 
+_zhanjiang_first_month = None
 
 
 
+# 广州海关:万元
+# 深圳海关:亿元
+# 汕头海关:万元
+# 黄埔海关:万元
+# 江门海关:亿元
+# 湛江海关:万元
 
 def match_customs_file(filename, customs_name, year, month):
     """匹配海关文件"""
@@ -41,74 +47,119 @@ def match_customs_file(filename, customs_name, year, month):
     return False
 
 
-def process_guangzhou_customs(file_path, year, month):
-    """处理广州海关数据"""
-    try:
-        # 读取Excel文件
-        wb = load_workbook(file_path, data_only=True)
-        sheet = wb.worksheets[0]
-
-        # 查找包含月份的表头行
-        month_str = f"{year}年{month}月"
-        header_row = None
-        for i in range(1, 4):  # 检查前3行
-            row_values = [str(cell.value).strip() if cell.value else "" for cell in sheet[i]]
-            if any(month_str in val for val in row_values):
+def find_header_and_columns(df, year, month):
+    """
+    查找匹配月份的表头行并定位对应的列索引。
+    支持三种基础格式:
+        - "2024年12月"
+        - "2024年12月-2024年12月"
+        - "2024-12-01 00:00:00"
+    如果都未找到,则尝试匹配特殊格式:"2023年01月-2023年02月"
+    """
+    candidate_month_strs = [
+        f"{year}年{month:02d}月",
+        f"{year}年{month:02d}月-{year}年{month:02d}月",
+        f"{year}-{month:02d}-01 00:00:00"
+    ]
+
+    header_row = None
+    for i in range(min(3, len(df))):
+        row_cells = [str(cell).strip() for cell in df.iloc[i]]
+        for cell_val in row_cells:
+            if any(s == cell_val for s in candidate_month_strs):
                 header_row = i
                 break
+        if header_row is not None:
+            break
+
+    # 如果没找到常规格式,尝试特殊格式:2023年01月-2023年02月
+    special_format = "2023年01月-2023年02月"
+    if header_row is None:
+        log.warning(f"未找到常规格式,尝试匹配特殊格式: {special_format}")
+        for i in range(min(3, len(df))):
+            row_cells = [str(cell).strip() for cell in df.iloc[i]]
+            for cell_val in row_cells:
+                if cell_val == special_format:
+                    header_row = i
+                    log.info(f"成功匹配特殊格式: {special_format} 行号={i}")
+                    break
+            if header_row is not None:
+                break
 
-        if header_row is None:
-            log.error(f"未找到 {month_str} 的表头")
-            return pd.DataFrame()
+    if header_row is None:
+        log.error("未找到任何支持的表头格式")
+        return None, []
 
-        # 确定数据列位置
-        data_cols = []
-        for cell in sheet[header_row]:
-            if cell.value and month_str in str(cell.value):
-                data_cols.append(cell.column - 1)  # 转换为0-based索引
+    # 确定数据列位置(包含所有候选格式)
+    data_cols = []
+    for col in range(len(df.columns)):
+        cell_val = str(df.iloc[header_row, col]).strip()
+        if cell_val in candidate_month_strs:
+            data_cols.append(col)
 
-        if len(data_cols) < 6:
-            log.error(f"未找到足够的 {month_str} 数据列")
-            return pd.DataFrame()
+    if not data_cols:
+        for col in range(len(df.columns)):
+            cell_val = str(df.iloc[header_row, col]).strip()
+            if cell_val in [special_format]:
+                data_cols.append(col)
+
+    if not data_cols:
+        log.error("未找到对应的数据列")
+        return header_row, []
+
+    return header_row, data_cols
+
+
+
+def process_guangzhou_customs(file_path, year, month,customs_type='guangzhou'):
+    """Parse a Guangzhou-customs style workbook into per-city trade rows.
+
+    Args:
+        file_path: Path of the Excel workbook; only the first sheet is read.
+        year: Reporting year used to locate the month header.
+        month: Reporting month used to locate the month header.
+        customs_type: Label of the customs office sharing this layout
+            (Shantou reuses this parser). Not read by the parsing logic.
+
+    Returns:
+        DataFrame with one row per matched city and the columns city_name,
+        monthly_total, monthly_import, monthly_export, yoy_import_export,
+        yoy_import, yoy_export. Empty when the header or the expected
+        number of month columns cannot be found.
+    """
+    try:
+        # Read the first worksheet raw: no row is treated as a header.
+        df = pd.read_excel(file_path, sheet_name=0, header=None)
+        log.info(f"处理广州海关文件: {file_path.name}")
+
+        header_row,data_cols = find_header_and_columns(df, year, month)
+
+        # find_header_and_columns returns (None, []) when no header row matched
+        # and (row, []) when the header has no matching column; it has already
+        # logged the reason, so just stop instead of failing on header_row + 1.
+        if header_row is None or not data_cols:
+            return pd.DataFrame()
+
+        # More than three matching columns selects the wide layout, which is
+        # read at positions 0/1, 4/5 and 8/9; otherwise the first three matches
+        # are the value columns and each year-on-year figure sits one column
+        # to the right. Validate the count once rather than raising per row.
+        wide_layout = len(data_cols) > 3
+        required_cols = 10 if wide_layout else 3
+        if len(data_cols) < required_cols:
+            log.error(f"数据列数量不足: 需要 {required_cols} 列, 实际 {len(data_cols)} 列")
+            return pd.DataFrame()
 
         # 提取7地市数据
         results = []
-        target_cities = ["广州市", "深圳市", "东莞市", "汕头市", "江门市", "湛江市", "茂名市"]
 
-        for row in sheet.iter_rows(min_row=header_row + 1):
-            city_cell = row[0].value
-            if city_cell and "广东省" in str(city_cell):
-                city_name = str(city_cell).replace("广东省", "").strip()
+        target_cities = ["广州市", "韶关市", "佛山市", "肇庆市", "河源市",
+                         "清远市", "汕头市", "梅州市", "汕尾市", "潮州市", "揭阳市", "云浮市"]
+
+        for idx in range(header_row + 1, len(df)):
+            row = df.iloc[idx]
+            city_cell = str(row[0])
+            if "广东省" in city_cell:
+                city_name = city_cell.replace("广东省", "").strip()
                 if city_name in target_cities:
                     try:
-                        # 获取各列值
-                        total = row[data_cols[0]].value
-                        export = row[data_cols[1]].value
-                        import_val = row[data_cols[2]].value
-                        yoy_total = row[data_cols[3]].value
-                        yoy_export = row[data_cols[4]].value
-                        yoy_import = row[data_cols[5]].value
-
-                        # 转换数据类型
-                        def convert_value(val):
-                            if isinstance(val, (int, float)):
-                                return Decimal(str(val))
-                            elif isinstance(val, str) and val.replace(".", "").isdigit():
-                                return Decimal(val)
-                            return Decimal(0)
-
-                        # 添加到结果
+                        if wide_layout:
+                            monthly_total = Decimal(str(row[data_cols[0]]))  # import + export
+                            monthly_export = Decimal(str(row[data_cols[4]]))  # export
+                            monthly_import = Decimal(str(row[data_cols[8]]))  # import
+                            yoy_import_export = Decimal(str(row[data_cols[1]]))  # import + export, year-on-year
+                            yoy_export = Decimal(str(row[data_cols[5]]))  # export, year-on-year
+                            yoy_import = Decimal(str(row[data_cols[9]]))  # import, year-on-year
+                        else:
+                            monthly_total = Decimal(str(row[data_cols[0]]))
+                            monthly_export = Decimal(str(row[data_cols[1]]))
+                            monthly_import = Decimal(str(row[data_cols[2]]))
+                            yoy_import_export = Decimal(str(row[data_cols[0]+1]))  # import + export, year-on-year
+                            yoy_export = Decimal(str(row[data_cols[1]+1]))  # export, year-on-year
+                            yoy_import = Decimal(str(row[data_cols[2]+1]))  # import, year-on-year
+
+
                         results.append({
                             "city_name": city_name,
-                            "monthly_total": convert_value(total),
-                            "monthly_import": convert_value(import_val),
-                            "monthly_export": convert_value(export),
-                            "yoy_import_export": convert_value(yoy_total),
-                            "yoy_import": convert_value(yoy_import),
-                            "yoy_export": convert_value(yoy_export)
+                            "monthly_total": monthly_total,
+                            "monthly_import": monthly_import,
+                            "monthly_export": monthly_export,
+                            "yoy_import_export": yoy_import_export,
+                            "yoy_import": yoy_import,
+                            "yoy_export": yoy_export
                         })
                     except Exception as e:
-                        log.error(f"处理城市 {city_name} 出错: {e}")
+                        log.error(f"处理行 {idx} 出错: {e}")
 
         return pd.DataFrame(results)
 
@@ -118,79 +169,67 @@ def process_guangzhou_customs(file_path, year, month):
 
 
 def process_shenzhen_customs(file_path, year, month):
-    """处理深圳海关数据"""
+    """处理深圳海关数据(完整6指标版)"""
     try:
-        wb = load_workbook(file_path, data_only=True)
+        log.info(f"处理深圳海关文件: {file_path.name}")
         results = []
 
-        # 处理深圳和惠州两个sheet
         for city, sheet_name in [("深圳市", "深圳市进出口(贸易方式)"),
                                  ("惠州市", "惠州市进出口(贸易方式)")]:
             try:
-                if sheet_name in wb.sheetnames:
-                    sheet = wb[sheet_name]
-                else:
-                    log.warning(f"未找到sheet: {sheet_name}")
-                    continue
-
-                # 查找总值行
-                total_row_idx = None
-                for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
-                    if row and "总值" in str(row[0]):
-                        total_row_idx = i
-                        break
-
-                if total_row_idx is None:
-                    log.error(f"未找到总值行: {sheet_name}")
-                    continue
-
-                # 查找包含月份的表头
-                month_str = f"{year}年{month}月"
-                header_row = None
-                data_col = None
-
-                for i, row in enumerate(sheet.iter_rows(max_row=3, values_only=True), 1):
-                    if any(month_str in str(cell) for cell in row if cell):
-                        header_row = i
-                        for col_idx, cell_val in enumerate(row):
-                            if cell_val and month_str in str(cell_val):
-                                data_col = col_idx
-                                break
-                        break
-
-                if data_col is None:
-                    log.error(f"未找到 {month_str} 列")
-                    continue
-
-                # 获取数据值 (亿元转换为万元)
-                total_value = sheet.cell(row=total_row_idx, column=data_col + 1).value
-                yoy_value = sheet.cell(row=total_row_idx, column=data_col + 2).value
-
-                if total_value is None or yoy_value is None:
-                    log.error(f"{city} 数据为空")
-                    continue
-
-                # 转换数据类型
-                def convert_value(val):
-                    if isinstance(val, (int, float)):
-                        return Decimal(str(val))
-                    elif isinstance(val, str) and val.replace(".", "").isdigit():
-                        return Decimal(val)
-                    return Decimal(0)
-
-                # 添加到结果
+                df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
+            except:
+                log.warning(f"未找到sheet: {sheet_name}")
+                continue
+
+            # 查找总值行
+            total_row_idx = None
+            for idx in range(len(df)):
+                if "总值" in str(df.iloc[idx, 0]):
+                    total_row_idx = idx
+                    break
+
+            if total_row_idx is None:
+                log.error(f"未找到总值行: {sheet_name}")
+                continue
+
+            try:
+                # 列索引映射(基于您提供的完整数据结构)
+                # 进出口 | 出口 | 进口 的数值和同比
+                monthly_total = convert_unit(str(df.iloc[total_row_idx, 1]))
+                yoy_total = Decimal(str(df.iloc[total_row_idx, 2]))
+
+                monthly_export = convert_unit(str(df.iloc[total_row_idx, 3]))
+                yoy_export = Decimal(str(df.iloc[total_row_idx, 4]))
+
+                monthly_import = convert_unit(str(df.iloc[total_row_idx, 5]))
+                yoy_import = Decimal(str(df.iloc[total_row_idx, 6]))
+
                 results.append({
                     "city_name": city,
-                    "monthly_total": convert_value(total_value) * Decimal('10000'),
-                    "monthly_import": None,  # 没有单独的进口/出口数据
-                    "monthly_export": None,
-                    "yoy_import_export": convert_value(yoy_value),
-                    "yoy_import": Decimal(0),
-                    "yoy_export": Decimal(0)
+                    "monthly_total": monthly_total,
+                    "monthly_export": monthly_export,
+                    "monthly_import": monthly_import,
+                    "yoy_import_export": yoy_total,  # 进出口同比
+                    "yoy_export": yoy_export,
+                    "yoy_import": yoy_import
                 })
-
             except Exception as e:
-                log.error(f"处理 {city} 数据出错: {str(e)}")
+                log.error(f"处理 {city} 数据出错: {e}")
+                # 尝试部分提取(回退方案)
+                try:
+                    monthly_total = Decimal(str(df.iloc[total_row_idx, 1])) * Decimal('10000')
+                    results.append({
+                        "city_name": city,
+                        "monthly_total": monthly_total,
+                        "monthly_export": None,
+                        "monthly_import": None,
+                        "yoy_import_export": Decimal('0'),
+                        "yoy_export": Decimal('0'),
+                        "yoy_import": Decimal('0')
+                    })
+                except:
+                    log.error(f"连基础进出口总值都无法提取: {sheet_name}")
 
         return pd.DataFrame(results)
 
@@ -202,67 +241,100 @@ def process_shenzhen_customs(file_path, year, month):
 def process_shantou_customs(file_path, year, month):
     """处理汕头海关数据 (逻辑同广州海关)"""
     log.info(f"处理汕头海关文件: {file_path.name}")
-    return process_guangzhou_customs(file_path, year, month)
+    return process_guangzhou_customs(file_path, year, month,customs_type='shantou')
 
 
 def process_huangpu_customs(file_path, year, month):
     """处理黄埔海关数据"""
     try:
-        wb = load_workbook(file_path, data_only=True)
-        sheet = wb.active
+        df = pd.read_excel(file_path, sheet_name=0, header=None)
+        log.info(f"处理黄埔海关文件: {file_path.name}")
 
         # 查找合计行
         total_row_idx = None
-        for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
-            if row and "合计" in str(row[0]):
-                total_row_idx = i
+        for idx in range(len(df)):
+            if "合计" in str(df.iloc[idx, 0]):
+                total_row_idx = idx
                 break
 
         if total_row_idx is None:
             log.error("未找到合计行")
             return pd.DataFrame()
 
-        # 查找包含月份的表头
-        month_str = f"{year}年{month}月"
-        header_row = None
-        data_cols = []
+        # Pick the header text to search for; the month header is written in
+        # several different forms depending on the reporting period.
 
-        for i in range(1, 4):  # 检查前3行
-            row_values = [str(cell.value) if cell.value else "" for cell in sheet[i]]
-            if any(month_str in val and "人民币" in val for val in row_values):
+        # 45627 and 45261 are the Excel date serial numbers of 2024-12-01 and
+        # 2023-12-01; presumably those two workbooks store the header as a raw
+        # number instead of a date (confirm against the source files).
+        if year == 2024 and month == 12:
+            month_str = '45627'
+        elif year == 2023 and month == 12:
+            month_str = '45261'
+        elif year == 2023 and month == 3:
+            month_str = f"{year}年{month:02d}月-{year}年{month:02d}月"
+        else:
+            month_str = f'{year}-{month:02d}-01 00:00:00'
+
+        header_row = None
+        for i in range(min(3, len(df))):
+            row_cells = [str(cell).strip() for cell in df.iloc[i]]
+            # Plain substring test; the former "month_str in cell in cell" was an
+            # accidental chained comparison with a redundant second operand.
+            if any(month_str in cell for cell in row_cells):
                 header_row = i
-                for col_idx, val in enumerate(row_values):
-                    if val and month_str in val and "人民币" in val:
-                        data_cols.append(col_idx)
                 break
 
-        if len(data_cols) < 6:
+        if header_row is None:
+            log.error(f"未找到 {month_str} 人民币表头")
+            return pd.DataFrame()
+
+        # Collect every column whose header cell contains the month text.
+        data_cols = []
+        for col in range(len(df.columns)):
+            cell_val = str(df.iloc[header_row, col])
+            if month_str in cell_val:
+                data_cols.append(col)
+
+        if len(data_cols) < 3:
             log.error(f"未找到足够的 {month_str} 人民币数据列")
             return pd.DataFrame()
 
-        # 获取合计行数据
-        row_values = [cell.value for cell in sheet[total_row_idx]]
-
-        # 转换数据类型
-        def convert_value(val):
-            if isinstance(val, (int, float)):
-                return Decimal(str(val))
-            elif isinstance(val, str) and val.replace(".", "").isdigit():
-                return Decimal(val)
-            return Decimal(0)
-
-        # 提取数据
-        results = [{
-            "city_name": "东莞市",
-            "monthly_total": convert_value(row_values[data_cols[0]]),  # 进出口
-            "monthly_export": convert_value(row_values[data_cols[1]]),  # 出口
-            "monthly_import": convert_value(row_values[data_cols[2]]),  # 进口
-            "yoy_import_export": convert_value(row_values[data_cols[3]]),  # 进出口同比
-            "yoy_export": convert_value(row_values[data_cols[4]]),  # 出口同比
-            "yoy_import": convert_value(row_values[data_cols[5]])  # 进口同比
-        }]
+        try:
+            result = []
+            # Read the totals row: the three matched columns hold the values and
+            # the column immediately right of each holds its year-on-year figure.
+            row = df.iloc[total_row_idx]
+            monthly_total = Decimal(str(row[data_cols[0]]))  # import + export
+            monthly_export = Decimal(str(row[data_cols[1]]))  # export
+            monthly_import = Decimal(str(row[data_cols[2]]))  # import
+            yoy_import_export = str(row[data_cols[0]+1])  # import + export, year-on-year
+            yoy_export = str(row[data_cols[1]+1])  # export, year-on-year
+            yoy_import = str(row[data_cols[2]+1])  # import, year-on-year
+
+            result.append({
+                "crossborder_year_month": f'{year}-{month:02d}',
+                "city_name": "东莞市",
+                "monthly_total": monthly_total,
+                "monthly_import": monthly_import,
+                "monthly_export": monthly_export,
+                "yoy_import_export": yoy_import_export,
+                "yoy_import": yoy_import,
+                "yoy_export": yoy_export
+            })
+            # January has no figures of its own here: derive it as the value
+            # four columns to the right (used as the January-February sum)
+            # minus the February value.
+            if month == 2:
+                monthly_total_sum = Decimal(str(row[data_cols[0]+4]))  # import + export
+                monthly_export_sum = Decimal(str(row[data_cols[1]+4]))  # export
+                monthly_import_sum = Decimal(str(row[data_cols[2]+4]))  # import
+                january_monthly_total = monthly_total_sum - monthly_total
+                january_monthly_export = monthly_export_sum - monthly_export
+                january_monthly_import = monthly_import_sum - monthly_import
+                # Import and export were previously stored under each other's key.
+                result.append({
+                    "crossborder_year_month": f'{year}-01',
+                    "city_name": "东莞市",
+                    "monthly_total": january_monthly_total,
+                    "monthly_import": january_monthly_import,
+                    "monthly_export": january_monthly_export,
+                })
 
-        return pd.DataFrame(results)
+            return pd.DataFrame(result)
+        except Exception as e:
+            log.error(f"提取数据出错: {e}")
+            return pd.DataFrame()
 
     except Exception as e:
         log.error(f"处理黄埔海关文件出错: {str(e)}")
@@ -272,18 +344,18 @@ def process_huangpu_customs(file_path, year, month):
 def process_jiangmen_customs(file_path, year, month):
     """处理江门海关数据"""
     try:
-        wb = load_workbook(file_path, data_only=True)
-        sheet = wb.active
+        df = pd.read_excel(file_path, sheet_name=0, header=None)
+        log.info(f"处理江门海关文件: {file_path.name}")
 
         # 从文件名确定城市
         city_name = "江门市" if "江门市" in file_path.name else "阳江市"
-        target_row_name = "江门市进出口商品" if city_name == "江门市" else "阳江市进出口商品总值"
+        target_row_name = "江门市进出口商品总值" if city_name == "江门市" else "阳江市进出口商品总值"
 
         # 查找目标行
         target_row_idx = None
-        for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
-            if row and target_row_name in str(row[0]):
-                target_row_idx = i
+        for idx in range(len(df)):
+            if target_row_name in str(df.iloc[idx, 0]):
+                target_row_idx = idx
                 break
 
         if target_row_idx is None:
@@ -291,43 +363,48 @@ def process_jiangmen_customs(file_path, year, month):
             return pd.DataFrame()
 
         # 查找包含月份的表头
-        month_str = f"{year}年{month}月"
+        if month == 2:
+           month_str = f"1-{month}月"
+        else:
+           month_str = f"{month}月"
         header_row = None
-        data_cols = []
-
-        for i in range(1, 4):  # 检查前3行
-            row_values = [str(cell.value) if cell.value else "" for cell in sheet[i]]
-            if any(month_str in val for val in row_values):
+        for i in range(min(6, len(df))):
+            if any(month_str == str(cell).strip() for cell in df.iloc[i]):
                 header_row = i
-                for col_idx, val in enumerate(row_values):
-                    if val and month_str in val:
-                        data_cols.append(col_idx)
                 break
 
-        if len(data_cols) < 6:
-            log.error(f"未找到足够的 {month_str} 数据列")
+        if header_row is None:
+            log.error(f"未找到 {month_str} 表头")
             return pd.DataFrame()
 
-        # 获取目标行数据
-        row_values = [cell.value for cell in sheet[target_row_idx]]
+        # 确定数据列位置
+        data_cols = []
+        for col in range(len(df.columns)):
+            cell_val = str(df.iloc[header_row, col])
+            if cell_val.strip() == month_str:
+                data_cols.append(col)
 
-        # 转换数据类型
-        def convert_value(val):
-            if isinstance(val, (int, float)):
-                return Decimal(str(val))
-            elif isinstance(val, str) and val.replace(".", "").isdigit():
-                return Decimal(val)
-            return Decimal(0)
+        if len(data_cols) < 3:
+            log.error(f"未找到足够的 {month_str} 数据列")
+            return pd.DataFrame()
 
         # 提取数据 (亿元转换为万元)
+        row = df.iloc[target_row_idx]
+        monthly_total = convert_unit(str(row[data_cols[0]]))
+        monthly_export = convert_unit(str(row[data_cols[1]]))
+        monthly_import = convert_unit(str(row[data_cols[2]]))
+        yoy_import_export = str(row[data_cols[0]+1])
+        yoy_export = str(row[data_cols[1]+1])
+        yoy_import = str(row[data_cols[2]+1])
+
         return pd.DataFrame([{
             "city_name": city_name,
-            "monthly_total": convert_value(row_values[data_cols[0]]) * Decimal('10000'),  # 进出口
-            "monthly_export": convert_value(row_values[data_cols[1]]) * Decimal('10000'),  # 出口
-            "monthly_import": convert_value(row_values[data_cols[2]]) * Decimal('10000'),  # 进口
-            "yoy_import_export": convert_value(row_values[data_cols[3]]),  # 进出口同比
-            "yoy_export": convert_value(row_values[data_cols[4]]),  # 出口同比
-            "yoy_import": convert_value(row_values[data_cols[5]])  # 进口同比
+            "monthly_total": monthly_total,
+            "monthly_import": monthly_import,
+            "monthly_export": monthly_export,
+            "yoy_import_export": yoy_import_export,
+            "yoy_import": yoy_import,
+            "yoy_export": yoy_export
         }])
 
     except Exception as e:
@@ -336,61 +413,94 @@ def process_jiangmen_customs(file_path, year, month):
 
 
 def process_zhanjiang_customs(file_path, year, month):
-    """处理湛江海关数据"""
+    """处理湛江海关数据 满足「是第一次调用」或者「month == 12」任意一个条件"""
+    global _zhanjiang_first_month
+
+    # 判断是否应执行核心逻辑
+    if _zhanjiang_first_month is None:
+        # 第一次调用,记录初始月份
+        _zhanjiang_first_month = month
+        should_execute = True
+    else:
+        # 后续调用仅在以下情况下执行:
+        # - 与初次调用的 month 相同(允许多城市同时处理)
+        # - 或者 month == 12
+        should_execute = (month == _zhanjiang_first_month) or (month == 12)
+
+    if not should_execute:
+        log.warning(f"跳过湛江海关{year}年{month}文件: {file_path.name}")
+        return pd.DataFrame()
     try:
-        wb = load_workbook(file_path, data_only=True)
-        sheet = wb.worksheets[0]
-
+        df = pd.read_excel(file_path, sheet_name=0, header=None)
+        log.info(f"处理湛江海关文件: {file_path.name}")
         # 从文件名确定城市
         city_name = "湛江市" if "湛江市" in file_path.name else "茂名市"
-
         # 查找月度数据表格
-        table_start_row = None
-        month_str = f"{year}年{month}月"
-        for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
-            if row and any(month_str in str(cell) for cell in row if cell):
-                table_start_row = i
-                break
+        month_str = f"{year}年前{month}个月{city_name}进出口数据(月度)"
+
+        # target_header_row = None
+        #
+        # # 查找表头行
+        # for i in range(min(3, len(df))):  # 在前5行找表头
+        #     if any(month_str in str(cell) for cell in df.iloc[i]):
+        #         target_header_row = i
+        #         break
+        #
+        # if target_header_row is None:
+        #     log.error(f"未找到 {month_str} 表头")
+        #     return pd.DataFrame()
+
+        target_header_row =1
 
-        if table_start_row is None:
-            log.error(f"未找到 {month_str} 月度数据表")
+        # 确定数据列位置
+        data_cols = {}
+        for col in range(len(df.columns)):
+            cell_val = str(df.iloc[target_header_row+1, col])
+            data_cols["year_month"] = 0
+            if "进出口" in cell_val :
+                data_cols["total"] = col
+            elif "出口" in cell_val :
+                data_cols["export"] = col
+            elif "进口" in cell_val :
+                data_cols["import"] = col
+
+        if len(data_cols) < 1:
+            log.error(f"未找到足够的 {month_str} 数据列")
             return pd.DataFrame()
 
-        # 查找目标行(城市名所在行)
-        target_row_idx = None
-        for i in range(table_start_row, table_start_row + 20):  # 在后续行中查找
-            row_val = sheet.cell(row=i, column=1).value
-            if row_val and city_name in str(row_val):
-                target_row_idx = i
-                break
+        start_row = target_header_row + 4
+        end_row = start_row + month
 
-        if target_row_idx is None:
-            log.error(f"未找到 {city_name} 数据行")
-            return pd.DataFrame()
-
-        # 提取数据
+        # 提取多行数据
+        rows = df.iloc[start_row:end_row]
         results = []
-        for col in [2, 3, 4, 5, 6, 7]:  # 依次为进出口、出口、进口、进出口同比、出口同比、进口同比
-            cell_value = sheet.cell(row=target_row_idx, column=col).value
-            results.append(cell_value)
-
-        # 转换数据类型
-        def convert_value(val):
-            if isinstance(val, (int, float)):
-                return Decimal(str(val))
-            elif isinstance(val, str) and val.replace(".", "").isdigit():
-                return Decimal(val)
-            return Decimal(0)
 
-        return pd.DataFrame([{
-            "city_name": city_name,
-            "monthly_total": convert_value(results[0]),
-            "monthly_export": convert_value(results[1]),
-            "monthly_import": convert_value(results[2]),
-            "yoy_import_export": convert_value(results[3]),
-            "yoy_export": convert_value(results[4]),
-            "yoy_import": convert_value(results[5])
-        }])
+        for _, row in rows.iterrows():
+            try:
+                year_month = str(row[data_cols["year_month"]])
+                formatted_year_month = f"{year_month[:4]}-{year_month[4:]}"
+                monthly_total = Decimal(str(row[data_cols["total"]]))  # 进出口
+                monthly_export = Decimal(str(row[data_cols["export"]]))  # 出口
+                monthly_import = Decimal(str(row[data_cols["import"]]))  # 进口
+                yoy_import_export = Decimal(str(row[data_cols["total"] + 1]))  # 进出口同比
+                yoy_export = Decimal(str(row[data_cols["export"] + 1]))  # 出口同比
+                yoy_import = Decimal(str(row[data_cols["import"] + 1]))  # 进口同比
+
+                results.append({
+                    "crossborder_year_month":formatted_year_month,
+                    "city_name": city_name,
+                    "monthly_total": monthly_total,
+                    "monthly_import": monthly_import,
+                    "monthly_export": monthly_export,
+                    "yoy_import_export": yoy_import_export,
+                    "yoy_import": yoy_import,
+                    "yoy_export": yoy_export
+                })
+            except Exception as e:
+                log.error(f"解析某一行数据出错: {e}")
+                continue  # 单行错误不影响整体处理
+
+        return pd.DataFrame(results)
 
     except Exception as e:
         log.error(f"处理湛江海关文件出错: {str(e)}")
@@ -468,15 +578,18 @@ def parse_excel(current_dir):
                     # 创建1月份数据 (取2月份数据的一半)
                     df_half = df_full.copy()
                     for col in ['monthly_total', 'monthly_import', 'monthly_export']:
-                        df_half[col] = df_half[col] / 2
+                        # 注意:只有数值列才进行减半操作,避免对字符串操作
+                        if col in df_half.columns:
+                            df_half[col] = df_half[col] / 2
 
-                    # 设置1月份数据
-                    df_half['month'] = 1
+                    # 设置1月份
+                    df_half['crossborder_year_month'] = f'{year}-01'
 
                     # 设置2月份数据 (取2月份数据的一半)
-                    df_full['month'] = 2
+                    df_full['crossborder_year_month'] = f'{year}-02'
                     for col in ['monthly_total', 'monthly_import', 'monthly_export']:
-                        df_full[col] = df_full[col] / 2
+                        if col in df_full.columns:
+                            df_full[col] = df_full[col] / 2
 
                     # 合并数据
                     df_customs = pd.concat([df_half, df_full])
@@ -497,37 +610,39 @@ def parse_excel(current_dir):
         # 添加公共字段
         all_results['prov_code'] = PROV_CODE
         all_results['prov_name'] = PROV_NAME
-        all_results['year'] = year
+        all_results['crossborder_year'] = year
+        all_results['city_code'] = all_results['city_name'].astype(str).map(GUANGDONG_CITY).fillna('0000')
         all_results['month'] = all_results.get('month', month)
-        all_results['crossborder_year_month'] = all_results['year'].astype(str) + '-' + all_results['month'].astype(
-            str).str.zfill(2)
-
-        # 添加城市编码
-        def get_city_code(row):
-            return GUANGDONG_CITY.get(row['city_name'], '0000')
-
-        all_results['city_code'] = all_results.apply(get_city_code, axis=1)
+        if 'crossborder_year_month' in all_results.columns:
+            all_results['crossborder_year_month'] = (
+                all_results['crossborder_year_month']
+                .replace('', pd.NA)
+                .fillna(f'{year}-{month:02d}')
+            )
+        else:
+            all_results['crossborder_year_month'] = f'{year}-{month:02d}'
 
         # 排序并删除重复项
-        all_results = all_results.sort_values(by=['city_code', 'crossborder_year_month'])
-        all_results = all_results.drop_duplicates(subset=['crossborder_year_month', 'city_code'], keep='last')
+        # all_results = all_results.sort_values(by=['city_code', 'crossborder_year_month'])
+        # all_results = all_results.drop_duplicates(subset=['crossborder_year_month', 'city_code'], keep='last')
 
         # 打印处理结果
-        log.info(f"处理完成,共获得 {len(all_results)} 条数据")
+        log.info(f"处理完成,共获得广东省 {len(all_results)} 条地级市数据")
 
         # 选择入库字段
         final_df = all_results[[
             'crossborder_year_month', 'prov_code', 'prov_name',
-            'city_code', 'city_name', 'monthly_total',
-            'monthly_import', 'monthly_export', 'yoy_import_export',
-            'yoy_import', 'yoy_export'
+            'crossborder_year','city_code', 'city_name',
+            'monthly_total','monthly_import', 'monthly_export',
+            'yoy_import_export','yoy_import', 'yoy_export'
         ]].copy()
 
+        final_df = final_df.where(pd.notna(final_df), None)
+
         # 打印前几条数据
-        log.info(f"处理后数据示例:\n{final_df.head()}")
+        # log.debug(f"处理后数据示例:\n{final_df.head()}")
 
         # 这里调用DBHelper入库(实际使用时请取消注释)
-        """
         from db_helper import DBHelper
         db = DBHelper()
         db.bulk_insert(
@@ -537,7 +652,6 @@ def parse_excel(current_dir):
             update_columns=['monthly_total', 'monthly_import', 'monthly_export',
                             'yoy_import_export', 'yoy_import', 'yoy_export']
         )
-        """
 
         log.info(f"{current_dir}数据已全部成功处理")
 
@@ -545,12 +659,20 @@ def parse_excel(current_dir):
         log.error(f"处理失败:{current_dir},错误:{str(e)}")
         raise
 
-
-# 遍历目录的函数(原样保留)
+def convert_unit(value):
+    """Convert an amount from units of 亿元 to units of 万元 (multiply by 10000).
+
+    Args:
+        value: Cell content, normally a string such as "12.5". Callers in
+            this file pass ``str(cell)``, so a blank Excel cell arrives as
+            the text "nan".
+
+    Returns:
+        The converted amount as a ``Decimal`` rounded to 4 decimal places,
+        or ``None`` when the value is missing, is a placeholder ("-" or
+        ""), cannot be parsed, or is not a finite number.
+    """
+    if value is None:
+        return None
+    text = str(value).strip()
+    if text in ('-', ''):
+        return None
+    try:
+        amount = Decimal(text)
+    except (InvalidOperation, ValueError):
+        return None
+    # "nan" parses to Decimal('NaN') without raising; treat it and the
+    # infinities as missing so they are never stored as amounts.
+    if not amount.is_finite():
+        return None
+    return round(amount * 10000, 4)
 
 
 
 # 测试入口
 if __name__ == "__main__":
 
-    traverse_and_process(download_dir, parse_excel, province_name="guangdong")
+    traverse_and_process(download_dir, parse_excel, province_name="guangdong")
+    db_helper = DBHelper()
+    db_helper.update_prov_yoy("广东省")

+ 7 - 6
guangdong/selenium_guangdong_city.py

@@ -297,12 +297,13 @@ def main():
            pass
 
     driver.quit()
-    # log.info("【广东省】数据抓取结束".center(66, "*"))
-    # log.info("\n广东省数据清洗入库中...")
-    # traverse_and_process(download_dir, parse_excel, province_name="guangdong")
-    # log.info("\n山东省地级市数据同比更新中...")
-    # db_helper = DBHelper()
-    # db_helper.update_prov_yoy("广东省")
+    log.info("【广东省】数据抓取结束".center(66, "*"))
+    log.info("\n广东省数据清洗入库中...")
+    traverse_and_process(download_dir, parse_excel, province_name="guangdong")
+    log.info("\n广东省地级市数据同比更新中...")
+    db_helper = DBHelper()
+    db_helper.update_prov_yoy("广东省")
+    log.info("\n广东省地级市数据同比更新结束")
 
 if __name__ == "__main__":
     main()

+ 2 - 4
utils/constants.py

@@ -319,15 +319,13 @@ GUANGDONG_CITY = {
 GUANGDONG_CUSTOMS_URL = {
     "广州海关": "http://guangzhou.customs.gov.cn/guangzhou_customs/381558/fdzdgknr33/381638/381572/381573/index.html",
     "深圳海关": "http://shenzhen.customs.gov.cn/shenzhen_customs/zfxxgk15/2966748/hgtj40/index.html",
-    # "拱北海关": "http://gongbei.customs.gov.cn/gongbei_customs/374280/fdzdgknr19/374301/index.html",
+    "拱北海关": "http://gongbei.customs.gov.cn/gongbei_customs/374280/fdzdgknr19/374301/index.html",
     "汕头海关": "http://shantou.customs.gov.cn/shantou_customs/zfxxgk39/3008252/3008606/596222/index.html",
     "黄埔海关": "http://huangpu.customs.gov.cn/huangpu_customs/zfxxgk35/2969690/2969697/tjsj/index.html",
     "江门海关": "http://jiangmen.customs.gov.cn/jiangmen_customs/536578/fdzdgknr7/536580/index.html",
     "湛江海关": "http://zhanjiang.customs.gov.cn/zhanjiang_customs/534855/zfxxgkzn24/534857/index.html"
 }
-# "中山市2025年1-4月对外贸易进出口统计表"
-# "珠海市2025年1-4月对外贸易进出口统计表"
-# "5市报表2025年1-4月(人民币)"
+