< prev index next >

test/jdk/java/nio/channels/DatagramChannel/InterruptibleOrNot.java

Print this page
@@ -1,7 +1,7 @@
  /*
-  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+  * Copyright (c) 2019, 2024, Oracle and/or its affiliates. All rights reserved.
   * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   *
   * This code is free software; you can redistribute it and/or modify it
   * under the terms of the GNU General Public License version 2 only, as
   * published by the Free Software Foundation.

@@ -23,11 +23,11 @@
  
  /**
   * @test
   * @bug 8236246
   * @modules java.base/sun.nio.ch
-  * @run testng InterruptibleOrNot
+  * @run junit InterruptibleOrNot
   * @summary Test SelectorProviderImpl.openDatagramChannel(boolean) to create
   *     DatagramChannel objects that optionally support interrupt
   */
  
  import java.io.Closeable;

@@ -38,154 +38,180 @@
  import java.nio.ByteBuffer;
  import java.nio.channels.AsynchronousCloseException;
  import java.nio.channels.ClosedByInterruptException;
  import java.nio.channels.DatagramChannel;
  import java.time.Duration;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
+ import java.util.Arrays;
  import sun.nio.ch.DefaultSelectorProvider;
  
- import org.testng.annotations.Test;
- import static org.testng.Assert.*;
+ import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.function.Executable;
+ import static org.junit.jupiter.api.Assertions.*;
  
