热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【Hoxton.SR1版本】SpringCloudStream消息驱动

目录一、简介二、搭建消息生产者端三、搭建消息消费者端四、消息重复消费问题五、消息持久化六、总结一、简介在实际项目中,服务与服务之间的通信往往我们会采用消

目录

一、简介

二、搭建消息生产者端

三、搭建消息消费者端

四、消息重复消费问题

五、消息持久化

六、总结




一、简介

在实际项目中,服务与服务之间的通信往往我们会采用消息中间件方式来处理,比如引入RabbitMQ、Kafka等,但这会有一个问题,就是我们的应用程序跟消息中间件耦合在一块了,还有就是如果我们要替换为Kafka,那么变动会比较大,Spring Cloud官网提供了Spring Cloud Stream组件,用来给我们整合消息中间件,Spring Cloud Stream底层屏蔽了消息中间件的差异,降低了切换成本,统一消息的编程模型,这样就可以降低我们系统和消息中间件的耦合度。总结一句话:就是Spring Cloud Stream有利于应用程序与消息中间件的解耦。

 


  • Spring Cloud Stream是什么?

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs和outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

通过Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅,消费组、分区的三个核心概念。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

 


  • Spring Cloud Stream官方文档地址

https://spring.io/projects/spring-cloud-stream

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.6.RELEASE/reference/html/

 


  • Spring Cloud Stream官网结构图

可以看到,通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。


  • Spring Cloud Stream编码API和常用注解

  1. Middleware:中间件,目前只支持RabbitMQ和Kafka。
  2. Binder:Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便地连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
  3. @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
  4. @Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
  5. @StreamListener:监听队列,用于消费者的队列的消息接收。
  6. @EnableBinding:指信道channel和exchange绑定在一起。

  • Stream几个重要概念

  1. Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder;
  2. Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建);
  3. Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信;

二、搭建消息生产者端

新建一个module【springcloud-stream-rabbitmq-provider8801】

【a】pom.xml:引入spring-cloud-starter-stream-rabbit依赖


springcloud2020com.wsh.springcloud1.0-SNAPSHOT4.0.0springcloud-stream-rabbitmq-provider8801org.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-actuatororg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.bootspring-boot-starter-testtest

【b】application.yml:相关配置都在配置文件里面做了较详细的说明

server:port: 8801
spring:application:name: springcloud-stream-rabbitmq-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息rabbitmq_binder: # binder绑定器名称,用于binding整合type: rabbit # 消息组件类型 如果消息中间件是kafka,则type:kafkaenvironment: # rabbitmq相关环境配置spring:rabbitmq:host: localhost #rabbitmq主机port: 5672 #rabbitmq端口username: guest #rabbitmq用户名password: guest #rabbitmq用户密码bindings: # 服务的整合处理output: # 输出通道,表示消息生产方destination: rabbitmq_stream_exchange # 指定输出的交换器名称content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: rabbitmq_binder # 指定binder的名称,需与上面spring.cloud.stream.binders.xxx中的xxx绑定器名称对应
eureka:client:service-url:defaultZone: http://springcloud-eureka7001.com:7001/eureka/,http://springcloud-eureka7002.com:7002/eureka/ #集群版Eureka注册中心

【c】主启动类

@SpringBootApplication
public class SpringCloudStreamMQServiceApplicaiton8801 {public static void main(String[] args) {SpringApplication.run(SpringCloudStreamMQServiceApplicaiton8801.class, args);}
}

【d】定义消息发送的接口

/*** @Description 消息发送接口* @Date 2020/8/27 21:37* @Author weishihuai* 说明:*/
public interface IMessageProvider {/*** 发送消息*/String sendMessage();
}

消息发送实现类:

package com.wsh.springcloud.service.impl;import com.wsh.springcloud.service.IMessageProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;/*** @Description 消息发送实现类* @Date 2020/8/27 21:38* @Author weishihuai* 说明: @EnableBinding表示信道channel和exchange绑定在一起.*/
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {private static final Logger logger = LoggerFactory.getLogger(MessageProviderImpl.class);/*** 消息发送管道*/@Resourceprivate MessageChannel output;@Overridepublic String sendMessage() {String uuid = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(uuid).build());logger.info("消息发送者发送消息: {}", uuid);return "消息发送者发送消息: " + uuid;}}

 【e】定义消息发送Controller

package com.wsh.springcloud.controller;import com.wsh.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @Description 消息发送测试Controller* @Date 2020/8/27 21:39* @Author weishihuai* 说明:*/
@RestController
public class SendMessageController {@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage() {return messageProvider.sendMessage();}}

