| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 | 
							- import argparse
 
- import os
 
- import random
 
- import re
 
- import time
 
- from datetime import datetime, timedelta
 
- from pathlib import Path
 
- from selenium.webdriver.firefox.service import Service
 
- from webdriver_manager.firefox import GeckoDriverManager
 
- from faker import Faker
 
- from selenium import webdriver
 
- from selenium.webdriver import FirefoxOptions
 
- from selenium.webdriver.common.by import By
 
- from selenium.webdriver.support import expected_conditions as EC
 
- from selenium.webdriver.support.ui import WebDriverWait
 
- from crossborder.anhui import gov_commodity_anhui_city, download_dir
 
- from crossborder.anhui import gov_commodity_anhui_country
 
- from crossborder.anhui import gov_commodity_anhui_import_export
 
- from crossborder.utils import base_country_code, base_mysql
 
- from crossborder.utils.base_country_code import extract_year_month
 
- from crossborder.utils.dingtalk import send_dingtalk_message
 
- from crossborder.utils.log import  get_logger
 
- import urllib.request
 
- import urllib.error
 
- log = get_logger(__name__)
 
def configure_stealth_options():
    """Build the Firefox options used by the crawler.

    Configures silent downloads into ``download_dir``, a few anti-detection
    preferences, a randomised user agent and a fixed headless viewport.

    Returns:
        FirefoxOptions: the populated options object, ready to be passed to
        ``webdriver.Firefox``.
    """
    opts = FirefoxOptions()

    # Resolve once and hand Firefox the absolute path. Firefox is understood to
    # expect an absolute directory in ``browser.download.dir`` (see the
    # referenced docs); the original passed ``download_dir`` through as-is.
    download_path = str(Path(download_dir).resolve())
    print("当前下载路径:", download_path)

    # File download configuration: save straight to disk, no dialogs.
    opts.set_preference("browser.download.dir", download_path)
    opts.set_preference("browser.download.folderList", 2)
    opts.set_preference("browser.download.manager.showWhenStarting", False)
    opts.set_preference(
        "browser.helperApps.neverAsk.saveToDisk",
        "application/octet-stream, application/vnd.ms-excel",  # common file types
    )
    opts.set_preference("browser.download.manager.useWindow", False)  # no download manager window
    opts.set_preference("browser.download.manager.showAlertOnComplete", False)  # no completion alert

    # Anti-detection settings.
    opts.set_preference("dom.webdriver.enabled", False)
    opts.set_preference("useAutomationExtension", False)
    # NOTE(review): this looks like a Chromium/Blink switch; presumably it is
    # ignored by Firefox - confirm before relying on it.
    opts.add_argument("--disable-blink-features=AutomationControlled")

    # Dynamic fingerprint: random Firefox user agent, Chinese accept-language.
    fake = Faker()
    opts.set_preference("general.useragent.override", fake.firefox())
    opts.set_preference("intl.accept_languages", "zh-CN,zh;q=0.9")

    # Viewport configuration.
    opts.add_argument("--width=1440")
    opts.add_argument("--height=900")
    opts.add_argument("--headless")
    return opts
 
