写于:2019-09-20 22:52:37

参考资料:《RabbitMQ实战指南》 RabbitMQ 官网

在前面的 RabbitMQ【理论篇】消息的可靠投递-投递消息RabbitMQ【理论篇】消息的可靠投递-消息路由 中描述了如何保证 msg(消息) 投递到 RabbitMQ Borker(RabbitMQ 服务节点) 的可靠性。 当 msg(消息) 到达 RabbitMQ Borker(RabbitMQ 服务节点) 之后 producer(生产者) 的使命就完成了,接下来就是 consumer(消费者) 消费 msg(消费者)

# msg(消息)两种消费模式

msg(消息)的消费有两种模式

  • 推(由 RabbitMQ Borker(RabbitMQ 服务节点) 主动推送过来,consumer(消费者)被动接收
  • 拉(由 consumer(消费者)主动拉取 msg(消息),RabbitMQ Borker(RabbitMQ 服务节点) 被动推送

# 推模式

由 RabbitMQ Borker(RabbitMQ 服务节点) 主动将 msg(消息) 推送给 consumer(消费者)

# 演示代码

模拟数据:使用 **producer(生产者)**生成 100 条 msg(消息)

// 声明队列 String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		String msg = new String(body, "UTF-8");
		System.err.println("Current Consumer:" + consumerTag);
		System.out.println("Received message : '" + msg + "'"); 
	}
};
// 开始获取消息
channel.basicConsume(QUEUE_NAME, true, consumer);

代码逻辑

autoAck=true (自动确认)

创建一个 consuemr(消费者) 进行**msg(消息)**的消费,并打印相关内容。

# 演示结果

推模式_演示结果

通过控制台打印,以及查看 RabbitMQ Web 管理界面中的 **Queue(队列)**信息,此时 **Queue(队列)**中的 **msg(消息)**被全部消费。

推模式 下,只要存在 consumer(消费者) ,RabbitMQ Borker(RabbitMQ 服务节点) 就会不断给 consumer(消费者) 推送 msg(消息)

# 抓包图

推模式_抓包图

通过抓包图,能够直观的看到 RabbitMQ Borker(RabbitMQ 服务节点) 一批一批的将 msg(消息) 推送给了 consumer(消费者)

# 拉模式

模拟数据:使用 **producer(生产者)**生成 100 条 msg(消息)

# 演示代码

关键代码

// 主动拉取消息(每次只能获取一条)
// 不能使用 while 循环,模拟的推的效果,会有性能问题
GetResponse getResponse = channel.basicGet(QUEUE_NAME, true);
System.out.println("Received message : '" + new String(getResponse.getBody()) + "'");

代码逻辑

autoAck = true (自动确认)

主动从指定的 Queue(队列) 中拉取一条 msg(消息)

# 演示结果

拉模式_演示结果

从控制台和 RabbitMQ web 管理界面中的 **Queue(队列)**显示,能够看到,拉模式 只消费一条 msg(消息)

# 抓包图

拉模式_抓包图

对比 推模式 ,拉模式consumer(消费者) 主动从 RabbitMQ Borker(RabbitMQ 服务节点) 中拉取 msg(消息),并且只拉取一条 msg(消息)

# 如何保证 msg(消息)与 consumer(消费者)间的正常流转

为了确保 **msg(消息)**在与 consuemr(消费者)之间的正常流转,RabbitMQ 提供了消息确认机制

# 消息确认机制(message acknowledement)

消息确认机制 的实现通过配置项 “autoAck” 实现。该配置项为 boolean 类型,可以配置:

  • autoAck = true
  • autoAck = false

# 当 autoAck = true 时

autoAck = true时,consumer(消费者) 在进行 msg(消息) 消费处理过程中,无论成功与否,都不需要给 RabbitMQ Borker(RabbitMQ 服务节点) 发送处理结果。

RabbitMQ Borker(RabbitMQ 服务节点)msg(消息) 推送给 consumer(消费者) 之后,会直接把 msg(消息)Queue(队列) 和磁盘中移除。

如果此时 consumer(消费者)msg(消息) 处理异常,由于 msg(消息) 已经从 RabbitMQ Borker(RabbitMQ 服务节点) 中移除,此时 msg(消息) 消费失败,且无法再次进行处理。(针对业务需要进行相关的补偿处理

# 当 autoAck = false 时

autoAck = false 时,consumer(消费者) 在进行 msg(消息) 消费处理之后,需要将处理结果通知 RabbitMQ Borker(RabbitMQ 服务节点)

RabbitMQ Borker(RabbitMQ 服务节点)msg(消息) 推送给 consumer(消费者) 后,会把Queue(队列) 中对应的 **msg(消息)**进行逻辑删除。

  • 处理方式一RabbitMQ Borker(RabbitMQ 服务节点) 收到 consumer(消费者) 的确认信息,对 msg(消息) 进行物理删除
  • 处理方式二RabbitMQ Borker(RabbitMQ 服务节点) 长时间未收到 consumer(消费者) 的 Basic.Ack,同时 consumer(消费者) 断开了 connection(连接),RabbitMQ Borker(RabbitMQ 服务节点) 判定此次msg(消息) 消费失败,将 msg(消息) 重新放入队列中。

# 对比 autoAck 为 true or false

# 抓包对比 autoAck 为 true or false 时 AMQP 的消息流转

RabbitMQ_autoack_false和autoack_true对比抓包图

  • 当 autoAck = false 时,consumer(消费者) 需要对处理结果进行手动确认,此时需要向 RabbitMQ Borker(RabbitMQ 服务节点) 发送 Basic.ACK 确认指令。RabbitMQ Borker(RabbitMQ 服务节点) 收到 Basic.ACK 之后,会对相应的 msg(消息) 进行物理删除。

  • 当 autoAck = true 时,当 RabbitMQ Borker(RabbitMQ 服务节点) 将消息推送给 **consumer(消费者)**后,两者间的交互就完了,RabbitMQ Borker(RabbitMQ 服务节点) 不管 msg(消息) 是否被正常消费,直接从 Queue(队列) 中移除 msg(消息) 并从磁盘中移除。(通过代码的 debug,同样能够得到,所谓的 自动确认,就是在 RabbitMQ Borker(RabbitMQ 服务节点)msg(消息) 成功推送给 consumer(消费者)后,就认定msg(消息) 发送消费成功,直接进行 msg(消息) 的物理删除。

# autoAck 为 true or false 时,可能存在的问题

采用自动确认机制(autoAck = true),可能出现 msg(消息)未正常消费,但是 RabbitMQ 已经将该 msg(消息)物理删除的情况。

采用手动确认(autoAck = false),可能存在 msg(消息)已经正常消费,但是由于网络抖动或者分区等情况 RabbitMQ 没有收到确认消息(Basic.Ack),且此时 RabbitMQ Borker(RabbitMQ 服务节点)consumer(消费者)间的 连接(connection) 被判定为断开,这时 RabbitMQ Borker(RabbitMQ 服务节点) 判定该 msg(消息) 没有被正确消费,重新入 Queue(队列),等待投递给下一个consumer(消费者),这时候可能导致 msg(消息) 的重复消费。

# autoAck 为 true or false 时的性能对比

通过上面的抓包图,能够知道 手动确认(autoAck = false) 比 自动确认(autoAck = true)多了 Basic.Ack 这一步骤,所以在性能上 自动确认(autoAck = true) 在性能上会比 手动确认(autoAck =false) 高一点。

# autoAck 为 true or false 取决于业务的应用场景

如:在分布式链路追踪中,在使用 RabbitMQ 作为中间流转节点进行信息上报时,可以使用 自动确认机制(autoAck = true)。 链路信息的上报本身就是有一定的采样率,本身就是一种概率性的统计,在消费信息时即使出现消费失败的情况,消息丢失就丢失了,并不会对整体的评估有什么影响。

如:使用 RabbitMQ 作为柔性分布式事务的组成部件,分布式事务需要依赖于消息的可靠投递的前提下,还需要保证 msg(消息)一定被消费。这时候就需要 手动确认机制(autoAck = false) (不过在使用 手动确认机制时,会存在 consumer(消费者)发送了 Basic.Ack ,但是网络抖动等导致 RabbitMQ Borker(RabbitMQ 服务节点)将 msg(消息)重新入 Queue(队列)中,此时就需要对 msg(消息)实现幂等操作)

# 通过代码验证 消息确认机制

# autoAck = false

producer(生产者):发送一条 msg(消息)到 RabbitMQ Borker(RabbitMQ 服务节点)。

此时 RabbitMQ web 界面 Queue(队列)信息显示如下

RabbitMQ_生产者_RabbitMQ_Web界面图

消费者:关键代码

RabbitMQ_生产者_utoack_false_消费者_主要代码图

设定:autoAck = false,同时进行手动确认操作 Basic.Ack

运行结果

RabbitMQ web 界面显示如下

RabbitMQ_生产者_utoack_false_消费者_未发送确认前的RabbitMQ状态变更图

1、在没有任何 consumer(消费者) 绑定 Queue(队列) 时,**msg(消息)**在 Queue(队列) 中的状态为 Ready 。 2、在 autoAck = false 情况下,当 msg(消息) 推送给 consumer(消费者) 之后,在未收到 **consumer(消费者)**的 Basic.Ack 之前,RabbitMQ Queue(队列)msg(消息) 状态为 Unacked 。 3、Queue(队列)msg(消息) 状态为 "Unacked" 时,如果 RabbitMQ Borker(RabbitMQ 服务节点) 收到 **consumer(消费者)**的 Basic.Ack,msg(消息) 从队列中删除,如果未收到 Basic.Ack 并且 consumer(消费者)RabbitMQ Borker(RabbitMQ 服务节点) 间的 connection(连接) 断开,则 msg(消息) 重新入 Queue(队列) ,同时状态变更为 Ready

# autoAck = true

producer(生产者):发送一条 msg(消息)到 RabbitMQ Borker(RabbitMQ 服务节点)。

此时 RabbitMQ web 界面 Queue(队列)信息显示如下

RabbitMQ_生产者_RabbitMQ_Web界面图

消费者:关键代码

消息确认机制_ack_true_演示代码

运行结果

RabbitMQ web 界面显示如下

消息确认机制_ack_true_运行结果_rabbitMq_web_图

当 autoAck = true 时,msg(消息) 发送给 consumer(消费者) 之后,msg(消息) 直接从 Queue(队列) 中移除。

# 消息确认机制 - 消息拒绝

在开启手动确认(autoAck = false)时,在代码操作中,我们需要手动指定两个操作:

  • 指定:autoAck = false
  • 发送 Basic.Ack 指令给 RabbitMQ Borker(RabbitMQ 服务节点)

consumer(消费者) 在进行 msg(消息) 的时候除了向 RabbitMQ Borker(RabbitMQ 服务节点) 发送 Basic.Ack 指令,还可以发送 Basic.Reject 指令。

Basic.RejectBasic.Ack 区别:RabbitMQ Borker(RabbitMQ 服务节点) 收到 Basic.Ack 指令后,直接从 Queue(队列) 中移除 msg(消息),而 Basic.Reject 可以指定参数 requeue ,该参数可以用来指定被拒绝消费的 msg(消息) 是否需要重新入 Queue(队列) 中。

  • requeue = true ,表示:需要将 msg(消息) 重新放入 Queue(队列) 中,等待重新被消费
  • requeue = false ,表示:直接将 msg(消息) 删除。

# 代码演示

# 消费者 关键代码

消息确认机制_ack_true_Reject_演示代码

代码逻辑

  • 指定 autoAck = true
  • 测试一:指定 Basic.Reject 参数 requeue = false
  • 测试二:指定 Basic.Reject 参数 requeue = true

# 运行结果

Reject 中 requeue = false 时

rabbitmq web 显示

消息确认机制_ack_true_Reject_requeue_false_rabbitMQ_web

当 requeue = false 时,被拒绝的 msg(消息) 直接从 Queue(队列) 中移除。

Reject 中 requeue = true 时

rabbitmq web 显示

消息确认机制_ack_true_Reject_requeue_true_rabbitMQ_web

当 requeue = true 时,被拒绝的 msg(消息) 被重新放入 Queue(队列) 中,等待投递给下一个 consumer(消费者)

抓包图

消息确认机制_ack_true_Reject_抓包

# 总结

# consumer(消费者)对于 msg(消息)的消费有两种方式

# 推模式

RabbitMQ Borker(RabbitMQ 服务节点)msg(消息) 主动推送给 consumer(消费者),只要一直有 consumer(消费者) 订阅,只要 **Queue(队列)**中有 msg(消息)RabbitMQ Borker(RabbitMQ 服务节点) 就会不断给 consumer(消费者) 进行 msg(消息) 推送。

# 拉模式

consumer(消费者) 主动从 RabbitMQ Borker(RabbitMQ 服务节点) 拉取 msg(消息),一次只能拉取一条 msg(消息)

# RabbitMQ 提供 消息确认机制 保证 msg(消息)一定会被消费

可以通过设置参数 autoAck 来指定**msg(消息)**被消费之后是否需要通知 RabbitMQ Borker(RabbitMQ 服务节点)

# autoAck = true

自动确认机制,consumer(消费者) 无需对消费完成的 msg(消息) 进行手动确认。

一旦 msg(消息) 被推送给了 consumer(消息者) ,该 msg(消息) 直接从 Queue(队列) 中移除。

存在的问题msg(消息) 消费异常,但是已经从 Queue(队列) 中移除,无法重新进行处理。

# autoAck = false

手动确认机制,consumer(消费者) 需要手动给 RabbitMQ Borker(RabbitMQ 服务节点) 发送 Basic.Ack 或者 Basic.Reject 的命令。

Basic.AckRabbitMQ Borker(RabbitMQ 服务节点) 收到 Basic.Ackmsg(消息)Queue(队列) 中移除。

Basic.Reject 根据 requeue = true or false 对,msg(消息) 进行移除或者重新入 Queue(队列)

存在的问题:存在 msg(消息) 重复消费的问题。

精彩内容推送,请关注公众号!
最近更新时间: 3/26/2020, 12:09:24 PM