123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 |
- import re
- import threading
- import time
- import traceback
- from pathlib import Path
- from faker import Faker
- from selenium.webdriver import FirefoxOptions, ActionChains
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from utils.log import log
# Maximum number of seconds wait_for_download() keeps polling before it raises TimeoutError.
DOWNLOAD_TIMEOUT = 60
# Lock held by wait_for_download() while it lists the download directory.
download_lock = threading.Lock()
def configure_stealth_options(download_dir):
    """Build headless Firefox options with anti-detection and silent-download settings.

    Args:
        download_dir: Directory (str or Path) that Firefox should save downloads to.

    Returns:
        FirefoxOptions: The configured options object.
    """
    opts = FirefoxOptions()

    # Anti-detection: hide the webdriver flag and present a randomized user agent.
    opts.set_preference("dom.webdriver.enabled", False)
    opts.set_preference("general.useragent.override", Faker().user_agent())

    opts.add_argument("--headless")
    # Chromium-style switches kept from the original configuration for
    # backward compatibility; the Firefox equivalents for the viewport follow.
    opts.add_argument("--disable-blink-features=AutomationControlled")
    opts.add_argument("--window-size=1366,900")
    # Fixed viewport: Firefox takes the window size as separate width/height switches.
    opts.add_argument("--width=1366")
    opts.add_argument("--height=900")

    # Save downloads straight into download_dir (folderList=2 means "custom directory").
    opts.set_preference("browser.download.dir", str(download_dir))
    opts.set_preference("browser.download.folderList", 2)
    # Cover both legacy .xls and OOXML .xlsx responses; the download helpers in
    # this module target links of either kind.
    opts.set_preference(
        "browser.helperApps.neverAsk.saveToDisk",
        "application/vnd.ms-excel,"
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    )
    opts.set_preference("pdfjs.disabled", True)
    opts.set_preference("download.prompt_for_download", False)
    opts.set_preference("download.directory_upgrade", True)
    return opts
def generate_month_sequence(start_year, start_month, end_year=None, skip_january=False):
    """Generate a newest-first sequence of (year, month) pairs.

    Args:
        start_year (int): Year of the most recent data detected.
        start_month (int): Month (1-12) of the most recent data detected.
        end_year (int, optional): Last year to include. When given (and truthy),
            the sequence runs back to January of this year inclusive. When
            omitted, only the start month and the month before it are produced.
        skip_january (bool, optional): Leave every January out of the result.

    Returns:
        List[Tuple[int, int]]: Pairs ordered from newest to oldest. Empty when
        the start month lies before January of ``end_year``.
    """
    # Work on a single linear month index so year roll-over needs no special case.
    start_index = start_year * 12 + (start_month - 1)
    if end_year:
        # Index of January of the terminating year (inclusive lower bound).
        stop_index = end_year * 12
    else:
        # No terminating year: the start month plus the one preceding it.
        stop_index = start_index - 1

    sequence = []
    for month_index in range(start_index, stop_index - 1, -1):
        year, zero_based_month = divmod(month_index, 12)
        month = zero_based_month + 1
        if skip_january and month == 1:
            continue
        sequence.append((year, month))
    return sequence
def get_previous_month(year, month):
    """Return the (year, month) pair of the calendar month before the given one."""
    if month != 1:
        return year, month - 1
    # January rolls back to December of the preceding year.
    return year - 1, 12
def download_excel(driver, url, year, month, title, download_dir):
    """Open ``url`` in a new tab, download its first clickable Excel link and file it by month.

    The downloaded file is moved to ``<download_dir>/<year>/<MM>/<title><suffix>``,
    overwriting any file already stored under that name. The tab opened here is
    always closed again and focus returns to the first window.

    Args:
        driver: Selenium WebDriver instance to drive.
        url: Address of the page that contains the Excel link.
        year: Year used for the target sub-directory.
        month: Month number used for the zero-padded target sub-directory.
        title: File name (without extension) for the stored file.
        download_dir: Browser download directory (str or Path).

    Raises:
        TimeoutError: Propagated from wait_for_download() when no file appears.
    """
    download_dir = Path(download_dir)
    download_dir.mkdir(parents=True, exist_ok=True)

    # Pass the URL as a script argument rather than splicing it into the JS
    # source, so quotes or backslashes in it cannot break or inject into the script.
    driver.execute_script("window.open(arguments[0])", url)
    driver.switch_to.window(driver.window_handles[-1])
    try:
        # Match anchors whose href ends in ".xls" or ".xlsx".
        download_btn = WebDriverWait(driver, 20).until(
            EC.element_to_be_clickable(
                (By.XPATH,
                 '//a[substring(@href, string-length(@href)-3) = ".xls" or substring(@href, string-length(@href)-4) = ".xlsx"]')
            )
        )
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", download_btn)
        # Click through ActionChains after centering the element in the viewport.
        ActionChains(driver).move_to_element(download_btn).pause(0.3).click().perform()

        downloaded_file = wait_for_download(download_dir)
        target_dir = download_dir / f'{year}' / f'{month:02d}'
        target_dir.mkdir(parents=True, exist_ok=True)
        final_path = target_dir / f'{title}{downloaded_file.suffix}'
        # replace() overwrites an existing target on every platform, so no
        # separate exists()/unlink() step is needed.
        downloaded_file.replace(final_path)
        log.info(f"√ 文件已保存至:{final_path}")
    finally:
        driver.close()
        driver.switch_to.window(driver.window_handles[0])
