1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
use std::borrow::Cow;

use ahash::{HashMap, HashMapExt};
use re_log_types::{FileSource, LogMsg};
use re_smart_channel::Sender;

use crate::{DataLoader, DataLoaderError, LoadedData, RrdLoader};

// ---

/// Loads the file at `path` with every available [`crate::DataLoader`], forwarding whatever they
/// produce to `tx`.
///
/// The same `path` may be handled by several loaders at once.
///
/// Only two things are checked synchronously: that `path` exists, and that it can be loaded at
/// all. Everything that goes wrong afterwards happens asynchronously and is handled by the
/// [`crate::DataLoader`]s themselves (i.e. it gets logged).
///
/// # Errors
///
/// - A [`std::io::ErrorKind::NotFound`] I/O error (converted into a [`DataLoaderError`]) if
///   `path` does not exist.
/// - [`DataLoaderError::Incompatible`] if no loader is able to handle `path`.
#[cfg(not(target_arch = "wasm32"))]
pub fn load_from_path(
    settings: &crate::DataLoaderSettings,
    file_source: FileSource,
    path: &std::path::Path,
    // NOTE: This channel must be unbounded since we serialize all operations when running on wasm.
    tx: &Sender<LogMsg>,
) -> Result<(), DataLoaderError> {
    re_tracing::profile_function!(path.to_string_lossy());

    if !path.exists() {
        let not_found = std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("path does not exist: {path:?}"),
        );
        return Err(not_found.into());
    }

    re_log::info!("Loading {path:?}…");

    let rx_loaded = load(settings, path, None)?;
    send(settings.clone(), file_source, rx_loaded, tx);

    Ok(())
}

/// Loads the given `contents` using all [`crate::DataLoader`]s available, forwarding the
/// resulting [`LogMsg`]s to `tx`.
///
/// A single file might be handled by more than one loader.
///
/// Synchronously checks that the file can be loaded. Beyond that, all errors are asynchronous
/// and handled directly by the [`crate::DataLoader`]s themselves (i.e. they're logged).
///
/// `filepath` is only used for informational purposes (logging, and selecting candidate
/// loaders, e.g. by file extension): no data is ever read from the filesystem.
///
/// # Errors
///
/// Returns [`DataLoaderError::Incompatible`] if not a single loader is able to handle the data.
pub fn load_from_file_contents(
    settings: &crate::DataLoaderSettings,
    file_source: FileSource,
    filepath: &std::path::Path,
    contents: std::borrow::Cow<'_, [u8]>,
    // NOTE: This channel must be unbounded since we serialize all operations when running on wasm.
    tx: &Sender<LogMsg>,
) -> Result<(), DataLoaderError> {
    re_tracing::profile_function!(filepath.to_string_lossy());

    re_log::info!("Loading {filepath:?}…");

    let rx_loader = load(settings, filepath, Some(contents))?;

    send(settings.clone(), file_source, rx_loader, tx);

    Ok(())
}

// ---

/// Builds the [`LogMsg::SetStoreInfo`] message that announces `store_id` as a store whose data
/// was loaded from a file.
///
/// The resulting [`re_log_types::StoreInfo`] records `file_source` as its origin, is stamped
/// with the current time, and carries the local crate version.
pub(crate) fn prepare_store_info(
    application_id: re_log_types::ApplicationId,
    store_id: &re_log_types::StoreId,
    file_source: FileSource,
) -> LogMsg {
    use re_log_types::{SetStoreInfo, StoreInfo, StoreSource};

    re_tracing::profile_function!();

    let row_id = *re_chunk::RowId::new();

    let info = StoreInfo {
        application_id,
        store_id: store_id.clone(),
        cloned_from: None,
        is_official_example: false,
        started: re_log_types::Time::now(),
        store_source: StoreSource::File { file_source },
        store_version: Some(re_build_info::CrateVersion::LOCAL),
    };

    LogMsg::SetStoreInfo(SetStoreInfo { row_id, info })
}

