shandong_parse_excel.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. import argparse
  2. import re
  3. from pathlib import Path
  4. import numpy as np
  5. import pandas as pd
  6. from crossborder.utils.db_helper import DBHelper
  7. from crossborder.utils.constants import DOWNLOAD_DIR, COUNTRY_CODE_MAPPING
  8. from crossborder.utils.log import get_logger
  9. log = get_logger(__name__)
  10. from crossborder.utils.parse_utils import clean_county_name, clean_commodity_name, convert_wan_to_yuan, \
  11. extract_year_month_from_path, get_previous_month_dir, find_unmatched_countries, traverse_and_process
# Constant configuration
# Province code and name stamped onto every row written by the process_* functions.
# NOTE(review): presumably the national administrative-division code for Shandong - confirm.
PROV_CODE = "370000"
PROV_NAME = "山东省"
# City name -> city code lookup applied to the city-level sheet's name column.
# Names absent from this mapping produce a missing city_code.
SHANDONG_CITY = {
    "济南": "370100", "青岛": "370200", "淄博": "370300", "枣庄": "370400",
    "东营": "370500", "烟台": "370600", "潍坊": "370700", "济宁": "370800",
    "泰安": "370900", "威海": "371000", "日照": "371100", "临沂": "371300",
    "德州": "371400", "聊城": "371500", "滨州": "371600", "菏泽": "371700"
}
# Root directory of the downloaded Shandong files; handed to traverse_and_process
# by the script entry point.
download_dir = DOWNLOAD_DIR / "shandong"
# Match a four-digit year and a two-digit month (01-12) respectively.
YEAR_PATTERN = re.compile(r"^\d{4}$")
MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
  24. def parse_excel(current_dir):
  25. """主解析入口(优化为单参数模式)
  26. Args:
  27. current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
  28. """
  29. current_path = Path(current_dir)
  30. year, month = extract_year_month_from_path(current_path)
  31. try:
  32. # 动态获取前月目录
  33. prev_dir = get_previous_month_dir(current_path) if month != 1 else None
  34. # 处理商品贸易数据
  35. process_combined_trade(current_path, year, month, prev_dir)
  36. # 处理地市贸易数据
  37. current_file_path = next(current_path.glob("*地市*"), None)
  38. prev_file_path = next(Path(prev_dir).glob("*地市*"), None) if prev_dir else None
  39. if current_file_path:
  40. process_region_trade(current_file_path, prev_file_path, year, month)
  41. # 处理国别贸易数据(保持原有逻辑结构)
  42. country_file = next(current_path.glob("*国别*"), None)
  43. prev_country_file = next(Path(prev_dir).glob("*国别*"), None) if prev_dir else None
  44. if country_file:
  45. process_country_trade(country_file, prev_country_file, year, month)
  46. log.info(f"{current_dir}数据已全部成功处理")
  47. except Exception as e:
  48. log.error(f"处理失败:{current_dir},错误:{str(e)}")
  49. raise
  50. def process_combined_trade(current_dir, year, month, previous_dir=None):
  51. """处理合并商品贸易数据(增强1月逻辑)"""
  52. import_file = next(current_dir.glob("*进口20位主要商品总值*"), None)
  53. export_file = next(current_dir.glob("*出口20位主要商品总值*"), None)
  54. if not (import_file and export_file):
  55. raise FileNotFoundError("缺少进口或出口文件")
  56. # 读取当前月数据(保持原有逻辑)
  57. current_data = read_trade_pair(import_file, export_file)
  58. # 处理历史数据
  59. prev_data = pd.DataFrame()
  60. if previous_dir and month != 1:
  61. prev_import = next(Path(previous_dir).glob("*进口20位主要商品总值*"), None)
  62. prev_export = next(Path(previous_dir).glob("*出口20位主要商品总值*"), None)
  63. if prev_import and prev_export:
  64. prev_data = read_trade_pair(prev_import, prev_export)
  65. # 计算逻辑优化
  66. merged_data = current_data if month == 1 else calculate_monthly_values(current_data, prev_data)
  67. # 保留原有数据库交互逻辑
  68. db = DBHelper()
  69. merged_data['commodity_code'] = merged_data['commodity_name'].apply(db.get_commodity_id)
  70. valid_data = merged_data[merged_data['commodity_code'].notnull()].copy()
  71. # 构建入库数据(保持原有字段结构)
  72. valid_data['crossborder_year'] = year
  73. valid_data['crossborder_year_month'] = f"{year}-{month:02d}"
  74. valid_data['prov_code'] = PROV_CODE
  75. valid_data['prov_name'] = PROV_NAME
  76. #当 monthly_import 和 monthly_export 中只有一个有值时,monthly_total 取不为空的那个值,
  77. # 而两者都有值时相加
  78. valid_data['monthly_total'] = valid_data['monthly_import'].fillna(0) + valid_data['monthly_export'].fillna(0)
  79. valid_data['monthly_total'] = valid_data['monthly_total'].replace(0, np.nan)
  80. valid_data = valid_data.replace({np.nan: None})
  81. # 入库逻辑保持不变
  82. target_cols = [
  83. 'crossborder_year', 'crossborder_year_month', 'prov_code', 'prov_name',
  84. 'commodity_code', 'commodity_name', 'monthly_total', 'monthly_import', 'monthly_export'
  85. ]
  86. db.bulk_insert(
  87. valid_data[target_cols],
  88. 't_yujin_crossborder_prov_commodity_trade',
  89. conflict_columns=['crossborder_year_month', 'prov_code', 'commodity_code'],
  90. update_columns=['monthly_total', 'monthly_import', 'monthly_export']
  91. )
  92. def process_region_trade(current_file_path, prev_file_path, year, month):
  93. """处理地市贸易数据(增强1月逻辑)"""
  94. # 读取当前数据
  95. current_df = pd.read_excel(
  96. current_file_path,
  97. skipfooter=1,
  98. header=4,
  99. names=['city_name', 'monthly_total', 'yoy_import_export',
  100. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import']
  101. )
  102. # 1月特殊处理
  103. if month == 1:
  104. df = current_df[['city_name', 'monthly_total',
  105. 'monthly_export', 'monthly_import']].copy()
  106. df['yoy_import_export'] = current_df['yoy_import_export']
  107. df['yoy_export'] = current_df['yoy_export']
  108. df['yoy_import'] = current_df['yoy_import']
  109. else:
  110. prev_df = pd.read_excel(
  111. prev_file_path,
  112. skipfooter=1,
  113. header=4,
  114. names=['city_name', 'monthly_total', 'yoy_import_export',
  115. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import']
  116. ) if prev_file_path else pd.DataFrame()
  117. merged_df = pd.merge(
  118. current_df,
  119. prev_df,
  120. on='city_name',
  121. suffixes=('_current', '_prev')
  122. )
  123. df = pd.DataFrame({
  124. 'city_name': merged_df['city_name'],
  125. 'monthly_total': merged_df['monthly_total_current'] - merged_df['monthly_total_prev'],
  126. 'yoy_import_export': merged_df['yoy_import_export_current'],
  127. 'monthly_export': merged_df['monthly_export_current'] - merged_df['monthly_export_prev'],
  128. 'yoy_export': merged_df['yoy_export_current'],
  129. 'monthly_import': merged_df['monthly_import_current'] - merged_df['monthly_import_prev'],
  130. 'yoy_import': merged_df['yoy_import_current']
  131. })
  132. # 保留原有处理逻辑
  133. df['city_code'] = df['city_name'].map(SHANDONG_CITY)
  134. df['crossborder_year'] = year
  135. df['crossborder_year_month'] = f"{year}-{month:02d}"
  136. df['prov_code'] = PROV_CODE
  137. df['prov_name'] = PROV_NAME
  138. # 单位转换
  139. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  140. df[col] = df[col].apply(convert_wan_to_yuan)
  141. db = DBHelper()
  142. df = df.replace({np.nan: None})
  143. db.bulk_insert(
  144. df,
  145. 't_yujin_crossborder_prov_region_trade',
  146. conflict_columns=['crossborder_year_month', 'city_code'],
  147. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  148. 'yoy_import_export', 'yoy_import', 'yoy_export']
  149. )
  150. def process_country_trade(current_file_path, prev_file_path, year, month):
  151. """处理国别贸易数据(增强1月逻辑)"""
  152. # 读取当前数据
  153. current_df = read_with_header4(current_file_path)
  154. current_df = current_df[~current_df['country_name'].str.contains('注:', na=False)]
  155. current_df = current_df.dropna(subset=['country_name'])
  156. current_df = current_df[current_df['country_name'].str.strip() != '']
  157. # 1月特殊处理
  158. if month == 1:
  159. final_df = current_df.copy()
  160. final_df[['monthly_total', 'monthly_export', 'monthly_import']] = \
  161. current_df[['monthly_total', 'monthly_export', 'monthly_import']]
  162. else:
  163. prev_df = read_with_header4(prev_file_path)
  164. prev_df = prev_df[~prev_df['country_name'].str.contains('注:', na=False)]
  165. prev_df = prev_df.dropna(subset=['country_name'])
  166. prev_df = prev_df[prev_df['country_name'].str.strip() != '']
  167. merged_df = pd.merge(
  168. current_df,
  169. prev_df,
  170. on='country_name',
  171. suffixes=('_current', '_prev'),
  172. how='inner'
  173. )
  174. merged_df['monthly_total'] = merged_df['monthly_total_current'] - merged_df['monthly_total_prev']
  175. merged_df['monthly_export'] = merged_df['monthly_export_current'] - merged_df['monthly_export_prev']
  176. merged_df['monthly_import'] = merged_df['monthly_import_current'] - merged_df['monthly_import_prev']
  177. merged_df['yoy_import_export'] = merged_df['yoy_import_export_current']
  178. merged_df['yoy_export'] = merged_df['yoy_export_current']
  179. merged_df['yoy_import'] = merged_df['yoy_import_current']
  180. final_df = merged_df[[
  181. 'country_name','monthly_total', 'monthly_import', 'monthly_export',
  182. 'yoy_import_export', 'yoy_import', 'yoy_export'
  183. ]]
  184. # 排除特殊国家(新增过滤逻辑)
  185. final_df = final_df[
  186. ~final_df['country_name'].str.contains('东盟|欧盟', na=False, regex=True)
  187. ]
  188. final_df['country_code'] = final_df['country_name'].map(COUNTRY_CODE_MAPPING)
  189. find_unmatched_countries(final_df)
  190. final_df['crossborder_year'] = year
  191. final_df['crossborder_year_month'] = f"{year}-{month:02d}"
  192. final_df['prov_code'] = PROV_CODE
  193. final_df['prov_name'] = PROV_NAME
  194. # 单位转换
  195. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  196. final_df[col] = final_df[col].apply(convert_wan_to_yuan)
  197. final_df = final_df.replace({np.nan: None})
  198. db = DBHelper()
  199. db.bulk_insert(
  200. final_df,
  201. 't_yujin_crossborder_prov_country_trade',
  202. conflict_columns=['crossborder_year_month', 'prov_code', 'country_code'],
  203. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  204. 'yoy_import_export', 'yoy_import', 'yoy_export']
  205. )
  206. def read_with_header4(file_path):
  207. # 第一阶段:读取原始数据(固定列范围)
  208. raw_df = pd.read_excel(
  209. file_path,
  210. usecols="A:G", # 强制读取前7列
  211. header=None, # 禁用自动表头识别
  212. skipfooter=1
  213. )
  214. # 第二阶段:计算列偏移量
  215. if raw_df.iloc[:, 0:2].isnull().all().all(): # 前两列全为空
  216. col_offset = 2 # 从第三列开始(A3起始)
  217. else:
  218. col_offset = 0 # 默认从第一列开始(A1起始)
  219. # 第三阶段:应用header=4逻辑
  220. header_row = 4 # 保持原有header行位置
  221. data_start_row = header_row + 1 # 数据起始行
  222. # 重新读取有效数据
  223. final_df = pd.read_excel(
  224. file_path,
  225. usecols=raw_df.columns[col_offset:col_offset + 7], # 动态列范围
  226. header=header_row,
  227. skipfooter=1
  228. )
  229. # 第四阶段:强制列名对齐
  230. final_df.columns = [
  231. 'country_name', 'monthly_total', 'yoy_import_export',
  232. 'monthly_export', 'yoy_export', 'monthly_import', 'yoy_import'
  233. ]
  234. # 清洗国家名称
  235. final_df['country_name'] = final_df['country_name'].apply(clean_county_name)
  236. return final_df
  237. def read_trade_pair(import_path, export_path):
  238. """进/出口表格合并"""
  239. df_import = pd.read_excel(import_path, skiprows=3, skipfooter=1,
  240. usecols=[0, 1], names=["commodity_name", "monthly_import"]).pipe(lambda df: df.assign(
  241. commodity_name=df["commodity_name"].apply(clean_commodity_name)
  242. ))
  243. df_export = pd.read_excel(export_path, skiprows=3, skipfooter=1,
  244. usecols=[0, 1], names=["commodity_name", "monthly_export"]).pipe(lambda df: df.assign(
  245. commodity_name=df["commodity_name"].apply(clean_commodity_name)
  246. ))
  247. merged = pd.merge(df_import, df_export, on="commodity_name", how="outer").fillna(pd.NA)
  248. merged["monthly_import"] = merged["monthly_import"].apply(convert_wan_to_yuan)
  249. merged["monthly_export"] = merged["monthly_export"].apply(convert_wan_to_yuan)
  250. return merged
  251. def calculate_monthly_values(current_data, prev_data):
  252. """根据上个月进出口数据计算当月数据"""
  253. merged = pd.merge(current_data, prev_data, on="commodity_name",
  254. how="left", suffixes=("_current", "_prev")).fillna(pd.NA)
  255. merged["monthly_import"] = merged["monthly_import_current"] - merged["monthly_import_prev"]
  256. merged["monthly_export"] = merged["monthly_export_current"] - merged["monthly_export_prev"]
  257. return merged[["commodity_name", "monthly_import", "monthly_export"]]
  258. if __name__ == "__main__":
  259. parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
  260. parser.add_argument('--year', type=int, default=None,
  261. help='终止年份(如2023),未指定时清洗最新一个月数据')
  262. args = parser.parse_args()
  263. traverse_and_process(download_dir, parse_excel, province_name="shandong", year=args.year)
  264. log.info("\n山东省地级市数据同比更新中...")
  265. db_helper = DBHelper()
  266. db_helper.update_prov_yoy("山东省")