1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @bug 8201186
 27  * @summary Tests an asynchronous BodySubscriber that completes
 28  *          immediately with a Publisher<List<ByteBuffer>>
 29  * @library /test/lib /test/jdk/java/net/httpclient/lib
 30  * @build jdk.test.lib.net.SimpleSSLContext jdk.httpclient.test.lib.common.HttpServerAdapters
 31  * @run testng/othervm ResponsePublisher
 32  */
 33 
 34 import com.sun.net.httpserver.HttpExchange;
 35 import com.sun.net.httpserver.HttpHandler;
 36 import com.sun.net.httpserver.HttpServer;
 37 import com.sun.net.httpserver.HttpsConfigurator;
 38 import com.sun.net.httpserver.HttpsServer;
 39 import jdk.internal.net.http.common.OperationTrackers;
 40 import jdk.test.lib.net.SimpleSSLContext;
 41 import org.testng.annotations.AfterTest;
 42 import org.testng.annotations.BeforeTest;
 43 import org.testng.annotations.DataProvider;
 44 import org.testng.annotations.Test;
 45 
 46 import javax.net.ssl.SSLContext;
 47 import java.io.IOException;
 48 import java.io.InputStream;
 49 import java.io.OutputStream;
 50 import java.net.InetAddress;
 51 import java.net.InetSocketAddress;
 52 import java.net.URI;
 53 import java.net.http.HttpClient;
 54 import java.net.http.HttpHeaders;
 55 import java.net.http.HttpRequest;
 56 import java.net.http.HttpResponse;
 57 import java.net.http.HttpResponse.BodyHandler;
 58 import java.net.http.HttpResponse.BodyHandlers;
 59 import java.net.http.HttpResponse.BodySubscriber;
 60 import java.net.http.HttpResponse.BodySubscribers;
 61 import java.nio.ByteBuffer;
 62 import java.util.List;
 63 import java.util.Objects;
 64 import java.util.concurrent.CompletableFuture;
 65 import java.util.concurrent.CompletionException;
 66 import java.util.concurrent.CompletionStage;
 67 import java.util.concurrent.Executor;
 68 import java.util.concurrent.Executors;
 69 import java.util.concurrent.Flow;
 70 import java.util.concurrent.Flow.Publisher;
 71 import java.util.concurrent.atomic.AtomicReference;
 72 import java.util.function.Supplier;
 73 import jdk.httpclient.test.lib.common.HttpServerAdapters;
 74 import jdk.httpclient.test.lib.http2.Http2TestServer;
 75 
 76 import static java.lang.System.out;
 77 import static java.net.http.HttpClient.Version.HTTP_1_1;
 78 import static java.net.http.HttpClient.Version.HTTP_2;
 79 import static java.nio.charset.StandardCharsets.UTF_8;
 80 import static org.testng.Assert.assertEquals;
 81 import static org.testng.Assert.assertNotNull;
 82 import static org.testng.Assert.assertTrue;
 83 
 84 public class ResponsePublisher implements HttpServerAdapters {
 85 
 86     SSLContext sslContext;
 87     HttpTestServer httpTestServer;    // HTTP/1.1    [ 4 servers ]
 88     HttpTestServer httpsTestServer;   // HTTPS/1.1
 89     HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
 90     HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
 91     String httpURI_fixed;
 92     String httpURI_chunk;
 93     String httpsURI_fixed;
 94     String httpsURI_chunk;
 95     String http2URI_fixed;
 96     String http2URI_chunk;
 97     String https2URI_fixed;
 98     String https2URI_chunk;
 99 
100     static final int ITERATION_COUNT = 3;
101     // a shared executor helps reduce the amount of threads created by the test
102     static final Executor executor = Executors.newCachedThreadPool();
103 
104     static final long start = System.nanoTime();
105 
106     public static String now() {
107         long now = System.nanoTime() - start;
108         long secs = now / 1000_000_000;
109         long mill = (now % 1000_000_000) / 1000_000;
110         long nan = now % 1000_000;
111         return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
112     }
113 
114     interface BHS extends Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> {
115         static BHS of(BHS impl, String name) {
116             return new BHSImpl(impl, name);
117         }
118     }
119 
120     static final class BHSImpl implements BHS {
121         final BHS supplier;
122         final String name;
123         BHSImpl(BHS impl, String name) {
124             this.supplier = impl;
125             this.name = name;
126         }
127         @Override
128         public String toString() {
129             return name;
130         }
131 
132         @Override
133         public BodyHandler<Publisher<List<ByteBuffer>>> get() {
134             return supplier.get();
135         }
136     }
137 
138     static final Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> OF_PUBLISHER_API =
139             BHS.of(BodyHandlers::ofPublisher, "BodyHandlers::ofPublisher");
140     static final Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> OF_PUBLISHER_TEST =
141             BHS.of(PublishingBodyHandler::new, "PublishingBodyHandler::new");
142 
143     @DataProvider(name = "variants")
144     public Object[][] variants() {
145         return new Object[][]{
146                 { httpURI_fixed,    false, OF_PUBLISHER_API },
147                 { httpURI_chunk,    false, OF_PUBLISHER_API },
148                 { httpsURI_fixed,   false, OF_PUBLISHER_API },
149                 { httpsURI_chunk,   false, OF_PUBLISHER_API },
150                 { http2URI_fixed,   false, OF_PUBLISHER_API },
151                 { http2URI_chunk,   false, OF_PUBLISHER_API },
152                 { https2URI_fixed,  false, OF_PUBLISHER_API },
153                 { https2URI_chunk,  false, OF_PUBLISHER_API },
154 
155                 { httpURI_fixed,    true, OF_PUBLISHER_API },
156                 { httpURI_chunk,    true, OF_PUBLISHER_API },
157                 { httpsURI_fixed,   true, OF_PUBLISHER_API },
158                 { httpsURI_chunk,   true, OF_PUBLISHER_API },
159                 { http2URI_fixed,   true, OF_PUBLISHER_API },
160                 { http2URI_chunk,   true, OF_PUBLISHER_API },
161                 { https2URI_fixed,  true, OF_PUBLISHER_API },
162                 { https2URI_chunk,  true, OF_PUBLISHER_API },
163 
164                 { httpURI_fixed,    false, OF_PUBLISHER_TEST },
165                 { httpURI_chunk,    false, OF_PUBLISHER_TEST },
166                 { httpsURI_fixed,   false, OF_PUBLISHER_TEST },
167                 { httpsURI_chunk,   false, OF_PUBLISHER_TEST },
168                 { http2URI_fixed,   false, OF_PUBLISHER_TEST },
169                 { http2URI_chunk,   false, OF_PUBLISHER_TEST },
170                 { https2URI_fixed,  false, OF_PUBLISHER_TEST },
171                 { https2URI_chunk,  false, OF_PUBLISHER_TEST },
172 
173                 { httpURI_fixed,    true, OF_PUBLISHER_TEST },
174                 { httpURI_chunk,    true, OF_PUBLISHER_TEST },
175                 { httpsURI_fixed,   true, OF_PUBLISHER_TEST },
176                 { httpsURI_chunk,   true, OF_PUBLISHER_TEST },
177                 { http2URI_fixed,   true, OF_PUBLISHER_TEST },
178                 { http2URI_chunk,   true, OF_PUBLISHER_TEST },
179                 { https2URI_fixed,  true, OF_PUBLISHER_TEST },
180                 { https2URI_chunk,  true, OF_PUBLISHER_TEST },
181         };
182     }
183 
184     final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
185     HttpClient newHttpClient() {
186         return TRACKER.track(HttpClient.newBuilder()
187                          .executor(executor)
188                          .sslContext(sslContext)
189                          .build());
190     }
191 
192     @Test(dataProvider = "variants")
193     public void testExceptions(String uri, boolean sameClient, BHS handlers) throws Exception {
194         HttpClient client = null;
195         for (int i=0; i< ITERATION_COUNT; i++) {
196             if (!sameClient || client == null)
197                 client = newHttpClient();
198 
199             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
200                     .build();
201             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
202             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
203             try {
204                 response.body().subscribe(null);
205                 throw new RuntimeException("Expected NPE not thrown");
206             } catch (NullPointerException x) {
207                 System.out.println("Got expected NPE: " + x);
208             }
209             // We can reuse our BodySubscribers implementations to subscribe to the
210             // Publisher<List<ByteBuffer>>
211             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
212             response.body().subscribe(ofString);
213 
214             BodySubscriber<String> ofString2 = BodySubscribers.ofString(UTF_8);
215             response.body().subscribe(ofString2);
216             try {
217                 ofString2.getBody().toCompletableFuture().join();
218                 throw new RuntimeException("Expected ISE not thrown");
219             } catch (CompletionException x) {
220                 Throwable cause = x.getCause();
221                 if (cause instanceof  IllegalStateException) {
222                     System.out.println("Got expected ISE: " + cause);
223                 } else {
224                     throw x;
225                 }
226             }
227             // Get the final result and compare it with the expected body
228             String body = ofString.getBody().toCompletableFuture().get();
229             assertEquals(body, "");
230             // ensure client closes before next iteration
231             if (!sameClient) {
232                 var tracker = TRACKER.getTracker(client);
233                 client = null;
234                 clientCleanup(tracker);
235             }
236         }
237     }
238 
239     @Test(dataProvider = "variants")
240     public void testNoBody(String uri, boolean sameClient, BHS handlers) throws Exception {
241         HttpClient client = null;
242         for (int i=0; i< ITERATION_COUNT; i++) {
243             if (!sameClient || client == null)
244                 client = newHttpClient();
245 
246             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
247                     .build();
248             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
249             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
250             // We can reuse our BodySubscribers implementations to subscribe to the
251             // Publisher<List<ByteBuffer>>
252             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
253             // get the Publisher<List<ByteBuffer>> and
254             // subscribe to it.
255             response.body().subscribe(ofString);
256             // Get the final result and compare it with the expected body
257             String body = ofString.getBody().toCompletableFuture().get();
258             assertEquals(body, "");
259             // ensure client closes before next iteration
260             if (!sameClient) {
261                 var tracker = TRACKER.getTracker(client);
262                 client = null;
263                 clientCleanup(tracker);
264             }
265         }
266     }
267 
268     @Test(dataProvider = "variants")
269     public void testNoBodyAsync(String uri, boolean sameClient, BHS handlers) throws Exception {
270         HttpClient client = null;
271         for (int i=0; i< ITERATION_COUNT; i++) {
272             if (!sameClient || client == null)
273                 client = newHttpClient();
274 
275             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
276                     .build();
277             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
278             // We can reuse our BodySubscribers implementations to subscribe to the
279             // Publisher<List<ByteBuffer>>
280             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
281             CompletableFuture<String> result =
282                     client.sendAsync(req, handler).thenCompose(
283                             (responsePublisher) -> {
284                                 // get the Publisher<List<ByteBuffer>> and
285                                 // subscribe to it.
286                                 responsePublisher.body().subscribe(ofString);
287                                 return ofString.getBody();
288                             });
289             // Get the final result and compare it with the expected body
290             assertEquals(result.get(), "");
291             // ensure client closes before next iteration
292             if (!sameClient) {
293                 var tracker = TRACKER.getTracker(client);
294                 client = null;
295                 clientCleanup(tracker);
296             }
297         }
298     }
299 
300     @Test(dataProvider = "variants")
301     public void testAsString(String uri, boolean sameClient, BHS handlers) throws Exception {
302         HttpClient client = null;
303         for (int i=0; i< ITERATION_COUNT; i++) {
304             if (!sameClient || client == null)
305                 client = newHttpClient();
306 
307             HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
308                     .build();
309             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
310             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
311             // We can reuse our BodySubscribers implementations to subscribe to the
312             // Publisher<List<ByteBuffer>>
313             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
314             // get the Publisher<List<ByteBuffer>> and
315             // subscribe to it.
316             response.body().subscribe(ofString);
317             // Get the final result and compare it with the expected body
318             String body = ofString.getBody().toCompletableFuture().get();
319             assertEquals(body, WITH_BODY);
320             // ensure client closes before next iteration
321             if (!sameClient) {
322                 var tracker = TRACKER.getTracker(client);
323                 client = null;
324                 clientCleanup(tracker);
325             }
326         }
327     }
328 
329     @Test(dataProvider = "variants")
330     public void testAsStringAsync(String uri, boolean sameClient, BHS handlers) throws Exception {
331         HttpClient client = null;
332         for (int i=0; i< ITERATION_COUNT; i++) {
333             if (!sameClient || client == null)
334                 client = newHttpClient();
335 
336             HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
337                     .build();
338             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
339             // We can reuse our BodySubscribers implementations to subscribe to the
340             // Publisher<List<ByteBuffer>>
341             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
342             CompletableFuture<String> result = client.sendAsync(req, handler)
343                     .thenCompose((responsePublisher) -> {
344                         // get the Publisher<List<ByteBuffer>> and
345                         // subscribe to it.
346                         responsePublisher.body().subscribe(ofString);
347                         return ofString.getBody();
348                     });
349             // Get the final result and compare it with the expected body
350             String body = result.get();
351             assertEquals(body, WITH_BODY);
352             // ensure client closes before next iteration
353             if (!sameClient) {
354                 var tracker = TRACKER.getTracker(client);
355                 client = null;
356                 clientCleanup(tracker);
357             }
358         }
359     }
360 
361     // A BodyHandler that returns PublishingBodySubscriber instances
362     static class PublishingBodyHandler implements BodyHandler<Publisher<List<ByteBuffer>>> {
363         @Override
364         public BodySubscriber<Publisher<List<ByteBuffer>>> apply(HttpResponse.ResponseInfo rinfo) {
365             assertEquals(rinfo.statusCode(), 200);
366             return new PublishingBodySubscriber();
367         }
368     }
369 
370     // A BodySubscriber that returns a Publisher<List<ByteBuffer>>
371     static class PublishingBodySubscriber implements BodySubscriber<Publisher<List<ByteBuffer>>> {
372         private final CompletableFuture<Flow.Subscription> subscriptionCF = new CompletableFuture<>();
373         private final CompletableFuture<Flow.Subscriber<? super List<ByteBuffer>>> subscribedCF = new CompletableFuture<>();
374         private AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
375         private final CompletionStage<Publisher<List<ByteBuffer>>> body =
376                 subscriptionCF.thenCompose((s) -> CompletableFuture.completedStage(this::subscribe));
377                 //CompletableFuture.completedStage(this::subscribe);
378 
379         private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
380             Objects.requireNonNull(subscriber, "subscriber must not be null");
381             if (subscriberRef.compareAndSet(null, subscriber)) {
382                 subscriptionCF.thenAccept((s) -> {
383                     subscriber.onSubscribe(s);
384                     subscribedCF.complete(subscriber);
385                 });
386             } else {
387                 subscriber.onSubscribe(new Flow.Subscription() {
388                     @Override public void request(long n) { }
389                     @Override public void cancel() { }
390                 });
391                 subscriber.onError(
392                         new IllegalStateException("This publisher has already one subscriber"));
393             }
394         }
395 
396         @Override
397         public void onSubscribe(Flow.Subscription subscription) {
398             subscriptionCF.complete(subscription);
399         }
400 
401         @Override
402         public void onNext(List<ByteBuffer> item) {
403             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
404             Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get();
405             assert subscriber != null; // cannot be called before subscriber calls request(1)
406             subscriber.onNext(item);
407         }
408 
409         @Override
410         public void onError(Throwable throwable) {
411             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
412             // onError can be called before request(1), and therefore can
413             // be called before subscriberRef is set.
414             subscribedCF.thenAccept(s -> s.onError(throwable));
415         }
416 
417         @Override
418         public void onComplete() {
419             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
420             // onComplete can be called before request(1), and therefore can
421             // be called before subscriberRef is set.
422             subscribedCF.thenAccept(s -> s.onComplete());
423         }
424 
425         @Override
426         public CompletionStage<Publisher<List<ByteBuffer>>> getBody() {
427             return body;
428         }
429     }
430 
431     static String serverAuthority(HttpServer server) {
432         return InetAddress.getLoopbackAddress().getHostName() + ":"
433                 + server.getAddress().getPort();
434     }
435 
436     @BeforeTest
437     public void setup() throws Exception {
438         sslContext = new SimpleSSLContext().get();
439         if (sslContext == null)
440             throw new AssertionError("Unexpected null sslContext");
441 
442         // HTTP/1.1
443         HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler();
444         HttpTestHandler h1_chunkHandler = new HTTP_VariableLengthHandler();
445         httpTestServer = HttpTestServer.create(HTTP_1_1);
446         httpTestServer.addHandler( h1_fixedLengthHandler, "/http1/fixed");
447         httpTestServer.addHandler(h1_chunkHandler,"/http1/chunk");
448         httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed";
449         httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk";
450 
451         httpsTestServer = HttpTestServer.create(HTTP_1_1, sslContext);
452         httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed");
453         httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk");
454         httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed";
455         httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk";
456 
457         // HTTP/2
458         HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
459         HttpTestHandler h2_chunkedHandler = new HTTP_VariableLengthHandler();
460 
461         http2TestServer = HttpTestServer.create(HTTP_2);
462         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
463         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
464         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed";
465         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk";
466 
467         https2TestServer = HttpTestServer.create(HTTP_2, sslContext);
468         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
469         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
470         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
471         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk";
472 
473         httpTestServer.start();
474         httpsTestServer.start();
475         http2TestServer.start();
476         https2TestServer.start();
477     }
478 
479     @AfterTest
480     public void teardown() throws Exception {
481         Thread.sleep(100);
482         AssertionError fail = TRACKER.check(500);
483         try {
484             httpTestServer.stop();
485             httpsTestServer.stop();
486             http2TestServer.stop();
487             https2TestServer.stop();
488         } finally {
489             if (fail != null) {
490                 throw fail;
491             }
492         }
493     }
494 
495     // Wait for the client to be garbage collected.
496     // we use the ReferenceTracker API rather than HttpClient::close here,
497     // because we want to get some diagnosis if a client doesn't release
498     // its resources and terminates as expected
499     // By using the ReferenceTracker, we will get some diagnosis about what
500     // is keeping the client alive if it doesn't get GC'ed within the
501     // expected time frame.
502     public void clientCleanup(OperationTrackers.Tracker tracker){
503         System.gc();
504         System.out.println(now() + "waiting for client to shutdown: " + tracker.getName());
505         System.err.println(now() + "waiting for client to shutdown: " + tracker.getName());
506         var error = TRACKER.check(tracker, 10000);
507         if (error != null) throw error;
508         System.out.println(now() + "client shutdown normally: " + tracker.getName());
509         System.err.println(now() + "client shutdown normally: " + tracker.getName());
510     }
511 
512     static final String WITH_BODY = "Lorem ipsum dolor sit amet, consectetur" +
513             " adipiscing elit, sed do eiusmod tempor incididunt ut labore et" +
514             " dolore magna aliqua. Ut enim ad minim veniam, quis nostrud" +
515             " exercitation ullamco laboris nisi ut aliquip ex ea" +
516             " commodo consequat. Duis aute irure dolor in reprehenderit in " +
517             "voluptate velit esse cillum dolore eu fugiat nulla pariatur." +
518             " Excepteur sint occaecat cupidatat non proident, sunt in culpa qui" +
519             " officia deserunt mollit anim id est laborum.";
520 
521     static class HTTP_FixedLengthHandler implements HttpTestHandler {
522         @Override
523         public void handle(HttpTestExchange t) throws IOException {
524             out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
525             try (InputStream is = t.getRequestBody()) {
526                 is.readAllBytes();
527             }
528             if (t.getRequestURI().getPath().endsWith("/withBody")) {
529                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
530                 t.sendResponseHeaders(200, bytes.length);  // body
531                 try (OutputStream os = t.getResponseBody()) {
532                     os.write(bytes);
533                 }
534             } else {
535                 t.sendResponseHeaders(200, 0);  //no body
536             }
537         }
538     }
539 
540     static class HTTP_VariableLengthHandler implements HttpTestHandler {
541         @Override
542         public void handle(HttpTestExchange t) throws IOException {
543             out.println("HTTP_VariableLengthHandler received request to " + t.getRequestURI());
544             try (InputStream is = t.getRequestBody()) {
545                 is.readAllBytes();
546             }
547             t.sendResponseHeaders(200, -1);  //chunked or variable
548             if (t.getRequestURI().getPath().endsWith("/withBody")) {
549                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
550                 try (OutputStream os = t.getResponseBody()) {
551                     int chunkLen = bytes.length/10;
552                     if (chunkLen == 0) {
553                         os.write(bytes);
554                     } else {
555                         int count = 0;
556                         for (int i=0; i<10; i++) {
557                             os.write(bytes, count, chunkLen);
558                             os.flush();
559                             count += chunkLen;
560                         }
561                         os.write(bytes, count, bytes.length % chunkLen);
562                         count += bytes.length % chunkLen;
563                         assert count == bytes.length;
564                     }
565                 }
566             } else {
567                 t.getResponseBody().close();   // no body
568             }
569         }
570     }
571 }