|
@@ -1,3 +1,5 @@
|
|
|
+import re
|
|
|
+
|
|
|
import pandas as pd
|
|
|
from selenium.webdriver.common.by import By
|
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
@@ -14,11 +16,22 @@ PROV_NAME = "广东省"
|
|
|
|
|
|
db = DBHelper()
|
|
|
|
|
|
+"""
|
|
|
+2023年9月 中山市数据,缺少城市数据列,需要特殊兼容
|
|
|
+个别月份数据,表头行数不一致,因此这里起始行数据,需要动态解析
|
|
|
+"""
|
|
|
+
|
|
|
+
|
|
|
+def parse_page_region_data(driver, url, year, month, title):
|
|
|
+ """第一阶段:按原始逻辑入库,增加标题中提取城市功能"""
|
|
|
+ # 先从标题中提取城市名称
|
|
|
+ page_city = extract_city_from_title(title)
|
|
|
|
|
|
-def parse_region_table(driver, url, year, month, title):
|
|
|
- """第一阶段:按原始逻辑入库(1月是单月,其他月份是1-X月累计)"""
|
|
|
- log.info(f"开始解析{PROV_NAME} {year}年{month}月 {title}")
|
|
|
- data = parse_page_data(driver, url, year, month)
|
|
|
+ if not page_city:
|
|
|
+ log.warning(f"标题中未识别到城市: {title}")
|
|
|
+
|
|
|
+ log.info(f"开始解析{PROV_NAME} {year}年{month}月 {title} ({page_city if page_city else '城市未知'})")
|
|
|
+ data = parse_page_data(driver, url, year, month, title, page_city)
|
|
|
|
|
|
if data:
|
|
|
df = pd.DataFrame(data)
|
|
@@ -32,6 +45,201 @@ def parse_region_table(driver, url, year, month, title):
|
|
|
log.info(f"{PROV_NAME} {year}年{month}月 {title}数据解析完成")
|
|
|
|
|
|
|
|
|
def extract_city_from_title(title):
    """Extract the city name embedded in a page title.

    Only the two city names matched by the pattern below ("中山市" and
    "珠海市") are recognised. When the title mentions both, the one that
    appears first in the text is returned.

    Args:
        title: Page title text. ``None``, empty strings and non-string
            values are tolerated and treated as "no city found".

    Returns:
        The matched city name, or ``None`` when the title names neither
        city.
    """
    # Guard first: re.search() raises TypeError on None / non-string input,
    # and a missing title should simply mean "city unknown".
    if not title or not isinstance(title, str):
        return None

    match = re.search(r"(中山市|珠海市)", title)
    return match.group(1) if match else None
|
|
|
+
|
|
|
+
|
|
|
def parse_page_data(driver, url, year, month, title, page_city=None):
    """Open a statistics page in a new window and parse its data table.

    Rows after the detected header row are converted into record dicts. The
    city of each row is taken from the row itself when one is found there,
    otherwise from ``page_city`` (which is derived from ``title`` when not
    supplied). Rows whose city cannot be determined, or whose city has no
    entry in ``GUANGDONG_CITY``, are skipped.

    Args:
        driver: Selenium WebDriver used to open and read the page.
        url: Address of the page to scrape.
        year: Year written into each record.
        month: Month number; formatted as two digits in
            ``crossborder_year_month``.
        title: Page title, used to derive the city when ``page_city`` is
            not given.
        page_city: City name to fall back on for rows that carry no city.

    Returns:
        A list of record dicts (possibly empty).

    Raises:
        Exception: Any error raised while loading or locating the table is
            logged and re-raised. Errors while building a single row's
            record are logged and that row is skipped.
    """
    # Column positions for the layout in which the city name is in column 0.
    BASE_COLUMN_MAPPING = {
        'monthly_total': 1,
        'yoy_import_export': 2,
        'monthly_export': 5,
        'yoy_export': 6,
        'monthly_import': 9,
        'yoy_import': 10
    }

    data = []
    # Set only once this call has switched into its own window, so that the
    # cleanup below can never close the caller's window by mistake.
    window_opened = False
    try:
        # Fall back to the title when the caller did not supply a city.
        if not page_city:
            page_city = extract_city_from_title(title)

        log.info(f"当前采集数据页面:{url} [城市: {page_city if page_city else '未知'}]")

        # Pass the URL as a script argument instead of splicing it into the
        # JavaScript source: a quote inside the URL would otherwise break
        # (or inject into) the script.
        driver.execute_script("window.open(arguments[0])", url)
        driver.switch_to.window(driver.window_handles[-1])
        window_opened = True

        table_xpath = '//table[contains(@style, "BORDER-COLLAPSE: collapse")]'

        # until() returns the located element (or raises on timeout), so no
        # second lookup and no "table is None" check are needed.
        table = WebDriverWait(driver, 60).until(
            EC.presence_of_element_located((By.XPATH, table_xpath))
        )

        rows = table.find_elements(By.TAG_NAME, 'tr')
        if len(rows) < 4:
            log.warning("表格行数不足")
            return data

        # The number of header rows varies between pages, so the first data
        # row is detected rather than assumed.
        data_start_row = find_data_start_row(rows)
        if data_start_row < 0:
            log.warning("未找到数据起始行")
            return data

        for row in rows[data_start_row:]:
            cols = [td.text.strip() for td in row.find_elements(By.TAG_NAME, 'td')]

            # Prefer a city found in the row; otherwise use the page-level
            # city taken from the title.
            city_name = find_city_in_row(cols) or page_city
            if not city_name:
                log.debug("无法识别城市名称,跳过此行")
                continue

            city_code = GUANGDONG_CITY.get(city_name)
            if not city_code:
                log.debug(f"跳过未识别的城市: {city_name}")
                continue

            column_mapping = detect_column_mapping(cols, BASE_COLUMN_MAPPING, city_name)

            try:
                # Out-of-range columns yield None instead of raising.
                raw = {
                    field: get_value_safely(cols, column_mapping.get(field))
                    for field in BASE_COLUMN_MAPPING
                }

                # Key order is kept as-is because it determines the column
                # order of a DataFrame built from these records.
                data.append({
                    'city_name': city_name,
                    'city_code': city_code,
                    'crossborder_year': year,
                    'crossborder_year_month': f"{year}-{month:02d}",
                    'prov_code': PROV_CODE,
                    'prov_name': PROV_NAME,
                    'monthly_total': parse_value(raw['monthly_total']),
                    'monthly_import': parse_value(raw['monthly_import']),
                    'monthly_export': parse_value(raw['monthly_export']),
                    'yoy_import_export': parse_value(raw['yoy_import_export']),
                    'yoy_export': parse_value(raw['yoy_export']),
                    'yoy_import': parse_value(raw['yoy_import'])
                })
            except Exception as e:
                # Best effort per row: one bad row must not lose the page.
                log.error(f"解析行数据失败: {str(e)}")
                continue

    except Exception as e:
        log.error(f"解析页面失败:{str(e)}")
        raise
    finally:
        # Close only the window opened above, then return to the first one.
        if window_opened:
            driver.close()
            driver.switch_to.window(driver.window_handles[0])

    return data
