/** * Maps the specified <code>key</code> to the specified * <code>value</code> in this hashtable. Neither the key nor the * value can be <code>null</code>. <p> * * The value can be retrieved by calling the <code>get</code> method * with a key that is equal to the original key. * * @param key the hashtable key * @param value the value * @return the previous value of the specified key in this hashtable, * or <code>null</code> if it did not have one * @exception NullPointerException if the key or value is * <code>null</code> * @see Object#equals(Object) * @see #get(Object) */ publicsynchronized V put(K key, V value){ // Make sure the value is not null if (value == null) { thrownew NullPointerException(); }
// Makes sure the key is not already in the hashtable. Entry tab[] = table; int hash = hash(key); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { V old = e.value; e.value = value; return old; } }
modCount++; if (count >= threshold) { // Rehash the table if the threshold is exceeded rehash();
/**
 * The default initial capacity for this table,
 * used when not otherwise specified in a constructor.
 */
static final int DEFAULT_INITIAL_CAPACITY = 16; // default backing-array capacity

/**
 * The default load factor for this table, used when not
 * otherwise specified in a constructor.
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f; // load factor

/**
 * The default concurrency level for this table, used when not
 * otherwise specified in a constructor.
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16; // concurrency level
/** * Creates a new, empty map with the specified initial * capacity, load factor and concurrency level. * * @param initialCapacity the initial capacity. The implementation * performs internal sizing to accommodate this many elements. * @param loadFactor the load factor threshold, used to control resizing. * Resizing may be performed when the average number of elements per * bin exceeds this threshold. * @param concurrencyLevel the estimated number of concurrently * updating threads. The implementation performs internal sizing * to try to accommodate this many threads. * @throws IllegalArgumentException if the initial capacity is * negative or the load factor or concurrencyLevel are * nonpositive. */ @SuppressWarnings("unchecked") publicConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel){ if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) thrownew IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) //限制并发级别最大为2的12次幂 concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; //Segment数组的大小 //找大于等于并发级别的2的幂次方数 while (ssize < concurrencyLevel) { ++sshift; //统计1左移的次数 ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; //HashEntry的size if (c * ssize < initialCapacity) //比如initialCapacity=17 ,ssize=16 c=1,但是明显不够存了,所以这里会判断c * ssize < initialCapacity,++c ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; //最终还是要以2的幂次方数作为数据的容量 // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
/** * Maps the specified key to the specified value in this table. * Neither the key nor the value can be null. * * <p> The value can be retrieved by calling the <tt>get</tt> method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with <tt>key</tt>, or * <tt>null</tt> if there was no mapping for <tt>key</tt> * @throws NullPointerException if the specified key or value is null */ @SuppressWarnings("unchecked") public V put(K key, V value){ Segment<K,V> s; if (value == null) thrownew NullPointerException(); int hash = hash(key); //计算出一个hash值 int j = (hash >>> segmentShift) & segmentMask; //确定放在数组的什么位置 segmentMask=segment大小-1 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // 查看segment的第J个位置的值是不是为null s = ensureSegment(j); //生成segment对象 return s.put(key, hash, value, false); }
/** * Applies a supplemental hash function to a given hashCode, which * defends against poor quality hash functions. This is critical * because ConcurrentHashMap uses power-of-two length hash tables, * that otherwise encounter collisions for hashCodes that do not * differ in lower or upper bits. */ privateinthash(Object k){ int h = hashSeed;
// Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }
/** * Returns the segment for the given index, creating it and * recording in segment table (via CAS) if not already present. * * @param k the index * @return the segment */ @SuppressWarnings("unchecked") private Segment<K,V> ensureSegment(int k){ final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //取当前位置的元素,如果有的话,说明另外的线程已经生成出来了,直接返回 Segment<K,V> proto = ss[0]; // use segment 0 as prototype //直接拿concurrenthashMap在初始化的时候在第0个位置生成的segment对象 int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) //再次判断当前位置还是不是空,如果还是空,真正的new Segment对象 == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) //失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次发起尝试 == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) //cas 对数组的第U个位置进行赋值,如果该位置的值不为null,则赋值 break; } } } return seg; }
CAS 就是指将内存中的值与期望值进行比较：如果相等，则说明共享数据没有被其他线程修改，于是将其替换成新值，然后继续往下运行；如果不相等，则说明共享数据已经被修改，于是放弃已做的操作，然后重新执行刚才的操作。容易看出，CAS 操作基于“共享数据通常不会被并发修改”的乐观假设，采用了类似于数据库的 commit-retry 模式。当同步冲突出现的机会很少时，这种假设能带来较大的性能提升。
/** * Scans for a node containing given key while trying to * acquire lock, creating and returning one if not found. Upon * return, guarantees that lock is held. UNlike in most * methods, calls to method equals are not screened: Since * traversal speed doesn't matter, we might as well help warm * up the associated code and accesses as well. * * @return a new node if key not found, else null */ private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value){ HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); //尝试获取锁,没有获取到的时候,先实例化HashEntry retries = 0; } elseif (key.equals(e.key)) retries = 0; else e = e.next; } elseif (++retries > MAX_SCAN_RETRIES) { lock(); break; } //这里表示在尝试获取锁的时候,需要判断遍历到该链表的当前节点数据有没有发生改变 elseif ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }