surging 集成SuperSocket预发布版本2.0

一、概述

周末在家试着扩展SuperSocket,因为之前都是只支持.net framework, 后面出现支持.NET CORE 的SuperSocket 2.0 ,然后集成进来和dotnetty 做下对比,dotnetty 有多强,我压测可以支持20w/s, 然后客户提供的服务器,通过外网压测网关,把上行速度50MB带宽的网络跑满了,引擎主机CPU只是在15%左右,完全没有跑满。然后再试试国人开发的SuperSocket看下性能怎么样。

surging 集成SuperSocket预发布版本2.0

 

surging 集成SuperSocket预发布版本2.0

 surging 集成SuperSocket预发布版本2.0

surging 集成SuperSocket预发布版本2.0

木舟 (Kayak) 是什么?

       木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。

     凯亚物联网平台:http://117.72.121.2:3100(用户名:fanly  密码:123456)(木舟物联网有人取了,准备改名原神凯亚,凡是交托于他的任务,总能得到解决

    链路跟踪Skywalking V8:http://117.72.121.2:8080/

      surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)

 

二、集成SuperSocket

作为去中心化的微服务引擎,相关的引擎组件,中间件都可以替换,就比如核心的RPC组件dotnetty 都可以替换成其它组件,下面介绍如何进行替换

创建服务端消息监听SuperSocketServerMessageListener,需要继承IMessageListener,代码如下:

  public class SuperSocketServerMessageListener : IMessageListener, IDisposable   {       public event ReceivedDelegate Received;        private readonly ILogger<SuperSocketServerMessageListener> _logger;        private readonly ITransportMessageDecoder _transportMessageDecoder;       private readonly ITransportMessageEncoder _transportMessageEncoder;       private readonly IServiceEngineLifetime _serviceEngineLifetime;        public SuperSocketServerMessageListener(ILogger<SuperSocketServerMessageListener> logger, ITransportMessageCodecFactory codecFactory, IServiceEngineLifetime serviceEngineLifetime)       {           _logger = logger;           _transportMessageEncoder = codecFactory.GetEncoder();           _transportMessageDecoder = codecFactory.GetDecoder();           _serviceEngineLifetime = serviceEngineLifetime;       }       public async Task StartAsync(EndPoint endPoint)       {            _serviceEngineLifetime.ServiceEngineStarted.Register(async () =>           {               try               {                   var ipEndPoint = endPoint as IPEndPoint;                   var host = SuperSocketHostBuilder.Create<TransportMessage, TransportMessagePipelineFilter>()                                  .UsePackageHandler( (s, p) =>                   {                       Task.Run(async () =>                       {                           var sender = new SuperSocketServerMessageSender(_transportMessageEncoder, s);                           await OnReceived(sender, p);                       });                       return ValueTask.CompletedTask;                   })                   .ConfigureSuperSocket(options =>                   {                       options.Name = "Echo Server";                       options.Logger = _logger;                        options.AddListener(new ListenOptions                       {                           Ip = ipEndPoint.Address.ToString(),                           Port = ipEndPoint.Port,                                                }                       );                   })                   .ConfigureLogging((hostCtx, loggingBuilder) =>                   {                       loggingBuilder.AddConsole();                   })                   .Build();                   await host.RunAsync();               }               catch (Exception ex)               {                   _logger.LogError($"SuperSocket服务主机启动失败,监听地址:{endPoint}。 ");               }           });        }               public async Task OnReceived(IMessageSender sender, TransportMessage message)       {           if (Received == null)               return;           await Received(sender, message);       }        public void Dispose()       {        }   }

