CREAMS 小组

  • Home

  • Categories

  • Tags

  • Archives

RabbitMQ 延时消息的应用与实现

Posted on 2020-03-12 | In 后端 | Comments: | Views:

前言

在实际的业务开发中,经常会碰到一些类似定时的任务,比如每天早上 9 点提醒用户业务进度,或者 30 分钟后锁定某个账单之类的操作。类似这样的业务需求,比较古老的操作就是使用各种定时器来实现,然而面对现在的开发更新速度,以及容器化、微服务架构的流行和普及,原本类似 Crontab 这种简单的东西使用起来会有各种限制,比如在最简单的情况你可以使用 Spring 的 @Scheduled 注解来轻松的实现一些定时需求,但现在你可能就需要考虑多如何将定时任务分配给多个节点,某个时间重新构建容器会不会丢失一些定时任务等等一些问题。这种情况延迟消息能解决掉我们大部分的需求,大概的流程就是丢一个消息给中间件,告诉它什么时候投递出去,投递后再由消费端消费(负责去通知/发邮件/发短信)。由于我们公司一直使用的 Spring Cloud 全家桶,RabbitMQ 也已经使用很多年,这里就讲讲如何在Spring Cloud Stream RabbitMQ下以最小的成本实现延时消息,所以在阅读文章时可能需要有 Spring Cloud Stream 相关的基础知识。

实现

概述

实际上 RabbitMQ 的延迟消息是通过 rabbitmq-delayed-message-exchange 这个插件实现的,开启了这个插件之后,只要在 exchange 上定义 x-delayed-message 类型,然后在发送消息的时候 header 带上 x-delay (毫秒数),就可以实现延迟消息。我们可以利用这一特性,来实现消息的延迟/定时发送。

理想的模型是这个样子的,但实际在应用中难免会有一些出入,在文末问题区会做一些相关的讨论。

RabbieMQ 插件安装

由于我们的 RabbitMQ 服务器都是跑在 Docker 容器里,所以这里就相对简单,新建一个 Dockerfile:

1
2
3
4
5
6
7
FROM rabbitmq:3.7.8-management-alpine

RUN cd /plugins \
&& wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip \
&& unzip rabbitmq_delayed_message_exchange-20171201-3.7.x.zip \
&& rm rabbitmq_delayed_message_exchange-20171201-3.7.x.zip \
&& rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如果你使用的是 3.8.x 版本,注意修改对应的版本。另外要是不跑在容器当中,RabbitMQ 官网也有说明,这里就不再赘述。

Sprint Cloud Stream 优化

stream 配置有以下几个问题:

  • 需要在配置中编写 destination 等配置,然而这些配置往往都是模板化的
  • 配置完毕后,代码中还需要将 channel 和 bean 的名称对应上,属于 hard code,如果对不上,或者某一边忘记删除,都会出错

实际上在我们整个微服务的使用来看,大部分的配置内容都是一样的,我们先来看一下比较常见的模板:

1
2
3
4
5
6
7
xxx-events: # 这里是 input
group: yyy-service
contentType: application/json # 新的 stream 版本可以提供默认配置,不用每个指定
durableSubscription: true
yyy-events: # 这里是 output
destination: yyy-events
contentType: application/json # 新的 stream 版本可以提供默认配置,不用每个指定

如果一个 service 以领域事件向外输出消息,那么 RabbitMQ 的模板长成这个样子是比较常见的,统一由 yyy-events 发出领域事件,通过 xxx-events(各种领域)来接收领域事件。这样实际我们只需要根据服务的名称去生成对应的配置即可,如果你自己的业务有所不同,则需要根据自己的使用方式来生成配置。

