|
- import os
- import random
- import re
- import time
- import sys
- from pathlib import Path
- from datetime import datetime, timedelta
- from faker import Faker
- from selenium import webdriver
- from selenium.webdriver import FirefoxOptions
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from utils import base_country_code, base_mysql
- import gov_commodity_hebei_import_export
- import gov_commodity_hebei_country
- import gov_commodity_hebei_city
- from utils.log import log
# Root folder that receives the browser downloads; the project-wide value from
# base_country_code is reused so every module agrees on the location.
download_dir = base_country_code.download_dir

# Create the folder (and any missing parents) at import time; an already
# existing folder is not an error.
Path(download_dir).mkdir(exist_ok=True, parents=True)
def get_current_target_titles(year=2025, month=4):
    """Return the four report titles expected for one month.

    Args:
        year: Four-digit year used in the title prefix. Defaults to 2025,
            the value that used to be hard-coded.
        month: Month number, written without zero padding in the title.
            Defaults to 4, the value that used to be hard-coded.

    Returns:
        A list of four titles, in this order: imports by commodity, exports
        by commodity, by country, by city.
    """
    prefix = f"{year}年{month}月河北"
    suffixes = ("分进口商品", "分出口商品", "分国家", "分地市")
    return [prefix + suffix for suffix in suffixes]
def configure_stealth_options():
    """Build the Firefox options used by the crawler.

    The options configure silent file downloads into ``download_dir``,
    a randomised user agent, a fixed window size and headless mode.

    Returns:
        A ``FirefoxOptions`` instance ready to pass to ``webdriver.Firefox``.
    """
    opts = FirefoxOptions()

    # Use the same resolved path for the log line and for the preference.
    # Previously the resolved path was printed but the raw (possibly relative)
    # value was handed to Firefox; the preference is a filesystem path and is
    # expected to be absolute.
    download_path = Path(download_dir).resolve()
    print("当前下载路径:", download_path)

    # --- download behaviour -------------------------------------------------
    opts.set_preference("browser.download.dir", str(download_path))
    opts.set_preference("browser.download.folderList", 2)  # 2 = use the custom dir above
    opts.set_preference("browser.download.manager.showWhenStarting", False)
    # MIME types saved without a prompt. The crawler accepts both .xls and
    # .xlsx links, so the OOXML spreadsheet type is listed as well.
    auto_save_types = (
        "application/octet-stream",
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    )
    opts.set_preference("browser.helperApps.neverAsk.saveToDisk", ",".join(auto_save_types))
    opts.set_preference("browser.download.manager.useWindow", False)  # no download-manager window
    opts.set_preference("browser.download.manager.showAlertOnComplete", False)  # no completion alert

    # --- anti-detection -----------------------------------------------------
    opts.set_preference("dom.webdriver.enabled", False)
    # NOTE(review): the next two settings look Chrome-specific and are
    # presumably ignored by Firefox; kept unchanged - confirm before removing.
    opts.set_preference("useAutomationExtension", False)
    opts.add_argument("--disable-blink-features=AutomationControlled")

    # --- fingerprint --------------------------------------------------------
    fake = Faker()
    opts.set_preference("general.useragent.override", fake.firefox())
    opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")

    # --- viewport / headless ------------------------------------------------
    opts.add_argument("--width=1440")
    opts.add_argument("--height=900")
    opts.add_argument("--headless")
    return opts
def remove_prefix_from_url(url):
    """Turn a site-relative link into an absolute URL on the customs host.

    Absolute ``http://`` / ``https://`` URLs are returned untouched. For any
    other value the leading ``<digits>.`` ordering prefix is stripped from
    the final path segment and the result is joined onto the customs host.

    Args:
        url: The link text, absolute or relative.

    Returns:
        The absolute URL as a string.
    """
    if url.startswith(('http://', 'https://')):
        return url

    base_url = 'http://shijiazhuang.customs.gov.cn'
    directory, _, filename = url.rpartition('/')
    # Drop an ordering prefix such as "1." from the file name.
    new_filename = re.sub(r'^\d+\.', '', filename)
    # A relative path without a leading slash used to be glued straight onto
    # the host name ("...gov.cna/b/x.xls"); make sure a separator is present.
    if directory and not directory.startswith('/'):
        directory = '/' + directory
    return base_url + directory + '/' + new_filename
