# selenium_guangdong_city.py
  1. import argparse
  2. import random
  3. import re
  4. import time
  5. from datetime import datetime, timedelta
  6. from selenium import webdriver
  7. from selenium.common import TimeoutException
  8. from selenium.webdriver.common.by import By
  9. from selenium.webdriver.support import expected_conditions as EC
  10. from selenium.webdriver.support.ui import WebDriverWait
  11. from crossborder.utils.db_helper import DBHelper
  12. from guangdong.guangdong_gongbei_parse_excel import parse_region_table, calculate_monthly_data
  13. from guangdong.guangdong_sub_customs_parse_excel import parse_excel
  14. from crossborder.utils.constants import DOWNLOAD_DIR
  15. from crossborder.utils.constants import GUANGDONG_CUSTOMS_URL
  16. from crossborder.utils.download_utils import configure_stealth_options, generate_month_sequence, download_excel, download_excel2, \
  17. batch_download_excel
  18. from crossborder.utils.log import log
  19. from crossborder.utils.parse_utils import traverse_and_process
  20. download_dir = DOWNLOAD_DIR / "guangdong"
  21. def generate_target_title(check_year, check_month, customs_name):
  22. """生成正则匹配的标题模式"""
  23. global target_title
  24. if customs_name == "广州海关":
  25. return rf'{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月广州关区所辖7地市进出口综合统计资料'
  26. elif customs_name == "深圳海关":
  27. return rf"{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月(深圳海关|深圳关区)综合统计资料"
  28. elif customs_name == "拱北海关":
  29. return rf"\S+市{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月对外贸易进出口统计表"
  30. elif customs_name == "汕头海关":
  31. return rf"5市报表{check_year}年(?:1[--]\s*{check_month}月|{check_month}月)(人民币)"
  32. elif customs_name == "黄埔海关":
  33. return rf"{check_year}年\s*(?:1[--]\s*)?{check_month}月东莞市进出口企业性质总值表"
  34. elif customs_name == "江门海关":
  35. if check_month == 3:
  36. target_title = rf"{check_year}年\s*(?:一季度|前{check_month}个月|\s*{check_month}月)[\u4e00-\u9fa5]+市外贸进出口有关情况统计表(以人民币计价)"
  37. elif check_month == 12:
  38. target_title = rf"{check_year}年(?:\s*{check_month}月)?\s*[\u4e00-\u9fa5]+市外贸进出口有关情况统计表(以人民币计价)"
  39. else:
  40. target_title = rf"{check_year}年\s*前?{check_month}个?月.*外贸进出口有关情况统计表(以人民币计价)"
  41. return target_title
  42. elif customs_name == "湛江海关":
  43. if check_month == 3:
  44. target_title = rf"{check_year}年\s*(?:一季度|前3个月|3月).*外贸进出口数据"
  45. elif check_month == 9:
  46. target_title = rf"{check_year}年\s*(?:前三季度|前9个月|9月).*外贸进出口数据"
  47. elif check_month == 12:
  48. target_title = rf'^{check_year}年(?:及{check_month}月份)?湛江市、茂名市(?:外贸)?进出口数据'
  49. else:
  50. target_title = rf"{check_year}年\s*前?{check_month}个?月.*(外贸)?进出口数据"
  51. return target_title
  52. else:
  53. return rf"{check_year}\s*年\s*(?:1[--]\s*)??{check_month}月{customs_name}进出口综合统计资料"
  54. def detect_latest_month(driver,customs_name):
  55. """三级回溯智能检测最新有效月份"""
  56. current_date = datetime.now()
  57. for offset in range(0, 3):
  58. check_date = current_date - timedelta(days=offset * 30)
  59. check_year = check_date.year
  60. check_month = check_date.month
  61. # 根据海关名称生成对应的标题
  62. target_title = generate_target_title(check_year, check_month, customs_name)
  63. try:
  64. WebDriverWait(driver, 10).until(
  65. EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
  66. )
  67. # 获取所有 <a> 标签
  68. links = driver.find_elements(By.XPATH, '//a[@title]')
  69. # 使用 Python 正则匹配 title
  70. for link in links:
  71. title = link.get_attribute('title')
  72. if re.search(target_title, title, re.IGNORECASE):
  73. log.info(f"【{customs_name}】最新月份数据 {check_year}-{check_month}:{title}")
  74. return check_year, check_month
  75. except Exception as e:
  76. log.info(f"未找到 {target_title}: {e}")
  77. continue
  78. raise Exception("三个月内未找到有效数据")
