henan_parse_excel.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. import re
  2. from pathlib import Path
  3. import pandas as pd
  4. from utils.db_helper import DBHelper
  5. from utils.constants import COUNTRY_CODE_MAPPING, EXCLUDE_REGIONS, DOWNLOAD_DIR
  6. from utils.parse_utils import clean_county_name, clean_commodity_name, convert_wan_to_yuan, find_unmatched_countries, \
  7. extract_year_month_from_path, traverse_and_process
  8. # 常量配置(新增路径正则校验)
  9. PROV_CODE = "410000"
  10. PROV_NAME = "河南省"
  11. YEAR_PATTERN = re.compile(r"^\d{4}$")
  12. MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
  13. download_dir = DOWNLOAD_DIR / "henan"
  14. def parse_excel(current_dir):
  15. """主解析入口(优化为单参数模式)
  16. Args:
  17. current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
  18. """
  19. current_path = Path(current_dir)
  20. year, month = extract_year_month_from_path(current_path)
  21. try:
  22. # 处理商品贸易数据
  23. process_combined_trade(current_path, year, month)
  24. # 处理国别贸易数据(保持原有逻辑结构)
  25. country_file = next(current_path.glob("*国别*"), None)
  26. process_country_trade(country_file, year, month)
  27. print(f"{current_dir}数据已全部成功处理")
  28. except Exception as e:
  29. print(f"处理失败:{current_dir},错误:{str(e)}")
  30. raise
  31. def process_combined_trade(current_dir, year, month):
  32. """处理合并商品贸易数据(支持1月数据模拟)"""
  33. import_file = next(current_dir.glob("*进口主要商品量值表*"), None)
  34. export_file = next(current_dir.glob("*出口主要商品量值表*"), None)
  35. if not (import_file and export_file):
  36. raise FileNotFoundError("缺少进口或出口文件")
  37. # 读取当前月数据
  38. current_data = read_trade_pair(import_file, export_file)
  39. db = DBHelper()
  40. current_data['commodity_code'] = current_data['commodity_name'].apply(db.get_commodity_id)
  41. valid_data = current_data[current_data['commodity_code'].notnull()].copy()
  42. # 构建当前月数据
  43. valid_data['crossborder_year'] = year
  44. valid_data['crossborder_year_month'] = f"{year}-{month:02d}"
  45. valid_data['prov_code'] = PROV_CODE
  46. valid_data['prov_name'] = PROV_NAME
  47. valid_data['monthly_total'] = valid_data['monthly_import'] + valid_data['monthly_export']
  48. # 定义目标字段
  49. target_cols = [
  50. 'crossborder_year', 'crossborder_year_month', 'prov_code', 'prov_name',
  51. 'commodity_code', 'commodity_name', 'monthly_total', 'monthly_import', 'monthly_export'
  52. ]
  53. # 写入当前月数据
  54. db.bulk_insert(
  55. valid_data[target_cols],
  56. 't_yujin_crossborder_prov_commodity_trade',
  57. conflict_columns=['crossborder_year_month', 'prov_code', 'commodity_code'],
  58. update_columns=['monthly_total', 'monthly_import', 'monthly_export']
  59. )
  60. # 当处理2月数据时,生成模拟1月数据
  61. if month == 2:
  62. # 克隆当前数据并调整月份
  63. january_data = valid_data.copy()
  64. january_data['crossborder_year_month'] = f"{year}-01"
  65. # 数值处理:月指标除以2(模拟1-2月均值)
  66. numeric_cols = ['monthly_total', 'monthly_import', 'monthly_export']
  67. january_data[numeric_cols] = january_data[numeric_cols] / 2
  68. # 写入模拟1月数据
  69. db.bulk_insert(
  70. january_data[target_cols],
  71. 't_yujin_crossborder_prov_commodity_trade',
  72. conflict_columns=['crossborder_year_month', 'prov_code', 'commodity_code'],
  73. update_columns=numeric_cols # 仅更新数值字段
  74. )
  75. def process_country_trade(current_file_path, year, month):
  76. """处理国别贸易数据(支持1月数据模拟)"""
  77. # 读取原始数据
  78. final_df = read_with_header4(current_file_path, month)
  79. # 数据清洗:剔除指定区域
  80. final_df = final_df[
  81. ~final_df['country_name'].isin(EXCLUDE_REGIONS) &
  82. ~final_df['country_name'].str.contains(r'[((]地区[))]', regex=True) # 修正正则表达式
  83. ]
  84. # 生成基础字段
  85. final_df['country_code'] = final_df['country_name'].map(COUNTRY_CODE_MAPPING)
  86. find_unmatched_countries(final_df)
  87. # 过滤掉没有匹配到 country_code 的行
  88. final_df = final_df[final_df['country_code'].notnull()].copy()
  89. final_df['crossborder_year'] = year
  90. final_df['crossborder_year_month'] = f"{year}-{month:02d}"
  91. final_df['prov_code'] = PROV_CODE
  92. final_df['prov_name'] = PROV_NAME
  93. # 主数据写入
  94. db = DBHelper()
  95. db.bulk_insert(
  96. final_df,
  97. 't_yujin_crossborder_prov_country_trade',
  98. conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
  99. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  100. 'yoy_import_export', 'yoy_import', 'yoy_export']
  101. )
  102. # 当处理2月数据时,生成模拟1月数据
  103. if month == 2:
  104. # 克隆数据并调整月份
  105. january_df = final_df.copy()
  106. january_df['crossborder_year_month'] = f"{year}-01"
  107. # 数值处理:月指标除以2,同比指标清零
  108. numeric_cols = ['monthly_total', 'monthly_import', 'monthly_export']
  109. january_df[numeric_cols] = january_df[numeric_cols] / 2 # 均摊为1-2月均值
  110. yoy_cols = ['yoy_import_export', 'yoy_import', 'yoy_export']
  111. january_df[yoy_cols] = 0.0 # 模拟数据无同比
  112. # 模拟数据写入(增加注释说明)
  113. db.bulk_insert(
  114. january_df,
  115. 't_yujin_crossborder_prov_country_trade',
  116. conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
  117. update_columns=numeric_cols + yoy_cols # 仅更新数值字段
  118. )
  119. def read_with_header4(file_path, month):
  120. # 第一阶段:读取原始数据(固定列范围)
  121. # 2月份数据和其他月份表格数据不同
  122. if month == 2:
  123. target_cols = [0, 1, 2, 3, 4, 5, 6]
  124. else:
  125. raw_df = pd.read_excel(
  126. file_path,
  127. usecols="A:K", # 强制读取前11列(A到K)
  128. header=None, # 禁用自动表头识别
  129. skiprows=5,
  130. skipfooter=1
  131. )
  132. # 第二阶段:计算列偏移量
  133. if raw_df.iloc[:, 0:2].isnull().all().all(): # 前两列全为空
  134. col_offset = 2 # 从第三列开始(A3起始)
  135. else:
  136. col_offset = 0 # 默认从第一列开始(A1起始)
  137. # 第三阶段:确定目标列索引(基于偏移后的位置)
  138. target_cols = [0 + col_offset, 1 + col_offset, 2 + col_offset, 5 + col_offset, 6 + col_offset, 9 + col_offset,
  139. 10 + col_offset]
  140. # 第四阶段:应用header=4逻辑并选择目标列
  141. final_df = pd.read_excel(
  142. file_path,
  143. usecols=target_cols, # 动态选择的目标列
  144. header=4, # 保持原有header行位置
  145. skipfooter=1
  146. )
  147. # 第五阶段:强制列名对齐
  148. final_df.columns = [
  149. 'country_name', 'monthly_total', 'yoy_import_export',
  150. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
  151. ]
  152. # 清洗国家名称
  153. final_df['country_name'] = final_df['country_name'].apply(clean_county_name)
  154. # 替换 "--" 为 0,并转换为 float 类型
  155. yoy_columns = ['yoy_import_export', 'yoy_export', 'yoy_import']
  156. # 优化后的稳健类型转换方案
  157. final_df[yoy_columns] = (
  158. final_df[yoy_columns]
  159. # 阶段1:清理非常规占位符
  160. .replace({
  161. '--': None, # 处理横杠
  162. '': None, # 处理空字符串
  163. 'N/A': None, # 处理英文占位符
  164. '不详': None # 处理中文占位符
  165. })
  166. # 阶段2:安全类型转换
  167. .apply(pd.to_numeric, errors='coerce', downcast='float')
  168. # 阶段3:空值处理
  169. .fillna(0)
  170. # 阶段4:精度控制
  171. .round(2)
  172. )
  173. return final_df
  174. # 进出口数据合并为一张表
  175. def read_trade_pair(import_path, export_path):
  176. """进出口合并,读取第一列和第4列"""
  177. df_import = pd.read_excel(
  178. import_path,
  179. skiprows=3,
  180. skipfooter=1,
  181. usecols=[0, 4],
  182. names=["commodity_name", "monthly_import"],
  183. header=None
  184. ).pipe(lambda df: df.assign(
  185. commodity_name=df["commodity_name"].apply(clean_commodity_name)
  186. ))
  187. df_export = pd.read_excel(
  188. export_path,
  189. skiprows=3,
  190. skipfooter=1,
  191. usecols=[0, 4],
  192. names=["commodity_name", "monthly_export"],
  193. header=None
  194. ).pipe(lambda df: df.assign(
  195. commodity_name=df["commodity_name"].apply(clean_commodity_name)
  196. ))
  197. merged = pd.merge(df_import, df_export, on="commodity_name", how="outer").fillna(0)
  198. merged["monthly_import"] = merged["monthly_import"].apply(convert_wan_to_yuan)
  199. merged["monthly_export"] = merged["monthly_export"].apply(convert_wan_to_yuan)
  200. return merged
  201. def calculate_monthly_values(current_data, prev_data):
  202. """"""
  203. merged = pd.merge(current_data, prev_data, on="commodity_name",
  204. how="left", suffixes=("_current", "_prev")).fillna(0)
  205. merged["monthly_import"] = merged["monthly_import_current"] - merged["monthly_import_prev"]
  206. merged["monthly_export"] = merged["monthly_export_current"] - merged["monthly_export_prev"]
  207. return merged[["commodity_name", "monthly_import", "monthly_export"]]
  208. # def clean_commodity_name(name):
  209. # return re.sub(r'[^\w\u4e00-\u9fa5]', '', str(name)).strip()
  210. if __name__ == "__main__":
  211. traverse_and_process(download_dir, parse_excel, province_name="henan")