123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- import pandas as pd
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from utils.db_helper import DBHelper
- from quanguo.detail import parse_value
- from utils.constants import GUANGDONG_CITY
- from utils.log import log
# Province identifiers stamped onto every row this module stores
# (the prov_name / prov_code columns) and used to filter queries.
PROV_NAME = "广东省"
PROV_CODE = "440000"

# Module-wide database helper shared by the import stage and the
# single-month recalculation stage.
db = DBHelper()
def parse_region_table(driver, url, year, month, title):
    """Stage one: scrape one month's page and store the rows as published.

    The figures are stored exactly as the source page reports them: January
    is a single month, every other month is the January-to-month cumulative
    total.

    Args:
        driver: Selenium WebDriver handed through to ``parse_page_data``.
        url: Address of the page holding the per-city table.
        year: Year of the reporting period.
        month: Month of the reporting period.
        title: Human-readable page title; only used in log messages.
    """
    log.info(f"开始解析{PROV_NAME} {year}年{month}月 {title}")

    scraped_rows = parse_page_data(driver, url, year, month)
    if scraped_rows:
        # Rows are upserted: an existing (year_month, city) row gets its
        # amount and year-on-year columns refreshed.
        upsert_keys = ['crossborder_year_month', 'city_code']
        refreshed_columns = [
            'monthly_total', 'monthly_import', 'monthly_export',
            'yoy_import_export', 'yoy_import', 'yoy_export',
        ]
        db.bulk_insert(
            pd.DataFrame(scraped_rows),
            't_yujin_crossborder_prov_region_trade',
            conflict_columns=upsert_keys,
            update_columns=refreshed_columns,
        )

    log.info(f"{PROV_NAME} {year}年{month}月 {title}数据解析完成")
def calculate_monthly_data(year, month):
    """Stage two: turn a month's stored cumulative figures into single-month ones.

    Stage one stores January as a single month and every later month as the
    January-to-month cumulative total. For a non-January month this function
    reads the stored rows for ``month`` and ``month - 1`` of this province,
    subtracts the earlier amounts from the later ones city by city, clears
    the year-on-year columns, and upserts the result over the current
    month's rows.

    Args:
        year: Year of the period to convert.
        month: Month number of the period to convert. It is formatted with
            ``:02d``, so it must be an integer.

    Returns:
        None. The function returns early (after logging) when ``month`` is 1,
        when either month has no stored rows, or when no city is present in
        both months.
    """
    if month == 1:
        log.info("1月数据已经是单月数据,无需计算")
        return

    log.info(f"开始计算{PROV_NAME} {year}年{month}月单月数据...")

    table = 't_yujin_crossborder_prov_region_trade'
    amount_columns = ['monthly_total', 'monthly_import', 'monthly_export']
    yoy_columns = ['yoy_import_export', 'yoy_import', 'yoy_export']

    # NOTE(review): the SQL below is assembled by string formatting. The
    # interpolated values come from this module (PROV_CODE) and from the
    # year/month arguments; switch to bound parameters if DBHelper.query
    # supports them or if these ever come from untrusted input.

    # Rows for the month being converted.
    current_month = f"{year}-{month:02d}"
    current_data = db.query(
        f"SELECT * FROM {table} "
        f"WHERE crossborder_year_month = '{current_month}' and prov_code = '{PROV_CODE}'"
    )
    if current_data.empty:
        # The message names the current month; it previously claimed the
        # *previous* month was missing, which sent debugging the wrong way.
        log.warning(f"找不到当前月({current_month})的数据,无法计算单月数据")
        return

    # Rows for the month before it.
    prev_month_str = f"{year}-{month - 1:02d}"
    prev_data = db.query(
        f"SELECT city_code, monthly_total, monthly_import, monthly_export "
        f"FROM {table} "
        f"WHERE crossborder_year_month = '{prev_month_str}' and prov_code = '{PROV_CODE}'"
    )
    if prev_data.empty:
        log.warning(f"找不到上个月({prev_month_str})的数据,无法计算单月数据")
        return

    # NOTE(review): the subtraction assumes BOTH months still hold cumulative
    # values. This function overwrites the current month's amounts in the
    # same table it reads from, so running it twice for one month, or
    # converting month N before month N+1, feeds already-converted values
    # into the subtraction. Confirm the caller's processing order.

    # Inner join: only cities present in both months can be converted.
    merged_df = pd.merge(
        current_data,
        prev_data,
        on='city_code',
        how='inner',
        suffixes=('', '_prev'),
    )

    # Cities without a previous-month row are dropped by the join and keep
    # their cumulative values in the table; make that visible in the log.
    has_prev = current_data['city_code'].isin(prev_data['city_code'])
    unmatched = current_data.loc[~has_prev, 'city_code'].tolist()
    if unmatched:
        log.warning(f"以下城市缺少上个月({prev_month_str})的数据,未计算单月数据: {unmatched}")
    if merged_df.empty:
        return

    # Single-month amount = cumulative(month) - cumulative(month - 1).
    for column in amount_columns:
        merged_df[column] = merged_df[column] - merged_df[f"{column}_prev"]

    # The stored year-on-year values were read alongside the cumulative
    # figures, so they are blanked once the amounts become single-month.
    for column in yoy_columns:
        merged_df[column] = None

    # Keep only the columns that are written back.
    result_df = merged_df[[
        'city_name', 'city_code', 'crossborder_year', 'crossborder_year_month',
        'prov_code', 'prov_name', 'monthly_total', 'monthly_import',
        'monthly_export', 'yoy_import_export', 'yoy_import', 'yoy_export'
    ]]

    # Upsert over the existing (year_month, city) rows.
    db.bulk_insert(
        result_df,
        table,
        conflict_columns=['crossborder_year_month', 'city_code'],
        update_columns=amount_columns + yoy_columns,
    )
    log.info(f"{PROV_NAME} {year}年{month}月单月数据计算完成")
