Event Sourcing 和 CQRS落地(六):实现可靠消息

实现可靠消息

什么是可靠消息

微服务盛行的时代下,消息成为了不可缺少的组件,首先我们看一个例子,contract 系统创建了一个合同,然后发送创建合同的消息。看似简单,实际上分析一下它的出错可能性,会有以下几种:

  1. 创建合同成功,发送消息失败;
  2. 创建合同失败,发送消息成功;
  3. 创建合同成功,发送消息成功;
  4. 创建合同失败,发送消息失败;

同时成功或者同时失败,这个情况是一致的,是正确的,我们需要关心的就是不一致的情况。那么最简单的办法,就是让创建合同和发送消息成为一个事务,要么一起成功,要么一起失败,但是这么做的话耦合性太强,本身合同创建成功了,却因为消息发送的失败强制回滚。这个时候,可能就想到了存储消息数据,将合同创建和消息数据的存储作为一个事务,消息发送成功之后再去删除消息数据,定期去扫描未发送的消息数据,来保证消息的发送。但是这么做还是有一定的代价的,需要实现消息的存储,消息存储和合同创建还是耦合在一起的,不过这样的模式到 Event Sourcing 下面那就比较理想了,因为本身消息数据和 event 是一样的,存储了 event 相当于完成了存储消息数据,只需要在 event 下做一个标记即可。

做完了上面这些,就能保证消息一定从 producer 到 broker 这一过程,当然要做到消息不丢,必然产生的结果就是消息可能会重复,情况就是 broker 收到了消息,但是没有通知到 producer,这种情况下 producer 是认为消息没有投递成功的,会出现重复投递的情况。保证了消息一定送达 broker 之后,就是 consumer 和 broker 的关系了,consumer 在消费之后需要告诉 broker 消费成功,否则 broker 需要一直保存这些消息。当然消费端可能需要做更多的事情,比如保证同一 aggregate 事件的顺序消费。下面文章会以在 Axon 框架上做一些拓展,以分别实现 consumer 和 producer。

拓展 DomainEventEntry

上面也说到了,在 Event Sourcing 模式下,我们只需要给事件加上一个是否投递的标志,这里我们就看看如何实现(这里只针对 JPA 做了实现)。

  1. 建立对应的 entity 以取代DomainEventEntry
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Entity(name = "DomainEventEntry")
@Getter
@Setter
@Table(indexes = @Index(columnList = "aggregateIdentifier,sequenceNumber", unique = true))
public class CustomDomainEventEntry extends AbstractSequencedDomainEventEntry<byte[]> {

@NotNull
@Column(columnDefinition = "tinyint(1) default 0")
private boolean sent = false;

public CustomDomainEventEntry(DomainEventMessage<?> eventMessage, Serializer serializer) {
super(eventMessage, serializer, byte[].class);
this.setSent(false);
}

/**
* Default constructor required by JPA
*/
protected CustomDomainEventEntry() {
}
}
  1. 建立对应的 storage 以取代 JpaEventStorageEngine:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class CustomJpaEventStorageEngine extends JpaEventStorageEngine {

public CustomJpaEventStorageEngine(Builder builder) {
super(builder);
}

@Override
protected Object createEventEntity(EventMessage<?> eventMessage, Serializer serializer) {
return new CustomDomainEventEntry(asDomainEventMessage(eventMessage), serializer);
}

public static CustomJpaEventStorageEngine.Builder builder() {
return new CustomJpaEventStorageEngine.Builder();
}

// 此处略去了 builder 部分代码
public static class Builder extends JpaEventStorageEngine.Builder {
...
}
}
  1. 更新对应的 config:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public EventStorageEngine eventStorageEngine(Serializer defaultSerializer,
PersistenceExceptionResolver persistenceExceptionResolver,
@Qualifier("eventSerializer") Serializer eventSerializer,
EntityManagerProvider entityManagerProvider,
EventUpcaster contractUpCaster,
TransactionManager transactionManager) {
return CustomJpaEventStorageEngine.builder()
.snapshotSerializer(defaultSerializer)
.upcasterChain(contractUpCaster)
.persistenceExceptionResolver(persistenceExceptionResolver)
.eventSerializer(eventSerializer)
.entityManagerProvider(entityManagerProvider)
.transactionManager(transactionManager)
.build();
}