|
|
|
+
|
|
|
+
|
|
|
def find_city_in_row(cols, cities=("中山市", "珠海市")):
    """Work out which known city a table row refers to.

    Two passes are made over the row, each in column order:

    1. a cell that *contains* a full city name (e.g. "广东省中山市");
    2. a cell that consists solely of the short name, i.e. the city name
       without its trailing "市" (e.g. "中山").

    Args:
        cols: Cell texts of one table row. Non-string cells are ignored.
        cities: Full city names to look for. When one cell mentions several
            of them, the name listed first wins.

    Returns:
        The full city name, or ``None`` if no cell identifies a city.
    """
    # Skip non-text cells (e.g. None) rather than failing on the `in` test.
    cells = [col for col in (cols or ()) if isinstance(col, str)]

    for cell in cells:
        for city in cities:
            if city in cell:
                return city

    # A short name must be the whole cell (ignoring surrounding whitespace),
    # so longer words that merely start with it, such as "中山大学", are
    # not mistaken for the city.
    short_names = {
        city[:-1]: city
        for city in cities
        if len(city) > 1 and city.endswith("市")
    }
    for cell in cells:
        city = short_names.get(cell.strip())
        if city:
            return city

    return None
|
|
|
+
|
|
|
+
|
|
|
def detect_column_mapping(cols, base_mapping, city_name):
    """Return the field -> column-index mapping for one table row.

    ``base_mapping`` describes the layout in which the city name occupies
    column 0. When the city name is found in column ``k`` instead, every
    data column sits ``k`` places further right, so each index is shifted
    by ``k``. (Shifting by ``k - 1`` would make a city found in column 1
    map ``monthly_total`` onto the city cell itself.)

    Args:
        cols: Cell texts of the row.
        base_mapping: Field name -> column index for the city-in-column-0
            layout.
        city_name: City name to locate in the row (substring match; the
            first matching cell is used).

    Returns:
        A new dict with the same keys as ``base_mapping``. The indices are
        unchanged when the city is in column 0 or is not present in the
        row at all.
    """
    # Index of the first cell mentioning the city; 0 (no shift) if absent.
    offset = next(
        (position for position, cell in enumerate(cols) if city_name in cell),
        0,
    )
    # Always build a fresh dict so callers never share the base mapping.
    return {field: index + offset for field, index in base_mapping.items()}
|
|
|
+
|
|
|
+
|
|
|
def find_data_start_row(rows):
    """Locate the index of the first data row of a table.

    The first row whose concatenated ``td`` text contains a header marker
    is treated as the header, and the index of the row after it is
    returned. A marker found on the very last row has no following row, so
    it does not count and scanning continues.

    Args:
        rows: Row elements of the table, in order.

    Returns:
        The 0-based index of the first data row, or ``-1`` if none is
        found.
    """
    markers = ("人民币",)
    total = len(rows)

    # `position` is the 1-based number of the current row, which is also
    # the 0-based index of the row that follows it.
    for position, tr in enumerate(rows, start=1):
        cells = tr.find_elements(By.TAG_NAME, 'td')
        joined = "".join(cell.text.strip() for cell in cells)

        if not any(marker in joined for marker in markers):
            continue

        log.debug(f"在行 {position} 找到表头行: {joined}")
        if position < total:
            return position

    log.warning("无法识别数据起始行")
    return -1
