# crawl_gov_hebei_full.py
  1. import argparse
  2. import os
  3. import random
  4. import re
  5. import time
  6. from datetime import datetime, timedelta
  7. from pathlib import Path
  8. from faker import Faker
  9. from selenium import webdriver
  10. from selenium.webdriver import FirefoxOptions
  11. from selenium.webdriver.common.by import By
  12. from selenium.webdriver.support import expected_conditions as EC
  13. from selenium.webdriver.support.ui import WebDriverWait
  14. from crossborder.hebei import download_dir
  15. from crossborder.hebei import gov_commodity_hebei_city
  16. from crossborder.hebei import gov_commodity_hebei_country
  17. from crossborder.hebei import gov_commodity_hebei_import_export
  18. from crossborder.utils import base_country_code, base_mysql
  19. from crossborder.utils.dingtalk import send_dingtalk_message
  20. from crossborder.utils.log import get_logger
  21. log = get_logger(__name__)
  22. def get_current_target_titles():
  23. return [
  24. f"2025年4月河北分进口商品",
  25. f"2025年4月河北分出口商品",
  26. f"2025年4月河北分国家",
  27. f"2025年4月河北分地市"
  28. ]
  29. def configure_stealth_options():
  30. """增强型反检测配置[1,4](@ref)"""
  31. opts = FirefoxOptions()
  32. print("当前下载路径:", Path(download_dir).resolve())
  33. # 文件下载配置
  34. opts.set_preference("browser.download.dir", download_dir)
  35. opts.set_preference("browser.download.folderList", 2)
  36. opts.set_preference("browser.download.manager.showWhenStarting", False)
  37. opts.set_preference("browser.helperApps.neverAsk.saveToDisk",
  38. "application/octet-stream, application/vnd.ms-excel") # 覆盖常见文件类型
  39. opts.set_preference("browser.download.manager.useWindow", False) # 禁用下载管理器窗口
  40. opts.set_preference("browser.download.manager.showAlertOnComplete", False) # 关闭完成提示
  41. # 反检测参数
  42. opts.set_preference("dom.webdriver.enabled", False)
  43. opts.set_preference("useAutomationExtension", False)
  44. opts.add_argument("--disable-blink-features=AutomationControlled")
  45. # 动态指纹
  46. fake = Faker()
  47. opts.set_preference("general.useragent.override", fake.firefox())
  48. opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")
  49. # 视口配置
  50. opts.add_argument("--width=1440")
  51. opts.add_argument("--height=900")
  52. opts.add_argument("--headless")
  53. return opts
  54. def remove_prefix_from_url(url):
  55. # 分离路径和文件名
  56. path_parts = url.split('/')
  57. filename = path_parts[-1]
  58. # 使用正则表达式去掉前缀数字和点(如 "1.")
  59. new_filename = re.sub(r'^\d+\.', '', filename)
  60. # 确保域名补全逻辑
  61. if not url.startswith(('http://', 'https://')):
  62. base_url = 'http://shijiazhuang.customs.gov.cn'
  63. url = base_url + '/'.join(path_parts[:-1]) + '/' + new_filename
  64. return url
