123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
import logging
import os

import pandas as pd
import pymysql
from sqlalchemy import create_engine, text
# Alternative (UAT) connection settings, kept for reference:
# DB_CONFIG = {
#     'host': '10.130.75.149',
#     'port': 3307,
#     'user': 'yto_crm',
#     'password': '%3sFUlsolaRI',
#     'database': 'crm_uat',
#     'charset': 'utf8mb4'
# }

# Connection settings consumed by DBHelper.__init__ to build the engine URL.
#
# Every entry can be overridden with a YTO_CRM_DB_* environment variable so
# that switching environments (or rotating the password) does not require a
# code change; the literals below are only the fallback defaults.
#
# NOTE(review): the fallback values are real credentials committed to source
# control -- they should be rotated and supplied via the environment only.
DB_CONFIG = {
    'host': os.environ.get('YTO_CRM_DB_HOST', '10.130.36.185'),
    'port': int(os.environ.get('YTO_CRM_DB_PORT', '3306')),
    'user': os.environ.get('YTO_CRM_DB_USER', 'user_ytexp'),
    # The password is interpolated verbatim into the connection URL, so it
    # must already be percent-encoded (the default contains '%40').
    'password': os.environ.get('YTO_CRM_DB_PASSWORD', 'Rn9ib3L1C4b4%40123'),
    'database': os.environ.get('YTO_CRM_DB_NAME', 'yto_crm'),
    'charset': os.environ.get('YTO_CRM_DB_CHARSET', 'utf8mb4'),
}
class DBHelper:
    """Helper around a pooled SQLAlchemy engine for the CRM MySQL database.

    Provides a category-id lookup, a bulk insert/upsert for DataFrames and
    the year-over-year (YoY) maintenance statements for the
    ``t_yujin_crossborder_prov_region_trade`` table.
    """

    # Defined at class level so ``self.logger`` is always available, including
    # inside ``except`` handlers where a missing attribute would otherwise
    # replace the real database error with an AttributeError.
    logger = logging.getLogger(__name__)

    def __init__(self):
        """Create the engine from the module-level ``DB_CONFIG`` mapping."""
        # The config values are interpolated verbatim into the URL, so any
        # character that is not URL-safe must already be percent-encoded.
        self.engine = create_engine(
            f'mysql+pymysql://{DB_CONFIG["user"]}:{DB_CONFIG["password"]}@{DB_CONFIG["host"]}:{DB_CONFIG["port"]}/{DB_CONFIG["database"]}?charset={DB_CONFIG["charset"]}',
            pool_size=5,
            max_overflow=10
        )

    def get_commodity_id(self, name):
        """Return the category id stored for a commodity name.

        :param name: value matched against ``commodity_name``.
        :return: the ``id`` column of the first matching row, or ``None``
            when no row matches.
        """
        query = text(
            "SELECT id FROM t_yujin_crossborder_prov_commodity_category WHERE commodity_name = :name"
        )
        with self.engine.connect() as conn:
            row = conn.execute(query, {'name': name}).fetchone()
        return row[0] if row else None

    def bulk_insert(self, df, table_name, conflict_columns=None, update_columns=None):
        """Insert every row of ``df`` into ``table_name``, optionally as an upsert.

        :param df: DataFrame to insert; its column names are used both as the
            target column list and as the bind-parameter names.
        :param table_name: target table name.
        :param conflict_columns: conflict-detection columns. Only its
            truthiness is used: the ``ON DUPLICATE KEY UPDATE`` clause is
            added when both this and ``update_columns`` are non-empty.
        :param update_columns: columns overwritten with the incoming values
            when a duplicate key is hit.
        :return: ``None``. An empty DataFrame is skipped without touching the
            database.
        :raises Exception: any error raised while executing is printed and
            re-raised unchanged.

        NOTE: ``table_name`` and the column names are formatted directly into
        the SQL text, so they must come from trusted code, never user input.
        """
        if df.empty:
            print("空数据集,跳过插入")
            return

        # Build the parameterised INSERT template from the frame's columns.
        column_list = ', '.join(df.columns)
        placeholders = ', '.join(f":{col}" for col in df.columns)
        sql = f"INSERT INTO {table_name} ({column_list}) VALUES ({placeholders})"

        # MySQL upsert syntax: overwrite the listed columns on key collision.
        if conflict_columns and update_columns:
            update_set = ', '.join(f"{col}=VALUES({col})" for col in update_columns)
            sql += f" ON DUPLICATE KEY UPDATE {update_set}"

        # One dict per row; passing the list makes execute() run in bulk.
        records = df.to_dict(orient='records')

        try:
            with self.engine.connect() as conn:
                # Explicit transaction: committed on success, rolled back on error.
                with conn.begin():
                    conn.execute(text(sql), records)
            print(f"成功插入/更新 {len(df)} 行到 {table_name}")
        except Exception as e:
            print(f"数据库操作失败: {str(e)}")
            raise

    def update_january_yoy(self, prov_name='福建省'):
        """Recompute the January year-over-year figures for one province.

        Each January row later than ``'2023-01'`` is joined (by ``city_code``)
        to the January row of the preceding year and its three ``yoy_*``
        columns are set to the percentage change, rounded to 4 decimals.
        A zero or NULL previous value yields ``0.0000``.

        :param prov_name: province name to update; defaults to ``'福建省'``.
        :return: the row count reported by the UPDATE statement.
        :raises RuntimeError: wraps any failure, chained to the original error.
        """
        update_sql = text("""
            UPDATE t_yujin_crossborder_prov_region_trade AS curr
            INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
                ON curr.city_code = prev.city_code
                AND prev.crossborder_year_month = DATE_FORMAT(
                    DATE_SUB(
                        STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
                        INTERVAL 1 YEAR
                    ),
                    '%Y-01'
                )
            SET
                curr.yoy_import_export = COALESCE (
                    ROUND(
                        (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
                    ), 0.0000
                ), curr.yoy_import = COALESCE (
                    ROUND(
                        (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
                    ), 0.0000
                ), curr.yoy_export = COALESCE (
                    ROUND(
                        (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
                    ), 0.0000
                )
            WHERE
                curr.prov_name = :prov_name
                AND curr.crossborder_year_month LIKE '%-01'
                AND curr.crossborder_year_month > '2023-01'
        """)
        try:
            with self.engine.begin() as conn:
                result = conn.execute(update_sql, {'prov_name': prov_name})
                print(f"Updated {result.rowcount} rows for {prov_name}")
                return result.rowcount
        except Exception as e:
            print(f"Update failed: {str(e)}")
            raise RuntimeError(f"同比数据更新失败: {str(e)}") from e

    def clear_old_shandong_yoy(self):
        """Zero the YoY columns of Shandong rows earlier than ``'2024-01'``.

        Only rows that still hold a non-zero YoY value are updated.

        :return: the row count reported by the UPDATE statement.
        :raises Exception: any database error is logged and re-raised unchanged.
        """
        clear_sql = text("""
            UPDATE t_yujin_crossborder_prov_region_trade
            SET yoy_import_export = 0.0000,
                yoy_export = 0.0000,
                yoy_import = 0.0000
            WHERE prov_name = '山东省'
              AND crossborder_year_month < '2024-01'
              AND (yoy_import_export != 0
                OR yoy_export != 0
                OR yoy_import != 0) -- 优化:仅更新非零记录
        """)
        try:
            with self.engine.begin() as conn:
                result = conn.execute(clear_sql)
                print(f"山东省旧数据清零记录数: {result.rowcount}")
                return result.rowcount
        except Exception as e:
            self.logger.error(f"旧数据清零失败: {str(e)}")
            raise

    def update_shandong_yoy(self):
        """Run the full Shandong YoY refresh: clear old rows, then recompute.

        The two steps each open their own transaction, so a failure in the
        second step does not undo the first.

        :return: ``{'cleared': <rows zeroed>, 'updated': <rows recomputed>}``.
        :raises Exception: any failure is logged with its traceback and
            re-raised unchanged.
        """
        try:
            # Step 1: reset rows before 2024-01.
            cleared = self.clear_old_shandong_yoy()
            # Step 2: recompute rows from 2024-01 onwards.
            updated = self._update_shandong_new_yoy()
            print(f"山东省同比处理完成 | 清零:{cleared} 更新:{updated}")
            return {'cleared': cleared, 'updated': updated}
        except Exception:
            # print() rejects exc_info and would raise TypeError here, hiding
            # the real error; the logger records the traceback instead.
            self.logger.error("山东省数据处理失败", exc_info=True)
            raise

    def _update_shandong_new_yoy(self):
        """Recompute YoY for Shandong rows from ``'2024-01'`` onwards (internal).

        Each row is joined (by ``city_code``) to the row for the same month of
        the preceding year, skipping pairs whose previous ``monthly_total`` is
        NULL. The three ``yoy_*`` columns are set to the percentage change
        rounded to 4 decimals, or ``0.0000`` when the previous value is zero
        or NULL.

        :return: the row count reported by the UPDATE statement.
        """
        update_sql = text("""
            UPDATE t_yujin_crossborder_prov_region_trade AS curr
            INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
                ON curr.city_code = prev.city_code
                AND prev.crossborder_year_month = DATE_FORMAT(
                    DATE_SUB(
                        STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
                        INTERVAL 1 YEAR
                    ),
                    '%Y-%m'
                )
            SET
                curr.yoy_import_export = COALESCE (
                    ROUND(
                        (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
                    ), 0.0000
                ), curr.yoy_import = COALESCE (
                    ROUND(
                        (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
                    ), 0.0000
                ), curr.yoy_export = COALESCE (
                    ROUND(
                        (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
                    ), 0.0000
                )
            WHERE
                curr.prov_name = '山东省'
                AND curr.crossborder_year_month >= '2024-01'
                AND prev.monthly_total IS NOT NULL
        """)
        with self.engine.begin() as conn:
            result = conn.execute(update_sql)
            print(f"山东省新数据更新数: {result.rowcount}")
            return result.rowcount
|