Event Sourcing 和 CQRS落地(二):UID-Generator 实现

UID Generator

为什么要使用

由于 Event Soucing 是记录事件的,那么 Object Id 肯定就不能是用数据库生成的了,基本上所有的 Event Soucing 相关的框架都是将事件直接序列化,然后对应到 Object,所以这种情况下,就需要自己产生 ID,而自己生成 ID 的话,就有很多限制,比如需要根据时间递增,尽量比较短,在分布式的情况下 ID 保证不能重复等等,本文会比较几种方案,然后选择一种比较好的来实现。

方案选择

数据库

这种方案其实就是基于数据库的自增 ID,各个分布式系统通过一个数据库去分配 ID,由于依赖了数据库,性能肯定是个问题,如果部署多点数据库,不但实现麻烦,而且性能还是取决于数据库数量,所以在分布式系统当中,并发量大的系统一般不会采取该方案。

UUID

UUID是通用唯一识别码 (Universally Unique Identifier),是由一组 32 位数的 16 进制数字所构成,也就是 128 bit。在规范字符串格式中,UUID 的十六个八位字节被表示为 32个十六进制(基数16)数字,以连字号分隔的五组来显示,形式为 8-4-4-4-12,总共有 36个字符(即三十二个英数字母和四个连字号)。例如:

1
2
123e4567-e89b-12d3-a456-426655440000
xxxxxxxx-xxxx-Mxxx-Nxxx-xxxxxxxxxxxx

N那个位置,只会是8,9,a,b, M那个位置,代表版本号,由于UUID的标准实现有5个版本,所以只会是1,2,3,4,5。不同的版本基于的算法不一样,而在 Java 中最常用的 UUID.randomUUID() 是基于版本 4 的,基于随机数,也会有重复的概率,只是概率特别低,低到几乎可以忽略而已。

由于这种算法生成的 ID 是字符串,而且长度有特别的长,非常不利于建立索引等操作,所以通常不会用来作为主键。

Snowflake

为了满足在分布式系统中可以生成全局唯一且趋势递增的 ID,Twitter 推出了一种算法,该算法由 64 bit 组成。

  • 第一位永远是0,实际上这是为了让生成的 ID 都为正数,以保证趋势递增;
  • 后面 41 位用来记录时间,理论上可以记录 2^41 毫秒,2^41/(24 * 3600 * 365 * 1000) = 69.7 年,所以这里的理论最大使用时间就是 70 年左右;
  • 在后面 10 位用来记录机器 ID ,更准确的应该说是实例 ID,对应的可以是某个 Container 或者某个进程,最多支持 1024 个;
  • 最后12位用来记录序列号,来保证每个实例每毫秒生成的 ID 唯一;

该算法的优点:

  • 不依赖数据库,高性能;
  • 生成的 ID 趋势递增;
  • 64 bit 的数字作为 ID 相比 UUID 短的多,方便建立数据库索引。

该算法的缺点:

  • 依赖系统时钟,如果系统时钟发生回拨,那么有可能造成 ID 冲突或乱序。

基于 Java 的实现

基于上面的分析,这里我们选择使用 Snowflake 来实现,Twitter 官方提供了一个 Scala 版本的实现,在这里我们实现一个 Java 版本,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@Slf4j
public class SnowFlake {

private static class TimeBackwardsException extends RuntimeException {
public TimeBackwardsException(String message) {
super(message);
}
}

/**
* 起始的时间戳
*/
private static final long START_STAMP = 1262275200000L;

/**
* 每一部分占用的位数
*/
private static final long SEQUENCE_BIT = 12; //序列号占用的位数
private static final long MACHINE_BIT = 10; //机器标识占用的位数

/**
* 每一部分的最大值
*/
private static final long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
private static final long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);

/**
* 每一部分向左的位移
*/
private static final long MACHINE_LEFT = SEQUENCE_BIT;
private static final long TIMESTAMP_LEFT = SEQUENCE_BIT + MACHINE_BIT;

private long machineId; //机器标识
private long sequence = 0L; //序列号
private long lastStamp = -1L;//上一次时间戳

public SnowFlake(long machineId) {

if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
}
this.machineId = machineId;
}

/**
* 产生下一个ID
*/
public synchronized Long nextId() {
long currStamp = getNewStamp();
if (currStamp < lastStamp) {
throw new TimeBackwardsException("Clock moved backwards. Refusing to generate id");
}

if (currStamp == lastStamp) {
sequence = (sequence + 1) & MAX_SEQUENCE;

//同一毫秒的序列数已经达到最大
if (sequence == 0L) {
sequence = new Random().nextInt(10);
currStamp = getNextMill();
}
} else {
// 新的一毫秒,随机从 0-9 中开始
sequence = new Random().nextInt(10);
}
lastStamp = currStamp;

return (currStamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分
| machineId << MACHINE_LEFT //机器标识部分
| sequence; //序列号部分
}

private long getNextMill() {
long mill = getNewStamp();
while (mill <= lastStamp) {
mill = getNewStamp();
}

return mill;
}

private long getNewStamp() {
return System.currentTimeMillis();
}

}

仔细阅读以下这段代码,你会发现有个地方和我们描述的不太一样

1
2
3
4
5
6
7
8
9
10
11
12
if (currStamp == lastStamp) {
sequence = (sequence + 1) & MAX_SEQUENCE;

//同一毫秒的序列数已经达到最大
if (sequence == 0L) {
sequence = new Random().nextInt(10);
currStamp = getNextMill();
}
} else {
// 新的一毫秒,随机从 0-9 中开始
sequence = new Random().nextInt(10);
}

