123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- # ---------------------------- 核心解析逻辑修改 ----------------------------
- import os
- import re
- from decimal import Decimal
- import xlrd
- import pymysql
- from pymysql import Error
- from quanguo.CountryTrade import COUNTRY_CODE_MAPPING
- DB_CONFIG = {
- 'host': '10.130.75.149',
- 'port': 3307,
- 'user': 'yto_crm',
- 'password': '%3sFUlsolaRI',
- 'database': 'crm_uat',
- 'charset': 'utf8mb4'
- }
- # 全局参数
- YEAR = 2023 # 目标年份
- BATCH_SIZE = 1000 # 每批插入量
- def parse_value(value):
- """增强型数值解析,处理空值和特殊符号"""
- try:
- cleaned = str(value).strip().replace(',', '')
- if cleaned in ('', '-', 'NA', 'None'):
- return Decimal('0.0000')
- return Decimal(cleaned).quantize(Decimal('0.0000'))
- except Exception:
- return Decimal('0.0000')
- def parse_hscode(row_value):
- """解析类/章信息(增强格式兼容性)"""
- global current_class
- cell_value = str(row_value).strip()
- clean_value = re.sub(r'\s+', ' ', cell_value) # 合并连续空格
- # 匹配类格式:第X类(兼容空格)
- if class_match := re.match(r'^第\s*(\d+)\s*类\b.*', clean_value):
- class_num = class_match.group(1)
- current_class = f"{int(class_num):02d}" # 两位数格式化
- return current_class
- # 匹配章格式:XX章(兼容前导零)
- if chapter_match := re.match(r'^(\d+)\s*章\b.*', clean_value):
- if current_class:
- chapter_num = chapter_match.group(1).zfill(2) # 自动补零
- return f"{current_class}{chapter_num}"
- return None
- def process_month_file(conn, folder, month, trade_type):
- """处理单月数据文件"""
- global current_class
- current_class = None
- # 构建文件路径
- file_prefix = "(15)" if trade_type == "export" else "(16)"
- file_name = f"{file_prefix}{YEAR}年{'自' if trade_type == 'import' else '对'}部分国家(地区){'进' if trade_type == 'import' else '出'}口商品类章金额表.xls"
- file_path = os.path.join(folder, f"{month:02d}月", file_name)
- file_path = os.path.normpath(file_path)
- if not os.path.exists(file_path):
- print(f"文件不存在:{file_path}")
- return 0
- try:
- workbook = xlrd.open_workbook(file_path)
- sheet = workbook.sheet_by_index(0)
- except Exception as e:
- print(f"文件打开失败:{file_path}\n错误:{str(e)}")
- return 0
- params = []
- year_month = f"{YEAR}-{month:02d}"
- # 遍历数据行(从第7行开始)
- for row_idx in range(6, sheet.nrows):
- try:
- row = sheet.row_values(row_idx)
- if not row[1]:
- continue
- # 解析HS编码
- hs_code = parse_hscode(row[1])
- if not hs_code:
- print(f"[行{row_idx}] 忽略无效行:{row[1]}")
- continue
- # 读取国家数据
- for col_idx in range(2, sheet.ncols, 2):
- country_name = sheet.cell_value(3, col_idx).strip()
- country_code = COUNTRY_CODE_MAPPING.get(country_name, 'XX')
- month_val = parse_value(row[col_idx])
- cumulative_val = parse_value(row[col_idx + 1]) if (col_idx + 1) < sheet.ncols else Decimal('0.0000')
- params.append((
- year_month,
- hs_code,
- trade_type,
- country_code,
- country_name,
- month_val,
- cumulative_val
- ))
- except Exception as e:
- print(f"行{row_idx}-{col_idx}处理失败:{str(e)}")
- continue
- # 批量写入数据库
- try:
- with conn.cursor() as cursor:
- # 分批次提交[5,9](@ref)
- for i in range(0, len(params), BATCH_SIZE):
- batch = params[i:i + BATCH_SIZE]
- sql = """
- INSERT INTO t_yujin_crossborder_commodity_country
- (`year_month`, hs_code, trade_type, country_code, country_name,
- month_amount, cumulative_amount)
- VALUES (%s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY \
- UPDATE \
- month_amount = \
- VALUES (month_amount), cumulative_amount = \
- VALUES (cumulative_amount) \
- """
- cursor.executemany(sql, batch)
- conn.commit()
- return len(params)
- except Error as e:
- conn.rollback()
- print(f"数据库操作失败:{str(e)}")
- return 0
- def main():
- """主处理流程"""
- conn = None
- try:
- conn = pymysql.connect(**DB_CONFIG)
- # 处理全年数据
- for month in range(1, 13):
- folder = f"downloads/{YEAR}"
- print(f"⌛ 正在处理 {YEAR}-{month:02d} 数据...")
- # 处理出口数据
- export_count = process_month_file(conn, folder, month, "export")
- # 处理进口数据
- import_count = process_month_file(conn, folder, month, "import")
- print(f"✅ {YEAR}-{month:02d} 完成 | 出口:{export_count}条 进口:{import_count}条")
- except Error as e:
- print(f"数据库连接失败:{str(e)}")
- finally:
- if conn and conn.open:
- conn.close()
- print("🏁 所有月份处理完成")
- if __name__ == "__main__":
- main()
|