1 /*
  2  * Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
  3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  4  *
  5  * This code is free software; you can redistribute it and/or modify it
  6  * under the terms of the GNU General Public License version 2 only, as
  7  * published by the Free Software Foundation.
  8  *
  9  * This code is distributed in the hope that it will be useful, but WITHOUT
 10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 12  * version 2 for more details (a copy is included in the LICENSE file that
 13  * accompanied this code).
 14  *
 15  * You should have received a copy of the GNU General Public License version
 16  * 2 along with this work; if not, write to the Free Software Foundation,
 17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 18  *
 19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 20  * or visit www.oracle.com if you need additional information or have any
 21  * questions.
 22  *
 23  */
 24 
 25 #include "precompiled.hpp"
 26 #include "jfr/jfrEvents.hpp"
 27 #include "jfr/jni/jfrJavaSupport.hpp"
 28 #include "jfr/recorder/jfrRecorder.hpp"
 29 #include "jfr/recorder/checkpoint/jfrCheckpointManager.hpp"
 30 #include "jfr/recorder/repository/jfrChunkWriter.hpp"
 31 #include "jfr/recorder/service/jfrOptionSet.hpp"
 32 #include "jfr/recorder/service/jfrPostBox.hpp"
 33 #include "jfr/recorder/storage/jfrFullStorage.inline.hpp"
 34 #include "jfr/recorder/storage/jfrMemorySpace.inline.hpp"
 35 #include "jfr/recorder/storage/jfrStorage.hpp"
 36 #include "jfr/recorder/storage/jfrStorageControl.hpp"
 37 #include "jfr/recorder/storage/jfrStorageUtils.inline.hpp"
 38 #include "jfr/utilities/jfrIterator.hpp"
 39 #include "jfr/utilities/jfrLinkedList.inline.hpp"
 40 #include "jfr/utilities/jfrTime.hpp"
 41 #include "jfr/writers/jfrNativeEventWriter.hpp"
 42 #include "logging/log.hpp"
 43 #include "runtime/mutexLocker.hpp"
 44 #include "runtime/safepoint.hpp"
 45 #include "runtime/thread.hpp"
 46 
typedef JfrStorage::BufferPtr BufferPtr;

// Singleton storage instance, established in create() and torn down in destroy().
static JfrStorage* _instance = NULL;
// NOTE(review): this file-scope _control is not referenced in the visible code
// (JfrStorage::control() uses the member field) — possibly a leftover; confirm.
static JfrStorageControl* _control;
 51 
 52 JfrStorage& JfrStorage::instance() {
 53   return *_instance;
 54 }
 55 
 56 JfrStorage* JfrStorage::create(JfrChunkWriter& chunkwriter, JfrPostBox& post_box) {
 57   assert(_instance == NULL, "invariant");
 58   _instance = new JfrStorage(chunkwriter, post_box);
 59   return _instance;
 60 }
 61 
 62 void JfrStorage::destroy() {
 63   if (_instance != NULL) {
 64     delete _instance;
 65     _instance = NULL;
 66   }
 67 }
 68 
 69 JfrStorage::JfrStorage(JfrChunkWriter& chunkwriter, JfrPostBox& post_box) :
 70   _control(NULL),
 71   _global_mspace(NULL),
 72   _thread_local_mspace(NULL),
 73   _chunkwriter(chunkwriter),
 74   _post_box(post_box) {}
 75 
// Releases all owned subsystems and clears the singleton pointer.
// NOTE(review): assumes _full_list holds a valid pointer or NULL; it is only
// assigned in initialize(), so confirm the constructor NULL-initializes it.
JfrStorage::~JfrStorage() {
  if (_control != NULL) {
    delete _control;
  }
  if (_global_mspace != NULL) {
    delete _global_mspace;
  }
  if (_thread_local_mspace != NULL) {
    delete _thread_local_mspace;
  }
  if (_full_list != NULL) {
    delete _full_list;
  }
  _instance = NULL;
}
 91 
// Number of buffers preallocated for the thread-local memory space.
static const size_t thread_local_cache_count = 8;
// Start discarding data when only this many free global buffers remain.
static const size_t in_memory_discard_threshold_delta = 2;
 95 
