1 /*
  2  * Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  */
 23 package jdk.jfr.api.consumer.streaming;
 24 
 25 import java.nio.file.Files;
 26 import java.nio.file.Path;
 27 import java.nio.file.Paths;
 28 import java.time.Instant;
 29 import java.util.concurrent.CountDownLatch;
 30 import java.util.concurrent.TimeUnit;
 31 import java.util.concurrent.atomic.AtomicInteger;
 32 import java.util.concurrent.atomic.AtomicReference;
 33 
 34 import jdk.jfr.Event;
 35 import jdk.jfr.Name;
 36 import jdk.jfr.consumer.EventStream;
 37 import jdk.test.lib.Asserts;
 38 import jdk.test.lib.jfr.StreamingUtils;
 39 import jdk.test.lib.process.ProcessTools;
 40 
 41 /**
 42  * @test
 43  * @summary Test scenario where JFR event producer is in a different process
 44  *          with respect to the JFR event stream consumer.
 45  * @requires vm.flagless
 46  * @requires vm.hasJFR
 47  * @library /test/lib /test/jdk
 48  * @modules jdk.attach
 49  *          jdk.jfr
 50  * @run main jdk.jfr.api.consumer.streaming.TestCrossProcessStreaming
 51  */
 52 
 53 // Test Sequence:
 54 // 1. Main starts a child process "Event-Producer"
 55 // 2. Producer process produces TestEvent1 (first batch).
 56 // 3. Main process consumes the event stream until pre-defined number of events is consumed.
 57 // 4. Main process signals to child process to start producing the 2nd batch of events (TestEvent2).
 58 // 5. The second batch is produced for pre-defined number of flush intervals.
 59 // 6. Once the main process detects the pre-defined number of flush intervals, it signals
 60 //    to the producer process to exit.
 61 // 7. Producer process communicates the number of events in 2nd batch then exits.
 62 // 8. Main process verifies that number of events in 2nd batch arrived as expected, and that
 63 //    producer process exited w/o error.
 64 //
 65 //    The sequence of steps 2-5 ensures that the stream can be consumed simultaneously
 66 //    as the producer process is producing events.
 67 public class TestCrossProcessStreaming {
 68     @Name("Batch1")
 69     public static class TestEvent1 extends Event {
 70     }
 71     @Name("Batch2")
 72     public static class TestEvent2 extends Event {
 73     }
 74     @Name("Result")
 75     public static class ResultEvent extends Event {
 76         int batch1Count;
 77         int batch2Count;
 78     }
 79 
 80     public static void main(String... args) throws Exception {
 81         Process process = EventProducer.start();
 82         Path repo = StreamingUtils.getJfrRepository(process);
 83 
 84         // Consume 1000 events in a first batch
 85         CountDownLatch consumed = new CountDownLatch(1000);
 86         try (EventStream es = EventStream.openRepository(repo)) {
 87             es.onEvent("Batch1", e -> consumed.countDown());
 88             es.setStartTime(Instant.EPOCH); // read from start
 89             es.startAsync();
 90             consumed.await();
 91         }
 92 
 93         signal("second-batch");
 94 
 95         // Consume events until 'exit' signal.
 96         AtomicInteger total = new AtomicInteger();
 97         AtomicInteger produced = new AtomicInteger(-1);
 98         AtomicReference<Exception> exception = new AtomicReference<>();
 99         try (EventStream es = EventStream.openRepository(repo)) {
100             es.onEvent("Batch2", e -> {
101                     try {
102                         if (total.incrementAndGet() == 1000)  {
103                             signal("exit");
104                         }
105                     } catch (Exception exc) {
106                         exception.set(exc);
107                     }
108             });
109             es.onEvent("Result",e -> {
110                 produced.set(e.getInt("batch2Count"));
111                 es.close();
112             });
113             es.setStartTime(Instant.EPOCH);
114             es.start();
115         }
116         process.waitFor();
117 
118         if (exception.get() != null) {
119             throw exception.get();
120         }
121         Asserts.assertEquals(process.exitValue(), 0, "Incorrect exit value");
122         Asserts.assertEquals(total.get(), produced.get(), "Missing events");
123     }
124 
125     static class EventProducer {
126         private static final String MAIN_STARTED = "MAIN_STARTED";
127 
128         static Process start() throws Exception {
129             String[] args = {"-XX:StartFlightRecording", EventProducer.class.getName()};
130             ProcessBuilder pb = ProcessTools.createLimitedTestJavaProcessBuilder(args);
131             return ProcessTools.startProcess("Event-Producer", pb,
132                                              line -> line.contains(MAIN_STARTED),
133                                              0, TimeUnit.SECONDS);
134         }
135 
136         public static void main(String... args) throws Exception {
137             System.out.println(MAIN_STARTED);
138             ResultEvent rs = new ResultEvent();
139             rs.batch1Count = emit(TestEvent1.class, "second-batch");
140             rs.batch2Count = emit(TestEvent2.class, "exit");
141             rs.commit();
142         }
143 
144         static int emit(Class<? extends Event> eventClass, String termination) throws Exception {
145             int count = 0;
146             while (true) {
147                 Event event = eventClass.getConstructor().newInstance();
148                 event.commit();
149                 count++;
150                 if (count % 1000 == 0) {
151                     Thread.sleep(100);
152                     if (signalCheck(termination)) {
153                         System.out.println("Events generated: " + count);
154                         return count;
155                     }
156                 }
157             }
158         }
159     }
160 
161     static void signal(String name) throws Exception {
162         Files.createFile(Paths.get(".", name));
163     }
164 
165     static boolean signalCheck(String name) throws Exception {
166         return Files.exists(Paths.get(".", name));
167     }
168 }
--- EOF ---