CurrentHashMap如何實(shí)現(xiàn)高效地線程安全

以下文章來源于七哥聊編程 ,作者七哥

大家好,我是頭發(fā)已經(jīng)不多七哥(最近找工作復(fù)習(xí)壓力有點(diǎn)大哦??)。

今天的主角就是 java 并發(fā)包中提供的 CurrentHashMap 這是一個(gè)線程安全且高效的HashMap ,也是面試的高頻考點(diǎn)。



下面將圍繞主題:ConcurrentHashMap 如何實(shí)現(xiàn)高效地線程安全?以及在Java8中它從設(shè)計(jì)實(shí)現(xiàn)上有哪些演進(jìn)?

這篇文章一開始我以為會比較簡單,但是在深入源碼分析時(shí),遇到了很大的阻礙,比前面我們分析AQS以及讀寫鎖的源碼要難理解的多,斷斷續(xù)續(xù)也寫了4天了。如果你看完還是沒有理解的話,那我在這里表示深深的歉意,同時(shí)也歡迎你和我一起溝通。

網(wǎng)上關(guān)于 HashMap 和 ConcurrentHashMap 的文章確實(shí)不少,不過目前的很多分析資料還是基于其早期版本,所以才想自己也寫一篇,把細(xì)節(jié)說清楚說透,尤其像 Java8 中的 ConcurrentHashMap 的演進(jìn)設(shè)計(jì)實(shí)現(xiàn),大部分文章都說不清楚。希望能降低大家學(xué)習(xí)的成本,不希望大家看了一篇又一篇文章,最終還是模模糊糊。

閱讀前提:

本文會涉及源碼分析,所以至少讀者要熟悉它們的接口使用,同時(shí),對于并發(fā),讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個(gè)基本的知識,文中不會對這些知識進(jìn)行介紹。

為什么需要 ConcurrentHashMap?
在并發(fā)編程中使用HashMap可能導(dǎo)致程序死循環(huán)。而使用線程安全的HashTable效率又非常低下(它的實(shí)現(xiàn)就是將put、get、size等方法加上 synchronized 關(guān)鍵字),基于以上兩個(gè)原因,便有了ConcurrentHashMap的登場機(jī)會。

可能有的同學(xué)對 HashMap 為什么會在并發(fā)中出現(xiàn)死循環(huán)從而導(dǎo)致 cpu 占用達(dá)到100% 不太了解,這里直接展示一段示例代碼,運(yùn)行它就會出現(xiàn)死循環(huán)。

static final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < 100000; i++) {
            int finalI1 = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    map.put(String.valueOf(finalI1), "");
                }
            }, "ftf" + i).start();
        }
    }
}, "ftf");
t.start();
t.join();
記著在 jdk1.7 及之前的版本測試(1.8 已經(jīng)解決了死循環(huán)問題),死循環(huán)的概率還是非常低的,比較難以重現(xiàn)。為了提高出現(xiàn)概率,采用多次迭代測試,我在測試時(shí)出現(xiàn)在 128次。

感興趣的同學(xué)可以用 jstack 分析下,網(wǎng)上有很多教程,這里就不展開 排查過程了。原因就是:HashMap 在并發(fā)執(zhí)行 put 操作時(shí)會引起死循環(huán),因?yàn)槎嗑€程會導(dǎo)致 HashMap 擴(kuò)容時(shí) Entry 鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu),一旦形成環(huán)形數(shù)據(jù)結(jié)構(gòu),Entry 的 next 節(jié)點(diǎn)永遠(yuǎn)不為空,就會產(chǎn)生死循環(huán)獲 取 Entry 。從而導(dǎo)致CPU占用將近100%。

Java7中ConcurrentHashMap分析
首先,我這里強(qiáng)調(diào),ConcurrentHashMap 的設(shè)計(jì)實(shí)現(xiàn)其實(shí)一直在演化,比如在 Java 8 中就發(fā)生了非常大的變化(Java 7 其實(shí)也有不少更新),所以,我這里將比較分析結(jié)構(gòu)、實(shí)現(xiàn)機(jī)制等方面,對比不同版本的主要區(qū)別。

在 Java7 中的實(shí)現(xiàn)是基于:

