JDK并发工具类源码学习系列——ConcurrentSkipListMap(续)

ConcurrentSkipListMap在JDK并发工具类使用范围不是很广,它是针对某一特殊需求而设计的——支持排序,同时支持搜索目标返回最接近匹配项的导航方法。一般情况下开发者很少会使用到该类,但是如果你有如上的特殊需求,那么ConcurrentSkipListMap将是一个很好地解决方案。 本文通过对JDK的ConcurrentSkipListMap的代码详细分析,深入分析其实现原理。

上一篇介绍了ConcurrentSkipListMap的原理以及对put()方法进行了解析,本篇继续接着上一篇往下看。

常用方法解读

上一篇说到put()的最后一句:insertIndex(z, level);这一句是将一个新节点插入到跳表结构中,下面看看是如何实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
private void insertIndex(Node<K,V> z, int level) {
HeadIndex<K,V> h = head;// 读取跳表的头
int max = h.level;// 跳表的最高层级

if (level <= max) {// level <= max则需要将自最底层到level层的每一层插入该节点
// 此处从最底层开始往上创建跳表节点,每个节点的down指向下一层的节点,
// 最后的idx为level层的节点,通过down形成一个链表结构
Index<K,V> idx = null;
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
// 将该节点插入跳表
addIndex(idx, h, level);

} else { // Add a new level(新增加一层)
/*
* To reduce interference by other threads checking for
* empty levels in tryReduceLevel, new levels are added
* with initialized right pointers. Which in turn requires
* keeping levels in an array to access them while
* creating new head index nodes from the opposite
* direction.
*/

level = max + 1;// 最大值+1
Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];//一个数组
Index<K,V> idx = null;// 一个新的跳表头结点
// 该数组是一个从level层开始一直向下引用的垂直链表,用于方便获取某层的节点
// 用处在下面if (level <= oldLevel)出可以看出
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);

HeadIndex<K,V> oldh;//旧头结点
int k;
for (;;) {
oldh = head;
int oldLevel = oldh.level;
// 此处如果发生level <= oldLevel说明其他线程已经增加了这一层,那么当前线程要插入的层已经不>跳表的最大层了
// 那直接从数组中渠道level层的节点插入到跳表即可,插入方式同上面
if (level <= oldLevel) { // lost race to add level
k = level;
break;
}
HeadIndex<K,V> newh = oldh;
Node<K,V> oldbase = oldh.node;
for (int j = oldLevel+1; j <= level; ++j)
// 循环创建头结点,直到level层,其实从level = max + 1;这里可以看出也就最多增加一层
// 新的头结点node和下一层的node一样,down引用下一层,right是我们新建的节点(通过idxs数组直接访问第N层的节点)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
// 用新的头结点取代旧的头结点,使用CAS避免并发安全问题
if (casHead(oldh, newh)) {
// 此处k = oldLevel,直接导致下面插入节点的时候从oldLevel开始插入,
// 因为oldLevel之上是新插入的层级,每层都是一个接节点,所以已经是OK的了
k = oldLevel;
break;
}
}
// 插入节点
addIndex(idxs[k], oldh, k);
}
}

