1 /*
   2  * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.BufferedReader;
  29 import java.io.FilePermission;
  30 import java.io.IOException;
  31 import java.io.InputStream;
  32 import java.io.InputStreamReader;
  33 import java.nio.ByteBuffer;
  34 import java.nio.channels.FileChannel;
  35 import java.nio.charset.Charset;
  36 import java.nio.file.OpenOption;
  37 import java.nio.file.Path;
  38 import java.security.AccessController;
  39 import java.security.PrivilegedActionException;
  40 import java.security.PrivilegedExceptionAction;
  41 import java.util.ArrayList;
  42 import java.util.Iterator;
  43 import java.util.List;
  44 import java.util.Objects;
  45 import java.util.Optional;
  46 import java.util.concurrent.ArrayBlockingQueue;
  47 import java.util.concurrent.BlockingQueue;
  48 import java.util.concurrent.CompletableFuture;
  49 import java.util.concurrent.CompletionStage;
  50 import java.util.concurrent.Executor;
  51 import java.util.concurrent.Flow;
  52 import java.util.concurrent.Flow.Subscriber;
  53 import java.util.concurrent.Flow.Subscription;
  54 import java.util.concurrent.atomic.AtomicBoolean;
  55 import java.util.concurrent.atomic.AtomicReference;
  56 import java.util.function.Consumer;
  57 import java.util.function.Function;
  58 import java.util.stream.Stream;
  59 import java.net.http.HttpResponse.BodySubscriber;
  60 import jdk.internal.net.http.common.Log;
  61 import jdk.internal.net.http.common.Logger;
  62 import jdk.internal.net.http.common.MinimalFuture;
  63 import jdk.internal.net.http.common.Utils;
  64 import static java.nio.charset.StandardCharsets.UTF_8;
  65 
  66 public class ResponseSubscribers {
  67 
  68     /**
  69      * This interface is used by our BodySubscriber implementations to
  70      * declare whether calling getBody() inline is safe, or whether
  71      * it needs to be called asynchronously in an executor thread.
  72      * Calling getBody() inline is usually safe except when it
  73      * might block - which can be the case if the BodySubscriber
  74      * is provided by custom code, or if it uses a finisher that
  75      * might be called and might block before the last bit is
  76      * received (for instance, if a mapping subscriber is used with
  77      * a mapper function that maps an InputStream to a GZIPInputStream,
  78      * as the the constructor of GZIPInputStream calls read()).
  79      * @param <T> The response type.
  80      */
  81     public interface TrustedSubscriber<T> extends BodySubscriber<T> {
  82         /**
  83          * Returns true if getBody() should be called asynchronously.
  84          * @implSpec The default implementation of this method returns
  85          *           false.
  86          * @return true if getBody() should be called asynchronously.
  87          */
  88         default boolean needsExecutor() { return false;}
  89 
  90         /**
  91          * Returns true if calling {@code bs::getBody} might block
  92          * and requires an executor.
  93          *
  94          * @implNote
  95          * In particular this method returns
  96          * true if {@code bs} is not a {@code TrustedSubscriber}.
  97          * If it is a {@code TrustedSubscriber}, it returns
  98          * {@code ((TrustedSubscriber) bs).needsExecutor()}.
  99          *
 100          * @param bs A BodySubscriber.
 101          * @return true if calling {@code bs::getBody} requires using
 102          *         an executor.
 103          */
 104         static boolean needsExecutor(BodySubscriber<?> bs) {
 105             if (bs instanceof TrustedSubscriber) {
 106                 return ((TrustedSubscriber) bs).needsExecutor();
 107             } else return true;
 108         }
 109     }
 110 
 111     public static class ConsumerSubscriber implements TrustedSubscriber<Void> {
 112         private final Consumer<Optional<byte[]>> consumer;
 113         private Flow.Subscription subscription;
 114         private final CompletableFuture<Void> result = new MinimalFuture<>();
 115         private final AtomicBoolean subscribed = new AtomicBoolean();
 116 
 117         public ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
 118             this.consumer = Objects.requireNonNull(consumer);
 119         }
 120 
 121         @Override
 122         public CompletionStage<Void> getBody() {
 123             return result;
 124         }
 125 
 126         @Override
 127         public void onSubscribe(Flow.Subscription subscription) {
 128             if (!subscribed.compareAndSet(false, true)) {
 129                 subscription.cancel();
 130             } else {
 131                 this.subscription = subscription;
 132                 subscription.request(1);
 133             }
 134         }
 135 
 136         @Override
 137         public void onNext(List<ByteBuffer> items) {
 138             for (ByteBuffer item : items) {
 139                 byte[] buf = new byte[item.remaining()];
 140                 item.get(buf);
 141                 consumer.accept(Optional.of(buf));
 142             }
 143             subscription.request(1);
 144         }
 145 
 146         @Override
 147         public void onError(Throwable throwable) {
 148             result.completeExceptionally(throwable);
 149         }
 150 
 151         @Override
 152         public void onComplete() {
 153             consumer.accept(Optional.empty());
 154             result.complete(null);
 155         }
 156 
 157     }
 158 
 159     /**
 160      * A Subscriber that writes the flow of data to a given file.
 161      *
 162      * Privileged actions are performed within a limited doPrivileged that only
 163      * asserts the specific, write, file permissions that were checked during
 164      * the construction of this PathSubscriber.
 165      */
 166     public static class PathSubscriber implements TrustedSubscriber<Path> {
 167 
 168         private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];
 169 
 170         private final Path file;
 171         private final OpenOption[] options;
 172         private final FilePermission[] filePermissions;
 173         private final CompletableFuture<Path> result = new MinimalFuture<>();
 174 
 175         private volatile Flow.Subscription subscription;
 176         private volatile FileChannel out;
 177 
 178         private static final String pathForSecurityCheck(Path path) {
 179             return path.toFile().getPath();
 180         }
 181 
 182         /**
 183          * Factory for creating PathSubscriber.
 184          *
 185          * Permission checks are performed here before construction of the
 186          * PathSubscriber. Permission checking and construction are deliberately
 187          * and tightly co-located.
 188          */
 189         public static PathSubscriber create(Path file,
 190                                             List<OpenOption> options) {
 191             FilePermission filePermission = null;
 192             SecurityManager sm = System.getSecurityManager();
 193             if (sm != null) {
 194                 String fn = pathForSecurityCheck(file);
 195                 FilePermission writePermission = new FilePermission(fn, "write");
 196                 sm.checkPermission(writePermission);
 197                 filePermission = writePermission;
 198             }
 199             return new PathSubscriber(file, options, filePermission);
 200         }
 201 
 202         // pp so handler implementations in the same package can construct
 203         /*package-private*/ PathSubscriber(Path file,
 204                                            List<OpenOption> options,
 205                                            FilePermission... filePermissions) {
 206             this.file = file;
 207             this.options = options.stream().toArray(OpenOption[]::new);
 208             this.filePermissions =
 209                     filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions;
 210         }
 211 
 212         @Override
 213         public void onSubscribe(Flow.Subscription subscription) {
 214             this.subscription = subscription;
 215             if (System.getSecurityManager() == null) {
 216                 try {
 217                     out = FileChannel.open(file, options);
 218                 } catch (IOException ioe) {
 219                     result.completeExceptionally(ioe);
 220                     return;
 221                 }
 222             } else {
 223                 try {
 224                     PrivilegedExceptionAction<FileChannel> pa =
 225                             () -> FileChannel.open(file, options);
 226                     out = AccessController.doPrivileged(pa, null, filePermissions);
 227                 } catch (PrivilegedActionException pae) {
 228                     Throwable t = pae.getCause() != null ? pae.getCause() : pae;
 229                     result.completeExceptionally(t);
 230                     subscription.cancel();
 231                     return;
 232                 }
 233             }
 234             subscription.request(1);
 235         }
 236 
 237         @Override
 238         public void onNext(List<ByteBuffer> items) {
 239             try {
 240                 out.write(items.toArray(Utils.EMPTY_BB_ARRAY));
 241             } catch (IOException ex) {
 242                 Utils.close(out);
 243                 subscription.cancel();
 244                 result.completeExceptionally(ex);
 245             }
 246             subscription.request(1);
 247         }
 248 
 249         @Override
 250         public void onError(Throwable e) {
 251             result.completeExceptionally(e);
 252             Utils.close(out);
 253         }
 254 
 255         @Override
 256         public void onComplete() {
 257             Utils.close(out);
 258             result.complete(file);
 259         }
 260 
 261         @Override
 262         public CompletionStage<Path> getBody() {
 263             return result;
 264         }
 265     }
 266 
 267     public static class ByteArraySubscriber<T> implements TrustedSubscriber<T> {
 268         private final Function<byte[], T> finisher;
 269         private final CompletableFuture<T> result = new MinimalFuture<>();
 270         private final List<ByteBuffer> received = new ArrayList<>();
 271 
 272         private volatile Flow.Subscription subscription;
 273 
 274         public ByteArraySubscriber(Function<byte[],T> finisher) {
 275             this.finisher = finisher;
 276         }
 277 
 278         @Override
 279         public void onSubscribe(Flow.Subscription subscription) {
 280             if (this.subscription != null) {
 281                 subscription.cancel();
 282                 return;
 283             }
 284             this.subscription = subscription;
 285             // We can handle whatever you've got
 286             subscription.request(Long.MAX_VALUE);
 287         }
 288 
 289         @Override
 290         public void onNext(List<ByteBuffer> items) {
 291             // incoming buffers are allocated by http client internally,
 292             // and won't be used anywhere except this place.
 293             // So it's free simply to store them for further processing.
 294             assert Utils.hasRemaining(items);
 295             received.addAll(items);
 296         }
 297 
 298         @Override
 299         public void onError(Throwable throwable) {
 300             received.clear();
 301             result.completeExceptionally(throwable);
 302         }
 303 
 304         static private byte[] join(List<ByteBuffer> bytes) {
 305             int size = Utils.remaining(bytes, Integer.MAX_VALUE);
 306             byte[] res = new byte[size];
 307             int from = 0;
 308             for (ByteBuffer b : bytes) {
 309                 int l = b.remaining();
 310                 b.get(res, from, l);
 311                 from += l;
 312             }
 313             return res;
 314         }
 315 
 316         @Override
 317         public void onComplete() {
 318             try {
 319                 result.complete(finisher.apply(join(received)));
 320                 received.clear();
 321             } catch (IllegalArgumentException e) {
 322                 result.completeExceptionally(e);
 323             }
 324         }
 325 
 326         @Override
 327         public CompletionStage<T> getBody() {
 328             return result;
 329         }
 330     }
 331 
 332     /**
 333      * An InputStream built on top of the Flow API.
 334      */
 335     public static class HttpResponseInputStream extends InputStream
 336         implements TrustedSubscriber<InputStream>
 337     {
 338         final static int MAX_BUFFERS_IN_QUEUE = 1;  // lock-step with the producer
 339 
 340         // An immutable ByteBuffer sentinel to mark that the last byte was received.
 341         private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]);
 342         private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER);
 343         private static final Logger debug =
 344                 Utils.getDebugLogger("HttpResponseInputStream"::toString, Utils.DEBUG);
 345 
 346         // A queue of yet unprocessed ByteBuffers received from the flow API.
 347         private final BlockingQueue<List<ByteBuffer>> buffers;
 348         private volatile Flow.Subscription subscription;
 349         private volatile boolean closed;
 350         private volatile Throwable failed;
 351         private volatile Iterator<ByteBuffer> currentListItr;
 352         private volatile ByteBuffer currentBuffer;
 353         private final AtomicBoolean subscribed = new AtomicBoolean();
 354 
 355         public HttpResponseInputStream() {
 356             this(MAX_BUFFERS_IN_QUEUE);
 357         }
 358 
 359         HttpResponseInputStream(int maxBuffers) {
 360             int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers);
 361             // 1 additional slot needed for LAST_LIST added by onComplete
 362             this.buffers = new ArrayBlockingQueue<>(capacity + 1);
 363         }
 364 
 365         @Override
 366         public CompletionStage<InputStream> getBody() {
 367             // Returns the stream immediately, before the
 368             // response body is received.
 369             // This makes it possible for sendAsync().get().body()
 370             // to complete before the response body is received.
 371             return CompletableFuture.completedStage(this);
 372         }
 373 
 374         // Returns the current byte buffer to read from.
 375         // If the current buffer has no remaining data, this method will take the
 376         // next buffer from the buffers queue, possibly blocking until
 377         // a new buffer is made available through the Flow API, or the
 378         // end of the flow has been reached.
 379         private ByteBuffer current() throws IOException {
 380             while (currentBuffer == null || !currentBuffer.hasRemaining()) {
 381                 // Check whether the stream is closed or exhausted
 382                 if (closed || failed != null) {
 383                     throw new IOException("closed", failed);
 384                 }
 385                 if (currentBuffer == LAST_BUFFER) break;
 386 
 387                 try {
 388                     if (currentListItr == null || !currentListItr.hasNext()) {
 389                         // Take a new list of buffers from the queue, blocking
 390                         // if none is available yet...
 391 
 392                         if (debug.on()) debug.log("Taking list of Buffers");
 393                         List<ByteBuffer> lb = buffers.take();
 394                         currentListItr = lb.iterator();
 395                         if (debug.on()) debug.log("List of Buffers Taken");
 396 
 397                         // Check whether an exception was encountered upstream
 398                         if (closed || failed != null)
 399                             throw new IOException("closed", failed);
 400 
 401                         // Check whether we're done.
 402                         if (lb == LAST_LIST) {
 403                             currentListItr = null;
 404                             currentBuffer = LAST_BUFFER;
 405                             break;
 406                         }
 407 
 408                         // Request another upstream item ( list of buffers )
 409                         Flow.Subscription s = subscription;
 410                         if (s != null) {
 411                             if (debug.on()) debug.log("Increased demand by 1");
 412                             s.request(1);
 413                         }
 414                         assert currentListItr != null;
 415                         if (lb.isEmpty()) continue;
 416                     }
 417                     assert currentListItr != null;
 418                     assert currentListItr.hasNext();
 419                     if (debug.on()) debug.log("Next Buffer");
 420                     currentBuffer = currentListItr.next();
 421                 } catch (InterruptedException ex) {
 422                     // continue
 423                 }
 424             }
 425             assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();
 426             return currentBuffer;
 427         }
 428 
 429         @Override
 430         public int read(byte[] bytes, int off, int len) throws IOException {
 431             // get the buffer to read from, possibly blocking if
 432             // none is available
 433             ByteBuffer buffer;
 434             if ((buffer = current()) == LAST_BUFFER) return -1;
 435 
 436             // don't attempt to read more than what is available
 437             // in the current buffer.
 438             int read = Math.min(buffer.remaining(), len);
 439             assert read > 0 && read <= buffer.remaining();
 440 
 441             // buffer.get() will do the boundary check for us.
 442             buffer.get(bytes, off, read);
 443             return read;
 444         }
 445 
 446         @Override
 447         public int read() throws IOException {
 448             ByteBuffer buffer;
 449             if ((buffer = current()) == LAST_BUFFER) return -1;
 450             return buffer.get() & 0xFF;
 451         }
 452 
 453         @Override
 454         public int available() throws IOException {
 455             // best effort: returns the number of remaining bytes in
 456             // the current buffer if any, or 1 if the current buffer
 457             // is null or empty but the queue or current buffer list
 458             // are not empty. Returns 0 otherwise.
 459             if (closed) return 0;
 460             int available = 0;
 461             ByteBuffer current = currentBuffer;
 462             if (current == LAST_BUFFER) return 0;
 463             if (current != null) available = current.remaining();
 464             if (available != 0) return available;
 465             Iterator<?> iterator = currentListItr;
 466             if (iterator != null && iterator.hasNext()) return 1;
 467             if (buffers.isEmpty()) return 0;
 468             return 1;
 469         }
 470 
 471         @Override
 472         public void onSubscribe(Flow.Subscription s) {
 473             try {
 474                 if (!subscribed.compareAndSet(false, true)) {
 475                     s.cancel();
 476                 } else {
 477                     // check whether the stream is already closed.
 478                     // if so, we should cancel the subscription
 479                     // immediately.
 480                     boolean closed;
 481                     synchronized (this) {
 482                         closed = this.closed;
 483                         if (!closed) {
 484                             this.subscription = s;
 485                         }
 486                     }
 487                     if (closed) {
 488                         s.cancel();
 489                         return;
 490                     }
 491                     assert buffers.remainingCapacity() > 1; // should contain at least 2
 492                     if (debug.on())
 493                         debug.log("onSubscribe: requesting "
 494                                   + Math.max(1, buffers.remainingCapacity() - 1));
 495                     s.request(Math.max(1, buffers.remainingCapacity() - 1));
 496                 }
 497             } catch (Throwable t) {
 498                 failed = t;
 499                 try {
 500                     close();
 501                 } catch (IOException x) {
 502                     // OK
 503                 } finally {
 504                     onError(t);
 505                 }
 506             }
 507         }
 508 
 509         @Override
 510         public void onNext(List<ByteBuffer> t) {
 511             Objects.requireNonNull(t);
 512             try {
 513                 if (debug.on()) debug.log("next item received");
 514                 if (!buffers.offer(t)) {
 515                     throw new IllegalStateException("queue is full");
 516                 }
 517                 if (debug.on()) debug.log("item offered");
 518             } catch (Throwable ex) {
 519                 failed = ex;
 520                 try {
 521                     close();
 522                 } catch (IOException ex1) {
 523                     // OK
 524                 } finally {
 525                     onError(ex);
 526                 }
 527             }
 528         }
 529 
 530         @Override
 531         public void onError(Throwable thrwbl) {
 532             subscription = null;
 533             failed = Objects.requireNonNull(thrwbl);
 534             // The client process that reads the input stream might
 535             // be blocked in queue.take().
 536             // Tries to offer LAST_LIST to the queue. If the queue is
 537             // full we don't care if we can't insert this buffer, as
 538             // the client can't be blocked in queue.take() in that case.
 539             // Adding LAST_LIST to the queue is harmless, as the client
 540             // should find failed != null before handling LAST_LIST.
 541             buffers.offer(LAST_LIST);
 542         }
 543 
 544         @Override
 545         public void onComplete() {
 546             subscription = null;
 547             onNext(LAST_LIST);
 548         }
 549 
 550         @Override
 551         public void close() throws IOException {
 552             Flow.Subscription s;
 553             synchronized (this) {
 554                 if (closed) return;
 555                 closed = true;
 556                 s = subscription;
 557                 subscription = null;
 558             }
 559             // s will be null if already completed
 560             try {
 561                 if (s != null) {
 562                     s.cancel();
 563                 }
 564             } finally {
 565                 buffers.offer(LAST_LIST);
 566                 super.close();
 567             }
 568         }
 569 
 570     }
 571 
 572     public static BodySubscriber<Stream<String>> createLineStream() {
 573         return createLineStream(UTF_8);
 574     }
 575 
 576     public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
 577         Objects.requireNonNull(charset);
 578         BodySubscriber<InputStream> s = new HttpResponseInputStream();
 579         // Creates a MappingSubscriber with a trusted finisher that is
 580         // trusted not to block.
 581         return new MappingSubscriber<InputStream,Stream<String>>(s,
 582             (InputStream stream) -> {
 583                 return new BufferedReader(new InputStreamReader(stream, charset))
 584                             .lines().onClose(() -> Utils.close(stream));
 585             }, true);
 586     }
 587 
 588     /**
 589      * Currently this consumes all of the data and ignores it
 590      */
 591     public static class NullSubscriber<T> implements TrustedSubscriber<T> {
 592 
 593         private final CompletableFuture<T> cf = new MinimalFuture<>();
 594         private final Optional<T> result;
 595         private final AtomicBoolean subscribed = new AtomicBoolean();
 596 
 597         public NullSubscriber(Optional<T> result) {
 598             this.result = result;
 599         }
 600 
 601         @Override
 602         public void onSubscribe(Flow.Subscription subscription) {
 603             if (!subscribed.compareAndSet(false, true)) {
 604                 subscription.cancel();
 605             } else {
 606                 subscription.request(Long.MAX_VALUE);
 607             }
 608         }
 609 
 610         @Override
 611         public void onNext(List<ByteBuffer> items) {
 612             Objects.requireNonNull(items);
 613         }
 614 
 615         @Override
 616         public void onError(Throwable throwable) {
 617             cf.completeExceptionally(throwable);
 618         }
 619 
 620         @Override
 621         public void onComplete() {
 622             if (result.isPresent()) {
 623                 cf.complete(result.get());
 624             } else {
 625                 cf.complete(null);
 626             }
 627         }
 628 
 629         @Override
 630         public CompletionStage<T> getBody() {
 631             return cf;
 632         }
 633     }
 634 
 635     /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
 636     public static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
 637         implements TrustedSubscriber<R>
 638     {
 639         private final CompletableFuture<R> cf = new MinimalFuture<>();
 640         private final S subscriber;
 641         private final Function<? super S,? extends R> finisher;
 642         private volatile Subscription subscription;
 643 
 644         // The finisher isn't called until all bytes have been received,
 645         // and so shouldn't need an executor. No need to override
 646         // TrustedSubscriber::needsExecutor
 647         public SubscriberAdapter(S subscriber, Function<? super S,? extends R> finisher) {
 648             this.subscriber = Objects.requireNonNull(subscriber);
 649             this.finisher = Objects.requireNonNull(finisher);
 650         }
 651 
 652         @Override
 653         public void onSubscribe(Subscription subscription) {
 654             Objects.requireNonNull(subscription);
 655             if (this.subscription != null) {
 656                 subscription.cancel();
 657             } else {
 658                 this.subscription = subscription;
 659                 subscriber.onSubscribe(subscription);
 660             }
 661         }
 662 
 663         @Override
 664         public void onNext(List<ByteBuffer> item) {
 665             Objects.requireNonNull(item);
 666             try {
 667                 subscriber.onNext(item);
 668             } catch (Throwable throwable) {
 669                 subscription.cancel();
 670                 onError(throwable);
 671             }
 672         }
 673 
 674         @Override
 675         public void onError(Throwable throwable) {
 676             Objects.requireNonNull(throwable);
 677             try {
 678                 subscriber.onError(throwable);
 679             } finally {
 680                 cf.completeExceptionally(throwable);
 681             }
 682         }
 683 
 684         @Override
 685         public void onComplete() {
 686             try {
 687                 subscriber.onComplete();
 688             } finally {
 689                 try {
 690                     cf.complete(finisher.apply(subscriber));
 691                 } catch (Throwable throwable) {
 692                     cf.completeExceptionally(throwable);
 693                 }
 694             }
 695         }
 696 
 697         @Override
 698         public CompletionStage<R> getBody() {
 699             return cf;
 700         }
 701     }
 702 
 703     /**
 704      * A body subscriber which receives input from an upstream subscriber
 705      * and maps that subscriber's body type to a new type. The upstream subscriber
 706      * delegates all flow operations directly to this object. The
 707      * {@link CompletionStage} returned by {@link #getBody()}} takes the output
 708      * of the upstream {@code getBody()} and applies the mapper function to
 709      * obtain the new {@code CompletionStage} type.
 710      *
 711      * @param <T> the upstream body type
 712      * @param <U> this subscriber's body type
 713      */
 714     public static class MappingSubscriber<T,U> implements TrustedSubscriber<U> {
 715         private final BodySubscriber<T> upstream;
 716         private final Function<? super T,? extends U> mapper;
 717         private final boolean trusted;
 718 
 719         public MappingSubscriber(BodySubscriber<T> upstream,
 720                                  Function<? super T,? extends U> mapper) {
 721             this(upstream, mapper, false);
 722         }
 723 
 724         // creates a MappingSubscriber with a mapper that is trusted
 725         // to not block when called.
 726         MappingSubscriber(BodySubscriber<T> upstream,
 727                           Function<? super T,? extends U> mapper,
 728                           boolean trusted) {
 729             this.upstream = Objects.requireNonNull(upstream);
 730             this.mapper = Objects.requireNonNull(mapper);
 731             this.trusted = trusted;
 732         }
 733 
 734         // There is no way to know whether a custom mapper function
 735         // might block or not - so we should return true unless the
 736         // mapper is implemented and trusted by our own code not to
 737         // block.
 738         @Override
 739         public boolean needsExecutor() {
 740             return !trusted || TrustedSubscriber.needsExecutor(upstream);
 741         }
 742 
 743         // If upstream.getBody() is already completed (case of InputStream),
 744         // then calling upstream.getBody().thenApply(mapper) might block
 745         // if the mapper blocks. We should probably add a variant of
 746         // MappingSubscriber that calls thenApplyAsync instead, but this
 747         // needs a new public API point. See needsExecutor() above.
 748         @Override
 749         public CompletionStage<U> getBody() {
 750             return upstream.getBody().thenApply(mapper);
 751         }
 752 
 753         @Override
 754         public void onSubscribe(Flow.Subscription subscription) {
 755             upstream.onSubscribe(subscription);
 756         }
 757 
 758         @Override
 759         public void onNext(List<ByteBuffer> item) {
 760             upstream.onNext(item);
 761         }
 762 
 763         @Override
 764         public void onError(Throwable throwable) {
 765             upstream.onError(throwable);
 766         }
 767 
 768         @Override
 769         public void onComplete() {
 770             upstream.onComplete();
 771         }
 772     }
 773 
 774     // A BodySubscriber that returns a Publisher<List<ByteBuffer>>
 775     static class PublishingBodySubscriber
 776             implements TrustedSubscriber<Flow.Publisher<List<ByteBuffer>>> {
 777         private final MinimalFuture<Flow.Subscription>
 778                 subscriptionCF = new MinimalFuture<>();
 779         private final MinimalFuture<SubscriberRef>
 780                 subscribedCF = new MinimalFuture<>();
 781         private AtomicReference<SubscriberRef>
 782                 subscriberRef = new AtomicReference<>();
 783         private final CompletionStage<Flow.Publisher<List<ByteBuffer>>> body =
 784                 subscriptionCF.thenCompose(
 785                         (s) -> MinimalFuture.completedFuture(this::subscribe));
 786 
 787         // We use the completionCF to ensure that only one of
 788         // onError or onComplete is ever called.
 789         private final MinimalFuture<Void> completionCF;
 790         private PublishingBodySubscriber() {
 791             completionCF = new MinimalFuture<>();
 792             completionCF.whenComplete(
 793                     (r,t) -> subscribedCF.thenAccept( s -> complete(s, t)));
 794         }
 795 
 796         // An object that holds a reference to a Flow.Subscriber.
 797         // The reference is cleared when the subscriber is completed - either
 798         // normally or exceptionally, or when the subscription is cancelled.
 799         static final class SubscriberRef {
 800             volatile Flow.Subscriber<? super List<ByteBuffer>> ref;
 801             SubscriberRef(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 802                 ref = subscriber;
 803             }
 804             Flow.Subscriber<? super List<ByteBuffer>> get() {
 805                 return ref;
 806             }
 807             Flow.Subscriber<? super List<ByteBuffer>> clear() {
 808                 Flow.Subscriber<? super List<ByteBuffer>> res = ref;
 809                 ref = null;
 810                 return res;
 811             }
 812         }
 813 
 814         // A subscription that wraps an upstream subscription and
 815         // holds a reference to a subscriber. The subscriber reference
 816         // is cleared when the subscription is cancelled
 817         final static class SubscriptionRef implements Flow.Subscription {
 818             final Flow.Subscription subscription;
 819             final SubscriberRef subscriberRef;
 820             SubscriptionRef(Flow.Subscription subscription,
 821                             SubscriberRef subscriberRef) {
 822                 this.subscription = subscription;
 823                 this.subscriberRef = subscriberRef;
 824             }
 825             @Override
 826             public void request(long n) {
 827                 if (subscriberRef.get() != null) {
 828                     subscription.request(n);
 829                 }
 830             }
 831             @Override
 832             public void cancel() {
 833                 subscription.cancel();
 834                 subscriberRef.clear();
 835             }
 836 
 837             void subscribe() {
 838                 Subscriber<?> subscriber = subscriberRef.get();
 839                 if (subscriber != null) {
 840                     subscriber.onSubscribe(this);
 841                 }
 842             }
 843 
 844             @Override
 845             public String toString() {
 846                 return "SubscriptionRef/"
 847                         + subscription.getClass().getName()
 848                         + "@"
 849                         + System.identityHashCode(subscription);
 850             }
 851         }
 852 
 853         // This is a callback for the subscribedCF.
 854         // Do not call directly!
 855         private void complete(SubscriberRef ref, Throwable t) {
 856             assert ref != null;
 857             Subscriber<?> s = ref.clear();
 858             // maybe null if subscription was cancelled
 859             if (s == null) return;
 860             if (t == null) {
 861                 try {
 862                     s.onComplete();
 863                 } catch (Throwable x) {
 864                     s.onError(x);
 865                 }
 866             } else {
 867                 s.onError(t);
 868             }
 869         }
 870 
 871         private void signalError(Throwable err) {
 872             if (err == null) {
 873                 err = new NullPointerException("null throwable");
 874             }
 875             completionCF.completeExceptionally(err);
 876         }
 877 
 878         private void signalComplete() {
 879             completionCF.complete(null);
 880         }
 881 
 882         private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 883             Objects.requireNonNull(subscriber, "subscriber must not be null");
 884             SubscriberRef ref = new SubscriberRef(subscriber);
 885             if (subscriberRef.compareAndSet(null, ref)) {
 886                 subscriptionCF.thenAccept((s) -> {
 887                     SubscriptionRef subscription = new SubscriptionRef(s,ref);
 888                     try {
 889                         subscription.subscribe();
 890                         subscribedCF.complete(ref);
 891                     } catch (Throwable t) {
 892                         if (Log.errors()) {
 893                             Log.logError("Failed to call onSubscribe: " +
 894                                     "cancelling subscription: " + t);
 895                             Log.logError(t);
 896                         }
 897                         subscription.cancel();
 898                     }
 899                 });
 900             } else {
 901                 subscriber.onSubscribe(new Flow.Subscription() {
 902                     @Override public void request(long n) { }
 903                     @Override public void cancel() { }
 904                 });
 905                 subscriber.onError(new IllegalStateException(
 906                         "This publisher has already one subscriber"));
 907             }
 908         }
 909 
 910         @Override
 911         public void onSubscribe(Flow.Subscription subscription) {
 912             subscriptionCF.complete(subscription);
 913         }
 914 
 915         @Override
 916         public void onNext(List<ByteBuffer> item) {
 917             try {
 918                 // cannot be called before onSubscribe()
 919                 assert subscriptionCF.isDone();
 920                 SubscriberRef ref = subscriberRef.get();
 921                 // cannot be called before subscriber calls request(1)
 922                 assert ref != null;
 923                 Flow.Subscriber<? super List<ByteBuffer>>
 924                         subscriber = ref.get();
 925                 if (subscriber != null) {
 926                     // may be null if subscription was cancelled.
 927                     subscriber.onNext(item);
 928                 }
 929             } catch (Throwable err) {
 930                 signalError(err);
 931                 subscriptionCF.thenAccept(s -> s.cancel());
 932             }
 933         }
 934 
 935         @Override
 936         public void onError(Throwable throwable) {
 937             // cannot be called before onSubscribe();
 938             assert suppress(subscriptionCF.isDone(),
 939                     "onError called before onSubscribe",
 940                     throwable);
 941             // onError can be called before request(1), and therefore can
 942             // be called before subscriberRef is set.
 943             signalError(throwable);
 944         }
 945 
 946         @Override
 947         public void onComplete() {
 948             // cannot be called before onSubscribe()
 949             if (!subscriptionCF.isDone()) {
 950                 signalError(new InternalError(
 951                         "onComplete called before onSubscribed"));
 952             } else {
 953                 // onComplete can be called before request(1),
 954                 // and therefore can be called before subscriberRef
 955                 // is set.
 956                 signalComplete();
 957             }
 958         }
 959 
 960         @Override
 961         public CompletionStage<Flow.Publisher<List<ByteBuffer>>> getBody() {
 962             return body;
 963         }
 964 
 965         private boolean suppress(boolean condition,
 966                                  String assertion,
 967                                  Throwable carrier) {
 968             if (!condition) {
 969                 if (carrier != null) {
 970                     carrier.addSuppressed(new AssertionError(assertion));
 971                 } else if (Log.errors()) {
 972                     Log.logError(new AssertionError(assertion));
 973                 }
 974             }
 975             return true;
 976         }
 977 
 978     }
 979 
 980     public static BodySubscriber<Flow.Publisher<List<ByteBuffer>>>
 981     createPublisher() {
 982         return new PublishingBodySubscriber();
 983     }
 984 
 985 
 986     /**
 987      * Tries to determine whether bs::getBody must be invoked asynchronously,
 988      * and if so, uses the provided executor to do it.
 989      * If the executor is a {@link HttpClientImpl.DelegatingExecutor},
 990      * uses the executor's delegate.
 991      * @param e    The executor to use if an executor is required.
 992      * @param bs   The BodySubscriber (trusted or not)
 993      * @param <T>  The type of the response.
 994      * @return A completion stage that completes when the completion
 995      *         stage returned by bs::getBody completes. This may, or
 996      *         may not, be the same completion stage.
 997      */
 998     public static <T> CompletionStage<T> getBodyAsync(Executor e, BodySubscriber<T> bs) {
 999         if (TrustedSubscriber.needsExecutor(bs)) {
1000             // getBody must be called in the executor
1001             return getBodyAsync(e, bs, new MinimalFuture<>());
1002         } else {
1003             // No executor needed
1004             return bs.getBody();
1005         }
1006     }
1007 
1008     /**
1009      * Invokes bs::getBody using the provided executor.
1010      * If invoking bs::getBody requires an executor, and the given executor
1011      * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
1012      * delegate is used. If an error occurs anywhere then the given {code cf}
1013      * is completed exceptionally (this method does not throw).
1014      * @param e   The executor that should be used to call bs::getBody
1015      * @param bs  The BodySubscriber
1016      * @param cf  A completable future that this function will set up
1017      *            to complete when the completion stage returned by
1018      *            bs::getBody completes.
1019      *            In case of any error while trying to set up the
1020      *            completion chain, {@code cf} will be completed
1021      *            exceptionally with that error.
1022      * @param <T> The response type.
1023      * @return The provided {@code cf}.
1024      */
1025     public static <T> CompletableFuture<T> getBodyAsync(Executor e,
1026                                                       BodySubscriber<T> bs,
1027                                                       CompletableFuture<T> cf) {
1028         return getBodyAsync(e, bs, cf, cf::completeExceptionally);
1029     }
1030 
1031     /**
1032      * Invokes bs::getBody using the provided executor.
1033      * If invoking bs::getBody requires an executor, and the given executor
1034      * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
1035      * delegate is used.
1036      * The provided {@code cf} is completed with the result (exceptional
1037      * or not) of the completion stage returned by bs::getBody.
1038      * If an error occurs when trying to set up the
1039      * completion chain, the provided {@code errorHandler} is invoked,
1040      * but {@code cf} is not necessarily affected.
1041      * This method does not throw.
1042      * @param e   The executor that should be used to call bs::getBody
1043      * @param bs  The BodySubscriber
1044      * @param cf  A completable future that this function will set up
1045      *            to complete when the completion stage returned by
1046      *            bs::getBody completes.
1047      *            In case of any error while trying to set up the
1048      *            completion chain, {@code cf} will be completed
1049      *            exceptionally with that error.
1050      * @param errorHandler The handler to invoke if an error is raised
1051      *                     while trying to set up the completion chain.
1052      * @param <T> The response type.
1053      * @return The provide {@code cf}. If the {@code errorHandler} is
1054      * invoked, it is the responsibility of the {@code errorHandler} to
1055      * complete the {@code cf}, if needed.
1056      */
1057     public static <T> CompletableFuture<T> getBodyAsync(Executor e,
1058                                                       BodySubscriber<T> bs,
1059                                                       CompletableFuture<T> cf,
1060                                                       Consumer<Throwable> errorHandler) {
1061         assert errorHandler != null;
1062         try {
1063             assert e != null;
1064             assert cf != null;
1065 
1066             if (TrustedSubscriber.needsExecutor(bs)) {
1067                 e = (e instanceof HttpClientImpl.DelegatingExecutor)
1068                         ? ((HttpClientImpl.DelegatingExecutor) e).delegate() : e;
1069             }
1070 
1071             e.execute(() -> {
1072                 try {
1073                     bs.getBody().whenComplete((r, t) -> {
1074                         if (t != null) {
1075                             cf.completeExceptionally(t);
1076                         } else {
1077                             cf.complete(r);
1078                         }
1079                     });
1080                 } catch (Throwable t) {
1081                     errorHandler.accept(t);
1082                 }
1083             });
1084             return cf;
1085 
1086         } catch (Throwable t) {
1087             errorHandler.accept(t);
1088         }
1089         return cf;
1090     }
1091 }