CurrentHashMap如何實現(xiàn)高效地線程安全

以下文章來源于七哥聊編程 ,作者七哥

大家好,我是頭發(fā)已經(jīng)不多七哥(最近找工作復習壓力有點大哦??)。

今天的主角就是 java 并發(fā)包中提供的 CurrentHashMap 這是一個線程安全且高效的HashMap ,也是面試的高頻考點。



下面將圍繞主題:ConcurrentHashMap 如何實現(xiàn)高效地線程安全?以及在Java8中它從設(shè)計實現(xiàn)上有哪些演進?

這篇文章一開始我以為會比較簡單,但是在深入源碼分析時,遇到了很大的阻礙,比前面我們分析AQS以及讀寫鎖的源碼要難理解的多,斷斷續(xù)續(xù)也寫了4天了。如果你看完還是沒有理解的話,那我在這里表示深深的歉意,同時也歡迎你和我一起溝通。

網(wǎng)上關(guān)于 HashMap 和 ConcurrentHashMap 的文章確實不少,不過目前的很多分析資料還是基于其早期版本,所以才想自己也寫一篇,把細節(jié)說清楚說透,尤其像 Java8 中的 ConcurrentHashMap 的演進設(shè)計實現(xiàn),大部分文章都說不清楚。希望能降低大家學習的成本,不希望大家看了一篇又一篇文章,最終還是模模糊糊。

閱讀前提:

本文會涉及源碼分析,所以至少讀者要熟悉它們的接口使用,同時,對于并發(fā),讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個基本的知識,文中不會對這些知識進行介紹。

為什么需要 ConcurrentHashMap?
在并發(fā)編程中使用HashMap可能導致程序死循環(huán)。而使用線程安全的HashTable效率又非常低下(它的實現(xiàn)就是將put、get、size等方法加上 synchronized 關(guān)鍵字),基于以上兩個原因,便有了ConcurrentHashMap的登場機會。

可能有的同學對 HashMap 為什么會在并發(fā)中出現(xiàn)死循環(huán)從而導致 cpu 占用達到100% 不太了解,這里直接展示一段示例代碼,運行它就會出現(xiàn)死循環(huán)。

static final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < 100000; i++) {
            int finalI1 = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    map.put(String.valueOf(finalI1), "");
                }
            }, "ftf" + i).start();
        }
    }
}, "ftf");
t.start();
t.join();
記著在 jdk1.7 及之前的版本測試(1.8 已經(jīng)解決了死循環(huán)問題),死循環(huán)的概率還是非常低的,比較難以重現(xiàn)。為了提高出現(xiàn)概率,采用多次迭代測試,我在測試時出現(xiàn)在 128次。

感興趣的同學可以用 jstack 分析下,網(wǎng)上有很多教程,這里就不展開 排查過程了。原因就是:HashMap 在并發(fā)執(zhí)行 put 操作時會引起死循環(huán),因為多線程會導致 HashMap 擴容時 Entry 鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu),一旦形成環(huán)形數(shù)據(jù)結(jié)構(gòu),Entry 的 next 節(jié)點永遠不為空,就會產(chǎn)生死循環(huán)獲 取 Entry 。從而導致CPU占用將近100%。

Java7中ConcurrentHashMap分析
首先,我這里強調(diào),ConcurrentHashMap 的設(shè)計實現(xiàn)其實一直在演化,比如在 Java 8 中就發(fā)生了非常大的變化(Java 7 其實也有不少更新),所以,我這里將比較分析結(jié)構(gòu)、實現(xiàn)機制等方面,對比不同版本的主要區(qū)別。

在 Java7 中的實現(xiàn)是基于:

