1 /*
  2  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @modules java.base/jdk.internal.foreign
 27  * @run testng/othervm TestMemorySession
 28  */
 29 
 30 import java.lang.foreign.Arena;
 31 
 32 import jdk.internal.foreign.MemorySessionImpl;
 33 import org.testng.annotations.DataProvider;
 34 import org.testng.annotations.Test;
 35 import static org.testng.Assert.*;
 36 
 37 import java.lang.reflect.Method;
 38 import java.util.ArrayList;
 39 import java.util.List;
 40 import java.util.concurrent.atomic.AtomicInteger;
 41 import java.util.concurrent.atomic.AtomicReference;
 42 import java.util.function.Supplier;
 43 import java.util.stream.IntStream;
 44 
 45 public class TestMemorySession {
 46 
 47     final static int N_THREADS = 100;
 48 
 49     @Test
 50     public void testConfined() {
 51         AtomicInteger acc = new AtomicInteger();
 52         Arena arena = Arena.ofConfined();
 53         for (int i = 0 ; i < N_THREADS ; i++) {
 54             int delta = i;
 55             addCloseAction(arena, () -> acc.addAndGet(delta));
 56         }
 57         assertEquals(acc.get(), 0);
 58 
 59         arena.close();
 60         assertEquals(acc.get(), IntStream.range(0, N_THREADS).sum());
 61     }
 62 
 63     @Test(dataProvider = "sharedSessions")
 64     public void testSharedSingleThread(ArenaSupplier arenaSupplier) {
 65         AtomicInteger acc = new AtomicInteger();
 66         Arena session = arenaSupplier.get();
 67         for (int i = 0 ; i < N_THREADS ; i++) {
 68             int delta = i;
 69             addCloseAction(session, () -> acc.addAndGet(delta));
 70         }
 71         assertEquals(acc.get(), 0);
 72 
 73         if (!TestMemorySession.ArenaSupplier.isImplicit(session)) {
 74             TestMemorySession.ArenaSupplier.close(session);
 75             assertEquals(acc.get(), IntStream.range(0, N_THREADS).sum());
 76         } else {
 77             session = null;
 78             int expected = IntStream.range(0, N_THREADS).sum();
 79             while (acc.get() != expected) {
 80                 kickGC();
 81             }
 82         }
 83     }
 84 
 85     @Test(dataProvider = "sharedSessions")
 86     public void testSharedMultiThread(ArenaSupplier arenaSupplier) {
 87         AtomicInteger acc = new AtomicInteger();
 88         List<Thread> threads = new ArrayList<>();
 89         Arena session = arenaSupplier.get();
 90         AtomicReference<Arena> sessionRef = new AtomicReference<>(session);
 91         for (int i = 0 ; i < N_THREADS ; i++) {
 92             int delta = i;
 93             Thread thread = new Thread(() -> {
 94                 try {
 95                     addCloseAction(sessionRef.get(), () -> {
 96                         acc.addAndGet(delta);
 97                     });
 98                 } catch (IllegalStateException ex) {
 99                     // already closed - we need to call cleanup manually
100                     acc.addAndGet(delta);
101                 }
102             });
103             threads.add(thread);
104         }
105         assertEquals(acc.get(), 0);
106         threads.forEach(Thread::start);
107 
108         // if no cleaner, close - not all segments might have been added to the session!
109         // if cleaner, don't unset the session - after all, the session is kept alive by threads
110         if (!TestMemorySession.ArenaSupplier.isImplicit(session)) {
111             while (true) {
112                 try {
113                     TestMemorySession.ArenaSupplier.close(session);
114                     break;
115                 } catch (IllegalStateException ise) {
116                     // session is acquired (by add) - wait some more
117                 }
118             }
119         }
120 
121         threads.forEach(t -> {
122             try {
123                 t.join();
124             } catch (InterruptedException ex) {
125                 fail();
126             }
127         });
128 
129         if (!TestMemorySession.ArenaSupplier.isImplicit(session)) {
130             assertEquals(acc.get(), IntStream.range(0, N_THREADS).sum());
131         } else {
132             session = null;
133             sessionRef.set(null);
134             int expected = IntStream.range(0, N_THREADS).sum();
135             while (acc.get() != expected) {
136                 kickGC();
137             }
138         }
139     }
140 
141     @Test
142     public void testLockSingleThread() {
143         Arena arena = Arena.ofConfined();
144         List<Arena> handles = new ArrayList<>();
145         for (int i = 0 ; i < N_THREADS ; i++) {
146             Arena handle = Arena.ofConfined();
147             keepAlive(handle, arena);
148             handles.add(handle);
149         }
150 
151         while (true) {
152             try {
153                 arena.close();
154                 assertEquals(handles.size(), 0);
155                 break;
156             } catch (IllegalStateException ex) {
157                 assertTrue(handles.size() > 0);
158                 Arena handle = handles.remove(0);
159                 handle.close();
160             }
161         }
162     }
163 
164     @Test
165     public void testLockSharedMultiThread() {
166         Arena arena = Arena.ofShared();
167         AtomicInteger lockCount = new AtomicInteger();
168         for (int i = 0 ; i < N_THREADS ; i++) {
169             new Thread(() -> {
170                 try (Arena handle = Arena.ofConfined()) {
171                     keepAlive(handle, arena);
172                     lockCount.incrementAndGet();
173                     waitSomeTime();
174                     lockCount.decrementAndGet();
175                 } catch (IllegalStateException ex) {
176                     // might be already closed - do nothing
177                 }
178             }).start();
179         }
180 
181         while (true) {
182             try {
183                 arena.close();
184                 assertEquals(lockCount.get(), 0);
185                 break;
186             } catch (IllegalStateException ex) {
187                 waitSomeTime();
188             }
189         }
190     }
191 
192     @Test
193     public void testCloseEmptyConfinedSession() {
194         Arena.ofConfined().close();
195     }
196 
197     @Test
198     public void testCloseEmptySharedSession() {
199         Arena.ofShared().close();
200     }
201 
202     @Test
203     public void testCloseConfinedLock() {
204         Arena arena = Arena.ofConfined();
205         Arena handle = Arena.ofConfined();
206         keepAlive(handle, arena);
207         AtomicReference<Throwable> failure = new AtomicReference<>();
208         Thread t = new Thread(() -> {
209             try {
210                 handle.close();
211             } catch (Throwable ex) {
212                 failure.set(ex);
213             }
214         });
215         t.start();
216         try {
217             t.join();
218             assertNotNull(failure.get());
219             assertEquals(failure.get().getClass(), WrongThreadException.class);
220         } catch (Throwable ex) {
221             throw new AssertionError(ex);
222         }
223     }
224 
225     @Test(dataProvider = "allSessions")
226     public void testSessionAcquires(ArenaSupplier ArenaSupplier) {
227         Arena session = ArenaSupplier.get();
228         acquireRecursive(session, 5);
229         if (!TestMemorySession.ArenaSupplier.isImplicit(session))
230             TestMemorySession.ArenaSupplier.close(session);
231     }
232 
233     private void acquireRecursive(Arena session, int acquireCount) {
234         try (Arena arena = Arena.ofConfined()) {
235             keepAlive(arena, session);
236             if (acquireCount > 0) {
237                 // recursive acquire
238                 acquireRecursive(session, acquireCount - 1);
239             }
240             if (!ArenaSupplier.isImplicit(session)) {
241                 assertThrows(IllegalStateException.class, () -> ArenaSupplier.close(session));
242             }
243         }
244     }
245 
246     @Test
247     public void testConfinedSessionWithImplicitDependency() {
248         Arena root = Arena.ofConfined();
249         // Create many implicit sessions which depend on 'root', and let them become unreachable.
250         for (int i = 0; i < N_THREADS; i++) {
251             keepAlive(Arena.ofAuto(), root);
252         }
253         // Now let's keep trying to close 'root' until we succeed. This is trickier than it seems: cleanup action
254         // might be called from another thread (the Cleaner thread), so that the confined session lock count is updated racily.
255         // If that happens, the loop below never terminates.
256         while (true) {
257             try {
258                 root.close();
259                 break; // success!
260             } catch (IllegalStateException ex) {
261                 kickGC();
262                 for (int i = 0 ; i < N_THREADS ; i++) {  // add more races from current thread
263                     try (Arena arena = Arena.ofConfined()) {
264                         keepAlive(arena, root);
265                         // dummy
266                     }
267                 }
268                 // try again
269             }
270         }
271     }
272 
273     @Test
274     public void testConfinedSessionWithSharedDependency() {
275         Arena root = Arena.ofConfined();
276         List<Thread> threads = new ArrayList<>();
277         // Create many implicit sessions which depend on 'root', and let them become unreachable.
278         for (int i = 0; i < N_THREADS; i++) {
279             Arena arena = Arena.ofShared(); // create session inside same thread!
280             keepAlive(arena, root);
281             Thread t = new Thread(arena::close); // close from another thread!
282             threads.add(t);
283             t.start();
284         }
285         for (int i = 0 ; i < N_THREADS ; i++) { // add more races from current thread
286             try (Arena arena = Arena.ofConfined()) {
287                 keepAlive(arena, root);
288                 // dummy
289             }
290         }
291         threads.forEach(t -> {
292             try {
293                 t.join();
294             } catch (InterruptedException ex) {
295                 // ok
296             }
297         });
298         // Now let's close 'root'. This is trickier than it seems: releases of the confined session happen in different
299         // threads, so that the confined session lock count is updated racily. If that happens, the following close will blow up.
300         root.close();
301     }
302 
303     @Test(dataProvider = "nonCloseableSessions")
304     public void testNonCloseableSessions(ArenaSupplier arenaSupplier) {
305         var arena = arenaSupplier.get();
306         var sessionImpl = ((MemorySessionImpl) arena.scope());
307         assertFalse(sessionImpl.isCloseable());
308         assertThrows(UnsupportedOperationException.class, () ->
309                 sessionImpl.close());
310     }
311 
312     @Test(dataProvider = "allSessionsAndGlobal")
313     public void testIsCloseableBy(ArenaSupplier arenaSupplier) {
314         var arena = arenaSupplier.get();
315         var sessionImpl = ((MemorySessionImpl) arena.scope());
316         assertEquals(sessionImpl.isCloseableBy(Thread.currentThread()), sessionImpl.isCloseable());
317         Thread otherThread = new Thread();
318         boolean isCloseableByOther = sessionImpl.isCloseable() && !"ConfinedSession".equals(sessionImpl.getClass().getSimpleName());
319         assertEquals(sessionImpl.isCloseableBy(otherThread), isCloseableByOther);
320     }
321 
322     private void waitSomeTime() {
323         try {
324             Thread.sleep(10);
325         } catch (InterruptedException ex) {
326             // ignore
327         }
328     }
329 
330     private void kickGC() {
331         for (int i = 0 ; i < 100 ; i++) {
332             byte[] b = new byte[100];
333             System.gc();
334             Thread.onSpinWait();
335         }
336     }
337 
338     @DataProvider
339     static Object[][] drops() {
340         return new Object[][] {
341                 { (Supplier<Arena>) Arena::ofConfined},
342                 { (Supplier<Arena>) Arena::ofShared},
343         };
344     }
345 
346     private void keepAlive(Arena child, Arena parent) {
347         MemorySessionImpl parentImpl = MemorySessionImpl.toMemorySession(parent);
348         parentImpl.acquire0();
349         addCloseAction(child, parentImpl::release0);
350     }
351 
352     private void addCloseAction(Arena session, Runnable action) {
353         MemorySessionImpl sessionImpl = MemorySessionImpl.toMemorySession(session);
354         sessionImpl.addCloseAction(action);
355     }
356 
357     interface ArenaSupplier extends Supplier<Arena> {
358 
359         static void close(Arena arena) {
360             MemorySessionImpl.toMemorySession(arena).close();
361         }
362 
363         static boolean isImplicit(Arena arena) {
364             return !MemorySessionImpl.toMemorySession(arena).isCloseable();
365         }
366 
367         static ArenaSupplier ofAuto() {
368             return Arena::ofAuto;
369         }
370 
371         static ArenaSupplier ofGlobal() {
372             return Arena::global;
373         }
374 
375         static ArenaSupplier ofArena(Supplier<Arena> arenaSupplier) {
376             return arenaSupplier::get;
377         }
378     }
379 
380     @DataProvider(name = "sharedSessions")
381     static Object[][] sharedSessions() {
382         return new Object[][] {
383                 { ArenaSupplier.ofArena(Arena::ofShared) },
384                 { ArenaSupplier.ofAuto() },
385         };
386     }
387 
388     @DataProvider(name = "allSessions")
389     static Object[][] allSessions() {
390         return new Object[][] {
391                 { ArenaSupplier.ofArena(Arena::ofConfined) },
392                 { ArenaSupplier.ofArena(Arena::ofShared) },
393                 { ArenaSupplier.ofAuto() },
394         };
395     }
396 
397     @DataProvider(name = "nonCloseableSessions")
398     static Object[][] nonCloseableSessions() {
399         return new Object[][] {
400                 { ArenaSupplier.ofGlobal() },
401                 { ArenaSupplier.ofAuto() }
402         };
403     }
404 
405     @DataProvider(name = "allSessionsAndGlobal")
406     static Object[][] allSessionsAndGlobal() {
407         return new Object[][] {
408                 { ArenaSupplier.ofArena(Arena::ofConfined) },
409                 { ArenaSupplier.ofArena(Arena::ofShared) },
410                 { ArenaSupplier.ofAuto() },
411                 { ArenaSupplier.ofGlobal() },
412         };
413     }
414 
415 }