< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page




  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.security.AccessController;
  42 import java.security.AccessControlContext;
  43 import java.security.Permission;
  44 import java.security.Permissions;
  45 import java.security.PrivilegedAction;
  46 import java.security.ProtectionDomain;
  47 import java.util.ArrayList;
  48 import java.util.Collection;
  49 import java.util.Collections;
  50 import java.util.List;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.locks.LockSupport;

  53 
  54 /**
  55  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  56  * A {@code ForkJoinPool} provides the entry point for submissions
  57  * from non-{@code ForkJoinTask} clients, as well as management and
  58  * monitoring operations.
  59  *
  60  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  61  * ExecutorService} mainly by virtue of employing
  62  * <em>work-stealing</em>: all threads in the pool attempt to find and
  63  * execute tasks submitted to the pool and/or created by other active
  64  * tasks (eventually blocking waiting for work if none exist). This
  65  * enables efficient processing when most tasks spawn other subtasks
  66  * (as do most {@code ForkJoinTask}s), as well as when many small
  67  * tasks are submitted to the pool from external clients.  Especially
  68  * when setting <em>asyncMode</em> to true in constructors, {@code
  69  * ForkJoinPool}s may also be appropriate for use with event-style
  70  * tasks that are never joined. All worker threads are initialized
  71  * with {@link Thread#isDaemon} set {@code true}.
  72  *


1905                     }
1906                 }
1907             }
1908             else if (!q.tryLockPhase()) // move if busy
1909                 r = ThreadLocalRandom.advanceProbe(r);
1910             else {
1911                 if (q.lockedPush(task))
1912                     signalWork(null);
1913                 return;
1914             }
1915         }
1916     }
1917 
1918     /**
1919      * Pushes a possibly-external submission.
          *
          * <p>If the calling thread is a {@link ForkJoinWorkerThread} whose
          * {@code pool} is this pool and whose {@code workQueue} is non-null,
          * the task is pushed directly onto that worker's own queue. In
          * every other case the task is handed to {@code externalPush}.
          *
          * @param task the task to submit
          * @param <T> the type parameter of the submitted task
          * @return the given task
          * @throws NullPointerException if {@code task} is null
1920      */
1921     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1922         Thread t; ForkJoinWorkerThread w; WorkQueue q;
1923         if (task == null)
1924             throw new NullPointerException();
1925         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1926             (w = (ForkJoinWorkerThread)t).pool == this && // worker of this pool only
1927             (q = w.workQueue) != null)
1928             q.push(task); // caller's own queue
1929         else
1930             externalPush(task); // any other caller
1931         return task;
1932     }
1933 
1934     /**
1935      * Returns common pool queue for an external thread.
          *
          * <p>The slot is chosen by masking the value of
          * {@code ThreadLocalRandom.getProbe()} with {@code (n - 1)} and
          * {@code SQMASK}, where {@code n} is the length of the common
          * pool's {@code workQueues} array.
          * NOTE(review): the {@code (n - 1)} mask presumably relies on the
          * array length being a power of two, and {@code SQMASK} presumably
          * restricts the index to submission-queue slots; confirm both at
          * their declarations.
          *
          * @return the contents of the selected slot, or {@code null} if
          *         the common pool is null, its {@code workQueues} array is
          *         null, or that array is empty
1936      */
1937     static WorkQueue commonSubmitterQueue() {
1938         ForkJoinPool p = common;
1939         int r = ThreadLocalRandom.getProbe();
1940         WorkQueue[] ws; int n;
1941         return (p != null && (ws = p.workQueues) != null &&
1942                 (n = ws.length) > 0) ?
1943             ws[(n - 1) & r & SQMASK] : null; // masked probe picks the slot
1944     }
1945 


