ZooKeeper学习(五)—使用Curator客户端操纵Zookeeper

Scroll Down

ZooKeeper学习(五)—使用Curator客户端操纵Zookeeper

Curator是Netflix公司开源的一套ZooKeeper客户端框架,Curator 解决了很多 ZooKeeper 客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,目前已经成为了Apache的顶级项目,是全世界范围内使用最广泛的ZooKeeper客户端之一。

除了封装一些开发人员不需要特别关注的底层细节之外,Curator 还在 ZooKeeper 原生API的基础上进行了包装,提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

Curator中还提供了ZooKeeper各种应用场景(Recipe,如共享锁服务、Master选举机制和分布式计数器等)的抽象封装。

引入Maven依赖

 <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.4.2</version>
        </dependency>

创建会话

创建会话使用CuratorFrameworkFactory的两个newClient方法,

image-20200708180927095

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
{
        return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS,DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
}

public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
}
参数名参数说明
connectString同前面zookeeper的参数一样,这里传zookeeper的服务地址和端口,集群服务就用,隔开
sessionTimeoutMs会话超时时间,单位是毫秒,默认是60000ms,可以通过启动参数curator-default-session-timeout 去进行修改
connectionTimeoutMs连接创建超时时间,默认是15000ms,可以通过启动参数curator-default-connection-timeout进行修改
retryPolicy重试策略,默认有四种实现:
ExponentialBackoffRetry
RetryNTimes
RetryOneTime
RetryUntilElapsed

在重试策略上,Curator通过一个接口RetryPolicy来让用户实现自定义的重试策略。在RetryPolicy接口中只定义了一个方法

public boolean      allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
参数名参数说明
retryCount已经重试的次数,第一次重试该参数为0
elapsedTimeMs从第一次重试开始已经花费的时间,单位为毫秒
sleeper用于sleep指定时间
public static void main(String[] args) {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(HOST_PORT, 5000, 3000, retryPolicy);
        //启动客户端
        client.start();
    }

ExponentialBackoffRetry的构造方法如下:

public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries);
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs);
参数名参数说明
baseSleepTimeMs初始化sleep时间
maxRetries最大重试次数
maxSleepMs最大睡眠时间

给定一个初始sleep时间baseSleepTimeMs,在这个基础上结合重试次数,通过以下公式计算出当前需要sleep的时间:
当前sleep时间= baseSleepTimeMs*Math.max(1,random.nextInt(1<<(retryCount+1)))

随着重试次数的增加,计算出的sleep时间会越来越大。如果该sleep时间在maxSleepMs 的范围之内,那么就使用该 sleep 时间,否则使用 maxSleepMs。另外,maxRetries参数控制了最大重试次数,以避免无限制的重试。

创建节点

Curator 中提供了一系列 Fluent 风格的接口,开发人员可以通过对其进行自由组合来完成各种类型节点的创建。

创建节点API:

public T withMode(CreateMode mode);
public T forPath(String path, byte[] data);
public T forPath(String path);
public class CreateNode {
    private static final String HOST_PORT = "127.0.0.1:21822";
    
    public static void main(String[] args) throws Exception {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(HOST_PORT, 5000, 3000, retryPolicy);
        //启动客户端
        client.start();
        //创建空节点
        String result = client.create().forPath("/test-curator");
        System.out.println("创建节点:"+result);
        //创建有值的节点
        String initResult = client.create().forPath("/test-curator2", "init".getBytes());
        System.out.println("创建初始值节点:"+initResult);
        //创建节点并设置节点类型
        client.create().withMode(CreateMode.PERSISTENT).forPath("/test-curator3", "持久性节点".getBytes());
        client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-curator4", "持久性顺序节点".getBytes());
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-curator5", "临时节点".getBytes());
        client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test-curator6", "临时性顺序节点".getBytes());
        //递归创建节点
        String child = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test" +
                "-curator7" +
                "/test" +
                "-curator7", "创建子节点".getBytes());
        System.out.println("创建子节点:"+child);
    }
}

结果为:

image-20200708184654187

在使用 ZooKeeper 的过程中,开发人员经常会碰到NoNodeException 异常,其中一个可能的原因就是试图对一个不存在的父节点创建子节点。因此,开发人员不得不在每次创建节点之前,都判断一下该父节点是否存在在使用 Curator 之后,通过调用creatingParentsIfNeeded 接口,Curator 就能够自动地递归创建所有需要的父节点。

删除节点

删除节点主要是通过下面这四个方法来执行的:

public class DeleteNode {
    private static final String HOST_PORT = "127.0.0.1:21822";
    
    public static void main(String[] args) throws Exception {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(HOST_PORT, 5000, 3000, retryPolicy);
        //启动客户端
        client.start();
        //删除叶子节点
        client.delete().forPath("/test-curator");
        //递归删除含有子节点的节点
        client.delete().deletingChildrenIfNeeded().forPath("/test-curator7");
        //指定版本进行删除,-1是删除最新版本
        client.delete().withVersion(-1).forPath("/test-curator2");
        //guaranteed是只要未删除成功,会持续的重试,避免由于网络波动等原因导致删除命令执行却删除失败的场景
        client.delete().guaranteed().forPath("/test-curator3");
    }
}

读取节点

读取节点一个是直接读取数据,一个是将数据存储到Stat的数据结构中

// 普通查询
client.getData().forPath(path);
// 包含状态查询
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
public class GetNodeData {
    private static final String HOST_PORT = "127.0.0.1:21822";
    
