在大模型问答场景中,流式输出已成为提升用户体验的关键技术 —— 它能让模型生成的内容 “逐段呈现”,避免用户长时间等待完整结果。目前行业内最主流的实现方式是HTTP Chunked 编码,几乎所有大模型官网的示例代码都采用这种方案。
但最近我遇到了一个特殊的对接需求:必须通过WebSocket接收流式输出。虽然这种方式不常用,但梳理清楚实现逻辑后发现,它在长连接稳定性、双向通信灵活性上有独特优势。于是整理了这篇实战指南,希望能帮到有同样需求的开发者。一切都是最好的安排~
一、核心需求与技术选型
1. 需求背景
- 需通过 WebSocket 协议与大模型服务建立连接,实时接收流式输出
- 项目前端已适配 HTTP 流式输出的 Flux 格式,为减少前端改动,需将 WebSocket 接收的流转为 Flux 响应
- 需处理连接异常、消息结束等边界场景,保证流传输的稳定性
2. 关键技术依赖
实现核心是 Spring 生态中的ReactorNettyWebSocketClient,它基于响应式编程模型,能天然适配 Flux 的流处理特性,主要依赖如下(Spring Boot 项目可直接引入): 我使用的是jdk17
<!-- Spring WebFlux 响应式Web支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Reactor Netty WebSocket客户端 -->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
</dependency>二、完整实现代码与逻辑解析
下面是从 WebSocket 连接建立、流式消息接收,到转为 Flux 的完整代码,每一步都附带详细注释:
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import java.net.URI;
/**
* WebSocket流式输出处理工具类
* 功能:建立WebSocket连接、接收流式消息、转为Flux响应
*
* @author : jucunqi
* @since : 2025/9/30
*/
@Slf4j
public class WebSocketFluxUtils {
/**
* 核心方法:通过WebSocket获取流式输出并转为Flux<String>
* @param request 发送给大模型的请求参数(通常为JSON格式字符串)
* @param wsUrl WebSocket服务地址(例:ws://localhost:8080/model/stream)
* @return 流式输出的Flux<String>,可直接响应给前端
*/
public Flux<String> getWebSocketStream(String request, String wsUrl) {
// 1. 创建WebSocket客户端(基于Reactor Netty实现)
WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
// 2. 用Flux.create创建可手动发射消息的Flux
// 作用:将WebSocket接收的消息“转发”到Flux,供前端订阅
return Flux.create(sink -> {
// 3. 执行WebSocket连接:参数1为服务地址,参数2为会话处理逻辑
Mono<Void> connectMono = webSocketClient.execute(
URI.create(wsUrl),
session -> handleWebSocketSession(session, request, sink)
);
// 4. 订阅连接结果,处理连接层面的异常
connectMono.subscribe(
null, // onNext:连接成功无返回值,无需处理
error -> { // onError:连接失败时,通知Flux出错
log.error("WebSocket连接建立失败,原因:{}", error.getMessage(), error);
sink.error(error);
},
() -> { // onComplete:连接正常关闭时,通知Flux完成
log.info("WebSocket连接已正常关闭");
sink.complete();
}
);
});
}
/**
* 辅助方法:处理WebSocket会话逻辑(发送请求、接收消息)
* @param session WebSocket会话对象
* @param request 待发送的请求参数
* @param sink Flux的消息发射器
* @return Mono<Void> 会话处理结果(完成/异常)
*/
private Mono<Void> handleWebSocketSession(
WebSocketSession session,
String request,
FluxSink<String> sink
) {
// 第一步:向大模型服务发送初始请求(仅发送1次)
Mono<Void> sendRequestMono = session.send(
Flux.just(session.textMessage(request)) // 将请求转为WebSocket文本消息
);
// 第二步:持续接收大模型的流式消息
Mono<Void> receiveMsgMono = session.receive()
// 1. 将WebSocket消息转为字符串
.map(WebSocketMessage::getPayloadAsText)
// 2. 接收每条消息的处理逻辑
.doOnNext(msg -> {
log.debug("收到WebSocket流式消息:{}", msg);
// 将消息发射到Flux,前端可实时接收
sink.next(msg);
// 关键:判断消息是否结束(需与大模型服务约定结束标识)
// 例:若服务返回"result"表示输出完成,则关闭会话+Flux
if ("result".equals(msg)) {
log.info("收到流式输出结束标识,关闭WebSocket会话");
sink.complete(); // 通知Flux完成
session.close().subscribe(); // 主动关闭WebSocket会话
}
})
// 3. 处理消息接收异常
.doOnError(error -> {
log.error("接收WebSocket消息异常,原因:{}", error.getMessage(), error);
sink.error(error); // 通知Flux出错
})
// 4. 消息接收完成(服务主动关闭发送端)
.doOnComplete(() -> {
log.info("WebSocket消息接收完成");
sink.complete();
})
// 5. 保持连接:直到消息接收完成/异常(then()无返回值,仅等待完成)
.then();
// 组合“发送请求”和“接收消息”:先发送请求,再持续接收消息
return sendRequestMono.then(receiveMsgMono);
}
public static void main(String[] args) {
String url = "";
String initialMessage = "";
WebSocketFluxUtils webSocketFluxUtils = new WebSocketFluxUtils();
// 调用封装的方法获取消息流
Flux<String> messageFlux = webSocketFluxUtils.getWebSocketStream(initialMessage,url);
// 订阅消息流处理每条消息
messageFlux.subscribe(
message -> System.out.println("接收消息:" + message),
error -> System.err.println("发生错误: " + error.getMessage()),
() -> System.out.println("连接已关闭")
);
// 阻塞以保持程序运行
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}三、关键逻辑与边界处理说明
1. 为什么用 Flux.create?
Flux 有多种创建方式,这里选择Flux.create是因为它允许手动控制消息的发射时机—— 我们需要在 “收到 WebSocket 消息时” 主动调用sink.next(msg),将消息转发到 Flux,完美适配 WebSocket 的 “推送式” 消息接收逻辑。
2. 会话关闭的核心逻辑
流式输出的结束需要服务端与客户端约定标识(如示例中的 “result” 字符串):
- 客户端收到结束标识后,调用sink.complete()通知前端流结束
- 同时调用session.close().subscribe()主动关闭 WebSocket 会话,避免连接泄漏
3. 异常处理分层
代码中做了两层异常处理,覆盖全链路场景:
- 连接层异常:WebSocket 连接建立失败(如地址错误、服务不可用)
- 消息层异常:连接成功后,接收消息时出现 IO 异常、解析异常
四、使用示例(Spring Boot 接口)
将工具类集成到 Spring Boot 接口,前端通过 HTTP 请求即可获取 Flux 流式响应:
对应的WebSocketFluxUtils 可以换成业务类相关的Service
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class ModelStreamController {
private final WebSocketFluxUtils webSocketFluxUtils;
// 构造注入工具类
public ModelStreamController(WebSocketFluxUtils webSocketFluxUtils) {
this.webSocketFluxUtils = webSocketFluxUtils;
}
/**
* 前端调用接口:通过HTTP获取大模型流式输出(底层基于WebSocket)
* @param request 模型请求参数(如prompt、temperature等)
* @return Flux<String> 流式响应
*/
@PostMapping("/api/model/stream")
public Flux<String> getModelStream(@RequestBody ModelRequest request) {
// 1. 构建WebSocket请求参数(根据大模型服务格式调整)
String wsRequest = "{\"prompt\":\"" + request.getPrompt() + "\",\"stream\":true}";
// 2. 调用工具类获取Flux
return webSocketFluxUtils.getWebSocketStream(wsRequest, "ws://model-server:8080/stream");
}
// 请求参数实体类
static class ModelRequest {
private String prompt;
// getter/setter省略
}
}总结
本文通过实战代码,完整演示了 “WebSocket 接收流式输出→转为 Flux→响应前端” 的全流程。核心是利用 Reactor 响应式编程模型,将 WebSocket 的 “推送式” 消息与 Flux 的 “订阅式” 流无缝衔接。
虽然 HTTP Chunked 是大模型流式输出的主流方案,但 WebSocket 在长连接、双向通信场景下更有优势。如果你的项目也遇到类似需求,希望这篇文章能帮你少走弯路~
评论 (0)