import re from decimal import Decimal from pathlib import Path YEAR_PATTERN = re.compile(r"^\d{4}$") MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$") def clean_commodity_name(name): """清洗商品名称中的特殊字符和括号注释,并替换英文括号为中文括号""" if not isinstance(name, str): return name # 去除非文字字符:星号、连续空格等 name = re.sub(r"[\\*#]", "", name) # 删除中英文括号及其包含的内容,如(已加旧码)或(2023版) name = re.sub(r'[((]已加旧码[))]', '', name) # 标准化空格:合并连续空格并去除首尾空格 name = re.sub(r'\s+', ' ', name).strip() # 替换英文括号为中文括号 name = re.sub(r'\(', '(', name) name = re.sub(r'\)', ')', name) name = re.sub(r'\[', '【', name) name = re.sub(r'\]', '】', name) return name def clean_county_name(name): """清洗国家名称中的特殊字符和括号注释,并替换英文括号为中文括号""" if not isinstance(name, str): return name # 去除非文字字符 name = re.sub(r"[*]", "", name) name = re.sub(r'[((]已加旧码[))]', '', name) name = re.sub(r'[((]含旧码[))]', '', name) # 删除“其中:”等关键词 name = re.sub(r"其中:", "", name) # 🧠 新增逻辑:删除所有空格(包括中间空格) name = re.sub(r'\s+', '', name) return name.strip() def convert_wan_to_yuan(value): return float(Decimal(str(value)).quantize(Decimal('0.0000')) * Decimal('10000')) def find_unmatched_countries(final_df): # 创建一个布尔掩码,判断 'country_code' 列是否为 NaN unmatched_mask = final_df['country_code'].isnull() # 如果有未匹配的国家 if unmatched_mask.any(): # 获取未匹配国家的名称 unmatched_names = final_df.loc[unmatched_mask, 'country_name'].unique() # 输出警告信息 print("⚠️ 以下国家名称未在 COUNTRY_CODE_MAPPING 中找到匹配:") # 打印所有未匹配的国家名称,按字母排序 for name in sorted(unmatched_names): print(f" - {name}") def extract_year_month_from_path(path): parts = path.parts try: year_part = parts[-2] month_part = parts[-1] if not YEAR_PATTERN.match(year_part): raise ValueError(f"无效年份格式:{year_part}") if not MONTH_PATTERN.match(month_part): raise ValueError(f"无效月份格式:{month_part}") return int(year_part), int(month_part) except IndexError: raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04") #获取上月目录 def get_previous_month_dir(current_path): """生成前月目录路径""" try: year_part = current_path.parent.name month_part = current_path.name if not (YEAR_PATTERN.match(year_part) and MONTH_PATTERN.match(month_part)): return None prev_month = int(month_part) - 1 if prev_month < 1: return None return current_path.parent.parent / current_path.parent.name / f"{prev_month:02d}" except Exception as e: print(f"前月目录生成失败:{str(e)}") return None #数据清洗逻 def traverse_and_process(root_path, process_func, province_name="henan"): """ 通用分层遍历函数,支持不同省份的 parse_excel 入口 Args: root_path (str): 根目录路径(如 downloads) process_func (function): 每个省份自己的 parse_excel 函数 province_name (str): 省份名称,如 "henan", "shandong", "fujian" """ root = Path(root_path) # 获取年份目录(格式如 download/2025) year_dirs = [ item for item in root.iterdir() if item.is_dir() and YEAR_PATTERN.match(item.name) ] # 倒序年份 for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True): if not year_dir.exists() or not year_dir.is_dir(): print(f"未找到 {province_name} 目录,跳过:{year_dir}") continue # 获取月份目录 month_dirs = [] for item in year_dir.iterdir(): if item.is_dir() and MONTH_PATTERN.match(item.name): month_dirs.append({ "path": item, "month": int(item.name) }) # 倒序处理月份 if month_dirs: print(f"\n年份:{year_dir.name} | 省份:{province_name}") for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True): print(f" 月份:{md['month']:02d} | 路径:{md['path']}") process_func(md['path']) # 调用传入的处理函数