private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {
// Track next level to insert in case of retries
int insertionLevel = indexLevel;
Comparable<? super K> key = comparable(idx.node.key);
if (key == null) throw new NullPointerException();

// Similar to findPredecessor, but adding index nodes along
// path to key.
// 此处死循环是在发生不一致时进行重试
for (;;) {
// 此处从跳表头部开始找到该插入该节点的位置,类似findPredecessor
int j = h.level;
Index<K,V> q = h;
Index<K,V> r = q.right;
Index<K,V> t = idx;
for (;;) {
// 在当前层级查找适合插入节点的位置
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = key.compareTo(n.key);
if (n.value == null) {// 如果r已被删除,则移除r
if (!q.unlink(r))
break;
r = q.right;//r指向下一层节点,继续
continue;
}
if (c > 0) {//如果当前访问的节点key<需插入节点的key,则继续右移
q = r;
r = r.right;
continue;
}
}
// 到达此处有两种情况:
// 1.已访问到该层链表的最右边
// 2.当前访问的节点(r)的key>=需插入节点的key,则继续右移
if (j == insertionLevel) {// 如果找到的层级==要插入的层级,那么就在该层级插入该节点
// Don't insert index if node already deleted
if (t.indexesDeletedNode()) {// 判断要插入的节点是否已经被删除,此处考虑的是多线程情况下可能发生的并发问题
findNode(key); // cleans up
return;
}
// 将t插入到q的右边,替代之前的r,通过CAS确保并发安全
if (!q.link(r, t))
break; // restart
if (--insertionLevel == 0) {// 插入下一层,直到0为止
// need final deletion check before return
if (t.indexesDeletedNode())
findNode(key);
return;
}
}
// 将j-1有两个目的:
// 1.如果在-1之前j == insertionLevel,即查找已经定位到要插入节点的那一层,则insertionLevel会-1,所以j也要跟着-1
// 2.在-1之前j != insertionLevel,说明还未查找到要插入的那一层,需要继续往下查找
//此处需要同时满足两个条件才可向下一层查找:
// 1.-1之后的j>=insertionLevel(此处可能是还未找到要插入的层,也可能已经找到了)
// 2.j < indexLevel,针对第一点还未找到要插入的层,如果j>=indeLevel说明该层不需要插入节点
if (--j >= insertionLevel && j < indexLevel)
// t=t.down即将t往下移动一层,此处的t是在每一层要插入的节点,其实也就是要插入节点的下一层
t = t.down;
// 到这里存在两种情况:
// 1.上层已经插入了
// 2.还未找到要插入的层(第一个要插入的层,也就是新插入节点被分配到的层级)
// 不管上面如何处理,到这里都需要往下遍历了
q = q.down;
r = q.right;
}
}
}

真的挺复杂的逻辑,本身跳表的插入就有点复杂,现在还需要考虑并发问题,就更多复杂难懂了。代码中加入了大量的注释,希望能够说明白吧,其实自己看着也挺蒙,明白原理就行,哈哈。
put()总算说完了,下面看看get()吧。

