فهرست منبع

1.广东省下7个分属海关独立支持月维度数据抓取(最新发布月不一致时)
2.海关总署增量抓取增加已存在数据跳过逻辑

01495251 1 ماه پیش
والد
کامیت
d894211e6b

+ 136 - 50
crossborder/guangdong/selenium_guangdong_city.py

@@ -14,7 +14,7 @@ from crossborder.guangdong.guangdong_gongbei_parse_excel import parse_page_regio
 from crossborder.utils.db_helper import DBHelper
 
 from crossborder.guangdong.guangdong_sub_customs_parse_excel import parse_excel
-from crossborder.utils.constants import DOWNLOAD_DIR
+from crossborder.utils.constants import DOWNLOAD_DIR, CUSTOMS_CITY_MAPPING
 from crossborder.utils.constants import GUANGDONG_CUSTOMS_URL
 from crossborder.utils.dingtalk import send_dingtalk_message
 from crossborder.utils.download_utils import configure_stealth_options, generate_month_sequence, download_excel, download_excel2, \
@@ -262,63 +262,149 @@ def random_sleep(base=2, variance=5):
 #     log.info("\n广东省所有海关数据采集完成。")
 
 def main():
-    """主入口(优化参数处理逻辑)"""
-    global target_months
+    """主入口(广东分海关优化版)"""
     parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
     parser.add_argument('--year', type=int, default=None,
                         help='终止年份(如2023),未指定时抓取最新两个月')
     args = parser.parse_args()
     start_time = time.time()
-    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
-    for customs_name in GUANGDONG_CUSTOMS_URL.keys():
-        try:
-            driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
-            log.info(f"【{customs_name}】数据采集开始……")
-            valid_year, valid_month = detect_latest_month(driver, customs_name)
-            log.info(f"【{customs_name}】检测到最新有效数据:{valid_year}-{valid_month:02d}")
 
-            if customs_name in ['汕头海关', '拱北海关']:
-                skip_january = False
-            else:
-                skip_january = True
+    # 状态跟踪变量
+    total_months_count = 0  # 总采集月份数
+    customs_collected = []  # 成功采集的海关名单
+    data_collected = False  # 是否有数据采集
+    all_customs_processed = []  # 已处理海关列表
+
+    log.info("【广东省】分海关数据采集开始".center(66, "*"))
+
+    driver = None
+
+    try:
+        # 1. 初始化浏览器
+        driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
+        log.info("浏览器初始化完成")
+
+        # 2. 遍历各海关
+        for customs_name in GUANGDONG_CUSTOMS_URL.keys():
+            try:
+                log.info(f"\n{'=' * 66}\n【{customs_name}】数据采集开始".center(66, "="))
+
+                # 进入海关页面
+                driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
+
+                # 检测最新有效月份
+                valid_year, valid_month = detect_latest_month(driver, customs_name)
+                log.info(f"【{customs_name}】检测到最新有效数据:{valid_year}-{valid_month:02d}")
+
+                # 设置是否跳过1月数据的标志
+                skip_january = customs_name not in ['汕头海关', '拱北海关']
+
+                # 3. 生成目标月份序列
+                if args.year:
+                    # 指定年份时:从最新月到目标年1月
+                    target_months = generate_month_sequence(
+                        start_year=valid_year,
+                        start_month=valid_month,
+                        end_year=args.year,
+                        skip_january=skip_january
+                    )
+                else:
+                    # 未指定年份时:检查最新月份是否已存在
+                    db = DBHelper()
+                    count = db.get_code_exist(
+                        f'{valid_year}-{valid_month:02d}',
+                        "440000",
+                        is_city=True,
+                        customs_name=customs_name
+                    )
+                    if count > 0:
+                        log.warning(f"⏩ 跳过【{customs_name}】- 数据库已存在{CUSTOMS_CITY_MAPPING[customs_name]} {valid_year}-{valid_month:02d} 数据")
+                        continue
+
+                    # 未指定年份时:取最近两个月
+                    target_months = generate_month_sequence(
+                        start_year=valid_year,
+                        start_month=valid_month
+                    )
+
+                # 记录目标月份
+                total_months_count += len(target_months)
+                data_collected = True
+
+                log.info(f"【{customs_name}】目标采集月份:{len(target_months)}个月份")
+
+                # 4. 执行数据采集
+                if target_months:  # 确保有月份需要采集
+                    reverse_crawler(driver, target_months, customs_name)
+                    customs_collected.append(customs_name)
+                    log.info(f"【{customs_name}】{len(target_months)}个月份采集完成")
+
+                    # 拱北海关特殊处理
+                    if customs_name == '拱北海关':
+                        for year, month in target_months:
+                            log.info(f"🔢 【拱北海关】计算 {year}-{month:02d} 单月数据...")
+                            calculate_monthly_data(year, month)
+
+                # 添加分隔线
+                log.info(f"【{customs_name}】处理完成".center(66, "="))
 
