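"""Crawler for the monthly Henan import/export tables published on the
Zhengzhou Customs website.

The script detects the most recent month with published data, pages through
the listing in reverse, downloads the three monthly Excel tables (export
commodity values, import commodity values, and import/export totals by
country/region), and finally hands the downloaded files to parse_excel via
traverse_and_process for cleaning and storage.

Example invocation (the module filename is assumed here, not taken from the
source):

    python henan_crawler.py              # crawl the latest two months
    python henan_crawler.py --year 2022  # crawl back to the given end year
"""
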
import argparse
import random
import re
import time
import traceback
from datetime import datetime, timedelta

from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from henan.henan_parse_excel import parse_excel
from utils.constants import DOWNLOAD_DIR
from utils.download_utils import configure_stealth_options, get_previous_month, download_excel, generate_month_sequence
from utils.parse_utils import traverse_and_process

# Basic configuration
MAX_RETRY = 3
DOWNLOAD_TIMEOUT = 60
BASE_URL = "http://zhengzhou.customs.gov.cn/zhengzhou_customs/zfxxgk97/2967383/2967458/501407/0e9d768a-1.html"
download_dir = DOWNLOAD_DIR / "henan"


def detect_latest_month(driver):
    """Detect the latest month with published data, falling back up to three months."""
    driver.get(BASE_URL)
    current_date = datetime.now()
    for offset in range(0, 3):
        check_date = current_date - timedelta(days=offset * 30)
        check_year = check_date.year
        check_month = check_date.month
        # Build a regex that accepts both the "1至X月" and the plain "X月" title
        # formats, tolerates whitespace around the year and month, and matches
        # both half-width and full-width parentheses around 地区.
        pattern = re.compile(
            rf'{check_year}\s*年\s*(?:1至)?{check_month}\s*月\s*河南省进出口商品国别[(（]地区[)）]总值表',
            re.IGNORECASE
        )
        try:
            # Collect all anchor elements and match their title attributes in Python
            elements = WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.XPATH, '//a'))
            )
            for element in elements:
                title = element.get_attribute("title")
                if title and pattern.search(title):
                    print(f"Found latest monthly data: {check_year}-{check_month}")
                    return check_year, check_month
            print(f"No matching link found (regex: {pattern.pattern})")
        except TimeoutException:
            print(f"Page load timed out or no links present ({check_year}-{check_month})")
            continue
    raise Exception("No valid data found within the last three months")
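

# The three tables published each month for Henan, as titled on the site
# (matching below also tolerates the plain "X月" variant of these titles):
#   <year>年1至<month>月河南省出口主要商品量值表         - quantity/value of major export commodities
#   <year>年1至<month>月河南省进口主要商品量值表         - quantity/value of major import commodities
#   <year>年1至<month>月河南省进出口商品国别(地区)总值表 - import/export totals by country (region)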


def process_month_data(driver, year, month):
    """Find and download the three monthly tables, tolerating several title formats."""
    # The three target title templates (kept for reference; matching uses the regex below)
    title_templates = [
        f"{year}年1至{month}月河南省出口主要商品量值表",
        f"{year}年1至{month}月河南省进口主要商品量值表",
        f"{year}年1至{month}月河南省进出口商品国别(地区)总值表"
    ]
    # Regex that accepts "年X月" as well as "年1至X月", tolerates surrounding
    # whitespace, and matches half-width or full-width parentheses around 地区
    pattern = re.compile(
        rf'{year}\s*年\s*(?:1至)?{month}\s*月\s*河南省(?:出口主要商品|进口主要商品|进出口商品国别[(（]地区[)）])(?:量值表|总值表)',
        re.IGNORECASE
    )
    found_count = 0
    links = driver.find_elements(By.XPATH, '//a[contains(@title,"河南省")]')
    for link in links:
        title = link.get_attribute("title")
        if title and pattern.search(title):
            retry = 0
            success = False
            while retry < MAX_RETRY and not success:
                try:
                    url = link.get_attribute("href")
                    download_excel(driver, url, year, month, title, download_dir)
                    found_count += 1
                    time.sleep(random.uniform(0.5, 1.5))  # pause between downloads
                    success = True  # stop retrying once the download succeeds
                except Exception as e:
                    retry += 1
                    print(f"Download of {title} failed (retry {retry}): {e}")
                    traceback.print_exc()
                    if retry < MAX_RETRY:
                        time.sleep(random.uniform(2, 5))  # wait a random interval before retrying
                    else:
                        print(f"{title} reached the maximum number of retries; skipping this file.")
    print(f"Found {found_count} valid tables on this page")
    return found_count


def reverse_crawler(driver, target_months):
    """Core logic for crawling the paginated listing in reverse."""
    processed_months = set()
    # target_months example: [(2023, 5), (2023, 4)]
    page = 1
    for year, month in target_months:
        print(f"\nProcessing data for {year}-{month}".center(50, "="))
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        found_tables = 0
        while True:
            # Wait briefly so the page can settle
            random_sleep(base=2, variance=3)
            try:
                print(f"Current URL: {driver.current_url}, page {page}")
                # Process the tables on the current page
                found = process_month_data(driver, year, month)
                found_tables += found
                # All three tables for this month have been collected
                if found_tables >= 3:
                    print(f"Finished collecting all tables for {year}-{month}")
                    processed_months.add((year, month))
                    break
                print(f"Tables collected through page {page}: {found_tables}/3; moving to the next page")
                # Pagination (more robust element location)
                WebDriverWait(driver, 15).until(
                    EC.element_to_be_clickable((By.XPATH, '//a[contains(text(),"下一页")]'))
                ).click()
                page += 1
            except TimeoutException:
                print(f"No more pages found; tables collected: {found_tables}/3")
                break
            except Exception as e:
                print(f"Pagination error: {str(e)}")
                handle_retry(driver)  # recovery routine
                break
    return processed_months


def extract_page_date(driver):
    """Extract the year and month shown on the page, falling back to the current date."""
    try:
        date_str = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        ).get_attribute("innerHTML")
        match = re.search(r"(\d{4})年(\d{1,2})月", date_str)
        return int(match.group(1)), int(match.group(2))
    except Exception:
        return datetime.now().year, datetime.now().month


def random_sleep(base=2, variance=5):
    """Sleep for a random interval between base and base + variance seconds."""
    sleep_time = base + random.random() * variance
    time.sleep(sleep_time)


def handle_retry(driver):
    """Recover from an exception by refreshing the page."""
    try:
        driver.refresh()
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        print("Browser recovered from the error")
    except Exception:
        print("Serious error; manual intervention required")
        raise


def main():
    """Entry point (streamlined argument handling)."""
    parser = argparse.ArgumentParser(description='Intelligent customs data crawler')
    parser.add_argument('--year', type=int, default=None,
                        help='End year (e.g. 2023); when omitted, the latest two months are crawled')
    args = parser.parse_args()
    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
    try:
        # Detect the most recent month with published data
        valid_year, valid_month = detect_latest_month(driver)
        print(f"Latest available data: {valid_year}-{valid_month:02d}")
        # Build the sequence of target months
        if args.year:
            # A year was given: crawl from the latest month back to January of the target year
            target_months = generate_month_sequence(
                start_year=valid_year,
                start_month=valid_month,
                end_year=args.year,
                skip_january=True
            )
        else:
            # No year given: crawl the latest two months
            target_months = generate_month_sequence(valid_year, valid_month)
        print(f"Target months to collect: {target_months}")
        reverse_crawler(driver, target_months)
        print(f"Finished collecting data for {len(target_months)} months")
    finally:
        driver.quit()
    print("\nCleaning downloaded data and loading it into the database...")
    traverse_and_process(download_dir, parse_excel, province_name="henan")


if __name__ == "__main__":
    main()