123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- import threading
- import time
- import traceback
- from pathlib import Path
- from faker import Faker
- from selenium.webdriver import FirefoxOptions
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from utils.log import log
# Maximum number of seconds wait_for_download() keeps polling before it
# raises TimeoutError.
DOWNLOAD_TIMEOUT = 60
# Held by wait_for_download() while it scans the download directory.
download_lock = threading.Lock()
def configure_stealth_options(download_dir):
    """Build headless Firefox options for unattended spreadsheet downloads.

    Args:
        download_dir: Directory the browser should save downloads into. It is
            converted with ``str()`` before being stored as a preference.

    Returns:
        FirefoxOptions: Options object carrying the preferences and
        command-line arguments configured below.
    """
    # MIME types that are saved to disk without a prompt. The download helpers
    # in this module match both ``.xls`` and ``.xlsx`` links, so both
    # spreadsheet types are listed; previously only the legacy ``.xls`` type
    # was covered.
    auto_save_types = ",".join((
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    ))

    preferences = {
        # Presumably hides the automation flag from page scripts.
        "dom.webdriver.enabled": False,
        # A fresh random user-agent string is generated on every call.
        "general.useragent.override": Faker().user_agent(),
        "browser.download.dir": str(download_dir),
        # NOTE(review): 2 should select the custom directory set above;
        # confirm against the Firefox preference reference.
        "browser.download.folderList": 2,
        "browser.helperApps.neverAsk.saveToDisk": auto_save_types,
        "pdfjs.disabled": True,
        # NOTE(review): the two names below look like Chrome preference keys;
        # they are kept unchanged for backward compatibility.
        "download.prompt_for_download": False,
        "download.directory_upgrade": True,
    }

    opts = FirefoxOptions()
    for pref_name, pref_value in preferences.items():
        opts.set_preference(pref_name, pref_value)

    # NOTE(review): this is a Chromium switch and is presumably ignored by
    # Firefox; kept so the launched command line stays the same.
    opts.add_argument("--disable-blink-features=AutomationControlled")
    opts.add_argument("--headless")
    return opts
