热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【Hoxton.SR1版本】SpringCloudStream消息驱动

目录一、简介二、搭建消息生产者端三、搭建消息消费者端四、消息重复消费问题五、消息持久化六、总结一、简介在实际项目中,服务与服务之间的通信往往我们会采用消

目录

一、简介

二、搭建消息生产者端

三、搭建消息消费者端

四、消息重复消费问题

五、消息持久化

六、总结




一、简介

在实际项目中,服务与服务之间的通信往往我们会采用消息中间件方式来处理,比如引入RabbitMQ、Kafka等,但这会有一个问题,就是我们的应用程序跟消息中间件耦合在一块了,还有就是如果我们要替换为Kafka,那么变动会比较大,Spring Cloud官网提供了Spring Cloud Stream组件,用来给我们整合消息中间件,Spring Cloud Stream底层屏蔽了消息中间件的差异,降低了切换成本,统一消息的编程模型,这样就可以降低我们系统和消息中间件的耦合度。总结一句话:就是Spring Cloud Stream有利于应用程序与消息中间件的解耦。

 


  • Spring Cloud Stream是什么?

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs和outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

通过Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅,消费组、分区的三个核心概念。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

 


  • Spring Cloud Stream官方文档地址

https://spring.io/projects/spring-cloud-stream

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.6.RELEASE/reference/html/

 


  • Spring Cloud Stream官网结构图

可以看到,通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。


  • Spring Cloud Stream编码API和常用注解

  1. Middleware:中间件,目前只支持RabbitMQ和Kafka。
  2. Binder:Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便地连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
  3. @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
  4. @Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
  5. @StreamListener:监听队列,用于消费者的队列的消息接收。
  6. @EnableBinding:指信道channel和exchange绑定在一起。

  • Stream几个重要概念

  1. Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder;
  2. Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建);
  3. Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信;

二、搭建消息生产者端

新建一个module【springcloud-stream-rabbitmq-provider8801】

【a】pom.xml:引入spring-cloud-starter-stream-rabbit依赖


springcloud2020com.wsh.springcloud1.0-SNAPSHOT4.0.0springcloud-stream-rabbitmq-provider8801org.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-actuatororg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.bootspring-boot-starter-testtest

【b】application.yml:相关配置都在配置文件里面做了较详细的说明

server:port: 8801
spring:application:name: springcloud-stream-rabbitmq-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息rabbitmq_binder: # binder绑定器名称,用于binding整合type: rabbit # 消息组件类型 如果消息中间件是kafka,则type:kafkaenvironment: # rabbitmq相关环境配置spring:rabbitmq:host: localhost #rabbitmq主机port: 5672 #rabbitmq端口username: guest #rabbitmq用户名password: guest #rabbitmq用户密码bindings: # 服务的整合处理output: # 输出通道,表示消息生产方destination: rabbitmq_stream_exchange # 指定输出的交换器名称content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: rabbitmq_binder # 指定binder的名称,需与上面spring.cloud.stream.binders.xxx中的xxx绑定器名称对应
eureka:client:service-url:defaultZone: http://springcloud-eureka7001.com:7001/eureka/,http://springcloud-eureka7002.com:7002/eureka/ #集群版Eureka注册中心

【c】主启动类

@SpringBootApplication
public class SpringCloudStreamMQServiceApplicaiton8801 {public static void main(String[] args) {SpringApplication.run(SpringCloudStreamMQServiceApplicaiton8801.class, args);}
}

【d】定义消息发送的接口

/*** @Description 消息发送接口* @Date 2020/8/27 21:37* @Author weishihuai* 说明:*/
public interface IMessageProvider {/*** 发送消息*/String sendMessage();
}

消息发送实现类:

package com.wsh.springcloud.service.impl;import com.wsh.springcloud.service.IMessageProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;/*** @Description 消息发送实现类* @Date 2020/8/27 21:38* @Author weishihuai* 说明: @EnableBinding表示信道channel和exchange绑定在一起.*/
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {private static final Logger logger = LoggerFactory.getLogger(MessageProviderImpl.class);/*** 消息发送管道*/@Resourceprivate MessageChannel output;@Overridepublic String sendMessage() {String uuid = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(uuid).build());logger.info("消息发送者发送消息: {}", uuid);return "消息发送者发送消息: " + uuid;}}

 【e】定义消息发送Controller

package com.wsh.springcloud.controller;import com.wsh.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @Description 消息发送测试Controller* @Date 2020/8/27 21:39* @Author weishihuai* 说明:*/
@RestController
public class SendMessageController {@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage() {return messageProvider.sendMessage();}}

【f】测试

启动Eureka注册中心以及消息驱动发送方服务,浏览器访问:http://localhost:8801/sendMessage 测试发送消息,然后我们去RabbitMQ界面观察流量情况:

注意:下图中的rabbitmq_stream_exchange就是我们在application.yml中指定的将消息输出到哪个desitination交换机上面。

观察后台日志:

 可见,消息成功发送到MQ中,正在等待消费方进行消费消息,至此,消息发送者端搭建成功,接下来搭建消息消费方服务。


三、搭建消息消费者端

新建module【springcloud-stream-rabbitmq-consumer8802】

【a】pom.xml


springcloud2020com.wsh.springcloud1.0-SNAPSHOT4.0.0springcloud-stream-rabbitmq-consumer8802org.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-starter-weborg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.bootspring-boot-starter-actuatororg.springframework.bootspring-boot-starter-testtest

【b】applicaiton.yml

server:port: 8802
spring:application:name: springcloud-stream-rabbitmq-consumercloud:stream:binders:rabbitmq_binder: # binder绑定器名称,用于binding整合type: rabbit # 消息组件类型 如果消息中间件是kafka,则type:kafkaenvironment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhost #rabbitmq主机port: 5672 #rabbitmq端口username: guest #rabbitmq用户名password: guest #rabbitmq用户密码bindings: # 服务的整合处理input: # 输入通道,表示消息消费方destination: rabbitmq_stream_exchange # 指定接收的交换器名称,需与消息发送方的destination对应上content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: rabbitmq_binder # 指定binder的名称,需与上面spring.cloud.stream.binders.xxx对应中的xxx对应
eureka:client:service-url:defaultZone: http://springcloud-eureka7001.com:7001/eureka/,http://springcloud-eureka7002.com:7002/eureka/ #集群版Eureka注册中心

【c】主启动类

@SpringBootApplication
public class RabbitMQStreamServiceApplication8802 {public static void main(String[] args) {SpringApplication.run(RabbitMQStreamServiceApplication8802.class, args);}
}

【d】新增接收消息发送方发送消息的方法

package com.wsh.springcloud.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/*** @version V1.0* @ClassName: com.wsh.springcloud.controller.ReceiveMessageController.java* @Description: 接收消息发送方发送的消息* @author: weishihuai* @date: 2020/8/28 10:55*/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {private static final Logger logger = LoggerFactory.getLogger(ReceiveMessageController.class);@Value("${server.port}")private String serverPort;/*** 接收消息发送方发送的消息** @param message 消息* @StreamListener 通过@StreamListener注解来监听exchange中的消息*/@StreamListener(Sink.INPUT)private void receiveMessage(Message message) {String payload = message.getPayload();logger.info("消息接收方接收消息: {}, 服务端口号:{}", payload, serverPort);}}

 【e】测试

启动消息消费端,浏览器访问两次:http://localhost:8801/sendMessage 模拟发送两条消息到RabbitMQ中,查看消费者端是否成功消费此消息。

下图是消息发送方的日志:

下图是消息消费方的日志:

由此可见,成功实现了消息接收者获取到了发送者发送的消息,同时我们在RabbitMQ的web界面也可以看到相关的信息:


四、消息重复消费问题

为了模拟消息重复消费的问题,这里我们还需要一个消息消费端,所以我们新建一个module【springcloud-stream-rabbitmq-consumer8803】,此子模块跟【springcloud-stream-rabbitmq-consumer8802】除了端口号,其他一模一样,这里不再过多阐述。

启动8803消费者和8802消费者,浏览器访问两次:http://localhost:8801/sendMessage,模拟发送两条消息。

(1)、消息发送端日志

(2)、消息接收端【8802】日志

(3)、消息接收端【8803】日志

由此可见,同一条消息同时被两个消费者处理,这是不对的。

比如在如下场景中,假如订单服务调用支付服务,支付服务我们做集群部署,那如果支付服务重复消费了订单服务发送过来的支付消息,那么就会造成数据错误,我们得避免这种情况。试想一下重复扣用户的款,这肯定不行的。

接下来,我们谈谈怎么利用Stream来处理重复消费的问题。Spring Cloud Stream提供了Group组的概念,我们可以使用Stream中的消息分组来解决。

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

