1 /*
  2  * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @summary Tests mapped response subscriber
 27  * @library /test/lib /test/jdk/java/net/httpclient/lib
 28  * @build jdk.test.lib.net.SimpleSSLContext jdk.httpclient.test.lib.http2.Http2TestServer
 29  *        jdk.httpclient.test.lib.common.TestServerConfigurator
 30  * @run testng/othervm
 31  *       -Djdk.internal.httpclient.debug=true
 32  *      MappingResponseSubscriber
 33  */
 34 
 35 import java.io.IOException;
 36 import java.io.InputStream;
 37 import java.io.OutputStream;
 38 import java.net.InetAddress;
 39 import java.net.InetSocketAddress;
 40 import java.net.URI;
 41 import java.nio.ByteBuffer;
 42 import java.util.List;
 43 import java.util.concurrent.CompletionStage;
 44 import java.util.concurrent.Executor;
 45 import java.util.concurrent.Executors;
 46 import java.util.concurrent.Flow;
 47 import com.sun.net.httpserver.HttpExchange;
 48 import com.sun.net.httpserver.HttpHandler;
 49 import com.sun.net.httpserver.HttpServer;
 50 import com.sun.net.httpserver.HttpsServer;
 51 import java.net.http.HttpClient;
 52 import java.net.http.HttpHeaders;
 53 import java.net.http.HttpRequest;
 54 import java.net.http.HttpResponse;
 55 import java.net.http.HttpResponse.BodyHandler;
 56 import java.net.http.HttpResponse.BodyHandlers;
 57 import java.net.http.HttpResponse.BodySubscribers;
 58 import  java.net.http.HttpResponse.BodySubscriber;
 59 import java.util.function.Function;
 60 import javax.net.ssl.SSLContext;
 61 
 62 import jdk.httpclient.test.lib.common.TestServerConfigurator;
 63 import jdk.internal.net.http.common.OperationTrackers.Tracker;
 64 import jdk.httpclient.test.lib.http2.Http2TestServer;
 65 import jdk.httpclient.test.lib.http2.Http2TestExchange;
 66 import jdk.httpclient.test.lib.http2.Http2Handler;
 67 import jdk.test.lib.net.SimpleSSLContext;
 68 import org.testng.annotations.AfterTest;
 69 import org.testng.annotations.BeforeTest;
 70 import org.testng.annotations.DataProvider;
 71 import org.testng.annotations.Test;
 72 import static java.lang.System.out;
 73 import static java.nio.charset.StandardCharsets.UTF_8;
 74 import static org.testng.Assert.assertEquals;
 75 import static org.testng.Assert.assertTrue;
 76 
 77 public class MappingResponseSubscriber {
 78 
 79     SSLContext sslContext;
 80     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
 81     HttpsServer httpsTestServer;       // HTTPS/1.1
 82     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
 83     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
 84     String httpURI_fixed;
 85     String httpURI_chunk;
 86     String httpsURI_fixed;
 87     String httpsURI_chunk;
 88     String http2URI_fixed;
 89     String http2URI_chunk;
 90     String https2URI_fixed;
 91     String https2URI_chunk;
 92 
 93     static final int ITERATION_COUNT = 3;
 94     // a shared executor helps reduce the amount of threads created by the test
 95     static final Executor executor = Executors.newCachedThreadPool();
 96 
 97     static final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
 98 
 99     @DataProvider(name = "variants")
100     public Object[][] variants() {
101         return new Object[][]{
102                 { httpURI_fixed,    false },
103                 { httpURI_chunk,    false },
104                 { httpsURI_fixed,   false },
105                 { httpsURI_chunk,   false },
106                 { http2URI_fixed,   false },
107                 { http2URI_chunk,   false },
108                 { https2URI_fixed,  false,},
109                 { https2URI_chunk,  false },
110 
111                 { httpURI_fixed,    true },
112                 { httpURI_chunk,    true },
113                 { httpsURI_fixed,   true },
114                 { httpsURI_chunk,   true },
115                 { http2URI_fixed,   true },
116                 { http2URI_chunk,   true },
117                 { https2URI_fixed,  true,},
118                 { https2URI_chunk,  true },
119         };
120     }
121 
122     HttpClient newHttpClient() {
123         return HttpClient.newBuilder()
124                          .executor(executor)
125                          .sslContext(sslContext)
126                          .build();
127     }
128 
129     @Test(dataProvider = "variants")
130     public void testAsBytes(String uri, boolean sameClient) throws Exception {
131         HttpClient client = null;
132         for (int i = 0; i < ITERATION_COUNT; i++) {
133             if (!sameClient || client == null)
134                 client = newHttpClient();
135 
136             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
137                     .build();
138             BodyHandler<byte[]> handler = new CRSBodyHandler();
139             HttpResponse<byte[]> response = client.send(req, handler);
140             byte[] body = response.body();
141             assertEquals(body, bytes);
142 
143             // if sameClient we will reuse the client for the next
144             // operation, so there's nothing more to do.
145             if (sameClient) continue;
146 
147             // if no error and not same client then wait for the
148             // client to be GC'ed before performing the nex operation
149             Tracker tracker = TRACKER.getTracker(client);
150             client = null;
151             System.gc();
152             AssertionError error = TRACKER.check(tracker, 1500);
153             if (error != null) throw error; // the client didn't shut down properly
154         }
155         if (sameClient) {
156             Tracker tracker = TRACKER.getTracker(client);
157             client = null;
158             System.gc();
159             AssertionError error = TRACKER.check(tracker,1500);
160             if (error != null) throw error; // the client didn't shut down properly
161         }
162     }
163 
164     static class CRSBodyHandler implements BodyHandler<byte[]> {
165         @Override
166         public BodySubscriber<byte[]> apply(HttpResponse.ResponseInfo rinfo) {
167             assertEquals(rinfo.statusCode(), 200);
168             return BodySubscribers.mapping(
169                 new CRSBodySubscriber(), (s) -> s.getBytes(UTF_8)
170             );
171         }
172     }
173 
174     static class CRSBodySubscriber implements BodySubscriber<String> {
175         private final BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
176         volatile boolean onSubscribeCalled;
177 
178         @Override
179         public void onSubscribe(Flow.Subscription subscription) {
180             //out.println("onSubscribe ");
181             onSubscribeCalled = true;
182             ofString.onSubscribe(subscription);
183         }
184 
185         @Override
186         public void onNext(List<ByteBuffer> item) {
187            // out.println("onNext " + item);
188             assertTrue(onSubscribeCalled);
189             ofString.onNext(item);
190         }
191 
192         @Override
193         public void onError(Throwable throwable) {
194             //out.println("onError");
195             assertTrue(onSubscribeCalled);
196             ofString.onError(throwable);
197         }
198 
199         @Override
200         public void onComplete() {
201             //out.println("onComplete");
202             assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
203             ofString.onComplete();
204         }
205 
206         @Override
207         public CompletionStage<String> getBody() {
208             return ofString.getBody();
209         }
210     }
211 
212     static String serverAuthority(HttpServer server) {
213         return InetAddress.getLoopbackAddress().getHostName() + ":"
214                 + server.getAddress().getPort();
215     }
216 
217     @BeforeTest
218     public void setup() throws Exception {
219         sslContext = new SimpleSSLContext().get();
220         if (sslContext == null)
221             throw new AssertionError("Unexpected null sslContext");
222 
223         // HTTP/1.1
224         HttpHandler h1_fixedLengthHandler = new HTTP1_FixedLengthHandler();
225         HttpHandler h1_chunkHandler = new HTTP1_ChunkedHandler();
226         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
227         httpTestServer = HttpServer.create(sa, 0);
228         httpTestServer.createContext("/http1/fixed", h1_fixedLengthHandler);
229         httpTestServer.createContext("/http1/chunk", h1_chunkHandler);
230         httpURI_fixed = "http://" + serverAuthority(httpTestServer) + "/http1/fixed";
231         httpURI_chunk = "http://" + serverAuthority(httpTestServer) + "/http1/chunk";
232 
233         httpsTestServer = HttpsServer.create(sa, 0);
234         httpsTestServer.setHttpsConfigurator(new TestServerConfigurator(sa.getAddress(), sslContext));
235         httpsTestServer.createContext("/https1/fixed", h1_fixedLengthHandler);
236         httpsTestServer.createContext("/https1/chunk", h1_chunkHandler);
237         httpsURI_fixed = "https://" + serverAuthority(httpsTestServer) + "/https1/fixed";
238         httpsURI_chunk = "https://" + serverAuthority(httpsTestServer) + "/https1/chunk";
239 
240         // HTTP/2
241         Http2Handler h2_fixedLengthHandler = new HTTP2_FixedLengthHandler();
242         Http2Handler h2_chunkedHandler = new HTTP2_VariableHandler();
243 
244         http2TestServer = new Http2TestServer("localhost", false, 0);
245         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
246         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
247         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed";
248         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk";
249 
250         https2TestServer = new Http2TestServer("localhost", true, sslContext);
251         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
252         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
253         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
254         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk";
255 
256         httpTestServer.start();
257         httpsTestServer.start();
258         http2TestServer.start();
259         https2TestServer.start();
260     }
261 
262     @AfterTest
263     public void teardown() throws Exception {
264         httpTestServer.stop(0);
265         httpsTestServer.stop(0);
266         http2TestServer.stop();
267         https2TestServer.stop();
268     }
269 
270     static byte[] bytes;
271 
272     static {
273         bytes = new byte[128 * 1024];
274         int b = 'A';
275 
276         for (int i=0; i< bytes.length; i++) {
277             bytes[i] = (byte)b;
278             b = b == 'Z'? 'A' : b + 1;
279         }
280     }
281 
282     static class HTTP1_FixedLengthHandler implements HttpHandler {
283         @Override
284         public void handle(HttpExchange t) throws IOException {
285             out.println("HTTP1_FixedLengthHandler received request to " + t.getRequestURI());
286             try (InputStream is = t.getRequestBody()) {
287                 is.readAllBytes();
288             }
289             t.sendResponseHeaders(200, bytes.length);  //no body
290             OutputStream os = t.getResponseBody();
291             os.write(bytes);
292             os.close();
293         }
294     }
295 
296     static class HTTP1_ChunkedHandler implements HttpHandler {
297         @Override
298         public void handle(HttpExchange t) throws IOException {
299             out.println("HTTP1_ChunkedHandler received request to " + t.getRequestURI());
300             try (InputStream is = t.getRequestBody()) {
301                 is.readAllBytes();
302             }
303             t.sendResponseHeaders(200, 0); // chunked
304             OutputStream os = t.getResponseBody();
305             os.write(bytes);
306             os.close();
307         }
308     }
309 
310     static class HTTP2_FixedLengthHandler implements Http2Handler {
311         @Override
312         public void handle(Http2TestExchange t) throws IOException {
313             out.println("HTTP2_FixedLengthHandler received request to " + t.getRequestURI());
314             try (InputStream is = t.getRequestBody()) {
315                 is.readAllBytes();
316             }
317             t.sendResponseHeaders(200, 0); // chunked
318             OutputStream os = t.getResponseBody();
319             os.write(bytes);
320             os.close();
321         }
322     }
323 
324     static class HTTP2_VariableHandler implements Http2Handler {
325         @Override
326         public void handle(Http2TestExchange t) throws IOException {
327             out.println("HTTP2_VariableHandler received request to " + t.getRequestURI());
328             try (InputStream is = t.getRequestBody()) {
329                 is.readAllBytes();
330             }
331             t.sendResponseHeaders(200, bytes.length);  //no body
332             OutputStream os = t.getResponseBody();
333             os.write(bytes);
334             os.close();
335         }
336     }
337 
338     // -- Compile only. Verifies generic signatures
339 
340     static final Function<CharSequence,Integer> f1 = subscriber -> 1;
341     static final Function<CharSequence,Number> f2 = subscriber -> 2;
342     static final Function<String,Integer> f3 = subscriber -> 3;
343     static final Function<String,Number> f4 = subscriber -> 4;
344 
345     public void compileOnly() throws Exception {
346         HttpClient client = null;
347         HttpRequest req = null;
348 
349         HttpResponse<Integer> r1 = client.send(req, (ri) ->
350                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), s -> 1));
351         HttpResponse<Number>  r2 = client.send(req, (ri) ->
352                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), s -> 1));
353         HttpResponse<String>  r3 = client.send(req, (ri) ->
354                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), s -> "s"));
355         HttpResponse<CharSequence> r4 = client.send(req, (ri) ->
356                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), s -> "s"));
357 
358         HttpResponse<Integer> x1 = client.send(req, (ri) ->
359                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f1));
360         HttpResponse<Number>  x2 = client.send(req, (ri) ->
361                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f1));
362         HttpResponse<Number>  x3 = client.send(req, (ri) ->
363                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f2));
364         HttpResponse<Integer> x4 = client.send(req, (ri) ->
365                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f3));
366         HttpResponse<Number>  x5 = client.send(req, (ri) ->
367                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f3));
368         HttpResponse<Number>  x7 = client.send(req, (ri) ->
369                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f4));
370     }
371 }