import argparse
import random
import re
import time
import traceback
from datetime import datetime, timedelta

from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from henan.henan_parse_excel import parse_excel
from utils.constants import DOWNLOAD_DIR
from utils.download_utils import (
    configure_stealth_options,
    get_previous_month,
    download_excel,
    generate_month_sequence,
)
from utils.parse_utils import traverse_and_process

# Basic configuration.
MAX_RETRY = 3
DOWNLOAD_TIMEOUT = 60
BASE_URL = "http://zhengzhou.customs.gov.cn/zhengzhou_customs/zfxxgk97/2967383/2967458/501407/0e9d768a-1.html"
download_dir = DOWNLOAD_DIR / "henan"


def detect_latest_month(driver):
    """Detect the most recent month that has published data.

    Walks back from today in up to three ~30-day steps and scans the landing
    page for a link whose ``title`` matches
    "{year}年[1至]{month}月河南省进出口商品国别(地区)总值表".

    Args:
        driver: Selenium WebDriver already configured for downloads.

    Returns:
        Tuple ``(year, month)`` of the newest month with a matching link.

    Raises:
        Exception: if none of the three candidate months has a matching link.
    """
    driver.get(BASE_URL)
    current_date = datetime.now()
    for offset in range(0, 3):
        # NOTE(review): 30-day steps only approximate calendar months; fine
        # for a three-step lookback but could skip a month near the 31st.
        check_date = current_date - timedelta(days=offset * 30)
        check_year = check_date.year
        check_month = check_date.month
        # Accept both "1至X月" and "X月" formats, with optional whitespace
        # around the year/month tokens.
        pattern = re.compile(
            rf'{check_year}\s*年\s*(1至)?{check_month}\s*月\s*河南省进出口商品国别\(地区\)总值表',
            re.IGNORECASE
        )
        try:
            elements = WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.XPATH, '//a'))
            )
            for element in elements:
                title = element.get_attribute("title")
                # get_attribute returns None for anchors without a title
                # attribute; guard before matching to avoid a TypeError.
                if title and pattern.search(title):
                    print(f"已找到最新月份数据 {check_year}-{check_month}")
                    return check_year, check_month
            print(f"未找到匹配项(正则:{pattern.pattern})")
        except TimeoutException:
            print(f"页面加载超时或无匹配项({check_year}-{check_month})")
            continue
    raise Exception("三个月内未找到有效数据")


def process_month_data(driver, year, month):
    """Download every target table for (year, month) found on the current page.

    Three table titles are targeted (each may read "X月" or "1至X月"):
      - 河南省出口主要商品量值表
      - 河南省进口主要商品量值表
      - 河南省进出口商品国别(地区)总值表

    Args:
        driver: Selenium WebDriver positioned on a listing page.
        year: target year.
        month: target month.

    Returns:
        Number of tables matched (and download attempted) on this page.
    """
    # One pattern covers all three titles. The parentheses around 地区 may be
    # ASCII "()" or full-width "（）" depending on the page, so both are
    # accepted via character classes.
    pattern = re.compile(
        rf'{year}\s*年\s*(1至)?{month}\s*月\s*河南省'
        rf'(?:出口主要商品|进口主要商品|进出口商品国别[(（]地区[)）])'
        rf'(量值表|总值表)',
        re.IGNORECASE
    )
    found_count = 0
    links = driver.find_elements(By.XPATH, '//a[contains(@title,"河南省")]')
    for link in links:
        title = link.get_attribute("title")
        # Skip anchors without a title attribute (None) or non-matching ones.
        if not (title and pattern.search(title)):
            continue
        retry = 0
        max_retries = 3  # per-file download retry cap
        success = False
        while retry < max_retries and not success:
            try:
                url = link.get_attribute("href")
                download_excel(driver, url, year, month, title, download_dir)
                found_count += 1
                time.sleep(random.uniform(0.5, 1.5))  # pacing between downloads
                success = True  # download succeeded; leave the retry loop
            except Exception as e:
                retry += 1
                print(f"下载 {title} 失败(第{retry}次重试): {e}")
                traceback.print_exc()
                if retry < max_retries:
                    time.sleep(random.uniform(2, 5))  # random backoff, then retry
                else:
                    print(f"{title} 下载已达到最大重试次数,跳过该文件。")
    print(f"本页找到{found_count}个有效表格")
    return found_count