-            if args.year:
-                target_months = generate_month_sequence(valid_year, valid_month, args.year, skip_january)
+            except Exception as e:
+                # 捕获单个海关采集异常
+                log.exception(f"⚠️ 【{customs_name}】采集过程中发生错误: {str(e)}")
+                send_dingtalk_message(f"【{customs_name}】海关采集异常: {str(e)}")
+
+            finally:
+                # 记录已处理海关
+                all_customs_processed.append(customs_name)
+
+        # 5. 所有海关处理完成后
+        if data_collected:
+            log.info(f"\n{'=' * 66}\n【广东省】所有海关处理完成,开始数据清洗入库")
+            log.info("数据清洗入库中...")
+            traverse_and_process(download_dir, parse_excel, province_name="guangdong", year=args.year)
+
+            log.info("广东省地级市数据同比更新中...")
+            db_helper = DBHelper()
+            db_helper.update_prov_yoy("广东省")
+            log.info("地级市数据同比更新完成")
+
+            # 计算总耗时
+            duration = time.time() - start_time
+            minutes, seconds = divmod(duration, 60)
+
+            # 准备通知信息
+            if customs_collected:
+                customs_str = "、".join(customs_collected)
+                month_info = f"{total_months_count}个月份"
             else:
-                db = DBHelper()
-                count = db.get_code_exist(f'{valid_year}-{valid_month:02d}', "440000", is_city=True)
-                if count > 0:
-                    log.error(f"数据库已存在【广东省】 {valid_year}-{valid_month:02d}地市数据,本次抓取终止")
-                    return
-                target_months = generate_month_sequence(valid_year, valid_month)
-
-            log.info(f"【{customs_name}】目标采集月份序列:{target_months}")
-            reverse_crawler(driver, target_months, customs_name)
-
-            if customs_name == '拱北海关':
-                for year, month in target_months:
-                    log.info(f"【{customs_name}】{year}-{month:02d}单月数据计算中...")
-                    calculate_monthly_data(year, month)
-
-            log.info(f"【{customs_name}】{len(target_months)}个月份数据已采集完毕".center(66, "="))
-        finally:
-           pass
-
-    driver.quit()
-
-    log.info("【广东省】数据抓取结束".center(66, "*"))
-    log.info("\n广东省数据清洗入库中...")
-    traverse_and_process(download_dir, parse_excel, province_name="guangdong", year=args.year)
-    log.info("\n广东省地级市数据同比更新中...")
-
-    db_helper = DBHelper()
-    db_helper.update_prov_yoy("广东省")
-    log.info("\n广东省地级市数据同比更新结束")
-
-    duration = time.time() - start_time
-    minutes, seconds = divmod(duration, 60)  # 转换为分钟和秒
-    message = f'【广东省-广州海关、深圳海关、拱北海关、汕头海关、江门海关、黄埔海关、湛江海关】{len(target_months)}个月份数据已采集完毕,总耗时:{int(minutes)}分{seconds:.1f}秒'
-    send_dingtalk_message(message)
+                customs_str = "无海关数据被采集"
+                month_info = "0个月份"
+
+            message = (
+                f"【广东省海关数据采集完成】\n"
+                f"• 已处理海关: {len(all_customs_processed)}个\n"
+                f"• 成功采集海关: {len(customs_collected)}个\n"
+                f"• 采集海关: {customs_str}\n"
+                f"• 总采集月份: {month_info}\n"
+                f"• 总耗时: {int(minutes)}分{seconds:.1f}秒"
+            )
+            send_dingtalk_message(message)
+        else:
+            log.warning("本次未采集到任何新数据")
+            # send_dingtalk_message("【广东省海关采集】所有海关最新月份数据已存在,未执行采集")
+
+    except Exception as e:
+        # 全局异常捕获
+        log.exception(f"‼️ 广东省海关采集全局错误: {str(e)}")
+        send_dingtalk_message(f"【广东海关采集异常】全局错误: {str(e)}")
+
+    finally:
+        # 确保浏览器安全退出
+        if driver:
+            driver.quit()
+            log.info("浏览器已安全退出")
+
+        log.info("【广东省】分海关数据采集结束".center(66, "*"))
 
 if __name__ == "__main__":
     main()

+ 2 - 1
crossborder/guangdong/selenium_guangdong_download.py