分離鎖,也就是將內(nèi)部進(jìn)行分段(Segment),里面則是 HashEntry 的數(shù)組,和 HashMap 類似,哈希相同的條目也是以鏈表形式存放。
HashEntry 內(nèi)部使用 volatile 的 value 字段來保證可見性,也利用了不可變對象的機(jī)制以改進(jìn)利用 Unsafe 提供的底層能力,比如 volatile access,去直接完成部分操作,以最優(yōu)化性能,畢竟 Unsafe 中的很多操作都是 JVM intrinsic 優(yōu)化過的。
具體實(shí)現(xiàn)可以理解為:ConcurrentHashMap 是由 Segment 數(shù)組結(jié)構(gòu)和 HashEntry 數(shù)組結(jié)構(gòu)組成。Segment是一種可重入鎖(繼承了ReentrantLock),在ConcurrentHashMap里扮演鎖的角色;HashEntry 則用于存儲鍵值對數(shù)據(jù)。一個(gè) ConcurrentHashMap 里包含一個(gè) Segment 數(shù)組。Segment 的結(jié)構(gòu)和 HashMap 類似,是一種 數(shù)組和鏈表結(jié)構(gòu)。一個(gè) Segment 里包含一個(gè) HashEntry 數(shù)組,每個(gè) HashEntry 是一個(gè)鏈表結(jié)構(gòu)的元 素,每個(gè)Segment 守護(hù)著一個(gè) HashEntry 數(shù)組里的元素,當(dāng)對 HashEntry 數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí), 必須首先獲得與它對應(yīng)的Segment鎖。



初始化
在構(gòu)造的時(shí)候,Segment 的數(shù)量由所謂的 concurrentcyLevel 決定,默認(rèn)是 16,所以理論上,這個(gè)時(shí)候,最多可以同時(shí)支持 16 個(gè)線程并發(fā)寫,只要它們的操作分別分布在不同的 Segment 上。也可以在相應(yīng)構(gòu)造函數(shù)直接指定。注意,Java 需要它是 2 的冪數(shù)值,如果輸入是類似 15 這種非冪值,會被自動調(diào)整到 16 之類 2 的冪數(shù)值。并且一旦初始化后,它是不可以擴(kuò)容的。

ConcurrentHashMap 初始化方法是通過 initialCapacity 、loadFactor 和 concurrencyLevel 等幾個(gè)參數(shù)來初始化segment數(shù)組、段偏移量 segmentShift 、段掩碼 segmentMask 和每個(gè) segment 里的 HashEntry 數(shù)組來實(shí)現(xiàn)的。

下面結(jié)合源代碼一起來看下,為方便理解,我直接注釋在代碼段里:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 計(jì)算并行級別 ssize,因?yàn)橐3植⑿屑墑e是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 我們這里先不要那么燒腦,用默認(rèn)值,concurrencyLevel 為 16,sshift 為 4
    // 那么計(jì)算出 segmentShift 為 28,segmentMask 為 15,后面會用到這兩個(gè)值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    // initialCapacity 是設(shè)置整個(gè) map 初始的大小,
    // 這里根據(jù) initialCapacity 計(jì)算 Segment 數(shù)組中每個(gè)位置可以分到的大小
    // 如 initialCapacity 為 64,那么每個(gè) Segment 可以分到 4 個(gè)
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 默認(rèn) MIN_SEGMENT_TABLE_CAPACITY 是 2,這個(gè)值也是有講究的,因?yàn)檫@樣的話,對于具體的槽上,插入一個(gè)元素不至于擴(kuò)容,插入第二個(gè)的時(shí)候才會擴(kuò)容
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;

    // 創(chuàng)建 Segment 數(shù)組,
    // 并創(chuàng)建數(shù)組的第一個(gè)元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往數(shù)組寫入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
初始化完成,我們得到了一個(gè) Segment 數(shù)組。這里之所以 segments 數(shù)組的長度必須是2的N次冪,主要是為了能通過按位與的散列算法來定位 segments 數(shù)組的索引。

注意:concurrencyLevel 的最大值是65535,這意味著 segments 數(shù)組的長度最大為65536, 對應(yīng)的二進(jìn)制是16位。

為了加深讀者理解,下面來分析下,當(dāng)我們用 new ConcurrentHashMap() 無參構(gòu)造函數(shù)進(jìn)行初始化的,那么初始化完成后:

Segment 數(shù)組長度為 16,不可以擴(kuò)容
Segment[i] 的默認(rèn)大小為 2,負(fù)載因子是 0.75,得出初始閾值為 1.5,也就是以后插入第一個(gè)元素不會觸發(fā)擴(kuò)容,插入第二個(gè)會進(jìn)行第一次擴(kuò)容
這里初始化了 segment[0],其他位置還是 null,至于為什么要初始化 segment[0],后面的代碼會介紹
當(dāng)前段偏移量 segmentShift 的值為 32 - 4 = 28,段掩碼  segmentMask 為 16 - 1 = 15,這兩個(gè)值馬上就會用到
get 操作
get 操作需要保證的是可見性,所以并沒有什么同步邏輯。

計(jì)算 hash 值,找到 segment 數(shù)組中的具體位置
segment 中也是一個(gè)數(shù)組(HashEntry數(shù)組),根據(jù) hash 找到數(shù)組中具體的位置
到這里是鏈表了,HashEntry 是鏈表中的元素,順著鏈表進(jìn)行查找即可
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 1. hash 值,32位
    int h = hash(key);
    // 利用位操作替換普通數(shù)學(xué)運(yùn)算,將hash值無符號左移段偏移量位,即取高四位,在與上段掩碼(15二進(jìn)制位1111)
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 2. 根據(jù) hash 找到對應(yīng)的 segment,利用Unsafe直接進(jìn)行volatile access
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 3. 找到segment 內(nèi)部數(shù)組相應(yīng)位置的鏈表,遍歷
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
put操作
對于 put 操作,首先是通過二次哈希避免哈希沖突,然后以 Unsafe 調(diào)用方式,直接獲取相應(yīng)的 Segment,然后進(jìn)行線程安全的 put 操作:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 二次哈希,以保證數(shù)據(jù)的分散性,避免哈希沖突
   int hash = hash(key.hashCode());
    // 2. 根據(jù) hash 值找到 Segment 數(shù)組中的位置 j
    //    hash 是 32 位,無符號右移 segmentShift(28) 位,剩下高 4 位,
    //    然后和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的高 4 位,也就是segment的數(shù)組下標(biāo)
    int j = (hash >>> segmentShift) & segmentMask;
    // 剛剛說了,初始化的時(shí)候初始化了 segment[0],但是其他位置還是 null,
    // ensureSegment(j) 對 segment[j] 進(jìn)行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}
其核心邏輯實(shí)現(xiàn)在下面的內(nèi)部方法中:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,需要先獲取該 segment 的獨(dú)占鎖
    //    先看主流程,后面還會具體介紹這部分內(nèi)容
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 這個(gè)是 segment 內(nèi)部的數(shù)組
        HashEntry<K,V>[] tab = table;
        // 再利用 hash 值,求應(yīng)該放置的數(shù)組下標(biāo)
        int index = (tab.length - 1) & hash;
        // first 是數(shù)組該位置處的鏈表的表頭
        HashEntry<K,V> first = entryAt(tab, index);

        // 下面這串 for 循環(huán)雖然很長,不過也很好理解,想象當(dāng)前位置鏈表不為空則先遍歷找是否存在,如果存在則覆蓋,否則放到合適的位置
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續(xù)順著鏈表走
                e = e.next;
            }
            else {
                // node 到底是不是 null,這個(gè)要看獲取鎖的過程,不過和這里都沒有關(guān)系。
                // 如果不為 null,那就直接將它設(shè)置為鏈表表頭;如果是null,初始化并設(shè)置為鏈表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 如果超過了該 segment 的閾值,這個(gè) segment 需要擴(kuò)容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴(kuò)容后面也會具體分析
                else
                    // 沒有達(dá)到閾值,將 node 放到數(shù)組 tab 的 index 位置,
                    // 其實(shí)就是將新的節(jié)點(diǎn)設(shè)置成原鏈表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}





rehash:擴(kuò)容操作
重復(fù)一下,segment 數(shù)組不能擴(kuò)容,擴(kuò)容是 segment 數(shù)組某個(gè)位置內(nèi)部的數(shù)組 HashEntry<K,V>[] 進(jìn)行擴(kuò)容,擴(kuò)容后,容量為原來的 2 倍。

首先,我們要回顧一下觸發(fā)擴(kuò)容的地方,put 的時(shí)候,如果判斷該值的插入會導(dǎo)致該 segment 的元素個(gè)數(shù)超過閾值,那么先進(jìn)行擴(kuò)容,再插值,讀者這個(gè)時(shí)候可以回去 put 方法看一眼。

該方法不需要考慮并發(fā),因?yàn)榈竭@里的時(shí)候,是持有該 segment 的獨(dú)占鎖的。

// 方法參數(shù)上的 node 是這次擴(kuò)容后,需要添加到新的數(shù)組中的數(shù)據(jù)。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 創(chuàng)建新數(shù)組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴(kuò)容到 32,那么 sizeMask 為 31,對應(yīng)二進(jìn)制 ‘000...00011111’
    int sizeMask = newCapacity - 1;

    // 遍歷原數(shù)組,老套路,將原數(shù)組位置 i 處的鏈表拆分到 新數(shù)組位置 i 和 i+oldCap 兩個(gè)位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是鏈表的第一個(gè)元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計(jì)算應(yīng)該放置在新數(shù)組中的位置,
            // 假設(shè)原數(shù)組長度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask;
            // 該位置處只有一個(gè)元素,那比較好辦,直接放到新數(shù)組中對應(yīng)的位置
            if (next == null)   
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是鏈表表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當(dāng)前鏈表的頭結(jié)點(diǎn) e 的新位置
                int lastIdx = idx;

                // 下面這個(gè) for 循環(huán)會找到一個(gè) lastRun 節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)之后的所有元素是將要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其之后的所有節(jié)點(diǎn)組成的這個(gè)鏈表放到 lastIdx 這個(gè)位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是處理 lastRun 之前的節(jié)點(diǎn),
                //    這些節(jié)點(diǎn)可能分配在另一個(gè)鏈表中,也可能分配到上面的那個(gè)鏈表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來的 node 放到新數(shù)組中剛剛的 兩個(gè)鏈表之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
上面有兩個(gè)挨著的 for 循環(huán),第一個(gè) for 有什么用呢?

這塊代碼我看的時(shí)候真的是很難理解,反復(fù)看了好幾遍,主要原因還是對鏈表操作不太熟悉,這里為大家在解釋下,幫助理解。這里需要進(jìn)行第一個(gè) for 循環(huán),主要是因?yàn)閿U(kuò)容后,原來數(shù)組位置 i 的 HashEntry 是一個(gè)鏈表,那么這個(gè)鏈表的元素對應(yīng)擴(kuò)容后的數(shù)組位置必然是 i 或 i+oldCap。第一個(gè)循環(huán)就是為遍歷當(dāng)前位置 i 的鏈表找到最后一個(gè)在新數(shù)組中位置相同的節(jié)點(diǎn) lastRun。

如果沒有第一個(gè) for 循環(huán),也是可以工作的,但是,這個(gè) for 循環(huán)下來,如果 lastRun 的后面還有比較多的節(jié)點(diǎn),那么這次就是值得的。因?yàn)槲覀冎恍枰寺?lastRun 前面的節(jié)點(diǎn),后面的一串節(jié)點(diǎn)跟著 lastRun 進(jìn)行賦值就可以了,不需要做任何操作。

Doug Lea 大神這塊的想法一般人可能是想不到的,畢竟作為并發(fā)包中的基礎(chǔ)類 都是為了將并發(fā)性能做到極致的。但是也有最差的情況,就是找到的 lastRun 是鏈表的最后一個(gè)元素,或者排在倒數(shù),那么這次遍歷就顯得多余了,而且浪費(fèi)了性能。不過 Doug Lea 也說了,根據(jù)統(tǒng)計(jì),如果使用默認(rèn)的閾值,大約只有 1/6 的節(jié)點(diǎn)需要克隆。

size 操作
知道了 ConcurrentHashMap 通過分段鎖實(shí)現(xiàn)高性能且線程安全的原理。試想,如果不進(jìn)行同步,簡單的計(jì)算所有 Segment 的總值,可能會因?yàn)椴l(fā) put,導(dǎo)致結(jié)果不準(zhǔn)確,但是直接鎖定所有 Segment 進(jìn)行計(jì)算,就會變得非常昂貴。

所以,ConcurrentHashMap 的實(shí)現(xiàn)是通過重試機(jī)制(RETRIES_BEFORE_LOCK,指定重試次數(shù) 2),來試圖獲得可靠值。如果沒有監(jiān)控到發(fā)生變化(通過對比 Segment.modCount),就直接返回,否則獲取鎖進(jìn)行操作。

Java8中ConcurrentHashMap分析
在 Java 8 和之后的版本中,ConcurrentHashMap 發(fā)生了哪些變化呢?

Java8 對 HashMap 進(jìn)行了一些修改,最大的不同就是利用了紅黑樹,所以其由 數(shù)組+鏈表+紅黑樹 組成。

因?yàn)椴辉偈褂?Segment,初始化操作大大簡化,修改為 lazy-load 形式,這樣可以有效避免初始開銷,解決了老版本很多人抱怨的這一點(diǎn)。

數(shù)據(jù)存儲利用 volatile 來保證可見性。

使用 CAS 等操作,在特定場景進(jìn)行無鎖并發(fā)操作。

這里介紹一個(gè)最常問的問題:Java8 為什么使用紅黑樹呢?

根據(jù) Java7 HashMap 的介紹,我們知道,查找的時(shí)候,根據(jù) hash 值我們能夠快速定位到數(shù)組的具體下標(biāo),但是之后的話,需要順著鏈表一個(gè)個(gè)比較下去才能找到我們需要的,時(shí)間復(fù)雜度取決于鏈表的長度,為 O(n)。

為了降低這部分的開銷,在 Java8 中,當(dāng)鏈表中的元素達(dá)到了 8 個(gè)時(shí),會將鏈表轉(zhuǎn)換為紅黑樹,在這些位置進(jìn)行查找的時(shí)候可以降低時(shí)間復(fù)雜度為 O(logN)。



注意,上圖是示意圖,主要是描述結(jié)構(gòu),不會達(dá)到這個(gè)狀態(tài)的,因?yàn)檫@么多數(shù)據(jù)的時(shí)候早就擴(kuò)容了。

Java7 中使用 HashEntry 來代表每個(gè) HashMap 中的數(shù)據(jù)節(jié)點(diǎn),Java8 中使用 Node,基本沒有區(qū)別,都是 key,value,hash 和 next 這四個(gè)屬性,不過,Node 只能用于鏈表的情況,紅黑樹的情況需要使用 TreeNode。

先看看現(xiàn)在的數(shù)據(jù)存儲內(nèi)部實(shí)現(xiàn),我們可以發(fā)現(xiàn) Key 是 final 的,因?yàn)樵谏芷谥?,一個(gè)條目的 Key 發(fā)生變化是不可能的;與此同時(shí) val,則聲明為 volatile,以保證可見性。

static class Node<K,V> implements Map.Entry<K,V> {
 final int hash;
 final K key;
 volatile V val;
 volatile Node<K,V> next;
 // …
}
為了提高大家的閱讀體驗(yàn),我這里就不再介紹 get 方法和構(gòu)造函數(shù)了,相對比較簡單,相信你如果看懂了 Java7 的實(shí)現(xiàn)一定沒有啥問題的。直接看并發(fā)的 put 是如何實(shí)現(xiàn)的。

put操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 得到 hash 值
    int hash = spread(key.hashCode());
    // 用于記錄相應(yīng)鏈表的長度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果數(shù)組"空",進(jìn)行數(shù)組初始化
        if (tab == null || (n = tab.length) == 0)
            // 初始化數(shù)組
            tab = initTable();

        // 找該 hash 值對應(yīng)的數(shù)組下標(biāo),得到第一個(gè)節(jié)點(diǎn) f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果數(shù)組該位置為空,
            // 用一次 CAS 操作將這個(gè)新值放入其中即可,這個(gè) put 操作差不多就結(jié)束了,可以拉到最后面
            // 如果 CAS 失敗,那就是有并發(fā)操作,進(jìn)到下一個(gè)循環(huán)就好了
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // hash 居然可以等于 MOVED,這個(gè)需要到后面才能看明白,不過從名字上也能猜到,肯定是因?yàn)樵跀U(kuò)容
        else if ((fh = f.hash) == MOVED)
            // 幫助數(shù)據(jù)遷移,這個(gè)等到看完數(shù)據(jù)遷移部分的介紹后,再理解這個(gè)就很簡單了
            tab = helpTransfer(tab, f);

        else { // 到這里就是說,f 是該位置的頭結(jié)點(diǎn),而且不為空

            V oldVal = null;
            // 獲取數(shù)組該位置的頭結(jié)點(diǎn)的監(jiān)視器鎖
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 頭結(jié)點(diǎn)的 hash 值大于 0,說明是鏈表
                        // 用于累加,記錄鏈表的長度
                        binCount = 1;
                        // 遍歷鏈表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果發(fā)現(xiàn)了"相等"的 key,判斷是否要進(jìn)行值覆蓋,然后也就可以 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了鏈表的最末端,將這個(gè)新值放到鏈表的最后面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 紅黑樹
                        Node<K,V> p;
                        binCount = 2;
                        // 調(diào)用紅黑樹的插值方法插入新節(jié)點(diǎn)
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                // 判斷是否要將鏈表轉(zhuǎn)換為紅黑樹,臨界值和 HashMap 一樣,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 這個(gè)方法和 HashMap 中稍微有一點(diǎn)點(diǎn)不同,那就是它不是一定會進(jìn)行紅黑樹轉(zhuǎn)換,
                    // 如果當(dāng)前數(shù)組的長度小于 64,那么會選擇進(jìn)行數(shù)組擴(kuò)容,而不是轉(zhuǎn)換為紅黑樹
                    // 具體源碼我們就不看了,擴(kuò)容部分后面說
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //
    addCount(1L, binCount);
    return null;
}
put 的主流程看完了,但是至少留下了幾個(gè)問題,第一個(gè)是初始化,第二個(gè)是擴(kuò)容,第三個(gè)是幫助數(shù)據(jù)遷移,這些我們都會在后面進(jìn)行一一介紹。

初始化數(shù)組:initTable
從上面的 put 操作可以看到,數(shù)組初始化是在 put 操作時(shí)進(jìn)行的,采用的 lazy-load 形式。

這個(gè)比較簡單,主要就是初始化一個(gè)合適大小的數(shù)組,然后會設(shè)置 sizeCtl。

初始化方法中的并發(fā)問題是通過對 sizeCtl 進(jìn)行一個(gè) CAS 操作來控制的。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 初始化的"功勞"被其他線程"搶去"了
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 一下,將 sizeCtl 設(shè)置為 -1,代表搶到了鎖
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // DEFAULT_CAPACITY 默認(rèn)初始容量是 16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 初始化數(shù)組,長度為 16 或初始化時(shí)提供的長度
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 將這個(gè)數(shù)組賦值給 table,table 是 volatile 的
                    table = tab = nt;
                    // 如果 n 為 16 的話,那么這里 sc = 12
                    // 其實(shí)就是 0.75 * n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 設(shè)置 sizeCtl 為 sc,我們就當(dāng)是 12 吧
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}





鏈表轉(zhuǎn)為紅黑樹:treeifyBin
這里需要注意:前面我們在 put 源碼分析也說過,treeifyBin 不一定就會進(jìn)行紅黑樹轉(zhuǎn)換,也可能是僅僅做數(shù)組擴(kuò)容。我們還是進(jìn)行源碼分析吧。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 所以,如果數(shù)組長度小于 64 的時(shí)候,其實(shí)也就是 32 或者 16 或者更小的時(shí)候,會進(jìn)行數(shù)組擴(kuò)容
            tryPresize(n << 1);
        // b 是頭結(jié)點(diǎn)
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            // 加鎖
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    // 下面就是遍歷鏈表,建立一顆紅黑樹
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 將紅黑樹設(shè)置到數(shù)組相應(yīng)位置中
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
擴(kuò)容:tryPresize
如果說 Java8 ConcurrentHashMap 的源碼不簡單,那么說的就是擴(kuò)容操作和遷移操作。

這里的擴(kuò)容也是做翻倍擴(kuò)容的,擴(kuò)容后數(shù)組容量為原來的 2 倍。

這個(gè)方法要完完全全看懂還需要看之后的 transfer 方法,讀者應(yīng)該提前知道這點(diǎn)。

// 首先要說明的是,方法參數(shù) size 傳進(jìn)來的時(shí)候就已經(jīng)翻了倍了
private final void tryPresize(int size) {
    // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    // 目前容器大小
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // 這個(gè) if 分支和之前說的初始化數(shù)組的代碼基本上是一樣的,在這里,我們可以不用管這塊代碼
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        // 16-4=12
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // 小于目前大小或者達(dá)到最大值直接返回
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // 說明是tab過程中沒有發(fā)生變化,類似于懶加載的雙重檢查
        else if (tab == table) {
            // value = 32795
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 2. 用 CAS 將 sizeCtl 加 1,然后執(zhí)行 transfer 方法 此時(shí) nextTab 不為 null
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 1. 將 sizeCtl 設(shè)置為 (rs << RESIZE_STAMP_SHIFT) + 2)
            // 我是沒看懂這個(gè)值真正的意義是什么?不過可以計(jì)算出來的是,結(jié)果是一個(gè)比較大的負(fù)數(shù)
            // 調(diào)用 transfer 方法,此時(shí) nextTab 參數(shù)為 null
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
這個(gè)方法的核心在于 sizeCtl 值的操作,首先將其設(shè)置為一個(gè)負(fù)數(shù),然后執(zhí)行 transfer(tab, null),再下一個(gè)循環(huán)將 sizeCtl 加 1,并執(zhí)行 transfer(tab, nt),之后可能是繼續(xù) sizeCtl 加 1,并執(zhí)行 transfer(tab, nt)。

所以,可能的操作就是執(zhí)行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),這里怎么結(jié)束循環(huán)的需要看完 transfer 源碼才清楚。

數(shù)據(jù)遷移:transfer
下面這個(gè)方法有點(diǎn)長,將原來的 tab 數(shù)組的元素遷移到新的 nextTab 數(shù)組中。

雖然我們之前說的 tryPresize 方法中多次調(diào)用 transfer 不涉及多線程,但是這個(gè) transfer 方法可以在其他地方被調(diào)用,典型地,我們之前在說 put 方法的時(shí)候就說過了,請往上看 put 方法,是不是有個(gè)地方調(diào)用了 helpTransfer 方法,helpTransfer 方法會調(diào)用 transfer 方法的。

此方法支持多線程執(zhí)行,外圍調(diào)用此方法的時(shí)候,會保證第一個(gè)發(fā)起數(shù)據(jù)遷移的線程,nextTab 參數(shù)為 null,之后再調(diào)用此方法的時(shí)候,nextTab 不會為 null。

閱讀源碼之前,先要理解并發(fā)操作的機(jī)制。原數(shù)組長度為 n,所以我們有 n 個(gè)遷移任務(wù),讓每個(gè)線程每次負(fù)責(zé)一個(gè)小任務(wù)是最簡單的,每做完一個(gè)任務(wù)再檢測是否有其他沒做完的任務(wù),幫助遷移就可以了,而 Doug Lea 使用了一個(gè) stride,簡單理解就是步長,每個(gè)線程每次負(fù)責(zé)遷移其中的一部分,如每次遷移 16 個(gè)小任務(wù)。所以,我們就需要一個(gè)全局的調(diào)度者來安排哪個(gè)線程執(zhí)行哪幾個(gè)任務(wù),這個(gè)就是屬性 transferIndex 的作用。

第一個(gè)發(fā)起數(shù)據(jù)遷移的線程會將 transferIndex 指向原數(shù)組最后的位置,然后從后往前的 stride 個(gè)任務(wù)屬于第一個(gè)線程,然后將 transferIndex 指向新的位置,再往前的 stride 個(gè)任務(wù)屬于第二個(gè)線程,依此類推。當(dāng)然,這里說的第二個(gè)線程不是真的一定指代了第二個(gè)線程,也可以是同一個(gè)線程,這個(gè)讀者應(yīng)該能理解吧。其實(shí)就是將一個(gè)大的遷移任務(wù)分為了一個(gè)個(gè)任務(wù)包。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // stride 在單核下直接等于 n,多核模式下為 (n>>>3)/NCPU,最小值是 16
    // stride 可以理解為”步長“,有 n 個(gè)位置是需要進(jìn)行遷移的,
    // 將這 n 個(gè)任務(wù)分為多個(gè)任務(wù)包,每個(gè)任務(wù)包有 stride 個(gè)任務(wù)
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 如果 nextTab 為 null,先進(jìn)行一次初始化
    // 前面我們說了,外圍會保證第一個(gè)發(fā)起遷移的線程調(diào)用此方法時(shí),參數(shù) nextTab 為 null
    // 之后參與遷移的線程調(diào)用此方法時(shí),nextTab 不會為 null
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // nextTable 是 ConcurrentHashMap 中的屬性
        nextTable = nextTab;
        // transferIndex 也是 ConcurrentHashMap 的屬性,用于控制遷移的位置
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // ForwardingNode 翻譯過來就是正在被遷移的 Node
    // 這個(gè)構(gòu)造方法會生成一個(gè)Node,key、value 和 next 都為 null,關(guān)鍵是 hash 為 MOVED
    // 后面我們會看到,原數(shù)組中位置 i 處的節(jié)點(diǎn)完成遷移工作后,
    // 就會將位置 i 處設(shè)置為這個(gè) ForwardingNode,用來告訴其他線程該位置已經(jīng)處理過了
    // 所以它其實(shí)相當(dāng)于是一個(gè)標(biāo)志。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // advance 指的是做完了一個(gè)位置的遷移工作,可以準(zhǔn)備做下一個(gè)位置的了
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 下面這個(gè) for 循環(huán),最難理解的在前面,而要看懂它們,應(yīng)該先看懂后面的,然后再倒回來
    // i 是位置索引,bound 是邊界,注意是從后往前
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 下面這個(gè) while 真的是不好理解
        // advance 為 true 表示可以進(jìn)行下一個(gè)位置的遷移了
        // 簡單理解為:i 指向了 transferIndex,bound 指向了 transferIndex-stride
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            // 將 transferIndex 值賦給 nextIndex
            // 這里 transferIndex 一旦小于等于 0,說明原數(shù)組的所有位置都有相應(yīng)的線程去處理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 看括號中的代碼,nextBound 是這次遷移任務(wù)的邊界,注意,是從后往前
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 所有的遷移操作已經(jīng)完成
            if (finishing) {
                nextTable = null;
                // 將新的 nextTab 賦值給 table 屬性,完成遷移
                table = nextTab;
                // 重新計(jì)算 sizeCtl:n 是原數(shù)組長度,所以 sizeCtl 得出的值將是新數(shù)組長度的 0.75 倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 之前我們說過,sizeCtl 在遷移前會設(shè)置為 (rs << RESIZE_STAMP_SHIFT) + 2
            // 然后,每有一個(gè)線程參與遷移就會將 sizeCtl 加 1,
            // 這里使用 CAS 操作對 sizeCtl 進(jìn)行減 1,代表做完了屬于自己的任務(wù)
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 任務(wù)結(jié)束,方法退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 到這里,說明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
                // 也就是說,所有的遷移任務(wù)都做完了,也就會進(jìn)入到上面的 if(finishing){} 分支了
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 如果位置 i 處是空的,沒有任何節(jié)點(diǎn),那么放入剛剛初始化的 ForwardingNode ”空節(jié)點(diǎn)“
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 該位置處是一個(gè) ForwardingNode,代表該位置已經(jīng)遷移過了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 對數(shù)組該位置處的結(jié)點(diǎn)加鎖,開始處理數(shù)組該位置處的遷移工作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 頭結(jié)點(diǎn)的 hash 大于 0,說明是鏈表的 Node 節(jié)點(diǎn)
                    if (fh >= 0) {
                        // 下面這一塊和 Java7 中的 ConcurrentHashMap 遷移是差不多的,
                        // 需要將鏈表一分為二,
                        // 找到原鏈表中的 lastRun,然后 lastRun 及其之后的節(jié)點(diǎn)是一起進(jìn)行遷移的
                        // lastRun 之前的節(jié)點(diǎn)需要進(jìn)行克隆,然后分到兩個(gè)鏈表中
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其中的一個(gè)鏈表放在新數(shù)組的位置 i
                        setTabAt(nextTab, i, ln);
                        // 另一個(gè)鏈表放在新數(shù)組的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 將原數(shù)組該位置處設(shè)置為 fwd,代表該位置已經(jīng)處理完畢,
                        // 其他線程一旦看到該位置的 hash 值為 MOVED,就不會進(jìn)行遷移了
                        setTabAt(tab, i, fwd);
                        // advance 設(shè)置為 true,代表該位置已經(jīng)遷移完畢
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // 紅黑樹的遷移
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果一分為二后,節(jié)點(diǎn)數(shù)少于 8,那么將紅黑樹轉(zhuǎn)換回鏈表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 將 ln 放置在新數(shù)組的位置 i
                        setTabAt(nextTab, i, ln);
                        // 將 hn 放置在新數(shù)組的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 將原數(shù)組該位置處設(shè)置為 fwd,代表該位置已經(jīng)處理完畢,
                        // 其他線程一旦看到該位置的 hash 值為 MOVED,就不會進(jìn)行遷移了
                        setTabAt(tab, i, fwd);
                        // advance 設(shè)置為 true,代表該位置已經(jīng)遷移完畢
                        advance = true;
                    }
                }
            }
        }
    }
}
說到底,transfer 這個(gè)方法并沒有實(shí)現(xiàn)所有的遷移任務(wù),每次調(diào)用這個(gè)方法只實(shí)現(xiàn)了 transferIndex 往前 stride 個(gè)位置的遷移工作,其他的需要由外圍來控制。

這個(gè)時(shí)候,再回去仔細(xì)看 tryPresize 方法可能就會更加清晰一些了。

總結(jié)
今天我從線程安全問題開始,分析為什么要使用 ConcurrentHashMap,進(jìn)而分析了 Java 7 和 Java 8 中 ConcurrentHashMap 是如何設(shè)計(jì)實(shí)現(xiàn)的,從源碼層面說明白了具體的實(shí)現(xiàn)邏輯。其實(shí)仔細(xì)認(rèn)真讀懂后你會發(fā)現(xiàn)其實(shí)也不是太難。

希望本文讓你對 ConcurrentHashMap 面試相關(guān)問題輕松的應(yīng)對,同時(shí)作為并發(fā)編程技巧對你在日常開發(fā)可以有所幫助。

水平有限,文章難免會有紕漏,如有錯(cuò)誤歡迎一起交流探討,我會第一時(shí)間更正的。

參考資料:

周志明:《深入理解 Java 虛擬機(jī)》
方騰飛:《Java 并發(fā)編程的藝術(shù)》
https://www.javadoop.com/

作者:七哥


公眾號:牧小農(nóng),微信掃碼關(guān)注或搜索公眾號名稱