< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page
@@ -51,10 +51,11 @@
  import java.util.function.Predicate;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.locks.LockSupport;
  import java.util.concurrent.locks.ReentrantLock;
  import java.util.concurrent.locks.Condition;
+ import jdk.internal.vm.SharedThreadContainer;
  
  /**
   * An {@link ExecutorService} for running {@link ForkJoinTask}s.
   * A {@code ForkJoinPool} provides the entry point for submissions
   * from non-{@code ForkJoinTask} clients, as well as management and

@@ -964,24 +965,32 @@
          /**
           * Pushes a task. Call only by owner in unshared queues.
           *
           * @param task the task. Caller must ensure non-null.
           * @param pool (no-op if null)
+          * @param signalOnEmpty signal a worker if queue was empty
           * @throws RejectedExecutionException if array cannot be resized
           */
-         final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
+         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean signalOnEmpty) {
              ForkJoinTask<?>[] a = array;
              int s = top++, d = s - base, cap, m; // skip insert if disabled
              if (a != null && pool != null && (cap = a.length) > 0) {
                  setSlotVolatile(a, (m = cap - 1) & s, task);
-                 if (d == m)
+                 if (d == m) {
                      growArray();
-                 if (d == m || a[m & (s - 1)] == null)
-                     pool.signalWork(); // signal if was empty or resized
+                     pool.signalWork();  // signal if resized
+                 } else {
+                     if (signalOnEmpty && a[m & (s - 1)] == null)
+                         pool.signalWork(); // signal if was empty
+                 }
              }
          }
  
+         final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
+             push(task, pool, true);
+         }
+ 
          /**
           * Pushes task to a shared queue with lock already held, and unlocks.
           *
           * @return true if caller should signal work
           */

@@ -1311,31 +1320,10 @@
       * Permission required for callers of methods that may start or
       * kill threads.
       */
      static final RuntimePermission modifyThreadPermission;
  
-     /**
-      * Common (static) pool. Non-null for public use unless a static
-      * construction exception, but internal usages null-check on use
-      * to paranoically avoid potential initialization circularities
-      * as well as to simplify generated code.
-      */
-     static final ForkJoinPool common;
- 
-     /**
-      * Common pool parallelism. To allow simpler use and management
-      * when common pool threads are disabled, we allow the underlying
-      * common.parallelism field to be zero, but in that case still report
-      * parallelism as 1 to reflect resulting caller-runs mechanics.
-      */
-     static final int COMMON_PARALLELISM;
- 
-     /**
-      * Limit on spare thread construction in tryCompensate.
-      */
-     private static final int COMMON_MAX_SPARES;
- 
      /**
       * Sequence number for creating worker names
       */
      private static volatile int poolIds;
  

@@ -1350,20 +1338,10 @@
      /**
       * Undershoot tolerance for idle timeouts
       */
      private static final long TIMEOUT_SLOP = 20L;
  
