zookeeper学习之Curator客户端

博客分类:

Curator框架是最好用,最流行的zookeeper的客户端。

它有以下三个优点

1.提供了一套非常友好的操作API;

2. 提供一些高级特性(包括但不仅限于前篇文章中提到的)的封装

3.易测试

 

maven依赖如下

 

Xml代码
  1. <dependency>
  2.     <groupId>org.apache.curator</groupId>
  3.     <artifactId>curator-recipes</artifactId>
  4.     <version>2.5.0</version>
  5. </dependency>

 

 

按照官方给出的文档和包结构,可以轻松的看出Curator功能分两大类,一是对zookeeper的一些基本命令的封装,比如增删改查。是他的framework模块,一个是他的高级特性,即recipes模块。

 

一、framework模块

Curator提供了一套Fluent风格的操作API。这在很多脚本类语言里比较流行。

比如他创建client的代码是这样

Java代码
  1. CuratorFramework client = builder.connectString("192.168.11.56:2180")
  2.         .sessionTimeoutMs(30000)
  3.         .connectionTimeoutMs(30000)
  4.         .canBeReadOnly(false)
  5.         .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
  6.         .namespace(namespace)
  7.         .defaultData(null)
  8.         .build();
  9. client.start();

一路点到底,这就是所谓的Fluent风格。

 

我们再看增删改查的

Java代码  收藏代码
  1. public class CrudExamples {
  2.     private static CuratorFramework client = ClientFactory.newClient();
  3.     private static final String PATH = "/crud";
  4.     public static void main(String[] args) {
  5.         try {
  6.             client.start();
  7.             client.create().forPath(PATH, "I love messi".getBytes());
  8.             byte[] bs = client.getData().forPath(PATH);
  9.             System.out.println("新建的节点,data为:" + new String(bs));
  10.             client.setData().forPath(PATH, "I love football".getBytes());
  11.             // 由于是在background模式下获取的data,此时的bs可能为null
  12.             byte[] bs2 = client.getData().watched().inBackground().forPath(PATH);
  13.             System.out.println("修改后的data为" + new String(bs2 != null ? bs2 : new byte[0]));
  14.             client.delete().forPath(PATH);
  15.             Stat stat = client.checkExists().forPath(PATH);
  16.             // Stat就是对zonde所有属性的一个映射, stat=null表示节点不存在!
  17.             System.out.println(stat);
  18.         } catch (Exception e) {
  19.             e.printStackTrace();
  20.         } finally {
  21.             CloseableUtils.closeQuietly(client);
  22.         }
  23.     }
  24. }

常用接口有

create()增

delete(): 删

checkExists(): 判断是否存在

setData():  改

getData(): 查

所有这些方法都以forpath()结尾,辅以watch(监听),withMode(指定模式),和inBackground(后台运行)等方法来使用。

 

此外,Curator还支持事务,一组crud操作同生同灭。代码如下

Java代码
  1. /**
  2.  * 事务操作
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class TransactionExamples {
  7.     private static CuratorFramework client = ClientFactory.newClient();
  8.     public static void main(String[] args) {
  9.         try {
  10.             client.start();
  11.             // 开启事务
  12.             CuratorTransaction transaction = client.inTransaction();
  13.             Collection<CuratorTransactionResult> results = transaction.create()
  14.                     .forPath("/a/path""some data".getBytes()).and().setData()
  15.                     .forPath("/another/path""other data".getBytes()).and().delete().forPath("/yet/another/path")
  16.                     .and().commit();
  17.             for (CuratorTransactionResult result : results) {
  18.                 System.out.println(result.getForPath() + " - " + result.getType());
  19.             }
  20.         } catch (Exception e) {
  21.             e.printStackTrace();
  22.         } finally {
  23.             // 释放客户端连接
  24.             CloseableUtils.closeQuietly(client);
  25.         }
  26.     }
  27. }

这段的代码的运行结果,由于最后一步delete的节点不存在,所以整个事务commit失败。失败的原因会放在Collection<CuratorTransactionResult>中,非常友好。

 

好了framework部分的内容就这么多,是不是特别简单呢。下面就来看看recipes包的内容吧。。

 

Recipes部分提供的功能官网列的很详细,点击这里。注意文章第一段:Curator宣称,Recipes模块实现了除二阶段提交之外的所有zookeeper特性。

 

二、Recipes模块

 

主要有

Elections(选举),Locks(锁),Barriers(关卡),Atomic(原子量),Caches,Queues等

 

1、 Elections

选举主要依赖于LeaderSelector和LeaderLatch2个类。前者是所有存活的客户端不间断的轮流做Leader,大同社会。后者是一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。某党?

 

这两者在实现上是可以切换的,直接上代码,怎么切换注释里有。由于篇幅所限,这里仅贴出基于LeaderSelector的选举,更多代码见附件

Java代码
  1. /**
  2.  * 本类基于leaderSelector实现,所有存活的client会公平的轮流做leader
  3.  * 如果不想频繁的变化Leader,需要在takeLeadership方法里阻塞leader的变更! 或者使用 {@link}
  4.  * LeaderLatchClient
  5.  */
  6. public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {
  7.     private final String name;
  8.     private final LeaderSelector leaderSelector;
  9.     private final String PATH = "/leaderselector";
  10.     public LeaderSelectorClient(CuratorFramework client, String name) {
  11.         this.name = name;
  12.         leaderSelector = new LeaderSelector(client, PATH, this);
  13.         leaderSelector.autoRequeue();
  14.     }
  15.     public void start() throws IOException {
  16.         leaderSelector.start();
  17.     }
  18.     @Override
  19.     public void close() throws IOException {
  20.         leaderSelector.close();
  21.     }
  22.     /**
  23.      * client成为leader后,会调用此方法
  24.      */
  25.     @Override
  26.     public void takeLeadership(CuratorFramework client) throws Exception {
  27.         int waitSeconds = (int) (5 * Math.random()) + 1;
  28.         System.out.println(name + "是当前的leader");
  29.         try {
  30.             Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
  31.         } catch (InterruptedException e) {
  32.             Thread.currentThread().interrupt();
  33.         } finally {
  34.             System.out.println(name + " 让出领导权\n");
  35.         }
  36.     }

 

Java代码  收藏代码
  1. /**
  2.  * leader选举
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class LeaderSelectorExample {
  7.     public static void main(String[] args) {
  8.         List<CuratorFramework> clients = Lists.newArrayList();
  9.         List<LeaderSelectorClient> examples = Lists.newArrayList();
  10.         try {
  11.             for (int i = 0; i < 10; i++) {
  12.                 CuratorFramework client = ClientFactory.newClient();
  13.                 LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i);
  14.                 clients.add(client);
  15.                 examples.add(example);
  16.                 client.start();
  17.                 example.start();
  18.             }
  19.             System.out.println("----------先观察一会选举的结果-----------");
  20.             Thread.sleep(10000);
  21.             System.out.println("----------关闭前5个客户端,再观察选举的结果-----------");
  22.             for (int i = 0; i < 5; i++) {
  23.                 clients.get(i).close();
  24.             }
  25.             // 这里有个小技巧,让main程序一直监听控制台输入,异步的代码就可以一直在执行。不同于while(ture)的是,按回车或esc可退出
  26.             new BufferedReader(new InputStreamReader(System.in)).readLine();
  27.         } catch (Exception e) {
  28.             e.printStackTrace();
  29.         } finally {
  30.             for (LeaderSelectorClient exampleClient : examples) {
  31.                 CloseableUtils.closeQuietly(exampleClient);
  32.             }
  33.             for (CuratorFramework client : clients) {
  34.                 CloseableUtils.closeQuietly(client);
  35.             }
  36.         }
  37.     }
  38. }

 

2、locks

curator lock相关的实现在recipes.locks包里。顶级接口都是InterProcessLock。我们直接看最有代表性的 InterProcessReadWriteLock 进程内部读写锁(可重入读写锁)。什么叫可重入,什么叫读写锁。不清楚的先查好资料吧。总之读写锁一定是成对出现的。    简易传送门

 

我们先定义两个任务,可并行的执行的,和互斥执行的。

Java代码
  1. /**
  2.  * 并行任务
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class ParallelJob implements Runnable {
  7.     private final String name;
  8.     private final InterProcessLock lock;
  9.     // 锁等待时间
  10.     private final int wait_time = 5;
  11.     ParallelJob(String name, InterProcessLock lock) {
  12.         this.name = name;
  13.         this.lock = lock;
  14.     }
  15.     @Override
  16.     public void run() {
  17.         try {
  18.             doWork();
  19.         } catch (Exception e) {
  20.             // ingore;
  21.         }
  22.     }
  23.     public void doWork() throws Exception {
  24.         try {
  25.             if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {
  26.                 System.err.println(name + "等待" + wait_time + "秒,仍未能获取到lock,准备放弃。");
  27.             }
  28.             // 模拟job执行时间0-4000毫秒
  29.             int exeTime = new Random().nextInt(4000);
  30.             System.out.println(name + "开始执行,预计执行时间= " + exeTime + "毫秒----------");
  31.             Thread.sleep(exeTime);
  32.         } catch (Exception e) {
  33.             e.printStackTrace();
  34.         } finally {
  35.             lock.release();
  36.         }
  37.     }
  38. }

 

Java代码
  1. /**
  2.  * 互斥任务
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class MutexJob implements Runnable {
  7.     private final String name;
  8.     private final InterProcessLock lock;
  9.     // 锁等待时间
  10.     private final int wait_time = 10;
  11.     MutexJob(String name, InterProcessLock lock) {
  12.         this.name = name;
  13.         this.lock = lock;
  14.     }
  15.     @Override
  16.     public void run() {
  17.         try {
  18.             doWork();
  19.         } catch (Exception e) {
  20.             // ingore;
  21.         }
  22.     }
  23.     public void doWork() throws Exception {
  24.         try {
  25.             if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {
  26.                 System.err.println(name + "等待" + wait_time + "秒,仍未能获取到lock,准备放弃。");
  27.             }
  28.             // 模拟job执行时间0-2000毫秒
  29.             int exeTime = new Random().nextInt(2000);
  30.             System.out.println(name + "开始执行,预计执行时间= " + exeTime + "毫秒----------");
  31.             Thread.sleep(exeTime);
  32.         } catch (Exception e) {
  33.             e.printStackTrace();
  34.         } finally {
  35.             lock.release();
  36.         }
  37.     }
  38. }

 

锁测试代码

 

Java代码
  1. /**
  2.  * 分布式锁实例
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class DistributedLockExample {
  7.     private static CuratorFramework client = ClientFactory.newClient();
  8.     private static final String PATH = "/locks";
  9.     // 进程内部(可重入)读写锁
  10.     private static final InterProcessReadWriteLock lock;
  11.     // 读锁
  12.     private static final InterProcessLock readLock;
  13.     // 写锁
  14.     private static final InterProcessLock writeLock;
  15.     static {
  16.         client.start();
  17.         lock = new InterProcessReadWriteLock(client, PATH);
  18.         readLock = lock.readLock();
  19.         writeLock = lock.writeLock();
  20.     }
  21.     public static void main(String[] args) {
  22.         try {
  23.             List<Thread> jobs = Lists.newArrayList();
  24.             for (int i = 0; i < 10; i++) {
  25.                 Thread t = new Thread(new ParallelJob("Parallel任务" + i, readLock));
  26.                 jobs.add(t);
  27.             }
  28.             for (int i = 0; i < 10; i++) {
  29.                 Thread t = new Thread(new MutexJob("Mutex任务" + i, writeLock));
  30.                 jobs.add(t);
  31.             }
  32.             for (Thread t : jobs) {
  33.                 t.start();
  34.             }
  35.         } catch (Exception e) {
  36.             e.printStackTrace();
  37.         } finally {
  38.             CloseableUtils.closeQuietly(client);
  39.         }
  40.     }
  41. }

 

看到没,用法和java concurrent包里的ReentrantReadWriteLock 是一模一样的。

事实上,整个recipes包的目录结构、实现原理同java concurrent包的设置是很一致的。比如有queue,Semaphore,Barrier等类,。他整个就是模仿jdk的实现,只不过是基于分布式的!

 

后边的几项,Barriers(关卡),Atomic(原子量),Caches,Queues和java concurrent包里的类的用法是一样的,就不继续贴了,有些附件里有。

要说明的是:有的功能性能不是特别理想,网上也没见有大的项目的使用案例。比如基于CAS机制的atomic,在某些情况重试的效率还不如硬同步,要是zookeeper节点再一多,各个节点之间通过event触发的数据同步极其频繁。那性能可以想象。

 

三、测试方法

curator提供了很好的测试工具,你甚至是可以在完全没有搭建zookeeper server端的情况下,完成测试。

有2个重要的类

TestingServer 模拟单点, TestingCluster模拟集群。

需要使用的话,得依赖

Xml代码
  1. <dependency>
  2.     <groupId>org.apache.curator</groupId>
  3.     <artifactId>curator-test</artifactId>
  4.     <version>2.5.0</version>
  5. </dependency>

来源:http://supben.iteye.com/blog/2094077

参考文章:

 

发表评论