ZooKeeper学习(四)—使用JavaAPI操纵Zookeeper

Scroll Down

ZooKeeper学习(四)—使用JavaAPI操纵Zookeeper

zookeeper主要用于解决分布式一致性问题,zookeeper包含5个包:

  • org.apache.zookeeper
  • org.apache.zookeeper.data
  • org.apache.zookeeper.server
  • org.apache.zookeeper.server.quorum
  • org.apache.zookeeper.server.upgrade

客户端主要通过org.apache.zookeeper.Zookeeper用来连接服务器,这里我们创建个Maven项目,引入zookeeperAPI开始操作

引入zookeeperAPI

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>

本文代码所在地址:https://github.com/ecit13218/zookeeperAPI

创建会话(Session)

这里我们首先先看一下zookeeper的实例该如何创建

image-20200708114257998

Zookeeper自带了4个构造方法,这里拿构造参数最长的那个方法出来进行解释

 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
        throws IOException
 {
     //.....
 }
参数名参数解释
connectStringzookeeper的服务器列表,如果是集群的话可以用,隔开,例如:127.0.0.1:2181,127.0.0.1:2182,如果在服务器名后面增加了目录,则是相当于namespace的作用,例如,127.0.0.1:2181/portal 后面所有的操作都是基于/portal这个目录下的操作
sessionTimeout会话超时时间,以毫秒为单位,zookeeper客户端与服务器通过心跳检测机制来维持会话的有效性
watcher传入一个实现了Watcher接口的类,可以用来作为事件通知的默认处理器,设置为null则是不需要处理事件
sessionId会话ID,与会话密钥可以确认唯一的一个会话,可以达到会话复用的效果,通过getSessonId()方法获取当前session
sessionPasswd会话密钥,同上,通过getSessionasswd()方法获得密钥
canBeReadOnly布尔类型的参数,是否是支持只读,如果这个机器与集群中过半的机器无法连接,那么这个机器将无法处理请求,设置为true可以使该服务器在此种场景下提供读服务

ZooKeeper 客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立好一个可用的会话,在会话的生命周期中处于“CONNECTING”的状态。

当该会话真正创建完毕后,ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话。

所以这里我们引入CountDownLatch,只有在完成连接之后,我们才会继续使用zookeeper实例进行后续的操作

public class CreateSession implements Watcher {
    private static final String HOST_PORT = "127.0.0.1:21822";
    private static final CountDownLatch countLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException {
        ZooKeeper zooKeeper = new ZooKeeper(HOST_PORT,5000,new CreateSession());
        System.out.println(zooKeeper.getState());
        countLatch.await();
        System.out.println(zooKeeper.getState());        
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("进入回调事件");
        //只有连接成功之后,才将上面的countLatch减一
        if (Event.KeeperState.SyncConnected.equals(event.getState())) {
            countLatch.countDown();
        }
    }
}

执行结果:

image-20200708124408195

创建节点(Node)

zookeeper创建节点有两种方法,一个是以同步进行创建,一个是以异步进行创建

image-20200708124625897

public void create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode,  StringCallback cb, Object ctx)
参数名参数说明
path创建节点的路径,无法递归创建,创建子节点需要保证有父节点
data[]创建文件的初始内容,使用byte数据传递
acl节点的acl策略
createMode节点类型,是一个枚举:1.持久型(PERSISTENT)
2.持久顺序型(PERSISTENT_SEQUENTIAL)
3.临时型(EPHEMERAL)
4.临时顺序型(EPHEMERAL_SEQUENTIAL)
cb注册一个异步回调函数,需要实现StringCallback接口,当服务端节点创建完毕后,客户端会自动调用接口中的processResult方法
ctx传递一个对象,用于在回调方法执行时使用,通常放上下文信息
public class CreateNode implements Watcher {
    private static final String HOST_PORT = "127.0.0.1:21822";
    private static final CountDownLatch countLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper(HOST_PORT,5000,new CreateNode());
        System.out.println(zooKeeper.getState());
        countLatch.await();
        System.out.println(zooKeeper.getState());
        String node_PERSISTENT = zooKeeper.create("/lg_persistent", "持久节点内容".getBytes("utf-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                String node_PERSISTENT_SEQUENTIAL =
                        zooKeeper.create("/lg_persistent_sequential", "持久节点内容".getBytes("utf-8"),
                                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        String node_EPERSISTENT = zooKeeper.create("/lg_ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("创建的持久节点是:"+node_PERSISTENT);
        System.out.println("创建的持久顺序节点是:"+node_PERSISTENT_SEQUENTIAL);
        System.out.println("创建的临时节点是:"+node_EPERSISTENT);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("进入回调事件");
        //只有连接成功之后,才将上面的countLatch减一
        if (Event.KeeperState.SyncConnected.equals(event.getState())) {
            countLatch.countDown();
        }
    }
}

执行结果为:

image-20200708125457759

删除节点

zookeeper删除节点也有两个方法,也是同步和异步的删除方法

image-20200708142146795

public void delete(final String path, int version, VoidCallback cb,
        Object ctx)
参数名参数说明
path指定数据节点的路径,删除该节点
version数据版本,传入-1默认是删除最新版本
cb注册一个异步回调函数
ctx用于传递上下文对象
public class DeleteNode implements Watcher {
    private static final String HOST_PORT = "127.0.0.1:21822";
    private static final CountDownLatch countLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper(HOST_PORT,5000,new DeleteNode());
        System.out.println(zooKeeper.getState());
        countLatch.await();
        System.out.println(zooKeeper.getState());
        /*
            zooKeeper.exists(path,watch) :判断节点是否存在
            zookeeper.delete(path,version) : 删除节点
        */
        zooKeeper.create("/lg_persistent/lg-children", "数据节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        Stat exists = zooKeeper.exists("/lg_persistent/lg-children", false);
        System.out.println(exists == null ? "该节点不存在":"该节点存在");
        zooKeeper.delete("/lg_persistent/lg-children",-1);
        Stat exists2 = zooKeeper.exists("/lg_persistent/lg-children", false);
        System.out.println(exists2 == null ? "该节点不存在":"该节点存在");
        zooKeeper.delete("/lg_persistent/lg-children",-1);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("进入回调事件");
        //只有连接成功之后,才将上面的countLatch减一
        if (Event.KeeperState.SyncConnected.equals(event.getState())) {
            countLatch.countDown();
        }
    }
}

只有节点存在时才可以删除该节点,否则删除节点会报错

执行结果:

image-20200708142956535

读取节点数据

读取数据,包括子节点列表的获取和节点数据的获取。ZooKeeper分别提供了不同的API来获取数据。

获取子节点列表数据

zookeeper提供了8个方法用于获取子节点列表

image-20200708143224400

image-20200708143314096

参数名参数说明
path指定节点的路径,获取该路径下的子节点列表
watcher注册一个Watcher,一旦子节点列表发生变化,就会触发事件通知客户端,传入null则是不需要通知
watch表明是否需要注册Watcher,如果传true就是默认的Watcher,false就是不需要注册Watcher
cb异步的回调函数
ctx上下文信息对象
stat传入一个旧的stat变量,该变量会被服务端响应的新stat对象替换
public class GetNodeChildrenList implements Watcher {
    private static final String HOST_PORT = "127.0.0.1:21822";
    private static final CountDownLatch countLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(HOST_PORT,5000,new GetNodeChildrenList());
        System.out.println(zooKeeper.getState());
        countLatch.await();
        System.out.println(zooKeeper.getState());
        /*
            zooKeeper.exists(path,watch) :判断节点是否存在
            zookeeper.delete(path,version) : 删除节点
        */
        zooKeeper.create("/lg_persistent/lg-children", "数据节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        TimeUnit.SECONDS.sleep(10);
        zooKeeper.create("/lg_persistent/lg-children2", "数据节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        TimeUnit.SECONDS.sleep(10);
        zooKeeper.delete("/lg_persistent/lg-children",-1);
        zooKeeper.delete("/lg_persistent/lg-children2",-1);
        TimeUnit.SECONDS.sleep(10);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("进入回调事件");
        //只有连接成功之后,才将上面的countLatch减一
        if (Event.KeeperState.SyncConnected.equals(event.getState())) {
            countLatch.countDown();
            try {
                getChildrens();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    private static void getChildrens() throws KeeperException,
            InterruptedException {
        /*
        path:路径
        watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听
        zooKeeper.getChildren(path, watch);
        */
        List<String> children = zooKeeper.getChildren("/lg_persistent", true);
        System.out.println("获取子节点:"+children);
    }
}

执行结果为:

image-20200708144958093

获取子节点数据

zookeeper对getData()提供了四个方法

image-20200708145221746

public byte[] getData(final String path, Watcher watcher, Stat stat);
public byte[] getData(String path, boolean watch, Stat stat);
public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx);
public void getData(String path, boolean watch, DataCallback cb, Object ctx);
参数名参数说明
path指定数据的节点路径,即API调用的目的是获取该节点的数据内容
watcher注册的Watcher,一旦之后节点内容发生变更,就会向客户端发送通知,传入null就是不需要处理
watch是否需要注册watch,传入true使用默认Watcher,false不注册Watcher
cb异步回调函数
ctx上下文信息对象
public class GetNodeData implements Watcher {
    private static final String HOST_PORT = "127.0.0.1:21822";
    private static final CountDownLatch countLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(HOST_PORT,5000,new GetNodeData());
        System.out.println(zooKeeper.getState());
        countLatch.await();
        System.out.println(zooKeeper.getState());
        /**
         * path : 获取数据的路径
         * watch : 是否开启监听
         * stat : 节点状态信息
         * null: 表示获取最新版本的数据
         * zk.getData(path, watch, stat);
         */
        byte[] data = zooKeeper.getData("/lg_persistent/lg-children", true,
                null);
        System.out.println("返回节点内容为:"+new String(data,"utf-8"));
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("进入回调事件");
        //只有连接成功之后,才将上面的countLatch减一
        if (Event.KeeperState.SyncConnected.equals(event.getState())) {
            countLatch.countDown();
        }
    }

}

执行结果为:

image-20200708145820027

更新节点数据

zookeeper对更新节点数据提供了两个方法

public Stat setData(final String path, byte data[], int version);
public void setData(final String path, byte data[], int version,StatCallback cb, Object ctx);

image-20200708145958728

参数名参数说明
path指定数据节点的路径,更新该节点的内容
data[]用新的字节数组更新原有数据内容
version更新指定版本,传入-1则是更新最新版本
cb注册一个异步回调函数
ctx传递上下文信息对象

更新节点时为什么需要指定一个版本呢?

这里其实类似于CAS的理论,更新一个数据前都会对比其原始值是否是期望值,只有其原始值符合期望值,才能将值原子化的更新

zookeeper中数据的版本参数也是通过CAS理论衍化而来,如果假如版本参数,只有在版本符合你传入值的期望值时,你才能更新,否则是更新失败的,主要用于一些分布式环境下的更新并发问题。

public class UpdateNodeData implements Watcher {
    private static final String HOST_PORT = "127.0.0.1:21822";
    private static final CountDownLatch countLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(HOST_PORT,5000,new UpdateNodeData());
        System.out.println(zooKeeper.getState());
        countLatch.await();
        System.out.println(zooKeeper.getState());
       /*
            path:路径
            data:要修改的内容 byte[]
            version:为-1,表示对最新版本的数据进⾏修改
            zooKeeper.setData(path, data,version);
        */
        byte[] data = zooKeeper.getData("/lg_persistent", false, null);
        System.out.println("修改前的值:"+new String(data));
        //修改 stat:状态信息对象 -1:最新版本
        Stat stat = zooKeeper.setData("/lg_persistent", "客户端修改内容".getBytes(), -1);
        byte[] data2 = zooKeeper.getData("/lg_persistent", false, null);
        System.out.println("修改后的值:"+new String(data2));
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("进入回调事件");
        //只有连接成功之后,才将上面的countLatch减一
        if (Event.KeeperState.SyncConnected.equals(event.getState())) {
            countLatch.countDown();
        }
    }

}

执行结果:

image-20200708150601783

zookeeper的权限控制

在zookeeper集群中,为了避免服务器上的数据被其他未授权进程干扰或人为干扰,所以zookeeper提供了ACL的权限控制机制,简单的讲,就是通过设置zookeeper服务器上数据节点的ACL,控制客户端对数据的访问权限,zookeeper提供了多种权限的控制模式(Scheme),分别是digest、world、auth、ip和super。。

权限模式(Scheme)

  • digest:digest类似于 username:password 的格式,便于区分不同应用来进行权限控制,我们使用"username:password"配置了权限标识后,zookeeper会对其进行SHA-1加密后在进行BASE64位编码

  • world:world是最开放的权限控制模式,数据节点访问权限对所有人开放,可以看做是特殊的digest模式,只有一个权限标识,就是"world:anyone"

  • IP:IP模式是通过IP地址的粒度来进行权限控制的,如"ip:192.168.0.1"或者"ip:192.168.0.1/24",可以支持针对IP地址或者针对IP网段进行权限控制

  • Super:super模式,也是类似一种特殊的Digest模式,在super模式下,用户可以对任意Zookeeper节点进行操作。

zookeeper使用权限控制需要在连接客户端之后,使用public void addAuthInfo(String scheme, byte auth[]) API,这里演示一下正确获取权限和错误获取权限的结果

public class GetNodeDataByPermersion implements Watcher {
    private static final String HOST_PORT = "127.0.0.1:21822";
    private static  CountDownLatch countLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(HOST_PORT,5000,new GetNodeDataByPermersion());
        countLatch.await();
        zooKeeper.addAuthInfo("digest","admin:admin".getBytes());
        String path = "/testauth4";
        //设置权限为ACL控制
        zooKeeper.create(path,"权限控制".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL);
        countLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper2 = new ZooKeeper(HOST_PORT,5000,new GetNodeDataByPermersion());
        //设置权限
        zooKeeper2.addAuthInfo("digest","admin:admin".getBytes());
        byte[] data = zooKeeper2.getData(path, false, null);
        System.out.println("正确权限获取节点内容:" + new String(data,"utf-8"));
        ZooKeeper zooKeeper3 = new ZooKeeper(HOST_PORT,5000,new GetNodeDataByPermersion());
        countLatch = new CountDownLatch(1);
        zooKeeper.addAuthInfo("digest","admin:false".getBytes());
        byte[] data2 = zooKeeper3.getData(path, false, null);
        System.out.println("错误权限获取节点内容:" + new String(data,"utf-8"));
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("进入回调事件");
        //只有连接成功之后,才将上面的countLatch减一
        if (Event.KeeperState.SyncConnected.equals(event.getState())) {
            countLatch.countDown();
        }
    }
}

执行结果为:zookeeper2是已经成功执行了,zookeeper3提示没有权限,达到了权限控制的要求

image-20200708153728845