crawl_gov_zhejiang_full.py 19 KB


  1. import argparse
  2. import os
  3. import random
  4. import re
  5. import time
  6. import sys
  7. from datetime import datetime, timedelta
  8. from pathlib import Path
  9. from urllib.parse import urljoin
  10. from faker import Faker
  11. from selenium import webdriver
  12. from selenium.common.exceptions import StaleElementReferenceException
  13. from selenium.webdriver import FirefoxOptions
  14. from selenium.webdriver.common.by import By
  15. from selenium.webdriver.support import expected_conditions as EC
  16. from selenium.webdriver.support.ui import WebDriverWait
  17. from crossborder.utils.base_country_code import extract_year_month
  18. from crossborder.utils.dingtalk import send_dingtalk_message
  19. from crossborder.zhejiang import download_dir
  20. from crossborder.zhejiang import gov_commodity_zhejiang_city
  21. from crossborder.zhejiang import gov_commodity_zhejiang_country
  22. from crossborder.zhejiang import gov_commodity_zhejiang_import_export
  23. from crossborder.utils import base_country_code, base_mysql
  24. from crossborder.utils.log import get_logger
  25. log = get_logger(__name__)
  26. def configure_stealth_options():
  27. """增强型反检测配置[1,4](@ref)"""
  28. opts = FirefoxOptions()
  29. print("当前下载路径:", Path(download_dir).resolve())
  30. # 文件下载配置
  31. opts.set_preference("browser.download.dir", download_dir)
  32. opts.set_preference("browser.download.folderList", 2)
  33. opts.set_preference("browser.download.manager.showWhenStarting", False)
  34. opts.set_preference("browser.helperApps.neverAsk.saveToDisk",
  35. "application/octet-stream, application/vnd.ms-excel") # 覆盖常见文件类型
  36. opts.set_preference("browser.download.manager.useWindow", False) # 禁用下载管理器窗口
  37. opts.set_preference("browser.download.manager.showAlertOnComplete", False) # 关闭完成提示
  38. # 反检测参数
  39. opts.set_preference("dom.webdriver.enabled", False)
  40. opts.set_preference("useAutomationExtension", False)
  41. opts.add_argument("--disable-blink-features=AutomationControlled")
  42. # 动态指纹
  43. fake = Faker()
  44. opts.set_preference("general.useragent.override", fake.firefox())
  45. opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")
  46. # 视口配置
  47. opts.add_argument("--width=1440")
  48. opts.add_argument("--height=900")
  49. opts.add_argument("--headless")
  50. return opts
  51. def crawl_by_year_tabs(driver, base_url, year_month):
  52. """按年份Tab导航采集数据"""
  53. years = ['2023年', '2024年', '2025年']
  54. WebDriverWait(driver, 30).until(
  55. EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
  56. )
  57. year_tabs = driver.find_elements(By.XPATH, '//ul[@class="nav_sj"]//li//a')
  58. for tab in year_tabs:
  59. year_text = tab.text.strip()
  60. if int(year_text[:4]) <= 2022:
  61. log.info(f"{year_text} 后的数据无需下载")
  62. continue
  63. year_url = tab.get_attribute("href")
  64. if not year_url.startswith(('http://', 'https://')):
  65. year_url = base_url.split('//')[0] + '//' + base_url.split('/')[2] + year_url
  66. # 新标签页打开年份页面
  67. driver.execute_script("window.open(arguments[0]);", year_url)
  68. driver.switch_to.window(driver.window_handles[-1])
  69. log.info(f"\n正在处理 {year_text} 年份页面")
  70. process_month_tabs(driver, year_text, base_url, year_month)
  71. # 返回主窗口
  72. driver.close()
  73. driver.switch_to.window(driver.window_handles[0])
  74. def get_current_and_previous_month(text):
  75. """
  76. 将类似 "2025年-五月" 或 "2025年十二月" 的字符串解析为:
  77. - 当前年份
  78. - 当前中文月份
  79. - 上一个月的 (年份, 中文月份)
  80. :param text: 输入文本,如 "2025年-五月"
  81. :return: tuple(current_year, current_month, previous_year, previous_month)
  82. """
  83. # 中文月份映射表
  84. month_map = {
  85. '一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6,
  86. '七': 7, '八': 8, '九': 9, '十': 10, '十一': 11, '十二': 12
  87. }
  88. reverse_month_map = {v: k for k, v in month_map.items()}
  89. # 提取年份和中文月份
  90. match = re.search(r'(\d{4})年[-\s]?([一二三四五六七八九十][一|二]?月)', text)
  91. if not match:
  92. raise ValueError(f"无法从 '{text}' 提取有效的年份和月份")
  93. year_str = match.group(1)
  94. chinese_month_str = match.group(2).replace('月', '')
  95. # 处理中文月份转数字
  96. if chinese_month_str in month_map:
  97. current_month_num = month_map[chinese_month_str]
  98. elif chinese_month_str == '十月':
  99. current_month_num = 10
  100. else:
  101. raise ValueError(f"不支持的中文月份格式: {chinese_month_str}")
  102. current_year_num = int(year_str)
  103. # 计算上个月
  104. if current_month_num > 1:
  105. previous_year_num = current_year_num
  106. previous_month_num = current_month_num - 1
  107. else:
  108. previous_year_num = current_year_num - 1
  109. previous_month_num = 12
  110. # 转换为中文月份
  111. previous_month_chinese = reverse_month_map[previous_month_num]
  112. return (
  113. f"{current_year_num}年",
  114. chinese_month_str + '月',
  115. f"{previous_year_num}年",
  116. previous_month_chinese + '月'
  117. )
  118. def process_month_tabs(driver, year, base_url, year_month):
  119. """处理月份Tab导航(动态获取真实存在的月份)"""
  120. # 显式等待容器加载
  121. WebDriverWait(driver, 30).until(
  122. EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
  123. )
  124. target_months = ['一月', '二月', '三月', '四月', '五月', '六月',
  125. '七月', '八月', '九月', '十月', '十一月', '十二月']
  126. processed_months = set() # 已处理月份记录
  127. retry_count = 0
  128. y1, m1, y2, m2 = get_current_and_previous_month(year_month)
  129. while retry_count < 3:
  130. try:
  131. # 全量获取所有月份Tab
  132. month_items = driver.find_elements(By.XPATH, '//ul[@class="nav_tab"]//li')
  133. if not month_items:
  134. log.info(f"{year}年没有月份Tab,停止处理")
  135. break
  136. all_found = True
  137. month_text = ''
  138. found = False
  139. for i,item in enumerate(month_items):
  140. a_tag = item.find_element(By.XPATH, './/a')
  141. month_text = a_tag.text.strip()
  142. if month_text in processed_months:
  143. continue
  144. if not month_text in target_months:
  145. continue # 跳过已处理月份
  146. log.info(f"点击月份Tab:{year}-{month_text}")
  147. if year_month is not None:
  148. if y1 != year or y2 != year:
  149. retry_count += 1
  150. break
  151. if not (y1 == year and m1 == month_text) and not (y2 == year and m2 == month_text):
  152. log.info(f"{year}年 {month_text} 月份跳过, auto tar: {year_month}")
  153. continue
  154. a_tag.click()
  155. # 处理详情页逻辑
  156. WebDriverWait(driver, 30).until(
  157. EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
  158. )
  159. detail_link_arr = get_behind_detail_link(driver, base_url)
  160. if not detail_link_arr:
  161. log.info(f"{year}-{month_text} 未找到详情链接")
  162. for detail_link in detail_link_arr:
  163. log.info(f"{year}-{month_text} 详情链接:{detail_link}")
  164. driver.get(detail_link)
  165. download_file_from_detail_page(driver)
  166. driver.back()
  167. WebDriverWait(driver, 30).until(
  168. EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
  169. )
  170. processed_months.add(month_text)
  171. found = True
  172. if not found:
  173. log.info(f"{year}年未找到 {month_text} Tab")
  174. all_found = False
  175. if all_found:
  176. log.info(f"{year}年所有目标月份处理完成")
  177. break
  178. else:
  179. # 部分月份未找到,重新获取元素
  180. log.info(f"第 {retry_count} 次重试获取月份Tab...")
  181. except StaleElementReferenceException:
  182. log.info("页面刷新,重新获取月份Tab列表...")
  183. time.sleep(2)
  184. log.info(f"{year}年最终处理的月份:{processed_months}")
  185. def get_behind_detail_link(driver, base_url):
  186. """获取点击月份Tab后 conList_ul 下所有 li 的 a 标签完整链接"""
  187. href_arr = []
  188. try:
  189. elements = WebDriverWait(driver, 30).until(
  190. EC.element_to_be_clickable((By.XPATH, '//ul[@class="conList_ul"]/li/a'))
  191. )
  192. elements = elements.find_elements(By.XPATH, '//ul[@class="conList_ul"]/li/a')
  193. for element in elements:
  194. href = element.get_attribute("href")
  195. full_url = urljoin(base_url, href) # 自动处理相对路径
  196. href_arr.append(full_url)
  197. return href_arr
  198. except Exception as e:
  199. log.info(f"获取详情链接失败: {str(e)}")
  200. return []
  201. def download_file_from_detail_page(driver):
  202. WebDriverWait(driver, 30).until(
  203. EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
  204. )
  205. try:
  206. elements = driver.find_elements(By.XPATH, '//div[@class="easysite-news-content"]//div[@id="easysiteText"]//p//a')
  207. if not elements:
  208. log.info("详情页未找到目标文件链接")
  209. return
  210. for download_btn in elements:
  211. file_name = download_btn.text.strip()
  212. if not file_name:
  213. continue
  214. file_url = download_btn.get_attribute("href")
  215. if not file_url.lower().endswith(('.xls', '.xlsx')):
  216. log.info(f"跳过非 Excel 文件: {file_url}")
  217. continue
  218. log.info(f"正在下载: {file_name} → {file_url}")
  219. # 记录下载前的文件列表
  220. existing_files = set(f.name for f in Path(download_dir).glob('*'))
  221. # 随机点击延迟
  222. time.sleep(random.uniform(1, 3))
  223. download_btn.click()
  224. downloaded_file = wait_for_download_complete(existing_files=existing_files)
  225. year, start_month, month = extract_year_and_month(file_name)
  226. final_path = Path(download_dir) / year / month / f"{file_name}"
  227. if os.path.exists(final_path):
  228. log.info(f"文件已存在:{file_name} 正在覆盖...")
  229. os.unlink(final_path)
  230. final_dir = Path(download_dir) / year / month
  231. final_dir.mkdir(parents=True, exist_ok=True)
  232. log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
  233. downloaded_file.rename(final_path)
  234. log.info(f"√ 下载成功:{final_path}")
  235. except Exception as e:
  236. log.info(f"详情页处理异常: {str(e)}")
  237. def extract_year_and_month(file_name):
  238. # 支持两种格式:
  239. # - 2025年1-2月xxx
  240. # - 2025年3月xxx
  241. match = re.search(r"(\d{4})年(\d{1,2})(?:-(\d{1,2}))?月", file_name)
  242. if match:
  243. year = match.group(1)
  244. start_month = match.group(2)
  245. end_month = match.group(3) if match.group(3) else start_month
  246. return year, start_month.zfill(2), end_month.zfill(2)
  247. else:
  248. raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
  249. def convert_to_chinese_uppercase(num):
  250. if not 1 <= num <= 12:
  251. return None # 超出范围的数字返回 None 或根据需要处理
  252. if num < 10:
  253. return '零一二三四五六七八九'[num]
  254. elif num == 10:
  255. return '十'
  256. elif num == 11:
  257. return '十一'
  258. elif num == 12:
  259. return '十二'
  260. return None
  261. def detect_latest_month(driver, url):
  262. driver.get(url)
  263. current_date = datetime.now()
  264. for offset in range(0, 3):
  265. check_date = current_date - timedelta(days=offset * 30)
  266. check_year = check_date.year
  267. month = check_date.month
  268. check_month = convert_to_chinese_uppercase(month)
  269. target_title = f"{check_month}月"
  270. try:
  271. WebDriverWait(driver, 10).until(
  272. EC.presence_of_element_located((By.XPATH, f'//ul[@class="nav_tab"]//li/a[normalize-space()="{target_title}"]'))
  273. )
  274. log.info(f"已找到最新月份数据 {check_year}-{check_month}")
  275. # 看是否已存表,已存则跳过;
  276. count = base_mysql.get_code_exist(f'{check_year}-{month:02d}', '330000')
  277. if count > 0:
  278. log.info(f"count: {count} -> 已存在 {check_year}-{check_month} 数据,跳过")
  279. continue
  280. return f"{check_year}年-{check_month}月"
  281. except:
  282. log.error(f"未找到 {target_title}")
  283. continue
  284. log.error("三个月内未找到有效数据")
  285. return None
  286. def extract_year_month_chinese(text):
  287. """
  288. 支持格式:
  289. - 2025年四月
  290. - 2025年-四月
  291. - 2024年十二月
  292. - 2023年-二月
  293. """
  294. # 中文月份映射表
  295. month_map = {
  296. '一': '01',
  297. '二': '02',
  298. '三': '03',
  299. '四': '04',
  300. '五': '05',
  301. '六': '06',
  302. '七': '07',
  303. '八': '08',
  304. '九': '09',
  305. '十': '10',
  306. '十一': '11',
  307. '十二': '12'
  308. }
  309. # 正则匹配年份和中文月份
  310. match = re.search(r"(\d{4})年-?([一|二|三|四|五|六|七|八|九|十]{1,2}[一|二]?月)", text)
  311. if not match:
  312. raise ValueError(f"无法从文本中提取年份和月份: {text}")
  313. year = match.group(1) # 提取年份
  314. # 提取中文月份并处理成数字
  315. chinese_month = match.group(2).replace('月', '')
  316. if chinese_month in month_map:
  317. month = month_map[chinese_month]
  318. else:
  319. # 特殊处理 "十月"
  320. if chinese_month == '十月':
  321. month = '10'
  322. else:
  323. raise ValueError(f"不支持的中文月份格式: {chinese_month}")
  324. return year, month
  325. def crawl_with_selenium(url, mark):
  326. driver = webdriver.Firefox(options=configure_stealth_options())
  327. year_month = None
  328. if 'auto' == mark:
  329. res = detect_latest_month(driver, url)
  330. if res is None:
  331. log.info("浙江省海关没有最新数据更新")
  332. return None
  333. year_month = res
  334. print(f"检测到最新有效数据:{year_month}")
  335. base_url = 'http://hangzhou.customs.gov.cn'
  336. try:
  337. # 注入反检测脚本
  338. driver.execute_script("""
  339. Object.defineProperty(navigator, 'webdriver', {
  340. get: () => undefined
  341. });
  342. window.alert = () => {};
  343. """)
  344. # 页面加载策略
  345. driver.get(url)
  346. # 按年份导航
  347. crawl_by_year_tabs(driver, base_url, year_month)
  348. finally:
  349. driver.quit()
  350. log.info(f"浙江省海关全量数据下载任务完成")
  351. # 等待5s后执行
  352. time.sleep(5)
  353. hierarchical_traversal(download_dir, year_month)
  354. log.info("浙江省海关类章、国家、城市所有文件处理完成!")
  355. time.sleep(5)
  356. base_mysql.update_shandong_yoy('浙江省')
  357. log.info("浙江省海关城市同比sql处理完成")
  358. return 'finish', year_month
  359. def wait_for_download_complete(timeout=30, existing_files=None):
  360. """
  361. 监控下载目录,等待文件下载完成并返回新下载的文件。
  362. :param timeout: 超时时间(秒)
  363. :param existing_files: 下载前已存在的文件列表
  364. :return: 新下载的文件路径
  365. """
  366. start_time = time.time()
  367. temp_exts = ('.part', '.crdownload')
  368. if existing_files is None:
  369. existing_files = set(f.name for f in Path(download_dir).glob('*'))
  370. while (time.time() - start_time) < timeout:
  371. # 获取有效文件列表
  372. valid_files = []
  373. for f in Path(download_dir).glob('*'):
  374. if (f.name not in existing_files and
  375. not f.name.endswith(temp_exts) and
  376. f.stat().st_size > 0):
  377. valid_files.append(f)
  378. # 等待最新文件稳定
  379. if valid_files:
  380. return max(valid_files, key=lambda x: x.stat().st_mtime)
  381. time.sleep(2)
  382. raise TimeoutError("文件下载超时")
  383. def hierarchical_traversal(root_path, year_month):
  384. """分层遍历:省份->年份->月目录"""
  385. root = Path(root_path)
  386. # 获取所有年份目录
  387. year_dirs = [
  388. item for item in root.iterdir()
  389. if item.is_dir() and base_country_code.YEAR_PATTERN.match(item.name)
  390. ]
  391. # 按年倒序
  392. for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
  393. # 构造完整的路径:download/shandong/2025/03
  394. log.info(f"\n年份:{year_dir.name} | 省份:zhejiang")
  395. # 提取月份目录
  396. month_dirs = []
  397. for item in year_dir.iterdir():
  398. if item.is_dir() and base_country_code.MONTH_PATTERN.match(item.name):
  399. month_dirs.append({
  400. "path": item,
  401. "month": int(item.name)
  402. })
  403. # 按月倒序输出
  404. if month_dirs:
  405. for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
  406. log.info(f" 月份:{md['month']:02d} | 路径:{md['path']}")
  407. path = md['path']
  408. if year_month is not None:
  409. year, month = extract_year_month_chinese(year_month)
  410. parts = path.parts
  411. if year_dir.name != year or parts[-1] != month:
  412. log.info(f"浙江省海关已处理 {year_month} 数据,返回")
  413. return
  414. gov_commodity_zhejiang_import_export.process_folder(path)
  415. gov_commodity_zhejiang_country.process_folder(path)
  416. gov_commodity_zhejiang_city.process_folder(path)
  417. def main():
  418. try:
  419. parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
  420. parser.add_argument('--year', type=int, default=None, help='终止年份(如2023),未指定时抓取最新两个月')
  421. args = parser.parse_args()
  422. start_time = time.time()
  423. if args.year == 2023:
  424. log.info("正在全量采集浙江省海关数据")
  425. crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html', 'all')
  426. duration = time.time() - start_time
  427. minutes, seconds = divmod(duration, 60)
  428. send_dingtalk_message(f'【浙江省海关】全量数据采集完成,耗时 {int(minutes)}分{seconds:.1f}秒')
  429. else:
  430. log.info("正在增量采集浙江省海关数据")
  431. res = crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html','auto')
  432. if res is not None:
  433. r1, r2 = res
  434. if r1 == 'finish':
  435. duration = time.time() - start_time
  436. minutes, seconds = divmod(duration, 60)
  437. send_dingtalk_message(f'【浙江省海关】 {r2} 增量数据采集完成,耗时 {int(minutes)}分{seconds:.1f}秒')
  438. except Exception as e:
  439. send_dingtalk_message(f"【浙江省海关】发生错误:{e}")
# Run the crawler only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()