zhangfan 1 هفته پیش
والد
کامیت
e5533c9938
1فایلهای تغییر یافته به همراه243 افزوده شده و 0 حذف شده
  1. 243 0
      utils/base_mysql.py

+ 243 - 0
utils/base_mysql.py

@@ -0,0 +1,243 @@
+import pymysql
+from sqlalchemy import create_engine, text
+from urllib.parse import quote_plus
+
+# 数据库配置
+DB_CONFIG = {
+    "host": "10.130.36.185",
+    "port": 3306,
+    "user": "user_ytexp",
+    "password": "Rn9ib3L1C4b4@123",
+    "database": "yto_crm",
+    "charset": "utf8mb4"
+    # "cursorclass": pymysql.cursors.DictCursor
+}
+
+def get_commodity_id(commodity_name):
+    """根据商品名称查询数据库,获取商品 ID 和商品名称"""
+    fix_commodity_name = commodity_name
+    if commodity_name.endswith(")") or commodity_name.endswith(")"):
+        fix_commodity_name = commodity_name.rsplit("(")[0] or commodity_name.rsplit("(")[0]
+    fix_commodity_name = fix_commodity_name.replace('*', '').replace('#', '').replace('“', '').replace('”', '').replace('。', '')
+
+    try:
+        # 连接数据库
+        connection = pymysql.connect(**DB_CONFIG)
+        with connection.cursor() as cursor:
+            # 执行查询
+            sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name like %s"
+            cursor.execute(sql, (f"{fix_commodity_name}%",))
+            result = cursor.fetchall()
+            if result:
+                if len(result) == 1:
+                    return result[0][0], result[0][1]
+                else:
+                    print(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
+                    sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
+                    cursor.execute(sql, (f"{fix_commodity_name}",))
+                    result = cursor.fetchone()
+                    if not result:
+                        # 用原商品名称再查一次
+                        commodity_name = commodity_name.replace("(", "(").replace(")", ")")
+                        print(f"原商品名称查询,commodity_name:{commodity_name}")
+                        sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
+                        cursor.execute(sql, (f"{commodity_name}",))
+                        result = cursor.fetchone()
+                        if result:
+                            return result[0], result[1]
+                        else:
+                            return None, None
+                    else:
+                        return result[0], result[1]
+            else:
+                return None, None
+    except Exception as e:
+        print(f"查询数据库时发生异常: {str(e)}")
+        return None, None
+    finally:
+        if connection:
+            connection.close()
+
+
+def get_hs_all():
+    try:
+        # 连接数据库
+        connection = pymysql.connect(**DB_CONFIG)
+        with connection.cursor() as cursor:
+            # 执行查询
+            sql = "SELECT e.id,e.category_name FROM t_yujin_crossborder_hs_category e"
+            cursor.execute(sql)
+            all_records = cursor.fetchall()
+            if all_records:
+                return all_records
+            else:
+                return None
+    except Exception as e:
+        print(f"查询数据库时发生异常: {str(e)}")
+        return None
+    finally:
+        if connection:
+            connection.close()
+
+# 对密码进行 URL 编码
+encoded_password = quote_plus(DB_CONFIG["password"])
+
+# 构建 SQLAlchemy 引擎
+engine = create_engine(
+    f"mysql+pymysql://{DB_CONFIG['user']}:{encoded_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}?charset={DB_CONFIG['charset']}",
+    pool_size=5,
+    max_overflow=10
+)
+
+def bulk_insert(sql_statements):
+    """
+    批量执行 SQL 插入语句
+    :param sql_statements: 包含多个 INSERT 语句的列表
+    """
+    if not sql_statements:
+        print("未提供有效的 SQL 插入语句,跳过操作")
+        return
+
+    try:
+        with engine.connect() as conn:
+            with conn.begin():
+                for sql in sql_statements:
+                    stmt = text(sql.strip())
+                    conn.execute(stmt)
+                print(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
+    except Exception as e:
+        print(f"数据库操作失败: {str(e)}")
+        raise
+
+def update_january_yoy(prov_name):
+    """
+    更新指定省份1月份同比数据
+    :param prov_name: 省份名称,默认为福建省
+    """
+    update_sql = text("""
+                      UPDATE t_yujin_crossborder_prov_region_trade AS curr
+                          INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
+                      ON curr.city_code = prev.city_code
+                          AND prev.crossborder_year_month = DATE_FORMAT(
+                          DATE_SUB(
+                          STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
+                          INTERVAL 1 YEAR
+                          ),
+                          '%Y-01'
+                          )
+                          SET
+                              curr.yoy_import_export = COALESCE (
+                              ROUND(
+                              (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
+                              ), 0.0000
+                              ), curr.yoy_import = COALESCE (
+                              ROUND(
+                              (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
+                              ), 0.0000
+                              ), curr.yoy_export = COALESCE (
+                              ROUND(
+                              (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
+                              ), 0.0000
+                              )
+                      WHERE
+                          curr.prov_name = :prov_name
+                        AND curr.crossborder_year_month LIKE '%-01'
+                        AND curr.crossborder_year_month
+                          > '2023-01'
+                      """)
+
+    try:
+        with engine.begin() as conn:
+            result = conn.execute(update_sql, {'prov_name': prov_name})
+            print(f"Updated {result.rowcount} rows for {prov_name}")
+            return result.rowcount
+
+    except Exception as e:
+        print(f"Update failed: {str(e)}")
+        raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
+
+def clear_old_shandong_yoy(prov_name):
+    """
+    清理山东省2024年前数据的同比指标
+    """
+    clear_sql = text("""
+                     UPDATE t_yujin_crossborder_prov_region_trade
+                     SET yoy_import_export = 0.0000,
+                         yoy_export        = 0.0000,
+                         yoy_import        = 0.0000
+                     WHERE prov_name = :prov_name
+                       AND crossborder_year_month < '2024-01'
+                       AND (yoy_import_export != 0 
+           OR yoy_export != 0 
+           OR yoy_import != 0) -- 优化:仅更新非零记录
+                     """)
+
+    try:
+        with engine.begin() as conn:
+            result = conn.execute(clear_sql, {'prov_name': prov_name})
+            print(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
+            return result.rowcount
+    except Exception as e:
+        print(f"旧数据清零失败: {str(e)}")
+        raise
+
+def update_shandong_yoy(prov_name):
+    """
+    完整更新山东省同比数据(包含新旧数据处理)
+    """
+    try:
+        # 步骤1:清理旧数据
+        cleared = clear_old_shandong_yoy(prov_name)
+
+        # 步骤2:计算新数据
+        updated = _update_shandong_new_yoy(prov_name)
+
+        print(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
+        return {'cleared': cleared, 'updated': updated}
+    except Exception as e:
+        print("{prov_name} 数据处理失败", exc_info=True)
+        raise
+
+def _update_shandong_new_yoy(prov_name):
+    """
+    处理2024年及之后的山东省数据(内部方法)
+    """
+    update_sql = text("""UPDATE t_yujin_crossborder_prov_region_trade AS curr
+                                INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
+                            ON curr.city_code = prev.city_code
+                                AND prev.crossborder_year_month = DATE_FORMAT(
+                                    DATE_SUB(
+                                        STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
+                                        INTERVAL 1 YEAR
+                                    ),
+                                    '%Y-%m'
+                                )
+                            SET
+                                curr.yoy_import_export = COALESCE (
+                                    TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
+                                    0.0000
+                                ),
+                                curr.yoy_import = COALESCE (
+                                    TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
+                                    0.0000
+                                ),
+                                curr.yoy_export = COALESCE (
+                                    TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
+                                    0.0000
+                                )
+                            WHERE
+                                curr.prov_name = :prov_name
+                              AND curr.crossborder_year_month >= '2024-01'
+                              AND prev.monthly_total IS NOT NULL
+                        """)
+
+    with engine.begin() as conn:
+        result = conn.execute(update_sql, {'prov_name': prov_name})
+        print(f"{prov_name} 新数据更新数: {result.rowcount}")
+        return result.rowcount
+
+
+if __name__ == '__main__':
+    update_january_yoy('浙江省')
+    update_shandong_yoy('浙江省')
+    print("同比sql处理完成")