- # def batch_download_excel(driver, url, year, month, base_title, download_dir):
- # """批量下载Excel文件模块"""
- # download_dir.mkdir(parents=True, exist_ok=True)
- #
- # driver.execute_script(f"window.open('{url}')")
- # driver.switch_to.window(driver.window_handles[-1])
- #
- # try:
- # # 获取所有Excel下载按钮
- # download_btns = WebDriverWait(driver, 20).until(
- # EC.presence_of_all_elements_located(
- # (By.XPATH,
- # '//a[substring(@href, string-length(@href)-3) = ".xls" or substring(@href, string-length(@href)-4) = ".xlsx"]')
- # )
- # )
- #
- # for index, btn in enumerate(download_btns):
- # # 生成唯一标题(可自定义规则)
- # title = f"{base_title}_{index + 1}"
- #
- # # 点击下载按钮
- # btn.click()
- #
- # # 等待下载完成
- # downloaded_file = wait_for_download(download_dir)
- #
- # # 处理文件路径
- # final_dir = download_dir / f'{year}' / f'{month:02d}'
- # final_dir.mkdir(parents=True, exist_ok=True)
- # final_path = final_dir / f'{title}{downloaded_file.suffix}'
- #
- # # 重命名文件
- # if final_path.exists():
- # final_path.unlink()
- # downloaded_file.rename(final_path)
- # log.info(f"√ 文件 {title} 已保存至:{final_path}")
- #
- # finally:
- # driver.close()
- # driver.switch_to.window(driver.window_handles[0])
# Patterns used to read the reporting period out of a link title.
_TITLE_YEAR_RE = re.compile(r'(\d{4})\s*年')
_TITLE_MONTH_RE = re.compile(r'(?:1[--]\s*)?(\d{1,2})月|前?(\d{1,2})个?月|第([一二三四])季度')
# A quarter is filed under its closing month.
_QUARTER_END_MONTH = {"一": 3, "二": 6, "三": 9, "四": 12}
_EXCEL_SUFFIXES = (".xls", ".xlsx")


def _period_from_title(raw_title, base_year, base_month):
    """Return the (year, month) named in ``raw_title``, falling back to the base values."""
    year_match = _TITLE_YEAR_RE.search(raw_title)
    file_year = int(year_match.group(1)) if year_match else base_year

    file_month = base_month
    month_match = _TITLE_MONTH_RE.search(raw_title)
    if month_match:
        plain_month, counted_month, quarter = month_match.groups()
        if plain_month:
            file_month = int(plain_month)
        elif counted_month:
            file_month = int(counted_month)
        elif quarter:
            file_month = _QUARTER_END_MONTH.get(quarter, base_month)
    # Guard against nonsense such as "13月".
    if not 1 <= file_month <= 12:
        file_month = base_month
    return file_year, file_month


def _title_without_excel_suffix(raw_title):
    """Return the last path component of ``raw_title`` minus a trailing .xls/.xlsx, if any."""
    name = Path(raw_title).name
    lowered = name.lower()
    for suffix in _EXCEL_SUFFIXES:
        # Only strip a genuine Excel extension; other dots belong to the title.
        if lowered.endswith(suffix) and len(name) > len(suffix):
            return name[:-len(suffix)]
    return name


