"""Download the latest monthly Excel files for selected 2025 customs tables.

Uses Playwright (Firefox) to open a customs.gov.cn listing page, picks the rows
whose names appear in TARGET_TABLES, and saves each table's most recent month's
spreadsheet under ../src/downloads/<YYYYMMDD>/.
"""
import random
import re
import time
from pathlib import Path
from urllib.parse import urljoin

from faker import Faker
from playwright.sync_api import sync_playwright
# Exact first-column names of the listing rows whose Excel files are wanted.
# The crawler compares each row's stripped name against this list verbatim,
# so full-width punctuation and the embedded space must be kept as-is.
TARGET_TABLES = [
    "(1)2025年进出口商品总值表 A:年度表",
    "(1)2025年进出口商品总值表 B:月度表",
    "(2)2025年进出口商品国别(地区)总值表",
    "(4)2025年进出口商品类章总值表",
    "(8)2025年进出口商品收发货人所在地总值表",
    "(15)2025年对部分国家(地区)出口商品类章金额表",
    "(16)2025年自部分国家(地区)进口商品类章金额表",
]
def generate_dynamic_fingerprint(page):
    """Placeholder for the enhanced browser-fingerprint generation routine.

    The implementation is elided in this file (the original note marks it as
    "unchanged" and credits techniques from reference pages 1 and 8). As
    written, the function ignores ``page`` and returns ``None``.
    """
    # ... implementation elided ("unchanged") ...
    return None
def process_table_row(row):
    """Extract the table name and its month links from one listing row.

    Args:
        row: Playwright locator for a ``<tr>``. The first ``<td>`` holds the
            table name; the second holds ``a.blue`` links labelled like
            ``3月``.

    Returns:
        ``(table_name, month_links)`` where ``month_links`` is a list of
        ``(month, href)`` tuples sorted with the highest month first, or
        ``None`` when the row has fewer than two cells or reading it raised.
    """
    cells = row.locator('td').all()
    if len(cells) < 2:
        return None
    try:
        table_name = cells[0].inner_text(timeout=5000).strip()
        month_links = []
        for link in cells[1].locator('a.blue').all():
            if not link.is_visible():
                continue
            # Parse each link on its own: one odd label (not "<digits>月") or
            # a link without an href must not discard the whole row.
            label = re.fullmatch(r'\s*(\d+)\s*月?\s*', link.inner_text())
            href = link.get_attribute('href')
            if label is None or not href:
                continue
            month_links.append((int(label.group(1)), href))
        month_links.sort(reverse=True, key=lambda item: item[0])
        return (table_name, month_links)
    except Exception as e:
        # Best-effort boundary: any locator/timeout error skips just this row.
        print(f"行处理异常: {str(e)}")
        return None
def download_excel(page, table_name, month_data):
    """Open one month's detail page, download its Excel file and save it.

    Args:
        page: Playwright page; on success it is navigated back one history
            entry so the caller is on the page it started from.
        table_name: Display name of the table, used (sanitised) in the saved
            file name.
        month_data: ``(month, href)`` tuple for the month to download. The
            href may be absolute or relative to the customs.gov.cn site root.

    Returns:
        ``pathlib.Path`` of the saved file, under
        ``../src/downloads/<YYYYMMDD>/`` relative to the working directory.

    Raises:
        Exception: Whatever the navigation, click, download or save raised,
            re-raised after printing a message and attempting a screenshot.
    """
    max_month, max_link = month_data
    # Strip characters that are invalid in file names; spaces become "_".
    safe_name = re.sub(r'[\\/*?:"<>|]', "", table_name).replace(' ', '_')
    try:
        # urljoin leaves absolute hrefs intact instead of blindly prefixing
        # the host, and resolves site-relative ones against the site root.
        detail_url = urljoin("http://www.customs.gov.cn/", max_link)
        # Navigate before starting the download wait so the wait covers only
        # the click, not the page load.
        page.goto(detail_url, wait_until="networkidle", timeout=60000)
        # .first: the compound selector may match several attachment links,
        # and clicking a multi-match locator is a strictness error.
        download_btn = page.locator('span.easysite-isprase a[href$=".xls"], span.easysite-isprase a[href$=".xlsx"]').first
        with page.expect_download() as download_info:
            download_btn.click(timeout=15000)
        download = download_info.value
        # suffix is "" when the suggested name has no extension.
        file_suffix = Path(download.suggested_filename).suffix
        file_name = f"{safe_name}_{max_month}月{file_suffix}"
        # One sub-directory per run date.
        download_path = Path('../src/downloads') / time.strftime('%Y%m%d')
        download_path.mkdir(parents=True, exist_ok=True)
        final_path = download_path / file_name
        download.save_as(final_path)
        print(f"√ 成功下载: {file_name}")
        # Return to the page the caller was on.
        page.go_back()
        page.wait_for_load_state('networkidle')
        return final_path
    except Exception as e:
        print(f"× 下载失败 {table_name}: {str(e)}")
        try:
            page.screenshot(path=f'error_{safe_name}.png')
        except Exception as shot_err:
            # A failed diagnostic screenshot must not mask the real error.
            print(f"× 截图失败 {table_name}: {shot_err}")
        raise
def _collect_target_tables(page):
    """Return ``(table_name, month_links)`` for each visible target row."""
    rows_locator = page.locator('#yb2025RMB tr')
    targets = []
    # Row 0 is skipped (presumably the header row — confirm against the page).
    for i in range(1, rows_locator.count()):
        row = rows_locator.nth(i)
        if not row.is_visible():
            continue
        result = process_table_row(row)
        if not result:
            continue
        table_name, month_links = result
        if table_name in TARGET_TABLES and month_links:
            targets.append((table_name, month_links))
    return targets


def crawl_with_fingerprint(url):
    """Open the listing page and download the newest month of each target table.

    Launches headless Firefox with a random Firefox user agent, loads ``url``,
    gathers every row of ``#yb2025RMB`` whose name is in ``TARGET_TABLES``, and
    calls ``download_excel`` with each table's first (highest-month) link,
    sleeping 2-5 s between downloads. The first failed download stops the run.

    Args:
        url: Address of the listing page that contains the ``#yb2025RMB`` table.

    Returns:
        None.
    """
    with sync_playwright() as p:
        # NOTE(review): these look like Chromium-style switches being passed to
        # Firefox; kept unchanged — confirm they have the intended effect.
        browser = p.firefox.launch(
            headless=True,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--lang=zh-CN',
                '--window-size=1440,900'
            ]
        )
        try:
            context = browser.new_context(
                user_agent=Faker().firefox(),
                viewport={'width': 1440, 'height': 900},
                device_scale_factor=1,
                accept_downloads=True,  # key setting: the context accepts downloads
                extra_http_headers={
                    "Host": "www.customs.gov.cn",
                    "Accept-Language": "zh-CN,zh;q=0.9"
                }
            )
            try:
                page = context.new_page()
                # Hide the webdriver flag and silence alert() before page scripts run.
                page.add_init_script("""
                    Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
                    window.alert = () => {};
                """)
                page.goto(url, wait_until="networkidle", timeout=60000)
                # Gather every target before downloading anything: each download
                # navigates away and back, and deleting rows from the DOM while
                # indexing by position would shift later rows and skip the one
                # right after each downloaded table.
                for table_name, month_links in _collect_target_tables(page):
                    try:
                        download_excel(page, table_name, month_links[0])
                        time.sleep(random.uniform(2, 5))  # random pause between downloads
                    except Exception as e:
                        print(f"表格处理中断: {str(e)}")
                        break
            finally:
                context.close()
        finally:
            # Runs even if creating or closing the context failed.
            browser.close()
if __name__ == "__main__":
    # parents=True so a missing '../src' does not raise FileNotFoundError.
    Path('../src/downloads').mkdir(parents=True, exist_ok=True)
    crawl_with_fingerprint("http://www.customs.gov.cn/customs/302249/zfxxgk/2799825/302274/302277/6348926/index.html")