Java 实战:基于 WebSocket 获取大模型流式输出并转为 Flux

Luca Ju
2025-09-30 / 0 评论 / 35 阅读 / 正在检测是否收录...

在大模型问答场景中,流式输出已成为提升用户体验的关键技术 —— 它能让模型生成的内容 “逐段呈现”,避免用户长时间等待完整结果。目前行业内最主流的实现方式是HTTP Chunked 编码,几乎所有大模型官网的示例代码都采用这种方案。

但最近我遇到了一个特殊的对接需求:必须通过WebSocket接收流式输出。虽然这种方式不常用,但梳理清楚实现逻辑后发现,它在长连接稳定性、双向通信灵活性上有独特优势。于是整理了这篇实战指南,希望能帮到有同样需求的开发者。一切都是最好的安排~

一、核心需求与技术选型

1. 需求背景

  • 需通过 WebSocket 协议与大模型服务建立连接,实时接收流式输出
  • 项目前端已适配 HTTP 流式输出的 Flux 格式,为减少前端改动,需将 WebSocket 接收的流转为 Flux 响应
  • 需处理连接异常、消息结束等边界场景,保证流传输的稳定性

2. 关键技术依赖

实现核心是 Spring 生态中的ReactorNettyWebSocketClient,它基于响应式编程模型,能天然适配 Flux 的流处理特性,主要依赖如下(Spring Boot 项目可直接引入): 我使用的是jdk17

<!-- Spring WebFlux 响应式Web支持 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Reactor Netty WebSocket客户端 -->
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty-core</artifactId>
</dependency>

二、完整实现代码与逻辑解析

下面是从 WebSocket 连接建立、流式消息接收,到转为 Flux 的完整代码,每一步都附带详细注释:

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import java.net.URI;


/**
 * WebSocket流式输出处理工具类
 * 功能:建立WebSocket连接、接收流式消息、转为Flux响应
 *
 * @author : jucunqi
 * @since : 2025/9/30
 */
@Slf4j
public class WebSocketFluxUtils {


    /**
     * 核心方法:通过WebSocket获取流式输出并转为Flux<String>
     * @param request 发送给大模型的请求参数(通常为JSON格式字符串)
     * @param wsUrl WebSocket服务地址(例:ws://localhost:8080/model/stream)
     * @return 流式输出的Flux<String>,可直接响应给前端
     */
    public Flux<String> getWebSocketStream(String request, String wsUrl) {
        // 1. 创建WebSocket客户端(基于Reactor Netty实现)
        WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();

        // 2. 用Flux.create创建可手动发射消息的Flux
        // 作用:将WebSocket接收的消息“转发”到Flux,供前端订阅
        return Flux.create(sink -> {
            // 3. 执行WebSocket连接:参数1为服务地址,参数2为会话处理逻辑
            Mono<Void> connectMono = webSocketClient.execute(
                    URI.create(wsUrl),
                    session -> handleWebSocketSession(session, request, sink)
            );

            // 4. 订阅连接结果,处理连接层面的异常
            connectMono.subscribe(
                    null,  // onNext:连接成功无返回值,无需处理
                    error -> {  // onError:连接失败时,通知Flux出错
                        log.error("WebSocket连接建立失败,原因:{}", error.getMessage(), error);
                        sink.error(error);
                    },
                    () -> {  // onComplete:连接正常关闭时,通知Flux完成
                        log.info("WebSocket连接已正常关闭");
                        sink.complete();
                    }
            );
        });

    }

