通过 API 将Deepseek响应流式内容输出到前端

要实现通过 API 将流式内容输出到前端,可以采用以下技术方案(以 Python 后端 + 前端 JavaScript 为例):

方案一:使用 Server-Sent Events (SSE)

这是浏览器原生支持的流式传输方案,推荐首选

# Flask 示例 from flask import Response, stream_with_context  @app.route('/stream') def stream_data():     def generate():         response = client.chat.completions.create(             model="deepseek-chat",             messages=messages,             stream=True         )                  for chunk in response:             if chunk.choices:                 content = chunk.choices[0].delta.content or ""                 # SSE 格式要求 data: 前缀和双换行符                 yield f"data: {json.dumps({'content': content})}nn"          return Response(stream_with_context(generate()), mimetype='text/event-stream') 
// 前端 JavaScript const eventSource = new EventSource('/stream');  eventSource.onmessage = (event) => {     const data = JSON.parse(event.data);     document.getElementById('output').innerHTML += data.content; };  eventSource.onerror = (err) => {     console.error('EventSource failed:', err);     eventSource.close(); }; 

方案二:使用流式 HTTP 响应(NDJSON)

更通用的流式传输方案,适合非浏览器客户端

# FastAPI 示例 from fastapi import APIRouter from fastapi.responses import StreamingResponse import json  @app.get("/stream") async def stream_data():     async def generate():         response = client.chat.completions.create(             model="deepseek-chat",             messages=messages,             stream=True         )                  async for chunk in response:             if chunk.choices:                 content = chunk.choices[0].delta.content or ""                 yield json.dumps({"content": content}) + "n"  # NDJSON 格式          return StreamingResponse(generate(), media_type='application/x-ndjson') 
// 前端 JavaScript 使用 Fetch API async function streamData() {     const response = await fetch('/stream');     const reader = response.body.getReader();     const decoder = new TextDecoder();          while(true) {         const { done, value } = await reader.read();         if(done) break;                  const chunk = decoder.decode(value);         const data = JSON.parse(chunk);         document.getElementById('output').innerHTML += data.content;     } } 

关键配置说明

  1. 响应头设置

    # Flask headers = {     'Cache-Control': 'no-cache',     'Connection': 'keep-alive' }  # FastAPI 自动处理 
  2. 数据格式选择

    • SSE (text/event-stream):浏览器原生支持,自动重连
    • NDJSON (application/x-ndjson):更通用的流式 JSON 格式
    • 纯文本流:简单但结构化能力弱
  3. 前端处理建议

    // 更健壮的读取方式(处理分块不完整情况) let buffer = '';  async function processChunk(chunk) {     buffer += chunk;     while(buffer.includes('n')) {         const lineEnd = buffer.indexOf('n');         const line = buffer.slice(0, lineEnd);         buffer = buffer.slice(lineEnd + 1);                  try {             const data = JSON.parse(line);             // 处理数据...         } catch(e) {             console.error('解析错误:', e);         }     } } 

完整工作流程示例(FastAPI + React)

后端

# main.py from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware  app = FastAPI()  app.add_middleware(     CORSMiddleware,     allow_origins=["*"],     allow_methods=["*"],     allow_headers=["*"], )  @app.get("/chat") async def chat_stream(prompt: str):     async def generate():         response = client.chat.completions.create(             model="deepseek-chat",             messages=[{"role": "user", "content": prompt}],             stream=True         )                  async for chunk in response:             if content := chunk.choices[0].delta.content:                 yield json.dumps({"content": content})          return StreamingResponse(generate(), media_type="application/x-ndjson") 

前端 React 组件

// ChatComponent.jsx import { useState } from 'react';  export default function ChatComponent() {     const [output, setOutput] = useState('');          const startStream = async () => {         const response = await fetch('http://api/chat?prompt=你好');         const reader = response.body.getReader();         const decoder = new TextDecoder();         let buffer = '';                  while(true) {             const { done, value } = await reader.read();             if(done) break;                          buffer += decoder.decode(value);             while(buffer.includes('}')) {                 const endIndex = buffer.indexOf('}') + 1;                 const chunk = buffer.slice(0, endIndex);                 buffer = buffer.slice(endIndex);                                  try {                     const data = JSON.parse(chunk);                     setOutput(prev => prev + data.content);                 } catch(e) {                     console.error('解析错误:', e);                 }             }         }     };          return (         <div>             <button onClick={startStream}>开始对话</button>             <div id="output">{output}</div>         </div>     ); } 

注意事项

  1. 连接管理

    • 设置合理的超时时间(通常 30-60 秒)
    • 处理客户端提前断开连接的情况
    # FastAPI 示例 try:     async for chunk in response:         # ...处理数据         if await request.is_disconnected():             break finally:     await client.close()  # 清理资源 
  2. 性能优化

    • 使用异步框架(FastAPI 性能优于 Flask)
    • 启用响应压缩
    app = FastAPI() @app.middleware("http") async def add_compression(request, call_next):     response = await call_next(request)     response.headers["Content-Encoding"] = "gzip"     return response 
  3. 安全考虑

    • 限制最大并发连接数
    • 实施速率限制
    from fastapi import Request from fastapi.middleware import Middleware from slowapi import Limiter from slowapi.util import get_remote_address  limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter  @app.get("/chat") @limiter.limit("10/minute") async def chat_stream(request: Request):     # ... 
  4. 错误处理增强

    async def generate():     try:         response = client.chat.completions.create(...)         async for chunk in response:             # 处理数据...     except Exception as e:         yield json.dumps({"error": str(e)})     finally:         await client.close()  # 确保释放资源 

这些方案可根据具体需求组合使用,建议优先选择 SSE 方案(浏览器兼容性好),需要支持更复杂场景时可考虑 WebSocket,但后者实现成本较高。

发表评论

评论已关闭。

相关文章