利用 Redission 实现延迟队列
1. Redis 实现延迟队列的方案
基于 Redis 实现延时任务的功能无非就下面两种方案:
Redis 过期事件监听
Redisson 内置的延时队列
这里选用的是用 Redission 内置的延迟队列,所以实现的着重点放在 Redission
2. Redis 过期事件监听实现延时任务
Redis 2.0 引入了 发布订阅(Pub/Sub) 功能。在 Pub/Sub 模型中,引入了一个名为 channel(频道) 的概念,类似于消息队列中的 topic(主题)。
Pub/Sub 涉及两个主要角色:发布者(Publisher) 和 订阅者(Subscriber,也称为消费者):
发布者 通过 PUBLISH 命令将消息发送到指定的 channel。
订阅者 通过 SUBSCRIBE 命令订阅感兴趣的 channel,并且可以同时订阅一个或多个 channel。
在 Pub/Sub 模式 中,生产者需要指定将消息发送到哪个 channel,而消费者通过订阅对应的 channel 来获取消息。Redis 内部也存在一些默认的 channel,这些通道用于 Redis 自身发送消息,而非用户代码生成。只需监听这些 channel,即可获取与 过期 key 相关的通知,从而实现延时任务的功能。
这一特性被 Redis 官方称为 Keyspace Notifications,其主要作用是 实时监控 Redis 中键和值的变化。通过它,开发者能够及时捕捉键的变化(如过期、删除等事件),从而执行相应的处理逻辑
3. Redis 过期事件监听实现延时任务功能有什么缺陷
3.1. 时效性差
Redis 中的 过期事件消息 只有在 Redis 服务器真正删除 key 时才会发布,而不是在 key 到达过期时间后立即发布。
常见的过期数据删除策略有两种:
惰性删除:仅当访问 key 时,才会检查其是否过期。这种方式对 CPU 友好,但可能导致大量过期 key 未及时删除,继续占用内存。
定期删除:Redis 会定期抽取一部分 key,检查并删除过期的 key。为了减少删除操作对 CPU 的影响,Redis 会限制删除操作的执行时长和频率。虽然定期删除更有利于释放内存,但也可能增加 CPU 负载。
Redis 结合了这两种策略,采用 定期删除 加 惰性删除 的方式。定期删除保证了内存的回收,而惰性删除则在取用时保证 CPU 性能。
因此,可能会出现这样一种情况:虽然设置了 key 的过期时间,但当该时间到达时,key 可能尚未被删除,导致 过期事件 未及时发布。
3.2. 丢消息
Redis 的 pub/sub 模式中的消息并不支持持久化,这与消息队列不同。在 Redis 的 pub/sub 模式中,发布者将消息发送给指定的频道,订阅者监听相应的频道以接收消息。当没有订阅者时,消息会被直接丢弃,在 Redis 中不会存储该消息。
3.3. 多服务实例下存在消息重复消息的问题
Redis 的 pub/sub 模式目前只有广播模式,这意味着当生产者向特定频道发布一条消息时,所有订阅相关频道的消费者都能够收到该消息。
这个时候,我们需要注意多个服务实例重复处理消息的问题,这会增加代码开发量和维护难度。
4. 为什么选用Redission作为延迟队列
Redisson 是一个开源的 Java Redis 客户端,提供了许多开箱即用的功能,包括多种分布式锁的实现和延迟队列。Redisson 内置的延迟队列 RDelayedQueue 利用 Redis 的 SortedSet 实现延时任务功能。
SortedSet 是一个有序集合,每个元素都有一个分数,代表其优先级或时间权重。Redisson 通过将需要延迟执行的任务插入到 SortedSet 中,并为它们设置相应的过期时间作为分数来实现延迟队列。
Redisson 在客户端启动一个定时任务,当时间到达时,它使用 zrangebyscore 命令扫描 SortedSet 中已过期的元素(即分数小于或等于当前时间的元素)。这些过期元素会被从 SortedSet 中移除,并加入到就绪消息列表(List 结构)中。
当任务被移到就绪消息列表时,Redisson 通常还会通过 Redis 的发布/订阅机制(Pub/Sub)通知消费者有新任务到达。就绪消息列表是一个阻塞队列,消费者可以使用阻塞操作(如 BLPOP key 0,其中0表示无限等待)来监听。由于 Redis 的 Pub/Sub 机制是事件驱动的,它避免了轮询开销,只有在有新消息时才会触发处理逻辑。
需要注意的是,Redisson 的定时任务调度器并不是以固定时间间隔频繁调用 zrangebyscore 命令进行扫描,而是根据 SortedSet 中最近的到期时间动态调整下一次检查的时间点。
相比于使用 Redis 过期事件监听实现延时任务,Redisson 延迟队列具有以下优势:
减少丢失消息的可能性:RDelayedQueue 中的消息会被持久化,即使 Redis 宕机,根据持久化机制,可能仅丢失少量消息,影响不大。此外,还可以使用数据库扫描作为补偿机制。
避免消息重复消费:所有客户端从同一个目标队列获取任务,避免了重复消费的问题。
5. 使用 Redis 实现延时任务有什么注意的地方?
在任务时间跨度较大且任务数量众多的场景中,需要特别注意内存管理。大量任务可能会导致内存占用过高,而长时间保存任务则会造成资源浪费。为了解决这些问题,可以结合使用 MySQL 和 Redis 来优化任务管理:
短期任务:对于延迟时间较短的任务(例如几分钟到几个小时内执行的任务),可以继续存储在 Redis 中,以便快速访问和处理。
长期任务:对于延迟时间较长的任务(例如几天或几周后执行的任务),则可以存储在 MySQL 中。通过这种方式,可以有效减少 Redis 的内存占用。
定期扫描:使用定时任务(例如 XXL-JOB 或 Spring Task)定期扫描 MySQL 中即将到期的任务(例如未来 2 小时内到期的任务),并将这些任务推送到 Redis 中进行处理。这种做法可以确保任务在适当的时候被加载到内存中。
优化查询:在定期扫描 MySQL 时,可能需要处理大量数据。为提高查询效率,可以使用索引或进行分库分表等优化措施。
将 Redis 和 MySQL 结合使用的优势
节省缓存资源:通过将长期任务存储在 MySQL 中,避免了在 Redis 中存储大量长期任务导致的内存浪费。
可靠性和成本:MySQL 提供的事务机制可以保证任务数据的可靠性,同时存储成本也相对较低。
避免大 key 问题:如果仅使用一个 RDelayedQueue,任务数量过大会产生大 key 问题。可以通过将任务按某种逻辑(如时间段、任务类型)分片存储到多个 RDelayedQueue 中来避免这一问题。
private RBlockingQueue<String> blockingQueue = null;
private RDelayedQueue<String> delayedQueue = null;
@PostConstruct
public void init() {
blockingQueue = redisson.getBlockingQueue("DelayedOrderNotifierMessageQueue");
delayedQueue = redisson.getDelayedQueue(blockingQueue);
orderSharingNotifierExecutor.execute(this::loopConsumeDelayedMsg);
}