db_helper.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
import logging
import os

import pandas as pd
import pymysql
from sqlalchemy import create_engine, text

from utils.log import log
  6. DB_CONFIG = {
  7. 'host': '10.130.75.149',
  8. 'port': 3307,
  9. 'user': 'yto_crm',
  10. 'password': '%3sFUlsolaRI',
  11. 'database': 'crm_uat',
  12. 'charset': 'utf8mb4'
  13. }
  14. # DB_CONFIG = {
  15. # 'host': '10.130.36.185',
  16. # 'port': 3306,
  17. # 'user': 'user_ytexp',
  18. # 'password': 'Rn9ib3L1C4b4%40123',
  19. # 'database': 'yto_crm',
  20. # 'charset': 'utf8mb4'
  21. # }
  22. class DBHelper:
  23. def __init__(self):
  24. self.engine = create_engine(
  25. f'mysql+pymysql://{DB_CONFIG["user"]}:{DB_CONFIG["password"]}@{DB_CONFIG["host"]}:{DB_CONFIG["port"]}/{DB_CONFIG["database"]}?charset={DB_CONFIG["charset"]}',
  26. pool_size=5,
  27. max_overflow=10
  28. )
  29. def get_commodity_id(self, name):
  30. """获取商品编码对应的分类ID[1,3](@ref)"""
  31. with self.engine.connect() as conn:
  32. result = conn.execute(
  33. text("SELECT id FROM t_yujin_crossborder_prov_commodity_category WHERE commodity_name = :name"),
  34. {'name': name}
  35. ).fetchone()
  36. return result[0] if result else None
  37. def bulk_insert(self, df, table_name, conflict_columns=None, update_columns=None):
  38. """
  39. 增强版批量插入(支持覆盖更新)
  40. :param df: 要插入的DataFrame
  41. :param table_name: 目标表名
  42. :param conflict_columns: 冲突检测字段列表
  43. :param update_columns: 需要更新的字段列表
  44. """
  45. if df.empty:
  46. log.info("空数据集,跳过插入")
  47. return
  48. # 生成带参数的SQL模板
  49. columns = ', '.join(df.columns)
  50. placeholders = ', '.join([f":{col}" for col in df.columns])
  51. sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
  52. # 添加ON DUPLICATE KEY UPDATE(MySQL语法)
  53. if conflict_columns and update_columns:
  54. update_set = ', '.join([f"{col}=VALUES({col})" for col in update_columns])
  55. sql += f" ON DUPLICATE KEY UPDATE {update_set}"
  56. # 转换数据为字典列表格式
  57. data = df.to_dict(orient='records')
  58. # print("data:", data)
  59. try:
  60. with self.engine.connect() as conn:
  61. # 显式开启事务
  62. with conn.begin():
  63. # 使用text()包装SQL语句
  64. stmt = text(sql)
  65. # 批量执行
  66. conn.execute(stmt, data)
  67. log.info(f"成功插入/更新 {len(df)} 行到 {table_name}")
  68. except Exception as e:
  69. log.error(f"数据库操作失败: {str(e)}")
  70. raise
  71. def update_january_yoy(self, prov_name='福建省'):
  72. """
  73. 更新指定省份1月份同比数据
  74. :param prov_name: 省份名称,默认为福建省
  75. """
  76. update_sql = text("""
  77. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  78. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  79. ON curr.city_code = prev.city_code
  80. AND prev.crossborder_year_month = DATE_FORMAT(
  81. DATE_SUB(
  82. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  83. INTERVAL 1 YEAR
  84. ),
  85. '%Y-01'
  86. )
  87. SET
  88. curr.yoy_import_export = COALESCE (
  89. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  90. 0.0000
  91. ),
  92. curr.yoy_import = COALESCE (
  93. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  94. 0.0000
  95. ),
  96. curr.yoy_export = COALESCE (
  97. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  98. 0.0000
  99. )
  100. WHERE
  101. curr.prov_name = :prov_name
  102. AND curr.crossborder_year_month LIKE '%-01'
  103. AND curr.crossborder_year_month
  104. > '2023-01'
  105. """)
  106. try:
  107. with self.engine.begin() as conn:
  108. result = conn.execute(update_sql, {'prov_name': prov_name})
  109. log.info(f"Updated {result.rowcount} rows for {prov_name}")
  110. return result.rowcount
  111. except Exception as e:
  112. log.error(f"Update failed: {str(e)}")
  113. raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
  114. def clear_old_shandong_yoy(self):
  115. """
  116. 清理山东省2024年前数据的同比指标
  117. """
  118. clear_sql = text("""
  119. UPDATE t_yujin_crossborder_prov_region_trade
  120. SET yoy_import_export = 0.0000,
  121. yoy_export = 0.0000,
  122. yoy_import = 0.0000
  123. WHERE prov_name = '山东省'
  124. AND crossborder_year_month < '2024-01'
  125. AND (yoy_import_export != 0
  126. OR yoy_export != 0
  127. OR yoy_import != 0) -- 优化:仅更新非零记录
  128. """)
  129. try:
  130. with self.engine.begin() as conn:
  131. result = conn.execute(clear_sql)
  132. log.info(f"山东省旧数据清零记录数: {result.rowcount}")
  133. return result.rowcount
  134. except Exception as e:
  135. log.error(f"旧数据清零失败: {str(e)}")
  136. raise
  137. def update_shandong_yoy(self):
  138. """
  139. 完整更新山东省同比数据(包含新旧数据处理)
  140. """
  141. try:
  142. # 步骤1:清理旧数据
  143. cleared = self.clear_old_shandong_yoy()
  144. # 步骤2:计算新数据
  145. updated = self._update_shandong_new_yoy()
  146. log.info(f"山东省同比处理完成 | 清零:{cleared} 更新:{updated}")
  147. return {'cleared': cleared, 'updated': updated}
  148. except Exception as e:
  149. log.error("山东省数据处理失败", exc_info=True)
  150. raise
  151. def _update_shandong_new_yoy(self):
  152. """
  153. 处理2024年及之后的山东省数据(内部方法)
  154. """
  155. update_sql = text("""
  156. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  157. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  158. ON curr.city_code = prev.city_code
  159. AND prev.crossborder_year_month = DATE_FORMAT(
  160. DATE_SUB(
  161. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  162. INTERVAL 1 YEAR
  163. ),
  164. '%Y-%m'
  165. )
  166. SET
  167. curr.yoy_import_export = COALESCE (
  168. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  169. 0.0000
  170. ),
  171. curr.yoy_import = COALESCE (
  172. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  173. 0.0000
  174. ),
  175. curr.yoy_export = COALESCE (
  176. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  177. 0.0000
  178. )
  179. WHERE
  180. curr.prov_name = '山东省'
  181. AND curr.crossborder_year_month >= '2024-01'
  182. AND prev.monthly_total IS NOT NULL
  183. """)
  184. with self.engine.begin() as conn:
  185. result = conn.execute(update_sql)
  186. log.info(f"山东省新数据更新数: {result.rowcount}")
  187. return result.rowcount