1 /*
   2  * Copyright (c) 2016, 2023, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.BufferedReader;
  29 import java.io.FilePermission;
  30 import java.io.IOException;
  31 import java.io.InputStream;
  32 import java.io.InputStreamReader;
  33 import java.nio.ByteBuffer;
  34 import java.nio.channels.FileChannel;
  35 import java.nio.charset.Charset;
  36 import java.nio.file.OpenOption;
  37 import java.nio.file.Path;
  38 import java.security.AccessControlContext;
  39 import java.security.AccessController;
  40 import java.security.PrivilegedAction;
  41 import java.security.PrivilegedActionException;
  42 import java.security.PrivilegedExceptionAction;
  43 import java.util.ArrayList;
  44 import java.util.Iterator;
  45 import java.util.List;
  46 import java.util.Objects;
  47 import java.util.Optional;
  48 import java.util.concurrent.ArrayBlockingQueue;
  49 import java.util.concurrent.BlockingQueue;
  50 import java.util.concurrent.CompletableFuture;
  51 import java.util.concurrent.CompletionStage;
  52 import java.util.concurrent.Executor;
  53 import java.util.concurrent.Flow;
  54 import java.util.concurrent.Flow.Subscriber;
  55 import java.util.concurrent.Flow.Subscription;
  56 import java.util.concurrent.atomic.AtomicBoolean;
  57 import java.util.concurrent.atomic.AtomicReference;
  58 import java.util.function.Consumer;
  59 import java.util.function.Function;
  60 import java.util.stream.Stream;
  61 import java.net.http.HttpResponse.BodySubscriber;
  62 import jdk.internal.net.http.common.Log;
  63 import jdk.internal.net.http.common.Logger;
  64 import jdk.internal.net.http.common.MinimalFuture;
  65 import jdk.internal.net.http.common.Utils;
  66 import jdk.internal.net.http.HttpClientImpl.DelegatingExecutor;
  67 import static java.nio.charset.StandardCharsets.UTF_8;
  68 
  69 public class ResponseSubscribers {
  70 
  71     /**
  72      * This interface is used by our BodySubscriber implementations to
  73      * declare whether calling getBody() inline is safe, or whether
  74      * it needs to be called asynchronously in an executor thread.
  75      * Calling getBody() inline is usually safe except when it
  76      * might block - which can be the case if the BodySubscriber
  77      * is provided by custom code, or if it uses a finisher that
  78      * might be called and might block before the last bit is
  79      * received (for instance, if a mapping subscriber is used with
  80      * a mapper function that maps an InputStream to a GZIPInputStream,
  81      * as the constructor of GZIPInputStream calls read()).
  82      * @param <T> The response type.
  83      */
  84     public interface TrustedSubscriber<T> extends BodySubscriber<T> {
  85         /**
  86          * Returns true if getBody() should be called asynchronously.
  87          * @implSpec The default implementation of this method returns
  88          *           false.
  89          * @return true if getBody() should be called asynchronously.
  90          */
  91         default boolean needsExecutor() { return false;}
  92 
  93         /**
  94          * Returns true if calling {@code bs::getBody} might block
  95          * and requires an executor.
  96          *
  97          * @implNote
  98          * In particular this method returns
  99          * true if {@code bs} is not a {@code TrustedSubscriber}.
 100          * If it is a {@code TrustedSubscriber}, it returns
 101          * {@code ((TrustedSubscriber) bs).needsExecutor()}.
 102          *
 103          * @param bs A BodySubscriber.
 104          * @return true if calling {@code bs::getBody} requires using
 105          *         an executor.
 106          */
 107         static boolean needsExecutor(BodySubscriber<?> bs) {
 108             if (bs instanceof TrustedSubscriber) {
 109                 return ((TrustedSubscriber) bs).needsExecutor();
 110             } else return true;
 111         }
 112     }
 113 
 114     public static class ConsumerSubscriber implements TrustedSubscriber<Void> {
 115         private final Consumer<Optional<byte[]>> consumer;
 116         private Flow.Subscription subscription;
 117         private final CompletableFuture<Void> result = new MinimalFuture<>();
 118         private final AtomicBoolean subscribed = new AtomicBoolean();
 119 
 120         public ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
 121             this.consumer = Objects.requireNonNull(consumer);
 122         }
 123 
 124         @Override
 125         public CompletionStage<Void> getBody() {
 126             return result;
 127         }
 128 
 129         @Override
 130         public void onSubscribe(Flow.Subscription subscription) {
 131             Objects.requireNonNull(subscription);
 132             if (!subscribed.compareAndSet(false, true)) {
 133                 subscription.cancel();
 134             } else {
 135                 this.subscription = subscription;
 136                 subscription.request(1);
 137             }
 138         }
 139 
 140         @Override
 141         public void onNext(List<ByteBuffer> items) {
 142             Objects.requireNonNull(items);
 143             for (ByteBuffer item : items) {
 144                 byte[] buf = new byte[item.remaining()];
 145                 item.get(buf);
 146                 consumer.accept(Optional.of(buf));
 147             }
 148             subscription.request(1);
 149         }
 150 
 151         @Override
 152         public void onError(Throwable throwable) {
 153             Objects.requireNonNull(throwable);
 154             result.completeExceptionally(throwable);
 155         }
 156 
 157         @Override
 158         public void onComplete() {
 159             consumer.accept(Optional.empty());
 160             result.complete(null);
 161         }
 162 
 163     }
 164 
 165     /**
 166      * A Subscriber that writes the flow of data to a given file.
 167      *
 168      * Privileged actions are performed within a limited doPrivileged that only
 169      * asserts the specific, write, file permissions that were checked during
 170      * the construction of this PathSubscriber.
 171      */
 172     public static class PathSubscriber implements TrustedSubscriber<Path> {
 173 
 174         private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];
 175 
 176         private final Path file;
 177         private final OpenOption[] options;
 178         @SuppressWarnings("removal")
 179         private final AccessControlContext acc;
 180         private final FilePermission[] filePermissions;
 181         private final boolean isDefaultFS;
 182         private final CompletableFuture<Path> result = new MinimalFuture<>();
 183 
 184         private final AtomicBoolean subscribed = new AtomicBoolean();
 185         private volatile Flow.Subscription subscription;
 186         private volatile FileChannel out;
 187 
 188         private static final String pathForSecurityCheck(Path path) {
 189             return path.toFile().getPath();
 190         }
 191 
 192         /**
 193          * Factory for creating PathSubscriber.
 194          *
 195          * Permission checks are performed here before construction of the
 196          * PathSubscriber. Permission checking and construction are deliberately
 197          * and tightly co-located.
 198          */
 199         public static PathSubscriber create(Path file,
 200                                             List<OpenOption> options) {
 201             @SuppressWarnings("removal")
 202             SecurityManager sm = System.getSecurityManager();
 203             FilePermission filePermission = null;
 204             if (sm != null) {
 205                 try {
 206                     String fn = pathForSecurityCheck(file);
 207                     FilePermission writePermission = new FilePermission(fn, "write");
 208                     sm.checkPermission(writePermission);
 209                     filePermission = writePermission;
 210                 } catch (UnsupportedOperationException ignored) {
 211                     // path not associated with the default file system provider
 212                 }
 213             }
 214 
 215             assert filePermission == null || filePermission.getActions().equals("write");
 216             @SuppressWarnings("removal")
 217             AccessControlContext acc = sm != null ? AccessController.getContext() : null;
 218             return new PathSubscriber(file, options, acc, filePermission);
 219         }
 220 
 221         // pp so handler implementations in the same package can construct
 222         /*package-private*/ PathSubscriber(Path file,
 223                                            List<OpenOption> options,
 224                                            @SuppressWarnings("removal") AccessControlContext acc,
 225                                            FilePermission... filePermissions) {
 226             this.file = file;
 227             this.options = options.stream().toArray(OpenOption[]::new);
 228             this.acc = acc;
 229             this.filePermissions = filePermissions == null || filePermissions[0] == null
 230                             ? EMPTY_FILE_PERMISSIONS : filePermissions;
 231             this.isDefaultFS = isDefaultFS(file);
 232         }
 233 
 234         private static boolean isDefaultFS(Path file) {
 235             try {
 236                 file.toFile();
 237                 return true;
 238             } catch (UnsupportedOperationException uoe) {
 239                 return false;
 240             }
 241         }
 242 
 243         @SuppressWarnings("removal")
 244         @Override
 245         public void onSubscribe(Flow.Subscription subscription) {
 246             Objects.requireNonNull(subscription);
 247             if (!subscribed.compareAndSet(false, true)) {
 248                 subscription.cancel();
 249                 return;
 250             }
 251 
 252             this.subscription = subscription;
 253             if (acc == null) {
 254                 try {
 255                     out = FileChannel.open(file, options);
 256                 } catch (IOException ioe) {
 257                     result.completeExceptionally(ioe);
 258                     subscription.cancel();
 259                     return;
 260                 }
 261             } else {
 262                 try {
 263                     PrivilegedExceptionAction<FileChannel> pa =
 264                             () -> FileChannel.open(file, options);
 265                     out = isDefaultFS
 266                             ? AccessController.doPrivileged(pa, acc, filePermissions)
 267                             : AccessController.doPrivileged(pa, acc);
 268                 } catch (PrivilegedActionException pae) {
 269                     Throwable t = pae.getCause() != null ? pae.getCause() : pae;
 270                     result.completeExceptionally(t);
 271                     subscription.cancel();
 272                     return;
 273                 } catch (Exception e) {
 274                     result.completeExceptionally(e);
 275                     subscription.cancel();
 276                     return;
 277                 }
 278             }
 279             subscription.request(1);
 280         }
 281 
 282         @Override
 283         public void onNext(List<ByteBuffer> items) {
 284             try {
 285                 ByteBuffer[] buffers = items.toArray(Utils.EMPTY_BB_ARRAY);
 286                 do {
 287                     out.write(buffers);
 288                 } while (Utils.hasRemaining(buffers));
 289             } catch (IOException ex) {
 290                 close();
 291                 subscription.cancel();
 292                 result.completeExceptionally(ex);
 293             }
 294             subscription.request(1);
 295         }
 296 
 297         @Override
 298         public void onError(Throwable e) {
 299             result.completeExceptionally(e);
 300             close();
 301         }
 302 
 303         @Override
 304         public void onComplete() {
 305             close();
 306             result.complete(file);
 307         }
 308 
 309         @Override
 310         public CompletionStage<Path> getBody() {
 311             return result;
 312         }
 313 
 314         @SuppressWarnings("removal")
 315         private void close() {
 316             if (acc == null) {
 317                 Utils.close(out);
 318             } else {
 319                 PrivilegedAction<Void> pa = () -> {
 320                     Utils.close(out);
 321                     return null;
 322                 };
 323                 if (isDefaultFS) {
 324                     AccessController.doPrivileged(pa, acc, filePermissions);
 325                 } else {
 326                     AccessController.doPrivileged(pa, acc);
 327                 }
 328             }
 329         }
 330     }
 331 
 332     public static class ByteArraySubscriber<T> implements TrustedSubscriber<T> {
 333         private final Function<byte[], T> finisher;
 334         private final CompletableFuture<T> result = new MinimalFuture<>();
 335         private final List<ByteBuffer> received = new ArrayList<>();
 336 
 337         private volatile Flow.Subscription subscription;
 338 
 339         public ByteArraySubscriber(Function<byte[],T> finisher) {
 340             this.finisher = finisher;
 341         }
 342 
 343         @Override
 344         public void onSubscribe(Flow.Subscription subscription) {
 345             if (this.subscription != null) {
 346                 subscription.cancel();
 347                 return;
 348             }
 349             this.subscription = subscription;
 350             // We can handle whatever you've got
 351             subscription.request(Long.MAX_VALUE);
 352         }
 353 
 354         @Override
 355         public void onNext(List<ByteBuffer> items) {
 356             // incoming buffers are allocated by http client internally,
 357             // and won't be used anywhere except this place.
 358             // So it's free simply to store them for further processing.
 359             assert Utils.hasRemaining(items);
 360             received.addAll(items);
 361         }
 362 
 363         @Override
 364         public void onError(Throwable throwable) {
 365             received.clear();
 366             result.completeExceptionally(throwable);
 367         }
 368 
 369         private static byte[] join(List<ByteBuffer> bytes) {
 370             int size = Utils.remaining(bytes, Integer.MAX_VALUE);
 371             byte[] res = new byte[size];
 372             int from = 0;
 373             for (ByteBuffer b : bytes) {
 374                 int l = b.remaining();
 375                 b.get(res, from, l);
 376                 from += l;
 377             }
 378             return res;
 379         }
 380 
 381         @Override
 382         public void onComplete() {
 383             try {
 384                 result.complete(finisher.apply(join(received)));
 385                 received.clear();
 386             } catch (IllegalArgumentException e) {
 387                 result.completeExceptionally(e);
 388             }
 389         }
 390 
 391         @Override
 392         public CompletionStage<T> getBody() {
 393             return result;
 394         }
 395     }
 396 
 397     /**
 398      * An InputStream built on top of the Flow API.
 399      */
 400     public static class HttpResponseInputStream extends InputStream
 401         implements TrustedSubscriber<InputStream>
 402     {
 403         static final int MAX_BUFFERS_IN_QUEUE = 1;  // lock-step with the producer
 404 
 405         // An immutable ByteBuffer sentinel to mark that the last byte was received.
 406         private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]);
 407         private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER);
 408         private static final Logger debug =
 409                 Utils.getDebugLogger("HttpResponseInputStream"::toString, Utils.DEBUG);
 410 
 411         // A queue of yet unprocessed ByteBuffers received from the flow API.
 412         private final BlockingQueue<List<ByteBuffer>> buffers;
 413         private volatile Flow.Subscription subscription;
 414         private volatile boolean closed;
 415         private volatile Throwable failed;
 416         private volatile Iterator<ByteBuffer> currentListItr;
 417         private volatile ByteBuffer currentBuffer;
 418         private final AtomicBoolean subscribed = new AtomicBoolean();
 419 
 420         public HttpResponseInputStream() {
 421             this(MAX_BUFFERS_IN_QUEUE);
 422         }
 423 
 424         HttpResponseInputStream(int maxBuffers) {
 425             int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers);
 426             // 1 additional slot needed for LAST_LIST added by onComplete
 427             this.buffers = new ArrayBlockingQueue<>(capacity + 1);
 428         }
 429 
 430         @Override
 431         public CompletionStage<InputStream> getBody() {
 432             // Returns the stream immediately, before the
 433             // response body is received.
 434             // This makes it possible for sendAsync().get().body()
 435             // to complete before the response body is received.
 436             return CompletableFuture.completedStage(this);
 437         }
 438 
 439         // Returns the current byte buffer to read from.
 440         // If the current buffer has no remaining data, this method will take the
 441         // next buffer from the buffers queue, possibly blocking until
 442         // a new buffer is made available through the Flow API, or the
 443         // end of the flow has been reached.
 444         private ByteBuffer current() throws IOException {
 445             while (currentBuffer == null || !currentBuffer.hasRemaining()) {
 446                 // Check whether the stream is closed or exhausted
 447                 if (closed || failed != null) {
 448                     throw new IOException("closed", failed);
 449                 }
 450                 if (currentBuffer == LAST_BUFFER) break;
 451 
 452                 try {
 453                     if (currentListItr == null || !currentListItr.hasNext()) {
 454                         // Take a new list of buffers from the queue, blocking
 455                         // if none is available yet...
 456 
 457                         if (debug.on()) debug.log("Taking list of Buffers");
 458                         List<ByteBuffer> lb = buffers.take();
 459                         currentListItr = lb.iterator();
 460                         if (debug.on()) debug.log("List of Buffers Taken");
 461 
 462                         // Check whether an exception was encountered upstream
 463                         if (closed || failed != null)
 464                             throw new IOException("closed", failed);
 465 
 466                         // Check whether we're done.
 467                         if (lb == LAST_LIST) {
 468                             currentListItr = null;
 469                             currentBuffer = LAST_BUFFER;
 470                             break;
 471                         }
 472 
 473                         // Request another upstream item ( list of buffers )
 474                         Flow.Subscription s = subscription;
 475                         if (s != null) {
 476                             if (debug.on()) debug.log("Increased demand by 1");
 477                             s.request(1);
 478                         }
 479                         assert currentListItr != null;
 480                         if (lb.isEmpty()) continue;
 481                     }
 482                     assert currentListItr != null;
 483                     assert currentListItr.hasNext();
 484                     if (debug.on()) debug.log("Next Buffer");
 485                     currentBuffer = currentListItr.next();
 486                 } catch (InterruptedException ex) {
 487                     try {
 488                         close();
 489                     } catch (IOException ignored) {
 490                     }
 491                     Thread.currentThread().interrupt();
 492                     throw new IOException(ex);
 493                 }
 494             }
 495             assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();
 496             return currentBuffer;
 497         }
 498 
 499         @Override
 500         public int read(byte[] bytes, int off, int len) throws IOException {
 501             Objects.checkFromIndexSize(off, len, bytes.length);
 502             if (len == 0) {
 503                 return 0;
 504             }
 505             // get the buffer to read from, possibly blocking if
 506             // none is available
 507             ByteBuffer buffer;
 508             if ((buffer = current()) == LAST_BUFFER) return -1;
 509 
 510             // don't attempt to read more than what is available
 511             // in the current buffer.
 512             int read = Math.min(buffer.remaining(), len);
 513             assert read > 0 && read <= buffer.remaining();
 514 
 515             // buffer.get() will do the boundary check for us.
 516             buffer.get(bytes, off, read);
 517             return read;
 518         }
 519 
 520         @Override
 521         public int read() throws IOException {
 522             ByteBuffer buffer;
 523             if ((buffer = current()) == LAST_BUFFER) return -1;
 524             return buffer.get() & 0xFF;
 525         }
 526 
 527         @Override
 528         public int available() throws IOException {
 529             // best effort: returns the number of remaining bytes in
 530             // the current buffer if any, or 1 if the current buffer
 531             // is null or empty but the queue or current buffer list
 532             // are not empty. Returns 0 otherwise.
 533             if (closed) return 0;
 534             int available = 0;
 535             ByteBuffer current = currentBuffer;
 536             if (current == LAST_BUFFER) return 0;
 537             if (current != null) available = current.remaining();
 538             if (available != 0) return available;
 539             Iterator<?> iterator = currentListItr;
 540             if (iterator != null && iterator.hasNext()) return 1;
 541             if (buffers.isEmpty()) return 0;
 542             return 1;
 543         }
 544 
 545         @Override
 546         public void onSubscribe(Flow.Subscription s) {
 547             Objects.requireNonNull(s);
 548             if (debug.on()) debug.log("onSubscribe called");
 549             try {
 550                 if (!subscribed.compareAndSet(false, true)) {
 551                     if (debug.on()) debug.log("Already subscribed: canceling");
 552                     s.cancel();
 553                 } else {
 554                     // check whether the stream is already closed.
 555                     // if so, we should cancel the subscription
 556                     // immediately.
 557                     boolean closed;
 558                     synchronized (this) {
 559                         closed = this.closed;
 560                         if (!closed) {
 561                             this.subscription = s;
 562                             // should contain at least 2, unless closed or failed.
 563                             assert buffers.remainingCapacity() > 1 || failed != null
 564                                     : "buffers capacity: " + buffers.remainingCapacity()
 565                                     + ", closed: " + closed + ", terminated: "
 566                                     + buffers.contains(LAST_LIST)
 567                                     + ", failed: " + failed;
 568                         }
 569                     }
 570                     if (closed) {
 571                         if (debug.on()) debug.log("Already closed: canceling");
 572                         s.cancel();
 573                         return;
 574                     }
 575                     if (debug.on())
 576                         debug.log("onSubscribe: requesting "
 577                                   + Math.max(1, buffers.remainingCapacity() - 1));
 578                     s.request(Math.max(1, buffers.remainingCapacity() - 1));
 579                 }
 580             } catch (Throwable t) {
 581                 failed = t;
 582                 if (debug.on())
 583                     debug.log("onSubscribe failed", t);
 584                 try {
 585                     close();
 586                 } catch (IOException x) {
 587                     // OK
 588                 } finally {
 589                     onError(t);
 590                 }
 591             }
 592         }
 593 
 594         @Override
 595         public void onNext(List<ByteBuffer> t) {
 596             Objects.requireNonNull(t);
 597             try {
 598                 if (debug.on()) debug.log("next item received");
 599                 if (!buffers.offer(t)) {
 600                     throw new IllegalStateException("queue is full");
 601                 }
 602                 if (debug.on()) debug.log("item offered");
 603             } catch (Throwable ex) {
 604                 failed = ex;
 605                 try {
 606                     close();
 607                 } catch (IOException ex1) {
 608                     // OK
 609                 } finally {
 610                     onError(ex);
 611                 }
 612             }
 613         }
 614 
 615         @Override
 616         public void onError(Throwable thrwbl) {
 617             if (debug.on())
 618                 debug.log("onError called: " + thrwbl);
 619             subscription = null;
 620             failed = Objects.requireNonNull(thrwbl);
 621             // The client process that reads the input stream might
 622             // be blocked in queue.take().
 623             // Tries to offer LAST_LIST to the queue. If the queue is
 624             // full we don't care if we can't insert this buffer, as
 625             // the client can't be blocked in queue.take() in that case.
 626             // Adding LAST_LIST to the queue is harmless, as the client
 627             // should find failed != null before handling LAST_LIST.
 628             buffers.offer(LAST_LIST);
 629         }
 630 
 631         @Override
 632         public void onComplete() {
 633             if (debug.on())
 634                 debug.log("onComplete called");
 635             subscription = null;
 636             onNext(LAST_LIST);
 637         }
 638 
 639         @Override
 640         public void close() throws IOException {
 641             Flow.Subscription s;
 642             synchronized (this) {
 643                 if (closed) return;
 644                 closed = true;
 645                 s = subscription;
 646                 subscription = null;
 647             }
 648             if (debug.on())
 649                 debug.log("close called");
 650             // s will be null if already completed
 651             try {
 652                 if (s != null) {
 653                     s.cancel();
 654                 }
 655             } finally {
 656                 buffers.offer(LAST_LIST);
 657                 super.close();
 658             }
 659         }
 660 
 661     }
 662 
 663     public static BodySubscriber<Stream<String>> createLineStream() {
 664         return createLineStream(UTF_8);
 665     }
 666 
 667     public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
 668         Objects.requireNonNull(charset);
 669         BodySubscriber<InputStream> s = new HttpResponseInputStream();
 670         // Creates a MappingSubscriber with a trusted finisher that is
 671         // trusted not to block.
 672         return new MappingSubscriber<InputStream,Stream<String>>(s,
 673             (InputStream stream) -> {
 674                 return new BufferedReader(new InputStreamReader(stream, charset))
 675                             .lines().onClose(() -> Utils.close(stream));
 676             }, true);
 677     }
 678 
 679     /**
 680      * Currently this consumes all of the data and ignores it
 681      */
 682     public static class NullSubscriber<T> implements TrustedSubscriber<T> {
 683 
 684         private final CompletableFuture<T> cf = new MinimalFuture<>();
 685         private final Optional<T> result;
 686         private final AtomicBoolean subscribed = new AtomicBoolean();
 687 
 688         public NullSubscriber(Optional<T> result) {
 689             this.result = result;
 690         }
 691 
 692         @Override
 693         public void onSubscribe(Flow.Subscription subscription) {
 694             Objects.requireNonNull(subscription);
 695             if (!subscribed.compareAndSet(false, true)) {
 696                 subscription.cancel();
 697             } else {
 698                 subscription.request(Long.MAX_VALUE);
 699             }
 700         }
 701 
 702         @Override
 703         public void onNext(List<ByteBuffer> items) {
 704             Objects.requireNonNull(items);
 705         }
 706 
 707         @Override
 708         public void onError(Throwable throwable) {
 709             Objects.requireNonNull(throwable);
 710             cf.completeExceptionally(throwable);
 711         }
 712 
 713         @Override
 714         public void onComplete() {
 715             if (result.isPresent()) {
 716                 cf.complete(result.get());
 717             } else {
 718                 cf.complete(null);
 719             }
 720         }
 721 
 722         @Override
 723         public CompletionStage<T> getBody() {
 724             return cf;
 725         }
 726     }
 727 
 728     /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
 729     public static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
 730         implements TrustedSubscriber<R>
 731     {
 732         private final CompletableFuture<R> cf = new MinimalFuture<>();
 733         private final S subscriber;
 734         private final Function<? super S,? extends R> finisher;
 735         private volatile Subscription subscription;
 736 
 737         // The finisher isn't called until all bytes have been received,
 738         // and so shouldn't need an executor. No need to override
 739         // TrustedSubscriber::needsExecutor
 740         public SubscriberAdapter(S subscriber, Function<? super S,? extends R> finisher) {
 741             this.subscriber = Objects.requireNonNull(subscriber);
 742             this.finisher = Objects.requireNonNull(finisher);
 743         }
 744 
 745         @Override
 746         public void onSubscribe(Subscription subscription) {
 747             Objects.requireNonNull(subscription);
 748             if (this.subscription != null) {
 749                 subscription.cancel();
 750             } else {
 751                 this.subscription = subscription;
 752                 subscriber.onSubscribe(subscription);
 753             }
 754         }
 755 
 756         @Override
 757         public void onNext(List<ByteBuffer> item) {
 758             Objects.requireNonNull(item);
 759             try {
 760                 subscriber.onNext(item);
 761             } catch (Throwable throwable) {
 762                 subscription.cancel();
 763                 onError(throwable);
 764             }
 765         }
 766 
 767         @Override
 768         public void onError(Throwable throwable) {
 769             Objects.requireNonNull(throwable);
 770             try {
 771                 subscriber.onError(throwable);
 772             } finally {
 773                 cf.completeExceptionally(throwable);
 774             }
 775         }
 776 
 777         @Override
 778         public void onComplete() {
 779             try {
 780                 subscriber.onComplete();
 781             } finally {
 782                 try {
 783                     cf.complete(finisher.apply(subscriber));
 784                 } catch (Throwable throwable) {
 785                     cf.completeExceptionally(throwable);
 786                 }
 787             }
 788         }
 789 
 790         @Override
 791         public CompletionStage<R> getBody() {
 792             return cf;
 793         }
 794     }
 795 
 796     /**
 797      * A body subscriber which receives input from an upstream subscriber
 798      * and maps that subscriber's body type to a new type. The upstream subscriber
 799      * delegates all flow operations directly to this object. The
 800      * {@link CompletionStage} returned by {@link #getBody()}} takes the output
 801      * of the upstream {@code getBody()} and applies the mapper function to
 802      * obtain the new {@code CompletionStage} type.
 803      *
 804      * @param <T> the upstream body type
 805      * @param <U> this subscriber's body type
 806      */
 807     public static class MappingSubscriber<T,U> implements TrustedSubscriber<U> {
 808         private final BodySubscriber<T> upstream;
 809         private final Function<? super T,? extends U> mapper;
 810         private final boolean trusted;
 811 
 812         public MappingSubscriber(BodySubscriber<T> upstream,
 813                                  Function<? super T,? extends U> mapper) {
 814             this(upstream, mapper, false);
 815         }
 816 
 817         // creates a MappingSubscriber with a mapper that is trusted
 818         // to not block when called.
 819         MappingSubscriber(BodySubscriber<T> upstream,
 820                           Function<? super T,? extends U> mapper,
 821                           boolean trusted) {
 822             this.upstream = Objects.requireNonNull(upstream);
 823             this.mapper = Objects.requireNonNull(mapper);
 824             this.trusted = trusted;
 825         }
 826 
 827         // There is no way to know whether a custom mapper function
 828         // might block or not - so we should return true unless the
 829         // mapper is implemented and trusted by our own code not to
 830         // block.
 831         @Override
 832         public boolean needsExecutor() {
 833             return !trusted || TrustedSubscriber.needsExecutor(upstream);
 834         }
 835 
 836         // If upstream.getBody() is already completed (case of InputStream),
 837         // then calling upstream.getBody().thenApply(mapper) might block
 838         // if the mapper blocks. We should probably add a variant of
 839         // MappingSubscriber that calls thenApplyAsync instead, but this
 840         // needs a new public API point. See needsExecutor() above.
 841         @Override
 842         public CompletionStage<U> getBody() {
 843             return upstream.getBody().thenApply(mapper);
 844         }
 845 
 846         @Override
 847         public void onSubscribe(Flow.Subscription subscription) {
 848             upstream.onSubscribe(subscription);
 849         }
 850 
 851         @Override
 852         public void onNext(List<ByteBuffer> item) {
 853             upstream.onNext(item);
 854         }
 855 
 856         @Override
 857         public void onError(Throwable throwable) {
 858             upstream.onError(throwable);
 859         }
 860 
 861         @Override
 862         public void onComplete() {
 863             upstream.onComplete();
 864         }
 865     }
 866 
 867     // A BodySubscriber that returns a Publisher<List<ByteBuffer>>
 868     static class PublishingBodySubscriber
 869             implements TrustedSubscriber<Flow.Publisher<List<ByteBuffer>>> {
 870         private final MinimalFuture<Flow.Subscription>
 871                 subscriptionCF = new MinimalFuture<>();
 872         private final MinimalFuture<SubscriberRef>
 873                 subscribedCF = new MinimalFuture<>();
 874         private AtomicReference<SubscriberRef>
 875                 subscriberRef = new AtomicReference<>();
 876         private final CompletionStage<Flow.Publisher<List<ByteBuffer>>> body =
 877                 subscriptionCF.thenCompose(
 878                         (s) -> MinimalFuture.completedFuture(this::subscribe));
 879 
 880         // We use the completionCF to ensure that only one of
 881         // onError or onComplete is ever called.
 882         private final MinimalFuture<Void> completionCF;
 883         private PublishingBodySubscriber() {
 884             completionCF = new MinimalFuture<>();
 885             completionCF.whenComplete(
 886                     (r,t) -> subscribedCF.thenAccept( s -> complete(s, t)));
 887         }
 888 
 889         // An object that holds a reference to a Flow.Subscriber.
 890         // The reference is cleared when the subscriber is completed - either
 891         // normally or exceptionally, or when the subscription is cancelled.
 892         static final class SubscriberRef {
 893             volatile Flow.Subscriber<? super List<ByteBuffer>> ref;
 894             SubscriberRef(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 895                 ref = subscriber;
 896             }
 897             Flow.Subscriber<? super List<ByteBuffer>> get() {
 898                 return ref;
 899             }
 900             Flow.Subscriber<? super List<ByteBuffer>> clear() {
 901                 Flow.Subscriber<? super List<ByteBuffer>> res = ref;
 902                 ref = null;
 903                 return res;
 904             }
 905         }
 906 
 907         // A subscription that wraps an upstream subscription and
 908         // holds a reference to a subscriber. The subscriber reference
 909         // is cleared when the subscription is cancelled
 910         static final class SubscriptionRef implements Flow.Subscription {
 911             final Flow.Subscription subscription;
 912             final SubscriberRef subscriberRef;
 913             SubscriptionRef(Flow.Subscription subscription,
 914                             SubscriberRef subscriberRef) {
 915                 this.subscription = subscription;
 916                 this.subscriberRef = subscriberRef;
 917             }
 918             @Override
 919             public void request(long n) {
 920                 if (subscriberRef.get() != null) {
 921                     subscription.request(n);
 922                 }
 923             }
 924             @Override
 925             public void cancel() {
 926                 subscription.cancel();
 927                 subscriberRef.clear();
 928             }
 929 
 930             void subscribe() {
 931                 Subscriber<?> subscriber = subscriberRef.get();
 932                 if (subscriber != null) {
 933                     subscriber.onSubscribe(this);
 934                 }
 935             }
 936 
 937             @Override
 938             public String toString() {
 939                 return "SubscriptionRef/"
 940                         + subscription.getClass().getName()
 941                         + "@"
 942                         + System.identityHashCode(subscription);
 943             }
 944         }
 945 
 946         // This is a callback for the subscribedCF.
 947         // Do not call directly!
 948         private void complete(SubscriberRef ref, Throwable t) {
 949             assert ref != null;
 950             Subscriber<?> s = ref.clear();
 951             // maybe null if subscription was cancelled
 952             if (s == null) return;
 953             if (t == null) {
 954                 try {
 955                     s.onComplete();
 956                 } catch (Throwable x) {
 957                     s.onError(x);
 958                 }
 959             } else {
 960                 s.onError(t);
 961             }
 962         }
 963 
 964         private void signalError(Throwable err) {
 965             if (err == null) {
 966                 err = new NullPointerException("null throwable");
 967             }
 968             completionCF.completeExceptionally(err);
 969         }
 970 
 971         private void signalComplete() {
 972             completionCF.complete(null);
 973         }
 974 
 975         private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 976             Objects.requireNonNull(subscriber, "subscriber must not be null");
 977             SubscriberRef ref = new SubscriberRef(subscriber);
 978             if (subscriberRef.compareAndSet(null, ref)) {
 979                 subscriptionCF.thenAccept((s) -> {
 980                     SubscriptionRef subscription = new SubscriptionRef(s,ref);
 981                     try {
 982                         subscription.subscribe();
 983                         subscribedCF.complete(ref);
 984                     } catch (Throwable t) {
 985                         if (Log.errors()) {
 986                             Log.logError("Failed to call onSubscribe: " +
 987                                     "cancelling subscription: " + t);
 988                             Log.logError(t);
 989                         }
 990                         subscription.cancel();
 991                     }
 992                 });
 993             } else {
 994                 subscriber.onSubscribe(new Flow.Subscription() {
 995                     @Override public void request(long n) { }
 996                     @Override public void cancel() { }
 997                 });
 998                 subscriber.onError(new IllegalStateException(
 999                         "This publisher has already one subscriber"));
1000             }
1001         }
1002 
1003         private final AtomicBoolean subscribed = new AtomicBoolean();
1004 
1005         @Override
1006         public void onSubscribe(Flow.Subscription subscription) {
1007             Objects.requireNonNull(subscription);
1008             if (!subscribed.compareAndSet(false, true)) {
1009                 subscription.cancel();
1010             } else {
1011                 subscriptionCF.complete(subscription);
1012             }
1013         }
1014 
1015         @Override
1016         public void onNext(List<ByteBuffer> item) {
1017             Objects.requireNonNull(item);
1018             try {
1019                 // cannot be called before onSubscribe()
1020                 assert subscriptionCF.isDone();
1021                 SubscriberRef ref = subscriberRef.get();
1022                 // cannot be called before subscriber calls request(1)
1023                 assert ref != null;
1024                 Flow.Subscriber<? super List<ByteBuffer>>
1025                         subscriber = ref.get();
1026                 if (subscriber != null) {
1027                     // may be null if subscription was cancelled.
1028                     subscriber.onNext(item);
1029                 }
1030             } catch (Throwable err) {
1031                 signalError(err);
1032                 subscriptionCF.thenAccept(s -> s.cancel());
1033             }
1034         }
1035 
1036         @Override
1037         public void onError(Throwable throwable) {
1038             // cannot be called before onSubscribe();
1039             assert suppress(subscriptionCF.isDone(),
1040                     "onError called before onSubscribe",
1041                     throwable);
1042             // onError can be called before request(1), and therefore can
1043             // be called before subscriberRef is set.
1044             signalError(throwable);
1045             Objects.requireNonNull(throwable);
1046         }
1047 
1048         @Override
1049         public void onComplete() {
1050             // cannot be called before onSubscribe()
1051             if (!subscriptionCF.isDone()) {
1052                 signalError(new InternalError(
1053                         "onComplete called before onSubscribed"));
1054             } else {
1055                 // onComplete can be called before request(1),
1056                 // and therefore can be called before subscriberRef
1057                 // is set.
1058                 signalComplete();
1059             }
1060         }
1061 
1062         @Override
1063         public CompletionStage<Flow.Publisher<List<ByteBuffer>>> getBody() {
1064             return body;
1065         }
1066 
1067         private boolean suppress(boolean condition,
1068                                  String assertion,
1069                                  Throwable carrier) {
1070             if (!condition) {
1071                 if (carrier != null) {
1072                     carrier.addSuppressed(new AssertionError(assertion));
1073                 } else if (Log.errors()) {
1074                     Log.logError(new AssertionError(assertion));
1075                 }
1076             }
1077             return true;
1078         }
1079 
1080     }
1081 
1082     public static BodySubscriber<Flow.Publisher<List<ByteBuffer>>>
1083     createPublisher() {
1084         return new PublishingBodySubscriber();
1085     }
1086 
1087 
1088     /**
1089      * Tries to determine whether bs::getBody must be invoked asynchronously,
1090      * and if so, uses the provided executor to do it.
1091      * If the executor is a {@link HttpClientImpl.DelegatingExecutor},
1092      * uses the executor's delegate.
1093      * @param e    The executor to use if an executor is required.
1094      * @param bs   The BodySubscriber (trusted or not)
1095      * @param <T>  The type of the response.
1096      * @return A completion stage that completes when the completion
1097      *         stage returned by bs::getBody completes. This may, or
1098      *         may not, be the same completion stage.
1099      */
1100     public static <T> CompletionStage<T> getBodyAsync(Executor e, BodySubscriber<T> bs) {
1101         if (TrustedSubscriber.needsExecutor(bs)) {
1102             // getBody must be called in the executor
1103             return getBodyAsync(e, bs, new MinimalFuture<>());
1104         } else {
1105             // No executor needed
1106             return bs.getBody();
1107         }
1108     }
1109 
1110     /**
1111      * Invokes bs::getBody using the provided executor.
1112      * If invoking bs::getBody requires an executor, and the given executor
1113      * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
1114      * delegate is used. If an error occurs anywhere then the given {@code cf}
1115      * is completed exceptionally (this method does not throw).
1116      * @param e   The executor that should be used to call bs::getBody
1117      * @param bs  The BodySubscriber
1118      * @param cf  A completable future that this function will set up
1119      *            to complete when the completion stage returned by
1120      *            bs::getBody completes.
1121      *            In case of any error while trying to set up the
1122      *            completion chain, {@code cf} will be completed
1123      *            exceptionally with that error.
1124      * @param <T> The response type.
1125      * @return The provided {@code cf}.
1126      */
1127     public static <T> CompletableFuture<T> getBodyAsync(Executor e,
1128                                                       BodySubscriber<T> bs,
1129                                                       CompletableFuture<T> cf) {
1130         return getBodyAsync(e, bs, cf, cf::completeExceptionally);
1131     }
1132 
1133     /**
1134      * Invokes bs::getBody using the provided executor.
1135      * If invoking bs::getBody requires an executor, and the given executor
1136      * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
1137      * delegate is used.
1138      * The provided {@code cf} is completed with the result (exceptional
1139      * or not) of the completion stage returned by bs::getBody.
1140      * If an error occurs when trying to set up the
1141      * completion chain, the provided {@code errorHandler} is invoked,
1142      * but {@code cf} is not necessarily affected.
1143      * This method does not throw.
1144      * @param e   The executor that should be used to call bs::getBody
1145      * @param bs  The BodySubscriber
1146      * @param cf  A completable future that this function will set up
1147      *            to complete when the completion stage returned by
1148      *            bs::getBody completes.
1149      *            In case of any error while trying to set up the
1150      *            completion chain, {@code cf} will be completed
1151      *            exceptionally with that error.
1152      * @param errorHandler The handler to invoke if an error is raised
1153      *                     while trying to set up the completion chain.
1154      * @param <T> The response type.
1155      * @return The provide {@code cf}. If the {@code errorHandler} is
1156      * invoked, it is the responsibility of the {@code errorHandler} to
1157      * complete the {@code cf}, if needed.
1158      */
1159     public static <T> CompletableFuture<T> getBodyAsync(Executor e,
1160                                                       BodySubscriber<T> bs,
1161                                                       CompletableFuture<T> cf,
1162                                                       Consumer<Throwable> errorHandler) {
1163         assert errorHandler != null;
1164         try {
1165             assert e != null;
1166             assert cf != null;
1167 
1168             if (TrustedSubscriber.needsExecutor(bs)) {
1169                 e = (e instanceof DelegatingExecutor exec)
1170                         ? exec::ensureExecutedAsync : e;
1171             }
1172 
1173             e.execute(() -> {
1174                 try {
1175                     bs.getBody().whenComplete((r, t) -> {
1176                         if (t != null) {
1177                             cf.completeExceptionally(t);
1178                         } else {
1179                             cf.complete(r);
1180                         }
1181                     });
1182                 } catch (Throwable t) {
1183                     // the errorHandler will complete the CF
1184                     errorHandler.accept(t);
1185                 }
1186             });
1187             return cf;
1188 
1189         } catch (Throwable t) {
1190             // the errorHandler will complete the CF
1191             errorHandler.accept(t);
1192         }
1193         return cf;
1194     }
1195 }