c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

想用ZeroMq的发布订阅者模式,又不想写一大串switch case?

想用RPC函数代理机制,又想多对多进行通讯?

下面就结合二者的优点重新封装一套通讯模块

一、先写ZeroMq的发布订阅者模式

  •  先做个代理,负责分发事件,代码如下:
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

// Step 1: create the two endpoints of the forwarding proxy.
// The '@' address prefix tells NetMQ to bind; clients in this article connect
// their publisher to port 61225 and their subscriber to port 52216.
XSubscriberSocket xSubSocket = new XSubscriberSocket("@tcp://127.0.0.1:61225");
XPublisherSocket xPubSocket = new XPublisherSocket("@tcp://127.0.0.1:52216");
{
    _proxy = new Proxy(xSubSocket, xPubSocket);

    // Step 2: run the proxy on a thread-pool task so the caller is not blocked.
    Task proxyTask = Task.Run(() => _proxy.Start());
}

View Code

  • 封装客户端代码
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

  1 using Communication.Nmq.Dto;   2 using System;   3 using System.Collections.Generic;   4    5 namespace Communication.Nmq   6 {    7     public class MqClientMain   8     {   9         public readonly static MqClientMain Instance = new MqClientMain();  10         internal Transceiver _client;  11         private List<MqType> _subscriberList = new();  12         private List<MqType> _unSubscriberList = new();  13         private readonly MethodManager _methodManager = new MethodManager();  14         private bool _isListnerAll;  15         private MqType _owner;  16         protected MqClientMain() { }  17   18   19         /// <summary>  20         /// 函数反射监听(可监听多个)  21         /// </summary>  22         /// <param name="targetInstance"></param>  23         /// <param name="targgetMonitorList"></param>  24         /// <returns></returns>  25         public MqClientMain ProxyAddInstanceMethods<InterfaceType>(InterfaceType targetInstance, params MqType[] targgetMonitorList) where InterfaceType : class  26         {  27             foreach (MqType targgetMonitor in targgetMonitorList)  28             {  29                 if (!_subscriberList.Contains(targgetMonitor))  30                     _subscriberList.Add(targgetMonitor);  31             }  32   33             _methodManager.AddInstanceMethods(targetInstance);  34             return this;  35         }  36   37           38         /// <summary>  39         ///额外增加事件(可监听多个)  40         /// </summary>  41         /// <param name="targetInstance"></param>  42         /// <param name="mathName"></param>  43         /// <param name="targgetMonitor"></param>  44         /// <returns></returns>  45         public MqClientMain ProxyAddMethods(object targetInstance, string[] mathName, params MqType[] targgetMonitorList)  46         {  47             foreach (MqType targgetMonitor in targgetMonitorList)  48             {  49                 if (!_subscriberList.Contains(targgetMonitor))  50                     
_subscriberList.Add(targgetMonitor);  51             }  52             _methodManager.AddMethods(mathName, targetInstance);  53             return this;  54         }  55   56         /// <summary>  57         /// 开始通讯  58         /// </summary>  59         /// <param name="owner">注册者类型(你是谁)</param>  60         public virtual void Start(MqType owner)  61         {  62             if (_client == null)  63             {  64                 if (_isListnerAll)  65                 {  66                     //监听所有会监听到自己,所以不监听自己  67                     _subscriberList.Remove(owner);  68                 }  69                 _owner = owner;  70                 _client = new Transceiver(owner, _subscriberList, _unSubscriberList, _methodManager);  71                  72   73             }  74         }  75   76         public void Stop()  77         {  78             _client.Dispose();  79         }  80   81         /// <summary>  82         /// 发布事件  83         /// </summary>  84         /// <param name="msg"></param>  85         public void MqSendMessage(string msg)  86         {  87             if (_client != null)  88             {  89                 _client.SendMessage(msg);  90             }  91         }  92            93         /// <summary>  94         /// 代理列表  95         /// </summary>  96         private Dictionary<Type, object> _proxyList = new();  97         /// <summary>  98         /// 获取代理  99         /// </summary> 100         /// <typeparam name="T"></typeparam> 101         /// <returns></returns> 102         public T GetInterfaceProxy<T>() where T : class 103         { 104             if (_client == null) 105                 return null; 106             if (!_proxyList.ContainsKey(typeof(T))) 107                 _proxyList.Add(typeof(T), InterfaceProxy<T>.Create(_client)); 108             return (T)_proxyList[typeof(T)]; 109         } 110     } 111 }

View Code

二、封装一下RPC的函数代理

  • 封装一个接口代理类
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

 1  internal class InterfaceProxy<TInterface> : DispatchProxy where TInterface : class  2  {  3      private static Transceiver _client;  4   5      private static JsonSerializerOptions _options = new JsonSerializerOptions  6      {  7          WriteIndented = true,  // 让 JSON 格式更加可读  8          Converters = { new JsonStringEnumConverter() }  // 使用字符串枚举转换器  9      }; 10      internal static TInterface Create(Transceiver client) 11      { 12          object proxy = Create<TInterface, InterfaceProxy<TInterface>>(); 13          _client = client; 14          return (TInterface)proxy; 15      } 16      protected override object Invoke(MethodInfo targetMethod, object[] args) 17      { 18          var message = new ProxyMessage 19          { 20              InterfaceType = typeof(TInterface).FullName, 21              Method = targetMethod.Name, 22              Parameters = args, 23          }; 24          _client.SendMessage(System.Text.Json.JsonSerializer.Serialize(message, _options)); 25          return targetMethod.ReturnType; 26      } 27  }

View Code

  • 复制一份RPC封装中用于获取类里所有方法的代码
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

  1  public class MethodManager   2  {   3      private readonly string[] instanceMethodsOnObject = new string[4] { "Equals", "GetHashCode", "GetType", "ToString" };   4    5      /// <summary>   6      /// 获取一个线程安全的字典,其中键为字符串(不区分大小写),值为另一个线程安全的字典。   7      /// 内部字典的键为整数,值为 Method 对象。   8      /// </summary>   9      public ConcurrentDictionary<string, ConcurrentDictionary<int, Method>> Methods { get; } = new ConcurrentDictionary<string, ConcurrentDictionary<int, Method>>(StringComparer.OrdinalIgnoreCase);  10   11      /// <summary>  12      /// 根据方法名和参数数量获取方法  13      /// </summary>  14      /// <param name="name">方法名</param>  15      /// <param name="paramCount">参数数量</param>  16      /// <returns>找到的方法对象,若未找到则返回null</returns>  17      public Method Get(string name, int paramCount)  18      {  19          if (Methods.TryGetValue(name, out var value) && value.TryGetValue(paramCount, out var value2))  20          {  21              return value2;  22          }  23   24          if (name != "*")  25          {  26              return Get("*", 2);  27          }  28   29          return null;  30      }  31   32      /// <summary>  33      /// 向方法集合中添加一个方法。  34      /// 如果指定方法名称不存在于集合中,则创建一个新的ConcurrentDictionary来存储该方法。  35      /// 根据方法的参数信息,特别是参数类型是否为Context以及是否为可选参数,默认值等信息,  36      /// 将方法添加到对应的ConcurrentDictionary中,键为参数的索引(不包括Context类型的参数)。  37      /// </summary>  38      public void Add(Method method)  39      {  40          if (!Methods.ContainsKey(method.Name))  41          {  42              Methods.TryAdd(method.Name, new ConcurrentDictionary<int, Method>());  43          }  44   45          ConcurrentDictionary<int, Method> concurrentDictionary = Methods[method.Name];  46          ParameterInfo[] parameters = method.Parameters;  47          int num = parameters.Length;  48          int num2 = 0;  49          for (int i = 0; i < num; i++)  50          {  51              ParameterInfo parameterInfo = parameters[i];  52              if 
(typeof(Context).IsAssignableFrom(parameterInfo.ParameterType))  53              {  54                  num2 = 1;  55              }  56              else if (parameterInfo.IsOptional && parameterInfo.HasDefaultValue)  57              {  58                  concurrentDictionary.AddOrUpdate(i - num2, method, (int key, Method value) => method);  59              }  60          }  61   62          concurrentDictionary.AddOrUpdate(num - num2, method, (int key, Method value) => method);  63      }  64   65      /// <summary>  66      /// 添加一个方法到集合中,使用指定的方法信息、名称和目标对象。  67      /// </summary>  68      /// <param name="methodInfo">方法的信息。</param>  69      /// <param name="name">方法的名称。</param>  70      /// <param name="target">方法的目标对象,默认为null。</param>  71      public void Add(MethodInfo methodInfo, string name, object target = null)  72      {  73          Add(new Method(methodInfo, name, target));  74      }  75          76      /// <summary>  77      /// 添加一个方法到集合中,使用给定的名称、目标对象和别名。  78      /// </summary>  79      /// <param name="name">方法的名称。</param>  80      /// <param name="target">包含方法的对象。</param>  81      /// <param name="alias">方法的别名,如果为空则使用方法名称。</param>  82      public void AddMethod(string name, object target, string alias = "")  83      {  84          MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public);  85          if (string.IsNullOrEmpty(alias))  86          {  87              alias = name;  88          }  89   90          MethodInfo[] array = methods;  91          foreach (MethodInfo methodInfo in array)  92          {  93              if (methodInfo.Name.Equals(name, StringComparison.OrdinalIgnoreCase))  94              {  95                  Add(methodInfo, alias, target);  96              }  97          }  98      }  99  100      /// <summary> 101      /// 向目标对象添加方法。 102      /// </summary> 103      /// <param name="names">方法名称数组。</param> 104      /// <param name="target">目标对象,方法将添加到该对象上。</param> 105  
    /// <param name="ns">可选的命名空间前缀,用于区分不同来源的方法。</param> 106      public void AddMethods(string[] names, object target, string ns = "") 107      { 108          foreach (string text in names) 109          { 110              if (string.IsNullOrEmpty(ns)) 111              { 112                  AddMethod(text, target, text); 113              } 114              else 115              { 116                  AddMethod(text, target, ns + "_" + text); 117              } 118          } 119      } 120  121      /// <summary> 122      /// 向目标对象添加实例方法。 123      /// </summary> 124      /// <param name="target">目标对象,其实例方法将被添加。</param> 125      /// <param name="ns">可选的命名空间前缀,用于区分方法名。</param> 126      public void AddInstanceMethods(object target, string ns = "") 127      { 128          MethodInfo[] methods = target.GetType().GetTypeInfo().GetMethods(BindingFlags.Instance | BindingFlags.Public); 129          foreach (MethodInfo methodInfo in methods) 130          { 131              if (Array.IndexOf(instanceMethodsOnObject, methodInfo.Name) == -1) 132              { 133                  string text = methodInfo.Name; 134                  if (!string.IsNullOrEmpty(ns)) 135                  { 136                      text = ns + "_" + text; 137                  } 138  139                  Add(methodInfo, text, target); 140              } 141          } 142      } 143  }

View Code

  • 通过反射执行方法
c#运用ZeroMq发布订阅和RPC函数代理的优点结合成一个新的实用的通讯

  1 using Communication.Nmq.Dto;   2 using NetMQ;   3 using NetMQ.Sockets;   4 using System;   5 using System.Collections.Generic;   6 using System.Linq;   7 using Communication.Utils;   8 using Newtonsoft.Json;   9   10 namespace Communication.Nmq  11 {  12     internal class Transceiver : IDisposable  13     {  14         private List<MqType> SubscribeTypes;  15         private List<MqType> UnSubscribleTypes;  16         private MethodManager FunListeners;  17         private string Owner;  18         private PublisherSocket ClientPub;  19         private SubscriberSocket ClientSub;  20         private NetMQPoller Poller;  21         private static readonly object SendLock = new();  22   23      24         internal Transceiver(MqType owner, List<MqType> subscribeType, List<MqType> unSubscribleType, MethodManager funListener)  25         {  26             SubscribeTypes = subscribeType;  27             UnSubscribleTypes = unSubscribleType;  28             FunListeners = funListener;  29             Owner = owner.ToString();  30             ClientPub = new PublisherSocket(">tcp://127.0.0.1:61225");  31             ClientSub = new SubscriberSocket(">tcp://127.0.0.1:52216");  32             Poller = new NetMQPoller { ClientSub };  33             SubTopic();  34             ClientSub.ReceiveReady += ClientSub_ReceiveReady;  35             Poller.RunAsync();  36         }  37   38         private void ClientSub_ReceiveReady(object sender, NetMQSocketEventArgs e)  39         {  40             try  41             {  42                 List<string> frames = new();  43                 if (!e.Socket.TryReceiveMultipartStrings(TimeSpan.FromSeconds(3), ref frames) || frames == null)  44                 {  45                     Log.Error($"NetMQ接收异常!frames {frames}", LoggerNames.MqStr);  46                     return;  47                 }  48                 if (frames.Count == 2)  49                 {  50                     string topic = frames[0];  51                     
string msg = frames[1];   52                     if (Enum.TryParse(topic, out MqType topicType))  53                     {  54                         if (TryDeserializeProxyMessage(msg, out var controlRequest) && !string.IsNullOrWhiteSpace(controlRequest.Method))  55                         {  56                             if (FunListeners.Methods.TryGetValue(controlRequest.Method, out var methods))  57                             {  58                                 foreach (var methodInfo in methods.Select(m => m.Value))  59                                 {  60                                     try  61                                     {  62                                         var parameters = controlRequest.Parameters.Select((p, i) => SafeChangeType(p, methodInfo.Parameters[i].ParameterType)).ToArray();  63                                         methodInfo.MethodInfo?.Invoke(methodInfo.Target, parameters);  64                                     }  65                                     catch (Exception ex)  66                                     {  67                                         Log.Error($"Failed to convert parameter for method {controlRequest.Method}: {ex.Message}", LoggerNames.MqStr);  68                                         return;  69                                     }  70                                 }  71                             }  72                             else  73                             {  74                                 throw new InvalidOperationException("找不到对应的函数");  75                             }  76                         }  77                         else  78                         {  79                             throw new InvalidOperationException("无法转换格式");  80                         }  81                     }  82                     else  83                     {  84                         Log.Error($"NetMQ收到不正常数据,请检测!MqType:{topic}", LoggerNames.MqStr);  85                     }  86  
               }  87                 else  88                 {  89                     Log.Error($"NetMQ收到不正常数据,请检测!frames 长度为{frames.Count}", LoggerNames.MqStr);  90                 }  91             }  92             catch (Exception ex)  93             {  94                 Log.Error($"NetMQ收到消息报错:{ex.ToString()}", LoggerNames.MqStr);  95             }  96         }  97   98         public object SafeChangeType(object value, Type targetType)  99         { 100             if (targetType.IsEnum && value is string strValue) 101             { 102                 return Enum.Parse(targetType, strValue); 103             } 104             return Convert.ChangeType(value, targetType); 105         } 106  107         private bool TryDeserializeProxyMessage(string json, out ProxyMessage message) 108         { 109             message = null; 110             try 111             { 112                 message = JsonConvert.DeserializeObject<ProxyMessage>(json); 113                 return message != null; 114             } 115             catch 116             { 117                 return false; 118             } 119         } 120  121         private void SubTopic() 122         { 123             if (SubscribeTypes?.Any() == true) 124             { 125                 foreach (var item in SubscribeTypes) 126                 { 127                     ClientSub.Subscribe(item.ToString()); 128                 } 129             } 130             if (UnSubscribleTypes?.Any() == true) 131             { 132                 foreach (var item in UnSubscribleTypes) 133                 { 134                     ClientSub.Unsubscribe(item.ToString()); 135                 } 136             } 137         } 138  139         internal void SendMessage(string msg) 140         { 141             try 142             { 143                 lock (SendLock) 144                 { 145                     var success = ClientPub.SendMoreFrame(Owner).TrySendFrame(TimeSpan.FromSeconds(3), msg);  146        
         } 147             } 148             catch (Exception ex) 149             { 150                 Log.Error($"发送_消息 失败 {msg},:{ex}", LoggerNames.MqStr); 151             } 152         } 153  154         public void Dispose() 155         { 156             Poller.Stop(); 157             ClientPub?.Dispose(); 158             ClientSub?.Dispose(); 159             Poller.Dispose(); 160         } 161     } 162 }

View Code

 

发表评论

评论已关闭。

相关文章