"""Selenium crawler for the Guangdong provincial customs offices.

For each customs office it detects the latest published month, pages backwards
through the article list, downloads or parses the monthly statistics tables,
and finally cleans the Excel files and loads them into the database.
"""
import argparse
import random
import re
import time
from datetime import datetime, timedelta

from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from db_helper import DBHelper
from guangdong.guangdong_gongbei_parse_excel import parse_region_table, calculate_monthly_data
from guangdong.guangdong_sub_customs_parse_excel import parse_excel
from utils.constants import DOWNLOAD_DIR, GUANGDONG_CUSTOMS_URL
from utils.download_utils import (
    configure_stealth_options,
    generate_month_sequence,
    download_excel,
    download_excel2,
    batch_download_excel,
)
from utils.log import log
from utils.parse_utils import traverse_and_process

download_dir = DOWNLOAD_DIR / "guangdong"


def generate_target_title(check_year, check_month, customs_name):
    """Build the regex pattern that matches the article title for the given customs office and month."""
    if customs_name == "广州海关":
        return rf'{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月广州关区所辖7地市进出口综合统计资料'
    elif customs_name == "深圳海关":
        return rf"{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月(深圳海关|深圳关区)综合统计资料"
    elif customs_name == "拱北海关":
        return rf"\S+市{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月对外贸易进出口统计表"
    elif customs_name == "汕头海关":
        return rf"5市报表{check_year}年(?:1[--]\s*{check_month}月|{check_month}月)(人民币)"
    elif customs_name == "黄埔海关":
        return rf"{check_year}年\s*(?:1[--]\s*)?{check_month}月东莞市进出口企业性质总值表"
    elif customs_name == "江门海关":
        if check_month == 3:
            target_title = rf"{check_year}年\s*(?:一季度|前{check_month}个月|\s*{check_month}月)[\u4e00-\u9fa5]+市外贸进出口有关情况统计表(以人民币计价)"
        elif check_month == 12:
            target_title = rf"{check_year}年(?:\s*{check_month}月)?\s*[\u4e00-\u9fa5]+市外贸进出口有关情况统计表(以人民币计价)"
        else:
            target_title = rf"{check_year}年\s*前?{check_month}个?月.*外贸进出口有关情况统计表(以人民币计价)"
        return target_title
    elif customs_name == "湛江海关":
        if check_month == 3:
            target_title = rf"{check_year}年\s*(?:一季度|前3个月|3月).*外贸进出口数据"
        elif check_month == 9:
            target_title = rf"{check_year}年\s*(?:前三季度|前9个月|9月).*外贸进出口数据"
        elif check_month == 12:
            target_title = rf'^{check_year}年(?:及{check_month}月份)?湛江市、茂名市(?:外贸)?进出口数据'
        else:
            target_title = rf"{check_year}年\s*前?{check_month}个?月.*(外贸)?进出口数据"
        return target_title
    else:
        return rf"{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月{customs_name}进出口综合统计资料"


def detect_latest_month(driver, customs_name):
    """Detect the most recent month with published data, looking back up to three months."""
    current_date = datetime.now()
    for offset in range(0, 3):
        check_date = current_date - timedelta(days=offset * 30)
        check_year = check_date.year
        check_month = check_date.month
        # Build the title pattern for this customs office and candidate month
        target_title = generate_target_title(check_year, check_month, customs_name)
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
            )
            # Collect every <a> element that carries a title attribute
            links = driver.find_elements(By.XPATH, '//a[@title]')
            # Match the titles against the pattern with a Python regex
            for link in links:
                title = link.get_attribute('title')
                if re.search(target_title, title, re.IGNORECASE):
                    log.info(f"【{customs_name}】最新月份数据 {check_year}-{check_month}:{title}")
                    return check_year, check_month
        except Exception as e:
            log.info(f"未找到 {target_title}: {e}")
            continue
    raise Exception("三个月内未找到有效数据")


def process_month_data(driver, year, month, customs_name, found_count, max_retries=3):
    """Download or parse the matching tables on the current page, retrying each link up to max_retries times."""
    target_title = generate_target_title(year, month, customs_name)
    links = driver.find_elements(By.XPATH, '//a[@title]')
    for link in links:
        try:
            title = link.get_attribute('title')
            if re.search(target_title, title, re.IGNORECASE):
                # log.info(f"【{customs_name}】匹配到目标: {title}")
                url = link.get_attribute("href")
                for attempt in range(max_retries):
                    try:
                        if customs_name in ['汕头海关', '江门海关']:
                            download_excel2(driver, link, year, month, title, download_dir)
                        elif customs_name in ['湛江海关', '广州海关']:
                            batch_download_excel(driver, url, year, month, title, download_dir)
                        elif customs_name == "拱北海关":
                            parse_region_table(driver, url, year, month, title)
                        else:
                            download_excel(driver, url, year, month, title, download_dir)
                        found_count += 1
                        time.sleep(random.uniform(0.5, 1.5))  # pause between downloads
                        break
                    except Exception as e:
                        log.info(f"【{customs_name}】第 {attempt + 1} 次重试失败: {str(e)}")
                        if attempt + 1 == max_retries:
                            log.info(f"【{customs_name}】已达最大重试次数,放弃采集: {title}")
        except Exception as e:
            log.info(f"无法获取 title 属性: {e}")
    log.info(f"本月累计找到{found_count}个有效表格")
    return found_count


def reverse_crawler(driver, target_months, customs_name):
    """Core reverse-pagination crawler: keep paging through the article list until every target month is collected."""
    processed_months = set()
    # e.g. target_months = [(2023, 5), (2023, 4)]
    # The page counter is shared across months: the crawler keeps paging forward
    # rather than returning to page 1 for each month.
    page = 1
    for year, month in target_months:
        log.info(f"开始处理{customs_name} {year}年{month}月数据".center(55, "="))
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        found_tables = 0
        table_nums = 1
        if customs_name == "拱北海关" or customs_name == "江门海关":
            table_nums = 2
        while True:
            # Randomized wait to let the page settle
            random_sleep(base=2, variance=3)
            try:
                log.info(f"【{customs_name}】当前页面:{driver.current_url}, 第{page}页")
                # process_month_data returns the cumulative count for this month,
                # so assign it back rather than adding it again (avoids double counting)
                found_tables = process_month_data(driver, year, month, customs_name, found_tables)
                # All expected tables for this month have been collected
                if found_tables >= table_nums:
                    log.info(f"【{customs_name}】已完成{year}年{month}月全部表格采集")
                    processed_months.add((year, month))
                    break
                log.info(f"【{customs_name}】第{page}页已采集表格数:{found_tables}/{table_nums},前往下一页采集")
                # Pagination: wait until the "next page" link is clickable
                WebDriverWait(driver, 15).until(
                    EC.element_to_be_clickable((By.XPATH, '//a[contains(text(),"下一页")]'))
                ).click()
                page += 1
            except TimeoutException:
                log.info(f"未找到更多分页,已采集表格数:{found_tables}/{table_nums}")
                break
            except Exception as e:
                log.info(f"分页异常:{str(e)}")
                handle_retry(driver)  # recovery helper
                break
    return processed_months


def handle_retry(driver):
    """Try to recover from a page-level error by refreshing the browser."""
    try:
        driver.refresh()
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        log.info("浏览器异常已恢复")
    except Exception:
        log.info("需要人工干预的严重错误")
        raise


def random_sleep(base=2, variance=5):
    """Sleep for a random interval to avoid hammering the site."""
    sleep_time = base + random.random() * variance
    time.sleep(sleep_time)


# --- Disabled alternative: crawl the customs offices concurrently with a thread pool ---
# from concurrent.futures import ThreadPoolExecutor, as_completed
#
#
# def process_customs(customs_name, args):
#     """处理单个海关的数据抓取任务"""
#     options = configure_stealth_options(download_dir)
#     driver = webdriver.Firefox(options=options)
#
#     try:
#         driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
#         valid_year, valid_month = detect_latest_month(driver, customs_name)
#         log.info(f"检测到{customs_name}最新有效数据:{valid_year}-{valid_month:02d}")
#
#         if customs_name in ['汕头海关', '拱北海关', '江门海关']:
#             skip_january = False
#         else:
#             skip_january = True
#
#         if args.year:
#             target_months = generate_month_sequence(valid_year, valid_month, args.year, skip_january)
#         else:
#             target_months = generate_month_sequence(valid_year, valid_month)
#
#         log.info(f"目标采集月份序列:{target_months}")
#         reverse_crawler(driver, target_months, customs_name)
#         log.info(f"{customs_name} {len(target_months)}个月份数据已采集完毕")
#         return customs_name, True
#     except Exception as e:
#         log.info(f"[错误] 采集失败:{customs_name} - {str(e)}")
#         return customs_name, False
#     finally:
#         driver.quit()
#
#
# def main():
#     parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
#     parser.add_argument('--year', type=int, default=None,
#                         help='终止年份(如2023),未指定时抓取最新两个月')
#     args = parser.parse_args()
#
#     customs_list = GUANGDONG_CUSTOMS_URL.keys()
#
#     # 使用线程池并发采集
#     with ThreadPoolExecutor(max_workers=3) as executor:
#         futures = []
#         for customs_name in customs_list:
#             future = executor.submit(process_customs, customs_name, args)
#             futures.append(future)
#
#         for future in as_completed(futures):
#             customs_name, success = future.result()
#             if success:
#                 log.info(f"[完成] {customs_name} 数据采集成功")
#             else:
#                 log.info(f"[失败] {customs_name} 数据采集失败")
#
#     log.info("\n广东省所有海关数据采集完成。")


def main():
    """Main entry point: crawl each customs office sequentially, then clean and load the data."""
    parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
    parser.add_argument('--year', type=int, default=None,
                        help='终止年份(如2023),未指定时抓取最新两个月')
    args = parser.parse_args()
    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
    try:
        for customs_name in GUANGDONG_CUSTOMS_URL.keys():
            try:
                driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
                log.info(f"【{customs_name}】数据采集开始……")
                valid_year, valid_month = detect_latest_month(driver, customs_name)
                log.info(f"【{customs_name}】检测到最新有效数据:{valid_year}-{valid_month:02d}")
                skip_january = customs_name not in ['汕头海关', '拱北海关']
                if args.year:
                    target_months = generate_month_sequence(valid_year, valid_month, args.year, skip_january)
                else:
                    target_months = generate_month_sequence(valid_year, valid_month)
                log.info(f"【{customs_name}】目标采集月份序列:{target_months}")
                reverse_crawler(driver, target_months, customs_name)
                if customs_name == '拱北海关':
                    for year, month in target_months:
                        log.info(f"【{customs_name}】{year}-{month:02d}单月数据计算中...")
                        calculate_monthly_data(year, month)
                log.info(f"【{customs_name}】{len(target_months)}个月份数据已采集完毕".center(66, "="))
            except Exception as e:
                # A failure for one customs office should not abort the whole run
                log.info(f"[错误] 采集失败:{customs_name} - {e}")
    finally:
        driver.quit()
    log.info("【广东省】数据抓取结束".center(66, "*"))
    log.info("\n广东省数据清洗入库中...")
    traverse_and_process(download_dir, parse_excel, province_name="guangdong")
    log.info("\n广东省地级市数据同比更新中...")
    db_helper = DBHelper()
    db_helper.update_prov_yoy("广东省")
    log.info("\n广东省地级市数据同比更新结束")


if __name__ == "__main__":
    main()
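# Usage sketch (the filename below is illustrative, not taken from the repo):
#   python guangdong_customs_crawler.py              # crawl the latest two months
#   python guangdong_customs_crawler.py --year 2023  # crawl backwards until 2023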