JDK并发工具类源码学习系列——PriorityBlockingQueue

PriorityBlockingQueue是一个基于优先级堆的无界的并发安全的优先级队列(FIFO),队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序,具体取决于所使用的构造方法。

实现原理

PriorityBlockingQueue通过使用堆这种数据结构实现将队列中的元素按照某种排序规则进行排序,从而改变先进先出的队列顺序,提供开发者改变队列中元素的顺序的能力。队列中的元素必须是可比较的,即实现Comparable接口,或者在构建函数时提供可对队列元素进行比较的Comparator对象。

堆的介绍

由于PriorityBlockingQueue是基于堆的,所以这里简单介绍下堆的结构。堆是一种二叉树结构,堆的根元素是整个树的最大值或者最小值(称为大顶堆或者小顶堆),同时堆的每个子树都是满足堆的树结构。由于堆的顶部是最大值或者最小值,所以每次从堆获取数据都是直接获取堆顶元素,然后再将堆调整成堆结构。

更多关于堆的介绍请参考:数据结构系列——堆

结构介绍

PriorityBlockingQueue通过内部组合PriorityQueue的方式实现优先级队列(private final PriorityQueue q;),另外在外层通过ReentrantLock实现线程安全,同时通过Condition实现阻塞唤醒。

常用方法介绍

PriorityBlockingQueue继承自AbstractQueue,以及实现了BlockingQueue接口,是一个阻塞队列,主要方法:offer(E)/poll()/poll(long, TimeUnit)/take()/remove(Object)。下面我们结合源码堆这些方法进行深入分析。

offer(E)

入队操作。此处虽然PriorityBlockingQueue是阻塞队列,但是其并没有阻塞的入队方法,因为该队列是无界的,所以入队是不会阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();// 加锁
try {
// 通过PriorityQueue入队一个元素
boolean ok = q.offer(e);
assert ok;
// 唤醒等在notEmpty上的线程
notEmpty.signal();
return true;
} finally {
lock.unlock();
}
}

1
2
3
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}

offer()方法正如在结构介绍中提到的通过组合的方式,通过外部加锁内部直接调用PriorityQueue的offer()方法。所以主要的工作在PriorityQueue内部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
*@By Vicky:入队一个元素
*/

public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
// 内部使用数组保存队列的元素,所以如果队列的大小超过数组的长度,则需要进行扩容
// 扩容的标准是:<64扩大2倍,>=64则扩大1.5倍
if (i >= queue.length)
grow(i + 1);
size = i + 1;
// i==0表示队列目前没有元素,则直接将带插入元素添加到数组即可
if (i == 0)
queue[0] = e;
else
// 将带插入元素添加到队列的最后一个元素,然后自下而上调整堆
siftUp(i, e);
return true;
}

/**
*@By Vicky:自下而上调整堆
*/

private void siftUp(int k, E x) {
// 两者逻辑一样,只是采用的比较方式不同而已
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}

/**
*@By Vicky:自下而上调整堆
*/

private void siftUpUsingComparator(int k, E x) {
// 循环,直到根元素
while (k > 0) {
// 寻找k的父元素下标,固定规则,可参考博客:http://vickyqi.com/
int parent = (k - 1) >>> 1;
Object e = queue[parent];
// 如果x >= e,即子节点>=父节点,则直接退出循环
// 解释:自下而上一般出现在插入元素时调用,插入元素是插入到队列的最后,则需要将该元素调整到合适的位置
// 即从队列的最后往上调整堆,直到不小于其父节点为止,相当于冒泡
if (comparator.compare(x, (E) e) >= 0)
break;
// 如果当前节点<其父节点,则将其与父节点进行交换,并继续往上访问父节点
queue[k] = e;
k = parent;
}
queue[k] = x;
}

入队时通过调用ReentrantLock.lock()进行加锁,然后调用PriorityQueue.offer()方法进行入队操作,最后通过Condition.signal()唤醒等待其上的线程。PriorityQueue.offer()方法将元素插入到队列的最后,然后自上而下调整堆。文中代码都给出了注释,同时可参考博客:数据结构系列——堆进行详细的了解。

poll()和poll(long, TimeUnit),take()

