热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

线程安全的并发容器1.7版本的ConcurrentHashMap原理分析(二十二)

线程安全的并发容器1.7版本的ConcurrentHashMap原理分析:ConcurrentHashMap使用除了Map系列应该有的线程安全的get,

线程安全的并发容器1.7版本的ConcurrentHashMap原理分析:


ConcurrentHashMap 使用

除了 Map 系列应该有的线程安全的 get,put 等方法外,ConcurrentHashMap 还提供了一个在并发下比较有用的方法 putIfAbsent,如果传入 key 对应的 value 已经存在,就返回存在的 value,不进行替换。如果不存在,就添加 key value, 返回 null。在代码层面它的作用类似于:

synchronized(map){
if (map.get(key) == null){return map.put(key, value);
} else{return map.get(key);}
}

源码如下:

public V putIfAbsent(K key, V value) {Segment s;if (value &#61;&#61; null)throw new NullPointerException();int hash &#61; hash(key);int j &#61; (hash >>> segmentShift) & segmentMask;if ((s &#61; (Segment)UNSAFE.getObject(segments, (j <

它让上述代码的整个操作是线程安全的。



1.7 下的实现


1&#xff0c;



桶&#xff1a;

static final class Segment extends ReentrantLock implements Serializable {

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。 Segment 是一种可重入锁&#xff08;ReentrantLock&#xff09;&#xff0c;在 ConcurrentHashMap 里扮演锁的 角色&#xff1b;HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组。Segment 的结构和 HashMap 类似&#xff0c;是一种数组和链表结构。一个 Segment 里包含一个 HashEntry 数组&#xff0c;每个 HashEntry 是一个链表结构的元素&#xff0c; 每个 Segment 守护着一个 HashEntry 数组里的元素&#xff0c;当对 HashEntry 数组的数据 进行修改时&#xff0c;必须首先获得与它对应的 Segment 锁。


2、构造方法和初始化

public ConcurrentHashMap17(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity <0 || concurrencyLevel <&#61; 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel &#61; MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift &#61; 0;int ssize &#61; 1;while (ssize MAXIMUM_CAPACITY)initialCapacity &#61; MAXIMUM_CAPACITY;int c &#61; initialCapacity / ssize;if (c * ssize s0 &#61;new Segment(loadFactor, (int)(cap * loadFactor),(HashEntry[])new HashEntry[cap]);Segment[] ss &#61; (Segment[])new Segment[ssize];UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments &#61; ss;}

ConcurrentHashMap 初始化方法是通过 initialCapacityloadFactor 和 concurrencyLevel(参数 concurrencyLevel 是用户估计的并发级别&#xff0c;就是说你觉得最 多有多少线程共同修改这个 map&#xff0c;根据这个来确定 Segment 数组的大小 concurrencyLevel 默认是 DEFAULT_CONCURRENCY_LEVEL &#61; 16;)等几个参数来初始 化 segment 数组、段偏移量 segmentShift、段掩码 segmentMask 和每个 segment 里的 HashEntry 数组来实现的。并发级别可以理解为程序运行时能够同时更新 ConccurentHashMap 且不产生锁竞争的最大线程数&#xff0c;实际上就是 ConcurrentHashMap 中的分段锁个数&#xff0c;即 Segment[]的数组长度。ConcurrentHashMap 默认的并发度为 16&#xff0c;但用户也可以 在构造函数中设置并发度。当用户设置并发度时&#xff0c;ConcurrentHashMap 会使用大 于等于该值的最小 2 幂指数作为实际并发度&#xff08;假如用户设置并发度为 17&#xff0c;实际 并发度则为 32&#xff09;。 如果并发度设置的过小&#xff0c;会带来严重的锁竞争问题&#xff1b;如果并发度设置的过大&#xff0c; 原本位于同一个 Segment 内的访问会扩散到不同的 Segment 中&#xff0c;CPU cache 命中 率会下降&#xff0c;从而引起程序性能下降。&#xff08;文档的说法是根据你并发的线程数量决定&#xff0c; 太多会导性能降低&#xff09; segments 数组的长度 ssize 是通过 concurrencyLevel 计算得出的。为了能通 过按位与的散列算法来定位 segments 数组的索引&#xff0c;必须保证 segments 数组的长 度是 2 N 次方&#xff08;power-of-two size&#xff09;&#xff0c;所以必须计算出一个大于或等于 concurrencyLevel 的最小的 2 N 次方值来作为 segments 数组的长度。假如 concurrencyLevel 等于 1415 16&#xff0c;ssize 都会等于 16&#xff0c;即容器里锁的个数也是 16。
以下知识了解即可&#xff1a;
初始化 segmentShift segmentMask这两个全局变量需要在定位 segment 时的散列算法里使用&#xff0c;sshift 等于 ssize1 向左移位的次数&#xff0c;在默认情况下 concurrencyLevel 等于 16&#xff0c;1 需要向左移位 移动 4 次&#xff0c;所以 sshift 等于 4segmentShift 用于定位参与散列运算的位数&#xff0c; segmentShift 等于 32 sshift&#xff0c;所以等于 28&#xff0c;这里之所以用 32 是因为 ConcurrentHashMap 里的 hash()方法输出的最大数是 32 位的。segmentMask 是散 列运算的掩码&#xff0c;等于 ssize 1&#xff0c;即 15&#xff0c;掩码的二进制各个位的值都是 1。因为
ssize 的最大长度是 65536&#xff0c;所以 segmentShift 最大值是 16&#xff0c;segmentMask 最大值 65535&#xff0c;对应的二进制是 16 位&#xff0c;每个位都是 1
输入参数 initialCapacity 是 ConcurrentHashMap 的初始化容量&#xff0c;loadfactor 是每个 segment 的负载因子&#xff0c;在构造方法里需要通过这两个参数来初始化数组中的 每个 segment。上面代码中的变量 cap 就是 segment HashEntry 数组的长度&#xff0c; 它等于 initialCapacity 除以 ssize 的倍数 c&#xff0c;如果 c 大于 1&#xff0c;就会取大于等于 c 2 的 N 次方值&#xff0c;所以 segment HashEntry 数组的长度不是 1&#xff0c;就是 2 N 次方。 在默认情况下&#xff0c; ssize &#61; 16&#xff0c;initialCapacity &#61; 16&#xff0c;loadFactor &#61; 0.75f&#xff0c;那么 cap &#61; 1&#xff0c;threshold &#61; (int) cap * loadFactor &#61; 0既然 ConcurrentHashMap 使用分段锁 Segment 来保护不同段的数据&#xff0c;那么在插入和获取元素的时候&#xff0c;必须先通过散列算法定位到 Segment。 ConcurrentHashMap 会首先使用 Wang/Jenkins hash 的变种算法对元素的 hashCode 进行一次再散列。 ConcurrentHashMap 完全允许多个读操作并发进行&#xff0c;读操作并不需要加锁。 ConcurrentHashMap 实现技术是保证 HashEntry 几乎是不可变的以及 volatile 关键 字。



3、HashEntry结构&#xff1a;

static final class HashEntry {final int hash;final K key;volatile V value;volatile HashEntry next;HashEntry(int hash, K key, V value, HashEntry next) {this.hash &#61; hash;this.key &#61; key;this.value &#61; value;this.next &#61; next;}/*** Sets next field with volatile write semantics. (See above* about use of putOrderedObject.)*/final void setNext(HashEntry n) {UNSAFE.putOrderedObject(this, nextOffset, n);}// Unsafe mechanicsstatic final sun.misc.Unsafe UNSAFE;static final long nextOffset;static {try {UNSAFE &#61; sun.misc.Unsafe.getUnsafe();Class k &#61; HashEntry.class;nextOffset &#61; UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}}


4、get 操作

public V get(Object key) {Segment s; // manually integrate access methods to reduce overheadHashEntry[] tab;int h &#61; hash(key);//准备定位的hash值long u &#61; (((h >>> segmentShift) & segmentMask) <)UNSAFE.getObjectVolatile(segments, u)) !&#61; null &&(tab &#61; s.table) !&#61; null) {//拿到segment下的tablefor (HashEntry e &#61; (HashEntry) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) <

get 操作先经过一次再散列&#xff0c;然后使用这个散列值通过散列运算定位到 Segment(使用了散列值的高位部分)&#xff0c;再通过散列算法定位到 table(使用了散列值 的全部)。整个 get 过程&#xff0c;没有加锁&#xff0c;而是通过 volatile 保证 get 总是可以拿到最 新值。

transient volatile HashEntry[] table; 5、put 操作



public V put(K key, V value) {Segment s;if (value &#61;&#61; null)throw new NullPointerException();int hash &#61; hash(key);//定位所需的hash值int j &#61; (hash >>> segmentShift) & segmentMask;if ((s &#61; (Segment)UNSAFE.getObject // nonvolatile; recheck(segments, (j <

ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0]&#xff0c;对于其他 槽&#xff0c;在插入第一个值的时候再进行初始化。 ensureSegment 方法考虑了并发情况&#xff0c;多个线程同时进入初始化同一个槽 segment[k]&#xff0c;但只要有一个成功就可以了。 具体实现是

private Segment ensureSegment(int k) {final Segment[] ss &#61; this.segments;long u &#61; (k < seg;if ((seg &#61; (Segment)UNSAFE.getObjectVolatile(ss, u)) &#61;&#61; null) {Segment proto &#61; ss[0]; // use segment 0 as prototypeint cap &#61; proto.table.length;float lf &#61; proto.loadFactor;int threshold &#61; (int)(cap * lf);HashEntry[] tab &#61; (HashEntry[])new HashEntry[cap];if ((seg &#61; (Segment)UNSAFE.getObjectVolatile(ss, u))&#61;&#61; null) { // recheckSegment s &#61; new Segment(lf, threshold, tab);//多次检查&#xff0c;循环CAS操作&#xff0c;保证多线程下只有一个线程可以成功while ((seg &#61; (Segment)UNSAFE.getObjectVolatile(ss, u))&#61;&#61; null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg &#61; s))break;}}}return seg;}

最终调用s.put(key, hash, value, false);方法&#xff1a;

final V put(K key, int hash, V value, boolean onlyIfAbsent) {HashEntry node &#61; tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {HashEntry[] tab &#61; table;int index &#61; (tab.length - 1) & hash;HashEntry first &#61; entryAt(tab, index);for (HashEntry e &#61; first;;) {if (e !&#61; null) {K k;if ((k &#61; e.key) &#61;&#61; key ||(e.hash &#61;&#61; hash && key.equals(k))) {oldValue &#61; e.value;if (!onlyIfAbsent) {e.value &#61; value;&#43;&#43;modCount;}break;}e &#61; e.next;}else {if (node !&#61; null)node.setNext(first);elsenode &#61; new HashEntry(hash, key, value, first);int c &#61; count &#43; 1;if (c > threshold && tab.length

put 方法会通过 tryLock()方法尝试获得锁&#xff0c;获得了锁&#xff0c;node null 进入 try 语句块&#xff0c;没有获得锁&#xff0c;调用 scanAndLockForPut 方法自旋等待获得锁。


6、调用 scanAndLockForPut(key, hash, value)方法&#xff1a;

private HashEntry scanAndLockForPut(K key, int hash, V value) {HashEntry first &#61; entryForHash(this, hash);HashEntry e &#61; first;HashEntry node &#61; null;int retries &#61; -1; // negative while locating nodewhile (!tryLock()) {HashEntry f; // to recheck first belowif (retries <0) {if (e &#61;&#61; null) {if (node &#61;&#61; null) // speculatively create nodenode &#61; new HashEntry(hash, key, value, null);retries &#61; 0;}else if (key.equals(e.key))retries &#61; 0;elsee &#61; e.next;}else if (&#43;&#43;retries > MAX_SCAN_RETRIES) {lock();break;}else if ((retries & 1) &#61;&#61; 0 &&(f &#61; entryForHash(this, hash)) !&#61; first) {e &#61; first &#61; f; // re-traverse if entry changedretries &#61; -1;}}return node;}

scanAndLockForPut 方法里在尝试获得锁的过程中会对对应 hashcode 的链表 进行遍历&#xff0c;如果遍历完毕仍然找不到与 key 相同的 HashEntry 节点&#xff0c;则为后续的 put 操作提前创建一个 HashEntry。当 tryLock 一定次数后仍无法获得锁&#xff0c;则通过 lock 申请锁。在获得锁之后&#xff0c;Segment 对链表进行遍历&#xff0c;如果某个 HashEntry 节点具有相同的 key&#xff0c;则更新该 HashEntry value 值&#xff0c;否则新建一个 HashEntry 节点&#xff0c;采用头插法&#xff0c;将它设置为链表的新 head 节点并将原头节点设为新 head 的下一个节点。新建过程中如果节点总数&#xff08;含新建 的 HashEntry&#xff09;超过 threshold&#xff0c;则调用 rehash()方法对 Segment 进行扩容&#xff0c;最后 将新建 HashEntry 写入到数组中。



7、rehash 操作

put方法里 rehash(node);

private void rehash(HashEntry node) {HashEntry[] oldTable &#61; table;int oldCapacity &#61; oldTable.length;int newCapacity &#61; oldCapacity <<1;/* oldCapacity*2 */threshold &#61; (int)(newCapacity * loadFactor);HashEntry[] newTable &#61;(HashEntry[]) new HashEntry[newCapacity];int sizeMask &#61; newCapacity - 1;for (int i &#61; 0; i e &#61; oldTable[i];if (e !&#61; null) {HashEntry next &#61; e.next;int idx &#61; e.hash & sizeMask;if (next &#61;&#61; null) // Single node on listnewTable[idx] &#61; e;else { // Reuse consecutive sequence at same slotHashEntry lastRun &#61; e;int lastIdx &#61; idx;for (HashEntry last &#61; next;last !&#61; null;last &#61; last.next) {int k &#61; last.hash & sizeMask;if (k !&#61; lastIdx) {lastIdx &#61; k;lastRun &#61; last;}}newTable[lastIdx] &#61; lastRun;// Clone remaining nodesfor (HashEntry p &#61; e; p !&#61; lastRun; p &#61; p.next) {V v &#61; p.value;int h &#61; p.hash;int k &#61; h & sizeMask;HashEntry n &#61; newTable[k];newTable[k] &#61; new HashEntry(h, p.key, v, n);}}}}int nodeIndex &#61; node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] &#61; node;table &#61; newTable;}

扩容是新创建了数组&#xff0c;然后进行迁移数据&#xff0c;最后再将 newTable 设置给属性table。为了避免让所有的节点都进行复制操作&#xff1a;由于扩容是基于 2 的幂指来操作&#xff0c; 假设扩容前某 HashEntry 对应到 Segment 中数组的 index i&#xff0c;数组的容量为 capacity&#xff0c;那么扩容后该 HashEntry 对应到新数组中的 index 只可能为 i 或者 i&#43;capacity&#xff0c;因此很多HashEntry 节点在扩容前后 index 可以保持不变。



假设原来 table 长度为 4&#xff0c;那么元素在 table 中的分布是这样的
扩容后 table 长度变为 8&#xff0c;那么元素在 table 中的分布变成&#xff1a;
可以看见 hash 值为 34 56 的下标保持不变&#xff0c;而 15,23,77 的下标都是在原 来下标的基础上&#43;4 即可&#xff0c;可以快速定位和减少重排次数。 该方法没有考虑并发&#xff0c;因为执行该方法之前已经获取了锁。


8、remove 操作 :key做参数

public V remove(Object key) {int hash &#61; hash(key);Segment s &#61; segmentForHash(hash);return s &#61;&#61; null ? null : s.remove(key, hash, null);}

或&#xff1a;用key和value做参数

public boolean remove(Object key, Object value) {int hash &#61; hash(key);Segment s;return value !&#61; null && (s &#61; segmentForHash(hash)) !&#61; null &&s.remove(key, hash, value) !&#61; null;}

与 put 方法类似&#xff0c;都是在操作前需要拿到锁&#xff0c;以保证操作的线程安全性。




9、ConcurrentHashMap 的弱一致性


然后对链表遍历判断是否存在 key 相同的节点以及获得该节点的 value。但 由于遍历过程中其他线程可能对链表结构做了调整&#xff0c;因此 get containsKey 返 回的可能是过时的数据&#xff0c;这一点是 ConcurrentHashMap 在弱一致性上的体现。如 果要求强一致性&#xff0c;那么必须使用 Collections.synchronizedMap()方法。
sizecontainsValue
这些方法都是基于整个 ConcurrentHashMap 来进行操作的&#xff0c;他们的原理也基 本类似&#xff1a;首先不加锁循环执行以下操作&#xff1a;循环所有的 Segment&#xff0c;获得对应的值以 及所有 Segment modcount 之和。在 putremove clean 方法里操作元素 前都会将变量 modCount 进行变动&#xff0c;如果连续两次所有 Segment modcount 和相等&#xff0c;则过程中没有发生其他线程修改 ConcurrentHashMap 的情况&#xff0c;返回获得 的值。当循环次数超过预定义的值时&#xff0c;这时需要对所有的 Segment 依次进行加锁&#xff0c; 获取返回值后再依次解锁。所以一般来说&#xff0c;应该避免在多线程环境下使用 size 和 containsValue 方法。


今天主要分享1.7版本的ConcurrentHashMap&#xff0c;下篇我们分享1.8版本的原理&#xff0c;敬请期待&#xff01;

 


推荐阅读
  • Android中高级面试必知必会,积累总结
    本文介绍了Android中高级面试的必知必会内容,并总结了相关经验。文章指出,如今的Android市场对开发人员的要求更高,需要更专业的人才。同时,文章还给出了针对Android岗位的职责和要求,并提供了简历突出的建议。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 本文介绍了Java的集合及其实现类,包括数据结构、抽象类和具体实现类的关系,详细介绍了List接口及其实现类ArrayList的基本操作和特点。文章通过提供相关参考文档和链接,帮助读者更好地理解和使用Java的集合类。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文详细介绍了如何使用MySQL来显示SQL语句的执行时间,并通过MySQL Query Profiler获取CPU和内存使用量以及系统锁和表锁的时间。同时介绍了效能分析的三种方法:瓶颈分析、工作负载分析和基于比率的分析。 ... [详细]
  • 本文讨论了微软的STL容器类是否线程安全。根据MSDN的回答,STL容器类包括vector、deque、list、queue、stack、priority_queue、valarray、map、hash_map、multimap、hash_multimap、set、hash_set、multiset、hash_multiset、basic_string和bitset。对于单个对象来说,多个线程同时读取是安全的。但如果一个线程正在写入一个对象,那么所有的读写操作都需要进行同步。 ... [详细]
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
author-avatar
手机用户2702933712
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有