123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- import re
- from datetime import datetime
- from decimal import Decimal, InvalidOperation
- from pathlib import Path
- from crossborder.utils.log import get_logger
- log = get_logger(__name__)
- YEAR_PATTERN = re.compile(r"^\d{4}$")
- MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
- def parse_value(val):
- """增强型数值解析(含科学计数法处理),保留四位小数"""
- if val in ('-', None, 'None', 'null'):
- return None
- try:
- # 科学计数法处理(如1.2E+5),使用Decimal处理以避免浮动精度问题
- if 'E' in str(val).upper():
- return Decimal(val).quantize(Decimal('0.0000')) # 用Decimal处理科学计数法,确保四位小数
- return Decimal(str(val).replace(',', '')).quantize(Decimal('0.0000')) # 保留四位小数
- except Exception as e:
- print(f"数值解析错误:{val},错误:{e}")
- return None
- def convert_unit(value):
- """亿元转万元,处理空值"""
- try:
- # 如果 value 不是特殊的无效值,进行转换并保留4位小数
- return round(Decimal(value) * 10000, 4) if value not in ['-', ''] else None
- except (InvalidOperation, ValueError):
- # 捕获异常,返回 None
- return None
- def parse_ratio(value):
- """处理百分比数据,非空时返回 Decimal 类型"""
- if value in ('-', '', None):
- return None
- try:
- return Decimal(str(value).strip('%').replace(',', ''))
- except (InvalidOperation, ValueError):
- # 如果转换失败,也返回 None
- return None
- def clean_commodity_name(name):
- """清洗商品名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
- if not isinstance(name, str):
- return name
- # 去除非文字字符:星号、连续空格等
- name = re.sub(r"[\\*#]", "", name)
- # 删除中英文括号及其包含的内容,如(已加旧码)或(2023版)
- name = re.sub(r'[((]已加旧码[))]', '', name)
- # 标准化空格:合并连续空格并去除首尾空格
- name = re.sub(r'\s+', ' ', name).strip()
- # 替换英文括号为中文括号
- name = re.sub(r'\(', '(', name)
- name = re.sub(r'\)', ')', name)
- name = re.sub(r'\[', '【', name)
- name = re.sub(r'\]', '】', name)
- return name
- def clean_county_name(name):
- """清洗国家名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
- if not isinstance(name, str):
- return name
- # 去除非文字字符
- name = re.sub(r"[*]", "", name)
- name = re.sub(r'[((]已加旧码[))]', '', name)
- name = re.sub(r'[((]含旧码[))]', '', name)
- # 删除“其中:”等关键词
- name = re.sub(r"其中:", "", name)
- # 🧠 新增逻辑:删除所有空格(包括中间空格)
- name = re.sub(r'\s+', '', name)
- return name.strip()
- def convert_wan_to_yuan(value):
- return float(Decimal(str(value)).quantize(Decimal('0.0000')) * Decimal('10000'))
- def find_unmatched_countries(final_df):
- # 创建一个布尔掩码,判断 'country_code' 列是否为 NaN
- unmatched_mask = final_df['country_code'].isnull()
- # 如果有未匹配的国家
- if unmatched_mask.any():
- # 获取未匹配国家的名称
- unmatched_names = final_df.loc[unmatched_mask, 'country_name'].unique()
- # 输出警告信息
- log.info("⚠️ 以下国家名称未在 COUNTRY_CODE_MAPPING 中找到匹配:")
- # 打印所有未匹配的国家名称,按字母排序
- for name in sorted(unmatched_names):
- log.info(f" - {name}")
- def extract_year_month_from_path(path):
- """
- 从路径中提取年份和月份,兼容文件路径(如 .../shandong/2025/04/信息收集和环境.xls)
- """
- parts = path.parts
- try:
- # 如果是文件路径,尝试向上查找直到找到年份和月份目录
- idx = -1
- while len(parts) + idx >= 2:
- year_part = parts[idx - 1]
- month_part = parts[idx]
- if YEAR_PATTERN.match(year_part) and MONTH_PATTERN.match(month_part):
- return int(year_part), int(month_part)
- idx -= 1
- # 如果没找到符合条件的结构
- raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04 或 .../shandong/2025/04/信息收集和环境.xls")
- except IndexError:
- raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04 或 .../shandong/2025/04/信息收集和环境.xls")
- #获取上月目录
- def get_previous_month_dir(current_path):
- """生成前月目录路径"""
- try:
- year_part = current_path.parent.name
- month_part = current_path.name
- if not (YEAR_PATTERN.match(year_part) and MONTH_PATTERN.match(month_part)):
- return None
- prev_month = int(month_part) - 1
- if prev_month < 1:
- return None
- return current_path.parent.parent / current_path.parent.name / f"{prev_month:02d}"
- except Exception as e:
- log.info(f"前月目录生成失败:{str(e)}")
- return None
- #数据清洗逻
- def traverse_and_process(root_path, process_func, province_name="henan", year=None):
- """
- 通用分层遍历函数,支持不同年份范围的处理
- Args:
- root_path (str): 根目录路径(如 downloads)
- process_func (function): 每个省份自己的 parse_excel 函数
- province_name (str): 省份名称,如 "henan", "shandong", "fujian"
- year (int, optional): 指定截止年份(包含该年份及之后的所有年份)
- """
- root = Path(root_path)
- current_year = datetime.now().year
- # 检查根目录是否存在
- if not root.exists() or not root.is_dir():
- log.error(f"根目录不存在或不是目录: {root}")
- return
- log.info(f"开始遍历 {province_name} 目录:{root_path}")
- # 获取所有年份目录
- year_dirs = []
- for item in root.iterdir():
- if item.is_dir() and YEAR_PATTERN.match(item.name):
- try:
- year_int = int(item.name)
- year_dirs.append({"year": year_int, "path": item})
- except ValueError:
- continue
- if not year_dirs:
- log.warning(f"未找到任何年份目录,跳过处理: {root}")
- return
- # 按年份倒序排序
- year_dirs.sort(key=lambda x: x["year"], reverse=True)
- # 模式1: year=None,只处理最新年份的最新月份
- if year is None:
- log.info(f"模式:只处理最新年份的最新月份")
- # 取最新年份目录
- latest_year_dir = year_dirs[0]["path"]
- log.info(f"最新年份:{latest_year_dir.name}")
- # 处理该年份的最新月份
- process_latest_month(latest_year_dir, process_func, province_name)
- return
- # 模式2: year!=None,处理从当前年到指定年份的所有年份的所有月份
- if year > current_year:
- log.warning(f"警告:指定年份 {year} 大于当前年份 {current_year}, 将仅处理当前年")
- year = current_year
- log.info(f"模式:处理从当前年({current_year})到指定年({year})的所有月份(倒序)")
- # 筛选年份范围:从当前年到指定年份
- selected_years = [yd for yd in year_dirs if year <= yd["year"] <= current_year]
- if not selected_years:
- log.warning(f"没有找到在范围 [{year}-{current_year}] 内的年份目录")
- return
- # 按年份倒序处理所有月份
- for yd in selected_years:
- process_all_months(yd["path"], process_func, province_name)
- def process_latest_month(year_dir, process_func, province_name):
- """处理单个年份目录的最新月份"""
- log.info(f"\n年份:{year_dir.name} | 省份:{province_name}")
- # 获取所有月份目录
- month_dirs = []
- for item in year_dir.iterdir():
- if item.is_dir() and MONTH_PATTERN.match(item.name):
- try:
- month = int(item.name)
- month_dirs.append({"month": month, "path": item})
- except ValueError:
- continue
- if not month_dirs:
- log.info(f" {year_dir} 下没有月份目录,跳过")
- return
- # 按月倒序排序
- month_dirs.sort(key=lambda x: x["month"], reverse=True)
- # 取最新月份
- latest_month = month_dirs[0]
- month_name = f"{latest_month['month']:02d}"
- log.info(f" 处理最新月份:{month_name} | 路径:{latest_month['path']}")
- process_func(latest_month["path"])
- def process_all_months(year_dir, process_func, province_name):
- """处理单个年份目录的所有月份(倒序)"""
- log.info(f"\n年份:{year_dir.name} | 省份:{province_name}")
- # 获取所有月份目录
- month_dirs = []
- for item in year_dir.iterdir():
- if item.is_dir() and MONTH_PATTERN.match(item.name):
- try:
- month = int(item.name)
- month_dirs.append({"month": month, "path": item})
- except ValueError:
- continue
- if not month_dirs:
- log.info(f" {year_dir} 下没有月份目录,跳过")
- return
- # 按月倒序排序
- month_dirs.sort(key=lambda x: x["month"], reverse=True)
- # 处理所有月份
- for md in month_dirs:
- month_name = f"{md['month']:02d}"
- log.info(f" 月份:{month_name} | 路径:{md['path']}")
- process_func(md['path'])
|