"""Selenium/Firefox helpers for locating and downloading Excel reports."""

import re
import threading
import time
import traceback
from pathlib import Path

from faker import Faker
from selenium.webdriver import FirefoxOptions, ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from utils.log import log

# Default number of seconds wait_for_download() polls before giving up.
DOWNLOAD_TIMEOUT = 60
# Serialises directory scans performed by wait_for_download().
download_lock = threading.Lock()

# Matches <a> elements whose href ends in ".xls" or ".xlsx".
_EXCEL_LINK_XPATH = (
    '//a[substring(@href, string-length(@href)-3) = ".xls" or substring(@href, string-length(@href)-4) = ".xlsx"]'
)
# File-name endings browsers use for downloads that are still in progress.
_PARTIAL_SUFFIXES = ('.part', '.crdownload')
# MIME types saved to disk without a prompt (legacy .xls and OOXML .xlsx).
_EXCEL_MIME_TYPES = (
    "application/vnd.ms-excel",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)

# Patterns used to read the reporting period out of a link title.
_YEAR_RE = re.compile(r'(\d{4})\s*年')
_MONTH_RE = re.compile(r'(?:1[--]\s*)?(\d{1,2})月|前?(\d{1,2})个?月|第([一二三四])季度')
# A quarter is filed under its last month.
_QUARTER_END_MONTH = {"一": 3, "二": 6, "三": 9, "四": 12}


def configure_stealth_options(download_dir):
    """Build headless Firefox options with anti-detection and download settings.

    Args:
        download_dir: Directory the browser should save downloads into.

    Returns:
        FirefoxOptions: The configured options object.
    """
    opts = FirefoxOptions()
    opts.set_preference("dom.webdriver.enabled", False)
    # NOTE(review): Chromium-style switch; Firefox probably ignores it - confirm.
    opts.add_argument("--disable-blink-features=AutomationControlled")
    opts.add_argument("--headless")
    # Fixed viewport. "--window-size" is the Chromium spelling and is kept for
    # compatibility; "--width"/"--height" are the Firefox command-line options.
    opts.add_argument("--window-size=1366,900")
    opts.add_argument("--width=1366")
    opts.add_argument("--height=900")
    opts.set_preference("general.useragent.override", Faker().user_agent())
    opts.set_preference("browser.download.dir", str(download_dir))
    # folderList=2 tells Firefox to use the custom browser.download.dir.
    opts.set_preference("browser.download.folderList", 2)
    # Cover both .xls and .xlsx so neither triggers a save dialog.
    opts.set_preference("browser.helperApps.neverAsk.saveToDisk", ",".join(_EXCEL_MIME_TYPES))
    opts.set_preference("pdfjs.disabled", True)
    # NOTE(review): the two prefs below look like Chrome preference names;
    # they are kept unchanged - confirm whether Firefox honours them.
    opts.set_preference("download.prompt_for_download", False)
    opts.set_preference("download.directory_upgrade", True)
    return opts


def generate_month_sequence(start_year, start_month, end_year=None, skip_january=False):
    """Generate a descending sequence of (year, month) pairs.

    Args:
        start_year (int): Year of the most recent data that was detected.
        start_month (int): Month (1-12) of the most recent data that was detected.
        end_year (int, optional): Last year to include. When given, the sequence
            runs backwards from the start month down to January of this year.
            When omitted (or falsy), only the start month and the month before
            it are returned.
        skip_january (bool, optional): Drop every January entry. Defaults to False.

    Returns:
        List[Tuple[int, int]]: Pairs ordered from newest to oldest. The start
        month is always the first candidate, even if ``end_year`` is later than
        ``start_year``.

    Raises:
        ValueError: If ``start_month`` is outside 1-12 (such values previously
            produced garbage entries or an endless loop).
    """
    if not 1 <= start_month <= 12:
        raise ValueError(f"start_month must be in 1..12, got {start_month!r}")

    def wanted(month):
        return not (skip_january and month == 1)

    sequence = []
    if end_year:
        year, month = start_year, start_month
        while True:
            if wanted(month):
                sequence.append((year, month))
            year, month = get_previous_month(year, month)
            # Stop once we have stepped past January of the end year.
            if year < end_year:
                break
    else:
        # No end year: take the two most recent months.
        if wanted(start_month):
            sequence.append((start_year, start_month))
        prev_year, prev_month = get_previous_month(start_year, start_month)
        if wanted(prev_month):
            sequence.append((prev_year, prev_month))
    return sequence


