knight_ka | 生活及学习笔记

zookeeper 基于zk实现动态感知服务器上下线

zookeeper 基于zk实现动态感知服务器上下线

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package io.github.dearas.Zookeeper.DistributeClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* 动态感知服务器上下线! 客户端
* Created by tnp on 19/04/2017.
*/
public class JavaDistributeClient {
private String serverAddr = "192.168.1.99:2181";
/**
* sessionTimeout的作用是当一次会话中由于网络或者其他原因,zk客户端与服务端断开连接了。客户端请求连接其他zk服务器的时候,
* 为了保证能够连接上或者连接不上,需要设置一个会话超时时间。如果在这个时间内没有重连成功,则会话就结束。zk会删除这次会话中的临时数据:如Ephemeral_Sequential等。
* 会话超时时间的大小是有限制的:
* 默认zoo.cfg配置文件中的ticktime=2000
* sessionTimeout的带下必须在 2*ticktime 和 20*ticktime之间,如果设置大小不在这个范围,则自动使用20*ticktime
*/
private int sessionTimeout = 4000;
// zk对象
private ZooKeeper zk;
/**
* java.util.concurrent 并发包下的内容。
* countDownLatch.await();
* 当countdown为0时,自动释放notify
*/
private CountDownLatch countDownLatch = new CountDownLatch(1);
private String serverName = "/servers";
/**
* 初始化zk连接
* @throws IOException
*/
private void init() throws IOException {
//1. 创建一个zookeeper对象 -- 连接zk服务器conn --创建一个zk服务器的监听listener
zk = new ZooKeeper(serverAddr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 监听器回调逻辑
event.getPath(); // 哪个节点
event.getType(); // 事件类型EventType枚举类型None NodeDeleted NodeDataChanged NodeCreated NodeChildrenChanged
event.getState(); // 连接状态 SyncConnected 为保持连接中
event.getWrapper(); // 监听器事件
System.out.println("Event.KeeperState.SyncConnected " + event.getState());
System.out.println("event.getWrapper()" + event.getWrapper());
if(Event.KeeperState.SyncConnected == event.getState()){
// 如果连接成功
countDownLatch.countDown();
}
/**
* 因为一个zk中可能会监听多个类型的时间,所以为了判断是不是自己监听的时间,在这里做判断。
* 虽然此程序中之监听了servers子节点的变化这一个事件。
*/
if(Event.EventType.NodeChildrenChanged == event.getType()){
try {
List<String> children = zk.getChildren(serverName, true);
System.out.println("服务器组改变:"+children);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 客户端具体操作
* 客户端本身是不需要去zk中注册信息的,客户端的主要作用的监听服务器的上下线。所以只需要监听Znode的子节点变化情况即可。
*/
private void clientOperation() throws KeeperException, InterruptedException {
System.out.println("Client 启动...");
List<String> children = zk.getChildren(serverName, true);
System.out.println("服务器组 : " + children);
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
JavaDistributeClient client = new JavaDistributeClient();
client.init();
client.clientOperation();
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package io.github.dearas.Zookeeper.DistributeClient;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* 动态感知服务器上下线 服务端。
*
* Created by tnp on 21/04/2017.
*/
public class JavaDistributeServer {
private String serverAddr = "192.168.1.99:2181";
private String serverName = "/servers";
private int sessionTimeout = 6000;
private ZooKeeper zk;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private void init() throws IOException {
zk = new ZooKeeper(serverAddr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(Event.KeeperState.SyncConnected == event.getState()){
countDownLatch.countDown(); // -1
}
}
});
}
/**
* 服务器上线
*/
private void onLineServer(String hostname) throws KeeperException, InterruptedException {
String s = zk.create(serverName + "/serverName", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(s + "服务器上线: " + hostname);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
System.out.println(Runtime.getRuntime().availableProcessors());
JavaDistributeServer server = new JavaDistributeServer();
server.init();
//server.onLineServer(args[0]);
server.onLineServer("server");
}
}