Procházet zdrojové kódy

补充增量定时爬取数据逻辑

zhangfan před 2 dny
rodič
revize
7060f9b200

+ 49 - 6
hebei/crawl_gov_hebei_full.py

@@ -2,7 +2,9 @@ import os
 import random
 import re
 import time
+import sys
 from pathlib import Path
+from datetime import datetime, timedelta
 
 from faker import Faker
 from selenium import webdriver
@@ -74,7 +76,7 @@ def remove_prefix_from_url(url):
 
     return url
 
-def find_target_links(driver):
+def find_target_links(driver, year_month):
     """在当前页面找到符合 TARGET_TITLES 的文件并触发下载"""
     # 等待页面加载完成
     WebDriverWait(driver, 30).until(
@@ -89,8 +91,13 @@ def find_target_links(driver):
         file_name = elements.text.strip()
         if not file_name:
             continue
-        if file_name.startswith('2022'):
-            return 'stop'
+        if year_month is None:
+            if file_name.startswith('2022'):
+                return 'stop'
+        else:
+            if not file_name.startswith(year_month):
+                log.info(f"非 {year_month} 文件: {file_name}, stop")
+                return 'stop'
         if '进口商品' in file_name or '出口商品' in file_name or '分国家' in file_name or '分国别' in file_name or '地市' in file_name:
             file_url = elements.get_attribute("href")
             file_url = remove_prefix_from_url(file_url)
@@ -138,9 +145,44 @@ def extract_year_and_month(file_name):
     else:
         raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
 
-def crawl_with_selenium(url):
+def detect_latest_month(driver, url):
+    """Find the newest month listed on the page that is not stored yet.
+
+    Loads ``url`` and checks the current calendar month and the two
+    calendar months before it, newest first. A month qualifies when the
+    page contains a link whose ``title`` includes ``"{year}年{month}月"``
+    and ``base_mysql.get_code_exist`` reports a count of zero for that
+    month under code ``'130000'``.
+
+    Args:
+        driver: Selenium WebDriver used to load and query the page.
+        url: Address of the listing page to inspect.
+
+    Returns:
+        The ``"{year}年{month}月"`` string of the first qualifying month
+        (month is not zero-padded), or ``None`` if none of the three
+        months qualifies.
+    """
+    # Imported here so the narrow ``except`` below does not rely on the
+    # module-level import list.
+    from selenium.common.exceptions import TimeoutException
+
+    driver.get(url)
+    current_date = datetime.now()
+    # Work with a single month counter so stepping back is exact. A fixed
+    # 30-day step can hit the same month twice (e.g. from the 31st) or
+    # skip February entirely.
+    month_index = current_date.year * 12 + current_date.month - 1
+    for offset in range(0, 3):
+        check_year, zero_based_month = divmod(month_index - offset, 12)
+        check_month = zero_based_month + 1
+
+        target_title = f"{check_year}年{check_month}月"
+        try:
+            WebDriverWait(driver, 10).until(
+                EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
+            )
+            log.info(f"已找到最新月份数据 {check_year}-{check_month}")
+            # Skip months that already have rows in the table.
+            count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', '130000')
+            if count > 0:
+                log.info(f"count: {count} -> 已存在 {check_year}-{check_month} 数据,跳过")
+                continue
+            return target_title
+        except TimeoutException:
+            # The wait expired: no link for this month on the page.
+            log.info(f"未找到 {target_title}")
+        except Exception as e:
+            # Stay best-effort (try the previous month), but report the
+            # real failure instead of logging it as "not found".
+            log.info(f"检测 {target_title} 时出错: {e}")
+    log.info("三个月内未找到有效数据")
+    return None
+
+
+def crawl_with_selenium(url, mark):
     driver = webdriver.Firefox(options=configure_stealth_options())
 
+    year_month = None
+    if 'increment' == mark:
+        res = detect_latest_month(driver, url)
+        if res is None:
+            log.info("河北省海关没有最新数据更新")
+            sys.exit(0)
+        year_month = res
+        print(f"检测到最新有效数据:{year_month}")
+
     try:
         # 注入反检测脚本
         driver.execute_script("""
@@ -155,7 +197,7 @@ def crawl_with_selenium(url):
 
         while True:
             # 访问当前页
-            result = find_target_links(driver)
+            result = find_target_links(driver, year_month)
             if result and result == 'stop':
                 break
 
@@ -248,7 +290,8 @@ def hierarchical_traversal(root_path):
 
 
 if __name__ == "__main__":
-    crawl_with_selenium('http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/zfxxgk43/2988665/2988681/index.html')
+    crawl_with_selenium('http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/zfxxgk43/2988665/2988681/index.html', 'all')
+    # crawl_with_selenium('http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/zfxxgk43/2988665/2988681/index.html', 'increment')
     # 等待5s后执行
     time.sleep(5)
     hierarchical_traversal(base_country_code.download_dir)

+ 50 - 7
jiangsu/crawl_gov_jiangsu_full.py

@@ -6,6 +6,8 @@ import time
 import rarfile
 import shutil
 from pathlib import Path
+import sys
+from datetime import datetime, timedelta
 
 from faker import Faker
 from selenium import webdriver
@@ -54,7 +56,7 @@ def configure_stealth_options():
     opts.add_argument("--headless")
     return opts
 
-def find_target_links(driver):
+def find_target_links(driver, year_month):
     """在当前页面找到符合 TARGET_TITLES 的文件并触发下载"""
     # 等待页面加载完成
     WebDriverWait(driver, 30).until(
@@ -102,8 +104,14 @@ def find_target_links(driver):
                         continue
 
                     for xls_file in xls_files:
-                        if xls_file.startswith('2022'):
-                            return 'stop'
+                        if year_month is None:
+                            if xls_file.startswith('2022'):
+                                return 'stop'
+                        else:
+                            if not xls_file.startswith(year_month):
+                                log.info(f"非 {year_month} 文件: {file_name}, stop")
+                                return 'stop'
+
                         if not xls_file or '美元值' in xls_file or '企业性质' in xls_file or '贸易方式' in xls_file or '按收发货所在地' in xls_file or '主要商品' in xls_file:
                             log.info(f"检测到不需要的文件:{xls_file},跳过")
                             continue
@@ -175,10 +183,44 @@ def extract_rar(rar_path, extract_to):
         log.info(f"解压失败: {result.stderr.decode('gbk')}")
         return False
 
-
-def crawl_with_selenium(url):
+def detect_latest_month(driver, url):
+    """Find the newest month listed on the page that is not stored yet.
+
+    Loads ``url`` and checks the current calendar month and the two
+    calendar months before it, newest first. A month qualifies when the
+    page contains a link whose ``title`` includes ``"{year}年{month}月"``
+    and ``base_mysql.get_code_exist`` reports a count of zero for that
+    month under code ``'320000'``.
+
+    Args:
+        driver: Selenium WebDriver used to load and query the page.
+        url: Address of the listing page to inspect.
+
+    Returns:
+        The ``"{year}年{month}月"`` string of the first qualifying month
+        (month is not zero-padded), or ``None`` if none of the three
+        months qualifies.
+    """
+    # Imported here so the narrow ``except`` below does not rely on the
+    # module-level import list.
+    from selenium.common.exceptions import TimeoutException
+
+    driver.get(url)
+    current_date = datetime.now()
+    # Work with a single month counter so stepping back is exact. A fixed
+    # 30-day step can hit the same month twice (e.g. from the 31st) or
+    # skip February entirely.
+    month_index = current_date.year * 12 + current_date.month - 1
+    for offset in range(0, 3):
+        check_year, zero_based_month = divmod(month_index - offset, 12)
+        check_month = zero_based_month + 1
+
+        target_title = f"{check_year}年{check_month}月"
+        try:
+            WebDriverWait(driver, 10).until(
+                EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
+            )
+            log.info(f"已找到最新月份数据 {check_year}-{check_month}")
+            # Skip months that already have rows in the table.
+            count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', '320000')
+            if count > 0:
+                log.info(f"count: {count} -> 已存在 {check_year}-{check_month} 数据,跳过")
+                continue
+            return target_title
+        except TimeoutException:
+            # The wait expired: no link for this month on the page.
+            log.info(f"未找到 {target_title}")
+        except Exception as e:
+            # Stay best-effort (try the previous month), but report the
+            # real failure instead of logging it as "not found".
+            log.info(f"检测 {target_title} 时出错: {e}")
+    log.info("三个月内未找到有效数据")
+    return None
+
+
+def crawl_with_selenium(url, mark):
     driver = webdriver.Firefox(options=configure_stealth_options())
 
+    year_month = None
+    if 'increment' == mark:
+        res = detect_latest_month(driver, url)
+        if res is None:
+            log.info("江苏省海关没有最新数据更新")
+            sys.exit(0)
+        year_month = res
+        print(f"检测到最新有效数据:{year_month}")
+
     try:
         # 注入反检测脚本
         driver.execute_script("""
@@ -193,7 +235,7 @@ def crawl_with_selenium(url):
 
         while True:
             # 访问当前页
-            result = find_target_links(driver)
+            result = find_target_links(driver, year_month)
             if result == 'stop':
                 break
 
@@ -280,7 +322,8 @@ def hierarchical_traversal(root_path, all_records):
                 gov_commodity_jiangsu_city.process_folder(md['path'])
 
 if __name__ == "__main__":
-    crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html')
+    crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html', 'all')
+    # crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html', 'increment')
     log.info(f"江苏南京海关全量数据下载任务完成")
     # 等待5s后执行
     time.sleep(5)

+ 70 - 13
zhejiang/crawl_gov_zhejiangi_full.py

@@ -2,6 +2,8 @@ import os
 import random
 import re
 import time
+import sys
+from datetime import datetime, timedelta
 from pathlib import Path
 from urllib.parse import urljoin
 
@@ -51,7 +53,7 @@ def configure_stealth_options():
     opts.add_argument("--headless")
     return opts
 
-def crawl_by_year_tabs(driver, base_url):
+def crawl_by_year_tabs(driver, base_url, year_month):
     """按年份Tab导航爬取数据"""
     years = ['2023年', '2024年', '2025年']
     WebDriverWait(driver, 30).until(
@@ -74,15 +76,15 @@ def crawl_by_year_tabs(driver, base_url):
         driver.switch_to.window(driver.window_handles[-1])
         log.info(f"\n正在处理 {year_text} 年份页面")
 
-        process_month_tabs(driver, year_text, base_url)
+        process_month_tabs(driver, year_text, base_url, year_month)
 
         # 返回主窗口
         driver.close()
         driver.switch_to.window(driver.window_handles[0])
 
-def process_month_tabs(driver, year, base_url):
+def process_month_tabs(driver, year, base_url, year_month):
     """处理月份Tab导航(动态获取真实存在的月份)"""
-    # 显式等待容器加载
+    # 显式等待容器加载
     WebDriverWait(driver, 30).until(
         EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
     )
@@ -92,8 +94,7 @@ def process_month_tabs(driver, year, base_url):
     processed_months = set()  # 已处理月份记录
     retry_count = 0
 
-    # while retry_count < 3:  # 最多重试3次
-    while True:  # 最多重试3次
+    while retry_count < 3:
         try:
             # 全量获取所有月份Tab
             month_items = driver.find_elements(By.XPATH, '//ul[@class="nav_tab"]//li')
@@ -115,6 +116,14 @@ def process_month_tabs(driver, year, base_url):
                     continue  # 跳过已处理月份
 
                 log.info(f"点击月份Tab:{year}-{month_text}")
+                if year_month is not None:
+                    tar_year, tar_month = year_month.split('-')[0], year_month.split('-')[1]
+                    if tar_year != year:
+                        retry_count += 1
+                        break
+                    if tar_month != month_text:
+                        log.info(f"{year}年 {month_text} 月份跳过, increment tar: {year_month}")
+                        continue
                 a_tag.click()
 
                 # 处理详情页逻辑
@@ -145,13 +154,10 @@ def process_month_tabs(driver, year, base_url):
                 break
             else:
                 # 部分月份未找到,重新获取元素
-                # retry_count += 1
                 log.info(f"第 {retry_count} 次重试获取月份Tab...")
-                time.sleep(2)
 
         except StaleElementReferenceException:
             log.info("页面刷新,重新获取月份Tab列表...")
-            # retry_count += 1
             time.sleep(2)
 
     log.info(f"{year}年最终处理的月份:{processed_months}")
@@ -234,10 +240,60 @@ def extract_year_and_month(file_name):
     else:
         raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
 
-def crawl_with_selenium(url):
+
+def convert_to_chinese_uppercase(num):
+    """Return the Chinese numeral for a month number.
+
+    Args:
+        num: Month number; values from 1 to 12 are supported.
+
+    Returns:
+        The numeral as a string ('一' through '十二'), or ``None`` when
+        ``num`` lies outside 1..12.
+    """
+    if not (1 <= num <= 12):
+        return None
+    if num >= 10:
+        # Two-character forms are looked up explicitly.
+        return {10: '十', 11: '十一', 12: '十二'}.get(num)
+    # 1..9 index straight into the digit string; slot 0 is a placeholder.
+    return '零一二三四五六七八九'[num]
+
+def detect_latest_month(driver, url):
+    """Find the newest month tab on the page that is not stored yet.
+
+    Loads ``url`` and checks the current calendar month and the two
+    calendar months before it, newest first. A month qualifies when the
+    ``nav_tab`` list contains a link whose text equals the month's
+    Chinese label (e.g. ``"六月"``) and ``base_mysql.get_code_exist``
+    reports a count of zero for that month under code ``'330000'``.
+
+    Args:
+        driver: Selenium WebDriver used to load and query the page.
+        url: Address of the page holding the month tabs.
+
+    Returns:
+        A ``"{year}年-{month}月"`` string with the month as a Chinese
+        numeral, or ``None`` if none of the three months qualifies.
+    """
+    # Imported here so the narrow ``except`` below does not rely on the
+    # module-level import list.
+    from selenium.common.exceptions import TimeoutException
+
+    driver.get(url)
+    current_date = datetime.now()
+    # Work with a single month counter so stepping back is exact. A fixed
+    # 30-day step can hit the same month twice (e.g. from the 31st) or
+    # skip February entirely.
+    month_index = current_date.year * 12 + current_date.month - 1
+    for offset in range(0, 3):
+        check_year, zero_based_month = divmod(month_index - offset, 12)
+        month = zero_based_month + 1
+        check_month = convert_to_chinese_uppercase(month)
+
+        # NOTE(review): only the month label is matched on the page; the
+        # year is not checked there. Confirm the tabs shown at ``url``
+        # belong to ``check_year``, especially around January.
+        target_title = f"{check_month}月"
+        try:
+            WebDriverWait(driver, 10).until(
+                EC.presence_of_element_located((By.XPATH, f'//ul[@class="nav_tab"]//li/a[normalize-space()="{target_title}"]'))
+            )
+            log.info(f"已找到最新月份数据 {check_year}-{check_month}")
+            # Skip months that already have rows in the table.
+            count = base_mysql.get_code_exist(f'{check_year}-{month:02d}', '330000')
+            if count > 0:
+                log.info(f"count: {count} -> 已存在 {check_year}-{check_month} 数据,跳过")
+                continue
+            return f"{check_year}年-{check_month}月"
+        except TimeoutException:
+            # The wait expired: no tab for this month on the page.
+            log.info(f"未找到 {target_title}")
+        except Exception as e:
+            # Stay best-effort (try the previous month), but report the
+            # real failure instead of logging it as "not found".
+            log.info(f"检测 {target_title} 时出错: {e}")
+    log.info("三个月内未找到有效数据")
+    return None
+
+
+def crawl_with_selenium(url, mark):
     driver = webdriver.Firefox(options=configure_stealth_options())
-    base_url = 'http://hangzhou.customs.gov.cn'
 
+    year_month = None
+    if 'increment' == mark:
+        res = detect_latest_month(driver, url)
+        if res is None:
+            log.info("浙江省海关没有最新数据更新")
+            sys.exit(0)
+        year_month = res
+        print(f"检测到最新有效数据:{year_month}")
+
+    base_url = 'http://hangzhou.customs.gov.cn'
     try:
         # 注入反检测脚本
         driver.execute_script("""
@@ -251,7 +307,7 @@ def crawl_with_selenium(url):
         driver.get(url)
 
         # 按年份导航
-        crawl_by_year_tabs(driver, base_url)
+        crawl_by_year_tabs(driver, base_url, year_month)
 
     finally:
         driver.quit()
@@ -316,7 +372,8 @@ def hierarchical_traversal(root_path):
                 gov_commodity_zhejiang_city.process_folder(md['path'])
 
 if __name__ == "__main__":
-    crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html')
+    # crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html', 'all')
+    crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html', 'increment')
     log.info(f"浙江杭州海关全量数据下载任务完成")
     # 等待5s后执行
     time.sleep(5)