12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- from datetime import datetime
- import xlrd
- from crossborder.utils.db_helper import DBHelper
- from crossborder.utils.log import get_logger
- log = get_logger(__name__)
- from crossborder.utils.parse_utils import convert_unit, parse_ratio
- _parse_executed = False # Module-level run-once flag: parse_year_table_excel sets it to True after a successful DB write and returns early on later calls
def get_upsert_sql():
    """Return the MySQL upsert statement for the yearly summary table.

    The statement inserts one row into ``t_yujin_crossborder_yearly_summary``
    using named placeholders (``:year``, ``:year_total``, ...) and, when the
    row collides with an existing key, overwrites every column except
    ``year`` with the incoming value via ``VALUES(col)``.

    The column list, the placeholder list and the update list are all derived
    from one tuple so they cannot drift apart.

    Returns:
        str: the SQL text, with one placeholder per column.
    """
    columns = (
        "year",
        "year_total",
        "year_import",
        "year_export",
        "trade_balance",
        "yoy_import_export",
        "yoy_import",
        "yoy_export",
        "create_time",
    )
    column_list = ", ".join(columns)
    placeholders = ", ".join(f":{name}" for name in columns)
    # `year` is left out of the update list, exactly as in the original
    # statement (presumably it is the unique key -- confirm against the DDL).
    assignments = ", ".join(
        f"{name} = VALUES({name})" for name in columns if name != "year"
    )
    return (
        "INSERT INTO t_yujin_crossborder_yearly_summary\n"
        f"    ({column_list})\n"
        f"VALUES ({placeholders})\n"
        "ON DUPLICATE KEY UPDATE\n"
        f"    {assignments}"
    )
# Zero-based index of the first sheet row that holds data (rows above are skipped).
_FIRST_DATA_ROW = 5
# The parser reads cell indexes 1..8, so a usable sheet needs at least 9 columns.
_MIN_COLUMNS = 9


def _build_yearly_params(sheet, created_at):
    """Convert the data rows of *sheet* into bind-parameter dicts for the upsert."""
    params = []
    for row_idx in range(_FIRST_DATA_ROW, sheet.nrows):
        row = sheet.row_values(row_idx)
        if not row[1]:  # skip rows with an empty year cell
            continue
        params.append({
            "year": row[1],
            "year_total": convert_unit(row[2]),
            # The sheet lists export (col 3) before import (col 4).
            "year_import": convert_unit(row[4]),
            "year_export": convert_unit(row[3]),
            "trade_balance": convert_unit(row[5]),
            "yoy_import_export": parse_ratio(row[6]),
            # Year-on-year columns are read in the same export-before-import
            # order as the value columns: col 7 = export, col 8 = import.
            # NOTE(review): the previous code read col 7 as import and col 8
            # as export; confirm the order against the sheet header.
            "yoy_import": parse_ratio(row[8]),
            "yoy_export": parse_ratio(row[7]),
            "create_time": created_at,
        })
    return params


def parse_year_table_excel(file):
    """Load the yearly summary sheet from an Excel workbook and upsert it.

    Reads the first sheet of *file*, converts every data row that has a
    non-empty year cell into a parameter dict and writes all rows with the
    statement from ``get_upsert_sql()``.

    The function is guarded by the module-level ``_parse_executed`` flag: once
    a run has written to the database successfully, later calls log a notice
    and return without doing anything.

    Args:
        file: path of the workbook, passed to ``xlrd.open_workbook``.

    Returns:
        None. A workbook that cannot be opened, a sheet that is too narrow and
        a sheet without data rows are logged and skipped, leaving the flag
        unset.

    Raises:
        Exception: whatever the database helper raises is logged and re-raised.
    """
    global _parse_executed
    if _parse_executed:
        log.info("⚠️ parse_year_table_excel 已执行过,不再重复执行")
        return

    # Read the Excel file; a failure here is logged and swallowed on purpose.
    try:
        workbook = xlrd.open_workbook(file)
        sheet = workbook.sheet_by_index(0)
    except Exception as e:
        log.error(f"文件读取失败: {e}")
        return

    # Fail with a clear message instead of an IndexError deep in the row loop.
    if sheet.ncols < _MIN_COLUMNS:
        log.error(f"表格列数不足: 期望至少 {_MIN_COLUMNS} 列,实际 {sheet.ncols} 列")
        return

    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    params_list = _build_yearly_params(sheet, current_time)
    if not params_list:
        # Nothing to write: do not touch the database and leave the run-once
        # flag unset so a later call can retry.
        log.warning("未解析到任何有效数据,跳过数据库写入")
        return

    sql = get_upsert_sql()
    # Create the helper only now, so the early-return paths above never build one.
    db_helper = DBHelper()
    try:
        affected_rows = db_helper.execute_sql_with_params(sql, params_list)
        log.info(f"成功处理 {len(params_list)} 条数据,受影响行数:{affected_rows}")
        _parse_executed = True
    except Exception as e:
        log.error(f"数据库操作失败: {e}")
        raise
if __name__ == "__main__":
    import sys

    # Sample workbook on the developer machine; used when no path is given
    # as the first command-line argument.
    default_file = r'D:\pythonSpace\crossborder\downloads\total\2025\04\(1)2025年进出口商品总值表 A-年度表.xls'
    parse_year_table_excel(sys.argv[1] if len(sys.argv) > 1 else default_file)
|