1 /*
  2  * Copyright (c) 2019, 2021, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 
 26 package jdk.internal.misc;
 27 
 28 import java.lang.invoke.MethodHandle;
 29 import java.lang.invoke.MethodHandles;
 30 import java.lang.invoke.MethodType;
 31 import java.security.AccessController;
 32 import java.security.PrivilegedExceptionAction;
 33 import java.util.concurrent.Callable;
 34 import java.util.concurrent.ExecutionException;
 35 import java.util.concurrent.ExecutorService;
 36 import java.util.concurrent.Executors;
 37 import java.util.concurrent.ForkJoinPool;
 38 import java.util.concurrent.ForkJoinPool.ManagedBlocker;
 39 import java.util.concurrent.Future;
 40 import java.util.concurrent.ThreadFactory;
 41 import jdk.internal.access.JavaLangAccess;
 42 import jdk.internal.access.SharedSecrets;
 43 
 44 /**
 45  * Defines static methods to execute blocking tasks on virtual threads.
 46  * If the carrier thread is a CarrierThread then the task runs in a
 47  * ForkJoinPool.ManagedBlocker to that its pool may be expanded to support
 48  * additional parallelism during the blocking operation.
 49  */
 50 public class Blocker {
 51     private static final Unsafe U = Unsafe.getUnsafe();
 52     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 53 
 54     private static final MethodHandle compensatedBlock;
 55     static {
 56         try {
 57             PrivilegedExceptionAction<MethodHandles.Lookup> pa = () ->
 58                 MethodHandles.privateLookupIn(ForkJoinPool.class, MethodHandles.lookup());
 59             @SuppressWarnings("removal")
 60             MethodHandles.Lookup l = AccessController.doPrivileged(pa);
 61             MethodType methodType = MethodType.methodType(void.class, ManagedBlocker.class);
 62             compensatedBlock = l.findVirtual(ForkJoinPool.class, "compensatedBlock", methodType);
 63         } catch (Exception e) {
 64             throw new InternalError(e);
 65         }
 66     }
 67 
 68     private Blocker() { }
 69 
 70     /**
 71      * A task that returns a result and may throw an exception.
 72      */
 73     @FunctionalInterface
 74     public interface BlockingCallable<V, X extends Throwable> {
 75         V call() throws X;
 76     }
 77 
 78     /**
 79      * A task that may throw an exception.
 80      */
 81     @FunctionalInterface
 82     public interface BlockingRunnable<X extends Throwable> {
 83         void run() throws X;
 84     }
 85 
 86     private static class CallableBlocker<V, X extends Throwable> implements ManagedBlocker {
 87         private final BlockingCallable<V, X> task;
 88         private boolean done;
 89         private V result;
 90 
 91         CallableBlocker(BlockingCallable<V, X> task) {
 92             this.task = task;
 93         }
 94 
 95         V result() {
 96             return result;
 97         }
 98 
 99         @Override
100         public boolean block() {
101             try {
102                 result = task.call();
103             } catch (Throwable e) {
104                 U.throwException(e);
105             } finally {
106                 done = true;
107             }
108             return true;
109         }
110 
111         @Override
112         public boolean isReleasable() {
113             return done;
114         }
115     }
116 
117     private static class RunnableBlocker<X extends Throwable> implements ManagedBlocker {
118         private final BlockingRunnable<X> task;
119         private boolean done;
120 
121         RunnableBlocker(BlockingRunnable<X> task) {
122             this.task = task;
123         }
124 
125         @Override
126         public boolean block() {
127             try {
128                 task.run();
129             } catch (Throwable e) {
130                 U.throwException(e);
131             } finally {
132                 done = true;
133             }
134             return true;
135         }
136 
137         @Override
138         public boolean isReleasable() {
139             return done;
140         }
141     }
142 
143     /**
144      * Executes a task that may block and pin the current thread. If invoked on a
145      * virtual thread and the current carrier thread is in a CarrierThread then the
146      * pool may be expanded to support additional parallelism during the call to
147      * this method.
148      */
149     public static <V, X extends Throwable> V managedBlock(BlockingCallable<V, X> task) {
150         Thread t = JLA.currentCarrierThread();
151         if (t instanceof CarrierThread ct && !ct.blocked()) {
152             ct.blocked(true);
153             try {
154                 var blocker = new CallableBlocker<>(task);
155                 compensatedBlock.invoke(ct.getPool(), blocker);
156                 return blocker.result();
157             } catch (Throwable e) {
158                 U.throwException(e);
159             } finally {
160                 ct.blocked(false);
161             }
162             assert false;  // should not get here
163         }
164 
165         // run directly
166         try {
167             return task.call();
168         } catch (Throwable e) {
169             U.throwException(e);
170             return null;
171         }
172     }
173 
174     /**
175      * Executes a task that may block and pin the current thread. If invoked on a
176      * virtual thread and the current carrier thread is in a CarrierThread then the
177      * pool may be expanded to support additional parallelism during the call to
178      * this method.
179      */
180     public static <X extends Throwable> void managedBlock(BlockingRunnable<X> task) {
181         Thread t = JLA.currentCarrierThread();
182         if (t instanceof CarrierThread ct && !ct.blocked()) {
183             ct.blocked(true);
184             try {
185                 compensatedBlock.invoke(ct.getPool(), new RunnableBlocker<>(task));
186                 return;
187             } catch (Throwable e) {
188                 U.throwException(e);
189             } finally {
190                 ct.blocked(false);
191             }
192             assert false;  // should not get here
193         }
194 
195         // run directly
196         try {
197             task.run();
198         } catch (Throwable e) {
199             U.throwException(e);
200         }
201     }
202 
203     /**
204      * Runs the given task in a background thread pool.
205      */
206     public static <V> V runInThreadPool(Callable<V> task) {
207         Future<V> future = ThreadPool.THREAD_POOL.submit(task);
208         boolean interrupted = false;
209         try {
210             for (;;) {
211                 try {
212                     return future.get();
213                 } catch (ExecutionException e) {
214                     U.throwException(e.getCause());
215                     return null;
216                 } catch (InterruptedException e) {
217                     interrupted = true;
218                 }
219             }
220         } finally {
221             if (interrupted) Thread.currentThread().interrupt();
222         }
223     }
224 
225     /**
226      * Runs the given task in a background thread pool.
227      */
228     public static void runInThreadPool(Runnable task) {
229         Future<?> future = ThreadPool.THREAD_POOL.submit(task);
230         boolean interrupted = false;
231         try {
232             for (;;) {
233                 try {
234                     future.get();
235                 } catch (ExecutionException e) {
236                     U.throwException(e.getCause());
237                 } catch (InterruptedException e) {
238                     interrupted = true;
239                 }
240             }
241         } finally {
242             if (interrupted) Thread.currentThread().interrupt();
243         }
244     }
245 
246     private static class ThreadPool {
247         private static final ExecutorService THREAD_POOL;
248         static {
249             int parallelism = Runtime.getRuntime().availableProcessors() << 1;
250             ThreadFactory factory = task -> InnocuousThread.newThread(task);
251             THREAD_POOL = Executors.newFixedThreadPool(parallelism, factory);
252         }
253     }
254 }