# guangdong_gongbei_parse_excel.py (6.4 KB)

  1. import pandas as pd
  2. from selenium.webdriver.common.by import By
  3. from selenium.webdriver.support import expected_conditions as EC
  4. from selenium.webdriver.support.ui import WebDriverWait
  5. from utils.db_helper import DBHelper
  6. from quanguo.detail import parse_value
  7. from utils.constants import GUANGDONG_CITY
  8. from utils.log import log
  9. PROV_CODE = "440000"
  10. PROV_NAME = "广东省"
  11. db = DBHelper()
  12. def parse_region_table(driver, url, year, month, title):
  13. """第一阶段:按原始逻辑入库(1月是单月,其他月份是1-X月累计)"""
  14. log.info(f"开始解析{PROV_NAME} {year}年{month}月 {title}")
  15. data = parse_page_data(driver, url, year, month)
  16. if data:
  17. df = pd.DataFrame(data)
  18. db.bulk_insert(
  19. df,
  20. 't_yujin_crossborder_prov_region_trade',
  21. conflict_columns=['crossborder_year_month', 'city_code'],
  22. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  23. 'yoy_import_export', 'yoy_import', 'yoy_export']
  24. )
  25. log.info(f"{PROV_NAME} {year}年{month}月 {title}数据解析完成")
  26. def calculate_monthly_data(year, month):
  27. """第二阶段:计算并更新单月数据(适用于非1月)"""
  28. if month == 1:
  29. log.info("1月数据已经是单月数据,无需计算")
  30. return
  31. log.info(f"开始计算{PROV_NAME} {year}年{month}月单月数据...")
  32. # 获取当前月份累计数据
  33. current_month = f"{year}-{month:02d}"
  34. current_data = db.query(
  35. f"SELECT * FROM t_yujin_crossborder_prov_region_trade "
  36. f"WHERE crossborder_year_month = '{current_month}' and prov_code = '{PROV_CODE}'"
  37. )
  38. # 正确判断DataFrame是否为空
  39. if current_data.empty:
  40. log.warning(f"找不到上个月({current_month})的数据,无法计算单月数据")
  41. return
  42. # 获取上个月累计数据
  43. prev_month = month - 1
  44. prev_month_str = f"{year}-{prev_month:02d}"
  45. prev_data = db.query(
  46. f"SELECT city_code, monthly_total, monthly_import, monthly_export "
  47. f"FROM t_yujin_crossborder_prov_region_trade "
  48. f"WHERE crossborder_year_month = '{prev_month_str}' and prov_code = '{PROV_CODE}'"
  49. )
  50. if prev_data.empty:
  51. log.warning(f"找不到上个月({prev_month_str})的数据,无法计算单月数据")
  52. return
  53. # 转换为DataFrame并合并
  54. current_df = pd.DataFrame(current_data)
  55. prev_df = pd.DataFrame(prev_data)
  56. merged_df = pd.merge(
  57. current_df,
  58. prev_df,
  59. on='city_code',
  60. suffixes=('', '_prev')
  61. )
  62. # 计算单月数据
  63. merged_df['monthly_total'] = merged_df['monthly_total'] - merged_df['monthly_total_prev']
  64. merged_df['monthly_import'] = merged_df['monthly_import'] - merged_df['monthly_import_prev']
  65. merged_df['monthly_export'] = merged_df['monthly_export'] - merged_df['monthly_export_prev']
  66. # 将同比数据置为空
  67. merged_df['yoy_import_export'] = None
  68. merged_df['yoy_import'] = None
  69. merged_df['yoy_export'] = None
  70. # 只保留需要的列
  71. result_df = merged_df[[
  72. 'city_name', 'city_code', 'crossborder_year', 'crossborder_year_month',
  73. 'prov_code', 'prov_name', 'monthly_total', 'monthly_import',
  74. 'monthly_export', 'yoy_import_export', 'yoy_import', 'yoy_export'
  75. ]]
  76. # 更新数据库
  77. db.bulk_insert(
  78. result_df,
  79. 't_yujin_crossborder_prov_region_trade',
  80. conflict_columns=['crossborder_year_month', 'city_code'],
  81. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  82. 'yoy_import_export', 'yoy_import', 'yoy_export']
  83. )
  84. log.info(f"{PROV_NAME} {year}年{month}月单月数据计算完成")
  85. def parse_page_data(driver, url, year, month):
  86. """解析页面数据(保持不变)"""
  87. data = []
  88. try:
  89. log.info(f"当前采集数据页面:{url}")
  90. driver.execute_script(f"window.open('{url}')")
  91. driver.switch_to.window(driver.window_handles[-1])
  92. table_xpath = '//table[contains(@style, "BORDER-COLLAPSE: collapse")]'
  93. WebDriverWait(driver, 60).until(
  94. EC.presence_of_element_located(
  95. (By.XPATH, table_xpath))
  96. )
  97. table = driver.find_element(By.XPATH, table_xpath)
  98. if not table:
  99. log.warning("未找到表格元素")
  100. return data
  101. rows = table.find_elements(By.TAG_NAME, 'tr')
  102. if len(rows) < 4:
  103. log.warning("表格行数不足")
  104. return data
  105. COLUMN_MAPPING = {
  106. 'city_name': 0,
  107. 'monthly_total': 1,
  108. 'yoy_import_export': 2,
  109. 'monthly_export': 5,
  110. 'yoy_export': 6,
  111. 'monthly_import': 9,
  112. 'yoy_import': 10
  113. }
  114. for row in rows[3:]:
  115. cols = [td.text.strip() for td in row.find_elements(By.TAG_NAME, 'td')]
  116. try:
  117. city_name = cols[COLUMN_MAPPING['city_name']]
  118. city_name = city_name.replace("广东省", "")
  119. city_code = GUANGDONG_CITY.get(city_name)
  120. if not city_code:
  121. log.debug(f"跳过未识别的城市: {city_name}")
  122. continue
  123. item = {
  124. 'city_name': city_name,
  125. 'city_code': city_code,
  126. 'crossborder_year': year,
  127. 'crossborder_year_month': f"{year}-{month:02d}",
  128. 'prov_code': PROV_CODE,
  129. 'prov_name': PROV_NAME,
  130. 'monthly_total': parse_value(cols[COLUMN_MAPPING['monthly_total']]),
  131. 'monthly_import': parse_value(cols[COLUMN_MAPPING['monthly_import']]),
  132. 'monthly_export': parse_value(cols[COLUMN_MAPPING['monthly_export']]),
  133. 'yoy_import_export': parse_value(cols[COLUMN_MAPPING['yoy_import_export']]),
  134. 'yoy_export': parse_value(cols[COLUMN_MAPPING['yoy_export']]),
  135. 'yoy_import': parse_value(cols[COLUMN_MAPPING['yoy_import']])
  136. }
  137. data.append(item)
  138. except Exception as e:
  139. log.error(f"解析行数据失败: {str(e)}")
  140. continue
  141. except Exception as e:
  142. log.error(f"解析页面失败:{str(e)}")
  143. raise
  144. finally:
  145. driver.close()
  146. driver.switch_to.window(driver.window_handles[0])
  147. return data