分離鎖,也就是將內(nèi)部進行分段(Segment),里面則是 HashEntry 的數(shù)組,和 HashMap 類似,哈希相同的條目也是以鏈表形式存放。
HashEntry 內(nèi)部使用 volatile 的 value 字段來保證可見性,也利用了不可變對象的機制以改進利用 Unsafe 提供的底層能力,比如 volatile access,去直接完成部分操作,以最優(yōu)化性能,畢竟 Unsafe 中的很多操作都是 JVM intrinsic 優(yōu)化過的。
具體實現(xiàn)可以理解為:ConcurrentHashMap 是由 Segment 數(shù)組結(jié)構(gòu)和 HashEntry 數(shù)組結(jié)構(gòu)組成。Segment是一種可重入鎖(繼承了ReentrantLock),在ConcurrentHashMap里扮演鎖的角色;HashEntry 則用于存儲鍵值對數(shù)據(jù)。一個 ConcurrentHashMap 里包含一個 Segment 數(shù)組。Segment 的結(jié)構(gòu)和 HashMap 類似,是一種 數(shù)組和鏈表結(jié)構(gòu)。一個 Segment 里包含一個 HashEntry 數(shù)組,每個 HashEntry 是一個鏈表結(jié)構(gòu)的元 素,每個Segment 守護著一個 HashEntry 數(shù)組里的元素,當對 HashEntry 數(shù)組的數(shù)據(jù)進行修改時, 必須首先獲得與它對應(yīng)的Segment鎖。



初始化
在構(gòu)造的時候,Segment 的數(shù)量由所謂的 concurrentcyLevel 決定,默認是 16,所以理論上,這個時候,最多可以同時支持 16 個線程并發(fā)寫,只要它們的操作分別分布在不同的 Segment 上。也可以在相應(yīng)構(gòu)造函數(shù)直接指定。注意,Java 需要它是 2 的冪數(shù)值,如果輸入是類似 15 這種非冪值,會被自動調(diào)整到 16 之類 2 的冪數(shù)值。并且一旦初始化后,它是不可以擴容的。

ConcurrentHashMap 初始化方法是通過 initialCapacity 、loadFactor 和 concurrencyLevel 等幾個參數(shù)來初始化segment數(shù)組、段偏移量 segmentShift 、段掩碼 segmentMask 和每個 segment 里的 HashEntry 數(shù)組來實現(xiàn)的。

下面結(jié)合源代碼一起來看下,為方便理解,我直接注釋在代碼段里:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 計算并行級別 ssize,因為要保持并行級別是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 我們這里先不要那么燒腦,用默認值,concurrencyLevel 為 16,sshift 為 4
    // 那么計算出 segmentShift 為 28,segmentMask 為 15,后面會用到這兩個值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    // initialCapacity 是設(shè)置整個 map 初始的大小,
    // 這里根據(jù) initialCapacity 計算 Segment 數(shù)組中每個位置可以分到的大小
    // 如 initialCapacity 為 64,那么每個 Segment 可以分到 4 個
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 默認 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對于具體的槽上,插入一個元素不至于擴容,插入第二個的時候才會擴容
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;

    // 創(chuàng)建 Segment 數(shù)組,
    // 并創(chuàng)建數(shù)組的第一個元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往數(shù)組寫入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
初始化完成,我們得到了一個 Segment 數(shù)組。這里之所以 segments 數(shù)組的長度必須是2的N次冪,主要是為了能通過按位與的散列算法來定位 segments 數(shù)組的索引。

注意:concurrencyLevel 的最大值是65535,這意味著 segments 數(shù)組的長度最大為65536, 對應(yīng)的二進制是16位。

為了加深讀者理解,下面來分析下,當我們用 new ConcurrentHashMap() 無參構(gòu)造函數(shù)進行初始化的,那么初始化完成后:

Segment 數(shù)組長度為 16,不可以擴容
Segment[i] 的默認大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以后插入第一個元素不會觸發(fā)擴容,插入第二個會進行第一次擴容
這里初始化了 segment[0],其他位置還是 null,至于為什么要初始化 segment[0],后面的代碼會介紹
當前段偏移量 segmentShift 的值為 32 - 4 = 28,段掩碼  segmentMask 為 16 - 1 = 15,這兩個值馬上就會用到
get 操作
get 操作需要保證的是可見性,所以并沒有什么同步邏輯。

