要实现通过 API 将流式内容输出到前端,可以采用以下技术方案(以 Python 后端 + 前端 JavaScript 为例):
方案一:使用 Server-Sent Events (SSE)
这是浏览器原生支持的流式传输方案,推荐首选
# Flask 示例 from flask import Response, stream_with_context @app.route('/stream') def stream_data(): def generate(): response = client.chat.completions.create( model="deepseek-chat", messages=messages, stream=True ) for chunk in response: if chunk.choices: content = chunk.choices[0].delta.content or "" # SSE 格式要求 data: 前缀和双换行符 yield f"data: {json.dumps({'content': content})}nn" return Response(stream_with_context(generate()), mimetype='text/event-stream')
// 前端 JavaScript const eventSource = new EventSource('/stream'); eventSource.onmessage = (event) => { const data = JSON.parse(event.data); document.getElementById('output').innerHTML += data.content; }; eventSource.onerror = (err) => { console.error('EventSource failed:', err); eventSource.close(); };
方案二:使用流式 HTTP 响应(NDJSON)
更通用的流式传输方案,适合非浏览器客户端
# FastAPI 示例 from fastapi import APIRouter from fastapi.responses import StreamingResponse import json @app.get("/stream") async def stream_data(): async def generate(): response = client.chat.completions.create( model="deepseek-chat", messages=messages, stream=True ) async for chunk in response: if chunk.choices: content = chunk.choices[0].delta.content or "" yield json.dumps({"content": content}) + "n" # NDJSON 格式 return StreamingResponse(generate(), media_type='application/x-ndjson')
// 前端 JavaScript 使用 Fetch API async function streamData() { const response = await fetch('/stream'); const reader = response.body.getReader(); const decoder = new TextDecoder(); while(true) { const { done, value } = await reader.read(); if(done) break; const chunk = decoder.decode(value); const data = JSON.parse(chunk); document.getElementById('output').innerHTML += data.content; } }
关键配置说明
-
响应头设置:
# Flask headers = { 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' } # FastAPI 自动处理 -
数据格式选择:
- SSE (
text/event-stream):浏览器原生支持,自动重连 - NDJSON (
application/x-ndjson):更通用的流式 JSON 格式 - 纯文本流:简单但结构化能力弱
- SSE (
-
前端处理建议:
// 更健壮的读取方式(处理分块不完整情况) let buffer = ''; async function processChunk(chunk) { buffer += chunk; while(buffer.includes('n')) { const lineEnd = buffer.indexOf('n'); const line = buffer.slice(0, lineEnd); buffer = buffer.slice(lineEnd + 1); try { const data = JSON.parse(line); // 处理数据... } catch(e) { console.error('解析错误:', e); } } }
完整工作流程示例(FastAPI + React)
后端:
# main.py from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) @app.get("/chat") async def chat_stream(prompt: str): async def generate(): response = client.chat.completions.create( model="deepseek-chat", messages=[{"role": "user", "content": prompt}], stream=True ) async for chunk in response: if content := chunk.choices[0].delta.content: yield json.dumps({"content": content}) return StreamingResponse(generate(), media_type="application/x-ndjson")
前端 React 组件:
// ChatComponent.jsx import { useState } from 'react'; export default function ChatComponent() { const [output, setOutput] = useState(''); const startStream = async () => { const response = await fetch('http://api/chat?prompt=你好'); const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; while(true) { const { done, value } = await reader.read(); if(done) break; buffer += decoder.decode(value); while(buffer.includes('}')) { const endIndex = buffer.indexOf('}') + 1; const chunk = buffer.slice(0, endIndex); buffer = buffer.slice(endIndex); try { const data = JSON.parse(chunk); setOutput(prev => prev + data.content); } catch(e) { console.error('解析错误:', e); } } } }; return ( <div> <button onClick={startStream}>开始对话</button> <div id="output">{output}</div> </div> ); }
注意事项
-
连接管理:
- 设置合理的超时时间(通常 30-60 秒)
- 处理客户端提前断开连接的情况
# FastAPI 示例 try: async for chunk in response: # ...处理数据 if await request.is_disconnected(): break finally: await client.close() # 清理资源 -
性能优化:
- 使用异步框架(FastAPI 性能优于 Flask)
- 启用响应压缩
app = FastAPI() @app.middleware("http") async def add_compression(request, call_next): response = await call_next(request) response.headers["Content-Encoding"] = "gzip" return response -
安全考虑:
- 限制最大并发连接数
- 实施速率限制
from fastapi import Request from fastapi.middleware import Middleware from slowapi import Limiter from slowapi.util import get_remote_address limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter @app.get("/chat") @limiter.limit("10/minute") async def chat_stream(request: Request): # ... -
错误处理增强:
async def generate(): try: response = client.chat.completions.create(...) async for chunk in response: # 处理数据... except Exception as e: yield json.dumps({"error": str(e)}) finally: await client.close() # 确保释放资源
这些方案可根据具体需求组合使用,建议优先选择 SSE 方案(浏览器兼容性好),需要支持更复杂场景时可考虑 WebSocket,但后者实现成本较高。