def generate_month_sequence(start_year, start_month, end_year=None, skip_january=False):
    """Generate a newest-first sequence of ``(year, month)`` pairs.

    Args:
        start_year (int): Year of the most recent data that was detected.
        start_month (int): Month (1-12) of the most recent data.
        end_year (int, optional): Oldest year to include. When given, the
            sequence runs from the start month back to January of this year.
            When omitted (or otherwise falsy), only the start month and the
            month before it are returned.
        skip_january (bool, optional): Drop every January entry from the
            result. Defaults to False.

    Returns:
        List[Tuple[int, int]]: Month sequence ``[(year, month), ...]`` in
        descending order. Empty when ``start_year`` is earlier than
        ``end_year``.

    Raises:
        ValueError: If ``start_month`` is outside 1..12.
    """
    if not 1 <= start_month <= 12:
        raise ValueError(f"start_month must be in 1..12, got {start_month!r}")

    # Work on a zero-based month index so that stepping across a year boundary
    # is plain integer arithmetic.
    newest = start_year * 12 + (start_month - 1)
    if end_year:
        oldest = end_year * 12  # January of end_year
    else:
        oldest = newest - 1  # only the start month and its predecessor

    # An empty range (start before end_year) naturally yields an empty list.
    months = [(idx // 12, idx % 12 + 1) for idx in range(newest, oldest - 1, -1)]

    if skip_january:
        months = [(year, month) for year, month in months if month != 1]
    return months
def get_previous_month(year, month):
    """Return the ``(year, month)`` pair one calendar month earlier.

    January rolls back to December of the previous year; every other month
    simply decrements within the same year.
    """
    is_january = month == 1
    prev_year = year - 1 if is_january else year
    prev_month = 12 if is_january else month - 1
    return prev_year, prev_month
def download_excel(driver, url, year, month, title, download_dir):
    """Open ``url`` in a new tab, download its Excel link and file the result.

    The downloaded file is moved to
    ``<download_dir>/<year>/<month:02d>/<title><suffix>``, replacing any file
    already at that path. The tab opened here is always closed and focus is
    returned to the first window, even when the download fails.

    Args:
        driver: Selenium WebDriver used to open the page and click the link.
        url: Address of the page that contains the download link.
        year: Year component of the destination sub-directory.
        month (int): Month component of the destination sub-directory
            (formatted as two digits).
        title: Destination file name without suffix.
        download_dir: Directory the browser downloads into (``str`` or
            ``Path``).

    Raises:
        TimeoutError: Propagated from ``wait_for_download`` when no finished
            file appears in time.
        Exception: Whatever the WebDriver wait/click raises if no clickable
            Excel link is found.
    """
    download_dir = Path(download_dir)
    download_dir.mkdir(parents=True, exist_ok=True)

    # Hand the URL over as a script argument instead of splicing it into the
    # JavaScript source, so a quote inside the URL cannot break the script.
    driver.execute_script("window.open(arguments[0])", url)
    driver.switch_to.window(driver.window_handles[-1])
    try:
        # Match anchors whose href ends in ".xls" or ".xlsx".
        download_btn = WebDriverWait(driver, 20).until(
            EC.element_to_be_clickable(
                (By.XPATH,
                 '//a[substring(@href, string-length(@href)-3) = ".xls" or substring(@href, string-length(@href)-4) = ".xlsx"]')
            )
        )
        download_btn.click()
        downloaded_file = wait_for_download(download_dir)

        target_dir = download_dir / f'{year}' / f'{month:02d}'
        target_dir.mkdir(parents=True, exist_ok=True)
        final_path = target_dir / f'{title}{downloaded_file.suffix}'
        # Overwrite a previous download of the same report.
        if final_path.exists():
            final_path.unlink()
        downloaded_file.rename(final_path)
        log.info(f"√ 文件已保存至:{final_path}")
    finally:
        # Close the tab opened above and go back to the original window.
        driver.close()
        driver.switch_to.window(driver.window_handles[0])
def batch_download_excel(driver, url, year, month, base_title, download_dir):
    """Open ``url`` in a new tab and download every Excel link found on it.

    Each file is moved to
    ``<download_dir>/<year>/<month:02d>/<base_title>_<n><suffix>`` where ``n``
    is the 1-based position of the link on the page; an existing file at that
    path is replaced. The tab opened here is always closed and focus is
    returned to the first window, even when a download fails.

    Args:
        driver: Selenium WebDriver used to open the page and click the links.
        url: Address of the page that contains the download links.
        year: Year component of the destination sub-directory.
        month (int): Month component of the destination sub-directory
            (formatted as two digits).
        base_title: Common file-name prefix; the link index is appended.
        download_dir: Directory the browser downloads into (``str`` or
            ``Path``).

    Raises:
        TimeoutError: Propagated from ``wait_for_download`` when a download
            does not finish in time.
        Exception: Whatever the WebDriver wait/click raises if no Excel link
            is present.
    """
    download_dir = Path(download_dir)
    download_dir.mkdir(parents=True, exist_ok=True)

    # Hand the URL over as a script argument instead of splicing it into the
    # JavaScript source, so a quote inside the URL cannot break the script.
    driver.execute_script("window.open(arguments[0])", url)
    driver.switch_to.window(driver.window_handles[-1])
    try:
        # Collect every anchor whose href ends in ".xls" or ".xlsx".
        download_btns = WebDriverWait(driver, 20).until(
            EC.presence_of_all_elements_located(
                (By.XPATH,
                 '//a[substring(@href, string-length(@href)-3) = ".xls" or substring(@href, string-length(@href)-4) = ".xlsx"]')
            )
        )

        # The destination directory is the same for every file.
        final_dir = download_dir / f'{year}' / f'{month:02d}'

        for position, btn in enumerate(download_btns, start=1):
            # Unique title per link so files do not overwrite each other.
            title = f"{base_title}_{position}"

            btn.click()
            downloaded_file = wait_for_download(download_dir)

            final_dir.mkdir(parents=True, exist_ok=True)
            final_path = final_dir / f'{title}{downloaded_file.suffix}'
            # Overwrite a previous download of the same report.
            if final_path.exists():
                final_path.unlink()
            downloaded_file.rename(final_path)
            log.info(f"√ 文件 {title} 已保存至:{final_path}")
    finally:
        # Close the tab opened above and go back to the original window.
        driver.close()
        driver.switch_to.window(driver.window_handles[0])
def download_excel2(driver, link, year, month, title, download_dir):
    """Click an already-located link and file the resulting download.

    The downloaded file is moved to
    ``<download_dir>/<year>/<month:02d>/<title><suffix>``, replacing any file
    already at that path. When the downloaded file has no suffix, ``.xlsx``
    is used for the destination name.

    Args:
        driver: Unused here; kept so the signature matches existing callers.
        link: Element exposing ``click()`` that triggers the download.
        year: Year component of the destination sub-directory.
        month (int): Month component of the destination sub-directory
            (formatted as two digits).
        title: Destination file name without suffix.
        download_dir: Directory the browser downloads into (``str`` or
            ``Path``).

    Raises:
        TimeoutError: Re-raised after logging when the download times out.
        Exception: Any other failure is logged with its traceback and
            re-raised.
    """
    download_dir = Path(download_dir)
    download_dir.mkdir(parents=True, exist_ok=True)
    try:
        log.info(f"正在点击链接:{title}")
        link.click()
        log.info("等待文件下载完成...")
        downloaded_file = wait_for_download(download_dir)

        # Apply the ".xlsx" fallback to the destination name only. Rebinding
        # the source path with with_suffix() would point at a file that does
        # not exist, and the rename below would fail with FileNotFoundError.
        suffix = downloaded_file.suffix or '.xlsx'

        final_dir = download_dir / f'{year}' / f'{month:02d}'
        final_dir.mkdir(parents=True, exist_ok=True)
        final_path = final_dir / f'{title}{suffix}'
        # Overwrite a previous download of the same report.
        if final_path.exists():
            final_path.unlink()
        downloaded_file.rename(final_path)
        log.info(f"√ 文件已保存至:{final_path}")
    except TimeoutError as te:
        log.info(f"[错误] 文件下载超时:{te}")
        raise
    except Exception as e:
        log.info(f"[错误] 发生异常:{e}")
        log.info(traceback.format_exc())
        raise
- # def wait_for_download(directory):
- # """文件下载监控(只读取文件,忽略文件夹)"""
- # start_time = time.time()
- # while (time.time() - start_time) < DOWNLOAD_TIMEOUT:
- # files = [
- # f for f in directory.glob('*')
- # if f.is_file() and not f.name.endswith(('.part', '.crdownload'))
- # ]
- # if files:
- # # 按照创建时间排序并返回最新文件
- # return max(files, key=lambda x: x.stat().st_ctime)
- # time.sleep(1)
- # raise TimeoutError("文件下载超时")
def wait_for_download(directory, timeout=None):
    """Poll ``directory`` until a finished download appears and return it.

    Only regular files directly inside ``directory`` are considered;
    sub-directories are ignored, as are in-progress files ending in ``.part``
    or ``.crdownload`` and files that are still empty.

    Args:
        directory (Path): Directory the browser downloads into.
        timeout (float, optional): Seconds to wait before giving up. Defaults
            to the module-level ``DOWNLOAD_TIMEOUT``.

    Returns:
        Path: The candidate file with the greatest ``st_ctime``.

    Raises:
        TimeoutError: If no finished file shows up within ``timeout`` seconds.
    """
    if timeout is None:
        timeout = DOWNLOAD_TIMEOUT
    in_progress_suffixes = ('.part', '.crdownload')

    # A monotonic clock keeps the timeout correct even if the wall clock is
    # adjusted while we wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with download_lock:
            candidates = []
            for path in directory.glob('*'):
                if path.name.endswith(in_progress_suffixes) or not path.is_file():
                    continue
                try:
                    info = path.stat()
                except FileNotFoundError:
                    # The file was renamed or removed between listing and stat.
                    continue
                # NOTE(review): browsers may create an empty placeholder with
                # the final name while data is still written to the partial
                # file; skipping empty files avoids returning it too early.
                if info.st_size > 0:
                    candidates.append((info.st_ctime, path))
            if candidates:
                # Most recently changed file wins.
                return max(candidates, key=lambda item: item[0])[1]
        time.sleep(1)
    raise TimeoutError("文件下载超时")
|