1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use std::path::PathBuf;

use anyhow::Context;
use crossbeam::channel;
use itertools::Itertools as _;

use re_chunk::external::crossbeam;
use re_log_types::LogMsg;

// ---

/// Asynchronously decodes potentially multiplexed RRD streams from the given `paths`, or standard
/// input if none are specified.
///
/// This function returns 2 channels:
/// * The first channel contains both the successfully decoded data, if any, as well as any
///   errors faced during processing.
/// * The second channel, which will fire only once, after all processing is done, indicates the
///   total number of bytes processed.
///
/// This function is best-effort: it will try to make progress even in the face of errors.
/// It is up to the user to decide whether and when to stop.
///
/// This function is capable of decoding multiple independent recordings from a single stream.
pub fn read_rrd_streams_from_file_or_stdin(
    version_policy: re_log_encoding::decoder::VersionPolicy,
    paths: &[String],
) -> (
    channel::Receiver<anyhow::Result<LogMsg>>,
    channel::Receiver<u64>,
) {
    let path_to_input_rrds = paths.iter().map(PathBuf::from).collect_vec();

    // TODO(cmc): might want to make this configurable at some point.
    let (tx, rx) = crossbeam::channel::bounded(100);
    let (tx_size_bytes, rx_size_bytes) = crossbeam::channel::bounded(1);

    _ = std::thread::Builder::new()
        .name("rerun-rrd-in".to_owned())
        .spawn(move || {
            let mut size_bytes = 0;

            if path_to_input_rrds.is_empty() {
                // stdin

                let stdin = std::io::BufReader::new(std::io::stdin().lock());

                let mut decoder = match re_log_encoding::decoder::Decoder::new_concatenated(
                    version_policy,
                    stdin,
                )
                .context("couldn't decode stdin stream -- skipping")
                {
                    Ok(decoder) => decoder,
                    Err(err) => {
                        tx.send(Err(err)).ok();
                        return;
                    }
                };

                for res in &mut decoder {
                    let res = res.context("couldn't decode message from stdin -- skipping");
                    tx.send(res).ok();
                }

                size_bytes += decoder.size_bytes();
            } else {
                // file(s)

                for rrd_path in path_to_input_rrds {
                    let rrd_file = match std::fs::File::open(&rrd_path)
                        .with_context(|| format!("couldn't open {rrd_path:?} -- skipping"))
                    {
                        Ok(file) => file,
                        Err(err) => {
                            tx.send(Err(err)).ok();
                            continue;
                        }
                    };

                    let mut decoder =
                        match re_log_encoding::decoder::Decoder::new(version_policy, rrd_file)
                            .with_context(|| format!("couldn't decode {rrd_path:?} -- skipping"))
                        {
                            Ok(decoder) => decoder,
                            Err(err) => {
                                tx.send(Err(err)).ok();
                                continue;
                            }
                        };

                    for res in &mut decoder {
                        let res = res.context("decode rrd message").with_context(|| {
                            format!("couldn't decode message {rrd_path:?} -- skipping")
                        });
                        tx.send(res).ok();
                    }

                    size_bytes += decoder.size_bytes();
                }
            }

            tx_size_bytes.send(size_bytes).ok();
        });

    (rx, rx_size_bytes)
}