下面给出实现上述配置的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@Configuration
@ConditionalOnClass(RabbitServiceAutoConfiguration.class)
public class StreamAutoConfiguration {

@Autowired
private ConfigurableEnvironment environment;

private static final String PROPERTY_SOURCE_NAME = "creams-stream-rabbit-spring-boot-starter:stream-rabbit-config-properties";

private static final String RABBIT_PREFIX = "spring.cloud.stream.rabbit.bindings.";

@Autowired
private ApplicationContext context;


@Bean
@Primary
BindingServiceProperties customProps(BindingServiceProperties bindingServiceProperties) {

Map<String, Object> beanMap = context.getBeansWithAnnotation(EnableBinding.class);

Map<String, Object> map = new HashMap<>();

beanMap.forEach((key, value) -> {
EnableBinding annotation = context.findAnnotationOnBean(key, EnableBinding.class);
if (annotation != null) {
setPropsForAnnotation(annotation, map, bindingServiceProperties);
}
});

addOrReplace(environment.getPropertySources(), map);

return bindingServiceProperties;
}

private void setPropsForAnnotation(EnableBinding annotation, Map<String, Object> map, BindingServiceProperties bindingServiceProperties) {
Stream.of(annotation.value()).forEach(type -> ReflectionUtils.doWithMethods(type, method -> {
Input input = AnnotationUtils.findAnnotation(method, Input.class);
Output output = AnnotationUtils.findAnnotation(method, Output.class);
Destination destination = AnnotationUtils.findAnnotation(method, Destination.class);

String destinationName = Optional.ofNullable(destination).map(Destination::destination).orElse("");
if (input != null) {
// 设置 input 的内容
if (StringUtils.isEmpty(destinationName)) {
destinationName = input.value();
}
String name = BindingBeanDefinitionRegistryUtils
.getBindingTargetName(input, method);
BindingProperties properties = bindingServiceProperties.getBindingProperties(name);
properties.setGroup(getApplicationName());
properties.setDestination(destinationName);
String rabbitPrefix = RABBIT_PREFIX + name + ".consumer";
map.put(rabbitPrefix + ".autoBindDlq", true);
map.put(rabbitPrefix + ".republishToDlq", true);
map.put(rabbitPrefix + ".deadLetterQueueName", getErrorQueueName());
// 如果注解上声明了 isDelayedExchange 为 true,则加上对应的配置
if (Optional.ofNullable(destination).map(Destination::isDelayedExchange).orElse(false)) {
map.put(rabbitPrefix + ".delayedExchange", true);
}
} else if (output != null) {
// 设置 output 的内容
if (StringUtils.isEmpty(destinationName)) {
destinationName = output.value();
}
String name = BindingBeanDefinitionRegistryUtils
.getBindingTargetName(output, method);
BindingProperties properties = bindingServiceProperties.getBindingProperties(name);
properties.setDestination(destinationName);

String rabbitPrefix = RABBIT_PREFIX + name + ".producer";
// 如果注解上声明了 isDelayedExchange 为 true,则加上对应的配置
if (Optional.ofNullable(destination).map(Destination::isDelayedExchange).orElse(false)) {
map.put(rabbitPrefix + ".delayedExchange", true);
}
}
}));
}

private String getApplicationName() {
return environment.getProperty("spring.application.name");
}

// 定义 DLQ 的名称
public String getErrorQueueName() {
return getPureServiceName() + "-error.dlq";
}

// 这里由于我们自己的服务通常起名 creams-xxx-service, 所以会做一次去头去尾的操作,你可以自定义
private String getPureServiceName() {
String[] names = getApplicationName().split("-");
List<String> nameList = new ArrayList<>(Arrays.asList(names));
if (nameList.size() >= 3) {
nameList.remove(0);
nameList.remove(nameList.size() - 1);
}
return String.join("-", nameList);
}

// 添加或者替换 KV
private void addOrReplace(MutablePropertySources propertySources,
Map<String, Object> map) {

MapPropertySource target = null;
if (propertySources.contains(PROPERTY_SOURCE_NAME)) {
PropertySource<?> source = propertySources.get(PROPERTY_SOURCE_NAME);
if (source instanceof MapPropertySource) {
target = (MapPropertySource) source;
for (String key : map.keySet()) {
if (!target.containsProperty(key)) {
target.getSource().put(key, map.get(key));
}
}
}
}
if (target == null) {
target = new MapPropertySource(PROPERTY_SOURCE_NAME, map);
}
if (!propertySources.contains(PROPERTY_SOURCE_NAME)) {
propertySources.addLast(target);
}
}
}

// 提供一个注解,用于自定义一些内容
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.ANNOTATION_TYPE,
ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Destination {

@AliasFor("destination")
String value() default "";

@AliasFor("value")
String destination() default "";

boolean isDelayedExchange() default false;
}

延迟消息的实现

这里我以实现一个小 Demo 来展示所要实现的东西,这个 Demo 实现发送延迟消息,并自我接收后消费消息。为了节省篇幅,这里略去 Demo 工程的建立以及配置过程,文末将附上地址。

  • 建立输入输出的 interface :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface BindingChannelOutput {

String DEMO_EVENTS = "demo-events";

@Output(DEMO_EVENTS)
@Destination(isDelayedExchange = true)
MessageChannel contract();
}