@@ -437,7 +437,8 @@ def main():
 
         # 3. 数据存在性检查(仅在未指定年份时执行)
         if not args.year:
-            count = base_mysql.get_code_exist(f'{valid_year}-{valid_month:02d}', PROV_CODE)
+            db = DBHelper()
+            count = db.get_code_exist(f'{valid_year}-{valid_month:02d}', PROV_CODE)
             if count > 0:
                 log.error(f"数据库已存在【广东省】 {valid_year}-{valid_month:02d} 商品贸易数据,本次抓取终止")
                 return

+ 108 - 26
crossborder/quanguo/selenium_download.py

@@ -1,6 +1,7 @@
 import argparse
 import random
 import re
+import sys
 import time
 from datetime import datetime
 
@@ -13,6 +14,7 @@ from selenium.webdriver.support.ui import WebDriverWait
 
 from crossborder.quanguo.data_cleaning_to_db import perform_data_cleanup_and_import
 from crossborder.utils.constants import DOWNLOAD_DIR
+from crossborder.utils.db_helper import DBHelper
 from crossborder.utils.dingtalk import send_dingtalk_message
 from crossborder.utils.download_utils import configure_stealth_options, download_excel
 from crossborder.utils.log import  get_logger
@@ -116,7 +118,7 @@ def go_to_year_page(driver, year):
         return False
 
 
-def crawl_with_selenium(driver, year, latest_only=False):
+def crawl_with_selenium(driver, year, latest_only=False,data_collected=False):
     """主抓取函数"""
     if year < datetime.now().year:
         if not go_to_year_page(driver, year):
@@ -140,8 +142,21 @@ def crawl_with_selenium(driver, year, latest_only=False):
 
             table_title = generate_table_title(year)
 
+            if latest_only:
+                if month_links:
+                    # 只取第一个月份(最新月份)
+                    month_links = [month_links[0]]
+                    new_month = month_links[0][0]
+                    log.info(f"【{table_name}】处理最新月份:{new_month}月")
+                    db = DBHelper()
+                    count = db.get_total_info_exist(f'{year}-{new_month:02d}')
+                    if count > 0:
+                        log.error(f"数据库已存在【海关总署】 {year}-{new_month:02d} 收发件人数据,本次抓取终止")
+                        break
+
+            data_collected = True
             if result and result[0] in table_title:
-                handle_month_data(driver, sanitize_filename(table_name), month_links, year=year, latest_only=latest_only)
+                handle_month_data(driver, sanitize_filename(table_name), month_links, year=year)
 
             driver.execute_script("arguments[0].remove()", row)
             WebDriverWait(driver, 10).until(EC.staleness_of(row))
@@ -159,9 +174,10 @@ def sanitize_filename(filename):
     return re.sub(r'[<>:"/\\|?*\x00-\x1F]', '-', filename)
 
 
-def handle_month_data(driver, table_name, month_links, year, latest_only):
+def handle_month_data(driver, table_name, month_links, year):
     global downloaded_tables
     main_window = driver.current_window_handle
+
     for idx, month_data in enumerate(month_links):
         if 1 <= month_data[0] <= 12:
             # 年度表月度表只下载一次(最新月份数据)
@@ -181,39 +197,105 @@ def handle_month_data(driver, table_name, month_links, year, latest_only):
             except Exception as e:
                log.info(f"【异常】下载失败: {str(e)}")
             time.sleep(random.uniform(0.5, 1.5))  # 下载间隔
+def  main():
 
-
-
-if __name__ == "__main__":
+    global current_year, start_year
     parser = argparse.ArgumentParser(description="抓取海关总署年度数据")
     parser.add_argument("--year", type=int, help="起始年份,例如:--year 2023")
     args = parser.parse_args()
 
     start_time = time.time()
+    years_processed = []  # 记录成功处理的年份
+    data_collected = False  # 是否有数据被采集
+    driver =  None
 
-    current_year = datetime.now().year
-    start_year = args.year if args.year else current_year
-    years_to_crawl = list(range(start_year, current_year + 1))
-    years_to_crawl.reverse()
+    try:
+        # 1. 确定采集年份范围
+        current_year = datetime.now().year
+        start_year = args.year if args.year else current_year
 
+        # 确保年份有效
+        if start_year > current_year:
+            log.error(f"起始年份 {start_year} 不能大于当前年份 {current_year}")
+            sys.exit(1)
 
-    log.info(f"即将抓取 {start_year} - {current_year} 年度数据")
-    options = configure_stealth_options(download_dir)
-    driver = webdriver.Firefox(options=options)
+        # 生成年份序列(从新到旧)
+        years_to_crawl = list(range(start_year, current_year + 1))
+        years_to_crawl.reverse()
 
