Event Sourcing 和 CQRS落地(七):Spring-Cloud-Stream 优化

Spring Cloud Stream 优化

有哪些问题

Spring Cloud Stream(以下简称 SCS )是 Spring Cloud 提供的消息中间件的抽象,但是目前也就支持 kafka 和 rabbitmq,这篇文章主要会讨论一下如何让 SCS 更好的服务我们之前搭建的 Event Sourcing、CQRS 模型。以下是我在使用 SCS 的过程中存在的一些问题:

  1. StreamListener用来做事件路由分发并不是很理想,SPEL 可能会写的很长(我尝试过用自定义注解代替原生的注解,从而达到简化的目的,但是会出现一些莫名其妙的事件混乱)。
  2. 如果配合之前的模型使用,我们需要保证消息的顺序消费,每个方法都需要去 check 事件的当前 seq,很不方便。
  3. 在没有 handler 处理某个 type 的事件时,框架会给出一个 warn,然而这个事件可能在 consumer 这里根本不关心。

    解决方案

为了解决上面的问题,我们可以这么处理,先统一一个入口将 SCS 的消息接收,然后我们自己构建一个路由系统,将请求分发到我们自己定义的注解方法上,并且在这个过程中将 seq 的检查也给做了,大体的流程是这个样子的:

image

这样以上几点问题都会得到解决,下面我们来看看具体如何实现:

  • 首先定义一个注解用于接受自己分发的事件:
1
2
3
4
5
6
7
8
9
10

