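"""Scrape Guangdong customs (gdfs.customs.gov.cn) monthly trade statistics.

Detects the latest published month, walks the bulletin listing page by page,
parses the country/region and key-commodity tables in-page with Selenium
(the site offers no file download), and writes the results via DBHelper.
"""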
import argparse
import random
import re
import time
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from crossborder.utils import base_mysql
from crossborder.utils.db_helper import DBHelper
from crossborder.utils.constants import DOWNLOAD_DIR, COUNTRY_CODE_MAPPING
from crossborder.utils.dingtalk import send_dingtalk_message
from crossborder.utils.download_utils import configure_stealth_options, generate_month_sequence
from crossborder.utils.log import get_logger
from crossborder.utils.parse_utils import clean_county_name, convert_wan_to_yuan, clean_commodity_name

log = get_logger(__name__)

BASE_URL = "http://gdfs.customs.gov.cn/guangdong_sub/zwgk62/sjgb59/6b4cdb3f-1.html"
download_dir = DOWNLOAD_DIR / "guangdong"
PROV_CODE = "440000"
PROV_NAME = "广东省"
db = DBHelper()
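# DBHelper is assumed to expose bulk_insert(), get_commodity_id() and get_code_exist();
# these are the only database operations used in this script.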

# Guangdong customs data: the bulletin pages have no download button, so the
# figures are read directly from the HTML tables on the online pages.


def detect_latest_month(driver):
    """Detect the latest available data month, looking back up to three months."""
    driver.get(BASE_URL)
    current_date = datetime.now()
    for offset in range(0, 3):
        check_date = current_date - timedelta(days=offset * 30)
        check_year = check_date.year
        check_month = check_date.month
        # Build the title regex: accept both "1至X月" and "X月", allowing stray spaces.
        pattern = re.compile(
            rf'(5){check_year}\s*年\s*(?:1至)?{check_month}\s*月广东省外贸进出口主要国别(地区)总值表(人民币值)',
            re.IGNORECASE
        )
        try:
            # Match the regex against the titles of all links containing "广东省".
            elements = WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.XPATH, '//a[contains(@title,"广东省")]'))
            )
            for element in elements:
                title = element.get_attribute("title")
                if pattern.search(title):
                    log.info(f"已找到最新月份数据 {check_year}-{check_month}")
                    return check_year, check_month
            log.error(f"未找到匹配项(正则:{pattern.pattern})")
        except TimeoutException:
            log.info(f"页面加载超时或无匹配项({check_year}-{check_month})")
            continue
    raise Exception("三个月内未找到有效数据")
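

# Each monthly bulletin publishes three tables of interest on separate detail pages:
#   (5) country/region totals, (6) key export commodities, (7) key import commodities.
# Country rows are inserted directly; commodity rows are handed back to the caller,
# which merges the export and import sides before inserting.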
def process_month_data(driver, year, month):
    """Process one month's bulletin: handles the three table types."""
    patterns = [
        (re.compile(rf'(5){year}\s*年\s*(?:1至)?{month}\s*月广东省外贸进出口主要国别(地区)总值表(人民币值)'), 'country'),
        (re.compile(rf'(6){year}\s*年\s*(?:1至)?{month}\s*月广东省出口重点商品总值表(人民币值)'), 'export_commodity'),
        (re.compile(rf'(7){year}\s*年\s*(?:1至)?{month}\s*月广东省进口重点商品总值表(人民币值)'), 'import_commodity')
    ]
    found_count = 0
    commodity_data = {'export': [], 'import': []}  # commodity rows, merged later by the caller
    links = driver.find_elements(By.XPATH, '//a[contains(@title,"广东省")]')
    for link in links:
        title = link.get_attribute("title")
        for pattern, table_type in patterns:
            if pattern.search(title):
                log.info(f"处理表格: {title}")
                url = link.get_attribute("href")
                # Open the detail page in a new tab.
                driver.execute_script("window.open(arguments[0]);", url)
                driver.switch_to.window(driver.window_handles[-1])
                try:
                    WebDriverWait(driver, 15).until(
                        EC.presence_of_element_located((By.XPATH, "//table[@border='1']"))
                    )
                    # Dispatch on the table type.
                    if table_type == 'country':
                        data = parse_page_country_data(driver, year, month)
                        df_country = pd.DataFrame(data)
                        db.bulk_insert(
                            df_country,
                            't_yujin_crossborder_prov_country_trade',
                            conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
                            update_columns=['monthly_total', 'monthly_import', 'monthly_export',
                                            'yoy_import_export', 'yoy_import', 'yoy_export']
                        )
                        found_count += 1
                    else:
                        data_type = 'export' if table_type == 'export_commodity' else 'import'
                        commodity_data[data_type] = parse_page_commodity_data(driver, data_type, year, month)
                        found_count += 1
                except Exception as e:
                    log.error(f"表格处理失败: {e}")
                    # If the parser never reached its own cleanup, close the extra tab here
                    # so the driver is back on the listing page for the next link.
                    if len(driver.window_handles) > 1:
                        driver.close()
                        driver.switch_to.window(driver.window_handles[0])
    # Return the commodity rows instead of merging them here.
    return found_count, commodity_data


def parse_page_country_data(driver, year, month):
    """Parse the country/region totals table on the detail page."""
    data = []
    try:
        # Wait for the table to load.
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.XPATH, "//table[@border='1']"))
        )
        table = driver.find_element(By.XPATH, "//table[@border='1' and @bordercolor='#000000']")
        rows = table.find_elements(By.TAG_NAME, 'tr')
        # Data rows start after the header rows (index 4 onward).
        for row in rows[4:]:
            cols = [col.text.strip() for col in row.find_elements(By.TAG_NAME, 'td')]
            country_name = cols[0]
            # Skip aggregate rows (province totals, ASEAN, EU, ...).
            if country_name in ('广东外贸总值', '东盟', '欧盟', '总计', '广东外贸总额', '广东外贸总计', '总值'):
                continue
            if month == 2:
                # January and February are published as a combined figure.
                monthly_total = convert_wan_to_yuan(cols[1])
                monthly_export = convert_wan_to_yuan(cols[3])
                monthly_import = convert_wan_to_yuan(cols[5])
                # Split the combined figure evenly into a January row and a February row.
                for m in [1, 2]:
                    adjusted_monthly_total = monthly_total / 2
                    adjusted_monthly_export = monthly_export / 2
                    adjusted_monthly_import = monthly_import / 2
                    adjusted_yoy_total = 0
                    adjusted_yoy_export = 0
                    adjusted_yoy_import = 0
                    country_name_clean = clean_county_name(country_name)
                    country_code = COUNTRY_CODE_MAPPING.get(country_name_clean)
                    data.append({
                        'crossborder_year': year,
                        'crossborder_year_month': f"{year}-{m:02d}",
                        'prov_code': PROV_CODE,
                        'prov_name': PROV_NAME,
                        'country_code': country_code,
                        'country_name': country_name_clean,
                        'monthly_total': adjusted_monthly_total,
                        'monthly_export': adjusted_monthly_export,
                        'monthly_import': adjusted_monthly_import,
                        'yoy_import_export': adjusted_yoy_total,
                        'yoy_export': adjusted_yoy_export,
                        'yoy_import': adjusted_yoy_import
                    })
            else:
                # Regular months: 13-column layout.
                monthly_total = convert_wan_to_yuan(cols[3])
                monthly_export = convert_wan_to_yuan(cols[7])
                monthly_import = convert_wan_to_yuan(cols[11])
                yoy_total = parse_number(cols[4])
                yoy_export = parse_number(cols[8])
                yoy_import = parse_number(cols[12])
                country_name_clean = clean_county_name(country_name)
                country_code = COUNTRY_CODE_MAPPING.get(country_name_clean)
                data.append({
                    'crossborder_year': year,
                    'crossborder_year_month': f"{year}-{month:02d}",
                    'prov_code': PROV_CODE,
                    'prov_name': PROV_NAME,
                    'country_code': country_code,
                    'country_name': country_name_clean,
                    'monthly_total': monthly_total,
                    'monthly_export': monthly_export,
                    'monthly_import': monthly_import,
                    'yoy_import_export': yoy_total,
                    'yoy_export': yoy_export,
                    'yoy_import': yoy_import
                })
    except Exception as e:
        log.error(f"解析表格失败: {e}")
        raise
    finally:
        # Close the detail tab and switch back to the listing page.
        driver.close()
        driver.switch_to.window(driver.window_handles[0])
    return data


def parse_page_commodity_data(driver, data_type, year, month):
    """Parse a key-commodity totals table (shared by the export and import pages)."""
    data = []
    try:
        table = driver.find_element(By.XPATH, "//table[@border='1' and @bordercolor='#000000']")
        rows = table.find_elements(By.TAG_NAME, 'tr')
        # Data rows start after the header rows (index 4 onward).
        for row in rows[4:]:
            cols = [col.text.strip() for col in row.find_elements(By.TAG_NAME, 'td')]
            if len(cols) < 3:
                continue
            # Clean the commodity name (handles &nbsp; and stray spaces).
            name = clean_commodity_name(cols[0])
            if name == '肉类':
                name = '肉类(包括杂碎)'
            if month == 2:
                # January and February are published as a combined figure.
                value = convert_wan_to_yuan(cols[1])
                # Split the combined figure evenly into a January row and a February row.
                for m in [1, 2]:
                    adjusted_value = value / 2
                    data.append({
                        'commodity_name': name,
                        'commodity_code': db.get_commodity_id(name),
                        'monthly_export' if data_type == 'export' else 'monthly_import': adjusted_value,
                        'crossborder_year_month': f"{year}-{m:02d}"
                    })
            else:
                # Regular months: 5-column layout with the monthly value in the 4th column.
                value = convert_wan_to_yuan(cols[3])
                data.append({
                    'commodity_name': name,
                    'commodity_code': db.get_commodity_id(name),
                    'monthly_export' if data_type == 'export' else 'monthly_import': value,
                    'crossborder_year_month': f"{year}-{month:02d}"
                })
    except Exception as e:
        log.error(f"解析商品表失败: {e}")
        raise
    finally:
        # Close the detail tab and switch back to the listing page.
        driver.close()
        driver.switch_to.window(driver.window_handles[0])
    return data
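

# Illustrative merge behaviour (hypothetical values): an import row
#   {'commodity_code': 101, 'monthly_import': 100.0, 'crossborder_year_month': '2024-05', ...}
# and the matching export row
#   {'commodity_code': 101, 'monthly_export': 250.0, 'crossborder_year_month': '2024-05', ...}
# become one record with monthly_total = 350.0; a commodity present on only one side
# keeps NaN in the missing column.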
def merge_commodity_data(import_data, export_data, year, month):
    """
    Merge import and export rows on commodity_code (either side may be missing a commodity).
    :param import_data: list of import rows (each carrying commodity_code)
    :param export_data: list of export rows (each carrying commodity_code)
    :param year:
    :param month:
    :return: merged DataFrame
    """
    # Convert both sides to DataFrames.
    df_import = pd.DataFrame(import_data)
    df_export = pd.DataFrame(export_data)
    # Full outer join keeps commodities that appear on only one side.
    merged_df = pd.merge(
        df_import,
        df_export,
        on=['commodity_code', 'commodity_name', 'crossborder_year_month'],
        how='outer'
    )
    # Import/export total; rows where both sides are missing stay NaN.
    merged_df['monthly_total'] = merged_df['monthly_import'].fillna(0) + merged_df['monthly_export'].fillna(0)
    merged_df['monthly_total'] = merged_df['monthly_total'].replace(0, np.nan)
    merged_df['crossborder_year'] = year
    # Fill any missing year-month with the month being processed
    # (January/February rows were already built upstream).
    merged_df['crossborder_year_month'] = merged_df['crossborder_year_month'].fillna(f"{year}-{month:02d}")
    merged_df['prov_code'] = PROV_CODE
    merged_df['prov_name'] = PROV_NAME
    return merged_df


def parse_number(text):
    """Convert cell text to a float (handles empty cells and the '-' placeholder)."""
    text = text.strip().replace(',', '')
    if not text or text == '-':
        return None
    try:
        return float(text)
    except ValueError:
        return None
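
# Illustrative behaviour of parse_number:
#   parse_number("1,234.5") -> 1234.5
#   parse_number("-")       -> None
#   parse_number("")        -> None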


def reverse_crawler(driver, target_months):
    """Crawl each target month, paging backwards through the bulletin listing."""
    processed_months = set()
    page = 1
    for year, month in target_months:
        log.info(f"\n开始处理 {year}年{month}月数据".center(50, "="))
        WebDriverWait(driver, 15).until(EC.presence_of_element_located((By.CLASS_NAME, "conList_ul")))
        current_page = 1
        found_tables = 0
        export_data = []
        import_data = []
        while True:
            random_sleep(base=2, variance=3)
            try:
                log.info(f"当前页面:{driver.current_url}, 第{page}页")
                found, commodity_data = process_month_data(driver, year, month)
                found_tables += found
                # Accumulate commodity rows across listing pages.
                if commodity_data['export']:
                    export_data.extend(commodity_data['export'])
                if commodity_data['import']:
                    import_data.extend(commodity_data['import'])
                # All three tables for this month have been collected.
                if found_tables >= 3:
                    # Insert only when both import and export rows are present.
                    if export_data and import_data:
                        final_df = merge_commodity_data(import_data, export_data, year, month)
                        final_df = final_df.replace({np.nan: None})
                        db.bulk_insert(df=final_df, table_name='t_yujin_crossborder_prov_commodity_trade',
                                       conflict_columns=['commodity_code', 'crossborder_year_month'],
                                       update_columns=['monthly_import', 'monthly_export', 'monthly_total'])
                    log.info(f"已完成{year}年{month}月全部表格采集")
                    processed_months.add((year, month))
                    break
                log.info(f"第{page}页已采集表格数:{found_tables}/3,前往下一页采集")
                # Click through to the next listing page.
                WebDriverWait(driver, 15).until(
                    EC.element_to_be_clickable((By.XPATH, '//a[@class="pagingNormal next"]'))
                ).click()
                current_page += 1
                page += 1
            except TimeoutException:
                log.info(f"未找到更多分页,已采集表格数:{found_tables}/3")
                break
            except Exception as e:
                log.info(f"分页异常:{str(e)}")
                handle_retry(driver)
                break
    return processed_months


def random_sleep(base=2, variance=5):
    """Sleep for a random interval to avoid hammering the site."""
    sleep_time = base + random.random() * variance
    time.sleep(sleep_time)


def handle_retry(driver):
    """Try to recover from a page error by refreshing the listing page."""
    try:
        driver.refresh()
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
        )
        log.info("浏览器异常已恢复")
    except Exception:
        log.error("需要人工干预的严重错误")
        raise


def main():
    """Entry point for the Guangdong customs crawler."""
    parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
    parser.add_argument('--year', type=int, default=None,
                        help='终止年份(如2023),未指定时抓取最新两个月')
    args = parser.parse_args()
    start_time = time.time()
    target_months = []       # months to crawl
    data_collected = False   # whether any data was collected
    log.info("【广东海关】数据抓取开始".center(66, "*"))
    driver = None  # defined up front so the finally block can always reference it
    try:
        # 1. Start the browser.
        driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
        log.info("浏览器初始化完成")
        # 2. Detect the latest available month.
        valid_year, valid_month = detect_latest_month(driver)
        log.info(f"【广东海关】最新数据:{valid_year}年{valid_month:02d}月")
        # 3. Skip the run if the data already exists (only when no --year was given).
        if not args.year:
            count = db.get_code_exist(f'{valid_year}-{valid_month:02d}', PROV_CODE)
            if count > 0:
                log.error(f"数据库已存在【广东省】 {valid_year}-{valid_month:02d} 商品贸易数据,本次抓取终止")
                return
        # 4. Build the sequence of target months.
        if args.year:
            target_months = generate_month_sequence(
                start_year=valid_year,
                start_month=valid_month,
                end_year=args.year,
                skip_january=True
            )
        else:
            # Without --year, only the latest two months are crawled.
            target_months = generate_month_sequence(
                start_year=valid_year,
                start_month=valid_month
            )
        log.info(f"【广东海关】目标采集月份序列:{len(target_months)}个月份")
        # 5. Run the crawler.
        reverse_crawler(driver, target_months)
        data_collected = True
        log.info(f"【广东海关】成功采集 {len(target_months)} 个月份数据")
        # 6. Optional post-processing / cleaning could be added here.
        # log.info("\n【广东海关】数据清洗入库中...")
        # traverse_and_process(...)
    except Exception as e:
        # Log and report any failure.
        log.exception(f"【广东海关】采集过程中发生错误: {str(e)}")
        send_dingtalk_message(f"【广东海关数据采集异常】{str(e)}")
    finally:
        # Always shut the browser down.
        if driver:
            driver.quit()
            log.info("浏览器已退出")
        # 7. Notify only when data was actually collected.
        if data_collected:
            duration = time.time() - start_time
            minutes, seconds = divmod(duration, 60)
            message = (f"【广东海关】{len(target_months)}个月份数据采集完成"
                       f",总耗时:{int(minutes)}分{seconds:.1f}秒")
            send_dingtalk_message(message)
        log.info("【广东海关】处理流程结束".center(66, "*"))


if __name__ == "__main__":
    main()
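
# Typical invocations (assuming the crossborder package and a local Firefox/geckodriver
# setup are available):
#   python selenium_guangdong_download.py              # crawl the most recent months
#   python selenium_guangdong_download.py --year 2023  # crawl backwards to 2023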