def get_previous_month(year, month):
    """Return the (year, month) pair preceding the given one, rolling over years."""
    if month == 1:
        return year - 1, 12
    return year, month - 1


def _open_in_new_tab(driver, url):
    """Open ``url`` in a new tab and switch the driver to it."""
    # Pass the URL as a script argument instead of interpolating it into the
    # JavaScript source, so quotes or backslashes in the URL cannot break it.
    driver.execute_script("window.open(arguments[0]);", url)
    driver.switch_to.window(driver.window_handles[-1])


def _close_tab_and_return(driver):
    """Close the current tab and switch back to the first window."""
    driver.close()
    driver.switch_to.window(driver.window_handles[0])


def _store_download(downloaded_file, target_dir, stem, default_suffix=""):
    """Move a finished download to ``target_dir/<stem><suffix>`` and return the new path.

    The suffix is taken from the downloaded file, falling back to
    ``default_suffix``. An existing file at the destination is overwritten.
    """
    target_dir.mkdir(parents=True, exist_ok=True)
    suffix = downloaded_file.suffix or default_suffix
    final_path = target_dir / f'{stem}{suffix}'
    # Path.replace overwrites an existing destination on every platform.
    downloaded_file.replace(final_path)
    return final_path


def _parse_period(raw_title, base_year, base_month):
    """Extract (year, month) from a link title, falling back to the base values.

    Recognises "<YYYY>年", "<N>月" (optionally written as "1-<N>月"),
    "前<N>个月" and "第<一|二|三|四>季度"; a quarter maps to its last month.
    """
    year_match = _YEAR_RE.search(raw_title)
    file_year = int(year_match.group(1)) if year_match else base_year

    file_month = base_month
    month_match = _MONTH_RE.search(raw_title)
    if month_match:
        plain, cumulative, quarter = month_match.groups()
        if plain:
            file_month = int(plain)
        elif cumulative:
            file_month = int(cumulative)
        elif quarter:
            file_month = _QUARTER_END_MONTH.get(quarter, base_month)

    # Guard against nonsense such as "0月" or "13月".
    if not 1 <= file_month <= 12:
        file_month = base_month
    return file_year, file_month


def download_excel(driver, url, year, month, title, download_dir):
    """Open ``url`` in a new tab, click its first Excel link and file the result.

    The downloaded file is moved to ``download_dir/<year>/<MM>/<title><suffix>``,
    replacing any file already there. The tab is closed and the driver is
    switched back to the first window even when an error occurs.

    Args:
        driver: Selenium WebDriver instance.
        url: Page that contains the Excel download link.
        year: Year used for the target sub-directory.
        month (int): Month used for the target sub-directory (zero-padded).
        title: File name (without suffix) for the stored file.
        download_dir: Directory the browser downloads into.

    Raises:
        TimeoutError: If the download does not finish in time.
        selenium TimeoutException: If no clickable Excel link appears within 20 s.
    """
    download_dir = Path(download_dir)
    download_dir.mkdir(parents=True, exist_ok=True)

    _open_in_new_tab(driver, url)
    try:
        download_btn = WebDriverWait(driver, 20).until(
            EC.element_to_be_clickable((By.XPATH, _EXCEL_LINK_XPATH))
        )
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", download_btn)
        # Click through ActionChains after scrolling the link into view.
        ActionChains(driver).move_to_element(download_btn).pause(0.3).click().perform()

        downloaded_file = wait_for_download(download_dir)
        final_path = _store_download(
            downloaded_file, download_dir / f'{year}' / f'{month:02d}', title
        )
        log.info(f"√ 文件已保存至:{final_path}")
    finally:
        _close_tab_and_return(driver)