@Target( {ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface StreamEventHandler {

String[] payloadTypes() default {""};

String[] types();
}

types 对应 Stream 本身 Inuput 的类型, payloadTypes 对应事件类型,比如 ContractCreated,我们要做的效果是这个 payloadTypes 可以不写,直接从方法的第一个参数读取 class 的 simapleName。

  • 定义用于记录 aggregate sequenceNumber 的 entity 和 repository :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Entity
@Table(indexes = @Index(columnList = "aggregateIdentifier,type", unique = true))
@Getter
@Setter
@NoArgsConstructor
public class DomainAggregateSequence {

@Id
@GeneratedValue
private Long id;

private Long sequenceNumber;

private Long aggregateIdentifier;

private String type;
}

@Repository
public interface DomainAggregateSequenceRepository extends JpaRepository<DomainAggregateSequence, Long> {

/**
* 根据 aggregate id 和 type 找到对应的记录
*
* @param identifier
* @param type
*
* @return
*/
DomainAggregateSequence findByAggregateIdentifierAndType(Long identifier, String type);

}
  • 由于暂时没有找到监听所有已绑定 channel 的事件的方法,这里实现一个类提供一个 dispatch 的方法用于分发:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@Slf4j
@Component
@AllArgsConstructor
public class StreamDomainEventDispatcher implements BeanPostProcessor {

private final ObjectMapper mapper;

private final DomainAggregateSequenceRepository domainAggregateSequenceRepository;

private HashMap<Object, List<Method>> beanHandlerMap = new HashMap<>();

@Autowired
public StreamDomainEventDispatcher(ObjectMapper mapper, DomainAggregateSequenceRepository domainAggregateSequenceRepository) {
this.mapper = mapper;
this.domainAggregateSequenceRepository = domainAggregateSequenceRepository;
}

@Transactional
public void dispatchEvent(DomainEvent event, String type) {

log.info(MessageFormat.format("message [{0}] received", event.getEventIdentifier()));

// 1. 检查是否是乱序事件或者重复事件
Long aggregateIdentifier = Long.parseLong(event.getAggregateIdentifier());
String eventType = event.getType();
Long eventSequence = event.getSequenceNumber();

DomainAggregateSequence sequenceObject = domainAggregateSequenceRepository.findByAggregateIdentifierAndType(aggregateIdentifier, eventType);

if (sequenceObject == null) {
sequenceObject = new DomainAggregateSequence();
sequenceObject.setSequenceNumber(eventSequence);
sequenceObject.setAggregateIdentifier(aggregateIdentifier);
sequenceObject.setType(eventType);
} else if (sequenceObject.getSequenceNumber() + 1 != eventSequence) {
// 重复事件,直接忽略
if (sequenceObject.getSequenceNumber().equals(eventSequence)) {
log.warn(MessageFormat.format("repeat event ignored, type[{0}] aggregate[{1}] seq[{2}] , ignored", event.getType(), event.getAggregateIdentifier(), event.getSequenceNumber()));
return;
}
throw new StreamEventSequenceException(MessageFormat.format("sequence error, db [{0}], current [{1}]", sequenceObject.getSequenceNumber(), eventSequence));
} else {
sequenceObject.setSequenceNumber(eventSequence);
}

domainAggregateSequenceRepository.save(sequenceObject);

// 2. 分发事件到各个 handler
beanHandlerMap.forEach((key, value) -> {
Optional<Method> matchedMethod = getMatchedMethods(value, type, event.getPayloadType());

matchedMethod.ifPresent(method -> {
try {
invoke(key, method, event);
} catch (IllegalAccessException | InvocationTargetException e) {

throw new StreamHandlerException(MessageFormat.format("[{0}] invoke error", method.getName()), e);
}
});

if (!matchedMethod.isPresent()) {
log.info(MessageFormat.format("message [{0}] has no listener", event.getEventIdentifier()));
}
});

log.info(MessageFormat.format("message [{0}] handled", event.getEventIdentifier()));
}

@Transactional
public Optional<Method> getMatchedMethods(List<Method> methods, String type, String payloadType) {
// 这里应该只有一个方法,因为将 stream 的单个事件分成多个之后,无法保证一致性
List<Method> results = methods.stream().filter(m -> {
StreamEventHandler handler = m.getAnnotation(StreamEventHandler.class);
List<String> types = new ArrayList<>(Arrays.asList(handler.types()));
List<String> payloadTypes = new ArrayList<>(Arrays.asList(handler.payloadTypes()));

types.removeIf(StringUtils::isBlank);
payloadTypes.removeIf(StringUtils::isBlank);

if (CollectionUtils.isEmpty(payloadTypes) && m.getParameterTypes().length != 0) {
payloadTypes = Collections.singletonList(m.getParameterTypes()[0].getSimpleName());
}

boolean isTypeMatch = types.contains(type);

String checkedPayloadType = payloadType;
if (StringUtils.contains(checkedPayloadType, ".")) {
checkedPayloadType = StringUtils.substringAfterLast(checkedPayloadType, ".");
}
boolean isPayloadTypeMatch = payloadTypes.contains(checkedPayloadType);

return isTypeMatch && isPayloadTypeMatch;
}).collect(Collectors.toList());

if (results.size() > 1) {
throw new StreamHandlerException(MessageFormat.format("type[{0}] event[{1}] has more than one handler", type, payloadType));
}

return results.size() == 1 ? Optional.of(results.get(0)) : Optional.empty();
}

@Transactional
public void invoke(Object bean, Method method, DomainEvent event) throws IllegalAccessException, InvocationTargetException {

int count = method.getParameterCount();

if (count == 0) {
method.invoke(bean);
} else if (count == 1) {
Class<?> payloadType = method.getParameterTypes()[0];

if (payloadType.equals(DomainEvent.class)) {
method.invoke(bean, mapper.convertValue(event.getPayload(), DomainEvent.class));
} else {
method.invoke(bean, mapper.convertValue(event.getPayload(), payloadType));
}

} else if (count == 2) {
Class<?> payloadType0 = method.getParameterTypes()[0];
Class<?> payloadType1 = method.getParameterTypes()[1];

Object firstParameterValue = mapper.convertValue(event.getPayload(), payloadType0);
Object secondParameterValue = event.getMetaData();

// 如果是 DomainEvent 类型则优先传递该类型,另外一个参数按照 payloadType > metaData 优先级传入
if (payloadType0.equals(DomainEvent.class)) {
firstParameterValue = mapper.convertValue(event, payloadType0);
secondParameterValue = mapper.convertValue(event.getPayload(), payloadType1);
}
if (payloadType1.equals(DomainEvent.class)) {
secondParameterValue = mapper.convertValue(event, payloadType1);
}
method.invoke(bean, firstParameterValue, secondParameterValue);
}
}


@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass);

List<Method> methods = new ArrayList<>();
for (Method method : uniqueDeclaredMethods) {
StreamEventHandler streamListener = AnnotatedElementUtils.findMergedAnnotation(method,
StreamEventHandler.class);
if (streamListener != null) {
methods.add(method);
}
}
if (!CollectionUtils.isEmpty(methods)) {
beanHandlerMap.put(bean, methods);
}
return bean;
}

}

这里参照了 SCS 本身手机 handler的方式,会将有 @StreamEventHandler 注解的方法都找出来做一个记录。在 dispatchEvent 的时候会更新事件的 seq 并且按照 type 去调用各个标有注解的方法。

  • 实现一个比较简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@Component
@Transactional
@AllArgsConstructor
public class DomainEventDispatcher {

private final StreamDomainEventDispatcher streamDomainEventDispatcher;

@StreamListener(target = ChannelDefinition.CONTRACTS_INPUT, condition = "headers['messageType']=='eventSourcing'")
public void handleBuilding(@Payload DomainEvent event) {
streamDomainEventDispatcher.dispatchEvent(event, ChannelDefinition.CONTRACTS_INPUT);
}
}

@Component
@AllArgsConstructor
@Transactional
public class ContractEventHandler {

@StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT)
public void handle(ContractCreatedEvent event) {
// 实现你的 view 层更新业务
}
}

注意:

  • AbstractDomainEventDispatcher中监听所有 bean 加载完成不能用 InitializingBean 接口,否则@Transactional会失效,这个有兴趣的同学可以研究一下@Transactional的机制。

至此以上几点就优化完了。

