1 /*
  2  * Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @library /test/lib
 27  * @modules java.base/sun.nio.ch
 28  *          jdk.incubator.foreign/jdk.internal.foreign
 29  * @key randomness
 30  * @run testng/othervm TestSocketChannels
 31  */
 32 
 33 import java.net.InetAddress;
 34 import java.net.InetSocketAddress;
 35 import java.nio.ByteBuffer;
 36 import java.nio.channels.ServerSocketChannel;
 37 import java.nio.channels.SocketChannel;
 38 import java.util.Arrays;
 39 import java.util.List;
 40 import java.util.concurrent.atomic.AtomicReference;
 41 import java.util.function.Supplier;
 42 import java.util.stream.Stream;
 43 
 44 import jdk.incubator.foreign.MemorySegment;
 45 import jdk.incubator.foreign.ResourceScope;
 46 import org.testng.annotations.*;
 47 
 48 import static jdk.incubator.foreign.ValueLayout.JAVA_BYTE;
 49 import static org.testng.Assert.*;
 50 
 51 /**
 52  * Tests consisting of buffer views with synchronous NIO network channels.
 53  */
 54 public class TestSocketChannels extends AbstractChannelsTest {
 55 
 56     static final Class<IllegalStateException> ISE = IllegalStateException.class;
 57 
 58     @Test(dataProvider = "closeableScopes")
 59     public void testBasicIOWithClosedSegment(Supplier<ResourceScope> scopeSupplier)
 60         throws Exception
 61     {
 62         try (var channel = SocketChannel.open();
 63              var server = ServerSocketChannel.open();
 64              var connectedChannel = connectChannels(server, channel)) {
 65             ResourceScope scope = scopeSupplier.get();
 66             ByteBuffer bb = segmentBufferOfSize(scope, 16);
 67             scope.close();
 68             assertMessage(expectThrows(ISE, () -> channel.read(bb)),                           "Already closed");
 69             assertMessage(expectThrows(ISE, () -> channel.read(new ByteBuffer[] {bb})),        "Already closed");
 70             assertMessage(expectThrows(ISE, () -> channel.read(new ByteBuffer[] {bb}, 0, 1)),  "Already closed");
 71             assertMessage(expectThrows(ISE, () -> channel.write(bb)),                          "Already closed");
 72             assertMessage(expectThrows(ISE, () -> channel.write(new ByteBuffer[] {bb})),       "Already closed");
 73             assertMessage(expectThrows(ISE, () -> channel.write(new ByteBuffer[] {bb}, 0 ,1)), "Already closed");
 74         }
 75     }
 76 
 77     @Test(dataProvider = "closeableScopes")
 78     public void testScatterGatherWithClosedSegment(Supplier<ResourceScope> scopeSupplier)
 79         throws Exception
 80     {
 81         try (var channel = SocketChannel.open();
 82              var server = ServerSocketChannel.open();
 83              var connectedChannel = connectChannels(server, channel)) {
 84             ResourceScope scope = scopeSupplier.get();
 85             ByteBuffer[] buffers = segmentBuffersOfSize(8, scope, 16);
 86             scope.close();
 87             assertMessage(expectThrows(ISE, () -> channel.write(buffers)),       "Already closed");
 88             assertMessage(expectThrows(ISE, () -> channel.read(buffers)),        "Already closed");
 89             assertMessage(expectThrows(ISE, () -> channel.write(buffers, 0 ,8)), "Already closed");
 90             assertMessage(expectThrows(ISE, () -> channel.read(buffers, 0, 8)),  "Already closed");
 91         }
 92     }
 93 
 94     @Test(dataProvider = "allScopes")
 95     public void testBasicIO(Supplier<ResourceScope> scopeSupplier)
 96         throws Exception
 97     {
 98         ResourceScope scope;
 99         try (var sc1 = SocketChannel.open();
100              var ssc = ServerSocketChannel.open();
101              var sc2 = connectChannels(ssc, sc1);
102              var scp = closeableScopeOrNull(scope = scopeSupplier.get())) {
103             MemorySegment segment1 = MemorySegment.allocateNative(10, 1, scope);
104             MemorySegment segment2 = MemorySegment.allocateNative(10, 1, scope);
105             for (int i = 0; i < 10; i++) {
106                 segment1.set(JAVA_BYTE, i, (byte) i);
107             }
108             ByteBuffer bb1 = segment1.asByteBuffer();
109             ByteBuffer bb2 = segment2.asByteBuffer();
110             assertEquals(sc1.write(bb1), 10);
111             assertEquals(sc2.read(bb2), 10);
112             assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
113         }
114     }
115 
116     @Test
117     public void testBasicHeapIOWithGlobalScope() throws Exception {
118         try (var sc1 = SocketChannel.open();
119              var ssc = ServerSocketChannel.open();
120              var sc2 = connectChannels(ssc, sc1)) {
121             var segment1 = MemorySegment.ofArray(new byte[10]);
122             var segment2 = MemorySegment.ofArray(new byte[10]);
123             for (int i = 0; i < 10; i++) {
124                 segment1.set(JAVA_BYTE, i, (byte) i);
125             }
126             ByteBuffer bb1 = segment1.asByteBuffer();
127             ByteBuffer bb2 = segment2.asByteBuffer();
128             assertEquals(sc1.write(bb1), 10);
129             assertEquals(sc2.read(bb2), 10);
130             assertEquals(bb2.flip(), ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
131         }
132     }
133 
134     @Test(dataProvider = "confinedScopes")
135     public void testIOOnConfinedFromAnotherThread(Supplier<ResourceScope> scopeSupplier)
136         throws Exception
137     {
138         try (var channel = SocketChannel.open();
139              var server = ServerSocketChannel.open();
140              var connected = connectChannels(server, channel);
141              var scope = scopeSupplier.get()) {
142             var segment = MemorySegment.allocateNative(10, 1, scope);
143             ByteBuffer bb = segment.asByteBuffer();
144             List<ThrowingRunnable> ioOps = List.of(
145                     () -> channel.write(bb),
146                     () -> channel.read(bb),
147                     () -> channel.write(new ByteBuffer[] {bb}),
148                     () -> channel.read(new ByteBuffer[] {bb}),
149                     () -> channel.write(new ByteBuffer[] {bb}, 0, 1),
150                     () -> channel.read(new ByteBuffer[] {bb}, 0, 1)
151             );
152             for (var ioOp : ioOps) {
153                 AtomicReference<Exception> exception = new AtomicReference<>();
154                 Runnable task = () -> exception.set(expectThrows(ISE, ioOp));
155                 var t = new Thread(task);
156                 t.start();
157                 t.join();
158                 assertMessage(exception.get(), "Attempted access outside owning thread");
159             }
160         }
161     }
162 
163     @Test(dataProvider = "allScopes")
164     public void testScatterGatherIO(Supplier<ResourceScope> scopeSupplier)
165         throws Exception
166     {
167         ResourceScope scope;
168         try (var sc1 = SocketChannel.open();
169              var ssc = ServerSocketChannel.open();
170              var sc2 = connectChannels(ssc, sc1);
171              var scp = closeableScopeOrNull(scope = scopeSupplier.get())) {
172             var writeBuffers = mixedBuffersOfSize(32, scope, 64);
173             var readBuffers = mixedBuffersOfSize(32, scope, 64);
174             long expectedCount = remaining(writeBuffers);
175             assertEquals(writeNBytes(sc1, writeBuffers, 0, 32, expectedCount), expectedCount);
176             assertEquals(readNBytes(sc2, readBuffers, 0, 32, expectedCount), expectedCount);
177             assertEquals(flip(readBuffers), clear(writeBuffers));
178         }
179     }
180 
181     @Test(dataProvider = "closeableScopes")
182     public void testBasicIOWithDifferentScopes(Supplier<ResourceScope> scopeSupplier)
183          throws Exception
184     {
185         try (var sc1 = SocketChannel.open();
186              var ssc = ServerSocketChannel.open();
187              var sc2 = connectChannels(ssc, sc1);
188              var scope1 = scopeSupplier.get();
189              var scope2 = scopeSupplier.get()) {
190             var writeBuffers = Stream.of(mixedBuffersOfSize(16, scope1, 64), mixedBuffersOfSize(16, scope2, 64))
191                                      .flatMap(Arrays::stream)
192                                      .toArray(ByteBuffer[]::new);
193             var readBuffers = Stream.of(mixedBuffersOfSize(16, scope1, 64), mixedBuffersOfSize(16, scope2, 64))
194                                     .flatMap(Arrays::stream)
195                                     .toArray(ByteBuffer[]::new);
196 
197             long expectedCount = remaining(writeBuffers);
198             assertEquals(writeNBytes(sc1, writeBuffers, 0, 32, expectedCount), expectedCount);
199             assertEquals(readNBytes(sc2, readBuffers, 0, 32, expectedCount), expectedCount);
200             assertEquals(flip(readBuffers), clear(writeBuffers));
201         }
202     }
203 
204     static SocketChannel connectChannels(ServerSocketChannel ssc, SocketChannel sc)
205         throws Exception
206     {
207         ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
208         sc.connect(ssc.getLocalAddress());
209         return ssc.accept();
210     }
211 
212     static long writeNBytes(SocketChannel channel,
213                             ByteBuffer[] buffers, int offset, int len,
214                             long bytes)
215         throws Exception
216     {
217         long total = 0L;
218         do {
219             long n = channel.write(buffers, offset, len);
220             assertTrue(n > 0, "got:" + n);
221             total += n;
222         } while (total < bytes);
223         return total;
224     }
225 
226     static long readNBytes(SocketChannel channel,
227                            ByteBuffer[] buffers, int offset, int len,
228                            long bytes)
229         throws Exception
230     {
231         long total = 0L;
232         do {
233             long n = channel.read(buffers, offset, len);
234             assertTrue(n > 0, "got:" + n);
235             total += n;
236         } while (total < bytes);
237         return total;
238     }
239 }
240