http流量镜像

http流量镜像

“流量镜像”是指将网络中的数据流量复制一份,并将这份复制流量发送到另一个目的地(如监控、分析或安全检测系统)。这项技术常用于网络安全、故障排查、业务灰度发布等场景。

主要应用场景

  • 安全监控与威胁检测

    将生产环境的流量镜像到安全分析设备(如IDS/IPS),用于实时监控和威胁检测。

  • 性能分析与故障排查

    将流量镜像到分析平台,对网络异常、延迟、丢包等问题进行实时排查和定位。

  • 灰度发布和A/B测试

    将真实用户流量镜像到新版本服务,进行灰度环境验证和兼容性测试,不影响真实用户体验。

  • 合规与审计

    对重要业务流量进行行为记录,以满足合规和审计要求。

VKProxy 目前只支持 HTTP 流量镜像。需要注意:由于镜像时会再发送一次 HTTP 请求,并且请求 body 会临时缓存在内存中,所以内存占用和请求延迟都会受到影响,body 很大的请求尤其明显。

设置

大家可以在路由的 Metadata 中设置流量镜像,具体设置项如下

  • MirrorCluster

    镜像流量发送到的集群id

配置示例:

{   "ReverseProxy": {     "Routes": {       "a": {         "Order": 0,         "Match": {             "Hosts": [ "api.com" ],             "Paths": [ "*" ]         },         "ClusterId": "apidemo",         "Metadata": {           "MirrorCluster": "apidemoMirror"         }       }     },     "Clusters": {       "apidemo": {         "LoadBalancingPolicy": "Hash",         "Metadata": {           "HashBy": "header",           "Key": "X-forwarded-For"         },         "Destinations": [           {             "Address": "https://xxx.lt"           }         ]       },       "apidemoMirror": {         "LoadBalancingPolicy": "Hash",         "Metadata": {           "HashBy": "header",           "Key": "X-forwarded-For"         },         "Destinations": [           {             "Address": "http://xxx.org/"           }         ]       }     }   } } 

具体实现

