写于:2019-09-12 22:52:37

参考资料:《RabbitMQ实战指南》 RabbitMQ 官网

# 回顾

RabbitMQ【理论篇】消息的可靠投递-消息路由 一文中,抛出了一个常见面试问题:

# 如何确保 msg(消息)投递的可靠性?

在文末总结 RabbitMQ 提供的2大解决方案:事务机制发送确认机制

不过单纯通过 “事务机制” 和 “发送确认机制” 还不能保证消息投递的万无一失事务机制发送确认机制 只保证 **msg(消息)**到 **RabbitMQ Borker(RabbitMQ 服务节点)Exchange(交换器)**的可靠性。如果 **msg(消息)**正确送达 Exchange(交换器),但是路由到 **Queue(队列)**过程中出现问题,发送端得到了确认的消息,但是 **msg(消息)**却丢失了,这会导致业务系统相关数据的不一致,使得系统出现业务问题。

在回答如何解决 msg(消息)Exchange(交换器) 之后的路由丢失问题前,先来了解一下 RabbitMQ 中 Exchange(交换器) 相关概念。

# 开发中常见的 Exchange(交换器)类型

# fanout Exchange(广播类型 Exchange)

fanout 是一个广播类型的交换器。他会将msg(消息)路由到所有与他绑定队列中。

RabbitMQ_交换器类型_fanout图

图中:到达 fanout Exchange 的消息,所有与 fanout Exchange 有 binding 关系的 **Queue(队列)**都能够收到该 msg(消息)

# direct Exchange(完全匹配 Exchange)

英文中 “direct” 是直连的意思,在 RabbitMQ 中 “direct Exchange” 可以理解为 “完全匹配交换器”。只有当 routing keybinding key 完全匹配时,才会将**msg(消息)**路由到 **Queue(队列)**中。

RabbitMQ_交换器类型_direct图

图中:msg(消息)的 routing key(路由键)为 “wtf”,由于是完全匹配规则,只有 binding key(绑定键)为 “wtf” 的 Queue(队列)才能收到该msg(消息)。

# topic Exchange (模糊匹配 Exchange)

英文中 "topic" 是主题的意思,在 RabbitMQ 中 “topic Exchange” 可以接理解为 “模糊匹配交换器”, topic Exchange 也是通过 routing key 匹配 binding key 实现msg(消息)路由。与 direct Exchange 不同,direct Exchange 是完全匹配,而 topic Exchange 是模糊匹配。 针对 topic Exchange 的模糊匹配功能,RabbitMQ 提供了相关的匹配规则: 1、以单词作为匹配区分,单词个数的判定以 英文 句号"."作为区分。例如:top.qguofeng.www 表示 3 个单词,这三个单词为:“top”,“qguofeng”,“www” 2、模糊字符:* 号。表示一个任意单词的占位。 3、模糊字符:# 号。表示0个或者多个任意单词的占位。

RabbitMQ_交换器类型_topic图

图中:msg(消息)的 routing key(路由键)为 “wtf”,匹配方式是模糊匹配,只有 binding key(绑定键)为 “wtf” 或者 “wtf.#” 或者 “#.wtf” 的 Queue(队列)能够收到该 msg(消息)

# msg(消息) 被 Exchange(交换器)路由到 Queue(队列)过程中丢失了,该如何处理?

# 方案一:当 msg(消息)没有办法被正确路由,通知 producer(生产者) 进行处理

# 方案

# 操作

producer(生产者)在进行 msg(消息) 发送前

  • 指定参数 mandatory
  • 添加 ReturnListener 监听器

# 效果

当 Exchange(交换器)没办法把 msg(消息)正确路由到 Queue(队列)时,RabbitMQ 将消息返回给 producer(生产者),produer(生产者)通过发送前自定的 ReturnListener 进行处理。

Exchange(交换器)无法正确路由:当前 msg(消息)的 routing key(路由键)没有与之对应的 binding key(绑定键)时,Exchange(交换器)无法正确路由 msg(消息)。

# 实验验证

# producer(生产者)关键代码

RabbitMQ_生产者_消息路由何去何从_mandatory参数设定_关键生产者代码图

代码操作:模拟不存在的 binding key(绑定键),触发 RabbitMQ 回调操作。

将 msg(消息)路由到 默认的 Exchange(交换器),对应 binding key(绑定键)为空串的队列中。

