fujian_parse_excel.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. import re
  2. from pathlib import Path
  3. import pandas as pd
  4. from crossborder.utils.db_helper import DBHelper
  5. from crossborder.utils.constants import DOWNLOAD_DIR
  6. from crossborder.utils.parse_utils import convert_wan_to_yuan, extract_year_month_from_path, traverse_and_process
  7. FUJIAN_CITY = {
  8. "福州市": "350100",
  9. "厦门市": "350200",
  10. "莆田市": "350300",
  11. "三明市": "350400",
  12. "泉州市": "350500",
  13. "漳州市": "350600",
  14. "南平市": "350700",
  15. "宁德市": "350900",
  16. "龙岩市": "350800",
  17. "平潭地区": "350128"
  18. }
  19. # 常量配置(新增路径正则校验)
  20. PROV_CODE = "350000"
  21. PROV_NAME = "福建省"
  22. YEAR_PATTERN = re.compile(r"^\d{4}$")
  23. MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
  24. download_dir = DOWNLOAD_DIR / "fujian"
  25. def parse_excel(current_dir):
  26. """主解析入口(优化为单参数模式)
  27. Args:
  28. current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
  29. """
  30. current_path = Path(current_dir)
  31. year, month = extract_year_month_from_path(current_path)
  32. try:
  33. # 处理商品贸易数据
  34. current_file_path = next(current_path.glob("*分地市*"), None)
  35. process_region_trade(current_file_path, year, month)
  36. print(f"{current_dir}数据已全部成功处理")
  37. except Exception as e:
  38. print(f"处理失败:{current_dir},错误:{str(e)}")
  39. raise
  40. def process_region_trade(current_file_path, year, month):
  41. """处理地市贸易数据(增强1月逻辑 + 多sheet处理)"""
  42. # 动态选择列配置
  43. usecols = (
  44. list(range(7))
  45. if (year == 2023 and month <= 5)
  46. else [0, 3, 4, 7, 8, 11, 12]
  47. )
  48. #2023年5月之前的表格数据,单月数据在第二个sheet页
  49. sheet_index = 1 if (year == 2023 and month <= 5) else 0
  50. # 读取并处理主数据表
  51. current_df = load_and_process_data(
  52. current_file_path, year, month, usecols , sheet_index
  53. )
  54. # 数据库写入
  55. db = DBHelper()
  56. bulk_insert_data(
  57. db, current_df,
  58. conflict_cols=['crossborder_year_month', 'city_code'],
  59. update_cols=[
  60. 'monthly_total', 'monthly_import', 'monthly_export',
  61. 'yoy_import_export', 'yoy_import', 'yoy_export'
  62. ]
  63. )
  64. # 二月特殊处理逻辑
  65. if month == 2:
  66. print(f"根据2月表格生成{year}年1月数据...")
  67. handle_february_special_case(db, current_file_path, year, usecols)
  68. def load_and_process_data(file_path, year, month, usecols, sheet_index = 0):
  69. """通用数据加载处理流程"""
  70. df = pd.read_excel(
  71. file_path,
  72. header=4,
  73. sheet_name=sheet_index,
  74. usecols=usecols,
  75. names=[
  76. 'city_name', 'monthly_total', 'yoy_import_export',
  77. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
  78. ]
  79. )
  80. # 数据清洗流程
  81. df = (
  82. df.pipe(clean_city_names)
  83. .pipe(map_city_codes)
  84. .pipe(add_metadata, year, month)
  85. .pipe(convert_units)
  86. )
  87. return df
  88. def handle_february_special_case(db, file_path, year, usecols):
  89. """二月数据处理特殊逻辑"""
  90. try:
  91. if year == 2023:
  92. process_2023_february(db, file_path, usecols)
  93. else:
  94. process_regular_february(db, file_path, year)
  95. except Exception as e:
  96. print(f"生成模拟1月数据失败: {str(e)}")
# ---------- Utility functions ----------
  98. def clean_city_names(df):
  99. """清洗城市名称"""
  100. df['city_name'] = (
  101. df['city_name']
  102. .str.replace(r'[(].*?[)]', '', regex=True)
  103. .str.strip()
  104. )
  105. return df
  106. def map_city_codes(df):
  107. """映射城市编码"""
  108. df['city_code'] = df['city_name'].map(FUJIAN_CITY)
  109. return df[df['city_code'].notnull()].copy()
  110. def add_metadata(df, year, month):
  111. """添加元数据字段"""
  112. return df.assign(
  113. crossborder_year=year,
  114. crossborder_year_month=f"{year}-{month:02d}",
  115. prov_code=PROV_CODE,
  116. prov_name=PROV_NAME
  117. )
  118. def convert_units(df):
  119. """单位转换(万→元)"""
  120. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  121. df[col] = df[col].apply(convert_wan_to_yuan)
  122. return df
  123. def bulk_insert_data(db, df, conflict_cols, update_cols):
  124. """批量数据插入"""
  125. db.bulk_insert(
  126. df,
  127. 't_yujin_crossborder_prov_region_trade',
  128. conflict_columns=conflict_cols,
  129. update_columns=update_cols
  130. )
