如何保证集合是线程安全的?

并发

Posted by CHuiL on May 22, 2021

容器线程安全

可以使用传统的同步容器,如hashTable,也可以使用同步包装器Collecetions.synchronzied*方法,来给原本不同步的容器包装为同步容器;以及通过使用并发包中的线程安全容器类来保证线程安全,如各种并发容器,比如 ConcurrentHashMap、CopyOnWriteArrayList。各种线程安全队列(Queue/Deque),如 ArrayBlockingQueue、SynchronousQueue。

首先HashMap是线程不安全的,其主要体现:

  1. 在jdk1.7中,在多线程环境下,扩容时会造成环形链或数据丢失。

  2. 在jdk1.8中,在多线程环境下,会发生数据覆盖的情况。 以下标出可能出问题的代码

    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
        if ((p = tab[i = (n - 1) & hash]) == null)
            //并发情况下可能覆盖
            tab[i] = newNode(hash, key, value, null);
        else {
            Node<K,V> e; K k;
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
            else if (p instanceof TreeNode)
                //并发情况下可能覆盖
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            else {
                for (int binCount = 0; ; ++binCount) {
                    if ((e = p.next) == null) {
                        //并发情况下可能覆盖
                        p.next = newNode(hash, key, value, null);
                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                            treeifyBin(tab, hash);
                        break;
                    }
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    //并发情况下可能覆盖
                    e.value = value;
                afterNodeAccess(e);
                return oldValue;
            }
        }
        ++modCount;
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }

get方法的并发问题,无非就是,正在读取过程中,另外一个线程修改了,导致数据与实际不一致;所以主要保证可见性即可;

    final Node<K,V> getNode(int hash, Object key) {
        Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
        //这里可能出现,该线程可能此时是null,退出,但是与此同时其他线程又初始化了该tab;
        if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) {
            if (first.hash == hash &&  ((k = first.key) == key || (key != null && key.equals(k))))
                return first;
            if ((e = first.next) != null) {
                if (first instanceof TreeNode)
                    return ((TreeNode<K,V>)first).getTreeNode(hash, key);
                do {
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        return e;
                } while ((e = e.next) != null);
            }
        }
        return null;
    }

同步包装器Synchronization Wrappers

其实现不过是代理原来的方法,并在每个方法添加上synchronized同步而已

static class SynchronizedList<E>
        extends SynchronizedCollection<E>
        implements List<E> {
        private static final long serialVersionUID = -7754090372962971524L;

        final List<E> list;

        SynchronizedList(List<E> list) {
            super(list);
            this.list = list;
        }
        SynchronizedList(List<E> list, Object mutex) {
            super(list, mutex);
            this.list = list;
        }

        public boolean equals(Object o) {
            if (this == o)
                return true;
            synchronized (mutex) {return list.equals(o);}
        }
        public int hashCode() {
            synchronized (mutex) {return list.hashCode();}
        }

        public E get(int index) {
            synchronized (mutex) {return list.get(index);}
        }
        public E set(int index, E element) {
            synchronized (mutex) {return list.set(index, element);}
        }
        public void add(int index, E element) {
            synchronized (mutex) {list.add(index, element);}
        }
        public E remove(int index) {
            synchronized (mutex) {return list.remove(index);}
        }
        ...
}

并发包

基于分离锁实现的 ConcurrentHashMap(1.7)

为什么需要 ConcurrentHashMap?

因为HashTable和同步包装器都它只是使用synchronize进行很粗粒度的同步,性能低,一个线程操作,其他线程都要等待;

image

get put的过程

基于上面那张图,我们知道,其实就是多加了一层,使用Segment来分割HashEntry数组,这样我们就可以只锁住部分HashEntry,而不至于锁住所有;
先看基本存储结构Segment;

//继承了ReentrantLock
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    ...
    /**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;
    ...
}

Segment中就由HashEntry组成

        /**
         * The per-segment table. Elements are accessed via
         * entryAt/setEntryAt providing volatile semantics.
         */
        transient volatile HashEntry<K,V>[] table;

然后先看put方法

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

简单的说,就说根据hash值获取到对应的Segment,然后在调用Segment中的put;

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
            ..... //这部分就是在获得锁之后put的过程了,判断是否重复key是否覆盖,不充分将node添加到链表上,判断是否需要扩容
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

这里调用了ReentrantLock的tryLock方法,会尝试去获取当前对象(Segment对象)的锁,如果当前可获取就立即返回true,其他情况返回false;
在scanAndLockForPut,它会不断的尝试去获取锁,当然在尝试的过程中,它还提前做了一些能做的事情,竟然不能put,那它可以先准备好要put的Node,在重试的过程,如果发现要插入的table所在的头结点以被其他线程修改(可能是刚好put在这个点上,也可能是扩容了),那么会在判断是否需要创建Node;

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

总之最后重试达到一定次数后,就会直接lock去获取锁,保证一定能拿到锁,因为前面的操作可能会使它错过获取锁的时机;

ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁。由于 HashEntry 中的 value 属性是用 volatile关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。

基于CAS + synchronized实现的 ConcurrentHashMap(1.8)

1.7 已经解决了并发问题,并且能支持 N 个 Segment 这么多次数的并发,但依然存在 HashMap 在 1.7 版本中的问题。

那就是查询遍历链表效率太低。

看下1.8中的存储结构图,发现与hashMpa差不多; image

先看具体存储结构,已经没有1.7中的Segment了;

    /**
     * The array of bins. Lazily initialized upon first insertion.
     * Size is always a power of two. Accessed directly by iterators.
     */
    transient volatile Node<K,V>[] table;

    /**
     * The next table to use; non-null only while resizing.
     */
    private transient volatile Node<K,V>[] nextTable;

image Node中的value和key也同样使用了volatile来保证可见性; put方法

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); //初始化
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //当前hash值所在表为空,创建第一个节点利用CAS写入 由于volatile,修改对其他线程可见
                if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) { //利用 synchronized 锁住这tab[i]的整条链或树
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value; //由于volatile,修改对其他线程可见
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                 //由于volatile,修改对其他线程可见
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

get方法就和hashMap的差不多,计算出hashcode值,如果直接就在桶上就直接返回值,如果是红黑树的结构就按红黑树的方式查询返回,否则就是链表的形势查询返回;get全程无锁,通过主要通过volatile来保证并发写对数据修改的可见性;

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

同步迭代和复合操作原子接口

  • 内部的Traverser用于迭代容器节点,保证不会抛出ConcuttentModificationException,即可以在迭代的过程中,其他线程对该容器进行添加,删除甚至是扩容操作
  • 弱一致性的迭代器可以容忍并发的修改

CocurrentMap提供了一些额外复合操作的原子接口

  • 若没有则添加:putIfAbsent()
  • 若存在着删除: remove(K key,v value);
  • 若相等则替换: replace(K key,V oldValue,V newValue);replave(k key,V newValue);

可参考

详细的源码分析 JDK1.8–深度分析ConcurrentHashMap原理分析

源码注释写的很清楚ConcurrentHashMap 源码解读