写于:2019-09-07 22:52:37

参考资料:《RabbitMQ实战指南》 RabbitMQ 官网

RabbitMQ【理论篇】RabbitMQ相关概念及工作模型 中介绍了在开发中使用 RabbitMQ 作为中间件的 三个主体步骤生产消息并发送RabbitMQ 路由消息并保存消费者消费消息

在于 RabbitMQ 交互中为了确保 msg(消息) 携带的业务信息被正常处理 三个主体步骤就需要有如下保障:(确保通过 RabbitMQ 进行业务交互的逻辑被正确处理,或者回滚):

  • 1、**msg(消息)**送达 RabbitMQ Borker(RabbitMQ 服务节点) 需要有保障,换句话说,**msg(消息)**要保证至少有一次成功投递到了 **RabbitMQ Borker(RabbitMQ 服务节点)中,虽然msg(消息)可能重复发送,但是msg(消息)**一定不会丢失。
  • 2、msg(消息) 到达 RabbitMQ Borker(RabbitMQ 服务节点) 中的 Exchange(交换器) 后需要正确路由到指定的位置(队列中)。同时为了保证 RabbitMQ 出现异常,**msg(消息)**不会丢失,需要对 Queue(队列)msg(消息) 进行持久化保存。
  • 3、确保 **msg(消息)被正常消费,同时对于重复的msg(消息)**进行幂等操作。

本文来讨论第一点:保障 msg(消息)送达 RabbitMQ Borker(RabbitMQ 服务节点)的可靠性

# 这也是面试经常问到的:如何确保 msg(消息)投递的可靠性?其中一个答案

# RabbitMQ 中对于 msg(消息)的可靠投递提供了两种解决方案

  • 1、通过 事务机制 解决 **msg(消息)**的可靠投递问题
  • 2、通过 发送/确认 机制 解决 **msg(消息)**的可靠投递问题

# 事务机制 确保 msg(消息)的可靠投递

RabbitMQ 中关于事务控制三个关键AMQP操作:

  • channel.txSelect:指定当前 channel(信道)为事务模式
  • chanel.txCommit:提交事务
  • channel.txRollback :事务回滚

# 演示单个消息事务提交

# 代码

// 开启事务
channel.txSelect();
try{
	channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
		 msg.getBytes());
	// 模拟异常
	//int a = 1/0;
	// 提交事务
	channel.txCommit();
}catch (Exception e){
	e.printStackTrace();
	// 事务回滚
	channel.txRollback();
}

# 模拟事务正常提交和事务回滚并进行分析

通过抓包对比事务提交和事务回滚时的 AMQP协议流转以及 消息是否存入 RabbitMQ中

RabbitMQ_生产者_消息可靠投递_事务机制_抓包分析图

再来看看,对应的AMQP 协议流转图

RabbitMQ_生产者_消息可靠投递_事务机制_AMQP协议流转图

通过抓包图 和 AMQP 协议流转图,能够得到

  • 1、事务正常提交与异常回滚,在 AMQP 协议中的区别在于:**Tx.Commit(提交)**和 **Tx.Rollback(回滚)**的执行。
  • 2、在事务正常提交之后, **msg(消息)**才会进入到 RabbitMQ 队列中。

# 尝试通过事务机制,发送多条 msg(消息)

关键代码:

// 开启事务
channel.txSelect();
int LOOP_TIMES = 5;
for(int i = 0;i < LOOP_TIMES;i++){
    try{
        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
        // 提交事务
        channel.txCommit();
    }catch (Exception e){
        e.printStackTrace();
        // 事务回滚
        channel.txRollback();
    }
}

通过抓包进行分析

RabbitMQ_生产者_事务机制_批量发送_抓包分析图

通过抓包分析,能够知道,**channel(信道)**开启事务进行 **msg(消息)**发送, **msg(消息)**的发送是串行的。事务机制在发送一条 **msg(消息)**之后,发送端会进行阻塞,等待 RabbitMQ 的回应,才能够继续发送下一条 msg(消息)

事务机制的方式由于采用的阻塞的方式进行 **msg(消息)**发送, **msg(消息)**的发送是串行的,这会使 RabbitMQ 的性能严重下降。

# 事务机制能够解决 msg(消息)可靠投递的问题,但同时带来了性能的损耗,性价比不高,鉴于此,RabbitMQ 提供了另外一种较事务机制来说更轻量级的实现机制:发送确认机制

# 发送确认机制 确保 msg(消息)的可靠投递

相较于 事务机制 重量级的消息可靠投递,RabbitMQ 提供了发送确认机制 实现了轻量级的 **msg(消息)**的可靠投递。

针对 发送确认机制 在使用中有三种实现方式:

  • 普通 发送确认机制
  • 批量 发送确认机制
  • 异步 发送确认机制

