基于redis缓存的延迟队列,内部依赖redis的数据结构,形如hash zset,通过lua保持多命令的事务性。
内嵌在业务进程内执行, 目前支持原生的spring-data-redis和jimDB(均需要开启hashtag)。
- 约定大于配置
- 有生产必须要有消费。
- 比较适合小量级的延迟场景,快速生产、快速消费。形如:业务异常重试、小时级别内的短暂延迟等。
- 目前设定的延迟时间范围是:[1s~1h]。
- 数据安全
- 延迟数据容灾完全依赖redis备份。
- 业务进程强杀,丢失最多10条数据(如果用户没有修改默认消费线程配置的话)。
- 由于使用了hashtag,所以数据量巨大的时候,极易出现数据倾斜。
- 系统稳定
- 生产和消费内部均做异常处理,不会对主业务造成异常传递的影响。
- 消费端会占用业务线程,只要配置合理,不会造成影响。
- 生产为同步执行,消费为异步执行,使用得当不会对主业务造成影响。
模块介绍
├── README.md
├── delay-queue-redis-core 核心主流程
├── delay-queue-redis-data-spring-boot-starter 基于spring-data-redis实现的spring-boot-starter
├── delay-queue-redis-jimdb-spring-boot-starter 基于jimDB实现的spring-boot-starter
├── delay-queue-redis-spring-boot-starter-demo 样例:基于deley-queue-redis-data-spring-boot-starter实现
└── pom.xml具体样例请参考moduledelay-queue-redis-spring-boot-starter-demo
<!--暂时未发正式版本-->
<dependency>
<groupId>o2o.platform.commons</groupId>
<artifactId>delay-queue-redis-jimdb-spring-boot-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>- 引用jar包 delay-queue-redis-[jimdb|data]-spring-boot-starter.jar。
- 启动函数添加
@EnabledDelayQueue开启delay-queue-redis的功能。 - 增加application.properties相关配置,形如:生产者、消费者、topic等,大部分保持默认即可。比较重要的是
delay.queue.topics配置。 - 增加topics对应的消费者,记得把消费者的handler名称配置到
delay.queue.topics即可。 - 可以自定义异常消费,内部有默认的可重入实现。
- 需要的地方添加延迟消息发送方法。
SpringApplication启动类头部添加@EnabledDelayQueue 开启延迟队列支持
@SpringBootApplication
@EnabledDelayQueue
public class DelayQueueRedisApplication {
public static void main(String[] args) {
SpringApplication.run(DelayQueueRedisApplication.class, args);
}
}## redis相关配置,可以配置单机版redis,也可以配置集群版本。
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.timeout=5000
spring.redis.lettuce.pool.max-active=50
spring.redis.lettuce.pool.max-wait=-1
spring.redis.lettuce.pool.max-idle=10
spring.redis.lettuce.pool.min-idle=0
spring.redis.lettuce.shutdown-timeout=100
## 延迟队列服务配置,除了appName,其他可以不用填写,保持系统默认配置。
delay.queue.appName=inviter-activity
delay.queue.executor.coreSize=2
delay.queue.executor.maxSize=10
delay.queue.executor.queueSize=100
delay.queue.executor.threadNamePrefix=delay-queue-consumer-worker-
delay.queue.executor.shutdownAwaitTime=10s
## 发送端相关配置,不填写,默认开启
delay.queue.p.enabled=true
## 消费端相关配置,不填写,有默认配置
delay.queue.c.enabled=true
delay.queue.c.initialDelay=3s
delay.queue.c.scanInterval=1s
delay.queue.c.shutdownAwaitTime=10s
## 抽象出来的topic主题配置,可以配置多个topic,除了 consumer-handler-name,其他均有默认配置
delay.queue.topics.inviterEventNotify.enabled=true
delay.queue.topics.inviterEventNotify.consumer-handler-name=delayQueueConsumerSkipHandler
delay.queue.topics.inviterEventNotify.consumer-exception-handler-name=delayQueueConsumerExceptionReentrantHandler
delay.queue.topics.inviterEventNotify.fetch-size=10
delay.queue.topics.inviterEventNotify.max-retry=3
delay.queue.topics.inviterEventNotify.retry-interval=5000## 延迟队列服务配置,除了appName,其他可以不用填写,保持系统默认配置。
delay.queue.appName=inviter-activity
delay.queue.executor.coreSize=2
delay.queue.executor.maxSize=10
delay.queue.executor.queueSize=100
delay.queue.executor.threadNamePrefix=delay-queue-consumer-worker-
delay.queue.executor.shutdownAwaitTime=10s
## 发送端相关配置,不填写,默认开启
delay.queue.p.enabled=true
## 消费端相关配置,不填写,有默认配置
delay.queue.c.enabled=true
delay.queue.c.initialDelay=3s
delay.queue.c.scanInterval=1s
delay.queue.c.shutdownAwaitTime=10s
## 抽象出来的topic主题配置,可以配置多个topic,除了 consumer-handler-name,其他均有默认配置
delay.queue.topics.inviterEventNotify.enabled=true
delay.queue.topics.inviterEventNotify.consumer-handler-name=delayQueueConsumerSkipHandler
delay.queue.topics.inviterEventNotify.consumer-exception-handler-name=delayQueueConsumerExceptionReentrantHandler
delay.queue.topics.inviterEventNotify.fetch-size=10
delay.queue.topics.inviterEventNotify.max-retry=3
delay.queue.topics.inviterEventNotify.retry-interval=5000 <!-- 京东公有云redis集群 -->
<bean id="jimClient" class="com.jd.jim.cli.ReloadableJimClientFactoryBean">
<property name="jimUrl" value="${mvn.jimdb.url}"/>
</bean>实现接口DelayQueueConsumerHandler,消费异常不在这个地方处理,有专门的异常处理类
public class DelayQueueConsumerSkipHandler implements DelayQueueConsumerHandler {
private static final Logger logger = LoggerFactory.getLogger(DelayQueueConsumerSkipHandler.class);
@Override
public void onMessage(DelayMessage delayMessage) {
logger.info("消息消费:跳过处理 {}", Jsons.toJson(delayMessage));
}
}默认实现类是 o2o.platform.commons.delay.queue.redis.core.handler.DelayQueueConsumerExceptionReentrantHandler
,原理是拿到失败的消息后,二次投递到topic中,默认重试次数是3次,可以通过delay.queue.topics.xxx.max-retry
自定义重试次数。
也可以实现o2o.platform.commons.delay.queue.redis.core.handler.DelayQueueConsumerExceptionHandler
,配置在delay.queue.topics.xx.consumer-exception-handler-name中。
@RestController
public class DemoController {
private final DelayMessageProducer delayMessageProducer;
public DemoController(DelayMessageProducer delayMessageProducer) {
this.delayMessageProducer = delayMessageProducer;
}
@PostMapping("/delayMessage/send")
public Object send(@RequestBody InviterEventNotifyMessage message,
@RequestParam("topic") String topic,
@RequestParam("delaySeconds") int delaySeconds) {
return delayMessageProducer.send(
new DelayMessage(topic, message, Duration.ofSeconds(delaySeconds)));
}
}- 添加监控接口,依据组件的差异在starter中编写不用的实现。
- 实现第二版,基于rocksdb实现delay-server,以独立的进程运行,实现延迟服务的存储和转发两大特性。
