线程池-ThreadPoolExecutor类详解 ThreadPoolExecutor的构造方法 1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
corePoolSize: 核心线程数, 线程池中即使没有任务也要保持存活的最大线程数量
maximumPoolSize: 线程池中允许的最大线程数量
keepAliveTime: 线程池中超过核心线程数的空闲线程存活时间
unit: keepAliveTime的时间单位
workQueue: 任务队列, 用于存放待执行的任务
threadFactory: 线程工厂, 用于创建新线程
handler: 拒绝策略, 当任务无法被执行时的处理策略
拒绝策略 JDK提供了四种默认的拒绝策略
AbortPolicy: 将任务丢弃并抛出一个RejectedExecutionException
异常, 默认的拒绝策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException ("Task " + r.toString() + " rejected from " + e.toString()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static class DiscardPolicy implements RejectedExecutionHandler { public discardpolicy () { } public void rejectedexecution (runnable r, threadpoolexecutor e) { } }
CallerRunPolicy: 直接在试图创建并执行任务的calling Thread线程执行这个任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
DiscardOldestPolicy: 将阻塞队列的最后一个任务丢弃, 然后重新执行这个任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
核心字段 源码
1 2 3 4 5 6 7 8 9 10 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
ctl: Control线程控制信号, 是一个原子类, 前三位是线程池的状态位, 后29位是线程池中的线程数量
int ctlOf(int rs, int wc) { return rs | wc; }
方法构建ctl
rs是RunState 线程池状态
wc是WorkerCount 工人数量也就是线程池中的线程的数量
int runStateOf(int c) { return c & ~COUNT_MASK; }
: 获取线程池状态
int workerCountOf(int c) { return c & COUNT_MASK; }
: 获取工人数量
COUNT_BITS: 这个参数 == 29, 是获取状态位和设置状态位需要移动的位数
CONUT_MASK: 前29bit都是1, c & COUNT_MASK来获取到wc
RUNNING: 线程池的正常工作状态, 线程池可以接受新的任务并且会处理等待队列中的任务
SHUTDOWN: 调用shutdown()
方法的时候, 线程池进入到这个状态
线程池不再接受新的任务
继续执行等待队列中的已经存在的任务和正在执行的任务
STOP: 调用shutdownNow()
方法
不接受新的任务
不处理队列中的任务
尝试中断正在执行的任务
TIDYING: 一种过渡状态, 在满足以下条件的时候进入
所有的任务都已经终止
工作线程的数量是0
队列为空的时候进入到这个状态以后, 会执行 terminate()
钩子方法
TERMINATED: 线程池关闭, 所有的资源都得到释放
任务的执行 execute方法 源码及解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
线程池类执行任务的顺序是
先尝试将任务交给核心线程, 如果核心线程的数量 < 最大核心线程数量, 创建新的核心线程执行任务
核心线程的数量超过了最大线程, 尝试将任务添加到等待队列里面
等待队列也已经满了, 创建一个非核心线程执行任务, 如果创建失败, 说明线程状态不是RUNNING或者线程数量已经超过了最大的数量, 这个时候执行reject方法
原文: Proceed in 3 steps:
If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn’t, by returning false.
If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.
addWorker 方法比较长, 分成两部分解读
方法签名: private boolean addWorker(Runnable firstTask, boolean core)
core: 添加的线程是不是核心线程
增加线程的数量, 并没有真的增加线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 retry: for (int c = ctl.get();;) { if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false ; for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateAtLeast(c, SHUTDOWN)) continue retry; } }
Worker是一个继承了AQS, 实现了Runnable的类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 boolean workerStarted = false ;boolean workerAdded = false ;Worker w = null ;try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null )) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException (); workers.add(w); workerAdded = true ; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;
Worker类的构造方法 1 2 3 4 5 Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); }
runWorker方法 Worker类的run方法实现内部是直接调用runWorker(Worker w) 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null ); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
getTask方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
到这里我们能简单总结下一个Worker的生命周期
对于任何Worker, 会在runWorker方法中不断循环获取任务执行任务 一旦没有获取到任务, worker就会被移除
对于核心线程worker, 会在获取任务getTask()上一直阻塞直到获取任务 对于非核心线程worker, 在getTask上只会阻塞这个worker存活时间 超过这个时间, 就会在getTask的下一次循环中workerCOunt–返回null, 然后结束runWorker中while循环, 然后将这个worker销毁
任务的提交 submit方法 ThreadPoolExecutor类是继承自AbstractExecutorService的. 其中的submit方法也是在这个抽象类中实现的
1 2 3 4 5 6 7 8 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; }
而execute就是我们第一个讲解的任务的执行的核心部分了
这里线程池的设计我们能看到是使用了模板模式
任务的关闭 shutdown方法 将所有的正在阻塞获取任务的空闲线程的状态变成interrupt, 来释放没有在
怎么实现的SHUTDOWM状态的语义:
线程池不再接受新的任务
在addWorker的时候, 如果是addWorker(command, true/false)形式都会返回false
继续执行等待队列中的已经存在的任务和正在执行的任务
只会通过interrupt唤醒没有在执行任务在阻塞获取Task的worker, 并且会删除这个空闲的worker
不会影响正在执行的任务, 也不会影响在等待队列中还有任务的时候
怎么从SHUTDOWN一步一步变成的TERMINATE
在execute中会直接reject新的任务
SHUTDOWN状态下并且队列为空的时候, 也就是开始出现空闲的worker的时候, 会在getTask方法返回null
因为getTask方法返回了null, 触发了runWorker方法中的销毁worker, 并tryTerminate()
在tryTerminate中再interrupt下一个worker, 这样渐进式将所有的worker都销毁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers () { interruptIdleWorkers(false ); } private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }
shutdownNow方法 shutdownNow会为所有的线程都打上interrupt状态
STOP: 调用shutdownNow()
方法
不接受新的任务
不处理队列中的任务
尝试中断正在执行的任务
如果方法尝试执行, 但是还没有执行的时候, 也就是worker刚获取到下一轮的task的时候, 会因为状态是STOP, getTask() = null, 进入到销毁worker的过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers () { for (Worker w : workers) w.interruptIfStarted(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }