1 /*
  2  * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 
 24 /*
 25  * @test
 26  * @enablePreview
 27  * @modules java.base/jdk.internal.foreign
 28  * @run testng/othervm TestMemorySession
 29  */
 30 
 31 import java.lang.foreign.Arena;
 32 
 33 import jdk.internal.foreign.MemorySessionImpl;
 34 import org.testng.annotations.DataProvider;
 35 import org.testng.annotations.Test;
 36 import static org.testng.Assert.*;
 37 
 38 import java.lang.reflect.Method;
 39 import java.util.ArrayList;
 40 import java.util.List;
 41 import java.util.concurrent.atomic.AtomicInteger;
 42 import java.util.concurrent.atomic.AtomicReference;
 43 import java.util.function.Supplier;
 44 import java.util.stream.IntStream;
 45 
 46 public class TestMemorySession {
 47 
 48     final static int N_THREADS = 100;
 49 
 50     @Test
 51     public void testConfined() {
 52         AtomicInteger acc = new AtomicInteger();
 53         Arena arena = Arena.ofConfined();
 54         for (int i = 0 ; i < N_THREADS ; i++) {
 55             int delta = i;
 56             addCloseAction(arena, () -> acc.addAndGet(delta));
 57         }
 58         assertEquals(acc.get(), 0);
 59 
 60         arena.close();
 61         assertEquals(acc.get(), IntStream.range(0, N_THREADS).sum());
 62     }
 63 
 64     @Test(dataProvider = "sharedSessions")
 65     public void testSharedSingleThread(ArenaSupplier arenaSupplier) {
 66         AtomicInteger acc = new AtomicInteger();
 67         Arena session = arenaSupplier.get();
 68         for (int i = 0 ; i < N_THREADS ; i++) {
 69             int delta = i;
 70             addCloseAction(session, () -> acc.addAndGet(delta));
 71         }
 72         assertEquals(acc.get(), 0);
 73 
 74         if (!TestMemorySession.ArenaSupplier.isImplicit(session)) {
 75             TestMemorySession.ArenaSupplier.close(session);
 76             assertEquals(acc.get(), IntStream.range(0, N_THREADS).sum());
 77         } else {
 78             session = null;
 79             int expected = IntStream.range(0, N_THREADS).sum();
 80             while (acc.get() != expected) {
 81                 kickGC();
 82             }
 83         }
 84     }
 85 
 86     @Test(dataProvider = "sharedSessions")
 87     public void testSharedMultiThread(ArenaSupplier arenaSupplier) {
 88         AtomicInteger acc = new AtomicInteger();
 89         List<Thread> threads = new ArrayList<>();
 90         Arena session = arenaSupplier.get();
 91         AtomicReference<Arena> sessionRef = new AtomicReference<>(session);
 92         for (int i = 0 ; i < N_THREADS ; i++) {
 93             int delta = i;
 94             Thread thread = new Thread(() -> {
 95                 try {
 96                     addCloseAction(sessionRef.get(), () -> {
 97                         acc.addAndGet(delta);
 98                     });
 99                 } catch (IllegalStateException ex) {
100                     // already closed - we need to call cleanup manually
101                     acc.addAndGet(delta);
102                 }
103             });
104             threads.add(thread);
105         }
106         assertEquals(acc.get(), 0);
107         threads.forEach(Thread::start);
108 
109         // if no cleaner, close - not all segments might have been added to the session!
110         // if cleaner, don't unset the session - after all, the session is kept alive by threads
111         if (!TestMemorySession.ArenaSupplier.isImplicit(session)) {
112             while (true) {
113                 try {
114                     TestMemorySession.ArenaSupplier.close(session);
115                     break;
116                 } catch (IllegalStateException ise) {
117                     // session is acquired (by add) - wait some more
118                 }
119             }
120         }
121 
122         threads.forEach(t -> {
123             try {
124                 t.join();
125             } catch (InterruptedException ex) {
126                 fail();
127             }
128         });
129 
130         if (!TestMemorySession.ArenaSupplier.isImplicit(session)) {
131             assertEquals(acc.get(), IntStream.range(0, N_THREADS).sum());
132         } else {
133             session = null;
134             sessionRef.set(null);
135             int expected = IntStream.range(0, N_THREADS).sum();
136             while (acc.get() != expected) {
137                 kickGC();
138             }
139         }
140     }
141 
142     @Test
143     public void testLockSingleThread() {
144         Arena arena = Arena.ofConfined();
145         List<Arena> handles = new ArrayList<>();
146         for (int i = 0 ; i < N_THREADS ; i++) {
147             Arena handle = Arena.ofConfined();
148             keepAlive(handle, arena);
149             handles.add(handle);
150         }
151 
152         while (true) {
153             try {
154                 arena.close();
155                 assertEquals(handles.size(), 0);
156                 break;
157             } catch (IllegalStateException ex) {
158                 assertTrue(handles.size() > 0);
159                 Arena handle = handles.remove(0);
160                 handle.close();
161             }
162         }
163     }
164 
165     @Test
166     public void testLockSharedMultiThread() {
167         Arena arena = Arena.ofShared();
168         AtomicInteger lockCount = new AtomicInteger();
169         for (int i = 0 ; i < N_THREADS ; i++) {
170             new Thread(() -> {
171                 try (Arena handle = Arena.ofConfined()) {
172                     keepAlive(handle, arena);
173                     lockCount.incrementAndGet();
174                     waitSomeTime();
175                     lockCount.decrementAndGet();
176                 } catch (IllegalStateException ex) {
177                     // might be already closed - do nothing
178                 }
179             }).start();
180         }
181 
182         while (true) {
183             try {
184                 arena.close();
185                 assertEquals(lockCount.get(), 0);
186                 break;
187             } catch (IllegalStateException ex) {
188                 waitSomeTime();
189             }
190         }
191     }
192 
193     @Test
194     public void testCloseEmptyConfinedSession() {
195         Arena.ofConfined().close();
196     }
197 
198     @Test
199     public void testCloseEmptySharedSession() {
200         Arena.ofShared().close();
201     }
202 
203     @Test
204     public void testCloseConfinedLock() {
205         Arena arena = Arena.ofConfined();
206         Arena handle = Arena.ofConfined();
207         keepAlive(handle, arena);
208         AtomicReference<Throwable> failure = new AtomicReference<>();
209         Thread t = new Thread(() -> {
210             try {
211                 handle.close();
212             } catch (Throwable ex) {
213                 failure.set(ex);
214             }
215         });
216         t.start();
217         try {
218             t.join();
219             assertNotNull(failure.get());
220             assertEquals(failure.get().getClass(), WrongThreadException.class);
221         } catch (Throwable ex) {
222             throw new AssertionError(ex);
223         }
224     }
225 
226     @Test(dataProvider = "allSessions")
227     public void testSessionAcquires(ArenaSupplier ArenaSupplier) {
228         Arena session = ArenaSupplier.get();
229         acquireRecursive(session, 5);
230         if (!TestMemorySession.ArenaSupplier.isImplicit(session))
231             TestMemorySession.ArenaSupplier.close(session);
232     }
233 
234     private void acquireRecursive(Arena session, int acquireCount) {
235         try (Arena arena = Arena.ofConfined()) {
236             keepAlive(arena, session);
237             if (acquireCount > 0) {
238                 // recursive acquire
239                 acquireRecursive(session, acquireCount - 1);
240             }
241             if (!ArenaSupplier.isImplicit(session)) {
242                 assertThrows(IllegalStateException.class, () -> ArenaSupplier.close(session));
243             }
244         }
245     }
246 
247     @Test
248     public void testConfinedSessionWithImplicitDependency() {
249         Arena root = Arena.ofConfined();
250         // Create many implicit sessions which depend on 'root', and let them become unreachable.
251         for (int i = 0; i < N_THREADS; i++) {
252             keepAlive(Arena.ofAuto(), root);
253         }
254         // Now let's keep trying to close 'root' until we succeed. This is trickier than it seems: cleanup action
255         // might be called from another thread (the Cleaner thread), so that the confined session lock count is updated racily.
256         // If that happens, the loop below never terminates.
257         while (true) {
258             try {
259                 root.close();
260                 break; // success!
261             } catch (IllegalStateException ex) {
262                 kickGC();
263                 for (int i = 0 ; i < N_THREADS ; i++) {  // add more races from current thread
264                     try (Arena arena = Arena.ofConfined()) {
265                         keepAlive(arena, root);
266                         // dummy
267                     }
268                 }
269                 // try again
270             }
271         }
272     }
273 
274     @Test
275     public void testConfinedSessionWithSharedDependency() {
276         Arena root = Arena.ofConfined();
277         List<Thread> threads = new ArrayList<>();
278         // Create many implicit sessions which depend on 'root', and let them become unreachable.
279         for (int i = 0; i < N_THREADS; i++) {
280             Arena arena = Arena.ofShared(); // create session inside same thread!
281             keepAlive(arena, root);
282             Thread t = new Thread(arena::close); // close from another thread!
283             threads.add(t);
284             t.start();
285         }
286         for (int i = 0 ; i < N_THREADS ; i++) { // add more races from current thread
287             try (Arena arena = Arena.ofConfined()) {
288                 keepAlive(arena, root);
289                 // dummy
290             }
291         }
292         threads.forEach(t -> {
293             try {
294                 t.join();
295             } catch (InterruptedException ex) {
296                 // ok
297             }
298         });
299         // Now let's close 'root'. This is trickier than it seems: releases of the confined session happen in different
300         // threads, so that the confined session lock count is updated racily. If that happens, the following close will blow up.
301         root.close();
302     }
303 
304     @Test(dataProvider = "nonCloseableSessions")
305     public void testNonCloseableSessions(ArenaSupplier arenaSupplier) {
306         var arena = arenaSupplier.get();
307         var sessionImpl = ((MemorySessionImpl) arena.scope());
308         assertFalse(sessionImpl.isCloseable());
309         assertThrows(UnsupportedOperationException.class, () ->
310                 sessionImpl.close());
311     }
312 
313     @Test(dataProvider = "allSessionsAndGlobal")
314     public void testIsCloseableBy(ArenaSupplier arenaSupplier) {
315         var arena = arenaSupplier.get();
316         var sessionImpl = ((MemorySessionImpl) arena.scope());
317         assertEquals(sessionImpl.isCloseableBy(Thread.currentThread()), sessionImpl.isCloseable());
318         Thread otherThread = new Thread();
319         boolean isCloseableByOther = sessionImpl.isCloseable() && !"ConfinedSession".equals(sessionImpl.getClass().getSimpleName());
320         assertEquals(sessionImpl.isCloseableBy(otherThread), isCloseableByOther);
321     }
322 
323     private void waitSomeTime() {
324         try {
325             Thread.sleep(10);
326         } catch (InterruptedException ex) {
327             // ignore
328         }
329     }
330 
331     private void kickGC() {
332         for (int i = 0 ; i < 100 ; i++) {
333             byte[] b = new byte[100];
334             System.gc();
335             Thread.onSpinWait();
336         }
337     }
338 
339     @DataProvider
340     static Object[][] drops() {
341         return new Object[][] {
342                 { (Supplier<Arena>) Arena::ofConfined},
343                 { (Supplier<Arena>) Arena::ofShared},
344         };
345     }
346 
347     private void keepAlive(Arena child, Arena parent) {
348         MemorySessionImpl parentImpl = MemorySessionImpl.toMemorySession(parent);
349         parentImpl.acquire0();
350         addCloseAction(child, parentImpl::release0);
351     }
352 
353     private void addCloseAction(Arena session, Runnable action) {
354         MemorySessionImpl sessionImpl = MemorySessionImpl.toMemorySession(session);
355         sessionImpl.addCloseAction(action);
356     }
357 
358     interface ArenaSupplier extends Supplier<Arena> {
359 
360         static void close(Arena arena) {
361             MemorySessionImpl.toMemorySession(arena).close();
362         }
363 
364         static boolean isImplicit(Arena arena) {
365             return !MemorySessionImpl.toMemorySession(arena).isCloseable();
366         }
367 
368         static ArenaSupplier ofAuto() {
369             return Arena::ofAuto;
370         }
371 
372         static ArenaSupplier ofGlobal() {
373             return Arena::global;
374         }
375 
376         static ArenaSupplier ofArena(Supplier<Arena> arenaSupplier) {
377             return arenaSupplier::get;
378         }
379     }
380 
381     @DataProvider(name = "sharedSessions")
382     static Object[][] sharedSessions() {
383         return new Object[][] {
384                 { ArenaSupplier.ofArena(Arena::ofShared) },
385                 { ArenaSupplier.ofAuto() },
386         };
387     }
388 
389     @DataProvider(name = "allSessions")
390     static Object[][] allSessions() {
391         return new Object[][] {
392                 { ArenaSupplier.ofArena(Arena::ofConfined) },
393                 { ArenaSupplier.ofArena(Arena::ofShared) },
394                 { ArenaSupplier.ofAuto() },
395         };
396     }
397 
398     @DataProvider(name = "nonCloseableSessions")
399     static Object[][] nonCloseableSessions() {
400         return new Object[][] {
401                 { ArenaSupplier.ofGlobal() },
402                 { ArenaSupplier.ofAuto() }
403         };
404     }
405 
406     @DataProvider(name = "allSessionsAndGlobal")
407     static Object[][] allSessionsAndGlobal() {
408         return new Object[][] {
409                 { ArenaSupplier.ofArena(Arena::ofConfined) },
410                 { ArenaSupplier.ofArena(Arena::ofShared) },
411                 { ArenaSupplier.ofAuto() },
412                 { ArenaSupplier.ofGlobal() },
413         };
414     }
415 
416 }