1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @bug 8201186
 27  * @summary Tests an asynchronous BodySubscriber that completes
 28  *          immediately with a Publisher<List<ByteBuffer>>
 29  * @library /test/lib /test/jdk/java/net/httpclient/lib
 30  * @build jdk.test.lib.net.SimpleSSLContext jdk.httpclient.test.lib.common.HttpServerAdapters
 31  * @run testng/othervm ResponsePublisher
 32  */
 33 
 34 import com.sun.net.httpserver.HttpExchange;
 35 import com.sun.net.httpserver.HttpHandler;
 36 import com.sun.net.httpserver.HttpServer;
 37 import com.sun.net.httpserver.HttpsConfigurator;
 38 import com.sun.net.httpserver.HttpsServer;
 39 import jdk.test.lib.net.SimpleSSLContext;
 40 import org.testng.annotations.AfterTest;
 41 import org.testng.annotations.BeforeTest;
 42 import org.testng.annotations.DataProvider;
 43 import org.testng.annotations.Test;
 44 
 45 import javax.net.ssl.SSLContext;
 46 import java.io.IOException;
 47 import java.io.InputStream;
 48 import java.io.OutputStream;
 49 import java.net.InetAddress;
 50 import java.net.InetSocketAddress;
 51 import java.net.URI;
 52 import java.net.http.HttpClient;
 53 import java.net.http.HttpHeaders;
 54 import java.net.http.HttpRequest;
 55 import java.net.http.HttpResponse;
 56 import java.net.http.HttpResponse.BodyHandler;
 57 import java.net.http.HttpResponse.BodyHandlers;
 58 import java.net.http.HttpResponse.BodySubscriber;
 59 import java.net.http.HttpResponse.BodySubscribers;
 60 import java.nio.ByteBuffer;
 61 import java.util.List;
 62 import java.util.Objects;
 63 import java.util.concurrent.CompletableFuture;
 64 import java.util.concurrent.CompletionException;
 65 import java.util.concurrent.CompletionStage;
 66 import java.util.concurrent.Executor;
 67 import java.util.concurrent.Executors;
 68 import java.util.concurrent.Flow;
 69 import java.util.concurrent.Flow.Publisher;
 70 import java.util.concurrent.atomic.AtomicReference;
 71 import java.util.function.Supplier;
 72 import jdk.httpclient.test.lib.common.HttpServerAdapters;
 73 import jdk.httpclient.test.lib.http2.Http2TestServer;
 74 
 75 import static java.lang.System.out;
 76 import static java.net.http.HttpClient.Version.HTTP_1_1;
 77 import static java.net.http.HttpClient.Version.HTTP_2;
 78 import static java.nio.charset.StandardCharsets.UTF_8;
 79 import static org.testng.Assert.assertEquals;
 80 import static org.testng.Assert.assertNotNull;
 81 import static org.testng.Assert.assertTrue;
 82 
 83 public class ResponsePublisher implements HttpServerAdapters {
 84 
    // TLS context used by the two secure test servers and by every client
    // built in newHttpClient(); assigned in setup().
    SSLContext sslContext;
    HttpTestServer httpTestServer;    // HTTP/1.1    [ 4 servers ]
    HttpTestServer httpsTestServer;   // HTTPS/1.1
    HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
    HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
    // Request URIs, one per (server, handler) pair; assigned in setup().
    // "_fixed" targets the HTTP_FixedLengthHandler context and "_chunk" the
    // HTTP_VariableLengthHandler context of the corresponding server.
    String httpURI_fixed;
    String httpURI_chunk;
    String httpsURI_fixed;
    String httpsURI_chunk;
    String http2URI_fixed;
    String http2URI_chunk;
    String https2URI_fixed;
    String https2URI_chunk;

    // Number of request/response exchanges each test method performs per
    // data-provider row.
    static final int ITERATION_COUNT = 3;
    // a shared executor helps reduce the amount of threads created by the test
    static final Executor executor = Executors.newCachedThreadPool();
102 
103     interface BHS extends Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> {
104         static BHS of(BHS impl, String name) {
105             return new BHSImpl(impl, name);
106         }
107     }
108 
109     static final class BHSImpl implements BHS {
110         final BHS supplier;
111         final String name;
112         BHSImpl(BHS impl, String name) {
113             this.supplier = impl;
114             this.name = name;
115         }
116         @Override
117         public String toString() {
118             return name;
119         }
120 
121         @Override
122         public BodyHandler<Publisher<List<ByteBuffer>>> get() {
123             return supplier.get();
124         }
125     }
126 
127     static final Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> OF_PUBLISHER_API =
128             BHS.of(BodyHandlers::ofPublisher, "BodyHandlers::ofPublisher");
129     static final Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> OF_PUBLISHER_TEST =
130             BHS.of(PublishingBodyHandler::new, "PublishingBodyHandler::new");
131 
132     @DataProvider(name = "variants")
133     public Object[][] variants() {
134         return new Object[][]{
135                 { httpURI_fixed,    false, OF_PUBLISHER_API },
136                 { httpURI_chunk,    false, OF_PUBLISHER_API },
137                 { httpsURI_fixed,   false, OF_PUBLISHER_API },
138                 { httpsURI_chunk,   false, OF_PUBLISHER_API },
139                 { http2URI_fixed,   false, OF_PUBLISHER_API },
140                 { http2URI_chunk,   false, OF_PUBLISHER_API },
141                 { https2URI_fixed,  false, OF_PUBLISHER_API },
142                 { https2URI_chunk,  false, OF_PUBLISHER_API },
143 
144                 { httpURI_fixed,    true, OF_PUBLISHER_API },
145                 { httpURI_chunk,    true, OF_PUBLISHER_API },
146                 { httpsURI_fixed,   true, OF_PUBLISHER_API },
147                 { httpsURI_chunk,   true, OF_PUBLISHER_API },
148                 { http2URI_fixed,   true, OF_PUBLISHER_API },
149                 { http2URI_chunk,   true, OF_PUBLISHER_API },
150                 { https2URI_fixed,  true, OF_PUBLISHER_API },
151                 { https2URI_chunk,  true, OF_PUBLISHER_API },
152 
153                 { httpURI_fixed,    false, OF_PUBLISHER_TEST },
154                 { httpURI_chunk,    false, OF_PUBLISHER_TEST },
155                 { httpsURI_fixed,   false, OF_PUBLISHER_TEST },
156                 { httpsURI_chunk,   false, OF_PUBLISHER_TEST },
157                 { http2URI_fixed,   false, OF_PUBLISHER_TEST },
158                 { http2URI_chunk,   false, OF_PUBLISHER_TEST },
159                 { https2URI_fixed,  false, OF_PUBLISHER_TEST },
160                 { https2URI_chunk,  false, OF_PUBLISHER_TEST },
161 
162                 { httpURI_fixed,    true, OF_PUBLISHER_TEST },
163                 { httpURI_chunk,    true, OF_PUBLISHER_TEST },
164                 { httpsURI_fixed,   true, OF_PUBLISHER_TEST },
165                 { httpsURI_chunk,   true, OF_PUBLISHER_TEST },
166                 { http2URI_fixed,   true, OF_PUBLISHER_TEST },
167                 { http2URI_chunk,   true, OF_PUBLISHER_TEST },
168                 { https2URI_fixed,  true, OF_PUBLISHER_TEST },
169                 { https2URI_chunk,  true, OF_PUBLISHER_TEST },
170         };
171     }
172 
173     final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
174     HttpClient newHttpClient() {
175         return TRACKER.track(HttpClient.newBuilder()
176                          .executor(executor)
177                          .sslContext(sslContext)
178                          .build());
179     }
180 
181     @Test(dataProvider = "variants")
182     public void testExceptions(String uri, boolean sameClient, BHS handlers) throws Exception {
183         HttpClient client = null;
184         for (int i=0; i< ITERATION_COUNT; i++) {
185             if (!sameClient || client == null)
186                 client = newHttpClient();
187 
188             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
189                     .build();
190             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
191             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
192             try {
193                 response.body().subscribe(null);
194                 throw new RuntimeException("Expected NPE not thrown");
195             } catch (NullPointerException x) {
196                 System.out.println("Got expected NPE: " + x);
197             }
198             // We can reuse our BodySubscribers implementations to subscribe to the
199             // Publisher<List<ByteBuffer>>
200             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
201             response.body().subscribe(ofString);
202 
203             BodySubscriber<String> ofString2 = BodySubscribers.ofString(UTF_8);
204             response.body().subscribe(ofString2);
205             try {
206                 ofString2.getBody().toCompletableFuture().join();
207                 throw new RuntimeException("Expected ISE not thrown");
208             } catch (CompletionException x) {
209                 Throwable cause = x.getCause();
210                 if (cause instanceof  IllegalStateException) {
211                     System.out.println("Got expected ISE: " + cause);
212                 } else {
213                     throw x;
214                 }
215             }
216             // Get the final result and compare it with the expected body
217             String body = ofString.getBody().toCompletableFuture().get();
218             assertEquals(body, "");
219         }
220     }
221 
222     @Test(dataProvider = "variants")
223     public void testNoBody(String uri, boolean sameClient, BHS handlers) throws Exception {
224         HttpClient client = null;
225         for (int i=0; i< ITERATION_COUNT; i++) {
226             if (!sameClient || client == null)
227                 client = newHttpClient();
228 
229             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
230                     .build();
231             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
232             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
233             // We can reuse our BodySubscribers implementations to subscribe to the
234             // Publisher<List<ByteBuffer>>
235             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
236             // get the Publisher<List<ByteBuffer>> and
237             // subscribe to it.
238             response.body().subscribe(ofString);
239             // Get the final result and compare it with the expected body
240             String body = ofString.getBody().toCompletableFuture().get();
241             assertEquals(body, "");
242         }
243     }
244 
245     @Test(dataProvider = "variants")
246     public void testNoBodyAsync(String uri, boolean sameClient, BHS handlers) throws Exception {
247         HttpClient client = null;
248         for (int i=0; i< ITERATION_COUNT; i++) {
249             if (!sameClient || client == null)
250                 client = newHttpClient();
251 
252             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
253                     .build();
254             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
255             // We can reuse our BodySubscribers implementations to subscribe to the
256             // Publisher<List<ByteBuffer>>
257             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
258             CompletableFuture<String> result =
259                     client.sendAsync(req, handler).thenCompose(
260                             (responsePublisher) -> {
261                                 // get the Publisher<List<ByteBuffer>> and
262                                 // subscribe to it.
263                                 responsePublisher.body().subscribe(ofString);
264                                 return ofString.getBody();
265                             });
266             // Get the final result and compare it with the expected body
267             assertEquals(result.get(), "");
268         }
269     }
270 
271     @Test(dataProvider = "variants")
272     public void testAsString(String uri, boolean sameClient, BHS handlers) throws Exception {
273         HttpClient client = null;
274         for (int i=0; i< ITERATION_COUNT; i++) {
275             if (!sameClient || client == null)
276                 client = newHttpClient();
277 
278             HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
279                     .build();
280             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
281             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
282             // We can reuse our BodySubscribers implementations to subscribe to the
283             // Publisher<List<ByteBuffer>>
284             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
285             // get the Publisher<List<ByteBuffer>> and
286             // subscribe to it.
287             response.body().subscribe(ofString);
288             // Get the final result and compare it with the expected body
289             String body = ofString.getBody().toCompletableFuture().get();
290             assertEquals(body, WITH_BODY);
291         }
292     }
293 
294     @Test(dataProvider = "variants")
295     public void testAsStringAsync(String uri, boolean sameClient, BHS handlers) throws Exception {
296         HttpClient client = null;
297         for (int i=0; i< ITERATION_COUNT; i++) {
298             if (!sameClient || client == null)
299                 client = newHttpClient();
300 
301             HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
302                     .build();
303             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
304             // We can reuse our BodySubscribers implementations to subscribe to the
305             // Publisher<List<ByteBuffer>>
306             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
307             CompletableFuture<String> result = client.sendAsync(req, handler)
308                     .thenCompose((responsePublisher) -> {
309                         // get the Publisher<List<ByteBuffer>> and
310                         // subscribe to it.
311                         responsePublisher.body().subscribe(ofString);
312                         return ofString.getBody();
313                     });
314             // Get the final result and compare it with the expected body
315             String body = result.get();
316             assertEquals(body, WITH_BODY);
317         }
318     }
319 
320     // A BodyHandler that returns PublishingBodySubscriber instances
321     static class PublishingBodyHandler implements BodyHandler<Publisher<List<ByteBuffer>>> {
322         @Override
323         public BodySubscriber<Publisher<List<ByteBuffer>>> apply(HttpResponse.ResponseInfo rinfo) {
324             assertEquals(rinfo.statusCode(), 200);
325             return new PublishingBodySubscriber();
326         }
327     }
328 
329     // A BodySubscriber that returns a Publisher<List<ByteBuffer>>
330     static class PublishingBodySubscriber implements BodySubscriber<Publisher<List<ByteBuffer>>> {
331         private final CompletableFuture<Flow.Subscription> subscriptionCF = new CompletableFuture<>();
332         private final CompletableFuture<Flow.Subscriber<? super List<ByteBuffer>>> subscribedCF = new CompletableFuture<>();
333         private AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
334         private final CompletionStage<Publisher<List<ByteBuffer>>> body =
335                 subscriptionCF.thenCompose((s) -> CompletableFuture.completedStage(this::subscribe));
336                 //CompletableFuture.completedStage(this::subscribe);
337 
338         private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
339             Objects.requireNonNull(subscriber, "subscriber must not be null");
340             if (subscriberRef.compareAndSet(null, subscriber)) {
341                 subscriptionCF.thenAccept((s) -> {
342                     subscriber.onSubscribe(s);
343                     subscribedCF.complete(subscriber);
344                 });
345             } else {
346                 subscriber.onSubscribe(new Flow.Subscription() {
347                     @Override public void request(long n) { }
348                     @Override public void cancel() { }
349                 });
350                 subscriber.onError(
351                         new IllegalStateException("This publisher has already one subscriber"));
352             }
353         }
354 
355         @Override
356         public void onSubscribe(Flow.Subscription subscription) {
357             subscriptionCF.complete(subscription);
358         }
359 
360         @Override
361         public void onNext(List<ByteBuffer> item) {
362             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
363             Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get();
364             assert subscriber != null; // cannot be called before subscriber calls request(1)
365             subscriber.onNext(item);
366         }
367 
368         @Override
369         public void onError(Throwable throwable) {
370             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
371             // onError can be called before request(1), and therefore can
372             // be called before subscriberRef is set.
373             subscribedCF.thenAccept(s -> s.onError(throwable));
374         }
375 
376         @Override
377         public void onComplete() {
378             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
379             // onComplete can be called before request(1), and therefore can
380             // be called before subscriberRef is set.
381             subscribedCF.thenAccept(s -> s.onComplete());
382         }
383 
384         @Override
385         public CompletionStage<Publisher<List<ByteBuffer>>> getBody() {
386             return body;
387         }
388     }
389 
390     static String serverAuthority(HttpServer server) {
391         return InetAddress.getLoopbackAddress().getHostName() + ":"
392                 + server.getAddress().getPort();
393     }
394 
395     @BeforeTest
396     public void setup() throws Exception {
397         sslContext = new SimpleSSLContext().get();
398         if (sslContext == null)
399             throw new AssertionError("Unexpected null sslContext");
400 
401         // HTTP/1.1
402         HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler();
403         HttpTestHandler h1_chunkHandler = new HTTP_VariableLengthHandler();
404         httpTestServer = HttpTestServer.create(HTTP_1_1);
405         httpTestServer.addHandler( h1_fixedLengthHandler, "/http1/fixed");
406         httpTestServer.addHandler(h1_chunkHandler,"/http1/chunk");
407         httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed";
408         httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk";
409 
410         httpsTestServer = HttpTestServer.create(HTTP_1_1, sslContext);
411         httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed");
412         httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk");
413         httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed";
414         httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk";
415 
416         // HTTP/2
417         HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
418         HttpTestHandler h2_chunkedHandler = new HTTP_VariableLengthHandler();
419 
420         http2TestServer = HttpTestServer.create(HTTP_2);
421         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
422         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
423         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed";
424         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk";
425 
426         https2TestServer = HttpTestServer.create(HTTP_2, sslContext);
427         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
428         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
429         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
430         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk";
431 
432         httpTestServer.start();
433         httpsTestServer.start();
434         http2TestServer.start();
435         https2TestServer.start();
436     }
437 
438     @AfterTest
439     public void teardown() throws Exception {
440         Thread.sleep(100);
441         AssertionError fail = TRACKER.check(500);
442         try {
443             httpTestServer.stop();
444             httpsTestServer.stop();
445             http2TestServer.stop();
446             https2TestServer.stop();
447         } finally {
448             if (fail != null) {
449                 throw fail;
450             }
451         }
452     }
453 
    // The response body sent by both handlers for requests whose path ends
    // with "/withBody", and the value testAsString / testAsStringAsync expect
    // to read back.
    static final String WITH_BODY = "Lorem ipsum dolor sit amet, consectetur" +
            " adipiscing elit, sed do eiusmod tempor incididunt ut labore et" +
            " dolore magna aliqua. Ut enim ad minim veniam, quis nostrud" +
            " exercitation ullamco laboris nisi ut aliquip ex ea" +
            " commodo consequat. Duis aute irure dolor in reprehenderit in " +
            "voluptate velit esse cillum dolore eu fugiat nulla pariatur." +
            " Excepteur sint occaecat cupidatat non proident, sunt in culpa qui" +
            " officia deserunt mollit anim id est laborum.";
