Event Sourcing 和 CQRS落地(三):Event-Sourcing 实现

实现 Event Soucing

在了解了相关的基础之后,这里会以最简单的方式实现一个 EventSourcing 的例子,然后逐渐在之后的过程去丰富,本篇内容会实现一个将增删改操作使用 EventSoucing 取代的例子,读取部分暂时不做涉及。

Spring Boot 工程搭建

打开 http://start.spring.io/ 选择对应的版本(这里是 2.1.5 )以及相应的依赖,这里多选了一些之后会用到的服务:

image

添加配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
application:
name: event-sourcing-service
datasource:
url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE
username: root
password: root
jpa:
hibernate:
ddl-auto: update
use-new-id-generator-mappings: false
show-sql: false
properties:
hibernate.dialect: org.hibernate.dialect.MySQL55Dialect

为了便于测试,这里我开启了 JPA 的自动更新,开发中你可能会使用 flyway 或者其他工具来管理数据库 schema 以及数据迁移。至此一个简单的服务就搭建完毕了。

Axon 依赖和配置

  1. 依赖添加

搭建好了工程之后,我们正式开始做一些 Axon 相关的事情,这个工程我们用一个简单的 Contract 业务来作为demo,首先添加依赖:
添加 Axon 依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- https://mvnrepository.com/artifact/org.axonframework/axon-spring-boot-starter -->
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.1.1</version>
<exclusions>
<exclusion>
<groupId>org.axonframework</groupId>
<artifactId>axon-server-connector</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava axon 依赖了 guava-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.1-jre</version>
</dependency>

解释一下,axon 从 4.0 开始加入了 axon-server 这么一个东西,目的是将 event 的存储和分发剥离,以更好的发挥微服务的优点,但是在项目中引用这么一个不透明的东西感觉上不太好,所以这里就不采用 axon-server 了。axon-spring-boot-starter 就会采用 EmbeddedEventStore ,默认使用的是 JPA ,启动之后你也会发现 JPA 在数据库中创建了 5 张表,分别是 association_value_entry domain_event_entry saga_entry snapshot_event_entry token_entrydomain_event_entry 用来存储事件,snapshot_event_entry 用来存储快照,token_entry 用来记录 tracking event 的争夺,其他两张表用来存户 saga。

  1. Axon 的配置
1
2
3
axon:
serializer:
general: jackson

这里指定使用了Jackson来进行序列化,模式 Axon 是使用 XML 进行序列化的,不方便查看,并且之后的事件升级都会很麻烦,所以进行了替换。

