Event Sourcing 和 CQRS落地(四):CQRS 实现

实现 CQRS

在实现了 EventSoucing 之后,亟待解决的问题是查询了,理论上同一 Service 可以做到多数据源,甚至多数据库,这篇文章就暂时以同一个数据库为例子,同样使用 JPA 去做 view 的 ORM。

建立 entity

第一步当然是建立对应的 Entity 和对应的 Repository 了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Entity
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ContractView implements ContractInterface {

@Id
@Column(length = 64)
private Long id;

private String name;

private String partyA;

private String partyB;

private boolean deleted = false;

}

@Repository
public interface ContractViewRepository extends JpaRepository<ContractView, Long> {

}

实现 view 的存储

接下来我们就实现事件发生之后,view 的存储,这个过程是由一个独立的 EventHandler 来实现的,新建ContractViewHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Component
@Slf4j
@AllArgsConstructor
@Transactional
public class ContractViewHandler {

private final EventSourcingRepository<ContractAggregate> customEventSourcingRepository;

private final ContractViewRepository contractViewRepository;


/**
* 任何 contract 事件发生之后,重新计算 aggregate 的最新状态,转换成 view 之后存储到本地
*
* @param event any event from contract
* @param message domain event wrapper
*/
@EventHandler
public void on(AbstractContractEvent event, DomainEventMessage<AbstractContractEvent> message) {


log.info(MessageFormat.format("{0}: {1} , seq: {2}, payload: {3}", message.getType(), message.getAggregateIdentifier(), message.getSequenceNumber(), message.getPayload()));

updateContractView(message.getAggregateIdentifier());
}

@Transactional
public void updateContractView(String id) {
LockAwareAggregate<ContractAggregate, EventSourcedAggregate<ContractAggregate>> lockAwareAggregate = customEventSourcingRepository.load(id);
ContractAggregate aggregate = lockAwareAggregate.getWrappedAggregate().getAggregateRoot();


ContractView contractView = contractViewRepository.findById(Long.valueOf(id)).orElse(new ContractView());
contractView.setId(aggregate.getIdentifier());
contractView.setDeleted(aggregate.isDeleted());
contractView.setName(aggregate.getName());
contractView.setPartyA(aggregate.getPartyA());
contractView.setPartyB(aggregate.getPartyB());

contractViewRepository.save(contractView);
}
}

接收到事件之后,把 aggregate 使用mapstruct转换到 view ,然后直接保存就完成了 view 的存储。看起来好像事情干完了,然而一启动就发现EventSourcingRepository这个 bean 找不到,实际上这个 bean 是需要我们自己定义的,在 config 中加入:

1
2
3
4
@Bean
public EventSourcingRepository<ContractAggregate> contractAggregateRepository(EventStore eventStore) {
return new EventSourcingRepository<>(ContractAggregate.class, eventStore);
}

启动并发送一个 POST 请求,在查看contract_view表,已经有一条记录被插入。

让 view 存储过程异步

似乎看起来,上面的流程已经没什么问题了,然而实际上是有问题的,我们先来看一张图:
image

这张图描述了一个 Event Soucing 和 CQRS 的理想模型,可以看到 event 存储和 view 的存储应该是分离的,view 的存储是等 event 存储之后异步进行存储,同理事件的发送也是这样,那么我们上面的例子有没有实现 view 的存储是分离的呢,很遗憾,Axon 默认的实现并不是这样的,当 event handler 丢出 runtime exception 之后,事件并没有被存储,也就是说他们应该是都在一个 tranaction 里面。而 Axon 确实提供了 event 的异步处理,官方文档 里有提到过,但是实际上仍然没有解决问题,因为 Axon 在 Subscribe 模式的 handler 调用过程中,并不会等事件事务存储之后再去调用,而是存储的同时依次调用,也就是说虽然可以做到 view 的 handler 异步化,但仍然做不到保证事件的存储之后再去更新 view(理论上 axon 的tracking event 也是可以保证消息发送和 view handler 异步的,但是这个模式下,Axon 会隔一段时间去扫表,并且只有一个节点可以处理,所以我觉得这种方式不太好就没有用 )。所以我们改进一下 view 的流程:
image

这样 view 的存储是分离了,但是事件是否发出却没有得到保证,也就是可靠消息,关于可靠消息这一块,后面将专门搞个话题处理可靠消息以及可靠消息下 view 层的实现,这里为了只涉及 CQRS 就暂时不做深入。

基于 Aggregate 的查询实现

有了 view 的存储之后,查询方式就和以前传统的 jpa 方式一样了,那么我们有些时候需要从 aggregate 查询最新的状态,比如在 view 处理错误的时候,其实 Axon 也提供了相关的实现,从上面的 view handler 也可以看到,我们可以通过一个 repository 去 load 一个 aggregate,那么通过 aggregate 查询的实现也就比较简单了,这里就不贴代码了。

让 Aggregate 可以查询历史状态

首先我们先看以下 Axon 存储的整体结构:

Repository ->EventStore->EventStorageEngine 其中EventStorageEngine 提供了最底层的查询存储功能,EventStore进行封装、过滤和优化,整个 aggregate 的过程就是从 DB 中 fetch 所有的事件(这里会做一个 snapshot 的优化),然后将事件发送出去,那么我们为了尽量不去改写底层的查询,可以在EventStore做一个内存过滤然后向 Repository 输出接口,其实这个效率也还行,因为本身一个对象的事件有限,并不会吃掉很多资源去做过滤这件事情。

自定义 EventStore

这里比较简单,只要参照原来EmbeddedEventStore里面的 readEvents 方法,然后将里面的内容按照时间过滤一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

@Slf4j
public class CustomEmbeddedEventStore extends EmbeddedEventStore {

public CustomEmbeddedEventStore(Builder builder) {
super(builder);
}

public static CustomEmbeddedEventStore.Builder builder() {
return new CustomEmbeddedEventStore.Builder();
}

public DomainEventStream readEvents(String aggregateIdentifier, Instant timestamp) {
Optional<DomainEventMessage<?>> optionalSnapshot;
try {
optionalSnapshot = storageEngine().readSnapshot(aggregateIdentifier);
} catch (Exception | LinkageError e) {
log.warn("Error reading snapshot. Reconstructing aggregate from entire event stream.", e);
optionalSnapshot = Optional.empty();
}
DomainEventStream eventStream;
// 加上时间判断,如果 snapshot 在指定的时间之间,那么可以使用,否则直接读取所有的 events
if (optionalSnapshot.isPresent() && optionalSnapshot.get().getTimestamp().compareTo(timestamp) <= 0) {
DomainEventMessage<?> snapshot = optionalSnapshot.get();
eventStream = DomainEventStream.concat(DomainEventStream.of(snapshot),
storageEngine().readEvents(aggregateIdentifier,
snapshot.getSequenceNumber() + 1));
} else {
eventStream = storageEngine().readEvents(aggregateIdentifier);
}

eventStream = new IteratorBackedDomainEventStream(eventStream.asStream().filter(m -> m.getTimestamp().compareTo(timestamp) <= 0).iterator());

Stream<? extends DomainEventMessage<?>> domainEventMessages = stagedDomainEventMessages(aggregateIdentifier);
return DomainEventStream.concat(eventStream, DomainEventStream.of(domainEventMessages));
}

// 这里为了构建自己的 eventStore 就把 builder 搬过来了,略去了内部实现,具体可以看源码
public static class Builder extends EmbeddedEventStore.Builder {
...
}
}

自定义 Repository

首先分析一下EventSourcingRepository的继承关系:

EventSourcingRepository->LockingRepository->AbstractRepository->Repository 其中Repository是个 interface

其实最好的方式是在 Repository interface 中加一个 load(Long, Instant) 方法,但是这样我们需要改造的地方就有点多了,因为要一层层的添加实现,还是考虑接地气一点,直接在 EventSourcingRepository 中实现这一功能(相当于把几个父类的事情一起干了)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Slf4j
public class CustomEventSourcingRepository<T> extends EventSourcingRepository<T> {

private final CustomEmbeddedEventStore eventStore;
private final SnapshotTriggerDefinition snapshotTriggerDefinition;
private final AggregateFactory<T> aggregateFactory;
private final LockFactory lockFactory;

public static <T> Builder<T> builder(Class<T> aggregateType) {
return new Builder<>(aggregateType);
}

protected CustomEventSourcingRepository(Builder<T> builder) {
super(builder);
this.eventStore = builder.eventStore;
this.aggregateFactory = builder.buildAggregateFactory();
this.snapshotTriggerDefinition = builder.snapshotTriggerDefinition;
this.lockFactory = builder.lockFactory;
}

protected EventSourcedAggregate<T> doLoadWithLock(String aggregateIdentifier, Instant timestamp) {
DomainEventStream eventStream = eventStore.readEvents(aggregateIdentifier, timestamp);

SnapshotTrigger trigger = snapshotTriggerDefinition.prepareTrigger(aggregateFactory.getAggregateType());
if (!eventStream.hasNext()) {
throw new AggregateNotFoundException(aggregateIdentifier, "The aggregate was not found in the event store");
}
EventSourcedAggregate<T> aggregate = EventSourcedAggregate
.initialize(aggregateFactory.createAggregateRoot(aggregateIdentifier, eventStream.peek()),
aggregateModel(), eventStore, trigger);
aggregate.initializeState(eventStream);
if (aggregate.isDeleted()) {
throw new AggregateDeletedException(aggregateIdentifier);
}
return aggregate;
}

protected LockAwareAggregate<T, EventSourcedAggregate<T>> doLoad(String aggregateIdentifier, Instant timestamp) {
Lock lock = lockFactory.obtainLock(aggregateIdentifier);
try {
final EventSourcedAggregate<T> aggregate = doLoadWithLock(aggregateIdentifier, timestamp);
CurrentUnitOfWork.get().onCleanup(u -> lock.release());
return new LockAwareAggregate<>(aggregate, lock);
} catch (Exception ex) {
log.debug("Exception occurred while trying to load an aggregate. Releasing lock.", ex);
lock.release();
throw ex;
}
}

public LockAwareAggregate<T, EventSourcedAggregate<T>> load(String aggregateIdentifier, Instant timestamp) {

if (timestamp == null) {
return this.load(aggregateIdentifier);
}

UnitOfWork<?> uow = CurrentUnitOfWork.get();
Map<String, LockAwareAggregate<T, EventSourcedAggregate<T>>> aggregates = managedAggregates(uow);
LockAwareAggregate<T, EventSourcedAggregate<T>> aggregate = aggregates.computeIfAbsent(aggregateIdentifier,
s -> doLoad(aggregateIdentifier, timestamp));
uow.onRollback(u -> aggregates.remove(aggregateIdentifier));
prepareForCommit(aggregate);

return aggregate;
}

public static class Builder<T> extends EventSourcingRepository.Builder<T> {
...// 这里我略去了 builder class 内部的方法,具体的可以看源码。
}
}