    public static void main(String[] args) throws Exception {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(HOST_PORT, 5000, 3000, retryPolicy);
        //启动客户端
        client.start();
        //获取节点数据
        byte[] bytes = client.getData().forPath("/test-curator3");
        System.out.println("获取内容为:" + new String(bytes));
        //获取节点数据,并获取节点的详细内容,存储至Stat数据结构中
        Stat stat = new Stat();
        byte[] bytes1 = client.getData().storingStatIn(stat).forPath("/test-curator3");
        System.out.println("获取内容为:" + new String(bytes1));
        System.out.println("数据结构stat内容为:"+stat);
    }
}

image-20200708185654622

更新数据

更新也是提供了两个方法,一个是直接更新节点数据,一个是按照版本来更新节点数据,一般是使用getData()获取到Stat对象后,获取stat的数据结构在进行更新。

// 普通更新
client.setData().forPath(path,"新内容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);
public class UpdateNodeData {
    private static final String HOST_PORT = "127.0.0.1:21822";
    
    public static void main(String[] args) throws Exception {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(HOST_PORT, 5000, 3000, retryPolicy);
        //启动客户端
        client.start();
        //获取节点数据
        byte[] bytes = client.getData().forPath("/test-curator3");
        System.out.println("获取内容为:" + new String(bytes));
        //获取节点数据,并获取节点的详细内容,存储至Stat数据结构中
        Stat stat = new Stat();
        byte[] bytes1 = client.getData().storingStatIn(stat).forPath("/test-curator3");
        System.out.println("获取内容为:" + new String(bytes1));
        System.out.println("数据结构stat内容为:"+stat);
        //更新节点数据
        Stat stat1 = client.setData().forPath("/test-curator3", "更新后数据".getBytes());
        System.out.println("更新后数据结构stat内容为"+stat1);
        byte[] bytes2 = client.getData().forPath("/test-curator3");
        System.out.println("获取更新后内容为:" + new String(bytes2));
        //使用Stat中获取到的版本号,按照版本来更新,版本号不符合会报错
        Stat stat2 = client.setData().withVersion(stat1.getVersion()).forPath("/test-curator3", "版本号更新后数据".getBytes());
        bytes2 = client.getData().forPath("/test-curator3");
        System.out.println("版本号更新后内容为:" + new String(bytes2));
        Stat stat3 = client.setData().withVersion(stat.getVersion()).forPath("/test-curator3", "错误版本更新数据".getBytes());
    }
}

执行结果:

image-20200708190129251

第一次使用最新的stat变量进行更新操作,更新成功;第二次使用了过期的stat变量进行更新操作,抛出异常:KeeperErrorCode=BadVersion。

监听子节点变化

Curator 引入了 Cache 来实现对ZooKeeper服务端事件的监听,Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程ZooKeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册监听。

Cache分为两类监听类型:节点监听(NodeCache)和子节点监听()。

NodeCache

自带了两个构造方法,分别是

public NodeCache(CuratorFramework client, String path);
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);
参数名参数说明
client客户端实例
path监听的节点路径
dataIsCompressed数据是否压缩

NodeCache定义了事件的处理类NodeCacheListener,

public interface NodeCacheListener
{
    /**
     * Called when a change has occurred
     */
    public void     nodeChanged() throws Exception;
}

下面演示一下如何使用NodeCache监听节点数据变化

public class WatcherNodeData {
    private static final String HOST_PORT = "127.0.0.1:21822";

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(HOST_PORT, 5000, 3000, retryPolicy);
        //启动客户端
        client.start();
        String path = "/test-curator5";
        NodeCache nodeCache = new NodeCache(client,path);
        //该方法有个boolean类型的参数,默认是false,
        //如果设置为true,那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取对应节点的数据内容,并保存在Cache中。
        nodeCache.start(true);
        nodeCache.getListenable().addListener(() -> {
            System.out.print("数据节点:"+path+"已更新,当前值为:");
            System.out.println(new String(nodeCache.getCurrentData().getData()));
        });
        System.out.println("开始创建节点");
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path, "临时节点".getBytes());
        client.setData().forPath(path, "更新后节点".getBytes());
        TimeUnit.SECONDS.sleep(100);
    }
}

执行结果为:image-20200712233545896

PathChildrenCache

PathChildrenCache用于监听指定ZooKeeper数据节点的子节点变化情况。

	public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode);
    public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory);
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory);
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory);
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService);
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService);

参数名参数说明
clientcurator客户端
path节点路径
dataIsCompressed数据是否压缩
cacheData是否将节点数据缓存起来,一般设置为true,节点数据变更时可以同步获得节点的数据内容,设置为false的话则只能监听节点变化,无法获取节点内容
threadFactory线程池工厂类,用于构造一个线程池,来处理事件通知
executorService用于构造一个线程池,来处理事件通知
public class WatcherChildRenNodeData {
    private static final String HOST_PORT = "127.0.0.1:21822";

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(HOST_PORT, 5000, 3000, retryPolicy);
        //启动客户端
        client.start();
        String path = "/test-curator5";
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        pathChildrenCache.getListenable().addListener((client1, event) -> {
            System.out.println("监听到子节点数据改变了!");
            switch (event.getType()){
                case CHILD_ADDED:
                    System.out.println("节点新增,内容为:"+new String(event.getData().getData()));break;
                case CHILD_UPDATED:
                    System.out.println("节点修改,内容为:"+new String(event.getData().getData()));break;
                case CHILD_REMOVED:
                    System.out.println("节点已移除,path为:"+ event.getData().getPath());break;

            }
        });
        TimeUnit.SECONDS.sleep(5);
        String path1 = path + "/test01";
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path1,
                "临时节点".getBytes());
        client.setData().forPath(path1, "更新后节点".getBytes());
        client.delete().forPath(path1);
        TimeUnit.SECONDS.sleep(100);
    }
}

执行结果为:

image-20200713001224559