123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- import os
- import re
- import time
- from pathlib import Path
- from faker import Faker
- from selenium import webdriver
- from selenium.common import StaleElementReferenceException
- from selenium.webdriver import FirefoxOptions, ActionChains
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
# Statistical year whose monthly tables are downloaded.
YEAR = 2025

# Names of the tables to download. The crawler compares the stripped text of a
# row's first cell against these names, so they must match exactly. Every name
# has the shape "(<index>)<YEAR>年<title>".
TARGET_TABLES = [
    f"({index}){YEAR}年{title}"
    for index, title in (
        (2, "进出口商品国别(地区)总值表"),
        (4, "进出口商品类章总值表"),
        (8, "进出口商品收发货人所在地总值表"),
        (15, "对部分国家(地区)出口商品类章金额表"),
        (16, "自部分国家(地区)进口商品类章金额表"),
    )
]
def wait_for_download_complete(download_dir, timeout=15, poll_interval=1.0):
    """Block until a new, fully written file appears under *download_dir*.

    The directory tree is snapshotted on entry and then polled. A path counts
    as the finished download only when it was not in the snapshot, is a
    regular non-empty file, and no new browser temp file (``.part`` etc.) is
    still present. Without these checks the function would hand back the
    temp file or an empty placeholder while the browser is still writing.

    Args:
        download_dir: Directory (searched recursively) the browser saves into.
        timeout: Maximum number of seconds to wait.
        poll_interval: Seconds to sleep between directory scans.

    Returns:
        ``pathlib.Path`` of the finished file with the latest ``st_ctime``.

    Raises:
        TimeoutError: If no finished file shows up within *timeout* seconds.
    """
    partial_suffixes = {".part", ".crdownload", ".tmp"}
    root = Path(download_dir)

    initial_paths = set(root.rglob('*'))
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        new_paths = set(root.rglob('*')) - initial_paths

        # A new temp file means the browser is still writing the download.
        still_writing = any(
            path.suffix.lower() in partial_suffixes for path in new_paths
        )
        if new_paths and not still_writing:
            finished = []
            for path in new_paths:
                try:
                    if not path.is_file():
                        continue  # ignore directories created meanwhile
                    info = path.stat()
                except OSError:
                    continue  # vanished between the scan and the stat call
                if info.st_size > 0:  # skip empty placeholder files
                    finished.append((info.st_ctime, path))
            if finished:
                return max(finished, key=lambda item: item[0])[1]

        time.sleep(poll_interval)

    raise TimeoutError("文件下载超时")
- from selenium.webdriver.common.by import By
def process_table_row(row):
    """Extract the table name and its monthly links from one table row.

    Args:
        row: Selenium element for a ``<tr>``. The first ``<td>`` is read as
            the table name and the ``<a>`` elements of the second ``<td>``
            as the per-month links.

    Returns:
        ``(table_name, month_links)`` where ``month_links`` is a list of
        ``(month, href)`` tuples sorted by month in ascending order, or
        ``None`` when the row has fewer than two cells or reading it failed.
    """
    try:
        cells = row.find_elements(By.TAG_NAME, 'td')
        if len(cells) < 2:
            return None

        table_name = cells[0].text.strip()

        month_links = []
        for anchor in cells[1].find_elements(By.TAG_NAME, 'a'):
            # Accept only link texts of the form "<number>月". Anything else
            # (e.g. a combined "1-2月" link) is skipped individually instead
            # of raising and discarding every month of this row.
            match = re.fullmatch(r'\s*(\d+)\s*月\s*', anchor.text)
            if match is None:
                continue
            href = anchor.get_attribute('href')
            if href:
                month_links.append((int(match.group(1)), href))

        # Ascending by month number.
        month_links.sort(key=lambda item: item[0])
        return (table_name, month_links)
    except Exception as e:
        # Best effort: a row that cannot be read is reported and skipped.
        print(f"表格行处理异常: {str(e)}")
        return None
def download_monthly_data(driver, table_name, month_data):
    """Download one month's spreadsheet and file it under its month folder.

    The detail page is loaded, the first ``.xls``/``.xlsx`` link inside
    ``span.easysite-isprase`` is clicked, and the downloaded file is moved to
    ``downloads/<YEAR>/<MM>月/<table name><suffix>``, overwriting any file
    already stored there.

    Args:
        driver: Selenium WebDriver whose current window is used for the page.
        table_name: Table title; characters not allowed in file names are
            removed and spaces become underscores.
        month_data: ``(month_number, url)`` tuple.

    Returns:
        ``True`` on success. ``False`` if any step failed; the error is
        printed and a screenshot is attempted in the working directory.
    """
    month_num, link = month_data
    safe_name = re.sub(r'[\\/*?:"<>|]', "", table_name).replace(' ', '_')
    try:
        download_dir = Path(os.path.abspath(f"downloads/{YEAR}"))
        # Create the year folder up front so it exists for the first download.
        download_dir.mkdir(parents=True, exist_ok=True)

        driver.get(f"{link}")
        download_btn = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((
                By.CSS_SELECTOR,
                'span.easysite-isprase a[href$=".xls"], span.easysite-isprase a[href$=".xlsx"]',
            ))
        )
        ActionChains(driver).move_to_element(download_btn).click().perform()

        # NOTE(review): the wait helper snapshots the directory when called,
        # i.e. after the click; a download that finishes before this line is
        # not seen as new and ends in a timeout.
        downloaded_file = wait_for_download_complete(download_dir)

        target_dir = download_dir / f"{month_num:02d}月"
        target_dir.mkdir(parents=True, exist_ok=True)
        final_path = target_dir / f"{safe_name}{downloaded_file.suffix}"

        # Path.replace overwrites an existing target in a single step.
        downloaded_file.replace(final_path)
        print(f"√ 成功下载:{final_path}")
        return True
    except Exception as e:
        print(f"× 下载失败 {table_name} {month_num}月:{str(e)}")
        try:
            driver.save_screenshot(f'error_{safe_name}_{month_num:02d}.png')
        except Exception as screenshot_error:
            # A failing screenshot must not turn "return False" into a raise.
            print(f"× 截图保存失败:{screenshot_error}")
        return False
def configure_stealth_options():
    """Build the Firefox options used by the crawler.

    Sets the download folder to ``downloads/<YEAR>`` (absolute path), lists
    the MIME types to save without a prompt, applies automation-hiding
    preferences, overrides the user agent with a Faker-generated Firefox
    string, and starts the browser headless at 1440x900.

    Returns:
        The configured ``FirefoxOptions`` instance.
    """
    opts = FirefoxOptions()
    download_dir = os.path.abspath(f"downloads/{YEAR}")

    # Download handling: custom folder (folderList=2), no prompt.
    opts.set_preference("browser.download.dir", download_dir)
    opts.set_preference("browser.download.folderList", 2)
    opts.set_preference("browser.download.manager.showWhenStarting", False)
    # The crawler clicks both .xls and .xlsx links, so the .xlsx MIME type
    # has to be listed as well.
    opts.set_preference(
        "browser.helperApps.neverAsk.saveToDisk",
        ",".join((
            "application/octet-stream",
            "application/vnd.ms-excel",
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )),
    )

    # Automation-hiding settings.
    # NOTE(review): "useAutomationExtension" and "--disable-blink-features"
    # look like Chromium settings; presumably Firefox ignores them - confirm.
    opts.set_preference("dom.webdriver.enabled", False)
    opts.set_preference("useAutomationExtension", False)
    opts.add_argument("--disable-blink-features=AutomationControlled")

    # Randomised user agent plus a fixed accept-language header.
    fake = Faker()
    opts.set_preference("general.useragent.override", fake.firefox())
    opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")

    # Window size and headless mode.
    opts.add_argument("--width=1440")
    opts.add_argument("--height=900")
    opts.add_argument("--headless")
    return opts
def crawl_with_selenium(url, max_refreshes=5):
    """Walk the ``#yb<YEAR>RMB`` table on *url* and download the target tables.

    Rows are consumed one at a time: the first remaining data row is parsed,
    downloaded if its name is in ``TARGET_TABLES``, then removed from the DOM
    so the next iteration sees the following row. The loop ends when only the
    first row of the table is left.

    Args:
        url: Address of the index page that contains the table.
        max_refreshes: How many times the page may be refreshed after a
            stale-element error before the error is re-raised.

    Raises:
        StaleElementReferenceException: If stale-element errors keep
            occurring after *max_refreshes* page refreshes.
    """
    table_locator = (By.CSS_SELECTOR, f"#yb{YEAR}RMB")
    # A refresh restores every removed row, so remember which tables are
    # done to avoid downloading them a second time.
    processed_tables = set()
    refresh_count = 0

    driver = webdriver.Firefox(options=configure_stealth_options())
    try:
        # NOTE(review): this runs in the document that is open before
        # driver.get(); the overrides presumably do not carry over to the
        # page loaded afterwards - confirm.
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.alert = () => {};
        """)

        driver.get(url)
        WebDriverWait(driver, 30).until(
            lambda d: d.execute_script("return document.readyState === 'complete'")
        )

        while True:
            try:
                # Re-query the table and its rows on every pass.
                table = WebDriverWait(driver, 20).until(
                    EC.presence_of_element_located(table_locator)
                )
                current_rows = table.find_elements(By.CSS_SELECTOR, "tr:not(:first-child)")
                if not current_rows:
                    print("所有表格处理完成")
                    break

                # Handle only the first remaining row.
                row = current_rows[0]
                result = process_table_row(row)
                if (
                    result
                    and result[0] in TARGET_TABLES
                    and result[0] not in processed_tables
                ):
                    table_name, month_links = result
                    print(f"\n开始处理表格:{table_name}")
                    handle_month_data(driver, table_name, month_links)
                    processed_tables.add(table_name)

                # Drop the handled row and wait until the DOM reflects it.
                driver.execute_script("arguments[0].remove()", row)
                WebDriverWait(driver, 10).until(EC.staleness_of(row))
            except StaleElementReferenceException:
                refresh_count += 1
                if refresh_count > max_refreshes:
                    raise  # give up instead of refreshing forever
                print("检测到元素失效,自动刷新表格")
                driver.refresh()
                WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located(table_locator)
                )
    finally:
        driver.quit()
def handle_month_data(driver, table_name, month_links):
    """Download every month (1-12) of one table, each in its own window.

    For each link a new window is opened from the main window, the download
    runs there, and the window is closed again, so the main page keeps its
    DOM. Entries whose month is outside 1-12 are skipped.

    Args:
        driver: Selenium WebDriver currently focused on the index page.
        table_name: Table title passed on to ``download_monthly_data``.
        month_links: Iterable of ``(month, url)`` tuples.
    """
    main_window = driver.current_window_handle
    table_locator = (By.CSS_SELECTOR, f"#yb{YEAR}RMB")

    for idx, month_data in enumerate(month_links):
        month, url = month_data
        if not 1 <= month <= 12:
            continue

        driver.switch_to.window(main_window)
        known_handles = set(driver.window_handles)

        # Pass the URL as a script argument rather than pasting it into the
        # JavaScript source, so quotes in the URL cannot break the script.
        driver.execute_script(
            "window.open(arguments[0], arguments[1]);", url, f"_blank_{idx}"
        )

        # Find the new window by comparing handle sets; the order of
        # window_handles is not guaranteed, so [-1] may be the main window.
        new_handles = WebDriverWait(driver, 10).until(
            lambda d: set(d.window_handles) - known_handles
        )
        driver.switch_to.window(next(iter(new_handles)))

        try:
            if download_monthly_data(driver, table_name, month_data):
                print(f"{month}月下载成功")
        finally:
            driver.close()
            driver.switch_to.window(main_window)

        # Make sure the index table is still present before continuing.
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(table_locator)
        )
if __name__ == "__main__":
    # Script entry point: make sure the download root exists, crawl the
    # index page, then report completion.
    os.makedirs('downloads', exist_ok=True)
    target_url = (
        "http://www.customs.gov.cn/customs/302249/zfxxgk/2799825/"
        "302274/302277/6348926/index.html"
    )
    crawl_with_selenium(target_url)
    print("全年数据下载任务已完成")
|