这里再毫秒刷新的时候,我们并没有去把序列号置为 0,而是随机从 0-9 取了一个数,这么做的原因是在并发量不是特别高的时候,如果都从 0 开始的话,会导致生成的 ID 都是偶数,那么在做一些分表操作的时候,会导致严重的分配不均匀,所以这里我们随机从 0-9 开始让产生的 ID 尽可能的分配均匀。但是这么做是会下降性能的,每毫秒的 ID 生成数量会下降一些,但是这并没有下降数量级,完全是可以接受的。

基于 Spring Cloud 分配 Worker Id

上面介绍了如何使用 Snowflake 来生成 ID,那么结合 Spring Cloud ,我们需要给每个节点分配一个 Worker ID,但是由于 Spring Cloud 的特点,它是希望每个节点无状态化的,这就给我们分配 Worker ID 带来了一定的难度,如果我们需要区分每个几点,就不得不将节点信息存储到某个中央,然后再分配,为了便于之后的水平扩展,这里我们基于内部代码实现,大概的原理是在服务启动的时候,记录下节点 IP 和 MAC ,作为 Service Node Key 存储到数据库,这个 Key 在数据库中唯一,通过这个唯一的 Key 给不同的节点分配 ID。下面我们尝试使用 JPA 来实现这一过程。

Spring Cloud 在 2.1.0 之后提供了 getInstanceId() 方法,但是可以为空,所以需要看各个具体实现,我看了 K8S 和 consul 都提供了该方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@Entity
@Getter
@Setter
@NoArgsConstructor
public class WorkerId {

@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Long id;

@Column(unique = true)
private String serviceKey;
}

@Repository
public interface WorkerIdRepository extends JpaRepository<WorkerId, Long> {
WorkerId findByServiceKey(String serviceKey);
}

@Service
@AllArgsConstructor
@Slf4j
public class WorkerIdService {

private final WorkerIdRepository workerIdRepository;
private final Registration registration;

Long getWorkerId() {

String serviceKey = getServiceKey();

WorkerId workerId = workerIdRepository.findByServiceKey(serviceKey);

if (workerId != null) {
return workerId.getId() % (SnowFlake.MAX_MACHINE_NUM + 1);
}

workerId = new WorkerId();
// 如果你的 Spring Boot 版本 >= 2.1.0 并且使用的 Discovery 提供了该方法的实现则可以直接使用
// workerId.setServiceKey(registration.getInstanceId());
workerId.setServiceKey(serviceKey);
workerIdRepository.save(workerId);
return workerId.getId() % (SnowFlake.MAX_MACHINE_NUM + 1);
}

/**
* 由于 Spring Cloud Discovery 的 ServiceInstance 接口没有一个获取 instance id 的方法,所以只能想办法自己标记
* Spring Cloud Discovery 在 2.1.0 之后的版本在接口中提供了 getInstanceId 这一方法,但是可以为空,所以需要各个实现,我看了 K8S 和 consul 都提供了该方法的实现
* @return ip:mac_address 形式的字符串
*/
public String getServiceKey() {
byte[] mac = null;
String hostAddress = null;
try {

Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
while (networkInterfaces.hasMoreElements()) {
NetworkInterface networkInterface = networkInterfaces.nextElement();
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();

while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
if (addr instanceof Inet4Address && !addr.isLoopbackAddress() && (networkInterface.getDisplayName().equals("en0") || networkInterface.getDisplayName().equals("eth0"))) {
hostAddress = addr.getHostAddress();
mac = networkInterface.getHardwareAddress();
break;
}
}
if (mac != null && StringUtils.isNotBlank(hostAddress)) {
break;
}
}
} catch (Exception e) {
log.error(e.getMessage());
}
if (mac == null || StringUtils.isBlank(hostAddress)) {
return null;
}
// mac地址拼装成String
StringBuilder macAddress = new StringBuilder();
for (int i = 0; i < mac.length; i++) {
if (i != 0) {
macAddress.append("-");
}
//mac[i] & 0xFF 是为了把byte转化为正整数
String s = Integer.toHexString(mac[i] & 0xFF);
macAddress.append(s.length() == 1 ? 0 + s : s);
}
// 把字符串所有小写字母改为大写成为正规的mac地址并返回
return hostAddress + ":" + macAddress.toString().toUpperCase();
}
}

建立相应的 Entity、Repository、Service,从代码中可以看到 Worker ID 的实现,获取 IP 和 MAC 作为唯一 Key 存入数据库,获取到自增 ID,然后对 Snowflake 的最大 Worker ID 取余,这样便得到了一个可用的 Worker ID。

然而这么做会不会有问题?由于Worker ID 在 0-1023 之间反复,如果某些节点反复重启,超过 1024 次并且一些节点一直没有重启,就会出现 Worker ID 重复的情况。由于我们的业务目前节点的更新一般都是逐个依次重启,所以这里暂时不去处理这个问题,未来如果需要多个节点进行 AB 测试,这个时候可能就会出现某些节点频繁更新,而某些节点不变化的情况,届时可能就要重新考虑分配 ID 的方案了。

下面继续完成上面的方案,我们已经写好了相关的 Service ,剩下的就是在服务启动的时候向数据库写入信息了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class UIDGenerator {

private final WorkerIdService workerIdService;
private SnowFlake flake;

@Autowired
public UIDGenerator(WorkerIdService workerIdService) {
this.workerIdService = workerIdService;
}

public Long getId() {
return flake.nextId();
}

@PostConstruct
public void init() {
this.flake = new SnowFlake(workerIdService.getWorkerId());
}
}

这里我们建立一个 UIDGenerator 作为服务的 ID 生成器,在启动的时候,通过 WorkerIdService 获取到一个 Worker ID,并构建 Snowfalke 对象,至此我们的 ID 生成器就基本完成了。