import os import re import pymysql from decimal import Decimal from pymysql import Error import xlrd # 数据库配置 DB_CONFIG = { 'host': '10.130.75.149', 'port': 3307, 'user': 'yto_crm', 'password': '%3sFUlsolaRI', 'database': 'crm_uat', 'charset': 'utf8mb4' } # 全局参数 YEAR = 2023 # 目标年份 BASE_DIR = "../src/downloads" # 下载目录基础路径 def chinese_class_to_number(class_str): """中文类名转数字(保持原有实现)""" cn_num_map = { '一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10, '十一': 11, '十二': 12, '十三': 13, '十四': 14, '十五': 15, '十六': 16, '十七': 17, '十八': 18, '十九': 19, '二十': 20, '二十一': 21, '二十二': 22 } match = re.match(r'^第([一二三四五六七八九十百千万]+)类.*$', class_str) return cn_num_map.get(match.group(1), 0) if match else 0 def parse_value(val): """增强型数值解析(含科学计数法处理),保留四位小数""" if val in ('-', None, 'None', 'null'): return None try: # 科学计数法处理(如1.2E+5),使用Decimal处理以避免浮动精度问题 if 'E' in str(val).upper(): return Decimal(val).quantize(Decimal('0.0000')) # 用Decimal处理科学计数法,确保四位小数 return Decimal(str(val).replace(',', '')).quantize(Decimal('0.0000')) # 保留四位小数 except Exception as e: print(f"数值解析错误:{val},错误:{e}") return None def process_month_file(conn, file_path, year_month): """处理单个月份文件""" try: workbook = xlrd.open_workbook(file_path) sheet = workbook.sheet_by_index(0) except Exception as e: print(f"文件打开失败:{file_path}\n错误:{str(e)}") return 0 cursor = conn.cursor() params = [] current_class = None # 遍历数据行(从第7行开始) for row_idx in range(6, sheet.nrows): try: row = sheet.row_values(row_idx) cell_value = str(row[1]).strip() if len(row) > 1 else "" # 数据清洗 clean_value = re.sub(r'\s+', ' ', cell_value) parts = clean_value.split(' ', 1) if not parts: continue identifier = parts[0] # 解析章数据 if re.match(r'^\d+章$', identifier): if current_class: chapter_num = re.findall(r'\d+', identifier)[0].zfill(2) hs_code = f"{current_class}{chapter_num}" # 提取各列数据(根据实际表格结构调整) data_fields = [ parse_value(row[2]), # monthly_export parse_value(row[3]), # ytd_export parse_value(row[4]), # monthly_import parse_value(row[5]), # ytd_import parse_value(row[6]), # ytd_yoy_export parse_value(row[7]) # ytd_yoy_import ] params.append(( year_month, hs_code, *data_fields )) # 解析类数据 elif re.match(r'^第([一二三四五六七八九十百千万]+)类.*$', identifier): class_num = chinese_class_to_number(identifier) if 1 <= class_num <= 22: current_class = f"{class_num:02d}" # 类级别数据插入 class_data = [ parse_value(row[2]), # monthly_export parse_value(row[3]), # ytd_export parse_value(row[4]), # monthly_import parse_value(row[5]), # ytd_import parse_value(row[6]), # ytd_yoy_export parse_value(row[7]) # ytd_yoy_import ] params.append(( year_month, current_class, *class_data )) except Exception as e: print(f"行{row_idx}处理失败:{str(e)}") continue # 批量执行SQL[3,9](@ref) sql = """ INSERT INTO `t_yujin_crossborder_commodity_trade` (`year_month`, `hs_code`, `monthly_export`, `ytd_export`, `monthly_import`, `ytd_import`, `ytd_yoy_export`, `ytd_yoy_import`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY \ UPDATE \ monthly_export = \ VALUES (monthly_export), ytd_export = \ VALUES (ytd_export), monthly_import = \ VALUES (monthly_import), ytd_import = \ VALUES (ytd_import), ytd_yoy_export = \ VALUES (ytd_yoy_export), ytd_yoy_import = \ VALUES (ytd_yoy_import) \ """ try: # 分批次提交(每批1000条)[9](@ref) batch_size = 1000 for i in range(0, len(params), batch_size): cursor.executemany(sql, params[i:i + batch_size]) conn.commit() return len(params) except Error as e: conn.rollback() print(f"数据库操作失败:{str(e)}") return 0 finally: cursor.close() def main(): """主处理流程""" conn = None try: conn = pymysql.connect(**DB_CONFIG) # 顺序处理1-12月数据[6](@ref) for month in range(1, 13): # 构建文件路径 folder = f"{BASE_DIR}/{YEAR}/{month:02d}月" file_name = f"(4){YEAR}年进出口商品类章总值表.xls" file_path = os.path.join(folder, file_name) file_path = os.path.normpath(file_path) if not os.path.exists(file_path): print(f"文件不存在:{file_path}") continue # 生成年月标识 year_month = f"{YEAR}-{month:02d}" print(f"⌛ 正在处理 {year_month} 数据...") try: count = process_month_file(conn, file_path, year_month) print(f"✅ 成功更新 {year_month},影响 {count} 条记录") except Exception as e: print(f"❌ {year_month} 处理失败:{str(e)}") conn.rollback() except Error as e: print(f"数据库连接失败:{str(e)}") finally: if conn and conn.open: conn.close() print("🏁 所有月份处理完成") if __name__ == "__main__": main()