# db_helper.py
import logging
import os

import pandas as pd
import pymysql
from sqlalchemy import create_engine, text
  5. # DB_CONFIG = {
  6. # 'host': '10.130.75.149',
  7. # 'port': 3307,
  8. # 'user': 'yto_crm',
  9. # 'password': '%3sFUlsolaRI',
  10. # 'database': 'crm_uat',
  11. # 'charset': 'utf8mb4'
  12. # }
  13. DB_CONFIG = {
  14. 'host': '10.130.36.185',
  15. 'port': 3306,
  16. 'user': 'user_ytexp',
  17. 'password': 'Rn9ib3L1C4b4%40123',
  18. 'database': 'yto_crm',
  19. 'charset': 'utf8mb4'
  20. }
  21. class DBHelper:
  22. def __init__(self):
  23. self.engine = create_engine(
  24. f'mysql+pymysql://{DB_CONFIG["user"]}:{DB_CONFIG["password"]}@{DB_CONFIG["host"]}:{DB_CONFIG["port"]}/{DB_CONFIG["database"]}?charset={DB_CONFIG["charset"]}',
  25. pool_size=5,
  26. max_overflow=10
  27. )
  28. def get_commodity_id(self, name):
  29. """获取商品编码对应的分类ID[1,3](@ref)"""
  30. with self.engine.connect() as conn:
  31. result = conn.execute(
  32. text("SELECT id FROM t_yujin_crossborder_prov_commodity_category WHERE commodity_name = :name"),
  33. {'name': name}
  34. ).fetchone()
  35. return result[0] if result else None
  36. def bulk_insert(self, df, table_name, conflict_columns=None, update_columns=None):
  37. """
  38. 增强版批量插入(支持覆盖更新)
  39. :param df: 要插入的DataFrame
  40. :param table_name: 目标表名
  41. :param conflict_columns: 冲突检测字段列表
  42. :param update_columns: 需要更新的字段列表
  43. """
  44. if df.empty:
  45. print("空数据集,跳过插入")
  46. return
  47. # 生成带参数的SQL模板
  48. columns = ', '.join(df.columns)
  49. placeholders = ', '.join([f":{col}" for col in df.columns])
  50. sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
  51. # 添加ON DUPLICATE KEY UPDATE(MySQL语法)
  52. if conflict_columns and update_columns:
  53. update_set = ', '.join([f"{col}=VALUES({col})" for col in update_columns])
  54. sql += f" ON DUPLICATE KEY UPDATE {update_set}"
  55. # 转换数据为字典列表格式
  56. data = df.to_dict(orient='records')
  57. # print("data:", data)
  58. try:
  59. with self.engine.connect() as conn:
  60. # 显式开启事务
  61. with conn.begin():
  62. # 使用text()包装SQL语句
  63. stmt = text(sql)
  64. # 批量执行
  65. conn.execute(stmt, data)
  66. print(f"成功插入/更新 {len(df)} 行到 {table_name}")
  67. except Exception as e:
  68. print(f"数据库操作失败: {str(e)}")
  69. raise
  70. def update_january_yoy(self, prov_name='福建省'):
  71. """
  72. 更新指定省份1月份同比数据
  73. :param prov_name: 省份名称,默认为福建省
  74. """
  75. update_sql = text("""
  76. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  77. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  78. ON curr.city_code = prev.city_code
  79. AND prev.crossborder_year_month = DATE_FORMAT(
  80. DATE_SUB(
  81. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  82. INTERVAL 1 YEAR
  83. ),
  84. '%Y-01'
  85. )
  86. SET
  87. curr.yoy_import_export = COALESCE (
  88. ROUND(
  89. (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
  90. ), 0.0000
  91. ), curr.yoy_import = COALESCE (
  92. ROUND(
  93. (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
  94. ), 0.0000
  95. ), curr.yoy_export = COALESCE (
  96. ROUND(
  97. (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
  98. ), 0.0000
  99. )
  100. WHERE
  101. curr.prov_name = :prov_name
  102. AND curr.crossborder_year_month LIKE '%-01'
  103. AND curr.crossborder_year_month
  104. > '2023-01'
  105. """)
  106. try:
  107. with self.engine.begin() as conn:
  108. result = conn.execute(update_sql, {'prov_name': prov_name})
  109. print(f"Updated {result.rowcount} rows for {prov_name}")
  110. return result.rowcount
  111. except Exception as e:
  112. print(f"Update failed: {str(e)}")
  113. raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
  114. def clear_old_shandong_yoy(self):
  115. """
  116. 清理山东省2024年前数据的同比指标
  117. """
  118. clear_sql = text("""
  119. UPDATE t_yujin_crossborder_prov_region_trade
  120. SET yoy_import_export = 0.0000,
  121. yoy_export = 0.0000,
  122. yoy_import = 0.0000
  123. WHERE prov_name = '山东省'
  124. AND crossborder_year_month < '2024-01'
  125. AND (yoy_import_export != 0
  126. OR yoy_export != 0
  127. OR yoy_import != 0) -- 优化:仅更新非零记录
  128. """)
  129. try:
  130. with self.engine.begin() as conn:
  131. result = conn.execute(clear_sql)
  132. print(f"山东省旧数据清零记录数: {result.rowcount}")
  133. return result.rowcount
  134. except Exception as e:
  135. self.logger.error(f"旧数据清零失败: {str(e)}")
  136. raise
  137. def update_shandong_yoy(self):
  138. """
  139. 完整更新山东省同比数据(包含新旧数据处理)
  140. """
  141. try:
  142. # 步骤1:清理旧数据
  143. cleared = self.clear_old_shandong_yoy()
  144. # 步骤2:计算新数据
  145. updated = self._update_shandong_new_yoy()
  146. print(f"山东省同比处理完成 | 清零:{cleared} 更新:{updated}")
  147. return {'cleared': cleared, 'updated': updated}
  148. except Exception as e:
  149. print("山东省数据处理失败", exc_info=True)
  150. raise
  151. def _update_shandong_new_yoy(self):
  152. """
  153. 处理2024年及之后的山东省数据(内部方法)
  154. """
  155. update_sql = text("""
  156. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  157. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  158. ON curr.city_code = prev.city_code
  159. AND prev.crossborder_year_month = DATE_FORMAT(
  160. DATE_SUB(
  161. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  162. INTERVAL 1 YEAR
  163. ),
  164. '%Y-%m'
  165. )
  166. SET
  167. curr.yoy_import_export = COALESCE (
  168. ROUND(
  169. (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
  170. ), 0.0000
  171. ), curr.yoy_import = COALESCE (
  172. ROUND(
  173. (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
  174. ), 0.0000
  175. ), curr.yoy_export = COALESCE (
  176. ROUND(
  177. (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
  178. ), 0.0000
  179. )
  180. WHERE
  181. curr.prov_name = '山东省'
  182. AND curr.crossborder_year_month >= '2024-01'
  183. AND prev.monthly_total IS NOT NULL
  184. """)
  185. with self.engine.begin() as conn:
  186. result = conn.execute(update_sql)
  187. print(f"山东省新数据更新数: {result.rowcount}")
  188. return result.rowcount