月度归档:2015年04月

Java多线程CountDownLatch原理

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在CountDownLatch上等待的线程就可以恢复执行任务。
CountDownLatch的用法

CountDownLatch典型用法1:某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为n new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

CountDownLatch典型用法2:实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计数器初始化为1,多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
CountDownLatch的不足

CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

 

来源:http://www.cnblogs.com/skywang12345/p/3533887.html
里面有一个系列的文章介绍java多线程的源代码分析等, 非常好! 值得观看!

概要

前面对"独占锁"和"共享锁"有了个大致的了解;本章,我们对CountDownLatch进行学习。和ReadWriteLock.ReadLock一样,CountDownLatch的本质也是一个"共享锁"。本章的内容包括:
CountDownLatch简介
CountDownLatch数据结构
CountDownLatch源码分析(基于JDK1.7.0_40)
CountDownLatch示例

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3533887.html

CountDownLatch简介

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

CountDownLatch和CyclicBarrier的区别
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
关于CyclicBarrier的原理,后面一章再来学习。
CountDownLatch函数列表

CountDownLatch(int count)
构造一个用给定计数初始化的 CountDownLatch。

// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
void await()
// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
boolean await(long timeout, TimeUnit unit)
// 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
void countDown()
// 返回当前计数。
long getCount()
// 返回标识此锁存器及其状态的字符串。
String toString()

 

CountDownLatch数据结构

CountDownLatch的UML类图如下:

CountDownLatch的数据结构很简单,它是通过"共享锁"实现的。它包含了sync对象,sync是Sync类型。Sync是实例类,它继承于AQS。

 

CountDownLatch完整源码(基于JDK1.7.0_40)

CountDownLatch是通过“共享锁”实现的。下面,我们分析CountDownLatch中3个核心函数: CountDownLatch(int count), await(), countDown()。

1. CountDownLatch(int count)
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

说明:该函数是创建一个Sync对象,而Sync是继承于AQS类。Sync构造函数如下:

Sync(int count) {
    setState(count);
}

setState()在AQS中实现,源码如下:

protected final void setState(long newState) {
    state = newState;
}

说明:在AQS中,state是一个private volatile long类型的对象。对于CountDownLatch而言,state表示的”锁计数器“。CountDownLatch中的getCount()最终是调用AQS中的getState(),返回的state对象,即”锁计数器“。

2. await()
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

说明:该函数实际上是调用的AQS的acquireSharedInterruptibly(1);

AQS中的acquireSharedInterruptibly()的源码如下:

public final void acquireSharedInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

说明:acquireSharedInterruptibly()的作用是获取共享锁。
如果当前线程是中断状态,则抛出异常InterruptedException。否则,调用tryAcquireShared(arg)尝试获取共享锁;尝试成功则返回,否则就调用doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。

tryAcquireShared()在CountDownLatch.java中被重写,它的源码如下:

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

说明:tryAcquireShared()的作用是尝试获取共享锁。
如果"锁计数器=0",即锁是可获取状态,则返回1;否则,锁是不可获取状态,则返回-1。

private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建"当前线程"的Node节点,且Node中记录的锁是"共享锁"类型;并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则"尝试获取共享锁"。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// (上一节点不是CLH队列的表头) 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

说明:
(01) addWaiter(Node.SHARED)的作用是,创建”当前线程“的Node节点,且Node中记录的锁的类型是”共享锁“(Node.SHARED);并将该节点添加到CLH队列末尾。关于Node和CLH在"Java多线程系列--“JUC锁”03之 公平锁(一)"已经详细介绍过,这里就不再重复说明了。
(02) node.predecessor()的作用是,获取上一个节点。如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
(03) shouldParkAfterFailedAcquire()的作用和它的名称一样,如果在尝试获取锁失败之后,线程应该等待,则返回true;否则,返回false。
(04) 当shouldParkAfterFailedAcquire()返回ture时,则调用parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。
doAcquireSharedInterruptibly()中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函数在"Java多线程系列--“JUC锁”03之 公平锁(一)"中介绍过,这里也就不再详细说明了。

3. countDown()

public void countDown() {
sync.releaseShared(1);
}

说明:该函数实际上调用releaseShared(1)释放共享锁。

releaseShared()在AQS中实现,源码如下:

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

说明:releaseShared()的目的是让当前线程释放它所持有的共享锁。
它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。

tryReleaseShared()在CountDownLatch.java中被重写,源码如下:

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 获取“锁计数器”的状态
int c = getState();
if (c == 0)
return false;
// “锁计数器”-1
int nextc = c-1;
// 通过CAS函数进行赋值。
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

说明:tryReleaseShared()的作用是释放共享锁,将“锁计数器”的值-1。

总结:CountDownLatch是通过“共享锁”实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态,表示该“共享锁”最多能被count给线程同时获取。当某线程调用该CountDownLatch对象的await()方法时,该线程会等待“共享锁”可用时,才能获取“共享锁”进而继续运行。而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行!

以上,就是CountDownLatch的实现原理。

CountDownLatch的使用示例

下面通过CountDownLatch实现:"主线程"等待"5个子线程"全部都完成"指定的工作(休眠1000ms)"之后,再继续运行。

 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 
 public class CountDownLatchTest1 {
 
     private static int LATCH_SIZE = 5;
     private static CountDownLatch doneSignal;
     public static void main(String[] args) {
 
         try {
             doneSignal = new CountDownLatch(LATCH_SIZE);
 
             // 新建5个任务
             for(int i=0; i<LATCH_SIZE; i++)
                 new InnerThread().start();
 
             System.out.println("main await begin.");
             // "主线程"等待线程池中5个任务的完成
             doneSignal.await();
 
             System.out.println("main await finished.");
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
 
     static class InnerThread extends Thread{
         public void run() {
             try {
                 Thread.sleep(1000);
                 System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
                 // 将CountDownLatch的数值减1
                 doneSignal.countDown();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
     }
 }

运行结果:

main await begin.
Thread-0 sleep 1000ms.
Thread-2 sleep 1000ms.
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
Thread-3 sleep 1000ms.
main await finished.

结果说明:主线程通过doneSignal.await()等待其它线程将doneSignal递减至0。其它的5个InnerThread线程,每一个都通过doneSignal.countDown()将doneSignal的值减1;当doneSignal为0时,main被唤醒后继续执行。

Java 理论与实践 流行的原子

在 JDK 5.0 之前,如果不使用本机代码,就不能用 Java 语言编写无等待、无锁定的算法。在 java.util.concurrent 中添加原子变量类之后,这种情况发生了变化。请跟随并行专家 Brian Goetz 一起,了解这些新类如何使用 Java 语言开发高度可伸缩的无阻塞算法。您可以在本文的 论坛中与作者或其他读者共享您对本文的看法。(也可以通过单击文章顶部或者底部的 讨论链接来访问讨论。)

十五年前,多处理器系统是高度专用系统,要花费数十万美元(大多数具有两个到四个处理器)。现在,多处理器系统很便宜,而且数量很多,几乎每个主要微处理器都内置了多处理支持,其中许多系统支持数十个或数百个处理器。

要使用多处理器系统的功能,通常需要使用多线程构造应用程序。但是正如任何编写并发应用程序的人可以告诉你的那样,要获得好的硬件利用率,只是简单地在多个线程中分割工作是不够的,还必须确保线程确实大部分时间都在工作,而不是在等待更多的工作,或等待锁定共享数据结构。

问题:线程之间的协调

如果线程之间 需要协调,那么几乎没有任务可以真正地并行。以线程池为例,其中执行的任务通常相互独立。如果线程池利用公共工作队 列,则从工作队列中删除元素或向工作队列添加元素的过程必须是线程安全的,并且这意味着要协调对头、尾或节点间链接指针所进行的访问。正是这种协调导致了 所有问题。

标准方法:锁定

在 Java 语言中,协调对共享字段的访问的传统方法是使用同步,确保完成对共享字段的所有访问,同时具有适当的锁定。通过同步,可以确定(假设类编写正确)具有保护 一组给定变量的锁定的所有线程都将拥有对这些变量的独占访问权,并且以后其他线程获得该锁定时,将可以看到对这些变量进行的更改。弊端是如果锁定竞争太厉 害(线程常常在其他线程具有锁定时要求获得该锁定),会损害吞吐量,因为竞争的同步非常昂贵。(Public Service Announcement:对于现代 JVM 而言,无竞争的同步现在非常便宜。

基于锁定的算法的另一个问题是:如果延迟具有锁定的线程(因为页面错误、计划延迟或其他意料之外的延迟),则 没有要求获得该锁定的线程可以继续运行。

还可以使用可变变量来以比同步更低的成本存储共享变量,但它们有局限性。虽然可以保证其他变量可以立即看到对可变变量的写入,但无法呈现原子操作的读-修改-写顺序,这意味着(比如说)可变变量无法用来可靠地实现互斥(互斥锁定)或计数器。

使用锁定实现计数器和互斥

假如开发线程安全的计数器类,那么这将暴露 get()increment()decrement() 操作。清单 1 显示了如何使用锁定(同步)实现该类的例子。注意所有方法,甚至需要同步 get(),使类成为线程安全的类,从而确保没有任何更新信息丢失,所有线程都看到计数器的最新值。

清单 1. 同步的计数器类
public class SynchronizedCounter {
    private int value;
    public synchronized int getValue() { return value; }
    public synchronized int increment() { return ++value; }
    public synchronized int decrement() { return --value; }
}

increment()decrement() 操作是原子的读-修改-写操作,为了安全实现计数器,必须使用当前值,并为其添加一个值,或写出新值,所有这些均视为一项操作,其他线程不能打断它。否 则,如果两个线程试图同时执行增加,操作的不幸交叉将导致计数器只被实现了一次,而不是被实现两次。(注意,通过使值实例变量成为可变变量并不能可靠地完 成这项操作。)

许多并发算法中都显示了原子的读-修改-写组合。清单 2 中的代码实现了简单的互斥, acquire() 方法也是原子的读-修改-写操作。要获得互斥,必须确保没有其他人具有该互斥( curOwner = Thread.currentThread()),然后记录您拥有该互斥的事实( curOwner = Thread.currentThread()),所有这些使其他线程不可能在中间出现以及修改 curOwner field

清单 2. 同步的互斥类
public class SynchronizedMutex {
    private Thread curOwner = null;
    public synchronized void acquire() throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        while (curOwner != null) 
            wait();
        curOwner = Thread.currentThread();
    }
    public synchronized void release() {
        if (curOwner == Thread.currentThread()) {
            curOwner = null;
            notify();
        } else
            throw new IllegalStateException("not owner of mutex");
    }
}

清单 1 中的计数器类可以可靠地工作,在竞争很小或没有竞争时都可以很好地执行。然而,在竞争激烈时,这将大大损害性能,因为 JVM 用了更多的时间来调度线程,管理竞争和等待线程队列,而实际工作(如增加计数器)的时间却很少。您可以回想 上月专栏中的图,该图显示了一旦多个线程使用同步竞争一个内置监视器,吞吐量将如何大幅度下降。虽然该专栏说明了新的 ReentrantLock 类如何可以更可伸缩地替代同步,但是对于一些问题,还有更好的解决方法。

锁定问题

使用锁定,如果一个线程试图获取其他线程已经具有的锁定,那么该线程将被阻塞,直到该锁定可用。此方法具有一些明显的缺点,其中包括当线程被阻塞来等待锁定时,它无法进行其他任何操作。如果阻塞的线程是高优先级的任务,那么该方案可能造成非常不好的结果(称为 优先级倒置的危险)。

使用锁定还有一些其他危险,如死锁(当以不一致的顺序获得多个锁定时会发生死锁)。甚至没有这种危险,锁定也仅是相对的粗粒度协调机制,同样非常适合管理 简单操作,如增加计数器或更新互斥拥有者。如果有更细粒度的机制来可靠管理对单独变量的并发更新,则会更好一些;在大多数现代处理器都有这种机制。

硬件同步原语

如前所述,大多数现代处理器都包含对多处理的支持。当然这种支持包括多处理器可以共享外部设备和主内存,同时它通常还包括对指令系统的增加来支持多处理的特殊要求。特别是,几乎每个现代处理器都有通过可以检测或阻止其他处理器的并发访问的方式来更新共享变量的指令。

比较并交换 (CAS)

支持并发的第一个处理器提供原子的测试并设置操作,通常在单位上运行这项操作。现在的处理器(包括 Intel 和 Sparc 处理器)使用的最通用的方法是实现名为 比较并转换或 CAS 的原语。(在 Intel 处理器中,比较并交换通过指令的 cmpxchg 系列实现。PowerPC 处理器有一对名为“加载并保留”和“条件存储”的指令,它们实现相同的目地;MIPS 与 PowerPC 处理器相似,除了第一个指令称为“加载链接”。)

CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无 论哪种情况,它都会在 CAS 指令之前返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”

通常将 CAS 用于同步的方式是从地址 V 读取值 A,执行多步计算来获得新值 B,然后使用 CAS 将 V 的值从 A 改为 B。如果 V 处的值尚未同时更改,则 CAS 操作成功。

类似于 CAS 的指令允许算法执行读-修改-写操作,而无需害怕其他线程同时修改变量,因为如果其他线程修改变量,那么 CAS 会检测它(并失败),算法可以对该操作重新计算。清单 3 说明了 CAS 操作的行为(而不是性能特征),但是 CAS 的价值是它可以在硬件中实现,并且是极轻量级的(在大多数处理器中):

清单 3. 说明比较并交换的行为(而不是性能)的代码
public class SimulatedCAS {
     private int value;

     public synchronized int getValue() { return value; }

	public synchronized int compareAndSwap(int expectedValue, int newValue) {
         int oldValue = value;
         if (value == expectedValue)
             value = newValue;
         return oldValue;
     }
}

使用 CAS 实现计数器

基于 CAS 的并发算法称为 无锁定算法,因为线程不必再等待锁定(有时称为互斥或关键部分,这取决于线程平台的术语)。无论 CAS 操作成功还是失败,在任何一种情况中,它都在可预知的时间内完成。如果 CAS 失败,调用者可以重试 CAS 操作或采取其他适合的操作。清单 4 显示了重新编写的计数器类来使用 CAS 替代锁定:

清单 4. 使用比较并交换实现计数器
public class CasCounter {
    private SimulatedCAS value;
    public int getValue() {
        return value.getValue();
    }
    public int increment() {
        int oldValue = value.getValue();
        while (value.compareAndSwap(oldValue, oldValue + 1) != oldValue)
            oldValue = value.getValue();
        return oldValue + 1;
    }
}

无锁定且无等待算法

如果每个线程在其他线程任意延迟(或甚至失败)时都将持续进行操作,就可以说该算法是 无等待的。与此形成对比的是, 无锁定算法要求仅 某个线程总是执行操作。(无等待的另一种定义是保证每个线程在其有限的步骤中正确计算自己的操作,而不管其他线程的操作、计时、交叉或速度。这一限制可以是系统中线程数的函数;例如,如果有 10 个线程,每个线程都执行一次 CasCounter.increment() 操作,最坏的情况下,每个线程将必须重试最多九次,才能完成增加。)

再过去的 15 年里,人们已经对无等待且无锁定算法(也称为 无阻塞算法)进行了大量研究,许多人通用数据结构已经发现了无阻塞算法。无阻塞算法被广泛用于操作系统和 JVM 级别,进行诸如线程和进程调度等任务。虽然它们的实现比较复杂,但相对于基于锁定的备选算法,它们有许多优点:可以避免优先级倒置和死锁等危险,竞争比较 便宜,协调发生在更细的粒度级别,允许更高程度的并行机制等等。

原子变量类

在 JDK 5.0 之前,如果不使用本机代码,就不能用 Java 语言编写无等待、无锁定的算法。在 java.util.concurrent.atomic 包中添加原子变量类之后,这种情况才发生了改变。所有原子变量类都公开比较并设置原语(与比较并交换类似),这些原语都是使用平台上可用的最快本机结构(比较并交换、加载链接/条件存储,最坏的情况下是旋转锁)来实现的。 java.util.concurrent.atomic 包中提供了原子变量的 9 种风格( AtomicIntegerAtomicLongAtomicReferenceAtomicBoolean;原子整型;长型;引用;及原子标记引用和戳记引用类的数组形式,其原子地更新一对值)。

原子变量类可以认为是 volatile 变量的泛化,它扩展了可变变量的概念,来支持原子条件的比较并设置更新。读取和写入原子变量与读取和写入对可变变量的访问具有相同的存取语义。

虽然原子变量类表面看起来与清单 1 中的 SynchronizedCounter 例子一样,但相似仅是表面的。在表面之下,原子变量的操作会变为平台提供的用于并发访问的硬件原语,比如比较并交换。

更细粒度意味着更轻量级

调整具有竞争的并发应用程序的可伸缩性的通用技术是降低使用的锁定对象的粒度,希望更多的锁定请求从竞争变为不竞争。从锁定转换为原子变量可以获得相同的结果,通过切换为更细粒度的协调机制,竞争的操作就更少,从而提高了吞吐量。

ABA 问题

因为 在更改 V 之前,CAS 主要询问“V 的值是否仍为 A”,所以在第一次读取 V 以及对 V 执行 CAS 操作之前,如果将值从 A 改为 B,然后再改回 A,会使基于 CAS 的算法混乱。在这种情况下,CAS 操作会成功,但是在一些情况下,结果可能不是您所预期的。(注意, 清单 1清单 2 中的计数器和互斥例子不存在这个问题,但不是所有算法都这样。)这类问题称为 ABA 问题,通常通过将标记或版本编号与要进行 CAS 操作的每个值相关联,并原子地更新值和标记,来处理这类问题。 AtomicStampedReference 类支持这种方法。

java.util.concurrent 中的原子变量

无论是直接的还是间接的,几乎 java.util.concurrent 包中的所有类都使用原子变量,而不使用同步。类似 ConcurrentLinkedQueue 的类也使用原子变量直接实现无等待算法,而类似 ConcurrentHashMap 的类使用 ReentrantLock 在需要时进行锁定。然后, ReentrantLock 使用原子变量来维护等待锁定的线程队列。

如果没有 JDK 5.0 中的 JVM 改进,将无法构造这些类,这些改进暴露了(向类库,而不是用户类)接口来访问硬件级的同步原语。然后,java.util.concurrent 中的原子变量类和其他类向用户类公开这些功能。

使用原子变量获得更高的吞吐量

上月,我介绍了 ReentrantLock 如何相对于同步提供可伸缩性优势,以及构造通过伪随机数生成器模拟旋转骰子的简单、高竞争示例基准。我向您显示了通过同步、 ReentrantLock 和公平 ReentrantLock 来进行协调的实现,并显示了结果。本月,我将向该基准添加其他实现,使用 AtomicLong 更新 PRNG 状态的实现。

清单 5 显示了使用同步的 PRNG 实现和使用 CAS 备选实现。注意,要在循环中执行 CAS,因为它可能会失败一次或多次才能获得成功,使用 CAS 的代码总是这样。

清单 5. 使用同步和原子变量实现线程安全 PRNG
public class PseudoRandomUsingSynch implements PseudoRandom {
    private int seed;
    public PseudoRandomUsingSynch(int s) { seed = s; }
    public synchronized int nextInt(int n) {
        int s = seed;
        seed = Util.calculateNext(seed);
        return s % n;
    }
}
public class PseudoRandomUsingAtomic implements PseudoRandom {
    private final AtomicInteger seed;
    public PseudoRandomUsingAtomic(int s) {
        seed = new AtomicInteger(s);
    }
    public int nextInt(int n) {
        for (;;) {
            int s = seed.get();
            int nexts = Util.calculateNext(s);
            if (seed.compareAndSet(s, nexts))
                return s % n;
        }
    }
}

下面图 1 和图 2 中的图与上月那些图相似,只是为基于原子的方法多添加了一行。这些图显示了在 8-way Ultrasparc3 和单处理器 Pentium 4 上使用不同数量线程的随机发生的吞吐量(以每秒转数为单位)。测试中的线程数不是真实的;这些线程所表现的竞争比通常多得多,所以它们以比实际程序中低得 多的线程数显示了 ReentrantLock 与原子变量之间的平衡。您将看到,虽然 ReentrantLock 拥有比同步更多的优点,但相对于 ReentrantLock,原子变量提供了其他改进。(因为在每个工作单元中完成的工作很少,所以下图可能无法完全地说明与 ReentrantLock 相比,原子变量具有哪些可伸缩性优点。)

图 1. 8-way Ultrasparc3 中同步、ReentrantLock、公平 Lock 和 AtomicLong 的基准吞吐量

8-way Ultrasparc3 吞吐量

图 2. 单处理器 Pentium 4 中的同步、ReentrantLock、公平 Lock 和 AtomicLong 的基准吞吐量

Uniprocessor Pentium4 吞吐量大多数用户都不太可能使用原子变量自己开发无阻塞算法 — 他们更可能使用 java.util.concurrent 中提供的版本,如 ConcurrentLinkedQueue。但是万一您想知道对比以前 JDK 中的相类似的功能,这些类的性能是如何改进的,可以使用通过原子变量类公开的细粒度、硬件级别的并发原语。

开发人员可以直接将原子变量用作共享计数器、序号生成器和其他独立共享变量的高性能替代,否则必须通过同步保护这些变量。

结束语

JDK 5.0 是开发高性能并发类的巨大进步。通过内部公开新的低级协调原语,和提供一组公共原子变量类,现在用 Java 语言开发无等待、无锁定算法首次变为可行。然后, java.util.concurrent 中的类基于这些低级原子变量工具构建,为它们提供比以前执行相似功能的类更显著的可伸缩性优点。虽然您可能永远不会直接使用原子变量,还是应该为它们的存在而欢呼。

ABA 问题

因为在更改 V 之前,CAS 主要询问“V 的值是否仍为 A”,所以在第一次读取 V 以及对 V 执行 CAS 操作之前,如果将值从 A 改为 B,然后再改回 A,会使基于 CAS 的算法混乱。在这种情况下,CAS 操作会成功,但是在一些情况下,结果可能不是您所预期的。(注意, 清单 1清单 2 中的计数器和互斥例子不存在这个问题,但不是所有算法都这样。)这类问题称为 ABA 问题,通常通过将标记或版本编号与要进行 CAS 操作的每个值相关联,并原子地更新值和标记,来处理这类问题。 AtomicStampedReference 类支持这种方法。

 

参考资料

 

来源:https://www.ibm.com/developerworks/cn/java/j-jtp11234/

用ScheduledThreadPoolExecutor替换Timer定时执行任务

项目需要 每天几点 每周周几几点 每月几号几点定时执行任务

一。 用timer缺点非常大

Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

但是Timer和TimerTask存在一些缺陷:

1:Timer只创建了一个线程。当你的任务执行的时间超过设置的延时时间将会产生一些问题。

2:Timer创建的线程没有处理异常,因此一旦抛出非受检异常,该线程会立即终止。

二。 用ScheduledThreadPoolExecutor代替传统的Timer

JDK 5.0以后推荐使用ScheduledThreadPoolExecutor。该类属于Executor Framework,它除了能处理异常外,还可以创建多个线程解决上面的问题。

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类;

ThreadPoolExecutor,它可另行安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ThreadPoolExecutor 具有额外的灵活性或功能时,此类要优于 Timer。

一旦启用已延迟的任务就执行它,但是有关何时启用,启用后何时执行则没有任何实时保证。按照提交的先进先出 (FIFO) 顺序来启用那些被安排在同一执行时间的任务。

以下为测试代码:

package cn.iigrowing.jobs.web.action;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TaskTest {
	static ScheduledThreadPoolExecutor stpe = null;

	static int index;

	/**
	 * @param args
	 *            the command line arguments
	 */
	public static void main(String[] args) {
		// TODO code application logic here
		// 构造一个ScheduledThreadPoolExecutor对象,并且设置它的容量为5个
		stpe = new ScheduledThreadPoolExecutor(5);
		MyTask task = new MyTask();
		// 隔2秒后开始执行任务,并且在上一次任务开始后隔一秒再执行一次;
		// stpe.scheduleWithFixedDelay(task, 2, 1, TimeUnit.SECONDS);
		// 隔6秒后执行一次,但只会执行一次。
		stpe.schedule(task, 6, TimeUnit.SECONDS);
	}

	private static String getTimes() {
		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss E");
		Date date = new Date();
		date.setTime(System.currentTimeMillis());
		return format.format(date);
	}

	private static class MyTask implements Runnable {

		@Override
		public void run() {
			index++;
			System.out.println("2= " + getTimes() + " " + index);
			if (index >= 10) {
				stpe.shutdown();
				if (stpe.isShutdown()) {
					System.out.println("停止了????");
				}
			}
		}
	}
}

参考文章
ScheduledExecutorService定时执行任务
用ScheduledThreadPoolExecutor替换Timer定时执行任务
几种任务调度的 Java 实现方法与比较
在android中,经常用到的定时器主要有以下几种实现

非阻塞算法在并发容器中的实现

非阻塞算法在 Java 中的应用越来越广泛 , ConcurrentLinkedQueue 是 java. concurrent 包中基于非阻塞算法实现的并发容器的典范。通过本文,您将了解非阻塞算法的工作原理及其在 ConcurrentLinkedQueue 中的具体实现机制。

简介

非阻塞算法在更细粒度的层面协调争用,它比传统的锁有更高的并发性。随着非阻塞算法在 Java 中的应用越来越广泛,java.concurrent 包中用非阻塞算法实现的并发容器也越来越多,ConcurrentLinkedQueue 就是其中的一个重要成员。鉴于 ConcurrentLinkedQueue 的非阻塞算法实现在并发容器中具有代表性,本文将结合 JDK Update23 的源代码来分析它在当前的实现。由于非阻塞算法本身比较复杂,阅读本文的读者需要对 CAS 原子指令和非阻塞同步机制有所了解。本文分为两部分,第一部分主要是对非阻塞算法相关理论知识的简介,第二部分结合 ConcurrentLinkedQueue 的源代码,来探索非阻塞算法的具体实现。希望通过本文,能够有助于读者理解非阻塞算法在并发容器中的工作原理与具体实现机制。

非阻塞算法相关技术简介

为了便于读者更好的理解本文,首先让我们来对非阻塞算法相关的理论知识做个简单的了解,更详细的论述请参阅文中提及的相关参考文献。

Java 的多线程同步机制

在现代的多处理器系统中,提高程序的并行执行能力是有效利用 CPU 资源的关键。为了有效协调多线程间的并发访问,必须采用适当的同步机制来协调竞争。当前常用的多线程同步机制可以分为下面三种类型:

  • volatile 变量:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性保证,不提供原子性。
  • CAS 原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性和原子化更新保证。
  • 内部锁和显式锁:重量级多线程同步机制,可能会引起上下文切换和线程调度,它同时提供内存可见性和原子性。

多处理器系统对并发的支持

现 代的多处理器系统大多提供了特殊的指令来管理对共享数据的并发访问,这些指令能实现原子化的读 - 改 - 写操作。现代典型的多处理器系统通常支持两种同步原语(机器级别的原子指令):CAS 和 LL/SC。Intel,AMD 和 SPARC 的多处理器系统支持“比较并交换”(compare-and-swap,CAS)指令。IBM PowerPC,Alpha AXP,MISP 和 ARM 的多处理器系统支持“加载链接 / 存储条件”(load-linked/store-conditional,LL/SC)指令。

JDK 为 concurrent.atomic 包中的原子类提供了 compareAndSet() 方法,compareAndSet() 方法使用上面这些机器级别的原子指令来原子化的更新值。java. concurrent 包中的这些原子类,为用非阻塞算法实现并发容器打下了基础。

关于 CAS 原子指令,感兴趣的读者可以参阅参考文献 1 的 15.2 章和参考文献 3 的附录 B8。关于 CAS 原子指令在不同多处理器上的有关细节,可以参阅参考文献 4 的“Multiprocessors”部分。

非阻塞算法

一个线程的失败和挂起不会引起其他些线程的失败和挂起,这样的算法称为非阻塞算法。非阻塞算法通过使用底层机器级别的原子指令来取代锁,从而保证数据在并发访问下的一致性。

从 Amdahl 定律我们可以知道,要想提高并发性,就应该尽量使串行部分达到最大程度的并行;也就是说:最小化串行代码的粒度是提高并发性能的关键。

与锁相比,非阻塞算法在更细粒度(机器级别的原子指令)的层面协调多线程间的竞争。它使得多个线程在竞争相同资源时不会发生阻塞,它的并发性与锁相比有了质的提高;同时也大大减少了线程调度的开销。同时,由于几乎所有的同步原语都只能对单个变量进行操作,这个限制导致非阻塞算法的设计和实现非常复杂。

关于非阻塞算法,感兴趣的读者可以参阅参考文献 1 的 15.4 章。关于 Amdahl 定律,感兴趣的读者可以参阅参考文献 1 的 11.2 章和参考文献 3 的 1.5 章。

非阻塞算法实现简述

基于非阻塞算法实现的并发容器

在 探索 ConcurrentLinkedQueue 非阻塞算法的具体实现机制之前,首先让我们来了解一下 JDK 中基于非阻塞算法实现的并发容器。在 JDKUpdate23 的 util.concurrent 包中,基于非阻塞算法实现的并发容器包括:ConcurrentLinkedQueue,SynchronousQueue,Exchanger 和 ConcurrentSkipListMap。ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,本文接下来将结合 JDK 源代码,来探索它的非阻塞算法的具体实现机制。SynchronousQueue 是一个没有容量的阻塞队列,它使用双重数据结构 来实现非阻塞算法。Exchanger 是一个能对元素进行配对和交换的交换器。它使用 消除 技术来实现非阻塞算法 ConcurrentSkipListMap 是一个可以根据 Key 进行排序的可伸缩的并发 Map。

关于双重数据结构,感兴趣的读者可以参阅参考文献 3 的 10.7 章,关于消除技术,感兴趣的读者可以参阅参考文献 3 的第 11 章。

ConcurrentLinkedQueue 的非阻塞算法简述

本 文接下来将在分析 ConcurrentLinkedQueue 源代码实现的过程中,穿插讲解非阻塞算法的具体实现。为了便于读者理解本文,首先让我们对它的实现机制做个全局性的简述。 ConcurrentLinkedQueue 的非阻塞算法实现可概括为下面 5 点:

  1. 使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础。
  2. head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 / 出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 / 出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键。
  3. 由于队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式来维护非阻塞算法的正确性。
  4. 以批处理方式来更新 head/tail,从整体上减少入队 / 出队操作的开销。
  5. 为了有利于垃圾收集,队列使用特有的 head 更新机制;为了确保从已删除节点向后遍历,可到达所有的非删除节点,队列使用了特有的向后推进策略。

ConcurrentLinkedQueue 有机整合了上述 5 点来实现非阻塞算法。由于三个不变式会从全局来约束非阻塞算法,所以在开始分析源代码之前,让我们首先来了解它。

不变式

在后面的源代码分析中,我们将会看到队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式 ( 基本不变式,head 的不变式和 tail 的不变式 ),来约束队列中方法的执行。通过这三个不变式来维护非阻塞算法的正确性。

不变式:并发对象需要一直保持的特性。不变式是并发对象的各个方法之间必须遵守的“契约”,每个方法在调用前和调用后都必须保持不变式。采用不变式,就可以隔离的分析每个方法,而不用考虑它们之间所有可能的交互。

基本不变式

在执行方法之前和之后,队列必须要保持的不变式:

  • 当入队插入新节点之后,队列中有一个 next 域为 null 的(最后)节点。
  • 从 head 开始遍历队列,可以访问所有 item 域不为 null 的节点。

head 的不变式和可变式

在执行方法之前和之后,head 必须保持的不变式:

  • 所有“活着”的节点(指未删除节点),都能从 head 通过调用 succ() 方法遍历可达。
  • head 不能为 null。
  • head 节点的 next 域不能引用到自身。

在执行方法之前和之后,head 的可变式:

  • head 节点的 item 域可能为 null,也可能不为 null。
  • 允许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。

tail 的不变式和可变式

在执行方法之前和之后,tail 必须保持的不变式:

  • 通过 tail 调用 succ() 方法,最后节点总是可达的。
  • tail 不能为 null。

在执行方法之前和之后,tail 的可变式:

  • tail 节点的 item 域可能为 null,也可能不为 null。
  • 允许 tail 滞后于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。
  • tail 节点的 next 域可以引用到自身。

在接下来的源代码分析中,在初始化 ConcurrentLinkedQueue 之后及调用入队 / 出队方法之前和之后,我们都会参照上面三个不变式来分析它们的正确性。

节点类实现及队列初始化

节点类定义

ConcurrentLinkedQueue 是用节点链接成的链表来实现的。首先,让我们来看看节点类的源代码:

清单 1. 节点类
 private static class Node<E> { 
        private volatile  E item;           // 声明为 volatile 型
        private volatile  Node<E> next;    // 声明为 volatile 型

        Node(E item) {                       // 创建新节点
            lazySetItem(item);              // 惰性设置 item 域的值
         } 

        E getItem() { 
            return item; 
        } 

        boolean casItem(E cmp, E val) {   // 使用 CAS 指令设置 item 域的值
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); 
        } 

        void setItem(E val) {  // 使用“volatile 写”的方式,设置 item 域的值
             item = val; 
        } 
        voidlazySetItem(E val) { //惰性设置 item 域的值
				 UNSAFE.putOrderedObject(this, itemOffset, val); 
        } 

        void lazySetNext(Node<E> val) {    // 惰性设置 next 域的值 
            UNSAFE.putOrderedObject(this, nextOffset, val); 
        } 

        Node<E> getNext() { 
            return next; 
        } 
        
                                                      //CAS 设置 next 域的值
        boolean casNext(Node<E> cmp, Node<E> val) { 
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 
        } 

        private static final sun.misc.Unsafe UNSAFE=     // 域更新器
        sun.misc.Unsafe.getUnsafe(); 
        private static final long nextOffset=              //next 域的偏移量
        objectFieldOffset(UNSAFE, "next", Node.class); 
        private static final long itemOffset=              //item 域的偏移量
        objectFieldOffset(UNSAFE, "item", Node.class); 

    }

在 ConcurrentLinkedQueue 的实际应用中,会频繁分配大量生命周期短暂的节点对象。为了降低开销,Node 类的 item 域和 next 域被声明为普通的 volatile 类型。它们通过原子引用域更新器(AtomicReferenceFieldUpdater),使用反射来更新。关于原子引用域更新器,感兴趣的读者可以 参阅参考文献 1 的 15.4.3。

节点类型说明

为了便于读者理解本文,下面对文中涉及的不同类型的节点集中做个定义:

  • 有效节点:从 head 向后遍历可达的节点当中,item 域不为 null 的节点。
  • 无效节点:从 head 向后遍历可达的节点当中,item 域为 null 的节点。
  • 以删除节点:从 head 向后遍历不可达的节点。
  • 哨兵节点:链接到自身的节点(哨兵节点同时也是以删除节点)。
  • 头节点:队列中的第一个有效节点(如果有的话)。
  • 尾节点:队列中 next 域为 null 的节点(可以是无效节点)。

下面是不同类型节点的示意图:

图 1. 不同类型节点示意图

图 1. 不同类型节点示意图对 比 head 的不变式和 tail 的不变式可以看出,head 只能指向有效节点和无效节点,而 tail 可以指向任意节点,包括以删除节点和哨兵节点。在 ConcurrentLinkedQueue 中,入队时只能把新节点链接到尾节点的后面,出队时只能删除头节点。

队列初始化

接下来让我们来看看 ConcurrentLinkedQueue 初始化过程的源代码实现:

清单 2. 初始化
   // 创建一个 item 域为 null,next 域为 null 的伪节点
 private transient volatile Node<E> head = new Node<E>(null); 
    
 private transient volatile Node<E> tail = head; 

 public ConcurrentLinkedQueue() {}

当初始化一个 ConcurrentLinkedQueue 对象时,会创建一个 item 域为 null 和 next 域为 null 的伪节点,并让 head 和 tail 指向这个伪节点。下面是队列初始化之后的结构示意图:

图 2. 初始化状态结构图

图 2. 初始化状态结构图从上图我们可以看出,处于初始化状态的队列满足三个不变式。

批处理更新,更新 head 及向后推进

批处理更新 head/tail

为 了尽量减少执行 CAS 原子指令的次数,执行入队 / 出队操作时 , ConcurrentLinkedQueue 并不总是更新 head/tail。只有从 head/tail 到头 / 尾节点之间的“距离”达到变量 HOPS 指定的阀值,入队 / 出队操作才会更新它们。下面这行源代码定义了 HOPS 这个变量:

清单 3.HOPS 变量
// 更新 head/tail 的阀值
private static final int HOPS = 1;

以批处理方式更新减少了更新 head/tail 的次数(减少了执行 CAS 原子指令的次数),但额外的增加了遍历队列,寻找头 / 尾节点的开销(增加了读 volatile 变量的开销)。在当前大多数的处理器系统中,volatile 读操作的开销非常低,几乎和非 volatile 变量的读操作一样(见参考文献 2)。而执行一条 CAS 原子指令要消耗比普通加载或存储指令多得多的时钟周期。因为 CAS 原子指令的执行包含了内存屏障(关于内存屏障,感兴趣的读者可以参阅参考文献 4 的 Memory barriers 一章),防止乱序执行以及对各种编译器优化的抑制。因此以批处理方式更新 head/tail,从整体上减少了入队 / 出队操作的开销。

head 更新

为了有利于垃圾收集,ConcurrentLinkedQueue 在更新 head 指向新头结点后,会把旧头节点设置为哨兵节点。下面是更新 head 的源代码:

清单 4. 更新 head
final void updateHead(Node<E> h, Node<E> p) { 
         // 如果两个节点不相同,尝试用 CAS 指令原子更新 head 指向新头节点
         if (h != p && casHead(h, p)) 
             // 惰性设置旧头结点为哨兵节点
             h.lazySetNext (h); 
         }

下面通过一个示意图来理解已删除节点在队列中的状态:

图 3. 已删除节点状态示意图

已删除节点状态示意图在 上图中,假设开始时 head 指向 A 节点,然后连续执行了 4 次出队操作,删除 A,B,C,D 4 个节点。在出队 B 节点时,head 与头结点之间的距离达到变量 HOPS 指定的阀值。这触发执行 updateHead()方法:首先设置 head 指向 C 节点,然后设置 B 节点的 next 域指向自身。同样,在出队 D 节点时,重复同样的过程。由于 B 和 D 节点断开了以删除节点与队列的链接,这将有利于虚拟机回收这些以删除节点占用的内存空间。

向后推进

由于 tail 可以指向任意节点,所以从 tail 向后遍历寻找尾节点的过程中,可能会遇到哨兵节点。此时 succ() 方法会直接跳转到 head 指向的节点继续遍历。下面是 succ() 方法的源代码:

清单 5. 向后推进
final Node<E> succ(Node<E> p) { 
    Node<E> next = p.getNext(); 

 // 如果 p 节点的 next 域链接到自身(p 节点是哨兵节点)
 // 就跳转到 head,从 head 开始继续遍历 
         // 否则向后推进到下一个节点
         return (p == next) ? head : next; 
     }

从上面的源代码我们可以看出,如果向后推进过程中遇到哨兵节点,就跳转到 head,从 head 开始继续遍历;否则,就推进到下一个节点。

下面通过一个示意图来理解跳转动作的的执行过程:

图 4. 跳转动作示意图

跳转动作示意图上 图的队列当前处于 tail 滞后于 head 状态。假设现在执行入队操作,需要从 tail 开始向后遍历找到队列的尾节点。tail 开始时指向 A 节点,执行 succ() 方法向后推进到 B 节点。在 B 节点执行 succ() 方法时,由于 B 节点链接到自身,所以跳转到 head 指向的 E 节点继续遍历。下面对滞后与跳转做个总结:

  • 如上图所示,如果 head 落在 tail 的后面,队列就处于 tail 滞后于 head 状态。
  • 如果 tail 滞后于 head,从 tail 向后遍历过程中就会发生跳转动作。
  • 跳转动作确保从已删除节点向后遍历,可以到达所有的未删除节点。

入队操作

在 ConcurrentLinkedQueue 中,插入新节点时,不用考虑尾节点是否为有效节点,直接把新节点插入到尾节点的后面即可。由于 tail 可以指向任意节点,所以入队时必须先通过 tail 找到尾节点,然后才能执行插入操作。如果插入不成功(说明其他线程已经抢先插入了一个新的节点)就继续向后推进。重复上述迭代过程,直到插入成功为止。下 面是入队方法的源代码:

清单 6. 插入新节点
    public boolean offer(E e) { 
        if (e == null) throw new NullPointerException(); 
        Node<E> n = new Node<E>(e);     // 创建新节点
        retry: 
        for (;;) { 
            Node<E> t = tail; 
            Node<E> p = t; 
            for (int hops = 0; ; hops++) { 
                Node<E> next = succ(p);          //A 
                if (next != null) {                    //B 
                    if (hops > HOPS&& t != tail)    //B1 
                        continue retry;                 //B2 
                    p = next;                           //B3 
                } else if (p.casNext(null, n)) {        //C 
                    if (hops >= HOPS)         //C1 
                        casTail(t, n);                  //C2 
                    return true;                        //C3 
                } else {                                //D 
                    p = succ(p);                        //D1 
                } 
            } 
        } 
 }

插入新节点的源代码分析

offer() 方法使用非阻塞算法惯用的“循环尝试”的方式来执行:如果因其他线程干扰而 失败就重新尝试,直到成功为止。下面是关键代码的解释:

A: 找到 tail 的下一个节点 next。

B: 如果 next 不为 null。

B1:如果已经至少越过了两个节点,且 tail 被修改 (tail 被修改,说明其他线程向队列添加了新的节点,且更新 tail 成功 )。

B2:跳出内外两层循环,重新开始迭代(因为 tail 刚刚被其他线程更新了)。

B3:向后推进到下一个节点。

C: 如果当前节点为尾节点,使用 CAS 原子指令设置尾节点的 next 域指向新节点。

C1:如果已经至少越过了一个节点(此时,tail 至少滞后尾节点两个节点)。

C2:使用 CAS 原子指令更新 tail 指向这个新插入的节点。

C3:新节点以插入队列,不管更新 tail 是否成功,退出方法。

D: 如果向队尾插入新节点不成功(其他线程已经抢先在队尾插入了一个新节点)。

D1:向后推进到下一个节点。

队 列的入队方法包含两个步骤:添加新节点和更新 tail 指向这个新节点。这两个步骤分别对应代码分析的 C 和 C2。从代码中我们可以看到,这两个步骤都是用 CAS 原子指令来完成的。由于 ConcurrentLinkedQueue 允许队列处于不一致状态,所以这里的 C 和 C2 这两个步骤不必一起原子的执行。在 C 处添加新节点后,只有当 tail 与新添加节点之间的距离达到了 HOPS 指定的阀值,才会执行 C2 来更新 tail。

tail 在队列中的位置分析

根据 tail 的不变式和可变式,在执行入队操作前,tail 在队列中的位置共有三种可能:

  1. tail 指向尾节点。
  2. tail 节点指向非尾节点。
  3. tail 滞后于 head。

下面分别分析这三种情形,首先让我们看看第一种情形的示意图:

图 5.tail 指向尾节点

图 5.tail 指向尾节点开 始时,tail 指向 D 节点,首先寻找 D 节点的后继节点。由于 D 的后继节点为 null,所以插入新节点到 D 节点的后面。如果插入成功就退出方法;如果插入失败(说明其他线程刚刚插入了一个新节点),就向后推进到新插入的节点,然后重新开始迭代。下图是插入成功 后的示意图:

图 6.tail 指向尾节点,插入新节点成功

图 6.tail 指向尾节点,插入新节点成功在上图中,由于 tail 滞后于尾节点的节点数还没有达到 HOPS 指定的阈值,所以 tail 没有被更新。

下面,让我们看看第二种情形的结构示意图:

图 7.tail 指向非尾节点

图 7.tail 指向非尾节点开始时,tail 指向 C 节点。首先找到 C 的后继节点 D,然后向后推进到节点 D,后面代码执行路径与上面的“tail 指向尾节点 的代码执行路径相同。下图是插入成功后的结构示意图:

图 8.tail 指向非尾节点,插入新节点成功

图 8.tail 指向非尾节点,插入新节点成功上图中的 tail 更新了位置。因为在添加 E 节点后,tail 滞后的节点数达到了 HOPS 指定的阈值。这触发执行更新 tail 的 CAS 操作。

最后,让我们看看第三种情形的结构示意图:

图 9.tail 滞后于 head

图 9.tail 滞后于 head开 始时,tail 指向 A 节点。首先找到 A 的后继节点 B,然后向后推进到节点 B。由于 B 是哨兵节点,产生跳转动作,跳过 C 节点,从 head 指向的 D 节点开始继续向后遍历。后面的代码执行路径与“tail 指向非尾节点”相同。下面是成功插入一个新节点后的结构示意图:

图 10.tail 滞后于 head,插入新节点成功

图 10.tail 滞后于 head,插入新节点成功上图的 tail 更新了位置,因为 tail 滞后的节点数达到了 HOPS 指定的阈值,这触发执行更新 tail 的 CAS 操作。

从上面插入新节点后的三个结构示意图我们可以看出,执行入队操作后的队列依然满足三个不变式。

出队操作

在 ConcurrentLinkedQueue 中,出队操作从队列的头部删除第一个有效节点。根据 head 的不变式和可变式,head 可以指向无效节点,所以出队前必须先检查 head 是否指向有效节点。如果指向无效节点就要向后推进,直到找到第一个有效节点,然后再执行出队操作。下面是出队方法的源代码:

清单 7. 删除队列头节点
    public E poll() { 
        Node<E> h = head; 
        Node<E> p = h; 
        for (int hops = 0; ; hops++) { 
            E item = p.getItem();                                    //A 
            if (item != null && p.casItem(item, null)) {     //B 
                if (hops >= HOPS) {                        //C 
                    Node<E> q = p.getNext();                   //C1 
                    updateHead(h, (q != null) ? q : p);              //C2 
                } 
                return item;                                         //D 
            } 
            Node<E> next = succ(p);                            //E 
            if (next == null) {                                      //F 
                updateHead(h, p);                                    //G 
                break;                                               //H 
            } 
            p = next;                                                //I 
        } 
        return null; 
    }

删除头结点的代码分析

和 offer() 方法一样,poll() 方法也使用“循环尝试”的方式来执行。下面是对关键代码的解释:

A:获得当前节点 p 的 item 域的值。

B:如果当前节点是有效节点,且成功设置这个节点为无效节点。

C:如果迭代过程已经越过了不小于 1 个节点。

C1:取得后继结点 q。

C2:如果 q 不为 null,设置 head 指向后继节点 q;否则设置 head 指向当前节点 p(此时队列为空,只有一个伪节点 p)。

D:返回被移除节点 item 域的值。

E:向后推进到下一个节点 next(因为当前节点 p 是一个无效节点)。

F:如果 next 为 null。

G:设置 head 指向 p 节点(此时队列为空,只有一个伪节点 p)。

H:退出循环。

I:推进到下一个节点。

队 列的出队方法包含两个步骤:删除头节点和更新 head 指向新头节点。这两个步骤分别对应代码分析的 B 和 C2。这里对头节点的删除使用了一个小技巧:设置头节点的 item 域为 null,即删除了它(虽然这个节点还在队列中,但它以是无效节点)。在代码中我们可以看到,这两个步骤都使用 CAS 原子指令来完成。由于 ConcurrentLinkedQueue 允许队列处于不一致状态,所以这里的 B 和 C2 这两个步骤不必一起原子的执行。在 B 处删除头节点后,只有当 head 与新头节点之间的距离达到了 HOPS 指定的阀值,才会执行 C2 来更新 head。

head 在队列中的位置分析

根据 head 的不变式和可变式,在执行出队操作前,head 在队列中的位置共有两种可能:

  1. head 指向有效节点。
  2. head 指向无效节点。

下面,让我们首先来看第一种情形的结构示意图:

图 11.head 指向有效节点

图 11.head 指向有效节点出 队时,首先取得 head 指向的 A 节点的 item 域的值,然后通过 CAS 设置 A 节点 item 域的值为 null。如果成功,由于此时越过的节点数为 0,所以直接返回 A 节点 item 域原有的值。如果不成功,说明其他线程已经抢先删除了该节点,此时向后推进到 B 节点。重复这个过程,直到成功删除一个节点;如果遍历完队列也没有删除成功,则返回 null。下面是成功删除后的结构示意图:

图 12. 成功删除 head 指向的有效节点

图 12. 成功删除 head 指向的有效节点在上图中,虽然 A 节点被设置成无效节点,但 head 依然指向它,因为删除操作越过的节点数还没有达到 HOPS 指定的阀值。

接下来,让我们来看看第二种情形的结构示意图:

图 13.head 指向无效节点

图 13.head 指向无效节点首 先获得 head 指向节点的 item 域的值,由于为 null,所以向后推进到 B 节点。获得 B 节点 item 域的值后,通过 CAS 设置该值为 null。如果成功,由于已经达到 HOPS 指定的阀值,触发执行 head 更新。如果不成功(说明其他线程已经抢先删除了 B 节点),继续向后推进到 C 节点。重复这个过程,直到删除一个有效节点。如果遍历完队列也没有删除成功,则返回 null。下图是成功删除后的结构示意图:

图 14.head 指向无效节点,成功删除

图 14.head 指向无效节点,成功删除从上图我们可以看到,在执行删除操作过程中,head 越过的节点数达到阀值,触发执行 head 的更新,使它指向 C 节点。

从上面删除头节点后的两个结构示意图可以看出,执行出队操作后的队列依然满足三个不变式。

总结

ConcurrentLinkedQueue 的非阻塞算法实现非常精巧,也非常复杂。它使用 CAS 原子指令来处理对数据的并发访问。同时,它允许队列处于不一致状态。这个特性分离了入队 / 出队操作中包含的两个需要一起原子执行的步骤,从而有效的缩小了入队 / 出队时的原子化(更新值的)范围为唯一变量。由于队列可能处于不一致状态,为此 ConcurrentLinkedQueue 使用三个不变式来维护非阻塞算法的正确性。

虽然我们不用自己去实现如此复杂的并发数据结构,但知晓它的工作原理与实现机制对于我们更好的使用它将很有帮助。

参考资料

学习

讨论

  • 加入 developerWorks 中文社区。查看开发人员推动的博客、论坛、组和维基,并与其他 developerWorks 用户交流。