标签归档:zookeeper

Netflix Curator使用

Netflix curator 是Netflix公司开源的一个Zookeeper client library,用于简化zookeeper客户端编程,包含一下几个模块:

  • curator-client - zookeeper client封装,用于取代原生的zookeeper客户端,提供一些非常有用的客户端特性
  • curator-framework - zookeeper api的高层封装,大大简化zookeeper客户端编程,添加了例如zookeeper连接管理、重试机制等
  • curator-recipes - zookeeper recipes 基于curator-framework的实现(除2PC以外)

maven dependency:

  1. <dependency>
  2.     <groupId>com.netflix.curator</groupId>
  3.     <artifactId>curator-recipes</artifactId>
  4.     <version>0.6.4</version>
  5. </dependency>

注意:在www.mvnrepository.com中认为0.32为最新版本,其实迄今为止最新版本为0.64,github trunk中的版本现在是0.65-SNAPSHOT

curator framework 使用

  1.               String path = "/test_path";
  2. CuratorFramework client = CuratorFrameworkFactory.builder()
  3.         .connectString("test:2181").namespace("/test1")
  4.         .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
  5.         .connectionTimeoutMs(5000).build();
  6. //create a node
  7. client.create().forPath("/head"new byte[0]);
  8. //delete a node in background
  9. client.delete().inBackground().forPath("/head");
  10. // create a EPHEMERAL_SEQUENTIAL
  11. client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child"new byte[0]);
  12. // get the data 
  13. client.getData().watched().inBackground().forPath("/test");
  14. // check the path exits
  15. client.checkExists().forPath(path);

curator framework使用builder模式和类似nio的chain api,代码非常简洁

curator recipes 使用

InterProcessMutex

用途:进程间互斥锁

示例代码:

  1. String lockName = "/lock1";
  2. InterProcessLock lock1 = new InterProcessMutex(this.curator, lockName);
  3. InterProcessLock lock2 = new InterProcessMutex(this.curator, lockName);
  4. lock1.acquire();
  5. boolean result = lock2.acquire(1, TimeUnit.SECONDS);
  6. assertFalse(result);
  7. lock1.release();
  8. result = lock2.acquire(1, TimeUnit.SECONDS);
  9. assertTrue(result);

原理:每次调用acquire在/lock1节点节点下使用CreateMode.EPHEMERAL_SEQUENTIAL 创建新的ephemeral节点,然后getChildren获取所有的children,判断刚刚创建的临时节点是否为第一个,如果是,则获取锁成功; 如果不是,则删除刚刚创建的临时节点。

注意: 每次accquire操作,成功,则请求zk server 2次(一次写,一次getChildren);如果失败,则请求zk server 3次(一次写,一次getChildren,一次delete)

InterProcessReadWriteLock

示例代码:

  1. @Test
  2. public void testReadWriteLock() throws Exception{
  3.     String readWriteLockPath = "/RWLock";
  4.     InterProcessReadWriteLock readWriteLock1 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
  5.     InterProcessMutex writeLock1 = readWriteLock1.writeLock();
  6.     InterProcessMutex readLock1 = readWriteLock1.readLock();
  7.     InterProcessReadWriteLock readWriteLock2 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
  8.     InterProcessMutex writeLock2 = readWriteLock2.writeLock();
  9.     InterProcessMutex readLock2 = readWriteLock2.readLock();
  10.     writeLock1.acquire();
  11.     // same with WriteLock, can read
  12.     assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
  13.     // different lock, can't read while writting
  14.     assertFalse(readLock2.acquire(1, TimeUnit.SECONDS));
  15.     // different write lock, can't write
  16.     assertFalse(writeLock2.acquire(1, TimeUnit.SECONDS));
  17.     // release the write lock
  18.     writeLock1.release();
  19.     //both read lock can read
  20.     assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
  21.     assertTrue(readLock2.acquire(1, TimeUnit.SECONDS));
  22. }

原理: 同InterProcessMutext,在ephemeral node的排序算法上做trick,write lock的排序在前。

注意: 同一个InterProcessReadWriteLock如果已经获取了write lock,则获取read lock也会成功

LeaderSelector

示例代码:

  1. @Test
  2. public void testLeader() throws Exception{
  3.     LeaderSelectorListener listener = new LeaderSelectorListener(){
  4.         @Override
  5.         public void takeLeadership(CuratorFramework client)
  6.                 throws Exception {
  7.             System.out.println("i'm leader");
  8.         }
  9.         @Override
  10.         public void handleException(CuratorFramework client,
  11.                 Exception exception) {
  12.         }
  13.         @Override
  14.         public void notifyClientClosing(CuratorFramework client) {
  15.         }};
  16.     String leaderPath = "/leader";
  17.     LeaderSelector selector1 = new LeaderSelector(this.curator, leaderPath, listener);
  18.     selector1.start();
  19.     LeaderSelector selector2 = new LeaderSelector(this.curator, leaderPath, listener);
  20.     selector2.start();
  21.     assertFalse(selector2.hasLeadership());
  22. }

原理:内部基于InterProcessMutex实现,具体细节参见shared lock一节

总结

curator还提供了很多其他的实现,具体参见https://github.com/Netflix/curator/wiki/Recipes

 

参考文章:

Zookeeper开源客户端框架Curator简介

Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手.看完官方的文档之后, 发现Curator主要解决了三类问题:

  • 封装ZooKeeper client与ZooKeeper server之间的连接处理;
  • 提供了一套Fluent风格的操作API;
  • 提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装.

Curator列举的ZooKeeper使用过程中的几个问题
初始化连接的问题: 在client与server之间握手建立连接的过程中, 如果握手失败, 执行所有的同步方法(比如create, getData等)将抛出异常
自动恢复(failover)的问题: 当client与一台server的连接丢失,并试图去连接另外一台server时, client将回到初始连接模式
session过期的问题: 在极端情况下, 出现ZooKeeper session过期, 客户端需要自己去监听该状态并重新创建ZooKeeper实例 .
对可恢复异常的处理:当在server端创建一个有序ZNode, 而在将节点名返回给客户端时崩溃, 此时client端抛出可恢复的异常, 用户需要自己捕获这些异常并进行重试
使用场景的问题:Zookeeper提供了一些标准的使用场景支持, 但是ZooKeeper对这些功能的使用说明文档很少, 而且很容易用错. 在一些极端场景下如何处理, zk并没有给出详细的文档说明. 比如共享锁服务, 当服务器端创建临时顺序节点成功, 但是在客户端接收到节点名之前挂掉了, 如果不能很好的处理这种情况, 将导致死锁.

