1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
use std::fmt;
use std::sync::Arc;
use parking_lot::Mutex;
use re_log_encoding::encoder::encode_as_bytes_local;
use re_log_encoding::encoder::{local_raw_encoder, EncodeError};
use re_log_types::{BlueprintActivationCommand, LogMsg, StoreId};
use crate::RecordingStream;
/// Where the SDK sends its log messages.
pub trait LogSink: Send + Sync + 'static {
/// Send this log message.
fn send(&self, msg: LogMsg);
/// Send all these log messages.
#[inline]
fn send_all(&self, messages: Vec<LogMsg>) {
for msg in messages {
self.send(msg);
}
}
/// Drain all buffered [`LogMsg`]es and return them.
///
/// Only applies to sinks that maintain a backlog.
#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
vec![]
}
/// Blocks until all pending data in the sink's send buffers has been fully flushed.
///
/// See also [`LogSink::drop_if_disconnected`].
fn flush_blocking(&self);
/// Drops all pending data currently sitting in the sink's send buffers if it is unable to
/// flush it for any reason (e.g. a broken TCP connection for a [`TcpSink`]).
#[inline]
fn drop_if_disconnected(&self) {}
/// Send a blueprint directly to the log-sink.
///
/// This mirrors the behavior of [`crate::RecordingStream::send_blueprint`].
fn send_blueprint(&self, blueprint: Vec<LogMsg>, activation_cmd: BlueprintActivationCommand) {
let mut blueprint_id = None;
for msg in blueprint {
if blueprint_id.is_none() {
blueprint_id = Some(msg.store_id().clone());
}
self.send(msg);
}
if let Some(blueprint_id) = blueprint_id {
if blueprint_id == activation_cmd.blueprint_id {
// Let the viewer know that the blueprint has been fully received,
// and that it can now be activated.
// We don't want to activate half-loaded blueprints, because that can be confusing,
// and can also lead to problems with space-view heuristics.
self.send(activation_cmd.into());
} else {
re_log::warn!(
"Blueprint ID mismatch when sending blueprint: {} != {}. Ignoring activation.",
blueprint_id,
activation_cmd.blueprint_id
);
}
}
}
}
// ----------------------------------------------------------------------------
/// Store log messages in memory until you call [`LogSink::drain_backlog`].
#[derive(Default)]
pub struct BufferedSink(parking_lot::Mutex<Vec<LogMsg>>);
impl Drop for BufferedSink {
fn drop(&mut self) {
for msg in self.0.lock().iter() {
// Sinks intentionally end up with pending SetStoreInfo messages
// these are fine to drop safely. Anything else should produce a
// warning.
if !matches!(msg, LogMsg::SetStoreInfo(_)) {
re_log::warn!("Dropping data in BufferedSink");
return;
}
}
}
}
impl BufferedSink {
/// An empty buffer.
#[inline]
pub fn new() -> Self {
Self::default()
}
}
impl LogSink for BufferedSink {
#[inline]
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
}
#[inline]
fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
}
#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
std::mem::take(&mut self.0.lock())
}
#[inline]
fn flush_blocking(&self) {}
}
impl fmt::Debug for BufferedSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BufferedSink {{ {} messages }}", self.0.lock().len())
}
}
/// Store log messages directly in memory.
///
/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating
/// a new sink the logged messages stay with the `MemorySink` (`drain_backlog` does nothing).
///
/// Additionally the raw storage can be accessed and used to create an in-memory RRD.
/// This is useful for things like the inline rrd-viewer in Jupyter notebooks.
pub struct MemorySink(MemorySinkStorage);
impl MemorySink {
/// Create a new [`MemorySink`] with an associated [`RecordingStream`].
#[inline]
pub fn new(rec: RecordingStream) -> Self {
Self(MemorySinkStorage::new(rec))
}
/// Access the raw `MemorySinkStorage`
#[inline]
pub fn buffer(&self) -> MemorySinkStorage {
self.0.clone()
}
}
impl LogSink for MemorySink {
#[inline]
fn send(&self, msg: LogMsg) {
self.0.write().push(msg);
}
#[inline]
fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.write().append(&mut messages);
}
#[inline]
fn flush_blocking(&self) {}
#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
// Note that When draining the backlog, we don't call `take` since that would flush
// the stream. But drain_backlog is being called as part of `set_sink`, which has already queued
// a flush of the batcher. Queueing a second flush here seems to lead to a deadlock
// at shutdown.
std::mem::take(&mut (self.0.write()))
}
}
impl fmt::Debug for MemorySink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "MemorySink {{ {} messages }}", self.buffer().num_msgs())
}
}
#[derive(Default)]
struct MemorySinkStorageInner {
msgs: Vec<LogMsg>,
has_been_used: bool,
}
/// The storage used by [`MemorySink`].
#[derive(Clone)]
pub struct MemorySinkStorage {
inner: Arc<Mutex<MemorySinkStorageInner>>,
pub(crate) rec: RecordingStream,
}
impl Drop for MemorySinkStorage {
fn drop(&mut self) {
let inner = self.inner.lock();
if !inner.has_been_used {
for msg in &inner.msgs {
// Sinks intentionally end up with pending SetStoreInfo messages
// these are fine to drop safely. Anything else should produce a
// warning.
if !matches!(msg, LogMsg::SetStoreInfo(_)) {
re_log::warn!("Dropping data in MemorySink");
return;
}
}
}
}
}
impl MemorySinkStorage {
/// Create a new [`MemorySinkStorage`] with an associated [`RecordingStream`].
fn new(rec: RecordingStream) -> Self {
Self {
inner: Default::default(),
rec,
}
}
/// Write access to the inner array of [`LogMsg`].
#[inline]
fn write(&self) -> parking_lot::MappedMutexGuard<'_, Vec<LogMsg>> {
let mut inner = self.inner.lock();
inner.has_been_used = false;
parking_lot::MutexGuard::map(inner, |inner| &mut inner.msgs)
}
/// How many messages are currently written to this memory sink
///
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn num_msgs(&self) -> usize {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
self.rec.flush_blocking();
self.inner.lock().msgs.len()
}
/// Consumes and returns the inner array of [`LogMsg`].
///
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn take(&self) -> Vec<LogMsg> {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
self.rec.flush_blocking();
std::mem::take(&mut (self.write()))
}
/// Convert the stored messages into an in-memory Rerun log file.
///
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn concat_memory_sinks_as_bytes(sinks: &[&Self]) -> Result<Vec<u8>, EncodeError> {
let mut encoder = local_raw_encoder()?;
for sink in sinks {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
sink.rec.flush_blocking();
let mut inner = sink.inner.lock();
inner.has_been_used = true;
for message in &inner.msgs {
encoder.append(message)?;
}
}
encoder.finish()?;
Ok(encoder.into_inner())
}
/// Drain the stored messages and return them as an in-memory RRD.
///
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn drain_as_bytes(&self) -> Result<Vec<u8>, EncodeError> {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
self.rec.flush_blocking();
let mut inner = self.inner.lock();
inner.has_been_used = true;
encode_as_bytes_local(std::mem::take(&mut inner.msgs).into_iter().map(Ok))
}
#[inline]
/// Get the [`StoreId`] from the associated `RecordingStream` if it exists.
pub fn store_id(&self) -> Option<StoreId> {
self.rec.store_info().map(|info| info.store_id.clone())
}
}
// ----------------------------------------------------------------------------
type LogMsgCallback = Box<dyn Fn(&[LogMsg]) + Send + Sync>;
/// A sink which forwards all log messages to a callback without any buffering.
pub struct CallbackSink {
// We often receive only one element, so we use SmallVec to avoid heap allocation
callback: LogMsgCallback,
}
impl CallbackSink {
/// Create a new `CallbackSink` with the given callback function.
#[inline]
pub fn new<F>(callback: F) -> Self
where
F: Fn(&[LogMsg]) + Send + Sync + 'static,
{
Self {
callback: Box::new(callback),
}
}
}
impl LogSink for CallbackSink {
#[inline]
fn send(&self, msg: LogMsg) {
(self.callback)(&[msg]);
}
#[inline]
fn send_all(&self, messages: Vec<LogMsg>) {
(self.callback)(&messages[..]);
}
#[inline]
fn flush_blocking(&self) {}
}
// ----------------------------------------------------------------------------
/// Stream log messages to a Rerun TCP server.
#[derive(Debug)]
pub struct TcpSink {
client: re_sdk_comms::Client,
}
impl TcpSink {
/// Connect to the given address in a background thread.
/// Retries until successful.
///
/// `flush_timeout` is the minimum time the [`TcpSink`] will wait during a flush
/// before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
#[inline]
pub fn new(addr: std::net::SocketAddr, flush_timeout: Option<std::time::Duration>) -> Self {
Self {
client: re_sdk_comms::Client::new(addr, flush_timeout),
}
}
}
impl LogSink for TcpSink {
#[inline]
fn send(&self, msg: LogMsg) {
self.client.send(msg);
}
#[inline]
fn flush_blocking(&self) {
self.client.flush();
}
#[inline]
fn drop_if_disconnected(&self) {
self.client.drop_if_disconnected();
}
}