创建客户端消息监听SuperSocketTransportClientFactory,需要继承ITransportClientFactory,代码如下:

 

 internal class SuperSocketTransportClientFactory : ITransportClientFactory, IDisposable  {      private readonly ITransportMessageEncoder _transportMessageEncoder;      private readonly ITransportMessageDecoder _transportMessageDecoder;      private readonly ILogger<SuperSocketTransportClientFactory> _logger;      private readonly IServiceExecutor _serviceExecutor;      private readonly IHealthCheckService _healthCheckService;      private readonly ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>> _clients = new ConcurrentDictionary<EndPoint, Lazy<Task<ITransportClient>>>();       public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger<SuperSocketTransportClientFactory> logger)  : this(codecFactory, healthCheckService, logger, null)      {      }       public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger<SuperSocketTransportClientFactory> logger, IServiceExecutor serviceExecutor)      {          _transportMessageEncoder = codecFactory.GetEncoder();          _transportMessageDecoder = codecFactory.GetDecoder();          _logger = logger;          _serviceExecutor = serviceExecutor;          _healthCheckService = healthCheckService;      }      public async Task<ITransportClient> CreateClientAsync(EndPoint endPoint)      {          var key = endPoint;          if (_logger.IsEnabled(LogLevel.Debug))              _logger.LogDebug($"准备为服务端地址:{key}创建客户端。");          try          {              return await _clients.GetOrAdd(key           , k => new Lazy<Task<ITransportClient>>(async () =>           {               //客户端对象                var client = new EasyClient<TransportMessage>(new TransportMessagePipelineFilter()).AsClient();               var messageListener = new MessageListener();               var messageSender = new SuperSocketMessageClientSender(_transportMessageEncoder, client);               await client.ConnectAsync(endPoint);               client.PackageHandler += async (sender, package) =>               {                   await messageListener.OnReceived(messageSender, package);               };               client.StartReceive();               //创建客户端               var transportClient = new TransportClient(messageSender, messageListener, _logger, _serviceExecutor);               return transportClient;           }               )).Value;//返回实例          }          catch          {              //移除              _clients.TryRemove(key, out var value);              var ipEndPoint = endPoint as IPEndPoint;              //标记这个地址是失败的请求              if (ipEndPoint != null)                  await _healthCheckService.MarkFailure(new IpAddressModel(ipEndPoint.Address.ToString(), ipEndPoint.Port));              throw;          }      }        public void Dispose()      {          foreach (var client in _clients.Values)          {              (client as IDisposable)?.Dispose();          }      }  }

注册初始化SuperSocket引擎模块,需要继承EnginePartModule, 代码如下:

  public class SuperSocketModule : EnginePartModule   {       public override void Initialize(AppModuleContext context)       {           base.Initialize(context);       }        /// <summary>       /// Inject dependent third-party components       /// </summary>       /// <param name="builder"></param>       protected override void RegisterBuilder(ContainerBuilderWrapper builder)       {           base.RegisterBuilder(builder);           builder.Register(provider =>           {               IServiceExecutor serviceExecutor = null;               if (provider.IsRegistered(typeof(IServiceExecutor)))                   serviceExecutor = provider.Resolve<IServiceExecutor>();               return new SuperSocketTransportClientFactory(provider.Resolve<ITransportMessageCodecFactory>(),                     provider.Resolve<IHealthCheckService>(),                   provider.Resolve<ILogger<SuperSocketTransportClientFactory>>(),                   serviceExecutor);           }).As(typeof(ITransportClientFactory)).SingleInstance();           if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.Tcp ||               AppConfig.ServerOptions.Protocol == CommunicationProtocol.None)           {               RegisterDefaultProtocol(builder);           }       }        private void RegisterDefaultProtocol(ContainerBuilderWrapper builder)       {           builder.Register(provider =>           {               return new SuperSocketServerMessageListener(provider.Resolve<ILogger<SuperSocketServerMessageListener>>(),                     provider.Resolve<ITransportMessageCodecFactory>(),                        provider.Resolve<IServiceEngineLifetime>());           }).SingleInstance();           builder.Register(provider =>           {               var serviceExecutor = provider.ResolveKeyed<IServiceExecutor>(CommunicationProtocol.Tcp.ToString());               var messageListener = provider.Resolve<SuperSocketServerMessageListener>();               return new DefaultServiceHost(async endPoint =>               {                   await messageListener.StartAsync(endPoint);                   return messageListener;               }, serviceExecutor);           }).As<IServiceHost>();       }   }

客户端服务端消息发送,需要继承IMessageSender, 代码如下:

 

    public abstract class SuperSocketMessageSender     {         private readonly ITransportMessageEncoder _transportMessageEncoder;          protected SuperSocketMessageSender(ITransportMessageEncoder transportMessageEncoder)         {             _transportMessageEncoder = transportMessageEncoder;         }          protected byte[] GetByteBuffer(TransportMessage message)         {             var data = _transportMessageEncoder.Encode(message).ToList();             data.AddRange(Encoding.UTF8.GetBytes("rn"));             //var buffer = PooledByteBufferAllocator.Default.Buffer();             return data.ToArray();         }     }      public class SuperSocketMessageClientSender : SuperSocketMessageSender, IMessageSender     {         private readonly IEasyClient<TransportMessage> _client;           public SuperSocketMessageClientSender(ITransportMessageEncoder transportMessageEncoder, IEasyClient<TransportMessage> client) : base(transportMessageEncoder)         {             _client = client;          }          /// <summary>         /// 发送消息。         /// </summary>         /// <param name="message">消息内容。</param>         /// <returns>一个任务。</returns>          public async Task SendAsync(TransportMessage message)         {             var buffer = GetByteBuffer(message);            await _client.SendAsync(buffer);          }          /// <summary>         /// 发送消息并清空缓冲区。         /// </summary>         /// <param name="message">消息内容。</param>         /// <returns>一个任务。</returns>          public async Task SendAndFlushAsync(TransportMessage message)         {             var buffer = GetByteBuffer(message);             await _client.SendAsync(buffer);              // _client.StartReceive();             //var p=  await _client.ReceiveAsync();         }     }      #region Implementation of IMessageSender     public class SuperSocketServerMessageSender : SuperSocketMessageSender, IMessageSender     {         private readonly IAppSession _session;          public SuperSocketServerMessageSender(ITransportMessageEncoder transportMessageEncoder, IAppSession session) : base(transportMessageEncoder)         {             _session = session;         }           /// <summary>         /// 发送消息。         /// </summary>         /// <param name="message">消息内容。</param>         /// <returns>一个任务。</returns>          public async Task SendAsync(TransportMessage message)         {             var buffer = GetByteBuffer(message);            await _session.SendAsync(buffer);         }          /// <summary>         /// 发送消息并清空缓冲区。         /// </summary>         /// <param name="message">消息内容。</param>         /// <returns>一个任务。</returns>          public async Task SendAndFlushAsync(TransportMessage message)         {             var buffer = GetByteBuffer(message);            await _session.SendAsync(buffer);         }      }     #endregion

 

SuperSocket过滤器,需要继承TerminatorPipelineFilter<TransportMessage>, 代码如下:

 

    public class TransportMessagePipelineFilter : TerminatorPipelineFilter<TransportMessage>     {         private readonly ITransportMessageDecoder _transportMessageDecoder;         public TransportMessagePipelineFilter() : base(new[] { (byte)'r', (byte)'n' })         {             _transportMessageDecoder = ServiceLocator.GetService<ITransportMessageCodecFactory>().GetDecoder();         }           public override TransportMessage Filter(ref SequenceReader<byte> bufferStream)         {             try             {                 var bytes = bufferStream.Sequence.Slice(0, bufferStream.Length - 2).ToArray();                 var transportMessage = _transportMessageDecoder.Decode(bytes);                 return transportMessage;             }             finally             {                 bufferStream.Advance(bufferStream.Length);             }         }     }

 

三、如何加载SuperSocket引擎组件

第一种方式:

去掉Surging.Core.DotNetty引用,添加Surging.Core.SuperSocket

surging 集成SuperSocket预发布版本2.0

 第二种方式

添加Surging.Core.DotNetty,Surging.Core.SuperSocket这两个应用,在surgingSettings.json配置文件中把Packages的using列表中的DotNettyModule改成SuperSocketModule

surging 集成SuperSocket预发布版本2.0

四、结果

可能是预发布版本,在测试当中还是有些问题,kayka物联网平台换成SuperSocket还是会发生错误,暂时没有条件进行压测,可能是因为预发布版本,作者还需要完善,等正式版之后再压测做对比吧

 

五、总结

因为这段时间比较忙,还需要协助客户拆分服务,缓存降级,消息队列,等忙完这段时间,线上物联网平台会开通端口给用户进行测试,我也会努力把物联网进行完善,让微服务物联网平台能走向新的高度。

 

发表评论

评论已关闭。

相关文章