欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 宝鼎售后问题提交 | 后台管理


新闻资讯

MENU

软件开发知识

由于加了独占锁所以返回结果是精确的 public int size() { final ReentrantLock lo

点击: 次  来源:宝鼎软件 时间:2017-07-28

原文出处: 本日你不格斗来日诰日你就落伍

一、 媒介

PriorityBlockingQueue是带优先级的无界阻塞行列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现,研究过数组方法存放最小堆节点的都知道,直接遍历行列元素是无序的。

二、 PriorityBlockingQueue类图布局

由于加了独有锁所以返回功效是准确的 public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return size; } final 昆山软件定制开拓 ly { lock.unlock(); }} 八、 开源框架中利用 今朝还没找到.. 九、总结 PriorityBlockingQueue雷同于ArrayBlockingQueue内部利用一个独有锁来节制同时只有一个线程可以举办入队和出队

如图PriorityBlockingQueue内部有个数组queue用来存放行列元素,size用来存放行列元素个数,allocationSpinLockOffset是用来在扩容行列时候做cas的,目标是担保只有一个线程可以举办扩容。

由于这是一个优先级行列所以有个较量器comparator用来较量元素巨细。lock独有锁工具用来节制同时只能有一个线程可以举办入队出队操纵。notEmpty条件变量用来实现take要领阻塞模式。这里没有notFull 条件变量是因为这里的put操纵长短阻塞的,为啥要设计为非阻塞的是因为这是无界行列。
最后PriorityQueue q用来搞序列化的。

如下结构函数,默认行列容量为11,默认较量器为null;

private static final int DEFAULT_INITIAL_CAPACITY = 11;


 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

三、 offer操纵

在行列插入一个元素,由于是无界行列,所以一直为乐成返回true;

public boolean offer(E e) {

    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;

    //假如当前元素个数>=行列容量,则扩容(1)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);


    try {
        Comparator<? super E> cmp = comparator;

        //默认较量器为null
        if (cmp == null)(2)
            siftUpComparable(n, e, array);
        else
            //自界说较量器(3)
            siftUpUsingComparator(n, e, array, cmp);

        //行列元素增加1,而且激活notEmpty的条件行列内里的一个阻塞线程
        size = n + 1;(9)
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

主流程较量简朴,下面看看两个主要函数

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); //must release and then re-acquire main lock
    Object[] newArray = null;

    //cas乐成则扩容(4)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //oldGap<64则扩容新增oldcap+2,否者扩容50%,而且最大为MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }

    //第一个线程cas乐成后,第二个线程会进入这个处所,然后第二个线程让出cpu,只管让第一个线程执行下面点获取锁,可是这得不到必定的担保。(5)
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();(6)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow目标是扩容,这里要思考下为啥在扩容前要先释放锁,然后利用cas节制只有一个线程可以扩容乐成。我的领略是为了机能,因为扩容时候是需要花时间的,假如这些操纵时候还占用锁那么其他线程在这个时候是不能举办出队操纵的,也不能举办入队操纵,这大大低落了并发性。

所以在扩容前释放锁,这答允其他出队线程可以举办出队操纵,可是由于释放了锁,所以也答允在扩容时候举办入队操纵,软件开发,这就会导致多个线程举办扩容会呈现问题,所以这里利用了一个spinlock用cas节制只有一个线程可以举办扩容,失败的线程挪用Thread.yield()让出cpu,目标意在让扩容线程扩容后优先挪用lock.lock从头获取锁,可是这得不到必然的担保,有大概挪用Thread.yield()的线程先获取了锁。