import threading import time import traceback from pathlib import Path from faker import Faker from selenium.webdriver import FirefoxOptions from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from utils.log import log DOWNLOAD_TIMEOUT = 60 download_lock = threading.Lock() def configure_stealth_options(download_dir): """反检测浏览器配置""" opts = FirefoxOptions() opts.set_preference("dom.webdriver.enabled", False) opts.add_argument("--disable-blink-features=AutomationControlled") opts.add_argument("--headless") opts.set_preference("general.useragent.override", Faker().user_agent()) opts.set_preference("browser.download.dir", str(download_dir)) opts.set_preference("browser.download.folderList", 2) opts.set_preference("browser.helperApps.neverAsk.saveToDisk", "application/vnd.ms-excel") opts.set_preference("pdfjs.disabled", True) opts.set_preference("download.prompt_for_download", False) opts.set_preference("download.directory_upgrade", True) return opts def generate_month_sequence(start_year, start_month, end_year=None, skip_january=False): """ 动态生成倒序月份序列 Args: start_year (int): 检测到的最新数据年份 start_month (int): 检测到的最新数据月份 end_year (int, optional): 终止年份(默认 None) skip_january (bool, optional): 是否跳过所有1月数据(默认 False) Returns: List[Tuple[int, int]]: 月份序列列表,格式为 [(year, month), ...] """ sequence = [] current_year = start_year current_month = start_month # 当指定终止年份时 if end_year: while not (current_year == end_year and current_month < 1): # 跳过1月判断 if not (skip_january and current_month == 1): sequence.append((current_year, current_month)) # 跨年处理 if current_month == 1: current_year -= 1 current_month = 12 else: current_month -= 1 # 终止条件:到达目标年份的1月 if current_year < end_year: break else: # 未指定年份时取最近两个月 if not (skip_january and current_month == 1): sequence.append((current_year, current_month)) prev_year, prev_month = get_previous_month(current_year, current_month) if not (skip_january and prev_month == 1): sequence.append((prev_year, prev_month)) return sequence def get_previous_month(year, month): """跨年月份计算""" if month == 1: return year - 1, 12 return year, month - 1 def download_excel(driver, url, year, month, title, download_dir): """文件下载模块""" download_dir.mkdir(parents=True, exist_ok=True) driver.execute_script(f"window.open('{url}')") driver.switch_to.window(driver.window_handles[-1]) try: download_btn = WebDriverWait(driver, 20).until( EC.element_to_be_clickable( (By.XPATH, '//a[substring(@href, string-length(@href)-3) = ".xls" or substring(@href, string-length(@href)-4) = ".xlsx"]') ) ) download_btn.click() # log.info(f"√ 已点击下载按钮:{download_btn.get_attribute("href")}") downloaded_file = wait_for_download(download_dir) final_path = Path(f'{download_dir}/{year}/{month:02d}/{title}{downloaded_file.suffix}') if final_path.exists(): final_path.unlink() download_rel_dir = Path(f'{download_dir}/{year}/{month:02d}') download_rel_dir.mkdir(parents=True, exist_ok=True) downloaded_file.rename(final_path) log.info(f"√ 文件已保存至:{final_path}") finally: driver.close() driver.switch_to.window(driver.window_handles[0]) def batch_download_excel(driver, url, year, month, base_title, download_dir): """批量下载Excel文件模块""" download_dir.mkdir(parents=True, exist_ok=True) driver.execute_script(f"window.open('{url}')") driver.switch_to.window(driver.window_handles[-1]) try: # 获取所有Excel下载按钮 download_btns = WebDriverWait(driver, 20).until( EC.presence_of_all_elements_located( (By.XPATH, '//a[substring(@href, string-length(@href)-3) = ".xls" or substring(@href, string-length(@href)-4) = ".xlsx"]') ) ) for index, btn in enumerate(download_btns): # 生成唯一标题(可自定义规则) title = f"{base_title}_{index + 1}" # 点击下载按钮 btn.click() # 等待下载完成 downloaded_file = wait_for_download(download_dir) # 处理文件路径 final_dir = download_dir / f'{year}' / f'{month:02d}' final_dir.mkdir(parents=True, exist_ok=True) final_path = final_dir / f'{title}{downloaded_file.suffix}' # 重命名文件 if final_path.exists(): final_path.unlink() downloaded_file.rename(final_path) log.info(f"√ 文件 {title} 已保存至:{final_path}") finally: driver.close() driver.switch_to.window(driver.window_handles[0]) def download_excel2(driver, link, year, month, title, download_dir): download_dir = Path(download_dir) download_dir.mkdir(parents=True, exist_ok=True) try: log.info(f"正在点击链接:{title}") link.click() log.info("等待文件下载完成...") downloaded_file = wait_for_download(download_dir) if not downloaded_file.suffix: downloaded_file = downloaded_file.with_suffix('.xlsx') final_dir = download_dir / f'{year}' / f'{month:02d}' final_dir.mkdir(parents=True, exist_ok=True) final_path = final_dir / f'{title}{downloaded_file.suffix}' if final_path.exists(): final_path.unlink() downloaded_file.rename(final_path) log.info(f"√ 文件已保存至:{final_path}") except TimeoutError as te: log.info(f"[错误] 文件下载超时:{te}") raise except Exception as e: log.info(f"[错误] 发生异常:{e}") log.info(traceback.format_exc()) raise # def wait_for_download(directory): # """文件下载监控(只读取文件,忽略文件夹)""" # start_time = time.time() # while (time.time() - start_time) < DOWNLOAD_TIMEOUT: # files = [ # f for f in directory.glob('*') # if f.is_file() and not f.name.endswith(('.part', '.crdownload')) # ] # if files: # # 按照创建时间排序并返回最新文件 # return max(files, key=lambda x: x.stat().st_ctime) # time.sleep(1) # raise TimeoutError("文件下载超时") def wait_for_download(directory): """文件下载监控(只读取文件,忽略文件夹)""" start_time = time.time() while (time.time() - start_time) < DOWNLOAD_TIMEOUT: with download_lock: files = [ f for f in directory.glob('*') if f.is_file() and not f.name.endswith(('.part', '.crdownload')) ] if files: # 按照创建时间排序并返回最新文件 return max(files, key=lambda x: x.stat().st_ctime) time.sleep(1) raise TimeoutError("文件下载超时")