def process_month_data(driver, year, month, customs_name, found_count, max_retries=3):
    """Scan the current listing page for links matching this month's title
    pattern and download/parse each match, retrying failures.

    :param driver: Selenium WebDriver positioned on a listing page
    :param year: target year of the reports
    :param month: target month of the reports
    :param customs_name: Chinese customs office name; selects the
        download/parse strategy below
    :param found_count: cumulative count passed in by the caller; it is
        incremented per successfully handled table and the cumulative
        value is returned (the caller must assign, not add)
    :param max_retries: attempts per matched link before giving up
    :return: the updated cumulative count
    """
    target_title = generate_target_title(year, month, customs_name)
    links = driver.find_elements(By.XPATH, '//a[@title]')
    for link in links:
        try:
            title = link.get_attribute('title')
            if re.search(target_title, title, re.IGNORECASE):
                url = link.get_attribute("href")
                # Retry loop: break on first success, give up after max_retries.
                for attempt in range(max_retries):
                    try:
                        # Each customs site's page layout needs a different
                        # download/parse helper.
                        if customs_name in ['汕头海关', '江门海关']:
                            download_excel2(driver, link, year, month, title, download_dir)
                        elif customs_name in ['湛江海关', '广州海关']:
                            batch_download_excel(driver, url, year, month, title, download_dir)
                        elif customs_name == "拱北海关":
                            parse_region_table(driver, url, year, month, title)
                        else:
                            download_excel(driver, url, year, month, title, download_dir)
                        found_count += 1
                        time.sleep(random.uniform(0.5, 1.5))  # throttle between downloads
                        break
                    except Exception as e:
                        log.info(f"【{customs_name}】第 {attempt + 1} 次重试失败: {str(e)}")
                        if attempt + 1 == max_retries:
                            log.info(f"【{customs_name}】已达最大重试次数,放弃采集: {title}")
        except Exception as e:
            # Stale/detached elements can make get_attribute fail; skip them.
            log.info(f"无法获取 title 属性: {e}")
    log.info(f"本页找到{found_count}个有效表格")
    return found_count
  110. def reverse_crawler(driver, target_months, customs_name):
  111. """逆向分页抓取核心(优化分页逻辑)"""
  112. processed_months = set()
  113. # target_months = [(2023, 5), (2023, 4)]
  114. page = 1
  115. for year, month in target_months:
  116. log.info(f"开始处理{customs_name} {year}年{month}月数据".center(55, "="))
  117. WebDriverWait(driver, 15).until(
  118. EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
  119. )
  120. found_tables = 0
  121. table_nums = 1
  122. if customs_name == "拱北海关" or customs_name == "江门海关":
  123. table_nums = 2
  124. while True:
  125. # 智能等待页面稳定
  126. random_sleep(base=2, variance=3)
  127. try:
  128. log.info(f"【{customs_name}】当前页面:{driver.current_url}, 第{page}页")
  129. # 处理当前页面的表格数据
  130. found = process_month_data(driver, year, month ,customs_name,found_tables)
  131. found_tables += found
  132. # 完成四个表格采集
  133. if found_tables >= table_nums:
  134. log.info(f"【{customs_name}】已完成{year}年{month}月全部表格采集")
  135. processed_months.add((year, month))
  136. break
  137. log.info(f"【{customs_name}】第{page}页已采集表格数:{found_tables}/{table_nums},前往下一页采集")
  138. # 分页操作(增强定位稳定性)
  139. WebDriverWait(driver, 15).until(
  140. EC.element_to_be_clickable((By.XPATH, '//a[contains(text(),"下一页")]'))
  141. ).click()
  142. page += 1
  143. except TimeoutException:
  144. log.info(f"未找到更多分页,已采集表格数:{found_tables}/{table_nums}")
  145. break
  146. except Exception as e:
  147. log.info(f"分页异常:{str(e)}")
  148. handle_retry(driver) # 异常恢复函数
  149. break
  150. return processed_months
  151. def handle_retry(driver):
  152. """异常恢复处理"""
  153. try:
  154. driver.refresh()
  155. WebDriverWait(driver, 15).until(
  156. EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
  157. )
  158. log.info("浏览器异常已恢复")
  159. except:
  160. log.info("需要人工干预的严重错误")
  161. raise
  162. def random_sleep(base=2, variance=5):
  163. """智能随机等待"""
  164. sleep_time = base + random.random() * variance
  165. time.sleep(sleep_time)
  166. # def process_customs(customs_name, args):
  167. # """处理单个海关的数据抓取任务"""
  168. # options = configure_stealth_options(download_dir)
  169. # driver = webdriver.Firefox(options=options)
  170. #
  171. # try:
  172. # driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
  173. # valid_year, valid_month = detect_latest_month(driver, customs_name)
  174. # log.info(f"检测到{customs_name}最新有效数据:{valid_year}-{valid_month:02d}")
  175. #
  176. # if customs_name in ['汕头海关', '拱北海关', '江门海关']:
  177. # skip_january = False
  178. # else:
  179. # skip_january = True
  180. #
  181. # if args.year:
  182. # target_months = generate_month_sequence(valid_year, valid_month, args.year, skip_january)
  183. # else:
  184. # target_months = generate_month_sequence(valid_year, valid_month)
  185. #
  186. # log.info(f"目标采集月份序列:{target_months}")
  187. # reverse_crawler(driver, target_months, customs_name)
  188. # log.info(f"{customs_name} {len(target_months)}个月份数据已采集完毕")
  189. # return customs_name, True
  190. # except Exception as e:
  191. # log.info(f"[错误] 采集失败:{customs_name} - {str(e)}")
  192. # return customs_name, False
  193. # finally:
  194. # driver.quit()
  195. #
  196. #
  197. # def main():
  198. # parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
  199. # parser.add_argument('--year', type=int, default=None,
  200. # help='终止年份(如2023),未指定时抓取最新两个月')
  201. # args = parser.parse_args()
  202. #
  203. # customs_list = GUANGDONG_CUSTOMS_URL.keys()
  204. #
  205. # # 使用线程池并发采集
  206. # with ThreadPoolExecutor(max_workers=3) as executor:
  207. # futures = []
  208. # for customs_name in customs_list:
  209. # future = executor.submit(process_customs, customs_name, args)
  210. # futures.append(future)
  211. #
  212. # for future in as_completed(futures):
  213. # customs_name, success = future.result()
  214. # if success:
  215. # log.info(f"[完成] {customs_name} 数据采集成功")
  216. # else:
  217. # log.info(f"[失败] {customs_name} 数据采集失败")
  218. #
  219. # log.info("\n广东省所有海关数据采集完成。")
  220. def main():
  221. """主入口(优化参数处理逻辑)"""
  222. parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
  223. parser.add_argument('--year', type=int, default=None,
  224. help='终止年份(如2023),未指定时抓取最新两个月')
  225. args = parser.parse_args()
  226. driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
  227. for customs_name in GUANGDONG_CUSTOMS_URL.keys():
  228. try:
  229. driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
  230. log.info(f"【{customs_name}】数据采集开始……")
  231. valid_year, valid_month = detect_latest_month(driver, customs_name)
  232. log.info(f"【{customs_name}】检测到最新有效数据:{valid_year}-{valid_month:02d}")
  233. if customs_name in ['汕头海关', '拱北海关']:
  234. skip_january = False
  235. else:
  236. skip_january = True
  237. if args.year:
  238. target_months = generate_month_sequence(valid_year, valid_month, args.year, skip_january)
  239. else:
  240. target_months = generate_month_sequence(valid_year, valid_month)
  241. log.info(f"【{customs_name}】目标采集月份序列:{target_months}")
  242. reverse_crawler(driver, target_months, customs_name)
  243. if customs_name == '拱北海关':
  244. for year, month in target_months:
  245. log.info(f"【{customs_name}】{year}-{month:02d}单月数据计算中...")
  246. calculate_monthly_data(year, month)
  247. log.info(f"【{customs_name}】{len(target_months)}个月份数据已采集完毕".center(66, "="))
  248. finally:
  249. pass
  250. driver.quit()
  251. log.info("【广东省】数据抓取结束".center(66, "*"))
  252. log.info("\n广东省数据清洗入库中...")
  253. traverse_and_process(download_dir, parse_excel, province_name="guangdong")
  254. log.info("\n广东省地级市数据同比更新中...")
  255. db_helper = DBHelper()
  256. db_helper.update_prov_yoy("广东省")
  257. log.info("\n广东省地级市数据同比更新结束")
# Run the crawler only when executed as a script, not on import.
if __name__ == "__main__":
    main()