"""Crawler for the monthly trade statistics published by Nanjing Customs (Jiangsu).

The script downloads the ``.rar`` archives linked from the listing pages,
extracts the wanted Excel workbooks into ``<download_dir>/<year>/<month>/`` and
then hands every month directory to the ``gov_commodity_jiangsu_*`` processors.
"""
import os
import random
import re
import shutil
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import urljoin

import rarfile
from faker import Faker
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver import FirefoxOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

import gov_commodity_jiangsu_city
import gov_commodity_jiangsu_country
import gov_commodity_jiangsu_import_export
from utils import base_country_code, base_mysql
from utils.log import log

# On Windows point this at the bundled binary instead, e.g.
# r"C:\Program Files\WinRAR\UnRAR.exe".
rarfile.UNRAR_EXECUTABLE = "unrar"

download_dir = base_country_code.download_dir
Path(download_dir).mkdir(parents=True, exist_ok=True)

# Workbooks whose archive member name contains any of these markers are skipped.
UNWANTED_KEYWORDS = ('美元值', '企业性质', '贸易方式', '按收发货所在地', '主要商品')
# A full ("all") crawl stops at the first workbook whose name starts with this.
FULL_CRAWL_STOP_PREFIX = '2022'
# Extracts year and month from member names such as "2025年3月...xls".
TITLE_DATE_PATTERN = re.compile(r"(\d{4})年(\d{1,2})月")
# How many calendar months (including the current one) an incremental run probes.
MONTHS_TO_PROBE = 3


def configure_stealth_options():
    """Build the Firefox options used by the crawler.

    Configures silent downloads into ``download_dir``, a handful of
    anti-automation-detection preferences, a randomised user agent and a
    fixed-size headless window.

    Returns:
        FirefoxOptions: the configured options object.
    """
    opts = FirefoxOptions()
    print("当前下载路径:", Path(download_dir).resolve())

    # Download handling: save straight to download_dir without any dialogs.
    opts.set_preference("browser.download.dir", download_dir)
    opts.set_preference("browser.download.folderList", 2)
    opts.set_preference("browser.download.manager.showWhenStarting", False)
    opts.set_preference(
        "browser.helperApps.neverAsk.saveToDisk",
        "application/octet-stream, application/vnd.ms-excel",
    )
    opts.set_preference("browser.download.manager.useWindow", False)
    opts.set_preference("browser.download.manager.showAlertOnComplete", False)

    # Anti-detection switches.
    opts.set_preference("dom.webdriver.enabled", False)
    opts.set_preference("useAutomationExtension", False)
    opts.add_argument("--disable-blink-features=AutomationControlled")

    # Randomised fingerprint.
    fake = Faker()
    opts.set_preference("general.useragent.override", fake.firefox())
    opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")

    # Viewport.
    opts.add_argument("--width=1440")
    opts.add_argument("--height=900")
    opts.add_argument("--headless")
    return opts


def _move_into_archive_tree(extracted_file, year, month):
    """Move one extracted workbook to ``<download_dir>/<year>/<month>/``.

    An existing file of the same name is overwritten. Failures are logged and
    not raised, so the remaining workbooks of the archive are still handled.
    """
    final_dir = Path(download_dir) / year / month
    final_path = final_dir / extracted_file.name
    if final_path.exists():
        log.info(f"文件已存在:{extracted_file.name} 正在覆盖...")
    final_dir.mkdir(parents=True, exist_ok=True)
    log.info(f"√ 正在移动文件 {extracted_file} 至 {final_path}")
    try:
        # replace() overwrites the destination in a single step.
        extracted_file.replace(final_path)
        log.info(f"√ 下载成功:{final_path}")
    except Exception as e:
        log.info(f"文件移动失败: {e}")