# Substrings that mark a list entry as one of the wanted statistic tables.
_TARGET_KEYWORDS = ('进口商品', '出口商品', '分国家', '分国别', '地市')


def _archive_downloaded_file(downloaded_file, file_name):
    """Move a finished download to ``<download_dir>/<year>/<month>/<title>.xls``.

    Raises:
        ValueError: if no year/month can be extracted from ``file_name``.
    """
    year, _start_month, month = extract_year_and_month(file_name)
    final_dir = Path(download_dir) / year / month
    final_path = final_dir / f"{file_name}.xls"
    if os.path.exists(final_path):
        log.info(f"文件已存在:{file_name} 正在覆盖...")
        os.unlink(final_path)
    final_dir.mkdir(parents=True, exist_ok=True)
    log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
    downloaded_file.rename(final_path)
    log.info(f"√ 下载成功:{final_path}")


def find_target_links(driver, year_month):
    """Download every wanted Excel file listed on the current page.

    Args:
        driver: Selenium driver already positioned on a list page.
        year_month: Title prefix such as ``"2025年4月"`` for an incremental
            run, or ``None`` for a full run.

    Returns:
        ``'stop'`` when paging should end: in a full run, as soon as a title
        starting with ``2022`` is met; in an incremental run, as soon as a
        title not starting with ``year_month`` is met. Otherwise ``None``.
    """
    # Wait until the page body has been rendered.
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )
    links = driver.find_elements(
        By.XPATH, '//div[@class="list_con"]//ul[@class="easysite-list-modelone"]//a'
    )
    if not links:
        log.info("未找到目标标题")
        return None

    for link in links:
        file_name = link.text.strip()
        if not file_name:
            continue

        # Stop conditions are evaluated before any keyword filtering.
        if year_month is None:
            if file_name.startswith('2022'):
                return 'stop'
        elif not file_name.startswith(year_month):
            log.info(f"非 {year_month} 文件: {file_name}, stop")
            return 'stop'

        if not any(keyword in file_name for keyword in _TARGET_KEYWORDS):
            log.info(f'{file_name} 不需要此文件,跳过')
            continue

        file_url = link.get_attribute("href")
        if not file_url:
            # An anchor without href used to crash the whole crawl.
            log.info(f"跳过无链接条目: {file_name}")
            continue
        file_url = remove_prefix_from_url(file_url)
        if not file_url.lower().endswith(('.xls', '.xlsx')):
            log.info(f"跳过非 Excel 文件: {file_url}")
            continue

        log.info(f"正在下载: {file_name} → {file_url}")
        # Snapshot the directory so the new file can be told apart afterwards.
        existing_files = set(f.name for f in Path(download_dir).glob('*'))
        # Random delay before the click.
        time.sleep(random.uniform(1, 3))
        link.click()
        try:
            downloaded_file = wait_for_download_complete(existing_files=existing_files)
        except Exception as e:
            log.info(f"下载失败: {str(e)}")
            continue

        try:
            _archive_downloaded_file(downloaded_file, file_name)
        except ValueError as e:
            # A title without a recognisable date must not abort the crawl;
            # the file is left in the download root and the loop moves on.
            log.info(f"跳过归档: {str(e)}")
            continue
    return None
# Matches "2025年3月" and ranges like "2025年1-2月". The look-behind replaces the
# former \b: with str patterns \b is Unicode-aware, so a year directly preceded
# by a CJK character or a letter was never matched.
_YEAR_MONTH_PATTERN = re.compile(r"(?<!\d)(\d{4})年(\d{1,2})(?:-(\d{1,2}))?月")


def extract_year_and_month(file_name):
    """Extract the year and the month (or month range) from a report title.

    Supported forms include ``1.2025年1-2月xxx`` and ``2025年3月xxx``; the
    date may be preceded by arbitrary non-digit text.

    Args:
        file_name: Report title to parse.

    Returns:
        A tuple ``(year, start_month, end_month)`` of strings. Both months
        are zero-padded to two digits; for a single month the start and end
        values are equal.

    Raises:
        ValueError: if no year/month expression is found.
    """
    match = _YEAR_MONTH_PATTERN.search(file_name)
    if match is None:
        raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
    year, start_month, end_month = match.groups()
    if end_month is None:
        end_month = start_month
    return year, start_month.zfill(2), end_month.zfill(2)
