前言
在实际的业务开发中,经常会碰到一些类似定时的任务,比如每天早上 9 点提醒用户业务进度,或者 30 分钟后锁定某个账单之类的操作。类似这样的业务需求,比较古老的操作就是使用各种定时器来实现,然而面对现在的开发更新速度,以及容器化、微服务架构的流行和普及,原本类似 Crontab
这种简单的东西使用起来会有各种限制,比如在最简单的情况你可以使用 Spring
的 @Scheduled
注解来轻松的实现一些定时需求,但现在你可能就需要考虑多如何将定时任务分配给多个节点,某个时间重新构建容器会不会丢失一些定时任务等等一些问题。这种情况延迟消息能解决掉我们大部分的需求,大概的流程就是丢一个消息给中间件,告诉它什么时候投递出去,投递后再由消费端消费(负责去通知/发邮件/发短信)。由于我们公司一直使用的 Spring Cloud
全家桶,RabbitMQ
也已经使用很多年,这里就讲讲如何在Spring Cloud Stream RabbitMQ
下以最小的成本实现延时消息,所以在阅读文章时可能需要有 Spring Cloud Stream
相关的基础知识。
实现
概述
实际上 RabbitMQ
的延迟消息是通过 rabbitmq-delayed-message-exchange
这个插件实现的,开启了这个插件之后,只要在 exchange
上定义 x-delayed-message
类型,然后在发送消息的时候 header
带上 x-delay
(毫秒数),就可以实现延迟消息。我们可以利用这一特性,来实现消息的延迟/定时发送。
理想的模型是这个样子的,但实际在应用中难免会有一些出入,在文末问题区会做一些相关的讨论。
RabbieMQ 插件安装
由于我们的 RabbitMQ 服务器都是跑在 Docker
容器里,所以这里就相对简单,新建一个 Dockerfile:
1 | FROM rabbitmq:3.7.8-management-alpine |
如果你使用的是 3.8.x 版本,注意修改对应的版本。另外要是不跑在容器当中,RabbitMQ
官网也有说明,这里就不再赘述。
Sprint Cloud Stream 优化
stream
配置有以下几个问题:
- 需要在配置中编写
destination
等配置,然而这些配置往往都是模板化的 - 配置完毕后,代码中还需要将 channel 和 bean 的名称对应上,属于 hard code,如果对不上,或者某一边忘记删除,都会出错
实际上在我们整个微服务的使用来看,大部分的配置内容都是一样的,我们先来看一下比较常见的模板:
1 | xxx-events: # 这里是 input |
如果一个 service 以领域事件向外输出消息,那么 RabbitMQ
的模板长成这个样子是比较常见的,统一由 yyy-events 发出领域事件,通过 xxx-events(各种领域)来接收领域事件。这样实际我们只需要根据服务的名称去生成对应的配置即可,如果你自己的业务有所不同,则需要根据自己的使用方式来生成配置。
下面给出实现上述配置的代码:
1 | @Configuration |
延迟消息的实现
这里我以实现一个小 Demo 来展示所要实现的东西,这个 Demo 实现发送延迟消息,并自我接收后消费消息。为了节省篇幅,这里略去 Demo 工程的建立以及配置过程,文末将附上地址。
- 建立输入输出的
interface
:
1 | public interface BindingChannelOutput { |
注意这里和一般的 Stream
例子 有一些不一样, 用到了我们之前优化的注解,isDelayedExchange
可以指定为 delay exchange
。
- 再建立一个
publisher
用于发送消息,以及一个Listener
用于接收消息
1 | @Component |
启动工程后在 RabbitMQ
的控制台应该可以看到两个 queue
,一个是 demo-events.demo-service
另一个是 demo-service-error.dlq
,这说明我们的配置优化生效了。
- 写一个接口来测试延迟消息
1 | @RestController |
在控制台可以看到打印时间相差 20s
1 | 2020-03-10 17:17:40.691 INFO 77487 --- [nio-8080-exec-5] com.example.demo.Publisher : pushing message ["测试"] to remote |
至此延迟消息的实现就完成了,具体到业务的话,都是在消费端进行,比如做一些邮件/短信发送或者通知之类的。
问题&思考
发出去的消息不可查询
当我们的业务服务向 RabbitMQ
发送消息后,RabbitMQ
在没有到时间的时候是看不到的,那么要保证可靠,就需要在业务服务中记录是否投递到 RabbitMQ
,可以参考可靠消息的投递,在本地做记录。
延迟时间的限制
文档中提到过延迟的最大毫秒数是 2^32-1 ,大概是 49 天多一点,不过一般也不会往 RabbitMQ
堆这么长时间的消息。一般如果是 1-2 天内需要推送/发通知的业务,我们会直接丢给 RabbitMQ
,剩下的比如定了一个一年后的提醒之类的业务,都采用在夜间流量少的时候去查一下第二天需要发送的内容,然后丢到 RabbitMQ
来完成。所以定时之类的操作根据业务的具体情况可能还会存在,但是会大大减少。
禁用插件后所有的未发布的消息会丢失
这个其实还好,一般不会去做这样的操作。
delay exchange 的路由类型
路由类型还是取决于 exchange 本身的类型,由于 Spring Cloud Stream
默认都是 topic 类型,所以这里保持一致。