1 /* 2 * Copyright (c) 2020, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @enablePreview 27 * @modules java.base/jdk.internal.vm.annotation java.base/jdk.internal.misc 28 * @key randomness 29 * @run testng/othervm TestHandshake 30 * @run testng/othervm -Xint TestHandshake 31 * @run testng/othervm -XX:TieredStopAtLevel=1 TestHandshake 32 * @run testng/othervm -XX:-TieredCompilation TestHandshake 33 */ 34 35 import java.lang.foreign.Arena; 36 import java.lang.foreign.MemorySegment; 37 import java.lang.invoke.MethodHandles; 38 import java.lang.invoke.VarHandle; 39 import java.nio.ByteBuffer; 40 import java.nio.ByteOrder; 41 import java.util.concurrent.ExecutorService; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.ThreadLocalRandom; 44 import java.util.concurrent.TimeUnit; 45 import java.util.concurrent.atomic.AtomicBoolean; 46 import java.util.concurrent.atomic.AtomicLong; 47 48 import org.testng.annotations.DataProvider; 49 import org.testng.annotations.Test; 50 51 import static java.lang.foreign.ValueLayout.JAVA_BYTE; 52 import static org.testng.Assert.*; 53 54 public class TestHandshake { 55 56 static final int ITERATIONS = 5; 57 static final int SEGMENT_SIZE = 1_000_000; 58 static final int MAX_DELAY_MILLIS = 500; 59 static final int MAX_EXECUTOR_WAIT_SECONDS = 20; 60 static final int MAX_THREAD_SPIN_WAIT_MILLIS = 200; 61 62 static final int NUM_ACCESSORS = Math.min(10, Runtime.getRuntime().availableProcessors()); 63 64 static final AtomicLong start = new AtomicLong(); 65 static final AtomicBoolean started = new AtomicBoolean(); 66 67 @Test(dataProvider = "accessors") 68 public void testHandshake(String testName, AccessorFactory accessorFactory) throws InterruptedException { 69 for (int it = 0 ; it < ITERATIONS ; it++) { 70 Arena arena = Arena.ofShared(); 71 MemorySegment segment = arena.allocate(SEGMENT_SIZE, 1); 72 System.out.println("ITERATION " + it); 73 ExecutorService accessExecutor = Executors.newCachedThreadPool(); 74 start.set(System.currentTimeMillis()); 75 started.set(false); 76 for (int i = 0; i < NUM_ACCESSORS ; i++) { 77 accessExecutor.execute(accessorFactory.make(i, segment, arena)); 78 } 79 int delay = ThreadLocalRandom.current().nextInt(MAX_DELAY_MILLIS); 80 System.out.println("Starting handshaker with delay set to " + delay + " millis"); 81 Thread.sleep(delay); 82 accessExecutor.execute(new Handshaker(arena)); 83 accessExecutor.shutdown(); 84 assertTrue(accessExecutor.awaitTermination(MAX_EXECUTOR_WAIT_SECONDS, TimeUnit.SECONDS)); 85 assertTrue(!segment.scope().isAlive()); 86 } 87 } 88 89 static abstract class AbstractSegmentAccessor implements Runnable { 90 final MemorySegment segment; 91 final int id; 92 final AtomicBoolean failed = new AtomicBoolean(); 93 94 AbstractSegmentAccessor(int id, MemorySegment segment) { 95 this.id = id; 96 this.segment = segment; 97 } 98 99 @Override 100 public final void run() { 101 start("\"Accessor #\" + id"); 102 outer: while (segment.scope().isAlive()) { 103 try { 104 doAccess(); 105 } catch (IllegalStateException ex) { 106 long delay = System.currentTimeMillis() - start.get(); 107 System.out.println("Accessor #" + id + " suspending - elapsed (ms): " + delay); 108 backoff(); 109 delay = System.currentTimeMillis() - start.get(); 110 System.out.println("Accessor #" + id + " resuming - elapsed (ms): " + delay); 111 continue outer; 112 } 113 } 114 long delay = System.currentTimeMillis() - start.get(); 115 System.out.println("Accessor #" + id + " terminated - elapsed (ms): " + delay); 116 } 117 118 abstract void doAccess(); 119 120 private void backoff() { 121 try { 122 Thread.sleep(ThreadLocalRandom.current().nextInt(MAX_THREAD_SPIN_WAIT_MILLIS)); 123 } catch (InterruptedException ex) { 124 throw new AssertionError(ex); 125 } 126 } 127 } 128 129 static void start(String name) { 130 if (started.compareAndSet(false, true)) { 131 long delay = System.currentTimeMillis() - start.get(); 132 System.out.println("Started first thread: " + name + " ; elapsed (ms): " + delay); 133 } 134 } 135 136 static abstract class AbstractBufferAccessor extends AbstractSegmentAccessor { 137 final ByteBuffer bb; 138 139 AbstractBufferAccessor(int id, MemorySegment segment, Arena _unused) { 140 super(id, segment); 141 this.bb = segment.asByteBuffer(); 142 } 143 } 144 145 static class SegmentAccessor extends AbstractSegmentAccessor { 146 147 SegmentAccessor(int id, MemorySegment segment, Arena _unused) { 148 super(id, segment); 149 } 150 151 @Override 152 void doAccess() { 153 int sum = 0; 154 for (int i = 0; i < segment.byteSize(); i++) { 155 sum += segment.get(JAVA_BYTE, i); 156 } 157 } 158 } 159 160 static class SegmentCopyAccessor extends AbstractSegmentAccessor { 161 162 MemorySegment first, second; 163 164 165 SegmentCopyAccessor(int id, MemorySegment segment, Arena _unused) { 166 super(id, segment); 167 long split = segment.byteSize() / 2; 168 first = segment.asSlice(0, split); 169 second = segment.asSlice(split); 170 } 171 172 @Override 173 public void doAccess() { 174 first.copyFrom(second); 175 } 176 } 177 178 static class SegmentFillAccessor extends AbstractSegmentAccessor { 179 180 SegmentFillAccessor(int id, MemorySegment segment, Arena _unused) { 181 super(id, segment); 182 } 183 184 @Override 185 public void doAccess() { 186 segment.fill((byte) ThreadLocalRandom.current().nextInt(10)); 187 } 188 } 189 190 static class SegmentMismatchAccessor extends AbstractSegmentAccessor { 191 192 final MemorySegment copy; 193 194 SegmentMismatchAccessor(int id, MemorySegment segment, Arena arena) { 195 super(id, segment); 196 this.copy = arena.allocate(SEGMENT_SIZE, 1); 197 copy.copyFrom(segment); 198 copy.set(JAVA_BYTE, ThreadLocalRandom.current().nextInt(SEGMENT_SIZE), (byte)42); 199 } 200 201 @Override 202 public void doAccess() { 203 segment.mismatch(copy); 204 } 205 } 206 207 static class BufferAccessor extends AbstractBufferAccessor { 208 209 BufferAccessor(int id, MemorySegment segment, Arena _unused) { 210 super(id, segment, null); 211 } 212 213 @Override 214 public void doAccess() { 215 int sum = 0; 216 for (int i = 0; i < bb.capacity(); i++) { 217 sum += bb.get(i); 218 } 219 } 220 } 221 222 static class BufferHandleAccessor extends AbstractBufferAccessor { 223 224 static VarHandle handle = MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder()); 225 226 public BufferHandleAccessor(int id, MemorySegment segment, Arena _unused) { 227 super(id, segment, null); 228 } 229 230 @Override 231 public void doAccess() { 232 int sum = 0; 233 for (int i = 0; i < bb.capacity() / 2; i++) { 234 sum += (short) handle.get(bb, i); 235 } 236 } 237 }; 238 239 static class Handshaker implements Runnable { 240 241 final Arena arena; 242 243 Handshaker(Arena arena) { 244 this.arena = arena; 245 } 246 247 @Override 248 public void run() { 249 start("Handshaker"); 250 while (true) { 251 try { 252 arena.close(); 253 break; 254 } catch (IllegalStateException ex) { 255 Thread.onSpinWait(); 256 } 257 } 258 long delay = System.currentTimeMillis() - start.get(); 259 System.out.println("Segment closed - elapsed (ms): " + delay); 260 } 261 } 262 263 interface AccessorFactory { 264 AbstractSegmentAccessor make(int id, MemorySegment segment, Arena arena); 265 } 266 267 @DataProvider 268 static Object[][] accessors() { 269 return new Object[][] { 270 { "SegmentAccessor", (AccessorFactory)SegmentAccessor::new }, 271 { "SegmentCopyAccessor", (AccessorFactory)SegmentCopyAccessor::new }, 272 { "SegmentMismatchAccessor", (AccessorFactory)SegmentMismatchAccessor::new }, 273 { "SegmentFillAccessor", (AccessorFactory)SegmentFillAccessor::new }, 274 { "BufferAccessor", (AccessorFactory)BufferAccessor::new }, 275 { "BufferHandleAccessor", (AccessorFactory)BufferHandleAccessor::new } 276 }; 277 } 278 }