# 普通 发送确认机制

# 使用 普通 发送确认机制 进行 msg(消息)的批量发送,关键代码

// 开启 confirm
channel.confirmSelect();
int preMsgListCount = 20;  // 准备批量发送消息数量
int pushedMsgCount = 0;     // 已经发送的消息数量
long startTime = System.currentTimeMillis();
        try{
            do{
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                // channel.waitForConfirms() 阻塞等待确认信息
                boolean confirmFlag = channel.waitForConfirms();
                System.err.println("RabbitMQ 消息是否已经收到 " + confirmFlag);
                ++pushedMsgCount;
            }while (pushedMsgCount < preMsgListCount);
            // TODO 根据 confirmFlag 进行相关业务处理
        }catch (Exception e){
            e.printStackTrace();
        }

# 通过抓包分析

普通 发送确认机制 抓包图

RabbitMQ_生产者_消息可靠投递_发送确认机制_普通发送图

对比前面事务机制 进行批量消息发送的抓包图:

RabbitMQ_生产者_消息可靠投递_发送确认机制_普通发送图

首先,普通 发送确认机制 和 事务机制 最大区别在于,事务机制 对 **msg(消息)**的发送是并行的,而 发送确认机制 是异步的,在发送一条 **msg(消息)**之后,能够在等待 msg(消息)确认的同时发送另外一条 msg(消息)。但是 事务机制普通 发送确认机制 在进行 msg(消息)确认时需要对每一条发送的消息进行确认,所以在性能上 普通 发送确认机制 对比 事务机制 实际上时差不多的, 而普通 发送确认机制 性能上会比 事务机制 好一丢丢的原因如下(通过查看抓包图能够得知):

  • 事务机制 完整发送一条 **msg(消息)**的流程,Basic.Publish -> Tx.Commit / Tx.Commit-Ok (或 Tx.Rollback / Tx.Rollbock-Ok)
  • 普通的发送确认机制 完整的发送一条 **msg(消息)**的流程:Basic.Publish ,Basic.Ack

两者对比,普通的发送确认机制事务机制 在每一条 msg(消息)的确认交互上减少了一次交互的性能损耗,所以性能上普通的发送确认机制事务机制 好一点。

下面是一张 普通的发送确认机制事务机制 在进行批量发送时的 QPS 对比图:

RabbitMQ_生产者_消息可靠投递_事务机制_普通发送确认机制_QPS对比图

虽然 普通的发送确认机制事务机制 在性能上有一定的提升,但是提升并不明显。

# 批量 发送确认机制

# 批量 发送确认机制 进行 msg(消息)发送 关键代码

int preMsgListCount = 20;  // 准备批量发送消息数量
int pushedMsgCount = 0;     // 已经发送的消息数量
long startTime = System.currentTimeMillis();
try{
    // 开启 发送确认机制
	channel.confirmSelect();
	do{
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		// TODO 将发送出去的消息存放到,指定容器中
        // 当消息全部发送完毕进行统一的批量确认
		if(++pushedMsgCount >= preMsgListCount){
			try{
				boolean confirmFlag = channel.waitForConfirms();
				System.err.println("RabbitMQ 消息是否已经收到 " + confirmFlag);
				if(confirmFlag){
					// TODO RabbitMQ 返回消息已经确认收到,清空容器中的缓存数据
				}
			}catch (Exception e){
				e.printStackTrace();
					// TODO 重新发送缓存中的数据
			}
		}
	}while (pushedMsgCount < preMsgListCount);
		// TODO 根据 confirmFlag 进行相关业务处理
}catch (Exception e){
	e.printStackTrace();
}

# 通过抓包进行分析

批量 发送确认机制抓包图:

RabbitMQ_生产者_消息可靠投递_发送确认机制_批量确认

对比 普通 发送确认机制 抓包图

RabbitMQ_生产者_消息可靠投递_普通发送_对比批量发送确认

普通 发送确认机制 进行 **msg(消息)**的批量发送 对比 批量 发送确认机制 进行 **msg(消息)**的批量发送。普通 发送确认机制 每发送一条 msg(消息),都需要进行一次确认,而 批量 发送确认机制 发送一批 msg(消息),确认一批。

从性能上来说,批量 发送确认机制 优于 普通 发送确认机制。但是 批量 发送确认机制 同样会存在问题,由于是一批一批 **msg(消息)**进行发送,然后一批一批进行确认,所以当 **msg(消息)**确认结果为失败时,需要对失败的这批数据进行重发,或者业务冲正等处理。如果出现频繁的失败,导致频繁的重发 msg(消息),反而会导致性能变得更低。

# 异步 发送确认机制

