1 /*
  2  * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.  Oracle designates this
  8  * particular file as subject to the "Classpath" exception as provided
  9  * by Oracle in the LICENSE file that accompanied this code.
 10  *
 11  * This code is distributed in the hope that it will be useful, but WITHOUT
 12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 14  * version 2 for more details (a copy is included in the LICENSE file that
 15  * accompanied this code).
 16  *
 17  * You should have received a copy of the GNU General Public License version
 18  * 2 along with this work; if not, write to the Free Software Foundation,
 19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 20  *
 21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 22  * or visit www.oracle.com if you need additional information or have any
 23  * questions.
 24  */
 25 package java.util.concurrent;
 26 
 27 import java.lang.invoke.MethodHandles;
 28 import java.lang.invoke.VarHandle;
 29 import java.util.ArrayList;
 30 import java.util.Comparator;
 31 import java.util.List;
 32 import java.util.NoSuchElementException;
 33 import java.util.Objects;
 34 import java.util.concurrent.StructuredTaskScope.Joiner;
 35 import java.util.concurrent.StructuredTaskScope.Subtask;
 36 import java.util.function.Predicate;
 37 import java.util.stream.Stream;
 38 import jdk.internal.invoke.MhUtil;
 39 
 40 /**
 41  * Built-in StructuredTaskScope.Joiner implementations.
 42  */
 43 class Joiners {
 44     private Joiners() { }
 45 
 46     /**
 47      * A joiner that returns a stream of all subtasks when all subtasks complete
 48      * successfully. Cancels the scope if any subtask fails.
 49      */
 50     static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
 51         private static final VarHandle FIRST_EXCEPTION =
 52                 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
 53 
 54         // list of forked subtasks, only accessed by owner thread
 55         private final List<Subtask<T>> subtasks = new ArrayList<>();
 56 
 57         private volatile Throwable firstException;
 58 
 59         @Override
 60         public boolean onFork(Subtask<? extends T> subtask) {
 61             if (subtask.state() != Subtask.State.UNAVAILABLE) {
 62                 throw new IllegalArgumentException();
 63             }
 64             @SuppressWarnings("unchecked")
 65             var s = (Subtask<T>) subtask;
 66             subtasks.add(s);
 67             return false;
 68         }
 69 
 70         @Override
 71         public boolean onComplete(Subtask<? extends T> subtask) {
 72             Subtask.State state = subtask.state();
 73             if (state == Subtask.State.UNAVAILABLE) {
 74                 throw new IllegalArgumentException();
 75             }
 76             return (state == Subtask.State.FAILED)
 77                     && (firstException == null)
 78                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
 79         }
 80 
 81         @Override
 82         public Stream<Subtask<T>> result() throws Throwable {
 83             Throwable ex = firstException;
 84             if (ex != null) {
 85                 throw ex;
 86             } else {
 87                 return subtasks.stream();
 88             }
 89         }
 90     }
 91 
 92     /**
 93      * A joiner that returns the result of the first subtask to complete successfully.
 94      * Cancels the scope if any subtasks succeeds.
 95      */
 96     static final class AnySuccessful<T> implements Joiner<T, T> {
 97         private static final VarHandle SUBTASK =
 98                 MhUtil.findVarHandle(MethodHandles.lookup(), "subtask", Subtask.class);
 99 
100         // UNAVAILABLE < FAILED < SUCCESS
101         private static final Comparator<Subtask.State> SUBTASK_STATE_COMPARATOR =
102                 Comparator.comparingInt(AnySuccessful::stateToInt);
103 
104         private volatile Subtask<T> subtask;
105 
106         /**
107          * Maps a Subtask.State to an int that can be compared.
108          */
109         private static int stateToInt(Subtask.State s) {
110             return switch (s) {
111                 case UNAVAILABLE -> 0;
112                 case FAILED      -> 1;
113                 case SUCCESS     -> 2;
114             };
115         }
116 
117         @Override
118         public boolean onComplete(Subtask<? extends T> subtask) {
119             Subtask.State state = subtask.state();
120             if (state == Subtask.State.UNAVAILABLE) {
121                 throw new IllegalArgumentException();
122             }
123             Subtask<T> s;
124             while (((s = this.subtask) == null)
125                     || SUBTASK_STATE_COMPARATOR.compare(s.state(), state) < 0) {
126                 if (SUBTASK.compareAndSet(this, s, subtask)) {
127                     return (state == Subtask.State.SUCCESS);
128                 }
129             }
130             return false;
131         }
132 
133         @Override
134         public T result() throws Throwable {
135             Subtask<T> subtask = this.subtask;
136             if (subtask == null) {
137                 throw new NoSuchElementException("No subtasks completed");
138             }
139             return switch (subtask.state()) {
140                 case SUCCESS -> subtask.get();
141                 case FAILED -> throw subtask.exception();
142                 default -> throw new InternalError();
143             };
144         }
145     }
146 
147     /**
148      * A joiner that that waits for all successful subtasks. Cancels the scope if any
149      * subtask fails.
150      */
151     static final class AwaitSuccessful<T> implements Joiner<T, Void> {
152         private static final VarHandle FIRST_EXCEPTION =
153                 MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
154         private volatile Throwable firstException;
155 
156         @Override
157         public boolean onComplete(Subtask<? extends T> subtask) {
158             Subtask.State state = subtask.state();
159             if (state == Subtask.State.UNAVAILABLE) {
160                 throw new IllegalArgumentException();
161             }
162             return (state == Subtask.State.FAILED)
163                     && (firstException == null)
164                     && FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
165         }
166 
167         @Override
168         public Void result() throws Throwable {
169             Throwable ex = firstException;
170             if (ex != null) {
171                 throw ex;
172             } else {
173                 return null;
174             }
175         }
176     }
177 
178     /**
179      * A joiner that returns a stream of all subtasks.
180      */
181     static class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
182         private final Predicate<Subtask<? extends T>> isDone;
183 
184         // list of forked subtasks, only accessed by owner thread
185         private final List<Subtask<T>> subtasks = new ArrayList<>();
186 
187         AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
188             this.isDone = Objects.requireNonNull(isDone);
189         }
190 
191         @Override
192         public boolean onFork(Subtask<? extends T> subtask) {
193             if (subtask.state() != Subtask.State.UNAVAILABLE) {
194                 throw new IllegalArgumentException();
195             }
196             @SuppressWarnings("unchecked")
197             var s = (Subtask<T>) subtask;
198             subtasks.add(s);
199             return false;
200         }
201 
202         @Override
203         public boolean onComplete(Subtask<? extends T> subtask) {
204             if (subtask.state() == Subtask.State.UNAVAILABLE) {
205                 throw new IllegalArgumentException();
206             }
207             return isDone.test(subtask);
208         }
209 
210         @Override
211         public Stream<Subtask<T>> result() {
212             return subtasks.stream();
213         }
214     }
215 }