123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- import os
- import re
- import pymysql
- from decimal import Decimal
- from pymysql import Error
- import xlrd
- # 数据库配置
- DB_CONFIG = {
- 'host': '10.130.75.149',
- 'port': 3307,
- 'user': 'yto_crm',
- 'password': '%3sFUlsolaRI',
- 'database': 'crm_uat',
- 'charset': 'utf8mb4'
- }
- # 全局参数
- YEAR = 2023 # 目标年份
- BASE_DIR = "../src/downloads" # 下载目录基础路径
- def chinese_class_to_number(class_str):
- """中文类名转数字(保持原有实现)"""
- cn_num_map = {
- '一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6,
- '七': 7, '八': 8, '九': 9, '十': 10, '十一': 11, '十二': 12,
- '十三': 13, '十四': 14, '十五': 15, '十六': 16, '十七': 17,
- '十八': 18, '十九': 19, '二十': 20, '二十一': 21, '二十二': 22
- }
- match = re.match(r'^第([一二三四五六七八九十百千万]+)类.*$', class_str)
- return cn_num_map.get(match.group(1), 0) if match else 0
- def parse_value(val):
- """增强型数值解析(含科学计数法处理),保留四位小数"""
- if val in ('-', None, 'None', 'null'):
- return None
- try:
- # 科学计数法处理(如1.2E+5),使用Decimal处理以避免浮动精度问题
- if 'E' in str(val).upper():
- return Decimal(val).quantize(Decimal('0.0000')) # 用Decimal处理科学计数法,确保四位小数
- return Decimal(str(val).replace(',', '')).quantize(Decimal('0.0000')) # 保留四位小数
- except Exception as e:
- print(f"数值解析错误:{val},错误:{e}")
- return None
- def process_month_file(conn, file_path, year_month):
- """处理单个月份文件"""
- try:
- workbook = xlrd.open_workbook(file_path)
- sheet = workbook.sheet_by_index(0)
- except Exception as e:
- print(f"文件打开失败:{file_path}\n错误:{str(e)}")
- return 0
- cursor = conn.cursor()
- params = []
- current_class = None
- # 遍历数据行(从第7行开始)
- for row_idx in range(6, sheet.nrows):
- try:
- row = sheet.row_values(row_idx)
- cell_value = str(row[1]).strip() if len(row) > 1 else ""
- # 数据清洗
- clean_value = re.sub(r'\s+', ' ', cell_value)
- parts = clean_value.split(' ', 1)
- if not parts:
- continue
- identifier = parts[0]
- # 解析章数据
- if re.match(r'^\d+章$', identifier):
- if current_class:
- chapter_num = re.findall(r'\d+', identifier)[0].zfill(2)
- hs_code = f"{current_class}{chapter_num}"
- # 提取各列数据(根据实际表格结构调整)
- data_fields = [
- parse_value(row[2]), # monthly_export
- parse_value(row[3]), # ytd_export
- parse_value(row[4]), # monthly_import
- parse_value(row[5]), # ytd_import
- parse_value(row[6]), # ytd_yoy_export
- parse_value(row[7]) # ytd_yoy_import
- ]
- params.append((
- year_month,
- hs_code,
- *data_fields
- ))
- # 解析类数据
- elif re.match(r'^第([一二三四五六七八九十百千万]+)类.*$', identifier):
- class_num = chinese_class_to_number(identifier)
- if 1 <= class_num <= 22:
- current_class = f"{class_num:02d}"
- # 类级别数据插入
- class_data = [
- parse_value(row[2]), # monthly_export
- parse_value(row[3]), # ytd_export
- parse_value(row[4]), # monthly_import
- parse_value(row[5]), # ytd_import
- parse_value(row[6]), # ytd_yoy_export
- parse_value(row[7]) # ytd_yoy_import
- ]
- params.append((
- year_month,
- current_class,
- *class_data
- ))
- except Exception as e:
- print(f"行{row_idx}处理失败:{str(e)}")
- continue
- # 批量执行SQL[3,9](@ref)
- sql = """
- INSERT INTO `t_yujin_crossborder_commodity_trade`
- (`year_month`, `hs_code`, `monthly_export`, `ytd_export`,
- `monthly_import`, `ytd_import`, `ytd_yoy_export`, `ytd_yoy_import`)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY \
- UPDATE \
- monthly_export = \
- VALUES (monthly_export), ytd_export = \
- VALUES (ytd_export), monthly_import = \
- VALUES (monthly_import), ytd_import = \
- VALUES (ytd_import), ytd_yoy_export = \
- VALUES (ytd_yoy_export), ytd_yoy_import = \
- VALUES (ytd_yoy_import) \
- """
- try:
- # 分批次提交(每批1000条)[9](@ref)
- batch_size = 1000
- for i in range(0, len(params), batch_size):
- cursor.executemany(sql, params[i:i + batch_size])
- conn.commit()
- return len(params)
- except Error as e:
- conn.rollback()
- print(f"数据库操作失败:{str(e)}")
- return 0
- finally:
- cursor.close()
- def main():
- """主处理流程"""
- conn = None
- try:
- conn = pymysql.connect(**DB_CONFIG)
- # 顺序处理1-12月数据[6](@ref)
- for month in range(1, 13):
- # 构建文件路径
- folder = f"{BASE_DIR}/{YEAR}/{month:02d}月"
- file_name = f"(4){YEAR}年进出口商品类章总值表.xls"
- file_path = os.path.join(folder, file_name)
- file_path = os.path.normpath(file_path)
- if not os.path.exists(file_path):
- print(f"文件不存在:{file_path}")
- continue
- # 生成年月标识
- year_month = f"{YEAR}-{month:02d}"
- print(f"⌛ 正在处理 {year_month} 数据...")
- try:
- count = process_month_file(conn, file_path, year_month)
- print(f"✅ 成功更新 {year_month},影响 {count} 条记录")
- except Exception as e:
- print(f"❌ {year_month} 处理失败:{str(e)}")
- conn.rollback()
- except Error as e:
- print(f"数据库连接失败:{str(e)}")
- finally:
- if conn and conn.open:
- conn.close()
- print("🏁 所有月份处理完成")
- if __name__ == "__main__":
- main()
|