# crawl_gov_jiangsu_full.py

import os
import random
import re
import subprocess
import time
import rarfile
import shutil
from pathlib import Path
from faker import Faker
from selenium import webdriver
from selenium.webdriver import FirefoxOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

import gov_commodity_jiangsu_country
import gov_commodity_jiangsu_city
import gov_commodity_jiangsu_import_export
from utils import base_country_code, base_mysql

# Point rarfile at the unrar executable explicitly (adjust to your environment)
rarfile.UNRAR_EXECUTABLE = r"C:\Program Files\WinRAR\UnRAR.exe"
# rarfile.UNRAR_EXECUTABLE = "/usr/bin/unrar"  # Linux/macOS

download_dir = base_country_code.download_dir
Path(download_dir).mkdir(parents=True, exist_ok=True)
def configure_stealth_options():
    """Hardened anti-detection Firefox options."""
    opts = FirefoxOptions()
    print("Current download path:", Path(download_dir).resolve())

    # Download handling
    opts.set_preference("browser.download.dir", download_dir)
    opts.set_preference("browser.download.folderList", 2)
    opts.set_preference("browser.download.manager.showWhenStarting", False)
    opts.set_preference("browser.helperApps.neverAsk.saveToDisk",
                        "application/octet-stream, application/vnd.ms-excel")   # cover the common archive/Excel types
    opts.set_preference("browser.download.manager.useWindow", False)            # no download-manager window
    opts.set_preference("browser.download.manager.showAlertOnComplete", False)  # no completion alert

    # Anti-detection flags
    opts.set_preference("dom.webdriver.enabled", False)
    opts.set_preference("useAutomationExtension", False)
    opts.add_argument("--disable-blink-features=AutomationControlled")

    # Randomized fingerprint
    fake = Faker()
    opts.set_preference("general.useragent.override", fake.firefox())
    opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")

    # Viewport
    opts.add_argument("--width=1440")
    opts.add_argument("--height=900")
    opts.add_argument("--headless")
    return opts
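
# Debug sketch (not used by the crawl flow): dump the effective download and anti-detection
# preferences before launching a real session. This assumes Selenium 4, where
# BaseOptions.to_capabilities() nests Firefox prefs under "moz:firefoxOptions"; the helper
# is illustrative and never called below.
def _print_effective_prefs():
    caps = configure_stealth_options().to_capabilities()
    firefox_caps = caps.get("moz:firefoxOptions", {})
    for key, value in sorted(firefox_caps.get("prefs", {}).items()):
        print(f"{key} = {value}")
    print("args:", firefox_caps.get("args", []))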
def find_target_links(driver):
    """Find the target .rar links on the current page and trigger their download."""
    # Wait for the page to finish loading
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
    )
    try:
        # XPath targeting the .rar links in the article list
        xpath = '//ul[@class="conList_ul"]//a[contains(@href, ".rar")]'
        elements = driver.find_elements(By.XPATH, xpath)
        if not elements:
            return None

        # Track file names that have already been handled (avoid duplicate downloads)
        processed_files = set()

        # Click each link and download the file
        for download_btn in elements:
            file_name = download_btn.text.strip()
            print(f"Downloading: {file_name}")

            # Snapshot the download folder before clicking
            existing_files = set(f.name for f in Path(download_dir).glob('*'))

            download_btn.click()
            time.sleep(random.uniform(1, 3))

            # Wait for the download to finish
            rar_files = wait_for_download_complete(existing_files=existing_files)
            if not rar_files:
                print("No newly downloaded .rar file found")
                continue

            downloaded_file = rar_files[0]
            if downloaded_file.suffix == '.rar':
                with rarfile.RarFile(downloaded_file) as rf:
                    # Collect the .xls/.xlsx entries inside the archive
                    xls_files = [f for f in rf.namelist() if f.endswith('.xls') or f.endswith('.xlsx')]
                    if not xls_files:
                        print(f"Archive {downloaded_file.name} contains no .xls file")
                        continue
                    for xls_file in xls_files:
                        if xls_file.startswith('2022'):
                            return 'stop'
                        # Skip sheets we do not need (USD values, enterprise type, trade mode, etc.)
                        if not xls_file or '美元值' in xls_file or '企业性质' in xls_file or '贸易方式' in xls_file or '按收发货所在地' in xls_file or '主要商品' in xls_file:
                            print(f"Unwanted file detected: {xls_file}, skipping")
                            continue

                        # Extract into a temporary directory
                        temp_dir = Path(download_dir) / 'temp'
                        temp_dir.mkdir(parents=True, exist_ok=True)
                        if not extract_rar(downloaded_file, temp_dir):
                            print(f"Error while extracting {downloaded_file.name}")
                            continue

                        # Derive year/month from the entry name, e.g. "2023年12月..."
                        match = re.search(r"(\d{4})年(\d{1,2})月", xls_file)
                        if not match:
                            raise ValueError(f"Invalid title format: {xls_file}")
                        year = match.group(1)
                        month = match.group(2).zfill(2)

                        extracted_file = temp_dir / xls_file
                        final_path = Path(download_dir) / year / month / extracted_file.name
                        if os.path.exists(final_path):
                            print(f"File already exists: {extracted_file.name}, overwriting...")
                            os.unlink(final_path)
                        final_dir = Path(download_dir) / year / month
                        final_dir.mkdir(parents=True, exist_ok=True)

                        print(f"√ Moving {extracted_file} to {final_path}")
                        try:
                            extracted_file.rename(final_path)
                            print(f"√ Download complete: {final_path}")
                        except Exception as e:
                            print(f"Failed to move file: {str(e)}")

                        # Remove the temporary directory, even if it is not empty
                        try:
                            shutil.rmtree(temp_dir)
                        except Exception as e:
                            print(f"Failed to remove temp directory: {str(e)}")

                # Delete the .rar file once the archive handle is closed
                print(f"Deleting .rar file: {downloaded_file}")
                os.unlink(downloaded_file)
            else:
                print(f"{downloaded_file.name} is not a .rar file, please handle it manually")

            # Record the file name as processed
            processed_files.add(file_name)
        return None
    except Exception as e:
        print(f"Exception during download: {str(e)}")
        return None
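
# Alternative sketch (assumption, not what this script does): instead of clicking and
# polling the download folder, the archive could be fetched directly from the link's
# href while reusing the browser session's cookies. Shown for illustration only and
# never called by the crawl flow.
def download_rar_directly(driver, download_btn, dest_dir):
    import requests  # assumed available; deliberately not a module-level dependency
    url = download_btn.get_attribute("href")
    cookies = {c["name"]: c["value"] for c in driver.get_cookies()}
    resp = requests.get(url, cookies=cookies, timeout=60)
    resp.raise_for_status()
    target = Path(dest_dir) / url.rsplit('/', 1)[-1]
    target.write_bytes(resp.content)
    return target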
def extract_rar(rar_path, extract_to):
    """Fallback extraction via the WinRAR command line (used when rarfile cannot extract)."""
    winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # prefer the console Rar.exe over WinRAR.exe
    cmd = [winrar_path, 'x', '-y', str(rar_path), str(extract_to)]
    # CREATE_NO_WINDOW keeps a console window from popping up on Windows
    creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        creationflags=creationflags
    )
    if result.returncode == 0:
        print(f"Extraction succeeded: {rar_path} → {extract_to}")
        return True
    else:
        print(f"Extraction failed: {result.stderr.decode('gbk', errors='replace')}")
        return False
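
# Portability sketch (assumption): on Linux/macOS, where Rar.exe is unavailable, the same
# fallback can shell out to the unrar binary referenced near the top of this file. The
# trailing slash marks the destination as a directory for unrar.
def extract_rar_unix(rar_path, extract_to):
    result = subprocess.run(
        ["/usr/bin/unrar", "x", "-y", str(rar_path), f"{extract_to}/"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        print(f"Extraction failed: {result.stderr.decode(errors='replace')}")
    return result.returncode == 0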
def crawl_with_selenium(url):
    driver = webdriver.Firefox(options=configure_stealth_options())
    try:
        # Inject anti-detection script
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            window.alert = () => {};
        """)
        # Load the first page
        driver.get(url)
        while True:
            # Process the current page
            result = find_target_links(driver)
            if result == 'stop':
                break

            # Wait for the pagination block to load
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "gg_page"))
            )

            # Locate the "next page" link
            xpath = '//div[@class="easysite-page-wrap"]//a[@title="下一页"]'
            next_page_btn = WebDriverWait(driver, 15).until(
                EC.element_to_be_clickable((By.XPATH, xpath))
            )

            # Extract the next page URL from the onclick attribute
            next_page_url = next_page_btn.get_attribute("onclick")
            if not next_page_url:
                print("Reached the last page, stopping the crawl")
                break
            next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
            if not next_page_url.startswith(('http://', 'https://')):
                base_url = 'http://nanjing.customs.gov.cn'  # domain of the site being crawled
                next_page_url = base_url + next_page_url

            # Visit the next page
            driver.get(next_page_url)
            print(f"Crawling page {next_page_url}")
    finally:
        driver.quit()
def wait_for_download_complete(timeout=30, existing_files=None):
    start_time = time.time()
    temp_exts = ('.part', '.crdownload')
    if existing_files is None:
        existing_files = set(f.name for f in Path(download_dir).glob('*'))
    while (time.time() - start_time) < timeout:
        # New .rar files that were not present before the click
        new_files = [f for f in Path(download_dir).glob('*.rar') if f.name not in existing_files]
        # Ignore downloads that still have an in-progress companion file (.part/.crdownload)
        new_files = [
            f for f in new_files
            if not any(f.with_name(f.name + ext).exists() for ext in temp_exts)
        ]
        if new_files:
            # Wait until file sizes stop changing to make sure the download has finished
            stable = True
            for file in new_files:
                prev_size = file.stat().st_size
                time.sleep(1)
                curr_size = file.stat().st_size
                if curr_size != prev_size:
                    stable = False
                    break
            if stable:
                return new_files
        time.sleep(2)
    raise TimeoutError("No .rar file found before the timeout")
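
# Usage sketch (mirrors find_target_links above): snapshot the folder before the click,
# then wait for a new, fully written .rar to appear.
#
#     before = set(f.name for f in Path(download_dir).glob('*'))
#     download_btn.click()
#     new_rars = wait_for_download_complete(timeout=60, existing_files=before)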
def hierarchical_traversal(root_path, all_records):
    """Layered traversal: province -> year -> month directories."""
    root = Path(root_path)
    # Collect all year directories
    year_dirs = [
        item for item in root.iterdir()
        if item.is_dir() and base_country_code.YEAR_PATTERN.match(item.name)
    ]
    # Years in descending order
    for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
        # Full paths look like download/jiangsu/2025/03
        print(f"\nYear: {year_dir.name} | Province: jiangsu")
        # Collect the month directories
        month_dirs = []
        for item in year_dir.iterdir():
            if item.is_dir() and base_country_code.MONTH_PATTERN.match(item.name):
                month_dirs.append({
                    "path": item,
                    "month": int(item.name)
                })
        # Months in descending order
        if month_dirs:
            for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
                print(f"  Month: {md['month']:02d} | Path: {md['path']}")
                gov_commodity_jiangsu_import_export.process_folder(md['path'], all_records)
                gov_commodity_jiangsu_country.process_folder(md['path'])
                gov_commodity_jiangsu_city.process_folder(md['path'])
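
# Expected on-disk layout (illustrative): find_target_links() files each workbook under
# <download_dir>/<year>/<month>/, which is exactly what hierarchical_traversal() walks.
#
#     <download_dir>/
#         2024/
#             12/
#                 <workbook>.xls    (file names are illustrative)
#             11/
#         2023/
#             ...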
if __name__ == "__main__":
    crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html')
    print("Jiangsu (Nanjing customs) full data download finished")
    # Pause 5s before processing
    time.sleep(5)
    all_records = base_mysql.get_hs_all()
    hierarchical_traversal(base_country_code.download_dir, all_records)
    print("Jiangsu (Nanjing customs) chapter, country and city files all processed!")
    time.sleep(5)
    base_mysql.update_january_yoy('江苏省')
    base_mysql.update_shandong_yoy('江苏省')
    print("Jiangsu (Nanjing customs) city year-over-year SQL update finished")
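
# For reference (assumptions, not part of this file): based solely on the attributes used
# above, utils/base_country_code.py is expected to expose roughly:
#
#     download_dir   # root folder for downloaded files
#     YEAR_PATTERN   # e.g. re.compile(r"^\d{4}$")
#     MONTH_PATTERN  # e.g. re.compile(r"^(0?[1-9]|1[0-2])$")
#
# and utils/base_mysql.py to provide get_hs_all(), update_january_yoy(province) and
# update_shandong_yoy(province); their actual implementations are not shown here.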