# selenium_guangdong_city.py - Selenium crawler for Guangdong sub-customs statistics pages.
  1. import argparse
  2. import random
  3. import re
  4. import time
  5. from datetime import datetime, timedelta
  6. from selenium import webdriver
  7. from selenium.common import TimeoutException
  8. from selenium.webdriver.common.by import By
  9. from selenium.webdriver.support import expected_conditions as EC
  10. from selenium.webdriver.support.ui import WebDriverWait
  11. from crossborder.guangdong.guangdong_gongbei_parse_excel import parse_page_region_data, calculate_monthly_data
  12. from crossborder.utils.db_helper import DBHelper
  13. from crossborder.guangdong.guangdong_sub_customs_parse_excel import parse_excel
  14. from crossborder.utils.constants import DOWNLOAD_DIR, CUSTOMS_CITY_MAPPING
  15. from crossborder.utils.constants import GUANGDONG_CUSTOMS_URL
  16. from crossborder.utils.dingtalk import send_dingtalk_message
  17. from crossborder.utils.download_utils import configure_stealth_options, generate_month_sequence, download_excel, download_excel2, \
  18. batch_download_excel
  19. from crossborder.utils.log import get_logger
  20. log = get_logger(__name__)
  21. from crossborder.utils.parse_utils import traverse_and_process
  22. download_dir = DOWNLOAD_DIR / "guangdong"
  23. def generate_target_title(check_year, check_month, customs_name):
  24. """生成正则匹配的标题模式"""
  25. global target_title
  26. if customs_name == "广州海关":
  27. return rf'{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月广州关区所辖7地市进出口综合统计资料'
  28. elif customs_name == "深圳海关":
  29. return rf"{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月(深圳海关|深圳关区)综合统计资料"
  30. elif customs_name == "拱北海关":
  31. return rf"\S+市{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月对外贸易进出口统计表"
  32. elif customs_name == "汕头海关":
  33. return rf"5市报表{check_year}年(?:1[--]\s*{check_month}月|{check_month}月)(人民币)"
  34. elif customs_name == "黄埔海关":
  35. return rf"{check_year}年\s*(?:1[--]\s*)?{check_month}月东莞市进出口企业性质总值表"
  36. elif customs_name == "江门海关":
  37. if check_month == 3:
  38. target_title = rf"{check_year}年\s*(?:一季度|前{check_month}个月|\s*{check_month}月)[\u4e00-\u9fa5]+市外贸进出口有关情况统计表(以人民币计价)"
  39. elif check_month == 12:
  40. target_title = rf"{check_year}年(?:\s*{check_month}月)?\s*[\u4e00-\u9fa5]+市外贸进出口有关情况统计表(以人民币计价)"
  41. else:
  42. target_title = rf"{check_year}年\s*前?{check_month}个?月.*外贸进出口有关情况统计表(以人民币计价)"
  43. return target_title
  44. elif customs_name == "湛江海关":
  45. if check_month == 3:
  46. target_title = rf"{check_year}年\s*(?:一季度|前3个月|3月).*外贸进出口数据"
  47. elif check_month == 9:
  48. target_title = rf"{check_year}年\s*(?:前三季度|前9个月|9月).*外贸进出口数据"
  49. elif check_month == 12:
  50. target_title = rf'^{check_year}年(?:及{check_month}月份)?湛江市、茂名市(?:外贸)?进出口数据'
  51. else:
  52. target_title = rf"{check_year}年\s*前?{check_month}个?月.*(外贸)?进出口数据"
  53. return target_title
  54. else:
  55. return rf"{check_year}\s*年\s*(?:1[--]\s*)??{check_month}月{customs_name}进出口综合统计资料"
  56. def detect_latest_month(driver,customs_name):
  57. """三级回溯智能检测最新有效月份"""
  58. current_date = datetime.now()
  59. for offset in range(0, 3):
  60. check_date = current_date - timedelta(days=offset * 30)
  61. check_year = check_date.year
  62. check_month = check_date.month
  63. # 根据海关名称生成对应的标题
  64. target_title = generate_target_title(check_year, check_month, customs_name)
  65. try:
  66. WebDriverWait(driver, 10).until(
  67. EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
  68. )
  69. # 获取所有 <a> 标签
  70. links = driver.find_elements(By.XPATH, '//a[@title]')
  71. # 使用 Python 正则匹配 title
  72. for link in links:
  73. title = link.get_attribute('title')
  74. if re.search(target_title, title, re.IGNORECASE):
  75. log.info(f"【{customs_name}】最新月份数据 {check_year}-{check_month}:{title}")
  76. return check_year, check_month
  77. except Exception as e:
  78. log.info(f"未找到 {target_title}: {e}")
  79. continue
  80. raise Exception("三个月内未找到有效数据")
  81. def process_month_data(driver, year, month, customs_name,found_count, max_retries=3):
  82. """带重试机制的表格数据处理"""
  83. target_title = generate_target_title(year, month, customs_name)
  84. links = driver.find_elements(By.XPATH, '//a[@title]')
  85. for link in links:
  86. try:
  87. title = link.get_attribute('title')
  88. if re.search(target_title, title, re.IGNORECASE):
  89. # log.info(f"【{customs_name}】匹配到目标: {title}")
  90. url = link.get_attribute("href")
  91. for attempt in range(max_retries):
  92. try:
  93. if customs_name in ['汕头海关', '江门海关']:
  94. download_excel2(driver, link, year, month, title, download_dir)
  95. elif customs_name in ['湛江海关', '广州海关']:
  96. batch_download_excel(driver, url, year, month, title, download_dir)
  97. elif customs_name == "拱北海关":
  98. parse_page_region_data(driver, url, year, month, title)
  99. else:
  100. download_excel(driver, url, year, month, title, download_dir)
  101. found_count += 1
  102. time.sleep(random.uniform(0.5, 1.5)) # 下载间隔
  103. break
  104. except Exception as e:
  105. log.info(f"【{customs_name}】第 {attempt + 1} 次重试失败: {str(e)}")
  106. if attempt + 1 == max_retries:
  107. log.info(f"【{customs_name}】已达最大重试次数,放弃采集: {title}")
  108. except Exception as e:
  109. log.info(f"无法获取 title 属性: {e}")
  110. log.info(f"本页找到{found_count}个有效表格")
  111. return found_count
  112. def reverse_crawler(driver, target_months, customs_name):
  113. """逆向分页抓取核心(优化分页逻辑)"""
  114. processed_months = set()
  115. # target_months = [(2023, 5), (2023, 4)]
  116. page = 1
  117. for year, month in target_months:
  118. log.info(f"开始处理{customs_name} {year}年{month}月数据".center(55, "="))
  119. WebDriverWait(driver, 15).until(
  120. EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
  121. )
  122. found_tables = 0
  123. table_nums = 1
  124. if customs_name == "拱北海关" or customs_name == "江门海关":
  125. table_nums = 2
  126. while True:
  127. # 智能等待页面稳定
  128. random_sleep(base=2, variance=3)
  129. try:
  130. log.info(f"【{customs_name}】当前页面:{driver.current_url}, 第{page}页")
  131. # 处理当前页面的表格数据
  132. found = process_month_data(driver, year, month ,customs_name,found_tables)
  133. found_tables += found
  134. # 完成四个表格采集
  135. if found_tables >= table_nums:
  136. log.info(f"【{customs_name}】已完成{year}年{month}月全部表格采集")
  137. processed_months.add((year, month))
  138. break
  139. log.info(f"【{customs_name}】第{page}页已采集表格数:{found_tables}/{table_nums},前往下一页采集")
  140. # 分页操作(增强定位稳定性)
  141. WebDriverWait(driver, 15).until(
  142. EC.element_to_be_clickable((By.XPATH, '//a[contains(text(),"下一页")]'))
  143. ).click()
  144. page += 1
  145. except TimeoutException:
  146. log.info(f"未找到更多分页,已采集表格数:{found_tables}/{table_nums}")
  147. break
  148. except Exception as e:
  149. log.info(f"分页异常:{str(e)}")
  150. handle_retry(driver) # 异常恢复函数
  151. break
  152. return processed_months
  153. def handle_retry(driver):
  154. """异常恢复处理"""
  155. try:
  156. driver.refresh()
  157. WebDriverWait(driver, 15).until(
  158. EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
  159. )
  160. log.info("浏览器异常已恢复")
  161. except:
  162. log.info("需要人工干预的严重错误")
  163. raise
  164. def random_sleep(base=2, variance=5):
  165. """智能随机等待"""
  166. sleep_time = base + random.random() * variance
  167. time.sleep(sleep_time)
  168. # def process_customs(customs_name, args):
  169. # """处理单个海关的数据抓取任务"""
  170. # options = configure_stealth_options(download_dir)
  171. # driver = webdriver.Firefox(options=options)
  172. #
  173. # try:
  174. # driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
  175. # valid_year, valid_month = detect_latest_month(driver, customs_name)
  176. # log.info(f"检测到{customs_name}最新有效数据:{valid_year}-{valid_month:02d}")
  177. #
  178. # if customs_name in ['汕头海关', '拱北海关', '江门海关']:
  179. # skip_january = False
  180. # else:
  181. # skip_january = True
  182. #
  183. # if args.year:
  184. # target_months = generate_month_sequence(valid_year, valid_month, args.year, skip_january)
  185. # else:
  186. # target_months = generate_month_sequence(valid_year, valid_month)
  187. #
  188. # log.info(f"目标采集月份序列:{target_months}")
  189. # reverse_crawler(driver, target_months, customs_name)
  190. # log.info(f"{customs_name} {len(target_months)}个月份数据已采集完毕")
  191. # return customs_name, True
  192. # except Exception as e:
  193. # log.info(f"[错误] 采集失败:{customs_name} - {str(e)}")
  194. # return customs_name, False
  195. # finally:
  196. # driver.quit()
  197. #
  198. #
  199. # def main():
  200. # parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
  201. # parser.add_argument('--year', type=int, default=None,
  202. # help='终止年份(如2023),未指定时抓取最新两个月')
  203. # args = parser.parse_args()
  204. #
  205. # customs_list = GUANGDONG_CUSTOMS_URL.keys()
  206. #
  207. # # 使用线程池并发采集
  208. # with ThreadPoolExecutor(max_workers=3) as executor:
  209. # futures = []
  210. # for customs_name in customs_list:
  211. # future = executor.submit(process_customs, customs_name, args)
  212. # futures.append(future)
  213. #
  214. # for future in as_completed(futures):
  215. # customs_name, success = future.result()
  216. # if success:
  217. # log.info(f"[完成] {customs_name} 数据采集成功")
  218. # else:
  219. # log.info(f"[失败] {customs_name} 数据采集失败")
  220. #
  221. # log.info("\n广东省所有海关数据采集完成。")
  222. def main():
  223. """主入口(广东分海关优化版)"""
  224. parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
  225. parser.add_argument('--year', type=int, default=None,
  226. help='终止年份(如2023),未指定时抓取最新两个月')
  227. args = parser.parse_args()
  228. start_time = time.time()
  229. # 状态跟踪变量
  230. total_months_count = 0 # 总采集月份数
  231. customs_collected = [] # 成功采集的海关名单
  232. data_collected = False # 是否有数据采集
  233. all_customs_processed = [] # 已处理海关列表
  234. log.info("【广东省】分海关数据采集开始".center(66, "*"))
  235. driver = None
  236. try:
  237. # 1. 初始化浏览器
  238. driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
  239. log.info("浏览器初始化完成")
  240. # 2. 遍历各海关
  241. for customs_name in GUANGDONG_CUSTOMS_URL.keys():
  242. try:
  243. log.info(f"\n{'=' * 66}\n【{customs_name}】数据采集开始".center(66, "="))
  244. # 进入海关页面
  245. driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
  246. # 检测最新有效月份
  247. valid_year, valid_month = detect_latest_month(driver, customs_name)
  248. log.info(f"【{customs_name}】检测到最新有效数据:{valid_year}-{valid_month:02d}")
  249. # 设置是否跳过1月数据的标志
  250. skip_january = customs_name not in ['汕头海关', '拱北海关']
  251. # 3. 生成目标月份序列
  252. if args.year:
  253. # 指定年份时:从最新月到目标年1月
  254. target_months = generate_month_sequence(
  255. start_year=valid_year,
  256. start_month=valid_month,
  257. end_year=args.year,
  258. skip_january=skip_january
  259. )
  260. else:
  261. # 未指定年份时:检查最新月份是否已存在
  262. db = DBHelper()
  263. count = db.get_code_exist(
  264. f'{valid_year}-{valid_month:02d}',
  265. "440000",
  266. is_city=True,
  267. customs_name=customs_name
  268. )
  269. if count > 0:
  270. log.warning(f"⏩ 跳过【{customs_name}】- 数据库已存在{CUSTOMS_CITY_MAPPING[customs_name]} {valid_year}-{valid_month:02d} 数据")
  271. continue
  272. # 未指定年份时:取最近两个月
  273. target_months = generate_month_sequence(
  274. start_year=valid_year,
  275. start_month=valid_month
  276. )
  277. # 记录目标月份
  278. total_months_count += len(target_months)
  279. data_collected = True
  280. log.info(f"【{customs_name}】目标采集月份:{len(target_months)}个月份")
  281. # 4. 执行数据采集
  282. if target_months: # 确保有月份需要采集
  283. reverse_crawler(driver, target_months, customs_name)
  284. customs_collected.append(customs_name)
  285. log.info(f"【{customs_name}】{len(target_months)}个月份采集完成")
  286. # 拱北海关特殊处理
  287. if customs_name == '拱北海关':
  288. for year, month in target_months:
  289. log.info(f"🔢 【拱北海关】计算 {year}-{month:02d} 单月数据...")
  290. calculate_monthly_data(year, month)
  291. # 添加分隔线
  292. log.info(f"【{customs_name}】处理完成".center(66, "="))
  293. except Exception as e:
  294. # 捕获单个海关采集异常
  295. log.exception(f"⚠️ 【{customs_name}】采集过程中发生错误: {str(e)}")
  296. send_dingtalk_message(f"【{customs_name}】海关采集异常: {str(e)}")
  297. finally:
  298. # 记录已处理海关
  299. all_customs_processed.append(customs_name)
  300. # 5. 所有海关处理完成后
  301. if data_collected:
  302. log.info(f"\n{'=' * 66}\n【广东省】所有海关处理完成,开始数据清洗入库")
  303. log.info("数据清洗入库中...")
  304. traverse_and_process(download_dir, parse_excel, province_name="guangdong", year=args.year)
  305. log.info("广东省地级市数据同比更新中...")
  306. db_helper = DBHelper()
  307. db_helper.update_prov_yoy("广东省")
  308. log.info("地级市数据同比更新完成")
  309. # 计算总耗时
  310. duration = time.time() - start_time
  311. minutes, seconds = divmod(duration, 60)
  312. # 准备通知信息
  313. if customs_collected:
  314. customs_str = "、".join(customs_collected)
  315. month_info = f"{total_months_count}个月份"
  316. else:
  317. customs_str = "无海关数据被采集"
  318. month_info = "0个月份"
  319. message = (
  320. f"【广东省海关数据采集完成】\n"
  321. f"• 已处理海关: {len(all_customs_processed)}个\n"
  322. f"• 成功采集海关: {len(customs_collected)}个\n"
  323. f"• 采集海关: {customs_str}\n"
  324. f"• 总采集月份: {month_info}\n"
  325. f"• 总耗时: {int(minutes)}分{seconds:.1f}秒"
  326. )
  327. send_dingtalk_message(message)
  328. else:
  329. log.warning("本次未采集到任何新数据")
  330. # send_dingtalk_message("【广东省海关采集】所有海关最新月份数据已存在,未执行采集")
  331. except Exception as e:
  332. # 全局异常捕获
  333. log.exception(f"‼️ 广东省海关采集全局错误: {str(e)}")
  334. send_dingtalk_message(f"【广东海关采集异常】全局错误: {str(e)}")
  335. finally:
  336. # 确保浏览器安全退出
  337. if driver:
  338. driver.quit()
  339. log.info("浏览器已安全退出")
  340. log.info("【广东省】分海关数据采集结束".center(66, "*"))
# Run the crawler only when this file is executed as a script.
if __name__ == "__main__":
    main()