热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringBoot2.X学习笔记09整合RocketMQ

SpringBoot-2.X学习笔记09整合RocketMQ,Go语言社区,Golang程序员人脉社

SpringBoot-2.X 学习笔记09 整合RocketMQ

  • 1 下载部署 RocketMQ
    • 1.1 Windows 系统
      • 1.1.1 下载 Apache RocketMQ-4.5.2
      • 1.1.2 安装 Apache RocketMQ-4.5.2
      • 1.1.3 启动 Apache RocketMQ-4.5.2
      • 1.1.4 Apache RocketMQ-4.5.2 nameserver 启动成功
      • 1.1.5 Apache RocketMQ-4.5.2 broker 启动成功
    • 1.2 非 Windows 系统
      • 1.2.1 进入 Apache RocketMQ 官网
      • 1.2.2 参考 Apache RocketMQ 快速开始
  • 2 安装 RocketMQ 可视化工具
    • 2.1 使用Git克隆或者直接下载 Apache RocketMQ 工具集
    • 2.2 选择 rocketmq-console 的目录
      • 2.2.1 修改 rocketmq-console 的 application.properties。
      • 2.2.2 使用 Maven 编译 rocketmq-console
      • 2.2.3 启动 rocketmq-console
      • 2.2.4 rocketmq-console 启动成功
      • 2.2.5 查看 Apache RocketMQ-4.5.2 启动状态
  • 3 SpringBoot-2.X 整合 RocketMQ
    • 3.1 添加 Apache RocketMQ 依赖
    • 3.2 修改 SpringBoot-2.X 的 application.properties 配置
    • 3.3 消息生产者 ProducerConfigure.java
    • 3.4 消息消费者 ConsumerConfigure.java
    • 3.5 消费者加入到监听器(HttpSessionIdListener)中测试 Consumer.java
    • 3.6 修改启动类代码加入 @ServletComponentScan
    • 3.7 编写控制类 RocketMQController.java
    • 3.8 测试及结果


1 下载部署 RocketMQ

1.1 Windows 系统


1.1.1 下载 Apache RocketMQ-4.5.2

进入 Apache RocketMQ官网 下载你需要的版本的二进制或者源码(需要手动编译)。
在这里插入图片描述

如果选择 Source 需要自己编译


1.1.2 安装 Apache RocketMQ-4.5.2

解压或者编译好 RocketMQ-4.5.2,并将对应的 RocketMQ 文件目录添加到 Windows 环境变量中。
Windows 配置 RocketMQ 环境变量


1.1.3 启动 Apache RocketMQ-4.5.2

# 切换到 RocketMQ 的 bin 目录
cd C:UsersAdministratorVideosRocketMQ-4.5.2bin
# 启动 RocketMQ nameserver
start mqnamesrv.cmd
# 启动 RocketMQ broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
# 关闭 RocketMQ nameserver
mqshutdown.cmd namesrv
# 关闭 RocketMQ broker
mqshutdown.cmd broker

1.1.4 Apache RocketMQ-4.5.2 nameserver 启动成功

RocketMQ nameserver


1.1.5 Apache RocketMQ-4.5.2 broker 启动成功

RocketMQ broker


1.2 非 Windows 系统


1.2.1 进入 Apache RocketMQ 官网

进入 Apache RocketMQ官网 。
在这里插入图片描述


1.2.2 参考 Apache RocketMQ 快速开始

参考 Apache RocketMQ 快速开始。
在这里插入图片描述


2 安装 RocketMQ 可视化工具

2.1 使用Git克隆或者直接下载 Apache RocketMQ 工具集

使用Git克隆或者直接下载 Apache RocketMQ 工具集。
在这里插入图片描述
在这里插入图片描述


2.2 选择 rocketmq-console 的目录


2.2.1 修改 rocketmq-console 的 application.properties。

切换到 rocketmq-console 的目录,找到 application.properties ,按自己的实际情况修改 rocketmq-console 的配置。

server.cOntextPath=
server.port=8077
### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.cOnfig=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=
#if you use rocketmq version <3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket
#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=false

2.2.2 使用 Maven 编译 rocketmq-console

切换到 rocketmq-console 的目录

# 1
cd C:UsersAdministratorVideosrocketmq-externals-masterrocketmq-console
# 2
mvn clean package -Dmaven.test.skip=true

2.2.3 启动 rocketmq-console

编译完成后 rocketmq-console 的目录会多出一个 target 目录,进入 target 目录,使用Java CMD 命令运行 rocketmq-console

