from urllib.parse import quote_plus

import pymysql
from sqlalchemy import create_engine, text

from utils.log import log

# Province names iterated by the __main__ block below.
provinces = [
    "北京市", "天津市", "上海市", "重庆市", "河北省", "山西省", "辽宁省", "吉林省", "黑龙江省",
    "江苏省", "浙江省", "安徽省", "福建省", "江西省", "山东省", "河南省", "湖北省", "湖南省",
    "广东省", "海南省", "四川省", "贵州省", "云南省", "陕西省", "甘肃省", "青海省", "台湾省",
    "内蒙古自治区", "广西壮族自治区", "西藏自治区", "宁夏回族自治区", "新疆维吾尔自治区"
]

# Database connection settings, shared by the raw PyMySQL helpers and the
# SQLAlchemy engine below.
# NOTE(review): credentials are hard-coded in source control; consider loading
# them from environment variables or a secret store instead.
DB_CONFIG = {
    'host': '10.130.75.149',
    'port': 3307,
    'user': 'yto_crm',
    'password': '%3sFUlsolaRI',
    'database': 'crm_uat',
    'charset': 'utf8mb4'
}

# Characters removed from a commodity name before it is looked up.
_COMMODITY_NAME_STRIP_TABLE = str.maketrans("", "", "*#“”。")

_COMMODITY_LIKE_SQL = (
    "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e "
    "WHERE e.commodity_name like %s"
)
_COMMODITY_EXACT_SQL = (
    "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e "
    "WHERE e.commodity_name = %s"
)


