netcore下RabbitMQ队列、死信队列、延时队列及小应用

关于安装rabbitmq这里一笔掠过了。

下面进入正题:

1.新建aspnetcorewebapi空项目,NormalQueue,删除controllers文件夹已经无关的文件,这里为了偷懒不用console控制台:

public class Program     {         public static void Main(string[] args)         {             var builder = WebApplication.CreateBuilder(args);              // Add services to the container.              builder.Services.AddControllers();             // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle             builder.Services.AddEndpointsApiExplorer();             builder.Services.AddSwaggerGen();             builder.Services.AddHostedService<ConsumerService>();             builder.Services.AddHostedService<DeadLetterExchangeConsuerService>();             builder.Services.AddHostedService<DelayExchangeConsumerService>();             var app = builder.Build();              // Configure the HTTP request pipeline.             if (app.Environment.IsDevelopment())             {                 app.UseSwagger();                 app.UseSwaggerUI();             }             app.MapGet("/normal/{message}", ([FromRoute] string message) =>             {                 ConnectionFactory factory = new ConnectionFactory();                 factory.HostName = "localhost";                 factory.Port = 5672;                 using (IConnection connection = factory.CreateConnection())                 {                     using (IModel channel = connection.CreateModel())                     {                         var queueName = "rbTest202301";                         channel.QueueDeclare(queueName, true, false, false, null);                          {                             string sendMessage = string.Format("Message_{0}", message);                             byte[] buffer = Encoding.UTF8.GetBytes(sendMessage);                             IBasicProperties basicProperties = channel.CreateBasicProperties();                             basicProperties.DeliveryMode = 2; //持久化  1=非持久化                             channel.BasicPublish("", queueName, basicProperties, buffer);                             Console.WriteLine("消息发送成功:" + sendMessage);                         }                     }                 }             });              app.MapGet("/deadletterexchange/{message}",([FromRoute] string message) =>{                 DeadLetterExchange.Send(message);             });              app.MapGet("/delayexchange/{message}", ([FromRoute] string message) => {                 DelayExchange.SendMessage(message);             });             app.UseHttpsRedirection();              app.UseAuthorization();               app.MapControllers();              app.Run();         }     }

大概的介绍一下program文件:

这里有三个mini控制器,从这里发送对应的消息到rabbitmq

"/normal/{message}"   普通队列,
"/deadletterexchange/{message}" 死信队列
"/deadletterexchange/{message}"   延时队列

netcore下RabbitMQ队列、死信队列、延时队列及小应用

 

        builder.Services.AddHostedService<ConsumerService>();             builder.Services.AddHostedService<DeadLetterExchangeConsuerService>();             builder.Services.AddHostedService<DelayExchangeConsumerService>();

这里就是消费的服务,注册成HostedService。

ConsumerService代码如下:
 public class ConsumerService : BackgroundService     {         protected override async Task ExecuteAsync(CancellationToken stoppingToken)         {             Console.WriteLine("normal Rabbitmq消费端开始工作!");             while (!stoppingToken.IsCancellationRequested)             {                 ConnectionFactory factory = new ConnectionFactory();                 factory.HostName = "localhost";                 factory.Port = 5672;                                 IConnection connection = factory.CreateConnection();                 {                     IModel channel = connection.CreateModel();                     {                         var queueName = "rbTest202301";                         channel.QueueDeclare(queueName, true, false, false, null);                         //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息                         channel.BasicQos(0, 1, false);                         //在队列上定义一个消费者                         var consumer = new EventingBasicConsumer(channel);                         channel.BasicConsume(queueName, false, consumer);                         consumer.Received += (ch, ea) =>                         {                             byte[] bytes = ea.Body.ToArray();                             string str = Encoding.UTF8.GetString(bytes);                             Console.WriteLine("队列消息:" + str.ToString());                             //回复确认                             channel.BasicAck(ea.DeliveryTag, false);                         };                     }                 }                 await Task.Delay(5000);             }         }     }

 

DeadLetterExchangeConsuerService代码如下:

