写于:2019-08-29 22:52:37

参考资料:《RabbitMQ实战指南》

# RabbitMQ 工作模型

RabbitMQ工作模型图

# 通过上图熟悉 RabbitMQ 相关概念。

在开发中,当我们将 RabbitMQ 作为系统的组成部分时,程序与 RabbitMQ 之间的交互,包括 RabbitMQ 自身的处理,可以分为三个主体:生产者,RabbitMQ 自身,消费者。

下面以这三个主题进行展开,介绍 RabbitMQ 涉及到的相关概念。

# 首先先来看看: RabbitMQ 本身。

第一个概念:“RabbitMQ Borker”。可以将其理解为,就是一个运行的 RabbitMQ 服务节点。该服务节点(RabbitMQ Borker)主要提供三个功能:接收消息,存储消息,推送消息。

消息进到 "RabbitMQ Borker" 之后,第一个到达的节点叫做 Exchange(交换机)。之后 Exchange(交换机)再把消息路由到 Queue(队列)中对数据进行策略保存。

那么 Exchange (交换器) 和 Queue(队列)之间是怎么关联的?

这就扯出了另一个概念: binding key (绑定键)。Exchange(交换器)需要和 Queue(队列)建立绑定关系,Exchange(交换器)才能根据需求将消息路由到指定 Queue(队列)中进行保存操作。而他们之间关系的维系标识就是 binding key (绑定键)。

在 RabbitMQ Borker (RabbitMQ 服务节点)中还有一个重要概念: virtual host(虚拟机)。可以将之理解为名称空间,其存在的意义就是共享同一个 RabbitMQ Borker(RabbitMQ 服务节点)中的资源,且不同 virtual host(虚拟机)中的资源又是相互隔离的。 以此来实现,不同业务场景间在实现数据隔离的同时,最大限度的利用资源。

# 再来看看:消息的生产者

消息的生产者也有一个名词 "Producer"。在程序中 Producer(生产者)可以直接看做为发送消息给 RabbitMq Borker(RabbitMQ 服务节点)的代码块。

Producer(生产者) 存在的目的,就是发送 msg(消息)给 RabbitMQ Borker(RabbitMQ 服务节点)的。 从上面我们知道了,RabbitMQ Borker(RabbitMQ 服务节点)对于 msg(消息)的处理是从 Exchange(交换器),然后到 Queue(队列)的过程。

这时候就衍生出了一个问题:RabbitMQ Borker(RabbitMQ 服务节点)怎么知道那一条 msg(消息),要经过哪个 Exchange(交换器),最后到 Queue(队列)中?

这时候就需要由 Producer(生产者) 在生产 msg(消息) 的时候指定 msg(消息)的归属地。于是 msg(消息)自然而然的被分为了两个部分:playload(载体)和 label(标签)。

其中:playload(载体)用来存放 Producer(生产者)需要业务处理的数据,而 label(标签)用来存放 msg(消息)在 RabbitMQ Borker(RabbitMQ 服务节点)中如何被路由到正确的 Queue(队列)等相关参数信息。

label(标签)中存放的关键信息主要是两个:一是:Exchange(交换器)名称,二是:Routing Key(路由键)。Excnahge(交换器)名称 用来指定 msg(消息)被送到 RabbitMQ Borker(RabbitMQ 服务节点)的那个 Exchange(交换器)中,而 Routing KEy (路由键)其实就是 Binding Key(绑定键)。在 RabbitMQ Borker(RabbitMQ 服务节点)里,称之为 Binding Key(绑定键)用来标识 Exchange(交换器)与 Queue(队列)之间的绑定关系,在 msg(消息)中的 label(标签)中用来表示 该 msg(消息)在送达指定 Exchange(交换器)之后,Exchange(交换器)需要将该 msg(消息)送往的 Queue(队列)位置。

# 最后就是:消息的消费者

消费者也就是 Consumer ,其本身就是用来接收 RabbitMQ Borker(RabbitMQ 服务节点)消息,然后进行消息处理的代码块。

# 通过上面相关概念的介绍,RabbitMQ 的工作流程也就变得清晰了

下面通过一个简单的流程来简述一下,RabbitMQ 的工作流程

流程简化图如下:

RabbitMQ工作模型图简图

  • 1、生产者,生产消息(消息中包含有:playload 和 label),通过 TCP 链接中的 channel 通道将消息发送给 RabbitMQ Broker 中。

  • 2、RabbitMQ Broker 根据消息中 label 中的 交换器信息,将 msg 送往指定交换器。

  • 3、exchange 交换器收到 msg 之后,获取 label 中对应的 routing key 信息,然后获取所有与之建立绑定关系队列的关系证明 binding key 。通过匹配 routing keybinding key ,将 msgplayload 送到目标队列中。

  • 4、消费者,实时监听某个队列,当队列中有 消息存入(msg 中的 playload),从队列中取到数据,然后消费。

# RabbitMQ 运转流程

# RabbitMQ 运转流程

# 消息发送流程

1、Producer(生产者)与 RabbitMQ Borker(RabbitMQ 服务节点)建立 Connection(连接)开启 channel(信道) 2、Producer(生产者)声明 Exchange(交换器),Queue(队列),以及 binding key (绑定键) 3、Producer(生产者)生产 msg(消息),包括:palyload(载体)、label(标签)。然后发送消息 4、RabbitMQ Boker(RabbitMQ 服务节点)根据 msg(消息)中 label(标签)中的 Exchange(交换器)名称 和 Routing key(路由键)进行消息路由。(处理细节不展开) 5、关闭 channel(信道)和 Connection(连接)

# 消息接收流程

