1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
use std::path::PathBuf;
use anyhow::Context;
use crossbeam::channel;
use itertools::Itertools as _;
use re_chunk::external::crossbeam;
use re_log_types::LogMsg;
// ---
/// Asynchronously decodes potentially multiplexed RRD streams from the given `paths`, or from
/// standard input if `paths` is empty.
///
/// Decoding happens on a dedicated background thread named `rerun-rrd-in`; this function itself
/// returns immediately with 2 channels:
/// * The first channel yields every successfully decoded [`LogMsg`], interleaved with any
///   errors faced during processing (unopenable file, undecodable stream or message, …).
///   It is bounded, so the background thread waits for the consumer when it is full.
/// * The second channel fires exactly once, after all processing is done, with the total number
///   of bytes processed as reported by the decoders. It fires even if every input failed to
///   decode, in which case the reported total only covers what could be decoded (possibly `0`).
///
/// This function is best-effort: it will try to make progress even in the face of errors.
/// A file that cannot be opened or decoded is reported on the first channel and then skipped.
/// It is up to the user to decide whether and when to stop.
///
/// This function is capable of decoding multiple independent recordings from a single stream.
pub fn read_rrd_streams_from_file_or_stdin(
    version_policy: re_log_encoding::decoder::VersionPolicy,
    paths: &[String],
) -> (
    channel::Receiver<anyhow::Result<LogMsg>>,
    channel::Receiver<u64>,
) {
    let path_to_input_rrds = paths.iter().map(PathBuf::from).collect_vec();

    // TODO(cmc): might want to make this configurable at some point.
    let (tx, rx) = crossbeam::channel::bounded(100);
    let (tx_size_bytes, rx_size_bytes) = crossbeam::channel::bounded(1);

    // The spawn result is deliberately discarded: if the thread cannot be started, the closure
    // (and the senders it owns) is dropped and the receivers simply observe a disconnect.
    _ = std::thread::Builder::new()
        .name("rerun-rrd-in".to_owned())
        .spawn(move || {
            let mut size_bytes = 0;

            if path_to_input_rrds.is_empty() {
                // stdin

                let stdin = std::io::BufReader::new(std::io::stdin().lock());

                // NOTE: no early return on failure here. The error is forwarded and control
                // falls through to the final `tx_size_bytes.send`, so the byte-count channel
                // always fires, exactly like in the file(s) branch below.
                match re_log_encoding::decoder::Decoder::new_concatenated(version_policy, stdin)
                    .context("couldn't decode stdin stream -- skipping")
                {
                    Ok(mut decoder) => {
                        for res in &mut decoder {
                            let res = res.context("couldn't decode message from stdin -- skipping");
                            // Send failures (receiver dropped) are ignored on purpose.
                            tx.send(res).ok();
                        }

                        size_bytes += decoder.size_bytes();
                    }
                    Err(err) => {
                        tx.send(Err(err)).ok();
                    }
                }
            } else {
                // file(s)

                for rrd_path in path_to_input_rrds {
                    let rrd_file = match std::fs::File::open(&rrd_path)
                        .with_context(|| format!("couldn't open {rrd_path:?} -- skipping"))
                    {
                        Ok(file) => file,
                        Err(err) => {
                            // Report the failure and move on to the next file.
                            tx.send(Err(err)).ok();
                            continue;
                        }
                    };

                    let mut decoder =
                        match re_log_encoding::decoder::Decoder::new(version_policy, rrd_file)
                            .with_context(|| format!("couldn't decode {rrd_path:?} -- skipping"))
                        {
                            Ok(decoder) => decoder,
                            Err(err) => {
                                tx.send(Err(err)).ok();
                                continue;
                            }
                        };

                    // Iterate by `&mut` so the decoder is still available afterwards to report
                    // how many bytes it went through.
                    for res in &mut decoder {
                        let res = res.context("decode rrd message").with_context(|| {
                            format!("couldn't decode message {rrd_path:?} -- skipping")
                        });
                        tx.send(res).ok();
                    }

                    size_bytes += decoder.size_bytes();
                }
            }

            // Single exit point: always report the total, whatever happened above.
            tx_size_bytes.send(size_bytes).ok();
        });

    (rx, rx_size_bytes)
}