parse_year_excel.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. from datetime import datetime
  2. import xlrd
  3. from crossborder.utils.db_helper import DBHelper
  4. from crossborder.utils.log import log
  5. from crossborder.utils.parse_utils import convert_unit, parse_ratio
  6. _parse_executed = False # 模块级变量,控制执行次数
  7. def get_upsert_sql():
  8. """使用命名占位符并正确使用VALUES函数的SQL"""
  9. return """
  10. INSERT INTO t_yujin_crossborder_yearly_summary
  11. (year, year_total, year_import, year_export, trade_balance,
  12. yoy_import_export, yoy_import, yoy_export, create_time)
  13. VALUES (:year, :year_total, :year_import, :year_export, :trade_balance, \
  14. :yoy_import_export, :yoy_import, :yoy_export, :create_time) ON DUPLICATE KEY \
  15. UPDATE \
  16. year_total = \
  17. VALUES (year_total), year_import = \
  18. VALUES (year_import), year_export = \
  19. VALUES (year_export), trade_balance = \
  20. VALUES (trade_balance), yoy_import_export = \
  21. VALUES (yoy_import_export), yoy_import = \
  22. VALUES (yoy_import), yoy_export = \
  23. VALUES (yoy_export), create_time = \
  24. VALUES (create_time) \
  25. """
  26. def parse_year_table_excel(file):
  27. global _parse_executed
  28. if _parse_executed:
  29. log.info("⚠️ parse_year_table_excel 已执行过,不再重复执行")
  30. return
  31. db_helper = DBHelper()
  32. current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  33. # 读取Excel文件
  34. try:
  35. workbook = xlrd.open_workbook(file)
  36. sheet = workbook.sheet_by_index(0)
  37. except Exception as e:
  38. log.error(f"文件读取失败: {e}")
  39. return
  40. sql = get_upsert_sql()
  41. params_list = []
  42. for row_idx in range(5, sheet.nrows):
  43. row = sheet.row_values(row_idx)
  44. if not row[1]: # 跳过空年份
  45. continue
  46. # 准备数据 - 使用字典
  47. param_dict = {
  48. "year": row[1],
  49. "year_total": convert_unit(row[2]),
  50. "year_import": convert_unit(row[4]),
  51. "year_export": convert_unit(row[3]),
  52. "trade_balance": convert_unit(row[5]),
  53. "yoy_import_export": parse_ratio(row[6]),
  54. "yoy_import": parse_ratio(row[7]),
  55. "yoy_export": parse_ratio(row[8]),
  56. "create_time": current_time
  57. }
  58. params_list.append(param_dict)
  59. # 使用 DBHelper 执行 SQL 插入
  60. try:
  61. affected_rows = db_helper.execute_sql_with_params(sql, params_list)
  62. log.info(f"成功处理 {len(params_list)} 条数据,受影响行数:{affected_rows}")
  63. _parse_executed = True
  64. except Exception as e:
  65. log.error(f"数据库操作失败: {e}")
  66. raise
  67. if __name__ == "__main__":
  68. parse_year_table_excel(r'D:\pythonSpace\crossborder\downloads\total\2025\04\(1)2025年进出口商品总值表 A-年度表.xls')