热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:一篇详解Redis延时队列

Redis的list数据结构常用来作为异步消息队列使用,使用rpush/lpush操作入队,使用lpop/rpop来操作出队

Redis的 list 数据结构常用来作为 异步消息队列 使用,使用 rpush/lpush 操作 入队 ,使用 lpop/rpop 来操作 出队

技术图片

> rpush my-queue apple banana pear
(integer) 3
> llen my-queue
(integer) 3
> lpop my-queue
"apple"
> llen my-queue
(integer) 2
> lpop my-queue
"banana"
> llen my-queue
(integer) 1
> lpop my-queue
"pear"
> llen my-queue
(integer) 0
> lpop my-queue
(nil)


空队列


  1. 如果队列为空,客户端会陷入 pop的死循环 , 空轮询 不仅拉高了 客户端的CPU , Redis的QPS 也会被拉高

  2. 如果空轮询的客户端有几十个, Redis的慢查询 也会显著增加,可以尝试让客户端线程 sleep 1s

  3. 但睡眠会导致消息的 延迟增大 ,可以使用 blpop/brpop (blocking, 阻塞读 )

  4. 阻塞读在队列没有数据时,会立即进入 休眠 状态,一旦有数据到来,会立即被 唤醒 , 消息延迟几乎为0


空闲连接


  1. 如果线程一直阻塞在那里,Redis的客户端连接就成了 闲置连接

  2. 闲置过久, 服务器 一般会 主动断开 连接, 减少闲置的资源占用 ,此时 blpop/brpop 会 抛出异常


锁冲突处理


  1. 分布式锁 加锁失败 的处理策略

  2. 直接抛出异常 ,通知用户稍后重试

  3. sleep 后再重试

  4. 将请求转移到 延时队列 ,过一会重试

  5. 抛出异常

  6. 这种方式比较适合由 用户直接发起 的请求

  7. sleep

  8. sleep会 阻塞 当前的消息处理线程,从而导致队列的后续消息处理出现 延迟

  9. 如果 碰撞比较频繁 ,sleep方案不合适

  10. 延时队列

  11. 比较适合异步消息处理的场景,通过将当前冲突的请求转移到另一个队列 延后处理 来 避免冲突


延时队列


  1. 可以通过Redis的 zset 来实现延时队列

  2. 将消息序列化成一个字符串作为zet的 value ,将该消息的 到期处理时间 作为 score

  3. 然后 多线程轮询 zset获取 到期的任务 进行处理

  4. 多线程是为了保障 可用性 ,但同时要考虑 并发安全 ,确保 任务不能被多次执行


public class RedisDelayingQueue {
@Data
@AllArgsConstructor
@NoArgsConstructor
private static class TaskItem {
private String id;
private T msg;
}
private Type taskType = new TypeReference>() {
}.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem task = new TaskItem<>(UUID.randomUUID().toString(), msg);
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task));
}
public void loop() {
// 可以进一步优化,通过Lua脚本将zrangeByScore和zrem统一挪到Redis服务端进行原子化操作,减少抢夺失败出现的资源浪费
while (!Thread.interrupted()) {
// 只取一条
Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) {
// zrem是多线程多进程争夺任务的关键
TaskItem task = JSON.parseObject(s, taskType);
this.handleMsg(task.msg);
}
}
}
private void handleMsg(T msg) {
try {
System.out.println(msg);
} catch (Throwable ignored) {
// 一定要捕获异常,避免因为个别任务处理问题导致循环异常退出
}
}
public static void main(String[] args) {
final RedisDelayingQueue queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo");
Thread producer = new Thread() {
@Override
public void run() {
for (int i = 0; i <10; i++) {
queue.delay("zhongmingmao" + i);
}
}
};
Thread cOnsumer= new Thread() {
@Override
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException ignored) {
}
}
}

推荐阅读
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了Java高并发程序设计中线程安全的概念与synchronized关键字的使用。通过一个计数器的例子,演示了多线程同时对变量进行累加操作时可能出现的问题。最终值会小于预期的原因是因为两个线程同时对变量进行写入时,其中一个线程的结果会覆盖另一个线程的结果。为了解决这个问题,可以使用synchronized关键字来保证线程安全。 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
  • HashMap的相关问题及其底层数据结构和操作流程
    本文介绍了关于HashMap的相关问题,包括其底层数据结构、JDK1.7和JDK1.8的差异、红黑树的使用、扩容和树化的条件、退化为链表的情况、索引的计算方法、hashcode和hash()方法的作用、数组容量的选择、Put方法的流程以及并发问题下的操作。文章还提到了扩容死链和数据错乱的问题,并探讨了key的设计要求。对于对Java面试中的HashMap问题感兴趣的读者,本文将为您提供一些有用的技术和经验。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • ejava,刘聪dejava
    本文目录一览:1、什么是Java?2、java ... [详细]
  • 旁路|发生_Day749.旁路缓存:Redis是如何工作的Redis 核心技术与实战
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Day749.旁路缓存:Redis是如何工作的-Redis核心技术与实战相关的知识,希望对你有一定的参考价值。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • Android JSON基础,音视频开发进阶指南目录
    Array里面的对象数据是有序的,json字符串最外层是方括号的,方括号:[]解析jsonArray代码try{json字符串最外层是 ... [详细]
  • 本文讨论了Kotlin中扩展函数的一些惯用用法以及其合理性。作者认为在某些情况下,定义扩展函数没有意义,但官方的编码约定支持这种方式。文章还介绍了在类之外定义扩展函数的具体用法,并讨论了避免使用扩展函数的边缘情况。作者提出了对于扩展函数的合理性的质疑,并给出了自己的反驳。最后,文章强调了在编写Kotlin代码时可以自由地使用扩展函数的重要性。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文介绍了一个React Native新手在尝试将数据发布到服务器时遇到的问题,以及他的React Native代码和服务器端代码。他使用fetch方法将数据发送到服务器,但无法在服务器端读取/获取发布的数据。 ... [详细]
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
author-avatar
幸运幸福一家人1314_332_887
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有