-    base_url = "http://www.customs.gov.cn/customs/302249/zfxxgk/2799825/302274/302277/6348926/index.html"
-    driver.get(base_url)
-    try:
+        log.info(f"【海关总署】开始抓取 {start_year}-{current_year} 年度数据".center(66, "*"))
+
+        # 2. 初始化浏览器
+        options = configure_stealth_options(download_dir)
+        driver = webdriver.Firefox(options=options)
+        log.info("浏览器初始化完成")
+
+        # 3. 访问基础页面
+        driver.get(base_url)
+
+        # 4. 年份遍历采集
         for year in years_to_crawl:
-            log.info(f"\n【{year}年】开始抓取...".center(66, "-"))
-            crawl_with_selenium(driver, year=year, latest_only=args.year is None)
+            try:
+                log.info(f"\n【{year}年】开始处理".center(66, "-"))
+
+                is_latest_only = (not args.year) and (year == current_year)
+
+                # 执行年份采集
+                crawl_with_selenium(driver, year=year, latest_only=is_latest_only,data_collected =  data_collected)
+                years_processed.append(year)
+
+                log.info(f"【{year}年】处理完成".center(66, "-"))
+
+            except Exception as e:
+                log.exception(f"⚠️ {year}年数据采集异常: {str(e)}")
+                send_dingtalk_message(f"【海关总署{year}年采集异常】{str(e)}")
+                continue
+
+    except Exception as e:
+        log.exception(f"‼️ 海关总署采集全局错误: {str(e)}")
+        send_dingtalk_message(f"【海关总署全局异常】{str(e)}")
+
     finally:
-        driver.quit()
-        log.info("【海关总署】全年数据抓取结束".center(66, "*"))
-        log.info("\n数据清洗入库中...")
-        perform_data_cleanup_and_import(current_year)
-        log.info("\n数据清洗入库完毕...")
+        # 5. 保证浏览器退出
+        if 'driver' in locals():
+            driver.quit()
+            log.info("浏览器已退出")
+
+        # 6. 数据清洗入库(仅当有数据被采集时)
+        if data_collected:
+            log.info("\n【海关总署】数据清洗入库开始".center(66, "*"))
+
+            try:
+                log.info("数据清洗入库中...")
+                perform_data_cleanup_and_import(years_processed)
+                log.info("数据清洗入库完毕")
+
+            except Exception as e:
+                log.exception(f"数据清洗入库异常: {str(e)}")
+                send_dingtalk_message(f"【海关总署数据清洗异常】{str(e)}")
+
+        # 7. 生成报告并发送通知
         duration = time.time() - start_time
-        minutes, seconds = divmod(duration, 60)  # 转换为分钟和秒
-        message = f'【海关总署】{start_year}年-{current_year}年数据已采集完毕,总耗时:{int(minutes)}分{seconds:.1f}秒'
-        send_dingtalk_message(message)
+        minutes, seconds = divmod(duration, 60)
+
+        # 准备通知消息
+        year_range = f"{start_year}年-{current_year}年"
+        total_years = len(years_processed) if data_collected else 0
+
+        if data_collected:
+            # 成功采集通知
+            if years_processed:
+                processed_years = "、".join(map(str, years_processed))
+                message = (
+                    f"【海关总署数据采集完成】\n"
+                    f"• 目标年份: {year_range}\n"
+                    f"• 实际处理年份: {processed_years}\n"
+                    f"• 处理年份数量: {len(years_processed)}个\n"
+                    f"• 总耗时: {int(minutes)}分{seconds:.1f}秒"
+                )
+                send_dingtalk_message(message)
+        log.info("【海关总署】数据采集任务结束".center(66, "*"))
+
+
+if __name__ == "__main__":
+   main()

+ 17 - 0
crossborder/utils/constants.py

@@ -383,6 +383,23 @@ GUANGDONG_CUSTOMS_URL = {
     "湛江海关": "http://zhanjiang.customs.gov.cn/zhanjiang_customs/534855/zfxxgkzn24/534857/index.html"
 }
 
