/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.JavaUtilConcurrentFJPAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Unsafe;
import jdk.internal.vm.SharedThreadContainer;
import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask;

/**
 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
 * A {@code ForkJoinPool} provides the entry point for submissions
 * from non-{@code ForkJoinTask} clients, as well as management and
 * monitoring operations.
 *
 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
 * ExecutorService} mainly by virtue of employing
 * <em>work-stealing</em>: all threads in the pool attempt to find and
 * execute tasks submitted to the pool and/or created by other active
 * tasks (eventually blocking waiting for work if none exist). This
 * enables efficient processing when most tasks spawn other subtasks
 * (as do most {@code ForkJoinTask}s), as well as when many small
 * tasks are submitted to the pool from external clients.  Especially
 * when setting <em>asyncMode</em> to true in constructors, {@code
 * ForkJoinPool}s may also be appropriate for use with event-style
 * tasks that are never joined. All worker threads are initialized
 * with {@link Thread#isDaemon} set {@code true}.
 *
 * <p>A static {@link #commonPool()} is available and appropriate for
 * most applications.
 * The common pool is used by any ForkJoinTask that
 * is not explicitly submitted to a specified pool. Using the common
 * pool normally reduces resource usage (its threads are slowly
 * reclaimed during periods of non-use, and reinstated upon subsequent
 * use).
 *
 * <p>For applications that require separate or custom pools, a {@code
 * ForkJoinPool} may be constructed with a given target parallelism
 * level; by default, equal to the number of available processors.
 * The pool attempts to maintain enough active (or available) threads
 * by dynamically adding, suspending, or resuming internal worker
 * threads, even if some tasks are stalled waiting to join others.
 * However, no such adjustments are guaranteed in the face of blocked
 * I/O or other unmanaged synchronization. The nested {@link
 * ManagedBlocker} interface enables extension of the kinds of
 * synchronization accommodated. The default policies may be
 * overridden using a constructor with parameters corresponding to
 * those documented in class {@link ThreadPoolExecutor}.
 *
 * <p>In addition to execution and lifecycle control methods, this
 * class provides status check methods (for example
 * {@link #getStealCount}) that are intended to aid in developing,
 * tuning, and monitoring fork/join applications. Also, method
 * {@link #toString} returns indications of pool state in a
 * convenient form for informal monitoring.
 *
 * <p>As is the case with other ExecutorServices, there are three
 * main task execution methods summarized in the following table.
 * These are designed to be used primarily by clients not already
 * engaged in fork/join computations in the current pool.  The main
 * forms of these methods accept instances of {@code ForkJoinTask},
 * but overloaded forms also allow mixed execution of plain {@code
 * Runnable}- or {@code Callable}- based activities as well. However,
 * tasks that are already executing in a pool should normally instead
 * use the within-computation forms listed in the table unless using
 * async event-style tasks that are not usually joined, in which case
 * there is little difference among choice of methods.
 *
 * <table class="plain">
 * <caption>Summary of task execution methods</caption>
 *  <tr>
 *    <td></td>
 *    <th scope="col"> Call from non-fork/join clients</th>
 *    <th scope="col"> Call from within fork/join computations</th>
 *  </tr>
 *  <tr>
 *    <th scope="row" style="text-align:left"> Arrange async execution</th>
 *    <td> {@link #execute(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork}</td>
 *  </tr>
 *  <tr>
 *    <th scope="row" style="text-align:left"> Await and obtain result</th>
 *    <td> {@link #invoke(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#invoke}</td>
 *  </tr>
 *  <tr>
 *    <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
 *    <td> {@link #submit(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 *  </tr>
 * </table>
 *
 * <p>Additionally, this class supports {@link
 * ScheduledExecutorService} methods to delay or periodically execute
 * tasks, as well as method {@link #submitWithTimeout} to cancel tasks
 * that take too long. The scheduled functions or actions may create
 * and invoke other {@linkplain ForkJoinTask ForkJoinTasks}.
 * Delayed
 * actions become <em>enabled</em> and behave as ordinary submitted
 * tasks when their delays elapse.  Scheduling methods return
 * {@linkplain ForkJoinTask ForkJoinTasks} that implement the {@link
 * ScheduledFuture} interface. Resource exhaustion encountered after
 * initial submission results in task cancellation. When time-based
 * methods are used, shutdown policies match the default policies of
 * class {@link ScheduledThreadPoolExecutor}: upon {@link #shutdown},
 * existing periodic tasks will not re-execute, and the pool
 * terminates when quiescent and existing delayed tasks
 * complete. Method {@link #cancelDelayedTasksOnShutdown} may be used
 * to disable all delayed tasks upon shutdown, and method {@link
 * #shutdownNow} may be used to instead unconditionally initiate pool
 * termination. Monitoring methods such as {@link #getQueuedTaskCount}
 * do not include scheduled tasks that are not yet enabled to execute,
 * which are reported separately by method {@link
 * #getDelayedTaskCount}.
 *
 * <p>The parameters used to construct the common pool may be controlled by
 * setting the following {@linkplain System#getProperty system properties}:
 * <ul>
 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
 * - the parallelism level, a non-negative integer. Usage is discouraged.
 *   Use {@link #setParallelism} instead.
 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
 * - the class name of a {@link ForkJoinWorkerThreadFactory}.
 *   The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 *   is used to load this class.
 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler}
 * - the class name of a {@link UncaughtExceptionHandler}.
 *   The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 *   is used to load this class.
 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares}
 * - the maximum number of allowed extra threads to maintain target
 *   parallelism (default 256).
 * </ul>
 * If no thread factory is supplied via a system property, then the
 * common pool uses a factory that uses the system class loader as the
 * {@linkplain Thread#getContextClassLoader() thread context class loader}.
 *
 * Upon any error in establishing these settings, default parameters
 * are used. It is possible to disable use of threads by using a
 * factory that may return {@code null}, in which case some tasks may
 * never execute. While possible, it is strongly discouraged to set
 * the parallelism property to zero, which may be internally
 * overridden in the presence of intrinsically async tasks.
 *
 * @implNote This implementation restricts the maximum number of
 * running threads to 32767. Attempts to create pools with greater
 * than the maximum number result in {@code
 * IllegalArgumentException}. Also, this implementation rejects
 * submitted tasks (that is, by throwing {@link
 * RejectedExecutionException}) only when the pool is shut down or
 * internal resources have been exhausted.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinPool extends AbstractExecutorService
    implements ScheduledExecutorService {

    /*
     * Implementation Overview
     *
     * This class and its nested classes provide the main
     * functionality and control for a set of worker threads.
     * Because
     * most internal methods and nested classes are interrelated,
     * their main rationale and descriptions are presented here;
     * individual methods and nested classes contain only brief
     * comments about details. Broadly: submissions from non-FJ
     * threads enter into submission queues.  Workers take these tasks
     * and typically split them into subtasks that may be stolen by
     * other workers. Work-stealing based on randomized scans
     * generally leads to better throughput than "work dealing" in
     * which producers assign tasks to idle threads, in part because
     * threads that have finished other tasks before the signalled
     * thread wakes up (which can be a long time) can take the task
     * instead.  Preference rules give first priority to processing
     * tasks from their own queues (LIFO or FIFO, depending on mode),
     * then to randomized FIFO steals of tasks in other queues.
     *
     * This framework began as a vehicle for supporting structured
     * parallelism using work-stealing, designed to work best when
     * tasks are dag-structured (wrt completion dependencies), nested
     * (generated using recursion or completions), of reasonable
     * granularity, independent (wrt memory and resources) and where
     * callers participate in task execution. These are properties
     * that anyone aiming for efficient parallel multicore execution
     * should design for.  Over time, the scalability advantages of
     * this framework led to extensions to better support more diverse
     * usage contexts, amounting to weakenings or violations of each
     * of these properties. Accommodating them may compromise
     * performance, but mechanics discussed below include tradeoffs
     * attempting to arrange that no single performance issue dominates.
     *
     * Here's a brief history of major revisions, each also with other
     * minor features and changes.
     *
     * 1. Only handle recursively structured computational tasks
     * 2. Async (FIFO) mode and striped submission queues
     * 3. Completion-based tasks (mainly CountedCompleters)
     * 4. CommonPool and parallelStream support
     * 5. InterruptibleTasks for externally submitted tasks
     * 6. Support ScheduledExecutorService methods
     *
     * Most changes involve adaptations of base algorithms using
     * combinations of static and dynamic bitwise mode settings (both
     * here and in ForkJoinTask), and subclassing of ForkJoinTask.
     * There are a fair number of odd code constructions and design
     * decisions for components that reside at the edge of Java vs JVM
     * functionality.
     *
     * WorkQueues
     * ==========
     *
     * Most operations occur within work-stealing queues (in nested
     * class WorkQueue). These are special forms of Deques that
     * support only three of the four possible end-operations -- push,
     * pop, and poll (aka steal), under the further constraints that
     * push and pop are called only from the owning thread (or, as
     * extended here, under a lock), while poll may be called from
     * other threads.  (If you are unfamiliar with them, you probably
     * want to read Herlihy and Shavit's book "The Art of
     * Multiprocessor Programming", chapter 16 describing these in
     * more detail before proceeding.)
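     *
     * (For orientation, here is a minimal sketch of the shape
     * involved -- hypothetical names only, eliding the modes,
     * phases, resizing, and fenced accesses that the real WorkQueue
     * needs:
     *
     *   class SimpleWorkQueue {
     *     ForkJoinTask<?>[] array; // power-of-two capacity
     *     int top;                 // next slot to push; owner-only
     *     volatile int base;       // next slot to steal; any thread
     *   }
     *
     * The owner pushes and pops at top; thieves poll at base.)
     *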
     * The main work-stealing queue
     * design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs poll (steal) from being on the indices
     * ("base" and "top") to the slots themselves. These provide the
     * primary required memory ordering -- see "Correct and Efficient
     * Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and
     * Nardelli, PPoPP 2013
     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
     * analysis of memory ordering requirements in work-stealing
     * algorithms similar to the one used here. We use per-operation
     * ordered writes of various kinds for accesses when required.
     *
     * We also support a user mode in which local task processing is
     * in FIFO, not LIFO order, simply by using a local version of
     * poll rather than pop. This can be useful in message-passing
     * frameworks in which tasks are never joined, although with
     * increased contention among task producers and consumers. Also,
     * the same data structure (and class) is used for "submission
     * queues" (described below) holding externally submitted tasks,
     * that differ only in that a lock (using field "phase"; see
     * below) is required by external callers to push and pop tasks.
     *
     * Adding tasks then takes the form of a classic array push(task)
     * in a circular buffer:
     *    q.array[q.top++ % length] = task;
     *
     * The actual code needs to null-check and size-check the array,
     * uses masking, not mod, for indexing a power-of-two-sized array,
     * enforces memory ordering, supports resizing, and possibly
     * signals waiting workers to start scanning (described below),
     * which requires stronger forms of ordered accesses.
     *
     * The pop operation (always performed by owner) is of the form:
     *   if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
     *        decrement top and return task;
     * If this fails, the queue is empty. This operation is one part
     * of the nextLocalTask method, which instead does a local-poll
     * in FIFO mode.
     *
     * The poll operation is, basically:
     *   if (CAS nonnull task t = q.array[k = q.base % length] to null)
     *        increment base and return task;
     *
     * However, there are several more cases that must be dealt with.
     * Some of them are just due to asynchrony; others reflect
     * contention and stealing policies. Stepping through them
     * illustrates some of the implementation decisions in this class.
     *
     *  * Slot k must be read with an acquiring read, which is needed
     *    anyway to dereference and run the task if the (acquiring)
     *    CAS succeeds.
     *
     *  * q.base may change between reading and using its value to
     *    index the slot. To avoid trying to use the wrong t, the
     *    index and slot must be reread (not necessarily immediately)
     *    until consistent, unless this is a local poll by owner, in
     *    which case this form of inconsistency can only appear as t
     *    being null, below.
     *
     *  * Similarly, q.array may change (due to a resize), unless this
     *    is a local poll by owner. Otherwise, when t is present, this
     *    only needs consideration on CAS failure (since a CAS
     *    confirms the non-resized case.)
     *
     *  * t may appear null because a previous poll operation has not
     *    yet incremented q.base, so the read is from an already-taken
     *    index. This form of stall reflects the non-lock-freedom of
     *    the poll operation. Stalls can be detected by observing that
     *    q.base doesn't change on repeated reads of null t; when no
     *    other alternatives apply, we spin-wait for it to settle.  To
     *    reduce producing these kinds of stalls by other stealers, we
     *    encourage timely writes to indices using otherwise
     *    unnecessarily strong writes.
     *
     *  * The CAS may fail, in which case we may want to retry unless
     *    there is too much contention. One goal is to balance and
     *    spread out the many forms of contention that may be
     *    encountered across polling and other operations to avoid
     *    sustained performance degradations. Across all cases where
     *    alternatives exist, a bounded number of CAS misses or stalls
     *    are tolerated (for slots, ctl, and elsewhere described
     *    below) before taking alternative action. These may move
     *    contention or retries elsewhere, which is still preferable
     *    to single-point bottlenecks.
     *
     *  * Even though the check "top == base" is quiescently accurate
     *    to determine whether a queue is empty, it is not of much use
     *    when deciding whether to try to poll or repoll after a
     *    failure. Both top and base may move independently, and both
     *    lag updates to the underlying array. To reduce memory
     *    contention, non-owners avoid reading the "top" when
     *    possible, by using one-ahead reads to check whether to
     *    repoll, relying on the fact that a non-empty queue does not
     *    have two null slots in a row, except in cases (resizes and
     *    shifts) that can be detected with a secondary recheck that
     *    is less likely to conflict with owner writes.
     *
     * The poll operations in q.poll(), runWorker(), helpJoin(), and
     * elsewhere differ with respect to whether other queues are
     * available to try, and the presence or nature of screening steps
     * when only some kinds of tasks can be taken. When trying
     * alternatives (or failing) is an option, they uniformly give up
     * after bounded numbers of stalls and/or CAS failures, which
     * reduces contention when too many workers are polling too few
     * tasks. In the aggregate, we ensure probabilistic
     * non-blockingness of work-stealing at least until checking
     * quiescence (which is intrinsically blocking): If an attempted
     * steal fails in these ways, a scanning thief chooses a different
     * target to try next. In contexts where alternatives aren't
     * available, and when progress conditions can be isolated to
     * values of a single variable, simple spinloops (using
     * Thread.onSpinWait) are used to reduce memory traffic.
     *
     * WorkQueues are also used in a similar way for tasks submitted
     * to the pool. We cannot mix these tasks in the same queues used
     * by workers. Instead, we randomly associate submission queues
     * with submitting threads (or carriers when using VirtualThreads)
     * using a form of hashing.
     * The ThreadLocalRandom probe value
     * serves as a hash code for choosing existing queues, and may be
     * randomly repositioned upon contention with other submitters.
     * In essence, submitters act like workers except that they are
     * restricted to executing local tasks that they submitted (or
     * when known, subtasks thereof). Insertion of tasks in shared
     * mode requires a lock. We use only a simple spinlock (as one
     * role of field "phase") because submitters encountering a busy
     * queue move to a different position to use or create other
     * queues. They (spin) block when registering new queues, or
     * indirectly elsewhere, by revisiting later.
     *
     * Management
     * ==========
     *
     * The main throughput advantages of work-stealing stem from
     * decentralized control -- workers mostly take tasks from
     * themselves or each other, at rates that can exceed a billion
     * per second. Most non-atomic control is performed by some form
     * of scanning across or within queues. The pool itself creates,
     * activates (enables scanning for and running tasks),
     * deactivates, blocks, and terminates threads, all with minimal
     * central information. There are only a few properties that we
     * can globally track or maintain, so we pack them into a small
     * number of variables, often maintaining atomicity without
     * blocking or locking. Nearly all essentially atomic control
     * state is held in a few variables that are by far most often
     * read (not written) as status and consistency checks. We pack as
     * much information into them as we can.
     *
     * Field "ctl" contains 64 bits holding information needed to
     * atomically decide to add, enqueue (on an event queue), and
     * dequeue and release workers. To enable this packing, we
     * restrict maximum parallelism to (1<<15)-1 (which is far in
     * excess of normal operating range) to allow ids, counts, and
     * their negations (used for thresholding) to fit into 16bit
     * subfields.
     *
     * Field "runState" and per-WorkQueue field "phase" play similar
     * roles, as lockable, versioned counters. Field runState also
     * includes monotonic event bits:
     * * SHUTDOWN: no more external tasks accepted; STOP when quiescent
     * * STOP: no more tasks run, and deregister all workers
     * * CLEANED: all unexecuted tasks have been cancelled
     * * TERMINATED: all workers deregistered and all queues cleaned
     * The version tags enable detection of state changes (by
     * comparing two reads) modulo bit wraparound. The bit range in
     * each case suffices for purposes of determining quiescence,
     * termination, avoiding ABA-like errors, and signal control, most
     * of which are ultimately based on at most 15bit ranges (due to
     * 32767 max total workers). RunState updates do not need to be
     * atomic with respect to ctl updates, but because they are not,
     * some care is required to avoid stalls. The seqLock properties
     * detect changes and conditionally upgrade to coordinate with
     * updates. It is typically held for less than a dozen
     * instructions unless the queue array is being resized, during
     * which contention is rare. To be conservative, lockRunState is
     * implemented as a spin/sleep loop. Here and elsewhere spin
     * constants are short enough to apply even on systems with few
     * available processors. In addition to checking pool status,
     * reads of runState sometimes serve as acquire fences before
     * reading other fields.
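     *
     * (To make the ctl packing concrete -- an illustrative sketch
     * using the RC_SHIFT/TC_SHIFT constants defined below, not the
     * actual update protocol: four 16bit subfields pack into, and
     * are signedly extracted from, one 64bit word as in
     *
     *   static long pack(int rc, int tc, int ss, int id) {
     *     return ((rc & 0xffffL) << RC_SHIFT) |
     *            ((tc & 0xffffL) << TC_SHIFT) |
     *            ((ss & 0xffffL) << 16) | (id & 0xffffL);
     *   }
     *   static int rc(long ctl) { return (short)(ctl >>> RC_SHIFT); }
     *   static int tc(long ctl) { return (short)(ctl >>> TC_SHIFT); }
     *   static int sp(long ctl) { return (int)ctl; } // waiter stack top
     *
     * where the casts to short and int preserve sign, tolerating the
     * transiently negative counts described below.)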
     *
     * Field "parallelism" holds the target parallelism (normally
     * corresponding to pool size). Users can dynamically reset target
     * parallelism, but it is only accessed when signalling or
     * awaiting work, so resets take effect only slowly, by creating
     * threads or letting them time out and terminate when idle.
     *
     * Array "queues" holds references to WorkQueues.  It is updated
     * (only during worker creation and termination) under the
     * runState lock. It is otherwise concurrently readable but reads
     * for use in scans (see below) are always prefaced by a volatile
     * read of runState (or equivalent constructions), ensuring that
     * its state is current at the point it is used (which is all we
     * require). To simplify index-based operations, the array size is
     * always a power of two, and all readers must tolerate null
     * slots.  Worker queues are at odd indices. Worker phase ids
     * masked with SMASK match their index. Shared (submission) queues
     * are at even indices. Grouping them together in this way aids in
     * task scanning: At top-level, both kinds of queues should be
     * sampled with approximately the same probability, which is
     * simpler if they are all in the same array. But we also need to
     * identify what kind they are without looking at them, leading to
     * this odd/even scheme. One disadvantage is that there are
     * usually many fewer submission queues, so there can be many
     * wasted probes (null slots). But this is still cheaper than
     * alternatives. Other loops over the queues array vary in origin
     * and stride depending on whether they cover only submission
     * (even) or worker (odd) queues or both, and whether they require
     * randomness (in which case cyclically exhaustive strides may be
     * used).
     *
     * All worker thread creation is on-demand, triggered by task
     * submissions, replacement of terminated workers, and/or
     * compensation for blocked workers. However, all other support
     * code is set up to work with other policies.  To ensure that we
     * do not hold on to worker or task references that would prevent
     * GC, all accesses to workQueues in waiting, signalling, and
     * control methods are via indices into the queues array (which is
     * one source of some of the messy code constructions here). In
     * essence, the queues array serves as a weak reference
     * mechanism. In particular, the stack top subfield of ctl stores
     * indices, not references. Operations on queues obtained from
     * these indices remain valid (with at most some unnecessary extra
     * work) even if an underlying worker failed and was replaced by
     * another at the same index. During termination, worker queue
     * array updates are disabled.
     *
     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
     * cannot let workers spin indefinitely scanning for tasks when
     * none can be found immediately, and we cannot start/resume
     * workers unless there appear to be tasks available. On the
     * other hand, we must quickly prod them into action when new
     * tasks are submitted or generated. These latencies are mainly a
     * function of JVM park/unpark (and underlying OS) performance,
     * which can be slow and variable (even though usages are
     * streamlined as much as possible). In many usages, ramp-up time
     * is the main limiting factor in overall performance, which is
     * compounded at program start-up by JIT compilation and
     * allocation.
     * On the other hand, throughput degrades when too
     * many threads poll for too few tasks. (See below.)
     *
     * The "ctl" field atomically maintains total and "released"
     * worker counts, plus the head of the available worker queue
     * (actually stack, represented by the lower 32bit subfield of
     * ctl). Released workers are those known to be scanning for
     * and/or running tasks (we cannot accurately determine
     * which). Unreleased ("available") workers are recorded in the
     * ctl stack. These workers are made eligible for signalling by
     * enqueuing in ctl (see method deactivate). This "queue" is a
     * form of Treiber stack. This is ideal for activating threads in
     * most-recently used order, and improves performance and
     * locality, outweighing the disadvantages of being prone to
     * contention and inability to release a worker unless it is
     * topmost on stack. The top stack state holds the value of the
     * "phase" field of the worker: its index and status, plus a
     * version counter that, in addition to the count subfields (also
     * serving as version stamps), provides protection against Treiber
     * stack ABA effects.
     *
     * Creating workers. To create a worker, we pre-increment counts
     * (serving as a reservation), and attempt to construct a
     * ForkJoinWorkerThread via its factory. On starting, the new
     * thread first invokes registerWorker, where it is assigned an
     * index in the queues array (expanding the array if necessary).
     * Upon any exception across these steps, or null return from the
     * factory, deregisterWorker adjusts counts and records
     * accordingly. On a null return, the pool continues running with
     * fewer than the target number of workers. If exceptional, the
     * exception is propagated, generally to some external caller.
     *
     * WorkQueue field "phase" encodes the queue array id in lower
     * bits, and otherwise acts similarly to the pool runState field:
     * The "IDLE" bit is clear while active (either a released worker
     * or a locked external queue), with other bits serving as a
     * version counter to distinguish changes across multiple reads.
     * Note that phase field updates lag queue CAS releases; seeing a
     * non-idle phase does not guarantee that the worker is available
     * (and so is never checked in this way).
     *
     * The ctl field also serves as the basis for memory
     * synchronization surrounding activation. This uses a more
     * efficient version of a Dekker-like rule that task producers and
     * consumers sync with each other by both writing/CASing ctl (even
     * if to its current value). However, rather than CASing ctl to
     * its current value in the common case where no action is
     * required, we reduce write contention by ensuring that
     * signalWork invocations are prefaced with a fully fenced memory
     * access (which is usually needed anyway).
     *
     * Signalling. Signals (in signalWork) cause new or reactivated
     * workers to scan for tasks. Method signalWork and its callers
     * try to approximate the unattainable goal of having the right
     * number of workers activated for the tasks at hand, but must err
     * on the side of too many workers vs too few to avoid stalls:
     *
     *  * If computations are purely tree structured, it suffices for
     *    every worker to activate another when it pushes a task into
     *    an empty queue, resulting in O(log(#threads)) steps to full
     *    activation.
     *    Emptiness must be conservatively approximated,
     *    which may result in unnecessary signals.  Also, to reduce
     *    resource usage in some cases, at the expense of slower
     *    startup in others, activation of an idle thread is preferred
     *    over creating a new one, here and elsewhere.
     *
     *  * At the other extreme, if "flat" tasks (those that do not in
     *    turn generate others) come in serially from only a single
     *    producer, each worker taking a task from a queue should
     *    propagate a signal if there are more tasks in that
     *    queue. This is equivalent to, but generally faster than,
     *    arranging for the stealer to take multiple tasks, re-pushing
     *    one or more on its own queue, and signalling (because its
     *    queue is empty), also resulting in logarithmic full
     *    activation time. If tasks do not engage in unbounded loops
     *    based on the actions of other workers with unknown
     *    dependencies, this form of propagation can be limited to one
     *    signal per activation (phase change). We distinguish the
     *    cases by further signalling only if the task is an
     *    InterruptibleTask (see below), the only supported form of
     *    task that may do so.
     *
     *  * Because we don't know about usage patterns (or most
     *    commonly, mixtures), we use both approaches, which present
     *    even more opportunities to over-signal.  (Failure to
     *    distinguish these cases in terms of submission methods was
     *    arguably an early design mistake.)  Note that in either of
     *    these contexts, signals may be (and often are) unnecessary
     *    because active workers continue scanning after running tasks
     *    without the need to be signalled (which is one reason work
     *    stealing is often faster than alternatives), so additional
     *    workers aren't needed.
     *
     *  * For rapidly branching tasks that require full pool
     *    resources, oversignalling is OK, because signalWork will
     *    soon have no more workers to create or reactivate. But for
     *    others (mainly externally submitted tasks), overprovisioning
     *    may cause very noticeable slowdowns due to contention and
     *    resource wastage. We reduce impact by deactivating workers
     *    when queues don't have accessible tasks, but reactivating
     *    and rescanning if other tasks remain.
     *
     *  * Despite these, signal contention and overhead effects still
     *    occur during ramp-up and ramp-down of small computations.
     *
     * Scanning. Method runWorker performs top-level scanning for (and
     * execution of) tasks by polling a pseudo-random permutation of
     * the array (by starting at a given index, and using a constant
     * cyclically exhaustive stride.) It uses the same basic polling
     * method as WorkQueue.poll(), but restarts with a different
     * permutation on each invocation. The pseudorandom generator
     * need not have high-quality statistical properties in the long
     * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
     * from ThreadLocalRandom probes, which are cheap and
     * suffice. Each queue's polling attempts to avoid becoming stuck
     * when other scanners/pollers stall. Scans do not otherwise
     * explicitly take into account core affinities, loads, cache
     * localities, etc. However, they do exploit temporal locality
     * (which usually approximates these) by preferring to re-poll
     * from the same queue after a successful poll before trying
     * others, which also reduces bookkeeping, cache traffic, and
     * scanning overhead.
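     *
     * (An illustrative sketch of such a scan -- not the actual
     * runWorker code; it assumes a hypothetical steal(q) helper and
     * elides all screening, signalling, and rescan control:
     *
     *   int r = ThreadLocalRandom.nextSecondarySeed();
     *   r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // Marsaglia xorshift
     *   int n = queues.length;      // power of two
     *   int step = (r >>> 16) | 1;  // odd, so cyclically exhaustive
     *   for (int i = 0, j = r; i < n; ++i, j += step) {
     *     ForkJoinTask<?> t;
     *     while ((t = steal(queues[j & (n - 1)])) != null) {
     *       t.doExec();             // re-poll same queue after success
     *       i = 0;                  // restart emptiness count
     *     }
     *   }
     *
     * where the inner loop expresses the same-queue re-poll
     * preference just described.)
     *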
     * But it also reduces fairness, which is
     * partially counteracted by giving up on detected interference
     * (which also reduces contention when too many workers try to
     * take small tasks from the same queue).
     *
     * Deactivation. When no tasks are found by a worker in runWorker,
     * it tries to deactivate(), giving up (and rescanning) on "ctl"
     * contention. To avoid missed signals, the method rescans and
     * reactivates if there may have been a missed signal while
     * deactivating. To reduce false-alarm reactivations while doing
     * so, we scan multiple times (analogously to method quiescent())
     * before trying to reactivate. Because idle workers are often not
     * yet blocked (parked), we use a WorkQueue field to advertise
     * that a waiter actually needs unparking upon signal.
     *
     * Quiescence. Workers scan looking for work, giving up when they
     * don't find any, without being sure that none are available.
     * However, some required functionality relies on consensus about
     * quiescence (also termination, discussed below). The count
     * fields in ctl allow accurate discovery of states in which all
     * workers are idle. However, because external (asynchronous)
     * submitters are not part of this vote, these mechanisms
     * themselves do not guarantee that the pool is in a quiescent
     * state with respect to methods isQuiescent, shutdown (which
     * begins termination when quiescent), helpQuiesce, and indirectly
     * others including tryCompensate. Method quiescent() is used in
     * all of these contexts. It provides checks that all workers are
     * idle and there are no submissions that they could poll if they
     * were not idle, retrying on inconsistent reads of queues and
     * using the runState seqLock to retry on queue array updates.
     * (It also reports quiescence if the pool is terminating.) A true
     * report means only that there was a moment at which quiescence
     * held. False negatives are inevitable (for example when queue
     * indices lag updates, as described above), which is accommodated
     * when (tentatively) idle by scanning for work etc, and then
     * re-invoking. This includes cases in which the final unparked
     * thread (in deactivate()) uses quiescent() to check for tasks
     * that could have been added during a race window that would not
     * be accompanied by a signal, in which case it re-activates
     * itself (or some other worker) to rescan. Method helpQuiesce
     * acts similarly but cannot rely on ctl counts to determine that
     * all workers are inactive because the caller and any others
     * executing helpQuiesce are not included in counts.
     *
     * Termination. Termination is initiated by setting STOP in one of
     * three ways (via methods tryTerminate and quiescent):
     * * A call to shutdownNow, in which case all workers are
     *   interrupted, first ensuring that the queues array is stable,
     *   to avoid missing any workers.
     * * A call to shutdown when quiescent, in which case method
     *   releaseWaiters is used to dequeue them, at which point they
     *   notice STOP state and return from runWorker to deregister().
     * * The pool becomes quiescent() sometime after shutdown has
     *   been called, in which case releaseWaiters is also used to
     *   propagate as they deregister.
     * Upon STOP, each worker, as well as external callers to
     * tryTerminate (via close() etc), races to set CLEANED, indicating
     * that all tasks have been cancelled.
     * The implementation (method
     * cleanQueues) balances cases in which there may be many tasks to
     * cancel (benefitting from parallelism) versus contention and
     * interference when many threads try to poll remaining queues,
     * while also avoiding unnecessary rechecks, by using
     * pseudorandom scans and giving up upon interference. This may be
     * retried by the same caller only when there are no more
     * registered workers, using the same criteria as method
     * quiescent. When CLEANED and all workers have deregistered,
     * TERMINATED is set, also signalling any caller of
     * awaitTermination or close. Because shutdownNow-based
     * termination relies on interrupts, there is no guarantee that
     * workers will stop if their tasks ignore interrupts. Class
     * InterruptibleTask (see below) further arranges runState checks
     * before executing task bodies, and ensures interrupts while
     * terminating. Even so, there are no guarantees because tasks may
     * internally enter unbounded loops.
     *
     * Trimming workers. To release resources after periods of lack of
     * use, a worker starting to wait when the pool is quiescent will
     * time out and terminate if the pool has remained quiescent for a
     * period given by field keepAlive (default 60sec), which applies
     * to the first timeout of a quiescent pool. Subsequent cases use
     * minimal delays such that, if still quiescent, all will be
     * released soon thereafter. This is checked by setting the
     * "source" field of the signallee to an invalid value, which will
     * remain in place only if it did not process any tasks.
     *
     * Joining Tasks
     * =============
     *
     * The "Join" part of ForkJoinPools consists of a set of
     * mechanisms that sometimes or always (depending on the kind of
     * task) avoid context switching or adding worker threads when one
     * task would otherwise be blocked waiting for completion of
     * another, basically, just by running that task or one of its
     * subtasks if not already taken. These mechanics are disabled for
     * InterruptibleTasks, which guarantee that callers do not execute
     * submitted tasks.
     *
     * The basic structure of joining is an extended spin/block scheme
     * in which workers check for task completion status between
     * steps to find other work, until relevant pool state stabilizes
     * enough to believe that no such tasks are available, at which
     * point they block. This is usually a good choice of when to
     * block, one that would otherwise be harder to approximate.
     *
     * These forms of helping may increase stack space usage, but that
     * space is bounded in tree/dag structured procedurally parallel
     * designs to be no more than it would be if a task were executed
     * only by the joining thread. This is arranged by associated task
     * subclasses that also help detect and control the ways in which
     * this may occur.
     *
     * Normally, the first option when joining a task that is not done
     * is to try to take it from the local queue and run it. Method
     * tryRemoveAndExec tries to do so.  For tasks with any form of
     * subtasks that must be completed first, we try to locate these
     * subtasks and run them as well. This is easy when local, but
     * when stolen, steal-backs are restricted to the same rules as
     * stealing (polling), which requires additional bookkeeping and
     * scanning. This cost is still very much worthwhile because of
     * its impact on task scheduling and resource control.
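     *
     * For a user-level view of why helping matters, consider the
     * canonical pattern (public API only; an illustrative sketch
     * with an arbitrary threshold):
     *
     *   class Sum extends RecursiveTask<Long> {
     *     final long[] a; final int lo, hi;
     *     Sum(long[] a, int lo, int hi) {
     *       this.a = a; this.lo = lo; this.hi = hi;
     *     }
     *     protected Long compute() {
     *       if (hi - lo < 1024) {
     *         long s = 0;
     *         for (int i = lo; i < hi; ++i) s += a[i];
     *         return s;
     *       }
     *       int mid = (lo + hi) >>> 1;
     *       Sum right = new Sum(a, mid, hi);
     *       right.fork();                    // may be stolen
     *       long l = new Sum(a, lo, mid).compute();
     *       return l + right.join();         // helps rather than blocks
     *     }
     *   }
     *
     * If "right" was stolen, the join tries to run the thief's tasks
     * (or the stolen task's subtasks) instead of parking; the
     * mechanics below implement that search.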
     *
     * The two methods for finding and executing subtasks vary in
     * details.  The algorithm in helpJoin entails a form of "linear
     * helping".  Each worker records (in field "source") the index of
     * the internal queue from which it last stole a task. (Note:
     * because chains cannot include even-numbered external queues,
     * they are ignored, and 0 is an OK default. However, the source
     * field is set anyway, or eventually to DROPPED, to ensure
     * volatile memory synchronization effects.) The scan in method
     * helpJoin uses these markers to try to find a worker to help
     * (i.e., steal back a task from it and execute it) that could
     * make progress toward completion of the actively joined task.
     * Thus, the joiner executes a task that would be on its own local
     * deque if the to-be-joined task had not been stolen. This is a
     * conservative variant of the approach described in Wagner &
     * Calder "Leapfrogging: a portable technique for implementing
     * efficient futures" SIGPLAN Notices, 1993
     * (http://portal.acm.org/citation.cfm?id=155354). It differs
     * mainly in that we only record queues, not full dependency
     * links.  This requires a linear scan of the queues to locate
     * stealers, but isolates cost to when it is needed, rather than
     * adding to per-task overhead. For CountedCompleters, the
     * analogous method helpComplete doesn't need stealer-tracking,
     * but requires a similar (but simpler) check of completion
     * chains.
     *
     * In either case, searches can fail to locate stealers when
     * stalls delay recording sources or issuing subtasks. We avoid
     * some of these cases by using snapshotted values of ctl as a
     * check that the numbers of workers are not changing, along with
     * rescans to deal with contention and stalls.  But even when
     * accurately identified, stealers might not ever produce a task
     * that the joiner can in turn help with.
     *
     * Related method helpAsyncBlocker does not directly rely on
     * subtask structure, but instead avoids or postpones blocking of
     * tagged tasks (CompletableFuture.AsynchronousCompletionTask) by
     * executing other asyncs that can be processed in any order.
     * This is currently invoked only in non-join-based blocking
     * contexts from classes CompletableFuture and
     * SubmissionPublisher, which could be further generalized.
     *
     * When any of the above fail to avoid blocking, we rely on
     * "compensation" -- an indirect form of context switching that
     * either activates an existing worker to take the place of the
     * blocked one, or expands the number of workers.
     *
     * Compensation does not by default aim to keep exactly the target
     * parallelism number of unblocked threads running at any given
     * time. Some previous versions of this class employed immediate
     * compensations for any blocked join. However, in practice, the
     * vast majority of blockages are transient byproducts of GC and
     * other JVM or OS activities that are made worse by replacement
     * because it causes longer-term oversubscription. These are
     * inevitable without (unobtainably) perfect information about
     * whether worker creation is actually necessary. False alarms
     * are common enough to negatively impact performance, so
     * compensation is by default attempted only when it appears
     * possible that the pool could stall due to lack of any unblocked
     * workers.
     * However, we allow
     * users to override defaults using the long form of the
     * ForkJoinPool constructor. The compensation mechanism may also
     * be bounded. Bounds for the commonPool better enable JVMs to
     * cope with programming errors and abuse before running out of
     * resources to do so.
     *
     * The ManagedBlocker extension API can't use helping so relies
     * only on compensation in method awaitBlocker. This API was
     * designed to highlight the uncertainty of compensation decisions
     * by requiring implementation of method isReleasable to abort
     * compensation during attempts to obtain a stable snapshot. But
     * users now rely upon the fact that if isReleasable always
     * returns false, the API can be used to obtain precautionary
     * compensation, which is sometimes the only reasonable option
     * when running unknown code in tasks; this is now supported more
     * simply (see method beginCompensatedBlock).
     *
     * Common Pool
     * ===========
     *
     * The static common pool always exists after static
     * initialization.  Since it (or any other created pool) need
     * never be used, we minimize initial construction overhead and
     * footprint to the setup of about a dozen fields, although some
     * properties are set by System property parsing. The common pool
     * is distinguished by having a null workerNamePrefix (which is an
     * odd convention, but avoids the need to decode status in factory
     * classes). It also has PRESET_SIZE config set if parallelism
     * was configured by system property.
     *
     * When external threads use the common pool, they can perform
     * subtask processing (see helpComplete and related methods) upon
     * joins, unless they are submitted using ExecutorService
     * submission methods, which implicitly disallow this. This
     * caller-helps policy makes it sensible to set common pool
     * parallelism level to one (or more) less than the total number
     * of available cores, or even zero for pure caller-runs. External
     * threads waiting for joins first check the common pool for their
     * task, which fails quickly if the caller did not fork to the
     * common pool.
     *
     * Guarantees for common pool parallelism zero are limited to
     * tasks that are joined by their callers in a tree-structured
     * fashion or use CountedCompleters (as is true for jdk
     * parallelStreams). Support infiltrates several methods,
     * including those that retry helping steps until we are sure that
     * none apply if there are no workers. To deal with conflicting
     * requirements, uses of the commonPool that require async
     * operation because caller-runs does not apply ensure that
     * threads are enabled (by setting parallelism) via method
     * asyncCommonPool before proceeding. (In principle, overriding
     * zero parallelism needs to ensure at least one worker, but due
     * to other backward compatibility constraints, ensures two.)
     *
     * As a more appropriate default in managed environments, unless
     * overridden by system properties, we use workers of subclass
     * InnocuousForkJoinWorkerThread for the commonPool. These
     * workers do not belong to any user-defined ThreadGroup, and
     * clear all ThreadLocals and reset the ContextClassLoader before
     * (re)activating to execute top-level tasks. The associated
     * mechanics may be JVM-dependent and must access particular
     * Thread class fields to achieve this effect.
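     *
     * (To illustrate the precautionary-compensation idiom mentioned
     * above -- a sketch using only the public ManagedBlocker API;
     * the wrapped action is arbitrary:
     *
     *   static void runBlocking(Runnable action)
     *       throws InterruptedException {
     *     ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
     *       boolean done;
     *       public boolean block() { action.run(); return done = true; }
     *       public boolean isReleasable() { return done; }
     *     });
     *   }
     *
     * Because isReleasable stays false until block() completes, the
     * pool may activate or create a spare worker before the caller
     * blocks, preserving parallelism.)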
     *
     * InterruptibleTasks
     * ==================
     *
     * Regular ForkJoinTasks manage task cancellation (method cancel)
     * independently from the interrupt status of threads running
     * tasks.  Interrupts are issued internally only while
     * terminating, to wake up workers and cancel queued tasks. By
     * default, interrupts are cleared only when necessary to ensure
     * that calls to LockSupport.park do not loop indefinitely (park
     * returns immediately if the current thread is interrupted).
     *
     * To comply with ExecutorService specs, we use subclasses of
     * abstract class InterruptibleTask for tasks that require
     * stronger interruption and cancellation guarantees.  External
     * submitters never run these tasks, even if in the common pool
     * (as indicated by the ForkJoinTask.noUserHelp status bit).
     * InterruptibleTasks include a "runner" field (implemented
     * similarly to FutureTask) to support cancel(true).  Upon pool
     * shutdown, runners are interrupted so they can cancel. Since
     * external joining callers never run these tasks, they must await
     * cancellation by others, which can occur along several different
     * paths. The inability to rely on caller-runs may also require
     * extra signalling (resulting in scanning and contention) so is
     * done only conditionally in methods push and runWorker.
     *
     * Across these APIs, rules for reporting exceptions for tasks
     * with results accessed via join() differ from those via get(),
     * which differ from those invoked using pool submit methods by
     * non-workers (which comply with Future.get() specs). Internal
     * usages of ForkJoinTasks ignore interrupt status when executing
     * or awaiting completion.  Otherwise, reporting task results or
     * exceptions is preferred to throwing InterruptedExceptions,
     * which are in turn preferred to timeouts. Similarly, completion
     * status is preferred to reporting cancellation.  Cancellation is
     * reported as an unchecked exception by join(), and by worker
     * calls to get(), but is otherwise wrapped in a (checked)
     * ExecutionException.
     *
     * Worker threads cannot be VirtualThreads, as enforced by
     * requiring ForkJoinWorkerThreads in factories.  There are
     * several constructions relying on this.  However, as of this
     * writing, virtual thread bodies are by default run as some form
     * of InterruptibleTask.
     *
     * DelayScheduler
     * ==============
     *
     * This class supports ScheduledExecutorService methods by
     * creating and starting a DelayScheduler on first use of these
     * methods (via startDelayScheduler). The scheduler operates
     * independently in its own thread, relaying tasks to the pool to
     * execute when their delays elapse (see method
     * executeEnabledScheduledTask). The only other interactions with
     * the delayScheduler are to control shutdown and maintain
     * shutdown-related policies in methods quiescent() and
     * tryTerminate(). In particular, processing must deal with cases
     * in which tasks are submitted before shutdown, but not enabled
     * until afterwards, in which case they must bypass some screening
     * to be allowed to run. Conversely, the DelayScheduler checks
     * runState status and when enabled, completes termination, using
     * only methods shutdownStatus and tryStopIfShutdown. All of these
     * methods are final and have signatures referencing
     * DelaySchedulers, so cannot conflict with those of any existing
     * FJP subclasses.
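     *
     * (From the API side, this machinery is exercised with ordinary
     * ScheduledExecutorService calls -- an illustrative sketch:
     *
     *   ForkJoinPool pool = new ForkJoinPool();
     *   ScheduledFuture<String> f =
     *       pool.schedule(() -> "done", 100, TimeUnit.MILLISECONDS);
     *   String s = f.get(); // runs in the pool once the delay elapses
     *
     * The first such call is what triggers startDelayScheduler.)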
     *
     * Memory placement
     * ================
     *
     * Performance is very sensitive to placement of instances of
     * ForkJoinPool and WorkQueues and their queue arrays, as well as
     * the placement of their fields. Cache misses and contention due
     * to false-sharing have been observed to slow down some programs
     * by more than a factor of four. Effects may vary across initial
     * memory configurations, applications, and different garbage
     * collectors and GC settings, so there is no perfect solution.
     * Too much isolation may generate more cache misses in common
     * cases (because some fields and slots are usually read at the
     * same time). The @Contended annotation provides only rough
     * control (for good reason). Similarly for relying on fields
     * being placed in size-sorted declaration order.
     *
     * We isolate the ForkJoinPool.ctl field that otherwise causes the
     * most false-sharing misses with respect to other fields. Also,
     * ForkJoinPool fields are ordered such that fields less prone to
     * contention effects are first, offsetting those that otherwise
     * would be, while also reducing total footprint vs using
     * multiple @Contended regions, which tends to slow down
     * less-contended applications. To help arrange this, some
     * non-reference fields are declared as "long" even when ints or
     * shorts would suffice.  For class WorkQueue, an
     * embedded @Contended region segregates fields most heavily
     * updated by owners from those most commonly read by stealers or
     * other management.
     *
     * Initial sizing and resizing of WorkQueue arrays is an even more
     * delicate tradeoff because the best strategy systematically
     * varies across garbage collectors. Small arrays are better for
     * locality and reduce GC scan time, but large arrays reduce both
     * direct false-sharing and indirect cases due to GC bookkeeping
     * (cardmarks etc), and reduce the number of resizes, which are
     * not especially fast because they require atomic transfers.
     * Currently, arrays for workers are initialized to be just large
     * enough to avoid resizing in most tree-structured tasks, but
     * larger for external queues where both false-sharing problems
     * and the need for resizing are more common. (Maintenance note:
     * any changes in fields, queues, or their uses, or JVM layout
     * policies, must be accompanied by re-evaluation of these
     * placement and sizing decisions.)
     *
     * Style notes
     * ===========
     *
     * Memory ordering relies mainly on atomic operations (CAS,
     * getAndSet, getAndAdd) along with moded accesses. These use
     * jdk-internal Unsafe for atomics and special memory modes,
     * rather than VarHandles, to avoid initialization dependencies in
     * other jdk components that require early parallelism.  This can
     * be awkward and ugly, but also reflects the need to control
     * outcomes across the unusual cases that arise in very racy code
     * with very few invariants. All atomic task slot updates use
     * Unsafe operations requiring offset positions, not indices, as
     * computed by method slotOffset. All fields are read into locals
     * before use, and null-checked if they are references, even if
     * they can never be null under current usages. Usually,
     * computations (held in local variables) are defined as soon as
     * logically enabled, sometimes to convince compilers that they
     * may be performed despite memory ordering constraints.
     * Array
     * accesses using masked indices include checks (that are always
     * true) that the array length is non-zero to avoid compilers
     * inserting more expensive traps.  This is usually done in a
     * "C"-like style of listing declarations at the heads of methods
     * or blocks, and using inline assignments on first encounter.
     * Nearly all explicit checks lead to bypass/return, not exception
     * throws, because they may legitimately arise during shutdown. A
     * few unusual loop constructions give JVMs hints (with varying
     * effectiveness) about where (not) to place safepoints. All
     * public methods screen arguments (mainly null checks) before
     * creating or executing tasks.
     *
     * There is a lot of representation-level coupling among classes
     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
     * fields of WorkQueue maintain data structures managed by
     * ForkJoinPool, so are directly accessed.  There is little point
     * trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway. Several methods intrinsically sprawl because
     * they must accumulate sets of consistent reads of fields held in
     * local variables. Some others are artificially broken up to
     * reduce producer/consumer imbalances due to dynamic compilation.
     * There are also other coding oddities (including several
     * unnecessary-looking hoisted null checks) that help some methods
     * perform reasonably even when interpreted (not compiled).
     *
     * The order of declarations in this file is (with a few exceptions):
     * (1) Static configuration constants
     * (2) Static utility functions
     * (3) Nested (static) classes
     * (4) Fields, along with constants used when unpacking some of them
     * (5) Internal control methods
     * (6) Callbacks and other support for ForkJoinTask methods
     * (7) Exported methods
     * (8) Static block initializing statics in minimally dependent order
     *
     */

    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for idle threads
     * to park waiting for new work before terminating.
     */
    static final long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance for idle timeouts, also serving as the
     * minimum allowed timeout value.
     */
    static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for common pool maxSpares.  Overridable using
     * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
     * system property.  The default value is far in excess of normal
     * requirements, but also far short of maximum capacity and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Initial capacity of work-stealing queue array for workers.
     * Must be a power of two, at least 2. See above.
     */
    static final int INITIAL_QUEUE_CAPACITY = 1 << 6;

    /**
     * Initial capacity of work-stealing queue array for external queues.
     * Must be a power of two, at least 2. See above.
1072 */
1073 static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
1074
1075 // conversions among short, int, long
1076 static final int SMASK = 0xffff; // (unsigned) short bits
1077 static final long LMASK = 0xffffffffL; // lower 32 bits of long
1078 static final long UMASK = ~LMASK; // upper 32 bits
1079
1080 // masks and sentinels for queue indices
1081 static final int MAX_CAP = 0x7fff; // max # workers
1082 static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id
1083 static final int INVALID_ID = 0x4000; // unused external queue id
1084
1085 // pool.runState bits
1086 static final long STOP = 1L << 0; // terminating
1087 static final long SHUTDOWN = 1L << 1; // terminate when quiescent
1088 static final long CLEANED = 1L << 2; // stopped and queues cleared
1089 static final long TERMINATED = 1L << 3; // only set if STOP also set
1090 static final long RS_LOCK = 1L << 4; // lowest seqlock bit
1091
1092 // spin/sleep limits for runState locking and elsewhere
1093 static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
1094 static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
1095 static final int MAX_SLEEP = 1 << 20; // approx 1 msec as nanos
1096
1097 // {pool, workQueue} config bits
1098 static final int FIFO = 1 << 0; // fifo queue or access mode
1099 static final int CLEAR_TLS = 1 << 1; // set for Innocuous workers
1100 static final int PRESET_SIZE = 1 << 2; // size was set by property
1101
1102 // others
1103 static final int DROPPED = 1 << 16; // removed from ctl counts
1104 static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
1105 static final int IDLE = 1 << 16; // phase seqlock/version count
1106 static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots
1107
1108 /*
1109 * Bits and masks for ctl and bounds are packed with four 16-bit subfields:
1110 * RC: Number of released (unqueued) workers
1111 * TC: Number of total workers
1112 * SS: version count and status of top waiting thread
1113 * ID: poolIndex of top of Treiber stack of waiters
1114 *
1115 * When convenient, we can extract the lower 32 stack top bits
1116 * (including version bits) as sp=(int)ctl. When sp is non-zero,
1117 * there are waiting workers. Count fields may be transiently
1118 * negative during termination because of out-of-order updates.
1119 * To deal with this, we use casts in and out of "short" and/or
1120 * signed shifts to maintain signedness (for example, the released
1121 * worker count is read as (short)(ctl >>> RC_SHIFT)). Because RC
1122 * occupies the uppermost bits, we can add one release count using
1123 * getAndAdd of RC_UNIT, rather than CAS, when returning from a
1124 * blocked join. Other updates of multiple subfields require CAS.
1125 */
1126 // Release counts
1127 static final int RC_SHIFT = 48;
1128 static final long RC_UNIT = 0x0001L << RC_SHIFT;
1129 static final long RC_MASK = 0xffffL << RC_SHIFT;
1130 // Total counts
1131 static final int TC_SHIFT = 32;
1132 static final long TC_UNIT = 0x0001L << TC_SHIFT;
1133 static final long TC_MASK = 0xffffL << TC_SHIFT;
1134
1135 /*
1136 * All atomic operations on task arrays (queues) use Unsafe
1137 * operations that take array offsets versus indices, based on
1138 * array base and shift constants established during static
1139 * initialization.
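 * As a worked example (a sketch; actual values are VM-dependent,
 * obtained via Unsafe.arrayBaseOffset and arrayIndexScale in the
 * static initializer below): with compressed object references,
 * typically ABASE = 16 and ASHIFT = 2, so
 * slotOffset(i) = (i << 2) + 16; the slot for index 5 is then at
 * byte offset 36 from the array's base address.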
1140 */ 1141 static final long ABASE; 1142 static final int ASHIFT; 1143 1144 // Static utilities 1145 1146 /** 1147 * Returns the array offset corresponding to the given index for 1148 * Unsafe task queue operations 1149 */ 1150 static long slotOffset(int index) { 1151 return ((long)index << ASHIFT) + ABASE; 1152 } 1153 1154 // Nested classes 1155 1156 /** 1157 * Factory for creating new {@link ForkJoinWorkerThread}s. 1158 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 1159 * for {@code ForkJoinWorkerThread} subclasses that extend base 1160 * functionality or initialize threads with different contexts. 1161 */ 1162 public static interface ForkJoinWorkerThreadFactory { 1163 /** 1164 * Returns a new worker thread operating in the given pool. 1165 * Returning null or throwing an exception may result in tasks 1166 * never being executed. If this method throws an exception, 1167 * it is relayed to the caller of the method (for example 1168 * {@code execute}) causing attempted thread creation. If this 1169 * method returns null or throws an exception, it is not 1170 * retried until the next attempted creation (for example 1171 * another call to {@code execute}). 1172 * 1173 * @param pool the pool this thread works in 1174 * @return the new worker thread, or {@code null} if the request 1175 * to create a thread is rejected 1176 * @throws NullPointerException if the pool is null 1177 */ 1178 public ForkJoinWorkerThread newThread(ForkJoinPool pool); 1179 } 1180 1181 /** 1182 * Default ForkJoinWorkerThreadFactory implementation; creates a 1183 * new ForkJoinWorkerThread using the system class loader as the 1184 * thread context class loader. 1185 */ 1186 static final class DefaultForkJoinWorkerThreadFactory 1187 implements ForkJoinWorkerThreadFactory { 1188 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 1189 return ((pool.workerNamePrefix == null) ? // is commonPool 1190 new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) : 1191 new ForkJoinWorkerThread(null, pool, true, false)); 1192 } 1193 } 1194 1195 /** 1196 * Queues supporting work-stealing as well as external task 1197 * submission. See above for descriptions and algorithms. 
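 *
 * As a reading aid, a sketch of the index invariants (restating
 * the descriptions above, not additional API): for a queue with
 * array a,
 *
 *   int size = top - base;              // may be transiently off
 *   a[(a.length - 1) & top]             // next slot to push (owner)
 *   a[(a.length - 1) & base]            // next slot to poll (stealers)
 *
 * Indices only increase and are mapped onto the power-of-two array
 * by masking.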
1198 */ 1199 static final class WorkQueue { 1200 // fields declared in order of their likely layout on most VMs 1201 final ForkJoinWorkerThread owner; // null if shared 1202 ForkJoinTask<?>[] array; // the queued tasks; power of 2 size 1203 int base; // index of next slot for poll 1204 final int config; // mode bits 1205 1206 // fields otherwise causing more unnecessary false-sharing cache misses 1207 @jdk.internal.vm.annotation.Contended("w") 1208 int top; // index of next slot for push 1209 @jdk.internal.vm.annotation.Contended("w") 1210 volatile int phase; // versioned active status 1211 @jdk.internal.vm.annotation.Contended("w") 1212 int stackPred; // pool stack (ctl) predecessor link 1213 @jdk.internal.vm.annotation.Contended("w") 1214 volatile int source; // source queue id (or DROPPED) 1215 @jdk.internal.vm.annotation.Contended("w") 1216 int nsteals; // number of steals from other queues 1217 @jdk.internal.vm.annotation.Contended("w") 1218 volatile int parking; // nonzero if parked in awaitWork 1219 1220 // Support for atomic operations 1221 private static final Unsafe U; 1222 private static final long PHASE; 1223 private static final long BASE; 1224 private static final long TOP; 1225 private static final long ARRAY; 1226 1227 final void updateBase(int v) { 1228 U.putIntVolatile(this, BASE, v); 1229 } 1230 final void updateTop(int v) { 1231 U.putIntOpaque(this, TOP, v); 1232 } 1233 final void updateArray(ForkJoinTask<?>[] a) { 1234 U.getAndSetReference(this, ARRAY, a); 1235 } 1236 final void unlockPhase() { 1237 U.getAndAddInt(this, PHASE, IDLE); 1238 } 1239 final boolean tryLockPhase() { // seqlock acquire 1240 int p; 1241 return (((p = phase) & IDLE) != 0 && 1242 U.compareAndSetInt(this, PHASE, p, p + IDLE)); 1243 } 1244 1245 /** 1246 * Constructor. For internal queues, most fields are initialized 1247 * upon thread start in pool.registerWorker. 1248 */ 1249 WorkQueue(ForkJoinWorkerThread owner, int id, int cfg, 1250 boolean clearThreadLocals) { 1251 array = new ForkJoinTask<?>[owner == null ? 1252 INITIAL_EXTERNAL_QUEUE_CAPACITY : 1253 INITIAL_QUEUE_CAPACITY]; 1254 this.owner = owner; 1255 this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg; 1256 } 1257 1258 /** 1259 * Returns an exportable index (used by ForkJoinWorkerThread). 1260 */ 1261 final int getPoolIndex() { 1262 return (phase & 0xffff) >>> 1; // ignore odd/even tag bit 1263 } 1264 1265 /** 1266 * Returns the approximate number of tasks in the queue. 1267 */ 1268 final int queueSize() { 1269 int unused = phase; // for ordering effect 1270 return Math.max(top - base, 0); // ignore transient negative 1271 } 1272 1273 /** 1274 * Pushes a task. 
Called only by owner or if already locked 1275 * 1276 * @param task the task; no-op if null 1277 * @param pool the pool to signal if was previously empty, else null 1278 * @param internal if caller owns this queue 1279 * @throws RejectedExecutionException if array could not be resized 1280 */ 1281 final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) { 1282 int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a; 1283 if ((a = array) != null && (cap = a.length) > 0 && // else disabled 1284 task != null) { 1285 int pk = task.noUserHelp() + 1; // prev slot offset 1286 if ((room = (m = cap - 1) - (s - b)) >= 0) { 1287 top = s + 1; 1288 long pos = slotOffset(m & s); 1289 if (!internal) 1290 U.putReference(a, pos, task); // inside lock 1291 else 1292 U.getAndSetReference(a, pos, task); // fully fenced 1293 if (room == 0) // resize 1294 growArray(a, cap, s); 1295 } 1296 if (!internal) 1297 unlockPhase(); 1298 if (room < 0) 1299 throw new RejectedExecutionException("Queue capacity exceeded"); 1300 if ((room == 0 || a[m & (s - pk)] == null) && 1301 pool != null) 1302 pool.signalWork(); // may have appeared empty 1303 } 1304 } 1305 1306 /** 1307 * Resizes the queue array unless out of memory. 1308 * @param a old array 1309 * @param cap old array capacity 1310 * @param s current top 1311 */ 1312 private void growArray(ForkJoinTask<?>[] a, int cap, int s) { 1313 int newCap = cap << 1; 1314 if (a != null && a.length == cap && cap > 0 && newCap > 0) { 1315 ForkJoinTask<?>[] newArray = null; 1316 try { 1317 newArray = new ForkJoinTask<?>[newCap]; 1318 } catch (OutOfMemoryError ex) { 1319 } 1320 if (newArray != null) { // else throw on next push 1321 int mask = cap - 1, newMask = newCap - 1; 1322 for (int k = s, j = cap; j > 0; --j, --k) { 1323 ForkJoinTask<?> u; // poll old, push to new 1324 if ((u = (ForkJoinTask<?>)U.getAndSetReference( 1325 a, slotOffset(k & mask), null)) == null) 1326 break; // lost to pollers 1327 newArray[k & newMask] = u; 1328 } 1329 updateArray(newArray); // fully fenced 1330 } 1331 } 1332 } 1333 1334 /** 1335 * Takes next task, if one exists, in order specified by mode, 1336 * so acts as either local-pop or local-poll. Called only by owner. 1337 * @param fifo nonzero if FIFO mode 1338 */ 1339 private ForkJoinTask<?> nextLocalTask(int fifo) { 1340 ForkJoinTask<?> t = null; 1341 ForkJoinTask<?>[] a = array; 1342 int b = base, p = top, cap; 1343 if (p - b > 0 && a != null && (cap = a.length) > 0) { 1344 for (int m = cap - 1, s, nb;;) { 1345 if (fifo == 0 || (nb = b + 1) == p) { 1346 if ((t = (ForkJoinTask<?>)U.getAndSetReference( 1347 a, slotOffset(m & (s = p - 1)), null)) != null) 1348 updateTop(s); // else lost race for only task 1349 break; 1350 } 1351 if ((t = (ForkJoinTask<?>)U.getAndSetReference( 1352 a, slotOffset(m & b), null)) != null) { 1353 updateBase(nb); 1354 break; 1355 } 1356 while (b == (b = U.getIntAcquire(this, BASE))) 1357 Thread.onSpinWait(); // spin to reduce memory traffic 1358 if (p - b <= 0) 1359 break; 1360 } 1361 } 1362 return t; 1363 } 1364 1365 /** 1366 * Takes next task, if one exists, using configured mode. 1367 * (Always internal, never called for Common pool.) 1368 */ 1369 final ForkJoinTask<?> nextLocalTask() { 1370 return nextLocalTask(config & FIFO); 1371 } 1372 1373 /** 1374 * Pops the given task only if it is at the current top. 1375 * @param task the task. Caller must ensure non-null. 
1376 * @param internal if caller owns this queue 1377 */ 1378 final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) { 1379 boolean taken = false; 1380 ForkJoinTask<?>[] a = array; 1381 int p = top, s = p - 1, cap; long k; 1382 if (a != null && (cap = a.length) > 0 && 1383 U.getReference(a, k = slotOffset((cap - 1) & s)) == task && 1384 (internal || tryLockPhase())) { 1385 if (top == p && U.compareAndSetReference(a, k, task, null)) { 1386 taken = true; 1387 updateTop(s); 1388 } 1389 if (!internal) 1390 unlockPhase(); 1391 } 1392 return taken; 1393 } 1394 1395 /** 1396 * Returns next task, if one exists, in order specified by mode. 1397 */ 1398 final ForkJoinTask<?> peek() { 1399 ForkJoinTask<?>[] a = array; 1400 int b = base, cfg = config, p = top, cap; 1401 if (p != b && a != null && (cap = a.length) > 0) { 1402 if ((cfg & FIFO) == 0) 1403 return a[(cap - 1) & (p - 1)]; 1404 else { // skip over in-progress removals 1405 ForkJoinTask<?> t; 1406 for ( ; p - b > 0; ++b) { 1407 if ((t = a[(cap - 1) & b]) != null) 1408 return t; 1409 } 1410 } 1411 } 1412 return null; 1413 } 1414 1415 /** 1416 * Polls for a task. Used only by non-owners. 1417 */ 1418 final ForkJoinTask<?> poll() { 1419 for (int pb = -1, b; ; pb = b) { // track progress 1420 ForkJoinTask<?> t; int cap, nb; long k; ForkJoinTask<?>[] a; 1421 if ((a = array) == null || (cap = a.length) <= 0) 1422 break; 1423 t = (ForkJoinTask<?>)U.getReferenceAcquire( 1424 a, k = slotOffset((cap - 1) & (b = base))); 1425 Object u = U.getReference( // next slot 1426 a, slotOffset((cap - 1) & (nb = b + 1))); 1427 if (base != b) // inconsistent 1428 ; 1429 else if (t == null) { 1430 if (u == null && top - b <= 0) 1431 break; // empty 1432 if (pb == b) 1433 Thread.onSpinWait(); // stalled 1434 } 1435 else if (U.compareAndSetReference(a, k, t, null)) { 1436 updateBase(nb); 1437 return t; 1438 } 1439 } 1440 return null; 1441 } 1442 1443 // specialized execution methods 1444 1445 /** 1446 * Runs the given task, as well as remaining local tasks. 1447 */ 1448 final void topLevelExec(ForkJoinTask<?> task, int fifo) { 1449 while (task != null) { 1450 task.doExec(); 1451 task = nextLocalTask(fifo); 1452 } 1453 } 1454 1455 /** 1456 * Deep form of tryUnpush: Traverses from top and removes and 1457 * runs task if present. 1458 */ 1459 final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) { 1460 ForkJoinTask<?>[] a = array; 1461 int b = base, p = top, s = p - 1, d = p - b, cap; 1462 if (a != null && (cap = a.length) > 0) { 1463 for (int m = cap - 1, i = s; d > 0; --i, --d) { 1464 long k; boolean taken; 1465 ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference( 1466 a, k = slotOffset(i & m)); 1467 if (t == null) 1468 break; 1469 if (t == task) { 1470 if (!internal && !tryLockPhase()) 1471 break; // fail if locked 1472 if (taken = 1473 (top == p && 1474 U.compareAndSetReference(a, k, task, null))) { 1475 if (i == s) // act as pop 1476 updateTop(s); 1477 else if (i == base) // act as poll 1478 updateBase(i + 1); 1479 else { // swap with top 1480 U.putReferenceVolatile( 1481 a, k, (ForkJoinTask<?>) 1482 U.getAndSetReference( 1483 a, slotOffset(s & m), null)); 1484 updateTop(s); 1485 } 1486 } 1487 if (!internal) 1488 unlockPhase(); 1489 if (taken) 1490 task.doExec(); 1491 break; 1492 } 1493 } 1494 } 1495 } 1496 1497 /** 1498 * Tries to pop and run tasks within the target's computation 1499 * until done, not found, or limit exceeded. 
1500 * 1501 * @param task root of computation 1502 * @param limit max runs, or zero for no limit 1503 * @return task status if known to be done 1504 */ 1505 final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) { 1506 int status = 0; 1507 if (task != null) { 1508 outer: for (;;) { 1509 ForkJoinTask<?>[] a; boolean taken; Object o; 1510 int stat, p, s, cap; 1511 if ((stat = task.status) < 0) { 1512 status = stat; 1513 break; 1514 } 1515 if ((a = array) == null || (cap = a.length) <= 0) 1516 break; 1517 long k = slotOffset((cap - 1) & (s = (p = top) - 1)); 1518 if (!((o = U.getReference(a, k)) instanceof CountedCompleter)) 1519 break; 1520 CountedCompleter<?> t = (CountedCompleter<?>)o, f = t; 1521 for (int steps = cap;;) { // bound path 1522 if (f == task) 1523 break; 1524 if ((f = f.completer) == null || --steps == 0) 1525 break outer; 1526 } 1527 if (!internal && !tryLockPhase()) 1528 break; 1529 if (taken = 1530 (top == p && 1531 U.compareAndSetReference(a, k, t, null))) 1532 updateTop(s); 1533 if (!internal) 1534 unlockPhase(); 1535 if (!taken) 1536 break; 1537 t.doExec(); 1538 if (limit != 0 && --limit == 0) 1539 break; 1540 } 1541 } 1542 return status; 1543 } 1544 1545 /** 1546 * Tries to poll and run AsynchronousCompletionTasks until 1547 * none found or blocker is released 1548 * 1549 * @param blocker the blocker 1550 */ 1551 final void helpAsyncBlocker(ManagedBlocker blocker) { 1552 for (;;) { 1553 ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b, cap; long k; 1554 if ((a = array) == null || (cap = a.length) <= 0) 1555 break; 1556 t = (ForkJoinTask<?>)U.getReferenceAcquire( 1557 a, k = slotOffset((cap - 1) & (b = base))); 1558 if (t == null) { 1559 if (top - b <= 0) 1560 break; 1561 } 1562 else if (!(t instanceof CompletableFuture 1563 .AsynchronousCompletionTask)) 1564 break; 1565 if (blocker != null && blocker.isReleasable()) 1566 break; 1567 if (base == b && t != null && 1568 U.compareAndSetReference(a, k, t, null)) { 1569 updateBase(b + 1); 1570 t.doExec(); 1571 } 1572 } 1573 } 1574 1575 // misc 1576 1577 /** 1578 * Cancels all local tasks. Called only by owner. 1579 */ 1580 final void cancelTasks() { 1581 for (ForkJoinTask<?> t; (t = nextLocalTask(0)) != null; ) { 1582 try { 1583 t.cancel(false); 1584 } catch (Throwable ignore) { 1585 } 1586 } 1587 } 1588 1589 /** 1590 * Returns true if internal and not known to be blocked. 1591 */ 1592 final boolean isApparentlyUnblocked() { 1593 Thread wt; Thread.State s; 1594 return ((wt = owner) != null && (phase & IDLE) != 0 && 1595 (s = wt.getState()) != Thread.State.BLOCKED && 1596 s != Thread.State.WAITING && 1597 s != Thread.State.TIMED_WAITING); 1598 } 1599 1600 static { 1601 U = Unsafe.getUnsafe(); 1602 Class<WorkQueue> klass = WorkQueue.class; 1603 PHASE = U.objectFieldOffset(klass, "phase"); 1604 BASE = U.objectFieldOffset(klass, "base"); 1605 TOP = U.objectFieldOffset(klass, "top"); 1606 ARRAY = U.objectFieldOffset(klass, "array"); 1607 } 1608 } 1609 1610 // static fields (initialized in static initializer below) 1611 1612 /** 1613 * Creates a new ForkJoinWorkerThread. This factory is used unless 1614 * overridden in ForkJoinPool constructors. 1615 */ 1616 public static final ForkJoinWorkerThreadFactory 1617 defaultForkJoinWorkerThreadFactory; 1618 1619 /** 1620 * Common (static) pool. Non-null for public use unless a static 1621 * construction exception, but internal usages null-check on use 1622 * to paranoically avoid potential initialization circularities 1623 * as well as to simplify generated code. 
1624 */ 1625 static final ForkJoinPool common; 1626 1627 /** 1628 * Sequence number for creating worker names 1629 */ 1630 private static volatile int poolIds; 1631 1632 /** 1633 * For VirtualThread intrinsics 1634 */ 1635 private static final JavaLangAccess JLA; 1636 1637 // fields declared in order of their likely layout on most VMs 1638 volatile CountDownLatch termination; // lazily constructed 1639 final Predicate<? super ForkJoinPool> saturate; 1640 final ForkJoinWorkerThreadFactory factory; 1641 final UncaughtExceptionHandler ueh; // per-worker UEH 1642 final SharedThreadContainer container; 1643 final String workerNamePrefix; // null for common pool 1644 final String poolName; 1645 volatile DelayScheduler delayScheduler; // lazily constructed 1646 WorkQueue[] queues; // main registry 1647 volatile long runState; // versioned, lockable 1648 final long keepAlive; // milliseconds before dropping if idle 1649 final long config; // static configuration bits 1650 volatile long stealCount; // collects worker nsteals 1651 volatile long threadIds; // for worker thread names 1652 1653 @jdk.internal.vm.annotation.Contended("fjpctl") // segregate 1654 volatile long ctl; // main pool control 1655 @jdk.internal.vm.annotation.Contended("fjpctl") // colocate 1656 int parallelism; // target number of workers 1657 1658 // Support for atomic operations 1659 private static final Unsafe U; 1660 private static final long CTL; 1661 private static final long RUNSTATE; 1662 private static final long PARALLELISM; 1663 private static final long THREADIDS; 1664 private static final long TERMINATION; 1665 private static final Object POOLIDS_BASE; 1666 private static final long POOLIDS; 1667 1668 private boolean compareAndSetCtl(long c, long v) { 1669 return U.compareAndSetLong(this, CTL, c, v); 1670 } 1671 private long compareAndExchangeCtl(long c, long v) { 1672 return U.compareAndExchangeLong(this, CTL, c, v); 1673 } 1674 private long getAndAddCtl(long v) { 1675 return U.getAndAddLong(this, CTL, v); 1676 } 1677 private long incrementThreadIds() { 1678 return U.getAndAddLong(this, THREADIDS, 1L); 1679 } 1680 private static int getAndAddPoolIds(int x) { 1681 return U.getAndAddInt(POOLIDS_BASE, POOLIDS, x); 1682 } 1683 private int getAndSetParallelism(int v) { 1684 return U.getAndSetInt(this, PARALLELISM, v); 1685 } 1686 private int getParallelismOpaque() { 1687 return U.getIntOpaque(this, PARALLELISM); 1688 } 1689 private CountDownLatch cmpExTerminationSignal(CountDownLatch x) { 1690 return (CountDownLatch) 1691 U.compareAndExchangeReference(this, TERMINATION, null, x); 1692 } 1693 1694 // runState operations 1695 1696 private long getAndBitwiseOrRunState(long v) { // for status bits 1697 return U.getAndBitwiseOrLong(this, RUNSTATE, v); 1698 } 1699 private boolean casRunState(long c, long v) { 1700 return U.compareAndSetLong(this, RUNSTATE, c, v); 1701 } 1702 private void unlockRunState() { // increment lock bit 1703 U.getAndAddLong(this, RUNSTATE, RS_LOCK); 1704 } 1705 private long lockRunState() { // lock and return current state 1706 long s, u; // locked when RS_LOCK set 1707 if (((s = runState) & RS_LOCK) == 0L && casRunState(s, u = s + RS_LOCK)) 1708 return u; 1709 else 1710 return spinLockRunState(); 1711 } 1712 private long spinLockRunState() { // spin/sleep 1713 for (int waits = 0;;) { 1714 long s, u; 1715 if (((s = runState) & RS_LOCK) == 0L) { 1716 if (casRunState(s, u = s + RS_LOCK)) 1717 return u; 1718 waits = 0; 1719 } else if (waits < SPIN_WAITS) { 1720 ++waits; 1721 Thread.onSpinWait(); 1722 } 
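// spin budget exhausted: park with exponentially growing backoff,
// capped at MAX_SLEEP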
else { 1723 if (waits < MIN_SLEEP) 1724 waits = MIN_SLEEP; 1725 LockSupport.parkNanos(this, (long)waits); 1726 if (waits < MAX_SLEEP) 1727 waits <<= 1; 1728 } 1729 } 1730 } 1731 1732 static boolean poolIsStopping(ForkJoinPool p) { // Used by ForkJoinTask 1733 return p != null && (p.runState & STOP) != 0L; 1734 } 1735 1736 // Creating, registering, and deregistering workers 1737 1738 /** 1739 * Tries to construct and start one worker. Assumes that total 1740 * count has already been incremented as a reservation. Invokes 1741 * deregisterWorker on any failure. 1742 * 1743 * @return true if successful 1744 */ 1745 private boolean createWorker() { 1746 ForkJoinWorkerThreadFactory fac = factory; 1747 SharedThreadContainer ctr = container; 1748 Throwable ex = null; 1749 ForkJoinWorkerThread wt = null; 1750 try { 1751 if ((runState & STOP) == 0L && // avoid construction if terminating 1752 fac != null && (wt = fac.newThread(this)) != null) { 1753 if (ctr != null) 1754 ctr.start(wt); 1755 else 1756 wt.start(); 1757 return true; 1758 } 1759 } catch (Throwable rex) { 1760 ex = rex; 1761 } 1762 deregisterWorker(wt, ex); 1763 return false; 1764 } 1765 1766 /** 1767 * Provides a name for ForkJoinWorkerThread constructor. 1768 */ 1769 final String nextWorkerThreadName() { 1770 String prefix = workerNamePrefix; 1771 long tid = incrementThreadIds() + 1L; 1772 if (prefix == null) // commonPool has no prefix 1773 prefix = "ForkJoinPool.commonPool-worker-"; 1774 return prefix.concat(Long.toString(tid)); 1775 } 1776 1777 /** 1778 * Finishes initializing and records internal queue. 1779 * 1780 * @param w caller's WorkQueue 1781 */ 1782 final void registerWorker(WorkQueue w) { 1783 if (w != null && (runState & STOP) == 0L) { 1784 ThreadLocalRandom.localInit(); 1785 int seed = w.stackPred = ThreadLocalRandom.getProbe(); 1786 int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag 1787 int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan 1788 long stop = lockRunState() & STOP; 1789 try { 1790 WorkQueue[] qs; int n; 1791 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) { 1792 for (int k = n, m = n - 1; ; id += 2) { 1793 if (qs[id &= m] == null) 1794 break; 1795 if ((k -= 2) <= 0) { 1796 id |= n; 1797 break; 1798 } 1799 } 1800 w.phase = id | phaseSeq; // now publishable 1801 if (id < n) 1802 qs[id] = w; 1803 else { // expand 1804 int an = n << 1, am = an - 1; 1805 WorkQueue[] as = new WorkQueue[an]; 1806 as[id & am] = w; 1807 for (int j = 1; j < n; j += 2) 1808 as[j] = qs[j]; 1809 for (int j = 0; j < n; j += 2) { 1810 WorkQueue q; // shared queues may move 1811 if ((q = qs[j]) != null) 1812 as[q.phase & EXTERNAL_ID_MASK & am] = q; 1813 } 1814 U.storeFence(); // fill before publish 1815 queues = as; 1816 } 1817 } 1818 } finally { 1819 unlockRunState(); 1820 } 1821 } 1822 } 1823 1824 /** 1825 * Final callback from terminating worker, as well as upon failure 1826 * to construct or start a worker. Removes record of worker from 1827 * array, and adjusts counts. If pool is shutting down, tries to 1828 * complete termination. 
1829 * 1830 * @param wt the worker thread, or null if construction failed 1831 * @param ex the exception causing failure, or null if none 1832 */ 1833 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { 1834 WorkQueue w = null; // null if not created 1835 int phase = 0; // 0 if not registered 1836 if (wt != null && (w = wt.workQueue) != null && 1837 (phase = w.phase) != 0 && (phase & IDLE) != 0) 1838 releaseWaiters(); // ensure released 1839 if (w == null || w.source != DROPPED) { 1840 long c = ctl; // decrement counts 1841 do {} while (c != (c = compareAndExchangeCtl( 1842 c, ((RC_MASK & (c - RC_UNIT)) | 1843 (TC_MASK & (c - TC_UNIT)) | 1844 (LMASK & c))))); 1845 } 1846 if (phase != 0 && w != null) { // remove index unless terminating 1847 long ns = w.nsteals & 0xffffffffL; 1848 if ((runState & STOP) == 0L) { 1849 WorkQueue[] qs; int n, i; 1850 if ((lockRunState() & STOP) == 0L && 1851 (qs = queues) != null && (n = qs.length) > 0 && 1852 qs[i = phase & SMASK & (n - 1)] == w) { 1853 qs[i] = null; 1854 stealCount += ns; // accumulate steals 1855 } 1856 unlockRunState(); 1857 } 1858 } 1859 if ((tryTerminate(false, false) & STOP) == 0L && 1860 phase != 0 && w != null && w.source != DROPPED) { 1861 signalWork(); // possibly replace 1862 w.cancelTasks(); // clean queue 1863 } 1864 if (ex != null) 1865 ForkJoinTask.rethrow(ex); 1866 } 1867 1868 /** 1869 * Releases an idle worker, or creates one if not enough exist. 1870 */ 1871 final void signalWork() { 1872 int pc = parallelism; 1873 for (long c = ctl;;) { 1874 WorkQueue[] qs = queues; 1875 long ac = (c + RC_UNIT) & RC_MASK, nc; 1876 int sp = (int)c, i = sp & SMASK; 1877 if ((short)(c >>> RC_SHIFT) >= pc) 1878 break; 1879 if (qs == null) 1880 break; 1881 if (qs.length <= i) 1882 break; 1883 WorkQueue w = qs[i], v = null; 1884 if (sp == 0) { 1885 if ((short)(c >>> TC_SHIFT) >= pc) 1886 break; 1887 nc = ((c + TC_UNIT) & TC_MASK); 1888 } 1889 else if ((v = w) == null) 1890 break; 1891 else 1892 nc = (v.stackPred & LMASK) | (c & TC_MASK); 1893 if (c == (c = compareAndExchangeCtl(c, nc | ac))) { 1894 if (v == null) 1895 createWorker(); 1896 else { 1897 v.phase = sp; 1898 if (v.parking != 0) 1899 U.unpark(v.owner); 1900 } 1901 break; 1902 } 1903 } 1904 } 1905 1906 /** 1907 * Releases all waiting workers. Called only during shutdown. 1908 */ 1909 private void releaseWaiters() { 1910 for (long c = ctl;;) { 1911 WorkQueue[] qs; WorkQueue v; int sp, i; 1912 if ((sp = (int)c) == 0 || (qs = queues) == null || 1913 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null) 1914 break; 1915 if (c == (c = compareAndExchangeCtl( 1916 c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) | 1917 (v.stackPred & LMASK))))) { 1918 v.phase = sp; 1919 if (v.parking != 0) 1920 U.unpark(v.owner); 1921 } 1922 } 1923 } 1924 1925 /** 1926 * Internal version of isQuiescent and related functionality. 
1927 * @return positive if stopping, nonnegative if terminating or all 1928 * workers are inactive and submission queues are empty and 1929 * unlocked; if so, setting STOP if shutdown is enabled 1930 */ 1931 private int quiescent() { 1932 for (;;) { 1933 long phaseSum = 0L; 1934 boolean swept = false; 1935 for (long e, prevRunState = 0L; ; prevRunState = e) { 1936 DelayScheduler ds; 1937 long c = ctl; 1938 if (((e = runState) & STOP) != 0L) 1939 return 1; // terminating 1940 else if ((c & RC_MASK) > 0L) 1941 return -1; // at least one active 1942 else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) { 1943 long sum = c; 1944 WorkQueue[] qs = queues; 1945 int n = (qs == null) ? 0 : qs.length; 1946 for (int i = 0; i < n; ++i) { // scan queues 1947 WorkQueue q; 1948 if ((q = qs[i]) != null) { 1949 int p = q.phase, s = q.top, b = q.base; 1950 sum += (p & 0xffffffffL) | ((long)b << 32); 1951 if ((p & IDLE) == 0 || s - b > 0) 1952 return -1; 1953 } 1954 } 1955 swept = (phaseSum == (phaseSum = sum)); 1956 } 1957 else if ((e & SHUTDOWN) == 0) 1958 return 0; 1959 else if ((ds = delayScheduler) != null && !ds.canShutDown()) 1960 return 0; 1961 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) 1962 return 1; // enable termination 1963 else 1964 break; // restart 1965 } 1966 } 1967 } 1968 1969 /** 1970 * Top-level runloop for workers, called by ForkJoinWorkerThread.run. 1971 * See above for explanation. 1972 * 1973 * @param w caller's WorkQueue (may be null on failed initialization) 1974 */ 1975 final void runWorker(WorkQueue w) { 1976 if (w != null) { 1977 int phase = w.phase, r = w.stackPred; // seed from registerWorker 1978 int fifo = w.config & FIFO, nsteals = 0, src = -1; 1979 for (;;) { 1980 WorkQueue[] qs; 1981 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift 1982 if ((runState & STOP) != 0L || (qs = queues) == null) 1983 break; 1984 int n = qs.length, i = r, step = (r >>> 16) | 1; 1985 boolean rescan = false; 1986 scan: for (int l = n; l > 0; --l, i += step) { // scan queues 1987 int j, cap; WorkQueue q; ForkJoinTask<?>[] a; 1988 if ((q = qs[j = i & (n - 1)]) != null && 1989 (a = q.array) != null && (cap = a.length) > 0) { 1990 for (int m = cap - 1, pb = -1, b = q.base;;) { 1991 ForkJoinTask<?> t; long k; 1992 t = (ForkJoinTask<?>)U.getReferenceAcquire( 1993 a, k = slotOffset(m & b)); 1994 if (b != (b = q.base) || t == null || 1995 !U.compareAndSetReference(a, k, t, null)) { 1996 if (a[b & m] == null) { 1997 if (rescan) // end of run 1998 break scan; 1999 if (a[(b + 1) & m] == null && 2000 a[(b + 2) & m] == null) { 2001 break; // probably empty 2002 } 2003 if (pb == (pb = b)) { // track progress 2004 rescan = true; // stalled; reorder scan 2005 break scan; 2006 } 2007 } 2008 } 2009 else { 2010 boolean propagate; 2011 int nb = q.base = b + 1, prevSrc = src; 2012 w.nsteals = ++nsteals; 2013 w.source = src = j; // volatile 2014 rescan = true; 2015 int nh = t.noUserHelp(); 2016 if (propagate = 2017 (prevSrc != src || nh != 0) && a[nb & m] != null) 2018 signalWork(); 2019 w.topLevelExec(t, fifo); 2020 if ((b = q.base) != nb && !propagate) 2021 break scan; // reduce interference 2022 } 2023 } 2024 } 2025 } 2026 if (!rescan) { 2027 if (((phase = deactivate(w, phase)) & IDLE) != 0) 2028 break; 2029 src = -1; // re-enable propagation 2030 } 2031 } 2032 } 2033 } 2034 2035 /** 2036 * Deactivates and if necessary awaits signal or termination. 
2037 * 2038 * @param w the worker 2039 * @param phase current phase 2040 * @return current phase, with IDLE set if worker should exit 2041 */ 2042 private int deactivate(WorkQueue w, int phase) { 2043 if (w == null) // currently impossible 2044 return IDLE; 2045 int p = phase | IDLE, activePhase = phase + (IDLE << 1); 2046 long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK); 2047 int sp = w.stackPred = (int)pc; // set ctl stack link 2048 w.phase = p; 2049 if (!compareAndSetCtl(pc, qc)) // try to enqueue 2050 return w.phase = phase; // back out on possible signal 2051 int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs; 2052 if (((e = runState) & STOP) != 0L || 2053 ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) || 2054 (qs = queues) == null || (n = qs.length) <= 0) 2055 return IDLE; // terminating 2056 2057 for (int prechecks = Math.min(ac, 2), // reactivation threshold 2058 k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) { 2059 WorkQueue q; int cap; ForkJoinTask<?>[] a; long c; 2060 if (w.phase == activePhase) 2061 return activePhase; 2062 if (--k < 0) 2063 return awaitWork(w, p); // block, drop, or exit 2064 if ((q = qs[k & (n - 1)]) == null) 2065 Thread.onSpinWait(); 2066 else if ((a = q.array) != null && (cap = a.length) > 0 && 2067 a[q.base & (cap - 1)] != null && --prechecks < 0 && 2068 (int)(c = ctl) == activePhase && 2069 compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK))) 2070 return w.phase = activePhase; // reactivate 2071 } 2072 } 2073 2074 /** 2075 * Awaits signal or termination. 2076 * 2077 * @param w the work queue 2078 * @param p current phase (known to be idle) 2079 * @return current phase, with IDLE set if worker should exit 2080 */ 2081 private int awaitWork(WorkQueue w, int p) { 2082 if (w != null) { 2083 ForkJoinWorkerThread t; long deadline; 2084 if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null) 2085 t.resetThreadLocals(); // clear before reactivate 2086 if ((ctl & RC_MASK) > 0L) 2087 deadline = 0L; 2088 else if ((deadline = 2089 (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) + 2090 System.currentTimeMillis()) == 0L) 2091 deadline = 1L; // avoid zero 2092 int activePhase = p + IDLE; 2093 if ((p = w.phase) != activePhase && (runState & STOP) == 0L) { 2094 LockSupport.setCurrentBlocker(this); 2095 w.parking = 1; // enable unpark 2096 while ((p = w.phase) != activePhase) { 2097 boolean trimmable = false; int trim; 2098 Thread.interrupted(); // clear status 2099 if ((runState & STOP) != 0L) 2100 break; 2101 if (deadline != 0L) { 2102 if ((trim = tryTrim(w, p, deadline)) > 0) 2103 break; 2104 else if (trim < 0) 2105 deadline = 0L; 2106 else 2107 trimmable = true; 2108 } 2109 U.park(trimmable, deadline); 2110 } 2111 w.parking = 0; 2112 LockSupport.setCurrentBlocker(null); 2113 } 2114 } 2115 return p; 2116 } 2117 2118 /** 2119 * Tries to remove and deregister worker after timeout, and release 2120 * another to do the same. 
2121 * @return > 0: trimmed, < 0 : not trimmable, else 0 2122 */ 2123 private int tryTrim(WorkQueue w, int phase, long deadline) { 2124 long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v; 2125 if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null) 2126 stat = -1; // no longer ctl top 2127 else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP) 2128 stat = 0; // spurious wakeup 2129 else if (!compareAndSetCtl( 2130 c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) | 2131 (TC_MASK & (c - TC_UNIT))))) 2132 stat = -1; // lost race to signaller 2133 else { 2134 stat = 1; 2135 w.source = DROPPED; 2136 w.phase = activePhase; 2137 if ((vp = (int)nc) != 0 && (vs = queues) != null && 2138 vs.length > (i = vp & SMASK) && (v = vs[i]) != null && 2139 compareAndSetCtl( // try to wake up next waiter 2140 nc, ((UMASK & (nc + RC_UNIT)) | 2141 (nc & TC_MASK) | (v.stackPred & LMASK)))) { 2142 v.source = INVALID_ID; // enable cascaded timeouts 2143 v.phase = vp; 2144 U.unpark(v.owner); 2145 } 2146 } 2147 return stat; 2148 } 2149 2150 /** 2151 * Scans for and returns a polled task, if available. Used only 2152 * for untracked polls. Begins scan at a random index to avoid 2153 * systematic unfairness. 2154 * 2155 * @param submissionsOnly if true, only scan submission queues 2156 */ 2157 private ForkJoinTask<?> pollScan(boolean submissionsOnly) { 2158 if ((runState & STOP) == 0L) { 2159 WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t; 2160 int r = ThreadLocalRandom.nextSecondarySeed(); 2161 if (submissionsOnly) // even indices only 2162 r &= ~1; 2163 int step = (submissionsOnly) ? 2 : 1; 2164 if ((qs = queues) != null && (n = qs.length) > 0) { 2165 for (int i = n; i > 0; i -= step, r += step) { 2166 if ((q = qs[r & (n - 1)]) != null && 2167 (t = q.poll()) != null) 2168 return t; 2169 } 2170 } 2171 } 2172 return null; 2173 } 2174 2175 /** 2176 * Tries to decrement counts (sometimes implicitly) and possibly 2177 * arrange for a compensating worker in preparation for 2178 * blocking. May fail due to interference, in which case -1 is 2179 * returned so caller may retry. A zero return value indicates 2180 * that the caller doesn't need to re-adjust counts when later 2181 * unblocked. 2182 * 2183 * @param c incoming ctl value 2184 * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry 2185 */ 2186 private int tryCompensate(long c) { 2187 Predicate<? super ForkJoinPool> sat; 2188 long b = config; 2189 int pc = parallelism, // unpack fields 2190 minActive = (short)(b >>> RC_SHIFT), 2191 maxTotal = (short)(b >>> TC_SHIFT) + pc, 2192 active = (short)(c >>> RC_SHIFT), 2193 total = (short)(c >>> TC_SHIFT), 2194 sp = (int)c, 2195 stat = -1; // default retry return 2196 if (sp != 0 && active <= pc) { // activate idle worker 2197 WorkQueue[] qs; WorkQueue v; int i; 2198 if ((qs = queues) != null && qs.length > (i = sp & SMASK) && 2199 (v = qs[i]) != null && 2200 compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) { 2201 v.phase = sp; 2202 if (v.parking != 0) 2203 U.unpark(v.owner); 2204 stat = UNCOMPENSATE; 2205 } 2206 } 2207 else if (active > minActive && total >= pc) { // reduce active workers 2208 if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK))) 2209 stat = UNCOMPENSATE; 2210 } 2211 else if (total < maxTotal && total < MAX_CAP) { // try to expand pool 2212 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); 2213 if ((runState & STOP) != 0L) // terminating 2214 stat = 0; 2215 else if (compareAndSetCtl(c, nc)) 2216 stat = createWorker() ? 
UNCOMPENSATE : 0;
2217 }
2218 else if (!compareAndSetCtl(c, c)) // validate
2219 ;
2220 else if ((sat = saturate) != null && sat.test(this))
2221 stat = 0;
2222 else
2223 throw new RejectedExecutionException(
2224 "Thread limit exceeded replacing blocked worker");
2225 return stat;
2226 }
2227
2228 /**
2229 * Readjusts RC count; called from ForkJoinTask after blocking.
2230 */
2231 final void uncompensate() {
2232 getAndAddCtl(RC_UNIT);
2233 }
2234
2235 /**
2236 * Helps if possible until the given task is done. Processes
2237 * compatible local tasks and scans other queues for tasks produced
2238 * by w's stealers; returns the compensated blocking sentinel if
2239 * none are found.
2240 *
2241 * @param task the task
2242 * @param w caller's WorkQueue
2243 * @param internal true if w is owned by a ForkJoinWorkerThread
2244 * @return task status on exit, or UNCOMPENSATE for compensated blocking
2245 */
2246 final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
2247 if (w != null)
2248 w.tryRemoveAndExec(task, internal);
2249 int s = 0;
2250 if (task != null && (s = task.status) >= 0 && internal && w != null) {
2251 int wid = w.phase & SMASK, r = wid + 2, wsrc = w.source;
2252 long sctl = 0L; // track stability
2253 outer: for (boolean rescan = true;;) {
2254 if ((s = task.status) < 0)
2255 break;
2256 if (!rescan) {
2257 if ((runState & STOP) != 0L)
2258 break;
2259 if (sctl == (sctl = ctl) && (s = tryCompensate(sctl)) >= 0)
2260 break;
2261 }
2262 rescan = false;
2263 WorkQueue[] qs = queues;
2264 int n = (qs == null) ? 0 : qs.length;
2265 scan: for (int l = n >>> 1; l > 0; --l, r += 2) {
2266 int j; WorkQueue q;
2267 if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
2268 for (;;) {
2269 ForkJoinTask<?> t; ForkJoinTask<?>[] a;
2270 boolean eligible = false;
2271 int sq = q.source, b, cap; long k;
2272 if ((a = q.array) == null || (cap = a.length) <= 0)
2273 break;
2274 t = (ForkJoinTask<?>)U.getReferenceAcquire(
2275 a, k = slotOffset((cap - 1) & (b = q.base)));
2276 if (t == task)
2277 eligible = true;
2278 else if (t != null) { // check steal chain
2279 for (int v = sq, d = cap;;) {
2280 WorkQueue p;
2281 if (v == wid) {
2282 eligible = true;
2283 break;
2284 }
2285 if ((v & 1) == 0 || // external or none
2286 --d < 0 || // bound depth
2287 (p = qs[v & (n - 1)]) == null)
2288 break;
2289 v = p.source;
2290 }
2291 }
2292 if ((s = task.status) < 0)
2293 break outer; // validate
2294 if (q.source == sq && q.base == b &&
2295 U.getReference(a, k) == t) {
2296 if (!eligible) { // revisit if nonempty
2297 if (!rescan && t == null && q.top - b > 0)
2298 rescan = true;
2299 break;
2300 }
2301 if (U.compareAndSetReference(a, k, t, null)) {
2302 q.base = b + 1;
2303 w.source = j; // volatile write
2304 t.doExec();
2305 w.source = wsrc;
2306 rescan = true; // restart at index r
2307 break scan;
2308 }
2309 }
2310 }
2311 }
2312 }
2313 }
2314 }
2315 return s;
2316 }
2317
2318 /**
2319 * Version of helpJoin for CountedCompleters.
2320 * 2321 * @param task root of computation (only called when a CountedCompleter) 2322 * @param w caller's WorkQueue 2323 * @param internal true if w is owned by a ForkJoinWorkerThread 2324 * @return task status on exit, or UNCOMPENSATE for compensated blocking 2325 */ 2326 final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean internal) { 2327 int s = 0; 2328 if (task != null && (s = task.status) >= 0 && w != null) { 2329 int r = w.phase + 1; // for indexing 2330 long sctl = 0L; // track stability 2331 outer: for (boolean rescan = true, locals = true;;) { 2332 if (locals && (s = w.helpComplete(task, internal, 0)) < 0) 2333 break; 2334 if ((s = task.status) < 0) 2335 break; 2336 if (!rescan) { 2337 if ((runState & STOP) != 0L) 2338 break; 2339 if (sctl == (sctl = ctl) && 2340 (!internal || (s = tryCompensate(sctl)) >= 0)) 2341 break; 2342 } 2343 rescan = locals = false; 2344 WorkQueue[] qs = queues; 2345 int n = (qs == null) ? 0 : qs.length; 2346 scan: for (int l = n; l > 0; --l, ++r) { 2347 int j; WorkQueue q; 2348 if ((q = qs[j = r & SMASK & (n - 1)]) != null) { 2349 for (;;) { 2350 ForkJoinTask<?> t; ForkJoinTask<?>[] a; 2351 int b, cap, nb; long k; 2352 boolean eligible = false; 2353 if ((a = q.array) == null || (cap = a.length) <= 0) 2354 break; 2355 t = (ForkJoinTask<?>)U.getReferenceAcquire( 2356 a, k = slotOffset((cap - 1) & (b = q.base))); 2357 if (t instanceof CountedCompleter) { 2358 CountedCompleter<?> f = (CountedCompleter<?>)t; 2359 for (int steps = cap; steps > 0; --steps) { 2360 if (f == task) { 2361 eligible = true; 2362 break; 2363 } 2364 if ((f = f.completer) == null) 2365 break; 2366 } 2367 } 2368 if ((s = task.status) < 0) // validate 2369 break outer; 2370 if (q.base == b) { 2371 if (eligible) { 2372 if (U.compareAndSetReference( 2373 a, k, t, null)) { 2374 q.updateBase(b + 1); 2375 t.doExec(); 2376 locals = rescan = true; 2377 break scan; 2378 } 2379 } 2380 else if (U.getReference(a, k) == t) { 2381 if (!rescan && t == null && q.top - b > 0) 2382 rescan = true; // revisit 2383 break; 2384 } 2385 } 2386 } 2387 } 2388 } 2389 } 2390 } 2391 return s; 2392 } 2393 2394 /** 2395 * Runs tasks until all workers are inactive and no tasks are 2396 * found. Rather than blocking when tasks cannot be found, rescans 2397 * until all others cannot find tasks either. 2398 * 2399 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) 2400 * @param interruptible true if return on interrupt 2401 * @return positive if quiescent, negative if interrupted, else 0 2402 */ 2403 private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) { 2404 int phase; // w.phase inactive bit set when temporarily quiescent 2405 if (w == null || ((phase = w.phase) & IDLE) != 0) 2406 return 0; 2407 int wsrc = w.source; 2408 long startTime = System.nanoTime(); 2409 long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); // approx 1% nanos 2410 long prevSum = 0L; 2411 int activePhase = phase, inactivePhase = phase + IDLE; 2412 int r = phase + 1, waits = 0, returnStatus = 1; 2413 boolean locals = true; 2414 for (long e = runState;;) { 2415 if ((e & STOP) != 0L) 2416 break; // terminating 2417 if (interruptible && Thread.interrupted()) { 2418 returnStatus = -1; 2419 break; 2420 } 2421 if (locals) { // run local tasks before (re)polling 2422 locals = false; 2423 for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;) 2424 u.doExec(); 2425 } 2426 WorkQueue[] qs = queues; 2427 int n = (qs == null) ? 
0 : qs.length; 2428 long phaseSum = 0L; 2429 boolean rescan = false, busy = false; 2430 scan: for (int l = n; l > 0; --l, ++r) { 2431 int j; WorkQueue q; 2432 if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) { 2433 for (;;) { 2434 ForkJoinTask<?> t; ForkJoinTask<?>[] a; 2435 int b, cap; long k; 2436 if ((a = q.array) == null || (cap = a.length) <= 0) 2437 break; 2438 t = (ForkJoinTask<?>)U.getReferenceAcquire( 2439 a, k = slotOffset((cap - 1) & (b = q.base))); 2440 if (t != null && phase == inactivePhase) // reactivate 2441 w.phase = phase = activePhase; 2442 if (q.base == b && U.getReference(a, k) == t) { 2443 int nb = b + 1; 2444 if (t == null) { 2445 if (!rescan) { 2446 int qp = q.phase, mq = qp & (IDLE | 1); 2447 phaseSum += qp; 2448 if (mq == 0 || q.top - b > 0) 2449 rescan = true; 2450 else if (mq == 1) 2451 busy = true; 2452 } 2453 break; 2454 } 2455 if (U.compareAndSetReference(a, k, t, null)) { 2456 q.base = nb; 2457 w.source = j; // volatile write 2458 t.doExec(); 2459 w.source = wsrc; 2460 rescan = locals = true; 2461 break scan; 2462 } 2463 } 2464 } 2465 } 2466 } 2467 if (e != (e = runState) || prevSum != (prevSum = phaseSum) || 2468 rescan || (e & RS_LOCK) != 0L) 2469 ; // inconsistent 2470 else if (!busy) 2471 break; 2472 else if (phase == activePhase) { 2473 waits = 0; // recheck, then sleep 2474 w.phase = phase = inactivePhase; 2475 } 2476 else if (System.nanoTime() - startTime > nanos) { 2477 returnStatus = 0; // timed out 2478 break; 2479 } 2480 else if (waits == 0) // same as spinLockRunState except 2481 waits = MIN_SLEEP; // with rescan instead of onSpinWait 2482 else { 2483 LockSupport.parkNanos(this, (long)waits); 2484 if (waits < maxSleep) 2485 waits <<= 1; 2486 } 2487 } 2488 w.phase = activePhase; 2489 return returnStatus; 2490 } 2491 2492 /** 2493 * Helps quiesce from external caller until done, interrupted, or timeout 2494 * 2495 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) 2496 * @param interruptible true if return on interrupt 2497 * @return positive if quiescent, negative if interrupted, else 0 2498 */ 2499 private int externalHelpQuiesce(long nanos, boolean interruptible) { 2500 if (quiescent() < 0) { 2501 long startTime = System.nanoTime(); 2502 long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); 2503 for (int waits = 0;;) { 2504 ForkJoinTask<?> t; 2505 if (interruptible && Thread.interrupted()) 2506 return -1; 2507 else if ((t = pollScan(false)) != null) { 2508 waits = 0; 2509 t.doExec(); 2510 } 2511 else if (quiescent() >= 0) 2512 break; 2513 else if (System.nanoTime() - startTime > nanos) 2514 return 0; 2515 else if (waits == 0) 2516 waits = MIN_SLEEP; 2517 else { 2518 LockSupport.parkNanos(this, (long)waits); 2519 if (waits < maxSleep) 2520 waits <<= 1; 2521 } 2522 } 2523 } 2524 return 1; 2525 } 2526 2527 /** 2528 * Helps quiesce from either internal or external caller 2529 * 2530 * @param pool the pool to use, or null if any 2531 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) 2532 * @param interruptible true if return on interrupt 2533 * @return positive if quiescent, negative if interrupted, else 0 2534 */ 2535 static final int helpQuiescePool(ForkJoinPool pool, long nanos, 2536 boolean interruptible) { 2537 Thread t; ForkJoinPool p; ForkJoinWorkerThread wt; 2538 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && 2539 (p = (wt = (ForkJoinWorkerThread)t).pool) != null && 2540 (p == pool || pool == null)) 2541 return p.helpQuiesce(wt.workQueue, nanos, interruptible); 2542 else if ((p = 
pool) != null || (p = common) != null)
2543 return p.externalHelpQuiesce(nanos, interruptible);
2544 else
2545 return 0;
2546 }
2547
2548 /**
2549 * Gets and removes a local or stolen task for the given worker.
2550 *
2551 * @return a task, if available
2552 */
2553 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2554 ForkJoinTask<?> t;
2555 if (w == null || (t = w.nextLocalTask()) == null)
2556 t = pollScan(false);
2557 return t;
2558 }
2559
2560 // External operations
2561
2562 /**
2563 * Finds and locks a WorkQueue for an external submitter, or
2564 * throws RejectedExecutionException if shutdown or terminating.
2565 * @param r current ThreadLocalRandom.getProbe() value
2566 * @param rejectOnShutdown true if RejectedExecutionException
2567 * should be thrown when shutdown (else only if terminating)
2568 */
2569 private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
2570 int reuse; // nonzero if prefer reusing existing queue
2571 if ((reuse = r) == 0) {
2572 ThreadLocalRandom.localInit(); // initialize caller's probe
2573 r = ThreadLocalRandom.getProbe();
2574 }
2575 for (int probes = 0; ; ++probes) {
2576 int n, i, id; WorkQueue[] qs; WorkQueue q;
2577 if ((qs = queues) == null)
2578 break;
2579 if ((n = qs.length) <= 0)
2580 break;
2581 if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2582 WorkQueue w = new WorkQueue(null, id, 0, false);
2583 w.phase = id;
2584 boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
2585 rejectOnShutdown);
2586 if (!reject && queues == qs && qs[i] == null)
2587 q = qs[i] = w; // else lost race to install
2588 unlockRunState();
2589 if (q != null)
2590 return q;
2591 if (reject)
2592 break;
2593 reuse = 0;
2594 }
2595 if (reuse == 0 || !q.tryLockPhase()) { // move index
2596 if (reuse == 0) {
2597 if (probes >= n >> 1)
2598 reuse = r; // stop preferring free slot
2599 }
2600 else if (q != null)
2601 reuse = 0; // probe on collision
2602 r = ThreadLocalRandom.advanceProbe(r);
2603 }
2604 else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2605 q.unlockPhase(); // check while q lock held
2606 break;
2607 }
2608 else
2609 return q;
2610 }
2611 throw new RejectedExecutionException();
2612 }
2613
2614 private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2615 Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2616 if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2617 (wt = (ForkJoinWorkerThread)t).pool == this) {
2618 internal = true;
2619 q = wt.workQueue;
2620 }
2621 else { // find and lock queue
2622 internal = false;
2623 q = submissionQueue(ThreadLocalRandom.getProbe(), true);
2624 }
2625 q.push(task, signalIfEmpty ? this : null, internal);
2626 return task;
2627 }
2628
2629 /**
2630 * Returns queue for an external submission, bypassing call to
2631 * submissionQueue if already established and unlocked.
2632 */
2633 final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2634 WorkQueue[] qs; WorkQueue q; int n;
2635 int r = ThreadLocalRandom.getProbe();
2636 return (((qs = queues) != null && (n = qs.length) > 0 &&
2637 (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
2638 q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
2639 }
2640
2641 /**
2642 * Returns queue for an external thread, if one exists that has
2643 * possibly ever submitted to the given pool (nonzero probe), or
2644 * null if none.
2645 */ 2646 static WorkQueue externalQueue(ForkJoinPool p) { 2647 WorkQueue[] qs; int n; 2648 int r = ThreadLocalRandom.getProbe(); 2649 return (p != null && (qs = p.queues) != null && 2650 (n = qs.length) > 0 && r != 0) ? 2651 qs[r & EXTERNAL_ID_MASK & (n - 1)] : null; 2652 } 2653 2654 /** 2655 * Returns external queue for common pool. 2656 */ 2657 static WorkQueue commonQueue() { 2658 return externalQueue(common); 2659 } 2660 2661 /** 2662 * If the given executor is a ForkJoinPool, poll and execute 2663 * AsynchronousCompletionTasks from worker's queue until none are 2664 * available or blocker is released. 2665 */ 2666 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { 2667 WorkQueue w = null; Thread t; ForkJoinWorkerThread wt; 2668 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 2669 (wt = (ForkJoinWorkerThread)t).pool == e) 2670 w = wt.workQueue; 2671 else if (e instanceof ForkJoinPool) 2672 w = externalQueue((ForkJoinPool)e); 2673 if (w != null) 2674 w.helpAsyncBlocker(blocker); 2675 } 2676 2677 /** 2678 * Returns a cheap heuristic guide for task partitioning when 2679 * programmers, frameworks, tools, or languages have little or no 2680 * idea about task granularity. In essence, by offering this 2681 * method, we ask users only about tradeoffs in overhead vs 2682 * expected throughput and its variance, rather than how finely to 2683 * partition tasks. 2684 * 2685 * In a steady state strict (tree-structured) computation, each 2686 * thread makes available for stealing enough tasks for other 2687 * threads to remain active. Inductively, if all threads play by 2688 * the same rules, each thread should make available only a 2689 * constant number of tasks. 2690 * 2691 * The minimum useful constant is just 1. But using a value of 1 2692 * would require immediate replenishment upon each steal to 2693 * maintain enough tasks, which is infeasible. Further, 2694 * partitionings/granularities of offered tasks should minimize 2695 * steal rates, which in general means that threads nearer the top 2696 * of computation tree should generate more than those nearer the 2697 * bottom. In perfect steady state, each thread is at 2698 * approximately the same level of computation tree. However, 2699 * producing extra tasks amortizes the uncertainty of progress and 2700 * diffusion assumptions. 2701 * 2702 * So, users will want to use values larger (but not much larger) 2703 * than 1 to both smooth over transient shortages and hedge 2704 * against uneven progress; as traded off against the cost of 2705 * extra task overhead. We leave the user to pick a threshold 2706 * value to compare with the results of this call to guide 2707 * decisions, but recommend values such as 3. 2708 * 2709 * When all threads are active, it is on average OK to estimate 2710 * surplus strictly locally. In steady-state, if one thread is 2711 * maintaining say 2 surplus tasks, then so are others. So we can 2712 * just use estimated queue length. However, this strategy alone 2713 * leads to serious mis-estimates in some non-steady-state 2714 * conditions (ramp-up, ramp-down, other stalls). We can detect 2715 * many of these by further considering the number of "idle" 2716 * threads, that are known to have zero queued tasks, so 2717 * compensate by a factor of (#idle/#active) threads. 
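 *
 * A usage sketch (hypothetical user code calling the public
 * forwarding method ForkJoinTask.getSurplusQueuedTaskCount; the
 * names problemIsSmall, splitOff, solveSequentially, and THRESHOLD
 * are illustrative, not part of this class):
 *
 *   if (problemIsSmall())
 *       solveSequentially();
 *   else if (ForkJoinTask.getSurplusQueuedTaskCount() <= THRESHOLD)
 *       splitOff().fork();       // keep stealable work available
 *   else
 *       solveSequentially();     // enough surplus already queued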
2718 */
2719 static int getSurplusQueuedTaskCount() {
2720 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2721 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2722 (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2723 (q = wt.workQueue) != null) {
2724 int n = q.top - q.base;
2725 int p = pool.parallelism;
2726 int a = (short)(pool.ctl >>> RC_SHIFT);
2727 return n - (a > (p >>>= 1) ? 0 :
2728 a > (p >>>= 1) ? 1 :
2729 a > (p >>>= 1) ? 2 :
2730 a > (p >>>= 1) ? 4 :
2731 8);
2732 }
2733 return 0;
2734 }
2735
2736 // Termination
2737
2738 /**
2739 * Possibly initiates and/or completes pool termination.
2740 *
2741 * @param now if true, unconditionally terminate, else only
2742 * if no work and no active workers
2743 * @param enable if true, terminate when next possible
2744 * @return runState on exit
2745 */
2746 private long tryTerminate(boolean now, boolean enable) {
2747 long e, isShutdown, ps;
2748 if (((e = runState) & TERMINATED) != 0L)
2749 now = false;
2750 else if ((e & STOP) != 0L)
2751 now = true;
2752 else if (now) {
2753 if (((ps = getAndBitwiseOrRunState(SHUTDOWN|STOP)) & STOP) == 0L) {
2754 if ((ps & RS_LOCK) != 0L) {
2755 spinLockRunState(); // ensure queues array stable after stop
2756 unlockRunState();
2757 }
2758 interruptAll();
2759 }
2760 }
2761 else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) {
2762 long quiet; DelayScheduler ds;
2763 if (isShutdown == 0L)
2764 getAndBitwiseOrRunState(SHUTDOWN);
2765 if ((quiet = quiescent()) > 0)
2766 now = true;
2767 else if (quiet == 0 && (ds = delayScheduler) != null)
2768 ds.signal();
2769 }
2770
2771 if (now) {
2772 DelayScheduler ds;
2773 releaseWaiters();
2774 if ((ds = delayScheduler) != null)
2775 ds.signal();
2776 for (;;) {
2777 if (((e = runState) & CLEANED) == 0L) {
2778 boolean clean = cleanQueues();
2779 if (((e = runState) & CLEANED) == 0L && clean)
2780 e = getAndBitwiseOrRunState(CLEANED) | CLEANED;
2781 }
2782 if ((e & TERMINATED) != 0L)
2783 break;
2784 if (ctl != 0L) // else loop if didn't finish cleaning
2785 break;
2786 if ((ds = delayScheduler) != null && ds.signal() >= 0)
2787 break;
2788 if ((e & CLEANED) != 0L) {
2789 e |= TERMINATED;
2790 if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) {
2791 CountDownLatch done; SharedThreadContainer ctr;
2792 if ((done = termination) != null)
2793 done.countDown();
2794 if ((ctr = container) != null)
2795 ctr.close();
2796 }
2797 break;
2798 }
2799 }
2800 }
2801 return e;
2802 }
2803
2804 /**
2805 * Scans queues in a pseudorandom order based on thread id,
2806 * cancelling tasks until empty, or returning early upon
2807 * interference or still-active external queues, in which case
2808 * other calls will finish cancellation.
2809 *
2810 * @return true if all queues empty
2811 */
2812 private boolean cleanQueues() {
2813 int r = (int)Thread.currentThread().threadId();
2814 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
2815 int step = (r >>> 16) | 1; // randomize traversals
2816 WorkQueue[] qs = queues;
2817 int n = (qs == null) ?
0 : qs.length; 2818 for (int l = n; l > 0; --l, r += step) { 2819 WorkQueue q; ForkJoinTask<?>[] a; int cap; 2820 if ((q = qs[r & (n - 1)]) != null && 2821 (a = q.array) != null && (cap = a.length) > 0) { 2822 for (;;) { 2823 ForkJoinTask<?> t; int b; long k; 2824 t = (ForkJoinTask<?>)U.getReferenceAcquire( 2825 a, k = slotOffset((cap - 1) & (b = q.base))); 2826 if (q.base == b && t != null && 2827 U.compareAndSetReference(a, k, t, null)) { 2828 q.updateBase(b + 1); 2829 try { 2830 t.cancel(false); 2831 } catch (Throwable ignore) { 2832 } 2833 } 2834 else if ((q.phase & (IDLE|1)) == 0 || // externally locked 2835 q.top - q.base > 0) 2836 return false; // incomplete 2837 else 2838 break; 2839 } 2840 } 2841 } 2842 return true; 2843 } 2844 2845 /** 2846 * Interrupts all workers 2847 */ 2848 private void interruptAll() { 2849 Thread current = Thread.currentThread(); 2850 WorkQueue[] qs = queues; 2851 int n = (qs == null) ? 0 : qs.length; 2852 for (int i = 1; i < n; i += 2) { 2853 WorkQueue q; Thread o; 2854 if ((q = qs[i]) != null && (o = q.owner) != null && o != current) { 2855 try { 2856 o.interrupt(); 2857 } catch (Throwable ignore) { 2858 } 2859 } 2860 } 2861 } 2862 2863 /** 2864 * Returns termination signal, constructing if necessary 2865 */ 2866 private CountDownLatch terminationSignal() { 2867 CountDownLatch signal, s, u; 2868 if ((signal = termination) == null) 2869 signal = ((u = cmpExTerminationSignal( 2870 s = new CountDownLatch(1))) == null) ? s : u; 2871 return signal; 2872 } 2873 2874 // Exported methods 2875 2876 // Constructors 2877 2878 /** 2879 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 2880 * java.lang.Runtime#availableProcessors}, using defaults for all 2881 * other parameters (see {@link #ForkJoinPool(int, 2882 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2883 * int, int, int, Predicate, long, TimeUnit)}). 2884 */ 2885 public ForkJoinPool() { 2886 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), 2887 defaultForkJoinWorkerThreadFactory, null, false, 2888 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2889 } 2890 2891 /** 2892 * Creates a {@code ForkJoinPool} with the indicated parallelism 2893 * level, using defaults for all other parameters (see {@link 2894 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory, 2895 * UncaughtExceptionHandler, boolean, int, int, int, Predicate, 2896 * long, TimeUnit)}). 2897 * 2898 * @param parallelism the parallelism level 2899 * @throws IllegalArgumentException if parallelism less than or 2900 * equal to zero, or greater than implementation limit 2901 */ 2902 public ForkJoinPool(int parallelism) { 2903 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false, 2904 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2905 } 2906 2907 /** 2908 * Creates a {@code ForkJoinPool} with the given parameters (using 2909 * defaults for others -- see {@link #ForkJoinPool(int, 2910 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2911 * int, int, int, Predicate, long, TimeUnit)}). 2912 * 2913 * @param parallelism the parallelism level. For default value, 2914 * use {@link java.lang.Runtime#availableProcessors}. 2915 * @param factory the factory for creating new threads. For default value, 2916 * use {@link #defaultForkJoinWorkerThreadFactory}. 2917 * @param handler the handler for internal worker threads that 2918 * terminate due to unrecoverable errors encountered while executing 2919 * tasks. For default value, use {@code null}. 
2920 * @param asyncMode if true, 2921 * establishes local first-in-first-out scheduling mode for forked 2922 * tasks that are never joined. This mode may be more appropriate 2923 * than default locally stack-based mode in applications in which 2924 * worker threads only process event-style asynchronous tasks. 2925 * For default value, use {@code false}. 2926 * @throws IllegalArgumentException if parallelism less than or 2927 * equal to zero, or greater than implementation limit 2928 * @throws NullPointerException if the factory is null 2929 */ 2930 public ForkJoinPool(int parallelism, 2931 ForkJoinWorkerThreadFactory factory, 2932 UncaughtExceptionHandler handler, 2933 boolean asyncMode) { 2934 this(parallelism, factory, handler, asyncMode, 2935 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2936 } 2937 2938 /** 2939 * Creates a {@code ForkJoinPool} with the given parameters. 2940 * 2941 * @param parallelism the parallelism level. For default value, 2942 * use {@link java.lang.Runtime#availableProcessors}. 2943 * 2944 * @param factory the factory for creating new threads. For 2945 * default value, use {@link #defaultForkJoinWorkerThreadFactory}. 2946 * 2947 * @param handler the handler for internal worker threads that 2948 * terminate due to unrecoverable errors encountered while 2949 * executing tasks. For default value, use {@code null}. 2950 * 2951 * @param asyncMode if true, establishes local first-in-first-out 2952 * scheduling mode for forked tasks that are never joined. This 2953 * mode may be more appropriate than default locally stack-based 2954 * mode in applications in which worker threads only process 2955 * event-style asynchronous tasks. For default value, use {@code 2956 * false}. 2957 * 2958 * @param corePoolSize ignored: used in previous releases of this 2959 * class but no longer applicable. Using {@code 0} maintains 2960 * compatibility across releases. 2961 * 2962 * @param maximumPoolSize the maximum number of threads allowed. 2963 * When the maximum is reached, attempts to replace blocked 2964 * threads fail. (However, because creation and termination of 2965 * different threads may overlap, and may be managed by the given 2966 * thread factory, this value may be transiently exceeded.) To 2967 * arrange the same value as is used by default for the common 2968 * pool, use {@code 256} plus the {@code parallelism} level. (By 2969 * default, the common pool allows a maximum of 256 spare 2970 * threads.) Using a value (for example {@code 2971 * Integer.MAX_VALUE}) larger than the implementation's total 2972 * thread limit has the same effect as using this limit (which is 2973 * the default). 2974 * 2975 * @param minimumRunnable the minimum allowed number of core 2976 * threads not blocked by a join or {@link ManagedBlocker}. To 2977 * ensure progress, when too few unblocked threads exist and 2978 * unexecuted tasks may exist, new threads are constructed, up to 2979 * the given maximumPoolSize. For the default value, use {@code 2980 * 1}, that ensures liveness. A larger value might improve 2981 * throughput in the presence of blocked activities, but might 2982 * not, due to increased overhead. A value of zero may be 2983 * acceptable when submitted tasks cannot have dependencies 2984 * requiring additional threads. 2985 * 2986 * @param saturate if non-null, a predicate invoked upon attempts 2987 * to create more than the maximum total allowed threads. 
By
2988      * default, when a thread is about to block on a join or {@link
2989      * ManagedBlocker}, but cannot be replaced because the
2990      * maximumPoolSize would be exceeded, a {@link
2991      * RejectedExecutionException} is thrown. But if this predicate
2992      * returns {@code true}, then no exception is thrown, so the pool
2993      * continues to operate with fewer than the target number of
2994      * runnable threads, which might not ensure progress.
2995      *
2996      * @param keepAliveTime the elapsed time since last use before
2997      * a thread is terminated (and then later replaced if needed).
2998      * For the default value, use {@code 60, TimeUnit.SECONDS}.
2999      *
3000      * @param unit the time unit for the {@code keepAliveTime} argument
3001      *
3002      * @throws IllegalArgumentException if parallelism is less than or
3003      * equal to zero, or is greater than implementation limit,
3004      * or if maximumPoolSize is less than parallelism,
3005      * or if the keepAliveTime is less than or equal to zero.
3006      * @throws NullPointerException if the factory is null
3007      * @since 9
3008      */
3009     public ForkJoinPool(int parallelism,
3010                         ForkJoinWorkerThreadFactory factory,
3011                         UncaughtExceptionHandler handler,
3012                         boolean asyncMode,
3013                         int corePoolSize,
3014                         int maximumPoolSize,
3015                         int minimumRunnable,
3016                         Predicate<? super ForkJoinPool> saturate,
3017                         long keepAliveTime,
3018                         TimeUnit unit) {
3019         int p = parallelism;
3020         if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
3021             throw new IllegalArgumentException();
3022         if (factory == null || unit == null)
3023             throw new NullPointerException();
3024         int size = Math.max(MIN_QUEUES_SIZE,
3025                             1 << (33 - Integer.numberOfLeadingZeros(p - 1)));
3026         this.parallelism = p;
3027         this.factory = factory;
3028         this.ueh = handler;
3029         this.saturate = saturate;
3030         this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
3031         int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP);
3032         int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP);
3033         this.config = (((asyncMode ?
FIFO : 0) & LMASK) | 3034 (((long)maxSpares) << TC_SHIFT) | 3035 (((long)minAvail) << RC_SHIFT)); 3036 this.queues = new WorkQueue[size]; 3037 String pid = Integer.toString(getAndAddPoolIds(1) + 1); 3038 String name = "ForkJoinPool-" + pid; 3039 this.poolName = name; 3040 this.workerNamePrefix = name + "-worker-"; 3041 this.container = SharedThreadContainer.create(name); 3042 } 3043 3044 /** 3045 * Constructor for common pool using parameters possibly 3046 * overridden by system properties 3047 */ 3048 private ForkJoinPool(byte forCommonPoolOnly) { 3049 String name = "ForkJoinPool.commonPool"; 3050 ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory; 3051 UncaughtExceptionHandler handler = null; 3052 int maxSpares = DEFAULT_COMMON_MAX_SPARES; 3053 int pc = 0, preset = 0; // nonzero if size set as property 3054 try { // ignore exceptions in accessing/parsing properties 3055 String pp = System.getProperty 3056 ("java.util.concurrent.ForkJoinPool.common.parallelism"); 3057 if (pp != null) { 3058 pc = Math.max(0, Integer.parseInt(pp)); 3059 preset = PRESET_SIZE; 3060 } 3061 String ms = System.getProperty 3062 ("java.util.concurrent.ForkJoinPool.common.maximumSpares"); 3063 if (ms != null) 3064 maxSpares = Math.clamp(Integer.parseInt(ms), 0, MAX_CAP); 3065 String sf = System.getProperty 3066 ("java.util.concurrent.ForkJoinPool.common.threadFactory"); 3067 String sh = System.getProperty 3068 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 3069 if (sf != null || sh != null) { 3070 ClassLoader ldr = ClassLoader.getSystemClassLoader(); 3071 if (sf != null) 3072 fac = (ForkJoinWorkerThreadFactory) 3073 ldr.loadClass(sf).getConstructor().newInstance(); 3074 if (sh != null) 3075 handler = (UncaughtExceptionHandler) 3076 ldr.loadClass(sh).getConstructor().newInstance(); 3077 } 3078 } catch (Exception ignore) { 3079 } 3080 if (preset == 0) 3081 pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1); 3082 int p = Math.min(pc, MAX_CAP); 3083 int size = Math.max(MIN_QUEUES_SIZE, 3084 (p == 0) ? 1 : 3085 1 << (33 - Integer.numberOfLeadingZeros(p-1))); 3086 this.parallelism = p; 3087 this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) | 3088 (1L << RC_SHIFT)); 3089 this.factory = fac; 3090 this.ueh = handler; 3091 this.keepAlive = DEFAULT_KEEPALIVE; 3092 this.saturate = null; 3093 this.workerNamePrefix = null; 3094 this.poolName = name; 3095 this.queues = new WorkQueue[size]; 3096 this.container = SharedThreadContainer.create(name); 3097 } 3098 3099 /** 3100 * Returns the common pool instance. This pool is statically 3101 * constructed; its run state is unaffected by attempts to {@link 3102 * #shutdown} or {@link #shutdownNow}. However this pool and any 3103 * ongoing processing are automatically terminated upon program 3104 * {@link System#exit}. Any program that relies on asynchronous 3105 * task processing to complete before program termination should 3106 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 3107 * before exit. 
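     *
     * <p>A minimal sketch of such a pre-exit drain (the one-minute
     * bound is illustrative only):
     * <pre> {@code
     * // wait up to a chosen timeout for pending async work to finish
     * ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES);}</pre>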
3108 * 3109 * @return the common pool instance 3110 * @since 1.8 3111 */ 3112 public static ForkJoinPool commonPool() { 3113 // assert common != null : "static init error"; 3114 return common; 3115 } 3116 3117 /** 3118 * Package-private access to commonPool overriding zero parallelism 3119 */ 3120 static ForkJoinPool asyncCommonPool() { 3121 ForkJoinPool cp; int p; 3122 if ((p = (cp = common).parallelism) == 0) 3123 U.compareAndSetInt(cp, PARALLELISM, 0, 2); 3124 return cp; 3125 } 3126 3127 // Execution methods 3128 3129 /** 3130 * Performs the given task, returning its result upon completion. 3131 * If the computation encounters an unchecked Exception or Error, 3132 * it is rethrown as the outcome of this invocation. Rethrown 3133 * exceptions behave in the same way as regular exceptions, but, 3134 * when possible, contain stack traces (as displayed for example 3135 * using {@code ex.printStackTrace()}) of both the current thread 3136 * as well as the thread actually encountering the exception; 3137 * minimally only the latter. 3138 * 3139 * @param task the task 3140 * @param <T> the type of the task's result 3141 * @return the task's result 3142 * @throws NullPointerException if the task is null 3143 * @throws RejectedExecutionException if the task cannot be 3144 * scheduled for execution 3145 */ 3146 public <T> T invoke(ForkJoinTask<T> task) { 3147 poolSubmit(true, Objects.requireNonNull(task)); 3148 try { 3149 return task.join(); 3150 } catch (RuntimeException | Error unchecked) { 3151 throw unchecked; 3152 } catch (Exception checked) { 3153 throw new RuntimeException(checked); 3154 } 3155 } 3156 3157 /** 3158 * Arranges for (asynchronous) execution of the given task. 3159 * 3160 * @param task the task 3161 * @throws NullPointerException if the task is null 3162 * @throws RejectedExecutionException if the task cannot be 3163 * scheduled for execution 3164 */ 3165 public void execute(ForkJoinTask<?> task) { 3166 poolSubmit(true, Objects.requireNonNull(task)); 3167 } 3168 3169 // AbstractExecutorService methods 3170 3171 /** 3172 * @throws NullPointerException if the task is null 3173 * @throws RejectedExecutionException if the task cannot be 3174 * scheduled for execution 3175 */ 3176 @Override 3177 @SuppressWarnings("unchecked") 3178 public void execute(Runnable task) { 3179 poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask<?>) 3180 ? (ForkJoinTask<Void>) task // avoid re-wrap 3181 : new ForkJoinTask.RunnableExecuteAction(task)); 3182 } 3183 3184 /** 3185 * Submits a ForkJoinTask for execution. 3186 * 3187 * @implSpec 3188 * This method is equivalent to {@link #externalSubmit(ForkJoinTask)} 3189 * when called from a thread that is not in this pool. 3190 * 3191 * @param task the task to submit 3192 * @param <T> the type of the task's result 3193 * @return the task 3194 * @throws NullPointerException if the task is null 3195 * @throws RejectedExecutionException if the task cannot be 3196 * scheduled for execution 3197 */ 3198 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 3199 return poolSubmit(true, Objects.requireNonNull(task)); 3200 } 3201 3202 /** 3203 * @throws NullPointerException if the task is null 3204 * @throws RejectedExecutionException if the task cannot be 3205 * scheduled for execution 3206 */ 3207 @Override 3208 public <T> ForkJoinTask<T> submit(Callable<T> task) { 3209 Objects.requireNonNull(task); 3210 return poolSubmit( 3211 true, 3212 (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 
3213 new ForkJoinTask.AdaptedCallable<T>(task) : 3214 new ForkJoinTask.AdaptedInterruptibleCallable<T>(task)); 3215 } 3216 3217 /** 3218 * @throws NullPointerException if the task is null 3219 * @throws RejectedExecutionException if the task cannot be 3220 * scheduled for execution 3221 */ 3222 @Override 3223 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 3224 Objects.requireNonNull(task); 3225 return poolSubmit( 3226 true, 3227 (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 3228 new ForkJoinTask.AdaptedRunnable<T>(task, result) : 3229 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result)); 3230 } 3231 3232 /** 3233 * @throws NullPointerException if the task is null 3234 * @throws RejectedExecutionException if the task cannot be 3235 * scheduled for execution 3236 */ 3237 @Override 3238 @SuppressWarnings("unchecked") 3239 public ForkJoinTask<?> submit(Runnable task) { 3240 Objects.requireNonNull(task); 3241 return poolSubmit( 3242 true, 3243 (task instanceof ForkJoinTask<?>) ? 3244 (ForkJoinTask<Void>) task : // avoid re-wrap 3245 ((Thread.currentThread() instanceof ForkJoinWorkerThread) ? 3246 new ForkJoinTask.AdaptedRunnable<Void>(task, null) : 3247 new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null))); 3248 } 3249 3250 /** 3251 * Submits the given task as if submitted from a non-{@code ForkJoinTask} 3252 * client. The task is added to a scheduling queue for submissions to the 3253 * pool even when called from a thread in the pool. 3254 * 3255 * @implSpec 3256 * This method is equivalent to {@link #submit(ForkJoinTask)} when called 3257 * from a thread that is not in this pool. 3258 * 3259 * @return the task 3260 * @param task the task to submit 3261 * @param <T> the type of the task's result 3262 * @throws NullPointerException if the task is null 3263 * @throws RejectedExecutionException if the task cannot be 3264 * scheduled for execution 3265 * @since 20 3266 */ 3267 public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { 3268 Objects.requireNonNull(task); 3269 externalSubmissionQueue(true).push(task, this, false); 3270 return task; 3271 } 3272 3273 /** 3274 * Submits the given task without guaranteeing that it will 3275 * eventually execute in the absence of available active threads. 3276 * In some contexts, this method may reduce contention and 3277 * overhead by relying on context-specific knowledge that existing 3278 * threads (possibly including the calling thread if operating in 3279 * this pool) will eventually be available to execute the task. 3280 * 3281 * @param task the task 3282 * @param <T> the type of the task's result 3283 * @return the task 3284 * @throws NullPointerException if the task is null 3285 * @throws RejectedExecutionException if the task cannot be 3286 * scheduled for execution 3287 * @since 19 3288 */ 3289 public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) { 3290 return poolSubmit(false, Objects.requireNonNull(task)); 3291 } 3292 3293 /** 3294 * Changes the target parallelism of this pool, controlling the 3295 * future creation, use, and termination of worker threads. 3296 * Applications include contexts in which the number of available 3297 * processors changes over time. 3298 * 3299 * @implNote This implementation restricts the maximum number of 3300 * running threads to 32767 3301 * 3302 * @param size the target parallelism level 3303 * @return the previous parallelism level. 
3304      * @throws IllegalArgumentException if size is less than 1 or
3305      *         greater than the maximum supported by this pool.
3306      * @throws UnsupportedOperationException if this is the {@link
3307      *         #commonPool()} and parallelism level was set by System
3308      *         property {@systemProperty
3309      *         java.util.concurrent.ForkJoinPool.common.parallelism}.
3310      * @since 19
3311      */
3312     public int setParallelism(int size) {
3313         if (size < 1 || size > MAX_CAP)
3314             throw new IllegalArgumentException();
3315         if ((config & PRESET_SIZE) != 0)
3316             throw new UnsupportedOperationException("Cannot override System property");
3317         return getAndSetParallelism(size);
3318     }
3319
3320     /**
3321      * Uninterruptible version of {@code invokeAll}. Executes the given
3322      * tasks, returning a list of Futures holding their status and
3323      * results when all complete, ignoring interrupts. {@link
3324      * Future#isDone} is {@code true} for each element of the returned
3325      * list. Note that a <em>completed</em> task could have
3326      * terminated either normally or by throwing an exception. The
3327      * results of this method are undefined if the given collection is
3328      * modified while this operation is in progress.
3329      *
3330      * @apiNote This method supports usages that previously relied on an
3331      * incompatible override of
3332      * {@link ExecutorService#invokeAll(java.util.Collection)}.
3333      *
3334      * @param tasks the collection of tasks
3335      * @param <T> the type of the values returned from the tasks
3336      * @return a list of Futures representing the tasks, in the same
3337      *         sequential order as produced by the iterator for the
3338      *         given task list, each of which has completed
3339      * @throws NullPointerException if tasks or any of its elements are {@code null}
3340      * @throws RejectedExecutionException if any task cannot be
3341      *         scheduled for execution
3342      * @since 22
3343      */
3344     public <T> List<Future<T>> invokeAllUninterruptibly(Collection<? extends Callable<T>> tasks) {
3345         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
3346         try {
3347             for (Callable<T> t : tasks) {
3348                 ForkJoinTask<T> f = ForkJoinTask.adapt(t);
3349                 futures.add(f);
3350                 poolSubmit(true, f);
3351             }
3352             for (int i = futures.size() - 1; i >= 0; --i)
3353                 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
3354             return futures;
3355         } catch (Throwable t) {
3356             for (Future<T> e : futures)
3357                 e.cancel(true);
3358             throw t;
3359         }
3360     }
3361
3362     /**
3363      * Common support for timed and untimed invokeAll
3364      */
3365     private <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
3366                                           long deadline)
3367         throws InterruptedException {
3368         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
3369         try {
3370             for (Callable<T> t : tasks) {
3371                 ForkJoinTask<T> f = ForkJoinTask.adaptInterruptible(t);
3372                 futures.add(f);
3373                 poolSubmit(true, f);
3374             }
3375             for (int i = futures.size() - 1; i >= 0; --i)
3376                 ((ForkJoinTask<?>)futures.get(i))
3377                     .quietlyJoinPoolInvokeAllTask(deadline);
3378             return futures;
3379         } catch (Throwable t) {
3380             for (Future<T> e : futures)
3381                 e.cancel(true);
3382             throw t;
3383         }
3384     }
3385
3386     @Override
3387     public <T> List<Future<T>> invokeAll(Collection<?
extends Callable<T>> tasks) 3388 throws InterruptedException { 3389 return invokeAll(tasks, 0L); 3390 } 3391 // for jdk version < 22, replace with 3392 // /** 3393 // * @throws NullPointerException {@inheritDoc} 3394 // * @throws RejectedExecutionException {@inheritDoc} 3395 // */ 3396 // @Override 3397 // public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 3398 // return invokeAllUninterruptibly(tasks); 3399 // } 3400 3401 @Override 3402 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 3403 long timeout, TimeUnit unit) 3404 throws InterruptedException { 3405 return invokeAll(tasks, (System.nanoTime() + unit.toNanos(timeout)) | 1L); 3406 } 3407 3408 @Override 3409 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 3410 throws InterruptedException, ExecutionException { 3411 try { 3412 return new ForkJoinTask.InvokeAnyRoot<T>() 3413 .invokeAny(tasks, this, false, 0L); 3414 } catch (TimeoutException cannotHappen) { 3415 assert false; 3416 return null; 3417 } 3418 } 3419 3420 @Override 3421 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 3422 long timeout, TimeUnit unit) 3423 throws InterruptedException, ExecutionException, TimeoutException { 3424 return new ForkJoinTask.InvokeAnyRoot<T>() 3425 .invokeAny(tasks, this, true, unit.toNanos(timeout)); 3426 } 3427 3428 // Support for delayed tasks 3429 3430 /** 3431 * Returns STOP and SHUTDOWN status (zero if neither), masking or 3432 * truncating out other bits. 3433 */ 3434 final int shutdownStatus(DelayScheduler ds) { 3435 return (int)(runState & (SHUTDOWN | STOP)); 3436 } 3437 3438 /** 3439 * Tries to stop and possibly terminate if already enabled, return success. 3440 */ 3441 final boolean tryStopIfShutdown(DelayScheduler ds) { 3442 return (tryTerminate(false, false) & STOP) != 0L; 3443 } 3444 3445 /** 3446 * Creates and starts DelayScheduler 3447 */ 3448 private DelayScheduler startDelayScheduler() { 3449 DelayScheduler ds; 3450 if ((ds = delayScheduler) == null) { 3451 boolean start = false; 3452 String name = poolName + "-delayScheduler"; 3453 if (workerNamePrefix == null) 3454 asyncCommonPool(); // override common parallelism zero 3455 lockRunState(); 3456 try { 3457 if ((ds = delayScheduler) == null) { 3458 ds = delayScheduler = new DelayScheduler(this, name); 3459 start = true; 3460 } 3461 } finally { 3462 unlockRunState(); 3463 } 3464 if (start) { // start outside of lock 3465 // exceptions on start passed to (external) callers 3466 SharedThreadContainer ctr; 3467 if ((ctr = container) != null) 3468 ctr.start(ds); 3469 else 3470 ds.start(); 3471 } 3472 } 3473 return ds; 3474 } 3475 3476 /** 3477 * Arranges execution of a ScheduledForkJoinTask whose delay has 3478 * elapsed 3479 */ 3480 final void executeEnabledScheduledTask(ScheduledForkJoinTask<?> task) { 3481 externalSubmissionQueue(false).push(task, this, false); 3482 } 3483 3484 /** 3485 * Arranges delayed execution of a ScheduledForkJoinTask via the 3486 * DelayScheduler, creating and starting it if necessary. 3487 * @return the task 3488 */ 3489 final <T> ScheduledForkJoinTask<T> scheduleDelayedTask(ScheduledForkJoinTask<T> task) { 3490 DelayScheduler ds; 3491 if (((ds = delayScheduler) == null && 3492 (ds = startDelayScheduler()) == null) || 3493 (runState & SHUTDOWN) != 0L) 3494 throw new RejectedExecutionException(); 3495 ds.pend(task); 3496 return task; 3497 } 3498 3499 /** 3500 * Submits a one-shot task that becomes enabled after the given 3501 * delay. 
At that point it will execute unless explicitly
3502      * cancelled, or fail to execute (eventually reporting
3503      * cancellation) when encountering resource exhaustion, or the
3504      * pool is {@link #shutdownNow}, or is {@link #shutdown} when
3505      * otherwise quiescent and {@link #cancelDelayedTasksOnShutdown}
3506      * is in effect.
3507      *
3508      * @param command the task to execute
3509      * @param delay the time from now to delay execution
3510      * @param unit the time unit of the delay parameter
3511      * @return a ForkJoinTask implementing the ScheduledFuture
3512      *         interface, whose {@code get()} method will return
3513      *         {@code null} upon normal completion.
3514      * @throws RejectedExecutionException if the pool is shutdown or
3515      *         submission encounters resource exhaustion.
3516      * @throws NullPointerException if command or unit is null
3517      * @since 25
3518      */
3519     public ScheduledFuture<?> schedule(Runnable command,
3520                                        long delay, TimeUnit unit) {
3521         return scheduleDelayedTask(
3522             new ScheduledForkJoinTask<Void>(
3523                 unit.toNanos(delay), 0L, false, // implicit null check of unit
3524                 Objects.requireNonNull(command), null, this));
3525     }
3526
3527     /**
3528      * Submits a value-returning one-shot task that becomes enabled
3529      * after the given delay. At that point it will execute unless
3530      * explicitly cancelled, or fail to execute (eventually reporting
3531      * cancellation) when encountering resource exhaustion, or the
3532      * pool is {@link #shutdownNow}, or is {@link #shutdown} when
3533      * otherwise quiescent and {@link #cancelDelayedTasksOnShutdown}
3534      * is in effect.
3535      *
3536      * @param callable the function to execute
3537      * @param delay the time from now to delay execution
3538      * @param unit the time unit of the delay parameter
3539      * @param <V> the type of the callable's result
3540      * @return a ForkJoinTask implementing the ScheduledFuture
3541      *         interface, whose {@code get()} method will return the
3542      *         value from the callable upon normal completion.
3543      * @throws RejectedExecutionException if the pool is shutdown or
3544      *         submission encounters resource exhaustion.
3545      * @throws NullPointerException if callable or unit is null
3546      * @since 25
3547      */
3548     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
3549                                            long delay, TimeUnit unit) {
3550         return scheduleDelayedTask(
3551             new ScheduledForkJoinTask<V>(
3552                 unit.toNanos(delay), 0L, false, null, // implicit null check of unit
3553                 Objects.requireNonNull(callable), this));
3554     }
3555
3556     /**
3557      * Submits a periodic action that becomes enabled first after the
3558      * given initial delay, and subsequently with the given period;
3559      * that is, executions will commence after
3560      * {@code initialDelay}, then {@code initialDelay + period}, then
3561      * {@code initialDelay + 2 * period}, and so on.
3562      *
3563      * <p>The sequence of task executions continues indefinitely until
3564      * one of the following exceptional completions occurs:
3565      * <ul>
3566      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
3567      * <li>Method {@link #shutdownNow} is called
3568      * <li>Method {@link #shutdown} is called and the pool is
3569      * otherwise quiescent, in which case existing executions continue
3570      * but subsequent executions do not.
3571      * <li>An execution or the task encounters resource exhaustion.
3572      * <li>An execution of the task throws an exception. In this case
3573      * calling {@link Future#get() get} on the returned future will throw
3574      * {@link ExecutionException}, holding the exception as its cause.
3575      * </ul>
3576      * Subsequent executions are suppressed. Subsequent calls to
3577      * {@link Future#isDone isDone()} on the returned future will
3578      * return {@code true}.
3579      *
3580      * <p>If any execution of this task takes longer than its period, then
3581      * subsequent executions may start late, but will not concurrently
3582      * execute.
3583      * @param command the task to execute
3584      * @param initialDelay the time to delay first execution
3585      * @param period the period between successive executions
3586      * @param unit the time unit of the initialDelay and period parameters
3587      * @return a ForkJoinTask implementing the ScheduledFuture
3588      *         interface. The future's {@link Future#get() get()}
3589      *         method will never return normally, and will throw an
3590      *         exception upon task cancellation or abnormal
3591      *         termination of a task execution.
3592      * @throws RejectedExecutionException if the pool is shutdown or
3593      *         submission encounters resource exhaustion.
3594      * @throws NullPointerException if command or unit is null
3595      * @throws IllegalArgumentException if period less than or equal to zero
3596      * @since 25
3597      */
3598     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
3599                                                   long initialDelay,
3600                                                   long period, TimeUnit unit) {
3601         if (period <= 0L)
3602             throw new IllegalArgumentException();
3603         return scheduleDelayedTask(
3604             new ScheduledForkJoinTask<Void>(
3605                 unit.toNanos(initialDelay), // implicit null check of unit
3606                 unit.toNanos(period), false,
3607                 Objects.requireNonNull(command), null, this));
3608     }
3609
3610     /**
3611      * Submits a periodic action that becomes enabled first after the
3612      * given initial delay, and subsequently with the given delay
3613      * between the termination of one execution and the commencement of
3614      * the next.
3615      * <p>The sequence of task executions continues indefinitely until
3616      * one of the following exceptional completions occurs:
3617      * <ul>
3618      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
3619      * <li>Method {@link #shutdownNow} is called
3620      * <li>Method {@link #shutdown} is called and the pool is
3621      * otherwise quiescent, in which case existing executions continue
3622      * but subsequent executions do not.
3623      * <li>An execution or the task encounters resource exhaustion.
3624      * <li>An execution of the task throws an exception. In this case
3625      * calling {@link Future#get() get} on the returned future will throw
3626      * {@link ExecutionException}, holding the exception as its cause.
3627      * </ul>
3628      * Subsequent executions are suppressed. Subsequent calls to
3629      * {@link Future#isDone isDone()} on the returned future will
3630      * return {@code true}.
3631      * @param command the task to execute
3632      * @param initialDelay the time to delay first execution
3633      * @param delay the delay between the termination of one
3634      *        execution and the commencement of the next
3635      * @param unit the time unit of the initialDelay and delay parameters
3636      * @return a ForkJoinTask implementing the ScheduledFuture
3637      *         interface. The future's {@link Future#get() get()}
3638      *         method will never return normally, and will throw an
3639      *         exception upon task cancellation or abnormal
3640      *         termination of a task execution.
3641      * @throws RejectedExecutionException if the pool is shutdown or
3642      *         submission encounters resource exhaustion.
3643      * @throws NullPointerException if command or unit is null
3644      * @throws IllegalArgumentException if delay less than or equal to zero
3645      * @since 25
3646      */
3647     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
3648                                                      long initialDelay,
3649                                                      long delay, TimeUnit unit) {
3650         if (delay <= 0L)
3651             throw new IllegalArgumentException();
3652         return scheduleDelayedTask(
3653             new ScheduledForkJoinTask<Void>(
3654                 unit.toNanos(initialDelay), // implicit null check of unit
3655                 -unit.toNanos(delay), false, // negative for fixed delay
3656                 Objects.requireNonNull(command), null, this));
3657     }
3658
3659     /**
3660      * Body of a task performed on timeout of another task
3661      */
3662     static final class TimeoutAction<V> implements Runnable {
3663         // set after construction, nulled after use
3664         ForkJoinTask.CallableWithTimeout<V> task;
3665         Consumer<? super ForkJoinTask<V>> action;
3666         TimeoutAction(Consumer<? super ForkJoinTask<V>> action) {
3667             this.action = action;
3668         }
3669         public void run() {
3670             ForkJoinTask.CallableWithTimeout<V> t = task;
3671             Consumer<? super ForkJoinTask<V>> a = action;
3672             task = null;
3673             action = null;
3674             if (t != null && t.status >= 0) {
3675                 if (a == null)
3676                     t.cancel(true);
3677                 else {
3678                     a.accept(t);
3679                     t.interruptIfRunning(true);
3680                 }
3681             }
3682         }
3683     }
3684
3685     /**
3686      * Submits a task executing the given function, cancelling the
3687      * task or performing a given timeoutAction if not completed
3688      * within the given timeout period. If the optional {@code
3689      * timeoutAction} is null, the task is cancelled (via {@code
3690      * cancel(true)}). Otherwise, the action is applied and the task
3691      * may be interrupted if running. Actions may include {@link
3692      * ForkJoinTask#complete} to set a replacement value or {@link
3693      * ForkJoinTask#completeExceptionally} to throw an appropriate
3694      * exception. Note that these can succeed only if the task has
3695      * not already completed when the timeoutAction executes.
3696      *
3697      * @param callable the function to execute
3698      * @param <V> the type of the callable's result
3699      * @param timeout the time to wait before cancelling if not completed
3700      * @param timeoutAction if nonnull, an action to perform on
3701      *        timeout, otherwise the default action is to cancel using
3702      *        {@code cancel(true)}.
3703      * @param unit the time unit of the timeout parameter
3704      * @return a Future that can be used to extract result or cancel
3705      * @throws RejectedExecutionException if the task cannot be
3706      *         scheduled for execution
3707      * @throws NullPointerException if callable or unit is null
3708      * @since 25
3709      */
3710     public <V> ForkJoinTask<V> submitWithTimeout(Callable<V> callable,
3711                                                  long timeout, TimeUnit unit,
3712                                                  Consumer<?
super ForkJoinTask<V>> timeoutAction) { 3713 ForkJoinTask.CallableWithTimeout<V> task; TimeoutAction<V> onTimeout; 3714 Objects.requireNonNull(callable); 3715 ScheduledForkJoinTask<Void> timeoutTask = 3716 new ScheduledForkJoinTask<Void>( 3717 unit.toNanos(timeout), 0L, true, 3718 onTimeout = new TimeoutAction<V>(timeoutAction), null, this); 3719 onTimeout.task = task = 3720 new ForkJoinTask.CallableWithTimeout<V>(callable, timeoutTask); 3721 scheduleDelayedTask(timeoutTask); 3722 return poolSubmit(true, task); 3723 } 3724 3725 /** 3726 * Arranges that scheduled tasks that are not executing and have 3727 * not already been enabled for execution will not be executed and 3728 * will be cancelled upon {@link #shutdown} (unless this pool is 3729 * the {@link #commonPool()} which never shuts down). This method 3730 * may be invoked either before {@link #shutdown} to take effect 3731 * upon the next call, or afterwards to cancel such tasks, which 3732 * may then allow termination. Note that subsequent executions of 3733 * periodic tasks are always disabled upon shutdown, so this 3734 * method applies meaningfully only to non-periodic tasks. 3735 * @since 25 3736 */ 3737 public void cancelDelayedTasksOnShutdown() { 3738 DelayScheduler ds; 3739 if ((ds = delayScheduler) != null || 3740 (ds = startDelayScheduler()) != null) 3741 ds.cancelDelayedTasksOnShutdown(); 3742 } 3743 3744 /** 3745 * Returns the factory used for constructing new workers. 3746 * 3747 * @return the factory used for constructing new workers 3748 */ 3749 public ForkJoinWorkerThreadFactory getFactory() { 3750 return factory; 3751 } 3752 3753 /** 3754 * Returns the handler for internal worker threads that terminate 3755 * due to unrecoverable errors encountered while executing tasks. 3756 * 3757 * @return the handler, or {@code null} if none 3758 */ 3759 public UncaughtExceptionHandler getUncaughtExceptionHandler() { 3760 return ueh; 3761 } 3762 3763 /** 3764 * Returns the targeted parallelism level of this pool. 3765 * 3766 * @return the targeted parallelism level of this pool 3767 */ 3768 public int getParallelism() { 3769 return Math.max(getParallelismOpaque(), 1); 3770 } 3771 3772 /** 3773 * Returns the targeted parallelism level of the common pool. 3774 * 3775 * @return the targeted parallelism level of the common pool 3776 * @since 1.8 3777 */ 3778 public static int getCommonPoolParallelism() { 3779 return common.getParallelism(); 3780 } 3781 3782 /** 3783 * Returns the number of worker threads that have started but not 3784 * yet terminated. The result returned by this method may differ 3785 * from {@link #getParallelism} when threads are created to 3786 * maintain parallelism when others are cooperatively blocked. 3787 * 3788 * @return the number of worker threads 3789 */ 3790 public int getPoolSize() { 3791 return (short)(ctl >>> TC_SHIFT); 3792 } 3793 3794 /** 3795 * Returns {@code true} if this pool uses local first-in-first-out 3796 * scheduling mode for forked tasks that are never joined. 3797 * 3798 * @return {@code true} if this pool uses async mode 3799 */ 3800 public boolean getAsyncMode() { 3801 return (config & FIFO) != 0; 3802 } 3803 3804 /** 3805 * Returns an estimate of the number of worker threads that are 3806 * not blocked waiting to join tasks or for other managed 3807 * synchronization. This method may overestimate the 3808 * number of running threads. 
3809      *
3810      * @return the number of worker threads
3811      */
3812     public int getRunningThreadCount() {
3813         WorkQueue[] qs; WorkQueue q;
3814         int rc = 0;
3815         if ((runState & TERMINATED) == 0L && (qs = queues) != null) {
3816             for (int i = 1; i < qs.length; i += 2) {
3817                 if ((q = qs[i]) != null && q.isApparentlyUnblocked())
3818                     ++rc;
3819             }
3820         }
3821         return rc;
3822     }
3823
3824     /**
3825      * Returns an estimate of the number of threads that are currently
3826      * stealing or executing tasks. This method may overestimate the
3827      * number of active threads.
3828      *
3829      * @return the number of active threads
3830      */
3831     public int getActiveThreadCount() {
3832         return Math.max((short)(ctl >>> RC_SHIFT), 0);
3833     }
3834
3835     /**
3836      * Returns {@code true} if all worker threads are currently idle.
3837      * An idle worker is one that cannot obtain a task to execute
3838      * because none are available to steal from other threads, and
3839      * there are no pending submissions to the pool. This method is
3840      * conservative; it might not return {@code true} immediately upon
3841      * idleness of all threads, but will eventually become true if
3842      * threads remain inactive.
3843      *
3844      * @return {@code true} if all threads are currently idle
3845      */
3846     public boolean isQuiescent() {
3847         return quiescent() >= 0;
3848     }
3849
3850     /**
3851      * Returns an estimate of the total number of completed tasks that
3852      * were executed by a thread other than their submitter. The
3853      * reported value underestimates the actual total number of steals
3854      * when the pool is not quiescent. This value may be useful for
3855      * monitoring and tuning fork/join programs: in general, steal
3856      * counts should be high enough to keep threads busy, but low
3857      * enough to avoid overhead and contention across threads.
3858      *
3859      * @return the number of steals
3860      */
3861     public long getStealCount() {
3862         long count = stealCount;
3863         WorkQueue[] qs; WorkQueue q;
3864         if ((qs = queues) != null) {
3865             for (int i = 1; i < qs.length; i += 2) {
3866                 if ((q = qs[i]) != null)
3867                     count += (long)q.nsteals & 0xffffffffL;
3868             }
3869         }
3870         return count;
3871     }
3872
3873     /**
3874      * Returns an estimate of the total number of tasks currently held
3875      * in queues by worker threads (but not including tasks submitted
3876      * to the pool that have not begun executing). This value is only
3877      * an approximation, obtained by iterating across all threads in
3878      * the pool. This method may be useful for tuning task
3879      * granularities. The returned count does not include scheduled
3880      * tasks that are not yet ready to execute, which are reported
3881      * separately by method {@link #getDelayedTaskCount}.
3882      *
3883      * @return the number of queued tasks
3884      * @see ForkJoinWorkerThread#getQueuedTaskCount()
3885      */
3886     public long getQueuedTaskCount() {
3887         WorkQueue[] qs; WorkQueue q;
3888         long count = 0;
3889         if ((runState & TERMINATED) == 0L && (qs = queues) != null) {
3890             for (int i = 1; i < qs.length; i += 2) {
3891                 if ((q = qs[i]) != null)
3892                     count += q.queueSize();
3893             }
3894         }
3895         return count;
3896     }
3897
3898     /**
3899      * Returns an estimate of the number of tasks submitted to this
3900      * pool that have not yet begun executing. This method may take
3901      * time proportional to the number of submissions.
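     *
     * <p>A hedged monitoring sketch pairing this method with {@link
     * #hasQueuedSubmissions}, given some {@code ForkJoinPool pool}
     * (the print statement is a placeholder for whatever reporting a
     * program uses):
     * <pre> {@code
     * // periodically sample the backlog of unstarted submissions
     * if (pool.hasQueuedSubmissions())
     *   System.out.println("pending: " + pool.getQueuedSubmissionCount());}</pre>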
3902 * 3903 * @return the number of queued submissions 3904 */ 3905 public int getQueuedSubmissionCount() { 3906 WorkQueue[] qs; WorkQueue q; 3907 int count = 0; 3908 if ((runState & TERMINATED) == 0L && (qs = queues) != null) { 3909 for (int i = 0; i < qs.length; i += 2) { 3910 if ((q = qs[i]) != null) 3911 count += q.queueSize(); 3912 } 3913 } 3914 return count; 3915 } 3916 3917 /** 3918 * Returns an estimate of the number of delayed (including 3919 * periodic) tasks scheduled in this pool that are not yet ready 3920 * to submit for execution. The returned value is inaccurate while 3921 * delayed tasks are being processed. 3922 * 3923 * @return an estimate of the number of delayed tasks 3924 * @since 25 3925 */ 3926 public long getDelayedTaskCount() { 3927 DelayScheduler ds; 3928 return ((ds = delayScheduler) == null ? 0 : ds.lastStableSize()); 3929 } 3930 3931 /** 3932 * Returns {@code true} if there are any tasks submitted to this 3933 * pool that have not yet begun executing. 3934 * 3935 * @return {@code true} if there are any queued submissions 3936 */ 3937 public boolean hasQueuedSubmissions() { 3938 WorkQueue[] qs; WorkQueue q; 3939 if ((runState & STOP) == 0L && (qs = queues) != null) { 3940 for (int i = 0; i < qs.length; i += 2) { 3941 if ((q = qs[i]) != null && q.queueSize() > 0) 3942 return true; 3943 } 3944 } 3945 return false; 3946 } 3947 3948 /** 3949 * Removes and returns the next unexecuted submission if one is 3950 * available. This method may be useful in extensions to this 3951 * class that re-assign work in systems with multiple pools. 3952 * 3953 * @return the next submission, or {@code null} if none 3954 */ 3955 protected ForkJoinTask<?> pollSubmission() { 3956 return pollScan(true); 3957 } 3958 3959 /** 3960 * Removes all available unexecuted submitted and forked tasks 3961 * from scheduling queues and adds them to the given collection, 3962 * without altering their execution status. These may include 3963 * artificially generated or wrapped tasks. This method is 3964 * designed to be invoked only when the pool is known to be 3965 * quiescent. Invocations at other times may not remove all 3966 * tasks. A failure encountered while attempting to add elements 3967 * to collection {@code c} may result in elements being in 3968 * neither, either or both collections when the associated 3969 * exception is thrown. The behavior of this operation is 3970 * undefined if the specified collection is modified while the 3971 * operation is in progress. 3972 * 3973 * @param c the collection to transfer elements into 3974 * @return the number of elements transferred 3975 */ 3976 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 3977 int count = 0; 3978 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) { 3979 c.add(t); 3980 ++count; 3981 } 3982 return count; 3983 } 3984 3985 /** 3986 * Returns a string identifying this pool, as well as its state, 3987 * including indications of run state, parallelism level, and 3988 * worker and task counts. 
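     *
     * <p>An illustrative (not normative) example of the returned form:
     * <pre> {@code
     * java.util.concurrent.ForkJoinPool@1fb3ebeb[Running, parallelism = 8,
     *  size = 8, active = 2, running = 2, steals = 10, tasks = 3,
     *  submissions = 0]}</pre>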
3989 * 3990 * @return a string identifying this pool, as well as its state 3991 */ 3992 public String toString() { 3993 // Use a single pass through queues to collect counts 3994 DelayScheduler ds; 3995 long e = runState; 3996 long st = stealCount; 3997 long qt = 0L, ss = 0L; int rc = 0; 3998 WorkQueue[] qs; WorkQueue q; 3999 if ((qs = queues) != null) { 4000 for (int i = 0; i < qs.length; ++i) { 4001 if ((q = qs[i]) != null) { 4002 int size = q.queueSize(); 4003 if ((i & 1) == 0) 4004 ss += size; 4005 else { 4006 qt += size; 4007 st += (long)q.nsteals & 0xffffffffL; 4008 if (q.isApparentlyUnblocked()) 4009 ++rc; 4010 } 4011 } 4012 } 4013 } 4014 String delayed = ((ds = delayScheduler) == null ? "" : 4015 ", delayed = " + ds.lastStableSize()); 4016 int pc = parallelism; 4017 long c = ctl; 4018 int tc = (short)(c >>> TC_SHIFT); 4019 int ac = (short)(c >>> RC_SHIFT); 4020 if (ac < 0) // ignore transient negative 4021 ac = 0; 4022 String level = ((e & TERMINATED) != 0L ? "Terminated" : 4023 (e & STOP) != 0L ? "Terminating" : 4024 (e & SHUTDOWN) != 0L ? "Shutting down" : 4025 "Running"); 4026 return super.toString() + 4027 "[" + level + 4028 ", parallelism = " + pc + 4029 ", size = " + tc + 4030 ", active = " + ac + 4031 ", running = " + rc + 4032 ", steals = " + st + 4033 ", tasks = " + qt + 4034 ", submissions = " + ss + 4035 delayed + 4036 "]"; 4037 } 4038 4039 /** 4040 * Possibly initiates an orderly shutdown in which previously 4041 * submitted tasks are executed, but no new tasks will be 4042 * accepted. Invocation has no effect on execution state if this 4043 * is the {@link #commonPool()}, and no additional effect if 4044 * already shut down. Tasks that are in the process of being 4045 * submitted concurrently during the course of this method may or 4046 * may not be rejected. 4047 */ 4048 public void shutdown() { 4049 if (workerNamePrefix != null) // not common pool 4050 tryTerminate(false, true); 4051 } 4052 4053 /** 4054 * Possibly attempts to cancel and/or stop all tasks, and reject 4055 * all subsequently submitted tasks. Invocation has no effect on 4056 * execution state if this is the {@link #commonPool()}, and no 4057 * additional effect if already shut down. Otherwise, tasks that 4058 * are in the process of being submitted or executed concurrently 4059 * during the course of this method may or may not be 4060 * rejected. This method cancels both existing and unexecuted 4061 * tasks, in order to permit termination in the presence of task 4062 * dependencies. So the method always returns an empty list 4063 * (unlike the case for some other Executors). 4064 * 4065 * @return an empty list 4066 */ 4067 public List<Runnable> shutdownNow() { 4068 if (workerNamePrefix != null) // not common pool 4069 tryTerminate(true, true); 4070 return Collections.emptyList(); 4071 } 4072 4073 /** 4074 * Returns {@code true} if all tasks have completed following shut down. 4075 * 4076 * @return {@code true} if all tasks have completed following shut down 4077 */ 4078 public boolean isTerminated() { 4079 return (tryTerminate(false, false) & TERMINATED) != 0; 4080 } 4081 4082 /** 4083 * Returns {@code true} if the process of termination has 4084 * commenced but not yet completed. This method may be useful for 4085 * debugging. A return of {@code true} reported a sufficient 4086 * period after shutdown may indicate that submitted tasks have 4087 * ignored or suppressed interruption, or are waiting for I/O, 4088 * causing this executor not to properly terminate. 
(See the 4089 * advisory notes for class {@link ForkJoinTask} stating that 4090 * tasks should not normally entail blocking operations. But if 4091 * they do, they must abort them on interrupt.) 4092 * 4093 * @return {@code true} if terminating but not yet terminated 4094 */ 4095 public boolean isTerminating() { 4096 return (tryTerminate(false, false) & (STOP | TERMINATED)) == STOP; 4097 } 4098 4099 /** 4100 * Returns {@code true} if this pool has been shut down. 4101 * 4102 * @return {@code true} if this pool has been shut down 4103 */ 4104 public boolean isShutdown() { 4105 return (runState & SHUTDOWN) != 0L; 4106 } 4107 4108 /** 4109 * Blocks until all tasks have completed execution after a 4110 * shutdown request, or the timeout occurs, or the current thread 4111 * is interrupted, whichever happens first. Because the {@link 4112 * #commonPool()} never terminates until program shutdown, when 4113 * applied to the common pool, this method is equivalent to {@link 4114 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 4115 * 4116 * @param timeout the maximum time to wait 4117 * @param unit the time unit of the timeout argument 4118 * @return {@code true} if this executor terminated and 4119 * {@code false} if the timeout elapsed before termination 4120 * @throws InterruptedException if interrupted while waiting 4121 */ 4122 public boolean awaitTermination(long timeout, TimeUnit unit) 4123 throws InterruptedException { 4124 long nanos = unit.toNanos(timeout); 4125 CountDownLatch done; 4126 if (workerNamePrefix == null) { // is common pool 4127 if (helpQuiescePool(this, nanos, true) < 0) 4128 throw new InterruptedException(); 4129 return false; 4130 } 4131 else if ((tryTerminate(false, false) & TERMINATED) != 0 || 4132 (done = terminationSignal()) == null || 4133 (runState & TERMINATED) != 0L) 4134 return true; 4135 else 4136 return done.await(nanos, TimeUnit.NANOSECONDS); 4137 } 4138 4139 /** 4140 * If called by a ForkJoinTask operating in this pool, equivalent 4141 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 4142 * waits and/or attempts to assist performing tasks until this 4143 * pool {@link #isQuiescent} or the indicated timeout elapses. 4144 * 4145 * @param timeout the maximum time to wait 4146 * @param unit the time unit of the timeout argument 4147 * @return {@code true} if quiescent; {@code false} if the 4148 * timeout elapsed. 4149 */ 4150 public boolean awaitQuiescence(long timeout, TimeUnit unit) { 4151 return (helpQuiescePool(this, unit.toNanos(timeout), false) > 0); 4152 } 4153 4154 /** 4155 * Unless this is the {@link #commonPool()}, initiates an orderly 4156 * shutdown in which previously submitted tasks are executed, but 4157 * no new tasks will be accepted, and waits until all tasks have 4158 * completed execution and the executor has terminated. 4159 * 4160 * <p> If already terminated, or this is the {@link 4161 * #commonPool()}, this method has no effect on execution, and 4162 * does not wait. Otherwise, if interrupted while waiting, this 4163 * method stops all executing tasks as if by invoking {@link 4164 * #shutdownNow()}. It then continues to wait until all actively 4165 * executing tasks have completed. Tasks that were awaiting 4166 * execution are not executed. The interrupt status will be 4167 * re-asserted before this method returns. 
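     *
     * <p>A minimal usage sketch (the parallelism level and task are
     * illustrative):
     * <pre> {@code
     * // shutdown and termination await happen when the block exits
     * try (ForkJoinPool pool = new ForkJoinPool(4)) {
     *   pool.submit(() -> System.out.println("hello"));
     * }}</pre>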
4168 * 4169 * @since 19 4170 */ 4171 @Override 4172 public void close() { 4173 if (workerNamePrefix != null) { 4174 CountDownLatch done = null; 4175 boolean interrupted = false; 4176 while ((tryTerminate(interrupted, true) & TERMINATED) == 0) { 4177 if (done == null) 4178 done = terminationSignal(); 4179 else { 4180 try { 4181 done.await(); 4182 break; 4183 } catch (InterruptedException ex) { 4184 interrupted = true; 4185 } 4186 } 4187 } 4188 if (interrupted) 4189 Thread.currentThread().interrupt(); 4190 } 4191 } 4192 4193 /** 4194 * Interface for extending managed parallelism for tasks running 4195 * in {@link ForkJoinPool}s. 4196 * 4197 * <p>A {@code ManagedBlocker} provides two methods. Method 4198 * {@link #isReleasable} must return {@code true} if blocking is 4199 * not necessary. Method {@link #block} blocks the current thread 4200 * if necessary (perhaps internally invoking {@code isReleasable} 4201 * before actually blocking). These actions are performed by any 4202 * thread invoking {@link 4203 * ForkJoinPool#managedBlock(ManagedBlocker)}. The unusual 4204 * methods in this API accommodate synchronizers that may, but 4205 * don't usually, block for long periods. Similarly, they allow 4206 * more efficient internal handling of cases in which additional 4207 * workers may be, but usually are not, needed to ensure 4208 * sufficient parallelism. Toward this end, implementations of 4209 * method {@code isReleasable} must be amenable to repeated 4210 * invocation. Neither method is invoked after a prior invocation 4211 * of {@code isReleasable} or {@code block} returns {@code true}. 4212 * 4213 * <p>For example, here is a ManagedBlocker based on a 4214 * ReentrantLock: 4215 * <pre> {@code 4216 * class ManagedLocker implements ManagedBlocker { 4217 * final ReentrantLock lock; 4218 * boolean hasLock = false; 4219 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 4220 * public boolean block() { 4221 * if (!hasLock) 4222 * lock.lock(); 4223 * return true; 4224 * } 4225 * public boolean isReleasable() { 4226 * return hasLock || (hasLock = lock.tryLock()); 4227 * } 4228 * }}</pre> 4229 * 4230 * <p>Here is a class that possibly blocks waiting for an 4231 * item on a given queue: 4232 * <pre> {@code 4233 * class QueueTaker<E> implements ManagedBlocker { 4234 * final BlockingQueue<E> queue; 4235 * volatile E item = null; 4236 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 4237 * public boolean block() throws InterruptedException { 4238 * if (item == null) 4239 * item = queue.take(); 4240 * return true; 4241 * } 4242 * public boolean isReleasable() { 4243 * return item != null || (item = queue.poll()) != null; 4244 * } 4245 * public E getItem() { // call after pool.managedBlock completes 4246 * return item; 4247 * } 4248 * }}</pre> 4249 */ 4250 public static interface ManagedBlocker { 4251 /** 4252 * Possibly blocks the current thread, for example waiting for 4253 * a lock or condition. 4254 * 4255 * @return {@code true} if no additional blocking is necessary 4256 * (i.e., if isReleasable would return true) 4257 * @throws InterruptedException if interrupted while waiting 4258 * (the method is not required to do so, but is allowed to) 4259 */ 4260 boolean block() throws InterruptedException; 4261 4262 /** 4263 * Returns {@code true} if blocking is unnecessary. 4264 * @return {@code true} if blocking is unnecessary 4265 */ 4266 boolean isReleasable(); 4267 } 4268 4269 /** 4270 * Runs the given possibly blocking task. 
When {@linkplain 4271 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this 4272 * method possibly arranges for a spare thread to be activated if 4273 * necessary to ensure sufficient parallelism while the current 4274 * thread is blocked in {@link ManagedBlocker#block blocker.block()}. 4275 * 4276 * <p>This method repeatedly calls {@code blocker.isReleasable()} and 4277 * {@code blocker.block()} until either method returns {@code true}. 4278 * Every call to {@code blocker.block()} is preceded by a call to 4279 * {@code blocker.isReleasable()} that returned {@code false}. 4280 * 4281 * <p>If not running in a ForkJoinPool, this method is 4282 * behaviorally equivalent to 4283 * <pre> {@code 4284 * while (!blocker.isReleasable()) 4285 * if (blocker.block()) 4286 * break;}</pre> 4287 * 4288 * If running in a ForkJoinPool, the pool may first be expanded to 4289 * ensure sufficient parallelism available during the call to 4290 * {@code blocker.block()}. 4291 * 4292 * @param blocker the blocker task 4293 * @throws InterruptedException if {@code blocker.block()} did so 4294 */ 4295 public static void managedBlock(ManagedBlocker blocker) 4296 throws InterruptedException { 4297 Thread t; ForkJoinPool p; 4298 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && 4299 (p = ((ForkJoinWorkerThread)t).pool) != null) 4300 p.compensatedBlock(blocker); 4301 else 4302 unmanagedBlock(blocker); 4303 } 4304 4305 /** ManagedBlock for ForkJoinWorkerThreads */ 4306 private void compensatedBlock(ManagedBlocker blocker) 4307 throws InterruptedException { 4308 Objects.requireNonNull(blocker); 4309 for (;;) { 4310 int comp; boolean done; 4311 long c = ctl; 4312 if (blocker.isReleasable()) 4313 break; 4314 if ((runState & STOP) != 0L) 4315 throw new InterruptedException(); 4316 if ((comp = tryCompensate(c)) >= 0) { 4317 try { 4318 done = blocker.block(); 4319 } finally { 4320 if (comp > 0) 4321 getAndAddCtl(RC_UNIT); 4322 } 4323 if (done) 4324 break; 4325 } 4326 } 4327 } 4328 4329 /** 4330 * Invokes tryCompensate to create or re-activate a spare thread to 4331 * compensate for a thread that performs a blocking operation. When the 4332 * blocking operation is done then endCompensatedBlock must be invoked 4333 * with the value returned by this method to re-adjust the parallelism. 4334 * @return value to use in endCompensatedBlock 4335 */ 4336 final long beginCompensatedBlock() { 4337 int c; 4338 do {} while ((c = tryCompensate(ctl)) < 0); 4339 return (c == 0) ? 0L : RC_UNIT; 4340 } 4341 4342 /** 4343 * Re-adjusts parallelism after a blocking operation completes. 4344 * @param post value from beginCompensatedBlock 4345 */ 4346 void endCompensatedBlock(long post) { 4347 if (post > 0L) { 4348 getAndAddCtl(post); 4349 } 4350 } 4351 4352 /** ManagedBlock for external threads */ 4353 private static void unmanagedBlock(ManagedBlocker blocker) 4354 throws InterruptedException { 4355 Objects.requireNonNull(blocker); 4356 do {} while (!blocker.isReleasable() && !blocker.block()); 4357 } 4358 4359 @Override 4360 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 4361 Objects.requireNonNull(runnable); 4362 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 
4363 new ForkJoinTask.AdaptedRunnable<T>(runnable, value) : 4364 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(runnable, value); 4365 } 4366 4367 @Override 4368 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 4369 Objects.requireNonNull(callable); 4370 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 4371 new ForkJoinTask.AdaptedCallable<T>(callable) : 4372 new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable); 4373 } 4374 4375 static { 4376 U = Unsafe.getUnsafe(); 4377 Class<ForkJoinPool> klass = ForkJoinPool.class; 4378 try { 4379 Field poolIdsField = klass.getDeclaredField("poolIds"); 4380 POOLIDS_BASE = U.staticFieldBase(poolIdsField); 4381 POOLIDS = U.staticFieldOffset(poolIdsField); 4382 } catch (NoSuchFieldException e) { 4383 throw new ExceptionInInitializerError(e); 4384 } 4385 CTL = U.objectFieldOffset(klass, "ctl"); 4386 RUNSTATE = U.objectFieldOffset(klass, "runState"); 4387 PARALLELISM = U.objectFieldOffset(klass, "parallelism"); 4388 THREADIDS = U.objectFieldOffset(klass, "threadIds"); 4389 TERMINATION = U.objectFieldOffset(klass, "termination"); 4390 Class<ForkJoinTask[]> aklass = ForkJoinTask[].class; 4391 ABASE = U.arrayBaseOffset(aklass); 4392 int scale = U.arrayIndexScale(aklass); 4393 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); 4394 if ((scale & (scale - 1)) != 0) 4395 throw new Error("array index scale not a power of two"); 4396 4397 U.ensureClassInitialized(LockSupport.class); // ensure loaded and initialized 4398 // allow access to non-public methods 4399 JLA = SharedSecrets.getJavaLangAccess(); 4400 SharedSecrets.setJavaUtilConcurrentFJPAccess( 4401 new JavaUtilConcurrentFJPAccess() { 4402 @Override 4403 public long beginCompensatedBlock(ForkJoinPool pool) { 4404 return pool.beginCompensatedBlock(); 4405 } 4406 public void endCompensatedBlock(ForkJoinPool pool, long post) { 4407 pool.endCompensatedBlock(post); 4408 } 4409 }); 4410 defaultForkJoinWorkerThreadFactory = 4411 new DefaultForkJoinWorkerThreadFactory(); 4412 common = new ForkJoinPool((byte)0); 4413 } 4414 }