knight_ka | 生活及学习笔记

zk 基本介绍及用zookeeper实现分布式共享锁

用zookeeper实现分布式共享锁

什么是zookeeper以及它有什么功能

Zookeeper是一个分布式协调服务,就是为用户的分布式应用程序提供协调服务。

事物的出现肯定是有需求,zookeeper的需求是:当我们要向外部服务器采集数据时,我们是客户端,外部是服务端。4台客户端服务器需要实时采集来自4台服务端服务器的数据,数据量很大。

1.如果客户端一台服务器宕机了,那么继续采集的时候,服务器已经有很多数据了,无法跟上服务器步骤。新数据太多,还要补充下载老数据,就会一直下载不到最新进度。

这时候可以怎么做,keepalived是用在服务端的东西,保证服务器高可用。zookeeper和keepalived的区别就是zookeeper可以保存一些元数据,并且可以监听服务器的状态,当服务器宕机之后,更改该服务器在zookeeper中的状态,然后应用中访问的时候则不会访问这个状态的服务器。而keepalived只能做ip的自动切换,服务器的自动切换,保证服务高可用,它是自动切换服务器,zookeeper则使应用中根据服务器状态自主选择服务器。

1.如果配置客户端主从,则还需要自己去实现主从之间的心跳监听,记录该客户机下载到的位置,从机继续从这个位置开始下载。但是记录这个状态是记录在哪里呢?记录在第三方。

2.zookeeper就是这个第三方,当一台服务器挂掉之后,可以发出一个事件通知其他节点。

zookeeper做了什么?

1.保管数据
2.提供节点监听

zookeeper的数据特点:

* 永久数据PERSITEMT
* 永久序列数据PERSISTENT_SQUENCE
* 临时数据EPHEMERAL
* 临时序列数据EPHEMERAL_SQUENCE
* 序列数据:创建该名称的znode则会创建为递增的序列。set get /vc 只能操作序列最大的那个。
* 序列数据不能拥有子节点。Znode

zkCli.sh 客户端命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# zookeeper的数据结构:
zk的数据结构是目录型的数据结构。/a/b/c
可以通过./zkCli -server ip:port 来连接zookeeper服务
zkCli基本命令:
ls /
ls /a/b
create /a 123
create /a/b 123
create -s /a 123
create -e -s /a 123
create -e /a 123
create -p /a 123
get /a 123
get /a/b 123
set /a 123
set /a/b 123
delete /a/b
rmr /a
# ACL access control list访问控制列表
create -s /test data scheme:id:permission
scheme:
1.world
2.auth
3.digest
4.ip
5.super
id:
id
user:password
permission:
c
r
d
w
a

分布式共享锁

分布式共享锁,是指在集群环境中使用一个锁来控制不同机器上的线程的顺序执行

在单机模式下,线程之间的同步可以通过jdk的lock对象或者synchronized来控制顺序执行。但是在分布式环境中,这种方式是行不通的。所以这时候就用一个第三方来存错锁,让第三方告诉你现在有没有使用锁的权利。

基于zk的分布式共享锁,主要是使用zk的临时序列节点类型。这种节点类型,当客户端会话结束后自动删除,多次创建这个节点它会在节点后增加数字,并且这个数字会增长。比如客户端1创建/a/b节点,则生成的是/a/b000,客户端2再创建/a/b节点,则生成的是/a/b001

zk提供了watch方法,用于监控某个节点是否发生改变。它也可以监控一个父节点下的子节点是否改变过,如/a下的子节点是否有过改变。

根据zookeeper的这两个功能可以想到分布式共享锁的两种实现方式:

1. 分布式环境中每个线程在启动时向zk中注册一条信息,生成一个序列号。然后每个线程判断自己是不是zk中序列号最小的那个(最先注册),序列号最小的获取到线程锁,如果是最小的,那么执行自己的业务逻辑,执行完毕之后删除自己的注册序列。如果不是序列最小的,那么获取到自己的前一个序列,并对这个序列进行监控(也就是对自己的前一个线程进行监控,它执行完毕后自己就该拥有锁了)。**此时直接在监控回调中执行本线程的业务逻辑即可,执行完毕删除本序列,方便下个线程执行**

