parse_utils.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import re
  2. from datetime import datetime
  3. from decimal import Decimal, InvalidOperation
  4. from pathlib import Path
  5. from crossborder.utils.log import get_logger
  6. log = get_logger(__name__)
  7. YEAR_PATTERN = re.compile(r"^\d{4}$")
  8. MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
  9. def parse_value(val):
  10. """增强型数值解析(含科学计数法处理),保留四位小数"""
  11. if val in ('-', None, 'None', 'null'):
  12. return None
  13. try:
  14. # 科学计数法处理(如1.2E+5),使用Decimal处理以避免浮动精度问题
  15. if 'E' in str(val).upper():
  16. return Decimal(val).quantize(Decimal('0.0000')) # 用Decimal处理科学计数法,确保四位小数
  17. return Decimal(str(val).replace(',', '')).quantize(Decimal('0.0000')) # 保留四位小数
  18. except Exception as e:
  19. print(f"数值解析错误:{val},错误:{e}")
  20. return None
  21. def convert_unit(value):
  22. """亿元转万元,处理空值"""
  23. try:
  24. # 如果 value 不是特殊的无效值,进行转换并保留4位小数
  25. return round(Decimal(value) * 10000, 4) if value not in ['-', ''] else None
  26. except (InvalidOperation, ValueError):
  27. # 捕获异常,返回 None
  28. return None
  29. def parse_ratio(value):
  30. """处理百分比数据,非空时返回 Decimal 类型"""
  31. if value in ('-', '', None):
  32. return None
  33. try:
  34. return Decimal(str(value).strip('%').replace(',', ''))
  35. except (InvalidOperation, ValueError):
  36. # 如果转换失败,也返回 None
  37. return None
  38. def clean_commodity_name(name):
  39. """清洗商品名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
  40. if not isinstance(name, str):
  41. return name
  42. # 去除非文字字符:星号、连续空格等
  43. name = re.sub(r"[\\*#]", "", name)
  44. # 删除中英文括号及其包含的内容,如(已加旧码)或(2023版)
  45. name = re.sub(r'[((]已加旧码[))]', '', name)
  46. # 标准化空格:合并连续空格并去除首尾空格
  47. name = re.sub(r'\s+', ' ', name).strip()
  48. # 替换英文括号为中文括号
  49. name = re.sub(r'\(', '(', name)
  50. name = re.sub(r'\)', ')', name)
  51. name = re.sub(r'\[', '【', name)
  52. name = re.sub(r'\]', '】', name)
  53. return name
  54. def clean_county_name(name):
  55. """清洗国家名称中的特殊字符和括号注释,并替换英文括号为中文括号"""
  56. if not isinstance(name, str):
  57. return name
  58. # 去除非文字字符
  59. name = re.sub(r"[*]", "", name)
  60. name = re.sub(r'[((]已加旧码[))]', '', name)
  61. name = re.sub(r'[((]含旧码[))]', '', name)
  62. # 删除“其中:”等关键词
  63. name = re.sub(r"其中:", "", name)
  64. # 🧠 新增逻辑:删除所有空格(包括中间空格)
  65. name = re.sub(r'\s+', '', name)
  66. return name.strip()
  67. def convert_wan_to_yuan(value):
  68. return float(Decimal(str(value)).quantize(Decimal('0.0000')) * Decimal('10000'))
  69. def find_unmatched_countries(final_df):
  70. # 创建一个布尔掩码,判断 'country_code' 列是否为 NaN
  71. unmatched_mask = final_df['country_code'].isnull()
  72. # 如果有未匹配的国家
  73. if unmatched_mask.any():
  74. # 获取未匹配国家的名称
  75. unmatched_names = final_df.loc[unmatched_mask, 'country_name'].unique()
  76. # 输出警告信息
  77. log.info("⚠️ 以下国家名称未在 COUNTRY_CODE_MAPPING 中找到匹配:")
  78. # 打印所有未匹配的国家名称,按字母排序
  79. for name in sorted(unmatched_names):
  80. log.info(f" - {name}")
  81. def extract_year_month_from_path(path):
  82. """
  83. 从路径中提取年份和月份,兼容文件路径(如 .../shandong/2025/04/信息收集和环境.xls)
  84. """
  85. parts = path.parts
  86. try:
  87. # 如果是文件路径,尝试向上查找直到找到年份和月份目录
  88. idx = -1
  89. while len(parts) + idx >= 2:
  90. year_part = parts[idx - 1]
  91. month_part = parts[idx]
  92. if YEAR_PATTERN.match(year_part) and MONTH_PATTERN.match(month_part):
  93. return int(year_part), int(month_part)
  94. idx -= 1
  95. # 如果没找到符合条件的结构
  96. raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04 或 .../shandong/2025/04/信息收集和环境.xls")
  97. except IndexError:
  98. raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04 或 .../shandong/2025/04/信息收集和环境.xls")
  99. #获取上月目录
  100. def get_previous_month_dir(current_path):
  101. """生成前月目录路径"""
  102. try:
  103. year_part = current_path.parent.name
  104. month_part = current_path.name
  105. if not (YEAR_PATTERN.match(year_part) and MONTH_PATTERN.match(month_part)):
  106. return None
  107. prev_month = int(month_part) - 1
  108. if prev_month < 1:
  109. return None
  110. return current_path.parent.parent / current_path.parent.name / f"{prev_month:02d}"
  111. except Exception as e:
  112. log.info(f"前月目录生成失败:{str(e)}")
  113. return None
  114. #数据清洗逻
  115. def traverse_and_process(root_path, process_func, province_name="henan", year=None):
  116. """
  117. 通用分层遍历函数,支持不同年份范围的处理
  118. Args:
  119. root_path (str): 根目录路径(如 downloads)
  120. process_func (function): 每个省份自己的 parse_excel 函数
  121. province_name (str): 省份名称,如 "henan", "shandong", "fujian"
  122. year (int, optional): 指定截止年份(包含该年份及之后的所有年份)
  123. """
  124. root = Path(root_path)
  125. current_year = datetime.now().year
  126. # 检查根目录是否存在
  127. if not root.exists() or not root.is_dir():
  128. log.error(f"根目录不存在或不是目录: {root}")
  129. return
  130. log.info(f"开始遍历 {province_name} 目录:{root_path}")
  131. # 获取所有年份目录
  132. year_dirs = []
  133. for item in root.iterdir():
  134. if item.is_dir() and YEAR_PATTERN.match(item.name):
  135. try:
  136. year_int = int(item.name)
  137. year_dirs.append({"year": year_int, "path": item})
  138. except ValueError:
  139. continue
  140. if not year_dirs:
  141. log.warning(f"未找到任何年份目录,跳过处理: {root}")
  142. return
  143. # 按年份倒序排序
  144. year_dirs.sort(key=lambda x: x["year"], reverse=True)
  145. # 模式1: year=None,只处理最新年份的最新月份
  146. if year is None:
  147. log.info(f"模式:只处理最新年份的最新月份")
  148. # 取最新年份目录
  149. latest_year_dir = year_dirs[0]["path"]
  150. log.info(f"最新年份:{latest_year_dir.name}")
  151. # 处理该年份的最新月份
  152. process_latest_month(latest_year_dir, process_func, province_name)
  153. return
  154. # 模式2: year!=None,处理从当前年到指定年份的所有年份的所有月份
  155. if year > current_year:
  156. log.warning(f"警告:指定年份 {year} 大于当前年份 {current_year}, 将仅处理当前年")
  157. year = current_year
  158. log.info(f"模式:处理从当前年({current_year})到指定年({year})的所有月份(倒序)")
  159. # 筛选年份范围:从当前年到指定年份
  160. selected_years = [yd for yd in year_dirs if year <= yd["year"] <= current_year]
  161. if not selected_years:
  162. log.warning(f"没有找到在范围 [{year}-{current_year}] 内的年份目录")
  163. return
  164. # 按年份倒序处理所有月份
  165. for yd in selected_years:
  166. process_all_months(yd["path"], process_func, province_name)
  167. def process_latest_month(year_dir, process_func, province_name):
  168. """处理单个年份目录的最新月份"""
  169. log.info(f"\n年份:{year_dir.name} | 省份:{province_name}")
  170. # 获取所有月份目录
  171. month_dirs = []
  172. for item in year_dir.iterdir():
  173. if item.is_dir() and MONTH_PATTERN.match(item.name):
  174. try:
  175. month = int(item.name)
  176. month_dirs.append({"month": month, "path": item})
  177. except ValueError:
  178. continue
  179. if not month_dirs:
  180. log.info(f" {year_dir} 下没有月份目录,跳过")
  181. return
  182. # 按月倒序排序
  183. month_dirs.sort(key=lambda x: x["month"], reverse=True)
  184. # 取最新月份
  185. latest_month = month_dirs[0]
  186. month_name = f"{latest_month['month']:02d}"
  187. log.info(f" 处理最新月份:{month_name} | 路径:{latest_month['path']}")
  188. process_func(latest_month["path"])
  189. def process_all_months(year_dir, process_func, province_name):
  190. """处理单个年份目录的所有月份(倒序)"""
  191. log.info(f"\n年份:{year_dir.name} | 省份:{province_name}")
  192. # 获取所有月份目录
  193. month_dirs = []
  194. for item in year_dir.iterdir():
  195. if item.is_dir() and MONTH_PATTERN.match(item.name):
  196. try:
  197. month = int(item.name)
  198. month_dirs.append({"month": month, "path": item})
  199. except ValueError:
  200. continue
  201. if not month_dirs:
  202. log.info(f" {year_dir} 下没有月份目录,跳过")
  203. return
  204. # 按月倒序排序
  205. month_dirs.sort(key=lambda x: x["month"], reverse=True)
  206. # 处理所有月份
  207. for md in month_dirs:
  208. month_name = f"{md['month']:02d}"
  209. log.info(f" 月份:{month_name} | 路径:{md['path']}")
  210. process_func(md['path'])