123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- from sqlalchemy import create_engine, text
- import logging
- import pymysql
- import pandas as pd
- from utils.log import log
- DB_CONFIG = {
- 'host': '10.130.75.149',
- 'port': 3307,
- 'user': 'yto_crm',
- 'password': '%3sFUlsolaRI',
- 'database': 'crm_uat',
- 'charset': 'utf8mb4'
- }
- # DB_CONFIG = {
- # 'host': '10.130.36.185',
- # 'port': 3306,
- # 'user': 'user_ytexp',
- # 'password': 'Rn9ib3L1C4b4%40123',
- # 'database': 'yto_crm',
- # 'charset': 'utf8mb4'
- # }
- class DBHelper:
- def __init__(self):
- self.engine = create_engine(
- f'mysql+pymysql://{DB_CONFIG["user"]}:{DB_CONFIG["password"]}@{DB_CONFIG["host"]}:{DB_CONFIG["port"]}/{DB_CONFIG["database"]}?charset={DB_CONFIG["charset"]}',
- pool_size=5,
- max_overflow=10
- )
- def get_commodity_id(self, name):
- """获取商品编码对应的分类ID[1,3](@ref)"""
- with self.engine.connect() as conn:
- result = conn.execute(
- text("SELECT id FROM t_yujin_crossborder_prov_commodity_category WHERE commodity_name = :name"),
- {'name': name}
- ).fetchone()
- return result[0] if result else None
- def bulk_insert(self, df, table_name, conflict_columns=None, update_columns=None):
- """
- 增强版批量插入(支持覆盖更新)
- :param df: 要插入的DataFrame
- :param table_name: 目标表名
- :param conflict_columns: 冲突检测字段列表
- :param update_columns: 需要更新的字段列表
- """
- if df.empty:
- log.info("空数据集,跳过插入")
- return
- # 生成带参数的SQL模板
- columns = ', '.join(df.columns)
- placeholders = ', '.join([f":{col}" for col in df.columns])
- sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
- # 添加ON DUPLICATE KEY UPDATE(MySQL语法)
- if conflict_columns and update_columns:
- update_set = ', '.join([f"{col}=VALUES({col})" for col in update_columns])
- sql += f" ON DUPLICATE KEY UPDATE {update_set}"
- # 转换数据为字典列表格式
- data = df.to_dict(orient='records')
- # print("data:", data)
- try:
- with self.engine.connect() as conn:
- # 显式开启事务
- with conn.begin():
- # 使用text()包装SQL语句
- stmt = text(sql)
- # 批量执行
- conn.execute(stmt, data)
- log.info(f"成功插入/更新 {len(df)} 行到 {table_name}")
- except Exception as e:
- log.error(f"数据库操作失败: {str(e)}")
- raise
- def update_january_yoy(self, prov_name='福建省'):
- """
- 更新指定省份1月份同比数据
- :param prov_name: 省份名称,默认为福建省
- """
- update_sql = text("""
- UPDATE t_yujin_crossborder_prov_region_trade AS curr
- INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
- ON curr.city_code = prev.city_code
- AND prev.crossborder_year_month = DATE_FORMAT(
- DATE_SUB(
- STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
- INTERVAL 1 YEAR
- ),
- '%Y-01'
- )
- SET
- curr.yoy_import_export = COALESCE (
- TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
- 0.0000
- ),
- curr.yoy_import = COALESCE (
- TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
- 0.0000
- ),
- curr.yoy_export = COALESCE (
- TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
- 0.0000
- )
- WHERE
- curr.prov_name = :prov_name
- AND curr.crossborder_year_month LIKE '%-01'
- AND curr.crossborder_year_month
- > '2023-01'
- """)
- try:
- with self.engine.begin() as conn:
- result = conn.execute(update_sql, {'prov_name': prov_name})
- log.info(f"Updated {result.rowcount} rows for {prov_name}")
- return result.rowcount
- except Exception as e:
- log.error(f"Update failed: {str(e)}")
- raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
- def update_prov_yoy(self, prov_name):
- """
- 完整更新山东省同比数据(包含新旧数据处理)
- """
- try:
- # 步骤1:清理旧数据
- cleared = self.clear_old_prov_yoy(prov_name)
- # 步骤2:计算新数据
- updated = self._update_prov_new_yoy(prov_name)
- log.info(f"{prov_name}同比处理完成 | 清零:{cleared} 更新:{updated}")
- return {'cleared': cleared, 'updated': updated}
- except Exception as e:
- log.error(f"{prov_name}数据处理失败", exc_info=True)
- raise
- def clear_old_prov_yoy(self, prov_name):
- """
- 清理指定省份2024年前数据的同比指标
- """
- clear_sql = text("""
- UPDATE t_yujin_crossborder_prov_region_trade
- SET yoy_import_export = null,
- yoy_export = null,
- yoy_import = null
- WHERE prov_name = :prov_name
- AND crossborder_year_month < '2024-01'
- AND (yoy_import_export != 0
- OR yoy_export != 0
- OR yoy_import != 0) -- 优化:仅更新非零记录
- """)
- try:
- with self.engine.begin() as conn:
- result = conn.execute(clear_sql, {'prov_name': prov_name})
- log.info(f"{prov_name}旧数据清零记录数: {result.rowcount}")
- return result.rowcount
- except Exception as e:
- log.error(f"旧数据清零失败: {str(e)}")
- raise
- def _update_prov_new_yoy(self,prov_name):
- """
- 更新2024年及之后的省份城市同比数据
- """
- update_sql = text("""
- UPDATE t_yujin_crossborder_prov_region_trade AS curr
- INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
- ON curr.city_code = prev.city_code
- AND prev.crossborder_year_month = DATE_FORMAT(
- DATE_SUB(
- STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
- INTERVAL 1 YEAR
- ),
- '%Y-%m'
- )
- SET
- curr.yoy_import_export = COALESCE (
- TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
- 0.0000
- ),
- curr.yoy_import = COALESCE (
- TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
- 0.0000
- ),
- curr.yoy_export = COALESCE (
- TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
- 0.0000
- )
- WHERE
- curr.prov_name = :prov_name
- AND curr.crossborder_year_month >= '2024-01'
- AND prev.monthly_total IS NOT NULL
- """)
- with self.engine.begin() as conn:
- result = conn.execute(update_sql, {'prov_name': prov_name})
- log.info(f"{prov_name}新数据更新数: {result.rowcount}")
- return result.rowcount
- def query(self, sql, params=None, return_df=True):
- """
- 执行SQL查询并返回结果
- :param sql: SQL查询语句
- :param params: 查询参数
- :param return_df: 是否返回DataFrame,False则返回原始结果
- :return: 查询结果(DataFrame或列表)
- """
- try:
- with self.engine.connect() as conn:
- if return_df:
- # 使用pandas直接读取为DataFrame
- result = pd.read_sql(sql, conn, params=params)
- log.info(f"查询成功,返回 {len(result)} 条记录")
- return result
- else:
- # 返回原始结果
- result = conn.execute(sql, params or {}).fetchall()
- log.info(f"查询成功,返回 {len(result)} 条记录")
- return result
- except Exception as e:
- log.error(f"查询失败: {str(e)}")
- raise
|