写于:2019-01-14 23:00:37 待更新

# 介绍

Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions. A key idea is that the Bus is like a distributed Actuator for a Spring Boot application that is scaled out, but it can also be used as a communication channel between apps. Starters are provided for an AMQP broker as the transport or for Kafka, but the same basic feature set (and some more depending on the transport) is on the roadmap for other transports.

Spring Cloud Bus 是一个链接分布式系统节点的轻量级的消息代理。 它可以广播状态变更或者配置变更或者管理指令。 Spring Cloud Bus 的一个关键概念是他像Spring Boot 分布式执行机构(Actuator)的一个扩展,它能够让各个应用之间建立通信通道。

什么是消息代理?

消息代理:一种消息验证、传输、路由的架构模式。 消息代理是一个中间件产品,他的核心是一个消息的路由程序,用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。

# 模拟场景:配置文件的实时更新

通过更新远程 git 仓库中的配置文件信息,同时更新实例的配置信息

12-bus-config

准备环境: 服务配置:config-server 参考:【04】-Spring-cloud-Config配置加密传输 config-cliet 同上 Eureka 参考:【06】-Service-Discovery-Eureka

# 主要配置

Config-server

  • Config-server pom 文件追加

    <!-- bug-amqp -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>
    
    <!-- actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    
  • Config-server 配置文件 properties 追加

    ## Rabbit Mq 配置
    spring.rabbitmq.host    = 192.168.2.154
    spring.rabbitmq.port    = 5672
    spring.rabbitmq.username= guest
    spring.rabbitmq.password= guest
    
    ## 关闭安全管理
    management.security.enabled = false
    

Config-Client

  • Config-client pom 文件追加(同 Config-Server)

    <!-- bug-amqp -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>
    
    <!-- actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    
  • Config-Client 配置文件 properties 追加

    ## Rabbit Mq 配置
    spring.rabbitmq.host    = 192.168.2.154
    spring.rabbitmq.port    = 5672
    spring.rabbitmq.username= guest
    spring.rabbitmq.password= guest
    
  • 新增测试接口:PropertiesDemoController

    /**
     * @Description <p>尝试获取配置文件信息</p>
     * @Author QGUOFENG
     */
    @RestController
    @RequestMapping("/config/property")
    public class PropertiesDemoController {
    
        @Autowired
        private Environment environment;
    
        @GetMapping("/by-name")
        public String getPropertyByName(@RequestParam("name") String name){
            return environment.getProperty(name);
        }
    }
    

远程配置文件

  • 增加属性

    bus-name=qguofeng-bus
    

测试一:使用 Actuator 端点进行远程配置文件动态更新 前提条件:每个 Config-Client 需要关闭安全管理,引入 Actuator 依赖

  • 启动项目 Eureka 启动:端口号 -> 1111 Config-Server 启动:端口号 -> 28080 Config_client 启动两个服务:端口号 -> 18080 和 18081

  • 分别访问:Config -client 的两个项目,得到结果一致

    请求链接: http://localhost:18080/config/property/by-name?name=bus.name http://localhost:18081/config/property/by-name?name=bus.name

    qguofeng-bus
    
  • 修改远程配置文件属性

    bus-name=qguofeng-bus-change
    
  • 访问 Config-Client 18081 中的 /refresh

    [
        "config.client.version",
        "bus.name"
    ]
    
  • 再一次分别访问:Config -client 的两个项目,得到结果不一致

    http://localhost:18081/config/property/by-name?name=bus.name

    qguofeng-bus-change
    

    http://localhost:18080/config/property/by-name?name=bus.name

    qguofeng-bus
    

需要同时更新多个项目,需要针对每个项目:/refresh 进行访问,刷新配置信息

测试二:通过 BUS 总线进行远程配置文件动态更新

  • 启动项目 Eureka 启动:端口号 -> 1111 Config-Server 启动:端口号 -> 28080 Config_client 启动两个服务:端口号 -> 18080 和 18081

  • 分别访问:Config -client 的两个项目,得到结果一致

    请求链接: http://localhost:18080/config/property/by-name?name=bus.name http://localhost:18081/config/property/by-name?name=bus.name

    qguofeng-bus
    
  • 修改远程配置文件属性

    bus-name=qguofeng-bus-change
    
  • 请求 Config-Server 的 bus/refresh 接口 请求链接: http://localhost:28080/bus/refresh/

  • 重新分别访问:Config-Client 两个项目,得到结果一致请求链接: http://localhost:18080/config/property/by-name?name=bus.name http://localhost:18081/config/property/by-name?name=bus.name

    qguofeng-bus-change
    

# 归纳

在使用了Spring Cloud Config 进行远程配置管理是,Spring Cloud 提供了两种动态更新配置文件的功能。

  • 第一种:依赖于 Spring-Boot-Actuator 模块的中的 /refresh 存在的问题 针对每个应用,需要关闭安全管理:management.security.enabled = false 在动态更新配置文件时,需要针对每个 服务实例执行操作,增加运维成本

  • 第二种:依赖于 spring-cloud-starter-bus-amqp 模块中的 /bus/refresh

    其原理就是通过消息中间件进行消息的发送和消费。 当 Config-Server 执行 /bus/refresh 操作时,能够通知到所有在总线中的应用,进行配置文件的更新。 相比较于第一种,在较大的企业环境中能够降低运维成本,并且在开发操作中,只需要 配置 mq 的 地址,账号密码 ,而在大多企业开发中,消息中间件一般为必备之一,并不会提高开发成本。

# Spring Cloud Bus 工作原理

通过官网我们可以知道 Spring Cloud Bus 采用的是事件驱动机制 结合 中间件消息发送同步消息完成 一键触发所有实例的远程配置文件更新。

# 事件驱动机制工作流程解析

  • 入口:/bus/refresh (RefreshBusEndpoint ,BusEndpoint)

    @ConfigurationProperties(prefix = "endpoints.bus", ignoreUnknownFields = false)
    public class BusEndpoint extends AbstractEndpoint<Collection<String>> {
    
    	public BusEndpoint() {
    		super("bus");
    	}
    
    	@Override
    	public Collection<String> invoke() {
    		return Collections.emptyList();
    	}
    }
    
    @ManagedResource
    public class RefreshBusEndpoint extends AbstractBusEndpoint {
    
    	public RefreshBusEndpoint(ApplicationEventPublisher context, String id,
    			BusEndpoint delegate) {
    		super(context, id, delegate);
    	}
    
    	@RequestMapping(value = "refresh", method = RequestMethod.POST)
    	@ResponseBody
    	@ManagedOperation
    	public void refresh(
    			@RequestParam(value = "destination", required = false) String destination) {
    		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
    	}
    
    }
    

    两个组合,形成了 /bus/refresh 请求接口 从定义上来看,存在一个 endpoint.bus 的属性来配置 访问路径,但是在调用父类的时候,却写死了

    通过访问 /bus/fresh 实际上是生成一个新的事件源,然后被 监听器 RefreshListener 监听到进行操作

  • 事件源:RemoteApplicationEvent (AckRemoteApplicationEvent 其实现)

    
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
    @JsonIgnoreProperties("source")
    public abstract class RemoteApplicationEvent extends ApplicationEvent {
    	private static final Object TRANSIENT_SOURCE = new Object();
    	private final String originService;
    	private final String destinationService;
    	private final String id;
    
    	protected RemoteApplicationEvent() {
    		// for serialization libs like jackson
    		this(TRANSIENT_SOURCE, null, null);
    	}
    
    	protected RemoteApplicationEvent(Object source, String originService,
    			String destinationService) {
    		............................................
    	}
    
    	protected RemoteApplicationEvent(Object source, String originService) {
    		this(source, originService, null);
    	}
    
    	..........get/set..................
    
    	@Override
    	public int hashCode() {
    		...............................
    	}
    
    	@Override
    	public boolean equals(Object obj) {
    		...............................
    	}
    }
    
  • 事件监听器(RefreshListener

    public class RefreshListener
    		implements ApplicationListener<RefreshRemoteApplicationEvent> {
    
    	private static Log log = LogFactory.getLog(RefreshListener.class);
    
    	private ContextRefresher contextRefresher;
    
    	public RefreshListener(ContextRefresher contextRefresher) {
    		this.contextRefresher = contextRefresher;
    	}
    
    	@Override
    	public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
    		Set<String> keys = contextRefresher.refresh();
    		log.info("Received remote refresh request. Keys refreshed " + keys);
    	}
    }
    
  • 监听调用的 方法 refresh()

    public synchronized Set<String> refresh() {
            Map<String, Object> before = this.extract(this.context.getEnvironment().getPropertySources());
            this.addConfigFilesToEnvironment();
            Set<String> keys = this.changes(before, this.extract(this.context.getEnvironment().getPropertySources())).keySet();
            this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
            this.scope.refreshAll();
            return keys;
        }
    

    事件 嵌套事件,最终完成的功能是重新加载 Spring 的环境 Environment 相当于重新加载配置文件

那么知道了 /bus/refresh 如何重新加载配置文件,那么 bus 总线又是如何进行消息推送,让所有的实例节点进行配置文件的同步更新的?

通过查看 BusAutoConfiguration 我们可以得到结果:

  • BusAutoConfiguration

    ......
    @Autowired
    @Output(SpringCloudBusClient.OUTPUT)
    private MessageChannel cloudBusOutboundChannel;
    
    @EventListener(classes = RemoteApplicationEvent.class)
    public void acceptLocal(RemoteApplicationEvent event) {
        if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) {
            this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
        }
    }
    

    监听 RemoteApplicationEvent 然后通过消息中间件 发送事件源 RemoteApplicationEvent

  • 消费信息 :同样在 BusAutoConfiguration

    @StreamListener(SpringCloudBusClient.INPUT)
    public void acceptRemote(RemoteApplicationEvent event) {
    	if (event instanceof AckRemoteApplicationEvent) {
    		if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
    			&& this.applicationEventPublisher != null) {
    			this.applicationEventPublisher.publishEvent(event);
    		}
    		// If it's an ACK we are finished processing at this point
    		return;
    	}
    	if (this.serviceMatcher.isForSelf(event)
    			&& this.applicationEventPublisher != null) {
    		if (!this.serviceMatcher.isFromSelf(event)) {
    			this.applicationEventPublisher.publishEvent(event);
    		}
    		if (this.bus.getAck().isEnabled()) {
    			AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
    					this.serviceMatcher.getServiceId(),
    					this.bus.getAck().getDestinationService(),
    					event.getDestinationService(), event.getId(), event.getClass());
    			this.cloudBusOutboundChannel
    					.send(MessageBuilder.withPayload(ack).build());
    			this.applicationEventPublisher.publishEvent(ack);
    		}
    	}
    	if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
    		// We are set to register sent events so publish it for local consumption,
    		// irrespective of the origin
    		this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
    				event.getOriginService(), event.getDestinationService(),
    				event.getId(), event.getClass()));
    	}
    }
    

综上所述:Bus 通过 事件驱动模型 + 消息中间件 的形式完成了,自身远程配置文件更新的同时,发送消息,其他实例通过消费实例完成所有实例的远程配置文件更新。可以得知,远程配置文件的动态更新其实就是重新加载 Spring Environment ,实现了远程配置文件实时更新的。

精彩内容推送,请关注公众号!
最近更新时间: 4/10/2020, 10:09:47 PM