|
|
|
def get_value_safely(cols, index):
    """Fetch ``cols[index]`` without ever raising for a bad index.

    Args:
        cols: Sequence of cell values.
        index: Desired position.

    Returns:
        The value at ``index`` when ``index`` is an ``int`` within
        ``0 <= index < len(cols)``; otherwise ``None`` (this includes
        ``None``, non-integer and negative indices).
    """
    # isinstance() rejects None as well, so one combined test is enough.
    if isinstance(index, int) and 0 <= index < len(cols):
        return cols[index]
    return None
|
|
|
+
|
|
|
def calculate_monthly_data(year, month):
|
|
|
"""第二阶段:计算并更新单月数据(适用于非1月)"""
|
|
|
if month == 1:
|
|
@@ -105,79 +313,3 @@ def calculate_monthly_data(year, month):
|
|
|
log.info(f"{PROV_NAME} {year}年{month}月单月数据计算完成")
|
|
|
|
|
|
|
|
|
-def parse_page_data(driver, url, year, month):
|
|
|
- """解析页面数据(保持不变)"""
|
|
|
- data = []
|
|
|
- try:
|
|
|
- log.info(f"当前采集数据页面:{url}")
|
|
|
- driver.execute_script(f"window.open('{url}')")
|
|
|
- driver.switch_to.window(driver.window_handles[-1])
|
|
|
-
|
|
|
- table_xpath = '//table[contains(@style, "BORDER-COLLAPSE: collapse")]'
|
|
|
-
|
|
|
- WebDriverWait(driver, 60).until(
|
|
|
- EC.presence_of_element_located(
|
|
|
- (By.XPATH, table_xpath))
|
|
|
- )
|
|
|
-
|
|
|
- table = driver.find_element(By.XPATH, table_xpath)
|
|
|
-
|
|
|
- if not table:
|
|
|
- log.warning("未找到表格元素")
|
|
|
- return data
|
|
|
-
|
|
|
- rows = table.find_elements(By.TAG_NAME, 'tr')
|
|
|
- if len(rows) < 4:
|
|
|
- log.warning("表格行数不足")
|
|
|
- return data
|
|
|
-
|
|
|
- COLUMN_MAPPING = {
|
|
|
- 'city_name': 0,
|
|
|
- 'monthly_total': 1,
|
|
|
- 'yoy_import_export': 2,
|
|
|
- 'monthly_export': 5,
|
|
|
- 'yoy_export': 6,
|
|
|
- 'monthly_import': 9,
|
|
|
- 'yoy_import': 10
|
|
|
- }
|
|
|
-
|
|
|
- for row in rows[3:]:
|
|
|
- cols = [td.text.strip() for td in row.find_elements(By.TAG_NAME, 'td')]
|
|
|
-
|
|
|
- try:
|
|
|
- city_name = cols[COLUMN_MAPPING['city_name']]
|
|
|
- city_name = city_name.replace("广东省", "")
|
|
|
- city_code = GUANGDONG_CITY.get(city_name)
|
|
|
- if not city_code:
|
|
|
- log.debug(f"跳过未识别的城市: {city_name}")
|
|
|
- continue
|
|
|
-
|
|
|
- item = {
|
|
|
- 'city_name': city_name,
|
|
|
- 'city_code': city_code,
|
|
|
- 'crossborder_year': year,
|
|
|
- 'crossborder_year_month': f"{year}-{month:02d}",
|
|
|
- 'prov_code': PROV_CODE,
|
|
|
- 'prov_name': PROV_NAME,
|
|
|
- 'monthly_total': parse_value(cols[COLUMN_MAPPING['monthly_total']]),
|
|
|
- 'monthly_import': parse_value(cols[COLUMN_MAPPING['monthly_import']]),
|
|
|
- 'monthly_export': parse_value(cols[COLUMN_MAPPING['monthly_export']]),
|
|
|
- 'yoy_import_export': parse_value(cols[COLUMN_MAPPING['yoy_import_export']]),
|
|
|
- 'yoy_export': parse_value(cols[COLUMN_MAPPING['yoy_export']]),
|
|
|
- 'yoy_import': parse_value(cols[COLUMN_MAPPING['yoy_import']])
|
|
|
- }
|
|
|
-
|
|
|
- data.append(item)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- log.error(f"解析行数据失败: {str(e)}")
|
|
|
- continue
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- log.error(f"解析页面失败:{str(e)}")
|
|
|
- raise
|
|
|
- finally:
|
|
|
- driver.close()
|
|
|
- driver.switch_to.window(driver.window_handles[0])
|
|
|
-
|
|
|
- return data
|