集合框架 Queue篇(1)—ArrayDeque

来源:互联网

Queue

------------
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)

4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口
5.ArrayBlockingQueue,           (基于数组的并发阻塞队列)
6.LinkedBlockingQueue,        (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
8.PriorityBlockingQueue,        (带优先级的无界阻塞队列)
9.SynchronousQueue                       (并发同步阻塞队列)

 

-----------------------------------------------------

队列是只能对头尾两个元素操作的数据结构,

单向队列只能从头remove(poll),从尾add(offer),

双端队列则头尾均可执行插入和删除操作。

------------------

看一下  Queue接口的API:

public interface Queue<E> extends Collection<E>

在处理元素前用于保存元素的collection。除了基本的Collection操作以外,队列还提供了插入、提取、和检查操作。每个方法都有两种形式:一种抛出异常(操作失败时),另一种返回一个特殊值(null或false,具体取决于操作)。插入操作的后一种形式是用于专门为有容量限制的 Queue 实现设计的;在大多数实现中,插入操作不会失败。

抛出异常返回特殊值插入add(e)offer(e)移除remove()poll()检查element()peek()

队列通常(但并非一定)以 FIFO(先进先出)的方式排序各个元素。不过优先级队列和 LIFO 队列(或堆栈)例外,前者根据提供的比较器或元素的自然顺序对元素进行排序,后者按 LIFO(后进先出)的方式对元素进行排序。无论使用哪种排序方式,队列的 都是调用 或 所移除的元素。在 FIFO 队列中,所有的新元素都插入队列的末尾。其他种类的队列可能使用不同的元素放置规则。每个 Queue 实现必须指定其顺序属性。

如果可能, 方法可插入一个元素,否则返回 false。这与 方法不同,该方法只能通过抛出未经检查的异常使添加元素失败。offer 方法设计用于正常的失败情况,而不是出现异常的情况,例如在容量固定(有界)的队列中。

和 方法可移除和返回队列的头。到底从队列中移除哪个元素是队列排序策略的功能,而该策略在各种实现中是不同的。remove()poll() 方法仅在队列为空时其行为有所不同:remove() 方法抛出一个异常,而 poll() 方法则返回 null

和 返回,但不移除,队列的头。

Queue 接口并未定义阻塞队列的方法,而这在并发编程中是很常见的。 接口定义了那些等待元素出现或等待队列中有可用空间的方法,这些方法扩展了此接口。

Queue 实现通常不允许插入 null 元素,尽管某些实现(如 )并不禁止插入 null。即使在允许 null 的实现中,也不应该将 null 插入到 Queue 中,因为 null 也用作 poll 方法的一个特殊返回值,表明队列不包含元素。

Queue 实现通常未定义 equalshashCode 方法的基于元素的版本,而是从 Object 类继承了基于身份的版本,因为对于具有相同元素但有不同排序属性的队列而言,基于元素的相等性并非总是定义良好的。

------------------------------------------------------------------------------------------

看一下 BlockingQueue(阻塞队列)的API:

public interface BlockingQueue<E> extends <E>

支持两个附加操作的 ,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。

BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(nullfalse,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

抛出异常特殊值阻塞超时插入add(e)offer(e)put(e)offer(e, time, unit)移除remove()poll()take()poll(time, unit)检查element()peek()不可用不可用

BlockingQueue 不接受 null 元素。试图 addputoffer 一个 null 元素时,某些实现会抛出 NullPointerExceptionnull 被用作指示 poll 操作失败的警戒值。

BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。

BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAllremoveAll没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

BlockingQueue 实质上 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-streampoison 对象,并根据使用者获取这些对象的时间来对它们进行解释。

这个类型的队列, 个人理解主要在多线程中, 有个消息生产者和消费者或者 生产者和消费速度不匹配的情况。若是单独的一对一的生产者和消费者,采用函数调用或者rpc方式调用还是简单实用。

在真正的大型系统中, 可以采用消息系统实现, 主要可以为分布式系统, 避免单点故障, 也可以持久化消息等的,更方便

1. ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

2. LinkedBlockingQueue
基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

3. DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

4. PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

5. SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

小结
BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。

-------------------------------------------------------------------------------------------

看一下 Deque(双端队列)的API:

public interface Deque<E> extends <E>

一个线性 collection,支持在两端插入和移除元素。名称 deque 是“double ended queue(双端队列)”的缩写,通常读为“deck”。大多数 Deque 实现对于它们能够包含的元素数没有固定限制,但此接口既支持有容量限制的双端队列,也支持没有固定大小限制的双端队列。

此接口定义在双端队列两端访问元素的方法。提供插入、移除和检查元素的方法。每种方法都存在两种形式:一种形式在操作失败时抛出异常,另一种形式返回一个特殊值(nullfalse,具体取决于操作)。插入操作的后一种形式是专为使用有容量限制的 Deque 实现设计的;在大多数实现中,插入操作不能失败。

下表总结了上述 12 种方法:

第一个元素(头部)最后一个元素(尾部)抛出异常特殊值抛出异常特殊值插入addFirst(e)offerFirst(e)addLast(e)offerLast(e)移除removeFirst()pollFirst()removeLast()pollLast()检查getFirst()peekFirst()getLast()peekLast()

此接口扩展了 接口。在将双端队列用作队列时,将得到 FIFO(先进先出)行为。将元素添加到双端队列的末尾,从双端队列的开头移除元素。从 Queue 接口继承的方法完全等效于 Deque 方法,如下表所示:

Queue 方法等效 Deque 方法add(e)addLast(e)offer(e)offerLast(e)remove()removeFirst()poll()pollFirst()element()getFirst()peek()peekFirst()

双端队列也可用作 LIFO(后进先出)堆栈。应优先使用此接口而不是遗留 类。在将双端队列用作堆栈时,元素被推入双端队列的开头并从双端队列开头弹出。堆栈方法完全等效于 Deque 方法,如下表所示:

堆栈方法等效 Deque 方法push(e)addFirst(e)pop()removeFirst()peek()peekFirst()

注意,在将双端队列用作队列或堆栈时, 方法同样正常工作;无论哪种情况下,都从双端队列的开头抽取元素。

此接口提供了两种移除内部元素的方法: 和 。

与 接口不同,此接口不支持通过索引访问元素。

虽然 Deque 实现没有严格要求禁止插入 null 元素,但建议最好这样做。建议任何事实上允许 null 元素的 Deque 实现用户最好 要利用插入 null 的功能。这是因为各种方法会将 null 用作特殊的返回值来指示双端队列为空。

Deque 实现通常不定义基于元素的 equalshashCode 方法,而是从 Object 类继承基于身份的 equalshashCode 方法。

-----------------------------------------------

ArrayDeque(数组双端队列)

ArrayDeque实现了Deque双端队列接口,大小可变的数组实现并且没有容量的限制(不过,容量应该<=2 ^ 30)。

//数组双端队列ArrayDeque的源码解析
public class ArrayDeque<E> extends AbstractCollection<E> implements Deque<E>, Cloneable, Serializable{
/**
* 存放队列元素的数组,数组的长度为“2的指数”
*/
private transient E[] elements;

/**
*队列的头部索引位置,(被remove()或pop()操作的位置),当为空队列时,首尾index相同
*/
private transient int head;

/**
* 队列的尾部索引位置,(被 addLast(E), add(E), 或 push(E)操作的位置).
*/
private transient int tail;

/**
* 队列的最小容量(大小必须为“2的指数”)
*/
private static final int MIN_INITIAL_CAPACITY = 8;

// ******  Array allocation and resizing utilities ******

/**
* 根据所给的数组长度,得到一个比该长度大的最小的2^p的真实长度,并建立真实长度的空数组
*/
private void allocateElements(int numElements) {
int initialCapacity = MIN_INITIAL_CAPACITY;
if (numElements >= initialCapacity) {
initialCapacity = numElements;
initialCapacity |= (initialCapacity >>>  1);
initialCapacity |= (initialCapacity >>>  2);
initialCapacity |= (initialCapacity >>>  4);
initialCapacity |= (initialCapacity >>>  8);
initialCapacity |= (initialCapacity >>> 16);
initialCapacity++;

if (initialCapacity < 0)   // Too many elements, must back off
initialCapacity >>>= 1;// Good luck allocating 2 ^ 30 elements
}
elements = (E[]) new Object[initialCapacity];
}

/**
* 当队列首尾指向同一个引用时,扩充队列的容量为原来的两倍,并对元素重新定位到新数组中
*/
private void doubleCapacity() {
assert head == tail;
int p = head;
int n = elements.length;
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;
if (newCapacity < 0)
throw new IllegalStateException("Sorry, deque too big");
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
elements = (E[])a;
head = 0;
tail = n;
}

/**
* 拷贝队列中的元素到新数组中
*/
private <T> T[] copyElements(T[] a) {
if (head < tail) {
System.arraycopy(elements, head, a, 0, size());
} else if (head > tail) {
int headPortionLen = elements.length - head;
System.arraycopy(elements, head, a, 0, headPortionLen);
System.arraycopy(elements, 0, a, headPortionLen, tail);
}
return a;
}

/**
* 默认构造队列,初始化一个长度为16的数组
*/
public ArrayDeque() {
elements = (E[]) new Object[16];
}

/**
* 指定元素个数的构造方法
*/
public ArrayDeque(int numElements) {
allocateElements(numElements);
}

/**
* 用一个集合作为参数的构造方法
*/
public ArrayDeque(Collection<? extends E> c) {
allocateElements(c.size());
addAll(c);
}

//插入和删除的方法主要是: addFirst(),addLast(), pollFirst(), pollLast()。
//其他的方法依赖于这些实现。
/**
* 在双端队列的前端插入元素,元素为null抛异常
*/
public void addFirst(E e) {
if (e == null)
throw new NullPointerException();
elements[head = (head - 1) & (elements.length - 1)] = e;
if (head == tail)
doubleCapacity();
}

/**
*在双端队列的末端插入元素,元素为null抛异常
*/
public void addLast(E e) {
if (e == null)
throw new NullPointerException();
elements[tail] = e;
if ( (tail = (tail + 1) & (elements.length - 1)) == head)
doubleCapacity();
}

/**
* 在前端插入,调用addFirst实现,返回boolean类型
*/
public boolean offerFirst(E e) {
addFirst(e);
return true;
}

/**
* 在末端插入,调用addLast实现,返回boolean类型
*/
public boolean offerLast(E e) {
addLast(e);
return true;
}

/**
* 删除前端,调用pollFirst实现
*/
public E removeFirst() {
E x = pollFirst();
if (x == null)
throw new NoSuchElementException();
return x;
}

/**
* 删除后端,调用pollLast实现
*/
public E removeLast() {
E x = pollLast();
if (x == null)
throw new NoSuchElementException();
return x;
}
//前端出对(删除前端)
public E pollFirst() {
int h = head;
E result = elements[h]; // Element is null if deque empty
if (result == null)
return null;
elements[h] = null;     // Must null out slot
head = (h + 1) & (elements.length - 1);
return result;
}
//后端出对(删除后端)
public E pollLast() {
int t = (tail - 1) & (elements.length - 1);
E result = elements[t];
if (result == null)
return null;
elements[t] = null;
tail = t;
return result;
}

/**
* 得到前端头元素
*/
public E getFirst() {
E x = elements[head];
if (x == null)
throw new NoSuchElementException();
return x;
}

/**
* 得到末端尾元素
*/
public E getLast() {
E x = elements[(tail - 1) & (elements.length - 1)];
if (x == null)
throw new NoSuchElementException();
return x;
}
public E peekFirst() {
return elements[head]; // elements[head] is null if deque empty
}

public E peekLast() {
return elements[(tail - 1) & (elements.length - 1)];
}

/**
* 移除此双端队列中第一次出现的指定元素(当从头部到尾部遍历双端队列时)。
*/
public boolean removeFirstOccurrence(Object o) {
if (o == null)
return false;
int mask = elements.length - 1;
int i = head;
E x;
while ( (x = elements[i]) != null) {
if (o.equals(x)) {
delete(i);
return true;
}
i = (i + 1) & mask;
}
return false;
}

/**
* 移除此双端队列中最后一次出现的指定元素(当从头部到尾部遍历双端队列时)。
*/
public boolean removeLastOccurrence(Object o) {
if (o == null)
return false;
int mask = elements.length - 1;
int i = (tail - 1) & mask;
E x;
while ( (x = elements[i]) != null) {
if (o.equals(x)) {
delete(i);
return true;
}
i = (i - 1) & mask;
}
return false;
}

// *** 队列方法(Queue methods) ***
/**
* add方法,添加到队列末端
*/
public boolean add(E e) {
addLast(e);
return true;
}
/**
* 同上
*/
public boolean offer(E e) {
return offerLast(e);
}
/**
* remove元素,删除队列前端
*/
public E remove() {
return removeFirst();
}
/**
* 弹出前端(出对,删除前端)
*/
public E poll() {
return pollFirst();
}
public E element() {
return getFirst();
}
public E peek() {
return peekFirst();
}

// *** 栈 方法(Stack methods) ***
public void push(E e) {
addFirst(e);
}
public E pop() {
return removeFirst();
}
private void checkInvariants() { ……    }
private boolean delete(int i) {   ……   }

// *** 集合方法(Collection Methods) ***
……

// *** Object methods ***
……
}

整体来说:1个数组,2个index(head 索引和tail索引)。实现比较简单,容易理解。

 

LinkedBlockingQueue和LinkedBlockingDeque分析

有两个比较相似的并发阻塞队列,LinkedBlockingQueue和LinkedBlockingDeque,两个都是队列,只不过前者只能一端出一端入,后者则可以两端同时出入,并且都是结构改变线程安全的队列。其实两个队列从实现思想上比较容易理解,有以下特点:

链表结构(动态数组)
通过ReentrantLock实现锁
利用Condition实现队列的阻塞等待,唤醒

以下将分开讲述LinkedBlockingQueue和LinkedBlockingDeque的基本特点及操作。
LinkedBlockingQueue

这是一个只能一端出一端如的单向队列结构,是有FIFO特性的,并且是通过两个ReentrantLock和两个Condition来实现的。先看它的结构

 

http://hi.baidu.com/yao1111yao/item/1a1346f65a50d9c8521c266d

发表评论