123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- import os
- from decimal import Decimal
- import pandas as pd
- import pymysql
- from pymysql import Error
- from utils.constants import COUNTRY_CODE_MAPPING
- YEAR = 2023
- DB_CONFIG = {
- 'host': '10.130.75.149',
- 'port': 3307,
- 'user': 'yto_crm',
- 'password': '%3sFUlsolaRI',
- 'database': 'crm_uat',
- 'charset': 'utf8mb4'
- }
- # 在代码中添加例外处理
- def get_country_code(chinese_name):
- """带异常处理的国家编码查询"""
- code = COUNTRY_CODE_MAPPING.get(chinese_name.strip(), None)
- # 特殊处理逻辑
- if not code:
- if chinese_name.endswith("(地区)"):
- return "N/A" # 标记为地区
- elif "国家联盟" in chinese_name:
- return "ORG" # 标记为国际组织
- return code
- def parse_value(val):
- """增强型数值解析(含科学计数法处理),保留四位小数"""
- if val in ('-', None, 'None', 'null'):
- return None
- try:
- # 科学计数法处理(如1.2E+5),使用Decimal处理以避免浮动精度问题
- if 'E' in str(val).upper():
- return Decimal(val).quantize(Decimal('0.0000')) # 用Decimal处理科学计数法,确保四位小数
- return Decimal(str(val).replace(',', '')).quantize(Decimal('0.0000')) # 保留四位小数
- except Exception as e:
- print(f"数值解析错误:{val},错误:{e}")
- return None
- def parse_ratio(value):
- """处理百分比数据"""
- return value if value not in ['-', ''] else None
- def batch_upsert(conn, file_path, year_month):
- """批量插入/更新数据"""
- try:
- # 读取Excel文件
- df = pd.read_excel(file_path, engine='xlrd', header=None, dtype=str)
- except Exception as e:
- raise ValueError(f"文件读取失败:{str(e)}")
- # 定位数据起始行(根据报表结构调整)
- data_start = None
- for idx, row in df.iterrows():
- if "进出口商品国别" in str(row[1]):
- data_start = idx + 6 # 根据实际表格结构调整
- break
- if not data_start:
- raise ValueError("无法定位数据起始行")
- # 读取数据区域
- df_data = pd.read_excel(file_path, engine='xlrd',
- skiprows=data_start,
- header=None,
- usecols=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # 根据实际列索引调整
- # 准备批量插入数据
- params = []
- for _, row in df_data.iterrows():
- country_name = str(row[1]).strip()
- # 过滤地区和组织
- if not country_name or country_name in EXCLUDE_REGIONS:
- continue
- # 获取国家代码
- country_code = get_country_code(country_name)
- if not country_code or country_code in ['N/A', 'ORG', 'XX']:
- continue
- # 数据转换(根据实际列索引调整)
- values = (
- year_month,
- country_code,
- country_name,
- parse_value(row[2]), # monthly_total
- parse_value(row[4]), # monthly_export
- parse_value(row[6]), # monthly_import
- parse_value(row[3]), # ytd_total
- parse_value(row[5]), # ytd_export
- parse_value(row[7]), # ytd_import
- parse_ratio(row[8]), # yoy_import_export
- parse_ratio(row[9]), # yoy_export
- parse_ratio(row[10]) # yoy_import
- )
- params.append(values)
- # 构建SQL模板(使用ON DUPLICATE KEY UPDATE)[9,10](@ref)
- sql = """
- INSERT INTO t_yujin_crossborder_country_trade (`year_month`, country_code, country_name, \
- monthly_total, monthly_export, monthly_import, \
- ytd_total, ytd_export, ytd_import, \
- yoy_import_export, yoy_export, yoy_import) \
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY \
- UPDATE \
- monthly_total = \
- VALUES (monthly_total), monthly_import = \
- VALUES (monthly_import), monthly_export = \
- VALUES (monthly_export), ytd_total = \
- VALUES (ytd_total), ytd_import = \
- VALUES (ytd_import), ytd_export = \
- VALUES (ytd_export), yoy_import_export = \
- VALUES (yoy_import_export), yoy_import = \
- VALUES (yoy_import), yoy_export = \
- VALUES (yoy_export) \
- """
- # 执行批量操作[1,4](@ref)
- try:
- with conn.cursor() as cursor:
- cursor.executemany(sql, params)
- return cursor.rowcount
- except Error as e:
- conn.rollback()
- raise RuntimeError(f"数据库操作失败:{str(e)}")
- def main():
- """主执行流程"""
- conn = None
- try:
- # 建立数据库连接
- conn = pymysql.connect(**DB_CONFIG)
- # 按月份顺序处理
- for month in range(1, 13):
- # 构建文件路径
- folder = f"downloads/{YEAR}/{month:02d}月"
- file_name = f"(2){YEAR}年进出口商品国别(地区)总值表.xls"
- file_path = os.path.join(folder, file_name)
- if not os.path.isfile(file_path):
- print(f"⚠️ 文件不存在:{file_path}")
- continue
- # 生成年月标识
- year_month = f"{YEAR}-{month:02d}"
- print(f"⌛ 正在处理 {year_month} 数据...")
- try:
- count = batch_upsert(conn, file_path, year_month)
- conn.commit()
- print(f"✅ 成功更新 {year_month},影响 {count} 条记录")
- except Exception as e:
- print(f"❌ {year_month} 处理失败:{str(e)}")
- conn.rollback()
- except Error as e:
- print(f"数据库连接失败:{str(e)}")
- finally:
- if conn and conn.open:
- conn.close()
- print("🏁 所有月份处理完成")
- if __name__ == "__main__":
- main()
|