ConcurrentHashMap
HashMap在put的时候,插入的元素数量超过了容量(由负载因子决定)的范围是会触发扩容操作,就是rehash,这个会重新将原数组的内容重新hash到新的扩容数组中,在多线程的环境下,存在同时其他的元素也在进行put操作,如果hash值相同,可能出现同时在同一数组下用链表表示,造成闭环,导致在get时会出现死循环,所以HashMap是线程不安全的。
我们来了解另一个键值存储集合HashTable,它是线程安全的,它在所有涉及到多线程操作的都加上了synchronized关键字来锁住整个table,这就意味着所有的线程都在竞争一把锁,在多线程的环境下,它是安全的,但是无疑是效率低下的。
其实HashTable有很多的优化空间,锁住整个table这么粗暴的方法可以变相的柔和点,比如在多线程的环境下,对不同的数据集进行操作时其实根本就不需要去竞争一个锁,因为他们不同hash值,不会因为rehash造成线程不安全,所以互不影响,这就是锁分离技术,将锁的粒度降低,利用多个锁来控制多个小的table,这就是这篇文章的主角ConcurrentHashMap JDK1.7版本的核心思想。
ConcurrentHashMap在JDK 1.7的实现
在JDK1.7版本中,ConcurrentHashMap的数据结构是由一个Segment数组和多个HashEntry组成,如下图所示:

Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,也就是上面的提到的锁分离技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样。
初始化
ConcurrentHashMap的初始化是会通过位与运算来初始化Segment的大小,用ssize来表示,如下所示:
1 2 3 4 5 6
   | int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) {     ++sshift;     ssize <<= 1; }
   | 
 
如上所示,因为ssize用位于运算来计算(ssize <<=1),所以Segment的大小取值都是以2的N次方,无关concurrencyLevel的取值,当然concurrencyLevel最大只能用16位的二进制来表示,即65536,换句话说,Segment的大小最多65536个,没有指定concurrencyLevel元素初始化,Segment的大小ssize默认为16。
每一个Segment元素下的HashEntry的初始化也是按照位于运算来计算,用cap来表示,如下所示:
1 2 3
   | int cap = 1; while (cap < c)     cap <<= 1;
   | 
 
如上所示,HashEntry大小的计算也是2的N次方(cap <<=1), cap的初始值为1,所以HashEntry最小的容量为2。
put操作
对于ConcurrentHashMap的数据插入,这里要进行两次Hash去定位数据的存储位置。
1
   | static class Segment<K,V> extends ReentrantLock implements Serializable {
  | 
 
从上Segment的继承体系可以看出,Segment实现了ReentrantLock,也就带有锁的功能,当执行put操作时,会进行第一次key的hash来定位Segment的位置,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后进行第二次hash操作,找到相应的HashEntry的位置,这里会利用继承过来的锁的特性,在将数据插入指定的HashEntry位置时(链表的尾端),会通过继承ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒。
get操作
ConcurrentHashMap的get操作跟HashMap类似,只是ConcurrentHashMap第一次需要经过一次hash定位到Segment的位置,然后再hash定位到指定的HashEntry,遍历该HashEntry下的链表进行对比,成功就返回,不成功就返回null。
size操作
计算ConcurrentHashMap的元素大小是一个有趣的问题,因为他是并发操作的,就是在你计算size的时候,他还在并发的插入数据,可能会导致你计算出来的size和你实际的size有相差(在你return size的时候,插入了多个数据),要解决这个问题,JDK1.7版本用两种方案。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
   | try {     for (;;) {         if (retries++ == RETRIES_BEFORE_LOCK) {             for (int j = 0; j < segments.length; ++j)                  ensureSegment(j).lock();          }         sum = 0L;         size = 0;         overflow = false;         for (int j = 0; j < segments.length; ++j) {             Segment<K,V> seg = segmentAt(segments, j);             if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0)                overflow = true;             }          }         if (sum == last) break;         last = sum;      }  } finally {     if (retries > RETRIES_BEFORE_LOCK) {         for (int j = 0; j < segments.length; ++j)             segmentAt(segments, j).unlock();     } }
  | 
 
- 第一种方案他会使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的;
 
- 第二种方案是如果第一种方案不符合,他就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回。
 
ConcurrentHashMap在JDK 1.8的实现
JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap,虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本。

在深入JDK1.8的put和get实现之前要知道一些常量设计和数据结构,这些是构成ConcurrentHashMap实现结构的基础,下面看一下基本属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
   |  private static final int MAXIMUM_CAPACITY = 1 << 30;
 
  private static final int DEFAULT_CAPACITY = 16;
 
  static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
  private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 
  private static final float LOAD_FACTOR = 0.75f;
 
  static final int TREEIFY_THRESHOLD = 8;
 
  static final int UNTREEIFY_THRESHOLD = 6;
  static final int MIN_TREEIFY_CAPACITY = 64;
  private static final int MIN_TRANSFER_STRIDE = 16;
  private static int RESIZE_STAMP_BITS = 16;
 
  private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
 
  private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
 
  static final int MOVED     = -1; 
 
  static final int TREEBIN   = -2; 
 
  static final int RESERVED  = -3; 
 
  static final int NCPU = Runtime.getRuntime().availableProcessors();
 
  transient volatile Node<K,V>[] table;
 
 
 
 
 
  private transient volatile int sizeCtl;
 
  | 
 
基本属性定义了ConcurrentHashMap的一些边界以及操作时的一些控制,下面看一些内部的一些结构组成,这些是整个ConcurrentHashMap整个数据结构的核心。
基本结构
Node是ConcurrentHashMap存储结构的基本单元,继承于HashMap中的Entry,用于存储数据,源代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
   | static class Node<K,V> implements Map.Entry<K,V> {          final int hash;     final K key;          volatile V val;     volatile Node<K,V> next;
      Node(int hash, K key, V val, Node<K,V> next) {         this.hash = hash;         this.key = key;         this.val = val;         this.next = next;     }
      public final K getKey()       { return key; }     public final V getValue()     { return val; }     public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }     public final String toString(){ return key + "=" + val; }
           public final V setValue(V value) {         throw new UnsupportedOperationException();     }
      public final boolean equals(Object o) {         Object k, v, u; Map.Entry<?,?> e;         return ((o instanceof Map.Entry) &&                 (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&                 (v = e.getValue()) != null &&                 (k == key || k.equals(key)) &&                 (v == (u = val) || v.equals(u)));     }
           Node<K,V> find(int h, Object k) {         Node<K,V> e = this;         if (k != null) {             do {                 K ek;                 if (e.hash == h &&                     ((ek = e.key) == k || (ek != null && k.equals(ek))))                     return e;             } while ((e = e.next) != null);         }         return null;     } }
  | 
 
Node数据结构很简单,从上可知,就是一个链表,但是只允许对数据进行查找,不允许进行修改。
TreeNode继承与Node,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表的节点数大于8时会转换成红黑树的结构,他就是通过TreeNode作为存储结构代替Node来转换成黑红树源代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
   | static final class TreeNode<K,V> extends Node<K,V> {          TreeNode<K,V> parent;       TreeNode<K,V> left;     TreeNode<K,V> right;     TreeNode<K,V> prev;         boolean red; 
      TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) {         super(hash, key, val, next);         this.parent = parent;     }
      Node<K,V> find(int h, Object k) {         return findTreeNode(h, k, null);     }
           final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {         if (k != null) {             TreeNode<K,V> p = this;             do  {                 int ph, dir; K pk; TreeNode<K,V> q;                 TreeNode<K,V> pl = p.left, pr = p.right;                 if ((ph = p.hash) > h)                     p = pl;                 else if (ph < h)                     p = pr;                 else if ((pk = p.key) == k || (pk != null && k.equals(pk)))                     return p;                 else if (pl == null)                     p = pr;                 else if (pr == null)                     p = pl;                 else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0)                     p = (dir < 0) ? pl : pr;                 else if ((q = pr.findTreeNode(h, k, kc)) != null)                     return q;                 else                     p = pl;             } while (p != null);         }         return null;     } }
  | 
 
TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制,部分源码结构如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
   | static final class TreeBin<K,V> extends Node<K,V> {          TreeNode<K,V> root;     volatile TreeNode<K,V> first;     volatile Thread waiter;     volatile int lockState;          static final int WRITER = 1;      static final int WAITER = 2;      static final int READER = 4; 
      
 
      TreeBin(TreeNode<K,V> b) {         super(TREEBIN, null, null, null);         this.first = b;         TreeNode<K,V> r = null;         for (TreeNode<K,V> x = b, next; x != null; x = next) {             next = (TreeNode<K,V>)x.next;             x.left = x.right = null;             if (r == null) {                 x.parent = null;                 x.red = false;                 r = x;             } else {                 K k = x.key;                 int h = x.hash;                 Class<?> kc = null;                 for (TreeNode<K,V> p = r;;) {                     int dir, ph;                     K pk = p.key;                     if ((ph = p.hash) > h)                         dir = -1;                     else if (ph < h)                         dir = 1;                     else if ((kc == null &&                               (kc = comparableClassFor(k)) == null) ||                              (dir = compareComparables(kc, k, pk)) == 0)                         dir = tieBreakOrder(k, pk);                         TreeNode<K,V> xp = p;                     if ((p = (dir <= 0) ? p.left : p.right) == null) {                         x.parent = xp;                         if (dir <= 0)                             xp.left = x;                         else                             xp.right = x;                         r = balanceInsertion(r, x);                         break;                     }                 }             }         }         this.root = r;         assert checkInvariants(root);     }          …… }
  | 
 
介绍了ConcurrentHashMap主要的属性与内部的数据结构,现在通过一个简单的例子以debug的视角看看ConcurrentHashMap的具体操作细节。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
   | public class TestConcurrentHashMap{    
      public static void main(String[] args){         ConcurrentHashMap<String,String> map = new ConcurrentHashMap();                   map.put("id","1");         map.put("name","andy");         map.put("sex","男");                  String name = map.get("name");         Assert.assertEquals(name,"andy");                  int size = map.size();         Assert.assertEquals(size,3);     } }
  | 
 
我们先通过new ConcurrentHashMap()来进行初始化
1 2
   | public ConcurrentHashMap() { }
  | 
 
由上你会发现ConcurrentHashMap的初始化其实是一个空实现,并没有做任何事,这里后面会讲到,这也是和其他的集合类有区别的地方,初始化操作并不是在构造函数实现的,而是在put操作中实现,当然ConcurrentHashMap还提供了其他的构造函数,有指定容量大小或者指定负载因子,跟HashMap一样,这里就不做介绍了。
put操作
在上面的例子中我们新增个人信息会调用put方法,我们来看下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
   | public V put(K key, V value) {     return putVal(key, value, false); }
 
  final V putVal(K key, V value, boolean onlyIfAbsent) {     if (key == null || value == null) throw new NullPointerException();     int hash = spread(key.hashCode());      int binCount = 0;     for (Node<K,V>[] tab = table;;) {          Node<K,V> f; int n, i, fh;                  if (tab == null || (n = tab.length) == 0)             tab = initTable();         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {             if (casTabAt(tab, i, null,                          new Node<K,V>(hash, key, value, null)))                 break;                            }         else if ((fh = f.hash) == MOVED)             tab = helpTransfer(tab, f);         else {             V oldVal = null;                          synchronized (f) {                 if (tabAt(tab, i) == f) {                     if (fh >= 0) {                          binCount = 1;                         for (Node<K,V> e = f;; ++binCount) {                             K ek;                                                          if (e.hash == hash &&                                 ((ek = e.key) == key ||                                  (ek != null && key.equals(ek)))) {                                 oldVal = e.val;                                 if (!onlyIfAbsent)                                     e.val = value;                                 break;                             }                             Node<K,V> pred = e;                             if ((e = e.next) == null) {                                   pred.next = new Node<K,V>(hash, key, value, null);                                 break;                             }                         }                     } else if (f instanceof TreeBin) {                         Node<K,V> p;                         binCount = 2;                                                  if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {                             oldVal = p.val;                             if (!onlyIfAbsent)                                 p.val = value;                         }                     }                 }             }             if (binCount != 0) {                  if (binCount >= TREEIFY_THRESHOLD)                     treeifyBin(tab, i);                 if (oldVal != null)                     return oldVal;                 break;             }         }     }     addCount(1L, binCount);     return null; }
  | 
 
这个put的过程很清晰,对当前的table进行无条件自循环直到put成功,可以分成以下六步流程来概述。
- 如果没有初始化就先调用initTable()方法来进行初始化过程
 
- 如果没有hash冲突就直接CAS插入
 
- 如果还在进行扩容操作就先进行扩容
 
- 如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,
 
- 最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环
 
- 如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容
 
补充说明
Java 7中的ConcurrentHashMap为了实现并行访问,引入了Segment这一结构,实现了分段锁,理论上最大并发度与Segment个数相等。Java 8为了进一步提高并发性,摒弃了分段锁的方案,而是直接使用一个大的数组。同时为了提高哈希碰撞下的寻址性能,Java 8在链表长度超过一定阈值(8)时将链表(寻址时间复杂度为O(N))转换为红黑树(寻址时间复杂度为O(log(N))),参考Java进阶(六)从ConcurrentHashMap的演进看Java多线程核心技术。