1 /*
  2  * Copyright (c) 2020, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @modules java.base/jdk.internal.vm.annotation java.base/jdk.internal.misc
 27  * @key randomness
 28  * @run testng/othervm TestHandshake
 29  * @run testng/othervm -Xint TestHandshake
 30  * @run testng/othervm -XX:TieredStopAtLevel=1 TestHandshake
 31  * @run testng/othervm -XX:-TieredCompilation TestHandshake
 32  */
 33 
 34 import java.lang.foreign.Arena;
 35 import java.lang.foreign.MemorySegment;
 36 import java.lang.invoke.MethodHandles;
 37 import java.lang.invoke.VarHandle;
 38 import java.nio.ByteBuffer;
 39 import java.nio.ByteOrder;
 40 import java.util.concurrent.ExecutorService;
 41 import java.util.concurrent.Executors;
 42 import java.util.concurrent.ThreadLocalRandom;
 43 import java.util.concurrent.TimeUnit;
 44 import java.util.concurrent.atomic.AtomicBoolean;
 45 import java.util.concurrent.atomic.AtomicLong;
 46 
 47 import org.testng.annotations.DataProvider;
 48 import org.testng.annotations.Test;
 49 
 50 import static java.lang.foreign.ValueLayout.JAVA_BYTE;
 51 import static org.testng.Assert.*;
 52 
 53 public class TestHandshake {
 54 
 55     static final int ITERATIONS = 5;
 56     static final int SEGMENT_SIZE = 1_000_000;
 57     static final int MAX_DELAY_MILLIS = 500;
 58     static final int MAX_EXECUTOR_WAIT_SECONDS = 20;
 59     static final int MAX_THREAD_SPIN_WAIT_MILLIS = 200;
 60 
 61     static final int NUM_ACCESSORS = Math.min(10, Runtime.getRuntime().availableProcessors());
 62 
 63     static final AtomicLong start = new AtomicLong();
 64     static final AtomicBoolean started = new AtomicBoolean();
 65 
 66     @Test(dataProvider = "accessors")
 67     public void testHandshake(String testName, AccessorFactory accessorFactory) throws InterruptedException {
 68         for (int it = 0 ; it < ITERATIONS ; it++) {
 69             Arena arena = Arena.ofShared();
 70             MemorySegment segment = arena.allocate(SEGMENT_SIZE, 1);
 71             System.out.println("ITERATION " + it);
 72             ExecutorService accessExecutor = Executors.newCachedThreadPool();
 73             start.set(System.currentTimeMillis());
 74             started.set(false);
 75             for (int i = 0; i < NUM_ACCESSORS ; i++) {
 76                 accessExecutor.execute(accessorFactory.make(i, segment, arena));
 77             }
 78             int delay = ThreadLocalRandom.current().nextInt(MAX_DELAY_MILLIS);
 79             System.out.println("Starting handshaker with delay set to " + delay + " millis");
 80             Thread.sleep(delay);
 81             accessExecutor.execute(new Handshaker(arena));
 82             accessExecutor.shutdown();
 83             assertTrue(accessExecutor.awaitTermination(MAX_EXECUTOR_WAIT_SECONDS, TimeUnit.SECONDS));
 84             assertTrue(!segment.scope().isAlive());
 85         }
 86     }
 87 
 88     static abstract class AbstractSegmentAccessor implements Runnable {
 89         final MemorySegment segment;
 90         final int id;
 91         final AtomicBoolean failed = new AtomicBoolean();
 92 
 93         AbstractSegmentAccessor(int id, MemorySegment segment) {
 94             this.id = id;
 95             this.segment = segment;
 96         }
 97 
 98         @Override
 99         public final void run() {
100             start("\"Accessor #\" + id");
101             outer: while (segment.scope().isAlive()) {
102                 try {
103                     doAccess();
104                 } catch (IllegalStateException ex) {
105                     long delay = System.currentTimeMillis() - start.get();
106                     System.out.println("Accessor #" + id + " suspending - elapsed (ms): " + delay);
107                     backoff();
108                     delay = System.currentTimeMillis() - start.get();
109                     System.out.println("Accessor #" + id + " resuming - elapsed (ms): " + delay);
110                     continue outer;
111                 }
112             }
113             long delay = System.currentTimeMillis() - start.get();
114             System.out.println("Accessor #" + id + " terminated - elapsed (ms): " + delay);
115         }
116 
117         abstract void doAccess();
118 
119         private void backoff() {
120             try {
121                 Thread.sleep(ThreadLocalRandom.current().nextInt(MAX_THREAD_SPIN_WAIT_MILLIS));
122             } catch (InterruptedException ex) {
123                 throw new AssertionError(ex);
124             }
125         }
126     }
127 
128     static void start(String name) {
129         if (started.compareAndSet(false, true)) {
130             long delay = System.currentTimeMillis() - start.get();
131             System.out.println("Started first thread: " + name + " ; elapsed (ms): " + delay);
132         }
133     }
134 
135     static abstract class AbstractBufferAccessor extends AbstractSegmentAccessor {
136         final ByteBuffer bb;
137 
138         AbstractBufferAccessor(int id, MemorySegment segment, Arena _unused) {
139             super(id, segment);
140             this.bb = segment.asByteBuffer();
141         }
142     }
143 
144     static class SegmentAccessor extends AbstractSegmentAccessor {
145 
146         SegmentAccessor(int id, MemorySegment segment, Arena _unused) {
147             super(id, segment);
148         }
149 
150         @Override
151         void doAccess() {
152             int sum = 0;
153             for (int i = 0; i < segment.byteSize(); i++) {
154                 sum += segment.get(JAVA_BYTE, i);
155             }
156         }
157     }
158 
159     static class SegmentCopyAccessor extends AbstractSegmentAccessor {
160 
161         MemorySegment first, second;
162 
163 
164         SegmentCopyAccessor(int id, MemorySegment segment, Arena _unused) {
165             super(id, segment);
166             long split = segment.byteSize() / 2;
167             first = segment.asSlice(0, split);
168             second = segment.asSlice(split);
169         }
170 
171         @Override
172         public void doAccess() {
173             first.copyFrom(second);
174         }
175     }
176 
177     static class SegmentFillAccessor extends AbstractSegmentAccessor {
178 
179         SegmentFillAccessor(int id, MemorySegment segment, Arena _unused) {
180             super(id, segment);
181         }
182 
183         @Override
184         public void doAccess() {
185             segment.fill((byte) ThreadLocalRandom.current().nextInt(10));
186         }
187     }
188 
189     static class SegmentMismatchAccessor extends AbstractSegmentAccessor {
190 
191         final MemorySegment copy;
192 
193         SegmentMismatchAccessor(int id, MemorySegment segment, Arena arena) {
194             super(id, segment);
195             this.copy = arena.allocate(SEGMENT_SIZE, 1);
196             copy.copyFrom(segment);
197             copy.set(JAVA_BYTE, ThreadLocalRandom.current().nextInt(SEGMENT_SIZE), (byte)42);
198         }
199 
200         @Override
201         public void doAccess() {
202             segment.mismatch(copy);
203         }
204     }
205 
206     static class BufferAccessor extends AbstractBufferAccessor {
207 
208         BufferAccessor(int id, MemorySegment segment, Arena _unused) {
209             super(id, segment, null);
210         }
211 
212         @Override
213         public void doAccess() {
214             int sum = 0;
215             for (int i = 0; i < bb.capacity(); i++) {
216                 sum += bb.get(i);
217             }
218         }
219     }
220 
221     static class BufferHandleAccessor extends AbstractBufferAccessor {
222 
223         static VarHandle handle = MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder());
224 
225         public BufferHandleAccessor(int id, MemorySegment segment, Arena _unused) {
226             super(id, segment, null);
227         }
228 
229         @Override
230         public void doAccess() {
231             int sum = 0;
232             for (int i = 0; i < bb.capacity() / 2; i++) {
233                 sum += (short) handle.get(bb, i);
234             }
235         }
236     };
237 
238     static class Handshaker implements Runnable {
239 
240         final Arena arena;
241 
242         Handshaker(Arena arena) {
243             this.arena = arena;
244         }
245 
246         @Override
247         public void run() {
248             start("Handshaker");
249             while (true) {
250                 try {
251                     arena.close();
252                     break;
253                 } catch (IllegalStateException ex) {
254                     Thread.onSpinWait();
255                 }
256             }
257             long delay = System.currentTimeMillis() - start.get();
258             System.out.println("Segment closed - elapsed (ms): " + delay);
259         }
260     }
261 
262     interface AccessorFactory {
263         AbstractSegmentAccessor make(int id, MemorySegment segment, Arena arena);
264     }
265 
266     @DataProvider
267     static Object[][] accessors() {
268         return new Object[][] {
269                 { "SegmentAccessor", (AccessorFactory)SegmentAccessor::new },
270                 { "SegmentCopyAccessor", (AccessorFactory)SegmentCopyAccessor::new },
271                 { "SegmentMismatchAccessor", (AccessorFactory)SegmentMismatchAccessor::new },
272                 { "SegmentFillAccessor", (AccessorFactory)SegmentFillAccessor::new },
273                 { "BufferAccessor", (AccessorFactory)BufferAccessor::new },
274                 { "BufferHandleAccessor", (AccessorFactory)BufferHandleAccessor::new }
275         };
276     }
277 }