|
- import pymysql
- from sqlalchemy import create_engine, text
- from urllib.parse import quote_plus
- from utils.log import log
# Province-level divisions processed by this script, in processing order:
# municipalities first, then provinces, then autonomous regions.
provinces = (
    ["北京市", "天津市", "上海市", "重庆市"]
    + [
        "河北省", "山西省", "辽宁省", "吉林省",
        "黑龙江省", "江苏省", "浙江省", "安徽省",
        "福建省", "江西省", "山东省", "河南省",
        "湖北省", "湖南省", "广东省", "海南省",
        "四川省", "贵州省", "云南省", "陕西省",
        "甘肃省", "青海省", "台湾省",
    ]
    + [
        "内蒙古自治区", "广西壮族自治区", "西藏自治区",
        "宁夏回族自治区", "新疆维吾尔自治区",
    ]
)

# Database connection settings; the keys are pymysql.connect keyword arguments.
# NOTE(review): the credentials are hard-coded in source — consider loading
# them from the environment or a secret store.
DB_CONFIG = dict(
    host='10.130.75.149',
    port=3307,
    user='yto_crm',
    password='%3sFUlsolaRI',
    database='crm_uat',
    charset='utf8mb4',
)
def get_commodity_id(commodity_name):
    """Look up a commodity's id and stored name from its display name.

    The lookup runs in up to three steps:

    1. Prefix match (``LIKE '<cleaned>%'``) on the cleaned name; a single hit
       is returned directly.
    2. If the prefix match returns several rows, exact match on the cleaned
       name.
    3. If that finds nothing, exact match on the original, uncleaned name.

    Cleaning drops a parenthesised suffix (everything from the first ``(``
    when the name ends with ``)``) and removes ``*``, ``#``, curly double
    quotes and the ideographic full stop.

    :param commodity_name: commodity name as it appears in the source data.
    :return: ``(id, commodity_name)`` from the category table, or
        ``(None, None)`` when nothing matches or the query fails.
    """
    like_sql = (
        "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e "
        "WHERE e.commodity_name like %s"
    )
    exact_sql = (
        "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e "
        "WHERE e.commodity_name = %s"
    )

    fix_commodity_name = commodity_name
    # NOTE(review): only ASCII parentheses are handled here; names that use
    # full-width parentheses are not trimmed — confirm whether they should be.
    if commodity_name.endswith(")"):
        fix_commodity_name = commodity_name.split("(")[0]
    for noise in ('*', '#', '“', '”', '。'):
        fix_commodity_name = fix_commodity_name.replace(noise, '')

    # Bind the name before the try block so the finally clause can always
    # test it, even when pymysql.connect() itself raises.
    connection = None
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            cursor.execute(like_sql, (f"{fix_commodity_name}%",))
            result = cursor.fetchall()
            if not result:
                return None, None
            if len(result) == 1:
                return result[0][0], result[0][1]

            # Ambiguous prefix: fall back to an exact match on the cleaned name.
            log.info(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
            cursor.execute(exact_sql, (fix_commodity_name,))
            row = cursor.fetchone()
            if not row:
                # Last resort: exact match on the name exactly as it was given.
                log.info(f"原商品名称查询,commodity_name:{commodity_name}")
                cursor.execute(exact_sql, (commodity_name,))
                row = cursor.fetchone()
            if row:
                return row[0], row[1]
            return None, None
    except Exception as e:
        log.info(f"查询数据库时发生异常: {str(e)}")
        return None, None
    finally:
        if connection is not None:
            connection.close()
def get_hs_all():
    """Fetch every row of the HS category table as ``(id, category_name)``.

    :return: the rows returned by ``cursor.fetchall()``, or ``None`` when the
        table is empty or the query fails (the failure is logged, not raised).
    """
    # Bind the name before the try block so the finally clause can always
    # test it, even when pymysql.connect() itself raises.
    connection = None
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            sql = "SELECT e.id,e.category_name FROM t_yujin_crossborder_hs_category e"
            cursor.execute(sql)
            all_records = cursor.fetchall()
            # An empty result set is reported as None, not as an empty sequence.
            return all_records or None
    except Exception as e:
        log.info(f"查询数据库时发生异常: {str(e)}")
        return None
    finally:
        if connection is not None:
            connection.close()
def get_code_exist(crossborder_year_month, prov_code):
    """Count commodity-trade rows for one month and one province code.

    :param crossborder_year_month: value matched against
        ``crossborder_year_month``.
    :param prov_code: value matched against ``prov_code``.
    :return: the row count as an ``int``; ``0`` when there are no rows or the
        query fails (the failure is logged, not raised).
    """
    count_sql = """
        SELECT COUNT(1)
        FROM t_yujin_crossborder_prov_commodity_trade e
        WHERE e.crossborder_year_month = %s
        AND e.prov_code = %s
        """
    try:
        # Both context managers are released when the block exits.
        with pymysql.connect(**DB_CONFIG) as connection, connection.cursor() as cursor:
            cursor.execute(count_sql, (crossborder_year_month, prov_code))
            row = cursor.fetchone()
            if not row or not row[0]:
                return 0
            return int(row[0])
    except Exception as e:
        log.info(f"[数据库查询异常] 查询条件: {crossborder_year_month}, {prov_code} | 错误详情: {str(e)}")
        return 0
# Percent-encode the password so reserved characters survive inside the URL.
encoded_password = quote_plus(DB_CONFIG["password"])

# Module-wide SQLAlchemy engine (pymysql driver) shared by the functions below.
engine = create_engine(
    "mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset={charset}".format(
        **{**DB_CONFIG, "password": encoded_password}
    ),
    pool_size=5,
    max_overflow=10,
)
def bulk_insert(sql_statements):
    """Execute a batch of SQL INSERT statements inside a single transaction.

    :param sql_statements: list of INSERT statements as strings; each one is
        stripped of surrounding whitespace before execution. An empty or
        missing list is logged and skipped.
    :raises Exception: whatever the database layer raises; it is logged and
        then re-raised.
    """
    if not sql_statements:
        log.info("未提供有效的 SQL 插入语句,跳过操作")
        return

    try:
        # conn.begin() opens a transaction on the connection for the whole batch.
        with engine.connect() as conn, conn.begin():
            for raw_sql in sql_statements:
                conn.execute(text(raw_sql.strip()))
        log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
    except Exception as e:
        log.info(f"数据库操作失败: {str(e)}")
        raise
def update_january_yoy(prov_name):
    """Recompute year-over-year percentages for one province's January rows.

    Each January row later than ``2023-01`` in
    ``t_yujin_crossborder_prov_region_trade`` is joined to the row with the
    same ``city_code`` for January of the previous year. The percentage
    change of the monthly total, import and export values is stored rounded
    to 4 decimals, or ``0`` when the ratio is NULL (for example when the
    previous value is 0).

    :param prov_name: province name matched against ``prov_name`` (required).
    :return: number of rows reported as updated.
    :raises RuntimeError: when the update fails; the original error is chained.
    """
    statement = text("""
        UPDATE t_yujin_crossborder_prov_region_trade AS curr
        INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
        ON curr.city_code = prev.city_code
        AND prev.crossborder_year_month = DATE_FORMAT(
        DATE_SUB(
        STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
        INTERVAL 1 YEAR
        ),
        '%Y-01'
        )
        SET
        curr.yoy_import_export = COALESCE (
        ROUND(
        (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
        ), 0.0000
        ), curr.yoy_import = COALESCE (
        ROUND(
        (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
        ), 0.0000
        ), curr.yoy_export = COALESCE (
        ROUND(
        (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
        ), 0.0000
        )
        WHERE
        curr.prov_name = :prov_name
        AND curr.crossborder_year_month LIKE '%-01'
        AND curr.crossborder_year_month
        > '2023-01'
        """)
    bind_values = {'prov_name': prov_name}
    try:
        # engine.begin() commits on success and rolls back on error.
        with engine.begin() as conn:
            outcome = conn.execute(statement, bind_values)
            log.info(f"Updated {outcome.rowcount} rows for {prov_name}")
            return outcome.rowcount
    except Exception as e:
        log.info(f"Update failed: {str(e)}")
        raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
def clear_old_shandong_yoy(prov_name):
    """Zero the year-over-year columns of a province's rows before 2024-01.

    Only rows of ``t_yujin_crossborder_prov_region_trade`` where at least one
    of the three ``yoy_*`` columns is non-zero are touched.

    :param prov_name: province name matched against ``prov_name``.
    :return: number of rows reported as updated.
    :raises Exception: database errors are logged and re-raised unchanged.
    """
    statement = text("""
        UPDATE t_yujin_crossborder_prov_region_trade
        SET yoy_import_export = 0.0000,
        yoy_export = 0.0000,
        yoy_import = 0.0000
        WHERE prov_name = :prov_name
        AND crossborder_year_month < '2024-01'
        AND (yoy_import_export != 0
        OR yoy_export != 0
        OR yoy_import != 0) -- 优化:仅更新非零记录
        """)
    bind_values = {'prov_name': prov_name}
    try:
        with engine.begin() as conn:
            outcome = conn.execute(statement, bind_values)
            log.info(f"{prov_name} 旧数据清零记录数: {outcome.rowcount}")
            return outcome.rowcount
    except Exception as e:
        log.info(f"旧数据清零失败: {str(e)}")
        raise
def update_shandong_yoy(prov_name):
    """Refresh a province's year-over-year data in two steps.

    Step 1 zeroes the indicators of rows before 2024-01
    (``clear_old_shandong_yoy``); step 2 recomputes them for rows from
    2024-01 onwards (``_update_shandong_new_yoy``).

    :param prov_name: province name forwarded to both steps.
    :return: ``{'cleared': <rows zeroed>, 'updated': <rows recomputed>}``.
    :raises Exception: any error from either step is logged and re-raised.
    """
    try:
        # Step 1: clear the old data.
        cleared = clear_old_shandong_yoy(prov_name)
        # Step 2: compute the new data.
        updated = _update_shandong_new_yoy(prov_name)
        log.info(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
        return {'cleared': cleared, 'updated': updated}
    except Exception:
        # The f prefix is required so the province name is interpolated.
        log.info(f"{prov_name} 数据处理失败", exc_info=True)
        raise
def _update_shandong_new_yoy(prov_name):
    """Recompute year-over-year percentages for rows from 2024-01 onwards.

    Each matching row of ``t_yujin_crossborder_prov_region_trade`` is joined
    to the row with the same ``city_code`` for the same month one year
    earlier. The percentage change of the monthly total, import and export
    values is stored truncated to 4 decimals, or ``0`` when the ratio is
    NULL. Rows whose previous-year ``monthly_total`` is NULL are skipped.

    :param prov_name: province name matched against ``prov_name``.
    :return: number of rows reported as updated.
    """
    statement = text("""UPDATE t_yujin_crossborder_prov_region_trade AS curr
        INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
        ON curr.city_code = prev.city_code
        AND prev.crossborder_year_month = DATE_FORMAT(
        DATE_SUB(
        STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
        INTERVAL 1 YEAR
        ),
        '%Y-%m'
        )
        SET
        curr.yoy_import_export = COALESCE (
        TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
        0.0000
        ),
        curr.yoy_import = COALESCE (
        TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
        0.0000
        ),
        curr.yoy_export = COALESCE (
        TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
        0.0000
        )
        WHERE
        curr.prov_name = :prov_name
        AND curr.crossborder_year_month >= '2024-01'
        AND prev.monthly_total IS NOT NULL
        """)
    bind_values = {'prov_name': prov_name}
    with engine.begin() as conn:
        outcome = conn.execute(statement, bind_values)
        log.info(f"{prov_name} 新数据更新数: {outcome.rowcount}")
        return outcome.rowcount
def update_january_yoy_origin(region_name):
    """Recompute January percentages in ``t_yujin_crossborder_region_trade``.

    Each January row later than ``2023-01`` for the given region is joined to
    the row with the same ``region_code`` for January of the previous year.
    The percentage change of the monthly total, import and export values is
    written to ``ytd_total``, ``ytd_import`` and ``ytd_export``, rounded to
    4 decimals, or ``0`` when the ratio is NULL.

    :param region_name: region name matched against ``region_name`` (required).
    :return: number of rows reported as updated.
    :raises RuntimeError: when the update fails; the original error is chained.
    """
    statement = text("""
        UPDATE t_yujin_crossborder_region_trade AS curr
        INNER JOIN t_yujin_crossborder_region_trade AS prev
        ON curr.region_code = prev.region_code
        AND prev.year_month = DATE_FORMAT(
        DATE_SUB(
        STR_TO_DATE(CONCAT(curr.year_month, '-01'), '%Y-%m-%d'),
        INTERVAL 1 YEAR
        ),
        '%Y-01'
        )
        SET
        curr.ytd_total = COALESCE (
        ROUND(
        (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
        ), 0.0000
        ), curr.ytd_import = COALESCE (
        ROUND(
        (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
        ), 0.0000
        ), curr.ytd_export = COALESCE (
        ROUND(
        (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
        ), 0.0000
        )
        WHERE
        curr.region_name = :region_name
        AND curr.year_month LIKE '%-01'
        AND curr.year_month
        > '2023-01'
        """)
    bind_values = {'region_name': region_name}
    try:
        # engine.begin() commits on success and rolls back on error.
        with engine.begin() as conn:
            outcome = conn.execute(statement, bind_values)
            log.info(f"Updated {outcome.rowcount} rows for {region_name}")
            return outcome.rowcount
    except Exception as e:
        log.info(f"Update failed: {str(e)}")
        raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
def clear_old_shandong_yoy_origin(region_name):
    """Zero the ``ytd_*`` columns of a region's rows before 2024-01.

    Only rows of ``t_yujin_crossborder_region_trade`` where at least one of
    ``ytd_total``, ``ytd_export`` or ``ytd_import`` is non-zero are touched.

    :param region_name: region name matched against ``region_name``.
    :return: number of rows reported as updated.
    :raises Exception: database errors are logged and re-raised unchanged.
    """
    statement = text("""
        UPDATE t_yujin_crossborder_region_trade
        SET ytd_total = 0.0000,
        ytd_export = 0.0000,
        ytd_import = 0.0000
        WHERE region_name = :region_name
        AND `year_month` < '2024-01'
        AND (ytd_total != 0
        OR ytd_export != 0
        OR ytd_import != 0) -- 优化:仅更新非零记录
        """)
    bind_values = {'region_name': region_name}
    try:
        with engine.begin() as conn:
            outcome = conn.execute(statement, bind_values)
            log.info(f"{region_name} 旧数据清零记录数: {outcome.rowcount}")
            return outcome.rowcount
    except Exception as e:
        log.info(f"旧数据清零失败: {str(e)}")
        raise
def update_shandong_yoy_origin(region_name):
    """Refresh a region's data in the region trade table in two steps.

    Step 1 zeroes the indicators of rows before 2024-01
    (``clear_old_shandong_yoy_origin``); step 2 recomputes them for rows from
    2024-01 onwards (``_update_shandong_new_yoy_origin``).

    :param region_name: region name forwarded to both steps.
    :return: ``{'cleared': <rows zeroed>, 'updated': <rows recomputed>}``.
    :raises Exception: any error from either step is logged and re-raised.
    """
    try:
        # Step 1: clear the old data.
        cleared = clear_old_shandong_yoy_origin(region_name)
        # Step 2: compute the new data.
        updated = _update_shandong_new_yoy_origin(region_name)
        log.info(f"{region_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
        return {'cleared': cleared, 'updated': updated}
    except Exception:
        # The f prefix is required so the region name is interpolated.
        log.info(f"{region_name} 数据处理失败", exc_info=True)
        raise
def _update_shandong_new_yoy_origin(region_name):
    """Recompute the ``ytd_*`` percentages for rows from 2024-01 onwards.

    Each matching row of ``t_yujin_crossborder_region_trade`` is joined to
    the row with the same ``region_code`` for the same month one year
    earlier. The percentage change of the monthly total, import and export
    values is stored truncated to 4 decimals, or ``0`` when the ratio is
    NULL. Rows whose previous-year ``monthly_total`` is NULL are skipped.

    :param region_name: region name matched against ``region_name``.
    :return: number of rows reported as updated.
    """
    statement = text("""UPDATE t_yujin_crossborder_region_trade AS curr
        INNER JOIN t_yujin_crossborder_region_trade AS prev
        ON curr.region_code = prev.region_code
        AND prev.year_month = DATE_FORMAT(
        DATE_SUB(
        STR_TO_DATE(CONCAT(curr.year_month, '-01'), '%Y-%m-%d'),
        INTERVAL 1 YEAR
        ),
        '%Y-%m'
        )
        SET
        curr.ytd_total = COALESCE (
        TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
        0.0000
        ),
        curr.ytd_import = COALESCE (
        TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
        0.0000
        ),
        curr.ytd_export = COALESCE (
        TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
        0.0000
        )
        WHERE
        curr.region_name = :region_name
        AND curr.year_month >= '2024-01'
        AND prev.monthly_total IS NOT NULL
        """)
    bind_values = {'region_name': region_name}
    with engine.begin() as conn:
        outcome = conn.execute(statement, bind_values)
        log.info(f"{region_name} 新数据更新数: {outcome.rowcount}")
        return outcome.rowcount
if __name__ == '__main__':
    # Ad-hoc existence check, kept for manual use:
    # check_year, check_month = 2024, 4
    # count = get_code_exist(f'{check_year}-{check_month:02d}', "340000")
    # print(count)

    # New table: refresh the city-level year-over-year figures per province.
    for province_name in provinces:
        update_january_yoy(province_name)
        update_shandong_yoy(province_name)

    # Old table: province-level refresh, currently disabled.
    # for province in provinces:
    #     update_january_yoy_origin(province)
    #     update_shandong_yoy_origin(province)

    log.info("同比sql处理完成")
|