1、Consumer(消费者)与 RabbitMQ Borker(RabbitMQ 服务节点)建立 Connection(连接)开启 channel(信道) 2、Consumer(消费者)监听某个 Queue(队列) 3、Consumer(消费者)主动拉取,或者被动接收推送的信息进行消费,并推送 Ack 确认消费到 RabbitMQ Borker(RabbitMQ 服务节点) 4、RabbitMQ Boker(RabbitMQ 服务节点) 收到 Ack 确认,将消息从 Queue(队列)中删除。 5、关闭 channel(信道)和 Connection(连接)

# 代码演示:生产者-消息发送运转流程

# 生产者测试代码

public class RabbitMQProducer {
    private final static String QUEUE_NAME = "CASE_QUEUE";
    private final static String HOST = "192.168.2.154";
    private final static Integer PORT = 5672;
    private final static String VIRTUAL_HOST = "/wtf_host";
    private final static String USERNAME = "wtf_user";
    private final static String PASSWORD = "wtf_pwd";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 连接IP
        factory.setHost(HOST);
        // 连接端口
        factory.setPort(PORT);
        // 虚拟机
        factory.setVirtualHost(VIRTUAL_HOST);
        // 用户
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();
        String msg = "RabbitMQ Producer";
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 发送消息(不指定交换器,发送到默认交换机AMQP Default,Direct)
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 
        channel.close();
        conn.close();
    }
}

# 生产者运转流程图

下面是一张 producer(生产者)与 RabbitMQ Borker(RabbitMQ 服务节点)之间的消息发送抓包图。

RabbitMQ_Producer流转过程抓包图

其对应的运转图如下:

RabbitMQ_Producer流转过程

通过抓包,获取此次消息发送的内容,及参数信息

RabbitMQ_Producer流转过程抓包图信息图

从图中可以看到,执行消息发送的:Basic.Publish 的操作中包含的信息:Routing key(路由键),Exchange(交换器)名称,playload(消息载体),Channel(信道)编号,Mandatory 参数等信息。

# 代码演示:消费者-消息接收运转流程

# 消费者测试代码

public class RabbitMQConsumer {

    private final static String QUEUE_NAME = "CASE_QUEUE";
    private final static String HOST = "192.168.2.154";
    private final static Integer PORT = 5672;
    private final static String VIRTUAL_HOST = "/wtf_host";
    private final static String USERNAME = "wtf_user";
    private final static String PASSWORD = "wtf_pwd";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 连接IP
        factory.setHost(HOST);
        // 连接端口
        factory.setPort(PORT);

        // 用户
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        // 虚拟机
        factory.setVirtualHost(VIRTUAL_HOST);
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();
        // 声明队列 String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8"); 
                System.out.println("Received message : '" + msg + "'"); }
        };
        // 开始获取消息
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume(QUEUE_NAME, true, consumer);
        System.in.read();
        channel.close();
        conn.close();
    }
}

# 消费者运转流程图

下面是一张 Consumer(消费者)与 RabbitMQ Borker(RabbitMQ 服务节点)之间的消息发送抓包图。

RabbitMQ_consumer流转过程抓包图

其对应的运转图如下:

RabbitMQ_Consumer流转过程

通过抓包获取此次消息消费的相关内容和参数信息

RabbitMQ_Consumer接收消息信息图

# RabbitMQ 常见的应用场景

# 跨系统间的通信

如:存在一个旧有的跑了很久的老代码。此时,搭建一个新的 Saas 平台,作为服务提供商,旧有的老系统需要作为其中一个服务纳入到新系统平台中,且业务需求上他们之间的用户是存在关联关系的。 这两个系统除了业务上需要关联,其他都是分离,包括数据库等。这时候,两个平台的用户信息变更,如:创建,更新等,都需要进行信息同步。这时候消息中间件就派上了用场。

在旧有的系统中加入 RabbitMQ 组件,在用户信息发生变更时,将变更信息可靠的投递到 RabbitMQ Borker(RabbitMQ 服务节点)中,由新的系统监听消息 Queue(队列),然后确保消息的消费,同步更新用户信息,以此达到跨系统间的通信。

# 保持消息的最终一致性

解决分布式事务的方案之一,Base 理论的提出,通过数据的最终一致性解决分布式事务问题,而事务的最终一致性可以通过 RabbitMQ 来完成。分布式系统间的事务处理,通过消息的发送和订阅来实现,当出现事务需要回滚的场景,同样通过消息的发送和订阅来实现业务的补偿,最终实现数据的最终一致性问题。

友链:谈谈分布式事务

# 基于 Pub/Sub 模型实现事件驱动

在例如:电商,金融交易等平台中,通常都存在 “订单”。如:电商中,用户下单之后并不一定会马上付款,但是此时商品资源被锁定,这时候就需要在规定时间内对未支付的订单进行取消的操作,同时释放商品资源。如:金融交易平台。平台中的金融产品需要提前预约,预约时会进行资源锁定,锁定产品份额等资源。在客户在规定时间内未进行交付,需要自动取消预约订单,同时释放额度。 这时候可以通过 RabbitMQ 实现延迟队列的方式来处理。通过设定如:30分钟,未交付或者未付款的订单 进行取消订单,释放额度处理。

友链:Rabbitmq延迟队列-实现任务延迟处理

# 同步请求异步化,提升响应速度

之前做的某个平台,偶尔会做个小活动。平时服务器好好的,但是一旦做活动,服务响应延迟明显提高。但是升级服务配置不划算。所以直接在原有 配套的RabbitMQ 服务中划出一个新的 virutal host 空间用来做这类消息处理。通过业务需求配合,所有活动处理请求都进入到队列中,直接返回客户端中间状态值,待消息消费完成,以通知的方式推送处理结果。通过RabbitMQ 将同步请求异步化,提升系统响应速度。

精彩内容推送,请关注公众号!
最近更新时间: 3/26/2020, 12:09:24 PM