< prev index next >

test/jdk/java/foreign/channels/TestAsyncSocketChannels.java

Print this page

 31  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=true TestAsyncSocketChannels
 32  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=false TestAsyncSocketChannels
 33  */
 34 
 35 import java.io.IOException;
 36 import java.net.InetAddress;
 37 import java.net.InetSocketAddress;
 38 import java.net.StandardSocketOptions;
 39 import java.nio.ByteBuffer;
 40 import java.nio.channels.AsynchronousServerSocketChannel;
 41 import java.nio.channels.AsynchronousSocketChannel;
 42 import java.nio.channels.CompletionHandler;
 43 import java.util.List;
 44 import java.util.concurrent.CountDownLatch;
 45 import java.util.concurrent.ExecutionException;
 46 import java.util.concurrent.Future;
 47 import java.util.concurrent.atomic.AtomicBoolean;
 48 import java.util.concurrent.atomic.AtomicInteger;
 49 import java.util.concurrent.atomic.AtomicLong;
 50 import java.util.function.Supplier;
 51 import jdk.incubator.foreign.MemoryAccess;
 52 import jdk.incubator.foreign.MemorySegment;
 53 import jdk.incubator.foreign.ResourceScope;
 54 import org.testng.annotations.*;
 55 import static java.lang.System.out;
 56 import static java.util.concurrent.TimeUnit.SECONDS;

 57 import static org.testng.Assert.*;
 58 
 59 /**
 60  * Tests that use buffer views with asynchronous NIO network channels.
 61  */
 62 public class TestAsyncSocketChannels extends AbstractChannelsTest {
 63 
 64     static final Class<IOException> IOE = IOException.class;
 65     static final Class<ExecutionException> EE = ExecutionException.class;
 66     static final Class<IllegalStateException> ISE = IllegalStateException.class;
 67 
 68     /** Tests that confined scopes are not supported. */
 69     @Test(dataProvider = "confinedScopes")
 70     public void testWithConfined(Supplier<ResourceScope> scopeSupplier)
 71         throws Throwable
 72     {
 73         try (var channel = AsynchronousSocketChannel.open();
 74              var server = AsynchronousServerSocketChannel.open();
 75              var connectedChannel = connectChannels(server, channel);
 76              var scope = scopeSupplier.get()) {

145                 var handler = new TestHandler<Long>();
146                 connectedChannel.write(buffers, 0, buffers.length, timeout, SECONDS, null, handler);
147                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
148             }
149         }
150     }
151 
152     /** Tests that basic I/O operations work with views over implicit and shared scopes. */
153     @Test(dataProvider = "sharedAndImplicitScopes")
154     public void testBasicIOWithSupportedScope(Supplier<ResourceScope> scopeSupplier)
155         throws Exception
156     {
157         ResourceScope scope;
158         try (var asc1 = AsynchronousSocketChannel.open();
159              var assc = AsynchronousServerSocketChannel.open();
160              var asc2 = connectChannels(assc, asc1);
161              var scp = closeableScopeOrNull(scope = scopeSupplier.get())) {
162             MemorySegment segment1 = MemorySegment.allocateNative(10, 1, scope);
163             MemorySegment segment2 = MemorySegment.allocateNative(10, 1, scope);
164             for (int i = 0; i < 10; i++) {
165                 MemoryAccess.setByteAtOffset(segment1, i, (byte) i);
166             }
167             {   // Future variants
168                 ByteBuffer bb1 = segment1.asByteBuffer();
169                 ByteBuffer bb2 = segment2.asByteBuffer();
170                 assertEquals((int)asc1.write(bb1).get(), 10);
171                 assertEquals((int)asc2.read(bb2).get(), 10);
172                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
173             }
174             {   // CompletionHandler variants
175                 ByteBuffer bb1 = segment1.asByteBuffer();
176                 ByteBuffer bb2 = segment2.asByteBuffer();
177                 var writeHandler = new TestHandler();
178                 asc1.write(new ByteBuffer[]{bb1}, 0, 1, 30L, SECONDS, null, writeHandler);
179                 writeHandler.await().assertCompleteWith(10L);
180                 var readHandler = new TestHandler();
181                 asc2.read(new ByteBuffer[]{bb2}, 0, 1, 30L, SECONDS, null, readHandler);
182                 readHandler.await().assertCompleteWith(10L);
183                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
184             }
185             {   // Gathering/Scattering variants

204     {
205         try (var asc1 = AsynchronousSocketChannel.open();
206              var assc = AsynchronousServerSocketChannel.open();
207              var asc2 = connectChannels(assc, asc1);
208              var scope = scopeSupplier.get()) {
209             var segment = MemorySegment.allocateNative(10, 1, scope);
210             var bb = segment.asByteBuffer();
211             var bba = new ByteBuffer[] { bb };
212             List<ThrowingConsumer<TestHandler,?>> readOps = List.of(
213                     handler -> handler.propagateHandlerFromFuture(asc1.read(bb)),
214                     handler -> asc1.read(bb, null, handler),
215                     handler -> asc1.read(bb,  timeout, SECONDS, null, handler),
216                     handler -> asc1.read(bba, 0, bba.length, timeout, SECONDS, null, handler)
217             );
218             for (var ioOp : readOps) {
219                 out.println("testCloseWithOutstandingRead - op");
220                 var handler = new TestHandler<Long>();
221                 ioOp.accept(handler);
222                 assertFalse(handler.isDone());
223                 assertTrue(scope.isAlive());
224                 assertMessage(expectThrows(ISE, () -> scope.close()), "Scope is acquired by");
225 
226                 // write to allow the blocking read to complete, which will
227                 // in turn unlock the scope and allow it to be closed.
228                 asc2.write(ByteBuffer.wrap(new byte[] { 0x01 })).get();
229                 handler.await().assertCompleteWith(1L);
230                 assertTrue(scope.isAlive());
231             }
232         }
233     }
234 
235     /** Tests that a scope is not closeable when there is an outstanding write operation. */
236     // Note: limited scenarios are checked, given the 5 sec sleep!
237     @Test(dataProvider = "sharedScopesAndTimeouts")
238     public void testCloseWithOutstandingWrite(Supplier<ResourceScope> scopeSupplier, int timeout)
239          throws Throwable
240     {
241         try (var asc1 = AsynchronousSocketChannel.open();
242              var assc = AsynchronousServerSocketChannel.open();
243              var asc2 = connectChannels(assc, asc1);
244              var scope = scopeSupplier.get()) {

253             // for when a write does not complete immediately
254             var bba = segmentBuffersOfSize(32, scope, 128);
255             TestHandler<Long> handler;
256             outstandingWriteOps.getAndIncrement();
257             asc1.write(bba, 0, bba.length, timeout, SECONDS, null,
258                     (handler = new TestHandler<>() {
259                         public void completed(Long result, Void att) {
260                             super.completed(result, att);
261                             bytesWritten.addAndGet(result);
262                             if (continueWriting.get()) {
263                                 var bba = segmentBuffersOfSize(32, scope, 128);
264                                 outstandingWriteOps.getAndIncrement();
265                                 asc1.write(bba, 0, bba.length, timeout, SECONDS, null, this);
266                             }
267                             outstandingWriteOps.getAndDecrement();
268                         }
269                     }));
270             // give time for socket buffer to fill up.
271             awaitNoFurtherWrites(bytesWritten);
272 
273             assertMessage(expectThrows(ISE, () -> scope.close()), "Scope is acquired by");
274             assertTrue(scope.isAlive());
275 
276             // signal handler to stop further writing
277             continueWriting.set(false);
278 
279             // read to allow the outstanding write to complete, which will
280             // in turn unlock the scope and allow it to be closed.
281             readNBytes(asc2, bytesWritten.get());
282             assertTrue(scope.isAlive());
283             awaitOutstandingWrites(outstandingWriteOps);
284             handler.await();
285         }
286     }
287 
288     /** Waits for outstandingWriteOps to complete (become 0). */
289     static void awaitOutstandingWrites(AtomicInteger outstandingWriteOps) {
290         boolean initial = true;
291         while (outstandingWriteOps.get() > 0 )  {
292             if (initial) {
293                 out.print("awaiting outstanding writes");

 31  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=true TestAsyncSocketChannels
 32  * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=false TestAsyncSocketChannels
 33  */
 34 
 35 import java.io.IOException;
 36 import java.net.InetAddress;
 37 import java.net.InetSocketAddress;
 38 import java.net.StandardSocketOptions;
 39 import java.nio.ByteBuffer;
 40 import java.nio.channels.AsynchronousServerSocketChannel;
 41 import java.nio.channels.AsynchronousSocketChannel;
 42 import java.nio.channels.CompletionHandler;
 43 import java.util.List;
 44 import java.util.concurrent.CountDownLatch;
 45 import java.util.concurrent.ExecutionException;
 46 import java.util.concurrent.Future;
 47 import java.util.concurrent.atomic.AtomicBoolean;
 48 import java.util.concurrent.atomic.AtomicInteger;
 49 import java.util.concurrent.atomic.AtomicLong;
 50 import java.util.function.Supplier;
 51 
 52 import jdk.incubator.foreign.MemorySegment;
 53 import jdk.incubator.foreign.ResourceScope;
 54 import org.testng.annotations.*;
 55 import static java.lang.System.out;
 56 import static java.util.concurrent.TimeUnit.SECONDS;
 57 import static jdk.incubator.foreign.ValueLayout.JAVA_BYTE;
 58 import static org.testng.Assert.*;
 59 
 60 /**
 61  * Tests that use buffer views with asynchronous NIO network channels.
 62  */
 63 public class TestAsyncSocketChannels extends AbstractChannelsTest {
 64 
 65     static final Class<IOException> IOE = IOException.class;
 66     static final Class<ExecutionException> EE = ExecutionException.class;
 67     static final Class<IllegalStateException> ISE = IllegalStateException.class;
 68 
 69     /** Tests that confined scopes are not supported. */
 70     @Test(dataProvider = "confinedScopes")
 71     public void testWithConfined(Supplier<ResourceScope> scopeSupplier)
 72         throws Throwable
 73     {
 74         try (var channel = AsynchronousSocketChannel.open();
 75              var server = AsynchronousServerSocketChannel.open();
 76              var connectedChannel = connectChannels(server, channel);
 77              var scope = scopeSupplier.get()) {

146                 var handler = new TestHandler<Long>();
147                 connectedChannel.write(buffers, 0, buffers.length, timeout, SECONDS, null, handler);
148                 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed");
149             }
150         }
151     }
152 
153     /** Tests that basic I/O operations work with views over implicit and shared scopes. */
154     @Test(dataProvider = "sharedAndImplicitScopes")
155     public void testBasicIOWithSupportedScope(Supplier<ResourceScope> scopeSupplier)
156         throws Exception
157     {
158         ResourceScope scope;
159         try (var asc1 = AsynchronousSocketChannel.open();
160              var assc = AsynchronousServerSocketChannel.open();
161              var asc2 = connectChannels(assc, asc1);
162              var scp = closeableScopeOrNull(scope = scopeSupplier.get())) {
163             MemorySegment segment1 = MemorySegment.allocateNative(10, 1, scope);
164             MemorySegment segment2 = MemorySegment.allocateNative(10, 1, scope);
165             for (int i = 0; i < 10; i++) {
166                 segment1.set(JAVA_BYTE, i, (byte) i);
167             }
168             {   // Future variants
169                 ByteBuffer bb1 = segment1.asByteBuffer();
170                 ByteBuffer bb2 = segment2.asByteBuffer();
171                 assertEquals((int)asc1.write(bb1).get(), 10);
172                 assertEquals((int)asc2.read(bb2).get(), 10);
173                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
174             }
175             {   // CompletionHandler variants
176                 ByteBuffer bb1 = segment1.asByteBuffer();
177                 ByteBuffer bb2 = segment2.asByteBuffer();
178                 var writeHandler = new TestHandler();
179                 asc1.write(new ByteBuffer[]{bb1}, 0, 1, 30L, SECONDS, null, writeHandler);
180                 writeHandler.await().assertCompleteWith(10L);
181                 var readHandler = new TestHandler();
182                 asc2.read(new ByteBuffer[]{bb2}, 0, 1, 30L, SECONDS, null, readHandler);
183                 readHandler.await().assertCompleteWith(10L);
184                 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
185             }
186             {   // Gathering/Scattering variants

205     {
206         try (var asc1 = AsynchronousSocketChannel.open();
207              var assc = AsynchronousServerSocketChannel.open();
208              var asc2 = connectChannels(assc, asc1);
209              var scope = scopeSupplier.get()) {
210             var segment = MemorySegment.allocateNative(10, 1, scope);
211             var bb = segment.asByteBuffer();
212             var bba = new ByteBuffer[] { bb };
213             List<ThrowingConsumer<TestHandler,?>> readOps = List.of(
214                     handler -> handler.propagateHandlerFromFuture(asc1.read(bb)),
215                     handler -> asc1.read(bb, null, handler),
216                     handler -> asc1.read(bb,  timeout, SECONDS, null, handler),
217                     handler -> asc1.read(bba, 0, bba.length, timeout, SECONDS, null, handler)
218             );
219             for (var ioOp : readOps) {
220                 out.println("testCloseWithOutstandingRead - op");
221                 var handler = new TestHandler<Long>();
222                 ioOp.accept(handler);
223                 assertFalse(handler.isDone());
224                 assertTrue(scope.isAlive());
225                 assertMessage(expectThrows(ISE, () -> scope.close()), "Scope is kept alive by");
226 
227                 // write to allow the blocking read to complete, which will
228                 // in turn unlock the scope and allow it to be closed.
229                 asc2.write(ByteBuffer.wrap(new byte[] { 0x01 })).get();
230                 handler.await().assertCompleteWith(1L);
231                 assertTrue(scope.isAlive());
232             }
233         }
234     }
235 
236     /** Tests that a scope is not closeable when there is an outstanding write operation. */
237     // Note: limited scenarios are checked, given the 5 sec sleep!
238     @Test(dataProvider = "sharedScopesAndTimeouts")
239     public void testCloseWithOutstandingWrite(Supplier<ResourceScope> scopeSupplier, int timeout)
240          throws Throwable
241     {
242         try (var asc1 = AsynchronousSocketChannel.open();
243              var assc = AsynchronousServerSocketChannel.open();
244              var asc2 = connectChannels(assc, asc1);
245              var scope = scopeSupplier.get()) {

254             // for when a write does not complete immediately
255             var bba = segmentBuffersOfSize(32, scope, 128);
256             TestHandler<Long> handler;
257             outstandingWriteOps.getAndIncrement();
258             asc1.write(bba, 0, bba.length, timeout, SECONDS, null,
259                     (handler = new TestHandler<>() {
260                         public void completed(Long result, Void att) {
261                             super.completed(result, att);
262                             bytesWritten.addAndGet(result);
263                             if (continueWriting.get()) {
264                                 var bba = segmentBuffersOfSize(32, scope, 128);
265                                 outstandingWriteOps.getAndIncrement();
266                                 asc1.write(bba, 0, bba.length, timeout, SECONDS, null, this);
267                             }
268                             outstandingWriteOps.getAndDecrement();
269                         }
270                     }));
271             // give time for socket buffer to fill up.
272             awaitNoFurtherWrites(bytesWritten);
273 
274             assertMessage(expectThrows(ISE, () -> scope.close()), "Scope is kept alive by");
275             assertTrue(scope.isAlive());
276 
277             // signal handler to stop further writing
278             continueWriting.set(false);
279 
280             // read to allow the outstanding write to complete, which will
281             // in turn unlock the scope and allow it to be closed.
282             readNBytes(asc2, bytesWritten.get());
283             assertTrue(scope.isAlive());
284             awaitOutstandingWrites(outstandingWriteOps);
285             handler.await();
286         }
287     }
288 
289     /** Waits for outstandingWriteOps to complete (become 0). */
290     static void awaitOutstandingWrites(AtomicInteger outstandingWriteOps) {
291         boolean initial = true;
292         while (outstandingWriteOps.get() > 0 )  {
293             if (initial) {
294                 out.print("awaiting outstanding writes");
< prev index next >