1 /* 2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package jdk.internal.misc; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.time.Duration; 30 import java.util.Objects; 31 import java.util.Set; 32 import java.util.concurrent.ConcurrentHashMap; 33 import java.util.concurrent.TimeoutException; 34 import java.util.concurrent.StructureViolationException; 35 import java.util.concurrent.locks.LockSupport; 36 import java.util.stream.Stream; 37 import jdk.internal.access.JavaLangAccess; 38 import jdk.internal.access.SharedSecrets; 39 import jdk.internal.invoke.MhUtil; 40 import jdk.internal.vm.ScopedValueContainer; 41 import jdk.internal.vm.ThreadContainer; 42 import jdk.internal.vm.ThreadContainers; 43 import static java.util.concurrent.TimeUnit.NANOSECONDS; 44 45 /** 46 * A grouping of threads that typically run closely related tasks. Threads started 47 * in a flock remain in the flock until they terminate. 48 * 49 * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock, 50 * the {@link #start(Thread) start} method to start a thread in the flock, and the 51 * {@link #close() close} method to close the flock. The {@code close} waits for all 52 * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used 53 * to wait for all threads to finish without closing the flock. The {@link #wakeup()} 54 * method will cause {@code awaitAll} method to complete early, which can be used to 55 * support cancellation in higher-level APIs. ThreadFlock also defines the {@link 56 * #shutdown() shutdown} method to prevent new threads from starting while allowing 57 * existing threads in the flock to continue. 58 * 59 * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The 60 * thread that opens a new flock is the {@link #owner() owner}. The owner closes the 61 * flock when done, failure to do so may result in a resource leak. The {@code open} 62 * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock 63 * while a <em>nested</em> flock is open. A ThreadFlock can be used with the 64 * try-with-resources construct if required but more likely, the close method of a 65 * higher-level API that implements {@link AutoCloseable} will close the flock. 66 * 67 * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in 68 * flock "A" may itself open a new flock "B", implicitly forming a tree where flock 69 * "A" is the parent of flock "B". When nested, say where thread {@code T} opens 70 * flock "B" and then invokes a method that opens flock "C", then the enclosing 71 * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does 72 * not define APIs that exposes the tree structure. It does define the {@link 73 * #containsThread(Thread) containsThread} method to test if a flock contains a 74 * thread, a test that is equivalent to testing membership of flocks in the tree. 75 * The {@code start} and {@code shutdown} methods are confined to the flock 76 * owner or threads contained in the flock. The confinement check is equivalent to 77 * invoking the {@code containsThread} method to test if the caller is contained 78 * in the flock. 79 * 80 * <p> Unless otherwise specified, passing a {@code null} argument to a method 81 * in this class will cause a {@link NullPointerException} to be thrown. 82 */ 83 public class ThreadFlock implements AutoCloseable { 84 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 85 private static final VarHandle THREAD_COUNT; 86 private static final VarHandle PERMIT; 87 static { 88 MethodHandles.Lookup l = MethodHandles.lookup(); 89 THREAD_COUNT = MhUtil.findVarHandle(l, "threadCount", int.class); 90 PERMIT = MhUtil.findVarHandle(l, "permit", boolean.class); 91 } 92 93 private final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 94 95 // thread count, need to re-examine contention once API is stable 96 private volatile int threadCount; 97 98 private final String name; 99 private final ScopedValueContainer.BindingsSnapshot scopedValueBindings; 100 private final ThreadContainerImpl container; // encapsulate for now 101 102 // state 103 private volatile boolean shutdown; 104 private volatile boolean closed; 105 106 // set by wakeup, cleared by awaitAll 107 private volatile boolean permit; 108 109 ThreadFlock(String name) { 110 this.name = name; 111 this.scopedValueBindings = ScopedValueContainer.captureBindings(); 112 this.container = new ThreadContainerImpl(this); 113 } 114 115 private long threadCount() { 116 return threadCount; 117 } 118 119 private ScopedValueContainer.BindingsSnapshot scopedValueBindings() { 120 return scopedValueBindings; 121 } 122 123 private void incrementThreadCount() { 124 THREAD_COUNT.getAndAdd(this, 1); 125 } 126 127 /** 128 * Decrement the thread count. Unpark the owner if the count goes to zero. 129 */ 130 private void decrementThreadCount() { 131 int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1; 132 133 // signal owner when the count goes to zero 134 if (count == 0) { 135 LockSupport.unpark(owner()); 136 } 137 } 138 139 /** 140 * Invoked on the parent thread when starting {@code thread}. 141 */ 142 private void onStart(Thread thread) { 143 incrementThreadCount(); 144 boolean done = false; 145 try { 146 boolean added = threads.add(thread); 147 assert added; 148 if (shutdown) 149 throw new IllegalStateException("Shutdown"); 150 done = true; 151 } finally { 152 if (!done) { 153 threads.remove(thread); 154 decrementThreadCount(); 155 } 156 } 157 } 158 159 /** 160 * Invoked on the terminating thread or the parent thread when starting 161 * {@code thread} failed. This method is only called if onStart succeeded. 162 */ 163 private void onExit(Thread thread) { 164 boolean removed = threads.remove(thread); 165 assert removed; 166 decrementThreadCount(); 167 } 168 169 /** 170 * Clear wakeup permit. 171 */ 172 private void clearPermit() { 173 if (permit) 174 permit = false; 175 } 176 177 /** 178 * Sets the wakeup permit to the given value, returning the previous value. 179 */ 180 private boolean getAndSetPermit(boolean newValue) { 181 if (permit != newValue) { 182 return (boolean) PERMIT.getAndSet(this, newValue); 183 } else { 184 return newValue; 185 } 186 } 187 188 /** 189 * Throws WrongThreadException if the current thread is not the owner. 190 */ 191 private void ensureOwner() { 192 if (Thread.currentThread() != owner()) 193 throw new WrongThreadException("Current thread not owner"); 194 } 195 196 /** 197 * Throws WrongThreadException if the current thread is not the owner 198 * or a thread contained in the flock. 199 */ 200 private void ensureOwnerOrContainsThread() { 201 Thread currentThread = Thread.currentThread(); 202 if (currentThread != owner() && !containsThread(currentThread)) 203 throw new WrongThreadException("Current thread not owner or thread in flock"); 204 } 205 206 /** 207 * Opens a new thread flock. The flock is owned by the current thread. It can be 208 * named to aid debugging. 209 * 210 * <p> This method captures the current thread's {@linkplain ScopedValue scoped value} 211 * bindings for inheritance by threads created in the flock. 212 * 213 * <p> For the purposes of containment, monitoring, and debugging, the parent 214 * of the new flock is determined as follows: 215 * <ul> 216 * <li> If the current thread is the owner of open flocks then the most recently 217 * created, and open, flock is the parent of the new flock. In other words, the 218 * <em>enclosing flock</em> is the parent. 219 * <li> If the current thread is not the owner of any open flocks then the 220 * parent of the new flock is the current thread's flock. If the current thread 221 * was not started in a flock then the new flock does not have a parent. 222 * </ul> 223 * 224 * @param name the name of the flock, can be null 225 * @return a new thread flock 226 */ 227 public static ThreadFlock open(String name) { 228 var flock = new ThreadFlock(name); 229 flock.container.push(); 230 return flock; 231 } 232 233 /** 234 * {@return the name of this flock or {@code null} if unnamed} 235 */ 236 public String name() { 237 return name; 238 } 239 240 /** 241 * {@return the owner of this flock} 242 */ 243 public Thread owner() { 244 return container.owner(); 245 } 246 247 /** 248 * Starts the given unstarted thread in this flock. 249 * 250 * <p> The thread is started with the scoped value bindings that were captured 251 * when opening the flock. The bindings must match the current thread's bindings. 252 * 253 * <p> This method may only be invoked by the flock owner or threads {@linkplain 254 * #containsThread(Thread) contained} in the flock. 255 * 256 * @param thread the unstarted thread 257 * @return the thread, started 258 * @throws IllegalStateException if this flock is shutdown or closed 259 * @throws IllegalThreadStateException if the given thread was already started 260 * @throws WrongThreadException if the current thread is not the owner or a thread 261 * contained in the flock 262 * @throws StructureViolationException if the current scoped value bindings are 263 * not the same as when the flock was created 264 */ 265 public Thread start(Thread thread) { 266 ensureOwnerOrContainsThread(); 267 JLA.start(thread, container); 268 return thread; 269 } 270 271 /** 272 * Shutdown this flock so that no new threads can be started, existing threads 273 * in the flock will continue to run. This method is a no-op if the flock is 274 * already shutdown or closed. 275 * 276 * <p> This method may only be invoked by the flock owner or threads {@linkplain 277 * #containsThread(Thread) contained} in the flock. 278 * 279 * @throws WrongThreadException if the current thread is not the owner or a thread 280 * contained in the flock 281 */ 282 public void shutdown() { 283 ensureOwnerOrContainsThread(); 284 if (!shutdown) { 285 shutdown = true; 286 } 287 } 288 289 /** 290 * Wait for all threads in the flock to finish executing their tasks. This method 291 * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked, 292 * or the current thread is interrupted. 293 * 294 * <p> This method may only be invoked by the flock owner. The method trivially 295 * returns true when the flock is closed. 296 * 297 * <p> This method clears the effect of any previous invocations of the 298 * {@code wakeup} method. 299 * 300 * @return true if there are no threads in the flock, false if wakeup was invoked 301 * and there are unfinished threads 302 * @throws InterruptedException if interrupted while waiting 303 * @throws WrongThreadException if invoked by a thread that is not the owner 304 */ 305 public boolean awaitAll() throws InterruptedException { 306 ensureOwner(); 307 308 if (getAndSetPermit(false)) 309 return (threadCount == 0); 310 311 while (threadCount > 0 && !permit) { 312 LockSupport.park(); 313 if (Thread.interrupted()) 314 throw new InterruptedException(); 315 } 316 clearPermit(); 317 return (threadCount == 0); 318 } 319 320 /** 321 * Wait, up to the given waiting timeout, for all threads in the flock to finish 322 * executing their tasks. This method waits until all threads finish, the {@link 323 * #wakeup() wakeup} method is invoked, the current thread is interrupted, or 324 * the timeout expires. 325 * 326 * <p> This method may only be invoked by the flock owner. The method trivially 327 * returns true when the flock is closed. 328 * 329 * <p> This method clears the effect of any previous invocations of the {@code wakeup} 330 * method. 331 * 332 * @param timeout the maximum duration to wait 333 * @return true if there are no threads in the flock, false if wakeup was invoked 334 * and there are unfinished threads 335 * @throws InterruptedException if interrupted while waiting 336 * @throws TimeoutException if the wait timed out 337 * @throws WrongThreadException if invoked by a thread that is not the owner 338 */ 339 public boolean awaitAll(Duration timeout) 340 throws InterruptedException, TimeoutException { 341 Objects.requireNonNull(timeout); 342 ensureOwner(); 343 344 if (getAndSetPermit(false)) 345 return (threadCount == 0); 346 347 long startNanos = System.nanoTime(); 348 long nanos = NANOSECONDS.convert(timeout); 349 long remainingNanos = nanos; 350 while (threadCount > 0 && remainingNanos > 0 && !permit) { 351 LockSupport.parkNanos(remainingNanos); 352 if (Thread.interrupted()) 353 throw new InterruptedException(); 354 remainingNanos = nanos - (System.nanoTime() - startNanos); 355 } 356 357 boolean done = (threadCount == 0); 358 if (!done && remainingNanos <= 0 && !permit) { 359 throw new TimeoutException(); 360 } else { 361 clearPermit(); 362 return done; 363 } 364 } 365 366 /** 367 * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the 368 * {@linkplain #owner() owner} to return immediately. 369 * 370 * <p> If the owner is blocked in {@code awaitAll} then it will return immediately. 371 * If the owner is not blocked in {@code awaitAll} then its next call to wait 372 * will return immediately. The method does nothing when the flock is closed. 373 * 374 * @throws WrongThreadException if the current thread is not the owner or a thread 375 * contained in the flock 376 */ 377 public void wakeup() { 378 ensureOwnerOrContainsThread(); 379 if (!getAndSetPermit(true) && Thread.currentThread() != owner()) { 380 LockSupport.unpark(owner()); 381 } 382 } 383 384 /** 385 * Closes this flock. This method first shuts down the flock to prevent 386 * new threads from starting. It then waits for the threads in the flock 387 * to finish executing their tasks. In other words, this method blocks until 388 * all threads in the flock finish. 389 * 390 * <p> This method may only be invoked by the flock owner. 391 * 392 * <p> If interrupted then this method continues to wait until all threads 393 * finish, before completing with the interrupt status set. 394 * 395 * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If 396 * this method is called to close a flock before nested flocks are closed then it 397 * closes the nested flocks (in the reverse order that they were created in), 398 * closes this flock, and then throws {@code StructureViolationException}. 399 * Similarly, if this method is called to close a thread flock while executing with 400 * scoped value bindings, and the thread flock was created before the scoped values 401 * were bound, then {@code StructureViolationException} is thrown after closing the 402 * thread flock. 403 * 404 * @throws WrongThreadException if invoked by a thread that is not the owner 405 * @throws StructureViolationException if a structure violation was detected 406 */ 407 public void close() { 408 ensureOwner(); 409 if (closed) 410 return; 411 412 // shutdown, if not already shutdown 413 if (!shutdown) 414 shutdown = true; 415 416 // wait for threads to finish 417 boolean interrupted = false; 418 try { 419 while (threadCount > 0) { 420 LockSupport.park(); 421 if (Thread.interrupted()) { 422 interrupted = true; 423 } 424 } 425 426 } finally { 427 try { 428 container.close(); // may throw 429 } finally { 430 closed = true; 431 if (interrupted) Thread.currentThread().interrupt(); 432 } 433 } 434 } 435 436 /** 437 * {@return true if the flock has been {@linkplain #shutdown() shut down}} 438 */ 439 public boolean isShutdown() { 440 return shutdown; 441 } 442 443 /** 444 * {@return true if the flock has been {@linkplain #close() closed}} 445 */ 446 public boolean isClosed() { 447 return closed; 448 } 449 450 /** 451 * {@return a stream of the threads in this flock} 452 * The elements of the stream are threads that were started in this flock 453 * but have not terminated. The stream will reflect the set of threads in the 454 * flock at some point at or since the creation of the stream. It may or may 455 * not reflect changes to the set of threads subsequent to creation of the 456 * stream. 457 */ 458 public Stream<Thread> threads() { 459 return threads.stream(); 460 } 461 462 /** 463 * Tests if this flock contains the given thread. This method returns {@code true} 464 * if the thread was started in this flock and has not finished. If the thread 465 * is not in this flock then it tests if the thread is in flocks owned by threads 466 * in this flock, essentially equivalent to invoking {@code containsThread} method 467 * on all flocks owned by the threads in this flock. 468 * 469 * @param thread the thread 470 * @return true if this flock contains the thread 471 */ 472 public boolean containsThread(Thread thread) { 473 var c = JLA.threadContainer(thread); 474 if (c == this.container) 475 return true; 476 if (c != null && c != ThreadContainers.root()) { 477 var parent = c.parent(); 478 while (parent != null) { 479 if (parent == this.container) 480 return true; 481 parent = parent.parent(); 482 } 483 } 484 return false; 485 } 486 487 @Override 488 public String toString() { 489 String id = Objects.toIdentityString(this); 490 if (name != null) { 491 return name + "/" + id; 492 } else { 493 return id; 494 } 495 } 496 497 /** 498 * A ThreadContainer backed by a ThreadFlock. 499 */ 500 private static class ThreadContainerImpl extends ThreadContainer { 501 private final ThreadFlock flock; 502 private volatile Object key; 503 private boolean closing; 504 505 ThreadContainerImpl(ThreadFlock flock) { 506 super(/*shared*/ false); 507 this.flock = flock; 508 } 509 510 @Override 511 public ThreadContainerImpl push() { 512 // Virtual threads in the root containers may not be tracked so need 513 // to register container to ensure that it is found 514 if (!ThreadContainers.trackAllThreads()) { 515 Thread thread = Thread.currentThread(); 516 if (thread.isVirtual() 517 && JLA.threadContainer(thread) == ThreadContainers.root()) { 518 this.key = ThreadContainers.registerContainer(this); 519 } 520 } 521 super.push(); 522 return this; 523 } 524 525 /** 526 * Invoked by ThreadFlock.close when closing the flock. This method pops the 527 * container from the current thread's scope stack. 528 */ 529 void close() { 530 assert Thread.currentThread() == owner(); 531 if (!closing) { 532 closing = true; 533 boolean atTop = popForcefully(); // may block 534 Object key = this.key; 535 if (key != null) 536 ThreadContainers.deregisterContainer(key); 537 if (!atTop) 538 throw new StructureViolationException(); 539 } 540 } 541 542 /** 543 * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to 544 * close the flock. This method does not pop the container from the current 545 * thread's scope stack. 546 */ 547 @Override 548 protected boolean tryClose() { 549 assert Thread.currentThread() == owner(); 550 if (!closing) { 551 closing = true; 552 flock.close(); 553 Object key = this.key; 554 if (key != null) 555 ThreadContainers.deregisterContainer(key); 556 return true; 557 } else { 558 assert false : "Should not get there"; 559 return false; 560 } 561 } 562 563 @Override 564 public String name() { 565 return flock.name(); 566 } 567 @Override 568 public long threadCount() { 569 return flock.threadCount(); 570 } 571 @Override 572 public Stream<Thread> threads() { 573 return flock.threads().filter(Thread::isAlive); 574 } 575 @Override 576 public void onStart(Thread thread) { 577 flock.onStart(thread); 578 } 579 @Override 580 public void onExit(Thread thread) { 581 flock.onExit(thread); 582 } 583 @Override 584 public ScopedValueContainer.BindingsSnapshot scopedValueBindings() { 585 return flock.scopedValueBindings(); 586 } 587 } 588 }