#![allow(clippy::needless_pass_by_value)] #![allow(clippy::borrow_deref_ref)] #![allow(unsafe_op_in_unsafe_fn)] use std::io::IsTerminal as _;
use std::path::PathBuf;
use std::{borrow::Borrow, collections::HashMap};
use arrow::array::RecordBatch as ArrowRecordBatch;
use itertools::Itertools;
use pyo3::{
exceptions::PyRuntimeError,
prelude::*,
types::{PyBytes, PyDict},
};
use re_log::ResultExt;
use re_log_types::LogMsg;
use re_log_types::{BlueprintActivationCommand, EntityPathPart, StoreKind};
use re_sdk::external::re_log_encoding::encoder::encode_ref_as_bytes_local;
use re_sdk::sink::CallbackSink;
use re_sdk::{
sink::{BinaryStreamStorage, MemorySinkStorage},
time::TimePoint,
EntityPath, RecordingStream, RecordingStreamBuilder, StoreId,
};
#[cfg(feature = "web_viewer")]
use re_web_viewer_server::WebViewerServerPort;
#[cfg(feature = "web_viewer")]
use re_ws_comms::RerunServerPort;
use once_cell::sync::{Lazy, OnceCell};
fn all_recordings() -> parking_lot::MutexGuard<'static, HashMap<StoreId, RecordingStream>> {
static ALL_RECORDINGS: OnceCell<parking_lot::Mutex<HashMap<StoreId, RecordingStream>>> =
OnceCell::new();
ALL_RECORDINGS.get_or_init(Default::default).lock()
}
// Channel used to defer the dropping of Arrow record batches released by the
// chunk batcher: batches are pushed here from the release hook and only
// actually dropped later via `flush_garbage_queue`, outside the GIL.
type GarbageSender = crossbeam::channel::Sender<ArrowRecordBatch>;
type GarbageReceiver = crossbeam::channel::Receiver<ArrowRecordBatch>;
// Unbounded so the sending side (the batcher release hook) can never block.
static GARBAGE_QUEUE: Lazy<(GarbageSender, GarbageReceiver)> =
Lazy::new(crossbeam::channel::unbounded);
/// Drains the garbage channel, dropping every Arrow record batch that has been
/// queued for deferred destruction.
fn flush_garbage_queue() {
    // `try_iter` yields queued items until the channel is empty, never blocking.
    for _batch in GARBAGE_QUEUE.1.try_iter() {
        // Each batch is dropped here, on the current (GIL-free) thread.
    }
}
/// Returns a lock guard over the lazily-initialized global web viewer server
/// handle (`None` until `start_web_viewer_server` installs one).
#[cfg(feature = "web_viewer")]
fn global_web_viewer_server(
) -> parking_lot::MutexGuard<'static, Option<re_web_viewer_server::WebViewerServer>> {
static WEB_HANDLE: OnceCell<parking_lot::Mutex<Option<re_web_viewer_server::WebViewerServer>>> =
OnceCell::new();
WEB_HANDLE.get_or_init(Default::default).lock()
}
/// Entry point of the `rerun_bindings` native Python module: registers all
/// classes and functions exposed to the Python SDK.
#[pymodule]
fn rerun_bindings(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
// NOTE: python catches this and convert it to a proper python log.
re_log::setup_logging();
// These classes are always exposed, even in "app only" mode (see below).
m.add_class::<PyMemorySinkStorage>()?;
m.add_class::<PyRecordingStream>()?;
// In "app only" mode (used by the standalone viewer wheel) we skip the
// logging API entirely and only expose the classes above.
if matches!(std::env::var("RERUN_APP_ONLY").as_deref(), Ok("true")) {
return Ok(());
}
// Recording / blueprint stream management.
m.add_function(wrap_pyfunction!(new_recording, m)?)?;
m.add_function(wrap_pyfunction!(new_blueprint, m)?)?;
m.add_function(wrap_pyfunction!(shutdown, m)?)?;
m.add_function(wrap_pyfunction!(cleanup_if_forked_child, m)?)?;
m.add_function(wrap_pyfunction!(spawn, m)?)?;
m.add_function(wrap_pyfunction!(get_application_id, m)?)?;
m.add_function(wrap_pyfunction!(get_recording_id, m)?)?;
m.add_function(wrap_pyfunction!(get_data_recording, m)?)?;
m.add_function(wrap_pyfunction!(get_global_data_recording, m)?)?;
m.add_function(wrap_pyfunction!(set_global_data_recording, m)?)?;
m.add_function(wrap_pyfunction!(get_thread_local_data_recording, m)?)?;
m.add_function(wrap_pyfunction!(set_thread_local_data_recording, m)?)?;
m.add_function(wrap_pyfunction!(get_blueprint_recording, m)?)?;
m.add_function(wrap_pyfunction!(get_global_blueprint_recording, m)?)?;
m.add_function(wrap_pyfunction!(set_global_blueprint_recording, m)?)?;
m.add_function(wrap_pyfunction!(get_thread_local_blueprint_recording, m)?)?;
m.add_function(wrap_pyfunction!(set_thread_local_blueprint_recording, m)?)?;
m.add_function(wrap_pyfunction!(is_enabled, m)?)?;
// Sink configuration.
m.add_function(wrap_pyfunction!(binary_stream, m)?)?;
m.add_function(wrap_pyfunction!(connect_tcp, m)?)?;
m.add_function(wrap_pyfunction!(connect_tcp_blueprint, m)?)?;
#[cfg(feature = "remote")]
m.add_function(wrap_pyfunction!(connect_grpc, m)?)?;
m.add_function(wrap_pyfunction!(save, m)?)?;
m.add_function(wrap_pyfunction!(save_blueprint, m)?)?;
m.add_function(wrap_pyfunction!(stdout, m)?)?;
m.add_function(wrap_pyfunction!(memory_recording, m)?)?;
m.add_function(wrap_pyfunction!(set_callback_sink, m)?)?;
m.add_function(wrap_pyfunction!(serve_web, m)?)?;
m.add_function(wrap_pyfunction!(disconnect, m)?)?;
m.add_function(wrap_pyfunction!(flush, m)?)?;
// Time handling.
m.add_function(wrap_pyfunction!(set_time_sequence, m)?)?;
m.add_function(wrap_pyfunction!(set_time_seconds, m)?)?;
m.add_function(wrap_pyfunction!(set_time_nanos, m)?)?;
m.add_function(wrap_pyfunction!(disable_timeline, m)?)?;
m.add_function(wrap_pyfunction!(reset_time, m)?)?;
// Logging.
m.add_function(wrap_pyfunction!(log_arrow_msg, m)?)?;
m.add_function(wrap_pyfunction!(log_file_from_path, m)?)?;
m.add_function(wrap_pyfunction!(log_file_from_contents, m)?)?;
m.add_function(wrap_pyfunction!(send_arrow_chunk, m)?)?;
m.add_function(wrap_pyfunction!(send_blueprint, m)?)?;
// Misc utilities.
m.add_function(wrap_pyfunction!(version, m)?)?;
m.add_function(wrap_pyfunction!(get_app_url, m)?)?;
m.add_function(wrap_pyfunction!(start_web_viewer_server, m)?)?;
m.add_function(wrap_pyfunction!(escape_entity_path_part, m)?)?;
m.add_function(wrap_pyfunction!(new_entity_path, m)?)?;
use crate::video::asset_video_read_frame_timestamps_ns;
m.add_function(wrap_pyfunction!(asset_video_read_frame_timestamps_ns, m)?)?;
// Submodules.
crate::dataframe::register(m)?;
#[cfg(feature = "remote")]
crate::remote::register(m)?;
Ok(())
}
/// Creates a new data recording stream and registers it in `all_recordings`.
///
/// * `recording_id`: explicit store id; if `None`, a deterministic id is
///   derived from the Python authkey + application id (see `default_store_id`).
/// * `make_default` / `make_thread_default`: also install the stream as the
///   global / thread-local active recording.
/// * `application_path`: path of the user's script, used only to detect
///   official Rerun examples.
#[allow(clippy::fn_params_excessive_bools)]
#[allow(clippy::struct_excessive_bools)]
#[allow(clippy::too_many_arguments)]
#[pyfunction]
#[pyo3(signature = (
application_id,
recording_id=None,
make_default=true,
make_thread_default=true,
application_path=None,
default_enabled=true,
))]
fn new_recording(
py: Python<'_>,
application_id: String,
recording_id: Option<String>,
make_default: bool,
make_thread_default: bool,
application_path: Option<PathBuf>,
default_enabled: bool,
) -> PyResult<PyRecordingStream> {
// The sentinel file we use to identify the official examples directory.
const SENTINEL_FILENAME: &str = ".rerun_examples";
// Walk up to 4 ancestor directories of the script looking for the sentinel.
let is_official_example = application_path.is_some_and(|mut path| {
for _ in 0..4 {
path.pop(); if path.join(SENTINEL_FILENAME).exists() {
return true;
}
}
false
});
let recording_id = if let Some(recording_id) = recording_id {
StoreId::from_string(StoreKind::Recording, recording_id)
} else {
default_store_id(py, StoreKind::Recording, &application_id)
};
let mut batcher_config = re_chunk::ChunkBatcherConfig::from_env().unwrap_or_default();
// Route released chunks into the garbage queue so they are dropped later,
// outside the GIL (see `flush_garbage_queue`).
let on_release = |chunk| {
GARBAGE_QUEUE.0.send(chunk).ok();
};
batcher_config.hooks.on_release = Some(on_release.into());
let recording = RecordingStreamBuilder::new(application_id)
.batcher_config(batcher_config)
.is_official_example(is_official_example)
.store_id(recording_id.clone())
.store_source(re_log_types::StoreSource::PythonSdk(python_version(py)))
.default_enabled(default_enabled)
.buffered()
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
if make_default {
set_global_data_recording(
py,
Some(&PyRecordingStream(recording.clone() )),
);
}
if make_thread_default {
set_thread_local_data_recording(
py,
Some(&PyRecordingStream(recording.clone() )),
);
}
// Keep the stream registered so `shutdown` can disconnect it later.
all_recordings().insert(recording_id, recording.clone());
Ok(PyRecordingStream(recording))
}
/// Creates a new blueprint stream (always with a random store id) and
/// registers it in `all_recordings`; optionally installs it as the global
/// and/or thread-local active blueprint.
#[allow(clippy::fn_params_excessive_bools)]
#[pyfunction]
#[pyo3(signature = (
application_id,
make_default=true,
make_thread_default=true,
default_enabled=true,
))]
fn new_blueprint(
py: Python<'_>,
application_id: String,
make_default: bool,
make_thread_default: bool,
default_enabled: bool,
) -> PyResult<PyRecordingStream> {
// Blueprint ids are never user-provided; a fresh random one each time.
let blueprint_id = StoreId::random(StoreKind::Blueprint);
let mut batcher_config = re_chunk::ChunkBatcherConfig::from_env().unwrap_or_default();
// Route released chunks into the garbage queue so they are dropped later,
// outside the GIL (see `flush_garbage_queue`).
let on_release = |chunk| {
GARBAGE_QUEUE.0.send(chunk).ok();
};
batcher_config.hooks.on_release = Some(on_release.into());
let blueprint = RecordingStreamBuilder::new(application_id)
.batcher_config(batcher_config)
.store_id(blueprint_id.clone())
.store_source(re_log_types::StoreSource::PythonSdk(python_version(py)))
.default_enabled(default_enabled)
.buffered()
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
if make_default {
set_global_blueprint_recording(
py,
Some(&PyRecordingStream(blueprint.clone() )),
);
}
if make_thread_default {
set_thread_local_blueprint_recording(
py,
Some(&PyRecordingStream(blueprint.clone() )),
);
}
// Keep the stream registered so `shutdown` can disconnect it later.
all_recordings().insert(blueprint_id, blueprint.clone());
Ok(PyRecordingStream(blueprint))
}
/// Disconnects every registered recording stream and drains the garbage
/// queue. Called by the Python SDK at interpreter shutdown.
#[pyfunction]
fn shutdown(py: Python<'_>) {
    re_log::debug!("Shutting down the Rerun SDK");
    // Release the GIL: disconnecting flushes pending data and may block.
    py.allow_threads(|| {
        // We only need the streams, not their store ids, so iterate `values()`
        // instead of destructuring `(_, recording)` pairs.
        for recording in all_recordings().values() {
            recording.disconnect();
        }
        flush_garbage_queue();
    });
}
/// Python-facing wrapper around a Rerun `RecordingStream`.
///
/// `frozen` because all interior mutability lives inside `RecordingStream`
/// itself; `Clone` is cheap (the stream is reference-counted internally).
#[pyclass(frozen)]
#[derive(Clone)]
struct PyRecordingStream(RecordingStream);
#[pymethods]
impl PyRecordingStream {
/// Returns `true` if the stream was created in a process that has since
/// forked — logging from the child would be unsafe in that case.
fn is_forked_child(&self) -> bool {
self.0.is_forked_child()
}
}
// Allow calling `RecordingStream` methods directly on the wrapper.
impl std::ops::Deref for PyRecordingStream {
type Target = RecordingStream;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Returns the application id of the active data recording, if any.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn get_application_id(recording: Option<&PyRecordingStream>) -> Option<String> {
    let recording = get_data_recording(recording)?;
    let info = recording.store_info()?;
    Some(info.application_id.to_string())
}
/// Returns the store id of the active data recording, if any.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn get_recording_id(recording: Option<&PyRecordingStream>) -> Option<String> {
    let recording = get_data_recording(recording)?;
    let info = recording.store_info()?;
    Some(info.store_id.to_string())
}
/// Resolves the active data recording: the explicitly passed one if any,
/// otherwise the thread-local / global default.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn get_data_recording(recording: Option<&PyRecordingStream>) -> Option<PyRecordingStream> {
    let explicit = recording.map(|rec| rec.0.clone());
    let resolved = RecordingStream::get_quiet(re_sdk::StoreKind::Recording, explicit);
    resolved.map(PyRecordingStream)
}
#[pyfunction]
fn get_global_data_recording() -> Option<PyRecordingStream> {
RecordingStream::global(re_sdk::StoreKind::Recording).map(PyRecordingStream)
}
/// Thin wrapper: lets the SDK detect a `fork()` and disable unsafe streams.
#[pyfunction]
fn cleanup_if_forked_child() {
re_sdk::cleanup_if_forked_child();
}
/// Installs `recording` as the global default data recording, returning the
/// previously installed one (if any). Passing `None` clears the default.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn set_global_data_recording(
py: Python<'_>,
recording: Option<&PyRecordingStream>,
) -> Option<PyRecordingStream> {
    // Release the GIL: swapping the default may drop the previous stream,
    // and the queued garbage must be freed without holding the GIL.
    py.allow_threads(|| {
        let previous = RecordingStream::set_global(
            re_sdk::StoreKind::Recording,
            recording.map(|rec| rec.0.clone()),
        );
        flush_garbage_queue();
        previous.map(PyRecordingStream)
    })
}
#[pyfunction]
fn get_thread_local_data_recording() -> Option<PyRecordingStream> {
RecordingStream::thread_local(re_sdk::StoreKind::Recording).map(PyRecordingStream)
}
/// Installs `recording` as the thread-local default data recording, returning
/// the previously installed one (if any). Passing `None` clears it.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn set_thread_local_data_recording(
py: Python<'_>,
recording: Option<&PyRecordingStream>,
) -> Option<PyRecordingStream> {
    // Release the GIL while swapping and freeing queued garbage.
    py.allow_threads(|| {
        let previous = RecordingStream::set_thread_local(
            re_sdk::StoreKind::Recording,
            recording.map(|rec| rec.0.clone()),
        );
        flush_garbage_queue();
        previous.map(PyRecordingStream)
    })
}
/// Resolves the active blueprint stream: the explicitly passed override if
/// any, otherwise the thread-local / global default.
#[pyfunction]
#[pyo3(signature = (overrides=None))]
fn get_blueprint_recording(overrides: Option<&PyRecordingStream>) -> Option<PyRecordingStream> {
    let explicit = overrides.map(|rec| rec.0.clone());
    let resolved = RecordingStream::get_quiet(re_sdk::StoreKind::Blueprint, explicit);
    resolved.map(PyRecordingStream)
}
#[pyfunction]
fn get_global_blueprint_recording() -> Option<PyRecordingStream> {
RecordingStream::global(re_sdk::StoreKind::Blueprint).map(PyRecordingStream)
}
/// Installs `recording` as the global default blueprint stream, returning the
/// previously installed one (if any). Passing `None` clears it.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn set_global_blueprint_recording(
py: Python<'_>,
recording: Option<&PyRecordingStream>,
) -> Option<PyRecordingStream> {
    // Release the GIL while swapping and freeing queued garbage.
    py.allow_threads(|| {
        let previous = RecordingStream::set_global(
            re_sdk::StoreKind::Blueprint,
            recording.map(|rec| rec.0.clone()),
        );
        flush_garbage_queue();
        previous.map(PyRecordingStream)
    })
}
#[pyfunction]
fn get_thread_local_blueprint_recording() -> Option<PyRecordingStream> {
RecordingStream::thread_local(re_sdk::StoreKind::Blueprint).map(PyRecordingStream)
}
/// Installs `recording` as the thread-local default blueprint stream,
/// returning the previously installed one (if any). Passing `None` clears it.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn set_thread_local_blueprint_recording(
py: Python<'_>,
recording: Option<&PyRecordingStream>,
) -> Option<PyRecordingStream> {
    // Release the GIL while swapping and freeing queued garbage.
    py.allow_threads(|| {
        let previous = RecordingStream::set_thread_local(
            re_sdk::StoreKind::Blueprint,
            recording.map(|rec| rec.0.clone()),
        );
        flush_garbage_queue();
        previous.map(PyRecordingStream)
    })
}
/// Returns whether the resolved data recording exists and is enabled.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn is_enabled(recording: Option<&PyRecordingStream>) -> bool {
    match get_data_recording(recording) {
        Some(rec) => rec.is_enabled(),
        None => false,
    }
}
/// Drains the given in-memory blueprint and sends it through `sink`, marked
/// as the default blueprint. Logs a warning if the storage has no store id.
fn send_mem_sink_as_default_blueprint(
sink: &dyn re_sdk::sink::LogSink,
default_blueprint: &PyMemorySinkStorage,
) {
    match default_blueprint.inner.store_id() {
        Some(id) => {
            let activate_cmd = BlueprintActivationCommand::make_default(id);
            sink.send_blueprint(default_blueprint.inner.take(), activate_cmd);
        }
        None => {
            re_log::warn!("Provided `default_blueprint` has no store info, cannot send it.");
        }
    }
}
/// Spawns a local Rerun viewer process with the given options, waiting for
/// it to bind its port before returning.
#[pyfunction]
#[pyo3(signature = (port = 9876, memory_limit = "75%".to_owned(), hide_welcome_screen = false, executable_name = "rerun".to_owned(), executable_path = None, extra_args = vec![], extra_env = vec![]))]
fn spawn(
port: u16,
memory_limit: String,
hide_welcome_screen: bool,
executable_name: String,
executable_path: Option<String>,
extra_args: Vec<String>,
extra_env: Vec<(String, String)>,
) -> PyResult<()> {
let spawn_opts = re_sdk::SpawnOptions {
port,
// Block until the viewer is actually listening, so a subsequent
// `connect_tcp` can't race it.
wait_for_bind: true,
memory_limit,
hide_welcome_screen,
executable_name,
executable_path,
extra_args,
extra_env,
};
re_sdk::spawn(&spawn_opts).map_err(|err| PyRuntimeError::new_err(err.to_string()))
}
/// Switches the active data recording to a TCP sink, optionally sending a
/// default blueprint through it first.
#[pyfunction]
#[pyo3(signature = (addr = None, flush_timeout_sec=re_sdk::default_flush_timeout().expect("always Some()").as_secs_f32(), default_blueprint = None, recording = None))]
fn connect_tcp(
addr: Option<String>,
flush_timeout_sec: Option<f32>,
default_blueprint: Option<&PyMemorySinkStorage>,
recording: Option<&PyRecordingStream>,
py: Python<'_>,
) -> PyResult<()> {
// No-op when there is no active recording.
let Some(recording) = get_data_recording(recording) else {
return Ok(());
};
// Test harness override: a forced file sink takes precedence.
if re_sdk::forced_sink_path().is_some() {
re_log::debug!("Ignored call to `connect()` since _RERUN_TEST_FORCE_SAVE is set");
return Ok(());
}
let addr = if let Some(addr) = addr {
addr.parse()?
} else {
re_sdk::default_server_addr()
};
let flush_timeout = flush_timeout_sec.map(std::time::Duration::from_secs_f32);
// Release the GIL: swapping sinks flushes, which may block.
py.allow_threads(|| {
let sink = re_sdk::sink::TcpSink::new(addr, flush_timeout);
// The blueprint must go through the sink *before* it is installed on
// the recording stream.
if let Some(default_blueprint) = default_blueprint {
send_mem_sink_as_default_blueprint(&sink, default_blueprint);
}
recording.set_sink(Box::new(sink));
flush_garbage_queue();
});
Ok(())
}
/// Flushes the given blueprint stream, records an activation command with the
/// requested flags, and connects the stream to a viewer over TCP.
#[pyfunction]
#[pyo3(signature = (addr, make_active, make_default, blueprint_stream))]
fn connect_tcp_blueprint(
addr: Option<String>,
make_active: bool,
make_default: bool,
blueprint_stream: &PyRecordingStream,
py: Python<'_>,
) -> PyResult<()> {
let addr = if let Some(addr) = addr {
addr.parse()?
} else {
re_sdk::default_server_addr()
};
if let Some(blueprint_id) = (*blueprint_stream).store_info().map(|info| info.store_id) {
// Release the GIL: flushing and connecting may block.
py.allow_threads(|| {
// The call to save, needs to flush.
blueprint_stream.flush_blocking();
let activation_cmd = BlueprintActivationCommand {
blueprint_id,
make_active,
make_default,
};
// The activation command must be recorded before connecting so it
// arrives with the blueprint data.
blueprint_stream.record_msg(activation_cmd.into());
blueprint_stream.connect_opts(addr, None);
flush_garbage_queue();
});
Ok(())
} else {
Err(PyRuntimeError::new_err(
"Blueprint stream has no store info".to_owned(),
))
}
}
/// Switches the active data recording to a gRPC sink (only with the `remote`
/// feature enabled).
#[cfg(feature = "remote")]
#[pyfunction]
#[pyo3(signature = (addr, recording = None))]
fn connect_grpc(addr: String, recording: Option<&PyRecordingStream>, py: Python<'_>) {
// No-op when there is no active recording.
let Some(recording) = get_data_recording(recording) else {
return;
};
// Release the GIL: swapping sinks flushes, which may block.
py.allow_threads(|| {
let sink = re_sdk::sink::GrpcSink::new(addr);
recording.set_sink(Box::new(sink));
flush_garbage_queue();
});
}
/// Switches the active data recording to a file sink at `path`, optionally
/// sending a default blueprint into the file first.
#[pyfunction]
#[pyo3(signature = (path, default_blueprint = None, recording = None))]
fn save(
path: &str,
default_blueprint: Option<&PyMemorySinkStorage>,
recording: Option<&PyRecordingStream>,
py: Python<'_>,
) -> PyResult<()> {
// No-op when there is no active recording.
let Some(recording) = get_data_recording(recording) else {
return Ok(());
};
// Test harness override: a forced file sink takes precedence.
if re_sdk::forced_sink_path().is_some() {
re_log::debug!("Ignored call to `save()` since _RERUN_TEST_FORCE_SAVE is set");
return Ok(());
}
// Release the GIL: file creation and sink swapping may block.
py.allow_threads(|| {
let sink = re_sdk::sink::FileSink::new(path)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
// The blueprint must be written *before* the sink is installed.
if let Some(default_blueprint) = default_blueprint {
send_mem_sink_as_default_blueprint(&sink, default_blueprint);
}
recording.set_sink(Box::new(sink));
flush_garbage_queue();
Ok(())
})
}
/// Flushes the given blueprint stream, records an "activate" command for it,
/// and saves the whole stream to a `.rbl` file at `path`.
///
/// Errors if the stream has no store info or if saving fails.
#[pyfunction]
#[pyo3(signature = (path, blueprint_stream))]
fn save_blueprint(
path: &str,
blueprint_stream: &PyRecordingStream,
py: Python<'_>,
) -> PyResult<()> {
if let Some(blueprint_id) = (*blueprint_stream).store_info().map(|info| info.store_id) {
// Release the GIL: flushing and file I/O may block.
py.allow_threads(|| {
blueprint_stream.flush_blocking();
// `blueprint_id` is not used again below, so move it instead of
// cloning it (removes a redundant clone).
let activation_cmd = BlueprintActivationCommand::make_active(blueprint_id);
// The activation command must be recorded before saving so it ends
// up in the file alongside the blueprint data.
blueprint_stream.record_msg(activation_cmd.into());
let res = blueprint_stream
.save_opts(path)
.map_err(|err| PyRuntimeError::new_err(err.to_string()));
flush_garbage_queue();
res
})
} else {
Err(PyRuntimeError::new_err(
"Blueprint stream has no store info".to_owned(),
))
}
}
/// Switches the active data recording to write to stdout, unless stdout is a
/// terminal (in which case output is silently buffered instead).
#[pyfunction]
#[pyo3(signature = (default_blueprint = None, recording = None))]
fn stdout(
default_blueprint: Option<&PyMemorySinkStorage>,
recording: Option<&PyRecordingStream>,
py: Python<'_>,
) -> PyResult<()> {
// No-op when there is no active recording.
let Some(recording) = get_data_recording(recording) else {
return Ok(());
};
// Test harness override: a forced file sink takes precedence.
if re_sdk::forced_sink_path().is_some() {
re_log::debug!("Ignored call to `stdout()` since _RERUN_TEST_FORCE_SAVE is set");
return Ok(());
}
py.allow_threads(|| {
// Don't dump binary data onto an interactive terminal: fall back to an
// in-memory buffered sink instead.
let sink: Box<dyn re_sdk::sink::LogSink> = if std::io::stdout().is_terminal() {
re_log::debug!("Ignored call to stdout() because stdout is a terminal");
Box::new(re_sdk::sink::BufferedSink::new())
} else {
Box::new(
re_sdk::sink::FileSink::stdout()
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?,
)
};
// The blueprint must go through the sink *before* it is installed.
if let Some(default_blueprint) = default_blueprint {
send_mem_sink_as_default_blueprint(sink.as_ref(), default_blueprint);
}
flush_garbage_queue();
recording.set_sink(sink);
Ok(())
})
}
/// Switches the active data recording to an in-memory sink and returns a
/// handle to its storage, or `None` if there is no active recording.
#[pyfunction]
#[pyo3(signature = (recording = None))]
fn memory_recording(
recording: Option<&PyRecordingStream>,
py: Python<'_>,
) -> Option<PyMemorySinkStorage> {
    let rec = get_data_recording(recording)?;
    // Release the GIL: swapping sinks flushes, which may block.
    let inner = py.allow_threads(|| {
        let storage = rec.memory();
        flush_garbage_queue();
        storage
    });
    Some(PyMemorySinkStorage { inner })
}
/// Switches the active data recording to a callback sink: each batch of log
/// messages is encoded to bytes and passed to the given Python callable.
#[pyfunction]
#[pyo3(signature = (callback, recording = None))]
fn set_callback_sink(callback: PyObject, recording: Option<&PyRecordingStream>, py: Python<'_>) {
// No-op when there is no active recording.
let Some(rec) = get_data_recording(recording) else {
return;
};
// The sink calls back into Python, so it must re-acquire the GIL for each
// batch; encoding errors and callback exceptions are logged, not raised.
let callback = move |msgs: &[LogMsg]| {
Python::with_gil(|py| {
let data = encode_ref_as_bytes_local(msgs.iter().map(Ok)).ok_or_log_error()?;
let bytes = PyBytes::new_bound(py, &data);
callback.bind(py).call1((bytes,)).ok_or_log_error()?;
Some(())
});
};
// Release the GIL: swapping sinks flushes, which may block.
py.allow_threads(|| {
rec.set_sink(Box::new(CallbackSink::new(callback)));
flush_garbage_queue();
});
}
/// Switches the active data recording to a binary stream sink and returns a
/// handle that lets Python read the encoded bytes incrementally.
#[pyfunction]
#[pyo3(signature = (recording = None))]
fn binary_stream(
recording: Option<&PyRecordingStream>,
py: Python<'_>,
) -> PyResult<Option<PyBinarySinkStorage>> {
// No-op when there is no active recording.
let Some(recording) = get_data_recording(recording) else {
return Ok(None);
};
// Release the GIL: swapping sinks flushes, which may block.
let inner = py
.allow_threads(|| {
let storage = recording.binary_stream();
flush_garbage_queue();
storage
})
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
Ok(Some(PyBinarySinkStorage { inner }))
}
/// Python handle to the storage behind an in-memory sink (see
/// `memory_recording`).
#[pyclass(frozen)]
struct PyMemorySinkStorage {
inner: MemorySinkStorage,
}
#[pymethods]
impl PyMemorySinkStorage {
/// Encode this storage — optionally concatenated with another one — into a
/// single `rrd`-encoded byte buffer.
#[pyo3(signature = (concat=None))]
fn concat_as_bytes<'p>(
&self,
concat: Option<&Self>,
py: Python<'p>,
) -> PyResult<Bound<'p, PyBytes>> {
// Release the GIL: encoding can be expensive.
py.allow_threads(|| {
let concat_bytes = MemorySinkStorage::concat_memory_sinks_as_bytes(
[Some(&self.inner), concat.map(|c| &c.inner)]
.iter()
.filter_map(|s| *s)
.collect_vec()
.as_slice(),
);
flush_garbage_queue();
concat_bytes
})
.map(|bytes| PyBytes::new_bound(py, bytes.as_slice()))
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
}
/// Number of log messages currently buffered in this storage.
fn num_msgs(&self, py: Python<'_>) -> usize {
py.allow_threads(|| {
let num = self.inner.num_msgs();
flush_garbage_queue();
num
})
}
/// Encode and remove all buffered messages, returning them as bytes.
fn drain_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyBytes>> {
// Release the GIL: encoding can be expensive.
py.allow_threads(|| {
let bytes = self.inner.drain_as_bytes();
flush_garbage_queue();
bytes
})
.map(|bytes| PyBytes::new_bound(py, bytes.as_slice()))
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
}
}
/// Python handle to the storage behind a binary stream sink (see
/// `binary_stream`).
#[pyclass(frozen)]
struct PyBinarySinkStorage {
inner: BinaryStreamStorage,
}
#[pymethods]
impl PyBinarySinkStorage {
    /// Read the bytes accumulated in the binary stream so far, optionally
    /// flushing pending data first.
    #[pyo3(signature = (*, flush = true))]
    fn read<'p>(&self, flush: bool, py: Python<'p>) -> Bound<'p, PyBytes> {
        // Release the GIL: flushing and reading may block.
        let buffer = py.allow_threads(|| {
            if flush {
                self.inner.flush();
            }
            let data = self.inner.read();
            flush_garbage_queue();
            data
        });
        PyBytes::new_bound(py, buffer.as_slice())
    }

    /// Flush pending data into the stream without reading it.
    fn flush(&self, py: Python<'_>) {
        // Release the GIL: flushing may block.
        py.allow_threads(|| {
            self.inner.flush();
            flush_garbage_queue();
        });
    }
}
/// Switches the active data recording to a hosted web-viewer sink (WebSocket
/// server + static web viewer). Errors at runtime if the SDK was built
/// without the `web_viewer` feature.
#[allow(clippy::unnecessary_wraps)] #[pyfunction]
#[pyo3(signature = (open_browser, web_port, ws_port, server_memory_limit, default_blueprint = None, recording = None))]
fn serve_web(
open_browser: bool,
web_port: Option<u16>,
ws_port: Option<u16>,
server_memory_limit: String,
default_blueprint: Option<&PyMemorySinkStorage>,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
#[cfg(feature = "web_viewer")]
{
// No-op when there is no active recording.
let Some(recording) = get_data_recording(recording) else {
return Ok(());
};
// Test harness override: a forced file sink takes precedence.
if re_sdk::forced_sink_path().is_some() {
re_log::debug!("Ignored call to `serve()` since _RERUN_TEST_FORCE_SAVE is set");
return Ok(());
}
let server_memory_limit = re_memory::MemoryLimit::parse(&server_memory_limit)
.map_err(|err| PyRuntimeError::new_err(format!("Bad server_memory_limit: {err}:")))?;
let sink = re_sdk::web_viewer::new_sink(
open_browser,
"0.0.0.0",
web_port.map(WebViewerServerPort).unwrap_or_default(),
ws_port.map(RerunServerPort).unwrap_or_default(),
server_memory_limit,
)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
// The blueprint must go through the sink *before* it is installed.
if let Some(default_blueprint) = default_blueprint {
send_mem_sink_as_default_blueprint(sink.as_ref(), default_blueprint);
}
recording.set_sink(sink);
Ok(())
}
// Without the feature, consume the arguments and fail gracefully.
#[cfg(not(feature = "web_viewer"))]
{
_ = default_blueprint;
_ = recording;
_ = web_port;
_ = ws_port;
_ = open_browser;
_ = server_memory_limit;
Err(PyRuntimeError::new_err(
"The Rerun SDK was not compiled with the 'web_viewer' feature",
))
}
}
/// Disconnects the active data recording's sink, flushing pending data.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn disconnect(py: Python<'_>, recording: Option<&PyRecordingStream>) {
    if let Some(recording) = get_data_recording(recording) {
        // Release the GIL: disconnecting flushes, which may block.
        py.allow_threads(|| {
            recording.disconnect();
            flush_garbage_queue();
        });
    }
}
/// Flushes the active data recording, either blocking until done or
/// asynchronously, then drains the garbage queue.
#[pyfunction]
#[pyo3(signature = (blocking, recording=None))]
fn flush(py: Python<'_>, blocking: bool, recording: Option<&PyRecordingStream>) {
    if let Some(recording) = get_data_recording(recording) {
        // Release the GIL: a blocking flush can take a while.
        py.allow_threads(|| {
            if blocking {
                recording.flush_blocking();
            } else {
                recording.flush_async();
            }
            flush_garbage_queue();
        });
    }
}
/// Sets the current value of a sequence timeline on the active recording.
#[pyfunction]
#[pyo3(signature = (timeline, sequence, recording=None))]
fn set_time_sequence(timeline: &str, sequence: i64, recording: Option<&PyRecordingStream>) {
    if let Some(recording) = get_data_recording(recording) {
        recording.set_time_sequence(timeline, sequence);
    }
}
/// Sets the current value of a temporal timeline, in seconds, on the active
/// recording.
#[pyfunction]
#[pyo3(signature = (timeline, seconds, recording=None))]
fn set_time_seconds(timeline: &str, seconds: f64, recording: Option<&PyRecordingStream>) {
    if let Some(recording) = get_data_recording(recording) {
        recording.set_time_seconds(timeline, seconds);
    }
}
/// Sets the current value of a temporal timeline, in nanoseconds, on the
/// active recording.
#[pyfunction]
#[pyo3(signature = (timeline, nanos, recording=None))]
fn set_time_nanos(timeline: &str, nanos: i64, recording: Option<&PyRecordingStream>) {
    if let Some(recording) = get_data_recording(recording) {
        recording.set_time_nanos(timeline, nanos);
    }
}
/// Clears the given timeline from the active recording's current time point.
#[pyfunction]
#[pyo3(signature = (timeline, recording=None))]
fn disable_timeline(timeline: &str, recording: Option<&PyRecordingStream>) {
    if let Some(recording) = get_data_recording(recording) {
        recording.disable_timeline(timeline);
    }
}
/// Clears all timelines from the active recording's current time point.
#[pyfunction]
#[pyo3(signature = (recording=None))]
fn reset_time(recording: Option<&PyRecordingStream>) {
    if let Some(recording) = get_data_recording(recording) {
        recording.reset_time();
    }
}
/// Logs a single row of Arrow component data under `entity_path` on the
/// active recording. `static_` controls whether the row is timeless.
#[pyfunction]
#[pyo3(signature = (
entity_path,
components,
static_,
recording=None,
))]
fn log_arrow_msg(
py: Python<'_>,
entity_path: &str,
components: Bound<'_, PyDict>,
static_: bool,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
// No-op when there is no active recording.
let Some(recording) = get_data_recording(recording) else {
return Ok(());
};
let entity_path = EntityPath::parse_forgiving(entity_path);
// The current time is appended by the recording stream, so the row itself
// only carries an empty time point.
let row = crate::arrow::build_row_from_components(&components, &TimePoint::default())?;
recording.record_row(entity_path, row, !static_);
// Drop queued garbage without holding the GIL.
py.allow_threads(flush_garbage_queue);
Ok(())
}
/// Sends a pre-built Arrow chunk (timelines + components) for `entity_path`
/// directly to the active recording, bypassing the batcher's row path.
#[pyfunction]
#[pyo3(signature = (
entity_path,
timelines,
components,
recording=None,
))]
fn send_arrow_chunk(
py: Python<'_>,
entity_path: &str,
timelines: Bound<'_, PyDict>,
components: Bound<'_, PyDict>,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
// No-op when there is no active recording.
let Some(recording) = get_data_recording(recording) else {
return Ok(());
};
let entity_path = EntityPath::parse_forgiving(entity_path);
let chunk = crate::arrow::build_chunk_from_components(entity_path, &timelines, &components)?;
recording.send_chunk(chunk);
// Drop queued garbage without holding the GIL.
py.allow_threads(flush_garbage_queue);
Ok(())
}
/// Logs the contents of the file at `file_path` by loading it from disk
/// (thin wrapper around `log_file` with no in-memory contents).
#[pyfunction]
#[pyo3(signature = (
file_path,
entity_path_prefix = None,
static_ = false,
recording = None,
))]
fn log_file_from_path(
py: Python<'_>,
file_path: std::path::PathBuf,
entity_path_prefix: Option<String>,
static_: bool,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
log_file(py, file_path, None, entity_path_prefix, static_, recording)
}
/// Logs the given in-memory `file_contents`, using `file_path` only to infer
/// the format (thin wrapper around `log_file`).
#[pyfunction]
#[pyo3(signature = (
file_path,
file_contents,
entity_path_prefix = None,
static_ = false,
recording = None,
))]
fn log_file_from_contents(
py: Python<'_>,
file_path: std::path::PathBuf,
file_contents: &[u8],
entity_path_prefix: Option<String>,
static_: bool,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
log_file(
py,
file_path,
Some(file_contents),
entity_path_prefix,
static_,
recording,
)
}
/// Shared implementation for `log_file_from_path` / `log_file_from_contents`:
/// logs a data file to the active recording, either from in-memory bytes
/// (`file_contents = Some(..)`) or by reading `file_path` from disk.
fn log_file(
py: Python<'_>,
file_path: std::path::PathBuf,
file_contents: Option<&[u8]>,
entity_path_prefix: Option<String>,
static_: bool,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
// No-op when there is no active recording.
let Some(recording) = get_data_recording(recording) else {
return Ok(());
};
if let Some(contents) = file_contents {
recording
.log_file_from_contents(
file_path,
std::borrow::Cow::Borrowed(contents),
entity_path_prefix.map(Into::into),
static_,
)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
} else {
recording
.log_file_from_path(file_path, entity_path_prefix.map(Into::into), static_)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
}
// Drop queued garbage without holding the GIL.
py.allow_threads(flush_garbage_queue);
Ok(())
}
/// Drains the given in-memory blueprint and sends it through the active data
/// recording, together with an activation command carrying the given flags.
#[pyfunction]
#[pyo3(signature = (blueprint, make_active = false, make_default = true, recording = None))]
fn send_blueprint(
blueprint: &PyMemorySinkStorage,
make_active: bool,
make_default: bool,
recording: Option<&PyRecordingStream>,
) {
    let Some(recording) = get_data_recording(recording) else {
        return;
    };
    match blueprint.inner.store_id() {
        Some(blueprint_id) => {
            let activation_cmd = BlueprintActivationCommand {
                blueprint_id,
                make_active,
                make_default,
            };
            recording.send_blueprint(blueprint.inner.take(), activation_cmd);
        }
        None => {
            re_log::warn!("Provided `blueprint` has no store info, cannot send it.");
        }
    }
}
/// Returns the SDK's build-info string (version, git hash, build date, …).
#[pyfunction]
fn version() -> String {
re_build_info::build_info!().to_string()
}
/// Returns the URL of a web viewer matching this SDK build: the locally
/// hosted one if running, otherwise the versioned (or commit-pinned)
/// app.rerun.io URL.
#[pyfunction]
fn get_app_url() -> String {
// Prefer the locally hosted viewer, if one has been started.
#[cfg(feature = "web_viewer")]
if let Some(hosted_assets) = &*global_web_viewer_server() {
return hosted_assets.server_url();
}
let build_info = re_build_info::build_info!();
if build_info.is_final() {
format!("https://app.rerun.io/version/{}", build_info.version)
} else if let Some(short_git_hash) = build_info.git_hash.get(..7) {
// Dev builds: pin the viewer to the exact commit.
format!("https://app.rerun.io/commit/{short_git_hash}")
} else {
re_log::warn_once!(
"No valid git hash found in build info. Defaulting to app.rerun.io for app url."
);
"https://app.rerun.io".to_owned()
}
}
/// Starts (or replaces) the global web viewer asset server on `port`,
/// binding to all interfaces. Errors at runtime if the SDK was built without
/// the `web_viewer` feature.
#[pyfunction]
fn start_web_viewer_server(port: u16) -> PyResult<()> {
#[allow(clippy::unnecessary_wraps)]
#[cfg(feature = "web_viewer")]
{
// Replacing the handle drops (and thereby shuts down) any previous server.
let mut web_handle = global_web_viewer_server();
*web_handle = Some(
re_web_viewer_server::WebViewerServer::new("0.0.0.0", WebViewerServerPort(port))
.map_err(|err| {
PyRuntimeError::new_err(format!(
"Failed to start web viewer server on port {port}: {err}",
))
})?,
);
Ok(())
}
#[cfg(not(feature = "web_viewer"))]
{
_ = port;
Err(PyRuntimeError::new_err(
"The Rerun SDK was not compiled with the 'web_viewer' feature",
))
}
}
/// Escapes a single entity path part so it can be safely embedded in an
/// entity path string.
#[pyfunction]
fn escape_entity_path_part(part: &str) -> String {
EntityPathPart::from(part).escaped_string()
}
/// Builds an entity path string from individual (unescaped) parts.
#[pyfunction]
fn new_entity_path(parts: Vec<Bound<'_, pyo3::types::PyString>>) -> PyResult<String> {
    // Convert each Python string to a path part, failing fast on the first
    // string that cannot be decoded.
    let mut path_parts = Vec::with_capacity(parts.len());
    for part in &parts {
        let text = part.to_cow()?;
        path_parts.push(EntityPathPart::from(text.borrow()));
    }
    Ok(EntityPath::from(path_parts).to_string())
}
/// Captures the running interpreter's version as a `PythonVersion`, used to
/// tag recordings with their store source.
fn python_version(py: Python<'_>) -> re_log_types::PythonVersion {
    let info = py.version_info();
    // `suffix` is absent for final releases; record it as an empty string.
    let suffix = info.suffix.unwrap_or_default().to_owned();
    re_log_types::PythonVersion {
        major: info.major,
        minor: info.minor,
        patch: info.patch,
        suffix,
    }
}
/// Deterministically derives a store id from the Python multiprocessing
/// authkey and the application id, so that forked/spawned child processes of
/// the same program end up logging to the same recording.
fn default_store_id(py: Python<'_>, variant: StoreKind, application_id: &str) -> StoreId {
use rand::{Rng as _, SeedableRng as _};
use std::hash::{Hash as _, Hasher as _};
let seed = match authkey(py) {
Ok(seed) => seed,
Err(err) => {
re_log::error_once!("Failed to retrieve python authkey: {err}\nMultiprocessing will result in split recordings.");
// Fall back to a random seed: ids will no longer match across processes.
let bytes = rand::Rng::gen::<[u8; 8]>(&mut rand::thread_rng());
bytes.to_vec()
}
};
// Salt the hash so our derived ids can't collide with other derivations of
// the same authkey.
let salt: u64 = 0xab12_cd34_ef56_0178;
let mut hasher = std::collections::hash_map::DefaultHasher::default();
seed.hash(&mut hasher);
salt.hash(&mut hasher);
application_id.hash(&mut hasher);
// Seed an RNG from the hash and draw a UUID from it: same inputs, same id.
let mut rng = rand::rngs::StdRng::seed_from_u64(hasher.finish());
let uuid = uuid::Builder::from_random_bytes(rng.gen()).into_uuid();
StoreId::from_uuid(variant, uuid)
}
/// Fetches the `multiprocessing` authkey of the current Python process.
///
/// The authkey is inherited by child processes, which is what makes
/// `default_store_id` stable across a process tree.
fn authkey(py: Python<'_>) -> PyResult<Vec<u8>> {
let locals = PyDict::new_bound(py);
py.run_bound(
r#"
import multiprocessing
# authkey is the same for child and parent processes, so this is how we know we're the same
authkey = multiprocessing.current_process().authkey
"#,
None,
Some(&locals),
)
.and_then(|()| {
locals
.get_item("authkey")?
.ok_or_else(|| PyRuntimeError::new_err("authkey missing from expected locals"))
})
.and_then(|authkey| {
// `authkey` is an `AuthenticationString` (a `bytes` subclass).
authkey
.downcast()
.cloned()
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
})
.map(|authkey: Bound<'_, PyBytes>| authkey.as_bytes().to_vec())
}