import logging

import pandas as pd
import pymysql  # not referenced directly; the 'mysql+pymysql' URL below needs this driver
from sqlalchemy import create_engine, text

# NOTE(review): database credentials are hard-coded in source, both in the
# active config and in the commented-out one below. Consider loading them
# from the environment or a secrets store and rotating these values.
#
# Alternate configuration, kept for reference:
# DB_CONFIG = {
#     'host': '10.130.75.149',
#     'port': 3307,
#     'user': 'yto_crm',
#     'password': '%3sFUlsolaRI',
#     'database': 'crm_uat',
#     'charset': 'utf8mb4'
# }

# The password is interpolated verbatim into the connection URL, so it appears
# to be stored already URL-encoded ('%40' being '@') -- confirm before editing.
DB_CONFIG = {
    'host': '10.130.36.185',
    'port': 3306,
    'user': 'user_ytexp',
    'password': 'Rn9ib3L1C4b4%40123',
    'database': 'yto_crm',
    'charset': 'utf8mb4'
}


class DBHelper:
    """Thin wrapper around a pooled SQLAlchemy engine built from ``DB_CONFIG``.

    Provides a commodity-category lookup, a bulk insert/upsert for DataFrames
    and several maintenance updates of the year-over-year (YoY) columns in
    ``t_yujin_crossborder_prov_region_trade``.
    """

    # Class-level logger so that ``self.logger`` is always available to the
    # error handlers below (it was referenced but never defined before).
    logger = logging.getLogger(__name__)

    def __init__(self):
        """Create the engine (pool of 5 connections, up to 10 overflow)."""
        url = (
            f'mysql+pymysql://{DB_CONFIG["user"]}:{DB_CONFIG["password"]}'
            f'@{DB_CONFIG["host"]}:{DB_CONFIG["port"]}'
            f'/{DB_CONFIG["database"]}?charset={DB_CONFIG["charset"]}'
        )
        self.engine = create_engine(url, pool_size=5, max_overflow=10)

    def get_commodity_id(self, name):
        """Return the category id whose ``commodity_name`` equals *name*.

        :param name: value matched against ``commodity_name``
        :return: the ``id`` column of the first matching row, or ``None``
                 when no row matches
        """
        with self.engine.connect() as conn:
            result = conn.execute(
                text(
                    "SELECT id FROM t_yujin_crossborder_prov_commodity_category "
                    "WHERE commodity_name = :name"
                ),
                {'name': name}
            ).fetchone()
        return result[0] if result else None

    def bulk_insert(self, df, table_name, conflict_columns=None, update_columns=None):
        """Insert every row of *df* into *table_name*, optionally as an upsert.

        When both *conflict_columns* and *update_columns* are non-empty, an
        ``ON DUPLICATE KEY UPDATE`` clause (MySQL syntax) is appended so that
        conflicting rows have *update_columns* overwritten.

        :param df: DataFrame to insert; its column names are used as the
                   target column names. An empty frame is skipped.
        :param table_name: target table name
        :param conflict_columns: conflict-detection columns. Only used as a
                   switch here: the generated SQL does not name them, so the
                   actual conflict detection is left to the table's keys.
        :param update_columns: columns to overwrite on conflict
        :return: ``None``
        :raises Exception: any database error is printed and re-raised
        """
        if df.empty:
            print("空数据集,跳过插入")
            return

        # NOTE(review): table and column names are interpolated into the SQL
        # text (identifiers cannot be bound), so they must come from trusted
        # code, never from user input.
        columns = ', '.join(df.columns)
        placeholders = ', '.join([f":{col}" for col in df.columns])
        sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

        # Append ON DUPLICATE KEY UPDATE (MySQL syntax) for upserts.
        if conflict_columns and update_columns:
            update_set = ', '.join([f"{col}=VALUES({col})" for col in update_columns])
            sql += f" ON DUPLICATE KEY UPDATE {update_set}"

        # Convert to a list of row dicts. Missing values (NaN/NaT) become None
        # so they are sent as SQL NULL; the driver cannot encode float NaN.
        cleaned = df.astype(object).where(df.notna(), None)
        data = cleaned.to_dict(orient='records')

        try:
            # engine.begin() commits on success and rolls back on error,
            # matching the other write methods of this class.
            with self.engine.begin() as conn:
                # A list of dicts makes this an executemany-style call.
                conn.execute(text(sql), data)
            print(f"成功插入/更新 {len(df)} 行到 {table_name}")
        except Exception as e:
            print(f"数据库操作失败: {str(e)}")
            raise

    def update_january_yoy(self, prov_name='福建省'):
        """Recompute the January year-over-year figures for one province.

        For rows of *prov_name* whose month ends in ``-01`` and is later than
        ``2023-01``, the row is joined to the same city's January row of the
        previous year and the three YoY columns are set to the percentage
        change rounded to 4 decimals. The value falls back to 0 when the
        computation yields NULL (e.g. the previous value is 0 or NULL).

        :param prov_name: province name, defaults to '福建省'
        :return: number of rows reported as affected by the UPDATE
        :raises RuntimeError: wrapping any underlying database error
        """
        update_sql = text("""
            UPDATE t_yujin_crossborder_prov_region_trade AS curr
            INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
                ON curr.city_code = prev.city_code
                AND prev.crossborder_year_month = DATE_FORMAT(
                    DATE_SUB(
                        STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
                        INTERVAL 1 YEAR
                    ),
                    '%Y-01'
                )
            SET
                curr.yoy_import_export = COALESCE (
                    ROUND(
                        (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100,
                        4
                    ),
                    0.0000
                ),
                curr.yoy_import = COALESCE (
                    ROUND(
                        (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100,
                        4
                    ),
                    0.0000
                ),
                curr.yoy_export = COALESCE (
                    ROUND(
                        (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100,
                        4
                    ),
                    0.0000
                )
            WHERE curr.prov_name = :prov_name
                AND curr.crossborder_year_month LIKE '%-01'
                AND curr.crossborder_year_month > '2023-01'
        """)

        try:
            with self.engine.begin() as conn:
                result = conn.execute(update_sql, {'prov_name': prov_name})
                print(f"Updated {result.rowcount} rows for {prov_name}")
                return result.rowcount
        except Exception as e:
            print(f"Update failed: {str(e)}")
            raise RuntimeError(f"同比数据更新失败: {str(e)}") from e

    def clear_old_shandong_yoy(self):
        """Zero the YoY columns of Shandong ('山东省') rows before ``2024-01``.

        Only rows where at least one of the three YoY columns is non-zero are
        touched.

        :return: number of rows reported as affected by the UPDATE
        :raises Exception: any database error is logged and re-raised
        """
        clear_sql = text("""
            UPDATE t_yujin_crossborder_prov_region_trade
            SET yoy_import_export = 0.0000,
                yoy_export = 0.0000,
                yoy_import = 0.0000
            WHERE prov_name = '山东省'
                AND crossborder_year_month < '2024-01'
                AND (yoy_import_export != 0 OR yoy_export != 0 OR yoy_import != 0) -- 优化:仅更新非零记录
        """)

        try:
            with self.engine.begin() as conn:
                result = conn.execute(clear_sql)
                print(f"山东省旧数据清零记录数: {result.rowcount}")
                return result.rowcount
        except Exception as e:
            self.logger.error(f"旧数据清零失败: {str(e)}")
            raise

    def update_shandong_yoy(self):
        """Run the full Shandong YoY refresh: clear old rows, then recompute.

        :return: ``{'cleared': <rows zeroed>, 'updated': <rows recomputed>}``
        :raises Exception: any error from either step is logged with its
                 traceback and re-raised unchanged
        """
        try:
            # Step 1: zero the YoY figures of pre-2024 rows.
            cleared = self.clear_old_shandong_yoy()

            # Step 2: recompute the YoY figures of rows from 2024-01 onwards.
            updated = self._update_shandong_new_yoy()

            print(f"山东省同比处理完成 | 清零:{cleared} 更新:{updated}")
            return {'cleared': cleared, 'updated': updated}
        except Exception:
            # print() does not accept exc_info; logging does, and keeps the
            # traceback while the original exception propagates.
            self.logger.error("山东省数据处理失败", exc_info=True)
            raise

    def _update_shandong_new_yoy(self):
        """Recompute YoY figures for Shandong rows from ``2024-01`` onwards.

        Internal helper. Each row is joined to the same city's row for the
        same month of the previous year; rows whose previous-year
        ``monthly_total`` is NULL are left untouched. The YoY columns are set
        to the percentage change rounded to 4 decimals, falling back to 0
        when the computation yields NULL.

        :return: number of rows reported as affected by the UPDATE
        """
        update_sql = text("""
            UPDATE t_yujin_crossborder_prov_region_trade AS curr
            INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
                ON curr.city_code = prev.city_code
                AND prev.crossborder_year_month = DATE_FORMAT(
                    DATE_SUB(
                        STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
                        INTERVAL 1 YEAR
                    ),
                    '%Y-%m'
                )
            SET
                curr.yoy_import_export = COALESCE (
                    ROUND(
                        (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100,
                        4
                    ),
                    0.0000
                ),
                curr.yoy_import = COALESCE (
                    ROUND(
                        (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100,
                        4
                    ),
                    0.0000
                ),
                curr.yoy_export = COALESCE (
                    ROUND(
                        (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100,
                        4
                    ),
                    0.0000
                )
            WHERE curr.prov_name = '山东省'
                AND curr.crossborder_year_month >= '2024-01'
                AND prev.monthly_total IS NOT NULL
        """)

        with self.engine.begin() as conn:
            result = conn.execute(update_sql)
            print(f"山东省新数据更新数: {result.rowcount}")
            return result.rowcount