def _process_archive(downloaded_file, file_name, year_month):
    """Extract the wanted workbooks from one downloaded ``.rar`` archive.

    Args:
        downloaded_file: ``Path`` of the downloaded archive.
        file_name: link text of the download, used for logging only.
        year_month: ``"YYYY年M月"`` prefix every workbook must carry during an
            incremental run, or ``None`` for a full crawl.

    Returns:
        ``'stop'`` when a workbook outside the requested range is met (the
        crawl should end), otherwise ``None``.

    Raises:
        ValueError: if a wanted workbook name carries no "YYYY年M月" date.

    The archive and the temporary extraction directory are removed on every
    exit path so that aborted runs do not leave files behind.
    """
    temp_dir = Path(download_dir) / 'temp'
    extraction_ok = None  # None = not attempted yet, otherwise result of extract_rar
    try:
        # Only list the members here; the handle is closed before the external
        # tool reads the archive and before the archive is deleted.
        with rarfile.RarFile(downloaded_file) as rf:
            xls_files = [f for f in rf.namelist() if f.endswith(('.xls', '.xlsx'))]
        if not xls_files:
            log.info(f"压缩包 {downloaded_file.name} 中没有 .xls 文件")
            return None

        for xls_file in xls_files:
            if year_month is None:
                if xls_file.startswith(FULL_CRAWL_STOP_PREFIX):
                    return 'stop'
            elif not xls_file.startswith(year_month):
                log.info(f"非 {year_month} 文件: {file_name}, stop")
                return 'stop'

            if not xls_file or any(keyword in xls_file for keyword in UNWANTED_KEYWORDS):
                log.info(f"检测到不需要的文件:{xls_file},跳过")
                continue

            # Extract the archive once, on the first wanted workbook.
            if extraction_ok is None:
                temp_dir.mkdir(parents=True, exist_ok=True)
                extraction_ok = extract_rar(downloaded_file, temp_dir)
            if not extraction_ok:
                log.info(f"解压文件 {downloaded_file.name} 时发生错误")
                continue

            match = TITLE_DATE_PATTERN.search(xls_file)
            if not match:
                raise ValueError(f"无效标题格式:{xls_file}")
            year = match.group(1)
            month = match.group(2).zfill(2)
            _move_into_archive_tree(temp_dir / xls_file, year, month)
        return None
    finally:
        if extraction_ok is not None:
            try:
                shutil.rmtree(temp_dir)
            except Exception as e:
                log.info(f"删除临时目录失败: {e}")
        log.info(f"删除 .rar 文件:{downloaded_file}")
        try:
            os.unlink(downloaded_file)
        except OSError as e:
            log.info(f"删除 .rar 文件失败: {e}")


def find_target_links(driver, year_month):
    """Download and unpack every ``.rar`` archive linked on the current page.

    Args:
        driver: Selenium WebDriver already showing a listing page.
        year_month: ``"YYYY年M月"`` for an incremental run, ``None`` for a
            full crawl.

    Returns:
        ``'stop'`` when the crawl should end, otherwise ``None`` (also when the
        page has no archive links or an error occurred while handling it).

    Raises:
        selenium.common.exceptions.TimeoutException: if the page container
            does not appear within 30 seconds.
    """
    # Wait until the page container is present.
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )

    try:
        xpath = '//ul[@class="conList_ul"]//a[contains(@href, ".rar")]'
        elements = driver.find_elements(By.XPATH, xpath)
        if not elements:
            return None

        for download_btn in elements:
            file_name = download_btn.text.strip()
            log.info(f"正在下载: {file_name}")

            # Snapshot of the directory so the new file can be told apart.
            existing_files = {f.name for f in Path(download_dir).glob('*')}

            download_btn.click()
            time.sleep(random.uniform(1, 3))

            try:
                rar_files = wait_for_download_complete(existing_files=existing_files)
            except TimeoutError:
                # One failed download must not abort the remaining links.
                log.info("未找到新下载的 .rar 文件")
                continue

            downloaded_file = rar_files[0]
            if downloaded_file.suffix != '.rar':
                log.info(f"文件 {downloaded_file.name} 不是 .rar 文件,请手动处理")
                continue

            if _process_archive(downloaded_file, file_name, year_month) == 'stop':
                return 'stop'
        return None
    except Exception as e:
        # Best effort per page: log and let the caller move on to the next page.
        log.info(f"下载时发生异常: {e}")
        return None


