5-教你如何用Zookeeper开发锁与生产者消费者?
5.1 源码下载
首先我们访问github官方仓库来拉取最新代码 https://github.com/apache/zookeeper
克隆代码:
git clone https://github.com/apache/zookeeper.git
或者git clone git@github.com:apache/zookeeper.git
查看版本列表
git tag
切换到3.6.2版本:
git checkout release-3.6.2
然后使用idea或者eclipse导入项目,配置好maven即可
5.2 源码目录说明
先来给大家截个图:
然后针对各个目录这里我来解释一下,先了解下 为下一章了解源码做个准备:
文件 | 说明 |
---|---|
bin | 包含访问zookeeper服务器和命令行客户端的脚本 |
conf | 启动zookeeper默认的配置文件目录 |
zookeeper-assembly | 基础服务打包目录 |
zookeeper-client | 客户端,目前只支持c |
zookeeper-compatibility-tests | 兼容性测试目录 |
zookeeper-contrib | 附加的功能,比如zookeeper可视化客户端工具 |
zookeeper-doc | zookeeper文档 |
zookeeper-it | 供fatjar使用,进行系统测试依赖的类 |
zookeeper-jute | zookeeper序列化组件 |
zookeeper-metrics-providers | 监控相关,目前支持普罗米修斯 prometheus |
zookeeper-recipes | zookeeper提供的一些功能例子,包括选举election,lock和queue |
zookeeper-server | zookeeper服务端 |
5.3 使用Java和Zookeeper开发一个生产者消费者队列例子
所谓的生产者消费者模型,是通过一个容器来解决生产者和消费者的强耦合问题。通俗的讲,就是生产者在不断的生产,消费者也在不断的消费,可是消费者消费的产品是生产者生产的,这就必然存在一个中间容器,我们可以把这个容器想象成是一个货架,当货架空的时候,生产者要生产产品,此时消费者在等待生产者往货架上生产产品,而当货架满的时候,消费者可以从货架上拿走商品,生产者此时等待货架的空位,这样不断的循环。那么在这个过程中,生产者和消费者是不直接接触的,所谓的‘货架’其实就是一个阻塞队列,生产者生产的产品不直接给消费者消费,而是仍给阻塞队列,这个阻塞队列就是来解决生产者消费者的强耦合的。就是生产者消费者模型。
总结一下:生产者消费者能够解决的问题如下:
- 生产与消费的速度不匹配
- 软件开发过程中解耦
如果要使用Zookeeper来实现生产者消费者,那么 Zookeeper需要有数据模型来充当中间容器,在这里我们可以使用顺序节点来实现有序的队列,生产数据的话就创建节点,消费数据的话就读取节点数据然后删除节点。
使用Zookeeper来实现生产者消费者模型,需要使用zookeeper实现有序的队列,有序的队列我们就可以用Zookeeper的顺序节点,假如我们创建的是无界队列,那么生产者生产消息放入队列即可,消费者消费消息的时候需要先判断队列是否为空如果为空则等待,通过监听机制来监听队列节点的变化,如果有消息进入创建了节点则开始消费消息。
首先来说生产者生产消息的过程,
生产消息的时候创建临时顺序节点来代表队列的节点,将数据放到节点下面。再看消费者消费消息的过程,
消费者消费消息先判断临时顺序节点是否存在:- 如果临时节点不存在则说明没有消息可以消费则wait等待,
- 当有新节点的加入的时候则节点监听器可以监听到节点的变化唤醒wait等待的消费。
- 如果临时节点存在,则消息不为空开始消费消息,消费消息的过程先读取节点序号最小的节点的数据,然后删除读取到数据的节点,删除成功则说明成功读取消息,如果读取失败则重新进入读取消息过程。
5.4 分布式锁
完全分布式锁是全局同步的,这意味着在任何时间点上,没有两个客户机认为它们持有相同的锁。这些可以用Zookeeper来实现。与优先队列一样,首先定义一个锁节点。
在ZooKeeper源码目录中在项目目录zookeeper-recipes/zookeeper-recipes-lock中存在锁实现的例子。
接下来我们看下锁的实现思路:
调用create(),路径名“locknode/guide-lock-”,并设置序列和临时标志来创建节点。
在不设置监视标志的情况下在锁节点上调用getChildren()(这对于避免群集效应非常重要,防止一个节点释放锁之后所有触发所有节点工作)。
如果在步骤1中创建的路径名具有最低的序列号后缀,则客户端拥有锁,并且客户端退出协议。
客户端调用exists(),在锁定目录的路径上设置了下一个最低序列号的监视标志。
如果exists()返回null,请转步骤2。否则,在转到步骤2之前,等待来自上一个步骤的路径名通知。
释放锁:客户端希望释放锁,只需删除在步骤1中创建的节点即可。
避免了群体效应:
删除一个节点只会导致一个客户端被唤醒,因为每个节点都下一个客户端监视。这样就避免了群体效应。
5.5 共享锁:
获取读锁:
调用 create() 创建路径名为”guid-/read-“的节点。这是稍后在协议中使用的锁节点。确保同时设置了序列标志和临时标志。
在不设置监视标志的情况下在锁节点上调用getChildren()——这很重要,因为它可以避免羊群效应。
如果没有子节点的路径名以 “write-” 开头,且序号小于步骤1中创建的节点,则客户机拥有锁,可以退出协议。
否则,**调用exists()**,它带有监视标志,设置在锁目录中的节点上,路径名以“write-”开头,序号次之。
If exists()返回false,转到步骤2。
否则,在转到步骤2之前,等待来自上一个步骤的路径名通知
获取写锁:
**调用create()**创建路径名为”guid-/write-“的节点。这是协议后面提到的锁节点。确保同时设置了序列标志和临时标志。
在不设置监视标志的情况下在锁节点上调用getChildren()——这很重要,因为它可以避免羊群效应。
如果没有子节点的序列号低于步骤1中创建的节点,则客户端拥有锁,并退出协议。
在具有下一个最低序列号的路径名的节点上调用exists(),并设置监视标志。
If exists()返回false,转到步骤2。否则,在转到步骤2之前,等待来自上一个步骤的路径名通知。
注:
这种方法可能会产生一种群体效应:当有一大群客户端在等待一个读锁,并且当具有最低序列号的 “写”节点被删除时,所有客户端或多或少地同时得到通知。事实上。这是有效的行为:因为所有等待的读取器客户机都应该被释放,因为它们拥有锁。群体效应指的是释放一个“群体”,而实际上只有一个或少量的机器可以进行。
5.6 使用锁来进行主节点选举
选主节点:
选主节点是针对我们业务来说,选主的时候可以通过分布式锁让多个节点同时获取锁,优先获取锁的节点执行选主逻辑,写入主节点标示,然后释放锁,让后面的节点开始执行,当前节点拿到锁后可以判断主节点是否存在,不存在则执行选主逻辑,如果存在主节点则跳过。
5.7 Java客户端使用Java和Zookeeper开发一个屏障例子
barrier是一种原语,允许一组线程/进程在到达某个栅栏点(common barrier point)互相等待,直到最后一个线程/进程到达栅栏点,栅栏才会打开,处于阻塞状态的线程恢复继续执行。
举个例子来说:**比如我们在打王者荣耀游戏的的时候,十个人必须全部加载到100%**,才可以开局。如果想要了解Java线程级的屏障实现可以参考CyclicBarrier类型
在这里我们主要介绍基于Zookeeper进程级屏障的实现,这里屏障实现使一组进程能够同步计算的开始和结束。这个实现的基本思想是有一个barrier节点,用于作为各个流程节点的父节点。假设我们称屏障节点为“/b1”。然后,每个进程“p”创建一个节点“/b1/p”。通过监听机制来监测其他节点的写入,一旦足够多的进程创建了它们对应的节点,连接的进程就可以开始计算了。
接下来我们可以看下代码实现
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
public class SyncPrimitive implements Watcher {
static ZooKeeper zk = null;
static Integer mutex;
String root;
SyncPrimitive(String address) {
if(zk == null){
try {
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, this);
mutex = new Integer(-1);
System.out.println("Finished starting ZK: " + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
//else mutex = new Integer(-1);
}
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
//System.out.println("Process: " + event.getType());
mutex.notify();
}
}
/**
* Barrier
*/
static public class Barrier extends SyncPrimitive {
int size;
String name;
/**
* Barrier constructor
*
* @param address
* @param root
* @param size
*/
Barrier(String address, String root, int size) {
super(address);
this.root = root;
this.size = size;
// Create barrier node
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
// My node name
try {
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
} catch (UnknownHostException e) {
System.out.println(e.toString());
}
}
/**
* Join barrier
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean enter() throws KeeperException, InterruptedException{
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() < size) {
mutex.wait();
} else {
return true;
}
}
}
}
/**
* Wait until all reach barrier
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean leave() throws KeeperException, InterruptedException{
zk.delete(root + "/" + name, 0);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}
}
/**
* Producer-Consumer queue
*/
static public class Queue extends SyncPrimitive {
/**
* Constructor of producer-consumer queue
*
* @param address
* @param name
*/
Queue(String address, String name) {
super(address);
this.root = name;
// Create ZK node name
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
/**
* Add element to the queue.
*
* @param i
* @return
*/
boolean produce(int i) throws KeeperException, InterruptedException{
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
// Add child with value i
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
/**
* Remove first element from the queue.
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat = null;
// Get the first element available
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
String minNode = list.get(0);
for(String s : list){
Integer tempValue = new Integer(s.substring(7));
//System.out.println("Temporary value: " + tempValue);
if(tempValue < min) {
min = tempValue;
minNode = s;
}
}
System.out.println("Temporary value: " + root + "/" + minNode);
byte[] b = zk.getData(root + "/" + minNode,
false, stat);
zk.delete(root + "/" + minNode, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
}
public static void main(String args[]) {
if (args[0].equals("qTest"))
queueTest(args);
else
barrierTest(args);
}
public static void queueTest(String args[]) {
Queue q = new Queue(args[1], "/app1");
System.out.println("Input: " + args[1]);
int i;
Integer max = new Integer(args[2]);
if (args[3].equals("p")) {
System.out.println("Producer");
for (i = 0; i < max; i++)
try{
q.produce(10 + i);
} catch (KeeperException e){
} catch (InterruptedException e){
}
} else {
System.out.println("Consumer");
for (i = 0; i < max; i++) {
try{
int r = q.consume();
System.out.println("Item: " + r);
} catch (KeeperException e){
i--;
} catch (InterruptedException e){
}
}
}
}
public static void barrierTest(String args[]) {
Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
try{
boolean flag = b.enter();
System.out.println("Entered barrier: " + args[2]);
if(!flag) System.out.println("Error when entering the barrier");
} catch (KeeperException e){
} catch (InterruptedException e){
}
// Generate random integer
Random rand = new Random();
int r = rand.nextInt(100);
// Loop for rand iterations
for (int i = 0; i < r; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
try{
b.leave();
} catch (KeeperException e){
} catch (InterruptedException e){
}
System.out.println("Left barrier");
}
}
技术咨询与支持,可以扫描微信公众号进行回复咨询
Zookeeper16, Zookeeper安装配置14, Zookeeper源码解析16