123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- import os
- import random
- import re
- import time
- from pathlib import Path
- from urllib.parse import urljoin
- import pandas as pd
- import requests
- from faker import Faker
- from selenium import webdriver
- from selenium.webdriver import FirefoxOptions, ActionChains
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
# Reporting period whose tables are looked up on the listing page.
YEAR_MONTH = "2025年3月"

# Exact link titles to locate; each is the reporting period followed by a
# fixed table name.
TARGET_TITLES = [
    YEAR_MONTH + suffix
    for suffix in (
        "山东省进口20位主要商品总值",
        "山东省出口20位主要商品总值",
        "山东省各地市进出口总值",
        "山东省进出口主要国别(地区)总值",
    )
]

# Listing page that is opened first and searched for the titles above.
URL = "http://qingdao.customs.gov.cn/qingdao_customs/406535/fdzdgknr30/406514/406515/index.html"
def process_table_row(row):
    """Extract the table name and its per-month links from one table row.

    Args:
        row: A Selenium element for a table row. Its first ``td`` holds the
            table name and its second ``td`` holds ``<a>`` links whose text
            is a month label such as ``3月``.

    Returns:
        ``(table_name, month_links)`` where ``month_links`` is a list of
        ``(month, href)`` tuples sorted by month in ascending order, or
        ``None`` when the row has fewer than two cells or processing the row
        fails.
    """
    try:
        cells = row.find_elements(By.TAG_NAME, 'td')
        if len(cells) < 2:
            return None

        table_name = cells[0].text.strip()

        month_links = []
        for anchor in cells[1].find_elements(By.TAG_NAME, 'a'):
            month_text = anchor.text
            if '月' not in month_text:
                continue
            try:
                month = int(month_text.replace('月', '').strip())
            except ValueError:
                # A label that is not a plain number once '月' is removed
                # must only drop this link, not the whole row.
                continue
            href = anchor.get_attribute('href')
            if href:
                month_links.append((month, href))

        # Ascending month order.
        month_links.sort(key=lambda item: item[0])
        return (table_name, month_links)
    except Exception as e:
        # Best-effort: report the failure and let the caller skip this row.
        print(f"表格行处理异常: {str(e)}")
        return None
def configure_stealth_options():
    """Build the Firefox options used by the crawler.

    Sets download preferences (target directory, no download prompts),
    anti-detection preferences, a random Firefox user agent generated by
    Faker, the accepted languages, a 1440x900 window, and headless mode.
    The resolved download directory is printed.

    Returns:
        The configured ``FirefoxOptions`` instance.
    """
    opts = FirefoxOptions()

    # Relative to the current working directory of the process.
    download_dir = os.path.abspath(os.path.join('../../downloads', "2025"))
    print("当前下载路径:", Path(download_dir).resolve())

    preferences = (
        # File download behaviour.
        ("browser.download.dir", download_dir),
        ("browser.download.folderList", 2),
        ("browser.download.manager.showWhenStarting", False),
        # MIME types that are saved without asking.
        ("browser.helperApps.neverAsk.saveToDisk",
         "application/octet-stream, application/vnd.ms-excel"),
        ("browser.download.manager.useWindow", False),
        ("browser.download.manager.showAlertOnComplete", False),
        # Anti-detection preferences.
        ("dom.webdriver.enabled", False),
        ("useAutomationExtension", False),
        # Randomised fingerprint: a fresh Firefox user agent on every call.
        ("general.useragent.override", Faker().firefox()),
        ("intl.accept_languages", "zh-CN,zh;q=0.9"),
    )
    for pref_name, pref_value in preferences:
        opts.set_preference(pref_name, pref_value)

    # NOTE(review): the first switch looks like a Chromium flag; confirm
    # whether Firefox honours it.
    arguments = (
        "--disable-blink-features=AutomationControlled",
        "--width=1440",
        "--height=900",
        "--headless",
    )
    for argument in arguments:
        opts.add_argument(argument)

    return opts
def find_target_links(driver):
    """Collect the href of every link whose title is in ``TARGET_TITLES``.

    Waits up to 20 seconds for the ``conList_ul`` list to be present, then
    looks up each title by exact ``title`` attribute match (up to 10 seconds
    per title) and pauses for a random 1-3 seconds after each lookup.

    Args:
        driver: The Selenium driver showing the listing page.

    Returns:
        A list of ``(title, href)`` tuples in ``TARGET_TITLES`` order.
    """
    list_locator = (By.CLASS_NAME, "conList_ul")
    WebDriverWait(driver, 20).until(EC.presence_of_element_located(list_locator))

    found = []
    for title in TARGET_TITLES:
        # Exact match on the anchor's title attribute inside the list.
        anchor_locator = (By.XPATH, f'//ul[@class="conList_ul"]//a[@title="{title}"]')
        anchor = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(anchor_locator)
        )
        found.append((title, anchor.get_attribute("href")))
        # Random delay between lookups to look less like automation.
        time.sleep(random.uniform(1, 3))
    return found
