# crawl_gov_anhui_full.py

import os
import random
import re
import time
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import urljoin

from faker import Faker
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver import FirefoxOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from anhui import gov_commodity_anhui_city, download_dir
from anhui import gov_commodity_anhui_country
from anhui import gov_commodity_anhui_import_export
from utils import base_country_code, base_mysql
from utils.log import log
  18. def configure_stealth_options():
  19. """增强型反检测配置[1,4](@ref)"""
  20. opts = FirefoxOptions()
  21. print("当前下载路径:", Path(download_dir).resolve())
  22. # 文件下载配置
  23. opts.set_preference("browser.download.dir", download_dir)
  24. opts.set_preference("browser.download.folderList", 2)
  25. opts.set_preference("browser.download.manager.showWhenStarting", False)
  26. opts.set_preference("browser.helperApps.neverAsk.saveToDisk",
  27. "application/octet-stream, application/vnd.ms-excel") # 覆盖常见文件类型
  28. opts.set_preference("browser.download.manager.useWindow", False) # 禁用下载管理器窗口
  29. opts.set_preference("browser.download.manager.showAlertOnComplete", False) # 关闭完成提示
  30. # 反检测参数
  31. opts.set_preference("dom.webdriver.enabled", False)
  32. opts.set_preference("useAutomationExtension", False)
  33. opts.add_argument("--disable-blink-features=AutomationControlled")
  34. # 动态指纹
  35. fake = Faker()
  36. opts.set_preference("general.useragent.override", fake.firefox())
  37. opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")
  38. # 视口配置
  39. opts.add_argument("--width=1440")
  40. opts.add_argument("--height=900")
  41. opts.add_argument("--headless")
  42. return opts
  43. def find_target_links(driver, year_month):
  44. """点击列表页链接进入详情页下载文件"""
  45. WebDriverWait(driver, 30).until(
  46. EC.presence_of_element_located((By.ID, "conRight"))
  47. )
  48. try:
  49. # 获取列表页所有 <a> 标签
  50. elements = driver.find_elements(By.XPATH, '//ul[@class="conList_ul"]//a')
  51. if not elements:
  52. return None
  53. processed_urls = set()
  54. for link in elements:
  55. link_url = link.get_attribute("href")
  56. if link_url in processed_urls:
  57. continue
  58. # 新标签页打开链接
  59. driver.execute_script("window.open(arguments[0]);", link_url)
  60. driver.switch_to.window(driver.window_handles[-1])
  61. log.info(f"正在处理详情页: {link_url}")
  62. try:
  63. # 在详情页下载文件
  64. download_result = download_file_from_detail_page(driver, year_month)
  65. if download_result == 'stop':
  66. return 'stop'
  67. processed_urls.add(link_url)
  68. finally:
  69. # 关闭当前详情页并切回主窗口
  70. driver.close()
  71. driver.switch_to.window(driver.window_handles[0])
  72. time.sleep(random.uniform(1, 3))
  73. return None
  74. except Exception as e:
  75. log.info(f"下载时发生异常: {str(e)}")
  76. def download_file_from_detail_page(driver, year_month):
  77. WebDriverWait(driver, 30).until(
  78. EC.presence_of_element_located((By.ID, "easysiteText"))
  79. )
  80. try:
  81. elements = driver.find_elements(By.XPATH, '//div[@id="easysiteText"]//a')
  82. if not elements:
  83. log.info("详情页未找到目标文件链接")
  84. return None
  85. for download_btn in elements:
  86. file_name = download_btn.text.strip()
  87. if not file_name:
  88. continue
  89. if year_month is None:
  90. if file_name.startswith('2022'):
  91. return 'stop'
  92. else:
  93. if not file_name.startswith(year_month):
  94. log.info(f"非 {year_month} 文件: {file_name}, stop")
  95. return 'stop'
  96. if '美元' in file_name or '商品贸易方式' in file_name or '进出口总值' in file_name or '月度表' in file_name:
  97. log.info(f'{file_name} 不需要此文件,跳过')
  98. continue
  99. file_url = download_btn.get_attribute("href")
  100. if not file_url.startswith(('http://', 'https://')):
  101. base_url = driver.current_url.split('//')[0] + '//' + driver.current_url.split('/')[2]
  102. file_url = base_url + file_url
  103. if not file_url.lower().endswith(('.xls', '.xlsx')):
  104. log.info(f"跳过非 Excel 文件: {file_url}")
  105. continue
  106. log.info(f"正在下载: {file_name} → {file_url}")
  107. # 记录下载前的文件列表
  108. existing_files = set(f.name for f in Path(download_dir).glob('*'))
  109. # 随机点击延迟
  110. time.sleep(random.uniform(1, 3))
  111. download_btn.click()
  112. downloaded_file = wait_for_download_complete(existing_files=existing_files)
  113. year, start_month, month = extract_year_and_month(file_name)
  114. final_path = Path(download_dir) / year / month / f"{file_name}"
  115. if os.path.exists(final_path):
  116. log.info(f"文件已存在:{file_name} 正在覆盖...")
  117. os.unlink(final_path)
  118. final_dir = Path(download_dir) / year / month
  119. final_dir.mkdir(parents=True, exist_ok=True)
  120. log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
  121. downloaded_file.rename(final_path)
  122. log.info(f"√ 下载成功:{final_path} \n")
  123. return None
  124. except Exception as e:
  125. log.info(f"详情页处理异常: {str(e)}")
  126. return None
  127. def extract_year_and_month(file_name):
  128. # 支持两种格式:
  129. # - 2025年1-2月xxx
  130. # - 2025年3月xxx
  131. match = re.search(r"(\d{4})年(\d{1,2})(?:-(\d{1,2}))?月", file_name)
  132. if match:
  133. year = match.group(1)
  134. start_month = match.group(2)
  135. end_month = match.group(3) if match.group(3) else start_month
  136. return year, start_month.zfill(2), end_month.zfill(2)
  137. else:
  138. raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
  139. def detect_latest_month(driver, url):
  140. driver.get(url)
  141. current_date = datetime.now()
  142. for offset in range(0, 3):
  143. check_date = current_date - timedelta(days=offset * 30)
  144. check_year = check_date.year
  145. check_month = check_date.month
  146. target_title = f"{check_year}年{check_month}月"
  147. try:
  148. WebDriverWait(driver, 10).until(
  149. EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
  150. )
  151. log.info(f"已找到最新月份数据 {check_year}-{check_month}")
  152. # 看是否已存表,已存则跳过;
  153. count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', "340000")
  154. if count > 0:
  155. log.info(f"已存在 {check_year}-{check_month} 数据,跳过")
  156. continue
  157. return f"{check_year}年{check_month}月"
  158. except:
  159. log.info(f"未找到 {target_title}")
  160. continue
  161. log.info("三个月内未找到有效数据")
  162. return None
  163. def crawl_with_selenium(url, mark):
  164. driver = webdriver.Firefox(options=configure_stealth_options())
  165. year_month = None
  166. if 'increment' == mark:
  167. res = detect_latest_month(driver, url)
  168. if res is None:
  169. log.info("安徽省海关没有最新数据更新")
  170. # sys.exit(0)
  171. return None
  172. year_month = res
  173. print(f"检测到最新有效数据:{year_month}")
  174. try:
  175. # 注入反检测脚本
  176. driver.execute_script("""
  177. Object.defineProperty(navigator, 'webdriver', {
  178. get: () => undefined
  179. });
  180. window.alert = () => {};
  181. """)
  182. # 页面加载策略
  183. driver.get(url)
  184. while True:
  185. # 访问当前页
  186. result = find_target_links(driver, year_month)
  187. if result == 'stop':
  188. break
  189. # 等待页面加载完成
  190. WebDriverWait(driver, 30).until(
  191. EC.presence_of_element_located((By.CLASS_NAME, "gg_page"))
  192. )
  193. # 模拟点击下一页
  194. xpath = f'//div[@class="easysite-page-wrap"]//a[@title="下一页"]'
  195. next_page_btn = WebDriverWait(driver, 15).until(
  196. EC.element_to_be_clickable((By.XPATH, xpath))
  197. )
  198. # 获取下一页的URL
  199. next_page_url = next_page_btn.get_attribute("onclick")
  200. if not next_page_url:
  201. log.info("已到达最后一页,停止爬取")
  202. break
  203. # 从onclick属性中提取URL
  204. next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
  205. if not next_page_url.startswith(('http://', 'https://')):
  206. base_url = 'http://shijiazhuang.customs.gov.cn' # 替换为实际的域名
  207. next_page_url = base_url + next_page_url
  208. # 访问下一页
  209. driver.get(next_page_url)
  210. log.info(f"开始爬取 {next_page_url} 页面数据")
  211. finally:
  212. driver.quit()
  213. print(f"安徽合肥海关全量数据下载任务完成")
  214. # 等待5s后执行
  215. time.sleep(5)
  216. hierarchical_traversal(download_dir)
  217. print("安徽合肥海关类章、国家、城市所有文件处理完成!")
  218. time.sleep(5)
  219. base_mysql.update_january_yoy('安徽省')
  220. base_mysql.update_shandong_yoy('安徽省')
  221. print("安徽合肥海关城市同比sql处理完成")
  222. def wait_for_download_complete(timeout=30, existing_files=None):
  223. """
  224. 监控下载目录,等待文件下载完成并返回新下载的文件。
  225. :param timeout: 超时时间(秒)
  226. :param existing_files: 下载前已存在的文件列表
  227. :return: 新下载的文件路径
  228. """
  229. start_time = time.time()
  230. temp_exts = ('.part', '.crdownload')
  231. if existing_files is None:
  232. existing_files = set(f.name for f in Path(download_dir).glob('*'))
  233. while (time.time() - start_time) < timeout:
  234. # 获取有效文件列表
  235. valid_files = []
  236. for f in Path(download_dir).glob('*'):
  237. if (f.name not in existing_files and
  238. not f.name.endswith(temp_exts) and
  239. f.stat().st_size > 0):
  240. valid_files.append(f)
  241. # 等待最新文件稳定
  242. if valid_files:
  243. return max(valid_files, key=lambda x: x.stat().st_mtime)
  244. time.sleep(2)
  245. raise TimeoutError("文件下载超时")
  246. def hierarchical_traversal(root_path):
  247. """分层遍历:省份->年份->月目录"""
  248. root = Path(root_path)
  249. # 获取所有年份目录
  250. year_dirs = [
  251. item for item in root.iterdir()
  252. if item.is_dir() and base_country_code.YEAR_PATTERN.match(item.name)
  253. ]
  254. # 按年倒序
  255. for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
  256. # 构造完整的路径:download/shandong/2025/03
  257. print(f"\n年份:{year_dir.name} | 省份:anhui")
  258. # 提取月份目录
  259. month_dirs = []
  260. for item in year_dir.iterdir():
  261. if item.is_dir() and base_country_code.MONTH_PATTERN.match(item.name):
  262. month_dirs.append({
  263. "path": item,
  264. "month": int(item.name)
  265. })
  266. # 按月倒序输出
  267. if month_dirs:
  268. for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
  269. print(f" 月份:{md['month']:02d} | 路径:{md['path']}")
  270. gov_commodity_anhui_import_export.process_folder(md['path'])
  271. gov_commodity_anhui_country.process_folder(md['path'])
  272. gov_commodity_anhui_city.process_folder(md['path'])
  273. def main():
  274. crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html', 'all')
  275. # crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html', 'increment')
  276. # print(f"安徽合肥海关全量数据下载任务完成")
  277. # # 等待5s后执行
  278. # time.sleep(5)
  279. # hierarchical_traversal(base_country_code.download_dir)
  280. # print("安徽合肥海关类章、国家、城市所有文件处理完成!")
  281. # time.sleep(5)
  282. # base_mysql.update_january_yoy('安徽省')
  283. # base_mysql.update_shandong_yoy('安徽省')
  284. # print("安徽合肥海关城市同比sql处理完成")
# Run the crawl only when this file is executed as a script.
if __name__ == '__main__':
    main()