CountryTradeYear.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. import os
  2. from decimal import Decimal
  3. import pandas as pd
  4. import pymysql
  5. from pymysql import Error
  6. from utils.constants import COUNTRY_CODE_MAPPING
  7. YEAR = 2023
  8. DB_CONFIG = {
  9. 'host': '10.130.75.149',
  10. 'port': 3307,
  11. 'user': 'yto_crm',
  12. 'password': '%3sFUlsolaRI',
  13. 'database': 'crm_uat',
  14. 'charset': 'utf8mb4'
  15. }
  16. # 在代码中添加例外处理
  17. def get_country_code(chinese_name):
  18. """带异常处理的国家编码查询"""
  19. code = COUNTRY_CODE_MAPPING.get(chinese_name.strip(), None)
  20. # 特殊处理逻辑
  21. if not code:
  22. if chinese_name.endswith("(地区)"):
  23. return "N/A" # 标记为地区
  24. elif "国家联盟" in chinese_name:
  25. return "ORG" # 标记为国际组织
  26. return code
  27. def parse_value(val):
  28. """增强型数值解析(含科学计数法处理),保留四位小数"""
  29. if val in ('-', None, 'None', 'null'):
  30. return None
  31. try:
  32. # 科学计数法处理(如1.2E+5),使用Decimal处理以避免浮动精度问题
  33. if 'E' in str(val).upper():
  34. return Decimal(val).quantize(Decimal('0.0000')) # 用Decimal处理科学计数法,确保四位小数
  35. return Decimal(str(val).replace(',', '')).quantize(Decimal('0.0000')) # 保留四位小数
  36. except Exception as e:
  37. print(f"数值解析错误:{val},错误:{e}")
  38. return None
  39. def parse_ratio(value):
  40. """处理百分比数据"""
  41. return value if value not in ['-', ''] else None
  42. def batch_upsert(conn, file_path, year_month):
  43. """批量插入/更新数据"""
  44. try:
  45. # 读取Excel文件
  46. df = pd.read_excel(file_path, engine='xlrd', header=None, dtype=str)
  47. except Exception as e:
  48. raise ValueError(f"文件读取失败:{str(e)}")
  49. # 定位数据起始行(根据报表结构调整)
  50. data_start = None
  51. for idx, row in df.iterrows():
  52. if "进出口商品国别" in str(row[1]):
  53. data_start = idx + 6 # 根据实际表格结构调整
  54. break
  55. if not data_start:
  56. raise ValueError("无法定位数据起始行")
  57. # 读取数据区域
  58. df_data = pd.read_excel(file_path, engine='xlrd',
  59. skiprows=data_start,
  60. header=None,
  61. usecols=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # 根据实际列索引调整
  62. # 准备批量插入数据
  63. params = []
  64. for _, row in df_data.iterrows():
  65. country_name = str(row[1]).strip()
  66. # 过滤地区和组织
  67. if not country_name or country_name in EXCLUDE_REGIONS:
  68. continue
  69. # 获取国家代码
  70. country_code = get_country_code(country_name)
  71. if not country_code or country_code in ['N/A', 'ORG', 'XX']:
  72. continue
  73. # 数据转换(根据实际列索引调整)
  74. values = (
  75. year_month,
  76. country_code,
  77. country_name,
  78. parse_value(row[2]), # monthly_total
  79. parse_value(row[4]), # monthly_export
  80. parse_value(row[6]), # monthly_import
  81. parse_value(row[3]), # ytd_total
  82. parse_value(row[5]), # ytd_export
  83. parse_value(row[7]), # ytd_import
  84. parse_ratio(row[8]), # yoy_import_export
  85. parse_ratio(row[9]), # yoy_export
  86. parse_ratio(row[10]) # yoy_import
  87. )
  88. params.append(values)
  89. # 构建SQL模板(使用ON DUPLICATE KEY UPDATE)[9,10](@ref)
  90. sql = """
  91. INSERT INTO t_yujin_crossborder_country_trade (`year_month`, country_code, country_name, \
  92. monthly_total, monthly_export, monthly_import, \
  93. ytd_total, ytd_export, ytd_import, \
  94. yoy_import_export, yoy_export, yoy_import) \
  95. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY \
  96. UPDATE \
  97. monthly_total = \
  98. VALUES (monthly_total), monthly_import = \
  99. VALUES (monthly_import), monthly_export = \
  100. VALUES (monthly_export), ytd_total = \
  101. VALUES (ytd_total), ytd_import = \
  102. VALUES (ytd_import), ytd_export = \
  103. VALUES (ytd_export), yoy_import_export = \
  104. VALUES (yoy_import_export), yoy_import = \
  105. VALUES (yoy_import), yoy_export = \
  106. VALUES (yoy_export) \
  107. """
  108. # 执行批量操作[1,4](@ref)
  109. try:
  110. with conn.cursor() as cursor:
  111. cursor.executemany(sql, params)
  112. return cursor.rowcount
  113. except Error as e:
  114. conn.rollback()
  115. raise RuntimeError(f"数据库操作失败:{str(e)}")
  116. def main():
  117. """主执行流程"""
  118. conn = None
  119. try:
  120. # 建立数据库连接
  121. conn = pymysql.connect(**DB_CONFIG)
  122. # 按月份顺序处理
  123. for month in range(1, 13):
  124. # 构建文件路径
  125. folder = f"downloads/{YEAR}/{month:02d}月"
  126. file_name = f"(2){YEAR}年进出口商品国别(地区)总值表.xls"
  127. file_path = os.path.join(folder, file_name)
  128. if not os.path.isfile(file_path):
  129. print(f"⚠️ 文件不存在:{file_path}")
  130. continue
  131. # 生成年月标识
  132. year_month = f"{YEAR}-{month:02d}"
  133. print(f"⌛ 正在处理 {year_month} 数据...")
  134. try:
  135. count = batch_upsert(conn, file_path, year_month)
  136. conn.commit()
  137. print(f"✅ 成功更新 {year_month},影响 {count} 条记录")
  138. except Exception as e:
  139. print(f"❌ {year_month} 处理失败:{str(e)}")
  140. conn.rollback()
  141. except Error as e:
  142. print(f"数据库连接失败:{str(e)}")
  143. finally:
  144. if conn and conn.open:
  145. conn.close()
  146. print("🏁 所有月份处理完成")
  147. if __name__ == "__main__":
  148. main()