开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

文档说明

作者:痴者工良

文档地址:https://mmq.whuanle.cn

仓库地址:https://github.com/whuanle/Maomi.MQ

作者博客:

导读

Maomi.MQ 是一个消息通讯模型项目,目前只支持了 RabbitMQ。

Maomi.MQ.RabbitMQ 是一个用于专为 RabbitMQ 设计的发布者和消费者通讯模型,大大简化了发布和消息的代码,并提供一系列简便和实用的功能,开发者可以通过框架提供的消费模型实现高性能消费、事件编排,框架还支持发布者确认机制、自定义重试机制、补偿机制、死信队列、延迟队列、连接通道复用等一系列的便利功能。开发者可以把更多的精力放到业务逻辑中,通过 Maomi.MQ.RabbitMQ 框架简化跨进程消息通讯模式,使得跨进程消息传递更加简单和可靠。

此外,框架通过 runtime 内置的 api 支持了分布式可观测性,可以通过进一步使用 OpenTelemetry 等框架进一步收集可观测性信息,推送到基础设施平台中。

快速开始

本文将快速介绍 Maomi.MQ.RabbitMQ 的使用方法。

引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:

builder.Services.AddSwaggerGen(); builder.Services.AddLogging();  builder.Services.AddMaomiMQ((MqOptionsBuilder options) => { 	options.WorkId = 1; 	options.AppName = "myapp"; 	options.Rabbit = (ConnectionFactory options) => 	{ 		options.HostName = "192.168.3.248"; 		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name; 	}; }, [typeof(Program).Assembly]);  var app = builder.Build(); 
  • WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。
  • AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。
  • Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory

如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包。

