1 /*
  2  * Copyright (c) 2019, 2022, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package java.util.concurrent;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.security.Permission;
 30 import java.util.ArrayList;
 31 import java.util.Collection;
 32 import java.util.Iterator;
 33 import java.util.List;
 34 import java.util.Objects;
 35 import java.util.Set;
 36 import java.util.concurrent.locks.LockSupport;
 37 import java.util.stream.Stream;
 38 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 39 import jdk.internal.access.JavaLangAccess;
 40 import jdk.internal.access.SharedSecrets;
 41 import jdk.internal.vm.ThreadContainer;
 42 import jdk.internal.vm.ThreadContainers;
 43 
 44 /**
 45  * An ExecutorService that starts a new thread for each task. The number of
 46  * threads is unbounded.
 47  */
 48 class ThreadPerTaskExecutor extends ThreadContainer implements ExecutorService {
 49     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 50     private static final Permission MODIFY_THREAD = new RuntimePermission("modifyThread");
 51     private static final VarHandle STATE;
 52     static {
 53         try {
 54             MethodHandles.Lookup l = MethodHandles.lookup();
 55             STATE = l.findVarHandle(ThreadPerTaskExecutor.class, "state", int.class);
 56         } catch (Exception e) {
 57             throw new InternalError(e);
 58         }
 59     }
 60 
 61     private final ThreadFactory factory;
 62     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 63     private final CountDownLatch terminationSignal = new CountDownLatch(1);
 64 
 65     // states: RUNNING -> SHUTDOWN -> TERMINATED
 66     private static final int RUNNING    = 0;
 67     private static final int SHUTDOWN   = 1;
 68     private static final int TERMINATED = 2;
 69     private volatile int state;
 70 
 71     // the key for this container in the registry
 72     private volatile Object key;
 73 
 74     private ThreadPerTaskExecutor(ThreadFactory factory) {
 75         super(/*shared*/ true);
 76         this.factory = Objects.requireNonNull(factory);
 77     }
 78 
 79     /**
 80      * Creates a thread-per-task executor that creates threads using the given factory.
 81      */
 82     static ThreadPerTaskExecutor create(ThreadFactory factory) {
 83         var executor = new ThreadPerTaskExecutor(factory);
 84         // register it to allow discovery by serviceability tools
 85         executor.key = ThreadContainers.registerContainer(executor);
 86         return executor;
 87     }
 88 
 89     /**
 90      * Throws SecurityException if there is a security manager set and it denies
 91      * RuntimePermission("modifyThread").
 92      */
 93     @SuppressWarnings("removal")
 94     private void checkPermission() {
 95         SecurityManager sm = System.getSecurityManager();
 96         if (sm != null) {
 97             sm.checkPermission(MODIFY_THREAD);
 98         }
 99     }
100 
101     /**
102      * Throws RejectedExecutionException if the executor has been shutdown.
103      */
104     private void ensureNotShutdown() {
105         if (state >= SHUTDOWN) {
106             // shutdown or terminated
107             throw new RejectedExecutionException();
108         }
109     }
110 
111     /**
112      * Attempts to terminate if already shutdown. If this method terminates the
113      * executor then it signals any threads that are waiting for termination.
114      */
115     private void tryTerminate() {
116         assert state >= SHUTDOWN;
117         if (threads.isEmpty()
118             && STATE.compareAndSet(this, SHUTDOWN, TERMINATED)) {
119 
120             // signal waiters
121             terminationSignal.countDown();
122 
123             // remove from registry
124             ThreadContainers.deregisterContainer(key);
125         }
126     }
127 
128     /**
129      * Attempts to shutdown and terminate the executor.
130      * If interruptThreads is true then all running threads are interrupted.
131      */
132     private void tryShutdownAndTerminate(boolean interruptThreads) {
133         if (STATE.compareAndSet(this, RUNNING, SHUTDOWN))
134             tryTerminate();
135         if (interruptThreads) {
136             threads.forEach(Thread::interrupt);
137         }
138     }
139 
140     @Override
141     public Stream<Thread> threads() {
142         return threads.stream().filter(Thread::isAlive);
143     }
144 
145     @Override
146     public long threadCount() {
147         return threads.size();
148     }
149 
150     @Override
151     public void shutdown() {
152         checkPermission();
153         if (!isShutdown())
154             tryShutdownAndTerminate(false);
155     }
156 
157     @Override
158     public List<Runnable> shutdownNow() {
159         checkPermission();
160         if (!isTerminated())
161             tryShutdownAndTerminate(true);
162         return List.of();
163     }
164 
165     @Override
166     public boolean isShutdown() {
167         return state >= SHUTDOWN;
168     }
169 
170     @Override
171     public boolean isTerminated() {
172         return state >= TERMINATED;
173     }
174 
175     @Override
176     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
177         Objects.requireNonNull(unit);
178         if (isTerminated()) {
179             return true;
180         } else {
181             return terminationSignal.await(timeout, unit);
182         }
183     }
184 
185     /**
186      * Waits for executor to terminate.
187      */
188     private void awaitTermination() {
189         boolean terminated = isTerminated();
190         if (!terminated) {
191             tryShutdownAndTerminate(false);
192             boolean interrupted = false;
193             while (!terminated) {
194                 try {
195                     terminated = awaitTermination(1L, TimeUnit.DAYS);
196                 } catch (InterruptedException e) {
197                     if (!interrupted) {
198                         tryShutdownAndTerminate(true);
199                         interrupted = true;
200                     }
201                 }
202             }
203             if (interrupted) {
204                 Thread.currentThread().interrupt();
205             }
206         }
207     }
208 
209     @Override
210     public void close() {
211         checkPermission();
212         awaitTermination();
213     }
214 
215     /**
216      * Creates a thread to run the given task.
217      */
218     private Thread newThread(Runnable task) {
219         Thread thread = factory.newThread(task);
220         if (thread == null)
221             throw new RejectedExecutionException();
222         return thread;
223     }
224 
225     /**
226      * Notify the executor that the task executed by the given thread is complete.
227      * If the executor has been shutdown then this method will attempt to terminate
228      * the executor.
229      */
230     private void taskComplete(Thread thread) {
231         boolean removed = threads.remove(thread);
232         assert removed;
233         if (state == SHUTDOWN) {
234             tryTerminate();
235         }
236     }
237 
238     /**
239      * Adds a thread to the set of threads and starts it.
240      * @throws RejectedExecutionException
241      */
242     private void start(Thread thread) {
243         assert thread.getState() == Thread.State.NEW;
244         threads.add(thread);
245 
246         boolean started = false;
247         try {
248             if (state == RUNNING) {
249                 JLA.start(thread, this);
250                 started = true;
251             }
252         } finally {
253             if (!started) {
254                 taskComplete(thread);
255             }
256         }
257 
258         // throw REE if thread not started and no exception thrown
259         if (!started) {
260             throw new RejectedExecutionException();
261         }
262     }
263 
264     /**
265      * Starts a thread to execute the given task.
266      * @throws RejectedExecutionException
267      */
268     private Thread start(Runnable task) {
269         Objects.requireNonNull(task);
270         ensureNotShutdown();
271         Thread thread = newThread(new TaskRunner(this, task));
272         start(thread);
273         return thread;
274     }
275 
276     @Override
277     public void execute(Runnable task) {
278         start(task);
279     }
280 
281     @Override
282     public <T> Future<T> submit(Callable<T> task) {
283         Objects.requireNonNull(task);
284         ensureNotShutdown();
285         var future = new ThreadBoundFuture<>(this, task);
286         Thread thread = future.thread();
287         start(thread);
288         return future;
289     }
290 
291     @Override
292     public Future<?> submit(Runnable task) {
293         return submit(Executors.callable(task));
294     }
295 
296     @Override
297     public <T> Future<T> submit(Runnable task, T result) {
298         return submit(Executors.callable(task, result));
299     }
300 
301     /**
302      * Runs a task and notifies the executor when it completes.
303      */
304     private static class TaskRunner implements Runnable {
305         final ThreadPerTaskExecutor executor;
306         final Runnable task;
307         TaskRunner(ThreadPerTaskExecutor executor, Runnable task) {
308             this.executor = executor;
309             this.task = task;
310         }
311         @Override
312         public void run() {
313             try {
314                 task.run();
315             } finally {
316                 executor.taskComplete(Thread.currentThread());
317             }
318         }
319     }
320 
321     /**
322      * A Future for a task that runs in its own thread. The thread is
323      * created (but not started) when the Future is created. The thread
324      * is interrupted when the future is cancelled. The executor is
325      * notified when the task completes.
326      */
327     private static class ThreadBoundFuture<T>
328             extends FutureTask<T> implements Runnable {
329 
330         final ThreadPerTaskExecutor executor;
331         final Thread thread;
332 
333         ThreadBoundFuture(ThreadPerTaskExecutor executor, Callable<T> task) {
334             super(task);
335             this.executor = executor;
336             this.thread = executor.newThread(this);
337         }
338 
339         Thread thread() {
340             return thread;
341         }
342 
343         @Override
344         protected void done() {
345             executor.taskComplete(thread);
346         }
347     }
348 
349     @Override
350     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
351             throws InterruptedException {
352 
353         Objects.requireNonNull(tasks);
354         List<Future<T>> futures = new ArrayList<>();
355         int j = 0;
356         try {
357             for (Callable<T> t : tasks) {
358                 Future<T> f = submit(t);
359                 futures.add(f);
360             }
361             for (int size = futures.size(); j < size; j++) {
362                 Future<T> f = futures.get(j);
363                 if (!f.isDone()) {
364                     try {
365                         f.get();
366                     } catch (ExecutionException | CancellationException ignore) { }
367                 }
368             }
369             return futures;
370         } finally {
371             cancelAll(futures, j);
372         }
373     }
374 
375     @Override
376     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
377                                          long timeout, TimeUnit unit)
378             throws InterruptedException {
379 
380         Objects.requireNonNull(tasks);
381         long deadline = System.nanoTime() + unit.toNanos(timeout);
382         List<Future<T>> futures = new ArrayList<>();
383         int j = 0;
384         try {
385             for (Callable<T> t : tasks) {
386                 Future<T> f = submit(t);
387                 futures.add(f);
388             }
389             for (int size = futures.size(); j < size; j++) {
390                 Future<T> f = futures.get(j);
391                 if (!f.isDone()) {
392                     try {
393                         f.get(deadline - System.nanoTime(), NANOSECONDS);
394                     } catch (TimeoutException e) {
395                         break;
396                     } catch (ExecutionException | CancellationException ignore) { }
397                 }
398             }
399             return futures;
400         } finally {
401             cancelAll(futures, j);
402         }
403     }
404 
405     private <T> void cancelAll(List<Future<T>> futures, int j) {
406         for (int size = futures.size(); j < size; j++)
407             futures.get(j).cancel(true);
408     }
409 
410     @Override
411     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
412             throws InterruptedException, ExecutionException {
413         try {
414             return invokeAny(tasks, false, 0, null);
415         } catch (TimeoutException e) {
416             // should not happen
417             throw new InternalError(e);
418         }
419     }
420 
421     @Override
422     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
423             throws InterruptedException, ExecutionException, TimeoutException {
424         Objects.requireNonNull(unit);
425         return invokeAny(tasks, true, timeout, unit);
426     }
427 
428     private <T> T invokeAny(Collection<? extends Callable<T>> tasks,
429                             boolean timed,
430                             long timeout,
431                             TimeUnit unit)
432             throws InterruptedException, ExecutionException, TimeoutException {
433 
434         int size = tasks.size();
435         if (size == 0) {
436             throw new IllegalArgumentException("'tasks' is empty");
437         }
438 
439         var holder = new AnyResultHolder<T>(Thread.currentThread());
440         var threadList = new ArrayList<Thread>(size);
441         long nanos = (timed) ? unit.toNanos(timeout) : 0;
442         long startNanos = (timed) ? System.nanoTime() : 0;
443 
444         try {
445             int count = 0;
446             Iterator<? extends Callable<T>> iterator = tasks.iterator();
447             while (count < size && iterator.hasNext()) {
448                 Callable<T> task = iterator.next();
449                 Objects.requireNonNull(task);
450                 Thread thread = start(() -> {
451                     try {
452                         T r = task.call();
453                         holder.complete(r);
454                     } catch (Throwable e) {
455                         holder.completeExceptionally(e);
456                     }
457                 });
458                 threadList.add(thread);
459                 count++;
460             }
461             if (count == 0) {
462                 throw new IllegalArgumentException("'tasks' is empty");
463             }
464 
465             if (Thread.interrupted())
466                 throw new InterruptedException();
467             T result = holder.result();
468             while (result == null && holder.exceptionCount() < count) {
469                 if (timed) {
470                     long remainingNanos = nanos - (System.nanoTime() - startNanos);
471                     if (remainingNanos <= 0)
472                         throw new TimeoutException();
473                     LockSupport.parkNanos(remainingNanos);
474                 } else {
475                     LockSupport.park();
476                 }
477                 if (Thread.interrupted())
478                     throw new InterruptedException();
479                 result = holder.result();
480             }
481 
482             if (result != null) {
483                 return (result != AnyResultHolder.NULL) ? result : null;
484             } else {
485                 throw new ExecutionException(holder.firstException());
486             }
487 
488         } finally {
489             // interrupt any threads that are still running
490             for (Thread t : threadList) {
491                 if (t.isAlive()) {
492                     t.interrupt();
493                 }
494             }
495         }
496     }
497 
498     /**
499      * An object for use by invokeAny to hold the result of the first task
500      * to complete normally and/or the first exception thrown. The object
501      * also maintains a count of the number of tasks that attempted to
502      * complete up to when the first tasks completes normally.
503      */
504     private static class AnyResultHolder<T> {
505         private static final VarHandle RESULT;
506         private static final VarHandle EXCEPTION;
507         private static final VarHandle EXCEPTION_COUNT;
508         static {
509             try {
510                 MethodHandles.Lookup l = MethodHandles.lookup();
511                 RESULT = l.findVarHandle(AnyResultHolder.class, "result", Object.class);
512                 EXCEPTION = l.findVarHandle(AnyResultHolder.class, "exception", Throwable.class);
513                 EXCEPTION_COUNT = l.findVarHandle(AnyResultHolder.class, "exceptionCount", int.class);
514             } catch (Exception e) {
515                 throw new InternalError(e);
516             }
517         }
518         private static final Object NULL = new Object();
519 
520         private final Thread owner;
521         private volatile T result;
522         private volatile Throwable exception;
523         private volatile int exceptionCount;
524 
525         AnyResultHolder(Thread owner) {
526             this.owner = owner;
527         }
528 
529         /**
530          * Complete with the given result if not already completed. The winner
531          * unparks the owner thread.
532          */
533         void complete(T value) {
534             @SuppressWarnings("unchecked")
535             T v = (value != null) ? value : (T) NULL;
536             if (result == null && RESULT.compareAndSet(this, null, v)) {
537                 LockSupport.unpark(owner);
538             }
539         }
540 
541         /**
542          * Complete with the given exception. If the result is not already
543          * set then it unparks the owner thread.
544          */
545         void completeExceptionally(Throwable exc) {
546             if (result == null) {
547                 if (exception == null)
548                     EXCEPTION.compareAndSet(this, null, exc);
549                 EXCEPTION_COUNT.getAndAdd(this, 1);
550                 LockSupport.unpark(owner);
551             }
552         }
553 
554         /**
555          * Returns non-null if a task completed successfully. The result is
556          * NULL if completed with null.
557          */
558         T result() {
559             return result;
560         }
561 
562         /**
563          * Returns the first exception thrown if recorded by this object.
564          *
565          * @apiNote The result() method should be used to test if there is
566          * a result before invoking the exception method.
567          */
568         Throwable firstException() {
569             return exception;
570         }
571 
572         /**
573          * Returns the number of tasks that terminated with an exception before
574          * a task completed normally.
575          */
576         int exceptionCount() {
577             return exceptionCount;
578         }
579     }
580 }