热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Rocketmq讲解以及使用SpringCloudStream操作

安装:RocketMQ4.5.1安装教程_慕课手记搭建RocketMQ控制台RocketMQ控制台安装教程_慕课手记SpringCloudStream是什么

安装:

RocketMQ 4.5.1安装教程_慕课手记


 搭建RocketMQ控制台

 RocketMQ控制台安装教程_慕课手记


 Spring Cloud Stream是什么?


  • 是一个用于构建消息驱动的微服务的框架
  • 可实现kafka,rabbitmq,rocketmq的无感知切换
  • 当消息生产者使用Kafka发送消息,那只能用Kafka来接收消息。当使用SpringCloudStream来处理消息的话,我们接收Kafka的消息,可以使用其他的消息中间件来进行接收。SpringCloudStream对消息进行了一层封装,所以我们不需要去关心生产者用的是什么消息中间件。
     

 Spring Cloud Stream编写rocketmq生产者?


  1.   添加依赖:

    org.springframework.cloudspring-cloud-starter-stream-rocketmq

  2. 添加注解添加@ EnableBing(Source.class) 注解,如图所示:

  

 3.写配置(application.yml):

spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:output:# 用来指定topicdestination: stream-test-topic

 4.生产者发送消息:

@GetMapping("test-stream")public String testStream(){this.source.output().send(MessageBuilder.withPayload("消息体").build());return "success";}

Spring Cloud Stream 消息消费者?


  1. 添加依赖:

org.springframework.cloudspring-cloud-starter-stream-rocketmq

 2.添加注解添加@ EnableBing(Sink.class) 注解,如图所示:

 

 3.写配置:

spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:input:destination: stream-test-topicgroup: binder-group # 这里的group 一定要设置; 如果使用的不是rocketmq的话,这里可以不用设置,可以留空

4.编写监听消费类

@Service@Slf4jpublic class TestStreamConsumer{@StreamListener(Sink.INPUT)public void receive(String messageBody){log.info("通过stream收到了消息: messageBody = {}");}}

 到此已经完成了rocketmq的基本操作,我们使用的监听配置类都是java默认自带的,我们也可以自定义,举个生产者的例子:

1.定义类

public interface MySource{String MY_OUTPUT= "my-output";@Output(MY_OUTPUT)MessageChannel output();}

2.启动类添加注解

 3.加配置

 其他都是雷同的,就不一一列举了


 下边讲一下mq的分布式事务


什么场景使用?

当我们的逻辑代码中,不仅仅对数据库做了处理,一些场景下我们需要同时进行消息发送和与MySQL进行交互的功能;此图中,我们首先进行了消息发送,然后再把消息写入缓存,那么就会导致: 如果写入缓存的时候,代码执行失败,回滚操作只能回滚数据库,消息已经被消费者监听到了并做了处理了。


rocketmq如何做到分布式事务?

简单来说RocketMQ实现分布式事务的原理是: 执行到应该发送消息的时候,它并未发送,而是处于“准备发送”阶段,当所有的代码都已执行完毕且无异常时,则进行完全发送,此刻消息消费者才能监听到消息;


概念术语讲解:


  • 半消息(Half(Prepare) Message)
    • 暂时无法消费的消息。生产者将消息发送到了MQ server,但这个消息会被标记为“暂不能投递”状态,先存储起来;消费者不会去消费这条消息
  • 消息回查(Message Status Check)
    • 网络断开或生产者重启可能会导致丢失事务消息的第二次确认。当MQ Server发现消息长时间处于半消息状态时,将向消息生产者发送请求,询问该消息的最终状态(提交或回滚)
  • 消息三态:
    • Commit:提交事务消息,消费者可以消费此消息
    • Rollback: 回滚事务消息,broker会删除该消息,消费者不能消费
    • UNKNOWN: broker需要回查确认该消息的状态

我们本文主要讲的事spring cloud stream的分布式事务,但是SpringCloud Stream 本身没有实现分布式事务,它与RocketMQ结合则是使用RocketMQ的分布式事务。它若与其他结合,则使用其他消息中间件的分布式事务。


 如何做到分布式事务?


  1. 到数据库中新增一张表,用来记录 RocketMQ的事务日志:

    执行代码:

    

create table rocketmq_transaction_log(id int auto_increment comment 'id' primary key, transaction_Id varchar(45) not null comment '事务id',log varchar(45) not null comment '日志')

 2.消息生产者编写:发送半消息:

//列举两种方式,一种是原始的rocketmqTempalte发送消息,一种是spring cloud stream发送消息一:String transactiOnId=UUID.randomUUID().toString()
this.rocketMQTemplate.sendMessageInTransaction("tx-add-bonus-group","add-bonus",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId).bonus(50).build()).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).setHeader("share_id",id).build(),auditDTO
)二:

 