def batch_download_excel(driver, url, base_year, base_month, base_title, download_dir):
    """Download every Excel link on ``url`` and file each one by the period in its title.

    Each link's visible text is parsed for a year ("2024年") and a month
    ("3月", "1-3月", "前3个月", "第一季度"). The file is then moved to
    ``<download_dir>/<year>/<MM>/<title><suffix>``, overwriting any existing
    file of that name. Links with an empty title are skipped. The tab opened
    here is always closed again and focus returns to the first window.

    Args:
        driver: Selenium WebDriver instance to drive.
        url: Address of the page that lists the Excel links.
        base_year: Year used when a title names none.
        base_month: Month used when a title names none or an invalid one.
        base_title: Unused; kept for interface compatibility.
        download_dir: Browser download directory (str or Path).

    Raises:
        TimeoutError: Propagated from wait_for_download() when no file appears.
    """
    download_dir = Path(download_dir)
    download_dir.mkdir(parents=True, exist_ok=True)

    # Pass the URL as a script argument rather than splicing it into the JS
    # source, so quotes or backslashes in it cannot break or inject into the script.
    driver.execute_script("window.open(arguments[0])", url)
    driver.switch_to.window(driver.window_handles[-1])
    try:
        # Collect every anchor whose href ends in ".xls" or ".xlsx".
        download_btns = WebDriverWait(driver, 20).until(
            EC.presence_of_all_elements_located(
                (By.XPATH,
                 '//a[substring(@href, string-length(@href)-3) = ".xls" or substring(@href, string-length(@href)-4) = ".xlsx"]')
            )
        )
        for btn in download_btns:
            raw_title = btn.text.strip() or btn.get_attribute("textContent").strip()
            if not raw_title:
                log.warning("⚠️ 标题为空,跳过处理")
                continue

            file_year, file_month = _period_from_title(raw_title, base_year, base_month)
            final_dir = download_dir / f'{file_year}' / f'{file_month:02d}'
            final_dir.mkdir(parents=True, exist_ok=True)

            btn.click()
            downloaded_file = wait_for_download(download_dir)

            # Drop an extension already present in the title so it is not doubled.
            clean_title = _title_without_excel_suffix(raw_title)
            final_path = final_dir / f'{clean_title}{downloaded_file.suffix}'
            # replace() overwrites an existing target on every platform.
            downloaded_file.replace(final_path)
            log.info(f"√ 文件 {clean_title} 已保存至:{final_path}")
    finally:
        driver.close()
        driver.switch_to.window(driver.window_handles[0])
def download_excel2(driver, link, year, month, title, download_dir):
    """Click an already-located link and file the resulting download by month.

    The downloaded file is moved to ``<download_dir>/<year>/<MM>/<title><suffix>``,
    overwriting any file already stored under that name. A download that
    arrives without an extension is stored with ``.xlsx``.

    Args:
        driver: Selenium WebDriver instance (not used directly here).
        link: Clickable element that triggers the download.
        year: Year used for the target sub-directory.
        month: Month number used for the zero-padded target sub-directory.
        title: File name (without extension) for the stored file.
        download_dir: Browser download directory (str or Path).

    Raises:
        TimeoutError: When no downloaded file appears in time (logged, then re-raised).
        Exception: Any other failure is logged with its traceback and re-raised.
    """
    download_dir = Path(download_dir)
    download_dir.mkdir(parents=True, exist_ok=True)
    try:
        log.info(f"正在点击链接:{title}")
        link.click()
        log.info("等待文件下载完成...")
        downloaded_file = wait_for_download(download_dir)

        # Only the *target* name gets the fallback extension. downloaded_file
        # must keep pointing at the file that actually exists on disk,
        # otherwise the move below fails with FileNotFoundError.
        suffix = downloaded_file.suffix or '.xlsx'

        final_dir = download_dir / f'{year}' / f'{month:02d}'
        final_dir.mkdir(parents=True, exist_ok=True)
        final_path = final_dir / f'{title}{suffix}'
        # replace() overwrites an existing target on every platform.
        downloaded_file.replace(final_path)
        log.info(f"√ 文件已保存至:{final_path}")
    except TimeoutError as te:
        log.info(f"[错误] 文件下载超时:{te}")
        raise
    except Exception as e:
        log.info(f"[错误] 发生异常:{e}")
        log.info(traceback.format_exc())
        raise
- # def wait_for_download(directory):
- # """文件下载监控(只读取文件,忽略文件夹)"""
- # start_time = time.time()
- # while (time.time() - start_time) < DOWNLOAD_TIMEOUT:
- # files = [
- # f for f in directory.glob('*')
- # if f.is_file() and not f.name.endswith(('.part', '.crdownload'))
- # ]
- # if files:
- # # 按照创建时间排序并返回最新文件
- # return max(files, key=lambda x: x.stat().st_ctime)
- # time.sleep(1)
- # raise TimeoutError("文件下载超时")
# Suffixes browsers give to files that are still being written.
_PARTIAL_SUFFIXES = ('.part', '.crdownload')


def wait_for_download(directory):
    """Poll ``directory`` until a finished download is present and return it.

    Only regular files directly inside ``directory`` are considered;
    sub-directories are ignored. While any partial-download file is present
    the directory counts as busy. Firefox creates an empty placeholder with
    the final name next to its ``.part`` file, and returning that placeholder
    would hand the caller an unfinished download.

    Args:
        directory: Download directory to watch (str or Path).

    Returns:
        Path: The most recently changed file in the directory.

    Raises:
        TimeoutError: If nothing finished within DOWNLOAD_TIMEOUT seconds.
    """
    directory = Path(directory)
    # monotonic() is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + DOWNLOAD_TIMEOUT
    while time.monotonic() < deadline:
        with download_lock:
            entries = [f for f in directory.glob('*') if f.is_file()]
        in_progress = any(f.name.endswith(_PARTIAL_SUFFIXES) for f in entries)
        if entries and not in_progress:
            # st_ctime is creation time on Windows and last metadata change
            # elsewhere; either way the newest download sorts last.
            return max(entries, key=lambda f: f.stat().st_ctime)
        time.sleep(1)
    raise TimeoutError("文件下载超时")
|