/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Field;
import java.security.AccessController;
import java.security.AccessControlContext;
import java.security.Permission;
import java.security.Permissions;
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;
import jdk.internal.access.JavaUtilConcurrentFJPAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Unsafe;
import jdk.internal.vm.SharedThreadContainer;

/**
 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
 * A {@code ForkJoinPool} provides the entry point for submissions
 * from non-{@code ForkJoinTask} clients, as well as management and
 * monitoring operations.
 *
 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
 * ExecutorService} mainly by virtue of employing
 * <em>work-stealing</em>: all threads in the pool attempt to find and
 * execute tasks submitted to the pool and/or created by other active
 * tasks (eventually blocking waiting for work if none exist). This
 * enables efficient processing when most tasks spawn other subtasks
 * (as do most {@code ForkJoinTask}s), as well as when many small
 * tasks are submitted to the pool from external clients.  Especially
 * when setting <em>asyncMode</em> to true in constructors, {@code
 * ForkJoinPool}s may also be appropriate for use with event-style
 * tasks that are never joined. All worker threads are initialized
 * with {@link Thread#isDaemon} set {@code true}.
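 *
 * <p>As an informal (non-normative) sketch of typical usage, given a
 * {@code long[] array}, a divide-and-conquer sum run in the {@link
 * #commonPool()} might look like the following; the class name
 * {@code SumTask} and cutoff {@code THRESHOLD} are invented for this
 * example:
 *
 * <pre> {@code
 * class SumTask extends RecursiveTask<Long> {
 *   static final int THRESHOLD = 1_000;  // sequential base-case cutoff
 *   final long[] a; final int lo, hi;
 *   SumTask(long[] a, int lo, int hi) {
 *     this.a = a; this.lo = lo; this.hi = hi;
 *   }
 *   protected Long compute() {
 *     if (hi - lo <= THRESHOLD) {        // small enough: sum directly
 *       long s = 0;
 *       for (int i = lo; i < hi; ++i) s += a[i];
 *       return s;
 *     }
 *     int mid = (lo + hi) >>> 1;
 *     SumTask left = new SumTask(a, lo, mid);
 *     left.fork();                       // arrange async execution of left half
 *     long right = new SumTask(a, mid, hi).compute(); // run right half here
 *     return right + left.join();        // join left result and combine
 *   }
 * }
 * long sum = ForkJoinPool.commonPool().invoke(
 *     new SumTask(array, 0, array.length));}</pre>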
 *
 * <p>A static {@link #commonPool()} is available and appropriate for
 * most applications. The common pool is used by any ForkJoinTask that
 * is not explicitly submitted to a specified pool. Using the common
 * pool normally reduces resource usage (its threads are slowly
 * reclaimed during periods of non-use, and reinstated upon subsequent
 * use).
 *
 * <p>For applications that require separate or custom pools, a {@code
 * ForkJoinPool} may be constructed with a given target parallelism
 * level; by default, equal to the number of available processors.
 * The pool attempts to maintain enough active (or available) threads
 * by dynamically adding, suspending, or resuming internal worker
 * threads, even if some tasks are stalled waiting to join others.
 * However, no such adjustments are guaranteed in the face of blocked
 * I/O or other unmanaged synchronization. The nested {@link
 * ManagedBlocker} interface enables extension of the kinds of
 * synchronization accommodated. The default policies may be
 * overridden using a constructor with parameters corresponding to
 * those documented in class {@link ThreadPoolExecutor}.
 *
 * <p>In addition to execution and lifecycle control methods, this
 * class provides status check methods (for example
 * {@link #getStealCount}) that are intended to aid in developing,
 * tuning, and monitoring fork/join applications. Also, method
 * {@link #toString} returns indications of pool state in a
 * convenient form for informal monitoring.
 *
 * <p>As is the case with other ExecutorServices, there are three
 * main task execution methods summarized in the following table.
 * These are designed to be used primarily by clients not already
 * engaged in fork/join computations in the current pool.  The main
 * forms of these methods accept instances of {@code ForkJoinTask},
 * but overloaded forms also allow mixed execution of plain {@code
 * Runnable}- or {@code Callable}- based activities as well.  However,
 * tasks that are already executing in a pool should normally instead
 * use the within-computation forms listed in the table unless using
 * async event-style tasks that are not usually joined, in which case
 * there is little difference among choice of methods.
 *
 * <table class="plain">
 * <caption>Summary of task execution methods</caption>
 *  <tr>
 *    <td></td>
 *    <th scope="col"> Call from non-fork/join clients</th>
 *    <th scope="col"> Call from within fork/join computations</th>
 *  </tr>
 *  <tr>
 *    <th scope="row" style="text-align:left"> Arrange async execution</th>
 *    <td> {@link #execute(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork}</td>
 *  </tr>
 *  <tr>
 *    <th scope="row" style="text-align:left"> Await and obtain result</th>
 *    <td> {@link #invoke(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#invoke}</td>
 *  </tr>
 *  <tr>
 *    <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
 *    <td> {@link #submit(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 *  </tr>
 * </table>
 *
 * <p>The parameters used to construct the common pool may be controlled by
 * setting the following {@linkplain System#getProperty system properties}:
 * <ul>
 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
 * - the parallelism level, a non-negative integer
 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
 * - the class name of a {@link ForkJoinWorkerThreadFactory}.
 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 * is used to load this class.
 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler}
 * - the class name of a {@link UncaughtExceptionHandler}.
 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 * is used to load this class.
 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares}
 * - the maximum number of allowed extra threads to maintain target
 * parallelism (default 256).
 * </ul>
 * If no thread factory is supplied via a system property, then the
 * common pool uses a factory that uses the system class loader as the
 * {@linkplain Thread#getContextClassLoader() thread context class loader}.
 * In addition, if a {@link SecurityManager} is present, then
 * the common pool uses a factory supplying threads that have no
 * {@link Permissions} enabled, and are not guaranteed to preserve
 * the values of {@link java.lang.ThreadLocal} variables across tasks.
 *
 * Upon any error in establishing these settings, default parameters
 * are used. It is possible to disable or limit the use of threads in
 * the common pool by setting the parallelism property to zero, and/or
 * using a factory that may return {@code null}. However doing so may
 * cause unjoined tasks to never be executed.
 *
 * @implNote This implementation restricts the maximum number of
 * running threads to 32767. Attempts to create pools with greater
 * than the maximum number result in {@code
 * IllegalArgumentException}. Also, this implementation rejects
 * submitted tasks (that is, by throwing {@link
 * RejectedExecutionException}) only when the pool is shut down or
 * internal resources have been exhausted.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinPool extends AbstractExecutorService {

    /*
     * Implementation Overview
     *
     * This class and its nested classes provide the main
     * functionality and control for a set of worker threads.
     * Because most internal methods and nested classes are
     * interrelated, their main rationale and descriptions are
     * presented here; individual methods and nested classes contain
     * only brief comments about details. Broadly: submissions from
     * non-FJ threads enter into submission queues.  Workers take
     * these tasks and typically split them into subtasks that may be
     * stolen by other workers. Work-stealing based on randomized
     * scans generally leads to better throughput than "work dealing"
     * in which producers assign tasks to idle threads, in part
     * because threads that have finished other tasks before the
     * signalled thread wakes up (which can be a long time) can take
     * the task instead.  Preference rules give first priority to
     * processing tasks from their own queues (LIFO or FIFO,
     * depending on mode), then to randomized FIFO steals of tasks in
     * other queues.
     *
     * This framework began as a vehicle for supporting structured
     * parallelism using work-stealing, designed to work best when
     * tasks are dag-structured (wrt completion dependencies), nested
     * (generated using recursion or completions), of reasonable
     * granularity, independent (wrt memory and resources), and where
     * callers participate in task execution. These are properties
     * that anyone aiming for efficient parallel multicore execution
     * should design for.  Over time, the scalability advantages of
     * this framework led to extensions to better support more
     * diverse usage contexts, amounting to weakenings or violations
     * of each of these properties. Accommodating them may compromise
     * performance, but mechanics discussed below include tradeoffs
     * attempting to arrange that no single performance issue
     * dominates.
     *
     * Here's a brief history of major revisions, each also with
     * other minor features and changes.
     *
     * 1. Only handle recursively structured computational tasks
     * 2. Async (FIFO) mode and striped submission queues
     * 3. Completion-based tasks (mainly CountedCompleters)
     * 4. CommonPool and parallelStream support
     * 5. InterruptibleTasks for externally submitted tasks
     *
     * Most changes involve adaptations of base algorithms using
     * combinations of static and dynamic bitwise mode settings (both
     * here and in ForkJoinTask), and subclassing of ForkJoinTask.
     * There are a fair number of odd code constructions and design
     * decisions for components that reside at the edge of Java vs
     * JVM functionality.
     *
     * WorkQueues
     * ==========
     *
     * Most operations occur within work-stealing queues (in nested
     * class WorkQueue).  These are special forms of Deques that
     * support only three of the four possible end-operations -- push,
     * pop, and poll (aka steal), under the further constraints that
     * push and pop are called only from the owning thread (or, as
     * extended here, under a lock), while poll may be called from
     * other threads.  (If you are unfamiliar with them, you probably
     * want to read Herlihy and Shavit's book "The Art of
     * Multiprocessor Programming", chapter 16 describing these in
     * more detail before proceeding.)
     * The main work-stealing queue design is roughly similar to
     * those in the papers "Dynamic Circular Work-Stealing Deque" by
     * Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as
     * small a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs poll (steal) from being on the indices
     * ("base" and "top") to the slots themselves. These provide the
     * primary required memory ordering -- see "Correct and Efficient
     * Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and
     * Nardelli, PPoPP 2013
     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
     * analysis of memory ordering requirements in work-stealing
     * algorithms similar to the one used here.  We use per-operation
     * ordered writes of various kinds for updates, but usually use
     * explicit load fences for reads, to cover access of several
     * fields of possibly several objects without further
     * constraining read-by-read ordering.
     *
     * We also support a user mode in which local task processing is
     * in FIFO, not LIFO order, simply by using a local version of
     * poll rather than pop. This can be useful in message-passing
     * frameworks in which tasks are never joined, although with
     * increased contention among task producers and consumers. Also,
     * the same data structure (and class) is used for "submission
     * queues" (described below) holding externally submitted tasks,
     * that differ only in that a lock (using field "phase"; see
     * below) is required by external callers to push and pop tasks.
     *
     * Adding tasks then takes the form of a classic array push(task)
     * in a circular buffer:
     *    q.array[q.top++ % length] = task;
     *
     * The actual code needs to null-check and size-check the array,
     * uses masking, not mod, for indexing a power-of-two-sized
     * array, enforces memory ordering, supports resizing, and
     * possibly signals waiting workers to start scanning (described
     * below), which requires stronger forms of order accesses.
     *
     * The pop operation (always performed by owner) is of the form:
     *   if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
     *        decrement top and return task;
     * If this fails, the queue is empty. This operation is one part
     * of the nextLocalTask method, that instead does a local-poll
     * in FIFO mode.
     *
     * The poll operation is, basically:
     *   if (CAS nonnull task t = q.array[k = q.base % length] to null)
     *       increment base and return task;
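     *
     * As a self-contained but simplified, non-normative sketch of
     * how these three operations fit together (ignoring resizing,
     * signalling, and the memory-ordering refinements discussed
     * next; the class and names are invented for illustration):
     *
     *   import java.util.concurrent.atomic.AtomicReferenceArray;
     *   final class MiniDeque<T> {
     *     final AtomicReferenceArray<T> array = new AtomicReferenceArray<>(64);
     *     volatile int base;                 // next slot to steal from
     *     int top;                           // next slot to push; owner-only
     *     void push(T task) {                // owner only
     *       array.set((top++) & 63, task);   // mask, not mod (power of two)
     *     }
     *     T pop() {                          // owner only; LIFO
     *       int s = top - 1;
     *       T t = array.getAndSet(s & 63, null);
     *       if (t != null) top = s;          // null means empty
     *       return t;
     *     }
     *     T poll() {                         // any thread; FIFO steal
     *       int b = base;
     *       T t = array.get(b & 63);
     *       if (t != null && array.compareAndSet(b & 63, t, null)) {
     *         base = b + 1;                  // lagging update; see below
     *         return t;
     *       }
     *       return null;
     *     }
     *   }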
     *
     * However, there are several more cases that must be dealt with.
     * Some of them are just due to asynchrony; others reflect
     * contention and stealing policies. Stepping through them
     * illustrates some of the implementation decisions in this class.
     *
     *  * Slot k must be read with an acquiring read, which it must
     *    anyway to dereference and run the task if the (acquiring)
     *    CAS succeeds, but uses an explicit acquire fence to support
     *    the following rechecks even if the CAS is not attempted.
     *
     *  * q.base may change between reading and using its value to
     *    index the slot. To avoid trying to use the wrong t, the
     *    index and slot must be reread (not necessarily immediately)
     *    until consistent, unless this is a local poll by owner, in
     *    which case this form of inconsistency can only appear as t
     *    being null, below.
     *
     *  * Similarly, q.array may change (due to a resize), unless this
     *    is a local poll by owner. Otherwise, when t is present, this
     *    only needs consideration on CAS failure (since a CAS
     *    confirms the non-resized case.)
     *
     *  * t may appear null because a previous poll operation has not
     *    yet incremented q.base, so the read is from an already-taken
     *    index. This form of stall reflects the non-lock-freedom of
     *    the poll operation. Stalls can be detected by observing that
     *    q.base doesn't change on repeated reads of null t, and when
     *    no other alternatives apply, spin-waiting for it to settle.
     *    To reduce producing these kinds of stalls by other stealers,
     *    we encourage timely writes to indices using otherwise
     *    unnecessarily strong writes.
     *
     *  * The CAS may fail, in which case we may want to retry unless
     *    there is too much contention. One goal is to balance and
     *    spread out the many forms of contention that may be
     *    encountered across polling and other operations to avoid
     *    sustained performance degradations. Across all cases where
     *    alternatives exist, a bounded number of CAS misses or stalls
     *    are tolerated (for slots, ctl, and elsewhere described
     *    below) before taking alternative action. These may move
     *    contention or retries elsewhere, which is still preferable
     *    to single-point bottlenecks.
     *
     *  * Even though the check "top == base" is quiescently accurate
     *    to determine whether a queue is empty, it is not of much use
     *    when deciding whether to try to poll or repoll after a
     *    failure.  Both top and base may move independently, and both
     *    lag updates to the underlying array. To reduce memory
     *    contention, non-owners avoid reading the "top" when
     *    possible, by using one-ahead reads to check whether to
     *    repoll, relying on the fact that a non-empty queue does not
     *    have two null slots in a row, except in cases (resizes and
     *    shifts) that can be detected with a secondary recheck that
     *    is less likely to conflict with owner writes.
     *
     * The poll operations in q.poll(), runWorker(), helpJoin(), and
     * elsewhere differ with respect to whether other queues are
     * available to try, and the presence or nature of screening
     * steps when only some kinds of tasks can be taken. When
     * alternatives (or failing) are an option, they uniformly give
     * up after bounded numbers of stalls and/or CAS failures, which
     * reduces contention when too many workers are polling too few
     * tasks.  Overall, in the aggregate, we ensure probabilistic
     * non-blockingness of work-stealing at least until checking
     * quiescence (which is intrinsically blocking): If an attempted
     * steal fails in these ways, a scanning thief chooses a
     * different target to try next. In contexts where alternatives
     * aren't available, and when progress conditions can be isolated
     * to values of a single variable, simple spinloops (using
     * Thread.onSpinWait) are used to reduce memory traffic.
     *
     * WorkQueues are also used in a similar way for tasks submitted
     * to the pool. We cannot mix these tasks in the same queues used
     * by workers. Instead, we randomly associate submission queues
     * with submitting threads, using a form of hashing.  The
     * ThreadLocalRandom probe value serves as a hash code for
     * choosing existing queues, and may be randomly repositioned
     * upon contention with other submitters.  In essence, submitters
     * act like workers except that they are restricted to executing
     * local tasks that they submitted (or when known, subtasks
     * thereof).  Insertion of tasks in shared mode requires a lock.
     * We use only a simple spinlock (as one role of field "phase")
     * because submitters encountering a busy queue move to a
     * different position to use or create other queues.  They (spin)
     * block when registering new queues, or indirectly elsewhere, by
     * revisiting later.
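     *
     * Illustratively (a non-normative sketch; the locking helper is
     * an approximation of the actual code), choosing a submission
     * queue from the submitting thread's probe value resembles:
     *
     *   int r = ThreadLocalRandom.getProbe();        // per-thread hash
     *   WorkQueue q = queues[r & EXTERNAL_ID_MASK & (queues.length - 1)];
     *   if (q == null || !q.tryLockPhase())          // absent or busy:
     *       ThreadLocalRandom.advanceProbe(r);       //  move elsewhere, retry
     *
     * where masking with the even-valued EXTERNAL_ID_MASK keeps
     * external submitters on even queue indices (see below).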
     *
     * Management
     * ==========
     *
     * The main throughput advantages of work-stealing stem from
     * decentralized control -- workers mostly take tasks from
     * themselves or each other, at rates that can exceed a billion
     * per second.  Most non-atomic control is performed by some form
     * of scanning across or within queues.  The pool itself creates,
     * activates (enables scanning for and running tasks),
     * deactivates, blocks, and terminates threads, all with minimal
     * central information.  There are only a few properties that we
     * can globally track or maintain, so we pack them into a small
     * number of variables, often maintaining atomicity without
     * blocking or locking.  Nearly all essentially atomic control
     * state is held in a few variables that are by far most often
     * read (not written) as status and consistency checks. We pack
     * as much information into them as we can.
     *
     * Field "ctl" contains 64 bits holding information needed to
     * atomically decide to add, enqueue (on an event queue), and
     * dequeue and release workers.  To enable this packing, we
     * restrict maximum parallelism to (1<<15)-1 (which is far in
     * excess of normal operating range) to allow ids, counts, and
     * their negations (used for thresholding) to fit into 16bit
     * subfields.
     *
     * Field "runState" and per-WorkQueue field "phase" play similar
     * roles, as lockable, versioned counters. Field runState also
     * includes monotonic event bits (SHUTDOWN, STOP, and
     * TERMINATED). The version tags enable detection of state
     * changes (by comparing two reads) modulo bit wraparound. The
     * bit range in each case suffices for purposes of determining
     * quiescence, termination, avoiding ABA-like errors, and signal
     * control, most of which are ultimately based on at most 15bit
     * ranges (due to 32767 max total workers). RunState updates do
     * not need to be atomic with respect to ctl updates, but because
     * they are not, some care is required to avoid stalls. The
     * seqLock properties detect changes and conditionally upgrade to
     * coordinate with updates. It is typically held for less than a
     * dozen instructions unless the queue array is being resized,
     * during which contention is rare. To be conservative,
     * lockRunState is implemented as a spin/sleep loop. Here and
     * elsewhere spin constants are short enough to apply even on
     * systems with few available processors.  In addition to
     * checking pool status, reads of runState sometimes serve as
     * acquire fences before reading other fields.
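     *
     * The resulting versioned-read idiom can be sketched
     * (non-normatively; "readFields" stands for whatever group of
     * reads must be mutually consistent) as:
     *
     *   long s;
     *   do {
     *     s = runState;              // also serves as acquire fence
     *     ... readFields ...
     *   } while (((s | runState) & RS_LOCK) != 0L || s != runState);
     *
     * retrying if the lock bit was held at either read or the
     * version changed in between.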
     *
     * Field "parallelism" holds the target parallelism (normally
     * corresponding to pool size). Users can dynamically reset
     * target parallelism, but it is only accessed when signalling or
     * awaiting work, so changes only slowly take effect in creating
     * threads or letting them time out and terminate when idle.
     *
     * Array "queues" holds references to WorkQueues.  It is updated
     * (only during worker creation and termination) under the
     * runState lock. It is otherwise concurrently readable but reads
     * for use in scans (see below) are always prefaced by a volatile
     * read of runState (or equivalent constructions), ensuring that
     * its state is current at the point it is used (which is all we
     * require). To simplify index-based operations, the array size
     * is always a power of two, and all readers must tolerate null
     * slots.  Worker queues are at odd indices. Worker phase ids
     * masked with SMASK match their index. Shared (submission)
     * queues are at even indices. Grouping them together in this way
     * aids in task scanning: At top-level, both kinds of queues
     * should be sampled with approximately the same probability,
     * which is simpler if they are all in the same array. But we
     * also need to identify what kind they are without looking at
     * them, leading to this odd/even scheme. One disadvantage is
     * that there are usually many fewer submission queues, so there
     * can be many wasted probes (null slots). But this is still
     * cheaper than alternatives. Other loops over the queues array
     * vary in origin and stride depending on whether they cover only
     * submission (even) or worker (odd) queues or both, and whether
     * they require randomness (in which case cyclically exhaustive
     * strides may be used).
     *
     * All worker thread creation is on-demand, triggered by task
     * submissions, replacement of terminated workers, and/or
     * compensation for blocked workers. However, all other support
     * code is set up to work with other policies.  To ensure that we
     * do not hold on to worker or task references that would prevent
     * GC, all accesses to workQueues in waiting, signalling, and
     * control methods are via indices into the queues array (which
     * is one source of some of the messy code constructions here).
     * In essence, the queues array serves as a weak reference
     * mechanism. In particular, the stack top subfield of ctl stores
     * indices, not references. Operations on queues obtained from
     * these indices remain valid (with at most some unnecessary
     * extra work) even if an underlying worker failed and was
     * replaced by another at the same index. During termination,
     * worker queue array updates are disabled.
     *
     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
     * cannot let workers spin indefinitely scanning for tasks when
     * none can be found immediately, and we cannot start/resume
     * workers unless there appear to be tasks available.  On the
     * other hand, we must quickly prod them into action when new
     * tasks are submitted or generated. These latencies are mainly a
     * function of JVM park/unpark (and underlying OS) performance,
     * which can be slow and variable (even though usages are
     * streamlined as much as possible).  In many usages, ramp-up
     * time is the main limiting factor in overall performance, which
     * is compounded at program start-up by JIT compilation and
     * allocation. On the other hand, throughput degrades when too
     * many threads poll for too few tasks. (See below.)
     *
     * The "ctl" field atomically maintains total and "released"
     * worker counts, plus the head of the available worker queue
     * (actually stack, represented by the lower 32bit subfield of
     * ctl).  Released workers are those known to be scanning for
     * and/or running tasks (we cannot accurately determine
     * which). Unreleased ("available") workers are recorded in the
     * ctl stack. These workers are made eligible for signalling by
     * enqueuing in ctl (see method deactivate).  This "queue" is a
     * form of Treiber stack. This is ideal for activating threads in
     * most-recently used order, and improves performance and
     * locality, outweighing the disadvantages of being prone to
     * contention and inability to release a worker unless it is
     * topmost on stack.  The top stack state holds the value of the
     * "phase" field of the worker: its index and status, plus a
     * version counter that, in addition to the count subfields (also
     * serving as version stamps), provides protection against
     * Treiber stack ABA effects.
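     *
     * A non-normative sketch of this enqueuing step (a Treiber stack
     * push; field and helper names are approximations of the actual
     * code):
     *
     *   long c; int phase = w.phase | IDLE;     // version + idle status
     *   do {
     *     w.stackPred = (int)(c = ctl);         // link to previous top
     *   } while (!casCtl(c, ((c - RC_UNIT) & UMASK) | (phase & LMASK)));
     *
     * decrementing the released count and installing this worker's
     * versioned phase as the new stack top in one atomic step.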
     *
     * Creating workers. To create a worker, we pre-increment counts
     * (serving as a reservation), and attempt to construct a
     * ForkJoinWorkerThread via its factory. On starting, the new
     * thread first invokes registerWorker, where it is assigned an
     * index in the queues array (expanding the array if necessary).
     * Upon any exception across these steps, or null return from
     * factory, deregisterWorker adjusts counts and records
     * accordingly.  If a null return, the pool continues running
     * with fewer than the target number of workers. If exceptional,
     * the exception is propagated, generally to some external
     * caller.
     *
     * WorkQueue field "phase" encodes the queue array id in lower
     * bits, and otherwise acts similarly to the pool runState field:
     * The "IDLE" bit is clear while active (either a released worker
     * or a locked external queue), with other bits serving as a
     * version counter to distinguish changes across multiple reads.
     * Note that phase field updates lag queue CAS releases; seeing a
     * non-idle phase does not guarantee that the worker is available
     * (and so is never checked in this way).
     *
     * The ctl field also serves as the basis for memory
     * synchronization surrounding activation. This uses a more
     * efficient version of a Dekker-like rule that task producers
     * and consumers sync with each other by both writing/CASing ctl
     * (even if to its current value).  However, rather than CASing
     * ctl to its current value in the common case where no action is
     * required, we reduce write contention by ensuring that
     * signalWork invocations are prefaced with a fully fenced memory
     * access (which is usually needed anyway).
     *
     * Signalling. Signals (in signalWork) cause new or reactivated
     * workers to scan for tasks.  Method signalWork and its callers
     * try to approximate the unattainable goal of having the right
     * number of workers activated for the tasks at hand, but must
     * err on the side of too many workers vs too few to avoid
     * stalls:
     *
     *  * If computations are purely tree structured, it suffices for
     *    every worker to activate another when it pushes a task into
     *    an empty queue, resulting in O(log(#threads)) steps to full
     *    activation. Emptiness must be conservatively approximated
     *    (by checking if there is apparently at most one existing
     *    task), which may result in unnecessary signals.  Also, to
     *    reduce resource usages in some cases, at the expense of
     *    slower startup in others, activation of an idle thread is
     *    preferred over creating a new one, here and elsewhere.
     *
     *  * At the other extreme, if "flat" tasks (those that do not in
     *    turn generate others) come in serially from only a single
     *    producer, each worker taking its first (since the last
     *    activation) task from a queue should propagate a signal if
     *    there are more tasks in that queue. This is equivalent to,
     *    but generally faster than, arranging for the stealer to
     *    take multiple tasks, re-pushing one or more on its own
     *    queue, and signalling (because its queue is empty), also
     *    resulting in logarithmic full activation time.
     *
     *  * Because we don't know about usage patterns (or most
     *    commonly, mixtures), we use both approaches, which present
     *    even more opportunities to over-signal.  (Failure to
     *    distinguish these cases in terms of submission methods was
     *    arguably an early design mistake.)  Note that in either of
     *    these contexts, signals may be (and often are) unnecessary
     *    because active workers continue scanning after running
     *    tasks without the need to be signalled (which is one reason
     *    work stealing is often faster than alternatives), so
     *    additional workers aren't needed.
     *
     *  * For rapidly branching tasks that require full pool
     *    resources, oversignalling is OK, because signalWork will
     *    soon have no more workers to create or reactivate. But for
     *    others (mainly externally submitted tasks),
     *    overprovisioning may cause very noticeable slowdowns due to
     *    contention and resource wastage. We reduce impact by
     *    deactivating workers when queues don't have accessible
     *    tasks, but reactivating and rescanning if other tasks
     *    remain.
     *
     *  * Despite these, signal contention and overhead effects still
     *    occur during ramp-up and ramp-down of small computations.
     *
     * Scanning. Method runWorker performs top-level scanning for
     * (and execution of) tasks by polling a pseudo-random
     * permutation of the array (by starting at a given index, and
     * using a constant cyclically exhaustive stride.)  It uses the
     * same basic polling method as WorkQueue.poll(), but restarts
     * with a different permutation on each invocation.  The
     * pseudorandom generator need not have high-quality statistical
     * properties in the long term. We use Marsaglia XorShifts,
     * seeded with the Weyl sequence from ThreadLocalRandom probes,
     * which are cheap and suffice. Each queue's polling attempts to
     * avoid becoming stuck when other scanners/pollers stall.  Scans
     * do not otherwise explicitly take into account core affinities,
     * loads, cache localities, etc. However, they do exploit
     * temporal locality (which usually approximates these) by
     * preferring to re-poll from the same queue after a successful
     * poll before trying others, which also reduces bookkeeping,
     * cache traffic, and scanning overhead. But it also reduces
     * fairness, which is partially counteracted by giving up on
     * detected interference (which also reduces contention when too
     * many workers try to take small tasks from the same queue).
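     *
     * For illustration (a non-normative sketch; "probe" stands for a
     * per-queue poll attempt), one scan epoch over a queues array of
     * power-of-two length n can be generated as:
     *
     *   r ^= r << 13; r ^= r >>> 17; r ^= r << 5;   // Marsaglia XorShift
     *   int step = (r >>> 16) | 1;                  // odd, so coprime to n
     *   for (int i = 0, k = r; i < n; ++i, k += step)
     *       probe(queues[k & (n - 1)]);
     *
     * an odd step being cyclically exhaustive modulo any power of
     * two, so every queue is visited exactly once per permutation.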
     *
     * Deactivation. When no tasks are found by a worker in
     * runWorker, it tries to deactivate(), giving up (and
     * rescanning) on "ctl" contention. To avoid missed signals
     * during deactivation, the method rescans and reactivates if
     * there may have been a missed signal during deactivation.
     * Because idle workers are often not yet blocked (parked), we
     * use a WorkQueue field to advertise that a waiter actually
     * needs unparking upon signal.
     *
     * Quiescence. Workers scan looking for work, giving up when
     * they don't find any, without being sure that none are
     * available.  However, some required functionality relies on
     * consensus about quiescence (also termination, discussed
     * below). The count fields in ctl allow accurate discovery of
     * states in which all workers are idle. However, because
     * external (asynchronous) submitters are not part of this vote,
     * these mechanisms themselves do not guarantee that the pool is
     * in a quiescent state with respect to methods isQuiescent,
     * shutdown (which begins termination when quiescent),
     * helpQuiesce, and indirectly others including tryCompensate.
     * Method quiescent() is used in all of these contexts. It
     * provides checks that all workers are idle and there are no
     * submissions that they could poll if they were not idle,
     * retrying on inconsistent reads of queues and using the
     * runState seqLock to retry on queue array updates.  (It also
     * reports quiescence if the pool is terminating.) A true report
     * means only that there was a moment at which quiescence held.
     * False negatives are inevitable (for example when queue
     * indices lag updates, as described above), which is
     * accommodated when (tentatively) idle by scanning for work
     * etc, and then re-invoking. This includes cases in which the
     * final unparked thread (in deactivate()) uses quiescent() to
     * check for tasks that could have been added during a race
     * window that would not be accompanied by a signal, in which
     * case it re-activates itself (or any other worker) to rescan.
     * Method helpQuiesce acts similarly but cannot rely on ctl
     * counts to determine that all workers are inactive because the
     * caller and any others executing helpQuiesce are not included
     * in counts.
     *
     * Termination. A call to shutdownNow invokes tryTerminate to
     * atomically set a runState mode bit.  However, the process of
     * termination is intrinsically non-atomic. The calling thread,
     * as well as other workers thereafter terminating, help cancel
     * queued tasks and interrupt other workers. These actions race
     * with unterminated workers.  By default, workers check for
     * termination only when accessing pool state. This may take a
     * while but suffices for structured computational tasks. But
     * not necessarily for others. Class InterruptibleTask (see
     * below) further arranges runState checks before executing task
     * bodies, and ensures interrupts while terminating. Even so,
     * there are no guarantees after an abrupt shutdown that
     * remaining tasks complete normally or exceptionally or are
     * cancelled. Termination may fail to complete if running tasks
     * ignore both task status and interrupts and/or produce more
     * tasks after others that could cancel them have exited.
     *
     * Trimming workers. To release resources after periods of lack
     * of use, a worker starting to wait when the pool is quiescent
     * will time out and terminate if the pool has remained
     * quiescent for the period given by field keepAlive (default
     * 60sec), which applies to the first timeout of a quiescent
     * pool. Subsequent cases use minimal delays such that, if still
     * quiescent, all will be released soon thereafter. This is
     * checked by setting the "source" field of the signallee to an
     * invalid value, that will remain invalid only if it did not
     * process any tasks.
     *
     * Joining Tasks
     * =============
     *
     * The "Join" part of ForkJoinPools consists of a set of
     * mechanisms that sometimes or always (depending on the kind of
     * task) avoid context switching or adding worker threads when
     * one task would otherwise be blocked waiting for completion of
     * another, basically, just by running that task or one of its
     * subtasks if not already taken. These mechanics are disabled
     * for InterruptibleTasks, that guarantee that callers do not
     * execute submitted tasks.
     *
     * The basic structure of joining is an extended spin/block
     * scheme in which workers check for task completion status
     * between steps to find other work, until relevant pool state
     * stabilizes enough to believe that no such tasks are
     * available, at which point they block. This is usually a good
     * choice of when to block that would otherwise be harder to
     * approximate.
     *
     * These forms of helping may increase stack space usage, but
     * that space is bounded in tree/dag structured procedurally
     * parallel designs to be no more than it would be if a task
     * were executed only by the joining thread. This is arranged by
     * associated task subclasses that also help detect and control
     * the ways in which this may occur.
     *
     * Normally, the first option when joining a task that is not
     * done is to try to take it from the local queue and run
     * it. Method tryRemoveAndExec tries to do so.  For tasks with
     * any form of subtasks that must be completed first, we try to
     * locate these subtasks and run them as well. This is easy when
     * local, but when stolen, steal-backs are restricted to the
     * same rules as stealing (polling), which requires additional
     * bookkeeping and scanning. This cost is still very much
     * worthwhile because of its impact on task scheduling and
     * resource control.
     *
     * The two methods for finding and executing subtasks vary in
     * details.  The algorithm in helpJoin entails a form of "linear
     * helping".  Each worker records (in field "source") the index
     * of the internal queue from which it last stole a task. (Note:
     * because chains cannot include even-numbered external queues,
     * they are ignored, and 0 is an OK default. However, the source
     * field is set anyway, or eventually to DROPPED, to ensure
     * volatile memory synchronization effects.) The scan in method
     * helpJoin uses these markers to try to find a worker to help
     * (i.e., steal back a task from and execute it) that could make
     * progress toward completion of the actively joined task.
     * Thus, the joiner executes a task that would be on its own
     * local deque if the to-be-joined task had not been stolen.
     * This is a conservative variant of the approach described in
     * Wagner & Calder "Leapfrogging: a portable technique for
     * implementing efficient futures" SIGPLAN Notices, 1993
     * (http://portal.acm.org/citation.cfm?id=155354). It differs
     * mainly in that we only record queues, not full dependency
     * links.  This requires a linear scan of the queues to locate
     * stealers, but isolates cost to when it is needed, rather than
     * adding to per-task overhead. For CountedCompleters, the
     * analogous method helpComplete doesn't need stealer-tracking,
     * but requires a similar (but simpler) check of completion
     * chains.
     *
     * In either case, searches can fail to locate stealers when
     * stalls delay recording sources or issuing subtasks. We avoid
     * some of these cases by using snapshotted values of ctl as a
     * check that the numbers of workers are not changing, along
     * with rescans to deal with contention and stalls.  But even
     * when accurately identified, stealers might not ever produce a
     * task that the joiner can in turn help with.
     *
     * Related method helpAsyncBlocker does not directly rely on
     * subtask structure, but instead avoids or postpones blocking
     * of tagged tasks (CompletableFuture.AsynchronousCompletionTask)
     * by executing other asyncs that can be processed in any order.
     * This is currently invoked only in non-join-based blocking
     * contexts from classes CompletableFuture and
     * SubmissionPublisher, which could be further generalized.
     *
     * When any of the above fail to avoid blocking, we rely on
     * "compensation" -- an indirect form of context switching that
     * either activates an existing worker to take the place of the
     * blocked one, or expands the number of workers.
     *
     * Compensation does not by default aim to keep exactly the
     * target parallelism number of unblocked threads running at any
     * given time. Some previous versions of this class employed
     * immediate compensations for any blocked join. However, in
     * practice, the vast majority of blockages are transient
     * byproducts of GC and other JVM or OS activities that are made
     * worse by replacement by causing longer-term oversubscription.
     * These are inevitable without (unobtainably) perfect
     * information about whether worker creation is actually
     * necessary.  False alarms are common enough to negatively
     * impact performance, so compensation is by default attempted
     * only when it appears possible that the pool could stall due
     * to lack of any unblocked workers. However, we allow users to
     * override defaults using the long form of the ForkJoinPool
     * constructor. The compensation mechanism may also be bounded.
     * Bounds for the commonPool better enable JVMs to cope with
     * programming errors and abuse before running out of resources
     * to do so.
     *
     * The ManagedBlocker extension API can't use helping so relies
     * only on compensation in method awaitBlocker. This API was
     * designed to highlight the uncertainty of compensation
     * decisions by requiring implementation of method isReleasable
     * to abort compensation during attempts to obtain a stable
     * snapshot. But users now rely upon the fact that if
     * isReleasable always returns false, the API can be used to
     * obtain precautionary compensation, which is sometimes the
     * only reasonable option when running unknown code in tasks;
     * this is now supported more simply (see method
     * beginCompensatedBlock).
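     *
     * As a minimal usage sketch of this API (the class name is
     * invented for illustration):
     *
     *   class LatchBlocker implements ForkJoinPool.ManagedBlocker {
     *     final CountDownLatch latch;
     *     LatchBlocker(CountDownLatch latch) { this.latch = latch; }
     *     public boolean block() throws InterruptedException {
     *       latch.await();                  // actually block
     *       return true;                    // no further blocking needed
     *     }
     *     public boolean isReleasable() {   // avoid blocking if already done
     *       return latch.getCount() == 0L;
     *     }
     *   }
     *
     * used as ForkJoinPool.managedBlock(new LatchBlocker(latch)),
     * which attempts compensation before calling block().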
     *
     * Common Pool
     * ===========
     *
     * The static common pool always exists after static
     * initialization.  Since it (or any other created pool) need
     * never be used, we minimize initial construction overhead and
     * footprint to the setup of about a dozen fields, although with
     * some System property parsing and security processing that
     * takes far longer than the actual construction when
     * SecurityManagers are used or properties are set. The common
     * pool is distinguished by having a null workerNamePrefix
     * (which is an odd convention, but avoids the need to decode
     * status in factory classes). It also has PRESET_SIZE config
     * set if parallelism was configured by system property.
     *
     * When external threads use the common pool, they can perform
     * subtask processing (see helpComplete and related methods)
     * upon joins, unless they are submitted using ExecutorService
     * submission methods, which implicitly disallow this.  This
     * caller-helps policy makes it sensible to set common pool
     * parallelism level to one (or more) less than the total number
     * of available cores, or even zero for pure caller-runs.
     * External threads waiting for joins first check the common
     * pool for their task, which fails quickly if the caller did
     * not fork to common pool.
     *
     * Guarantees for common pool parallelism zero are limited to
     * tasks that are joined by their callers in a tree-structured
     * fashion or use CountedCompleters (as is true for jdk
     * parallelStreams). Support infiltrates several methods,
     * including those that retry helping steps until we are sure
     * that none apply if there are no workers.
     *
     * As a more appropriate default in managed environments, unless
     * overridden by system properties, we use workers of subclass
     * InnocuousForkJoinWorkerThread when there is a SecurityManager
     * present. These workers have no permissions set, do not belong
     * to any user-defined ThreadGroup, and clear all ThreadLocals
     * after executing any top-level task.  The associated mechanics
     * may be JVM-dependent and must access particular Thread class
     * fields to achieve this effect.
     *
     * InterruptibleTasks
     * ==================
     *
     * Regular ForkJoinTasks manage task cancellation (method
     * cancel) independently from the interrupt status of threads
     * running tasks.  Interrupts are issued internally only while
     * terminating, to wake up workers and cancel queued tasks.  By
     * default, interrupts are cleared only when necessary to ensure
     * that calls to LockSupport.park do not loop indefinitely (park
     * returns immediately if the current thread is interrupted).
     *
     * To comply with ExecutorService specs, we use subclasses of
     * abstract class InterruptibleTask for tasks that require
     * stronger interruption and cancellation guarantees.  External
     * submitters never run these tasks, even if in the common pool.
     * InterruptibleTasks include a "runner" field (implemented
     * similarly to FutureTask) to support cancel(true).  Upon pool
     * shutdown, runners are interrupted so they can cancel.
     * Since external joining callers never run these tasks, they
     * must await cancellation by others, which can occur along
     * several different paths.
     *
     * Across these APIs, rules for reporting exceptions for tasks
     * with results accessed via join() differ from those via get(),
     * which differ from those invoked using pool submit methods by
     * non-workers (which comply with Future.get() specs). Internal
     * usages of ForkJoinTasks ignore interrupt status when
     * executing or awaiting completion.  Otherwise, reporting task
     * results or exceptions is preferred to throwing
     * InterruptedExceptions, which are in turn preferred to
     * timeouts. Similarly, completion status is preferred to
     * reporting cancellation.  Cancellation is reported as an
     * unchecked exception by join(), and by worker calls to get(),
     * but is otherwise wrapped in a (checked) ExecutionException.
     *
     * Worker Threads cannot be VirtualThreads, as enforced by
     * requiring ForkJoinWorkerThreads in factories.  There are
     * several constructions relying on this.  However as of this
     * writing, virtual thread bodies are by default run as some
     * form of InterruptibleTask.
     *
     * Memory placement
     * ================
     *
     * Performance is very sensitive to placement of instances of
     * ForkJoinPool and WorkQueues and their queue arrays, as well
     * as the placement of their fields. Cache misses and contention
     * due to false-sharing have been observed to slow down some
     * programs by more than a factor of four. Effects may vary
     * across initial memory configurations, applications, and
     * different garbage collectors and GC settings, so there is no
     * perfect solution. Too much isolation may generate more cache
     * misses in common cases (because some fields and slots are
     * usually read at the same time). The @Contended annotation
     * provides only rough control (for good reason). Similarly for
     * relying on fields being placed in size-sorted declaration
     * order.
     *
     * We isolate the ForkJoinPool.ctl field that otherwise causes
     * the most false-sharing misses with respect to other
     * fields. Also, ForkJoinPool fields are ordered such that
     * fields less prone to contention effects are first, offsetting
     * those that otherwise would be, while also reducing total
     * footprint vs using multiple @Contended regions, which tends
     * to slow down less-contended applications. To help arrange
     * this, some non-reference fields are declared as "long" even
     * when ints or shorts would suffice. For class WorkQueue, an
     * embedded padded region segregates fields (all declared as
     * "int") most heavily updated by owners from those most
     * commonly read by stealers or other management.
     *
     * Initial sizing and resizing of WorkQueue arrays is an even
     * more delicate tradeoff because the best strategy
     * systematically varies across garbage collectors. Small arrays
     * are better for locality and reduce GC scan time, but large
     * arrays reduce both direct false-sharing and indirect cases
     * due to GC bookkeeping (cardmarks etc), and reduce the number
     * of resizes, which are not especially fast because they
     * require atomic transfers.
     * Currently, arrays for workers are initialized to be just
     * large enough to avoid resizing in most tree-structured tasks,
     * but larger for external queues where both false-sharing
     * problems and the need for resizing are more common.
     * (Maintenance note: any changes in fields, queues, or their
     * uses, or JVM layout policies, must be accompanied by
     * re-evaluation of these placement and sizing decisions.)
     *
     * Style notes
     * ===========
     *
     * Memory ordering relies mainly on atomic operations (CAS,
     * getAndSet, getAndAdd) along with moded accesses. These use
     * jdk-internal Unsafe for atomics and special memory modes,
     * rather than VarHandles, to avoid initialization dependencies
     * in other jdk components that require early parallelism.  This
     * can be awkward and ugly, but also reflects the need to
     * control outcomes across the unusual cases that arise in very
     * racy code with very few invariants. All atomic task slot
     * updates use Unsafe operations requiring offset positions, not
     * indices, as computed by method slotOffset. All fields are
     * read into locals before use, and null-checked if they are
     * references, even if they can never be null under current
     * usages. Usually, computations (held in local variables) are
     * defined as soon as logically enabled, sometimes to convince
     * compilers that they may be performed despite memory ordering
     * constraints. Array accesses using masked indices include
     * checks (that are always true) that the array length is
     * non-zero to avoid compilers inserting more expensive traps.
     * This is usually done in a "C"-like style of listing
     * declarations at the heads of methods or blocks, and using
     * inline assignments on first encounter.  Nearly all explicit
     * checks lead to bypass/return, not exception throws, because
     * they may legitimately arise during shutdown. A few unusual
     * loop constructions give JVMs hints (with varying
     * effectiveness) about where (not) to place safepoints.
     *
     * There is a lot of representation-level coupling among classes
     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
     * fields of WorkQueue maintain data structures managed by
     * ForkJoinPool, so are directly accessed.  There is little
     * point trying to reduce this, since any associated future
     * changes in representations will need to be accompanied by
     * algorithmic changes anyway. Several methods intrinsically
     * sprawl because they must accumulate sets of consistent reads
     * of fields held in local variables. Some others are
     * artificially broken up to reduce producer/consumer imbalances
     * due to dynamic compilation. There are also other coding
     * oddities (including several unnecessary-looking hoisted null
     * checks) that help some methods perform reasonably even when
     * interpreted (not compiled).
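     *
     * A non-normative sketch of the resulting slot-access idiom,
     * where "U" is the Unsafe instance, "a" is a task array of
     * length cap, and b is a base index:
     *
     *   int k = (cap - 1) & b;                        // mask index
     *   ForkJoinTask<?> t = (ForkJoinTask<?>)
     *       U.getReferenceAcquire(a, slotOffset(k));  // acquiring read
     *   if (t != null &&
     *       U.compareAndSetReference(a, slotOffset(k), t, null)) {
     *       // took task t; timely index updates follow (see above)
     *   }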
     *
     * The order of declarations in this file is (with a few exceptions):
     * (1) Static configuration constants
     * (2) Static utility functions
     * (3) Nested (static) classes
     * (4) Fields, along with constants used when unpacking some of them
     * (5) Internal control methods
     * (6) Callbacks and other support for ForkJoinTask methods
     * (7) Exported methods
     * (8) Static block initializing statics in minimally dependent order
     *
     */

    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for idle threads
     * to park waiting for new work before terminating.
     */
    static final long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance for idle timeouts, also serving as the
     * minimum allowed timeout value.
     */
    static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for common pool maxSpares.  Overridable using
     * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
     * system property.  The default value is far in excess of normal
     * requirements, but also far short of maximum capacity and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Initial capacity of work-stealing queue array for workers.
     * Must be a power of two, at least 2. See above.
     */
    static final int INITIAL_QUEUE_CAPACITY = 1 << 6;

    /**
     * Initial capacity of work-stealing queue array for external queues.
     * Must be a power of two, at least 2. See above.
     */
    static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;

    // conversions among short, int, long
    static final int  SMASK = 0xffff;      // (unsigned) short bits
    static final long LMASK = 0xffffffffL; // lower 32 bits of long
    static final long UMASK = ~LMASK;      // upper 32 bits

    // masks and sentinels for queue indices
    static final int MAX_CAP          = 0x7fff; // max # workers
    static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id
    static final int INVALID_ID       = 0x4000; // unused external queue id

    // pool.runState bits
    static final long STOP       = 1L << 0;  // terminating
    static final long SHUTDOWN   = 1L << 1;  // terminate when quiescent
    static final long TERMINATED = 1L << 2;  // only set if STOP also set
    static final long RS_LOCK    = 1L << 3;  // lowest seqlock bit

    // spin/sleep limits for runState locking and elsewhere
    static final int SPIN_WAITS = 1 <<  7;   // max calls to onSpinWait
    static final int MIN_SLEEP  = 1 << 10;   // approx 1 usec as nanos
    static final int MAX_SLEEP  = 1 << 20;   // approx 1 sec as nanos

    // {pool, workQueue} config bits
    static final int FIFO        = 1 << 0;   // fifo queue or access mode
    static final int CLEAR_TLS   = 1 << 1;   // set for Innocuous workers
    static final int PRESET_SIZE = 1 << 2;   // size was set by property

    // others
    static final int DROPPED      = 1 << 16; // removed from ctl counts
    static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
    static final int IDLE         = 1 << 16; // phase seqlock/version count

    /*
     * Bits and masks for ctl and bounds are packed with four 16-bit
     * subfields:
     * RC: Number of released (unqueued) workers
     * TC: Number of total workers
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl.  When sp is non-zero,
     * there are waiting workers.  Count fields may be transiently
     * negative during termination because of out-of-order updates.
     * To deal with this, we use casts in and out of "short" and/or
     * signed shifts to maintain signedness. Because it occupies
     * uppermost bits, we can add one release count using getAndAdd
     * of RC_UNIT, rather than CAS, when returning from a blocked
     * join.  Other updates of multiple subfields require CAS.
     */
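    /*
     * For illustration (not actual pool code), the ctl subfields can
     * be unpacked as:
     *
     *   int   sp = (int)c;                  // top waiter's phase; 0 if none
     *   short rc = (short)(c >>> RC_SHIFT); // released count (may be < 0)
     *   short tc = (short)(c >>> TC_SHIFT); // total count (may be < 0)
     *
     * using signed casts so that transiently negative counts still
     * compare correctly against thresholds.
     */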
of waiters 1045 * 1046 * When convenient, we can extract the lower 32 stack top bits 1047 * (including version bits) as sp=(int)ctl. When sp is non-zero, 1048 * there are waiting workers. Count fields may be transiently 1049 * negative during termination because of out-of-order updates. 1050 * To deal with this, we use casts in and out of "short" and/or 1051 * signed shifts to maintain signedness. Because it occupies 1052 * uppermost bits, we can add one release count using getAndAdd of 1053 * RC_UNIT, rather than CAS, when returning from a blocked join. 1054 * Other updates of multiple subfields require CAS. 1055 */ 1056 1057 // Release counts 1058 static final int RC_SHIFT = 48; 1059 static final long RC_UNIT = 0x0001L << RC_SHIFT; 1060 static final long RC_MASK = 0xffffL << RC_SHIFT; 1061 // Total counts 1062 static final int TC_SHIFT = 32; 1063 static final long TC_UNIT = 0x0001L << TC_SHIFT; 1064 static final long TC_MASK = 0xffffL << TC_SHIFT; 1065 1066 /* 1067 * All atomic operations on task arrays (queues) use Unsafe 1068 * operations that take array offsets versus indices, based on 1069 * array base and shift constants established during static 1070 * initialization. 1071 */ 1072 static final long ABASE; 1073 static final int ASHIFT; 1074 1075 // Static utilities 1076 1077 /** 1078 * Returns the array offset corresponding to the given index for 1079 * Unsafe task queue operations 1080 */ 1081 static long slotOffset(int index) { 1082 return ((long)index << ASHIFT) + ABASE; 1083 } 1084 1085 /** 1086 * If there is a security manager, makes sure caller has 1087 * permission to modify threads. 1088 */ 1089 @SuppressWarnings("removal") 1090 private static void checkPermission() { 1091 SecurityManager security; RuntimePermission perm; 1092 if ((security = System.getSecurityManager()) != null) { 1093 if ((perm = modifyThreadPermission) == null) 1094 modifyThreadPermission = perm = // races OK 1095 new RuntimePermission("modifyThread"); 1096 security.checkPermission(perm); 1097 } 1098 } 1099 1100 // Nested classes 1101 1102 /** 1103 * Factory for creating new {@link ForkJoinWorkerThread}s. 1104 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 1105 * for {@code ForkJoinWorkerThread} subclasses that extend base 1106 * functionality or initialize threads with different contexts. 1107 */ 1108 public static interface ForkJoinWorkerThreadFactory { 1109 /** 1110 * Returns a new worker thread operating in the given pool. 1111 * Returning null or throwing an exception may result in tasks 1112 * never being executed. If this method throws an exception, 1113 * it is relayed to the caller of the method (for example 1114 * {@code execute}) causing attempted thread creation. If this 1115 * method returns null or throws an exception, it is not 1116 * retried until the next attempted creation (for example 1117 * another call to {@code execute}). 1118 * 1119 * @param pool the pool this thread works in 1120 * @return the new worker thread, or {@code null} if the request 1121 * to create a thread is rejected 1122 * @throws NullPointerException if the pool is null 1123 */ 1124 public ForkJoinWorkerThread newThread(ForkJoinPool pool); 1125 } 1126 1127 /** 1128 * Default ForkJoinWorkerThreadFactory implementation; creates a 1129 * new ForkJoinWorkerThread using the system class loader as the 1130 * thread context class loader. 
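* By way of contrast, a custom factory need not deal with access
* control at all. A minimal sketch (hypothetical; the class name and
* naming scheme are illustrative only) that merely renames threads
* produced by the default factory:
* <pre> {@code
* class RenamingFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
*   public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
*     ForkJoinWorkerThread t =
*       ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
*     t.setName("renamed-worker-" + t.getPoolIndex());
*     return t;
*   }
* }}</pre>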
1131 */ 1132 static final class DefaultForkJoinWorkerThreadFactory 1133 implements ForkJoinWorkerThreadFactory { 1134 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 1135 boolean isCommon = (pool.workerNamePrefix == null); 1136 @SuppressWarnings("removal") 1137 SecurityManager sm = System.getSecurityManager(); 1138 if (sm != null && isCommon) 1139 return newCommonWithACC(pool); 1140 else 1141 return newRegularWithACC(pool); 1142 } 1143 1144 /* 1145 * Create and use static AccessControlContexts only if there 1146 * is a SecurityManager. (These can be removed if/when 1147 * SecurityManagers are removed from platform.) The ACCs are 1148 * immutable and equivalent even when racily initialized, so 1149 * they don't require locking, although with the chance of 1150 * needlessly duplicate construction. 1151 */ 1152 @SuppressWarnings("removal") 1153 static volatile AccessControlContext regularACC, commonACC; 1154 1155 @SuppressWarnings("removal") 1156 static ForkJoinWorkerThread newRegularWithACC(ForkJoinPool pool) { 1157 AccessControlContext acc = regularACC; 1158 if (acc == null) { 1159 Permissions ps = new Permissions(); 1160 ps.add(new RuntimePermission("getClassLoader")); 1161 ps.add(new RuntimePermission("setContextClassLoader")); 1162 regularACC = acc = 1163 new AccessControlContext(new ProtectionDomain[] { 1164 new ProtectionDomain(null, ps) }); 1165 } 1166 return AccessController.doPrivileged( 1167 new PrivilegedAction<>() { 1168 public ForkJoinWorkerThread run() { 1169 return new ForkJoinWorkerThread(null, pool, true, false); 1170 }}, acc); 1171 } 1172 1173 @SuppressWarnings("removal") 1174 static ForkJoinWorkerThread newCommonWithACC(ForkJoinPool pool) { 1175 AccessControlContext acc = commonACC; 1176 if (acc == null) { 1177 Permissions ps = new Permissions(); 1178 ps.add(new RuntimePermission("getClassLoader")); 1179 ps.add(new RuntimePermission("setContextClassLoader")); 1180 ps.add(new RuntimePermission("modifyThread")); 1181 ps.add(new RuntimePermission("enableContextClassLoaderOverride")); 1182 ps.add(new RuntimePermission("modifyThreadGroup")); 1183 commonACC = acc = 1184 new AccessControlContext(new ProtectionDomain[] { 1185 new ProtectionDomain(null, ps) }); 1186 } 1187 return AccessController.doPrivileged( 1188 new PrivilegedAction<>() { 1189 public ForkJoinWorkerThread run() { 1190 return new ForkJoinWorkerThread. 1191 InnocuousForkJoinWorkerThread(pool); 1192 }}, acc); 1193 } 1194 } 1195 1196 /** 1197 * Queues supporting work-stealing as well as external task 1198 * submission. See above for descriptions and algorithms. 
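*
* As a rough mental model only (not the actual accessors): the owner
* pushes and pops at {@code top} while other threads poll at {@code
* base}, all indices taken modulo the power-of-two array length:
* <pre> {@code
* // owner push: array[top & (cap - 1)] = task; top++;
* // owner pop:  task = array[(top - 1) & (cap - 1)]; top--;
* // thief poll: task = array[base & (cap - 1)]; base++;
* }</pre>
* The code below layers fences, CASes, locking for external queues,
* and resizing on top of this skeleton.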
1199 */ 1200 static final class WorkQueue { 1201 // fields declared in order of their likely layout on most VMs 1202 final ForkJoinWorkerThread owner; // null if shared 1203 ForkJoinTask<?>[] array; // the queued tasks; power of 2 size 1204 int base; // index of next slot for poll 1205 final int config; // mode bits 1206 1207 // fields otherwise causing more unnecessary false-sharing cache misses 1208 @jdk.internal.vm.annotation.Contended("w") 1209 int top; // index of next slot for push 1210 @jdk.internal.vm.annotation.Contended("w") 1211 volatile int phase; // versioned active status 1212 @jdk.internal.vm.annotation.Contended("w") 1213 int stackPred; // pool stack (ctl) predecessor link 1214 @jdk.internal.vm.annotation.Contended("w") 1215 volatile int source; // source queue id (or DROPPED) 1216 @jdk.internal.vm.annotation.Contended("w") 1217 int nsteals; // number of steals from other queues 1218 @jdk.internal.vm.annotation.Contended("w") 1219 volatile int parking; // nonzero if parked in awaitWork 1220 1221 // Support for atomic operations 1222 private static final Unsafe U; 1223 private static final long PHASE; 1224 private static final long BASE; 1225 private static final long TOP; 1226 private static final long ARRAY; 1227 1228 final void updateBase(int v) { 1229 U.putIntVolatile(this, BASE, v); 1230 } 1231 final void updateTop(int v) { 1232 U.putIntOpaque(this, TOP, v); 1233 } 1234 final void updateArray(ForkJoinTask<?>[] a) { 1235 U.getAndSetReference(this, ARRAY, a); 1236 } 1237 final void unlockPhase() { 1238 U.getAndAddInt(this, PHASE, IDLE); 1239 } 1240 final boolean tryLockPhase() { // seqlock acquire 1241 int p; 1242 return (((p = phase) & IDLE) != 0 && 1243 U.compareAndSetInt(this, PHASE, p, p + IDLE)); 1244 } 1245 1246 /** 1247 * Constructor. For internal queues, most fields are initialized 1248 * upon thread start in pool.registerWorker. 1249 */ 1250 WorkQueue(ForkJoinWorkerThread owner, int id, int cfg, 1251 boolean clearThreadLocals) { 1252 array = new ForkJoinTask<?>[owner == null ? 1253 INITIAL_EXTERNAL_QUEUE_CAPACITY : 1254 INITIAL_QUEUE_CAPACITY]; 1255 this.owner = owner; 1256 this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg; 1257 } 1258 1259 /** 1260 * Returns an exportable index (used by ForkJoinWorkerThread). 1261 */ 1262 final int getPoolIndex() { 1263 return (phase & 0xffff) >>> 1; // ignore odd/even tag bit 1264 } 1265 1266 /** 1267 * Returns the approximate number of tasks in the queue. 1268 */ 1269 final int queueSize() { 1270 int unused = phase; // for ordering effect 1271 return Math.max(top - base, 0); // ignore transient negative 1272 } 1273 1274 /** 1275 * Pushes a task. Called only by owner or if already locked 1276 * 1277 * @param task the task. Caller must ensure non-null. 
1278 * @param pool the pool to signal if was previously empty, else null 1279 * @param internal if caller owns this queue 1280 * @throws RejectedExecutionException if array could not be resized 1281 */ 1282 final void push(ForkJoinTask<?> task, ForkJoinPool pool, 1283 boolean internal) { 1284 int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a; 1285 if ((a = array) != null && (cap = a.length) > 0) { // else disabled 1286 if ((room = (m = cap - 1) - (s - b)) >= 0) { 1287 top = s + 1; 1288 long pos = slotOffset(m & s); 1289 if (!internal) 1290 U.putReference(a, pos, task); // inside lock 1291 else 1292 U.getAndSetReference(a, pos, task); // fully fenced 1293 if (room == 0) // resize 1294 growArray(a, cap, s); 1295 } 1296 if (!internal) 1297 unlockPhase(); 1298 if (room < 0) 1299 throw new RejectedExecutionException("Queue capacity exceeded"); 1300 else if ((room == 0 || 1301 a[m & (s - 2)] == null) && // at most one existing task 1302 pool != null) 1303 pool.signalWork(); 1304 } 1305 } 1306 1307 /** 1308 * Resizes the queue array unless out of memory. 1309 * @param a old array 1310 * @param cap old array capacity 1311 * @param s current top 1312 */ 1313 private void growArray(ForkJoinTask<?>[] a, int cap, int s) { 1314 int newCap = cap << 1; 1315 if (a != null && a.length == cap && cap > 0 && newCap > 0) { 1316 ForkJoinTask<?>[] newArray = null; 1317 try { 1318 newArray = new ForkJoinTask<?>[newCap]; 1319 } catch (OutOfMemoryError ex) { 1320 } 1321 if (newArray != null) { // else throw on next push 1322 int mask = cap - 1, newMask = newCap - 1; 1323 for (int k = s, j = cap; j > 0; --j, --k) { 1324 ForkJoinTask<?> u; // poll old, push to new 1325 if ((u = (ForkJoinTask<?>)U.getAndSetReference( 1326 a, slotOffset(k & mask), null)) == null) 1327 break; // lost to pollers 1328 newArray[k & newMask] = u; 1329 } 1330 updateArray(newArray); // fully fenced 1331 } 1332 } 1333 } 1334 1335 /** 1336 * Takes next task, if one exists, in order specified by mode, 1337 * so acts as either local-pop or local-poll. Called only by owner. 1338 * @param fifo nonzero if FIFO mode 1339 */ 1340 private ForkJoinTask<?> nextLocalTask(int fifo) { 1341 ForkJoinTask<?> t = null; 1342 ForkJoinTask<?>[] a = array; 1343 int b = base, p = top, cap; 1344 if (p - b > 0 && a != null && (cap = a.length) > 0) { 1345 for (int m = cap - 1, s, nb;;) { 1346 if (fifo == 0 || (nb = b + 1) == p) { 1347 if ((t = (ForkJoinTask<?>)U.getAndSetReference( 1348 a, slotOffset(m & (s = p - 1)), null)) != null) 1349 updateTop(s); // else lost race for only task 1350 break; 1351 } 1352 if ((t = (ForkJoinTask<?>)U.getAndSetReference( 1353 a, slotOffset(m & b), null)) != null) { 1354 updateBase(nb); 1355 break; 1356 } 1357 while (b == (b = base)) { 1358 U.loadFence(); 1359 Thread.onSpinWait(); // spin to reduce memory traffic 1360 } 1361 if (p - b <= 0) 1362 break; 1363 } 1364 } 1365 return t; 1366 } 1367 1368 /** 1369 * Takes next task, if one exists, using configured mode. 1370 * (Always internal, never called for Common pool.) 1371 */ 1372 final ForkJoinTask<?> nextLocalTask() { 1373 return nextLocalTask(config & FIFO); 1374 } 1375 1376 /** 1377 * Pops the given task only if it is at the current top. 1378 * @param task the task. Caller must ensure non-null. 
1379 * @param internal if caller owns this queue 1380 */ 1381 final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) { 1382 boolean taken = false; 1383 ForkJoinTask<?>[] a = array; 1384 int p = top, s = p - 1, cap, k; 1385 if (a != null && (cap = a.length) > 0 && 1386 a[k = (cap - 1) & s] == task && 1387 (internal || tryLockPhase())) { 1388 if (top == p && 1389 U.compareAndSetReference(a, slotOffset(k), task, null)) { 1390 taken = true; 1391 updateTop(s); 1392 } 1393 if (!internal) 1394 unlockPhase(); 1395 } 1396 return taken; 1397 } 1398 1399 /** 1400 * Returns next task, if one exists, in order specified by mode. 1401 */ 1402 final ForkJoinTask<?> peek() { 1403 ForkJoinTask<?>[] a = array; 1404 int b = base, cfg = config, p = top, cap; 1405 if (p != b && a != null && (cap = a.length) > 0) { 1406 if ((cfg & FIFO) == 0) 1407 return a[(cap - 1) & (p - 1)]; 1408 else { // skip over in-progress removals 1409 ForkJoinTask<?> t; 1410 for ( ; p - b > 0; ++b) { 1411 if ((t = a[(cap - 1) & b]) != null) 1412 return t; 1413 } 1414 } 1415 } 1416 return null; 1417 } 1418 1419 /** 1420 * Polls for a task. Used only by non-owners. 1421 */ 1422 final ForkJoinTask<?> poll() { 1423 for (int b = base;;) { 1424 int cap, k, nb; ForkJoinTask<?>[] a; 1425 if ((a = array) == null || (cap = a.length) <= 0) 1426 break; 1427 long kp = slotOffset(k = (cap - 1) & b); 1428 int nk = (nb = b + 1) & (cap - 1); // next slot 1429 int sk = (b + 2) & (cap - 1); // 2nd slot ahead 1430 ForkJoinTask<?> t = a[k]; 1431 U.loadFence(); 1432 if (b == (b = base)) { // else inconsistent 1433 if (t != null) { 1434 if (U.compareAndSetReference(a, kp, t, null)) { 1435 updateBase(nb); 1436 return t; 1437 } 1438 b = base; 1439 } 1440 else if (a[sk] == null && a[nk] == null && a[k] == null) { 1441 if (top - b <= 0) 1442 break; // empty 1443 Thread.onSpinWait(); // stalled 1444 } 1445 } 1446 } 1447 return null; 1448 } 1449 1450 // specialized execution methods 1451 1452 /** 1453 * Runs the given task, as well as remaining local tasks. 1454 */ 1455 final void topLevelExec(ForkJoinTask<?> task, int cfg) { 1456 int fifo = cfg & FIFO; 1457 while (task != null) { 1458 task.doExec(); 1459 task = nextLocalTask(fifo); 1460 } 1461 if ((cfg & CLEAR_TLS) != 0) 1462 ThreadLocalRandom.eraseThreadLocals(Thread.currentThread()); 1463 } 1464 1465 /** 1466 * Deep form of tryUnpush: Traverses from top and removes and 1467 * runs task if present. 1468 */ 1469 final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) { 1470 ForkJoinTask<?>[] a = array; 1471 int b = base, p = top, s = p - 1, d = p - b, cap; 1472 if (a != null && (cap = a.length) > 0) { 1473 for (int m = cap - 1, i = s; d > 0; --i, --d) { 1474 ForkJoinTask<?> t; int k; boolean taken; 1475 if ((t = a[k = i & m]) == null) 1476 break; 1477 if (t == task) { 1478 long pos = slotOffset(k); 1479 if (!internal && !tryLockPhase()) 1480 break; // fail if locked 1481 if (taken = 1482 (top == p && 1483 U.compareAndSetReference(a, pos, task, null))) { 1484 if (i == s) // act as pop 1485 updateTop(s); 1486 else if (i == base) // act as poll 1487 updateBase(i + 1); 1488 else { // swap with top 1489 U.putReferenceVolatile( 1490 a, pos, (ForkJoinTask<?>) 1491 U.getAndSetReference( 1492 a, slotOffset(s & m), null)); 1493 updateTop(s); 1494 } 1495 } 1496 if (!internal) 1497 unlockPhase(); 1498 if (taken) 1499 task.doExec(); 1500 break; 1501 } 1502 } 1503 } 1504 } 1505 1506 /** 1507 * Tries to pop and run tasks within the target's computation 1508 * until done, not found, or limit exceeded. 
1509 * 1510 * @param task root of computation 1511 * @param limit max runs, or zero for no limit 1512 * @return task status if known to be done 1513 */ 1514 final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) { 1515 int status = 0; 1516 if (task != null) { 1517 outer: for (;;) { 1518 ForkJoinTask<?>[] a; ForkJoinTask<?> t; boolean taken; 1519 int stat, p, s, cap, k; 1520 if ((stat = task.status) < 0) { 1521 status = stat; 1522 break; 1523 } 1524 if ((a = array) == null || (cap = a.length) <= 0) 1525 break; 1526 if ((t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null) 1527 break; 1528 if (!(t instanceof CountedCompleter)) 1529 break; 1530 CountedCompleter<?> f = (CountedCompleter<?>)t; 1531 for (int steps = cap;;) { // bound path 1532 if (f == task) 1533 break; 1534 if ((f = f.completer) == null || --steps == 0) 1535 break outer; 1536 } 1537 if (!internal && !tryLockPhase()) 1538 break; 1539 if (taken = 1540 (top == p && 1541 U.compareAndSetReference(a, slotOffset(k), t, null))) 1542 updateTop(s); 1543 if (!internal) 1544 unlockPhase(); 1545 if (!taken) 1546 break; 1547 t.doExec(); 1548 if (limit != 0 && --limit == 0) 1549 break; 1550 } 1551 } 1552 return status; 1553 } 1554 1555 /** 1556 * Tries to poll and run AsynchronousCompletionTasks until 1557 * none found or blocker is released 1558 * 1559 * @param blocker the blocker 1560 */ 1561 final void helpAsyncBlocker(ManagedBlocker blocker) { 1562 for (;;) { 1563 ForkJoinTask<?>[] a; int b, cap, k; 1564 if ((a = array) == null || (cap = a.length) <= 0) 1565 break; 1566 ForkJoinTask<?> t = a[k = (b = base) & (cap - 1)]; 1567 U.loadFence(); 1568 if (t == null) { 1569 if (top - b <= 0) 1570 break; 1571 } 1572 else if (!(t instanceof CompletableFuture 1573 .AsynchronousCompletionTask)) 1574 break; 1575 if (blocker != null && blocker.isReleasable()) 1576 break; 1577 if (base == b && t != null && 1578 U.compareAndSetReference(a, slotOffset(k), t, null)) { 1579 updateBase(b + 1); 1580 t.doExec(); 1581 } 1582 } 1583 } 1584 1585 // misc 1586 1587 /** 1588 * Returns true if internal and not known to be blocked. 1589 */ 1590 final boolean isApparentlyUnblocked() { 1591 Thread wt; Thread.State s; 1592 return ((wt = owner) != null && (phase & IDLE) != 0 && 1593 (s = wt.getState()) != Thread.State.BLOCKED && 1594 s != Thread.State.WAITING && 1595 s != Thread.State.TIMED_WAITING); 1596 } 1597 1598 static { 1599 U = Unsafe.getUnsafe(); 1600 Class<WorkQueue> klass = WorkQueue.class; 1601 PHASE = U.objectFieldOffset(klass, "phase"); 1602 BASE = U.objectFieldOffset(klass, "base"); 1603 TOP = U.objectFieldOffset(klass, "top"); 1604 ARRAY = U.objectFieldOffset(klass, "array"); 1605 } 1606 } 1607 1608 // static fields (initialized in static initializer below) 1609 1610 /** 1611 * Creates a new ForkJoinWorkerThread. This factory is used unless 1612 * overridden in ForkJoinPool constructors. 1613 */ 1614 public static final ForkJoinWorkerThreadFactory 1615 defaultForkJoinWorkerThreadFactory; 1616 1617 /** 1618 * Common (static) pool. Non-null for public use unless a static 1619 * construction exception, but internal usages null-check on use 1620 * to paranoically avoid potential initialization circularities 1621 * as well as to simplify generated code. 1622 */ 1623 static final ForkJoinPool common; 1624 1625 /** 1626 * Sequence number for creating worker names 1627 */ 1628 private static volatile int poolIds; 1629 1630 /** 1631 * Permission required for callers of methods that may start or 1632 * kill threads. Lazily constructed. 
1633 */ 1634 static volatile RuntimePermission modifyThreadPermission; 1635 1636 // fields declared in order of their likely layout on most VMs 1637 volatile CountDownLatch termination; // lazily constructed 1638 final Predicate<? super ForkJoinPool> saturate; 1639 final ForkJoinWorkerThreadFactory factory; 1640 final UncaughtExceptionHandler ueh; // per-worker UEH 1641 final SharedThreadContainer container; 1642 final String workerNamePrefix; // null for common pool 1643 WorkQueue[] queues; // main registry 1644 volatile long runState; // versioned, lockable 1645 final long keepAlive; // milliseconds before dropping if idle 1646 final long config; // static configuration bits 1647 volatile long stealCount; // collects worker nsteals 1648 volatile long threadIds; // for worker thread names 1649 @jdk.internal.vm.annotation.Contended("fjpctl") // segregate 1650 volatile long ctl; // main pool control 1651 @jdk.internal.vm.annotation.Contended("fjpctl") // colocate 1652 int parallelism; // target number of workers 1653 1654 // Support for atomic operations 1655 private static final Unsafe U; 1656 private static final long CTL; 1657 private static final long RUNSTATE; 1658 private static final long PARALLELISM; 1659 private static final long THREADIDS; 1660 private static final long TERMINATION; 1661 private static final Object POOLIDS_BASE; 1662 private static final long POOLIDS; 1663 1664 private boolean compareAndSetCtl(long c, long v) { 1665 return U.compareAndSetLong(this, CTL, c, v); 1666 } 1667 private long compareAndExchangeCtl(long c, long v) { 1668 return U.compareAndExchangeLong(this, CTL, c, v); 1669 } 1670 private long getAndAddCtl(long v) { 1671 return U.getAndAddLong(this, CTL, v); 1672 } 1673 private long incrementThreadIds() { 1674 return U.getAndAddLong(this, THREADIDS, 1L); 1675 } 1676 private static int getAndAddPoolIds(int x) { 1677 return U.getAndAddInt(POOLIDS_BASE, POOLIDS, x); 1678 } 1679 private int getAndSetParallelism(int v) { 1680 return U.getAndSetInt(this, PARALLELISM, v); 1681 } 1682 private int getParallelismOpaque() { 1683 return U.getIntOpaque(this, PARALLELISM); 1684 } 1685 private CountDownLatch cmpExTerminationSignal(CountDownLatch x) { 1686 return (CountDownLatch) 1687 U.compareAndExchangeReference(this, TERMINATION, null, x); 1688 } 1689 1690 // runState operations 1691 1692 private long getAndBitwiseOrRunState(long v) { // for status bits 1693 return U.getAndBitwiseOrLong(this, RUNSTATE, v); 1694 } 1695 private boolean casRunState(long c, long v) { 1696 return U.compareAndSetLong(this, RUNSTATE, c, v); 1697 } 1698 private void unlockRunState() { // increment lock bit 1699 U.getAndAddLong(this, RUNSTATE, RS_LOCK); 1700 } 1701 private long lockRunState() { // lock and return current state 1702 long s, u; // locked when RS_LOCK set 1703 if (((s = runState) & RS_LOCK) == 0L && casRunState(s, u = s + RS_LOCK)) 1704 return u; 1705 else 1706 return spinLockRunState(); 1707 } 1708 private long spinLockRunState() { // spin/sleep 1709 for (int waits = 0;;) { 1710 long s, u; 1711 if (((s = runState) & RS_LOCK) == 0L) { 1712 if (casRunState(s, u = s + RS_LOCK)) 1713 return u; 1714 waits = 0; 1715 } else if (waits < SPIN_WAITS) { 1716 ++waits; 1717 Thread.onSpinWait(); 1718 } else { 1719 if (waits < MIN_SLEEP) 1720 waits = MIN_SLEEP; 1721 LockSupport.parkNanos(this, (long)waits); 1722 if (waits < MAX_SLEEP) 1723 waits <<= 1; 1724 } 1725 } 1726 } 1727 1728 static boolean poolIsStopping(ForkJoinPool p) { // Used by ForkJoinTask 1729 return p != null && (p.runState & 
STOP) != 0L; 1730 } 1731 1732 // Creating, registering, and deregistering workers 1733 1734 /** 1735 * Tries to construct and start one worker. Assumes that total 1736 * count has already been incremented as a reservation. Invokes 1737 * deregisterWorker on any failure. 1738 * 1739 * @return true if successful 1740 */ 1741 private boolean createWorker() { 1742 ForkJoinWorkerThreadFactory fac = factory; 1743 SharedThreadContainer ctr = container; 1744 Throwable ex = null; 1745 ForkJoinWorkerThread wt = null; 1746 try { 1747 if ((runState & STOP) == 0L && // avoid construction if terminating 1748 fac != null && (wt = fac.newThread(this)) != null) { 1749 if (ctr != null) 1750 ctr.start(wt); 1751 else 1752 wt.start(); 1753 return true; 1754 } 1755 } catch (Throwable rex) { 1756 ex = rex; 1757 } 1758 deregisterWorker(wt, ex); 1759 return false; 1760 } 1761 1762 /** 1763 * Provides a name for ForkJoinWorkerThread constructor. 1764 */ 1765 final String nextWorkerThreadName() { 1766 String prefix = workerNamePrefix; 1767 long tid = incrementThreadIds() + 1L; 1768 if (prefix == null) // commonPool has no prefix 1769 prefix = "ForkJoinPool.commonPool-worker-"; 1770 return prefix.concat(Long.toString(tid)); 1771 } 1772 1773 /** 1774 * Finishes initializing and records internal queue. 1775 * 1776 * @param w caller's WorkQueue 1777 */ 1778 final void registerWorker(WorkQueue w) { 1779 if (w != null) { 1780 ThreadLocalRandom.localInit(); 1781 int seed = w.stackPred = ThreadLocalRandom.getProbe(); 1782 int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag 1783 int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan 1784 long stop = lockRunState() & STOP; 1785 try { 1786 WorkQueue[] qs; int n; 1787 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) { 1788 for (int k = n, m = n - 1; ; id += 2) { 1789 if (qs[id &= m] == null) 1790 break; 1791 if ((k -= 2) <= 0) { 1792 id |= n; 1793 break; 1794 } 1795 } 1796 w.phase = id | phaseSeq; // now publishable 1797 if (id < n) 1798 qs[id] = w; 1799 else { // expand 1800 int an = n << 1, am = an - 1; 1801 WorkQueue[] as = new WorkQueue[an]; 1802 as[id & am] = w; 1803 for (int j = 1; j < n; j += 2) 1804 as[j] = qs[j]; 1805 for (int j = 0; j < n; j += 2) { 1806 WorkQueue q; // shared queues may move 1807 if ((q = qs[j]) != null) 1808 as[q.phase & EXTERNAL_ID_MASK & am] = q; 1809 } 1810 U.storeFence(); // fill before publish 1811 queues = as; 1812 } 1813 } 1814 } finally { 1815 unlockRunState(); 1816 } 1817 } 1818 } 1819 1820 /** 1821 * Final callback from terminating worker, as well as upon failure 1822 * to construct or start a worker. Removes record of worker from 1823 * array, and adjusts counts. If pool is shutting down, tries to 1824 * complete termination. 
1825 * 1826 * @param wt the worker thread, or null if construction failed 1827 * @param ex the exception causing failure, or null if none 1828 */ 1829 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { 1830 if ((runState & STOP) != 0L) // ensure released 1831 releaseAll(); 1832 WorkQueue w = null; 1833 int src = 0, phase = 0; 1834 boolean replaceable = false; 1835 if (wt != null && (w = wt.workQueue) != null) { 1836 phase = w.phase; 1837 if ((src = w.source) != DROPPED) { 1838 w.source = DROPPED; // else already dropped 1839 if (phase != 0) { // else failed to start 1840 replaceable = true; 1841 if (w.top - w.base > 0) { 1842 ForkJoinTask<?> t; // cancel remaining tasks 1843 while ((t = w.nextLocalTask()) != null) { 1844 try { 1845 t.cancel(false); 1846 } catch (Throwable ignore) { 1847 } 1848 } 1849 } 1850 } 1851 } 1852 } 1853 if (src != DROPPED) { // decrement counts 1854 long c = ctl; 1855 do {} while (c != (c = compareAndExchangeCtl( 1856 c, ((RC_MASK & (c - RC_UNIT)) | 1857 (TC_MASK & (c - TC_UNIT)) | 1858 (LMASK & c))))); 1859 } 1860 if ((tryTerminate(false, false) & STOP) == 0L && w != null) { 1861 WorkQueue[] qs; int n, i; // remove index unless terminating 1862 long ns = w.nsteals & 0xffffffffL; 1863 if ((lockRunState() & STOP) != 0L) 1864 replaceable = false; 1865 else if ((qs = queues) != null && (n = qs.length) > 0 && 1866 qs[i = phase & SMASK & (n - 1)] == w) { 1867 qs[i] = null; 1868 stealCount += ns; // accumulate steals 1869 } 1870 unlockRunState(); 1871 if (replaceable) 1872 signalWork(); 1873 } 1874 if (ex != null) 1875 ForkJoinTask.rethrow(ex); 1876 } 1877 1878 /** 1879 * Releases an idle worker, or creates one if not enough exist. 1880 */ 1881 final void signalWork() { 1882 int pc = parallelism; 1883 for (long c = ctl;;) { 1884 WorkQueue[] qs = queues; 1885 long ac = (c + RC_UNIT) & RC_MASK, nc; 1886 int sp = (int)c, i = sp & SMASK; 1887 if ((short)(c >>> RC_SHIFT) >= pc) 1888 break; 1889 if (qs == null) 1890 break; 1891 if (qs.length <= i) 1892 break; 1893 WorkQueue w = qs[i], v = null; 1894 if (sp == 0) { 1895 if ((short)(c >>> TC_SHIFT) >= pc) 1896 break; 1897 nc = ((c + TC_UNIT) & TC_MASK); 1898 } 1899 else if ((v = w) == null) 1900 break; 1901 else 1902 nc = (v.stackPred & LMASK) | (c & TC_MASK); 1903 if (c == (c = compareAndExchangeCtl(c, nc | ac))) { 1904 if (v == null) 1905 createWorker(); 1906 else { 1907 v.phase = sp; 1908 if (v.parking != 0) 1909 U.unpark(v.owner); 1910 } 1911 break; 1912 } 1913 } 1914 } 1915 1916 /** 1917 * Releases all waiting workers. Called only during shutdown. 1918 * 1919 * @return current ctl 1920 */ 1921 private long releaseAll() { 1922 long c = ctl; 1923 for (;;) { 1924 WorkQueue[] qs; WorkQueue v; int sp, i; 1925 if ((sp = (int)c) == 0 || (qs = queues) == null || 1926 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null) 1927 break; 1928 if (c == (c = compareAndExchangeCtl( 1929 c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) | 1930 (v.stackPred & LMASK))))) { 1931 v.phase = sp; 1932 if (v.parking != 0) 1933 U.unpark(v.owner); 1934 } 1935 } 1936 return c; 1937 } 1938 1939 /** 1940 * Internal version of isQuiescent and related functionality. 
1941 * @return positive if stopping, nonnegative if terminating or all 1942 * workers are inactive and submission queues are empty and 1943 * unlocked; if so, setting STOP if shutdown is enabled 1944 */ 1945 private int quiescent() { 1946 outer: for (;;) { 1947 long phaseSum = 0L; 1948 boolean swept = false; 1949 for (long e, prevRunState = 0L; ; prevRunState = e) { 1950 long c = ctl; 1951 if (((e = runState) & STOP) != 0L) 1952 return 1; // terminating 1953 else if ((c & RC_MASK) > 0L) 1954 return -1; // at least one active 1955 else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) { 1956 long sum = c; 1957 WorkQueue[] qs = queues; 1958 int n = (qs == null) ? 0 : qs.length; 1959 for (int i = 0; i < n; ++i) { // scan queues 1960 WorkQueue q; 1961 if ((q = qs[i]) != null) { 1962 int p = q.phase, s = q.top, b = q.base; 1963 sum += (p & 0xffffffffL) | ((long)b << 32); 1964 if ((p & IDLE) == 0 || s - b > 0) 1965 return -1; 1966 } 1967 } 1968 swept = (phaseSum == (phaseSum = sum)); 1969 } 1970 else if ((e & SHUTDOWN) == 0) 1971 return 0; 1972 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) 1973 return 1; // enable termination 1974 else 1975 break; // restart 1976 } 1977 } 1978 } 1979 1980 /** 1981 * Top-level runloop for workers, called by ForkJoinWorkerThread.run. 1982 * See above for explanation. 1983 * 1984 * @param w caller's WorkQueue (may be null on failed initialization) 1985 */ 1986 final void runWorker(WorkQueue w) { 1987 if (w != null) { 1988 int phase = w.phase, r = w.stackPred; // seed from registerWorker 1989 int cfg = w.config, src = -1, nsteals = 0; 1990 rescan: for (boolean scanned = false;;) { 1991 WorkQueue[] qs; 1992 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift 1993 if ((runState & STOP) != 0L || (qs = queues) == null) 1994 return; 1995 int n = qs.length, i = r, step = (r >>> 16) | 1; 1996 for (int l = n; l > 0; --l, i += step) { // scan queues 1997 int j; WorkQueue q; 1998 if ((q = qs[j = i & (n - 1)]) != null) { 1999 boolean taken = false; 2000 for (int pb = -1, b = q.base;;) { 2001 int cap, k, nb; ForkJoinTask<?>[] a; 2002 if ((a = q.array) == null || (cap = a.length) <= 0) 2003 continue rescan; 2004 long kp = slotOffset(k = (cap - 1) & b); 2005 int nk = (nb = b + 1) & (cap - 1); // next slot 2006 int sk = (b + 2) & (cap - 1); // 2nd slot ahead 2007 ForkJoinTask<?> t = a[k]; 2008 U.loadFence(); 2009 if (b != (b = q.base)) 2010 ; // inconsistent 2011 else if (t == null) { // possibly empty 2012 if (a[sk] == null && a[nk] == null && 2013 a[k] == null) { // screen 2014 if (q.top - b > 0) { // stalled 2015 if (!taken) // move unless taking 2016 continue rescan; 2017 } 2018 else if (taken) 2019 continue rescan; // depleted; restart 2020 else 2021 break; // empty 2022 } 2023 if (pb == (pb = b)) // base unchanged 2024 Thread.onSpinWait(); 2025 } 2026 else if (!U.compareAndSetReference(a, kp, t, null)) 2027 b = q.base; // contended 2028 else { 2029 q.base = nb; 2030 w.nsteals = ++nsteals; 2031 w.source = j; // volatile write 2032 if (taken != (taken = true) && a[nk] != null) 2033 signalWork(); // propagate signal 2034 w.topLevelExec(t, cfg); 2035 if ((b = q.base) != nb && src != (src = j)) 2036 continue rescan; // reduce interference 2037 } 2038 } 2039 } 2040 } 2041 if (!scanned) 2042 scanned = true; // rescan before deactivate 2043 else if (((phase = deactivate(w, r, phase)) & IDLE) == 0) 2044 scanned = false; 2045 else 2046 return; 2047 } 2048 } 2049 } 2050 2051 /** 2052 * Deactivates and if necessary awaits signal or termination. 
2053 * 2054 * @param w the worker 2055 * @param r random seed 2056 * @param phase current phase 2057 * @return current phase, with IDLE set if worker should exit 2058 */ 2059 private int deactivate(WorkQueue w, int r, int phase) { 2060 int p = phase | IDLE, activePhase = phase + (IDLE << 1); 2061 if (w != null) { // always true 2062 w.phase = p; 2063 long pc = ctl, qc; 2064 for (;;) { // try to enqueue 2065 w.stackPred = (int)pc; // set ctl stack link 2066 qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK); 2067 if (pc == (pc = compareAndExchangeCtl(pc, qc))) // success 2068 break; 2069 if ((pc & RC_MASK) >= (qc & RC_MASK)) { 2070 p = w.phase = phase; // back out on possible signal 2071 break; 2072 } 2073 } 2074 if (p != phase && // check quiescent termination 2075 ((runState & SHUTDOWN) == 0L || quiescent() <= 0)) { 2076 WorkQueue[] qs; 2077 int spins = ((short)(qc >>> TC_SHIFT) << 1) + SPIN_WAITS + 1; 2078 while ((p = w.phase) != activePhase && --spins > 0) 2079 Thread.onSpinWait(); // reduce flailing 2080 if (p != activePhase && (qs = queues) != null) { 2081 int n = qs.length, step = (r >>> 16) | 1; 2082 for (int i = r, l = n; l > 0; --l, i += step) { 2083 WorkQueue q; // check for missed signals 2084 if ((q = qs[i & (n - 1)]) != null && 2085 q.top - q.base > 0) { 2086 if (ctl == qc && compareAndSetCtl(qc, pc)) { 2087 p = w.phase = activePhase; 2088 break; // self-signal 2089 } 2090 if ((p = w.phase) == activePhase) 2091 break; 2092 } 2093 } 2094 if (p != activePhase) { 2095 long delay = (((qc & RC_MASK) > 0L) ? 0L : 2096 (w.source != INVALID_ID) ? keepAlive : 2097 TIMEOUT_SLOP); // minimal delay if cascade 2098 if ((p = w.phase) != activePhase) 2099 p = awaitWork(w, p, delay); // block, drop, or exit 2100 } 2101 } 2102 } 2103 } 2104 return p; 2105 } 2106 2107 /** 2108 * Awaits signal or termination. 2109 * 2110 * @param w the work queue 2111 * @param p current phase (known to be idle) 2112 * @param delay if nonzero keepAlive before trimming if quiescent 2113 * @return current phase, with IDLE set if worker should exit 2114 */ 2115 private int awaitWork(WorkQueue w, int p, long delay) { 2116 if (w != null) { 2117 int activePhase = p + IDLE; 2118 LockSupport.setCurrentBlocker(this); 2119 long deadline = (delay == 0L ? 0L : 2120 delay + System.currentTimeMillis()); 2121 w.parking = 1; // enable unpark 2122 while ((p = w.phase) != activePhase) { 2123 boolean trimmable = false; int trim; 2124 Thread.interrupted(); // clear status 2125 if ((runState & STOP) != 0L) 2126 break; 2127 if (deadline != 0L) { 2128 if ((trim = tryTrim(w, p, deadline)) > 0) 2129 break; 2130 else if (trim < 0) 2131 deadline = 0L; 2132 else 2133 trimmable = true; 2134 } 2135 U.park(trimmable, deadline); 2136 } 2137 w.parking = 0; 2138 LockSupport.setCurrentBlocker(null); 2139 } 2140 return p; 2141 } 2142 2143 /** 2144 * Tries to remove and deregister worker after timeout, and release 2145 * another to do the same. 
2146 * @return > 0: trimmed, < 0 : not trimmable, else 0 2147 */ 2148 private int tryTrim(WorkQueue w, int phase, long deadline) { 2149 long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v; 2150 if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null) 2151 stat = -1; // no longer ctl top 2152 else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP) 2153 stat = 0; // spurious wakeup 2154 else if (!compareAndSetCtl( 2155 c, nc = (w.stackPred & LMASK) | (UMASK & (c - TC_UNIT)))) 2156 stat = -1; // lost race to signaller 2157 else { 2158 stat = 1; 2159 w.source = DROPPED; 2160 w.phase = activePhase; 2161 if ((vp = (int)nc) != 0 && (vs = queues) != null && 2162 vs.length > (i = vp & SMASK) && (v = vs[i]) != null && 2163 compareAndSetCtl( // try to wake up next waiter 2164 nc, ((UMASK & (nc + RC_UNIT)) | 2165 (nc & TC_MASK) | (v.stackPred & LMASK)))) { 2166 v.source = INVALID_ID; // enable cascaded timeouts 2167 v.phase = vp; 2168 U.unpark(v.owner); 2169 } 2170 } 2171 return stat; 2172 } 2173 2174 /** 2175 * Scans for and returns a polled task, if available. Used only 2176 * for untracked polls. Begins scan at a random index to avoid 2177 * systematic unfairness. 2178 * 2179 * @param submissionsOnly if true, only scan submission queues 2180 */ 2181 private ForkJoinTask<?> pollScan(boolean submissionsOnly) { 2182 if ((runState & STOP) == 0L) { 2183 WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t; 2184 int r = ThreadLocalRandom.nextSecondarySeed(); 2185 if (submissionsOnly) // even indices only 2186 r &= ~1; 2187 int step = (submissionsOnly) ? 2 : 1; 2188 if ((qs = queues) != null && (n = qs.length) > 0) { 2189 for (int i = n; i > 0; i -= step, r += step) { 2190 if ((q = qs[r & (n - 1)]) != null && 2191 (t = q.poll()) != null) 2192 return t; 2193 } 2194 } 2195 } 2196 return null; 2197 } 2198 2199 /** 2200 * Tries to decrement counts (sometimes implicitly) and possibly 2201 * arrange for a compensating worker in preparation for 2202 * blocking. May fail due to interference, in which case -1 is 2203 * returned so caller may retry. A zero return value indicates 2204 * that the caller doesn't need to re-adjust counts when later 2205 * unblocked. 2206 * 2207 * @param c incoming ctl value 2208 * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry 2209 */ 2210 private int tryCompensate(long c) { 2211 Predicate<? super ForkJoinPool> sat; 2212 long b = config; 2213 int pc = parallelism, // unpack fields 2214 minActive = (short)(b >>> RC_SHIFT), 2215 maxTotal = (short)(b >>> TC_SHIFT) + pc, 2216 active = (short)(c >>> RC_SHIFT), 2217 total = (short)(c >>> TC_SHIFT), 2218 sp = (int)c, 2219 stat = -1; // default retry return 2220 if (sp != 0 && active <= pc) { // activate idle worker 2221 WorkQueue[] qs; WorkQueue v; int i; 2222 if ((qs = queues) != null && qs.length > (i = sp & SMASK) && 2223 (v = qs[i]) != null && 2224 compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) { 2225 v.phase = sp; 2226 if (v.parking != 0) 2227 U.unpark(v.owner); 2228 stat = UNCOMPENSATE; 2229 } 2230 } 2231 else if (active > minActive && total >= pc) { // reduce active workers 2232 if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK))) 2233 stat = UNCOMPENSATE; 2234 } 2235 else if (total < maxTotal && total < MAX_CAP) { // try to expand pool 2236 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); 2237 if ((runState & STOP) != 0L) // terminating 2238 stat = 0; 2239 else if (compareAndSetCtl(c, nc)) 2240 stat = createWorker() ? 
UNCOMPENSATE : 0; 2241 } 2242 else if (!compareAndSetCtl(c, c)) // validate 2243 ; 2244 else if ((sat = saturate) != null && sat.test(this)) 2245 stat = 0; 2246 else 2247 throw new RejectedExecutionException( 2248 "Thread limit exceeded replacing blocked worker"); 2249 return stat; 2250 } 2251 2252 /** 2253 * Readjusts RC count; called from ForkJoinTask after blocking. 2254 */ 2255 final void uncompensate() { 2256 getAndAddCtl(RC_UNIT); 2257 } 2258 2259 /** 2260 * Helps if possible until the given task is done. Processes 2261 * compatible local tasks and scans other queues for task produced 2262 * by w's stealers; returning compensated blocking sentinel if 2263 * none are found. 2264 * 2265 * @param task the task 2266 * @param w caller's WorkQueue 2267 * @param internal true if w is owned by a ForkJoinWorkerThread 2268 * @return task status on exit, or UNCOMPENSATE for compensated blocking 2269 */ 2270 final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) { 2271 if (w != null) 2272 w.tryRemoveAndExec(task, internal); 2273 int s = 0; 2274 if (task != null && (s = task.status) >= 0 && internal && w != null) { 2275 int wid = w.phase & SMASK, r = wid + 2, wsrc = w.source; 2276 long sctl = 0L; // track stability 2277 outer: for (boolean rescan = true;;) { 2278 if ((s = task.status) < 0) 2279 break; 2280 if (!rescan) { 2281 if ((runState & STOP) != 0L) 2282 break; 2283 if (sctl == (sctl = ctl) && (s = tryCompensate(sctl)) >= 0) 2284 break; 2285 } 2286 rescan = false; 2287 WorkQueue[] qs = queues; 2288 int n = (qs == null) ? 0 : qs.length; 2289 scan: for (int l = n >>> 1; l > 0; --l, r += 2) { 2290 int j; WorkQueue q; 2291 if ((q = qs[j = r & SMASK & (n - 1)]) != null) { 2292 for (;;) { 2293 int sq = q.source, b, cap, k; ForkJoinTask<?>[] a; 2294 if ((a = q.array) == null || (cap = a.length) <= 0) 2295 break; 2296 ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)]; 2297 U.loadFence(); 2298 boolean eligible = false; 2299 if (t == task) 2300 eligible = true; 2301 else if (t != null) { // check steal chain 2302 for (int v = sq, d = cap;;) { 2303 WorkQueue p; 2304 if (v == wid) { 2305 eligible = true; 2306 break; 2307 } 2308 if ((v & 1) == 0 || // external or none 2309 --d < 0 || // bound depth 2310 (p = qs[v & (n - 1)]) == null) 2311 break; 2312 v = p.source; 2313 } 2314 } 2315 if ((s = task.status) < 0) 2316 break outer; // validate 2317 if (q.source == sq && q.base == b && a[k] == t) { 2318 int nb = b + 1, nk = nb & (cap - 1); 2319 if (!eligible) { // revisit if nonempty 2320 if (!rescan && t == null && 2321 (a[nk] != null || q.top - b > 0)) 2322 rescan = true; 2323 break; 2324 } 2325 if (U.compareAndSetReference( 2326 a, slotOffset(k), t, null)) { 2327 q.updateBase(nb); 2328 w.source = j; 2329 t.doExec(); 2330 w.source = wsrc; 2331 rescan = true; // restart at index r 2332 break scan; 2333 } 2334 } 2335 } 2336 } 2337 } 2338 } 2339 } 2340 return s; 2341 } 2342 2343 /** 2344 * Version of helpJoin for CountedCompleters. 
2345 * 2346 * @param task root of computation (only called when a CountedCompleter) 2347 * @param w caller's WorkQueue 2348 * @param internal true if w is owned by a ForkJoinWorkerThread 2349 * @return task status on exit, or UNCOMPENSATE for compensated blocking 2350 */ 2351 final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean internal) { 2352 int s = 0; 2353 if (task != null && (s = task.status) >= 0 && w != null) { 2354 int r = w.phase + 1; // for indexing 2355 long sctl = 0L; // track stability 2356 outer: for (boolean rescan = true, locals = true;;) { 2357 if (locals && (s = w.helpComplete(task, internal, 0)) < 0) 2358 break; 2359 if ((s = task.status) < 0) 2360 break; 2361 if (!rescan) { 2362 if ((runState & STOP) != 0L) 2363 break; 2364 if (sctl == (sctl = ctl) && 2365 (!internal || (s = tryCompensate(sctl)) >= 0)) 2366 break; 2367 } 2368 rescan = locals = false; 2369 WorkQueue[] qs = queues; 2370 int n = (qs == null) ? 0 : qs.length; 2371 scan: for (int l = n; l > 0; --l, ++r) { 2372 int j; WorkQueue q; 2373 if ((q = qs[j = r & SMASK & (n - 1)]) != null) { 2374 for (;;) { 2375 ForkJoinTask<?>[] a; int b, cap, k; 2376 if ((a = q.array) == null || (cap = a.length) <= 0) 2377 break; 2378 ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)]; 2379 U.loadFence(); 2380 boolean eligible = false; 2381 if (t instanceof CountedCompleter) { 2382 CountedCompleter<?> f = (CountedCompleter<?>)t; 2383 for (int steps = cap; steps > 0; --steps) { 2384 if (f == task) { 2385 eligible = true; 2386 break; 2387 } 2388 if ((f = f.completer) == null) 2389 break; 2390 } 2391 } 2392 if ((s = task.status) < 0) // validate 2393 break outer; 2394 if (q.base == b) { 2395 int nb = b + 1, nk = nb & (cap - 1); 2396 if (eligible) { 2397 if (U.compareAndSetReference( 2398 a, slotOffset(k), t, null)) { 2399 q.updateBase(nb); 2400 t.doExec(); 2401 locals = rescan = true; 2402 break scan; 2403 } 2404 } 2405 else if (a[k] == t) { 2406 if (!rescan && t == null && 2407 (a[nk] != null || q.top - b > 0)) 2408 rescan = true; // revisit 2409 break; 2410 } 2411 } 2412 } 2413 } 2414 } 2415 } 2416 } 2417 return s; 2418 } 2419 2420 /** 2421 * Runs tasks until all workers are inactive and no tasks are 2422 * found. Rather than blocking when tasks cannot be found, rescans 2423 * until all others cannot find tasks either. 2424 * 2425 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) 2426 * @param interruptible true if return on interrupt 2427 * @return positive if quiescent, negative if interrupted, else 0 2428 */ 2429 private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) { 2430 int phase; // w.phase inactive bit set when temporarily quiescent 2431 if (w == null || ((phase = w.phase) & IDLE) != 0) 2432 return 0; 2433 int wsrc = w.source; 2434 long startTime = System.nanoTime(); 2435 long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); // approx 1% nanos 2436 long prevSum = 0L; 2437 int activePhase = phase, inactivePhase = phase + IDLE; 2438 int r = phase + 1, waits = 0, returnStatus = 1; 2439 boolean locals = true; 2440 for (long e = runState;;) { 2441 if ((e & STOP) != 0L) 2442 break; // terminating 2443 if (interruptible && Thread.interrupted()) { 2444 returnStatus = -1; 2445 break; 2446 } 2447 if (locals) { // run local tasks before (re)polling 2448 locals = false; 2449 for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;) 2450 u.doExec(); 2451 } 2452 WorkQueue[] qs = queues; 2453 int n = (qs == null) ? 
0 : qs.length; 2454 long phaseSum = 0L; 2455 boolean rescan = false, busy = false; 2456 scan: for (int l = n; l > 0; --l, ++r) { 2457 int j; WorkQueue q; 2458 if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) { 2459 for (;;) { 2460 ForkJoinTask<?>[] a; int b, cap, k; 2461 if ((a = q.array) == null || (cap = a.length) <= 0) 2462 break; 2463 ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)]; 2464 if (t != null && phase == inactivePhase) // reactivate 2465 w.phase = phase = activePhase; 2466 U.loadFence(); 2467 if (q.base == b && a[k] == t) { 2468 int nb = b + 1; 2469 if (t == null) { 2470 if (!rescan) { 2471 int qp = q.phase, mq = qp & (IDLE | 1); 2472 phaseSum += qp; 2473 if (mq == 0 || q.top - b > 0) 2474 rescan = true; 2475 else if (mq == 1) 2476 busy = true; 2477 } 2478 break; 2479 } 2480 if (U.compareAndSetReference( 2481 a, slotOffset(k), t, null)) { 2482 q.updateBase(nb); 2483 w.source = j; 2484 t.doExec(); 2485 w.source = wsrc; 2486 rescan = locals = true; 2487 break scan; 2488 } 2489 } 2490 } 2491 } 2492 } 2493 if (e != (e = runState) || prevSum != (prevSum = phaseSum) || 2494 rescan || (e & RS_LOCK) != 0L) 2495 ; // inconsistent 2496 else if (!busy) 2497 break; 2498 else if (phase == activePhase) { 2499 waits = 0; // recheck, then sleep 2500 w.phase = phase = inactivePhase; 2501 } 2502 else if (System.nanoTime() - startTime > nanos) { 2503 returnStatus = 0; // timed out 2504 break; 2505 } 2506 else if (waits == 0) // same as spinLockRunState except 2507 waits = MIN_SLEEP; // with rescan instead of onSpinWait 2508 else { 2509 LockSupport.parkNanos(this, (long)waits); 2510 if (waits < maxSleep) 2511 waits <<= 1; 2512 } 2513 } 2514 w.phase = activePhase; 2515 return returnStatus; 2516 } 2517 2518 /** 2519 * Helps quiesce from external caller until done, interrupted, or timeout 2520 * 2521 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) 2522 * @param interruptible true if return on interrupt 2523 * @return positive if quiescent, negative if interrupted, else 0 2524 */ 2525 private int externalHelpQuiesce(long nanos, boolean interruptible) { 2526 if (quiescent() < 0) { 2527 long startTime = System.nanoTime(); 2528 long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); 2529 for (int waits = 0;;) { 2530 ForkJoinTask<?> t; 2531 if (interruptible && Thread.interrupted()) 2532 return -1; 2533 else if ((t = pollScan(false)) != null) { 2534 waits = 0; 2535 t.doExec(); 2536 } 2537 else if (quiescent() >= 0) 2538 break; 2539 else if (System.nanoTime() - startTime > nanos) 2540 return 0; 2541 else if (waits == 0) 2542 waits = MIN_SLEEP; 2543 else { 2544 LockSupport.parkNanos(this, (long)waits); 2545 if (waits < maxSleep) 2546 waits <<= 1; 2547 } 2548 } 2549 } 2550 return 1; 2551 } 2552 2553 /** 2554 * Helps quiesce from either internal or external caller 2555 * 2556 * @param pool the pool to use, or null if any 2557 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) 2558 * @param interruptible true if return on interrupt 2559 * @return positive if quiescent, negative if interrupted, else 0 2560 */ 2561 static final int helpQuiescePool(ForkJoinPool pool, long nanos, 2562 boolean interruptible) { 2563 Thread t; ForkJoinPool p; ForkJoinWorkerThread wt; 2564 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && 2565 (p = (wt = (ForkJoinWorkerThread)t).pool) != null && 2566 (p == pool || pool == null)) 2567 return p.helpQuiesce(wt.workQueue, nanos, interruptible); 2568 else if ((p = pool) != null || (p = common) != null) 2569 return 
p.externalHelpQuiesce(nanos, interruptible);
2570 else
2571 return 0;
2572 }
2573
2574 /**
2575 * Gets and removes a local or stolen task for the given worker.
2576 *
2577 * @return a task, if available
2578 */
2579 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2580 ForkJoinTask<?> t;
2581 if (w == null || (t = w.nextLocalTask()) == null)
2582 t = pollScan(false);
2583 return t;
2584 }
2585
2586 // External operations
2587
2588 /**
2589 * Finds and locks a WorkQueue for an external submitter, or
2590 * throws RejectedExecutionException if shutdown or terminating.
2591 * @param r current ThreadLocalRandom.getProbe() value
2593 */
2594 private WorkQueue submissionQueue(int r) {
2595 if (r == 0) {
2596 ThreadLocalRandom.localInit(); // initialize caller's probe
2597 r = ThreadLocalRandom.getProbe();
2598 }
2599 for (;;) {
2600 int n, i, id; WorkQueue[] qs; WorkQueue q, w = null;
2601 if ((qs = queues) == null)
2602 break;
2603 if ((n = qs.length) <= 0)
2604 break;
2605 if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2606 if (w == null)
2607 w = new WorkQueue(null, id, 0, false);
2608 w.phase = id;
2609 long stop = lockRunState() & STOP;
2610 if (stop == 0L && queues == qs && qs[i] == null) {
2611 q = qs[i] = w; // else retry
2612 w = null;
2613 }
2614 unlockRunState();
2615 if (q != null)
2616 return q;
2617 if (stop != 0L)
2618 break;
2619 }
2620 else if (!q.tryLockPhase()) // move index
2621 r = ThreadLocalRandom.advanceProbe(r);
2622 else if ((runState & SHUTDOWN) != 0L) {
2623 q.unlockPhase(); // check while q lock held
2624 break;
2625 }
2626 else
2627 return q;
2628 }
2629 tryTerminate(false, false);
2630 throw new RejectedExecutionException();
2631 }
2632
/**
* Pushes the given task: to the caller's own queue if the caller is
* a worker thread of this pool, else to a locked external submission
* queue; signals workers if signalIfEmpty is requested.
*/
2633 private void poolSubmit(boolean signalIfEmpty, ForkJoinTask<?> task) {
2634 Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2635 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2636 (wt = (ForkJoinWorkerThread)t).pool == this) {
2637 internal = true;
2638 q = wt.workQueue;
2639 }
2640 else { // find and lock queue
2641 internal = false;
2642 q = submissionQueue(ThreadLocalRandom.getProbe());
2643 }
2644 q.push(task, signalIfEmpty ? this : null, internal);
2645 }
2646
2647 /**
2648 * Returns queue for an external submission, bypassing call to
2649 * submissionQueue if already established and unlocked.
2650 */
2651 final WorkQueue externalSubmissionQueue() {
2652 WorkQueue[] qs; WorkQueue q; int n;
2653 int r = ThreadLocalRandom.getProbe();
2654 return (((qs = queues) != null && (n = qs.length) > 0 &&
2655 (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
2656 q.tryLockPhase()) ? q : submissionQueue(r));
2657 }
2658
2659 /**
2660 * Returns queue for an external thread, if one exists that has
2661 * possibly ever submitted to the given pool (nonzero probe), or
2662 * null if none.
2663 */
2664 static WorkQueue externalQueue(ForkJoinPool p) {
2665 WorkQueue[] qs; int n;
2666 int r = ThreadLocalRandom.getProbe();
2667 return (p != null && (qs = p.queues) != null &&
2668 (n = qs.length) > 0 && r != 0) ?
2669 qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2670 }
2671
2672 /**
2673 * Returns external queue for common pool.
2674 */
2675 static WorkQueue commonQueue() {
2676 return externalQueue(common);
2677 }
2678
2679 /**
2680 * If the given executor is a ForkJoinPool, poll and execute
2681 * AsynchronousCompletionTasks from the worker's queue until none are
2682 * available or blocker is released.
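* A minimal {@link ManagedBlocker} for orientation (a hypothetical
* sketch; such a blocker would typically be passed to
* {@link ForkJoinPool#managedBlock}):
* <pre> {@code
* class LatchBlocker implements ForkJoinPool.ManagedBlocker {
*   final CountDownLatch latch;
*   LatchBlocker(CountDownLatch latch) { this.latch = latch; }
*   public boolean block() throws InterruptedException {
*     latch.await();
*     return true;
*   }
*   public boolean isReleasable() { return latch.getCount() == 0L; }
* }}</pre>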
2683 */ 2684 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { 2685 WorkQueue w = null; Thread t; ForkJoinWorkerThread wt; 2686 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 2687 (wt = (ForkJoinWorkerThread)t).pool == e) 2688 w = wt.workQueue; 2689 else if (e instanceof ForkJoinPool) 2690 w = externalQueue((ForkJoinPool)e); 2691 if (w != null) 2692 w.helpAsyncBlocker(blocker); 2693 } 2694 2695 /** 2696 * Returns a cheap heuristic guide for task partitioning when 2697 * programmers, frameworks, tools, or languages have little or no 2698 * idea about task granularity. In essence, by offering this 2699 * method, we ask users only about tradeoffs in overhead vs 2700 * expected throughput and its variance, rather than how finely to 2701 * partition tasks. 2702 * 2703 * In a steady state strict (tree-structured) computation, each 2704 * thread makes available for stealing enough tasks for other 2705 * threads to remain active. Inductively, if all threads play by 2706 * the same rules, each thread should make available only a 2707 * constant number of tasks. 2708 * 2709 * The minimum useful constant is just 1. But using a value of 1 2710 * would require immediate replenishment upon each steal to 2711 * maintain enough tasks, which is infeasible. Further, 2712 * partitionings/granularities of offered tasks should minimize 2713 * steal rates, which in general means that threads nearer the top 2714 * of computation tree should generate more than those nearer the 2715 * bottom. In perfect steady state, each thread is at 2716 * approximately the same level of computation tree. However, 2717 * producing extra tasks amortizes the uncertainty of progress and 2718 * diffusion assumptions. 2719 * 2720 * So, users will want to use values larger (but not much larger) 2721 * than 1 to both smooth over transient shortages and hedge 2722 * against uneven progress; as traded off against the cost of 2723 * extra task overhead. We leave the user to pick a threshold 2724 * value to compare with the results of this call to guide 2725 * decisions, but recommend values such as 3. 2726 * 2727 * When all threads are active, it is on average OK to estimate 2728 * surplus strictly locally. In steady-state, if one thread is 2729 * maintaining say 2 surplus tasks, then so are others. So we can 2730 * just use estimated queue length. However, this strategy alone 2731 * leads to serious mis-estimates in some non-steady-state 2732 * conditions (ramp-up, ramp-down, other stalls). We can detect 2733 * many of these by further considering the number of "idle" 2734 * threads, that are known to have zero queued tasks, so 2735 * compensate by a factor of (#idle/#active) threads. 2736 */ 2737 static int getSurplusQueuedTaskCount() { 2738 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; 2739 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 2740 (pool = (wt = (ForkJoinWorkerThread)t).pool) != null && 2741 (q = wt.workQueue) != null) { 2742 int n = q.top - q.base; 2743 int p = pool.parallelism; 2744 int a = (short)(pool.ctl >>> RC_SHIFT); 2745 return n - (a > (p >>>= 1) ? 0 : 2746 a > (p >>>= 1) ? 1 : 2747 a > (p >>>= 1) ? 2 : 2748 a > (p >>>= 1) ? 4 : 2749 8); 2750 } 2751 return 0; 2752 } 2753 2754 // Termination 2755 2756 /** 2757 * Possibly initiates and/or completes pool termination. 
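* In rough correspondence with the exported lifecycle methods
* (orientation only): {@code shutdown()} amounts to
* {@code tryTerminate(false, true)}, and {@code shutdownNow()} to
* {@code tryTerminate(true, true)}.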
2758 * 2759 * @param now if true, unconditionally terminate, else only 2760 * if no work and no active workers 2761 * @param enable if true, terminate when next possible 2762 * @return runState on exit 2763 */ 2764 private long tryTerminate(boolean now, boolean enable) { 2765 long e = runState, isShutdown; 2766 if ((e & STOP) == 0L) { 2767 if (now) 2768 runState = e = (lockRunState() + RS_LOCK) | STOP | SHUTDOWN; 2769 else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) { 2770 if (isShutdown == 0) 2771 getAndBitwiseOrRunState(SHUTDOWN); 2772 if (quiescent() > 0) 2773 e = runState; 2774 } 2775 if ((e & STOP) != 0L && (releaseAll() & RC_MASK) > 0L && now) 2776 interruptAll(); 2777 } 2778 if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks 2779 if ((ctl & RC_MASK) > 0L) { // unless all inactive 2780 int r = (int)Thread.currentThread().threadId(); 2781 WorkQueue[] qs = queues; // stagger traversals 2782 int n = (qs == null) ? 0 : qs.length; 2783 for (int l = n; l > 0; --l, ++r) { 2784 WorkQueue q; ForkJoinTask<?> t; 2785 if ((q = qs[r & (n - 1)]) != null && 2786 q.source != DROPPED) { 2787 while ((t = q.poll()) != null) { 2788 try { 2789 t.cancel(false); 2790 } catch (Throwable ignore) { 2791 } 2792 } 2793 } 2794 } 2795 } 2796 if (((e = runState) & TERMINATED) == 0L && ctl == 0L) { 2797 e |= TERMINATED; 2798 if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) { 2799 CountDownLatch done; SharedThreadContainer ctr; 2800 if ((done = termination) != null) 2801 done.countDown(); 2802 if ((ctr = container) != null) 2803 ctr.close(); 2804 } 2805 } 2806 } 2807 return e; 2808 } 2809 2810 /** 2811 * Interrupts all workers 2812 */ 2813 private void interruptAll() { 2814 Thread current = Thread.currentThread(); 2815 WorkQueue[] qs = queues; 2816 int n = (qs == null) ? 0 : qs.length; 2817 for (int i = 1; i < n; i += 2) { 2818 WorkQueue q; Thread o; 2819 if ((q = qs[i]) != null && (o = q.owner) != null && o != current && 2820 q.source != DROPPED) { 2821 try { 2822 o.interrupt(); 2823 } catch (Throwable ignore) { 2824 } 2825 } 2826 } 2827 } 2828 2829 /** 2830 * Returns termination signal, constructing if necessary 2831 */ 2832 private CountDownLatch terminationSignal() { 2833 CountDownLatch signal, s, u; 2834 if ((signal = termination) == null) 2835 signal = ((u = cmpExTerminationSignal( 2836 s = new CountDownLatch(1))) == null) ? s : u; 2837 return signal; 2838 } 2839 2840 // Exported methods 2841 2842 // Constructors 2843 2844 /** 2845 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 2846 * java.lang.Runtime#availableProcessors}, using defaults for all 2847 * other parameters (see {@link #ForkJoinPool(int, 2848 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2849 * int, int, int, Predicate, long, TimeUnit)}). 
2850 * 2851 * @throws SecurityException if a security manager exists and 2852 * the caller is not permitted to modify threads 2853 * because it does not hold {@link 2854 * java.lang.RuntimePermission}{@code ("modifyThread")} 2855 */ 2856 public ForkJoinPool() { 2857 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), 2858 defaultForkJoinWorkerThreadFactory, null, false, 2859 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2860 } 2861 2862 /** 2863 * Creates a {@code ForkJoinPool} with the indicated parallelism 2864 * level, using defaults for all other parameters (see {@link 2865 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory, 2866 * UncaughtExceptionHandler, boolean, int, int, int, Predicate, 2867 * long, TimeUnit)}). 2868 * 2869 * @param parallelism the parallelism level 2870 * @throws IllegalArgumentException if parallelism less than or 2871 * equal to zero, or greater than implementation limit 2872 * @throws SecurityException if a security manager exists and 2873 * the caller is not permitted to modify threads 2874 * because it does not hold {@link 2875 * java.lang.RuntimePermission}{@code ("modifyThread")} 2876 */ 2877 public ForkJoinPool(int parallelism) { 2878 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false, 2879 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2880 } 2881 2882 /** 2883 * Creates a {@code ForkJoinPool} with the given parameters (using 2884 * defaults for others -- see {@link #ForkJoinPool(int, 2885 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2886 * int, int, int, Predicate, long, TimeUnit)}). 2887 * 2888 * @param parallelism the parallelism level. For default value, 2889 * use {@link java.lang.Runtime#availableProcessors}. 2890 * @param factory the factory for creating new threads. For default value, 2891 * use {@link #defaultForkJoinWorkerThreadFactory}. 2892 * @param handler the handler for internal worker threads that 2893 * terminate due to unrecoverable errors encountered while executing 2894 * tasks. For default value, use {@code null}. 2895 * @param asyncMode if true, 2896 * establishes local first-in-first-out scheduling mode for forked 2897 * tasks that are never joined. This mode may be more appropriate 2898 * than default locally stack-based mode in applications in which 2899 * worker threads only process event-style asynchronous tasks. 2900 * For default value, use {@code false}. 2901 * @throws IllegalArgumentException if parallelism less than or 2902 * equal to zero, or greater than implementation limit 2903 * @throws NullPointerException if the factory is null 2904 * @throws SecurityException if a security manager exists and 2905 * the caller is not permitted to modify threads 2906 * because it does not hold {@link 2907 * java.lang.RuntimePermission}{@code ("modifyThread")} 2908 */ 2909 public ForkJoinPool(int parallelism, 2910 ForkJoinWorkerThreadFactory factory, 2911 UncaughtExceptionHandler handler, 2912 boolean asyncMode) { 2913 this(parallelism, factory, handler, asyncMode, 2914 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2915 } 2916 2917 /** 2918 * Creates a {@code ForkJoinPool} with the given parameters. 2919 * 2920 * @param parallelism the parallelism level. For default value, 2921 * use {@link java.lang.Runtime#availableProcessors}. 2922 * 2923 * @param factory the factory for creating new threads. For 2924 * default value, use {@link #defaultForkJoinWorkerThreadFactory}. 
2925 *
2926 * @param handler the handler for internal worker threads that
2927 * terminate due to unrecoverable errors encountered while
2928 * executing tasks. For default value, use {@code null}.
2929 *
2930 * @param asyncMode if true, establishes local first-in-first-out
2931 * scheduling mode for forked tasks that are never joined. This
2932 * mode may be more appropriate than default locally stack-based
2933 * mode in applications in which worker threads only process
2934 * event-style asynchronous tasks. For default value, use {@code
2935 * false}.
2936 *
2937 * @param corePoolSize the number of threads to keep in the pool
2938 * (unless timed out after an elapsed keep-alive). Normally (and
2939 * by default) this is the same value as the parallelism level,
2940 * but may be set to a larger value to reduce dynamic overhead if
2941 * tasks regularly block. Using a smaller value (for example
2942 * {@code 0}) has the same effect as the default.
2943 *
2944 * @param maximumPoolSize the maximum number of threads allowed.
2945 * When the maximum is reached, attempts to replace blocked
2946 * threads fail. (However, because creation and termination of
2947 * different threads may overlap, and may be managed by the given
2948 * thread factory, this value may be transiently exceeded.) To
2949 * arrange the same value as is used by default for the common
2950 * pool, use {@code 256} plus the {@code parallelism} level. (By
2951 * default, the common pool allows a maximum of 256 spare
2952 * threads.) Using a value (for example {@code
2953 * Integer.MAX_VALUE}) larger than the implementation's total
2954 * thread limit has the same effect as using this limit (which is
2955 * the default).
2956 *
2957 * @param minimumRunnable the minimum allowed number of core
2958 * threads not blocked by a join or {@link ManagedBlocker}. To
2959 * ensure progress, when too few unblocked threads exist and
2960 * unexecuted tasks may exist, new threads are constructed, up to
2961 * the given maximumPoolSize. For the default value, use {@code
2962 * 1}, which ensures liveness. A larger value might improve
2963 * throughput in the presence of blocked activities, but might
2964 * not, due to increased overhead. A value of zero may be
2965 * acceptable when submitted tasks cannot have dependencies
2966 * requiring additional threads.
2967 *
2968 * @param saturate if non-null, a predicate invoked upon attempts
2969 * to create more than the maximum total allowed threads. By
2970 * default, when a thread is about to block on a join or {@link
2971 * ManagedBlocker}, but cannot be replaced because the
2972 * maximumPoolSize would be exceeded, a {@link
2973 * RejectedExecutionException} is thrown. But if this predicate
2974 * returns {@code true}, then no exception is thrown, so the pool
2975 * continues to operate with fewer than the target number of
2976 * runnable threads, which might not ensure progress.
2977 *
2978 * @param keepAliveTime the elapsed time since last use before
2979 * a thread is terminated (and then later replaced if needed).
2980 * For the default value, use {@code 60, TimeUnit.SECONDS}.
2981 *
2982 * @param unit the time unit for the {@code keepAliveTime} argument
2983 *
2984 * @throws IllegalArgumentException if parallelism is less than or
2985 * equal to zero, or is greater than implementation limit,
2986 * or if maximumPoolSize is less than parallelism,
2987 * or if the keepAliveTime is less than or equal to zero.
2988 * @throws NullPointerException if the factory is null 2989 * @throws SecurityException if a security manager exists and 2990 * the caller is not permitted to modify threads 2991 * because it does not hold {@link 2992 * java.lang.RuntimePermission}{@code ("modifyThread")} 2993 * @since 9 2994 */ 2995 public ForkJoinPool(int parallelism, 2996 ForkJoinWorkerThreadFactory factory, 2997 UncaughtExceptionHandler handler, 2998 boolean asyncMode, 2999 int corePoolSize, 3000 int maximumPoolSize, 3001 int minimumRunnable, 3002 Predicate<? super ForkJoinPool> saturate, 3003 long keepAliveTime, 3004 TimeUnit unit) { 3005 checkPermission(); 3006 int p = parallelism; 3007 if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L) 3008 throw new IllegalArgumentException(); 3009 if (factory == null || unit == null) 3010 throw new NullPointerException(); 3011 int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1)); 3012 this.parallelism = p; 3013 this.factory = factory; 3014 this.ueh = handler; 3015 this.saturate = saturate; 3016 this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); 3017 int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP); 3018 int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP); 3019 this.config = (((asyncMode ? FIFO : 0) & LMASK) | 3020 (((long)maxSpares) << TC_SHIFT) | 3021 (((long)minAvail) << RC_SHIFT)); 3022 this.queues = new WorkQueue[size]; 3023 String pid = Integer.toString(getAndAddPoolIds(1) + 1); 3024 String name = "ForkJoinPool-" + pid; 3025 this.workerNamePrefix = name + "-worker-"; 3026 this.container = SharedThreadContainer.create(name); 3027 } 3028 3029 /** 3030 * Constructor for common pool using parameters possibly 3031 * overridden by system properties 3032 */ 3033 private ForkJoinPool(byte forCommonPoolOnly) { 3034 ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory; 3035 UncaughtExceptionHandler handler = null; 3036 int maxSpares = DEFAULT_COMMON_MAX_SPARES; 3037 int pc = 0, preset = 0; // nonzero if size set as property 3038 try { // ignore exceptions in accessing/parsing properties 3039 String pp = System.getProperty 3040 ("java.util.concurrent.ForkJoinPool.common.parallelism"); 3041 if (pp != null) { 3042 pc = Math.max(0, Integer.parseInt(pp)); 3043 preset = PRESET_SIZE; 3044 } 3045 String ms = System.getProperty 3046 ("java.util.concurrent.ForkJoinPool.common.maximumSpares"); 3047 if (ms != null) 3048 maxSpares = Math.clamp(Integer.parseInt(ms), 0, MAX_CAP); 3049 String sf = System.getProperty 3050 ("java.util.concurrent.ForkJoinPool.common.threadFactory"); 3051 String sh = System.getProperty 3052 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 3053 if (sf != null || sh != null) { 3054 ClassLoader ldr = ClassLoader.getSystemClassLoader(); 3055 if (sf != null) 3056 fac = (ForkJoinWorkerThreadFactory) 3057 ldr.loadClass(sf).getConstructor().newInstance(); 3058 if (sh != null) 3059 handler = (UncaughtExceptionHandler) 3060 ldr.loadClass(sh).getConstructor().newInstance(); 3061 } 3062 } catch (Exception ignore) { 3063 } 3064 if (preset == 0) 3065 pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1); 3066 int p = Math.min(pc, MAX_CAP); 3067 int size = (p == 0) ? 
1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1)); 3068 this.parallelism = p; 3069 this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) | 3070 (1L << RC_SHIFT)); 3071 this.factory = fac; 3072 this.ueh = handler; 3073 this.keepAlive = DEFAULT_KEEPALIVE; 3074 this.saturate = null; 3075 this.workerNamePrefix = null; 3076 this.queues = new WorkQueue[size]; 3077 this.container = SharedThreadContainer.create("ForkJoinPool.commonPool"); 3078 } 3079 3080 /** 3081 * Returns the common pool instance. This pool is statically 3082 * constructed; its run state is unaffected by attempts to {@link 3083 * #shutdown} or {@link #shutdownNow}. However this pool and any 3084 * ongoing processing are automatically terminated upon program 3085 * {@link System#exit}. Any program that relies on asynchronous 3086 * task processing to complete before program termination should 3087 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 3088 * before exit. 3089 * 3090 * @return the common pool instance 3091 * @since 1.8 3092 */ 3093 public static ForkJoinPool commonPool() { 3094 // assert common != null : "static init error"; 3095 return common; 3096 } 3097 3098 // Execution methods 3099 3100 /** 3101 * Performs the given task, returning its result upon completion. 3102 * If the computation encounters an unchecked Exception or Error, 3103 * it is rethrown as the outcome of this invocation. Rethrown 3104 * exceptions behave in the same way as regular exceptions, but, 3105 * when possible, contain stack traces (as displayed for example 3106 * using {@code ex.printStackTrace()}) of both the current thread 3107 * as well as the thread actually encountering the exception; 3108 * minimally only the latter. 3109 * 3110 * @param task the task 3111 * @param <T> the type of the task's result 3112 * @return the task's result 3113 * @throws NullPointerException if the task is null 3114 * @throws RejectedExecutionException if the task cannot be 3115 * scheduled for execution 3116 */ 3117 public <T> T invoke(ForkJoinTask<T> task) { 3118 Objects.requireNonNull(task); 3119 poolSubmit(true, task); 3120 try { 3121 return task.join(); 3122 } catch (RuntimeException | Error unchecked) { 3123 throw unchecked; 3124 } catch (Exception checked) { 3125 throw new RuntimeException(checked); 3126 } 3127 } 3128 3129 /** 3130 * Arranges for (asynchronous) execution of the given task. 3131 * 3132 * @param task the task 3133 * @throws NullPointerException if the task is null 3134 * @throws RejectedExecutionException if the task cannot be 3135 * scheduled for execution 3136 */ 3137 public void execute(ForkJoinTask<?> task) { 3138 Objects.requireNonNull(task); 3139 poolSubmit(true, task); 3140 } 3141 3142 // AbstractExecutorService methods 3143 3144 /** 3145 * @throws NullPointerException if the task is null 3146 * @throws RejectedExecutionException if the task cannot be 3147 * scheduled for execution 3148 */ 3149 @Override 3150 @SuppressWarnings("unchecked") 3151 public void execute(Runnable task) { 3152 poolSubmit(true, (task instanceof ForkJoinTask<?>) 3153 ? (ForkJoinTask<Void>) task // avoid re-wrap 3154 : new ForkJoinTask.RunnableExecuteAction(task)); 3155 } 3156 3157 /** 3158 * Submits a ForkJoinTask for execution. 3159 * 3160 * @implSpec 3161 * This method is equivalent to {@link #externalSubmit(ForkJoinTask)} 3162 * when called from a thread that is not in this pool. 
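 *
 * <p>For example, here is a sketch in which a client thread submits
 * a task and later joins it (assuming {@code pool} refers to a
 * {@code ForkJoinPool} in scope):
 * <pre> {@code
 * ForkJoinTask<Long> task = pool.submit(ForkJoinTask.adapt(() -> 1L + 2L));
 * Long sum = task.join(); // 3}</pre>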
3163 *
3164 * @param task the task to submit
3165 * @param <T> the type of the task's result
3166 * @return the task
3167 * @throws NullPointerException if the task is null
3168 * @throws RejectedExecutionException if the task cannot be
3169 * scheduled for execution
3170 */
3171 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
3172 Objects.requireNonNull(task);
3173 poolSubmit(true, task);
3174 return task;
3175 }
3176
3177 /**
3178 * @throws NullPointerException if the task is null
3179 * @throws RejectedExecutionException if the task cannot be
3180 * scheduled for execution
3181 */
3182 @Override
3183 public <T> ForkJoinTask<T> submit(Callable<T> task) {
3184 ForkJoinTask<T> t =
3185 (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3186 new ForkJoinTask.AdaptedCallable<T>(task) :
3187 new ForkJoinTask.AdaptedInterruptibleCallable<T>(task);
3188 poolSubmit(true, t);
3189 return t;
3190 }
3191
3192 /**
3193 * @throws NullPointerException if the task is null
3194 * @throws RejectedExecutionException if the task cannot be
3195 * scheduled for execution
3196 */
3197 @Override
3198 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
3199 ForkJoinTask<T> t =
3200 (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3201 new ForkJoinTask.AdaptedRunnable<T>(task, result) :
3202 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result);
3203 poolSubmit(true, t);
3204 return t;
3205 }
3206
3207 /**
3208 * @throws NullPointerException if the task is null
3209 * @throws RejectedExecutionException if the task cannot be
3210 * scheduled for execution
3211 */
3212 @Override
3213 @SuppressWarnings("unchecked")
3214 public ForkJoinTask<?> submit(Runnable task) {
3215 ForkJoinTask<?> f = (task instanceof ForkJoinTask<?>) ?
3216 (ForkJoinTask<Void>) task : // avoid re-wrap
3217 ((Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3218 new ForkJoinTask.AdaptedRunnable<Void>(task, null) :
3219 new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null));
3220 poolSubmit(true, f);
3221 return f;
3222 }
3223
3224 /**
3225 * Submits the given task as if submitted from a non-{@code ForkJoinTask}
3226 * client. The task is added to a scheduling queue for submissions to the
3227 * pool even when called from a thread in the pool.
3228 *
3229 * @implSpec
3230 * This method is equivalent to {@link #submit(ForkJoinTask)} when called
3231 * from a thread that is not in this pool.
3232 *
3233 * @param task the task to submit
3234 * @param <T> the type of the task's result
3235 * @return the task
3236 * @throws NullPointerException if the task is null
3237 * @throws RejectedExecutionException if the task cannot be
3238 * scheduled for execution
3239 * @since 20
3240 */
3241 public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
3242 Objects.requireNonNull(task);
3243 externalSubmissionQueue().push(task, this, false);
3244 return task;
3245 }
3246
3247 /**
3248 * Submits the given task without guaranteeing that it will
3249 * eventually execute in the absence of available active threads.
3250 * In some contexts, this method may reduce contention and
3251 * overhead by relying on context-specific knowledge that existing
3252 * threads (possibly including the calling thread if operating in
3253 * this pool) will eventually be available to execute the task.
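 *
 * <p>For example, here is a sketch in which a task already running
 * in pool {@code pool} batches small subtasks and then joins them
 * itself, so submissions need not immediately activate idle workers
 * ({@code work} may be any collection of runnables):
 * <pre> {@code
 * List<ForkJoinTask<?>> batch = new ArrayList<>();
 * for (Runnable r : work)
 *     batch.add(pool.lazySubmit(ForkJoinTask.adapt(r)));
 * for (ForkJoinTask<?> t : batch)
 *     t.join();}</pre>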
3254 *
3255 * @param task the task
3256 * @param <T> the type of the task's result
3257 * @return the task
3258 * @throws NullPointerException if the task is null
3259 * @throws RejectedExecutionException if the task cannot be
3260 * scheduled for execution
3261 * @since 19
3262 */
3263 public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
3264 Objects.requireNonNull(task);
3265 poolSubmit(false, task);
3266 return task;
3267 }
3268
3269 /**
3270 * Changes the target parallelism of this pool, controlling the
3271 * future creation, use, and termination of worker threads.
3272 * Applications include contexts in which the number of available
3273 * processors changes over time.
3274 *
3275 * @implNote This implementation restricts the maximum number of
3276 * running threads to 32767.
3277 *
3278 * @param size the target parallelism level
3279 * @return the previous parallelism level.
3280 * @throws IllegalArgumentException if size is less than 1 or
3281 * greater than the maximum supported by this pool.
3282 * @throws UnsupportedOperationException if this is the {@link
3283 * #commonPool()} and the parallelism level was set by the System
3284 * property {@systemProperty
3285 * java.util.concurrent.ForkJoinPool.common.parallelism}.
3286 * @throws SecurityException if a security manager exists and
3287 * the caller is not permitted to modify threads
3288 * because it does not hold {@link
3289 * java.lang.RuntimePermission}{@code ("modifyThread")}
3290 * @since 19
3291 */
3292 public int setParallelism(int size) {
3293 if (size < 1 || size > MAX_CAP)
3294 throw new IllegalArgumentException();
3295 if ((config & PRESET_SIZE) != 0)
3296 throw new UnsupportedOperationException("Cannot override System property");
3297 checkPermission();
3298 return getAndSetParallelism(size);
3299 }
3300
3301 /**
3302 * Uninterruptible version of {@code invokeAll}. Executes the given
3303 * tasks, returning a list of Futures holding their status and
3304 * results when all complete, ignoring interrupts. {@link
3305 * Future#isDone} is {@code true} for each element of the returned
3306 * list. Note that a <em>completed</em> task could have
3307 * terminated either normally or by throwing an exception. The
3308 * results of this method are undefined if the given collection is
3309 * modified while this operation is in progress.
3310 *
3311 * @apiNote This method supports usages that previously relied on an
3312 * incompatible override of
3313 * {@link ExecutorService#invokeAll(java.util.Collection)}.
3314 *
3315 * @param tasks the collection of tasks
3316 * @param <T> the type of the values returned from the tasks
3317 * @return a list of Futures representing the tasks, in the same
3318 * sequential order as produced by the iterator for the
3319 * given task list, each of which has completed
3320 * @throws NullPointerException if tasks or any of its elements are {@code null}
3321 * @throws RejectedExecutionException if any task cannot be
3322 * scheduled for execution
3323 * @since 22
3324 */
3325 public <T> List<Future<T>> invokeAllUninterruptibly(Collection<?
extends Callable<T>> tasks) { 3326 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 3327 try { 3328 for (Callable<T> t : tasks) { 3329 ForkJoinTask<T> f = ForkJoinTask.adapt(t); 3330 futures.add(f); 3331 poolSubmit(true, f); 3332 } 3333 for (int i = futures.size() - 1; i >= 0; --i) 3334 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); 3335 return futures; 3336 } catch (Throwable t) { 3337 for (Future<T> e : futures) 3338 e.cancel(true); 3339 throw t; 3340 } 3341 } 3342 3343 /** 3344 * Common support for timed and untimed invokeAll 3345 */ 3346 private <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 3347 long deadline) 3348 throws InterruptedException { 3349 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 3350 try { 3351 for (Callable<T> t : tasks) { 3352 ForkJoinTask<T> f = ForkJoinTask.adaptInterruptible(t); 3353 futures.add(f); 3354 poolSubmit(true, f); 3355 } 3356 for (int i = futures.size() - 1; i >= 0; --i) 3357 ((ForkJoinTask<?>)futures.get(i)) 3358 .quietlyJoinPoolInvokeAllTask(deadline); 3359 return futures; 3360 } catch (Throwable t) { 3361 for (Future<T> e : futures) 3362 e.cancel(true); 3363 throw t; 3364 } 3365 } 3366 3367 @Override 3368 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 3369 throws InterruptedException { 3370 return invokeAll(tasks, 0L); 3371 } 3372 // for jdk version < 22, replace with 3373 // /** 3374 // * @throws NullPointerException {@inheritDoc} 3375 // * @throws RejectedExecutionException {@inheritDoc} 3376 // */ 3377 // @Override 3378 // public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 3379 // return invokeAllUninterruptibly(tasks); 3380 // } 3381 3382 @Override 3383 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 3384 long timeout, TimeUnit unit) 3385 throws InterruptedException { 3386 return invokeAll(tasks, (System.nanoTime() + unit.toNanos(timeout)) | 1L); 3387 } 3388 3389 @Override 3390 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 3391 throws InterruptedException, ExecutionException { 3392 try { 3393 return new ForkJoinTask.InvokeAnyRoot<T>() 3394 .invokeAny(tasks, this, false, 0L); 3395 } catch (TimeoutException cannotHappen) { 3396 assert false; 3397 return null; 3398 } 3399 } 3400 3401 @Override 3402 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 3403 long timeout, TimeUnit unit) 3404 throws InterruptedException, ExecutionException, TimeoutException { 3405 return new ForkJoinTask.InvokeAnyRoot<T>() 3406 .invokeAny(tasks, this, true, unit.toNanos(timeout)); 3407 } 3408 3409 /** 3410 * Returns the factory used for constructing new workers. 3411 * 3412 * @return the factory used for constructing new workers 3413 */ 3414 public ForkJoinWorkerThreadFactory getFactory() { 3415 return factory; 3416 } 3417 3418 /** 3419 * Returns the handler for internal worker threads that terminate 3420 * due to unrecoverable errors encountered while executing tasks. 3421 * 3422 * @return the handler, or {@code null} if none 3423 */ 3424 public UncaughtExceptionHandler getUncaughtExceptionHandler() { 3425 return ueh; 3426 } 3427 3428 /** 3429 * Returns the targeted parallelism level of this pool. 3430 * 3431 * @return the targeted parallelism level of this pool 3432 */ 3433 public int getParallelism() { 3434 return Math.max(getParallelismOpaque(), 1); 3435 } 3436 3437 /** 3438 * Returns the targeted parallelism level of the common pool. 
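 *
 * <p>For example, here is a sketch that uses this value to choose
 * the number of partitions when splitting work (the factor of 4 is
 * only an illustrative over-partitioning heuristic):
 * <pre> {@code
 * int splits = 4 * ForkJoinPool.getCommonPoolParallelism();}</pre>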
3439 * 3440 * @return the targeted parallelism level of the common pool 3441 * @since 1.8 3442 */ 3443 public static int getCommonPoolParallelism() { 3444 return common.getParallelism(); 3445 } 3446 3447 /** 3448 * Returns the number of worker threads that have started but not 3449 * yet terminated. The result returned by this method may differ 3450 * from {@link #getParallelism} when threads are created to 3451 * maintain parallelism when others are cooperatively blocked. 3452 * 3453 * @return the number of worker threads 3454 */ 3455 public int getPoolSize() { 3456 return (short)(ctl >>> TC_SHIFT); 3457 } 3458 3459 /** 3460 * Returns {@code true} if this pool uses local first-in-first-out 3461 * scheduling mode for forked tasks that are never joined. 3462 * 3463 * @return {@code true} if this pool uses async mode 3464 */ 3465 public boolean getAsyncMode() { 3466 return (config & FIFO) != 0; 3467 } 3468 3469 /** 3470 * Returns an estimate of the number of worker threads that are 3471 * not blocked waiting to join tasks or for other managed 3472 * synchronization. This method may overestimate the 3473 * number of running threads. 3474 * 3475 * @return the number of worker threads 3476 */ 3477 public int getRunningThreadCount() { 3478 WorkQueue[] qs; WorkQueue q; 3479 int rc = 0; 3480 if ((runState & TERMINATED) == 0L && (qs = queues) != null) { 3481 for (int i = 1; i < qs.length; i += 2) { 3482 if ((q = qs[i]) != null && q.isApparentlyUnblocked()) 3483 ++rc; 3484 } 3485 } 3486 return rc; 3487 } 3488 3489 /** 3490 * Returns an estimate of the number of threads that are currently 3491 * stealing or executing tasks. This method may overestimate the 3492 * number of active threads. 3493 * 3494 * @return the number of active threads 3495 */ 3496 public int getActiveThreadCount() { 3497 return Math.max((short)(ctl >>> RC_SHIFT), 0); 3498 } 3499 3500 /** 3501 * Returns {@code true} if all worker threads are currently idle. 3502 * An idle worker is one that cannot obtain a task to execute 3503 * because none are available to steal from other threads, and 3504 * there are no pending submissions to the pool. This method is 3505 * conservative; it might not return {@code true} immediately upon 3506 * idleness of all threads, but will eventually become true if 3507 * threads remain inactive. 3508 * 3509 * @return {@code true} if all threads are currently idle 3510 */ 3511 public boolean isQuiescent() { 3512 return quiescent() >= 0; 3513 } 3514 3515 /** 3516 * Returns an estimate of the total number of completed tasks that 3517 * were executed by a thread other than their submitter. The 3518 * reported value underestimates the actual total number of steals 3519 * when the pool is not quiescent. This value may be useful for 3520 * monitoring and tuning fork/join programs: in general, steal 3521 * counts should be high enough to keep threads busy, but low 3522 * enough to avoid overhead and contention across threads. 3523 * 3524 * @return the number of steals 3525 */ 3526 public long getStealCount() { 3527 long count = stealCount; 3528 WorkQueue[] qs; WorkQueue q; 3529 if ((qs = queues) != null) { 3530 for (int i = 1; i < qs.length; i += 2) { 3531 if ((q = qs[i]) != null) 3532 count += (long)q.nsteals & 0xffffffffL; 3533 } 3534 } 3535 return count; 3536 } 3537 3538 /** 3539 * Returns an estimate of the total number of tasks currently held 3540 * in queues by worker threads (but not including tasks submitted 3541 * to the pool that have not begun executing). 
This value is only 3542 * an approximation, obtained by iterating across all threads in 3543 * the pool. This method may be useful for tuning task 3544 * granularities. 3545 * 3546 * @return the number of queued tasks 3547 * @see ForkJoinWorkerThread#getQueuedTaskCount() 3548 */ 3549 public long getQueuedTaskCount() { 3550 WorkQueue[] qs; WorkQueue q; 3551 int count = 0; 3552 if ((runState & TERMINATED) == 0L && (qs = queues) != null) { 3553 for (int i = 1; i < qs.length; i += 2) { 3554 if ((q = qs[i]) != null) 3555 count += q.queueSize(); 3556 } 3557 } 3558 return count; 3559 } 3560 3561 /** 3562 * Returns an estimate of the number of tasks submitted to this 3563 * pool that have not yet begun executing. This method may take 3564 * time proportional to the number of submissions. 3565 * 3566 * @return the number of queued submissions 3567 */ 3568 public int getQueuedSubmissionCount() { 3569 WorkQueue[] qs; WorkQueue q; 3570 int count = 0; 3571 if ((runState & TERMINATED) == 0L && (qs = queues) != null) { 3572 for (int i = 0; i < qs.length; i += 2) { 3573 if ((q = qs[i]) != null) 3574 count += q.queueSize(); 3575 } 3576 } 3577 return count; 3578 } 3579 3580 /** 3581 * Returns {@code true} if there are any tasks submitted to this 3582 * pool that have not yet begun executing. 3583 * 3584 * @return {@code true} if there are any queued submissions 3585 */ 3586 public boolean hasQueuedSubmissions() { 3587 WorkQueue[] qs; WorkQueue q; 3588 if ((runState & STOP) == 0L && (qs = queues) != null) { 3589 for (int i = 0; i < qs.length; i += 2) { 3590 if ((q = qs[i]) != null && q.queueSize() > 0) 3591 return true; 3592 } 3593 } 3594 return false; 3595 } 3596 3597 /** 3598 * Removes and returns the next unexecuted submission if one is 3599 * available. This method may be useful in extensions to this 3600 * class that re-assign work in systems with multiple pools. 3601 * 3602 * @return the next submission, or {@code null} if none 3603 */ 3604 protected ForkJoinTask<?> pollSubmission() { 3605 return pollScan(true); 3606 } 3607 3608 /** 3609 * Removes all available unexecuted submitted and forked tasks 3610 * from scheduling queues and adds them to the given collection, 3611 * without altering their execution status. These may include 3612 * artificially generated or wrapped tasks. This method is 3613 * designed to be invoked only when the pool is known to be 3614 * quiescent. Invocations at other times may not remove all 3615 * tasks. A failure encountered while attempting to add elements 3616 * to collection {@code c} may result in elements being in 3617 * neither, either or both collections when the associated 3618 * exception is thrown. The behavior of this operation is 3619 * undefined if the specified collection is modified while the 3620 * operation is in progress. 3621 * 3622 * @param c the collection to transfer elements into 3623 * @return the number of elements transferred 3624 */ 3625 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 3626 int count = 0; 3627 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) { 3628 c.add(t); 3629 ++count; 3630 } 3631 return count; 3632 } 3633 3634 /** 3635 * Returns a string identifying this pool, as well as its state, 3636 * including indications of run state, parallelism level, and 3637 * worker and task counts. 
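 *
 * <p>The returned string takes a form such as the following (all
 * values illustrative):
 * <pre> {@code
 * java.util.concurrent.ForkJoinPool@1dbd16a6[Running, parallelism = 8,
 * size = 8, active = 2, running = 2, steals = 120, tasks = 14,
 * submissions = 0]}</pre>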
3638 * 3639 * @return a string identifying this pool, as well as its state 3640 */ 3641 public String toString() { 3642 // Use a single pass through queues to collect counts 3643 long e = runState; 3644 long st = stealCount; 3645 long qt = 0L, ss = 0L; int rc = 0; 3646 WorkQueue[] qs; WorkQueue q; 3647 if ((qs = queues) != null) { 3648 for (int i = 0; i < qs.length; ++i) { 3649 if ((q = qs[i]) != null) { 3650 int size = q.queueSize(); 3651 if ((i & 1) == 0) 3652 ss += size; 3653 else { 3654 qt += size; 3655 st += (long)q.nsteals & 0xffffffffL; 3656 if (q.isApparentlyUnblocked()) 3657 ++rc; 3658 } 3659 } 3660 } 3661 } 3662 3663 int pc = parallelism; 3664 long c = ctl; 3665 int tc = (short)(c >>> TC_SHIFT); 3666 int ac = (short)(c >>> RC_SHIFT); 3667 if (ac < 0) // ignore transient negative 3668 ac = 0; 3669 String level = ((e & TERMINATED) != 0L ? "Terminated" : 3670 (e & STOP) != 0L ? "Terminating" : 3671 (e & SHUTDOWN) != 0L ? "Shutting down" : 3672 "Running"); 3673 return super.toString() + 3674 "[" + level + 3675 ", parallelism = " + pc + 3676 ", size = " + tc + 3677 ", active = " + ac + 3678 ", running = " + rc + 3679 ", steals = " + st + 3680 ", tasks = " + qt + 3681 ", submissions = " + ss + 3682 "]"; 3683 } 3684 3685 /** 3686 * Possibly initiates an orderly shutdown in which previously 3687 * submitted tasks are executed, but no new tasks will be 3688 * accepted. Invocation has no effect on execution state if this 3689 * is the {@link #commonPool()}, and no additional effect if 3690 * already shut down. Tasks that are in the process of being 3691 * submitted concurrently during the course of this method may or 3692 * may not be rejected. 3693 * 3694 * @throws SecurityException if a security manager exists and 3695 * the caller is not permitted to modify threads 3696 * because it does not hold {@link 3697 * java.lang.RuntimePermission}{@code ("modifyThread")} 3698 */ 3699 public void shutdown() { 3700 checkPermission(); 3701 if (workerNamePrefix != null) // not common pool 3702 tryTerminate(false, true); 3703 } 3704 3705 /** 3706 * Possibly attempts to cancel and/or stop all tasks, and reject 3707 * all subsequently submitted tasks. Invocation has no effect on 3708 * execution state if this is the {@link #commonPool()}, and no 3709 * additional effect if already shut down. Otherwise, tasks that 3710 * are in the process of being submitted or executed concurrently 3711 * during the course of this method may or may not be 3712 * rejected. This method cancels both existing and unexecuted 3713 * tasks, in order to permit termination in the presence of task 3714 * dependencies. So the method always returns an empty list 3715 * (unlike the case for some other Executors). 3716 * 3717 * @return an empty list 3718 * @throws SecurityException if a security manager exists and 3719 * the caller is not permitted to modify threads 3720 * because it does not hold {@link 3721 * java.lang.RuntimePermission}{@code ("modifyThread")} 3722 */ 3723 public List<Runnable> shutdownNow() { 3724 checkPermission(); 3725 if (workerNamePrefix != null) // not common pool 3726 tryTerminate(true, true); 3727 return Collections.emptyList(); 3728 } 3729 3730 /** 3731 * Returns {@code true} if all tasks have completed following shut down. 
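 *
 * <p>Rather than polling this method, callers typically combine
 * {@link #shutdown} with {@link #awaitTermination awaitTermination};
 * for example (a sketch, in a context that handles
 * {@code InterruptedException}):
 * <pre> {@code
 * pool.shutdown();
 * if (!pool.awaitTermination(1, TimeUnit.MINUTES))
 *     pool.shutdownNow();}</pre>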
3732 * 3733 * @return {@code true} if all tasks have completed following shut down 3734 */ 3735 public boolean isTerminated() { 3736 return (tryTerminate(false, false) & TERMINATED) != 0; 3737 } 3738 3739 /** 3740 * Returns {@code true} if the process of termination has 3741 * commenced but not yet completed. This method may be useful for 3742 * debugging. A return of {@code true} reported a sufficient 3743 * period after shutdown may indicate that submitted tasks have 3744 * ignored or suppressed interruption, or are waiting for I/O, 3745 * causing this executor not to properly terminate. (See the 3746 * advisory notes for class {@link ForkJoinTask} stating that 3747 * tasks should not normally entail blocking operations. But if 3748 * they do, they must abort them on interrupt.) 3749 * 3750 * @return {@code true} if terminating but not yet terminated 3751 */ 3752 public boolean isTerminating() { 3753 return (tryTerminate(false, false) & (STOP | TERMINATED)) == STOP; 3754 } 3755 3756 /** 3757 * Returns {@code true} if this pool has been shut down. 3758 * 3759 * @return {@code true} if this pool has been shut down 3760 */ 3761 public boolean isShutdown() { 3762 return (runState & SHUTDOWN) != 0L; 3763 } 3764 3765 /** 3766 * Blocks until all tasks have completed execution after a 3767 * shutdown request, or the timeout occurs, or the current thread 3768 * is interrupted, whichever happens first. Because the {@link 3769 * #commonPool()} never terminates until program shutdown, when 3770 * applied to the common pool, this method is equivalent to {@link 3771 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 3772 * 3773 * @param timeout the maximum time to wait 3774 * @param unit the time unit of the timeout argument 3775 * @return {@code true} if this executor terminated and 3776 * {@code false} if the timeout elapsed before termination 3777 * @throws InterruptedException if interrupted while waiting 3778 */ 3779 public boolean awaitTermination(long timeout, TimeUnit unit) 3780 throws InterruptedException { 3781 long nanos = unit.toNanos(timeout); 3782 CountDownLatch done; 3783 if (workerNamePrefix == null) { // is common pool 3784 if (helpQuiescePool(this, nanos, true) < 0) 3785 throw new InterruptedException(); 3786 return false; 3787 } 3788 else if ((tryTerminate(false, false) & TERMINATED) != 0 || 3789 (done = terminationSignal()) == null || 3790 (runState & TERMINATED) != 0L) 3791 return true; 3792 else 3793 return done.await(nanos, TimeUnit.NANOSECONDS); 3794 } 3795 3796 /** 3797 * If called by a ForkJoinTask operating in this pool, equivalent 3798 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 3799 * waits and/or attempts to assist performing tasks until this 3800 * pool {@link #isQuiescent} or the indicated timeout elapses. 3801 * 3802 * @param timeout the maximum time to wait 3803 * @param unit the time unit of the timeout argument 3804 * @return {@code true} if quiescent; {@code false} if the 3805 * timeout elapsed. 3806 */ 3807 public boolean awaitQuiescence(long timeout, TimeUnit unit) { 3808 return (helpQuiescePool(this, unit.toNanos(timeout), false) > 0); 3809 } 3810 3811 /** 3812 * Unless this is the {@link #commonPool()}, initiates an orderly 3813 * shutdown in which previously submitted tasks are executed, but 3814 * no new tasks will be accepted, and waits until all tasks have 3815 * completed execution and the executor has terminated. 
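 *
 * <p>For example, here is a sketch using try-with-resources, in
 * which the implicit call to {@code close} waits for submitted
 * tasks to complete before the statement exits:
 * <pre> {@code
 * try (ForkJoinPool pool = new ForkJoinPool()) {
 *     pool.submit(() -> System.out.println("hello"));
 * }}</pre>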
3816 * 3817 * <p> If already terminated, or this is the {@link 3818 * #commonPool()}, this method has no effect on execution, and 3819 * does not wait. Otherwise, if interrupted while waiting, this 3820 * method stops all executing tasks as if by invoking {@link 3821 * #shutdownNow()}. It then continues to wait until all actively 3822 * executing tasks have completed. Tasks that were awaiting 3823 * execution are not executed. The interrupt status will be 3824 * re-asserted before this method returns. 3825 * 3826 * @throws SecurityException if a security manager exists and 3827 * shutting down this ExecutorService may manipulate 3828 * threads that the caller is not permitted to modify 3829 * because it does not hold {@link 3830 * java.lang.RuntimePermission}{@code ("modifyThread")}, 3831 * or the security manager's {@code checkAccess} method 3832 * denies access. 3833 * @since 19 3834 */ 3835 @Override 3836 public void close() { 3837 if (workerNamePrefix != null) { 3838 checkPermission(); 3839 CountDownLatch done = null; 3840 boolean interrupted = false; 3841 while ((tryTerminate(interrupted, true) & TERMINATED) == 0) { 3842 if (done == null) 3843 done = terminationSignal(); 3844 else { 3845 try { 3846 done.await(); 3847 break; 3848 } catch (InterruptedException ex) { 3849 interrupted = true; 3850 } 3851 } 3852 } 3853 if (interrupted) 3854 Thread.currentThread().interrupt(); 3855 } 3856 } 3857 3858 /** 3859 * Interface for extending managed parallelism for tasks running 3860 * in {@link ForkJoinPool}s. 3861 * 3862 * <p>A {@code ManagedBlocker} provides two methods. Method 3863 * {@link #isReleasable} must return {@code true} if blocking is 3864 * not necessary. Method {@link #block} blocks the current thread 3865 * if necessary (perhaps internally invoking {@code isReleasable} 3866 * before actually blocking). These actions are performed by any 3867 * thread invoking {@link 3868 * ForkJoinPool#managedBlock(ManagedBlocker)}. The unusual 3869 * methods in this API accommodate synchronizers that may, but 3870 * don't usually, block for long periods. Similarly, they allow 3871 * more efficient internal handling of cases in which additional 3872 * workers may be, but usually are not, needed to ensure 3873 * sufficient parallelism. Toward this end, implementations of 3874 * method {@code isReleasable} must be amenable to repeated 3875 * invocation. Neither method is invoked after a prior invocation 3876 * of {@code isReleasable} or {@code block} returns {@code true}. 
3877 * 3878 * <p>For example, here is a ManagedBlocker based on a 3879 * ReentrantLock: 3880 * <pre> {@code 3881 * class ManagedLocker implements ManagedBlocker { 3882 * final ReentrantLock lock; 3883 * boolean hasLock = false; 3884 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 3885 * public boolean block() { 3886 * if (!hasLock) 3887 * lock.lock(); 3888 * return true; 3889 * } 3890 * public boolean isReleasable() { 3891 * return hasLock || (hasLock = lock.tryLock()); 3892 * } 3893 * }}</pre> 3894 * 3895 * <p>Here is a class that possibly blocks waiting for an 3896 * item on a given queue: 3897 * <pre> {@code 3898 * class QueueTaker<E> implements ManagedBlocker { 3899 * final BlockingQueue<E> queue; 3900 * volatile E item = null; 3901 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 3902 * public boolean block() throws InterruptedException { 3903 * if (item == null) 3904 * item = queue.take(); 3905 * return true; 3906 * } 3907 * public boolean isReleasable() { 3908 * return item != null || (item = queue.poll()) != null; 3909 * } 3910 * public E getItem() { // call after pool.managedBlock completes 3911 * return item; 3912 * } 3913 * }}</pre> 3914 */ 3915 public static interface ManagedBlocker { 3916 /** 3917 * Possibly blocks the current thread, for example waiting for 3918 * a lock or condition. 3919 * 3920 * @return {@code true} if no additional blocking is necessary 3921 * (i.e., if isReleasable would return true) 3922 * @throws InterruptedException if interrupted while waiting 3923 * (the method is not required to do so, but is allowed to) 3924 */ 3925 boolean block() throws InterruptedException; 3926 3927 /** 3928 * Returns {@code true} if blocking is unnecessary. 3929 * @return {@code true} if blocking is unnecessary 3930 */ 3931 boolean isReleasable(); 3932 } 3933 3934 /** 3935 * Runs the given possibly blocking task. When {@linkplain 3936 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this 3937 * method possibly arranges for a spare thread to be activated if 3938 * necessary to ensure sufficient parallelism while the current 3939 * thread is blocked in {@link ManagedBlocker#block blocker.block()}. 3940 * 3941 * <p>This method repeatedly calls {@code blocker.isReleasable()} and 3942 * {@code blocker.block()} until either method returns {@code true}. 3943 * Every call to {@code blocker.block()} is preceded by a call to 3944 * {@code blocker.isReleasable()} that returned {@code false}. 3945 * 3946 * <p>If not running in a ForkJoinPool, this method is 3947 * behaviorally equivalent to 3948 * <pre> {@code 3949 * while (!blocker.isReleasable()) 3950 * if (blocker.block()) 3951 * break;}</pre> 3952 * 3953 * If running in a ForkJoinPool, the pool may first be expanded to 3954 * ensure sufficient parallelism available during the call to 3955 * {@code blocker.block()}. 
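 *
 * <p>For example, here is a sketch using the {@code QueueTaker}
 * class defined above (where {@code stringQueue} is any
 * {@code BlockingQueue<String>}):
 * <pre> {@code
 * QueueTaker<String> taker = new QueueTaker<>(stringQueue);
 * ForkJoinPool.managedBlock(taker);
 * String item = taker.getItem();}</pre>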
3956 * 3957 * @param blocker the blocker task 3958 * @throws InterruptedException if {@code blocker.block()} did so 3959 */ 3960 public static void managedBlock(ManagedBlocker blocker) 3961 throws InterruptedException { 3962 Thread t; ForkJoinPool p; 3963 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && 3964 (p = ((ForkJoinWorkerThread)t).pool) != null) 3965 p.compensatedBlock(blocker); 3966 else 3967 unmanagedBlock(blocker); 3968 } 3969 3970 /** ManagedBlock for ForkJoinWorkerThreads */ 3971 private void compensatedBlock(ManagedBlocker blocker) 3972 throws InterruptedException { 3973 Objects.requireNonNull(blocker); 3974 for (;;) { 3975 int comp; boolean done; 3976 long c = ctl; 3977 if (blocker.isReleasable()) 3978 break; 3979 if ((runState & STOP) != 0L) 3980 throw new InterruptedException(); 3981 if ((comp = tryCompensate(c)) >= 0) { 3982 try { 3983 done = blocker.block(); 3984 } finally { 3985 if (comp > 0) 3986 getAndAddCtl(RC_UNIT); 3987 } 3988 if (done) 3989 break; 3990 } 3991 } 3992 } 3993 3994 /** 3995 * Invokes tryCompensate to create or re-activate a spare thread to 3996 * compensate for a thread that performs a blocking operation. When the 3997 * blocking operation is done then endCompensatedBlock must be invoked 3998 * with the value returned by this method to re-adjust the parallelism. 3999 * @return value to use in endCompensatedBlock 4000 */ 4001 final long beginCompensatedBlock() { 4002 int c; 4003 do {} while ((c = tryCompensate(ctl)) < 0); 4004 return (c == 0) ? 0L : RC_UNIT; 4005 } 4006 4007 /** 4008 * Re-adjusts parallelism after a blocking operation completes. 4009 * @param post value from beginCompensatedBlock 4010 */ 4011 void endCompensatedBlock(long post) { 4012 if (post > 0L) { 4013 getAndAddCtl(post); 4014 } 4015 } 4016 4017 /** ManagedBlock for external threads */ 4018 private static void unmanagedBlock(ManagedBlocker blocker) 4019 throws InterruptedException { 4020 Objects.requireNonNull(blocker); 4021 do {} while (!blocker.isReleasable() && !blocker.block()); 4022 } 4023 4024 @Override 4025 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 4026 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 4027 new ForkJoinTask.AdaptedRunnable<T>(runnable, value) : 4028 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(runnable, value); 4029 } 4030 4031 @Override 4032 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 4033 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 
4034 new ForkJoinTask.AdaptedCallable<T>(callable) : 4035 new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable); 4036 } 4037 4038 static { 4039 U = Unsafe.getUnsafe(); 4040 Class<ForkJoinPool> klass = ForkJoinPool.class; 4041 try { 4042 Field poolIdsField = klass.getDeclaredField("poolIds"); 4043 POOLIDS_BASE = U.staticFieldBase(poolIdsField); 4044 POOLIDS = U.staticFieldOffset(poolIdsField); 4045 } catch (NoSuchFieldException e) { 4046 throw new ExceptionInInitializerError(e); 4047 } 4048 CTL = U.objectFieldOffset(klass, "ctl"); 4049 RUNSTATE = U.objectFieldOffset(klass, "runState"); 4050 PARALLELISM = U.objectFieldOffset(klass, "parallelism"); 4051 THREADIDS = U.objectFieldOffset(klass, "threadIds"); 4052 TERMINATION = U.objectFieldOffset(klass, "termination"); 4053 Class<ForkJoinTask[]> aklass = ForkJoinTask[].class; 4054 ABASE = U.arrayBaseOffset(aklass); 4055 int scale = U.arrayIndexScale(aklass); 4056 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); 4057 if ((scale & (scale - 1)) != 0) 4058 throw new Error("array index scale not a power of two"); 4059 4060 defaultForkJoinWorkerThreadFactory = 4061 new DefaultForkJoinWorkerThreadFactory(); 4062 @SuppressWarnings("removal") 4063 ForkJoinPool p = common = (System.getSecurityManager() == null) ? 4064 new ForkJoinPool((byte)0) : 4065 AccessController.doPrivileged(new PrivilegedAction<>() { 4066 public ForkJoinPool run() { 4067 return new ForkJoinPool((byte)0); }}); 4068 // allow access to non-public methods 4069 SharedSecrets.setJavaUtilConcurrentFJPAccess( 4070 new JavaUtilConcurrentFJPAccess() { 4071 @Override 4072 public long beginCompensatedBlock(ForkJoinPool pool) { 4073 return pool.beginCompensatedBlock(); 4074 } 4075 public void endCompensatedBlock(ForkJoinPool pool, long post) { 4076 pool.endCompensatedBlock(post); 4077 } 4078 }); 4079 Class<?> dep = LockSupport.class; // ensure loaded 4080 } 4081 }