1 /*
  2  * Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package java.util.concurrent;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.util.ArrayList;
 30 import java.util.Comparator;
 31 import java.util.List;
 32 import java.util.NoSuchElementException;
 33 import java.util.Objects;
 34 import java.util.concurrent.StructuredTaskScope.Joiner;
 35 import java.util.concurrent.StructuredTaskScope.Subtask;
 36 import java.util.function.Predicate;
 37 import java.util.stream.Stream;
 38 import jdk.internal.invoke.MhUtil;
 39 
 40 /**
 41  * Built-in StructuredTaskScope.Joiner implementations.
 42  */
 43 class Joiners {
 44     private Joiners() { }
 45 
 46     /**
 47      * Throws IllegalArgumentException if the subtask is not in the UNAVAILABLE state.
 48      */
 49     private static void ensureUnavailable(Subtask<?> subtask) {
 50         if (subtask.state() != Subtask.State.UNAVAILABLE) {
 51             throw new IllegalArgumentException("Subtask not in UNAVAILABLE state");
 52         }
 53     }
 54 
 55     /**
 56      * Throws IllegalArgumentException if the subtask has not completed.
 57      */
 58     private static Subtask.State ensureCompleted(Subtask<?> subtask) {
 59         Subtask.State state = subtask.state();
 60         if (state == Subtask.State.UNAVAILABLE) {
 61             throw new IllegalArgumentException("Subtask has not completed");
 62         }
 63         return state;
 64     }
 65 
 66     /**
 67      * A joiner that returns a stream of all subtasks when all subtasks complete
 68      * successfully. Cancels the scope if any subtask fails.
 69      */
 70     static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
 71         private static final VarHandle FIRST_EXCEPTION =
 72                 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
 73 
 74         // list of forked subtasks, only accessed by owner thread
 75         private final List<Subtask<T>> subtasks = new ArrayList<>();
 76 
 77         private volatile Throwable firstException;
 78 
 79         @Override
 80         public boolean onFork(Subtask<? extends T> subtask) {
 81             ensureUnavailable(subtask);
 82             @SuppressWarnings("unchecked")
 83             var s = (Subtask<T>) subtask;
 84             subtasks.add(s);
 85             return false;
 86         }
 87 
 88         @Override
 89         public boolean onComplete(Subtask<? extends T> subtask) {
 90             Subtask.State state = ensureCompleted(subtask);
 91             return (state == Subtask.State.FAILED)
 92                     && (firstException == null)
 93                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
 94         }
 95 
 96         @Override
 97         public Stream<Subtask<T>> result() throws Throwable {
 98             Throwable ex = firstException;
 99             if (ex != null) {
100                 throw ex;
101             } else {
102                 return subtasks.stream();
103             }
104         }
105     }
106 
107     /**
108      * A joiner that returns the result of the first subtask to complete successfully.
109      * Cancels the scope if any subtasks succeeds.
110      */
111     static final class AnySuccessful<T> implements Joiner<T, T> {
112         private static final VarHandle SUBTASK =
113                 MhUtil.findVarHandle(MethodHandles.lookup(), "subtask", Subtask.class);
114 
115         // UNAVAILABLE < FAILED < SUCCESS
116         private static final Comparator<Subtask.State> SUBTASK_STATE_COMPARATOR =
117                 Comparator.comparingInt(AnySuccessful::stateToInt);
118 
119         private volatile Subtask<T> subtask;
120 
121         /**
122          * Maps a Subtask.State to an int that can be compared.
123          */
124         private static int stateToInt(Subtask.State s) {
125             return switch (s) {
126                 case UNAVAILABLE -> 0;
127                 case FAILED      -> 1;
128                 case SUCCESS     -> 2;
129             };
130         }
131 
132         @Override
133         public boolean onComplete(Subtask<? extends T> subtask) {
134             Subtask.State state = ensureCompleted(subtask);
135             Subtask<T> s;
136             while (((s = this.subtask) == null)
137                     || SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) {
138                 if (SUBTASK.compareAndSet(this, s, subtask)) {
139                     return (state == Subtask.State.SUCCESS);
140                 }
141             }
142             return false;
143         }
144 
145         @Override
146         public T result() throws Throwable {
147             Subtask<T> subtask = this.subtask;
148             if (subtask == null) {
149                 throw new NoSuchElementException("No subtasks completed");
150             }
151             return switch (subtask.state()) {
152                 case SUCCESS -> subtask.get();
153                 case FAILED -> throw subtask.exception();
154                 default -> throw new InternalError();
155             };
156         }
157     }
158 
159     /**
160      * A joiner that that waits for all successful subtasks. Cancels the scope if any
161      * subtask fails.
162      */
163     static final class AwaitSuccessful<T> implements Joiner<T, Void> {
164         private static final VarHandle FIRST_EXCEPTION =
165                 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
166         private volatile Throwable firstException;
167 
168         @Override
169         public boolean onComplete(Subtask<? extends T> subtask) {
170             Subtask.State state = ensureCompleted(subtask);
171             return (state == Subtask.State.FAILED)
172                     && (firstException == null)
173                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
174         }
175 
176         @Override
177         public Void result() throws Throwable {
178             Throwable ex = firstException;
179             if (ex != null) {
180                 throw ex;
181             } else {
182                 return null;
183             }
184         }
185     }
186 
187     /**
188      * A joiner that returns a stream of all subtasks.
189      */
190     static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
191         private final Predicate<Subtask<? extends T>> isDone;
192 
193         // list of forked subtasks, only accessed by owner thread
194         private final List<Subtask<T>> subtasks = new ArrayList<>();
195 
196         AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
197             this.isDone = Objects.requireNonNull(isDone);
198         }
199 
200         @Override
201         public boolean onFork(Subtask<? extends T> subtask) {
202             ensureUnavailable(subtask);
203             @SuppressWarnings("unchecked")
204             var s = (Subtask<T>) subtask;
205             subtasks.add(s);
206             return false;
207         }
208 
209         @Override
210         public boolean onComplete(Subtask<? extends T> subtask) {
211             ensureCompleted(subtask);
212             return isDone.test(subtask);
213         }
214 
215         @Override
216         public Stream<Subtask<T>> result() {
217             return subtasks.stream();
218         }
219     }
220 }