博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java线程池中BlockingQueue的作用
阅读量:6243 次
发布时间:2019-06-22

本文共 6182 字,大约阅读时间需要 20 分钟。

关于线程池中BlockingQueue的疑问

对于Java线程池,相信大家都或多或少使用过。关于其用法和原理介绍,网上已经有很多非常精彩的文章,珠玉在前,我就不献丑了。不了解的,可以参考。今天我想讲的,是关于我对Java线程次的两个疑问,当然高手可以略过了。

  • 1.为什么线程池要使用BlockingQueue,而不是ArrayList或别的什么列表?
  • 2.既然使用了BlockingQueue,为什么还要设置拒绝策略,队列满的时候不是阻塞吗?

为什么使用阻塞队列?

要回答这个问答,首先来看看不用线程池的时候怎么执行异步任务

new Thread(() -> {    // do something }).start();

也就是说,每次需要执行异步任务的时候,新建一个线程去执行,执行完就回收了。这会导致什么问题呢,首先,是对资源的浪费,线程的创建需要陷入内核,需要分配栈空间,需要执行调度,等等,只使用一次就回收太浪费资源。其次,当异步任务比较多的时候,这种方式要创建大量的线程,这对于内存资源也是一个很大的开销。我们知道,在jvm启动的时候可以设置线程栈大小的参数-Xss,默认的大小是1M,如果同时启动1000个线程,就要占用1G的内存,可想而知,这对内存是一个多大的开销。而且,线程数太多,对于内核的调度压力也是相当大的,而且,因为频繁的上下文切换而使程序的局部性丧失,也是一种消耗。线程池的作用,就是线程的复用,那么,怎么复用呢,来看一段代码:

final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }

ThreadPoolExecutor中,线程封装在Worker中,Worker实现了Runnable,同时在run()方法中调用上面的runWorker()方法,只要runWorker()方法没有执行完,这个线程就不会被回收。而runWorker()方法要执行下去,就要保证while (task != null || (task = getTask()) != null)的条件为真,第一次判断时task为firstTask,即执行的第一个任务,那么要点就成了getTask()必须不能为空,来看看getTask()的实现:

private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                decrementWorkerCount();                return null;            }            int wc = workerCountOf(c);            // Are workers subject to culling?            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;            if ((wc > maximumPoolSize || (timed && timedOut))                && (wc > 1 || workQueue.isEmpty())) {                if (compareAndDecrementWorkerCount(c))                    return null;                continue;            }            try {                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                if (r != null)                    return r;                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }

核心逻辑是:

Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();

这里的workQueue就是阻塞队列,timed表示是否会超时释放,keepAliveTime是非核心线程允许的空闲时间;如果不超时,则调用BlockingQueue.take(),如果取不到值,就会一直阻塞直到程序提交了一个任务。所以,阻塞队列的作用是控制线程池中线程的生命周期。

那么,如果不用阻塞队列,有没有别的方式可以实现线程池的功能?答案是,有,但是没必要。比如我们可以使用wait/notify来控制线程的执行和阻塞,但这里使用生产者/消费者模式来实现是一种更优雅的方式。

为什么需要拒绝策略

既然使用了阻塞队列,那添加任务的时候如果队列满了不就阻塞了吗,拒绝策略是干嘛用的?答案是添加任务调用的并不是阻塞的put()方法,而是非阻塞的offer()方法,看一下ThreadPoolExecutor的execute()方法就知道了

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * Proceed in 3 steps:         *         * 1. If fewer than corePoolSize threads are running, try to         * start a new thread with the given command as its first         * task.  The call to addWorker atomically checks runState and         * workerCount, and so prevents false alarms that would add         * threads when it shouldn't, by returning false.         *         * 2. If a task can be successfully queued, then we still need         * to double-check whether we should have added a thread         * (because existing ones died since last checking) or that         * the pool shut down since entry into this method. So we         * recheck state and if necessary roll back the enqueuing if         * stopped, or start a new thread if there are none.         *         * 3. If we cannot queue task, then we try to add a new         * thread.  If it fails, we know we are shut down or saturated         * and so reject the task.         */        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }

至于为什么这么实现,应该是不希望阻塞用户进程吧。

也就是说,在Java的线程池,只有消费者使用了阻塞的方法,生产者并没有。

SynchronousQueue

不过也有例外,调用ExecutorService executorService = Executors.newCachedThreadPool();

时,BlockingQueue的实现类是SynchronousQueue,顾名思义,这是一个同步队列,其内部没有容量,使用SynchronousQueue,消费者线程和生产者线程必须交替执行,也就是说,生产者和消费者都必须等待对方就绪。这样的话,不就阻塞用户进程了吗。确实会,但是这个时间非常短,因为使用这种方式,每次通过execute()提交任务的时候,要么复用现有空闲的线程,要么新建一个线程,也就是说线程数理论上没有上界,所以可以当作不会阻塞

参考资料

转载地址:http://mxpia.baihongyu.com/

你可能感兴趣的文章
看麦肯锡如何分析中国城市群
查看>>
《数据分析变革:大数据时代精准决策之道》一1.4 全面看待运营型分析
查看>>
一分钟自我介绍:阿里云CDN
查看>>
《iOS 8开发指南》——第6章,第6.5节实战演练——使用模板Single View Application...
查看>>
【观点】离开了信息化,大数据就是为他人作嫁衣
查看>>
《HTML5+CSS3网页设计入门必读》——1.4 分裂:WHATWG TF
查看>>
《JavaScript核心概念及实践》——第2章 基本概念 2.1 数据类型
查看>>
Linux有问必答:如何修复"fatal error: jsoncpp/json/json.h: No such file..."
查看>>
阿里数据库内核月报:2016年11月
查看>>
简单了解Disruptor(一)
查看>>
编写更好 Bash 脚本的 8 个建议
查看>>
Mavens实战 1.5小结
查看>>
《 硬件创业:从产品创意到成熟企业的成功路线图》——第1章 硬件创业概述 1.1 早期的创客们...
查看>>
《Android游戏开发详解》——第3章,第3.5节继承
查看>>
《Docker生产环境实践指南》——2.6 编排
查看>>
Docker学习(一)
查看>>
云端架美购,精品零距离
查看>>
Java设计模式--享元模式
查看>>
码栈开发手册(五)---可视化方式开发(模块详解--浏览图)
查看>>
每天一个设计模式之装饰者模式
查看>>