123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- import os
- import random
- import re
- import subprocess
- import time
- import rarfile
- import shutil
- from pathlib import Path
- from faker import Faker
- from selenium import webdriver
- from selenium.webdriver import FirefoxOptions
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- import gov_commodity_jiangsu_country
- import gov_commodity_jiangsu_city
- import gov_commodity_jiangsu_import_export
- from utils import base_country_code, base_mysql
- from utils.log import log
# Alternative for Windows hosts that have WinRAR installed:
# rarfile.UNRAR_EXECUTABLE = r"C:\Program Files\WinRAR\UnRAR.exe"
# Tell rarfile which unrar executable to use (a bare command name here).
rarfile.UNRAR_EXECUTABLE = "unrar"
# Directory the crawler downloads into; taken from the shared project settings.
download_dir = base_country_code.download_dir
# Make sure the download directory exists as soon as the module is imported.
Path(download_dir).mkdir(parents=True, exist_ok=True)
def configure_stealth_options():
    """Build the Firefox options used by the crawler.

    The profile downloads files silently into ``download_dir``, overrides the
    user agent with a random Firefox one from Faker, and runs headless at
    1440x900.

    Returns:
        FirefoxOptions: the configured options object.
    """
    opts = FirefoxOptions()
    # Firefox expects an absolute path in browser.download.dir; resolve it so a
    # relative setting does not silently fall back to the default folder.
    download_path = str(Path(download_dir).resolve())
    print("当前下载路径:", download_path)

    # File download configuration.
    opts.set_preference("browser.download.dir", download_path)
    # folderList=2 selects the custom directory set above.
    opts.set_preference("browser.download.folderList", 2)
    opts.set_preference("browser.download.manager.showWhenStarting", False)
    # The crawler downloads .rar archives, so the RAR MIME types must be in the
    # "save without asking" list alongside the generic/Excel ones.
    opts.set_preference(
        "browser.helperApps.neverAsk.saveToDisk",
        "application/octet-stream, application/vnd.ms-excel, "
        "application/x-rar-compressed, application/vnd.rar, application/x-rar",
    )
    # Suppress the download manager window and the completion alert.
    opts.set_preference("browser.download.manager.useWindow", False)
    opts.set_preference("browser.download.manager.showAlertOnComplete", False)

    # Anti-detection settings.
    opts.set_preference("dom.webdriver.enabled", False)
    # NOTE(review): the next two look like Chromium-specific switches and are
    # presumably ignored by Firefox - confirm before relying on or removing them.
    opts.set_preference("useAutomationExtension", False)
    opts.add_argument("--disable-blink-features=AutomationControlled")

    # Per-run fingerprint: random Firefox user agent, Chinese language headers.
    fake = Faker()
    opts.set_preference("general.useragent.override", fake.firefox())
    opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")

    # Viewport and headless mode.
    opts.add_argument("--width=1440")
    opts.add_argument("--height=900")
    opts.add_argument("--headless")
    return opts
# Substrings that mark archive members this crawler does not keep.
_UNWANTED_KEYWORDS = ('美元值', '企业性质', '贸易方式', '按收发货所在地', '主要商品')


def _list_spreadsheets(rar_path):
    """Return the names of the .xls/.xlsx members stored in *rar_path*."""
    # Only the listing is read here, so the archive handle is closed again
    # before any extraction or deletion touches the file.
    with rarfile.RarFile(rar_path) as archive:
        return [name for name in archive.namelist() if name.endswith(('.xls', '.xlsx'))]


def _store_spreadsheet(extracted_file, member_name):
    """Move one extracted spreadsheet to ``download_dir/<year>/<month>/``.

    Raises:
        ValueError: if *member_name* does not contain a ``YYYY年M月`` date.
    """
    match = re.search(r"(\d{4})年(\d{1,2})月", member_name)
    if not match:
        raise ValueError(f"无效标题格式:{member_name}")
    year = match.group(1)
    month = match.group(2).zfill(2)
    final_dir = Path(download_dir) / year / month
    final_path = final_dir / extracted_file.name
    if final_path.exists():
        # Remove the old copy first so the rename below also works on Windows.
        log.info(f"文件已存在:{extracted_file.name} 正在覆盖...")
        final_path.unlink()
    final_dir.mkdir(parents=True, exist_ok=True)
    log.info(f"√ 正在移动文件 {extracted_file} 至 {final_path}")
    try:
        extracted_file.rename(final_path)
        log.info(f"√ 下载成功:{final_path}")
    except OSError as e:
        log.info(f"文件移动失败: {str(e)}")


