123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- import os
- import random
- import re
- import subprocess
- import time
- from pathlib import Path
- from urllib.parse import urljoin
- from faker import Faker
- from selenium import webdriver
- from selenium.common.exceptions import StaleElementReferenceException
- from selenium.webdriver import FirefoxOptions
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- import gov_commodity_zhejiang_city
- import gov_commodity_zhejiang_country
- import gov_commodity_zhejiang_import_export
- from utils import base_country_code, base_mysql
# Root folder that receives the browser downloads (configured in the shared
# project settings module); create it up front, including missing parents.
download_dir = base_country_code.download_dir
os.makedirs(download_dir, exist_ok=True)
def configure_stealth_options():
    """Build the Firefox options used by the crawler.

    The options configure silent file downloads into ``download_dir``,
    a handful of anti-automation-detection preferences, a randomised
    Firefox user agent and a fixed headless viewport.

    Returns:
        FirefoxOptions: the fully configured options object.
    """
    opts = FirefoxOptions()

    # Hand Firefox the resolved absolute path: this is the same location the
    # log line reports, and a relative value is not reliably honoured as a
    # download target (NOTE(review): confirm for the Firefox version in use).
    absolute_download_dir = Path(download_dir).resolve()
    print("当前下载路径:", absolute_download_dir)

    # File download configuration.
    opts.set_preference("browser.download.dir", str(absolute_download_dir))
    opts.set_preference("browser.download.folderList", 2)
    opts.set_preference("browser.download.manager.showWhenStarting", False)
    # MIME types that are saved to disk without a prompt.
    opts.set_preference(
        "browser.helperApps.neverAsk.saveToDisk",
        "application/octet-stream, application/vnd.ms-excel",
    )
    # Disable the download manager window and the completion alert.
    opts.set_preference("browser.download.manager.useWindow", False)
    opts.set_preference("browser.download.manager.showAlertOnComplete", False)

    # Anti-detection settings.
    # NOTE(review): "useAutomationExtension" and "--disable-blink-features"
    # look like Chromium-oriented switches; kept as-is, confirm they are
    # needed for Firefox.
    opts.set_preference("dom.webdriver.enabled", False)
    opts.set_preference("useAutomationExtension", False)
    opts.add_argument("--disable-blink-features=AutomationControlled")

    # Randomised browser fingerprint.
    fake = Faker()
    opts.set_preference("general.useragent.override", fake.firefox())
    opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")

    # Viewport configuration.
    opts.add_argument("--width=1440")
    opts.add_argument("--height=900")
    opts.add_argument("--headless")
    return opts
def crawl_by_year_tabs(driver, base_url):
    """Open every year tab newer than 2022 and crawl its month tabs.

    Each qualifying year page is opened in a new browser window, handed to
    ``process_month_tabs`` and closed again before the next year is handled.

    Args:
        driver: Selenium WebDriver positioned on the page with the year tabs.
        base_url: Site root used to resolve relative tab links.
    """
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )
    year_tabs = driver.find_elements(By.XPATH, '//ul[@class="nav_sj"]//li//a')

    # Read text and href up front so the loop never touches the elements
    # again after the driver has switched windows.
    tab_infos = [(tab.text.strip(), tab.get_attribute("href")) for tab in year_tabs]

    for year_text, year_href in tab_infos:
        year_match = re.match(r"\d{4}", year_text)
        if year_match is None:
            # Tabs whose label does not start with a 4-digit year are not year tabs.
            print(f"跳过无法识别年份的Tab:{year_text!r}")
            continue
        if int(year_match.group()) <= 2022:
            print(f"{year_text} 后的数据无需下载")
            continue
        if not year_href:
            print(f"{year_text} 缺少链接,跳过")
            continue

        # urljoin leaves absolute links untouched and resolves relative ones
        # against the site root.
        year_url = urljoin(base_url, year_href)

        # Open the year page in a new window and work inside it.
        driver.execute_script("window.open(arguments[0]);", year_url)
        driver.switch_to.window(driver.window_handles[-1])
        try:
            print(f"\n正在处理 {year_text} 年份页面")
            process_month_tabs(driver, year_text, base_url)
        finally:
            # Always close the child window and return to the main one, even
            # when month processing fails, so no window is leaked.
            driver.close()
            driver.switch_to.window(driver.window_handles[0])
