1 /* 2 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package jdk.internal.misc; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.time.Duration; 30 import java.util.Objects; 31 import java.util.Set; 32 import java.util.concurrent.ConcurrentHashMap; 33 import java.util.concurrent.TimeoutException; 34 import java.util.concurrent.StructureViolationException; 35 import java.util.concurrent.locks.LockSupport; 36 import java.util.stream.Stream; 37 import jdk.internal.access.JavaLangAccess; 38 import jdk.internal.access.SharedSecrets; 39 import jdk.internal.invoke.MhUtil; 40 import jdk.internal.vm.ScopedValueContainer; 41 import jdk.internal.vm.ThreadContainer; 42 import jdk.internal.vm.ThreadContainers; 43 import static java.util.concurrent.TimeUnit.NANOSECONDS; 44 45 /** 46 * A grouping of threads that typically run closely related tasks. Threads started 47 * in a flock remain in the flock until they terminate. 48 * 49 * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock, 50 * the {@link #start(Thread) start} method to start a thread in the flock, and the 51 * {@link #close() close} method to close the flock. The {@code close} waits for all 52 * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used 53 * to wait for all threads to finish without closing the flock. The {@link #wakeup()} 54 * method will cause {@code awaitAll} method to complete early, which can be used to 55 * support cancellation in higher-level APIs. ThreadFlock also defines the {@link 56 * #shutdown() shutdown} method to prevent new threads from starting while allowing 57 * existing threads in the flock to continue. 58 * 59 * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The 60 * thread that opens a new flock is the {@link #owner() owner}. The owner closes the 61 * flock when done, failure to do so may result in a resource leak. The {@code open} 62 * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock 63 * while a <em>nested</em> flock is open. A ThreadFlock can be used with the 64 * try-with-resources construct if required but more likely, the close method of a 65 * higher-level API that implements {@link AutoCloseable} will close the flock. 66 * 67 * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in 68 * flock "A" may itself open a new flock "B", implicitly forming a tree where flock 69 * "A" is the parent of flock "B". When nested, say where thread {@code T} opens 70 * flock "B" and then invokes a method that opens flock "C", then the enclosing 71 * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does 72 * not define APIs that exposes the tree structure. It does define the {@link 73 * #containsThread(Thread) containsThread} method to test if a flock contains a 74 * thread, a test that is equivalent to testing membership of flocks in the tree. 75 * The {@code start} and {@code shutdown} methods are confined to the flock 76 * owner or threads contained in the flock. The confinement check is equivalent to 77 * invoking the {@code containsThread} method to test if the caller is contained 78 * in the flock. 79 * 80 * <p> Unless otherwise specified, passing a {@code null} argument to a method 81 * in this class will cause a {@link NullPointerException} to be thrown. 82 */ 83 public class ThreadFlock implements AutoCloseable { 84 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 85 private static final VarHandle THREAD_COUNT; 86 private static final VarHandle PERMIT; 87 static { 88 MethodHandles.Lookup l = MethodHandles.lookup(); 89 THREAD_COUNT = MhUtil.findVarHandle(l, "threadCount", int.class); 90 PERMIT = MhUtil.findVarHandle(l, "permit", boolean.class); 91 } 92 93 private final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 94 95 // thread count, need to re-examine contention once API is stable 96 private volatile int threadCount; 97 98 private final String name; 99 private final ScopedValueContainer.BindingsSnapshot scopedValueBindings; 100 private final ThreadContainerImpl container; // encapsulate for now 101 102 // state 103 private volatile boolean shutdown; 104 private volatile boolean closed; 105 106 // set by wakeup, cleared by awaitAll 107 private volatile boolean permit; 108 109 ThreadFlock(String name) { 110 this.name = name; 111 this.scopedValueBindings = ScopedValueContainer.captureBindings(); 112 this.container = new ThreadContainerImpl(this); 113 } 114 115 private long threadCount() { 116 return threadCount; 117 } 118 119 private ScopedValueContainer.BindingsSnapshot scopedValueBindings() { 120 return scopedValueBindings; 121 } 122 123 private void incrementThreadCount() { 124 THREAD_COUNT.getAndAdd(this, 1); 125 } 126 127 /** 128 * Decrement the thread count. Unpark the owner if the count goes to zero. 129 */ 130 private void decrementThreadCount() { 131 int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1; 132 133 // signal owner when the count goes to zero 134 if (count == 0) { 135 LockSupport.unpark(owner()); 136 } 137 } 138 139 /** 140 * Invoked on the parent thread when starting {@code thread}. 141 */ 142 private void onStart(Thread thread) { 143 incrementThreadCount(); 144 boolean done = false; 145 try { 146 boolean added = threads.add(thread); 147 assert added; 148 if (shutdown) 149 throw new IllegalStateException("Shutdown"); 150 done = true; 151 } finally { 152 if (!done) { 153 threads.remove(thread); 154 decrementThreadCount(); 155 } 156 } 157 } 158 159 /** 160 * Invoked on the terminating thread or the parent thread when starting 161 * {@code thread} failed. This method is only called if onStart succeeded. 162 */ 163 private void onExit(Thread thread) { 164 boolean removed = threads.remove(thread); 165 assert removed; 166 decrementThreadCount(); 167 } 168 169 /** 170 * Clear wakeup permit. 171 */ 172 private void clearPermit() { 173 if (permit) 174 permit = false; 175 } 176 177 /** 178 * Sets the wakeup permit to the given value, returning the previous value. 179 */ 180 private boolean getAndSetPermit(boolean newValue) { 181 if (permit != newValue) { 182 return (boolean) PERMIT.getAndSet(this, newValue); 183 } else { 184 return newValue; 185 } 186 } 187 188 /** 189 * Throws WrongThreadException if the current thread is not the owner. 190 */ 191 private void ensureOwner() { 192 if (Thread.currentThread() != owner()) 193 throw new WrongThreadException("Current thread not owner"); 194 } 195 196 /** 197 * Throws WrongThreadException if the current thread is not the owner 198 * or a thread contained in the flock. 199 */ 200 private void ensureOwnerOrContainsThread() { 201 Thread currentThread = Thread.currentThread(); 202 if (currentThread != owner() && !containsThread(currentThread)) 203 throw new WrongThreadException("Current thread not owner or thread in flock"); 204 } 205 206 /** 207 * Opens a new thread flock. The flock is owned by the current thread. It can be 208 * named to aid debugging. 209 * 210 * <p> This method captures the current thread's {@linkplain ScopedValue scoped value} 211 * bindings for inheritance by threads created in the flock. 212 * 213 * <p> For the purposes of containment, monitoring, and debugging, the parent 214 * of the new flock is determined as follows: 215 * <ul> 216 * <li> If the current thread is the owner of open flocks then the most recently 217 * created, and open, flock is the parent of the new flock. In other words, the 218 * <em>enclosing flock</em> is the parent. 219 * <li> If the current thread is not the owner of any open flocks then the 220 * parent of the new flock is the current thread's flock. If the current thread 221 * was not started in a flock then the new flock does not have a parent. 222 * </ul> 223 * 224 * @param name the name of the flock, can be null 225 * @return a new thread flock 226 */ 227 public static ThreadFlock open(String name) { 228 var flock = new ThreadFlock(name); 229 flock.container.push(); 230 return flock; 231 } 232 233 /** 234 * {@return the name of this flock or {@code null} if unnamed} 235 */ 236 public String name() { 237 return name; 238 } 239 240 /** 241 * {@return the owner of this flock} 242 */ 243 public Thread owner() { 244 return container.owner(); 245 } 246 247 /** 248 * Starts the given unstarted thread in this flock. 249 * 250 * <p> The thread is started with the scoped value bindings that were captured 251 * when opening the flock. The bindings must match the current thread's bindings. 252 * 253 * <p> This method may only be invoked by the flock owner or threads {@linkplain 254 * #containsThread(Thread) contained} in the flock. 255 * 256 * @param thread the unstarted thread 257 * @return the thread, started 258 * @throws IllegalStateException if this flock is shutdown or closed 259 * @throws IllegalThreadStateException if the given thread was already started 260 * @throws WrongThreadException if the current thread is not the owner or a thread 261 * contained in the flock 262 * @throws StructureViolationException if the current scoped value bindings are 263 * not the same as when the flock was created 264 */ 265 public Thread start(Thread thread) { 266 ensureOwnerOrContainsThread(); 267 JLA.start(thread, container); 268 return thread; 269 } 270 271 /** 272 * Shutdown this flock so that no new threads can be started, existing threads 273 * in the flock will continue to run. This method is a no-op if the flock is 274 * already shutdown or closed. 275 */ 276 public void shutdown() { 277 if (!shutdown) { 278 shutdown = true; 279 } 280 } 281 282 /** 283 * Wait for all threads in the flock to finish executing their tasks. This method 284 * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked, 285 * or the current thread is interrupted. 286 * 287 * <p> This method may only be invoked by the flock owner. The method trivially 288 * returns true when the flock is closed. 289 * 290 * <p> This method clears the effect of any previous invocations of the 291 * {@code wakeup} method. 292 * 293 * @return true if there are no threads in the flock, false if wakeup was invoked 294 * and there are unfinished threads 295 * @throws InterruptedException if interrupted while waiting 296 * @throws WrongThreadException if invoked by a thread that is not the owner 297 */ 298 public boolean awaitAll() throws InterruptedException { 299 ensureOwner(); 300 301 if (getAndSetPermit(false)) 302 return (threadCount == 0); 303 304 while (threadCount > 0 && !permit) { 305 LockSupport.park(); 306 if (Thread.interrupted()) 307 throw new InterruptedException(); 308 } 309 clearPermit(); 310 return (threadCount == 0); 311 } 312 313 /** 314 * Wait, up to the given waiting timeout, for all threads in the flock to finish 315 * executing their tasks. This method waits until all threads finish, the {@link 316 * #wakeup() wakeup} method is invoked, the current thread is interrupted, or 317 * the timeout expires. 318 * 319 * <p> This method may only be invoked by the flock owner. The method trivially 320 * returns true when the flock is closed. 321 * 322 * <p> This method clears the effect of any previous invocations of the {@code wakeup} 323 * method. 324 * 325 * @param timeout the maximum duration to wait 326 * @return true if there are no threads in the flock, false if wakeup was invoked 327 * and there are unfinished threads 328 * @throws InterruptedException if interrupted while waiting 329 * @throws TimeoutException if the wait timed out 330 * @throws WrongThreadException if invoked by a thread that is not the owner 331 */ 332 public boolean awaitAll(Duration timeout) 333 throws InterruptedException, TimeoutException { 334 Objects.requireNonNull(timeout); 335 ensureOwner(); 336 337 if (getAndSetPermit(false)) 338 return (threadCount == 0); 339 340 long startNanos = System.nanoTime(); 341 long nanos = NANOSECONDS.convert(timeout); 342 long remainingNanos = nanos; 343 while (threadCount > 0 && remainingNanos > 0 && !permit) { 344 LockSupport.parkNanos(remainingNanos); 345 if (Thread.interrupted()) 346 throw new InterruptedException(); 347 remainingNanos = nanos - (System.nanoTime() - startNanos); 348 } 349 350 boolean done = (threadCount == 0); 351 if (!done && remainingNanos <= 0 && !permit) { 352 throw new TimeoutException(); 353 } else { 354 clearPermit(); 355 return done; 356 } 357 } 358 359 /** 360 * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the 361 * {@linkplain #owner() owner} to return immediately. 362 * 363 * <p> If the owner is blocked in {@code awaitAll} then it will return immediately. 364 * If the owner is not blocked in {@code awaitAll} then its next call to wait 365 * will return immediately. The method does nothing when the flock is closed. 366 */ 367 public void wakeup() { 368 if (!getAndSetPermit(true) && Thread.currentThread() != owner()) { 369 LockSupport.unpark(owner()); 370 } 371 } 372 373 /** 374 * Closes this flock. This method first shuts down the flock to prevent 375 * new threads from starting. It then waits for the threads in the flock 376 * to finish executing their tasks. In other words, this method blocks until 377 * all threads in the flock finish. 378 * 379 * <p> This method may only be invoked by the flock owner. 380 * 381 * <p> If interrupted then this method continues to wait until all threads 382 * finish, before completing with the interrupt status set. 383 * 384 * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If 385 * this method is called to close a flock before nested flocks are closed then it 386 * closes the nested flocks (in the reverse order that they were created in), 387 * closes this flock, and then throws {@code StructureViolationException}. 388 * Similarly, if this method is called to close a thread flock while executing with 389 * scoped value bindings, and the thread flock was created before the scoped values 390 * were bound, then {@code StructureViolationException} is thrown after closing the 391 * thread flock. 392 * 393 * @throws WrongThreadException if invoked by a thread that is not the owner 394 * @throws StructureViolationException if a structure violation was detected 395 */ 396 public void close() { 397 ensureOwner(); 398 if (closed) 399 return; 400 401 // shutdown, if not already shutdown 402 if (!shutdown) 403 shutdown = true; 404 405 // wait for threads to finish 406 boolean interrupted = false; 407 try { 408 while (threadCount > 0) { 409 LockSupport.park(); 410 if (Thread.interrupted()) { 411 interrupted = true; 412 } 413 } 414 415 } finally { 416 try { 417 container.close(); // may throw 418 } finally { 419 closed = true; 420 if (interrupted) Thread.currentThread().interrupt(); 421 } 422 } 423 } 424 425 /** 426 * {@return true if the flock has been {@linkplain #shutdown() shut down}} 427 */ 428 public boolean isShutdown() { 429 return shutdown; 430 } 431 432 /** 433 * {@return true if the flock has been {@linkplain #close() closed}} 434 */ 435 public boolean isClosed() { 436 return closed; 437 } 438 439 /** 440 * {@return a stream of the threads in this flock} 441 * The elements of the stream are threads that were started in this flock 442 * but have not terminated. The stream will reflect the set of threads in the 443 * flock at some point at or since the creation of the stream. It may or may 444 * not reflect changes to the set of threads subsequent to creation of the 445 * stream. 446 */ 447 public Stream<Thread> threads() { 448 return threads.stream(); 449 } 450 451 /** 452 * Tests if this flock contains the given thread. This method returns {@code true} 453 * if the thread was started in this flock and has not finished. If the thread 454 * is not in this flock then it tests if the thread is in flocks owned by threads 455 * in this flock, essentially equivalent to invoking {@code containsThread} method 456 * on all flocks owned by the threads in this flock. 457 * 458 * @param thread the thread 459 * @return true if this flock contains the thread 460 */ 461 public boolean containsThread(Thread thread) { 462 var c = JLA.threadContainer(thread); 463 if (c == this.container) 464 return true; 465 if (c != null && c != ThreadContainers.root()) { 466 var parent = c.parent(); 467 while (parent != null) { 468 if (parent == this.container) 469 return true; 470 parent = parent.parent(); 471 } 472 } 473 return false; 474 } 475 476 @Override 477 public String toString() { 478 String id = Objects.toIdentityString(this); 479 if (name != null) { 480 return name + "/" + id; 481 } else { 482 return id; 483 } 484 } 485 486 /** 487 * A ThreadContainer backed by a ThreadFlock. 488 */ 489 private static class ThreadContainerImpl extends ThreadContainer { 490 private final ThreadFlock flock; 491 private volatile Object key; 492 private boolean closing; 493 494 ThreadContainerImpl(ThreadFlock flock) { 495 super(/*shared*/ false); 496 this.flock = flock; 497 } 498 499 @Override 500 public ThreadContainerImpl push() { 501 // Virtual threads in the root containers may not be tracked so need 502 // to register container to ensure that it is found 503 if (!ThreadContainers.trackAllThreads()) { 504 Thread thread = Thread.currentThread(); 505 if (thread.isVirtual() 506 && JLA.threadContainer(thread) == ThreadContainers.root()) { 507 this.key = ThreadContainers.registerContainer(this); 508 } 509 } 510 super.push(); 511 return this; 512 } 513 514 /** 515 * Invoked by ThreadFlock.close when closing the flock. This method pops the 516 * container from the current thread's scope stack. 517 */ 518 void close() { 519 assert Thread.currentThread() == owner(); 520 if (!closing) { 521 closing = true; 522 boolean atTop = popForcefully(); // may block 523 Object key = this.key; 524 if (key != null) 525 ThreadContainers.deregisterContainer(key); 526 if (!atTop) 527 throw new StructureViolationException(); 528 } 529 } 530 531 /** 532 * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to 533 * close the flock. This method does not pop the container from the current 534 * thread's scope stack. 535 */ 536 @Override 537 protected boolean tryClose() { 538 assert Thread.currentThread() == owner(); 539 if (!closing) { 540 closing = true; 541 flock.close(); 542 Object key = this.key; 543 if (key != null) 544 ThreadContainers.deregisterContainer(key); 545 return true; 546 } else { 547 assert false : "Should not get there"; 548 return false; 549 } 550 } 551 552 @Override 553 public String name() { 554 return flock.name(); 555 } 556 @Override 557 public long threadCount() { 558 return flock.threadCount(); 559 } 560 @Override 561 public Stream<Thread> threads() { 562 return flock.threads().filter(Thread::isAlive); 563 } 564 @Override 565 public void onStart(Thread thread) { 566 flock.onStart(thread); 567 } 568 @Override 569 public void onExit(Thread thread) { 570 flock.onExit(thread); 571 } 572 @Override 573 public ScopedValueContainer.BindingsSnapshot scopedValueBindings() { 574 return flock.scopedValueBindings(); 575 } 576 } 577 }