# ---------- February special-case handling ----------
  132. def process_2023_february(db, file_path, usecols):
  133. """2023年特殊处理逻辑"""
  134. # 读取双sheet数据
  135. ytd_df = load_sheet_data(file_path, sheet_index=0, usecols=usecols)
  136. current_df = load_sheet_data(file_path, sheet_index=1, usecols=usecols)
  137. # 合并计算差值
  138. merged = ytd_df.merge(
  139. current_df,
  140. on='city_code',
  141. suffixes=('_ytd', '_current')
  142. )
  143. # 生成一月数据
  144. january_df = create_january_data(merged, year=2023)
  145. bulk_insert_data(
  146. db, january_df,
  147. conflict_cols=['crossborder_year_month', 'city_code'],
  148. update_cols=[
  149. 'monthly_total', 'monthly_import', 'monthly_export',
  150. 'yoy_import_export', 'yoy_import', 'yoy_export'
  151. ]
  152. )
  153. def process_regular_february(db, file_path, year):
  154. """常规年份二月处理"""
  155. df = pd.read_excel(
  156. file_path,
  157. header=4,
  158. usecols=[0, 1, 3, 4, 5, 7, 8, 9, 11, 12],
  159. names=[
  160. 'city_name', 'ytd_monthly_total', 'monthly_total', 'yoy_import_export',
  161. 'ytd_monthly_export', 'monthly_export', 'yoy_export',
  162. 'ytd_monthly_import', 'monthly_import', 'yoy_import'
  163. ]
  164. )
  165. # 完整处理流程
  166. processed_df = (
  167. df.pipe(clean_city_names)
  168. .pipe(map_city_codes)
  169. .pipe(convert_special_units)
  170. .pipe(calculate_january_values)
  171. .pipe(add_metadata, year=year, month=1)
  172. )
  173. bulk_insert_data(
  174. db, processed_df,
  175. conflict_cols=['crossborder_year_month', 'city_code'],
  176. update_cols=[
  177. 'monthly_total', 'monthly_import', 'monthly_export',
  178. 'yoy_import_export', 'yoy_import', 'yoy_export'
  179. ]
  180. )
  181. def load_sheet_data(file_path, sheet_index, usecols):
  182. """加载指定sheet数据"""
  183. df = pd.read_excel(
  184. file_path,
  185. sheet_name=sheet_index,
  186. header=4,
  187. usecols=usecols,
  188. names=[
  189. 'city_name', 'monthly_total', 'yoy_import_export',
  190. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
  191. ]
  192. )
  193. return (
  194. df.pipe(clean_city_names)
  195. .pipe(map_city_codes)
  196. .pipe(convert_units)
  197. )
  198. def create_january_data(merged_df, year):
  199. """生成一月数据(精确控制输出列)"""
  200. return (
  201. merged_df
  202. # 步骤1:计算新字段
  203. .assign(
  204. monthly_total=lambda x: x['monthly_total_ytd'] - x['monthly_total_current'],
  205. monthly_export=lambda x: x['monthly_export_ytd'] - x['monthly_export_current'],
  206. monthly_import=lambda x: x['monthly_import_ytd'] - x['monthly_import_current'],
  207. yoy_import_export=0.0,
  208. yoy_export=0.0,
  209. yoy_import=0.0,
  210. crossborder_year_month=f"{year}-01",
  211. city_name=lambda x: x['city_name_current']
  212. )
  213. # 步骤2:精确选择输出列(关键修复)
  214. .reindex(columns=[
  215. 'city_code', 'city_name',
  216. 'monthly_total', 'monthly_export', 'monthly_import',
  217. 'yoy_import_export', 'yoy_export', 'yoy_import',
  218. 'crossborder_year_month'
  219. ])
  220. # 步骤3:合并元数据(确保字段完整)
  221. .assign(
  222. crossborder_year=year,
  223. prov_code=PROV_CODE,
  224. prov_name=PROV_NAME
  225. )
  226. )
  227. def convert_special_units(df):
  228. """特殊单位转换"""
  229. for col in [
  230. 'ytd_monthly_total', 'monthly_total',
  231. 'ytd_monthly_export', 'monthly_export',
  232. 'ytd_monthly_import', 'monthly_import'
  233. ]:
  234. df[col] = df[col].apply(convert_wan_to_yuan)
  235. return df
  236. def calculate_january_values(df):
  237. """计算一月数值"""
  238. return df.assign(
  239. monthly_total=lambda x: x['ytd_monthly_total'] - x['monthly_total'],
  240. monthly_export=lambda x: x['ytd_monthly_export'] - x['monthly_export'],
  241. monthly_import=lambda x: x['ytd_monthly_import'] - x['monthly_import'],
  242. yoy_import_export=0.0,
  243. yoy_export=0.0,
  244. yoy_import=0.0
  245. ).drop(columns=[
  246. 'ytd_monthly_total', 'ytd_monthly_export',
  247. 'ytd_monthly_import'
  248. ])
  249. # def clean_commodity_name(name):
  250. # return re.sub(r'[^\w\u4e00-\u9fa5]', '', str(name)).strip()
  251. if __name__ == "__main__":
  252. traverse_and_process(download_dir, parse_excel, province_name="fujian")
  253. print("更新同比数据……")
  254. db_helper = DBHelper()
  255. db_helper.update_january_yoy()
  256. # parse_excel(download_dir/"2023"/"02")