def find_target_links(driver, year_month):
    """Visit every detail page linked from the current list page and download its files.

    Each link is opened in a new browser tab, handed to
    ``download_file_from_detail_page`` and the tab is closed again before the
    next link is processed.

    Args:
        driver: Selenium WebDriver currently showing a list page.
        year_month: value forwarded unchanged to
            ``download_file_from_detail_page`` (``None`` or a file-name prefix).

    Returns:
        ``'stop'`` as soon as a detail page reports ``'stop'``; otherwise
        ``None`` (also when no links are found or an exception was logged).
    """
    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.ID, "conRight"))
    )
    try:
        # Collect all <a> tags of the list page.
        anchors = driver.find_elements(By.XPATH, '//ul[@class="conList_ul"]//a')
        if not anchors:
            return None

        # Read the hrefs up front: the elements are not touched again after
        # tabs are opened/closed, and links without an href or duplicated
        # links are dropped here instead of being passed to window.open().
        link_urls = []
        seen = set()
        for anchor in anchors:
            href = anchor.get_attribute("href")
            if not href or href in seen:
                continue
            seen.add(href)
            link_urls.append(href)

        list_window = driver.current_window_handle
        for link_url in link_urls:
            # Open the link in a new tab and switch to it.
            driver.execute_script("window.open(arguments[0]);", link_url)
            driver.switch_to.window(driver.window_handles[-1])
            log.info(f"正在处理详情页: {link_url}")
            try:
                # Download the files offered on the detail page.
                download_result = download_file_from_detail_page(driver, year_month)
                if download_result == 'stop':
                    return 'stop'
            finally:
                # Always close the detail tab and return to the list page.
                driver.close()
                driver.switch_to.window(list_window)
            time.sleep(random.uniform(1, 3))
        return None
    except Exception as e:
        # Best effort: a failure on this list page is logged and reported as
        # "nothing to stop for" so the caller can continue.
        log.error(f"下载时发生异常: {str(e)}")
        return None
 