def detect_latest_month(driver, url):
    """Find the newest published month that is not yet stored in the database.

    The current calendar month and the two before it are probed, newest
    first. A month qualifies when the page contains a link whose title
    includes ``"<year>年<month>月"`` and ``base_mysql.get_code_exist`` reports
    no rows for it.

    Args:
        driver: Selenium driver used to load the list page.
        url: Address of the list page.

    Returns:
        The title prefix (for example ``"2025年4月"``) of the first
        qualifying month, or ``None`` when none of the three qualifies.
    """
    # Imported here so the narrowed handler below needs no file-level change.
    from selenium.common.exceptions import TimeoutException

    driver.get(url)
    today = datetime.now()
    # Work on a running month index instead of subtracting 30-day steps:
    # day-based steps repeat or skip months (31 May - 30 days is still May).
    month_index = today.year * 12 + (today.month - 1)

    for offset in range(3):
        check_year, zero_based_month = divmod(month_index - offset, 12)
        check_month = zero_based_month + 1
        target_title = f"{check_year}年{check_month}月"

        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
            )
        except TimeoutException:
            log.info(f"未找到 {target_title}")
            continue
        log.info(f"已找到最新月份数据 {check_year}-{check_month}")

        # Skip months that are already stored. A failing lookup is still
        # treated as "try the next month" (as before), but it is now reported
        # as a lookup failure and no longer traps KeyboardInterrupt/SystemExit.
        try:
            count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', '130000')
        except Exception as exc:
            log.info(f"查询 {check_year}-{check_month} 入库状态失败: {exc}")
            continue
        if count > 0:
            log.info(f"count: {count} -> 已存在 {check_year}-{check_month} 数据,跳过")
            continue
        return target_title

    log.info("三个月内未找到有效数据")
    return None
