# crawl_gov_jiangsu_full.py
# Selenium crawler that downloads the .rar statistics archives published by
# Nanjing Customs (Jiangsu) and feeds the extracted spreadsheets to the
# gov_commodity_jiangsu_* processors.
  1. import os
  2. import random
  3. import re
  4. import subprocess
  5. import time
  6. import rarfile
  7. import shutil
  8. from pathlib import Path
  9. import sys
  10. from datetime import datetime, timedelta
  11. from faker import Faker
  12. from selenium import webdriver
  13. from selenium.webdriver import FirefoxOptions
  14. from selenium.webdriver.common.by import By
  15. from selenium.webdriver.support import expected_conditions as EC
  16. from selenium.webdriver.support.ui import WebDriverWait
  17. import gov_commodity_jiangsu_country
  18. import gov_commodity_jiangsu_city
  19. import gov_commodity_jiangsu_import_export
  20. from utils import base_country_code, base_mysql
  21. from utils.log import log
  22. # rarfile.UNRAR_EXECUTABLE = r"C:\Program Files\WinRAR\UnRAR.exe"
  23. rarfile.UNRAR_EXECUTABLE = "unrar"
  24. download_dir = base_country_code.download_dir
  25. Path(download_dir).mkdir(parents=True, exist_ok=True)
  26. def configure_stealth_options():
  27. """增强型反检测配置[1,4](@ref)"""
  28. opts = FirefoxOptions()
  29. print("当前下载路径:", Path(download_dir).resolve())
  30. # 文件下载配置
  31. opts.set_preference("browser.download.dir", download_dir)
  32. opts.set_preference("browser.download.folderList", 2)
  33. opts.set_preference("browser.download.manager.showWhenStarting", False)
  34. opts.set_preference("browser.helperApps.neverAsk.saveToDisk",
  35. "application/octet-stream, application/vnd.ms-excel") # 覆盖常见文件类型
  36. opts.set_preference("browser.download.manager.useWindow", False) # 禁用下载管理器窗口
  37. opts.set_preference("browser.download.manager.showAlertOnComplete", False) # 关闭完成提示
  38. # 反检测参数
  39. opts.set_preference("dom.webdriver.enabled", False)
  40. opts.set_preference("useAutomationExtension", False)
  41. opts.add_argument("--disable-blink-features=AutomationControlled")
  42. # 动态指纹
  43. fake = Faker()
  44. opts.set_preference("general.useragent.override", fake.firefox())
  45. opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")
  46. # 视口配置
  47. opts.add_argument("--width=1440")
  48. opts.add_argument("--height=900")
  49. opts.add_argument("--headless")
  50. return opts
  51. def find_target_links(driver, year_month):
  52. """在当前页面找到符合 TARGET_TITLES 的文件并触发下载"""
  53. # 等待页面加载完成
  54. WebDriverWait(driver, 30).until(
  55. EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
  56. )
  57. try:
  58. # 使用 XPath 精准匹配标题文本
  59. xpath = '//ul[@class="conList_ul"]//a[contains(@href, ".rar")]'
  60. # 检查页面中是否存在该 title 对应的元素
  61. elements = driver.find_elements(By.XPATH, xpath)
  62. if not elements:
  63. return None
  64. # 用于记录已处理过的文件名(防止重复下载)
  65. processed_files = set()
  66. # 遍历所有链接并点击下载
  67. for download_btn in elements:
  68. # 获取文件名(用于后续判断)
  69. file_name = download_btn.text.strip()
  70. log.info(f"正在下载: {file_name}")
  71. # 记录下载前的文件列表
  72. existing_files = set(f.name for f in Path(download_dir).glob('*'))
  73. # 模拟点击
  74. download_btn.click()
  75. time.sleep(random.uniform(1, 3))
  76. # 等待文件下载完成
  77. rar_files = wait_for_download_complete(existing_files=existing_files)
  78. if not rar_files:
  79. log.info("未找到新下载的 .rar 文件")
  80. continue
  81. downloaded_file = rar_files[0]
  82. if downloaded_file.suffix == '.rar':
  83. # 解压文件
  84. with rarfile.RarFile(downloaded_file) as rf:
  85. # 获取压缩包中的第一个 .xls 文件
  86. xls_files = [f for f in rf.namelist() if f.endswith('.xls') or f.endswith('.xlsx')]
  87. if not xls_files:
  88. log.info(f"压缩包 {downloaded_file.name} 中没有 .xls 文件")
  89. continue
  90. for xls_file in xls_files:
  91. if year_month is None:
  92. if xls_file.startswith('2022'):
  93. return 'stop'
  94. else:
  95. if not xls_file.startswith(year_month):
  96. log.info(f"非 {year_month} 文件: {file_name}, stop")
  97. return 'stop'
  98. if not xls_file or '美元值' in xls_file or '企业性质' in xls_file or '贸易方式' in xls_file or '按收发货所在地' in xls_file or '主要商品' in xls_file:
  99. log.info(f"检测到不需要的文件:{xls_file},跳过")
  100. continue
  101. # 解压到临时目录
  102. temp_dir = Path(download_dir) / 'temp'
  103. temp_dir.mkdir(parents=True, exist_ok=True)
  104. if not extract_rar(downloaded_file, temp_dir):
  105. log.info(f"解压文件 {downloaded_file.name} 时发生错误")
  106. continue
  107. # 获取解压后的文件路径
  108. match = re.search(r"(\d{4})年(\d{1,2})月", xls_file)
  109. if not match:
  110. raise ValueError(f"无效标题格式:{xls_file}")
  111. year = match.group(1)
  112. month = match.group(2).zfill(2)
  113. extracted_file = temp_dir / xls_file
  114. final_path = Path(download_dir) / year / month / extracted_file.name
  115. if os.path.exists(final_path):
  116. log.info(f"文件已存在:{extracted_file.name} 正在覆盖...")
  117. os.unlink(final_path)
  118. final_dir = Path(download_dir) / year / month
  119. final_dir.mkdir(parents=True, exist_ok=True)
  120. log.info(f"√ 正在移动文件 {extracted_file} 至 {final_path}")
  121. try:
  122. extracted_file.rename(final_path)
  123. log.info(f"√ 下载成功:{final_path}")
  124. except Exception as e:
  125. log.info(f"文件移动失败: {str(e)}")
  126. # 删除临时目录(无论是否为空)
  127. try:
  128. shutil.rmtree(temp_dir) # 替换 os.rmdir(temp_dir)
  129. except Exception as e:
  130. log.info(f"删除临时目录失败: {str(e)}")
  131. # 删除 .rar 文件
  132. log.info(f"删除 .rar 文件:{downloaded_file}")
  133. os.unlink(downloaded_file)
  134. else:
  135. log.info(f"文件 {downloaded_file.name} 不是 .rar 文件,请手动处理")
  136. # 将已处理的文件名加入集合
  137. processed_files.add(file_name)
  138. return None
  139. except Exception as e:
  140. log.info(f"下载时发生异常: {str(e)}")
  141. def extract_rar(rar_path, extract_to):
  142. """备用解压函数(当 rarfile 失效时使用)"""
  143. # winrar_path = r"C:\Program Files\WinRAR\Rar.exe" # 推荐使用 Rar.exe 而非 WinRAR.exe
  144. # cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]
  145. cmd = ["unrar", 'x', '-y', rar_path, str(extract_to)]
  146. # 使用 CREATE_NO_WINDOW 防止弹出命令行窗口
  147. creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
  148. result = subprocess.run(
  149. cmd,
  150. stdout=subprocess.PIPE,
  151. stderr=subprocess.PIPE,
  152. creationflags=creationflags # 关键点:隐藏窗口
  153. )
  154. if result.returncode == 0:
  155. log.info(f"解压成功: {rar_path} → {extract_to}")
  156. return True
  157. else:
  158. log.info(f"解压失败: {result.stderr.decode('gbk')}")
  159. return False
  160. def detect_latest_month(driver, url):
  161. driver.get(url)
  162. current_date = datetime.now()
  163. for offset in range(0, 3):
  164. check_date = current_date - timedelta(days=offset * 30)
  165. check_year = check_date.year
  166. check_month = check_date.month
  167. target_title = f"{check_year}年{check_month}月"
  168. try:
  169. WebDriverWait(driver, 10).until(
  170. EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
  171. )
  172. log.info(f"已找到最新月份数据 {check_year}-{check_month}")
  173. # 看是否已存表,已存则跳过;
  174. count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', '320000')
  175. if count > 0:
  176. log.info(f"count: {count} -> 已存在 {check_year}-{check_month} 数据,跳过")
  177. continue
  178. return f"{check_year}年{check_month}月"
  179. except:
  180. log.info(f"未找到 {target_title}")
  181. continue
  182. log.info("三个月内未找到有效数据")
  183. return None
  184. def crawl_with_selenium(url, mark):
  185. driver = webdriver.Firefox(options=configure_stealth_options())
  186. year_month = None
  187. if 'increment' == mark:
  188. res = detect_latest_month(driver, url)
  189. if res is None:
  190. log.info("江苏省海关没有最新数据更新")
  191. sys.exit(0)
  192. year_month = res
  193. print(f"检测到最新有效数据:{year_month}")
  194. try:
  195. # 注入反检测脚本
  196. driver.execute_script("""
  197. Object.defineProperty(navigator, 'webdriver', {
  198. get: () => undefined
  199. });
  200. window.alert = () => {};
  201. """)
  202. # 页面加载策略
  203. driver.get(url)
  204. while True:
  205. # 访问当前页
  206. result = find_target_links(driver, year_month)
  207. if result == 'stop':
  208. break
  209. # 等待页面加载完成
  210. WebDriverWait(driver, 30).until(
  211. EC.presence_of_element_located((By.CLASS_NAME, "gg_page"))
  212. )
  213. # 模拟点击下一页
  214. xpath = f'//div[@class="easysite-page-wrap"]//a[@title="下一页"]'
  215. next_page_btn = WebDriverWait(driver, 15).until(
  216. EC.element_to_be_clickable((By.XPATH, xpath))
  217. )
  218. # 获取下一页的URL
  219. next_page_url = next_page_btn.get_attribute("onclick")
  220. if not next_page_url:
  221. log.info("已到达最后一页,停止爬取")
  222. break
  223. # 从onclick属性中提取URL
  224. next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
  225. if not next_page_url.startswith(('http://', 'https://')):
  226. base_url = 'http://shijiazhuang.customs.gov.cn' # 替换为实际的域名
  227. next_page_url = base_url + next_page_url
  228. # 访问下一页
  229. driver.get(next_page_url)
  230. log.info(f"开始爬取 {next_page_url} 页面数据")
  231. finally:
  232. driver.quit()
  233. def wait_for_download_complete(timeout=30, existing_files=None):
  234. start_time = time.time()
  235. if existing_files is None:
  236. existing_files = set(f.name for f in Path(download_dir).glob('*'))
  237. while (time.time() - start_time) < timeout:
  238. new_files = [f for f in Path(download_dir).glob('*.rar') if f.name not in existing_files]
  239. if new_files:
  240. # 等待文件大小稳定(不再变化),确保下载完成
  241. stable = True
  242. for file in new_files:
  243. prev_size = file.stat().st_size
  244. time.sleep(1)
  245. curr_size = file.stat().st_size
  246. if curr_size != prev_size:
  247. stable = False
  248. break
  249. if stable:
  250. return new_files
  251. time.sleep(2)
  252. raise TimeoutError("未找到 .rar 文件或超时")
  253. def hierarchical_traversal(root_path, all_records):
  254. """分层遍历:省份->年份->月目录"""
  255. root = Path(root_path)
  256. # 获取所有年份目录
  257. year_dirs = [
  258. item for item in root.iterdir()
  259. if item.is_dir() and base_country_code.YEAR_PATTERN.match(item.name)
  260. ]
  261. # 按年倒序
  262. for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
  263. # 构造完整的路径:download/shandong/2025/03
  264. log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
  265. # 提取月份目录
  266. month_dirs = []
  267. for item in year_dir.iterdir():
  268. if item.is_dir() and base_country_code.MONTH_PATTERN.match(item.name):
  269. month_dirs.append({
  270. "path": item,
  271. "month": int(item.name)
  272. })
  273. # 按月倒序输出
  274. if month_dirs:
  275. for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
  276. log.info(f" 月份:{md['month']:02d} | 路径:{md['path']}")
  277. gov_commodity_jiangsu_import_export.process_folder(md['path'], all_records)
  278. gov_commodity_jiangsu_country.process_folder(md['path'])
  279. gov_commodity_jiangsu_city.process_folder(md['path'])
  280. if __name__ == "__main__":
  281. crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html', 'all')
  282. # crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html', 'increment')
  283. log.info(f"江苏南京海关全量数据下载任务完成")
  284. # 等待5s后执行
  285. time.sleep(5)
  286. all_records = base_mysql.get_hs_all()
  287. hierarchical_traversal(base_country_code.download_dir, all_records)
  288. log.info("江苏南京海关类章、国家、城市所有文件处理完成!")
  289. time.sleep(5)
  290. base_mysql.update_january_yoy('江苏省')
  291. base_mysql.update_shandong_yoy('江苏省')
  292. log.info("江苏南京海关城市同比sql处理完成")