def download_file_from_detail_page(driver, year_month):
    """Download the wanted Excel attachments from the detail page currently shown.

    For every link inside ``#easysiteText`` the link text is used as the file
    name. Files are clicked, awaited via ``wait_for_download_complete`` and
    moved to ``download_dir/<year>/<month>/<file name>``.

    Args:
        driver: Selenium WebDriver showing a detail page.
        year_month: when ``None``, processing stops at the first file whose
            name starts with ``'2022'``; otherwise processing stops at the
            first file whose name does not start with this prefix.

    Returns:
        ``'stop'`` when one of the stop conditions above is hit, otherwise
        ``None`` (also when an exception was logged).
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urljoin

    WebDriverWait(driver, 30).until(
        EC.presence_of_element_located((By.ID, "easysiteText"))
    )
    # File names containing any of these markers are not needed.
    skip_keywords = ('美元', '商品贸易方式', '进出口总值', '月度表')
    try:
        elements = driver.find_elements(By.XPATH, '//div[@id="easysiteText"]//a')
        if not elements:
            log.info("详情页未找到目标文件链接")
            return None
        for download_btn in elements:
            file_name = download_btn.text.strip()
            if not file_name:
                continue
            if year_month is None:
                if file_name.startswith('2022'):
                    return 'stop'
            elif not file_name.startswith(year_month):
                log.info(f"非 {year_month} 文件: {file_name}, stop")
                return 'stop'
            if any(keyword in file_name for keyword in skip_keywords):
                log.info(f'{file_name} 不需要此文件,跳过')
                continue

            file_url = download_btn.get_attribute("href")
            if not file_url:
                log.info(f"链接缺少 href,跳过: {file_name}")
                continue
            if not file_url.startswith(('http://', 'https://')):
                # Resolve relative links against the page URL.
                file_url = urljoin(driver.current_url, file_url)
            if not file_url.lower().endswith(('.xls', '.xlsx')):
                log.info(f"跳过非 Excel 文件: {file_url}")
                continue

            # Work out the destination BEFORE downloading: a name without a
            # recognisable year/month is skipped instead of leaving a stray
            # file behind and aborting the remaining links.
            try:
                year, _start_month, month = extract_year_and_month(file_name)
            except ValueError as e:
                log.info(f"详情页处理异常: {str(e)}")
                continue
            final_dir = Path(download_dir) / year / month
            final_path = final_dir / f"{file_name}"

            log.info(f"正在下载: {file_name} → {file_url}")
            # Snapshot of the download directory before the click.
            existing_files = {f.name for f in Path(download_dir).glob('*')}
            # Random delay before clicking.
            time.sleep(random.uniform(1, 3))
            download_btn.click()
            downloaded_file = wait_for_download_complete(existing_files=existing_files)

            final_dir.mkdir(parents=True, exist_ok=True)
            if final_path.exists():
                log.info(f"文件已存在:{file_name} 正在覆盖...")
            log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
            # Path.replace overwrites an existing destination.
            downloaded_file.replace(final_path)
            log.info(f"√ 下载成功:{final_path} \n")
        return None
    except Exception as e:
        # Best effort: log and let the caller move on to the next page.
        log.error(f"详情页处理异常: {str(e)}")
        return None
 
def extract_year_and_month(file_name):
    """Extract the year and the month (or month range) from a file name.

    Supported formats:
      - ``2025年1-2月xxx`` (month range)
      - ``2025年3月xxx``   (single month)

    Besides the ASCII hyphen, the range may be written with a full-width
    hyphen, an en/em dash, a tilde or ``至``.

    Args:
        file_name: text containing a ``<year>年<month>月`` marker.

    Returns:
        tuple[str, str, str]: ``(year, start_month, end_month)`` where both
        months are zero-padded to two digits; for a single month the start
        and end month are identical.

    Raises:
        ValueError: if no year/month marker is found in ``file_name``.
    """
    match = re.search(
        r"(\d{4})年(\d{1,2})(?:[-－–—~～至](\d{1,2}))?月",
        file_name,
    )
    if match is None:
        raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
    year, start_month, end_month = match.groups()
    if not end_month:
        # Single-month name: the range collapses to that month.
        end_month = start_month
    return year, start_month.zfill(2), end_month.zfill(2)
 
def detect_latest_month(driver, url):
    """Find the newest month that is published on the list page but not stored yet.

    The current calendar month and the two months before it are checked, newest
    first. A month qualifies when a link whose ``title`` contains
    ``"<year>年<month>月"`` is present on the page and
    ``base_mysql.get_code_exist`` reports no existing rows for it.

    Args:
        driver: Selenium WebDriver used to load and inspect the page.
        url: list page URL to load.

    Returns:
        The matching title fragment, e.g. ``"2025年3月"``, or ``None`` when
        none of the three months qualifies.
    """
    driver.get(url)
    today = datetime.now()
    # Step back by whole calendar months. Subtracting multiples of 30 days can
    # repeat or skip a month (e.g. 31 March - 30 days is still March, and
    # 31 March - 60 days is already January).
    newest_index = today.year * 12 + (today.month - 1)
    for offset in range(3):
        check_year, month_zero_based = divmod(newest_index - offset, 12)
        check_month = month_zero_based + 1
        target_title = f"{check_year}年{check_month}月"
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
            )
            log.info(f"已找到最新月份数据 {check_year}-{check_month}")
            # Skip months that are already stored.
            # NOTE(review): "340000" is presumably the region code for Anhui - confirm.
            count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', "340000")
            if count > 0:
                log.info(f"已存在 {check_year}-{check_month} 数据,跳过")
                continue
            return target_title
        except Exception:
            # Best effort (typically the wait timing out): try the previous
            # month. Unlike a bare ``except`` this lets KeyboardInterrupt and
            # SystemExit propagate.
            log.error(f"未找到 {target_title}")
            continue
    log.error("三个月内未找到有效数据")
    return None
 
def check_internet_connection(url="http://www.baidu.com", timeout=5):
    """Check connectivity by opening ``url``.

    Args:
        url: URL to open.
        timeout: timeout in seconds passed to ``urllib.request.urlopen``.

    Returns:
        bool: ``True`` if the URL could be opened, ``False`` if opening it
        failed with an ``OSError`` (``urllib.error.URLError`` and socket
        timeouts are both ``OSError`` subclasses).
    """
    try:
        # The context manager closes the response instead of leaking it.
        with urllib.request.urlopen(url, timeout=timeout):
            return True
    except OSError:
        return False
 
def crawl_with_selenium(url, mark):
    """Crawl the customs list pages, download the files and post-process them.

    Args:
        url: URL of the first list page.
        mark: ``'auto'`` detects the newest month via ``detect_latest_month``
            and restricts the download to it; any other value downloads
            without a month restriction.

    Returns:
        ``('finish', year_month)`` after the crawl loop ended normally, where
        ``year_month`` is the detected month or ``None``; ``None`` when
        ``mark`` is ``'auto'`` and no new month was detected.

    Raises:
        Exception: when the connectivity check fails. Exceptions raised while
            crawling propagate after the browser was closed and the
            post-processing ran.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urljoin

    if not check_internet_connection():
        log.error("无法连接到互联网,请检查网络设置")
        raise Exception("网络连接失败")

    driver = None
    year_month = None
    # Cleared when auto mode finds nothing new, so the cleanup below does not
    # reprocess every directory on disk (year_month=None means "all").
    run_post_processing = True
    try:
        # Let webdriver_manager provide geckodriver.
        service = Service(GeckoDriverManager().install())
        driver = webdriver.Firefox(service=service, options=configure_stealth_options())
        log.info("Firefox WebDriver初始化成功")

        if 'auto' == mark:
            res = detect_latest_month(driver, url)
            if res is None:
                log.info("安徽省海关没有最新数据更新")
                run_post_processing = False
                return None
            year_month = res
            print(f"检测到最新有效数据:{year_month}")

        # Inject the anti-detection script.
        # NOTE(review): this runs before driver.get(url); presumably the
        # override does not survive the navigation - confirm.
        driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', { 
                get: () => undefined 
            });
            window.alert = () => {};
        """)

        driver.get(url)
        while True:
            # Process the current list page.
            result = find_target_links(driver, year_month)
            if result == 'stop':
                break

            # Wait for the pager to be present.
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "gg_page"))
            )
            # Locate the "next page" button.
            xpath = '//div[@class="easysite-page-wrap"]//a[@title="下一页"]'
            next_page_btn = WebDriverWait(driver, 15).until(
                EC.element_to_be_clickable((By.XPATH, xpath))
            )

            # The target URL is the first quoted string of the onclick handler.
            onclick = next_page_btn.get_attribute("onclick")
            target = re.search(r"'(.*?)'", onclick) if onclick else None
            next_page_url = target.group(1) if target else None
            if not next_page_url:
                log.info("已到达最后一页,停止采集")
                break
            if not next_page_url.startswith(('http://', 'https://')):
                # Resolve against the page being crawled rather than a
                # hard-coded host belonging to a different customs site.
                next_page_url = urljoin(driver.current_url, next_page_url)

            # Load the next page.
            driver.get(next_page_url)
            log.info(f"开始采集 {next_page_url} 页面数据")
    finally:
        if driver:
            driver.quit()
        if run_post_processing:
            # Runs on normal completion and when the crawl loop raised, so
            # files downloaded before a failure are still processed.
            print("安徽省合肥海关全量数据下载任务完成")
            # Short pause before processing.
            time.sleep(3)
            hierarchical_traversal(download_dir, year_month)
            print("安徽省海关类章、国家、城市所有文件处理完成!")
            time.sleep(3)
            base_mysql.update_shandong_yoy('安徽省')
            print("安徽省合肥海关城市同比sql处理完成")
    return 'finish', year_month
 
def wait_for_download_complete(timeout=30, existing_files=None):
    """Poll the download directory until a new, finished file appears.

    A file counts as finished when its name is not in ``existing_files``, it
    does not end in a temporary-download extension (``.part``,
    ``.crdownload``), it is a regular file and it is not empty.

    Args:
        timeout: maximum time to wait, in seconds.
        existing_files: names of the files present before the download was
            started; when ``None`` the directory content at call time is used.

    Returns:
        pathlib.Path: the new file; if several qualify, the one with the
        newest modification time.

    Raises:
        TimeoutError: if no finished file shows up within ``timeout`` seconds.
    """
    start_time = time.time()
    temp_exts = ('.part', '.crdownload')
    download_path = Path(download_dir)
    if existing_files is None:
        existing_files = {f.name for f in download_path.glob('*')}

    while (time.time() - start_time) < timeout:
        candidates = []
        for f in download_path.glob('*'):
            if f.name in existing_files or f.name.endswith(temp_exts):
                continue
            try:
                if not f.is_file():
                    # Directories are never a downloaded file.
                    continue
                info = f.stat()
            except OSError:
                # The browser may rename/remove its temporary files between
                # the directory listing and the stat call; re-check next round.
                continue
            if info.st_size > 0:
                candidates.append((info.st_mtime, f))
        if candidates:
            return max(candidates, key=lambda item: item[0])[1]
        time.sleep(2)
    raise TimeoutError("文件下载超时")
 
def hierarchical_traversal(root_path, year_month):
    """Walk ``root_path/<year>/<month>`` directories and process each month folder.

    Year and month directories are recognised with
    ``base_country_code.YEAR_PATTERN`` / ``MONTH_PATTERN`` and visited newest
    first. Every visited month folder is passed to the import/export, country
    and city processors.

    Args:
        root_path: directory containing the year directories.
        year_month: ``None`` to process every month folder, otherwise a value
            understood by ``extract_year_month``; only the folder matching it
            is processed.

    Returns:
        None.
    """
    root = Path(root_path)
    if not root.is_dir():
        # Nothing has been downloaded yet; avoid raising from iterdir().
        log.error(f"目录不存在: {root}")
        return

    target = None
    if year_month is not None:
        # Parsed once; assumes extract_year_month returns (year, month) values
        # comparable to the directory names - TODO confirm.
        target = extract_year_month(year_month)

    # Collect all year directories.
    year_dirs = [
        item for item in root.iterdir()
        if item.is_dir() and base_country_code.YEAR_PATTERN.match(item.name)
    ]

    # Years in descending order.
    for year_dir in sorted(year_dirs, key=lambda d: d.name, reverse=True):
        if target is not None and year_dir.name != target[0]:
            continue
        print(f"\n年份:{year_dir.name} | 省份:anhui")

        # Collect the month directories of this year.
        month_dirs = [
            item for item in year_dir.iterdir()
            if item.is_dir() and base_country_code.MONTH_PATTERN.match(item.name)
        ]

        # Months in descending order.
        for month_dir in sorted(month_dirs, key=lambda d: int(d.name), reverse=True):
            if target is not None and month_dir.name != target[1]:
                # Keep looking instead of returning at the first mismatch, so
                # the target month is still processed when newer folders exist.
                continue
            print(f"  月份:{int(month_dir.name):02d} | 路径:{month_dir}")
            gov_commodity_anhui_import_export.process_folder(month_dir)
            gov_commodity_anhui_country.process_folder(month_dir)
            gov_commodity_anhui_city.process_folder(month_dir)
            if target is not None:
                log.info(f"安徽省海关已处理 {year_month} 数据,返回")
                return
 
def main():
    """Command-line entry point of the Anhui customs crawler.

    ``--year 2023`` runs a full crawl (mark ``'all'``); any other value,
    including the default, runs an incremental crawl (mark ``'auto'``). A
    DingTalk message with the elapsed time is sent after a finished crawl.
    Any exception is logged and swallowed.
    """
    list_url = 'http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html'
    try:
        parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
        parser.add_argument('--year', type=int, default=None, help='终止年份(如2023),未指定时抓取最新两个月')
        args = parser.parse_args()
        started = time.time()

        if args.year != 2023:
            # Incremental run: only report when the crawl finished.
            log.info("正在增量采集安徽省海关数据")
            outcome = crawl_with_selenium(list_url, 'auto')
            if outcome is None:
                return
            status, crawled_month = outcome
            if status != 'finish':
                return
            minutes, seconds = divmod(time.time() - started, 60)
            send_dingtalk_message(f'【安徽省海关】 {crawled_month} 增量数据采集完成,{int(minutes)}分{seconds:.1f}秒')
            return

        # Full run.
        log.info("正在全量采集安徽省海关数据")
        crawl_with_selenium(list_url, 'all')
        minutes, seconds = divmod(time.time() - started, 60)
        send_dingtalk_message(f'【安徽省海关】全量数据采集完成,耗时 {int(minutes)}分{seconds:.1f}秒')
    except Exception as e:
        # Top-level boundary: log instead of crashing the script.
        log.error(f'【安徽省海关】发生错误:{e}')


if __name__ == '__main__':
    main()
 
 
  |