parse_utils.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import re
  2. from decimal import Decimal
  3. from pathlib import Path
  4. YEAR_PATTERN = re.compile(r"^\d{4}$")
  5. MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
  6. def clean_commodity_name(name):
  7. """清洗商品名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
  8. if not isinstance(name, str):
  9. return name
  10. # 去除非文字字符:星号、连续空格等
  11. name = re.sub(r"[\\*#]", "", name)
  12. # 删除中英文括号及其包含的内容,如(已加旧码)或(2023版)
  13. name = re.sub(r'[((]已加旧码[))]', '', name)
  14. # 标准化空格:合并连续空格并去除首尾空格
  15. name = re.sub(r'\s+', ' ', name).strip()
  16. # 替换英文括号为中文括号
  17. name = re.sub(r'\(', '(', name)
  18. name = re.sub(r'\)', ')', name)
  19. name = re.sub(r'\[', '【', name)
  20. name = re.sub(r'\]', '】', name)
  21. return name
  22. def clean_county_name(name):
  23. """清洗国家名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
  24. if not isinstance(name, str):
  25. return name
  26. # 去除非文字字符
  27. name = re.sub(r"[*]", "", name)
  28. name = re.sub(r'[((]已加旧码[))]', '', name)
  29. name = re.sub(r'[((]含旧码[))]', '', name)
  30. # 删除“其中:”等关键词
  31. name = re.sub(r"其中:", "", name)
  32. # 🧠 新增逻辑:删除所有空格(包括中间空格)
  33. name = re.sub(r'\s+', '', name)
  34. return name.strip()
  35. def convert_wan_to_yuan(value):
  36. return float(Decimal(str(value)).quantize(Decimal('0.0000')) * Decimal('10000'))
  37. def find_unmatched_countries(final_df):
  38. # 创建一个布尔掩码,判断 'country_code' 列是否为 NaN
  39. unmatched_mask = final_df['country_code'].isnull()
  40. # 如果有未匹配的国家
  41. if unmatched_mask.any():
  42. # 获取未匹配国家的名称
  43. unmatched_names = final_df.loc[unmatched_mask, 'country_name'].unique()
  44. # 输出警告信息
  45. print("⚠️ 以下国家名称未在 COUNTRY_CODE_MAPPING 中找到匹配:")
  46. # 打印所有未匹配的国家名称,按字母排序
  47. for name in sorted(unmatched_names):
  48. print(f" - {name}")
  49. def extract_year_month_from_path(path):
  50. parts = path.parts
  51. try:
  52. year_part = parts[-2]
  53. month_part = parts[-1]
  54. if not YEAR_PATTERN.match(year_part):
  55. raise ValueError(f"无效年份格式:{year_part}")
  56. if not MONTH_PATTERN.match(month_part):
  57. raise ValueError(f"无效月份格式:{month_part}")
  58. return int(year_part), int(month_part)
  59. except IndexError:
  60. raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04")
  61. #获取上月目录
  62. def get_previous_month_dir(current_path):
  63. """生成前月目录路径"""
  64. try:
  65. year_part = current_path.parent.name
  66. month_part = current_path.name
  67. if not (YEAR_PATTERN.match(year_part) and MONTH_PATTERN.match(month_part)):
  68. return None
  69. prev_month = int(month_part) - 1
  70. if prev_month < 1:
  71. return None
  72. return current_path.parent.parent / current_path.parent.name / f"{prev_month:02d}"
  73. except Exception as e:
  74. print(f"前月目录生成失败:{str(e)}")
  75. return None
  76. #数据清洗逻
  77. def traverse_and_process(root_path, process_func, province_name="henan"):
  78. """
  79. 通用分层遍历函数,支持不同省份的 parse_excel 入口
  80. Args:
  81. root_path (str): 根目录路径(如 downloads)
  82. process_func (function): 每个省份自己的 parse_excel 函数
  83. province_name (str): 省份名称,如 "henan", "shandong", "fujian"
  84. """
  85. root = Path(root_path)
  86. # 获取年份目录(格式如 download/2025)
  87. year_dirs = [
  88. item for item in root.iterdir()
  89. if item.is_dir() and YEAR_PATTERN.match(item.name)
  90. ]
  91. # 倒序年份
  92. for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
  93. if not year_dir.exists() or not year_dir.is_dir():
  94. print(f"未找到 {province_name} 目录,跳过:{year_dir}")
  95. continue
  96. # 获取月份目录
  97. month_dirs = []
  98. for item in year_dir.iterdir():
  99. if item.is_dir() and MONTH_PATTERN.match(item.name):
  100. month_dirs.append({
  101. "path": item,
  102. "month": int(item.name)
  103. })
  104. # 倒序处理月份
  105. if month_dirs:
  106. print(f"\n年份:{year_dir.name} | 省份:{province_name}")
  107. for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
  108. print(f" 月份:{md['month']:02d} | 路径:{md['path']}")
  109. process_func(md['path']) # 调用传入的处理函数