Curator主要从以下几个方面降低了zk使用的复杂性:
重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略, 并且内部也提供了几种标准的重试策略(比如指数补偿).
连接状态监控: Curator初始化之后会一直的对zk连接进行监听, 一旦发现连接状态发生变化, 将作出相应的处理.
zk客户端实例管理:Curator对zk客户端到server集群连接进行管理. 并在需要的情况, 重建zk实例, 保证与zk集群的可靠连接
各种使用场景支持:Curator实现zk支持的大部分使用场景支持(甚至包括zk自身不支持的场景), 这些实现都遵循了zk的最佳实践, 并考虑了各种极端情况.

Curator通过以上的处理, 让用户专注于自身的业务本身, 而无需花费更多的精力在zk本身.

Curator声称的一些亮点:

日志工具
内部采用SLF4J 来输出日志
采用驱动器(driver)机制, 允许扩展和定制日志和跟踪处理
提供了一个TracerDriver接口, 通过实现addTrace()和addCount()接口来集成用户自己的跟踪框架

和Curator相比, 另一个ZooKeeper客户端——zkClient(https://github.com/sgroschupf/zkclient)的不足之处:
文档几乎没有
异常处理弱爆了(简单的抛出RuntimeException)
重试处理太难用了
没有提供各种使用场景的实现

对ZooKeeper自带客户端(ZooKeeper类)的"抱怨":
只是一个底层实现
要用需要自己写大量的代码
很容易误用
需要自己处理连接丢失, 重试等

Curator几个组成部分

  • Client: 是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法.
  • Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理
  • Recipes: 实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上
  • Utilities:各种ZooKeeper的工具类
  • Errors: 异常处理, 连接, 恢复等.
  • Extensions: recipe扩展

Client
这是一个底层的API, 应用方基本对这个可以无视, 最好直接从Curator Framework入手
主要包括三部分:
不间断连接管理
连接重试处理

Retry Loop(循环重试)
一种典型的用法:

Java代码
  1. RetryLoop retryLoop = client.newRetryLoop();
  2. while ( retryLoop.shouldContinue() )
  3. {
  4.    try
  5.    {
  6.        // perform your work
  7.        ...
  8.        // it's important to re-get the ZK instance as there may have been an error and the instance was re-created
  9.        ZooKeeper      zk = client.getZookeeper();
  10.        retryLoop.markComplete();
  11.    }
  12.    catch ( Exception e )
  13.    {
  14.        retryLoop.takeException(e);
  15.    }
  16. }

如果在操作过程中失败, 且这种失败是可重试的, 而且在允许的次数内, Curator将保证操作的最终完成.

另一种使用Callable接口的重试做法:

Java代码
  1. RetryLoop.callWithRetry(client, new Callable()
  2. {
  3.       @Override
  4.       public Void call() throws Exception
  5.       {
  6.           // do your work here - it will get retried if needed
  7.           return null;
  8.       }
  9. });

重试策略
RetryPolicy接口只有一个方法(以前版本有两个方法):
public boolean allowRetry(int retryCount, long elapsedTimeMs);
在开始重试之前, allowRetry方法被调用, 其参数将指定当前重试次数, 和操作已消耗时间. 如果允许, 将继续重试, 否则抛出异常.

Curator内部实现的几种重试策略:

  • ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
  • RetryNTimes:指定最大重试次数的重试策略
  • RetryOneTime:仅重试一次
  • RetryUntilElapsed:一直重试直到达到规定的时间

Framework
是ZooKeeper Client更高的抽象API
自动连接管理: 当ZooKeeper客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明
更清晰的API: 简化了ZooKeeper原生的方法, 事件等, 提供流程的接口

CuratorFrameworkFactory类提供了两个方法, 一个工厂方法newClient, 一个构建方法build. 使用工厂方法newClient可以创建一个默认的实例, 而build构建方法可以对实例进行定制. 当CuratorFramework实例构建完成, 紧接着调用start()方法, 在应用结束的时候, 需要调用close()方法.  CuratorFramework是线程安全的. 在一个应用中可以共享同一个zk集群的CuratorFramework.

CuratorFramework API采用了连贯风格的接口(Fluent Interface). 所有的操作一律返回构建器, 当所有元素加在一起之后, 整个方法看起来就像一个完整的句子. 比如下面的操作:

Java代码
  1. client.create().forPath("/head"new byte[0]);
  2. client.delete().inBackground().forPath("/head");
  3. client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child"new byte[0]);
  4. client.getData().watched().inBackground().forPath("/test");

方法说明:

  • create(): 发起一个create操作. 可以组合其他方法 (比如mode 或background) 最后以forPath()方法结尾
  • delete(): 发起一个删除操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
  • checkExists(): 发起一个检查ZNode 是否存在的操作. 可以组合其他方法(watch 或background) 最后以forPath()方法结尾
  • getData(): 发起一个获取ZNode数据的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
  • setData(): 发起一个设置ZNode数据的操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
  • getChildren(): 发起一个获取ZNode子节点的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
  • inTransaction(): 发起一个ZooKeeper事务. 可以组合create, setData, check, 和/或delete 为一个操作, 然后commit() 提交

.

通知(Notification)
Curator的相关代码已经更新了, 里面的接口已经由ClientListener改成CuratorListener了, 而且接口中去掉了clientCloseDueToError方法. 只有一个方法:
eventReceived()            当一个后台操作完成或者指定的watch被触发时该方法被调用

UnhandledErrorListener接口用来对异常进行处理.

CuratorEvent(在以前版本为ClientEvent)是对各种操作触发相关事件对象(POJO)的一个完整封装, 而事件对象的内容跟事件类型相关, 下面是对应关系:

CREATEgetResultCode() and getPath()
DELETEgetResultCode() and getPath()
EXISTSgetResultCode(), getPath() and getStat()
GET_DATAgetResultCode(), getPath(), getStat() and getData()
SET_DATAgetResultCode(), getPath() and getStat()
CHILDRENgetResultCode(), getPath(), getStat(), getChildren()
WATCHEDgetWatchedEvent()

名称空间(Namespace)
因为一个zk集群会被多个应用共享, 为了避免各个应用的zk patch冲突, Curator Framework内部会给每一个Curator Framework实例分配一个namespace(可选). 这样你在create ZNode的时候都会自动加上这个namespace作为这个node path的root. 使用代码如下:

Java代码
  1. CuratorFramework    client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
  2.  …
  3. client.create().forPath("/test", data);
  4. // node was actually written to: "/MyApp/test"

Recipe

Curator实现ZooKeeper的所有recipe(除了两段提交)
选举
集群领导选举(leader election)

锁服务
共享锁: 全局同步分布式锁, 同一时间两台机器只有一台能获得同一把锁.
共享读写锁: 用于分布式的读写互斥处理, 同时生成两个锁:一个读锁, 一个写锁, 读锁能被多个应用持有, 而写锁只能一个独占, 当写锁未被持有时, 多个读锁持有者可以同时进行读操作
共享信号量: 在分布式系统中的各个JVM使用同一个zk lock path, 该path将跟一个给定数量的租约(lease)相关联, 然后各个应用根据请求顺序获得对应的lease, 相对来说, 这是最公平的锁服务使用方式.
多共享锁:内部构件多个共享锁(会跟一个znode path关联), 在acquire()过程中, 执行所有共享锁的acquire()方法, 如果中间出现一个失败, 则将释放所有已require的共享锁; 执行release()方法时, 则执行内部多个共享锁的release方法(如果出现失败将忽略)

队列(Queue)
分布式队列:采用持久顺序zk node来实现FIFO队列, 如果有多个消费者, 可以使用LeaderSelector来保证队列的消费者顺序
分布式优先队列: 优先队列的分布式版本
BlockingQueueConsumer: JDK阻塞队列的分布式版本

关卡(Barrier)
分布式关卡:一堆客户端去处理一堆任务, 只有所有的客户端都执行完, 所有客户端才能继续往下处理
双分布式关卡:同时开始, 同时结束

计数器(Counter)
共享计数器:所有客户端监听同一个znode path, 并共享一个最新的integer计数值
分布式AtomicLong(AtomicInteger): AtomicXxx的分布式版本, 先采用乐观锁更新, 若失败再采用互斥锁更新, 可以配置重试策略来处理重试

工具类

Path Cache
Path Cache用于监听ZNode的子节点的变化, 当add, update, remove子节点时将改变Path Cache state, 同时返回所有子节点的data和state.
Curator中采用了PathChildrenCache类来处理Path Cache, 状态的变化则采用PathChildrenCacheListener来监听.
相关用法参见TestPathChildrenCache测试类

注意: 当zk server的数据发生变化, zk client会出现不一致, 这个需要通过版本号来识别这种状态的变化

Test Server
用来在测试中模拟一个本地进程内ZooKeeper Server.

Test Cluster
用来在测试中模拟一个ZooKeeper Server集群

ZKPaths工具类
提供了和ZNode相关的path处理工具方法:

  • getNodeFromPath: 根据给定path获取node name. i.e. "/one/two/three" -> "three"
  •     mkdirs: 根据给定路径递归创建所有node
  •     getSortedChildren: 根据给定路径, 返回一个按序列号排序的子节点列表
  •     makePath: 根据给定的path和子节点名, 创建一个完整path

EnsurePath工具类

直接看例子, 具体的说就是调用多次, 只会执行一次创建节点操作.

Java代码
  1. EnsurePath       ensurePath = new EnsurePath(aFullPathToEnsure);
  2. ...
  3. String           nodePath = aFullPathToEnsure + "/foo";
  4. ensurePath.ensure(zk);   // first time syncs and creates if needed
  5. zk.create(nodePath, ...);
  6. ...
  7. ensurePath.ensure(zk);   // subsequent times are NOPs
  8. zk.create(nodePath, ...);

Notification事件处理
Curator对ZooKeeper的事件Watcher进行了封装处理, 然后实现了一套监听机制. 提供了几个监听接口用来处理ZooKeeper连接状态的变化
当连接出现异常, 将通过ConnectionStateListener接口进行监听, 并进行相应的处理, 这些状态变化包括:

  • 暂停(SUSPENDED): 当连接丢失, 将暂停所有操作, 直到连接重新建立, 如果在规定时间内无法建立连接, 将触发LOST通知
  • 重连(RECONNECTED): 连接丢失, 执行重连时, 将触发该通知
  • 丢失(LOST): 连接超时时, 将触发该通知

从 com.netflix.curator.framework.imps.CuratorFrameworkImpl.validateConnection(CuratorEvent) 方法中我们可以知道, Curator分别将ZooKeeper的Disconnected, Expired, SyncConnected三种状态转换成上面三种状态.

参考

来源:http://macrochen.iteye.com/blog/1366136

 

参考文章:

Zookeeper Client简介

直 接使用zk的api实现业务功能比较繁琐。因为要处理session loss,session expire等异常,在发生这些异常后进行重连。又因为ZK的watcher是一次性的,如果要基于wather实现发布/订阅模式,还要自己包装一下, 将一次性订阅包装成持久订阅。另外如果要使用抽象级别更高的功能,比如分布式锁,leader选举等,还要自己额外做很多事情。这里介绍下ZK的两个第三 方客户端包装小工具,可以分别解决上述小问题。

一、 zkClient
zkClient 主要做了两件事情。一件是在session loss和session expire时自动创建新的ZooKeeper实例进行重连。另一件是将一次性watcher包装为持久watcher。后者的具体做法是简单的在 watcher回调中,重新读取数据的同时再注册相同的watcher实例。

zkClient简单的使用样例如下:

	public static void testzkClient(final String serverList) {
		ZkClient zkClient4subChild = new ZkClient(serverList);
		zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() {
			@Override
			public void handleChildChange(String parentPath, List currentChilds) throws Exception {
				System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds);
			}
		});