計算 hash 值,找到 segment 數(shù)組中的具體位置
segment 中也是一個數(shù)組(HashEntry數(shù)組),根據(jù) hash 找到數(shù)組中具體的位置
到這里是鏈表了,HashEntry 是鏈表中的元素,順著鏈表進行查找即可
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 1. hash 值,32位
    int h = hash(key);
    // 利用位操作替換普通數(shù)學運算,將hash值無符號左移段偏移量位,即取高四位,在與上段掩碼(15二進制位1111)
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 2. 根據(jù) hash 找到對應(yīng)的 segment,利用Unsafe直接進行volatile access
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 3. 找到segment 內(nèi)部數(shù)組相應(yīng)位置的鏈表,遍歷
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
put操作
對于 put 操作,首先是通過二次哈希避免哈希沖突,然后以 Unsafe 調(diào)用方式,直接獲取相應(yīng)的 Segment,然后進行線程安全的 put 操作:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 二次哈希,以保證數(shù)據(jù)的分散性,避免哈希沖突
   int hash = hash(key.hashCode());
    // 2. 根據(jù) hash 值找到 Segment 數(shù)組中的位置 j
    //    hash 是 32 位,無符號右移 segmentShift(28) 位,剩下高 4 位,
    //    然后和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的高 4 位,也就是segment的數(shù)組下標
    int j = (hash >>> segmentShift) & segmentMask;
    // 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,
    // ensureSegment(j) 對 segment[j] 進行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}
