@@ -0,0 +1,440 @@
+import argparse
+import random
+import re
+import time
+from datetime import datetime, timedelta
+
+import pandas as pd
+from selenium import webdriver
+from selenium.common import TimeoutException
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support.ui import WebDriverWait
+
+from db_helper import DBHelper
+from utils.constants import DOWNLOAD_DIR, COUNTRY_CODE_MAPPING
+from utils.download_utils import configure_stealth_options, generate_month_sequence
+from utils.parse_utils import clean_county_name, convert_wan_to_yuan, clean_commodity_name
+
+# Basic configuration
+BASE_URL = "http://gdfs.customs.gov.cn/guangdong_sub/zwgk62/sjgb59/6b4cdb3f-1.html"
+download_dir = DOWNLOAD_DIR / "guangdong"
+
+PROV_CODE = "440000"
+PROV_NAME = "广东省"
+
+db = DBHelper()
+
+
+def detect_latest_month(driver):
+    """Detect the latest valid month by looking back up to three months (fixed year/month matching logic)."""
+    driver.get(BASE_URL)
+    current_date = datetime.now()
+
+    for offset in range(0, 3):
+        check_date = current_date - timedelta(days=offset * 30)
+        check_year = check_date.year
+        check_month = check_date.month
+
+        # Build the regex: accept both the "1至X月" and "X月" title formats and allow surrounding whitespace
+        pattern = re.compile(
+            rf'(5){check_year}\s*年\s*(?:1至)?{check_month}\s*月广东省外贸进出口主要国别(地区)总值表(人民币值)',
+            re.IGNORECASE
+        )
+
+        try:
+            # Run the regex on the Python side against every link title containing "广东省"
+            elements = WebDriverWait(driver, 10).until(
+                EC.presence_of_all_elements_located((By.XPATH, '//a[contains(@title,"广东省")]'))
+            )
+
+            for element in elements:
+                title = element.get_attribute("title")
+                if pattern.search(title):
+                    print(f"Found latest month with data: {check_year}-{check_month}")
+                    return check_year, check_month
+
+            print(f"No match found (pattern: {pattern.pattern})")
+        except TimeoutException:
+            print(f"Page load timed out or no match ({check_year}-{check_month})")
+            continue
+
+    raise Exception("No valid data found within the last three months")
+
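+# The 30-day step above approximates one calendar month: with a (hypothetical)
+# run date of 2025-05-05 the three offsets check 2025-05, 2025-04 (via 2025-04-05)
+# and 2025-03 (via 2025-03-06).
+
+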
+def process_month_data(driver, year, month):
+    """Process one month's data; supports the three table types."""
+    patterns = [
+        (re.compile(rf'(5){year}\s*年\s*(1-)?{month}\s*月广东省外贸进出口主要国别(地区)总值表(人民币值)'), 'country'),
+        (re.compile(rf'(6){year}\s*年\s*(1-)?{month}\s*月广东省出口重点商品总值表(人民币值)'), 'export_commodity'),
+        (re.compile(rf'(7){year}\s*年\s*(1-)?{month}\s*月广东省进口重点商品总值表(人民币值)'), 'import_commodity')
+    ]
+
+    found_count = 0
+    commodity_data = {'export': [], 'import': []}  # Commodity rows are held here until they can be merged
+    links = driver.find_elements(By.XPATH, '//a[contains(@title,"广东省")]')
+
+    for link in links:
+        title = link.get_attribute("title")
+        for pattern, table_type in patterns:
+            if pattern.search(title):
+                print(f"Processing table: {title}")
+                url = link.get_attribute("href")
+
+                # Open the table in a new tab
+                driver.execute_script("window.open(arguments[0]);", url)
+                driver.switch_to.window(driver.window_handles[-1])
+                try:
+                    WebDriverWait(driver, 15).until(
+                        EC.presence_of_element_located((By.XPATH, "//table[@border='1']"))
+                    )
+
+                    # Dispatch on the table type
+                    if table_type == 'country':
+                        data = parse_country_table(driver, year, month)
+                        df_country = pd.DataFrame(data)
+                        db.bulk_insert(
+                            df_country,
+                            't_yujin_crossborder_prov_country_trade',
+                            conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
+                            update_columns=['monthly_total', 'monthly_import', 'monthly_export',
+                                            'yoy_import_export', 'yoy_import', 'yoy_export']
+                        )
+
+                        found_count += 1
+                    else:
+                        data_type = 'export' if table_type == 'export_commodity' else 'import'
+                        commodity_data[data_type] = parse_commodity_table(driver, data_type, year, month)
+                        found_count += 1
+                except Exception as e:
+                    print(f"Failed to process table: {e}")
+
+    # Return the data; merging happens in the caller
+    return found_count, commodity_data
+
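+# For reference, process_month_data returns a tuple such as
+#   (2, {'export': [<rows from the export commodity table>], 'import': []})
+# where the first element counts the tables matched on the current page.
+
+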
+def parse_country_table(driver, year, month):
+    """Parse the country/region totals table on the target page."""
+    data = []
+
+    try:
+        # Wait for the table to load
+        WebDriverWait(driver, 15).until(
+            EC.presence_of_element_located((By.XPATH, "//table[@border='1']"))
+        )
+        table = driver.find_element(By.XPATH, "//table[@border='1' and @bordercolor='#000000']")
+        rows = table.find_elements(By.TAG_NAME, 'tr')
+
+        # Data rows start at index 4 (the first four rows are headers)
+        for row in rows[4:]:
+            cols = [col.text.strip() for col in row.find_elements(By.TAG_NAME, 'td')]
+            country_name = cols[0]
+
+            # Skip aggregate rows (province totals, ASEAN, EU, ...)
+            if country_name in ('广东外贸总值', '东盟', '欧盟', '总计',
+                                '广东外贸总额', '广东外贸总计', '总值'):
+                continue
+
+            if month == 2:
+                # The February release combines January and February
+                monthly_total = convert_wan_to_yuan(cols[1])
+                monthly_export = convert_wan_to_yuan(cols[3])
+                monthly_import = convert_wan_to_yuan(cols[5])
+
+                # Split the combined value evenly and emit one record each for January and February
+                for m in [1, 2]:
+                    adjusted_monthly_total = monthly_total / 2
+                    adjusted_monthly_export = monthly_export / 2
+                    adjusted_monthly_import = monthly_import / 2
+                    adjusted_yoy_total = 0
+                    adjusted_yoy_export = 0
+                    adjusted_yoy_import = 0
+
+                    country_name_clean = clean_county_name(country_name)
+                    country_code = COUNTRY_CODE_MAPPING.get(country_name_clean)
+
+                    data.append({
+                        'crossborder_year': year,
+                        'crossborder_year_month': f"{year}-{m:02d}",
+                        'prov_code': PROV_CODE,
+                        'prov_name': PROV_NAME,
+                        'country_code': country_code,
+                        'country_name': country_name_clean,
+                        'monthly_total': adjusted_monthly_total,
+                        'monthly_export': adjusted_monthly_export,
+                        'monthly_import': adjusted_monthly_import,
+                        'yoy_import_export': adjusted_yoy_total,
+                        'yoy_export': adjusted_yoy_export,
+                        'yoy_import': adjusted_yoy_import
+                    })
+            else:
+                # Regular months use the 13-column layout
+                monthly_total = convert_wan_to_yuan(cols[3])
+                monthly_export = convert_wan_to_yuan(cols[7])
+                monthly_import = convert_wan_to_yuan(cols[11])
+                yoy_total = parse_number(cols[4])
+                yoy_export = parse_number(cols[8])
+                yoy_import = parse_number(cols[12])
+
+                country_name_clean = clean_county_name(country_name)
+                country_code = COUNTRY_CODE_MAPPING.get(country_name_clean)
+
+                data.append({
+                    'crossborder_year': year,
+                    'crossborder_year_month': f"{year}-{month:02d}",
+                    'prov_code': PROV_CODE,
+                    'prov_name': PROV_NAME,
+                    'country_code': country_code,
+                    'country_name': country_name_clean,
+                    'monthly_total': monthly_total,
+                    'monthly_export': monthly_export,
+                    'monthly_import': monthly_import,
+                    'yoy_import_export': yoy_total,
+                    'yoy_export': yoy_export,
+                    'yoy_import': yoy_import
+                })
+
+    except Exception as e:
+        print(f"Failed to parse country table: {e}")
+        raise
+    finally:
+        driver.close()
+        driver.switch_to.window(driver.window_handles[0])
+
+    return data
+
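+# Worked example of the January/February split above: a combined Jan-Feb total of
+# 1,000,000 yuan (after convert_wan_to_yuan) is stored as 500,000 for month 01 and
+# 500,000 for month 02 of the same year, with every year-on-year field set to 0.
+
+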
+def parse_commodity_table(driver, data_type, year, month):
+    """Shared parser for the export and import key-commodity tables."""
+    data = []
+    try:
+        table = driver.find_element(By.XPATH, "//table[@border='1' and @bordercolor='#000000']")
+        rows = table.find_elements(By.TAG_NAME, 'tr')
+
+        # Data rows start at index 4 (the first four rows are headers)
+        for row in rows[4:]:
+            cols = [col.text.strip() for col in row.find_elements(By.TAG_NAME, 'td')]
+            if len(cols) < 3:
+                continue
+
+            # Clean the commodity name (strips non-breaking spaces and ordinary whitespace)
+            name = clean_commodity_name(cols[0])
+
+            if month == 2:
+                # The February release combines January and February
+                value = convert_wan_to_yuan(cols[1])
+
+                # Split the combined value evenly and emit one record each for January and February
+                for m in [1, 2]:
+                    adjusted_value = value / 2
+                    adjusted_yoy = 0  # Year-on-year is set to 0 for the split months
+
+                    data.append({
+                        'commodity_name': name,
+                        'commodity_code': db.get_commodity_id(name),
+                        'monthly_export' if data_type == 'export' else 'monthly_import': adjusted_value,
+                        'crossborder_year_month': f"{year}-{m:02d}"
+                    })
+            else:
+                # Regular months use the 5-column layout
+                value = convert_wan_to_yuan(cols[3])
+
+                data.append({
+                    'commodity_name': name,
+                    'commodity_code': db.get_commodity_id(name),
+                    'monthly_export' if data_type == 'export' else 'monthly_import': value,
+                    'crossborder_year_month': f"{year}-{month:02d}"
+                })
+
+    except Exception as e:
+        print(f"Failed to parse commodity table: {e}")
+        raise
+    finally:
+        driver.close()
+        driver.switch_to.window(driver.window_handles[0])
+
+    return data
+
+
+def merge_commodity_data(import_data, export_data, year, month):
+    """
+    Merge import and export records on commodity_code (a commodity may appear on only one side).
+    :param import_data: list of import records (each containing commodity_code)
+    :param export_data: list of export records (each containing commodity_code)
+    :param year: reporting year
+    :param month: reporting month
+    :return: merged DataFrame
+    """
+    # Convert the record lists to DataFrames
+    df_import = pd.DataFrame(import_data)
+    df_export = pd.DataFrame(export_data)
+
+    # Full outer join so commodities present on only one side are kept
+    merged_df = pd.merge(
+        df_import,
+        df_export,
+        on=['commodity_code', 'commodity_name', 'crossborder_year_month'],
+        how='outer'
+    ).fillna(0)
+
+    # Compute the total column and attach the year and province metadata
+    merged_df['monthly_total'] = merged_df['monthly_import'] + merged_df['monthly_export']
+    merged_df['crossborder_year'] = year
+    # crossborder_year_month is kept from the per-record values so the
+    # January/February split performed by the parsers is not overwritten
+    merged_df['prov_code'] = PROV_CODE
+    merged_df['prov_name'] = PROV_NAME
+
+    return merged_df
+
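+# Illustrative sketch of the merge with hypothetical records (codes and names are
+# made up, not taken from the live tables):
+#   import_data = [{'commodity_code': 'C1', 'commodity_name': '集成电路',
+#                   'monthly_import': 100.0, 'crossborder_year_month': '2024-03'}]
+#   export_data = [{'commodity_code': 'C2', 'commodity_name': '家用电器',
+#                   'monthly_export': 200.0, 'crossborder_year_month': '2024-03'}]
+# The outer join keeps both commodities, fills the missing side with 0, and yields
+# monthly_total values of 100.0 and 200.0 respectively.
+
+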
+def parse_number(text):
+    """Convert text to a float (handles empty values, thousands separators and the '-' placeholder)."""
+    text = text.strip().replace(',', '')
+    if not text or text == '-':
+        return None
+    try:
+        return float(text)
+    except ValueError:
+        return None
+
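+# Examples:
+#   parse_number("1,234.5") -> 1234.5
+#   parse_number("-12.3")   -> -12.3
+#   parse_number("-")       -> None
+#   parse_number("")        -> None
+
+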
+# Optimized crawling flow:
+
+def reverse_crawler(driver, target_months):
+    """Core logic for reverse (newest-to-oldest) paginated crawling."""
+    processed_months = set()
+    page = 1
+    for year, month in target_months:
+        print(f"\nStart processing data for {year}-{month}".center(50, "="))
+        WebDriverWait(driver, 15).until(EC.presence_of_element_located((By.CLASS_NAME, "conList_ul")))
+
+        current_page = 1
+        found_tables = 0
+        export_data = []
+        import_data = []
+
+        while True:
+            random_sleep(base=2, variance=3)
+            try:
+                print(f"Current page: {driver.current_url}, page {page}")
+                found, commodity_data = process_month_data(driver, year, month)
+                found_tables += found
+
+                # Accumulate commodity data
+                if commodity_data['export']:
+                    export_data.extend(commodity_data['export'])
+                if commodity_data['import']:
+                    import_data.extend(commodity_data['import'])
+
+                # All three tables for this month have been collected
+                if found_tables >= 3:
+                    # Only merge when both import and export data are present
+                    if export_data and import_data:
+                        final_df = merge_commodity_data(import_data, export_data, year, month)
+                        db.bulk_insert(df=final_df, table_name='t_yujin_crossborder_prov_commodity_trade',
+                                       conflict_columns=['commodity_code', 'crossborder_year_month'],
+                                       update_columns=['monthly_import', 'monthly_export', 'monthly_total'])
+                    print(f"Finished collecting all tables for {year}-{month}")
+                    processed_months.add((year, month))
+                    break
+
+                print(f"Page {page}: {found_tables}/3 tables collected, moving to the next page")
+
+                # Pagination: click the "next" link
+                WebDriverWait(driver, 15).until(
+                    EC.element_to_be_clickable((By.XPATH, '//a[@class="pagingNormal next"]'))
+                ).click()
+
+                current_page += 1
+                page += 1
+
+            except TimeoutException:
+                print(f"No more pages; tables collected: {found_tables}/3")
+                break
+            except Exception as e:
+                print(f"Pagination error: {str(e)}")
+                handle_retry(driver)
+                break
+
+    return processed_months
+
+
+def random_sleep(base=2, variance=5):
+    """Sleep for a random interval to make the crawler look less mechanical."""
+    sleep_time = base + random.random() * variance
+    time.sleep(sleep_time)
+
+
+def handle_retry(driver):
+    """Recover from an error by refreshing and waiting for the list to reappear."""
+    try:
+        driver.refresh()
+        WebDriverWait(driver, 15).until(
+            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
+        )
+        print("Browser recovered from the error")
+    except Exception:
+        print("Serious error, manual intervention required")
+        raise
+
+
+def main():
+    """Entry point (improved argument handling)."""
+    parser = argparse.ArgumentParser(description='Customs data crawler')
+    parser.add_argument('--year', type=int, default=None,
+                        help='End year (e.g. 2023); when omitted, only the latest two months are crawled')
+    args = parser.parse_args()
+    driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
+    try:
+        # Detect the latest month that actually has data
+        valid_year, valid_month = detect_latest_month(driver)
+        print(f"Latest available data: {valid_year}-{valid_month:02d}")
+
+        # Build the target month sequence
+        if args.year:
+            # With --year: crawl from the latest month back to January of the target year
+            target_months = generate_month_sequence(
+                start_year=valid_year,
+                start_month=valid_month,
+                end_year=args.year,
+                skip_january=True
+            )
+        else:
+            # Without --year: crawl the two most recent months
+            target_months = generate_month_sequence(valid_year, valid_month)
+
+        print(f"Target month sequence: {target_months}")
+        reverse_crawler(driver, target_months)
+        print(f"Finished collecting data for {len(target_months)} month(s)")
+
+    finally:
+        driver.quit()
+        print("\nCleaning and loading data into the database...")
+
+
+if __name__ == "__main__":
+    main()