上面是订阅children变化,下面是订阅数据变化

		ZkClient zkClient4subData = new ZkClient(serverList);
		zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() {
			@Override
			public void handleDataChange(String dataPath, Object data) throws Exception {
				System.out.println(prefix() + "Data of " + dataPath + " has changed");
			}

			@Override
			public void handleDataDeleted(String dataPath) throws Exception {
				System.out.println(prefix() + dataPath + " has deleted");
			}
		});

订阅连接状态的变化:

		ZkClient zkClient4subStat = new ZkClient(serverList);
		zkClient4subStat.subscribeStateChanges(new IZkStateListener() {
			@Override
			public void handleNewSession() throws Exception {
				System.out.println(prefix() + "handleNewSession()");
			}

			@Override
			public void handleStateChanged(KeeperState stat) throws Exception {
				System.out.println(prefix() + "handleStateChanged,stat:" + stat);
			}
		});

下面表格列出了写操作与ZK内部产生的事件的对应关系:

event For “/path”event For “/path/child”

create(“/path”)EventType.NodeCreatedNA
delete(“/path”)EventType.NodeDeletedNA
setData(“/path”)EventType.NodeDataChangedNA
create(“/path/child”)EventType.NodeChildrenChangedEventType.NodeCreated
delete(“/path/child”)EventType.NodeChildrenChangedEventType.NodeDeleted
setData(“/path/child”)NAEventType.NodeDataChanged

而ZK内部的写事件与所触发的watcher的对应关系如下:

event For “/path”defaultWatcherexists
(“/path”)
getData
(“/path”)
getChildren
(“/path”)

EventType.None
EventType.NodeCreated
EventType.NodeDeleted√(不正常)
EventType.NodeDataChanged
EventType.NodeChildrenChanged

综合上面两个表,我们可以总结出各种写操作可以触发哪些watcher,如下表所示:

“/path”“/path/child”   existsgetDatagetChildrenexistsgetDatagetChildren

create(“/path”)
delete(“/path”)
setData(“/path”)
create(“/path/child”)
delete(“/path/child”)
setData(“/path/child”)

如果发生session close、authFail和invalid,那么所有类型的wather都会被触发

zkClient 除了做了一些便捷包装之外,对watcher使用做了一点增强。比如subscribeChildChanges实际上是通过exists和 getChildren关注了两个事件。这样当create(“/path”)时,对应path上通过getChildren注册的listener也会 被调用。另外subscribeDataChanges实际上只是通过exists注册了事件。因为从上表可以看到,对于一个更新,通过exists和 getData注册的watcher要么都会触发,要么都不会触发。

zkClient地址:https://github.com/sgroschupf/zkclient
Maven工程中使用zkClient需要加的依赖:

    <dependency>
        <groupId>zkclient</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.1</version>
    </dependency>

二、 menagerie

menagerie基于Zookeeper实现了java.util.concurrent包的一个分布式版本。这个封装是更大粒度上对各种分布式一致性使用场景的抽象。其中最基础和常用的是一个分布式锁的实现:
org.menagerie.locks.ReentrantZkLock, 通过ZooKeeper的全局有序的特性和EPHEMERAL_SEQUENTIAL类型znode的支持,实现了分布式锁。具体做法是:不同的 client上每个试图获得锁的线程,都在相同的basepath下面创建一个EPHEMERAL_SEQUENTIAL的node。EPHEMERAL 表示要创建的是临时znode,创建连接断开时会自动删除; SEQUENTIAL表示要自动在传入的path后面缀上一个自增的全局唯一后缀,作为最终的path。因此对不同的请求ZK会生成不同的后缀,并分别返 回带了各自后缀的path给各个请求。因为ZK全局有序的特性,不管client请求怎样先后到达,在ZKServer端都会最终排好一个顺序,因此自增 后缀最小的那个子节点,就对应第一个到达ZK的有效请求。然后client读取basepath下的所有子节点和ZK返回给自己的path进行比较,当发 现自己创建的sequential node的后缀序号排在第一个时,就认为自己获得了锁;否则的话,就认为自己没有获得锁。这时肯定是有其他并发的并且是没有断开的client/线程先创 建了node。

基于分布式锁,还实现了其他业务场景,比如leader选举:
public static void leaderElectionTest() {
ZkSessionManager zksm = new DefaultZkSessionManager(“ZK-host-ip:2181″, 5000);
LeaderElector elector = new ZkLeaderElector(“/leaderElectionTest”, zksm, Ids.OPEN_ACL_UNSAFE);
if (elector.nominateSelfForLeader()) {
System.out.println(“Try to become the leader success!”);
}
}

java.util.concurrent包下面的其他接口实现,也主要是基于ReentrantZkLock的,比如ZkHashMap实现了ConcurrentMap。具体请参见menagerie的API文档

menagerie地址:https://github.com/openUtility/menagerie
Maven工程中使用menagerie需要加的依赖:

    <dependency>
        <groupId>org.menagerie</groupId>
        <artifactId>menagerie</artifactId>
        <version>1.1-SNAPSHOT</version>
    </dependency>
转载自:
http://rdc.taobao.com/team/jm/archives/1047

参考文章:

zookeeper原理(转)

ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。 Zookeeper是hadoop的一个子项目,其发展历程无需赘述。在分布式应用中,由于工程师不能很好地使用锁机制,以及基于消息的协调机制不适合在 某些应用中使用,因此需要有一种可靠的、可扩展的、分布式的、可配置的协调机制来统一系统的状态。Zookeeper的目的就在于此。本文简单分析 zookeeper的工作原理,对于如何使用zookeeper不是本文讨论的重点。

1 Zookeeper的基本概念

1.1 角色

Zookeeper中的角色主要有以下三类,如下表所示:

系统模型如图所示:

1.2 设计目的

1.最终一致性:client不论连接到哪个Server,展示给它都是同一个视图,这是zookeeper最重要的性能。

2 .可靠性:具有简单、健壮、良好的性能,如果消息m被到一台服务器接受,那么它将被所有的服务器接受。

3 .实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。但由于网络延时等原因,Zookeeper不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口。

4 .等待无关(wait-free):慢的或者失效的client不得干预快速的client的请求,使得每个client都能有效的等待。

5.原子性:更新只能成功或者失败,没有中间状态。

6 .顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有Server上消息a都将在消息b前被发布;偏序是指如果一个消息b在消息a后被同一个发送者发布,a必将排在b前面。

2 ZooKeeper的工作原理

Zookeeper 的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两种模式,它们分别是恢复模式(选主)和 广播模式(同步)。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和leader的状态同步以 后,恢复模式就结束了。状态同步保证了leader和Server具有相同的系统状态。

为 了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了 zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新 的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数。

每个Server在工作过程中有三种状态:

  • LOOKING:当前Server不知道leader是谁,正在搜寻
  • LEADING:当前Server即为选举出来的leader
  • FOLLOWING:leader已经选举出来,当前Server与之同步

2.1 选主流程

当 leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的 Server都恢复到一个正确的状态。Zk的选举算法有两种:一种是基于basic paxos实现的,另外一种是基于fast paxos算法实现的。系统默认的选举算法为fast paxos。先介绍basic paxos流程:

  1. 1 .选举线程由当前Server发起选举的线程担任,其主要功能是对投票结果进行统计,并选出推荐的Server;
  2. 2 .选举线程首先向所有Server发起一次询问(包括自己);
  3. 3 .选举线程收到回复后,验证是否是自己发起的询问(验证zxid是否一致),然后获取对方的id(myid),并存储到当前询问对象列表中,最后获取对方提议的leader相关信息(id,zxid),并将这些信息存储到当次选举的投票记录表中;
  4. 4.  收到所有Server回复以后,就计算出zxid最大的那个Server,并将这个Server相关信息设置成下一次要投票的Server;
  5. 5.  线程将当前zxid最大的Server设置为当前Server要推荐的Leader,如果此时获胜的Server获得n/2 + 1的Server票数, 设置当前推荐的leader为获胜的Server,将根据获胜的Server相关信息设置自己的状态,否则,继续这个过程,直到leader被选举出来。

通过流程分析我们可以得出:要使Leader获得多数Server的支持,则Server总数必须是奇数2n+1,且存活的Server的数目不得少于n+1.

每个Server启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的server还会从磁盘快照中恢复数据和会话信息,zk会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。选主的具体流程图如下所示:

fast paxos流程是在选举过程中,某Server首先向所有Server提议自己要成为leader,当其它Server收到提议以后,解决epoch和 zxid的冲突,并接受对方的提议,然后向对方发送接受提议完成的消息,重复这个流程,最后一定能选举出Leader。其流程图如下所示:

2.2 同步流程

选完leader以后,zk就进入状态同步过程。

  1. 1. leader等待server连接;
  2. 2 .Follower连接leader,将最大的zxid发送给leader;
  3. 3 .Leader根据follower的zxid确定同步点;
  4. 4 .完成同步后通知follower 已经成为uptodate状态;
  5. 5 .Follower收到uptodate消息后,又可以重新接受client的请求进行服务了。

