"""Crawler for the Fuzhou customs "trade by prefecture" monthly Excel tables.

The script detects the most recent month published on the listing page,
builds the sequence of months to fetch, walks the paginated listing to
download the matching Excel file for each month, and finally hands the
download directory to the parsing/ingest step.
"""
import argparse
import random
import time
from datetime import datetime, timedelta

from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from fujian.fujian_parse_excel import parse_excel
from utils.constants import DOWNLOAD_DIR
from utils.download_utils import configure_stealth_options, generate_month_sequence, download_excel
from utils.parse_utils import traverse_and_process

# Basic configuration
MAX_RETRY = 3
BASE_URL = "http://fuzhou.customs.gov.cn/fuzhou_customs/zfxxgk19/2963574/2963954/484131/index.html"
download_dir = DOWNLOAD_DIR / "fujian"

# Value returned by process_month_data() when a matching file could not be
# downloaded after MAX_RETRY attempts (kept at 1000 for backward compatibility).
DOWNLOAD_FAILED = 1000

# How many calendar months (including the current one) to probe when looking
# for the latest published table.
LOOKBACK_MONTHS = 3


def _shift_month_back(year, month, offset):
    """Return the (year, month) that lies ``offset`` calendar months earlier."""
    index = year * 12 + (month - 1) - offset
    return index // 12, index % 12 + 1


def detect_latest_month(driver):
    """Find the most recent month whose table is listed on the index page.

    Loads ``BASE_URL`` and probes the current month and the two preceding
    calendar months, newest first, waiting up to 10 seconds for a link whose
    ``title`` contains the expected table name.

    Args:
        driver: Selenium WebDriver used to load and query the page.

    Returns:
        A ``(year, month)`` tuple for the newest month found.

    Raises:
        RuntimeError: If none of the probed months has a matching link.
    """
    driver.get(BASE_URL)
    today = datetime.now()

    for offset in range(LOOKBACK_MONTHS):
        # Exact calendar arithmetic: subtracting 30-day steps can revisit the
        # same month or skip one entirely (e.g. around the end of March).
        check_year, check_month = _shift_month_back(today.year, today.month, offset)
        target_title = f"{check_year}年{check_month}月和1-{check_month}月福建省外贸进出口情况表(分地市)"
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located(
                    (By.XPATH, f'//a[contains(@title, "{target_title}")]')
                )
            )
        except TimeoutException:
            # Only "link did not appear in time" means the month is missing;
            # any other WebDriver failure is allowed to propagate.
            print(f"未找到 {target_title}")
            continue
        print(f"已找到最新月份数据 {check_year}-{check_month}")
        return check_year, check_month

    raise RuntimeError("三个月内未找到有效数据")


def _download_with_retry(driver, url, year, month, title):
    """Download one file, retrying up to MAX_RETRY times; return True on success."""
    for attempt in range(1, MAX_RETRY + 1):
        try:
            download_excel(driver, url, year, month, title, download_dir)
        except Exception as e:
            print(f"下载 {title} 失败(第{attempt}次重试): {e}")
            if attempt >= MAX_RETRY:
                print(f"❌ 超出最大重试次数,跳过该文件:{title}")
                return False
            print(f"🔄 第{attempt}次重试:{title}")
            time.sleep(random.uniform(2, 4))  # random pause before retrying
        else:
            time.sleep(random.uniform(0.5, 1.5))  # short pause after a success
            return True
    return False


def process_month_data(driver, year, month):
    """Download the table for ``year``/``month`` from the page currently shown.

    Scans links whose ``title`` contains "福建省" and downloads every link
    whose title equals the expected table name exactly.

    Args:
        driver: Selenium WebDriver positioned on a listing page.
        year: Year of the wanted table.
        month: Month of the wanted table.

    Returns:
        The number of files downloaded from this page, or ``DOWNLOAD_FAILED``
        (1000) if a matching file still failed after ``MAX_RETRY`` attempts.
    """
    required_title = f"{year}年{month}月和1-{month}月福建省外贸进出口情况表(分地市)"

    # Snapshot title/href pairs before downloading anything: the driver is
    # handed to download_excel(), and if that navigates away the remaining
    # WebElement handles would become stale.
    targets = []
    for anchor in driver.find_elements(By.XPATH, '//a[contains(@title,"福建省")]'):
        title = anchor.get_attribute("title")
        if title == required_title:
            targets.append((title, anchor.get_attribute("href")))

    found_count = 0
    for title, url in targets:
        if not _download_with_retry(driver, url, year, month, title):
            return DOWNLOAD_FAILED
        found_count += 1

    print(f"本页找到{found_count}个有效表格")
    return found_count


