commodity_trade_year.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. import os
  2. import re
  3. import pymysql
  4. from decimal import Decimal
  5. from pymysql import Error
  6. import xlrd
  7. # 数据库配置
  8. DB_CONFIG = {
  9. 'host': '10.130.75.149',
  10. 'port': 3307,
  11. 'user': 'yto_crm',
  12. 'password': '%3sFUlsolaRI',
  13. 'database': 'crm_uat',
  14. 'charset': 'utf8mb4'
  15. }
  16. # 全局参数
  17. YEAR = 2023 # 目标年份
  18. BASE_DIR = "../src/downloads" # 下载目录基础路径
  19. def chinese_class_to_number(class_str):
  20. """中文类名转数字(保持原有实现)"""
  21. cn_num_map = {
  22. '一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6,
  23. '七': 7, '八': 8, '九': 9, '十': 10, '十一': 11, '十二': 12,
  24. '十三': 13, '十四': 14, '十五': 15, '十六': 16, '十七': 17,
  25. '十八': 18, '十九': 19, '二十': 20, '二十一': 21, '二十二': 22
  26. }
  27. match = re.match(r'^第([一二三四五六七八九十百千万]+)类.*$', class_str)
  28. return cn_num_map.get(match.group(1), 0) if match else 0
  29. def parse_value(val):
  30. """增强型数值解析(含科学计数法处理),保留四位小数"""
  31. if val in ('-', None, 'None', 'null'):
  32. return None
  33. try:
  34. # 科学计数法处理(如1.2E+5),使用Decimal处理以避免浮动精度问题
  35. if 'E' in str(val).upper():
  36. return Decimal(val).quantize(Decimal('0.0000')) # 用Decimal处理科学计数法,确保四位小数
  37. return Decimal(str(val).replace(',', '')).quantize(Decimal('0.0000')) # 保留四位小数
  38. except Exception as e:
  39. print(f"数值解析错误:{val},错误:{e}")
  40. return None
  41. def process_month_file(conn, file_path, year_month):
  42. """处理单个月份文件"""
  43. try:
  44. workbook = xlrd.open_workbook(file_path)
  45. sheet = workbook.sheet_by_index(0)
  46. except Exception as e:
  47. print(f"文件打开失败:{file_path}\n错误:{str(e)}")
  48. return 0
  49. cursor = conn.cursor()
  50. params = []
  51. current_class = None
  52. # 遍历数据行(从第7行开始)
  53. for row_idx in range(6, sheet.nrows):
  54. try:
  55. row = sheet.row_values(row_idx)
  56. cell_value = str(row[1]).strip() if len(row) > 1 else ""
  57. # 数据清洗
  58. clean_value = re.sub(r'\s+', ' ', cell_value)
  59. parts = clean_value.split(' ', 1)
  60. if not parts:
  61. continue
  62. identifier = parts[0]
  63. # 解析章数据
  64. if re.match(r'^\d+章$', identifier):
  65. if current_class:
  66. chapter_num = re.findall(r'\d+', identifier)[0].zfill(2)
  67. hs_code = f"{current_class}{chapter_num}"
  68. # 提取各列数据(根据实际表格结构调整)
  69. data_fields = [
  70. parse_value(row[2]), # monthly_export
  71. parse_value(row[3]), # ytd_export
  72. parse_value(row[4]), # monthly_import
  73. parse_value(row[5]), # ytd_import
  74. parse_value(row[6]), # ytd_yoy_export
  75. parse_value(row[7]) # ytd_yoy_import
  76. ]
  77. params.append((
  78. year_month,
  79. hs_code,
  80. *data_fields
  81. ))
  82. # 解析类数据
  83. elif re.match(r'^第([一二三四五六七八九十百千万]+)类.*$', identifier):
  84. class_num = chinese_class_to_number(identifier)
  85. if 1 <= class_num <= 22:
  86. current_class = f"{class_num:02d}"
  87. # 类级别数据插入
  88. class_data = [
  89. parse_value(row[2]), # monthly_export
  90. parse_value(row[3]), # ytd_export
  91. parse_value(row[4]), # monthly_import
  92. parse_value(row[5]), # ytd_import
  93. parse_value(row[6]), # ytd_yoy_export
  94. parse_value(row[7]) # ytd_yoy_import
  95. ]
  96. params.append((
  97. year_month,
  98. current_class,
  99. *class_data
  100. ))
  101. except Exception as e:
  102. print(f"行{row_idx}处理失败:{str(e)}")
  103. continue
  104. # 批量执行SQL[3,9](@ref)
  105. sql = """
  106. INSERT INTO `t_yujin_crossborder_commodity_trade`
  107. (`year_month`, `hs_code`, `monthly_export`, `ytd_export`,
  108. `monthly_import`, `ytd_import`, `ytd_yoy_export`, `ytd_yoy_import`)
  109. VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY \
  110. UPDATE \
  111. monthly_export = \
  112. VALUES (monthly_export), ytd_export = \
  113. VALUES (ytd_export), monthly_import = \
  114. VALUES (monthly_import), ytd_import = \
  115. VALUES (ytd_import), ytd_yoy_export = \
  116. VALUES (ytd_yoy_export), ytd_yoy_import = \
  117. VALUES (ytd_yoy_import) \
  118. """
  119. try:
  120. # 分批次提交(每批1000条)[9](@ref)
  121. batch_size = 1000
  122. for i in range(0, len(params), batch_size):
  123. cursor.executemany(sql, params[i:i + batch_size])
  124. conn.commit()
  125. return len(params)
  126. except Error as e:
  127. conn.rollback()
  128. print(f"数据库操作失败:{str(e)}")
  129. return 0
  130. finally:
  131. cursor.close()
  132. def main():
  133. """主处理流程"""
  134. conn = None
  135. try:
  136. conn = pymysql.connect(**DB_CONFIG)
  137. # 顺序处理1-12月数据[6](@ref)
  138. for month in range(1, 13):
  139. # 构建文件路径
  140. folder = f"{BASE_DIR}/{YEAR}/{month:02d}月"
  141. file_name = f"(4){YEAR}年进出口商品类章总值表.xls"
  142. file_path = os.path.join(folder, file_name)
  143. file_path = os.path.normpath(file_path)
  144. if not os.path.exists(file_path):
  145. print(f"文件不存在:{file_path}")
  146. continue
  147. # 生成年月标识
  148. year_month = f"{YEAR}-{month:02d}"
  149. print(f"⌛ 正在处理 {year_month} 数据...")
  150. try:
  151. count = process_month_file(conn, file_path, year_month)
  152. print(f"✅ 成功更新 {year_month},影响 {count} 条记录")
  153. except Exception as e:
  154. print(f"❌ {year_month} 处理失败:{str(e)}")
  155. conn.rollback()
  156. except Error as e:
  157. print(f"数据库连接失败:{str(e)}")
  158. finally:
  159. if conn and conn.open:
  160. conn.close()
  161. print("🏁 所有月份处理完成")
  162. if __name__ == "__main__":
  163. main()