流程图如下所示:

2.3 工作流程

2.3.1 Leader工作流程

Leader主要有三个功能:

  1. 1 .恢复数据;
  2. 2 .维持与Learner的心跳,接收Learner请求并判断Learner的请求消息类型;
  3. 3 .Learner的消息类型主要有PING消息、REQUEST消息、ACK消息、REVALIDATE消息,根据不同的消息类型,进行不同的处理。

PING 消息是指Learner的心跳信息;REQUEST消息是Follower发送的提议信息,包括写请求及同步请求;ACK消息是Follower的对提议 的回复,超过半数的Follower通过,则commit该提议;REVALIDATE消息是用来延长SESSION有效时间。
Leader的工作流程简图如下所示,在实际实现中,流程要比下图复杂得多,启动了三个线程来实现功能。

2.3.2 Follower工作流程

Follower主要有四个功能:

  1. 1. 向Leader发送请求(PING消息、REQUEST消息、ACK消息、REVALIDATE消息);
  2. 2 .接收Leader消息并进行处理;
  3. 3 .接收Client的请求,如果为写请求,发送给Leader进行投票;
  4. 4 .返回Client结果。

Follower的消息循环处理如下几种来自Leader的消息:

  1. 1 .PING消息: 心跳消息;
  2. 2 .PROPOSAL消息:Leader发起的提案,要求Follower投票;
  3. 3 .COMMIT消息:服务器端最新一次提案的信息;
  4. 4 .UPTODATE消息:表明同步完成;
  5. 5 .REVALIDATE消息:根据Leader的REVALIDATE结果,关闭待revalidate的session还是允许其接受消息;
  6. 6 .SYNC消息:返回SYNC结果到客户端,这个消息最初由客户端发起,用来强制得到最新的更新。

Follower的工作流程简图如下所示,在实际实现中,Follower是通过5个线程来实现功能的。

对于observer的流程不再叙述,observer流程和Follower的唯一不同的地方就是observer不会参加leader发起的投票。
主流应用场景:

Zookeeper的主流应用场景实现思路(除去官方示例)

(1)配置管理
集中式的配置管理在应用集群中是非常常见的,一般商业公司内部都会实现一套集中的配置管理中心,应对不同的应用集群对于共享各自配置的需求,并且在配置变更时能够通知到集群中的每一个机器。

Zookeeper很容易实现这种集中式的配置管理,比如将APP1的所有配置配置到/APP1 znode下,APP1所有机器一启动就对/APP1这个节点进行监控(zk.exist("/APP1",true)),并且实现回调方法Watcher,那么在zookeeper上/APP1 znode节点下数据发生变化的时候,每个机器都会收到通知,Watcher方法将会被执行,那么应用再取下数据即可(zk.getData("/APP1",false,null));

以上这个例子只是简单的粗颗粒度配置监控,细颗粒度的数据可以进行分层级监控,这一切都是可以设计和控制的。     
(2)集群管理
应用集群中,我们常常需要让每一个机器知道集群中(或依赖的其他某一个集群)哪些机器是活着的,并且在集群机器因为宕机,网络断链等原因能够不在人工介入的情况下迅速通知到每一个机器。

Zookeeper同样很容易实现这个功能,比如我在zookeeper服务器端有一个znode叫/APP1SERVERS,那么集群中每一个机器启动的时候都去这个节点下创建一个EPHEMERAL类型的节点,比如server1创建/APP1SERVERS/SERVER1(可以使用ip,保证不重复),server2创建/APP1SERVERS/SERVER2,然后SERVER1和SERVER2都watch /APP1SERVERS这个父节点,那么也就是这个父节点下数据或者子节点变化都会通知对该节点进行watch的客户端。因为EPHEMERAL类型节点有一个很重要的特性,就是客户端和服务器端连接断掉或者session过期就会使节点消失,那么在某一个机器挂掉或者断链的时候,其对应的节点就会消失,然后集群中所有对/APP1SERVERS进行watch的客户端都会收到通知,然后取得最新列表即可。

另外有一个应用场景就是集群选master,一旦master挂掉能够马上能从slave中选出一个master,实现步骤和前者一样,只是机器在启动的时候在APP1SERVERS创建的节点类型变为EPHEMERAL_SEQUENTIAL类型,这样每个节点会自动被编号

我们默认规定编号最小的为master,所以当我们对/APP1SERVERS节点做监控的时候,得到服务器列表,只要所有集群机器逻辑认为最小编号节点为master,那么master就被选出,而这个master宕机的时候,相应的znode会消失,然后新的服务器列表就被推送到客户端,然后每个节点逻辑认为最小编号节点为master,这样就做到动态master选举。

Zookeeper 监视(Watches) 简介

Zookeeper C API 的声明和描述在 include/zookeeper.h 中可以找到,另外大部分的 Zookeeper C API 常量、结构体声明也在 zookeeper.h 中,如果如果你在使用 C API 是遇到不明白的地方,最好看看 zookeeper.h,或者自己使用 doxygen 生成 Zookeeper C API 的帮助文档。

Zookeeper 中最有特色且最不容易理解的是监视(Watches)。Zookeeper 所有的读操作——getData(), getChildren(), 和 exists() 都 可以设置监视(watch),监视事件可以理解为一次性的触发器, 官方定义如下: a watch event is one-time trigger, sent to the client that set the watch, which occurs when the data for which the watch was set changes。对此需要作出如下理解:

  • (一次性触发)One-time trigger当 设置监视的数据发生改变时,该监视事件会被发送到客户端,例如,如果客户端调用了 getData("/znode1", true) 并且稍后 /znode1 节点上的数据发生了改变或者被删除了,客户端将会获取到 /znode1 发生变化的监视事件,而如果 /znode1 再一次发生了变化,除非客户端再次对 /znode1 设置监视,否则客户端不会收到事件通知。
  • (发送至客户端)Sent to the clientZookeeper 客户端和服务端是通过 socket 进行通信的,由于网络存在故障,所以监视事件很有可能不会成功地到达客户端,监视事件是异步发送至监视者的,Zookeeper 本身提供了保序性(ordering guarantee):即客户端只有首先看到了监视事件后,才会感知到它所设置监视的 znode 发生了变化(a client will never see a change for which it has set a watch until it first sees the watch event). 网络延迟或者其他因素可能导致不同的客户端在不同的时刻感知某一监视事件,但是不同的客户端所看到的一切具有一致的顺序。
  • (被设置 watch 的数据)The data for which the watch was set这 意味着 znode 节点本身具有不同的改变方式。你也可以想象 Zookeeper 维护了两条监视链表:数据监视和子节点监视(data watches and child watches) getData() and exists() 设置数据监视,getChildren() 设置子节点监视。 或者,你也可以想象 Zookeeper 设置的不同监视返回不同的数据,getData() 和 exists() 返回 znode 节点的相关信息,而 getChildren() 返回子节点列表。因此, setData() 会触发设置在某一节点上所设置的数据监视(假定数据设置成功),而一次成功的 create() 操作则会出发当前节点上所设置的数据监视以及父节点的子节点监视。一次成功的 delete() 操作将会触发当前节点的数据监视和子节点监视事件,同时也会触发该节点父节点的child watch。

