"""Load monthly cross-border trade summary figures from an .xls sheet into MySQL.

The script reads the first sheet of an Excel workbook, keeps the rows whose
period cell is a 2023 ``YYYY.MM`` value, converts the amounts from units of
100 million yuan to units of 10 thousand yuan, and upserts them into
``t_yujin_crossborder_monthly_summary``.
"""
import re
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import Optional

import pymysql
import xlrd

# Database settings (adjust for the target environment).
# NOTE(review): credentials are committed in source; consider loading them
# from the environment or a secrets store instead.
DB_CONFIG = {
    'host': '10.130.75.149',
    'port': 3307,
    'user': 'yto_crm',
    'password': '%3sFUlsolaRI',
    'database': 'crm_uat',
    'charset': 'utf8mb4'
}

# Workbook processed when main() is called without an explicit path.
EXCEL_PATH = '../src/downloads/20250513/2025011810224811354 (1).xls'

# Zero-based index of the first data row (i.e. the 6th row of the sheet).
FIRST_DATA_ROW = 5

# Only 2023 periods written as "2023.MM" with a real month (01-12) are loaded.
_YEAR_MONTH_RE = re.compile(r"2023\.(?:0[1-9]|1[0-2])")

# Cell contents that mean "no value".
_EMPTY_MARKERS = ('-', '')

# Multiplier for converting units of 100 million into units of 10 thousand.
_UNIT_FACTOR = 10000


def convert_unit(value) -> Optional[Decimal]:
    """Convert an amount from units of 100 million to units of 10 thousand.

    Args:
        value: Raw cell content (number or numeric string). ``None``, ``'-'``
            and the empty string are treated as "no value".

    Returns:
        The value multiplied by 10000 and rounded to 4 decimal places, or
        ``None`` when the cell is empty, not numeric, or not a finite number.
    """
    if value is None:
        return None
    if isinstance(value, str):
        value = value.strip()
    if value in _EMPTY_MARKERS:
        return None
    try:
        # Go through str() for floats so binary floating-point noise
        # (e.g. 1.1 -> 1.100000000000000088...) never enters the Decimal.
        number = Decimal(str(value)) if isinstance(value, float) else Decimal(value)
        if not number.is_finite():
            # NaN/Infinity cannot be stored in a numeric column.
            return None
        return round(number * _UNIT_FACTOR, 4)
    except (InvalidOperation, ValueError, TypeError):
        return None


def get_upsert_sql() -> str:
    """Return the INSERT ... ON DUPLICATE KEY UPDATE statement template.

    The statement takes ten positional parameters in the order of the column
    list and overwrites every non-key column when the row already exists.
    """
    return """
        INSERT INTO `t_yujin_crossborder_monthly_summary`
            (`year_month`, `monthly_total`, `monthly_import`, `monthly_export`,
             `trade_balance`, `ytd_total`, `ytd_import`, `ytd_export`,
             `ytd_trade_balance`, `create_time`)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            monthly_total = VALUES(monthly_total),
            monthly_import = VALUES(monthly_import),
            monthly_export = VALUES(monthly_export),
            trade_balance = VALUES(trade_balance),
            ytd_total = VALUES(ytd_total),
            ytd_import = VALUES(ytd_import),
            ytd_export = VALUES(ytd_export),
            ytd_trade_balance = VALUES(ytd_trade_balance),
            create_time = VALUES(create_time)
    """


def _normalize_year_month(cell) -> Optional[str]:
    """Return ``'2023-MM'`` for a valid 2023 period cell, otherwise ``None``."""
    if isinstance(cell, float):
        # xlrd returns numeric cells as floats, so 2023.10 arrives as 2023.1;
        # two-decimal formatting restores the month digits.
        text = f"{cell:.2f}"
    else:
        text = str(cell).strip()
    if not _YEAR_MONTH_RE.fullmatch(text):
        return None
    return text.replace('.', '-')


def _build_params(row, created_at):
    """Map one sheet row to the upsert parameter tuple, or ``None`` to skip it."""
    year_month = _normalize_year_month(row[1])
    if year_month is None:
        return None
    # NOTE(review): the monthly columns are read as import=col 4, export=col 3,
    # while the year-to-date columns are read as import=col 7, export=col 8.
    # Confirm against the sheet layout that both orderings are intended.
    return (
        year_month,              # year_month
        convert_unit(row[2]),    # monthly_total
        convert_unit(row[4]),    # monthly_import
        convert_unit(row[3]),    # monthly_export
        convert_unit(row[5]),    # trade_balance
        convert_unit(row[6]),    # ytd_total
        convert_unit(row[7]),    # ytd_import
        convert_unit(row[8]),    # ytd_export
        convert_unit(row[9]),    # ytd_trade_balance
        created_at,              # create_time
    )


def main(excel_path=EXCEL_PATH):
    """Upsert the 2023 rows of the workbook into the summary table.

    Args:
        excel_path: Path of the .xls workbook to load; defaults to EXCEL_PATH.

    Rows that fail to parse or insert are reported and skipped; all rows that
    were inserted successfully are committed together at the end.
    """
    try:
        conn = pymysql.connect(**DB_CONFIG)
    except pymysql.Error as e:
        print(f"数据库连接失败: {e}")
        return

    try:
        try:
            workbook = xlrd.open_workbook(excel_path)
            sheet = workbook.sheet_by_index(0)
        except Exception as e:
            print(f"文件读取失败: {e}")
            return

        sql = get_upsert_sql()
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        processed = 0

        with conn.cursor() as cursor:
            for row_idx in range(FIRST_DATA_ROW, sheet.nrows):
                try:
                    params = _build_params(sheet.row_values(row_idx), current_time)
                    if params is None:
                        continue
                    cursor.execute(sql, params)
                    processed += 1
                except Exception as e:
                    # Skip only the offending row. Rolling back here would
                    # discard every row staged so far while later rows still
                    # get committed, leaving the table silently incomplete.
                    print(f"处理行{row_idx}出错: {str(e)}")

        try:
            conn.commit()
            print(f"成功处理 {processed} 条数据,时间:{current_time}")
        except pymysql.Error as e:
            print(f"事务提交失败: {e}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()