rerun/commands/
stdio.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::path::PathBuf;

use anyhow::Context as _;
use crossbeam::channel;
use itertools::Itertools as _;

use re_chunk::external::crossbeam;
use re_log_types::LogMsg;

// ---

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum InputSource {
    Stdin,
    File(PathBuf),
}

impl std::fmt::Display for InputSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Stdin => write!(f, "stdin"),
            Self::File(path) => write!(f, "{path:?}"),
        }
    }
}

/// Asynchronously decodes potentially multiplexed RRD streams from the given `paths`, or standard
/// input if none are specified.
///
/// This function returns 2 channels:
/// * The first channel contains both the successfully decoded data, if any, as well as any
///   errors faced during processing.
/// * The second channel, which will fire only once, after all processing is done, indicates the
///   total number of bytes processed.
///
/// This function is best-effort: it will try to make progress even in the face of errors.
/// It is up to the user to decide whether and when to stop.
///
/// This function is capable of decoding multiple independent recordings from a single stream.
pub fn read_rrd_streams_from_file_or_stdin(
    paths: &[String],
) -> (
    channel::Receiver<(InputSource, anyhow::Result<LogMsg>)>,
    channel::Receiver<u64>,
) {
    let path_to_input_rrds = paths
        .iter()
        .filter(|s| !s.is_empty()) // Avoid a problem with `pixi run check-backwards-compatibility`
        .map(PathBuf::from)
        .collect_vec();

    // TODO(cmc): might want to make this configurable at some point.
    let (tx, rx) = crossbeam::channel::bounded(100);
    let (tx_size_bytes, rx_size_bytes) = crossbeam::channel::bounded(1);

    _ = std::thread::Builder::new()
        .name("rerun-rrd-in".to_owned())
        .spawn(move || {
            let mut size_bytes = 0;

            if path_to_input_rrds.is_empty() {
                // stdin

                let stdin = std::io::BufReader::new(std::io::stdin().lock());

                let mut decoder = match re_log_encoding::decoder::Decoder::new_concatenated(stdin)
                    .context("couldn't decode stdin stream -- skipping")
                {
                    Ok(decoder) => decoder,
                    Err(err) => {
                        tx.send((InputSource::Stdin, Err(err))).ok();
                        return;
                    }
                };

                for res in &mut decoder {
                    let res = res.context("couldn't decode message from stdin -- skipping");
                    tx.send((InputSource::Stdin, res)).ok();
                }

                size_bytes += decoder.size_bytes();
            } else {
                // file(s)

                for rrd_path in path_to_input_rrds {
                    let rrd_file = match std::fs::File::open(&rrd_path)
                        .with_context(|| format!("couldn't open {rrd_path:?} -- skipping"))
                    {
                        Ok(file) => file,
                        Err(err) => {
                            tx.send((InputSource::File(rrd_path.clone()), Err(err)))
                                .ok();
                            continue;
                        }
                    };

                    let mut decoder = match re_log_encoding::decoder::Decoder::new(rrd_file)
                        .with_context(|| format!("couldn't decode {rrd_path:?} -- skipping"))
                    {
                        Ok(decoder) => decoder,
                        Err(err) => {
                            tx.send((InputSource::File(rrd_path.clone()), Err(err)))
                                .ok();
                            continue;
                        }
                    };

                    for res in &mut decoder {
                        let res = res.context("decode rrd message").with_context(|| {
                            format!("couldn't decode message {rrd_path:?} -- skipping")
                        });
                        tx.send((InputSource::File(rrd_path.clone()), res)).ok();
                    }

                    size_bytes += decoder.size_bytes();
                }
            }

            tx_size_bytes.send(size_bytes).ok();
        });

    (rx, rx_size_bytes)
}