C# WebSocket Servers — Fleck、SuperSocket、TouchSocket

最近在维护老项目,感觉内存一直都有问题,最终定位到是 WebSocketServer 的问题。为此了解了 Fleck、SuperSocket、TouchSocket 等开源项目,这里记录一下。

.NET 5、.NET 6、.NET 7、.NET 8 的 ASP.NET Core 项目已经内置了 WebSocket 支持,只要调用 app.UseWebSockets() 就可以了,详情见 WebSockets support in ASP.NET Core | Microsoft Learn

0. 控制台运行的代码

代码:https://gitee.com/Karl_Albright/csharp-web-socket-server

/// <summary>Console entry point that hosts the demo WebSocket server.</summary>
internal class Program
{
    /// <summary>
    /// Creates the server, starts it, kicks off the background senders and then
    /// blocks on console input.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        var server = new WebSockSvr();

        server.Start();
        server.SendDatas();

        // Block here until a line is read from the console.
        Console.ReadLine();
    }
}

1. Fleck 

兼容 .NetFramework V4.0、.NetFramework V4.5、.NetCoreApp V2.0、.NetStandard V2.0

dotnet add package Fleck --version 1.2.0 

using Fleck;

namespace FleckDemo
{
    /// <summary>
    /// Fleck-based WebSocket demo server. Every text message received is echoed to all
    /// tracked connections, and <see cref="SendDatas"/> starts background loops that push
    /// a sample payload to every available connection.
    /// </summary>
    public class WebSockSvr
    {
        /// <summary>Number of background sender loops started by <see cref="SendDatas"/>.</summary>
        private const int SenderCount = 200;

        /// <summary>Pause between two send rounds of a single sender loop, in milliseconds.</summary>
        private const int SendIntervalMs = 1000;

        /// <summary>
        /// Connections that are currently open. The list is mutated by the OnOpen/OnClose
        /// callbacks while the sender loops read it, so this class serializes its own
        /// access through a private lock and iterates over snapshots only.
        /// </summary>
        public List<IWebSocketConnection> ClinetList = new();

        private readonly object clientsLock = new();
        private readonly WebSocketServer service;

        /// <summary>Creates the server bound to <c>ws://0.0.0.0:4040</c>; it is not started yet.</summary>
        public WebSockSvr()
        {
            service = new WebSocketServer("ws://0.0.0.0:4040");
        }

        /// <summary>Starts the server and wires the per-connection open/close/message callbacks.</summary>
        public void Start()
        {
            service.Start(socket =>
            {
                socket.OnOpen = () =>
                {
                    Console.WriteLine("Open!");
                    lock (clientsLock)
                    {
                        ClinetList.Add(socket);
                    }
                };
                socket.OnClose = () =>
                {
                    Console.WriteLine("Close!");
                    lock (clientsLock)
                    {
                        ClinetList.Remove(socket);
                    }
                };
                socket.OnMessage = message =>
                {
                    Console.WriteLine(message);

                    // Broadcast over a snapshot so a concurrent open/close cannot
                    // invalidate the iteration.
                    foreach (var client in SnapshotClients())
                    {
                        client.Send("Echo: " + message);
                    }
                };
            });
        }

        /// <summary>
        /// Starts <see cref="SenderCount"/> fire-and-forget sender loops. Each loop runs
        /// forever and sends one payload per available connection per round.
        /// </summary>
        public void SendDatas()
        {
            for (int i = 0; i < SenderCount; i++)
            {
                Task.Run(() => SendLoopAsync());
            }
        }

