# selenium_fujian_download.py
  1. import argparse
  2. import random
  3. import time
  4. from datetime import datetime, timedelta
  5. from selenium import webdriver
  6. from selenium.common import TimeoutException
  7. from selenium.webdriver.common.by import By
  8. from selenium.webdriver.support import expected_conditions as EC
  9. from selenium.webdriver.support.ui import WebDriverWait
  10. from fujian.fujian_parse_excel import parse_excel
  11. from utils.constants import DOWNLOAD_DIR
  12. from utils.download_utils import configure_stealth_options, generate_month_sequence, download_excel
  13. from utils.parse_utils import traverse_and_process
  14. # 基础配置
  15. MAX_RETRY = 3
  16. BASE_URL = "http://fuzhou.customs.gov.cn/fuzhou_customs/zfxxgk19/2963574/2963954/484131/index.html"
  17. download_dir = DOWNLOAD_DIR / "fujian"
  18. def detect_latest_month(driver):
  19. """三级回溯智能检测最新有效月份"""
  20. driver.get(BASE_URL)
  21. current_date = datetime.now()
  22. for offset in range(0, 3):
  23. check_date = current_date - timedelta(days=offset * 30)
  24. check_year = check_date.year
  25. check_month = check_date.month
  26. target_title = f"{check_year}年{check_month}月和1-{check_month}月福建省外贸进出口情况表(分地市)"
  27. try:
  28. WebDriverWait(driver, 10).until(
  29. EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
  30. )
  31. print(f"已找到最新月份数据 {check_year}-{check_month}")
  32. return check_year, check_month
  33. except:
  34. print(f"未找到 {target_title}")
  35. continue
  36. raise Exception("三个月内未找到有效数据")
  37. def process_month_data(driver, year, month):
  38. """
  39. 处理地市贸易数据(增强1月逻辑 + 下载失败重试机制)
  40. """
  41. required_title = f"{year}年{month}月和1-{month}月福建省外贸进出口情况表(分地市)"
  42. found_count = 0
  43. # 获取所有匹配的链接
  44. links = driver.find_elements(By.XPATH, '//a[contains(@title,"福建省")]')
  45. for link in links:
  46. title = link.get_attribute("title")
  47. if title == required_title:
  48. url = link.get_attribute("href")
  49. retry = 0
  50. success = False
  51. while retry < MAX_RETRY and not success:
  52. try:
  53. download_excel(driver, url, year, month, title, download_dir)
  54. found_count += 1
  55. success = True
  56. time.sleep(random.uniform(0.5, 1.5)) # 成功后等待
  57. except Exception as e:
  58. retry += 1
  59. print(f"下载 {title} 失败(第{retry}次重试): {str(e)}")
  60. if retry >= MAX_RETRY:
  61. print(f"❌ 超出最大重试次数,跳过该文件:{title}")
  62. return 1000
  63. else:
  64. print(f"🔄 第{retry}次重试:{title}")
  65. time.sleep(random.uniform(2, 4)) # 重试前随机等待
  66. print(f"本页找到{found_count}个有效表格")
  67. return found_count
  68. def reverse_crawler(driver, target_months):
  69. """逆向分页抓取核心(优化分页逻辑)"""
  70. processed_months = set()
  71. # target_months = [(2023, 5), (2023, 4)]
  72. page = 1
  73. for year, month in target_months:
  74. print(f"\n开始处理 {year}年{month}月数据".center(50, "="))
  75. WebDriverWait(driver, 15).until(
  76. EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
  77. )
  78. current_page = 1
  79. found_tables = 0
  80. while True:
  81. # 智能等待页面稳定
  82. random_sleep(base=2, variance=3)
  83. try:
  84. # 动态检测当前页面月份
  85. print(f"当前页面:{driver.current_url}, 第{page}页")
  86. # 处理当前页面的表格数据
  87. found = process_month_data(driver, year, month)
  88. found_tables += found
  89. if found_tables == 1000:
  90. print(f"❌{year}年{month}月数据采集失败,跳过当前月")
  91. break
  92. # 完成四个表格采集
  93. if found_tables >= 1:
  94. print(f"已完成{year}年{month}月全部表格采集")
  95. processed_months.add((year, month))
  96. break
  97. print(f"第{page}页已采集表格数:{found_tables}/1,前往下一页采集")
  98. # 分页操作(增强定位稳定性)
  99. WebDriverWait(driver, 15).until(
  100. EC.element_to_be_clickable((By.XPATH, '//a[contains(text(),"下一页")]'))
  101. ).click()
  102. current_page += 1
  103. page += 1
  104. except TimeoutException:
  105. print(f"未找到更多分页,已采集表格数:{found_tables}/1")
  106. break
  107. except Exception as e:
  108. print(f"分页异常:{str(e)}")
  109. handle_retry(driver) # 异常恢复函数
  110. break
  111. return processed_months
  112. def random_sleep(base=2, variance=5):
  113. """智能随机等待"""
  114. sleep_time = base + random.random() * variance
  115. time.sleep(sleep_time)
  116. def handle_retry(driver):
  117. """异常恢复处理"""
  118. try:
  119. driver.refresh()
  120. WebDriverWait(driver, 15).until(
  121. EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
  122. )
  123. print("浏览器异常已恢复")
  124. except:
  125. print("需要人工干预的严重错误")
  126. raise
  127. def main():
  128. """主入口(优化参数处理逻辑)"""
  129. parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
  130. parser.add_argument('--year', type=int, default=None,
  131. help='终止年份(如2023),未指定时抓取最新两个月')
  132. args = parser.parse_args()
  133. driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
  134. try:
  135. # 智能检测最新有效月份
  136. valid_year, valid_month = detect_latest_month(driver)
  137. print(f"检测到最新有效数据:{valid_year}年{valid_month:02d}月")
  138. # 生成目标序列
  139. if args.year:
  140. # 指定年份时:从最新月到目标年1月
  141. target_months = generate_month_sequence(
  142. start_year=valid_year,
  143. start_month=valid_month,
  144. end_year=args.year,
  145. skip_january=True
  146. )
  147. else:
  148. # 未指定年份时:取最近两个月
  149. target_months = generate_month_sequence(valid_year, valid_month)
  150. print(f"目标采集月份序列:{target_months}")
  151. reverse_crawler(driver, target_months)
  152. print(f"{len(target_months)}个月份数据已采集完毕")
  153. finally:
  154. if 'driver' in locals():
  155. driver.quit()
  156. print("\n数据清洗入库中...")
  157. traverse_and_process(download_dir, parse_excel, province_name="fujian")
  158. if __name__ == "__main__":
  159. main()