guangdong_sub_customs_parse_excel.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678
  1. from decimal import Decimal, InvalidOperation
  2. from pathlib import Path
  3. import pandas as pd
  4. from db_helper import DBHelper
  5. from utils.constants import DOWNLOAD_DIR, GUANGDONG_CITY
  6. from utils.log import log
  7. from utils.parse_utils import traverse_and_process, extract_year_month_from_path, get_previous_month_dir
  8. # 配置日志
  9. PROV_CODE = "440000"
  10. PROV_NAME = "广东省"
  11. download_dir = DOWNLOAD_DIR / "guangdong"
  12. db = DBHelper()
  13. _zhanjiang_first_month = None
  14. # 广州海关:万元
  15. # 深圳海关:亿元
  16. # 汕头海关:万元
  17. # 黄埔海关:万元
  18. # 江门海关:亿元
  19. # 湛江海关:万元
  20. def match_customs_file(filename, customs_name, year, month):
  21. """匹配海关文件"""
  22. filename_lower = filename.lower()
  23. if customs_name == "广州海关":
  24. return "7地市进出口综合统计" in filename_lower
  25. elif customs_name == "深圳海关":
  26. return "深圳海关综合统计资料" in filename_lower
  27. elif customs_name == "汕头海关":
  28. return "5市报表" in filename_lower
  29. elif customs_name == "黄埔海关":
  30. return "东莞市进出口企业性质总值表" in filename_lower
  31. elif customs_name == "江门海关":
  32. if "江门市" in filename_lower or "阳江市" in filename_lower:
  33. return "外贸进出口有关情况统计表" in filename_lower
  34. elif customs_name == "湛江海关":
  35. if "湛江市" in filename_lower or "茂名市" in filename_lower:
  36. return "外贸进出口数据" in filename_lower
  37. return False
  38. def find_header_and_columns(df, year, month):
  39. """
  40. 查找匹配月份的表头行并定位对应的列索引。
  41. 支持三种基础格式:
  42. - "2024年12月"
  43. - "2024年12月-2024年12月"
  44. - "2024-12-01 00:00:00"
  45. 如果都未找到,则尝试匹配特殊格式:"2023年01月-2023年02月"
  46. """
  47. candidate_month_strs = [
  48. f"{year}年{month:02d}月",
  49. f"{year}年{month:02d}月-{year}年{month:02d}月",
  50. f"{year}-{month:02d}-01 00:00:00"
  51. ]
  52. header_row = None
  53. for i in range(min(3, len(df))):
  54. row_cells = [str(cell).strip() for cell in df.iloc[i]]
  55. for cell_val in row_cells:
  56. if any(s == cell_val for s in candidate_month_strs):
  57. header_row = i
  58. break
  59. if header_row is not None:
  60. break
  61. # 如果没找到常规格式,尝试特殊格式:2023年01月-2023年02月
  62. special_format = "2023年01月-2023年02月"
  63. if header_row is None:
  64. log.warning(f"未找到常规格式,尝试匹配特殊格式: {special_format}")
  65. for i in range(min(3, len(df))):
  66. row_cells = [str(cell).strip() for cell in df.iloc[i]]
  67. for cell_val in row_cells:
  68. if cell_val == special_format:
  69. header_row = i
  70. log.info(f"成功匹配特殊格式: {special_format} 行号={i}")
  71. break
  72. if header_row is not None:
  73. break
  74. if header_row is None:
  75. log.error("未找到任何支持的表头格式")
  76. return None, []
  77. # 确定数据列位置(包含所有候选格式)
  78. data_cols = []
  79. for col in range(len(df.columns)):
  80. cell_val = str(df.iloc[header_row, col]).strip()
  81. if cell_val in candidate_month_strs:
  82. data_cols.append(col)
  83. if not data_cols:
  84. for col in range(len(df.columns)):
  85. cell_val = str(df.iloc[header_row, col]).strip()
  86. if cell_val in [special_format]:
  87. data_cols.append(col)
  88. if not data_cols:
  89. log.error("未找到对应的数据列")
  90. return header_row, []
  91. return header_row, data_cols
  92. def process_guangzhou_customs(file_path, year, month,customs_type='guangzhou'):
  93. """处理广州海关数据"""
  94. try:
  95. # 读取Excel文件
  96. df = pd.read_excel(file_path, sheet_name=0, header=None)
  97. log.info(f"处理广州海关文件: {file_path.name}")
  98. header_row,data_cols = find_header_and_columns(df, year, month)
  99. # 提取7地市数据
  100. results = []
  101. target_cities = ["广州市", "韶关市", "佛山市", "肇庆市", "河源市",
  102. "清远市", "汕头市", "梅州市", "汕尾市", "潮州市", "揭阳市", "云浮市"]
  103. for idx in range(header_row + 1, len(df)):
  104. row = df.iloc[idx]
  105. city_cell = str(row[0])
  106. if "广东省" in city_cell:
  107. city_name = city_cell.replace("广东省", "").strip()
  108. if city_name in target_cities:
  109. try:
  110. if len(data_cols)>3:
  111. monthly_total = Decimal(str(row[data_cols[0]])) # 进出口
  112. monthly_export = Decimal(str(row[data_cols[4]])) # 出口
  113. monthly_import = Decimal(str(row[data_cols[8]])) # 进口
  114. yoy_import_export = Decimal(str(row[data_cols[1]])) # 进出口同比
  115. yoy_export = Decimal(str(row[data_cols[5]])) # 出口同比
  116. yoy_import = Decimal(str(row[data_cols[9]])) # 进口同比
  117. else:
  118. monthly_total = Decimal(str(row[data_cols[0]]))
  119. monthly_export = Decimal(str(row[data_cols[1]]))
  120. monthly_import = Decimal(str(row[data_cols[2]]))
  121. yoy_import_export = Decimal(str(row[data_cols[0]+1])) # 进出口同比
  122. yoy_export = Decimal(str(row[data_cols[1]+1])) # 出口同比
  123. yoy_import = Decimal(str(row[data_cols[2]+1])) # 进口同比
  124. results.append({
  125. "city_name": city_name,
  126. "monthly_total": monthly_total,
  127. "monthly_import": monthly_import,
  128. "monthly_export": monthly_export,
  129. "yoy_import_export": yoy_import_export,
  130. "yoy_import": yoy_import,
  131. "yoy_export": yoy_export
  132. })
  133. except Exception as e:
  134. log.error(f"处理行 {idx} 出错: {e}")
  135. return pd.DataFrame(results)
  136. except Exception as e:
  137. log.error(f"处理广州海关文件出错: {str(e)}")
  138. return pd.DataFrame()
  139. def process_shenzhen_customs(file_path, year, month):
  140. """处理深圳海关数据(完整6指标版)"""
  141. try:
  142. log.info(f"处理深圳海关文件: {file_path.name}")
  143. results = []
  144. for city, sheet_name in [("深圳市", "深圳市进出口(贸易方式)"),
  145. ("惠州市", "惠州市进出口(贸易方式)")]:
  146. try:
  147. df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
  148. except:
  149. log.warning(f"未找到sheet: {sheet_name}")
  150. continue
  151. # 查找总值行
  152. total_row_idx = None
  153. for idx in range(len(df)):
  154. if "总值" in str(df.iloc[idx, 0]):
  155. total_row_idx = idx
  156. break
  157. if total_row_idx is None:
  158. log.error(f"未找到总值行: {sheet_name}")
  159. continue
  160. try:
  161. # 列索引映射(基于您提供的完整数据结构)
  162. # 进出口 | 出口 | 进口 的数值和同比
  163. monthly_total = convert_unit(str(df.iloc[total_row_idx, 1]))
  164. yoy_total = Decimal(str(df.iloc[total_row_idx, 2]))
  165. monthly_export = convert_unit(str(df.iloc[total_row_idx, 3]))
  166. yoy_export = Decimal(str(df.iloc[total_row_idx, 4]))
  167. monthly_import = convert_unit(str(df.iloc[total_row_idx, 5]))
  168. yoy_import = Decimal(str(df.iloc[total_row_idx, 6]))
  169. results.append({
  170. "city_name": city,
  171. "monthly_total": monthly_total,
  172. "monthly_export": monthly_export,
  173. "monthly_import": monthly_import,
  174. "yoy_import_export": yoy_total, # 进出口同比
  175. "yoy_export": yoy_export,
  176. "yoy_import": yoy_import
  177. })
  178. except Exception as e:
  179. log.error(f"处理 {city} 数据出错: {e}")
  180. # 尝试部分提取(回退方案)
  181. try:
  182. monthly_total = Decimal(str(df.iloc[total_row_idx, 1])) * Decimal('10000')
  183. results.append({
  184. "city_name": city,
  185. "monthly_total": monthly_total,
  186. "monthly_export": None,
  187. "monthly_import": None,
  188. "yoy_import_export": Decimal('0'),
  189. "yoy_export": Decimal('0'),
  190. "yoy_import": Decimal('0')
  191. })
  192. except:
  193. log.error(f"连基础进出口总值都无法提取: {sheet_name}")
  194. return pd.DataFrame(results)
  195. except Exception as e:
  196. log.error(f"处理深圳海关文件出错: {str(e)}")
  197. return pd.DataFrame()
  198. def process_shantou_customs(file_path, year, month):
  199. """处理汕头海关数据 (逻辑同广州海关)"""
  200. log.info(f"处理汕头海关文件: {file_path.name}")
  201. return process_guangzhou_customs(file_path, year, month,customs_type='shantou')
  202. def process_huangpu_customs(file_path, year, month):
  203. """处理黄埔海关数据"""
  204. try:
  205. df = pd.read_excel(file_path, sheet_name=0, header=None)
  206. log.info(f"处理黄埔海关文件: {file_path.name}")
  207. # 查找合计行
  208. total_row_idx = None
  209. for idx in range(len(df)):
  210. if "合计" in str(df.iloc[idx, 0]):
  211. total_row_idx = idx
  212. break
  213. if total_row_idx is None:
  214. log.error("未找到合计行")
  215. return pd.DataFrame()
  216. # 查找包含月份的表头,匹配23年1月-23年多种格式
  217. if year == 2024 and month == 12:
  218. month_str = '45627'
  219. elif year == 2023 and month == 12:
  220. month_str = '45261'
  221. elif year == 2023 and month == 3:
  222. month_str = f"{year}年{month:02d}月-{year}年{month:02d}月"
  223. else:
  224. month_str = f'{year}-{month:02d}-01 00:00:00'
  225. header_row = None
  226. for i in range(min(3, len(df))):
  227. row_cells = [str(cell).strip() for cell in df.iloc[i]]
  228. if any(month_str in cell in cell for cell in row_cells):
  229. header_row = i
  230. break
  231. if header_row is None:
  232. log.error(f"未找到 {month_str} 人民币表头")
  233. return pd.DataFrame()
  234. # 确定数据列位置
  235. data_cols = []
  236. for col in range(len(df.columns)):
  237. cell_val = str(df.iloc[header_row, col])
  238. if month_str in cell_val :
  239. data_cols.append(col)
  240. if len(data_cols) < 3:
  241. log.error(f"未找到足够的 {month_str} 人民币数据列")
  242. return pd.DataFrame()
  243. try:
  244. result = []
  245. # 提取数据
  246. row = df.iloc[total_row_idx]
  247. monthly_total = Decimal(str(row[data_cols[0]])) # 进出口
  248. monthly_export = Decimal(str(row[data_cols[1]])) # 出口
  249. monthly_import = Decimal(str(row[data_cols[2]])) # 进口
  250. yoy_import_export = str(row[data_cols[0]+1]) # 进出口同比
  251. yoy_export = str(row[data_cols[1]+1]) # 出口同比
  252. yoy_import = str(row[data_cols[2]+1]) # 进口同比
  253. result.append({
  254. "crossborder_year_month": f'{year}-{month:02d}',
  255. "city_name": "东莞市",
  256. "monthly_total": monthly_total,
  257. "monthly_import": monthly_import,
  258. "monthly_export": monthly_export,
  259. "yoy_import_export": yoy_import_export,
  260. "yoy_import": yoy_import,
  261. "yoy_export": yoy_export
  262. })
  263. #东莞市一月数据比较特殊
  264. if month == 2:
  265. monthly_total_sum = Decimal(str(row[data_cols[0]+4])) # 进出口
  266. monthly_export_sum = Decimal(str(row[data_cols[1]+4])) # 出口
  267. monthly_import_sum = Decimal(str(row[data_cols[2]+4])) # 进口
  268. january_monthly_total = monthly_total_sum - monthly_total
  269. january_monthly_export = monthly_export_sum - monthly_export
  270. january_monthly_import = monthly_import_sum - monthly_import
  271. result.append({
  272. "crossborder_year_month": f'{year}-01',
  273. "city_name": "东莞市",
  274. "monthly_total": january_monthly_total,
  275. "monthly_import": january_monthly_export,
  276. "monthly_export": january_monthly_import,
  277. })
  278. return pd.DataFrame(result)
  279. except Exception as e:
  280. log.error(f"提取数据出错: {e}")
  281. return pd.DataFrame()
  282. except Exception as e:
  283. log.error(f"处理黄埔海关文件出错: {str(e)}")
  284. return pd.DataFrame()
  285. def process_jiangmen_customs(file_path, year, month):
  286. """处理江门海关数据"""
  287. try:
  288. df = pd.read_excel(file_path, sheet_name=0, header=None)
  289. log.info(f"处理江门海关文件: {file_path.name}")
  290. # 从文件名确定城市
  291. city_name = "江门市" if "江门市" in file_path.name else "阳江市"
  292. target_row_name = "江门市进出口商品总值" if city_name == "江门市" else "阳江市进出口商品总值"
  293. # 查找目标行
  294. target_row_idx = None
  295. for idx in range(len(df)):
  296. if target_row_name in str(df.iloc[idx, 0]):
  297. target_row_idx = idx
  298. break
  299. if target_row_idx is None:
  300. log.error(f"未找到 {target_row_name} 行")
  301. return pd.DataFrame()
  302. # 查找包含月份的表头
  303. if month == 2:
  304. month_str = f"1-{month}月"
  305. else:
  306. month_str = f"{month}月"
  307. header_row = None
  308. for i in range(min(6, len(df))):
  309. if any(month_str == str(cell).strip() for cell in df.iloc[i]):
  310. header_row = i
  311. break
  312. if header_row is None:
  313. log.error(f"未找到 {month_str} 表头")
  314. return pd.DataFrame()
  315. # 确定数据列位置
  316. data_cols = []
  317. for col in range(len(df.columns)):
  318. cell_val = str(df.iloc[header_row, col])
  319. if cell_val.strip() == month_str:
  320. data_cols.append(col)
  321. if len(data_cols) < 3:
  322. log.error(f"未找到足够的 {month_str} 数据列")
  323. return pd.DataFrame()
  324. # 提取数据 (亿元转换为万元)
  325. row = df.iloc[target_row_idx]
  326. monthly_total = convert_unit(str(row[data_cols[0]]))
  327. monthly_export = convert_unit(str(row[data_cols[1]]))
  328. monthly_import = convert_unit(str(row[data_cols[2]]))
  329. yoy_import_export = str(row[data_cols[0]+1])
  330. yoy_export = str(row[data_cols[1]+1])
  331. yoy_import = str(row[data_cols[2]+1])
  332. return pd.DataFrame([{
  333. "city_name": city_name,
  334. "monthly_total": monthly_total,
  335. "monthly_import": monthly_import,
  336. "monthly_export": monthly_export,
  337. "yoy_import_export": yoy_import_export,
  338. "yoy_import": yoy_import,
  339. "yoy_export": yoy_export
  340. }])
  341. except Exception as e:
  342. log.error(f"处理江门海关文件出错: {str(e)}")
  343. return pd.DataFrame()
  344. def process_zhanjiang_customs(file_path, year, month):
  345. """处理湛江海关数据 满足「是第一次调用」或者「month == 12」任意一个条件"""
  346. global _zhanjiang_first_month
  347. # 判断是否应执行核心逻辑
  348. if _zhanjiang_first_month is None:
  349. # 第一次调用,记录初始月份
  350. _zhanjiang_first_month = month
  351. should_execute = True
  352. else:
  353. # 后续调用仅在以下情况下执行:
  354. # - 与初次调用的 month 相同(允许多城市同时处理)
  355. # - 或者 month == 12
  356. should_execute = (month == _zhanjiang_first_month) or (month == 12)
  357. if not should_execute:
  358. log.warning(f"跳过湛江海关{year}年{month}文件: {file_path.name}")
  359. return pd.DataFrame()
  360. try:
  361. df = pd.read_excel(file_path, sheet_name=0, header=None)
  362. log.info(f"处理湛江海关文件: {file_path.name}")
  363. # 从文件名确定城市
  364. city_name = "湛江市" if "湛江市" in file_path.name else "茂名市"
  365. # 查找月度数据表格
  366. month_str = f"{year}年前{month}个月{city_name}进出口数据(月度)"
  367. # target_header_row = None
  368. #
  369. # # 查找表头行
  370. # for i in range(min(3, len(df))): # 在前5行找表头
  371. # if any(month_str in str(cell) for cell in df.iloc[i]):
  372. # target_header_row = i
  373. # break
  374. #
  375. # if target_header_row is None:
  376. # log.error(f"未找到 {month_str} 表头")
  377. # return pd.DataFrame()
  378. target_header_row =1
  379. # 确定数据列位置
  380. data_cols = {}
  381. for col in range(len(df.columns)):
  382. cell_val = str(df.iloc[target_header_row+1, col])
  383. data_cols["year_month"] = 0
  384. if "进出口" in cell_val :
  385. data_cols["total"] = col
  386. elif "出口" in cell_val :
  387. data_cols["export"] = col
  388. elif "进口" in cell_val :
  389. data_cols["import"] = col
  390. if len(data_cols) < 1:
  391. log.error(f"未找到足够的 {month_str} 数据列")
  392. return pd.DataFrame()
  393. start_row = target_header_row + 4
  394. end_row = start_row + month
  395. # 提取多行数据
  396. rows = df.iloc[start_row:end_row]
  397. results = []
  398. for _, row in rows.iterrows():
  399. try:
  400. year_month = str(row[data_cols["year_month"]])
  401. formatted_year_month = f"{year_month[:4]}-{year_month[4:]}"
  402. monthly_total = Decimal(str(row[data_cols["total"]])) # 进出口
  403. monthly_export = Decimal(str(row[data_cols["export"]])) # 出口
  404. monthly_import = Decimal(str(row[data_cols["import"]])) # 进口
  405. yoy_import_export = Decimal(str(row[data_cols["total"] + 1])) # 进出口同比
  406. yoy_export = Decimal(str(row[data_cols["export"] + 1])) # 出口同比
  407. yoy_import = Decimal(str(row[data_cols["import"] + 1])) # 进口同比
  408. results.append({
  409. "crossborder_year_month":formatted_year_month,
  410. "city_name": city_name,
  411. "monthly_total": monthly_total,
  412. "monthly_import": monthly_import,
  413. "monthly_export": monthly_export,
  414. "yoy_import_export": yoy_import_export,
  415. "yoy_import": yoy_import,
  416. "yoy_export": yoy_export
  417. })
  418. except Exception as e:
  419. log.error(f"解析某一行数据出错: {e}")
  420. continue # 单行错误不影响整体处理
  421. return pd.DataFrame(results)
  422. except Exception as e:
  423. log.error(f"处理湛江海关文件出错: {str(e)}")
  424. return pd.DataFrame()
  425. def get_customs_processor(customs_name):
  426. """获取不同海关的处理函数"""
  427. processors = {
  428. "广州海关": process_guangzhou_customs,
  429. "深圳海关": process_shenzhen_customs,
  430. "汕头海关": process_shantou_customs,
  431. "黄埔海关": process_huangpu_customs,
  432. "江门海关": process_jiangmen_customs,
  433. "湛江海关": process_zhanjiang_customs
  434. }
  435. return processors.get(customs_name)
  436. def parse_excel(current_dir):
  437. """主解析入口(优化为单参数模式)
  438. Args:
  439. current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
  440. """
  441. current_path = Path(current_dir)
  442. year, month = extract_year_month_from_path(current_path)
  443. log.info(f"开始处理 {year}年{month}月 数据: {current_dir}")
  444. try:
  445. # 动态获取前月目录
  446. prev_dir = get_previous_month_dir(current_path) if month != 1 else None
  447. if prev_dir:
  448. log.info(f"上个月数据目录: {prev_dir}")
  449. # 获取当前目录下所有Excel文件
  450. excel_files = list(current_path.glob("*.xls*"))
  451. if not excel_files:
  452. log.warning(f"当前目录下未找到Excel文件: {current_dir}")
  453. return
  454. # 按海关处理每个文件
  455. all_results = pd.DataFrame()
  456. customs_map = {
  457. "广州海关": [],
  458. "深圳海关": [],
  459. "汕头海关": [],
  460. "黄埔海关": [],
  461. "江门海关": [],
  462. "湛江海关": []
  463. }
  464. # 组织文件到不同的海关
  465. for file_path in excel_files:
  466. for customs_name in customs_map.keys():
  467. if match_customs_file(file_path.name, customs_name, year, month):
  468. customs_map[customs_name].append(file_path)
  469. log.info(f"匹配到 {customs_name} 文件: {file_path.name}")
  470. break
  471. # 处理每个海关的文件
  472. for customs_name, file_list in customs_map.items():
  473. processor = get_customs_processor(customs_name)
  474. if not processor:
  475. continue
  476. for file_path in file_list:
  477. # 特殊处理深圳海关和江门海关的2月数据(缺少1月数据)
  478. if customs_name in ["深圳海关", "江门海关"] and month == 2:
  479. # 获取2月份完整数据
  480. df_full = processor(file_path, year, month)
  481. if df_full.empty:
  482. continue
  483. # 创建1月份数据 (取2月份数据的一半)
  484. df_half = df_full.copy()
  485. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  486. # 注意:只有数值列才进行减半操作,避免对字符串操作
  487. if col in df_half.columns:
  488. df_half[col] = df_half[col] / 2
  489. # 设置1月份
  490. df_half['crossborder_year_month'] = f'{year}-01'
  491. # 设置2月份数据 (取2月份数据的一半)
  492. df_full['crossborder_year_month'] = f'{year}-02'
  493. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  494. if col in df_full.columns:
  495. df_full[col] = df_full[col] / 2
  496. # 合并数据
  497. df_customs = pd.concat([df_half, df_full])
  498. else:
  499. # 正常处理数据
  500. df_customs = processor(file_path, year, month)
  501. if not df_customs.empty:
  502. df_customs['month'] = month
  503. if not df_customs.empty:
  504. all_results = pd.concat([all_results, df_customs])
  505. # 如果没有获取到数据
  506. if all_results.empty:
  507. log.warning(f"未处理到有效数据: {current_dir}")
  508. return
  509. # 添加公共字段
  510. all_results['prov_code'] = PROV_CODE
  511. all_results['prov_name'] = PROV_NAME
  512. all_results['crossborder_year'] = year
  513. all_results['city_code'] = all_results['city_name'].astype(str).map(GUANGDONG_CITY).fillna('0000')
  514. all_results['month'] = all_results.get('month', month)
  515. if 'crossborder_year_month' in all_results.columns:
  516. all_results['crossborder_year_month'] = (
  517. all_results['crossborder_year_month']
  518. .replace('', pd.NA)
  519. .fillna(f'{year}-{month:02d}')
  520. )
  521. else:
  522. all_results['crossborder_year_month'] = f'{year}-{month:02d}'
  523. # 排序并删除重复项
  524. # all_results = all_results.sort_values(by=['city_code', 'crossborder_year_month'])
  525. # all_results = all_results.drop_duplicates(subset=['crossborder_year_month', 'city_code'], keep='last')
  526. # 打印处理结果
  527. log.info(f"处理完成,共获得广东省 {len(all_results)} 条地级市数据")
  528. # 选择入库字段
  529. final_df = all_results[[
  530. 'crossborder_year_month', 'prov_code', 'prov_name',
  531. 'crossborder_year','city_code', 'city_name',
  532. 'monthly_total','monthly_import', 'monthly_export',
  533. 'yoy_import_export','yoy_import', 'yoy_export'
  534. ]].copy()
  535. final_df = final_df.where(pd.notna(final_df), None)
  536. # 打印前几条数据
  537. # log.debug(f"处理后数据示例:\n{final_df.head()}")
  538. # 这里调用DBHelper入库(实际使用时请取消注释)
  539. from db_helper import DBHelper
  540. db = DBHelper()
  541. db.bulk_insert(
  542. final_df,
  543. 't_yujin_crossborder_prov_region_trade',
  544. conflict_columns=['crossborder_year_month', 'city_code'],
  545. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  546. 'yoy_import_export', 'yoy_import', 'yoy_export']
  547. )
  548. log.info(f"{current_dir}数据已全部成功处理")
  549. except Exception as e:
  550. log.error(f"处理失败:{current_dir},错误:{str(e)}")
  551. raise
  552. def convert_unit(value):
  553. """亿元转万元,处理空值"""
  554. try:
  555. # 如果 value 不是特殊的无效值,进行转换并保留4位小数
  556. return round(Decimal(value) * 10000, 4) if value not in ['-', ''] else None
  557. except (InvalidOperation, ValueError):
  558. # 捕获异常,返回 None
  559. return None
  560. # 测试入口
  561. if __name__ == "__main__":
  562. traverse_and_process(download_dir, parse_excel, province_name="guangdong")
  563. db_helper = DBHelper()
  564. db_helper.update_prov_yoy("广东省")