def process_month_tabs(driver, year, base_url, max_retries=3):
    """Walk the month tabs of a year page and download every month's files.

    The tab list is re-read from the page before each month is handled,
    because visiting detail pages navigates away and invalidates previously
    located elements. The loop ends when no unprocessed target month is left,
    when the page has no month tabs, or when the tab list went stale more
    than ``max_retries`` times in a row.

    Args:
        driver: Selenium WebDriver positioned on a year page.
        year: Year label, used only in log messages.
        base_url: Site root used to resolve relative detail links.
        max_retries: Maximum number of consecutive stale-element retries.
    """
    # Wait explicitly for the page container to load.
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )
    target_months = {'一月', '二月', '三月', '四月', '五月', '六月',
                     '七月', '八月', '九月', '十月', '十一月', '十二月'}
    processed_months = set()  # months that have been fully handled
    retry_count = 0  # consecutive stale-element failures

    while True:
        try:
            month_items = driver.find_elements(By.XPATH, '//ul[@class="nav_tab"]//li')
            if not month_items:
                print(f"{year}年没有月份Tab,停止处理")
                break

            # Pick the first target month that has not been handled yet.
            pending = None
            for item in month_items:
                links = item.find_elements(By.XPATH, './/a')
                if not links:
                    continue  # list item without a link cannot be a month tab
                month_text = links[0].text.strip()
                if month_text in target_months and month_text not in processed_months:
                    pending = (links[0], month_text)
                    break

            if pending is None:
                # Nothing left to do: every target month on the page is done.
                print(f"{year}年所有目标月份处理完成")
                break

            a_tag, month_text = pending
            print(f"点击月份Tab:{year}-{month_text}")
            a_tag.click()

            # Wait for the month listing before collecting detail links.
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
            )
            detail_link_arr = get_behind_detail_link(driver, base_url)
            if not detail_link_arr:
                print(f"{year}-{month_text} 未找到详情链接")
            for detail_link in detail_link_arr:
                print(f"{year}-{month_text} 详情链接:{detail_link}")
                driver.get(detail_link)
                download_file_from_detail_page(driver)
                driver.back()
                WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
                )

            processed_months.add(month_text)
            retry_count = 0  # progress was made, reset the failure streak
        except StaleElementReferenceException:
            retry_count += 1
            if retry_count > max_retries:
                print(f"{year}年月份Tab连续 {max_retries} 次失效,停止重试")
                break
            print("页面刷新,重新获取月份Tab列表...")
            print(f"第 {retry_count} 次重试获取月份Tab...")
            time.sleep(2)

    print(f"{year}年最终处理的月份:{processed_months}")
def get_behind_detail_link(driver, base_url):
    """Collect the absolute detail-page links listed after a month tab click.

    Waits until the first link under ``ul.conList_ul`` is clickable, then
    reads the ``href`` of every link in that list.

    Args:
        driver: Selenium WebDriver positioned on a month listing.
        base_url: Site root used to resolve relative links.

    Returns:
        list[str]: absolute URLs, or an empty list when the links cannot be
        read (the failure is logged, not raised).
    """
    link_xpath = '//ul[@class="conList_ul"]/li/a'
    try:
        # The wait only signals readiness; the full list is queried from the
        # driver afterwards (the XPath is document-absolute anyway).
        WebDriverWait(driver, 30).until(
            EC.element_to_be_clickable((By.XPATH, link_xpath))
        )
        href_arr = []
        for element in driver.find_elements(By.XPATH, link_xpath):
            href = element.get_attribute("href")
            if not href:
                # urljoin would turn an empty href into base_url itself,
                # which is not a detail page.
                continue
            href_arr.append(urljoin(base_url, href))  # resolves relative paths
        return href_arr
    except Exception as e:
        # Best-effort boundary: a month without a readable list is skipped.
        print(f"获取详情链接失败: {str(e)}")
        return []