cd target
java -jar rocketmq-console-ng-1.0.1.jar

2.2.4 rocketmq-console 启动成功

在这里插入图片描述


2.2.5 查看 Apache RocketMQ-4.5.2 启动状态

C:UsersAdministratorVideosRocketMQ-4.5.2bin>jps
10056 NamesrvStartup
12424 jar
13160
11228 BrokerStartup
13580 Jps

3 SpringBoot-2.X 整合 RocketMQ

3.1 添加 Apache RocketMQ 依赖

<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-clientartifactId>
<version>4.5.2version>
dependency>

3.2 修改 SpringBoot-2.X 的 application.properties 配置

#producer 该应用是否启用生产者
rocketmq.producer.isOnOff=on
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
rocketmq.producer.groupName=producer
#mq的nameserver地址
rocketmq.producer.namesrvAddr=127.0.0.1:9876
#消息最大长度 默认1024*4(4M)
rocketmq.producer.maxMessageSize=4096
#发送消息超时时间,默认3000
rocketmq.producer.sendMsgTimeout=3000
#发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2
#consumer 该应用是否启用消费者
rocketmq.consumer.isOnOff=on
rocketmq.consumer.groupName=test_consumer
#mq的nameserver地址
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
#该消费者订阅的主题和tags
rocketmq.consumer.topics=test_topic
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
#设置一次消费消息的条数,默认为1
rocketmq.consumer.consumeMessageBatchMaxSize=1

3.3 消息生产者 ProducerConfigure.java

package com.xu.springboot.rocketmq;
import java.util.UUID;
import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class ProducerConfigure {
/**
* 生产者组名
*/

@Value("${rocketmq.producer.groupName}")
private String producerGroup;
/**
* namesrvAddr
*/

@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;
private DefaultMQProducer producer;
public DefaultMQProducer getProducer() {
return this.producer;
}
@PostConstruct
public void init() {
producer = new DefaultMQProducer(producerGroup);
//NamesrvAddr 有多个 可以 producer.setNamesrvAddr("127.0.0.1:9876;192.168.0.1:9876;192.168.0.2:9876")
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
//防止 InstanceName 重复
producer.setInstanceName(UUID.randomUUID().toString());
try {
//producer 在使用前必须调用 start() 只能初始化一次
producer.start();
} catch (Exception e) {
e.printStackTrace();
producer.shutdown();
}
}
}

3.4 消息消费者 ConsumerConfigure.java

