Event Sourcing 和 CQRS落地(五):深入使用-Axon

深入使用 Axon

实现 snapshot

在前面文章中,我们略微涉及了一些 snapshot 的概念,其实这个概念还是比较好理解的,当事件堆积到一定的程度,每次 load 都会花费一定的时间,这个时候自然会想到 snapshot,先将一部分事件进行计算,然后生成一个 snapshot,后续 load 的时候先读取 snapshot,这样就省去了很多计算过程。如果你读过之前改写 event store 部分的代码,就会发现每个 aggregate 实际上只会存储一个 snapshot,每当新的生成的时候会替换老的。对一个 aggregate 来说,snapshot 同样和事件差不多,它都可以当做事件来处理,但是对于 aggregate 来说,我并不想去处理 snapshot,我只是需要一个计算好的结果而已。基于这个,Axon 给我们提供了 AggregateSnapshotter,这类 snapshot 可以直接还原 aggregate 状态。

了解大概的原理之后,我们就要做的事情其实比较明确了:

  1. 告诉 Axon 我们要在什么时候触发 snapshot 的生成。
  2. 告诉 Axon 我们要生成什么样的 snapshot。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Bean
    public CustomEventSourcingRepository<ContractAggregate> contractAggregateRepository(CustomEmbeddedEventStore eventStore,
    SnapshotTriggerDefinition snapshotTriggerDefinition,
    ParameterResolverFactory parameterResolverFactory) {
    return CustomEventSourcingRepository.builder(ContractAggregate.class)
    .eventStore(eventStore)
    .snapshotTriggerDefinition(snapshotTriggerDefinition)
    .parameterResolverFactory(parameterResolverFactory)
    .build();
    }

    @Bean
    public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) {
    return new EventCountSnapshotTriggerDefinition(snapshotter, 5);
    }

    @Bean
    public AggregateSnapshotter snapShotter(CustomEmbeddedEventStore eventStore, ParameterResolverFactory parameterResolverFactory) {
    return AggregateSnapshotter.builder()
    .eventStore(eventStore)
    .parameterResolverFactory(parameterResolverFactory)
    .aggregateFactories(Collections.singletonList(new GenericAggregateFactory<>(ContractAggregate.class)))
    .build();
    }
  • SnapshotTriggerDefinition是用来定义 snapshot 策略的,这里使用了 Axon 提供的策略,是基于 event count 的。
  • AggregateSnapshotter告诉 Axon 我们需要生成这类 snapshot,其实还有一类 snapshot 叫SpringAggregateSnapshotter,如果我们使用的是系统自带的EventSourcingRepository,那么可以直接使用这类 snapshot,实质上看代码,它就是取找了EventSourcingRepository这个 bean 的 AggregateFactory。
  • Repository 初始化的时候指定snapshotTriggerDefinition,那么在执行 load 方法的时候,就会去触发了。

启动项目,PUT 某个 aggregate 5次之后,你就会发现snapshot_event_entry这张表里多了记录,snapshot 就生成了。

实现 upcaster

upcaster 的概念也是比较好理解的,比如 create 事件,后期加入了一个新的字段,而老的事件没这个字段,这个时候就需要一个升级过程,把老的事件升级成新的事件,大部分的升级都是单个事件内的增减变动字段,也有比较复杂的情况,一个事件变多个,或者多个变一个等等。在 Axon 中把这个过程称为 upcaster,对应的 interface 是 Upcaster,里面只有一个方法Stream<T> upcast(Stream<T> intermediateRepresentations);看这个方法,理论上来说可以实现 N 对 N 的转换,Axon 提供了 one to one 和 one to many 的实现,这两种转换应该可以满足 90% 以上的需求了,我们在开发过程中本身也应该对删除事件谨慎一点,一般不会去删除一个事件,可以在消费的时候忽略就行了,而不是直接删除这个事件。

那么要进行升级,事件本身必须要有个版本的概念,Axon 提供了@Revision注解用以标明事件的版本,如果没有标注则为 null,由于之前的例子都是没有 version 的,那么我们往 contract 里面添加个字段 industryName。更新event、command、aggregate、model、view,然后给 AbstractEvent加上@Reivision("1.0.0")注解,启动工程并发送一个 GET contracts/{id} 请求原来已经建立的数据,这时候会发现 industryNamenull,下面我们实现默认行业为 “互联网”。

  1. 由于大部分的升级都是针对同一个事件的增减字段,这里建立了SameEventUpCaster
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class SameEventUpCaster extends SingleEventUpcaster {

protected boolean canUpcast(IntermediateEventRepresentation intermediateRepresentation) {

return outputType(intermediateRepresentation.getType()) != null;
}

@Override
protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation intermediateRepresentation) {
return intermediateRepresentation.upcast(
outputType(intermediateRepresentation.getType()),
JsonNode.class,
d -> this.doUpCastPayload(d, intermediateRepresentation),
metaData -> this.doUpCastMetaData(metaData, intermediateRepresentation)
);
}

