1 /*
2 * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 /*
25 * @test id=default
26 * @bug 8284161
27 * @summary Test virtual threads doing blocking I/O on java.net Sockets
28 * @library /test/lib
29 * @run junit BlockingSocketOps
30 */
31
32 /*
33 * @test id=poller-modes
34 * @requires (os.family == "linux") | (os.family == "mac")
35 * @library /test/lib
36 * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
37 * @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
38 * @run junit/othervm -Djdk.pollerMode=3 BlockingSocketOps
39 */
40
41 /*
42 * @test id=io_uring
43 * @requires os.family == "linux"
44 * @library /test/lib
45 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true BlockingSocketOps
46 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true BlockingSocketOps
47 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true BlockingSocketOps
48 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
49 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
50 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
51 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true BlockingSocketOps
52 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true BlockingSocketOps
53 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true BlockingSocketOps
54 * @run junit/othervm -Djdk.pollerMode=1 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
55 * @run junit/othervm -Djdk.pollerMode=2 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
56 * @run junit/othervm -Djdk.pollerMode=3 -Djdk.io_uring=true -Djdk.io_uring.read=true -Djdk.io_uring.write=true -Djdk.io_uring.sqpoll_idle=20 BlockingSocketOps
57 */
58
59 /*
60 * @test id=no-vmcontinuations
61 * @requires vm.continuations
62 * @library /test/lib
63 * @run junit/othervm -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations BlockingSocketOps
64 */
65
66 import java.io.Closeable;
67 import java.io.IOException;
68 import java.io.InputStream;
69 import java.io.OutputStream;
70 import java.net.DatagramPacket;
71 import java.net.DatagramSocket;
72 import java.net.InetAddress;
73 import java.net.InetSocketAddress;
74 import java.net.ServerSocket;
75 import java.net.Socket;
76 import java.net.SocketAddress;
77 import java.net.SocketException;
78 import java.net.SocketTimeoutException;
79
80 import jdk.test.lib.thread.VThreadRunner;
81 import org.junit.jupiter.api.Test;
82 import static org.junit.jupiter.api.Assertions.*;
83
84 class BlockingSocketOps {
85
86 /**
87 * Socket read/write, no blocking.
88 */
89 @Test
90 void testSocketReadWrite1() throws Exception {
91 VThreadRunner.run(() -> {
92 try (var connection = new Connection()) {
93 Socket s1 = connection.socket1();
94 Socket s2 = connection.socket2();
95
96 // write should not block
97 byte[] ba = "XXX".getBytes("UTF-8");
98 s1.getOutputStream().write(ba);
99
100 // read should not block
101 ba = new byte[10];
102 int n = s2.getInputStream().read(ba);
103 assertTrue(n > 0);
104 assertTrue(ba[0] == 'X');
105 }
106 });
107 }
108
109 /**
110 * Virtual thread blocks in read.
111 */
112 @Test
113 void testSocketRead1() throws Exception {
114 testSocketRead(0);
115 }
116
117 /**
118 * Virtual thread blocks in timed read.
119 */
120 @Test
121 void testSocketRead2() throws Exception {
122 testSocketRead(60_000);
123 }
124
125 void testSocketRead(int timeout) throws Exception {
126 VThreadRunner.run(() -> {
127 try (var connection = new Connection()) {
128 Socket s1 = connection.socket1();
129 Socket s2 = connection.socket2();
130
131 // delayed write from sc1
132 byte[] ba1 = "XXX".getBytes("UTF-8");
133 runAfterParkedAsync(() -> s1.getOutputStream().write(ba1));
134
135 // read from sc2 should block
136 if (timeout > 0) {
137 s2.setSoTimeout(timeout);
138 }
139 byte[] ba2 = new byte[10];
140 int n = s2.getInputStream().read(ba2);
141 assertTrue(n > 0);
142 assertTrue(ba2[0] == 'X');
143 }
144 });
145 }
146
147 /**
148 * Virtual thread blocks in write.
149 */
150 @Test
151 void testSocketWrite1() throws Exception {
152 VThreadRunner.run(() -> {
153 try (var connection = new Connection()) {
154 Socket s1 = connection.socket1();
155 Socket s2 = connection.socket2();
156
157 // delayed read from s2 to EOF
158 InputStream in = s2.getInputStream();
159 Thread reader = runAfterParkedAsync(() ->
160 in.transferTo(OutputStream.nullOutputStream()));
161
162 // write should block
163 byte[] ba = new byte[100*1024];
164 try (OutputStream out = s1.getOutputStream()) {
165 for (int i = 0; i < 1000; i++) {
166 out.write(ba);
167 }
168 }
169
170 // wait for reader to finish
171 reader.join();
172 }
173 });
174 }
175
176 /**
177 * Virtual thread blocks in read, peer closes connection gracefully.
178 */
179 @Test
180 void testSocketReadPeerClose1() throws Exception {
181 VThreadRunner.run(() -> {
182 try (var connection = new Connection()) {
183 Socket s1 = connection.socket1();
184 Socket s2 = connection.socket2();
185
186 // delayed close of s2
187 runAfterParkedAsync(s2::close);
188
189 // read from s1 should block, then read -1
190 int n = s1.getInputStream().read();
191 assertTrue(n == -1);
192 }
193 });
194 }
195
196 /**
197 * Virtual thread blocks in read, peer closes connection abruptly.
198 */
199 @Test
200 void testSocketReadPeerClose2() throws Exception {
201 VThreadRunner.run(() -> {
202 try (var connection = new Connection()) {
203 Socket s1 = connection.socket1();
204 Socket s2 = connection.socket2();
205
206 // delayed abrupt close of s2
207 s2.setSoLinger(true, 0);
208 runAfterParkedAsync(s2::close);
209
210 // read from s1 should block, then throw
211 try {
212 int n = s1.getInputStream().read();
213 fail("read " + n);
214 } catch (IOException ioe) {
215 // expected
216 }
217 }
218 });
219 }
220
221 /**
222 * Socket close while virtual thread blocked in read.
223 */
224 @Test
225 void testSocketReadAsyncClose1() throws Exception {
226 testSocketReadAsyncClose(0);
227 }
228
229 /**
230 * Socket close while virtual thread blocked in timed read.
231 */
232 @Test
233 void testSocketReadAsyncClose2() throws Exception {
234 testSocketReadAsyncClose(60_000);
235 }
236
237 void testSocketReadAsyncClose(int timeout) throws Exception {
238 VThreadRunner.run(() -> {
239 try (var connection = new Connection()) {
240 Socket s = connection.socket1();
241
242 // delayed close of s
243 runAfterParkedAsync(s::close);
244
245 // read from s should block, then throw
246 if (timeout > 0) {
247 s.setSoTimeout(timeout);
248 }
249 try {
250 int n = s.getInputStream().read();
251 fail("read " + n);
252 } catch (SocketException expected) { }
253 }
254 });
255 }
256
257 /**
258 * Socket shutdownInput while virtual thread blocked in read.
259 */
260 @Test
261 void testSocketReadAsyncShutdownInput1() throws Exception {
262 testSocketReadAsyncShutdownInput(0);
263 }
264
265 /**
266 * Socket shutdownInput while virtual thread blocked in timed read.
267 */
268 @Test
269 void testSocketReadAsyncShutdownInput2() throws Exception {
270 testSocketReadAsyncShutdownInput(60_000);
271 }
272
273 void testSocketReadAsyncShutdownInput(int timeout) throws Exception {
274 VThreadRunner.run(() -> {
275 try (var connection = new Connection()) {
276 Socket s = connection.socket1();
277
278 // delayed shutdown of s
279 runAfterParkedAsync(s::shutdownInput);
280
281 // read from s should block, then throw
282 if (timeout > 0) {
283 s.setSoTimeout(timeout);
284 }
285
286 // -1 or SocketException
287 try {
288 int n = s.getInputStream().read();
289 assertEquals(-1, n);
290 } catch (SocketException e) { }
291 assertFalse(s.isClosed());
292 }
293 });
294 }
295
296 /**
297 * Virtual thread interrupted while blocked in Socket read.
298 */
299 @Test
300 void testSocketReadInterrupt1() throws Exception {
301 testSocketReadInterrupt(0);
302 }
303
304 /**
305 * Virtual thread interrupted while blocked in Socket read with timeout
306 */
307 @Test
308 void testSocketReadInterrupt2() throws Exception {
309 testSocketReadInterrupt(60_000);
310 }
311
312 void testSocketReadInterrupt(int timeout) throws Exception {
313 VThreadRunner.run(() -> {
314 try (var connection = new Connection()) {
315 Socket s = connection.socket1();
316
317
318 // delayed interrupt of current thread
319 Thread thisThread = Thread.currentThread();
320 runAfterParkedAsync(thisThread::interrupt);
321
322 // read from s should block, then throw
323 if (timeout > 0) {
324 s.setSoTimeout(timeout);
325 }
326 try {
327 int n = s.getInputStream().read();
328 fail("read " + n);
329 } catch (SocketException expected) {
330 assertTrue(Thread.interrupted());
331 assertTrue(s.isClosed());
332 }
333 }
334 });
335 }
336
337 /**
338 * Socket close while virtual thread blocked in write.
339 */
340 @Test
341 void testSocketWriteAsyncClose() throws Exception {
342 VThreadRunner.run(() -> {
343 try (var connection = new Connection()) {
344 Socket s = connection.socket1();
345
346 // delayed close of s
347 runAfterParkedAsync(s::close);
348
349 // write to s should block, then throw
350 try {
351 byte[] ba = new byte[100*1024];
352 OutputStream out = s.getOutputStream();
353 for (;;) {
354 out.write(ba);
355 }
356 } catch (SocketException expected) { }
357 }
358 });
359 }
360
361 /**
362 * Socket shutdownOutput while virtual thread blocked in write.
363 */
364 @Test
365 void testSocketWriteAsyncShutdownOutput() throws Exception {
366 VThreadRunner.run(() -> {
367 try (var connection = new Connection()) {
368 Socket s = connection.socket1();
369
370 // delayed shutdown of s
371 runAfterParkedAsync(s::shutdownOutput);
372
373 // write to s should block, then throw
374 try {
375 byte[] ba = new byte[100*1024];
376 OutputStream out = s.getOutputStream();
377 for (;;) {
378 out.write(ba);
379 }
380 } catch (SocketException expected) { }
381 assertFalse(s.isClosed());
382 }
383 });
384 }
385
386 /**
387 * Virtual thread interrupted while blocked in Socket write.
388 */
389 @Test
390 void testSocketWriteInterrupt() throws Exception {
391 VThreadRunner.run(() -> {
392 try (var connection = new Connection()) {
393 Socket s = connection.socket1();
394
395 // delayed interrupt of current thread
396 Thread thisThread = Thread.currentThread();
397 runAfterParkedAsync(thisThread::interrupt);
398
399 // write to s should block, then throw
400 try {
401 byte[] ba = new byte[100*1024];
402 OutputStream out = s.getOutputStream();
403 for (;;) {
404 out.write(ba);
405 }
406 } catch (SocketException expected) {
407 assertTrue(Thread.interrupted());
408 assertTrue(s.isClosed());
409 }
410 }
411 });
412 }
413
414 /**
415 * Virtual thread reading urgent data when SO_OOBINLINE is enabled.
416 */
417 @Test
418 void testSocketReadUrgentData() throws Exception {
419 VThreadRunner.run(() -> {
420 try (var connection = new Connection()) {
421 Socket s1 = connection.socket1();
422 Socket s2 = connection.socket2();
423
424 // urgent data should be received
425 runAfterParkedAsync(() -> s2.sendUrgentData('X'));
426
427 // read should block, then read the OOB byte
428 s1.setOOBInline(true);
429 byte[] ba = new byte[10];
430 int n = s1.getInputStream().read(ba);
431 assertTrue(n == 1);
432 assertTrue(ba[0] == 'X');
433
434 // urgent data should not be received
435 s1.setOOBInline(false);
436 s1.setSoTimeout(500);
437 s2.sendUrgentData('X');
438 try {
439 s1.getInputStream().read(ba);
440 fail();
441 } catch (SocketTimeoutException expected) { }
442 }
443 });
444 }
445
446 /**
447 * ServerSocket accept, no blocking.
448 */
449 @Test
450 void testServerSocketAccept1() throws Exception {
451 VThreadRunner.run(() -> {
452 try (var listener = new ServerSocket()) {
453 InetAddress loopback = InetAddress.getLoopbackAddress();
454 listener.bind(new InetSocketAddress(loopback, 0));
455
456 // establish connection
457 var socket1 = new Socket(loopback, listener.getLocalPort());
458
459 // accept should not block
460 var socket2 = listener.accept();
461 socket1.close();
462 socket2.close();
463 }
464 });
465 }
466
467 /**
468 * Virtual thread blocks in accept.
469 */
470 @Test
471 void testServerSocketAccept2() throws Exception {
472 testServerSocketAccept(0);
473 }
474
475 /**
476 * Virtual thread blocks in timed accept.
477 */
478 @Test
479 void testServerSocketAccept3() throws Exception {
480 testServerSocketAccept(60_000);
481 }
482
483 void testServerSocketAccept(int timeout) throws Exception {
484 VThreadRunner.run(() -> {
485 try (var listener = new ServerSocket()) {
486 InetAddress loopback = InetAddress.getLoopbackAddress();
487 listener.bind(new InetSocketAddress(loopback, 0));
488
489 // schedule connect
490 var socket1 = new Socket();
491 SocketAddress remote = listener.getLocalSocketAddress();
492 runAfterParkedAsync(() -> socket1.connect(remote));
493
494 // accept should block
495 if (timeout > 0) {
496 listener.setSoTimeout(timeout);
497 }
498 var socket2 = listener.accept();
499 socket1.close();
500 socket2.close();
501 }
502 });
503 }
504
505 /**
506 * ServerSocket close while virtual thread blocked in accept.
507 */
508 @Test
509 void testServerSocketAcceptAsyncClose1() throws Exception {
510 testServerSocketAcceptAsyncClose(0);
511 }
512
513 /**
514 * ServerSocket close while virtual thread blocked in timed accept.
515 */
516 @Test
517 void testServerSocketAcceptAsyncClose2() throws Exception {
518 testServerSocketAcceptAsyncClose(60_000);
519 }
520
521 void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
522 VThreadRunner.run(() -> {
523 try (var listener = new ServerSocket()) {
524 InetAddress loopback = InetAddress.getLoopbackAddress();
525 listener.bind(new InetSocketAddress(loopback, 0));
526
527 // delayed close of listener
528 runAfterParkedAsync(listener::close);
529
530 // accept should block, then throw
531 if (timeout > 0) {
532 listener.setSoTimeout(timeout);
533 }
534 try {
535 listener.accept().close();
536 fail("connection accepted???");
537 } catch (SocketException expected) { }
538 }
539 });
540 }
541
542 /**
543 * Virtual thread interrupted while blocked in ServerSocket accept.
544 */
545 @Test
546 void testServerSocketAcceptInterrupt1() throws Exception {
547 testServerSocketAcceptInterrupt(0);
548 }
549
550 /**
551 * Virtual thread interrupted while blocked in ServerSocket accept with timeout.
552 */
553 @Test
554 void testServerSocketAcceptInterrupt2() throws Exception {
555 testServerSocketAcceptInterrupt(60_000);
556 }
557
558 void testServerSocketAcceptInterrupt(int timeout) throws Exception {
559 VThreadRunner.run(() -> {
560 try (var listener = new ServerSocket()) {
561 InetAddress loopback = InetAddress.getLoopbackAddress();
562 listener.bind(new InetSocketAddress(loopback, 0));
563
564 // delayed interrupt of current thread
565 Thread thisThread = Thread.currentThread();
566 runAfterParkedAsync(thisThread::interrupt);
567
568 // accept should block, then throw
569 if (timeout > 0) {
570 listener.setSoTimeout(timeout);
571 }
572 try {
573 listener.accept().close();
574 fail("connection accepted???");
575 } catch (SocketException expected) {
576 assertTrue(Thread.interrupted());
577 assertTrue(listener.isClosed());
578 }
579 }
580 });
581 }
582
583 /**
584 * DatagramSocket receive/send, no blocking.
585 */
586 @Test
587 void testDatagramSocketSendReceive1() throws Exception {
588 VThreadRunner.run(() -> {
589 try (DatagramSocket s1 = new DatagramSocket(null);
590 DatagramSocket s2 = new DatagramSocket(null)) {
591
592 InetAddress lh = InetAddress.getLoopbackAddress();
593 s1.bind(new InetSocketAddress(lh, 0));
594 s2.bind(new InetSocketAddress(lh, 0));
595
596 // send should not block
597 byte[] bytes = "XXX".getBytes("UTF-8");
598 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
599 p1.setSocketAddress(s2.getLocalSocketAddress());
600 s1.send(p1);
601
602 // receive should not block
603 byte[] ba = new byte[100];
604 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
605 s2.receive(p2);
606 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
607 assertTrue(ba[0] == 'X');
608 }
609 });
610 }
611
612 /**
613 * Virtual thread blocks in DatagramSocket receive.
614 */
615 @Test
616 void testDatagramSocketSendReceive2() throws Exception {
617 testDatagramSocketSendReceive(0);
618 }
619
620 /**
621 * Virtual thread blocks in DatagramSocket receive with timeout.
622 */
623 @Test
624 void testDatagramSocketSendReceive3() throws Exception {
625 testDatagramSocketSendReceive(60_000);
626 }
627
628 private void testDatagramSocketSendReceive(int timeout) throws Exception {
629 VThreadRunner.run(() -> {
630 try (DatagramSocket s1 = new DatagramSocket(null);
631 DatagramSocket s2 = new DatagramSocket(null)) {
632
633 InetAddress lh = InetAddress.getLoopbackAddress();
634 s1.bind(new InetSocketAddress(lh, 0));
635 s2.bind(new InetSocketAddress(lh, 0));
636
637 // delayed send
638 byte[] bytes = "XXX".getBytes("UTF-8");
639 DatagramPacket p1 = new DatagramPacket(bytes, bytes.length);
640 p1.setSocketAddress(s2.getLocalSocketAddress());
641 runAfterParkedAsync(() -> s1.send(p1));
642
643 // receive should block
644 if (timeout > 0) {
645 s2.setSoTimeout(timeout);
646 }
647 byte[] ba = new byte[100];
648 DatagramPacket p2 = new DatagramPacket(ba, ba.length);
649 s2.receive(p2);
650 assertEquals(s1.getLocalSocketAddress(), p2.getSocketAddress());
651 assertTrue(ba[0] == 'X');
652 }
653 });
654 }
655
656 /**
657 * Virtual thread blocks in DatagramSocket receive that times out.
658 */
659 @Test
660 void testDatagramSocketReceiveTimeout() throws Exception {
661 VThreadRunner.run(() -> {
662 try (DatagramSocket s = new DatagramSocket(null)) {
663 InetAddress lh = InetAddress.getLoopbackAddress();
664 s.bind(new InetSocketAddress(lh, 0));
665 s.setSoTimeout(500);
666 byte[] ba = new byte[100];
667 DatagramPacket p = new DatagramPacket(ba, ba.length);
668 try {
669 s.receive(p);
670 fail();
671 } catch (SocketTimeoutException expected) { }
672 }
673 });
674 }
675
676 /**
677 * DatagramSocket close while virtual thread blocked in receive.
678 */
679 @Test
680 void testDatagramSocketReceiveAsyncClose1() throws Exception {
681 testDatagramSocketReceiveAsyncClose(0);
682 }
683
684 /**
685 * DatagramSocket close while virtual thread blocked with timeout.
686 */
687 @Test
688 void testDatagramSocketReceiveAsyncClose2() throws Exception {
689 testDatagramSocketReceiveAsyncClose(60_000);
690 }
691
692 private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
693 VThreadRunner.run(() -> {
694 try (DatagramSocket s = new DatagramSocket(null)) {
695 InetAddress lh = InetAddress.getLoopbackAddress();
696 s.bind(new InetSocketAddress(lh, 0));
697
698 // delayed close of s
699 runAfterParkedAsync(s::close);
700
701 // receive should block, then throw
702 if (timeout > 0) {
703 s.setSoTimeout(timeout);
704 }
705 try {
706 byte[] ba = new byte[100];
707 DatagramPacket p = new DatagramPacket(ba, ba.length);
708 s.receive(p);
709 fail();
710 } catch (SocketException expected) { }
711 }
712 });
713 }
714
715 /**
716 * Virtual thread interrupted while blocked in DatagramSocket receive.
717 */
718 @Test
719 void testDatagramSocketReceiveInterrupt1() throws Exception {
720 testDatagramSocketReceiveInterrupt(0);
721 }
722
723 /**
724 * Virtual thread interrupted while blocked in DatagramSocket receive with timeout.
725 */
726 @Test
727 void testDatagramSocketReceiveInterrupt2() throws Exception {
728 testDatagramSocketReceiveInterrupt(60_000);
729 }
730
731 private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
732 VThreadRunner.run(() -> {
733 try (DatagramSocket s = new DatagramSocket(null)) {
734 InetAddress lh = InetAddress.getLoopbackAddress();
735 s.bind(new InetSocketAddress(lh, 0));
736
737 // delayed interrupt of current thread
738 Thread thisThread = Thread.currentThread();
739 runAfterParkedAsync(thisThread::interrupt);
740
741 // receive should block, then throw
742 if (timeout > 0) {
743 s.setSoTimeout(timeout);
744 }
745 try {
746 byte[] ba = new byte[100];
747 DatagramPacket p = new DatagramPacket(ba, ba.length);
748 s.receive(p);
749 fail();
750 } catch (SocketException expected) {
751 assertTrue(Thread.interrupted());
752 assertTrue(s.isClosed());
753 }
754 }
755 });
756 }
757
758 /**
759 * Creates a loopback connection
760 */
761 static class Connection implements Closeable {
762 private final Socket s1;
763 private final Socket s2;
764 Connection() throws IOException {
765 var lh = InetAddress.getLoopbackAddress();
766 try (var listener = new ServerSocket()) {
767 listener.bind(new InetSocketAddress(lh, 0));
768 Socket s1 = new Socket();
769 Socket s2;
770 try {
771 s1.connect(listener.getLocalSocketAddress());
772 s2 = listener.accept();
773 } catch (IOException ioe) {
774 s1.close();
775 throw ioe;
776 }
777 this.s1 = s1;
778 this.s2 = s2;
779 }
780
781 }
782 Socket socket1() {
783 return s1;
784 }
785 Socket socket2() {
786 return s2;
787 }
788 @Override
789 public void close() throws IOException {
790 s1.close();
791 s2.close();
792 }
793 }
794
795 @FunctionalInterface
796 interface ThrowingRunnable {
797 void run() throws Exception;
798 }
799
800 /**
801 * Runs the given task asynchronously after the current virtual thread has parked.
802 * @return the thread started to run the task
803 */
804 static Thread runAfterParkedAsync(ThrowingRunnable task) {
805 Thread target = Thread.currentThread();
806 if (!target.isVirtual())
807 throw new WrongThreadException();
808 return Thread.ofPlatform().daemon().start(() -> {
809 try {
810 Thread.State state = target.getState();
811 while (state != Thread.State.WAITING
812 && state != Thread.State.TIMED_WAITING) {
813 Thread.sleep(20);
814 state = target.getState();
815 }
816 Thread.sleep(20); // give a bit more time to release carrier
817 task.run();
818 } catch (Exception e) {
819 e.printStackTrace();
820 }
821 });
822 }
823 }