Axon Domain Model 定义

  1. 定义 aggregate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public interface ContractInterface {

Long getId();

@NotBlank
String getName();

@NotBlank
String getPartyA();

@NotBlank
String getPartyB();
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Aggregate
public class ContractAggregate implements ContractInterface {

@AggregateIdentifier
private Long id;

private String name;

private String partyA;

private String partyB;

private boolean deleted = false;
}
  1. 定义 commands
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class AbstractCommand {
@TargetAggregateIdentifier
private Long identifier;
}

@Getter
@Setter
@NoArgsConstructor
public class UpdateContractCommand extends AbstractCommand implements ContractInterface {

private String name;

private String partyA;

private String partyB;

public UpdateContractCommand(Long identifier, String name, String partyA, String partyB) {
super(identifier);
this.name = name;
this.partyA = partyA;
this.partyB = partyB;
}
}

@Getter
@Setter
@NoArgsConstructor
public class CreateContractCommand extends UpdateContractCommand {

public CreateContractCommand(Long identifier, String name, String partyA, String partyB) {
super(identifier, name, partyA, partyB);
}
}

@NoArgsConstructor
@Getter
@Setter
public class DeleteContractCommand extends AbstractCommand {
public DeleteContractCommand(Long identifier) {
super(identifier);
}
}
  1. 定义 events
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class AbstractEvent {

@TargetAggregateIdentifier
private Long identifier;
}


@Getter
@Setter
@NoArgsConstructor
public class ContractUpdatedEvent extends AbstractEvent implements ContractInterface {

private String name;

private String partyA;

private String partyB;

public ContractUpdatedEvent(Long identifier, String name, String partyA, String partyB) {
super(identifier);
this.name = name;
this.partyA = partyA;
this.partyB = partyB;
}
}

@Getter
@Setter
@NoArgsConstructor
public class ContractCreatedEvent extends ContractUpdatedEvent {

public ContractCreatedEvent(Long identifier, String name, String partyA, String partyB) {
super(identifier, name, partyA, partyB);
}
}

@Getter
@Setter
@NoArgsConstructor
public class ContractDeletedEvent extends AbstractEvent {

public ContractDeletedEvent(Long identifier) {
super(identifier);
}
}

这里只抽象了统一的 command 和 event,在实际的业务开发过程中可以有更多的抽象。

实现各个 Handler

在这里我们实现 Event Soucing 的模式,把各个 Handler 都放在 aggregate 里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@CommandHandler
public ContractAggregate(CreateContractCommand command, MetaData metaData, UIDGenerator generator) {
if (null == command.getIdentifier()) {
command.setIdentifier(generator.getId());
}
AggregateLifecycle.apply(new ContractCreatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData);
}

@CommandHandler
private void on(UpdateContractCommand command, MetaData metaData) {
AggregateLifecycle.apply(new ContractUpdatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData);
}

@CommandHandler
private void on(DeleteContractCommand command, MetaData metaData) {
AggregateLifecycle.apply(new ContractDeletedEvent(command.getIdentifier()), metaData);
}

@EventSourcingHandler
private void on(ContractCreatedEvent event) {
this.setIdentifier(event.getIdentifier());
this.onUpdate(event);
}

@EventSourcingHandler
private void onUpdate(ContractUpdatedEvent event) {
this.setName(event.getName());
this.setPartyA(event.getPartyA());
this.setPartyB(event.getPartyB());
}

@EventSourcingHandler(payloadType = ContractDeletedEvent.class)
private void on() {
this.setDeleted(true);
}
  • 这里看到有一个 CommandHandler 的注解写在了构造方法上,那么在处理这个 Command 的时候,将会自动创建一个对象。另外这里的 MetaData 是在 command 发送的时候顺带的附加信息,可以是用户信息,机器信息等等,后续也会涉及这部分,这里就不深入了。

  • 启动项目后,JPA 应该会在数据库中生成几张表,其中 worker_id 是之前我们编写的 Spring Cloud 的 ID 生成器所产生的,剩余的表都是 Axon 自己产生的,这里我使用的数据库并没有使用mb4编码,因为在mb4编码下 Axon 的索引会过长,这个也不是问题,因为实际开发过程中,我们可以将生成的语句自己修改了下,将主键的长度改小一点即可,后面在完善的过程中也会涉及到。

编写接口

aggreate 以及各个 handler 都已经写完了,那么我们开始编写接口,让工程可以顺利的跑起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@RestController
@RequestMapping("/contracts")
@AllArgsConstructor
public class ContractController {

private final CommandGateway commandGateway;

@PostMapping
public void createContract(@RequestBody @Valid CreateContractCommand command) {
commandGateway.send(command);
}

@PutMapping("/{id}")
public void updateContract(@PathVariable("id") Long id, @RequestBody @Valid UpdateContractCommand command) {
command.setIdentifier(id);
commandGateway.send(command);
}

@DeleteMapping("/{id}")
public void deleteContract(@PathVariable("id") Long id) {
commandGateway.send(new DeleteContractCommand(id));
}
}

启动工程并顺序执行 POST UPDATE DELETE 操作,你会发现domain_event_entry中多了三条记录,这张表就是用来记录事件的,可以看到这里详细的记录了每个事件的发生时间、内容、附加信息、类型等信息。至此本次 Event Soucing 的例子就结束了,主要将传统的增删改操作改造成了事件记录的形式。下次将会从 CQRS 的角度实现数据的读取。

完整的例子 - branch session4