博客
关于我
线程池源码解析 4.runWorker() 方法
阅读量:796 次
发布时间:2023-03-25

本文共 5789 字,大约阅读时间需要 19 分钟。

线程池源码解析 — runWorker()方法

在Java中,线程池是处理并发任务的核心工具之一。ThreadPoolExecutor类中的runWorker()方法是线程池执行任务的关键部分,它负责从任务队列中获取任务并执行。以下将详细解析runWorker()方法的工作流程。

worker总流程图

下面是一个简要的worker执行流程图:

  • 线程启动:线程被创建并启动后,调用runWorker()方法。
  • 获取任务:线程从任务队列中获取任务。
  • 执行任务:执行当前任务。如果任务成功,继续获取下一个任务;如果失败,退出线程池。
  • 异常处理:在执行过程中发生异常时,记录异常并退出线程池。
  • 线程退出:线程退出后,更新线程池状态并释放资源。
  • runWorker()方法详解

    runWorker()方法的主要逻辑如下:

    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
    while (task != null || (task = getTask()) != null) {
    w.lock();
    if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.interrupted()) {
    wt.interrupt();
    }
    try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
    task.run();
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    afterExecute(task, thrown);
    }
    } finally {
    task = null;
    w.completedTasks++;
    w.unlock();
    }
    }
    completedAbruptly = false;
    } finally {
    processWorkerExit(w, completedAbruptly);
    }
    }

    1. 获取当前线程

    Thread wt = Thread.currentThread();

    2. 获取任务

    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    • w.firstTask:当前线程的任务。
    • w.unlock():释放独占锁,允许其他线程获取任务。

    3. 加锁

    w.lock();

    4. 处理中断

    if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.interrupted()) {
    wt.interrupt();
    }
    • runStateAtLeast(ctl.get(), STOP):检查线程池状态是否为停止或更严格的状态。
    • Thread.interrupted():检查当前线程是否被中断。
    • wt.interrupt():强制唤醒当前线程。

    5. 执行任务

    try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
    task.run();
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    afterExecute(task, thrown);
    }
    } finally {
    task = null;
    w.completedTasks++;
    w.unlock();
    }
    • beforeExecute(wt, task):子类可选实现的钩子方法,用于在任务执行前进行处理。
    • task.run():执行当前任务。
    • afterExecute(task, thrown):子类可选实现的钩子方法,用于在任务执行后进行处理。
    • w.completedTasks++:标记当前线程完成一个任务。

    6. 处理异常

    如果任务执行过程中发生异常,processWorkerExit(w, completedAbruptly)会被调用。

    7. 更新线程状态

    processWorkerExit(w, completedAbruptly);

    8. 退出线程池

    finally {
    processWorkerExit(w, completedAbruptly);
    }

    getTask()方法

    getTask()方法用于从任务队列中获取任务:

    private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
    }
    int wc = workerCountOf(c);
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c)) {
    return null;
    }
    continue;
    }
    try {
    Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
    if (r != null) {
    return r;
    }
    timedOut = true;
    } catch (InterruptedException retry) {
    timedOut = false;
    }
    }
    }

    1. 获取当前线程状态

    int rs = runStateOf(c);

    2. 处理关闭状态

    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
    }

    3. 获取线程数

    int wc = workerCountOf(c);

    4. 超时机制

    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    5. 检查回收标准

    if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c)) {
    return null;
    }
    continue;
    }

    6. 获取任务

    Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

    7. 处理超时

    timedOut = true;

    8. 处理中断

    catch (InterruptedException retry) {
    timedOut = false;
    }

    processWorkerExit()方法

    processWorkerExit()方法用于处理线程退出逻辑:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) {
    decrementWorkerCount();
    }
    mainLock.lock();
    try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
    } finally {
    mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    if (min == 0 && !workQueue.isEmpty()) {
    min = 1;
    }
    if (workerCountOf(c) >= min) {
    return;
    }
    }
    addWorker(null, false);
    }
    }

    1. 处理异常

    if (completedAbruptly) {
    decrementWorkerCount();
    }

    2. 加锁

    mainLock.lock();
    try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
    } finally {
    mainLock.unlock();
    }

    3. 尝试终止

    tryTerminate();

    4. 更新线程池状态

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    if (min == 0 && !workQueue.isEmpty()) {
    min = 1;
    }
    if (workerCountOf(c) >= min) {
    return;
    }
    }
    addWorker(null, false);
    }

    5. 添加新线程

    addWorker(null, false);

    通过以上详细分析,可以看出runWorker()方法和其相关的辅助方法共同构成了线程池的核心执行机制。理解这些方法有助于更好地配置和管理线程池,以应对不同的并发任务需求。

    转载地址:http://tzhfk.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现普通矩阵A和B的乘积(附完整源码)
    查看>>
    Objective-C实现更新数字指定偏移量上的值updateBit算法(附完整源码)
    查看>>
    Objective-C实现最大和连续子序列算法(附完整源码)
    查看>>
    Objective-C实现最大类间方差法OTSU算法(附完整源码)
    查看>>
    Objective-C实现最大非相邻和算法(附完整源码)
    查看>>
    Objective-C实现最小二乘多项式曲线拟合(附完整源码)
    查看>>
    Objective-C实现最小值滤波(附完整源码)
    查看>>
    Objective-C实现最小路径和算法(附完整源码)
    查看>>
    Objective-C实现最快的归并排序算法(附完整源码)
    查看>>
    Objective-C实现最近点对问题(附完整源码)
    查看>>
    Objective-C实现最长公共子序列算法(附完整源码)
    查看>>
    Objective-C实现最长回文子串算法(附完整源码)
    查看>>
    Objective-C实现最长回文子序列算法(附完整源码)
    查看>>
    Objective-C实现最长子数组算法(附完整源码)
    查看>>
    Objective-C实现最长字符串链(附完整源码)
    查看>>
    Objective-C实现最长递增子序列算法(附完整源码)
    查看>>
    Objective-C实现有向图和无向加权图算法(附完整源码)
    查看>>
    Objective-C实现有序表查找算法(附完整源码)
    查看>>
    Objective-C实现有限状态机(附完整源码)
    查看>>
    Objective-C实现有限状态自动机FSM(附完整源码)
    查看>>