首先需要缓存body 内容,这里实现一个简单的 ReadBufferingStream

 /// <summary>
 /// Pass-through <see cref="Stream"/> wrapper that keeps a copy of every byte read from the
 /// inner stream, so the content that was read can be read again via <see cref="BufferingStream"/>.
 /// </summary>
 /// <remarks>
 /// Only reads are recorded. Writes, seeking, flushing, length and position are forwarded
 /// to the inner stream unchanged.
 /// <see cref="Dispose(bool)"/> releases the recording buffer but leaves the inner stream alone;
 /// <see cref="DisposeAsync"/> releases the recording buffer and also disposes the inner stream.
 /// </remarks>
 public class ReadBufferingStream : Stream, IDisposable
 {
     // Holds the copy of everything read so far.
     // NOTE(review): SparseBufferWriter looks like the DotNext pooled writer - confirm its Dispose contract.
     private readonly SparseBufferWriter<byte> bufferWriter;
     protected Stream innerStream;

     // Set once the recording buffer has been released, so Dispose + DisposeAsync never release it twice.
     private bool bufferReleased;

     /// <summary>Wraps <paramref name="innerStream"/> and starts with an empty recording buffer.</summary>
     /// <param name="innerStream">The stream whose reads are recorded.</param>
     /// <exception cref="ArgumentNullException"><paramref name="innerStream"/> is null.</exception>
     public ReadBufferingStream(Stream innerStream)
     {
         this.innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
         bufferWriter = new SparseBufferWriter<byte>();
     }

     public override bool CanRead => innerStream.CanRead;

     public override bool CanSeek => innerStream.CanSeek;

     public override bool CanWrite => innerStream.CanWrite;

     public override long Length => innerStream.Length;

     public override long Position
     {
         get => innerStream.Position;
         set => innerStream.Position = value;
     }

     public override int WriteTimeout
     {
         get => innerStream.WriteTimeout;
         set => innerStream.WriteTimeout = value;
     }

     /// <summary>
     /// A stream over the bytes recorded so far. Every access calls
     /// <c>bufferWriter.AsStream(true)</c>, so each access yields a new stream instance.
     /// </summary>
     public Stream BufferingStream => bufferWriter.AsStream(true);

     public override void Flush()
     {
         innerStream.Flush();
     }

     public override Task FlushAsync(CancellationToken cancellationToken)
     {
         return innerStream.FlushAsync(cancellationToken);
     }

     /// <summary>Reads from the inner stream and records the bytes that were returned.</summary>
     public override int Read(byte[] buffer, int offset, int count)
     {
         var res = innerStream.Read(buffer, offset, count);

         // Zero-byte reads (an empty destination) can occur when the caller is a PipeReader;
         // there is nothing to record in that case.
         if (count == 0)
         {
             return res;
         }

         bufferWriter.Write(buffer.AsSpan(offset, res));

         return res;
     }

     /// <summary>Array-based async read; delegates to the <see cref="Memory{T}"/> overload so recording lives in one place.</summary>
     public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
     {
         return await ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
     }

     /// <summary>Reads from the inner stream and records the bytes that were returned.</summary>
     public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
     {
         var res = await innerStream.ReadAsync(buffer, cancellationToken);

         // Zero-byte read: nothing to record.
         if (buffer.IsEmpty)
         {
             return res;
         }

         bufferWriter.Write(buffer.Slice(0, res).Span);

         return res;
     }

     public override long Seek(long offset, SeekOrigin origin)
     {
         return innerStream.Seek(offset, origin);
     }

     public override void SetLength(long value)
     {
         innerStream.SetLength(value);
     }

     public override void Write(byte[] buffer, int offset, int count)
     {
         innerStream.Write(buffer, offset, count);
     }

     public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
     {
         return innerStream.WriteAsync(buffer, offset, count, cancellationToken);
     }

     public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
     {
         return innerStream.WriteAsync(buffer, cancellationToken);
     }

     public override void Write(ReadOnlySpan<byte> buffer)
     {
         innerStream.Write(buffer);
     }

     /// <summary>
     /// APM read. Implemented on top of <see cref="ReadAsync(byte[], int, int, CancellationToken)"/>
     /// so that bytes read through this path are recorded as well; forwarding to the inner
     /// stream's BeginRead would bypass the recording buffer.
     /// </summary>
     public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
     {
         // The returned Task doubles as the IAsyncResult; "state" becomes its AsyncState.
         var completion = new TaskCompletionSource<int>(state, TaskCreationOptions.RunContinuationsAsynchronously);
         _ = CompleteReadAsync(completion, buffer, offset, count, callback);
         return completion.Task;
     }

     /// <summary>Completes a read started by <see cref="BeginRead"/> and returns the number of bytes read.</summary>
     /// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
     /// <exception cref="ArgumentException"><paramref name="asyncResult"/> did not come from <see cref="BeginRead"/> on this type.</exception>
     public override int EndRead(IAsyncResult asyncResult)
     {
         if (asyncResult == null)
         {
             throw new ArgumentNullException(nameof(asyncResult));
         }

         var readTask = asyncResult as Task<int>;
         if (readTask == null)
         {
             throw new ArgumentException("The IAsyncResult was not returned by BeginRead on this stream.", nameof(asyncResult));
         }

         // EndRead is a blocking call by contract; this also rethrows any read failure.
         return readTask.GetAwaiter().GetResult();
     }

     public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
     {
         return innerStream.BeginWrite(buffer, offset, count, callback, state);
     }

     public override void EndWrite(IAsyncResult asyncResult)
     {
         innerStream.EndWrite(asyncResult);
     }

     /// <summary>Releases the recording buffer, then asynchronously disposes the inner stream.</summary>
     public override ValueTask DisposeAsync()
     {
         ReleaseBuffer();
         return innerStream.DisposeAsync();
     }

     /// <summary>Releases the recording buffer. The inner stream is not disposed here.</summary>
     protected override void Dispose(bool disposing)
     {
         if (disposing)
         {
             ReleaseBuffer();
         }

         base.Dispose(disposing);
     }

     // Runs the recording read and transfers its outcome to the APM result, then notifies the callback.
     private async Task CompleteReadAsync(TaskCompletionSource<int> completion, byte[] buffer, int offset, int count, AsyncCallback? callback)
     {
         try
         {
             completion.TrySetResult(await ReadAsync(buffer, offset, count, CancellationToken.None).ConfigureAwait(false));
         }
         catch (OperationCanceledException)
         {
             completion.TrySetCanceled();
         }
         catch (Exception ex)
         {
             // Surfaced to the caller when EndRead is invoked.
             completion.TrySetException(ex);
         }

         callback?.Invoke(completion.Task);
     }

     // Disposes the recording buffer at most once.
     private void ReleaseBuffer()
     {
         if (bufferReleased)
         {
             return;
         }

         bufferReleased = true;
         bufferWriter.Dispose();
     }
 }

然后利用中间件进行镜像处理

/// <summary>
/// Route-level HTTP function that, after the normal pipeline has run, forwards the same
/// request to a second ("mirror") cluster named in the route metadata.
/// </summary>
/// <remarks>
/// While the downstream pipeline runs, the request body is wrapped in a
/// <see cref="ReadBufferingStream"/> so the bytes that were read can be sent again.
/// A failure while mirroring is logged as a warning and not rethrown.
/// </remarks>
public class MirrorFunc : IHttpFunc
{
    // Route metadata key that holds the id of the cluster receiving the mirrored traffic.
    private const string MirrorClusterKey = "MirrorCluster";

