db_helper.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
import logging
import os

import pandas as pd
import pymysql
from sqlalchemy import create_engine, text

from utils.log import log
  6. DB_CONFIG = {
  7. 'host': '10.130.75.149',
  8. 'port': 3307,
  9. 'user': 'yto_crm',
  10. 'password': '%3sFUlsolaRI',
  11. 'database': 'crm_uat',
  12. 'charset': 'utf8mb4'
  13. }
  14. # DB_CONFIG = {
  15. # 'host': '10.130.36.185',
  16. # 'port': 3306,
  17. # 'user': 'user_ytexp',
  18. # 'password': 'Rn9ib3L1C4b4%40123',
  19. # 'database': 'yto_crm',
  20. # 'charset': 'utf8mb4'
  21. # }
  22. class DBHelper:
  23. def __init__(self):
  24. self.engine = create_engine(
  25. f'mysql+pymysql://{DB_CONFIG["user"]}:{DB_CONFIG["password"]}@{DB_CONFIG["host"]}:{DB_CONFIG["port"]}/{DB_CONFIG["database"]}?charset={DB_CONFIG["charset"]}',
  26. pool_size=5,
  27. max_overflow=10
  28. )
  29. def get_commodity_id(self, name):
  30. """获取商品编码对应的分类ID[1,3](@ref)"""
  31. with self.engine.connect() as conn:
  32. result = conn.execute(
  33. text("SELECT id FROM t_yujin_crossborder_prov_commodity_category WHERE commodity_name = :name"),
  34. {'name': name}
  35. ).fetchone()
  36. return result[0] if result else None
  37. def bulk_insert(self, df, table_name, conflict_columns=None, update_columns=None):
  38. """
  39. 增强版批量插入(支持覆盖更新)
  40. :param df: 要插入的DataFrame
  41. :param table_name: 目标表名
  42. :param conflict_columns: 冲突检测字段列表
  43. :param update_columns: 需要更新的字段列表
  44. """
  45. if df.empty:
  46. log.info("空数据集,跳过插入")
  47. return
  48. # 生成带参数的SQL模板
  49. columns = ', '.join(df.columns)
  50. placeholders = ', '.join([f":{col}" for col in df.columns])
  51. sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
  52. # 添加ON DUPLICATE KEY UPDATE(MySQL语法)
  53. if conflict_columns and update_columns:
  54. update_set = ', '.join([f"{col}=VALUES({col})" for col in update_columns])
  55. sql += f" ON DUPLICATE KEY UPDATE {update_set}"
  56. # 转换数据为字典列表格式
  57. data = df.to_dict(orient='records')
  58. # print("data:", data)
  59. try:
  60. with self.engine.connect() as conn:
  61. # 显式开启事务
  62. with conn.begin():
  63. # 使用text()包装SQL语句
  64. stmt = text(sql)
  65. # 批量执行
  66. conn.execute(stmt, data)
  67. log.info(f"成功插入/更新 {len(df)} 行到 {table_name}")
  68. except Exception as e:
  69. log.error(f"数据库操作失败: {str(e)}")
  70. raise
  71. def update_january_yoy(self, prov_name='福建省'):
  72. """
  73. 更新指定省份1月份同比数据
  74. :param prov_name: 省份名称,默认为福建省
  75. """
  76. update_sql = text("""
  77. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  78. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  79. ON curr.city_code = prev.city_code
  80. AND prev.crossborder_year_month = DATE_FORMAT(
  81. DATE_SUB(
  82. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  83. INTERVAL 1 YEAR
  84. ),
  85. '%Y-01'
  86. )
  87. SET
  88. curr.yoy_import_export = COALESCE (
  89. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  90. 0.0000
  91. ),
  92. curr.yoy_import = COALESCE (
  93. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  94. 0.0000
  95. ),
  96. curr.yoy_export = COALESCE (
  97. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  98. 0.0000
  99. )
  100. WHERE
  101. curr.prov_name = :prov_name
  102. AND curr.crossborder_year_month LIKE '%-01'
  103. AND curr.crossborder_year_month
  104. > '2023-01'
  105. """)
  106. try:
  107. with self.engine.begin() as conn:
  108. result = conn.execute(update_sql, {'prov_name': prov_name})
  109. log.info(f"Updated {result.rowcount} rows for {prov_name}")
  110. return result.rowcount
  111. except Exception as e:
  112. log.error(f"Update failed: {str(e)}")
  113. raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
  114. def update_prov_yoy(self, prov_name):
  115. """
  116. 完整更新山东省同比数据(包含新旧数据处理)
  117. """
  118. try:
  119. # 步骤1:清理旧数据
  120. cleared = self.clear_old_prov_yoy(prov_name)
  121. # 步骤2:计算新数据
  122. updated = self._update_prov_new_yoy(prov_name)
  123. log.info(f"山东省同比处理完成 | 清零:{cleared} 更新:{updated}")
  124. return {'cleared': cleared, 'updated': updated}
  125. except Exception as e:
  126. log.error(f"{prov_name}数据处理失败", exc_info=True)
  127. raise
  128. def clear_old_prov_yoy(self, prov_name):
  129. """
  130. 清理指定省份2024年前数据的同比指标
  131. """
  132. clear_sql = text("""
  133. UPDATE t_yujin_crossborder_prov_region_trade
  134. SET yoy_import_export = 0.0000,
  135. yoy_export = 0.0000,
  136. yoy_import = 0.0000
  137. WHERE prov_name = :prov_name
  138. AND crossborder_year_month < '2024-01'
  139. AND (yoy_import_export != 0
  140. OR yoy_export != 0
  141. OR yoy_import != 0) -- 优化:仅更新非零记录
  142. """)
  143. try:
  144. with self.engine.begin() as conn:
  145. result = conn.execute(clear_sql, {'prov_name': prov_name})
  146. log.info(f"{prov_name}旧数据清零记录数: {result.rowcount}")
  147. return result.rowcount
  148. except Exception as e:
  149. log.error(f"旧数据清零失败: {str(e)}")
  150. raise
  151. def _update_prov_new_yoy(self,prov_name):
  152. """
  153. 更新2024年及之后的省份城市同比数据
  154. """
  155. update_sql = text("""
  156. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  157. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  158. ON curr.city_code = prev.city_code
  159. AND prev.crossborder_year_month = DATE_FORMAT(
  160. DATE_SUB(
  161. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  162. INTERVAL 1 YEAR
  163. ),
  164. '%Y-%m'
  165. )
  166. SET
  167. curr.yoy_import_export = COALESCE (
  168. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  169. 0.0000
  170. ),
  171. curr.yoy_import = COALESCE (
  172. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  173. 0.0000
  174. ),
  175. curr.yoy_export = COALESCE (
  176. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  177. 0.0000
  178. )
  179. WHERE
  180. curr.prov_name = :prov_name
  181. AND curr.crossborder_year_month >= '2024-01'
  182. AND prev.monthly_total IS NOT NULL
  183. """)
  184. with self.engine.begin() as conn:
  185. result = conn.execute(update_sql, {'prov_name': prov_name})
  186. log.info(f"{prov_name}新数据更新数: {result.rowcount}")
  187. return result.rowcount
  188. def query(self, sql, params=None, return_df=True):
  189. """
  190. 执行SQL查询并返回结果
  191. :param sql: SQL查询语句
  192. :param params: 查询参数
  193. :param return_df: 是否返回DataFrame,False则返回原始结果
  194. :return: 查询结果(DataFrame或列表)
  195. """
  196. try:
  197. with self.engine.connect() as conn:
  198. if return_df:
  199. # 使用pandas直接读取为DataFrame
  200. result = pd.read_sql(sql, conn, params=params)
  201. log.info(f"查询成功,返回 {len(result)} 条记录")
  202. return result
  203. else:
  204. # 返回原始结果
  205. result = conn.execute(sql, params or {}).fetchall()
  206. log.info(f"查询成功,返回 {len(result)} 条记录")
  207. return result
  208. except Exception as e:
  209. log.error(f"查询失败: {str(e)}")
  210. raise