其核心邏輯實現(xiàn)在下面的內(nèi)部方法中:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,需要先獲取該 segment 的獨占鎖
    //    先看主流程,后面還會具體介紹這部分內(nèi)容
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 這個是 segment 內(nèi)部的數(shù)組
        HashEntry<K,V>[] tab = table;
        // 再利用 hash 值,求應(yīng)該放置的數(shù)組下標
        int index = (tab.length - 1) & hash;
        // first 是數(shù)組該位置處的鏈表的表頭
        HashEntry<K,V> first = entryAt(tab, index);

        // 下面這串 for 循環(huán)雖然很長,不過也很好理解,想象當前位置鏈表不為空則先遍歷找是否存在,如果存在則覆蓋,否則放到合適的位置
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續(xù)順著鏈表走
                e = e.next;
            }
            else {
                // node 到底是不是 null,這個要看獲取鎖的過程,不過和這里都沒有關(guān)系。
                // 如果不為 null,那就直接將它設(shè)置為鏈表表頭;如果是null,初始化并設(shè)置為鏈表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 如果超過了該 segment 的閾值,這個 segment 需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴容后面也會具體分析
                else
                    // 沒有達到閾值,將 node 放到數(shù)組 tab 的 index 位置,
                    // 其實就是將新的節(jié)點設(shè)置成原鏈表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}





rehash:擴容操作
重復一下,segment 數(shù)組不能擴容,擴容是 segment 數(shù)組某個位置內(nèi)部的數(shù)組 HashEntry<K,V>[] 進行擴容,擴容后,容量為原來的 2 倍。

首先,我們要回顧一下觸發(fā)擴容的地方,put 的時候,如果判斷該值的插入會導致該 segment 的元素個數(shù)超過閾值,那么先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。

該方法不需要考慮并發(fā),因為到這里的時候,是持有該 segment 的獨占鎖的。

// 方法參數(shù)上的 node 是這次擴容后,需要添加到新的數(shù)組中的數(shù)據(jù)。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 創(chuàng)建新數(shù)組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴容到 32,那么 sizeMask 為 31,對應(yīng)二進制 ‘000...00011111’
    int sizeMask = newCapacity - 1;

    // 遍歷原數(shù)組,老套路,將原數(shù)組位置 i 處的鏈表拆分到 新數(shù)組位置 i 和 i+oldCap 兩個位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是鏈表的第一個元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計算應(yīng)該放置在新數(shù)組中的位置,
            // 假設(shè)原數(shù)組長度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask;
            // 該位置處只有一個元素,那比較好辦,直接放到新數(shù)組中對應(yīng)的位置
            if (next == null)   
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是鏈表表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當前鏈表的頭結(jié)點 e 的新位置
                int lastIdx = idx;

                // 下面這個 for 循環(huán)會找到一個 lastRun 節(jié)點,這個節(jié)點之后的所有元素是將要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其之后的所有節(jié)點組成的這個鏈表放到 lastIdx 這個位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是處理 lastRun 之前的節(jié)點,
                //    這些節(jié)點可能分配在另一個鏈表中,也可能分配到上面的那個鏈表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來的 node 放到新數(shù)組中剛剛的 兩個鏈表之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
上面有兩個挨著的 for 循環(huán),第一個 for 有什么用呢?

這塊代碼我看的時候真的是很難理解,反復看了好幾遍,主要原因還是對鏈表操作不太熟悉,這里為大家在解釋下,幫助理解。這里需要進行第一個 for 循環(huán),主要是因為擴容后,原來數(shù)組位置 i 的 HashEntry 是一個鏈表,那么這個鏈表的元素對應(yīng)擴容后的數(shù)組位置必然是 i 或 i+oldCap。第一個循環(huán)就是為遍歷當前位置 i 的鏈表找到最后一個在新數(shù)組中位置相同的節(jié)點 lastRun。

如果沒有第一個 for 循環(huán),也是可以工作的,但是,這個 for 循環(huán)下來,如果 lastRun 的后面還有比較多的節(jié)點,那么這次就是值得的。因為我們只需要克隆 lastRun 前面的節(jié)點,后面的一串節(jié)點跟著 lastRun 進行賦值就可以了,不需要做任何操作。

Doug Lea 大神這塊的想法一般人可能是想不到的,畢竟作為并發(fā)包中的基礎(chǔ)類 都是為了將并發(fā)性能做到極致的。但是也有最差的情況,就是找到的 lastRun 是鏈表的最后一個元素,或者排在倒數(shù),那么這次遍歷就顯得多余了,而且浪費了性能。不過 Doug Lea 也說了,根據(jù)統(tǒng)計,如果使用默認的閾值,大約只有 1/6 的節(jié)點需要克隆。

size 操作
知道了 ConcurrentHashMap 通過分段鎖實現(xiàn)高性能且線程安全的原理。試想,如果不進行同步,簡單的計算所有 Segment 的總值,可能會因為并發(fā) put,導致結(jié)果不準確,但是直接鎖定所有 Segment 進行計算,就會變得非常昂貴。

所以,ConcurrentHashMap 的實現(xiàn)是通過重試機制(RETRIES_BEFORE_LOCK,指定重試次數(shù) 2),來試圖獲得可靠值。如果沒有監(jiān)控到發(fā)生變化(通過對比 Segment.modCount),就直接返回,否則獲取鎖進行操作。

Java8中ConcurrentHashMap分析
在 Java 8 和之后的版本中,ConcurrentHashMap 發(fā)生了哪些變化呢?

Java8 對 HashMap 進行了一些修改,最大的不同就是利用了紅黑樹,所以其由 數(shù)組+鏈表+紅黑樹 組成。

因為不再使用 Segment,初始化操作大大簡化,修改為 lazy-load 形式,這樣可以有效避免初始開銷,解決了老版本很多人抱怨的這一點。

數(shù)據(jù)存儲利用 volatile 來保證可見性。

使用 CAS 等操作,在特定場景進行無鎖并發(fā)操作。

這里介紹一個最常問的問題:Java8 為什么使用紅黑樹呢?

根據(jù) Java7 HashMap 的介紹,我們知道,查找的時候,根據(jù) hash 值我們能夠快速定位到數(shù)組的具體下標,但是之后的話,需要順著鏈表一個個比較下去才能找到我們需要的,時間復雜度取決于鏈表的長度,為 O(n)。

為了降低這部分的開銷,在 Java8 中,當鏈表中的元素達到了 8 個時,會將鏈表轉(zhuǎn)換為紅黑樹,在這些位置進行查找的時候可以降低時間復雜度為 O(logN)。



注意,上圖是示意圖,主要是描述結(jié)構(gòu),不會達到這個狀態(tài)的,因為這么多數(shù)據(jù)的時候早就擴容了。

Java7 中使用 HashEntry 來代表每個 HashMap 中的數(shù)據(jù)節(jié)點,Java8 中使用 Node,基本沒有區(qū)別,都是 key,value,hash 和 next 這四個屬性,不過,Node 只能用于鏈表的情況,紅黑樹的情況需要使用 TreeNode。

先看看現(xiàn)在的數(shù)據(jù)存儲內(nèi)部實現(xiàn),我們可以發(fā)現(xiàn) Key 是 final 的,因為在生命周期中,一個條目的 Key 發(fā)生變化是不可能的;與此同時 val,則聲明為 volatile,以保證可見性。

static class Node<K,V> implements Map.Entry<K,V> {
 final int hash;
 final K key;
 volatile V val;
 volatile Node<K,V> next;
 // …
}
為了提高大家的閱讀體驗,我這里就不再介紹 get 方法和構(gòu)造函數(shù)了,相對比較簡單,相信你如果看懂了 Java7 的實現(xiàn)一定沒有啥問題的。直接看并發(fā)的 put 是如何實現(xiàn)的。

put操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 得到 hash 值
    int hash = spread(key.hashCode());
    // 用于記錄相應(yīng)鏈表的長度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果數(shù)組"空",進行數(shù)組初始化
        if (tab == null || (n = tab.length) == 0)
            // 初始化數(shù)組
            tab = initTable();

        // 找該 hash 值對應(yīng)的數(shù)組下標,得到第一個節(jié)點 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果數(shù)組該位置為空,
            // 用一次 CAS 操作將這個新值放入其中即可,這個 put 操作差不多就結(jié)束了,可以拉到最后面
            // 如果 CAS 失敗,那就是有并發(fā)操作,進到下一個循環(huán)就好了
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // hash 居然可以等于 MOVED,這個需要到后面才能看明白,不過從名字上也能猜到,肯定是因為在擴容
        else if ((fh = f.hash) == MOVED)
            // 幫助數(shù)據(jù)遷移,這個等到看完數(shù)據(jù)遷移部分的介紹后,再理解這個就很簡單了
            tab = helpTransfer(tab, f);

        else { // 到這里就是說,f 是該位置的頭結(jié)點,而且不為空

            V oldVal = null;
            // 獲取數(shù)組該位置的頭結(jié)點的監(jiān)視器鎖
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 頭結(jié)點的 hash 值大于 0,說明是鏈表
                        // 用于累加,記錄鏈表的長度
                        binCount = 1;
                        // 遍歷鏈表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果發(fā)現(xiàn)了"相等"的 key,判斷是否要進行值覆蓋,然后也就可以 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了鏈表的最末端,將這個新值放到鏈表的最后面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 紅黑樹
                        Node<K,V> p;
                        binCount = 2;
                        // 調(diào)用紅黑樹的插值方法插入新節(jié)點
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                // 判斷是否要將鏈表轉(zhuǎn)換為紅黑樹,臨界值和 HashMap 一樣,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 這個方法和 HashMap 中稍微有一點點不同,那就是它不是一定會進行紅黑樹轉(zhuǎn)換,
                    // 如果當前數(shù)組的長度小于 64,那么會選擇進行數(shù)組擴容,而不是轉(zhuǎn)換為紅黑樹
                    // 具體源碼我們就不看了,擴容部分后面說
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //
    addCount(1L, binCount);
    return null;
}
put 的主流程看完了,但是至少留下了幾個問題,第一個是初始化,第二個是擴容,第三個是幫助數(shù)據(jù)遷移,這些我們都會在后面進行一一介紹。

初始化數(shù)組:initTable
從上面的 put 操作可以看到,數(shù)組初始化是在 put 操作時進行的,采用的 lazy-load 形式。

這個比較簡單,主要就是初始化一個合適大小的數(shù)組,然后會設(shè)置 sizeCtl。

初始化方法中的并發(fā)問題是通過對 sizeCtl 進行一個 CAS 操作來控制的。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 初始化的"功勞"被其他線程"搶去"了
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 一下,將 sizeCtl 設(shè)置為 -1,代表搶到了鎖
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // DEFAULT_CAPACITY 默認初始容量是 16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 初始化數(shù)組,長度為 16 或初始化時提供的長度
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 將這個數(shù)組賦值給 table,table 是 volatile 的
                    table = tab = nt;
                    // 如果 n 為 16 的話,那么這里 sc = 12
                    // 其實就是 0.75 * n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 設(shè)置 sizeCtl 為 sc,我們就當是 12 吧
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}





鏈表轉(zhuǎn)為紅黑樹:treeifyBin
這里需要注意:前面我們在 put 源碼分析也說過,treeifyBin 不一定就會進行紅黑樹轉(zhuǎn)換,也可能是僅僅做數(shù)組擴容。我們還是進行源碼分析吧。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 所以,如果數(shù)組長度小于 64 的時候,其實也就是 32 或者 16 或者更小的時候,會進行數(shù)組擴容
            tryPresize(n << 1);
        // b 是頭結(jié)點
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            // 加鎖
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    // 下面就是遍歷鏈表,建立一顆紅黑樹
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 將紅黑樹設(shè)置到數(shù)組相應(yīng)位置中
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
擴容:tryPresize
如果說 Java8 ConcurrentHashMap 的源碼不簡單,那么說的就是擴容操作和遷移操作。

這里的擴容也是做翻倍擴容的,擴容后數(shù)組容量為原來的 2 倍。

這個方法要完完全全看懂還需要看之后的 transfer 方法,讀者應(yīng)該提前知道這點。

// 首先要說明的是,方法參數(shù) size 傳進來的時候就已經(jīng)翻了倍了
private final void tryPresize(int size) {
    // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    // 目前容器大小
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // 這個 if 分支和之前說的初始化數(shù)組的代碼基本上是一樣的,在這里,我們可以不用管這塊代碼
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        // 16-4=12
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // 小于目前大小或者達到最大值直接返回
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // 說明是tab過程中沒有發(fā)生變化,類似于懶加載的雙重檢查
        else if (tab == table) {
            // value = 32795
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 2. 用 CAS 將 sizeCtl 加 1,然后執(zhí)行 transfer 方法 此時 nextTab 不為 null
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 1. 將 sizeCtl 設(shè)置為 (rs << RESIZE_STAMP_SHIFT) + 2)
            // 我是沒看懂這個值真正的意義是什么?不過可以計算出來的是,結(jié)果是一個比較大的負數(shù)
            // 調(diào)用 transfer 方法,此時 nextTab 參數(shù)為 null
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
這個方法的核心在于 sizeCtl 值的操作,首先將其設(shè)置為一個負數(shù),然后執(zhí)行 transfer(tab, null),再下一個循環(huán)將 sizeCtl 加 1,并執(zhí)行 transfer(tab, nt),之后可能是繼續(xù) sizeCtl 加 1,并執(zhí)行 transfer(tab, nt)。

所以,可能的操作就是執(zhí)行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),這里怎么結(jié)束循環(huán)的需要看完 transfer 源碼才清楚。

數(shù)據(jù)遷移:transfer
下面這個方法有點長,將原來的 tab 數(shù)組的元素遷移到新的 nextTab 數(shù)組中。

雖然我們之前說的 tryPresize 方法中多次調(diào)用 transfer 不涉及多線程,但是這個 transfer 方法可以在其他地方被調(diào)用,典型地,我們之前在說 put 方法的時候就說過了,請往上看 put 方法,是不是有個地方調(diào)用了 helpTransfer 方法,helpTransfer 方法會調(diào)用 transfer 方法的。

此方法支持多線程執(zhí)行,外圍調(diào)用此方法的時候,會保證第一個發(fā)起數(shù)據(jù)遷移的線程,nextTab 參數(shù)為 null,之后再調(diào)用此方法的時候,nextTab 不會為 null。

閱讀源碼之前,先要理解并發(fā)操作的機制。原數(shù)組長度為 n,所以我們有 n 個遷移任務(wù),讓每個線程每次負責一個小任務(wù)是最簡單的,每做完一個任務(wù)再檢測是否有其他沒做完的任務(wù),幫助遷移就可以了,而 Doug Lea 使用了一個 stride,簡單理解就是步長,每個線程每次負責遷移其中的一部分,如每次遷移 16 個小任務(wù)。所以,我們就需要一個全局的調(diào)度者來安排哪個線程執(zhí)行哪幾個任務(wù),這個就是屬性 transferIndex 的作用。

第一個發(fā)起數(shù)據(jù)遷移的線程會將 transferIndex 指向原數(shù)組最后的位置,然后從后往前的 stride 個任務(wù)屬于第一個線程,然后將 transferIndex 指向新的位置,再往前的 stride 個任務(wù)屬于第二個線程,依此類推。當然,這里說的第二個線程不是真的一定指代了第二個線程,也可以是同一個線程,這個讀者應(yīng)該能理解吧。其實就是將一個大的遷移任務(wù)分為了一個個任務(wù)包。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // stride 在單核下直接等于 n,多核模式下為 (n>>>3)/NCPU,最小值是 16
    // stride 可以理解為”步長“,有 n 個位置是需要進行遷移的,
    // 將這 n 個任務(wù)分為多個任務(wù)包,每個任務(wù)包有 stride 個任務(wù)
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 如果 nextTab 為 null,先進行一次初始化
    // 前面我們說了,外圍會保證第一個發(fā)起遷移的線程調(diào)用此方法時,參數(shù) nextTab 為 null
    // 之后參與遷移的線程調(diào)用此方法時,nextTab 不會為 null
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // nextTable 是 ConcurrentHashMap 中的屬性
        nextTable = nextTab;
        // transferIndex 也是 ConcurrentHashMap 的屬性,用于控制遷移的位置
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // ForwardingNode 翻譯過來就是正在被遷移的 Node
    // 這個構(gòu)造方法會生成一個Node,key、value 和 next 都為 null,關(guān)鍵是 hash 為 MOVED
    // 后面我們會看到,原數(shù)組中位置 i 處的節(jié)點完成遷移工作后,
    // 就會將位置 i 處設(shè)置為這個 ForwardingNode,用來告訴其他線程該位置已經(jīng)處理過了
    // 所以它其實相當于是一個標志。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // advance 指的是做完了一個位置的遷移工作,可以準備做下一個位置的了
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 下面這個 for 循環(huán),最難理解的在前面,而要看懂它們,應(yīng)該先看懂后面的,然后再倒回來
    // i 是位置索引,bound 是邊界,注意是從后往前
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 下面這個 while 真的是不好理解
        // advance 為 true 表示可以進行下一個位置的遷移了
        // 簡單理解為:i 指向了 transferIndex,bound 指向了 transferIndex-stride
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            // 將 transferIndex 值賦給 nextIndex
            // 這里 transferIndex 一旦小于等于 0,說明原數(shù)組的所有位置都有相應(yīng)的線程去處理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 看括號中的代碼,nextBound 是這次遷移任務(wù)的邊界,注意,是從后往前
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 所有的遷移操作已經(jīng)完成
            if (finishing) {
                nextTable = null;
                // 將新的 nextTab 賦值給 table 屬性,完成遷移
                table = nextTab;
                // 重新計算 sizeCtl:n 是原數(shù)組長度,所以 sizeCtl 得出的值將是新數(shù)組長度的 0.75 倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 之前我們說過,sizeCtl 在遷移前會設(shè)置為 (rs << RESIZE_STAMP_SHIFT) + 2
            // 然后,每有一個線程參與遷移就會將 sizeCtl 加 1,
            // 這里使用 CAS 操作對 sizeCtl 進行減 1,代表做完了屬于自己的任務(wù)
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 任務(wù)結(jié)束,方法退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 到這里,說明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
                // 也就是說,所有的遷移任務(wù)都做完了,也就會進入到上面的 if(finishing){} 分支了
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 如果位置 i 處是空的,沒有任何節(jié)點,那么放入剛剛初始化的 ForwardingNode ”空節(jié)點“
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 該位置處是一個 ForwardingNode,代表該位置已經(jīng)遷移過了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 對數(shù)組該位置處的結(jié)點加鎖,開始處理數(shù)組該位置處的遷移工作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 頭結(jié)點的 hash 大于 0,說明是鏈表的 Node 節(jié)點
                    if (fh >= 0) {
                        // 下面這一塊和 Java7 中的 ConcurrentHashMap 遷移是差不多的,
                        // 需要將鏈表一分為二,
                        // 找到原鏈表中的 lastRun,然后 lastRun 及其之后的節(jié)點是一起進行遷移的
                        // lastRun 之前的節(jié)點需要進行克隆,然后分到兩個鏈表中
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其中的一個鏈表放在新數(shù)組的位置 i
                        setTabAt(nextTab, i, ln);
                        // 另一個鏈表放在新數(shù)組的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 將原數(shù)組該位置處設(shè)置為 fwd,代表該位置已經(jīng)處理完畢,
                        // 其他線程一旦看到該位置的 hash 值為 MOVED,就不會進行遷移了
                        setTabAt(tab, i, fwd);
                        // advance 設(shè)置為 true,代表該位置已經(jīng)遷移完畢
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // 紅黑樹的遷移
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果一分為二后,節(jié)點數(shù)少于 8,那么將紅黑樹轉(zhuǎn)換回鏈表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 將 ln 放置在新數(shù)組的位置 i
                        setTabAt(nextTab, i, ln);
                        // 將 hn 放置在新數(shù)組的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 將原數(shù)組該位置處設(shè)置為 fwd,代表該位置已經(jīng)處理完畢,
                        // 其他線程一旦看到該位置的 hash 值為 MOVED,就不會進行遷移了
                        setTabAt(tab, i, fwd);
                        // advance 設(shè)置為 true,代表該位置已經(jīng)遷移完畢
                        advance = true;
                    }
                }
            }
        }
    }
}
說到底,transfer 這個方法并沒有實現(xiàn)所有的遷移任務(wù),每次調(diào)用這個方法只實現(xiàn)了 transferIndex 往前 stride 個位置的遷移工作,其他的需要由外圍來控制。

這個時候,再回去仔細看 tryPresize 方法可能就會更加清晰一些了。

總結(jié)
今天我從線程安全問題開始,分析為什么要使用 ConcurrentHashMap,進而分析了 Java 7 和 Java 8 中 ConcurrentHashMap 是如何設(shè)計實現(xiàn)的,從源碼層面說明白了具體的實現(xiàn)邏輯。其實仔細認真讀懂后你會發(fā)現(xiàn)其實也不是太難。

希望本文讓你對 ConcurrentHashMap 面試相關(guān)問題輕松的應(yīng)對,同時作為并發(fā)編程技巧對你在日常開發(fā)可以有所幫助。

水平有限,文章難免會有紕漏,如有錯誤歡迎一起交流探討,我會第一時間更正的。

參考資料:

周志明:《深入理解 Java 虛擬機》
方騰飛:《Java 并發(fā)編程的藝術(shù)》
https://www.javadoop.com/

作者:七哥


公眾號:牧小農(nóng),微信掃碼關(guān)注或搜索公眾號名稱