RabbitMQ 入门系列:6、保障消息:不丢失:发送方、Rabbit存储端、接收方。

系列目录

RabbitMQ 入门系列:1、MQ的应用场景的选择与RabbitMQ安装。

RabbitMQ 入门系列:2、基础含义:链接、通道、队列、交换机。

RabbitMQ 入门系列:3、基础含义:持久化、排它性、自动删除、强制性、路由键。

RabbitMQ 入门系列:4、基础编码:官方SDK使用:链接创建、单例改造、发送消息、接收消息。

RabbitMQ 入门系列:5、基础编码:交换机的进阶介绍及编码方式。

RabbitMQ 入门系列:6、保障消息:不丢失:发送方、Rabbit存储端、接收方。

RabbitMQ 入门系列:7、保障消息:不重复消费:产生消息的唯一ID。

RabbitMQ 入门系列:8、扩展内容:接收信息时:可否根据RoutingKey过滤监听信息,答案是不能。

RabbitMQ 入门系列:9、扩展内容:死信队列:真不适合当延时队列。

RabbitMQ 入门系列:10、扩展内容:延时队列:延时队列插件及其有限的适用场景。

前言:

本篇简单介绍如何保障消息不丢失的处理方式。

1、保障消息不丢失:发送方

主要是通过消息确认或事务,来保障这个过程,下面见具体代码:

1、通过确认机制处理的代码:

using RabbitMQ.Client; using System.Text;using (var channel = Rabbit.Instance.DefaultConnection.CreateModel()) {     channel.ConfirmSelect();//开启确认     channel.QueueDeclare("FirstQueue", false, false, false);     channel.BasicPublish("", "FirstQueue", false, null, Encoding.UTF8.GetBytes("这是要发送的内容"));     if (channel.WaitForConfirms(TimeSpan.FromSeconds(10)))//设置最长超时时间     {         //发送确认成功     }     else     {         //超时或失败,需要处理是否重发消息。     } }

2、通过事务机制处理的代码:

using RabbitMQ.Client; using System.Text;  using (var channel = Rabbit.Instance.DefaultConnection.CreateModel()) {     channel.TxSelect();     channel.QueueDeclare("FirstQueue", false, false, false);     channel.BasicPublish("", "FirstQueue", false, null, Encoding.UTF8.GetBytes("这是要发送的内容"));     try     {         channel.TxCommit();     }     catch (Exception)     {         channel.TxRollback();         //处理事务提交失败的逻辑。     } }

2、保障消息不丢失:RabbitMQ端

对于RabbitMQ端的消息保障,我们人为可以处理的是,设置创建的队列或消息是否持久化。

通过创建持久化的队列或消息,可以保障消息写入硬盘,重启时仍能还原信息。

//第二个参数:是否持久化 channel.QueueDeclare("FirstQueue", true, false, false);

3、保障消息不丢失:接收方

接收方主要是通过消息确认,来指示是否收到信息。

var channel = Rabbit.Instance.DefaultConnection.CreateModel();  var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {     var message = Encoding.UTF8.GetString(ea.Body.ToArray());     Console.WriteLine("收到默认消息 {0}", message); }; channel.BasicConsume(queue: "FirstQueue",                       autoAck: true,                       consumer: consumer);

在以上代码中,通过指定authAck可以自动回应收到信息。

当然,有需要也可以手动回应:

var channel = Rabbit.Instance.DefaultConnection.CreateModel();  var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {     var message = Encoding.UTF8.GetString(ea.Body.ToArray());     Console.WriteLine("收到默认消息 {0}", message);   try     {         channel.BasicAck(ea.DeliveryTag, false);     }     catch (Exception err)     {         //处理确认失败的情况。     } }; channel.BasicConsume(queue: "FirstQueue",                       autoAck: false,                       consumer: consumer);

说明:

为了避免消息丢失问题,消息的确认,最好在是业务处理完再进行确认。  否则会出现第三方中介出问题时,或业务处理出问题时,或刚确认好消息,业务还没处理就系统异常,导致消息未消费就丢失的问题。

 

4、发送方:还有一种情况:通过交换机发送过去,但交换机没送到指定的队列时

这时候应答也是正常的,但数据丢失,这种情况,是这样处理的:

RabbitMQ 入门系列:6、保障消息:不丢失:发送方、Rabbit存储端、接收方。

 

就两点:

1、发送信息BasicPublish方法的第三个参数:mandatory设置为true。  2、定义接收的回调:BasicReturn事件。

总结:

本篇简单介绍如何使用RabbitMQ消息时,做到消息的可靠性,不丢失。

RabbitMQ 入门系列:1、MQ的应用场景的选择与RabbitMQ安装。

RabbitMQ 入门系列:2、基础含义:链接、通道、队列、交换机。

RabbitMQ 入门系列:3、基础含义:持久化、排它性、自动删除、强制性、路由键。

RabbitMQ 入门系列:4、基础编码:官方SDK使用:链接创建、单例改造、发送消息、接收消息。

RabbitMQ 入门系列:5、基础编码:交换机的进阶介绍及编码方式。

RabbitMQ 入门系列:6、保障消息:不丢失:发送方、Rabbit存储端、接收方。

RabbitMQ 入门系列:7、保障消息:不重复消费:产生消息的唯一ID。

RabbitMQ 入门系列:8、扩展内容:接收信息时:可否根据RoutingKey过滤监听信息,答案是不能。

RabbitMQ 入门系列:9、扩展内容:死信队列:真不适合当延时队列。

RabbitMQ 入门系列:10、扩展内容:延时队列:延时队列插件及其有限的适用场景。

RabbitMQ 入门系列:1、MQ的应用场景的选择与RabbitMQ安装。

RabbitMQ 入门系列:2、基础含义:链接、通道、队列、交换机。

RabbitMQ 入门系列:3、基础含义:持久化、排它性、自动删除、强制性、路由键。

RabbitMQ 入门系列:4、基础编码:官方SDK使用:链接创建、单例改造、发送消息、接收消息。

RabbitMQ 入门系列:5、基础编码:交换机的进阶介绍及编码方式。

RabbitMQ 入门系列:6、保障消息:不丢失:发送方、Rabbit存储端、接收方。

RabbitMQ 入门系列:7、保障消息:不重复消费:产生消息的唯一ID。

RabbitMQ 入门系列:8、扩展内容:接收信息时:可否根据RoutingKey过滤监听信息,答案是不能。

RabbitMQ 入门系列:9、扩展内容:死信队列:真不适合当延时队列。

RabbitMQ 入门系列:10、扩展内容:延时队列:延时队列插件及其有限的适用场景。

发表评论

评论已关闭。

相关文章