Zookeeper 中的监视是轻量级的,因此容易设置、维护和分发。当客户端与 Zookeeper 服务器端失去联系时,客户端并不会收到监视事件的通知,只有当客户端重新连接后,若在必要的情况下,以前注册的监视会重新被注册并触发,对于开发人员来说 这通常是透明的。只有一种情况会导致监视事件的丢失,即:通过 exists() 设置了某个 znode 节点的监视,但是如果某个客户端在此 znode 节点被创建和删除的时间间隔内与 zookeeper 服务器失去了联系,该客户端即使稍后重新连接 zookeeper服务器后也得不到事件通知。

Zookeeper C API 常量与部分结构(struct)介绍

与 ACL 相关的结构与常量:

struct Id 结构为:

struct Id {     char * scheme;     char * id; };

 

struct ACL 结构为:

struct ACL {     int32_t perms;     struct Id id; };

 

struct ACL_vector 结构为:

struct ACL_vector {     int32_t count;     struct ACL *data; };

 

与 znode 访问权限有关的常量

  • const int ZOO_PERM_READ; //允许客户端读取 znode 节点的值以及子节点列表。
  • const int ZOO_PERM_WRITE;// 允许客户端设置 znode 节点的值。
  • const int ZOO_PERM_CREATE; //允许客户端在该 znode 节点下创建子节点。
  • const int ZOO_PERM_DELETE;//允许客户端删除子节点。
  • const int ZOO_PERM_ADMIN; //允许客户端执行 set_acl()。
  • const int ZOO_PERM_ALL;//允许客户端执行所有操作,等价与上述所有标志的或(OR) 。

与 ACL IDs 相关的常量

  • struct Id ZOO_ANYONE_ID_UNSAFE; //(‘world’,’anyone’)
  • struct Id ZOO_AUTH_IDS;// (‘auth’,’’)

三种标准的 ACL

  • struct ACL_vector ZOO_OPEN_ACL_UNSAFE; //(ZOO_PERM_ALL,ZOO_ANYONE_ID_UNSAFE)
  • struct ACL_vector ZOO_READ_ACL_UNSAFE;// (ZOO_PERM_READ, ZOO_ANYONE_ID_UNSAFE)
  • struct ACL_vector ZOO_CREATOR_ALL_ACL; //(ZOO_PERM_ALL,ZOO_AUTH_IDS)

与 Interest 相关的常量:ZOOKEEPER_WRITE, ZOOKEEPER_READ

这 两个常量用于标识感兴趣的事件并通知 zookeeper 发生了哪些事件。Interest 常量可以进行组合或(OR)来标识多种兴趣(multiple interests: write, read),这两个常量一般用于 zookeeper_interest() 和 zookeeper_process()两个函数中。

与节点创建相关的常量:ZOO_EPHEMERAL, ZOO_SEQUENCE

zoo_create 函数标志,ZOO_EPHEMERAL 用来标识创建临时节点,ZOO_SEQUENCE 用来标识节点命名具有递增的后缀序号(一般是节点名称后填充 10 位字符的序号,如 /xyz0000000000, /xyz0000000001, /xyz0000000002, ...),同样地,ZOO_EPHEMERAL, ZOO_SEQUENCE 可以组合。

与连接状态 Stat 相关的常量

以下常量均与 Zookeeper 连接状态有关,他们通常用作监视器回调函数的参数。

ZOOAPI const intZOO_EXPIRED_SESSION_STATE
ZOOAPI const intZOO_AUTH_FAILED_STATE
ZOOAPI const intZOO_CONNECTING_STATE
ZOOAPI const intZOO_ASSOCIATING_STATE
ZOOAPI const intZOO_CONNECTED_STATE

与监视类型(Watch Types)相关的常量

以下常量标识监视事件的类型,他们通常用作监视器回调函数的第一个参数。

Zookeeper C API 错误码介绍 ZOO_ERRORS

ZOK正常返回
ZSYSTEMERROR系统或服务器端错误(System and server-side errors),服务器不会抛出该错误,该错误也只是用来标识错误范围的,即大于该错误值,且小于 ZAPIERROR 都是系统错误。
ZRUNTIMEINCONSISTENCY运行时非一致性错误。
ZDATAINCONSISTENCY数据非一致性错误。
ZCONNECTIONLOSSZookeeper 客户端与服务器端失去连接
ZMARSHALLINGERROR在 marshalling 和 unmarshalling 数据时出现错误(Error while marshalling or unmarshalling data)
ZUNIMPLEMENTED该操作未实现(Operation is unimplemented)
ZOPERATIONTIMEOUT该操作超时(Operation timeout)
ZBADARGUMENTS非法参数错误(Invalid arguments)
ZINVALIDSTATE非法句柄状态(Invliad zhandle state)
ZAPIERRORAPI 错误(API errors),服务器不会抛出该错误,该错误也只是用来标识错误范围的,错误值大于该值的标识 API 错误,而小于该值的标识 ZSYSTEMERROR。
ZNONODE节点不存在(Node does not exist)
ZNOAUTH没有经过授权(Not authenticated)
ZBADVERSION版本冲突(Version conflict)
ZNOCHILDRENFOREPHEMERALS临时节点不能拥有子节点(Ephemeral nodes may not have children)
ZNODEEXISTS节点已经存在(The node already exists)
ZNOTEMPTY该节点具有自身的子节点(The node has children)
ZSESSIONEXPIRED会话过期(The session has been expired by the server)
ZINVALIDCALLBACK非法的回调函数(Invalid callback specified)
ZINVALIDACL非法的ACL(Invalid ACL specified)
ZAUTHFAILED客户端授权失败(Client authentication failed)
ZCLOSINGZookeeper 连接关闭(ZooKeeper is closing)
ZNOTHING并非错误,客户端不需要处理服务器的响应(not error, no server responses to process)
ZSESSIONMOVED会话转移至其他服务器,所以操作被忽略(session moved to another server, so operation is ignored)

 

Watch事件类型:

 

ZOO_CREATED_EVENT:节点创建事件,需要watch一个不存在的节点,当节点被创建时触发,此watch通过zoo_exists()设置
ZOO_DELETED_EVENT:节点删除事件,此watch通过zoo_exists()或zoo_get()设置
ZOO_CHANGED_EVENT:节点数据改变事件,此watch通过zoo_exists()或zoo_get()设置
ZOO_CHILD_EVENT:子节点列表改变事件,此watch通过zoo_get_children()或zoo_get_children2()设置
ZOO_SESSION_EVENT:会话失效事件,客户端与服务端断开或重连时触发
ZOO_NOTWATCHING_EVENT:watch移除事件,服务端出于某些原因不再为客户端watch节点时触发

