< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page
*** 51,10 ***
--- 51,12 ---
  import java.util.function.Predicate;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.locks.LockSupport;
  import java.util.concurrent.locks.ReentrantLock;
  import java.util.concurrent.locks.Condition;
+ import jdk.internal.misc.VirtualThreads;
+ import jdk.internal.vm.SharedThreadContainer;
  
  /**
   * An {@link ExecutorService} for running {@link ForkJoinTask}s.
   * A {@code ForkJoinPool} provides the entry point for submissions
   * from non-{@code ForkJoinTask} clients, as well as management and

*** 964,24 ***
          /**
           * Pushes a task. Call only by owner in unshared queues.
           *
           * @param task the task. Caller must ensure non-null.
           * @param pool (no-op if null)
           * @throws RejectedExecutionException if array cannot be resized
           */
!         final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
              ForkJoinTask<?>[] a = array;
              int s = top++, d = s - base, cap, m; // skip insert if disabled
              if (a != null && pool != null && (cap = a.length) > 0) {
                  setSlotVolatile(a, (m = cap - 1) & s, task);
!                 if (d == m)
                      growArray();
!                 if (d == m || a[m & (s - 1)] == null)
!                     pool.signalWork(); // signal if was empty or resized
              }
          }
  
          /**
           * Pushes task to a shared queue with lock already held, and unlocks.
           *
           * @return true if caller should signal work
           */
--- 966,32 ---
          /**
           * Pushes a task. Call only by owner in unshared queues.
           *
+          * <p>{@code top} is advanced unconditionally; the insert itself is
+          * skipped when the array is null or zero-length, or when the pool
+          * is null. If the queue held {@code cap - 1} tasks before this
+          * push, the array is grown and the pool is always signalled.
+          * Otherwise the pool is signalled only when {@code signalOnEmpty}
+          * is true and the slot just below the new task is null.
+          *
           * @param task the task. Caller must ensure non-null.
           * @param pool (no-op if null)
+          * @param signalOnEmpty if true, signal a worker if queue was empty
+          *        before this push (a resize signals regardless of this flag)
           * @throws RejectedExecutionException if array cannot be resized
           */
!         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean signalOnEmpty) {
              ForkJoinTask<?>[] a = array;
              int s = top++, d = s - base, cap, m; // skip insert if disabled
              if (a != null && pool != null && (cap = a.length) > 0) {
                  setSlotVolatile(a, (m = cap - 1) & s, task); // m = cap - 1 is used as the index mask
!                 if (d == m) { // pre-push size (s - base) equals cap - 1
                      growArray();
!                     pool.signalWork();  // signal if resized
!                 } else {
+                     if (signalOnEmpty && a[m & (s - 1)] == null) // slot below the new task is empty
+                         pool.signalWork(); // signal if was empty
+                 }
              }
          }
  
+         final void push(ForkJoinTask<?> task, ForkJoinPool pool) { // two-argument form of push
+             push(task, pool, true); // delegates with signalOnEmpty = true
+         }
+ 
          /**
           * Pushes task to a shared queue with lock already held, and unlocks.
           *
           * @return true if caller should signal work
           */

*** 1311,31 ***
       * Permission required for callers of methods that may start or
       * kill threads.
       */
      static final RuntimePermission modifyThreadPermission;
  
-     /**
-      * Common (static) pool. Non-null for public use unless a static
-      * construction exception, but internal usages null-check on use
-      * to paranoically avoid potential initialization circularities
-      * as well as to simplify generated code.
-      */
-     static final ForkJoinPool common;
- 
-     /**
-      * Common pool parallelism. To allow simpler use and management
-      * when common pool threads are disabled, we allow the underlying
-      * common.parallelism field to be zero, but in that case still report
-      * parallelism as 1 to reflect resulting caller-runs mechanics.
-      */
-     static final int COMMON_PARALLELISM;
- 
-     /**
-      * Limit on spare thread construction in tryCompensate.
-      */
-     private static final int COMMON_MAX_SPARES;
- 
      /**
       * Sequence number for creating worker names
       */
      private static volatile int poolIds;
  
--- 1321,10 ---

*** 1350,20 ***
      /**
       * Undershoot tolerance for idle timeouts
       */
      private static final long TIMEOUT_SLOP = 20L;
  
-     /**
-      * The default value for COMMON_MAX_SPARES.  Overridable using the
-      * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
-      * property.  The default value is far in excess of normal
-      * requirements, but also far short of MAX_CAP and typical OS
-      * thread limits, so allows JVMs to catch misuse/abuse before
-      * running out of resources needed to do so.
-      */
-     private static final int DEFAULT_COMMON_MAX_SPARES = 256;
- 
      /*
       * Bits and masks for field ctl, packed with 4 16 bit subfields:
       * RC: Number of released (unqueued) workers minus target parallelism
       * TC: Number of total workers minus target parallelism
       * SS: version count and status of top waiting thread
--- 1339,10 ---

*** 1417,10 ***
--- 1396,11 ---
      Condition termination;               // lazily constructed
      final String workerNamePrefix;       // null for common pool
      final ForkJoinWorkerThreadFactory factory;
      final UncaughtExceptionHandler ueh;  // per-worker UEH
      final Predicate<? super ForkJoinPool> saturate;
+     final SharedThreadContainer container; // starts worker threads; closed after termination is signalled
  
      @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
      volatile long ctl;                   // main pool control
  
      // Support for atomic operations

*** 1460,11 ***
          ForkJoinWorkerThreadFactory fac = factory;
          Throwable ex = null;
          ForkJoinWorkerThread wt = null;
          try {
              if (fac != null && (wt = fac.newThread(this)) != null) {
!                 wt.start();
                  return true;
              }
          } catch (Throwable rex) {
              ex = rex;
          }
--- 1440,11 ---
          ForkJoinWorkerThreadFactory fac = factory;
          Throwable ex = null;
          ForkJoinWorkerThread wt = null;
          try {
              if (fac != null && (wt = fac.newThread(this)) != null) {
!                 container.start(wt);
                  return true;
              }
          } catch (Throwable rex) {
              ex = rex;
          }

*** 2195,19 ***
          else
              externalPush(task);
          return task;
      }
  
      /**
       * Returns common pool queue for an external thread that has
       * possibly ever submitted a common pool task (nonzero probe), or
       * null if none.
       */
      static WorkQueue commonQueue() {
          ForkJoinPool p; WorkQueue[] qs;
          int r = ThreadLocalRandom.getProbe(), n;
!         return ((p = common) != null && (qs = p.queues) != null &&
                  (n = qs.length) > 0 && r != 0) ?
              qs[(n - 1) & (r << 1)] : null;
      }
  
      /**
--- 2175,36 ---
          else
              externalPush(task);
          return task;
      }
  
+     /**
+      * Pushes an external submission to the current carrier thread's work queue if
+      * possible. This method is invoked (reflectively) by the virtual thread
+      * implementation.
+      *
+      * <p>The runnable is wrapped in a {@code ForkJoinTask.RunnableExecuteAction}.
+      * If the current carrier thread is a {@code ForkJoinWorkerThread} belonging
+      * to this pool, the wrapped task is pushed directly onto that worker's queue.
+      * Otherwise it is handed to {@code externalPush}, which is not passed
+      * {@code signalOnEmpty}.
+      *
+      * @param task the runnable to execute
+      * @param signalOnEmpty true to signal a worker when the queue is empty
+      */
+     private void externalExecuteTask(Runnable task, boolean signalOnEmpty) {
+         var forkJoinTask = new ForkJoinTask.RunnableExecuteAction(task);
+         Thread t = VirtualThreads.currentCarrierThread();
+         if (t instanceof ForkJoinWorkerThread wt && wt.pool == this) {
+             wt.workQueue.push(forkJoinTask, this, signalOnEmpty); // carrier is one of this pool's workers
+         } else {
+             externalPush(forkJoinTask); // any other thread, or a worker of a different pool
+         }
+     }
+ 
      /**
       * Returns common pool queue for an external thread that has
       * possibly ever submitted a common pool task (nonzero probe), or
       * null if none.
+      *
+      * <p>Null is returned when the common pool is null, its queue array
+      * is null or empty, or the caller's probe is zero. Otherwise the
+      * result is the array element at index {@code (n - 1) & (probe << 1)},
+      * which is always an even index because the shifted probe has its
+      * low bit clear.
       */
      static WorkQueue commonQueue() {
          ForkJoinPool p; WorkQueue[] qs;
          int r = ThreadLocalRandom.getProbe(), n;
!         return ((p = CommonPool.common) != null && (qs = p.queues) != null &&
                  (n = qs.length) > 0 && r != 0) ?
              qs[(n - 1) & (r << 1)] : null;
      }
  
      /**

*** 2344,10 ***
--- 2341,11 ---
                  (lock = registrationLock) != null) {
                  lock.lock();
                  if ((cond = termination) != null)
                      cond.signalAll();
                  lock.unlock();
+                 container.close();
              }
              if (changed)
                  rescan = true;
              else if (rescan)
                  rescan = false;

*** 2542,10 ***
--- 2540,13 ---
                      (((long)(-p)     << RC_SHIFT) & RC_MASK));
          this.registrationLock = new ReentrantLock();
          this.queues = new WorkQueue[size];
          String pid = Integer.toString(getAndAddPoolIds(1) + 1);
          this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";
+ 
+         String name = "ForkJoinPool-" + pid;
+         this.container = SharedThreadContainer.create(name);
      }
  
      // helper method for commonPool constructor
      private static Object newInstanceFromSystemProperty(String property)
          throws ReflectiveOperationException {

*** 2581,11 ***
          this.workerNamePrefix = null;
          int p = Math.min(Math.max(parallelism, 0), MAX_CAP), size;
          this.mode = p;
          if (p > 0) {
              size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
!             this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
              this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
                          (((long)(-p) << RC_SHIFT) & RC_MASK));
          } else {  // zero min, max, spare counts, 1 slot
              size = 1;
              this.bounds = 0;
--- 2582,11 ---
          this.workerNamePrefix = null;
          int p = Math.min(Math.max(parallelism, 0), MAX_CAP), size;
          this.mode = p;
          if (p > 0) {
              size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
!             this.bounds = ((1 - p) & SMASK) | (CommonPool.COMMON_MAX_SPARES << SWIDTH);
              this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
                          (((long)(-p) << RC_SHIFT) & RC_MASK));
          } else {  // zero min, max, spare counts, 1 slot
              size = 1;
              this.bounds = 0;

*** 2593,10 ***
--- 2594,13 ---
          }
          this.factory = (fac != null) ? fac :
              new DefaultCommonPoolForkJoinWorkerThreadFactory();
          this.queues = new WorkQueue[size];
          this.registrationLock = new ReentrantLock();
+ 
+         String name = "ForkJoinPool.commonPool";
+         this.container = SharedThreadContainer.create(name);
      }
  
      /**
       * Returns the common pool instance. This pool is statically
       * constructed; its run state is unaffected by attempts to {@link

*** 2610,11 ***
       * @return the common pool instance
       * @since 1.8
       */
      public static ForkJoinPool commonPool() {
          // assert common != null : "static init error";
!         return common;
      }
  
      // Execution methods
  
      /**
--- 2614,11 ---
       * @return the common pool instance
       * @since 1.8
       */
      public static ForkJoinPool commonPool() {
          // assert CommonPool.common != null : "static init error";
!         return CommonPool.common; // reading this field runs CommonPool's static initializer if it has not run yet
      }
  
      // Execution methods
  
      /**

*** 2930,11 ***
       *
       * @return the targeted parallelism level of the common pool
       * @since 1.8
       */
      public static int getCommonPoolParallelism() {
!         return COMMON_PARALLELISM;
      }
  
      /**
       * Returns the number of worker threads that have started but not
       * yet terminated.  The result returned by this method may differ
--- 2934,11 ---
       *
       * @return the targeted parallelism level of the common pool
       * @since 1.8
       */
      public static int getCommonPoolParallelism() {
!         return CommonPool.COMMON_PARALLELISM; // max(common.mode & SMASK, 1), set in CommonPool's static initializer
      }
  
      /**
       * Returns the number of worker threads that have started but not
       * yet terminated.  The result returned by this method may differ

*** 3191,11 ***
       *         because it does not hold {@link
       *         java.lang.RuntimePermission}{@code ("modifyThread")}
       */
      public void shutdown() {
          checkPermission();
!         if (this != common)
              tryTerminate(false, true);
      }
  
      /**
       * Possibly attempts to cancel and/or stop all tasks, and reject
--- 3195,11 ---
       *         because it does not hold {@link
       *         java.lang.RuntimePermission}{@code ("modifyThread")}
       */
      public void shutdown() {
          checkPermission(); // performed for every pool, including the common pool
!         if (this != CommonPool.common) // no termination attempt is made on the common pool
              tryTerminate(false, true);
      }
  
      /**
       * Possibly attempts to cancel and/or stop all tasks, and reject

*** 3215,11 ***
       *         because it does not hold {@link
       *         java.lang.RuntimePermission}{@code ("modifyThread")}
       */
      public List<Runnable> shutdownNow() {
          checkPermission();
!         if (this != common)
              tryTerminate(true, true);
          return Collections.emptyList();
      }
  
      /**
--- 3219,11 ---
       *         because it does not hold {@link
       *         java.lang.RuntimePermission}{@code ("modifyThread")}
       */
      public List<Runnable> shutdownNow() {
          checkPermission(); // performed for every pool, including the common pool
!         if (this != CommonPool.common) // no termination attempt is made on the common pool
              tryTerminate(true, true);
          return Collections.emptyList(); // this method always returns an empty list
      }
  
      /**

*** 3274,11 ***
      public boolean awaitTermination(long timeout, TimeUnit unit)
          throws InterruptedException {
          ReentrantLock lock; Condition cond;
          long nanos = unit.toNanos(timeout);
          boolean terminated = false;
!         if (this == common) {
              Thread t; ForkJoinWorkerThread wt; int q;
              if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
                  (wt = (ForkJoinWorkerThread)t).pool == this)
                  q = helpQuiescePool(wt.workQueue, nanos, true);
              else
--- 3278,11 ---
      public boolean awaitTermination(long timeout, TimeUnit unit)
          throws InterruptedException {
          ReentrantLock lock; Condition cond;
          long nanos = unit.toNanos(timeout);
          boolean terminated = false;
!         if (this == CommonPool.common) {
              Thread t; ForkJoinWorkerThread wt; int q;
              if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
                  (wt = (ForkJoinWorkerThread)t).pool == this)
                  q = helpQuiescePool(wt.workQueue, nanos, true);
              else

*** 3491,26 ***
  
          // Reduce the risk of rare disastrous classloading in first call to
          // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
          Class<?> ensureLoaded = LockSupport.class;
  
!         int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
-         try {
-             String p = System.getProperty
-                 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
-             if (p != null)
-                 commonMaxSpares = Integer.parseInt(p);
-         } catch (Exception ignore) {}
-         COMMON_MAX_SPARES = commonMaxSpares;
- 
-         defaultForkJoinWorkerThreadFactory =
-             new DefaultForkJoinWorkerThreadFactory();
          modifyThreadPermission = new RuntimePermission("modifyThread");
!         @SuppressWarnings("removal")
!         ForkJoinPool tmp = AccessController.doPrivileged(new PrivilegedAction<>() {
!             public ForkJoinPool run() {
!                 return new ForkJoinPool((byte)0); }});
!         common = tmp;
  
!         COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
      }
  }
--- 3495,61 ---
  
          // Reduce the risk of rare disastrous classloading in first call to
          // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
          Class<?> ensureLoaded = LockSupport.class;
  
!         defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory();
          modifyThreadPermission = new RuntimePermission("modifyThread");
!     }
! 
!     static class CommonPool { // holds the common pool instance and its configuration constants
!         /**
!          * Common (static) pool. Non-null for public use unless a static
+          * construction exception, but internal usages null-check on use
+          * to paranoically avoid potential initialization circularities
+          * as well as to simplify generated code.
+          */
+         static final ForkJoinPool common;
+ 
+         /**
+          * Common pool parallelism. To allow simpler use and management
+          * when common pool threads are disabled, we allow the underlying
+          * common.parallelism field to be zero, but in that case still report
+          * parallelism as 1 to reflect resulting caller-runs mechanics.
+          * Computed below as {@code Math.max(common.mode & SMASK, 1)}.
+          */
+         static final int COMMON_PARALLELISM;
  
