Просмотр исходного кода

1.最新月数据防重复抓取逻辑增加
2.采集成功/失败钉钉消息通知
3.异常处理

01495251 1 месяц назад
Родитель
Сommit
78209af05d

+ 30 - 6
crossborder/fujian/selenium_fujian_download.py

@@ -41,10 +41,10 @@ def detect_latest_month(driver):
             WebDriverWait(driver, 10).until(
                 EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
             )
-            log.info(f"已找到最新月份数据 {check_year}-{check_month}")
+            log.info(f"【福建海关】最新数据: {check_year}-{check_month}")
             return check_year, check_month
         except:
-            log.info(f"未找到 {target_title}")
+            log.error(f"未找到 {target_title}")
             continue
     raise Exception("三个月内未找到有效数据")
 
@@ -172,7 +172,13 @@ def main():
                         help='终止年份(如2023),未指定时抓取最新两个月')
     args = parser.parse_args()
     start_time = time.time()
+
+    log.info("【福建海关】数据抓取开始".center(66, "*"))
+
     driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
+    log.info("浏览器初始化完成")
+
+    target_months = []  # 初始化为空列表
     try:
         # 智能检测最新有效月份
         valid_year, valid_month = detect_latest_month(driver)
@@ -186,9 +192,14 @@ def main():
                 start_month=valid_month,
                 end_year=args.year,
                 skip_january=True
-
             )
         else:
+            # 防止重复抓取
+            db = DBHelper()
+            count = db.get_code_exist(f'{valid_year}-{valid_month:02d}', "350000")
+            if count > 0:
+                log.error(f"数据库已存在【福建省】 {valid_year}-{valid_month:02d}地市数据,本次抓取终止")
+                return
             # 未指定年份时:取最近两个月
             target_months = generate_month_sequence(valid_year, valid_month)
 
@@ -196,18 +207,31 @@ def main():
         reverse_crawler(driver, target_months)
         log.info(f"{len(target_months)}个月份数据已采集完毕")
 
-    finally:
-        if 'driver' in locals():
-            driver.quit()
+        # 如果没有发生异常,则开始数据清洗和入库
         log.info("\n数据清洗入库中...")
         traverse_and_process(download_dir, parse_excel, province_name="fujian", year=args.year)
+
         log.info("\n福建省地级市数据同比更新中...")
         db_helper = DBHelper()
         db_helper.update_prov_yoy("福建省")
+
         duration = time.time() - start_time
         minutes, seconds = divmod(duration, 60)  # 转换为分钟和秒
         message = f'【福建海关】{len(target_months)}个月份数据已采集完毕,总耗时:{int(minutes)}分{seconds:.1f}秒'
         send_dingtalk_message(message)
 
+    except Exception as e:
+        # 捕获异常,记录日志并发送错误通知
+        log.exception(f"程序执行发生异常: {str(e)}")
+        # 发送异常通知到钉钉
+        error_message = f"【福建海关数据抓取异常】\n错误信息:{str(e)}\n请检查程序!"
+        send_dingtalk_message(error_message)
+    finally:
+        # 确保浏览器退出
+        if driver:
+            driver.quit()
+            log.info("浏览器已退出")
+        log.info("【福建海关】处理流程结束".center(66, "*"))
+
 if __name__ == "__main__":
     main()

+ 9 - 3
crossborder/guangdong/selenium_guangdong_city.py

@@ -285,6 +285,11 @@ def main():
             if args.year:
                 target_months = generate_month_sequence(valid_year, valid_month, args.year, skip_january)
             else:
+                db = DBHelper()
+                count = db.get_code_exist(f'{valid_year}-{valid_month:02d}', "440000", is_city=True)
+                if count > 0:
+                    log.error(f"数据库已存在【广东省】 {valid_year}-{valid_month:02d}地市数据,本次抓取终止")
+                    return
                 target_months = generate_month_sequence(valid_year, valid_month)
 
             log.info(f"【{customs_name}】目标采集月份序列:{target_months}")
@@ -300,19 +305,20 @@ def main():
            pass
 
     driver.quit()
+
     log.info("【广东省】数据抓取结束".center(66, "*"))
     log.info("\n广东省数据清洗入库中...")
     traverse_and_process(download_dir, parse_excel, province_name="guangdong", year=args.year)
     log.info("\n广东省地级市数据同比更新中...")
+
     db_helper = DBHelper()
     db_helper.update_prov_yoy("广东省")
     log.info("\n广东省地级市数据同比更新结束")
+
     duration = time.time() - start_time
     minutes, seconds = divmod(duration, 60)  # 转换为分钟和秒
     message = f'【广东省-广州海关、深圳海关、拱北海关、汕头海关、江门海关、黄埔海关、湛江海关】{len(target_months)}个月份数据已采集完毕,总耗时:{int(minutes)}分{seconds:.1f}秒'
     send_dingtalk_message(message)
 
 if __name__ == "__main__":
-    main()
-    # db_helper = DBHelper()
-    # db_helper.update_prov_yoy("广东省")
+    main()

+ 56 - 15
crossborder/guangdong/selenium_guangdong_download.py

@@ -12,6 +12,7 @@ from selenium.webdriver.common.by import By
 from selenium.webdriver.support import expected_conditions as EC
 from selenium.webdriver.support.ui import WebDriverWait
 
+from crossborder.utils import base_mysql
 from crossborder.utils.db_helper import DBHelper
 from crossborder.utils.constants import DOWNLOAD_DIR, COUNTRY_CODE_MAPPING
 from crossborder.utils.dingtalk import send_dingtalk_message
@@ -62,7 +63,7 @@ def detect_latest_month(driver):
                     log.info(f"已找到最新月份数据 {check_year}-{check_month}")
                     return check_year, check_month
 
-            log.info(f"未找到匹配项(正则:{pattern.pattern})")
+            log.error(f"未找到匹配项(正则:{pattern.pattern})")
         except TimeoutException:
             log.info(f"页面加载超时或无匹配项({check_year}-{check_month})")
             continue
@@ -413,21 +414,36 @@ def handle_retry(driver):
 
 
 def main():
-    """主入口(优化参数处理逻辑)"""
+    """主入口(优化广东海关数据采集逻辑)"""
     parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
     parser.add_argument('--year', type=int, default=None,
                         help='终止年份(如2023),未指定时抓取最新两个月')
     args = parser.parse_args()
     start_time = time.time()
-    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
+    target_months = []  # 初始化目标月份列表
+    data_collected = False  # 数据采集状态标记
+    log.info("【广东海关】数据抓取开始".center(66, "*"))
+
+    driver = None  # 初始化避免未定义
+
     try:
-        # 智能检测最新有效月份
+        # 1. 初始化浏览器
+        driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
+        log.info("浏览器初始化完成")
+
+        # 2. 检测最新有效月份
         valid_year, valid_month = detect_latest_month(driver)
         log.info(f"【广东海关】最新数据:{valid_year}年{valid_month:02d}月")
 
-        # 生成目标序列
+        # 3. 数据存在性检查(仅在未指定年份时执行)
+        if not args.year:
+            count = base_mysql.get_code_exist(f'{valid_year}-{valid_month:02d}', PROV_CODE)
+            if count > 0:
+                log.error(f"数据库已存在【广东省】 {valid_year}-{valid_month:02d} 商品贸易数据,本次抓取终止")
+                return
+
+        # 4. 生成目标月份序列
         if args.year:
-            # 指定年份时:从最新月到目标年1月
             target_months = generate_month_sequence(
                 start_year=valid_year,
                 start_month=valid_month,
@@ -435,18 +451,43 @@ def main():
                 skip_january=True
             )
         else:
-            # 未指定年份时:取最近两个月
-            target_months = generate_month_sequence(valid_year, valid_month)
+            # 未指定年份时只抓最近两个月份
+            target_months = generate_month_sequence(
+                start_year=valid_year,
+                start_month=valid_month
+            )
+
+        log.info(f"【广东海关】目标采集月份序列:{len(target_months)}个月份")
 
-        log.info(f"【广东海关】目标采集月份序列:{target_months}")
+        # 5. 执行数据采集
         reverse_crawler(driver, target_months)
-        log.info(f"【广东海关】{len(target_months)}个月份数据已采集完毕")
-        duration = time.time() - start_time
-        minutes, seconds = divmod(duration, 60)  # 转换为分钟和秒
-        message = f'【广东海关】{len(target_months)}个月份数据已采集完毕,总耗时:{int(minutes)}分{seconds:.1f}秒'
-        send_dingtalk_message(message)
+        data_collected = True
+        log.info(f"【广东海关】成功采集 {len(target_months)} 个月份数据")
+
+        # 6. 数据清洗入库(如有需要可添加)
+        # log.info("\n【广东海关】数据清洗入库中...")
+        # traverse_and_process(...)
+
+    except Exception as e:
+        # 捕获并记录所有异常
+        log.exception(f"【广东海关】采集过程中发生错误: {str(e)}")
+        send_dingtalk_message(f"【广东海关数据采集异常】{str(e)}")
+
     finally:
-        driver.quit()
+        # 确保浏览器退出
+        if driver:
+            driver.quit()
+            log.info("浏览器已退出")
+
+        # 7. 只有在成功采集数据时才发送通知
+        if data_collected:
+            duration = time.time() - start_time
+            minutes, seconds = divmod(duration, 60)
+            message = (f"【广东海关】{len(target_months)}个月份数据采集完成"
+                       f",总耗时:{int(minutes)}分{seconds:.1f}秒")
+            send_dingtalk_message(message)
+
+        log.info("【广东海关】处理流程结束".center(66, "*"))
 
 
 if __name__ == "__main__":

+ 58 - 19
crossborder/henan/selenium_henan_download.py

@@ -14,6 +14,7 @@ from selenium.webdriver.support.ui import WebDriverWait
 
 from crossborder.henan.henan_parse_excel import parse_excel
 from crossborder.utils.constants import DOWNLOAD_DIR
+from crossborder.utils.db_helper import DBHelper
 from crossborder.utils.dingtalk import send_dingtalk_message
 from crossborder.utils.download_utils import configure_stealth_options, get_previous_month, download_excel, generate_month_sequence
 from crossborder.utils.log import  get_logger
@@ -59,7 +60,7 @@ def detect_latest_month(driver):
                     log.info(f"已找到最新月份数据 {check_year}-{check_month}")
                     return check_year, check_month
 
-            log.info(f"未找到匹配项(正则:{pattern.pattern})")
+            log.error(f"未找到匹配项(正则:{pattern.pattern})")
         except TimeoutException:
             log.error(f"页面加载超时或无匹配项({check_year}-{check_month})")
             continue
@@ -199,22 +200,38 @@ def handle_retry(driver):
 
 
 def main():
-    """主入口(优化参数处理逻辑)"""
-    global target_months
+    """主入口(优化河南海关数据采集逻辑)"""
     parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
     parser.add_argument('--year', type=int, default=None,
                         help='终止年份(如2023),未指定时抓取最新两个月')
     args = parser.parse_args()
     start_time = time.time()
-    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
+    target_months = []  # 初始化目标月份列表
+    data_collected = False  # 数据采集状态标记
+    log.info("【河南海关】数据抓取开始".center(66, "*"))
+
+    # 仅初始化浏览器一次,避免重复创建
+    driver = None
+
     try:
-        # 智能检测最新有效月份
+        # 1. 初始化浏览器
+        driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
+        log.info("浏览器初始化完成")
+
+        # 2. 检测最新有效月份
         valid_year, valid_month = detect_latest_month(driver)