def crawl_with_selenium(url, mark):
    """Crawl the list pages starting at ``url`` and download the wanted files.

    Args:
        url: Address of the first list page.
        mark: ``'increment'`` to fetch only the newest month that is not yet
            stored; any other value performs a full crawl.

    Raises:
        SystemExit: with code 0 when an incremental run finds no new month.

    The browser is always closed, including on the early-exit path and on
    errors during the incremental pre-check.
    """
    driver = webdriver.Firefox(options=configure_stealth_options())
    try:
        # The incremental pre-check now runs inside the try block so that
        # sys.exit() or an exception here no longer leaks the Firefox process.
        year_month = None
        if mark == 'increment':
            year_month = detect_latest_month(driver, url)
            if year_month is None:
                log.info("河北省海关没有最新数据更新")
                sys.exit(0)
            print(f"检测到最新有效数据:{year_month}")

        # Anti-detection script.
        # NOTE(review): this runs against the document loaded at this moment,
        # so it presumably does not survive the driver.get() below - confirm.
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.alert = () => {};
        """)
        driver.get(url)

        while True:
            # Process the current page; 'stop' means paging is finished.
            if find_target_links(driver, year_month) == 'stop':
                break

            # Wait for the pager to be present.
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "gg_page"))
            )
            xpath = '//div[@class="easysite-page-wrap"]//a[@title="下一页"]'
            next_page_btn = WebDriverWait(driver, 15).until(
                EC.element_to_be_clickable((By.XPATH, xpath))
            )

            # The target address is carried by the button's onclick handler.
            onclick = next_page_btn.get_attribute("onclick")
            if not onclick:
                log.info("已到达最后一页,停止爬取")
                break
            quoted = re.search(r"'(.*?)'", onclick)
            if quoted is None:
                # Previously an AttributeError on .group(); end paging cleanly.
                log.info(f"无法解析下一页链接: {onclick}")
                break
            next_page_url = quoted.group(1)
            if not next_page_url.startswith(('http://', 'https://')):
                next_page_url = 'http://shijiazhuang.customs.gov.cn' + next_page_url

            driver.get(next_page_url)
            log.info(f"开始爬取 {next_page_url} 页面数据")
    finally:
        driver.quit()
def wait_for_download_complete(timeout=30, existing_files=None, poll_interval=2):
    """Poll the download directory until a new, finished file appears.

    A file counts as finished when it is a regular file, its name was not in
    ``existing_files``, it does not end in a temporary-download extension
    (``.part`` / ``.crdownload``) and its size is greater than zero. No
    further "size is stable" check is performed.

    Args:
        timeout: Maximum time to wait, in seconds.
        existing_files: Names present before the download started. When
            ``None``, the directory content at call time is used.
        poll_interval: Seconds to sleep between two directory scans.

    Returns:
        ``Path`` of the new file; if several qualify, the most recently
        modified one.

    Raises:
        TimeoutError: if no finished file shows up within ``timeout``.
    """
    start_time = time.time()
    temp_exts = ('.part', '.crdownload')
    root = Path(download_dir)
    if existing_files is None:
        existing_files = {entry.name for entry in root.glob('*')}

    while (time.time() - start_time) < timeout:
        candidates = []
        for entry in root.glob('*'):
            if entry.name in existing_files or entry.name.endswith(temp_exts):
                continue
            try:
                # Directories (e.g. the year folders created under the same
                # root) must never be mistaken for a downloaded file.
                if not entry.is_file():
                    continue
                info = entry.stat()
            except FileNotFoundError:
                # The entry vanished between listing and stat; rescan later.
                continue
            if info.st_size > 0:
                candidates.append((info.st_mtime, entry))
        if candidates:
            return max(candidates, key=lambda pair: pair[0])[1]
        time.sleep(poll_interval)
    raise TimeoutError("文件下载超时")
def hierarchical_traversal(root_path):
    """Walk ``root_path`` as year -> month folders and process each month.

    Year folders are those whose name matches ``base_country_code.YEAR_PATTERN``
    and month folders those matching ``base_country_code.MONTH_PATTERN``. Both
    levels are visited in descending order. Every month folder is passed, in
    turn, to the import/export, country and city processors.

    Args:
        root_path: Directory that contains the year folders.
    """
    year_pattern = base_country_code.YEAR_PATTERN
    month_pattern = base_country_code.MONTH_PATTERN

    # Collect the year folders, newest first.
    year_dirs = []
    for entry in Path(root_path).iterdir():
        if entry.is_dir() and year_pattern.match(entry.name):
            year_dirs.append(entry)
    year_dirs.sort(key=lambda directory: directory.name, reverse=True)

    for year_dir in year_dirs:
        log.info(f"\n年份:{year_dir.name} | 省份:hebei")

        # Pair every month folder with its numeric month, newest first.
        months = [
            (int(entry.name), entry)
            for entry in year_dir.iterdir()
            if entry.is_dir() and month_pattern.match(entry.name)
        ]
        months.sort(key=lambda pair: pair[0], reverse=True)

        for month_number, month_path in months:
            log.info(f" 月份:{month_number:02d} | 路径:{month_path}")
            gov_commodity_hebei_import_export.process_folder(month_path)
            gov_commodity_hebei_country.process_folder(month_path)
            gov_commodity_hebei_city.process_folder(month_path)
if __name__ == "__main__":
    hebei_index_url = (
        'http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/'
        'zfxxgk43/2988665/2988681/index.html'
    )

    # Step 1: full crawl. Pass 'increment' instead of 'all' to fetch only the
    # newest month that is not stored yet.
    crawl_with_selenium(hebei_index_url, 'all')

    # Step 2: after a 5 s pause, parse everything that was downloaded.
    time.sleep(5)
    hierarchical_traversal(base_country_code.download_dir)
    log.info("河北石家庄海关全量数据下载任务完成")

    # Step 3: after another 5 s pause, run the year-over-year SQL updates.
    time.sleep(5)
    province = '河北省'
    base_mysql.update_january_yoy(province)
    base_mysql.update_shandong_yoy(province)
    log.info("河北石家庄海关城市同比sql处理完成")
|