# 异步发送确认机制 进行 msg(消息)发送 关键代码

 // 开启 confirm
int preMsgListCount = 10;  // 准备批量发送消息数量
int pushedMsgCount = 0;     // 已经发送的消息数量
SortedSet<Long> sortedSet = new TreeSet<>();
long startTime = System.currentTimeMillis();
try{
	channel.confirmSelect();
    // 异步 Confirm
    channel.addConfirmListener(new ConfirmListener() {
    	@Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        	System.err.println("Nack,SeqNo:" + deliveryTag + ",multiple:" + multiple);
            System.err.println(sortedSet.toString());
            if(multiple){
            	sortedSet.headSet(deliveryTag - 1).clear();
            }else{
            	sortedSet.remove(deliveryTag);
            }
		}
		@Override
		public void handleNack(long deliveryTag, boolean multiple) throws IOException {
			if(multiple){
				sortedSet.headSet(deliveryTag - 1).clear();
			}else{
				sortedSet.remove(deliveryTag);
			}
		}
	});
	do{
		long nextSeqNo = channel.getNextPublishSeqNo();
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		// TODO 将发送出去的消息存放到,指定容器中
		++pushedMsgCount;
		sortedSet.add(nextSeqNo);
	}while (pushedMsgCount < preMsgListCount);
}catch (Exception e){ e.printStackTrace(); }

# 通过抓包进行分析

异步 发送确认机制 抓包图

RabbitMQ_生产者_消息可靠投递_发送确认机制_异步确认抓包图

对比 批量 发送确认机制 抓包图

RabbitMQ_生产者_消息可靠投递_批量确认对比异步确认抓包图

异步 发送确认机制批量 发送确认机制从 AMQP 协议流转来说是一致的,都是批量的发送 msg(消息),批量的确认 msg(消息)。但是他们之间仍存在差异,批量 发送确认机制 可以通过代码控制一批 msg(消息)的发送和确认,这批数据要嘛成功,要嘛失败。而 异步 发送确认机制 一直进行 msg(消息)发送,确认的处理,根据 RabbitMQ 的 异步回调进行处理,回调的 deliveryTag 消息批次和数量,时间无法确认,此时就需要维护未确认消息缓存数据,根据每一批异步回调的消息批次,将维护的未确认消息缓存 数据根据维护的 deliveryTag 对比异步回调的消息批次,进行 **msg(消息)**的确认,如果失败则重发,成功则从缓存中清除表示 **msg(消息)**已经成功发送。

# 对比 三种方式实现的 发送确认机制 和 事务机制

# 对比性能指标 QPS

直接来看一张 三种 发送确认机制事务机制 的QPS 图

RabbitMQ_生产者_消息可靠投递_批量确认对比异步确认抓包图

从图中能够直观的看到,批量 发送确认机制异步 发送确认机制 在 RabbitMQ 服务正常运行的情况下,QPS 明显比 普通 发送确认机制普通 发送确认机制 高很多。

在开发应用中,**msg(消息)**的可靠投递到 RabbitMQ Borker(RabbitMQ 服务节点) 中,优先选择,批量确认和异步确认。

# 总结

# 前面文章开头抛出了一个问题:如何确保消息投递的可靠性?

回答一: 所谓 msg(消息)的可靠投递,就是要能够确定发送到 RabbitMQ Borker(RabbitMQ 服务节点)的消息,是“发送成功”还是“发送失败”。并且只有这两种情况,以确保业务处理的准确性,保证数据的一致性。

针对该问题,RabbitMQ 给出两大解决方案,4种选择:

# 事务机制

**msg(消息)**一条一条发送,一条一条确认

# 发送确认机制

  • 普通 发送确认机制(发送一条确认一条)
  • 批量 发送确认机制(发送一批,确认一批)
  • 异步 发送确认机制 (一直发送msg(消息),根据 RabbitMQ 发送过来的确认信息,一批一批进行确认)

不过单纯通过 事务机制 和 发送确认机制 还不能保证消息投递的万无一失

通过 “事务机制” 或者 “发送确认机制” 只能保证 msg(消息) 到 **Exchange(交换器)**的可靠投递。换句话说, msg(消息) 到达 **Exchange(交换器)之后 事务机制 或者 发送确认机制 就认定该 msg(消息) 已经有效投递了,如果 msg(消息)Exchange(交换器)**无法正确进行路由并落入队列中,消息仍然会丢失且客户端收到的是确认的信息。这时候就需要通过 AE(备胎交换器) 或者 通过 mandatory 参数 来进一步提高消息投递的可靠性,也就是文章开头提到的三点保障的第二点:msg(消息)的正确路由

精彩内容推送,请关注公众号!
最近更新时间: 3/26/2020, 12:09:24 PM