线程池工作队列

理解

线程池是存放执行多个线程的容器,多个未执行的线程存存放在队列中。根据线程池原因,队列满足存入和取出功能就行。

队列对应JAVA中java.util.Queue接口,接口方法设计看出,队列需要有入队(add、offer)、出队(remove、poll、element、peek)。功能需要多个方法实现是考虑,异常处理、原数据是否删除。

队列的设计会决定线程池的执行顺序、容量上限、阻塞并发。

常见队列

Spring 线程池 ThreadPoolTaskExecutor 如果设置线程池最大等待队列数,会使用 LinkedBlockingQueue 类,这个类使用单向链表实现的阻塞队列,队列后面进,前面出,所以有先进先出特性,也可设置容量。

若没有设置最大等待队列数,会使用 SynchronousQueue 类,它不是正常的队列,它有入队、出队的能力,但没有如数组、集合等队列的概念存放数据,解耦入队、出队功能。所以它的使用必须一入一出,否则就阻塞。这个队列能为线程池增加一个知道任务什么时候开始处理的特性。

ArrayBlockingQueue 与 SynchronousQueue 是非常像的队列,像的点是阻塞,不同的点是 ArrayBlockingQueue 有一个创建是就锁定长度的环形数组存放数据,在放满后也会阻塞入队功能,就如同 SynchronousQueue 类的入队、出队中间有了一个缓冲区(环形数组),但缓冲区满了也会阻塞入队功能。 同理,因为环形数组的存在,ArrayBlockingQueue 丢失了 SynchronousQueue 的一个知道任务什么时候开始处理的特性。又因为环形数组取值的实现逻辑方式,也为这种线程池增加一个公平和非公平的特性

PriorityBlockingQueue 的数据结构是堆,这种数据结构的生成具有比较的特性,具体可历史文章【】的实现原理。又因为 PriorityBlockingQueue 没有管控队列长度,所以线程池使用这种队列可无限放入线程数。为了避免 OOM 和 拒绝策略的失效,可重写入队的(offer)方法,实现数据超过指定数量直接返回false,否则调用原来逻辑。

线程池队列替换

以 ThreadPoolTaskExecutor 线程池替换队列为例,默认队列就 LinkedBlockingQueue 或 SynchronousQueue。
我要替换成 PriorityBlockingQueue 优先级队列,先继承 ThreadPoolTaskExecutor 类重写 createQueue(int queueCapacity) 方法。
原 createQueue(int queueCapacity) 方法

protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
                return new LinkedBlockingQueue<Runnable>(queueCapacity);
        }
        else {
                return new SynchronousQueue<Runnable>();
        }
}

在线程池中 PriorityBlockingQueue 存放的对象是 Runnable 但 Runnable 没有实现 Comparable 接口。所以 PriorityBlockingQueue 要么默认实现一个 Comparable 接口,弄一个新类继承 Runnable 实现 Comparable 接口
下面就是继承 Runnable 接口的类,PriorityBlockingQueue 默认实现一个 Comparable 接口

public class PriorityThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    @Override
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new PriorityBlockingQueue<>(queueCapacity, new Comparator<Runnable>() {
                @Override
                public int compare(Runnable o1, Runnable o2) {
                    int one = 0;
                    if (o1 instanceof PriorityRunnable) {
                        one = ((PriorityRunnable) o1).getPriority();
                    }
                    int two = 0;
                    if (o2 instanceof PriorityRunnable) {
                        two = ((PriorityRunnable) o2).getPriority();
                    }
                    return Integer.compare(one, two);
                }
            });
        }
        else {
            return new SynchronousQueue<Runnable>();
        }
    }
}