< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page
@@ -51,10 +51,12 @@
  import java.util.function.Predicate;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.locks.LockSupport;
  import java.util.concurrent.locks.ReentrantLock;
  import java.util.concurrent.locks.Condition;
+ import jdk.internal.misc.VirtualThreads;
+ import jdk.internal.vm.SharedThreadContainer;
  
  /**
   * An {@link ExecutorService} for running {@link ForkJoinTask}s.
   * A {@code ForkJoinPool} provides the entry point for submissions
   * from non-{@code ForkJoinTask} clients, as well as management and

@@ -964,24 +966,32 @@
          /**
           * Pushes a task. Call only by owner in unshared queues.
+          * The {@code top} index is advanced even when the insert is
+          * skipped (null array, null pool, or zero-length array).
           *
           * @param task the task. Caller must ensure non-null.
           * @param pool (no-op if null)
+          * @param signalOnEmpty if true, signal a worker when the slot
+          *        below the pushed task was null (queue was empty); a
+          *        push that triggers {@code growArray} always signals,
+          *        regardless of this flag
           * @throws RejectedExecutionException if array cannot be resized
           */
-         final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
+         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean signalOnEmpty) {
              ForkJoinTask<?>[] a = array;
              int s = top++, d = s - base, cap, m; // skip insert if disabled
              if (a != null && pool != null && (cap = a.length) > 0) {
                  setSlotVolatile(a, (m = cap - 1) & s, task);
-                 if (d == m)
+                 if (d == m) {
                      growArray();
-                 if (d == m || a[m & (s - 1)] == null)
-                     pool.signalWork(); // signal if was empty or resized
+                     pool.signalWork();  // signal if resized (signalOnEmpty not consulted)
+                 } else {
+                     if (signalOnEmpty && a[m & (s - 1)] == null)
+                         pool.signalWork(); // signal if was empty (previous slot null)
+                 }
              }
          }
  
+         final void push(ForkJoinTask<?> task, ForkJoinPool pool) { // original two-argument signature
+             push(task, pool, true); // delegates with signalOnEmpty == true
+         }
+ 
          /**
           * Pushes task to a shared queue with lock already held, and unlocks.
           *
           * @return true if caller should signal work
           */

@@ -1417,10 +1427,11 @@
      Condition termination;               // lazily constructed
      final String workerNamePrefix;       // null for common pool
      final ForkJoinWorkerThreadFactory factory;
      final UncaughtExceptionHandler ueh;  // per-worker UEH
      final Predicate<? super ForkJoinPool> saturate;
+     final SharedThreadContainer container; // workers are started through this; closed after termination is signalled
  
      @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
      volatile long ctl;                   // main pool control
  
      // Support for atomic operations

@@ -1460,11 +1471,11 @@
          ForkJoinWorkerThreadFactory fac = factory;
          Throwable ex = null;
          ForkJoinWorkerThread wt = null;
          try {
              if (fac != null && (wt = fac.newThread(this)) != null) {
-                 wt.start();
+                 container.start(wt);
                  return true;
              }
          } catch (Throwable rex) {
              ex = rex;
          }

@@ -2195,10 +2206,27 @@
          else
              externalPush(task);
          return task;
      }
  
+     /**
+      * Pushes an external submission to the current carrier thread's work queue if
+      * possible. This method is invoked (reflectively) by the virtual thread
+      * implementation.
+      *
+      * <p>The task is wrapped in a {@code ForkJoinTask.RunnableExecuteAction}.
+      * If the current carrier thread is a {@code ForkJoinWorkerThread} whose
+      * pool is this pool, the wrapped task is pushed directly onto that
+      * worker's queue; otherwise it is handed to {@code externalPush}.
+      *
+      * @param task the runnable to execute
+      * @param signalOnEmpty true to signal a worker when the queue is empty;
+      *        only passed on when pushing to the carrier's own queue, not
+      *        on the {@code externalPush} fallback
+      */
+     private void externalExecuteTask(Runnable task, boolean signalOnEmpty) {
+         var forkJoinTask = new ForkJoinTask.RunnableExecuteAction(task);
+         Thread t = VirtualThreads.currentCarrierThread();
+         if (t instanceof ForkJoinWorkerThread wt && wt.pool == this) { // carrier is a worker of this pool
+             wt.workQueue.push(forkJoinTask, this, signalOnEmpty);
+         } else {
+             externalPush(forkJoinTask); // fallback: signalOnEmpty is not passed here
+         }
+     }
+ 
      /**
       * Returns common pool queue for an external thread that has
       * possibly ever submitted a common pool task (nonzero probe), or
       * null if none.
       */

@@ -2344,10 +2372,11 @@
                  (lock = registrationLock) != null) {
                  lock.lock();
                  if ((cond = termination) != null)
                      cond.signalAll();
                  lock.unlock();
+                 container.close();
              }
              if (changed)
                  rescan = true;
              else if (rescan)
                  rescan = false;

@@ -2542,10 +2571,13 @@
                      (((long)(-p)     << RC_SHIFT) & RC_MASK));
          this.registrationLock = new ReentrantLock();
          this.queues = new WorkQueue[size];
          String pid = Integer.toString(getAndAddPoolIds(1) + 1);
          this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";
+ 
+         String name = "ForkJoinPool-" + pid;
+         this.container = SharedThreadContainer.create(name);
      }
  
      // helper method for commonPool constructor
      private static Object newInstanceFromSystemProperty(String property)
          throws ReflectiveOperationException {

@@ -2593,10 +2625,13 @@
          }
          this.factory = (fac != null) ? fac :
              new DefaultCommonPoolForkJoinWorkerThreadFactory();
          this.queues = new WorkQueue[size];
          this.registrationLock = new ReentrantLock();
+ 
+         String name = "ForkJoinPool.commonPool";
+         this.container = SharedThreadContainer.create(name);
      }
  
      /**
       * Returns the common pool instance. This pool is statically
       * constructed; its run state is unaffected by attempts to {@link
< prev index next >