JUC并发—8.并发安全集合一

大纲

1.JDK 1.7的HashMap的死循环与数据丢失

2.ConcurrentHashMap的并发安全

3.ConcurrentHashMap的设计介绍

4.ConcurrentHashMap的put操作流程

5.ConcurrentHashMap的Node数组初始化

6.ConcurrentHashMap对Hash冲突的处理

7.ConcurrentHashMap的并发扩容机制

8.ConcurrentHashMap的分段锁统计元素数据

9.ConcurrentHashMap的查询操作是否涉及锁

10.ConcurrentHashMap中红黑树的使用

 

1.JDK 1.7的HashMap的死循环与数据丢失

(1)JDK 1.7的HashMap工作原理

(2)JDK 1.7的HashMap并发下导致的环形链表

(3)环形链表引发的死循环与数据丢失

(4)JDK 1.7和JDK 1.8的HashMap对比

(5)并发安全的集合

 

(1)JDK 1.7的HashMap工作原理

一.Hash算法

put(key, value) => 对key执行Hash算法 => 根据Hash值用类似取模的算法 => 定位数组的某一个元素 => 如果数组元素为空,则将value存放在该数组元素里。

 

二.Hash冲突

如果两个key的Hash值,经过取模算法定位到数组的同一个位置,此时就会用链表处理这种Hash冲突。

 

三.数组扩容

如果数组元素达到了:数组大小 * loadFactor(0.75),此时就会数组扩容。扩容时会按照倍数扩容,首先创建一个两倍大小的新数组。然后遍历原来的数组元素,对每个元素的key值进行Hash运算。接着将Hash运算后的Hash值对新数组大小进行取模,定位到新数组位置。

 

(2)JDK 1.7的HashMap并发下导致的环形链表

多线程并发操作HashMap时,可能会在扩容过程中形成一个环形链表。比如两个线程同时插入一个元素,而此时恰好两个线程同时触发了数组扩容。那么在数组扩容的过程中,就可能会形成一个环形链表。

 

下面是JDK1.7中HashMap扩容的核心源码。进行数组扩容时,会使用头插法来进行链表迁移。如果并发执行的两个线程同时使用头插法进行链表迁移,那么就有可能形成一个环形链表。

//JDK1.7的HashMap扩容的核心方法 void transfer(Entry[] newTable) {     Entry[] src = table;//旧的数组     int newCapacity = newTable.length;     for (int j = 0; j < src.length; j++) {         Entry<K,V> e = src[j];         if (e != null) {             src[j] = null;             do {                 Entry<K,V> next = e.next;                 //线程1执行到这里,假设此时的链表为:newTable[i] = <k1,v1> -> <k2,v2>                 //那么可知:e = <k1,v1>,next = <k2,v2>                 //恰好此时CPU发生了上下文切换,于是切换到线程2去执行扩容                 //线程2扩容时处理完链表的这两个节点后,newTable[i]就变成了:<k2,v2> -> <k1,v1>                 //然后CPU又切换回线程1来执行,由于此时e = <k1,v1>,那么后续代码对e.next赋值后,e就成为环形链表了:                 //也就是e = <k1,v1> -> <k2,v2> -> <k1,v1>,最后e又赋值给newTable[i]                 int i = indexFor(e.hash, newCapacity);//在新数组的位置                 //头插法:刚开始newTable[i]为null,后来newTable[i]变为<k1,v1>;                 //然后当e=<k2,v2>时,这里e设为<k2,v2> -> <k1,v1>,并又赋值给newTable[i]                 //接着遍历链表当e=<k3,v3>时,这里e设为<k3,v3> -> <k2,v2> -> <k1,v1> ...                 e.next = newTable[i];                 newTable[i] = e;                 e = next;             } while (e != null);         }     } }

(3)环形链表引发的死循环与数据丢失

一.环形链表导致死循环

假如执行get(k3)时,k3的Hash取模算法定位到环形链表的位置。于是开始遍历环形链表,但由于环形链表里没有k3的值,所以会导致在环形链表中无法找到k3对应的值进行返回。这样就导致了一直在环形链表中进行死循环,无法退出遍历。最后导致CPU飙升,线上系统被这个get操作卡死。

 

二.环形链表导致丢失数据

上面例子就导致了从出发是无法找到的,因此这条数据就永久丢失了,甚至会被垃圾回收掉。

 

(4)JDK 1.7和JDK 1.8的HashMap对比

在JDK1.7中,HashMap采用数组 + 链表的数据结构来存储数据。在多个线程并发扩容时,可能会造成环形链表最终导致死循环和数据丢失。

 

在JDK1.8中,HashMap采用数组 + 链表 + 红黑树的数据结构来存储数据,并且优化了JDK1.7中的数组扩容方案,解决了死循环和数据丢失的问题。但是在并发场景下调用put()方法时,有可能会存在数据覆盖的问题。

 

(5)并发安全的集合

比如HashTable使用synchronized来保证线程的安全性,比如Collections.synchronizedMap可以把一个线程不安全的Map,通过synchronized的方式,将其变成安全的。

 

但是这些方法在线程竞争激烈的情况下,效率都比较低。因为它们都是在方法层面上使用了synchronized实现的锁机制,从而导致不管是put操作还是get操作都需要去竞争同一把锁。

 

ConcurrentHashMap既能保证并发安全,性能也好于HashTable等集合。

 

2.ConcurrentHashMap的并发安全

(1)如何理解ConcurrentHashMap的并发安全

(2)ConcurrentHashMap在复合操作中的安全问题

(3)ConcurrentMap可解决复合操作的安全问题

(4)ConcurrentMap支持lambda表达式操作

 

(1)如何理解ConcurrentHashMap的并发安全

只能保证多线程并发执行时,容器中的数据不会被破坏。无法保证涉及多个线程的复合操作的正确性,复合操作会有并发安全问题。

 

(2)ConcurrentHashMap在复合操作中的安全问题

假设需要通过一个ConcurrentHashMap来记录每个用户的访问次数。如果指定用户已经有访问次数的记录,则进行递增,否则添加新访问记录。

 

如下代码在多线程并发调用时,会存在并发安全问题。虽然ConcurrentHashMap对于数据操作本身是安全的,但这里是复合操作,也就是"读—修改—写",而这三个操作作为一个整体却不是原子的。所以当多个线程访问同一个用户时,很可能会覆盖相互操作的结果,从而造成该用户的访问记录次数少于实际访问次数。

public class Demo {     private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);      		public static void main(String[] args) throws InterruptedException {         Long accessCount = USER_ACCESS_COUNT.get("张三");         if (accessCount == null) {             USER_ACCESS_COUNT.put("张三", 1L);         } else {             USER_ACCESS_COUNT.put("张三", accessCount + 1);         }     } }

(3)ConcurrentMap可解决复合操作的安全问题

虽然ConcurrentHashMap是并发安全的,但对于其复合操作需要特别关注。上述复合操作的安全问题的解决方案是,可以对复合操作加锁,也可以使用ConcurrentMap接口来解决复合操作的安全问题。

 

ConcurrentMap是一个支持并发访问的Map集合,相当于在原本的Map集合上新增了一些方法来扩展Map的功能。

 

ConcurrentMap接口定义的如下4个方法,都能满足原子性的,可以用在ConcurrentHashMap的复合操作场景中。

//A java.util.Map providing thread safety and atomicity guarantees. public interface ConcurrentMap<K, V> extends Map<K, V> {     ...     //向ConcurrentHashMap集合插入数据     //如果插入数据的key不存在于集合中,则保存当前数据并且返回null     //如果key已经存在,则返回存在的key对应的value     V putIfAbsent(K key, V value);          //根据key和value来删除ConcurrentHashMap集合中的元素     //该删除操作必须保证key和value完全匹配,删除成功则返回true,否则返回false     boolean remove(Object key, Object value);          //根据key和oldValue来替换ConcurrentHashMap中已经存在的值,新的值是newValue     //该替换操作必须保证key和oldValue玩去匹配,替换成功则返回true,否则返回false     boolean replace(K key, V oldValue, V newValue);          //和replace(key, oldValue, newValue)不同之处在于,少了对oldValue的判断     //如果替换成功,则返回替换之前的value,否则返回null     V replace(K key, V value);     ... }

因此,可以基于ConcurrentMap提供的接口对上述Demo进行改造。将原来ConcurrentHashMap第一次的put()方法替换为putIfAbsent()方法,将原来ConcurrentHashMap修改用的put()方法替换为replace()方法。由于putIfAbsent()方法和replace()方法都能保证原子性,所以并发安全了。同时增加一个while(true)方法以实现一个类似自旋的操作,确保操作成功。

public class KeyUtil {     private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);      public static void main(String[] args) throws InterruptedException {         while (true) {             Long accessCount = USER_ACCESS_COUNT.get("张三");             if (accessCount == null) {                 if (USER_ACCESS_COUNT.putIfAbsent("张三", 1L) == null) {                     break;                 }             } else {                 if (USER_ACCESS_COUNT.replace("张三", accessCount, accessCount + 1)) {                     break;                 }             }         }     } }

(4)ConcurrentMap支持lambda表达式操作

一.computeIfAbsent()方法

该方法通过判断传入的key是否存在来对ConcurrentMap进行数据初始化。如果key存在,则不做任何处理。如果key不存在,则调用mappingFunction计算出value值添加到Map。

//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1 USER_ACCESS_COUNT.computeIfAbsent("张三", k -> 1L);

二.computeIfPresent()方法

该方法对已经存在的key对应的value值进行修改。如果key不存在,则返回null。如果key存在,则调用mappingFunction计算出value值修改Map。

//如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码: USER_ACCESS_COUNT.computeIfPresent("张三", (k,v) -> v + 1);

三.compute()方法

compute()方法是computeIfAbsent()方法和computeIfPresent()方法的结合体。不管key是否存在,都会调用mappingFunction计算出value值。如果key存在,则对value进行修改。如果key不存在,则进行初始化处理。

//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1 USER_ACCESS_COUNT.computeIfAbsent("张三", k -> 1L);  //如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码: USER_ACCESS_COUNT.computeIfPresent("张三", (k,v) -> v + 1);  //如果张三这个用户存在,则对其value加1,否则初始化其值为1 USER_ACCESS_COUNT.compute("张三", (k,v) -> (v == null) ? 1L : v + 1);

 

3.ConcurrentHashMap的设计介绍

(1)JDK1.8相比于JDK1.7的改进

(2)ConcurrentHashMap的设计思想

(3)ConcurrentHashMap的数据结构定义

 

(1)JDK1.8相比于JDK1.7的改进

一.取消了segment分段设计,直接使用Node数组来保存数据

采用Node数组元素作为锁的粒度,进一步减少并发冲突的范围和概率。

 

二.引入红黑树设计

红黑树降低了极端情况下查询某个结点数据的时间复杂度,从O(n)降低到了O(logn),提升了查找性能。

 

(2)ConcurrentHashMap的设计思想

一.通过对数组元素加锁来降低锁的粒度

二.多线程进行并发扩容

三.高低位迁移方法

四.链表转红黑树及红黑树转链表

五.分段锁来实现数据统计

 

(3)ConcurrentHashMap的数据结构定义

ConcurrentHashMap采用Node数组来存储数据,数组长度默认为16。Node表示数组中的一个具体的数据结点,并且实现了Map.Entry接口。Node的key和val属性,表示实际存储的key和value。Node的hash属性,表示当前key对应的hash值。Node的next属性,表示如果是链表结构,则指向下一个Node结点。

 

当链表长度大于等于8 + Node数组长度大于64时,链表会转为红黑树,红黑树的存储使用TreeNode来实现。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //The default initial table capacity.     //Must be a power of 2 (i.e., at least 1) and at most MAXIMUM_CAPACITY.     private static final int DEFAULT_CAPACITY = 16;          //The array of bins. Lazily initialized upon first insertion.     //Size is always a power of two. Accessed directly by iterators.     transient volatile Node<K,V>[] table;//用来存储ConcurrentHashMap数据的Node数组          //Key-value entry.       //This class is never exported out as a user-mutable Map.Entry      //(i.e., one supporting setValue; see MapEntry below),      //but can be used for read-only traversals used in bulk tasks.     //Subclasses of Node with a negative hash field are special,      //and contain null keys and values (but are never exported).       //Otherwise, keys and vals are never null.     static class Node<K,V> implements Map.Entry<K,V> {         final int hash;//当前key对应的hash值         final K key;//实际存储的key         volatile V val;//实际存储的value         volatile Node<K,V> next;//如果是链表结构,则指向下一个Node结点               Node(int hash, K key, V val, Node<K,V> next) {             this.hash = hash;             this.key = key;             this.val = val;             this.next = next;         }         ...     }          //Nodes for use in TreeBins     static final class TreeNode<K,V> extends Node<K,V> {         TreeNode<K,V> parent;//red-black tree links         TreeNode<K,V> left;         TreeNode<K,V> right;         TreeNode<K,V> prev;//needed to unlink next upon deletion         boolean red;           TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) {             super(hash, key, val, next);             this.parent = parent;         }         ...     }     ... }

 

4.ConcurrentHashMap的put操作流程

(1)ConcurrentHashMap的put操作流程

(2)ConcurrentHashMap和HashMap的put操作

(3)为什么ConcurrentHashMap是并发安全的

 

(1)ConcurrentHashMap的put操作流程

首先通过key的hashCode的高低16位的位与操作来计算key的hash值,让32位的hashCode都参与运算以降低数组大小小于32时哈希冲突的概率。

 

然后判断Node数组是否为空或者Node数组的长度是否为0。如果为空或者为0,则调用initTable()方法进行初始化。如果不为空,则通过hash & (n - 1)计算当前key在Node数组中的下标位置。并通过tabAt()方法获取该位置的值f,然后判断该位置的值f是否为空。

 

如果该位置的值f为空,则把当前的key和value封装成Node对象。然后尝试通过casTabAt()方法使用CAS设置该位置的值f为封装好的Node对象。如果CAS设置成功,则退出for循环,否则继续进行下一次for循环。

 

如果该位置的值f不为空,则判断Node数组是否正处于扩容中。如果是,那么当前线程就调用helpTransfer()方法进行并发扩容。如果不是,那么说明当前的key在Node数组中出现了Hash冲突。于是通过synchronized关键字,对该位置的值f进行Hash冲突处理。其实JUC还可以继续优化,比如先用CAS尝试修改哈希冲突下的链表或红黑树。如果CAS修改失败,那么再通过使用synchronized对该数组元素加锁来进行处理。

 

最后,会调用addCount()方法统计Node数组中的元素个数。

public class ConcurrentHashMapDemo {     public static void main(String[] args) {         ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>();         map.put("k1", "v1");     } }  public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //The array of bins. Lazily initialized upon first insertion.     //Size is always a power of two. Accessed directly by iterators.     transient volatile Node<K,V>[] table;          //Creates a new, empty map with the default initial table size (16).     public ConcurrentHashMap() {              }          //Creates a new, empty map with an initial table size      //accommodating the specified number of elements without the need to dynamically resize.     //@param initialCapacity The implementation performs internal sizing to accommodate this many elements.     public ConcurrentHashMap(int initialCapacity) {         if (initialCapacity < 0) {             throw new IllegalArgumentException();         }         int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));         this.sizeCtl = cap;     }          //Returns a power of two table size for the given desired capacity.     private static final int tableSizeFor(int c) {         int n = c - 1;         n |= n >>> 1;         n |= n >>> 2;         n |= n >>> 4;         n |= n >>> 8;         n |= n >>> 16;         return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;     }          static final int spread(int h) {         //通过hashCode的高低16位的异或运算来计算hash值,以降低数组大小比32小的时候的哈希冲突概率         return (h ^ (h >>> 16)) & HASH_BITS;     }          //Maps the specified key to the specified value in this table.     //Neither the key nor the value can be null.     public V put(K key, V value) {         return putVal(key, value, false);     }          //获取Node数组在位置i的元素,通过Unsafe类让数组中的元素具有可见性     //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的     //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);     }          //CAS设置Node数组的元素为某个Node对象     static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {         return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);     }          final V putVal(K key, V value, boolean onlyIfAbsent) {         if (key == null || value == null) {             throw new NullPointerException();         }                //通过key的hashCode的高低16位的位与操作来计算hash值         int hash = spread(key.hashCode());         int binCount = 0;                 //这是一个没有结束条件的for循环,用来自旋         //其中Node数组的引用赋值给了tab变量         for (Node<K,V>[] tab = table;;) {             Node<K,V> f; int n, i, fh;             if (tab == null || (n = tab.length) == 0) {                 //调用initTable()方法初始化Node数组                 tab = initTable();             } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                 //如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环                 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {                     break;// no lock when adding to empty bin                 }             } else if ((fh = f.hash) == MOVED) {                 //如果发现Node数组正处于扩容中,那么就进行并发扩容                 tab = helpTransfer(tab, f);             } else {                 V oldVal = null;                 //处理Hash冲突                 synchronized (f) {                     if (tabAt(tab, i) == f) {                         if (fh >= 0) {//如果是链表                             binCount = 1;                             for (Node<K,V> e = f;; ++binCount) {                                 K ek;                                 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {                                     oldVal = e.val;                                     if (!onlyIfAbsent) {                                         e.val = value;                                     }                                     break;                                 }                                 Node<K,V> pred = e;                                 if ((e = e.next) == null) {                                     pred.next = new Node<K,V>(hash, key, value, null);                                     break;                                 }                             }                         } else if (f instanceof TreeBin) {//如果是红黑树                             Node<K,V> p;                             binCount = 2;                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {                                 oldVal = p.val;                                 if (!onlyIfAbsent) {                                     p.val = value;                                 }                             }                         }                     }                 }                 if (binCount != 0) {                     //如果链表的元素大于等于8                     if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8                         treeifyBin(tab, i);//链表转红黑树                     }                     if (oldVal != null) {                         return oldVal;                     }                     break;                 }             }         }         //调用addCount()方法统计Node数组元素的个数         addCount(1L, binCount);         return null;     }     ... }

(2)ConcurrentHashMap和HashMap的put操作

都是通过key的hashCode的高低16位的异或运算,来降低Hash冲突概率。

 

都是通过Hash值与数组大小-1的位与运算(取模),来定位key在数组的位置。

 

但ConcurrentHashMap使用了自旋 + CAS + synchronized来处理put操作,从而保证了多个线程对数组里某个key进行赋值时的效率 + 并发安全性。

public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {     static final int TREEIFY_THRESHOLD = 8;//链表转红黑树的阈值     ...          public V put(K key, V value) {         return putVal(hash(key), key, value, false, true);     }        final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {         Node<K,V>[] tab; Node<K,V> p; int n, i;         if ((tab = table) == null || (n = tab.length) == 0) {             n = (tab = resize()).length;         }         if ((p = tab[i = (n - 1) & hash]) == null) {             //如果通过哈希寻址算法定位到的下标为i的数组元素为空(即tab[i]为空)             //那么就可以直接将一个新创建的Node对象放到数组的tab[i]这个位置;             tab[i] = newNode(hash, key, value, null);         } else {             Node<K,V> e; K k;             if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) {                 //通过哈希寻址算法定位到的数组位置已有Node元素                 //那么判断是否为相同的key,如果是相同的key则进行value覆盖                 e = p;             } else if (p instanceof TreeNode) {                 //通过哈希寻址算法定位到的数组位置已有Node元素,而且不是相同的key                 //那么通过"p instanceof TreeNode)",判断数组的tab[i]元素是否是一颗红黑树                 e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);             } else {                 ...             }             ...         }         ++modCount;         //判断数组大小size,是否已经达到了扩容阈值threshold大小         if (++size > threshold) {             resize();         }         afterNodeInsertion(evict);         return null;     }     ... }

(3)为什么ConcurrentHashMap是并发安全的

首先在初始化Node数组时,会通过自旋 + CAS去设置sizeCtl的值来获得锁。然后在put()操作时,也会通过自旋 + CAS去设置数组某个位置的值。当出现Hash冲突时,则使用synchronized关键字来修改数组某个位置的值。

 

5.ConcurrentHashMap的Node数组初始化

(1)调用put()方法时才初始化Node数组

(2)initTable()方法的初始化逻辑

(3)sizeCtl的状态流转

 

(1)调用put()方法时才初始化Node数组

Node数组的初始化过程是被动的,当调用ConcurrentHashMap.put()方法时,如果发现Node数组还没有被初始化,才会调用initTable()方法完成初始化。

 

(2)initTable()方法的初始化逻辑

initTable()方法和一般的初始化方法不同,因为需要考虑多线程并发情形。

 

首先while循环的退出条件是Node数据即table初始化成功,否则一直循环。这其实就使用到了自旋的机制,因为多个线程调用initTable()必然会竞争。而在竞争的情况下如果不采用独占锁机制,就只能通过自旋来不断重试。

 

然后通过sizeCtl是否小于0来判断当前是否有其他线程正在进行初始化。如果有,则通过Thread.yield()把自己变成就绪状态,释放CPU资源。如果没有,则通过CAS修改sizeCtl变量的值为-1。

 

如果CAS修改sizeCtl成功,则表示当前线程获取初始化Node数组的锁成功了;

如果CAS修改sizeCtl失败,则表示当前线程获取初始化Node数组的锁失败了;

 

对于获取锁失败的线程,会继续进入下一次while循环进行重试,这样设计是为了避免出现多个线程同时初始化Node数组。

 

对于获取锁成功的线程,首先会判断Node数组是否已经初始化完成。如果Node数组已经初始化完成,则退出while循环。如果Node数组还是空,则创建一个Node数组,然后赋值给table变量。并且计算下次扩容的阈值(0.75倍当前数组容量),然后赋值给sizeCtl。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      //sizeCtl = -1,表示当前有线程抢占到了初始化Node数组的资格,正在初始化Node数组     //sizeCtl < -1,用sizeCtl值的二进制低16位来记录当前参与扩容的线程数量     //sizeCtl = 0,表示Node数组未初始化,并且在ConcurrentHashMap构造方法中没有指定初始容量     //sizeCtl > 0,如果Node数组已经初始化,那么sizeCtl表示扩容的阈值(初始容量 * 0.75),如果未初始化,则表示数组的初始容量     private transient volatile int sizeCtl;     private static final long SIZECTL;        static {         try {             U = sun.misc.Unsafe.getUnsafe();//获取UnSafe对象             Class<?> k = ConcurrentHashMap.class;             SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));//获取sizeCtl变量的偏移量             ...         } catch (Exception e) {             throw new Error(e);         }     }     ...          //初始化Node数组     //Initializes table, using the size recorded in sizeCtl.     private final Node<K,V>[] initTable() {         Node<K,V>[] tab; int sc;         //退出while循环的条件是Node数组即table初始化成功         while ((tab = table) == null || tab.length == 0) {             //判断当前是否有其他线程正在进行初始化             if ((sc = sizeCtl) < 0) {                 //如果有,则通过Thread.yield()把自己变成就绪状态,释放CPU资源                 Thread.yield();//lost initialization race; just spin             } else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {                 //如果没有线程正在进行初始化,则通过CAS修改sizeCtl变量的值为-1                 //如果CAS修改成功,则表示当前线程获得了初始化数组的锁                 //如果CAS修改失败,则表示当前线程获取初始化数组的锁失败                 try {                     //再次判断Node数组是否为空,即Node数组是否已经初始化完成                     //因为执行Thread.yield()让出CPU资源的线程必然会再次执行到这里                     if ((tab = table) == null || tab.length == 0) {                         int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                         @SuppressWarnings("unchecked")                         //初始化大小为n的Node数组,然后赋值给tab变量和table变量                         Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                         //赋值给ConcurrentHashMap的全局Node数组table                         table = tab = nt;                         //计算下次扩容的阈值,阈值的计算是当前数组容量的0.75倍                         sc = n - (n >>> 2);                     }                 } finally {                     //最后将扩容的阈值赋值给sizeCtl                     sizeCtl = sc;                 }                 break;             }         }         return tab;     }     ... }

(3)sizeCtl的状态流转

一.sizeCtl = -1

表示当前有线程抢占到了初始化Node数组的资格,正在初始化Node数组。

二.sizeCtl < -1

用sizeCtl值的二进制低16位来记录当前参与扩容的线程数量。

三.sizeCtl = 0

表示Node数组未初始化,且创建ConcurrentHashMap时没有指定初始容量。

四.sizeCtl > 0

如果Node数组已经初始化,那么sizeCtl表示扩容的阈值(初始容量 * 0.75)。如果Node数组未初始化,则表示数组的初始容量。

 

JUC并发—8.并发安全集合一

 

6.ConcurrentHashMap对Hash冲突的处理

(1)Hash冲突的几个解决方案

(2)ConcurrentHashMap对Hash冲突的处理

(3)链表长度大于8时是扩容还是转化为红黑树

 

(1)Hash冲突的几个解决方案

一.开放寻址法

如果位置i被占用,那么就探查i+1、i+2、i+3的位置。ThreadLocal采用的就是开放寻址法。

 

二.链式寻址法

Hash表的每个位置都连接一个链表。当发生Hash冲突时,冲突的元素会被加入到这个位置的链表中。ConcurrentHashMap就是基于链式寻址法解决Hash冲突的。

 

三.再Hash法

提供多个不同的Hash函数,当发生Hash冲突时,使用第二个、第三个等。

 

(2)ConcurrentHashMap对Hash冲突的处理

首先使用synchronized对当前位置的Node对象f进行加锁。由于这种锁控制在数组的单个数据元素上,所以长度为16的数组理论上就可以支持16个线程并发写入数据。

 

然后判断当前位置的Node对象f是链表还是红黑树。如果是链表,那么就把当前的key/value封装成Node对象插入到链表的尾部。如果是红黑树,那么就调用TreeBin的putTreeVal()方法往红黑树插入结点。

 

最后判断链表的长度是否大于等于8,如果链表的长度大于等于8,再调用treeifyBin()方法决定是扩容数组还是将链表转化为红黑树。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //The array of bins. Lazily initialized upon first insertion.     //Size is always a power of two. Accessed directly by iterators.     transient volatile Node<K,V>[] table;          //Maps the specified key to the specified value in this table.     //Neither the key nor the value can be null.     public V put(K key, V value) {         return putVal(key, value, false);     }          //获取Node数组在位置i的元素     //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的     //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);     }          //CAS设置Node数组的元素为某个Node对象     static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {         return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);     }          final V putVal(K key, V value, boolean onlyIfAbsent) {         if (key == null || value == null) {             throw new NullPointerException();         }                //通过key的hashCode的高低16位的位与操作来计算hash值         int hash = spread(key.hashCode());         int binCount = 0;                 //这是一个没有结束条件的for循环,用来自旋         //其中Node数组的引用赋值给了tab变量         for (Node<K,V>[] tab = table;;) {             Node<K,V> f; int n, i, fh;             if (tab == null || (n = tab.length) == 0) {                 //调用initTable()方法初始化Node数组                 tab = initTable();             } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                 //如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环                 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {                     break;// no lock when adding to empty bin                 }             } else if ((fh = f.hash) == MOVED) {                 //发现Node数组正处于扩容中,那么就进行并发扩容                 tab = helpTransfer(tab, f);             } else {                 V oldVal = null;                 //处理Hash冲突                 synchronized (f) {//使用synchronized对当前数组位置的Node对象f进行加锁                     if (tabAt(tab, i) == f) {                         if (fh >= 0) {//如果是链表                             binCount = 1;//binCount用来记录链表的长度                             //从链表的头结点开始遍历每个结点                             for (Node<K,V> e = f;; ++binCount) {                                 K ek;                                 //如果存在相同的key,则修改该key的value                                 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {                                     oldVal = e.val;                                     if (!onlyIfAbsent) {                                         e.val = value;                                     }                                     break;                                 }                                 Node<K,V> pred = e;                                 //找到链表的最后一个结点                                 if ((e = e.next) == null) {                                     //把当前的key/value封装成Node对象插入到链表的尾部                                     pred.next = new Node<K,V>(hash, key, value, null);                                     break;                                 }                             }                         } else if (f instanceof TreeBin) {//如果是红黑树                             Node<K,V> p;                             binCount = 2;                             //调用TreeBin的putTreeVal()方法往红黑树插入结点                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {                                 oldVal = p.val;                                 if (!onlyIfAbsent) {                                     p.val = value;                                 }                             }                         }                     }                 }                 //最后判断链表的长度是否大于等于8                 if (binCount != 0) {                     //如果链表的长度大于等于8,再调用treeifyBin()方法决定是扩容数组还是转化为红黑树                     if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8                         treeifyBin(tab, i);//是扩容数组还是转化为红黑树                     }                     if (oldVal != null) {                         return oldVal;                     }                     break;                 }             }         }         //调用addCount()方法统计Node数组元素的个数         addCount(1L, binCount);         return null;     }     ... }

(3)链表长度大于8时是扩容还是转化为红黑树

当链表长度 >= 8时ConcurrentHashMap会对链表采用两种方式进行优化。

 

方式一:对数组进行扩容

当数组长度 <= 64,且链表长度 >= 8时,优先选择对数组进行扩容。

 

方式二:把链表转化为红黑树

当数组长度 > 64,且链表长度 >= 8时,会将链表转化为红黑树。

 

treeifyBin()方法的作用是根据相关阈值来决定是扩容还是把链表转为红黑树。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      static final int MIN_TREEIFY_CAPACITY = 64;     ...     //Replaces all linked nodes in bin at given index unless table is too small, in which case resizes instead.     private final void treeifyBin(Node<K,V>[] tab, int index) {         Node<K,V> b; int n, sc;         if (tab != null) {             //如果当前数组的长度小于64,则调用tryPresize()方法进行数组扩容             if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {                 tryPresize(n << 1);//数组扩容             } else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {                 synchronized (b) {                     if (tabAt(tab, index) == b) {                         TreeNode<K,V> hd = null, tl = null;                         for (Node<K,V> e = b; e != null; e = e.next) {                             //构建一个TreeNode并插入红黑树中                             TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);                             if ((p.prev = tl) == null) {                                 hd = p;                             } else {                                 tl.next = p;                             }                             tl = p;                         }                         setTabAt(tab, index, new TreeBin<K,V>(hd));                     }                 }             }         }     }     ... }

 

7.ConcurrentHashMap的并发扩容机制

(1)ConcurrentHashMap扩容的前置操作

(2)ConcurrentHashMap并发扩容的机制

(3)ConcurrentHashMap并发扩容的流程

 

(1)ConcurrentHashMap扩容的前置操作

ConcurrentHashMap的tryPresize()方法用于处理数组扩容前的前置操作,该方法主要分为四部分。

 

第一部分:

首先通过tableSizeFor()方法计算传入size的最小的2的幂次方。

 

第二部分:

然后判断Node数组是否已初始化,如果还没初始化则要先进行初始化。初始化时会计算扩容阈值为数组大小的0.75倍 + 将扩容阈值赋值给sizeCtl。

 

第三部分:

如果Node数组已经初始化,则判断是否需要进行扩容。如果Node数组已经被其他线程完成扩容,则当前线程退出循环,无需扩容。如果Node数组已达到最大容量,则无法再进行扩容,也需退出循环。

 

第四部分:

调用transfer()方法开始执行扩容操作。如果sizeCtl < 0,说明此时已经有其他线程在执行扩容了。如果sizeCtl >= 0,说明此时没有其他线程进行扩容。当前线程都会先通过CAS成功设置sizeCtl后,再调用transfer()方法来扩容。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //Returns a power of two table size for the given desired capacity.     private static final int tableSizeFor(int c) {         int n = c - 1;         n |= n >>> 1;         n |= n >>> 2;         n |= n >>> 4;         n |= n >>> 8;         n |= n >>> 16;         return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;     }      //Tries to presize table to accommodate the given number of elements.     private final void tryPresize(int size) {         //一.首先通过tableSizeFor()方法计算传入size的最小的2的幂次方         int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);         int sc;         while ((sc = sizeCtl) >= 0) {             Node<K,V>[] tab = table; int n;             //二.判断Node数组是否已经初始化,如果还没初始化,需要先进行初始化             if (tab == null || (n = tab.length) == 0) {                 n = (sc > c) ? sc : c;                 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {                     try {                         if (table == tab) {                             @SuppressWarnings("unchecked")                             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                             table = nt;                             sc = n - (n >>> 2);//扩容阈值为数组大小的0.75倍                         }                     } finally {                         sizeCtl = sc;//将扩容阈值赋值给sizeCtl                     }                 }             }             //三.如果Node数组已经初始化,则判断是否需要进行扩容             else if (c <= sc || n >= MAXIMUM_CAPACITY) {                 //c <= sc,说明Node数组已经被其他线程完成扩容了,不需要再进行扩容                 //n >= MAXIMUM_CAPACITY,说明Node数组已达到最大容量,无法再进行扩容                 break;             }             //四.调用transfer()方法开始执行扩容操作             else if (tab == table) {                 int rs = resizeStamp(n);                 //如果sc < 0,说明此时已经有其他线程在执行扩容了                 //于是当前线程可以先通过CAS成功设置sizeCtl的值后,再调用transfer()方法协助扩容                 if (sc < 0) {                     Node<K,V>[] nt;                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) {                          break;                     }                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {                         transfer(tab, nt);                     }                 }                 //如果sc >= 0,说明此时没有其他线程进行扩容                 //于是当前线程也是先通过CAS成功设置sizeCtl的值后,再调用transfer()方法进行扩容                 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) {                     transfer(tab, null);                 }             }         }     }     ... }

(2)ConcurrentHashMap并发扩容的机制

一.ConcurrentHashMap中的扩容设计

二.多线程并发扩容的原理

 

一.ConcurrentHashMap中的扩容设计

扩容就是创建一个2倍原大小的数组,然后把原数组的数据迁移到新数组中。但多线程环境下的扩容,需要考虑其他线程会同时往数组添加元素的情况。如果简单地对扩容过程增加一把同步锁,保证扩容过程不存在其他线程操作,那么就会对性能的损耗特别大,特别是数据量比较大时,阻塞的线程会很多。

 

首先使用CAS来实现计算每个线程的迁移区间。然后使用synchronized把锁粒度控制到每个数组元素上。如果数组有16个元素就有16把锁,如果数组有32个元素就有32把锁。接着如果线程A在进行数组扩容时,线程B要修改数组的某个元素f。那么就让修改元素的线程加入迁移,从而实现多线程并发扩容来提高效率。等数组扩容完成后,线程B才继续去修改元素f。最后通过高低位迁移逻辑计算出高位链和低位链,大大减少了数据迁移次数。

 

二.多线程并发扩容的原理

当存在多个线程并发扩容及数据迁移时,默认会给每个线程分配一个区间。这个区间的默认长度是16,每个线程会负责自己区间内的数据迁移工作。如果只有两个线程对长度为64的数组迁移数据,则每个线程要做2次迁移,迁移过程会依赖transferIndex来更新每个线程的迁移区间。

 

(3)ConcurrentHashMap并发扩容的流程

ConcurrentHashMap的transfer()方法用于处理数组扩容时的流程细节,该方法主要分为五部分:

第一部分:创建扩容后的数组

第二部分:计算当前线程的数据迁移区间

第三部分:更新扩容标记advance

第四部分:开始数据迁移和扩容

第五部分:完成迁移后的判断

 

第一部分:创建扩容后的数组

这部分代码主要做两件事情:

一.计算每个线程处理的迁移区间长度,默认是16。

二.初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable。该数组的长度是原数组的2倍,并且设置transferIndex的值为为原数组大小。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      //The next table to use; non-null only while resizing.     private transient volatile Node<K,V>[] nextTable;     ...          //Moves and/or copies the nodes in each bin to new table.      //tab是原数组,nextTab是扩容后的数组     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {         int n = tab.length, stride;         //计算每个线程处理的迁移区间长度,默认是16         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {             stride = MIN_TRANSFER_STRIDE;//subdivide range         }         //初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable,该数组的长度是原数组的2倍         //并且设置transferIndex的值为为原数组大小         if (nextTab == null) {//initiating             try {                 @SuppressWarnings("unchecked")                 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容为2n                 nextTab = nt;//将创建的扩容数组赋值给nextTab             } catch (Throwable ex) {//try to cope with OOME                 sizeCtl = Integer.MAX_VALUE;                 return;             }             nextTable = nextTab;//将创建的扩容数组赋值给nextTable             transferIndex = n;//设置transferIndex为原来的数组大小         }         ...     } }

第二部分:计算当前线程的数据迁移区间

下面的while循环会计算每个执行到此处的线程需要负责的数据迁移区间。假设当前数组长度是32,需要扩容到64。那么此时transferIndex = 32,nextn = 64,n = 32。

 

当前线程第一次for循环:nextIndex被transferIndex赋值为32,之后CAS修改transferIndex。CAS修改成功后,nextBound = 32 - 16 = 16,transferIndex = 16。所以bound = 16,i = 31,当前线程负责的迁移区间为[bound, i] = [16, 31]。

 

当前线程第二次for循环,或者有其他线程进来第一次for循环:由于此时transferIndex = 16,所以nextIndex会被transferIndex赋值为16。之后CAS修改transferIndex为0,修改成功后,nextBound = 16 - 16 = 0。所以bound = 0,i = 15,此时线程负责的迁移区间为[bound, i] = [0, 15]。

 

需要注意的是:每次循环都会通过if (--i >= bound || finishing)判断区间是否已迁移完成。如果已完成,则会继续进入while循环中的CAS,获取新的迁移区间。

 

数组从高位往低位进行迁移,比如第一次for循环,处理的区间是[16, 31]。那么就会从位置为31开始往前进行遍历,对每个数组元素进行数据迁移。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      //The next table to use; non-null only while resizing.     private transient volatile Node<K,V>[] nextTable;     ...          //Moves and/or copies the nodes in each bin to new table.      //tab是原数组,nextTab是扩容后的数组     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {         ...         int nextn = nextTab.length;//扩容后的数组长度         //继承自Node的ForwardingNode表示一个正在被迁移的Node         //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);         //advance字段用来判断是否还有待处理的数据迁移工作,也就是扩容标记         boolean advance = true;         boolean finishing = false; // to ensure sweep before committing nextTab         //当前线程负责的迁移区间是[bound, i]         for (int i = 0, bound = 0;;) {             Node<K,V> f; int fh;             //while循环会计算每个执行到此处的线程需要负责的数据迁移区间             while (advance) {                 //假设当前数组长度是32,需要扩容到64;                 //那么此时transferIndex = 32,nextn = 64,n = 32;                 //刚开始循环时i = 0,nextIndex被transferIndex赋值为32                 int nextIndex, nextBound;                 if (--i >= bound || finishing) {                     //一开始i = bound = 0,所以不会进入这里,而是进入U.compareAndSwapInt()的条件中                     //但后来bound = 16, i = 31后,就会进入这里,退出循环                     //此后,每次--i,当i = bound = 16时,就又会进入U.compareAndSwapInt()的条件中,重新获取数据迁移区间                     advance = false;                 } else if ((nextIndex = transferIndex) <= 0) {                     //判断当前线程是否已经分配到了新的迁移区间                     i = -1;                     advance = false;                 } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {                      //如果CAS设置transferIndex从nextIndex=32变为nextIndex-16=16成功后,                     //那么advance设置为false,退出while循环                     //当前线程再次进入while循环或者其他线程也进入while循环,就会从transferIndex = 16开始,计算要负责的迁移区间                     bound = nextBound;//第一次for循环nextBound = 16                     i = nextIndex - 1;//第一次for循环i = 31                     advance = false;                 }             }             ...         }         ...     }          //A node inserted at head of bins during transfer operations.     static final class ForwardingNode<K,V> extends Node<K,V> {         final Node<K,V>[] nextTable;         ForwardingNode(Node<K,V>[] tab) {             super(MOVED, null, null, null);             this.nextTable = tab;         }         ...     } }

第三部分:更新扩容标记advance

如果位置i的数组元素Node为空,说明该Node对象不需要迁移。所以通过casTabAt()方法修改原数组在位置i的元素为fwd对象,这样其他线程在进行put()操作的时候就可以发现当前数组正在扩容。

 

如果位置i的数组元素Node的hash值为MOVED,那么说明该Node对象已经被迁移了。所以设置扩容标记位advance为true,等下次for循环时进入while循环--i。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      //The next table to use; non-null only while resizing.     private transient volatile Node<K,V>[] nextTable;          //The array of bins. Lazily initialized upon first insertion.     //Size is always a power of two. Accessed directly by iterators.     transient volatile Node<K,V>[] table;          ...     //Moves and/or copies the nodes in each bin to new table.      //tab是原数组,nextTab是扩容后的数组     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {         ...         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);         ...         //当前线程负责的迁移区间是[bound, i]         for (int i = 0, bound = 0;;) {             ...             } else if ((f = tabAt(tab, i)) == null) {                 //第三部分开始:更新扩容标记advance,这样其他线程在put()的时候就可以发现当前数组正在扩容                 advance = casTabAt(tab, i, null, fwd);             } else if ((fh = f.hash) == MOVED) {                 //设置扩容标记位advance为true,等下次for循环时进入while循环--i                 advance = true; // already processed                 //第三部分结束             } else {             ...         }         ...     }          //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的      //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);     }      static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {         return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);     } }

第四部分:开始数据迁移和扩容

首先对当前要迁移的Node结点f添加同步锁synchronized,避免多线程竞争。

 

如果结点f的哈希值大于0,则表示Node结点f为链表或普通结点,那么此时就需要按照链表或普通结点的方式来进行数据迁移。

 

如果结点f属于TreeBin类型,则表示结点f为红黑树,那么此时就要按红黑树的规则进行数据迁移。

 

需要注意的是,数据迁移之后可能会存在红黑树转化为链表的情况,当链表长度小于等于6时,红黑树就会转化为链表。

 

接着使用高位链和低位链的计算方法构造高位链和低位链,遍历链表的每一个结点,计算p.hash & n的值。如果值为0,表示需要迁移,属于高位链;否则不需要迁移,属于低位链。

 

比如在数组长度为16的一个链表中,hash值为:4, 20, 52, 68, 84, 100。经过hash & (n - 1)得到的下标位置都是4,接着数组长度需要扩容到32。于是经过hash & (n - 1)计算,发现20, 52, 84对应的下标变成了20。这就意味着,这个链表中hash值为20, 52, 84的结点需要迁移到位置20。

 

最后把低位链设置到扩容后的数组的位置i,把高位链设置到位置i + n。此时当前线程已处理完位置为i的数据迁移,于是设置advance为true,让后续的for循环可以进入while循环来实现对i的递减继续迁移数据。

 

第五部分:完成迁移后的判断

如果数据迁移完成了,则把扩容后的数组赋值给table。如果还没完成数据迁移,则通过CAS修改并发扩容的线程数。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      //The next table to use; non-null only while resizing.     private transient volatile Node<K,V>[] nextTable;     ...          //Moves and/or copies the nodes in each bin to new table.      private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {         //第一部分开始:创建扩容后的数组         int n = tab.length, stride;//n就是原数组大小         //计算每个线程处理的迁移区间长度,默认是16         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {             stride = MIN_TRANSFER_STRIDE;//subdivide range         }         //初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable,该数组的长度是原数组的2倍         //并且设置transferIndex的值为为原数组大小         if (nextTab == null) {//initiating             try {                 @SuppressWarnings("unchecked")                 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容为2n                 nextTab = nt;//将创建的扩容数组赋值给nextTab             } catch (Throwable ex) {//try to cope with OOME                 sizeCtl = Integer.MAX_VALUE;                 return;             }             nextTable = nextTab;//将创建的扩容数组赋值给nextTable             transferIndex = n;//设置transferIndex为原来的数组大小         }         //第一部分结束         //第二部分开始:计算当前线程的数据迁移区间         int nextn = nextTab.length;//扩容后的数组长度         //继承自Node的ForwardingNode表示一个正在被迁移的Node         //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);         //advance字段用来判断是否还有待处理的数据迁移工作         boolean advance = true;         boolean finishing = false; // to ensure sweep before committing nextTab                 //当前线程负责的迁移区间是[bound, i]         for (int i = 0, bound = 0;;) {             Node<K,V> f; int fh;             //while循环会计算每个执行到此处的线程需要负责的数据迁移区间             while (advance) {                 //假设当前数组长度是32,需要扩容到64;                 //那么此时transferIndex = 32,nextn = 64,n = 32;                 //刚开始循环时i = 0,nextIndex被transferIndex赋值为32                 int nextIndex, nextBound;                 if (--i >= bound || finishing) {                     //一开始i = bound = 0,所以不会进入这里,而是进入U.compareAndSwapInt()的条件中                     //但后来bound = 16, i = 31后,就会进入这里,退出循环                     //此后,每次--i,当i = bound = 16时,就又会进入U.compareAndSwapInt()的条件中,重新获取数据迁移区间                     advance = false;                 } else if ((nextIndex = transferIndex) <= 0) {                     //判断当前线程是否已经分配到了新的迁移区间                     i = -1;                     advance = false;                 } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {                      //如果CAS设置transferIndex从nextIndex=32变为nextIndex-16=16成功后,                     //那么advance设置为false,退出while循环                     //当前线程再次进入while循环或者其他线程也进入while循环,就会从transferIndex = 16开始,计算要负责的迁移区间                     bound = nextBound;//第一次for循环nextBound = 16                     i = nextIndex - 1;//第一次for循环i = 31                     advance = false;                 }             }             //第二部分结束             if (i < 0 || i >= n || i + n >= nextn) {                 //第五部分开始:完成迁移后的判断                 int sc;                 //如果数据迁移完成了,则把扩容后的数组赋值给table                 if (finishing) {                     nextTable = null;                     table = nextTab;                     sizeCtl = (n << 1) - (n >>> 1);                     return;                 }                 //如果还没完成数据迁移,则通过CAS修改并发扩容的线程数                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                     if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {                         return;                     }                     finishing = advance = true;                     i = n; // recheck before commit                 }                 //第五部分结束             } else if ((f = tabAt(tab, i)) == null) {                 //第三部分开始:更新扩容标记advance,这样其他线程在put()的时候就可以发现当前数组正在扩容                 advance = casTabAt(tab, i, null, fwd);             } else if ((fh = f.hash) == MOVED) {                 //设置扩容标记位advance为true,等下次for循环时进入while循环--i                 advance = true; // already processed                 //第三部分结束             } else {                 //第四部分开始:开始数据迁移和扩容                 synchronized (f) {//首先对当前要迁移的Node结点f添加同步锁synchronized,避免多线程竞争                     if (tabAt(tab, i) == f) {                         Node<K,V> ln, hn;                         //如果fh >= 0,则表示Node结点f为链表或普通结点,此时需要按照链表或普通结点的方式来进行数据迁移                         if (fh >= 0) {                             int runBit = fh & n;                             Node<K,V> lastRun = f;                             //for循环遍历链表,计算出当前链表最后一个需要迁移或者不需要迁移的结点位置                             //遍历链表的每一个结点,计算p.hash & n,如果值为0,表示需要迁移,否则不需要迁移                             for (Node<K,V> p = f.next; p != null; p = p.next) {                                 int b = p.hash & n;                                 if (b != runBit) {                                     runBit = b;                                     lastRun = p;                                 }                             }                             if (runBit == 0) {                                 ln = lastRun;                                 hn = null;                             } else {                                 hn = lastRun;                                 ln = null;                             }                             for (Node<K,V> p = f; p != lastRun; p = p.next) {                                 int ph = p.hash; K pk = p.key; V pv = p.val;                                 if ((ph & n) == 0) {                                     //将ln作为参数,以ln为基础构造低位链,不需要迁移                                     ln = new Node<K,V>(ph, pk, pv, ln);                                 } else {                                     //将hn作为参数,以hn为基础构造高位链,需要迁移                                     hn = new Node<K,V>(ph, pk, pv, hn);                                 }                             }                             //把低位链设置到扩容后的数组的位置i                             setTabAt(nextTab, i, ln);                             //把高位链设置到扩容后的数组的位置i + n                             setTabAt(nextTab, i + n, hn);                             //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了                             setTabAt(tab, i, fwd);                             //当前线程已处理完位置为i的数据的迁移,于是设置advance为true,让后续的for循环继续进入while循环来实现对i的递减                             advance = true;                         } else if (f instanceof TreeBin) {                             //如果f instanceof TreeBin,则表示结点f为红黑树,需要按照红黑树的规则进行数据迁移                             //需要注意的是,数据迁移之后可能会存在红黑树转化为链表的情况,当链表长度小于等于6时,红黑树就会转化为链表                             TreeBin<K,V> t = (TreeBin<K,V>)f;                             TreeNode<K,V> lo = null, loTail = null;                             TreeNode<K,V> hi = null, hiTail = null;                             int lc = 0, hc = 0;                             for (Node<K,V> e = t.first; e != null; e = e.next) {                                 int h = e.hash;                                 TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);                                 if ((h & n) == 0) {                                     if ((p.prev = loTail) == null) {                                         lo = p;                                     } else {                                         loTail.next = p;                                     }                                     loTail = p;                                     ++lc;                                 } else {                                     if ((p.prev = hiTail) == null) {                                         hi = p;                                     } else {                                         hiTail.next = p;                                     }                                     hiTail = p;                                     ++hc;                                 }                             }                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;                             setTabAt(nextTab, i, ln);                             setTabAt(nextTab, i + n, hn);                             setTabAt(tab, i, fwd);                             advance = true;                         }                     }                 }                 //第四部分结束             }         }     }     ... }

 

8.ConcurrentHashMap的分段锁统计元素数据

(1)ConcurrentHashMap维护数组元素个数思路

(2)ConcurrentHashMap维护数组元素个数流程

(3)维护数组元素个数的addCount()方法

(4)维护数组元素个数的fullAddCount()方法

(5)获取数组元素个数的size()方法

 

(1)ConcurrentHashMap维护数组元素个数思路

当调用完put()方法后,ConcurrentHashMap必须增加当前元素的个数,以方便在size()方法中获得存储的元素个数。

 

在常规的集合中,只需要一个全局int类型的字段保存元素个数即可。每次添加一个元素,就对这个size变量 + 1。

 

在ConcurrentHashMap中,需要保证对该变量修改的并发安全。如果使用同步锁synchronized,那么性能开销比较大,不合适。所以ConcurrentHashMap使用了自旋 + 分段锁来维护元素个数。

 

(2)ConcurrentHashMap维护数组元素个数流程

ConcurrentHashMap采用了两种方式来保存元素的个数。当线程竞争不激烈时,直接使用baseCount + 1来增加元素个数。当线程竞争比较激烈时,构建一个CounterCell数组,默认长度为2。然后随机选择一个CounterCell,针对该CounterCell中的value进行保存。

 

增加元素个数的流程如下:

JUC并发—8.并发安全集合一

(3)维护数组元素个数的addCount()方法

addCount()方法的作用主要包括两部分:

一.累加ConcurrentHashMap中的元素个数

二.通过check >= 0判断是否需要进行数组扩容

 

其中增加数组元素个数的核心逻辑是:

首先通过CAS修改全局成员变量baseCount来进行累加。注意会先判断(as = counterCells) != null,再尝试对baseCount进行累加。这是因为如果一个集合发生过并发,那么后续发生并发的可能性会更大。如果CAS累加baseCount失败,则尝试使用CounterCell来进行累加。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //Base counter value, used mainly when there is no contention,     but also as a fallback during table initialization races. Updated via CAS.     private transient volatile long baseCount;          //Table of counter cells. When non-null, size is a power of 2.     private transient volatile CounterCell[] counterCells;      private static final long BASECOUNT;     static {         try {             U = sun.misc.Unsafe.getUnsafe();             ...             BASECOUNT = U.objectFieldOffset(k.getDeclaredField("baseCount"));             ...         } catch (Exception e) {             throw new Error(e);         }     }          //Maps the specified key to the specified value in this table.     //Neither the key nor the value can be null.     public V put(K key, V value) {         return putVal(key, value, false);     }          final V putVal(K key, V value, boolean onlyIfAbsent) {         ...         //调用addCount()方法统计Node数组元素的个数         addCount(1L, binCount);         return null;     }          //Adds to count, and if table is too small and not already resizing, initiates transfer.      //If already resizing, helps perform transfer if work is available.       //Rechecks occupancy after a transfer to see if another resize is already needed because resizings are lagging additions.     //x是要增加的数组元素个数     private final void addCount(long x, int check) {         CounterCell[] as; long b, s;         //首先通过CAS修改全局成员变量baseCount来进行累加         //注意:这里先判断(as = counterCells) != null,再尝试对baseCount进行CAS累加         //这是因为如果一个集合发生过并发,那么后续发生并发的可能性会更大,这种思想在并发编程中很常见         if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {             //增加数组元素个数             CounterCell a; long v; int m;             boolean uncontended = true;             //如果CAS修改baseCount失败,则尝试使用CounterCell来进行累加             //1.as == null,说明CounterCell数组还没初始化             //2.(m = as.length - 1) < 0,说明CounterCell数组还没初始化             //3.(a = as[ThreadLocalRandom.getProbe() & m]) == null,说明CounterCell数组已经创建了,             //但是Hash定位到的数组位置没有对象实例,说明这个数字还存在没有CounterCell实例对象的情况             //4.如果U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)返回false,说明存在多线程竞争             if (as == null || (m = as.length - 1) < 0                  || (a = as[ThreadLocalRandom.getProbe() & m]) == null                  || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {                  //调用fullAddCount()方法实现数组元素个数的累加                 fullAddCount(x, uncontended);                 return;             }             if (check <= 1) {                 return;             }             //sumCount()方法返回总的元素个数,也就是CounterCell数组的元素个数和baseCount两者的和             s = sumCount();         }         if (check >= 0) {             //处理数组扩容             Node<K,V>[] tab, nt; int n, sc;             while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {                 int rs = resizeStamp(n);                 if (sc < 0) {                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                         transferIndex <= 0) {                         break;                     }                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {                         transfer(tab, nt);                     }                 } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) {                     transfer(tab, null);                 }                 s = sumCount();             }         }     }          final long sumCount() {         CounterCell[] as = counterCells; CounterCell a;         long sum = baseCount;         if (as != null) {             for (int i = 0; i < as.length; ++i) {                 if ((a = as[i]) != null) {                     sum += a.value;                 }             }         }         return sum;     }     ... }

(4)维护数组元素个数的fullAddCount()方法

fullAddCount()方法的作用主要包括三部分:初始化CounterCell数组、增加数组元素个数、对CounterCell数组扩容。

 

注意:为了定位当前线程添加的数组元素个数应落到CounterCell数组哪个元素,会使用ThreadLocalRandom确定当前线程对应的hash值,由该hash值和CounterCell数组大小进行类似于取模的位与运算来决定。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //Spinlock (locked via CAS) used when resizing and/or creating CounterCells.     private transient volatile int cellsBusy;      //Table of counter cells. When non-null, size is a power of 2.     private transient volatile CounterCell[] counterCells;          //x是要增加的数组元素个数     private final void fullAddCount(long x, boolean wasUncontended) {         //通过ThreadLocalRandom来确定当前线程对应的hash值         int h;         if ((h = ThreadLocalRandom.getProbe()) == 0) {             ThreadLocalRandom.localInit();//force initialization             h = ThreadLocalRandom.getProbe();             wasUncontended = true;         }         boolean collide = false;// True if last slot nonempty         for (;;) {             CounterCell[] as; CounterCell a; int n; long v;             //(as = counterCells) != null && (n = as.length) > 0,表示counterCells数组已经完成初始化             if ((as = counterCells) != null && (n = as.length) > 0) {                 //第二部分:增加数组元素个数,分两种情况                 if ((a = as[(n - 1) & h]) == null) {                     //情况一:(a = as[(n - 1) & h]) == null,表示将当前线程定位到counterCells数组的某位置的元素为null                     //此时直接把当前要增加的元素个数x保存到新创建的CounterCell对象,然后将该对象赋值到CounterCell数组的该位置即可                     if (cellsBusy == 0) {//Try to attach new Cell                         //先创建一个CounterCell对象,并把x保存进去                         CounterCell r = new CounterCell(x);//Optimistic create                         //U.compareAndSwapInt(this, CELLSBUSY, 0, 1)返回true,表示当前线程占有了锁                         if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {                             boolean created = false;                             try {//Recheck under lock                                 CounterCell[] rs; int m, j;                                 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {                                     //把新构建的保存了元素个数x的CounterCell对象保存到rs[j]的位置                                     rs[j] = r;                                     created = true;                                 }                             } finally {                                 cellsBusy = 0;                             }                             if (created) {                                 break;                             }                             continue;//Slot is now non-empty                         }                     }                     collide = false;                 } else if (!wasUncontended) {//CAS already known to fail                     wasUncontended = true;//Continue after rehash                 } else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) {                     //情况二:如果将当前线程定位到counterCells数组的某位置的元素不为null,                     //那么直接通过U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)操作,对counterCells数组的指定位置进行累加                     break;                 } else if (counterCells != as || n >= NCPU) {                     collide = false;//At max size or stale                 } else if (!collide) {                     collide = true;                 } else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {                     //第三部分:counterCells数组扩容                     //需要先通过cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1),抢占锁                     try {                         if (counterCells == as) {// Expand table unless stale                             //在原有的基础上扩容一倍                             CounterCell[] rs = new CounterCell[n << 1];                             //通过for循环进行数据迁移                             for (int i = 0; i < n; ++i) {                                 rs[i] = as[i];                             }                             //把扩容后的对象赋值给counterCells                             counterCells = rs;                         }                     } finally {                         //恢复标识                         cellsBusy = 0;                     }                     collide = false;                     continue;//继续下一次自旋                 }                 h = ThreadLocalRandom.advanceProbe(h);             } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {                 //第一部分:初始化CounterCell数组                 //cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1),通过cellsBusy字段来抢占锁,通过CAS修改该字段值为1表示抢到锁                  boolean init = false;                 try {//Initialize table                     if (counterCells == as) {                         //构造一个元素个数为2的CounterCell数组                         CounterCell[] rs = new CounterCell[2];                         //把要增加的数组元素个数x,保存到CounterCell数组的某个元素中                         rs[h & 1] = new CounterCell(x);                         //把初始化的CounterCell数组赋值给全局对象counterCells                         counterCells = rs;                         init = true;                     }                 } finally {                     //恢复标识                     cellsBusy = 0;                 }                 if (init) {                     break;                 }             } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) {                 break;//Fall back on using base            }         }     }     ... }

(5)获取数组元素个数的size()方法

sumCount()方法会先得到baseCount的值,保存到sum字段中。然后遍历CounterCell数组,把每个value进行累加。

 

注意:size()方法在计算总的元素个数时并没有加锁,所以size()方法返回的元素个数不一定代表此时此刻总数量。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //Spinlock (locked via CAS) used when resizing and/or creating CounterCells.     private transient volatile int cellsBusy;      //Table of counter cells. When non-null, size is a power of 2.     private transient volatile CounterCell[] counterCells;          public int size() {         long n = sumCount();         return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);     }          final long sumCount() {         CounterCell[] as = counterCells; CounterCell a;         long sum = baseCount;         if (as != null) {             for (int i = 0; i < as.length; ++i) {                 if ((a = as[i]) != null) {                     sum += a.value;                 }             }         }         return sum;     }     ... }

 

9.ConcurrentHashMap的查询操作是否涉及锁

(1)put操作会加锁

(2)size操作不会加锁

(3)get操作也不会加锁

 

(1)put操作会加锁

首先尝试通过CAS设置Node数组对应位置的Node元素。如果该位置的Node元素非空,或者CAS设置失败,则说明发生了哈希冲突。此时就会使用synchronized关键字对该数组元素加锁来处理链表或者红黑树。

 

其实JUC还可以继续优化,先用CAS尝试修改哈希冲突下的链表或红黑树。如果CAS修改失败,再通过使用synchronized对该数组元素加锁来处理。

 

(2)size操作不会加锁

size()方法在计算总的元素个数时并没有加锁,所以size()方法返回的元素个数不一定代表此时此刻数组元素的总数量。

 

(3)get操作也不会加锁

get()方法也使用了CAS操作,通过Unsafe类让数组中的元素具有可见性。保证线程根据tabAt()方法获取数组的某个位置的元素时,能获取最新的值。所以get不加锁,但通过volatile读数组,可以保证读到数组元素的最新值。

 

虽然table变量使用了volatile,但这只保证了table引用对所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的。因此才通过Unsafe类的getObjectVolatile()来保证table数组中元素的可见性。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //The array of bins. Lazily initialized upon first insertion.     //Size is always a power of two. Accessed directly by iterators.     transient volatile Node<K,V>[] table;          public V get(Object key) {         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;         int h = spread(key.hashCode());         if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {             if ((eh = e.hash) == h) {                 if ((ek = e.key) == key || (ek != null && key.equals(ek))) {                     return e.val;                 }             } else if (eh < 0) {                 return (p = e.find(h, key)) != null ? p.val : null;             }             while ((e = e.next) != null) {                 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {                     return e.val;                 }             }         }         return null;     }          //获取Node数组在位置i的元素,通过Unsafe类让数组中的元素具有可见性     //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的      //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);     }     ... }

 

10.ConcurrentHashMap中红黑树的使用

(1)treeifyBin()方法的逻辑

(2)TreeBin的成员变量和方法

(3)TreeBin在构造方法中将链表转为红黑树

(4)TreeBin在插入元素时实现红黑树的自平衡

 

(1)treeifyBin()方法的逻辑

put操作中当发现链表元素>=8时会调用treeifyBin()方法将链表转为红黑树。首先通过tabAt()方法从Node数组中获取位置为index的元素并赋值给变量b,然后使用synchronized对Node数组中位置为index的元素b进行加锁,接着通过for循环遍历Node数组中位置为index的元素b这个链表,并且根据链表中每个结点的数据封装成一个TreeNode对象来组成新链表,最后把新链表的头结点作为参数传给TreeBin构造方法来完成红黑树的构建。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //Replaces all linked nodes in bin at given index unless table is too small, in which case resizes instead.     //将Node数组tab中位置为index的元素,从链表转化为红黑树     private final void treeifyBin(Node<K,V>[] tab, int index) {         Node<K,V> b; int n, sc;         if (tab != null) {             if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {                 tryPresize(n << 1);//数组扩容             } else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {                 synchronized (b) {//b就是链表,先用synchronized对b加锁,保证并发安全                     if (tabAt(tab, index) == b) {                         TreeNode<K,V> hd = null, tl = null;//hd是新链表的头结点,tl是新链表的尾结点                         //将链表b赋值给e,然后遍历通过e.next赋值回给e来遍历链表                         for (Node<K,V> e = b; e != null; e = e.next) {                             //根据Node结点e来封装一个TreeNode对象                             TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);                             if ((p.prev = tl) == null) {                                 hd = p;                             } else {                                 //尾插法构造新链表                                 tl.next = p;                             }                             tl = p;                         }                         //将构造好的新链表的头结点hd作为参数,创建一个TreeBin对象                         setTabAt(tab, index, new TreeBin<K,V>(hd));                     }                 }             }         }     }     ... }

(2)TreeBin的成员变量和方法

ConcurrentHashMap中红黑树用继承自Node的TreeNode来表示。TreeBin则主要提供了红黑树的一系列功能的实现,并且实现了读写锁。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     //Nodes for use in TreeBins     static final class TreeNode<K,V> extends Node<K,V> {         TreeNode<K,V> parent;//red-black tree links         TreeNode<K,V> left;         TreeNode<K,V> right;         TreeNode<K,V> prev;//needed to unlink next upon deletion         boolean red;          TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) {             super(hash, key, val, next);             this.parent = parent;         }         ...     }          //TreeNodes used at the heads of bins.     //TreeBins do not hold user keys or values, but instead point to list of TreeNodes and their root.      //They also maintain a parasitic read-write lock forcing writers (who hold bin lock)      //to wait for readers (who do not) to complete before tree restructuring operations.     static final class TreeBin<K,V> extends Node<K,V> {         TreeNode<K,V> root;//红黑树根结点         volatile TreeNode<K,V> first;//链表头结点,由构造函数传入         volatile Thread waiter;//保存最近一个抢占写锁的线程(如果有值,则说明lockState是读锁状态)         volatile int lockState;//表示锁的状态         // values for lockState         static final int WRITER = 1;//写锁状态         static final int WAITER = 2;//等待获取写锁状态         static final int READER = 4;//读锁状态         ...         //构造函数,将以b为头结点的链表转换为红黑树         //Creates bin with initial set of nodes headed by b.         TreeBin(TreeNode<K,V> b) {             ...         }              //对红黑树的根结点加写锁         //Acquires write lock for tree restructuring.         private final void lockRoot() {             if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER)) {                 contendedLock(); // offload to separate method             }         }                 //释放写锁         //Releases write lock for tree restructuring.         private final void unlockRoot() {             lockState = 0;         }                 //根据key获取指定的结点         //Returns matching node or null if none.          //Tries to search using tree comparisons from root, but continues linear search when lock not available.         final Node<K,V> find(int h, Object k) {             ...         }         ...     }     ... }

(3)TreeBin在构造方法中将链表转为红黑树

treeifyBin()方法在对链表进行转化时,会先构建一个双向链表,然后将该双向链表传入TreeBin的构造方法。

 

TreeBin的构造方法会通过如下处理将该双向链表转化为红黑树:

一.如果红黑树为空,则初始化红黑树的根结点

二.如果红黑树不为空,则按平衡二叉树逻辑插入

三.通过balanceInsertion()方法进行自平衡

 

TreeBin的构造方法可以分为三部分:

 

第一部分:初始化红黑树

遍历链表b,将链表b的头结点设置为红黑树的根结点,接着设置红黑树根结点的左右子结点为null,以及设置红黑树根结点为黑色。

 

第二部分:将链表中的结点添加到初始化好的红黑树

首先计算dir的值。如果dir = -1,表示红黑树中被插入结点的hash值大于新添加结点x的hash值。如果dir = 1,表示红黑树中被插入结点的hash值小于新添加结点x的hash值。然后根据dir的值来决定新添加结点x是被插入结点的左结点还是右结点,最后调用TreeBin的balanceInsertion()方法对红黑树进行自平衡处理。

 

第三部分:对红黑树进行自平衡

调用TreeBin的balanceInsertion()方法对红黑树进行自平衡处理。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     static final class TreeBin<K,V> extends Node<K,V> {         TreeNode<K,V> root;//红黑树根结点         volatile TreeNode<K,V> first;//链表头结点,由构造函数传入         volatile Thread waiter;//保存最近一个抢占写锁的线程(如果有值,则说明lockState是读锁状态)         volatile int lockState;//表示锁的状态         //values for lockState         static final int WRITER = 1;//写锁状态         static final int WAITER = 2;//等待获取写锁状态         static final int READER = 4;//读锁状态         ...         //构造函数,将以b为头结点的链表转换为红黑树         TreeBin(TreeNode<K,V> b) {             //第一部分开始:初始化红黑树             super(TREEBIN, null, null, null);             this.first = b;             //r表示红黑树的根结点             TreeNode<K,V> r = null;             //遍历链表b,x将作为新添加的红黑树结点             for (TreeNode<K,V> x = b, next; x != null; x = next) {                 next = (TreeNode<K,V>)x.next;                 //把新添加的红黑树结点x的左右子结点设置为null                 x.left = x.right = null;                 //r表示红黑树的根结点,r == null表示红黑树为空,将x结点设置为红黑树的根结点                 if (r == null) {                     x.parent = null;                     //把红黑树的根结点设置为黑色                     x.red = false;                     r = x;                     //第一部分结束                 } else {                     //第二部分开始:将链表中的结点添加到初始化好的红黑树中                     //x是新添加的红黑树结点                     K k = x.key;                     int h = x.hash;                     Class<?> kc = null;                     //p是红黑树中被插入的结点                     for (TreeNode<K,V> p = r;;) {                         int dir, ph;                         K pk = p.key;                         //首先计算dir的值                         //dir = -1,表示红黑树中被插入结点的hash值大于新添加结点x的hash值                         //dir = 1,表示红黑树中被插入结点的hash值小于新添加结点x的hash值                         if ((ph = p.hash) > h) {                             dir = -1;                         } else if (ph < h) {                             dir = 1;                         } else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) {                              dir = tieBreakOrder(k, pk);                             TreeNode<K,V> xp = p;                         }                         //然后根据dir的值来决定新添加的结点x是左结点还是右结点                         if ((p = (dir <= 0) ? p.left : p.right) == null) {                             x.parent = xp;                             if (dir <= 0) {                                 xp.left = x;                             } else {                                 xp.right = x;                             }                             //第二部分结束                             //第三部分开始:红黑树进行自平衡                             //r代表一棵红黑树,x代表往红黑树r添加的结点                             r = balanceInsertion(r, x);                             //第三部分结束                             break;                         }                     }                 }             }             this.root = r;             assert checkInvariants(root);         }         ...     }     ... }

(4)TreeBin在插入元素时实现红黑树的自平衡

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {      ...     static final class TreeBin<K,V> extends Node<K,V> {         TreeNode<K,V> root;//红黑树根结点         volatile TreeNode<K,V> first;//链表头结点,由构造函数传入         volatile Thread waiter;//保存最近一个抢占写锁的线程(如果有值,则说明lockState是读锁状态)         volatile int lockState;//表示锁的状态         // values for lockState         static final int WRITER = 1;//写锁状态         static final int WAITER = 2;//等待获取写锁状态         static final int READER = 4;//读锁状态         ...         //root代表一棵红黑树,x代表往红黑树r添加的结点         static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) {             //所有往红黑树添加的结点默认为红色             x.red = true;             //自旋,xp表示添加结点x的父结点,xpp表示添加结点x的爷结点,xppl表示爷结点的左结点,xppr表示爷结点的右结点             for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {                 if ((xp = x.parent) == null) {//此处判断条件表示:x结点的父结点为空                     //由于只有根结点的父结点才会为空,所以此时x结点为根结点,于是设置根结点x为黑色                     x.red = false;                     return x;                 } else if (!xp.red || (xpp = xp.parent) == null) {//此处判断条件表示:表示x结点的父结点为黑色,或者x结点的爷结点为空                      //那么直接返回红黑树root,不需要处理                     return root;                 }                 //代码执行到这里,说明x结点的父结点为红色                 if (xp == (xppl = xpp.left)) {//此处判断条件表示:表示x结点的父结点xp是其爷结点xpp的左子结点xppl                     if ((xppr = xpp.right) != null && xppr.red) {//此处判断条件表示:x结点的叔结点存在且为红色                         //那么直接修改父结点和叔结点的颜色为黑色                         xppr.red = false;                         xp.red = false;                         xpp.red = true;                         x = xpp;                     } else {//此处判断条件表示:如果x结点的叔结点不存在,或者叔结点存在且为黑色                         if (x == xp.right) {//如果x结点是父结点的右子结点,则按x结点的父结点进行左旋                             root = rotateLeft(root, x = xp);//将x结点的父结点赋值给x结点                             xpp = (xp = x.parent) == null ? null : xp.parent;                         }                         if (xp != null) {                             xp.red = false;                             if (xpp != null) {                                 xpp.red = true;                                 root = rotateRight(root, xpp);                             }                         }                     }                 } else {//表示x结点的父结点是其爷结点的右子结点                     if (xppl != null && xppl.red) {                         xppl.red = false;                         xp.red = false;                         xpp.red = true;                         x = xpp;                     } else {                         if (x == xp.left) {                             root = rotateRight(root, x = xp);                             xpp = (xp = x.parent) == null ? null : xp.parent;                         }                         if (xp != null) {                             xp.red = false;                             if (xpp != null) {                                 xpp.red = true;                                 root = rotateLeft(root, xpp);                             }                         }                     }                 }             }         }                 static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p) {             TreeNode<K,V> r, pp, rl;             if (p != null && (r = p.right) != null) {                 if ((rl = p.right = r.left) != null) {                     rl.parent = p;                 }                 if ((pp = r.parent = p.parent) == null) {                     (root = r).red = false;                 } else if (pp.left == p) {                     pp.left = r;                 } else {                     pp.right = r;                 }                 r.left = p;                 p.parent = r;             }             return root;         }          static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p) {             TreeNode<K,V> l, pp, lr;             if (p != null && (l = p.left) != null) {                 if ((lr = p.left = l.right) != null) {                     lr.parent = p;                 }                 if ((pp = l.parent = p.parent) == null) {                     (root = l).red = false;                 } else if (pp.right == p) {                     pp.right = l;                 } else {                     pp.left = l;                 }                 l.right = p;                 p.parent = l;             }             return root;         }         ...     } }

 

发表评论

评论已关闭。

相关文章

当前内容话题
  • 0