45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.DatagramPacket;
50 import java.net.InetAddress;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.net.SocketAddress;
54 import java.net.SocketException;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AsynchronousCloseException;
57 import java.nio.channels.ClosedByInterruptException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.Pipe;
61 import java.nio.channels.ReadableByteChannel;
62 import java.nio.channels.ServerSocketChannel;
63 import java.nio.channels.SocketChannel;
64 import java.nio.channels.WritableByteChannel;
65
66 import jdk.test.lib.thread.VThreadRunner;
67 import org.junit.jupiter.api.Test;
68 import static org.junit.jupiter.api.Assertions.*;
69
70 class BlockingChannelOps {
71
72 /**
73 * SocketChannel read/write, no blocking.
74 */
75 @Test
76 void testSocketChannelReadWrite1() throws Exception {
77 VThreadRunner.run(() -> {
78 try (var connection = new Connection()) {
79 SocketChannel sc1 = connection.channel1();
80 SocketChannel sc2 = connection.channel2();
81
82 // write to sc1
83 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
84 int n = sc1.write(bb);
85 assertTrue(n > 0);
86
87 // read from sc2 should not block
88 bb = ByteBuffer.allocate(10);
89 n = sc2.read(bb);
90 assertTrue(n > 0);
91 assertTrue(bb.get(0) == 'X');
92 }
93 });
94 }
95
96 /**
97 * Virtual thread blocks in SocketChannel read.
98 */
99 @Test
100 void testSocketChannelRead() throws Exception {
101 VThreadRunner.run(() -> {
102 try (var connection = new Connection()) {
103 SocketChannel sc1 = connection.channel1();
104 SocketChannel sc2 = connection.channel2();
105
106 // write to sc1 when current thread blocks in sc2.read
107 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
108 runAfterParkedAsync(() -> sc1.write(bb1));
109
110 // read from sc2 should block
111 ByteBuffer bb2 = ByteBuffer.allocate(10);
112 int n = sc2.read(bb2);
113 assertTrue(n > 0);
114 assertTrue(bb2.get(0) == 'X');
115 }
116 });
117 }
118
119 /**
120 * Virtual thread blocks in SocketChannel write.
121 */
122 @Test
123 void testSocketChannelWrite() throws Exception {
124 VThreadRunner.run(() -> {
125 try (var connection = new Connection()) {
126 SocketChannel sc1 = connection.channel1();
127 SocketChannel sc2 = connection.channel2();
128
129 // read from sc2 to EOF when current thread blocks in sc1.write
130 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
131
132 // write to sc1 should block
133 ByteBuffer bb = ByteBuffer.allocate(100*1024);
134 for (int i=0; i<1000; i++) {
135 int n = sc1.write(bb);
136 assertTrue(n > 0);
137 bb.clear();
138 }
139 sc1.close();
140
141 // wait for reader to finish
142 reader.join();
143 }
144 });
145 }
146
147 /**
148 * SocketChannel close while virtual thread blocked in read.
149 */
150 @Test
151 void testSocketChannelReadAsyncClose() throws Exception {
152 VThreadRunner.run(() -> {
153 try (var connection = new Connection()) {
154 SocketChannel sc = connection.channel1();
155 runAfterParkedAsync(sc::close);
156 try {
157 int n = sc.read(ByteBuffer.allocate(100));
158 fail("read returned " + n);
159 } catch (AsynchronousCloseException expected) { }
160 }
161 });
162 }
163
164 /**
165 * Virtual thread interrupted while blocked in SocketChannel read.
166 */
167 @Test
168 void testSocketChannelReadInterrupt() throws Exception {
169 VThreadRunner.run(() -> {
170 try (var connection = new Connection()) {
171 SocketChannel sc = connection.channel1();
172
173 // interrupt current thread when it blocks in read
174 Thread thisThread = Thread.currentThread();
175 runAfterParkedAsync(thisThread::interrupt);
176
177 try {
178 int n = sc.read(ByteBuffer.allocate(100));
179 fail("read returned " + n);
180 } catch (ClosedByInterruptException expected) {
181 assertTrue(Thread.interrupted());
182 }
183 }
184 });
185 }
186
187 /**
188 * SocketChannel close while virtual thread blocked in write.
189 */
190 @Test
191 void testSocketChannelWriteAsyncClose() throws Exception {
192 VThreadRunner.run(() -> {
193 boolean retry = true;
194 while (retry) {
195 try (var connection = new Connection()) {
196 SocketChannel sc = connection.channel1();
197
198 // close sc when current thread blocks in write
199 runAfterParkedAsync(sc::close);
200 try {
201 ByteBuffer bb = ByteBuffer.allocate(100*1024);
202 for (;;) {
203 int n = sc.write(bb);
204 assertTrue(n > 0);
205 bb.clear();
206 }
207 } catch (AsynchronousCloseException expected) {
208 // closed when blocked in write
209 retry = false;
210 } catch (ClosedChannelException e) {
211 // closed when not blocked in write, need to retry test
212 }
213 }
214 }
215 });
216 }
217
218 /**
219 * Virtual thread interrupted while blocked in SocketChannel write.
220 */
221 @Test
222 void testSocketChannelWriteInterrupt() throws Exception {
223 VThreadRunner.run(() -> {
224 boolean retry = true;
225 while (retry) {
226 try (var connection = new Connection()) {
227 SocketChannel sc = connection.channel1();
228
229 // interrupt current thread when it blocks in write
230 Thread thisThread = Thread.currentThread();
231 runAfterParkedAsync(thisThread::interrupt);
232
233 try {
234 ByteBuffer bb = ByteBuffer.allocate(100*1024);
235 for (;;) {
236 int n = sc.write(bb);
237 assertTrue(n > 0);
238 bb.clear();
239 }
240 } catch (ClosedByInterruptException e) {
241 // closed when blocked in write
242 assertTrue(Thread.interrupted());
243 retry = false;
244 } catch (ClosedChannelException e) {
245 // closed when not blocked in write, need to retry test
246 }
247 }
248 }
249 });
250 }
251
252 /**
253 * Virtual thread blocks in SocketChannel adaptor read.
254 */
255 @Test
256 void testSocketAdaptorRead1() throws Exception {
257 testSocketAdaptorRead(0);
258 }
259
260 /**
261 * Virtual thread blocks in SocketChannel adaptor read with timeout.
262 */
263 @Test
264 void testSocketAdaptorRead2() throws Exception {
265 testSocketAdaptorRead(60_000);
266 }
267
268 private void testSocketAdaptorRead(int timeout) throws Exception {
269 VThreadRunner.run(() -> {
270 try (var connection = new Connection()) {
271 SocketChannel sc1 = connection.channel1();
272 SocketChannel sc2 = connection.channel2();
273
274 // write to sc1 when currnet thread blocks reading from sc2
275 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
276 runAfterParkedAsync(() -> sc1.write(bb));
277
278 // read from sc2 should block
279 byte[] array = new byte[100];
280 if (timeout > 0)
281 sc2.socket().setSoTimeout(timeout);
282 int n = sc2.socket().getInputStream().read(array);
283 assertTrue(n > 0);
284 assertTrue(array[0] == 'X');
285 }
286 });
287 }
288
289 /**
290 * ServerSocketChannel accept, no blocking.
291 */
292 @Test
293 void testServerSocketChannelAccept1() throws Exception {
294 VThreadRunner.run(() -> {
295 try (var ssc = ServerSocketChannel.open()) {
296 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
297 var sc1 = SocketChannel.open(ssc.getLocalAddress());
298 // accept should not block
299 var sc2 = ssc.accept();
300 sc1.close();
301 sc2.close();
302 }
303 });
304 }
305
306 /**
307 * Virtual thread blocks in ServerSocketChannel accept.
308 */
309 @Test
310 void testServerSocketChannelAccept2() throws Exception {
311 VThreadRunner.run(() -> {
312 try (var ssc = ServerSocketChannel.open()) {
313 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
314 var sc1 = SocketChannel.open();
315
316 // connect when current thread when it blocks in accept
317 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
318
319 // accept should block
320 var sc2 = ssc.accept();
321 sc1.close();
322 sc2.close();
323 }
324 });
325 }
326
327 /**
328 * SeverSocketChannel close while virtual thread blocked in accept.
329 */
330 @Test
331 void testServerSocketChannelAcceptAsyncClose() throws Exception {
332 VThreadRunner.run(() -> {
333 try (var ssc = ServerSocketChannel.open()) {
334 InetAddress lh = InetAddress.getLoopbackAddress();
335 ssc.bind(new InetSocketAddress(lh, 0));
336 runAfterParkedAsync(ssc::close);
337 try {
338 SocketChannel sc = ssc.accept();
339 sc.close();
340 fail("connection accepted???");
341 } catch (AsynchronousCloseException expected) { }
342 }
343 });
344 }
345
346 /**
347 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
348 */
349 @Test
350 void testServerSocketChannelAcceptInterrupt() throws Exception {
351 VThreadRunner.run(() -> {
352 try (var ssc = ServerSocketChannel.open()) {
353 InetAddress lh = InetAddress.getLoopbackAddress();
354 ssc.bind(new InetSocketAddress(lh, 0));
355
356 // interrupt current thread when it blocks in accept
357 Thread thisThread = Thread.currentThread();
358 runAfterParkedAsync(thisThread::interrupt);
359
360 try {
361 SocketChannel sc = ssc.accept();
362 sc.close();
363 fail("connection accepted???");
364 } catch (ClosedByInterruptException expected) {
365 assertTrue(Thread.interrupted());
366 }
367 }
368 });
369 }
370
371 /**
372 * Virtual thread blocks in ServerSocketChannel adaptor accept.
373 */
374 @Test
375 void testSocketChannelAdaptorAccept1() throws Exception {
376 testSocketChannelAdaptorAccept(0);
377 }
378
379 /**
380 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
381 */
382 @Test
383 void testSocketChannelAdaptorAccept2() throws Exception {
384 testSocketChannelAdaptorAccept(60_000);
385 }
386
387 private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
388 VThreadRunner.run(() -> {
389 try (var ssc = ServerSocketChannel.open()) {
390 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
391 var sc = SocketChannel.open();
392
393 // interrupt current thread when it blocks in accept
394 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
395
396 // accept should block
397 if (timeout > 0)
398 ssc.socket().setSoTimeout(timeout);
399 Socket s = ssc.socket().accept();
400 sc.close();
401 s.close();
402 }
403 });
404 }
405
406 /**
407 * DatagramChannel receive/send, no blocking.
408 */
409 @Test
410 void testDatagramChannelSendReceive1() throws Exception {
411 VThreadRunner.run(() -> {
412 try (DatagramChannel dc1 = DatagramChannel.open();
413 DatagramChannel dc2 = DatagramChannel.open()) {
414
415 InetAddress lh = InetAddress.getLoopbackAddress();
416 dc2.bind(new InetSocketAddress(lh, 0));
417
418 // send should not block
419 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
420 int n = dc1.send(bb, dc2.getLocalAddress());
421 assertTrue(n > 0);
422
423 // receive should not block
424 bb = ByteBuffer.allocate(10);
425 dc2.receive(bb);
426 assertTrue(bb.get(0) == 'X');
427 }
428 });
429 }
430
431 /**
432 * Virtual thread blocks in DatagramChannel receive.
433 */
434 @Test
435 void testDatagramChannelSendReceive2() throws Exception {
436 VThreadRunner.run(() -> {
437 try (DatagramChannel dc1 = DatagramChannel.open();
438 DatagramChannel dc2 = DatagramChannel.open()) {
439
440 InetAddress lh = InetAddress.getLoopbackAddress();
441 dc2.bind(new InetSocketAddress(lh, 0));
442
443 // send from dc1 when current thread blocked in dc2.receive
444 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
445 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
446
447 // read from dc2 should block
448 ByteBuffer bb2 = ByteBuffer.allocate(10);
449 dc2.receive(bb2);
450 assertTrue(bb2.get(0) == 'X');
451 }
452 });
453 }
454
455 /**
456 * DatagramChannel close while virtual thread blocked in receive.
457 */
458 @Test
459 void testDatagramChannelReceiveAsyncClose() throws Exception {
460 VThreadRunner.run(() -> {
461 try (DatagramChannel dc = DatagramChannel.open()) {
462 InetAddress lh = InetAddress.getLoopbackAddress();
463 dc.bind(new InetSocketAddress(lh, 0));
464 runAfterParkedAsync(dc::close);
465 try {
466 dc.receive(ByteBuffer.allocate(100));
467 fail("receive returned");
468 } catch (AsynchronousCloseException expected) { }
469 }
470 });
471 }
472
473 /**
474 * Virtual thread interrupted while blocked in DatagramChannel receive.
475 */
476 @Test
477 void testDatagramChannelReceiveInterrupt() throws Exception {
478 VThreadRunner.run(() -> {
479 try (DatagramChannel dc = DatagramChannel.open()) {
480 InetAddress lh = InetAddress.getLoopbackAddress();
481 dc.bind(new InetSocketAddress(lh, 0));
482
483 // interrupt current thread when it blocks in receive
484 Thread thisThread = Thread.currentThread();
485 runAfterParkedAsync(thisThread::interrupt);
486
487 try {
488 dc.receive(ByteBuffer.allocate(100));
489 fail("receive returned");
490 } catch (ClosedByInterruptException expected) {
491 assertTrue(Thread.interrupted());
492 }
493 }
494 });
495 }
496
497 /**
498 * Virtual thread blocks in DatagramSocket adaptor receive.
499 */
500 @Test
501 void testDatagramSocketAdaptorReceive1() throws Exception {
502 testDatagramSocketAdaptorReceive(0);
503 }
504
505 /**
506 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
507 */
508 @Test
509 void testDatagramSocketAdaptorReceive2() throws Exception {
510 testDatagramSocketAdaptorReceive(60_000);
511 }
512
513 private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
514 VThreadRunner.run(() -> {
515 try (DatagramChannel dc1 = DatagramChannel.open();
516 DatagramChannel dc2 = DatagramChannel.open()) {
517
518 InetAddress lh = InetAddress.getLoopbackAddress();
519 dc2.bind(new InetSocketAddress(lh, 0));
520
521 // send from dc1 when current thread blocks in dc2 receive
522 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
523 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
524
525 // receive should block
526 byte[] array = new byte[100];
527 DatagramPacket p = new DatagramPacket(array, 0, array.length);
528 if (timeout > 0)
529 dc2.socket().setSoTimeout(timeout);
530 dc2.socket().receive(p);
531 assertTrue(p.getLength() == 3 && array[0] == 'X');
532 }
533 });
534 }
535
536 /**
537 * DatagramChannel close while virtual thread blocked in adaptor receive.
538 */
539 @Test
540 void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
541 testDatagramSocketAdaptorReceiveAsyncClose(0);
542 }
543
544 /**
545 * DatagramChannel close while virtual thread blocked in adaptor receive
546 * with timeout.
547 */
548 @Test
549 void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
550 testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
551 }
552
553 private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
554 VThreadRunner.run(() -> {
555 try (DatagramChannel dc = DatagramChannel.open()) {
556 InetAddress lh = InetAddress.getLoopbackAddress();
557 dc.bind(new InetSocketAddress(lh, 0));
558
559 byte[] array = new byte[100];
560 DatagramPacket p = new DatagramPacket(array, 0, array.length);
561 if (timeout > 0)
562 dc.socket().setSoTimeout(timeout);
563
564 // close channel/socket when current thread blocks in receive
565 runAfterParkedAsync(dc::close);
566
567 assertThrows(SocketException.class, () -> dc.socket().receive(p));
568 }
569 });
570 }
571
572 /**
573 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
574 */
575 @Test
576 void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
577 testDatagramSocketAdaptorReceiveInterrupt(0);
578 }
579
580 /**
581 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
582 * with timeout.
583 */
584 @Test
585 void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
586 testDatagramSocketAdaptorReceiveInterrupt(60_1000);
587 }
588
589 private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
590 VThreadRunner.run(() -> {
591 try (DatagramChannel dc = DatagramChannel.open()) {
592 InetAddress lh = InetAddress.getLoopbackAddress();
593 dc.bind(new InetSocketAddress(lh, 0));
594
595 byte[] array = new byte[100];
596 DatagramPacket p = new DatagramPacket(array, 0, array.length);
597 if (timeout > 0)
598 dc.socket().setSoTimeout(timeout);
599
600 // interrupt current thread when it blocks in receive
601 Thread thisThread = Thread.currentThread();
602 runAfterParkedAsync(thisThread::interrupt);
603
604 try {
605 dc.socket().receive(p);
606 fail();
607 } catch (ClosedByInterruptException expected) {
608 assertTrue(Thread.interrupted());
609 }
610 }
611 });
612 }
613
614 /**
615 * Pipe read/write, no blocking.
616 */
617 @Test
618 void testPipeReadWrite1() throws Exception {
619 VThreadRunner.run(() -> {
620 Pipe p = Pipe.open();
621 try (Pipe.SinkChannel sink = p.sink();
622 Pipe.SourceChannel source = p.source()) {
623
624 // write should not block
625 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
626 int n = sink.write(bb);
627 assertTrue(n > 0);
628
629 // read should not block
630 bb = ByteBuffer.allocate(10);
631 n = source.read(bb);
632 assertTrue(n > 0);
633 assertTrue(bb.get(0) == 'X');
634 }
635 });
636 }
637
638 /**
639 * Virtual thread blocks in Pipe.SourceChannel read.
640 */
641 @Test
642 void testPipeReadWrite2() throws Exception {
643 VThreadRunner.run(() -> {
644 Pipe p = Pipe.open();
645 try (Pipe.SinkChannel sink = p.sink();
646 Pipe.SourceChannel source = p.source()) {
647
648 // write from sink when current thread blocks reading from source
649 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
650 runAfterParkedAsync(() -> sink.write(bb1));
651
652 // read should block
653 ByteBuffer bb2 = ByteBuffer.allocate(10);
654 int n = source.read(bb2);
655 assertTrue(n > 0);
656 assertTrue(bb2.get(0) == 'X');
657 }
658 });
659 }
660
661 /**
662 * Virtual thread blocks in Pipe.SinkChannel write.
663 */
664 @Test
665 void testPipeReadWrite3() throws Exception {
666 VThreadRunner.run(() -> {
667 Pipe p = Pipe.open();
668 try (Pipe.SinkChannel sink = p.sink();
669 Pipe.SourceChannel source = p.source()) {
670
671 // read from source to EOF when current thread blocking in write
672 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
673
674 // write to sink should block
675 ByteBuffer bb = ByteBuffer.allocate(100*1024);
676 for (int i=0; i<1000; i++) {
677 int n = sink.write(bb);
678 assertTrue(n > 0);
679 bb.clear();
680 }
681 sink.close();
682
683 // wait for reader to finish
684 reader.join();
685 }
686 });
687 }
688
689 /**
690 * Pipe.SourceChannel close while virtual thread blocked in read.
691 */
692 @Test
693 void testPipeReadAsyncClose() throws Exception {
694 VThreadRunner.run(() -> {
695 Pipe p = Pipe.open();
696 try (Pipe.SinkChannel sink = p.sink();
697 Pipe.SourceChannel source = p.source()) {
698 runAfterParkedAsync(source::close);
699 try {
700 int n = source.read(ByteBuffer.allocate(100));
701 fail("read returned " + n);
702 } catch (AsynchronousCloseException expected) { }
703 }
704 });
705 }
706
707 /**
708 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
709 */
710 @Test
711 void testPipeReadInterrupt() throws Exception {
712 VThreadRunner.run(() -> {
713 Pipe p = Pipe.open();
714 try (Pipe.SinkChannel sink = p.sink();
715 Pipe.SourceChannel source = p.source()) {
716
717 // interrupt current thread when it blocks reading from source
718 Thread thisThread = Thread.currentThread();
719 runAfterParkedAsync(thisThread::interrupt);
720
721 try {
722 int n = source.read(ByteBuffer.allocate(100));
723 fail("read returned " + n);
724 } catch (ClosedByInterruptException expected) {
725 assertTrue(Thread.interrupted());
726 }
727 }
728 });
729 }
730
731 /**
732 * Pipe.SinkChannel close while virtual thread blocked in write.
733 */
734 @Test
735 void testPipeWriteAsyncClose() throws Exception {
736 VThreadRunner.run(() -> {
737 boolean retry = true;
738 while (retry) {
739 Pipe p = Pipe.open();
740 try (Pipe.SinkChannel sink = p.sink();
741 Pipe.SourceChannel source = p.source()) {
742
743 // close sink when current thread blocks in write
744 runAfterParkedAsync(sink::close);
745 try {
746 ByteBuffer bb = ByteBuffer.allocate(100*1024);
747 for (;;) {
748 int n = sink.write(bb);
749 assertTrue(n > 0);
750 bb.clear();
751 }
752 } catch (AsynchronousCloseException e) {
753 // closed when blocked in write
754 retry = false;
755 } catch (ClosedChannelException e) {
756 // closed when not blocked in write, need to retry test
757 }
758 }
759 }
760 });
761 }
762
763 /**
764 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
765 */
766 @Test
767 void testPipeWriteInterrupt() throws Exception {
768 VThreadRunner.run(() -> {
769 boolean retry = true;
770 while (retry) {
771 Pipe p = Pipe.open();
772 try (Pipe.SinkChannel sink = p.sink();
773 Pipe.SourceChannel source = p.source()) {
774
775 // interrupt current thread when it blocks in write
776 Thread thisThread = Thread.currentThread();
777 runAfterParkedAsync(thisThread::interrupt);
778
779 try {
780 ByteBuffer bb = ByteBuffer.allocate(100*1024);
781 for (;;) {
782 int n = sink.write(bb);
783 assertTrue(n > 0);
784 bb.clear();
785 }
786 } catch (ClosedByInterruptException expected) {
787 // closed when blocked in write
788 assertTrue(Thread.interrupted());
|
45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.DatagramPacket;
50 import java.net.InetAddress;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.net.SocketAddress;
54 import java.net.SocketException;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AsynchronousCloseException;
57 import java.nio.channels.ClosedByInterruptException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.Pipe;
61 import java.nio.channels.ReadableByteChannel;
62 import java.nio.channels.ServerSocketChannel;
63 import java.nio.channels.SocketChannel;
64 import java.nio.channels.WritableByteChannel;
65 import java.util.concurrent.ExecutorService;
66 import java.util.concurrent.Executors;
67 import java.util.stream.Stream;
68
69 import jdk.test.lib.thread.VThreadRunner;
70 import jdk.test.lib.thread.VThreadScheduler;
71
72 import org.junit.jupiter.api.BeforeAll;
73 import org.junit.jupiter.api.AfterAll;
74 import org.junit.jupiter.params.ParameterizedTest;
75 import org.junit.jupiter.params.provider.MethodSource;
76 import static org.junit.jupiter.api.Assertions.*;
77
78 class BlockingChannelOps {
79 private static ExecutorService threadPool;
80 private static Thread.VirtualThreadScheduler customScheduler;
81
82 @BeforeAll
83 static void setup() {
84 threadPool = Executors.newCachedThreadPool();
85 customScheduler = (_, task) -> threadPool.execute(task);
86 }
87
88 @AfterAll
89 static void finish() {
90 threadPool.shutdown();
91 }
92
93 /**
94 * Returns the Thread.Builder to create the virtual thread.
95 */
96 static Stream<Thread.Builder.OfVirtual> threadBuilders() {
97 if (VThreadScheduler.supportsCustomScheduler()) {
98 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
99 } else {
100 return Stream.of(Thread.ofVirtual());
101 }
102 }
103
104 /**
105 * SocketChannel read/write, no blocking.
106 */
107 @ParameterizedTest
108 @MethodSource("threadBuilders")
109 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
110 VThreadRunner.run(builder, () -> {
111 try (var connection = new Connection()) {
112 SocketChannel sc1 = connection.channel1();
113 SocketChannel sc2 = connection.channel2();
114
115 // write to sc1
116 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
117 int n = sc1.write(bb);
118 assertTrue(n > 0);
119
120 // read from sc2 should not block
121 bb = ByteBuffer.allocate(10);
122 n = sc2.read(bb);
123 assertTrue(n > 0);
124 assertTrue(bb.get(0) == 'X');
125 }
126 });
127 }
128
129 /**
130 * Virtual thread blocks in SocketChannel read.
131 */
132 @ParameterizedTest
133 @MethodSource("threadBuilders")
134 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
135 VThreadRunner.run(builder, () -> {
136 try (var connection = new Connection()) {
137 SocketChannel sc1 = connection.channel1();
138 SocketChannel sc2 = connection.channel2();
139
140 // write to sc1 when current thread blocks in sc2.read
141 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
142 runAfterParkedAsync(() -> sc1.write(bb1));
143
144 // read from sc2 should block
145 ByteBuffer bb2 = ByteBuffer.allocate(10);
146 int n = sc2.read(bb2);
147 assertTrue(n > 0);
148 assertTrue(bb2.get(0) == 'X');
149 }
150 });
151 }
152
153 /**
154 * Virtual thread blocks in SocketChannel write.
155 */
156 @ParameterizedTest
157 @MethodSource("threadBuilders")
158 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
159 VThreadRunner.run(builder, () -> {
160 try (var connection = new Connection()) {
161 SocketChannel sc1 = connection.channel1();
162 SocketChannel sc2 = connection.channel2();
163
164 // read from sc2 to EOF when current thread blocks in sc1.write
165 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
166
167 // write to sc1 should block
168 ByteBuffer bb = ByteBuffer.allocate(100*1024);
169 for (int i=0; i<1000; i++) {
170 int n = sc1.write(bb);
171 assertTrue(n > 0);
172 bb.clear();
173 }
174 sc1.close();
175
176 // wait for reader to finish
177 reader.join();
178 }
179 });
180 }
181
182 /**
183 * SocketChannel close while virtual thread blocked in read.
184 */
185 @ParameterizedTest
186 @MethodSource("threadBuilders")
187 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
188 VThreadRunner.run(builder, () -> {
189 try (var connection = new Connection()) {
190 SocketChannel sc = connection.channel1();
191 runAfterParkedAsync(sc::close);
192 try {
193 int n = sc.read(ByteBuffer.allocate(100));
194 fail("read returned " + n);
195 } catch (AsynchronousCloseException expected) { }
196 }
197 });
198 }
199
200 /**
201 * SocketChannel shutdownInput while virtual thread blocked in read.
202 */
203 @ParameterizedTest
204 @MethodSource("threadBuilders")
205 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
206 VThreadRunner.run(builder, () -> {
207 try (var connection = new Connection()) {
208 SocketChannel sc = connection.channel1();
209 runAfterParkedAsync(sc::shutdownInput);
210 int n = sc.read(ByteBuffer.allocate(100));
211 assertEquals(-1, n);
212 assertTrue(sc.isOpen());
213 }
214 });
215 }
216
217 /**
218 * Virtual thread interrupted while blocked in SocketChannel read.
219 */
220 @ParameterizedTest
221 @MethodSource("threadBuilders")
222 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
223 VThreadRunner.run(builder, () -> {
224 try (var connection = new Connection()) {
225 SocketChannel sc = connection.channel1();
226
227 // interrupt current thread when it blocks in read
228 Thread thisThread = Thread.currentThread();
229 runAfterParkedAsync(thisThread::interrupt);
230
231 try {
232 int n = sc.read(ByteBuffer.allocate(100));
233 fail("read returned " + n);
234 } catch (ClosedByInterruptException expected) {
235 assertTrue(Thread.interrupted());
236 }
237 }
238 });
239 }
240
241 /**
242 * SocketChannel close while virtual thread blocked in write.
243 */
244 @ParameterizedTest
245 @MethodSource("threadBuilders")
246 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
247 VThreadRunner.run(builder, () -> {
248 boolean retry = true;
249 while (retry) {
250 try (var connection = new Connection()) {
251 SocketChannel sc = connection.channel1();
252
253 // close sc when current thread blocks in write
254 runAfterParkedAsync(sc::close);
255 try {
256 ByteBuffer bb = ByteBuffer.allocate(100*1024);
257 for (;;) {
258 int n = sc.write(bb);
259 assertTrue(n > 0);
260 bb.clear();
261 }
262 } catch (AsynchronousCloseException expected) {
263 // closed when blocked in write
264 retry = false;
265 } catch (ClosedChannelException e) {
266 // closed when not blocked in write, need to retry test
267 }
268 }
269 }
270 });
271 }
272
273
274 /**
275 * SocketChannel shutdownOutput while virtual thread blocked in write.
276 */
277 @ParameterizedTest
278 @MethodSource("threadBuilders")
279 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
280 VThreadRunner.run(builder, () -> {
281 try (var connection = new Connection()) {
282 SocketChannel sc = connection.channel1();
283
284 // shutdown output when current thread blocks in write
285 runAfterParkedAsync(sc::shutdownOutput);
286 try {
287 ByteBuffer bb = ByteBuffer.allocate(100*1024);
288 for (;;) {
289 int n = sc.write(bb);
290 assertTrue(n > 0);
291 bb.clear();
292 }
293 } catch (ClosedChannelException e) {
294 // expected
295 }
296 assertTrue(sc.isOpen());
297 }
298 });
299 }
300
301 /**
302 * Virtual thread interrupted while blocked in SocketChannel write.
303 */
304 @ParameterizedTest
305 @MethodSource("threadBuilders")
306 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
307 VThreadRunner.run(builder, () -> {
308 boolean retry = true;
309 while (retry) {
310 try (var connection = new Connection()) {
311 SocketChannel sc = connection.channel1();
312
313 // interrupt current thread when it blocks in write
314 Thread thisThread = Thread.currentThread();
315 runAfterParkedAsync(thisThread::interrupt);
316
317 try {
318 ByteBuffer bb = ByteBuffer.allocate(100*1024);
319 for (;;) {
320 int n = sc.write(bb);
321 assertTrue(n > 0);
322 bb.clear();
323 }
324 } catch (ClosedByInterruptException e) {
325 // closed when blocked in write
326 assertTrue(Thread.interrupted());
327 retry = false;
328 } catch (ClosedChannelException e) {
329 // closed when not blocked in write, need to retry test
330 }
331 }
332 }
333 });
334 }
335
336 /**
337 * Virtual thread blocks in SocketChannel adaptor read.
338 */
339 @ParameterizedTest
340 @MethodSource("threadBuilders")
341 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
342 testSocketAdaptorRead(builder, 0);
343 }
344
345 /**
346 * Virtual thread blocks in SocketChannel adaptor read with timeout.
347 */
348 @ParameterizedTest
349 @MethodSource("threadBuilders")
350 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
351 testSocketAdaptorRead(builder, 60_000);
352 }
353
354 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
355 int timeout) throws Exception {
356 VThreadRunner.run(builder, () -> {
357 try (var connection = new Connection()) {
358 SocketChannel sc1 = connection.channel1();
359 SocketChannel sc2 = connection.channel2();
360
361 // write to sc1 when currnet thread blocks reading from sc2
362 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
363 runAfterParkedAsync(() -> sc1.write(bb));
364
365 // read from sc2 should block
366 byte[] array = new byte[100];
367 if (timeout > 0)
368 sc2.socket().setSoTimeout(timeout);
369 int n = sc2.socket().getInputStream().read(array);
370 assertTrue(n > 0);
371 assertTrue(array[0] == 'X');
372 }
373 });
374 }
375
376 /**
377 * ServerSocketChannel accept, no blocking.
378 */
379 @ParameterizedTest
380 @MethodSource("threadBuilders")
381 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
382 VThreadRunner.run(builder, () -> {
383 try (var ssc = ServerSocketChannel.open()) {
384 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
385 var sc1 = SocketChannel.open(ssc.getLocalAddress());
386 // accept should not block
387 var sc2 = ssc.accept();
388 sc1.close();
389 sc2.close();
390 }
391 });
392 }
393
394 /**
395 * Virtual thread blocks in ServerSocketChannel accept.
396 */
397 @ParameterizedTest
398 @MethodSource("threadBuilders")
399 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
400 VThreadRunner.run(builder, () -> {
401 try (var ssc = ServerSocketChannel.open()) {
402 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
403 var sc1 = SocketChannel.open();
404
405 // connect when current thread when it blocks in accept
406 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
407
408 // accept should block
409 var sc2 = ssc.accept();
410 sc1.close();
411 sc2.close();
412 }
413 });
414 }
415
416 /**
417 * SeverSocketChannel close while virtual thread blocked in accept.
418 */
419 @ParameterizedTest
420 @MethodSource("threadBuilders")
421 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
422 VThreadRunner.run(builder, () -> {
423 try (var ssc = ServerSocketChannel.open()) {
424 InetAddress lh = InetAddress.getLoopbackAddress();
425 ssc.bind(new InetSocketAddress(lh, 0));
426 runAfterParkedAsync(ssc::close);
427 try {
428 SocketChannel sc = ssc.accept();
429 sc.close();
430 fail("connection accepted???");
431 } catch (AsynchronousCloseException expected) { }
432 }
433 });
434 }
435
436 /**
437 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
438 */
439 @ParameterizedTest
440 @MethodSource("threadBuilders")
441 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
442 VThreadRunner.run(builder, () -> {
443 try (var ssc = ServerSocketChannel.open()) {
444 InetAddress lh = InetAddress.getLoopbackAddress();
445 ssc.bind(new InetSocketAddress(lh, 0));
446
447 // interrupt current thread when it blocks in accept
448 Thread thisThread = Thread.currentThread();
449 runAfterParkedAsync(thisThread::interrupt);
450
451 try {
452 SocketChannel sc = ssc.accept();
453 sc.close();
454 fail("connection accepted???");
455 } catch (ClosedByInterruptException expected) {
456 assertTrue(Thread.interrupted());
457 }
458 }
459 });
460 }
461
462 /**
463 * Virtual thread blocks in ServerSocketChannel adaptor accept.
464 */
465 @ParameterizedTest
466 @MethodSource("threadBuilders")
467 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
468 testSocketChannelAdaptorAccept(builder, 0);
469 }
470
471 /**
472 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
473 */
474 @ParameterizedTest
475 @MethodSource("threadBuilders")
476 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
477 testSocketChannelAdaptorAccept(builder, 60_000);
478 }
479
480 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
481 int timeout) throws Exception {
482 VThreadRunner.run(builder, () -> {
483 try (var ssc = ServerSocketChannel.open()) {
484 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
485 var sc = SocketChannel.open();
486
487 // interrupt current thread when it blocks in accept
488 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
489
490 // accept should block
491 if (timeout > 0)
492 ssc.socket().setSoTimeout(timeout);
493 Socket s = ssc.socket().accept();
494 sc.close();
495 s.close();
496 }
497 });
498 }
499
500 /**
501 * DatagramChannel receive/send, no blocking.
502 */
503 @ParameterizedTest
504 @MethodSource("threadBuilders")
505 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
506 VThreadRunner.run(builder, () -> {
507 try (DatagramChannel dc1 = DatagramChannel.open();
508 DatagramChannel dc2 = DatagramChannel.open()) {
509
510 InetAddress lh = InetAddress.getLoopbackAddress();
511 dc2.bind(new InetSocketAddress(lh, 0));
512
513 // send should not block
514 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
515 int n = dc1.send(bb, dc2.getLocalAddress());
516 assertTrue(n > 0);
517
518 // receive should not block
519 bb = ByteBuffer.allocate(10);
520 dc2.receive(bb);
521 assertTrue(bb.get(0) == 'X');
522 }
523 });
524 }
525
526 /**
527 * Virtual thread blocks in DatagramChannel receive.
528 */
529 @ParameterizedTest
530 @MethodSource("threadBuilders")
531 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
532 VThreadRunner.run(builder, () -> {
533 try (DatagramChannel dc1 = DatagramChannel.open();
534 DatagramChannel dc2 = DatagramChannel.open()) {
535
536 InetAddress lh = InetAddress.getLoopbackAddress();
537 dc2.bind(new InetSocketAddress(lh, 0));
538
539 // send from dc1 when current thread blocked in dc2.receive
540 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
541 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
542
543 // read from dc2 should block
544 ByteBuffer bb2 = ByteBuffer.allocate(10);
545 dc2.receive(bb2);
546 assertTrue(bb2.get(0) == 'X');
547 }
548 });
549 }
550
551 /**
552 * DatagramChannel close while virtual thread blocked in receive.
553 */
554 @ParameterizedTest
555 @MethodSource("threadBuilders")
556 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
557 VThreadRunner.run(builder, () -> {
558 try (DatagramChannel dc = DatagramChannel.open()) {
559 InetAddress lh = InetAddress.getLoopbackAddress();
560 dc.bind(new InetSocketAddress(lh, 0));
561 runAfterParkedAsync(dc::close);
562 try {
563 dc.receive(ByteBuffer.allocate(100));
564 fail("receive returned");
565 } catch (AsynchronousCloseException expected) { }
566 }
567 });
568 }
569
570 /**
571 * Virtual thread interrupted while blocked in DatagramChannel receive.
572 */
573 @ParameterizedTest
574 @MethodSource("threadBuilders")
575 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
576 VThreadRunner.run(builder, () -> {
577 try (DatagramChannel dc = DatagramChannel.open()) {
578 InetAddress lh = InetAddress.getLoopbackAddress();
579 dc.bind(new InetSocketAddress(lh, 0));
580
581 // interrupt current thread when it blocks in receive
582 Thread thisThread = Thread.currentThread();
583 runAfterParkedAsync(thisThread::interrupt);
584
585 try {
586 dc.receive(ByteBuffer.allocate(100));
587 fail("receive returned");
588 } catch (ClosedByInterruptException expected) {
589 assertTrue(Thread.interrupted());
590 }
591 }
592 });
593 }
594
595 /**
596 * Virtual thread blocks in DatagramSocket adaptor receive.
597 */
598 @ParameterizedTest
599 @MethodSource("threadBuilders")
600 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
601 testDatagramSocketAdaptorReceive(builder, 0);
602 }
603
604 /**
605 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
606 */
607 @ParameterizedTest
608 @MethodSource("threadBuilders")
609 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
610 testDatagramSocketAdaptorReceive(builder, 60_000);
611 }
612
613 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
614 int timeout) throws Exception {
615 VThreadRunner.run(builder, () -> {
616 try (DatagramChannel dc1 = DatagramChannel.open();
617 DatagramChannel dc2 = DatagramChannel.open()) {
618
619 InetAddress lh = InetAddress.getLoopbackAddress();
620 dc2.bind(new InetSocketAddress(lh, 0));
621
622 // send from dc1 when current thread blocks in dc2 receive
623 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
624 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
625
626 // receive should block
627 byte[] array = new byte[100];
628 DatagramPacket p = new DatagramPacket(array, 0, array.length);
629 if (timeout > 0)
630 dc2.socket().setSoTimeout(timeout);
631 dc2.socket().receive(p);
632 assertTrue(p.getLength() == 3 && array[0] == 'X');
633 }
634 });
635 }
636
637 /**
638 * DatagramChannel close while virtual thread blocked in adaptor receive.
639 */
640 @ParameterizedTest
641 @MethodSource("threadBuilders")
642 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
643 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
644 }
645
646 /**
647 * DatagramChannel close while virtual thread blocked in adaptor receive
648 * with timeout.
649 */
650 @ParameterizedTest
651 @MethodSource("threadBuilders")
652 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
653 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
654 }
655
656 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
657 int timeout) throws Exception {
658 VThreadRunner.run(builder, () -> {
659 try (DatagramChannel dc = DatagramChannel.open()) {
660 InetAddress lh = InetAddress.getLoopbackAddress();
661 dc.bind(new InetSocketAddress(lh, 0));
662
663 byte[] array = new byte[100];
664 DatagramPacket p = new DatagramPacket(array, 0, array.length);
665 if (timeout > 0)
666 dc.socket().setSoTimeout(timeout);
667
668 // close channel/socket when current thread blocks in receive
669 runAfterParkedAsync(dc::close);
670
671 assertThrows(SocketException.class, () -> dc.socket().receive(p));
672 }
673 });
674 }
675
676 /**
677 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
678 */
679 @ParameterizedTest
680 @MethodSource("threadBuilders")
681 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
682 testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
683 }
684
685 /**
686 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
687 * with timeout.
688 */
689 @ParameterizedTest
690 @MethodSource("threadBuilders")
691 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
692 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
693 }
694
695 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
696 int timeout) throws Exception {
697 VThreadRunner.run(builder, () -> {
698 try (DatagramChannel dc = DatagramChannel.open()) {
699 InetAddress lh = InetAddress.getLoopbackAddress();
700 dc.bind(new InetSocketAddress(lh, 0));
701
702 byte[] array = new byte[100];
703 DatagramPacket p = new DatagramPacket(array, 0, array.length);
704 if (timeout > 0)
705 dc.socket().setSoTimeout(timeout);
706
707 // interrupt current thread when it blocks in receive
708 Thread thisThread = Thread.currentThread();
709 runAfterParkedAsync(thisThread::interrupt);
710
711 try {
712 dc.socket().receive(p);
713 fail();
714 } catch (ClosedByInterruptException expected) {
715 assertTrue(Thread.interrupted());
716 }
717 }
718 });
719 }
720
721 /**
722 * Pipe read/write, no blocking.
723 */
724 @ParameterizedTest
725 @MethodSource("threadBuilders")
726 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
727 VThreadRunner.run(builder, () -> {
728 Pipe p = Pipe.open();
729 try (Pipe.SinkChannel sink = p.sink();
730 Pipe.SourceChannel source = p.source()) {
731
732 // write should not block
733 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
734 int n = sink.write(bb);
735 assertTrue(n > 0);
736
737 // read should not block
738 bb = ByteBuffer.allocate(10);
739 n = source.read(bb);
740 assertTrue(n > 0);
741 assertTrue(bb.get(0) == 'X');
742 }
743 });
744 }
745
746 /**
747 * Virtual thread blocks in Pipe.SourceChannel read.
748 */
749 @ParameterizedTest
750 @MethodSource("threadBuilders")
751 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
752 VThreadRunner.run(builder, () -> {
753 Pipe p = Pipe.open();
754 try (Pipe.SinkChannel sink = p.sink();
755 Pipe.SourceChannel source = p.source()) {
756
757 // write from sink when current thread blocks reading from source
758 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
759 runAfterParkedAsync(() -> sink.write(bb1));
760
761 // read should block
762 ByteBuffer bb2 = ByteBuffer.allocate(10);
763 int n = source.read(bb2);
764 assertTrue(n > 0);
765 assertTrue(bb2.get(0) == 'X');
766 }
767 });
768 }
769
770 /**
771 * Virtual thread blocks in Pipe.SinkChannel write.
772 */
773 @ParameterizedTest
774 @MethodSource("threadBuilders")
775 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
776 VThreadRunner.run(builder, () -> {
777 Pipe p = Pipe.open();
778 try (Pipe.SinkChannel sink = p.sink();
779 Pipe.SourceChannel source = p.source()) {
780
781 // read from source to EOF when current thread blocking in write
782 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
783
784 // write to sink should block
785 ByteBuffer bb = ByteBuffer.allocate(100*1024);
786 for (int i=0; i<1000; i++) {
787 int n = sink.write(bb);
788 assertTrue(n > 0);
789 bb.clear();
790 }
791 sink.close();
792
793 // wait for reader to finish
794 reader.join();
795 }
796 });
797 }
798
799 /**
800 * Pipe.SourceChannel close while virtual thread blocked in read.
801 */
802 @ParameterizedTest
803 @MethodSource("threadBuilders")
804 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
805 VThreadRunner.run(builder, () -> {
806 Pipe p = Pipe.open();
807 try (Pipe.SinkChannel sink = p.sink();
808 Pipe.SourceChannel source = p.source()) {
809 runAfterParkedAsync(source::close);
810 try {
811 int n = source.read(ByteBuffer.allocate(100));
812 fail("read returned " + n);
813 } catch (AsynchronousCloseException expected) { }
814 }
815 });
816 }
817
818 /**
819 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
820 */
821 @ParameterizedTest
822 @MethodSource("threadBuilders")
823 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
824 VThreadRunner.run(builder, () -> {
825 Pipe p = Pipe.open();
826 try (Pipe.SinkChannel sink = p.sink();
827 Pipe.SourceChannel source = p.source()) {
828
829 // interrupt current thread when it blocks reading from source
830 Thread thisThread = Thread.currentThread();
831 runAfterParkedAsync(thisThread::interrupt);
832
833 try {
834 int n = source.read(ByteBuffer.allocate(100));
835 fail("read returned " + n);
836 } catch (ClosedByInterruptException expected) {
837 assertTrue(Thread.interrupted());
838 }
839 }
840 });
841 }
842
843 /**
844 * Pipe.SinkChannel close while virtual thread blocked in write.
845 */
846 @ParameterizedTest
847 @MethodSource("threadBuilders")
848 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
849 VThreadRunner.run(builder, () -> {
850 boolean retry = true;
851 while (retry) {
852 Pipe p = Pipe.open();
853 try (Pipe.SinkChannel sink = p.sink();
854 Pipe.SourceChannel source = p.source()) {
855
856 // close sink when current thread blocks in write
857 runAfterParkedAsync(sink::close);
858 try {
859 ByteBuffer bb = ByteBuffer.allocate(100*1024);
860 for (;;) {
861 int n = sink.write(bb);
862 assertTrue(n > 0);
863 bb.clear();
864 }
865 } catch (AsynchronousCloseException e) {
866 // closed when blocked in write
867 retry = false;
868 } catch (ClosedChannelException e) {
869 // closed when not blocked in write, need to retry test
870 }
871 }
872 }
873 });
874 }
875
876 /**
877 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
878 */
879 @ParameterizedTest
880 @MethodSource("threadBuilders")
881 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
882 VThreadRunner.run(builder, () -> {
883 boolean retry = true;
884 while (retry) {
885 Pipe p = Pipe.open();
886 try (Pipe.SinkChannel sink = p.sink();
887 Pipe.SourceChannel source = p.source()) {
888
889 // interrupt current thread when it blocks in write
890 Thread thisThread = Thread.currentThread();
891 runAfterParkedAsync(thisThread::interrupt);
892
893 try {
894 ByteBuffer bb = ByteBuffer.allocate(100*1024);
895 for (;;) {
896 int n = sink.write(bb);
897 assertTrue(n > 0);
898 bb.clear();
899 }
900 } catch (ClosedByInterruptException expected) {
901 // closed when blocked in write
902 assertTrue(Thread.interrupted());
|