def wait_for_download_complete(download_dir, timeout=60, poll_interval=1.0):
    """Wait until a finished file exists under ``download_dir`` and return it.

    The directory is scanned recursively. Directories and files ending in a
    temporary download suffix (``.part``, ``.crdownload``) are ignored. Of
    the remaining files the one with the greatest ``st_ctime`` is opened for
    reading as a readiness probe and returned on success.

    Args:
        download_dir: Directory to watch.
        timeout: Maximum number of seconds to wait.
        poll_interval: Seconds to sleep between scans.

    Returns:
        ``pathlib.Path`` of the newest readable, non-temporary file.

    Raises:
        TimeoutError: If no such file could be opened within ``timeout``.
    """
    temp_extensions = ('.part', '.crdownload')
    root = Path(download_dir)
    # Monotonic clock: immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        try:
            # Regular files only: a directory cannot be opened for reading
            # and would otherwise make every probe fail.
            candidates = [
                path for path in root.rglob('*')
                if path.is_file() and not path.name.endswith(temp_extensions)
            ]
            if candidates:
                newest_file = max(candidates, key=lambda path: path.stat().st_ctime)
                with newest_file.open('rb'):
                    return newest_file
        except OSError:
            # File vanished, is still locked, or is not fully written yet;
            # fall through to the sleep and retry instead of busy-spinning.
            pass

        remaining = deadline - time.monotonic()
        if remaining > 0:
            time.sleep(min(poll_interval, remaining))

    raise TimeoutError("文件下载超时")
def read_remote_excel(url):
    """Download an Excel workbook over HTTP and load it into a DataFrame.

    Args:
        url: Absolute URL of the workbook.

    Returns:
        The ``pandas.DataFrame`` read from the workbook, or ``None`` when
        the HTTP request fails (the error is printed).
    """
    # Local import so the block stays self-contained.
    from io import BytesIO

    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # turn HTTP error statuses into exceptions
    except requests.exceptions.RequestException as e:
        print(f"远程读取失败: {str(e)}")
        return None

    # Wrap the payload in a file-like object instead of passing raw bytes,
    # and let pandas pick the engine from the file content so that both
    # legacy .xls and .xlsx workbooks can be read.
    return pd.read_excel(BytesIO(response.content))
def download_excel(title, driver, url):
    """Open a detail page in a new tab, read its Excel attachment, print it.

    The page is opened in a separate tab so the listing page in the main
    window is left untouched. The tab is always closed and focus returned
    to the main window, even when locating or reading the workbook fails.

    Args:
        title: Title of the entry being processed (currently unused here).
        driver: The Selenium driver; its current window is treated as the
            main window.
        url: URL of the detail page containing the download link.
    """
    main_window = driver.current_window_handle
    handles_before = driver.window_handles

    # Pass the URL as a script argument rather than splicing it into the
    # JavaScript source, so quotes in the URL cannot break or alter the script.
    driver.execute_script("window.open(arguments[0]);", url)

    # Wait for the tab and pick it by difference: handle order is not
    # guaranteed, and if no tab opened we must not go on to close the main
    # window in the cleanup below.
    WebDriverWait(driver, 10).until(EC.new_window_is_opened(handles_before))
    new_window = next(
        handle for handle in driver.window_handles if handle not in handles_before
    )
    driver.switch_to.window(new_window)

    try:
        # Wait for the download link to be present.
        excel_link = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.XPATH, '//a[text()="表格下载" and contains(@href, ".xls")]'))
        )
        href = excel_link.get_attribute("href")
        # Resolve against the page actually loaded in this tab; an href that
        # is already absolute passes through unchanged.
        absolute_url = urljoin(driver.current_url, href)
        print(f"数据:{read_remote_excel(absolute_url)}")
    finally:
        driver.close()
        driver.switch_to.window(main_window)
def crawl_with_selenium(url):
    """Run one crawl: open the listing page and process every target entry.

    Starts Firefox with the stealth options, loads ``url``, resolves the
    target links, and hands each one to ``download_excel`` with a random
    5-10 second pause after each. The browser is always shut down.

    Args:
        url: URL of the listing page.
    """
    browser = webdriver.Firefox(options=configure_stealth_options())
    try:
        # NOTE(review): this script runs before any page is loaded; confirm
        # that the overrides still apply after the navigation below.
        stealth_script = """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.alert = () => {};
        """
        browser.execute_script(stealth_script)

        browser.get(url)

        for title, detail_url in find_target_links(browser):
            print(f"正在处理:{title}")
            download_excel(title, browser, detail_url)
            # Long random pause between entries to reduce the risk of a ban.
            time.sleep(random.randint(5, 10))
    finally:
        browser.quit()
if __name__ == "__main__":
    # Script entry point: crawl the configured listing page, then report
    # completion for the configured reporting period.
    crawl_with_selenium(URL)
    completion_message = f"山东省{YEAR_MONTH}下载任务已完成"
    print(completion_message)
|