主要内容
etcd 的特点
- KV 数据库, github 描述为: Distributed reliable key-value store for the most critical data of a distributed system
- 较为轻量, 和etcd交互的时候就是 操作(查询,设置,删除,监听)一个 字符串的 键值对
- etcd 支持集群 但是笔者没有使用过。
- 每个KEY可以设置TTL, 并且存在 mod revision, 即修改版本号。
应用场景
- 服务注册与发现
- 一些读者可能觉得nacos 比较好, 因为它可以以一个非常小的编码操作来实现这个功能。 那么笔者认为可以继续这样使用, 没必要一定要换到etcd。
- 笔者没有选择 nacos 的原因是因为笔者觉得 nacos 可能比较庞大, 笔者想引入一个轻量级的库。
- 笔者实现的思路是:
- 每个服务器上报自己的信息到etcd上。
- KEY的格式是:
/servers/TYPE/ID
- 上传间隔60秒, KEY的过期时间为65秒
- 只有当前服务器自己会上传信息到这些KEY, 别的服务器会拉取这些KEY的信息, 但是不会修改。
- 网关服务器使用 key prefix 来查询其他所有在线的服务器, 并尝试连接他们。
- 登录服使用 key prefix 来查询其他所有的网关服务器, 并根据网关在线人数 进行负载均衡。
- 配置表读取和热更新
- 配置表的KEY格式为:
/config/GROUP/CONFIG_FILENAME
突然发现, 服务器的KEY也可以把GROUP带上
- GROUP是服务器分组, 当多个服务器需要使用同一个etcd的时候 可以使用GROUP 进行分组。
- 配置表数据拉取到之后, 将这个KEY的 mod revision 记录一下。
- 定时轮询一下etcd , 或者使用 WatchClient 来监视一下KEY, 如果key被修改了 就进行数据更新。 如果轮询的话,可以对比mod revision
代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
@Data
public static class ServerRunningInfo {
private int id;
private int onlineCount;
private String group = "test";
private String host;
private int port;
private String uuid;
}
public static ByteSequence seq(String str) {
return ByteSequence.from(str, StandardCharsets.UTF_8);
}
public static void main(String[] args) {
ObjectMapper objectMapper = new ObjectMapper();
Client client = Client.builder().endpoints("http://127.0.0.1:2379")
.password(seq("123456")).build();
KV kvClient = client.getKVClient();
var option = GetOption.builder().isPrefix(true).build();
kvClient.get(seq("/servers/Fight/"), option).thenAccept(r -> {
if (r.getCount() <= 0) {
return;
}
for (KeyValue kv : r.getKvs()) {
long oldModRevision = 1; // 应该保存在某处, 在此地获取
if (kv.getModRevision() == oldModRevision) {
continue;
}
try {
ServerRunningInfo runningInfo = objectMapper.readValue(kv.getValue().getBytes(), ServerRunningInfo.class);
// connect to server or some thing
System.out.println(runningInfo);
oldModRevision = kv.getModRevision();
} catch (StreamReadException e) {
e.printStackTrace();
} catch (DatabindException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}).join(); // 当前主线程没其他事情的时候, 如果这里不 join的话, 程序就会自动退出了。
Lease leaseClient = client.getLeaseClient();
var leaseId = leaseClient.grant(65).join().getID(); // 设置65秒的 TTL
// 模拟放置服务器信息, 假设目前是网关服务器
ServerRunningInfo myInfo = new ServerRunningInfo();
try {
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
kvClient.put(seq("/servers/Gate/" + myInfo.getId()),
ByteSequence.from(objectMapper.writeValueAsBytes(myInfo)),
putOption)
.join(); // 如果主线程或者说当前线程有其他任务的话, 就不需要join
} catch (JsonProcessingException e) {
e.printStackTrace();
}
Watch watchClient = client.getWatchClient();
watchClient.watch(seq("/config/test/A.json"), r -> {
for (WatchEvent e : r.getEvents()) {
KeyValue keyValue = e.getKeyValue();
long oldModRevision = 1; // 应该保存在某处, 在此地获取
if (keyValue.getModRevision() == oldModRevision) {
continue;
}
var jsonContent = keyValue.getValue().toString();
// do something with jsonContent ...
oldModRevision = keyValue.getModRevision();
}
});
}
|
可以看到, 总共的代码行数也不是特别多, 但是需要的功能都可以实现。
自己实现的好处是减少黑盒占比, 出现了问题的时候方便解决。