1 /*
  2  * Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @library /test/lib
 27  * @modules java.base/sun.nio.ch
 28  *          jdk.incubator.foreign/jdk.internal.foreign
 29  * @key randomness
 30  * @run testng/othervm TestSocketChannels
 31  */
 32 
 33 import java.net.InetAddress;
 34 import java.net.InetSocketAddress;
 35 import java.nio.ByteBuffer;
 36 import java.nio.channels.ServerSocketChannel;
 37 import java.nio.channels.SocketChannel;
 38 import java.util.Arrays;
 39 import java.util.List;
 40 import java.util.concurrent.atomic.AtomicReference;
 41 import java.util.function.Supplier;
 42 import java.util.stream.Stream;
 43 import jdk.incubator.foreign.MemoryAccess;
 44 import jdk.incubator.foreign.MemorySegment;
 45 import jdk.incubator.foreign.ResourceScope;
 46 import org.testng.annotations.*;
 47 import static org.testng.Assert.*;
 48 
 49 /**
 50  * Tests consisting of buffer views with synchronous NIO network channels.
 51  */
 52 public class TestSocketChannels extends AbstractChannelsTest {
 53 
 54     static final Class<IllegalStateException> ISE = IllegalStateException.class;
 55 
 56     @Test(dataProvider = "closeableScopes")
 57     public void testBasicIOWithClosedSegment(Supplier<ResourceScope> scopeSupplier)
 58         throws Exception
 59     {
 60         try (var channel = SocketChannel.open();
 61              var server = ServerSocketChannel.open();
 62              var connectedChannel = connectChannels(server, channel)) {
 63             ResourceScope scope = scopeSupplier.get();
 64             ByteBuffer bb = segmentBufferOfSize(scope, 16);
 65             scope.close();
 66             assertMessage(expectThrows(ISE, () -> channel.read(bb)),                           "Already closed");
 67             assertMessage(expectThrows(ISE, () -> channel.read(new ByteBuffer[] {bb})),        "Already closed");
 68             assertMessage(expectThrows(ISE, () -> channel.read(new ByteBuffer[] {bb}, 0, 1)),  "Already closed");
 69             assertMessage(expectThrows(ISE, () -> channel.write(bb)),                          "Already closed");
 70             assertMessage(expectThrows(ISE, () -> channel.write(new ByteBuffer[] {bb})),       "Already closed");
 71             assertMessage(expectThrows(ISE, () -> channel.write(new ByteBuffer[] {bb}, 0 ,1)), "Already closed");
 72         }
 73     }
 74 
 75     @Test(dataProvider = "closeableScopes")
 76     public void testScatterGatherWithClosedSegment(Supplier<ResourceScope> scopeSupplier)
 77         throws Exception
 78     {
 79         try (var channel = SocketChannel.open();
 80              var server = ServerSocketChannel.open();
 81              var connectedChannel = connectChannels(server, channel)) {
 82             ResourceScope scope = scopeSupplier.get();
 83             ByteBuffer[] buffers = segmentBuffersOfSize(8, scope, 16);
 84             scope.close();
 85             assertMessage(expectThrows(ISE, () -> channel.write(buffers)),       "Already closed");
 86             assertMessage(expectThrows(ISE, () -> channel.read(buffers)),        "Already closed");
 87             assertMessage(expectThrows(ISE, () -> channel.write(buffers, 0 ,8)), "Already closed");
 88             assertMessage(expectThrows(ISE, () -> channel.read(buffers, 0, 8)),  "Already closed");
 89         }
 90     }
 91 
 92     @Test(dataProvider = "allScopes")
 93     public void testBasicIO(Supplier<ResourceScope> scopeSupplier)
 94         throws Exception
 95     {
 96         ResourceScope scope;
 97         try (var sc1 = SocketChannel.open();
 98              var ssc = ServerSocketChannel.open();
 99              var sc2 = connectChannels(ssc, sc1);
100              var scp = closeableScopeOrNull(scope = scopeSupplier.get())) {
101             MemorySegment segment1 = MemorySegment.allocateNative(10, 1, scope);
102             MemorySegment segment2 = MemorySegment.allocateNative(10, 1, scope);
103             for (int i = 0; i < 10; i++) {
104                 MemoryAccess.setByteAtOffset(segment1, i, (byte) i);
105             }
106             ByteBuffer bb1 = segment1.asByteBuffer();
107             ByteBuffer bb2 = segment2.asByteBuffer();
108             assertEquals(sc1.write(bb1), 10);
109             assertEquals(sc2.read(bb2), 10);
110             assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
111         }
112     }
113 
114     @Test
115     public void testBasicHeapIOWithGlobalScope() throws Exception {
116         try (var sc1 = SocketChannel.open();
117              var ssc = ServerSocketChannel.open();
118              var sc2 = connectChannels(ssc, sc1)) {
119             var segment1 = MemorySegment.ofArray(new byte[10]);
120             var segment2 = MemorySegment.ofArray(new byte[10]);
121             for (int i = 0; i < 10; i++) {
122                 MemoryAccess.setByteAtOffset(segment1, i, (byte) i);
123             }
124             ByteBuffer bb1 = segment1.asByteBuffer();
125             ByteBuffer bb2 = segment2.asByteBuffer();
126             assertEquals(sc1.write(bb1), 10);
127             assertEquals(sc2.read(bb2), 10);
128             assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
129         }
130     }
131 
132     @Test(dataProvider = "confinedScopes")
133     public void testIOOnConfinedFromAnotherThread(Supplier<ResourceScope> scopeSupplier)
134         throws Exception
135     {
136         try (var channel = SocketChannel.open();
137              var server = ServerSocketChannel.open();
138              var connected = connectChannels(server, channel);
139              var scope = scopeSupplier.get()) {
140             var segment = MemorySegment.allocateNative(10, 1, scope);
141             ByteBuffer bb = segment.asByteBuffer();
142             List<ThrowingRunnable> ioOps = List.of(
143                     () -> channel.write(bb),
144                     () -> channel.read(bb),
145                     () -> channel.write(new ByteBuffer[] {bb}),
146                     () -> channel.read(new ByteBuffer[] {bb}),
147                     () -> channel.write(new ByteBuffer[] {bb}, 0, 1),
148                     () -> channel.read(new ByteBuffer[] {bb}, 0, 1)
149             );
150             for (var ioOp : ioOps) {
151                 AtomicReference<Exception> exception = new AtomicReference<>();
152                 Runnable task = () -> exception.set(expectThrows(ISE, ioOp));
153                 var t = new Thread(task);
154                 t.start();
155                 t.join();
156                 assertMessage(exception.get(), "Attempted access outside owning thread");
157             }
158         }
159     }
160 
161     @Test(dataProvider = "allScopes")
162     public void testScatterGatherIO(Supplier<ResourceScope> scopeSupplier)
163         throws Exception
164     {
165         ResourceScope scope;
166         try (var sc1 = SocketChannel.open();
167              var ssc = ServerSocketChannel.open();
168              var sc2 = connectChannels(ssc, sc1);
169              var scp = closeableScopeOrNull(scope = scopeSupplier.get())) {
170             var writeBuffers = mixedBuffersOfSize(32, scope, 64);
171             var readBuffers = mixedBuffersOfSize(32, scope, 64);
172             long expectedCount = remaining(writeBuffers);
173             assertEquals(writeNBytes(sc1, writeBuffers, 0, 32, expectedCount), expectedCount);
174             assertEquals(readNBytes(sc2, readBuffers, 0, 32, expectedCount), expectedCount);
175             assertEquals(flip(readBuffers), clear(writeBuffers));
176         }
177     }
178 
179     @Test(dataProvider = "closeableScopes")
180     public void testBasicIOWithDifferentScopes(Supplier<ResourceScope> scopeSupplier)
181          throws Exception
182     {
183         try (var sc1 = SocketChannel.open();
184              var ssc = ServerSocketChannel.open();
185              var sc2 = connectChannels(ssc, sc1);
186              var scope1 = scopeSupplier.get();
187              var scope2 = scopeSupplier.get()) {
188             var writeBuffers = Stream.of(mixedBuffersOfSize(16, scope1, 64), mixedBuffersOfSize(16, scope2, 64))
189                                      .flatMap(Arrays::stream)
190                                      .toArray(ByteBuffer[]::new);
191             var readBuffers = Stream.of(mixedBuffersOfSize(16, scope1, 64), mixedBuffersOfSize(16, scope2, 64))
192                                     .flatMap(Arrays::stream)
193                                     .toArray(ByteBuffer[]::new);
194 
195             long expectedCount = remaining(writeBuffers);
196             assertEquals(writeNBytes(sc1, writeBuffers, 0, 32, expectedCount), expectedCount);
197             assertEquals(readNBytes(sc2, readBuffers, 0, 32, expectedCount), expectedCount);
198             assertEquals(flip(readBuffers), clear(writeBuffers));
199         }
200     }
201 
202     static SocketChannel connectChannels(ServerSocketChannel ssc, SocketChannel sc)
203         throws Exception
204     {
205         ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
206         sc.connect(ssc.getLocalAddress());
207         return ssc.accept();
208     }
209 
210     static long writeNBytes(SocketChannel channel,
211                             ByteBuffer[] buffers, int offset, int len,
212                             long bytes)
213         throws Exception
214     {
215         long total = 0L;
216         do {
217             long n = channel.write(buffers, offset, len);
218             assertTrue(n > 0, "got:" + n);
219             total += n;
220         } while (total < bytes);
221         return total;
222     }
223 
224     static long readNBytes(SocketChannel channel,
225                            ByteBuffer[] buffers, int offset, int len,
226                            long bytes)
227         throws Exception
228     {
229         long total = 0L;
230         do {
231             long n = channel.read(buffers, offset, len);
232             assertTrue(n > 0, "got:" + n);
233             total += n;
234         } while (total < bytes);
235         return total;
236     }
237 }
238