【f】测试

启动Eureka注册中心以及消息驱动发送方服务,浏览器访问:http://localhost:8801/sendMessage 测试发送消息,然后我们去RabbitMQ界面观察流量情况:

注意:下图中的rabbitmq_stream_exchange就是我们在application.yml中指定的将消息输出到哪个desitination交换机上面。

观察后台日志:

 可见,消息成功发送到MQ中,正在等待消费方进行消费消息,至此,消息发送者端搭建成功,接下来搭建消息消费方服务。


三、搭建消息消费者端

新建module【springcloud-stream-rabbitmq-consumer8802】

【a】pom.xml


springcloud2020com.wsh.springcloud1.0-SNAPSHOT4.0.0springcloud-stream-rabbitmq-consumer8802org.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-starter-weborg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.bootspring-boot-starter-actuatororg.springframework.bootspring-boot-starter-testtest

【b】applicaiton.yml

server:port: 8802
spring:application:name: springcloud-stream-rabbitmq-consumercloud:stream:binders:rabbitmq_binder: # binder绑定器名称,用于binding整合type: rabbit # 消息组件类型 如果消息中间件是kafka,则type:kafkaenvironment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhost #rabbitmq主机port: 5672 #rabbitmq端口username: guest #rabbitmq用户名password: guest #rabbitmq用户密码bindings: # 服务的整合处理input: # 输入通道,表示消息消费方destination: rabbitmq_stream_exchange # 指定接收的交换器名称,需与消息发送方的destination对应上content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: rabbitmq_binder # 指定binder的名称,需与上面spring.cloud.stream.binders.xxx对应中的xxx对应
eureka:client:service-url:defaultZone: http://springcloud-eureka7001.com:7001/eureka/,http://springcloud-eureka7002.com:7002/eureka/ #集群版Eureka注册中心

【c】主启动类

@SpringBootApplication
public class RabbitMQStreamServiceApplication8802 {public static void main(String[] args) {SpringApplication.run(RabbitMQStreamServiceApplication8802.class, args);}
}

【d】新增接收消息发送方发送消息的方法

package com.wsh.springcloud.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/*** @version V1.0* @ClassName: com.wsh.springcloud.controller.ReceiveMessageController.java* @Description: 接收消息发送方发送的消息* @author: weishihuai* @date: 2020/8/28 10:55*/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {private static final Logger logger = LoggerFactory.getLogger(ReceiveMessageController.class);@Value("${server.port}")private String serverPort;/*** 接收消息发送方发送的消息** @param message 消息* @StreamListener 通过@StreamListener注解来监听exchange中的消息*/@StreamListener(Sink.INPUT)private void receiveMessage(Message message) {String payload = message.getPayload();logger.info("消息接收方接收消息: {}, 服务端口号:{}", payload, serverPort);}}

 【e】测试

启动消息消费端,浏览器访问两次:http://localhost:8801/sendMessage 模拟发送两条消息到RabbitMQ中,查看消费者端是否成功消费此消息。

下图是消息发送方的日志:

下图是消息消费方的日志:

由此可见,成功实现了消息接收者获取到了发送者发送的消息,同时我们在RabbitMQ的web界面也可以看到相关的信息:


四、消息重复消费问题

为了模拟消息重复消费的问题,这里我们还需要一个消息消费端,所以我们新建一个module【springcloud-stream-rabbitmq-consumer8803】,此子模块跟【springcloud-stream-rabbitmq-consumer8802】除了端口号,其他一模一样,这里不再过多阐述。

启动8803消费者和8802消费者,浏览器访问两次:http://localhost:8801/sendMessage,模拟发送两条消息。

(1)、消息发送端日志

(2)、消息接收端【8802】日志

(3)、消息接收端【8803】日志

由此可见,同一条消息同时被两个消费者处理,这是不对的。

比如在如下场景中,假如订单服务调用支付服务,支付服务我们做集群部署,那如果支付服务重复消费了订单服务发送过来的支付消息,那么就会造成数据错误,我们得避免这种情况。试想一下重复扣用户的款,这肯定不行的。

接下来,我们谈谈怎么利用Stream来处理重复消费的问题。Spring Cloud Stream提供了Group组的概念,我们可以使用Stream中的消息分组来解决。

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

导致原因:默认分组group是不同的,组流水号不一样,被认为不同组,可以消费。

