< prev index next > test/jdk/java/nio/channels/vthread/BlockingChannelOps.java
Print this page
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.stream.Stream;
import jdk.test.lib.thread.VThreadRunner;
- import org.junit.jupiter.api.Test;
+ import jdk.test.lib.thread.VThreadScheduler;
+
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.AfterAll;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.*;
class BlockingChannelOps {
+ private static ExecutorService customScheduler;  // executor handed to Thread.ofVirtual().scheduler(...) in threadBuilders()
+
+ @BeforeAll
+ static void setup() {  // JUnit invokes this once, before any test in the class runs
+ customScheduler = Executors.newCachedThreadPool();
+ }
+
+ @AfterAll
+ static void finish() {  // JUnit invokes this once, after all tests in the class have run
+ customScheduler.shutdown();  // initiates an orderly shutdown; does not wait for termination
+ }
+
+ /**
+ * Supplies the Thread.Builder instances used to create the virtual thread
+ * in each parameterized test. The default virtual thread builder is always
+ * included; a second builder configured with the custom scheduler is added
+ * only when VThreadScheduler reports that custom schedulers are supported.
+ */
+ static Stream<Thread.Builder.OfVirtual> threadBuilders() {
+ if (!VThreadScheduler.supportsCustomScheduler()) {
+ return Stream.of(Thread.ofVirtual());
+ }
+ return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
+ }
+
/**
* SocketChannel read/write, no blocking.
*/
- @Test
- void testSocketChannelReadWrite1() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var connection = new Connection()) {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();
// write to sc1
}
/**
* Virtual thread blocks in SocketChannel read.
*/
- @Test
- void testSocketChannelRead() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var connection = new Connection()) {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();
// write to sc1 when current thread blocks in sc2.read
}
/**
* Virtual thread blocks in SocketChannel write.
*/
- @Test
- void testSocketChannelWrite() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var connection = new Connection()) {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();
// read from sc2 to EOF when current thread blocks in sc1.write
}
/**
* SocketChannel close while virtual thread blocked in read.
+ * The blocked read is expected to complete with AsynchronousCloseException;
+ * the test fails if the read returns normally instead.
*/
- @Test
- void testSocketChannelReadAsyncClose() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
runAfterParkedAsync(sc::close);
try {
int n = sc.read(ByteBuffer.allocate(100));
+ fail("read returned " + n);  // without this the test passes even if no exception is thrown
} catch (AsynchronousCloseException expected) { }
}
});
}
+ /**
+ * SocketChannel shutdownInput while virtual thread blocked in read.
+ * The read is expected to return -1 rather than throw, and the channel is
+ * expected to still be open afterwards. Runs once for each builder supplied
+ * by threadBuilders().
+ */
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
+ try (var connection = new Connection()) {
+ SocketChannel sc = connection.channel1();
+ runAfterParkedAsync(sc::shutdownInput);  // presumably runs once this thread has parked in read -- confirm against the helper
+ int n = sc.read(ByteBuffer.allocate(100));
+ assertEquals(-1, n);  // end-of-stream indication, not an exception
+ assertTrue(sc.isOpen());  // shutting down input is expected to leave the channel open
+ }
+ });
+ }
+
/**
* Virtual thread interrupted while blocked in SocketChannel read.
*/
- @Test
- void testSocketChannelReadInterrupt() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
// interrupt current thread when it blocks in read
Thread thisThread = Thread.currentThread();
}
/**
* SocketChannel close while virtual thread blocked in write.
*/
- @Test
- void testSocketChannelWriteAsyncClose() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
boolean retry = true;
while (retry) {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
}
}
});
}
+
+ /**
+ * SocketChannel shutdownOutput while virtual thread blocked in write.
+ * Writes a 100KB buffer repeatedly until a write fails with
+ * ClosedChannelException, then checks that the channel is still open.
+ * Runs once for each builder supplied by threadBuilders().
+ */
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
+ try (var connection = new Connection()) {
+ SocketChannel sc = connection.channel1();
+
+ // shutdown output when current thread blocks in write
+ runAfterParkedAsync(sc::shutdownOutput);
+ try {
+ ByteBuffer bb = ByteBuffer.allocate(100*1024);
+ for (;;) {  // leaves the loop only via an exception or a failed assertion
+ int n = sc.write(bb);
+ assertTrue(n > 0);
+ bb.clear();  // reset position and limit so the whole buffer is written again
+ }
+ } catch (ClosedChannelException e) {
+ // expected: thrown by write after output has been shut down
+ }
+ assertTrue(sc.isOpen());  // shutting down output is expected to leave the channel open
+ }
+ });
+ }
+
/**
* Virtual thread interrupted while blocked in SocketChannel write.
*/
- @Test
- void testSocketChannelWriteInterrupt() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
boolean retry = true;
while (retry) {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
}
/**
* Virtual thread blocks in SocketChannel adaptor read.
+ * Calls testSocketAdaptorRead with a timeout argument of 0, once for each
+ * builder supplied by threadBuilders().
*/
- @Test
- void testSocketAdaptorRead1() throws Exception {
- testSocketAdaptorRead(0);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
+ testSocketAdaptorRead(builder, 0);
}
/**
* Virtual thread blocks in SocketChannel adaptor read with timeout.
+ * Calls testSocketAdaptorRead with a timeout argument of 60_000, once for
+ * each builder supplied by threadBuilders().
*/
- @Test
- void testSocketAdaptorRead2() throws Exception {
- testSocketAdaptorRead(60_000);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
+ testSocketAdaptorRead(builder, 60_000);
}
- private void testSocketAdaptorRead(int timeout) throws Exception {
- VThreadRunner.run(() -> {
+ private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
+ int timeout) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var connection = new Connection()) {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();
// write to sc1 when current thread blocks reading from sc2
}
/**
* ServerSocketChannel accept, no blocking.
*/
- @Test
- void testServerSocketChannelAccept1() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
var sc1 = SocketChannel.open(ssc.getLocalAddress());
// accept should not block
var sc2 = ssc.accept();
}
/**
* Virtual thread blocks in ServerSocketChannel accept.
*/
- @Test
- void testServerSocketChannelAccept2() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
var sc1 = SocketChannel.open();
// connect when current thread blocks in accept
}
/**
* ServerSocketChannel close while virtual thread blocked in accept.
*/
- @Test
- void testServerSocketChannelAcceptAsyncClose() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var ssc = ServerSocketChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
ssc.bind(new InetSocketAddress(lh, 0));
runAfterParkedAsync(ssc::close);
try {
}
/**
* Virtual thread interrupted while blocked in ServerSocketChannel accept.
*/
- @Test
- void testServerSocketChannelAcceptInterrupt() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var ssc = ServerSocketChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
ssc.bind(new InetSocketAddress(lh, 0));
// interrupt current thread when it blocks in accept
}
/**
* Virtual thread blocks in ServerSocketChannel adaptor accept.
+ * Calls testSocketChannelAdaptorAccept with a timeout argument of 0, once
+ * for each builder supplied by threadBuilders().
*/
- @Test
- void testSocketChannelAdaptorAccept1() throws Exception {
- testSocketChannelAdaptorAccept(0);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
+ testSocketChannelAdaptorAccept(builder, 0);
}
/**
* Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
+ * Calls testSocketChannelAdaptorAccept with a timeout argument of 60_000,
+ * once for each builder supplied by threadBuilders().
*/
- @Test
- void testSocketChannelAdaptorAccept2() throws Exception {
- testSocketChannelAdaptorAccept(60_000);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
+ testSocketChannelAdaptorAccept(builder, 60_000);
}
- private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
- VThreadRunner.run(() -> {
+ private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
+ int timeout) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (var ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
var sc = SocketChannel.open();
// interrupt current thread when it blocks in accept
}
/**
* DatagramChannel receive/send, no blocking.
*/
- @Test
- void testDatagramChannelSendReceive1() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (DatagramChannel dc1 = DatagramChannel.open();
DatagramChannel dc2 = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
dc2.bind(new InetSocketAddress(lh, 0));
}
/**
* Virtual thread blocks in DatagramChannel receive.
*/
- @Test
- void testDatagramChannelSendReceive2() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (DatagramChannel dc1 = DatagramChannel.open();
DatagramChannel dc2 = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
dc2.bind(new InetSocketAddress(lh, 0));
}
/**
* DatagramChannel close while virtual thread blocked in receive.
*/
- @Test
- void testDatagramChannelReceiveAsyncClose() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
dc.bind(new InetSocketAddress(lh, 0));
runAfterParkedAsync(dc::close);
try {
}
/**
* Virtual thread interrupted while blocked in DatagramChannel receive.
*/
- @Test
- void testDatagramChannelReceiveInterrupt() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
dc.bind(new InetSocketAddress(lh, 0));
// interrupt current thread when it blocks in receive
}
/**
* Virtual thread blocks in DatagramSocket adaptor receive.
+ * Calls testDatagramSocketAdaptorReceive with a timeout argument of 0, once
+ * for each builder supplied by threadBuilders().
*/
- @Test
- void testDatagramSocketAdaptorReceive1() throws Exception {
- testDatagramSocketAdaptorReceive(0);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
+ testDatagramSocketAdaptorReceive(builder, 0);
}
/**
* Virtual thread blocks in DatagramSocket adaptor receive with timeout.
+ * Calls testDatagramSocketAdaptorReceive with a timeout argument of 60_000,
+ * once for each builder supplied by threadBuilders().
*/
- @Test
- void testDatagramSocketAdaptorReceive2() throws Exception {
- testDatagramSocketAdaptorReceive(60_000);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
+ testDatagramSocketAdaptorReceive(builder, 60_000);
}
- private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
- VThreadRunner.run(() -> {
+ private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
+ int timeout) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (DatagramChannel dc1 = DatagramChannel.open();
DatagramChannel dc2 = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
dc2.bind(new InetSocketAddress(lh, 0));
}
/**
* DatagramChannel close while virtual thread blocked in adaptor receive.
+ * Calls testDatagramSocketAdaptorReceiveAsyncClose with a timeout argument
+ * of 0, once for each builder supplied by threadBuilders().
*/
- @Test
- void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
- testDatagramSocketAdaptorReceiveAsyncClose(0);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
+ testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
}
/**
* DatagramChannel close while virtual thread blocked in adaptor receive
* with timeout.
+ * Calls testDatagramSocketAdaptorReceiveAsyncClose with a timeout argument
+ * of 60_000 (the value used by the other with-timeout adaptor tests in this
+ * class), once for each builder supplied by threadBuilders().
*/
- @Test
- void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
- testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
+ testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_000);
}
- private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
- VThreadRunner.run(() -> {
+ private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
+ int timeout) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
dc.bind(new InetSocketAddress(lh, 0));
byte[] array = new byte[100];
}
/**
* Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
+ * Calls testDatagramSocketAdaptorReceiveInterrupt with a timeout argument
+ * of 0, once for each builder supplied by threadBuilders().
*/
- @Test
- void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
- testDatagramSocketAdaptorReceiveInterrupt(0);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
+ testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
}
/**
* Virtual thread interrupted while blocked in DatagramSocket adaptor receive
* with timeout.
+ * Calls testDatagramSocketAdaptorReceiveInterrupt with a timeout argument
+ * of 60_000 (the value used by the other with-timeout adaptor tests in this
+ * class), once for each builder supplied by threadBuilders().
*/
- @Test
- void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
- testDatagramSocketAdaptorReceiveInterrupt(60_1000);
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
+ testDatagramSocketAdaptorReceiveInterrupt(builder, 60_000);
}
- private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
- VThreadRunner.run(() -> {
+ private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
+ int timeout) throws Exception {
+ VThreadRunner.run(builder, () -> {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
dc.bind(new InetSocketAddress(lh, 0));
byte[] array = new byte[100];
}
/**
* Pipe read/write, no blocking.
*/
- @Test
- void testPipeReadWrite1() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
// write should not block
}
/**
* Virtual thread blocks in Pipe.SourceChannel read.
*/
- @Test
- void testPipeReadWrite2() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
// write from sink when current thread blocks reading from source
}
/**
* Virtual thread blocks in Pipe.SinkChannel write.
*/
- @Test
- void testPipeReadWrite3() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
// read from source to EOF when current thread blocking in write
}
/**
* Pipe.SourceChannel close while virtual thread blocked in read.
*/
- @Test
- void testPipeReadAsyncClose() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
runAfterParkedAsync(source::close);
try {
}
/**
* Virtual thread interrupted while blocked in Pipe.SourceChannel read.
*/
- @Test
- void testPipeReadInterrupt() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
// interrupt current thread when it blocks reading from source
}
/**
* Pipe.SinkChannel close while virtual thread blocked in write.
*/
- @Test
- void testPipeWriteAsyncClose() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
boolean retry = true;
while (retry) {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
}
/**
* Virtual thread interrupted while blocked in Pipe.SinkChannel write.
*/
- @Test
- void testPipeWriteInterrupt() throws Exception {
- VThreadRunner.run(() -> {
+ @ParameterizedTest
+ @MethodSource("threadBuilders")
+ void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
+ VThreadRunner.run(builder, () -> {
boolean retry = true;
while (retry) {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
< prev index next >