可以看到这里我们在 timestamp 为空的情况下,直接走了原来的 load 方法,剩下的就是参照原来的写法,直接在 CustomEventSourcingRepository 实现几个父类的 loadXXX 方法即可。

另外配置中更新下我们需要的 Repository Bean:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Configuration
@RegisterDefaultEntities(packages = {
"org.axonframework.eventsourcing.eventstore.jpa"
})
public class AxonContinueConfiguration {

@Bean
public CustomEmbeddedEventStore eventStore(EventStorageEngine storageEngine, AxonConfiguration configuration) {
return CustomEmbeddedEventStore.builder()
.storageEngine(storageEngine)
.messageMonitor(configuration.messageMonitor(EventStore.class, "eventStore"))
.build();
}

@Bean
public CustomEventSourcingRepository<ContractAggregate> contractAggregateRepository(CustomEmbeddedEventStore eventStore,
ParameterResolverFactory parameterResolverFactory) {
return CustomEventSourcingRepository.builder(ContractAggregate.class)
.eventStore(eventStore)
.parameterResolverFactory(parameterResolverFactory)
.build();
}

@Bean
public EventStorageEngine eventStorageEngine(Serializer defaultSerializer,
PersistenceExceptionResolver persistenceExceptionResolver,
@Qualifier("eventSerializer") Serializer eventSerializer,
AxonConfiguration configuration,
EntityManagerProvider entityManagerProvider,
TransactionManager transactionManager) {
return JpaEventStorageEngine.builder()
.snapshotSerializer(defaultSerializer)
.upcasterChain(configuration.upcasterChain())
.persistenceExceptionResolver(persistenceExceptionResolver)
.eventSerializer(eventSerializer)
.entityManagerProvider(entityManagerProvider)
.transactionManager(transactionManager)
.build();
}

@Bean
public EventProcessingConfigurer eventProcessingConfigurer(EventProcessingConfigurer eventProcessingConfigurer) {
eventProcessingConfigurer.usingSubscribingEventProcessors();
return eventProcessingConfigurer;
}
}

这里由于 EventStorageEngine 的 Auto Config 在自定义了 eventStore 之后就不起作用了,所以这里把 JpaEventStoreAutoConfiguration 中的内容搬过来了。另外在自定义上面三个 bean 之后默认的 event mode 也莫名其妙的变成了 tracking event,所以这里第四个 bean 对默认的 processor 做了修改。

Query Command 的应用

Axon 基于 Command 这种模式,将查询也做了一部分,这里我们尝试下用它的 Query Command 来实现各种查询。

  1. 添加 Query :
1
2
3
4
5
6
7
8
9
10
11
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class QueryContractCommand {
@NotBlank
@NotNull
private String id;

private Instant endDate;
}
  1. 添加 Handler :
1
2
3
4
5
6
7
8
9
10
11
12
@Component
@AllArgsConstructor
@Slf4j
public class ContractQueryHandler {
private final CustomEventSourcingRepository<ContractAggregate> contractAggregateRepository;

@QueryHandler
public ContractAggregate on(QueryContractCommand command) {
LockAwareAggregate<ContractAggregate, EventSourcedAggregate<ContractAggregate>> lockAwareAggregate = contractAggregateRepository.load(command.getId().toString(), command.getEndDate());
return lockAwareAggregate.getWrappedAggregate().getAggregateRoot();
}
}
  1. Controller 中发送命令:
1
2
3
4
5
6
7
8
private final QueryGateway queryGateway;

@GetMapping("/{id}")
public ContractAggregate getContract(@PathVariable("id") Long id) {
QueryContractCommand command = new QueryContractCommand(id, Instant.now());

return queryGateway.query(command, ContractAggregate.class).join();
}

到这里 CQRS + EventSoucing 的模型就做完了。

完整的例子 - branch session3