parse_utils.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import re
  2. from decimal import Decimal
  3. from pathlib import Path
  4. from utils.log import log
  5. YEAR_PATTERN = re.compile(r"^\d{4}$")
  6. MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
  7. def clean_commodity_name(name):
  8. """清洗商品名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
  9. if not isinstance(name, str):
  10. return name
  11. # 去除非文字字符:星号、连续空格等
  12. name = re.sub(r"[\\*#]", "", name)
  13. # 删除中英文括号及其包含的内容,如(已加旧码)或(2023版)
  14. name = re.sub(r'[((]已加旧码[))]', '', name)
  15. # 标准化空格:合并连续空格并去除首尾空格
  16. name = re.sub(r'\s+', ' ', name).strip()
  17. # 替换英文括号为中文括号
  18. name = re.sub(r'\(', '(', name)
  19. name = re.sub(r'\)', ')', name)
  20. name = re.sub(r'\[', '【', name)
  21. name = re.sub(r'\]', '】', name)
  22. return name
  23. def clean_county_name(name):
  24. """清洗国家名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
  25. if not isinstance(name, str):
  26. return name
  27. # 去除非文字字符
  28. name = re.sub(r"[*]", "", name)
  29. name = re.sub(r'[((]已加旧码[))]', '', name)
  30. name = re.sub(r'[((]含旧码[))]', '', name)
  31. # 删除“其中:”等关键词
  32. name = re.sub(r"其中:", "", name)
  33. # 🧠 新增逻辑:删除所有空格(包括中间空格)
  34. name = re.sub(r'\s+', '', name)
  35. return name.strip()
  36. def convert_wan_to_yuan(value):
  37. return float(Decimal(str(value)).quantize(Decimal('0.0000')) * Decimal('10000'))
  38. def find_unmatched_countries(final_df):
  39. # 创建一个布尔掩码,判断 'country_code' 列是否为 NaN
  40. unmatched_mask = final_df['country_code'].isnull()
  41. # 如果有未匹配的国家
  42. if unmatched_mask.any():
  43. # 获取未匹配国家的名称
  44. unmatched_names = final_df.loc[unmatched_mask, 'country_name'].unique()
  45. # 输出警告信息
  46. log.info("⚠️ 以下国家名称未在 COUNTRY_CODE_MAPPING 中找到匹配:")
  47. # 打印所有未匹配的国家名称,按字母排序
  48. for name in sorted(unmatched_names):
  49. log.info(f" - {name}")
  50. def extract_year_month_from_path(path):
  51. parts = path.parts
  52. try:
  53. year_part = parts[-2]
  54. month_part = parts[-1]
  55. if not YEAR_PATTERN.match(year_part):
  56. raise ValueError(f"无效年份格式:{year_part}")
  57. if not MONTH_PATTERN.match(month_part):
  58. raise ValueError(f"无效月份格式:{month_part}")
  59. return int(year_part), int(month_part)
  60. except IndexError:
  61. raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04")
  62. #获取上月目录
  63. def get_previous_month_dir(current_path):
  64. """生成前月目录路径"""
  65. try:
  66. year_part = current_path.parent.name
  67. month_part = current_path.name
  68. if not (YEAR_PATTERN.match(year_part) and MONTH_PATTERN.match(month_part)):
  69. return None
  70. prev_month = int(month_part) - 1
  71. if prev_month < 1:
  72. return None
  73. return current_path.parent.parent / current_path.parent.name / f"{prev_month:02d}"
  74. except Exception as e:
  75. log.info(f"前月目录生成失败:{str(e)}")
  76. return None
  77. #数据清洗逻
  78. def traverse_and_process(root_path, process_func, province_name="henan"):
  79. """
  80. 通用分层遍历函数,支持不同省份的 parse_excel 入口
  81. Args:
  82. root_path (str): 根目录路径(如 downloads)
  83. process_func (function): 每个省份自己的 parse_excel 函数
  84. province_name (str): 省份名称,如 "henan", "shandong", "fujian"
  85. """
  86. log.info(f"开始遍历 {province_name} 目录:{root_path}")
  87. root = Path(root_path)
  88. # 获取年份目录(格式如 download/2025)
  89. year_dirs = [
  90. item for item in root.iterdir()
  91. if item.is_dir() and YEAR_PATTERN.match(item.name)
  92. ]
  93. # 倒序年份
  94. for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
  95. if not year_dir.exists() or not year_dir.is_dir():
  96. log.info(f"未找到 {province_name} 目录,跳过:{year_dir}")
  97. continue
  98. # 获取月份目录
  99. month_dirs = []
  100. for item in year_dir.iterdir():
  101. if item.is_dir() and MONTH_PATTERN.match(item.name):
  102. month_dirs.append({
  103. "path": item,
  104. "month": int(item.name)
  105. })
  106. # 倒序处理月份
  107. if month_dirs:
  108. log.info(f"\n年份:{year_dir.name} | 省份:{province_name}")
  109. for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
  110. log.info(f" 月份:{md['month']:02d} | 路径:{md['path']}")
  111. process_func(md['path']) # 调用传入的处理函数