shandong_parse_excel.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. import re
  2. from pathlib import Path
  3. import numpy as np
  4. import pandas as pd
  5. from utils.db_helper import DBHelper
  6. from quanguo.CountryTrade import COUNTRY_CODE_MAPPING
  7. from utils.constants import DOWNLOAD_DIR
  8. from utils.log import log
  9. from utils.parse_utils import clean_county_name, clean_commodity_name, convert_wan_to_yuan, \
  10. extract_year_month_from_path, get_previous_month_dir, find_unmatched_countries, traverse_and_process
  11. # 常量配置
  12. PROV_CODE = "370000"
  13. PROV_NAME = "山东省"
  14. SHANDONG_CITY = {
  15. "济南": "370100", "青岛": "370200", "淄博": "370300", "枣庄": "370400",
  16. "东营": "370500", "烟台": "370600", "潍坊": "370700", "济宁": "370800",
  17. "泰安": "370900", "威海": "371000", "日照": "371100", "临沂": "371300",
  18. "德州": "371400", "聊城": "371500", "滨州": "371600", "菏泽": "371700"
  19. }
  20. download_dir = DOWNLOAD_DIR / "shandong"
  21. YEAR_PATTERN = re.compile(r"^\d{4}$")
  22. MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
  23. def parse_excel(current_dir):
  24. """主解析入口(优化为单参数模式)
  25. Args:
  26. current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
  27. """
  28. current_path = Path(current_dir)
  29. year, month = extract_year_month_from_path(current_path)
  30. try:
  31. # 动态获取前月目录
  32. prev_dir = get_previous_month_dir(current_path) if month != 1 else None
  33. # 处理商品贸易数据
  34. process_combined_trade(current_path, year, month, prev_dir)
  35. # 处理地市贸易数据
  36. current_file_path = next(current_path.glob("*地市*"), None)
  37. prev_file_path = next(Path(prev_dir).glob("*地市*"), None) if prev_dir else None
  38. if current_file_path:
  39. process_region_trade(current_file_path, prev_file_path, year, month)
  40. # 处理国别贸易数据(保持原有逻辑结构)
  41. country_file = next(current_path.glob("*国别*"), None)
  42. prev_country_file = next(Path(prev_dir).glob("*国别*"), None) if prev_dir else None
  43. if country_file:
  44. process_country_trade(country_file, prev_country_file, year, month)
  45. log.info(f"{current_dir}数据已全部成功处理")
  46. except Exception as e:
  47. log.error(f"处理失败:{current_dir},错误:{str(e)}")
  48. raise
  49. def process_combined_trade(current_dir, year, month, previous_dir=None):
  50. """处理合并商品贸易数据(增强1月逻辑)"""
  51. import_file = next(current_dir.glob("*进口20位主要商品总值*"), None)
  52. export_file = next(current_dir.glob("*出口20位主要商品总值*"), None)
  53. if not (import_file and export_file):
  54. raise FileNotFoundError("缺少进口或出口文件")
  55. # 读取当前月数据(保持原有逻辑)
  56. current_data = read_trade_pair(import_file, export_file)
  57. # 处理历史数据
  58. prev_data = pd.DataFrame()
  59. if previous_dir and month != 1:
  60. prev_import = next(Path(previous_dir).glob("*进口20位主要商品总值*"), None)
  61. prev_export = next(Path(previous_dir).glob("*出口20位主要商品总值*"), None)
  62. if prev_import and prev_export:
  63. prev_data = read_trade_pair(prev_import, prev_export)
  64. # 计算逻辑优化
  65. merged_data = current_data if month == 1 else calculate_monthly_values(current_data, prev_data)
  66. # 保留原有数据库交互逻辑
  67. db = DBHelper()
  68. merged_data['commodity_code'] = merged_data['commodity_name'].apply(db.get_commodity_id)
  69. valid_data = merged_data[merged_data['commodity_code'].notnull()].copy()
  70. # 构建入库数据(保持原有字段结构)
  71. valid_data['crossborder_year'] = year
  72. valid_data['crossborder_year_month'] = f"{year}-{month:02d}"
  73. valid_data['prov_code'] = PROV_CODE
  74. valid_data['prov_name'] = PROV_NAME
  75. #当 monthly_import 和 monthly_export 中只有一个有值时,monthly_total 取不为空的那个值,
  76. # 而两者都有值时相加
  77. valid_data['monthly_total'] = valid_data['monthly_import'].fillna(0) + valid_data['monthly_export'].fillna(0)
  78. valid_data['monthly_total'] = valid_data['monthly_total'].replace(0, np.nan)
  79. valid_data = valid_data.replace({np.nan: None})
  80. # 入库逻辑保持不变
  81. target_cols = [
  82. 'crossborder_year', 'crossborder_year_month', 'prov_code', 'prov_name',
  83. 'commodity_code', 'commodity_name', 'monthly_total', 'monthly_import', 'monthly_export'
  84. ]
  85. db.bulk_insert(
  86. valid_data[target_cols],
  87. 't_yujin_crossborder_prov_commodity_trade',
  88. conflict_columns=['crossborder_year_month', 'prov_code', 'commodity_code'],
  89. update_columns=['monthly_total', 'monthly_import', 'monthly_export']
  90. )
  91. def process_region_trade(current_file_path, prev_file_path, year, month):
  92. """处理地市贸易数据(增强1月逻辑)"""
  93. # 读取当前数据
  94. current_df = pd.read_excel(
  95. current_file_path,
  96. skipfooter=1,
  97. header=4,
  98. names=['city_name', 'monthly_total', 'yoy_import_export',
  99. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import']
  100. )
  101. # 1月特殊处理
  102. if month == 1:
  103. df = current_df[['city_name', 'monthly_total',
  104. 'monthly_export', 'monthly_import']].copy()
  105. df['yoy_import_export'] = current_df['yoy_import_export']
  106. df['yoy_export'] = current_df['yoy_export']
  107. df['yoy_import'] = current_df['yoy_import']
  108. else:
  109. prev_df = pd.read_excel(
  110. prev_file_path,
  111. skipfooter=1,
  112. header=4,
  113. names=['city_name', 'monthly_total', 'yoy_import_export',
  114. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import']
  115. ) if prev_file_path else pd.DataFrame()
  116. merged_df = pd.merge(
  117. current_df,
  118. prev_df,
  119. on='city_name',
  120. suffixes=('_current', '_prev')
  121. )
  122. df = pd.DataFrame({
  123. 'city_name': merged_df['city_name'],
  124. 'monthly_total': merged_df['monthly_total_current'] - merged_df['monthly_total_prev'],
  125. 'yoy_import_export': merged_df['yoy_import_export_current'],
  126. 'monthly_export': merged_df['monthly_export_current'] - merged_df['monthly_export_prev'],
  127. 'yoy_export': merged_df['yoy_export_current'],
  128. 'monthly_import': merged_df['monthly_import_current'] - merged_df['monthly_import_prev'],
  129. 'yoy_import': merged_df['yoy_import_current']
  130. })
  131. # 保留原有处理逻辑
  132. df['city_code'] = df['city_name'].map(SHANDONG_CITY)
  133. df['crossborder_year'] = year
  134. df['crossborder_year_month'] = f"{year}-{month:02d}"
  135. df['prov_code'] = PROV_CODE
  136. df['prov_name'] = PROV_NAME
  137. # 单位转换
  138. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  139. df[col] = df[col].apply(convert_wan_to_yuan)
  140. db = DBHelper()
  141. df = df.replace({np.nan: None})
  142. db.bulk_insert(
  143. df,
  144. 't_yujin_crossborder_prov_region_trade',
  145. conflict_columns=['crossborder_year_month', 'city_code'],
  146. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  147. 'yoy_import_export', 'yoy_import', 'yoy_export']
  148. )
  149. def process_country_trade(current_file_path, prev_file_path, year, month):
  150. """处理国别贸易数据(增强1月逻辑)"""
  151. # 读取当前数据
  152. current_df = read_with_header4(current_file_path)
  153. current_df = current_df[~current_df['country_name'].str.contains('注:', na=False)]
  154. current_df = current_df.dropna(subset=['country_name'])
  155. current_df = current_df[current_df['country_name'].str.strip() != '']
  156. # 1月特殊处理
  157. if month == 1:
  158. final_df = current_df.copy()
  159. final_df[['monthly_total', 'monthly_export', 'monthly_import']] = \
  160. current_df[['monthly_total', 'monthly_export', 'monthly_import']]
  161. else:
  162. prev_df = read_with_header4(prev_file_path)
  163. prev_df = prev_df[~prev_df['country_name'].str.contains('注:', na=False)]
  164. prev_df = prev_df.dropna(subset=['country_name'])
  165. prev_df = prev_df[prev_df['country_name'].str.strip() != '']
  166. merged_df = pd.merge(
  167. current_df,
  168. prev_df,
  169. on='country_name',
  170. suffixes=('_current', '_prev'),
  171. how='inner'
  172. )
  173. merged_df['monthly_total'] = merged_df['monthly_total_current'] - merged_df['monthly_total_prev']
  174. merged_df['monthly_export'] = merged_df['monthly_export_current'] - merged_df['monthly_export_prev']
  175. merged_df['monthly_import'] = merged_df['monthly_import_current'] - merged_df['monthly_import_prev']
  176. merged_df['yoy_import_export'] = merged_df['yoy_import_export_current']
  177. merged_df['yoy_export'] = merged_df['yoy_export_current']
  178. merged_df['yoy_import'] = merged_df['yoy_import_current']
  179. final_df = merged_df[[
  180. 'country_name','monthly_total', 'monthly_import', 'monthly_export',
  181. 'yoy_import_export', 'yoy_import', 'yoy_export'
  182. ]]
  183. # 排除特殊国家(新增过滤逻辑)
  184. final_df = final_df[
  185. ~final_df['country_name'].str.contains('东盟|欧盟', na=False, regex=True)
  186. ]
  187. final_df['country_code'] = final_df['country_name'].map(COUNTRY_CODE_MAPPING)
  188. find_unmatched_countries(final_df)
  189. final_df['crossborder_year'] = year
  190. final_df['crossborder_year_month'] = f"{year}-{month:02d}"
  191. final_df['prov_code'] = PROV_CODE
  192. final_df['prov_name'] = PROV_NAME
  193. # 单位转换
  194. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  195. final_df[col] = final_df[col].apply(convert_wan_to_yuan)
  196. final_df = final_df.replace({np.nan: None})
  197. db = DBHelper()
  198. db.bulk_insert(
  199. final_df,
  200. 't_yujin_crossborder_prov_country_trade',
  201. conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
  202. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  203. 'yoy_import_export', 'yoy_import', 'yoy_export']
  204. )
  205. def read_with_header4(file_path):
  206. # 第一阶段:读取原始数据(固定列范围)
  207. raw_df = pd.read_excel(
  208. file_path,
  209. usecols="A:G", # 强制读取前7列
  210. header=None, # 禁用自动表头识别
  211. skipfooter=1
  212. )
  213. # 第二阶段:计算列偏移量
  214. if raw_df.iloc[:, 0:2].isnull().all().all(): # 前两列全为空
  215. col_offset = 2 # 从第三列开始(A3起始)
  216. else:
  217. col_offset = 0 # 默认从第一列开始(A1起始)
  218. # 第三阶段:应用header=4逻辑
  219. header_row = 4 # 保持原有header行位置
  220. data_start_row = header_row + 1 # 数据起始行
  221. # 重新读取有效数据
  222. final_df = pd.read_excel(
  223. file_path,
  224. usecols=raw_df.columns[col_offset:col_offset + 7], # 动态列范围
  225. header=header_row,
  226. skipfooter=1
  227. )
  228. # 第四阶段:强制列名对齐
  229. final_df.columns = [
  230. 'country_name', 'monthly_total', 'yoy_import_export',
  231. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
  232. ]
  233. # 清洗国家名称
  234. final_df['country_name'] = final_df['country_name'].apply(clean_county_name)
  235. return final_df
  236. def read_trade_pair(import_path, export_path):
  237. """进/出口表格合并"""
  238. df_import = pd.read_excel(import_path, skiprows=3, skipfooter=1,
  239. usecols=[0, 1], names=["commodity_name", "monthly_import"]).pipe(lambda df: df.assign(
  240. commodity_name=df["commodity_name"].apply(clean_commodity_name)
  241. ))
  242. df_export = pd.read_excel(export_path, skiprows=3, skipfooter=1,
  243. usecols=[0, 1], names=["commodity_name", "monthly_export"]).pipe(lambda df: df.assign(
  244. commodity_name=df["commodity_name"].apply(clean_commodity_name)
  245. ))
  246. merged = pd.merge(df_import, df_export, on="commodity_name", how="outer").fillna(pd.NA)
  247. merged["monthly_import"] = merged["monthly_import"].apply(convert_wan_to_yuan)
  248. merged["monthly_export"] = merged["monthly_export"].apply(convert_wan_to_yuan)
  249. return merged
  250. def calculate_monthly_values(current_data, prev_data):
  251. """根据上个月进出口数据计算当月数据"""
  252. merged = pd.merge(current_data, prev_data, on="commodity_name",
  253. how="left", suffixes=("_current", "_prev")).fillna(pd.NA)
  254. merged["monthly_import"] = merged["monthly_import_current"] - merged["monthly_import_prev"]
  255. merged["monthly_export"] = merged["monthly_export_current"] - merged["monthly_export_prev"]
  256. return merged[["commodity_name", "monthly_import", "monthly_export"]]
  257. if __name__ == "__main__":
  258. traverse_and_process(download_dir, parse_excel, province_name="shandong")
  259. log.info("\n山东省地级市数据同比更新中...")
  260. db_helper = DBHelper()
  261. db_helper.update_shandong_yoy()