package com.xu.springboot.rocketmq;
import java.util.UUID;
import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class ConsumerConfigure {

/**
* 消费者者组名
*/

@Value("${rocketmq.consumer.groupName}")
private String consumerGroup;
/**
* namesrvAddr
*/

@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
private DefaultMQPushConsumer consumer;
public DefaultMQPushConsumer getProducer() {
return this.consumer;
}
@PostConstruct
public void consumer() {
consumer = new DefaultMQPushConsumer(consumerGroup);
//NamesrvAddr 有多个 可以 producer.setNamesrvAddr("127.0.0.1:9876;192.168.0.1:9876;192.168.0.2:9876")
consumer.setNamesrvAddr(namesrvAddr);
//防止 InstanceName 重复
consumer.setInstanceName(UUID.randomUUID().toString());
try {
consumer.subscribe("test_topic", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently)(list,context)->{
try {
for(MessageExt msg:list) {
System.out.println("消费者:MsgId = "+msg.getMsgId()+"t消费内容: SendStatus = "+new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET));
}
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
} catch (Exception e) {
consumer.shutdown();
}
}

}

3.5 消费者加入到监听器(HttpSessionIdListener)中测试 Consumer.java

监听器属于SpringMVC所以需要在启动内中加入@ServletComponentScan(“com.xu.springboot.listener”)

package com.xu.springboot.listener;
import javax.servlet.http.HttpSessionEvent;
import javax.servlet.http.HttpSessionIdListener;
import org.springframework.stereotype.Component;
import com.xu.springboot.rocketmq.ConsumerConfigure;
@Component
public class Consumer extends ConsumerConfigure implements HttpSessionIdListener {
@Override
public void sessionIdChanged(HttpSessionEvent se, String oldSessionId) {
super.consumer();
}
}

3.6 修改启动类代码加入 @ServletComponentScan

修改启动类代码加入 @ServletComponentScan(“com.xu.springboot.listener”) Application.java

package com.xu.springboot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
@SpringBootApplication
@ServletComponentScan("com.xu.springboot.listener")
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

3.7 编写控制类 RocketMQController.java

package com.xu.springboot.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.xu.springboot.rocketmq.ProducerConfigure;
@Controller
@RequestMapping("/rocketmq")
public class RocketMQController {
@Autowired
private ProducerConfigure producer;
@RequestMapping("/order1")
@ResponseBody
public Object order1(String msg,String tag) throws Exception {
Message message=new Message("test_topic",tag,msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result=producer.getProducer().send(message);
System.out.println("生产者:MsgId = "+result.getMsgId()+"t发送状态:SendStatus = "+result.getSendStatus());
return result.getSendStatus();
}
@RequestMapping("/order2")
@ResponseBody
public void order2(String msg,String tag) throws Exception {
Message message=new Message("test_topic",tag,msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.getProducer().send(message,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("生产者:MsgId = "+sendResult.getMsgId()+"t发送状态:SendStatus = "+sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败");
}
});
}
}

3.8 测试及结果

测试1
测试2




推荐阅读
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
  • 微软头条实习生分享深度学习自学指南
    本文介绍了一位微软头条实习生自学深度学习的经验分享,包括学习资源推荐、重要基础知识的学习要点等。作者强调了学好Python和数学基础的重要性,并提供了一些建议。 ... [详细]
  • 2018年人工智能大数据的爆发,学Java还是Python?
    本文介绍了2018年人工智能大数据的爆发以及学习Java和Python的相关知识。在人工智能和大数据时代,Java和Python这两门编程语言都很优秀且火爆。选择学习哪门语言要根据个人兴趣爱好来决定。Python是一门拥有简洁语法的高级编程语言,容易上手。其特色之一是强制使用空白符作为语句缩进,使得新手可以快速上手。目前,Python在人工智能领域有着广泛的应用。如果对Java、Python或大数据感兴趣,欢迎加入qq群458345782。 ... [详细]
  • 拥抱Android Design Support Library新变化(导航视图、悬浮ActionBar)
    转载请注明明桑AndroidAndroid5.0Loollipop作为Android最重要的版本之一,为我们带来了全新的界面风格和设计语言。看起来很受欢迎࿰ ... [详细]
  • switch语句的一些用法及注意事项
    本文介绍了使用switch语句时的一些用法和注意事项,包括如何实现"fall through"、default语句的作用、在case语句中定义变量时可能出现的问题以及解决方法。同时也提到了C#严格控制switch分支不允许贯穿的规定。通过本文的介绍,读者可以更好地理解和使用switch语句。 ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
  • 标题: ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 本文介绍了在Mac上安装Xamarin并使用Windows上的VS开发iOS app的方法,包括所需的安装环境和软件,以及使用Xamarin.iOS进行开发的步骤。通过这种方法,即使没有Mac或者安装苹果系统,程序员们也能轻松开发iOS app。 ... [详细]
  • 一次上线事故,30岁+的程序员踩坑经验之谈
    本文主要介绍了一位30岁+的程序员在一次上线事故中踩坑的经验之谈。文章提到了在双十一活动期间,作为一个在线医疗项目,他们进行了优惠折扣活动的升级改造。然而,在上线前的最后一天,由于大量数据请求,导致部分接口出现问题。作者通过部署两台opentsdb来解决问题,但读数据的opentsdb仍然经常假死。作者只能查询最近24小时的数据。这次事故给他带来了很多教训和经验。 ... [详细]
  • 本文介绍了H5游戏性能优化和调试技巧,包括从问题表象出发进行优化、排除外部问题导致的卡顿、帧率设定、减少drawcall的方法、UI优化和图集渲染等八个理念。对于游戏程序员来说,解决游戏性能问题是一个关键的任务,本文提供了一些有用的参考价值。摘要长度为183字。 ... [详细]
  • 如何用UE4制作2D游戏文档——计算篇
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何用UE4制作2D游戏文档——计算篇相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文介绍了Python版Protobuf的安装和使用方法,包括版本选择、编译配置、示例代码等内容。通过学习本教程,您将了解如何在Python中使用Protobuf进行数据序列化和反序列化操作,以及相关的注意事项和技巧。 ... [详细]
  • 本文介绍了作者在开发过程中遇到的问题,即播放框架内容安全策略设置不起作用的错误。作者通过使用编译时依赖注入的方式解决了这个问题,并分享了解决方案。文章详细描述了问题的出现情况、错误输出内容以及解决方案的具体步骤。如果你也遇到了类似的问题,本文可能对你有一定的参考价值。 ... [详细]
author-avatar
朴子字軒_755
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有