其他优化

错误处理

基于 SCS 的默认配置,存在一个致命的问题,那就是当消息处理失败(重试三次)之后,消息直接没了,这个相当于就是消息丢失了。那么解决方案其实也是比较简单的,一般有两种解决方案:

  1. 拒绝这个消息,丢在 broker 原先的队列里。
  2. 将这个消息记录到一个错误的 queue 中等待修复,后续可能将消息转发回去,也可能直接就删除了消息(比如重复的消息)。

方案 1 这么做可能会出的问题就是,这个消息反复消费,反复失败,引起循环问题从而导致服务出现问题,这个就需要在 broker 做一些策略配置了,为了让 broker 尽可能的简单,我们这里采用方案 2,要实现的流程是这样的:

image

  • 首先让 SCS 为我们自动生成一个 DLQ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
spring:
application:
name: event-sourcing-service
datasource:
url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE
username: root
password: root
jpa:
hibernate:
ddl-auto: update
use-new-id-generator-mappings: false
show-sql: false
properties:
hibernate.dialect: org.hibernate.dialect.MySQL55Dialect
rabbitmq:
host: localhost
port: 5672
username: creams_user
password: Souban701
cloud:
stream.bindings:
contract-events: # 这个名字对应代码中@input("value") 的 value
destination: contract-events # 这个对应 rabbit 中的 channel
contentType: application/json # 这个指定传输类型,其实可以默认指定,但是目前每个地方都写了,所以统一下
contract-events-input:
destination: contract-events
contentType: application/json
group: event-sourcing-service
durableSubscription: true
stream.rabbit.bindings.contract-events-input.consumer:
autoBindDlq: true
republishToDlq: true
deadLetterQueueName: contract-error.dlq
logging:
level.org:
springframework:
web: INFO
cloud.sleuth: INFO
apache.ibatis: DEBUG
java.sql: DEBUG
hibernate:
SQL: DEBUG
type.descriptor.sql: TRACE

axon:
serializer:
general: jackson

加上这个配置之后,rabbit 会给这个队列创建一个 .dlq 后缀的队列,异常消息都会被塞到这个队列里面(消息中包含了异常信息以及来源),等待我们处理,deadLetterQueueName指定了 DLQ 的名称,这样所有的失败消息都会存放到同一个 queue中。大部分的情况下,消息的异常都是由于 consumer 逻辑错误引起的,所以我们需要一个处理这些失败的消息的地方,比如在启动的时候自动拉取 DLQ 中的消息然后转发到原来的 queue 中去远程原有的业务逻辑,如果处理不了那么还是会继续进入到 DLQ 中。

  • 在启动的时候拉取 DLQ 中的消息转发到原来的 queue 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Component
public class DLXHandler implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {

private final RabbitTemplate rabbitTemplate;

private ApplicationContext applicationContext;

private static final String DLQ = "contract-error.dlq";

@Autowired
public DLXHandler(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// SCS 会创建一个 child context ,这里需要判断下正确的 context 初始化完成
if (event.getApplicationContext().equals(this.applicationContext)) {
// 启动后获取 dlq 中所有的消息,进行消费
Message message = rabbitTemplate.receive(DLQ);
while (message != null) {
rabbitTemplate.send(message.getMessageProperties().getReceivedRoutingKey(), message);
message = rabbitTemplate.receive(DLQ);
}
}

}
}

由于 SCS 没有提供给我们类似的接口,这里使用了 rabbitmq 的接口来获取消息。

完善之前的 CQRS 例子

经常上述这些基础操作之后,汇过来实现 CQRS 就比较清晰了,只需要监听相关的事件,然后更新视图层即可。

  1. 添加时间的监听
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT)
    public void handle(ContractCreatedEvent event, DomainEvent<ContractCreatedEvent, HashMap> domainEvent) {
    QueryContractCommand command = new QueryContractCommand(event.getIdentifier(), domainEvent.getTimestamp());

    ContractAggregate aggregate = queryGateway.query(command, ContractAggregate.class).join();

    ContractView view = new ContractView();
    view.setIndustryName(aggregate.getIndustryName());
    view.setId(aggregate.getIdentifier());
    view.setPartyB(aggregate.getPartyB());
    view.setPartyA(aggregate.getPartyA());
    view.setName(aggregate.getName());
    view.setDeleted(aggregate.isDeleted());

    contractViewRepository.save(view);
    }

StreamDomainEventDispatcher 对传参做了一些处理,当有两个参数的时候会将 DomainEvent 传递,因为有些时候可能会用到一些字段,比如时间、附加信息等等。这里在消费事件的时候,可以根据时间去查询 aggregate 的状态,然后直接做一个映射,也可以根据事件直接对 view 层做 CUD ,个人觉得在性能和速度不存在大问题的时候直接去查询一下 aggregate 当时的状态做一个映射即可,毕竟比较简单。

  1. 删除原来的 ContractViewHandler 即可。

完整的例子 - branch session6