    /**
     * 辅助方法:处理WebSocket会话逻辑(发送请求、接收消息)
     * @param session WebSocket会话对象
     * @param request 待发送的请求参数
     * @param sink Flux的消息发射器
     * @return Mono<Void> 会话处理结果(完成/异常)
     */
    private Mono<Void> handleWebSocketSession(
            WebSocketSession session,
            String request,
            FluxSink<String> sink
    ) {
        // 第一步:向大模型服务发送初始请求(仅发送1次)
        Mono<Void> sendRequestMono = session.send(
                Flux.just(session.textMessage(request))  // 将请求转为WebSocket文本消息
        );

        // 第二步:持续接收大模型的流式消息
        Mono<Void> receiveMsgMono = session.receive()
                // 1. 将WebSocket消息转为字符串
                .map(WebSocketMessage::getPayloadAsText)
                // 2. 接收每条消息的处理逻辑
                .doOnNext(msg -> {
                    log.debug("收到WebSocket流式消息:{}", msg);
                    // 将消息发射到Flux,前端可实时接收
                    sink.next(msg);

                    // 关键:判断消息是否结束(需与大模型服务约定结束标识)
                    // 例:若服务返回"result"表示输出完成,则关闭会话+Flux
                    if ("result".equals(msg)) {
                        log.info("收到流式输出结束标识,关闭WebSocket会话");
                        sink.complete();  // 通知Flux完成
                        session.close().subscribe();  // 主动关闭WebSocket会话
                    }
                })
                // 3. 处理消息接收异常
                .doOnError(error -> {
                    log.error("接收WebSocket消息异常,原因:{}", error.getMessage(), error);
                    sink.error(error);  // 通知Flux出错
                })
                // 4. 消息接收完成(服务主动关闭发送端)
                .doOnComplete(() -> {
                    log.info("WebSocket消息接收完成");
                    sink.complete();
                })
                // 5. 保持连接:直到消息接收完成/异常(then()无返回值,仅等待完成)
                .then();

        // 组合“发送请求”和“接收消息”:先发送请求,再持续接收消息
        return sendRequestMono.then(receiveMsgMono);
    }


    public static void main(String[] args) {

        String url = "";
        String initialMessage = "";
        WebSocketFluxUtils webSocketFluxUtils = new WebSocketFluxUtils();


        // 调用封装的方法获取消息流
        Flux<String> messageFlux = webSocketFluxUtils.getWebSocketStream(initialMessage,url);

        // 订阅消息流处理每条消息
        messageFlux.subscribe(
                message -> System.out.println("接收消息:" + message),
                error -> System.err.println("发生错误: " + error.getMessage()),
                () -> System.out.println("连接已关闭")
        );

        // 阻塞以保持程序运行
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

三、关键逻辑与边界处理说明

1. 为什么用 Flux.create?

Flux 有多种创建方式,这里选择Flux.create是因为它允许手动控制消息的发射时机—— 我们需要在 “收到 WebSocket 消息时” 主动调用sink.next(msg),将消息转发到 Flux,完美适配 WebSocket 的 “推送式” 消息接收逻辑。

2. 会话关闭的核心逻辑

流式输出的结束需要服务端与客户端约定标识(如示例中的 “result” 字符串):

  • 客户端收到结束标识后,调用sink.complete()通知前端流结束
  • 同时调用session.close().subscribe()主动关闭 WebSocket 会话,避免连接泄漏

3. 异常处理分层

代码中做了两层异常处理,覆盖全链路场景:

  • 连接层异常:WebSocket 连接建立失败(如地址错误、服务不可用)
  • 消息层异常:连接成功后,接收消息时出现 IO 异常、解析异常

四、使用示例(Spring Boot 接口)

将工具类集成到 Spring Boot 接口,前端通过 HTTP 请求即可获取 Flux 流式响应:

对应的WebSocketFluxUtils 可以换成业务类相关的Service

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class ModelStreamController {
    private final WebSocketFluxUtils webSocketFluxUtils;

    // 构造注入工具类
    public ModelStreamController(WebSocketFluxUtils webSocketFluxUtils) {
        this.webSocketFluxUtils = webSocketFluxUtils;
    }

    /**
     * 前端调用接口:通过HTTP获取大模型流式输出(底层基于WebSocket)
     * @param request 模型请求参数(如prompt、temperature等)
     * @return Flux<String> 流式响应
     */
    @PostMapping("/api/model/stream")
    public Flux<String> getModelStream(@RequestBody ModelRequest request) {
        // 1. 构建WebSocket请求参数(根据大模型服务格式调整)
        String wsRequest = "{\"prompt\":\"" + request.getPrompt() + "\",\"stream\":true}";
        // 2. 调用工具类获取Flux
        return webSocketFluxUtils.getWebSocketStream(wsRequest, "ws://model-server:8080/stream");
    }

    // 请求参数实体类
    static class ModelRequest {
        private String prompt;
        // getter/setter省略
    }
}

总结

本文通过实战代码,完整演示了 “WebSocket 接收流式输出→转为 Flux→响应前端” 的全流程。核心是利用 Reactor 响应式编程模型,将 WebSocket 的 “推送式” 消息与 Flux 的 “订阅式” 流无缝衔接。

虽然 HTTP Chunked 是大模型流式输出的主流方案,但 WebSocket 在长连接、双向通信场景下更有优势。如果你的项目也遇到类似需求,希望这篇文章能帮你少走弯路~

1

评论 (0)

取消