.NET+AI | MEAI | 自定义中间件(8)

DelegatingChatClient:构建企业级 AI 中间件的利器

一句话简介

通过 Microsoft.Extensions.AI 的 DelegatingChatClient 基类,轻松创建自定义中间件,实现限流、重试、安全过滤等企业级功能,让 AI 应用更安全、更稳定。


🎯 核心价值

  • 简单易用:只需继承基类并重写需要的方法
  • 灵活组合:多个中间件可以管道式串联
  • 企业就绪:实现限流、安全、监控等生产级功能
  • 标准化:遵循统一的 IChatClient 接口规范

📝 为什么需要自定义中间件?

在实际应用中,我们经常需要对 AI 服务进行增强和控制:

场景 挑战 中间件方案
API 限流 超出调用频率限制 RateLimitingChatClient
网络故障 临时性错误导致失败 RetryingChatClient
内容安全 敏感信息泄露风险 ContentFilteringChatClient
性能监控 无法追踪响应时间 PerformanceMonitoringClient
合规审计 需要记录所有交互 AuditLoggingChatClient

🏗️ DelegatingChatClient 核心概念

.NET+AI | MEAI | 自定义中间件(8)

核心特性:

  • 🔧 透明转发:默认将所有调用转发到内部客户端
  • 🔧 可选重写:只需重写需要定制的方法
  • 🔧 管道友好:支持多个中间件串联组合

可重写方法:

  • GetResponseAsync:处理完整响应
  • GetStreamingResponseAsync:处理流式响应
  • Dispose:清理资源

💻 快速开始

1. 实现限流中间件

保护 API 免受过载,控制调用频率:

using Microsoft.Extensions.AI; using System.Threading.RateLimiting;  public sealed class RateLimitingChatClient : DelegatingChatClient {     private readonly RateLimiter _rateLimiter;      public RateLimitingChatClient(IChatClient innerClient, RateLimiter rateLimiter)         : base(innerClient)     {         _rateLimiter = rateLimiter;     }      public override async Task<ChatResponse> GetResponseAsync(         IEnumerable<ChatMessage> messages,         ChatOptions? options = null,         CancellationToken cancellationToken = default)     {         // 获取限流许可         using var lease = await _rateLimiter.AcquireAsync(1, cancellationToken);                  if (!lease.IsAcquired)             throw new InvalidOperationException("请求被限流拒绝");          // 转发到内部客户端         return await base.GetResponseAsync(messages, options, cancellationToken);     } } 

使用方式:

var limiter = new ConcurrencyLimiter(new() { PermitLimit = 2 }); var client = new RateLimitingChatClient(baseClient, limiter); 

2. 实现安全过滤中间件

过滤敏感信息,保护数据安全:

public sealed class ContentFilteringChatClient : DelegatingChatClient {     private readonly HashSet<string> _sensitiveWords;      public ContentFilteringChatClient(         IChatClient innerClient,          IEnumerable<string> sensitiveWords)         : base(innerClient)     {         _sensitiveWords = new HashSet<string>(             sensitiveWords,              StringComparer.OrdinalIgnoreCase);     }      public override async Task<ChatResponse> GetResponseAsync(         IEnumerable<ChatMessage> messages,         ChatOptions? options = null,         CancellationToken cancellationToken = default)     {         // 过滤输入消息         var filteredMessages = FilterMessages(messages);                  // 调用底层客户端         return await base.GetResponseAsync(             filteredMessages,              options,              cancellationToken);     }      private List<ChatMessage> FilterMessages(IEnumerable<ChatMessage> messages)     {         return messages.Select(m =>          {             if (m.Text != null && ContainsSensitiveWords(m.Text))             {                 return new ChatMessage(m.Role, MaskSensitiveWords(m.Text));             }             return m;         }).ToList();     } } 

3. 使用 ChatClientBuilder.Use 简化开发

除了继承 DelegatingChatClient,还可以使用内联方式:

var client = baseClient.AsBuilder()     // 添加日志中间件     .Use(async (messages, options, innerClient, cancellationToken) =>     {         Console.WriteLine($"[日志] 收到 {messages.Count()} 条消息");         var sw = Stopwatch.StartNew();                  var response = await innerClient.GetResponseAsync(             messages, options, cancellationToken);                  Console.WriteLine($"[日志] 耗时: {sw.ElapsedMilliseconds}ms");         return response;     })     // 添加重试中间件     .Use(async (messages, options, innerClient, cancellationToken) =>     {         for (int i = 0; i < 3; i++)         {             try             {                 return await innerClient.GetResponseAsync(                     messages, options, cancellationToken);             }             catch (Exception ex) when (i < 2)             {                 Console.WriteLine($"[重试] 第 {i + 1} 次失败,准备重试...");                 await Task.Delay(1000 * (i + 1));             }         }         throw new Exception("重试失败");     })     .Build(); 

优势对比:

方式 适用场景 优势
继承方式 复杂逻辑、资源管理 完全控制、可复用
内联方式 简单场景、快速开发 代码简洁、灵活

🔧 创建可复用扩展方法

将中间件封装为扩展方法,提高复用性:

public static class ChatClientExtensions {     public static ChatClientBuilder UseRateLimiting(         this ChatClientBuilder builder,         RateLimiter rateLimiter)     {         return builder.Use(innerClient =>              new RateLimitingChatClient(innerClient, rateLimiter));     }      public static ChatClientBuilder UseContentFiltering(         this ChatClientBuilder builder,         IEnumerable<string> sensitiveWords)     {         return builder.Use(innerClient =>              new ContentFilteringChatClient(innerClient, sensitiveWords));     }      public static ChatClientBuilder UsePerformanceMonitoring(         this ChatClientBuilder builder)     {         return builder.Use(async (messages, options, innerClient, ct) =>         {             var sw = Stopwatch.StartNew();             var response = await innerClient.GetResponseAsync(messages, options, ct);             Console.WriteLine($"[性能] {sw.ElapsedMilliseconds}ms");             return response;         });     } } 

使用扩展方法:

var client = baseClient.AsBuilder()     .UsePerformanceMonitoring()     .UseContentFiltering(new[] { "密码", "账号" })     .UseRateLimiting(rateLimiter)     .Build(); 

🏢 企业级最佳实践

1. 中间件执行顺序(洋葱模型)

请求: 外层 → 内层 → AI 模型 响应: AI 模型 → 内层 → 外层 

推荐顺序:

层级 中间件类型 原因
最外层 日志、监控 记录所有请求和响应
中间层 安全过滤 在消耗资源前拦截
内层 限流、缓存 减少 API 调用

示例配置:

var client = baseClient.AsBuilder()     .UsePerformanceMonitoring()  // 外层:监控     .UseContentFiltering(words)  // 中层:安全     .UseRateLimiting(limiter)    // 内层:限流     .Build(); 

2. 处理流式和非流式响应

需要同时支持两种模式:

// 非流式响应 public override async Task<ChatResponse> GetResponseAsync(...) {     var response = await base.GetResponseAsync(...);     ProcessFullContent(response.Text);     return response; }  // 流式响应 public override async IAsyncEnumerable<ChatResponseUpdate>      GetStreamingResponseAsync(...) {     StringBuilder accumulated = new();     await foreach (var update in base.GetStreamingResponseAsync(...))     {         accumulated.Append(update.Text);         yield return update;     }     ProcessFullContent(accumulated.ToString()); } 

3. 资源管理和生命周期

正确实现资源释放:

public sealed class MyCustomChatClient : DelegatingChatClient {     private readonly IDisposable _resource;          protected override void Dispose(bool disposing)     {         if (disposing)         {             _resource?.Dispose();         }         base.Dispose(disposing);     } } 

最佳实践:

  • ✅ 始终调用 base.Dispose(disposing)
  • ✅ 在 disposing == true 时释放托管资源
  • ✅ 使用 using 语句确保释放

4. 依赖注入集成

在 ASP.NET Core 中使用:

// Program.cs builder.Services.AddSingleton<RateLimiter>(_ =>      new ConcurrencyLimiter(new() { PermitLimit = 10 }));  builder.Services.AddChatClient(services => {     var baseClient = /* 创建基础客户端 */;          return baseClient         .AsBuilder()         .UsePerformanceMonitoring()         .UseContentFiltering(new[] { "敏感词" })         .UseRateLimiting(services.GetRequiredService<RateLimiter>())         .Build(); });  // 在服务中注入使用 public class MyService {     private readonly IChatClient _chatClient;          public MyService(IChatClient chatClient)     {         _chatClient = chatClient;     } } 

🎯 总结

  • 三种实现方式:继承 DelegatingChatClient、Use 内联、扩展方法
  • 常见场景:限流、安全、监控、重试、审计
  • 洋葱模型:外层监控、中层安全、内层限流
  • 生产就绪:资源管理、依赖注入、错误处理

选择建议:

  • 💻 复杂逻辑、需要资源管理 → 继承 DelegatingChatClient
  • 💻 简单场景、快速开发 → Use 内联方式
  • 💻 可复用组件 → 扩展方法

下一步: 探索 MEAI ChatClient中间件和Function Invoker的区别

发表评论

评论已关闭。

相关文章