1 /*
   2  * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.net.http;
  27 
  28 import java.io.BufferedReader;
  29 import java.io.IOException;
  30 import java.io.InputStream;
  31 import java.net.URI;
  32 import java.nio.ByteBuffer;
  33 import java.nio.charset.Charset;
  34 import java.nio.channels.FileChannel;
  35 import java.nio.charset.StandardCharsets;
  36 import java.nio.file.OpenOption;
  37 import java.nio.file.Path;
  38 import java.util.List;
  39 import java.util.Objects;
  40 import java.util.Optional;
  41 import java.util.concurrent.CompletableFuture;
  42 import java.util.concurrent.CompletionStage;
  43 import java.util.concurrent.ConcurrentMap;
  44 import java.util.concurrent.Flow;
  45 import java.util.concurrent.Flow.Subscriber;
  46 import java.util.concurrent.Flow.Publisher;
  47 import java.util.concurrent.Flow.Subscription;
  48 import java.util.function.Consumer;
  49 import java.util.function.Function;
  50 import java.util.function.Supplier;
  51 import java.util.stream.Stream;
  52 import javax.net.ssl.SSLSession;
  53 import jdk.internal.net.http.BufferingSubscriber;
  54 import jdk.internal.net.http.LineSubscriberAdapter;
  55 import jdk.internal.net.http.ResponseBodyHandlers.FileDownloadBodyHandler;
  56 import jdk.internal.net.http.ResponseBodyHandlers.PathBodyHandler;
  57 import jdk.internal.net.http.ResponseBodyHandlers.PushPromisesHandlerWithMap;
  58 import jdk.internal.net.http.ResponseSubscribers;
  59 import jdk.internal.net.http.ResponseSubscribers.PathSubscriber;
  60 import static java.nio.file.StandardOpenOption.*;
  61 import static jdk.internal.net.http.common.Utils.charsetFrom;
  62 
  63 /**
  64  * An HTTP response.
  65  *
  66  * <p> An {@code HttpResponse} is not created directly, but rather returned as
  67  * a result of sending an {@link HttpRequest}. An {@code HttpResponse} is
  68  * made available when the response status code and headers have been received,
  69  * and typically after the response body has also been completely received.
  70  * Whether or not the {@code HttpResponse} is made available before the response
  71  * body has been completely received depends on the {@link BodyHandler
  72  * BodyHandler} provided when sending the {@code HttpRequest}.
  73  *
  74  * <p> This class provides methods for accessing the response status code,
  75  * headers, the response body, and the {@code HttpRequest} corresponding
  76  * to this response.
  77  *
  78  * <p> The following is an example of retrieving a response as a String:
  79  *
  80  * <pre>{@code    HttpResponse<String> response = client
  81  *     .send(request, BodyHandlers.ofString()); }</pre>
  82  *
  83  * <p> The class {@link BodyHandlers BodyHandlers} provides implementations
  84  * of many common response handlers. Alternatively, a custom {@code BodyHandler}
  85  * implementation can be used.
  86  *
  87  * @param <T> the response body type
  88  * @since 11
  89  */
  90 public interface HttpResponse<T> {
  91 
  92 
  93     /**
  94      * Returns the status code for this response.
  95      *
  96      * @return the response code
  97      */
  98     public int statusCode();
  99 
 100     /**
 101      * Returns the {@link HttpRequest} corresponding to this response.
 102      *
 103      * <p> The returned {@code HttpRequest} may not be the initiating request
 104      * provided when {@linkplain HttpClient#send(HttpRequest, BodyHandler)
 105      * sending}. For example, if the initiating request was redirected, then the
 106      * request returned by this method will have the redirected URI, which will
 107      * be different from the initiating request URI.
 108      *
 109      * @see #previousResponse()
 110      *
 111      * @return the request
 112      */
 113     public HttpRequest request();
 114 
 115     /**
 116      * Returns an {@code Optional} containing the previous intermediate response
 117      * if one was received. An intermediate response is one that is received
 118      * as a result of redirection or authentication. If no previous response
 119      * was received then an empty {@code Optional} is returned.
 120      *
 121      * @return an Optional containing the HttpResponse, if any.
 122      */
 123     public Optional<HttpResponse<T>> previousResponse();
 124 
 125     /**
 126      * Returns the received response headers.
 127      *
 128      * @return the response headers
 129      */
 130     public HttpHeaders headers();
 131 
 132     /**
 133      * Returns the body. Depending on the type of {@code T}, the returned body
 134      * may represent the body after it was read (such as {@code byte[]}, or
 135      * {@code String}, or {@code Path}) or it may represent an object with
 136      * which the body is read, such as an {@link java.io.InputStream}.
 137      *
 138      * <p> If this {@code HttpResponse} was returned from an invocation of
 139      * {@link #previousResponse()} then this method returns {@code null}
 140      *
 141      * @return the body
 142      */
 143     public T body();
 144 
 145     /**
 146      * Returns an {@link Optional} containing the {@link SSLSession} in effect
 147      * for this response. Returns an empty {@code Optional} if this is not a
 148      * <i>HTTPS</i> response.
 149      *
 150      * @return an {@code Optional} containing the {@code SSLSession} associated
 151      *         with the response
 152      */
 153     public Optional<SSLSession> sslSession();
 154 
 155     /**
 156      * Returns the {@code URI} that the response was received from. This may be
 157      * different from the request {@code URI} if redirection occurred.
 158      *
 159      * @return the URI of the response
 160      */
 161      public URI uri();
 162 
 163     /**
 164      * Returns the HTTP protocol version that was used for this response.
 165      *
 166      * @return HTTP protocol version
 167      */
 168     public HttpClient.Version version();
 169 
 170 
 171     /**
 172      * Initial response information supplied to a {@link BodyHandler BodyHandler}
 173      * when a response is initially received and before the body is processed.
 174      */
 175     public interface ResponseInfo {
 176         /**
 177          * Provides the response status code.
 178          * @return the response status code
 179          */
 180         public int statusCode();
 181 
 182         /**
 183          * Provides the response headers.
 184          * @return the response headers
 185          */
 186         public HttpHeaders headers();
 187 
 188         /**
 189          * Provides the response protocol version.
 190          * @return the response protocol version
 191          */
 192         public HttpClient.Version version();
 193     }
 194 
 195     /**
 196      * A handler for response bodies.  The class {@link BodyHandlers BodyHandlers}
 197      * provides implementations of many common body handlers.
 198      *
 199      * <p> The {@code BodyHandler} interface allows inspection of the response
 200      * code and headers, before the actual response body is received, and is
 201      * responsible for creating the response {@link BodySubscriber
 202      * BodySubscriber}. The {@code BodySubscriber} consumes the actual response
 203      * body bytes and, typically, converts them into a higher-level Java type.
 204      *
 205      * <p> A {@code BodyHandler} is a function that takes a {@link ResponseInfo
 206      * ResponseInfo} object; and which returns a {@code BodySubscriber}. The
 207      * {@code BodyHandler} is invoked when the response status code and headers
 208      * are available, but before the response  body bytes are received.
 209      *
 210      * <p> The following example uses one of the {@linkplain BodyHandlers
 211      * predefined body handlers} that always process the response body in the
 212      * same way ( streams the response body to a file ).
 213      *
 214      * <pre>{@code   HttpRequest request = HttpRequest.newBuilder()
 215      *        .uri(URI.create("http://www.foo.com/"))
 216      *        .build();
 217      *  client.sendAsync(request, BodyHandlers.ofFile(Paths.get("/tmp/f")))
 218      *        .thenApply(HttpResponse::body)
 219      *        .thenAccept(System.out::println); }</pre>
 220      *
 221      * Note, that even though the pre-defined handlers do not examine the
 222      * response code, the response code and headers are always retrievable from
 223      * the {@link HttpResponse}, when it is returned.
 224      *
 225      * <p> In the second example, the function returns a different subscriber
 226      * depending on the status code.
 227      * <pre>{@code   HttpRequest request = HttpRequest.newBuilder()
 228      *        .uri(URI.create("http://www.foo.com/"))
 229      *        .build();
 230      *  BodyHandler<Path> bodyHandler = (rspInfo) -> rspInfo.statusCode() == 200
 231      *                      ? BodySubscribers.ofFile(Paths.get("/tmp/f"))
 232      *                      : BodySubscribers.replacing(Paths.get("/NULL"));
 233      *  client.sendAsync(request, bodyHandler)
 234      *        .thenApply(HttpResponse::body)
 235      *        .thenAccept(System.out::println); }</pre>
 236      *
 237      * @param <T> the response body type
 238      * @see BodyHandlers
 239      * @since 11
 240      */
 241     @FunctionalInterface
 242     public interface BodyHandler<T> {
 243 
 244         /**
 245          * Returns a {@link BodySubscriber BodySubscriber} considering the
 246          * given response status code and headers. This method is invoked before
 247          * the actual response body bytes are read and its implementation must
 248          * return a {@link BodySubscriber BodySubscriber} to consume the response
 249          * body bytes.
 250          *
 251          * <p> The response body can be discarded using one of {@link
 252          * BodyHandlers#discarding() discarding} or {@link
 253          * BodyHandlers#replacing(Object) replacing}.
 254          *
 255          * @param responseInfo the response info
 256          * @return a body subscriber
 257          */
 258         public BodySubscriber<T> apply(ResponseInfo responseInfo);
 259     }
 260 
 261     /**
 262      * Implementations of {@link BodyHandler BodyHandler} that implement various
 263      * useful handlers, such as handling the response body as a String, or
 264      * streaming the response body to a file.
 265      *
 266      * <p> These implementations do not examine the status code, meaning the
 267      * body is always accepted. They typically return an equivalently named
 268      * {@code BodySubscriber}. Alternatively, a custom handler can be used to
 269      * examine the status code and headers, and return a different body
 270      * subscriber, of the same type, as appropriate.
 271      *
 272      * <p>The following are examples of using the predefined body handlers to
 273      * convert a flow of response body data into common high-level Java objects:
 274      *
 275      * <pre>{@code    // Receives the response body as a String
 276      *   HttpResponse<String> response = client
 277      *     .send(request, BodyHandlers.ofString());
 278      *
 279      *   // Receives the response body as a file
 280      *   HttpResponse<Path> response = client
 281      *     .send(request, BodyHandlers.ofFile(Paths.get("example.html")));
 282      *
 283      *   // Receives the response body as an InputStream
 284      *   HttpResponse<InputStream> response = client
 285      *     .send(request, BodyHandlers.ofInputStream());
 286      *
 287      *   // Discards the response body
 288      *   HttpResponse<Void> response = client
 289      *     .send(request, BodyHandlers.discarding());  }</pre>
 290      *
 291      * @since 11
 292      */
 293     public static class BodyHandlers {
 294 
 295         private BodyHandlers() { }
 296 
 297         /**
 298          * Returns a response body handler that returns a {@link BodySubscriber
 299          * BodySubscriber}{@code <Void>} obtained from {@link
 300          * BodySubscribers#fromSubscriber(Subscriber)}, with the given
 301          * {@code subscriber}.
 302          *
 303          * <p> The response body is not available through this, or the {@code
 304          * HttpResponse} API, but instead all response body is forwarded to the
 305          * given {@code subscriber}, which should make it available, if
 306          * appropriate, through some other mechanism, e.g. an entry in a
 307          * database, etc.
 308          *
 309          * @apiNote This method can be used as an adapter between {@code
 310          * BodySubscriber} and {@code Flow.Subscriber}.
 311          *
 312          * <p> For example:
 313          * <pre> {@code  TextSubscriber subscriber = new TextSubscriber();
 314          *  HttpResponse<Void> response = client.sendAsync(request,
 315          *      BodyHandlers.fromSubscriber(subscriber)).join();
 316          *  System.out.println(response.statusCode()); }</pre>
 317          *
 318          * @param subscriber the subscriber
 319          * @return a response body handler
 320          */
 321         public static BodyHandler<Void>
 322         fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) {
 323             Objects.requireNonNull(subscriber);
 324             return (responseInfo) -> BodySubscribers.fromSubscriber(subscriber,
 325                                                                        s -> null);
 326         }
 327 
 328         /**
 329          * Returns a response body handler that returns a {@link BodySubscriber
 330          * BodySubscriber}{@code <T>} obtained from {@link
 331          * BodySubscribers#fromSubscriber(Subscriber, Function)}, with the
 332          * given {@code subscriber} and {@code finisher} function.
 333          *
 334          * <p> The given {@code finisher} function is applied after the given
 335          * subscriber's {@code onComplete} has been invoked. The {@code finisher}
 336          * function is invoked with the given subscriber, and returns a value
 337          * that is set as the response's body.
 338          *
 339          * @apiNote This method can be used as an adapter between {@code
 340          * BodySubscriber} and {@code Flow.Subscriber}.
 341          *
 342          * <p> For example:
 343          * <pre> {@code  TextSubscriber subscriber = ...;  // accumulates bytes and transforms them into a String
 344          *  HttpResponse<String> response = client.sendAsync(request,
 345          *      BodyHandlers.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join();
 346          *  String text = response.body(); }</pre>
 347          *
 348          * @param <S> the type of the Subscriber
 349          * @param <T> the type of the response body
 350          * @param subscriber the subscriber
 351          * @param finisher a function to be applied after the subscriber has completed
 352          * @return a response body handler
 353          */
 354         public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodyHandler<T>
 355         fromSubscriber(S subscriber, Function<? super S,? extends T> finisher) {
 356             Objects.requireNonNull(subscriber);
 357             Objects.requireNonNull(finisher);
 358             return (responseInfo) -> BodySubscribers.fromSubscriber(subscriber,
 359                                                                       finisher);
 360         }
 361 
 362         /**
 363          * Returns a response body handler that returns a {@link BodySubscriber
 364          * BodySubscriber}{@code <Void>} obtained from {@link
 365          * BodySubscribers#fromLineSubscriber(Subscriber, Function, Charset, String)
 366          * BodySubscribers.fromLineSubscriber(subscriber, s -> null, charset, null)},
 367          * with the given {@code subscriber}.
 368          * The {@link Charset charset} used to decode the response body bytes is
 369          * obtained from the HTTP response headers as specified by {@link #ofString()},
 370          * and lines are delimited in the manner of {@link BufferedReader#readLine()}.
 371          *
 372          * <p> The response body is not available through this, or the {@code
 373          * HttpResponse} API, but instead all response body is forwarded to the
 374          * given {@code subscriber}, which should make it available, if
 375          * appropriate, through some other mechanism, e.g. an entry in a
 376          * database, etc.
 377          *
 378          * @apiNote This method can be used as an adapter between a {@code
 379          * BodySubscriber} and a text based {@code Flow.Subscriber} that parses
 380          * text line by line.
 381          *
 382          * <p> For example:
 383          * <pre> {@code  // A PrintSubscriber that implements Flow.Subscriber<String>
 384          *  // and print lines received by onNext() on System.out
 385          *  PrintSubscriber subscriber = new PrintSubscriber(System.out);
 386          *  client.sendAsync(request, BodyHandlers.fromLineSubscriber(subscriber))
 387          *      .thenApply(HttpResponse::statusCode)
 388          *      .thenAccept((status) -> {
 389          *          if (status != 200) {
 390          *              System.err.printf("ERROR: %d status received%n", status);
 391          *          }
 392          *      }); }</pre>
 393          *
 394          * @param subscriber the subscriber
 395          * @return a response body handler
 396          */
 397         public static BodyHandler<Void>
 398         fromLineSubscriber(Subscriber<? super String> subscriber) {
 399             Objects.requireNonNull(subscriber);
 400             return (responseInfo) ->
 401                         BodySubscribers.fromLineSubscriber(subscriber,
 402                                                            s -> null,
 403                                                            charsetFrom(responseInfo.headers()),
 404                                                            null);
 405         }
 406 
 407         /**
 408          * Returns a response body handler that returns a {@link BodySubscriber
 409          * BodySubscriber}{@code <T>} obtained from {@link
 410          * BodySubscribers#fromLineSubscriber(Subscriber, Function, Charset, String)
 411          * BodySubscribers.fromLineSubscriber(subscriber, finisher, charset, lineSeparator)},
 412          * with the given {@code subscriber}, {@code finisher} function, and line separator.
 413          * The {@link Charset charset} used to decode the response body bytes is
 414          * obtained from the HTTP response headers as specified by {@link #ofString()}.
 415          *
 416          * <p> The given {@code finisher} function is applied after the given
 417          * subscriber's {@code onComplete} has been invoked. The {@code finisher}
 418          * function is invoked with the given subscriber, and returns a value
 419          * that is set as the response's body.
 420          *
 421          * @apiNote This method can be used as an adapter between a {@code
 422          * BodySubscriber} and a text based {@code Flow.Subscriber} that parses
 423          * text line by line.
 424          *
 425          * <p> For example:
 426          * <pre> {@code  // A LineParserSubscriber that implements Flow.Subscriber<String>
 427          *  // and accumulates lines that match a particular pattern
 428          *  Pattern pattern = ...;
 429          *  LineParserSubscriber subscriber = new LineParserSubscriber(pattern);
 430          *  HttpResponse<List<String>> response = client.send(request,
 431          *      BodyHandlers.fromLineSubscriber(subscriber, s -> s.getMatchingLines(), "\n"));
 432          *  if (response.statusCode() != 200) {
 433          *      System.err.printf("ERROR: %d status received%n", response.statusCode());
 434          *  } }</pre>
 435          *
 436          *
 437          * @param <S> the type of the Subscriber
 438          * @param <T> the type of the response body
 439          * @param subscriber the subscriber
 440          * @param finisher a function to be applied after the subscriber has completed
 441          * @param lineSeparator an optional line separator: can be {@code null},
 442          *                      in which case lines will be delimited in the manner of
 443          *                      {@link BufferedReader#readLine()}.
 444          * @return a response body handler
 445          * @throws IllegalArgumentException if the supplied {@code lineSeparator}
 446          *         is the empty string
 447          */
 448         public static <S extends Subscriber<? super String>,T> BodyHandler<T>
 449         fromLineSubscriber(S subscriber,
 450                            Function<? super S,? extends T> finisher,
 451                            String lineSeparator) {
 452             Objects.requireNonNull(subscriber);
 453             Objects.requireNonNull(finisher);
 454             // implicit null check
 455             if (lineSeparator != null && lineSeparator.isEmpty())
 456                 throw new IllegalArgumentException("empty line separator");
 457             return (responseInfo) ->
 458                         BodySubscribers.fromLineSubscriber(subscriber,
 459                                                            finisher,
 460                                                            charsetFrom(responseInfo.headers()),
 461                                                            lineSeparator);
 462         }
 463 
 464         /**
 465          * Returns a response body handler that discards the response body.
 466          *
 467          * @return a response body handler
 468          */
 469         public static BodyHandler<Void> discarding() {
 470             return (responseInfo) -> BodySubscribers.discarding();
 471         }
 472 
 473         /**
 474          * Returns a response body handler that returns the given replacement
 475          * value, after discarding the response body.
 476          *
 477          * @param <U> the response body type
 478          * @param value the value of U to return as the body, may be {@code null}
 479          * @return a response body handler
 480          */
 481         public static <U> BodyHandler<U> replacing(U value) {
 482             return (responseInfo) -> BodySubscribers.replacing(value);
 483         }
 484 
 485         /**
 486          * Returns a {@code BodyHandler<String>} that returns a
 487          * {@link BodySubscriber BodySubscriber}{@code <String>} obtained from
 488          * {@link BodySubscribers#ofString(Charset) BodySubscribers.ofString(Charset)}.
 489          * The body is decoded using the given character set.
 490          *
 491          * @param charset the character set to convert the body with
 492          * @return a response body handler
 493          */
 494         public static BodyHandler<String> ofString(Charset charset) {
 495             Objects.requireNonNull(charset);
 496             return (responseInfo) -> BodySubscribers.ofString(charset);
 497         }
 498 
 499         /**
 500          * Returns a {@code BodyHandler<Path>} that returns a
 501          * {@link BodySubscriber BodySubscriber}{@code <Path>} obtained from
 502          * {@link BodySubscribers#ofFile(Path, OpenOption...)
 503          * BodySubscribers.ofFile(Path,OpenOption...)}.
 504          *
 505          * <p> When the {@code HttpResponse} object is returned, the body has
 506          * been completely written to the file, and {@link #body()} returns a
 507          * reference to its {@link Path}.
 508          *
 509          * <p> Security manager permission checks are performed in this factory
 510          * method, when the {@code BodyHandler} is created. Care must be taken
 511          * that the {@code BodyHandler} is not shared with untrusted code.
 512          *
 513          * @param file the file to store the body in
 514          * @param openOptions any options to use when opening/creating the file
 515          * @return a response body handler
 516          * @throws IllegalArgumentException if an invalid set of open options
 517          *          are specified
 518          * @throws SecurityException If a security manager has been installed
 519          *          and it denies {@linkplain SecurityManager#checkWrite(String)
 520          *          write access} to the file.
 521          */
 522         public static BodyHandler<Path> ofFile(Path file, OpenOption... openOptions) {
 523             Objects.requireNonNull(file);
 524             List<OpenOption> opts = List.of(openOptions);
 525             if (opts.contains(DELETE_ON_CLOSE) || opts.contains(READ)) {
 526                 // these options make no sense, since the FileChannel is not exposed
 527                 throw new IllegalArgumentException("invalid openOptions: " + opts);
 528             }
 529             return PathBodyHandler.create(file, opts);
 530         }
 531 
 532         /**
 533          * Returns a {@code BodyHandler<Path>} that returns a
 534          * {@link BodySubscriber BodySubscriber}{@code <Path>}.
 535          *
 536          * <p> Equivalent to: {@code ofFile(file, CREATE, WRITE)}
 537          *
 538          * <p> Security manager permission checks are performed in this factory
 539          * method, when the {@code BodyHandler} is created. Care must be taken
 540          * that the {@code BodyHandler} is not shared with untrusted code.
 541          *
 542          * @param file the file to store the body in
 543          * @return a response body handler
 544          * @throws SecurityException If a security manager has been installed
 545          *          and it denies {@linkplain SecurityManager#checkWrite(String)
 546          *          write access} to the file.
 547          */
 548         public static BodyHandler<Path> ofFile(Path file) {
 549             return BodyHandlers.ofFile(file, CREATE, WRITE);
 550         }
 551 
 552         /**
 553          * Returns a {@code BodyHandler<Path>} that returns a
 554          * {@link BodySubscriber BodySubscriber}&lt;{@link Path}&gt;
 555          * where the download directory is specified, but the filename is
 556          * obtained from the {@code Content-Disposition} response header. The
 557          * {@code Content-Disposition} header must specify the <i>attachment</i>
 558          * type and must also contain a <i>filename</i> parameter. If the
 559          * filename specifies multiple path components only the final component
 560          * is used as the filename (with the given directory name).
 561          *
 562          * <p> When the {@code HttpResponse} object is returned, the body has
 563          * been completely written to the file and {@link #body()} returns a
 564          * {@code Path} object for the file. The returned {@code Path} is the
 565          * combination of the supplied directory name and the file name supplied
 566          * by the server. If the destination directory does not exist or cannot
 567          * be written to, then the response will fail with an {@link IOException}.
 568          *
 569          * <p> Security manager permission checks are performed in this factory
 570          * method, when the {@code BodyHandler} is created. Care must be taken
 571          * that the {@code BodyHandler} is not shared with untrusted code.
 572          *
 573          * @param directory the directory to store the file in
 574          * @param openOptions open options used when opening the file
 575          * @return a response body handler
 576          * @throws IllegalArgumentException if the given path does not exist,
 577          *          is not a directory, is not writable, or if an invalid set
 578          *          of open options are specified
 579          * @throws SecurityException If a security manager has been installed
 580          *          and it denies
 581          *          {@linkplain SecurityManager#checkRead(String) read access}
 582          *          to the directory, or it denies
 583          *          {@linkplain SecurityManager#checkWrite(String) write access}
 584          *          to the directory, or it denies
 585          *          {@linkplain SecurityManager#checkWrite(String) write access}
 586          *          to the files within the directory.
 587          */
 588         public static BodyHandler<Path> ofFileDownload(Path directory,
 589                                                        OpenOption... openOptions) {
 590             Objects.requireNonNull(directory);
 591             List<OpenOption> opts = List.of(openOptions);
 592             if (opts.contains(DELETE_ON_CLOSE)) {
 593                 throw new IllegalArgumentException("invalid option: " + DELETE_ON_CLOSE);
 594             }
 595             return FileDownloadBodyHandler.create(directory, opts);
 596         }
 597 
 598         /**
 599          * Returns a {@code BodyHandler<InputStream>} that returns a
 600          * {@link BodySubscriber BodySubscriber}{@code <InputStream>} obtained from
 601          * {@link BodySubscribers#ofInputStream() BodySubscribers.ofInputStream}.
 602          *
 603          * <p> When the {@code HttpResponse} object is returned, the response
 604          * headers will have been completely read, but the body may not have
 605          * been fully received yet. The {@link #body()} method returns an
 606          * {@link InputStream} from which the body can be read as it is received.
 607          *
 608          * @apiNote See {@link BodySubscribers#ofInputStream()} for more
 609          * information.
 610          *
 611          * @return a response body handler
 612          */
 613         public static BodyHandler<InputStream> ofInputStream() {
 614             return (responseInfo) -> BodySubscribers.ofInputStream();
 615         }
 616 
 617         /**
 618          * Returns a {@code BodyHandler<Stream<String>>} that returns a
 619          * {@link BodySubscriber BodySubscriber}{@code <Stream<String>>} obtained
 620          * from {@link BodySubscribers#ofLines(Charset) BodySubscribers.ofLines(charset)}.
 621          * The {@link Charset charset} used to decode the response body bytes is
 622          * obtained from the HTTP response headers as specified by {@link #ofString()},
 623          * and lines are delimited in the manner of {@link BufferedReader#readLine()}.
 624          *
 625          * <p> When the {@code HttpResponse} object is returned, the body may
 626          * not have been completely received.
 627          *
 628          * @return a response body handler
 629          */
 630         public static BodyHandler<Stream<String>> ofLines() {
 631             return (responseInfo) ->
 632                     BodySubscribers.ofLines(charsetFrom(responseInfo.headers()));
 633         }
 634 
 635         /**
 636          * Returns a {@code BodyHandler<Void>} that returns a
 637          * {@link BodySubscriber BodySubscriber}{@code <Void>} obtained from
 638          * {@link BodySubscribers#ofByteArrayConsumer(Consumer)
 639          * BodySubscribers.ofByteArrayConsumer(Consumer)}.
 640          *
 641          * <p> When the {@code HttpResponse} object is returned, the body has
 642          * been completely written to the consumer.
 643          *
 644          * @apiNote
 645          * The subscriber returned by this handler is not flow controlled.
 646          * Therefore, the supplied consumer must be able to process whatever
 647          * amount of data is delivered in a timely fashion.
 648          *
 649          * @param consumer a Consumer to accept the response body
 650          * @return a response body handler
 651          */
 652         public static BodyHandler<Void>
 653         ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer) {
 654             Objects.requireNonNull(consumer);
 655             return (responseInfo) -> BodySubscribers.ofByteArrayConsumer(consumer);
 656         }
 657 
 658         /**
 659          * Returns a {@code BodyHandler<byte[]>} that returns a
 660          * {@link BodySubscriber BodySubscriber}{@code <byte[]>} obtained
 661          * from {@link BodySubscribers#ofByteArray() BodySubscribers.ofByteArray()}.
 662          *
 663          * <p> When the {@code HttpResponse} object is returned, the body has
 664          * been completely written to the byte array.
 665          *
 666          * @return a response body handler
 667          */
 668         public static BodyHandler<byte[]> ofByteArray() {
 669             return (responseInfo) -> BodySubscribers.ofByteArray();
 670         }
 671 
 672         /**
 673          * Returns a {@code BodyHandler<String>} that returns a
 674          * {@link BodySubscriber BodySubscriber}{@code <String>} obtained from
 675          * {@link BodySubscribers#ofString(Charset) BodySubscribers.ofString(Charset)}.
 676          * The body is decoded using the character set specified in
 677          * the {@code Content-Type} response header. If there is no such
 678          * header, or the character set is not supported, then
 679          * {@link StandardCharsets#UTF_8 UTF_8} is used.
 680          *
 681          * <p> When the {@code HttpResponse} object is returned, the body has
 682          * been completely written to the string.
 683          *
 684          * @return a response body handler
 685          */
 686         public static BodyHandler<String> ofString() {
 687             return (responseInfo) -> BodySubscribers.ofString(charsetFrom(responseInfo.headers()));
 688         }
 689 
 690         /**
 691          * Returns a {@code BodyHandler<Publisher<List<ByteBuffer>>>} that creates a
 692          * {@link BodySubscriber BodySubscriber}{@code <Publisher<List<ByteBuffer>>>}
 693          * obtained from {@link BodySubscribers#ofPublisher()
 694          * BodySubscribers.ofPublisher()}.
 695          *
 696          * <p> When the {@code HttpResponse} object is returned, the response
 697          * headers will have been completely read, but the body may not have
 698          * been fully received yet. The {@link #body()} method returns a
 699          * {@link Publisher Publisher}{@code <List<ByteBuffer>>} from which the body
 700          * response bytes can be obtained as they are received. The publisher
 701          * can and must be subscribed to only once.
 702          *
 703          * @apiNote See {@link BodySubscribers#ofPublisher()} for more
 704          * information.
 705          *
 706          * @return a response body handler
 707          */
 708         public static BodyHandler<Publisher<List<ByteBuffer>>> ofPublisher() {
 709             return (responseInfo) -> BodySubscribers.ofPublisher();
 710         }
 711 
 712         /**
 713          * Returns a {@code BodyHandler} which, when invoked, returns a {@linkplain
 714          * BodySubscribers#buffering(BodySubscriber,int) buffering BodySubscriber}
 715          * that buffers data before delivering it to the downstream subscriber.
 716          * These {@code BodySubscriber} instances are created by calling
 717          * {@link BodySubscribers#buffering(BodySubscriber,int)
 718          * BodySubscribers.buffering} with a subscriber obtained from the given
 719          * downstream handler and the {@code bufferSize} parameter.
 720          *
 721          * @param <T> the response body type
 722          * @param downstreamHandler the downstream handler
 723          * @param bufferSize the buffer size parameter passed to {@link
 724          *        BodySubscribers#buffering(BodySubscriber,int) BodySubscribers.buffering}
 725          * @return a body handler
 726          * @throws IllegalArgumentException if {@code bufferSize <= 0}
 727          */
 728          public static <T> BodyHandler<T> buffering(BodyHandler<T> downstreamHandler,
 729                                                     int bufferSize) {
 730              Objects.requireNonNull(downstreamHandler);
 731              if (bufferSize <= 0)
 732                  throw new IllegalArgumentException("must be greater than 0");
 733              return (responseInfo) -> BodySubscribers
 734                      .buffering(downstreamHandler.apply(responseInfo),
 735                                 bufferSize);
 736          }
 737     }
 738 
 739     /**
 740      * A handler for push promises.
 741      *
 742      * <p> A <i>push promise</i> is a synthetic request sent by an HTTP/2 server
 743      * when retrieving an initiating client-sent request. The server has
 744      * determined, possibly through inspection of the initiating request, that
 745      * the client will likely need the promised resource, and hence pushes a
 746      * synthetic push request, in the form of a push promise, to the client. The
 747      * client can choose to accept or reject the push promise request.
 748      *
 749      * <p> A push promise request may be received up to the point where the
 750      * response body of the initiating client-sent request has been fully
 751      * received. The delivery of a push promise response, however, is not
 752      * coordinated with the delivery of the response to the initiating
 753      * client-sent request.
 754      *
 755      * @param <T> the push promise response body type
 756      * @since 11
 757      */
 758     public interface PushPromiseHandler<T> {
 759 
 760         /**
 761          * Notification of an incoming push promise.
 762          *
 763          * <p> This method is invoked once for each push promise received, up
 764          * to the point where the response body of the initiating client-sent
 765          * request has been fully received.
 766          *
 767          * <p> A push promise is accepted by invoking the given {@code acceptor}
 768          * function. The {@code acceptor} function must be passed a non-null
 769          * {@code BodyHandler}, that is to be used to handle the promise's
 770          * response body. The acceptor function will return a {@code
 771          * CompletableFuture} that completes with the promise's response.
 772          *
 773          * <p> If the {@code acceptor} function is not successfully invoked,
 774          * then the push promise is rejected. The {@code acceptor} function will
 775          * throw an {@code IllegalStateException} if invoked more than once.
 776          *
 777          * @param initiatingRequest the initiating client-send request
 778          * @param pushPromiseRequest the synthetic push request
 779          * @param acceptor the acceptor function that must be successfully
 780          *                 invoked to accept the push promise
 781          */
 782         public void applyPushPromise(
 783             HttpRequest initiatingRequest,
 784             HttpRequest pushPromiseRequest,
 785             Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor
 786         );
 787 
 788 
 789         /**
 790          * Returns a push promise handler that accumulates push promises, and
 791          * their responses, into the given map.
 792          *
 793          * <p> Entries are added to the given map for each push promise accepted.
 794          * The entry's key is the push request, and the entry's value is a
 795          * {@code CompletableFuture} that completes with the response
 796          * corresponding to the key's push request. A push request is rejected /
 797          * cancelled if there is already an entry in the map whose key is
 798          * {@linkplain HttpRequest#equals equal} to it. A push request is
 799          * rejected / cancelled if it  does not have the same origin as its
 800          * initiating request.
 801          *
 802          * <p> Entries are added to the given map as soon as practically
 803          * possible when a push promise is received and accepted. That way code,
 804          * using such a map like a cache, can determine if a push promise has
 805          * been issued by the server and avoid making, possibly, unnecessary
 806          * requests.
 807          *
 808          * <p> The delivery of a push promise response is not coordinated with
 809          * the delivery of the response to the initiating client-sent request.
 810          * However, when the response body for the initiating client-sent
 811          * request has been fully received, the map is guaranteed to be fully
 812          * populated, that is, no more entries will be added. The individual
 813          * {@code CompletableFutures} contained in the map may or may not
 814          * already be completed at this point.
 815          *
 816          * @param <T> the push promise response body type
 817          * @param pushPromiseHandler t he body handler to use for push promises
 818          * @param pushPromisesMap a map to accumulate push promises into
 819          * @return a push promise handler
 820          */
 821         public static <T> PushPromiseHandler<T>
 822         of(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
 823            ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
 824             return new PushPromisesHandlerWithMap<>(pushPromiseHandler, pushPromisesMap);
 825         }
 826     }
 827 
 828     /**
 829      * A {@code BodySubscriber} consumes response body bytes and converts them
 830      * into a higher-level Java type.  The class {@link BodySubscribers
 831      * BodySubscribers} provides implementations of many common body subscribers.
 832      *
 833      * <p> The object acts as a {@link Flow.Subscriber}&lt;{@link List}&lt;{@link
 834      * ByteBuffer}&gt;&gt; to the HTTP Client implementation, which publishes
 835      * lists of ByteBuffers containing the response body. The Flow of data, as
 836      * well as the order of ByteBuffers in the Flow lists, is a strictly ordered
 837      * representation of the response body. Both the Lists and the ByteBuffers,
 838      * once passed to the subscriber, are no longer used by the HTTP Client. The
 839      * subscriber converts the incoming buffers of data to some higher-level
 840      * Java type {@code T}.
 841      *
 842      * <p> The {@link #getBody()} method returns a
 843      * {@link CompletionStage}{@code <T>} that provides the response body
 844      * object. The {@code CompletionStage} must be obtainable at any time. When
 845      * it completes depends on the nature of type {@code T}. In many cases,
 846      * when {@code T} represents the entire body after being consumed then
 847      * the {@code CompletionStage} completes after the body has been consumed.
     * If {@code T} is a streaming type, such as {@link java.io.InputStream
 849      * InputStream}, then it completes before the body has been read, because
 850      * the calling code uses the {@code InputStream} to consume the data.
 851      *
 852      * @apiNote To ensure that all resources associated with the corresponding
 853      * HTTP exchange are properly released, an implementation of {@code
 854      * BodySubscriber} should ensure to {@linkplain Flow.Subscription#request
 855      * request} more data until one of {@link #onComplete() onComplete} or
 856      * {@link #onError(Throwable) onError} are signalled, or {@link
     * Flow.Subscription#cancel cancel} its {@linkplain
 858      * #onSubscribe(Flow.Subscription) subscription} if unable or unwilling to
 859      * do so. Calling {@code cancel} before exhausting the response body data
 860      * may cause the underlying HTTP connection to be closed and prevent it
 861      * from being reused for subsequent operations.
 862      *
 863      * @implNote The flow of data containing the response body is immutable.
 864      * Specifically, it is a flow of unmodifiable lists of read-only ByteBuffers.
 865      *
 866      * @param <T> the response body type
 867      * @see BodySubscribers
 868      * @since 11
 869      */
 870     public interface BodySubscriber<T>
 871             extends Flow.Subscriber<List<ByteBuffer>> {
 872 
 873         /**
 874          * Returns a {@code CompletionStage} which when completed will return
 875          * the response body object. This method can be called at any time
 876          * relative to the other {@link Flow.Subscriber} methods and is invoked
 877          * using the client's {@link HttpClient#executor() executor}.
 878          *
 879          * @return a CompletionStage for the response body
 880          */
 881         public CompletionStage<T> getBody();
 882     }
 883 
 884     /**
 885      * Implementations of {@link BodySubscriber BodySubscriber} that implement
 886      * various useful subscribers, such as converting the response body bytes
 887      * into a String, or streaming the bytes to a file.
 888      *
 889      * <p>The following are examples of using the predefined body subscribers
 890      * to convert a flow of response body data into common high-level Java
 891      * objects:
 892      *
     * <pre>{@code    // Streams the response body to a File
     *   HttpResponse<Path> response = client
     *     .send(request, responseInfo -> BodySubscribers.ofFile(Path.of("example.html")));
 896      *
 897      *   // Accumulates the response body and returns it as a byte[]
 898      *   HttpResponse<byte[]> response = client
 899      *     .send(request, responseInfo -> BodySubscribers.ofByteArray());
 900      *
 901      *   // Discards the response body
 902      *   HttpResponse<Void> response = client
 903      *     .send(request, responseInfo -> BodySubscribers.discarding());
 904      *
 905      *   // Accumulates the response body as a String then maps it to its bytes
 906      *   HttpResponse<byte[]> response = client
 907      *     .send(request, responseInfo ->
 908      *        BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), String::getBytes));
 909      * }</pre>
 910      *
 911      * @since 11
 912      */
 913     public static class BodySubscribers {
 914 
 915         private BodySubscribers() { }
 916 
 917         /**
 918          * Returns a body subscriber that forwards all response body to the
 919          * given {@code Flow.Subscriber}. The {@linkplain BodySubscriber#getBody()
 920          * completion stage} of the returned body subscriber completes after one
         * of the given subscriber's {@code onComplete} or {@code onError} has
 922          * been invoked.
 923          *
 924          * @apiNote This method can be used as an adapter between {@code
 925          * BodySubscriber} and {@code Flow.Subscriber}.
 926          *
 927          * @param subscriber the subscriber
 928          * @return a body subscriber
 929          */
 930         public static BodySubscriber<Void>
 931         fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) {
 932             return new ResponseSubscribers.SubscriberAdapter<>(subscriber, s -> null);
 933         }
 934 
 935         /**
 936          * Returns a body subscriber that forwards all response body to the
 937          * given {@code Flow.Subscriber}. The {@linkplain BodySubscriber#getBody()
 938          * completion stage} of the returned body subscriber completes after one
         * of the given subscriber's {@code onComplete} or {@code onError} has
 940          * been invoked.
 941          *
 942          * <p> The given {@code finisher} function is applied after the given
 943          * subscriber's {@code onComplete} has been invoked. The {@code finisher}
 944          * function is invoked with the given subscriber, and returns a value
 945          * that is set as the response's body.
 946          *
 947          * @apiNote This method can be used as an adapter between {@code
 948          * BodySubscriber} and {@code Flow.Subscriber}.
 949          *
 950          * @param <S> the type of the Subscriber
 951          * @param <T> the type of the response body
 952          * @param subscriber the subscriber
 953          * @param finisher a function to be applied after the subscriber has
 954          *                 completed
 955          * @return a body subscriber
 956          */
 957         public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodySubscriber<T>
 958         fromSubscriber(S subscriber,
 959                        Function<? super S,? extends T> finisher) {
 960             return new ResponseSubscribers.SubscriberAdapter<>(subscriber, finisher);
 961         }
 962 
 963         /**
 964          * Returns a body subscriber that forwards all response body to the
 965          * given {@code Flow.Subscriber}, line by line.
 966          * The {@linkplain BodySubscriber#getBody() completion
 967          * stage} of the returned body subscriber completes after one of the
         * given subscriber's {@code onComplete} or {@code onError} has been
 969          * invoked.
 970          * Bytes are decoded using the {@link StandardCharsets#UTF_8
 971          * UTF-8} charset, and lines are delimited in the manner of
 972          * {@link BufferedReader#readLine()}.
 973          *
 974          * @apiNote This method can be used as an adapter between {@code
 975          * BodySubscriber} and {@code Flow.Subscriber}.
 976          *
 977          * @implNote This is equivalent to calling <pre>{@code
 978          *      fromLineSubscriber(subscriber, s -> null, StandardCharsets.UTF_8, null)
 979          * }</pre>
 980          *
 981          * @param subscriber the subscriber
 982          * @return a body subscriber
 983          */
 984         public static BodySubscriber<Void>
 985         fromLineSubscriber(Subscriber<? super String> subscriber) {
 986             return fromLineSubscriber(subscriber,  s -> null,
 987                     StandardCharsets.UTF_8, null);
 988         }
 989 
 990         /**
 991          * Returns a body subscriber that forwards all response body to the
 992          * given {@code Flow.Subscriber}, line by line. The {@linkplain
 993          * BodySubscriber#getBody() completion stage} of the returned body
         * subscriber completes after one of the given subscriber's
 995          * {@code onComplete} or {@code onError} has been invoked.
 996          *
 997          * <p> The given {@code finisher} function is applied after the given
 998          * subscriber's {@code onComplete} has been invoked. The {@code finisher}
 999          * function is invoked with the given subscriber, and returns a value
1000          * that is set as the response's body.
1001          *
1002          * @apiNote This method can be used as an adapter between {@code
1003          * BodySubscriber} and {@code Flow.Subscriber}.
1004          *
1005          * @param <S> the type of the Subscriber
1006          * @param <T> the type of the response body
1007          * @param subscriber the subscriber
1008          * @param finisher a function to be applied after the subscriber has
1009          *                 completed
1010          * @param charset a {@link Charset} to decode the bytes
1011          * @param lineSeparator an optional line separator: can be {@code null},
1012          *                      in which case lines will be delimited in the manner of
1013          *                      {@link BufferedReader#readLine()}.
1014          * @return a body subscriber
1015          * @throws IllegalArgumentException if the supplied {@code lineSeparator}
1016          *         is the empty string
1017          */
1018         public static <S extends Subscriber<? super String>,T> BodySubscriber<T>
1019         fromLineSubscriber(S subscriber,
1020                            Function<? super S,? extends T> finisher,
1021                            Charset charset,
1022                            String lineSeparator) {
1023             return LineSubscriberAdapter.create(subscriber,
1024                     finisher, charset, lineSeparator);
1025         }
1026 
1027         /**
1028          * Returns a body subscriber which stores the response body as a {@code
1029          * String} converted using the given {@code Charset}.
1030          *
1031          * <p> The {@link HttpResponse} using this subscriber is available after
1032          * the entire response has been read.
1033          *
1034          * @param charset the character set to convert the String with
1035          * @return a body subscriber
1036          */
1037         public static BodySubscriber<String> ofString(Charset charset) {
1038             Objects.requireNonNull(charset);
1039             return new ResponseSubscribers.ByteArraySubscriber<>(
1040                     bytes -> new String(bytes, charset)
1041             );
1042         }
1043 
1044         /**
1045          * Returns a {@code BodySubscriber} which stores the response body as a
1046          * byte array.
1047          *
1048          * <p> The {@link HttpResponse} using this subscriber is available after
1049          * the entire response has been read.
1050          *
1051          * @return a body subscriber
1052          */
1053         public static BodySubscriber<byte[]> ofByteArray() {
1054             return new ResponseSubscribers.ByteArraySubscriber<>(
1055                     Function.identity() // no conversion
1056             );
1057         }
1058 
1059         /**
1060          * Returns a {@code BodySubscriber} which stores the response body in a
1061          * file opened with the given options and name. The file will be opened
1062          * with the given options using {@link FileChannel#open(Path,OpenOption...)
1063          * FileChannel.open} just before the body is read. Any exception thrown
1064          * will be returned or thrown from {@link HttpClient#send(HttpRequest,
1065          * BodyHandler) HttpClient::send} or {@link HttpClient#sendAsync(HttpRequest,
1066          * BodyHandler) HttpClient::sendAsync} as appropriate.
1067          *
1068          * <p> The {@link HttpResponse} using this subscriber is available after
1069          * the entire response has been read.
1070          *
1071          * <p> Security manager permission checks are performed in this factory
1072          * method, when the {@code BodySubscriber} is created. Care must be taken
         * that the {@code BodySubscriber} is not shared with untrusted code.
1074          *
1075          * @param file the file to store the body in
1076          * @param openOptions the list of options to open the file with
1077          * @return a body subscriber
1078          * @throws IllegalArgumentException if an invalid set of open options
1079          *          are specified
1080          * @throws SecurityException if a security manager has been installed
1081          *          and it denies {@linkplain SecurityManager#checkWrite(String)
1082          *          write access} to the file
1083          */
1084         public static BodySubscriber<Path> ofFile(Path file, OpenOption... openOptions) {
1085             Objects.requireNonNull(file);
1086             List<OpenOption> opts = List.of(openOptions);
1087             if (opts.contains(DELETE_ON_CLOSE) || opts.contains(READ)) {
1088                 // these options make no sense, since the FileChannel is not exposed
1089                 throw new IllegalArgumentException("invalid openOptions: " + opts);
1090             }
1091             return PathSubscriber.create(file, opts);
1092         }
1093 
1094         /**
1095          * Returns a {@code BodySubscriber} which stores the response body in a
1096          * file opened with the given name.
1097          *
1098          * <p> Equivalent to: {@code ofFile(file, CREATE, WRITE)}
1099          *
1100          * <p> Security manager permission checks are performed in this factory
1101          * method, when the {@code BodySubscriber} is created. Care must be taken
         * that the {@code BodySubscriber} is not shared with untrusted code.
1103          *
1104          * @param file the file to store the body in
1105          * @return a body subscriber
1106          * @throws SecurityException if a security manager has been installed
1107          *          and it denies {@linkplain SecurityManager#checkWrite(String)
1108          *          write access} to the file
1109          */
1110         public static BodySubscriber<Path> ofFile(Path file) {
1111             return ofFile(file, CREATE, WRITE);
1112         }
1113 
1114         /**
1115          * Returns a {@code BodySubscriber} which provides the incoming body
1116          * data to the provided Consumer of {@code Optional<byte[]>}. Each
1117          * call to {@link Consumer#accept(java.lang.Object) Consumer.accept()}
1118          * will contain a non empty {@code Optional}, except for the final
1119          * invocation after all body data has been read, when the {@code
1120          * Optional} will be empty.
1121          *
1122          * <p> The {@link HttpResponse} using this subscriber is available after
1123          * the entire response has been read.
1124          *
1125          * @apiNote
1126          * This subscriber is not flow controlled.
1127          * Therefore, the supplied consumer must be able to process whatever
1128          * amount of data is delivered in a timely fashion.
1129          *
1130          * @param consumer a Consumer of byte arrays
1131          * @return a BodySubscriber
1132          */
1133         public static BodySubscriber<Void>
1134         ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer) {
1135             return new ResponseSubscribers.ConsumerSubscriber(consumer);
1136         }
1137 
1138         /**
1139          * Returns a {@code BodySubscriber} which streams the response body as
1140          * an {@link InputStream}.
1141          *
1142          * <p> The {@link HttpResponse} using this subscriber is available
1143          * immediately after the response headers have been read, without
1144          * requiring to wait for the entire body to be processed. The response
1145          * body can then be read directly from the {@link InputStream}.
1146          *
1147          * @apiNote To ensure that all resources associated with the
1148          * corresponding exchange are properly released the caller must
1149          * ensure to either read all bytes until EOF is reached, or call
1150          * {@link InputStream#close} if it is unable or unwilling to do so.
1151          * Calling {@code close} before exhausting the stream may cause
1152          * the underlying HTTP connection to be closed and prevent it
1153          * from being reused for subsequent operations.
1154          *
1155          * @return a body subscriber that streams the response body as an
1156          *         {@link InputStream}.
1157          */
1158         public static BodySubscriber<InputStream> ofInputStream() {
1159             return new ResponseSubscribers.HttpResponseInputStream();
1160         }
1161 
1162         /**
1163          * Returns a {@code BodySubscriber} which streams the response body as
1164          * a {@link Stream Stream}{@code <String>}, where each string in the stream
1165          * corresponds to a line as defined by {@link BufferedReader#lines()}.
1166          *
1167          * <p> The {@link HttpResponse} using this subscriber is available
1168          * immediately after the response headers have been read, without
1169          * requiring to wait for the entire body to be processed. The response
1170          * body can then be read directly from the {@link Stream}.
1171          *
1172          * @apiNote To ensure that all resources associated with the
1173          * corresponding exchange are properly released the caller must
1174          * ensure to either read all lines until the stream is exhausted,
1175          * or call {@link Stream#close} if it is unable or unwilling to do so.
1176          * Calling {@code close} before exhausting the stream may cause
1177          * the underlying HTTP connection to be closed and prevent it
1178          * from being reused for subsequent operations.
1179          *
1180          * @param charset the character set to use when converting bytes to characters
1181          * @return a body subscriber that streams the response body as a
1182          *         {@link Stream Stream}{@code <String>}.
1183          *
1184          * @see BufferedReader#lines()
1185          */
1186         public static BodySubscriber<Stream<String>> ofLines(Charset charset) {
1187             return ResponseSubscribers.createLineStream(charset);
1188         }
1189 
1190         /**
1191          * Returns a response subscriber which publishes the response body
1192          * through a {@code Publisher<List<ByteBuffer>>}.
1193          *
1194          * <p> The {@link HttpResponse} using this subscriber is available
1195          * immediately after the response headers have been read, without
1196          * requiring to wait for the entire body to be processed. The response
1197          * body bytes can then be obtained by subscribing to the publisher
1198          * returned by the {@code HttpResponse} {@link HttpResponse#body() body}
1199          * method.
1200          *
1201          * <p>The publisher returned by the {@link HttpResponse#body() body}
1202          * method can be subscribed to only once. The first subscriber will
1203          * receive the body response bytes if successfully subscribed, or will
1204          * cause the subscription to be cancelled otherwise.
1205          * If more subscriptions are attempted, the subsequent subscribers will
1206          * be immediately subscribed with an empty subscription and their
1207          * {@link Subscriber#onError(Throwable) onError} method
1208          * will be invoked with an {@code IllegalStateException}.
1209          *
         * @apiNote To ensure that all resources associated with the
         * corresponding exchange are properly released, the caller must
         * ensure that the provided publisher is subscribed once, and must
         * either {@linkplain Subscription#request(long) request} all bytes
         * until {@link Subscriber#onComplete() onComplete} or
         * {@link Subscriber#onError(Throwable) onError} is invoked, or
         * cancel the provided {@linkplain Subscriber#onSubscribe(Subscription)
         * subscription} if it is unable or unwilling to do so.
1218          * Note that depending on the actual HTTP protocol {@linkplain
1219          * HttpClient.Version version} used for the exchange, cancelling the
1220          * subscription instead of exhausting the flow may cause the underlying
1221          * HTTP connection to be closed and prevent it from being reused for
1222          * subsequent operations.
1223          *
1224          * @return A {@code BodySubscriber} which publishes the response body
1225          *         through a {@code Publisher<List<ByteBuffer>>}.
1226          */
1227         public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() {
1228             return ResponseSubscribers.createPublisher();
1229         }
1230 
1231         /**
1232          * Returns a response subscriber which discards the response body. The
1233          * supplied value is the value that will be returned from
1234          * {@link HttpResponse#body()}.
1235          *
1236          * @param <U> the type of the response body
1237          * @param value the value to return from HttpResponse.body(), may be {@code null}
1238          * @return a {@code BodySubscriber}
1239          */
1240         public static <U> BodySubscriber<U> replacing(U value) {
1241             return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(value));
1242         }
1243 
1244         /**
1245          * Returns a response subscriber which discards the response body.
1246          *
1247          * @return a response body subscriber
1248          */
1249         public static BodySubscriber<Void> discarding() {
1250             return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(null));
1251         }
1252 
1253         /**
1254          * Returns a {@code BodySubscriber} which buffers data before delivering
1255          * it to the given downstream subscriber. The subscriber guarantees to
1256          * deliver {@code bufferSize} bytes of data to each invocation of the
1257          * downstream's {@link BodySubscriber#onNext(Object) onNext} method,
1258          * except for the final invocation, just before
1259          * {@link BodySubscriber#onComplete() onComplete} is invoked. The final
1260          * invocation of {@code onNext} may contain fewer than {@code bufferSize}
1261          * bytes.
1262          *
1263          * <p> The returned subscriber delegates its {@link BodySubscriber#getBody()
1264          * getBody()} method to the downstream subscriber.
1265          *
1266          * @param <T> the type of the response body
1267          * @param downstream the downstream subscriber
1268          * @param bufferSize the buffer size
1269          * @return a buffering body subscriber
1270          * @throws IllegalArgumentException if {@code bufferSize <= 0}
1271          */
1272          public static <T> BodySubscriber<T> buffering(BodySubscriber<T> downstream,
1273                                                        int bufferSize) {
1274              if (bufferSize <= 0)
1275                  throw new IllegalArgumentException("must be greater than 0");
1276              return new BufferingSubscriber<>(downstream, bufferSize);
1277          }
1278 
1279         /**
1280          * Returns a {@code BodySubscriber} whose response body value is that of
1281          * the result of applying the given function to the body object of the
1282          * given {@code upstream} {@code BodySubscriber}.
1283          *
1284          * <p> The mapping function is executed using the client's {@linkplain
1285          * HttpClient#executor() executor}, and can therefore be used to map any
1286          * response body type, including blocking {@link InputStream}.
1287          * However, performing any blocking operation in the mapper function
1288          * runs the risk of blocking the executor's thread for an unknown
1289          * amount of time (at least until the blocking operation finishes),
1290          * which may end up starving the executor of available threads.
1291          * Therefore, in the case where mapping to the desired type might
1292          * block (e.g. by reading on the {@code InputStream}), then mapping
1293          * to a {@link java.util.function.Supplier Supplier} of the desired
1294          * type and deferring the blocking operation until {@link Supplier#get()
1295          * Supplier::get} is invoked by the caller's thread should be preferred,
1296          * as shown in the following example which uses a well-known JSON parser to
1297          * convert an {@code InputStream} into any annotated Java type.
1298          *
1299          * <p>For example:
1300          * <pre> {@code  public static <W> BodySubscriber<Supplier<W>> asJSON(Class<W> targetType) {
1301          *     BodySubscriber<InputStream> upstream = BodySubscribers.ofInputStream();
1302          *
1303          *     BodySubscriber<Supplier<W>> downstream = BodySubscribers.mapping(
1304          *           upstream,
1305          *           (InputStream is) -> () -> {
1306          *               try (InputStream stream = is) {
1307          *                   ObjectMapper objectMapper = new ObjectMapper();
1308          *                   return objectMapper.readValue(stream, targetType);
1309          *               } catch (IOException e) {
1310          *                   throw new UncheckedIOException(e);
1311          *               }
1312          *           });
1313          *    return downstream;
1314          *  } }</pre>
1315          *
1316          * @param <T> the upstream body type
         * @param <U> the body type of the returned body subscriber
1318          * @param upstream the body subscriber to be mapped
1319          * @param mapper the mapping function
1320          * @return a mapping body subscriber
1321          */
1322         public static <T,U> BodySubscriber<U> mapping(BodySubscriber<T> upstream,
1323                                                       Function<? super T, ? extends U> mapper)
1324         {
1325             return new ResponseSubscribers.MappingSubscriber<>(upstream, mapper);
1326         }
1327     }
1328 }