def download_file_from_detail_page(driver):
    """Download every Excel attachment linked on the current detail page.

    Each attachment is clicked, awaited in ``download_dir`` and then moved to
    ``download_dir/<year>/<month>/<link text>``, where year and month are
    parsed from the link text. A failure on one attachment is logged and does
    not prevent the remaining attachments from being processed.

    Args:
        driver: Selenium WebDriver positioned on a detail page.
    """
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )
    try:
        elements = driver.find_elements(
            By.XPATH,
            '//div[@class="easysite-news-content"]//div[@id="easysiteText"]//p//a',
        )
    except Exception as e:
        print(f"详情页处理异常: {str(e)}")
        return
    if not elements:
        print("详情页未找到目标文件链接")
        return

    for download_btn in elements:
        try:
            file_name = download_btn.text.strip()
            if not file_name:
                continue
            file_url = download_btn.get_attribute("href")
            if not file_url or not file_url.lower().endswith(('.xls', '.xlsx')):
                print(f"跳过非 Excel 文件: {file_url}")
                continue

            # Work out the destination first: a name without a year/month
            # raises here, before anything is downloaded into the root folder.
            year, _start_month, month = extract_year_and_month(file_name)
            final_dir = Path(download_dir) / year / month
            final_path = final_dir / file_name

            print(f"正在下载: {file_name} → {file_url}")
            # Snapshot the folder so the new file can be told apart afterwards.
            existing_files = {f.name for f in Path(download_dir).glob('*')}
            # Random delay before the click.
            time.sleep(random.uniform(1, 3))
            download_btn.click()
            downloaded_file = wait_for_download_complete(existing_files=existing_files)

            final_dir.mkdir(parents=True, exist_ok=True)
            if final_path.exists():
                print(f"文件已存在:{file_name} 正在覆盖...")
            print(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
            # replace() overwrites an existing target on every platform.
            downloaded_file.replace(final_path)
            print(f"√ 下载成功:{final_path}")
        except Exception as e:
            # Isolate failures per attachment so the others are still fetched.
            print(f"详情页处理异常: {str(e)}")
def extract_year_and_month(file_name):
    """Parse the year and month (or month range) from a report file name.

    Supported shapes:
      - ``2025年1-2月xxx`` (range; also with ``至``, dashes or tildes)
      - ``2025年3月xxx``   (single month)

    Args:
        file_name: Text that contains the ``<year>年<month>月`` marker.

    Returns:
        tuple[str, str, str]: ``(year, start_month, end_month)`` with both
        months zero-padded to two digits; for a single month the start and
        end are equal.

    Raises:
        ValueError: if no marker is found or a month is outside 1-12.
    """
    # Separators seen between the two months of a range.
    separators = "-–—-~~至"
    pattern = (
        r"(\d{4})年(\d{1,2})(?:[" + re.escape(separators) + r"](\d{1,2}))?月"
    )
    match = re.search(pattern, file_name)
    if match is None:
        raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")

    year = match.group(1)
    start_month = int(match.group(2))
    end_month = int(match.group(3)) if match.group(3) else start_month

    # Reject impossible months so they never become directory names.
    for month_number in (start_month, end_month):
        if not 1 <= month_number <= 12:
            raise ValueError(f"文件名中的月份超出范围(1-12):{file_name}")

    return year, f"{start_month:02d}", f"{end_month:02d}"
def extract_rar(rar_path, extract_to, winrar_path=r"C:\Program Files\WinRAR\Rar.exe"):
    """Extract a RAR archive by calling the command-line ``Rar.exe``.

    Fallback extractor for when the ``rarfile`` route is unavailable.

    Args:
        rar_path: Path of the archive to extract.
        extract_to: Destination directory.
        winrar_path: Location of the RAR command-line executable
            (``Rar.exe`` rather than the GUI ``WinRAR.exe``).

    Returns:
        bool: True when the tool exits with code 0, False when it reports an
        error or cannot be started.
    """
    # RAR only treats the last argument as a destination directory when it
    # ends with a path separator; os.path.join(..., "") appends one.
    destination = os.path.join(str(extract_to), "")
    cmd = [winrar_path, 'x', '-y', str(rar_path), destination]

    # CREATE_NO_WINDOW keeps a console window from popping up on Windows.
    creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            creationflags=creationflags,
        )
    except OSError as e:
        # Executable missing or not runnable: report failure like any other.
        print(f"解压失败: {e}")
        return False

    if result.returncode == 0:
        print(f"解压成功: {rar_path} → {extract_to}")
        return True

    # errors='replace' keeps undecodable bytes from masking the real failure.
    print(f"解压失败: {result.stderr.decode('gbk', errors='replace')}")
    return False
def crawl_with_selenium(url):
    """Run the full download crawl starting from the given index page.

    Starts a Firefox session with the stealth options, loads ``url``,
    crawls all year tabs and always shuts the browser down afterwards.

    Args:
        url: Address of the index page that carries the year tabs.
    """
    from urllib.parse import urlsplit

    # Resolve relative links against the site the crawl actually starts on;
    # fall back to the Hangzhou customs host when the URL has no host part.
    parts = urlsplit(url)
    if parts.scheme and parts.netloc:
        base_url = f"{parts.scheme}://{parts.netloc}"
    else:
        base_url = 'http://hangzhou.customs.gov.cn'

    driver = webdriver.Firefox(options=configure_stealth_options())
    try:
        driver.get(url)
        # Inject the anti-detection script after the page has loaded: a script
        # executed before navigation only affects the blank start page.
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.alert = () => {};
        """)
        # Navigate by year.
        crawl_by_year_tabs(driver, base_url)
    finally:
        driver.quit()
def wait_for_download_complete(timeout=30, existing_files=None, directory=None,
                               poll_interval=2):
    """Wait for a new, finished download to appear and return its path.

    A file counts as finished when it was not present before the download,
    is a non-empty regular file without a temporary-download suffix, no
    temporary download file is left next to it, and its size is unchanged
    between two consecutive polls.

    Args:
        timeout: Maximum number of seconds to wait.
        existing_files: Names present before the download started; defaults
            to a snapshot taken when the function is called.
        directory: Folder to watch; defaults to the module's ``download_dir``.
        poll_interval: Seconds to sleep between two polls.

    Returns:
        pathlib.Path: the most recently modified finished file.

    Raises:
        TimeoutError: if no finished file shows up within ``timeout``.
    """
    target_dir = Path(download_dir if directory is None else directory)
    temp_exts = ('.part', '.crdownload')
    if existing_files is None:
        existing_files = {f.name for f in target_dir.glob('*')}

    deadline = time.monotonic() + timeout
    previous_sizes = {}
    while time.monotonic() < deadline:
        current = {}  # path -> (size, mtime) of candidate files
        download_in_progress = False
        for f in target_dir.glob('*'):
            if f.name in existing_files:
                continue
            if f.name.endswith(temp_exts):
                download_in_progress = True
                continue
            try:
                if not f.is_file():
                    continue  # e.g. a year folder created by an earlier move
                info = f.stat()
            except OSError:
                continue  # file vanished between listing and stat
            if info.st_size > 0:
                current[f] = (info.st_size, info.st_mtime)

        # Only files whose size did not change since the last poll are stable.
        stable = [f for f, (size, _) in current.items()
                  if previous_sizes.get(f) == size]
        if stable and not download_in_progress:
            return max(stable, key=lambda p: current[p][1])

        previous_sizes = {f: size for f, (size, _) in current.items()}
        time.sleep(poll_interval)
    raise TimeoutError("文件下载超时")
def hierarchical_traversal(root_path):
    """Process downloaded files folder by folder: year, then month.

    Year and month folders are recognised with the shared ``YEAR_PATTERN``
    and ``MONTH_PATTERN`` and visited newest first. Every month folder is
    passed to the import/export, country and city processors, in that order.

    Args:
        root_path: Folder that contains the per-year sub-folders.
    """
    root = Path(root_path)
    if not root.is_dir():
        # Nothing was downloaded (or the path is wrong): nothing to process.
        print(f"目录不存在,跳过处理:{root}")
        return

    # Collect all year folders.
    year_dirs = [
        item for item in root.iterdir()
        if item.is_dir() and base_country_code.YEAR_PATTERN.match(item.name)
    ]

    # Newest year first.
    for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
        print(f"\n年份:{year_dir.name} | 省份:zhejiang")

        # Month folders of this year, newest month first.
        month_dirs = sorted(
            (
                item for item in year_dir.iterdir()
                if item.is_dir() and base_country_code.MONTH_PATTERN.match(item.name)
            ),
            key=lambda p: int(p.name),
            reverse=True,
        )
        for month_dir in month_dirs:
            print(f" 月份:{int(month_dir.name):02d} | 路径:{month_dir}")
            gov_commodity_zhejiang_import_export.process_folder(month_dir)
            gov_commodity_zhejiang_country.process_folder(month_dir)
            gov_commodity_zhejiang_city.process_folder(month_dir)
if __name__ == "__main__":
    # Step 1: download everything reachable from the Hangzhou customs index page.
    start_url = (
        'http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/'
        '575612/575612/6430241/6430315/index.html'
    )
    crawl_with_selenium(start_url)
    print("浙江杭州海关全量数据下载任务完成")

    # Step 2: after a 5 second pause, process the downloaded folders.
    time.sleep(5)
    hierarchical_traversal(base_country_code.download_dir)
    print("浙江杭州海关类章、国家、城市所有文件处理完成!")

    # Step 3: after another pause, run the year-over-year SQL updates.
    time.sleep(5)
    for update_yoy in (base_mysql.update_january_yoy, base_mysql.update_shandong_yoy):
        update_yoy('浙江省')
    print("浙江杭州海关城市同比sql处理完成")
|