var host = new HostBuilder() 	.ConfigureLogging(options => 	{ 		options.AddConsole(); 		options.AddDebug(); 	}) 	.ConfigureServices(services => 	{ 		services.AddMaomiMQ(options => 		{ 			options.WorkId = 1; 			options.AppName = "myapp"; 			options.Rabbit = (ConnectionFactory options) => 			{ 				options.HostName = "192.168.3.248"; 				options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name; 			}; 		}, new System.Reflection.Assembly[] { typeof(Program).Assembly }); 		 		// Your services. 		services.AddHostedService<MyPublishAsync>(); 	}).Build();  await host.RunAsync(); 

定义消息模型类,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。

public class TestEvent {     public int Id { get; set; }      public override string ToString()     {         return Id.ToString();     } } 

定义消费者,消费者需要实现 IConsumer<TEvent> 接口,以及使用 [Consumer] 特性注解配置消费者属性。

[Consumer("test", Qos = 1, RetryFaildRequeue = true)] public class MyConsumer : IConsumer<TestEvent> {     private static int _retryCount = 0;      // 消费     public async Task ExecuteAsync(EventBody<TestEvent> message)     {         Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");         await Task.CompletedTask;     }          // 每次消费失败时执行     public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;          // 补偿     public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true); } 

然后注入 IMessagePublisher 服务发布消息:

[ApiController] [Route("[controller]")] public class IndexController : ControllerBase {     private readonly IMessagePublisher _messagePublisher;      public IndexController(IMessagePublisher messagePublisher)     {         _messagePublisher = messagePublisher;     }      [HttpGet("publish")]     public async Task<string> Publisher()     {         // 发布消息         await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent 		{         	Id = i         });         return "ok";     } } 

消息发布者

消息发布者用于推送消息到 RabbitMQ 服务器中。

通过注入 IMessagePublisher 接口即可向 RabbitMQ 推送消息,示例项目请参考 PublisherWeb

定义一个事件模型类:

public class TestEvent { 	public int Id { get; set; }  	public override string ToString() 	{ 		return Id.ToString(); 	} } 

注入 IMessagePublisher 服务后发布消息:

[ApiController] [Route("[controller]")] public class IndexController : ControllerBase { 	private readonly IMessagePublisher _messagePublisher;  	public IndexController(IMessagePublisher messagePublisher) 	{ 		_messagePublisher = messagePublisher; 	}  	[HttpGet("publish")] 	public async Task<string> Publisher() 	{ 		for (var i = 0; i < 100; i++) 		{ 			await _messagePublisher.PublishAsync(queue: "PublisherWeb", message: new TestEvent 			{ 				Id = i 			}); 		}  		return "ok"; 	} } 

IMessagePublisher

IMessagePublisher 定义比较简单,只有三个方法和一个属性:

public ConnectionPool ConnectionPool { get; }  Task PublishAsync<TEvent>(string queue, TEvent message, Action<IBasicProperties>? properties = null) where TEvent : class;  Task PublishAsync<TEvent>(string queue, TEvent message, IBasicProperties properties);  //  不建议直接使用该接口。 Task CustomPublishAsync<TEvent>(string queue, EventBody<TEvent> message, BasicProperties properties); 

三个 PublishAsync 方法用于发布事件,ConnectionPool 属性用于获取 RabbitMQ.Client.IConnection 对象。

由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以 Maomi.MQ.RabbitMQ 没必要过度设计,只提供了简单的功能接口。

例如,可以通过 BasicProperties 配置单条消息的过期时间:

await _messagePublisher.PublishAsync(queue: "RetryWeb", message: new TestEvent { 	Id = i }, (BasicProperties p) => { 	p.Expiration = "1000"; }); 

当发布一条消息时,实际上框架传递的是 EventBody<T> 类型,EventBody<T> 中包含了一些重要的附加消息属性,这些属性会给消息处理和故障诊断带来很大的方便。

public class EventBody<TEvent> { 	// 事件唯一 id. 	public long Id { get; init; }  	// Queue. 	public string Queue { get; init; } = null!;  	// App name. 	public string Publisher { get; init; } = null!;  	// 事件创建时间. 	public DateTimeOffset CreationTime { get; init; }  	// 事件体. 	public TEvent Body { get; init; } = default!; } 

Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Singleton:

services.AddSingleton<IMessagePublisher, DefaultMessagePublisher>(); 

生命周期不重要,如果需要修改默认的生命周期,可以手动修改替换。

services.AddScoped<IMessagePublisher, DefaultMessagePublisher>(); 

开发者也可以自行实现 IMessagePublisher 接口,具体示例请参考 DefaultMessagePublisher 类型。

连接池

为了复用 RabbitMQ.Client.IConnection ,Maomi.MQ.RabbitMQ 内部实现了 ConnectionPool 类型,通过对象池维护复用的 RabbitMQ.Client.IConnection 对象。

默认对象池中的 RabbitMQ.Client.IConnection 数量为 0,只有当连接被真正使用时才会从对象池委托中创建,连接对象会随着程序并发量而自动增加,但是,默认最大连接对象数量为 Environment.ProcessorCount * 2

除了 IMessagePublisher 接口提供的 PublishAsync 方法可以发布事件,开发者还可以从 ConnectionPool 获取连接对象,请务必在使用完毕后通过 ConnectionPool.Return() 方法将其归还到连接对象池。

通过连接池直接使用 IConnection 对象发布消息:

[HttpGet("publish")] public async Task<string> Publisher() { 	for (var i = 0; i < 100; i++) 	{ 		var connectionPool = _messagePublisher.ConnectionPool; 		var connection = connectionPool.Get();  		try 		{ 			connection.Channel.BasicPublishAsync( 			exchange: string.Empty, 			routingKey: "queue", 			basicProperties: properties, 			body: _jsonSerializer.Serializer(message), 			mandatory: true); 		} 		finally 		{ 			connectionPool.Return(connection); 		} 	}  	return "ok"; } 

你也可以绕开 IMessagePublisher ,直接注入 ConnectionPool 使用 RabbitMQ 连接对象,但是不建议这样使用。

private readonly ConnectionPool _connectionPool;  public DefaultMessagePublisher(ConnectionPool connectionPool) { 	_connectionPool = connectionPool; }  public async Task MyPublshAsync() { 	var connection = _connectionPool.Get(); 	try 	{ 		await connection.Channel.BasicPublishAsync(...); 	} 	finally 	{ 		_connectionPool.Return(connection); 	} } 

为了更加简便地管理连接对象,可以使用 CreateAutoReturn() 函数创建连接管理对象,该对象被释放时会自动将 IConnection 返还给连接池。

using var poolObject = _messagePublisher.ConnectionPool.CreateAutoReturn(); poolObject.Channel.BasicPublishAsync( 	exchange: string.Empty, 	routingKey: "queue", 	basicProperties: properties, 	body: _jsonSerializer.Serializer(message), 	mandatory: true); 

如果你自行使用 ConnectionPool 推送消息到 RabbitMQ,请务必通过序列化 EventBody<TEvent> 事件对象,这样 Maomi.MQ.RabbitMQ 消费者才能正常工作。同时,Moami.MQ 对可观测性做了支持,如果自行使用 ConnectionPool 获取连接对象推送消息,可能会导致可观测性信息缺失。

正常情况下,RabbitMQ.Client 中包含了可观测性的功能,但是 Maomi.MQ.RabbitMQ 附加的可观测性信息有助于诊断故障问题。

请注意:

  • Maomi.MQ.RqbbitMQ 通过 EventBody<TEvent> 泛型对象发布和接收事件。

  • DefaultMessagePublisher 包含了链路追踪等可观测性代码。

消息过期

IMessagePublisher 对外开放了 BasicProperties 或 BasicProperties,可以自由配置消息属性。

例如为消息配置过期时间:

[HttpGet("publish")] public async Task<string> Publisher() { 	for (var i = 0; i < 1; i++) 	{ 		await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent 		{ 			Id = i 		}, properties => 		{ 			properties.Expiration = "6000"; 		}); 	}  	return "ok"; } 

如果此时为 test 绑定死信队列,那么该消息长时间没有被消费时,会被移动到另一个队列,请参考 死信队列

还可以通过配置消息属性实现更多的功能,请参考 IBasicProperties

事务

RabbitMQ 支持事务,不过据 RabbitMQ 官方文档显示,事务会使吞吐量减少 250 倍。

RabbitMQ 事务使用上比较简单,可以保证发布的消息已经被推送到 RabbitMQ 服务器,只有当提交事务时,提交的消息才会被 RabbitMQ 存储并推送给消费者。

使用示例:

[HttpGet("publish_tran")] public async Task<string> Publisher_Tran() { 	using var tranPublisher = await _messagePublisher.TxSelectAsync();  	try 	{ 		await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent 		{ 			Id = 666 		}); 		await tranPublisher.TxCommitAsync(); 	} 	catch 	{ 		await tranPublisher.TxRollbackAsync(); 		throw; 	}  	return "ok"; } 

或者手动开启事务:

[HttpGet("publish_tran")] public async Task<string> Publisher_Tran() { 	using var tranPublisher = _messagePublisher.CreateTransaction();  	try 	{ 		await tranPublisher.TxSelectAsync(); 		await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent 		{ 			Id = 666 		}); 		await tranPublisher.TxCommitAsync(); 	} 	catch 	{ 		await tranPublisher.TxRollbackAsync(); 		throw; 	}  	return "ok"; } 

注意,在该种模式之下,创建 TransactionPublisher 对象时,会从对象池中取出一个连接对象,因为开启事务模式可能会污染当前连接通道,因此 TransactionPublisher 不会向连接池归还连接对象,而是直接释放

发送方确认模式

虽然事务模式可以保证消息会被推送到 RabbitMQ 服务器中,但是由于事务模式会导致吞吐量降低 250 倍,因此不是一个好的选择。为了解决这个问题, RabbitMQ 引入了一种确认机制,这种机制就像滑动窗口,能够保证消息推送到服务器中,并且具备高性能的特性。

请参考 https://www.rabbitmq.com/docs/confirms

使用示例:

[HttpGet("publish_confirm")] public async Task<string> Publisher_Confirm() { 	using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();  	for (var i = 0; i < 5; i++) 	{ 		await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent 		{ 			Id = 666 		});  		var result = await confirmPublisher.WaitForConfirmsAsync();  		// 如果在超时内没有接收到 nacks,则为 True,否则为 false。 		Console.WriteLine($"发布 {i},{result}"); 	}  	return "ok"; } 

WaitForConfirmsAsync 方法会返回一个值,如果正常被服务器确认了消息已经传达,则结果为 true,如果超时没有被服务器确认,则返回 false。

此外,还有一个 WaitForConfirmsOrDieAsync 方法,它会一直等待该频道上的所有已发布消息都得到确认,使用示例:

using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();  for (var i = 0; i < 5; i++) { 	await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent 	{ 		Id = 666 	});  	Console.WriteLine($"发布 {i}"); }  await confirmPublisher.WaitForConfirmsOrDieAsync(); 

注意,在该种模式之下,创建 ConfirmPublisher 对象时,会从对象池中取出一个连接对象,因为开启事务模式可能会污染当前连接通道,因此 ConfirmPublisher 不会向连接池归还连接对象,而是直接释放

注意,同一个通道不能同时使用事务和发送方确认模式。

独占模式

默认情况下,每次使用 IMessagePublisher.PublishAsync() 发布消息时,都会从连接池中取出连接对象,然后使用该连接通道发布消息,发布完毕后就会归还连接对象给连接池。

如果需要在短时间内大批量发布消息,则需要每次都要重复获取和返还连接对象。

使用独占模式时可以在一段时间内独占一个连接对象,超出作用域后,连接对象会自动放回连接池。这种模式对于需要大量发布消息的场景提高吞吐量非常有帮助。为了能够将连接通道归还连接池,请务必使用 using 关键字修饰变量,或者手动调用 Dispose 函数。

使用示例:

// 创建独占模式 using var singlePublisher = _messagePublisher.CreateSingle();  for (var i = 0; i < 500; i++) { 	await singlePublisher.PublishAsync(queue: "publish_single", message: new TestEvent 	{ 		Id = 666 	}); } 

消费者

Maomi.MQ.RabbitMQ 中,有两种消费模式,一种是消费者模式,一种是事件模式(事件总线模式)。

下面简单了解这两种模式的使用方法。

消费者模式

消费者服务需要实现 IConsumer<TEvent> 接口,并且配置 [Consumer("queue")] 特性绑定队列名称,通过消费者对象来控制消费行为。

消费者模式有具有失败通知和补偿能力,使用上也比较简单。

public class TestEvent {     public int Id { get; set; } }  [Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)] public class MyConsumer : IConsumer<TestEvent> {     private static int _retryCount = 0;      // 消费或重试     public async Task ExecuteAsync(EventBody<TestEvent> message)     {         _retryCount++;         Console.WriteLine($"执行次数:{_retryCount} 事件 id: {message.Id} {DateTime.Now}");         await Task.CompletedTask;     }          // 失败     public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;          // 补偿     public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true); } 

事件模式

事件模式是通过事件总线的方式实现的,以事件模型为中心,通过事件来控制消费行为。

[EventTopic("web2", Qos = 1, RetryFaildRequeue = true)] public class TestEvent { 	public string Message { get; set; } } 

然后使用 [EventOrder] 特性编排事件执行顺序。

// 编排事件消费顺序 [EventOrder(0)] public class My1EventEventHandler : IEventHandler<TestEvent> { 	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken) 	{ 	}  	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken) 	{ 		Console.WriteLine($"{@event.Id},事件 1 已被执行"); 	} }  [EventOrder(1)] public class My2EventEventHandler : IEventHandler<TestEvent> { 	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken) 	{ 	}  	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken) 	{ 		Console.WriteLine($"{@event.Id},事件 2 已被执行"); 	} } 

当然,事件模式也可以通过创建中间件增加补偿功能,通过中间件还可以将所有排序事件放到同一个事务中,一起成功或失败,避免事件执行时出现程序退出导致的一致性问题。

public class TestEventMiddleware : IEventMiddleware<TestEvent> {     private readonly BloggingContext _bloggingContext;      public TestEventMiddleware(BloggingContext bloggingContext)     {         _bloggingContext = bloggingContext;     }      public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)     {         using (var transaction = _bloggingContext.Database.BeginTransaction())         {             await next(@event, CancellationToken.None);             await transaction.CommitAsync();         }     }      public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)     {         return Task.CompletedTask;     }      public Task<bool> FallbackAsync(EventBody<TestEvent>? message)     {         return Task.FromResult(true);     } } 

分组

消费者模式和事件模式都可以设置分组,在特性上设置了 Group 属性,具有同一个分组的事件会被放到一个连接通道(RabbitMQ.Client.IConnection)中,对于消费频率不高的事件,复用连接通道可以有效较低资源消耗。

消费者模式分组示例:

[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")] public class Group_1_Consumer : IConsumer<GroupEvent> {     public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;      public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;      public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true); }  [Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")] public class Group_2_Consumer : IConsumer<GroupEvent> {     public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;      public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;      public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true); } 