3098      * <p>If not running in a ForkJoinPool, this method is
3099      * behaviorally equivalent to
3100      * <pre> {@code
3101      * while (!blocker.isReleasable())
3102      *   if (blocker.block())
3103      *     break;}</pre>
3104      *
3105      * If running in a ForkJoinPool, the pool may first be expanded to
3106      * ensure sufficient parallelism available during the call to
3107      * {@code blocker.block()}.
3108      *
3109      * @param blocker the blocker task
3110      * @throws InterruptedException if {@code blocker.block()} did so
3111      */
3112     public static void managedBlock(ManagedBlocker blocker)
3113         throws InterruptedException {
3114         if (blocker == null) throw new NullPointerException();
3115         ForkJoinPool p;
3116         ForkJoinWorkerThread wt;
3117         WorkQueue w;
3118         Thread t = Thread.currentThread();
3119         if ((t instanceof ForkJoinWorkerThread) &&
3120             (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3121             (w = wt.workQueue) != null) {
3122             int block;
3123             while (!blocker.isReleasable()) {
3124                 if ((block = p.tryCompensate(w)) != 0) {
3125                     try {
3126                         do {} while (!blocker.isReleasable() &&
3127                                      !blocker.block());
3128                     } finally {
3129                         CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
3130                     }
3131                     break;
3132                 }
3133             }
3134         }
3135         else {
3136             do {} while (!blocker.isReleasable() &&
3137                          !blocker.block());
3138         }




  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.security.AccessController;
  42 import java.security.AccessControlContext;
  43 import java.security.Permission;
  44 import java.security.Permissions;
  45 import java.security.PrivilegedAction;
  46 import java.security.ProtectionDomain;
  47 import java.util.ArrayList;
  48 import java.util.Collection;
  49 import java.util.Collections;
  50 import java.util.List;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.locks.LockSupport;
  53 import jdk.internal.misc.Strands;
  54 
  55 /**
  56  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  57  * A {@code ForkJoinPool} provides the entry point for submissions
  58  * from non-{@code ForkJoinTask} clients, as well as management and
  59  * monitoring operations.
  60  *
  61  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  62  * ExecutorService} mainly by virtue of employing
  63  * <em>work-stealing</em>: all threads in the pool attempt to find and
  64  * execute tasks submitted to the pool and/or created by other active
  65  * tasks (eventually blocking waiting for work if none exist). This
  66  * enables efficient processing when most tasks spawn other subtasks
  67  * (as do most {@code ForkJoinTask}s), as well as when many small
  68  * tasks are submitted to the pool from external clients.  Especially
  69  * when setting <em>asyncMode</em> to true in constructors, {@code
  70  * ForkJoinPool}s may also be appropriate for use with event-style
  71  * tasks that are never joined. All worker threads are initialized
  72  * with {@link Thread#isDaemon} set {@code true}.
  73  *


1906                     }
1907                 }
1908             }
1909             else if (!q.tryLockPhase()) // move if busy
1910                 r = ThreadLocalRandom.advanceProbe(r);
1911             else {
1912                 if (q.lockedPush(task))
1913                     signalWork(null);
1914                 return;
1915             }
1916         }
1917     }
1918 
1919     /**
1920      * Pushes a possibly-external submission.
          *
          * <p>The thread examined is the one returned by
          * {@code Strands.currentCarrierThread()}. If that thread is a
          * {@link ForkJoinWorkerThread} whose {@code pool} is this pool and
          * whose {@code workQueue} is non-null, the task is pushed directly
          * onto that worker's own queue. In every other case the task is
          * handed to {@code externalPush}.
          * NOTE(review): presumably the carrier thread is the worker thread
          * underneath the current strand; confirm against
          * {@code jdk.internal.misc.Strands}.
          *
          * @param task the task to submit
          * @param <T> the type parameter of the submitted task
          * @return the given task
          * @throws NullPointerException if {@code task} is null
1921      */
1922     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1923         Thread t; ForkJoinWorkerThread w; WorkQueue q;
1924         if (task == null)
1925             throw new NullPointerException();
1926         if (((t = Strands.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
1927             (w = (ForkJoinWorkerThread)t).pool == this && // worker of this pool only
1928             (q = w.workQueue) != null)
1929             q.push(task); // that worker's own queue
1930         else
1931             externalPush(task); // any other caller
1932         return task;
1933     }
1934 
1935     /**
1936      * Returns common pool queue for an external thread.
          *
          * <p>The slot is chosen by masking the value of
          * {@code ThreadLocalRandom.getProbe()} with {@code (n - 1)} and
          * {@code SQMASK}, where {@code n} is the length of the common
          * pool's {@code workQueues} array.
          * NOTE(review): the {@code (n - 1)} mask presumably relies on the
          * array length being a power of two, and {@code SQMASK} presumably
          * restricts the index to submission-queue slots; confirm both at
          * their declarations.
          *
          * @return the contents of the selected slot, or {@code null} if
          *         the common pool is null, its {@code workQueues} array is
          *         null, or that array is empty
1937      */
1938     static WorkQueue commonSubmitterQueue() {
1939         ForkJoinPool p = common;
1940         int r = ThreadLocalRandom.getProbe();
1941         WorkQueue[] ws; int n;
1942         return (p != null && (ws = p.workQueues) != null &&
1943                 (n = ws.length) > 0) ?
1944             ws[(n - 1) & r & SQMASK] : null; // masked probe picks the slot
1945     }
1946 


3099      * <p>If not running in a ForkJoinPool, this method is
3100      * behaviorally equivalent to
3101      * <pre> {@code
3102      * while (!blocker.isReleasable())
3103      *   if (blocker.block())
3104      *     break;}</pre>
3105      *
3106      * If running in a ForkJoinPool, the pool may first be expanded to
3107      * ensure sufficient parallelism available during the call to
3108      * {@code blocker.block()}.
3109      *
3110      * @param blocker the blocker task
3111      * @throws InterruptedException if {@code blocker.block()} did so
3112      */
3113     public static void managedBlock(ManagedBlocker blocker)
3114         throws InterruptedException {
3115         if (blocker == null) throw new NullPointerException();
3116         ForkJoinPool p;
3117         ForkJoinWorkerThread wt;
3118         WorkQueue w;
3119         Thread t = Strands.currentCarrierThread();
3120         if ((t instanceof ForkJoinWorkerThread) &&
3121             (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3122             (w = wt.workQueue) != null) {
3123             int block;
3124             while (!blocker.isReleasable()) {
3125                 if ((block = p.tryCompensate(w)) != 0) {
3126                     try {
3127                         do {} while (!blocker.isReleasable() &&
3128                                      !blocker.block());
3129                     } finally {
3130                         CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
3131                     }
3132                     break;
3133                 }
3134             }
3135         }
3136         else {
3137             do {} while (!blocker.isReleasable() &&
3138                          !blocker.block());
3139         }


< prev index next >