1 /*
  2  * Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @library /test/lib
 27  * @modules java.base/sun.nio.ch
 28  *          jdk.incubator.foreign/jdk.internal.foreign
 29  * @key randomness
 30  * @run testng/othervm TestAsyncSocketChannels
 31  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=true TestAsyncSocketChannels
 32  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=false TestAsyncSocketChannels
 33  */
 34 
 35 import java.io.IOException;
 36 import java.net.InetAddress;
 37 import java.net.InetSocketAddress;
 38 import java.net.StandardSocketOptions;
 39 import java.nio.ByteBuffer;
 40 import java.nio.channels.AsynchronousServerSocketChannel;
 41 import java.nio.channels.AsynchronousSocketChannel;
 42 import java.nio.channels.CompletionHandler;
 43 import java.util.List;
 44 import java.util.concurrent.CountDownLatch;
 45 import java.util.concurrent.ExecutionException;
 46 import java.util.concurrent.Future;
 47 import java.util.concurrent.atomic.AtomicBoolean;
 48 import java.util.concurrent.atomic.AtomicInteger;
 49 import java.util.concurrent.atomic.AtomicLong;
 50 import java.util.function.Supplier;
 51 import jdk.incubator.foreign.MemoryAccess;
 52 import jdk.incubator.foreign.MemorySegment;
 53 import jdk.incubator.foreign.ResourceScope;
 54 import org.testng.annotations.*;
 55 import static java.lang.System.out;
 56 import static java.util.concurrent.TimeUnit.SECONDS;
 57 import static org.testng.Assert.*;
 58 
 59 /**
 60  * Tests consisting of buffer views with asynchronous NIO network channels.
 61  */
 62 public class TestAsyncSocketChannels extends AbstractChannelsTest {
 63 
 64     static final Class<IOException> IOE = IOException.class;
 65     static final Class<ExecutionException> EE = ExecutionException.class;
 66     static final Class<IllegalStateException> ISE = IllegalStateException.class;
 67 
 68     /** Tests that confined scopes are not supported. */
 69     @Test(dataProvider = "confinedScopes")
 70     public void testWithConfined(Supplier<ResourceScope> scopeSupplier)
 71         throws Throwable
 72     {
 73         try (var channel = AsynchronousSocketChannel.open();
 74              var server = AsynchronousServerSocketChannel.open();
 75              var connectedChannel = connectChannels(server, channel);
 76              var scope = scopeSupplier.get()) {
 77             var segment = MemorySegment.allocateNative(10, 1, scope);
 78             var bb = segment.asByteBuffer();
 79             var bba = new ByteBuffer[] { bb };
 80             List<ThrowingConsumer<TestHandler,?>> ioOps = List.of(
 81                     handler -> handler.propagateHandlerFromFuture(channel.write(bb)),
 82                     handler -> handler.propagateHandlerFromFuture(channel.read(bb)),
 83                     handler -> channel.write(bb, null, handler),
 84                     handler -> channel.read( bb, null, handler),
 85                     handler -> channel.write(bb , 0L, SECONDS, null, handler),
 86                     handler -> channel.read( bb,  0L, SECONDS, null, handler),
 87                     handler -> channel.write(bba, 0, bba.length, 0L, SECONDS, null, handler),
 88                     handler -> channel.read( bba, 0, bba.length, 0L, SECONDS, null, handler)
 89             );
 90             for (var ioOp : ioOps) {
 91                 out.println("testAsyncWithConfined - op");
 92                 var handler = new TestHandler();
 93                 ioOp.accept(handler);
 94                 handler.await()
 95                         .assertFailedWith(ISE)
 96                         .assertExceptionMessage("Confined scope not supported");
 97             }
 98         }
 99     }
100 
101     /** Tests that I/O with a closed scope throws a suitable exception. */
102     @Test(dataProvider = "sharedScopesAndTimeouts")
103     public void testIOWithClosedSharedScope(Supplier<ResourceScope> scopeSupplier, int timeout)
104         throws Exception
105     {
106         try (var channel = AsynchronousSocketChannel.open();
107              var server = AsynchronousServerSocketChannel.open();
108              var connectedChannel = connectChannels(server, channel)) {
109             ResourceScope scope = scopeSupplier.get();
110             ByteBuffer bb = segmentBufferOfSize(scope, 64);
111             ByteBuffer[] buffers = segmentBuffersOfSize(8, scope, 32);
112             scope.close();
113             {
114                 assertCauses(expectThrows(EE, () -> connectedChannel.read(bb).get()), IOE, ISE);
115             }
116             {
117                 var handler = new TestHandler<Integer>();
118                 connectedChannel.read(bb, null, handler);
119                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
120             }
121             {
122                 var handler = new TestHandler<Integer>();
123                 connectedChannel.read(bb, timeout, SECONDS, null, handler);
124                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
125             }
126             {
127                 var handler = new TestHandler<Long>();
128                 connectedChannel.read(buffers, 0, buffers.length, timeout, SECONDS, null, handler);
129                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
130             }
131             {
132                 assertCauses(expectThrows(EE, () -> connectedChannel.write(bb).get()), IOE, ISE);
133             }
134             {
135                 var handler = new TestHandler<Integer>();
136                 connectedChannel.write(bb, null, handler);
137                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
138             }
139             {
140                 var handler = new TestHandler<Integer>();
141                 connectedChannel.write(bb, timeout, SECONDS, null, handler);
142                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
143             }
144             {
145                 var handler = new TestHandler<Long>();
146                 connectedChannel.write(buffers, 0, buffers.length, timeout, SECONDS, null, handler);
147                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
148             }
149         }
150     }
151 
152     /** Tests basic I/O operations work with views over implicit and shared scopes. */
153     @Test(dataProvider = "sharedAndImplicitScopes")
154     public void testBasicIOWithSupportedScope(Supplier<ResourceScope> scopeSupplier)
155         throws Exception
156     {
157         ResourceScope scope;
158         try (var asc1 = AsynchronousSocketChannel.open();
159              var assc = AsynchronousServerSocketChannel.open();
160              var asc2 = connectChannels(assc, asc1);
161              var scp = closeableScopeOrNull(scope = scopeSupplier.get())) {
162             MemorySegment segment1 = MemorySegment.allocateNative(10, 1, scope);
163             MemorySegment segment2 = MemorySegment.allocateNative(10, 1, scope);
164             for (int i = 0; i < 10; i++) {
165                 MemoryAccess.setByteAtOffset(segment1, i, (byte) i);
166             }
167             {   // Future variants
168                 ByteBuffer bb1 = segment1.asByteBuffer();
169                 ByteBuffer bb2 = segment2.asByteBuffer();
170                 assertEquals((int)asc1.write(bb1).get(), 10);
171                 assertEquals((int)asc2.read(bb2).get(), 10);
172                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
173             }
174             {   // CompletionHandler variants
175                 ByteBuffer bb1 = segment1.asByteBuffer();
176                 ByteBuffer bb2 = segment2.asByteBuffer();
177                 var writeHandler = new TestHandler();
178                 asc1.write(new ByteBuffer[]{bb1}, 0, 1, 30L, SECONDS, null, writeHandler);
179                 writeHandler.await().assertCompleteWith(10L);
180                 var readHandler = new TestHandler();
181                 asc2.read(new ByteBuffer[]{bb2}, 0, 1, 30L, SECONDS, null, readHandler);
182                 readHandler.await().assertCompleteWith(10L);
183                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
184             }
185             {   // Gathering/Scattering variants
186                 var writeBuffers = mixedBuffersOfSize(16, scope, 32);
187                 var readBuffers = mixedBuffersOfSize(16, scope, 32);
188                 long expectedCount = remaining(writeBuffers);
189                 var writeHandler = new TestHandler();
190                 asc1.write(writeBuffers, 0, 16, 30L, SECONDS, null, writeHandler);
191                 writeHandler.await().assertCompleteWith(expectedCount);
192                 var readHandler = new TestHandler();
193                 asc2.read(readBuffers, 0, 16, 30L, SECONDS, null, readHandler);
194                 readHandler.await().assertCompleteWith(expectedCount);
195                 assertEquals(flip(readBuffers), clear(writeBuffers));
196             }
197         }
198     }
199 
200     /** Tests that a scope is not closeable when there is an outstanding read operation. */
201     @Test(dataProvider = "sharedScopesAndTimeouts")
202     public void testCloseWithOutstandingRead(Supplier<ResourceScope> scopeSupplier, int timeout)
203         throws Throwable
204     {
205         try (var asc1 = AsynchronousSocketChannel.open();
206              var assc = AsynchronousServerSocketChannel.open();
207              var asc2 = connectChannels(assc, asc1);
208              var scope = scopeSupplier.get()) {
209             var segment = MemorySegment.allocateNative(10, 1, scope);
210             var bb = segment.asByteBuffer();
211             var bba = new ByteBuffer[] { bb };
212             List<ThrowingConsumer<TestHandler,?>> readOps = List.of(
213                     handler -> handler.propagateHandlerFromFuture(asc1.read(bb)),
214                     handler -> asc1.read(bb, null, handler),
215                     handler -> asc1.read(bb,  timeout, SECONDS, null, handler),
216                     handler -> asc1.read(bba, 0, bba.length, timeout, SECONDS, null, handler)
217             );
218             for (var ioOp : readOps) {
219                 out.println("testCloseWithOutstandingRead - op");
220                 var handler = new TestHandler<Long>();
221                 ioOp.accept(handler);
222                 assertFalse(handler.isDone());
223                 assertTrue(scope.isAlive());
224                 assertMessage(expectThrows(ISE, () -> scope.close()), "Scope is acquired by");
225 
226                 // write to allow the blocking read complete, which will
227                 // in turn unlock the scope and allow it to be closed.
228                 asc2.write(ByteBuffer.wrap(new byte[] { 0x01 })).get();
229                 handler.await().assertCompleteWith(1L);
230                 assertTrue(scope.isAlive());
231             }
232         }
233     }
234 
235     /** Tests that a scope is not closeable when there is an outstanding write operation. */
236     // Note: limited scenarios are checked, given the 5 sec sleep!
237     @Test(dataProvider = "sharedScopesAndTimeouts")
238     public void testCloseWithOutstandingWrite(Supplier<ResourceScope> scopeSupplier, int timeout)
239          throws Throwable
240     {
241         try (var asc1 = AsynchronousSocketChannel.open();
242              var assc = AsynchronousServerSocketChannel.open();
243              var asc2 = connectChannels(assc, asc1);
244              var scope = scopeSupplier.get()) {
245 
246             // number of bytes written
247             final AtomicLong bytesWritten = new AtomicLong(0);
248             // set to true to signal that no more buffers should be written
249             final AtomicBoolean continueWriting = new AtomicBoolean(true);
250             final AtomicInteger outstandingWriteOps = new AtomicInteger(0);
251 
252             // write until socket buffer is full so as to create the conditions
253             // for when a write does not complete immediately
254             var bba = segmentBuffersOfSize(32, scope, 128);
255             TestHandler<Long> handler;
256             outstandingWriteOps.getAndIncrement();
257             asc1.write(bba, 0, bba.length, timeout, SECONDS, null,
258                     (handler = new TestHandler<>() {
259                         public void completed(Long result, Void att) {
260                             super.completed(result, att);
261                             bytesWritten.addAndGet(result);
262                             if (continueWriting.get()) {
263                                 var bba = segmentBuffersOfSize(32, scope, 128);
264                                 outstandingWriteOps.getAndIncrement();
265                                 asc1.write(bba, 0, bba.length, timeout, SECONDS, null, this);
266                             }
267                             outstandingWriteOps.getAndDecrement();
268                         }
269                     }));
270             // give time for socket buffer to fill up.
271             awaitNoFurtherWrites(bytesWritten);
272 
273             assertMessage(expectThrows(ISE, () -> scope.close()), "Scope is acquired by");
274             assertTrue(scope.isAlive());
275 
276             // signal handler to stop further writing
277             continueWriting.set(false);
278 
279             // read to allow the outstanding write complete, which will
280             // in turn unlock the scope and allow it to be closed.
281             readNBytes(asc2, bytesWritten.get());
282             assertTrue(scope.isAlive());
283             awaitOutstandingWrites(outstandingWriteOps);
284             handler.await();
285         }
286     }
287 
288     /** Waits for outstandingWriteOps to complete (become 0). */
289     static void awaitOutstandingWrites(AtomicInteger outstandingWriteOps) {
290         boolean initial = true;
291         while (outstandingWriteOps.get() > 0 )  {
292             if (initial) {
293                 out.print("awaiting outstanding writes");
294                 initial = false;
295             }
296             out.print(".");
297             Thread.onSpinWait();
298         }
299         out.println("outstanding writes: " + outstandingWriteOps.get());
300     }
301 
302     /** Waits, at most 20secs, for bytesWritten to stabilize. */
303     static void awaitNoFurtherWrites(AtomicLong bytesWritten) throws Exception {
304         int i;
305         long prevN = 0;
306         for (i=0; i<10; i++) {
307             long n = bytesWritten.get();
308             Thread.sleep(2 * 1000);
309             if (bytesWritten.get() == n && prevN == n) {
310                 break;
311             }
312             prevN = n;
313         }
314         out.println("awaitNoFurtherWrites: i=" + i +" , bytesWritten=" + bytesWritten.get());
315     }
316 
317     /** Completion handler that exposes conveniences to assert results. */
318     static class TestHandler<V extends Number> implements CompletionHandler<V, Void> {
319         volatile V result;
320         volatile Throwable throwable;
321         final CountDownLatch latch = new CountDownLatch(1);
322 
323         /** Starts a thread that complete the handled with the Future result. */
324         TestHandler propagateHandlerFromFuture(Future<Integer> future) {
325             Runnable runnable = () -> {
326                 try {
327                     this.completed((V)future.get(), null);
328                 } catch (Throwable t) {
329                     // assert and unwrap exception added by Future
330                     assertTrue(ExecutionException.class.isInstance(t));
331                     t = t.getCause();
332                     assertTrue(IOException.class.isInstance(t));
333                     t = t.getCause();
334                     this.failed(t, null);
335                 }
336             };
337             Thread t = new Thread(runnable);
338             t.start();
339             return this;
340         }
341 
342         @Override
343         public void completed(V result, Void att) {
344             assert result.longValue() >= 0;
345             this.result = result;
346             latch.countDown();
347         }
348         @Override
349         public void failed(Throwable exc, Void att){
350             this.throwable = tolerateIOEOnWindows(exc);
351             latch.countDown();
352         }
353 
354         TestHandler await() throws InterruptedException{
355             latch.await();
356             return this;
357         }
358 
359         TestHandler assertCompleteWith(V value) {
360             assertEquals(result.longValue(), value.longValue());
361             assertEquals(throwable, null);
362             return this;
363         }
364 
365         TestHandler assertFailedWith(Class<? extends Exception> expectedException) {
366             assertTrue(expectedException.isInstance(throwable),
367                        "Expected type:%s, got:%s".formatted(expectedException, throwable) );
368             assertEquals(result, null, "Unexpected result: " + result);
369             return this;
370         }
371 
372         TestHandler assertExceptionMessage(String expectedMessage) {
373             assertEquals(throwable.getMessage(), expectedMessage);
374             return this;
375         }
376 
377         boolean isDone() {
378             return latch.getCount() == 0;
379         }
380     }
381 
382     static AsynchronousSocketChannel connectChannels(AsynchronousServerSocketChannel assc,
383                                                      AsynchronousSocketChannel asc)
384         throws Exception
385     {
386         setBufferSized(assc, asc);
387         assc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
388         asc.connect(assc.getLocalAddress()).get();
389         return assc.accept().get();
390     }
391 
392     /** Sets the send/receive buffer sizes in an attempt/hint to limit the
393      * accepted/connected socket buffer sizes. Actual buffer sizes in use will
394      * likely be larger due to TCP auto-tuning, but the hint typically reduces
395      * the overall scaled sizes. This is primarily to stabilize outstanding
396      * write operations.
397      */
398     static void setBufferSized(AsynchronousServerSocketChannel assc,
399                                AsynchronousSocketChannel asc)
400         throws Exception
401     {
402         assc.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
403         asc.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024);
404         asc.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024);
405     }
406 
407     /** Tolerate the additional level of IOException wrapping of unchecked exceptions
408      * On Windows, when completing the completion handler with a failure. */
409     static Throwable tolerateIOEOnWindows(Throwable t) {
410         if (System.getProperty("os.name").startsWith("Windows")) {
411             if (t instanceof IOException)
412                 return t.getCause();
413         }
414         return t;
415     }
416 
417     static void readNBytes(AsynchronousSocketChannel channel, long len)
418         throws Exception
419     {
420         var buf = ByteBuffer.allocateDirect(4096);
421         long total = 0L;
422         do {
423             int n = channel.read(buf).get();
424             assertTrue(n > 0, "got:" + n);
425             buf.clear();
426             total += n;
427         } while (total < len);
428     }
429 }