CurrentHashMap如何實(shí)現(xiàn)高效地線程安全

以下文章來(lái)源于七哥聊編程 ,作者七哥

大家好,我是頭發(fā)已經(jīng)不多七哥(最近找工作復(fù)習(xí)壓力有點(diǎn)大哦??)。

今天的主角就是 java 并發(fā)包中提供的 CurrentHashMap 這是一個(gè)線程安全且高效的HashMap ,也是面試的高頻考點(diǎn)。



下面將圍繞主題:ConcurrentHashMap 如何實(shí)現(xiàn)高效地線程安全?以及在Java8中它從設(shè)計(jì)實(shí)現(xiàn)上有哪些演進(jìn)?

這篇文章一開(kāi)始我以為會(huì)比較簡(jiǎn)單,但是在深入源碼分析時(shí),遇到了很大的阻礙,比前面我們分析AQS以及讀寫(xiě)鎖的源碼要難理解的多,斷斷續(xù)續(xù)也寫(xiě)了4天了。如果你看完還是沒(méi)有理解的話,那我在這里表示深深的歉意,同時(shí)也歡迎你和我一起溝通。

網(wǎng)上關(guān)于 HashMap 和 ConcurrentHashMap 的文章確實(shí)不少,不過(guò)目前的很多分析資料還是基于其早期版本,所以才想自己也寫(xiě)一篇,把細(xì)節(jié)說(shuō)清楚說(shuō)透,尤其像 Java8 中的 ConcurrentHashMap 的演進(jìn)設(shè)計(jì)實(shí)現(xiàn),大部分文章都說(shuō)不清楚。希望能降低大家學(xué)習(xí)的成本,不希望大家看了一篇又一篇文章,最終還是模模糊糊。

閱讀前提:

本文會(huì)涉及源碼分析,所以至少讀者要熟悉它們的接口使用,同時(shí),對(duì)于并發(fā),讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個(gè)基本的知識(shí),文中不會(huì)對(duì)這些知識(shí)進(jìn)行介紹。

為什么需要 ConcurrentHashMap?
在并發(fā)編程中使用HashMap可能導(dǎo)致程序死循環(huán)。而使用線程安全的HashTable效率又非常低下(它的實(shí)現(xiàn)就是將put、get、size等方法加上 synchronized 關(guān)鍵字),基于以上兩個(gè)原因,便有了ConcurrentHashMap的登場(chǎng)機(jī)會(huì)。

可能有的同學(xué)對(duì) HashMap 為什么會(huì)在并發(fā)中出現(xiàn)死循環(huán)從而導(dǎo)致 cpu 占用達(dá)到100% 不太了解,這里直接展示一段示例代碼,運(yùn)行它就會(huì)出現(xiàn)死循環(huán)。

static final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < 100000; i++) {
            int finalI1 = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    map.put(String.valueOf(finalI1), "");
                }
            }, "ftf" + i).start();
        }
    }
}, "ftf");
t.start();
t.join();
記著在 jdk1.7 及之前的版本測(cè)試(1.8 已經(jīng)解決了死循環(huán)問(wèn)題),死循環(huán)的概率還是非常低的,比較難以重現(xiàn)。為了提高出現(xiàn)概率,采用多次迭代測(cè)試,我在測(cè)試時(shí)出現(xiàn)在 128次。

感興趣的同學(xué)可以用 jstack 分析下,網(wǎng)上有很多教程,這里就不展開(kāi) 排查過(guò)程了。原因就是:HashMap 在并發(fā)執(zhí)行 put 操作時(shí)會(huì)引起死循環(huán),因?yàn)槎嗑€程會(huì)導(dǎo)致 HashMap 擴(kuò)容時(shí) Entry 鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu),一旦形成環(huán)形數(shù)據(jù)結(jié)構(gòu),Entry 的 next 節(jié)點(diǎn)永遠(yuǎn)不為空,就會(huì)產(chǎn)生死循環(huán)獲 取 Entry 。從而導(dǎo)致CPU占用將近100%。

Java7中ConcurrentHashMap分析
首先,我這里強(qiáng)調(diào),ConcurrentHashMap 的設(shè)計(jì)實(shí)現(xiàn)其實(shí)一直在演化,比如在 Java 8 中就發(fā)生了非常大的變化(Java 7 其實(shí)也有不少更新),所以,我這里將比較分析結(jié)構(gòu)、實(shí)現(xiàn)機(jī)制等方面,對(duì)比不同版本的主要區(qū)別。

在 Java7 中的實(shí)現(xiàn)是基于:

分離鎖,也就是將內(nèi)部進(jìn)行分段(Segment),里面則是 HashEntry 的數(shù)組,和 HashMap 類(lèi)似,哈希相同的條目也是以鏈表形式存放。
HashEntry 內(nèi)部使用 volatile 的 value 字段來(lái)保證可見(jiàn)性,也利用了不可變對(duì)象的機(jī)制以改進(jìn)利用 Unsafe 提供的底層能力,比如 volatile access,去直接完成部分操作,以最優(yōu)化性能,畢竟 Unsafe 中的很多操作都是 JVM intrinsic 優(yōu)化過(guò)的。
具體實(shí)現(xiàn)可以理解為:ConcurrentHashMap 是由 Segment 數(shù)組結(jié)構(gòu)和 HashEntry 數(shù)組結(jié)構(gòu)組成。Segment是一種可重入鎖(繼承了ReentrantLock),在ConcurrentHashMap里扮演鎖的角色;HashEntry 則用于存儲(chǔ)鍵值對(duì)數(shù)據(jù)。一個(gè) ConcurrentHashMap 里包含一個(gè) Segment 數(shù)組。Segment 的結(jié)構(gòu)和 HashMap 類(lèi)似,是一種 數(shù)組和鏈表結(jié)構(gòu)。一個(gè) Segment 里包含一個(gè) HashEntry 數(shù)組,每個(gè) HashEntry 是一個(gè)鏈表結(jié)構(gòu)的元 素,每個(gè)Segment 守護(hù)著一個(gè) HashEntry 數(shù)組里的元素,當(dāng)對(duì) HashEntry 數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí), 必須首先獲得與它對(duì)應(yīng)的Segment鎖。



初始化
在構(gòu)造的時(shí)候,Segment 的數(shù)量由所謂的 concurrentcyLevel 決定,默認(rèn)是 16,所以理論上,這個(gè)時(shí)候,最多可以同時(shí)支持 16 個(gè)線程并發(fā)寫(xiě),只要它們的操作分別分布在不同的 Segment 上。也可以在相應(yīng)構(gòu)造函數(shù)直接指定。注意,Java 需要它是 2 的冪數(shù)值,如果輸入是類(lèi)似 15 這種非冪值,會(huì)被自動(dòng)調(diào)整到 16 之類(lèi) 2 的冪數(shù)值。并且一旦初始化后,它是不可以擴(kuò)容的。

ConcurrentHashMap 初始化方法是通過(guò) initialCapacity 、loadFactor 和 concurrencyLevel 等幾個(gè)參數(shù)來(lái)初始化segment數(shù)組、段偏移量 segmentShift 、段掩碼 segmentMask 和每個(gè) segment 里的 HashEntry 數(shù)組來(lái)實(shí)現(xiàn)的。

下面結(jié)合源代碼一起來(lái)看下,為方便理解,我直接注釋在代碼段里:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 計(jì)算并行級(jí)別 ssize,因?yàn)橐3植⑿屑?jí)別是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 我們這里先不要那么燒腦,用默認(rèn)值,concurrencyLevel 為 16,sshift 為 4
    // 那么計(jì)算出 segmentShift 為 28,segmentMask 為 15,后面會(huì)用到這兩個(gè)值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    // initialCapacity 是設(shè)置整個(gè) map 初始的大小,
    // 這里根據(jù) initialCapacity 計(jì)算 Segment 數(shù)組中每個(gè)位置可以分到的大小
    // 如 initialCapacity 為 64,那么每個(gè) Segment 可以分到 4 個(gè)
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 默認(rèn) MIN_SEGMENT_TABLE_CAPACITY 是 2,這個(gè)值也是有講究的,因?yàn)檫@樣的話,對(duì)于具體的槽上,插入一個(gè)元素不至于擴(kuò)容,插入第二個(gè)的時(shí)候才會(huì)擴(kuò)容
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;

    // 創(chuàng)建 Segment 數(shù)組,
    // 并創(chuàng)建數(shù)組的第一個(gè)元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往數(shù)組寫(xiě)入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
初始化完成,我們得到了一個(gè) Segment 數(shù)組。這里之所以 segments 數(shù)組的長(zhǎng)度必須是2的N次冪,主要是為了能通過(guò)按位與的散列算法來(lái)定位 segments 數(shù)組的索引。

注意:concurrencyLevel 的最大值是65535,這意味著 segments 數(shù)組的長(zhǎng)度最大為65536, 對(duì)應(yīng)的二進(jìn)制是16位。

為了加深讀者理解,下面來(lái)分析下,當(dāng)我們用 new ConcurrentHashMap() 無(wú)參構(gòu)造函數(shù)進(jìn)行初始化的,那么初始化完成后:

Segment 數(shù)組長(zhǎng)度為 16,不可以擴(kuò)容
Segment[i] 的默認(rèn)大小為 2,負(fù)載因子是 0.75,得出初始閾值為 1.5,也就是以后插入第一個(gè)元素不會(huì)觸發(fā)擴(kuò)容,插入第二個(gè)會(huì)進(jìn)行第一次擴(kuò)容
這里初始化了 segment[0],其他位置還是 null,至于為什么要初始化 segment[0],后面的代碼會(huì)介紹
當(dāng)前段偏移量 segmentShift 的值為 32 - 4 = 28,段掩碼  segmentMask 為 16 - 1 = 15,這兩個(gè)值馬上就會(huì)用到
get 操作
get 操作需要保證的是可見(jiàn)性,所以并沒(méi)有什么同步邏輯。

