1 /* 2 * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @enablePreview 27 * @modules java.base/jdk.internal.foreign 28 * @run testng/othervm TestMemorySession 29 */ 30 31 import java.lang.foreign.Arena; 32 33 import jdk.internal.foreign.MemorySessionImpl; 34 import org.testng.annotations.DataProvider; 35 import org.testng.annotations.Test; 36 import static org.testng.Assert.*; 37 38 import java.lang.reflect.Method; 39 import java.util.ArrayList; 40 import java.util.List; 41 import java.util.concurrent.atomic.AtomicInteger; 42 import java.util.concurrent.atomic.AtomicReference; 43 import java.util.function.Supplier; 44 import java.util.stream.IntStream; 45 46 public class TestMemorySession { 47 48 final static int N_THREADS = 100; 49 50 @Test 51 public void testConfined() { 52 AtomicInteger acc = new AtomicInteger(); 53 Arena arena = Arena.ofConfined(); 54 for (int i = 0 ; i < N_THREADS ; i++) { 55 int delta = i; 56 addCloseAction(arena, () -> acc.addAndGet(delta)); 57 } 58 assertEquals(acc.get(), 0); 59 60 arena.close(); 61 assertEquals(acc.get(), IntStream.range(0, N_THREADS).sum()); 62 } 63 64 @Test(dataProvider = "sharedSessions") 65 public void testSharedSingleThread(ArenaSupplier arenaSupplier) { 66 AtomicInteger acc = new AtomicInteger(); 67 Arena session = arenaSupplier.get(); 68 for (int i = 0 ; i < N_THREADS ; i++) { 69 int delta = i; 70 addCloseAction(session, () -> acc.addAndGet(delta)); 71 } 72 assertEquals(acc.get(), 0); 73 74 if (!TestMemorySession.ArenaSupplier.isImplicit(session)) { 75 TestMemorySession.ArenaSupplier.close(session); 76 assertEquals(acc.get(), IntStream.range(0, N_THREADS).sum()); 77 } else { 78 session = null; 79 int expected = IntStream.range(0, N_THREADS).sum(); 80 while (acc.get() != expected) { 81 kickGC(); 82 } 83 } 84 } 85 86 @Test(dataProvider = "sharedSessions") 87 public void testSharedMultiThread(ArenaSupplier arenaSupplier) { 88 AtomicInteger acc = new AtomicInteger(); 89 List<Thread> threads = new ArrayList<>(); 90 Arena session = arenaSupplier.get(); 91 AtomicReference<Arena> sessionRef = new AtomicReference<>(session); 92 for (int i = 0 ; i < N_THREADS ; i++) { 93 int delta = i; 94 Thread thread = new Thread(() -> { 95 try { 96 addCloseAction(sessionRef.get(), () -> { 97 acc.addAndGet(delta); 98 }); 99 } catch (IllegalStateException ex) { 100 // already closed - we need to call cleanup manually 101 acc.addAndGet(delta); 102 } 103 }); 104 threads.add(thread); 105 } 106 assertEquals(acc.get(), 0); 107 threads.forEach(Thread::start); 108 109 // if no cleaner, close - not all segments might have been added to the session! 110 // if cleaner, don't unset the session - after all, the session is kept alive by threads 111 if (!TestMemorySession.ArenaSupplier.isImplicit(session)) { 112 while (true) { 113 try { 114 TestMemorySession.ArenaSupplier.close(session); 115 break; 116 } catch (IllegalStateException ise) { 117 // session is acquired (by add) - wait some more 118 } 119 } 120 } 121 122 threads.forEach(t -> { 123 try { 124 t.join(); 125 } catch (InterruptedException ex) { 126 fail(); 127 } 128 }); 129 130 if (!TestMemorySession.ArenaSupplier.isImplicit(session)) { 131 assertEquals(acc.get(), IntStream.range(0, N_THREADS).sum()); 132 } else { 133 session = null; 134 sessionRef.set(null); 135 int expected = IntStream.range(0, N_THREADS).sum(); 136 while (acc.get() != expected) { 137 kickGC(); 138 } 139 } 140 } 141 142 @Test 143 public void testLockSingleThread() { 144 Arena arena = Arena.ofConfined(); 145 List<Arena> handles = new ArrayList<>(); 146 for (int i = 0 ; i < N_THREADS ; i++) { 147 Arena handle = Arena.ofConfined(); 148 keepAlive(handle, arena); 149 handles.add(handle); 150 } 151 152 while (true) { 153 try { 154 arena.close(); 155 assertEquals(handles.size(), 0); 156 break; 157 } catch (IllegalStateException ex) { 158 assertTrue(handles.size() > 0); 159 Arena handle = handles.remove(0); 160 handle.close(); 161 } 162 } 163 } 164 165 @Test 166 public void testLockSharedMultiThread() { 167 Arena arena = Arena.ofShared(); 168 AtomicInteger lockCount = new AtomicInteger(); 169 for (int i = 0 ; i < N_THREADS ; i++) { 170 new Thread(() -> { 171 try (Arena handle = Arena.ofConfined()) { 172 keepAlive(handle, arena); 173 lockCount.incrementAndGet(); 174 waitSomeTime(); 175 lockCount.decrementAndGet(); 176 } catch (IllegalStateException ex) { 177 // might be already closed - do nothing 178 } 179 }).start(); 180 } 181 182 while (true) { 183 try { 184 arena.close(); 185 assertEquals(lockCount.get(), 0); 186 break; 187 } catch (IllegalStateException ex) { 188 waitSomeTime(); 189 } 190 } 191 } 192 193 @Test 194 public void testCloseEmptyConfinedSession() { 195 Arena.ofConfined().close(); 196 } 197 198 @Test 199 public void testCloseEmptySharedSession() { 200 Arena.ofShared().close(); 201 } 202 203 @Test 204 public void testCloseConfinedLock() { 205 Arena arena = Arena.ofConfined(); 206 Arena handle = Arena.ofConfined(); 207 keepAlive(handle, arena); 208 AtomicReference<Throwable> failure = new AtomicReference<>(); 209 Thread t = new Thread(() -> { 210 try { 211 handle.close(); 212 } catch (Throwable ex) { 213 failure.set(ex); 214 } 215 }); 216 t.start(); 217 try { 218 t.join(); 219 assertNotNull(failure.get()); 220 assertEquals(failure.get().getClass(), WrongThreadException.class); 221 } catch (Throwable ex) { 222 throw new AssertionError(ex); 223 } 224 } 225 226 @Test(dataProvider = "allSessions") 227 public void testSessionAcquires(ArenaSupplier ArenaSupplier) { 228 Arena session = ArenaSupplier.get(); 229 acquireRecursive(session, 5); 230 if (!TestMemorySession.ArenaSupplier.isImplicit(session)) 231 TestMemorySession.ArenaSupplier.close(session); 232 } 233 234 private void acquireRecursive(Arena session, int acquireCount) { 235 try (Arena arena = Arena.ofConfined()) { 236 keepAlive(arena, session); 237 if (acquireCount > 0) { 238 // recursive acquire 239 acquireRecursive(session, acquireCount - 1); 240 } 241 if (!ArenaSupplier.isImplicit(session)) { 242 assertThrows(IllegalStateException.class, () -> ArenaSupplier.close(session)); 243 } 244 } 245 } 246 247 @Test 248 public void testConfinedSessionWithImplicitDependency() { 249 Arena root = Arena.ofConfined(); 250 // Create many implicit sessions which depend on 'root', and let them become unreachable. 251 for (int i = 0; i < N_THREADS; i++) { 252 keepAlive(Arena.ofAuto(), root); 253 } 254 // Now let's keep trying to close 'root' until we succeed. This is trickier than it seems: cleanup action 255 // might be called from another thread (the Cleaner thread), so that the confined session lock count is updated racily. 256 // If that happens, the loop below never terminates. 257 while (true) { 258 try { 259 root.close(); 260 break; // success! 261 } catch (IllegalStateException ex) { 262 kickGC(); 263 for (int i = 0 ; i < N_THREADS ; i++) { // add more races from current thread 264 try (Arena arena = Arena.ofConfined()) { 265 keepAlive(arena, root); 266 // dummy 267 } 268 } 269 // try again 270 } 271 } 272 } 273 274 @Test 275 public void testConfinedSessionWithSharedDependency() { 276 Arena root = Arena.ofConfined(); 277 List<Thread> threads = new ArrayList<>(); 278 // Create many implicit sessions which depend on 'root', and let them become unreachable. 279 for (int i = 0; i < N_THREADS; i++) { 280 Arena arena = Arena.ofShared(); // create session inside same thread! 281 keepAlive(arena, root); 282 Thread t = new Thread(arena::close); // close from another thread! 283 threads.add(t); 284 t.start(); 285 } 286 for (int i = 0 ; i < N_THREADS ; i++) { // add more races from current thread 287 try (Arena arena = Arena.ofConfined()) { 288 keepAlive(arena, root); 289 // dummy 290 } 291 } 292 threads.forEach(t -> { 293 try { 294 t.join(); 295 } catch (InterruptedException ex) { 296 // ok 297 } 298 }); 299 // Now let's close 'root'. This is trickier than it seems: releases of the confined session happen in different 300 // threads, so that the confined session lock count is updated racily. If that happens, the following close will blow up. 301 root.close(); 302 } 303 304 @Test(dataProvider = "nonCloseableSessions") 305 public void testNonCloseableSessions(ArenaSupplier arenaSupplier) { 306 var arena = arenaSupplier.get(); 307 var sessionImpl = ((MemorySessionImpl) arena.scope()); 308 assertFalse(sessionImpl.isCloseable()); 309 assertThrows(UnsupportedOperationException.class, () -> 310 sessionImpl.close()); 311 } 312 313 @Test(dataProvider = "allSessionsAndGlobal") 314 public void testIsCloseableBy(ArenaSupplier arenaSupplier) { 315 var arena = arenaSupplier.get(); 316 var sessionImpl = ((MemorySessionImpl) arena.scope()); 317 assertEquals(sessionImpl.isCloseableBy(Thread.currentThread()), sessionImpl.isCloseable()); 318 Thread otherThread = new Thread(); 319 boolean isCloseableByOther = sessionImpl.isCloseable() && !"ConfinedSession".equals(sessionImpl.getClass().getSimpleName()); 320 assertEquals(sessionImpl.isCloseableBy(otherThread), isCloseableByOther); 321 } 322 323 private void waitSomeTime() { 324 try { 325 Thread.sleep(10); 326 } catch (InterruptedException ex) { 327 // ignore 328 } 329 } 330 331 private void kickGC() { 332 for (int i = 0 ; i < 100 ; i++) { 333 byte[] b = new byte[100]; 334 System.gc(); 335 Thread.onSpinWait(); 336 } 337 } 338 339 @DataProvider 340 static Object[][] drops() { 341 return new Object[][] { 342 { (Supplier<Arena>) Arena::ofConfined}, 343 { (Supplier<Arena>) Arena::ofShared}, 344 }; 345 } 346 347 private void keepAlive(Arena child, Arena parent) { 348 MemorySessionImpl parentImpl = MemorySessionImpl.toMemorySession(parent); 349 parentImpl.acquire0(); 350 addCloseAction(child, parentImpl::release0); 351 } 352 353 private void addCloseAction(Arena session, Runnable action) { 354 MemorySessionImpl sessionImpl = MemorySessionImpl.toMemorySession(session); 355 sessionImpl.addCloseAction(action); 356 } 357 358 interface ArenaSupplier extends Supplier<Arena> { 359 360 static void close(Arena arena) { 361 MemorySessionImpl.toMemorySession(arena).close(); 362 } 363 364 static boolean isImplicit(Arena arena) { 365 return !MemorySessionImpl.toMemorySession(arena).isCloseable(); 366 } 367 368 static ArenaSupplier ofAuto() { 369 return Arena::ofAuto; 370 } 371 372 static ArenaSupplier ofGlobal() { 373 return Arena::global; 374 } 375 376 static ArenaSupplier ofArena(Supplier<Arena> arenaSupplier) { 377 return arenaSupplier::get; 378 } 379 } 380 381 @DataProvider(name = "sharedSessions") 382 static Object[][] sharedSessions() { 383 return new Object[][] { 384 { ArenaSupplier.ofArena(Arena::ofShared) }, 385 { ArenaSupplier.ofAuto() }, 386 }; 387 } 388 389 @DataProvider(name = "allSessions") 390 static Object[][] allSessions() { 391 return new Object[][] { 392 { ArenaSupplier.ofArena(Arena::ofConfined) }, 393 { ArenaSupplier.ofArena(Arena::ofShared) }, 394 { ArenaSupplier.ofAuto() }, 395 }; 396 } 397 398 @DataProvider(name = "nonCloseableSessions") 399 static Object[][] nonCloseableSessions() { 400 return new Object[][] { 401 { ArenaSupplier.ofGlobal() }, 402 { ArenaSupplier.ofAuto() } 403 }; 404 } 405 406 @DataProvider(name = "allSessionsAndGlobal") 407 static Object[][] allSessionsAndGlobal() { 408 return new Object[][] { 409 { ArenaSupplier.ofArena(Arena::ofConfined) }, 410 { ArenaSupplier.ofArena(Arena::ofShared) }, 411 { ArenaSupplier.ofAuto() }, 412 { ArenaSupplier.ofGlobal() }, 413 }; 414 } 415 416 }