public interface BindingChannelInput {

String DEMO_INPUTS = "demo-events";

@Input(DEMO_INPUTS)
@Destination(isDelayedExchange = true)
SubscribableChannel input();
}

注意这里和一般的 Stream 例子 有一些不一样, 用到了我们之前优化的注解,isDelayedExchange 可以指定为 delay exchange。

  • 再建立一个 publisher 用于发送消息,以及一个 Listener 用于接收消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Component
@AllArgsConstructor
@Slf4j
public class Publisher {

private final BindingChannelOutput outputChannel;

public void send(String content, Long delayMillSeconds) {

MessageBuilder<String> builder = MessageBuilder.withPayload(content);
if (delayMillSeconds > 0) {
builder.setHeader("x-delay", delayMillSeconds);
}
log.info(MessageFormat.format("pushing message [{0}] to remote", content));
outputChannel.output().send(builder.build());
}
}

@Component
@Slf4j
public class Listener {

@StreamListener(BindingChannelInput.DEMO_INPUTS)
public void update(@Payload String test) {
// 发通知/邮件/短信
log.info(MessageFormat.format("received message [{0}]", test));
}
}

启动工程后在 RabbitMQ 的控制台应该可以看到两个 queue ,一个是 demo-events.demo-service 另一个是 demo-service-error.dlq ,这说明我们的配置优化生效了。

  • 写一个接口来测试延迟消息
1
2
3
4
5
6
7
8
9
10
11
12
@RestController
@AllArgsConstructor
public class TestController {

private Publisher publisher;

@PostMapping("/test")
public void test(@RequestParam("content") String content) {

publisher.send(content, 20000L);
}
}

在控制台可以看到打印时间相差 20s

1
2
2020-03-10 17:17:40.691  INFO 77487 --- [nio-8080-exec-5] com.example.demo.Publisher               : pushing message ["测试"] to remote
2020-03-10 17:18:00.483 INFO 77487 --- [.demo-service-1] com.example.demo.Listener : received message ["测试"]

至此延迟消息的实现就完成了,具体到业务的话,都是在消费端进行,比如做一些邮件/短信发送或者通知之类的。

问题&思考

发出去的消息不可查询

当我们的业务服务向 RabbitMQ 发送消息后,RabbitMQ 在没有到时间的时候是看不到的,那么要保证可靠,就需要在业务服务中记录是否投递到 RabbitMQ,可以参考可靠消息的投递,在本地做记录。

延迟时间的限制

文档中提到过延迟的最大毫秒数是 2^32-1 ,大概是 49 天多一点,不过一般也不会往 RabbitMQ 堆这么长时间的消息。一般如果是 1-2 天内需要推送/发通知的业务,我们会直接丢给 RabbitMQ,剩下的比如定了一个一年后的提醒之类的业务,都采用在夜间流量少的时候去查一下第二天需要发送的内容,然后丢到 RabbitMQ 来完成。所以定时之类的操作根据业务的具体情况可能还会存在,但是会大大减少。

禁用插件后所有的未发布的消息会丢失

这个其实还好,一般不会去做这样的操作。

delay exchange 的路由类型

路由类型还是取决于 exchange 本身的类型,由于 Spring Cloud Stream 默认都是 topic 类型,所以这里保持一致。

Event Sourcing 和 CQRS落地(八):服务优化

Posted on 2019-06-14 | Edited on 2019-06-28 | In 后端 | Comments: | Views:

服务优化

失败消息的补偿机制

由于消息存在发送失败的情况,比如 broker 临时下线或者不可用了,尽管这种情况很少,我们最好做一个机制可以定期或者手动检查,并且尝试自己发送,这里我们就来实现这个机制。

提供未发送的 event 查询

  1. 在CustomDomainEventEntryRepository中加入:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 查找未发送的事件
*
* @param pageable
*
* @return
*/
Page<CustomDomainEventEntry> findBySentFalse(Pageable pageable);

/**
* 查询未发送事件的数量
* @return
*/
Long countBySentFalse();
  1. 将未发送事件的数量集成到 actuator,让我们可以事实看到失败消息的数量:
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@AllArgsConstructor
public class EventHealthContributor implements InfoContributor {

private final CustomDomainEventEntryRepository customDomainEventEntryRepository;

@Override
public void contribute(Info.Builder builder) {
Long count = customDomainEventEntryRepository.countBySentFalse();

builder.withDetail("failedMessage", count);
}
}