-        log.info(f"检测到最新有效数据:{valid_year}年{valid_month:02d}月")
+        log.info(f"【河南海关】最新数据:{valid_year}年{valid_month:02d}月")
 
-        # 生成目标序列
+        # 3. 数据存在性检查(仅在未指定年份时执行)
+        if not args.year:
+            db = DBHelper()
+            count = db.get_code_exist(f'{valid_year}-{valid_month:02d}', "410000")
+            if count > 0:
+                log.error(f"数据库已存在【河南省】 {valid_year}-{valid_month:02d} 商品贸易数据,本次抓取终止")
+                return
+
+        # 4. 生成目标月份序列
         if args.year:
-            # 指定年份时:从最新月到目标年1月
             target_months = generate_month_sequence(
                 start_year=valid_year,
                 start_month=valid_month,
@@ -222,22 +239,44 @@ def main():
                 skip_january=True
             )
         else:
-            # 未指定年份时:取最近两个月
-            target_months = generate_month_sequence(valid_year, valid_month)
+            # 未指定年份时只抓最近两个月份
+            target_months = generate_month_sequence(
+                start_year=valid_year,
+                start_month=valid_month
+            )
+
+        log.info(f"【河南海关】目标采集月份序列:{len(target_months)}个月份")
 
-        log.info(f"目标采集月份序列:{target_months}")
+        # 5. 执行数据采集
         reverse_crawler(driver, target_months)
-        log.info(f"{len(target_months)}个月份数据已采集完毕")
+        data_collected = True
+        log.info(f"【河南海关】成功采集 {len(target_months)} 个月份数据")
 
+        # 6. 数据清洗入库
+        log.info("\n【河南海关】数据清洗入库中...")
+        traverse_and_process(download_dir, parse_excel, province_name="henan", year=args.year)
+        log.info("数据清洗入库完成")
+
+    except Exception as e:
+        # 捕获并记录所有异常
+        log.exception(f"【河南海关】采集过程中发生错误: {str(e)}")
+        send_dingtalk_message(f"【河南海关数据采集异常】{str(e)}")
 
     finally:
-        driver.quit()
-        log.info("\n数据清洗入库中...")
-        traverse_and_process(download_dir, parse_excel, province_name="henan", year=args.year)
-        duration = time.time() - start_time
-        minutes, seconds = divmod(duration, 60)  # 转换为分钟和秒
-        message = f'【河南海关】{len(target_months)}个月份数据已采集完毕,总耗时:{int(minutes)}分{seconds:.1f}秒'
-        send_dingtalk_message(message)
+        # 确保浏览器退出
+        if driver:
+            driver.quit()
+            log.info("浏览器已退出")
+
+        # 7. 只有在成功采集数据时才发送通知
+        if data_collected:
+            duration = time.time() - start_time
+            minutes, seconds = divmod(duration, 60)
+            message = (f"【河南海关】{len(target_months)}个月份数据采集完成"
+                       f",总耗时:{int(minutes)}分{seconds:.1f}秒")
+            send_dingtalk_message(message)
+
+        log.info("【河南海关】处理流程结束".center(66, "*"))
 
 if __name__ == "__main__":
     main()

+ 55 - 18
crossborder/shandong/selenium_shandong_download.py

@@ -163,48 +163,85 @@ def handle_retry(driver):
 
 
 def main():
-    """主入口(优化参数处理逻辑)"""
-    global target_months
+    """主入口(优化山东海关数据采集逻辑)"""
     parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
     parser.add_argument('--year', type=int, default=None,
                         help='终止年份(如2023),未指定时抓取最新两个月')
     args = parser.parse_args()
     start_time = time.time()
+    target_months = []  # 初始化避免未定义
+    data_collected = False  # 标记是否成功采集数据
+
+    # 初始化日志标题
     log.info("【山东海关】数据抓取开始".center(66, "*"))
-    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
+
+    driver = None
     try:
-        # 智能检测最新有效月份
+        # 1. 初始化浏览器
+        driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
+        log.info("浏览器初始化完成")
+
+        # 2. 检测最新有效月份
         valid_year, valid_month = detect_latest_month(driver)
         log.info(f"【山东海关】最新数据:{valid_year}年{valid_month:02d}月")
 
-        # 生成目标序列
+
+        db = DBHelper()
+
+        # 4. 生成目标序列
         if args.year:
-            # 指定年份时:从最新月到目标年1月
             target_months = generate_month_sequence(
                 start_year=valid_year,
                 start_month=valid_month,
                 end_year=args.year
             )
         else:
-            # 未指定年份时:取最近两个月
+            # 检查数据是否已存在
+            count = db.get_code_exist(f'{valid_year}-{valid_month:02d}', "370000")
+            if count > 0 and not args.year:
+                log.error(f"数据库已存在【山东省】 {valid_year}-{valid_month:02d} 数据,本次抓取终止")
+                return
+            # 未指定年份时只抓最近两个月份
             target_months = generate_month_sequence(valid_year, valid_month)
 
         log.info(f"【山东海关】目标采集月份序列:{target_months}")
+
+        # 5. 执行数据采集
         reverse_crawler(driver, target_months)
         log.info(f"{len(target_months)}个月份数据已采集完毕")
+        data_collected = True
+
+        # 6. 数据清洗入库
+        log.info("\n【山东海关】数据清洗入库中...")
+        traverse_and_process(download_dir, parse_excel,
+                             province_name="shandong",
+                             year=args.year)
+
+        # 7. 同比数据更新
+        log.info("\n【山东海关】地级市数据同比更新中...")
+        db.update_prov_yoy("山东省")
+        log.info("\n【山东海关】同比更新完成")
+
+    except Exception as e:
+        # 捕获并记录所有异常
+        log.exception(f"【山东海关】采集过程中发生错误: {str(e)}")
+        send_dingtalk_message(f"【山东海关数据采集异常】{str(e)}")
 
     finally:
-        driver.quit()
-        log.info("【山东海关】数据抓取结束".center(66, "*"))
-        log.info("\n山东省数据清洗入库中...")
-        traverse_and_process(download_dir, parse_excel, province_name="shandong",  year=args.year)
-        log.info("\n山东省地级市数据同比更新中...")
-        db_helper = DBHelper()
-        db_helper.update_prov_yoy("山东省")
-        duration = time.time() - start_time
-        minutes, seconds = divmod(duration, 60)  # 转换为分钟和秒
-        message = f'【山东海关】{len(target_months)}个月份数据已采集完毕,总耗时:{int(minutes)}分{seconds:.1f}秒'
-        send_dingtalk_message(message)
+        # 确保浏览器退出
+        if driver:
+            driver.quit()
+            log.info("浏览器已退出")
+
+        # 只有成功采集才发送成功通知
+        if data_collected:
+            duration = time.time() - start_time
+            minutes, seconds = divmod(duration, 60)
+            message = (f"【山东海关】{len(target_months)}个月份数据采集完成"
+                       f",总耗时:{int(minutes)}分{seconds:.1f}秒")
+            send_dingtalk_message(message)
+
+        log.info("【山东海关】处理流程结束".center(66, "*"))
 
 
 if __name__ == "__main__":

+ 1 - 1
crossborder/utils/constants.py

@@ -6,7 +6,7 @@ PROJECT_ROOT = Path(os.path.abspath(os.path.dirname(__file__))).parent.parent
 
 if sys.platform.startswith('linux'):
     # Linux环境指定为/home目录
-    DOWNLOAD_DIR = Path('/home/crossborder/downloads')
+    DOWNLOAD_DIR = Path('/home/testadmin/downloads')
 else:
     # Windows保持原有结构(项目根目录下的downloads文件夹)
     DOWNLOAD_DIR = PROJECT_ROOT / 'downloads'

+ 69 - 0
crossborder/utils/db_helper.py

@@ -1,5 +1,7 @@
 import pandas as pd
 from sqlalchemy import create_engine, text
+from sqlalchemy.exc import SQLAlchemyError
+
 from crossborder.utils.crypto_utils import AESCryptor
 
 from crossborder.utils.log import  get_logger
@@ -232,3 +234,70 @@ class DBHelper:
         except Exception as e:
             log.error(f"SQL执行失败: {str(e)}")
             raise
+
+    def get_code_exist(self, year_month, prov_code, is_city=True):
+        """
+        检查指定月份和省份在表中是否存在
+
+        参数:
+        year_month: 年月字符串 (格式: YYYY-MM)
+        prov_code: 省份代码
+
+        返回:
+        匹配的记录数量
+        """
+        # 根据省份代码确定表名(示例逻辑,实际需替换为您的表名规则)
+        table_name = self.get_table_name_by_province(prov_code,is_city)
+
+        # 安全检查:防止SQL注入
+        if not table_name.isidentifier():
+            raise ValueError(f"非法表名: {table_name}")
+
+        # 使用参数化查询防止SQL注入
+        query = text(f"""
+            SELECT COUNT(*) FROM `{table_name}` 
+            WHERE crossborder_year_month = :year_month 
+              AND prov_code = :prov_code
+        """)
+
+        try:
+            with self.engine.connect() as connection:
+                result = connection.execute(
+                    query,
+                    {"year_month": year_month, "prov_code": prov_code}
+                ).scalar()
+                return result or 0
+
+        except SQLAlchemyError as e:
+            # 实际项目中应使用更详细的日志记录
+            log.error(f"查询错误: {str(e)}")
+            return -1  # 表示查询出错
+
+    # 辅助函数:根据省份代码获取表名(示例实现,按需修改)
+    def get_table_name_by_province(self, prov_code, is_city=True):
+        """
+        根据省份代码和数据类型返回对应表名
+        """
+        # 主要表名映射规则
+        # 350000=福建,370000=山东,410000=河南,440000=广东
+        table_mapping = {
+            "350000": "t_yujin_crossborder_prov_region_trade",
+            "370000": "t_yujin_crossborder_prov_region_trade",
+            "410000": "t_yujin_crossborder_prov_commodity_trade",
+            "440000": self.get_guangdong_table(is_city)  # 特殊处理广东省
+        }
+
+        if prov_code not in table_mapping:
+            raise ValueError(f"不支持省份代码: {prov_code}")
+
+        return table_mapping[prov_code]
+
+    # 新增方法:处理广东省的特殊情况
+    def get_guangdong_table(self, is_city):
+        """
+        根据数据类型返回广东省对应的表名
+        """
+        if is_city:
+            return "t_yujin_crossborder_prov_region_trade"
+        else:
+            return "t_yujin_crossborder_prov_commodity_trade"

+ 1 - 1
crossborder/utils/log.py

@@ -23,7 +23,7 @@ def configure_logging():
 
     # 日志目录设置
     if sys.platform.startswith('linux'):
-        log_dir = Path('/home/crossborder/logs')
+        log_dir = Path('/home/testadmin/logs')
     else:
         log_dir = project_root / 'logs'
 

+ 0 - 2
pyproject.toml

@@ -43,7 +43,6 @@ dependencies = [
     "packaging (>=25.0)",
     "pandas (>=2.2.3)",
     "parsel (>=1.10.0)",
-    "playwright (>=1.52.0)",
     "protego (>=0.4.0)",
     "pyasn1 (>=0.6.1)",
     "pyasn1-modules (>=0.4.2)",
@@ -61,7 +60,6 @@ dependencies = [
     "requests-file (>=2.1.0)",
     "schedule (>=1.2.2)",
     "scrapy (>=2.13.0)",
-    "scrapy-playwright (>=0.0.43)",
     "selenium (>=4.32.0)",
     "service-identity (>=24.2.0)",
     "setuptools (>=80.4.0)",