Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式

大家好,我是失业在家,正在找工作的博主Jerry,找工作之余,总结和整理以前的项目经验,动手写了个洋葱架构(整洁架构)示例解决方案 OnionArch。其目的是为了更好的实现基于DDD(领域驱动分析)和命令查询职责分离(CQRS)的洋葱架构。

OnionArch 是用来实现单个微服务的。它提供了Grpc接口和Dapr Side Car进行交互,通过Dapr来实现微服务之间的接口调用、事件发布订阅等微服务特性。但是,Dapr官方文档上只有Go语言的Grpc的微服务调用示例,没有事件发布和订阅示例,更没有基于Grpc通讯用.Net实现的事件订阅和发布示例。

一、实现目标

为了方便大家写代码,本文旨在介绍如何通过Dapr实现.Net Grpc服务之间的发布和订阅,并采用与WebApi类似的事件订阅方式。

如果是Dapr Side Car通过Web Api和微服务引用交互,在WebApi中实现事件订阅非常简单,只要在Action 上增加“[Topic("pubsub", "TestTopic")]” Attribute即可,可如果Dapr是通过Grpc和Grpc服务交互就不能这样写了。

为了保持WebApi和Grpc事件订阅代码的一致性,本文就是要在Grpc通讯的情况下实现如下写法来订阅并处理事件。

        [Topic("pubsub", "TestTopic")]         public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context)         {             string message = "TestTopicEvent" + request.EventData.Name;             Console.WriteLine(message);             return Task.FromResult(new HelloReply             {                 Message = message             });         }

二、实现方案

Dapr实现.Net Grpc服务之间的发布和订阅,根据官方文档,需要重写AppCallback.AppCallbackBase Grpc类的ListTopicSubscriptions方法和OnTopicEvent方法,ListTopicSubscriptions是给Dapr调用获取该微服务已订阅的事件,OnTopicEvent给Dapr调用以触发事件到达处理逻辑。但是这样就需要在AppCallback.AppCallbackBase实现类中硬编码已订阅的事件和事件处理逻辑。显然不符合我们的实现目标。

参考Dapr SDK中关于WebApi 订阅查询接口“http://localhost:<appPort>/dapr/subscribe”的实现代码,可以在AppCallback.AppCallbackBase实现类的ListTopicSubscriptions方法中,采用相同的方式,在Grpc方法中查询Topic Attribute的方式来搜索已订阅的事件。这样就不用在ListTopicSubscriptions中硬编码已订阅的事件了。

为了避免在OnTopicEvent方法中应编码事件处理逻辑,就需要在接收到事件触发后动态调用Grpc方法。理论上,只要有proto文件就可以动态调用Grpc方法,而proto文件本来就在项目中。但是,我没找到.Net动态调用Grpc方法的相关资料,不知道大家有没有?

我这里采用了另一种方式,根据我上一篇关于.Net 7.0 RC gRPC JSON 转码为 Swagger/OpenAPI文档。Grpc方法可以增加一个转码为Json的WebApi调用。这样就可以在OnTopicEvent方法中接收到事件触发后,通过HttpClient post到对应的WebApi地址,曲线实现动态调用Grpc方法。是不是有点脱裤子放屁的感觉?

三、代码实现

我的解决方案如下,GrpcServiceA发布事件,GrpcServiceB接收事件并处理。

Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式

实现事件发布

GrpcServiceA发布事件比较简单,和WebApi的方式是一样一样的。

 public async override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)         {             //await _daprClient.SaveStateAsync("statestore", "testKey", request.Name);             EventData eventData = new EventData() { Id = 6, Name = request.Name, Description = "Looking for a job" };             await _daprClient.PublishEventAsync<EventData>("pubsub", "TestTopic", eventData);             return new HelloReply             {                 Message = "Hello" + request.Name             };         }

_daprClient怎么来的?我参考Dapr .Net SDK的代码,给IGrpcServerBuilder 增加了扩展方法:
Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式

 public static IGrpcServerBuilder AddDapr(this IGrpcServerBuilder builder, Action<DaprClientBuilder> configureClient = null)         {             if (builder is null)             {                 throw new ArgumentNullException(nameof(builder));             }              // This pattern prevents registering services multiple times in the case AddDapr is called             // by non-user-code.             if (builder.Services.Any(s => s.ImplementationType == typeof(DaprMvcMarkerService)))             {                 return builder;             }              builder.Services.AddDaprClient(configureClient);              builder.Services.AddSingleton<DaprMvcMarkerService>();              return builder;         }           private class DaprMvcMarkerService         {         }

View Code

然后就可以这样把DaprClient依赖注入到服务中。

builder.Services.AddGrpc().AddJsonTranscoding().AddDapr();

实现事件订阅

根据上述实现方案,GrpcServiceB接收事件并处理有点复杂,参考我Grpc接口转码Json的的内容,在要接收事件的Grpc方法上增加转码Json WebApi 配置。

 rpc TestTopicEvent (TestTopicEventRequest) returns (HelloReply){     option (google.api.http) = {       post: "/v1/greeter/testtopicevent",       body: "eventData"     };   }

增加google.api.http选项,,可以通过post eventData 数据到地址“/v1/greeter/testtopicevent”调用该Grpc方法。然后实现该Grpc接口。

 [Topic("pubsub", "TestTopic")]         public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context)         {             string message = "TestTopicEvent" + request.EventData.Name;             Console.WriteLine(message);             return Task.FromResult(new HelloReply             {                 Message = message             });         }

我重用了Dapr .Net SDK 的Topic Attribute来标记该Grpc的实现接口,这样就可以搜索所有带Topic Attribute的Grpc方法来获取已经订阅的事件。

接下来才是重头戏,重写AppCallback.AppCallbackBase Grpc接口类的ListTopicSubscriptions方法和OnTopicEvent方法

 public async override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context)         {             var result = new ListTopicSubscriptionsResponse();              var subcriptions = _endpointDataSource.GetDaprSubscriptions(_loggerFactory);             foreach (var subscription in subcriptions)             {                 TopicSubscription subscr = new TopicSubscription()                 {                     PubsubName = subscription.PubsubName,                     Topic = subscription.Topic,                     Routes = new TopicRoutes()                 };                 subscr.Routes.Default = subscription.Route;                 result.Subscriptions.Add(subscr);             }             return result;         }

该方法返回所有已订阅的事件和对应的WebApi Url,将事件对应的WebApi地址放入subscr.Routes.Default中。

其中_endpointDataSource.GetDaprSubscriptions 方法参考了Dapr .Net SDK的实现。

public static List<Subscription> GetDaprSubscriptions(this EndpointDataSource dataSource, ILoggerFactory loggerFactory, SubscribeOptions options = null)         {             var logger = loggerFactory.CreateLogger("DaprTopicSubscription");             var subscriptions = dataSource.Endpoints                 .OfType<RouteEndpoint>()                 .Where(e => e.Metadata.GetOrderedMetadata<ITopicMetadata>().Any(t => t.Name != null)) // only endpoints which have TopicAttribute with not null Name.                 .SelectMany(e =>                 {                     var topicMetadata = e.Metadata.GetOrderedMetadata<ITopicMetadata>();                     var originalTopicMetadata = e.Metadata.GetOrderedMetadata<IOriginalTopicMetadata>();                      var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();                      for (int i = 0; i < topicMetadata.Count(); i++)                     {                         subs.Add((topicMetadata[i].PubsubName,                             topicMetadata[i].Name,                             (topicMetadata[i] as IDeadLetterTopicMetadata)?.DeadLetterTopic,                             (topicMetadata[i] as IRawTopicMetadata)?.EnableRawPayload,                             topicMetadata[i].Match,                             topicMetadata[i].Priority,                             originalTopicMetadata.Where(m => (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.OwnedMetadatas?.Any(o => o.Equals(m.Id)) == true || string.IsNullOrEmpty(m.Id))                                                  .GroupBy(c => c.Name)                                                  .ToDictionary(m => m.Key, m => m.Select(c => c.Value).Distinct().ToArray()),                             (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.MetadataSeparator,                             e.RoutePattern));                     }                      return subs;                 })                 .Distinct()                 .GroupBy(e => new { e.PubsubName, e.Name })                 .Select(e => e.OrderBy(e => e.Priority))                 .Select(e =>                 {                     var first = e.First();                     var rawPayload = e.Any(e => e.EnableRawPayload.GetValueOrDefault());                     var metadataSeparator = e.FirstOrDefault(e => !string.IsNullOrEmpty(e.MetadataSeparator)).MetadataSeparator ?? ",";                     var rules = e.Where(e => !string.IsNullOrEmpty(e.Match)).ToList();                     var defaultRoutes = e.Where(e => string.IsNullOrEmpty(e.Match)).Select(e => RoutePatternToString(e.RoutePattern)).ToList();                     //var defaultRoute = defaultRoutes.FirstOrDefault();                     var defaultRoute = defaultRoutes.LastOrDefault();                      //multiple identical names. use comma separation.                     var metadata = new Metadata(e.SelectMany(c => c.OriginalTopicMetadata).GroupBy(c => c.Key).ToDictionary(c => c.Key, c => string.Join(metadataSeparator, c.SelectMany(c => c.Value).Distinct())));                     if (rawPayload || options?.EnableRawPayload is true)                     {                         metadata.Add(Metadata.RawPayload, "true");                     }                      if (logger != null)                     {                         if (defaultRoutes.Count > 1)                         {                             logger.LogError("A default subscription to topic {name} on pubsub {pubsub} already exists.", first.Name, first.PubsubName);                         }                          var duplicatePriorities = rules.GroupBy(e => e.Priority)                           .Where(g => g.Count() > 1)                           .ToDictionary(x => x.Key, y => y.Count());                          foreach (var entry in duplicatePriorities)                         {                             logger.LogError("A subscription to topic {name} on pubsub {pubsub} has duplicate priorities for {priority}: found {count} occurrences.", first.Name, first.PubsubName, entry.Key, entry.Value);                         }                     }                      var subscription = new Subscription()                     {                         Topic = first.Name,                         PubsubName = first.PubsubName,                         Metadata = metadata.Count > 0 ? metadata : null,                     };                      if (first.DeadLetterTopic != null)                     {                         subscription.DeadLetterTopic = first.DeadLetterTopic;                     }                      // Use the V2 routing rules structure                     if (rules.Count > 0)                     {                         subscription.Routes = new Routes                         {                             Rules = rules.Select(e => new Rule                             {                                 Match = e.Match,                                 Path = RoutePatternToString(e.RoutePattern),                             }).ToList(),                             Default = defaultRoute,                         };                     }                     // Use the V1 structure for backward compatibility.                     else                     {                         subscription.Route = defaultRoute;                     }                      return subscription;                 })                 .OrderBy(e => (e.PubsubName, e.Topic));              return subscriptions.ToList();         }          private static string RoutePatternToString(RoutePattern routePattern)         {             return string.Join("/", routePattern.PathSegments                                     .Select(segment => string.Concat(segment.Parts.Cast<RoutePatternLiteralPart>()                                     .Select(part => part.Content))));         }

注意标红的哪一行是我唯一改动的地方,因为Grpc接口增加了Web Api配置后会返回两个Route,一个是原始Grpc的,一个是WebApi的,我们需要后面那个。

接着重写OnTopicEvent方法

public async override Task<TopicEventResponse> OnTopicEvent(TopicEventRequest request, ServerCallContext context)         {             TopicEventResponse topicResponse = new TopicEventResponse();             string payloadString = request.Data.ToStringUtf8();             Console.WriteLine("OnTopicEvent Data:" + payloadString);                          HttpContent postContent = new StringContent(payloadString, new MediaTypeWithQualityHeaderValue("application/json"));             var response = await _httpClient4TopicEvent.PostAsync("http://" + context.Host + "/" + request.Path, postContent);             string responseContent = await response.Content.ReadAsStringAsync();             Console.WriteLine(responseContent);             if (response.IsSuccessStatusCode)             {                 Console.WriteLine("OnTopicEvent Invoke Success.");                 topicResponse.Status = TopicEventResponseStatus.Success;             }             else             {                 Console.WriteLine("OnTopicEvent Invoke Error.");                 topicResponse.Status = TopicEventResponseStatus.Drop;             }             return topicResponse;         }

这里简单处理了事件触发的返回参数TopicEventResponse ,未处理重试的情况。request.path是在ListTopicSubscriptions方法中返回给Dapr的事件对应的WebApi调用地址。

参数_httpClient4TopicEvent是这样注入的:

builder.Services.AddHttpClient("HttpClient4TopicEvent", httpClient => {     httpClient.DefaultRequestVersion = HttpVersion.Version20;     httpClient.DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrHigher;     httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); } );

因为Grpc是基于Http2.0及以上版本的,所以需要修改HtttpClient默认配置,不然无法访基于Http2.0的WebApi。

然后将我们的AppCallback.AppCallbackBase实现类DaprAppCallbackService Map到GrpcService即可。

app.MapGrpcService<DaprAppCallbackService>();

四、实现效果

分别通过Dapr运行ServiceA和ServiceB微服务,注意指定--app-protocol协议为Grpc,我这里还使用了.Net 热重载技术

dapr run --app-protocol grpc --app-id serviceA --app-port 5002 --dapr-grpc-port 50002 -- dotnet watch run --launch-profile https  dapr run --app-protocol grpc --app-id serviceB --app-port 5003 --dapr-grpc-port 50003 -- dotnet watch run --launch-profile https

在ServiceA中发布事件

Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式

 

在ServiceB中查看已订阅的事件和接收到的事件触发

Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式

 

五、找工作

▪ 博主有15年以上的软件技术实施经验(Technical Leader),专注于微服务(Dapr)和云原生(K8s)软件架构设计、专注于 .Net CoreJava开发和Devops构建发布。
▪ 博主10年以上的软件交付管理经验(Project Manager & Product Ower),致力于敏捷(Scrum)项目管理、软件产品业务需求分析和原型设计。
▪ 博主熟练配置和使用 Microsoft Azure云。
▪ 博主为人诚恳,积极乐观,工作认真负责。 

我家在广州,也可以去深圳工作。做架构师、产品经理、项目经理都可以。有工作机会推荐的朋友可以加我微信 15920128707,微信名字叫Jerry。

本文源代码在这里:iamxiaozhuang/TestDaprGrpcSubscripber (github.com) 大家可以随便取用。

发表评论

评论已关闭。

相关文章