1 /* 2 * Copyright (c) 2019, 2026, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package jdk.jfr.api.consumer.streaming; 24 25 import java.nio.file.Files; 26 import java.nio.file.Path; 27 import java.nio.file.Paths; 28 import java.time.Instant; 29 import java.util.concurrent.CountDownLatch; 30 import java.util.concurrent.TimeUnit; 31 import java.util.concurrent.atomic.AtomicInteger; 32 import java.util.concurrent.atomic.AtomicReference; 33 34 import jdk.jfr.Event; 35 import jdk.jfr.Name; 36 import jdk.jfr.consumer.EventStream; 37 import jdk.test.lib.Asserts; 38 import jdk.test.lib.jfr.StreamingUtils; 39 import jdk.test.lib.process.ProcessTools; 40 41 /** 42 * @test 43 * @summary Test scenario where JFR event producer is in a different process 44 * with respect to the JFR event stream consumer. 45 * @requires vm.flagless 46 * @requires vm.hasJFR 47 * @library /test/lib /test/jdk 48 * @modules jdk.attach 49 * jdk.jfr 50 * @run main jdk.jfr.api.consumer.streaming.TestCrossProcessStreaming 51 */ 52 53 // Test Sequence: 54 // 1. Main starts a child process "Event-Producer" 55 // 2. Producer process produces TestEvent1 (first batch). 56 // 3. Main process consumes the event stream until pre-defined number of events is consumed. 57 // 4. Main process signals to child process to start producing the 2nd batch of events (TestEvent2). 58 // 5. The second batch is produced for pre-defined number of flush intervals. 59 // 6. Once the main process detects the pre-defined number of flush intervals, it signals 60 // to the producer process to exit. 61 // 7. Producer process communicates the number of events in 2nd batch then exits. 62 // 8. Main process verifies that number of events in 2nd batch arrived as expected, and that 63 // producer process exited w/o error. 64 // 65 // The sequence of steps 2-5 ensures that the stream can be consumed simultaneously 66 // as the producer process is producing events. 67 public class TestCrossProcessStreaming { 68 @Name("Batch1") 69 public static class TestEvent1 extends Event { 70 } 71 @Name("Batch2") 72 public static class TestEvent2 extends Event { 73 } 74 @Name("Result") 75 public static class ResultEvent extends Event { 76 int batch1Count; 77 int batch2Count; 78 } 79 80 public static void main(String... args) throws Exception { 81 Process process = EventProducer.start(); 82 Path repo = StreamingUtils.getJfrRepository(process); 83 84 // Consume 1000 events in a first batch 85 CountDownLatch consumed = new CountDownLatch(1000); 86 try (EventStream es = EventStream.openRepository(repo)) { 87 es.onEvent("Batch1", e -> consumed.countDown()); 88 es.setStartTime(Instant.EPOCH); // read from start 89 es.startAsync(); 90 consumed.await(); 91 } 92 93 signal("second-batch"); 94 95 // Consume events until 'exit' signal. 96 AtomicInteger total = new AtomicInteger(); 97 AtomicInteger produced = new AtomicInteger(-1); 98 AtomicReference<Exception> exception = new AtomicReference<>(); 99 try (EventStream es = EventStream.openRepository(repo)) { 100 es.onEvent("Batch2", e -> { 101 try { 102 if (total.incrementAndGet() == 1000) { 103 signal("exit"); 104 } 105 } catch (Exception exc) { 106 exception.set(exc); 107 } 108 }); 109 es.onEvent("Result",e -> { 110 produced.set(e.getInt("batch2Count")); 111 es.close(); 112 }); 113 es.setStartTime(Instant.EPOCH); 114 es.start(); 115 } 116 process.waitFor(); 117 118 if (exception.get() != null) { 119 throw exception.get(); 120 } 121 Asserts.assertEquals(process.exitValue(), 0, "Incorrect exit value"); 122 Asserts.assertEquals(total.get(), produced.get(), "Missing events"); 123 } 124 125 static class EventProducer { 126 private static final String MAIN_STARTED = "MAIN_STARTED"; 127 128 static Process start() throws Exception { 129 String[] args = {"-XX:StartFlightRecording", EventProducer.class.getName()}; 130 ProcessBuilder pb = ProcessTools.createTestJavaProcessBuilder(args); 131 return ProcessTools.startProcess("Event-Producer", pb, 132 line -> line.contains(MAIN_STARTED), 133 0, TimeUnit.SECONDS); 134 } 135 136 public static void main(String... args) throws Exception { 137 System.out.println(MAIN_STARTED); 138 ResultEvent rs = new ResultEvent(); 139 rs.batch1Count = emit(TestEvent1.class, "second-batch"); 140 rs.batch2Count = emit(TestEvent2.class, "exit"); 141 rs.commit(); 142 } 143 144 static int emit(Class<? extends Event> eventClass, String termination) throws Exception { 145 int count = 0; 146 while (true) { 147 Event event = eventClass.getConstructor().newInstance(); 148 event.commit(); 149 count++; 150 if (count % 1000 == 0) { 151 Thread.sleep(100); 152 if (signalCheck(termination)) { 153 System.out.println("Events generated: " + count); 154 return count; 155 } 156 } 157 } 158 } 159 } 160 161 static void signal(String name) throws Exception { 162 Files.createFile(Paths.get(".", name)); 163 } 164 165 static boolean signalCheck(String name) throws Exception { 166 return Files.exists(Paths.get(".", name)); 167 } 168 } --- EOF ---