事件总线模式分组示例:

[EventTopic("web1", Qos = 1, RetryFaildRequeue = true, Group = "group")] public class Test1Event { 	public string Message { get; set; } } [EventTopic("web2", Qos = 1, RetryFaildRequeue = true, Group = "group")] public class Test2Event { 	public string Message { get; set; } }  

消费者模式

消费者模式要求服务实现 IConsumer<TEvent> 接口,并添加 [Connsumer] 特性。

IConsumer<TEvent> 接口比较简单,其定义如下:

public interface IConsumer<TEvent>     where TEvent : class {     // 消息处理.     public Task ExecuteAsync(EventBody<TEvent> message);      // ExecuteAsync 异常后立即执行此代码.     public Task FaildAsync(Exception ex, int retryCount, EventBody<TEvent>? message);      // 最后一次重试失败时执行,用于补偿.     public Task<bool> FallbackAsync(EventBody<TEvent>? message); } 

使用消费者模式时,需要先定义一个模型类,用于发布者和消费者之间传递消息,事件模型类只要是类即可,能够正常序列化和反序列化,没有其它要求。

public class TestEvent { 	public int Id { get; set; }  	public override string ToString() 	{ 		return Id.ToString(); 	} } 

然后继承 IConsumer<TEvent> 接口实现消费者功能:

[Consumer("web1", Qos = 1)] public class MyConsumer : IConsumer<TestEvent> { 	// 消费 	public async Task ExecuteAsync(EventBody<TestEvent> message) 	{ 		Console.WriteLine(message.Body.Id); 	}  	// 每次失败时被执行 	public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) 	{ 		Console.WriteLine($"重试 {message.Body.Id},次数 {retryCount}"); 		await Task.CompletedTask; 	}  	// 最后一次失败时执行 	public async Task<bool> FallbackAsync(EventBody<TestEvent>? message) 	{ 		Console.WriteLine($"最后一次 {message.Body.Id}");         // 如果返回 true,说明补偿成功。 		return true; 	} } 

特性配置的说明请参考 消费者配置

消费、重试和补偿

消费者收到服务器推送的消息时,ExecuteAsync 方法会被自动执行。当 ExecuteAsync 执行异常时,FaildAsync 方法会马上被触发,开发者可以利用 FaildAsync 记录相关日志信息。

// 每次失败时被执行 public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) { 	// 当 retryCount == -1 时,错误并非是 ExecuteAsync 方法导致的 	if (retryCount == -1) 	{ 		_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);  		// 可以在此处添加告警通知代码 		await Task.Delay(1000); 	} 	else 	{ 		_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount); 	} } 

如果 FaildAsync 方法也出现异常时,不会影响整体流程,框架会等待到达间隔时间后继续重试 ExecuteAsync 方法。

建议 FaildAsync 使用 try{}cathc{} 套住代码,不要对外抛出异常,FaildAsync 的逻辑不要包含太多逻辑,并且 FaildAsync 只应记录日志或进行告警使用。

FaildAsync 被执行有一个额外情况,就是在消费消息之前就已经发生错误,例如一个事件模型类有构造函数导致不能被反序列化,这个时候 FaildAsync 会被立即执行,且 retryCount = -1

ExecuteAsync 方法执行异常时,框架会自动重试,默认会重试五次,如果五次都失败,则会执行 FallbackAsync 方法进行补偿。

重试间隔时间会逐渐增大,请参考 重试

当重试五次之后,就会立即启动补偿机制。

// 最后一次失败时执行 public async Task<bool> FallbackAsync(EventBody<TestEvent>? message) { 	return true; } 

FallbackAsync 方法需要返回 bool,如果返回 true ,表示虽然 ExecuteAsync 出现异常,但是 FallbackAsync 补偿后已经正常,该消息会被正常消费掉。如果返回 false,则说补偿失败,该消息按照消费失败处理。

只有 ExecuteAsync 异常时,才会触发 FaildAsyncFallbackAsync ,如果是在处理消息之前的异常,会直接失败。

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

消费失败

ExecuteAsync 失败次数达到阈值时,并且 FallbackAsync 返回 false,则该条消息消费失败,或者由于序列化等错误时直接失败。

[Consumer] 特性中有三个很重要的配置:

public class ConsumerAttribute : Attribute {     // 消费失败次数达到条件时,是否放回队列.     public bool RetryFaildRequeue { get; set; }      // 现异常时是否放回队列,例如序列化错误等原因导致的,而不是消费时发生异常导致的.     public bool ExecptionRequeue { get; set; }  = true;          // 绑定死信队列.     public string? DeadQueue { get; set; } } 

ExecuteAsync 失败次数达到阈值时,并且 FallbackAsync 返回 false,则该条消息消费失败。

如果 RetryFaildRequeue == false,那么该条消息会被 RabbitMQ 丢弃。

如果绑定了死信队列,则会先推送到死信队列,接着再丢弃。

如果 RetryFaildRequeue == true,那么该条消息会被返回 RabbbitMQ 队列中,等待下一次消费。

由于消息失败后会被放回队列,因此绑定的死信队列不会收到该消息。

当序列化异常或者其它问题导致错误而不能进入 ExecuteAsync 方法时,FaildAsync 方法会首先被触发一次,此时 retryCount 参数值为 -1

出现此种问题时,一般是开发者 bug 导致的,不会进行补偿等操作,开发者可以在 FaildAsync 中处理该事件,记录相关日志信息。

// 每次失败时被执行,或者出现无法进入 ExecuteAsync 的异常 public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) { 	// 当 retryCount == -1 时,错误并非是 ExecuteAsync 方法导致的 	if (retryCount == -1) 	{ 		_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);  		// 可以在此处添加告警通知代码 		await Task.Delay(1000); 	} 	else 	{ 		_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount); 	} } 

由于这种情况不妥善处理,会导致消息丢失,因此框架默认将 ExecptionRequeue 设置为 true,也就是说出现这种异常时,消息会被放回队列。如果问题一致没有得到解决,则会出现循环:调用 FaildAsync 、放回队列、调用 FaildAsync 、放回队列... ...

所以应该在 FaildAsync 中添加代码通知开发者相关信息,并且设置间隔时间,避免重试太频繁。

自动创建队列

框架默认会自动创建队列,如果需要关闭自动创建功能,把 AutoQueueDeclare 设置为 false 即可。

builder.Services.AddMaomiMQ((MqOptionsBuilder options) => { 	options.WorkId = 1; 	options.AppName = "myapp"; 	options.AutoQueueDeclare = false; 	options.Rabbit = (ConnectionFactory options) => 	{ 		options.HostName = "192.168.3.248"; 		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name; 	}; }, [typeof(Program).Assembly]); 

当然还可以单独为消费者配置是否自动创建队列:

[Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)] 

默认情况下,关闭了全局自动创建,则不会自动创建队列。

如果关闭全局自动创建,但是消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Enable,则还是会自动创建队列。

如果消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Disable ,则会忽略全局配置,不会创建队列。

Qos

让程序需要严格根据顺序消费时,可以使用 Qos = 1,框架会严格保证逐条消费,如果程序不需要顺序消费,希望可以快速处理所有消息,则可以将 Qos 设置大一些。由于 Qos 和重试、补偿机制组合使用会有多种情况,因此请参考 重试

Qos 是通过特性来配置的:

[Consumer("ConsumerWeb", Qos = 1)] 

可以通过调高 Qos 值,让程序在可以并发消息,提高并发量。

延迟队列

延迟队列有两种,一种设置消息过期时间,一种是设置队列过期时间。

设置消息过期时间,那么该消息在一定时间没有被消费时,会被丢弃或移动到死信队列中,该配置只对单个消息有效,请参考 消息过期

队列设置过期后,当消息在一定时间内没有被消费时,会被丢弃或移动到死信队列中,该配置只对所有消息有效。基于这一点,我们可以实现延迟队列。

首先创建消费者,继承 EmptyConsumer,那么该队列会在程序启动时被创建,但是不会创建 IConnection 进行消费。然后设置队列消息过期时间以及绑定死信队列,绑定的死信队列既可以使用消费者模式实现,也可以使用事件模式实现。

[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")] public class EmptyDeadConsumer : EmptyConsumer<DeadEvent> { }  // ConsumerWeb_dead 消费失败的消息会被此消费者消费。 [Consumer("ConsumerWeb_dead_queue_2", Qos = 1)] public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent> {     // 消费     public Task ExecuteAsync(EventBody<DeadQueueEvent> message)     {         Console.WriteLine($"死信队列,事件 id:{message.Id}");         return Task.CompletedTask;     }      // 每次失败时被执行     public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;      // 最后一次失败时执行     public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false); } 

空消费者

当识别到空消费者时,框架只会创建队列,而不会启动消费者消费消息。

可以结合延迟队列一起使用,该队列不会有任何消费者,当该队列的消息过期时,都由死信队列直接消费,示例如下:

[Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")] public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }  [Consumer("ConsumerWeb_empty_dead", Qos = 10)] public class MyDeadConsumer : IConsumer<TestEvent> { 	public Task ExecuteAsync(EventBody<TestEvent> message) => Task.CompletedTask;  	public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;  	public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true); } 

对于跨进程的队列,A 服务不消费只发布,B 服务负责消费,A 服务中可以加一个空消费者,保证 A 服务启动时该队列一定存在,另一方面,消费者服务不应该关注队列的定义,也不太应该创建队列。

分组

通过配置 Group 属性将多个消费者放到同一个连接通道中执行,对于那些并发量不高的队列,复用连接通道可以降低资源消耗。

示例:

[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")] public class Group_1_Consumer : IConsumer<GroupEvent> { }  [Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")] public class Group_2_Consumer : IConsumer<GroupEvent> { } 

事件总线模式

Maomi.MQ 内部设计了一个事件总线,可以帮助开发者实现事件编排、实现本地事务、正向执行和补偿。

首先定义一个事件类型,该事件绑定一个 topic 或队列,事件需要使用 [EventTopic] 标识,并设置该事件对于的队列名称。

[EventTopic] 特性拥有与 [Consumer] 相同的特性,可参考 [Consumer] 的使用配置事件,请参考 消费者配置

[EventTopic("EventWeb")] public class TestEvent { 	public string Message { get; set; }  	public override string ToString() 	{ 		return Message; 	} } 

然后编排事件执行器,每个执行器都需要继承 IEventHandler<T> 接口,然后使用 [EventOrder] 特性标记执行顺序。

[EventOrder(0)] public class My1EventEventHandler : IEventHandler<TestEvent> { 	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken) 	{ 	}  	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken) 	{ 		Console.WriteLine($"{@event.Id},事件 1 已被执行"); 	} }  [EventOrder(1)] public class My2EventEventHandler : IEventHandler<TestEvent> { 	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken) 	{ 	}  	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken) 	{ 		Console.WriteLine($"{@event.Id},事件 2 已被执行"); 	} } 

每个事件执行器都必须实现 IEventHandler<T> 接口,并且设置 [EventOrder] 特性以便确认事件的执行顺序,框架会按顺序执行 IEventHandler<T>ExecuteAsync 方法,当 ExecuteAsync 出现异常时,则反向按顺序调用 CancelAsync

由于程序可能随时挂掉,因此通过 CancelAsync 实现补偿是不太可能的,CancelAsync 主要作为记录相关信息而使用。

中间件

中间件的作用是便于开发者拦截事件、记录信息、实现本地事务等,如果开发者不配置,则框架会自动创建 DefaultEventMiddleware<TEvent> 类型作为该事件的中间件服务。

自定义事件中间件示例代码:

public class TestEventMiddleware : IEventMiddleware<TestEvent> { 	public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next) 	{ 		await next(@event, CancellationToken.None); 	} } 

next 委托是框架构建的事件执行链路,在中间件中可以拦截事件、决定是否执行事件链路。

在中间件中调用 next() 委托时,框架开始按顺序执行事件,即前面提到的 My1EventEventHandlerMy2EventEventHandler

当一个事件有多个执行器时,由于程序可能会在任何时刻挂掉,因此本地事务必不可少。

例如,在中间件中注入数据库上下文,然后启动事务执行数据库操作,当其中一个 EventHandler 执行失败时,执行链路会回滚,同时不会提交事务。

可以参考 消费者模式 实现中间件的重试和补偿方法。

示例如下:

public class TestEventMiddleware : IEventMiddleware<TestEvent> {     private readonly BloggingContext _bloggingContext;      public TestEventMiddleware(BloggingContext bloggingContext)     {         _bloggingContext = bloggingContext;     }      public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)     {         using (var transaction = _bloggingContext.Database.BeginTransaction())         {             await next(@event, CancellationToken.None);             await transaction.CommitAsync();         }     }      public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)     {         return Task.CompletedTask;     }      public Task<bool> FallbackAsync(EventBody<TestEvent>? message)     {         return Task.FromResult(true);     } }  [EventOrder(0)] public class My1EventEventHandler : IEventHandler<TestEvent> {     private readonly BloggingContext _bloggingContext;      public My1EventEventHandler(BloggingContext bloggingContext)     {         _bloggingContext = bloggingContext;     }      public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)     {         Console.WriteLine($"{@event.Id} 被补偿,[1]");     }      public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)     {         await _bloggingContext.Posts.AddAsync(new Post         {             Title = "鲁滨逊漂流记",             Content = "随便写写就对了"         });         await _bloggingContext.SaveChangesAsync();     } }  [EventOrder(1)] public class My2EventEventHandler : IEventHandler<TestEvent> {     private readonly BloggingContext _bloggingContext;      public My2EventEventHandler(BloggingContext bloggingContext)     {         _bloggingContext = bloggingContext;     }     public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)     {         Console.WriteLine($"{@event.Id} 被补偿,[2]");     }      public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)     {         await _bloggingContext.Posts.AddAsync(new Post         {             Title = "红楼梦",             Content = "贾宝玉初试云雨情"         });         await _bloggingContext.SaveChangesAsync();          throw new OperationCanceledException("故意报错");     } } 

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