def _process_archive(rar_path, stop_prefix):
    """Sort the wanted spreadsheets of one archive into the year/month tree.

    Returns ``'stop'`` as soon as a member name starts with *stop_prefix*,
    otherwise ``None``.
    """
    xls_files = _list_spreadsheets(rar_path)
    if not xls_files:
        log.info(f"压缩包 {rar_path.name} 中没有 .xls 文件")
        return None

    temp_dir = Path(download_dir) / 'temp'
    extracted = False
    try:
        for xls_file in xls_files:
            if xls_file.startswith(stop_prefix):
                return 'stop'
            if any(keyword in xls_file for keyword in _UNWANTED_KEYWORDS):
                log.info(f"检测到不需要的文件:{xls_file},跳过")
                continue
            if not extracted:
                # Unpack the archive once, and only when a member is wanted.
                temp_dir.mkdir(parents=True, exist_ok=True)
                if not extract_rar(rar_path, temp_dir):
                    log.info(f"解压文件 {rar_path.name} 时发生错误")
                    # The same archive would fail again for every other member.
                    break
                extracted = True
            _store_spreadsheet(temp_dir / xls_file, xls_file)
    finally:
        # Always drop the scratch directory, including on 'stop' and on errors.
        if temp_dir.exists():
            try:
                shutil.rmtree(temp_dir)
            except OSError as e:
                log.info(f"删除临时目录失败: {str(e)}")
    return None


def find_target_links(driver, stop_prefix='2022'):
    """Download every .rar linked on the current page and file its spreadsheets.

    Each link under ``ul.conList_ul`` whose href contains ``.rar`` is clicked,
    the resulting archive is unpacked, and the wanted .xls/.xlsx members are
    moved to ``download_dir/<year>/<month>/``. The archive is deleted afterwards.

    Args:
        driver: Selenium WebDriver already showing the listing page.
        stop_prefix: member-name prefix that ends the crawl (default ``'2022'``).

    Returns:
        ``'stop'`` when a member name starts with *stop_prefix*; otherwise
        ``None`` (also when no links exist or an error was logged).
    """
    # Wait until the page content container is present.
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )
    try:
        xpath = '//ul[@class="conList_ul"]//a[contains(@href, ".rar")]'
        elements = driver.find_elements(By.XPATH, xpath)
        if not elements:
            return None

        for download_btn in elements:
            file_name = download_btn.text.strip()
            log.info(f"正在下载: {file_name}")
            # Snapshot the directory so the new download can be told apart.
            existing_files = {f.name for f in Path(download_dir).glob('*')}
            download_btn.click()
            time.sleep(random.uniform(1, 3))

            # A download that never shows up only skips this link; it must not
            # abort the remaining links on the page.
            try:
                rar_files = wait_for_download_complete(existing_files=existing_files)
            except TimeoutError:
                rar_files = []
            if not rar_files:
                log.info("未找到新下载的 .rar 文件")
                continue

            downloaded_file = rar_files[0]
            if downloaded_file.suffix != '.rar':
                log.info(f"文件 {downloaded_file.name} 不是 .rar 文件,请手动处理")
                continue

            try:
                outcome = _process_archive(downloaded_file, stop_prefix)
            finally:
                # The archive is no longer needed once it has been handled.
                log.info(f"删除 .rar 文件:{downloaded_file}")
                os.unlink(downloaded_file)
            if outcome == 'stop':
                return 'stop'
        return None
    except Exception as e:
        # Best-effort crawl: log the problem and let the caller move on.
        log.info(f"下载时发生异常: {str(e)}")
        return None
def extract_rar(rar_path, extract_to):
    """Extract *rar_path* into *extract_to* with the external ``unrar`` tool.

    Args:
        rar_path: path of the archive (str or path-like).
        extract_to: destination directory (str or path-like).

    Returns:
        bool: True when unrar exits with status 0; False when it reports an
        error or cannot be started at all.
    """
    # A trailing separator marks the last argument as the destination
    # directory rather than a file mask, even if the directory is missing.
    destination = os.path.join(str(extract_to), '')
    # On Windows hosts Rar.exe from WinRAR can be substituted for "unrar".
    cmd = ["unrar", 'x', '-y', str(rar_path), destination]
    # CREATE_NO_WINDOW keeps a console window from popping up on Windows.
    creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            creationflags=creationflags,
        )
    except OSError as e:
        # Typically the unrar executable is missing or not executable.
        log.info(f"解压失败: {e}")
        return False

    if result.returncode == 0:
        log.info(f"解压成功: {rar_path} → {extract_to}")
        return True
    # stderr is not guaranteed to be GBK; never let decoding mask the failure.
    log.info(f"解压失败: {result.stderr.decode('gbk', errors='replace')}")
    return False