        /// <summary>One endless sender loop: send to every available connection, then wait.</summary>
        private async Task SendLoopAsync()
        {
            while (true)
            {
                try
                {
                    foreach (var sock in SnapshotClients())
                    {
                        if (sock.IsAvailable)
                        {
                            await sock.Send(BuildPayload());
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("出现异常:" + ex.Message + "\r\n" + ex.StackTrace);
                }
                finally
                {
                    // The delay also runs after a failure, so a broken connection
                    // cannot turn this loop into a busy spin.
                    await Task.Delay(SendIntervalMs);
                }
            }
        }

        /// <summary>Builds the sample payload: a local timestamp followed by fixed demo values.</summary>
        private static string BuildPayload()
        {
            return $"Dev[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}, 12.34, 34.56, 56.78, \"77705683\"]";
        }

        /// <summary>Returns a copy of the current connection list, taken under the lock.</summary>
        private IWebSocketConnection[] SnapshotClients()
        {
            lock (clientsLock)
            {
                return ClinetList.ToArray();
            }
        }
    }
}

 

2. SuperSocket1.6 

截至目前,SuperSocket 2.0 版本还没有正式发布,只有 beta.26 版本,和 1.6 相比改动挺大的。

兼容 .NetFramework V4.6.1、.NetFramework V4.6.2、.NetFramework V4.7、.NetFramework V4.7.1、.NetFramework V4.7.2、.NetFramework V4.8、.NetFramework V4.8.1

dotnet add package SuperSocket --version 1.6.6.1 dotnet add package SuperSocket.Engine --version 1.6.6.1 dotnet add package SuperSocket.WebSocket --version 1.6.6.1

using SuperSocket.SocketBase;
using SuperSocket.WebSocket;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace SuperSocketDemo
{
    /// <summary>
    /// SuperSocket 1.6 WebSocket demo server. Sessions are tracked in
    /// <see cref="ClinetList"/>, and <see cref="SendDatas"/> starts background loops
    /// that push a sample payload to every connected session.
    /// </summary>
    public class WebSockSvr
    {
        /// <summary>Number of background sender loops started by <see cref="SendDatas"/>.</summary>
        private const int SenderCount = 200;

        /// <summary>Pause between two send rounds of a single sender loop, in milliseconds.</summary>
        private const int SendIntervalMs = 10;

        /// <summary>Port passed to <c>server.Setup</c>.</summary>
        private const int ListenPort = 4040;

        /// <summary>
        /// Sessions that are currently connected. The list is mutated by the session
        /// connected/closed handlers while the sender loops read it, so this class
        /// serializes its own access through a private lock and iterates over snapshots only.
        /// </summary>
        public List<WebSocketSession> ClinetList { get; set; } = new List<WebSocketSession>();

        private readonly object clientsLock = new object();
        private readonly WebSocketServer server;

        /// <summary>Creates the server and subscribes to its session and message events.</summary>
        public WebSockSvr()
        {
            server = new WebSocketServer();
            server.NewMessageReceived += Ws_NewMessageReceived;   // a text message arrived
            server.NewSessionConnected += Ws_NewSessionConnected; // a client connected
            server.SessionClosed += Ws_SessionClosed;             // a client disconnected
            server.NewDataReceived += Ws_NewDataReceived;         // binary data arrived
        }

        /// <summary>Binds the listening port and, only if setup succeeded, starts the server.</summary>
        public void Start()
        {
            if (server.Setup(ListenPort))
            {
                server.Start();
            }
        }

        // Alternative kept from the original sample: broadcast to every session
        // the server itself reports, instead of using ClinetList.
        //public void SendDatas()
        //{
        //    foreach (var session in server.GetAllSessions())
        //    {
        //        session.Send($"Dev[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}, 12.34, 34.56, 56.78, \"77705683\"]");
        //        Thread.Sleep(1000);
        //    }
        //}

        /// <summary>
        /// Starts <see cref="SenderCount"/> fire-and-forget sender loops. Each loop runs
        /// forever and sends one payload per connected session per round.
        /// </summary>
        public void SendDatas()
        {
            for (int i = 0; i < SenderCount; i++)
            {
                Task.Run(() => SendLoopAsync());
            }
        }

        /// <summary>One endless sender loop: send to every connected session, then wait.</summary>
        private async Task SendLoopAsync()
        {
            while (true)
            {
                try
                {
                    foreach (var sock in SnapshotClients())
                    {
                        if (sock.Connected)
                        {
                            sock.TrySend(BuildPayload());
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("出现异常:" + ex.Message + "\r\n" + ex.StackTrace);
                }
                finally
                {
                    // The delay also runs after a failure, so the loop never busy-spins.
                    await Task.Delay(SendIntervalMs);
                }
            }
        }

        /// <summary>Builds the sample payload: a local timestamp followed by fixed demo values.</summary>
        private static string BuildPayload()
        {
            return $"Dev[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}, 12.34, 34.56, 56.78, \"77705683\"]";
        }

        /// <summary>Returns a copy of the current session list, taken under the lock.</summary>
        private WebSocketSession[] SnapshotClients()
        {
            lock (clientsLock)
            {
                return ClinetList.ToArray();
            }
        }

        /// <summary>Tracks a newly connected session.</summary>
        private void Ws_NewSessionConnected(WebSocketSession session)
        {
            lock (clientsLock)
            {
                ClinetList.Add(session);
            }
        }

        /// <summary>Text message handler; intentionally empty in this demo.</summary>
        private void Ws_NewMessageReceived(WebSocketSession session, string value)
        {
        }

        /// <summary>Stops tracking a session once it has closed.</summary>
        private void Ws_SessionClosed(WebSocketSession session, CloseReason value)
        {
            lock (clientsLock)
            {
                ClinetList.Remove(session);
            }
        }

        /// <summary>Binary data handler; intentionally empty in this demo.</summary>
        private void Ws_NewDataReceived(WebSocketSession session, byte[] value)
        {
        }
    }
}

 

3. SuperSocket2.0.0-beta.26

兼容 .NetStandard V2.1、.Net5、.Net6、.Net7、.Net8

dotnet add package SuperSocket.WebSocket.Server --version 2.0.0-beta.26

using Microsoft.Extensions.Hosting;
using SuperSocket.Server;
using SuperSocket.Server.Abstractions;
using SuperSocket.Server.Host;
using SuperSocket.WebSocket.Server;

namespace SuperSocket2Demo
{
    /// <summary>
    /// SuperSocket 2.0 (beta) WebSocket demo server. Sessions are tracked in
    /// <see cref="ClinetList"/>, and <see cref="SendDatas"/> starts background loops
    /// that push a sample payload to every connected session.
    /// </summary>
    public class WebSockSvr
    {
        /// <summary>Number of background sender loops started by <see cref="SendDatas"/>.</summary>
        private const int SenderCount = 200;

        /// <summary>Pause between two send rounds of a single sender loop, in milliseconds.</summary>
        private const int SendIntervalMs = 10;

        /// <summary>
        /// Sessions that are currently connected. The list is mutated by the session
        /// connected/closed handlers while the sender loops read it, so this class
        /// serializes its own access through a private lock and iterates over snapshots only.
        /// </summary>
        public List<WebSocketSession> ClinetList { get; set; } = new();

        private readonly object clientsLock = new();
        private readonly IServer server;

        /// <summary>
        /// Builds the server with one listener on 127.0.0.1:4040, session handlers that
        /// keep <see cref="ClinetList"/> up to date, and an empty message handler.
        /// </summary>
        public WebSockSvr()
        {
            server = WebSocketHostBuilder.Create()
                .ConfigureSuperSocket(opts =>
                {
                    opts.AddListener(new ListenOptions { Ip = "127.0.0.1", Port = 4040 });
                })
                .UseSessionHandler(
                    (session) =>
                    {
                        var sess = (WebSocketSession)session;
                        lock (clientsLock)
                        {
                            ClinetList.Add(sess);
                        }
                        return ValueTask.CompletedTask;
                    },
                    (session, reason) =>
                    {
                        var sess = (WebSocketSession)session;
                        lock (clientsLock)
                        {
                            ClinetList.Remove(sess);
                        }
                        return ValueTask.CompletedTask;
                    })
                // Incoming messages are ignored in this demo.
                .UseWebSocketMessageHandler(async (session, message) => { })
                .BuildAsServer();
        }

        /// <summary>Starts the server.</summary>
        /// <returns>The task returned by <c>server.StartAsync()</c>.</returns>
        public Task Start()
        {
            return server.StartAsync();
        }

        /// <summary>
        /// Starts <see cref="SenderCount"/> fire-and-forget sender loops. Each loop runs
        /// forever and sends one payload per connected session per round.
        /// </summary>
        public void SendDatas()
        {
            for (int i = 0; i < SenderCount; i++)
            {
                Task.Run(() => SendLoopAsync());
            }
        }

        /// <summary>One endless sender loop: send to every connected session, then wait.</summary>
        private async Task SendLoopAsync()
        {
            while (true)
            {
                try
                {
                    foreach (var sock in SnapshotClients())
                    {
                        if (sock.State == SessionState.Connected)
                        {
                            await sock.SendAsync(BuildPayload());
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("出现异常:" + ex.Message + "\r\n" + ex.StackTrace);
                }
                finally
                {
                    // The delay also runs after a failure, so the loop never busy-spins.
                    await Task.Delay(SendIntervalMs);
                }
            }
        }

        /// <summary>Builds the sample payload: a local timestamp followed by fixed demo values.</summary>
        private static string BuildPayload()
        {
            return $"Dev[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}, 12.34, 34.56, 56.78, \"77705683\"]";
        }

        /// <summary>Returns a copy of the current session list, taken under the lock.</summary>
        private WebSocketSession[] SnapshotClients()
        {
            lock (clientsLock)
            {
                return ClinetList.ToArray();
            }
        }
    }
}

 

4. TouchSocket

目前兼容 .NetFramework V4.5、.NetFramework V4.6.2、.NetFramework V4.7.2、.NetFramework V4.8.1、.NetStandard V2.0、.NetStandard V2.1、.Net6、.Net7、.Net8

dotnet add package TouchSocket --version 2.1.5 dotnet add package TouchSocket.Http --version 2.1.5 dotnet add package TouchSocket.WebApi --version 2.1.5

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Http;
using TouchSocket.Http.WebSockets;
using TouchSocket.Rpc;
using TouchSocket.Sockets;
using TouchSocket.WebApi;

namespace TouchSocketDemo.Core
{
    /// <summary>
    /// TouchSocket WebSocket demo server. Clients are tracked in <see cref="ClinetList"/>
    /// by inline plugins, and <see cref="SendDatas"/> starts background loops that push
    /// a sample payload to every online client.
    /// </summary>
    public class WebSockSvr
    {
        /// <summary>Number of background sender loops started by <see cref="SendDatas"/>.</summary>
        private const int SenderCount = 200;

        /// <summary>Pause between two send rounds of a single sender loop, in milliseconds.</summary>
        private const int SendIntervalMs = 10;

        /// <summary>
        /// Clients whose WebSocket handshake has completed. The list is mutated by the
        /// handshaked/closing plugins while the sender loops read it, so this class
        /// serializes its own access through a private lock and iterates over snapshots only.
        /// </summary>
        public List<IHttpSession> ClinetList { get; set; } = new List<IHttpSession>();

        private readonly object clientsLock = new object();
        private readonly HttpService service;

        /// <summary>Creates the underlying HTTP service; it is configured and started by <see cref="Start"/>.</summary>
        public WebSockSvr()
        {
            service = new HttpService();
        }

        /// <summary>
        /// Configures the service (port 4040, console logger, WebSocket plugins) and starts it.
        /// </summary>
        public async Task Start()
        {
            await service.SetupAsync(new TouchSocketConfig() // load configuration
                 .SetListenIPHosts(4040)
                 .ConfigureContainer(a =>
                 {
                     a.AddConsoleLogger();
                 })
                 .ConfigurePlugins(a =>
                 {
                     a.UseWebSocket().SetWSUrl(null).UseAutoPong();
                     //a.Add<MyWebSocketPlugin>();

                     // Track the client once the WebSocket handshake has completed.
                     a.Add(typeof(IWebSocketHandshakedPlugin), async (IWebSocket client, HttpContextEventArgs e) =>
                     {
                         lock (clientsLock)
                         {
                             ClinetList.Add(client.Client);
                         }
                         await e.InvokeNext();
                     });

                     // Stop tracking the client when its WebSocket is closing.
                     a.Add(typeof(IWebSocketClosingPlugin), async (IWebSocket client, ClosedEventArgs e) =>
                     {
                         lock (clientsLock)
                         {
                             ClinetList.Remove(client.Client);
                         }
                         await e.InvokeNext();
                     });

                     a.Add(typeof(IWebSocketReceivedPlugin), async (IWebSocket client, WSDataFrameEventArgs e) =>
                     {
                         switch (e.DataFrame.Opcode)
                         {
                             case WSDataType.Close:
                                 await client.CloseAsync("断开");
                                 // Do not pass a close frame further down the plugin chain.
                                 return;
                             case WSDataType.Ping:
                                 await client.PongAsync(); // a ping is normally answered with a pong
                                 break;
                             case WSDataType.Pong:
                                 break;
                             default:
                                 break;
                         }

                         await e.InvokeNext();
                     });

                     a.UseWebSocketReconnection();
                     //a.Add<MyWebSocketPlugin>();
                 }));
            await service.StartAsync();
        }

        /// <summary>
        /// Starts <see cref="SenderCount"/> fire-and-forget sender loops. Each loop runs
        /// forever and sends one payload per online client per round.
        /// </summary>
        public void SendDatas()
        {
            for (int i = 0; i < SenderCount; i++)
            {
                Task.Run(() => SendLoopAsync());
            }
        }

        /// <summary>One endless sender loop: send to every online client, then wait.</summary>
        private async Task SendLoopAsync()
        {
            while (true)
            {
                try
                {
                    foreach (var session in SnapshotClients())
                    {
                        var sock = (HttpSessionClient)session;
                        if (sock.Online)
                        {
                            await sock.WebSocket.SendAsync(BuildPayload());
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("出现异常:" + ex.Message + "\r\n" + ex.StackTrace);
                }
                finally
                {
                    // The delay also runs after a failure, so the loop never busy-spins.
                    await Task.Delay(SendIntervalMs);
                }
            }
        }

        /// <summary>Builds the sample payload: a local timestamp followed by fixed demo values.</summary>
        private static string BuildPayload()
        {
            return $"Dev[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}, 12.34, 34.56, 56.78, \"77705683\"]";
        }

        /// <summary>Returns a copy of the current client list, taken under the lock.</summary>
        private IHttpSession[] SnapshotClients()
        {
            lock (clientsLock)
            {
                return ClinetList.ToArray();
            }
        }

        /// <summary>
        /// Class-based alternative to the inline plugins registered in <see cref="Start"/>
        /// (its registration there is commented out). Logs handshake progress and shows
        /// how fragmented messages are reassembled.
        /// </summary>
        public class MyWebSocketPlugin : PluginBase, IWebSocketHandshakingPlugin, IWebSocketHandshakedPlugin, IWebSocketReceivedPlugin
        {
            private readonly ILog m_logger;

            /// <summary>Creates the plugin with the logger it writes to.</summary>
            /// <param name="logger">Logger used for all messages of this plugin.</param>
            public MyWebSocketPlugin(ILog logger)
            {
                this.m_logger = logger;
            }

            /// <summary>Logs that a WebSocket handshake is in progress, then continues the plugin chain.</summary>
            public async Task OnWebSocketHandshaking(IWebSocket client, HttpContextEventArgs e)
            {
                if (client.Client is IHttpSessionClient socketClient)
                {
                    // Server side: the session id is available here.
                    var id = socketClient.Id;
                }
                else if (client.Client is IHttpClient httpClient)
                {
                    // Client side.
                }

                this.m_logger.Info("WebSocket正在连接");
                await e.InvokeNext();
            }

            /// <summary>Logs that the WebSocket handshake succeeded, then continues the plugin chain.</summary>
            public async Task OnWebSocketHandshaked(IWebSocket client, HttpContextEventArgs e)
            {
                this.m_logger.Info("WebSocket成功连接");
                await e.InvokeNext();
            }

            /// <summary>
            /// Handles one received frame: closes on Close, answers Ping with Pong, logs Pong,
            /// and feeds every other frame into the message combinator.
            /// </summary>
            public async Task OnWebSocketReceived(IWebSocket client, WSDataFrameEventArgs e)
            {
                switch (e.DataFrame.Opcode)
                {
                    case WSDataType.Close:
                        await client.CloseAsync("断开");
                        // Do not pass a close frame further down the plugin chain.
                        return;
                    case WSDataType.Ping:
                        await client.PongAsync(); // a ping is normally answered with a pong
                        break;
                    case WSDataType.Pong:
                        this.m_logger.Info("Pong");
                        break;
                    default:
                        {
                            // Other frames may be fragments (WSDataType.Cont) of one message,
                            // so they have to be merged, here via the message combinator.
                            var messageCombinator = client.GetMessageCombinator();
                            try
                            {
                                // Try to combine; this succeeds once a complete message is available.
                                if (messageCombinator.TryCombine(e.DataFrame, out var webSocketMessage))
                                {
                                    // The combined message must be released via using.
                                    using (webSocketMessage)
                                    {
                                        // Type of the combined message.
                                        var dataType = webSocketMessage.Opcode;

                                        // Complete payload of the combined message.
                                        var data = webSocketMessage.PayloadData;

                                        if (dataType == WSDataType.Text)
                                        {
                                            // Handle as text.
                                        }
                                        else if (dataType == WSDataType.Binary)
                                        {
                                            // Handle as bytes.
                                        }
                                        else
                                        {
                                            // Possibly another, custom protocol.
                                        }
                                    }
                                }
                            }
                            catch (Exception ex)
                            {
                                this.m_logger.Exception(ex);

                                // Clear the combinator's data when combining failed.
                                messageCombinator.Clear();
                            }
                        }
                        break;
                }

                await e.InvokeNext();
            }
        }
    }
}

 

发表评论

评论已关闭。

相关文章