1 /*
  2  * Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @modules jdk.incubator.foreign java.base/jdk.internal.vm.annotation java.base/jdk.internal.misc
 27  * @key randomness
 28  * @run testng/othervm TestHandshake
 29  * @run testng/othervm -Xint TestHandshake
 30  * @run testng/othervm -XX:TieredStopAtLevel=1 TestHandshake
 31  * @run testng/othervm -XX:-TieredCompilation TestHandshake
 32  */
 33 
 34 import jdk.incubator.foreign.MemoryAccess;
 35 import jdk.incubator.foreign.MemorySegment;
 36 
 37 import java.lang.invoke.MethodHandles;
 38 import java.lang.invoke.VarHandle;
 39 import java.nio.ByteBuffer;
 40 import java.nio.ByteOrder;
 41 import java.util.concurrent.ExecutorService;
 42 import java.util.concurrent.Executors;
 43 import java.util.concurrent.ThreadLocalRandom;
 44 import java.util.concurrent.TimeUnit;
 45 import java.util.concurrent.atomic.AtomicBoolean;
 46 import java.util.concurrent.atomic.AtomicLong;
 47 
 48 import jdk.incubator.foreign.ResourceScope;
 49 import org.testng.annotations.DataProvider;
 50 import org.testng.annotations.Test;
 51 import static org.testng.Assert.*;
 52 
 53 public class TestHandshake {
 54 
 55     static final int ITERATIONS = 5;
 56     static final int SEGMENT_SIZE = 1_000_000;
 57     static final int MAX_DELAY_MILLIS = 500;
 58     static final int MAX_EXECUTOR_WAIT_SECONDS = 20;
 59     static final int MAX_THREAD_SPIN_WAIT_MILLIS = 200;
 60 
 61     static final int NUM_ACCESSORS = Math.min(10, Runtime.getRuntime().availableProcessors());
 62 
 63     static final AtomicLong start = new AtomicLong();
 64     static final AtomicBoolean started = new AtomicBoolean();
 65 
 66     @Test(dataProvider = "accessors")
 67     public void testHandshake(String testName, AccessorFactory accessorFactory) throws InterruptedException {
 68         for (int it = 0 ; it < ITERATIONS ; it++) {
 69             ResourceScope scope = ResourceScope.newSharedScope();
 70             MemorySegment segment = MemorySegment.allocateNative(SEGMENT_SIZE, 1, scope);
 71             System.out.println("ITERATION " + it);
 72             ExecutorService accessExecutor = Executors.newCachedThreadPool();
 73             start.set(System.currentTimeMillis());
 74             started.set(false);
 75             for (int i = 0; i < NUM_ACCESSORS ; i++) {
 76                 accessExecutor.execute(accessorFactory.make(i, segment));
 77             }
 78             int delay = ThreadLocalRandom.current().nextInt(MAX_DELAY_MILLIS);
 79             System.out.println("Starting handshaker with delay set to " + delay + " millis");
 80             Thread.sleep(delay);
 81             accessExecutor.execute(new Handshaker(scope));
 82             accessExecutor.shutdown();
 83             assertTrue(accessExecutor.awaitTermination(MAX_EXECUTOR_WAIT_SECONDS, TimeUnit.SECONDS));
 84             assertTrue(!segment.scope().isAlive());
 85         }
 86     }
 87 
 88     static abstract class AbstractSegmentAccessor implements Runnable {
 89         final MemorySegment segment;
 90         final int id;
 91 
 92         AbstractSegmentAccessor(int id, MemorySegment segment) {
 93             this.id = id;
 94             this.segment = segment;
 95         }
 96 
 97         @Override
 98         public final void run() {
 99             start("\"Accessor #\" + id");
100             outer: while (segment.scope().isAlive()) {
101                 try {
102                     doAccess();
103                 } catch (IllegalStateException ex) {
104                     long delay = System.currentTimeMillis() - start.get();
105                     System.out.println("Accessor #" + id + " suspending - elapsed (ms): " + delay);
106                     backoff();
107                     delay = System.currentTimeMillis() - start.get();
108                     System.out.println("Accessor #" + id + " resuming - elapsed (ms): " + delay);
109                     continue outer;
110                 }
111             }
112             long delay = System.currentTimeMillis() - start.get();
113             System.out.println("Accessor #" + id + " terminated - elapsed (ms): " + delay);
114         }
115 
116         abstract void doAccess();
117 
118         private void backoff() {
119             try {
120                 Thread.sleep(ThreadLocalRandom.current().nextInt(MAX_THREAD_SPIN_WAIT_MILLIS));
121             } catch (InterruptedException ex) {
122                 throw new AssertionError(ex);
123             }
124         }
125     }
126 
127     static void start(String name) {
128         if (started.compareAndSet(false, true)) {
129             long delay = System.currentTimeMillis() - start.get();
130             System.out.println("Started first thread: " + name + " ; elapsed (ms): " + delay);
131         }
132     }
133 
134     static abstract class AbstractBufferAccessor extends AbstractSegmentAccessor {
135         final ByteBuffer bb;
136 
137         AbstractBufferAccessor(int id, MemorySegment segment) {
138             super(id, segment);
139             this.bb = segment.asByteBuffer();
140         }
141     }
142 
143     static class SegmentAccessor extends AbstractSegmentAccessor {
144 
145         SegmentAccessor(int id, MemorySegment segment) {
146             super(id, segment);
147         }
148 
149         @Override
150         void doAccess() {
151             int sum = 0;
152             for (int i = 0; i < segment.byteSize(); i++) {
153                 sum += MemoryAccess.getByteAtOffset(segment, i);
154             }
155         }
156     }
157 
158     static class SegmentCopyAccessor extends AbstractSegmentAccessor {
159 
160         MemorySegment first, second;
161 
162 
163         SegmentCopyAccessor(int id, MemorySegment segment) {
164             super(id, segment);
165             long split = segment.byteSize() / 2;
166             first = segment.asSlice(0, split);
167             second = segment.asSlice(split);
168         }
169 
170         @Override
171         public void doAccess() {
172             first.copyFrom(second);
173         }
174     }
175 
176     static class SegmentFillAccessor extends AbstractSegmentAccessor {
177 
178         SegmentFillAccessor(int id, MemorySegment segment) {
179             super(id, segment);
180         }
181 
182         @Override
183         public void doAccess() {
184             segment.fill((byte) ThreadLocalRandom.current().nextInt(10));
185         }
186     }
187 
188     static class SegmentMismatchAccessor extends AbstractSegmentAccessor {
189 
190         final MemorySegment copy;
191 
192         SegmentMismatchAccessor(int id, MemorySegment segment) {
193             super(id, segment);
194             this.copy = MemorySegment.allocateNative(SEGMENT_SIZE, 1, segment.scope());
195             copy.copyFrom(segment);
196             MemoryAccess.setByteAtOffset(copy, ThreadLocalRandom.current().nextInt(SEGMENT_SIZE), (byte)42);
197         }
198 
199         @Override
200         public void doAccess() {
201             segment.mismatch(copy);
202         }
203     }
204 
205     static class BufferAccessor extends AbstractBufferAccessor {
206 
207         BufferAccessor(int id, MemorySegment segment) {
208             super(id, segment);
209         }
210 
211         @Override
212         public void doAccess() {
213             int sum = 0;
214             for (int i = 0; i < bb.capacity(); i++) {
215                 sum += bb.get(i);
216             }
217         }
218     }
219 
220     static class BufferHandleAccessor extends AbstractBufferAccessor {
221 
222         static VarHandle handle = MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder());
223 
224         public BufferHandleAccessor(int id, MemorySegment segment) {
225             super(id, segment);
226         }
227 
228         @Override
229         public void doAccess() {
230             int sum = 0;
231             for (int i = 0; i < bb.capacity() / 2; i++) {
232                 sum += (short) handle.get(bb, i);
233             }
234         }
235     };
236 
237     static class Handshaker implements Runnable {
238 
239         final ResourceScope scope;
240 
241         Handshaker(ResourceScope scope) {
242             this.scope = scope;
243         }
244 
245         @Override
246         public void run() {
247             start("Handshaker");
248             while (true) {
249                 try {
250                     scope.close();
251                     break;
252                 } catch (IllegalStateException ex) {
253                     Thread.onSpinWait();
254                 }
255             }
256             long delay = System.currentTimeMillis() - start.get();
257             System.out.println("Segment closed - elapsed (ms): " + delay);
258         }
259     }
260 
261     interface AccessorFactory {
262         AbstractSegmentAccessor make(int id, MemorySegment segment);
263     }
264 
265     @DataProvider
266     static Object[][] accessors() {
267         return new Object[][] {
268                 { "SegmentAccessor", (AccessorFactory)SegmentAccessor::new },
269                 { "SegmentCopyAccessor", (AccessorFactory)SegmentCopyAccessor::new },
270                 { "SegmentMismatchAccessor", (AccessorFactory)SegmentMismatchAccessor::new },
271                 { "SegmentFillAccessor", (AccessorFactory)SegmentFillAccessor::new },
272                 { "BufferAccessor", (AccessorFactory)BufferAccessor::new },
273                 { "BufferHandleAccessor", (AccessorFactory)BufferHandleAccessor::new }
274         };
275     }
276 }