parse_year_excel.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. from datetime import datetime
  2. import xlrd
  3. from crossborder.utils.db_helper import DBHelper
  4. from crossborder.utils.log import get_logger
  5. log = get_logger(__name__)
  6. from crossborder.utils.parse_utils import convert_unit, parse_ratio
_parse_executed = False  # Module-level flag: parse_year_table_excel sets it to True after a successful DB write and returns early on later calls.
  8. def get_upsert_sql():
  9. """使用命名占位符并正确使用VALUES函数的SQL"""
  10. return """
  11. INSERT INTO t_yujin_crossborder_yearly_summary
  12. (year, year_total, year_import, year_export, trade_balance,
  13. yoy_import_export, yoy_import, yoy_export, create_time)
  14. VALUES (:year, :year_total, :year_import, :year_export, :trade_balance, \
  15. :yoy_import_export, :yoy_import, :yoy_export, :create_time) ON DUPLICATE KEY \
  16. UPDATE \
  17. year_total = \
  18. VALUES (year_total), year_import = \
  19. VALUES (year_import), year_export = \
  20. VALUES (year_export), trade_balance = \
  21. VALUES (trade_balance), yoy_import_export = \
  22. VALUES (yoy_import_export), yoy_import = \
  23. VALUES (yoy_import), yoy_export = \
  24. VALUES (yoy_export), create_time = \
  25. VALUES (create_time) \
  26. """
  27. def parse_year_table_excel(file):
  28. global _parse_executed
  29. if _parse_executed:
  30. log.info("⚠️ parse_year_table_excel 已执行过,不再重复执行")
  31. return
  32. db_helper = DBHelper()
  33. current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  34. # 读取Excel文件
  35. try:
  36. workbook = xlrd.open_workbook(file)
  37. sheet = workbook.sheet_by_index(0)
  38. except Exception as e:
  39. log.error(f"文件读取失败: {e}")
  40. return
  41. sql = get_upsert_sql()
  42. params_list = []
  43. for row_idx in range(5, sheet.nrows):
  44. row = sheet.row_values(row_idx)
  45. if not row[1]: # 跳过空年份
  46. continue
  47. # 准备数据 - 使用字典
  48. param_dict = {
  49. "year": row[1],
  50. "year_total": convert_unit(row[2]),
  51. "year_import": convert_unit(row[4]),
  52. "year_export": convert_unit(row[3]),
  53. "trade_balance": convert_unit(row[5]),
  54. "yoy_import_export": parse_ratio(row[6]),
  55. "yoy_import": parse_ratio(row[7]),
  56. "yoy_export": parse_ratio(row[8]),
  57. "create_time": current_time
  58. }
  59. params_list.append(param_dict)
  60. # 使用 DBHelper 执行 SQL 插入
  61. try:
  62. affected_rows = db_helper.execute_sql_with_params(sql, params_list)
  63. log.info(f"成功处理 {len(params_list)} 条数据,受影响行数:{affected_rows}")
  64. _parse_executed = True
  65. except Exception as e:
  66. log.error(f"数据库操作失败: {e}")
  67. raise
  68. if __name__ == "__main__":
  69. parse_year_table_excel(r'D:\pythonSpace\crossborder\downloads\total\2025\04\(1)2025年进出口商品总值表 A-年度表.xls')