# ---------------------------- 核心解析逻辑修改 ---------------------------- import os import re from decimal import Decimal import xlrd import pymysql from pymysql import Error from quanguo.CountryTrade import COUNTRY_CODE_MAPPING DB_CONFIG = { 'host': '10.130.75.149', 'port': 3307, 'user': 'yto_crm', 'password': '%3sFUlsolaRI', 'database': 'crm_uat', 'charset': 'utf8mb4' } # 全局参数 YEAR = 2023 # 目标年份 BATCH_SIZE = 1000 # 每批插入量 def parse_value(value): """增强型数值解析,处理空值和特殊符号""" try: cleaned = str(value).strip().replace(',', '') if cleaned in ('', '-', 'NA', 'None'): return Decimal('0.0000') return Decimal(cleaned).quantize(Decimal('0.0000')) except Exception: return Decimal('0.0000') def parse_hscode(row_value): """解析类/章信息(增强格式兼容性)""" global current_class cell_value = str(row_value).strip() clean_value = re.sub(r'\s+', ' ', cell_value) # 合并连续空格 # 匹配类格式:第X类(兼容空格) if class_match := re.match(r'^第\s*(\d+)\s*类\b.*', clean_value): class_num = class_match.group(1) current_class = f"{int(class_num):02d}" # 两位数格式化 return current_class # 匹配章格式:XX章(兼容前导零) if chapter_match := re.match(r'^(\d+)\s*章\b.*', clean_value): if current_class: chapter_num = chapter_match.group(1).zfill(2) # 自动补零 return f"{current_class}{chapter_num}" return None def process_month_file(conn, folder, month, trade_type): """处理单月数据文件""" global current_class current_class = None # 构建文件路径 file_prefix = "(15)" if trade_type == "export" else "(16)" file_name = f"{file_prefix}{YEAR}年{'自' if trade_type == 'import' else '对'}部分国家(地区){'进' if trade_type == 'import' else '出'}口商品类章金额表.xls" file_path = os.path.join(folder, f"{month:02d}月", file_name) file_path = os.path.normpath(file_path) if not os.path.exists(file_path): print(f"文件不存在:{file_path}") return 0 try: workbook = xlrd.open_workbook(file_path) sheet = workbook.sheet_by_index(0) except Exception as e: print(f"文件打开失败:{file_path}\n错误:{str(e)}") return 0 params = [] year_month = f"{YEAR}-{month:02d}" # 遍历数据行(从第7行开始) for row_idx in range(6, sheet.nrows): try: row = sheet.row_values(row_idx) if not row[1]: continue # 解析HS编码 hs_code = parse_hscode(row[1]) if not hs_code: print(f"[行{row_idx}] 忽略无效行:{row[1]}") continue # 读取国家数据 for col_idx in range(2, sheet.ncols, 2): country_name = sheet.cell_value(3, col_idx).strip() country_code = COUNTRY_CODE_MAPPING.get(country_name, 'XX') month_val = parse_value(row[col_idx]) cumulative_val = parse_value(row[col_idx + 1]) if (col_idx + 1) < sheet.ncols else Decimal('0.0000') params.append(( year_month, hs_code, trade_type, country_code, country_name, month_val, cumulative_val )) except Exception as e: print(f"行{row_idx}-{col_idx}处理失败:{str(e)}") continue # 批量写入数据库 try: with conn.cursor() as cursor: # 分批次提交[5,9](@ref) for i in range(0, len(params), BATCH_SIZE): batch = params[i:i + BATCH_SIZE] sql = """ INSERT INTO t_yujin_crossborder_commodity_country (`year_month`, hs_code, trade_type, country_code, country_name, month_amount, cumulative_amount) VALUES (%s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY \ UPDATE \ month_amount = \ VALUES (month_amount), cumulative_amount = \ VALUES (cumulative_amount) \ """ cursor.executemany(sql, batch) conn.commit() return len(params) except Error as e: conn.rollback() print(f"数据库操作失败:{str(e)}") return 0 def main(): """主处理流程""" conn = None try: conn = pymysql.connect(**DB_CONFIG) # 处理全年数据 for month in range(1, 13): folder = f"downloads/{YEAR}" print(f"⌛ 正在处理 {YEAR}-{month:02d} 数据...") # 处理出口数据 export_count = process_month_file(conn, folder, month, "export") # 处理进口数据 import_count = process_month_file(conn, folder, month, "import") print(f"✅ {YEAR}-{month:02d} 完成 | 出口:{export_count}条 进口:{import_count}条") except Error as e: print(f"数据库连接失败:{str(e)}") finally: if conn and conn.open: conn.close() print("🏁 所有月份处理完成") if __name__ == "__main__": main()