热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringBoot2.0快速集成RocketMQ(三)

SpringBoot2.0-快速集成RocketMQ(三),Go语言社区,Golang程序员人脉社

参考:https://github.com/apache/rocketmq-spring/blob/master/README_zh_CN.md

RocketMQ 单机搭建以及使用:https://blog.csdn.net/qq_31289187/article/details/88351235

1、pom.xml

4.0.0

org.springframework.boot spring-boot-starter-parent 2.1.3.RELEASE com.cn.dl springboot-rocketmq 0.0.1-SNAPSHOT springboot-rocketmq springboot-rocketmq

1.8 org.springframework.boot spring-boot-starter org.apache.rocketmq rocketmq-spring-boot-starter 2.0.1 com.alibaba.rocketmq rocketmq-client 3.5.8 org.projectlombok lombok 1.18.4 com.alibaba fastjson 1.2.47 org.springframework.boot spring-boot-starter-test test

org.springframework.boot spring-boot-maven-plugin

2、application.properties

#nameServer rocketmq.name-server=127.0.0.1:9876 #生产者组 rocketmq.producer.group=user_producer_group

3、UserMsg

package com.cn.dl; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * Created by yanshao on 2019/3/29. */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class UserMsg implements Serializable { private long userId; private String name; private int age; private String address; }

4、Constant:配置

package com.cn.dl; /** * Created by yanshao on 2019/3/29. */ public interface Constant { String TOPIC = "user_msg"; String PRODUCER_GROUP = "user_producer_group"; String CONSUMER_GROUP = "user_consumer_group"; }

5、MsgProducer

package com.cn.dl; import com.alibaba.fastjson.JSON; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Created by yanshao on 2019/3/29. */ @Component public class MsgProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendUserMsg(UserMsg userMsg){ rocketMQTemplate.syncSend(Constant.TOPIC, JSON.toJSONString(userMsg)); } }

6、MsgConsumer

