1 /*
   2  * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.BufferedReader;
  29 import java.io.FilePermission;
  30 import java.io.IOException;
  31 import java.io.InputStream;
  32 import java.io.InputStreamReader;
  33 import java.nio.ByteBuffer;
  34 import java.nio.channels.FileChannel;
  35 import java.nio.charset.Charset;
  36 import java.nio.file.OpenOption;
  37 import java.nio.file.Path;
  38 import java.security.AccessController;
  39 import java.security.PrivilegedActionException;
  40 import java.security.PrivilegedExceptionAction;
  41 import java.util.ArrayList;
  42 import java.util.Iterator;
  43 import java.util.List;
  44 import java.util.Objects;
  45 import java.util.Optional;
  46 import java.util.concurrent.ArrayBlockingQueue;
  47 import java.util.concurrent.BlockingQueue;
  48 import java.util.concurrent.CompletableFuture;
  49 import java.util.concurrent.CompletionStage;
  50 import java.util.concurrent.Executor;
  51 import java.util.concurrent.Flow;
  52 import java.util.concurrent.Flow.Subscriber;
  53 import java.util.concurrent.Flow.Subscription;
  54 import java.util.concurrent.atomic.AtomicBoolean;
  55 import java.util.concurrent.atomic.AtomicReference;
  56 import java.util.function.Consumer;
  57 import java.util.function.Function;
  58 import java.util.stream.Stream;
  59 import java.net.http.HttpResponse.BodySubscriber;
  60 import jdk.internal.net.http.common.Log;
  61 import jdk.internal.net.http.common.Logger;
  62 import jdk.internal.net.http.common.MinimalFuture;
  63 import jdk.internal.net.http.common.Utils;
  64 import static java.nio.charset.StandardCharsets.UTF_8;
  65 
  66 public class ResponseSubscribers {
  67 
  68     /**
  69      * This interface is used by our BodySubscriber implementations to
  70      * declare whether calling getBody() inline is safe, or whether
  71      * it needs to be called asynchronously in an executor thread.
  72      * Calling getBody() inline is usually safe except when it
  73      * might block - which can be the case if the BodySubscriber
  74      * is provided by custom code, or if it uses a finisher that
  75      * might be called and might block before the last bit is
  76      * received (for instance, if a mapping subscriber is used with
  77      * a mapper function that maps an InputStream to a GZIPInputStream,
  78      * as the the constructor of GZIPInputStream calls read()).
  79      * @param <T> The response type.
  80      */
  81     public interface TrustedSubscriber<T> extends BodySubscriber<T> {
  82         /**
  83          * Returns true if getBody() should be called asynchronously.
  84          * @implSpec The default implementation of this method returns
  85          *           false.
  86          * @return true if getBody() should be called asynchronously.
  87          */
  88         default boolean needsExecutor() { return false;}
  89 
  90         /**
  91          * Returns true if calling {@code bs::getBody} might block
  92          * and requires an executor.
  93          *
  94          * @implNote
  95          * In particular this method returns
  96          * true if {@code bs} is not a {@code TrustedSubscriber}.
  97          * If it is a {@code TrustedSubscriber}, it returns
  98          * {@code ((TrustedSubscriber) bs).needsExecutor()}.
  99          *
 100          * @param bs A BodySubscriber.
 101          * @return true if calling {@code bs::getBody} requires using
 102          *         an executor.
 103          */
 104         static boolean needsExecutor(BodySubscriber<?> bs) {
 105             if (bs instanceof TrustedSubscriber) {
 106                 return ((TrustedSubscriber) bs).needsExecutor();
 107             } else return true;
 108         }
 109     }
 110 
 111     public static class ConsumerSubscriber implements TrustedSubscriber<Void> {
 112         private final Consumer<Optional<byte[]>> consumer;
 113         private Flow.Subscription subscription;
 114         private final CompletableFuture<Void> result = new MinimalFuture<>();
 115         private final AtomicBoolean subscribed = new AtomicBoolean();
 116 
 117         public ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
 118             this.consumer = Objects.requireNonNull(consumer);
 119         }
 120 
 121         @Override
 122         public CompletionStage<Void> getBody() {
 123             return result;
 124         }
 125 
 126         @Override
 127         public void onSubscribe(Flow.Subscription subscription) {
 128             Objects.requireNonNull(subscription);
 129             if (!subscribed.compareAndSet(false, true)) {
 130                 subscription.cancel();
 131             } else {
 132                 this.subscription = subscription;
 133                 subscription.request(1);
 134             }
 135         }
 136 
 137         @Override
 138         public void onNext(List<ByteBuffer> items) {
 139             Objects.requireNonNull(items);
 140             for (ByteBuffer item : items) {
 141                 byte[] buf = new byte[item.remaining()];
 142                 item.get(buf);
 143                 consumer.accept(Optional.of(buf));
 144             }
 145             subscription.request(1);
 146         }
 147 
 148         @Override
 149         public void onError(Throwable throwable) {
 150             Objects.requireNonNull(throwable);
 151             result.completeExceptionally(throwable);
 152         }
 153 
 154         @Override
 155         public void onComplete() {
 156             consumer.accept(Optional.empty());
 157             result.complete(null);
 158         }
 159 
 160     }
 161 
 162     /**
 163      * A Subscriber that writes the flow of data to a given file.
 164      *
 165      * Privileged actions are performed within a limited doPrivileged that only
 166      * asserts the specific, write, file permissions that were checked during
 167      * the construction of this PathSubscriber.
 168      */
 169     public static class PathSubscriber implements TrustedSubscriber<Path> {
 170 
 171         private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];
 172 
 173         private final Path file;
 174         private final OpenOption[] options;
 175         private final FilePermission[] filePermissions;
 176         private final CompletableFuture<Path> result = new MinimalFuture<>();
 177 
 178         private final AtomicBoolean subscribed = new AtomicBoolean();
 179         private volatile Flow.Subscription subscription;
 180         private volatile FileChannel out;
 181 
 182         private static final String pathForSecurityCheck(Path path) {
 183             return path.toFile().getPath();
 184         }
 185 
 186         /**
 187          * Factory for creating PathSubscriber.
 188          *
 189          * Permission checks are performed here before construction of the
 190          * PathSubscriber. Permission checking and construction are deliberately
 191          * and tightly co-located.
 192          */
 193         public static PathSubscriber create(Path file,
 194                                             List<OpenOption> options) {
 195             FilePermission filePermission = null;
 196             SecurityManager sm = System.getSecurityManager();
 197             if (sm != null) {
 198                 String fn = pathForSecurityCheck(file);
 199                 FilePermission writePermission = new FilePermission(fn, "write");
 200                 sm.checkPermission(writePermission);
 201                 filePermission = writePermission;
 202             }
 203             return new PathSubscriber(file, options, filePermission);
 204         }
 205 
 206         // pp so handler implementations in the same package can construct
 207         /*package-private*/ PathSubscriber(Path file,
 208                                            List<OpenOption> options,
 209                                            FilePermission... filePermissions) {
 210             this.file = file;
 211             this.options = options.stream().toArray(OpenOption[]::new);
 212             this.filePermissions =
 213                     filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions;
 214         }
 215 
 216         @Override
 217         public void onSubscribe(Flow.Subscription subscription) {
 218             Objects.requireNonNull(subscription);
 219             if (!subscribed.compareAndSet(false, true)) {
 220                 subscription.cancel();
 221                 return;
 222             }
 223 
 224             this.subscription = subscription;
 225             if (System.getSecurityManager() == null) {
 226                 try {
 227                     out = FileChannel.open(file, options);
 228                 } catch (IOException ioe) {
 229                     result.completeExceptionally(ioe);
 230                     return;
 231                 }
 232             } else {
 233                 try {
 234                     PrivilegedExceptionAction<FileChannel> pa =
 235                             () -> FileChannel.open(file, options);
 236                     out = AccessController.doPrivileged(pa, null, filePermissions);
 237                 } catch (PrivilegedActionException pae) {
 238                     Throwable t = pae.getCause() != null ? pae.getCause() : pae;
 239                     result.completeExceptionally(t);
 240                     subscription.cancel();
 241                     return;
 242                 }
 243             }
 244             subscription.request(1);
 245         }
 246 
 247         @Override
 248         public void onNext(List<ByteBuffer> items) {
 249             try {
 250                 out.write(items.toArray(Utils.EMPTY_BB_ARRAY));
 251             } catch (IOException ex) {
 252                 Utils.close(out);
 253                 subscription.cancel();
 254                 result.completeExceptionally(ex);
 255             }
 256             subscription.request(1);
 257         }
 258 
 259         @Override
 260         public void onError(Throwable e) {
 261             result.completeExceptionally(e);
 262             Utils.close(out);
 263         }
 264 
 265         @Override
 266         public void onComplete() {
 267             Utils.close(out);
 268             result.complete(file);
 269         }
 270 
 271         @Override
 272         public CompletionStage<Path> getBody() {
 273             return result;
 274         }
 275     }
 276 
 277     public static class ByteArraySubscriber<T> implements TrustedSubscriber<T> {
 278         private final Function<byte[], T> finisher;
 279         private final CompletableFuture<T> result = new MinimalFuture<>();
 280         private final List<ByteBuffer> received = new ArrayList<>();
 281 
 282         private volatile Flow.Subscription subscription;
 283 
 284         public ByteArraySubscriber(Function<byte[],T> finisher) {
 285             this.finisher = finisher;
 286         }
 287 
 288         @Override
 289         public void onSubscribe(Flow.Subscription subscription) {
 290             if (this.subscription != null) {
 291                 subscription.cancel();
 292                 return;
 293             }
 294             this.subscription = subscription;
 295             // We can handle whatever you've got
 296             subscription.request(Long.MAX_VALUE);
 297         }
 298 
 299         @Override
 300         public void onNext(List<ByteBuffer> items) {
 301             // incoming buffers are allocated by http client internally,
 302             // and won't be used anywhere except this place.
 303             // So it's free simply to store them for further processing.
 304             assert Utils.hasRemaining(items);
 305             received.addAll(items);
 306         }
 307 
 308         @Override
 309         public void onError(Throwable throwable) {
 310             received.clear();
 311             result.completeExceptionally(throwable);
 312         }
 313 
 314         static private byte[] join(List<ByteBuffer> bytes) {
 315             int size = Utils.remaining(bytes, Integer.MAX_VALUE);
 316             byte[] res = new byte[size];
 317             int from = 0;
 318             for (ByteBuffer b : bytes) {
 319                 int l = b.remaining();
 320                 b.get(res, from, l);
 321                 from += l;
 322             }
 323             return res;
 324         }
 325 
 326         @Override
 327         public void onComplete() {
 328             try {
 329                 result.complete(finisher.apply(join(received)));
 330                 received.clear();
 331             } catch (IllegalArgumentException e) {
 332                 result.completeExceptionally(e);
 333             }
 334         }
 335 
 336         @Override
 337         public CompletionStage<T> getBody() {
 338             return result;
 339         }
 340     }
 341 
 342     /**
 343      * An InputStream built on top of the Flow API.
 344      */
 345     public static class HttpResponseInputStream extends InputStream
 346         implements TrustedSubscriber<InputStream>
 347     {
 348         final static int MAX_BUFFERS_IN_QUEUE = 1;  // lock-step with the producer
 349 
 350         // An immutable ByteBuffer sentinel to mark that the last byte was received.
 351         private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]);
 352         private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER);
 353         private static final Logger debug =
 354                 Utils.getDebugLogger("HttpResponseInputStream"::toString, Utils.DEBUG);
 355 
 356         // A queue of yet unprocessed ByteBuffers received from the flow API.
 357         private final BlockingQueue<List<ByteBuffer>> buffers;
 358         private volatile Flow.Subscription subscription;
 359         private volatile boolean closed;
 360         private volatile Throwable failed;
 361         private volatile Iterator<ByteBuffer> currentListItr;
 362         private volatile ByteBuffer currentBuffer;
 363         private final AtomicBoolean subscribed = new AtomicBoolean();
 364 
 365         public HttpResponseInputStream() {
 366             this(MAX_BUFFERS_IN_QUEUE);
 367         }
 368 
 369         HttpResponseInputStream(int maxBuffers) {
 370             int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers);
 371             // 1 additional slot needed for LAST_LIST added by onComplete
 372             this.buffers = new ArrayBlockingQueue<>(capacity + 1);
 373         }
 374 
 375         @Override
 376         public CompletionStage<InputStream> getBody() {
 377             // Returns the stream immediately, before the
 378             // response body is received.
 379             // This makes it possible for sendAsync().get().body()
 380             // to complete before the response body is received.
 381             return CompletableFuture.completedStage(this);
 382         }
 383 
 384         // Returns the current byte buffer to read from.
 385         // If the current buffer has no remaining data, this method will take the
 386         // next buffer from the buffers queue, possibly blocking until
 387         // a new buffer is made available through the Flow API, or the
 388         // end of the flow has been reached.
 389         private ByteBuffer current() throws IOException {
 390             while (currentBuffer == null || !currentBuffer.hasRemaining()) {
 391                 // Check whether the stream is closed or exhausted
 392                 if (closed || failed != null) {
 393                     throw new IOException("closed", failed);
 394                 }
 395                 if (currentBuffer == LAST_BUFFER) break;
 396 
 397                 try {
 398                     if (currentListItr == null || !currentListItr.hasNext()) {
 399                         // Take a new list of buffers from the queue, blocking
 400                         // if none is available yet...
 401 
 402                         if (debug.on()) debug.log("Taking list of Buffers");
 403                         List<ByteBuffer> lb = buffers.take();
 404                         currentListItr = lb.iterator();
 405                         if (debug.on()) debug.log("List of Buffers Taken");
 406 
 407                         // Check whether an exception was encountered upstream
 408                         if (closed || failed != null)
 409                             throw new IOException("closed", failed);
 410 
 411                         // Check whether we're done.
 412                         if (lb == LAST_LIST) {
 413                             currentListItr = null;
 414                             currentBuffer = LAST_BUFFER;
 415                             break;
 416                         }
 417 
 418                         // Request another upstream item ( list of buffers )
 419                         Flow.Subscription s = subscription;
 420                         if (s != null) {
 421                             if (debug.on()) debug.log("Increased demand by 1");
 422                             s.request(1);
 423                         }
 424                         assert currentListItr != null;
 425                         if (lb.isEmpty()) continue;
 426                     }
 427                     assert currentListItr != null;
 428                     assert currentListItr.hasNext();
 429                     if (debug.on()) debug.log("Next Buffer");
 430                     currentBuffer = currentListItr.next();
 431                 } catch (InterruptedException ex) {
 432                     // continue
 433                 }
 434             }
 435             assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();
 436             return currentBuffer;
 437         }
 438 
 439         @Override
 440         public int read(byte[] bytes, int off, int len) throws IOException {
 441             // get the buffer to read from, possibly blocking if
 442             // none is available
 443             ByteBuffer buffer;
 444             if ((buffer = current()) == LAST_BUFFER) return -1;
 445 
 446             // don't attempt to read more than what is available
 447             // in the current buffer.
 448             int read = Math.min(buffer.remaining(), len);
 449             assert read > 0 && read <= buffer.remaining();
 450 
 451             // buffer.get() will do the boundary check for us.
 452             buffer.get(bytes, off, read);
 453             return read;
 454         }
 455 
 456         @Override
 457         public int read() throws IOException {
 458             ByteBuffer buffer;
 459             if ((buffer = current()) == LAST_BUFFER) return -1;
 460             return buffer.get() & 0xFF;
 461         }
 462 
 463         @Override
 464         public int available() throws IOException {
 465             // best effort: returns the number of remaining bytes in
 466             // the current buffer if any, or 1 if the current buffer
 467             // is null or empty but the queue or current buffer list
 468             // are not empty. Returns 0 otherwise.
 469             if (closed) return 0;
 470             int available = 0;
 471             ByteBuffer current = currentBuffer;
 472             if (current == LAST_BUFFER) return 0;
 473             if (current != null) available = current.remaining();
 474             if (available != 0) return available;
 475             Iterator<?> iterator = currentListItr;
 476             if (iterator != null && iterator.hasNext()) return 1;
 477             if (buffers.isEmpty()) return 0;
 478             return 1;
 479         }
 480 
 481         @Override
 482         public void onSubscribe(Flow.Subscription s) {
 483             Objects.requireNonNull(s);
 484             try {
 485                 if (!subscribed.compareAndSet(false, true)) {
 486                     s.cancel();
 487                 } else {
 488                     // check whether the stream is already closed.
 489                     // if so, we should cancel the subscription
 490                     // immediately.
 491                     boolean closed;
 492                     synchronized (this) {
 493                         closed = this.closed;
 494                         if (!closed) {
 495                             this.subscription = s;
 496                         }
 497                     }
 498                     if (closed) {
 499                         s.cancel();
 500                         return;
 501                     }
 502                     assert buffers.remainingCapacity() > 1; // should contain at least 2
 503                     if (debug.on())
 504                         debug.log("onSubscribe: requesting "
 505                                   + Math.max(1, buffers.remainingCapacity() - 1));
 506                     s.request(Math.max(1, buffers.remainingCapacity() - 1));
 507                 }
 508             } catch (Throwable t) {
 509                 failed = t;
 510                 try {
 511                     close();
 512                 } catch (IOException x) {
 513                     // OK
 514                 } finally {
 515                     onError(t);
 516                 }
 517             }
 518         }
 519 
 520         @Override
 521         public void onNext(List<ByteBuffer> t) {
 522             Objects.requireNonNull(t);
 523             try {
 524                 if (debug.on()) debug.log("next item received");
 525                 if (!buffers.offer(t)) {
 526                     throw new IllegalStateException("queue is full");
 527                 }
 528                 if (debug.on()) debug.log("item offered");
 529             } catch (Throwable ex) {
 530                 failed = ex;
 531                 try {
 532                     close();
 533                 } catch (IOException ex1) {
 534                     // OK
 535                 } finally {
 536                     onError(ex);
 537                 }
 538             }
 539         }
 540 
 541         @Override
 542         public void onError(Throwable thrwbl) {
 543             subscription = null;
 544             failed = Objects.requireNonNull(thrwbl);
 545             // The client process that reads the input stream might
 546             // be blocked in queue.take().
 547             // Tries to offer LAST_LIST to the queue. If the queue is
 548             // full we don't care if we can't insert this buffer, as
 549             // the client can't be blocked in queue.take() in that case.
 550             // Adding LAST_LIST to the queue is harmless, as the client
 551             // should find failed != null before handling LAST_LIST.
 552             buffers.offer(LAST_LIST);
 553         }
 554 
 555         @Override
 556         public void onComplete() {
 557             subscription = null;
 558             onNext(LAST_LIST);
 559         }
 560 
 561         @Override
 562         public void close() throws IOException {
 563             Flow.Subscription s;
 564             synchronized (this) {
 565                 if (closed) return;
 566                 closed = true;
 567                 s = subscription;
 568                 subscription = null;
 569             }
 570             // s will be null if already completed
 571             try {
 572                 if (s != null) {
 573                     s.cancel();
 574                 }
 575             } finally {
 576                 buffers.offer(LAST_LIST);
 577                 super.close();
 578             }
 579         }
 580 
 581     }
 582 
 583     public static BodySubscriber<Stream<String>> createLineStream() {
 584         return createLineStream(UTF_8);
 585     }
 586 
 587     public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
 588         Objects.requireNonNull(charset);
 589         BodySubscriber<InputStream> s = new HttpResponseInputStream();
 590         // Creates a MappingSubscriber with a trusted finisher that is
 591         // trusted not to block.
 592         return new MappingSubscriber<InputStream,Stream<String>>(s,
 593             (InputStream stream) -> {
 594                 return new BufferedReader(new InputStreamReader(stream, charset))
 595                             .lines().onClose(() -> Utils.close(stream));
 596             }, true);
 597     }
 598 
 599     /**
 600      * Currently this consumes all of the data and ignores it
 601      */
 602     public static class NullSubscriber<T> implements TrustedSubscriber<T> {
 603 
 604         private final CompletableFuture<T> cf = new MinimalFuture<>();
 605         private final Optional<T> result;
 606         private final AtomicBoolean subscribed = new AtomicBoolean();
 607 
 608         public NullSubscriber(Optional<T> result) {
 609             this.result = result;
 610         }
 611 
 612         @Override
 613         public void onSubscribe(Flow.Subscription subscription) {
 614             Objects.requireNonNull(subscription);
 615             if (!subscribed.compareAndSet(false, true)) {
 616                 subscription.cancel();
 617             } else {
 618                 subscription.request(Long.MAX_VALUE);
 619             }
 620         }
 621 
 622         @Override
 623         public void onNext(List<ByteBuffer> items) {
 624             Objects.requireNonNull(items);
 625         }
 626 
 627         @Override
 628         public void onError(Throwable throwable) {
 629             Objects.requireNonNull(throwable);
 630             cf.completeExceptionally(throwable);
 631         }
 632 
 633         @Override
 634         public void onComplete() {
 635             if (result.isPresent()) {
 636                 cf.complete(result.get());
 637             } else {
 638                 cf.complete(null);
 639             }
 640         }
 641 
 642         @Override
 643         public CompletionStage<T> getBody() {
 644             return cf;
 645         }
 646     }
 647 
 648     /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
 649     public static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
 650         implements TrustedSubscriber<R>
 651     {
 652         private final CompletableFuture<R> cf = new MinimalFuture<>();
 653         private final S subscriber;
 654         private final Function<? super S,? extends R> finisher;
 655         private volatile Subscription subscription;
 656 
 657         // The finisher isn't called until all bytes have been received,
 658         // and so shouldn't need an executor. No need to override
 659         // TrustedSubscriber::needsExecutor
 660         public SubscriberAdapter(S subscriber, Function<? super S,? extends R> finisher) {
 661             this.subscriber = Objects.requireNonNull(subscriber);
 662             this.finisher = Objects.requireNonNull(finisher);
 663         }
 664 
 665         @Override
 666         public void onSubscribe(Subscription subscription) {
 667             Objects.requireNonNull(subscription);
 668             if (this.subscription != null) {
 669                 subscription.cancel();
 670             } else {
 671                 this.subscription = subscription;
 672                 subscriber.onSubscribe(subscription);
 673             }
 674         }
 675 
 676         @Override
 677         public void onNext(List<ByteBuffer> item) {
 678             Objects.requireNonNull(item);
 679             try {
 680                 subscriber.onNext(item);
 681             } catch (Throwable throwable) {
 682                 subscription.cancel();
 683                 onError(throwable);
 684             }
 685         }
 686 
 687         @Override
 688         public void onError(Throwable throwable) {
 689             Objects.requireNonNull(throwable);
 690             try {
 691                 subscriber.onError(throwable);
 692             } finally {
 693                 cf.completeExceptionally(throwable);
 694             }
 695         }
 696 
 697         @Override
 698         public void onComplete() {
 699             try {
 700                 subscriber.onComplete();
 701             } finally {
 702                 try {
 703                     cf.complete(finisher.apply(subscriber));
 704                 } catch (Throwable throwable) {
 705                     cf.completeExceptionally(throwable);
 706                 }
 707             }
 708         }
 709 
 710         @Override
 711         public CompletionStage<R> getBody() {
 712             return cf;
 713         }
 714     }
 715 
 716     /**
 717      * A body subscriber which receives input from an upstream subscriber
 718      * and maps that subscriber's body type to a new type. The upstream subscriber
 719      * delegates all flow operations directly to this object. The
 720      * {@link CompletionStage} returned by {@link #getBody()}} takes the output
 721      * of the upstream {@code getBody()} and applies the mapper function to
 722      * obtain the new {@code CompletionStage} type.
 723      *
 724      * @param <T> the upstream body type
 725      * @param <U> this subscriber's body type
 726      */
 727     public static class MappingSubscriber<T,U> implements TrustedSubscriber<U> {
 728         private final BodySubscriber<T> upstream;
 729         private final Function<? super T,? extends U> mapper;
 730         private final boolean trusted;
 731 
 732         public MappingSubscriber(BodySubscriber<T> upstream,
 733                                  Function<? super T,? extends U> mapper) {
 734             this(upstream, mapper, false);
 735         }
 736 
 737         // creates a MappingSubscriber with a mapper that is trusted
 738         // to not block when called.
 739         MappingSubscriber(BodySubscriber<T> upstream,
 740                           Function<? super T,? extends U> mapper,
 741                           boolean trusted) {
 742             this.upstream = Objects.requireNonNull(upstream);
 743             this.mapper = Objects.requireNonNull(mapper);
 744             this.trusted = trusted;
 745         }
 746 
 747         // There is no way to know whether a custom mapper function
 748         // might block or not - so we should return true unless the
 749         // mapper is implemented and trusted by our own code not to
 750         // block.
 751         @Override
 752         public boolean needsExecutor() {
 753             return !trusted || TrustedSubscriber.needsExecutor(upstream);
 754         }
 755 
 756         // If upstream.getBody() is already completed (case of InputStream),
 757         // then calling upstream.getBody().thenApply(mapper) might block
 758         // if the mapper blocks. We should probably add a variant of
 759         // MappingSubscriber that calls thenApplyAsync instead, but this
 760         // needs a new public API point. See needsExecutor() above.
 761         @Override
 762         public CompletionStage<U> getBody() {
 763             return upstream.getBody().thenApply(mapper);
 764         }
 765 
 766         @Override
 767         public void onSubscribe(Flow.Subscription subscription) {
 768             upstream.onSubscribe(subscription);
 769         }
 770 
 771         @Override
 772         public void onNext(List<ByteBuffer> item) {
 773             upstream.onNext(item);
 774         }
 775 
 776         @Override
 777         public void onError(Throwable throwable) {
 778             upstream.onError(throwable);
 779         }
 780 
 781         @Override
 782         public void onComplete() {
 783             upstream.onComplete();
 784         }
 785     }
 786 
 787     // A BodySubscriber that returns a Publisher<List<ByteBuffer>>
 788     static class PublishingBodySubscriber
 789             implements TrustedSubscriber<Flow.Publisher<List<ByteBuffer>>> {
 790         private final MinimalFuture<Flow.Subscription>
 791                 subscriptionCF = new MinimalFuture<>();
 792         private final MinimalFuture<SubscriberRef>
 793                 subscribedCF = new MinimalFuture<>();
 794         private AtomicReference<SubscriberRef>
 795                 subscriberRef = new AtomicReference<>();
 796         private final CompletionStage<Flow.Publisher<List<ByteBuffer>>> body =
 797                 subscriptionCF.thenCompose(
 798                         (s) -> MinimalFuture.completedFuture(this::subscribe));
 799 
 800         // We use the completionCF to ensure that only one of
 801         // onError or onComplete is ever called.
 802         private final MinimalFuture<Void> completionCF;
 803         private PublishingBodySubscriber() {
 804             completionCF = new MinimalFuture<>();
 805             completionCF.whenComplete(
 806                     (r,t) -> subscribedCF.thenAccept( s -> complete(s, t)));
 807         }
 808 
 809         // An object that holds a reference to a Flow.Subscriber.
 810         // The reference is cleared when the subscriber is completed - either
 811         // normally or exceptionally, or when the subscription is cancelled.
 812         static final class SubscriberRef {
 813             volatile Flow.Subscriber<? super List<ByteBuffer>> ref;
 814             SubscriberRef(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 815                 ref = subscriber;
 816             }
 817             Flow.Subscriber<? super List<ByteBuffer>> get() {
 818                 return ref;
 819             }
 820             Flow.Subscriber<? super List<ByteBuffer>> clear() {
 821                 Flow.Subscriber<? super List<ByteBuffer>> res = ref;
 822                 ref = null;
 823                 return res;
 824             }
 825         }
 826 
 827         // A subscription that wraps an upstream subscription and
 828         // holds a reference to a subscriber. The subscriber reference
 829         // is cleared when the subscription is cancelled
 830         final static class SubscriptionRef implements Flow.Subscription {
 831             final Flow.Subscription subscription;
 832             final SubscriberRef subscriberRef;
 833             SubscriptionRef(Flow.Subscription subscription,
 834                             SubscriberRef subscriberRef) {
 835                 this.subscription = subscription;
 836                 this.subscriberRef = subscriberRef;
 837             }
 838             @Override
 839             public void request(long n) {
 840                 if (subscriberRef.get() != null) {
 841                     subscription.request(n);
 842                 }
 843             }
 844             @Override
 845             public void cancel() {
 846                 subscription.cancel();
 847                 subscriberRef.clear();
 848             }
 849 
 850             void subscribe() {
 851                 Subscriber<?> subscriber = subscriberRef.get();
 852                 if (subscriber != null) {
 853                     subscriber.onSubscribe(this);
 854                 }
 855             }
 856 
 857             @Override
 858             public String toString() {
 859                 return "SubscriptionRef/"
 860                         + subscription.getClass().getName()
 861                         + "@"
 862                         + System.identityHashCode(subscription);
 863             }
 864         }
 865 
 866         // This is a callback for the subscribedCF.
 867         // Do not call directly!
 868         private void complete(SubscriberRef ref, Throwable t) {
 869             assert ref != null;
 870             Subscriber<?> s = ref.clear();
 871             // maybe null if subscription was cancelled
 872             if (s == null) return;
 873             if (t == null) {
 874                 try {
 875                     s.onComplete();
 876                 } catch (Throwable x) {
 877                     s.onError(x);
 878                 }
 879             } else {
 880                 s.onError(t);
 881             }
 882         }
 883 
 884         private void signalError(Throwable err) {
 885             if (err == null) {
 886                 err = new NullPointerException("null throwable");
 887             }
 888             completionCF.completeExceptionally(err);
 889         }
 890 
 891         private void signalComplete() {
 892             completionCF.complete(null);
 893         }
 894 
 895         private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 896             Objects.requireNonNull(subscriber, "subscriber must not be null");
 897             SubscriberRef ref = new SubscriberRef(subscriber);
 898             if (subscriberRef.compareAndSet(null, ref)) {
 899                 subscriptionCF.thenAccept((s) -> {
 900                     SubscriptionRef subscription = new SubscriptionRef(s,ref);
 901                     try {
 902                         subscription.subscribe();
 903                         subscribedCF.complete(ref);
 904                     } catch (Throwable t) {
 905                         if (Log.errors()) {
 906                             Log.logError("Failed to call onSubscribe: " +
 907                                     "cancelling subscription: " + t);
 908                             Log.logError(t);
 909                         }
 910                         subscription.cancel();
 911                     }
 912                 });
 913             } else {
 914                 subscriber.onSubscribe(new Flow.Subscription() {
 915                     @Override public void request(long n) { }
 916                     @Override public void cancel() { }
 917                 });
 918                 subscriber.onError(new IllegalStateException(
 919                         "This publisher has already one subscriber"));
 920             }
 921         }
 922 
 923         private final AtomicBoolean subscribed = new AtomicBoolean();
 924 
 925         @Override
 926         public void onSubscribe(Flow.Subscription subscription) {
 927             Objects.requireNonNull(subscription);
 928             if (!subscribed.compareAndSet(false, true)) {
 929                 subscription.cancel();
 930             } else {
 931                 subscriptionCF.complete(subscription);
 932             }
 933         }
 934 
 935         @Override
 936         public void onNext(List<ByteBuffer> item) {
 937             Objects.requireNonNull(item);
 938             try {
 939                 // cannot be called before onSubscribe()
 940                 assert subscriptionCF.isDone();
 941                 SubscriberRef ref = subscriberRef.get();
 942                 // cannot be called before subscriber calls request(1)
 943                 assert ref != null;
 944                 Flow.Subscriber<? super List<ByteBuffer>>
 945                         subscriber = ref.get();
 946                 if (subscriber != null) {
 947                     // may be null if subscription was cancelled.
 948                     subscriber.onNext(item);
 949                 }
 950             } catch (Throwable err) {
 951                 signalError(err);
 952                 subscriptionCF.thenAccept(s -> s.cancel());
 953             }
 954         }
 955 
 956         @Override
 957         public void onError(Throwable throwable) {
 958             // cannot be called before onSubscribe();
 959             assert suppress(subscriptionCF.isDone(),
 960                     "onError called before onSubscribe",
 961                     throwable);
 962             // onError can be called before request(1), and therefore can
 963             // be called before subscriberRef is set.
 964             signalError(throwable);
 965             Objects.requireNonNull(throwable);
 966         }
 967 
 968         @Override
 969         public void onComplete() {
 970             // cannot be called before onSubscribe()
 971             if (!subscriptionCF.isDone()) {
 972                 signalError(new InternalError(
 973                         "onComplete called before onSubscribed"));
 974             } else {
 975                 // onComplete can be called before request(1),
 976                 // and therefore can be called before subscriberRef
 977                 // is set.
 978                 signalComplete();
 979             }
 980         }
 981 
 982         @Override
 983         public CompletionStage<Flow.Publisher<List<ByteBuffer>>> getBody() {
 984             return body;
 985         }
 986 
 987         private boolean suppress(boolean condition,
 988                                  String assertion,
 989                                  Throwable carrier) {
 990             if (!condition) {
 991                 if (carrier != null) {
 992                     carrier.addSuppressed(new AssertionError(assertion));
 993                 } else if (Log.errors()) {
 994                     Log.logError(new AssertionError(assertion));
 995                 }
 996             }
 997             return true;
 998         }
 999 
