1 /*
  2  * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @bug 8195823
 27  * @summary Buffers given to response body subscribers should not contain
 28  *          unprocessed HTTP data
 29  * @library /test/lib /test/jdk/java/net/httpclient/lib
 30  * @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext
 31  *        jdk.httpclient.test.lib.common.TestServerConfigurator
 32  * @run testng/othervm
 33  *      -Djdk.tracePinnedThreads=full
 34  *      -Djdk.httpclient.HttpClient.log=headers,errors,channel
 35  *      ConcurrentResponses
 36  */
 37 
 38 import java.io.IOException;
 39 import java.io.InputStream;
 40 import java.io.OutputStream;
 41 import java.net.InetAddress;
 42 import java.net.InetSocketAddress;
 43 import java.net.URI;
 44 import java.nio.ByteBuffer;
 45 import java.util.HashMap;
 46 import java.util.List;
 47 import java.util.Map;
 48 import java.util.concurrent.CompletableFuture;
 49 import java.util.concurrent.CompletionStage;
 50 import java.util.concurrent.ExecutorService;
 51 import java.util.concurrent.Executors;
 52 import java.util.concurrent.Flow;
 53 import java.util.concurrent.atomic.AtomicInteger;
 54 import java.util.stream.IntStream;
 55 import javax.net.ssl.SSLContext;
 56 import com.sun.net.httpserver.HttpExchange;
 57 import com.sun.net.httpserver.HttpHandler;
 58 import com.sun.net.httpserver.HttpServer;
 59 import com.sun.net.httpserver.HttpsServer;
 60 import java.net.http.HttpClient;
 61 import java.net.http.HttpRequest;
 62 import java.net.http.HttpResponse;
 63 import java.net.http.HttpResponse.BodyHandler;
 64 import java.net.http.HttpResponse.BodyHandlers;
 65 import java.net.http.HttpResponse.BodySubscriber;
 66 import java.net.http.HttpResponse.BodySubscribers;
 67 
 68 import jdk.httpclient.test.lib.common.TestServerConfigurator;
 69 import jdk.httpclient.test.lib.http2.Http2TestServer;
 70 import jdk.httpclient.test.lib.http2.Http2TestExchange;
 71 import jdk.httpclient.test.lib.http2.Http2Handler;
 72 import jdk.test.lib.net.SimpleSSLContext;
 73 import org.testng.annotations.AfterTest;
 74 import org.testng.annotations.BeforeTest;
 75 import org.testng.annotations.DataProvider;
 76 import org.testng.annotations.Test;
 77 import static java.nio.charset.StandardCharsets.UTF_8;
 78 import static java.net.http.HttpResponse.BodyHandlers.discarding;
 79 import static org.testng.Assert.assertEquals;
 80 import static org.testng.Assert.assertFalse;
 81 import static org.testng.Assert.fail;
 82 
 83 public class ConcurrentResponses {
 84 
 85     SSLContext sslContext;
 86     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
 87     HttpsServer httpsTestServer;       // HTTPS/1.1
 88     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
 89     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
 90     String httpFixedURI, httpsFixedURI, httpChunkedURI, httpsChunkedURI;
 91     String http2FixedURI, https2FixedURI, http2VariableURI, https2VariableURI;
 92 
 93     static final int CONCURRENT_REQUESTS = 13;
 94     static final AtomicInteger IDS = new AtomicInteger();
 95 
 96     static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
 97     static final int ALPHABET_LENGTH = ALPHABET.length();
 98 
 99     static final String stringOfLength(int requiredLength) {
100         StringBuilder sb = new StringBuilder(requiredLength);
101         IntStream.range(0, requiredLength)
102                  .mapToObj(i -> ALPHABET.charAt(i % ALPHABET_LENGTH))
103                  .forEach(c -> sb.append(c));
104         return sb.toString();
105     }
106 
107     /** An array of different Strings, to be used as bodies. */
108     static final String[] BODIES = bodies();
109 
110     static String[] bodies() {
111         String[] bodies = new String[CONCURRENT_REQUESTS];
112         for (int i=0;i<CONCURRENT_REQUESTS; i++) {
113             // slightly, but still, different bodies
114             bodies[i] = "Request-" + i + "-body-" + stringOfLength((1024) + i);
115         }
116         return bodies;
117     }
118 
119     /**
120      * Asserts the given response's status code is 200.
121      * Returns a CF that completes with the given response.
122      */
123     static final <T> CompletionStage<HttpResponse<T>>
124     assert200ResponseCode(HttpResponse<T> response) {
125         assertEquals(response.statusCode(), 200);
126         return CompletableFuture.completedFuture(response);
127     }
128 
129     /**
130      * Asserts that the given response's body is equal to the given body.
131      * Returns a CF that completes with the given response.
132      */
133     static final <T> CompletionStage<HttpResponse<T>>
134     assertbody(HttpResponse<T> response, T body) {
135         assertEquals(response.body(), body);
136         return CompletableFuture.completedFuture(response);
137     }
138 
139     @DataProvider(name = "uris")
140     public Object[][] variants() {
141         return new Object[][]{
142                 { httpFixedURI },
143                 { httpsFixedURI },
144                 { httpChunkedURI },
145                 { httpsChunkedURI },
146                 { http2FixedURI },
147                 { https2FixedURI },
148                 { http2VariableURI },
149                 { https2VariableURI }
150         };
151     }
152 
153 
154     // The ofString implementation accumulates data, below a certain threshold
155     // into the byte buffers it is given.
156     @Test(dataProvider = "uris")
157     void testAsString(String uri) throws Exception {
158         int id = IDS.getAndIncrement();
159         ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
160                 .name("HttpClient-" + id + "-Worker", 0).factory());
161         HttpClient client = HttpClient.newBuilder()
162                 .sslContext(sslContext)
163                 .executor(virtualExecutor)
164                 .build();
165         try {
166             Map<HttpRequest, String> requests = new HashMap<>();
167             for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
168                 HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
169                         .build();
170                 requests.put(request, BODIES[i]);
171             }
172 
173             // initial connection to seed the cache so next parallel connections reuse it
174             client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discarding()).join();
175 
176             // will reuse connection cached from the previous request ( when HTTP/2 )
177             CompletableFuture.allOf(requests.keySet().parallelStream()
178                             .map(request -> client.sendAsync(request, BodyHandlers.ofString()))
179                             .map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
180                             .map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
181                             .toArray(CompletableFuture<?>[]::new))
182                     .join();
183         } finally {
184             client.close();
185             virtualExecutor.close();
186         }
187     }
188 
189     // The custom subscriber aggressively attacks any area, between the limit
190     // and the capacity, in the byte buffers it is given, by writing 'X' into it.
191     @Test(dataProvider = "uris")
192     void testWithCustomSubscriber(String uri) throws Exception {
193         int id = IDS.getAndIncrement();
194         ExecutorService virtualExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
195                 .name("HttpClient-" + id + "-Worker", 0).factory());
196         HttpClient client = HttpClient.newBuilder()
197                 .executor(virtualExecutor)
198                 .sslContext(sslContext).build();
199         try {
200             Map<HttpRequest, String> requests = new HashMap<>();
201             for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
202                 HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
203                         .build();
204                 requests.put(request, BODIES[i]);
205             }
206 
207             // initial connection to seed the cache so next parallel connections reuse it
208             client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discarding()).join();
209 
210             // will reuse connection cached from the previous request ( when HTTP/2 )
211             CompletableFuture.allOf(requests.keySet().parallelStream()
212                             .map(request -> client.sendAsync(request, CustomSubscriber.handler))
213                             .map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
214                             .map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
215                             .toArray(CompletableFuture<?>[]::new))
216                     .join();
217         } finally {
218             client.close();
219             virtualExecutor.close();
220         }
221     }
222 
223 
224     /**
225      * A subscriber that wraps ofString, but mucks with any data between limit
226      * and capacity, if the client mistakenly passes it any that is should not.
227      */
228     static class CustomSubscriber implements BodySubscriber<String> {
229         static final BodyHandler<String> handler = (r) -> new CustomSubscriber();
230         private final BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
231 
232         @Override
233         public CompletionStage<String> getBody() {
234             return ofString.getBody();
235         }
236 
237         @Override
238         public void onSubscribe(Flow.Subscription subscription) {
239             ofString.onSubscribe(subscription);
240         }
241 
242         @Override
243         public void onNext(List<ByteBuffer> buffers) {
244             // Muck any data beyond the give limit, since there shouldn't
245             // be any of interest to the HTTP Client.
246             for (ByteBuffer buffer : buffers) {
247                 if (buffer.isReadOnly())
248                     continue;
249 
250                 if (buffer.limit() != buffer.capacity()) {
251                     final int limit = buffer.limit();
252                     final int position = buffer.position();
253                     buffer.position(buffer.limit());
254                     buffer.limit(buffer.capacity());
255                     while (buffer.hasRemaining())
256                         buffer.put((byte)'X');
257                     buffer.position(position); // restore original position
258                     buffer.limit(limit);       // restore original limit
259                 }
260             }
261             ofString.onNext(buffers);
262         }
263 
264         @Override
265         public void onError(Throwable throwable) {
266             ofString.onError(throwable);
267             throwable.printStackTrace();
268             fail("UNEXPECTED:" + throwable);
269         }
270 
271         @Override
272         public void onComplete() {
273             ofString.onComplete();
274         }
275     }
276 
277     static String serverAuthority(HttpServer server) {
278         return InetAddress.getLoopbackAddress().getHostName() + ":"
279                 + server.getAddress().getPort();
280     }
281 
282     @BeforeTest
283     public void setup() throws Exception {
284         sslContext = new SimpleSSLContext().get();
285         if (sslContext == null)
286             throw new AssertionError("Unexpected null sslContext");
287 
288         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
289         httpTestServer = HttpServer.create(sa, 0);
290         httpTestServer.createContext("/http1/fixed", new Http1FixedHandler());
291         httpFixedURI = "http://" + serverAuthority(httpTestServer) + "/http1/fixed";
292         httpTestServer.createContext("/http1/chunked", new Http1ChunkedHandler());
293         httpChunkedURI = "http://" + serverAuthority(httpTestServer) + "/http1/chunked";
294 
295         httpsTestServer = HttpsServer.create(sa, 0);
296         httpsTestServer.setHttpsConfigurator(new TestServerConfigurator(sa.getAddress(), sslContext));
297         httpsTestServer.createContext("/https1/fixed", new Http1FixedHandler());
298         httpsFixedURI = "https://" + serverAuthority(httpsTestServer) + "/https1/fixed";
299         httpsTestServer.createContext("/https1/chunked", new Http1ChunkedHandler());
300         httpsChunkedURI = "https://" + serverAuthority(httpsTestServer) + "/https1/chunked";
301 
302         http2TestServer = new Http2TestServer("localhost", false, 0);
303         http2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed");
304         http2FixedURI = "http://" + http2TestServer.serverAuthority()+ "/http2/fixed";
305         http2TestServer.addHandler(new Http2VariableHandler(), "/http2/variable");
306         http2VariableURI = "http://" + http2TestServer.serverAuthority() + "/http2/variable";
307 
308         https2TestServer = new Http2TestServer("localhost", true, sslContext);
309         https2TestServer.addHandler(new Http2FixedHandler(), "/https2/fixed");
310         https2FixedURI = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
311         https2TestServer.addHandler(new Http2VariableHandler(), "/https2/variable");
312         https2VariableURI = "https://" + https2TestServer.serverAuthority() + "/https2/variable";
313 
314         httpTestServer.start();
315         httpsTestServer.start();
316         http2TestServer.start();
317         https2TestServer.start();
318     }
319 
    /**
     * Stops the four servers that setup() started, in the same order in which
     * they were started. The two HTTP/1.1 servers are stopped with a delay
     * argument of 0.
     */
    @AfterTest
    public void teardown() throws Exception {
        httpTestServer.stop(0);
        httpsTestServer.stop(0);
        http2TestServer.stop();
        https2TestServer.stop();
    }
