db_helper.py 9.9 KB


  1. from sqlalchemy import create_engine, text
  2. import logging
  3. import pymysql
  4. import pandas as pd
  5. from utils.log import log
  6. DB_CONFIG = {
  7. 'host': '10.130.75.149',
  8. 'port': 3307,
  9. 'user': 'yto_crm',
  10. 'password': '%3sFUlsolaRI',
  11. 'database': 'crm_uat',
  12. 'charset': 'utf8mb4'
  13. }
  14. # DB_CONFIG = {
  15. # 'host': '10.130.36.185',
  16. # 'port': 3306,
  17. # 'user': 'user_ytexp',
  18. # 'password': 'Rn9ib3L1C4b4%40123',
  19. # 'database': 'yto_crm',
  20. # 'charset': 'utf8mb4'
  21. # }
  22. class DBHelper:
  23. def __init__(self):
  24. self.engine = create_engine(
  25. f'mysql+pymysql://{DB_CONFIG["user"]}:{DB_CONFIG["password"]}@{DB_CONFIG["host"]}:{DB_CONFIG["port"]}/{DB_CONFIG["database"]}?charset={DB_CONFIG["charset"]}',
  26. pool_size=5,
  27. max_overflow=10
  28. )
  29. def get_commodity_id(self, name):
  30. """获取商品编码对应的分类ID[1,3](@ref)"""
  31. with self.engine.connect() as conn:
  32. result = conn.execute(
  33. text("SELECT id FROM t_yujin_crossborder_prov_commodity_category WHERE commodity_name = :name"),
  34. {'name': name}
  35. ).fetchone()
  36. return result[0] if result else None
  37. def bulk_insert(self, df, table_name, conflict_columns=None, update_columns=None):
  38. """
  39. 增强版批量插入(支持覆盖更新)
  40. :param df: 要插入的DataFrame
  41. :param table_name: 目标表名
  42. :param conflict_columns: 冲突检测字段列表
  43. :param update_columns: 需要更新的字段列表
  44. """
  45. if df.empty:
  46. log.info("空数据集,跳过插入")
  47. return
  48. # 生成带参数的SQL模板
  49. columns = ', '.join(df.columns)
  50. placeholders = ', '.join([f":{col}" for col in df.columns])
  51. sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
  52. # 添加ON DUPLICATE KEY UPDATE(MySQL语法)
  53. if conflict_columns and update_columns:
  54. # 1. 处理用户指定的更新字段
  55. update_clauses = [f"{col}=VALUES({col})" for col in update_columns]
  56. # 2. 强制添加create_time=NOW()
  57. update_clauses.append("create_time = NOW()") # 新增
  58. # 3. 合并所有更新条件
  59. update_set = ', '.join(update_clauses)
  60. sql += f" ON DUPLICATE KEY UPDATE {update_set}"
  61. # 转换数据为字典列表格式
  62. data = df.to_dict(orient='records')
  63. # print("data:", data)
  64. try:
  65. with self.engine.connect() as conn:
  66. # 显式开启事务
  67. with conn.begin():
  68. # 使用text()包装SQL语句
  69. stmt = text(sql)
  70. # 批量执行
  71. conn.execute(stmt, data)
  72. log.info(f"成功插入/更新 {len(df)} 行到 {table_name}")
  73. except Exception as e:
  74. log.error(f"数据库操作失败: {str(e)}")
  75. raise
  76. def update_january_yoy(self, prov_name='福建省'):
  77. """
  78. 更新指定省份1月份同比数据
  79. :param prov_name: 省份名称,默认为福建省
  80. """
  81. update_sql = text("""
  82. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  83. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  84. ON curr.city_code = prev.city_code
  85. AND prev.crossborder_year_month = DATE_FORMAT(
  86. DATE_SUB(
  87. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  88. INTERVAL 1 YEAR
  89. ),
  90. '%Y-01'
  91. )
  92. SET
  93. curr.yoy_import_export = COALESCE (
  94. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  95. 0.0000
  96. ),
  97. curr.yoy_import = COALESCE (
  98. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  99. 0.0000
  100. ),
  101. curr.yoy_export = COALESCE (
  102. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  103. 0.0000
  104. )
  105. WHERE
  106. curr.prov_name = :prov_name
  107. AND curr.crossborder_year_month LIKE '%-01'
  108. AND curr.crossborder_year_month
  109. > '2023-01'
  110. """)
  111. try:
  112. with self.engine.begin() as conn:
  113. result = conn.execute(update_sql, {'prov_name': prov_name})
  114. log.info(f"Updated {result.rowcount} rows for {prov_name}")
  115. return result.rowcount
  116. except Exception as e:
  117. log.error(f"Update failed: {str(e)}")
  118. raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
  119. def update_prov_yoy(self, prov_name):
  120. """
  121. 完整更新山东省同比数据(包含新旧数据处理)
  122. """
  123. try:
  124. # 步骤1:清理旧数据
  125. cleared = self.clear_old_prov_yoy(prov_name)
  126. # 步骤2:计算新数据
  127. updated = self._update_prov_new_yoy(prov_name)
  128. log.info(f"{prov_name}同比处理完成 | 清零:{cleared} 更新:{updated}")
  129. return {'cleared': cleared, 'updated': updated}
  130. except Exception as e:
  131. log.error(f"{prov_name}数据处理失败", exc_info=True)
  132. raise
  133. def clear_old_prov_yoy(self, prov_name):
  134. """
  135. 清理指定省份2024年前数据的同比指标
  136. """
  137. clear_sql = text("""
  138. UPDATE t_yujin_crossborder_prov_region_trade
  139. SET yoy_import_export = null,
  140. yoy_export = null,
  141. yoy_import = null
  142. WHERE prov_name = :prov_name
  143. AND crossborder_year_month < '2024-01'
  144. AND (yoy_import_export != 0
  145. OR yoy_export != 0
  146. OR yoy_import != 0) -- 优化:仅更新非零记录
  147. """)
  148. try:
  149. with self.engine.begin() as conn:
  150. result = conn.execute(clear_sql, {'prov_name': prov_name})
  151. log.info(f"{prov_name}旧数据清零记录数: {result.rowcount}")
  152. return result.rowcount
  153. except Exception as e:
  154. log.error(f"旧数据清零失败: {str(e)}")
  155. raise
  156. def _update_prov_new_yoy(self,prov_name):
  157. """
  158. 更新2024年及之后的省份城市同比数据
  159. """
  160. update_sql = text("""
  161. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  162. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  163. ON curr.city_code = prev.city_code
  164. AND prev.crossborder_year_month = DATE_FORMAT(
  165. DATE_SUB(
  166. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  167. INTERVAL 1 YEAR
  168. ),
  169. '%Y-%m'
  170. )
  171. SET
  172. curr.yoy_import_export = COALESCE (
  173. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  174. 0.0000
  175. ),
  176. curr.yoy_import = COALESCE (
  177. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  178. 0.0000
  179. ),
  180. curr.yoy_export = COALESCE (
  181. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  182. 0.0000
  183. )
  184. WHERE
  185. curr.prov_name = :prov_name
  186. AND curr.crossborder_year_month >= '2024-01'
  187. AND prev.monthly_total IS NOT NULL
  188. """)
  189. with self.engine.begin() as conn:
  190. result = conn.execute(update_sql, {'prov_name': prov_name})
  191. log.info(f"{prov_name}新数据更新数: {result.rowcount}")
  192. return result.rowcount
  193. def query(self, sql, params=None, return_df=True):
  194. """
  195. 执行SQL查询并返回结果
  196. :param sql: SQL查询语句
  197. :param params: 查询参数
  198. :param return_df: 是否返回DataFrame,False则返回原始结果
  199. :return: 查询结果(DataFrame或列表)
  200. """
  201. try:
  202. with self.engine.connect() as conn:
  203. if return_df:
  204. # 使用pandas直接读取为DataFrame
  205. result = pd.read_sql(sql, conn, params=params)
  206. log.info(f"查询成功,返回 {len(result)} 条记录")
  207. return result
  208. else:
  209. # 返回原始结果
  210. result = conn.execute(sql, params or {}).fetchall()
  211. log.info(f"查询成功,返回 {len(result)} 条记录")
  212. return result
  213. except Exception as e:
  214. log.error(f"查询失败: {str(e)}")
  215. raise