1000     }
1001 
1002     public static BodySubscriber<Flow.Publisher<List<ByteBuffer>>>
1003     createPublisher() {
1004         return new PublishingBodySubscriber();
1005     }
1006 
1007 
1008     /**
1009      * Tries to determine whether bs::getBody must be invoked asynchronously,
1010      * and if so, uses the provided executor to do it.
1011      * If the executor is a {@link HttpClientImpl.DelegatingExecutor},
1012      * uses the executor's delegate.
1013      * @param e    The executor to use if an executor is required.
1014      * @param bs   The BodySubscriber (trusted or not)
1015      * @param <T>  The type of the response.
1016      * @return A completion stage that completes when the completion
1017      *         stage returned by bs::getBody completes. This may, or
1018      *         may not, be the same completion stage.
1019      */
1020     public static <T> CompletionStage<T> getBodyAsync(Executor e, BodySubscriber<T> bs) {
1021         if (TrustedSubscriber.needsExecutor(bs)) {
1022             // getBody must be called in the executor
1023             return getBodyAsync(e, bs, new MinimalFuture<>());
1024         } else {
1025             // No executor needed
1026             return bs.getBody();
1027         }
1028     }
1029 
1030     /**
1031      * Invokes bs::getBody using the provided executor.
1032      * If invoking bs::getBody requires an executor, and the given executor
1033      * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
1034      * delegate is used. If an error occurs anywhere then the given {code cf}
1035      * is completed exceptionally (this method does not throw).
1036      * @param e   The executor that should be used to call bs::getBody
1037      * @param bs  The BodySubscriber
1038      * @param cf  A completable future that this function will set up
1039      *            to complete when the completion stage returned by
1040      *            bs::getBody completes.
1041      *            In case of any error while trying to set up the
1042      *            completion chain, {@code cf} will be completed
1043      *            exceptionally with that error.
1044      * @param <T> The response type.
1045      * @return The provided {@code cf}.
1046      */
1047     public static <T> CompletableFuture<T> getBodyAsync(Executor e,
1048                                                       BodySubscriber<T> bs,
1049                                                       CompletableFuture<T> cf) {
1050         return getBodyAsync(e, bs, cf, cf::completeExceptionally);
1051     }
1052 
1053     /**
1054      * Invokes bs::getBody using the provided executor.
1055      * If invoking bs::getBody requires an executor, and the given executor
1056      * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
1057      * delegate is used.
1058      * The provided {@code cf} is completed with the result (exceptional
1059      * or not) of the completion stage returned by bs::getBody.
1060      * If an error occurs when trying to set up the
1061      * completion chain, the provided {@code errorHandler} is invoked,
1062      * but {@code cf} is not necessarily affected.
1063      * This method does not throw.
1064      * @param e   The executor that should be used to call bs::getBody
1065      * @param bs  The BodySubscriber
1066      * @param cf  A completable future that this function will set up
1067      *            to complete when the completion stage returned by
1068      *            bs::getBody completes.
1069      *            In case of any error while trying to set up the
1070      *            completion chain, {@code cf} will be completed
1071      *            exceptionally with that error.
1072      * @param errorHandler The handler to invoke if an error is raised
1073      *                     while trying to set up the completion chain.
1074      * @param <T> The response type.
1075      * @return The provide {@code cf}. If the {@code errorHandler} is
1076      * invoked, it is the responsibility of the {@code errorHandler} to
1077      * complete the {@code cf}, if needed.
1078      */
1079     public static <T> CompletableFuture<T> getBodyAsync(Executor e,
1080                                                       BodySubscriber<T> bs,
1081                                                       CompletableFuture<T> cf,
1082                                                       Consumer<Throwable> errorHandler) {
1083         assert errorHandler != null;
1084         try {
1085             assert e != null;
1086             assert cf != null;
1087 
1088             if (TrustedSubscriber.needsExecutor(bs)) {
1089                 e = (e instanceof HttpClientImpl.DelegatingExecutor)
1090                         ? ((HttpClientImpl.DelegatingExecutor) e).delegate() : e;
1091             }
1092 
1093             e.execute(() -> {
1094                 try {
1095                     bs.getBody().whenComplete((r, t) -> {
1096                         if (t != null) {
1097                             cf.completeExceptionally(t);
1098                         } else {
1099                             cf.complete(r);
1100                         }
1101                     });
1102                 } catch (Throwable t) {
1103                     errorHandler.accept(t);
1104                 }
1105             });
1106             return cf;
1107 
1108         } catch (Throwable t) {
1109             errorHandler.accept(t);
1110         }
1111         return cf;
1112     }
1113 }