1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
use std::borrow::Cow;
use ahash::{HashMap, HashMapExt};
use re_log_types::{FileSource, LogMsg};
use re_smart_channel::Sender;
use crate::{DataLoader, DataLoaderError, LoadedData, RrdLoader};
// ---
/// Loads the data found at `path`, offering it to every available [`crate::DataLoader`].
///
/// More than one loader may end up handling the same `path`.
///
/// Only two things are checked synchronously: that `path` exists, and that at least one
/// loader is able to handle it. Every later failure is asynchronous and is dealt with by the
/// [`crate::DataLoader`]s themselves (i.e. it is logged).
///
/// # Errors
///
/// Returns an I/O "not found" error (converted into [`DataLoaderError`]) if `path` does not
/// exist, or whatever error [`load`] reports.
#[cfg(not(target_arch = "wasm32"))]
pub fn load_from_path(
    settings: &crate::DataLoaderSettings,
    file_source: FileSource,
    path: &std::path::Path,
    // NOTE: This channel must be unbounded since we serialize all operations when running on wasm.
    tx: &Sender<LogMsg>,
) -> Result<(), DataLoaderError> {
    re_tracing::profile_function!(path.to_string_lossy());

    // Bail out early, before any loader gets involved.
    if !path.exists() {
        let not_found = std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("path does not exist: {path:?}"),
        );
        return Err(not_found.into());
    }

    re_log::info!("Loading {path:?}…");

    // On success, hand the loaders' output channel over to `send`, which forwards it to `tx`.
    load(settings, path, None)
        .map(|rx_loader| send(settings.clone(), file_source, rx_loader, tx))
}
/// Loads the in-memory `contents`, offering them to every available [`crate::DataLoader`].
///
/// More than one loader may end up handling the same file.
///
/// The only synchronous check is that at least one loader is able to handle the data. Every
/// later failure is asynchronous and is dealt with by the [`crate::DataLoader`]s themselves
/// (i.e. it is logged).
///
/// `filepath` is only used for informational purposes, no data is ever read from the filesystem.
///
/// # Errors
///
/// Returns whatever error [`load`] reports.
pub fn load_from_file_contents(
    settings: &crate::DataLoaderSettings,
    file_source: FileSource,
    filepath: &std::path::Path,
    contents: std::borrow::Cow<'_, [u8]>,
    // NOTE: This channel must be unbounded since we serialize all operations when running on wasm.
    tx: &Sender<LogMsg>,
) -> Result<(), DataLoaderError> {
    re_tracing::profile_function!(filepath.to_string_lossy());

    re_log::info!("Loading {filepath:?}…");

    // On success, hand the loaders' output channel over to `send`, which forwards it to `tx`.
    load(settings, filepath, Some(contents))
        .map(|rx_loader| send(settings.clone(), file_source, rx_loader, tx))
}
// ---
/// Builds a [`LogMsg::SetStoreInfo`] message carrying a fresh [`re_log_types::StoreInfo`].
///
/// The resulting store info:
/// - uses the given `application_id` and a clone of `store_id`,
/// - is marked as coming from a file (`file_source`),
/// - is stamped with the current time and the local crate version,
/// - is neither cloned from another store nor an official example.
pub(crate) fn prepare_store_info(
    application_id: re_log_types::ApplicationId,
    store_id: &re_log_types::StoreId,
    file_source: FileSource,
) -> LogMsg {
    re_tracing::profile_function!();

    let row_id = *re_chunk::RowId::new();

    let info = re_log_types::StoreInfo {
        application_id,
        store_id: store_id.clone(),
        cloned_from: None,
        is_official_example: false,
        started: re_log_types::Time::now(),
        store_source: re_log_types::StoreSource::File { file_source },
        store_version: Some(re_build_info::CrateVersion::LOCAL),
    };

    LogMsg::SetStoreInfo(re_log_types::SetStoreInfo { row_id, info })
}
/// Runs every applicable [`crate::DataLoader`] against `path` (native implementation).
///
/// When `contents` is provided the loaders are fed those bytes; otherwise they are asked to
/// load from `path` directly. Each loader runs in its own `rayon` task.
///
/// On success, returns the receiving end of a channel that the loader tasks fill with
/// [`LoadedData`] asynchronously.
///
/// This function blocks until either one loader has reported that it is compatible, or all
/// of them have given up.
///
/// # Errors
///
/// There is exactly one failure mode: no [`crate::DataLoader`] (builtin, custom or external)
/// was capable of loading the data, in which case [`DataLoaderError::Incompatible`] is
/// returned.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn load(
    settings: &crate::DataLoaderSettings,
    path: &std::path::Path,
    contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<std::sync::mpsc::Receiver<LoadedData>, DataLoaderError> {
    use crate::DataLoader as _;
    use rayon::iter::Either;

    /// Sent by a loader task once it is known not to be incompatible with the data.
    struct CompatibleLoaderFound;

    re_tracing::profile_function!(path.display().to_string());

    // The loaders run in parallel on other threads, so the contents have to be owned
    // (`'static`) and shareable.
    let contents: Option<std::sync::Arc<Cow<'static, [u8]>>> =
        contents.map(|contents| std::sync::Arc::new(Cow::Owned(contents.into_owned())));

    let (tx_loader, rx_loader) = std::sync::mpsc::channel();
    let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();

    // Prevent passing RRD paths to external (and other) loaders.
    // See <https://github.com/rerun-io/rerun/issues/6530>.
    let extension = crate::extension(path);
    let loaders = if crate::SUPPORTED_RERUN_EXTENSIONS.contains(&extension.as_str()) {
        Either::Left(
            crate::iter_loaders().filter(|loader| loader.name() == crate::RrdLoader.name()),
        )
    } else {
        Either::Right(crate::iter_loaders())
    };

    for loader in loaders {
        // Everything the task needs must be owned by it.
        let loader = std::sync::Arc::clone(&loader);
        let settings = settings.clone();
        let path = path.to_owned();
        let contents = contents.clone(); // arc
        let tx_loader = tx_loader.clone();
        let tx_feedback = tx_feedback.clone();

        rayon::spawn(move || {
            re_tracing::profile_scope!("inner", loader.name());

            // An incompatible loader leaves silently: returning drops its `tx_feedback`
            // without sending anything. Any other error is logged, but the loader still
            // counts as compatible.
            match contents.as_deref() {
                Some(contents) => {
                    let contents = Cow::Borrowed(contents.as_ref());
                    let outcome = loader.load_from_file_contents(
                        &settings,
                        path.clone(),
                        contents,
                        tx_loader,
                    );
                    if let Err(err) = outcome {
                        if err.is_incompatible() {
                            return;
                        }
                        re_log::error!(?path, loader = loader.name(), %err, "Failed to load data");
                    }
                }
                None => {
                    let outcome = loader.load_from_path(&settings, path.clone(), tx_loader);
                    if let Err(err) = outcome {
                        if err.is_incompatible() {
                            return;
                        }
                        re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
                    }
                }
            }

            re_log::debug!(loader = loader.name(), ?path, "compatible loader found");
            tx_feedback.send(CompatibleLoaderFound).ok();
        });
    }

    let any_compatible_loader = {
        re_tracing::profile_wait!("compatible_loader");

        // Only the tasks' clones must keep the feedback channel alive: `recv` then returns
        // on the first report, or fails once every task has dropped its sender.
        drop(tx_feedback);
        rx_feedback.recv().is_ok()
    };

    // Close our own handle to the data channel; the loader tasks hold their own clones.
    drop(tx_loader);

    if any_compatible_loader {
        Ok(rx_loader)
    } else {
        Err(DataLoaderError::Incompatible(path.to_owned()))
    }
}
/// Runs every available [`crate::DataLoader`] against the given `contents` (wasm
/// implementation).
///
/// Loaders are invoked one after the other on the calling thread. Only in-memory `contents`
/// are ever handed to them: when `contents` is `None`, no loader is invoked at all.
///
/// On success, returns the receiving end of a channel that has already been filled
/// synchronously with all the [`LoadedData`].
///
/// # Errors
///
/// There is exactly one failure mode: no [`crate::DataLoader`] (builtin, custom or external)
/// was capable of loading the data, in which case [`DataLoaderError::Incompatible`] is
/// returned.
#[cfg(target_arch = "wasm32")]
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn load(
    settings: &crate::DataLoaderSettings,
    path: &std::path::Path,
    contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<std::sync::mpsc::Receiver<LoadedData>, DataLoaderError> {
    re_tracing::profile_function!(path.display().to_string());

    let (tx_loader, rx_loader) = std::sync::mpsc::channel();

    // Every loader gets its turn; we merely remember whether any of them was compatible.
    let mut any_compatible_loader = false;
    for loader in crate::iter_loaders() {
        if let Some(contents) = contents.as_deref() {
            let path = path.to_owned();

            let outcome = loader.load_from_file_contents(
                settings,
                path.clone(),
                Cow::Borrowed(contents),
                tx_loader.clone(),
            );

            if let Err(err) = outcome {
                if err.is_incompatible() {
                    continue;
                }
                // Any other error is logged, but the loader still counts as compatible.
                re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
            }

            any_compatible_loader = true;
        }
    }

    // Close our own handle to the data channel now that every loader has run.
    drop(tx_loader);

    any_compatible_loader
        .then_some(rx_loader)
        .ok_or_else(|| DataLoaderError::Incompatible(path.to_owned()))
}
/// Drains `rx_loader` and forwards everything to `tx`, converting to [`LogMsg`] as needed.
///
/// While forwarding, this keeps track of which stores were seen and whether a
/// [`LogMsg::SetStoreInfo`] was received for each of them. Once `rx_loader` is exhausted, a
/// store info is generated (see [`prepare_store_info`]) for every store that needs one, and
/// `tx` is then told to quit.
///
/// Runs asynchronously from another thread on native, synchronously on wasm.
pub(crate) fn send(
    settings: crate::DataLoaderSettings,
    file_source: FileSource,
    rx_loader: std::sync::mpsc::Receiver<LoadedData>,
    tx: &Sender<LogMsg>,
) {
    spawn({
        re_tracing::profile_function!();

        /// What we learned about a single store while forwarding its messages.
        #[derive(Default, Debug)]
        struct Tracked {
            /// Whether the last message seen for this store came from the RRD loader.
            is_rrd_or_rbl: bool,

            /// Whether a `SetStoreInfo` message was seen for this store.
            already_has_store_info: bool,
        }

        let tx = tx.clone();
        let mut store_info_tracker: HashMap<re_log_types::StoreId, Tracked> = HashMap::new();

        move || {
            // ## Ignoring channel errors
            //
            // Whether the receiving end of `tx` is still around is none of our business. We
            // keep draining `rx_loader` regardless, so that the data producer never gets
            // stuck.
            for data in rx_loader {
                // Grab the name first: the conversion below consumes `data`.
                let data_loader_name = data.data_loader_name().clone();

                let msg = match data.into_log_msg() {
                    Ok(msg) => msg,
                    Err(err) => {
                        re_log::error!(%err, "Couldn't serialize component data");
                        continue;
                    }
                };

                // Which store does this message belong to, and does it carry store info?
                let seen_store = match &msg {
                    LogMsg::SetStoreInfo(set_store_info) => {
                        Some((&set_store_info.info.store_id, true))
                    }
                    LogMsg::ArrowMsg(store_id, _arrow_msg) => Some((store_id, false)),
                    LogMsg::BlueprintActivationCommand(_) => None,
                };

                if let Some((store_id, carries_store_info)) = seen_store {
                    let tracked = store_info_tracker.entry(store_id.clone()).or_default();
                    tracked.is_rrd_or_rbl = *data_loader_name == RrdLoader::name(&RrdLoader);
                    tracked.already_has_store_info |= carries_store_info;
                }

                tx.send(msg).ok();
            }

            for (store_id, tracked) in store_info_tracker {
                let is_a_preexisting_recording =
                    settings.opened_store_id.as_ref() == Some(&store_id);

                // Never try to send custom store info for RRDs and RBLs, they always have
                // their own, and it's always right.
                let should_force_store_info =
                    !tracked.is_rrd_or_rbl && settings.force_store_info;

                let is_missing_store_info =
                    !(tracked.already_has_store_info || is_a_preexisting_recording);

                if !(should_force_store_info || is_missing_store_info) {
                    continue;
                }

                // Fall back to a random application id if none was specified.
                let app_id = settings
                    .opened_application_id
                    .clone()
                    .unwrap_or_else(|| uuid::Uuid::new_v4().to_string().into());

                tx.send(prepare_store_info(app_id, &store_id, file_source.clone()))
                    .ok();
            }

            tx.quit(None).ok();
        }
    });
}
// NOTE:
// - On native, work is parallelized by handing it over to `rayon`.
// - On wasm, everything is serialized, which works because the data-loading channels are
//   unbounded.

/// Native implementation: runs `f` as a `rayon` task.
#[cfg(not(target_arch = "wasm32"))]
fn spawn<F: FnOnce() + Send + 'static>(f: F) {
    rayon::spawn(f);
}
/// Wasm implementation: runs `f` right away, on the calling thread.
#[cfg(target_arch = "wasm32")]
fn spawn<F: FnOnce()>(f: F) {
    f();
}