- @Test
  public class InterruptibleOrNot {
+     // DatagramChannel implementation class
+     private static String dcImplClassName;
  
-     public void testInterruptBeforeInterruptibleReceive() throws Exception {
-         testInterruptBeforeReceive(true);
-     }
- 
-     public void testInterruptDuringInterruptibleReceive() throws Exception {
-         testInterruptDuringReceive(true);
-     }
- 
-     public void testInterruptBeforeUninterruptibleReceive() throws Exception {
-         testInterruptBeforeReceive(false);
-     }
- 
-     public void testInterruptDuringUninterruptibleReceive() throws Exception {
-         testInterruptDuringReceive(false);
-     }
- 
-     public void testInterruptBeforeInterruptibleSend() throws Exception {
-         testInterruptBeforeSend(true);
+     @BeforeAll
+     static void setup() throws Exception {
+         try (DatagramChannel dc = boundDatagramChannel(true)) {
+             dcImplClassName = dc.getClass().getName();
+         }
      }
  
-     public void testInterruptBeforeUninterruptibleSend() throws Exception {
-         testInterruptBeforeSend(false);
+     /**
+      * Call DatagramChannel.receive with the interrupt status set, the DatagramChannel
+      * is interruptible.
+      */
+     @Test
+     public void testInterruptBeforeInterruptibleReceive() throws Exception {
+         try (DatagramChannel dc = boundDatagramChannel(true)) {
+             ByteBuffer buf = ByteBuffer.allocate(100);
+             Thread.currentThread().interrupt();
+             assertThrows(ClosedByInterruptException.class, () -> dc.receive(buf));
+             assertFalse(dc.isOpen());
+         } finally {
+             Thread.interrupted();  // clear interrupt status
+         }
      }
  
      /**
-      * Test invoking DatagramChannel receive with interrupt status set
+      * Test interrupting a thread blocked in DatagramChannel.receive, the DatagramChannel
+      * is interruptible.
       */
-     static void testInterruptBeforeReceive(boolean interruptible)
-         throws Exception
-     {
-         try (DatagramChannel dc = openDatagramChannel(interruptible)) {
-             dc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
-             Future<?> timeout = scheduleClose(dc, Duration.ofSeconds(2));
-             try {
-                 ByteBuffer buf = ByteBuffer.allocate(100);
-                 Thread.currentThread().interrupt();
-                 assertThrows(expectedException(interruptible), () -> dc.receive(buf));
-             } finally {
-                 timeout.cancel(false);
-             }
+     @Test
+     public void testInterruptDuringInterruptibleReceive() throws Exception {
+         try (DatagramChannel dc = boundDatagramChannel(true)) {
+             ByteBuffer buf = ByteBuffer.allocate(100);
+             Thread thread = Thread.currentThread();
+             onReceive(thread::interrupt);
+             assertThrows(ClosedByInterruptException.class, () -> dc.receive(buf));
+             assertFalse(dc.isOpen());
          } finally {
-             Thread.interrupted();  // clear interrupt
+             Thread.interrupted();  // clear interrupt status
          }
      }
  
      /**
-      * Test Thread.interrupt when target thread is blocked in DatagramChannel receive
+      * Call DatagramChannel.receive with the interrupt status set, the DatagramChannel
+      * is not interruptible.
       */
-     static void testInterruptDuringReceive(boolean interruptible)
-         throws Exception
-     {
-         try (DatagramChannel dc = openDatagramChannel(interruptible)) {
-             dc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
-             Future<?> timerTask = scheduleClose(dc, Duration.ofSeconds(5));
-             Future<?> interruptTask = scheduleInterrupt(Thread.currentThread(), Duration.ofSeconds(1));
-             try {
-                 ByteBuffer buf = ByteBuffer.allocate(100);
-                 assertThrows(expectedException(interruptible), () -> dc.receive(buf));
-             } finally {
-                 timerTask.cancel(false);
-                 interruptTask.cancel(false);
-             }
+     @Test
+     public void testInterruptBeforeUninterruptibleReceive() throws Exception {
+         try (DatagramChannel dc = boundDatagramChannel(false)) {
+             ByteBuffer buf = ByteBuffer.allocate(100);
+             onReceive(() -> {
+                 // close the channel after a delay to ensure receive wakes up
+                 Thread.sleep(1000);
+                 dc.close();
+             });
+             Thread.currentThread().interrupt();
+             assertThrows(AsynchronousCloseException.class, () -> dc.receive(buf));
+             assertFalse(dc.isOpen());
          } finally {
-             Thread.interrupted();  // clear interrupt
+             Thread.interrupted();  // clear interrupt status
          }
      }
  
      /**
-      * Test invoking DatagramChannel send with interrupt status set
+      * Test interrupting a thread blocked in DatagramChannel.receive, the DatagramChannel
+      * is not interruptible.
       */
-     static void testInterruptBeforeSend(boolean interruptible)
-         throws Exception
-     {
-         try (DatagramChannel dc = openDatagramChannel(interruptible)) {
-             dc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
-             Future<?> timeout = scheduleClose(dc, Duration.ofSeconds(2));
-             try {
-                 ByteBuffer buf = ByteBuffer.allocate(100);
-                 SocketAddress target = dc.getLocalAddress();
-                 Thread.currentThread().interrupt();
-                 if (interruptible) {
-                     assertThrows(ClosedByInterruptException.class, () -> dc.send(buf, target));
-                 } else {
-                     int n = dc.send(buf, target);
-                     assertTrue(n == 100);
-                 }
-             } finally {
-                 timeout.cancel(false);
-             }
+     @Test
+     public void testInterruptDuringUninterruptibleReceive() throws Exception {
+         try (DatagramChannel dc = boundDatagramChannel(true)) {
+             ByteBuffer buf = ByteBuffer.allocate(100);
+ 
+             Thread thread = Thread.currentThread();
+             onReceive(() -> {
+                 // interrupt should not cause the receive to wakeup
+                 thread.interrupt();
+ 
+                 // close the channel after a delay to ensure receive wakes up
+                 Thread.sleep(1000);
+                 dc.close();
+             });
+             assertThrows(AsynchronousCloseException.class, () -> dc.receive(buf));
+             assertFalse(dc.isOpen());
          } finally {
-             Thread.interrupted();  // clear interrupt
+             Thread.interrupted();  // clear interrupt status
          }
      }
  
      /**
-      * Creates a DatagramChannel that is interruptible or not.
+      * Call DatagramChannel.send with the interrupt status set, the DatagramChannel
+      * is interruptible.
       */
-     static DatagramChannel openDatagramChannel(boolean interruptible) throws IOException {
-         if (interruptible) {
-             return DatagramChannel.open();
-         } else {
-             return DefaultSelectorProvider.get().openUninterruptibleDatagramChannel();
+     @Test
+     public void testInterruptBeforeInterruptibleSend() throws Exception {
+         try (DatagramChannel dc = boundDatagramChannel(true)) {
+             ByteBuffer buf = ByteBuffer.allocate(100);
+             SocketAddress target = dc.getLocalAddress();
+             Thread.currentThread().interrupt();
+             assertThrows(ClosedByInterruptException.class, () -> dc.send(buf, target));
+             assertFalse(dc.isOpen());
+         } finally {
+             Thread.interrupted();  // clear interrupt
          }
      }
  
      /**
-      * Expect ClosedByInterruptException if interruptible.
+      * Call DatagramChannel.send with the interrupt status set, the DatagramChannel
+      * is not interruptible.
       */
-     static Class<? extends Exception> expectedException(boolean expectInterrupt) {
-         if (expectInterrupt) {
-             return ClosedByInterruptException.class;
-         } else {
-             return AsynchronousCloseException.class;
+     @Test
+     public void testInterruptBeforeUninterruptibleSend() throws Exception {
+         try (DatagramChannel dc = boundDatagramChannel(false)) {
+             ByteBuffer buf = ByteBuffer.allocate(100);
+             SocketAddress target = dc.getLocalAddress();
+             Thread.currentThread().interrupt();
+             int n = dc.send(buf, target);
+             assertEquals(100, n);
+             assertTrue(dc.isOpen());
+         } finally {
+             Thread.interrupted();  // clear interrupt status
          }
      }
  
      /**
-      * Schedule the given object to be closed.
+      * Creates a DatagramChannel that is interruptible or not, and bound to the loopback
+      * address.
       */
-     static Future<?> scheduleClose(Closeable c, Duration timeout) {
-         long nanos = TimeUnit.NANOSECONDS.convert(timeout);
-         return STPE.schedule(() -> {
-             c.close();
-             return null;
-         }, nanos, TimeUnit.NANOSECONDS);
+     static DatagramChannel boundDatagramChannel(boolean interruptible) throws IOException {
+         DatagramChannel dc;
+         if (interruptible) {
+             dc = DatagramChannel.open();
+         } else {
+             dc = DefaultSelectorProvider.get().openUninterruptibleDatagramChannel();
+         }
+         try {
+             dc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+         } catch (IOException ioe) {
+             dc.close();
+             throw ioe;
+         }
+         return dc;
      }
  
      /**
-      * Schedule the given thread to be interrupted.
+      * Runs the given action when the current thread is sampled in DatagramChannel.receive.
       */
-     static Future<?> scheduleInterrupt(Thread t, Duration timeout) {
-         long nanos = TimeUnit.NANOSECONDS.convert(timeout);
-         return STPE.schedule(t::interrupt, nanos, TimeUnit.NANOSECONDS);
+     static void onReceive(Executable action) {
+         Thread target = Thread.currentThread();
+         Thread.ofPlatform().daemon().start(() -> {
+             try {
+                 boolean found = false;
+                 while (!found) {
+                     Thread.sleep(20);
+                     StackTraceElement[] stack = target.getStackTrace();
+                     found = Arrays.stream(stack)
+                             .anyMatch(e -> dcImplClassName.equals(e.getClassName())
+                                     && "receive".equals(e.getMethodName()));
+                 }
+                 action.execute();
+             } catch (Throwable ex) {
+                 ex.printStackTrace();
+             }
+         });
      }
- 
-     static final ScheduledExecutorService STPE = Executors.newScheduledThreadPool(0);
  }
< prev index next >