1 /* 2 * Copyright (c) 2019, 2022, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.concurrent; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.security.Permission; 30 import java.util.ArrayList; 31 import java.util.Collection; 32 import java.util.Iterator; 33 import java.util.List; 34 import java.util.Objects; 35 import java.util.Set; 36 import java.util.concurrent.locks.LockSupport; 37 import java.util.stream.Stream; 38 import static java.util.concurrent.TimeUnit.NANOSECONDS; 39 import jdk.internal.access.JavaLangAccess; 40 import jdk.internal.access.SharedSecrets; 41 import jdk.internal.vm.ThreadContainer; 42 import jdk.internal.vm.ThreadContainers; 43 44 /** 45 * An ExecutorService that starts a new thread for each task. The number of 46 * threads is unbounded. 47 */ 48 class ThreadPerTaskExecutor extends ThreadContainer implements ExecutorService { 49 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 50 private static final Permission MODIFY_THREAD = new RuntimePermission("modifyThread"); 51 private static final VarHandle STATE; 52 static { 53 try { 54 MethodHandles.Lookup l = MethodHandles.lookup(); 55 STATE = l.findVarHandle(ThreadPerTaskExecutor.class, "state", int.class); 56 } catch (Exception e) { 57 throw new InternalError(e); 58 } 59 } 60 61 private final ThreadFactory factory; 62 private final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 63 private final CountDownLatch terminationSignal = new CountDownLatch(1); 64 65 // states: RUNNING -> SHUTDOWN -> TERMINATED 66 private static final int RUNNING = 0; 67 private static final int SHUTDOWN = 1; 68 private static final int TERMINATED = 2; 69 private volatile int state; 70 71 // the key for this container in the registry 72 private volatile Object key; 73 74 private ThreadPerTaskExecutor(ThreadFactory factory) { 75 super(/*shared*/ true); 76 this.factory = Objects.requireNonNull(factory); 77 } 78 79 /** 80 * Creates a thread-per-task executor that creates threads using the given factory. 81 */ 82 static ThreadPerTaskExecutor create(ThreadFactory factory) { 83 var executor = new ThreadPerTaskExecutor(factory); 84 // register it to allow discovery by serviceability tools 85 executor.key = ThreadContainers.registerContainer(executor); 86 return executor; 87 } 88 89 /** 90 * Throws SecurityException if there is a security manager set and it denies 91 * RuntimePermission("modifyThread"). 92 */ 93 @SuppressWarnings("removal") 94 private void checkPermission() { 95 SecurityManager sm = System.getSecurityManager(); 96 if (sm != null) { 97 sm.checkPermission(MODIFY_THREAD); 98 } 99 } 100 101 /** 102 * Throws RejectedExecutionException if the executor has been shutdown. 103 */ 104 private void ensureNotShutdown() { 105 if (state >= SHUTDOWN) { 106 // shutdown or terminated 107 throw new RejectedExecutionException(); 108 } 109 } 110 111 /** 112 * Attempts to terminate if already shutdown. If this method terminates the 113 * executor then it signals any threads that are waiting for termination. 114 */ 115 private void tryTerminate() { 116 assert state >= SHUTDOWN; 117 if (threads.isEmpty() 118 && STATE.compareAndSet(this, SHUTDOWN, TERMINATED)) { 119 120 // signal waiters 121 terminationSignal.countDown(); 122 123 // remove from registry 124 ThreadContainers.deregisterContainer(key); 125 } 126 } 127 128 /** 129 * Attempts to shutdown and terminate the executor. 130 * If interruptThreads is true then all running threads are interrupted. 131 */ 132 private void tryShutdownAndTerminate(boolean interruptThreads) { 133 if (STATE.compareAndSet(this, RUNNING, SHUTDOWN)) 134 tryTerminate(); 135 if (interruptThreads) { 136 threads.forEach(Thread::interrupt); 137 } 138 } 139 140 @Override 141 public Stream<Thread> threads() { 142 return threads.stream().filter(Thread::isAlive); 143 } 144 145 @Override 146 public long threadCount() { 147 return threads.size(); 148 } 149 150 @Override 151 public void shutdown() { 152 checkPermission(); 153 if (!isShutdown()) 154 tryShutdownAndTerminate(false); 155 } 156 157 @Override 158 public List<Runnable> shutdownNow() { 159 checkPermission(); 160 if (!isTerminated()) 161 tryShutdownAndTerminate(true); 162 return List.of(); 163 } 164 165 @Override 166 public boolean isShutdown() { 167 return state >= SHUTDOWN; 168 } 169 170 @Override 171 public boolean isTerminated() { 172 return state >= TERMINATED; 173 } 174 175 @Override 176 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 177 Objects.requireNonNull(unit); 178 if (isTerminated()) { 179 return true; 180 } else { 181 return terminationSignal.await(timeout, unit); 182 } 183 } 184 185 /** 186 * Waits for executor to terminate. 187 */ 188 private void awaitTermination() { 189 boolean terminated = isTerminated(); 190 if (!terminated) { 191 tryShutdownAndTerminate(false); 192 boolean interrupted = false; 193 while (!terminated) { 194 try { 195 terminated = awaitTermination(1L, TimeUnit.DAYS); 196 } catch (InterruptedException e) { 197 if (!interrupted) { 198 tryShutdownAndTerminate(true); 199 interrupted = true; 200 } 201 } 202 } 203 if (interrupted) { 204 Thread.currentThread().interrupt(); 205 } 206 } 207 } 208 209 @Override 210 public void close() { 211 checkPermission(); 212 awaitTermination(); 213 } 214 215 /** 216 * Creates a thread to run the given task. 217 */ 218 private Thread newThread(Runnable task) { 219 Thread thread = factory.newThread(task); 220 if (thread == null) 221 throw new RejectedExecutionException(); 222 return thread; 223 } 224 225 /** 226 * Notify the executor that the task executed by the given thread is complete. 227 * If the executor has been shutdown then this method will attempt to terminate 228 * the executor. 229 */ 230 private void taskComplete(Thread thread) { 231 boolean removed = threads.remove(thread); 232 assert removed; 233 if (state == SHUTDOWN) { 234 tryTerminate(); 235 } 236 } 237 238 /** 239 * Adds a thread to the set of threads and starts it. 240 * @throws RejectedExecutionException 241 */ 242 private void start(Thread thread) { 243 assert thread.getState() == Thread.State.NEW; 244 threads.add(thread); 245 246 boolean started = false; 247 try { 248 if (state == RUNNING) { 249 JLA.start(thread, this); 250 started = true; 251 } 252 } finally { 253 if (!started) { 254 taskComplete(thread); 255 } 256 } 257 258 // throw REE if thread not started and no exception thrown 259 if (!started) { 260 throw new RejectedExecutionException(); 261 } 262 } 263 264 /** 265 * Starts a thread to execute the given task. 266 * @throws RejectedExecutionException 267 */ 268 private Thread start(Runnable task) { 269 Objects.requireNonNull(task); 270 ensureNotShutdown(); 271 Thread thread = newThread(new TaskRunner(this, task)); 272 start(thread); 273 return thread; 274 } 275 276 @Override 277 public void execute(Runnable task) { 278 start(task); 279 } 280 281 @Override 282 public <T> Future<T> submit(Callable<T> task) { 283 Objects.requireNonNull(task); 284 ensureNotShutdown(); 285 var future = new ThreadBoundFuture<>(this, task); 286 Thread thread = future.thread(); 287 start(thread); 288 return future; 289 } 290 291 @Override 292 public Future<?> submit(Runnable task) { 293 return submit(Executors.callable(task)); 294 } 295 296 @Override 297 public <T> Future<T> submit(Runnable task, T result) { 298 return submit(Executors.callable(task, result)); 299 } 300 301 /** 302 * Runs a task and notifies the executor when it completes. 303 */ 304 private static class TaskRunner implements Runnable { 305 final ThreadPerTaskExecutor executor; 306 final Runnable task; 307 TaskRunner(ThreadPerTaskExecutor executor, Runnable task) { 308 this.executor = executor; 309 this.task = task; 310 } 311 @Override 312 public void run() { 313 try { 314 task.run(); 315 } finally { 316 executor.taskComplete(Thread.currentThread()); 317 } 318 } 319 } 320 321 /** 322 * A Future for a task that runs in its own thread. The thread is 323 * created (but not started) when the Future is created. The thread 324 * is interrupted when the future is cancelled. The executor is 325 * notified when the task completes. 326 */ 327 private static class ThreadBoundFuture<T> 328 extends FutureTask<T> implements Runnable { 329 330 final ThreadPerTaskExecutor executor; 331 final Thread thread; 332 333 ThreadBoundFuture(ThreadPerTaskExecutor executor, Callable<T> task) { 334 super(task); 335 this.executor = executor; 336 this.thread = executor.newThread(this); 337 } 338 339 Thread thread() { 340 return thread; 341 } 342 343 @Override 344 protected void done() { 345 executor.taskComplete(thread); 346 } 347 } 348 349 @Override 350 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 351 throws InterruptedException { 352 353 Objects.requireNonNull(tasks); 354 List<Future<T>> futures = new ArrayList<>(); 355 int j = 0; 356 try { 357 for (Callable<T> t : tasks) { 358 Future<T> f = submit(t); 359 futures.add(f); 360 } 361 for (int size = futures.size(); j < size; j++) { 362 Future<T> f = futures.get(j); 363 if (!f.isDone()) { 364 try { 365 f.get(); 366 } catch (ExecutionException | CancellationException ignore) { } 367 } 368 } 369 return futures; 370 } finally { 371 cancelAll(futures, j); 372 } 373 } 374 375 @Override 376 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 377 long timeout, TimeUnit unit) 378 throws InterruptedException { 379 380 Objects.requireNonNull(tasks); 381 long deadline = System.nanoTime() + unit.toNanos(timeout); 382 List<Future<T>> futures = new ArrayList<>(); 383 int j = 0; 384 try { 385 for (Callable<T> t : tasks) { 386 Future<T> f = submit(t); 387 futures.add(f); 388 } 389 for (int size = futures.size(); j < size; j++) { 390 Future<T> f = futures.get(j); 391 if (!f.isDone()) { 392 try { 393 f.get(deadline - System.nanoTime(), NANOSECONDS); 394 } catch (TimeoutException e) { 395 break; 396 } catch (ExecutionException | CancellationException ignore) { } 397 } 398 } 399 return futures; 400 } finally { 401 cancelAll(futures, j); 402 } 403 } 404 405 private <T> void cancelAll(List<Future<T>> futures, int j) { 406 for (int size = futures.size(); j < size; j++) 407 futures.get(j).cancel(true); 408 } 409 410 @Override 411 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 412 throws InterruptedException, ExecutionException { 413 try { 414 return invokeAny(tasks, false, 0, null); 415 } catch (TimeoutException e) { 416 // should not happen 417 throw new InternalError(e); 418 } 419 } 420 421 @Override 422 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 423 throws InterruptedException, ExecutionException, TimeoutException { 424 Objects.requireNonNull(unit); 425 return invokeAny(tasks, true, timeout, unit); 426 } 427 428 private <T> T invokeAny(Collection<? extends Callable<T>> tasks, 429 boolean timed, 430 long timeout, 431 TimeUnit unit) 432 throws InterruptedException, ExecutionException, TimeoutException { 433 434 int size = tasks.size(); 435 if (size == 0) { 436 throw new IllegalArgumentException("'tasks' is empty"); 437 } 438 439 var holder = new AnyResultHolder<T>(Thread.currentThread()); 440 var threadList = new ArrayList<Thread>(size); 441 long nanos = (timed) ? unit.toNanos(timeout) : 0; 442 long startNanos = (timed) ? System.nanoTime() : 0; 443 444 try { 445 int count = 0; 446 Iterator<? extends Callable<T>> iterator = tasks.iterator(); 447 while (count < size && iterator.hasNext()) { 448 Callable<T> task = iterator.next(); 449 Objects.requireNonNull(task); 450 Thread thread = start(() -> { 451 try { 452 T r = task.call(); 453 holder.complete(r); 454 } catch (Throwable e) { 455 holder.completeExceptionally(e); 456 } 457 }); 458 threadList.add(thread); 459 count++; 460 } 461 if (count == 0) { 462 throw new IllegalArgumentException("'tasks' is empty"); 463 } 464 465 if (Thread.interrupted()) 466 throw new InterruptedException(); 467 T result = holder.result(); 468 while (result == null && holder.exceptionCount() < count) { 469 if (timed) { 470 long remainingNanos = nanos - (System.nanoTime() - startNanos); 471 if (remainingNanos <= 0) 472 throw new TimeoutException(); 473 LockSupport.parkNanos(remainingNanos); 474 } else { 475 LockSupport.park(); 476 } 477 if (Thread.interrupted()) 478 throw new InterruptedException(); 479 result = holder.result(); 480 } 481 482 if (result != null) { 483 return (result != AnyResultHolder.NULL) ? result : null; 484 } else { 485 throw new ExecutionException(holder.firstException()); 486 } 487 488 } finally { 489 // interrupt any threads that are still running 490 for (Thread t : threadList) { 491 if (t.isAlive()) { 492 t.interrupt(); 493 } 494 } 495 } 496 } 497 498 /** 499 * An object for use by invokeAny to hold the result of the first task 500 * to complete normally and/or the first exception thrown. The object 501 * also maintains a count of the number of tasks that attempted to 502 * complete up to when the first tasks completes normally. 503 */ 504 private static class AnyResultHolder<T> { 505 private static final VarHandle RESULT; 506 private static final VarHandle EXCEPTION; 507 private static final VarHandle EXCEPTION_COUNT; 508 static { 509 try { 510 MethodHandles.Lookup l = MethodHandles.lookup(); 511 RESULT = l.findVarHandle(AnyResultHolder.class, "result", Object.class); 512 EXCEPTION = l.findVarHandle(AnyResultHolder.class, "exception", Throwable.class); 513 EXCEPTION_COUNT = l.findVarHandle(AnyResultHolder.class, "exceptionCount", int.class); 514 } catch (Exception e) { 515 throw new InternalError(e); 516 } 517 } 518 private static final Object NULL = new Object(); 519 520 private final Thread owner; 521 private volatile T result; 522 private volatile Throwable exception; 523 private volatile int exceptionCount; 524 525 AnyResultHolder(Thread owner) { 526 this.owner = owner; 527 } 528 529 /** 530 * Complete with the given result if not already completed. The winner 531 * unparks the owner thread. 532 */ 533 void complete(T value) { 534 @SuppressWarnings("unchecked") 535 T v = (value != null) ? value : (T) NULL; 536 if (result == null && RESULT.compareAndSet(this, null, v)) { 537 LockSupport.unpark(owner); 538 } 539 } 540 541 /** 542 * Complete with the given exception. If the result is not already 543 * set then it unparks the owner thread. 544 */ 545 void completeExceptionally(Throwable exc) { 546 if (result == null) { 547 if (exception == null) 548 EXCEPTION.compareAndSet(this, null, exc); 549 EXCEPTION_COUNT.getAndAdd(this, 1); 550 LockSupport.unpark(owner); 551 } 552 } 553 554 /** 555 * Returns non-null if a task completed successfully. The result is 556 * NULL if completed with null. 557 */ 558 T result() { 559 return result; 560 } 561 562 /** 563 * Returns the first exception thrown if recorded by this object. 564 * 565 * @apiNote The result() method should be used to test if there is 566 * a result before invoking the exception method. 567 */ 568 Throwable firstException() { 569 return exception; 570 } 571 572 /** 573 * Returns the number of tasks that terminated with an exception before 574 * a task completed normally. 575 */ 576 int exceptionCount() { 577 return exceptionCount; 578 } 579 } 580 }