def parse_page_data(driver, url, year, month):
    """Open ``url`` in a new browser window and scrape the per-city table.

    The table is located by its inline ``BORDER-COLLAPSE: collapse`` style.
    The first three rows are skipped and every remaining row is read by
    fixed cell positions. Rows whose city name (with the "广东省" prefix
    removed) is not a key of ``GUANGDONG_CITY`` are skipped; rows that fail
    to parse are logged and skipped. The window opened here is closed again
    before returning and the driver is switched back to its first window.

    Args:
        driver: Selenium WebDriver with at least one open window.
        url: Address of the page to scrape.
        year: Year of the reporting period, stored on every row.
        month: Month of the reporting period. It is formatted with ``:02d``,
            so it must be an integer.

    Returns:
        A list of dicts, one per recognised city, ready to be loaded into a
        DataFrame. Empty when the table has fewer than four rows or no row
        could be parsed.

    Raises:
        RuntimeError: If ``window.open`` did not produce a new window.
        Exception: Any other page-level failure (for example the 60 second
            wait for the table timing out) is logged and re-raised.
    """
    # Zero-based <td> positions of the fields read from each data row.
    column_index = {
        'city_name': 0,
        'monthly_total': 1,
        'yoy_import_export': 2,
        'monthly_export': 5,
        'yoy_export': 6,
        'monthly_import': 9,
        'yoy_import': 10
    }
    # Identical for every row, so build it once (also fails fast on a
    # non-integer month before any window is opened).
    year_month = f"{year}-{month:02d}"
    table_xpath = '//table[contains(@style, "BORDER-COLLAPSE: collapse")]'

    data = []
    # Set only after the driver is focused on a window opened by this call,
    # so the cleanup below can never close the caller's own window.
    window_opened = False
    try:
        log.info(f"当前采集数据页面:{url}")
        handles_before = set(driver.window_handles)
        # Pass the URL as a script argument instead of splicing it into the
        # JavaScript source, so quotes in the URL cannot break the script.
        driver.execute_script("window.open(arguments[0])", url)
        # Handle order is not guaranteed, so find the new window by
        # difference rather than by taking the last handle.
        new_handles = [h for h in driver.window_handles if h not in handles_before]
        if not new_handles:
            raise RuntimeError(f"未能打开新窗口: {url}")
        driver.switch_to.window(new_handles[-1])
        window_opened = True

        # until() returns the located element (or raises on timeout), so no
        # second lookup or None check is needed.
        table = WebDriverWait(driver, 60).until(
            EC.presence_of_element_located((By.XPATH, table_xpath))
        )

        rows = table.find_elements(By.TAG_NAME, 'tr')
        if len(rows) < 4:
            log.warning("表格行数不足")
            return data

        # The first three rows are not data rows.
        for row in rows[3:]:
            cols = [td.text.strip() for td in row.find_elements(By.TAG_NAME, 'td')]
            try:
                city_name = cols[column_index['city_name']].replace("广东省", "")
                city_code = GUANGDONG_CITY.get(city_name)
                if not city_code:
                    log.debug(f"跳过未识别的城市: {city_name}")
                    continue

                data.append({
                    'city_name': city_name,
                    'city_code': city_code,
                    'crossborder_year': year,
                    'crossborder_year_month': year_month,
                    'prov_code': PROV_CODE,
                    'prov_name': PROV_NAME,
                    'monthly_total': parse_value(cols[column_index['monthly_total']]),
                    'monthly_import': parse_value(cols[column_index['monthly_import']]),
                    'monthly_export': parse_value(cols[column_index['monthly_export']]),
                    'yoy_import_export': parse_value(cols[column_index['yoy_import_export']]),
                    'yoy_export': parse_value(cols[column_index['yoy_export']]),
                    'yoy_import': parse_value(cols[column_index['yoy_import']])
                })
            except Exception as e:
                # Best effort per row: a short or malformed row must not
                # abort the rest of the table.
                log.error(f"解析行数据失败: {e}")
                continue
    except Exception as e:
        log.error(f"解析页面失败:{e}")
        raise
    finally:
        # Close only the window this call opened, then return focus to the
        # first window, as callers expect.
        if window_opened:
            driver.close()
        driver.switch_to.window(driver.window_handles[0])
    return data
|