1 /*
  2  * Copyright (c) 2020, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @enablePreview
 27  * @modules java.base/jdk.internal.vm.annotation java.base/jdk.internal.misc
 28  * @key randomness
 29  * @run testng/othervm TestHandshake
 30  * @run testng/othervm -Xint TestHandshake
 31  * @run testng/othervm -XX:TieredStopAtLevel=1 TestHandshake
 32  * @run testng/othervm -XX:-TieredCompilation TestHandshake
 33  */
 34 
 35 import java.lang.foreign.Arena;
 36 import java.lang.foreign.MemorySegment;
 37 import java.lang.invoke.MethodHandles;
 38 import java.lang.invoke.VarHandle;
 39 import java.nio.ByteBuffer;
 40 import java.nio.ByteOrder;
 41 import java.util.concurrent.ExecutorService;
 42 import java.util.concurrent.Executors;
 43 import java.util.concurrent.ThreadLocalRandom;
 44 import java.util.concurrent.TimeUnit;
 45 import java.util.concurrent.atomic.AtomicBoolean;
 46 import java.util.concurrent.atomic.AtomicLong;
 47 
 48 import org.testng.annotations.DataProvider;
 49 import org.testng.annotations.Test;
 50 
 51 import static java.lang.foreign.ValueLayout.JAVA_BYTE;
 52 import static org.testng.Assert.*;
 53 
 54 public class TestHandshake {
 55 
 56     static final int ITERATIONS = 5;
 57     static final int SEGMENT_SIZE = 1_000_000;
 58     static final int MAX_DELAY_MILLIS = 500;
 59     static final int MAX_EXECUTOR_WAIT_SECONDS = 20;
 60     static final int MAX_THREAD_SPIN_WAIT_MILLIS = 200;
 61 
 62     static final int NUM_ACCESSORS = Math.min(10, Runtime.getRuntime().availableProcessors());
 63 
 64     static final AtomicLong start = new AtomicLong();
 65     static final AtomicBoolean started = new AtomicBoolean();
 66 
 67     @Test(dataProvider = "accessors")
 68     public void testHandshake(String testName, AccessorFactory accessorFactory) throws InterruptedException {
 69         for (int it = 0 ; it < ITERATIONS ; it++) {
 70             Arena arena = Arena.ofShared();
 71             MemorySegment segment = arena.allocate(SEGMENT_SIZE, 1);
 72             System.out.println("ITERATION " + it);
 73             ExecutorService accessExecutor = Executors.newCachedThreadPool();
 74             start.set(System.currentTimeMillis());
 75             started.set(false);
 76             for (int i = 0; i < NUM_ACCESSORS ; i++) {
 77                 accessExecutor.execute(accessorFactory.make(i, segment, arena));
 78             }
 79             int delay = ThreadLocalRandom.current().nextInt(MAX_DELAY_MILLIS);
 80             System.out.println("Starting handshaker with delay set to " + delay + " millis");
 81             Thread.sleep(delay);
 82             accessExecutor.execute(new Handshaker(arena));
 83             accessExecutor.shutdown();
 84             assertTrue(accessExecutor.awaitTermination(MAX_EXECUTOR_WAIT_SECONDS, TimeUnit.SECONDS));
 85             assertTrue(!segment.scope().isAlive());
 86         }
 87     }
 88 
 89     static abstract class AbstractSegmentAccessor implements Runnable {
 90         final MemorySegment segment;
 91         final int id;
 92         final AtomicBoolean failed = new AtomicBoolean();
 93 
 94         AbstractSegmentAccessor(int id, MemorySegment segment) {
 95             this.id = id;
 96             this.segment = segment;
 97         }
 98 
 99         @Override
100         public final void run() {
101             start("\"Accessor #\" + id");
102             outer: while (segment.scope().isAlive()) {
103                 try {
104                     doAccess();
105                 } catch (IllegalStateException ex) {
106                     long delay = System.currentTimeMillis() - start.get();
107                     System.out.println("Accessor #" + id + " suspending - elapsed (ms): " + delay);
108                     backoff();
109                     delay = System.currentTimeMillis() - start.get();
110                     System.out.println("Accessor #" + id + " resuming - elapsed (ms): " + delay);
111                     continue outer;
112                 }
113             }
114             long delay = System.currentTimeMillis() - start.get();
115             System.out.println("Accessor #" + id + " terminated - elapsed (ms): " + delay);
116         }
117 
118         abstract void doAccess();
119 
120         private void backoff() {
121             try {
122                 Thread.sleep(ThreadLocalRandom.current().nextInt(MAX_THREAD_SPIN_WAIT_MILLIS));
123             } catch (InterruptedException ex) {
124                 throw new AssertionError(ex);
125             }
126         }
127     }
128 
129     static void start(String name) {
130         if (started.compareAndSet(false, true)) {
131             long delay = System.currentTimeMillis() - start.get();
132             System.out.println("Started first thread: " + name + " ; elapsed (ms): " + delay);
133         }
134     }
135 
136     static abstract class AbstractBufferAccessor extends AbstractSegmentAccessor {
137         final ByteBuffer bb;
138 
139         AbstractBufferAccessor(int id, MemorySegment segment, Arena _unused) {
140             super(id, segment);
141             this.bb = segment.asByteBuffer();
142         }
143     }
144 
145     static class SegmentAccessor extends AbstractSegmentAccessor {
146 
147         SegmentAccessor(int id, MemorySegment segment, Arena _unused) {
148             super(id, segment);
149         }
150 
151         @Override
152         void doAccess() {
153             int sum = 0;
154             for (int i = 0; i < segment.byteSize(); i++) {
155                 sum += segment.get(JAVA_BYTE, i);
156             }
157         }
158     }
159 
160     static class SegmentCopyAccessor extends AbstractSegmentAccessor {
161 
162         MemorySegment first, second;
163 
164 
165         SegmentCopyAccessor(int id, MemorySegment segment, Arena _unused) {
166             super(id, segment);
167             long split = segment.byteSize() / 2;
168             first = segment.asSlice(0, split);
169             second = segment.asSlice(split);
170         }
171 
172         @Override
173         public void doAccess() {
174             first.copyFrom(second);
175         }
176     }
177 
178     static class SegmentFillAccessor extends AbstractSegmentAccessor {
179 
180         SegmentFillAccessor(int id, MemorySegment segment, Arena _unused) {
181             super(id, segment);
182         }
183 
184         @Override
185         public void doAccess() {
186             segment.fill((byte) ThreadLocalRandom.current().nextInt(10));
187         }
188     }
189 
190     static class SegmentMismatchAccessor extends AbstractSegmentAccessor {
191 
192         final MemorySegment copy;
193 
194         SegmentMismatchAccessor(int id, MemorySegment segment, Arena arena) {
195             super(id, segment);
196             this.copy = arena.allocate(SEGMENT_SIZE, 1);
197             copy.copyFrom(segment);
198             copy.set(JAVA_BYTE, ThreadLocalRandom.current().nextInt(SEGMENT_SIZE), (byte)42);
199         }
200 
201         @Override
202         public void doAccess() {
203             segment.mismatch(copy);
204         }
205     }
206 
207     static class BufferAccessor extends AbstractBufferAccessor {
208 
209         BufferAccessor(int id, MemorySegment segment, Arena _unused) {
210             super(id, segment, null);
211         }
212 
213         @Override
214         public void doAccess() {
215             int sum = 0;
216             for (int i = 0; i < bb.capacity(); i++) {
217                 sum += bb.get(i);
218             }
219         }
220     }
221 
222     static class BufferHandleAccessor extends AbstractBufferAccessor {
223 
224         static VarHandle handle = MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder());
225 
226         public BufferHandleAccessor(int id, MemorySegment segment, Arena _unused) {
227             super(id, segment, null);
228         }
229 
230         @Override
231         public void doAccess() {
232             int sum = 0;
233             for (int i = 0; i < bb.capacity() / 2; i++) {
234                 sum += (short) handle.get(bb, i);
235             }
236         }
237     };
238 
239     static class Handshaker implements Runnable {
240 
241         final Arena arena;
242 
243         Handshaker(Arena arena) {
244             this.arena = arena;
245         }
246 
247         @Override
248         public void run() {
249             start("Handshaker");
250             while (true) {
251                 try {
252                     arena.close();
253                     break;
254                 } catch (IllegalStateException ex) {
255                     Thread.onSpinWait();
256                 }
257             }
258             long delay = System.currentTimeMillis() - start.get();
259             System.out.println("Segment closed - elapsed (ms): " + delay);
260         }
261     }
262 
263     interface AccessorFactory {
264         AbstractSegmentAccessor make(int id, MemorySegment segment, Arena arena);
265     }
266 
267     @DataProvider
268     static Object[][] accessors() {
269         return new Object[][] {
270                 { "SegmentAccessor", (AccessorFactory)SegmentAccessor::new },
271                 { "SegmentCopyAccessor", (AccessorFactory)SegmentCopyAccessor::new },
272                 { "SegmentMismatchAccessor", (AccessorFactory)SegmentMismatchAccessor::new },
273                 { "SegmentFillAccessor", (AccessorFactory)SegmentFillAccessor::new },
274                 { "BufferAccessor", (AccessorFactory)BufferAccessor::new },
275                 { "BufferHandleAccessor", (AccessorFactory)BufferHandleAccessor::new }
276         };
277     }
278 }