def crawl_with_selenium(url):
    """Crawl the listing that starts at *url*, following "next page" links.

    Every page is handed to ``find_target_links``. The crawl ends when that
    function returns ``'stop'``, when the next-page button has no ``onclick``
    attribute, or when no URL can be parsed from it. The browser is always
    closed on exit.

    Args:
        url: absolute URL of the first listing page.
    """
    from urllib.parse import urljoin

    driver = webdriver.Firefox(options=configure_stealth_options())
    try:
        # Anti-detection script.
        # NOTE(review): this runs before the first navigation, so it presumably
        # does not persist into the pages loaded below - confirm.
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.alert = () => {};
        """)
        driver.get(url)
        page_url = url
        while True:
            # Process the page that is currently loaded.
            result = find_target_links(driver)
            if result == 'stop':
                break

            # Wait for the pagination area, then for the "next page" button.
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "gg_page"))
            )
            xpath = '//div[@class="easysite-page-wrap"]//a[@title="下一页"]'
            next_page_btn = WebDriverWait(driver, 15).until(
                EC.element_to_be_clickable((By.XPATH, xpath))
            )

            onclick = next_page_btn.get_attribute("onclick")
            if not onclick:
                log.info("已到达最后一页,停止爬取")
                break

            # The target URL is the first quoted string inside the handler.
            match = re.search(r"'(.*?)'", onclick)
            if not match:
                log.info(f"无法从 onclick 中解析下一页地址: {onclick}")
                break

            # Resolve against the page being crawled instead of a fixed host,
            # so relative links stay on the site that *url* points to.
            next_page_url = urljoin(page_url, match.group(1))
            driver.get(next_page_url)
            page_url = next_page_url
            log.info(f"开始爬取 {next_page_url} 页面数据")
    finally:
        driver.quit()
def _downloads_settled(files, directory, existing_files):
    """Return True when every file in *files* looks completely downloaded."""
    # A new ``.part`` file means the browser is still writing a download.
    if any(p.name not in existing_files for p in directory.glob('*.part')):
        return False
    try:
        sizes_before = [f.stat().st_size for f in files]
        time.sleep(1)
        sizes_after = [f.stat().st_size for f in files]
    except FileNotFoundError:
        # A file was renamed or removed while being checked; try again later.
        return False
    # Sizes must be unchanged and non-zero: an empty placeholder with the
    # final name can exist before any data has been written.
    return sizes_before == sizes_after and 0 not in sizes_after


def wait_for_download_complete(timeout=30, existing_files=None):
    """Wait for new .rar files in ``download_dir`` and return them once complete.

    Args:
        timeout: maximum number of seconds to wait.
        existing_files: names already present before the download started;
            when None, the directory content at call time is used.

    Returns:
        list[Path]: the .rar files that are not listed in *existing_files*.

    Raises:
        TimeoutError: if no finished .rar file appears within *timeout*.
    """
    directory = Path(download_dir)
    if existing_files is None:
        existing_files = {f.name for f in directory.glob('*')}

    # A monotonic clock keeps the timeout immune to system clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        new_files = [f for f in directory.glob('*.rar') if f.name not in existing_files]
        if new_files and _downloads_settled(new_files, directory, existing_files):
            return new_files
        time.sleep(2)
    raise TimeoutError("未找到 .rar 文件或超时")
def hierarchical_traversal(root_path, all_records):
    """Walk ``root_path/<year>/<month>`` newest-first and process each month.

    Year and month directories are recognised with
    ``base_country_code.YEAR_PATTERN`` and ``MONTH_PATTERN``. Every month
    folder is passed to the import/export, country and city processors, in
    that order.

    Args:
        root_path: directory that contains the year folders.
        all_records: forwarded unchanged to the import/export processor.
    """
    # Collect the year folders first, then visit them in descending name order.
    year_dirs = []
    for entry in Path(root_path).iterdir():
        if not entry.is_dir():
            continue
        if base_country_code.YEAR_PATTERN.match(entry.name):
            year_dirs.append(entry)
    year_dirs.sort(key=lambda d: d.name, reverse=True)

    for year_dir in year_dirs:
        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")

        # Month folders of this year, keyed by their numeric month.
        month_dirs = [
            {"path": child, "month": int(child.name)}
            for child in year_dir.iterdir()
            if child.is_dir() and base_country_code.MONTH_PATTERN.match(child.name)
        ]
        month_dirs.sort(key=lambda entry: entry["month"], reverse=True)

        # Latest month first.
        for md in month_dirs:
            log.info(f" 月份:{md['month']:02d} | 路径:{md['path']}")
            month_path = md['path']
            gov_commodity_jiangsu_import_export.process_folder(month_path, all_records)
            gov_commodity_jiangsu_country.process_folder(month_path)
            gov_commodity_jiangsu_city.process_folder(month_path)
if __name__ == "__main__":
    # First listing page of the Nanjing customs statistics section.
    start_url = 'http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html'
    # Province name passed to the year-over-year SQL updates.
    province = '江苏省'
    # Pause, in seconds, inserted between the pipeline stages.
    pause_seconds = 5

    # Stage 1: download and sort all archives.
    crawl_with_selenium(start_url)
    log.info(f"江苏南京海关全量数据下载任务完成")
    time.sleep(pause_seconds)

    # Stage 2: process the downloaded year/month folders.
    hs_records = base_mysql.get_hs_all()
    hierarchical_traversal(base_country_code.download_dir, hs_records)
    log.info("江苏南京海关类章、国家、城市所有文件处理完成!")
    time.sleep(pause_seconds)

    # Stage 3: run the year-over-year SQL updates for the province.
    base_mysql.update_january_yoy(province)
    base_mysql.update_shandong_yoy(province)
    log.info("江苏南京海关城市同比sql处理完成")
|