def batch_download_excel(driver, url, base_year, base_month, base_title, download_dir):
    """Download every Excel link on ``url`` and file each one by its own period.

    The year and month are parsed from each link's text; whatever cannot be
    parsed falls back to ``base_year`` / ``base_month``. Files are stored as
    ``download_dir/<year>/<MM>/<link text without suffix><download suffix>``.
    Links with empty text are skipped.

    Args:
        driver: Selenium WebDriver instance.
        url: Page that lists the Excel download links.
        base_year (int): Fallback year.
        base_month (int): Fallback month.
        base_title: Unused; kept for interface compatibility.
        download_dir (Path): Directory the browser downloads into.

    Raises:
        TimeoutError: If a download does not finish in time.
        selenium TimeoutException: If no Excel link appears within 20 s.
    """
    download_dir.mkdir(parents=True, exist_ok=True)

    _open_in_new_tab(driver, url)
    try:
        download_btns = WebDriverWait(driver, 20).until(
            EC.presence_of_all_elements_located((By.XPATH, _EXCEL_LINK_XPATH))
        )

        for btn in download_btns:
            raw_title = btn.text.strip() or btn.get_attribute("textContent").strip()
            if not raw_title:
                log.warning("⚠️ 标题为空,跳过处理")
                continue

            file_year, file_month = _parse_period(raw_title, base_year, base_month)

            btn.click()
            downloaded_file = wait_for_download(download_dir)

            # Drop any suffix already present in the title to avoid doubling it.
            clean_title = Path(raw_title).stem
            final_path = _store_download(
                downloaded_file,
                download_dir / f'{file_year}' / f'{file_month:02d}',
                clean_title,
            )
            log.info(f"√ 文件 {clean_title} 已保存至:{final_path}")
    finally:
        _close_tab_and_return(driver)


def download_excel2(driver, link, year, month, title, download_dir):
    """Click an already-located link element and file the resulting download.

    The file is moved to ``download_dir/<year>/<MM>/<title><suffix>``; when the
    downloaded file has no suffix, ``.xlsx`` is used for the stored name.

    Args:
        driver: Selenium WebDriver instance (not used directly here).
        link: Element to click to start the download.
        year: Year used for the target sub-directory.
        month (int): Month used for the target sub-directory (zero-padded).
        title: File name (without suffix) for the stored file.
        download_dir: Directory the browser downloads into.

    Raises:
        TimeoutError: If the download does not finish in time (logged, re-raised).
        Exception: Any other failure is logged with its traceback and re-raised.
    """
    download_dir = Path(download_dir)
    download_dir.mkdir(parents=True, exist_ok=True)

    try:
        log.info(f"正在点击链接:{title}")
        link.click()

        log.info("等待文件下载完成...")
        downloaded_file = wait_for_download(download_dir)

        # The default suffix applies to the destination name only; the source
        # path must keep pointing at the file that actually exists on disk.
        final_path = _store_download(
            downloaded_file,
            download_dir / f'{year}' / f'{month:02d}',
            title,
            default_suffix='.xlsx',
        )
        log.info(f"√ 文件已保存至:{final_path}")
    except TimeoutError as te:
        log.info(f"[错误] 文件下载超时:{te}")
        raise
    except Exception as e:
        log.info(f"[错误] 发生异常:{e}")
        log.info(traceback.format_exc())
        raise


def wait_for_download(directory, timeout=DOWNLOAD_TIMEOUT, poll_interval=1):
    """Wait for a download to finish in ``directory`` and return the newest file.

    Only regular files directly inside ``directory`` are considered;
    sub-directories are ignored. The function keeps waiting while any
    ``.part`` / ``.crdownload`` file is present, because the browser may create
    the final file name as a placeholder before the data has been written.

    Args:
        directory: Directory the browser downloads into.
        timeout: Maximum number of seconds to wait. Defaults to DOWNLOAD_TIMEOUT.
        poll_interval: Seconds to sleep between directory scans.

    Returns:
        Path: The completed file with the greatest ``st_ctime``.

    Raises:
        TimeoutError: If no completed download shows up within ``timeout``
            (including the case where a temporary file never goes away).
    """
    directory = Path(directory)
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with download_lock:
            finished = []
            in_progress = False
            for entry in directory.glob('*'):
                if not entry.is_file():
                    continue
                if entry.name.endswith(_PARTIAL_SUFFIXES):
                    in_progress = True
                else:
                    finished.append(entry)
            if finished and not in_progress:
                return max(finished, key=lambda f: f.stat().st_ctime)
        # Sleep outside the lock so other threads can scan in the meantime.
        time.sleep(poll_interval)
    raise TimeoutError("文件下载超时")