Netflix Curator使用

Netflix curator 是Netflix公司开源的一个Zookeeper client library,用于简化zookeeper客户端编程,包含一下几个模块:

  • curator-client - zookeeper client封装,用于取代原生的zookeeper客户端,提供一些非常有用的客户端特性
  • curator-framework - zookeeper api的高层封装,大大简化zookeeper客户端编程,添加了例如zookeeper连接管理、重试机制等
  • curator-recipes - zookeeper recipes 基于curator-framework的实现(除2PC以外)

maven dependency:

  1. <dependency>
  2.     <groupId>com.netflix.curator</groupId>
  3.     <artifactId>curator-recipes</artifactId>
  4.     <version>0.6.4</version>
  5. </dependency>

注意:在www.mvnrepository.com中认为0.32为最新版本,其实迄今为止最新版本为0.64,github trunk中的版本现在是0.65-SNAPSHOT

curator framework 使用

  1.               String path = "/test_path";
  2. CuratorFramework client = CuratorFrameworkFactory.builder()
  3.         .connectString("test:2181").namespace("/test1")
  4.         .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
  5.         .connectionTimeoutMs(5000).build();
  6. //create a node
  7. client.create().forPath("/head"new byte[0]);
  8. //delete a node in background
  9. client.delete().inBackground().forPath("/head");
  10. // create a EPHEMERAL_SEQUENTIAL
  11. client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child"new byte[0]);
  12. // get the data 
  13. client.getData().watched().inBackground().forPath("/test");
  14. // check the path exits
  15. client.checkExists().forPath(path);

curator framework使用builder模式和类似nio的chain api,代码非常简洁

curator recipes 使用

InterProcessMutex

用途:进程间互斥锁

示例代码:

  1. String lockName = "/lock1";
  2. InterProcessLock lock1 = new InterProcessMutex(this.curator, lockName);
  3. InterProcessLock lock2 = new InterProcessMutex(this.curator, lockName);
  4. lock1.acquire();
  5. boolean result = lock2.acquire(1, TimeUnit.SECONDS);
  6. assertFalse(result);
  7. lock1.release();
  8. result = lock2.acquire(1, TimeUnit.SECONDS);
  9. assertTrue(result);

原理:每次调用acquire在/lock1节点节点下使用CreateMode.EPHEMERAL_SEQUENTIAL 创建新的ephemeral节点,然后getChildren获取所有的children,判断刚刚创建的临时节点是否为第一个,如果是,则获取锁成功; 如果不是,则删除刚刚创建的临时节点。

注意: 每次accquire操作,成功,则请求zk server 2次(一次写,一次getChildren);如果失败,则请求zk server 3次(一次写,一次getChildren,一次delete)

InterProcessReadWriteLock

示例代码:

  1. @Test
  2. public void testReadWriteLock() throws Exception{
  3.     String readWriteLockPath = "/RWLock";
  4.     InterProcessReadWriteLock readWriteLock1 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
  5.     InterProcessMutex writeLock1 = readWriteLock1.writeLock();
  6.     InterProcessMutex readLock1 = readWriteLock1.readLock();
  7.     InterProcessReadWriteLock readWriteLock2 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
  8.     InterProcessMutex writeLock2 = readWriteLock2.writeLock();
  9.     InterProcessMutex readLock2 = readWriteLock2.readLock();
  10.     writeLock1.acquire();
  11.     // same with WriteLock, can read
  12.     assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
  13.     // different lock, can't read while writting
  14.     assertFalse(readLock2.acquire(1, TimeUnit.SECONDS));
  15.     // different write lock, can't write
  16.     assertFalse(writeLock2.acquire(1, TimeUnit.SECONDS));
  17.     // release the write lock
  18.     writeLock1.release();
  19.     //both read lock can read
  20.     assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
  21.     assertTrue(readLock2.acquire(1, TimeUnit.SECONDS));
  22. }

原理: 同InterProcessMutext,在ephemeral node的排序算法上做trick,write lock的排序在前。

注意: 同一个InterProcessReadWriteLock如果已经获取了write lock,则获取read lock也会成功

LeaderSelector

示例代码:

  1. @Test
  2. public void testLeader() throws Exception{
  3.     LeaderSelectorListener listener = new LeaderSelectorListener(){
  4.         @Override
  5.         public void takeLeadership(CuratorFramework client)
  6.                 throws Exception {
  7.             System.out.println("i'm leader");
  8.         }
  9.         @Override
  10.         public void handleException(CuratorFramework client,
  11.                 Exception exception) {
  12.         }
  13.         @Override
  14.         public void notifyClientClosing(CuratorFramework client) {
  15.         }};
  16.     String leaderPath = "/leader";
  17.     LeaderSelector selector1 = new LeaderSelector(this.curator, leaderPath, listener);
  18.     selector1.start();
  19.     LeaderSelector selector2 = new LeaderSelector(this.curator, leaderPath, listener);
  20.     selector2.start();
  21.     assertFalse(selector2.hasLeadership());
  22. }

原理:内部基于InterProcessMutex实现,具体细节参见shared lock一节

总结

curator还提供了很多其他的实现,具体参见https://github.com/Netflix/curator/wiki/Recipes

 

参考文章:

发表评论