# selenium_henan_download.py

import argparse
import random
import re
import time
import traceback
from datetime import datetime, timedelta

from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from henan.henan_parse_excel import parse_excel
from utils.constants import DOWNLOAD_DIR
from utils.download_utils import configure_stealth_options, get_previous_month, download_excel, generate_month_sequence
from utils.parse_utils import traverse_and_process

# Basic configuration
MAX_RETRY = 3
DOWNLOAD_TIMEOUT = 60
BASE_URL = "http://zhengzhou.customs.gov.cn/zhengzhou_customs/zfxxgk97/2967383/2967458/501407/0e9d768a-1.html"
download_dir = DOWNLOAD_DIR / "henan"
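
# Assumption (not part of the original script): DOWNLOAD_DIR is a pathlib.Path, since it
# is combined with "/" above. Creating the target folder up front is a small safeguard so
# downloads do not fail on a fresh environment; remove if the utils already handle this.
download_dir.mkdir(parents=True, exist_ok=True)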


def detect_latest_month(driver):
    """Detect the latest valid month, falling back up to three months (simplified regex match)."""
    driver.get(BASE_URL)
    current_date = datetime.now()
    for offset in range(0, 3):
        check_date = current_date - timedelta(days=offset * 30)
        check_year = check_date.year
        check_month = check_date.month
        # Build the regex: accept both the "1至X月" and "X月" title forms and allow spaces
        # around the year/month (the Chinese text must match the page titles verbatim).
        pattern = re.compile(
            rf'{check_year}\s*年\s*(1至)?{check_month}\s*月\s*河南省进出口商品国别\(地区\)总值表',
            re.IGNORECASE
        )
        try:
            # Match every link title on the Python side
            elements = WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.XPATH, '//a'))
            )
            for element in elements:
                title = element.get_attribute("title")
                if title and pattern.search(title):
                    print(f"Found latest month data: {check_year}-{check_month}")
                    return check_year, check_month
            print(f"No match found (regex: {pattern.pattern})")
        except TimeoutException:
            print(f"Page load timed out or no match ({check_year}-{check_month})")
            continue
    raise Exception("No valid data found within the last three months")
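

# Sketch (not used above; the helper name is illustrative): a calendar-accurate way to
# step back whole months. detect_latest_month() approximates "one month back" with
# timedelta(days=offset * 30), which can drift near month boundaries; an index-based
# calculation avoids that drift.
def months_back(date, offset):
    """Return (year, month) for `offset` calendar months before `date`."""
    index = date.year * 12 + (date.month - 1) - offset
    return index // 12, index % 12 + 1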


def process_month_data(driver, year, month):
    """Recognise and download all three table types, tolerating several title formats."""
    # The three target title templates (kept for reference)
    title_templates = [
        f"{year}年1至{month}月河南省出口主要商品量值表",
        f"{year}年1至{month}月河南省进口主要商品量值表",
        f"{year}年1至{month}月河南省进出口商品国别(地区)总值表"
    ]
    # Build the regex (accepts "年X月" and "年1至X月", allows surrounding spaces,
    # and matches both full-width and half-width parentheses around 地区)
    patterns = [
        re.compile(
            rf'{year}\s*年\s*(1至)?{month}\s*月\s*河南省(?:出口主要商品|进口主要商品|进出口商品国别[((]地区[))])(量值表|总值表)',
            re.IGNORECASE
        )
    ]
    found_count = 0
    links = driver.find_elements(By.XPATH, '//a[contains(@title,"河南省")]')
    for link in links:
        title = link.get_attribute("title")
        if any(pattern.search(title) for pattern in patterns):
            retry = 0
            max_retries = 3  # maximum number of retries
            success = False
            while retry < max_retries and not success:
                try:
                    url = link.get_attribute("href")
                    download_excel(driver, url, year, month, title, download_dir)
                    found_count += 1
                    time.sleep(random.uniform(0.5, 1.5))  # pause between downloads
                    success = True  # leave the retry loop on success
                except Exception as e:
                    retry += 1
                    print(f"Download of {title} failed (retry {retry}): {e}")
                    traceback.print_exc()
                    if retry < max_retries:
                        time.sleep(random.uniform(2, 5))  # random back-off before retrying
                    else:
                        print(f"{title}: maximum retries reached, skipping this file.")
    print(f"Found {found_count} valid tables on this page")
    return found_count
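
# Illustrative examples of titles the pattern above is intended to accept or reject
# (for year=2023, month=5):
#   "2023年1至5月河南省出口主要商品量值表"        -> match
#   "2023年5月河南省进出口商品国别(地区)总值表"   -> match (short form, half-width parentheses)
#   "2022年5月河南省出口主要商品量值表"           -> no match (wrong year)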


def reverse_crawler(driver, target_months):
    """Core logic: crawl the listing pages in reverse, month by month."""
    processed_months = set()
    # target_months = [(2023, 5), (2023, 4)]
    page = 1
    for year, month in target_months:
        print(f"\nProcessing data for {year}-{month}".center(50, "="))
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        current_page = 1
        found_tables = 0
        while True:
            # Wait for the page to settle
            random_sleep(base=2, variance=3)
            try:
                print(f"Current page: {driver.current_url}, page {page}")
                # Process the tables on the current page
                found = process_month_data(driver, year, month)
                found_tables += found
                # All three tables for this month have been collected
                if found_tables >= 3:
                    print(f"Finished collecting all tables for {year}-{month}")
                    processed_months.add((year, month))
                    break
                print(f"Page {page}: {found_tables}/3 tables collected, moving to the next page")
                # Pagination (with a more robust locator)
                WebDriverWait(driver, 15).until(
                    EC.element_to_be_clickable((By.XPATH, '//a[contains(text(),"下一页")]'))
                ).click()
                current_page += 1
                page += 1
            except TimeoutException:
                print(f"No more pages found; tables collected: {found_tables}/3")
                break
            except Exception as e:
                print(f"Pagination error: {str(e)}")
                handle_retry(driver)  # error-recovery helper
                break
    return processed_months
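
# Note on reverse_crawler: the inner loop exits when all three tables for the month have
# been found, when no "下一页" (next page) link can be clicked (TimeoutException), or when
# an unexpected error is handled by handle_retry(). Months that never reach 3/3 are simply
# left out of processed_months; the caller does not currently retry them.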


def extract_page_date(driver):
    """Robust extraction of the year/month shown on the listing page."""
    try:
        date_str = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        ).get_attribute("innerHTML")
        match = re.search(r"(\d{4})年(\d{1,2})月", date_str)
        return int(match.group(1)), int(match.group(2))
    except Exception:
        return datetime.now().year, datetime.now().month


def random_sleep(base=2, variance=5):
    """Randomised wait (base seconds plus up to `variance` extra seconds)."""
    sleep_time = base + random.random() * variance
    time.sleep(sleep_time)
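
# Usage: random_sleep(base=2, variance=3) sleeps for a value drawn uniformly from [2, 5) seconds.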


def handle_retry(driver):
    """Recover from an unexpected error by refreshing the page."""
    try:
        driver.refresh()
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        print("Browser recovered from the error")
    except Exception:
        print("Serious error, manual intervention required")
        raise


def main():
    """Entry point (streamlined argument handling)."""
    parser = argparse.ArgumentParser(description='Customs data crawler')
    parser.add_argument('--year', type=int, default=None,
                        help='End year (e.g. 2023); if omitted, only the latest two months are fetched')
    args = parser.parse_args()
    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
    try:
        # Detect the latest valid month
        valid_year, valid_month = detect_latest_month(driver)
        print(f"Latest valid data detected: {valid_year}-{valid_month:02d}")
        # Build the target month sequence
        if args.year:
            # A year was given: walk from the latest month back to January of that year
            target_months = generate_month_sequence(
                start_year=valid_year,
                start_month=valid_month,
                end_year=args.year,
                skip_january=True
            )
        else:
            # No year given: take the latest two months
            target_months = generate_month_sequence(valid_year, valid_month)
        print(f"Target month sequence: {target_months}")
        reverse_crawler(driver, target_months)
        print(f"Finished collecting data for {len(target_months)} month(s)")
    finally:
        driver.quit()
    print("\nCleaning and importing the downloaded data...")
    traverse_and_process(download_dir, parse_excel, province_name="henan")


if __name__ == "__main__":
    main()
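
# Example invocations (assuming the project root is on PYTHONPATH so the henan/ and utils/
# packages resolve):
#   python selenium_henan_download.py              # fetch the latest two months
#   python selenium_henan_download.py --year 2023  # walk back from the latest month to January 2023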