出队操作。poll(long, TimeUnit)是poll()的阻塞版本,同时take()是无限阻塞版poll()(即无期限阻塞,直到获取到数据),通过Condition.awaitNanos()实现阻塞。三者实现主要逻辑相同,只是在等待时不同,这里主要介绍poll(long, TimeUnit)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @By Vicky:阻塞版的出队
*/

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 此处不同于其他非阻塞方法,调用了ReentrantLock的lockInterruptibly()方法,考虑了当前线程是否被打断
lock.lockInterruptibly();
try {
// 循环,直到获取到元素,或者到达等待时间
for (;;) {
// 从PriorityQueue获取一个元素,该方法不会阻塞
E x = q.poll();
if (x != null)
return x;
// 此处的nanos会因为每次调用Condition.awaitNanos而减少,如果<0则说明累计等待时间已达到设定的等待时间
if (nanos <= 0)
return null;
try {
// Condition.awaitNanos指定等待时间,但是有可能会被“虚假唤醒”(参考API),导致等待时间未满,返回值即剩余的等待时间
// 所以需要在外层进行循环,每次等待的时候是上次剩余的时间
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}

具体的出队操作依然是调用PriorityQueue.poll()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* @By Vicky:出队一个元素
*/

public E poll() {
// size==0队列为0,直接返回null
if (size == 0)
return null;
int s = --size;
modCount++;
// 出队总是将数组的第一个元素进行出队,
E result = (E) queue[0];
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
// 同时将队列的最后一个元素放到第一个位置,然后自上而下调整堆
siftDown(0, x);
return result;
}

/**
* @By Vicky:自下而上调整堆
*/

private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}

/**
* @By Vicky:自下而上调整堆
*/

private void siftDownUsingComparator(int k, E x) {
// 由于堆是一个二叉树,所以size/2是树中的最后一个非叶子节点
// 如果k是叶子节点,那么其无子节点,则不需要再往下调整堆
int half = size >>> 1;
while (k < half) {
// 左节点,固定规则,可参考博客:http://vickyqi.com/
int child = (k << 1) + 1;
Object c = queue[child];
// 右节点
int right = child + 1;
// 找出两个子节点以及父节点中较小的一个
if (right < size &&
comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
// 如果父节点最小,则无需继续往下调整堆
if (comparator.compare(x, (E) c) <= 0)
break;
// 否则将父节点与两个子节点中较小的一个交换,然后往下继续调整
queue[k] = c;
k = child;
}
queue[k] = x;
}

基本操作原理见代码注释,同时可参考博客:数据结构系列——堆进行详细的了解。

remove(Object)

删除其实并不是常用的方法,主要是堆在删除时还是有点值得介绍的。这里我们直接看PriorityQueue.remove()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @By Vicky:移除指定元素
*/

public boolean remove(Object o) {
// 在队列中查询元素,返回待删除元素在队列中的位置
int i = indexOf(o);
if (i == -1)
return false;
else {
// 删除指定位置的元素
removeAt(i);
return true;
}
}

/**
* @By Vicky:删除指定位置的元素
*/

private E removeAt(int i) {
assert i >= 0 && i < size;
modCount++;
int s = --size;
if (s == i) // removed last element
queue[i] = null;
else {
// 删除最后一个元素,将最后一个元素放到i的位置,然后从i开始上而下调整堆
E moved = (E) queue[s];
queue[s] = null;
siftDown(i, moved);
// 如果queue[i] == moved说明未发生调整,那么则需要自下而上调整堆
if (queue[i] == moved) {
siftUp(i, moved);
if (queue[i] != moved)
return moved;
}
}
return null;
}

当删除堆中的一个元素时,将堆的最后一个元素移动到被删除的位置,然后将最后一个位置值为NULL,当把最后一个元素移动到堆中的某个位置时,这时首先需要从该位置开始自上而下的调整堆,如果该位置的元素在调整时发生变化,即堆有变化,则说明该元素是大于其子节点的,那么该节点就不可能小于其上的父节点(因为堆的结构是传递性的,即子节点小于父节点,其孙子节点同时小于其父节点),所以就不需要再网上调整了;但是如果未发生变化,则说明该位置的节点小于其子节点,那么就无法保证其一定比父节点大,所以需要从该节点开始自上而下的调整堆。调整堆的方法是入队和出队时都有介绍,这里就不介绍了。

以上即PriorityBlockingQueue常用的一些方法,另外一些peek(),迭代等方法就不介绍了,毕竟不涉及堆的改变。

使用场景

PriorityBlockingQueue与普通阻塞队列的不同之处就是在于其支持对队列中的元素进行比较,而已决定出队的顺序,所以可以使用PriorityBlockingQueue实现高优先级的线程优先执行。


以上即本篇的全部内容,如有错误之处,请不吝赐教~~~