45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.DatagramPacket;
50 import java.net.InetAddress;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.net.SocketAddress;
54 import java.net.SocketException;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AsynchronousCloseException;
57 import java.nio.channels.ClosedByInterruptException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.Pipe;
61 import java.nio.channels.ReadableByteChannel;
62 import java.nio.channels.ServerSocketChannel;
63 import java.nio.channels.SocketChannel;
64 import java.nio.channels.WritableByteChannel;
65
66 import jdk.test.lib.thread.VThreadRunner;
67 import org.junit.jupiter.api.Test;
68 import static org.junit.jupiter.api.Assertions.*;
69
70 class BlockingChannelOps {
71
72 /**
73 * SocketChannel read/write, no blocking.
74 */
75 @Test
76 void testSocketChannelReadWrite1() throws Exception {
77 VThreadRunner.run(() -> {
78 try (var connection = new Connection()) {
79 SocketChannel sc1 = connection.channel1();
80 SocketChannel sc2 = connection.channel2();
81
82 // write to sc1
83 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
84 int n = sc1.write(bb);
85 assertTrue(n > 0);
86
87 // read from sc2 should not block
88 bb = ByteBuffer.allocate(10);
89 n = sc2.read(bb);
90 assertTrue(n > 0);
91 assertTrue(bb.get(0) == 'X');
92 }
93 });
94 }
95
96 /**
97 * Virtual thread blocks in SocketChannel read.
98 */
99 @Test
100 void testSocketChannelRead() throws Exception {
101 VThreadRunner.run(() -> {
102 try (var connection = new Connection()) {
103 SocketChannel sc1 = connection.channel1();
104 SocketChannel sc2 = connection.channel2();
105
106 // write to sc1 when current thread blocks in sc2.read
107 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
108 runAfterParkedAsync(() -> sc1.write(bb1));
109
110 // read from sc2 should block
111 ByteBuffer bb2 = ByteBuffer.allocate(10);
112 int n = sc2.read(bb2);
113 assertTrue(n > 0);
114 assertTrue(bb2.get(0) == 'X');
115 }
116 });
117 }
118
119 /**
120 * Virtual thread blocks in SocketChannel write.
121 */
122 @Test
123 void testSocketChannelWrite() throws Exception {
124 VThreadRunner.run(() -> {
125 try (var connection = new Connection()) {
126 SocketChannel sc1 = connection.channel1();
127 SocketChannel sc2 = connection.channel2();
128
129 // read from sc2 to EOF when current thread blocks in sc1.write
130 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
131
132 // write to sc1 should block
133 ByteBuffer bb = ByteBuffer.allocate(100*1024);
134 for (int i=0; i<1000; i++) {
135 int n = sc1.write(bb);
136 assertTrue(n > 0);
137 bb.clear();
138 }
139 sc1.close();
140
141 // wait for reader to finish
142 reader.join();
143 }
144 });
145 }
146
147 /**
148 * SocketChannel close while virtual thread blocked in read.
149 */
150 @Test
151 void testSocketChannelReadAsyncClose() throws Exception {
152 VThreadRunner.run(() -> {
153 try (var connection = new Connection()) {
154 SocketChannel sc = connection.channel1();
155 runAfterParkedAsync(sc::close);
156 try {
157 int n = sc.read(ByteBuffer.allocate(100));
158 fail("read returned " + n);
159 } catch (AsynchronousCloseException expected) { }
160 }
161 });
162 }
163
164 /**
165 * Virtual thread interrupted while blocked in SocketChannel read.
166 */
167 @Test
168 void testSocketChannelReadInterrupt() throws Exception {
169 VThreadRunner.run(() -> {
170 try (var connection = new Connection()) {
171 SocketChannel sc = connection.channel1();
172
173 // interrupt current thread when it blocks in read
174 Thread thisThread = Thread.currentThread();
175 runAfterParkedAsync(thisThread::interrupt);
176
177 try {
178 int n = sc.read(ByteBuffer.allocate(100));
179 fail("read returned " + n);
180 } catch (ClosedByInterruptException expected) {
181 assertTrue(Thread.interrupted());
182 }
183 }
184 });
185 }
186
187 /**
188 * SocketChannel close while virtual thread blocked in write.
189 */
190 @Test
191 void testSocketChannelWriteAsyncClose() throws Exception {
192 VThreadRunner.run(() -> {
193 boolean retry = true;
194 while (retry) {
195 try (var connection = new Connection()) {
196 SocketChannel sc = connection.channel1();
197
198 // close sc when current thread blocks in write
199 runAfterParkedAsync(sc::close);
200 try {
201 ByteBuffer bb = ByteBuffer.allocate(100*1024);
202 for (;;) {
203 int n = sc.write(bb);
204 assertTrue(n > 0);
205 bb.clear();
206 }
207 } catch (AsynchronousCloseException expected) {
208 // closed when blocked in write
209 retry = false;
210 } catch (ClosedChannelException e) {
211 // closed when not blocked in write, need to retry test
212 }
213 }
214 }
215 });
216 }
217
218 /**
219 * Virtual thread interrupted while blocked in SocketChannel write.
220 */
221 @Test
222 void testSocketChannelWriteInterrupt() throws Exception {
223 VThreadRunner.run(() -> {
224 boolean retry = true;
225 while (retry) {
226 try (var connection = new Connection()) {
227 SocketChannel sc = connection.channel1();
228
229 // interrupt current thread when it blocks in write
230 Thread thisThread = Thread.currentThread();
231 runAfterParkedAsync(thisThread::interrupt);
232
233 try {
234 ByteBuffer bb = ByteBuffer.allocate(100*1024);
235 for (;;) {
236 int n = sc.write(bb);
237 assertTrue(n > 0);
238 bb.clear();
239 }
240 } catch (ClosedByInterruptException e) {
241 // closed when blocked in write
242 assertTrue(Thread.interrupted());
243 retry = false;
244 } catch (ClosedChannelException e) {
245 // closed when not blocked in write, need to retry test
246 }
247 }
248 }
249 });
250 }
251
252 /**
253 * Virtual thread blocks in SocketChannel adaptor read.
254 */
255 @Test
256 void testSocketAdaptorRead1() throws Exception {
257 testSocketAdaptorRead(0);
258 }
259
260 /**
261 * Virtual thread blocks in SocketChannel adaptor read with timeout.
262 */
263 @Test
264 void testSocketAdaptorRead2() throws Exception {
265 testSocketAdaptorRead(60_000);
266 }
267
268 private void testSocketAdaptorRead(int timeout) throws Exception {
269 VThreadRunner.run(() -> {
270 try (var connection = new Connection()) {
271 SocketChannel sc1 = connection.channel1();
272 SocketChannel sc2 = connection.channel2();
273
274 // write to sc1 when currnet thread blocks reading from sc2
275 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
276 runAfterParkedAsync(() -> sc1.write(bb));
277
278 // read from sc2 should block
279 byte[] array = new byte[100];
280 if (timeout > 0)
281 sc2.socket().setSoTimeout(timeout);
282 int n = sc2.socket().getInputStream().read(array);
283 assertTrue(n > 0);
284 assertTrue(array[0] == 'X');
285 }
286 });
287 }
288
289 /**
290 * ServerSocketChannel accept, no blocking.
291 */
292 @Test
293 void testServerSocketChannelAccept1() throws Exception {
294 VThreadRunner.run(() -> {
295 try (var ssc = ServerSocketChannel.open()) {
296 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
297 var sc1 = SocketChannel.open(ssc.getLocalAddress());
298 // accept should not block
299 var sc2 = ssc.accept();
300 sc1.close();
301 sc2.close();
302 }
303 });
304 }
305
306 /**
307 * Virtual thread blocks in ServerSocketChannel accept.
308 */
309 @Test
310 void testServerSocketChannelAccept2() throws Exception {
311 VThreadRunner.run(() -> {
312 try (var ssc = ServerSocketChannel.open()) {
313 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
314 var sc1 = SocketChannel.open();
315
316 // connect when current thread when it blocks in accept
317 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
318
319 // accept should block
320 var sc2 = ssc.accept();
321 sc1.close();
322 sc2.close();
323 }
324 });
325 }
326
327 /**
328 * SeverSocketChannel close while virtual thread blocked in accept.
329 */
330 @Test
331 void testServerSocketChannelAcceptAsyncClose() throws Exception {
332 VThreadRunner.run(() -> {
333 try (var ssc = ServerSocketChannel.open()) {
334 InetAddress lh = InetAddress.getLoopbackAddress();
335 ssc.bind(new InetSocketAddress(lh, 0));
336 runAfterParkedAsync(ssc::close);
337 try {
338 SocketChannel sc = ssc.accept();
339 sc.close();
340 fail("connection accepted???");
341 } catch (AsynchronousCloseException expected) { }
342 }
343 });
344 }
345
346 /**
347 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
348 */
349 @Test
350 void testServerSocketChannelAcceptInterrupt() throws Exception {
351 VThreadRunner.run(() -> {
352 try (var ssc = ServerSocketChannel.open()) {
353 InetAddress lh = InetAddress.getLoopbackAddress();
354 ssc.bind(new InetSocketAddress(lh, 0));
355
356 // interrupt current thread when it blocks in accept
357 Thread thisThread = Thread.currentThread();
358 runAfterParkedAsync(thisThread::interrupt);
359
360 try {
361 SocketChannel sc = ssc.accept();
362 sc.close();
363 fail("connection accepted???");
364 } catch (ClosedByInterruptException expected) {
365 assertTrue(Thread.interrupted());
366 }
367 }
368 });
369 }
370
371 /**
372 * Virtual thread blocks in ServerSocketChannel adaptor accept.
373 */
374 @Test
375 void testSocketChannelAdaptorAccept1() throws Exception {
376 testSocketChannelAdaptorAccept(0);
377 }
378
379 /**
380 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
381 */
382 @Test
383 void testSocketChannelAdaptorAccept2() throws Exception {
384 testSocketChannelAdaptorAccept(60_000);
385 }
386
387 private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
388 VThreadRunner.run(() -> {
389 try (var ssc = ServerSocketChannel.open()) {
390 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
391 var sc = SocketChannel.open();
392
393 // interrupt current thread when it blocks in accept
394 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
395
396 // accept should block
397 if (timeout > 0)
398 ssc.socket().setSoTimeout(timeout);
399 Socket s = ssc.socket().accept();
400 sc.close();
401 s.close();
402 }
403 });
404 }
405
406 /**
407 * DatagramChannel receive/send, no blocking.
408 */
409 @Test
410 void testDatagramChannelSendReceive1() throws Exception {
411 VThreadRunner.run(() -> {
412 try (DatagramChannel dc1 = DatagramChannel.open();
413 DatagramChannel dc2 = DatagramChannel.open()) {
414
415 InetAddress lh = InetAddress.getLoopbackAddress();
416 dc2.bind(new InetSocketAddress(lh, 0));
417
418 // send should not block
419 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
420 int n = dc1.send(bb, dc2.getLocalAddress());
421 assertTrue(n > 0);
422
423 // receive should not block
424 bb = ByteBuffer.allocate(10);
425 dc2.receive(bb);
426 assertTrue(bb.get(0) == 'X');
427 }
428 });
429 }
430
431 /**
432 * Virtual thread blocks in DatagramChannel receive.
433 */
434 @Test
435 void testDatagramChannelSendReceive2() throws Exception {
436 VThreadRunner.run(() -> {
437 try (DatagramChannel dc1 = DatagramChannel.open();
438 DatagramChannel dc2 = DatagramChannel.open()) {
439
440 InetAddress lh = InetAddress.getLoopbackAddress();
441 dc2.bind(new InetSocketAddress(lh, 0));
442
443 // send from dc1 when current thread blocked in dc2.receive
444 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
445 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
446
447 // read from dc2 should block
448 ByteBuffer bb2 = ByteBuffer.allocate(10);
449 dc2.receive(bb2);
450 assertTrue(bb2.get(0) == 'X');
451 }
452 });
453 }
454
455 /**
456 * DatagramChannel close while virtual thread blocked in receive.
457 */
458 @Test
459 void testDatagramChannelReceiveAsyncClose() throws Exception {
460 VThreadRunner.run(() -> {
461 try (DatagramChannel dc = DatagramChannel.open()) {
462 InetAddress lh = InetAddress.getLoopbackAddress();
463 dc.bind(new InetSocketAddress(lh, 0));
464 runAfterParkedAsync(dc::close);
465 try {
466 dc.receive(ByteBuffer.allocate(100));
467 fail("receive returned");
468 } catch (AsynchronousCloseException expected) { }
469 }
470 });
471 }
472
473 /**
474 * Virtual thread interrupted while blocked in DatagramChannel receive.
475 */
476 @Test
477 void testDatagramChannelReceiveInterrupt() throws Exception {
478 VThreadRunner.run(() -> {
479 try (DatagramChannel dc = DatagramChannel.open()) {
480 InetAddress lh = InetAddress.getLoopbackAddress();
481 dc.bind(new InetSocketAddress(lh, 0));
482
483 // interrupt current thread when it blocks in receive
484 Thread thisThread = Thread.currentThread();
485 runAfterParkedAsync(thisThread::interrupt);
486
487 try {
488 dc.receive(ByteBuffer.allocate(100));
489 fail("receive returned");
490 } catch (ClosedByInterruptException expected) {
491 assertTrue(Thread.interrupted());
492 }
493 }
494 });
495 }
496
497 /**
498 * Virtual thread blocks in DatagramSocket adaptor receive.
499 */
500 @Test
501 void testDatagramSocketAdaptorReceive1() throws Exception {
502 testDatagramSocketAdaptorReceive(0);
503 }
504
505 /**
506 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
507 */
508 @Test
509 void testDatagramSocketAdaptorReceive2() throws Exception {
510 testDatagramSocketAdaptorReceive(60_000);
511 }
512
513 private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
514 VThreadRunner.run(() -> {
515 try (DatagramChannel dc1 = DatagramChannel.open();
516 DatagramChannel dc2 = DatagramChannel.open()) {
517
518 InetAddress lh = InetAddress.getLoopbackAddress();
519 dc2.bind(new InetSocketAddress(lh, 0));
520
521 // send from dc1 when current thread blocks in dc2 receive
522 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
523 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
524
525 // receive should block
526 byte[] array = new byte[100];
527 DatagramPacket p = new DatagramPacket(array, 0, array.length);
528 if (timeout > 0)
529 dc2.socket().setSoTimeout(timeout);
530 dc2.socket().receive(p);
531 assertTrue(p.getLength() == 3 && array[0] == 'X');
532 }
533 });
534 }
535
536 /**
537 * DatagramChannel close while virtual thread blocked in adaptor receive.
538 */
539 @Test
540 void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
541 testDatagramSocketAdaptorReceiveAsyncClose(0);
542 }
543
544 /**
545 * DatagramChannel close while virtual thread blocked in adaptor receive
546 * with timeout.
547 */
548 @Test
549 void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
550 testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
551 }
552
553 private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
554 VThreadRunner.run(() -> {
555 try (DatagramChannel dc = DatagramChannel.open()) {
556 InetAddress lh = InetAddress.getLoopbackAddress();
557 dc.bind(new InetSocketAddress(lh, 0));
558
559 byte[] array = new byte[100];
560 DatagramPacket p = new DatagramPacket(array, 0, array.length);
561 if (timeout > 0)
562 dc.socket().setSoTimeout(timeout);
563
564 // close channel/socket when current thread blocks in receive
565 runAfterParkedAsync(dc::close);
566
567 assertThrows(SocketException.class, () -> dc.socket().receive(p));
568 }
569 });
570 }
571
572 /**
573 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
574 */
575 @Test
576 void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
577 testDatagramSocketAdaptorReceiveInterrupt(0);
578 }
579
580 /**
581 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
582 * with timeout.
583 */
584 @Test
585 void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
586 testDatagramSocketAdaptorReceiveInterrupt(60_1000);
587 }
588
589 private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
590 VThreadRunner.run(() -> {
591 try (DatagramChannel dc = DatagramChannel.open()) {
592 InetAddress lh = InetAddress.getLoopbackAddress();
593 dc.bind(new InetSocketAddress(lh, 0));
594
595 byte[] array = new byte[100];
596 DatagramPacket p = new DatagramPacket(array, 0, array.length);
597 if (timeout > 0)
598 dc.socket().setSoTimeout(timeout);
599
600 // interrupt current thread when it blocks in receive
601 Thread thisThread = Thread.currentThread();
602 runAfterParkedAsync(thisThread::interrupt);
603
604 try {
605 dc.socket().receive(p);
606 fail();
607 } catch (ClosedByInterruptException expected) {
608 assertTrue(Thread.interrupted());
609 }
610 }
611 });
612 }
613
614 /**
615 * Pipe read/write, no blocking.
616 */
617 @Test
618 void testPipeReadWrite1() throws Exception {
619 VThreadRunner.run(() -> {
620 Pipe p = Pipe.open();
621 try (Pipe.SinkChannel sink = p.sink();
622 Pipe.SourceChannel source = p.source()) {
623
624 // write should not block
625 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
626 int n = sink.write(bb);
627 assertTrue(n > 0);
628
629 // read should not block
630 bb = ByteBuffer.allocate(10);
631 n = source.read(bb);
632 assertTrue(n > 0);
633 assertTrue(bb.get(0) == 'X');
634 }
635 });
636 }
637
638 /**
639 * Virtual thread blocks in Pipe.SourceChannel read.
640 */
641 @Test
642 void testPipeReadWrite2() throws Exception {
643 VThreadRunner.run(() -> {
644 Pipe p = Pipe.open();
645 try (Pipe.SinkChannel sink = p.sink();
646 Pipe.SourceChannel source = p.source()) {
647
648 // write from sink when current thread blocks reading from source
649 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
650 runAfterParkedAsync(() -> sink.write(bb1));
651
652 // read should block
653 ByteBuffer bb2 = ByteBuffer.allocate(10);
654 int n = source.read(bb2);
655 assertTrue(n > 0);
656 assertTrue(bb2.get(0) == 'X');
657 }
658 });
659 }
660
661 /**
662 * Virtual thread blocks in Pipe.SinkChannel write.
663 */
664 @Test
665 void testPipeReadWrite3() throws Exception {
666 VThreadRunner.run(() -> {
667 Pipe p = Pipe.open();
668 try (Pipe.SinkChannel sink = p.sink();
669 Pipe.SourceChannel source = p.source()) {
670
671 // read from source to EOF when current thread blocking in write
672 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
673
674 // write to sink should block
675 ByteBuffer bb = ByteBuffer.allocate(100*1024);
676 for (int i=0; i<1000; i++) {
677 int n = sink.write(bb);
678 assertTrue(n > 0);
679 bb.clear();
680 }
681 sink.close();
682
683 // wait for reader to finish
684 reader.join();
685 }
686 });
687 }
688
689 /**
690 * Pipe.SourceChannel close while virtual thread blocked in read.
691 */
692 @Test
693 void testPipeReadAsyncClose() throws Exception {
694 VThreadRunner.run(() -> {
695 Pipe p = Pipe.open();
696 try (Pipe.SinkChannel sink = p.sink();
697 Pipe.SourceChannel source = p.source()) {
698 runAfterParkedAsync(source::close);
699 try {
700 int n = source.read(ByteBuffer.allocate(100));
701 fail("read returned " + n);
702 } catch (AsynchronousCloseException expected) { }
703 }
704 });
705 }
706
707 /**
708 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
709 */
710 @Test
711 void testPipeReadInterrupt() throws Exception {
712 VThreadRunner.run(() -> {
713 Pipe p = Pipe.open();
714 try (Pipe.SinkChannel sink = p.sink();
715 Pipe.SourceChannel source = p.source()) {
716
717 // interrupt current thread when it blocks reading from source
718 Thread thisThread = Thread.currentThread();
719 runAfterParkedAsync(thisThread::interrupt);
720
721 try {
722 int n = source.read(ByteBuffer.allocate(100));
723 fail("read returned " + n);
724 } catch (ClosedByInterruptException expected) {
725 assertTrue(Thread.interrupted());
726 }
727 }
728 });
729 }
730
731 /**
732 * Pipe.SinkChannel close while virtual thread blocked in write.
733 */
734 @Test
735 void testPipeWriteAsyncClose() throws Exception {
736 VThreadRunner.run(() -> {
737 boolean retry = true;
738 while (retry) {
739 Pipe p = Pipe.open();
740 try (Pipe.SinkChannel sink = p.sink();
741 Pipe.SourceChannel source = p.source()) {
742
743 // close sink when current thread blocks in write
744 runAfterParkedAsync(sink::close);
745 try {
746 ByteBuffer bb = ByteBuffer.allocate(100*1024);
747 for (;;) {
748 int n = sink.write(bb);
749 assertTrue(n > 0);
750 bb.clear();
751 }
752 } catch (AsynchronousCloseException e) {
753 // closed when blocked in write
754 retry = false;
755 } catch (ClosedChannelException e) {
756 // closed when not blocked in write, need to retry test
757 }
758 }
759 }
760 });
761 }
762
763 /**
764 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
765 */
766 @Test
767 void testPipeWriteInterrupt() throws Exception {
768 VThreadRunner.run(() -> {
769 boolean retry = true;
770 while (retry) {
771 Pipe p = Pipe.open();
772 try (Pipe.SinkChannel sink = p.sink();
773 Pipe.SourceChannel source = p.source()) {
774
775 // interrupt current thread when it blocks in write
776 Thread thisThread = Thread.currentThread();
777 runAfterParkedAsync(thisThread::interrupt);
778
779 try {
780 ByteBuffer bb = ByteBuffer.allocate(100*1024);
781 for (;;) {
782 int n = sink.write(bb);
783 assertTrue(n > 0);
784 bb.clear();
785 }
786 } catch (ClosedByInterruptException expected) {
787 // closed when blocked in write
788 assertTrue(Thread.interrupted());
|
45 */
46
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.DatagramPacket;
50 import java.net.InetAddress;
51 import java.net.InetSocketAddress;
52 import java.net.Socket;
53 import java.net.SocketAddress;
54 import java.net.SocketException;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AsynchronousCloseException;
57 import java.nio.channels.ClosedByInterruptException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.Pipe;
61 import java.nio.channels.ReadableByteChannel;
62 import java.nio.channels.ServerSocketChannel;
63 import java.nio.channels.SocketChannel;
64 import java.nio.channels.WritableByteChannel;
65 import java.util.concurrent.ExecutorService;
66 import java.util.concurrent.Executors;
67 import java.util.stream.Stream;
68
69 import jdk.test.lib.thread.VThreadRunner;
70 import jdk.test.lib.thread.VThreadScheduler;
71
72 import org.junit.jupiter.api.BeforeAll;
73 import org.junit.jupiter.api.AfterAll;
74 import org.junit.jupiter.params.ParameterizedTest;
75 import org.junit.jupiter.params.provider.MethodSource;
76 import static org.junit.jupiter.api.Assertions.*;
77
78 class BlockingChannelOps {
79
80 private static ExecutorService customScheduler;
81
82 @BeforeAll
83 static void setup() {
84 customScheduler = Executors.newCachedThreadPool();
85 }
86
87 @AfterAll
88 static void finish() {
89 customScheduler.shutdown();
90 }
91
92 /**
93 * Returns the Thread.Builder to create the virtual thread.
94 */
95 static Stream<Thread.Builder.OfVirtual> threadBuilders() {
96 if (VThreadScheduler.supportsCustomScheduler()) {
97 return Stream.of(Thread.ofVirtual(), Thread.ofVirtual().scheduler(customScheduler));
98 } else {
99 return Stream.of(Thread.ofVirtual());
100 }
101 }
102
103 /**
104 * SocketChannel read/write, no blocking.
105 */
106 @ParameterizedTest
107 @MethodSource("threadBuilders")
108 void testSocketChannelReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
109 VThreadRunner.run(builder, () -> {
110 try (var connection = new Connection()) {
111 SocketChannel sc1 = connection.channel1();
112 SocketChannel sc2 = connection.channel2();
113
114 // write to sc1
115 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
116 int n = sc1.write(bb);
117 assertTrue(n > 0);
118
119 // read from sc2 should not block
120 bb = ByteBuffer.allocate(10);
121 n = sc2.read(bb);
122 assertTrue(n > 0);
123 assertTrue(bb.get(0) == 'X');
124 }
125 });
126 }
127
128 /**
129 * Virtual thread blocks in SocketChannel read.
130 */
131 @ParameterizedTest
132 @MethodSource("threadBuilders")
133 void testSocketChannelRead(Thread.Builder.OfVirtual builder) throws Exception {
134 VThreadRunner.run(builder, () -> {
135 try (var connection = new Connection()) {
136 SocketChannel sc1 = connection.channel1();
137 SocketChannel sc2 = connection.channel2();
138
139 // write to sc1 when current thread blocks in sc2.read
140 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
141 runAfterParkedAsync(() -> sc1.write(bb1));
142
143 // read from sc2 should block
144 ByteBuffer bb2 = ByteBuffer.allocate(10);
145 int n = sc2.read(bb2);
146 assertTrue(n > 0);
147 assertTrue(bb2.get(0) == 'X');
148 }
149 });
150 }
151
152 /**
153 * Virtual thread blocks in SocketChannel write.
154 */
155 @ParameterizedTest
156 @MethodSource("threadBuilders")
157 void testSocketChannelWrite(Thread.Builder.OfVirtual builder) throws Exception {
158 VThreadRunner.run(builder, () -> {
159 try (var connection = new Connection()) {
160 SocketChannel sc1 = connection.channel1();
161 SocketChannel sc2 = connection.channel2();
162
163 // read from sc2 to EOF when current thread blocks in sc1.write
164 Thread reader = runAfterParkedAsync(() -> readToEOF(sc2));
165
166 // write to sc1 should block
167 ByteBuffer bb = ByteBuffer.allocate(100*1024);
168 for (int i=0; i<1000; i++) {
169 int n = sc1.write(bb);
170 assertTrue(n > 0);
171 bb.clear();
172 }
173 sc1.close();
174
175 // wait for reader to finish
176 reader.join();
177 }
178 });
179 }
180
181 /**
182 * SocketChannel close while virtual thread blocked in read.
183 */
184 @ParameterizedTest
185 @MethodSource("threadBuilders")
186 void testSocketChannelReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
187 VThreadRunner.run(builder, () -> {
188 try (var connection = new Connection()) {
189 SocketChannel sc = connection.channel1();
190 runAfterParkedAsync(sc::close);
191 try {
192 int n = sc.read(ByteBuffer.allocate(100));
193 fail("read returned " + n);
194 } catch (AsynchronousCloseException expected) { }
195 }
196 });
197 }
198
199 /**
200 * SocketChannel shutdownInput while virtual thread blocked in read.
201 */
202 @ParameterizedTest
203 @MethodSource("threadBuilders")
204 void testSocketChannelReadAsyncShutdownInput(Thread.Builder.OfVirtual builder) throws Exception {
205 VThreadRunner.run(builder, () -> {
206 try (var connection = new Connection()) {
207 SocketChannel sc = connection.channel1();
208 runAfterParkedAsync(sc::shutdownInput);
209 int n = sc.read(ByteBuffer.allocate(100));
210 assertEquals(-1, n);
211 assertTrue(sc.isOpen());
212 }
213 });
214 }
215
216 /**
217 * Virtual thread interrupted while blocked in SocketChannel read.
218 */
219 @ParameterizedTest
220 @MethodSource("threadBuilders")
221 void testSocketChannelReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
222 VThreadRunner.run(builder, () -> {
223 try (var connection = new Connection()) {
224 SocketChannel sc = connection.channel1();
225
226 // interrupt current thread when it blocks in read
227 Thread thisThread = Thread.currentThread();
228 runAfterParkedAsync(thisThread::interrupt);
229
230 try {
231 int n = sc.read(ByteBuffer.allocate(100));
232 fail("read returned " + n);
233 } catch (ClosedByInterruptException expected) {
234 assertTrue(Thread.interrupted());
235 }
236 }
237 });
238 }
239
240 /**
241 * SocketChannel close while virtual thread blocked in write.
242 */
243 @ParameterizedTest
244 @MethodSource("threadBuilders")
245 void testSocketChannelWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
246 VThreadRunner.run(builder, () -> {
247 boolean retry = true;
248 while (retry) {
249 try (var connection = new Connection()) {
250 SocketChannel sc = connection.channel1();
251
252 // close sc when current thread blocks in write
253 runAfterParkedAsync(sc::close);
254 try {
255 ByteBuffer bb = ByteBuffer.allocate(100*1024);
256 for (;;) {
257 int n = sc.write(bb);
258 assertTrue(n > 0);
259 bb.clear();
260 }
261 } catch (AsynchronousCloseException expected) {
262 // closed when blocked in write
263 retry = false;
264 } catch (ClosedChannelException e) {
265 // closed when not blocked in write, need to retry test
266 }
267 }
268 }
269 });
270 }
271
272
273 /**
274 * SocketChannel shutdownOutput while virtual thread blocked in write.
275 */
276 @ParameterizedTest
277 @MethodSource("threadBuilders")
278 void testSocketChannelWriteAsyncShutdownOutput(Thread.Builder.OfVirtual builder) throws Exception {
279 VThreadRunner.run(builder, () -> {
280 try (var connection = new Connection()) {
281 SocketChannel sc = connection.channel1();
282
283 // shutdown output when current thread blocks in write
284 runAfterParkedAsync(sc::shutdownOutput);
285 try {
286 ByteBuffer bb = ByteBuffer.allocate(100*1024);
287 for (;;) {
288 int n = sc.write(bb);
289 assertTrue(n > 0);
290 bb.clear();
291 }
292 } catch (ClosedChannelException e) {
293 // expected
294 }
295 assertTrue(sc.isOpen());
296 }
297 });
298 }
299
300 /**
301 * Virtual thread interrupted while blocked in SocketChannel write.
302 */
303 @ParameterizedTest
304 @MethodSource("threadBuilders")
305 void testSocketChannelWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
306 VThreadRunner.run(builder, () -> {
307 boolean retry = true;
308 while (retry) {
309 try (var connection = new Connection()) {
310 SocketChannel sc = connection.channel1();
311
312 // interrupt current thread when it blocks in write
313 Thread thisThread = Thread.currentThread();
314 runAfterParkedAsync(thisThread::interrupt);
315
316 try {
317 ByteBuffer bb = ByteBuffer.allocate(100*1024);
318 for (;;) {
319 int n = sc.write(bb);
320 assertTrue(n > 0);
321 bb.clear();
322 }
323 } catch (ClosedByInterruptException e) {
324 // closed when blocked in write
325 assertTrue(Thread.interrupted());
326 retry = false;
327 } catch (ClosedChannelException e) {
328 // closed when not blocked in write, need to retry test
329 }
330 }
331 }
332 });
333 }
334
335 /**
336 * Virtual thread blocks in SocketChannel adaptor read.
337 */
338 @ParameterizedTest
339 @MethodSource("threadBuilders")
340 void testSocketAdaptorRead1(Thread.Builder.OfVirtual builder) throws Exception {
341 testSocketAdaptorRead(builder, 0);
342 }
343
344 /**
345 * Virtual thread blocks in SocketChannel adaptor read with timeout.
346 */
347 @ParameterizedTest
348 @MethodSource("threadBuilders")
349 void testSocketAdaptorRead2(Thread.Builder.OfVirtual builder) throws Exception {
350 testSocketAdaptorRead(builder, 60_000);
351 }
352
353 private void testSocketAdaptorRead(Thread.Builder.OfVirtual builder,
354 int timeout) throws Exception {
355 VThreadRunner.run(builder, () -> {
356 try (var connection = new Connection()) {
357 SocketChannel sc1 = connection.channel1();
358 SocketChannel sc2 = connection.channel2();
359
360 // write to sc1 when currnet thread blocks reading from sc2
361 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
362 runAfterParkedAsync(() -> sc1.write(bb));
363
364 // read from sc2 should block
365 byte[] array = new byte[100];
366 if (timeout > 0)
367 sc2.socket().setSoTimeout(timeout);
368 int n = sc2.socket().getInputStream().read(array);
369 assertTrue(n > 0);
370 assertTrue(array[0] == 'X');
371 }
372 });
373 }
374
375 /**
376 * ServerSocketChannel accept, no blocking.
377 */
378 @ParameterizedTest
379 @MethodSource("threadBuilders")
380 void testServerSocketChannelAccept1(Thread.Builder.OfVirtual builder) throws Exception {
381 VThreadRunner.run(builder, () -> {
382 try (var ssc = ServerSocketChannel.open()) {
383 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
384 var sc1 = SocketChannel.open(ssc.getLocalAddress());
385 // accept should not block
386 var sc2 = ssc.accept();
387 sc1.close();
388 sc2.close();
389 }
390 });
391 }
392
393 /**
394 * Virtual thread blocks in ServerSocketChannel accept.
395 */
396 @ParameterizedTest
397 @MethodSource("threadBuilders")
398 void testServerSocketChannelAccept2(Thread.Builder.OfVirtual builder) throws Exception {
399 VThreadRunner.run(builder, () -> {
400 try (var ssc = ServerSocketChannel.open()) {
401 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
402 var sc1 = SocketChannel.open();
403
404 // connect when current thread when it blocks in accept
405 runAfterParkedAsync(() -> sc1.connect(ssc.getLocalAddress()));
406
407 // accept should block
408 var sc2 = ssc.accept();
409 sc1.close();
410 sc2.close();
411 }
412 });
413 }
414
415 /**
416 * SeverSocketChannel close while virtual thread blocked in accept.
417 */
418 @ParameterizedTest
419 @MethodSource("threadBuilders")
420 void testServerSocketChannelAcceptAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
421 VThreadRunner.run(builder, () -> {
422 try (var ssc = ServerSocketChannel.open()) {
423 InetAddress lh = InetAddress.getLoopbackAddress();
424 ssc.bind(new InetSocketAddress(lh, 0));
425 runAfterParkedAsync(ssc::close);
426 try {
427 SocketChannel sc = ssc.accept();
428 sc.close();
429 fail("connection accepted???");
430 } catch (AsynchronousCloseException expected) { }
431 }
432 });
433 }
434
435 /**
436 * Virtual thread interrupted while blocked in ServerSocketChannel accept.
437 */
438 @ParameterizedTest
439 @MethodSource("threadBuilders")
440 void testServerSocketChannelAcceptInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
441 VThreadRunner.run(builder, () -> {
442 try (var ssc = ServerSocketChannel.open()) {
443 InetAddress lh = InetAddress.getLoopbackAddress();
444 ssc.bind(new InetSocketAddress(lh, 0));
445
446 // interrupt current thread when it blocks in accept
447 Thread thisThread = Thread.currentThread();
448 runAfterParkedAsync(thisThread::interrupt);
449
450 try {
451 SocketChannel sc = ssc.accept();
452 sc.close();
453 fail("connection accepted???");
454 } catch (ClosedByInterruptException expected) {
455 assertTrue(Thread.interrupted());
456 }
457 }
458 });
459 }
460
461 /**
462 * Virtual thread blocks in ServerSocketChannel adaptor accept.
463 */
464 @ParameterizedTest
465 @MethodSource("threadBuilders")
466 void testSocketChannelAdaptorAccept1(Thread.Builder.OfVirtual builder) throws Exception {
467 testSocketChannelAdaptorAccept(builder, 0);
468 }
469
470 /**
471 * Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
472 */
473 @ParameterizedTest
474 @MethodSource("threadBuilders")
475 void testSocketChannelAdaptorAccept2(Thread.Builder.OfVirtual builder) throws Exception {
476 testSocketChannelAdaptorAccept(builder, 60_000);
477 }
478
479 private void testSocketChannelAdaptorAccept(Thread.Builder.OfVirtual builder,
480 int timeout) throws Exception {
481 VThreadRunner.run(builder, () -> {
482 try (var ssc = ServerSocketChannel.open()) {
483 ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
484 var sc = SocketChannel.open();
485
486 // interrupt current thread when it blocks in accept
487 runAfterParkedAsync(() -> sc.connect(ssc.getLocalAddress()));
488
489 // accept should block
490 if (timeout > 0)
491 ssc.socket().setSoTimeout(timeout);
492 Socket s = ssc.socket().accept();
493 sc.close();
494 s.close();
495 }
496 });
497 }
498
499 /**
500 * DatagramChannel receive/send, no blocking.
501 */
502 @ParameterizedTest
503 @MethodSource("threadBuilders")
504 void testDatagramChannelSendReceive1(Thread.Builder.OfVirtual builder) throws Exception {
505 VThreadRunner.run(builder, () -> {
506 try (DatagramChannel dc1 = DatagramChannel.open();
507 DatagramChannel dc2 = DatagramChannel.open()) {
508
509 InetAddress lh = InetAddress.getLoopbackAddress();
510 dc2.bind(new InetSocketAddress(lh, 0));
511
512 // send should not block
513 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
514 int n = dc1.send(bb, dc2.getLocalAddress());
515 assertTrue(n > 0);
516
517 // receive should not block
518 bb = ByteBuffer.allocate(10);
519 dc2.receive(bb);
520 assertTrue(bb.get(0) == 'X');
521 }
522 });
523 }
524
525 /**
526 * Virtual thread blocks in DatagramChannel receive.
527 */
528 @ParameterizedTest
529 @MethodSource("threadBuilders")
530 void testDatagramChannelSendReceive2(Thread.Builder.OfVirtual builder) throws Exception {
531 VThreadRunner.run(builder, () -> {
532 try (DatagramChannel dc1 = DatagramChannel.open();
533 DatagramChannel dc2 = DatagramChannel.open()) {
534
535 InetAddress lh = InetAddress.getLoopbackAddress();
536 dc2.bind(new InetSocketAddress(lh, 0));
537
538 // send from dc1 when current thread blocked in dc2.receive
539 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
540 runAfterParkedAsync(() -> dc1.send(bb1, dc2.getLocalAddress()));
541
542 // read from dc2 should block
543 ByteBuffer bb2 = ByteBuffer.allocate(10);
544 dc2.receive(bb2);
545 assertTrue(bb2.get(0) == 'X');
546 }
547 });
548 }
549
550 /**
551 * DatagramChannel close while virtual thread blocked in receive.
552 */
553 @ParameterizedTest
554 @MethodSource("threadBuilders")
555 void testDatagramChannelReceiveAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
556 VThreadRunner.run(builder, () -> {
557 try (DatagramChannel dc = DatagramChannel.open()) {
558 InetAddress lh = InetAddress.getLoopbackAddress();
559 dc.bind(new InetSocketAddress(lh, 0));
560 runAfterParkedAsync(dc::close);
561 try {
562 dc.receive(ByteBuffer.allocate(100));
563 fail("receive returned");
564 } catch (AsynchronousCloseException expected) { }
565 }
566 });
567 }
568
569 /**
570 * Virtual thread interrupted while blocked in DatagramChannel receive.
571 */
572 @ParameterizedTest
573 @MethodSource("threadBuilders")
574 void testDatagramChannelReceiveInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
575 VThreadRunner.run(builder, () -> {
576 try (DatagramChannel dc = DatagramChannel.open()) {
577 InetAddress lh = InetAddress.getLoopbackAddress();
578 dc.bind(new InetSocketAddress(lh, 0));
579
580 // interrupt current thread when it blocks in receive
581 Thread thisThread = Thread.currentThread();
582 runAfterParkedAsync(thisThread::interrupt);
583
584 try {
585 dc.receive(ByteBuffer.allocate(100));
586 fail("receive returned");
587 } catch (ClosedByInterruptException expected) {
588 assertTrue(Thread.interrupted());
589 }
590 }
591 });
592 }
593
594 /**
595 * Virtual thread blocks in DatagramSocket adaptor receive.
596 */
597 @ParameterizedTest
598 @MethodSource("threadBuilders")
599 void testDatagramSocketAdaptorReceive1(Thread.Builder.OfVirtual builder) throws Exception {
600 testDatagramSocketAdaptorReceive(builder, 0);
601 }
602
603 /**
604 * Virtual thread blocks in DatagramSocket adaptor receive with timeout.
605 */
606 @ParameterizedTest
607 @MethodSource("threadBuilders")
608 void testDatagramSocketAdaptorReceive2(Thread.Builder.OfVirtual builder) throws Exception {
609 testDatagramSocketAdaptorReceive(builder, 60_000);
610 }
611
612 private void testDatagramSocketAdaptorReceive(Thread.Builder.OfVirtual builder,
613 int timeout) throws Exception {
614 VThreadRunner.run(builder, () -> {
615 try (DatagramChannel dc1 = DatagramChannel.open();
616 DatagramChannel dc2 = DatagramChannel.open()) {
617
618 InetAddress lh = InetAddress.getLoopbackAddress();
619 dc2.bind(new InetSocketAddress(lh, 0));
620
621 // send from dc1 when current thread blocks in dc2 receive
622 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
623 runAfterParkedAsync(() -> dc1.send(bb, dc2.getLocalAddress()));
624
625 // receive should block
626 byte[] array = new byte[100];
627 DatagramPacket p = new DatagramPacket(array, 0, array.length);
628 if (timeout > 0)
629 dc2.socket().setSoTimeout(timeout);
630 dc2.socket().receive(p);
631 assertTrue(p.getLength() == 3 && array[0] == 'X');
632 }
633 });
634 }
635
636 /**
637 * DatagramChannel close while virtual thread blocked in adaptor receive.
638 */
639 @ParameterizedTest
640 @MethodSource("threadBuilders")
641 void testDatagramSocketAdaptorReceiveAsyncClose1(Thread.Builder.OfVirtual builder) throws Exception {
642 testDatagramSocketAdaptorReceiveAsyncClose(builder, 0);
643 }
644
645 /**
646 * DatagramChannel close while virtual thread blocked in adaptor receive
647 * with timeout.
648 */
649 @ParameterizedTest
650 @MethodSource("threadBuilders")
651 void testDatagramSocketAdaptorReceiveAsyncClose2(Thread.Builder.OfVirtual builder) throws Exception {
652 testDatagramSocketAdaptorReceiveAsyncClose(builder, 60_1000);
653 }
654
655 private void testDatagramSocketAdaptorReceiveAsyncClose(Thread.Builder.OfVirtual builder,
656 int timeout) throws Exception {
657 VThreadRunner.run(builder, () -> {
658 try (DatagramChannel dc = DatagramChannel.open()) {
659 InetAddress lh = InetAddress.getLoopbackAddress();
660 dc.bind(new InetSocketAddress(lh, 0));
661
662 byte[] array = new byte[100];
663 DatagramPacket p = new DatagramPacket(array, 0, array.length);
664 if (timeout > 0)
665 dc.socket().setSoTimeout(timeout);
666
667 // close channel/socket when current thread blocks in receive
668 runAfterParkedAsync(dc::close);
669
670 assertThrows(SocketException.class, () -> dc.socket().receive(p));
671 }
672 });
673 }
674
675 /**
676 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
677 */
678 @ParameterizedTest
679 @MethodSource("threadBuilders")
680 void testDatagramSocketAdaptorReceiveInterrupt1(Thread.Builder.OfVirtual builder) throws Exception {
681 testDatagramSocketAdaptorReceiveInterrupt(builder, 0);
682 }
683
684 /**
685 * Virtual thread interrupted while blocked in DatagramSocket adaptor receive
686 * with timeout.
687 */
688 @ParameterizedTest
689 @MethodSource("threadBuilders")
690 void testDatagramSocketAdaptorReceiveInterrupt2(Thread.Builder.OfVirtual builder) throws Exception {
691 testDatagramSocketAdaptorReceiveInterrupt(builder, 60_1000);
692 }
693
694 private void testDatagramSocketAdaptorReceiveInterrupt(Thread.Builder.OfVirtual builder,
695 int timeout) throws Exception {
696 VThreadRunner.run(builder, () -> {
697 try (DatagramChannel dc = DatagramChannel.open()) {
698 InetAddress lh = InetAddress.getLoopbackAddress();
699 dc.bind(new InetSocketAddress(lh, 0));
700
701 byte[] array = new byte[100];
702 DatagramPacket p = new DatagramPacket(array, 0, array.length);
703 if (timeout > 0)
704 dc.socket().setSoTimeout(timeout);
705
706 // interrupt current thread when it blocks in receive
707 Thread thisThread = Thread.currentThread();
708 runAfterParkedAsync(thisThread::interrupt);
709
710 try {
711 dc.socket().receive(p);
712 fail();
713 } catch (ClosedByInterruptException expected) {
714 assertTrue(Thread.interrupted());
715 }
716 }
717 });
718 }
719
720 /**
721 * Pipe read/write, no blocking.
722 */
723 @ParameterizedTest
724 @MethodSource("threadBuilders")
725 void testPipeReadWrite1(Thread.Builder.OfVirtual builder) throws Exception {
726 VThreadRunner.run(builder, () -> {
727 Pipe p = Pipe.open();
728 try (Pipe.SinkChannel sink = p.sink();
729 Pipe.SourceChannel source = p.source()) {
730
731 // write should not block
732 ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
733 int n = sink.write(bb);
734 assertTrue(n > 0);
735
736 // read should not block
737 bb = ByteBuffer.allocate(10);
738 n = source.read(bb);
739 assertTrue(n > 0);
740 assertTrue(bb.get(0) == 'X');
741 }
742 });
743 }
744
745 /**
746 * Virtual thread blocks in Pipe.SourceChannel read.
747 */
748 @ParameterizedTest
749 @MethodSource("threadBuilders")
750 void testPipeReadWrite2(Thread.Builder.OfVirtual builder) throws Exception {
751 VThreadRunner.run(builder, () -> {
752 Pipe p = Pipe.open();
753 try (Pipe.SinkChannel sink = p.sink();
754 Pipe.SourceChannel source = p.source()) {
755
756 // write from sink when current thread blocks reading from source
757 ByteBuffer bb1 = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
758 runAfterParkedAsync(() -> sink.write(bb1));
759
760 // read should block
761 ByteBuffer bb2 = ByteBuffer.allocate(10);
762 int n = source.read(bb2);
763 assertTrue(n > 0);
764 assertTrue(bb2.get(0) == 'X');
765 }
766 });
767 }
768
769 /**
770 * Virtual thread blocks in Pipe.SinkChannel write.
771 */
772 @ParameterizedTest
773 @MethodSource("threadBuilders")
774 void testPipeReadWrite3(Thread.Builder.OfVirtual builder) throws Exception {
775 VThreadRunner.run(builder, () -> {
776 Pipe p = Pipe.open();
777 try (Pipe.SinkChannel sink = p.sink();
778 Pipe.SourceChannel source = p.source()) {
779
780 // read from source to EOF when current thread blocking in write
781 Thread reader = runAfterParkedAsync(() -> readToEOF(source));
782
783 // write to sink should block
784 ByteBuffer bb = ByteBuffer.allocate(100*1024);
785 for (int i=0; i<1000; i++) {
786 int n = sink.write(bb);
787 assertTrue(n > 0);
788 bb.clear();
789 }
790 sink.close();
791
792 // wait for reader to finish
793 reader.join();
794 }
795 });
796 }
797
798 /**
799 * Pipe.SourceChannel close while virtual thread blocked in read.
800 */
801 @ParameterizedTest
802 @MethodSource("threadBuilders")
803 void testPipeReadAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
804 VThreadRunner.run(builder, () -> {
805 Pipe p = Pipe.open();
806 try (Pipe.SinkChannel sink = p.sink();
807 Pipe.SourceChannel source = p.source()) {
808 runAfterParkedAsync(source::close);
809 try {
810 int n = source.read(ByteBuffer.allocate(100));
811 fail("read returned " + n);
812 } catch (AsynchronousCloseException expected) { }
813 }
814 });
815 }
816
817 /**
818 * Virtual thread interrupted while blocked in Pipe.SourceChannel read.
819 */
820 @ParameterizedTest
821 @MethodSource("threadBuilders")
822 void testPipeReadInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
823 VThreadRunner.run(builder, () -> {
824 Pipe p = Pipe.open();
825 try (Pipe.SinkChannel sink = p.sink();
826 Pipe.SourceChannel source = p.source()) {
827
828 // interrupt current thread when it blocks reading from source
829 Thread thisThread = Thread.currentThread();
830 runAfterParkedAsync(thisThread::interrupt);
831
832 try {
833 int n = source.read(ByteBuffer.allocate(100));
834 fail("read returned " + n);
835 } catch (ClosedByInterruptException expected) {
836 assertTrue(Thread.interrupted());
837 }
838 }
839 });
840 }
841
842 /**
843 * Pipe.SinkChannel close while virtual thread blocked in write.
844 */
845 @ParameterizedTest
846 @MethodSource("threadBuilders")
847 void testPipeWriteAsyncClose(Thread.Builder.OfVirtual builder) throws Exception {
848 VThreadRunner.run(builder, () -> {
849 boolean retry = true;
850 while (retry) {
851 Pipe p = Pipe.open();
852 try (Pipe.SinkChannel sink = p.sink();
853 Pipe.SourceChannel source = p.source()) {
854
855 // close sink when current thread blocks in write
856 runAfterParkedAsync(sink::close);
857 try {
858 ByteBuffer bb = ByteBuffer.allocate(100*1024);
859 for (;;) {
860 int n = sink.write(bb);
861 assertTrue(n > 0);
862 bb.clear();
863 }
864 } catch (AsynchronousCloseException e) {
865 // closed when blocked in write
866 retry = false;
867 } catch (ClosedChannelException e) {
868 // closed when not blocked in write, need to retry test
869 }
870 }
871 }
872 });
873 }
874
875 /**
876 * Virtual thread interrupted while blocked in Pipe.SinkChannel write.
877 */
878 @ParameterizedTest
879 @MethodSource("threadBuilders")
880 void testPipeWriteInterrupt(Thread.Builder.OfVirtual builder) throws Exception {
881 VThreadRunner.run(builder, () -> {
882 boolean retry = true;
883 while (retry) {
884 Pipe p = Pipe.open();
885 try (Pipe.SinkChannel sink = p.sink();
886 Pipe.SourceChannel source = p.source()) {
887
888 // interrupt current thread when it blocks in write
889 Thread thisThread = Thread.currentThread();
890 runAfterParkedAsync(thisThread::interrupt);
891
892 try {
893 ByteBuffer bb = ByteBuffer.allocate(100*1024);
894 for (;;) {
895 int n = sink.write(bb);
896 assertTrue(n > 0);
897 bb.clear();
898 }
899 } catch (ClosedByInterruptException expected) {
900 // closed when blocked in write
901 assertTrue(Thread.interrupted());
|