热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMq的最终一致性分布式事务

RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5.


RabbitMq的最终一致性分布式事务

    • 使用rabbitmq的步骤
      • 1.运行安装在服务器上的rabbit服务
      • 2.在项目中安装依赖
      • 3.编写对应的配置文件
      • 4.创建对应配置并加上启动注解
      • 5.创建message表记录发送次数及信息
      • 6.发送请求时并创建message信息
      • 7.创建spring Task定时器并定时输出rabbitmq信息
      • 8.创建消息确定方法,确认接受方收到的了消息并进行了处理
      • 9.消息接受者创建消息重复表进行消息去重
      • 10.接受方微服务创建监听器监听rabbitmq信息


使用rabbitmq的步骤

在这里插入图片描述


1.运行安装在服务器上的rabbit服务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rs8LPaRN-1664352105453)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901201727164.png)]

或者在docker上运行

# 使用数据卷
docker volume rm rabbitmq-5672-data
docker volume create --name rabbitmq-5672-data
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v rabbitmq-5672-data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management

# 这个例子挂载「数据存储目录」
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v ~/docker/5672/data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management

2.在项目中安装依赖

<dependency><groupId>org.springframework.amqpgroupId><artifactId>spring-rabbit-testartifactId><scope>testscope>
dependency>

3.编写对应的配置文件

## 连接rabbitmq服务器
spring.rabbitmq.host&#61;192.168.12.12
spring.rabbitmq.port&#61;5672
spring.rabbitmq.username&#61;guest
spring.rabbitmq.password&#61;guest
spring.rabbitmq.virtual-host&#61;hl## 手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode&#61;manual
spring.rabbitmq.listener.direct.acknowledge-mode&#61;manual## 确认消息已发送到交换机&#xff08; Exchange &#xff09;
spring.rabbitmq.publisher-confirm-type&#61;CORRELATED# 确认消息已发送到队列&#xff08;Queue&#xff09;
spring.rabbitmq.publisher-returns&#61;true

4.创建对应配置并加上启动注解

&#64;Configuration
&#64;EnableRabbit
&#64;Slf4j
&#64;Transactional
public class RabbitConfig {&#64;Resourceprivate MessageDao messageDao;public static final String EMPLOYEE_LIST &#61; "employee-list";public static final String DEPARTMENT_DELETE &#61; "department-delete";&#64;Beanpublic Queue DepartmentDelete(){return new Queue(DEPARTMENT_DELETE);}&#64;Beanpublic Queue employeeList(){return new Queue(EMPLOYEE_LIST);}&#64;Bean("rabbitTemplate")public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate &#61; new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {//每次发送队列信息将触发此方法,需要添加配置属性System.out.println(correlationData.getId());Message message &#61; messageDao.getOne(Long.parseLong(Objects.requireNonNull(correlationData.getId())));if (ack){message.setStatus("B");}message.setRetryCount(message.getRetryCount()-1);log.info("剩余消息数:"&#43;message.getRetryCount());messageDao.save(message);}});// rabbitTemplate.setMandatory(true);
//
// rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
// &#64;Override
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// log.info("ReturnCallback 消息&#xff1a;{}", message);
// log.info("ReturnCallback 回应码&#xff1a;{}", replyCode);
// log.info("ReturnCallback 回应信息&#xff1a;{}", replyText);
// log.info("ReturnCallback 交换机&#xff1a;{}", exchange);
// log.info("ReturnCallback 路由键&#xff1a;{}", routingKey);
// }
// });return rabbitTemplate;}}

5.创建message表记录发送次数及信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WSNp3Mbr-1664352105454)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901202318282.png)]

drop table if exists message;
create table message
(id bigint auto_increment,exchange varchar(64) ,routing_key varchar(64) not null,content varchar(128) not null,retry_count int not null,status varchar(32) not null,primary key (id)
);

创建对应的DAO类和实体类


6.发送请求时并创建message信息

public void deleteById(Long id) {departmentDao.deleteById(id);Message message &#61; new Message(null, null, RabbitConfig.DEPARTMENT_DELETE, id&#43;"", 5, "A");messageDao.save(message);
}

7.创建spring Task定时器并定时输出rabbitmq信息

&#64;Component
&#64;Slf4j
&#64;Transactional
&#64;EnableScheduling
public class RabbitTimer {&#64;Resourceprivate MessageMysqlDao messageDao;&#64;Resourceprivate RabbitTemplate rabbitTemplate;&#64;Scheduled(fixedDelay &#61; 6000)private void process(){//获取状态不等于C和次数大于0的信息QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.ne("status", "C");wrapper.gt("retry_count", 0);List<Message> messageList &#61; messageDao.selectList(wrapper);if (messageList.size()&#61;&#61;0){log.info("暂无消息发送,请等待...");}else {//进行信息发送for (Message message : messageList) {String content &#61; message.getId()&#43;":"&#43;message.getContent();CorrelationData correlationData &#61; new CorrelationData(message.getId()&#43;"");if (message.getExchange()&#61;&#61;null) {rabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) content, correlationData);}else{rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(), content, correlationData);}log.info("消息 {} 已发送",content);}}}
}

8.创建消息确定方法,确认接受方收到的了消息并进行了处理

&#64;RestController
&#64;RequestMapping("/message")
&#64;RequiredArgsConstructor
public class MessageController implements IAMessageController{&#64;Resourceprivate MessageMysqlDao messageMysqlDao;&#64;PostMapping("/update/{id}")public String messageUpdate(&#64;PathVariable("id")Long id){QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", id);Message message &#61; new Message();message.setStatus("C");messageMysqlDao.update(message,wrapper);return "success";}}

9.消息接受者创建消息重复表进行消息去重

在这里插入图片描述

drop table if exists recived_message;
create table recived_message
(id bigint auto_increment,recived_at datetime
);

10.接受方微服务创建监听器监听rabbitmq信息

消息接受者处理消息发送者发送的消息,在消息处理无误后进行发送openfeign请求,给消息提供者发送确认信息

&#64;Configuration
&#64;RequiredArgsConstructor
&#64;Transactional
public class HarvestResultLister {private final HarvestPlanMysqlDao harvestPlanMysqlDao;private final ReceivedMessageMysqlDao receivedMessageMysqlDao;private final HarvestCheckClient harvestCheckClient;&#64;RabbitListener(queues &#61; RabbitConfig.HARVEST_CHECK)public void harvestUpdateByCheck(String msg, Channel channel,&#64;Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {System.out.println(msg);String[] split &#61; msg.split(":");if (split.length !&#61; 3) {throw new RabbitDataError("发送的数据异常");}String mesId &#61; split[0];//获取发送内容idString contentId &#61; split[1];//获取被修改的采收idString harvestId &#61; split[2];ReceivedMessage receivedMessage &#61; receivedMessageMysqlDao.selectById(Long.parseLong(contentId ));if (receivedMessage !&#61; null){throw new RabbitDataError("发送重复数据");}//存入数据receivedMessageMysqlDao.insert( new ReceivedMessage(Long.parseLong(contentId ), new Date()));QueryWrapper<HarvestPlan> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", Long.parseLong(harvestId));HarvestPlan harvestPlan &#61; new HarvestPlan();harvestPlan.setPurchaseStatusId(3L);String result &#61; harvestCheckClient.messageUpdate(Long.parseLong(mesId));if (!"success".equals(result)){throw new RabbitDataError("确认消息未正常传回");}} catch (Exception e) {e.printStackTrace();throw e;}finally {try {channel.basicAck(tag, false);} catch (IOException e) {e.printStackTrace();}}}
}

推荐阅读
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • java线程池的实现原理源码分析
    这篇文章主要介绍“java线程池的实现原理源码分析”,在日常操作中,相信很多人在java线程池的实现原理源码分析问题上存在疑惑,小编查阅了各式资 ... [详细]
  • RabbitMq之发布确认高级部分1.为什么会需要发布确认高级部分?在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢 ... [详细]
  • 这也太简单了!轻松操作Feign 服务调用使用 Zipkin 链路追踪!
    0、介绍分布式微服务时代,方便了业务的快速增长和服务的稳定,但是系统出现问题后,面对同业务多服务排查起来令人头大。这时候领导就想着集成分布式追踪系统。Zipkin是T ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • JVM 学习总结(三)——对象存活判定算法的两种实现
    本文介绍了垃圾收集器在回收堆内存前确定对象存活的两种算法:引用计数算法和可达性分析算法。引用计数算法通过计数器判定对象是否存活,虽然简单高效,但无法解决循环引用的问题;可达性分析算法通过判断对象是否可达来确定存活对象,是主流的Java虚拟机内存管理算法。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • 本文介绍了一道经典的状态压缩题目——关灯问题2,并提供了解决该问题的算法思路。通过使用二进制表示灯的状态,并枚举所有可能的状态,可以求解出最少按按钮的次数,从而将所有灯关掉。本文还对状压和位运算进行了解释,并指出了该方法的适用性和局限性。 ... [详细]
  • 云原生应用最佳开发实践之十二原则(12factor)
    目录简介一、基准代码二、依赖三、配置四、后端配置五、构建、发布、运行六、进程七、端口绑定八、并发九、易处理十、开发与线上环境等价十一、日志十二、进程管理当 ... [详细]
  • 都说Python处理速度慢,为何月活7亿的 Instagram依然在使用Python?
    点击“Python编程与实战”,选择“置顶公众号”第一时间获取Python技术干货!来自|简书作者|我爱学python链接|https:www.jian ... [详细]
  • [我们是谁?] ... [详细]
author-avatar
耗子很傻爱钻洞
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有