def find_target_links(driver, year_month):
    """Scan the current listing page and download Excel attachments for the target month.

    Iterates every link in the listing container; for titles containing the
    import/export/country/city keywords, clicks the link, waits for the file
    to land in ``download_dir``, then moves it into a ``<year>/<month>/``
    subdirectory (overwriting any existing copy).

    Args:
        driver: Live selenium WebDriver positioned on a listing page.
        year_month: Title prefix such as "2025年4月" restricting downloads to
            one month, or None for a full crawl (stops at 2022 titles).

    Returns:
        'stop' when pagination should end (month/year boundary reached),
        otherwise None.
    """
    # Wait until the listing portlet has rendered.
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )
    element_arr = driver.find_elements(By.XPATH, '//div[@class="list_con"]//ul[@class="easysite-list-modelone"]//a')
    if not element_arr:
        log.info("未找到目标标题")
        return None
    for elements in element_arr:
        file_name = elements.text.strip()
        if not file_name:
            continue
        if year_month is None:
            # Full crawl: stop once the 2022 archive is reached.
            if file_name.startswith('2022'):
                return 'stop'
        else:
            # Incremental crawl: stop at the first title outside the target month.
            if not file_name.startswith(year_month):
                log.info(f"非 {year_month} 文件: {file_name}, stop")
                return 'stop'
        if '进口商品' in file_name or '出口商品' in file_name or '分国家' in file_name or '分国别' in file_name or '地市' in file_name:
            file_url = elements.get_attribute("href")
            file_url = remove_prefix_from_url(file_url)
            if not file_url.lower().endswith(('.xls', '.xlsx')):
                log.info(f"跳过非 Excel 文件: {file_url}")
                continue
            log.info(f"正在下载: {file_name} → {file_url}")
            # Snapshot the download dir so the newly downloaded file can be identified.
            existing_files = set(f.name for f in Path(download_dir).glob('*'))
            # Randomized delay to mimic a human click.
            time.sleep(random.uniform(1, 3))
            elements.click()
            try:
                downloaded_file = wait_for_download_complete(existing_files=existing_files)
            except Exception as e:
                log.info(f"下载失败: {str(e)}")
                continue
            year, start_month, month = extract_year_and_month(file_name)
            # NOTE(review): ranged titles like "1-2月" are filed under the END month.
            final_path = Path(download_dir) / year / month / f"{file_name}.xls"
            if os.path.exists(final_path):
                log.info(f"文件已存在:{file_name} 正在覆盖...")
                os.unlink(final_path)
            final_dir = Path(download_dir) / year / month
            final_dir.mkdir(parents=True, exist_ok=True)
            log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
            downloaded_file.rename(final_path)
            log.info(f"√ 下载成功:{final_path}")
        else:
            log.info(f'{file_name} 不需要此文件,跳过')
            continue
    return None
  117. def extract_year_and_month(file_name):
  118. # 支持格式:1.2025年1-2月xxx 或 2025年3月xxx
  119. match = re.search(r"\b(\d{4})年(\d{1,2})(?:-(\d{1,2}))?月", file_name)
  120. if match:
  121. year = match.group(1)
  122. start_month = match.group(2)
  123. end_month = match.group(3) if match.group(3) else start_month
  124. return year, start_month.zfill(2), end_month.zfill(2)
  125. else:
  126. raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
  127. def detect_latest_month(driver, url):
  128. driver.get(url)
  129. current_date = datetime.now()
  130. for offset in range(0, 3):
  131. check_date = current_date - timedelta(days=offset * 30)
  132. check_year = check_date.year
  133. check_month = check_date.month
  134. target_title = f"{check_year}年{check_month}月"
  135. try:
  136. WebDriverWait(driver, 10).until(
  137. EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
  138. )
  139. log.info(f"已找到最新月份数据 {check_year}-{check_month}")
  140. # 看是否已存表,已存则跳过;
  141. count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', '130000')
  142. if count > 0:
  143. log.info(f"count: {count} -> 已存在 {check_year}-{check_month} 数据,跳过")
  144. continue
  145. return f"{check_year}年{check_month}月"
  146. except:
  147. log.error(f"未找到 {target_title}")
  148. continue
  149. log.error("三个月内未找到有效数据")
  150. return None