!         /**
+          * Limit on spare thread construction in tryCompensate.
+          * Taken from the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
+          * system property when it is set and parses as an int; otherwise
+          * DEFAULT_COMMON_MAX_SPARES.
+          */
+         private static final int COMMON_MAX_SPARES;
+ 
+         /**
+          * The default value for COMMON_MAX_SPARES.  Overridable using the
+          * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
+          * property.  The default value is far in excess of normal
+          * requirements, but also far short of MAX_CAP and typical OS
+          * thread limits, so allows JVMs to catch misuse/abuse before
+          * running out of resources needed to do so.
+          */
+         private static final int DEFAULT_COMMON_MAX_SPARES = 256;
+ 
+         static {
+             int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
+             try {
+                 String p = System.getProperty
+                     ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
+                 if (p != null)
+                     commonMaxSpares = Integer.parseInt(p);
+             } catch (Exception ignore) {} // unreadable or malformed property: keep the default
+             COMMON_MAX_SPARES = commonMaxSpares; // assigned before the pool is built; NOTE(review): the common-pool constructor appears to read this for bounds - confirm
+ 
+             @SuppressWarnings("removal")
+             ForkJoinPool tmp = AccessController.doPrivileged(new PrivilegedAction<>() {
+                 public ForkJoinPool run() {
+                     return new ForkJoinPool((byte)0); }}); // byte argument presumably selects the common-pool constructor - confirm
+             common = tmp;
+ 
+             COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1); // never reported as less than 1
+         }
      }
  }
< prev index next >