get(Object)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey);
Node<K,V> bound = null;
Index<K,V> q = head;
Index<K,V> r = q.right;
Node<K,V> n;
K k;
int c;
for (;;) {
Index<K,V> d;
// Traverse rights
if (r != null && (n = r.node) != bound && (k = n.key) != null) {
if ((c = key.compareTo(k)) > 0) {
q = r;
r = r.right;
continue;
} else if (c == 0) {
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else
// 这里当前查找的节点(n)key>指定key说明已经跑过头了,此时需要往下找了
// bound在这里的作用是在下一层进行查找时如果找到了这一层的bound节点则说明又需要往下找了
// 这里如果理解了跳表的结构就好理解了,因为有的节点并不是每层都有
bound = n;
}

// Traverse down
if ((d = q.down) != null) {
q = d;
r = d.right;
} else
break;
}
// 如果执行到这里,说明已经找到最底层了,但是依旧没找到该节点,所以要找的节点如果存在那么就是在最底层的靠右端
// 所以还需要往右查找
// Traverse nexts
for (n = q.node.next; n != null; n = n.next) {
if ((k = n.key) != null) {
if ((c = key.compareTo(k)) == 0) {
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else if (c < 0)
break;
}
}
return null;
}

private V getUsingFindNode(Comparable<? super K> key) {
/*
* Loop needed here and elsewhere in case value field goes
* null just as it is about to be returned, in which case we
* lost a race with a deletion, so must retry.
*/

// 该方法循环查找key的节点,如果找不到key对应的节点,则直接返回null
// 如果找到对应的节点,但是value==null,则重试(原因尚不清楚,注释也不是太明白)
for (;;) {
// findNode返回的n如果不为null,那么就已经判断过n的value是否为null了,
// 下面再次判断可能是为了避免其他线程在这段时间内删掉了这个节点
// 不过既然被其他线程删掉了为什么还要去找呢,直接返回null不就好了,是在不明白,难道是为了再去删掉这个节点?
Node<K,V> n = findNode(key);
if (n == null)
return null;
Object v = n.value;
if (v != null)
return (V)v;
}
}

/**
* @By Vicky:此方法用于根据key查找对应的节点
*/

private Node<K,V> findNode(Comparable<? super K> key) {
for (;;) {
// findPredecessor找到key的左边的节点,之前有介绍
Node<K,V> b = findPredecessor(key);
// n是b的next节点,正确情况下n的key==key
Node<K,V> n = b.next;
for (;;) {
if (n == null) // n==null说明b是最右边的节点,所以其实未找到key对应的节点
return null;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
Object v = n.value;
// n被删除了,helpDelete掉
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// b被打上了delete标记,等待其他线程将其删除
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
if (c == 0)// 即n就是要查找的节点
return n;
if (c < 0)// b的key小于指定key,但是b的下一个节点的key大于指定key,说明没指定key对应的节点
return null;
b = n;// 继续往右走
n = f;
}
}
}

get()过程涉及三个方法,都是查找,相对put()来说,get简单多了,就是按跳表的结构查找,只是需要判断节点是否被删除,同时还需要帮忙将被删除的节点移除跳表结构。代码中的注释应该比较清楚了,就不再细述了,下面看remove()方法。

remove(Object)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public V remove(Object key) {
return doRemove(key, null);
}

/**
* @By Vicky:删除的逻辑和查找类似,将找到的节点的value置为null,同时将其打上删除标记
*/

final V doRemove(Object okey, Object value) {
Comparable<? super K> key = comparable(okey);
for (;;) {
// findPredecessor找到最底层的前一个节点(这里的节点是链表的节点,而非跳表的节点)
Node<K,V> b = findPredecessor(key);
Node<K,V> n = b.next;
for (;;) {
// b.next==null说明未找到响应的节点,直接返回null
if (n == null)
return null;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
Object v = n.value;
// 下面分别判断n和b是否已被删除
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
// c<0说明findPredecessor找到的节点的下一个节点的key<要删除的key
// 正确情况下(无并发情况下)c应该是>=0才对
if (c < 0)
return null;
// c>0继续查找
if (c > 0) {
b = n;
n = f;
continue;
}
// value!=null则说明需要待删除的节点的值和指定的值一样才可删除
if (value != null && !value.equals(v))
return null;
// 将节点的value置为null即表示删除
if (!n.casValue(v, null))
break;
// 同时为n添加删除标记
if (!n.appendMarker(f) || !b.casNext(n, f))
// 这一步其实就是为了触发删除节点
findNode(key); // Retry via findNode
else {
// 触发删除节点,同时如果最高层的节点被删完了则降一层
findPredecessor(key); // Clean index
if (head.right == null)
tryReduceLevel();
}
return (V)v;
}
}
}

remove()和get()差不多,毕竟删除节点的第一步就是找到这个节点,代码中添加了注释,这里不再细述。

看完了三个重要方法,说实话虽然每个方法中的每行代码都可以理解,但是想要理解为什么这么做还是很难的,毕竟代码是死的。就比如说remove和get,remove中也要查找节点,但是remove是调用findPredecessor()来直接查找最底层链表的前继节点,而get是自己按照跳表的结构进行查找,为什么呢?还有一点,所有的跳表节点中引用的node(链表节点)都是一样的,即如果第2层跳表引用一个node(key=4)和第1层引用的node(key=4)其实是同一个引用,而且同最底层的链表的node也是同一个,所以如果找到最上层的那个节点,不就是相当于找到最底层的那个节点了吗?为什么非得找最底层呢?
这里可以给出一个答案,那就是findPredecessor()会将已经被删除的跳表的节点从跳表结构中移除,因为删除节点只是将value置为null,顺便加个删除标记,但是这些节点还挂在跳表中,所以需要人为的触发findPredecessor()去一个个删除,也许这就是目的吧。

这三个方法都没有加锁,使用了CAS进行更新,同时修改结构的时候都是控制在一个CAS操作中完成,例如put()方法在将一个节点插入到链表时,是先创建一个新的节点,这个节点的next指向要插入的位置的下一个节点,这步操作不会影响整体的结构,所以是安全的,然后将要插入位置的前一个节点的next原子性的更新成新的节点,所以也不会影响到其他线程,而remove则只是将节点的value值修改为null,以及原子的更新节点的next,所以CAS+volatile就是JDKconcurrent包下的并发控制的实现原理。

使用场景

ConcurrentSkipListMap使用跳表结构来保存链表的顺序,解决了在有序链表中快速查找的问题,所以ConcurrentSkipListMap适合在需要链表的高效更新效率(删除/插入)以及还要保证一定的随机访问效率(查找/更新)的场景下使用。

参考文章

SkipList 跳表
Java多线程(四)之ConcurrentSkipListMap深入分析
Java多线程系列–“JUC集合”05之 ConcurrentSkipListMap


以上就是本篇的全部内容,如有错误之处,还望大家指出,谢谢~~~