import os from decimal import Decimal import pandas as pd import pymysql from pymysql import Error from utils.constants import COUNTRY_CODE_MAPPING YEAR = 2023 DB_CONFIG = { 'host': '10.130.75.149', 'port': 3307, 'user': 'yto_crm', 'password': '%3sFUlsolaRI', 'database': 'crm_uat', 'charset': 'utf8mb4' } # 在代码中添加例外处理 def get_country_code(chinese_name): """带异常处理的国家编码查询""" code = COUNTRY_CODE_MAPPING.get(chinese_name.strip(), None) # 特殊处理逻辑 if not code: if chinese_name.endswith("(地区)"): return "N/A" # 标记为地区 elif "国家联盟" in chinese_name: return "ORG" # 标记为国际组织 return code def parse_value(val): """增强型数值解析(含科学计数法处理),保留四位小数""" if val in ('-', None, 'None', 'null'): return None try: # 科学计数法处理(如1.2E+5),使用Decimal处理以避免浮动精度问题 if 'E' in str(val).upper(): return Decimal(val).quantize(Decimal('0.0000')) # 用Decimal处理科学计数法,确保四位小数 return Decimal(str(val).replace(',', '')).quantize(Decimal('0.0000')) # 保留四位小数 except Exception as e: print(f"数值解析错误:{val},错误:{e}") return None def parse_ratio(value): """处理百分比数据""" return value if value not in ['-', ''] else None def batch_upsert(conn, file_path, year_month): """批量插入/更新数据""" try: # 读取Excel文件 df = pd.read_excel(file_path, engine='xlrd', header=None, dtype=str) except Exception as e: raise ValueError(f"文件读取失败:{str(e)}") # 定位数据起始行(根据报表结构调整) data_start = None for idx, row in df.iterrows(): if "进出口商品国别" in str(row[1]): data_start = idx + 6 # 根据实际表格结构调整 break if not data_start: raise ValueError("无法定位数据起始行") # 读取数据区域 df_data = pd.read_excel(file_path, engine='xlrd', skiprows=data_start, header=None, usecols=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # 根据实际列索引调整 # 准备批量插入数据 params = [] for _, row in df_data.iterrows(): country_name = str(row[1]).strip() # 过滤地区和组织 if not country_name or country_name in EXCLUDE_REGIONS: continue # 获取国家代码 country_code = get_country_code(country_name) if not country_code or country_code in ['N/A', 'ORG', 'XX']: continue # 数据转换(根据实际列索引调整) values = ( year_month, country_code, country_name, parse_value(row[2]), # monthly_total parse_value(row[4]), # monthly_export parse_value(row[6]), # monthly_import parse_value(row[3]), # ytd_total parse_value(row[5]), # ytd_export parse_value(row[7]), # ytd_import parse_ratio(row[8]), # yoy_import_export parse_ratio(row[9]), # yoy_export parse_ratio(row[10]) # yoy_import ) params.append(values) # 构建SQL模板(使用ON DUPLICATE KEY UPDATE)[9,10](@ref) sql = """ INSERT INTO t_yujin_crossborder_country_trade (`year_month`, country_code, country_name, \ monthly_total, monthly_export, monthly_import, \ ytd_total, ytd_export, ytd_import, \ yoy_import_export, yoy_export, yoy_import) \ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY \ UPDATE \ monthly_total = \ VALUES (monthly_total), monthly_import = \ VALUES (monthly_import), monthly_export = \ VALUES (monthly_export), ytd_total = \ VALUES (ytd_total), ytd_import = \ VALUES (ytd_import), ytd_export = \ VALUES (ytd_export), yoy_import_export = \ VALUES (yoy_import_export), yoy_import = \ VALUES (yoy_import), yoy_export = \ VALUES (yoy_export) \ """ # 执行批量操作[1,4](@ref) try: with conn.cursor() as cursor: cursor.executemany(sql, params) return cursor.rowcount except Error as e: conn.rollback() raise RuntimeError(f"数据库操作失败:{str(e)}") def main(): """主执行流程""" conn = None try: # 建立数据库连接 conn = pymysql.connect(**DB_CONFIG) # 按月份顺序处理 for month in range(1, 13): # 构建文件路径 folder = f"downloads/{YEAR}/{month:02d}月" file_name = f"(2){YEAR}年进出口商品国别(地区)总值表.xls" file_path = os.path.join(folder, file_name) if not os.path.isfile(file_path): print(f"⚠️ 文件不存在:{file_path}") continue # 生成年月标识 year_month = f"{YEAR}-{month:02d}" print(f"⌛ 正在处理 {year_month} 数据...") try: count = batch_upsert(conn, file_path, year_month) conn.commit() print(f"✅ 成功更新 {year_month},影响 {count} 条记录") except Exception as e: print(f"❌ {year_month} 处理失败:{str(e)}") conn.rollback() except Error as e: print(f"数据库连接失败:{str(e)}") finally: if conn and conn.open: conn.close() print("🏁 所有月份处理完成") if __name__ == "__main__": main()