1 /*
   2  * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.lang;
  26 
  27 import java.lang.invoke.MethodHandles;
  28 import java.lang.invoke.VarHandle;
  29 import java.security.AccessController;
  30 import java.security.PrivilegedAction;
  31 import java.time.Duration;
  32 import java.time.Instant;
  33 import java.util.Objects;
  34 import java.util.Set;
  35 import java.util.concurrent.Callable;
  36 import java.util.concurrent.ConcurrentHashMap;
  37 import java.util.concurrent.Executor;
  38 import java.util.concurrent.Executors;
  39 import java.util.concurrent.Future;
  40 import java.util.concurrent.ScheduledExecutorService;
  41 import java.util.concurrent.ScheduledThreadPoolExecutor;
  42 import java.util.concurrent.TimeUnit;
  43 import java.util.concurrent.locks.Condition;
  44 import java.util.concurrent.locks.ReentrantLock;
  45 
  46 import jdk.internal.misc.Strands;
  47 
  48 /**
  49  * A scope in which {@link Fiber fibers} are scheduled.
  50  *
  51  * <p> A {@code FiberScope} is opened and <em>entered</em> by creating an instance,
  52  * usually by means of one of the static {@linkplain #open(Option...) open} methods
  53  * defined here. The thread or fiber that opens the scope is the <i>owner</i> of
  54  * the scope. The scope is closed and <em>exited</em> by invoking its {@linkplain
  55  * #close() close} method. A scope can only be exited when all fibers scheduled
  56  * in the scope have terminated; the {@code close} method blocks until all fibers
  57  * scheduled in the scope have terminated. As a special case, fibers can be
  58  * scheduled in the <em>{@linkplain #background() background} scope</em> for cases
  59  * where fibers are <em>unmanaged</em> or need to <em>outlive</em> the thread or
  60  * fiber that schedules them.
  61  *
  62  * <p> {@code FiberScope} implements {@linkplain AutoCloseable} so that the
  63  * try-with-resources statement can be used to ensure that a scope is closed.
  64  * The following example schedules two fibers in a scope. The try-with-resources
  65  * statement completes when the block completes and both fibers scheduled in the
 * scope terminate.
  67  *
  68  * <pre>{@code
  69  *     try (var scope = FiberScope.open()) {
  70  *         var fiber1 = scope.schedule(() -> "one");
  71  *         var fiber2 = scope.schedule(() -> "two");
 *     }
  73  * }</pre>
  74  *
  75  * <p> A {@code FiberScope} can be created to track the fibers scheduled in the
  76  * scope so that they can be cancelled in bulk when the owner is {@link Fiber#cancel()
  77  * cancelled}, the owner closes the scope, a deadline is reached, or a timeout
  78  * expires. Alternatively, a {@code FiberScope} can be created to shield the owner
  79  * from cancellation, useful for recovery or cleanup that needs to complete without
  80  * cancellation.
  81  *
  82  * <p> Fiber scopes may be nested. A thread or fiber executing in a scope may open
  83  * and enter another (inner) scope. Any fibers scheduled in the inner scope must
 * terminate to allow the thread or fiber to exit back to the parent/enclosing scope.
  85  *
  86  * <p> Unless otherwise noted, passing a {@code null} argument will cause a
  87  * {@linkplain NullPointerException} to be thrown.
  88  *
  89  * @apiNote For now, {@code FiberScope}s can only be created with the {@code open}
  90  * methods defined here. A future revision of this API may support a builder or
  91  * other ways to compose scopes that make use of the <em>lifecycle events</em> that
  92  * are not currently exposed. A future revision may also add a type parameter.
  93  */
  94 
  95 public class FiberScope implements AutoCloseable {
  96     private static final VarHandle COUNT;
  97     static {
  98         try {
  99             MethodHandles.Lookup l = MethodHandles.lookup();
 100             COUNT = l.findVarHandle(FiberScope.class, "count", int.class);
 101         } catch (Exception e) {
 102             throw new InternalError(e);
 103         }
 104     }
 105     private volatile int count;
 106 
 107     private final boolean cancellable;
 108     private final Object owner;
 109     private final FiberScope parent;
 110 
 111     // close support
 112     private volatile boolean closed;
 113     private final ReentrantLock closeLock = new ReentrantLock();
 114     private volatile Condition closeCondition;  // created lazily
 115 
 116     // background scope
 117     private static final FiberScope BACKGROUND = new BackgroundFiberScope();
 118 
 119     /**
 120      * Creates and enters a new scope.
 121      */
 122     FiberScope(boolean cancellable) {
 123         this.cancellable = cancellable;
 124         this.owner = Strands.currentStrand();
 125         if (owner instanceof Thread) {
 126             Thread thread = (Thread) owner;
 127             this.parent = thread.scope();
 128             thread.setScope(this);
 129         } else {
 130             Fiber<?> fiber = (Fiber<?>) owner;
 131             this.parent = fiber.scope();
 132             fiber.setScope(this);
 133         }
 134     }
 135 
 136     /**
 137      * Constructor for the background scope.
 138      */
 139     FiberScope(Void ignore, boolean cancellable) {
 140         this.cancellable = cancellable;
 141         this.owner = null;
 142         this.parent = null;
 143     }
 144 
 145     /**
 146      * Returns true if this scope is cancellable.
 147      */
 148     boolean isCancellable() {
 149         return cancellable;
 150     }
 151 
 152     /**
 153      * Returns true if this scope propagates cancellation to nested scopes.
 154      */
 155     boolean propagatesCancel() {
 156         return false;
 157     }
 158 
 159     /**
 160      * Return the scope owner.
 161      */
 162     Object owner() {
 163         return owner;
 164     }
 165 
 166     /**
 167      * Returns the parent scope or {@code null} if there is no parent.
 168      */
 169     FiberScope parent() {
 170         return parent;
 171     }
 172 
 173     /**
 174      * Returns the current fiber scope. If invoked on a thread that is not in a
 175      * scope then the background scope is returned (to make it look like threads
 176      * start out in the background scope).
 177      */
 178     static FiberScope current() {
 179         Object strand = Strands.currentStrand();
 180         if (strand instanceof Thread) {
 181             Thread thread = (Thread) strand;
 182             FiberScope scope = thread.scope();
 183             return (scope != null) ? scope : BACKGROUND;
 184         } else {
 185             FiberScope scope = ((Fiber<?>) strand).scope();
 186             assert scope != null;
 187             return scope;
 188         }
 189     }
 190 
 191     /**
 192      * Returns the scope to schedule fibers in the <em>background</em>. A fiber
 193      * scheduled in the <em>background</em> scope can outlive the thread or fiber
 194      * that schedules it. The scope cannot be closed; its {@link #close() close}
 195      * method always fails.
 196      *
 197      * @return the background scope
 198      */
 199     @SuppressWarnings("unchecked")
 200     public static FiberScope  background() {
 201         return BACKGROUND;
 202     }
 203 
 204     /**
 205      * Defines options to specify when opening a fiber scope.
 206      *
 207      * @see FiberScope#open(Option...)
 208      */
 209     public enum Option {
 210         /**
 211          * Fibers in the scope are <em>shielded</em> from cancellation. This
 212          * option is intended to be used by cleanup and recovery operations that
 213          * cannot be cancelled.
 214          */
 215         IGNORE_CANCEL,
 216         /**
 217          * Cancel all fibers remaining in the scope when the owner closes the scope.
 218          */
 219         CANCEL_AT_CLOSE,
 220         /**
 221          * Cancel all fibers in the scope when the owner is cancelled.
 222          */
 223         PROPAGATE_CANCEL,
 224     }
 225 
 226     /**
 227      * Creates and enters a new scope. The current {@link Thread#currentThread()
 228      * thread} or {@link Fiber#current() fiber} is the <em>owner</em> of the scope,
 229      * only the owner can exit the scope with the {@linkplain #close() close}
 230      * method.
 231      *
 232      * <p> Options can be used to configure the scope:
 233      *
 234      * <ul>
 235      *   <li><p> {@linkplain Option#IGNORE_CANCEL IGNORE_CANCEL}: The owner is
 236      *   <em>shielded</em> from cancellation when executing in this scope.
 237      *   Fibers scheduled in the scope are also shielded.
 238      *   This option cannot be specified with any other option. </p></li>
 239      *
 240      *   <li><p> {@linkplain Option#CANCEL_AT_CLOSE CANCEL_AT_CLOSE}: A scope
 241      *   created with this option cancels all fibers scheduled in the scope
 242      *   when the owner {@link #close() closes} the scope. </p></li>
 243      *
 244      *   <li><p> {@linkplain Option#PROPAGATE_CANCEL PROPAGATE_CANCEL}: If
 245      *   the owner is {@link Fiber#cancel() cancelled}  then the cancellation is
 246      *   propagated to fibers scheduled in the scope.
 247      *   Once the owner is cancelled, scheduling additional fibers in the scope
 248      *   will schedule the fibers with their <em>cancel status</em> set. </p></li>
 249      * </ul>
 250      *
 251      * <p> The {@code IGNORE_CANCEL} and {@code PROPAGATE_CANCEL} options are
 252      * inherited the from parent/enclosing scope when no options are specified.
 253      *
 254      * @param options options to configure the scope
 255      * @return a new scope
 256      * @throws IllegalArgumentException if an illegal combination of options is
 257      *         specified
 258      */
 259     public static FiberScope open(Option... options) {
 260         FiberScope current = current();
 261         boolean cancellable = true;
 262         boolean cancelAtClose = false;
 263         boolean propagateCancel = false;
 264 
 265         if (options.length > 0) {
 266             for (Option option : options) {
 267                 Objects.requireNonNull(option);
 268                 switch (option) {
 269                     case IGNORE_CANCEL:
 270                         cancellable = false;
 271                         break;
 272                     case CANCEL_AT_CLOSE:
 273                         cancelAtClose = true;
 274                         break;
 275                     case PROPAGATE_CANCEL:
 276                         propagateCancel = true;
 277                         break;
 278                 }
 279             }
 280 
 281             // check for invalidate combinations of options
 282             if (!cancellable && (cancelAtClose || propagateCancel)) {
 283                 throw new IllegalArgumentException("IGNORE_CANCEL specified with other options");
 284             }
 285         } else {
 286             // inherit IGNORE_CANCEL when no options specified
 287             cancellable = current().isCancellable();
 288         }
 289 
 290         // inherit PROPAGATE_CANCEL when new scope is cancellable
 291         propagateCancel |= (cancellable && current.propagatesCancel());
 292 
 293         // Threads don't support cancellation so create TimedFiberScope scope when
 294         // the parent scope has a deadline
 295         if (propagateCancel
 296                 && Fiber.currentFiber() == null
 297                 && current instanceof TimedFiberScope
 298                 && current.owner() instanceof TimedFiberScope) {
 299             Instant deadline = ((TimedFiberScope) current).deadline();
 300             return new TimedFiberScope(deadline, cancelAtClose);
 301         }
 302 
 303         if (cancellable && (cancelAtClose || propagateCancel)) {
 304             return new CancellingFiberScope(cancelAtClose, propagateCancel);
 305         } else {
 306             return new FiberScope(cancellable);
 307         }
 308     }
 309 
 310     /**
 311      * Creates and enters a new scope. This method is equivalent to creating the
 312      * scope with the {@linkplain #open(Option...)} method with the {@linkplain
 313      * Option#PROPAGATE_CANCEL PROPAGATE_CANCEL} option.
 314      *
 315      * <p> If the deadline is reached before the owner has exited the scope then
 316      * all fibers scheduled in the scope (that haven't terminated) are cancelled.
 317      * Furthermore, if the owner is not waiting in the {@code close} method when
 318      * the deadline is reached then it is {@link Fiber#cancel() cancelled} (if
 319      * it's a fiber) or {@link Thread#interrupt() interrupted} (if it's a thread).
 320      *
 321      * @param deadline the deadline
 322      * @param options options to configure the scope
 323      * @return a new scope
 324      * @throws IllegalArgumentException if {@code IGNORE_CANCEL} is specified
 325      */
 326     public static FiberScope open(Instant deadline, Option... options) {
 327         Objects.requireNonNull(deadline);
 328         boolean cancelAtClose = false;
 329         for (Option option : options) {
 330             Objects.requireNonNull(option);
 331             if (option == Option.IGNORE_CANCEL) {
 332                 throw new IllegalArgumentException("IGNORE_CANCEL not allowed");
 333             }
 334             if (option == Option.CANCEL_AT_CLOSE) {
 335                 cancelAtClose = true;;
 336             }
 337         }
 338         return new TimedFiberScope(deadline, cancelAtClose);
 339     }
 340 
 341     /**
 342      * Creates and enters a new scope. This method is equivalent to creating the
 343      * scope with the {@linkplain #open(Option...)} method with the {@linkplain
 344      * Option#PROPAGATE_CANCEL PROPAGATE_CANCEL} option.
 345      *
 346      * <p> If the timeout expires before the owner has exited the scope then
 347      * all fibers scheduled in the scope (that haven't terminated) are cancelled.
 348      * Furthermore, if the owner is not waiting in the {@code close} method when
 349      * the timeout expires then it is {@link Fiber#cancel() cancelled} (if
 350      * it's a fiber) or {@link Thread#interrupt() interrupted} (if it's a thread).
 351      *
 352      * @param timeout the timeout
 353      * @param options options to configure the scope
 354      * @return a new scope
 355      * @throws IllegalArgumentException if {@code IGNORE_CANCEL} is specified
 356      */
 357     public static FiberScope open(Duration timeout, Option... options) {
 358         return open(Instant.now().plus(timeout), options);
 359     }
 360 
 361     /**
 362      * Invoked when a fiber is initially scheduled.
 363      *
 364      * <p> The method is invoked on the thread or the fiber in the scope that
 365      * created the given fiber. The method is invoked before the given fiber is
 366      * initially scheduled.
 367      *
 368      * @param fiber the newly created, but not scheduled, fiber
 369      */
 370     void onSchedule(Fiber<?> fiber) { }
 371 
 372     <V> Fiber<V> schedule(Fiber<V> fiber) {
 373         FiberScope scope = current();
 374         assert scope != null;
 375         while (scope != this) {
 376             scope = scope.parent;
 377             if (scope == null) {
 378                 throw new IllegalCallerException("Caller not in fiber scope");
 379             }
 380         }
 381 
 382         onSchedule(fiber);
 383 
 384         // onSchedule may have closed the scope
 385         if (closed) {
 386             throw new IllegalStateException("Scope is closed");
 387         }
 388 
 389         COUNT.getAndAdd(this, 1);
 390         return fiber.schedule(this);
 391     }
 392 
 393     /**
 394      * Creates and schedules a new {@link Fiber fiber} to run the given task in
 395      * this scope. The fiber is scheduled in this scope with the default scheduler.
 396      *
 397      * <p> If the scope owner is {@link Fiber#isCancelled() cancelled} and this
 398      * scope was created with (or inherited) the {@linkplain Option#PROPAGATE_CANCEL
 399      * PROPAGATE_CANCEL} option then the fiber is scheduled with its cancel status
 400      * set.
 401      *
 402      * @param task the task to execute
 403      * @param <V> the result type
 404      * @return the fiber
 405      * @throws IllegalCallerException if the caller thread or fiber is not
 406      *         executing in the scope
 407      */
 408     public final <V> Fiber<V> schedule(Runnable task) {
 409         return schedule(new Fiber<>(task));
 410     }
 411 
 412     /**
 413      * Creates and schedules a new {@link Fiber fiber} to run the given task in
 414      * this scope. The fiber is scheduled in this scope with the default scheduler.
 415      *
 416      * @param task the task to execute
 417      * @param <V> the result type
 418      * @return the fiber
 419      * @throws IllegalCallerException if the caller thread or fiber is not
 420      *         executing in the scope
 421      */
 422     public final <V> Fiber<V> schedule(Callable<? extends V> task) {
 423         return schedule(new Fiber<V>(task));
 424     }
 425 
 426     /**
 427      * Creates and schedules a new {@link Fiber fiber} to run the given task in
 428      * this scope. The fiber is scheduled in this scope with the given scheduler.
 429      *
 430      * @param scheduler the scheduler
 431      * @param task the task to execute
 432      * @param <V> the result type
 433      * @return the fiber
 434      * @throws IllegalCallerException if the caller thread or fiber is not
 435      *         executing in the scope
 436      */
 437     public final <V> Fiber<V> schedule(Executor scheduler, Runnable task) {
 438         return schedule(new Fiber<>(scheduler, task));
 439     }
 440 
 441     /**
 442      * Creates and schedules a new {@link Fiber fiber} to run the given task in
 443      * this scope. The fiber is scheduled in this scope with the given scheduler.
 444      *
 445      * @param scheduler the scheduler
 446      * @param task the task to execute
 447      * @param <V> the result type
 448      * @return the fiber
 449      * @throws IllegalCallerException if the caller thread or fiber is not
 450      *         executing in the scope
 451      */
 452     public final <V> Fiber<V> schedule(Executor scheduler, Callable<? extends V> task) {
 453         return schedule(new Fiber<>(scheduler, task));
 454     }
 455 
 456     /**
 457      * Invoked by a fiber when the task completes.
 458      *
 459      * @param result the task result
 460      * @param exc the exception
 461      */
 462     void onComplete(Object result, Throwable exc) { }
 463 
 464     /**
 465      * Invoked by the fiber when the task completes.
 466      */
 467     final void afterTask(Object result, Throwable exc) {
 468         onComplete(result, exc);
 469     }
 470 
 471     /**
 472     * Invoked when the owner fiber is cancelled.
 473     */
 474     void onCancel() { }
 475 
 476     /**
 477      * Invoked when a fiber is cancelled
 478      */
 479     final void afterCancel(Fiber<?> fiber) {
 480         if (cancellable && fiber == owner()) {
 481             onCancel();
 482         }
 483     }
 484 
 485     /**
 486      * Invoked on a carrier thread when a fiber terminates.
 487      */
 488     void afterTerminate() {
 489         COUNT.getAndAdd(this, -1);
 490         if (closed && count == 0) {
 491             closeLock.lock();
 492             try {
 493                 Condition condition = closeCondition;
 494                 if (condition != null) {
 495                     condition.signalAll();
 496                 }
 497             } finally {
 498                 closeLock.unlock();
 499             }
 500         }
 501     }
 502 
 503     /**
 504      * Returns true the scope is open.
 505      */
 506     final boolean isOpen() {
 507         return !closed;
 508     }
 509 
 510     /**
 511      * Invoked by the owner thread/fiber when close is invoked.
 512      */
 513     void onClose() { }
 514 
 515     void onAfterClose() { }
 516 
 517     /**
 518      * Closes and exits the scope. This method waits for all fibers scheduled in
 519      * the scope to terminate.
 520      *
 521      * <p> If this scope was created with the {@linkplain Option#CANCEL_AT_CLOSE
 522      * CANCEL_AT_CLOSE} option then all fibers scheduled in the scope that haven't
 523      * terminated are {@link Fiber#cancel() cancelled}.
 524      *
 525      * <p> Once exited, the current thread or fiber will return to the parent
 526      * scope that this thread or fiber was in before it entered this scope. If
 527      * the current fiber is cancelled and is the owner of the parent scope then
 528      * all fibers in the parent scope are cancelled.
 529      *
 530      * <p> If this scope is already closed then the ownner invoking this metho
 531      * has no effect.
 532      *
 533      * @throws IllegalCallerException if the caller is not the owner
 534      * @throws IllegalStateException if the caller is in a nested scope
 535      */
 536     @Override
 537     public final void close() {
 538         if (Strands.currentStrand() != owner)
 539             throw new IllegalCallerException();
 540         if (closed)
 541             return;
 542         if (current() != this)
 543             throw new IllegalStateException();
 544         closed = true;
 545 
 546         try {
 547             onClose();
 548         } finally {
 549             finishClose();
 550         }
 551         onAfterClose();
 552     }
 553 
 554     private void finishClose() {
 555         boolean interrupted = false;
 556         while (count > 0) {
 557             closeLock.lock();
 558             try {
 559                 Condition condition = this.closeCondition;
 560                 if (count > 0) {
 561                     if (condition == null) {
 562                         this.closeCondition = condition = closeLock.newCondition();
 563                     }
 564                     try {
 565                         condition.await();
 566                     } catch (InterruptedException e) {
 567                         interrupted = true;
 568                     }
 569                 }
 570             } finally {
 571                 closeLock.unlock();
 572             }
 573             if (interrupted) {
 574                 onCancel();
 575             }
 576         }
 577 
 578         if (interrupted) {
 579             Strands.interruptSelf();
 580         }
 581 
 582         // restore to parent scope
 583         if (owner instanceof Thread) {
 584             Thread thread = (Thread) owner;
 585             assert thread.scope() == this;
 586             thread.setScope(parent);
 587         } else {
 588             Fiber<?> fiber = (Fiber<?>) owner;
 589             assert fiber.scope() == this;
 590             fiber.setScope(parent);
 591         }
 592 
 593         // propagate cancel to parent scope
 594         if (parent != null && parent.owner() == owner && Fiber.cancelled()) {
 595             parent.afterCancel((Fiber<?>) owner);
 596         }
 597     }
 598 }
 599 
 600 /**
 601  * The background (top-most) FiberScope that does not track fibers scheduled in
 602  * the scope.
 603  */
 604 class BackgroundFiberScope extends FiberScope {
 605     BackgroundFiberScope() {
 606         super(null, true);
 607     }
 608 
 609     @Override
 610     <V> Fiber<V> schedule(Fiber<V> fiber) {
 611         return fiber.schedule(this);
 612     }
 613 
 614     @Override
 615     void afterTerminate() {
 616         // do nothing
 617     }
 618 }
 619 
 620 /**
 621  * A FiberScope that supports cancelling fibers scheduled in the scope. The
 622  * fibers can be cancelled when the owner is cancelled and/or when the owner
 623  * closes the scope.
 624  */
 625 class CancellingFiberScope extends FiberScope {
 626     private final Set<Fiber<?>> fibers = ConcurrentHashMap.newKeySet();
 627     private final boolean cancelAtClose;
 628     private final boolean propagateCancel;
 629     private volatile boolean cancelled;
 630 
 631     CancellingFiberScope(boolean cancelAtClose, boolean propagateCancel) {
 632         super(true);
 633         this.cancelAtClose = cancelAtClose;
 634         this.propagateCancel = propagateCancel;
 635     }
 636 
 637     @Override
 638     boolean propagatesCancel() {
 639         return propagateCancel;
 640     }
 641 
 642     /**
 643      * Invoked on the thread or fiber in the scope when scheduling a newly
 644      * created fiber to execute.
 645      */
 646     @Override
 647     void onSchedule(Fiber<?> fiber) {
 648         assert fiber.getState() == Thread.State.NEW;
 649         fibers.add(fiber);
 650         if (cancelled) {
 651             fiber.cancel();
 652         }
 653     }
 654 
 655     /**
 656      * Invoked by the fiber when the task completes.
 657      */
 658     @Override
 659     void onComplete(Object result, Throwable exc) {
 660         Fiber<?> fiber = Fiber.currentFiber();
 661         assert fiber != null;
 662         fibers.remove(fiber);
 663     }
 664 
 665     /**
 666      * Invoked on the (potentially arbitrary) thread or fiber when it cancels
 667      * the scope owner.
 668      */
 669     @Override
 670     void onCancel() {
 671         cancelled = true;
 672         if (propagateCancel) {
 673             fibers.forEach(Fiber::cancel);
 674         }
 675     }
 676 
 677     /**
 678      * Invoked by the owner when it closes the scope.
 679      */
 680     @Override
 681     void onClose() {
 682         if (cancelAtClose) {
 683             cancelled = true;
 684             fibers.forEach(Fiber::cancel);
 685         }
 686     }
 687 }
 688 
 689 /**
 690  * A FiberScope that supports cancelling all fibers scheduled in the scope when
 691  * a deadline is reached.
 692  */
 693 class TimedFiberScope extends CancellingFiberScope {
 694     private final Instant deadline;
 695     private final Future<?> timer;
 696 
 697     TimedFiberScope(Instant deadline, boolean cancelAtClose) {
 698         super(cancelAtClose, true);
 699         Duration timeout = Duration.between(Instant.now(), deadline);
 700         long nanos = TimeUnit.NANOSECONDS.convert(timeout);
 701         this.deadline = deadline;
 702         this.timer = timeoutScheduler.schedule(this::timeoutExpired, nanos, TimeUnit.NANOSECONDS);
 703     }
 704 
 705     Instant deadline() {
 706         return deadline;
 707     }
 708 
 709     /**
 710      * Invoked when the deadline is reached before the timer task is cancelled.
 711      */
 712     private void timeoutExpired() {
 713         // interrupt or cancel owner if it hasn't invoked close
 714         if (isOpen()) {
 715             Object owner = owner();
 716             if (owner instanceof Fiber) {
 717                 ((Fiber<?>) owner).cancel();
 718             } else {
 719                 ((Thread) owner).interrupt();
 720             }
 721         }
 722 
 723         // cancels the fibers scheduled in the scope
 724         onCancel();
 725     }
 726 
 727     /**
 728      * Invoked on the thread or fiber in the scope when scheduling a newly
 729      * created fiber to execute.
 730      */
 731     @Override
 732     void onSchedule(Fiber<?> fiber) {
 733         super.onSchedule(fiber);
 734 
 735         // cancel the fiber if the deadline has been reached
 736         if (timer.isDone()) {
 737             fiber.cancel();
 738         }
 739     }
 740 
 741     /**
 742      * Invoked by the owner thread/fiber after the scope is closed.
 743      */
 744     @Override
 745     void onAfterClose() {
 746         timer.cancel(false);
 747     }
 748 
 749     private static final ScheduledExecutorService timeoutScheduler = timeoutScheduler();
 750     private static ScheduledExecutorService timeoutScheduler() {
 751         ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
 752             Executors.newScheduledThreadPool(1, r ->
 753                 AccessController.doPrivileged(new PrivilegedAction<>() {
 754                     public Thread run() {
 755                         Thread t = new Thread(r);
 756                         t.setDaemon(true);
 757                         return t;
 758                     }
 759                 }));
 760         stpe.setRemoveOnCancelPolicy(true);
 761         return stpe;
 762     }
 763 }