public SimpleSerializedType outputType(SerializedType originType) {
return new SimpleSerializedType(eventTypeName(), outputRevision(originType.getRevision()));
}

public abstract String eventTypeName();

public abstract String outputRevision(String originRevision);

public abstract JsonNode doUpCastPayload(JsonNode document, IntermediateEventRepresentation intermediateEventRepresentation);

public abstract MetaData doUpCastMetaData(MetaData document, IntermediateEventRepresentation intermediateEventRepresentation);

}
  1. 建立一个处理事件转换的中心,目的是将转换操作分发下去:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ContractEventUpCaster extends SingleEventUpcaster {
private static List<SameEventUpCaster> upCasters = Arrays.asList(
new ContractCreatedEventUpCaster(),
new ContractUpdatedEventUpCaster()
);

@Override
protected boolean canUpcast(IntermediateEventRepresentation intermediateRepresentation) {
return upCasters.stream().anyMatch(o -> o.canUpcast(intermediateRepresentation));
}

@Override
protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation intermediateRepresentation) {
SameEventUpCaster upCaster = upCasters.stream()
.filter(o -> o.canUpcast(intermediateRepresentation))
.findAny().orElseThrow(RuntimeException::new);
return upCaster.doUpcast(intermediateRepresentation);
}
}
  1. 实现各个事件的具体升级方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class ContractCreatedEventUpCaster extends SameEventUpCaster {


@Override
public String eventTypeName() {
return ContractCreatedEvent.class.getTypeName();
}

@Override
public String outputRevision(String originRevision) {
final HashMap<String, String> revisionConvertMpp = new HashMap<>();
revisionConvertMpp.put(null, "1.0.0");

return revisionConvertMpp.get(originRevision);
}

@Override
public JsonNode doUpCastPayload(JsonNode document, IntermediateEventRepresentation intermediateEventRepresentation) {

if (intermediateEventRepresentation.getType().getRevision() == null) {
((ObjectNode) document).put("industryName", "互联网");
}

return document;
}

@Override
public MetaData doUpCastMetaData(MetaData document, IntermediateEventRepresentation intermediateEventRepresentation) {
return document;
}
}

public class ContractUpdatedEventUpCaster extends SameEventUpCaster {


@Override
public String eventTypeName() {
return ContractUpdatedEvent.class.getTypeName();
}

@Override
public String outputRevision(String originRevision) {
final HashMap<String, String> revisionConvertMpp = new HashMap<>();
revisionConvertMpp.put(null, "1.0.0");

return revisionConvertMpp.get(originRevision);
}

@Override
public JsonNode doUpCastPayload(JsonNode document, IntermediateEventRepresentation intermediateEventRepresentation) {

// 每个版本只有一种升级方案,然后链式一步一步升级
if (intermediateEventRepresentation.getType().getRevision() == null) {
((ObjectNode) document).put("industryName", "互联网");
}

return document;
}

@Override
public MetaData doUpCastMetaData(MetaData document, IntermediateEventRepresentation intermediateEventRepresentation) {
return document;
}
}
  1. 添加配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Bean
public EventStorageEngine eventStorageEngine(Serializer defaultSerializer,
PersistenceExceptionResolver persistenceExceptionResolver,
@Qualifier("eventSerializer") Serializer eventSerializer,
EntityManagerProvider entityManagerProvider,
EventUpcaster contractUpCaster,
TransactionManager transactionManager) {
return JpaEventStorageEngine.builder()
.snapshotSerializer(defaultSerializer)
.upcasterChain(contractUpCaster)
.persistenceExceptionResolver(persistenceExceptionResolver)
.eventSerializer(eventSerializer)
.entityManagerProvider(entityManagerProvider)
.transactionManager(transactionManager)
.build();
}
@Bean
public EventUpcaster contractUpCaster() {
return new ContractEventUpCaster();
}

启动项目,再次请求 /contracts/{id} ,数据已经更新了。但是有个问题,那就是 view 视图的数据还没有更新,这部分的数据还是需要编写脚本去做升级的,暂时没有什么更好的办法。