package com.cn.dl; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Service; import java.io.UnsupportedEncodingException; /** * Created by yanshao on 2019/3/29. */ @Service @RocketMQMessageListener( cOnsumerGroup= Constant.CONSUMER_GROUP, topic = Constant.TOPIC ) public class MsgConsumer implements RocketMQListener,RocketMQPushConsumerLifecycleListener { @Override public void onMessage(MessageExt messageExt) { String msg = null; try { msg = new String(messageExt.getBody(),"utf-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println("MsgConsumer >>> " + msg +", msgId = " + messageExt.getMsgId()); } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { } }

7、RocketMQTest

package com.cn.dl; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.concurrent.TimeUnit; /** * Created by yanshao on 2019/3/29. */ @RunWith(SpringRunner.class) @SpringBootTest public class RocketMQTest { @Autowired private MsgProducer msgProducer; @Test public void rocketMqTest() throws InterruptedException { int i = 0; while (i <10){ UserMsg userMsg = UserMsg.builder() .userId(10000 + i) .name("yanshao" + i) .age(10 + i) .address("shanghai") .build(); i++; msgProducer.sendUserMsg(userMsg); TimeUnit.SECONDS.sleep(5); } } }

MsgConsumer >>> {"address":"shanghai","age":10,"name":"yanshao0","userId":10000}, msgId = 0200011D5EF418B4AAC2935F143C0000 MsgConsumer >>> {"address":"shanghai","age":11,"name":"yanshao1","userId":10001}, msgId = 0200011D5EF418B4AAC2935F27D00001 MsgConsumer >>> {"address":"shanghai","age":12,"name":"yanshao2","userId":10002}, msgId = 0200011D5EF418B4AAC2935F3B620002 MsgConsumer >>> {"address":"shanghai","age":13,"name":"yanshao3","userId":10003}, msgId = 0200011D5EF418B4AAC2935F4EF50003 MsgConsumer >>> {"address":"shanghai","age":14,"name":"yanshao4","userId":10004}, msgId = 0200011D5EF418B4AAC2935F62870004 MsgConsumer >>> {"address":"shanghai","age":15,"name":"yanshao5","userId":10005}, msgId = 0200011D5EF418B4AAC2935F761A0005 MsgConsumer >>> {"address":"shanghai","age":16,"name":"yanshao6","userId":10006}, msgId = 0200011D5EF418B4AAC2935F89AA0006 MsgConsumer >>> {"address":"shanghai","age":17,"name":"yanshao7","userId":10007}, msgId = 0200011D5EF418B4AAC2935F9D3B0007 MsgConsumer >>> {"address":"shanghai","age":18,"name":"yanshao8","userId":10008}, msgId = 0200011D5EF418B4AAC2935FB0CB0008 MsgConsumer >>> {"address":"shanghai","age":19,"name":"yanshao9","userId":10009}, msgId = 0200011D5EF418B4AAC2935FC45D0009

再启一个消费者,订阅user_msg

8、MsgConsumer2

package com.cn.dl; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Service; import java.io.UnsupportedEncodingException; /** * Created by yanshao on 2019/3/29. */ @Service @RocketMQMessageListener( #记得修改consumer_group cOnsumerGroup= Constant.CONSUMER_GROUP, topic = Constant.TOPIC ) public class MsgConsumer2 implements RocketMQListener,RocketMQPushConsumerLifecycleListener { @Override public void onMessage(MessageExt messageExt) { String msg = null; try { msg = new String(messageExt.getBody(),"utf-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println("MsgConsumer2 消费者2 >>> " + msg +", msgId = " + messageExt.getMsgId()); } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { } }

Caused by: java.lang.IllegalStateException: Failed to start RocketMQ push consumer at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start(DefaultRocketMQListenerContainer.java:231) at org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.registerContainer(ListenerContainerConfiguration.java:100) ... 36 more Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[user_consumer_group] has been created before, specify another name please. See http://rocketmq.apache.org/docs/faq/ for further details. at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:613) at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:520) at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start(DefaultRocketMQListenerContainer.java:229) ... 37 more

       rocketmq不允许一个消费者同时订阅多个topic或者同时做多件事情,也就是消费者只能订阅一个topic、做一件事情,这跟kafka有很大的不同。重新定义一个消费者

MsgConsumer >>> {"address":"shanghai","age":10,"name":"yanshao0","userId":10000}, msgId = 0200011D3C2018B4AAC2937D5AFB0000 MsgConsumer2 消费者2 >>> {"address":"shanghai","age":11,"name":"yanshao1","userId":10001}, msgId = 0200011D3C2018B4AAC2937D6E8D0001 MsgConsumer >>> {"address":"shanghai","age":11,"name":"yanshao1","userId":10001}, msgId = 0200011D3C2018B4AAC2937D6E8D0001 MsgConsumer2 消费者2 >>> {"address":"shanghai","age":12,"name":"yanshao2","userId":10002}, msgId = 0200011D3C2018B4AAC2937D82240002 MsgConsumer >>> {"address":"shanghai","age":12,"name":"yanshao2","userId":10002}, msgId = 0200011D3C2018B4AAC2937D82240002 MsgConsumer2 消费者2 >>> {"address":"shanghai","age":13,"name":"yanshao3","userId":10003}, msgId = 0200011D3C2018B4AAC2937D95B70003 MsgConsumer >>> {"address":"shanghai","age":13,"name":"yanshao3","userId":10003}, msgId = 0200011D3C2018B4AAC2937D95B70003 MsgConsumer2 消费者2 >>> {"address":"shanghai","age":14,"name":"yanshao4","userId":10004}, msgId = 0200011D3C2018B4AAC2937DA9430004 MsgConsumer >>> {"address":"shanghai","age":14,"name":"yanshao4","userId":10004}, msgId = 0200011D3C2018B4AAC2937DA9430004

rocketmq消费失败,默认重试16次,时间间隔:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

如果捕获处理逻辑代码,并没有抛出异常,rocketmq不会重试消费的

MsgConsumer >>> {"address":"shanghai","age":10,"name":"yanshao0","userId":10000}, msgId = 0200011D61E418B4AAC2938398610000 at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:32) at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:15) at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:295) at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) java.lang.ArithmeticException: / by zero at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:32) at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:15) at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:295) at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) java.lang.ArithmeticException: / by zero MsgConsumer >>> {"address":"shanghai","age":11,"name":"yanshao1","userId":10001}, msgId = 0200011D61E418B4AAC29383ABF00001 at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:32) at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:15) at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:295) at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) java.lang.ArithmeticException: / by zero at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:32) at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:15) at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:295) at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) MsgConsumer >>> {"address":"shanghai","age":12,"name":"yanshao2","userId":10002}, msgId = 0200011D61E418B4AAC29383BF7C0002 MsgConsumer >>> {"address":"shanghai","age":13,"name":"yanshao3","userId":10003}, msgId = 0200011D61E418B4AAC29383D30C0003java.lang.ArithmeticException: / by zero at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:32) at com.cn.dl.MsgConsumer.onMessage(MsgConsumer.java:15) at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:295) at

如果抛出异常,消费失败的,会重试

      如果消费失败,也不重试,在void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer)设置,也可以设置offset等等。

每天进步一点点!!!


推荐阅读
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文介绍了在使用Python中的aiohttp模块模拟服务器时出现的连接失败问题,并提供了相应的解决方法。文章中详细说明了出错的代码以及相关的软件版本和环境信息,同时也提到了相关的警告信息和函数的替代方案。通过阅读本文,读者可以了解到如何解决Python连接服务器失败的问题,并对aiohttp模块有更深入的了解。 ... [详细]
  • Python瓦片图下载、合并、绘图、标记的代码示例
    本文提供了Python瓦片图下载、合并、绘图、标记的代码示例,包括下载代码、多线程下载、图像处理等功能。通过参考geoserver,使用PIL、cv2、numpy、gdal、osr等库实现了瓦片图的下载、合并、绘图和标记功能。代码示例详细介绍了各个功能的实现方法,供读者参考使用。 ... [详细]
author-avatar
qs08y602lt
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有