import pandas as pd
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from utils.db_helper import DBHelper
from quanguo.detail import parse_value
from utils.constants import GUANGDONG_CITY
from utils.log import log

PROV_CODE = "440000"
PROV_NAME = "广东省"
db = DBHelper()


def parse_region_table(driver, url, year, month, title):
    """Stage one: scrape the page and store its figures as published.

    Per the original author's note, the January page holds single-month
    figures while every other month holds January-to-month cumulative
    figures; `calculate_monthly_data` converts the latter afterwards.

    Args:
        driver: Selenium WebDriver used to open the page.
        url: Address of the page that contains the region table.
        year: Year of the report (formatted into the year-month key).
        month: Month of the report; must be an integer.
        title: Page title, used only in log messages.

    Raises:
        Exception: whatever `parse_page_data` re-raises on a page failure.
    """
    log.info(f"开始解析{PROV_NAME} {year}年{month}月 {title}")

    data = parse_page_data(driver, url, year, month)
    if not data:
        # Do not report success when the page produced no usable rows.
        log.warning(f"{PROV_NAME} {year}年{month}月 {title}未解析到任何数据")
        return

    df = pd.DataFrame(data)
    db.bulk_insert(
        df,
        't_yujin_crossborder_prov_region_trade',
        conflict_columns=['crossborder_year_month', 'city_code'],
        update_columns=[
            'monthly_total', 'monthly_import', 'monthly_export',
            'yoy_import_export', 'yoy_import', 'yoy_export',
        ],
    )
    log.info(f"{PROV_NAME} {year}年{month}月 {title}数据解析完成")


def calculate_monthly_data(year, month):
    """Stage two: derive single-month figures for a non-January month.

    Subtracts the previous month's stored values from the current month's
    stored values per city, clears the year-on-year columns, and writes the
    result back to the same table.

    NOTE(review): the result overwrites the stored figures in place, so this
    presumably requires the previous month's row to still hold cumulative
    values when it runs, and must not be run twice for the same month —
    confirm against the caller's scheduling.

    Args:
        year: Year of the report.
        month: Month of the report (integer); 1 is a no-op.
    """
    if month == 1:
        log.info("1月数据已经是单月数据,无需计算")
        return

    log.info(f"开始计算{PROV_NAME} {year}年{month}月单月数据...")

    # Cumulative figures for the month being converted.
    current_month = f"{year}-{month:02d}"
    current_data = db.query(
        f"SELECT * FROM t_yujin_crossborder_prov_region_trade "
        f"WHERE crossborder_year_month = '{current_month}' and prov_code = '{PROV_CODE}'"
    )
    if current_data.empty:
        # The message previously said "previous month" for this case.
        log.warning(f"找不到当前月({current_month})的数据,无法计算单月数据")
        return

    # Cumulative figures for the month before it (same year; month >= 2 here).
    prev_month = month - 1
    prev_month_str = f"{year}-{prev_month:02d}"
    prev_data = db.query(
        f"SELECT city_code, monthly_total, monthly_import, monthly_export "
        f"FROM t_yujin_crossborder_prov_region_trade "
        f"WHERE crossborder_year_month = '{prev_month_str}' and prov_code = '{PROV_CODE}'"
    )
    if prev_data.empty:
        log.warning(f"找不到上个月({prev_month_str})的数据,无法计算单月数据")
        return

    # Pair each city's current row with its previous-month row. The default
    # inner join drops cities that are missing from either month.
    current_df = pd.DataFrame(current_data)
    prev_df = pd.DataFrame(prev_data)
    merged_df = pd.merge(
        current_df,
        prev_df,
        on='city_code',
        suffixes=('', '_prev'),
    )
    if merged_df.empty:
        log.warning(
            f"{current_month}与{prev_month_str}没有可匹配的城市数据,无法计算单月数据"
        )
        return

    # Single-month value = current stored value - previous stored value.
    for column in ('monthly_total', 'monthly_import', 'monthly_export'):
        merged_df[column] = merged_df[column] - merged_df[f'{column}_prev']

    # The published year-on-year rates describe the cumulative figures, so
    # they are cleared rather than carried over to the derived values.
    merged_df['yoy_import_export'] = None
    merged_df['yoy_import'] = None
    merged_df['yoy_export'] = None

    # Keep only the columns that are written back.
    result_df = merged_df[[
        'city_name', 'city_code', 'crossborder_year', 'crossborder_year_month',
        'prov_code', 'prov_name', 'monthly_total', 'monthly_import',
        'monthly_export', 'yoy_import_export', 'yoy_import', 'yoy_export',
    ]]

    db.bulk_insert(
        result_df,
        't_yujin_crossborder_prov_region_trade',
        conflict_columns=['crossborder_year_month', 'city_code'],
        update_columns=[
            'monthly_total', 'monthly_import', 'monthly_export',
            'yoy_import_export', 'yoy_import', 'yoy_export',
        ],
    )
    log.info(f"{PROV_NAME} {year}年{month}月单月数据计算完成")


