1 /*
  2  * Copyright (c) 2019, 2022, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package java.util.concurrent;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.security.Permission;
 30 import java.util.ArrayList;
 31 import java.util.Collection;
 32 import java.util.Iterator;
 33 import java.util.List;
 34 import java.util.Objects;
 35 import java.util.Set;
 36 import java.util.concurrent.locks.LockSupport;
 37 import java.util.stream.Stream;
 38 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 39 import jdk.internal.access.JavaLangAccess;
 40 import jdk.internal.access.SharedSecrets;
 41 import jdk.internal.vm.ThreadContainer;
 42 import jdk.internal.vm.ThreadContainers;
 43 
 44 /**
 45  * An ExecutorService that starts a new thread for each task. The number of
 46  * threads is unbounded.
 47  */
 48 class ThreadPerTaskExecutor extends ThreadContainer implements ExecutorService {
 49     private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
 50     private static final Permission MODIFY_THREAD = new RuntimePermission("modifyThread");
 51     private static final VarHandle STATE;
 52     static {
 53         try {
 54             MethodHandles.Lookup l = MethodHandles.lookup();
 55             STATE = l.findVarHandle(ThreadPerTaskExecutor.class, "state", int.class);
 56         } catch (Exception e) {
 57             throw new InternalError(e);
 58         }
 59     }
 60 
 61     private final ThreadFactory factory;
 62     private final Set<Thread> threads = ConcurrentHashMap.newKeySet();
 63     private final CountDownLatch terminationSignal = new CountDownLatch(1);
 64 
 65     // states: RUNNING -> SHUTDOWN -> TERMINATED
 66     private static final int RUNNING    = 0;
 67     private static final int SHUTDOWN   = 1;
 68     private static final int TERMINATED = 2;
 69     private volatile int state;
 70 
 71     // the key for this container in the registry
 72     private volatile Object key;
 73 
 74     private ThreadPerTaskExecutor(ThreadFactory factory) {
 75         super(/*shared*/ true);
 76         this.factory = Objects.requireNonNull(factory);
 77     }
 78 
 79     /**
 80      * Creates a thread-per-task executor that creates threads using the given factory.
 81      */
 82     static ThreadPerTaskExecutor create(ThreadFactory factory) {
 83         var executor = new ThreadPerTaskExecutor(factory);
 84         // register it to allow discovery by serviceability tools
 85         executor.key = ThreadContainers.registerContainer(executor);
 86         return executor;
 87     }
 88 
 89     /**
 90      * Throws SecurityException if there is a security manager set and it denies
 91      * RuntimePermission("modifyThread").
 92      */
 93     @SuppressWarnings("removal")
 94     private void checkPermission() {
 95         SecurityManager sm = System.getSecurityManager();
 96         if (sm != null) {
 97             sm.checkPermission(MODIFY_THREAD);
 98         }
 99     }
100 
101     /**
102      * Throws RejectedExecutionException if the executor has been shutdown.
103      */
104     private void ensureNotShutdown() {
105         if (state >= SHUTDOWN) {
106             // shutdown or terminated
107             throw new RejectedExecutionException();
108         }
109     }
110 
111     /**
112      * Attempts to terminate if already shutdown. If this method terminates the
113      * executor then it signals any threads that are waiting for termination.
114      */
115     private void tryTerminate() {
116         assert state >= SHUTDOWN;
117         if (threads.isEmpty()
118             && STATE.compareAndSet(this, SHUTDOWN, TERMINATED)) {
119 
120             // signal waiters
121             terminationSignal.countDown();
122 
123             // remove from registry
124             ThreadContainers.deregisterContainer(key);
125         }
126     }
127 
128     /**
129      * Attempts to shutdown and terminate the executor.
130      * If interruptThreads is true then all running threads are interrupted.
131      */
132     private void tryShutdownAndTerminate(boolean interruptThreads) {
133         if (STATE.compareAndSet(this, RUNNING, SHUTDOWN))
134             tryTerminate();
135         if (interruptThreads) {
136             threads.forEach(Thread::interrupt);
137         }
138     }
139 
140     @Override
141     public Stream<Thread> threads() {
142         return threads.stream().filter(Thread::isAlive);
143     }
144 
145     @Override
146     public long threadCount() {
147         return threads.size();
148     }
149 
150     @Override
151     public void shutdown() {
152         checkPermission();
153         if (!isShutdown())
154             tryShutdownAndTerminate(false);
155     }
156 
157     @Override
158     public List<Runnable> shutdownNow() {
159         checkPermission();
160         if (!isTerminated())
161             tryShutdownAndTerminate(true);
162         return List.of();
163     }
164 
165     @Override
166     public boolean isShutdown() {
167         return state >= SHUTDOWN;
168     }
169 
170     @Override
171     public boolean isTerminated() {
172         return state >= TERMINATED;
173     }
174 
175     @Override
176     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
177         Objects.requireNonNull(unit);
178         if (isTerminated()) {
179             return true;
180         } else {
181             return terminationSignal.await(timeout, unit);
182         }
183     }
184 
185     /**
186      * Waits for executor to terminate.
187      */
188     private void awaitTermination() {
189         boolean terminated = isTerminated();
190         if (!terminated) {
191             tryShutdownAndTerminate(false);
192             boolean interrupted = false;
193             while (!terminated) {
194                 try {
195                     terminated = awaitTermination(1L, TimeUnit.DAYS);
196                 } catch (InterruptedException e) {
197                     if (!interrupted) {
198                         tryShutdownAndTerminate(true);
199                         interrupted = true;
200                     }
201                 }
202             }
203             if (interrupted) {
204                 Thread.currentThread().interrupt();
205             }
206         }
207     }
208 
209     @Override
210     public void close() {
211         checkPermission();
212         awaitTermination();
213     }
214 
215     /**
216      * Creates a thread to run the given task.
217      */
218     private Thread newThread(Runnable task) {
219         Thread thread = factory.newThread(task);
220         if (thread == null)
221             throw new RejectedExecutionException();
222         return thread;
223     }
224 
225     /**
226      * Notify the executor that the task executed by the given thread is complete.
227      * If the executor has been shutdown then this method will attempt to terminate
228      * the executor.
229      */
230     private void taskComplete(Thread thread) {
231         boolean removed = threads.remove(thread);
232         assert removed;
233         if (state == SHUTDOWN) {
234             tryTerminate();
235         }
236     }
237 
238     /**
239      * Adds a thread to the set of threads and starts it.
240      * @throws RejectedExecutionException
241      */
242     private void start(Thread thread) {
243         assert thread.getState() == Thread.State.NEW;
244         threads.add(thread);
245 
246         boolean started = false;
247         try {
248             if (state == RUNNING) {
249                 JLA.start(thread, this);
250                 started = true;
251             }
252         } finally {
253             if (!started) {
254                 taskComplete(thread);
255             }
256         }
257 
258         // throw REE if thread not started and no exception thrown
259         if (!started) {
260             throw new RejectedExecutionException();
261         }
262     }
263 
264     /**
265      * Starts a thread to execute the given task.
266      * @throws RejectedExecutionException
267      */
268     private Thread start(Runnable task) {
269         Objects.requireNonNull(task);
270         ensureNotShutdown();
271         Thread thread = newThread(new TaskRunner(this, task));
272         start(thread);
273         return thread;
274     }
275 
276     @Override
277     public void execute(Runnable task) {
278         start(task);
279     }
280 
281     @Override
282     public <T> Future<T> submit(Callable<T> task) {
283         Objects.requireNonNull(task);
284         ensureNotShutdown();
285         var future = new ThreadBoundFuture<>(this, task);
286         Thread thread = future.thread();
287         start(thread);
288         return future;
289     }
290 
291     @Override
292     public Future<?> submit(Runnable task) {
293         return submit(Executors.callable(task));
294     }
295 
296     @Override
297     public <T> Future<T> submit(Runnable task, T result) {
298         return submit(Executors.callable(task, result));
299     }
300 
301     /**
302      * Runs a task and notifies the executor when it completes.
303      */
304     private static class TaskRunner implements Runnable {
305         final ThreadPerTaskExecutor executor;
306         final Runnable task;
307         TaskRunner(ThreadPerTaskExecutor executor, Runnable task) {
308             this.executor = executor;
309             this.task = task;
310         }
311         @Override
312         public void run() {
313             try {
314                 task.run();
315             } finally {
316                 executor.taskComplete(Thread.currentThread());
317             }
318         }
319     }
320 
321     /**
322      * A Future for a task that runs in its own thread. The thread is
323      * created (but not started) when the Future is created. The thread
324      * is interrupted when the future is cancelled. The executor is
325      * notified when the task completes.
326      */
327     private static class ThreadBoundFuture<T>
328             extends CompletableFuture<T> implements Runnable {
329 
330         final ThreadPerTaskExecutor executor;
331         final Callable<T> task;
332         final Thread thread;
333 
334         ThreadBoundFuture(ThreadPerTaskExecutor executor, Callable<T> task) {
335             this.executor = executor;
336             this.task = task;
337             this.thread = executor.newThread(this);
338         }
339 
340         Thread thread() {
341             return thread;
342         }
343 
344         @Override
345         public void run() {
346             if (Thread.currentThread() != thread) {
347                 // should not happen except where something casts this object
348                 // to a Runnable and invokes the run method.
349                 throw new WrongThreadException();
350             }
351             try {
352                 T result = task.call();
353                 complete(result);
354             } catch (Throwable e) {
355                 completeExceptionally(e);
356             } finally {
357                 executor.taskComplete(thread);
358             }
359         }
360 
361         @Override
362         public boolean cancel(boolean mayInterruptIfRunning) {
363             boolean cancelled = super.cancel(mayInterruptIfRunning);
364             if (cancelled && mayInterruptIfRunning)
365                 thread.interrupt();
366             return cancelled;
367         }
368     }
369 
370     @Override
371     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
372             throws InterruptedException {
373 
374         Objects.requireNonNull(tasks);
375         List<Future<T>> futures = new ArrayList<>();
376         int j = 0;
377         try {
378             for (Callable<T> t : tasks) {
379                 Future<T> f = submit(t);
380                 futures.add(f);
381             }
382             for (int size = futures.size(); j < size; j++) {
383                 Future<T> f = futures.get(j);
384                 if (!f.isDone()) {
385                     try {
386                         f.get();
387                     } catch (ExecutionException | CancellationException ignore) { }
388                 }
389             }
390             return futures;
391         } finally {
392             cancelAll(futures, j);
393         }
394     }
395 
396     @Override
397     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
398                                          long timeout, TimeUnit unit)
399             throws InterruptedException {
400 
401         Objects.requireNonNull(tasks);
402         long deadline = System.nanoTime() + unit.toNanos(timeout);
403         List<Future<T>> futures = new ArrayList<>();
404         int j = 0;
405         try {
406             for (Callable<T> t : tasks) {
407                 Future<T> f = submit(t);
408                 futures.add(f);
409             }
410             for (int size = futures.size(); j < size; j++) {
411                 Future<T> f = futures.get(j);
412                 if (!f.isDone()) {
413                     try {
414                         f.get(deadline - System.nanoTime(), NANOSECONDS);
415                     } catch (TimeoutException e) {
416                         break;
417                     } catch (ExecutionException | CancellationException ignore) { }
418                 }
419             }
420             return futures;
421         } finally {
422             cancelAll(futures, j);
423         }
424     }
425 
426     private <T> void cancelAll(List<Future<T>> futures, int j) {
427         for (int size = futures.size(); j < size; j++)
428             futures.get(j).cancel(true);
429     }
430 
431     @Override
432     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
433             throws InterruptedException, ExecutionException {
434         try {
435             return invokeAny(tasks, false, 0, null);
436         } catch (TimeoutException e) {
437             // should not happen
438             throw new InternalError(e);
439         }
440     }
441 
442     @Override
443     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
444             throws InterruptedException, ExecutionException, TimeoutException {
445         Objects.requireNonNull(unit);
446         return invokeAny(tasks, true, timeout, unit);
447     }
448 
449     private <T> T invokeAny(Collection<? extends Callable<T>> tasks,
450                             boolean timed,
451                             long timeout,
452                             TimeUnit unit)
453             throws InterruptedException, ExecutionException, TimeoutException {
454 
455         int size = tasks.size();
456         if (size == 0) {
457             throw new IllegalArgumentException("'tasks' is empty");
458         }
459 
460         var holder = new AnyResultHolder<T>(Thread.currentThread());
461         var threadList = new ArrayList<Thread>(size);
462         long nanos = (timed) ? unit.toNanos(timeout) : 0;
463         long startNanos = (timed) ? System.nanoTime() : 0;
464 
465         try {
466             int count = 0;
467             Iterator<? extends Callable<T>> iterator = tasks.iterator();
468             while (count < size && iterator.hasNext()) {
469                 Callable<T> task = iterator.next();
470                 Objects.requireNonNull(task);
471                 Thread thread = start(() -> {
472                     try {
473                         T r = task.call();
474                         holder.complete(r);
475                     } catch (Throwable e) {
476                         holder.completeExceptionally(e);
477                     }
478                 });
479                 threadList.add(thread);
480                 count++;
481             }
482             if (count == 0) {
483                 throw new IllegalArgumentException("'tasks' is empty");
484             }
485 
486             if (Thread.interrupted())
487                 throw new InterruptedException();
488             T result = holder.result();
489             while (result == null && holder.exceptionCount() < count) {
490                 if (timed) {
491                     long remainingNanos = nanos - (System.nanoTime() - startNanos);
492                     if (remainingNanos <= 0)
493                         throw new TimeoutException();
494                     LockSupport.parkNanos(remainingNanos);
495                 } else {
496                     LockSupport.park();
497                 }
498                 if (Thread.interrupted())
499                     throw new InterruptedException();
500                 result = holder.result();
501             }
502 
503             if (result != null) {
504                 return (result != AnyResultHolder.NULL) ? result : null;
505             } else {
506                 throw new ExecutionException(holder.firstException());
507             }
508 
509         } finally {
510             // interrupt any threads that are still running
511             for (Thread t : threadList) {
512                 if (t.isAlive()) {
513                     t.interrupt();
514                 }
515             }
516         }
517     }
518 
519     /**
520      * An object for use by invokeAny to hold the result of the first task
521      * to complete normally and/or the first exception thrown. The object
522      * also maintains a count of the number of tasks that attempted to
523      * complete up to when the first tasks completes normally.
524      */
525     private static class AnyResultHolder<T> {
526         private static final VarHandle RESULT;
527         private static final VarHandle EXCEPTION;
528         private static final VarHandle EXCEPTION_COUNT;
529         static {
530             try {
531                 MethodHandles.Lookup l = MethodHandles.lookup();
532                 RESULT = l.findVarHandle(AnyResultHolder.class, "result", Object.class);
533                 EXCEPTION = l.findVarHandle(AnyResultHolder.class, "exception", Throwable.class);
534                 EXCEPTION_COUNT = l.findVarHandle(AnyResultHolder.class, "exceptionCount", int.class);
535             } catch (Exception e) {
536                 throw new InternalError(e);
537             }
538         }
539         private static final Object NULL = new Object();
540 
541         private final Thread owner;
542         private volatile T result;
543         private volatile Throwable exception;
544         private volatile int exceptionCount;
545 
546         AnyResultHolder(Thread owner) {
547             this.owner = owner;
548         }
549 
550         /**
551          * Complete with the given result if not already completed. The winner
552          * unparks the owner thread.
553          */
554         void complete(T value) {
555             @SuppressWarnings("unchecked")
556             T v = (value != null) ? value : (T) NULL;
557             if (result == null && RESULT.compareAndSet(this, null, v)) {
558                 LockSupport.unpark(owner);
559             }
560         }
561 
562         /**
563          * Complete with the given exception. If the result is not already
564          * set then it unparks the owner thread.
565          */
566         void completeExceptionally(Throwable exc) {
567             if (result == null) {
568                 if (exception == null)
569                     EXCEPTION.compareAndSet(this, null, exc);
570                 EXCEPTION_COUNT.getAndAdd(this, 1);
571                 LockSupport.unpark(owner);
572             }
573         }
574 
575         /**
576          * Returns non-null if a task completed successfully. The result is
577          * NULL if completed with null.
578          */
579         T result() {
580             return result;
581         }
582 
583         /**
584          * Returns the first exception thrown if recorded by this object.
585          *
586          * @apiNote The result() method should be used to test if there is
587          * a result before invoking the exception method.
588          */
589         Throwable firstException() {
590             return exception;
591         }
592 
593         /**
594          * Returns the number of tasks that terminated with an exception before
595          * a task completed normally.
596          */
597         int exceptionCount() {
598             return exceptionCount;
599         }
600     }
601 }