base_mysql.py 11 KB


  1. import pymysql
  2. from sqlalchemy import create_engine, text
  3. from urllib.parse import quote_plus
  4. from utils.log import log
  5. # 数据库配置
  6. DB_CONFIG = {
  7. 'host': '10.130.75.149',
  8. 'port': 3307,
  9. 'user': 'yto_crm',
  10. 'password': '%3sFUlsolaRI',
  11. 'database': 'crm_uat',
  12. 'charset': 'utf8mb4'
  13. }
  14. def get_commodity_id(commodity_name):
  15. """根据商品名称查询数据库,获取商品 ID 和商品名称"""
  16. fix_commodity_name = commodity_name
  17. if commodity_name.endswith(")") or commodity_name.endswith(")"):
  18. fix_commodity_name = commodity_name.rsplit("(")[0] or commodity_name.rsplit("(")[0]
  19. fix_commodity_name = fix_commodity_name.replace('*', '').replace('#', '').replace('“', '').replace('”', '').replace('。', '')
  20. try:
  21. # 连接数据库
  22. connection = pymysql.connect(**DB_CONFIG)
  23. with connection.cursor() as cursor:
  24. # 执行查询
  25. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name like %s"
  26. cursor.execute(sql, (f"{fix_commodity_name}%",))
  27. result = cursor.fetchall()
  28. if result:
  29. if len(result) == 1:
  30. return result[0][0], result[0][1]
  31. else:
  32. log.info(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
  33. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
  34. cursor.execute(sql, (f"{fix_commodity_name}",))
  35. result = cursor.fetchone()
  36. if not result:
  37. # 用原商品名称再查一次
  38. commodity_name = commodity_name.replace("(", "(").replace(")", ")")
  39. log.info(f"原商品名称查询,commodity_name:{commodity_name}")
  40. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
  41. cursor.execute(sql, (f"{commodity_name}",))
  42. result = cursor.fetchone()
  43. if result:
  44. return result[0], result[1]
  45. else:
  46. return None, None
  47. else:
  48. return result[0], result[1]
  49. else:
  50. return None, None
  51. except Exception as e:
  52. log.info(f"查询数据库时发生异常: {str(e)}")
  53. return None, None
  54. finally:
  55. if connection:
  56. connection.close()
  57. def get_hs_all():
  58. try:
  59. # 连接数据库
  60. connection = pymysql.connect(**DB_CONFIG)
  61. with connection.cursor() as cursor:
  62. # 执行查询
  63. sql = "SELECT e.id,e.category_name FROM t_yujin_crossborder_hs_category e"
  64. cursor.execute(sql)
  65. all_records = cursor.fetchall()
  66. if all_records:
  67. return all_records
  68. else:
  69. return None
  70. except Exception as e:
  71. log.info(f"查询数据库时发生异常: {str(e)}")
  72. return None
  73. finally:
  74. if connection:
  75. connection.close()
  76. def get_code_exist(crossborder_year_month, prov_code):
  77. try:
  78. # 使用 with 自动管理连接生命周期
  79. with pymysql.connect(**DB_CONFIG) as connection:
  80. with connection.cursor() as cursor:
  81. # 执行查询
  82. sql = """
  83. SELECT COUNT(1)
  84. FROM t_yujin_crossborder_prov_commodity_trade e
  85. WHERE e.crossborder_year_month = %s
  86. AND e.prov_code = %s
  87. """
  88. cursor.execute(sql, (crossborder_year_month, prov_code))
  89. result = cursor.fetchone()
  90. return int(result[0]) if result and result[0] else 0
  91. except Exception as e:
  92. log.info(f"[数据库查询异常] 查询条件: {crossborder_year_month}, {prov_code} | 错误详情: {str(e)}")
  93. return 0
  94. # 对密码进行 URL 编码
  95. encoded_password = quote_plus(DB_CONFIG["password"])
  96. # 构建 SQLAlchemy 引擎
  97. engine = create_engine(
  98. f"mysql+pymysql://{DB_CONFIG['user']}:{encoded_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}?charset={DB_CONFIG['charset']}",
  99. pool_size=5,
  100. max_overflow=10
  101. )
  102. def bulk_insert(sql_statements):
  103. """
  104. 批量执行 SQL 插入语句
  105. :param sql_statements: 包含多个 INSERT 语句的列表
  106. """
  107. if not sql_statements:
  108. log.info("未提供有效的 SQL 插入语句,跳过操作")
  109. return
  110. try:
  111. with engine.connect() as conn:
  112. with conn.begin():
  113. for sql in sql_statements:
  114. stmt = text(sql.strip())
  115. conn.execute(stmt)
  116. log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
  117. except Exception as e:
  118. log.info(f"数据库操作失败: {str(e)}")
  119. raise
  120. def update_january_yoy(prov_name):
  121. """
  122. 更新指定省份1月份同比数据
  123. :param prov_name: 省份名称,默认为福建省
  124. """
  125. update_sql = text("""
  126. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  127. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  128. ON curr.city_code = prev.city_code
  129. AND prev.crossborder_year_month = DATE_FORMAT(
  130. DATE_SUB(
  131. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  132. INTERVAL 1 YEAR
  133. ),
  134. '%Y-01'
  135. )
  136. SET
  137. curr.yoy_import_export = COALESCE (
  138. ROUND(
  139. (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
  140. ), 0.0000
  141. ), curr.yoy_import = COALESCE (
  142. ROUND(
  143. (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
  144. ), 0.0000
  145. ), curr.yoy_export = COALESCE (
  146. ROUND(
  147. (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
  148. ), 0.0000
  149. )
  150. WHERE
  151. curr.prov_name = :prov_name
  152. AND curr.crossborder_year_month LIKE '%-01'
  153. AND curr.crossborder_year_month
  154. > '2023-01'
  155. """)
  156. try:
  157. with engine.begin() as conn:
  158. result = conn.execute(update_sql, {'prov_name': prov_name})
  159. log.info(f"Updated {result.rowcount} rows for {prov_name}")
  160. return result.rowcount
  161. except Exception as e:
  162. log.info(f"Update failed: {str(e)}")
  163. raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
  164. def clear_old_shandong_yoy(prov_name):
  165. """
  166. 清理山东省2024年前数据的同比指标
  167. """
  168. clear_sql = text("""
  169. UPDATE t_yujin_crossborder_prov_region_trade
  170. SET yoy_import_export = 0.0000,
  171. yoy_export = 0.0000,
  172. yoy_import = 0.0000
  173. WHERE prov_name = :prov_name
  174. AND crossborder_year_month < '2024-01'
  175. AND (yoy_import_export != 0
  176. OR yoy_export != 0
  177. OR yoy_import != 0) -- 优化:仅更新非零记录
  178. """)
  179. try:
  180. with engine.begin() as conn:
  181. result = conn.execute(clear_sql, {'prov_name': prov_name})
  182. log.info(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
  183. return result.rowcount
  184. except Exception as e:
  185. log.info(f"旧数据清零失败: {str(e)}")
  186. raise
  187. def update_shandong_yoy(prov_name):
  188. """
  189. 完整更新山东省同比数据(包含新旧数据处理)
  190. """
  191. try:
  192. # 步骤1:清理旧数据
  193. cleared = clear_old_shandong_yoy(prov_name)
  194. # 步骤2:计算新数据
  195. updated = _update_shandong_new_yoy(prov_name)
  196. log.info(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
  197. return {'cleared': cleared, 'updated': updated}
  198. except Exception as e:
  199. log.info("{prov_name} 数据处理失败", exc_info=True)
  200. raise
  201. def _update_shandong_new_yoy(prov_name):
  202. """
  203. 处理2024年及之后的山东省数据(内部方法)
  204. """
  205. update_sql = text("""UPDATE t_yujin_crossborder_prov_region_trade AS curr
  206. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  207. ON curr.city_code = prev.city_code
  208. AND prev.crossborder_year_month = DATE_FORMAT(
  209. DATE_SUB(
  210. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  211. INTERVAL 1 YEAR
  212. ),
  213. '%Y-%m'
  214. )
  215. SET
  216. curr.yoy_import_export = COALESCE (
  217. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  218. 0.0000
  219. ),
  220. curr.yoy_import = COALESCE (
  221. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  222. 0.0000
  223. ),
  224. curr.yoy_export = COALESCE (
  225. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  226. 0.0000
  227. )
  228. WHERE
  229. curr.prov_name = :prov_name
  230. AND curr.crossborder_year_month >= '2024-01'
  231. AND prev.monthly_total IS NOT NULL
  232. """)
  233. with engine.begin() as conn:
  234. result = conn.execute(update_sql, {'prov_name': prov_name})
  235. log.info(f"{prov_name} 新数据更新数: {result.rowcount}")
  236. return result.rowcount
  237. if __name__ == '__main__':
  238. check_year, check_month = 2024, 4
  239. count = get_code_exist(f'{check_year}-{check_month:02d}', "340000")
  240. print(count)
  241. # update_january_yoy('浙江省')
  242. # update_shandong_yoy('浙江省')
  243. # log.info("同比sql处理完成")