实现 CQRS
在实现了 EventSoucing 之后,亟待解决的问题是查询了,理论上同一 Service 可以做到多数据源,甚至多数据库,这篇文章就暂时以同一个数据库为例子,同样使用 JPA 去做 view 的 ORM。
建立 entity
第一步当然是建立对应的 Entity 和对应的 Repository 了:
1 |
|
实现 view 的存储
接下来我们就实现事件发生之后,view 的存储,这个过程是由一个独立的 EventHandler 来实现的,新建ContractViewHandler
:
1 |
|
接收到事件之后,把 aggregate 使用mapstruct
转换到 view ,然后直接保存就完成了 view 的存储。看起来好像事情干完了,然而一启动就发现EventSourcingRepository
这个 bean 找不到,实际上这个 bean 是需要我们自己定义的,在 config 中加入:
1 |
|
启动并发送一个 POST 请求,在查看contract_view
表,已经有一条记录被插入。
让 view 存储过程异步
似乎看起来,上面的流程已经没什么问题了,然而实际上是有问题的,我们先来看一张图:
这张图描述了一个 Event Soucing 和 CQRS 的理想模型,可以看到 event 存储和 view 的存储应该是分离的,view 的存储是等 event 存储之后异步进行存储,同理事件的发送也是这样,那么我们上面的例子有没有实现 view 的存储是分离的呢,很遗憾,Axon 默认的实现并不是这样的,当 event handler 丢出 runtime exception 之后,事件并没有被存储,也就是说他们应该是都在一个 tranaction 里面。而 Axon 确实提供了 event 的异步处理,官方文档 里有提到过,但是实际上仍然没有解决问题,因为 Axon 在 Subscribe
模式的 handler 调用过程中,并不会等事件事务存储之后再去调用,而是存储的同时依次调用,也就是说虽然可以做到 view 的 handler 异步化,但仍然做不到保证事件的存储之后再去更新 view(理论上 axon 的tracking event
也是可以保证消息发送和 view handler 异步的,但是这个模式下,Axon 会隔一段时间去扫表,并且只有一个节点可以处理,所以我觉得这种方式不太好就没有用 )。所以我们改进一下 view 的流程:
这样 view 的存储是分离了,但是事件是否发出却没有得到保证,也就是可靠消息,关于可靠消息这一块,后面将专门搞个话题处理可靠消息以及可靠消息下 view 层的实现,这里为了只涉及 CQRS 就暂时不做深入。
基于 Aggregate 的查询实现
有了 view 的存储之后,查询方式就和以前传统的 jpa 方式一样了,那么我们有些时候需要从 aggregate 查询最新的状态,比如在 view 处理错误的时候,其实 Axon 也提供了相关的实现,从上面的 view handler 也可以看到,我们可以通过一个 repository 去 load 一个 aggregate,那么通过 aggregate 查询的实现也就比较简单了,这里就不贴代码了。
让 Aggregate 可以查询历史状态
首先我们先看以下 Axon 存储的整体结构:
Repository
->EventStore
->EventStorageEngine
其中EventStorageEngine
提供了最底层的查询存储功能,EventStore
进行封装、过滤和优化,整个 aggregate 的过程就是从 DB 中 fetch 所有的事件(这里会做一个 snapshot 的优化),然后将事件发送出去,那么我们为了尽量不去改写底层的查询,可以在EventStore
做一个内存过滤然后向 Repository 输出接口,其实这个效率也还行,因为本身一个对象的事件有限,并不会吃掉很多资源去做过滤这件事情。
自定义 EventStore
这里比较简单,只要参照原来EmbeddedEventStore
里面的 readEvents 方法,然后将里面的内容按照时间过滤一下。
1 |
|
自定义 Repository
首先分析一下EventSourcingRepository
的继承关系:
EventSourcingRepository
->LockingRepository
->AbstractRepository
->Repository
其中Repository
是个 interface
其实最好的方式是在 Repository
interface 中加一个 load(Long, Instant)
方法,但是这样我们需要改造的地方就有点多了,因为要一层层的添加实现,还是考虑接地气一点,直接在 EventSourcingRepository
中实现这一功能(相当于把几个父类的事情一起干了)。
1 | 4j |
可以看到这里我们在 timestamp 为空的情况下,直接走了原来的 load 方法,剩下的就是参照原来的写法,直接在 CustomEventSourcingRepository
实现几个父类的 loadXXX
方法即可。
另外配置中更新下我们需要的 Repository Bean:
1 |
|
这里由于 EventStorageEngine 的 Auto Config
在自定义了 eventStore 之后就不起作用了,所以这里把 JpaEventStoreAutoConfiguration
中的内容搬过来了。另外在自定义上面三个 bean 之后默认的 event mode 也莫名其妙的变成了 tracking event
,所以这里第四个 bean 对默认的 processor 做了修改。
Query Command 的应用
Axon 基于 Command 这种模式,将查询也做了一部分,这里我们尝试下用它的 Query Command 来实现各种查询。
- 添加 Query :
1 |
|
- 添加 Handler :
1 |
|
- Controller 中发送命令:
1 | private final QueryGateway queryGateway; |
到这里 CQRS + EventSoucing 的模型就做完了。