知识点
线程池的7个参数
线程池中的核心属性ctl是什么?
线程池的状态以及转变的方式
线程池的执行流程
工作线程在线程池中是如何表述的?
工作线程存储在什么位置?
线程池的拒绝策略
工作线程是什么?
如何在线程执行任务前后做额外处理?
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 最大线程数空闲时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 工作队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
) {}
线程池的核心属性
// 本质就是一个int类型的数值,前三位标识线程池的状态,后二十九位标识线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 本质就是29,为了方便对ctl做位运算使用的常量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 计算出线程池的线程最大容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池正常运行 111
private static final int RUNNING = -1 << COUNT_BITS;
// 线程池被shutdown,继续执行完剩下任务 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 线程池被shutdown,线程池停止,并且所有任务中断 001
private static final int STOP = 1 << COUNT_BITS;
// shutdown或者shutdownNow之后,任务都被处理完之后,到这个过渡状态 010
private static final int TIDYING = 2 << COUNT_BITS;
// 线程池停止 011
private static final int TERMINATED = 3 << COUNT_BITS;
线程池核心的execute方法流程
public void execute(Runnable command) {
//健壮性判断,避免传入为null的任务
if (command == null)
throw new NullPointerException();
// 获取ctl
int c = ctl.get();
// 基于workerCountOf获取当前正在工作的线程数,判断是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建核心线程数,并且执行任务,传入true代表当前为核心线程
if (addWorker(command, true))
return;
// 如果添加核心线程失败,重新获取ctl
c = ctl.get();
}
// isRunning(c)获取线程池状态,如果线程池正在RUNNING,将任务追加到堵塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 尝试创建最大线程数,如果成功,返回true,失败返回false
else if (!addWorker(command, false))
// 执行拒绝策略
reject(command);
}
查看addWorker添加工作线程的操作
private boolean addWorker(Runnable firstTask, boolean core) {
// 给for循环追加标记
retry:
for (;;) {
// 获取ctl
int c = ctl.get();
// 获取当前线程池的运行状态
int rs = runStateOf(c);
// 如果rs大于SHUTDOWN,说明线程池执行了shutdown方法或者shutdownNow方法
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && // 线程池停了
firstTask == null && // 任务为null
! workQueue.isEmpty())) //工作队列为空
return false;
for (;;) {
// 获取当前工作线程个数
int wc = workerCountOf(c);
// 如果工作线程大于容量 或者(工作线程大于核心线程 | 工作线程大于最大线程)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 基于CAS的方式,将工作线程数+1
if (compareAndIncrementWorkerCount(c))
break retry;
// 下面就要判断是重要执行内部循环,还是重新执行外部循环
// 如果线程池状态改变,从外侧重新执行
// 如果线程池状态没有改变,执行内侧循环
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
// 声明里两个标识
boolean workerStarted = false;
boolean workerAdded = false;
// 真正的工作线程就是worker
Worker w = null;
try {
// 通过new的方式构建worker,将任务传入
w = new Worker(firstTask);
// worker中有一个thread对象
final Thread t = w.thread;
if (t != null) {
// 获取了线程的全局锁
final ReentrantLock mainLock = this.mainLock;
// 上锁,shutdown或者shutdownNow的时候,也需要获取锁资源
mainLock.lock();
try {
// 获取当前线程的状态
int rs = runStateOf(ctl.get());
// 线程池状态小于SHUTDOWN,也就是判断线程池是否是RUNMING
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 或者线程池尾SHUTDOWN,任务必须是null
// 如果线程已经工作了,直接抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
// 将工作线程追加到HashSet中存储
workers.add(w);
// 获取了HashSet的长度
int s = workers.size();
// 如果现在的工作线程个数大雨历史最大值,替换掉 largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
// 将添加工作线程设置为true
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 添加工作线程成功
if (workerAdded) {
// 启动任务
t.start();
// 设置启动工作线程位true
workerStarted = true;
}
}
} finally {
// 如果工作线程启动失败
if (! workerStarted)
// 补救操作
addWorkerFailed(w);
}
// 返回工作线程是否启动
return workerStarted;
}
查看Worker对象
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable // 当前worker也就是一个任务
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 当调用worker内部线程的start方法时,实际是在调用Worker类中的run方法,实际执行的是runWorker的方法
public void run() {
runWorker(this);
}
}
runWorker方法
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取Worker中的具体任务
Runnable task = w.firstTask;
// 将worker中的task置为null
w.firstTask = null;
// 将有参构造中的标记撤销,代表当前线程可以被打断
w.unlock();
// 标记,值为true
boolean completedAbruptly = true;
try {
// 如果worker中task有任务,直接执行当前任务,否则去队列中获取任务
while (task != null || (task = getTask()) != null) {
// 当前要执行,添加标记,shutdown也不会打断
w.lock();
// 判断当前线程池状态,以及线程状态,判断是否需要被打断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 执行任务的开始
try {
// 任务的前置增强
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务的后置增强
afterExecute(task, thrown);
}
} finally {
// 将task置为null,任务处理完成
task = null;
// 标记当前worker处理的任务数+1
w.completedTasks++;
// 去掉标记
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
评论区