 public class DeadLetterExchangeConsuerService : BackgroundService     {         protected override async Task ExecuteAsync(CancellationToken stoppingToken)         {             Console.WriteLine("RabbitMQ消费端死信队列开始工作");             while (!stoppingToken.IsCancellationRequested)             {                 DeadLetterExchange.Consumer();                 await Task.Delay(5000);             }         }     }

  public class DeadLetterExchange     {         public static string dlxExchange = "dlx.exchange";         public static string dlxQueueName = "dlx.queue";         static string exchange = "direct-exchange";         static string queueName = "queue_Testdlx";         static string dlxExchangeKey = "x-dead-letter-exchange";         static string dlxQueueKey = "x-dead-letter-rounting-key";         public static void Send(string message)         {             using (var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection())             {                 using(var channel = connection.CreateModel())                 {                                         channel.ExchangeDeclare(exchange, ExchangeType.Direct, true, false); //创建交换机                     channel.QueueDeclare(queueName, true, false, false,new Dictionary<string, object>                     {                         { dlxExchangeKey,dlxExchange },                         {dlxQueueKey,dlxQueueName }                     }); // 创建队列                     channel.QueueBind(queueName, exchange, queueName);                                           var properties = channel.CreateBasicProperties();                     properties.Persistent= true;//持久化                     channel.BasicPublish(exchange,queueName,properties,Encoding.UTF8.GetBytes(message));                     Console.WriteLine($"向队列:{queueName}发送消息:{message}");                 }             }         }                  public static void Consumer()         {             var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection();             var channel = connection.CreateModel();             channel.ExchangeDeclare(dlxExchange, ExchangeType.Direct, true, false); //创建sixin交换机             channel.QueueDeclare(dlxQueueName, true, false, false); // 创建sixin队列             channel.QueueBind(dlxQueueName, dlxExchange, dlxQueueName); //绑定sixin队列sixin交换机              channel.ExchangeDeclare(exchange, ExchangeType.Direct, true, false); //创建交换机             channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>                     {                         { dlxExchangeKey,dlxExchange },                         {dlxQueueKey,dlxQueueName }                     }); // 创建队列             channel.QueueBind(queueName, exchange, queueName);              var consumer = new EventingBasicConsumer(channel);             channel.BasicQos(0, 1, false);             consumer.Received += (model, ea) =>             {                 var message = Encoding.UTF8.GetString(ea.Body.ToArray());                 Console.WriteLine($"队列{queueName}消费消息:{message},不做ack确认");                 channel.BasicNack(ea.DeliveryTag, false, requeue: false);             };             channel.BasicConsume(queueName, autoAck: false, consumer);         }     }

 

DelayExchangeConsumerService代码如下:

 public class DelayExchangeConsumerService : BackgroundService     {         protected override async Task ExecuteAsync(CancellationToken stoppingToken)         {             Console.WriteLine("RabbitMQ消费端延迟队列开始工作");             while (!stoppingToken.IsCancellationRequested)             {                                DelayExchange.Consumer();                 await Task.Delay(5000);             }         }     }

 public class DelayExchange     {          public static void SendMessage(string message)         {             //死信交换机             string dlxexChange = "dlx.exchange";             //死信队列             string dlxQueueName = "dlx.queue";              //消息交换机             string exchange = "direct-exchange";             //消息队列             string queueName = "delay_queue";              using (var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     ////创建死信交换机                     //channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);                     ////创建死信队列                     //channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);                     ////死信队列绑定死信交换机                     //channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);                      // 创建消息交换机                     channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);                     //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s                     channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:                                         new Dictionary<string, object> {                                              { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)                                              { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列                                              { "x-message-ttl",10000} //设置队列的消息过期时间                                         });                     //消息队列绑定消息交换机                     channel.QueueBind(queueName, exchange, routingKey: queueName);                      var properties = channel.CreateBasicProperties();                     properties.Persistent = true;                     //properties.Expiration = "5000";发布消息,延时5s                     //发布消息                     channel.BasicPublish(exchange: exchange,                                          routingKey: queueName,                                          basicProperties: properties,                                          body: Encoding.UTF8.GetBytes(message));                     Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message}");                 }             }         }          public static void Consumer()         {             //死信交换机             string dlxexChange = "dlx.exchange";             //死信队列             string dlxQueueName = "dlx.queue";             var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection();             {                 //创建信道                 var channel = connection.CreateModel();                 {                     //创建死信交换机                     channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);                     //创建死信队列                     channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);                     //死信队列绑定死信交换机                     channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);                      var consumer = new EventingBasicConsumer(channel);                     channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);                     consumer.Received += (model, ea) =>                     {                         //处理业务                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());                         Console.WriteLine($"{DateTime.Now},队列{dlxQueueName}消费消息:{message}");                         channel.BasicAck(ea.DeliveryTag, false);                     };                     channel.BasicConsume(dlxQueueName, autoAck: false, consumer);                 }             }         }     }

 

netcore下RabbitMQ队列、死信队列、延时队列及小应用

netcore下RabbitMQ队列、死信队列、延时队列及小应用

netcore下RabbitMQ队列、死信队列、延时队列及小应用

netcore下RabbitMQ队列、死信队列、延时队列及小应用

 

延时队列实际应用场景可能比较复杂,比如每条消息的过期时间不一样,收到的消息的顺序有可能会乱掉。这些不做深究,自行百度。

关于死信队列常见应用场景之一下单,支付,支付超时的各种场景,下面通过一个简单的例子模拟一下

同样的新建一个空的webapi项目DeadLetterQueue,

program代码如下:

public class Program     {         public static void Main(string[] args)         {             var builder = WebApplication.CreateBuilder(args);              // Add services to the container.              builder.Services.AddControllers();             // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle             builder.Services.AddEndpointsApiExplorer();             builder.Services.AddSwaggerGen();             builder.Services.AddHostedService<ConsumerService>();             builder.Services.AddHostedService<DeadConsumerService>();             var app = builder.Build();              // Configure the HTTP request pipeline.             if (app.Environment.IsDevelopment())             {                 app.UseSwagger();                 app.UseSwaggerUI();             }             app.MapGet("/normal/{message}", ([FromRoute] string message) =>             {                 ConnectionFactory factory = new ConnectionFactory();                 factory.HostName = "localhost";                 factory.Port = 5672;                 using (IConnection connection = factory.CreateConnection())                 {                     using (IModel channel = connection.CreateModel())                     {                         var queueName = "rbTest2023010";                                                //channel.ExchangeDeclare("exchange.dlx", ExchangeType.Direct, true);                         //channel.QueueDeclare("queue.dlx", true, false, false, null);                         channel.ExchangeDeclare("exchange.normal", ExchangeType.Fanout, true);                         channel.QueueDeclare(queueName, true, false, false,                             new Dictionary<string, object>                         {                             { "x-message-ttl" ,10000},                             {"x-dead-letter-exchange","exchange.dlx" },                             {"x-dead-letter-routing-key","routingkey" }                         }                             );                                                 channel.QueueBind(queueName, "exchange.normal", "");                         {                             string sendMessage = string.Format("Message_{0}", message);                             byte[] buffer = Encoding.UTF8.GetBytes(sendMessage);                             IBasicProperties basicProperties = channel.CreateBasicProperties();                             basicProperties.DeliveryMode = 2; //持久化  1=非持久化                             channel.BasicPublish("exchange.normal", queueName, basicProperties, buffer);                             Console.WriteLine($"{DateTime.Now}消息发送成功:{sendMessage}" );                         }                     }                 }             });             app.UseHttpsRedirection();              app.UseAuthorization();               app.MapControllers();              app.Run();         }     }

 

下单后消费代码ConsumerService如下

