re_sdk/binary_stream_sink.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
use std::sync::Arc;
use parking_lot::Mutex;
use re_log::ResultExt as _;
use re_log_encoding::encoder::encode_as_bytes_local;
use re_log_types::LogMsg;
use crate::sink::LogSink;
use crate::RecordingStream;
/// The storage used by [`BinaryStreamSink`].
///
/// Reading from this consumes the bytes from the stream.
pub struct BinaryStreamStorage {
inner: Arc<Mutex<Vec<LogMsg>>>,
rec: RecordingStream,
}
impl BinaryStreamStorage {
/// Create a new binary stream storage.
fn new(rec: RecordingStream) -> Self {
Self {
inner: Default::default(),
rec,
}
}
/// Read and consume the current contents of the buffer.
///
/// This returns a fully encoded RRD file.
///
/// This does not flush the underlying batcher.
/// Use [`BinaryStreamStorage::flush`] if you want to guarantee that all
/// logged messages have been written to the stream before you read them.
#[inline]
pub fn read(&self) -> Option<Vec<u8>> {
let mut inner = self.inner.lock();
// if there's no messages to send, do not include the RRD headers.
if inner.is_empty() {
return None;
}
encode_as_bytes_local(inner.drain(..).map(Ok)).ok_or_log_error()
}
/// Flush the batcher and log encoder to guarantee that all logged messages
/// have been written to the stream.
///
/// This will block until the flush is complete.
#[inline]
pub fn flush(&self) {
self.rec.flush_blocking();
}
}
impl Drop for BinaryStreamStorage {
fn drop(&mut self) {
self.flush();
let bytes = self.read();
if let Some(bytes) = bytes {
re_log::warn!(
"Dropping data in BinaryStreamStorage ({} bytes)",
bytes.len()
);
}
}
}
/// Stream log messages to an in-memory binary stream.
///
/// The contents of this stream are encoded in the Rerun Record Data format (rrd).
///
/// This stream has no mechanism of limiting memory or creating back-pressure. If you do not
/// read from it, it will buffer all messages that you have logged.
pub struct BinaryStreamSink {
buffer: Arc<Mutex<Vec<LogMsg>>>,
}
impl BinaryStreamSink {
/// Create a pair of a new [`BinaryStreamSink`] and the associated [`BinaryStreamStorage`].
pub fn new(rec: RecordingStream) -> (Self, BinaryStreamStorage) {
let storage = BinaryStreamStorage::new(rec);
(
Self {
buffer: storage.inner.clone(),
},
storage,
)
}
}
impl LogSink for BinaryStreamSink {
#[inline]
fn send(&self, msg: re_log_types::LogMsg) {
self.buffer.lock().push(msg);
}
#[inline]
fn flush_blocking(&self) {}
}