selenium_guangdong_download.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. import argparse
  2. import random
  3. import re
  4. import time
  5. from datetime import datetime, timedelta
  6. import pandas as pd
  7. from selenium import webdriver
  8. from selenium.common import TimeoutException
  9. from selenium.webdriver.common.by import By
  10. from selenium.webdriver.support import expected_conditions as EC
  11. from selenium.webdriver.support.ui import WebDriverWait
  12. from utils.db_helper import DBHelper
  13. from utils.constants import DOWNLOAD_DIR, COUNTRY_CODE_MAPPING
  14. from utils.download_utils import configure_stealth_options, generate_month_sequence
  15. from utils.log import log
  16. from utils.parse_utils import clean_county_name, convert_wan_to_yuan, clean_commodity_name
# Listing page of Guangdong customs monthly data bulletins.
BASE_URL = "http://gdfs.customs.gov.cn/guangdong_sub/zwgk62/sjgb59/6b4cdb3f-1.html"
# Per-province download folder under the shared download root.
download_dir = DOWNLOAD_DIR / "guangdong"
# Administrative division code for Guangdong province.
PROV_CODE = "440000"
# Province display name (Guangdong) — stored verbatim in the DB rows.
PROV_NAME = "广东省"
# Shared DB helper used by the parsers and the crawler below.
db = DBHelper()
  22. def detect_latest_month(driver):
  23. """三级回溯智能检测最新有效月份(修正年/月匹配逻辑)"""
  24. driver.get(BASE_URL)
  25. current_date = datetime.now()
  26. for offset in range(0, 3):
  27. check_date = current_date - timedelta(days=offset * 30)
  28. check_year = check_date.year
  29. check_month = check_date.month
  30. # 构建正则表达式:兼容“1至X月”和“X月”两种格式,并允许前后有空格
  31. pattern = re.compile(
  32. rf'(5){check_year}\s*年\s*(?:1至)?{check_month}\s*月广东省外贸进出口主要国别(地区)总值表(人民币值)',
  33. re.IGNORECASE
  34. )
  35. try:
  36. # 使用 Python 端的正则匹配所有含“广东省”的链接 title
  37. elements = WebDriverWait(driver, 10).until(
  38. EC.presence_of_all_elements_located((By.XPATH, '//a[contains(@title,"广东省")]'))
  39. )
  40. for element in elements:
  41. title = element.get_attribute("title")
  42. if pattern.search(title):
  43. log.info(f"已找到最新月份数据 {check_year}-{check_month}")
  44. return check_year, check_month
  45. log.info(f"未找到匹配项(正则:{pattern.pattern})")
  46. except TimeoutException:
  47. log.info(f"页面加载超时或无匹配项({check_year}-{check_month})")
  48. continue
  49. raise Exception("三个月内未找到有效数据")
  50. def process_month_data(driver, year, month):
  51. """处理月度数据:支持三种表格类型"""
  52. patterns = [
  53. (re.compile(rf'(5){year}\s*年\s*(1-)?{month}\s*月广东省外贸进出口主要国别(地区)总值表(人民币值)'), 'country'),
  54. (re.compile(rf'(6){year}\s*年\s*(1-)?{month}\s*月广东省出口重点商品总值表(人民币值)'), 'export_commodity'),
  55. (re.compile(rf'(7){year}\s*年\s*(1-)?{month}\s*月广东省进口重点商品总值表(人民币值)'), 'import_commodity')
  56. ]
  57. found_count = 0
  58. commodity_data = {'export': [], 'import': []} # 存储商品数据等待合并
  59. links = driver.find_elements(By.XPATH, '//a[contains(@title,"广东省")]')
  60. for link in links:
  61. title = link.get_attribute("title")
  62. for pattern, table_type in patterns:
  63. if pattern.search(title):
  64. log.info(f"处理表格: {title}")
  65. url = link.get_attribute("href")
  66. # 新标签页处理
  67. driver.execute_script("window.open(arguments[0]);", url)
  68. driver.switch_to.window(driver.window_handles[-1])
  69. try:
  70. WebDriverWait(driver, 15).until(
  71. EC.presence_of_element_located((By.XPATH, "//table[@border='1']"))
  72. )
  73. # 根据表格类型处理数据
  74. if table_type == 'country':
  75. data = parse_country_table(driver, year, month)
  76. df_country = pd.DataFrame(data)
  77. db.bulk_insert(
  78. df_country,
  79. 't_yujin_crossborder_prov_country_trade',
  80. conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
  81. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  82. 'yoy_import_export', 'yoy_import', 'yoy_export']
  83. )
  84. found_count += 1
  85. else:
  86. data_type = 'export' if table_type == 'export_commodity' else 'import'
  87. commodity_data[data_type] = parse_commodity_table(driver, data_type, year, month)
  88. found_count += 1
  89. except Exception as e:
  90. log.info(f"表格处理失败: {e}")
  91. # 将数据返回,而不是在内部合并
  92. return found_count, commodity_data
  93. def parse_country_table(driver, year, month):
  94. """解析目标页面的表格数据"""
  95. data = []
  96. try:
  97. # 等待表格加载
  98. WebDriverWait(driver, 15).until(
  99. EC.presence_of_element_located((By.XPATH, "//table[@border='1']"))
  100. )
  101. table = driver.find_element(By.XPATH, "//table[@border='1' and @bordercolor='#000000']")
  102. rows = table.find_elements(By.TAG_NAME, 'tr')
  103. # 检测表格的列数
  104. header_row = rows[2] # 假设表头在第3行
  105. header_row.find_elements(By.TAG_NAME, 'td')
  106. # 数据行从第4行开始(跳过表头)
  107. for row in rows[4:]:
  108. cols = [col.text.strip() for col in row.find_elements(By.TAG_NAME, 'td')]
  109. country_name = cols[0]
  110. if (country_name == '广东外贸总值' or
  111. country_name == '东盟' or
  112. country_name == '欧盟' or
  113. country_name == '总计' or
  114. country_name == '广东外贸总额' or
  115. country_name == '广东外贸总计' or
  116. country_name == '总值'
  117. ):
  118. continue
  119. if month == 2:
  120. # 处理合并后的1月和2月数据
  121. monthly_total = convert_wan_to_yuan(cols[1])
  122. monthly_export = convert_wan_to_yuan(cols[3])
  123. monthly_import = convert_wan_to_yuan(cols[5])
  124. # 将2月的数据除以2,并生成1月和2月的数据
  125. for m in [1, 2]:
  126. adjusted_monthly_total = monthly_total / 2
  127. adjusted_monthly_export = monthly_export / 2
  128. adjusted_monthly_import = monthly_import / 2
  129. adjusted_yoy_total = 0
  130. adjusted_yoy_export = 0
  131. adjusted_yoy_import = 0
  132. country_name_clean = clean_county_name(country_name)
  133. country_code = COUNTRY_CODE_MAPPING.get(country_name_clean)
  134. data.append({
  135. 'crossborder_year': year,
  136. 'crossborder_year_month': f"{year}-{m:02d}",
  137. 'prov_code': PROV_CODE,
  138. 'prov_name': PROV_NAME,
  139. 'country_code': country_code,
  140. 'country_name': country_name_clean,
  141. 'monthly_total': adjusted_monthly_total,
  142. 'monthly_export': adjusted_monthly_export,
  143. 'monthly_import': adjusted_monthly_import,
  144. 'yoy_import_export': adjusted_yoy_total,
  145. 'yoy_export': adjusted_yoy_export,
  146. 'yoy_import': adjusted_yoy_import
  147. })
  148. else:
  149. # 原逻辑处理13列的情况
  150. monthly_total = convert_wan_to_yuan(cols[3])
  151. monthly_export = convert_wan_to_yuan(cols[7])
  152. monthly_import = convert_wan_to_yuan(cols[11])
  153. yoy_total = parse_number(cols[4])
  154. yoy_export = parse_number(cols[8])
  155. yoy_import = parse_number(cols[12])
  156. country_name_clean = clean_county_name(country_name)
  157. country_code = COUNTRY_CODE_MAPPING.get(country_name_clean)
  158. data.append({
  159. 'crossborder_year': year,
  160. 'crossborder_year_month': f"{year}-{month:02d}",
  161. 'prov_code': PROV_CODE,
  162. 'prov_name': PROV_NAME,
  163. 'country_code': country_code,
  164. 'country_name': country_name_clean,
  165. 'monthly_total': monthly_total,
  166. 'monthly_export': monthly_export,
  167. 'monthly_import': monthly_import,
  168. 'yoy_import_export': yoy_total,
  169. 'yoy_export': yoy_export,
  170. 'yoy_import': yoy_import
  171. })
  172. except Exception as e:
  173. log.info(f"解析表格失败: {e}")
  174. raise
  175. finally:
  176. driver.close()
  177. driver.switch_to.window(driver.window_handles[0])
  178. return data
  179. def parse_commodity_table(driver, data_type, year, month):
  180. """解析商品表通用函数"""
  181. data = []
  182. try:
  183. table = driver.find_element(By.XPATH, "//table[@border='1' and @bordercolor='#000000']")
  184. rows = table.find_elements(By.TAG_NAME, 'tr')
  185. # 检测表格的列数
  186. header_row = rows[2] # 假设表头在第3行
  187. cols = header_row.find_elements(By.TAG_NAME, 'td')
  188. num_cols = len(cols)
  189. # 数据行从第4行开始(跳过表头)
  190. for row in rows[4:]:
  191. cols = [col.text.strip() for col in row.find_elements(By.TAG_NAME, 'td')]
  192. if len(cols) < 3:
  193. continue
  194. # 清洗商品名称(处理&nbsp;和空格)
  195. name = clean_commodity_name(cols[0])
  196. if month == 2:
  197. # 处理合并后的1月和2月数据
  198. value = convert_wan_to_yuan(cols[1])
  199. # 将2月的数据除以2,并生成1月和2月的数据
  200. for m in [1, 2]:
  201. adjusted_value = value / 2
  202. adjusted_yoy = 0 # 同比置为0
  203. data.append({
  204. 'commodity_name': name,
  205. 'commodity_code': db.get_commodity_id(name),
  206. 'monthly_export' if data_type == 'export' else 'monthly_import': adjusted_value,
  207. 'crossborder_year_month': f"{year}-{m:02d}"
  208. })
  209. else:
  210. # 原逻辑处理5列的情况
  211. value = convert_wan_to_yuan(cols[3] if data_type == 'export' else cols[3])
  212. data.append({
  213. 'commodity_name': name,
  214. 'commodity_code': db.get_commodity_id(name),
  215. 'monthly_export' if data_type == 'export' else 'monthly_import': value,
  216. 'crossborder_year_month': f"{year}-{month:02d}"
  217. })
  218. except Exception as e:
  219. log.info(f"解析商品表失败: {e}")
  220. raise
  221. finally:
  222. driver.close()
  223. driver.switch_to.window(driver.window_handles[0])
  224. return data
  225. def merge_commodity_data(import_data, export_data, year, month):
  226. """
  227. 根据commodity_code合并进出口数据(支持不同商品的存在情况)
  228. :param year:
  229. :param month:
  230. :param import_data: 进口数据列表(含commodity_code)
  231. :param export_data: 出口数据列表(含commodity_code)
  232. :return: 合并后的DataFrame
  233. """
  234. # 转换数据为DataFrame
  235. df_import = pd.DataFrame(import_data)
  236. df_export = pd.DataFrame(export_data)
  237. # 合并逻辑(全外连接保留所有商品)
  238. merged_df = pd.merge(
  239. df_import,
  240. df_export,
  241. on=['commodity_code', 'commodity_name', 'crossborder_year_month'],
  242. how='outer'
  243. ).fillna(0)
  244. # 计算总量(可选,根据表结构需求)
  245. merged_df['monthly_total'] = merged_df['monthly_import'] + merged_df['monthly_export']
  246. merged_df['crossborder_year'] = year
  247. merged_df['crossborder_year_month'] = f"{year}-{month:02d}"
  248. merged_df['prov_code'] = PROV_CODE
  249. merged_df['prov_name'] = PROV_NAME
  250. return merged_df
  251. def parse_number(text):
  252. """转换文本为浮点数(处理空值、负号)"""
  253. text = text.strip().replace(',', '')
  254. if not text or text == '-':
  255. return None
  256. try:
  257. return float(text)
  258. except ValueError:
  259. return None
  260. # 优化后的代码逻辑:
  261. def reverse_crawler(driver, target_months):
  262. """逆向分页抓取核心逻辑"""
  263. processed_months = set()
  264. page = 1
  265. for year, month in target_months:
  266. log.info(f"\n开始处理 {year}年{month}月数据".center(50, "="))
  267. WebDriverWait(driver, 15).until(EC.presence_of_element_located((By.CLASS_NAME, "conList_ul")))
  268. current_page = 1
  269. found_tables = 0
  270. export_data = []
  271. import_data = []
  272. while True:
  273. random_sleep(base=2, variance=3)
  274. try:
  275. log.info(f"当前页面:{driver.current_url}, 第{page}页")
  276. found, commodity_data = process_month_data(driver, year, month)
  277. found_tables += found
  278. # 累积商品数据
  279. if commodity_data['export']:
  280. export_data.extend(commodity_data['export'])
  281. if commodity_data['import']:
  282. import_data.extend(commodity_data['import'])
  283. # 完成三个表格采集
  284. if found_tables >= 3:
  285. # 确保同时有进口和出口数据
  286. if export_data and import_data:
  287. final_df = merge_commodity_data(export_data, import_data, year, month)
  288. db.bulk_insert(df=final_df, table_name='t_yujin_crossborder_prov_commodity_trade',
  289. conflict_columns=['commodity_code', 'crossborder_year_month'],
  290. update_columns=['monthly_import', 'monthly_export', 'monthly_total'])
  291. log.info(f"已完成{year}年{month}月全部表格采集")
  292. processed_months.add((year, month))
  293. break
  294. log.info(f"第{page}页已采集表格数:{found_tables}/3,前往下一页采集")
  295. # 分页点击逻辑
  296. WebDriverWait(driver, 15).until(
  297. EC.element_to_be_clickable((By.XPATH, '//a[@class="pagingNormal next"]'))
  298. ).click()
  299. current_page += 1
  300. page += 1
  301. except TimeoutException:
  302. log.info(f"未找到更多分页,已采集表格数:{found_tables}/3")
  303. break
  304. except Exception as e:
  305. log.info(f"分页异常:{str(e)}")
  306. handle_retry(driver)
  307. break
  308. return processed_months
  309. def random_sleep(base=2, variance=5):
  310. """智能随机等待"""
  311. sleep_time = base + random.random() * variance
  312. time.sleep(sleep_time)
  313. def handle_retry(driver):
  314. """异常恢复处理"""
  315. try:
  316. driver.refresh()
  317. WebDriverWait(driver, 15).until(
  318. EC.presence_of_element_located((By.CLASS_NAME, "conList_ul"))
  319. )
  320. log.info("浏览器异常已恢复")
  321. except:
  322. log.info("需要人工干预的严重错误")
  323. raise
  324. def main():
  325. """主入口(优化参数处理逻辑)"""
  326. parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
  327. parser.add_argument('--year', type=int, default=None,
  328. help='终止年份(如2023),未指定时抓取最新两个月')
  329. args = parser.parse_args()
  330. driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
  331. try:
  332. # 智能检测最新有效月份
  333. valid_year, valid_month = detect_latest_month(driver)
  334. log.info(f"【广东海关】最新数据:{valid_year}年{valid_month:02d}月")
  335. # 生成目标序列
  336. if args.year:
  337. # 指定年份时:从最新月到目标年1月
  338. target_months = generate_month_sequence(
  339. start_year=valid_year,
  340. start_month=valid_month,
  341. end_year=args.year,
  342. skip_january=True
  343. )
  344. else:
  345. # 未指定年份时:取最近两个月
  346. target_months = generate_month_sequence(valid_year, valid_month)
  347. log.info(f"【广东海关】目标采集月份序列:{target_months}")
  348. reverse_crawler(driver, target_months)
  349. log.info(f"【广东海关】{len(target_months)}个月份数据已采集完毕")
  350. finally:
  351. driver.quit()
  352. log.info("\n数据清洗入库中...")
  353. if __name__ == "__main__":
  354. main()