// Sets up the storage subsystem: the control object, the global and
// thread-local memory spaces, and the list that tracks full buffers.
// Returns false if any allocation fails; partially constructed state is
// reclaimed by the destructor.
bool JfrStorage::initialize() {
  assert(_control == NULL, "invariant");
  assert(_global_mspace == NULL, "invariant");
  assert(_thread_local_mspace == NULL, "invariant");

  // Sizing is driven entirely by -XX:FlightRecorderOptions settings.
  const size_t num_global_buffers = (size_t)JfrOptionSet::num_global_buffers();
  assert(num_global_buffers >= in_memory_discard_threshold_delta, "invariant");
  const size_t global_buffer_size = (size_t)JfrOptionSet::global_buffer_size();
  const size_t thread_buffer_size = (size_t)JfrOptionSet::thread_buffer_size();

  _control = new JfrStorageControl(num_global_buffers, num_global_buffers - in_memory_discard_threshold_delta);
  if (_control == NULL) {
    return false;
  }
  _global_mspace = create_mspace<JfrStorageMspace>(global_buffer_size,
                                                   num_global_buffers, // cache count limit
                                                   num_global_buffers, // cache_preallocate count
                                                   false, // preallocate_to_free_list (== preallocate directly to live list)
                                                   this);
  if (_global_mspace == NULL) {
    return false;
  }
  assert(_global_mspace->live_list_is_nonempty(), "invariant");
  _thread_local_mspace = create_mspace<JfrThreadLocalMspace>(thread_buffer_size,
                                                             thread_local_cache_count, // cache count limit
                                                             thread_local_cache_count, // cache preallocate count
                                                             true,  // preallocate_to_free_list
                                                             this);
  if (_thread_local_mspace == NULL) {
    return false;
  }
  assert(_thread_local_mspace->free_list_is_nonempty(), "invariant");
  // The full list will contain nodes pointing to retired global and transient buffers.
  _full_list = new JfrFullList(*_control);
  return _full_list != NULL && _full_list->initialize(num_global_buffers * 2);
}
132 
133 JfrStorageControl& JfrStorage::control() {
134   return *instance()._control;
135 }
136 
// Logs a warning that 'size' bytes of the named kind of memory could not be allocated.
static void log_allocation_failure(const char* msg, size_t size) {
  log_warning(jfr)("Unable to allocate " SIZE_FORMAT " bytes of %s.", size, msg);
}
140 
141 BufferPtr JfrStorage::acquire_thread_local(Thread* thread, size_t size /* 0 */) {
142   BufferPtr buffer = mspace_acquire_to_live_list(size, instance()._thread_local_mspace, thread);
143   if (buffer == NULL) {
144     log_allocation_failure("thread local_memory", size);
145     return NULL;
146   }
147   assert(buffer->acquired_by_self(), "invariant");
148   return buffer;
149 }
150 
151 BufferPtr JfrStorage::acquire_transient(size_t size, Thread* thread) {
152   BufferPtr buffer = mspace_allocate_transient_lease(size, instance()._thread_local_mspace, thread);
153   if (buffer == NULL) {
154     log_allocation_failure("transient memory", size);
155     return NULL;
156   }
157   assert(buffer->acquired_by_self(), "invariant");
158   assert(buffer->transient(), "invariant");
159   assert(buffer->lease(), "invariant");
160   return buffer;
161 }
162 
163 static BufferPtr acquire_lease(size_t size, JfrStorageMspace* mspace, JfrStorage& storage_instance, size_t retry_count, Thread* thread) {
164   assert(size <= mspace->min_element_size(), "invariant");
165   while (true) {
166     BufferPtr buffer = mspace_acquire_lease_with_retry(size, mspace, retry_count, thread);
167     if (buffer == NULL && storage_instance.control().should_discard()) {
168       storage_instance.discard_oldest(thread);
169       continue;
170     }
171     return buffer;
172   }
173 }
174 
175 static BufferPtr acquire_promotion_buffer(size_t size, JfrStorageMspace* mspace, JfrStorage& storage_instance, size_t retry_count, Thread* thread) {
176   assert(size <= mspace->min_element_size(), "invariant");
177   while (true) {
178     BufferPtr buffer= mspace_acquire_live_with_retry(size, mspace, retry_count, thread);
179     if (buffer == NULL && storage_instance.control().should_discard()) {
180       storage_instance.discard_oldest(thread);
181       continue;
182     }
183     return buffer;
184   }
185 }
186 
// Number of acquisition attempts before falling back to discard/transient paths.
static const size_t lease_retry = 10;
188 
189 BufferPtr JfrStorage::acquire_large(size_t size, Thread* thread) {
190   JfrStorage& storage_instance = instance();
191   const size_t max_elem_size = storage_instance._global_mspace->min_element_size(); // min is also max
192   // if not too large and capacity is still available, ask for a lease from the global system
193   if (size < max_elem_size && storage_instance.control().is_global_lease_allowed()) {
194     BufferPtr const buffer = acquire_lease(size, storage_instance._global_mspace, storage_instance, lease_retry, thread);
195     if (buffer != NULL) {
196       assert(buffer->acquired_by_self(), "invariant");
197       assert(!buffer->transient(), "invariant");
198       assert(buffer->lease(), "invariant");
199       storage_instance.control().increment_leased();
200       return buffer;
201     }
202   }
203   return acquire_transient(size, thread);
204 }
205 
// Accounts 'unflushed_size' dropped bytes against the thread's running data
// loss total and, if the DataLoss event is enabled, serializes an
// EventDataLoss record directly into the (now empty) buffer.
static void write_data_loss_event(JfrBuffer* buffer, u8 unflushed_size, Thread* thread) {
  assert(buffer != NULL, "invariant");
  assert(buffer->empty(), "invariant");
  const u8 total_data_loss = thread->jfr_thread_local()->add_data_lost(unflushed_size);
  if (EventDataLoss::is_enabled()) {
    JfrNativeEventWriter writer(buffer, thread);
    writer.begin_event_write(false);
    writer.write<u8>(EventDataLoss::eventId);
    writer.write(JfrTicks::now());
    writer.write(unflushed_size);
    writer.write(total_data_loss);
    writer.end_event_write(false);
  }
}
220 
221 static void write_data_loss(BufferPtr buffer, Thread* thread) {
222   assert(buffer != NULL, "invariant");
223   const size_t unflushed_size = buffer->unflushed_size();
224   buffer->reinitialize();
225   if (unflushed_size == 0) {
226     return;
227   }
228   write_data_loss_event(buffer, unflushed_size, thread);
229 }
230 
// Number of attempts when acquiring a promotion buffer from the global mspace.
static const size_t promotion_retry = 100;
232 
// Promotes the unflushed content of a regular (thread-stable) buffer into a
// buffer acquired from the global memory space. Returns true if the buffer
// is empty on return; false means the data could not be promoted and was
// dropped (a DataLoss event is written in its place).
bool JfrStorage::flush_regular_buffer(BufferPtr buffer, Thread* thread) {
  assert(buffer != NULL, "invariant");
  assert(!buffer->lease(), "invariant");
  assert(!buffer->transient(), "invariant");
  const size_t unflushed_size = buffer->unflushed_size();
  if (unflushed_size == 0) {
    // nothing to promote; just reset
    buffer->reinitialize();
    assert(buffer->empty(), "invariant");
    return true;
  }

  if (buffer->excluded()) {
    // data from excluded threads is dropped, not promoted
    const bool thread_is_excluded = thread->jfr_thread_local()->is_excluded();
    buffer->reinitialize(thread_is_excluded);
    assert(buffer->empty(), "invariant");
    if (!thread_is_excluded) {
      // state change from exclusion to inclusion requires a thread checkpoint
      JfrCheckpointManager::write_checkpoint(thread);
    }
    return true;
  }

  BufferPtr const promotion_buffer = acquire_promotion_buffer(unflushed_size, _global_mspace, *this, promotion_retry, thread);
  if (promotion_buffer == NULL) {
    // no global buffer could be acquired; account and report the loss
    write_data_loss(buffer, thread);
    return false;
  }
  assert(promotion_buffer->acquired_by_self(), "invariant");
  assert(promotion_buffer->free_size() >= unflushed_size, "invariant");
  buffer->move(promotion_buffer, unflushed_size);
  assert(buffer->empty(), "invariant");
  return true;
}
266 
267 /*
268 * 1. If the buffer was a "lease" from the global system, release back.
269 * 2. If the buffer is transient (temporal dynamically allocated), retire and register full.
270 *
271 * The buffer is effectively invalidated for the thread post-return,
272 * and the caller should take means to ensure that it is not referenced any longer.
273 */
274 void JfrStorage::release_large(BufferPtr buffer, Thread* thread) {
275   assert(buffer != NULL, "invariant");
276   assert(buffer->lease(), "invariant");
277   assert(buffer->acquired_by_self(), "invariant");
278   buffer->clear_lease();
279   if (buffer->transient()) {
280     buffer->set_retired();
281     register_full(buffer, thread);
282   } else {
283     buffer->release();
284     control().decrement_leased();
285   }
286 }
287 
288 void JfrStorage::register_full(BufferPtr buffer, Thread* thread) {
289   assert(buffer != NULL, "invariant");
290   assert(buffer->acquired_by(thread), "invariant");
291   assert(buffer->retired(), "invariant");
292   if (_full_list->add(buffer)) {
293     _post_box.post(MSG_FULLBUFFER);
294   }
295 }
296 
297 // don't use buffer on return, it is gone
298 void JfrStorage::release(BufferPtr buffer, Thread* thread) {
299   assert(buffer != NULL, "invariant");
300   assert(!buffer->lease(), "invariant");
301   assert(!buffer->transient(), "invariant");
302   assert(!buffer->retired(), "invariant");
303   if (!buffer->empty()) {
304     if (!flush_regular_buffer(buffer, thread)) {
305       buffer->reinitialize();
306     }
307   }
308   assert(buffer->empty(), "invariant");
309   assert(buffer->identity() != NULL, "invariant");
310   buffer->clear_excluded();
311   buffer->set_retired();
312 }
313 
314 void JfrStorage::release_thread_local(BufferPtr buffer, Thread* thread) {
315   assert(buffer != NULL, "invariant");
316   JfrStorage& storage_instance = instance();
317   storage_instance.release(buffer, thread);
318 }
319 
320 static void log_discard(size_t pre_full_count, size_t post_full_count, size_t amount) {
321   if (log_is_enabled(Debug, jfr, system)) {
322     const size_t number_of_discards = pre_full_count - post_full_count;
323     if (number_of_discards > 0) {
324       log_debug(jfr, system)("Cleared " SIZE_FORMAT " full buffer(s) of " SIZE_FORMAT" bytes.", number_of_discards, amount);
325       log_debug(jfr, system)("Current number of full buffers " SIZE_FORMAT "", number_of_discards);
326     }
327   }
328 }
329 
330 void JfrStorage::discard_oldest(Thread* thread) {
331   if (JfrBuffer_lock->try_lock()) {
332     if (!control().should_discard()) {
333       // another thread handled it
334       return;
335     }
336     const size_t num_full_pre_discard = control().full_count();
337     size_t discarded_size = 0;
338     while (_full_list->is_nonempty()) {
339       BufferPtr oldest = _full_list->remove();
340       assert(oldest != NULL, "invariant");
341       assert(oldest->identity() != NULL, "invariant");
342       discarded_size += oldest->discard();
343       assert(oldest->unflushed_size() == 0, "invariant");
344       if (oldest->transient()) {
345         mspace_release(oldest, _thread_local_mspace);
346         continue;
347       }
348       oldest->reinitialize();
349       assert(!oldest->retired(), "invariant");
350       oldest->release(); // publish
351       break;
352     }
353     JfrBuffer_lock->unlock();
354     log_discard(num_full_pre_discard, control().full_count(), discarded_size);
355   }
356 }
357 
#ifdef ASSERT
typedef const BufferPtr ConstBufferPtr;