来源:http://cailin.iteye.com/blog/2014486

参考文章:

zookeeper学习之Curator客户端

博客分类:

Curator框架是最好用,最流行的zookeeper的客户端。

它有以下三个优点

1.提供了一套非常友好的操作API;

2. 提供一些高级特性(包括但不仅限于前篇文章中提到的)的封装

3.易测试

 

maven依赖如下

 

Xml代码
  1. <dependency>
  2.     <groupId>org.apache.curator</groupId>
  3.     <artifactId>curator-recipes</artifactId>
  4.     <version>2.5.0</version>
  5. </dependency>

 

 

按照官方给出的文档和包结构,可以轻松的看出Curator功能分两大类,一是对zookeeper的一些基本命令的封装,比如增删改查。是他的framework模块,一个是他的高级特性,即recipes模块。

 

一、framework模块

Curator提供了一套Fluent风格的操作API。这在很多脚本类语言里比较流行。

比如他创建client的代码是这样

Java代码
  1. CuratorFramework client = builder.connectString("192.168.11.56:2180")
  2.         .sessionTimeoutMs(30000)
  3.         .connectionTimeoutMs(30000)
  4.         .canBeReadOnly(false)
  5.         .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
  6.         .namespace(namespace)
  7.         .defaultData(null)
  8.         .build();
  9. client.start();

一路点到底,这就是所谓的Fluent风格。

 

我们再看增删改查的

Java代码  收藏代码
  1. public class CrudExamples {
  2.     private static CuratorFramework client = ClientFactory.newClient();
  3.     private static final String PATH = "/crud";
  4.     public static void main(String[] args) {
  5.         try {
  6.             client.start();
  7.             client.create().forPath(PATH, "I love messi".getBytes());
  8.             byte[] bs = client.getData().forPath(PATH);
  9.             System.out.println("新建的节点,data为:" + new String(bs));
  10.             client.setData().forPath(PATH, "I love football".getBytes());
  11.             // 由于是在background模式下获取的data,此时的bs可能为null
  12.             byte[] bs2 = client.getData().watched().inBackground().forPath(PATH);
  13.             System.out.println("修改后的data为" + new String(bs2 != null ? bs2 : new byte[0]));
  14.             client.delete().forPath(PATH);
  15.             Stat stat = client.checkExists().forPath(PATH);
  16.             // Stat就是对zonde所有属性的一个映射, stat=null表示节点不存在!
  17.             System.out.println(stat);
  18.         } catch (Exception e) {
  19.             e.printStackTrace();
  20.         } finally {
  21.             CloseableUtils.closeQuietly(client);
  22.         }
  23.     }
  24. }

常用接口有

create()增

delete(): 删

checkExists(): 判断是否存在

setData():  改

getData(): 查

所有这些方法都以forpath()结尾,辅以watch(监听),withMode(指定模式),和inBackground(后台运行)等方法来使用。

 

此外,Curator还支持事务,一组crud操作同生同灭。代码如下

Java代码
  1. /**
  2.  * 事务操作
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class TransactionExamples {
  7.     private static CuratorFramework client = ClientFactory.newClient();
  8.     public static void main(String[] args) {
  9.         try {
  10.             client.start();
  11.             // 开启事务
  12.             CuratorTransaction transaction = client.inTransaction();
  13.             Collection<CuratorTransactionResult> results = transaction.create()
  14.                     .forPath("/a/path""some data".getBytes()).and().setData()
  15.                     .forPath("/another/path""other data".getBytes()).and().delete().forPath("/yet/another/path")
  16.                     .and().commit();
  17.             for (CuratorTransactionResult result : results) {
  18.                 System.out.println(result.getForPath() + " - " + result.getType());
  19.             }
  20.         } catch (Exception e) {
  21.             e.printStackTrace();
  22.         } finally {
  23.             // 释放客户端连接
  24.             CloseableUtils.closeQuietly(client);
  25.         }
  26.     }
  27. }

这段的代码的运行结果,由于最后一步delete的节点不存在,所以整个事务commit失败。失败的原因会放在Collection<CuratorTransactionResult>中,非常友好。

 

好了framework部分的内容就这么多,是不是特别简单呢。下面就来看看recipes包的内容吧。。

 

Recipes部分提供的功能官网列的很详细,点击这里。注意文章第一段:Curator宣称,Recipes模块实现了除二阶段提交之外的所有zookeeper特性。

 

二、Recipes模块

 

主要有

Elections(选举),Locks(锁),Barriers(关卡),Atomic(原子量),Caches,Queues等

 

1、 Elections

选举主要依赖于LeaderSelector和LeaderLatch2个类。前者是所有存活的客户端不间断的轮流做Leader,大同社会。后者是一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。某党?

 

这两者在实现上是可以切换的,直接上代码,怎么切换注释里有。由于篇幅所限,这里仅贴出基于LeaderSelector的选举,更多代码见附件

Java代码
  1. /**
  2.  * 本类基于leaderSelector实现,所有存活的client会公平的轮流做leader
  3.  * 如果不想频繁的变化Leader,需要在takeLeadership方法里阻塞leader的变更! 或者使用 {@link}
  4.  * LeaderLatchClient
  5.  */
  6. public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {
  7.     private final String name;
  8.     private final LeaderSelector leaderSelector;
  9.     private final String PATH = "/leaderselector";
  10.     public LeaderSelectorClient(CuratorFramework client, String name) {
  11.         this.name = name;
  12.         leaderSelector = new LeaderSelector(client, PATH, this);
  13.         leaderSelector.autoRequeue();
  14.     }
  15.     public void start() throws IOException {
  16.         leaderSelector.start();
  17.     }
  18.     @Override
  19.     public void close() throws IOException {
  20.         leaderSelector.close();
  21.     }
  22.     /**
  23.      * client成为leader后,会调用此方法
  24.      */
  25.     @Override
  26.     public void takeLeadership(CuratorFramework client) throws Exception {
  27.         int waitSeconds = (int) (5 * Math.random()) + 1;
  28.         System.out.println(name + "是当前的leader");
  29.         try {
  30.             Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
  31.         } catch (InterruptedException e) {
  32.             Thread.currentThread().interrupt();
  33.         } finally {
  34.             System.out.println(name + " 让出领导权\n");
  35.         }
  36.     }

 

Java代码  收藏代码
  1. /**
  2.  * leader选举
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class LeaderSelectorExample {
  7.     public static void main(String[] args) {
  8.         List<CuratorFramework> clients = Lists.newArrayList();
  9.         List<LeaderSelectorClient> examples = Lists.newArrayList();
  10.         try {
  11.             for (int i = 0; i < 10; i++) {
  12.                 CuratorFramework client = ClientFactory.newClient();
  13.                 LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i);
  14.                 clients.add(client);
  15.                 examples.add(example);
  16.                 client.start();
  17.                 example.start();
  18.             }
  19.             System.out.println("----------先观察一会选举的结果-----------");
  20.             Thread.sleep(10000);
  21.             System.out.println("----------关闭前5个客户端,再观察选举的结果-----------");
  22.             for (int i = 0; i < 5; i++) {
  23.                 clients.get(i).close();
  24.             }
  25.             // 这里有个小技巧,让main程序一直监听控制台输入,异步的代码就可以一直在执行。不同于while(ture)的是,按回车或esc可退出
  26.             new BufferedReader(new InputStreamReader(System.in)).readLine();
  27.         } catch (Exception e) {
  28.             e.printStackTrace();
  29.         } finally {
  30.             for (LeaderSelectorClient exampleClient : examples) {
  31.                 CloseableUtils.closeQuietly(exampleClient);
  32.             }
  33.             for (CuratorFramework client : clients) {
  34.                 CloseableUtils.closeQuietly(client);
  35.             }
  36.         }
  37.     }
  38. }

 

2、locks

curator lock相关的实现在recipes.locks包里。顶级接口都是InterProcessLock。我们直接看最有代表性的 InterProcessReadWriteLock 进程内部读写锁(可重入读写锁)。什么叫可重入,什么叫读写锁。不清楚的先查好资料吧。总之读写锁一定是成对出现的。    简易传送门

 

我们先定义两个任务,可并行的执行的,和互斥执行的。

Java代码
  1. /**
  2.  * 并行任务
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class ParallelJob implements Runnable {
  7.     private final String name;
  8.     private final InterProcessLock lock;
  9.     // 锁等待时间
  10.     private final int wait_time = 5;
  11.     ParallelJob(String name, InterProcessLock lock) {
  12.         this.name = name;
  13.         this.lock = lock;
  14.     }
  15.     @Override
  16.     public void run() {
  17.         try {
  18.             doWork();
  19.         } catch (Exception e) {
  20.             // ingore;
  21.         }
  22.     }
  23.     public void doWork() throws Exception {
  24.         try {
  25.             if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {
  26.                 System.err.println(name + "等待" + wait_time + "秒,仍未能获取到lock,准备放弃。");
  27.             }
  28.             // 模拟job执行时间0-4000毫秒
  29.             int exeTime = new Random().nextInt(4000);
  30.             System.out.println(name + "开始执行,预计执行时间= " + exeTime + "毫秒----------");
  31.             Thread.sleep(exeTime);
  32.         } catch (Exception e) {
  33.             e.printStackTrace();
  34.         } finally {
  35.             lock.release();
  36.         }
  37.     }
  38. }

 

Java代码
  1. /**
  2.  * 互斥任务
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class MutexJob implements Runnable {
  7.     private final String name;
  8.     private final InterProcessLock lock;
  9.     // 锁等待时间
  10.     private final int wait_time = 10;
  11.     MutexJob(String name, InterProcessLock lock) {
  12.         this.name = name;
  13.         this.lock = lock;
  14.     }
  15.     @Override
  16.     public void run() {
  17.         try {
  18.             doWork();
  19.         } catch (Exception e) {
  20.             // ingore;
  21.         }
  22.     }
  23.     public void doWork() throws Exception {
  24.         try {
  25.             if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {
  26.                 System.err.println(name + "等待" + wait_time + "秒,仍未能获取到lock,准备放弃。");
  27.             }
  28.             // 模拟job执行时间0-2000毫秒
  29.             int exeTime = new Random().nextInt(2000);
  30.             System.out.println(name + "开始执行,预计执行时间= " + exeTime + "毫秒----------");
  31.             Thread.sleep(exeTime);
  32.         } catch (Exception e) {
  33.             e.printStackTrace();
  34.         } finally {
  35.             lock.release();
  36.         }
  37.     }
  38. }

 

锁测试代码

 

Java代码
  1. /**
  2.  * 分布式锁实例
  3.  * 
  4.  * @author shencl
  5.  */
  6. public class DistributedLockExample {
  7.     private static CuratorFramework client = ClientFactory.newClient();
  8.     private static final String PATH = "/locks";
  9.     // 进程内部(可重入)读写锁
  10.     private static final InterProcessReadWriteLock lock;
  11.     // 读锁
  12.     private static final InterProcessLock readLock;
  13.     // 写锁
  14.     private static final InterProcessLock writeLock;
  15.     static {
  16.         client.start();
  17.         lock = new InterProcessReadWriteLock(client, PATH);
  18.         readLock = lock.readLock();
  19.         writeLock = lock.writeLock();
  20.     }
  21.     public static void main(String[] args) {
  22.         try {
  23.             List<Thread> jobs = Lists.newArrayList();
  24.             for (int i = 0; i < 10; i++) {
  25.                 Thread t = new Thread(new ParallelJob("Parallel任务" + i, readLock));
  26.                 jobs.add(t);
  27.             }
  28.             for (int i = 0; i < 10; i++) {
  29.                 Thread t = new Thread(new MutexJob("Mutex任务" + i, writeLock));
  30.                 jobs.add(t);
  31.             }
  32.             for (Thread t : jobs) {
  33.                 t.start();
  34.             }
  35.         } catch (Exception e) {
  36.             e.printStackTrace();
  37.         } finally {
  38.             CloseableUtils.closeQuietly(client);
  39.         }
  40.     }
  41. }

 

看到没,用法和java concurrent包里的ReentrantReadWriteLock 是一模一样的。

事实上,整个recipes包的目录结构、实现原理同java concurrent包的设置是很一致的。比如有queue,Semaphore,Barrier等类,。他整个就是模仿jdk的实现,只不过是基于分布式的!

 

后边的几项,Barriers(关卡),Atomic(原子量),Caches,Queues和java concurrent包里的类的用法是一样的,就不继续贴了,有些附件里有。

要说明的是:有的功能性能不是特别理想,网上也没见有大的项目的使用案例。比如基于CAS机制的atomic,在某些情况重试的效率还不如硬同步,要是zookeeper节点再一多,各个节点之间通过event触发的数据同步极其频繁。那性能可以想象。

 

三、测试方法

curator提供了很好的测试工具,你甚至是可以在完全没有搭建zookeeper server端的情况下,完成测试。

有2个重要的类

TestingServer 模拟单点, TestingCluster模拟集群。

需要使用的话,得依赖

Xml代码
  1. <dependency>
  2.     <groupId>org.apache.curator</groupId>
  3.     <artifactId>curator-test</artifactId>
  4.     <version>2.5.0</version>
  5. </dependency>

来源:http://supben.iteye.com/blog/2094077

参考文章: