1 /* 2 * Copyright (c) 2020, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @modules java.base/jdk.internal.vm.annotation java.base/jdk.internal.misc 27 * @key randomness 28 * @run testng/othervm TestHandshake 29 * @run testng/othervm -Xint TestHandshake 30 * @run testng/othervm -XX:TieredStopAtLevel=1 TestHandshake 31 * @run testng/othervm -XX:-TieredCompilation TestHandshake 32 */ 33 34 import java.lang.foreign.Arena; 35 import java.lang.foreign.MemorySegment; 36 import java.lang.invoke.MethodHandles; 37 import java.lang.invoke.VarHandle; 38 import java.nio.ByteBuffer; 39 import java.nio.ByteOrder; 40 import java.util.concurrent.ExecutorService; 41 import java.util.concurrent.Executors; 42 import java.util.concurrent.ThreadLocalRandom; 43 import java.util.concurrent.TimeUnit; 44 import java.util.concurrent.atomic.AtomicBoolean; 45 import java.util.concurrent.atomic.AtomicLong; 46 47 import org.testng.annotations.DataProvider; 48 import org.testng.annotations.Test; 49 50 import static java.lang.foreign.ValueLayout.JAVA_BYTE; 51 import static org.testng.Assert.*; 52 53 public class TestHandshake { 54 55 static final int ITERATIONS = 5; 56 static final int SEGMENT_SIZE = 1_000_000; 57 static final int MAX_DELAY_MILLIS = 500; 58 static final int MAX_EXECUTOR_WAIT_SECONDS = 20; 59 static final int MAX_THREAD_SPIN_WAIT_MILLIS = 200; 60 61 static final int NUM_ACCESSORS = Math.min(10, Runtime.getRuntime().availableProcessors()); 62 63 static final AtomicLong start = new AtomicLong(); 64 static final AtomicBoolean started = new AtomicBoolean(); 65 66 @Test(dataProvider = "accessors") 67 public void testHandshake(String testName, AccessorFactory accessorFactory) throws InterruptedException { 68 for (int it = 0 ; it < ITERATIONS ; it++) { 69 Arena arena = Arena.ofShared(); 70 MemorySegment segment = arena.allocate(SEGMENT_SIZE, 1); 71 System.out.println("ITERATION " + it); 72 ExecutorService accessExecutor = Executors.newCachedThreadPool(); 73 start.set(System.currentTimeMillis()); 74 started.set(false); 75 for (int i = 0; i < NUM_ACCESSORS ; i++) { 76 accessExecutor.execute(accessorFactory.make(i, segment, arena)); 77 } 78 int delay = ThreadLocalRandom.current().nextInt(MAX_DELAY_MILLIS); 79 System.out.println("Starting handshaker with delay set to " + delay + " millis"); 80 Thread.sleep(delay); 81 accessExecutor.execute(new Handshaker(arena)); 82 accessExecutor.shutdown(); 83 assertTrue(accessExecutor.awaitTermination(MAX_EXECUTOR_WAIT_SECONDS, TimeUnit.SECONDS)); 84 assertTrue(!segment.scope().isAlive()); 85 } 86 } 87 88 static abstract class AbstractSegmentAccessor implements Runnable { 89 final MemorySegment segment; 90 final int id; 91 final AtomicBoolean failed = new AtomicBoolean(); 92 93 AbstractSegmentAccessor(int id, MemorySegment segment) { 94 this.id = id; 95 this.segment = segment; 96 } 97 98 @Override 99 public final void run() { 100 start("\"Accessor #\" + id"); 101 outer: while (segment.scope().isAlive()) { 102 try { 103 doAccess(); 104 } catch (IllegalStateException ex) { 105 long delay = System.currentTimeMillis() - start.get(); 106 System.out.println("Accessor #" + id + " suspending - elapsed (ms): " + delay); 107 backoff(); 108 delay = System.currentTimeMillis() - start.get(); 109 System.out.println("Accessor #" + id + " resuming - elapsed (ms): " + delay); 110 continue outer; 111 } 112 } 113 long delay = System.currentTimeMillis() - start.get(); 114 System.out.println("Accessor #" + id + " terminated - elapsed (ms): " + delay); 115 } 116 117 abstract void doAccess(); 118 119 private void backoff() { 120 try { 121 Thread.sleep(ThreadLocalRandom.current().nextInt(MAX_THREAD_SPIN_WAIT_MILLIS)); 122 } catch (InterruptedException ex) { 123 throw new AssertionError(ex); 124 } 125 } 126 } 127 128 static void start(String name) { 129 if (started.compareAndSet(false, true)) { 130 long delay = System.currentTimeMillis() - start.get(); 131 System.out.println("Started first thread: " + name + " ; elapsed (ms): " + delay); 132 } 133 } 134 135 static abstract class AbstractBufferAccessor extends AbstractSegmentAccessor { 136 final ByteBuffer bb; 137 138 AbstractBufferAccessor(int id, MemorySegment segment, Arena _unused) { 139 super(id, segment); 140 this.bb = segment.asByteBuffer(); 141 } 142 } 143 144 static class SegmentAccessor extends AbstractSegmentAccessor { 145 146 SegmentAccessor(int id, MemorySegment segment, Arena _unused) { 147 super(id, segment); 148 } 149 150 @Override 151 void doAccess() { 152 int sum = 0; 153 for (int i = 0; i < segment.byteSize(); i++) { 154 sum += segment.get(JAVA_BYTE, i); 155 } 156 } 157 } 158 159 static class SegmentCopyAccessor extends AbstractSegmentAccessor { 160 161 MemorySegment first, second; 162 163 164 SegmentCopyAccessor(int id, MemorySegment segment, Arena _unused) { 165 super(id, segment); 166 long split = segment.byteSize() / 2; 167 first = segment.asSlice(0, split); 168 second = segment.asSlice(split); 169 } 170 171 @Override 172 public void doAccess() { 173 first.copyFrom(second); 174 } 175 } 176 177 static class SegmentFillAccessor extends AbstractSegmentAccessor { 178 179 SegmentFillAccessor(int id, MemorySegment segment, Arena _unused) { 180 super(id, segment); 181 } 182 183 @Override 184 public void doAccess() { 185 segment.fill((byte) ThreadLocalRandom.current().nextInt(10)); 186 } 187 } 188 189 static class SegmentMismatchAccessor extends AbstractSegmentAccessor { 190 191 final MemorySegment copy; 192 193 SegmentMismatchAccessor(int id, MemorySegment segment, Arena arena) { 194 super(id, segment); 195 this.copy = arena.allocate(SEGMENT_SIZE, 1); 196 copy.copyFrom(segment); 197 copy.set(JAVA_BYTE, ThreadLocalRandom.current().nextInt(SEGMENT_SIZE), (byte)42); 198 } 199 200 @Override 201 public void doAccess() { 202 segment.mismatch(copy); 203 } 204 } 205 206 static class BufferAccessor extends AbstractBufferAccessor { 207 208 BufferAccessor(int id, MemorySegment segment, Arena _unused) { 209 super(id, segment, null); 210 } 211 212 @Override 213 public void doAccess() { 214 int sum = 0; 215 for (int i = 0; i < bb.capacity(); i++) { 216 sum += bb.get(i); 217 } 218 } 219 } 220 221 static class BufferHandleAccessor extends AbstractBufferAccessor { 222 223 static VarHandle handle = MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder()); 224 225 public BufferHandleAccessor(int id, MemorySegment segment, Arena _unused) { 226 super(id, segment, null); 227 } 228 229 @Override 230 public void doAccess() { 231 int sum = 0; 232 for (int i = 0; i < bb.capacity() / 2; i++) { 233 sum += (short) handle.get(bb, i); 234 } 235 } 236 }; 237 238 static class Handshaker implements Runnable { 239 240 final Arena arena; 241 242 Handshaker(Arena arena) { 243 this.arena = arena; 244 } 245 246 @Override 247 public void run() { 248 start("Handshaker"); 249 while (true) { 250 try { 251 arena.close(); 252 break; 253 } catch (IllegalStateException ex) { 254 Thread.onSpinWait(); 255 } 256 } 257 long delay = System.currentTimeMillis() - start.get(); 258 System.out.println("Segment closed - elapsed (ms): " + delay); 259 } 260 } 261 262 interface AccessorFactory { 263 AbstractSegmentAccessor make(int id, MemorySegment segment, Arena arena); 264 } 265 266 @DataProvider 267 static Object[][] accessors() { 268 return new Object[][] { 269 { "SegmentAccessor", (AccessorFactory)SegmentAccessor::new }, 270 { "SegmentCopyAccessor", (AccessorFactory)SegmentCopyAccessor::new }, 271 { "SegmentMismatchAccessor", (AccessorFactory)SegmentMismatchAccessor::new }, 272 { "SegmentFillAccessor", (AccessorFactory)SegmentFillAccessor::new }, 273 { "BufferAccessor", (AccessorFactory)BufferAccessor::new }, 274 { "BufferHandleAccessor", (AccessorFactory)BufferHandleAccessor::new } 275 }; 276 } 277 }