侧边栏壁纸
  • 累计撰写 106 篇文章
  • 累计创建 19 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

线程池核心源码分析

zero
2022-04-20 / 0 评论 / 0 点赞 / 19 阅读 / 8717 字
温馨提示:
本文最后更新于 2024-07-06,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

知识点

  • 线程池的7个参数

  • 线程池中的核心属性ctl是什么?

  • 线程池的状态以及转变的方式

  • 线程池的执行流程

  • 工作线程在线程池中是如何表述的?

  • 工作线程存储在什么位置?

  • 线程池的拒绝策略

  • 工作线程是什么?

  • 如何在线程执行任务前后做额外处理?

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,  // 核心线程数
                                         int maximumPoolSize,  // 最大线程数
                                         long keepAliveTime,  // 最大线程数空闲时间
                                         TimeUnit unit,  // 时间单位
                                         BlockingQueue<Runnable> workQueue,  // 工作队列
                                         ThreadFactory threadFactory,  // 线程工厂
                                         RejectedExecutionHandler handler  // 拒绝策略
                                         ) {}

线程池的核心属性

// 本质就是一个int类型的数值,前三位标识线程池的状态,后二十九位标识线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 本质就是29,为了方便对ctl做位运算使用的常量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 计算出线程池的线程最大容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池正常运行   111
private static final int RUNNING    = -1 << COUNT_BITS;
// 线程池被shutdown,继续执行完剩下任务   000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 线程池被shutdown,线程池停止,并且所有任务中断  001
private static final int STOP       =  1 << COUNT_BITS;
// shutdown或者shutdownNow之后,任务都被处理完之后,到这个过渡状态  010
private static final int TIDYING    =  2 << COUNT_BITS;
// 线程池停止   011
private static final int TERMINATED =  3 << COUNT_BITS;

线程池核心的execute方法流程

public void execute(Runnable command) {
        //健壮性判断,避免传入为null的任务
        if (command == null)
            throw new NullPointerException();
        // 获取ctl
        int c = ctl.get();
        // 基于workerCountOf获取当前正在工作的线程数,判断是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 创建核心线程数,并且执行任务,传入true代表当前为核心线程
            if (addWorker(command, true))
                return;
            // 如果添加核心线程失败,重新获取ctl
            c = ctl.get();
        }
        // isRunning(c)获取线程池状态,如果线程池正在RUNNING,将任务追加到堵塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 尝试创建最大线程数,如果成功,返回true,失败返回false
        else if (!addWorker(command, false))
            // 执行拒绝策略
            reject(command);
    }

查看addWorker添加工作线程的操作

private boolean addWorker(Runnable firstTask, boolean core) {
        // 给for循环追加标记
        retry:
        for (;;) {
            // 获取ctl
            int c = ctl.get();
            // 获取当前线程池的运行状态
            int rs = runStateOf(c);

            // 如果rs大于SHUTDOWN,说明线程池执行了shutdown方法或者shutdownNow方法
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&    // 线程池停了
                   firstTask == null &&     // 任务为null
                   ! workQueue.isEmpty()))      //工作队列为空
                return false;

            for (;;) {
                // 获取当前工作线程个数
                int wc = workerCountOf(c);
                // 如果工作线程大于容量 或者(工作线程大于核心线程 | 工作线程大于最大线程)
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 基于CAS的方式,将工作线程数+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 下面就要判断是重要执行内部循环,还是重新执行外部循环
                // 如果线程池状态改变,从外侧重新执行
                // 如果线程池状态没有改变,执行内侧循环
                c = ctl.get();
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        // 声明里两个标识
        boolean workerStarted = false;
        boolean workerAdded = false;
        // 真正的工作线程就是worker
        Worker w = null;
        try {
            // 通过new的方式构建worker,将任务传入
            w = new Worker(firstTask);
            // worker中有一个thread对象
            final Thread t = w.thread;
            if (t != null) {
                // 获取了线程的全局锁
                final ReentrantLock mainLock = this.mainLock;
                // 上锁,shutdown或者shutdownNow的时候,也需要获取锁资源
                mainLock.lock();
                try {
                    // 获取当前线程的状态
                    int rs = runStateOf(ctl.get());
                    // 线程池状态小于SHUTDOWN,也就是判断线程池是否是RUNMING
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {    // 或者线程池尾SHUTDOWN,任务必须是null
                        // 如果线程已经工作了,直接抛出异常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 将工作线程追加到HashSet中存储
                        workers.add(w);
                        // 获取了HashSet的长度
                        int s = workers.size();
                        // 如果现在的工作线程个数大雨历史最大值,替换掉 largestPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 将添加工作线程设置为true
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 添加工作线程成功
                if (workerAdded) {
                    // 启动任务
                    t.start();
                    // 设置启动工作线程位true
                    workerStarted = true;
                }
            }
        } finally {
            // 如果工作线程启动失败
            if (! workerStarted)
                // 补救操作
                addWorkerFailed(w);
        }
        // 返回工作线程是否启动
        return workerStarted;
    }

查看Worker对象

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable     // 当前worker也就是一个任务
    {
        private static final long serialVersionUID = 6138294804551838833L;

        final Thread thread;

        Runnable firstTask;

        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        // 当调用worker内部线程的start方法时,实际是在调用Worker类中的run方法,实际执行的是runWorker的方法
        public void run() {
            runWorker(this);
        }

    }

runWorker方法

final void runWorker(Worker w) {
    // 获取当前线程
    Thread wt = Thread.currentThread();
    // 获取Worker中的具体任务
    Runnable task = w.firstTask;
    // 将worker中的task置为null
    w.firstTask = null;
    // 将有参构造中的标记撤销,代表当前线程可以被打断
    w.unlock();
    // 标记,值为true
    boolean completedAbruptly = true;
    try {
        // 如果worker中task有任务,直接执行当前任务,否则去队列中获取任务
        while (task != null || (task = getTask()) != null) {
            // 当前要执行,添加标记,shutdown也不会打断
            w.lock();
            // 判断当前线程池状态,以及线程状态,判断是否需要被打断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // 执行任务的开始
            try {
                // 任务的前置增强
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务的后置增强
                    afterExecute(task, thrown);
                }
            } finally {
                // 将task置为null,任务处理完成
                task = null;
                // 标记当前worker处理的任务数+1
                w.completedTasks++;
                // 去掉标记
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

0

评论区