123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- import re
- from decimal import Decimal
- from pathlib import Path
- YEAR_PATTERN = re.compile(r"^\d{4}$")
- MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
- def clean_commodity_name(name):
- """清洗商品名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
- if not isinstance(name, str):
- return name
- # 去除非文字字符:星号、连续空格等
- name = re.sub(r"[\\*#]", "", name)
- # 删除中英文括号及其包含的内容,如(已加旧码)或(2023版)
- name = re.sub(r'[((]已加旧码[))]', '', name)
- # 标准化空格:合并连续空格并去除首尾空格
- name = re.sub(r'\s+', ' ', name).strip()
- # 替换英文括号为中文括号
- name = re.sub(r'\(', '(', name)
- name = re.sub(r'\)', ')', name)
- name = re.sub(r'\[', '【', name)
- name = re.sub(r'\]', '】', name)
- return name
- def clean_county_name(name):
- """清洗国家名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
- if not isinstance(name, str):
- return name
- # 去除非文字字符
- name = re.sub(r"[*]", "", name)
- name = re.sub(r'[((]已加旧码[))]', '', name)
- name = re.sub(r'[((]含旧码[))]', '', name)
- # 删除“其中:”等关键词
- name = re.sub(r"其中:", "", name)
- # 🧠 新增逻辑:删除所有空格(包括中间空格)
- name = re.sub(r'\s+', '', name)
- return name.strip()
- def convert_wan_to_yuan(value):
- return float(Decimal(str(value)).quantize(Decimal('0.0000')) * Decimal('10000'))
- def find_unmatched_countries(final_df):
- # 创建一个布尔掩码,判断 'country_code' 列是否为 NaN
- unmatched_mask = final_df['country_code'].isnull()
- # 如果有未匹配的国家
- if unmatched_mask.any():
- # 获取未匹配国家的名称
- unmatched_names = final_df.loc[unmatched_mask, 'country_name'].unique()
- # 输出警告信息
- print("⚠️ 以下国家名称未在 COUNTRY_CODE_MAPPING 中找到匹配:")
- # 打印所有未匹配的国家名称,按字母排序
- for name in sorted(unmatched_names):
- print(f" - {name}")
- def extract_year_month_from_path(path):
- parts = path.parts
- try:
- year_part = parts[-2]
- month_part = parts[-1]
- if not YEAR_PATTERN.match(year_part):
- raise ValueError(f"无效年份格式:{year_part}")
- if not MONTH_PATTERN.match(month_part):
- raise ValueError(f"无效月份格式:{month_part}")
- return int(year_part), int(month_part)
- except IndexError:
- raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04")
- #获取上月目录
- def get_previous_month_dir(current_path):
- """生成前月目录路径"""
- try:
- year_part = current_path.parent.name
- month_part = current_path.name
- if not (YEAR_PATTERN.match(year_part) and MONTH_PATTERN.match(month_part)):
- return None
- prev_month = int(month_part) - 1
- if prev_month < 1:
- return None
- return current_path.parent.parent / current_path.parent.name / f"{prev_month:02d}"
- except Exception as e:
- print(f"前月目录生成失败:{str(e)}")
- return None
- #数据清洗逻
- def traverse_and_process(root_path, process_func, province_name="henan"):
- """
- 通用分层遍历函数,支持不同省份的 parse_excel 入口
- Args:
- root_path (str): 根目录路径(如 downloads)
- process_func (function): 每个省份自己的 parse_excel 函数
- province_name (str): 省份名称,如 "henan", "shandong", "fujian"
- """
- root = Path(root_path)
- # 获取年份目录(格式如 download/2025)
- year_dirs = [
- item for item in root.iterdir()
- if item.is_dir() and YEAR_PATTERN.match(item.name)
- ]
- # 倒序年份
- for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
- if not year_dir.exists() or not year_dir.is_dir():
- print(f"未找到 {province_name} 目录,跳过:{year_dir}")
- continue
- # 获取月份目录
- month_dirs = []
- for item in year_dir.iterdir():
- if item.is_dir() and MONTH_PATTERN.match(item.name):
- month_dirs.append({
- "path": item,
- "month": int(item.name)
- })
- # 倒序处理月份
- if month_dirs:
- print(f"\n年份:{year_dir.name} | 省份:{province_name}")
- for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
- print(f" 月份:{md['month']:02d} | 路径:{md['path']}")
- process_func(md['path']) # 调用传入的处理函数
|