// Debug-only precondition checks for the flush/provision paths below.
// Verifies the thread owns the buffer as its current native/java buffer and
// that the used region lies within the buffer bounds.
static void assert_flush_precondition(ConstBufferPtr cur, size_t used, bool native, const Thread* t) {
  assert(t != NULL, "invariant");
  assert(cur != NULL, "invariant");
  assert(cur->pos() + used <= cur->end(), "invariant");
  assert(native ? t->jfr_thread_local()->native_buffer() == cur : t->jfr_thread_local()->java_buffer() == cur, "invariant");
}

// Checks the regular (non-lease) flush path invariants.
static void assert_flush_regular_precondition(ConstBufferPtr cur, const u1* const cur_pos, size_t used, size_t req, const Thread* t) {
  assert(t != NULL, "invariant");
  assert(cur != NULL, "invariant");
  assert(!cur->lease(), "invariant");
  assert(cur_pos != NULL, "invariant");
  assert(req >= used, "invariant");
}

// Checks invariants before provisioning a large buffer (regular buffer must
// already be shelved).
static void assert_provision_large_precondition(ConstBufferPtr cur, size_t used, size_t req, const Thread* t) {
  assert(cur != NULL, "invariant");
  assert(t != NULL, "invariant");
  assert(t->jfr_thread_local()->shelved_buffer() != NULL, "invariant");
  assert(req >= used, "invariant");
}

