Spring Cloud Stream 优化
有哪些问题
Spring Cloud Stream
(以下简称 SCS )是 Spring Cloud 提供的消息中间件的抽象,但是目前也就支持 kafka 和 rabbitmq,这篇文章主要会讨论一下如何让 SCS 更好的服务我们之前搭建的 Event Sourcing、CQRS 模型。以下是我在使用 SCS 的过程中存在的一些问题:
StreamListener
用来做事件路由分发并不是很理想,SPEL 可能会写的很长(我尝试过用自定义注解代替原生的注解,从而达到简化的目的,但是会出现一些莫名其妙的事件混乱)。- 如果配合之前的模型使用,我们需要保证消息的顺序消费,每个方法都需要去 check 事件的当前 seq,很不方便。
- 在没有 handler 处理某个 type 的事件时,框架会给出一个 warn,然而这个事件可能在 consumer 这里根本不关心。
解决方案
为了解决上面的问题,我们可以这么处理,先统一一个入口将 SCS 的消息接收,然后我们自己构建一个路由系统,将请求分发到我们自己定义的注解方法上,并且在这个过程中将 seq 的检查也给做了,大体的流程是这个样子的:
这样以上几点问题都会得到解决,下面我们来看看具体如何实现:
- 首先定义一个注解用于接受自己分发的事件:
1 |
|
types 对应 Stream 本身 Inuput 的类型, payloadTypes 对应事件类型,比如 ContractCreated
,我们要做的效果是这个 payloadTypes 可以不写,直接从方法的第一个参数读取 class 的 simapleName。
- 定义用于记录 aggregate sequenceNumber 的 entity 和 repository :
1 |
|
- 由于暂时没有找到监听所有已绑定 channel 的事件的方法,这里实现一个类提供一个 dispatch 的方法用于分发:
1 | 4j |
这里参照了 SCS 本身手机 handler的方式,会将有 @StreamEventHandler
注解的方法都找出来做一个记录。在 dispatchEvent 的时候会更新事件的 seq 并且按照 type 去调用各个标有注解的方法。
- 实现一个比较简单的例子:
1 | 4j |
注意:
AbstractDomainEventDispatcher
中监听所有 bean 加载完成不能用 InitializingBean 接口,否则@Transactional
会失效,这个有兴趣的同学可以研究一下@Transactional
的机制。
至此以上几点就优化完了。
其他优化
错误处理
基于 SCS 的默认配置,存在一个致命的问题,那就是当消息处理失败(重试三次)之后,消息直接没了,这个相当于就是消息丢失了。那么解决方案其实也是比较简单的,一般有两种解决方案:
- 拒绝这个消息,丢在 broker 原先的队列里。
- 将这个消息记录到一个错误的 queue 中等待修复,后续可能将消息转发回去,也可能直接就删除了消息(比如重复的消息)。
方案 1 这么做可能会出的问题就是,这个消息反复消费,反复失败,引起循环问题从而导致服务出现问题,这个就需要在 broker 做一些策略配置了,为了让 broker 尽可能的简单,我们这里采用方案 2,要实现的流程是这样的:
- 首先让 SCS 为我们自动生成一个 DLQ
1 | spring: |
加上这个配置之后,rabbit 会给这个队列创建一个 .dlq 后缀的队列,异常消息都会被塞到这个队列里面(消息中包含了异常信息以及来源),等待我们处理,deadLetterQueueName
指定了 DLQ 的名称,这样所有的失败消息都会存放到同一个 queue中。大部分的情况下,消息的异常都是由于 consumer 逻辑错误引起的,所以我们需要一个处理这些失败的消息的地方,比如在启动的时候自动拉取 DLQ 中的消息然后转发到原来的 queue 中去远程原有的业务逻辑,如果处理不了那么还是会继续进入到 DLQ 中。
- 在启动的时候拉取 DLQ 中的消息转发到原来的 queue 中。
1 |
|
由于 SCS 没有提供给我们类似的接口,这里使用了 rabbitmq 的接口来获取消息。
完善之前的 CQRS 例子
经常上述这些基础操作之后,汇过来实现 CQRS 就比较清晰了,只需要监听相关的事件,然后更新视图层即可。
- 添加时间的监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16(types = ChannelDefinition.CONTRACTS_INPUT)
public void handle(ContractCreatedEvent event, DomainEvent<ContractCreatedEvent, HashMap> domainEvent) {
QueryContractCommand command = new QueryContractCommand(event.getIdentifier(), domainEvent.getTimestamp());
ContractAggregate aggregate = queryGateway.query(command, ContractAggregate.class).join();
ContractView view = new ContractView();
view.setIndustryName(aggregate.getIndustryName());
view.setId(aggregate.getIdentifier());
view.setPartyB(aggregate.getPartyB());
view.setPartyA(aggregate.getPartyA());
view.setName(aggregate.getName());
view.setDeleted(aggregate.isDeleted());
contractViewRepository.save(view);
}
StreamDomainEventDispatcher
对传参做了一些处理,当有两个参数的时候会将 DomainEvent
传递,因为有些时候可能会用到一些字段,比如时间、附加信息等等。这里在消费事件的时候,可以根据时间去查询 aggregate 的状态,然后直接做一个映射,也可以根据事件直接对 view 层做 CUD ,个人觉得在性能和速度不存在大问题的时候直接去查询一下 aggregate 当时的状态做一个映射即可,毕竟比较简单。
- 删除原来的
ContractViewHandler
即可。