3.修改配置文件(注:之前编写生产者配置文件,是不需要添加分组的,但是现在我们不使用spring cloud stream的方式,而使用原始的事务方式监听,所以需要在生产者的rocketmq配置下编写事务配置和分组,然后消费者直接监听即可 )如图

 4.消息的监听定义组名称时,一定要与生产者配置文件中的保持一致,如图所示

 5.消费者编写如下

@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")@RequiredArgsConstructor(OnConstructor= @_(@Autowired))public class AddBonusTransactionListener implements RocketMQLocalTransactionListener{@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg)String transactionId(String)headers.get(RocketMQHeaders.TRANSACTION_ID);Integer shareId= Integer.valueOf((String)headers.get("share_id"))try{this.shareService.auditByIdInDB(shareId,(ShareAuditDTO) arg)return RocketMQLocalTransactionState.COMMIT;}catch(Exception e){return RocketMQLocalTransactionState.ROLLBACK;}}// 编写回查代码,当我们@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg){return null;}}

当我们执行成功,则执行RocketMQLocalTransactionState.COMMIT,失败则ROLLBACK。但是有这样一种情况,比如我们已经执行完逻辑代码,正准备COMMIT提交,此时突然停电了,导致数据已经存入,但是却没有提交成功。所以我们需要一个回查方法,checkLocalTransaction()是一个回查方法,它会去里面进行判断是否执行成功。结合我们已经建立的RocketMQ事务表,我们可以进行回查操作,代码看下方:
 

//新建一个存入方法,我们之前的存入方法,没有将事务数据加入日志表,我们可以这样改造: 当数据存入的时候,将数据存入日志表;回查方法就进行回查,如果没有存入则表示执行失败:
@Autowired
private RocketmqTransactionLogMapepr rocketmqTransactionLogMapepr;@Transactional(rollbackFor= Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId){
//异步业务代码this.auditByIdInDB(id,auditDTO);
//新增rocketmq事务id,表示已提交,可以committhis.rocketmqTransactionLogMapper.insertSelective(RocketmqTransactionLog.builder().transactionId(transactionId).log("审核分享").build());
}

消息消费者重写:

@Autowiredprivate ShareService shareService;@Autowiredpriavte RocketmqTransactionLogMapper rocketmqTransactionLogMapper;@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")@RequiredArgsConstructor(OnConstructor= @_(@Autowired))public class AddBonusTransactionListener implements RocketMQLocalTransactionListener{@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg)String transactionId(String)headers.get(RocketMQHeaders.TRANSACTION_ID);Integer shareId= Integer.valueOf((String)headers.get("share_id"))try{//上边编写的方法this.shareService.auditByIdWIthRocketMqLog(shareId,(ShareAuditDTO) arg,transactionId)return RocketMQLocalTransactionState.COMMIT;}catch(Exception e){return RocketMQLocalTransactionState.ROLLBACK;}}// 编写回查代码,当消息长时间未被消费,就会回调这个函数@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg){MessageHeaders headers= msg.getHeaders();String transactiOnId= (String) headers.get(RocketMQHeaders.TRANSACTION_ID);// 查询是否存了事务数据this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());// 判断是否提交if(transactionLog != null){return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;}}


推荐阅读
  • 1.脚本功能1)自动替换jar包中的配置文件。2)自动备份老版本的Jar包3)自动判断是初次启动还是更新服务2.脚本准备进入ho ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文介绍了RPC框架Thrift的安装环境变量配置与第一个实例,讲解了RPC的概念以及如何解决跨语言、c++客户端、web服务端、远程调用等需求。Thrift开发方便上手快,性能和稳定性也不错,适合初学者学习和使用。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • Python瓦片图下载、合并、绘图、标记的代码示例
    本文提供了Python瓦片图下载、合并、绘图、标记的代码示例,包括下载代码、多线程下载、图像处理等功能。通过参考geoserver,使用PIL、cv2、numpy、gdal、osr等库实现了瓦片图的下载、合并、绘图和标记功能。代码示例详细介绍了各个功能的实现方法,供读者参考使用。 ... [详细]
  • 如何搭建Java开发环境并开发WinCE项目
    本文介绍了如何搭建Java开发环境并开发WinCE项目,包括搭建开发环境的步骤和获取SDK的几种方式。同时还解答了一些关于WinCE开发的常见问题。通过阅读本文,您将了解如何使用Java进行嵌入式开发,并能够顺利开发WinCE应用程序。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • TiDB | TiDB在5A级物流企业核心系统的应用与实践
    TiDB在5A级物流企业核心系统的应用与实践前言一、业务背景科捷物流概况神州金库简介二、现状与挑战神州金库现有技术体系业务挑战应对方案三、TiDB解决方案测试迁移收益问题四、说在最 ... [详细]
  • 后台自动化测试与持续部署实践
    后台自动化测试与持续部署实践https:mp.weixin.qq.comslqwGUCKZM0AvEw_xh-7BDA后台自动化测试与持续部署实践原创 腾讯程序员 腾讯技术工程 2 ... [详细]
author-avatar
飞轶尘埃_130
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有