1 /*
  2  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @library /test/lib
 27  * @modules java.base/sun.nio.ch
 28  * @key randomness
 29  * @run testng/othervm TestAsyncSocketChannels
 30  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=true TestAsyncSocketChannels
 31  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=false TestAsyncSocketChannels
 32  */
 33 
 34 import java.io.IOException;
 35 import java.lang.foreign.Arena;
 36 import java.lang.foreign.MemorySegment;
 37 import java.net.InetAddress;
 38 import java.net.InetSocketAddress;
 39 import java.net.StandardSocketOptions;
 40 import java.nio.ByteBuffer;
 41 import java.nio.channels.AsynchronousServerSocketChannel;
 42 import java.nio.channels.AsynchronousSocketChannel;
 43 import java.nio.channels.CompletionHandler;
 44 import java.util.List;
 45 import java.util.concurrent.CountDownLatch;
 46 import java.util.concurrent.ExecutionException;
 47 import java.util.concurrent.Future;
 48 import java.util.concurrent.atomic.AtomicBoolean;
 49 import java.util.concurrent.atomic.AtomicInteger;
 50 import java.util.concurrent.atomic.AtomicLong;
 51 import java.util.function.Supplier;
 52 
 53 import org.testng.annotations.*;
 54 import static java.lang.System.out;
 55 import static java.util.concurrent.TimeUnit.SECONDS;
 56 import static java.lang.foreign.ValueLayout.JAVA_BYTE;
 57 import static org.testng.Assert.*;
 58 
 59 /**
 60  * Tests consisting of buffer views with asynchronous NIO network channels.
 61  */
 62 public class TestAsyncSocketChannels extends AbstractChannelsTest {
 63 
 64     static final Class<IOException> IOE = IOException.class;
 65     static final Class<ExecutionException> EE = ExecutionException.class;
 66     static final Class<IllegalStateException> ISE = IllegalStateException.class;
 67 
 68     /** Tests that confined sessions are not supported. */
 69     @Test(dataProvider = "confinedArenas")
 70     public void testWithConfined(Supplier<Arena> arenaSupplier)
 71         throws Throwable
 72     {
 73         try (var channel = AsynchronousSocketChannel.open();
 74              var server = AsynchronousServerSocketChannel.open();
 75              var connectedChannel = connectChannels(server, channel);
 76              var drop = arenaSupplier.get()) {
 77             Arena scope = drop;
 78             var segment = scope.allocate(10, 1);
 79             var bb = segment.asByteBuffer();
 80             var bba = new ByteBuffer[] { bb };
 81             List<ThrowingConsumer<TestHandler,?>> ioOps = List.of(
 82                     handler -> handler.propagateHandlerFromFuture(channel.write(bb)),
 83                     handler -> handler.propagateHandlerFromFuture(channel.read(bb)),
 84                     handler -> channel.write(bb, null, handler),
 85                     handler -> channel.read( bb, null, handler),
 86                     handler -> channel.write(bb , 0L, SECONDS, null, handler),
 87                     handler -> channel.read( bb,  0L, SECONDS, null, handler),
 88                     handler -> channel.write(bba, 0, bba.length, 0L, SECONDS, null, handler),
 89                     handler -> channel.read( bba, 0, bba.length, 0L, SECONDS, null, handler)
 90             );
 91             for (var ioOp : ioOps) {
 92                 out.println("testAsyncWithConfined - op");
 93                 var handler = new TestHandler();
 94                 ioOp.accept(handler);
 95                 handler.await()
 96                         .assertFailedWith(ISE)
 97                         .assertExceptionMessage("Confined session not supported");
 98             }
 99         }
100     }
101 
102     /** Tests that I/O with a closed session throws a suitable exception. */
103     @Test(dataProvider = "sharedArenasAndTimeouts")
104     public void testIOWithClosedSharedSession(Supplier<Arena> arenaSupplier, int timeout)
105         throws Exception
106     {
107         try (var channel = AsynchronousSocketChannel.open();
108              var server = AsynchronousServerSocketChannel.open();
109              var connectedChannel = connectChannels(server, channel)) {
110             Arena drop = arenaSupplier.get();
111             ByteBuffer bb = segmentBufferOfSize(drop, 64);
112             ByteBuffer[] buffers = segmentBuffersOfSize(8, drop, 32);
113             drop.close();
114             {
115                 assertCauses(expectThrows(EE, () -> connectedChannel.read(bb).get()), IOE, ISE);
116             }
117             {
118                 var handler = new TestHandler<Integer>();
119                 connectedChannel.read(bb, null, handler);
120                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
121             }
122             {
123                 var handler = new TestHandler<Integer>();
124                 connectedChannel.read(bb, timeout, SECONDS, null, handler);
125                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
126             }
127             {
128                 var handler = new TestHandler<Long>();
129                 connectedChannel.read(buffers, 0, buffers.length, timeout, SECONDS, null, handler);
130                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
131             }
132             {
133                 assertCauses(expectThrows(EE, () -> connectedChannel.write(bb).get()), IOE, ISE);
134             }
135             {
136                 var handler = new TestHandler<Integer>();
137                 connectedChannel.write(bb, null, handler);
138                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
139             }
140             {
141                 var handler = new TestHandler<Integer>();
142                 connectedChannel.write(bb, timeout, SECONDS, null, handler);
143                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
144             }
145             {
146                 var handler = new TestHandler<Long>();
147                 connectedChannel.write(buffers, 0, buffers.length, timeout, SECONDS, null, handler);
148                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
149             }
150         }
151     }
152 
153     /** Tests basic I/O operations work with views over implicit and shared sessions. */
154     @Test(dataProvider = "sharedArenas")
155     public void testBasicIOWithSupportedSession(Supplier<Arena> arenaSupplier)
156         throws Exception
157     {
158         Arena drop;
159         try (var asc1 = AsynchronousSocketChannel.open();
160              var assc = AsynchronousServerSocketChannel.open();
161              var asc2 = connectChannels(assc, asc1);
162              var scp = drop = arenaSupplier.get()) {
163             Arena scope1 = drop;
164             MemorySegment segment1 = scope1.allocate(10, 1);
165             Arena scope = drop;
166             MemorySegment segment2 = scope.allocate(10, 1);
167             for (int i = 0; i < 10; i++) {
168                 segment1.set(JAVA_BYTE, i, (byte) i);
169             }
170             {   // Future variants
171                 ByteBuffer bb1 = segment1.asByteBuffer();
172                 ByteBuffer bb2 = segment2.asByteBuffer();
173                 assertEquals((int)asc1.write(bb1).get(), 10);
174                 assertEquals((int)asc2.read(bb2).get(), 10);
175                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
176             }
177             {   // CompletionHandler variants
178                 ByteBuffer bb1 = segment1.asByteBuffer();
179                 ByteBuffer bb2 = segment2.asByteBuffer();
180                 var writeHandler = new TestHandler();
181                 asc1.write(new ByteBuffer[]{bb1}, 0, 1, 30L, SECONDS, null, writeHandler);
182                 writeHandler.await().assertCompleteWith(10L);
183                 var readHandler = new TestHandler();
184                 asc2.read(new ByteBuffer[]{bb2}, 0, 1, 30L, SECONDS, null, readHandler);
185                 readHandler.await().assertCompleteWith(10L);
186                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
187             }
188             {   // Gathering/Scattering variants
189                 var writeBuffers = mixedBuffersOfSize(16, drop, 32);
190                 var readBuffers = mixedBuffersOfSize(16, drop, 32);
191                 long expectedCount = remaining(writeBuffers);
192                 var writeHandler = new TestHandler();
193                 asc1.write(writeBuffers, 0, 16, 30L, SECONDS, null, writeHandler);
194                 writeHandler.await().assertCompleteWith(expectedCount);
195                 var readHandler = new TestHandler();
196                 asc2.read(readBuffers, 0, 16, 30L, SECONDS, null, readHandler);
197                 readHandler.await().assertCompleteWith(expectedCount);
198                 assertEquals(flip(readBuffers), clear(writeBuffers));
199             }
200         }
201     }
202 
203     /** Tests that a session is not closeable when there is an outstanding read operation. */
204     @Test(dataProvider = "sharedArenasAndTimeouts")
205     public void testCloseWithOutstandingRead(Supplier<Arena> arenaSupplier, int timeout)
206         throws Throwable
207     {
208         try (var asc1 = AsynchronousSocketChannel.open();
209              var assc = AsynchronousServerSocketChannel.open();
210              var asc2 = connectChannels(assc, asc1);
211              var drop = arenaSupplier.get()) {
212             Arena scope = drop;
213             var segment = scope.allocate(10, 1);
214             var bb = segment.asByteBuffer();
215             var bba = new ByteBuffer[] { bb };
216             List<ThrowingConsumer<TestHandler,?>> readOps = List.of(
217                     handler -> handler.propagateHandlerFromFuture(asc1.read(bb)),
218                     handler -> asc1.read(bb, null, handler),
219                     handler -> asc1.read(bb,  timeout, SECONDS, null, handler),
220                     handler -> asc1.read(bba, 0, bba.length, timeout, SECONDS, null, handler)
221             );
222             for (var ioOp : readOps) {
223                 out.println("testCloseWithOutstandingRead - op");
224                 var handler = new TestHandler<Long>();
225                 ioOp.accept(handler);
226                 assertFalse(handler.isDone());
227                 assertTrue(drop.scope().isAlive());
228 
229                 // write to allow the blocking read complete, which will
230                 // in turn unlock the session and allow it to be closed.
231                 asc2.write(ByteBuffer.wrap(new byte[] { 0x01 })).get();
232                 handler.await().assertCompleteWith(1L);
233                 assertTrue(drop.scope().isAlive());
234             }
235         }
236     }
237 
238     /** Tests that a session is not closeable when there is an outstanding write operation. */
239     // Note: limited scenarios are checked, given the 5 sec sleep!
240     @Test(dataProvider = "sharedArenasAndTimeouts")
241     public void testCloseWithOutstandingWrite(Supplier<Arena> arenaSupplier, int timeout)
242          throws Throwable
243     {
244         try (var asc1 = AsynchronousSocketChannel.open();
245              var assc = AsynchronousServerSocketChannel.open();
246              var asc2 = connectChannels(assc, asc1);
247              var drop = arenaSupplier.get()) {
248 
249             // number of bytes written
250             final AtomicLong bytesWritten = new AtomicLong(0);
251             // set to true to signal that no more buffers should be written
252             final AtomicBoolean continueWriting = new AtomicBoolean(true);
253             final AtomicInteger outstandingWriteOps = new AtomicInteger(0);
254 
255             // write until socket buffer is full so as to create the conditions
256             // for when a write does not complete immediately
257             var bba = segmentBuffersOfSize(32, drop, 128);
258             TestHandler<Long> handler;
259             outstandingWriteOps.getAndIncrement();
260             asc1.write(bba, 0, bba.length, timeout, SECONDS, null,
261                     (handler = new TestHandler<>() {
262                         public void completed(Long result, Void att) {
263                             super.completed(result, att);
264                             bytesWritten.addAndGet(result);
265                             if (continueWriting.get()) {
266                                 var bba = segmentBuffersOfSize(32, drop, 128);
267                                 outstandingWriteOps.getAndIncrement();
268                                 asc1.write(bba, 0, bba.length, timeout, SECONDS, null, this);
269                             }
270                             outstandingWriteOps.getAndDecrement();
271                         }
272                     }));
273             // give time for socket buffer to fill up.
274             awaitNoFurtherWrites(bytesWritten);
275 
276             assertTrue(drop.scope().isAlive());
277 
278             // signal handler to stop further writing
279             continueWriting.set(false);
280 
281             // read to allow the outstanding write complete, which will
282             // in turn unlock the session and allow it to be closed.
283             readNBytes(asc2, bytesWritten.get());
284             assertTrue(drop.scope().isAlive());
285             awaitOutstandingWrites(outstandingWriteOps);
286             handler.await();
287         }
288     }
289 
290     /** Waits for outstandingWriteOps to complete (become 0). */
291     static void awaitOutstandingWrites(AtomicInteger outstandingWriteOps) {
292         boolean initial = true;
293         while (outstandingWriteOps.get() > 0 )  {
294             if (initial) {
295                 out.print("awaiting outstanding writes");
296                 initial = false;
297             }
298             out.print(".");
299             Thread.onSpinWait();
300         }
301         out.println("outstanding writes: " + outstandingWriteOps.get());
302     }
303 
304     /** Waits, at most 20secs, for bytesWritten to stabilize. */
305     static void awaitNoFurtherWrites(AtomicLong bytesWritten) throws Exception {
306         int i;
307         long prevN = 0;
308         for (i=0; i<10; i++) {
309             long n = bytesWritten.get();
310             Thread.sleep(2 * 1000);
311             if (bytesWritten.get() == n && prevN == n) {
312                 break;
313             }
314             prevN = n;
315         }
316         out.println("awaitNoFurtherWrites: i=" + i +" , bytesWritten=" + bytesWritten.get());
317     }
318 
319     /** Completion handler that exposes conveniences to assert results. */
320     static class TestHandler<V extends Number> implements CompletionHandler<V, Void> {
321         volatile V result;
322         volatile Throwable throwable;
323         final CountDownLatch latch = new CountDownLatch(1);
324 
325         /** Starts a thread that complete the handled with the Future result. */
326         TestHandler propagateHandlerFromFuture(Future<Integer> future) {
327             Runnable runnable = () -> {
328                 try {
329                     this.completed((V)future.get(), null);
330                 } catch (Throwable t) {
331                     // assert and unwrap exception added by Future
332                     assertTrue(ExecutionException.class.isInstance(t));
333                     t = t.getCause();
334                     assertTrue(IOException.class.isInstance(t));
335                     t = t.getCause();
336                     this.failed(t, null);
337                 }
338             };
339             Thread t = new Thread(runnable);
340             t.start();
341             return this;
342         }
343 
344         @Override
345         public void completed(V result, Void att) {
346             assert result.longValue() >= 0;
347             this.result = result;
348             latch.countDown();
349         }
350         @Override
351         public void failed(Throwable exc, Void att){
352             this.throwable = tolerateIOEOnWindows(exc);
353             latch.countDown();
354         }
355 
356         TestHandler await() throws InterruptedException{
357             latch.await();
358             return this;
359         }
360 
361         TestHandler assertCompleteWith(V value) {
362             assertEquals(result.longValue(), value.longValue());
363             assertEquals(throwable, null);
364             return this;
365         }
366 
367         TestHandler assertFailedWith(Class<? extends Exception> expectedException) {
368             assertTrue(expectedException.isInstance(throwable),
369                        "Expected type:%s, got:%s".formatted(expectedException, throwable) );
370             assertEquals(result, null, "Unexpected result: " + result);
371             return this;
372         }
373 
374         TestHandler assertExceptionMessage(String expectedMessage) {
375             assertEquals(throwable.getMessage(), expectedMessage);
376             return this;
377         }
378 
379         boolean isDone() {
380             return latch.getCount() == 0;
381         }
382     }
383 
384     static AsynchronousSocketChannel connectChannels(AsynchronousServerSocketChannel assc,
385                                                      AsynchronousSocketChannel asc)
386         throws Exception
387     {
388         setBufferSized(assc, asc);
389         assc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
390         asc.connect(assc.getLocalAddress()).get();
391         return assc.accept().get();
392     }
393 
394     /** Sets the send/receive buffer sizes in an attempt/hint to limit the
395      * accepted/connected socket buffer sizes. Actual buffer sizes in use will
396      * likely be larger due to TCP auto-tuning, but the hint typically reduces
397      * the overall scaled sizes. This is primarily to stabilize outstanding
398      * write operations.
399      */
400     static void setBufferSized(AsynchronousServerSocketChannel assc,
401                                AsynchronousSocketChannel asc)
402         throws Exception
403     {
404         assc.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
405         asc.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024);
406         asc.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
407     }
408 
409     /** Tolerate the additional level of IOException wrapping of unchecked exceptions
410      * On Windows, when completing the completion handler with a failure. */
411     static Throwable tolerateIOEOnWindows(Throwable t) {
412         if (System.getProperty("os.name").startsWith("Windows")) {
413             if (t instanceof IOException)
414                 return t.getCause();
415         }
416         return t;
417     }
418 
419     static void readNBytes(AsynchronousSocketChannel channel, long len)
420         throws Exception
421     {
422         var buf = ByteBuffer.allocateDirect(4096);
423         long total = 0L;
424         do {
425             int n = channel.read(buf).get();
426             assertTrue(n > 0, "got:" + n);
427             buf.clear();
428             total += n;
429         } while (total < len);
430     }
431 }