+# 定义海关到城市的映射关系
+CUSTOMS_CITY_MAPPING = {
+    "广州海关": ["广州市", "佛山市", "肇庆市", "韶关市", "清远市", "河源市", "云浮市"],
+    "深圳海关": ["深圳市", "惠州市"],
+    "拱北海关": ["中山市", "珠海市"],
+    "汕头海关": ["汕头市", "梅州市", "汕尾市", "潮州市", "揭阳市"],
+    "黄埔海关": ["东莞市"],
+    "江门海关": ["阳江市", "江门市"],
+    "湛江海关": ["茂名市", "湛江市"]
+}
+
+# 创建反向映射:城市->海关(可选,根据需求)
+CITY_CUSTOMS_MAPPING = {}
+for customs, cities in CUSTOMS_CITY_MAPPING.items():
+    for city in cities:
+        CITY_CUSTOMS_MAPPING[city] = customs
+
 
 
 

+ 54 - 19
crossborder/utils/db_helper.py

@@ -2,6 +2,7 @@ import pandas as pd
 from sqlalchemy import create_engine, text
 from sqlalchemy.exc import SQLAlchemyError
 
+from crossborder.utils.constants import CUSTOMS_CITY_MAPPING
 from crossborder.utils.crypto_utils import AESCryptor
 
 from crossborder.utils.log import  get_logger
@@ -235,43 +236,60 @@ class DBHelper:
             log.error(f"SQL执行失败: {str(e)}")
             raise
 
-    def get_code_exist(self, year_month, prov_code, is_city=True):
+    def get_code_exist(self, year_month, prov_code, is_city=True, customs_name=None, city_names=None):
         """
-        检查指定月份和省份在表中是否存在
+        检查指定月份和地区在表中是否存在记录
 
         参数:
         year_month: 年月字符串 (格式: YYYY-MM)
         prov_code: 省份代码
+        is_city: 是否为城市级数据
+        customs_name: 海关名称(可选)
+        city_names: 城市名称列表(可选)
 
         返回:
         匹配的记录数量
         """
-        # 根据省份代码确定表名(示例逻辑,实际需替换为您的表名规则)
-        table_name = self.get_table_name_by_province(prov_code,is_city)
+        # 获取表名
+        table_name = self.get_table_name_by_province(prov_code, is_city)
 
-        # 安全检查:防止SQL注入
-        if not table_name.isidentifier():
-            raise ValueError(f"非法表名: {table_name}")
+        # 构建基础查询
+        base_query = f"SELECT COUNT(*) FROM `{table_name}` WHERE crossborder_year_month = :year_month"
+        params = {"year_month": year_month}
 
-        # 使用参数化查询防止SQL注入
-        query = text(f"""
-            SELECT COUNT(*) FROM `{table_name}` 
-            WHERE crossborder_year_month = :year_month 
-              AND prov_code = :prov_code
-        """)
+        # 添加地区条件
+        conditions = []
+
+        if customs_name and customs_name in CUSTOMS_CITY_MAPPING:
+            # 根据海关获取对应的城市
+            cities = CUSTOMS_CITY_MAPPING[customs_name]
+            # 添加城市条件
+            conditions.append(f"city_name IN :cities")
+            params["cities"] = tuple(cities)
+
+        elif city_names:
+            # 直接使用提供的城市列表
+            conditions.append(f"city_name IN :cities")
+            params["cities"] = tuple(city_names)
+
+        elif is_city:
+            # 默认添加省份代码条件
+            conditions.append(f"prov_code = :prov_code")
+            params["prov_code"] = prov_code
+
+        # 组合查询条件
+        if conditions:
+            base_query += " AND " + " AND ".join(conditions)
 
         try:
+            query = text(base_query)
             with self.engine.connect() as connection:
-                result = connection.execute(
-                    query,
-                    {"year_month": year_month, "prov_code": prov_code}
-                ).scalar()
+                result = connection.execute(query, params).scalar()
                 return result or 0
 
         except SQLAlchemyError as e:
-            # 实际项目中应使用更详细的日志记录
             log.error(f"查询错误: {str(e)}")
-            return -1  # 表示查询出错
+            return -1
 
     # 辅助函数:根据省份代码获取表名(示例实现,按需修改)
     def get_table_name_by_province(self, prov_code, is_city=True):
@@ -301,3 +319,20 @@ class DBHelper:
             return "t_yujin_crossborder_prov_region_trade"
         else:
             return "t_yujin_crossborder_prov_commodity_trade"
+
+    def get_total_info_exist(self, year_month):
+        query = text(f"""
+             SELECT COUNT(*) FROM `t_yujin_crossborder_region_trade` 
+             WHERE `year_month` = :year_month 
+         """)
+
+        try:
+            with self.engine.connect() as connection:
+                result = connection.execute(
+                    query,
+                    {"year_month": year_month}
+                ).scalar()
+                return result or 0
+        except SQLAlchemyError as e:
+            log.error(f"查询错误: {str(e)}")
+            return -1  # 表示查询出错