SpringBoot 2.x 接入非标准SSE格式大模型流式响应实践 🚀

近期DeepSeek等国产大模型热度持续攀升,其关注度甚至超过了OpenAI(被戏称为CloseAI)。在SpringBoot3.x环境中,可以使用官方的Spring AI轻松接入,但对于仍在使用JDK8SpringBoot2.7.3的企业级应用来说,往往需要自定义实现。特别是当大模型团队返回的数据格式不符合标准SSE规范时,更需要灵活处理。本文将分享我们的实战解决方案。


📦 引入Gradle依赖

核心依赖说明:

  • spring-boot-starter-web:基础Web支持
  • spring-boot-starter-webflux:响应式编程支持(WebClient所在模块)
implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-webflux' 

🌐 WebClient配置要点

初始化时特别注意Header配置:

@Bean public WebClient init() {     return WebClient.builder()             .baseUrl(baseUrl)             .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi)             // ⚠️ 必须设置为JSON格式             .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)             .build(); } 

🚨 关键踩坑点:初始设置MediaType.TEXT_EVENT_STREAM_VALUE会导致请求失败,必须使用APPLICATION_JSON_VALUE


🧠 核心处理逻辑

流式请求入口

@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {     // 请求体构建     String requestBody = String.format("""         {             "model": "%s",             "messages": [{"role": "user", "content": "%s"}],             "stream": true         }         """, model, prompt);                                             return webClient.post()             // 请求配置             .uri("/v1/chat/completions")             .bodyValue(requestBody)             .accept(MediaType.TEXT_EVENT_STREAM)             .retrieve()             .bodyToFlux(DataBuffer.class)  // 🔑 关键配置点             .transform(this::processStream)             // 重试和超时配置             .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))             .timeout(Duration.ofSeconds(180));             // 错误处理             .doOnError(e -> log.error("Stream error", e))             .doFinally(signal -> log.info("Stream completed: {}", signal)); } 

技术原理说明

当使用bodyToFlux(DataBuffer.class)时:

  • ✅ 获得原始字节流控制权
  • ❌ 避免自动SSE格式解析(适用于非标准响应)
  • 📡 动态数据流处理:类似Java Stream,但数据持续追加

🔧 非标准SSE数据处理

核心处理流程

private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {     return dataBufferFlux             .transform(DataBufferUtils::join)          // 字节流合并             .map(buffer -> {                          // 字节转字符串                 String content = buffer.toString(StandardCharsets.UTF_8);                 DataBufferUtils.release(buffer);                 return content;             })             .flatMap(content ->                       // 处理粘包问题                 Flux.fromArray(content.split("\r?\n\r?\n")))             .filter(event -> !event.trim().isEmpty()) // 过滤空事件             .map(event -> {                           // 格式标准化处理                 String trimmed = event.trim();                 if (trimmed.startsWith("data:")) {                     String substring = trimmed.substring(5);                     return substring.startsWith(" ") ? substring.substring(1) : substring;                 }                 return trimmed;             })             .filter(event -> !event.startsWith("data:")); // 二次过滤 } 

三大关键技术点

  1. 粘包处理
    通过split("\r?\n\r?\n")解决网络传输中的消息边界问题,示例原始数据:

    data:{response1}nndata:{response2}nn 
  2. 格式兼容处理
    自动去除服务端可能返回的data:前缀,同时保留Spring自动添加SSE前缀的能力

  3. 双重过滤机制
    确保最终输出不包含任何残留的SSE格式标识


⚠️ 特别注意

当接口设置produces = MediaType.TEXT_EVENT_STREAM_VALUE时:

  • Spring WebFlux会自动添加data: 前缀

  • 前端收到的格式示例:

    data: {实际内容} 
  • 若手动添加

    data:  

    前缀会导致重复:

    data: data: {错误内容}  // ❌ 错误格式 

🛠️ 完整实现代码

// 包声明和导入...  @Service @Slf4j public class OpenAiService {     // 配置项和初始化     private String openAiApiKey = "sk-xxxxxx";          private String baseUrl = "https://openai.com/xxxx";      private String model = "gpt-4o";      private WebClient webClient;      @PostConstruct     public void init() {         webClient = WebClient.builder()                 .baseUrl(baseUrl)                 .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)                 .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)                 .build();     }      @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)     public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {         // 构建请求体         String requestBody = String.format("""                 {                     "model": "gpt-4o-mini",                     "messages": [{"role": "user", "content": "%s"}],                     "stream": true                 }                 """, prompt);                                                     // 发送流式请求         return webClient.post()             .uri("/v1/chat/completions")             .bodyValue(requestBody)             .retrieve()             .onStatus(HttpStatusCode::isError, response ->                     response.bodyToMono(String.class)                             .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error)))             )             .bodyToFlux(DataBuffer.class)             .transform(this::processStream)             .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))             .timeout(Duration.ofSeconds(180))             .doOnError(e -> log.error("Stream error", e))             .doFinally(signal -> log.info("Stream completed: {}", signal));     }      private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {         return dataBufferFlux                 // 使用字节流处理                 .transform(DataBufferUtils::join)                 .map(buffer -> {                     String content = buffer.toString(StandardCharsets.UTF_8);                     DataBufferUtils.release(buffer);                     return content;                 })                 // 按 SSE 事件边界,防止粘包的问题                 .flatMap(content -> Flux.fromArray(content.split("\r?\n\r?\n")))                 // 过滤空事件                 .filter(event -> !event.trim().isEmpty())                 // 规范 SSE 事件格式                 .map(event -> {                     String trimmed = event.trim();                      // 由于webflux设置了"produces = MediaType.TEXT_EVENT_STREAM_VALUE",                     // 所以在返回数据时会自动添加“data:”,因此如果返回的格式带了“data:”需要手动去除                     if (trimmed.startsWith("data:")) {                         trimmed = trimmed.replaceFirst("data:","").trim();                     }                     return trimmed;                 })                 .filter(event -> !event.startsWith("data:"));     } } 

发表评论

评论已关闭。

相关文章