def crawl_with_selenium(url, mark):
    """Drive the full Hebei customs crawl: paginate, download, then post-process.

    Args:
        url: Entry listing page URL.
        mark: 'auto' for incremental mode (only the latest month not yet in
            MySQL); any other value (e.g. 'all') for a full historical crawl.

    Returns:
        'finish' after a completed run; None when 'auto' mode finds no new month.
    """
    driver = webdriver.Firefox(options=configure_stealth_options())
    year_month = None
    if 'auto' == mark:
        res = detect_latest_month(driver, url)
        if res is None:
            log.info("河北省海关没有最新数据更新")
            # sys.exit(0)
            return
        year_month = res
        print(f"检测到最新有效数据:{year_month}")
    try:
        # Inject anti-detection script: hide navigator.webdriver, silence alerts.
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.alert = () => {};
        """)
        # Page load
        driver.get(url)
        while True:
            # Process the current listing page.
            result = find_target_links(driver, year_month)
            if result and result == 'stop':
                break
            # Wait for the pagination widget to render.
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "gg_page"))
            )
            # Locate the clickable "next page" control.
            xpath = f'//div[@class="easysite-page-wrap"]//a[@title="下一页"]'
            next_page_btn = WebDriverWait(driver, 15).until(
                EC.element_to_be_clickable((By.XPATH, xpath))
            )
            # The next page's URL lives in the onclick handler, not in href.
            next_page_url = next_page_btn.get_attribute("onclick")
            if not next_page_url:
                log.info("已到达最后一页,停止采集")
                break
            # Pull the single-quoted URL out of the onclick attribute.
            next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
            if not next_page_url.startswith(('http://', 'https://')):
                base_url = 'http://shijiazhuang.customs.gov.cn'  # site host for relative links
                next_page_url = base_url + next_page_url
            # Visit the next page.
            driver.get(next_page_url)
            log.info(f"开始采集 {next_page_url} 页面数据")
    finally:
        driver.quit()
    # Give the filesystem a moment, then parse everything that was downloaded.
    time.sleep(5)
    hierarchical_traversal(download_dir)
    log.info(f"河北省海关全量数据下载任务完成")
    time.sleep(5)
    base_mysql.update_shandong_yoy('河北省')
    log.info("河北省海关城市同比sql处理完成")
    return 'finish'
  209. def wait_for_download_complete(timeout=30, existing_files=None):
  210. """
  211. 监控下载目录,等待文件下载完成并返回新下载的文件。
  212. :param timeout: 超时时间(秒)
  213. :param existing_files: 下载前已存在的文件列表
  214. :return: 新下载的文件路径
  215. """
  216. start_time = time.time()
  217. temp_exts = ('.part', '.crdownload')
  218. if existing_files is None:
  219. existing_files = set(f.name for f in Path(download_dir).glob('*'))
  220. while (time.time() - start_time) < timeout:
  221. # 获取有效文件列表
  222. valid_files = []
  223. for f in Path(download_dir).glob('*'):
  224. if (f.name not in existing_files and
  225. not f.name.endswith(temp_exts) and
  226. f.stat().st_size > 0):
  227. valid_files.append(f)
  228. # 等待最新文件稳定
  229. if valid_files:
  230. return max(valid_files, key=lambda x: x.stat().st_mtime)
  231. time.sleep(2)
  232. raise TimeoutError("文件下载超时")
  233. def hierarchical_traversal(root_path):
  234. """分层遍历:省份->年份->月目录"""
  235. root = Path(root_path)
  236. # 获取所有年份目录
  237. year_dirs = [
  238. item for item in root.iterdir()
  239. if item.is_dir() and base_country_code.YEAR_PATTERN.match(item.name)
  240. ]
  241. # 按年倒序
  242. for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
  243. # 构造完整的路径:download/shandong/2025/03
  244. log.info(f"\n年份:{year_dir.name} | 省份:hebei")
  245. # 提取月份目录
  246. month_dirs = []
  247. for item in year_dir.iterdir():
  248. if item.is_dir() and base_country_code.MONTH_PATTERN.match(item.name):
  249. month_dirs.append({
  250. "path": item,
  251. "month": int(item.name)
  252. })
  253. # 按月倒序输出
  254. if month_dirs:
  255. for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
  256. log.info(f" 月份:{md['month']:02d} | 路径:{md['path']}")
  257. gov_commodity_hebei_import_export.process_folder(md['path'])
  258. gov_commodity_hebei_country.process_folder(md['path'])
  259. gov_commodity_hebei_city.process_folder(md['path'])
  260. def main():
  261. parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
  262. parser.add_argument('--year', type=int, default=None, help='终止年份(如2023),未指定时抓取最新两个月')
  263. args = parser.parse_args()
  264. start_time = time.time()
  265. if args.year == 2023:
  266. log.info("正在全量采集河北省海关数据")
  267. crawl_with_selenium('http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/zfxxgk43/2988665/2988681/index.html', 'all')
  268. duration = time.time() - start_time
  269. send_dingtalk_message(f'【河北省海关】全量数据采集完成,耗时 {duration:.2f} 秒')
  270. else:
  271. log.info("正在增量采集河北省海关数据")
  272. res = crawl_with_selenium('http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/zfxxgk43/2988665/2988681/index.html','auto')
  273. if res == 'finish':
  274. duration = time.time() - start_time
  275. send_dingtalk_message(f'【河北省海关】增量数据采集完成,耗时 {duration:.2f} 秒')
# Script entry point.
if __name__ == '__main__':
    main()