detail_year.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. # ---------------------------- 核心解析逻辑修改 ----------------------------
  2. import os
  3. import re
  4. from decimal import Decimal
  5. import xlrd
  6. import pymysql
  7. from pymysql import Error
  8. from quanguo.CountryTrade import COUNTRY_CODE_MAPPING
  9. DB_CONFIG = {
  10. 'host': '10.130.75.149',
  11. 'port': 3307,
  12. 'user': 'yto_crm',
  13. 'password': '%3sFUlsolaRI',
  14. 'database': 'crm_uat',
  15. 'charset': 'utf8mb4'
  16. }
  17. # 全局参数
  18. YEAR = 2023 # 目标年份
  19. BATCH_SIZE = 1000 # 每批插入量
  20. def parse_value(value):
  21. """增强型数值解析,处理空值和特殊符号"""
  22. try:
  23. cleaned = str(value).strip().replace(',', '')
  24. if cleaned in ('', '-', 'NA', 'None'):
  25. return Decimal('0.0000')
  26. return Decimal(cleaned).quantize(Decimal('0.0000'))
  27. except Exception:
  28. return Decimal('0.0000')
  29. def parse_hscode(row_value):
  30. """解析类/章信息(增强格式兼容性)"""
  31. global current_class
  32. cell_value = str(row_value).strip()
  33. clean_value = re.sub(r'\s+', ' ', cell_value) # 合并连续空格
  34. # 匹配类格式:第X类(兼容空格)
  35. if class_match := re.match(r'^第\s*(\d+)\s*类\b.*', clean_value):
  36. class_num = class_match.group(1)
  37. current_class = f"{int(class_num):02d}" # 两位数格式化
  38. return current_class
  39. # 匹配章格式:XX章(兼容前导零)
  40. if chapter_match := re.match(r'^(\d+)\s*章\b.*', clean_value):
  41. if current_class:
  42. chapter_num = chapter_match.group(1).zfill(2) # 自动补零
  43. return f"{current_class}{chapter_num}"
  44. return None
  45. def process_month_file(conn, folder, month, trade_type):
  46. """处理单月数据文件"""
  47. global current_class
  48. current_class = None
  49. # 构建文件路径
  50. file_prefix = "(15)" if trade_type == "export" else "(16)"
  51. file_name = f"{file_prefix}{YEAR}年{'自' if trade_type == 'import' else '对'}部分国家(地区){'进' if trade_type == 'import' else '出'}口商品类章金额表.xls"
  52. file_path = os.path.join(folder, f"{month:02d}月", file_name)
  53. file_path = os.path.normpath(file_path)
  54. if not os.path.exists(file_path):
  55. print(f"文件不存在:{file_path}")
  56. return 0
  57. try:
  58. workbook = xlrd.open_workbook(file_path)
  59. sheet = workbook.sheet_by_index(0)
  60. except Exception as e:
  61. print(f"文件打开失败:{file_path}\n错误:{str(e)}")
  62. return 0
  63. params = []
  64. year_month = f"{YEAR}-{month:02d}"
  65. # 遍历数据行(从第7行开始)
  66. for row_idx in range(6, sheet.nrows):
  67. try:
  68. row = sheet.row_values(row_idx)
  69. if not row[1]:
  70. continue
  71. # 解析HS编码
  72. hs_code = parse_hscode(row[1])
  73. if not hs_code:
  74. print(f"[行{row_idx}] 忽略无效行:{row[1]}")
  75. continue
  76. # 读取国家数据
  77. for col_idx in range(2, sheet.ncols, 2):
  78. country_name = sheet.cell_value(3, col_idx).strip()
  79. country_code = COUNTRY_CODE_MAPPING.get(country_name, 'XX')
  80. month_val = parse_value(row[col_idx])
  81. cumulative_val = parse_value(row[col_idx + 1]) if (col_idx + 1) < sheet.ncols else Decimal('0.0000')
  82. params.append((
  83. year_month,
  84. hs_code,
  85. trade_type,
  86. country_code,
  87. country_name,
  88. month_val,
  89. cumulative_val
  90. ))
  91. except Exception as e:
  92. print(f"行{row_idx}-{col_idx}处理失败:{str(e)}")
  93. continue
  94. # 批量写入数据库
  95. try:
  96. with conn.cursor() as cursor:
  97. # 分批次提交[5,9](@ref)
  98. for i in range(0, len(params), BATCH_SIZE):
  99. batch = params[i:i + BATCH_SIZE]
  100. sql = """
  101. INSERT INTO t_yujin_crossborder_commodity_country
  102. (`year_month`, hs_code, trade_type, country_code, country_name,
  103. month_amount, cumulative_amount)
  104. VALUES (%s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY \
  105. UPDATE \
  106. month_amount = \
  107. VALUES (month_amount), cumulative_amount = \
  108. VALUES (cumulative_amount) \
  109. """
  110. cursor.executemany(sql, batch)
  111. conn.commit()
  112. return len(params)
  113. except Error as e:
  114. conn.rollback()
  115. print(f"数据库操作失败:{str(e)}")
  116. return 0
  117. def main():
  118. """主处理流程"""
  119. conn = None
  120. try:
  121. conn = pymysql.connect(**DB_CONFIG)
  122. # 处理全年数据
  123. for month in range(1, 13):
  124. folder = f"downloads/{YEAR}"
  125. print(f"⌛ 正在处理 {YEAR}-{month:02d} 数据...")
  126. # 处理出口数据
  127. export_count = process_month_file(conn, folder, month, "export")
  128. # 处理进口数据
  129. import_count = process_month_file(conn, folder, month, "import")
  130. print(f"✅ {YEAR}-{month:02d} 完成 | 出口:{export_count}条 进口:{import_count}条")
  131. except Error as e:
  132. print(f"数据库连接失败:{str(e)}")
  133. finally:
  134. if conn and conn.open:
  135. conn.close()
  136. print("🏁 所有月份处理完成")
  137. if __name__ == "__main__":
  138. main()