327 
328     interface SendResponseHeadersFunction {
329         void apply(int responseCode, long responseLength) throws IOException;
330     }
331 
332     // A handler implementation that replies with 200 OK. If the exchange's uri
333     // has a query, then it must be an integer, which is used as an index to
334     // select the particular response body, e.g. /http2/x?5 -> BODIES[5]
335     static void serverHandlerImpl(InputStream inputStream,
336                                   OutputStream outputStream,
337                                   URI uri,
338                                   SendResponseHeadersFunction sendResponseHeadersFunction)
339         throws IOException
340     {
341         try (InputStream is = inputStream;
342              OutputStream os = outputStream) {
343             is.readAllBytes();
344 
345             String magicQuery = uri.getQuery();
346             if (magicQuery != null) {
347                 int bodyIndex = Integer.valueOf(magicQuery);
348                 String body = BODIES[bodyIndex];
349                 byte[] bytes = body.getBytes(UTF_8);
350                 sendResponseHeadersFunction.apply(200, bytes.length);
351                 int offset = 0;
352                 // Deliberately attempt to reply with several relatively
353                 // small data frames ( each write corresponds to its own
354                 // data frame ). Additionally, yield, to encourage other
355                 // handlers to execute, therefore increasing the likelihood
356                 // of multiple different-stream related frames in the
357                 // client's read buffer.
358                 while (offset < bytes.length) {
359                     int length = Math.min(bytes.length - offset, 64);
360                     os.write(bytes, offset, length);
361                     os.flush();
362                     offset += length;
363                     Thread.yield();
364                 }
365             } else {
366                 sendResponseHeadersFunction.apply(200, 1);
367                 os.write('A');
368             }
369         }
370     }
371 
372     static class Http1FixedHandler implements HttpHandler {
373         @Override
374         public void handle(HttpExchange t) throws IOException {
375             serverHandlerImpl(t.getRequestBody(),
376                               t.getResponseBody(),
377                               t.getRequestURI(),
378                               (rcode, length) -> t.sendResponseHeaders(rcode, length));
379         }
380     }
381 
382     static class Http1ChunkedHandler implements HttpHandler {
383         @Override
384         public void handle(HttpExchange t) throws IOException {
385             serverHandlerImpl(t.getRequestBody(),
386                               t.getResponseBody(),
387                               t.getRequestURI(),
388                               (rcode, ignored) -> t.sendResponseHeaders(rcode, 0 /*chunked*/));
389         }
390     }
391 
392     static class Http2FixedHandler implements Http2Handler {
393         @Override
394         public void handle(Http2TestExchange t) throws IOException {
395             serverHandlerImpl(t.getRequestBody(),
396                               t.getResponseBody(),
397                               t.getRequestURI(),
398                               (rcode, length) -> t.sendResponseHeaders(rcode, length));
399         }
400     }
401 
402     static class Http2VariableHandler implements Http2Handler {
403         @Override
404         public void handle(Http2TestExchange t) throws IOException {
405             serverHandlerImpl(t.getRequestBody(),
406                               t.getResponseBody(),
407                               t.getRequestURI(),
408                               (rcode, ignored) -> t.sendResponseHeaders(rcode, 0 /* no Content-Length */));
409         }
410     }
411 }