• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

JavaThreadPoolExecutor 线程池任务队列满了再添加任务就会执行拒绝策略这简直是危言耸听

武飞扬头像
不够优雅
帮助1

问题

大家平时应该都有自己定义过线程池

下面是线程池ThreadPoolExecutor的构造器

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //省略
}

自定义线程池的时候需要指定一些参数

比较重要的就是corePoolSizemaximumPoolSizeworkQueue这几个参数

corePoolSize

核心线程数是指要保持活动状态的最小工作线程数(除非设置了allowCoreThreadTimeOut

maximumPoolSize

线程池最大工作线程数

workQueue

用于容纳任务并将任务移交给工作线程的队列

那么大家有没有想过这样一个问题

当线程数量等于核心线程数corePoolSize之后

这时调用execute添加一个新的任务是会放到任务队列还是在最大线程数maximumPoolSize的范围内新增一个线程来执行

用代码说话

为了得出问题的答案,我写了一个测试方法

public class ThreadPoolExecutorTest {

    @Test
    public void test() {
        int corePoolSize = 1;//核心线程数
        int maximumPoolSize = 2;//最大线程数
        int workQueueSize = 1;//任务队列大小
        long keepAliveTime = 0;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize,
                keepAliveTime, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(workQueueSize),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        Task task = (Task) r;
                        System.out.println("第"   task.index   "个任务被拒绝执行");
                        System.out.println("线程池中的线程数量:"   executor.getPoolSize());
                        System.out.println("队列中等待的任务数量:"   executor.getQueue().size());
                        Task peek = (Task) executor.getQueue().peek();
                        System.out.println("队列中等待的任务是第"   peek.index   "个任务");
                        System.out.println("------------------------------------\n");
                    }
                });
        
        //第一个任务
        executor.execute(new Task(1, executor));
        sleep(100);

        //第二个任务
        executor.execute(new Task(2, executor));
        sleep(100);

        //第三个任务
        executor.execute(new Task(3, executor));
        sleep(100);

        //第四个任务
        executor.execute(new Task(4, executor));
        sleep(100);
    }

    void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    //任务
    class Task implements Runnable {

        int index;//表示这是第几个任务

        ThreadPoolExecutor executor;

        Task(int index, ThreadPoolExecutor executor) {
            this.index = index;
            this.executor = executor;
        }

        @Override
        public void run() {
            System.out.println("这是第"   index   "个任务");
            System.out.println("线程池中的线程数量:"   executor.getPoolSize());
            System.out.println("队列中等待的任务数量:"   executor.getQueue().size());
            System.out.println("------------------------------------\n");
            sleep(100000);//使任务不结束,保证线程占用
        }
    }
}

指定核心线程数为1,最大线程数为2,任务队列大小为1

然后依次执行四个任务,看看最后结果会是什么样

在运行测试代码之前,不妨让我们先来做个假设

因为第一个任务肯定是会执行的,而第四个任务肯定是会被拒绝的(前三个任务会消耗掉最大线程数和任务队列,2 1 = 3)

所以不确定的就是第二个任务和第三个任务

如果任务放到队列中就不会被执行(这里我用sleep来保证任务不会结束)

那么我们只需要看任务二和任务三哪个不会被执行就能知道是先放队列还是先加线程

假设一:先放队列

任务二会被放到任务队列,任务三会因为队列满了而新加线程执行

我们可以根据程序预测控制台的输出是

这是第1个任务
线程池中的线程数量:1
队列中等待的任务数量:0
------------------------------------

这是第3个任务
线程池中的线程数量:2
队列中等待的任务数量:1
------------------------------------

第4个任务被拒绝执行
线程池中的线程数量:2
队列中等待的任务数量:1
队列中等待的任务是第2个任务
------------------------------------

第一个任务会占用核心线程数,所以线程数量为1,队列为0

第二个任务因为核心线程数已经满了,会被放到队列中,所以不会打印任何信息

第三个任务因为核心线程数和任务队列都满了,会新增一个线程执行,所以线程数为1 1 = 2,队列中是任务二,数量为1

第四个任务因为任务一和任务三已经让线程数达到了最大线程数并且任务二让队列满了,所以会被拒绝执行

假设二:先加线程

任务二会新加线程执行,任务三会因为到达最大线程数而被放到任务队列

我们可以根据程序预测控制台的输出是

