Java并发49:并发集合系列-基于独占锁+链表实现的单向阻塞无界队列LinkedBlockingQueue

news/2024/6/3 21:17:25

[超级链接:Java并发学习系列-绪论]
[系列序章:Java并发43:并发集合系列-序章]


原文地址:http://www.importnew.com/25583.html

一、前言

前面介绍了使用CAS实现的非阻塞队列ConcurrentLinkedQueue,下面就来介绍下使用独占锁实现的阻塞队列LinkedBlockingQueue的实现。

二、 LinkedBlockingQueue类图结构

这里写图片描述

如图LinkedBlockingQueue中:

  • 也有两个Node分别用来存放首尾节点,
  • 并且里面有个初始值为0的原子变量count用来记录队列元素个数,
  • 另外里面有两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁,
  • 其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待,
  • putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。
  • 另外notEmpty和notFull用来实现入队和出队的同步。

另外由于出入队是两个非公平独占锁,所以可以同时有一个线程入队和一个线程出队,其实这个是个生产者-消费者模型。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
* Current number of elements /
private final AtomicInteger count = new AtomicInteger(0);

public static final int   MAX_VALUE = 0x7fffffff;

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

  public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //初始化首尾节点
    last = head = new Node<E>(null);
}

如图默认队列容量为0x7fffffff,用户也可以自己指定容量。

三、必备基础

3.1 ReentrantLock

可以参考:Java并发19:Lock系列-Lock接口基本方法学习实例

3.2 条件变量(Condition)

可以参考:Java并发20:Lock系列-Condition接口基本方法学习实例

四 、带超时时间的offer操作-生产者

在队尾添加元素

  • 如果队列满了,那么等待timeout时候,如果时间超时则返回false,
  • 如果在超时前队列有空余空间,则插入后返回true。
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    //空元素抛空指针异常
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;

    //获取可被中断锁,只有一个线程克获取
    putLock.lockInterruptibly();
    try {

        //如果队列满则进入循环
        while (count.get() == capacity) {
            //nanos<=0直接返回
            if (nanos <= 0)
                return false;
            //否者调用await进行等待,超时则返回<=0(1)
            nanos = notFull.awaitNanos(nanos);
        }
        //await在超时时间内返回则添加元素(2)
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();

        //队列不满则激活其他等待入队线程(3)
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }

    //c==0说明队列里面有一个元素,这时候唤醒出队线程(4)
    if (c == 0)
        signalNotEmpty();
    return true;
}

private void enqueue(Node<E> node) {   
    last = last.next = node;
}

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

五、 带超时时间的poll操作-消费者

获取并移除队首元素,在指定的时间内去轮询队列看有没有首元素有则返回,否者超时后返回null

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;

    //出队线程获取独占锁
    takeLock.lockInterruptibly();
    try {

        //循环直到队列不为空
        while (count.get() == 0) {

            //超时直接返回null
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }

        //出队,计数器减一
        x = dequeue();
        c = count.getAndDecrement();

        //如果出队前队列不为空则发送信号,激活其他阻塞的出队线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        //释放锁
        takeLock.unlock();
    }

    //当前队列容量为最大值-1则激活入队线程。
    if (c == capacity)
        signalNotFull();
    return x;
}

六、put操作-生产者

与带超时时间的poll类似不同在于put时候如果当前队列满了它会一直等待其他线程调用notFull.signal才会被唤醒

七、 take操作-消费者

与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒

八、 size操作

当前队列元素个数,如代码直接使用原子变量count获取

public int size() {
    return count.get();
}

九、peek操作

获取但是不移除当前队列的头元素,没有则返回null

