zookeeper 基于zk实现动态感知服务器上下线 发表于 2017-04-26 | 分类于 zookeeper | | 阅读次数 zookeeper 基于zk实现动态感知服务器上下线 zookeeper 基于zk实现动态感知服务器上下线客户端123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596package io.github.dearas.Zookeeper.DistributeClient;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.io.IOException;import java.util.List;import java.util.concurrent.CountDownLatch;/** * 动态感知服务器上下线! 客户端 * Created by tnp on 19/04/2017. */public class JavaDistributeClient { private String serverAddr = "192.168.1.99:2181"; /** * sessionTimeout的作用是当一次会话中由于网络或者其他原因,zk客户端与服务端断开连接了。客户端请求连接其他zk服务器的时候, * 为了保证能够连接上或者连接不上,需要设置一个会话超时时间。如果在这个时间内没有重连成功,则会话就结束。zk会删除这次会话中的临时数据:如Ephemeral_Sequential等。 * 会话超时时间的大小是有限制的: * 默认zoo.cfg配置文件中的ticktime=2000 * sessionTimeout的带下必须在 2*ticktime 和 20*ticktime之间,如果设置大小不在这个范围,则自动使用20*ticktime */ private int sessionTimeout = 4000; // zk对象 private ZooKeeper zk; /** * java.util.concurrent 并发包下的内容。 * countDownLatch.await(); * 当countdown为0时,自动释放notify */ private CountDownLatch countDownLatch = new CountDownLatch(1); private String serverName = "/servers"; /** * 初始化zk连接 * @throws IOException */ private void init() throws IOException { //1. 创建一个zookeeper对象 -- 连接zk服务器conn --创建一个zk服务器的监听listener zk = new ZooKeeper(serverAddr, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 监听器回调逻辑 event.getPath(); // 哪个节点 event.getType(); // 事件类型EventType枚举类型None NodeDeleted NodeDataChanged NodeCreated NodeChildrenChanged event.getState(); // 连接状态 SyncConnected 为保持连接中 event.getWrapper(); // 监听器事件 System.out.println("Event.KeeperState.SyncConnected " + event.getState()); System.out.println("event.getWrapper()" + event.getWrapper()); if(Event.KeeperState.SyncConnected == event.getState()){ // 如果连接成功 countDownLatch.countDown(); } /** * 因为一个zk中可能会监听多个类型的时间,所以为了判断是不是自己监听的时间,在这里做判断。 * 虽然此程序中之监听了servers子节点的变化这一个事件。 */ if(Event.EventType.NodeChildrenChanged == event.getType()){ try { List<String> children = zk.getChildren(serverName, true); System.out.println("服务器组改变:"+children); } catch (Exception e) { e.printStackTrace(); } } } }); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 客户端具体操作 * 客户端本身是不需要去zk中注册信息的,客户端的主要作用的监听服务器的上下线。所以只需要监听Znode的子节点变化情况即可。 */ private void clientOperation() throws KeeperException, InterruptedException { System.out.println("Client 启动..."); List<String> children = zk.getChildren(serverName, true); System.out.println("服务器组 : " + children); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { JavaDistributeClient client = new JavaDistributeClient(); client.init(); client.clientOperation(); }} 服务端123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051package io.github.dearas.Zookeeper.DistributeClient;import org.apache.zookeeper.*;import java.io.IOException;import java.util.concurrent.CountDownLatch;/** * 动态感知服务器上下线 服务端。 * * Created by tnp on 21/04/2017. */public class JavaDistributeServer { private String serverAddr = "192.168.1.99:2181"; private String serverName = "/servers"; private int sessionTimeout = 6000; private ZooKeeper zk; private CountDownLatch countDownLatch = new CountDownLatch(1); private void init() throws IOException { zk = new ZooKeeper(serverAddr, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ countDownLatch.countDown(); // -1 } } }); } /** * 服务器上线 */ private void onLineServer(String hostname) throws KeeperException, InterruptedException { String s = zk.create(serverName + "/serverName", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(s + "服务器上线: " + hostname); } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { System.out.println(Runtime.getRuntime().availableProcessors()); JavaDistributeServer server = new JavaDistributeServer(); server.init(); //server.onLineServer(args[0]); server.onLineServer("server"); }}