def extract_rar(rar_path, extract_to):
    """Extract ``rar_path`` into ``extract_to`` with the external ``unrar`` tool.

    Args:
        rar_path: path of the archive.
        extract_to: destination directory.

    Returns:
        bool: ``True`` when ``unrar`` exits with status 0, ``False`` otherwise
        (including when the executable cannot be started).
    """
    # Windows alternative: [r"C:\Program Files\WinRAR\Rar.exe", 'x', '-y', ...]
    # The trailing separator marks the last argument as a destination
    # directory rather than a file mask.
    destination = os.path.join(str(extract_to), '')
    cmd = ["unrar", 'x', '-y', str(rar_path), destination]

    # CREATE_NO_WINDOW keeps a console window from popping up on Windows.
    creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0

    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            creationflags=creationflags,
        )
    except OSError as e:
        log.info(f"解压失败: {e}")
        return False

    if result.returncode == 0:
        log.info(f"解压成功: {rar_path} → {extract_to}")
        return True

    # errors='replace': the tool's output is not guaranteed to be GBK.
    log.info(f"解压失败: {result.stderr.decode('gbk', errors='replace')}")
    return False


def _recent_months(today, count):
    """Return ``count`` ``(year, month)`` pairs, newest first, from ``today``'s month.

    Uses calendar-month arithmetic; stepping back in 30-day blocks can repeat
    or skip a month (e.g. from 31 March it yields March, March, January).
    """
    base = today.year * 12 + (today.month - 1)
    months = []
    for offset in range(count):
        year, month_index = divmod(base - offset, 12)
        months.append((year, month_index + 1))
    return months


def detect_latest_month(driver, url):
    """Find the newest published month that is not yet stored in the database.

    Loads ``url`` and probes the current and the two preceding calendar months,
    newest first. A month qualifies when the page links a title containing
    "YYYY年M月" and ``base_mysql.get_code_exist`` reports no rows for it.

    Returns:
        str | None: ``"YYYY年M月"`` of the month to crawl, or ``None`` when none
        of the probed months qualifies.
    """
    driver.get(url)
    for check_year, check_month in _recent_months(datetime.now(), MONTHS_TO_PROBE):
        target_title = f"{check_year}年{check_month}月"

        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located(
                    (By.XPATH, f'//a[contains(@title, "{target_title}")]')
                )
            )
        except TimeoutException:
            log.info(f"未找到 {target_title}")
            continue

        log.info(f"已找到最新月份数据 {check_year}-{check_month}")

        # Skip months that have already been loaded.
        try:
            count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', '320000')
        except Exception as e:
            # Kept best-effort: an unverifiable month is skipped, not crawled.
            log.info(f"查询 {target_title} 是否已入库时出错: {e}")
            continue
        if count > 0:
            log.info(f"count: {count} -> 已存在 {check_year}-{check_month} 数据,跳过")
            continue
        return target_title

    log.info("三个月内未找到有效数据")
    return None