def _escape_like(value):
    """Escape backslash, ``%`` and ``_`` so *value* matches literally inside a MySQL LIKE pattern.

    MySQL's default LIKE escape character is the backslash. PyMySQL's parameter
    quoting doubles it in the SQL literal, so the server sees a single escape.
    """
    return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def get_commodity_id(commodity_name):
    """Look up a commodity by name and return ``(id, commodity_name)`` from the category table.

    Lookup strategy:

    1. Normalise the name: when it ends with ")", drop everything from the first "(".
       Then strip the characters ``* # “ ” 。``.
    2. Run a prefix LIKE query with the normalised name.
       Exactly one hit returns it; no hits return ``(None, None)``.
    3. Several hits trigger two retries. The first is an exact match on the
       normalised name. The second is an exact match on the original name.

    :param commodity_name: commodity name as it appears in the source data.
    :return: ``(id, commodity_name)`` of the matching row, or ``(None, None)``.
        ``(None, None)`` is returned when nothing matches, the name is empty,
        or a database error occurs (the error is logged).
    """
    if not commodity_name:
        # An empty pattern would turn into LIKE '%' and scan the whole table.
        return None, None

    fix_commodity_name = commodity_name
    # NOTE(review): as written, both halves of each `or` below are identical.
    # Presumably one side was meant to be the full-width "(" / ")" variant — confirm.
    # `rsplit` without maxsplit behaves like `split`, so this cuts at the FIRST "(".
    if commodity_name.endswith(")") or commodity_name.endswith(")"):
        fix_commodity_name = commodity_name.rsplit("(")[0] or commodity_name.rsplit("(")[0]
    fix_commodity_name = fix_commodity_name.translate(_COMMODITY_NAME_STRIP_TABLE)

    connection = None  # so `finally` is safe even if connect() raises
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # Prefix match; escape LIKE metacharacters so they are matched literally.
            cursor.execute(_COMMODITY_LIKE_SQL, (f"{_escape_like(fix_commodity_name)}%",))
            result = cursor.fetchall()
            if not result:
                return None, None
            if len(result) == 1:
                return result[0][0], result[0][1]

            # Ambiguous prefix match: fall back to an exact match on the normalised name.
            log.info(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
            cursor.execute(_COMMODITY_EXACT_SQL, (fix_commodity_name,))
            row = cursor.fetchone()
            if row:
                return row[0], row[1]

            # Last resort: exact match on the original (un-normalised) name.
            # NOTE(review): as written these replacements are no-ops; presumably meant
            # to convert between full-width and half-width parentheses — confirm.
            commodity_name = commodity_name.replace("(", "(").replace(")", ")")
            log.info(f"原商品名称查询,commodity_name:{commodity_name}")
            cursor.execute(_COMMODITY_EXACT_SQL, (commodity_name,))
            row = cursor.fetchone()
            if row:
                return row[0], row[1]
            return None, None
    except Exception as e:
        log.info(f"查询数据库时发生异常: {str(e)}")
        return None, None
    finally:
        if connection is not None:
            connection.close()


def get_hs_all():
    """Return every ``(id, category_name)`` row of ``t_yujin_crossborder_hs_category``.

    :return: the rows from ``cursor.fetchall()``. Returns ``None`` when the table
        is empty or a database error occurs (the error is logged).
    """
    connection = None  # so `finally` is safe even if connect() raises
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            cursor.execute("SELECT e.id,e.category_name FROM t_yujin_crossborder_hs_category e")
            all_records = cursor.fetchall()
            return all_records if all_records else None
    except Exception as e:
        log.info(f"查询数据库时发生异常: {str(e)}")
        return None
    finally:
        if connection is not None:
            connection.close()


def get_code_exist(crossborder_year_month, prov_code):
    """Count commodity-trade rows for one month and province code.

    :param crossborder_year_month: month key compared against ``crossborder_year_month``
        (e.g. ``'2024-04'``, as in the commented-out example in ``__main__``).
    :param prov_code: province code compared against ``prov_code``.
    :return: the row count, or ``0`` when no rows exist or a database error occurs
        (the error is logged).
    """
    try:
        # The connection's context manager closes the connection on exit.
        with pymysql.connect(**DB_CONFIG) as connection:
            with connection.cursor() as cursor:
                sql = """
                    SELECT COUNT(1)
                    FROM t_yujin_crossborder_prov_commodity_trade e
                    WHERE e.crossborder_year_month = %s
                      AND e.prov_code = %s
                """
                cursor.execute(sql, (crossborder_year_month, prov_code))
                result = cursor.fetchone()
                return int(result[0]) if result and result[0] else 0
    except Exception as e:
        log.info(f"[数据库查询异常] 查询条件: {crossborder_year_month}, {prov_code} | 错误详情: {str(e)}")
        return 0


# URL-encode the password: it contains characters ('%') that are special in a DSN.
encoded_password = quote_plus(DB_CONFIG["password"])

# Shared SQLAlchemy engine used by the bulk-insert and YoY-update helpers below.
engine = create_engine(
    f"mysql+pymysql://{DB_CONFIG['user']}:{encoded_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}?charset={DB_CONFIG['charset']}",
    pool_size=5,
    max_overflow=10
)


def bulk_insert(sql_statements):
    """Execute a batch of raw SQL INSERT statements in a single transaction.

    Each statement is wrapped in ``sqlalchemy.text``. Any ``:word`` token in the
    statement text is therefore interpreted as a bind parameter.

    :param sql_statements: list of complete INSERT statements. When it is empty
        or ``None``, nothing is executed.
    :raises Exception: re-raises any database error after logging it. The
        transaction is rolled back.
    """
    if not sql_statements:
        log.info("未提供有效的 SQL 插入语句,跳过操作")
        return

    try:
        with engine.connect() as conn:
            with conn.begin():
                for sql in sql_statements:
                    stmt = text(sql.strip())
                    conn.execute(stmt)
            log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
    except Exception as e:
        log.info(f"数据库操作失败: {str(e)}")
        raise


def update_january_yoy(prov_name):
    """Recompute January year-over-year (YoY) percentages in ``t_yujin_crossborder_prov_region_trade``.

    Each January row of *prov_name* later than ``2023-01`` is joined to the
    same city's January row of the previous year. The three ``yoy_*`` columns are
    set to ``(curr - prev) / prev * 100``, rounded to 4 decimals. When the previous
    value is 0 or NULL, the column is set to 0.

    :param prov_name: province name matched against ``prov_name``.
    :return: number of rows updated.
    :raises RuntimeError: if the update fails (original error chained).
    """
    update_sql = text("""
        UPDATE t_yujin_crossborder_prov_region_trade AS curr
        INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
            ON curr.city_code = prev.city_code
            AND prev.crossborder_year_month = DATE_FORMAT(
                DATE_SUB(
                    STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
                    INTERVAL 1 YEAR
                ),
                '%Y-01'
            )
        SET
            curr.yoy_import_export = COALESCE(
                ROUND((curr.monthly_total - prev.monthly_total) / NULLIF(prev.monthly_total, 0) * 100, 4),
                0.0000
            ),
            curr.yoy_import = COALESCE(
                ROUND((curr.monthly_import - prev.monthly_import) / NULLIF(prev.monthly_import, 0) * 100, 4),
                0.0000
            ),
            curr.yoy_export = COALESCE(
                ROUND((curr.monthly_export - prev.monthly_export) / NULLIF(prev.monthly_export, 0) * 100, 4),
                0.0000
            )
        WHERE curr.prov_name = :prov_name
            AND curr.crossborder_year_month LIKE '%-01'
            AND curr.crossborder_year_month > '2023-01'
    """)

    try:
        with engine.begin() as conn:
            result = conn.execute(update_sql, {'prov_name': prov_name})
            log.info(f"Updated {result.rowcount} rows for {prov_name}")
            return result.rowcount
    except Exception as e:
        log.info(f"Update failed: {str(e)}")
        raise RuntimeError(f"同比数据更新失败: {str(e)}") from e


def clear_old_shandong_yoy(prov_name):
    """Zero the YoY columns of *prov_name* for months before ``2024-01``.

    Applies to ``t_yujin_crossborder_prov_region_trade``. Only rows where at
    least one ``yoy_*`` column is non-zero are touched.

    :param prov_name: province name matched against ``prov_name``.
    :return: number of rows updated.
    :raises Exception: re-raises any database error after logging it.
    """
    clear_sql = text("""
        UPDATE t_yujin_crossborder_prov_region_trade
        SET yoy_import_export = 0.0000,
            yoy_export = 0.0000,
            yoy_import = 0.0000
        WHERE prov_name = :prov_name
            AND crossborder_year_month < '2024-01'
            AND (yoy_import_export != 0 OR yoy_export != 0 OR yoy_import != 0)  -- 优化:仅更新非零记录
    """)

    try:
        with engine.begin() as conn:
            result = conn.execute(clear_sql, {'prov_name': prov_name})
            log.info(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
            return result.rowcount
    except Exception as e:
        log.info(f"旧数据清零失败: {str(e)}")
        raise


def update_shandong_yoy(prov_name):
    """Refresh all YoY figures of *prov_name* in ``t_yujin_crossborder_prov_region_trade``.

    Zeroes months before 2024-01, then recomputes months from 2024-01 onward.
    The two steps run in separate transactions.

    :param prov_name: province name matched against ``prov_name``.
    :return: ``{'cleared': <rows zeroed>, 'updated': <rows recomputed>}``.
    :raises Exception: re-raises any failure after logging it with traceback.
    """
    try:
        cleared = clear_old_shandong_yoy(prov_name)
        updated = _update_shandong_new_yoy(prov_name)
        log.info(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
        return {'cleared': cleared, 'updated': updated}
    except Exception:
        # f-prefix was missing here, so the province name was never interpolated.
        log.info(f"{prov_name} 数据处理失败", exc_info=True)
        raise


def _update_shandong_new_yoy(prov_name):
    """Recompute YoY percentages of *prov_name* for every month from ``2024-01`` onward.

    Each row is joined to the same city's row for the same month one year
    earlier. The ``yoy_*`` columns are set to ``(curr - prev) / prev * 100``,
    truncated to 4 decimals. When the previous value is 0 or NULL, the column is
    set to 0. Rows whose previous-year ``monthly_total`` is NULL are skipped.

    :param prov_name: province name matched against ``prov_name``.
    :return: number of rows updated.
    """
    update_sql = text("""
        UPDATE t_yujin_crossborder_prov_region_trade AS curr
        INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
            ON curr.city_code = prev.city_code
            AND prev.crossborder_year_month = DATE_FORMAT(
                DATE_SUB(
                    STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
                    INTERVAL 1 YEAR
                ),
                '%Y-%m'
            )
        SET
            curr.yoy_import_export = COALESCE(
                TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF(prev.monthly_total, 0) * 100, 4),
                0.0000
            ),
            curr.yoy_import = COALESCE(
                TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF(prev.monthly_import, 0) * 100, 4),
                0.0000
            ),
            curr.yoy_export = COALESCE(
                TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF(prev.monthly_export, 0) * 100, 4),
                0.0000
            )
        WHERE curr.prov_name = :prov_name
            AND curr.crossborder_year_month >= '2024-01'
            AND prev.monthly_total IS NOT NULL
    """)

    with engine.begin() as conn:
        result = conn.execute(update_sql, {'prov_name': prov_name})
        log.info(f"{prov_name} 新数据更新数: {result.rowcount}")
        return result.rowcount


def update_january_yoy_origin(region_name):
    """Recompute January YoY percentages in the old ``t_yujin_crossborder_region_trade`` table.

    Same computation as ``update_january_yoy``, but keyed by ``region_code`` /
    ``year_month``. It writes the ``ytd_total`` / ``ytd_import`` / ``ytd_export``
    columns.

    :param region_name: region name matched against ``region_name``.
    :return: number of rows updated.
    :raises RuntimeError: if the update fails (original error chained).
    """
    update_sql = text("""
        UPDATE t_yujin_crossborder_region_trade AS curr
        INNER JOIN t_yujin_crossborder_region_trade AS prev
            ON curr.region_code = prev.region_code
            AND prev.year_month = DATE_FORMAT(
                DATE_SUB(
                    STR_TO_DATE(CONCAT(curr.year_month, '-01'), '%Y-%m-%d'),
                    INTERVAL 1 YEAR
                ),
                '%Y-01'
            )
        SET
            curr.ytd_total = COALESCE(
                ROUND((curr.monthly_total - prev.monthly_total) / NULLIF(prev.monthly_total, 0) * 100, 4),
                0.0000
            ),
            curr.ytd_import = COALESCE(
                ROUND((curr.monthly_import - prev.monthly_import) / NULLIF(prev.monthly_import, 0) * 100, 4),
                0.0000
            ),
            curr.ytd_export = COALESCE(
                ROUND((curr.monthly_export - prev.monthly_export) / NULLIF(prev.monthly_export, 0) * 100, 4),
                0.0000
            )
        WHERE curr.region_name = :region_name
            AND curr.year_month LIKE '%-01'
            AND curr.year_month > '2023-01'
    """)

    try:
        with engine.begin() as conn:
            result = conn.execute(update_sql, {'region_name': region_name})
            log.info(f"Updated {result.rowcount} rows for {region_name}")
            return result.rowcount
    except Exception as e:
        log.info(f"Update failed: {str(e)}")
        raise RuntimeError(f"同比数据更新失败: {str(e)}") from e


def clear_old_shandong_yoy_origin(region_name):
    """Zero the ``ytd_*`` columns of *region_name* for months before ``2024-01``.

    Applies to ``t_yujin_crossborder_region_trade``. Only rows where at least one
    ``ytd_*`` column is non-zero are touched.

    :param region_name: region name matched against ``region_name``.
    :return: number of rows updated.
    :raises Exception: re-raises any database error after logging it.
    """
    # `year_month` is backquoted because YEAR_MONTH is a MySQL reserved word.
    clear_sql = text("""
        UPDATE t_yujin_crossborder_region_trade
        SET ytd_total = 0.0000,
            ytd_export = 0.0000,
            ytd_import = 0.0000
        WHERE region_name = :region_name
            AND `year_month` < '2024-01'
            AND (ytd_total != 0 OR ytd_export != 0 OR ytd_import != 0)  -- 优化:仅更新非零记录
    """)

    try:
        with engine.begin() as conn:
            result = conn.execute(clear_sql, {'region_name': region_name})
            log.info(f"{region_name} 旧数据清零记录数: {result.rowcount}")
            return result.rowcount
    except Exception as e:
        log.info(f"旧数据清零失败: {str(e)}")
        raise


def update_shandong_yoy_origin(region_name):
    """Refresh all ``ytd_*`` figures of *region_name* in ``t_yujin_crossborder_region_trade``.

    Zeroes months before 2024-01, then recomputes months from 2024-01 onward.
    The two steps run in separate transactions.

    :param region_name: region name matched against ``region_name``.
    :return: ``{'cleared': <rows zeroed>, 'updated': <rows recomputed>}``.
    :raises Exception: re-raises any failure after logging it with traceback.
    """
    try:
        cleared = clear_old_shandong_yoy_origin(region_name)
        updated = _update_shandong_new_yoy_origin(region_name)
        log.info(f"{region_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
        return {'cleared': cleared, 'updated': updated}
    except Exception:
        # f-prefix was missing here, so the region name was never interpolated.
        log.info(f"{region_name} 数据处理失败", exc_info=True)
        raise


def _update_shandong_new_yoy_origin(region_name):
    """Recompute ``ytd_*`` percentages of *region_name* for every month from ``2024-01`` onward.

    Same computation as ``_update_shandong_new_yoy``, but it targets the old
    ``t_yujin_crossborder_region_trade`` table.

    :param region_name: region name matched against ``region_name``.
    :return: number of rows updated.
    """
    update_sql = text("""
        UPDATE t_yujin_crossborder_region_trade AS curr
        INNER JOIN t_yujin_crossborder_region_trade AS prev
            ON curr.region_code = prev.region_code
            AND prev.year_month = DATE_FORMAT(
                DATE_SUB(
                    STR_TO_DATE(CONCAT(curr.year_month, '-01'), '%Y-%m-%d'),
                    INTERVAL 1 YEAR
                ),
                '%Y-%m'
            )
        SET
            curr.ytd_total = COALESCE(
                TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF(prev.monthly_total, 0) * 100, 4),
                0.0000
            ),
            curr.ytd_import = COALESCE(
                TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF(prev.monthly_import, 0) * 100, 4),
                0.0000
            ),
            curr.ytd_export = COALESCE(
                TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF(prev.monthly_export, 0) * 100, 4),
                0.0000
            )
        WHERE curr.region_name = :region_name
            AND curr.year_month >= '2024-01'
            AND prev.monthly_total IS NOT NULL
    """)

    with engine.begin() as conn:
        result = conn.execute(update_sql, {'region_name': region_name})
        log.info(f"{region_name} 新数据更新数: {result.rowcount}")
        return result.rowcount


if __name__ == '__main__':
    # Example: count existing commodity-trade rows for one month / province code.
    # check_year, check_month = 2024, 4
    # count = get_code_exist(f'{check_year}-{check_month:02d}', "340000")
    # print(count)

    # New table: refresh city-level YoY figures for every province.
    # NOTE(review): _update_shandong_new_yoy also rewrites January rows from 2024-01 on.
    # It joins the same previous-year January but uses TRUNCATE instead of ROUND.
    # It therefore overwrites update_january_yoy's result wherever prev.monthly_total
    # is not NULL — confirm this is intended.
    for province in provinces:
        update_january_yoy(province)
        update_shandong_yoy(province)

    # Old table: refresh province-level YoY figures (currently disabled).
    # for province in provinces:
    #     update_january_yoy_origin(province)
    #     update_shandong_yoy_origin(province)
    log.info("同比sql处理完成")