    private readonly IServiceProvider serviceProvider;
    private readonly IHttpForwarder forwarder;
    private readonly ILoadBalancingPolicyFactory loadBalancing;
    private readonly IForwarderHttpClientFactory forwarderHttpClientFactory;
    private readonly ProxyLogger logger;

    /// <summary>
    /// Always <see cref="int.MinValue"/>.
    /// NOTE(review): presumably sorts this function ahead of other route functions - confirm
    /// against how <c>IHttpFunc.Order</c> is consumed.
    /// </summary>
    public int Order => int.MinValue;

    /// <summary>Stores the collaborators used to look up the mirror cluster and forward to it.</summary>
    public MirrorFunc(IServiceProvider serviceProvider, IHttpForwarder forwarder, ILoadBalancingPolicyFactory loadBalancing, IForwarderHttpClientFactory forwarderHttpClientFactory, ProxyLogger logger)
    {
        this.serviceProvider = serviceProvider;
        this.forwarder = forwarder;
        this.loadBalancing = loadBalancing;
        this.forwarderHttpClientFactory = forwarderHttpClientFactory;
        this.logger = logger;
    }

    /// <summary>
    /// Wraps <paramref name="next"/> with mirroring when the route metadata contains a
    /// non-blank <c>MirrorCluster</c> value; otherwise returns <paramref name="next"/> unchanged.
    /// </summary>
    /// <param name="config">Route whose metadata is inspected.</param>
    /// <param name="next">The rest of the request pipeline.</param>
    /// <returns>The delegate to run for this route.</returns>
    public RequestDelegate Create(RouteConfig config, RequestDelegate next)
    {
        if (config.Metadata != null
            && config.Metadata.TryGetValue(MirrorClusterKey, out var mirrorCluster)
            && !string.IsNullOrWhiteSpace(mirrorCluster))
        {
            return context => Mirror(context, mirrorCluster, next);
        }

        return next;
    }

    // Runs the normal pipeline with a recording request body, then replays the request to the mirror cluster.
    private async Task Mirror(HttpContext c, string mirrorCluster, RequestDelegate next)
    {
        var configSource = serviceProvider.GetRequiredService<IConfigSource<IProxyConfig>>();

        // No snapshot, no cluster table, or no such cluster: behave as if mirroring were off.
        if (configSource.CurrentSnapshot == null
            || configSource.CurrentSnapshot.Clusters == null
            || !configSource.CurrentSnapshot.Clusters.TryGetValue(mirrorCluster, out var cluster)
            || cluster == null)
        {
            await next(c);
            return;
        }

        var requestBody = c.Request.Body;
        using var recorder = new ReadBufferingStream(requestBody);
        c.Request.Body = recorder;

        try
        {
            await next(c);
        }
        finally
        {
            // Executed whether or not the pipeline above threw.
            await ReplayToMirrorAsync();
        }

        // Sends the recorded request to one destination of the mirror cluster.
        async Task ReplayToMirrorAsync()
        {
            // From here on the request body is the recorded copy of what was read.
            c.Request.Body = recorder.BufferingStream;
            try
            {
                var proxyFeature = c.Features.GetRequiredFeature<IReverseProxyFeature>();

                // Remember the destination selected for the real request and put it back after
                // picking one for the mirror (PickDestination presumably overwrites it - confirm).
                var primaryDestination = proxyFeature.SelectedDestination;
                var mirrorDestination = loadBalancing.PickDestination(proxyFeature, cluster);
                proxyFeature.SelectedDestination = primaryDestination;

                if (mirrorDestination == null)
                {
                    return;
                }

                cluster.InitHttp(forwarderHttpClientFactory);
                var transformer = new NonHttpTransformer(proxyFeature.Route.Transformer);
                await forwarder.SendAsync(c, proxyFeature, mirrorDestination, cluster, transformer);
            }
            catch (Exception ex)
            {
                // Mirroring is best-effort: log and carry on.
                logger.LogWarning(ex, "Mirror failed");
            }
            finally
            {
                c.Request.Body = requestBody;
            }
        }
    }
}

从上面的实现可以看出,镜像是在原请求处理结束后再额外发送的一次 HTTP 请求,并且已读取的请求 body 会临时缓存在内存中,所以内存占用和请求延迟都会受到影响,body 很大的请求尤其明显。

VKProxy 是使用 C# 开发、基于 Kestrel 实现的 L4/L7 代理(感兴趣的同学烦请点个 GitHub 小赞赞呢)

发表评论

评论已关闭。

相关文章