-     /**
-      * The default value for COMMON_MAX_SPARES.  Overridable using the
-      * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
-      * property.  The default value is far in excess of normal
-      * requirements, but also far short of MAX_CAP and typical OS
-      * thread limits, so allows JVMs to catch misuse/abuse before
-      * running out of resources needed to do so.
-      */
-     private static final int DEFAULT_COMMON_MAX_SPARES = 256;
- 
      /*
       * Bits and masks for field ctl, packed with 4 16 bit subfields:
       * RC: Number of released (unqueued) workers minus target parallelism
       * TC: Number of total workers minus target parallelism
       * SS: version count and status of top waiting thread

@@ -1417,10 +1395,11 @@
      Condition termination;               // lazily constructed
      final String workerNamePrefix;       // null for common pool
      final ForkJoinWorkerThreadFactory factory;
      final UncaughtExceptionHandler ueh;  // per-worker UEH
      final Predicate<? super ForkJoinPool> saturate;
+     final SharedThreadContainer container;
  
      @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
      volatile long ctl;                   // main pool control
  
      // Support for atomic operations

@@ -1460,11 +1439,11 @@
          ForkJoinWorkerThreadFactory fac = factory;
          Throwable ex = null;
          ForkJoinWorkerThread wt = null;
          try {
              if (fac != null && (wt = fac.newThread(this)) != null) {
-                 wt.start();
+                 container.start(wt);
                  return true;
              }
          } catch (Throwable rex) {
              ex = rex;
          }

@@ -1678,11 +1657,11 @@
              w.stackPred = (int)prevCtl;
              c = ((prevCtl - RC_UNIT) & UC_MASK) | (phase & SP_MASK);
          } while (prevCtl != (prevCtl = compareAndExchangeCtl(prevCtl, c)));
  
          Thread.interrupted();                // clear status
-         LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK)
+         //LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK)
          long deadline = 0L;                  // nonzero if possibly quiescent
          int ac = (int)(c >> RC_SHIFT), md;
          if ((md = mode) < 0)                 // pool is terminating
              return -1;
          else if ((md & SMASK) + ac <= 0) {

@@ -2195,19 +2174,33 @@
          else
              externalPush(task);
          return task;
      }
  
+     /**
+      * Pushes a possibly-external submission without signalling when the queue
+      * is empty. This method is invoked (reflectively) by the virtual thread
+      * implementation.
+      */
+     private void externalLazySubmit(ForkJoinTask<?> task) {
+         Thread t = Thread.currentThread();
+         if (t instanceof ForkJoinWorkerThread wt && wt.pool == this) {
+             wt.workQueue.push(task, this, false);
+         } else {
+             externalPush(task);
+         }
+     }
+ 
      /**
       * Returns common pool queue for an external thread that has
       * possibly ever submitted a common pool task (nonzero probe), or
       * null if none.
       */
      static WorkQueue commonQueue() {
          ForkJoinPool p; WorkQueue[] qs;
          int r = ThreadLocalRandom.getProbe(), n;
-         return ((p = common) != null && (qs = p.queues) != null &&
+         return ((p = CommonPool.common) != null && (qs = p.queues) != null &&
                  (n = qs.length) > 0 && r != 0) ?
              qs[(n - 1) & (r << 1)] : null;
      }
  
      /**

@@ -2344,10 +2337,11 @@
                  (lock = registrationLock) != null) {
                  lock.lock();
                  if ((cond = termination) != null)
                      cond.signalAll();
                  lock.unlock();
+                 container.close();
              }
              if (changed)
                  rescan = true;
              else if (rescan)
                  rescan = false;

@@ -2542,10 +2536,13 @@
                      (((long)(-p)     << RC_SHIFT) & RC_MASK));
          this.registrationLock = new ReentrantLock();
          this.queues = new WorkQueue[size];
          String pid = Integer.toString(getAndAddPoolIds(1) + 1);
          this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";
+ 
+         String name = "ForkJoinPool-" + pid;
+         this.container = SharedThreadContainer.create(name);
      }
  
      // helper method for commonPool constructor
      private static Object newInstanceFromSystemProperty(String property)
          throws ReflectiveOperationException {

@@ -2581,11 +2578,11 @@
          this.workerNamePrefix = null;
          int p = Math.min(Math.max(parallelism, 0), MAX_CAP), size;
          this.mode = p;
          if (p > 0) {
              size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
-             this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
+             this.bounds = ((1 - p) & SMASK) | (CommonPool.COMMON_MAX_SPARES << SWIDTH);
              this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
                          (((long)(-p) << RC_SHIFT) & RC_MASK));
          } else {  // zero min, max, spare counts, 1 slot
              size = 1;
              this.bounds = 0;

@@ -2593,10 +2590,13 @@
          }
          this.factory = (fac != null) ? fac :
              new DefaultCommonPoolForkJoinWorkerThreadFactory();
          this.queues = new WorkQueue[size];
          this.registrationLock = new ReentrantLock();
+ 
+         String name = "ForkJoinPool.commonPool";
+         this.container = SharedThreadContainer.create(name);
      }
  
      /**
       * Returns the common pool instance. This pool is statically
       * constructed; its run state is unaffected by attempts to {@link

@@ -2610,11 +2610,11 @@
       * @return the common pool instance
       * @since 1.8
       */
      public static ForkJoinPool commonPool() {
          // assert common != null : "static init error";
-         return common;
+         return CommonPool.common;
      }
  
      // Execution methods
  
      /**

@@ -2930,11 +2930,11 @@
       *
       * @return the targeted parallelism level of the common pool
       * @since 1.8
       */
      public static int getCommonPoolParallelism() {
-         return COMMON_PARALLELISM;
+         return CommonPool.COMMON_PARALLELISM;
      }
  
      /**
       * Returns the number of worker threads that have started but not
       * yet terminated.  The result returned by this method may differ

@@ -3191,11 +3191,11 @@
       *         because it does not hold {@link
       *         java.lang.RuntimePermission}{@code ("modifyThread")}
       */
      public void shutdown() {
          checkPermission();
-         if (this != common)
+         if (this != CommonPool.common)
              tryTerminate(false, true);
      }
  
      /**
       * Possibly attempts to cancel and/or stop all tasks, and reject

@@ -3215,11 +3215,11 @@
       *         because it does not hold {@link
       *         java.lang.RuntimePermission}{@code ("modifyThread")}
       */
      public List<Runnable> shutdownNow() {
          checkPermission();
-         if (this != common)
+         if (this != CommonPool.common)
              tryTerminate(true, true);
          return Collections.emptyList();
      }
  
      /**

@@ -3274,11 +3274,11 @@
      public boolean awaitTermination(long timeout, TimeUnit unit)
          throws InterruptedException {
          ReentrantLock lock; Condition cond;
          long nanos = unit.toNanos(timeout);
          boolean terminated = false;
-         if (this == common) {
+         if (this == CommonPool.common) {
              Thread t; ForkJoinWorkerThread wt; int q;
              if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
                  (wt = (ForkJoinWorkerThread)t).pool == this)
                  q = helpQuiescePool(wt.workQueue, nanos, true);
              else

@@ -3491,26 +3491,61 @@
  
          // Reduce the risk of rare disastrous classloading in first call to
          // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
          Class<?> ensureLoaded = LockSupport.class;
  
-         int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
-         try {
-             String p = System.getProperty
-                 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
-             if (p != null)
-                 commonMaxSpares = Integer.parseInt(p);
-         } catch (Exception ignore) {}
-         COMMON_MAX_SPARES = commonMaxSpares;
- 
-         defaultForkJoinWorkerThreadFactory =
-             new DefaultForkJoinWorkerThreadFactory();
+         defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory();
          modifyThreadPermission = new RuntimePermission("modifyThread");
-         @SuppressWarnings("removal")
-         ForkJoinPool tmp = AccessController.doPrivileged(new PrivilegedAction<>() {
-             public ForkJoinPool run() {
-                 return new ForkJoinPool((byte)0); }});
-         common = tmp;
+     }
+ 
+     static class CommonPool {
+         /**
+          * Common (static) pool. Non-null for public use unless a static
+          * construction exception, but internal usages null-check on use
+          * to paranoically avoid potential initialization circularities
+          * as well as to simplify generated code.
+          */
+         static final ForkJoinPool common;
+ 
+         /**
+          * Common pool parallelism. To allow simpler use and management
+          * when common pool threads are disabled, we allow the underlying
+          * common.parallelism field to be zero, but in that case still report
+          * parallelism as 1 to reflect resulting caller-runs mechanics.
+          */
+         static final int COMMON_PARALLELISM;
  
-         COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
+         /**
+          * Limit on spare thread construction in tryCompensate.
+          */
+         private static final int COMMON_MAX_SPARES;
+ 
+         /**
+          * The default value for COMMON_MAX_SPARES.  Overridable using the
+          * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
+          * property.  The default value is far in excess of normal
+          * requirements, but also far short of MAX_CAP and typical OS
+          * thread limits, so allows JVMs to catch misuse/abuse before
+          * running out of resources needed to do so.
+          */
+         private static final int DEFAULT_COMMON_MAX_SPARES = 256;
+ 
+         static {
+             int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
+             try {
+                 String p = System.getProperty
+                     ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
+                 if (p != null)
+                     commonMaxSpares = Integer.parseInt(p);
+             } catch (Exception ignore) {}
+             COMMON_MAX_SPARES = commonMaxSpares;
+ 
+             @SuppressWarnings("removal")
+             ForkJoinPool tmp = AccessController.doPrivileged(new PrivilegedAction<>() {
+                 public ForkJoinPool run() {
+                     return new ForkJoinPool((byte)0); }});
+             common = tmp;
+ 
+             COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
+         }
      }
  }
< prev index next >