// Checks the large (leased) flush path invariants, including that the current
// lease is distinct from the shelved regular buffer.
static void assert_flush_large_precondition(ConstBufferPtr cur, const u1* const cur_pos, size_t used, size_t req, bool native, Thread* t) {
  assert(t != NULL, "invariant");
  assert(cur != NULL, "invariant");
  assert(cur->lease(), "invariant");
  assert(!cur->excluded(), "invariant");
  assert(cur_pos != NULL, "invariant");
  assert(native ? t->jfr_thread_local()->native_buffer() == cur : t->jfr_thread_local()->java_buffer() == cur, "invariant");
  assert(t->jfr_thread_local()->shelved_buffer() != NULL, "invariant");
  assert(req >= used, "invariant");
  assert(cur != t->jfr_thread_local()->shelved_buffer(), "invariant");
}
#endif // ASSERT
395 
396 BufferPtr JfrStorage::flush(BufferPtr cur, size_t used, size_t req, bool native, Thread* t) {
397   debug_only(assert_flush_precondition(cur, used, native, t);)
398   const u1* const cur_pos = cur->pos();
399   req += used;
400   // requested size now encompass the outstanding used size
401   return cur->lease() ? instance().flush_large(cur, cur_pos, used, req, native, t) :
402                           instance().flush_regular(cur, cur_pos, used, req, native, t);
403 }
404 
// Flushes a regular (thread-stable) buffer and makes room for 'req' bytes
// (which already includes the outstanding 'used' bytes). If the flushed
// buffer can accommodate the request, the outstanding bytes are migrated
// within it; otherwise the buffer is shelved and a large buffer provisioned.
// Returns the buffer the caller should continue writing into.
BufferPtr JfrStorage::flush_regular(BufferPtr cur, const u1* const cur_pos, size_t used, size_t req, bool native, Thread* t) {
  debug_only(assert_flush_regular_precondition(cur, cur_pos, used, req, t);)
  // A flush is needed before memmove since a non-large buffer is thread stable
  // (thread local). The flush will not modify memory in addresses above pos()
  // which is where the "used / uncommitted" data resides. It is therefore both
  // possible and valid to migrate data after the flush. This is however only
  // the case for stable thread local buffers; it is not the case for large buffers.
  flush_regular_buffer(cur, t);
  if (cur->excluded()) {
    // excluded threads keep writing into the same buffer (data is dropped on flush)
    return cur;
  }
  if (cur->free_size() >= req) {
    // simplest case, no switching of buffers
    if (used > 0) {
      // source and destination may overlap so memmove must be used instead of memcpy
      memmove(cur->pos(), (void*)cur_pos, used);
    }
    assert(native ? t->jfr_thread_local()->native_buffer() == cur : t->jfr_thread_local()->java_buffer() == cur, "invariant");
    return cur;
  }
  // Going for a "larger-than-regular" buffer.
  // Shelve the current buffer to make room for a temporary lease.
  assert(t->jfr_thread_local()->shelved_buffer() == NULL, "invariant");
  t->jfr_thread_local()->shelve_buffer(cur);
  return provision_large(cur, cur_pos, used, req, native, t);
}
431 
432 static BufferPtr store_buffer_to_thread_local(BufferPtr buffer, JfrThreadLocal* jfr_thread_local, bool native) {
433   assert(buffer != NULL, "invariant");
434   if (native) {
435     jfr_thread_local->set_native_buffer(buffer);
436   } else {
437     jfr_thread_local->set_java_buffer(buffer);
438   }
439   return buffer;
440 }
441 
442 static BufferPtr restore_shelved_buffer(bool native, Thread* t) {
443   JfrThreadLocal* const tl = t->jfr_thread_local();
444   BufferPtr shelved = tl->shelved_buffer();
445   assert(shelved != NULL, "invariant");
446   tl->shelve_buffer(NULL);
447   // restore shelved buffer back as primary
448   return store_buffer_to_thread_local(shelved, tl, native);
449 }
450 
// Flushes a "large" (leased) buffer. If the shelved regular buffer can
// accommodate the request, outstanding data is copied back into it, the
// lease is released, and the regular buffer is restored as primary.
// Otherwise an even larger lease is provisioned.
BufferPtr JfrStorage::flush_large(BufferPtr cur, const u1* const cur_pos, size_t used, size_t req, bool native, Thread* t) {
  debug_only(assert_flush_large_precondition(cur, cur_pos, used, req, native, t);)
  // Can the "regular" buffer (now shelved) accommodate the requested size?
  BufferPtr shelved = t->jfr_thread_local()->shelved_buffer();
  assert(shelved != NULL, "invariant");
  if (shelved->free_size() >= req) {
    if (req > 0) {
      // memcpy is safe: source (lease) and destination (shelved) are distinct buffers
      memcpy(shelved->pos(), (void*)cur_pos, (size_t)used);
    }
    // release and invalidate
    release_large(cur, t);
    return restore_shelved_buffer(native, t);
  }
  // regular too small
  return provision_large(cur, cur_pos,  used, req, native, t);
}
467 
468 static BufferPtr large_fail(BufferPtr cur, bool native, JfrStorage& storage_instance, Thread* t) {
469   assert(cur != NULL, "invariant");
470   assert(t != NULL, "invariant");
471   if (cur->lease()) {
472     storage_instance.release_large(cur, t);
473   }
474   return restore_shelved_buffer(native, t);
475 }
476 
// Always returns a non-null buffer.
// If accommodating the large request fails, the shelved buffer is returned
// even though it might be smaller than the requested size.
// Caller needs to ensure if the size was successfully accommodated.
BufferPtr JfrStorage::provision_large(BufferPtr cur, const u1* const cur_pos, size_t used, size_t req, bool native, Thread* t) {
  debug_only(assert_provision_large_precondition(cur, used, req, t);)
  assert(t->jfr_thread_local()->shelved_buffer() != NULL, "invariant");
  BufferPtr const buffer = acquire_large(req, t);
  if (buffer == NULL) {
    // unable to allocate and serve the request
    return large_fail(cur, native, *this, t);
  }
  // ok managed to acquire a "large" buffer for the requested size
  assert(buffer->free_size() >= req, "invariant");
  assert(buffer->lease(), "invariant");
  // transfer outstanding data into the new lease (distinct buffers, memcpy ok)
  memcpy(buffer->pos(), (void*)cur_pos, used);
  if (cur->lease()) {
    release_large(cur, t);
    // don't use current anymore, it is gone
  }
  // install the new lease as the thread's primary buffer
  return store_buffer_to_thread_local(buffer, t->jfr_thread_local(), native);
}
500 
// Composable operations used by the write/clear paths below.
typedef UnBufferedWriteToChunk<JfrBuffer> WriteOperation;
typedef MutexedWriteOp<WriteOperation> MutexedWriteOperation;
typedef ConcurrentWriteOp<WriteOperation> ConcurrentWriteOperation;

