热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringCloud终结篇之消息驱动stream大集合

创建子工程 stream-sample编写pom文件org.springframework.boot

创建子工程  stream-sample

编写pom文件



org.springframework.boot
spring-boot-starter-web


org.springframework.boot
spring-boot-starter-actuator


org.springframework.cloud
spring-cloud-starter-stream-rabbit

创建启动引导类  StreamApplication

 

@SpringBootApplication
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
}

 

创建配置文件

spring.application.name=stream-sample
server.port=63003
# RabbitMQ连接字符串
spring.rabbitmq.host=192.168.0.201
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 绑定Channel到broadcast
spring.cloud.stream.bindings.myTopic-consumer.destination=broadcast
spring.cloud.stream.bindings.myTopic-producer.destination=broadcast
# 消息分组示例
spring.cloud.stream.bindings.group-consumer.destination=group-topic
spring.cloud.stream.bindings.group-producer.destination=group-topic
spring.cloud.stream.bindings.group-consumer.group=Group-A
## 消息分区配置
## 打开消费者的消费分区功能
spring.cloud.stream.bindings.group-consumer.consumer.partitioned=true
## 两个消息分区
spring.cloud.stream.bindings.group-producer.producer.partition-count=2
# SpEL (Key resolver) 可以定义复杂表达式生成Key
# 我们这里用最简化的配置,只有索引参数为1的节点(消费者),才能消费消息
spring.cloud.stream.bindings.group-producer.producer.partition-key-expression=1
# 当前消费者实例总数
spring.cloud.stream.instanceCount=2
# 最大值instanceCount-1,当前实例的索引号
spring.cloud.stream.instanceIndex=1
# 延迟消息配置
spring.cloud.stream.bindings.delayed-consumer.destination=delayed-topic
spring.cloud.stream.bindings.delayed-producer.destination=delayed-topic
spring.cloud.stream.rabbit.bindings.delayed-producer.producer.delayed-exchange=true
# 异常消息(单机版重试)
spring.cloud.stream.bindings.error-consumer.destination=error-out-topic
spring.cloud.stream.bindings.error-producer.destination=error-out-topic
# 重试次数(本机重试)
# 次数=1相当于不重试
spring.cloud.stream.bindings.error-consumer.consumer.max-attempts=2
# 异常消息(requeue重试)
spring.cloud.stream.bindings.requeue-consumer.destination=requeue-topic
spring.cloud.stream.bindings.requeue-producer.destination=requeue-topic
# 必须把max-attempts设置为1,否则requeue不能生效
spring.cloud.stream.bindings.requeue-consumer.consumer.max-attempts=1
spring.cloud.stream.bindings.requeue-consumer.group=requeue-group
# 仅对当前requeue-consumer,开启requeue
spring.cloud.stream.rabbit.bindings.requeue-consumer.consumer.requeueRejected=true
# 默认全局开启requeue
# spring.rabbitmq.listener.default-requeue-rejected=true
# 死信队列配置
spring.cloud.stream.bindings.dlq-consumer.destination=dlq-topic
spring.cloud.stream.bindings.dlq-producer.destination=dlq-topic
spring.cloud.stream.bindings.dlq-consumer.consumer.max-attempts=2
spring.cloud.stream.bindings.dlq-consumer.group=dlq-group
# 开启死信队列(默认 topic.dlq)
spring.cloud.stream.rabbit.bindings.dlq-consumer.consumer.auto-bind-dlq=true
# Fallback配置
spring.cloud.stream.bindings.fallback-consumer.destination=fallback-topic
spring.cloud.stream.bindings.fallback-producer.destination=fallback-topic
spring.cloud.stream.bindings.fallback-consumer.consumer.max-attempts=2
spring.cloud.stream.bindings.fallback-consumer.group=fallback-group
# input channel -> fallback-topic.fallback-group.errors
management.security.enabled=false
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

 

 

创建 Topic

 

延时消息

public interface DelayedTopic {
String INPUT = "delayed-consumer";
String OUTPUT = "delayed-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

死信队列

public interface DlqTopic {
String INPUT = "dlq-consumer";
String OUTPUT = "dlq-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

 

异常消息

public interface ErrorTopic {
String INPUT = "error-consumer";
String OUTPUT = "error-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

fallback降级

public interface FallbackTopic {
String INPUT = "fallback-consumer";
String OUTPUT = "fallback-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

分区分组

public interface GroupTopic {
String INPUT = "group-consumer";
String OUTPUT = "group-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

重入队列

public interface RequeueTopic {
String INPUT = "requeue-consumer";
String OUTPUT = "requeue-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

我的 消息

public interface MyTopic {
String INPUT = "myTopic-consumer";
String OUTPUT = "myTopic-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

创建stream 流  消息消费者

@Slf4j
@EnableBinding(value = {
Sink.class,
MyTopic.class,
GroupTopic.class,
DelayedTopic.class,
ErrorTopic.class,
RequeueTopic.class,
DlqTopic.class,
FallbackTopic.class
}
)
public class StreamConsumer {
private AtomicInteger count = new AtomicInteger(1);
@StreamListener(Sink.INPUT)
public void consume(Object payload) {
log.info("message consumed successfully, payload={}", payload);
}
// 自定义消息广播
@StreamListener(MyTopic.INPUT)
public void consumeMyMessage(Object payload) {
log.info("My message consumed successfully, payload={}", payload);
}
// 消息分组 & 消费分区示例
@StreamListener(GroupTopic.INPUT)
public void consumeGroupMessage(Object payload) {
log.info("Group message consumed successfully, payload={}", payload);
}
// 延迟消息示例
@StreamListener(DelayedTopic.INPUT)
public void consumeDelayedMessage(MessageBean bean) {
log.info("Delayed message consumed successfully, payload={}", bean.getPayload());
}
// 异常重试(单机版)
@StreamListener(ErrorTopic.INPUT)
public void consumeErrorMessage(MessageBean bean) {
log.info("Are you OK?");
if (count.incrementAndGet() % 3 == 0) {
log.info("Fine, thank you. And you?");
count.set(0);
} else {
log.info("What's your problem?");
throw new RuntimeException("I'm not OK");
}
}
// 异常重试(联机版-重新入列)
@StreamListener(RequeueTopic.INPUT)
public void requeueErrorMessage(MessageBean bean) {
log.info("Are you OK?");
try {
Thread.sleep(3000L);
} catch (Exception e) {
}
// throw new RuntimeException("I'm not OK");
}
// 死信队列
@StreamListener(DlqTopic.INPUT)
public void consumeDlqMessage(MessageBean bean) {
log.info("Dlq - Are you OK?");
if (count.incrementAndGet() % 3 == 0) {
log.info("Dlq - Fine, thank you. And you?");
} else {
log.info("Dlq - What's your problem?");
throw new RuntimeException("I'm not OK");
}
}
// Fallback + 升级版本
@StreamListener(FallbackTopic.INPUT)
public void goodbyeBadGuy(MessageBean bean,
@Header("version") String version) {
log.info("Fallback - Are you OK?");
if ("1.0".equalsIgnoreCase(version)) {
log.info("Fallback - Fine, thank you. And you?");
} else if ("2.0".equalsIgnoreCase(version)) {
log.info("unsupported version");
throw new RuntimeException("I'm not OK");
} else {
log.info("Fallback - version={}", version);
}
}
// 降级流程
@ServiceActivator(inputChannel = "fallback-topic.fallback-group.errors")
public void fallback(Message message) {
log.info("fallback entered");
}
}

创建一个 messageBean

 

@Data
public class MessageBean {
private String payload;
}

最后一步 创建 Controller  

@RestController
@Slf4j
public class Controller {
@Autowired
private MyTopic producer;
@Autowired
private GroupTopic groupTopicProducer;
@Autowired
private DelayedTopic delayedTopicProducer;
@Autowired
private ErrorTopic errorTopicProducer;
@Autowired
private RequeueTopic requeueTopicProducer;
@Autowired
private DlqTopic dlqTopicProducer;
@Autowired
private FallbackTopic fallbackTopicProducer;
// 简单广播消息
@PostMapping("send")
public void sendMessage(@RequestParam(value = "body") String body) {
producer.output().send(MessageBuilder.withPayload(body).build());
}
// 消息分组和消息分区
@PostMapping("sendToGroup")
public void sendMessageToGroup(@RequestParam(value = "body") String body) {
groupTopicProducer.output().send(MessageBuilder.withPayload(body).build());
}
// 延迟消息
@PostMapping("sendDM")
public void sendDelayedMessage(
@RequestParam(value = "body") String body,
@RequestParam(value = "seconds") Integer seconds) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
log.info("ready to send delayed message");
delayedTopicProducer.output().send(
MessageBuilder.withPayload(msg)
.setHeader("x-delay", seconds * 1000)
.build());
}
// 异常重试(单机版)
@PostMapping("sendError")
public void sendErrorMessage(@RequestParam(value = "body") String body) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
errorTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
}
// 异常重试(联机版 - 重新入列)
@PostMapping("requeue")
public void sendErrorMessageToMQ(@RequestParam(value = "body") String body) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
requeueTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
}
// 死信队列测试
@PostMapping("dlq")
public void sendMessageToDlq(@RequestParam(value = "body") String body) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
dlqTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
}
// fallback + 升版
@PostMapping("fallback")
public void sendMessageToFallback(
@RequestParam(value = "body") String body,
@RequestParam(value = "version", defaultValue = "1.0") String version) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
fallbackTopicProducer.output().send(
MessageBuilder.withPayload(msg)
.setHeader("version", version)
.build());
}
}

 

 

 

附:

1.    下载插件
https://www.rabbitmq.com/community-plugins.html

找到rabbitmq_delayed_message_exchange
下载对应版本的插件,3.6和3.7版本插件不一样

2. 下载以后解压,copy到rabbitmq安装目录下的plugins文件夹

3.    安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4.    安装完一定要重启RabbitMQ,不是单单重启UI管理界面!
如果只是单单调用rabbitmqctl  stop_app然后再rabbitmqctl  start_app是没有作用的!
正确的步骤是先rabbitmqctl stop,然后再直接执行rabbitmq-server

如果以上步骤还能使延迟队列生效,在重启完之后,换一个新的topic名字就好了
 

本文地址:https://blog.csdn.net/weixin_38305866/article/details/109941990



推荐阅读
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 本文介绍了如何使用Express App提供静态文件,同时提到了一些不需要使用的文件,如package.json和/.ssh/known_hosts,并解释了为什么app.get('*')无法捕获所有请求以及为什么app.use(express.static(__dirname))可能会提供不需要的文件。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • NotSupportedException无法将类型“System.DateTime”强制转换为类型“System.Object”
    本文介绍了在使用LINQ to Entities时出现的NotSupportedException异常,该异常是由于无法将类型“System.DateTime”强制转换为类型“System.Object”所导致的。同时还介绍了相关的错误信息和解决方法。 ... [详细]
  • 微软评估和规划(MAP)的工具包介绍及应用实验手册
    本文介绍了微软评估和规划(MAP)的工具包,该工具包是一个无代理工具,旨在简化和精简通过网络范围内的自动发现和评估IT基础设施在多个方案规划进程。工具包支持库存和使用用于SQL Server和Windows Server迁移评估,以及评估服务器的信息最广泛使用微软的技术。此外,工具包还提供了服务器虚拟化方案,以帮助识别未被充分利用的资源和硬件需要成功巩固服务器使用微软的Hyper - V技术规格。 ... [详细]
  • Spring框架《一》简介
    Spring框架《一》1.Spring概述1.1简介1.2Spring模板二、IOC容器和Bean1.IOC和DI简介2.三种通过类型获取bean3.给bean的属性赋值3.1依赖 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 本文整理了Java中java.lang.NoSuchMethodError.getMessage()方法的一些代码示例,展示了NoSuchMethodErr ... [详细]
  • java实现rstp格式转换使用ffmpeg实现linux命令第一步安装node.js和ffmpeg第二步搭建node.js启动websocket接收服务
    java实现rstp格式转换使用ffmpeg实现linux命令第一步安装node.js和ffmpeg第二步搭建node.js启动websocket接收服务第三步java实现 ... [详细]
  • 本文介绍了在实现了System.Collections.Generic.IDictionary接口的泛型字典类中如何使用foreach循环来枚举字典中的键值对。同时还讨论了非泛型字典类和泛型字典类在foreach循环中使用的不同类型,以及使用KeyValuePair类型在foreach循环中枚举泛型字典类的优势。阅读本文可以帮助您更好地理解泛型字典类的使用和性能优化。 ... [详细]
  • 本文介绍了如何使用MATLAB调用摄像头进行人脸检测和识别。首先需要安装扩展工具,并下载安装OS Generic Video Interface。然后使用MATLAB的机器视觉工具箱中的VJ算法进行人脸检测,可以直接调用CascadeObjectDetector函数进行检测。同时还介绍了如何调用摄像头进行人脸识别,并对每一帧图像进行识别。最后,给出了一些相关的参考资料和实例。 ... [详细]
  • 本文介绍了利用ARMA模型对平稳非白噪声序列进行建模的步骤及代码实现。首先对观察值序列进行样本自相关系数和样本偏自相关系数的计算,然后根据这些系数的性质选择适当的ARMA模型进行拟合,并估计模型中的位置参数。接着进行模型的有效性检验,如果不通过则重新选择模型再拟合,如果通过则进行模型优化。最后利用拟合模型预测序列的未来走势。文章还介绍了绘制时序图、平稳性检验、白噪声检验、确定ARMA阶数和预测未来走势的代码实现。 ... [详细]
author-avatar
抓尼莫為生
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有