 public class ConsumerService : BackgroundService     {         protected override async Task ExecuteAsync(CancellationToken stoppingToken)         {             Console.WriteLine("normal Rabbitmq消费端开始工作!");             while (!stoppingToken.IsCancellationRequested)             {                 ConnectionFactory factory = new ConnectionFactory();                 factory.HostName = "localhost";                 factory.Port = 5672;                  IConnection connection = factory.CreateConnection();                 {                     IModel channel = connection.CreateModel();                     {                         var queueName = "rbTest2023010";                         channel.ExchangeDeclare("exchange.normal", ExchangeType.Fanout, true);                         channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>                         {                             { "x-message-ttl" ,10000},                             {"x-dead-letter-exchange","exchange.dlx" },                             {"x-dead-letter-routing-key","routingkey" }                         });                          channel.QueueBind(queueName, "exchange.normal", "");                         //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息                         channel.BasicQos(0, 1, false);                         //在队列上定义一个消费者                         var consumer = new EventingBasicConsumer(channel);                         channel.BasicConsume(queueName, false, consumer);                         consumer.Received += (ch, ea) =>                         {                             byte[] bytes = ea.Body.ToArray();                             string str = Encoding.UTF8.GetString(bytes);                             Console.WriteLine($"{DateTime.Now}来自死信队列获取的消息: {str.ToString()}");                             //回复确认                             if (str.Contains("跳过")) //假设超时不处理,留给后面deadconsumerservice处理                             {                                 Console.WriteLine($"{DateTime.Now}来自死信队列获取的消息: {str.ToString()},该消息被拒绝");                                 channel.BasicNack(ea.DeliveryTag, false,false);                             }                             else  //正常消息处理                             {                                 Console.WriteLine($"{DateTime.Now}来自死信队列获取的消息: {str.ToString()},该消息被接受");                                 channel.BasicAck(ea.DeliveryTag, false);                             }                         };                      }                 }                 await Task.Delay(5000);             }         }     }

通过模拟发送的消息加入跳过两个字会拒收这条消息,这样就会跳到设置的exchange.dlx交换机队列去,如果没有跳过那么这条消息就正常处理掉,消费确认。

超时不处理后我们通过新的消费服务DeadConsumerService来处理这异常的消费,比如回复库存,订单状态改为取消等等

public class DeadConsumerService:BackgroundService     {         protected override async Task ExecuteAsync(CancellationToken stoppingToken)         {             Console.WriteLine("normal Rabbitmq消费端开始工作!");             while (!stoppingToken.IsCancellationRequested)             {                 ConnectionFactory factory = new ConnectionFactory();                 factory.HostName = "localhost";                 factory.Port = 5672;                              IConnection connection = factory.CreateConnection();                 {                     IModel channel = connection.CreateModel();                     {                         var queueName = "queue.dlx";                         channel.ExchangeDeclare("exchange.dlx", ExchangeType.Direct, true);                         channel.QueueDeclare("queue.dlx", true, false, false, null);                                                 channel.QueueDeclare(queueName, true, false, false, null);                          channel.QueueBind(queueName, "exchange.dlx", "");                         //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息                         channel.BasicQos(0, 1, false);                         //在队列上定义一个消费者                         var consumer = new EventingBasicConsumer(channel);                         channel.BasicConsume("queue.dlx", false, consumer);                         consumer.Received += (ch, ea) =>                         {                             byte[] bytes = ea.Body.ToArray();                             string str = Encoding.UTF8.GetString(bytes);                             Console.WriteLine($"{DateTime.Now}超时未处理的消息: {str.ToString()}");                             //回复确认                             {                                 channel.BasicAck(ea.DeliveryTag, false);                             }                         };                      }                 }                 await Task.Delay(5000);             }         }     }

 

运行结果:

netcore下RabbitMQ队列、死信队列、延时队列及小应用

netcore下RabbitMQ队列、死信队列、延时队列及小应用

netcore下RabbitMQ队列、死信队列、延时队列及小应用

关于rabbitmq的死信队列和延时队列的介绍什么的这里不去贴baidu了,应用demo就这么多了,代码这里exercisebook/RabbitMQ.Test at main · liuzhixin405/exercisebook (github.com) 。小面分享一个完整一点的例子。

exercisebook/cat.seckill/cat.seckill at main · liuzhixin405/exercisebook (github.com)

感觉自己还是不合适写这些玩意儿,没有那么细心和耐心,有这时间真不如写写demo。

发表评论

相关文章