// Predicate filtering out buffers belonging to excluded threads.
typedef Excluded<JfrBuffer, true> NonExcluded;
typedef PredicatedConcurrentWriteOp<WriteOperation, NonExcluded>  ConcurrentNonExcludedWriteOperation;

// Releases (scavenges) thread-local live-list buffers after they are processed.
typedef ScavengingReleaseOp<JfrThreadLocalMspace, JfrThreadLocalMspace::LiveList> ReleaseThreadLocalOperation;
typedef CompositeOperation<ConcurrentNonExcludedWriteOperation, ReleaseThreadLocalOperation> ConcurrentWriteReleaseThreadLocalOperation;
510 
// Writes all storage to the current chunk: first the full list, then the
// thread-local mspace (scavenging released buffers as it goes), then the
// global mspace. Returns the total number of buffer elements processed.
size_t JfrStorage::write() {
  const size_t full_elements = write_full();
  WriteOperation wo(_chunkwriter);
  NonExcluded ne;
  ConcurrentNonExcludedWriteOperation cnewo(wo, ne);
  ReleaseThreadLocalOperation rtlo(_thread_local_mspace, _thread_local_mspace->live_list());
  ConcurrentWriteReleaseThreadLocalOperation tlop(&cnewo, &rtlo);
  process_live_list(tlop, _thread_local_mspace);
  assert(_global_mspace->free_list_is_empty(), "invariant");
  assert(_global_mspace->live_list_is_nonempty(), "invariant");
  process_live_list(cnewo, _global_mspace);
  return full_elements + wo.elements();
}
524 
// Safepoint variant of write(): no thread-local release/scavenge is performed
// here, but the write op remains the concurrent flavor (see inline comment).
size_t JfrStorage::write_at_safepoint() {
  assert(SafepointSynchronize::is_at_safepoint(), "invariant");
  const size_t full_elements = write_full();
  WriteOperation wo(_chunkwriter);
  NonExcluded ne;
  ConcurrentNonExcludedWriteOperation cnewo(wo, ne); // concurrent because of gc's
  process_live_list(cnewo, _thread_local_mspace);
  assert(_global_mspace->free_list_is_empty(), "invariant");
  assert(_global_mspace->live_list_is_nonempty(), "invariant");
  process_live_list(cnewo, _global_mspace);
  return full_elements + wo.elements();
}
537 
// Discard (drop data) operations; the discard mode (mutexed/concurrent) is
// selected at construction.
typedef DiscardOp<DefaultDiscarder<JfrStorage::Buffer> > DiscardOperation;
typedef CompositeOperation<DiscardOperation, ReleaseThreadLocalOperation> DiscardReleaseThreadLocalOperation;
540 
// Discards all outstanding storage data (full list, thread-local and global
// mspaces) without writing it to a chunk. Returns the number of buffer
// elements processed.
size_t JfrStorage::clear() {
  const size_t full_elements = clear_full();
  DiscardOperation discarder(concurrent); // concurrent discard mode
  ReleaseThreadLocalOperation rtlo(_thread_local_mspace, _thread_local_mspace->live_list());
  DiscardReleaseThreadLocalOperation tldo(&discarder, &rtlo);
  process_live_list(tldo, _thread_local_mspace);
  assert(_global_mspace->free_list_is_empty(), "invariant");
  assert(_global_mspace->live_list_is_nonempty(), "invariant");
  process_live_list(discarder, _global_mspace);
  return full_elements + discarder.elements();
}
552 
553 template <typename Processor>
554 static size_t process_full(Processor& processor, JfrFullList* list, JfrStorageControl& control) {
555   assert(list != NULL, "invariant");
556   assert(list->is_nonempty(), "invariant");
557   size_t count = 0;
558   do {
559     BufferPtr full = list->remove();
560     if (full == NULL) break;
561     assert(full->retired(), "invariant");
562     processor.process(full);
563     // at this point, the buffer is already live or destroyed
564     ++count;
565   } while (list->is_nonempty());
566   return count;
567 }
568 
569 static void log(size_t count, size_t amount, bool clear = false) {
570   if (log_is_enabled(Debug, jfr, system)) {
571     if (count > 0) {
572       log_debug(jfr, system)("%s " SIZE_FORMAT " full buffer(s) of " SIZE_FORMAT" B of data%s",
573         clear ? "Discarded" : "Wrote", count, amount, clear ? "." : " to chunk.");
574     }
575   }
576 }
577 
// write_full() composition: mutexed write of retired buffers followed by release.
typedef ReleaseOp<JfrThreadLocalMspace> ReleaseFullOperation;
typedef CompositeOperation<MutexedWriteOperation, ReleaseFullOperation> WriteFullOperation;
580 
581 // full writer
582 // Assumption is retired only; exclusive access
583 // MutexedWriter -> ReleaseOp
584 //
585 size_t JfrStorage::write_full() {
586   assert(_chunkwriter.is_valid(), "invariant");
587   if (_full_list->is_empty()) {
588     return 0;
589   }
590   WriteOperation wo(_chunkwriter);
591   MutexedWriteOperation writer(wo); // a retired buffer implies mutexed access
592   ReleaseFullOperation rfo(_thread_local_mspace);
593   WriteFullOperation wfo(&writer, &rfo);
594   const size_t count = process_full(wfo, _full_list, control());
595   if (count != 0) {
596     log(count, writer.size());
597   }
598   return count;
599 }
600 
601 size_t JfrStorage::clear_full() {
602   if (_full_list->is_empty()) {
603     return 0;
604   }
605   DiscardOperation discarder(mutexed); // a retired buffer implies mutexed access
606   const size_t count = process_full(discarder, _full_list, control());
607   if (count != 0) {
608     log(count, discarder.size());
609   }
610   return count;
611 }