1 /* 2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package jdk.internal.misc; 26 27 import java.lang.invoke.MethodHandles; 28 import java.lang.invoke.VarHandle; 29 import java.time.Duration; 30 import java.util.Objects; 31 import java.util.Set; 32 import java.util.concurrent.ConcurrentHashMap; 33 import java.util.concurrent.TimeoutException; 34 import java.util.concurrent.StructureViolationException; 35 import java.util.concurrent.locks.LockSupport; 36 import java.util.stream.Stream; 37 import jdk.internal.access.JavaLangAccess; 38 import jdk.internal.access.SharedSecrets; 39 import jdk.internal.vm.ScopedValueContainer; 40 import jdk.internal.vm.ThreadContainer; 41 import jdk.internal.vm.ThreadContainers; 42 import static java.util.concurrent.TimeUnit.NANOSECONDS; 43 44 /** 45 * A grouping of threads that typically run closely related tasks. Threads started 46 * in a flock remain in the flock until they terminate. 47 * 48 * <p> ThreadFlock defines the {@link #open(String) open} method to open a new flock, 49 * the {@link #start(Thread) start} method to start a thread in the flock, and the 50 * {@link #close() close} method to close the flock. The {@code close} waits for all 51 * threads in the flock to finish. The {@link #awaitAll() awaitAll} method may be used 52 * to wait for all threads to finish without closing the flock. The {@link #wakeup()} 53 * method will cause {@code awaitAll} method to complete early, which can be used to 54 * support cancellation in higher-level APIs. ThreadFlock also defines the {@link 55 * #shutdown() shutdown} method to prevent new threads from starting while allowing 56 * existing threads in the flock to continue. 57 * 58 * <p> Thread flocks are intended to be used in a <em>structured manner</em>. The 59 * thread that opens a new flock is the {@link #owner() owner}. The owner closes the 60 * flock when done, failure to do so may result in a resource leak. The {@code open} 61 * and {@code close} should be matched to avoid closing an <em>enclosing</em> flock 62 * while a <em>nested</em> flock is open. A ThreadFlock can be used with the 63 * try-with-resources construct if required but more likely, the close method of a 64 * higher-level API that implements {@link AutoCloseable} will close the flock. 65 * 66 * <p> Thread flocks are conceptually nodes in a tree. A thread {@code T} started in 67 * flock "A" may itself open a new flock "B", implicitly forming a tree where flock 68 * "A" is the parent of flock "B". When nested, say where thread {@code T} opens 69 * flock "B" and then invokes a method that opens flock "C", then the enclosing 70 * flock "B" is conceptually the parent of the nested flock "C". ThreadFlock does 71 * not define APIs that exposes the tree structure. It does define the {@link 72 * #containsThread(Thread) containsThread} method to test if a flock contains a 73 * thread, a test that is equivalent to testing membership of flocks in the tree. 74 * The {@code start} and {@code shutdown} methods are confined to the flock 75 * owner or threads contained in the flock. The confinement check is equivalent to 76 * invoking the {@code containsThread} method to test if the caller is contained 77 * in the flock. 78 * 79 * <p> Unless otherwise specified, passing a {@code null} argument to a method 80 * in this class will cause a {@link NullPointerException} to be thrown. 81 */ 82 public class ThreadFlock implements AutoCloseable { 83 private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); 84 private static final VarHandle THREAD_COUNT; 85 private static final VarHandle PERMIT; 86 static { 87 try { 88 MethodHandles.Lookup l = MethodHandles.lookup(); 89 THREAD_COUNT = l.findVarHandle(ThreadFlock.class, "threadCount", int.class); 90 PERMIT = l.findVarHandle(ThreadFlock.class, "permit", boolean.class); 91 } catch (Exception e) { 92 throw new InternalError(e); 93 } 94 } 95 96 private final Set<Thread> threads = ConcurrentHashMap.newKeySet(); 97 98 // thread count, need to re-examine contention once API is stable 99 private volatile int threadCount; 100 101 private final String name; 102 private final ScopedValueContainer.BindingsSnapshot scopedValueBindings; 103 private final ThreadContainerImpl container; // encapsulate for now 104 105 // state 106 private volatile boolean shutdown; 107 private volatile boolean closed; 108 109 // set by wakeup, cleared by awaitAll 110 private volatile boolean permit; 111 112 ThreadFlock(String name) { 113 this.name = name; 114 this.scopedValueBindings = ScopedValueContainer.captureBindings(); 115 this.container = new ThreadContainerImpl(this); 116 } 117 118 private long threadCount() { 119 return threadCount; 120 } 121 122 private ScopedValueContainer.BindingsSnapshot scopedValueBindings() { 123 return scopedValueBindings; 124 } 125 126 private void incrementThreadCount() { 127 THREAD_COUNT.getAndAdd(this, 1); 128 } 129 130 /** 131 * Decrement the thread count. Unpark the owner if the count goes to zero. 132 */ 133 private void decrementThreadCount() { 134 int count = (int) THREAD_COUNT.getAndAdd(this, -1) - 1; 135 136 // signal owner when the count goes to zero 137 if (count == 0) { 138 LockSupport.unpark(owner()); 139 } 140 } 141 142 /** 143 * Invoked on the parent thread when starting {@code thread}. 144 */ 145 private void onStart(Thread thread) { 146 incrementThreadCount(); 147 boolean done = false; 148 try { 149 boolean added = threads.add(thread); 150 assert added; 151 if (shutdown) 152 throw new IllegalStateException("Shutdown"); 153 done = true; 154 } finally { 155 if (!done) { 156 threads.remove(thread); 157 decrementThreadCount(); 158 } 159 } 160 } 161 162 /** 163 * Invoked on the terminating thread or the parent thread when starting 164 * {@code thread} failed. This method is only called if onStart succeeded. 165 */ 166 private void onExit(Thread thread) { 167 boolean removed = threads.remove(thread); 168 assert removed; 169 decrementThreadCount(); 170 } 171 172 /** 173 * Clear wakeup permit. 174 */ 175 private void clearPermit() { 176 if (permit) 177 permit = false; 178 } 179 180 /** 181 * Sets the wakeup permit to the given value, returning the previous value. 182 */ 183 private boolean getAndSetPermit(boolean newValue) { 184 if (permit != newValue) { 185 return (boolean) PERMIT.getAndSet(this, newValue); 186 } else { 187 return newValue; 188 } 189 } 190 191 /** 192 * Throws WrongThreadException if the current thread is not the owner. 193 */ 194 private void ensureOwner() { 195 if (Thread.currentThread() != owner()) 196 throw new WrongThreadException("Current thread not owner"); 197 } 198 199 /** 200 * Throws WrongThreadException if the current thread is not the owner 201 * or a thread contained in the flock. 202 */ 203 private void ensureOwnerOrContainsThread() { 204 Thread currentThread = Thread.currentThread(); 205 if (currentThread != owner() && !containsThread(currentThread)) 206 throw new WrongThreadException("Current thread not owner or thread in flock"); 207 } 208 209 /** 210 * Opens a new thread flock. The flock is owned by the current thread. It can be 211 * named to aid debugging. 212 * 213 * <p> This method captures the current thread's {@linkplain ScopedValue scoped value} 214 * bindings for inheritance by threads created in the flock. 215 * 216 * <p> For the purposes of containment, monitoring, and debugging, the parent 217 * of the new flock is determined as follows: 218 * <ul> 219 * <li> If the current thread is the owner of open flocks then the most recently 220 * created, and open, flock is the parent of the new flock. In other words, the 221 * <em>enclosing flock</em> is the parent. 222 * <li> If the current thread is not the owner of any open flocks then the 223 * parent of the new flock is the current thread's flock. If the current thread 224 * was not started in a flock then the new flock does not have a parent. 225 * </ul> 226 * 227 * @param name the name of the flock, can be null 228 * @return a new thread flock 229 */ 230 public static ThreadFlock open(String name) { 231 var flock = new ThreadFlock(name); 232 flock.container.push(); 233 return flock; 234 } 235 236 /** 237 * {@return the name of this flock or {@code null} if unnamed} 238 */ 239 public String name() { 240 return name; 241 } 242 243 /** 244 * {@return the owner of this flock} 245 */ 246 public Thread owner() { 247 return container.owner(); 248 } 249 250 /** 251 * Starts the given unstarted thread in this flock. 252 * 253 * <p> The thread is started with the scoped value bindings that were captured 254 * when opening the flock. The bindings must match the current thread's bindings. 255 * 256 * <p> This method may only be invoked by the flock owner or threads {@linkplain 257 * #containsThread(Thread) contained} in the flock. 258 * 259 * @param thread the unstarted thread 260 * @return the thread, started 261 * @throws IllegalStateException if this flock is shutdown or closed 262 * @throws IllegalThreadStateException if the given thread was already started 263 * @throws WrongThreadException if the current thread is not the owner or a thread 264 * contained in the flock 265 * @throws StructureViolationException if the current scoped value bindings are 266 * not the same as when the flock was created 267 */ 268 public Thread start(Thread thread) { 269 ensureOwnerOrContainsThread(); 270 JLA.start(thread, container); 271 return thread; 272 } 273 274 /** 275 * Shutdown this flock so that no new threads can be started, existing threads 276 * in the flock will continue to run. This method is a no-op if the flock is 277 * already shutdown or closed. 278 * 279 * <p> This method may only be invoked by the flock owner or threads {@linkplain 280 * #containsThread(Thread) contained} in the flock. 281 * 282 * @throws WrongThreadException if the current thread is not the owner or a thread 283 * contained in the flock 284 */ 285 public void shutdown() { 286 ensureOwnerOrContainsThread(); 287 if (!shutdown) { 288 shutdown = true; 289 } 290 } 291 292 /** 293 * Wait for all threads in the flock to finish executing their tasks. This method 294 * waits until all threads finish, the {@link #wakeup() wakeup} method is invoked, 295 * or the current thread is interrupted. 296 * 297 * <p> This method may only be invoked by the flock owner. The method trivially 298 * returns true when the flock is closed. 299 * 300 * <p> This method clears the effect of any previous invocations of the 301 * {@code wakeup} method. 302 * 303 * @return true if there are no threads in the flock, false if wakeup was invoked 304 * and there are unfinished threads 305 * @throws InterruptedException if interrupted while waiting 306 * @throws WrongThreadException if invoked by a thread that is not the owner 307 */ 308 public boolean awaitAll() throws InterruptedException { 309 ensureOwner(); 310 311 if (getAndSetPermit(false)) 312 return (threadCount == 0); 313 314 while (threadCount > 0 && !permit) { 315 LockSupport.park(); 316 if (Thread.interrupted()) 317 throw new InterruptedException(); 318 } 319 clearPermit(); 320 return (threadCount == 0); 321 } 322 323 /** 324 * Wait, up to the given waiting timeout, for all threads in the flock to finish 325 * executing their tasks. This method waits until all threads finish, the {@link 326 * #wakeup() wakeup} method is invoked, the current thread is interrupted, or 327 * the timeout expires. 328 * 329 * <p> This method may only be invoked by the flock owner. The method trivially 330 * returns true when the flock is closed. 331 * 332 * <p> This method clears the effect of any previous invocations of the {@code wakeup} 333 * method. 334 * 335 * @param timeout the maximum duration to wait 336 * @return true if there are no threads in the flock, false if wakeup was invoked 337 * and there are unfinished threads 338 * @throws InterruptedException if interrupted while waiting 339 * @throws TimeoutException if the wait timed out 340 * @throws WrongThreadException if invoked by a thread that is not the owner 341 */ 342 public boolean awaitAll(Duration timeout) 343 throws InterruptedException, TimeoutException { 344 Objects.requireNonNull(timeout); 345 ensureOwner(); 346 347 if (getAndSetPermit(false)) 348 return (threadCount == 0); 349 350 long startNanos = System.nanoTime(); 351 long nanos = NANOSECONDS.convert(timeout); 352 long remainingNanos = nanos; 353 while (threadCount > 0 && remainingNanos > 0 && !permit) { 354 LockSupport.parkNanos(remainingNanos); 355 if (Thread.interrupted()) 356 throw new InterruptedException(); 357 remainingNanos = nanos - (System.nanoTime() - startNanos); 358 } 359 360 boolean done = (threadCount == 0); 361 if (!done && remainingNanos <= 0 && !permit) { 362 throw new TimeoutException(); 363 } else { 364 clearPermit(); 365 return done; 366 } 367 } 368 369 /** 370 * Causes the call to {@link #awaitAll()} or {@link #awaitAll(Duration)} by the 371 * {@linkplain #owner() owner} to return immediately. 372 * 373 * <p> If the owner is blocked in {@code awaitAll} then it will return immediately. 374 * If the owner is not blocked in {@code awaitAll} then its next call to wait 375 * will return immediately. The method does nothing when the flock is closed. 376 * 377 * @throws WrongThreadException if the current thread is not the owner or a thread 378 * contained in the flock 379 */ 380 public void wakeup() { 381 ensureOwnerOrContainsThread(); 382 if (!getAndSetPermit(true) && Thread.currentThread() != owner()) { 383 LockSupport.unpark(owner()); 384 } 385 } 386 387 /** 388 * Closes this flock. This method first shuts down the flock to prevent 389 * new threads from starting. It then waits for the threads in the flock 390 * to finish executing their tasks. In other words, this method blocks until 391 * all threads in the flock finish. 392 * 393 * <p> This method may only be invoked by the flock owner. 394 * 395 * <p> If interrupted then this method continues to wait until all threads 396 * finish, before completing with the interrupt status set. 397 * 398 * <p> A ThreadFlock is intended to be used in a <em>structured manner</em>. If 399 * this method is called to close a flock before nested flocks are closed then it 400 * closes the nested flocks (in the reverse order that they were created in), 401 * closes this flock, and then throws {@code StructureViolationException}. 402 * Similarly, if this method is called to close a thread flock while executing with 403 * scoped value bindings, and the thread flock was created before the scoped values 404 * were bound, then {@code StructureViolationException} is thrown after closing the 405 * thread flock. 406 * 407 * @throws WrongThreadException if invoked by a thread that is not the owner 408 * @throws StructureViolationException if a structure violation was detected 409 */ 410 public void close() { 411 ensureOwner(); 412 if (closed) 413 return; 414 415 // shutdown, if not already shutdown 416 if (!shutdown) 417 shutdown = true; 418 419 // wait for threads to finish 420 boolean interrupted = false; 421 try { 422 while (threadCount > 0) { 423 LockSupport.park(); 424 if (Thread.interrupted()) { 425 interrupted = true; 426 } 427 } 428 429 } finally { 430 try { 431 container.close(); // may throw 432 } finally { 433 closed = true; 434 if (interrupted) Thread.currentThread().interrupt(); 435 } 436 } 437 } 438 439 /** 440 * {@return true if the flock has been {@linkplain #shutdown() shut down}} 441 */ 442 public boolean isShutdown() { 443 return shutdown; 444 } 445 446 /** 447 * {@return true if the flock has been {@linkplain #close() closed}} 448 */ 449 public boolean isClosed() { 450 return closed; 451 } 452 453 /** 454 * {@return a stream of the threads in this flock} 455 * The elements of the stream are threads that were started in this flock 456 * but have not terminated. The stream will reflect the set of threads in the 457 * flock at some point at or since the creation of the stream. It may or may 458 * not reflect changes to the set of threads subsequent to creation of the 459 * stream. 460 */ 461 public Stream<Thread> threads() { 462 return threads.stream(); 463 } 464 465 /** 466 * Tests if this flock contains the given thread. This method returns {@code true} 467 * if the thread was started in this flock and has not finished. If the thread 468 * is not in this flock then it tests if the thread is in flocks owned by threads 469 * in this flock, essentially equivalent to invoking {@code containsThread} method 470 * on all flocks owned by the threads in this flock. 471 * 472 * @param thread the thread 473 * @return true if this flock contains the thread 474 */ 475 public boolean containsThread(Thread thread) { 476 var c = JLA.threadContainer(thread); 477 if (c == this.container) 478 return true; 479 if (c != null && c != ThreadContainers.root()) { 480 var parent = c.parent(); 481 while (parent != null) { 482 if (parent == this.container) 483 return true; 484 parent = parent.parent(); 485 } 486 } 487 return false; 488 } 489 490 @Override 491 public String toString() { 492 String id = Objects.toIdentityString(this); 493 if (name != null) { 494 return name + "/" + id; 495 } else { 496 return id; 497 } 498 } 499 500 /** 501 * A ThreadContainer backed by a ThreadFlock. 502 */ 503 private static class ThreadContainerImpl extends ThreadContainer { 504 private final ThreadFlock flock; 505 private volatile Object key; 506 private boolean closing; 507 508 ThreadContainerImpl(ThreadFlock flock) { 509 super(/*shared*/ false); 510 this.flock = flock; 511 } 512 513 @Override 514 public ThreadContainerImpl push() { 515 // Virtual threads in the root containers may not be tracked so need 516 // to register container to ensure that it is found 517 if (!ThreadContainers.trackAllThreads()) { 518 Thread thread = Thread.currentThread(); 519 if (thread.isVirtual() 520 && JLA.threadContainer(thread) == ThreadContainers.root()) { 521 this.key = ThreadContainers.registerContainer(this); 522 } 523 } 524 super.push(); 525 return this; 526 } 527 528 /** 529 * Invoked by ThreadFlock.close when closing the flock. This method pops the 530 * container from the current thread's scope stack. 531 */ 532 void close() { 533 assert Thread.currentThread() == owner(); 534 if (!closing) { 535 closing = true; 536 boolean atTop = popForcefully(); // may block 537 Object key = this.key; 538 if (key != null) 539 ThreadContainers.deregisterContainer(key); 540 if (!atTop) 541 throw new StructureViolationException(); 542 } 543 } 544 545 /** 546 * Invoked when an enclosing scope is closing. Invokes ThreadFlock.close to 547 * close the flock. This method does not pop the container from the current 548 * thread's scope stack. 549 */ 550 @Override 551 protected boolean tryClose() { 552 assert Thread.currentThread() == owner(); 553 if (!closing) { 554 closing = true; 555 flock.close(); 556 Object key = this.key; 557 if (key != null) 558 ThreadContainers.deregisterContainer(key); 559 return true; 560 } else { 561 assert false : "Should not get there"; 562 return false; 563 } 564 } 565 566 @Override 567 public String name() { 568 return flock.name(); 569 } 570 @Override 571 public long threadCount() { 572 return flock.threadCount(); 573 } 574 @Override 575 public Stream<Thread> threads() { 576 return flock.threads().filter(Thread::isAlive); 577 } 578 @Override 579 public void onStart(Thread thread) { 580 flock.onStart(thread); 581 } 582 @Override 583 public void onExit(Thread thread) { 584 flock.onExit(thread); 585 } 586 @Override 587 public ScopedValueContainer.BindingsSnapshot scopedValueBindings() { 588 return flock.scopedValueBindings(); 589 } 590 } 591 } --- EOF ---