def reverse_crawler(driver, target_months):
    """Crawl listing pages backwards, collecting tables for each target month.

    For every ``(year, month)`` in ``target_months`` the crawler scans the
    current page, and keeps clicking "下一页" until all three tables for that
    month have been found or pagination runs out.

    Args:
        driver: Selenium WebDriver, already on the listing page.
        target_months: iterable of ``(year, month)`` tuples, newest first.

    Returns:
        Set of ``(year, month)`` tuples that were fully collected.
    """
    processed_months = set()
    page = 1  # global page counter across all months (for logging)
    for year, month in target_months:
        print(f"\n开始处理 {year}年{month}月数据".center(50, "="))
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        current_page = 1
        found_tables = 0
        while True:
            # Randomized wait lets the page settle and avoids bot detection.
            random_sleep(base=2, variance=3)
            try:
                print(f"当前页面:{driver.current_url}, 第{page}页")
                found = process_month_data(driver, year, month)
                found_tables += found
                # All three tables collected for this month — move on.
                if found_tables >= 3:
                    print(f"已完成{year}年{month}月全部表格采集")
                    processed_months.add((year, month))
                    break
                print(f"第{page}页已采集表格数:{found_tables}/3,前往下一页采集")
                # Pagination: wait for a clickable "next page" link.
                WebDriverWait(driver, 15).until(
                    EC.element_to_be_clickable((By.XPATH, '//a[contains(text(),"下一页")]'))
                ).click()
                current_page += 1
                page += 1
            except TimeoutException:
                print(f"未找到更多分页,已采集表格数:{found_tables}/3")
                break
            except Exception as e:
                print(f"分页异常:{str(e)}")
                handle_retry(driver)  # attempt in-place recovery
                break
    return processed_months


def extract_page_date(driver):
    """Best-effort extraction of (year, month) from the listing page.

    Falls back to the current year/month when the page cannot be read or
    contains no "YYYY年MM月" text.
    """
    try:
        date_str = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        ).get_attribute("innerHTML")
        match = re.search(r"(\d{4})年(\d{1,2})月", date_str)
        # match can be None when no date text is present; fall through to
        # the default instead of raising AttributeError.
        if match:
            return int(match.group(1)), int(match.group(2))
    except Exception:
        pass
    return datetime.now().year, datetime.now().month


def random_sleep(base=2, variance=5):
    """Sleep for ``base`` seconds plus a random jitter in [0, variance)."""
    sleep_time = base + random.random() * variance
    time.sleep(sleep_time)


def handle_retry(driver):
    """Try to recover the browser session by refreshing the listing page.

    Raises:
        Exception: re-raises the recovery failure when the refresh itself
        fails — manual intervention is required at that point.
    """
    try:
        driver.refresh()
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        print("浏览器异常已恢复")
    except Exception:
        print("需要人工干预的严重错误")
        raise


def main():
    """CLI entry point: detect the latest month, crawl, then parse downloads."""
    parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
    parser.add_argument('--year', type=int, default=None,
                        help='终止年份(如2023),未指定时抓取最新两个月')
    args = parser.parse_args()

    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
    try:
        # Detect the newest month that actually has published data.
        valid_year, valid_month = detect_latest_month(driver)
        print(f"检测到最新有效数据:{valid_year}年{valid_month:02d}月")

        # Build the sequence of months to collect.
        if args.year:
            # A target year was given: crawl from the newest month back to
            # January of that year.
            target_months = generate_month_sequence(
                start_year=valid_year,
                start_month=valid_month,
                end_year=args.year,
                skip_january=True
            )
        else:
            # No target year: collect only the most recent months.
            target_months = generate_month_sequence(valid_year, valid_month)
        print(f"目标采集月份序列:{target_months}")

        reverse_crawler(driver, target_months)
        print(f"{len(target_months)}个月份数据已采集完毕")
    finally:
        driver.quit()

    print("\n数据清洗入库中...")
    traverse_and_process(download_dir, parse_excel, province_name="henan")


if __name__ == "__main__":
    main()