/// Loads the data at `path` using all available [`crate::DataLoader`]s.
///
/// On success, returns a channel with all the [`LoadedData`]:
/// - On native, this is filled asynchronously from other threads.
/// - On wasm, this is pre-filled synchronously.
///
/// There is only one way this function can return an error: not a single [`crate::DataLoader`]
/// (whether it is builtin, custom or external) was capable of loading the data, in which case
/// [`DataLoaderError::Incompatible`] will be returned.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn load(
    settings: &crate::DataLoaderSettings,
    path: &std::path::Path,
    contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<std::sync::mpsc::Receiver<LoadedData>, DataLoaderError> {
    re_tracing::profile_function!(path.display().to_string());

    // On native we run loaders in parallel so this needs to become static.
    let contents: Option<std::sync::Arc<std::borrow::Cow<'static, [u8]>>> =
        contents.map(|contents| std::sync::Arc::new(Cow::Owned(contents.into_owned())));

    let rx_loader = {
        let (tx_loader, rx_loader) = std::sync::mpsc::channel();

        let any_compatible_loader = {
            #[derive(PartialEq, Eq)]
            struct CompatibleLoaderFound;
            let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();

            // Prevent passing RRD paths to external (and other) loaders.
            // See <https://github.com/rerun-io/rerun/issues/6530>.
            let loaders = {
                use crate::DataLoader as _;
                use rayon::iter::Either;

                let extension = crate::extension(path);
                if crate::SUPPORTED_RERUN_EXTENSIONS.contains(&extension.as_str()) {
                    Either::Left(
                        crate::iter_loaders()
                            .filter(|loader| loader.name() == crate::RrdLoader.name()),
                    )
                } else {
                    Either::Right(crate::iter_loaders())
                }
            };

            for loader in loaders {
                let loader = std::sync::Arc::clone(&loader);

                let settings = settings.clone();
                let path = path.to_owned();
                let contents = contents.clone(); // arc

                let tx_loader = tx_loader.clone();
                let tx_feedback = tx_feedback.clone();

                rayon::spawn(move || {
                    re_tracing::profile_scope!("inner", loader.name());

                    if let Some(contents) = contents.as_deref() {
                        let contents = Cow::Borrowed(contents.as_ref());

                        if let Err(err) = loader.load_from_file_contents(
                            &settings,
                            path.clone(),
                            contents,
                            tx_loader,
                        ) {
                            if err.is_incompatible() {
                                return;
                            }
                            re_log::error!(?path, loader = loader.name(), %err, "Failed to load data");
                        }
                    } else if let Err(err) =
                        loader.load_from_path(&settings, path.clone(), tx_loader)
                    {
                        if err.is_incompatible() {
                            return;
                        }
                        re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
                    }

                    re_log::debug!(loader = loader.name(), ?path, "compatible loader found");
                    tx_feedback.send(CompatibleLoaderFound).ok();
                });
            }

            re_tracing::profile_wait!("compatible_loader");

            drop(tx_feedback);

            rx_feedback.recv() == Ok(CompatibleLoaderFound)
        };

        // Implicitly closing `tx_loader`!

        any_compatible_loader.then_some(rx_loader)
    };

    if let Some(rx_loader) = rx_loader {
        Ok(rx_loader)
    } else {
        Err(DataLoaderError::Incompatible(path.to_owned()))
    }
}

