123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- import argparse
- import random
- import re
- import time
- from datetime import datetime, timedelta
- from selenium import webdriver
- from selenium.common import TimeoutException
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from crossborder.guangdong.guangdong_gongbei_parse_excel import parse_page_region_data, calculate_monthly_data
- from crossborder.utils.db_helper import DBHelper
- from crossborder.guangdong.guangdong_sub_customs_parse_excel import parse_excel
- from crossborder.utils.constants import DOWNLOAD_DIR, CUSTOMS_CITY_MAPPING
- from crossborder.utils.constants import GUANGDONG_CUSTOMS_URL
- from crossborder.utils.dingtalk import send_dingtalk_message
- from crossborder.utils.download_utils import configure_stealth_options, generate_month_sequence, download_excel, download_excel2, \
- batch_download_excel
- from crossborder.utils.log import get_logger
- log = get_logger(__name__)
- from crossborder.utils.parse_utils import traverse_and_process
- download_dir = DOWNLOAD_DIR / "guangdong"
- def generate_target_title(check_year, check_month, customs_name):
- """生成正则匹配的标题模式"""
- global target_title
- if customs_name == "广州海关":
- return rf'{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月广州关区所辖7地市进出口综合统计资料'
- elif customs_name == "深圳海关":
- return rf"{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月(深圳海关|深圳关区)综合统计资料"
- elif customs_name == "拱北海关":
- return rf"\S+市{check_year}\s*年\s*(?:1[--]\s*)?{check_month}月对外贸易进出口统计表"
- elif customs_name == "汕头海关":
- return rf"5市报表{check_year}年(?:1[--]\s*{check_month}月|{check_month}月)(人民币)"
- elif customs_name == "黄埔海关":
- return rf"{check_year}年\s*(?:1[--]\s*)?{check_month}月东莞市进出口企业性质总值表"
- elif customs_name == "江门海关":
- if check_month == 3:
- target_title = rf"{check_year}年\s*(?:一季度|前{check_month}个月|\s*{check_month}月)[\u4e00-\u9fa5]+市外贸进出口有关情况统计表(以人民币计价)"
- elif check_month == 12:
- target_title = rf"{check_year}年(?:\s*{check_month}月)?\s*[\u4e00-\u9fa5]+市外贸进出口有关情况统计表(以人民币计价)"
- else:
- target_title = rf"{check_year}年\s*前?{check_month}个?月.*外贸进出口有关情况统计表(以人民币计价)"
- return target_title
- elif customs_name == "湛江海关":
- if check_month == 3:
- target_title = rf"{check_year}年\s*(?:一季度|前3个月|3月).*外贸进出口数据"
- elif check_month == 9:
- target_title = rf"{check_year}年\s*(?:前三季度|前9个月|9月).*外贸进出口数据"
- elif check_month == 12:
- target_title = rf'^{check_year}年(?:及{check_month}月份)?湛江市、茂名市(?:外贸)?进出口数据'
- else:
- target_title = rf"{check_year}年\s*前?{check_month}个?月.*(外贸)?进出口数据"
- return target_title
- else:
- return rf"{check_year}\s*年\s*(?:1[--]\s*)??{check_month}月{customs_name}进出口综合统计资料"
- def detect_latest_month(driver,customs_name):
- """三级回溯智能检测最新有效月份"""
- current_date = datetime.now()
- for offset in range(0, 3):
- check_date = current_date - timedelta(days=offset * 30)
- check_year = check_date.year
- check_month = check_date.month
- # 根据海关名称生成对应的标题
- target_title = generate_target_title(check_year, check_month, customs_name)
- try:
- WebDriverWait(driver, 10).until(
- EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
- )
- # 获取所有 <a> 标签
- links = driver.find_elements(By.XPATH, '//a[@title]')
- # 使用 Python 正则匹配 title
- for link in links:
- title = link.get_attribute('title')
- if re.search(target_title, title, re.IGNORECASE):
- log.info(f"【{customs_name}】最新月份数据 {check_year}-{check_month}:{title}")
- return check_year, check_month
- except Exception as e:
- log.info(f"未找到 {target_title}: {e}")
- continue
- raise Exception("三个月内未找到有效数据")
- def process_month_data(driver, year, month, customs_name,found_count, max_retries=3):
- """带重试机制的表格数据处理"""
- target_title = generate_target_title(year, month, customs_name)
- links = driver.find_elements(By.XPATH, '//a[@title]')
- for link in links:
- try:
- title = link.get_attribute('title')
- if re.search(target_title, title, re.IGNORECASE):
- # log.info(f"【{customs_name}】匹配到目标: {title}")
- url = link.get_attribute("href")
- for attempt in range(max_retries):
- try:
- if customs_name in ['汕头海关', '江门海关']:
- download_excel2(driver, link, year, month, title, download_dir)
- elif customs_name in ['湛江海关', '广州海关']:
- batch_download_excel(driver, url, year, month, title, download_dir)
- elif customs_name == "拱北海关":
- parse_page_region_data(driver, url, year, month, title)
- else:
- download_excel(driver, url, year, month, title, download_dir)
- found_count += 1
- time.sleep(random.uniform(0.5, 1.5)) # 下载间隔
- break
- except Exception as e:
- log.info(f"【{customs_name}】第 {attempt + 1} 次重试失败: {str(e)}")
- if attempt + 1 == max_retries:
- log.info(f"【{customs_name}】已达最大重试次数,放弃采集: {title}")
- except Exception as e:
- log.info(f"无法获取 title 属性: {e}")
- log.info(f"本页找到{found_count}个有效表格")
- return found_count
- def reverse_crawler(driver, target_months, customs_name):
- """逆向分页抓取核心(优化分页逻辑)"""
- processed_months = set()
- # target_months = [(2023, 5), (2023, 4)]
- page = 1
- for year, month in target_months:
- log.info(f"开始处理{customs_name} {year}年{month}月数据".center(55, "="))
- WebDriverWait(driver, 15).until(
- EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
- )
- found_tables = 0
- table_nums = 1
- if customs_name == "拱北海关" or customs_name == "江门海关":
- table_nums = 2
- while True:
- # 智能等待页面稳定
- random_sleep(base=2, variance=3)
- try:
- log.info(f"【{customs_name}】当前页面:{driver.current_url}, 第{page}页")
- # 处理当前页面的表格数据
- found = process_month_data(driver, year, month ,customs_name,found_tables)
- found_tables += found
- # 完成四个表格采集
- if found_tables >= table_nums:
- log.info(f"【{customs_name}】已完成{year}年{month}月全部表格采集")
- processed_months.add((year, month))
- break
- log.info(f"【{customs_name}】第{page}页已采集表格数:{found_tables}/{table_nums},前往下一页采集")
- # 分页操作(增强定位稳定性)
- WebDriverWait(driver, 15).until(
- EC.element_to_be_clickable((By.XPATH, '//a[contains(text(),"下一页")]'))
- ).click()
- page += 1
- except TimeoutException:
- log.info(f"未找到更多分页,已采集表格数:{found_tables}/{table_nums}")
- break
- except Exception as e:
- log.info(f"分页异常:{str(e)}")
- handle_retry(driver) # 异常恢复函数
- break
- return processed_months
- def handle_retry(driver):
- """异常恢复处理"""
- try:
- driver.refresh()
- WebDriverWait(driver, 15).until(
- EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
- )
- log.info("浏览器异常已恢复")
- except:
- log.info("需要人工干预的严重错误")
- raise
- def random_sleep(base=2, variance=5):
- """智能随机等待"""
- sleep_time = base + random.random() * variance
- time.sleep(sleep_time)
- # def process_customs(customs_name, args):
- # """处理单个海关的数据抓取任务"""
- # options = configure_stealth_options(download_dir)
- # driver = webdriver.Firefox(options=options)
- #
- # try:
- # driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
- # valid_year, valid_month = detect_latest_month(driver, customs_name)
- # log.info(f"检测到{customs_name}最新有效数据:{valid_year}-{valid_month:02d}")
- #
- # if customs_name in ['汕头海关', '拱北海关', '江门海关']:
- # skip_january = False
- # else:
- # skip_january = True
- #
- # if args.year:
- # target_months = generate_month_sequence(valid_year, valid_month, args.year, skip_january)
- # else:
- # target_months = generate_month_sequence(valid_year, valid_month)
- #
- # log.info(f"目标采集月份序列:{target_months}")
- # reverse_crawler(driver, target_months, customs_name)
- # log.info(f"{customs_name} {len(target_months)}个月份数据已采集完毕")
- # return customs_name, True
- # except Exception as e:
- # log.info(f"[错误] 采集失败:{customs_name} - {str(e)}")
- # return customs_name, False
- # finally:
- # driver.quit()
- #
- #
- # def main():
- # parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
- # parser.add_argument('--year', type=int, default=None,
- # help='终止年份(如2023),未指定时抓取最新两个月')
- # args = parser.parse_args()
- #
- # customs_list = GUANGDONG_CUSTOMS_URL.keys()
- #
- # # 使用线程池并发采集
- # with ThreadPoolExecutor(max_workers=3) as executor:
- # futures = []
- # for customs_name in customs_list:
- # future = executor.submit(process_customs, customs_name, args)
- # futures.append(future)
- #
- # for future in as_completed(futures):
- # customs_name, success = future.result()
- # if success:
- # log.info(f"[完成] {customs_name} 数据采集成功")
- # else:
- # log.info(f"[失败] {customs_name} 数据采集失败")
- #
- # log.info("\n广东省所有海关数据采集完成。")
- def main():
- """主入口(广东分海关优化版)"""
- parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
- parser.add_argument('--year', type=int, default=None,
- help='终止年份(如2023),未指定时抓取最新两个月')
- args = parser.parse_args()
- start_time = time.time()
- # 状态跟踪变量
- total_months_count = 0 # 总采集月份数
- customs_collected = [] # 成功采集的海关名单
- data_collected = False # 是否有数据采集
- all_customs_processed = [] # 已处理海关列表
- log.info("【广东省】分海关数据采集开始".center(66, "*"))
- driver = None
- try:
- # 1. 初始化浏览器
- driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
- log.info("浏览器初始化完成")
- # 2. 遍历各海关
- for customs_name in GUANGDONG_CUSTOMS_URL.keys():
- try:
- log.info(f"\n{'=' * 66}\n【{customs_name}】数据采集开始".center(66, "="))
- # 进入海关页面
- driver.get(GUANGDONG_CUSTOMS_URL[customs_name])
- # 检测最新有效月份
- valid_year, valid_month = detect_latest_month(driver, customs_name)
- log.info(f"【{customs_name}】检测到最新有效数据:{valid_year}-{valid_month:02d}")
- # 设置是否跳过1月数据的标志
- skip_january = customs_name not in ['汕头海关', '拱北海关']
- # 3. 生成目标月份序列
- if args.year:
- # 指定年份时:从最新月到目标年1月
- target_months = generate_month_sequence(
- start_year=valid_year,
- start_month=valid_month,
- end_year=args.year,
- skip_january=skip_january
- )
- else:
- # 未指定年份时:检查最新月份是否已存在
- db = DBHelper()
- count = db.get_code_exist(
- f'{valid_year}-{valid_month:02d}',
- "440000",
- is_city=True,
- customs_name=customs_name
- )
- if count > 0:
- log.warning(f"⏩ 跳过【{customs_name}】- 数据库已存在{CUSTOMS_CITY_MAPPING[customs_name]} {valid_year}-{valid_month:02d} 数据")
- continue
- # 未指定年份时:取最近两个月
- target_months = generate_month_sequence(
- start_year=valid_year,
- start_month=valid_month
- )
- # 记录目标月份
- total_months_count += len(target_months)
- data_collected = True
- log.info(f"【{customs_name}】目标采集月份:{len(target_months)}个月份")
- # 4. 执行数据采集
- if target_months: # 确保有月份需要采集
- reverse_crawler(driver, target_months, customs_name)
- customs_collected.append(customs_name)
- log.info(f"【{customs_name}】{len(target_months)}个月份采集完成")
- # 拱北海关特殊处理
- if customs_name == '拱北海关':
- for year, month in target_months:
- log.info(f"🔢 【拱北海关】计算 {year}-{month:02d} 单月数据...")
- calculate_monthly_data(year, month)
- # 添加分隔线
- log.info(f"【{customs_name}】处理完成".center(66, "="))
- except Exception as e:
- # 捕获单个海关采集异常
- log.exception(f"⚠️ 【{customs_name}】采集过程中发生错误: {str(e)}")
- send_dingtalk_message(f"【{customs_name}】海关采集异常: {str(e)}")
- finally:
- # 记录已处理海关
- all_customs_processed.append(customs_name)
- # 5. 所有海关处理完成后
- if data_collected:
- log.info(f"\n{'=' * 66}\n【广东省】所有海关处理完成,开始数据清洗入库")
- log.info("数据清洗入库中...")
- traverse_and_process(download_dir, parse_excel, province_name="guangdong", year=args.year)
- log.info("广东省地级市数据同比更新中...")
- db_helper = DBHelper()
- db_helper.update_prov_yoy("广东省")
- log.info("地级市数据同比更新完成")
- # 计算总耗时
- duration = time.time() - start_time
- minutes, seconds = divmod(duration, 60)
- # 准备通知信息
- if customs_collected:
- customs_str = "、".join(customs_collected)
- month_info = f"{total_months_count}个月份"
- else:
- customs_str = "无海关数据被采集"
- month_info = "0个月份"
- message = (
- f"【广东省海关数据采集完成】\n"
- f"• 已处理海关: {len(all_customs_processed)}个\n"
- f"• 成功采集海关: {len(customs_collected)}个\n"
- f"• 采集海关: {customs_str}\n"
- f"• 总采集月份: {month_info}\n"
- f"• 总耗时: {int(minutes)}分{seconds:.1f}秒"
- )
- send_dingtalk_message(message)
- else:
- log.warning("本次未采集到任何新数据")
- # send_dingtalk_message("【广东省海关采集】所有海关最新月份数据已存在,未执行采集")
- except Exception as e:
- # 全局异常捕获
- log.exception(f"‼️ 广东省海关采集全局错误: {str(e)}")
- send_dingtalk_message(f"【广东海关采集异常】全局错误: {str(e)}")
- finally:
- # 确保浏览器安全退出
- if driver:
- driver.quit()
- log.info("浏览器已安全退出")
- log.info("【广东省】分海关数据采集结束".center(66, "*"))
- if __name__ == "__main__":
- main()
|