导致原因:默认分组group是不同的,组流水号不一样,被认为不同组,可以消费。

从RabbitMQ可视化界面中,我们可以看到【8802】和【8803】被分配的默认分组信息:

可以看到,两个消费者的group组名是不一样的,所以导致了重复消费。Spring Cloud Stream提供了自定义分组配置的功能,我们可以将【8802】和【8803】分配相同的组名,具体配置如下:

 在【8802】和【8803】的application.yml配置文件中都加入: group:group1  指定相同的分组名称,如下图所示:

重启【8802】和【8803】,浏览器访问两次:http://localhost:8801/sendMessage  模拟发送两条消息

(1)、消息发送端日志

(2)、消息接收端【8802】日志

(3)、消息接收端【8803】日志

可以看到,同一条消息同时只能被一个消费者处理,成功防止了消息的重复消费问题。同时我们在RabbitMQ的web界面也可以看到相关的信息:


五、消息持久化

除了使用group能防止消息重复消费,其实group还能将消息进行持久化,下面我们来测试一下。

(1)、停掉【8802】和【8803】两个消息消费者服务

(2)、注释掉【8802】服务中的group分组属性,注意【8803】需要保留group分组属性

(3)、浏览器访问两次http://localhost:8801/sendMessage,模拟发送两条消息。

     (a)、消息发送端日志

接着我们重启【8802】和【8803】服务,注意观察后台日志:

     (b)、消息接收方【8802】日志

可见,没有消息消费的日志信息。 

     (c)、消息接收方【8803】日志 

可以看到,保留group属性的【8803】服务实现了对消息的持久化,当重启之后会自动去拉取未消费的消息来进行消费;而【8802】由于未保留group属性,所以并没有重新去拉取最新消息进行消费。 


六、总结

本篇文章总结了如何使用Spring Cloud Stream消息驱动屏蔽消息中渐渐的底层实现,极大地方便我们开发者。同时讲解了如何使用分组来避免消息重复消费的问题以及消息持久化。Spring Cloud Stream实现了消息中间件和应用程序的高度解耦以上相关项目的代码我已经放在Gitee上,有需要的小伙伴可以去拉取进行学习:https://gitee.com/weixiaohuai/springcloud_Hoxton,由于笔者水平有限,如有不对之处,还请小伙伴们指正,相互学习,一起进步。


推荐阅读
  • 原文地址:https:www.cnblogs.combaoyipSpringBoot_YML.html1.在springboot中,有两种配置文件,一种 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了Android中的assets目录和raw目录的共同点和区别,包括获取资源的方法、目录结构的限制以及列出资源的能力。同时,还解释了raw目录中资源文件生成的ID,并说明了这些目录的使用方法。 ... [详细]
  • 微软头条实习生分享深度学习自学指南
    本文介绍了一位微软头条实习生自学深度学习的经验分享,包括学习资源推荐、重要基础知识的学习要点等。作者强调了学好Python和数学基础的重要性,并提供了一些建议。 ... [详细]
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • 阿里Treebased Deep Match(TDM) 学习笔记及技术发展回顾
    本文介绍了阿里Treebased Deep Match(TDM)的学习笔记,同时回顾了工业界技术发展的几代演进。从基于统计的启发式规则方法到基于内积模型的向量检索方法,再到引入复杂深度学习模型的下一代匹配技术。文章详细解释了基于统计的启发式规则方法和基于内积模型的向量检索方法的原理和应用,并介绍了TDM的背景和优势。最后,文章提到了向量距离和基于向量聚类的索引结构对于加速匹配效率的作用。本文对于理解TDM的学习过程和了解匹配技术的发展具有重要意义。 ... [详细]
  • EPICS Archiver Appliance存储waveform记录的尝试及资源需求分析
    本文介绍了EPICS Archiver Appliance存储waveform记录的尝试过程,并分析了其所需的资源容量。通过解决错误提示和调整内存大小,成功存储了波形数据。然后,讨论了储存环逐束团信号的意义,以及通过记录多圈的束团信号进行参数分析的可能性。波形数据的存储需求巨大,每天需要近250G,一年需要90T。然而,储存环逐束团信号具有重要意义,可以揭示出每个束团的纵向振荡频率和模式。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • 在springmvc框架中,前台ajax调用方法,对图片批量下载,如何弹出提示保存位置选框?Controller方法 ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
author-avatar
回__复卷轴
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有