# shandong_parse_excel.py
  1. import re
  2. from pathlib import Path
  3. import pandas as pd
  4. from db_helper import DBHelper
  5. from quanguo.CountryTrade import COUNTRY_CODE_MAPPING
  6. from utils.constants import DOWNLOAD_DIR
  7. from utils.parse_utils import clean_county_name, clean_commodity_name, convert_wan_to_yuan, \
  8. extract_year_month_from_path, get_previous_month_dir, find_unmatched_countries, traverse_and_process
  9. # 常量配置
  10. PROV_CODE = "370000"
  11. PROV_NAME = "山东省"
  12. SHANDONG_CITY = {
  13. "济南": "370100", "青岛": "370200", "淄博": "370300", "枣庄": "370400",
  14. "东营": "370500", "烟台": "370600", "潍坊": "370700", "济宁": "370800",
  15. "泰安": "370900", "威海": "371000", "日照": "371100", "临沂": "371300",
  16. "德州": "371400", "聊城": "371500", "滨州": "371600", "菏泽": "371700"
  17. }
  18. download_dir = DOWNLOAD_DIR / "shandong"
  19. YEAR_PATTERN = re.compile(r"^\d{4}$")
  20. MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
  21. def parse_excel(current_dir):
  22. """主解析入口(优化为单参数模式)
  23. Args:
  24. current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
  25. """
  26. current_path = Path(current_dir)
  27. year, month = extract_year_month_from_path(current_path)
  28. try:
  29. # 动态获取前月目录
  30. prev_dir = get_previous_month_dir(current_path) if month != 1 else None
  31. # 处理商品贸易数据
  32. process_combined_trade(current_path, year, month, prev_dir)
  33. # 处理地市贸易数据
  34. current_file_path = next(current_path.glob("*地市*"), None)
  35. prev_file_path = next(Path(prev_dir).glob("*地市*"), None) if prev_dir else None
  36. if current_file_path:
  37. process_region_trade(current_file_path, prev_file_path, year, month)
  38. # 处理国别贸易数据(保持原有逻辑结构)
  39. country_file = next(current_path.glob("*国别*"), None)
  40. prev_country_file = next(Path(prev_dir).glob("*国别*"), None) if prev_dir else None
  41. if country_file:
  42. process_country_trade(country_file, prev_country_file, year, month)
  43. print(f"{current_dir}数据已全部成功处理")
  44. except Exception as e:
  45. print(f"处理失败:{current_dir},错误:{str(e)}")
  46. raise
  47. def process_combined_trade(current_dir, year, month, previous_dir=None):
  48. """处理合并商品贸易数据(增强1月逻辑)"""
  49. import_file = next(current_dir.glob("*进口20位主要商品总值*"), None)
  50. export_file = next(current_dir.glob("*出口20位主要商品总值*"), None)
  51. if not (import_file and export_file):
  52. raise FileNotFoundError("缺少进口或出口文件")
  53. # 读取当前月数据(保持原有逻辑)
  54. current_data = read_trade_pair(import_file, export_file)
  55. # 处理历史数据
  56. prev_data = pd.DataFrame()
  57. if previous_dir and month != 1:
  58. prev_import = next(Path(previous_dir).glob("*进口20位主要商品总值*"), None)
  59. prev_export = next(Path(previous_dir).glob("*出口20位主要商品总值*"), None)
  60. if prev_import and prev_export:
  61. prev_data = read_trade_pair(prev_import, prev_export)
  62. # 计算逻辑优化
  63. merged_data = current_data if month == 1 else calculate_monthly_values(current_data, prev_data)
  64. # 保留原有数据库交互逻辑
  65. db = DBHelper()
  66. merged_data['commodity_code'] = merged_data['commodity_name'].apply(db.get_commodity_id)
  67. valid_data = merged_data[merged_data['commodity_code'].notnull()].copy()
  68. # 构建入库数据(保持原有字段结构)
  69. valid_data['crossborder_year'] = year
  70. valid_data['crossborder_year_month'] = f"{year}-{month:02d}"
  71. valid_data['prov_code'] = PROV_CODE
  72. valid_data['prov_name'] = PROV_NAME
  73. valid_data['monthly_total'] = valid_data['monthly_import'] + valid_data['monthly_export']
  74. # 入库逻辑保持不变
  75. target_cols = [
  76. 'crossborder_year', 'crossborder_year_month', 'prov_code', 'prov_name',
  77. 'commodity_code', 'commodity_name', 'monthly_total', 'monthly_import', 'monthly_export'
  78. ]
  79. db.bulk_insert(
  80. valid_data[target_cols],
  81. 't_yujin_crossborder_prov_commodity_trade',
  82. conflict_columns=['crossborder_year_month', 'prov_code', 'commodity_code'],
  83. update_columns=['monthly_total', 'monthly_import', 'monthly_export']
  84. )
  85. def process_region_trade(current_file_path, prev_file_path, year, month):
  86. """处理地市贸易数据(增强1月逻辑)"""
  87. # 读取当前数据
  88. current_df = pd.read_excel(
  89. current_file_path,
  90. skipfooter=1,
  91. header=4,
  92. names=['city_name', 'monthly_total', 'yoy_import_export',
  93. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import']
  94. )
  95. # 1月特殊处理
  96. if month == 1:
  97. df = current_df[['city_name', 'monthly_total',
  98. 'monthly_export', 'monthly_import']].copy()
  99. df['yoy_import_export'] = current_df['yoy_import_export']
  100. df['yoy_export'] = current_df['yoy_export']
  101. df['yoy_import'] = current_df['yoy_import']
  102. else:
  103. prev_df = pd.read_excel(
  104. prev_file_path,
  105. skipfooter=1,
  106. header=4,
  107. names=['city_name', 'monthly_total', 'yoy_import_export',
  108. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import']
  109. ) if prev_file_path else pd.DataFrame()
  110. merged_df = pd.merge(
  111. current_df,
  112. prev_df,
  113. on='city_name',
  114. suffixes=('_current', '_prev')
  115. )
  116. df = pd.DataFrame({
  117. 'city_name': merged_df['city_name'],
  118. 'monthly_total': merged_df['monthly_total_current'] - merged_df['monthly_total_prev'],
  119. 'yoy_import_export': merged_df['yoy_import_export_current'],
  120. 'monthly_export': merged_df['monthly_export_current'] - merged_df['monthly_export_prev'],
  121. 'yoy_export': merged_df['yoy_export_current'],
  122. 'monthly_import': merged_df['monthly_import_current'] - merged_df['monthly_import_prev'],
  123. 'yoy_import': merged_df['yoy_import_current']
  124. })
  125. # 保留原有处理逻辑
  126. df['city_code'] = df['city_name'].map(SHANDONG_CITY)
  127. df['crossborder_year'] = year
  128. df['crossborder_year_month'] = f"{year}-{month:02d}"
  129. df['prov_code'] = PROV_CODE
  130. df['prov_name'] = PROV_NAME
  131. # 单位转换
  132. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  133. df[col] = df[col].apply(convert_wan_to_yuan)
  134. db = DBHelper()
  135. db.bulk_insert(
  136. df,
  137. 't_yujin_crossborder_prov_region_trade',
  138. conflict_columns=['crossborder_year_month', 'city_code'],
  139. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  140. 'yoy_import_export', 'yoy_import', 'yoy_export']
  141. )
  142. def process_country_trade(current_file_path, prev_file_path, year, month):
  143. """处理国别贸易数据(增强1月逻辑)"""
  144. # 读取当前数据
  145. current_df = read_with_header4(current_file_path)
  146. current_df = current_df[~current_df['country_name'].str.contains('注:', na=False)]
  147. current_df = current_df.dropna(subset=['country_name'])
  148. current_df = current_df[current_df['country_name'].str.strip() != '']
  149. # 1月特殊处理
  150. if month == 1:
  151. final_df = current_df.copy()
  152. final_df[['monthly_total', 'monthly_export', 'monthly_import']] = \
  153. current_df[['monthly_total', 'monthly_export', 'monthly_import']]
  154. else:
  155. prev_df = read_with_header4(prev_file_path)
  156. prev_df = prev_df[~prev_df['country_name'].str.contains('注:', na=False)]
  157. prev_df = prev_df.dropna(subset=['country_name'])
  158. prev_df = prev_df[prev_df['country_name'].str.strip() != '']
  159. merged_df = pd.merge(
  160. current_df,
  161. prev_df,
  162. on='country_name',
  163. suffixes=('_current', '_prev'),
  164. how='inner'
  165. )
  166. merged_df['monthly_total'] = merged_df['monthly_total_current'] - merged_df['monthly_total_prev']
  167. merged_df['monthly_export'] = merged_df['monthly_export_current'] - merged_df['monthly_export_prev']
  168. merged_df['monthly_import'] = merged_df['monthly_import_current'] - merged_df['monthly_import_prev']
  169. merged_df['yoy_import_export'] = merged_df['yoy_import_export_current']
  170. merged_df['yoy_export'] = merged_df['yoy_export_current']
  171. merged_df['yoy_import'] = merged_df['yoy_import_current']
  172. final_df = merged_df[[
  173. 'country_name','monthly_total', 'monthly_import', 'monthly_export',
  174. 'yoy_import_export', 'yoy_import', 'yoy_export'
  175. ]]
  176. # 排除特殊国家(新增过滤逻辑)
  177. final_df = final_df[
  178. ~final_df['country_name'].str.contains('东盟|欧盟', na=False, regex=True)
  179. ]
  180. final_df['country_code'] = final_df['country_name'].map(COUNTRY_CODE_MAPPING)
  181. find_unmatched_countries(final_df)
  182. final_df['crossborder_year'] = year
  183. final_df['crossborder_year_month'] = f"{year}-{month:02d}"
  184. final_df['prov_code'] = PROV_CODE
  185. final_df['prov_name'] = PROV_NAME
  186. # 单位转换
  187. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  188. final_df[col] = final_df[col].apply(convert_wan_to_yuan)
  189. db = DBHelper()
  190. db.bulk_insert(
  191. final_df,
  192. 't_yujin_crossborder_prov_country_trade',
  193. conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
  194. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  195. 'yoy_import_export', 'yoy_import', 'yoy_export']
  196. )
  197. def read_with_header4(file_path):
  198. # 第一阶段:读取原始数据(固定列范围)
  199. raw_df = pd.read_excel(
  200. file_path,
  201. usecols="A:G", # 强制读取前7列
  202. header=None, # 禁用自动表头识别
  203. skipfooter=1
  204. )
  205. # 第二阶段:计算列偏移量
  206. if raw_df.iloc[:, 0:2].isnull().all().all(): # 前两列全为空
  207. col_offset = 2 # 从第三列开始(A3起始)
  208. else:
  209. col_offset = 0 # 默认从第一列开始(A1起始)
  210. # 第三阶段:应用header=4逻辑
  211. header_row = 4 # 保持原有header行位置
  212. data_start_row = header_row + 1 # 数据起始行
  213. # 重新读取有效数据
  214. final_df = pd.read_excel(
  215. file_path,
  216. usecols=raw_df.columns[col_offset:col_offset + 7], # 动态列范围
  217. header=header_row,
  218. skipfooter=1
  219. )
  220. # 第四阶段:强制列名对齐
  221. final_df.columns = [
  222. 'country_name', 'monthly_total', 'yoy_import_export',
  223. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
  224. ]
  225. # 清洗国家名称
  226. final_df['country_name'] = final_df['country_name'].apply(clean_county_name)
  227. return final_df
  228. def read_trade_pair(import_path, export_path):
  229. """进/出口表格合并"""
  230. df_import = pd.read_excel(import_path, skiprows=3, skipfooter=1,
  231. usecols=[0, 1], names=["commodity_name", "monthly_import"]).pipe(lambda df: df.assign(
  232. commodity_name=df["commodity_name"].apply(clean_commodity_name)
  233. ))
  234. df_export = pd.read_excel(export_path, skiprows=3, skipfooter=1,
  235. usecols=[0, 1], names=["commodity_name", "monthly_export"]).pipe(lambda df: df.assign(
  236. commodity_name=df["commodity_name"].apply(clean_commodity_name)
  237. ))
  238. merged = pd.merge(df_import, df_export, on="commodity_name", how="outer").fillna(0)
  239. merged["monthly_import"] = merged["monthly_import"].apply(convert_wan_to_yuan)
  240. merged["monthly_export"] = merged["monthly_export"].apply(convert_wan_to_yuan)
  241. return merged
  242. def calculate_monthly_values(current_data, prev_data):
  243. """根据上个月进出口数据计算当月数据"""
  244. merged = pd.merge(current_data, prev_data, on="commodity_name",
  245. how="left", suffixes=("_current", "_prev")).fillna(0)
  246. merged["monthly_import"] = merged["monthly_import_current"] - merged["monthly_import_prev"]
  247. merged["monthly_export"] = merged["monthly_export_current"] - merged["monthly_export_prev"]
  248. return merged[["commodity_name", "monthly_import", "monthly_export"]]
  249. if __name__ == "__main__":
  250. # traverse_and_process(download_dir, parse_excel, province_name="shandong")
  251. print("\n山东省地级市数据同比更新中...")
  252. db_helper = DBHelper()
  253. db_helper.update_shandong_yoy()