2.分布式共享锁还可以利用监控父节点的子节点变化情况。同样的每一个线程启动时先注册信息到zookeeper,生成一个序列。当所有的线程注册完毕之后,开始监听子节点的变化情况,当子节点有变化,则判断是不是本线程在所有的子线程中序列号最小。如果是最小,则获取到锁,开始执行自己的逻辑,执行完毕删除本线程的注册信息。此时又触发了子节点变化事件,周而复始,最终到zk中没有子节点了 则执行完毕。

java代码展示zk分布式共享锁的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package io.github.dearas.Zookeeper.DistributeClient;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* Created by tnp on 19/04/2017.
*/
public class DistributeClient2 {
// 定义ZK对象
private ZooKeeper zk;
// 定义 zk保存所有线程信息数据的父节点名称
private String parentNode = "/locks";
//定义 临时有序子节点 的名称 /parent/thread01 /parent/thread02
private String subName = "/thread";
// 要监听的节点名称
private String monitorNode = "";
// 本次注册的节点名称 线程执行完毕后要删除
private String thisNode;
private CountDownLatch latch = new CountDownLatch(1);
private void getConnection() throws IOException, InterruptedException, KeeperException {
// 创建zk对象 注册监听器
zk = new ZooKeeper("192.168.1.99:2181", 4000, new Watcher() {
public void process(WatchedEvent event) {
// 1.启动成功后 设置latch - 1
Event.KeeperState state = event.getState();
if(Event.KeeperState.SyncConnected == state){
latch.countDown(); //latch == 0 则线程停止等待
}
if(event.getType() == Event.EventType.NodeDeleted){
//dosomething();
//如果有节点被删除了 判断该节点是不是本线程正在等待的节点
System.out.println(parentNode + "/" + monitorNode);
dosomething();
// 每个线程设置一个自己在等待的路径信息
if((parentNode + "/" + monitorNode).equals(event.getPath()) ){
// 如果是本线程要监听的节点 则获得锁
//dosomething();
}
}
}
});
latch.await(); //线程等待
// 注册本线程到zk中
thisNode = zk.create(parentNode + subName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 线程等待2s 等待所有的线程注册完毕
Thread.sleep(10);
// 获取parentNode下的所有子节点 watch:false 不监听
List<String> children = zk.getChildren(parentNode, false);
if(children.size() == 1){
//如果children中只有一个元素 则说明只有本线程注册了信息 没人抢占锁
dosomething();
}else{
// 如果集合中的元素大于1
//判断本次注册的线程在子节点中的是不是最小的
//1.把/parent/child 截取成 child
String substring = thisNode.substring((parentNode + "/").length());
//2.判断在集合中的大小
Collections.sort(children);
int index = children.indexOf(substring);
if(index == 0){ //说明是最小的
// id最小 获取到共享锁
dosomething();
}else if(index == -1){
// 如不存在此节点 则不处理。 一般出现这种情况则说明截取出现异常了
}else{
// 如果不是最小的 并且此节点已经注册到了zk中
// 监听此节点的前一个节点
monitorNode = children.get(index -1);
zk.getData(parentNode + "/" + monitorNode,true,new Stat());
}
}
}
// 执行线程自己的业务逻辑 执行完毕后 删除本节点zk
private void dosomething() {
try{
System.out.println("获得锁!" + thisNode + " ! " );
}catch (Exception ex){
ex.printStackTrace();
}finally{
try {
//这里的线程等待很重要
//如果这里不等待可能会导致线程2 添加监听的时候,线程1已经被删除。
Thread.sleep(100);
zk.delete(thisNode,-1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
/**
* 这里的10个线程使用的是不同的zk对象和watcher对象
* 因此当监视器触发的时候,每个线程触发的是自己的监视器对象。也就不可能造成线程1的监控事件触发,导致线程2的watcher对象的process对象被调用。
*
* 所以在process方法内可以不用判断被删除的节点路径。
* 如果是10个线程使用的同一个watcher对象,那么这10个线程的监听事件触发的时候调用的是同一个watcher对象的process方法。
* 此时则需要判断具体是哪个节点的监听事件。所以创建zk所使用的watcher对象那里要注意。
*/
for (int i = 0; i < 10; i++) {
new Thread(){
@Override
public void run() {
DistributeClient2 client = new DistributeClient2();
try {
client.getConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}.start();
}
// 因为java主线程运行完 JVM就关闭了 为了等待所有线程执行完毕 所以等待
Thread.sleep(Long.MAX_VALUE);
}
}