public E peek() {
    //队列空,则返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

十、 remove操作

删除队列里面的一个元素,有则删除返回true,没有则返回false。

在删除操作时候由于要遍历队列所以加了双重锁,也就是在删除过程中不允许入队也不允许出队操作:

public boolean remove(Object o) {
    if (o == null) return false;

    //双重加锁
    fullyLock();
    try {

        //遍历队列找则删除返回true
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        //找不到返回false
        return false;
    } finally {
        //解锁
        fullyUnlock();
    }
}

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

void unlink(Node<E> p, Node<E> trail) {

    p.item = null;
    trail.next = p.next;
    if (last == p)
        last = trail;
    //如果当前队列满,删除后,也不忘记最快的唤醒等待的线程
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

十一、开源框架中使用

tomcat中任务队列TaskQueue

11.1 类图结构

这里写图片描述

可知TaskQueue继承了LinkedBlockingQueue并且泛化类型固定了为Runnalbe.重写了offer、poll、take方法。

十二、总结

LinkedBlockingQueue与ConcurrentLinkedQueue相比前者是阻塞队列,使用可重入独占的非公平锁来实现。

通过使用put锁和take锁使得入队和出队解耦可以同时进行处理,但是同时只有一个线程可以入队或者出队,其他线程必须等待。

另外引入了条件变量来进行入队和出队的同步,每个条件变量维护一个条件队列用来存放阻塞的线程。

LinkedBlockingQueue的size操作通过使用原子变量count获取能够比较精确的获取当前队列的元素个数。

另外remove方法使用双锁保证删除时候队列元素保持不变,另外其实这个是个生产者-消费者模型。


而ConcurrentLinkedQueue则使用CAS非阻塞算法来实现,使用CAS原子操作保证链表构建的安全性。

当多个线程并发时候CAS失败的线程不会被阻塞,而是使用cpu资源去轮询CAS直到成功。

size方法先比LinkedBlockingQueue的获取的个数是不精确的,因为获取size的时候是通过遍历队列进行的,而遍历过程中可能进行增加删除操作,remove方法操作时候也没有对整个队列加锁。

remove时候可能进行增加删除操作,这就可能删除了一个刚刚新增的元素,而不是删除的想要位置的。


http://www.niftyadmin.cn/n/2815557.html

相关文章

正式入驻CSDN博客

正式入驻CSDN博客&#xff0c;以前新浪博客文章会逐步搬过来&#xff0c;敬请期待。

在linux环境下重启oracle数据库,解决密码过期的问题

&#xff08;1&#xff09; 以oracle身份登录数据库&#xff0c;命令&#xff1a;su – oracle &#xff08;2&#xff09; 进入Sqlplus控制台&#xff0c;命令&#xff1a;sqlplus /nolog &#xff08;3&#xff09; 以系统管理员登录&#xff0c;命令&#xff1a;connect /as…

Java并发50:并发集合系列-基于独占锁实现的双向阻塞队列LinkedBlockingDeque

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;http://ifeve.com/concurrent-collections-3/ 关于与LinkedBlockingDeque类似的单向队列LinkedBlockingQueue可以参考&#xff1a;Java并发49 使用阻…

hibernatenbsp;注解配置一对多关系

从Hibernate 2.5开始就可以使用annotation实现实体关系的映射了&#xff0c;减少了配置hbm文件的繁琐&#xff0c;而且annotation也是一种趋势&#xff0c;现在的SSH2的整合都是完全可以用annotation来实现。在以前实现一对多关联的关联式都是使用hbm文件&#xff0c;今天我们来…

React 作为一个 UI 运行时(一、宿主环境和渲染器)

很多教程把React介绍为一个UI框架。这很合理因为它就是一个UI库&#xff0c;这就是react标语的意思。这篇文章不会叫你任何关于建立用户界面的知识&#xff0c;但是会帮助你更生层次的理解React编程模型。这是一篇深入解析的文章&#xff0c;对初学者不太适合。在这篇文章我将通…

Java并发51:并发集合系列-基于独占锁+数组实现的单向阻塞有界队列ArrayBlockingQueue

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;http://www.importnew.com/25566.html 一、 前言 上节介绍了无界链表方式的阻塞队列LinkedBlockingQueue&#xff0c;本节来研究下有界使用数组方式实…

lingo错误代码对照大全

需要lingo11破解版&#xff0c;请留下邮箱。 0 LINGO模型生成器的内存已经用尽(可用“LINGO|Options"命令对General Solver 选项卡中的“Generator Memory Limit"选项进行内存大小的修改) 1 模型中的行数太多(对于有实际意义的模型&#xff0c;这个错…

Java并发52:并发集合系列-基于独占锁+二叉树最小堆实现的单向阻塞无界优先级队列PriorityBlockingQueue

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;http://www.importnew.com/25541.html 一、 前言 PriorityBlockingQueue是带优先级的无界阻塞队列&#xff0c;每次出队都返回优先级最高的元素&…