1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @bug 8195823
 27  * @summary Buffers given to response body subscribers should not contain
 28  *          unprocessed HTTP data
 29  * @library /test/lib /test/jdk/java/net/httpclient/lib
 30  * @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext
 31  * @run testng/othervm
 32  *      -Djdk.tracePinnedThreads=full
 33  *      -Djdk.httpclient.HttpClient.log=headers,errors,channel
 34  *      ConcurrentResponses
 35  */
 36 
 37 import java.io.IOException;
 38 import java.io.InputStream;
 39 import java.io.OutputStream;
 40 import java.net.InetAddress;
 41 import java.net.InetSocketAddress;
 42 import java.net.URI;
 43 import java.nio.ByteBuffer;
 44 import java.util.HashMap;
 45 import java.util.List;
 46 import java.util.Map;
 47 import java.util.concurrent.CompletableFuture;
 48 import java.util.concurrent.CompletionStage;
 49 import java.util.concurrent.ExecutorService;
 50 import java.util.concurrent.Executors;
 51 import java.util.concurrent.Flow;
 52 import java.util.concurrent.atomic.AtomicInteger;
 53 import java.util.stream.IntStream;
 54 import javax.net.ssl.SSLContext;
 55 import com.sun.net.httpserver.HttpExchange;
 56 import com.sun.net.httpserver.HttpHandler;
 57 import com.sun.net.httpserver.HttpServer;
 58 import com.sun.net.httpserver.HttpsConfigurator;
 59 import com.sun.net.httpserver.HttpsServer;
 60 import java.net.http.HttpClient;
 61 import java.net.http.HttpRequest;
 62 import java.net.http.HttpResponse;
 63 import java.net.http.HttpResponse.BodyHandler;
 64 import java.net.http.HttpResponse.BodyHandlers;
 65 import java.net.http.HttpResponse.BodySubscriber;
 66 import java.net.http.HttpResponse.BodySubscribers;
 67 import jdk.httpclient.test.lib.http2.Http2TestServer;
 68 import jdk.httpclient.test.lib.http2.Http2TestExchange;
 69 import jdk.httpclient.test.lib.http2.Http2Handler;
 70 import jdk.test.lib.net.SimpleSSLContext;
 71 import org.testng.annotations.AfterTest;
 72 import org.testng.annotations.BeforeTest;
 73 import org.testng.annotations.DataProvider;
 74 import org.testng.annotations.Test;
 75 import static java.nio.charset.StandardCharsets.UTF_8;
 76 import static java.net.http.HttpResponse.BodyHandlers.discarding;
 77 import static org.testng.Assert.assertEquals;
 78 import static org.testng.Assert.assertFalse;
 79 import static org.testng.Assert.fail;
 80 
 81 public class ConcurrentResponses {
 82 
 83     SSLContext sslContext;
 84     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
 85     HttpsServer httpsTestServer;       // HTTPS/1.1
 86     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
 87     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
 88     String httpFixedURI, httpsFixedURI, httpChunkedURI, httpsChunkedURI;
 89     String http2FixedURI, https2FixedURI, http2VariableURI, https2VariableURI;
 90 
 91     static final int CONCURRENT_REQUESTS = 13;
 92     static final AtomicInteger IDS = new AtomicInteger();
 93 
 94     static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
 95     static final int ALPHABET_LENGTH = ALPHABET.length();
 96 
 97     static final String stringOfLength(int requiredLength) {
 98         StringBuilder sb = new StringBuilder(requiredLength);
 99         IntStream.range(0, requiredLength)
100                  .mapToObj(i -> ALPHABET.charAt(i % ALPHABET_LENGTH))
101                  .forEach(c -> sb.append(c));
102         return sb.toString();
103     }
104 
105     /** An array of different Strings, to be used as bodies. */
106     static final String[] BODIES = bodies();
107 
108     static String[] bodies() {
109         String[] bodies = new String[CONCURRENT_REQUESTS];
110         for (int i=0;i<CONCURRENT_REQUESTS; i++) {
111             // slightly, but still, different bodies
112             bodies[i] = "Request-" + i + "-body-" + stringOfLength((1024) + i);
113         }
114         return bodies;
115     }
116 
117     /**
118      * Asserts the given response's status code is 200.
119      * Returns a CF that completes with the given response.
120      */
121     static final <T> CompletionStage<HttpResponse<T>>
122     assert200ResponseCode(HttpResponse<T> response) {
123         assertEquals(response.statusCode(), 200);
124         return CompletableFuture.completedFuture(response);
125     }
126 
127     /**
128      * Asserts that the given response's body is equal to the given body.
129      * Returns a CF that completes with the given response.
130      */
131     static final <T> CompletionStage<HttpResponse<T>>
132     assertbody(HttpResponse<T> response, T body) {
133         assertEquals(response.body(), body);
134         return CompletableFuture.completedFuture(response);
135     }
136 
137     @DataProvider(name = "uris")
138     public Object[][] variants() {
139         return new Object[][]{
140                 { httpFixedURI },
141                 { httpsFixedURI },
142                 { httpChunkedURI },
143                 { httpsChunkedURI },
144                 { http2FixedURI },
145                 { https2FixedURI },
146                 { http2VariableURI },
147                 { https2VariableURI }
148         };
149     }
150 
151 
152     // The ofString implementation accumulates data, below a certain threshold
153     // into the byte buffers it is given.
154     @Test(dataProvider = "uris")
155     void testAsString(String uri) throws Exception {
156         int id = IDS.getAndIncrement();
157         ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
158                 .name("HttpClient-" + id + "-Worker", 0).factory());
159         HttpClient client = HttpClient.newBuilder()
160                 .sslContext(sslContext)
161                 .executor(virtualExecutor)
162                 .build();
163         try {
164             Map<HttpRequest, String> requests = new HashMap<>();
165             for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
166                 HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
167                         .build();
168                 requests.put(request, BODIES[i]);
169             }
170 
171             // initial connection to seed the cache so next parallel connections reuse it
172             client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discarding()).join();
173 
174             // will reuse connection cached from the previous request ( when HTTP/2 )
175             CompletableFuture.allOf(requests.keySet().parallelStream()
176                             .map(request -> client.sendAsync(request, BodyHandlers.ofString()))
177                             .map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
178                             .map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
179                             .toArray(CompletableFuture<?>[]::new))
180                     .join();
181         } finally {
182             client.close();
183             virtualExecutor.close();
184         }
185     }
186 
187     // The custom subscriber aggressively attacks any area, between the limit
188     // and the capacity, in the byte buffers it is given, by writing 'X' into it.
189     @Test(dataProvider = "uris")
190     void testWithCustomSubscriber(String uri) throws Exception {
191         int id = IDS.getAndIncrement();
192         ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
193                 .name("HttpClient-" + id + "-Worker", 0).factory());
194         HttpClient client = HttpClient.newBuilder()
195                 .executor(virtualExecutor)
196                 .sslContext(sslContext).build();
197         try {
198             Map<HttpRequest, String> requests = new HashMap<>();
199             for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
200                 HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
201                         .build();
202                 requests.put(request, BODIES[i]);
203             }
204 
205             // initial connection to seed the cache so next parallel connections reuse it
206             client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discarding()).join();
207 
208             // will reuse connection cached from the previous request ( when HTTP/2 )
209             CompletableFuture.allOf(requests.keySet().parallelStream()
210                             .map(request -> client.sendAsync(request, CustomSubscriber.handler))
211                             .map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
212                             .map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
213                             .toArray(CompletableFuture<?>[]::new))
214                     .join();
215         } finally {
216             client.close();
217             virtualExecutor.close();
218         }
219     }
220 
221 
222     /**
223      * A subscriber that wraps ofString, but mucks with any data between limit
224      * and capacity, if the client mistakenly passes it any that is should not.
225      */
226     static class CustomSubscriber implements BodySubscriber<String> {
227         static final BodyHandler<String> handler = (r) -> new CustomSubscriber();
228         private final BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
229 
230         @Override
231         public CompletionStage<String> getBody() {
232             return ofString.getBody();
233         }
234 
235         @Override
236         public void onSubscribe(Flow.Subscription subscription) {
237             ofString.onSubscribe(subscription);
238         }
239 
240         @Override
241         public void onNext(List<ByteBuffer> buffers) {
242             // Muck any data beyond the give limit, since there shouldn't
243             // be any of interest to the HTTP Client.
244             for (ByteBuffer buffer : buffers) {
245                 if (buffer.isReadOnly())
246                     continue;
247 
248                 if (buffer.limit() != buffer.capacity()) {
249                     final int limit = buffer.limit();
250                     final int position = buffer.position();
251                     buffer.position(buffer.limit());
252                     buffer.limit(buffer.capacity());
253                     while (buffer.hasRemaining())
254                         buffer.put((byte)'X');
255                     buffer.position(position); // restore original position
256                     buffer.limit(limit);       // restore original limit
257                 }
258             }
259             ofString.onNext(buffers);
260         }
261 
262         @Override
263         public void onError(Throwable throwable) {
264             ofString.onError(throwable);
265             throwable.printStackTrace();
266             fail("UNEXPECTED:" + throwable);
267         }
268 
269         @Override
270         public void onComplete() {
271             ofString.onComplete();
272         }
273     }
274 
275     static String serverAuthority(HttpServer server) {
276         return InetAddress.getLoopbackAddress().getHostName() + ":"
277                 + server.getAddress().getPort();
278     }
279 
280     @BeforeTest
281     public void setup() throws Exception {
282         sslContext = new SimpleSSLContext().get();
283         if (sslContext == null)
284             throw new AssertionError("Unexpected null sslContext");
285 
286         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
287         httpTestServer = HttpServer.create(sa, 0);
288         httpTestServer.createContext("/http1/fixed", new Http1FixedHandler());
289         httpFixedURI = "http://" + serverAuthority(httpTestServer) + "/http1/fixed";
290         httpTestServer.createContext("/http1/chunked", new Http1ChunkedHandler());
291         httpChunkedURI = "http://" + serverAuthority(httpTestServer) + "/http1/chunked";
292 
293         httpsTestServer = HttpsServer.create(sa, 0);
294         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
295         httpsTestServer.createContext("/https1/fixed", new Http1FixedHandler());
296         httpsFixedURI = "https://" + serverAuthority(httpsTestServer) + "/https1/fixed";
297         httpsTestServer.createContext("/https1/chunked", new Http1ChunkedHandler());
298         httpsChunkedURI = "https://" + serverAuthority(httpsTestServer) + "/https1/chunked";
299 
300         http2TestServer = new Http2TestServer("localhost", false, 0);
301         http2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed");
302         http2FixedURI = "http://" + http2TestServer.serverAuthority()+ "/http2/fixed";
303         http2TestServer.addHandler(new Http2VariableHandler(), "/http2/variable");
304         http2VariableURI = "http://" + http2TestServer.serverAuthority() + "/http2/variable";
305 
306         https2TestServer = new Http2TestServer("localhost", true, sslContext);
307         https2TestServer.addHandler(new Http2FixedHandler(), "/https2/fixed");
308         https2FixedURI = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
309         https2TestServer.addHandler(new Http2VariableHandler(), "/https2/variable");
310         https2VariableURI = "https://" + https2TestServer.serverAuthority() + "/https2/variable";
311 
312         httpTestServer.start();
313         httpsTestServer.start();
314         http2TestServer.start();
315         https2TestServer.start();
316     }
317 
318     @AfterTest
319     public void teardown() throws Exception {
320         httpTestServer.stop(0);
321         httpsTestServer.stop(0);
322         http2TestServer.stop();
323         https2TestServer.stop();
324     }
325 
326     interface SendResponseHeadersFunction {
327         void apply(int responseCode, long responseLength) throws IOException;
328     }
329 
330     // A handler implementation that replies with 200 OK. If the exchange's uri
331     // has a query, then it must be an integer, which is used as an index to
332     // select the particular response body, e.g. /http2/x?5 -> BODIES[5]
333     static void serverHandlerImpl(InputStream inputStream,
334                                   OutputStream outputStream,
335                                   URI uri,
336                                   SendResponseHeadersFunction sendResponseHeadersFunction)
337         throws IOException
338     {
339         try (InputStream is = inputStream;
340              OutputStream os = outputStream) {
341             is.readAllBytes();
342 
343             String magicQuery = uri.getQuery();
344             if (magicQuery != null) {
345                 int bodyIndex = Integer.valueOf(magicQuery);
346                 String body = BODIES[bodyIndex];
347                 byte[] bytes = body.getBytes(UTF_8);
348                 sendResponseHeadersFunction.apply(200, bytes.length);
349                 int offset = 0;
350                 // Deliberately attempt to reply with several relatively
351                 // small data frames ( each write corresponds to its own
352                 // data frame ). Additionally, yield, to encourage other
353                 // handlers to execute, therefore increasing the likelihood
354                 // of multiple different-stream related frames in the
355                 // client's read buffer.
356                 while (offset < bytes.length) {
357                     int length = Math.min(bytes.length - offset, 64);
358                     os.write(bytes, offset, length);
359                     os.flush();
360                     offset += length;
361                     Thread.yield();
362                 }
363             } else {
364                 sendResponseHeadersFunction.apply(200, 1);
365                 os.write('A');
366             }
367         }
368     }
369 
370     static class Http1FixedHandler implements HttpHandler {
371         @Override
372         public void handle(HttpExchange t) throws IOException {
373             serverHandlerImpl(t.getRequestBody(),
374                               t.getResponseBody(),
375                               t.getRequestURI(),
376                               (rcode, length) -> t.sendResponseHeaders(rcode, length));
377         }
378     }
379 
380     static class Http1ChunkedHandler implements HttpHandler {
381         @Override
382         public void handle(HttpExchange t) throws IOException {
383             serverHandlerImpl(t.getRequestBody(),
384                               t.getResponseBody(),
385                               t.getRequestURI(),
386                               (rcode, ignored) -> t.sendResponseHeaders(rcode, 0 /*chunked*/));
387         }
388     }
389 
390     static class Http2FixedHandler implements Http2Handler {
391         @Override
392         public void handle(Http2TestExchange t) throws IOException {
393             serverHandlerImpl(t.getRequestBody(),
394                               t.getResponseBody(),
395                               t.getRequestURI(),
396                               (rcode, length) -> t.sendResponseHeaders(rcode, length));
397         }
398     }
399 
400     static class Http2VariableHandler implements Http2Handler {
401         @Override
402         public void handle(Http2TestExchange t) throws IOException {
403             serverHandlerImpl(t.getRequestBody(),
404                               t.getResponseBody(),
405                               t.getRequestURI(),
406                               (rcode, ignored) -> t.sendResponseHeaders(rcode, 0 /* no Content-Length */));
407         }
408     }
409 }