123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- import os
- import random
- import re
- import time
- from pathlib import Path
- from faker import Faker
- from selenium import webdriver
- from selenium.common import StaleElementReferenceException
- from selenium.webdriver import FirefoxOptions, ActionChains
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from utils.constants import DOWNLOAD_DIR
- from utils.download_utils import configure_stealth_options, wait_for_download, download_excel
- from selenium.webdriver.common.by import By
- from utils.log import log
# Reporting year; it is interpolated into the table titles below and into the
# id of the listing table looked up by crawl_with_selenium.
YEAR = 2025

# Listing page handed to crawl_with_selenium by the script entry point.
base_url = "http://www.customs.gov.cn/customs/302249/zfxxgk/2799825/302274/302277/6348926/index.html"

# Directory passed to the download helpers (browser options, download wait,
# Excel download).
download_dir = DOWNLOAD_DIR / "total"

# Row titles to download. A row is processed only when the text of its first
# cell equals one of these strings exactly.
TARGET_TABLES = [
    f"(2){YEAR}年进出口商品国别(地区)总值表",
    f"(4){YEAR}年进出口商品类章总值表",
    f"(8){YEAR}年进出口商品收发货人所在地总值表",
    f"(15){YEAR}年对部分国家(地区)出口商品类章金额表",
    f"(16){YEAR}年自部分国家(地区)进口商品类章金额表",
]
def process_table_row(row):
    """Extract the table title and its monthly links from one table row.

    Args:
        row: Selenium element for a ``<tr>``. The first ``<td>`` is read as
            the table title; the ``<a>`` elements of the second ``<td>`` are
            read as month links.

    Returns:
        ``(table_name, month_links)`` where ``month_links`` is a list of
        ``(month, href)`` tuples sorted by month in descending order, or
        ``None`` when the row has fewer than two cells or reading it fails.
    """
    try:
        cells = row.find_elements(By.TAG_NAME, 'td')
        if len(cells) < 2:
            return None

        table_name = cells[0].text.strip()

        month_links = []
        for anchor in cells[1].find_elements(By.TAG_NAME, 'a'):
            month_text = anchor.text
            if '月' not in month_text:
                continue
            try:
                month = int(month_text.replace('月', '').strip())
            except ValueError:
                # A label that is not a single number (e.g. a month range)
                # must only cost us this link, not the whole row.
                log.info(f"无法解析月份,已跳过: {month_text!r}")
                continue
            href = anchor.get_attribute('href')
            if href:
                month_links.append((month, href))

        # Descending order: the largest month number comes first.
        month_links.sort(key=lambda item: item[0], reverse=True)
        return (table_name, month_links)
    except Exception as e:
        # Best effort: an unreadable row is reported and skipped by the caller.
        log.info(f"表格行处理异常: {str(e)}")
        return None
def download_monthly_data(driver, table_name, month_data):
    """Download one month's Excel file and move it into its final folder.

    The month page is opened, the first ``.xls``/``.xlsx`` link inside
    ``span.easysite-isprase`` is clicked, and the downloaded file is moved to
    ``<download_dir>/<YEAR>/<MM>月/<sanitised table name><suffix>``,
    replacing any file already there.

    Args:
        driver: Selenium WebDriver used to open the month page.
        table_name: Table title; used as the file name after sanitising.
        month_data: ``(month_num, link)`` tuple for the month page.

    Returns:
        ``True`` when the file is in place; ``False`` on any failure, in
        which case the error is logged and a screenshot is saved.
    """
    month_num, link = month_data
    # Drop characters that are not allowed in file names.
    safe_name = re.sub(r'[\\/*?:"<>|]', "", table_name).replace(' ', '_')
    try:
        driver.get(link)
        download_btn = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((
                By.CSS_SELECTOR,
                'span.easysite-isprase a[href$=".xls"], span.easysite-isprase a[href$=".xlsx"]',
            ))
        )
        ActionChains(driver).move_to_element(download_btn).click().perform()

        downloaded_file = wait_for_download(download_dir)

        # Format the month *number*: applying ``:02d`` to the whole
        # ``month_data`` tuple raises TypeError and made every call fail.
        target_dir = Path(download_dir) / str(YEAR) / f"{month_num:02d}月"
        target_dir.mkdir(parents=True, exist_ok=True)

        final_path = target_dir / f"{safe_name}{downloaded_file.suffix}"

        # Remove a previous copy first so the rename below cannot collide.
        if final_path.exists():
            try:
                os.remove(final_path)
            except OSError as e:
                log.info(f"文件删除失败:{str(e)}")
                raise

        downloaded_file.rename(final_path)
        log.info(f"√ 成功下载:{final_path}")
        return True
    except Exception as e:
        log.info(f"× 下载失败 {table_name} {month_num}月:{str(e)}")
        driver.save_screenshot(f'error_{safe_name}_{month_num:02d}.png')
        return False
def crawl_with_selenium(url):
    """Walk the listing table for ``YEAR`` and download every target table.

    The page's ``#yb<YEAR>RMB`` table is consumed one row at a time: the
    first data row is read, downloaded when its title is in
    ``TARGET_TABLES``, then removed from the DOM so the next row becomes the
    first. The loop ends when no data rows remain. The browser is always
    closed on exit.

    Args:
        url: Address of the listing page.
    """
    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
    try:
        # NOTE(review): this runs before navigation, so the overrides
        # presumably do not survive ``driver.get`` — confirm whether it is
        # still needed.
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.alert = () => {};
        """)
        driver.get(url)
        WebDriverWait(driver, 30).until(
            lambda d: d.execute_script("return document.readyState === 'complete'")
        )

        main_window = driver.current_window_handle
        table_locator = (By.CSS_SELECTOR, f"#yb{YEAR}RMB")
        # Titles already downloaded. A refresh restores every removed row,
        # so the DOM alone cannot tell us what has been done.
        finished_tables = set()

        while True:
            try:
                # Re-query on every pass; earlier references go stale once
                # a row has been removed.
                table = WebDriverWait(driver, 20).until(
                    EC.presence_of_element_located(table_locator)
                )
                current_rows = table.find_elements(By.CSS_SELECTOR, "tr:not(:first-child)")
                if not current_rows:
                    log.info("所有表格处理完成")
                    break

                # Only the first remaining row is handled per pass.
                row = current_rows[0]
                result = process_table_row(row)
                if result and result[0] in TARGET_TABLES and result[0] not in finished_tables:
                    table_name, month_links = result
                    log.info(f"\n开始处理表格:{table_name}")
                    try:
                        handle_month_data(driver, table_name, month_links)
                    finally:
                        # Downloads run in other tabs; ``row`` belongs to
                        # the listing window, so focus must be back there
                        # before the row is touched again.
                        driver.switch_to.window(main_window)
                    finished_tables.add(table_name)

                # Drop the handled row and wait until the DOM reflects it.
                driver.execute_script("arguments[0].remove()", row)
                WebDriverWait(driver, 10).until(
                    EC.staleness_of(row)
                )
                time.sleep(random.uniform(1, 3))  # pause between rows
            except StaleElementReferenceException:
                log.info("检测到元素失效,自动刷新表格")
                driver.refresh()
                WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located(table_locator)
                )
    finally:
        driver.quit()
def handle_month_data(driver, table_name, month_links):
    """Download each monthly file of one table, every month in its own tab.

    Months outside 1-12 are skipped. A failure for one month is logged and
    does not stop the remaining months. Each tab opened here is closed once
    its download attempt is over, and the driver is focused on the window it
    started in when the function returns.

    Args:
        driver: Selenium WebDriver; its current window is treated as the
            main window.
        table_name: Table title, passed through to ``download_excel``.
        month_links: Iterable of ``(month, href)`` tuples.
    """
    main_window = driver.current_window_handle
    try:
        for idx, (month_num, link) in enumerate(month_links):
            if not 1 <= month_num <= 12:
                continue

            # Work in a separate tab so the main page's DOM is left alone.
            driver.switch_to.window(main_window)
            handles_before = set(driver.window_handles)
            # URL and target go in as script arguments, so a quote in the
            # href cannot break the JavaScript.
            driver.execute_script(
                "window.open(arguments[0], arguments[1])", link, f"_blank_{idx}"
            )
            opened = [h for h in driver.window_handles if h not in handles_before]
            # Fall back to the last handle if no new one was reported.
            tab = opened[-1] if opened else driver.window_handles[-1]
            driver.switch_to.window(tab)
            try:
                download_excel(driver, link, YEAR, month_num, table_name, download_dir)
            except Exception as e:
                log.info(f"【异常】下载失败: {str(e)}")
            finally:
                # Close the tab if it is still open, so tabs do not pile up
                # across months and tables.
                if tab != main_window and tab in driver.window_handles:
                    driver.switch_to.window(tab)
                    driver.close()
            time.sleep(random.uniform(0.5, 1.5))  # pause between downloads
    finally:
        # The caller keeps using elements of the main window.
        driver.switch_to.window(main_window)
if __name__ == "__main__":
    # Script entry point: log a start banner, crawl the listing page, then
    # log an end banner.
    banner_width, banner_fill = 66, "*"
    log.info("【海关总署】全年数据抓取开始".center(banner_width, banner_fill))
    crawl_with_selenium(base_url)
    log.info("【海关总署】全年数据抓取结束".center(banner_width, banner_fill))
|