这是第1个任务
线程池中的线程数量:1
队列中等待的任务数量:0
------------------------------------

这是第2个任务
线程池中的线程数量:2
队列中等待的任务数量:0
------------------------------------

第4个任务被拒绝执行
线程池中的线程数量:2
队列中等待的任务数量:1
队列中等待的任务是第3个任务
------------------------------------

第一个任务会占用核心线程数,所以线程数量为1,队列为0

第二个任务因为核心线程数已经满了,会新增一个线程执行,所以线程数为1 1 = 2,队列为0

第三个任务因为核心线程数和任务队列都满了,会被放到队列中,所以不会打印任何信息

第四个任务因为任务一和任务二已经让线程数达到了最大线程数并且任务三让队列满了,所以会被拒绝执行

验证

接下来就让我们来运行测试代码

新机子哇伊自摸一刀子

这是第1个任务
线程池中的线程数量:1
队列中等待的任务数量:0
------------------------------------

这是第3个任务
线程池中的线程数量:2
队列中等待的任务数量:1
------------------------------------

第4个任务被拒绝执行
线程池中的线程数量:2
队列中等待的任务数量:1
队列中等待的任务是第2个任务
------------------------------------

最终的结果是第二个任务会被放到任务队列

所以也就是说会先放队列再加线程

不知道是不是和大家原来的理解一样呢

源码

最后一步,当然是要窥探一下线程池的源码了

首先需要提前说明

在线程池中有一个概念叫Worker

一个Worker就代表一条线程,Worker中持有Thread对象

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable {
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
}

话不多说,直接来看execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //这里有一大段注释被我删了
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

我们先来看workerCountOf这个方法,根据命名就能很好的理解,表示获得当前线程池中有多少线程

接着看if (workerCountOf(c) < corePoolSize)这个条件语句

这个if判断当线程数量如果小于核心线程数,那么就调用addWorker方法

addWorker方法也很好理解,就是添加一个线程,它的定义是这样的

boolean addWorker(Runnable firstTask, boolean core) {
}

返回值是一个布尔值,返回true表示添加成功,false表示添加失败(比如在这个过程中调用了shutdown或是另一个线程已经抢先添加了一个线程等情况)

第一个参数firstTask是当Worker运行之后需要执行的任务

第二个参数core表示是否占用核心线程数

如果coretrue那么会用当前线程数和核心线程数进行比较,如果核心线程数已经到上限就会直接返回false

如果corefalse那么会用当前线程数和最大线程数进行比较,如果最大线程数已经到上限就会直接返回false

这里需要注意,核心线程数和最大线程数是数量,线程本身不存在是核心线程或非核心线程的概念

我们回来继续看这段代码(省略不重要的代码)

if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
}

也就说当线程数量小于核心线程数时,添加一个占用核心线程数的任务

如果成功就返回,如果失败就说明可能核心线程数满了或是已经shutdown

因为我们的测试代码不存在shutdown的情况

所以只可能是核心线程数到达上限了

代码会走到if (isRunning(c) && workQueue.offer(command))

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //这里有一大段注释被我删了
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //核心线程数满了
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //会执行这个 if
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

isRunning判断当前线程池的状态,我们的测试代码中肯定是返回true

这样我们的任务就通过workQueue.offer(command)被放到了任务队列workQueue

如果这个时候workQueue.offer(command)返回false说明队列也满了

那么就会到else if分支调用addWorker(command, false)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //这里有一大段注释被我删了
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //核心线程数满了
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //队列也满了
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //会执行这个 else if
    else if (!addWorker(command, false))
        reject(command);
}

相当于添加一个占用非核心线程数的任务

如果这个任务也添加失败了,就说明已经到了最大线程数,直接执行拒绝策略

我们可以尝试还原测试代码的执行逻辑

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //这里有一大段注释被我删了
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //任务一在这里添加成功并执行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //任务二在这里被放到了队列中没有执行
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //任务三在这里添加成功并执行
    else if (!addWorker(command, false))
        //任务四在这里被拒绝
        reject(command);
}

结束

其实在我看源码之前,我一直以为会先加线程再放队列

因为在我看来,我们添加的任务一定是想要尽快执行

如果我们设置的是无界队列,那不是基本用不到最大线程

没想到实际的逻辑竟然有点反直觉

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgaiekj
系列文章
更多 icon
同类精品
更多 icon
继续加载