打开 http://localhost:8080/actuator/info 应该就可以看到我们的失败消息数量。

Read more »

Event Sourcing 和 CQRS落地(七):Spring-Cloud-Stream 优化

Posted on 2019-06-13 | Edited on 2019-06-28 | In 后端 | Comments: | Views:

Spring Cloud Stream 优化

有哪些问题

Spring Cloud Stream(以下简称 SCS )是 Spring Cloud 提供的消息中间件的抽象,但是目前也就支持 kafka 和 rabbitmq,这篇文章主要会讨论一下如何让 SCS 更好的服务我们之前搭建的 Event Sourcing、CQRS 模型。以下是我在使用 SCS 的过程中存在的一些问题:

  1. StreamListener用来做事件路由分发并不是很理想,SPEL 可能会写的很长(我尝试过用自定义注解代替原生的注解,从而达到简化的目的,但是会出现一些莫名其妙的事件混乱)。
  2. 如果配合之前的模型使用,我们需要保证消息的顺序消费,每个方法都需要去 check 事件的当前 seq,很不方便。
  3. 在没有 handler 处理某个 type 的事件时,框架会给出一个 warn,然而这个事件可能在 consumer 这里根本不关心。
    Read more »

Event Sourcing 和 CQRS落地(六):实现可靠消息

Posted on 2019-06-13 | Edited on 2019-06-28 | In 后端 | Comments: | Views:

实现可靠消息

什么是可靠消息

微服务盛行的时代下,消息成为了不可缺少的组件,首先我们看一个例子,contract 系统创建了一个合同,然后发送创建合同的消息。看似简单,实际上分析一下它的出错可能性,会有以下几种:

  1. 创建合同成功,发送消息失败;
  2. 创建合同失败,发送消息成功;
  3. 创建合同成功,发送消息成功;
  4. 创建合同失败,发送消息失败;

同时成功或者同时失败,这个情况是一致的,是正确的,我们需要关心的就是不一致的情况。那么最简单的办法,就是让创建合同和发送消息成为一个事务,要么一起成功,要么一起失败,但是这么做的话耦合性太强,本身合同创建成功了,却因为消息发送的失败强制回滚。这个时候,可能就想到了存储消息数据,将合同创建和消息数据的存储作为一个事务,消息发送成功之后再去删除消息数据,定期去扫描未发送的消息数据,来保证消息的发送。但是这么做还是有一定的代价的,需要实现消息的存储,消息存储和合同创建还是耦合在一起的,不过这样的模式到 Event Sourcing 下面那就比较理想了,因为本身消息数据和 event 是一样的,存储了 event 相当于完成了存储消息数据,只需要在 event 下做一个标记即可。

做完了上面这些,就能保证消息一定从 producer 到 broker 这一过程,当然要做到消息不丢,必然产生的结果就是消息可能会重复,情况就是 broker 收到了消息,但是没有通知到 producer,这种情况下 producer 是认为消息没有投递成功的,会出现重复投递的情况。保证了消息一定送达 broker 之后,就是 consumer 和 broker 的关系了,consumer 在消费之后需要告诉 broker 消费成功,否则 broker 需要一直保存这些消息。当然消费端可能需要做更多的事情,比如保证同一 aggregate 事件的顺序消费。下面文章会以在 Axon 框架上做一些拓展,以分别实现 consumer 和 producer。

Read more »

Event Sourcing 和 CQRS落地(五):深入使用-Axon

Posted on 2019-06-12 | Edited on 2019-06-28 | In 后端 | Comments: | Views:

深入使用 Axon

实现 snapshot

在前面文章中,我们略微涉及了一些 snapshot 的概念,其实这个概念还是比较好理解的,当事件堆积到一定的程度,每次 load 都会花费一定的时间,这个时候自然会想到 snapshot,先将一部分事件进行计算,然后生成一个 snapshot,后续 load 的时候先读取 snapshot,这样就省去了很多计算过程。如果你读过之前改写 event store 部分的代码,就会发现每个 aggregate 实际上只会存储一个 snapshot,每当新的生成的时候会替换老的。对一个 aggregate 来说,snapshot 同样和事件差不多,它都可以当做事件来处理,但是对于 aggregate 来说,我并不想去处理 snapshot,我只是需要一个计算好的结果而已。基于这个,Axon 给我们提供了 AggregateSnapshotter,这类 snapshot 可以直接还原 aggregate 状态。

了解大概的原理之后,我们就要做的事情其实比较明确了:

  1. 告诉 Axon 我们要在什么时候触发 snapshot 的生成。
  2. 告诉 Axon 我们要生成什么样的 snapshot。
    Read more »

Event Sourcing 和 CQRS落地(四):CQRS 实现

Posted on 2019-06-11 | Edited on 2019-06-28 | In 后端 | Comments: | Views:

实现 CQRS

在实现了 EventSoucing 之后,亟待解决的问题是查询了,理论上同一 Service 可以做到多数据源,甚至多数据库,这篇文章就暂时以同一个数据库为例子,同样使用 JPA 去做 view 的 ORM。

建立 entity

第一步当然是建立对应的 Entity 和对应的 Repository 了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Entity
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ContractView implements ContractInterface {

@Id
@Column(length = 64)
private Long id;

private String name;

private String partyA;

private String partyB;

private boolean deleted = false;

}

@Repository
public interface ContractViewRepository extends JpaRepository<ContractView, Long> {

}
Read more »

Event Sourcing 和 CQRS落地(三):Event-Sourcing 实现

Posted on 2019-06-11 | Edited on 2019-06-28 | In 后端 | Comments: | Views:

实现 Event Soucing

在了解了相关的基础之后,这里会以最简单的方式实现一个 EventSourcing 的例子,然后逐渐在之后的过程去丰富,本篇内容会实现一个将增删改操作使用 EventSoucing 取代的例子,读取部分暂时不做涉及。

Spring Boot 工程搭建

打开 http://start.spring.io/ 选择对应的版本(这里是 2.1.5 )以及相应的依赖,这里多选了一些之后会用到的服务:

image

添加配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
application:
name: event-sourcing-service
datasource:
url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE
username: root
password: root
jpa:
hibernate:
ddl-auto: update
use-new-id-generator-mappings: false
show-sql: false
properties:
hibernate.dialect: org.hibernate.dialect.MySQL55Dialect

为了便于测试,这里我开启了 JPA 的自动更新,开发中你可能会使用 flyway 或者其他工具来管理数据库 schema 以及数据迁移。至此一个简单的服务就搭建完毕了。

Read more »

Event Sourcing 和 CQRS落地(二):UID-Generator 实现

Posted on 2019-06-10 | Edited on 2019-06-28 | In 后端 | Comments: | Views:

UID Generator

为什么要使用

由于 Event Soucing 是记录事件的,那么 Object Id 肯定就不能是用数据库生成的了,基本上所有的 Event Soucing 相关的框架都是将事件直接序列化,然后对应到 Object,所以这种情况下,就需要自己产生 ID,而自己生成 ID 的话,就有很多限制,比如需要根据时间递增,尽量比较短,在分布式的情况下 ID 保证不能重复等等,本文会比较几种方案,然后选择一种比较好的来实现。

方案选择

数据库

这种方案其实就是基于数据库的自增 ID,各个分布式系统通过一个数据库去分配 ID,由于依赖了数据库,性能肯定是个问题,如果部署多点数据库,不但实现麻烦,而且性能还是取决于数据库数量,所以在分布式系统当中,并发量大的系统一般不会采取该方案。

UUID

UUID是通用唯一识别码 (Universally Unique Identifier),是由一组 32 位数的 16 进制数字所构成,也就是 128 bit。在规范字符串格式中,UUID 的十六个八位字节被表示为 32个十六进制(基数16)数字,以连字号分隔的五组来显示,形式为 8-4-4-4-12,总共有 36个字符(即三十二个英数字母和四个连字号)。例如:

1
2
123e4567-e89b-12d3-a456-426655440000
xxxxxxxx-xxxx-Mxxx-Nxxx-xxxxxxxxxxxx

N那个位置,只会是8,9,a,b, M那个位置,代表版本号,由于UUID的标准实现有5个版本,所以只会是1,2,3,4,5。不同的版本基于的算法不一样,而在 Java 中最常用的 UUID.randomUUID() 是基于版本 4 的,基于随机数,也会有重复的概率,只是概率特别低,低到几乎可以忽略而已。

由于这种算法生成的 ID 是字符串,而且长度有特别的长,非常不利于建立索引等操作,所以通常不会用来作为主键。

Read more »

Event Sourcing 和 CQRS落地(一):前言

Posted on 2019-06-10 | Edited on 2019-06-28 | In 后端 | Comments: | Views:

Event Sourcing And CQRS

Event Sourcing 、CQRS 简述

Event Sourcing 简单来说就是记录对象的每个事件而不是记录对象的最新状态,比如新建、修改等,只记录事件内容,当需要最新的状态的时,通过堆叠事件将最新的状态计算出来。那么这种模式查询的时候性能会变的非常差,这个时候就涉及到了 CQRS ,简单的理解就是读写分离,通过事件触发,将最新状态保存到读库,查询全都走读库,理论上代码层,数据库层,都可以做到分离,当然也可以不分离,一般来说为了保证数据库性能,这里起码会将数据库分离。

为什么要使用

了解了 Event Sourcing 的基本内容之后,我们可以发现这个模式有很多的好处:

  • 记录了对象的事件,相当于记录了整个历史,可以查看到任意一个时间点的对象状态;
  • 都是以事件形式进行写入操作,理论上在高并发的情况下,没有死锁,性能会快很多;
  • 可以基于历史事件做更多的数据分析。

Event Soucing 通常会和 DDD CQRS 一起讨论,在微服务盛行的前提下,DDD 让整个软件系统松耦合,而 Event Soucing 也是面向 Aggregate,这个模式很符合 DDD 思想,同时由于 Event Soucing 的特性,读取数据必然会成为瓶颈,这个时候 CQRS 就起到做用了,通过读写分离,让各自的职责专一,实际上在传统的方式中我们也可能会这么干,只是方式略微不同,比如有一个只读库,时时同步主库,让查询通过只读库进行,那么如果查询量特别大的时候,起码写库不会因为查询而下降性能。

背景

由于我们公司的技术体系基本是 Spring 全家桶,而 Java 界似乎 Axon 又是比较流行的 Event Sourcing 框架,本着对新技术的尝试以及某些业务也确实有这方面的需求的出发点,对 Axon 做了一些尝试。后面的一系列文章将会以 Spring Cloud 作为背景,探讨 Axon 如何使用,以及如何出处理一些常见的业务需求(溯源、读写分离、消息可靠等),所以在看后面的文章之前最好对 Spring Boot、Spring Cloud、Spring Cloud Stream、Spring Data JPA 等有一些基本的了解。

目录

  1. Event Sourcing 和 CQRS落地(一):前言
  2. Event Sourcing 和 CQRS落地(二):UID-Generator 实现
  3. Event Sourcing 和 CQRS落地(三):Event-Sourcing 实现
  4. Event Sourcing 和 CQRS落地(四):CQRS 实现
  5. Event Sourcing 和 CQRS落地(五):深入使用-Axon
  6. Event Sourcing 和 CQRS落地(六):实现可靠消息
  7. Event Sourcing 和 CQRS落地(七):Spring-Cloud-Stream 优化he
  8. Event Sourcing 和 CQRS落地(八):服务优化

最终例子

Hexo的介绍以及使用

Posted on 2016-02-26 | Edited on 2019-06-14 | In 其他 | Comments: | Views:

简介

hexo是一个基于Node.js的静态博客程序,其编译上百篇文字只需要几秒。hexo生成的静态网页可以直接放到GitHub Pages,BAE,SAE等平台上。基于Github Pages的博客系统还是很流行的,所以我打算将我们以后所有的session全都放到这个平台上面,这样对内有一个记录,以后新人进来可以查阅,对外也有一定的输出,如果以后质量上去了,还是有希望做到有一定的影响力的。关于session大家可以放开了讲,也可以并不完全是技术方面的,只要是分享即可:]。

环境

Node

开发人员应该对Node比较熟悉了,就算不熟悉或多或少也是听到过的,没听到过的也没关系,到Node官网下载最新的版本安装即可。

Git

同样要使用hexo需要安装Git,相信大部分也都已经安装了,如果你从来没了解过Git,建议去Git官网下载安装最新版本,同时使用谷歌搜索引擎简单的了解一下Git的使用:]。

Github

Github是一个非常好的地方,不管是设计狮、攻城狮、产品狗都可以去了解一下,上面有很多值得学习或者阅读的东西。前面说了hexo是可以将静态文件放多很多的平台上的,我们这里就使用比较流行的Github Pages,所以最起码你得有个账号。

Read more »

Jokefaker

记录小伙伴们的分享
10 posts
2 categories
5 tags
© 2020 Jokefaker
Powered by Hexo v3.8.0
|
Theme – NexT.Muse v7.1.2