1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::Arc;

use parking_lot::Mutex;

use re_log_types::LogMsg;

use crate::sink::LogSink;
use crate::RecordingStream;

/// Errors that can occur when creating a [`BinaryStreamSink`].
#[derive(thiserror::Error, Debug)]
pub enum BinaryStreamSinkError {
    /// Error spawning the writer thread.
    #[error("Failed to spawn thread: {0}")]
    SpawnThread(std::io::Error),

    /// Error encoding a log message.
    #[error("Failed to encode LogMsg: {0}")]
    LogMsgEncode(#[from] re_log_encoding::encoder::EncodeError),
}

enum Command {
    Send(LogMsg),
    Flush(SyncSender<()>),
}

impl Command {
    fn flush() -> (Self, Receiver<()>) {
        let (tx, rx) = std::sync::mpsc::sync_channel(0); // oneshot
        (Self::Flush(tx), rx)
    }
}

/// The inner storage used by [`BinaryStreamStorage`].
///
/// Although this implements Clone so that it can be shared between the encoder thread and the outer
/// storage, the model is that reading from it consumes the buffer.
#[derive(Clone, Default)]
struct BinaryStreamStorageInner(Arc<Mutex<std::io::Cursor<Vec<u8>>>>);

impl std::io::Write for BinaryStreamStorageInner {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.lock().write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.0.lock().flush()
    }
}

/// The storage used by [`BinaryStreamSink`].
///
/// Reading from this consumes the bytes from the stream.
pub struct BinaryStreamStorage {
    inner: BinaryStreamStorageInner,
    rec: RecordingStream,
}

impl BinaryStreamStorage {
    /// Create a new binary stream storage.
    fn new(rec: RecordingStream) -> Self {
        Self {
            inner: Default::default(),
            rec,
        }
    }

    /// Read and consume the current contents of the buffer.
    ///
    /// This does not flush the underlying batcher.
    /// Use [`BinaryStreamStorage::flush`] if you want to guarantee that all
    /// logged messages have been written to the stream before you read them.
    #[inline]
    pub fn read(&self) -> Vec<u8> {
        let mut buffer = std::io::Cursor::new(Vec::new());
        std::mem::swap(&mut buffer, &mut *self.inner.0.lock());
        buffer.into_inner()
    }

    /// Flush the batcher and log encoder to guarantee that all logged messages
    /// have been written to the stream.
    ///
    /// This will block until the flush is complete.
    #[inline]
    pub fn flush(&self) {
        self.rec.flush_blocking();
    }
}

impl Drop for BinaryStreamStorage {
    fn drop(&mut self) {
        self.flush();
        let bytes = self.read();

        if !bytes.is_empty() {
            re_log::warn!("Dropping data in BinaryStreamStorage");
        }
    }
}

/// Stream log messages to an in-memory binary stream.
///
/// The contents of this stream are encoded in the Rerun Record Data format (rrd).
///
/// This stream has no mechanism of limiting memory or creating back-pressure. If you do not
/// read from it, it will buffer all messages that you have logged.
pub struct BinaryStreamSink {
    /// The sender to the encoder thread.
    tx: Mutex<Sender<Option<Command>>>,

    /// Handle to join the encoder thread on drop.
    join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for BinaryStreamSink {
    fn drop(&mut self) {
        self.tx.lock().send(None).ok();
        if let Some(join_handle) = self.join_handle.take() {
            join_handle.join().ok();
        }
    }
}

impl BinaryStreamSink {
    /// Create a pair of a new [`BinaryStreamSink`] and the associated [`BinaryStreamStorage`].
    pub fn new(rec: RecordingStream) -> Result<(Self, BinaryStreamStorage), BinaryStreamSinkError> {
        let storage = BinaryStreamStorage::new(rec);

        // We always compress when writing to a stream
        // TODO(jleibs): Make this configurable
        let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;

        let (tx, rx) = std::sync::mpsc::channel();

        let encoder = re_log_encoding::encoder::DroppableEncoder::new(
            re_build_info::CrateVersion::LOCAL,
            encoding_options,
            storage.inner.clone(),
        )?;

        let join_handle = spawn_and_stream(encoder, rx)?;

        Ok((
            Self {
                tx: tx.into(),
                join_handle: Some(join_handle),
            },
            storage,
        ))
    }
}

impl LogSink for BinaryStreamSink {
    #[inline]
    fn send(&self, msg: re_log_types::LogMsg) {
        self.tx.lock().send(Some(Command::Send(msg))).ok();
    }

    #[inline]
    fn flush_blocking(&self) {
        let (cmd, oneshot) = Command::flush();
        self.tx.lock().send(Some(cmd)).ok();
        oneshot.recv().ok();
    }
}

/// Spawn the encoder thread that will write log messages to the binary stream.
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
    mut encoder: re_log_encoding::encoder::DroppableEncoder<W>,
    rx: Receiver<Option<Command>>,
) -> Result<std::thread::JoinHandle<()>, BinaryStreamSinkError> {
    std::thread::Builder::new()
        .name("binary_stream_encoder".into())
        .spawn({
            move || {
                while let Ok(Some(cmd)) = rx.recv() {
                    match cmd {
                        Command::Send(log_msg) => {
                            if let Err(err) = encoder.append(&log_msg) {
                                re_log::error!(
                                    "Failed to write log stream to binary stream: {err}"
                                );
                                return;
                            }
                        }
                        Command::Flush(oneshot) => {
                            re_log::trace!("Flushing…");
                            if let Err(err) = encoder.flush_blocking() {
                                re_log::error!(
                                    "Failed to flush log stream to binary stream: {err}"
                                );
                                return;
                            }
                            drop(oneshot); // signals the oneshot
                        }
                    }
                }
                re_log::debug!("Log stream written to binary stream");
            }
        })
        .map_err(BinaryStreamSinkError::SpawnThread)
}