1 /* 2 * Copyright (c) 2019, 2022, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.security.Permission; 30 import java.util.ArrayList; 31 import java.util.Collection; 32 import java.util.Iterator; 33 import java.util.List; 34 import java.util.Objects; 35 import java.util.Set; 36 import java.util.concurrent.locks.LockSupport; 37 import java.util.stream.Stream; 38 import static java.util.concurrent.TimeUnit.NANOSECONDS; 39 import jdk.internal.access.JavaLangAccess; 40 import jdk.internal.access.SharedSecrets; 41 import jdk.internal.vm.ThreadContainer; 42 import jdk.internal.vm.ThreadContainers; 43 44 /** 45 * An ExecutorService that starts a new thread for each task. The number of 46 * threads is unbounded. 47 */ 48 class ThreadPerTaskExecutor extends ThreadContainer implements ExecutorService { 49 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 50 private static final Permission MODIFY_THREAD = new RuntimePermission("modifyThread"); 51 private static final VarHandle STATE; 52 static { 53 try { 54 MethodHandles.Lookup l = MethodHandles.lookup(); 55 STATE = l.findVarHandle(ThreadPerTaskExecutor.class, "state", int.class); 56 } catch (Exception e) { 57 throw new InternalError(e); 58 } 59 } 60 61 private final ThreadFactory factory; 62 private final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 63 private final CountDownLatch terminationSignal = new CountDownLatch(1); 64 65 // states: RUNNING -> SHUTDOWN -> TERMINATED 66 private static final int RUNNING = 0; 67 private static final int SHUTDOWN = 1; 68 private static final int TERMINATED = 2; 69 private volatile int state; 70 71 // the key for this container in the registry 72 private volatile Object key; 73 74 private ThreadPerTaskExecutor(ThreadFactory factory) { 75 super(/*shared*/ true); 76 this.factory = Objects.requireNonNull(factory); 77 } 78 79 /** 80 * Creates a thread-per-task executor that creates threads using the given factory. 81 */ 82 static ThreadPerTaskExecutor create(ThreadFactory factory) { 83 var executor = new ThreadPerTaskExecutor(factory); 84 // register it to allow discovery by serviceability tools 85 executor.key = ThreadContainers.registerContainer(executor); 86 return executor; 87 } 88 89 /** 90 * Throws SecurityException if there is a security manager set and it denies 91 * RuntimePermission("modifyThread"). 92 */ 93 @SuppressWarnings("removal") 94 private void checkPermission() { 95 SecurityManager sm = System.getSecurityManager(); 96 if (sm != null) { 97 sm.checkPermission(MODIFY_THREAD); 98 } 99 } 100 101 /** 102 * Throws RejectedExecutionException if the executor has been shutdown. 103 */ 104 private void ensureNotShutdown() { 105 if (state >= SHUTDOWN) { 106 // shutdown or terminated 107 throw new RejectedExecutionException(); 108 } 109 } 110 111 /** 112 * Attempts to terminate if already shutdown. If this method terminates the 113 * executor then it signals any threads that are waiting for termination. 114 */ 115 private void tryTerminate() { 116 assert state >= SHUTDOWN; 117 if (threads.isEmpty() 118 && STATE.compareAndSet(this, SHUTDOWN, TERMINATED)) { 119 120 // signal waiters 121 terminationSignal.countDown(); 122 123 // remove from registry 124 ThreadContainers.deregisterContainer(key); 125 } 126 } 127 128 /** 129 * Attempts to shutdown and terminate the executor. 130 * If interruptThreads is true then all running threads are interrupted. 131 */ 132 private void tryShutdownAndTerminate(boolean interruptThreads) { 133 if (STATE.compareAndSet(this, RUNNING, SHUTDOWN)) 134 tryTerminate(); 135 if (interruptThreads) { 136 threads.forEach(Thread::interrupt); 137 } 138 } 139 140 @Override 141 public Stream<Thread> threads() { 142 return threads.stream().filter(Thread::isAlive); 143 } 144 145 @Override 146 public long threadCount() { 147 return threads.size(); 148 } 149 150 @Override 151 public void shutdown() { 152 checkPermission(); 153 if (!isShutdown()) 154 tryShutdownAndTerminate(false); 155 } 156 157 @Override 158 public List<Runnable> shutdownNow() { 159 checkPermission(); 160 if (!isTerminated()) 161 tryShutdownAndTerminate(true); 162 return List.of(); 163 } 164 165 @Override 166 public boolean isShutdown() { 167 return state >= SHUTDOWN; 168 } 169 170 @Override 171 public boolean isTerminated() { 172 return state >= TERMINATED; 173 } 174 175 @Override 176 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 177 Objects.requireNonNull(unit); 178 if (isTerminated()) { 179 return true; 180 } else { 181 return terminationSignal.await(timeout, unit); 182 } 183 } 184 185 /** 186 * Waits for executor to terminate. 187 */ 188 private void awaitTermination() { 189 boolean terminated = isTerminated(); 190 if (!terminated) { 191 tryShutdownAndTerminate(false); 192 boolean interrupted = false; 193 while (!terminated) { 194 try { 195 terminated = awaitTermination(1L, TimeUnit.DAYS); 196 } catch (InterruptedException e) { 197 if (!interrupted) { 198 tryShutdownAndTerminate(true); 199 interrupted = true; 200 } 201 } 202 } 203 if (interrupted) { 204 Thread.currentThread().interrupt(); 205 } 206 } 207 } 208 209 @Override 210 public void close() { 211 checkPermission(); 212 awaitTermination(); 213 } 214 215 /** 216 * Creates a thread to run the given task. 217 */ 218 private Thread newThread(Runnable task) { 219 Thread thread = factory.newThread(task); 220 if (thread == null) 221 throw new RejectedExecutionException(); 222 return thread; 223 } 224 225 /** 226 * Notify the executor that the task executed by the given thread is complete. 227 * If the executor has been shutdown then this method will attempt to terminate 228 * the executor. 229 */ 230 private void taskComplete(Thread thread) { 231 boolean removed = threads.remove(thread); 232 assert removed; 233 if (state == SHUTDOWN) { 234 tryTerminate(); 235 } 236 } 237 238 /** 239 * Adds a thread to the set of threads and starts it. 240 * @throws RejectedExecutionException 241 */ 242 private void start(Thread thread) { 243 assert thread.getState() == Thread.State.NEW; 244 threads.add(thread); 245 246 boolean started = false; 247 try { 248 if (state == RUNNING) { 249 JLA.start(thread, this); 250 started = true; 251 } 252 } finally { 253 if (!started) { 254 taskComplete(thread); 255 } 256 } 257 258 // throw REE if thread not started and no exception thrown 259 if (!started) { 260 throw new RejectedExecutionException(); 261 } 262 } 263 264 /** 265 * Starts a thread to execute the given task. 266 * @throws RejectedExecutionException 267 */ 268 private Thread start(Runnable task) { 269 Objects.requireNonNull(task); 270 ensureNotShutdown(); 271 Thread thread = newThread(new TaskRunner(this, task)); 272 start(thread); 273 return thread; 274 } 275 276 @Override 277 public void execute(Runnable task) { 278 start(task); 279 } 280 281 @Override 282 public <T> Future<T> submit(Callable<T> task) { 283 Objects.requireNonNull(task); 284 ensureNotShutdown(); 285 var future = new ThreadBoundFuture<>(this, task); 286 Thread thread = future.thread(); 287 start(thread); 288 return future; 289 } 290 291 @Override 292 public Future<?> submit(Runnable task) { 293 return submit(Executors.callable(task)); 294 } 295 296 @Override 297 public <T> Future<T> submit(Runnable task, T result) { 298 return submit(Executors.callable(task, result)); 299 } 300 301 /** 302 * Runs a task and notifies the executor when it completes. 303 */ 304 private static class TaskRunner implements Runnable { 305 final ThreadPerTaskExecutor executor; 306 final Runnable task; 307 TaskRunner(ThreadPerTaskExecutor executor, Runnable task) { 308 this.executor = executor; 309 this.task = task; 310 } 311 @Override 312 public void run() { 313 try { 314 task.run(); 315 } finally { 316 executor.taskComplete(Thread.currentThread()); 317 } 318 } 319 } 320 321 /** 322 * A Future for a task that runs in its own thread. The thread is 323 * created (but not started) when the Future is created. The thread 324 * is interrupted when the future is cancelled. The executor is 325 * notified when the task completes. 326 */ 327 private static class ThreadBoundFuture<T> 328 extends CompletableFuture<T> implements Runnable { 329 330 final ThreadPerTaskExecutor executor; 331 final Callable<T> task; 332 final Thread thread; 333 334 ThreadBoundFuture(ThreadPerTaskExecutor executor, Callable<T> task) { 335 this.executor = executor; 336 this.task = task; 337 this.thread = executor.newThread(this); 338 } 339 340 Thread thread() { 341 return thread; 342 } 343 344 @Override 345 public void run() { 346 if (Thread.currentThread() != thread) { 347 // should not happen except where something casts this object 348 // to a Runnable and invokes the run method. 349 throw new WrongThreadException(); 350 } 351 try { 352 T result = task.call(); 353 complete(result); 354 } catch (Throwable e) { 355 completeExceptionally(e); 356 } finally { 357 executor.taskComplete(thread); 358 } 359 } 360 361 @Override 362 public boolean cancel(boolean mayInterruptIfRunning) { 363 boolean cancelled = super.cancel(mayInterruptIfRunning); 364 if (cancelled && mayInterruptIfRunning) 365 thread.interrupt(); 366 return cancelled; 367 } 368 } 369 370 @Override 371 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 372 throws InterruptedException { 373 374 Objects.requireNonNull(tasks); 375 List<Future<T>> futures = new ArrayList<>(); 376 int j = 0; 377 try { 378 for (Callable<T> t : tasks) { 379 Future<T> f = submit(t); 380 futures.add(f); 381 } 382 for (int size = futures.size(); j < size; j++) { 383 Future<T> f = futures.get(j); 384 if (!f.isDone()) { 385 try { 386 f.get(); 387 } catch (ExecutionException | CancellationException ignore) { } 388 } 389 } 390 return futures; 391 } finally { 392 cancelAll(futures, j); 393 } 394 } 395 396 @Override 397 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 398 long timeout, TimeUnit unit) 399 throws InterruptedException { 400 401 Objects.requireNonNull(tasks); 402 long deadline = System.nanoTime() + unit.toNanos(timeout); 403 List<Future<T>> futures = new ArrayList<>(); 404 int j = 0; 405 try { 406 for (Callable<T> t : tasks) { 407 Future<T> f = submit(t); 408 futures.add(f); 409 } 410 for (int size = futures.size(); j < size; j++) { 411 Future<T> f = futures.get(j); 412 if (!f.isDone()) { 413 try { 414 f.get(deadline - System.nanoTime(), NANOSECONDS); 415 } catch (TimeoutException e) { 416 break; 417 } catch (ExecutionException | CancellationException ignore) { } 418 } 419 } 420 return futures; 421 } finally { 422 cancelAll(futures, j); 423 } 424 } 425 426 private <T> void cancelAll(List<Future<T>> futures, int j) { 427 for (int size = futures.size(); j < size; j++) 428 futures.get(j).cancel(true); 429 } 430 431 @Override 432 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 433 throws InterruptedException, ExecutionException { 434 try { 435 return invokeAny(tasks, false, 0, null); 436 } catch (TimeoutException e) { 437 // should not happen 438 throw new InternalError(e); 439 } 440 } 441 442 @Override 443 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 444 throws InterruptedException, ExecutionException, TimeoutException { 445 Objects.requireNonNull(unit); 446 return invokeAny(tasks, true, timeout, unit); 447 } 448 449 private <T> T invokeAny(Collection<? extends Callable<T>> tasks, 450 boolean timed, 451 long timeout, 452 TimeUnit unit) 453 throws InterruptedException, ExecutionException, TimeoutException { 454 455 int size = tasks.size(); 456 if (size == 0) { 457 throw new IllegalArgumentException("'tasks' is empty"); 458 } 459 460 var holder = new AnyResultHolder<T>(Thread.currentThread()); 461 var threadList = new ArrayList<Thread>(size); 462 long nanos = (timed) ? unit.toNanos(timeout) : 0; 463 long startNanos = (timed) ? System.nanoTime() : 0; 464 465 try { 466 int count = 0; 467 Iterator<? extends Callable<T>> iterator = tasks.iterator(); 468 while (count < size && iterator.hasNext()) { 469 Callable<T> task = iterator.next(); 470 Objects.requireNonNull(task); 471 Thread thread = start(() -> { 472 try { 473 T r = task.call(); 474 holder.complete(r); 475 } catch (Throwable e) { 476 holder.completeExceptionally(e); 477 } 478 }); 479 threadList.add(thread); 480 count++; 481 } 482 if (count == 0) { 483 throw new IllegalArgumentException("'tasks' is empty"); 484 } 485 486 if (Thread.interrupted()) 487 throw new InterruptedException(); 488 T result = holder.result(); 489 while (result == null && holder.exceptionCount() < count) { 490 if (timed) { 491 long remainingNanos = nanos - (System.nanoTime() - startNanos); 492 if (remainingNanos <= 0) 493 throw new TimeoutException(); 494 LockSupport.parkNanos(remainingNanos); 495 } else { 496 LockSupport.park(); 497 } 498 if (Thread.interrupted()) 499 throw new InterruptedException(); 500 result = holder.result(); 501 } 502 503 if (result != null) { 504 return (result != AnyResultHolder.NULL) ? result : null; 505 } else { 506 throw new ExecutionException(holder.firstException()); 507 } 508 509 } finally { 510 // interrupt any threads that are still running 511 for (Thread t : threadList) { 512 if (t.isAlive()) { 513 t.interrupt(); 514 } 515 } 516 } 517 } 518 519 /** 520 * An object for use by invokeAny to hold the result of the first task 521 * to complete normally and/or the first exception thrown. The object 522 * also maintains a count of the number of tasks that attempted to 523 * complete up to when the first tasks completes normally. 524 */ 525 private static class AnyResultHolder<T> { 526 private static final VarHandle RESULT; 527 private static final VarHandle EXCEPTION; 528 private static final VarHandle EXCEPTION_COUNT; 529 static { 530 try { 531 MethodHandles.Lookup l = MethodHandles.lookup(); 532 RESULT = l.findVarHandle(AnyResultHolder.class, "result", Object.class); 533 EXCEPTION = l.findVarHandle(AnyResultHolder.class, "exception", Throwable.class); 534 EXCEPTION_COUNT = l.findVarHandle(AnyResultHolder.class, "exceptionCount", int.class); 535 } catch (Exception e) { 536 throw new InternalError(e); 537 } 538 } 539 private static final Object NULL = new Object(); 540 541 private final Thread owner; 542 private volatile T result; 543 private volatile Throwable exception; 544 private volatile int exceptionCount; 545 546 AnyResultHolder(Thread owner) { 547 this.owner = owner; 548 } 549 550 /** 551 * Complete with the given result if not already completed. The winner 552 * unparks the owner thread. 553 */ 554 void complete(T value) { 555 @SuppressWarnings("unchecked") 556 T v = (value != null) ? value : (T) NULL; 557 if (result == null && RESULT.compareAndSet(this, null, v)) { 558 LockSupport.unpark(owner); 559 } 560 } 561 562 /** 563 * Complete with the given exception. If the result is not already 564 * set then it unparks the owner thread. 565 */ 566 void completeExceptionally(Throwable exc) { 567 if (result == null) { 568 if (exception == null) 569 EXCEPTION.compareAndSet(this, null, exc); 570 EXCEPTION_COUNT.getAndAdd(this, 1); 571 LockSupport.unpark(owner); 572 } 573 } 574 575 /** 576 * Returns non-null if a task completed successfully. The result is 577 * NULL if completed with null. 578 */ 579 T result() { 580 return result; 581 } 582 583 /** 584 * Returns the first exception thrown if recorded by this object. 585 * 586 * @apiNote The result() method should be used to test if there is 587 * a result before invoking the exception method. 588 */ 589 Throwable firstException() { 590 return exception; 591 } 592 593 /** 594 * Returns the number of tasks that terminated with an exception before 595 * a task completed normally. 596 */ 597 int exceptionCount() { 598 return exceptionCount; 599 } 600 } 601 }