本文共 5789 字,大约阅读时间需要 19 分钟。
在Java中,线程池是处理并发任务的核心工具之一。ThreadPoolExecutor类中的runWorker()方法是线程池执行任务的关键部分,它负责从任务队列中获取任务并执行。以下将详细解析runWorker()方法的工作流程。
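下面是一个简要的worker执行流程图(原图未能保留,此处以文字概括):取出 firstTask,若为空则调用 getTask() 获取任务 → w.lock() → 检查并修正线程的中断状态 → beforeExecute() → task.run() → afterExecute() → completedTasks++ 并 w.unlock() → 回到循环开头;当 getTask() 返回 null 或任务抛出异常时跳出循环,最终在 finally 中调用 processWorkerExit()。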
runWorker()方法的主要逻辑如下:
/**
 * Main run loop of a worker: repeatedly obtains a task and runs it on the calling thread.
 *
 * <p>The worker's {@code firstTask} is run first (if non-null); afterwards tasks come from
 * {@code getTask()}. The loop ends when {@code getTask()} returns {@code null} or when a
 * hook or task throws. In both cases {@code processWorkerExit} is invoked from the outer
 * {@code finally}; {@code completedAbruptly} is still {@code true} only when the loop was
 * left by an exception.
 *
 * @param w the worker whose tasks are executed by the current thread
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // released once before the loop; each iteration re-locks around the task
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If the pool is at least STOP, make sure this thread is interrupted; otherwise
            // make sure it is not. Thread.interrupted() clears the flag, so the run state is
            // re-read afterwards to catch a concurrent transition to STOP.
            // The final test must be the non-clearing isInterrupted(): calling the static
            // Thread.interrupted() here would clear a pending interrupt and then skip
            // wt.interrupt(), silently losing the interrupt while the pool is stopping.
            if ((runStateAtLeast(ctl.get(), STOP)
                    || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                    && !wt.isInterrupted()) {
                wt.interrupt();
            }
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    // Runnable.run() cannot declare checked exceptions, so wrap before rethrowing.
                    thrown = x;
                    throw new Error(x);
                } finally {
                    // Always runs, receiving whatever the task threw (or null).
                    afterExecute(task, thrown);
                }
            } finally {
                // Clear the task so the next iteration fetches a new one via getTask().
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // Reached only when getTask() returned null, i.e. a normal exit from the loop.
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;w.firstTask = null;w.unlock();
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.interrupted()) { wt.interrupt();} try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); }} finally { task = null; w.completedTasks++; w.unlock();} 如果任务执行过程中发生异常,processWorkerExit(w, completedAbruptly)会被调用。
processWorkerExit(w, completedAbruptly);
finally { processWorkerExit(w, completedAbruptly);} getTask()方法用于从任务队列中获取任务:
/**
 * Fetches the next task from {@code workQueue}, waiting if necessary.
 *
 * <p>Returns {@code null} on the two exit paths below; in both, the worker count has been
 * decremented before returning. {@code runWorker} leaves its loop when this method
 * returns {@code null}.
 *
 * @return the next task to run, or {@code null} if the calling worker should stop
 */
private Runnable getTask() {
    // True when the most recent poll() returned null without yielding a task.
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Exit path 1: run state is at least SHUTDOWN and either at least STOP or the
        // queue is empty.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Whether this worker waits with a timeout (poll) instead of blocking (take).
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Exit path 2: too many workers, or a timed wait already expired, provided that
        // another worker remains or the queue is empty.
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c)) {
                return null;
            }
            // Conditional decrement did not succeed: re-read ctl and re-evaluate.
            continue;
        }
        try {
            // keepAliveTime is passed to poll() in nanoseconds.
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null) {
                return r;
            }
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted while waiting: not a timeout; loop again and re-check the state.
            timedOut = false;
        }
    }
}
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null;} int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) { return null; } continue;} Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
timedOut = true;
catch (InterruptedException retry) { timedOut = false;} processWorkerExit()方法用于处理线程退出逻辑:
/**
 * Performs bookkeeping for a worker that is leaving {@code runWorker}, and starts a
 * replacement worker when one is needed.
 *
 * @param w the exiting worker
 * @param completedAbruptly {@code true} if the worker left its run loop because of an
 *        exception rather than because {@code getTask()} returned {@code null}
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // On a normal exit getTask() already decremented the worker count before returning
    // null; on an abrupt exit that did not happen, so adjust it here.
    if (completedAbruptly) {
        decrementWorkerCount();
    }
    // Fold this worker's statistics into the pool total and drop it from the worker set,
    // both under mainLock.
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    // Only consider a replacement while the run state is below STOP.
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // Minimum number of workers to keep: corePoolSize, or 0 when core threads may
            // time out, but at least 1 while the queue still holds tasks.
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty()) {
                min = 1;
            }
            if (workerCountOf(c) >= min) {
                return;
            }
        }
        // Abrupt exit, or too few workers left: add a worker with no initial task.
        addWorker(null, false);
    }
}
if (completedAbruptly) { decrementWorkerCount();} mainLock.lock();try { completedTaskCount += w.completedTasks; workers.remove(w);} finally { mainLock.unlock();} tryTerminate();
int c = ctl.get();if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) { min = 1; } if (workerCountOf(c) >= min) { return; } } addWorker(null, false);} addWorker(null, false);
通过以上详细分析,可以看出runWorker()方法和其相关的辅助方法共同构成了线程池的核心执行机制。理解这些方法有助于更好地配置和管理线程池,以应对不同的并发任务需求。
转载地址:http://tzhfk.baihongyu.com/