瀏覽代碼

数据清洗逻辑兼容,全年和部分2种模式

01495251 1 月之前
父節點
當前提交
811dcb0f2c

+ 1 - 1
crossborder/fujian/selenium_fujian_download.py

@@ -195,7 +195,7 @@ def main():
         if 'driver' in locals():
             driver.quit()
         log.info("\n数据清洗入库中...")
-        traverse_and_process(download_dir, parse_excel, province_name="fujian")
+        traverse_and_process(download_dir, parse_excel, province_name="fujian", year=args.year)
         log.info("\n福建省地级市数据同比更新中...")
         db_helper = DBHelper()
         db_helper.update_prov_yoy("福建省")

+ 1 - 1
crossborder/guangdong/selenium_guangdong_city.py

@@ -296,7 +296,7 @@ def main():
     driver.quit()
     log.info("【广东省】数据抓取结束".center(66, "*"))
     log.info("\n广东省数据清洗入库中...")
-    traverse_and_process(download_dir, parse_excel, province_name="guangdong")
+    traverse_and_process(download_dir, parse_excel, province_name="guangdong", year=args.year)
     log.info("\n广东省地级市数据同比更新中...")
     db_helper = DBHelper()
     db_helper.update_prov_yoy("广东省")

+ 1 - 1
crossborder/henan/selenium_henan_download.py

@@ -227,7 +227,7 @@ def main():
     finally:
         driver.quit()
         log.info("\n数据清洗入库中...")
-        traverse_and_process(download_dir, parse_excel, province_name="henan")
+        traverse_and_process(download_dir, parse_excel, province_name="henan", year=args.year)
 
 
 if __name__ == "__main__":

+ 2 - 2
crossborder/quanguo/data_cleaning_to_db.py

@@ -147,5 +147,5 @@ def process_all_region_tables():
 
 
 
-# if __name__ == "__main__":
-#     perform_data_cleanup_and_import(2025)
+if __name__ == "__main__":
+    perform_data_cleanup_and_import(2025)

+ 1 - 1
crossborder/quanguo/selenium_download.py

@@ -158,7 +158,7 @@ def handle_month_data(driver, table_name, month_links, year, latest_only):
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="抓取海关总署年度数据")
-    parser.add_argument("--year", type=int, help="起始年份,例如:--year 2023 表示抓取 2023 和 2024")
+    parser.add_argument("--year", type=int, help="起始年份,例如:--year 2023")
     args = parser.parse_args()
 
     current_year = datetime.now().year

+ 1 - 1
crossborder/shandong/selenium_shandong_download.py

@@ -192,7 +192,7 @@ def main():
         driver.quit()
         log.info("【山东海关】数据抓取结束".center(66, "*"))
         log.info("\n山东省数据清洗入库中...")
-        traverse_and_process(download_dir, parse_excel, province_name="shandong")
+        traverse_and_process(download_dir, parse_excel, province_name="shandong",  year=args.year)
         log.info("\n山东省地级市数据同比更新中...")
         db_helper = DBHelper()
         db_helper.update_prov_yoy("山东省")

+ 2 - 2
crossborder/utils/log.py

@@ -9,8 +9,8 @@ import colorlog
 log = logging.getLogger(__name__)
 log.setLevel(logging.INFO)
 
-project_root = Path(os.getcwd()).parent.parent
-# project_root = Path(os.path.abspath(os.path.dirname(__file__))).parent.parent
+# project_root = Path(os.getcwd()).parent.parent
+project_root = Path(os.path.abspath(os.path.dirname(__file__))).parent.parent
 
 if sys.platform.startswith('linux'):
     # Linux环境指定为/home目录

+ 115 - 31
crossborder/utils/parse_utils.py

@@ -1,4 +1,5 @@
 import re
+from datetime import datetime
 from decimal import Decimal, InvalidOperation
 from pathlib import Path
 
@@ -130,43 +131,126 @@ def get_previous_month_dir(current_path):
 
 
 #数据清洗逻
-def traverse_and_process(root_path, process_func, province_name="henan"):
+def traverse_and_process(root_path, process_func, province_name="henan", year=None):
     """
-    通用分层遍历函数,支持不同省份的 parse_excel 入口
+    通用分层遍历函数,支持不同年份范围的处理
 
     Args:
         root_path (str): 根目录路径(如 downloads)
         process_func (function): 每个省份自己的 parse_excel 函数
         province_name (str): 省份名称,如 "henan", "shandong", "fujian"
+        year (int, optional): 指定截止年份(包含该年份及之后的所有年份)
     """
-    log.info(f"开始遍历 {province_name} 目录:{root_path}")
     root = Path(root_path)
+    current_year = datetime.now().year
+
+    # 检查根目录是否存在
+    if not root.exists() or not root.is_dir():
+        log.error(f"根目录不存在或不是目录: {root}")
+        return
+
+    log.info(f"开始遍历 {province_name} 目录:{root_path}")
 
-    # 获取年份目录(格式如 download/2025)
-    year_dirs = [
-        item for item in root.iterdir()
-        if item.is_dir() and YEAR_PATTERN.match(item.name)
-    ]
-
-    # 倒序年份
-    for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-
-        if not year_dir.exists() or not year_dir.is_dir():
-            log.info(f"未找到 {province_name} 目录,跳过:{year_dir}")
-            continue
-
-        # 获取月份目录
-        month_dirs = []
-        for item in year_dir.iterdir():
-            if item.is_dir() and MONTH_PATTERN.match(item.name):
-                month_dirs.append({
-                    "path": item,
-                    "month": int(item.name)
-                })
-
-        # 倒序处理月份
-        if month_dirs:
-            log.info(f"\n年份:{year_dir.name} | 省份:{province_name}")
-            for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                log.info(f"   月份:{md['month']:02d} | 路径:{md['path']}")
-                process_func(md['path'])  # 调用传入的处理函数
+    # 获取所有年份目录
+    year_dirs = []
+    for item in root.iterdir():
+        if item.is_dir() and YEAR_PATTERN.match(item.name):
+            try:
+                year_int = int(item.name)
+                year_dirs.append({"year": year_int, "path": item})
+            except ValueError:
+                continue
+
+    if not year_dirs:
+        log.warning(f"未找到任何年份目录,跳过处理: {root}")
+        return
+
+    # 按年份倒序排序
+    year_dirs.sort(key=lambda x: x["year"], reverse=True)
+
+    # 模式1: year=None,只处理最新年份的最新月份
+    if year is None:
+        log.info(f"模式:只处理最新年份的最新月份")
+
+        # 取最新年份目录
+        latest_year_dir = year_dirs[0]["path"]
+        log.info(f"最新年份:{latest_year_dir.name}")
+
+        # 处理该年份的最新月份
+        process_latest_month(latest_year_dir, process_func, province_name)
+        return
+
+    # 模式2: year!=None,处理从当前年到指定年份的所有年份的所有月份
+    if year > current_year:
+        log.warning(f"警告:指定年份 {year} 大于当前年份 {current_year}, 将仅处理当前年")
+        year = current_year
+
+    log.info(f"模式:处理从当前年({current_year})到指定年({year})的所有月份(倒序)")
+
+    # 筛选年份范围:从当前年到指定年份
+    selected_years = [yd for yd in year_dirs if year <= yd["year"] <= current_year]
+
+    if not selected_years:
+        log.warning(f"没有找到在范围 [{year}-{current_year}] 内的年份目录")
+        return
+
+    # 按年份倒序处理所有月份
+    for yd in selected_years:
+        process_all_months(yd["path"], process_func, province_name)
+
+
+def process_latest_month(year_dir, process_func, province_name):
+    """处理单个年份目录的最新月份"""
+    log.info(f"\n年份:{year_dir.name} | 省份:{province_name}")
+
+    # 获取所有月份目录
+    month_dirs = []
+    for item in year_dir.iterdir():
+        if item.is_dir() and MONTH_PATTERN.match(item.name):
+            try:
+                month = int(item.name)
+                month_dirs.append({"month": month, "path": item})
+            except ValueError:
+                continue
+
+    if not month_dirs:
+        log.info(f"   {year_dir} 下没有月份目录,跳过")
+        return
+
+    # 按月倒序排序
+    month_dirs.sort(key=lambda x: x["month"], reverse=True)
+
+    # 取最新月份
+    latest_month = month_dirs[0]
+    month_name = f"{latest_month['month']:02d}"
+
+    log.info(f"  处理最新月份:{month_name} | 路径:{latest_month['path']}")
+    process_func(latest_month["path"])
+
+
+def process_all_months(year_dir, process_func, province_name):
+    """处理单个年份目录的所有月份(倒序)"""
+    log.info(f"\n年份:{year_dir.name} | 省份:{province_name}")
+
+    # 获取所有月份目录
+    month_dirs = []
+    for item in year_dir.iterdir():
+        if item.is_dir() and MONTH_PATTERN.match(item.name):
+            try:
+                month = int(item.name)
+                month_dirs.append({"month": month, "path": item})
+            except ValueError:
+                continue
+
+    if not month_dirs:
+        log.info(f"   {year_dir} 下没有月份目录,跳过")
+        return
+
+    # 按月倒序排序
+    month_dirs.sort(key=lambda x: x["month"], reverse=True)
+
+    # 处理所有月份
+    for md in month_dirs:
+        month_name = f"{md['month']:02d}"
+        log.info(f"  月份:{month_name} | 路径:{md['path']}")
+        process_func(md['path'])