/// Loads the data at `path` by offering `contents` to every available [`crate::DataLoader`],
/// one after the other, on the calling thread.
///
/// On success, returns a channel that has already been pre-filled with all the [`LoadedData`].
///
/// `path` itself is never read: loaders only ever see `contents`, and when `contents` is `None`
/// no loader is considered compatible.
///
/// # Errors
///
/// Returns [`DataLoaderError::Incompatible`] if, and only if, not a single [`crate::DataLoader`]
/// (whether builtin, custom or external) was capable of loading the data. Failures of compatible
/// loaders are logged rather than returned.
#[cfg(target_arch = "wasm32")]
// `contents` is only ever borrowed here, but it is taken by value so that the signature matches
// the native implementation.
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn load(
    settings: &crate::DataLoaderSettings,
    path: &std::path::Path,
    contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<std::sync::mpsc::Receiver<LoadedData>, DataLoaderError> {
    re_tracing::profile_function!(path.display().to_string());

    let (tx_loader, rx_loader) = std::sync::mpsc::channel();

    // Every loader gets a chance, even after a compatible one has been found.
    let mut any_compatible_loader = false;
    for loader in crate::iter_loaders() {
        let Some(contents) = contents.as_deref() else {
            continue;
        };

        let settings = settings.clone();
        let tx_loader = tx_loader.clone();
        let path = path.to_owned();

        let result = loader.load_from_file_contents(
            &settings,
            path.clone(),
            Cow::Borrowed(contents),
            tx_loader,
        );

        match result {
            Err(err) if err.is_incompatible() => continue,
            Err(err) => {
                re_log::error!(?path, loader = loader.name(), %err, "Failed to load data from file");
            }
            Ok(_) => {}
        }

        any_compatible_loader = true;
    }

    // Close our own sender; the loaders were handed their own clones.
    drop(tx_loader);

    any_compatible_loader
        .then_some(rx_loader)
        .ok_or_else(|| DataLoaderError::Incompatible(path.to_owned()))
}

/// Forwards the data in `rx_loader` to `tx`, taking care of necessary conversions, if any.
///
/// Runs asynchronously from another thread on native, synchronously on wasm.
///
/// Every [`LoadedData`] is converted into a [`LogMsg`]; conversion failures are logged and
/// skipped. Once `rx_loader` is exhausted, an extra [`LogMsg::SetStoreInfo`] (built by
/// [`prepare_store_info`]) is sent for each store seen along the way when either:
/// - `settings.force_store_info` is set and none of the store's messages came from the
///   [`RrdLoader`] (RRDs and RBLs always carry their own, correct, store info), or
/// - the data carried no store info of its own for that store, and the store isn't
///   `settings.opened_store_id` (i.e. a pre-existing recording).
///
/// Finally, `tx` is told to quit.
pub(crate) fn send(
    settings: crate::DataLoaderSettings,
    file_source: FileSource,
    rx_loader: std::sync::mpsc::Receiver<LoadedData>,
    tx: &Sender<LogMsg>,
) {
    /// What has been observed about a given store while forwarding its messages.
    #[derive(Default, Debug)]
    struct Tracked {
        /// Set if any message for this store was produced by the [`RrdLoader`].
        is_rrd_or_rbl: bool,

        /// Set if the loaded data already contained a `SetStoreInfo` for this store.
        already_has_store_info: bool,
    }

    let tx = tx.clone();

    spawn(move || {
        // Profile from within the task itself, so that the forwarding work is what gets measured
        // (rather than just the setup that happens before `spawn`).
        re_tracing::profile_function!();

        // Loop-invariant: compute it once rather than once per message.
        let rrd_loader_name = RrdLoader::name(&RrdLoader);
        let mut store_info_tracker: HashMap<re_log_types::StoreId, Tracked> = HashMap::new();

        // ## Ignoring channel errors
        //
        // Not our problem whether or not the other end has hung up, but we still want to
        // poll the channel in any case so as to make sure that the data producer
        // doesn't get stuck.
        for data in rx_loader {
            let data_loader_name = data.data_loader_name().clone();
            let msg = match data.into_log_msg() {
                Ok(msg) => msg,
                Err(err) => {
                    re_log::error!(%err, "Couldn't serialize component data");
                    continue;
                }
            };

            let store_info = match &msg {
                LogMsg::SetStoreInfo(set_store_info) => {
                    Some((set_store_info.info.store_id.clone(), true))
                }
                LogMsg::ArrowMsg(store_id, _arrow_msg) => Some((store_id.clone(), false)),
                LogMsg::BlueprintActivationCommand(_) => None,
            };

            if let Some((store_id, store_info_created)) = store_info {
                let tracked = store_info_tracker.entry(store_id).or_default();
                // Sticky: once any message for a store came from the RRD loader, the store is
                // treated as an RRD/RBL, regardless of the order in which messages arrive.
                tracked.is_rrd_or_rbl |= *data_loader_name == rrd_loader_name;
                tracked.already_has_store_info |= store_info_created;
            }

            tx.send(msg).ok();
        }

        for (store_id, tracked) in store_info_tracker {
            let is_a_preexisting_recording =
                Some(&store_id) == settings.opened_store_id.as_ref();

            // Never try to send custom store info for RRDs and RBLs, they always have their own, and
            // it's always right.
            let should_force_store_info = settings.force_store_info && !tracked.is_rrd_or_rbl;

            let should_send_new_store_info = should_force_store_info
                || (!tracked.already_has_store_info && !is_a_preexisting_recording);

            if should_send_new_store_info {
                let app_id = settings
                    .opened_application_id
                    .clone()
                    .unwrap_or_else(|| uuid::Uuid::new_v4().to_string().into());
                let store_info = prepare_store_info(app_id, &store_id, file_source.clone());
                tx.send(store_info).ok();
            }
        }

        tx.quit(None).ok();
    });
}

// Task spawning is platform-specific:
// - On native, work is handed to the `rayon` thread pool, so it runs in parallel.
// - On wasm, work runs immediately on the calling thread. Everything is serialized, which only
//   works because the data-loading channels are unbounded.

/// Schedules `f` to run in the background on the `rayon` thread pool.
#[cfg(not(target_arch = "wasm32"))]
fn spawn<F: FnOnce() + Send + 'static>(f: F) {
    rayon::spawn(f)
}

/// Runs `f` to completion, synchronously, on the current thread.
#[cfg(target_arch = "wasm32")]
fn spawn<F: FnOnce()>(f: F) {
    f()
}