1 /* 2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @library /test/lib 27 * @modules java.base/sun.nio.ch 28 * @key randomness 29 * @run testng/othervm TestAsyncSocketChannels 30 * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=true TestAsyncSocketChannels 31 * @run testng/othervm -Dsun.nio.ch.disableSynchronousRead=false TestAsyncSocketChannels 32 */ 33 34 import java.io.IOException; 35 import java.lang.foreign.Arena; 36 import java.lang.foreign.MemorySegment; 37 import java.net.InetAddress; 38 import java.net.InetSocketAddress; 39 import java.net.StandardSocketOptions; 40 import java.nio.ByteBuffer; 41 import java.nio.channels.AsynchronousServerSocketChannel; 42 import java.nio.channels.AsynchronousSocketChannel; 43 import java.nio.channels.CompletionHandler; 44 import java.util.List; 45 import java.util.concurrent.CountDownLatch; 46 import java.util.concurrent.ExecutionException; 47 import java.util.concurrent.Future; 48 import java.util.concurrent.atomic.AtomicBoolean; 49 import java.util.concurrent.atomic.AtomicInteger; 50 import java.util.concurrent.atomic.AtomicLong; 51 import java.util.function.Supplier; 52 53 import org.testng.annotations.*; 54 import static java.lang.System.out; 55 import static java.util.concurrent.TimeUnit.SECONDS; 56 import static java.lang.foreign.ValueLayout.JAVA_BYTE; 57 import static org.testng.Assert.*; 58 59 /** 60 * Tests consisting of buffer views with asynchronous NIO network channels. 61 */ 62 public class TestAsyncSocketChannels extends AbstractChannelsTest { 63 64 static final Class<IOException> IOE = IOException.class; 65 static final Class<ExecutionException> EE = ExecutionException.class; 66 static final Class<IllegalStateException> ISE = IllegalStateException.class; 67 68 /** Tests that confined sessions are not supported. */ 69 @Test(dataProvider = "confinedArenas") 70 public void testWithConfined(Supplier<Arena> arenaSupplier) 71 throws Throwable 72 { 73 try (var channel = AsynchronousSocketChannel.open(); 74 var server = AsynchronousServerSocketChannel.open(); 75 var connectedChannel = connectChannels(server, channel); 76 var drop = arenaSupplier.get()) { 77 Arena scope = drop; 78 var segment = scope.allocate(10, 1); 79 var bb = segment.asByteBuffer(); 80 var bba = new ByteBuffer[] { bb }; 81 List<ThrowingConsumer<TestHandler,?>> ioOps = List.of( 82 handler -> handler.propagateHandlerFromFuture(channel.write(bb)), 83 handler -> handler.propagateHandlerFromFuture(channel.read(bb)), 84 handler -> channel.write(bb, null, handler), 85 handler -> channel.read( bb, null, handler), 86 handler -> channel.write(bb , 0L, SECONDS, null, handler), 87 handler -> channel.read( bb, 0L, SECONDS, null, handler), 88 handler -> channel.write(bba, 0, bba.length, 0L, SECONDS, null, handler), 89 handler -> channel.read( bba, 0, bba.length, 0L, SECONDS, null, handler) 90 ); 91 for (var ioOp : ioOps) { 92 out.println("testAsyncWithConfined - op"); 93 var handler = new TestHandler(); 94 ioOp.accept(handler); 95 handler.await() 96 .assertFailedWith(ISE) 97 .assertExceptionMessage("Confined session not supported"); 98 } 99 } 100 } 101 102 /** Tests that I/O with a closed session throws a suitable exception. */ 103 @Test(dataProvider = "sharedArenasAndTimeouts") 104 public void testIOWithClosedSharedSession(Supplier<Arena> arenaSupplier, int timeout) 105 throws Exception 106 { 107 try (var channel = AsynchronousSocketChannel.open(); 108 var server = AsynchronousServerSocketChannel.open(); 109 var connectedChannel = connectChannels(server, channel)) { 110 Arena drop = arenaSupplier.get(); 111 ByteBuffer bb = segmentBufferOfSize(drop, 64); 112 ByteBuffer[] buffers = segmentBuffersOfSize(8, drop, 32); 113 drop.close(); 114 { 115 assertCauses(expectThrows(EE, () -> connectedChannel.read(bb).get()), IOE, ISE); 116 } 117 { 118 var handler = new TestHandler<Integer>(); 119 connectedChannel.read(bb, null, handler); 120 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed"); 121 } 122 { 123 var handler = new TestHandler<Integer>(); 124 connectedChannel.read(bb, timeout, SECONDS, null, handler); 125 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed"); 126 } 127 { 128 var handler = new TestHandler<Long>(); 129 connectedChannel.read(buffers, 0, buffers.length, timeout, SECONDS, null, handler); 130 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed"); 131 } 132 { 133 assertCauses(expectThrows(EE, () -> connectedChannel.write(bb).get()), IOE, ISE); 134 } 135 { 136 var handler = new TestHandler<Integer>(); 137 connectedChannel.write(bb, null, handler); 138 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed"); 139 } 140 { 141 var handler = new TestHandler<Integer>(); 142 connectedChannel.write(bb, timeout, SECONDS, null, handler); 143 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed"); 144 } 145 { 146 var handler = new TestHandler<Long>(); 147 connectedChannel.write(buffers, 0, buffers.length, timeout, SECONDS, null, handler); 148 handler.await().assertFailedWith(ISE).assertExceptionMessage("Already closed"); 149 } 150 } 151 } 152 153 /** Tests basic I/O operations work with views over implicit and shared sessions. */ 154 @Test(dataProvider = "sharedArenas") 155 public void testBasicIOWithSupportedSession(Supplier<Arena> arenaSupplier) 156 throws Exception 157 { 158 Arena drop; 159 try (var asc1 = AsynchronousSocketChannel.open(); 160 var assc = AsynchronousServerSocketChannel.open(); 161 var asc2 = connectChannels(assc, asc1); 162 var scp = drop = arenaSupplier.get()) { 163 Arena scope1 = drop; 164 MemorySegment segment1 = scope1.allocate(10, 1); 165 Arena scope = drop; 166 MemorySegment segment2 = scope.allocate(10, 1); 167 for (int i = 0; i < 10; i++) { 168 segment1.set(JAVA_BYTE, i, (byte) i); 169 } 170 { // Future variants 171 ByteBuffer bb1 = segment1.asByteBuffer(); 172 ByteBuffer bb2 = segment2.asByteBuffer(); 173 assertEquals((int)asc1.write(bb1).get(), 10); 174 assertEquals((int)asc2.read(bb2).get(), 10); 175 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); 176 } 177 { // CompletionHandler variants 178 ByteBuffer bb1 = segment1.asByteBuffer(); 179 ByteBuffer bb2 = segment2.asByteBuffer(); 180 var writeHandler = new TestHandler(); 181 asc1.write(new ByteBuffer[]{bb1}, 0, 1, 30L, SECONDS, null, writeHandler); 182 writeHandler.await().assertCompleteWith(10L); 183 var readHandler = new TestHandler(); 184 asc2.read(new ByteBuffer[]{bb2}, 0, 1, 30L, SECONDS, null, readHandler); 185 readHandler.await().assertCompleteWith(10L); 186 assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); 187 } 188 { // Gathering/Scattering variants 189 var writeBuffers = mixedBuffersOfSize(16, drop, 32); 190 var readBuffers = mixedBuffersOfSize(16, drop, 32); 191 long expectedCount = remaining(writeBuffers); 192 var writeHandler = new TestHandler(); 193 asc1.write(writeBuffers, 0, 16, 30L, SECONDS, null, writeHandler); 194 writeHandler.await().assertCompleteWith(expectedCount); 195 var readHandler = new TestHandler(); 196 asc2.read(readBuffers, 0, 16, 30L, SECONDS, null, readHandler); 197 readHandler.await().assertCompleteWith(expectedCount); 198 assertEquals(flip(readBuffers), clear(writeBuffers)); 199 } 200 } 201 } 202 203 /** Tests that a session is not closeable when there is an outstanding read operation. */ 204 @Test(dataProvider = "sharedArenasAndTimeouts") 205 public void testCloseWithOutstandingRead(Supplier<Arena> arenaSupplier, int timeout) 206 throws Throwable 207 { 208 try (var asc1 = AsynchronousSocketChannel.open(); 209 var assc = AsynchronousServerSocketChannel.open(); 210 var asc2 = connectChannels(assc, asc1); 211 var drop = arenaSupplier.get()) { 212 Arena scope = drop; 213 var segment = scope.allocate(10, 1); 214 var bb = segment.asByteBuffer(); 215 var bba = new ByteBuffer[] { bb }; 216 List<ThrowingConsumer<TestHandler,?>> readOps = List.of( 217 handler -> handler.propagateHandlerFromFuture(asc1.read(bb)), 218 handler -> asc1.read(bb, null, handler), 219 handler -> asc1.read(bb, timeout, SECONDS, null, handler), 220 handler -> asc1.read(bba, 0, bba.length, timeout, SECONDS, null, handler) 221 ); 222 for (var ioOp : readOps) { 223 out.println("testCloseWithOutstandingRead - op"); 224 var handler = new TestHandler<Long>(); 225 ioOp.accept(handler); 226 assertFalse(handler.isDone()); 227 assertTrue(drop.scope().isAlive()); 228 229 // write to allow the blocking read complete, which will 230 // in turn unlock the session and allow it to be closed. 231 asc2.write(ByteBuffer.wrap(new byte[] { 0x01 })).get(); 232 handler.await().assertCompleteWith(1L); 233 assertTrue(drop.scope().isAlive()); 234 } 235 } 236 } 237 238 /** Tests that a session is not closeable when there is an outstanding write operation. */ 239 // Note: limited scenarios are checked, given the 5 sec sleep! 240 @Test(dataProvider = "sharedArenasAndTimeouts") 241 public void testCloseWithOutstandingWrite(Supplier<Arena> arenaSupplier, int timeout) 242 throws Throwable 243 { 244 try (var asc1 = AsynchronousSocketChannel.open(); 245 var assc = AsynchronousServerSocketChannel.open(); 246 var asc2 = connectChannels(assc, asc1); 247 var drop = arenaSupplier.get()) { 248 249 // number of bytes written 250 final AtomicLong bytesWritten = new AtomicLong(0); 251 // set to true to signal that no more buffers should be written 252 final AtomicBoolean continueWriting = new AtomicBoolean(true); 253 final AtomicInteger outstandingWriteOps = new AtomicInteger(0); 254 255 // write until socket buffer is full so as to create the conditions 256 // for when a write does not complete immediately 257 var bba = segmentBuffersOfSize(32, drop, 128); 258 TestHandler<Long> handler; 259 outstandingWriteOps.getAndIncrement(); 260 asc1.write(bba, 0, bba.length, timeout, SECONDS, null, 261 (handler = new TestHandler<>() { 262 public void completed(Long result, Void att) { 263 super.completed(result, att); 264 bytesWritten.addAndGet(result); 265 if (continueWriting.get()) { 266 var bba = segmentBuffersOfSize(32, drop, 128); 267 outstandingWriteOps.getAndIncrement(); 268 asc1.write(bba, 0, bba.length, timeout, SECONDS, null, this); 269 } 270 outstandingWriteOps.getAndDecrement(); 271 } 272 })); 273 // give time for socket buffer to fill up. 274 awaitNoFurtherWrites(bytesWritten); 275 276 assertTrue(drop.scope().isAlive()); 277 278 // signal handler to stop further writing 279 continueWriting.set(false); 280 281 // read to allow the outstanding write complete, which will 282 // in turn unlock the session and allow it to be closed. 283 readNBytes(asc2, bytesWritten.get()); 284 assertTrue(drop.scope().isAlive()); 285 awaitOutstandingWrites(outstandingWriteOps); 286 handler.await(); 287 } 288 } 289 290 /** Waits for outstandingWriteOps to complete (become 0). */ 291 static void awaitOutstandingWrites(AtomicInteger outstandingWriteOps) { 292 boolean initial = true; 293 while (outstandingWriteOps.get() > 0 ) { 294 if (initial) { 295 out.print("awaiting outstanding writes"); 296 initial = false; 297 } 298 out.print("."); 299 Thread.onSpinWait(); 300 } 301 out.println("outstanding writes: " + outstandingWriteOps.get()); 302 } 303 304 /** Waits, at most 20secs, for bytesWritten to stabilize. */ 305 static void awaitNoFurtherWrites(AtomicLong bytesWritten) throws Exception { 306 int i; 307 long prevN = 0; 308 for (i=0; i<10; i++) { 309 long n = bytesWritten.get(); 310 Thread.sleep(2 * 1000); 311 if (bytesWritten.get() == n && prevN == n) { 312 break; 313 } 314 prevN = n; 315 } 316 out.println("awaitNoFurtherWrites: i=" + i +" , bytesWritten=" + bytesWritten.get()); 317 } 318 319 /** Completion handler that exposes conveniences to assert results. */ 320 static class TestHandler<V extends Number> implements CompletionHandler<V, Void> { 321 volatile V result; 322 volatile Throwable throwable; 323 final CountDownLatch latch = new CountDownLatch(1); 324 325 /** Starts a thread that complete the handled with the Future result. */ 326 TestHandler propagateHandlerFromFuture(Future<Integer> future) { 327 Runnable runnable = () -> { 328 try { 329 this.completed((V)future.get(), null); 330 } catch (Throwable t) { 331 // assert and unwrap exception added by Future 332 assertTrue(ExecutionException.class.isInstance(t)); 333 t = t.getCause(); 334 assertTrue(IOException.class.isInstance(t)); 335 t = t.getCause(); 336 this.failed(t, null); 337 } 338 }; 339 Thread t = new Thread(runnable); 340 t.start(); 341 return this; 342 } 343 344 @Override 345 public void completed(V result, Void att) { 346 assert result.longValue() >= 0; 347 this.result = result; 348 latch.countDown(); 349 } 350 @Override 351 public void failed(Throwable exc, Void att){ 352 this.throwable = tolerateIOEOnWindows(exc); 353 latch.countDown(); 354 } 355 356 TestHandler await() throws InterruptedException{ 357 latch.await(); 358 return this; 359 } 360 361 TestHandler assertCompleteWith(V value) { 362 assertEquals(result.longValue(), value.longValue()); 363 assertEquals(throwable, null); 364 return this; 365 } 366 367 TestHandler assertFailedWith(Class<? extends Exception> expectedException) { 368 assertTrue(expectedException.isInstance(throwable), 369 "Expected type:%s, got:%s".formatted(expectedException, throwable) ); 370 assertEquals(result, null, "Unexpected result: " + result); 371 return this; 372 } 373 374 TestHandler assertExceptionMessage(String expectedMessage) { 375 assertEquals(throwable.getMessage(), expectedMessage); 376 return this; 377 } 378 379 boolean isDone() { 380 return latch.getCount() == 0; 381 } 382 } 383 384 static AsynchronousSocketChannel connectChannels(AsynchronousServerSocketChannel assc, 385 AsynchronousSocketChannel asc) 386 throws Exception 387 { 388 setBufferSized(assc, asc); 389 assc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); 390 asc.connect(assc.getLocalAddress()).get(); 391 return assc.accept().get(); 392 } 393 394 /** Sets the send/receive buffer sizes in an attempt/hint to limit the 395 * accepted/connected socket buffer sizes. Actual buffer sizes in use will 396 * likely be larger due to TCP auto-tuning, but the hint typically reduces 397 * the overall scaled sizes. This is primarily to stabilize outstanding 398 * write operations. 399 */ 400 static void setBufferSized(AsynchronousServerSocketChannel assc, 401 AsynchronousSocketChannel asc) 402 throws Exception 403 { 404 assc.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024); 405 asc.setOption(StandardSocketOptions.SO_SNDBUF, 32 * 1024); 406 asc.setOption(StandardSocketOptions.SO_RCVBUF, 32 * 1024); 407 } 408 409 /** Tolerate the additional level of IOException wrapping of unchecked exceptions 410 * On Windows, when completing the completion handler with a failure. */ 411 static Throwable tolerateIOEOnWindows(Throwable t) { 412 if (System.getProperty("os.name").startsWith("Windows")) { 413 if (t instanceof IOException) 414 return t.getCause(); 415 } 416 return t; 417 } 418 419 static void readNBytes(AsynchronousSocketChannel channel, long len) 420 throws Exception 421 { 422 var buf = ByteBuffer.allocateDirect(4096); 423 long total = 0L; 424 do { 425 int n = channel.read(buf).get(); 426 assertTrue(n > 0, "got:" + n); 427 buf.clear(); 428 total += n; 429 } while (total < len); 430 } 431 }