深入使用 Axon
实现 snapshot
在前面文章中,我们略微涉及了一些 snapshot 的概念,其实这个概念还是比较好理解的,当事件堆积到一定的程度,每次 load 都会花费一定的时间,这个时候自然会想到 snapshot,先将一部分事件进行计算,然后生成一个 snapshot,后续 load 的时候先读取 snapshot,这样就省去了很多计算过程。如果你读过之前改写 event store 部分的代码,就会发现每个 aggregate 实际上只会存储一个 snapshot,每当新的生成的时候会替换老的。对一个 aggregate 来说,snapshot 同样和事件差不多,它都可以当做事件来处理,但是对于 aggregate 来说,我并不想去处理 snapshot,我只是需要一个计算好的结果而已。基于这个,Axon 给我们提供了 AggregateSnapshotter
,这类 snapshot 可以直接还原 aggregate 状态。
了解大概的原理之后,我们就要做的事情其实比较明确了:
- 告诉 Axon 我们要在什么时候触发 snapshot 的生成。
- 告诉 Axon 我们要生成什么样的 snapshot。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public CustomEventSourcingRepository<ContractAggregate> contractAggregateRepository(CustomEmbeddedEventStore eventStore,
SnapshotTriggerDefinition snapshotTriggerDefinition,
ParameterResolverFactory parameterResolverFactory) {
return CustomEventSourcingRepository.builder(ContractAggregate.class)
.eventStore(eventStore)
.snapshotTriggerDefinition(snapshotTriggerDefinition)
.parameterResolverFactory(parameterResolverFactory)
.build();
}
public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) {
return new EventCountSnapshotTriggerDefinition(snapshotter, 5);
}
public AggregateSnapshotter snapShotter(CustomEmbeddedEventStore eventStore, ParameterResolverFactory parameterResolverFactory) {
return AggregateSnapshotter.builder()
.eventStore(eventStore)
.parameterResolverFactory(parameterResolverFactory)
.aggregateFactories(Collections.singletonList(new GenericAggregateFactory<>(ContractAggregate.class)))
.build();
}
SnapshotTriggerDefinition
是用来定义 snapshot 策略的,这里使用了 Axon 提供的策略,是基于 event count 的。AggregateSnapshotter
告诉 Axon 我们需要生成这类 snapshot,其实还有一类 snapshot 叫SpringAggregateSnapshotter
,如果我们使用的是系统自带的EventSourcingRepository
,那么可以直接使用这类 snapshot,实质上看代码,它就是取找了EventSourcingRepository
这个 bean 的 AggregateFactory。- Repository 初始化的时候指定
snapshotTriggerDefinition
,那么在执行 load 方法的时候,就会去触发了。
启动项目,PUT 某个 aggregate 5次之后,你就会发现snapshot_event_entry
这张表里多了记录,snapshot 就生成了。
实现 upcaster
upcaster 的概念也是比较好理解的,比如 create 事件,后期加入了一个新的字段,而老的事件没这个字段,这个时候就需要一个升级过程,把老的事件升级成新的事件,大部分的升级都是单个事件内的增减变动字段,也有比较复杂的情况,一个事件变多个,或者多个变一个等等。在 Axon 中把这个过程称为 upcaster,对应的 interface 是 Upcaster
,里面只有一个方法Stream<T> upcast(Stream<T> intermediateRepresentations);
看这个方法,理论上来说可以实现 N 对 N 的转换,Axon 提供了 one to one 和 one to many 的实现,这两种转换应该可以满足 90% 以上的需求了,我们在开发过程中本身也应该对删除事件谨慎一点,一般不会去删除一个事件,可以在消费的时候忽略就行了,而不是直接删除这个事件。
那么要进行升级,事件本身必须要有个版本的概念,Axon 提供了@Revision
注解用以标明事件的版本,如果没有标注则为 null,由于之前的例子都是没有 version 的,那么我们往 contract 里面添加个字段 industryName
。更新event、command、aggregate、model、view,然后给 AbstractEvent
加上@Reivision("1.0.0")
注解,启动工程并发送一个 GET contracts/{id}
请求原来已经建立的数据,这时候会发现 industryName
为 null
,下面我们实现默认行业为 “互联网”。
- 由于大部分的升级都是针对同一个事件的增减字段,这里建立了
SameEventUpCaster
:
1 | public abstract class SameEventUpCaster extends SingleEventUpcaster { |
- 建立一个处理事件转换的中心,目的是将转换操作分发下去:
1 | public class ContractEventUpCaster extends SingleEventUpcaster { |
- 实现各个事件的具体升级方式:
1 | public class ContractCreatedEventUpCaster extends SameEventUpCaster { |
- 添加配置:
1 |
|
启动项目,再次请求 /contracts/{id}
,数据已经更新了。但是有个问题,那就是 view 视图的数据还没有更新,这部分的数据还是需要编写脚本去做升级的,暂时没有什么更好的办法。
Command 优化
- 自定义
CommandGateway
Axon 提供的CommandGateway
接口,我们看到任意的 object 都能发送,并且不能附带 MetaData,所以这里我们进行一个自定义:
1 | public interface ContractCommandGateway { |
- 增加重试机制,由于分布式的问题,event 在存储的时候还是会发生资源争夺,在 InnoDB 下的表现就是 A 节点存储了 seq 为 1 的 event,B 节点在 A 存储前内存中先读取到了 0 ,然后进行存储,这个时候就会有重复的风险,比较好的做法是将同一个 aggregate 的操作尽量分配到一个节点下面去处理,但是事实上完全避免是不太可能的,所以了这里我们需要一个重试机制去做补偿,Axon 也提供了这个机制,代码如下:
1 | 4j |
- 截获 command 做一些事情,比如从 Security Context 中获取 user 信息并作为 MetaData 发送,或者为 CreateCommand 类型的 command 自动生成一个 ID,而不用我们手动去构建:
1 | public interface MetaDataUserInterface { |
这里为了不涉及 Spring Security,我直接 new 了一个测试对象进去,大家可以取自己的 user。
- 最后添加配置代码:
1 |
|
- 最后替换掉 command gateway 的调用:
1 | private final ContractCommandGateway contractCommandGateway; |
这里大家可能注意到,我将原来 async 的发送都改成了 sync 的,因为 async 的调用,如果过程异常,其实这个请求并不会丢出异常,sendCommandAndWaitForAResult
command 可以有返回值,这个在官方文档中也有介绍,就不做过多的介绍。