handler 实现可靠消息的生产端

做好了准备工作再发送消息就比较清晰了,我们需要做的就是在事件存储后去尝试发送消息,然后标记为已发送即可,在之前的 实现 CQRS 中我们留了一个坑,就是 view 端的更新不是在事件存储之后,这里我们就去实现发消息在事件存储之后,然后 view 层去监听消息更新。具体的实现就是利用 entity postPersist 去监听存储,在 transaction 成功后去尝试发送消息,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

@EntityListeners(CustomDomainEventEntryListener.class)
public class CustomDomainEventEntry extends AbstractSequencedDomainEventEntry<byte[]> {
...
}

@Component
@Slf4j
public class CustomDomainEventEntryListener {
private static CustomDomainEventEntryRepository customDomainEventEntryRepository;

private static ContractPublisher contractPublisher;

@Autowired
public void init(CustomDomainEventEntryRepository customDomainEventEntryRepository, ContractPublisher contractPublisher) {
this.customDomainEventEntryRepository = customDomainEventEntryRepository;
this.contractPublisher = contractPublisher;
}

@PostPersist
void onPersist(CustomDomainEventEntry entry) {

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

@Override
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
CompletableFuture.runAsync(() -> sendEvent(entry.getEventIdentifier()));
}
}
});
}

@Transactional
public void sendEvent(String identifier) {
CustomDomainEventEntry eventEntry = customDomainEventEntryRepository.findByEventIdentifier(identifier);

if (!eventEntry.isSent()) {
contractPublisher.sendEvent(eventEntry);
eventEntry.setSent(true);
customDomainEventEntryRepository.save(eventEntry);
}
}
}

@Repository
public interface CustomDomainEventEntryRepository extends JpaRepository<CustomDomainEventEntry, String> {

/**
* 查找事件
*
* @param identifier
*
* @return
*/
CustomDomainEventEntry findByEventIdentifier(String identifier);
}

@Component
@AllArgsConstructor
@Slf4j
public class ContractEventPublisher {

public void sendEvent(DomainEvent event) {
// use stream to send message here
log.info(MessageFormat.format("prepare to sending message : {0}]", new Gson().toJson(event)));
}

public void sendEvent(CustomDomainEventEntry event) {
// use com.craftsman.eventsourcing.stream to send message here
ObjectMapper mapper = new ObjectMapper();

HashMap payload = null;
HashMap metaData = null;
try {
payload = mapper.readValue(event.getPayload().getData(), HashMap.class);
metaData = mapper.readValue(event.getMetaData().getData(), HashMap.class);
} catch (Exception exception) {
log.error(MessageFormat.format("byte[] to string failed; exception: {0}", exception));
}

if (payload == null || metaData == null) {
log.warn(MessageFormat.format("nothing to send; exception: {0}", event.getEventIdentifier()));
return;
}

DomainEvent domainEvent = new DomainEvent(
event.getType(),
event.getAggregateIdentifier(),
event.getPayload().getType().getName(),
event.getPayload().getType().getRevision(),
event.getSequenceNumber(),
event.getEventIdentifier(),
event.getTimestamp(),
payload,
metaData);

this.sendEvent(domainEvent);
}
}
  • DomainEvent 是为了统一消息的格式包装的类,具体可以看代码这里就不贴了
  • ContractEventPublisher 作为消息统一发送出口,为了不涉及 rabbitmq 暂时以 log 的形式代替消息发送,后续在 Spring Cloud Stream 优化中实现完整的流程

实现消费端

DomainEvent的属性中,我们可以看到有一个sequenceNumber字段,这个字段可以用来保证同一 aggregate 的事件顺序,那么在消费端可以以 type aggregate sequenceNumber 形成一张表,用来记录每个 aggregate 的最新状态,如果 aggregate 数据量比较大,也可以分表存储,一般 aggregate_id 索引之后,单表性能在百万级别,应该都没什么问题。这样在消费的时候先比较 sequenceNumber 差异,只消费差异值为 1 的事件,就可以保证同一 aggregate 的事件被顺序消费。之后会写篇关于 Spring Cloud Stream 的文章,用来作为服务之间的桥梁,并解决框架用 header 作为路由之后引起的问题,这里暂时不做深入。

完整的例子 - branch session7