热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

如何处理消息消费失败

本篇内容介绍了“如何处理消息消费失败”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何

本篇内容介绍了“如何处理消息消费失败”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、介绍

在介绍消息中间件 MQ 之前,我们先来简单的了解一下,为何要引用消息中间件。

例如,在电商平台中,常见的用户下单,会经历以下几个流程。

当用户下单时,创建完订单之后,会调用第三方支付平台,对用户的账户金额进行扣款,如果平台支付扣款成功,会将结果通知到对应的业务系统,接着业务系统会更新订单状态,同时调用仓库接口,进行减库存,通知物流进行发货!

试想一下,从订单状态更新、到扣减库存、通知物流发货都在一个方法内同步完成,假如用户支付成功、订单状态更新也成功,但是在扣减库存或者通知物流发货步骤失败了,那么就会造成一个问题,用户已经支付成功了,只是在仓库扣减库存方面失败,从而导致整个交易失败!

一单失败,老板可以假装看不见,但是如果上千个单子都因此失败,那么因系统造成的业务损失,将是巨大的,老板可能坐不住了!

因此,针对这种业务场景,架构师们引入了异步通信技术方案,从而保证服务的高可用,大体流程如下:

如何处理消息消费失败

当订单系统收到支付平台发送的扣款结果之后,会将订单消息发送到 MQ 消息中间件,同时也会更新订单状态。

在另一端,由仓库系统来异步监听订单系统发送的消息,当收到订单消息之后,再操作扣减库存、通知物流公司发货等服务!

在优化后的流程下,即使扣减库存服务失败,也不会影响用户交易。

正如《人月神话》中所说的,软件工程,没有银弹!

当引入了 MQ 消息中间件之后,同样也会带来另一个问题,假如 MQ  消息中间件突然宕机了,导致消息无法发送出去,那仓库系统就无法接受到订单消息,进而也无法发货!

针对这个问题,业界主流的解决办法是采用集群部署,一主多从模式,从而实现服务的高可用,即使一台机器突然宕机了,也依然能保证服务可用,在服务器故障期间,通过运维手段,将服务重新启动,之后服务依然能正常运行!

但是还有另一个问题,假如仓库系统已经收到订单消息了,但是业务处理异常,或者服务器异常,导致当前商品库存并没有扣减,也没有发货!

这个时候又改如何处理呢?

今天我们所要介绍的正是这种场景,假如消息消费失败,我们应该如何处理?

二、解决方案

针对消息消费失败的场景,我们一般会通过如下方式进行处理:

  • 当消息消费失败时,会对消息进行重新推送

  • 如果重试次数超过最大值,会将异常消息存储到数据库,然后人工介入排查问题,进行手工重试

如何处理消息消费失败

当消息在客户端消费失败时,我们会将异常的消息加入到一个消息重试对象中,同时设置最大重试次数,并将消息重新推送到 MQ  消息中间件里,当重试次数超过最大值时,会将异常的消息存储到 MongoDB数据库中,方便后续查询异常的信息。

基于以上系统模型,我们可以编写一个公共重试组件,话不多说,直接干!

三、代码实践

本次补偿服务采用 rabbitmq 消息中间件进行处理,其他消息中间件处理思路也类似!

3.1、创建一个消息重试实体类

@Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class MessageRetryDTO implements Serializable {      private static final long serialVersionUID = 1L;      /**      * 原始消息body      */     private String bodyMsg;      /**      * 消息来源ID      */     private String sourceId;      /**      * 消息来源描述      */     private String sourceDesc;      /**      * 交换器      */     private String exchangeName;      /**      * 路由键      */     private String routingKey;      /**      * 队列      */     private String queueName;      /**      * 状态,1:初始化,2:成功,3:失败      */     private Integer status = 1;      /**      * 最大重试次数      */     private Integer maxTryCount = 3;      /**      * 当前重试次数      */     private Integer currentRetryCount = 0;      /**      * 重试时间间隔(毫秒)      */     private Long retryIntervalTime = 0L;      /**      * 任务失败信息      */     private String errorMsg;      /**      * 创建时间      */     private Date createTime;      @Override     public String toString() {         return "MessageRetryDTO{" +                 "bodyMsg=&#39;" + bodyMsg + &#39;\&#39;&#39; +                 ", sourceId=&#39;" + sourceId + &#39;\&#39;&#39; +                 ", sourceDesc=&#39;" + sourceDesc + &#39;\&#39;&#39; +                 ", exchangeName=&#39;" + exchangeName + &#39;\&#39;&#39; +                 ", routingKey=&#39;" + routingKey + &#39;\&#39;&#39; +                 ", queueName=&#39;" + queueName + &#39;\&#39;&#39; +                 ", status=" + status +                 ", maxTryCount=" + maxTryCount +                 ", currentRetryCount=" + currentRetryCount +                 ", retryIntervalTime=" + retryIntervalTime +                 ", errorMsg=&#39;" + errorMsg + &#39;\&#39;&#39; +                 ", createTime=" + createTime +                 &#39;}&#39;;     }      /**      * 检查重试次数是否超过最大值      *      * @return      */     public boolean checkRetryCount() {         retryCountCalculate();         //检查重试次数是否超过最大值         if (this.currentRetryCount < this.maxTryCount) {             return true;         }         return false;     }      /**      * 重新计算重试次数      */     private void retryCountCalculate() {         this.currentRetryCount = this.currentRetryCount + 1;     }  }

3.2、编写服务重试抽象类

public abstract class CommonMessageRetryService {      private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);      @Autowired     private RabbitTemplate rabbitTemplate;      @Autowired     private MongoTemplate mongoTemplate;       /**      * 初始化消息      *      * @param message      */     public void initMessage(Message message) {         log.info("{} 收到消息: {},业务数据:{}", this.getClass().getName(), message.toString(), new String(message.getBody()));         try {             //封装消息             MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);             if (log.isInfoEnabled()) {                 log.info("反序列化消息:{}", messageRetryDto.toString());             }             prepareAction(messageRetryDto);         } catch (Exception e) {             log.warn("处理消息异常,错误信息:", e);         }     }      /**      * 准备执行      *      * @param retryDto      */     protected void prepareAction(MessageRetryDTO retryDto) {         try {             execute(retryDto);             doSuccessCallBack(retryDto);         } catch (Exception e) {             log.error("当前任务执行异常,业务数据:" + retryDto.toString(), e);             //执行失败,计算是否还需要继续重试             if (retryDto.checkRetryCount()) {                 if (log.isInfoEnabled()) {                     log.info("重试消息:{}", retryDto.toString());                 }                 retrySend(retryDto);             } else {                 if (log.isWarnEnabled()) {                     log.warn("当前任务重试次数已经到达最大次数,业务数据:" + retryDto.toString(), e);                 }                 doFailCallBack(retryDto.setErrorMsg(e.getMessage()));             }         }     }      /**      * 任务执行成功,回调服务(根据需要进行重写)      *      * @param messageRetryDto      */     private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {         try {             successCallback(messageRetryDto);         } catch (Exception e) {             log.warn("执行成功回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());         }     }      /**      * 任务执行失败,回调服务(根据需要进行重写)      *      * @param messageRetryDto      */     private void doFailCallBack(MessageRetryDTO messageRetryDto) {         try {             saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));             failCallback(messageRetryDto);         } catch (Exception e) {             log.warn("执行失败回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());         }     }      /**      * 执行任务      *      * @param messageRetryDto      */     protected abstract void execute(MessageRetryDTO messageRetryDto);      /**      * 成功回调      *      * @param messageRetryDto      */     protected abstract void successCallback(MessageRetryDTO messageRetryDto);      /**      * 失败回调      *      * @param messageRetryDto      */     protected abstract void failCallback(MessageRetryDTO messageRetryDto);      /**      * 构建消息补偿实体      * @param message      * @return      */     private MessageRetryDTO buildMessageRetryInfo(Message message){         //如果头部包含补偿消息实体,直接返回         Map messageHeaders = message.getMessageProperties().getHeaders();         if(messageHeaders.containsKey("message_retry_info")){             Object retryMsg = messageHeaders.get("message_retry_info");             if(Objects.nonNull(retryMsg)){                 return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);             }         }         //自动将业务消息加入补偿实体         MessageRetryDTO messageRetryDto = new MessageRetryDTO();         messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));         messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());         messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());         messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());         messageRetryDto.setCreateTime(new Date());         return messageRetryDto;     }      /**      * 异常消息重新入库      * @param retryDto      */     private void retrySend(MessageRetryDTO retryDto){         //将补偿消息实体放入头部,原始消息内容保持不变         MessageProperties messageProperties = new MessageProperties();         messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);         messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));         Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);         rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);     }        /**      * 将异常消息存储到mongodb中      * @param retryDto      */     private void saveMessageRetryInfo(MessageRetryDTO retryDto){         try {             mongoTemplate.save(retryDto, "message_retry_info");         } catch (Exception e){             log.error("将异常消息存储到mongodb失败,消息数据:" + retryDto.toString(), e);         }     } }

3.3、编写监听服务类

在消费端应用的时候,也非常简单,例如,针对扣减库存操作,我们可以通过如下方式进行处理!

@Component public class OrderServiceListener extends CommonMessageRetryService {      private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);      /**      * 监听订单系统下单成功消息      * @param message      */     @RabbitListener(queues = "mq.order.add")     public void consume(Message message) {         log.info("收到订单下单成功消息: {}", message.toString());         super.initMessage(message);     }       @Override     protected void execute(MessageRetryDTO messageRetryDto) {         //调用扣减库存服务,将业务异常抛出来     }      @Override     protected void successCallback(MessageRetryDTO messageRetryDto) {         //业务处理成功,回调     }      @Override     protected void failCallback(MessageRetryDTO messageRetryDto) {         //业务处理失败,回调     } }

当消息消费失败,并超过最大次数时,会将消息存储到 mongodb 中,然后像常规数据库操作一样,可以通过 web  接口查询异常消息,并针对具体场景进行重试!

“如何处理消息消费失败”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程笔记网站,小编将为大家输出更多高质量的实用文章!


推荐阅读
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文详细介绍了PHP中与URL处理相关的三个函数:http_build_query、parse_str和查询字符串的解析。通过示例和语法说明,讲解了这些函数的使用方法和作用,帮助读者更好地理解和应用。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • ***byte(字节)根据长度转成kb(千字节)和mb(兆字节)**parambytes*return*publicstaticStringbytes2kb(longbytes){ ... [详细]
author-avatar
alxg
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有