1 /*
  2  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @enablePreview
 27  * @library /test/lib
 28  * @modules java.base/sun.nio.ch
 29  * @key randomness
 30  * @run testng/othervm TestAsyncSocketChannels
 31  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=true TestAsyncSocketChannels
 32  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=false TestAsyncSocketChannels
 33  */
 34 
 35 import java.io.IOException;
 36 import java.lang.foreign.Arena;
 37 import java.lang.foreign.MemorySegment;
 38 import java.net.InetAddress;
 39 import java.net.InetSocketAddress;
 40 import java.net.StandardSocketOptions;
 41 import java.nio.ByteBuffer;
 42 import java.nio.channels.AsynchronousServerSocketChannel;
 43 import java.nio.channels.AsynchronousSocketChannel;
 44 import java.nio.channels.CompletionHandler;
 45 import java.util.List;
 46 import java.util.concurrent.CountDownLatch;
 47 import java.util.concurrent.ExecutionException;
 48 import java.util.concurrent.Future;
 49 import java.util.concurrent.atomic.AtomicBoolean;
 50 import java.util.concurrent.atomic.AtomicInteger;
 51 import java.util.concurrent.atomic.AtomicLong;
 52 import java.util.function.Supplier;
 53 
 54 import org.testng.annotations.*;
 55 import static java.lang.System.out;
 56 import static java.util.concurrent.TimeUnit.SECONDS;
 57 import static java.lang.foreign.ValueLayout.JAVA_BYTE;
 58 import static org.testng.Assert.*;
 59 
 60 /**
 61  * Tests consisting of buffer views with asynchronous NIO network channels.
 62  */
 63 public class TestAsyncSocketChannels extends AbstractChannelsTest {
 64 
    /** Class token for IOException; passed to assertCauses as an expected cause. */
    static final Class<IOException> IOE = IOException.class;
    /** Class token for ExecutionException; passed to expectThrows for the Future-based operations. */
    static final Class<ExecutionException> EE = ExecutionException.class;
    /** Class token for IllegalStateException; the failure type asserted on handlers and causes. */
    static final Class<IllegalStateException> ISE = IllegalStateException.class;
 68 
 69     /** Tests that confined sessions are not supported. */
 70     @Test(dataProvider = "confinedArenas")
 71     public void testWithConfined(Supplier<Arena> arenaSupplier)
 72         throws Throwable
 73     {
 74         try (var channel = AsynchronousSocketChannel.open();
 75              var server = AsynchronousServerSocketChannel.open();
 76              var connectedChannel = connectChannels(server, channel);
 77              var drop = arenaSupplier.get()) {
 78             Arena scope = drop;
 79             var segment = scope.allocate(10, 1);
 80             var bb = segment.asByteBuffer();
 81             var bba = new ByteBuffer[] { bb };
 82             List<ThrowingConsumer<TestHandler,?>> ioOps = List.of(
 83                     handler -> handler.propagateHandlerFromFuture(channel.write(bb)),
 84                     handler -> handler.propagateHandlerFromFuture(channel.read(bb)),
 85                     handler -> channel.write(bb, null, handler),
 86                     handler -> channel.read( bb, null, handler),
 87                     handler -> channel.write(bb , 0L, SECONDS, null, handler),
 88                     handler -> channel.read( bb,  0L, SECONDS, null, handler),
 89                     handler -> channel.write(bba, 0, bba.length, 0L, SECONDS, null, handler),
 90                     handler -> channel.read( bba, 0, bba.length, 0L, SECONDS, null, handler)
 91             );
 92             for (var ioOp : ioOps) {
 93                 out.println("testAsyncWithConfined - op");
 94                 var handler = new TestHandler();
 95                 ioOp.accept(handler);
 96                 handler.await()
 97                         .assertFailedWith(ISE)
 98                         .assertExceptionMessage("Confined session not supported");
 99             }
100         }
101     }
102 
103     /** Tests that I/O with a closed session throws a suitable exception. */
104     @Test(dataProvider = "sharedArenasAndTimeouts")
105     public void testIOWithClosedSharedSession(Supplier<Arena> arenaSupplier, int timeout)
106         throws Exception
107     {
108         try (var channel = AsynchronousSocketChannel.open();
109              var server = AsynchronousServerSocketChannel.open();
110              var connectedChannel = connectChannels(server, channel)) {
111             Arena drop = arenaSupplier.get();
112             ByteBuffer bb = segmentBufferOfSize(drop, 64);
113             ByteBuffer[] buffers = segmentBuffersOfSize(8, drop, 32);
114             drop.close();
115             {
116                 assertCauses(expectThrows(EE, () -> connectedChannel.read(bb).get()), IOE, ISE);
117             }
118             {
119                 var handler = new TestHandler<Integer>();
120                 connectedChannel.read(bb, null, handler);
121                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
122             }
123             {
124                 var handler = new TestHandler<Integer>();
125                 connectedChannel.read(bb, timeout, SECONDS, null, handler);
126                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
127             }
128             {
129                 var handler = new TestHandler<Long>();
130                 connectedChannel.read(buffers, 0, buffers.length, timeout, SECONDS, null, handler);
131                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
132             }
133             {
134                 assertCauses(expectThrows(EE, () -> connectedChannel.write(bb).get()), IOE, ISE);
135             }
136             {
137                 var handler = new TestHandler<Integer>();
138                 connectedChannel.write(bb, null, handler);
139                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
140             }
141             {
142                 var handler = new TestHandler<Integer>();
143                 connectedChannel.write(bb, timeout, SECONDS, null, handler);
144                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
145             }
146             {
147                 var handler = new TestHandler<Long>();
148                 connectedChannel.write(buffers, 0, buffers.length, timeout, SECONDS, null, handler);
149                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
150             }
151         }
152     }
153 
154     /** Tests basic I/O operations work with views over implicit and shared sessions. */
155     @Test(dataProvider = "sharedArenas")
156     public void testBasicIOWithSupportedSession(Supplier<Arena> arenaSupplier)
157         throws Exception
158     {
159         Arena drop;
160         try (var asc1 = AsynchronousSocketChannel.open();
161              var assc = AsynchronousServerSocketChannel.open();
162              var asc2 = connectChannels(assc, asc1);
163              var scp = drop = arenaSupplier.get()) {
164             Arena scope1 = drop;
165             MemorySegment segment1 = scope1.allocate(10, 1);
166             Arena scope = drop;
167             MemorySegment segment2 = scope.allocate(10, 1);
168             for (int i = 0; i < 10; i++) {
169                 segment1.set(JAVA_BYTE, i, (byte) i);
170             }
171             {   // Future variants
172                 ByteBuffer bb1 = segment1.asByteBuffer();
173                 ByteBuffer bb2 = segment2.asByteBuffer();
174                 assertEquals((int)asc1.write(bb1).get(), 10);
175                 assertEquals((int)asc2.read(bb2).get(), 10);
176                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
177             }
178             {   // CompletionHandler variants
179                 ByteBuffer bb1 = segment1.asByteBuffer();
180                 ByteBuffer bb2 = segment2.asByteBuffer();
181                 var writeHandler = new TestHandler();
182                 asc1.write(new ByteBuffer[]{bb1}, 0, 1, 30L, SECONDS, null, writeHandler);
183                 writeHandler.await().assertCompleteWith(10L);
184                 var readHandler = new TestHandler();
185                 asc2.read(new ByteBuffer[]{bb2}, 0, 1, 30L, SECONDS, null, readHandler);
186                 readHandler.await().assertCompleteWith(10L);
187                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
188             }
189             {   // Gathering/Scattering variants
190                 var writeBuffers = mixedBuffersOfSize(16, drop, 32);
191                 var readBuffers = mixedBuffersOfSize(16, drop, 32);
192                 long expectedCount = remaining(writeBuffers);
193                 var writeHandler = new TestHandler();
194                 asc1.write(writeBuffers, 0, 16, 30L, SECONDS, null, writeHandler);
195                 writeHandler.await().assertCompleteWith(expectedCount);
196                 var readHandler = new TestHandler();
197                 asc2.read(readBuffers, 0, 16, 30L, SECONDS, null, readHandler);
198                 readHandler.await().assertCompleteWith(expectedCount);
199                 assertEquals(flip(readBuffers), clear(writeBuffers));
200             }
201         }
202     }
203 
204     /** Tests that a session is not closeable when there is an outstanding read operation. */
205     @Test(dataProvider = "sharedArenasAndTimeouts")
206     public void testCloseWithOutstandingRead(Supplier<Arena> arenaSupplier, int timeout)
207         throws Throwable
208     {
209         try (var asc1 = AsynchronousSocketChannel.open();
210              var assc = AsynchronousServerSocketChannel.open();
211              var asc2 = connectChannels(assc, asc1);
212              var drop = arenaSupplier.get()) {
213             Arena scope = drop;
214             var segment = scope.allocate(10, 1);
215             var bb = segment.asByteBuffer();
216             var bba = new ByteBuffer[] { bb };
217             List<ThrowingConsumer<TestHandler,?>> readOps = List.of(
218                     handler -> handler.propagateHandlerFromFuture(asc1.read(bb)),
219                     handler -> asc1.read(bb, null, handler),
220                     handler -> asc1.read(bb,  timeout, SECONDS, null, handler),
221                     handler -> asc1.read(bba, 0, bba.length, timeout, SECONDS, null, handler)
222             );
223             for (var ioOp : readOps) {
224                 out.println("testCloseWithOutstandingRead - op");
225                 var handler = new TestHandler<Long>();
226                 ioOp.accept(handler);
227                 assertFalse(handler.isDone());
228                 assertTrue(drop.scope().isAlive());
229 
230                 // write to allow the blocking read complete, which will
231                 // in turn unlock the session and allow it to be closed.
232                 asc2.write(ByteBuffer.wrap(new byte[] { 0x01 })).get();
233                 handler.await().assertCompleteWith(1L);
234                 assertTrue(drop.scope().isAlive());
235             }
236         }
237     }
238 
239     /** Tests that a session is not closeable when there is an outstanding write operation. */
240     // Note: limited scenarios are checked, given the 5 sec sleep!
241     @Test(dataProvider = "sharedArenasAndTimeouts")
242     public void testCloseWithOutstandingWrite(Supplier<Arena> arenaSupplier, int timeout)
243          throws Throwable
244     {
245         try (var asc1 = AsynchronousSocketChannel.open();
246              var assc = AsynchronousServerSocketChannel.open();
247              var asc2 = connectChannels(assc, asc1);
248              var drop = arenaSupplier.get()) {
249 
250             // number of bytes written
251             final AtomicLong bytesWritten = new AtomicLong(0);
252             // set to true to signal that no more buffers should be written
253             final AtomicBoolean continueWriting = new AtomicBoolean(true);
254             final AtomicInteger outstandingWriteOps = new AtomicInteger(0);
255 
256             // write until socket buffer is full so as to create the conditions
257             // for when a write does not complete immediately
258             var bba = segmentBuffersOfSize(32, drop, 128);
259             TestHandler<Long> handler;
260             outstandingWriteOps.getAndIncrement();
261             asc1.write(bba, 0, bba.length, timeout, SECONDS, null,
262                     (handler = new TestHandler<>() {
263                         public void completed(Long result, Void att) {
264                             super.completed(result, att);
265                             bytesWritten.addAndGet(result);
266                             if (continueWriting.get()) {
267                                 var bba = segmentBuffersOfSize(32, drop, 128);
268                                 outstandingWriteOps.getAndIncrement();
269                                 asc1.write(bba, 0, bba.length, timeout, SECONDS, null, this);
270                             }
271                             outstandingWriteOps.getAndDecrement();
272                         }
273                     }));
274             // give time for socket buffer to fill up.
275             awaitNoFurtherWrites(bytesWritten);
276 
277             assertTrue(drop.scope().isAlive());
278 
279             // signal handler to stop further writing
280             continueWriting.set(false);
281 
282             // read to allow the outstanding write complete, which will
283             // in turn unlock the session and allow it to be closed.
284             readNBytes(asc2, bytesWritten.get());
285             assertTrue(drop.scope().isAlive());
286             awaitOutstandingWrites(outstandingWriteOps);
287             handler.await();
288         }
289     }
290 
291     /** Waits for outstandingWriteOps to complete (become 0). */
292     static void awaitOutstandingWrites(AtomicInteger outstandingWriteOps) {
293         boolean initial = true;
294         while (outstandingWriteOps.get() > 0 )  {
295             if (initial) {
296                 out.print("awaiting outstanding writes");
297                 initial = false;
298             }
299             out.print(".");
300             Thread.onSpinWait();
301         }
302         out.println("outstanding writes: " + outstandingWriteOps.get());
303     }
304 
305     /** Waits, at most 20secs, for bytesWritten to stabilize. */
306     static void awaitNoFurtherWrites(AtomicLong bytesWritten) throws Exception {
307         int i;
308         long prevN = 0;
309         for (i=0; i<10; i++) {
310             long n = bytesWritten.get();
311             Thread.sleep(2 * 1000);
312             if (bytesWritten.get() == n && prevN == n) {
313                 break;
314             }
315             prevN = n;
316         }
317         out.println("awaitNoFurtherWrites: i=" + i +" , bytesWritten=" + bytesWritten.get());
318     }
319 
320     /** Completion handler that exposes conveniences to assert results. */
321     static class TestHandler<V extends Number> implements CompletionHandler<V, Void> {
322         volatile V result;
323         volatile Throwable throwable;
324         final CountDownLatch latch = new CountDownLatch(1);
325 
326         /** Starts a thread that complete the handled with the Future result. */
327         TestHandler propagateHandlerFromFuture(Future<Integer> future) {
328             Runnable runnable = () -> {
329                 try {
330                     this.completed((V)future.get(), null);
331                 } catch (Throwable t) {
332                     // assert and unwrap exception added by Future
333                     assertTrue(ExecutionException.class.isInstance(t));
334                     t = t.getCause();
335                     assertTrue(IOException.class.isInstance(t));
336                     t = t.getCause();
337                     this.failed(t, null);
338                 }
339             };
340             Thread t = new Thread(runnable);
341             t.start();
342             return this;
343         }
344 
345         @Override
346         public void completed(V result, Void att) {
347             assert result.longValue() >= 0;
348             this.result = result;
349             latch.countDown();
350         }
351         @Override
352         public void failed(Throwable exc, Void att){
353             this.throwable = tolerateIOEOnWindows(exc);
354             latch.countDown();
355         }
356 
357         TestHandler await() throws InterruptedException{
358             latch.await();
359             return this;
360         }
361 
362         TestHandler assertCompleteWith(V value) {
363             assertEquals(result.longValue(), value.longValue());
364             assertEquals(throwable, null);
365             return this;
366         }
367 
368         TestHandler assertFailedWith(Class<? extends Exception> expectedException) {
369             assertTrue(expectedException.isInstance(throwable),
370                        "Expected type:%s, got:%s".formatted(expectedException, throwable) );
371             assertEquals(result, null, "Unexpected result: " + result);
372             return this;
373         }
374 
375         TestHandler assertExceptionMessage(String expectedMessage) {
376             assertEquals(throwable.getMessage(), expectedMessage);
377             return this;
378         }
379 
380         boolean isDone() {
381             return latch.getCount() == 0;
382         }
383     }
384 
385     static AsynchronousSocketChannel connectChannels(AsynchronousServerSocketChannel assc,
386                                                      AsynchronousSocketChannel asc)
387         throws Exception
388     {
389         setBufferSized(assc, asc);
390         assc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
391         asc.connect(assc.getLocalAddress()).get();
392         return assc.accept().get();
393     }
394 
395     /** Sets the send/receive buffer sizes in an attempt/hint to limit the
396      * accepted/connected socket buffer sizes. Actual buffer sizes in use will
397      * likely be larger due to TCP auto-tuning, but the hint typically reduces
398      * the overall scaled sizes. This is primarily to stabilize outstanding
399      * write operations.
400      */
401     static void setBufferSized(AsynchronousServerSocketChannel assc,
402                                AsynchronousSocketChannel asc)
403         throws Exception
404     {
405         assc.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
406         asc.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024);
407         asc.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
408     }
409 
410     /** Tolerate the additional level of IOException wrapping of unchecked exceptions
411      * On Windows, when completing the completion handler with a failure. */
412     static Throwable tolerateIOEOnWindows(Throwable t) {
413         if (System.getProperty("os.name").startsWith("Windows")) {
414             if (t instanceof IOException)
415                 return t.getCause();
416         }
417         return t;
418     }
419 
420     static void readNBytes(AsynchronousSocketChannel channel, long len)
421         throws Exception
422     {
423         var buf = ByteBuffer.allocateDirect(4096);
424         long total = 0L;
425         do {
426             int n = channel.read(buf).get();
427             assertTrue(n > 0, "got:" + n);
428             buf.clear();
429             total += n;
430         } while (total < len);
431     }
432 }