462 
463     static class HTTP_FixedLengthHandler implements HttpTestHandler {
464         @Override
465         public void handle(HttpTestExchange t) throws IOException {
466             out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
467             try (InputStream is = t.getRequestBody()) {
468                 is.readAllBytes();
469             }
470             if (t.getRequestURI().getPath().endsWith("/withBody")) {
471                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
472                 t.sendResponseHeaders(200, bytes.length);  // body
473                 try (OutputStream os = t.getResponseBody()) {
474                     os.write(bytes);
475                 }
476             } else {
477                 t.sendResponseHeaders(200, 0);  //no body
478             }
479         }
480     }
481 
482     static class HTTP_VariableLengthHandler implements HttpTestHandler {
483         @Override
484         public void handle(HttpTestExchange t) throws IOException {
485             out.println("HTTP_VariableLengthHandler received request to " + t.getRequestURI());
486             try (InputStream is = t.getRequestBody()) {
487                 is.readAllBytes();
488             }
489             t.sendResponseHeaders(200, -1);  //chunked or variable
490             if (t.getRequestURI().getPath().endsWith("/withBody")) {
491                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
492                 try (OutputStream os = t.getResponseBody()) {
493                     int chunkLen = bytes.length/10;
494                     if (chunkLen == 0) {
495                         os.write(bytes);
496                     } else {
497                         int count = 0;
498                         for (int i=0; i<10; i++) {
499                             os.write(bytes, count, chunkLen);
500                             os.flush();
501                             count += chunkLen;
502                         }
503                         os.write(bytes, count, bytes.length % chunkLen);
504                         count += bytes.length % chunkLen;
505                         assert count == bytes.length;
506                     }
507                 }
508             } else {
509                 t.getResponseBody().close();   // no body
510             }
511         }
512     }
513 }