Command 优化

  1. 自定义CommandGateway
    Axon 提供的CommandGateway接口,我们看到任意的 object 都能发送,并且不能附带 MetaData,所以这里我们进行一个自定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface ContractCommandGateway {

// fire and forget
void sendCommand(AbstractCommand command);

// method that will wait for a result for 10 seconds
@Timeout(value = 6, unit = TimeUnit.SECONDS)
Long sendCommandAndWaitForAResult(AbstractCommand command);


// method that will wait for a result for 10 seconds
@Timeout(value = 6, unit = TimeUnit.SECONDS)
void sendCommandAndWait(AbstractCommand command);

// method that attaches meta data and will wait for a result for 10 seconds
@Timeout(value = 6, unit = TimeUnit.SECONDS)
ContractAggregate sendCommandAndWaitForAResult(AbstractCommand command,
@MetaDataValue("userId") String userId);

// this method will also wait, caller decides how long
void sendCommandAndWait(AbstractCommand command, long timeout, TimeUnit unit) throws TimeoutException, InterruptedException;
}
  1. 增加重试机制,由于分布式的问题,event 在存储的时候还是会发生资源争夺,在 InnoDB 下的表现就是 A 节点存储了 seq 为 1 的 event,B 节点在 A 存储前内存中先读取到了 0 ,然后进行存储,这个时候就会有重复的风险,比较好的做法是将同一个 aggregate 的操作尽量分配到一个节点下面去处理,但是事实上完全避免是不太可能的,所以了这里我们需要一个重试机制去做补偿,Axon 也提供了这个机制,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
public class CommandRetryScheduler implements RetryScheduler {

@Override
public boolean scheduleRetry(CommandMessage commandMessage, RuntimeException lastFailure, List<Class<? extends Throwable>[]> failures, Runnable commandDispatch) {
log.info(MessageFormat.format("aggregate [{0}] execute [{1}] retry [{2}] time", commandMessage.getIdentifier(), commandMessage.getCommandName(), failures.size()));

if (failures.size() > 2) {
return false;
}

commandDispatch.run();

return true;
}
}
  1. 截获 command 做一些事情,比如从 Security Context 中获取 user 信息并作为 MetaData 发送,或者为 CreateCommand 类型的 command 自动生成一个 ID,而不用我们手动去构建:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public interface MetaDataUserInterface {

String getName();

Long getUserId();

Long getCustomerId();
}

@Getter
@Setter
@Builder
public class MetaDataUser implements MetaDataUserInterface {

private String name;

private Long userId;

private Long customerId;
}

@AllArgsConstructor
@Configuration
public class CommandInterceptor implements MessageDispatchInterceptor {

private final UIDGenerator uidGenerator;

@Override
public BiFunction<Integer, GenericCommandMessage<AbstractCommand>, GenericCommandMessage<AbstractCommand>> handle(List messages) {
return (index, message) -> {

// create command 自动生成 ID
if (message.getPayload() instanceof CreateContractCommand) {
CreateContractCommand payload = (CreateContractCommand) message.getPayload();
payload.setIdentifier(uidGenerator.getId());
}

// 添加 user info 作为 MetaData,由于项目不设计 security 这里就简单的附加一个假的用户
Map<String, MetaDataUserInterface> map = new HashMap<>();

map.put("user", MetaDataUser.builder().customerId(1L).name("Test").userId(2L).build());
return map.isEmpty() ? message : message.andMetaData(map);
};
}
}

这里为了不涉及 Spring Security,我直接 new 了一个测试对象进去,大家可以取自己的 user。

  1. 最后添加配置代码:
1
2
3
4
5
6
7
8
9
@Bean
public ContractCommandGateway getCommandGateway(SimpleCommandBus simpleCommandBus, CommandInterceptor commandInterceptor) {
return CommandGatewayFactory.builder()
.commandBus(simpleCommandBus)
.retryScheduler(new CommandRetryScheduler())
.dispatchInterceptors(commandInterceptor)
.build()
.createGateway(ContractCommandGateway.class);
}
  1. 最后替换掉 command gateway 的调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private final ContractCommandGateway contractCommandGateway;

@PostMapping
public Long createContract(@RequestBody @Valid CreateContractCommand command) {
return contractCommandGateway.sendCommandAndWaitForAResult(command);
}

@PutMapping("/{id}")
public void updateContract(@PathVariable("id") Long id, @RequestBody @Valid UpdateContractCommand command) {
command.setIdentifier(id);
contractCommandGateway.sendCommandAndWait(command);
}

@DeleteMapping("/{id}")
public void deleteContract(@PathVariable("id") Long id) {
contractCommandGateway.sendCommandAndWait(new DeleteContractCommand(id));
}

这里大家可能注意到,我将原来 async 的发送都改成了 sync 的,因为 async 的调用,如果过程异常,其实这个请求并不会丢出异常,sendCommandAndWaitForAResult command 可以有返回值,这个在官方文档中也有介绍,就不做过多的介绍。

完整的例子 - branch session5