import argparse import argparse import random import re import time from datetime import datetime, timedelta import pandas as pd from selenium import webdriver from selenium.common import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from db_helper import DBHelper from utils.constants import DOWNLOAD_DIR, COUNTRY_CODE_MAPPING from utils.download_utils import configure_stealth_options, generate_month_sequence from utils.log import log from utils.parse_utils import clean_county_name, convert_wan_to_yuan, clean_commodity_name BASE_URL = "http://gdfs.customs.gov.cn/guangdong_sub/zwgk62/sjgb59/6b4cdb3f-1.html" download_dir = DOWNLOAD_DIR / "guangdong" PROV_CODE = "440000" PROV_NAME = "广东省" db = DBHelper() def detect_latest_month(driver): """三级回溯智能检测最新有效月份(修正年/月匹配逻辑)""" driver.get(BASE_URL) current_date = datetime.now() for offset in range(0, 3): check_date = current_date - timedelta(days=offset * 30) check_year = check_date.year check_month = check_date.month # 构建正则表达式:兼容“1至X月”和“X月”两种格式,并允许前后有空格 pattern = re.compile( rf'(5){check_year}\s*年\s*(?:1至)?{check_month}\s*月广东省外贸进出口主要国别(地区)总值表(人民币值)', re.IGNORECASE ) try: # 使用 Python 端的正则匹配所有含“广东省”的链接 title elements = WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.XPATH, '//a[contains(@title,"广东省")]')) ) for element in elements: title = element.get_attribute("title") if pattern.search(title): log.info(f"已找到最新月份数据 {check_year}-{check_month}") return check_year, check_month log.info(f"未找到匹配项(正则:{pattern.pattern})") except TimeoutException: log.info(f"页面加载超时或无匹配项({check_year}-{check_month})") continue raise Exception("三个月内未找到有效数据") def process_month_data(driver, year, month): """处理月度数据:支持三种表格类型""" patterns = [ (re.compile(rf'(5){year}\s*年\s*(1-)?{month}\s*月广东省外贸进出口主要国别(地区)总值表(人民币值)'), 'country'), (re.compile(rf'(6){year}\s*年\s*(1-)?{month}\s*月广东省出口重点商品总值表(人民币值)'), 'export_commodity'), (re.compile(rf'(7){year}\s*年\s*(1-)?{month}\s*月广东省进口重点商品总值表(人民币值)'), 'import_commodity') ] found_count = 0 commodity_data = {'export': [], 'import': []} # 存储商品数据等待合并 links = driver.find_elements(By.XPATH, '//a[contains(@title,"广东省")]') for link in links: title = link.get_attribute("title") for pattern, table_type in patterns: if pattern.search(title): log.info(f"处理表格: {title}") url = link.get_attribute("href") # 新标签页处理 driver.execute_script("window.open(arguments[0]);", url) driver.switch_to.window(driver.window_handles[-1]) try: WebDriverWait(driver, 15).until( EC.presence_of_element_located((By.XPATH, "//table[@border='1']")) ) # 根据表格类型处理数据 if table_type == 'country': data = parse_country_table(driver, year, month) df_country = pd.DataFrame(data) db.bulk_insert( df_country, 't_yujin_crossborder_prov_country_trade', conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'], update_columns=['monthly_total', 'monthly_import', 'monthly_export', 'yoy_import_export', 'yoy_import', 'yoy_export'] ) found_count += 1 else: data_type = 'export' if table_type == 'export_commodity' else 'import' commodity_data[data_type] = parse_commodity_table(driver, data_type, year, month) found_count += 1 except Exception as e: log.info(f"表格处理失败: {e}") # 将数据返回,而不是在内部合并 return found_count, commodity_data def parse_country_table(driver, year, month): """解析目标页面的表格数据""" data = [] try: # 等待表格加载 WebDriverWait(driver, 15).until( EC.presence_of_element_located((By.XPATH, "//table[@border='1']")) ) table = driver.find_element(By.XPATH, "//table[@border='1' and @bordercolor='#000000']") rows = table.find_elements(By.TAG_NAME, 'tr') # 检测表格的列数 header_row = rows[2] # 假设表头在第3行 header_row.find_elements(By.TAG_NAME, 'td') # 数据行从第4行开始(跳过表头) for row in rows[4:]: cols = [col.text.strip() for col in row.find_elements(By.TAG_NAME, 'td')] country_name = cols[0] if (country_name == '广东外贸总值' or country_name == '东盟' or country_name == '欧盟' or country_name == '总计' or country_name == '广东外贸总额' or country_name == '广东外贸总计' or country_name == '总值' ): continue if month == 2: # 处理合并后的1月和2月数据 monthly_total = convert_wan_to_yuan(cols[1]) monthly_export = convert_wan_to_yuan(cols[3]) monthly_import = convert_wan_to_yuan(cols[5]) # 将2月的数据除以2,并生成1月和2月的数据 for m in [1, 2]: adjusted_monthly_total = monthly_total / 2 adjusted_monthly_export = monthly_export / 2 adjusted_monthly_import = monthly_import / 2 adjusted_yoy_total = 0 adjusted_yoy_export = 0 adjusted_yoy_import = 0 country_name_clean = clean_county_name(country_name) country_code = COUNTRY_CODE_MAPPING.get(country_name_clean) data.append({ 'crossborder_year': year, 'crossborder_year_month': f"{year}-{m:02d}", 'prov_code': PROV_CODE, 'prov_name': PROV_NAME, 'country_code': country_code, 'country_name': country_name_clean, 'monthly_total': adjusted_monthly_total, 'monthly_export': adjusted_monthly_export, 'monthly_import': adjusted_monthly_import, 'yoy_import_export': adjusted_yoy_total, 'yoy_export': adjusted_yoy_export, 'yoy_import': adjusted_yoy_import }) else: # 原逻辑处理13列的情况 monthly_total = convert_wan_to_yuan(cols[3]) monthly_export = convert_wan_to_yuan(cols[7]) monthly_import = convert_wan_to_yuan(cols[11]) yoy_total = parse_number(cols[4]) yoy_export = parse_number(cols[8]) yoy_import = parse_number(cols[12]) country_name_clean = clean_county_name(country_name) country_code = COUNTRY_CODE_MAPPING.get(country_name_clean) data.append({ 'crossborder_year': year, 'crossborder_year_month': f"{year}-{month:02d}", 'prov_code': PROV_CODE, 'prov_name': PROV_NAME, 'country_code': country_code, 'country_name': country_name_clean, 'monthly_total': monthly_total, 'monthly_export': monthly_export, 'monthly_import': monthly_import, 'yoy_import_export': yoy_total, 'yoy_export': yoy_export, 'yoy_import': yoy_import }) except Exception as e: log.info(f"解析表格失败: {e}") raise finally: driver.close() driver.switch_to.window(driver.window_handles[0]) return data def parse_commodity_table(driver, data_type, year, month): """解析商品表通用函数""" data = [] try: table = driver.find_element(By.XPATH, "//table[@border='1' and @bordercolor='#000000']") rows = table.find_elements(By.TAG_NAME, 'tr') # 检测表格的列数 header_row = rows[2] # 假设表头在第3行 cols = header_row.find_elements(By.TAG_NAME, 'td') num_cols = len(cols) # 数据行从第4行开始(跳过表头) for row in rows[4:]: cols = [col.text.strip() for col in row.find_elements(By.TAG_NAME, 'td')] if len(cols) < 3: continue # 清洗商品名称(处理 和空格) name = clean_commodity_name(cols[0]) if month == 2: # 处理合并后的1月和2月数据 value = convert_wan_to_yuan(cols[1]) # 将2月的数据除以2,并生成1月和2月的数据 for m in [1, 2]: adjusted_value = value / 2 adjusted_yoy = 0 # 同比置为0 data.append({ 'commodity_name': name, 'commodity_code': db.get_commodity_id(name), 'monthly_export' if data_type == 'export' else 'monthly_import': adjusted_value, 'crossborder_year_month': f"{year}-{m:02d}" }) else: # 原逻辑处理5列的情况 value = convert_wan_to_yuan(cols[3] if data_type == 'export' else cols[3]) data.append({ 'commodity_name': name, 'commodity_code': db.get_commodity_id(name), 'monthly_export' if data_type == 'export' else 'monthly_import': value, 'crossborder_year_month': f"{year}-{month:02d}" }) except Exception as e: log.info(f"解析商品表失败: {e}") raise finally: driver.close() driver.switch_to.window(driver.window_handles[0]) return data def merge_commodity_data(import_data, export_data, year, month): """ 根据commodity_code合并进出口数据(支持不同商品的存在情况) :param year: :param month: :param import_data: 进口数据列表(含commodity_code) :param export_data: 出口数据列表(含commodity_code) :return: 合并后的DataFrame """ # 转换数据为DataFrame df_import = pd.DataFrame(import_data) df_export = pd.DataFrame(export_data) # 合并逻辑(全外连接保留所有商品) merged_df = pd.merge( df_import, df_export, on=['commodity_code', 'commodity_name', 'crossborder_year_month'], how='outer' ).fillna(0) # 计算总量(可选,根据表结构需求) merged_df['monthly_total'] = merged_df['monthly_import'] + merged_df['monthly_export'] merged_df['crossborder_year'] = year merged_df['crossborder_year_month'] = f"{year}-{month:02d}" merged_df['prov_code'] = PROV_CODE merged_df['prov_name'] = PROV_NAME return merged_df def parse_number(text): """转换文本为浮点数(处理空值、负号)""" text = text.strip().replace(',', '') if not text or text == '-': return None try: return float(text) except ValueError: return None # 优化后的代码逻辑: def reverse_crawler(driver, target_months): """逆向分页抓取核心逻辑""" processed_months = set() page = 1 for year, month in target_months: log.info(f"\n开始处理 {year}年{month}月数据".center(50, "=")) WebDriverWait(driver, 15).until(EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))) current_page = 1 found_tables = 0 export_data = [] import_data = [] while True: random_sleep(base=2, variance=3) try: log.info(f"当前页面:{driver.current_url}, 第{page}页") found, commodity_data = process_month_data(driver, year, month) found_tables += found # 累积商品数据 if commodity_data['export']: export_data.extend(commodity_data['export']) if commodity_data['import']: import_data.extend(commodity_data['import']) # 完成三个表格采集 if found_tables >= 3: # 确保同时有进口和出口数据 if export_data and import_data: final_df = merge_commodity_data(export_data, import_data, year, month) db.bulk_insert(df=final_df, table_name='t_yujin_crossborder_prov_commodity_trade', conflict_columns=['commodity_code', 'crossborder_year_month'], update_columns=['monthly_import', 'monthly_export', 'monthly_total']) log.info(f"已完成{year}年{month}月全部表格采集") processed_months.add((year, month)) break log.info(f"第{page}页已采集表格数:{found_tables}/3,前往下一页采集") # 分页点击逻辑 WebDriverWait(driver, 15).until( EC.element_to_be_clickable((By.XPATH, '//a[@class="pagingNormal next"]')) ).click() current_page += 1 page += 1 except TimeoutException: log.info(f"未找到更多分页,已采集表格数:{found_tables}/3") break except Exception as e: log.info(f"分页异常:{str(e)}") handle_retry(driver) break return processed_months def random_sleep(base=2, variance=5): """智能随机等待""" sleep_time = base + random.random() * variance time.sleep(sleep_time) def handle_retry(driver): """异常恢复处理""" try: driver.refresh() WebDriverWait(driver, 15).until( EC.presence_of_element_located((By.CLASS_NAME, "conList_ul")) ) log.info("浏览器异常已恢复") except: log.info("需要人工干预的严重错误") raise def main(): """主入口(优化参数处理逻辑)""" parser = argparse.ArgumentParser(description='海关数据智能抓取系统') parser.add_argument('--year', type=int, default=None, help='终止年份(如2023),未指定时抓取最新两个月') args = parser.parse_args() driver = webdriver.Firefox(options=configure_stealth_options(download_dir)) try: # 智能检测最新有效月份 valid_year, valid_month = detect_latest_month(driver) log.info(f"【广东海关】最新数据:{valid_year}年{valid_month:02d}月") # 生成目标序列 if args.year: # 指定年份时:从最新月到目标年1月 target_months = generate_month_sequence( start_year=valid_year, start_month=valid_month, end_year=args.year, skip_january=True ) else: # 未指定年份时:取最近两个月 target_months = generate_month_sequence(valid_year, valid_month) log.info(f"【广东海关】目标采集月份序列:{target_months}") reverse_crawler(driver, target_months) log.info(f"【广东海关】{len(target_months)}个月份数据已采集完毕") finally: driver.quit() log.info("\n数据清洗入库中...") if __name__ == "__main__": main()