1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.Thread.UncaughtExceptionHandler; 39 import java.lang.reflect.Field; 40 import java.security.AccessController; 41 import java.security.AccessControlContext; 42 import java.security.Permission; 43 import java.security.Permissions; 44 import java.security.PrivilegedAction; 45 import java.security.ProtectionDomain; 46 import java.util.ArrayList; 47 import java.util.Collection; 48 import java.util.Collections; 49 import java.util.List; 50 import java.util.Objects; 51 import java.util.function.Predicate; 52 import java.util.concurrent.CountDownLatch; 53 import java.util.concurrent.locks.LockSupport; 54 import jdk.internal.access.JavaUtilConcurrentFJPAccess; 55 import jdk.internal.access.SharedSecrets; 56 import jdk.internal.misc.Unsafe; 57 import jdk.internal.vm.SharedThreadContainer; 58 59 /** 60 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 61 * A {@code ForkJoinPool} provides the entry point for submissions 62 * from non-{@code ForkJoinTask} clients, as well as management and 63 * monitoring operations. 64 * 65 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 66 * ExecutorService} mainly by virtue of employing 67 * <em>work-stealing</em>: all threads in the pool attempt to find and 68 * execute tasks submitted to the pool and/or created by other active 69 * tasks (eventually blocking waiting for work if none exist). This 70 * enables efficient processing when most tasks spawn other subtasks 71 * (as do most {@code ForkJoinTask}s), as well as when many small 72 * tasks are submitted to the pool from external clients. 
Especially 73 * when setting <em>asyncMode</em> to true in constructors, {@code 74 * ForkJoinPool}s may also be appropriate for use with event-style 75 * tasks that are never joined. All worker threads are initialized 76 * with {@link Thread#isDaemon} set {@code true}. 77 * 78 * <p>A static {@link #commonPool()} is available and appropriate for 79 * most applications. The common pool is used by any ForkJoinTask that 80 * is not explicitly submitted to a specified pool. Using the common 81 * pool normally reduces resource usage (its threads are slowly 82 * reclaimed during periods of non-use, and reinstated upon subsequent 83 * use). 84 * 85 * <p>For applications that require separate or custom pools, a {@code 86 * ForkJoinPool} may be constructed with a given target parallelism 87 * level; by default, equal to the number of available processors. 88 * The pool attempts to maintain enough active (or available) threads 89 * by dynamically adding, suspending, or resuming internal worker 90 * threads, even if some tasks are stalled waiting to join others. 91 * However, no such adjustments are guaranteed in the face of blocked 92 * I/O or other unmanaged synchronization. The nested {@link 93 * ManagedBlocker} interface enables extension of the kinds of 94 * synchronization accommodated. The default policies may be 95 * overridden using a constructor with parameters corresponding to 96 * those documented in class {@link ThreadPoolExecutor}. 97 * 98 * <p>In addition to execution and lifecycle control methods, this 99 * class provides status check methods (for example 100 * {@link #getStealCount}) that are intended to aid in developing, 101 * tuning, and monitoring fork/join applications. Also, method 102 * {@link #toString} returns indications of pool state in a 103 * convenient form for informal monitoring. 104 * 105 * <p>As is the case with other ExecutorServices, there are three 106 * main task execution methods summarized in the following table. 
107 * These are designed to be used primarily by clients not already 108 * engaged in fork/join computations in the current pool. The main 109 * forms of these methods accept instances of {@code ForkJoinTask}, 110 * but overloaded forms also allow mixed execution of plain {@code 111 * Runnable}- or {@code Callable}- based activities as well. However, 112 * tasks that are already executing in a pool should normally instead 113 * use the within-computation forms listed in the table unless using 114 * async event-style tasks that are not usually joined, in which case 115 * there is little difference among choice of methods. 116 * 117 * <table class="plain"> 118 * <caption>Summary of task execution methods</caption> 119 * <tr> 120 * <td></td> 121 * <th scope="col"> Call from non-fork/join clients</th> 122 * <th scope="col"> Call from within fork/join computations</th> 123 * </tr> 124 * <tr> 125 * <th scope="row" style="text-align:left"> Arrange async execution</th> 126 * <td> {@link #execute(ForkJoinTask)}</td> 127 * <td> {@link ForkJoinTask#fork}</td> 128 * </tr> 129 * <tr> 130 * <th scope="row" style="text-align:left"> Await and obtain result</th> 131 * <td> {@link #invoke(ForkJoinTask)}</td> 132 * <td> {@link ForkJoinTask#invoke}</td> 133 * </tr> 134 * <tr> 135 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th> 136 * <td> {@link #submit(ForkJoinTask)}</td> 137 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 138 * </tr> 139 * </table> 140 * 141 * <p>The parameters used to construct the common pool may be controlled by 142 * setting the following {@linkplain System#getProperty system properties}: 143 * <ul> 144 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism} 145 * - the parallelism level, a non-negative integer 146 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory} 147 * - the class name of a {@link ForkJoinWorkerThreadFactory}. 
148 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 149 * is used to load this class. 150 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler} 151 * - the class name of a {@link UncaughtExceptionHandler}. 152 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 153 * is used to load this class. 154 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares} 155 * - the maximum number of allowed extra threads to maintain target 156 * parallelism (default 256). 157 * </ul> 158 * If no thread factory is supplied via a system property, then the 159 * common pool uses a factory that uses the system class loader as the 160 * {@linkplain Thread#getContextClassLoader() thread context class loader}. 161 * In addition, if a {@link SecurityManager} is present, then 162 * the common pool uses a factory supplying threads that have no 163 * {@link Permissions} enabled, and are not guaranteed to preserve 164 * the values of {@link java.lang.ThreadLocal} variables across tasks. 165 * 166 * Upon any error in establishing these settings, default parameters 167 * are used. It is possible to disable or limit the use of threads in 168 * the common pool by setting the parallelism property to zero, and/or 169 * using a factory that may return {@code null}. However doing so may 170 * cause unjoined tasks to never be executed. 171 * 172 * @implNote This implementation restricts the maximum number of 173 * running threads to 32767. Attempts to create pools with greater 174 * than the maximum number result in {@code 175 * IllegalArgumentException}. Also, this implementation rejects 176 * submitted tasks (that is, by throwing {@link 177 * RejectedExecutionException}) only when the pool is shut down or 178 * internal resources have been exhausted. 
179 * 180 * @since 1.7 181 * @author Doug Lea 182 */ 183 public class ForkJoinPool extends AbstractExecutorService { 184 185 /* 186 * Implementation Overview -- omitted until stable 187 * 188 */ 189 190 // static configuration constants 191 192 /** 193 * Default idle timeout value (in milliseconds) for idle threads 194 * to park waiting for new work before terminating. 195 */ 196 static final long DEFAULT_KEEPALIVE = 60_000L; 197 198 /** 199 * Undershoot tolerance for idle timeouts 200 */ 201 static final long TIMEOUT_SLOP = 20L; 202 203 /** 204 * The default value for common pool maxSpares. Overridable using 205 * the "java.util.concurrent.ForkJoinPool.common.maximumSpares" 206 * system property. The default value is far in excess of normal 207 * requirements, but also far short of maximum capacity and typical OS 208 * thread limits, so allows JVMs to catch misuse/abuse before 209 * running out of resources needed to do so. 210 */ 211 static final int DEFAULT_COMMON_MAX_SPARES = 256; 212 213 /** 214 * Initial capacity of work-stealing queue array. Must be a power 215 * of two, at least 2. See above. 
216 */ 217 static final int INITIAL_QUEUE_CAPACITY = 1 << 6; 218 219 // conversions among short, int, long 220 static final int SMASK = 0xffff; // (unsigned) short bits 221 static final long LMASK = 0xffffffffL; // lower 32 bits of long 222 static final long UMASK = ~LMASK; // upper 32 bits 223 224 // masks and sentinels for queue indices 225 static final int MAX_CAP = 0x7fff; // max # workers 226 static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id 227 static final int INVALID_ID = 0x4000; // unused external queue id 228 229 // pool.runState bits 230 static final int STOP = 1 << 0; // terminating 231 static final int SHUTDOWN = 1 << 1; // terminate when quiescent 232 static final int TERMINATED = 1 << 2; // only set if STOP also set 233 static final int RS_LOCK = 1 << 3; // lowest seqlock bit 234 235 // spin/sleep limits for runState locking and elsewhere 236 static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait 237 static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos 238 static final int MAX_SLEEP = 1 << 20; // approx 1 sec as nanos 239 240 // {pool, workQueue} config bits 241 static final int FIFO = 1 << 0; // fifo queue or access mode 242 static final int CLEAR_TLS = 1 << 1; // set for Innocuous workers 243 static final int PRESET_SIZE = 1 << 2; // size was set by property 244 245 // others 246 static final int DEREGISTERED = 1 << 31; // worker terminating 247 static final int UNCOMPENSATE = 1 << 16; // tryCompensate return 248 static final int IDLE = 1 << 16; // phase seqlock/version count 249 250 /* 251 * Bits and masks for ctl and bounds are packed with 4 16 bit subfields: 252 * RC: Number of released (unqueued) workers 253 * TC: Number of total workers 254 * SS: version count and status of top waiting thread 255 * ID: poolIndex of top of Treiber stack of waiters 256 * 257 * When convenient, we can extract the lower 32 stack top bits 258 * (including version bits) as sp=(int)ctl. 
When sp is non-zero, 259 * there are waiting workers. Count fields may be transiently 260 * negative during termination because of out-of-order updates. 261 * To deal with this, we use casts in and out of "short" and/or 262 * signed shifts to maintain signedness. Because it occupies 263 * uppermost bits, we can add one release count using getAndAdd of 264 * RC_UNIT, rather than CAS, when returning from a blocked join. 265 * Other updates of multiple subfields require CAS. 266 */ 267 268 // Release counts 269 static final int RC_SHIFT = 48; 270 static final long RC_UNIT = 0x0001L << RC_SHIFT; 271 static final long RC_MASK = 0xffffL << RC_SHIFT; 272 // Total counts 273 static final int TC_SHIFT = 32; 274 static final long TC_UNIT = 0x0001L << TC_SHIFT; 275 static final long TC_MASK = 0xffffL << TC_SHIFT; 276 277 /* 278 * All atomic operations on task arrays (queues) use Unsafe 279 * operations that take array offsets versus indices, based on 280 * array base and shift constants established during static 281 * initialization. 282 */ 283 static final long ABASE; 284 static final int ASHIFT; 285 286 // Static utilities 287 288 /** 289 * Returns the array offset corresponding to the given index for 290 * Unsafe task queue operations 291 */ 292 static long slotOffset(int index) { 293 return ((long)index << ASHIFT) + ABASE; 294 } 295 296 /** 297 * If there is a security manager, makes sure caller has 298 * permission to modify threads. 299 */ 300 @SuppressWarnings("removal") 301 private static void checkPermission() { 302 SecurityManager security; RuntimePermission perm; 303 if ((security = System.getSecurityManager()) != null) { 304 if ((perm = modifyThreadPermission) == null) 305 modifyThreadPermission = perm = // races OK 306 new RuntimePermission("modifyThread"); 307 security.checkPermission(perm); 308 } 309 } 310 311 // Nested classes 312 313 /** 314 * Factory for creating new {@link ForkJoinWorkerThread}s. 
315 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 316 * for {@code ForkJoinWorkerThread} subclasses that extend base 317 * functionality or initialize threads with different contexts. 318 */ 319 public static interface ForkJoinWorkerThreadFactory { 320 /** 321 * Returns a new worker thread operating in the given pool. 322 * Returning null or throwing an exception may result in tasks 323 * never being executed. If this method throws an exception, 324 * it is relayed to the caller of the method (for example 325 * {@code execute}) causing attempted thread creation. If this 326 * method returns null or throws an exception, it is not 327 * retried until the next attempted creation (for example 328 * another call to {@code execute}). 329 * 330 * @param pool the pool this thread works in 331 * @return the new worker thread, or {@code null} if the request 332 * to create a thread is rejected 333 * @throws NullPointerException if the pool is null 334 */ 335 public ForkJoinWorkerThread newThread(ForkJoinPool pool); 336 } 337 338 /** 339 * Default ForkJoinWorkerThreadFactory implementation; creates a 340 * new ForkJoinWorkerThread using the system class loader as the 341 * thread context class loader. 342 */ 343 static final class DefaultForkJoinWorkerThreadFactory 344 implements ForkJoinWorkerThreadFactory { 345 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 346 boolean isCommon = (pool.workerNamePrefix == null); 347 @SuppressWarnings("removal") 348 SecurityManager sm = System.getSecurityManager(); 349 if (sm == null) { 350 if (isCommon) 351 return new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool); 352 else 353 return new ForkJoinWorkerThread(null, pool, true, false); 354 } else if (isCommon) 355 return newCommonWithACC(pool); 356 else 357 return newRegularWithACC(pool); 358 } 359 360 /* 361 * Create and use static AccessControlContexts only if there 362 * is a SecurityManager. 
(These can be removed if/when 363 * SecurityManagers are removed from platform.) The ACCs are 364 * immutable and equivalent even when racily initialized, so 365 * they don't require locking, although with the chance of 366 * needlessly duplicate construction. 367 */ 368 @SuppressWarnings("removal") 369 static volatile AccessControlContext regularACC, commonACC; 370 371 @SuppressWarnings("removal") 372 static ForkJoinWorkerThread newRegularWithACC(ForkJoinPool pool) { 373 AccessControlContext acc = regularACC; 374 if (acc == null) { 375 Permissions ps = new Permissions(); 376 ps.add(new RuntimePermission("getClassLoader")); 377 ps.add(new RuntimePermission("setContextClassLoader")); 378 regularACC = acc = 379 new AccessControlContext(new ProtectionDomain[] { 380 new ProtectionDomain(null, ps) }); 381 } 382 return AccessController.doPrivileged( 383 new PrivilegedAction<>() { 384 public ForkJoinWorkerThread run() { 385 return new ForkJoinWorkerThread(null, pool, true, false); 386 }}, acc); 387 } 388 389 @SuppressWarnings("removal") 390 static ForkJoinWorkerThread newCommonWithACC(ForkJoinPool pool) { 391 AccessControlContext acc = commonACC; 392 if (acc == null) { 393 Permissions ps = new Permissions(); 394 ps.add(new RuntimePermission("getClassLoader")); 395 ps.add(new RuntimePermission("setContextClassLoader")); 396 ps.add(new RuntimePermission("modifyThread")); 397 ps.add(new RuntimePermission("enableContextClassLoaderOverride")); 398 ps.add(new RuntimePermission("modifyThreadGroup")); 399 commonACC = acc = 400 new AccessControlContext(new ProtectionDomain[] { 401 new ProtectionDomain(null, ps) }); 402 } 403 return AccessController.doPrivileged( 404 new PrivilegedAction<>() { 405 public ForkJoinWorkerThread run() { 406 return new ForkJoinWorkerThread. 407 InnocuousForkJoinWorkerThread(pool); 408 }}, acc); 409 } 410 } 411 412 /** 413 * Queues supporting work-stealing as well as external task 414 * submission. See above for descriptions and algorithms. 
415 */ 416 static final class WorkQueue { 417 // fields declared in order of their likely layout on most VMs 418 final ForkJoinWorkerThread owner; // null if shared 419 ForkJoinTask<?>[] array; // the queued tasks; power of 2 size 420 int base; // index of next slot for poll 421 final int config; // mode bits 422 423 // fields otherwise causing more unnecessary false-sharing cache misses 424 @jdk.internal.vm.annotation.Contended("w") 425 int top; // index of next slot for push 426 @jdk.internal.vm.annotation.Contended("w") 427 volatile int phase; // versioned active status 428 @jdk.internal.vm.annotation.Contended("w") 429 int stackPred; // pool stack (ctl) predecessor link 430 @jdk.internal.vm.annotation.Contended("w") 431 volatile int source; // source queue id (or DEREGISTERED) 432 @jdk.internal.vm.annotation.Contended("w") 433 int nsteals; // number of steals from other queues 434 @jdk.internal.vm.annotation.Contended("w") 435 volatile int parking; // nonzero if parked in awaitWork 436 437 // Support for atomic operations 438 private static final Unsafe U; 439 private static final long PHASE; 440 private static final long BASE; 441 private static final long TOP; 442 private static final long SOURCE; 443 private static final long ARRAY; 444 445 final void updateBase(int v) { 446 U.putIntVolatile(this, BASE, v); 447 } 448 final void updateTop(int v) { 449 U.putIntOpaque(this, TOP, v); 450 } 451 final void setSource(int v) { 452 U.getAndSetInt(this, SOURCE, v); 453 } 454 final void updateArray(ForkJoinTask<?>[] a) { 455 U.getAndSetReference(this, ARRAY, a); 456 } 457 final void unlockPhase() { 458 U.getAndAddInt(this, PHASE, IDLE); 459 } 460 final boolean tryLockPhase() { // seqlock acquire 461 int p; 462 return (((p = phase) & IDLE) != 0 && 463 U.compareAndSetInt(this, PHASE, p, p + IDLE)); 464 } 465 466 /** 467 * Constructor. For internal queues, most fields are initialized 468 * upon thread start in pool.registerWorker. 
469 */ 470 WorkQueue(ForkJoinWorkerThread owner, int id, int cfg, 471 boolean clearThreadLocals) { 472 if (clearThreadLocals) 473 cfg |= CLEAR_TLS; 474 this.config = cfg; 475 top = base = 1; 476 this.phase = id; 477 this.owner = owner; 478 } 479 480 /** 481 * Returns an exportable index (used by ForkJoinWorkerThread). 482 */ 483 final int getPoolIndex() { 484 return (phase & 0xffff) >>> 1; // ignore odd/even tag bit 485 } 486 487 /** 488 * Returns the approximate number of tasks in the queue. 489 */ 490 final int queueSize() { 491 int unused = phase; // for ordering effect 492 return Math.max(top - base, 0); // ignore transient negative 493 } 494 495 /** 496 * Pushes a task. Called only by owner or if already locked 497 * 498 * @param task the task. Caller must ensure non-null. 499 * @param pool the pool to signal if was previously empty, else null 500 * @param internal if caller owns this queue 501 * @throws RejectedExecutionException if array could not be resized 502 */ 503 final void push(ForkJoinTask<?> task, ForkJoinPool pool, 504 boolean internal) { 505 int s = top, b = base, cap, m, p, room, newCap; ForkJoinTask<?>[] a; 506 if ((a = array) == null || (cap = a.length) <= 0 || 507 (room = (m = cap - 1) - (s - b)) < 0) { // could not resize 508 if (!internal) 509 unlockPhase(); 510 throw new RejectedExecutionException("Queue capacity exceeded"); 511 } 512 top = s + 1; 513 long pos = slotOffset(p = m & s); 514 if (!internal) 515 U.putReference(a, pos, task); // inside lock 516 else 517 U.getAndSetReference(a, pos, task); // fully fenced 518 if (room == 0 && (newCap = cap << 1) > 0) { 519 ForkJoinTask<?>[] newArray = null; 520 try { // resize for next time 521 newArray = new ForkJoinTask<?>[newCap]; 522 } catch (OutOfMemoryError ex) { 523 } 524 if (newArray != null) { // else throw on next push 525 int newMask = newCap - 1; // poll old, push to new 526 p = newMask & s; 527 for (int k = s, j = cap; j > 0; --j, --k) { 528 ForkJoinTask<?> u; 529 if ((u = 
(ForkJoinTask<?>)U.getAndSetReference( 530 a, slotOffset(k & m), null)) == null) 531 break; // lost to pollers 532 newArray[k & newMask] = u; 533 } 534 updateArray(a = newArray); // fully fenced 535 } 536 } 537 if (!internal) 538 unlockPhase(); 539 if ((room == 0 || a[m & (s - 1)] == null) && pool != null) 540 pool.signalWork(a, p); 541 } 542 543 /** 544 * Takes next task, if one exists, in order specified by mode, 545 * so acts as either local-pop or local-poll. Called only by owner. 546 * @param fifo nonzero if FIFO mode 547 */ 548 private ForkJoinTask<?> nextLocalTask(int fifo) { 549 ForkJoinTask<?> t = null; 550 ForkJoinTask<?>[] a = array; 551 int b = base, p = top, cap; 552 if (p - b > 0 && a != null && (cap = a.length) > 0) { 553 for (int m = cap - 1, s, nb;;) { 554 if (fifo == 0 || (nb = b + 1) == p) { 555 if ((t = (ForkJoinTask<?>)U.getAndSetReference( 556 a, slotOffset(m & (s = p - 1)), null)) != null) 557 updateTop(s); // else lost race for only task 558 break; 559 } 560 if ((t = (ForkJoinTask<?>)U.getAndSetReference( 561 a, slotOffset(m & b), null)) != null) { 562 updateBase(nb); 563 break; 564 } 565 while (b == (b = base)) { 566 U.loadFence(); 567 Thread.onSpinWait(); // spin to reduce memory traffic 568 } 569 if (p - b <= 0) 570 break; 571 } 572 } 573 return t; 574 } 575 576 /** 577 * Takes next task, if one exists, using configured mode. 578 * (Always internal, never called for Common pool.) 579 */ 580 final ForkJoinTask<?> nextLocalTask() { 581 return nextLocalTask(config & FIFO); 582 } 583 584 /** 585 * Pops the given task only if it is at the current top. 586 * @param task the task. Caller must ensure non-null. 
587 * @param internal if caller owns this queue 588 */ 589 final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) { 590 boolean taken = false; 591 ForkJoinTask<?>[] a = array; 592 int p = top, s = p - 1, cap, k; 593 if (a != null && (cap = a.length) > 0 && 594 a[k = (cap - 1) & s] == task && 595 (internal || tryLockPhase())) { 596 if (top == p && 597 U.compareAndSetReference(a, slotOffset(k), task, null)) { 598 taken = true; 599 updateTop(s); 600 } 601 if (!internal) 602 unlockPhase(); 603 } 604 return taken; 605 } 606 607 /** 608 * Returns next task, if one exists, in order specified by mode. 609 */ 610 final ForkJoinTask<?> peek() { 611 ForkJoinTask<?>[] a = array; 612 int b = base, cfg = config, p = top, cap; 613 if (p != b && a != null && (cap = a.length) > 0) { 614 if ((cfg & FIFO) == 0) 615 return a[(cap - 1) & (p - 1)]; 616 else { // skip over in-progress removals 617 ForkJoinTask<?> t; 618 for ( ; p - b > 0; ++b) { 619 if ((t = a[(cap - 1) & b]) != null) 620 return t; 621 } 622 } 623 } 624 return null; 625 } 626 627 /** 628 * Polls for a task. Used only by non-owners. 629 * 630 */ 631 final ForkJoinTask<?> poll() { 632 for (;;) { 633 ForkJoinTask<?>[] a = array; 634 int b = base, cap, k; 635 if (a == null || (cap = a.length) <= 0) 636 break; 637 ForkJoinTask<?> t = a[k = b & (cap - 1)]; 638 U.loadFence(); 639 if (base == b) { 640 Object o; 641 int nb = b + 1, nk = nb & (cap - 1); 642 if (t == null) 643 o = a[k]; 644 else if (t == (o = U.compareAndExchangeReference( 645 a, slotOffset(k), t, null))) { 646 updateBase(nb); 647 return t; 648 } 649 if (o == null && a[nk] == null && array == a && 650 (phase & (IDLE | 1)) != 0 && top - base <= 0) 651 break; // empty 652 } 653 } 654 return null; 655 } 656 657 // specialized execution methods 658 659 /** 660 * Runs the given task, as well as remaining local tasks, plus 661 * those from src queue that can be taken without interference. 
662 */ 663 final void topLevelExec(ForkJoinTask<?> task, WorkQueue src, 664 int srcBase, int cfg) { 665 if (task != null && src != null) { 666 int fifo = cfg & FIFO, nstolen = 1; 667 for (;;) { 668 task.doExec(); 669 if ((task = nextLocalTask(fifo)) == null) { 670 int k, cap; ForkJoinTask<?>[] a; 671 if (src.base != srcBase || 672 (a = src.array) == null || (cap = a.length) <= 0 || 673 (task = a[k = srcBase & (cap - 1)]) == null) 674 break; 675 U.loadFence(); 676 if (src.base != srcBase || !U.compareAndSetReference( 677 a, slotOffset(k), task, null)) 678 break; 679 src.updateBase(++srcBase); 680 ++nstolen; 681 } 682 } 683 nsteals += nstolen; 684 if ((cfg & CLEAR_TLS) != 0) 685 ThreadLocalRandom.eraseThreadLocals(Thread.currentThread()); 686 } 687 } 688 689 /** 690 * Deep form of tryUnpush: Traverses from top and removes and 691 * runs task if present. 692 */ 693 final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) { 694 ForkJoinTask<?>[] a = array; 695 int b = base, p = top, s = p - 1, d = p - b, cap; 696 if (a != null && (cap = a.length) > 0) { 697 for (int m = cap - 1, i = s; d > 0; --i, --d) { 698 ForkJoinTask<?> t; int k; boolean taken; 699 if ((t = a[k = i & m]) == null) 700 break; 701 if (t == task) { 702 long pos = slotOffset(k); 703 if (!internal && !tryLockPhase()) 704 break; // fail if locked 705 if (taken = 706 (top == p && 707 U.compareAndSetReference(a, pos, task, null))) { 708 if (i == s) // act as pop 709 updateTop(s); 710 else if (i == base) // act as poll 711 updateBase(i + 1); 712 else { // swap with top 713 U.putReferenceVolatile( 714 a, pos, (ForkJoinTask<?>) 715 U.getAndSetReference( 716 a, slotOffset(s & m), null)); 717 updateTop(s); 718 } 719 } 720 if (!internal) 721 unlockPhase(); 722 if (taken) 723 task.doExec(); 724 break; 725 } 726 } 727 } 728 } 729 730 /** 731 * Tries to pop and run tasks within the target's computation 732 * until done, not found, or limit exceeded. 
733 * 734 * @param task root of computation 735 * @param limit max runs, or zero for no limit 736 * @return task status if known to be done 737 */ 738 final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) { 739 int status = 0; 740 if (task != null) { 741 outer: for (;;) { 742 ForkJoinTask<?>[] a; ForkJoinTask<?> t; boolean taken; 743 int stat, p, s, cap, k; 744 if ((stat = task.status) < 0) { 745 status = stat; 746 break; 747 } 748 if ((a = array) == null || (cap = a.length) <= 0) 749 break; 750 if ((t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null) 751 break; 752 if (!(t instanceof CountedCompleter)) 753 break; 754 CountedCompleter<?> f = (CountedCompleter<?>)t; 755 for (int steps = cap;;) { // bound path 756 if (f == task) 757 break; 758 if ((f = f.completer) == null || --steps == 0) 759 break outer; 760 } 761 if (!internal && !tryLockPhase()) 762 break; 763 if (taken = 764 (top == p && 765 U.compareAndSetReference(a, slotOffset(k), t, null))) 766 updateTop(s); 767 if (!internal) 768 unlockPhase(); 769 if (!taken) 770 break; 771 t.doExec(); 772 if (limit != 0 && --limit == 0) 773 break; 774 } 775 } 776 return status; 777 } 778 779 /** 780 * Tries to poll and run AsynchronousCompletionTasks until 781 * none found or blocker is released 782 * 783 * @param blocker the blocker 784 */ 785 final void helpAsyncBlocker(ManagedBlocker blocker) { 786 for (;;) { 787 ForkJoinTask<?>[] a; int b, cap, k; 788 if ((a = array) == null || (cap = a.length) <= 0) 789 break; 790 ForkJoinTask<?> t = a[k = (b = base) & (cap - 1)]; 791 U.loadFence(); 792 if (t == null) { 793 if (top - b <= 0) 794 break; 795 } 796 else if (!(t instanceof CompletableFuture 797 .AsynchronousCompletionTask)) 798 break; 799 if (blocker != null && blocker.isReleasable()) 800 break; 801 if (base == b && t != null && 802 U.compareAndSetReference(a, slotOffset(k), t, null)) { 803 updateBase(b + 1); 804 t.doExec(); 805 } 806 } 807 } 808 809 // misc 810 811 /** 812 * Returns true if internal 
and not known to be blocked.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            // requires an owner thread, the IDLE bit set in phase, and an
            // owner state that is none of BLOCKED, WAITING, TIMED_WAITING
            return ((wt = owner) != null && (phase & IDLE) != 0 &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // Resolve the Unsafe instance and the field offsets of the
        // WorkQueue fields that are accessed through it.
        static {
            U = Unsafe.getUnsafe();
            Class<WorkQueue> klass = WorkQueue.class;
            PHASE = U.objectFieldOffset(klass, "phase");
            BASE = U.objectFieldOffset(klass, "base");
            TOP = U.objectFieldOffset(klass, "top");
            SOURCE = U.objectFieldOffset(klass, "source");
            ARRAY = U.objectFieldOffset(klass, "array");
        }
    }

    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Sequence number for creating worker names
     */
    private static volatile int poolIds;

    /**
     * Permission required for callers of methods that may start or
     * kill threads. Lazily constructed.
     */
    static volatile RuntimePermission modifyThreadPermission;

    // fields declared in order of their likely layout on most VMs
    volatile CountDownLatch termination; // lazily constructed
    final Predicate<? super ForkJoinPool> saturate;
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
    final SharedThreadContainer container;
    final String workerNamePrefix;       // null for common pool
    WorkQueue[] queues;                  // main registry
    final long keepAlive;                // milliseconds before dropping if idle
    final long config;                   // static configuration bits
    volatile long stealCount;            // collects worker nsteals
    volatile long threadIds;             // for worker thread names
    volatile int runState;               // versioned, lockable
    @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
    volatile long ctl;                   // main pool control
    @jdk.internal.vm.annotation.Contended("fjpctl") // colocate
    int parallelism;                     // target number of workers

    // Support for atomic operations
    private static final Unsafe U;
    private static final long CTL;
    private static final long RUNSTATE;
    private static final long PARALLELISM;
    private static final long THREADIDS;
    private static final long TERMINATION;
    private static final Object POOLIDS_BASE;
    private static final long POOLIDS;

    // Thin wrappers: each applies a single Unsafe atomic operation to the
    // field identified by the corresponding offset constant above.

    /** CAS on ctl; returns true if ctl was c and is now v. */
    private boolean compareAndSetCtl(long c, long v) {
        return U.compareAndSetLong(this, CTL, c, v);
    }
    /** Compare-and-exchange on ctl; returns the witnessed value (c on success). */
    private long compareAndExchangeCtl(long c, long v) {
        return U.compareAndExchangeLong(this, CTL, c, v);
    }
    /** Atomically adds v to ctl, returning the previous value. */
    private long getAndAddCtl(long v) {
        return U.getAndAddLong(this, CTL, v);
    }
    /** Atomically increments threadIds, returning the previous value. */
    private long incrementThreadIds() {
        return U.getAndAddLong(this, THREADIDS, 1L);
    }
    /** Atomically adds x to the static poolIds counter, returning the previous value. */
    private static int getAndAddPoolIds(int x) {
        return U.getAndAddInt(POOLIDS_BASE, POOLIDS, x);
    }
    /** Atomically replaces parallelism with v, returning the previous value. */
    private int getAndSetParallelism(int v) {
        return U.getAndSetInt(this, PARALLELISM, v);
    }
    /** Reads parallelism in opaque mode. */
    private int getParallelismOpaque() {
        return U.getIntOpaque(this, PARALLELISM);
    }
    /**
     * Installs x as the termination latch only if none is set.
     *
     * @return the witnessed previous value: null if x was installed,
     *         otherwise the latch that was already present
     */
    private CountDownLatch cmpExTerminationSignal(CountDownLatch x) {
        return (CountDownLatch)
            U.compareAndExchangeReference(this, TERMINATION, null, x);
    }

    // runState operations

    private int getAndBitwiseOrRunState(int v) { // for status bits
        return U.getAndBitwiseOrInt(this, RUNSTATE, v);
    }
    private boolean casRunState(int c, int v) {
        return U.compareAndSetInt(this, RUNSTATE, c, v);
    }
    private void unlockRunState() {              // increment lock bit
        U.getAndAddInt(this, RUNSTATE, RS_LOCK);
    }
    private int lockRunState() {                // lock and return current state
        int s, u;                               // locked when RS_LOCK set
        // fast path: one CAS attempt if the lock bit is observed clear
        if (((s = runState) & RS_LOCK) == 0 && casRunState(s, u = s + RS_LOCK))
            return u;
        else
            return spinLockRunState();
    }
    private int spinLockRunState() {            // spin/sleep
        for (int waits = 0, s, u;;) {
            if (((s = runState) & RS_LOCK) == 0) {
                if (casRunState(s, u = s + RS_LOCK))
                    return u;
                waits = 0;                      // lost CAS race; restart backoff
            } else if (waits < SPIN_WAITS) {
                ++waits;
                Thread.onSpinWait();
            } else {
                // timed parks; interval doubles while below MAX_SLEEP
                if (waits < MIN_SLEEP)
                    waits = MIN_SLEEP;
                LockSupport.parkNanos(this, (long)waits);
                if (waits < MAX_SLEEP)
                    waits <<= 1;
            }
        }
    }

    static boolean poolIsStopping(ForkJoinPool p) { // Used by ForkJoinTask
        return p != null && (p.runState & STOP) != 0;
    }

    // Creating, registering, and deregistering workers

    /**
     * Tries to construct and start one worker. Assumes that total
     * count has already been incremented as a reservation.  Invokes
     * deregisterWorker on any failure.
*
     * @return true if successful
     */
    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        SharedThreadContainer ctr = container;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            if ((runState & STOP) == 0 &&  // avoid construction if terminating
                fac != null && (wt = fac.newThread(this)) != null) {
                // start through the thread container when one is configured
                if (ctr != null)
                    ctr.start(wt);
                else
                    wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        // reached on STOP, null factory/thread, or any exception above
        deregisterWorker(wt, ex);
        return false;
    }

    /**
     * Provides a name for ForkJoinWorkerThread constructor.
     *
     * @return workerNamePrefix (or the common-pool prefix when it is
     *         null) followed by the next thread id, which starts at 1
     */
    final String nextWorkerThreadName() {
        String prefix = workerNamePrefix;
        long tid = incrementThreadIds() + 1L;
        if (prefix == null) // commonPool has no prefix
            prefix = "ForkJoinPool.commonPool-worker-";
        return prefix.concat(Long.toString(tid));
    }

    /**
     * Finishes initializing and records internal queue. Does not
     * record the queue if STOP is set in runState or the queues
     * array is absent or empty.
     *
     * @param w caller's WorkQueue
     */
    final void registerWorker(WorkQueue w) {
        if (w != null) {
            w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
            ThreadLocalRandom.localInit();
            int seed = w.stackPred = ThreadLocalRandom.getProbe();
            int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
            int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
            int stop = lockRunState() & STOP;
            try {
                WorkQueue[] qs; int n;
                if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0) {
                    // probe odd slots only (id is odd, step is 2); after
                    // visiting every odd slot, flag "full" by setting bit n
                    for (int k = n, m = n - 1; ; id += 2) {
                        if (qs[id &= m] == null)
                            break;
                        if ((k -= 2) <= 0) {
                            id |= n;
                            break;
                        }
                    }
                    w.phase = id | phaseSeq;    // now publishable
                    if (id < n)
                        qs[id] = w;
                    else {                      // expand
                        int an = n << 1, am = an - 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[id & am] = w;
                        // odd-indexed entries keep their index
                        for (int j = 1; j < n; j += 2)
                            as[j] = qs[j];
                        // even-indexed entries are re-placed from their phase
                        for (int j = 0; j < n; j += 2) {
                            WorkQueue q;        // shared queues may move
                            if ((q = qs[j]) != null)
                                as[q.phase & EXTERNAL_ID_MASK & am] = q;
                        }
                        U.storeFence();         // fill before publish
                        queues = as;
                    }
                }
            } finally {
                unlockRunState();
            }
        }
    }

    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker. Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none;
     *           if non-null it is rethrown after cleanup
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int src = 0, phase = 0;
        boolean replaceable = false;
        if (wt != null && (w = wt.workQueue) != null) {
            phase = w.phase;
            if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
                w.source = DEREGISTERED;
                if (phase != 0) {         // else failed to start
                    replaceable = true;
                    if ((phase & IDLE) != 0)
                        releaseAll();     // pool stopped before released
                }
            }
        }
        if (src != DEREGISTERED) {        // decrement counts
            long c = ctl;
            // retry until the exchange succeeds: subtract one RC_UNIT and
            // one TC_UNIT, leaving the LMASK bits of ctl unchanged
            do {} while (c != (c = compareAndExchangeCtl(
                                   c, ((RC_MASK & (c - RC_UNIT)) |
                                       (TC_MASK & (c - TC_UNIT)) |
                                       (LMASK & c)))));
            if (w != null) {              // cancel remaining tasks
                for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
                    try {
                        t.cancel(false);
                    } catch (Throwable ignore) {
                        // deliberately ignored so remaining tasks still get cancelled
                    }
                }
            }
        }
        if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
            WorkQueue[] qs; int n, i;     // remove index unless terminating
            long ns = w.nsteals & 0xffffffffL;
            if ((lockRunState() & STOP) != 0)
                replaceable = false;
            else if ((qs = queues) != null && (n = qs.length) > 0 &&
                     qs[i = phase & SMASK & (n - 1)] == w) {
                qs[i] = null;
                stealCount += ns;         // accumulate steals
            }
            unlockRunState();
            if (replaceable)
                signalWork(null, 0);
        }
        if (ex != null)
            ForkJoinTask.rethrow(ex);
    }

    /**
     * Releases an idle worker, or creates one if not enough exist,
     * returning on contention if a signal task is already taken.
     *
     * @param a if nonnull, a task array holding task signalled
     * @param k index of task in array
     */
    final void signalWork(ForkJoinTask<?>[] a, int k) {
        int pc = parallelism;
        for (long c = ctl;;) {
            WorkQueue[] qs = queues;
            // give up if the signalled slot has already been emptied
            if (a != null && a.length > k && k >= 0 && a[k] == null)
                break;
            boolean done = false;
            WorkQueue v = null;
            long nc = 0L, ac = (c + RC_UNIT) & RC_MASK;
            int sp = (int)c, i = sp & SMASK;
            if ((short)(c >>> RC_SHIFT) >= pc || qs == null || qs.length <= i)
                done = true;
            else {
                WorkQueue w = qs[i];
                if (sp == 0) {            // low ctl word is zero: no worker to release
                    if ((short)(c >>> TC_SHIFT) >= pc)
                        done = true;
                    else                  // reserve one more total and released count
                        nc = ac | ((c + TC_UNIT) & TC_MASK);
                }
                else if ((v = w) == null)
                    done = true;
                else                      // replace low word with v's stackPred
                    nc = ac | (c & TC_MASK) | (v.stackPred & LMASK);
            }
            if (c == (c = ctl)) {        // confirm
                if (done)
                    break;
                else if (c == (c = compareAndExchangeCtl(c, nc))) {
                    if (v == null)
                        createWorker();
                    else {
                        v.phase = sp;
                        if (v.parking != 0)
                            U.unpark(v.owner);
                    }
                    break;
                }
            }
        }
    }

    /**
     * Reactivates the given worker, and possibly others if not top of
     * ctl stack. Called only during shutdown to ensure release on
     * termination.
*/
    private void releaseAll() {
        for (long c = ctl;;) {
            WorkQueue[] qs; WorkQueue v; int sp, i;
            // stop when the low ctl word is zero or no queue is found at
            // the index it designates
            if ((sp = (int)c) == 0 || (qs = queues) == null ||
                qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
                break;
            // on success: set v's phase to sp and unpark if parked; either
            // way continue with the witnessed ctl value
            if (c == (c = compareAndExchangeCtl(
                          c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
                              (v.stackPred & LMASK))))) {
                v.phase = sp;
                if (v.parking != 0)
                    U.unpark(v.owner);
            }
        }
    }

    /**
     * Internal version of isQuiescent and related functionality.
     *
     * @return 1 if STOP is already set, or was set here because
     * SHUTDOWN is enabled and quiescence was confirmed; 0 if
     * quiescent (no released workers counted in ctl, all queues idle
     * and empty, runState unlocked and stable across two sweeps) but
     * SHUTDOWN is not enabled; -1 if some worker or queue is found
     * active or nonempty
     */
    private int quiescent() {
        outer: for (;;) {
            long phaseSum = 0L;
            boolean swept = false;
            for (int e, prevRunState = 0; ; prevRunState = e) {
                long c = ctl;
                if (((e = runState) & STOP) != 0)
                    return 1;                             // terminating
                else if ((c & RC_MASK) > 0L)
                    return -1;                            // at least one active
                else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
                    long sum = c;
                    WorkQueue[] qs = queues; WorkQueue q;
                    int n = (qs == null) ? 0 : qs.length;
                    for (int i = 0; i < n; ++i) {         // scan queues
                        if ((q = qs[i]) != null) {
                            int p = q.phase, s = q.top, b = q.base;
                            sum += (p & 0xffffffffL) | ((long)b << 32);
                            if ((p & IDLE) == 0 || s - b > 0) {
                                // even-indexed nonquiescent queue: signal a
                                // worker if ctl is unchanged
                                if ((i & 1) == 0 && compareAndSetCtl(c, c))
                                    signalWork(q.array, q.base);
                                return -1;
                            }
                        }
                    }
                    // swept only when two consecutive scans give equal sums
                    swept = (phaseSum == (phaseSum = sum));
                }
                else if ((e & SHUTDOWN) == 0)
                    return 0;
                else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
                    releaseAll();                         // confirmed
                    return 1;                             // enable termination
                }
                else
                    break;                                // restart
            }
        }
    }

    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.
*
     * @param w caller's WorkQueue (may be null on failed initialization)
     */
    final void runWorker(WorkQueue w) {
        if (w != null) {
            int cfg = w.config & (FIFO|CLEAR_TLS), r = w.stackPred;
            long stat;
            // keep scanning while scan() asks for a retry (upper word zero);
            // otherwise continue only if quiescent() <= 0 and awaitWork()
            // returns zero
            do {
                r = (int)(stat = scan(w, r, cfg));
            } while ((int)(stat >>> 32) == 0 ||
                     (quiescent() <= 0 && awaitWork(w) == 0));
        }
    }

    /**
     * Scans for and if found executes top-level task
     *
     * @param w caller's WorkQueue
     * @param r random seed
     * @param cfg config bits
     * @return retry status and seed for next use: the low 32 bits hold
     * the updated seed; the upper 32 bits are zero when the caller
     * should rescan, and nonzero when the scan loop was exited
     */
    private long scan(WorkQueue w, int r, int cfg) {
        int spinScans = 0;                      // to rescan after deactivate
        while (w != null && (runState & STOP) == 0) {
            WorkQueue[] qs = queues;
            int n = (qs == null) ? 0 : qs.length;
            long prevCtl = ctl;                 // for signal check
            int phase = w.phase;                // IDLE set when deactivated
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // advance xorshift
            int i = r, step = (r >>> 16) | 1;   // scan random permutation
            int stalls = 0;                     // move and restart if stuck
            for (int l = n; l > 0; --l, i += step) {
                int j; WorkQueue q;
                if ((q = qs[j = i & SMASK & (n - 1)]) != null) {
                    for (;;) {
                        int cap, b, k; long kp; ForkJoinTask<?>[] a;
                        if ((a = q.array) == null || (cap = a.length) <= 0)
                            break;
                        ForkJoinTask<?> t = a[k = (cap - 1) & (b = q.base)];
                        U.loadFence();
                        if (q.base != b)        // base moved; reread slot
                            continue;
                        int nb = b + 1, nk = nb & (cap - 1);
                        if (U.getReference(a, kp = slotOffset(k)) != t)
                            ;                   // screen CAS
                        else if (t == null) {   // check if empty
                            if (a[nk] == null &&
                                a[(nb + 1) & (cap - 1)] == null) {
                                if (q.top - b <= 0)
                                    break;      // probe slots as filter
                            }
                            else if (++stalls > n)
                                return r & LMASK; // restart to randomly move
                            else if (stalls != 1)
                                Thread.onSpinWait();
                        }
                        else if ((phase & IDLE) != 0) { // recheck or reactivate
                            long sp = w.stackPred & LMASK, sc; int np;
                            if (((phase = w.phase) & IDLE) != 0) {
                                // only proceed if the low ctl word equals
                                // this worker's next phase
                                if ((np = phase + 1) != (int)(sc = ctl))
                                    break;      // ineligible
                                if (compareAndSetCtl(
                                        sc, sp | ((sc + RC_UNIT) & UMASK)))
                                    w.phase = np;
                            }
                            return r & LMASK;   // restart
                        }
                        else if (U.compareAndSetReference(a, kp, t, null)) {
                            q.base = nb;
                            w.setSource(j);     // fully fenced
                            signalWork(a, nk);  // signal if a[nk] nonnull
                            w.topLevelExec(t, q, nb, cfg);
                            return r & LMASK;
                        }
                    }
                }
            }
            if (w.phase == phase) {
                int ac; long c;                 // avoid missed signals
                if (((ac = (short)((c = ctl) >>> RC_SHIFT)) <= 0 ||
                     c == prevCtl || ac < (short)(prevCtl >>> RC_SHIFT))) {
                    if ((phase & IDLE) == 0) {  // try to deactivate
                        long ap = (phase + (IDLE << 1)) & LMASK;
                        spinScans = 0;
                        w.stackPred = (int)c;   // set ctl stack link
                        w.phase = phase | IDLE;
                        while (c != (c = compareAndExchangeCtl(
                                         c, ap | ((c - RC_UNIT) & UMASK)))) {
                            if (ac <= (ac = (short)(c >>> RC_SHIFT))) {
                                w.phase = phase; // nondecreasing; back out
                                break;
                            }
                            w.stackPred = (int)c; // retry
                        }
                    }
                    else if (ac <= 0 || (spinScans += ac) >= SPIN_WAITS)
                        break;
                }
            }
        }
        return (1L << 32) | (r & LMASK);
    }

    /**
     * Awaits signal or termination.
*
     * @param w the WorkQueue (may be null if already terminated)
     * @return nonzero for exit; specifically the IDLE bit of w.phase
     * as last read (zero once the worker has been reactivated), or
     * IDLE if w is null
     */
    private int awaitWork(WorkQueue w) {
        int p = IDLE, phase;
        if (w != null && (p = (phase = w.phase) & IDLE) != 0) {
            int nextPhase = phase + IDLE;
            long deadline = 0L, c;            // set if all idle and w is ctl top
            if (((c = ctl) & RC_MASK) <= 0L && (int)c == nextPhase) {
                int np = parallelism, nt = (short)(c >>> TC_SHIFT);
                long delay = keepAlive;       // scale if not fully populated
                if (nt != (nt = Math.max(nt, np)) && nt > 0)
                    delay = Math.max(TIMEOUT_SLOP, delay / nt);
                long d = delay + System.currentTimeMillis();
                deadline = (d == 0L) ? 1L : d; // zero is reserved for "untimed"
            }
            LockSupport.setCurrentBlocker(this);
            w.parking = 1;                    // enable unpark
            for (;;) {                        // emulate LockSupport.park
                if ((runState & STOP) != 0)
                    break;
                if ((p = w.phase & IDLE) == 0)
                    break;
                // absolute-deadline park when deadline is set, else untimed
                U.park(deadline != 0L, deadline);
                if ((p = w.phase & IDLE) == 0)
                    break;
                if ((runState & STOP) != 0)
                    break;
                Thread.interrupted();         // clear for next park
                if (deadline != 0L &&         // try to trim
                    deadline - System.currentTimeMillis() < TIMEOUT_SLOP) {
                    long sp = w.stackPred & LMASK, dc = ctl;
                    long nc = sp | (UMASK & (dc - TC_UNIT));
                    if ((int)dc == nextPhase && compareAndSetCtl(dc, nc)) {
                        WorkQueue[] qs; WorkQueue v; int vp, i;
                        // mark as already deregistered; p remains nonzero,
                        // so the caller's run loop exits
                        w.source = DEREGISTERED;
                        w.phase = nextPhase;  // try to wake up next waiter
                        if ((vp = (int)nc) != 0 && (qs = queues) != null &&
                            qs.length > (i = vp & SMASK) &&
                            (v = qs[i]) != null &&
                            compareAndSetCtl(nc, ((UMASK & (nc + RC_UNIT)) |
                                                  (nc & TC_MASK) |
                                                  (v.stackPred & LMASK)))) {
                            v.phase = vp;
                            U.unpark(v.owner);
                        }
                        break;
                    }
                    deadline = 0L;            // no longer trimmable
                }
            }
            w.parking = 0;                    // disable unpark
            LockSupport.setCurrentBlocker(null);
        }
        return p;
    }

    /**
     * Scans for and
returns a polled task, if available.  Used only
     * for untracked polls. Begins scan at a random index to avoid
     * systematic unfairness.
     *
     * @param submissionsOnly if true, only scan submission queues
     * @return a polled task, or null if STOP is set or none was found
     */
    private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
        if ((runState & STOP) == 0) {
            WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
            int r = ThreadLocalRandom.nextSecondarySeed();
            if (submissionsOnly)                 // even indices only
                r &= ~1;
            int step = (submissionsOnly) ? 2 : 1;
            if ((qs = queues) != null && (n = qs.length) > 0) {
                for (int i = n; i > 0; i -= step, r += step) {
                    if ((q = qs[r & (n - 1)]) != null &&
                        (t = q.poll()) != null)
                        return t;
                }
            }
        }
        return null;
    }

    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for
     * blocking. May fail due to interference, in which case -1 is
     * returned so caller may retry. A zero return value indicates
     * that the caller doesn't need to re-adjust counts when later
     * unblocked.
     *
     * @param c incoming ctl value
     * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
     * @throws RejectedExecutionException if no worker can be released
     * or added and the saturate predicate is absent or returns false
     */
    private int tryCompensate(long c) {
        Predicate<? super ForkJoinPool> sat;
        long b = config;
        int pc = parallelism,                    // unpack fields
            minActive = (short)(b >>> RC_SHIFT),
            maxTotal  = (short)(b >>> TC_SHIFT) + pc,
            active    = (short)(c >>> RC_SHIFT),
            total     = (short)(c >>> TC_SHIFT),
            sp        = (int)c,
            stat      = -1;                      // default retry return
        if (sp != 0 && active <= pc) {           // activate idle worker
            WorkQueue[] qs; WorkQueue v; int i;
            // the released count is left unchanged here; only the low
            // word is replaced by v's stackPred
            if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
                (v = qs[i]) != null &&
                compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
                v.phase = sp;
                if (v.parking != 0)
                    U.unpark(v.owner);
                stat = UNCOMPENSATE;
            }
        }
        else if (active > minActive && total >= pc) { // reduce active workers
            if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
                stat = UNCOMPENSATE;
        }
        else if (total < maxTotal && total < MAX_CAP) { // try to expand pool
            long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
            if ((runState & STOP) != 0)          // terminating
                stat = 0;
            else if (compareAndSetCtl(c, nc))
                stat = createWorker() ? UNCOMPENSATE : 0;
        }
        else if (!compareAndSetCtl(c, c))        // validate
            ;                                    // ctl changed; return -1 to retry
        else if ((sat = saturate) != null && sat.test(this))
            stat = 0;
        else
            throw new RejectedExecutionException(
                "Thread limit exceeded replacing blocked worker");
        return stat;
    }

    /**
     * Readjusts RC count; called from ForkJoinTask after blocking.
     */
    final void uncompensate() {
        getAndAddCtl(RC_UNIT);
    }

    /**
     * Helps if possible until the given task is done.  Processes
     * compatible local tasks and scans other queues for task produced
     * by w's stealers; returning compensated blocking sentinel if
     * none are found.
*
     * @param task the task
     * @param w caller's WorkQueue
     * @param internal true if w is owned by a ForkJoinWorkerThread
     * @return task status on exit, or UNCOMPENSATE for compensated blocking
     */

    final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
        if (w != null)
            w.tryRemoveAndExec(task, internal);
        int s = 0;
        // scanning below is attempted only for internal callers with a queue
        if (task != null && (s = task.status) >= 0 && internal && w != null) {
            int wid = w.phase & SMASK, r = wid + 2, wsrc = w.source;
            long sctl = 0L;                             // track stability
            outer: for (boolean rescan = true;;) {
                if ((s = task.status) < 0)
                    break;
                if (!rescan) {
                    if ((runState & STOP) != 0)
                        break;
                    // compensate only if ctl was the same on two
                    // consecutive passes; negative result means retry
                    if (sctl == (sctl = ctl) && (s = tryCompensate(sctl)) >= 0)
                        break;
                }
                rescan = false;
                WorkQueue[] qs = queues;
                int n = (qs == null) ? 0 : qs.length;
                scan: for (int l = n >>> 1; l > 0; --l, r += 2) {
                    int j; WorkQueue q;
                    if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
                        for (;;) {
                            int sq = q.source, b, cap, k; ForkJoinTask<?>[] a;
                            if ((a = q.array) == null || (cap = a.length) <= 0)
                                break;
                            ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
                            U.loadFence();
                            boolean eligible = false;
                            if (t == task)
                                eligible = true;
                            else if (t != null) {       // check steal chain
                                // follow source links from q, looking for
                                // this worker's id
                                for (int v = sq, d = cap;;) {
                                    WorkQueue p;
                                    if (v == wid) {
                                        eligible = true;
                                        break;
                                    }
                                    if ((v & 1) == 0 || // external or none
                                        --d < 0 ||      // bound depth
                                        (p = qs[v & (n - 1)]) == null)
                                        break;
                                    v = p.source;
                                }
                            }
                            if ((s = task.status) < 0)
                                break outer;            // validate
                            if (q.source == sq && q.base == b && a[k] == t) {
                                int nb = b + 1, nk = nb & (cap - 1);
                                if (!eligible) {        // revisit if nonempty
                                    if (!rescan && t == null &&
                                        (a[nk] != null || q.top - b > 0))
                                        rescan = true;
                                    break;
                                }
                                if (U.compareAndSetReference(
                                        a, slotOffset(k), t, null)) {
                                    q.updateBase(nb);
                                    w.source = j;       // record steal source while running
                                    t.doExec();
                                    w.source = wsrc;    // restore previous source
                                    rescan = true;      // restart at index r
                                    break scan;
                                }
                            }
                        }
                    }
                }
            }
        }
        return s;
    }

    /**
     * Version of helpJoin for CountedCompleters.
     *
     * @param task root of computation (only called when a CountedCompleter)
     * @param w caller's WorkQueue
     * @param internal true if w is owned by a ForkJoinWorkerThread
     * @return task status on exit, or UNCOMPENSATE for compensated blocking
     */
    final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
        int s = 0;
        if (task != null && (s = task.status) >= 0 && w != null) {
            int r = w.phase + 1;                          // for indexing
            long sctl = 0L;                               // track stability
            outer: for (boolean rescan = true, locals = true;;) {
                // first try tasks from the caller's own queue
                if (locals && (s = w.helpComplete(task, internal, 0)) < 0)
                    break;
                if ((s = task.status) < 0)
                    break;
                if (!rescan) {
                    if ((runState & STOP) != 0)
                        break;
                    // external callers exit here without compensating
                    if (sctl == (sctl = ctl) &&
                        (!internal || (s = tryCompensate(sctl)) >= 0))
                        break;
                }
                rescan = locals = false;
                WorkQueue[] qs = queues;
                int n = (qs == null) ? 0 : qs.length;
                scan: for (int l = n; l > 0; --l, ++r) {
                    int j; WorkQueue q;
                    if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
                        for (;;) {
                            ForkJoinTask<?>[] a; int b, cap, k;
                            if ((a = q.array) == null || (cap = a.length) <= 0)
                                break;
                            ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
                            U.loadFence();
                            boolean eligible = false;
                            if (t instanceof CountedCompleter) {
                                // walk the completer chain (at most cap
                                // steps) looking for task
                                CountedCompleter<?> f = (CountedCompleter<?>)t;
                                for (int steps = cap; steps > 0; --steps) {
                                    if (f == task) {
                                        eligible = true;
                                        break;
                                    }
                                    if ((f = f.completer) == null)
                                        break;
                                }
                            }
                            if ((s = task.status) < 0)    // validate
                                break outer;
                            if (q.base == b) {
                                int nb = b + 1, nk = nb & (cap - 1);
                                if (eligible) {
                                    if (U.compareAndSetReference(
                                            a, slotOffset(k), t, null)) {
                                        q.updateBase(nb);
                                        t.doExec();
                                        locals = rescan = true;
                                        break scan;
                                    }
                                }
                                else if (a[k] == t) {
                                    if (!rescan && t == null &&
                                        (a[nk] != null || q.top - b > 0))
                                        rescan = true;    // revisit
                                    break;
                                }
                            }
                        }
                    }
                }
            }
        }
        return s;
    }

    /**
     * Runs tasks until all workers are inactive and no tasks are
     * found. Rather than blocking when tasks cannot be found, rescans
     * until all others cannot find tasks either.
*
     * @param w caller's WorkQueue; if null, or if its phase already has
     *          the IDLE bit set, this method returns 0 immediately
     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
     * @param interruptible true if return on interrupt
     * @return positive if quiescent, negative if interrupted, else 0
     */
    private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
        int phase; // w.phase inactive bit set when temporarily quiescent
        if (w == null || ((phase = w.phase) & IDLE) != 0)
            return 0;
        int wsrc = w.source;
        long startTime = System.nanoTime();
        long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); // approx 1% nanos
        long prevSum = 0L;
        int activePhase = phase, inactivePhase = phase + IDLE;
        int r = phase + 1, waits = 0, returnStatus = 1;
        boolean locals = true;
        for (int e = runState;;) {
            if ((e & STOP) != 0)
                break;                      // terminating
            if (interruptible && Thread.interrupted()) {
                returnStatus = -1;
                break;
            }
            if (locals) {                   // run local tasks before (re)polling
                locals = false;
                for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;)
                    u.doExec();
            }
            WorkQueue[] qs = queues;
            int n = (qs == null) ? 0 : qs.length;
            long phaseSum = 0L;
            boolean rescan = false, busy = false;
            scan: for (int l = n; l > 0; --l, ++r) {
                int j; WorkQueue q;
                if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) {
                    for (;;) {
                        ForkJoinTask<?>[] a; int b, cap, k;
                        if ((a = q.array) == null || (cap = a.length) <= 0)
                            break;
                        ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
                        if (t != null && phase == inactivePhase) // reactivate
                            w.phase = phase = activePhase;
                        U.loadFence();
                        if (q.base == b && a[k] == t) {
                            int nb = b + 1;
                            if (t == null) {
                                if (!rescan) {
                                    int qp = q.phase, mq = qp & (IDLE | 1);
                                    phaseSum += qp;
                                    // mq == 0: low bit and IDLE both clear;
                                    // mq == 1: low bit set, IDLE clear
                                    if (mq == 0 || q.top - b > 0)
                                        rescan = true;
                                    else if (mq == 1)
                                        busy = true;
                                }
                                break;
                            }
                            if (U.compareAndSetReference(
                                    a, slotOffset(k), t, null)) {
                                q.updateBase(nb);
                                w.source = j;
                                t.doExec();
                                w.source = wsrc;
                                rescan = locals = true;
                                break scan;
                            }
                        }
                    }
                }
            }
            if (e != (e = runState) || prevSum != (prevSum = phaseSum) ||
                rescan || (e & RS_LOCK) != 0)
                ;                   // inconsistent
            else if (!busy)
                break;              // stable scan with no busy queue: quiescent
            else if (phase == activePhase) {
                waits = 0;          // recheck, then sleep
                w.phase = phase = inactivePhase;
            }
            else if (System.nanoTime() - startTime > nanos) {
                returnStatus = 0;   // timed out
                break;
            }
            else if (waits == 0)   // same as spinLockRunState except
                waits = MIN_SLEEP; //   with rescan instead of onSpinWait
            else {
                LockSupport.parkNanos(this, (long)waits);
                if (waits < maxSleep)
                    waits <<= 1;
            }
        }
        w.phase = activePhase;      // always restore the entry phase on exit
        return returnStatus;
    }

    /**
     * Helps quiesce from external caller until done, interrupted, or timeout
     *
     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
     * @param interruptible true if return on interrupt
     * @return positive if quiescent,
negative if interrupted, else 0 1735 */ 1736 private int externalHelpQuiesce(long nanos, boolean interruptible) { 1737 if (quiescent() < 0) { 1738 long startTime = System.nanoTime(); 1739 long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); 1740 for (int waits = 0;;) { 1741 ForkJoinTask<?> t; 1742 if (interruptible && Thread.interrupted()) 1743 return -1; 1744 else if ((t = pollScan(false)) != null) { 1745 waits = 0; 1746 t.doExec(); 1747 } 1748 else if (quiescent() >= 0) 1749 break; 1750 else if (System.nanoTime() - startTime > nanos) 1751 return 0; 1752 else if (waits == 0) 1753 waits = MIN_SLEEP; 1754 else { 1755 LockSupport.parkNanos(this, (long)waits); 1756 if (waits < maxSleep) 1757 waits <<= 1; 1758 } 1759 } 1760 } 1761 return 1; 1762 } 1763 1764 /** 1765 * Helps quiesce from either internal or external caller 1766 * 1767 * @param pool the pool to use, or null if any 1768 * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) 1769 * @param interruptible true if return on interrupt 1770 * @return positive if quiescent, negative if interrupted, else 0 1771 */ 1772 static final int helpQuiescePool(ForkJoinPool pool, long nanos, 1773 boolean interruptible) { 1774 Thread t; ForkJoinPool p; ForkJoinWorkerThread wt; 1775 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && 1776 (p = (wt = (ForkJoinWorkerThread)t).pool) != null && 1777 (p == pool || pool == null)) 1778 return p.helpQuiesce(wt.workQueue, nanos, interruptible); 1779 else if ((p = pool) != null || (p = common) != null) 1780 return p.externalHelpQuiesce(nanos, interruptible); 1781 else 1782 return 0; 1783 } 1784 1785 /** 1786 * Gets and removes a local or stolen task for the given worker. 
*
     * @return a task, if available
     */
    final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
        ForkJoinTask<?> t;
        // prefer the worker's own queue; otherwise poll across all queues
        if (w == null || (t = w.nextLocalTask()) == null)
            t = pollScan(false);
        return t;
    }

    // External operations

    /**
     * Finds and locks a WorkQueue for an external submitter, or
     * throws RejectedExecutionException if shutdown or terminating.
     *
     * @param r current ThreadLocalRandom.getProbe() value; if zero,
     *          the caller's probe is initialized first
     * @return the queue: either an existing one whose tryLockPhase
     *         succeeded, or a newly created and installed one
     * @throws RejectedExecutionException if no queue can be obtained
     */
    private WorkQueue submissionQueue(int r) {
        if (r == 0) {
            ThreadLocalRandom.localInit();           // initialize caller's probe
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            int n, i, id; WorkQueue[] qs; WorkQueue q;
            if ((qs = queues) == null)
                break;
            if ((n = qs.length) <= 0)
                break;
            if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
                // build the queue outside the lock, install under it
                WorkQueue w = new WorkQueue(null, id, (int)config, false);
                w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
                int stop = lockRunState() & STOP;
                if (stop == 0 && queues == qs && qs[i] == null)
                    q = qs[i] = w;                   // else discard; retry
                unlockRunState();
                if (q != null)
                    return q;
                if (stop != 0)
                    break;
            }
            else if (!q.tryLockPhase())              // move index
                r = ThreadLocalRandom.advanceProbe(r);
            else if ((runState & SHUTDOWN) != 0) {
                q.unlockPhase();                     // check while q lock held
                break;
            }
            else
                return q;
        }
        tryTerminate(false, false);
        throw new RejectedExecutionException();
    }

    /**
     * Pushes the task onto the caller's own queue when the caller is
     * a worker of this pool, else onto a queue obtained from
     * submissionQueue.
     *
     * @param signalIfEmpty if true this pool is passed to push,
     *        otherwise null is passed
     * @param task the task to push
     */
    private void poolSubmit(boolean signalIfEmpty, ForkJoinTask<?> task) {
        Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
            (wt = (ForkJoinWorkerThread)t).pool == this) {
            internal = true;
            q = wt.workQueue;
        }
        else {                     // find and lock queue
            internal = false;
            q = submissionQueue(ThreadLocalRandom.getProbe());
        }
        q.push(task, signalIfEmpty ? this : null, internal);
    }

    /**
     * Returns queue for an external submission, bypassing call to
     * submissionQueue if already established and unlocked.
     */
    final WorkQueue externalSubmissionQueue() {
        WorkQueue[] qs; WorkQueue q; int n;
        int r = ThreadLocalRandom.getProbe();
        // fast path needs a nonzero probe, an existing queue at the
        // probe's index, and a successful tryLockPhase
        return (((qs = queues) != null && (n = qs.length) > 0 &&
                 (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
                 q.tryLockPhase()) ? q : submissionQueue(r));
    }

    /**
     * Returns queue for an external thread, if one exists that has
     * possibly ever submitted to the given pool (nonzero probe), or
     * null if none.
     */
    static WorkQueue externalQueue(ForkJoinPool p) {
        WorkQueue[] qs; int n;
        int r = ThreadLocalRandom.getProbe();
        return (p != null && (qs = p.queues) != null &&
                (n = qs.length) > 0 && r != 0) ?
            qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
    }

    /**
     * Returns external queue for common pool.
     */
    static WorkQueue commonQueue() {
        return externalQueue(common);
    }

    /**
     * If the given executor is a ForkJoinPool, poll and execute
     * AsynchronousCompletionTasks from worker's queue until none are
     * available or blocker is released.
1891 */ 1892 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { 1893 WorkQueue w = null; Thread t; ForkJoinWorkerThread wt; 1894 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 1895 (wt = (ForkJoinWorkerThread)t).pool == e) 1896 w = wt.workQueue; 1897 else if (e instanceof ForkJoinPool) 1898 w = externalQueue((ForkJoinPool)e); 1899 if (w != null) 1900 w.helpAsyncBlocker(blocker); 1901 } 1902 1903 /** 1904 * Returns a cheap heuristic guide for task partitioning when 1905 * programmers, frameworks, tools, or languages have little or no 1906 * idea about task granularity. In essence, by offering this 1907 * method, we ask users only about tradeoffs in overhead vs 1908 * expected throughput and its variance, rather than how finely to 1909 * partition tasks. 1910 * 1911 * In a steady state strict (tree-structured) computation, each 1912 * thread makes available for stealing enough tasks for other 1913 * threads to remain active. Inductively, if all threads play by 1914 * the same rules, each thread should make available only a 1915 * constant number of tasks. 1916 * 1917 * The minimum useful constant is just 1. But using a value of 1 1918 * would require immediate replenishment upon each steal to 1919 * maintain enough tasks, which is infeasible. Further, 1920 * partitionings/granularities of offered tasks should minimize 1921 * steal rates, which in general means that threads nearer the top 1922 * of computation tree should generate more than those nearer the 1923 * bottom. In perfect steady state, each thread is at 1924 * approximately the same level of computation tree. However, 1925 * producing extra tasks amortizes the uncertainty of progress and 1926 * diffusion assumptions. 1927 * 1928 * So, users will want to use values larger (but not much larger) 1929 * than 1 to both smooth over transient shortages and hedge 1930 * against uneven progress; as traded off against the cost of 1931 * extra task overhead. 
We leave the user to pick a threshold 1932 * value to compare with the results of this call to guide 1933 * decisions, but recommend values such as 3. 1934 * 1935 * When all threads are active, it is on average OK to estimate 1936 * surplus strictly locally. In steady-state, if one thread is 1937 * maintaining say 2 surplus tasks, then so are others. So we can 1938 * just use estimated queue length. However, this strategy alone 1939 * leads to serious mis-estimates in some non-steady-state 1940 * conditions (ramp-up, ramp-down, other stalls). We can detect 1941 * many of these by further considering the number of "idle" 1942 * threads, that are known to have zero queued tasks, so 1943 * compensate by a factor of (#idle/#active) threads. 1944 */ 1945 static int getSurplusQueuedTaskCount() { 1946 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; 1947 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 1948 (pool = (wt = (ForkJoinWorkerThread)t).pool) != null && 1949 (q = wt.workQueue) != null) { 1950 int n = q.top - q.base; 1951 int p = pool.parallelism; 1952 int a = (short)(pool.ctl >>> RC_SHIFT); 1953 return n - (a > (p >>>= 1) ? 0 : 1954 a > (p >>>= 1) ? 1 : 1955 a > (p >>>= 1) ? 2 : 1956 a > (p >>>= 1) ? 4 : 1957 8); 1958 } 1959 return 0; 1960 } 1961 1962 // Termination 1963 1964 /** 1965 * Possibly initiates and/or completes pool termination. 
1966 * 1967 * @param now if true, unconditionally terminate, else only 1968 * if no work and no active workers 1969 * @param enable if true, terminate when next possible 1970 * @return runState on exit 1971 */ 1972 private int tryTerminate(boolean now, boolean enable) { 1973 int e = runState, isShutdown; 1974 if ((e & STOP) == 0) { 1975 if (now) { 1976 runState = e = (lockRunState() + RS_LOCK) | STOP | SHUTDOWN; 1977 releaseAll(); 1978 } 1979 else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) { 1980 if (isShutdown == 0) 1981 getAndBitwiseOrRunState(SHUTDOWN); 1982 if (quiescent() > 0) 1983 e = runState; 1984 } 1985 } 1986 if ((e & (STOP | TERMINATED)) == STOP) { 1987 if ((ctl & RC_MASK) > 0L) { // avoid if quiescent shutdown 1988 helpTerminate(now); 1989 e = runState; 1990 } 1991 if ((e & TERMINATED) == 0 && ctl == 0L) { 1992 e |= TERMINATED; 1993 if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) { 1994 CountDownLatch done; SharedThreadContainer ctr; 1995 if ((done = termination) != null) 1996 done.countDown(); 1997 if ((ctr = container) != null) 1998 ctr.close(); 1999 } 2000 } 2001 } 2002 return e; 2003 } 2004 2005 /** 2006 * Cancels tasks and interrupts workers 2007 */ 2008 private void helpTerminate(boolean now) { 2009 Thread current = Thread.currentThread(); 2010 int r = (int)current.threadId(); // stagger traversals 2011 WorkQueue[] qs = queues; 2012 int n = (qs == null) ? 
0 : qs.length; 2013 for (int l = n; l > 0; --l, ++r) { 2014 WorkQueue q; ForkJoinTask<?> t; Thread o; 2015 int j = r & SMASK & (n - 1); 2016 if ((q = qs[j]) != null && q.source != DEREGISTERED) { 2017 while ((t = q.poll()) != null) { 2018 try { 2019 t.cancel(false); 2020 } catch (Throwable ignore) { 2021 } 2022 } 2023 if ((r & 1) != 0 && (o = q.owner) != null && 2024 o != current && q.source != DEREGISTERED && 2025 (now || !o.isInterrupted())) { 2026 try { 2027 o.interrupt(); 2028 } catch (Throwable ignore) { 2029 } 2030 } 2031 } 2032 } 2033 } 2034 2035 /** 2036 * Returns termination signal, constructing if necessary 2037 */ 2038 private CountDownLatch terminationSignal() { 2039 CountDownLatch signal, s, u; 2040 if ((signal = termination) == null) 2041 signal = ((u = cmpExTerminationSignal( 2042 s = new CountDownLatch(1))) == null) ? s : u; 2043 return signal; 2044 } 2045 2046 // Exported methods 2047 2048 // Constructors 2049 2050 /** 2051 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 2052 * java.lang.Runtime#availableProcessors}, using defaults for all 2053 * other parameters (see {@link #ForkJoinPool(int, 2054 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2055 * int, int, int, Predicate, long, TimeUnit)}). 
2056 * 2057 * @throws SecurityException if a security manager exists and 2058 * the caller is not permitted to modify threads 2059 * because it does not hold {@link 2060 * java.lang.RuntimePermission}{@code ("modifyThread")} 2061 */ 2062 public ForkJoinPool() { 2063 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), 2064 defaultForkJoinWorkerThreadFactory, null, false, 2065 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2066 } 2067 2068 /** 2069 * Creates a {@code ForkJoinPool} with the indicated parallelism 2070 * level, using defaults for all other parameters (see {@link 2071 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory, 2072 * UncaughtExceptionHandler, boolean, int, int, int, Predicate, 2073 * long, TimeUnit)}). 2074 * 2075 * @param parallelism the parallelism level 2076 * @throws IllegalArgumentException if parallelism less than or 2077 * equal to zero, or greater than implementation limit 2078 * @throws SecurityException if a security manager exists and 2079 * the caller is not permitted to modify threads 2080 * because it does not hold {@link 2081 * java.lang.RuntimePermission}{@code ("modifyThread")} 2082 */ 2083 public ForkJoinPool(int parallelism) { 2084 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false, 2085 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2086 } 2087 2088 /** 2089 * Creates a {@code ForkJoinPool} with the given parameters (using 2090 * defaults for others -- see {@link #ForkJoinPool(int, 2091 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2092 * int, int, int, Predicate, long, TimeUnit)}). 2093 * 2094 * @param parallelism the parallelism level. For default value, 2095 * use {@link java.lang.Runtime#availableProcessors}. 2096 * @param factory the factory for creating new threads. For default value, 2097 * use {@link #defaultForkJoinWorkerThreadFactory}. 
2098 * @param handler the handler for internal worker threads that 2099 * terminate due to unrecoverable errors encountered while executing 2100 * tasks. For default value, use {@code null}. 2101 * @param asyncMode if true, 2102 * establishes local first-in-first-out scheduling mode for forked 2103 * tasks that are never joined. This mode may be more appropriate 2104 * than default locally stack-based mode in applications in which 2105 * worker threads only process event-style asynchronous tasks. 2106 * For default value, use {@code false}. 2107 * @throws IllegalArgumentException if parallelism less than or 2108 * equal to zero, or greater than implementation limit 2109 * @throws NullPointerException if the factory is null 2110 * @throws SecurityException if a security manager exists and 2111 * the caller is not permitted to modify threads 2112 * because it does not hold {@link 2113 * java.lang.RuntimePermission}{@code ("modifyThread")} 2114 */ 2115 public ForkJoinPool(int parallelism, 2116 ForkJoinWorkerThreadFactory factory, 2117 UncaughtExceptionHandler handler, 2118 boolean asyncMode) { 2119 this(parallelism, factory, handler, asyncMode, 2120 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2121 } 2122 2123 /** 2124 * Creates a {@code ForkJoinPool} with the given parameters. 2125 * 2126 * @param parallelism the parallelism level. For default value, 2127 * use {@link java.lang.Runtime#availableProcessors}. 2128 * 2129 * @param factory the factory for creating new threads. For 2130 * default value, use {@link #defaultForkJoinWorkerThreadFactory}. 2131 * 2132 * @param handler the handler for internal worker threads that 2133 * terminate due to unrecoverable errors encountered while 2134 * executing tasks. For default value, use {@code null}. 2135 * 2136 * @param asyncMode if true, establishes local first-in-first-out 2137 * scheduling mode for forked tasks that are never joined. 
This 2138 * mode may be more appropriate than default locally stack-based 2139 * mode in applications in which worker threads only process 2140 * event-style asynchronous tasks. For default value, use {@code 2141 * false}. 2142 * 2143 * @param corePoolSize the number of threads to keep in the pool 2144 * (unless timed out after an elapsed keep-alive). Normally (and 2145 * by default) this is the same value as the parallelism level, 2146 * but may be set to a larger value to reduce dynamic overhead if 2147 * tasks regularly block. Using a smaller value (for example 2148 * {@code 0}) has the same effect as the default. 2149 * 2150 * @param maximumPoolSize the maximum number of threads allowed. 2151 * When the maximum is reached, attempts to replace blocked 2152 * threads fail. (However, because creation and termination of 2153 * different threads may overlap, and may be managed by the given 2154 * thread factory, this value may be transiently exceeded.) To 2155 * arrange the same value as is used by default for the common 2156 * pool, use {@code 256} plus the {@code parallelism} level. (By 2157 * default, the common pool allows a maximum of 256 spare 2158 * threads.) Using a value (for example {@code 2159 * Integer.MAX_VALUE}) larger than the implementation's total 2160 * thread limit has the same effect as using this limit (which is 2161 * the default). 2162 * 2163 * @param minimumRunnable the minimum allowed number of core 2164 * threads not blocked by a join or {@link ManagedBlocker}. To 2165 * ensure progress, when too few unblocked threads exist and 2166 * unexecuted tasks may exist, new threads are constructed, up to 2167 * the given maximumPoolSize. For the default value, use {@code 2168 * 1}, that ensures liveness. A larger value might improve 2169 * throughput in the presence of blocked activities, but might 2170 * not, due to increased overhead. 
A value of zero may be 2171 * acceptable when submitted tasks cannot have dependencies 2172 * requiring additional threads. 2173 * 2174 * @param saturate if non-null, a predicate invoked upon attempts 2175 * to create more than the maximum total allowed threads. By 2176 * default, when a thread is about to block on a join or {@link 2177 * ManagedBlocker}, but cannot be replaced because the 2178 * maximumPoolSize would be exceeded, a {@link 2179 * RejectedExecutionException} is thrown. But if this predicate 2180 * returns {@code true}, then no exception is thrown, so the pool 2181 * continues to operate with fewer than the target number of 2182 * runnable threads, which might not ensure progress. 2183 * 2184 * @param keepAliveTime the elapsed time since last use before 2185 * a thread is terminated (and then later replaced if needed). 2186 * For the default value, use {@code 60, TimeUnit.SECONDS}. 2187 * 2188 * @param unit the time unit for the {@code keepAliveTime} argument 2189 * 2190 * @throws IllegalArgumentException if parallelism is less than or 2191 * equal to zero, or is greater than implementation limit, 2192 * or if maximumPoolSize is less than parallelism, 2193 * of if the keepAliveTime is less than or equal to zero. 2194 * @throws NullPointerException if the factory is null 2195 * @throws SecurityException if a security manager exists and 2196 * the caller is not permitted to modify threads 2197 * because it does not hold {@link 2198 * java.lang.RuntimePermission}{@code ("modifyThread")} 2199 * @since 9 2200 */ 2201 public ForkJoinPool(int parallelism, 2202 ForkJoinWorkerThreadFactory factory, 2203 UncaughtExceptionHandler handler, 2204 boolean asyncMode, 2205 int corePoolSize, 2206 int maximumPoolSize, 2207 int minimumRunnable, 2208 Predicate<? 
super ForkJoinPool> saturate, 2209 long keepAliveTime, 2210 TimeUnit unit) { 2211 checkPermission(); 2212 int p = parallelism; 2213 if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L) 2214 throw new IllegalArgumentException(); 2215 if (factory == null || unit == null) 2216 throw new NullPointerException(); 2217 int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1)); 2218 this.parallelism = p; 2219 this.factory = factory; 2220 this.ueh = handler; 2221 this.saturate = saturate; 2222 this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); 2223 int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP); 2224 int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP); 2225 this.config = (((asyncMode ? FIFO : 0) & LMASK) | 2226 (((long)maxSpares) << TC_SHIFT) | 2227 (((long)minAvail) << RC_SHIFT)); 2228 this.queues = new WorkQueue[size]; 2229 String pid = Integer.toString(getAndAddPoolIds(1) + 1); 2230 String name = "ForkJoinPool-" + pid; 2231 this.workerNamePrefix = name + "-worker-"; 2232 this.container = SharedThreadContainer.create(name); 2233 } 2234 2235 /** 2236 * Constructor for common pool using parameters possibly 2237 * overridden by system properties 2238 */ 2239 private ForkJoinPool(byte forCommonPoolOnly) { 2240 ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory; 2241 UncaughtExceptionHandler handler = null; 2242 int maxSpares = DEFAULT_COMMON_MAX_SPARES; 2243 int pc = 0, preset = 0; // nonzero if size set as property 2244 try { // ignore exceptions in accessing/parsing properties 2245 String pp = System.getProperty 2246 ("java.util.concurrent.ForkJoinPool.common.parallelism"); 2247 if (pp != null) { 2248 pc = Math.max(0, Integer.parseInt(pp)); 2249 preset = PRESET_SIZE; 2250 } 2251 String ms = System.getProperty 2252 ("java.util.concurrent.ForkJoinPool.common.maximumSpares"); 2253 if (ms != null) 2254 maxSpares = Math.clamp(Integer.parseInt(ms), 0, MAX_CAP); 2255 String sf = System.getProperty 2256 
("java.util.concurrent.ForkJoinPool.common.threadFactory"); 2257 String sh = System.getProperty 2258 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 2259 if (sf != null || sh != null) { 2260 ClassLoader ldr = ClassLoader.getSystemClassLoader(); 2261 if (sf != null) 2262 fac = (ForkJoinWorkerThreadFactory) 2263 ldr.loadClass(sf).getConstructor().newInstance(); 2264 if (sh != null) 2265 handler = (UncaughtExceptionHandler) 2266 ldr.loadClass(sh).getConstructor().newInstance(); 2267 } 2268 } catch (Exception ignore) { 2269 } 2270 if (preset == 0) 2271 pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1); 2272 int p = Math.min(pc, MAX_CAP); 2273 int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1)); 2274 this.parallelism = p; 2275 this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) | 2276 (1L << RC_SHIFT)); 2277 this.factory = fac; 2278 this.ueh = handler; 2279 this.keepAlive = DEFAULT_KEEPALIVE; 2280 this.saturate = null; 2281 this.workerNamePrefix = null; 2282 this.queues = new WorkQueue[size]; 2283 this.container = SharedThreadContainer.create("ForkJoinPool.commonPool"); 2284 } 2285 2286 /** 2287 * Returns the common pool instance. This pool is statically 2288 * constructed; its run state is unaffected by attempts to {@link 2289 * #shutdown} or {@link #shutdownNow}. However this pool and any 2290 * ongoing processing are automatically terminated upon program 2291 * {@link System#exit}. Any program that relies on asynchronous 2292 * task processing to complete before program termination should 2293 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 2294 * before exit. 2295 * 2296 * @return the common pool instance 2297 * @since 1.8 2298 */ 2299 public static ForkJoinPool commonPool() { 2300 // assert common != null : "static init error"; 2301 return common; 2302 } 2303 2304 // Execution methods 2305 2306 /** 2307 * Performs the given task, returning its result upon completion. 
2308 * If the computation encounters an unchecked Exception or Error, 2309 * it is rethrown as the outcome of this invocation. Rethrown 2310 * exceptions behave in the same way as regular exceptions, but, 2311 * when possible, contain stack traces (as displayed for example 2312 * using {@code ex.printStackTrace()}) of both the current thread 2313 * as well as the thread actually encountering the exception; 2314 * minimally only the latter. 2315 * 2316 * @param task the task 2317 * @param <T> the type of the task's result 2318 * @return the task's result 2319 * @throws NullPointerException if the task is null 2320 * @throws RejectedExecutionException if the task cannot be 2321 * scheduled for execution 2322 */ 2323 public <T> T invoke(ForkJoinTask<T> task) { 2324 Objects.requireNonNull(task); 2325 poolSubmit(true, task); 2326 try { 2327 return task.join(); 2328 } catch (RuntimeException | Error unchecked) { 2329 throw unchecked; 2330 } catch (Exception checked) { 2331 throw new RuntimeException(checked); 2332 } 2333 } 2334 2335 /** 2336 * Arranges for (asynchronous) execution of the given task. 2337 * 2338 * @param task the task 2339 * @throws NullPointerException if the task is null 2340 * @throws RejectedExecutionException if the task cannot be 2341 * scheduled for execution 2342 */ 2343 public void execute(ForkJoinTask<?> task) { 2344 Objects.requireNonNull(task); 2345 poolSubmit(true, task); 2346 } 2347 2348 // AbstractExecutorService methods 2349 2350 /** 2351 * @throws NullPointerException if the task is null 2352 * @throws RejectedExecutionException if the task cannot be 2353 * scheduled for execution 2354 */ 2355 @Override 2356 @SuppressWarnings("unchecked") 2357 public void execute(Runnable task) { 2358 poolSubmit(true, (task instanceof ForkJoinTask<?>) 2359 ? (ForkJoinTask<Void>) task // avoid re-wrap 2360 : new ForkJoinTask.RunnableExecuteAction(task)); 2361 } 2362 2363 /** 2364 * Submits a ForkJoinTask for execution. 
2365 * 2366 * @implSpec 2367 * This method is equivalent to {@link #externalSubmit(ForkJoinTask)} 2368 * when called from a thread that is not in this pool. 2369 * 2370 * @param task the task to submit 2371 * @param <T> the type of the task's result 2372 * @return the task 2373 * @throws NullPointerException if the task is null 2374 * @throws RejectedExecutionException if the task cannot be 2375 * scheduled for execution 2376 */ 2377 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 2378 Objects.requireNonNull(task); 2379 poolSubmit(true, task); 2380 return task; 2381 } 2382 2383 /** 2384 * @throws NullPointerException if the task is null 2385 * @throws RejectedExecutionException if the task cannot be 2386 * scheduled for execution 2387 */ 2388 @Override 2389 public <T> ForkJoinTask<T> submit(Callable<T> task) { 2390 ForkJoinTask<T> t = 2391 (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 2392 new ForkJoinTask.AdaptedCallable<T>(task) : 2393 new ForkJoinTask.AdaptedInterruptibleCallable<T>(task); 2394 poolSubmit(true, t); 2395 return t; 2396 } 2397 2398 /** 2399 * @throws NullPointerException if the task is null 2400 * @throws RejectedExecutionException if the task cannot be 2401 * scheduled for execution 2402 */ 2403 @Override 2404 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 2405 ForkJoinTask<T> t = 2406 (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 2407 new ForkJoinTask.AdaptedRunnable<T>(task, result) : 2408 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result); 2409 poolSubmit(true, t); 2410 return t; 2411 } 2412 2413 /** 2414 * @throws NullPointerException if the task is null 2415 * @throws RejectedExecutionException if the task cannot be 2416 * scheduled for execution 2417 */ 2418 @Override 2419 @SuppressWarnings("unchecked") 2420 public ForkJoinTask<?> submit(Runnable task) { 2421 ForkJoinTask<?> f = (task instanceof ForkJoinTask<?>) ? 
2422 (ForkJoinTask<Void>) task : // avoid re-wrap 2423 ((Thread.currentThread() instanceof ForkJoinWorkerThread) ? 2424 new ForkJoinTask.AdaptedRunnable<Void>(task, null) : 2425 new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null)); 2426 poolSubmit(true, f); 2427 return f; 2428 } 2429 2430 /** 2431 * Submits the given task as if submitted from a non-{@code ForkJoinTask} 2432 * client. The task is added to a scheduling queue for submissions to the 2433 * pool even when called from a thread in the pool. 2434 * 2435 * @implSpec 2436 * This method is equivalent to {@link #submit(ForkJoinTask)} when called 2437 * from a thread that is not in this pool. 2438 * 2439 * @return the task 2440 * @param task the task to submit 2441 * @param <T> the type of the task's result 2442 * @throws NullPointerException if the task is null 2443 * @throws RejectedExecutionException if the task cannot be 2444 * scheduled for execution 2445 * @since 20 2446 */ 2447 public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { 2448 Objects.requireNonNull(task); 2449 externalSubmissionQueue().push(task, this, false); 2450 return task; 2451 } 2452 2453 /** 2454 * Submits the given task without guaranteeing that it will 2455 * eventually execute in the absence of available active threads. 2456 * In some contexts, this method may reduce contention and 2457 * overhead by relying on context-specific knowledge that existing 2458 * threads (possibly including the calling thread if operating in 2459 * this pool) will eventually be available to execute the task. 
2460 * 2461 * @param task the task 2462 * @param <T> the type of the task's result 2463 * @return the task 2464 * @throws NullPointerException if the task is null 2465 * @throws RejectedExecutionException if the task cannot be 2466 * scheduled for execution 2467 * @since 19 2468 */ 2469 public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) { 2470 Objects.requireNonNull(task); 2471 poolSubmit(false, task); 2472 return task; 2473 } 2474 2475 /** 2476 * Changes the target parallelism of this pool, controlling the 2477 * future creation, use, and termination of worker threads. 2478 * Applications include contexts in which the number of available 2479 * processors changes over time. 2480 * 2481 * @implNote This implementation restricts the maximum number of 2482 * running threads to 32767 2483 * 2484 * @param size the target parallelism level 2485 * @return the previous parallelism level. 2486 * @throws IllegalArgumentException if size is less than 1 or 2487 * greater than the maximum supported by this pool. 2488 * @throws UnsupportedOperationException this is the{@link 2489 * #commonPool()} and parallelism level was set by System 2490 * property {@systemProperty 2491 * java.util.concurrent.ForkJoinPool.common.parallelism}. 2492 * @throws SecurityException if a security manager exists and 2493 * the caller is not permitted to modify threads 2494 * because it does not hold {@link 2495 * java.lang.RuntimePermission}{@code ("modifyThread")} 2496 * @since 19 2497 */ 2498 public int setParallelism(int size) { 2499 if (size < 1 || size > MAX_CAP) 2500 throw new IllegalArgumentException(); 2501 if ((config & PRESET_SIZE) != 0) 2502 throw new UnsupportedOperationException("Cannot override System property"); 2503 checkPermission(); 2504 return getAndSetParallelism(size); 2505 } 2506 2507 /** 2508 * Uninterrupible version of {@code invokeAll}. 
Executes the given 2509 * tasks, returning a list of Futures holding their status and 2510 * results when all complete, ignoring interrupts. {@link 2511 * Future#isDone} is {@code true} for each element of the returned 2512 * list. Note that a <em>completed</em> task could have 2513 * terminated either normally or by throwing an exception. The 2514 * results of this method are undefined if the given collection is 2515 * modified while this operation is in progress. 2516 * 2517 * @apiNote This method supports usages that previously relied on an 2518 * incompatible override of 2519 * {@link ExecutorService#invokeAll(java.util.Collection)}. 2520 * 2521 * @param tasks the collection of tasks 2522 * @param <T> the type of the values returned from the tasks 2523 * @return a list of Futures representing the tasks, in the same 2524 * sequential order as produced by the iterator for the 2525 * given task list, each of which has completed 2526 * @throws NullPointerException if tasks or any of its elements are {@code null} 2527 * @throws RejectedExecutionException if any task cannot be 2528 * scheduled for execution 2529 * @since 22 2530 */ 2531 public <T> List<Future<T>> invokeAllUninterruptibly(Collection<? extends Callable<T>> tasks) { 2532 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 2533 try { 2534 for (Callable<T> t : tasks) { 2535 ForkJoinTask<T> f = ForkJoinTask.adapt(t); 2536 futures.add(f); 2537 poolSubmit(true, f); 2538 } 2539 for (int i = futures.size() - 1; i >= 0; --i) 2540 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); 2541 return futures; 2542 } catch (Throwable t) { 2543 for (Future<T> e : futures) 2544 e.cancel(true); 2545 throw t; 2546 } 2547 } 2548 2549 /** 2550 * Common support for timed and untimed invokeAll 2551 */ 2552 private <T> List<Future<T>> invokeAll(Collection<? 
extends Callable<T>> tasks, 2553 long deadline) 2554 throws InterruptedException { 2555 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 2556 try { 2557 for (Callable<T> t : tasks) { 2558 ForkJoinTask<T> f = ForkJoinTask.adaptInterruptible(t); 2559 futures.add(f); 2560 poolSubmit(true, f); 2561 } 2562 for (int i = futures.size() - 1; i >= 0; --i) 2563 ((ForkJoinTask<?>)futures.get(i)) 2564 .quietlyJoinPoolInvokeAllTask(deadline); 2565 return futures; 2566 } catch (Throwable t) { 2567 for (Future<T> e : futures) 2568 e.cancel(true); 2569 throw t; 2570 } 2571 } 2572 2573 @Override 2574 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 2575 throws InterruptedException { 2576 return invokeAll(tasks, 0L); 2577 } 2578 // for jdk version < 22, replace with 2579 // /** 2580 // * @throws NullPointerException {@inheritDoc} 2581 // * @throws RejectedExecutionException {@inheritDoc} 2582 // */ 2583 // @Override 2584 // public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 2585 // return invokeAllUninterruptibly(tasks); 2586 // } 2587 2588 @Override 2589 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 2590 long timeout, TimeUnit unit) 2591 throws InterruptedException { 2592 return invokeAll(tasks, (System.nanoTime() + unit.toNanos(timeout)) | 1L); 2593 } 2594 2595 @Override 2596 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 2597 throws InterruptedException, ExecutionException { 2598 try { 2599 return new ForkJoinTask.InvokeAnyRoot<T>() 2600 .invokeAny(tasks, this, false, 0L); 2601 } catch (TimeoutException cannotHappen) { 2602 assert false; 2603 return null; 2604 } 2605 } 2606 2607 @Override 2608 public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, 2609 long timeout, TimeUnit unit) 2610 throws InterruptedException, ExecutionException, TimeoutException { 2611 return new ForkJoinTask.InvokeAnyRoot<T>() 2612 .invokeAny(tasks, this, true, unit.toNanos(timeout)); 2613 } 2614 2615 /** 2616 * Returns the factory used for constructing new workers. 2617 * 2618 * @return the factory used for constructing new workers 2619 */ 2620 public ForkJoinWorkerThreadFactory getFactory() { 2621 return factory; 2622 } 2623 2624 /** 2625 * Returns the handler for internal worker threads that terminate 2626 * due to unrecoverable errors encountered while executing tasks. 2627 * 2628 * @return the handler, or {@code null} if none 2629 */ 2630 public UncaughtExceptionHandler getUncaughtExceptionHandler() { 2631 return ueh; 2632 } 2633 2634 /** 2635 * Returns the targeted parallelism level of this pool. 2636 * 2637 * @return the targeted parallelism level of this pool 2638 */ 2639 public int getParallelism() { 2640 return Math.max(getParallelismOpaque(), 1); 2641 } 2642 2643 /** 2644 * Returns the targeted parallelism level of the common pool. 2645 * 2646 * @return the targeted parallelism level of the common pool 2647 * @since 1.8 2648 */ 2649 public static int getCommonPoolParallelism() { 2650 return common.getParallelism(); 2651 } 2652 2653 /** 2654 * Returns the number of worker threads that have started but not 2655 * yet terminated. The result returned by this method may differ 2656 * from {@link #getParallelism} when threads are created to 2657 * maintain parallelism when others are cooperatively blocked. 2658 * 2659 * @return the number of worker threads 2660 */ 2661 public int getPoolSize() { 2662 return (short)(ctl >>> TC_SHIFT); 2663 } 2664 2665 /** 2666 * Returns {@code true} if this pool uses local first-in-first-out 2667 * scheduling mode for forked tasks that are never joined. 
2668 * 2669 * @return {@code true} if this pool uses async mode 2670 */ 2671 public boolean getAsyncMode() { 2672 return (config & FIFO) != 0; 2673 } 2674 2675 /** 2676 * Returns an estimate of the number of worker threads that are 2677 * not blocked waiting to join tasks or for other managed 2678 * synchronization. This method may overestimate the 2679 * number of running threads. 2680 * 2681 * @return the number of worker threads 2682 */ 2683 public int getRunningThreadCount() { 2684 WorkQueue[] qs; WorkQueue q; 2685 int rc = 0; 2686 if ((runState & TERMINATED) == 0 && (qs = queues) != null) { 2687 for (int i = 1; i < qs.length; i += 2) { 2688 if ((q = qs[i]) != null && q.isApparentlyUnblocked()) 2689 ++rc; 2690 } 2691 } 2692 return rc; 2693 } 2694 2695 /** 2696 * Returns an estimate of the number of threads that are currently 2697 * stealing or executing tasks. This method may overestimate the 2698 * number of active threads. 2699 * 2700 * @return the number of active threads 2701 */ 2702 public int getActiveThreadCount() { 2703 return Math.max((short)(ctl >>> RC_SHIFT), 0); 2704 } 2705 2706 /** 2707 * Returns {@code true} if all worker threads are currently idle. 2708 * An idle worker is one that cannot obtain a task to execute 2709 * because none are available to steal from other threads, and 2710 * there are no pending submissions to the pool. This method is 2711 * conservative; it might not return {@code true} immediately upon 2712 * idleness of all threads, but will eventually become true if 2713 * threads remain inactive. 2714 * 2715 * @return {@code true} if all threads are currently idle 2716 */ 2717 public boolean isQuiescent() { 2718 return quiescent() >= 0; 2719 } 2720 2721 /** 2722 * Returns an estimate of the total number of completed tasks that 2723 * were executed by a thread other than their submitter. The 2724 * reported value underestimates the actual total number of steals 2725 * when the pool is not quiescent. 
This value may be useful for 2726 * monitoring and tuning fork/join programs: in general, steal 2727 * counts should be high enough to keep threads busy, but low 2728 * enough to avoid overhead and contention across threads. 2729 * 2730 * @return the number of steals 2731 */ 2732 public long getStealCount() { 2733 long count = stealCount; 2734 WorkQueue[] qs; WorkQueue q; 2735 if ((qs = queues) != null) { 2736 for (int i = 1; i < qs.length; i += 2) { 2737 if ((q = qs[i]) != null) 2738 count += (long)q.nsteals & 0xffffffffL; 2739 } 2740 } 2741 return count; 2742 } 2743 2744 /** 2745 * Returns an estimate of the total number of tasks currently held 2746 * in queues by worker threads (but not including tasks submitted 2747 * to the pool that have not begun executing). This value is only 2748 * an approximation, obtained by iterating across all threads in 2749 * the pool. This method may be useful for tuning task 2750 * granularities. 2751 * 2752 * @return the number of queued tasks 2753 * @see ForkJoinWorkerThread#getQueuedTaskCount() 2754 */ 2755 public long getQueuedTaskCount() { 2756 WorkQueue[] qs; WorkQueue q; 2757 int count = 0; 2758 if ((runState & TERMINATED) == 0 && (qs = queues) != null) { 2759 for (int i = 1; i < qs.length; i += 2) { 2760 if ((q = qs[i]) != null) 2761 count += q.queueSize(); 2762 } 2763 } 2764 return count; 2765 } 2766 2767 /** 2768 * Returns an estimate of the number of tasks submitted to this 2769 * pool that have not yet begun executing. This method may take 2770 * time proportional to the number of submissions. 
2771 * 2772 * @return the number of queued submissions 2773 */ 2774 public int getQueuedSubmissionCount() { 2775 WorkQueue[] qs; WorkQueue q; 2776 int count = 0; 2777 if ((runState & TERMINATED) == 0 && (qs = queues) != null) { 2778 for (int i = 0; i < qs.length; i += 2) { 2779 if ((q = qs[i]) != null) 2780 count += q.queueSize(); 2781 } 2782 } 2783 return count; 2784 } 2785 2786 /** 2787 * Returns {@code true} if there are any tasks submitted to this 2788 * pool that have not yet begun executing. 2789 * 2790 * @return {@code true} if there are any queued submissions 2791 */ 2792 public boolean hasQueuedSubmissions() { 2793 WorkQueue[] qs; WorkQueue q; 2794 if ((runState & STOP) == 0 && (qs = queues) != null) { 2795 for (int i = 0; i < qs.length; i += 2) { 2796 if ((q = qs[i]) != null && q.queueSize() > 0) 2797 return true; 2798 } 2799 } 2800 return false; 2801 } 2802 2803 /** 2804 * Removes and returns the next unexecuted submission if one is 2805 * available. This method may be useful in extensions to this 2806 * class that re-assign work in systems with multiple pools. 2807 * 2808 * @return the next submission, or {@code null} if none 2809 */ 2810 protected ForkJoinTask<?> pollSubmission() { 2811 return pollScan(true); 2812 } 2813 2814 /** 2815 * Removes all available unexecuted submitted and forked tasks 2816 * from scheduling queues and adds them to the given collection, 2817 * without altering their execution status. These may include 2818 * artificially generated or wrapped tasks. This method is 2819 * designed to be invoked only when the pool is known to be 2820 * quiescent. Invocations at other times may not remove all 2821 * tasks. A failure encountered while attempting to add elements 2822 * to collection {@code c} may result in elements being in 2823 * neither, either or both collections when the associated 2824 * exception is thrown. 
The behavior of this operation is 2825 * undefined if the specified collection is modified while the 2826 * operation is in progress. 2827 * 2828 * @param c the collection to transfer elements into 2829 * @return the number of elements transferred 2830 */ 2831 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 2832 int count = 0; 2833 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) { 2834 c.add(t); 2835 ++count; 2836 } 2837 return count; 2838 } 2839 2840 /** 2841 * Returns a string identifying this pool, as well as its state, 2842 * including indications of run state, parallelism level, and 2843 * worker and task counts. 2844 * 2845 * @return a string identifying this pool, as well as its state 2846 */ 2847 public String toString() { 2848 // Use a single pass through queues to collect counts 2849 int e = runState; 2850 long st = stealCount; 2851 long qt = 0L, ss = 0L; int rc = 0; 2852 WorkQueue[] qs; WorkQueue q; 2853 if ((qs = queues) != null) { 2854 for (int i = 0; i < qs.length; ++i) { 2855 if ((q = qs[i]) != null) { 2856 int size = q.queueSize(); 2857 if ((i & 1) == 0) 2858 ss += size; 2859 else { 2860 qt += size; 2861 st += (long)q.nsteals & 0xffffffffL; 2862 if (q.isApparentlyUnblocked()) 2863 ++rc; 2864 } 2865 } 2866 } 2867 } 2868 2869 int pc = parallelism; 2870 long c = ctl; 2871 int tc = (short)(c >>> TC_SHIFT); 2872 int ac = (short)(c >>> RC_SHIFT); 2873 if (ac < 0) // ignore transient negative 2874 ac = 0; 2875 String level = ((e & TERMINATED) != 0 ? "Terminated" : 2876 (e & STOP) != 0 ? "Terminating" : 2877 (e & SHUTDOWN) != 0 ? 
"Shutting down" : 2878 "Running"); 2879 return super.toString() + 2880 "[" + level + 2881 ", parallelism = " + pc + 2882 ", size = " + tc + 2883 ", active = " + ac + 2884 ", running = " + rc + 2885 ", steals = " + st + 2886 ", tasks = " + qt + 2887 ", submissions = " + ss + 2888 "]"; 2889 } 2890 2891 /** 2892 * Possibly initiates an orderly shutdown in which previously 2893 * submitted tasks are executed, but no new tasks will be 2894 * accepted. Invocation has no effect on execution state if this 2895 * is the {@link #commonPool()}, and no additional effect if 2896 * already shut down. Tasks that are in the process of being 2897 * submitted concurrently during the course of this method may or 2898 * may not be rejected. 2899 * 2900 * @throws SecurityException if a security manager exists and 2901 * the caller is not permitted to modify threads 2902 * because it does not hold {@link 2903 * java.lang.RuntimePermission}{@code ("modifyThread")} 2904 */ 2905 public void shutdown() { 2906 checkPermission(); 2907 if (workerNamePrefix != null) // not common pool 2908 tryTerminate(false, true); 2909 } 2910 2911 /** 2912 * Possibly attempts to cancel and/or stop all tasks, and reject 2913 * all subsequently submitted tasks. Invocation has no effect on 2914 * execution state if this is the {@link #commonPool()}, and no 2915 * additional effect if already shut down. Otherwise, tasks that 2916 * are in the process of being submitted or executed concurrently 2917 * during the course of this method may or may not be 2918 * rejected. This method cancels both existing and unexecuted 2919 * tasks, in order to permit termination in the presence of task 2920 * dependencies. So the method always returns an empty list 2921 * (unlike the case for some other Executors). 
2922 * 2923 * @return an empty list 2924 * @throws SecurityException if a security manager exists and 2925 * the caller is not permitted to modify threads 2926 * because it does not hold {@link 2927 * java.lang.RuntimePermission}{@code ("modifyThread")} 2928 */ 2929 public List<Runnable> shutdownNow() { 2930 checkPermission(); 2931 if (workerNamePrefix != null) // not common pool 2932 tryTerminate(true, true); 2933 return Collections.emptyList(); 2934 } 2935 2936 /** 2937 * Returns {@code true} if all tasks have completed following shut down. 2938 * 2939 * @return {@code true} if all tasks have completed following shut down 2940 */ 2941 public boolean isTerminated() { 2942 return (tryTerminate(false, false) & TERMINATED) != 0; 2943 } 2944 2945 /** 2946 * Returns {@code true} if the process of termination has 2947 * commenced but not yet completed. This method may be useful for 2948 * debugging. A return of {@code true} reported a sufficient 2949 * period after shutdown may indicate that submitted tasks have 2950 * ignored or suppressed interruption, or are waiting for I/O, 2951 * causing this executor not to properly terminate. (See the 2952 * advisory notes for class {@link ForkJoinTask} stating that 2953 * tasks should not normally entail blocking operations. But if 2954 * they do, they must abort them on interrupt.) 2955 * 2956 * @return {@code true} if terminating but not yet terminated 2957 */ 2958 public boolean isTerminating() { 2959 return (tryTerminate(false, false) & (STOP | TERMINATED)) == STOP; 2960 } 2961 2962 /** 2963 * Returns {@code true} if this pool has been shut down. 2964 * 2965 * @return {@code true} if this pool has been shut down 2966 */ 2967 public boolean isShutdown() { 2968 return (runState & SHUTDOWN) != 0; 2969 } 2970 2971 /** 2972 * Blocks until all tasks have completed execution after a 2973 * shutdown request, or the timeout occurs, or the current thread 2974 * is interrupted, whichever happens first. 
Because the {@link 2975 * #commonPool()} never terminates until program shutdown, when 2976 * applied to the common pool, this method is equivalent to {@link 2977 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 2978 * 2979 * @param timeout the maximum time to wait 2980 * @param unit the time unit of the timeout argument 2981 * @return {@code true} if this executor terminated and 2982 * {@code false} if the timeout elapsed before termination 2983 * @throws InterruptedException if interrupted while waiting 2984 */ 2985 public boolean awaitTermination(long timeout, TimeUnit unit) 2986 throws InterruptedException { 2987 long nanos = unit.toNanos(timeout); 2988 CountDownLatch done; 2989 if (workerNamePrefix == null) { // is common pool 2990 if (helpQuiescePool(this, nanos, true) < 0) 2991 throw new InterruptedException(); 2992 return false; 2993 } 2994 else if ((tryTerminate(false, false) & TERMINATED) != 0 || 2995 (done = terminationSignal()) == null || 2996 (runState & TERMINATED) != 0) 2997 return true; 2998 else 2999 return done.await(nanos, TimeUnit.NANOSECONDS); 3000 } 3001 3002 /** 3003 * If called by a ForkJoinTask operating in this pool, equivalent 3004 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 3005 * waits and/or attempts to assist performing tasks until this 3006 * pool {@link #isQuiescent} or the indicated timeout elapses. 3007 * 3008 * @param timeout the maximum time to wait 3009 * @param unit the time unit of the timeout argument 3010 * @return {@code true} if quiescent; {@code false} if the 3011 * timeout elapsed. 
3012 */ 3013 public boolean awaitQuiescence(long timeout, TimeUnit unit) { 3014 return (helpQuiescePool(this, unit.toNanos(timeout), false) > 0); 3015 } 3016 3017 /** 3018 * Unless this is the {@link #commonPool()}, initiates an orderly 3019 * shutdown in which previously submitted tasks are executed, but 3020 * no new tasks will be accepted, and waits until all tasks have 3021 * completed execution and the executor has terminated. 3022 * 3023 * <p> If already terminated, or this is the {@link 3024 * #commonPool()}, this method has no effect on execution, and 3025 * does not wait. Otherwise, if interrupted while waiting, this 3026 * method stops all executing tasks as if by invoking {@link 3027 * #shutdownNow()}. It then continues to wait until all actively 3028 * executing tasks have completed. Tasks that were awaiting 3029 * execution are not executed. The interrupt status will be 3030 * re-asserted before this method returns. 3031 * 3032 * @throws SecurityException if a security manager exists and 3033 * shutting down this ExecutorService may manipulate 3034 * threads that the caller is not permitted to modify 3035 * because it does not hold {@link 3036 * java.lang.RuntimePermission}{@code ("modifyThread")}, 3037 * or the security manager's {@code checkAccess} method 3038 * denies access. 3039 * @since 19 3040 */ 3041 @Override 3042 public void close() { 3043 if (workerNamePrefix != null) { 3044 checkPermission(); 3045 CountDownLatch done = null; 3046 boolean interrupted = false; 3047 while ((tryTerminate(interrupted, true) & TERMINATED) == 0) { 3048 if (done == null) 3049 done = terminationSignal(); 3050 else { 3051 try { 3052 done.await(); 3053 break; 3054 } catch (InterruptedException ex) { 3055 interrupted = true; 3056 } 3057 } 3058 } 3059 if (interrupted) 3060 Thread.currentThread().interrupt(); 3061 } 3062 } 3063 3064 /** 3065 * Interface for extending managed parallelism for tasks running 3066 * in {@link ForkJoinPool}s. 
*
     * <p>A {@code ManagedBlocker} provides two methods.  Method
     * {@link #isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@link #block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). These actions are performed by any
     * thread invoking {@link
     * ForkJoinPool#managedBlock(ManagedBlocker)}.  The unusual
     * methods in this API accommodate synchronizers that may, but
     * don't usually, block for long periods. Similarly, they allow
     * more efficient internal handling of cases in which additional
     * workers may be, but usually are not, needed to ensure
     * sufficient parallelism.  Toward this end, implementations of
     * method {@code isReleasable} must be amenable to repeated
     * invocation. Neither method is invoked after a prior invocation
     * of {@code isReleasable} or {@code block} returns {@code true}.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     * <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     * <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null ||
(item = queue.poll()) != null; 3115 * } 3116 * public E getItem() { // call after pool.managedBlock completes 3117 * return item; 3118 * } 3119 * }}</pre> 3120 */ 3121 public static interface ManagedBlocker { 3122 /** 3123 * Possibly blocks the current thread, for example waiting for 3124 * a lock or condition. 3125 * 3126 * @return {@code true} if no additional blocking is necessary 3127 * (i.e., if isReleasable would return true) 3128 * @throws InterruptedException if interrupted while waiting 3129 * (the method is not required to do so, but is allowed to) 3130 */ 3131 boolean block() throws InterruptedException; 3132 3133 /** 3134 * Returns {@code true} if blocking is unnecessary. 3135 * @return {@code true} if blocking is unnecessary 3136 */ 3137 boolean isReleasable(); 3138 } 3139 3140 /** 3141 * Runs the given possibly blocking task. When {@linkplain 3142 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this 3143 * method possibly arranges for a spare thread to be activated if 3144 * necessary to ensure sufficient parallelism while the current 3145 * thread is blocked in {@link ManagedBlocker#block blocker.block()}. 3146 * 3147 * <p>This method repeatedly calls {@code blocker.isReleasable()} and 3148 * {@code blocker.block()} until either method returns {@code true}. 3149 * Every call to {@code blocker.block()} is preceded by a call to 3150 * {@code blocker.isReleasable()} that returned {@code false}. 3151 * 3152 * <p>If not running in a ForkJoinPool, this method is 3153 * behaviorally equivalent to 3154 * <pre> {@code 3155 * while (!blocker.isReleasable()) 3156 * if (blocker.block()) 3157 * break;}</pre> 3158 * 3159 * If running in a ForkJoinPool, the pool may first be expanded to 3160 * ensure sufficient parallelism available during the call to 3161 * {@code blocker.block()}. 
3162 * 3163 * @param blocker the blocker task 3164 * @throws InterruptedException if {@code blocker.block()} did so 3165 */ 3166 public static void managedBlock(ManagedBlocker blocker) 3167 throws InterruptedException { 3168 Thread t; ForkJoinPool p; 3169 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && 3170 (p = ((ForkJoinWorkerThread)t).pool) != null) 3171 p.compensatedBlock(blocker); 3172 else 3173 unmanagedBlock(blocker); 3174 } 3175 3176 /** ManagedBlock for ForkJoinWorkerThreads */ 3177 private void compensatedBlock(ManagedBlocker blocker) 3178 throws InterruptedException { 3179 Objects.requireNonNull(blocker); 3180 for (;;) { 3181 int comp; boolean done; 3182 long c = ctl; 3183 if (blocker.isReleasable()) 3184 break; 3185 if ((runState & STOP) != 0) 3186 throw new InterruptedException(); 3187 if ((comp = tryCompensate(c)) >= 0) { 3188 try { 3189 done = blocker.block(); 3190 } finally { 3191 if (comp > 0) 3192 getAndAddCtl(RC_UNIT); 3193 } 3194 if (done) 3195 break; 3196 } 3197 } 3198 } 3199 3200 /** 3201 * Invokes tryCompensate to create or re-activate a spare thread to 3202 * compensate for a thread that performs a blocking operation. When the 3203 * blocking operation is done then endCompensatedBlock must be invoked 3204 * with the value returned by this method to re-adjust the parallelism. 3205 * @return value to use in endCompensatedBlock 3206 */ 3207 final long beginCompensatedBlock() { 3208 int c; 3209 do {} while ((c = tryCompensate(ctl)) < 0); 3210 return (c == 0) ? 0L : RC_UNIT; 3211 } 3212 3213 /** 3214 * Re-adjusts parallelism after a blocking operation completes. 
3215 * @param post value from beginCompensatedBlock 3216 */ 3217 void endCompensatedBlock(long post) { 3218 if (post > 0L) { 3219 getAndAddCtl(post); 3220 } 3221 } 3222 3223 /** ManagedBlock for external threads */ 3224 private static void unmanagedBlock(ManagedBlocker blocker) 3225 throws InterruptedException { 3226 Objects.requireNonNull(blocker); 3227 do {} while (!blocker.isReleasable() && !blocker.block()); 3228 } 3229 3230 @Override 3231 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 3232 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 3233 new ForkJoinTask.AdaptedRunnable<T>(runnable, value) : 3234 new ForkJoinTask.AdaptedInterruptibleRunnable<T>(runnable, value); 3235 } 3236 3237 @Override 3238 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 3239 return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 3240 new ForkJoinTask.AdaptedCallable<T>(callable) : 3241 new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable); 3242 } 3243 3244 static { 3245 U = Unsafe.getUnsafe(); 3246 Class<ForkJoinPool> klass = ForkJoinPool.class; 3247 try { 3248 Field poolIdsField = klass.getDeclaredField("poolIds"); 3249 POOLIDS_BASE = U.staticFieldBase(poolIdsField); 3250 POOLIDS = U.staticFieldOffset(poolIdsField); 3251 } catch (NoSuchFieldException e) { 3252 throw new ExceptionInInitializerError(e); 3253 } 3254 CTL = U.objectFieldOffset(klass, "ctl"); 3255 RUNSTATE = U.objectFieldOffset(klass, "runState"); 3256 PARALLELISM = U.objectFieldOffset(klass, "parallelism"); 3257 THREADIDS = U.objectFieldOffset(klass, "threadIds"); 3258 TERMINATION = U.objectFieldOffset(klass, "termination"); 3259 Class<ForkJoinTask[]> aklass = ForkJoinTask[].class; 3260 ABASE = U.arrayBaseOffset(aklass); 3261 int scale = U.arrayIndexScale(aklass); 3262 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); 3263 if ((scale & (scale - 1)) != 0) 3264 throw new Error("array index scale not a power of two"); 3265 3266 
defaultForkJoinWorkerThreadFactory = 3267 new DefaultForkJoinWorkerThreadFactory(); 3268 @SuppressWarnings("removal") 3269 ForkJoinPool p = common = (System.getSecurityManager() == null) ? 3270 new ForkJoinPool((byte)0) : 3271 AccessController.doPrivileged(new PrivilegedAction<>() { 3272 public ForkJoinPool run() { 3273 return new ForkJoinPool((byte)0); }}); 3274 // allow access to non-public methods 3275 SharedSecrets.setJavaUtilConcurrentFJPAccess( 3276 new JavaUtilConcurrentFJPAccess() { 3277 @Override 3278 public long beginCompensatedBlock(ForkJoinPool pool) { 3279 return pool.beginCompensatedBlock(); 3280 } 3281 public void endCompensatedBlock(ForkJoinPool pool, long post) { 3282 pool.endCompensatedBlock(post); 3283 } 3284 }); 3285 Class<?> dep = LockSupport.class; // ensure loaded 3286 } 3287 }