集合框架 Queue篇(6)—LinkedBlockingQueue

来源:互联网

Queue

------------
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)

4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
5.ArrayBlockingQueue,           (基于数组的并发阻塞队列)
6.LinkedBlockingQueue,        (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
8.PriorityBlockingQueue,        (带优先级的无界阻塞队列)
9.SynchronousQueue                       (并发同步阻塞队列)
-----------------------------------------------------
LinkedBlockingQueue

采用对于的next构成链表的方式来存储对象。
由于读只操作队头,而写只操作队尾,这里巧妙地采用了两把锁,
对put和offer采用putLock,对take和poll采用takeLock,避免了读写时相互竞争锁的现象,
因此LinkedBlockingQueue在高并发读写操作都多的情况下,性能会比ArrayBlockingQueue好很多,
在遍历以及删除元素时则要把两把锁都锁住。

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
/**
* 链表节点node类结构
*/
static class Node<E> {
volatile E item;//volatile使得所有的write happen-befor read,保证了数据的可见性
Node<E> next;
Node(E x) { item = x; }
}

/** 队列容量,默认为Integer.MAX_VALUE*/
private final int capacity;

/** 用原子变量 表示当前元素的个数 */
private final AtomicInteger count = new AtomicInteger(0);

/** 表头节点 */
private transient Node<E> head;

/** 表尾节点 */
private transient Node<E> last;

/** 获取元素或删除元素时 要加的takeLock锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 获取元素 notEmpty条件 */
private final Condition notEmpty = takeLock.newCondition();

/** 插入元素时 要加putLock锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 插入时,要判满 */
private final Condition notFull = putLock.newCondition();

/**
* 唤醒等待的take操作,在put/offer中调用(因为这些操作中不会用到takeLock锁)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/**
* 唤醒等待插入操作,在take/poll中调用.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

/**
* 插入到尾部
*/
private void insert(E x) {
last = last.next = new Node<E>(x);
}

/**
* 获取并移除头元素
*/
private E extract() {
Node<E> first = head.next;
head = first;
E x = first.item;
first.item = null;
return x;
}

/**
* 锁住两把锁,在remove,clear等方法中调用
*/
private void fullyLock() {
putLock.lock();
takeLock.lock();
}

/**
* 和fullyLock成对使用
*/
private void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

/**
* 默认构造,容量为 Integer.MAX_VALUE
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
*指定容量的构造
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/**
* 指定初始化集合的构造
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
for (E e : c)
add(e);
}

/**
* 通过原子变量,直接获得大小
*/
public int size() {
return count.get();
}

/**
*返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
*/
public int remainingCapacity() {
return capacity - count.get();
}

/**
* 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
try {
while (count.get() == capacity)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

/**
* 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
for (;;) {
if (count.get() < capacity) {
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
break;
}
if (nanos <= 0)
return false;
try {
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

/**
*将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),
*在成功时返回 true,如果此队列已满,则返回 false。
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

//获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
try {
while (count.get() == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}

x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
for (;;) {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
break;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

//获取但不移除此队列的头;如果此队列为空,则返回 null。
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

/**
* 从此队列移除指定元素的单个实例(如果存在)。
*/
public boolean remove(Object o) {
if (o == null) return false;
boolean removed = false;
fullyLock();
try {
Node<E> trail = head;
Node<E> p = head.next;
while (p != null) {
if (o.equals(p.item)) {
removed = true;
break;
}
trail = p;
p = p.next;
}
if (removed) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signalAll();
}
} finally {
fullyUnlock();
}
return removed;
}
……
}
--------------------------------------------------------

http://hi.baidu.com/yao1111yao/item/eb1ea344bc2bc52511ee1e6d

发表评论