2018/1/13 23:01:53当前位置推荐好文程序员浏览文章

ConcurrentHashMap 是一个支持并发检索和并发更新的线程安全的HashMap(但不允许空key或value)。不管是在实际工作或者是面试中,ConcurrentHashMap 都是在整个JUC集合框架里出现频率最高的一个类,所以,对ConcurrentHashMap有一个深入的认识对我们自身还是非常重要的。本章我们来从源码层面详细分析 ConcurrentHashMap 的实现(基于JDK1.8),希望对大家有所帮助。

1. 概述

ConcurrentHashMap 在JDK1.7之前是通过Lock和Segment(分段锁)实现并发安全,1.8之后改为CAS+synchronized来保证并发安全(为了序列化兼容,1.8的代码中还是保留了Segment的部分代码)。由于笔者没有过多研究过1.7的源码,所以我们后面的分析主要针对JDK1.8。

首先来看一下ConcurrentHashMap、HashMap和HashTable的区别:

  • HashMap 是非线程安全的哈希表,常用于单线程程序中。
  • Hashtable 是线程安全的哈希表,由于是通过内置锁 synchronized 来保证线程安全,在资源争用比较高的环境下,Hashtable 的效率比较低。
  • ConcurrentHashMap 是一个支持并发操作的线程安全的HashMap,但是他不允许存储空key或value。使用CAS+synchronized来保证并发安全,在并发访问时不需要阻塞线程,所以效率是比Hashtable 要高的。

2. 数据结构

ConcurrentHashMap数据结构

ConcurrentHashMap 是通过Node来存储K-V的,从上图可以看出,它的内部有很多Node节点(在内部封装了一个Node数组—table),不同的节点有不同的作用。下面来看一下这几个Node节点类:

Node
  1. Node<K, V>: 保存k-v、k的hash值和链表的下一个节点next,其中V和next用volatile修饰,保证多线程环境下的可见性。
  2. TreeNode<K, V>: 红黑树节点类,当链表长度>=8且数组长度>=64时,Node会转为TreeNode,但它不是直接转为红黑树,而是把这些TreeNode节点放入TreeBin对象中,由treeBin完成红黑树的封装。
  3. TreeBin<K, V>: 封装了TreeNode,红黑树的根节点,也就是说在ConcurrentHashMap中红黑树存储的是TreeBin对象。
  4. ForwardingNode<K, V>: 在节点转移时用于连接两个table(table和nextTable)的节点类。包含一个nextTable指针,用于指向下一个table。而且这个节点的k-v和next指针全部为null,hash值为-1。只有在扩容时发挥作用,作为一个占位节点放在table中表示当前节点已经被移动。
  5. ReservationNode<K,V>: 在computeIfAbsent和compute方法计算时当做一个占位节点,表示当前节点已经被占用,在compute或computeIfAbsent的function计算完成后插入元素。hash值为-3。

2.1 核心参数

//最大容量private static final int MAXIMUM_CAPACITY = 1 << 30;//初始容量private static final int DEFAULT_CAPACITY = 16;//数组最大容量static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;//默认并发度,兼容1.7及之前版本private static final int DEFAULT_CONCURRENCY_LEVEL = 16;//加载/扩容因子,实际使用n - (n >>> 2)private static final float LOAD_FACTOR = 0.75f;//链表转红黑树的节点数阀值static final int TREEIFY_THRESHOLD = 8;//红黑树转链表的节点数阀值static final int UNTREEIFY_THRESHOLD = 6;//当数组长度还未超过64,优先数组的扩容,否则将链表转为红黑树static final int MIN_TREEIFY_CAPACITY = 64;//扩容时任务的最小转移节点数private static final int MIN_TRANSFER_STRIDE = 16;//sizeCtl中记录stamp的位数private static int RESIZE_STAMP_BITS = 16;//帮助扩容的最大线程数private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;//size在sizeCtl中的偏移量private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;//存放Node元素的数组,在第一次插入数据时初始化transient volatile Node<K,V>[] table;//一个过渡的table表,只有在扩容的时候才会使用private transient volatile Node<K,V>[] nextTable;//基础计数器值(size = base+ CounterCell[i].value)private transient volatile long baseCount;//控制table初始化和扩容操作private transient volatile int sizeCtl;//节点转移时下一个需要转移的table索引private transient volatile int transferIndex;//元素变化时用于控制自旋private transient volatile int cellsBusy;// 保存table中的每个节点的元素个数 2的幂次方// size = base+ CounterCell[i].valueprivate transient volatile CounterCell[] counterCells;

table:Node数组,在第一次插入元素的时候初始化,默认初始大小为16,用来存储Node节点数据,扩容时大小总是2的幂次方。

nextTable:默认为null,扩容时生成的新的数组,其大小为原数组的两倍。

sizeCtl :默认为0,用来控制table的初始化和扩容操作,在不同的情况下有不同的涵义:

  • -1 代表table正在初始化
  • -N 表示有N-1个线程正在进行扩容操作
  • 初始化数组或扩容完成后,将sizeCtl的值设为0.75n
  • 在扩容操作在进行节点转移前,sizeCtl改为(hash << RESIZE_STAMP_SHIFT) + 2,这个值为负数,并且每有一个线程参与扩容操作sizeCtl就加1

transferIndex:扩容时用到,初始时为table.length,表示从索引 0 到transferIndex的节点还未转移 。

counterCells: ConcurrentHashMap的特定计数器,实现方法跟LongAdder类似。这个计数器的机制避免了在更新时的资源争用,但是如果并发读取太频繁会导致缓存超负荷,为了避免读取太频繁,只有在添加了两个以上节点时才可以尝试扩容操作。在统一hash分配的前提下,发生这种情况的概率在13%左右,也就是说只有大约1/8的put操作才会检查扩容(并且在扩容后会更少)。

hash计算公式hash = (key.hashCode ^ (key.hashCode >>> 16)) & HASH_BITS
索引计算公式(table.length-1)&hash

3. 源码解析

3.1 put(K, V)

public V put(K key, V value) {    return putVal(key, value, false);}/ Implementation for put putIfAbsent /final V putVal(K key, V value, boolean onlyIfAbsent) {    if (key == null || value == null) throw new NullPointerException();    //计算hash值    int hash = spread(key.hashCode());    int bin= 0;    for (Node<K,V>[] tab = table;;) {//自旋        //f:索引节点; n:tab.length; i:新节点索引 (n - 1) & hash; fh:f.hash        Node<K,V> f; int n, i, fh;        if (tab == null || (n = tab.length) == 0)            //初始化            tab = initTable();        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//索引i节点为空,直接插入            //cas插入节点,成功则跳出循环            if (casTabAt(tab, i, null,                         new Node<K,V>(hash, key, value, null)))                break;                   // no lock when adding to empty bin        }        //当前节点处于移动状态-其他线程正在进行节点转移操作        else if ((fh = f.hash) == MOVED)            //帮助转移            tab = helpTransfer(tab, f);        else {            V oldVal = null;            synchronized (f) {                if (tabAt(tab, i) == f) {//check stable                    //f.hash>=0,说明f是链表的头结点                    if (fh >= 0) {                        bin= 1;//记录链表节点数,用于后面是否转换为红黑树做判断                        for (Node<K,V> e = f;; ++binCount) {                            K ek;                            //key相同 修改                            if (e.hash == hash &&                                ((ek = e.key) == key ||                                 (ek != null && key.equals(ek)))) {                                oldVal = e.val;                                if (!onlyIfAbsent)                                    e.val = value;                                break;                            }                            Node<K,V> pred = e;                            //到这里说明已经是链表尾,把当前值作为新的节点插入到队尾                            if ((e = e.next) == null) {                                pred.next = new Node<K,V>(hash, key,                                                          value, null);                                break;                            }                        }                    }                    //红黑树节点操作                    else if (f instanceof TreeBin) {                        Node<K,V> p;                        bin= 2;                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,                                                       value)) != null) {                            oldVal = p.val;                            if (!onlyIfAbsent)                                p.val = value;                        }                    }                }            }            if (bin!= 0) {                //如果链表中节点数bin>= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构                if (bin>= TREEIFY_THRESHOLD)                    treeifyBin(tab, i);                if (oldVal != null)                    return oldVal;                break;            }        }    }    //更新新元素个数    addCount(1L, binCount);    return null;}

说明:在 ConcurrentHashMap 中,put 方法几乎涵盖了所有内部的函数操作。所以,我们将从put函数开始逐步向下分析。
首先说一下put的流程,后面再详细分析每一个流程的具体实现(阅读时请结合源码):

  1. 计算当前key的hash值,根据hash值计算索引 i (i=(table.length - 1) & hash)
  2. 如果当前table为null,说明是第一次进行put操作,调用initTable()初始化table
  3. 如果索引 i 位置的节点 f 为空,则直接把当前值作为新的节点直接插入到索引 i 位置;
  4. 如果节点 f 的hash为-1(f.hash == MOVED(-1)),说明当前节点处于移动状态(或者说是其他线程正在对 f 节点进行转移/扩容操作),此时调用helpTransfer(tab, f)帮助转移/扩容;
  5. 如果不属于上述条件,说明已经有元素存储到索引 i 处,此时需要对索引 i 处的节点 f 进行 put or 操作,首先使用内置锁 synchronized 对节点 f 进行加锁;
    a) 如果f.hash>=0,说明 i 位置是一个链表,并且节点 f 是这个链表的头节点,则对 f 节点进行遍历,此时分两种情况:
  • 如果链表中某个节点e的hash与当前key的hash相同,则对这个节点e的value进行修改操作。
  • 如果遍历到链表尾都没有找到与当前key的hash相同的节点,则把当前K-V作为一个新的节点插入到这个链表尾部。
    b) 如果节点 f 是TreeBin节点(f instanceof TreeBin),说明索引 i 位置的节点是一个红黑树,则调用putTreeVal方法找到一个已存在的节点进行修改,或者是把当前K-V放入一个新的节点(put or update)。
  1. 完成插入后,如果索引 i 处是一个链表,并且在插入新的节点后节点数>8,则调用treeifyBin把链表转换为红黑树。
  2. 最后,调用addCount更新元素数量

3.1.1 initTable()

/  Initializes table, using the size recorded in sizeCtl. /private final Node<K,V>[] initTable() {    Node<K,V>[] tab; int sc;    while ((tab = table) == null || tab.length == 0) {        if ((sc = sizeCtl) < 0)//其他线程正在进行初始化或转移操作,让出CPU执行时间片,继续自旋            Thread.yield(); // lost initialization race; just spin        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS设置sizectl为-1 表示当前线程正在进行初始化            try {                if ((tab = table) == null || tab.length == 0) {                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                    @SuppressWarnings("unchecked")                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                    table = tab = nt;                    sc = n - (n >>> 2);//0.75n 设置扩容阈值                }            } finally {                sizeCtl = sc;//初始化sizeCtl=0.75n            }            break;        }    }    return tab;}

说明:初始化操作,ConcurrentHashMap的初始化在第一次插入数据的时候(判断table是否为null),注意初始化操作为单线程操作(如果有其他线程正在进行初始化,则调用Thread.yield()让出CPU时间片,自旋等待table初始完成)。

3.1.2 helpTransfer(Node<K,V>[], Node<K,V>)

//帮助其他线程进行转移操作final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {    Node<K,V>[] nextTab; int sc;    if (tab != null && (f instanceof ForwardingNode) &&        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {        //计算操作栈校验码        int rs = resizeStamp(tab.length);        while (nextTab == nextTable && table == tab &&               (sc = sizeCtl) < 0) {            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                sc == rs + MAX_RESIZERS || transferIndex <= 0)//不需要帮助转移,跳出                break;            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//CAS更新帮助转移的线程数                transfer(tab, nextTab);                break;            }        }        return nextTab;    }    return table;}

说明: 如果索引到的节点的 hash 为-1,说明当前节点处于移动状态(或者说是其他线程正在对 f 节点进行转移操作。这里主要是靠 ForwardingNode 节点来检测,在transfer方法中,被转移后的节点会改为ForwardingNode,它是一个占位节点,并且hash=MOVED(-1),也就是说,我们可以通过判断hash是否为MOVED来确定当前节点的状态),此时调用helpTransfer(tab, f)帮助转移,主要操作就是更新帮助转移的线程数(sizeCtl+1),然后调用transfer方法进行转移操作,transfer后面我们会详细分析。

3.1.3 treeifyBin(Node<K,V>[] , index)

private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //当数组长度还未超过64,优先数组的扩容,否则将链表转为红黑树 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //两倍扩容 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) {//check stable //hd:节点头 TreeNode<K,V> hd = null, tl = null; //遍历转换节点 for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else                tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } }}

说明: 在put操作完成后,如果当前节点为一个链表,并且链表长度>=TREEIFY_THRESHOLD(8),此时就需要调用treeifyBin方法来把当前链表转为一个红黑树。treeifyBin主要进行两步操作:

  1. 如果当前table长度还未超过MIN_TREEIFY_CAPACITY(64),则优先对数组进行扩容操作,容量为原来的2倍(n<<1)。
  2. 否则就对当前节点进行转换操作(注意这个操作是单线程完成的)。遍历链表节点,把Node转换为TreeNode,然后在通过TreeBin来构造红黑树(红黑树的构造这里就不在详细介绍了)。

3.1.4 tryPresize(int size)

/  Tries to presize table to accommodate the given number of elements.   @param size number of elements (doesnt need to be perfectly accurate) / private final void tryPresize(int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; //未初始化 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } //已达到最大容量 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); //正在进行扩容操作 if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } }}

说明: 当table容量不足时,需要对其进行两倍扩容。tryPresize方法很简单,主要就是用来检查扩容前的必要条件(比如是否超过最大容量),真正的扩容其实也可以叫“节点转移”,主要是通过transfer方法完成。

3.1.5 transfer(Node<K,V> tab, Node<K,V> nextTab)

//转移或复制节点到新的tableprivate final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {    int n = tab.length, stride;    //转移幅度( tab.length/(NCPU8) ),最小为16    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)        stride = MIN_TRANSFER_STRIDE; // subdivide range    if (nextTab == null) {            // initiating        try {            //根据当前数组长度,新建一个两倍长度的数组nextTab            @SuppressWarnings("unchecked")            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];            nextTab = nt;        } catch (Throwable ex) {      // try to cope with OOME            sizeCtl = Integer.MAX_VALUE;            return;        }        nextTable = nextTab;        transferIndex = n;//初始为table的最后一个索引    }    int nextn = nextTab.length;    //初始化ForwardingNode节点,持有nextTab的引用,在处理完每个节点之后当做占位节点,表示该槽位已经处理过了    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);    boolean advance = true;//节点是否已经处理    boolean finishing = false; // to ensure sweep before committing nextTab    //自旋移动每个节点,从transferIndex开始移动stride个节点到新的table。    //i:当前处理的Node索引;bound:需要处理节点的索引边界    for (int i = 0, bound = 0;;) {        //f:当前处理i位置的node; fh:f.hash        Node<K,V> f; int fh;        //通过while循环获取本次需要移动的节点索引i        while (advance) {            //nextIndex:下一个要处理的节点索引; nextBound:下一个需要处理的节点的索引边界            int nextIndex, nextBound;            if (--i >= bound || finishing)//通过--i控制下一个需要移动的节点                advance = false;            //节点已全部转移            else if ((nextIndex = transferIndex) <= 0) {                i = -1;                advance = false;            }            //transferIndex(初值为最后一个节点的索引),表示从transferIndex开始后面所有的节点都已分配,            //每次线程领取扩容任务后,需要更新transferIndex的值(transferIndex-stride)。            //CAS修改transferIndex,并更新索引边界            else if (U.compareAndSwapInt                     (this, TRANSFERINDEX, nextIndex,                      nextBound = (nextIndex > stride ?                                   nextIndex - stride : 0))) {                bound = nextBound;                i = nextIndex - 1;                advance = false;            }        }        if (i < 0 || i >= n || i + n >= nextn) {            int sc;            if (finishing) {//已完成转移,更新相关属性                nextTable = null;                table = nextTab;                sizeCtl = (n << 1) - (n >>> 1);//1.5n 扩容阈值设置为原来容量的1.5倍  依然相当于现在容量的0.75倍                return;            }            //当前线程已经完成转移,但可能还有其他线程正在进行转移操作            //每个线程完成自己的扩容操作后就对sizeCtl-1            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                //判断是否全部任务已经完成,sizeCtl初始值=(rs << RESIZE_STAMP_SHIFT) + 2)                //这里判断如果还有其他线程正在操作,直接返回,否则的话重新初始化i对原tab进行一遍检查然后再提交                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)                    return;                finishing = advance = true;                i = n; // recheck before commit            }        }        else if ((f = tabAt(tab, i)) == null)            advance = casTabAt(tab, i, null, fwd);//i位置节点为空,替换为ForwardingNode节点,用于通知其他线程该位置已经处理        else if ((fh = f.hash) == MOVED)//节点已经被其他线程处理过,继续处理下一个节点            advance = true; // already processed        else {            synchronized (f) {                if (tabAt(tab, i) == f) {//check stable                    //处理当前拿到的节点,构建两个node:ln/hn。ln:原位置; hn:i+n位置                    Node<K,V> ln, hn;                    if (fh >= 0) {//当前为链表节点(fh>=0)                        //使用fn&n把原链表中的元素分成两份(fn&n = n or 0)                        //在表扩容2倍后,索引i可能发生改变,如果原table长度n=2^x,如果hash的x位为1,此时需要加上x位的值,也就是i+n;                        //如果x位为0,索引i不变                        int runBit = fh & n; // n or 0                        //最后一个与头节点f索引不同的节点                        Node<K,V> lastRun = f;                        //从索引i的节点开始向后查找最后一个有效节点                        for (Node<K,V> p = f.next; p != null; p = p.next) {                            int b = p.hash & n;//n or 0                            if (b != runBit) {                                runBit = b;                                lastRun = p;                            }                        }                        if (runBit == 0) {                            ln = lastRun;                            hn = null;                        } else {                            hn = lastRun;                            ln = null;                        }                        //把f链表分解为两个链表                        for (Node<K,V> p = f; p != lastRun; p = p.next) {                            int ph = p.hash; K pk = p.key; V pv = p.val;                            //在原位置                            if ((ph & n) == 0)                                ln = new Node<K,V>(ph, pk, pv, ln);                            //i+n位置                            else                                hn = new Node<K,V>(ph, pk, pv, hn);                        }                        //nextTab的i位置插入一个链表                        setTabAt(nextTab, i, ln);                        //nextTab的i+n位置插入一个链表                        setTabAt(nextTab, i + n, hn);                        //在table的i位置上插入forwardNode节点  表示已经处理过该节点                        setTabAt(tab, i, fwd);                        advance = true;                    }                    /                      如果该节点是红黑树结构,则构造树节点lo和hi,遍历红黑树中的节点,同样是根据hash&tab.length算法,                      把节点分为两类,分别插入索引i和(i+n)位置。                     /                    else if (f instanceof TreeBin) {                        //转为根结点                        TreeBin<K,V> t = (TreeBin<K,V>)f;                        TreeNode<K,V> lo = null, loTail = null;//低位(i)节点和低位尾节点                        TreeNode<K,V> hi = null, hiTail = null;//高位(i+n)节点和高位尾节点                        int lc = 0, hc = 0;                        //从首个节点向后遍历                        for (Node<K,V> e = t.first; e != null; e = e.next) {                            int h = e.hash;                            //构建树节点                            TreeNode<K,V> p = new TreeNode<K,V>                                (h, e.key, e.val, null, null);                            //原位置                            if ((h & n) == 0) {                                if ((p.prev = loTail) == null)                                    lo = p;                                else                                    loTail.next = p;                                loTail = p;                                ++lc;                            }                            //i+n位置                            else {                                if ((p.prev = hiTail) == null)                                    hi = p;                                else                                    hiTail.next = p;                                hiTail = p;                                ++hc;                            }                        }                        //如果扩容后已经不再需要tree的结构 反向转换为链表结构                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :                            (hc != 0) ? new TreeBin<K,V>(lo) : t;                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :                            (lc != 0) ? new TreeBin<K,V>(hi) : t;                        setTabAt(nextTab, i, ln);                        setTabAt(nextTab, i + n, hn);                        setTabAt(tab, i, fwd);                        advance = true;                    }                }            }        }    }}

说明: transfer方法是table扩容的核心实现。由于 ConcurrentHashMap 的扩容是新建一个table,所以主要问题就是如何把旧table的元素转移到新的table上。所以,扩容问题就演变成了“节点转移”问题。首先总结一下需要转移节点(调用transfer)的几个条件:

  1. 对table进行扩容时
  2. 在更新元素数目时(addCount方法),元素总数>=sizeCtl(sizeCtl=0.75n,达到扩容阀值),此时也需要扩容
  3. 在put操作时,发现索引节点正在转移(hash==MOVED),此时需要帮助转移

在进行节点转移之前,首先要做的就是重新初始化sizeCtl的值(sizeCtl = (hash << RESIZE_STAMP_SHIFT) + 2),这个值是一个负值,用于标识当前table正在进行转移操作,并且每有一个线程参与转移,sizeCtl就加1。transfer执行步骤如下(请结合源码注释阅读):

  1. 计算转移幅度stride(或者说是当前线程需要转移的节点数),最小为16;

  2. 创建一个相当于当前 table 两倍容量的 Node 数组,转移完成后用作新的 table;

  3. transferIndex(初始为table.length,也就是 table 的最后一个节点)开始,依次向前处理stride个节点。前面介绍过,table 的每个节点都可能是一个链表结构,因为在 put 的时候是根据(table.length-1)&hash计算出的索引,当插入新值时,如果通过 key 计算出的索引已经存在节点,那么这个新值就放在这个索引位节点的尾部(Node.next)。所以,在进行节点转移后,由于 table.length 变为原来的两倍,所以相应的索引也会改变,这时候就需要对链表进行分割,我们来看一下这个分割算法:

  • 假设当前处理的节点 table[i]=f,并且它是一个链表结构,原table容量为 n=2x,索引计算公式为i=(n - 1)&hash。在表扩张后,由于容量 n 变为 2x+1,所以索引计算就变为i=(2n - 1)&hash。如果 hash 的 x 位为0,则 hash&2x=0,此时 hash&(2x-1)== hash&((2x+1)-1),索引位 i 不变;如果 hash 的 x 位为1,则 hash&2x=2x == n,在扩容后 x 变为 x+1,此时需要加上 x 位的值,即 hash&(2x-1) + hash&2x,也就是 i+n。举个栗子:设 n=100000 (25),x=5,hash 为100101。n-1=011111,那么i=hash&(n-1)=000101;扩容后容量变为m=1000000(26),m-1=0111111,那么 i 就变成了 hash&(m-1)=100101,此时就需要加上 x 位的值,也就是 hash&n。

  • 如果当前节点为红黑树结构,也是利用这个算法进行分割,不同的是,在分割完成之后,如果这两个新的树节点<=6,则调用untreeify方法把树结构转为链表结构。

  1. 最后把操作过的节点都设为 ForwardingNode 节点(hash= MOVED,这样别的线程就可以检测到)。

transfer操作完成后,table的结构变化如下:

扩容之后的table变化

3.1.6 addCount(long,int)

private final void addCount(long x, int check) {    CounterCell[] as; long b, s;    if ((as = counterCells) != null ||        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//counterCells为null,CAS更新base       CounterCell a; long v; int m;        boolean uncontended = true;        if (as == null || (m = as.length - 1) < 0 ||            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||            !(uncontended =              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {            fullAddCount(x, uncontended);//在线程争用资源时,使用fullAddCount计算更新元素数            return;        }        if (check <= 1)            return;        s = sumCount();//计算元素总数,用于后面的扩容操作    }    if (check >= 0) {        //检查扩容        Node<K,V>[] tab, nt; int n, sc;        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&               (n = tab.length) < MAXIMUM_CAPACITY) {            int rs = resizeStamp(n);            //其他线程在进行扩容操作            if (sc < 0) {                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                    transferIndex <= 0)                    break;                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))                    transfer(tab, nt);            }            else if (U.compareAndSwapInt(this, SIZECTL, sc,                                         (rs << RESIZE_STAMP_SHIFT) + 2))                transfer(tab, null);            s = sumCount();        }    }}

说明: put操作全部完成后,别忘了更新元素数量。addCount用来更新 ConcurrentHashMap 的元素数,根据所传参数check决定是否检查扩容,如果需要,调用transfer方法进行扩容/节点转移。这里面有一个看起来比较复杂的方法fullAddCount,作用是在线程争用资源时,使用它来计算更新元素数。这个方法的实现类似于LongAdder的add(LongAdder在上面有简单介绍),源码在此就不再详细分析了,有兴趣的同学可以研究下。

4. 总结

到此,ConcurrentHashMap的分析就告一段落了。总的来说源码比较复杂,真正理解它还是需要一些耐心的。重点是它的数据结构扩容的实现。
ConcurrentHashMap 源码分析到此结束,希望对大家有所帮助,如您发现文章中有不妥的地方,请留言指正,谢谢。

网友评论