1 /*
  2  * Copyright (c) 2019, 2021, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package jdk.internal.misc;
 27 
 28 import java.lang.invoke.MethodHandle;
 29 import java.lang.invoke.MethodHandles;
 30 import java.lang.invoke.MethodType;
 31 import java.security.AccessController;
 32 import java.security.PrivilegedExceptionAction;
 33 import java.util.concurrent.Callable;
 34 import java.util.concurrent.ExecutionException;
 35 import java.util.concurrent.ExecutorService;
 36 import java.util.concurrent.Executors;
 37 import java.util.concurrent.ForkJoinPool;
 38 import java.util.concurrent.ForkJoinPool.ManagedBlocker;
 39 import java.util.concurrent.ForkJoinWorkerThread;
 40 import java.util.concurrent.Future;
 41 import java.util.concurrent.ThreadFactory;
 42 
 43 import jdk.internal.access.JavaLangAccess;
 44 import jdk.internal.access.SharedSecrets;
 45 
 46 /**
 47  * Defines static methods to execute blocking tasks on virtual threads.
 48  * If the carrier thread is a ForkJoinWorkerThread then the task runs in a
 49  * ForkJoinPool.ManagedBlocker to that its pool may be expanded to support
 50  * additional parallelism during the blocking operation.
 51  */
 52 public class Blocker {
 53     private static final Unsafe U = Unsafe.getUnsafe();
 54     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 55 
 56     private static final MethodHandle compensatedBlock;
 57     static {
 58         try {
 59             PrivilegedExceptionAction<MethodHandles.Lookup> pa = () ->
 60                 MethodHandles.privateLookupIn(ForkJoinPool.class, MethodHandles.lookup());
 61             @SuppressWarnings("removal")
 62             MethodHandles.Lookup l = AccessController.doPrivileged(pa);
 63             MethodType methodType = MethodType.methodType(void.class, ManagedBlocker.class);
 64             compensatedBlock = l.findVirtual(ForkJoinPool.class, "compensatedBlock", methodType);
 65         } catch (Exception e) {
 66             throw new InternalError(e);
 67         }
 68     }
 69 
 70     private Blocker() { }
 71 
 72     /**
 73      * A task that returns a result and may throw an exception.
 74      */
 75     @FunctionalInterface
 76     public interface BlockingCallable<V, X extends Throwable> {
 77         V call() throws X;
 78     }
 79 
 80     /**
 81      * A task that may throw an exception.
 82      */
 83     @FunctionalInterface
 84     public interface BlockingRunnable<X extends Throwable> {
 85         void run() throws X;
 86     }
 87 
 88     private static class CallableBlocker<V, X extends Throwable>
 89             implements ManagedBlocker {
 90 
 91         private final BlockingCallable<V, X> task;
 92         private boolean done;
 93         private V result;
 94 
 95         CallableBlocker(BlockingCallable<V, X> task) {
 96             this.task = task;
 97         }
 98 
 99         V result() {
100             return result;
101         }
102 
103         @Override
104         public boolean block() {
105             try {
106                 result = task.call();
107             } catch (Throwable e) {
108                 U.throwException(e);
109             } finally {
110                 done = true;
111             }
112             return true;
113         }
114 
115         @Override
116         public boolean isReleasable() {
117             return done;
118         }
119     }
120 
121     private static class RunnableBlocker<X extends Throwable>
122             implements ManagedBlocker {
123         private final BlockingRunnable<X> task;
124         private boolean done;
125 
126         RunnableBlocker(BlockingRunnable<X> task) {
127             this.task = task;
128         }
129 
130         @Override
131         public boolean block() {
132             try {
133                 task.run();
134             } catch (Throwable e) {
135                 U.throwException(e);
136             } finally {
137                 done = true;
138             }
139             return true;
140         }
141 
142         @Override
143         public boolean isReleasable() {
144             return done;
145         }
146     }
147 
148     /**
149      * Executes a task that may block and pin the current thread. If invoked on a
150      * virtual thread and the current carrier thread is in a ForkJoinPool then the
151      * pool may be expanded to support additional parallelism during the call to
152      * this method.
153      */
154     public static <V, X extends Throwable> V managedBlock(BlockingCallable<V, X> task) {
155         Thread carrier = JLA.currentCarrierThread();
156         ForkJoinPool pool;
157         if (carrier instanceof ForkJoinWorkerThread
158             && (pool = ((ForkJoinWorkerThread) carrier).getPool()) != null) {
159             try {
160                 var blocker = new CallableBlocker<>(task);
161                 compensatedBlock.invoke(pool, blocker);
162                 return blocker.result();
163             } catch (Throwable e) {
164                 U.throwException(e);
165             }
166             assert false;  // should not get here
167         }
168 
169         // run directly
170         try {
171             return task.call();
172         } catch (Throwable e) {
173             U.throwException(e);
174             return null;
175         }
176     }
177 
178     /**
179      * Executes a task that may block and pin the current thread. If invoked on a
180      * virtual thread and the current carrier thread is in a ForkJoinPool then the
181      * pool may be expanded to support additional parallelism during the call to
182      * this method.
183      */
184     public static <X extends Throwable> void managedBlock(BlockingRunnable<X> task) {
185         Thread carrier = JLA.currentCarrierThread();
186         ForkJoinPool pool;
187         if (carrier instanceof ForkJoinWorkerThread
188                 && (pool = ((ForkJoinWorkerThread) carrier).getPool()) != null) {
189             try {
190                 compensatedBlock.invoke(pool, new RunnableBlocker<>(task));
191                 return;
192             } catch (Throwable e) {
193                 U.throwException(e);
194             }
195             assert false;  // should not get here
196         }
197 
198         // run directly
199         try {
200             task.run();
201         } catch (Throwable e) {
202             U.throwException(e);
203         }
204     }
205 
206     /**
207      * Runs the given task in a background thread pool.
208      */
209     public static <V> V runInThreadPool(Callable<V> task) {
210         Future<V> future = ThreadPool.THREAD_POOL.submit(task);
211         boolean interrupted = false;
212         try {
213             for (;;) {
214                 try {
215                     return future.get();
216                 } catch (ExecutionException e) {
217                     U.throwException(e.getCause());
218                     return null;
219                 } catch (InterruptedException e) {
220                     interrupted = true;
221                 }
222             }
223         } finally {
224             if (interrupted) Thread.currentThread().interrupt();
225         }
226     }
227 
228     /**
229      * Runs the given task in a background thread pool.
230      */
231     public static void runInThreadPool(Runnable task) {
232         Future<?> future = ThreadPool.THREAD_POOL.submit(task);
233         boolean interrupted = false;
234         try {
235             for (;;) {
236                 try {
237                     future.get();
238                 } catch (ExecutionException e) {
239                     U.throwException(e.getCause());
240                 } catch (InterruptedException e) {
241                     interrupted = true;
242                 }
243             }
244         } finally {
245             if (interrupted) Thread.currentThread().interrupt();
246         }
247     }
248 
249     private static class ThreadPool {
250         private static final ExecutorService THREAD_POOL;
251         static {
252             int parallelism = Runtime.getRuntime().availableProcessors() << 1;
253             ThreadFactory factory = task -> InnocuousThread.newThread(task);
254             THREAD_POOL = Executors.newFixedThreadPool(parallelism, factory);
255         }
256     }
257 }