import os
import random
import re
import subprocess
import time
from pathlib import Path
from urllib.parse import urljoin

from faker import Faker
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException
from selenium.webdriver import FirefoxOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

import gov_commodity_zhejiang_city
import gov_commodity_zhejiang_country
import gov_commodity_zhejiang_import_export
from utils import base_country_code, base_mysql

download_dir = base_country_code.download_dir
Path(download_dir).mkdir(parents=True, exist_ok=True)


def configure_stealth_options():
    """Enhanced anti-detection Firefox options."""
    opts = FirefoxOptions()
    print("Current download directory:", Path(download_dir).resolve())

    # Download settings
    opts.set_preference("browser.download.dir", download_dir)
    opts.set_preference("browser.download.folderList", 2)
    opts.set_preference("browser.download.manager.showWhenStarting", False)
    opts.set_preference("browser.helperApps.neverAsk.saveToDisk",
                        "application/octet-stream, application/vnd.ms-excel")  # auto-save common spreadsheet MIME types
    opts.set_preference("browser.download.manager.useWindow", False)  # no download manager window
    opts.set_preference("browser.download.manager.showAlertOnComplete", False)  # no completion alert

    # Anti-detection preferences
    opts.set_preference("dom.webdriver.enabled", False)
    opts.set_preference("useAutomationExtension", False)
    opts.add_argument("--disable-blink-features=AutomationControlled")  # Chromium/Blink switch; effectively a no-op for Firefox

    # Dynamic fingerprint
    fake = Faker()
    opts.set_preference("general.useragent.override", fake.firefox())
    opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")

    # Viewport
    opts.add_argument("--width=1440")
    opts.add_argument("--height=900")
    opts.add_argument("--headless")
    return opts


def crawl_by_year_tabs(driver, base_url):
    """Crawl the data by navigating the year tabs."""
    years = ['2023年', '2024年', '2025年']

    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )
    year_tabs = driver.find_elements(By.XPATH, '//ul[@class="nav_sj"]//li//a')

    for tab in year_tabs:
        year_text = tab.text.strip()
        if year_text not in years:
            continue
        year_url = urljoin(base_url, tab.get_attribute("href"))  # resolve relative hrefs

        # Open the year page in a new tab
        driver.execute_script("window.open(arguments[0]);", year_url)
        driver.switch_to.window(driver.window_handles[-1])
        print(f"\nProcessing year page: {year_text}")

        process_month_tabs(driver, year_text, base_url)

        # Back to the main window
        driver.close()
        driver.switch_to.window(driver.window_handles[0])


def process_month_tabs(driver, year, base_url):
    """Handle the month tab navigation (months are discovered dynamically)."""
    # Explicitly wait for the container to load
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )

    target_months = ['一月', '二月', '三月', '四月', '五月', '六月',
                     '七月', '八月', '九月', '十月', '十一月', '十二月']
    processed_months = set()  # months already handled
    retry_count = 0

    while True:
        try:
            # Re-fetch all month tabs on every pass
            month_items = driver.find_elements(By.XPATH, '//ul[@class="nav_tab"]//li')
            if not month_items:
                print(f"No month tabs found for {year}; stopping")
                break

            found = False
            for item in month_items:
                a_tag = item.find_element(By.XPATH, './/a')
                month_text = a_tag.text.strip()
                if month_text in processed_months:
                    continue  # skip months already handled
                if month_text not in target_months:
                    continue
                print(f"Clicking month tab: {year}-{month_text}")
                a_tag.click()

                # Wait for the month page, then walk its detail links
                WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
                )
                detail_link_arr = get_behind_detail_link(driver, base_url)
                if not detail_link_arr:
                    print(f"{year}-{month_text}: no detail links found")
                for detail_link in detail_link_arr:
                    print(f"{year}-{month_text} detail link: {detail_link}")
                    driver.get(detail_link)
                    download_file_from_detail_page(driver)
                    driver.back()
                    WebDriverWait(driver, 30).until(
                        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
                    )
                processed_months.add(month_text)
                found = True

            if found:
                print(f"All target months for {year} processed")
                break
            else:
                # No new target month was processed in this pass; re-fetch the tabs and retry
                retry_count += 1
                print(f"Retry #{retry_count}: re-fetching month tabs...")
                time.sleep(2)
        except StaleElementReferenceException:
            print("Page refreshed; re-fetching the month tab list...")
            retry_count += 1
            time.sleep(2)

    print(f"Months processed for {year}: {processed_months}")
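
# Note on process_month_tabs: clicking a month tab re-renders the tab list, so the <li>
# elements are re-fetched on every pass and a StaleElementReferenceException simply triggers
# another pass; the processed_months set is what lets later passes skip months that were
# already handled.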


def get_behind_detail_link(driver, base_url):
    """Return the full URLs of all <a> tags under conList_ul after a month tab is clicked."""
    href_arr = []
    try:
        elements = WebDriverWait(driver, 30).until(
            EC.presence_of_all_elements_located((By.XPATH, '//ul[@class="conList_ul"]/li/a'))
        )
        for element in elements:
            href = element.get_attribute("href")
            full_url = urljoin(base_url, href)  # resolve relative paths automatically
            href_arr.append(full_url)
        return href_arr
    except Exception as e:
        print(f"Failed to collect detail links: {e}")
        return []


def download_file_from_detail_page(driver):
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )
    try:
        elements = driver.find_elements(
            By.XPATH, '//div[@class="easysite-news-content"]//div[@id="easysiteText"]//p//a')
        if not elements:
            print("No target file links found on the detail page")
            return
        for download_btn in elements:
            file_name = download_btn.text.strip()
            if not file_name:
                continue
            file_url = download_btn.get_attribute("href")
            if not file_url or not file_url.lower().endswith(('.xls', '.xlsx')):
                print(f"Skipping non-Excel file: {file_url}")
                continue
            print(f"Downloading: {file_name} → {file_url}")

            # Snapshot the files present before the download starts
            existing_files = set(f.name for f in Path(download_dir).glob('*'))

            # Random delay before clicking
            time.sleep(random.uniform(1, 3))
            download_btn.click()
            downloaded_file = wait_for_download_complete(existing_files=existing_files)

            # Files are filed under the end month (e.g. "1-2月" -> 02)
            year, start_month, end_month = extract_year_and_month(file_name)
            final_dir = Path(download_dir) / year / end_month
            final_path = final_dir / file_name
            if final_path.exists():
                print(f"File already exists, overwriting: {file_name}")
                final_path.unlink()
            final_dir.mkdir(parents=True, exist_ok=True)
            print(f"√ Moving {downloaded_file} to {final_path}")
            downloaded_file.rename(final_path)
            print(f"√ Download complete: {final_path}")
    except Exception as e:
        print(f"Error while processing detail page: {e}")


def extract_year_and_month(file_name):
    # Two file-name formats are supported:
    #   2025年1-2月xxx
    #   2025年3月xxx
    match = re.search(r"(\d{4})年(\d{1,2})(?:-(\d{1,2}))?月", file_name)
    if match:
        year = match.group(1)
        start_month = match.group(2)
        end_month = match.group(3) if match.group(3) else start_month
        return year, start_month.zfill(2), end_month.zfill(2)
    raise ValueError(f"Cannot extract year and month from file name: {file_name}")


def extract_rar(rar_path, extract_to):
    """Fallback extraction via WinRAR (used when the rarfile module is unavailable)."""
    winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # the console Rar.exe is preferred over WinRAR.exe
    cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]

    # CREATE_NO_WINDOW keeps a console window from popping up on Windows
    creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0

    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        creationflags=creationflags  # hide the console window
    )
    if result.returncode == 0:
        print(f"Extracted: {rar_path} → {extract_to}")
        return True
    else:
        print(f"Extraction failed: {result.stderr.decode('gbk')}")
        return False
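
# Illustrative examples (the file names are hypothetical; the return values follow from the
# regex in extract_year_and_month above):
#   extract_year_and_month("2025年3月进出口统计.xls")   -> ("2025", "03", "03")
#   extract_year_and_month("2025年1-2月进出口统计.xls") -> ("2025", "01", "02")
# extract_rar is a fallback helper and is not called anywhere in this script; a call would
# look like (paths hypothetical):
#   extract_rar(r"C:\downloads\统计.rar", Path(download_dir) / "2025" / "03")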


def crawl_with_selenium(url):
    driver = webdriver.Firefox(options=configure_stealth_options())
    base_url = 'http://hangzhou.customs.gov.cn'

    try:
        driver.get(url)

        # Inject the anti-detection script after navigation (geckodriver has no
        # Chromium-style hook for running it before the page's own scripts load)
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            window.alert = () => {};
        """)

        # Navigate by year tabs
        crawl_by_year_tabs(driver, base_url)
    finally:
        driver.quit()


def wait_for_download_complete(timeout=30, existing_files=None):
    """
    Watch the download directory and wait for a new file to finish downloading.
    :param timeout: timeout in seconds
    :param existing_files: names of the files that existed before the download started
    :return: path of the newly downloaded file
    """
    start_time = time.time()
    temp_exts = ('.part', '.crdownload')

    if existing_files is None:
        existing_files = set(f.name for f in Path(download_dir).glob('*'))

    while (time.time() - start_time) < timeout:
        # Collect candidate files: new, not a temp file, and non-empty
        valid_files = []
        for f in Path(download_dir).glob('*'):
            if (f.name not in existing_files
                    and not f.name.endswith(temp_exts)
                    and f.stat().st_size > 0):
                valid_files.append(f)

        # Return the most recently modified candidate
        if valid_files:
            return max(valid_files, key=lambda x: x.stat().st_mtime)
        time.sleep(2)

    raise TimeoutError("Download timed out")


def hierarchical_traversal(root_path):
    """Hierarchical traversal: province -> year -> month directories."""
    root = Path(root_path)

    # Collect all year directories
    year_dirs = [
        item for item in root.iterdir()
        if item.is_dir() and base_country_code.YEAR_PATTERN.match(item.name)
    ]

    # Years in descending order; full paths look like <download_dir>/2025/03
    for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
        print(f"\nYear: {year_dir.name} | Province: zhejiang")

        # Collect month directories
        month_dirs = []
        for item in year_dir.iterdir():
            if item.is_dir() and base_country_code.MONTH_PATTERN.match(item.name):
                month_dirs.append({
                    "path": item,
                    "month": int(item.name)
                })

        # Months in descending order
        if month_dirs:
            for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
                print(f"  Month: {md['month']:02d} | Path: {md['path']}")
                gov_commodity_zhejiang_import_export.process_folder(md['path'])
                gov_commodity_zhejiang_country.process_folder(md['path'])
                gov_commodity_zhejiang_city.process_folder(md['path'])


if __name__ == "__main__":
    crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html')
    print("Hangzhou Customs (Zhejiang) full data download task finished")

    # Wait 5 s before post-processing the downloaded files
    time.sleep(5)
    hierarchical_traversal(base_country_code.download_dir)
    print("Hangzhou Customs (Zhejiang) chapter, country and city files all processed!")

    time.sleep(5)
    base_mysql.update_january_yoy('浙江省')
    base_mysql.update_shandong_yoy('浙江省')
    print("Hangzhou Customs (Zhejiang) city year-on-year SQL update finished")