def reverse_crawler(driver, target_months):
    """Walk the paginated listing and fetch the table for each target month.

    For each ``(year, month)`` the current page is scanned; if the table is
    not there, the "下一页" (next page) link is clicked and the scan repeats.
    The page position is deliberately not reset between months, so the next
    month is searched from wherever the previous one stopped.

    Args:
        driver: Selenium WebDriver positioned on the listing.
        target_months: Iterable of ``(year, month)`` pairs,
            e.g. ``[(2023, 5), (2023, 4)]``.

    Returns:
        The set of ``(year, month)`` pairs that were downloaded successfully.
    """
    processed_months = set()
    page = 1

    for year, month in target_months:
        print(f"\n开始处理 {year}年{month}月数据".center(50, "="))
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        found_tables = 0

        while True:
            # Randomised pause so the page can settle between actions.
            random_sleep(base=2, variance=3)
            try:
                print(f"当前页面:{driver.current_url}, 第{page}页")

                found = process_month_data(driver, year, month)
                if found == DOWNLOAD_FAILED:
                    print(f"❌{year}年{month}月数据采集失败,跳过当前月")
                    break
                found_tables += found

                # One table per month is expected; stop once it is fetched.
                if found_tables >= 1:
                    print(f"已完成{year}年{month}月全部表格采集")
                    processed_months.add((year, month))
                    break

                print(f"第{page}页已采集表格数:{found_tables}/1,前往下一页采集")

                # Go to the next listing page.
                WebDriverWait(driver, 15).until(
                    EC.element_to_be_clickable((By.XPATH, '//a[contains(text(),"下一页")]'))
                ).click()
                page += 1
            except TimeoutException:
                # No clickable "next page" link appeared: end of the listing.
                print(f"未找到更多分页,已采集表格数:{found_tables}/1")
                break
            except Exception as e:
                print(f"分页异常:{e}")
                handle_retry(driver)  # try to recover the browser, then give up on this month
                break

    return processed_months


def random_sleep(base=2, variance=5):
    """Sleep for ``base`` seconds plus a random extra in ``[0, variance)``."""
    time.sleep(base + random.random() * variance)


def handle_retry(driver):
    """Refresh the page and wait for the listing container to reappear.

    Raises:
        Exception: Whatever the refresh/wait raised, re-raised unchanged after
            a message is printed, when the page cannot be recovered.
    """
    try:
        driver.refresh()
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        print("浏览器异常已恢复")
    except Exception:
        print("需要人工干预的严重错误")
        raise


def main():
    """Command-line entry point: crawl the target months, then parse the files.

    ``--year`` selects the final year to crawl back to; when omitted,
    ``generate_month_sequence`` is called with only the latest detected month
    and decides the default range itself.
    """
    parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
    parser.add_argument('--year', type=int, default=None,
                        help='终止年份(如2023),未指定时抓取最新两个月')
    args = parser.parse_args()

    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
    try:
        # Detect the newest month that is actually published.
        valid_year, valid_month = detect_latest_month(driver)
        print(f"检测到最新有效数据:{valid_year}年{valid_month:02d}月")

        # Build the list of months to fetch.
        if args.year is not None:
            target_months = generate_month_sequence(
                start_year=valid_year,
                start_month=valid_month,
                end_year=args.year,
                skip_january=True
            )
        else:
            target_months = generate_month_sequence(valid_year, valid_month)
        print(f"目标采集月份序列:{target_months}")

        processed = reverse_crawler(driver, target_months)
        # Report what was really fetched rather than assuming every target
        # month succeeded.
        print(f"{len(processed)}个月份数据已采集完毕")
        skipped = [m for m in target_months if tuple(m) not in processed]
        if skipped:
            print(f"以下月份未采集成功:{skipped}")
    finally:
        # The driver is always bound here, so it can be closed unconditionally.
        driver.quit()

    print("\n数据清洗入库中...")
    traverse_and_process(download_dir, parse_excel, province_name="fujian")


if __name__ == "__main__":
    main()