同时按照方案的步骤

  • 设置 参数 mandatory = true
  • 定义 ReturnListener,当 msg(消息)无法正确路由时的处理逻辑。

# 运行结果

RabbitMQ_生产者_mandatory参数设定_控制台打印信息图

由于:

  • 1、没有与 routing key(路由键) 匹配的 binding key(绑定键)。
  • 2、msg(消息)发送设定了 mandatory = true
  • 3、针对 RabbitMQ 的回调,定义了 ReturnListener 进行处理

所以: 控制台中打印了 ReturnListener 针对无法正确路由的 msg(消息)的处理结果。

# 抓包来看看相关消息流转

RabbitMQ_生产者_mandatory参数设定_消息流转_抓包图

在图中能够看到,当 msg(消息)无法被正确路由时,RabbitMQ 会发送 Basic.Return 指令,携带相关 msg(消息)信息通知到 producer(生产者)进行处理。

# 该方案存在的问题

该方案能够进一步提高 msg(消息)投递的可靠性,但是需要在处理 msg(消息)发送逻辑的同时,针对发送失败的 msg(消息)进行补偿等操作,增大了代码的复杂性。

# 方案二:当 msg(消息)没有办法被正确路由,把 msg(消息)交给另一个 Exchange(交换器)( AE(备胎交换器) )进行处理

# 方案

# 操作

producer(生产者)在发送消息前,除了指定消息路由的 Exchange(交换器)

  • 还需指定 AE(备胎交换器)

# 效果

当 Exchange(交换器)无法正确路由 msg(消息),将 msg(消息)转给 AE(备胎交换器)去处理。

# 实验验证

# producer(生产者)关键代码

RabbitMQ_生产者_exchage备胎_声明exchange同时指定_生产者关键代码图

在声明 Exchange(交换器)的同时指定 AE(备胎交换器)。

在指定 AE(备胎交换器),当 msg(消息)无法被 Exchange(交换器)正确路由会将 msg(消息)交给 AE(备胎交换器)进行处理。而此时 AE(备胎交换器)路由的 msg(消息)对应的 routing key(路由键)不变。 最好将 AE(备胎交换器)类型设定为 fanout Exchange ,这样就不用关心 msg(消息)的 routing key(路由键)问题。

# 运行结果

RabbitMQ_生产者_exchage备胎_声明exchange同时指定_RabbitMQ_web界面图

模拟了无法正确路由的 msg(消息),所以 msg(消息)被交给 AE(备胎交换器)进行处理,AE(备胎交换器)将消息路由到了 UNROUTE_BINDING_KEY Queue(队列)中。

# 该方案存在问题

  • 1、如果 AE(备胎交换器)不存在,不会任何报错,无法路由的消息直接丢失。
  • 2、如果 AE(备胎交换器)存在,但是没有绑定任何队列,不会有任何报错,无法路由的消息直接丢失
  • 3、如果 AE(备胎交换器)存在,同时存在绑定的队列,但是没有对应的队列匹配,不会有任何报错,无法路由的消息直接丢失。(如果使用 fanout Exchange 就不会有这个问题)
  • 4、当 AE(备胎交换器) 和 mandatory=true(以及 ReturnListener) 同时存在时,无法路由的消息不会返回给生产者,直接发给 AE(备胎交换器)。

# 总结

# 围绕 如何确保 msg(消息)投递的可靠性? 进行总结

# 1、使用 发布确认机制 或者 事务机制 保证 msg(消息)到 Exchange(交换器)投递的可靠性。

友链:RabbitMQ【理论篇】消息的可靠投递-消息路由

# 2、通过 AE(备胎交换器) 或者 设定 mandatory = true ,对 Exchange(交换器)无法正确路由的 msg(消息)进行处理。

# 3、通过持久化 Exchange(交换器)和 Queue(队列)保证在 RabbitMQ 出现故障重新启动的时候,msg(消息)仍然存在。

除了以上的方案,还可以在运维层面,增加 镜像队列 进一步提升 msg(消息)投递的可靠性。(Queue(队列)启用了持久化,但是在操作系统层面,msg(消息)会先存入操作系统缓冲区中,然后再刷入磁盘中,msg(消息)真正的持久化存在时间延迟)

精彩内容推送,请关注公众号!
最近更新时间: 3/26/2020, 12:09:24 PM