計(jì)算 hash 值,找到 segment 數(shù)組中的具體位置
segment 中也是一個(gè)數(shù)組(HashEntry數(shù)組),根據(jù) hash 找到數(shù)組中具體的位置
到這里是鏈表了,HashEntry 是鏈表中的元素,順著鏈表進(jìn)行查找即可
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 1. hash 值,32位
    int h = hash(key);
    // 利用位操作替換普通數(shù)學(xué)運(yùn)算,將hash值無(wú)符號(hào)左移段偏移量位,即取高四位,在與上段掩碼(15二進(jìn)制位1111)
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 2. 根據(jù) hash 找到對(duì)應(yīng)的 segment,利用Unsafe直接進(jìn)行volatile access
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 3. 找到segment 內(nèi)部數(shù)組相應(yīng)位置的鏈表,遍歷
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
put操作
對(duì)于 put 操作,首先是通過(guò)二次哈希避免哈希沖突,然后以 Unsafe 調(diào)用方式,直接獲取相應(yīng)的 Segment,然后進(jìn)行線程安全的 put 操作:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 二次哈希,以保證數(shù)據(jù)的分散性,避免哈希沖突
   int hash = hash(key.hashCode());
    // 2. 根據(jù) hash 值找到 Segment 數(shù)組中的位置 j
    //    hash 是 32 位,無(wú)符號(hào)右移 segmentShift(28) 位,剩下高 4 位,
    //    然后和 segmentMask(15) 做一次與操作,也就是說(shuō) j 是 hash 值的高 4 位,也就是segment的數(shù)組下標(biāo)
    int j = (hash >>> segmentShift) & segmentMask;
    // 剛剛說(shuō)了,初始化的時(shí)候初始化了 segment[0],但是其他位置還是 null,
    // ensureSegment(j) 對(duì) segment[j] 進(jìn)行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}
其核心邏輯實(shí)現(xiàn)在下面的內(nèi)部方法中:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫(xiě)入前,需要先獲取該 segment 的獨(dú)占鎖
    //    先看主流程,后面還會(huì)具體介紹這部分內(nèi)容
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 這個(gè)是 segment 內(nèi)部的數(shù)組
        HashEntry<K,V>[] tab = table;
        // 再利用 hash 值,求應(yīng)該放置的數(shù)組下標(biāo)
        int index = (tab.length - 1) & hash;
        // first 是數(shù)組該位置處的鏈表的表頭
        HashEntry<K,V> first = entryAt(tab, index);

        // 下面這串 for 循環(huán)雖然很長(zhǎng),不過(guò)也很好理解,想象當(dāng)前位置鏈表不為空則先遍歷找是否存在,如果存在則覆蓋,否則放到合適的位置
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續(xù)順著鏈表走
                e = e.next;
            }
            else {
                // node 到底是不是 null,這個(gè)要看獲取鎖的過(guò)程,不過(guò)和這里都沒(méi)有關(guān)系。
                // 如果不為 null,那就直接將它設(shè)置為鏈表表頭;如果是null,初始化并設(shè)置為鏈表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 如果超過(guò)了該 segment 的閾值,這個(gè) segment 需要擴(kuò)容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴(kuò)容后面也會(huì)具體分析
                else
                    // 沒(méi)有達(dá)到閾值,將 node 放到數(shù)組 tab 的 index 位置,
                    // 其實(shí)就是將新的節(jié)點(diǎn)設(shè)置成原鏈表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}





rehash:擴(kuò)容操作
重復(fù)一下,segment 數(shù)組不能擴(kuò)容,擴(kuò)容是 segment 數(shù)組某個(gè)位置內(nèi)部的數(shù)組 HashEntry<K,V>[] 進(jìn)行擴(kuò)容,擴(kuò)容后,容量為原來(lái)的 2 倍。

首先,我們要回顧一下觸發(fā)擴(kuò)容的地方,put 的時(shí)候,如果判斷該值的插入會(huì)導(dǎo)致該 segment 的元素個(gè)數(shù)超過(guò)閾值,那么先進(jìn)行擴(kuò)容,再插值,讀者這個(gè)時(shí)候可以回去 put 方法看一眼。

該方法不需要考慮并發(fā),因?yàn)榈竭@里的時(shí)候,是持有該 segment 的獨(dú)占鎖的。

// 方法參數(shù)上的 node 是這次擴(kuò)容后,需要添加到新的數(shù)組中的數(shù)據(jù)。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 創(chuàng)建新數(shù)組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴(kuò)容到 32,那么 sizeMask 為 31,對(duì)應(yīng)二進(jìn)制 ‘000...00011111’
    int sizeMask = newCapacity - 1;

    // 遍歷原數(shù)組,老套路,將原數(shù)組位置 i 處的鏈表拆分到 新數(shù)組位置 i 和 i+oldCap 兩個(gè)位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是鏈表的第一個(gè)元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計(jì)算應(yīng)該放置在新數(shù)組中的位置,
            // 假設(shè)原數(shù)組長(zhǎng)度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask;
            // 該位置處只有一個(gè)元素,那比較好辦,直接放到新數(shù)組中對(duì)應(yīng)的位置
            if (next == null)   
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是鏈表表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當(dāng)前鏈表的頭結(jié)點(diǎn) e 的新位置
                int lastIdx = idx;

                // 下面這個(gè) for 循環(huán)會(huì)找到一個(gè) lastRun 節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)之后的所有元素是將要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其之后的所有節(jié)點(diǎn)組成的這個(gè)鏈表放到 lastIdx 這個(gè)位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是處理 lastRun 之前的節(jié)點(diǎn),
                //    這些節(jié)點(diǎn)可能分配在另一個(gè)鏈表中,也可能分配到上面的那個(gè)鏈表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來(lái)的 node 放到新數(shù)組中剛剛的 兩個(gè)鏈表之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
上面有兩個(gè)挨著的 for 循環(huán),第一個(gè) for 有什么用呢?

這塊代碼我看的時(shí)候真的是很難理解,反復(fù)看了好幾遍,主要原因還是對(duì)鏈表操作不太熟悉,這里為大家在解釋下,幫助理解。這里需要進(jìn)行第一個(gè) for 循環(huán),主要是因?yàn)閿U(kuò)容后,原來(lái)數(shù)組位置 i 的 HashEntry 是一個(gè)鏈表,那么這個(gè)鏈表的元素對(duì)應(yīng)擴(kuò)容后的數(shù)組位置必然是 i 或 i+oldCap。第一個(gè)循環(huán)就是為遍歷當(dāng)前位置 i 的鏈表找到最后一個(gè)在新數(shù)組中位置相同的節(jié)點(diǎn) lastRun。

如果沒(méi)有第一個(gè) for 循環(huán),也是可以工作的,但是,這個(gè) for 循環(huán)下來(lái),如果 lastRun 的后面還有比較多的節(jié)點(diǎn),那么這次就是值得的。因?yàn)槲覀冎恍枰寺?lastRun 前面的節(jié)點(diǎn),后面的一串節(jié)點(diǎn)跟著 lastRun 進(jìn)行賦值就可以了,不需要做任何操作。

Doug Lea 大神這塊的想法一般人可能是想不到的,畢竟作為并發(fā)包中的基礎(chǔ)類(lèi) 都是為了將并發(fā)性能做到極致的。但是也有最差的情況,就是找到的 lastRun 是鏈表的最后一個(gè)元素,或者排在倒數(shù),那么這次遍歷就顯得多余了,而且浪費(fèi)了性能。不過(guò) Doug Lea 也說(shuō)了,根據(jù)統(tǒng)計(jì),如果使用默認(rèn)的閾值,大約只有 1/6 的節(jié)點(diǎn)需要克隆。

size 操作
知道了 ConcurrentHashMap 通過(guò)分段鎖實(shí)現(xiàn)高性能且線程安全的原理。試想,如果不進(jìn)行同步,簡(jiǎn)單的計(jì)算所有 Segment 的總值,可能會(huì)因?yàn)椴l(fā) put,導(dǎo)致結(jié)果不準(zhǔn)確,但是直接鎖定所有 Segment 進(jìn)行計(jì)算,就會(huì)變得非常昂貴。

所以,ConcurrentHashMap 的實(shí)現(xiàn)是通過(guò)重試機(jī)制(RETRIES_BEFORE_LOCK,指定重試次數(shù) 2),來(lái)試圖獲得可靠值。如果沒(méi)有監(jiān)控到發(fā)生變化(通過(guò)對(duì)比 Segment.modCount),就直接返回,否則獲取鎖進(jìn)行操作。

Java8中ConcurrentHashMap分析
在 Java 8 和之后的版本中,ConcurrentHashMap 發(fā)生了哪些變化呢?

Java8 對(duì) HashMap 進(jìn)行了一些修改,最大的不同就是利用了紅黑樹(shù),所以其由 數(shù)組+鏈表+紅黑樹(shù) 組成。

因?yàn)椴辉偈褂?Segment,初始化操作大大簡(jiǎn)化,修改為 lazy-load 形式,這樣可以有效避免初始開(kāi)銷(xiāo),解決了老版本很多人抱怨的這一點(diǎn)。

數(shù)據(jù)存儲(chǔ)利用 volatile 來(lái)保證可見(jiàn)性。

使用 CAS 等操作,在特定場(chǎng)景進(jìn)行無(wú)鎖并發(fā)操作。

這里介紹一個(gè)最常問(wèn)的問(wèn)題:Java8 為什么使用紅黑樹(shù)呢?

根據(jù) Java7 HashMap 的介紹,我們知道,查找的時(shí)候,根據(jù) hash 值我們能夠快速定位到數(shù)組的具體下標(biāo),但是之后的話,需要順著鏈表一個(gè)個(gè)比較下去才能找到我們需要的,時(shí)間復(fù)雜度取決于鏈表的長(zhǎng)度,為 O(n)。

為了降低這部分的開(kāi)銷(xiāo),在 Java8 中,當(dāng)鏈表中的元素達(dá)到了 8 個(gè)時(shí),會(huì)將鏈表轉(zhuǎn)換為紅黑樹(shù),在這些位置進(jìn)行查找的時(shí)候可以降低時(shí)間復(fù)雜度為 O(logN)。



注意,上圖是示意圖,主要是描述結(jié)構(gòu),不會(huì)達(dá)到這個(gè)狀態(tài)的,因?yàn)檫@么多數(shù)據(jù)的時(shí)候早就擴(kuò)容了。

Java7 中使用 HashEntry 來(lái)代表每個(gè) HashMap 中的數(shù)據(jù)節(jié)點(diǎn),Java8 中使用 Node,基本沒(méi)有區(qū)別,都是 key,value,hash 和 next 這四個(gè)屬性,不過(guò),Node 只能用于鏈表的情況,紅黑樹(shù)的情況需要使用 TreeNode。

先看看現(xiàn)在的數(shù)據(jù)存儲(chǔ)內(nèi)部實(shí)現(xiàn),我們可以發(fā)現(xiàn) Key 是 final 的,因?yàn)樵谏芷谥校粋€(gè)條目的 Key 發(fā)生變化是不可能的;與此同時(shí) val,則聲明為 volatile,以保證可見(jiàn)性。

static class Node<K,V> implements Map.Entry<K,V> {
 final int hash;
 final K key;
 volatile V val;
 volatile Node<K,V> next;
 // …
}
為了提高大家的閱讀體驗(yàn),我這里就不再介紹 get 方法和構(gòu)造函數(shù)了,相對(duì)比較簡(jiǎn)單,相信你如果看懂了 Java7 的實(shí)現(xiàn)一定沒(méi)有啥問(wèn)題的。直接看并發(fā)的 put 是如何實(shí)現(xiàn)的。

put操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 得到 hash 值
    int hash = spread(key.hashCode());
    // 用于記錄相應(yīng)鏈表的長(zhǎng)度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果數(shù)組"空",進(jìn)行數(shù)組初始化
        if (tab == null || (n = tab.length) == 0)
            // 初始化數(shù)組
            tab = initTable();

        // 找該 hash 值對(duì)應(yīng)的數(shù)組下標(biāo),得到第一個(gè)節(jié)點(diǎn) f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果數(shù)組該位置為空,
            // 用一次 CAS 操作將這個(gè)新值放入其中即可,這個(gè) put 操作差不多就結(jié)束了,可以拉到最后面
            // 如果 CAS 失敗,那就是有并發(fā)操作,進(jìn)到下一個(gè)循環(huán)就好了
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // hash 居然可以等于 MOVED,這個(gè)需要到后面才能看明白,不過(guò)從名字上也能猜到,肯定是因?yàn)樵跀U(kuò)容
        else if ((fh = f.hash) == MOVED)
            // 幫助數(shù)據(jù)遷移,這個(gè)等到看完數(shù)據(jù)遷移部分的介紹后,再理解這個(gè)就很簡(jiǎn)單了
            tab = helpTransfer(tab, f);

        else { // 到這里就是說(shuō),f 是該位置的頭結(jié)點(diǎn),而且不為空

            V oldVal = null;
            // 獲取數(shù)組該位置的頭結(jié)點(diǎn)的監(jiān)視器鎖
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 頭結(jié)點(diǎn)的 hash 值大于 0,說(shuō)明是鏈表
                        // 用于累加,記錄鏈表的長(zhǎng)度
                        binCount = 1;
                        // 遍歷鏈表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果發(fā)現(xiàn)了"相等"的 key,判斷是否要進(jìn)行值覆蓋,然后也就可以 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了鏈表的最末端,將這個(gè)新值放到鏈表的最后面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 紅黑樹(shù)
                        Node<K,V> p;
                        binCount = 2;
                        // 調(diào)用紅黑樹(shù)的插值方法插入新節(jié)點(diǎn)
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                // 判斷是否要將鏈表轉(zhuǎn)換為紅黑樹(shù),臨界值和 HashMap 一樣,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 這個(gè)方法和 HashMap 中稍微有一點(diǎn)點(diǎn)不同,那就是它不是一定會(huì)進(jìn)行紅黑樹(shù)轉(zhuǎn)換,
                    // 如果當(dāng)前數(shù)組的長(zhǎng)度小于 64,那么會(huì)選擇進(jìn)行數(shù)組擴(kuò)容,而不是轉(zhuǎn)換為紅黑樹(shù)
                    // 具體源碼我們就不看了,擴(kuò)容部分后面說(shuō)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //
    addCount(1L, binCount);
    return null;
}
put 的主流程看完了,但是至少留下了幾個(gè)問(wèn)題,第一個(gè)是初始化,第二個(gè)是擴(kuò)容,第三個(gè)是幫助數(shù)據(jù)遷移,這些我們都會(huì)在后面進(jìn)行一一介紹。

初始化數(shù)組:initTable
從上面的 put 操作可以看到,數(shù)組初始化是在 put 操作時(shí)進(jìn)行的,采用的 lazy-load 形式。

這個(gè)比較簡(jiǎn)單,主要就是初始化一個(gè)合適大小的數(shù)組,然后會(huì)設(shè)置 sizeCtl。

初始化方法中的并發(fā)問(wèn)題是通過(guò)對(duì) sizeCtl 進(jìn)行一個(gè) CAS 操作來(lái)控制的。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 初始化的"功勞"被其他線程"搶去"了
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 一下,將 sizeCtl 設(shè)置為 -1,代表?yè)尩搅随i
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // DEFAULT_CAPACITY 默認(rèn)初始容量是 16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 初始化數(shù)組,長(zhǎng)度為 16 或初始化時(shí)提供的長(zhǎng)度
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 將這個(gè)數(shù)組賦值給 table,table 是 volatile 的
                    table = tab = nt;
                    // 如果 n 為 16 的話,那么這里 sc = 12
                    // 其實(shí)就是 0.75 * n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 設(shè)置 sizeCtl 為 sc,我們就當(dāng)是 12 吧
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}





鏈表轉(zhuǎn)為紅黑樹(shù):treeifyBin
這里需要注意:前面我們?cè)?put 源碼分析也說(shuō)過(guò),treeifyBin 不一定就會(huì)進(jìn)行紅黑樹(shù)轉(zhuǎn)換,也可能是僅僅做數(shù)組擴(kuò)容。我們還是進(jìn)行源碼分析吧。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 所以,如果數(shù)組長(zhǎng)度小于 64 的時(shí)候,其實(shí)也就是 32 或者 16 或者更小的時(shí)候,會(huì)進(jìn)行數(shù)組擴(kuò)容
            tryPresize(n << 1);
        // b 是頭結(jié)點(diǎn)
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            // 加鎖
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    // 下面就是遍歷鏈表,建立一顆紅黑樹(shù)
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 將紅黑樹(shù)設(shè)置到數(shù)組相應(yīng)位置中
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
擴(kuò)容:tryPresize
如果說(shuō) Java8 ConcurrentHashMap 的源碼不簡(jiǎn)單,那么說(shuō)的就是擴(kuò)容操作和遷移操作。

這里的擴(kuò)容也是做翻倍擴(kuò)容的,擴(kuò)容后數(shù)組容量為原來(lái)的 2 倍。

這個(gè)方法要完完全全看懂還需要看之后的 transfer 方法,讀者應(yīng)該提前知道這點(diǎn)。

// 首先要說(shuō)明的是,方法參數(shù) size 傳進(jìn)來(lái)的時(shí)候就已經(jīng)翻了倍了
private final void tryPresize(int size) {
    // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    // 目前容器大小
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // 這個(gè) if 分支和之前說(shuō)的初始化數(shù)組的代碼基本上是一樣的,在這里,我們可以不用管這塊代碼
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        // 16-4=12
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // 小于目前大小或者達(dá)到最大值直接返回
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // 說(shuō)明是tab過(guò)程中沒(méi)有發(fā)生變化,類(lèi)似于懶加載的雙重檢查
        else if (tab == table) {
            // value = 32795
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 2. 用 CAS 將 sizeCtl 加 1,然后執(zhí)行 transfer 方法 此時(shí) nextTab 不為 null
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 1. 將 sizeCtl 設(shè)置為 (rs << RESIZE_STAMP_SHIFT) + 2)
            // 我是沒(méi)看懂這個(gè)值真正的意義是什么?不過(guò)可以計(jì)算出來(lái)的是,結(jié)果是一個(gè)比較大的負(fù)數(shù)
            // 調(diào)用 transfer 方法,此時(shí) nextTab 參數(shù)為 null
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
這個(gè)方法的核心在于 sizeCtl 值的操作,首先將其設(shè)置為一個(gè)負(fù)數(shù),然后執(zhí)行 transfer(tab, null),再下一個(gè)循環(huán)將 sizeCtl 加 1,并執(zhí)行 transfer(tab, nt),之后可能是繼續(xù) sizeCtl 加 1,并執(zhí)行 transfer(tab, nt)。

所以,可能的操作就是執(zhí)行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),這里怎么結(jié)束循環(huán)的需要看完 transfer 源碼才清楚。

數(shù)據(jù)遷移:transfer
下面這個(gè)方法有點(diǎn)長(zhǎng),將原來(lái)的 tab 數(shù)組的元素遷移到新的 nextTab 數(shù)組中。

雖然我們之前說(shuō)的 tryPresize 方法中多次調(diào)用 transfer 不涉及多線程,但是這個(gè) transfer 方法可以在其他地方被調(diào)用,典型地,我們之前在說(shuō) put 方法的時(shí)候就說(shuō)過(guò)了,請(qǐng)往上看 put 方法,是不是有個(gè)地方調(diào)用了 helpTransfer 方法,helpTransfer 方法會(huì)調(diào)用 transfer 方法的。

此方法支持多線程執(zhí)行,外圍調(diào)用此方法的時(shí)候,會(huì)保證第一個(gè)發(fā)起數(shù)據(jù)遷移的線程,nextTab 參數(shù)為 null,之后再調(diào)用此方法的時(shí)候,nextTab 不會(huì)為 null。

閱讀源碼之前,先要理解并發(fā)操作的機(jī)制。原數(shù)組長(zhǎng)度為 n,所以我們有 n 個(gè)遷移任務(wù),讓每個(gè)線程每次負(fù)責(zé)一個(gè)小任務(wù)是最簡(jiǎn)單的,每做完一個(gè)任務(wù)再檢測(cè)是否有其他沒(méi)做完的任務(wù),幫助遷移就可以了,而 Doug Lea 使用了一個(gè) stride,簡(jiǎn)單理解就是步長(zhǎng),每個(gè)線程每次負(fù)責(zé)遷移其中的一部分,如每次遷移 16 個(gè)小任務(wù)。所以,我們就需要一個(gè)全局的調(diào)度者來(lái)安排哪個(gè)線程執(zhí)行哪幾個(gè)任務(wù),這個(gè)就是屬性 transferIndex 的作用。

第一個(gè)發(fā)起數(shù)據(jù)遷移的線程會(huì)將 transferIndex 指向原數(shù)組最后的位置,然后從后往前的 stride 個(gè)任務(wù)屬于第一個(gè)線程,然后將 transferIndex 指向新的位置,再往前的 stride 個(gè)任務(wù)屬于第二個(gè)線程,依此類(lèi)推。當(dāng)然,這里說(shuō)的第二個(gè)線程不是真的一定指代了第二個(gè)線程,也可以是同一個(gè)線程,這個(gè)讀者應(yīng)該能理解吧。其實(shí)就是將一個(gè)大的遷移任務(wù)分為了一個(gè)個(gè)任務(wù)包。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // stride 在單核下直接等于 n,多核模式下為 (n>>>3)/NCPU,最小值是 16
    // stride 可以理解為”步長(zhǎng)“,有 n 個(gè)位置是需要進(jìn)行遷移的,
    // 將這 n 個(gè)任務(wù)分為多個(gè)任務(wù)包,每個(gè)任務(wù)包有 stride 個(gè)任務(wù)
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 如果 nextTab 為 null,先進(jìn)行一次初始化
    // 前面我們說(shuō)了,外圍會(huì)保證第一個(gè)發(fā)起遷移的線程調(diào)用此方法時(shí),參數(shù) nextTab 為 null
    // 之后參與遷移的線程調(diào)用此方法時(shí),nextTab 不會(huì)為 null
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // nextTable 是 ConcurrentHashMap 中的屬性
        nextTable = nextTab;
        // transferIndex 也是 ConcurrentHashMap 的屬性,用于控制遷移的位置
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // ForwardingNode 翻譯過(guò)來(lái)就是正在被遷移的 Node
    // 這個(gè)構(gòu)造方法會(huì)生成一個(gè)Node,key、value 和 next 都為 null,關(guān)鍵是 hash 為 MOVED
    // 后面我們會(huì)看到,原數(shù)組中位置 i 處的節(jié)點(diǎn)完成遷移工作后,
    // 就會(huì)將位置 i 處設(shè)置為這個(gè) ForwardingNode,用來(lái)告訴其他線程該位置已經(jīng)處理過(guò)了
    // 所以它其實(shí)相當(dāng)于是一個(gè)標(biāo)志。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // advance 指的是做完了一個(gè)位置的遷移工作,可以準(zhǔn)備做下一個(gè)位置的了
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 下面這個(gè) for 循環(huán),最難理解的在前面,而要看懂它們,應(yīng)該先看懂后面的,然后再倒回來(lái)
    // i 是位置索引,bound 是邊界,注意是從后往前
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 下面這個(gè) while 真的是不好理解
        // advance 為 true 表示可以進(jìn)行下一個(gè)位置的遷移了
        // 簡(jiǎn)單理解為:i 指向了 transferIndex,bound 指向了 transferIndex-stride
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            // 將 transferIndex 值賦給 nextIndex
            // 這里 transferIndex 一旦小于等于 0,說(shuō)明原數(shù)組的所有位置都有相應(yīng)的線程去處理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 看括號(hào)中的代碼,nextBound 是這次遷移任務(wù)的邊界,注意,是從后往前
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 所有的遷移操作已經(jīng)完成
            if (finishing) {
                nextTable = null;
                // 將新的 nextTab 賦值給 table 屬性,完成遷移
                table = nextTab;
                // 重新計(jì)算 sizeCtl:n 是原數(shù)組長(zhǎng)度,所以 sizeCtl 得出的值將是新數(shù)組長(zhǎng)度的 0.75 倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 之前我們說(shuō)過(guò),sizeCtl 在遷移前會(huì)設(shè)置為 (rs << RESIZE_STAMP_SHIFT) + 2
            // 然后,每有一個(gè)線程參與遷移就會(huì)將 sizeCtl 加 1,
            // 這里使用 CAS 操作對(duì) sizeCtl 進(jìn)行減 1,代表做完了屬于自己的任務(wù)
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 任務(wù)結(jié)束,方法退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 到這里,說(shuō)明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
                // 也就是說(shuō),所有的遷移任務(wù)都做完了,也就會(huì)進(jìn)入到上面的 if(finishing){} 分支了
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 如果位置 i 處是空的,沒(méi)有任何節(jié)點(diǎn),那么放入剛剛初始化的 ForwardingNode ”空節(jié)點(diǎn)“
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 該位置處是一個(gè) ForwardingNode,代表該位置已經(jīng)遷移過(guò)了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 對(duì)數(shù)組該位置處的結(jié)點(diǎn)加鎖,開(kāi)始處理數(shù)組該位置處的遷移工作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 頭結(jié)點(diǎn)的 hash 大于 0,說(shuō)明是鏈表的 Node 節(jié)點(diǎn)
                    if (fh >= 0) {
                        // 下面這一塊和 Java7 中的 ConcurrentHashMap 遷移是差不多的,
                        // 需要將鏈表一分為二,
                        // 找到原鏈表中的 lastRun,然后 lastRun 及其之后的節(jié)點(diǎn)是一起進(jìn)行遷移的
                        // lastRun 之前的節(jié)點(diǎn)需要進(jìn)行克隆,然后分到兩個(gè)鏈表中
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其中的一個(gè)鏈表放在新數(shù)組的位置 i
                        setTabAt(nextTab, i, ln);
                        // 另一個(gè)鏈表放在新數(shù)組的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 將原數(shù)組該位置處設(shè)置為 fwd,代表該位置已經(jīng)處理完畢,
                        // 其他線程一旦看到該位置的 hash 值為 MOVED,就不會(huì)進(jìn)行遷移了
                        setTabAt(tab, i, fwd);
                        // advance 設(shè)置為 true,代表該位置已經(jīng)遷移完畢
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // 紅黑樹(shù)的遷移
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果一分為二后,節(jié)點(diǎn)數(shù)少于 8,那么將紅黑樹(shù)轉(zhuǎn)換回鏈表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 將 ln 放置在新數(shù)組的位置 i
                        setTabAt(nextTab, i, ln);
                        // 將 hn 放置在新數(shù)組的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 將原數(shù)組該位置處設(shè)置為 fwd,代表該位置已經(jīng)處理完畢,
                        // 其他線程一旦看到該位置的 hash 值為 MOVED,就不會(huì)進(jìn)行遷移了
                        setTabAt(tab, i, fwd);
                        // advance 設(shè)置為 true,代表該位置已經(jīng)遷移完畢
                        advance = true;
                    }
                }
            }
        }
    }
}
說(shuō)到底,transfer 這個(gè)方法并沒(méi)有實(shí)現(xiàn)所有的遷移任務(wù),每次調(diào)用這個(gè)方法只實(shí)現(xiàn)了 transferIndex 往前 stride 個(gè)位置的遷移工作,其他的需要由外圍來(lái)控制。

這個(gè)時(shí)候,再回去仔細(xì)看 tryPresize 方法可能就會(huì)更加清晰一些了。

總結(jié)
今天我從線程安全問(wèn)題開(kāi)始,分析為什么要使用 ConcurrentHashMap,進(jìn)而分析了 Java 7 和 Java 8 中 ConcurrentHashMap 是如何設(shè)計(jì)實(shí)現(xiàn)的,從源碼層面說(shuō)明白了具體的實(shí)現(xiàn)邏輯。其實(shí)仔細(xì)認(rèn)真讀懂后你會(huì)發(fā)現(xiàn)其實(shí)也不是太難。

希望本文讓你對(duì) ConcurrentHashMap 面試相關(guān)問(wèn)題輕松的應(yīng)對(duì),同時(shí)作為并發(fā)編程技巧對(duì)你在日常開(kāi)發(fā)可以有所幫助。

水平有限,文章難免會(huì)有紕漏,如有錯(cuò)誤歡迎一起交流探討,我會(huì)第一時(shí)間更正的。

參考資料:

周志明:《深入理解 Java 虛擬機(jī)》
方騰飛:《Java 并發(fā)編程的藝術(shù)》
https://www.javadoop.com/

作者:七哥


公眾號(hào):牧小農(nóng),微信掃碼關(guān)注或搜索公眾號(hào)名稱(chēng)