事件执行时,如果出现异常,也是会被重试的,中间件 TestEventMiddleware 的 FaildAsync、FallbackAsync 会被依次执行。

你可以参考 消费者模式 或者 重试

分组消费

事件分组消费主要是利用同一个 IConnection 同时处理多个消息队列,提高通道利用率。

示例:

[EventTopic("EventGroup_1", Group = "aaa")] public class Test1Event { 	public string Message { get; set; }  	public override string ToString() 	{ 		return Message; 	} }  [EventTopic("EventGroup_2", Group = "aaa")] public class Test2Event { 	public string Message { get; set; }  	public override string ToString() 	{ 		return Message; 	} } 

Maomi.MQ 的 IConsumer<T> 是一个消费者(一个队列)使用一个 IConnection,默认情况下事件总线也是。

对于哪些并发量不大或利用率较低的队列,可以通过事件分组将其合并到同一个 IConnection 中进行处理。

使用方法很简单,只需要在定义事件时,配置 [EventTopic] 特性的 Group 方法即可。

由于不同队列被放到一个 IConnection 中消费,如果事件都设置了 Qos,那么框架会默认计算平均值,例如:

[EventTopic("web3_1", Group = "aaa", Qos = 10)] public class Test1Event  [EventTopic("web3_2", Group = "aaa", Qos = 6)] public class Test2Event 

此时框架会设置 Qos 为 8

配置

在引入 Maomi.MQ 框架时,可以配置相关属性,示例和说明如下:

// this. builder.Services.AddMaomiMQ((MqOptionsBuilder options) => {     // 当前程序节点,用于配置分布式雪花 id 	options.WorkId = 1;          // 是否自动创建队列 	options.AutoQueueDeclare = true;          // 当前应用名称,用于标识消息的发布者和消费者程序 	options.AppName = "myapp";          // RabbitMQ 配置 	options.Rabbit = (ConnectionFactory options) => 	{ 		options.HostName = "192.168.3.248"; 		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name; 	}; }, [typeof(Program).Assembly]);  // 要被扫描的程序集 

消费者配置

消费者模式 [Consumer] 和事件总线模式 [EventTopic] 具有相同的属性配置,其配置说明如下:

名称 类型 默认值 说明
Queue string 队列名称
DeadQueue string? 绑定死信队列名称
ExecptionRequeue bool true 出现异常时是否放回队列,例如序列化错误等原因导致的,而不是消费时发生异常导致的
Expiration int 0 队列消息过期时间,单位毫秒
Qos ushort 1 Qos
RetryFaildRequeue bool false 消费失败次数达到条件时,是否放回队列
Group string? 分组名称
AutoQueueDeclare AutoQueueDeclare AutoQueueDeclare.None 是否自动创建队列

环境隔离

目前还在考虑要不要支持多租户模式。

在开发中,往往需要在本地调试,本地程序启动后会连接到开发服务器上,一个队列收到消息时,会向其中一个消费者推送消息。那么我本地调试时,发布一个消息后,可能本地程序收不到该消息,而是被开发环境中的程序消费掉了。

这个时候,我们希望可以将本地调试环境跟开发环境隔离开来,可以使用 RabbitMQ 提供的 VirtualHost 功能。

首先通过 put 请求创建一个新的 VirtualHost,请参考文档:https://www.rabbitmq.com/docs/vhosts#using-http-api

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

然后在代码中配置 VirtualHost:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) => { 	options.WorkId = 1; 	options.AutoQueueDeclare = true; 	options.AppName = "myapp"; 	options.Rabbit = (ConnectionFactory options) => 	{ 		options.HostName = "192.168.3.248"; #if DEBUG 		options.VirtualHost = "debug"; #endif 		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name; 	}; }, [typeof(Program).Assembly]); 

雪花 id 配置

Maomi.MQ.RabbitMQ 使用了 IdGenerator 生成雪花 id,使得每个事件在集群中都有一个唯一 id。

框架通过 IIdFactory 接口创建雪花 id,你可以通过替换 IIdFactory 接口配置雪花 id 生成规则。

services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId)); 

示例:

public class DefaultIdFactory : IIdFactory {     /// <summary>     /// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.     /// </summary>     /// <param name="workId"></param>     public DefaultIdFactory(ushort workId)     {         var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };         YitIdHelper.SetIdGenerator(options);     }      /// <inheritdoc />     public long NextId() => YitIdHelper.NextId(); } 

IdGenerator 框架生成雪花 id 配置请参考:

https://github.com/yitter/IdGenerator/tree/master/C%23

Qos 并发和顺序

基于消费者模式和基于事件模式都是通过特性来配置消费属性,Qos 是其中一个重要的属性。

Qos 场景

对于消费者模式和事件总线模式,在没有使用 Group 属性配置消费行为时,每个队列都会独占一个 IConnection 以及 Host service。

对于消费频率很高但是不能并发的队列,最好不要设置 Group 属性,以及务必设置 Qos = 1。这样依赖,该消费者会独占资源进行消费,在保证顺序的情况下,独占资源有助于提高消费能力。

[Consumer("web1", Qos = 1)] public class MyConsumer : IConsumer<TestEvent> { } 

当需要需要提高消费吞吐量,而且不需要顺序消费时,可以将 Qos 设置高一些,RabbitMQ Client 框架会通过预取等方式提高吞吐量,并且多条消息可以并发消费。

如果判断一些消费者的消费频率不会很高时,可以将这些消费者放到一个分组中。

当多个消费者或事件配置共用一个分组时,那么这些事件的 Qos 应当一致,否则按照平均值来算。

示例:

[Consumer("web1", Qos = 10, Group = "group")] public class My1Consumer : IConsumer<TestEvent> { }  [Consumer("web2", Qos = 6, Group = "group")] public class My2Consumer : IConsumer<TestEvent> { } 

由于两个消费者使用相同的分组,因此复用通道的 Qos 会被设置为 8。

如果消费频率不高,但是需要顺序消费时,可以将这些消费者放到同一个分组中,并且 Qos 设置为 1。

[Consumer("web1", Qos = 1, Group = "group1")] public class My1Consumer : IConsumer<TestEvent> { }  [Consumer("web2", Qos = 1, Group = "group1")] public class My2Consumer : IConsumer<TestEvent> { } 

并发和异常处理

第一次情况,Qos 为 1 时,不设置 ExecptionRequeue 、RetryFaildRequeue。

第二种情况,Qos 为 1 时,设置 ExecptionRequeue 、RetryFaildRequeue。

Qos 为 1 时,会保证严格顺序消费,ExecptionRequeue 、RetryFaildRequeue 会影响失败的消息是否会被放回队列,如果放回队列,下一次消费会继续消费之前失败的消息。如果错误(如 bug)得不到解决,则会出现消费、失败、放回队列、重新消费这样的循环。

第三次情况,Qos > 1 时,不设置 ExecptionRequeue 、RetryFaildRequeue。

第四种情况,Qos > 1 时,设置 ExecptionRequeue 、RetryFaildRequeue。

当 Qos 大于 1 时,如果设置了 RetryFaildRequeue = true,那么消费失败的消息会被放回队列中,但是不一定下一次会立即重新消费该条消息。

重试

重试时间

当消费者 ExecuteAsync 方法异常时,框架会进行重试,默认会重试五次,按照 2 作为指数设置重试时间间隔。

第一次失败后,间隔 2 秒重试,第二次失败后,间隔 4 秒,接着分别是 8、16、32 秒。

Maomi.MQ.RabbitMQ 使用了 Polly 框架做重试策略管理器,默认通过 DefaultRetryPolicyFactory 服务生成重试间隔策略。

DefaultRetryPolicyFactory 代码示例如下:

/// <summary> /// Default retry policy.<br /> /// 默认的策略提供器. /// </summary> public class DefaultRetryPolicyFactory : IRetryPolicyFactory {     /// <inheritdoc/>     public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, long id)     {         // Create a retry policy.         // 创建重试策略.         var retryPolicy = Policy             .Handle<Exception>()             .WaitAndRetryAsync(                 retryCount: 5,                 sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),                 onRetry: async (exception, timeSpan, retryCount, context) =>                 {                     _logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);                     await FaildAsync(queue, exception, timeSpan, retryCount, context);                 });          return Task.FromResult(retryPolicy);     }           public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)     {         return Task.CompletedTask;     } } 

你可以通过实现 IRetryPolicyFactory 接口,替换默认的重试策略服务服务。

services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>(); 

重试机制

设定消费者代码如下:

    [Consumer("web1", Qos = 1 , RetryFaildRequeue = true)]     public class MyConsumer : IConsumer<TestEvent>     {         private  int _retryCount = 0;         // 消费         public async Task ExecuteAsync(EventBody<TestEvent> message)         {             Console.WriteLine($"执行 {message.Body.Id} 第几次:{_retryCount} {DateTime.Now}");             _retryCount++;             throw new Exception("1");         }          // 每次失败时被执行         public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)         {             Console.WriteLine($"重试 {message.Body.Id} 第几次:{retryCount} {DateTime.Now}");             await Task.CompletedTask;         }           // 最后一次失败时执行         public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)         {             Console.WriteLine($"执行 {message.Body.Id} 补偿 {DateTime.Now}");             return true;         }     } } 

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

首先会执行 IConsumer<TEvent>.ExecuteAsync()IEventMiddleware<TEvent>.ExecuteAsync() 消费消息,此时 ExecuteAsync() 执行失败,立即触发 FaildAsync() 函数。

然后等待一段时间间隔后,接着会重新执行 ExecuteAsync() 方法。

比如默认重试机制是重试五次,那么最终 IConsumer<TEvent>.ExecuteAsync()IEventMiddleware<TEvent>.ExecuteAsync() 都会被执行 6次,一次正常消费和五次重试消费。

FallbackAsync() 方法会在最后一次重试失败后被调用,该函数要返回一个 bool 类型。

当多次重试失败后,框架会调用 FallbackAsync 方法,如果该方法放回 true,那么框架会认为虽然 ExecuteAsync() 执行失败,但是通过 FallbackAsync() 已经补偿好了,该消息会被当做正常完成消费,框架会向 RabbitMQ 服务器发送 ACK,接着消费下一条消息。

如果 FallbackAsync() 返回 true,框架会认为该消息彻底失败,如果设置了 RetryFaildRequeue = true,那么该条消息会被放回消息队列,等待下一次消费。否则该条消息会被直接丢弃。

持久化剩余重试次数

当消费者处理消息失败时,默认消费者会重试 5 次,如果已经重试了 3 次,此时程序重启,那么下一次消费该消息时,依然是继续重试五次。

需要记忆重试次数,在程序重启时,能够按照剩余次数进行重试。

引入 Maomi.MQ.RedisRetry 包。

配置示例:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) => { 	options.WorkId = 1; 	options.AutoQueueDeclare = true; 	options.AppName = "myapp"; 	options.Rabbit = (ConnectionFactory options) => 	{ 		options.HostName = "192.168.3.248"; 		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name; 	}; }, [typeof(Program).Assembly]);  builder.Services.AddMaomiMQRedisRetry((s) => { 	ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248"); 	IDatabase db = redis.GetDatabase(); 	return db; }); 

默认 key 只会保留 5 分钟。也就是说,如果五分钟之后程序才重新消费该消息,那么就会剩余重试次数就会重置。

死信队列

死信队列

可以给一个消费者或事件绑定死信队列,当该队列的消息失败后并且不会放回队列时,该消息会被推送到死信队列中,示例:

[Consumer("ConsumerWeb_dead", Qos = 1, DeadQueue = "ConsumerWeb_dead_queue", RetryFaildRequeue = false)] public class DeadConsumer : IConsumer<DeadEvent> { 	// 消费 	public Task ExecuteAsync(EventBody<DeadEvent> message) 	{ 		Console.WriteLine($"事件 id:{message.Id}"); 		throw new OperationCanceledException(); 	}  	// 每次失败时被执行 	public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadEvent>? message) => Task.CompletedTask;  	// 最后一次失败时执行 	public Task<bool> FallbackAsync(EventBody<DeadEvent>? message) => Task.FromResult(false); }  // ConsumerWeb_dead 消费失败的消息会被此消费者消费。 [Consumer("ConsumerWeb_dead_queue", Qos = 1)] public class DeadQueueConsumer : IConsumer<DeadQueueEvent> { 	// 消费 	public Task ExecuteAsync(EventBody<DeadQueueEvent> message) 	{ 		Console.WriteLine($"死信队列,事件 id:{message.Id}"); 		return Task.CompletedTask; 	}  	// 每次失败时被执行 	public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;  	// 最后一次失败时执行 	public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false); }  

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

如果使用死信队列,则务必将 RetryFaildRequeue 设置为 false,那么消费者会在重试多次失败后,向 RabbitMQ 发送 nack 信号,RabbitMQ 就会将该消息转发到绑定的死信队列中。

延迟队列

创建一个消费者,继承 EmptyConsumer,那么该队列会在程序启动时被创建,但是不会创建 IConnection 进行消费。然后设置队列消息过期时间以及绑定死信队列,绑定的死信队列既可以使用消费者模式实现,也可以使用事件模式实现。

[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")] public class EmptyDeadConsumer : EmptyConsumer<DeadEvent> { }  // ConsumerWeb_dead 消费失败的消息会被此消费者消费。 [Consumer("ConsumerWeb_dead_queue_2", Qos = 1)] public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent> {     // 消费     public Task ExecuteAsync(EventBody<DeadQueueEvent> message)     {         Console.WriteLine($"事件 id:{message.Id} 已到期");         return Task.CompletedTask;     }      // 每次失败时被执行     public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;      // 最后一次失败时执行     public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false); } 

例如,用户下单之后,如果 15 分钟之内没有付款,那么消息到期时,自动取消订单。

可观测性

功能还在继续完善中。请参考 ActivitySourceApi 示例。

为了快速部署可观测性平台,可以使用 OpenTelemetry 官方提供的示例包快速部署相关的服务。

下载示例仓库源码:

git clone https://github.com/open-telemetry/opentelemetry-demo.git 

由于示例中会包含大量的 demo 微服务,因此我们需要打开 docker-compose.yml 文件,将 services 节点的 Core Demo ServicesDependent Services 服务直接删除,只保留可观测性组件。或者直接点击下载笔者已经修改好的版本: docker-compose.yml

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

执行命令部署可观测性服务:

docker-compose up -d 

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

opentelemetry-collector-contrib 用于收集链路追踪的可观测性信息,有 grpc 和 http 两种,监听端口如下:

Port Protocol Endpoint Function
4317 gRPC n/a Accepts traces in OpenTelemetry OTLP format  (Protobuf).
4318 HTTP /v1/traces Accepts traces in OpenTelemetry OTLP format  (Protobuf and JSON).

经过容器端口映射后,对外端口可能不是 4317、4318 了。

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

引入 Maomi.MQ.Instrumentation 包,以及其它相关 OpenTelemetry 包。

<PackageReference Include="Maomi.MQ.Instrumentation " Version="1.1.0" /> <PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.8.1" /> <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" /> <PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" /> <PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" /> 

然后注入服务:

const string serviceName = "myapp";  builder.Services.AddMaomiMQ((MqOptionsBuilder options) => { 	options.WorkId = 1; 	options.AutoQueueDeclare = true; 	options.AppName = serviceName; 	options.Rabbit = (ConnectionFactory options) => 	{ 		options.HostName = "192.168.3.248"; 		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name; 	}; }, [typeof(Program).Assembly]);  builder.Services.AddOpenTelemetry() 	  .ConfigureResource(resource => resource.AddService(serviceName)) 	  .WithTracing(tracing => 	  { 		  tracing.AddMaomiMQInstrumentation(options => 		  {               options.Sources.AddRange(MaomiMQDiagnostic.Sources); 			  options.RecordException = true; 		  }) 		  .AddAspNetCoreInstrumentation() 		  .AddOtlpExporter(options => 		  { 			  options.Endpoint = new Uri("http://127.0.0.1:32772/v1/traces"); 			  options.Protocol = OtlpExportProtocol.HttpProtobuf; 		  }); 	  }); 

启动服务后,进行发布、消费,链路追踪信息会被自动推送到 OpenTelemetry Collector 中,通过 Jaeger 、Skywalking 等组件可以读取出来。

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

由于 publish、consumer 属于兄弟 trace 而不是同一个 trace,因此需要通过 Tags 查询相关联的 trace,格式 event.id=xxx

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

发表评论

相关文章