从RabbitMQ可视化界面中,我们可以看到【8802】和【8803】被分配的默认分组信息:

可以看到,两个消费者的group组名是不一样的,所以导致了重复消费。Spring Cloud Stream提供了自定义分组配置的功能,我们可以将【8802】和【8803】分配相同的组名,具体配置如下:

 在【8802】和【8803】的application.yml配置文件中都加入: group:group1  指定相同的分组名称,如下图所示:

重启【8802】和【8803】,浏览器访问两次:http://localhost:8801/sendMessage  模拟发送两条消息

(1)、消息发送端日志

(2)、消息接收端【8802】日志

(3)、消息接收端【8803】日志

可以看到,同一条消息同时只能被一个消费者处理,成功防止了消息的重复消费问题。同时我们在RabbitMQ的web界面也可以看到相关的信息:


五、消息持久化

除了使用group能防止消息重复消费,其实group还能将消息进行持久化,下面我们来测试一下。

(1)、停掉【8802】和【8803】两个消息消费者服务

(2)、注释掉【8802】服务中的group分组属性,注意【8803】需要保留group分组属性

(3)、浏览器访问两次http://localhost:8801/sendMessage,模拟发送两条消息。

     (a)、消息发送端日志

接着我们重启【8802】和【8803】服务,注意观察后台日志:

     (b)、消息接收方【8802】日志

可见,没有消息消费的日志信息。 

     (c)、消息接收方【8803】日志 

可以看到,保留group属性的【8803】服务实现了对消息的持久化,当重启之后会自动去拉取未消费的消息来进行消费;而【8802】由于未保留group属性,所以并没有重新去拉取最新消息进行消费。 


六、总结

本篇文章总结了如何使用Spring Cloud Stream消息驱动屏蔽消息中渐渐的底层实现,极大地方便我们开发者。同时讲解了如何使用分组来避免消息重复消费的问题以及消息持久化。Spring Cloud Stream实现了消息中间件和应用程序的高度解耦以上相关项目的代码我已经放在Gitee上,有需要的小伙伴可以去拉取进行学习:https://gitee.com/weixiaohuai/springcloud_Hoxton,由于笔者水平有限,如有不对之处,还请小伙伴们指正,相互学习,一起进步。


推荐阅读
  • 都说Python处理速度慢,为何月活7亿的 Instagram依然在使用Python?
    点击“Python编程与实战”,选择“置顶公众号”第一时间获取Python技术干货!来自|简书作者|我爱学python链接|https:www.jian ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 本文介绍了Android中的assets目录和raw目录的共同点和区别,包括获取资源的方法、目录结构的限制以及列出资源的能力。同时,还解释了raw目录中资源文件生成的ID,并说明了这些目录的使用方法。 ... [详细]
  • 本文讨论了微软的STL容器类是否线程安全。根据MSDN的回答,STL容器类包括vector、deque、list、queue、stack、priority_queue、valarray、map、hash_map、multimap、hash_multimap、set、hash_set、multiset、hash_multiset、basic_string和bitset。对于单个对象来说,多个线程同时读取是安全的。但如果一个线程正在写入一个对象,那么所有的读写操作都需要进行同步。 ... [详细]
  • 服务网关与流量网关
    一、为什么需要服务网关1、什么是服务网关传统的单体架构中只需要开放一个服务给客户端调用,但是微服务架构中是将一个系统拆分成多个微服务,如果没有网关& ... [详细]
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • SOA架构理解理解SOA架构,了解ESB概念,明白SOA与微服务的区别和联系,了解SOA与热门技术的结合与应用。1、面向服务的架构SOASOA(ServiceOrien ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • TiDB | TiDB在5A级物流企业核心系统的应用与实践
    TiDB在5A级物流企业核心系统的应用与实践前言一、业务背景科捷物流概况神州金库简介二、现状与挑战神州金库现有技术体系业务挑战应对方案三、TiDB解决方案测试迁移收益问题四、说在最 ... [详细]
  • 后台自动化测试与持续部署实践
    后台自动化测试与持续部署实践https:mp.weixin.qq.comslqwGUCKZM0AvEw_xh-7BDA后台自动化测试与持续部署实践原创 腾讯程序员 腾讯技术工程 2 ... [详细]
  • 提供:ZStack云计算原创2016-12-26张鑫讲师介绍张鑫ZStack总架构师、联合创始人《系统虚拟化》主要作者,曾任职Intel开源软件技术中心 ... [详细]
author-avatar
回__复卷轴
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有