def crawl_with_selenium(url, mark):
    """Crawl the listing starting at ``url`` and download its archives.

    Args:
        url: first listing page.
        mark: ``'increment'`` crawls only the newest not-yet-stored month (the
            process exits with status 0 when there is none); any other value
            crawls every page until ``find_target_links`` reports ``'stop'`` or
            the last page is reached.

    The browser is always closed, including on the early-exit path.
    """
    driver = webdriver.Firefox(options=configure_stealth_options())
    try:
        year_month = None
        if mark == 'increment':
            year_month = detect_latest_month(driver, url)
            if year_month is None:
                log.info("江苏省海关没有最新数据更新")
                sys.exit(0)
            print(f"检测到最新有效数据:{year_month}")

        # Inject the anti-detection script.
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.alert = () => {};
        """)

        driver.get(url)

        while True:
            if find_target_links(driver, year_month) == 'stop':
                break

            # Wait for the pager to be present.
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "gg_page"))
            )
            xpath = '//div[@class="easysite-page-wrap"]//a[@title="下一页"]'
            next_page_btn = WebDriverWait(driver, 15).until(
                EC.element_to_be_clickable((By.XPATH, xpath))
            )

            # The target of the "next page" link lives in its onclick handler.
            onclick = next_page_btn.get_attribute("onclick")
            if not onclick:
                log.info("已到达最后一页,停止爬取")
                break
            match = re.search(r"'(.*?)'", onclick)
            if not match:
                log.info(f"无法从 onclick 中解析下一页地址: {onclick}")
                break

            # Resolve against the page being crawled instead of a hard-coded
            # host; absolute URLs pass through unchanged.
            next_page_url = urljoin(driver.current_url, match.group(1))

            driver.get(next_page_url)
            log.info(f"开始爬取 {next_page_url} 页面数据")
    finally:
        driver.quit()


def _is_download_stable(file):
    """Return True when ``file`` looks like a finished download.

    The size must be non-zero and unchanged across one second, and no sibling
    ``<name>.part`` file may exist.
    NOTE(review): assumes the browser writes a 0-byte placeholder plus a
    ``.part`` file while a download is in flight - confirm for the deployed
    Firefox version.
    """
    try:
        prev_size = file.stat().st_size
        time.sleep(1)
        curr_size = file.stat().st_size
    except FileNotFoundError:
        return False
    if curr_size == 0 or curr_size != prev_size:
        return False
    return not file.with_name(file.name + '.part').exists()


def wait_for_download_complete(timeout=30, existing_files=None):
    """Wait for new ``.rar`` files to finish downloading into ``download_dir``.

    Args:
        timeout: maximum number of seconds to wait.
        existing_files: names already present before the download started;
            defaults to the directory's current content.

    Returns:
        list[Path]: the new, fully downloaded ``.rar`` files (never empty).

    Raises:
        TimeoutError: if no finished new ``.rar`` file shows up in time.
    """
    start_time = time.time()
    download_path = Path(download_dir)
    if existing_files is None:
        existing_files = {f.name for f in download_path.glob('*')}

    while (time.time() - start_time) < timeout:
        new_files = [f for f in download_path.glob('*.rar') if f.name not in existing_files]
        if new_files and all(_is_download_stable(f) for f in new_files):
            return new_files
        time.sleep(2)
    raise TimeoutError("未找到 .rar 文件或超时")


def hierarchical_traversal(root_path, all_records):
    """Process every ``<root>/<year>/<month>`` directory, newest first.

    Args:
        root_path: directory containing the year directories.
        all_records: passed through to
            ``gov_commodity_jiangsu_import_export.process_folder``.
    """
    root = Path(root_path)

    year_dirs = [
        item for item in root.iterdir()
        if item.is_dir() and base_country_code.YEAR_PATTERN.match(item.name)
    ]

    # Years in descending order.
    for year_dir in sorted(year_dirs, key=lambda p: p.name, reverse=True):
        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")

        month_dirs = [
            item for item in year_dir.iterdir()
            if item.is_dir() and base_country_code.MONTH_PATTERN.match(item.name)
        ]

        # Months in descending numeric order.
        for month_dir in sorted(month_dirs, key=lambda p: int(p.name), reverse=True):
            log.info(f" 月份:{int(month_dir.name):02d} | 路径:{month_dir}")
            gov_commodity_jiangsu_import_export.process_folder(month_dir, all_records)
            gov_commodity_jiangsu_country.process_folder(month_dir)
            gov_commodity_jiangsu_city.process_folder(month_dir)


if __name__ == "__main__":
    # Full crawl. For an incremental run pass 'increment' instead of 'all'.
    crawl_with_selenium(
        'http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html',
        'all',
    )
    log.info("江苏南京海关全量数据下载任务完成")
    # Short pause before the processing stage.
    time.sleep(5)
    all_records = base_mysql.get_hs_all()
    hierarchical_traversal(base_country_code.download_dir, all_records)
    log.info("江苏南京海关类章、国家、城市所有文件处理完成!")
    time.sleep(5)
    base_mysql.update_january_yoy('江苏省')
    base_mysql.update_shandong_yoy('江苏省')
    log.info("江苏南京海关城市同比sql处理完成")