def parse_page_data(driver, url, year, month):
    """Open `url` in a new window and extract one record per known city.

    Rows whose city name (after removing the province prefix) is not a key of
    `GUANGDONG_CITY` are skipped; rows that fail to parse are logged and
    skipped.

    Args:
        driver: Selenium WebDriver; a new window is opened for the page and
            closed again before returning.
        url: Address of the page to scrape.
        year: Year stored on each record.
        month: Month (integer) used to build the ``YYYY-MM`` key.

    Returns:
        A list of dicts, one per recognised city; empty when the table has
        fewer than four rows or no row could be parsed.

    Raises:
        Exception: any page-level failure (for example the table not
            appearing within 60 seconds) is logged and re-raised.
    """
    data = []
    # Becomes True only once the driver is focused on a window other than the
    # one it started in, so cleanup never closes the caller's window.
    switched_to_new_window = False
    try:
        log.info(f"当前采集数据页面:{url}")
        origin_handle = driver.current_window_handle
        # Pass the URL as a script argument instead of splicing it into the
        # JavaScript source, so quotes in the URL cannot break the script.
        driver.execute_script("window.open(arguments[0])", url)
        target_handle = driver.window_handles[-1]
        driver.switch_to.window(target_handle)
        switched_to_new_window = target_handle != origin_handle

        table_xpath = '//table[contains(@style, "BORDER-COLLAPSE: collapse")]'
        # `until` returns the located element, or raises on timeout.
        table = WebDriverWait(driver, 60).until(
            EC.presence_of_element_located((By.XPATH, table_xpath))
        )

        rows = table.find_elements(By.TAG_NAME, 'tr')
        if len(rows) < 4:
            log.warning("表格行数不足")
            return data

        # Zero-based <td> index of each field within a data row.
        COLUMN_MAPPING = {
            'city_name': 0,
            'monthly_total': 1,
            'yoy_import_export': 2,
            'monthly_export': 5,
            'yoy_export': 6,
            'monthly_import': 9,
            'yoy_import': 10,
        }

        # The first three rows are skipped (presumably header rows).
        for row in rows[3:]:
            cols = [td.text.strip() for td in row.find_elements(By.TAG_NAME, 'td')]
            try:
                city_name = cols[COLUMN_MAPPING['city_name']]
                city_name = city_name.replace("广东省", "")
                city_code = GUANGDONG_CITY.get(city_name)
                if not city_code:
                    log.debug(f"跳过未识别的城市: {city_name}")
                    continue

                item = {
                    'city_name': city_name,
                    'city_code': city_code,
                    'crossborder_year': year,
                    'crossborder_year_month': f"{year}-{month:02d}",
                    'prov_code': PROV_CODE,
                    'prov_name': PROV_NAME,
                    'monthly_total': parse_value(cols[COLUMN_MAPPING['monthly_total']]),
                    'monthly_import': parse_value(cols[COLUMN_MAPPING['monthly_import']]),
                    'monthly_export': parse_value(cols[COLUMN_MAPPING['monthly_export']]),
                    'yoy_import_export': parse_value(cols[COLUMN_MAPPING['yoy_import_export']]),
                    'yoy_export': parse_value(cols[COLUMN_MAPPING['yoy_export']]),
                    'yoy_import': parse_value(cols[COLUMN_MAPPING['yoy_import']]),
                }
                data.append(item)
            except Exception as e:
                # Best effort per row: a short or malformed row must not
                # abort the rest of the table.
                log.error(f"解析行数据失败: {str(e)}")
                continue
    except Exception as e:
        log.error(f"解析页面失败:{str(e)}")
        raise
    finally:
        if switched_to_new_window:
            driver.close()
        driver.switch_to.window(driver.window_handles[0])
    return data