pub mod stream;
use std::io::BufRead as _;
use std::io::Read;
use re_build_info::CrateVersion;
use re_log_types::LogMsg;
use crate::codec;
use crate::codec::file::decoder;
use crate::FileHeader;
use crate::MessageHeader;
use crate::VersionPolicy;
use crate::OLD_RRD_HEADERS;
use crate::{Compression, EncodingOptions, Serializer};
/// Checks the Rerun version stamped into a stream header against the local
/// crate version, applying `version_policy` when the two are incompatible.
///
/// Returns `Ok(())` when compatible, or when the policy is [`VersionPolicy::Warn`]
/// (a warning is logged once and decoding proceeds best-effort).
fn warn_on_version_mismatch(
    version_policy: VersionPolicy,
    encoded_version: [u8; 4],
) -> Result<(), DecodeError> {
    // Streams written before version stamping existed carry four zero bytes;
    // treat those as version 0.2.0.
    let encoded_version = match encoded_version {
        [0, 0, 0, 0] => CrateVersion::new(0, 2, 0),
        bytes => CrateVersion::from_bytes(bytes),
    };

    if encoded_version.is_compatible_with(CrateVersion::LOCAL) {
        return Ok(());
    }

    match version_policy {
        VersionPolicy::Warn => {
            re_log::warn_once!(
                "Found log stream with Rerun version {encoded_version}, \
                which is incompatible with the local Rerun version {}. \
                Loading will try to continue, but might fail in subtle ways.",
                CrateVersion::LOCAL,
            );
            Ok(())
        }
        VersionPolicy::Error => Err(DecodeError::IncompatibleRerunVersion {
            file: encoded_version,
            local: CrateVersion::LOCAL,
        }),
    }
}
/// Everything that can go wrong while decoding an `.rrd` stream.
#[derive(thiserror::Error, Debug)]
pub enum DecodeError {
    /// The stream's magic bytes did not match any known `.rrd` header.
    #[error("Not an .rrd file")]
    NotAnRrd,

    /// The magic bytes matched a pre-0.x header format we no longer support.
    #[error("Data was from an old, incompatible Rerun version")]
    OldRrdVersion,

    /// The stream's stamped version is incompatible and the caller asked for
    /// [`VersionPolicy::Error`].
    #[error("Data from Rerun version {file}, which is incompatible with the local Rerun version {local}")]
    IncompatibleRerunVersion {
        /// Version stamped into the stream header.
        file: CrateVersion,
        /// Version of the locally running Rerun crates.
        local: CrateVersion,
    },

    /// The encoding options in the file header could not be parsed.
    #[error("Failed to decode the options: {0}")]
    Options(#[from] crate::OptionsError),

    /// An underlying I/O read failed.
    #[error("Failed to read: {0}")]
    Read(#[from] std::io::Error),

    /// LZ4 block decompression failed (corrupt or truncated payload).
    #[error("lz4 error: {0}")]
    Lz4(#[from] lz4_flex::block::DecompressError),

    /// A Protobuf-encoded message could not be decoded.
    #[error("Protobuf error: {0}")]
    Protobuf(#[from] re_protos::external::prost::DecodeError),

    /// A decoded Protobuf message could not be converted to its native type.
    #[error("Could not convert type from protobuf: {0}")]
    TypeConversion(#[from] re_protos::TypeConversionError),

    /// A chunk payload failed to deserialize.
    #[error("Failed to read chunk: {0}")]
    Chunk(#[from] re_chunk::ChunkError),

    /// An Arrow-level error while reading payload data.
    #[error("Arrow error: {0}")]
    Arrow(#[from] arrow2::error::Error),

    /// A MsgPack-encoded message could not be deserialized.
    #[error("MsgPack error: {0}")]
    MsgPack(#[from] rmp_serde::decode::Error),

    /// A lower-level codec error (see [`codec::CodecError`]).
    #[error("Codec error: {0}")]
    Codec(#[from] codec::CodecError),
}
/// Decodes an entire in-memory `.rrd` byte buffer into its [`LogMsg`]s.
///
/// Stops and returns an error on the first message that fails to decode.
pub fn decode_bytes(
    version_policy: VersionPolicy,
    bytes: &[u8],
) -> Result<Vec<LogMsg>, DecodeError> {
    re_tracing::profile_function!();
    // Fallible collect short-circuits on the first `Err`, exactly like
    // pushing each message in a loop and returning early.
    Decoder::new(version_policy, std::io::Cursor::new(bytes))?.collect()
}
/// Parses and validates an `.rrd` file header from the start of `bytes`,
/// returning the stamped crate version and the stream's encoding options.
///
/// Rejects unknown magic bytes ([`DecodeError::NotAnRrd`]) and headers from
/// retired formats ([`DecodeError::OldRrdVersion`]); version mismatches are
/// handled according to `version_policy`.
pub fn read_options(
    version_policy: VersionPolicy,
    bytes: &[u8],
) -> Result<(CrateVersion, EncodingOptions), DecodeError> {
    let mut reader = std::io::Cursor::new(bytes);

    let FileHeader {
        magic,
        version,
        options,
    } = FileHeader::decode(&mut reader)?;

    if OLD_RRD_HEADERS.contains(&magic) {
        return Err(DecodeError::OldRrdVersion);
    }
    if &magic != crate::RRD_HEADER {
        return Err(DecodeError::NotAnRrd);
    }

    warn_on_version_mismatch(version_policy, version)?;

    // Exhaustive match on purpose: adding a new serializer variant must force
    // a compile error here so its handling gets considered.
    match options.serializer {
        Serializer::MsgPack | Serializer::Protobuf => {}
    }

    Ok((CrateVersion::from_bytes(version), options))
}
/// Internal reader wrapper: either the raw underlying reader or a buffered one.
///
/// Buffering is what enables peeking ahead for concatenated-stream detection
/// (see `Decoder::peek_file_header`); a `Raw` reader cannot peek.
enum Reader<R: std::io::Read> {
    Raw(R),
    Buffered(std::io::BufReader<R>),
}
impl<R: std::io::Read> std::io::Read for Reader<R> {
    /// Delegates the read to whichever underlying reader variant is active.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        match self {
            Self::Raw(read) => read.read(buf),
            Self::Buffered(read) => read.read(buf),
        }
    }
}
/// Streaming decoder for `.rrd` data: yields [`LogMsg`]s via its
/// [`Iterator`] implementation.
pub struct Decoder<R: std::io::Read> {
    /// Highest Rerun version seen in any file header consumed so far.
    version: CrateVersion,

    /// Compression and serializer settings from the (most recent) file header.
    options: EncodingOptions,

    /// The underlying byte stream.
    read: Reader<R>,

    /// Scratch buffer for uncompressed payloads, reused across messages.
    uncompressed: Vec<u8>,

    /// Scratch buffer for compressed payloads, reused across messages.
    compressed: Vec<u8>,

    /// Total number of bytes consumed from the stream so far.
    size_bytes: u64,
}
impl<R: std::io::Read> Decoder<R> {
pub fn new(version_policy: VersionPolicy, mut read: R) -> Result<Self, DecodeError> {
re_tracing::profile_function!();
let mut data = [0_u8; FileHeader::SIZE];
read.read_exact(&mut data).map_err(DecodeError::Read)?;
let (version, options) = read_options(version_policy, &data)?;
Ok(Self {
version,
options,
read: Reader::Raw(read),
uncompressed: vec![],
compressed: vec![],
size_bytes: FileHeader::SIZE as _,
})
}
pub fn new_concatenated(
version_policy: VersionPolicy,
mut read: std::io::BufReader<R>,
) -> Result<Self, DecodeError> {
re_tracing::profile_function!();
let mut data = [0_u8; FileHeader::SIZE];
read.read_exact(&mut data).map_err(DecodeError::Read)?;
let (version, options) = read_options(version_policy, &data)?;
Ok(Self {
version,
options,
read: Reader::Buffered(read),
uncompressed: vec![],
compressed: vec![],
size_bytes: FileHeader::SIZE as _,
})
}
#[inline]
pub fn version(&self) -> CrateVersion {
self.version
}
#[inline]
pub fn size_bytes(&self) -> u64 {
self.size_bytes
}
fn peek_file_header(&mut self) -> bool {
match &mut self.read {
Reader::Raw(_) => false,
Reader::Buffered(read) => {
if read.fill_buf().map_err(DecodeError::Read).is_err() {
return false;
}
let mut read = std::io::Cursor::new(read.buffer());
if FileHeader::decode(&mut read).is_err() {
return false;
}
true
}
}
}
}
impl<R: std::io::Read> Iterator for Decoder<R> {
    type Item = Result<LogMsg, DecodeError>;

    /// Decodes and returns the next [`LogMsg`], `None` at end of stream, or
    /// `Some(Err(..))` on any decode failure.
    fn next(&mut self) -> Option<Self::Item> {
        re_tracing::profile_function!();

        // If the upcoming bytes look like a new file header, this is a
        // concatenated stream: consume the header and adopt its options.
        // Only possible with a buffered reader (see `peek_file_header`).
        if self.peek_file_header() {
            let mut data = [0_u8; FileHeader::SIZE];
            if let Err(err) = self.read.read_exact(&mut data).map_err(DecodeError::Read) {
                return Some(Err(err));
            }
            // NOTE: version mismatches in follow-up headers always warn
            // (rather than honoring the policy the decoder was created with)
            // so a single bad segment doesn't abort mid-stream — TODO confirm
            // this is intentional.
            let (version, options) = match read_options(VersionPolicy::Warn, &data) {
                Ok(opts) => opts,
                Err(err) => return Some(Err(err)),
            };
            // Keep the highest version seen across all concatenated segments.
            self.version = CrateVersion::max(self.version, version);
            self.options = options;
            self.size_bytes += FileHeader::SIZE as u64;
        }

        // Decode one message with whichever serializer the current header declared.
        let msg = match self.options.serializer {
            Serializer::Protobuf => match decoder::decode(&mut self.read) {
                Ok((read_bytes, msg)) => {
                    self.size_bytes += read_bytes;
                    msg
                }
                Err(err) => return Some(Err(err)),
            },
            Serializer::MsgPack => {
                let header = match MessageHeader::decode(&mut self.read) {
                    Ok(header) => header,
                    Err(err) => match err {
                        // EOF exactly at a message boundary is a clean end of stream.
                        DecodeError::Read(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                            return None;
                        }
                        other => return Some(Err(other)),
                    },
                };
                self.size_bytes += MessageHeader::SIZE as u64;
                match header {
                    MessageHeader::Data {
                        compressed_len,
                        uncompressed_len,
                    } => {
                        let uncompressed_len = uncompressed_len as usize;
                        let compressed_len = compressed_len as usize;

                        // Grow (never shrink) the scratch buffer so it can be
                        // reused across messages without reallocating.
                        self.uncompressed
                            .resize(self.uncompressed.len().max(uncompressed_len), 0);

                        match self.options.compression {
                            Compression::Off => {
                                re_tracing::profile_scope!("read uncompressed");
                                if let Err(err) = self
                                    .read
                                    .read_exact(&mut self.uncompressed[..uncompressed_len])
                                {
                                    return Some(Err(DecodeError::Read(err)));
                                }
                                self.size_bytes += uncompressed_len as u64;
                            }
                            Compression::LZ4 => {
                                // Same grow-only reuse for the compressed scratch buffer.
                                self.compressed
                                    .resize(self.compressed.len().max(compressed_len), 0);
                                {
                                    re_tracing::profile_scope!("read compressed");
                                    if let Err(err) =
                                        self.read.read_exact(&mut self.compressed[..compressed_len])
                                    {
                                        return Some(Err(DecodeError::Read(err)));
                                    }
                                }
                                re_tracing::profile_scope!("lz4");
                                if let Err(err) = lz4_flex::block::decompress_into(
                                    &self.compressed[..compressed_len],
                                    &mut self.uncompressed[..uncompressed_len],
                                ) {
                                    return Some(Err(DecodeError::Lz4(err)));
                                }
                                // Only the on-wire (compressed) size counts as bytes read.
                                self.size_bytes += compressed_len as u64;
                            }
                        }

                        // Deserialize from the valid prefix of the scratch buffer only.
                        let data = &self.uncompressed[..uncompressed_len];
                        {
                            re_tracing::profile_scope!("MsgPack deser");
                            match rmp_serde::from_slice::<LogMsg>(data) {
                                Ok(msg) => Some(msg),
                                Err(err) => return Some(Err(err.into())),
                            }
                        }
                    }
                    // Explicit end-of-stream marker (MsgPack only).
                    MessageHeader::EndOfStream => None,
                }
            }
        };

        // `msg == None` means this segment ended; check whether another
        // concatenated segment follows before declaring the iterator done.
        // NOTE(review): this recursion is one frame per concatenated segment,
        // so pathological inputs with many segments deepen the stack.
        let Some(mut msg) = msg else {
            if self.peek_file_header() {
                re_log::debug!(
                    "Reached end of stream, but it seems we have a concatenated file, continuing"
                );
                return self.next();
            }

            re_log::debug!("Reached end of stream, iterator complete");
            return None;
        };

        // Stamp the stream's version onto store-info messages so downstream
        // consumers know which version produced the data.
        if let LogMsg::SetStoreInfo(msg) = &mut msg {
            msg.info.store_version = Some(self.version());
        }

        Some(Ok(msg))
    }
}
#[cfg(all(test, feature = "decoder", feature = "encoder"))]
mod tests {
    #![allow(clippy::unwrap_used)] use super::*;
    use re_build_info::CrateVersion;
    use re_chunk::RowId;
    use re_log_types::{
        ApplicationId, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
    };

    /// Builds a small, representative set of messages covering all three
    /// [`LogMsg`] variants: store info, Arrow chunk data, and a blueprint
    /// activation command.
    fn fake_log_messages() -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Blueprint);
        vec![
            LogMsg::SetStoreInfo(SetStoreInfo {
                row_id: *RowId::new(),
                info: StoreInfo {
                    application_id: ApplicationId("test".to_owned()),
                    store_id: store_id.clone(),
                    cloned_from: None,
                    is_official_example: true,
                    started: Time::now(),
                    store_source: StoreSource::RustSdk {
                        rustc_version: String::new(),
                        llvm_version: String::new(),
                    },
                    store_version: Some(CrateVersion::LOCAL),
                },
            }),
            LogMsg::ArrowMsg(
                store_id.clone(),
                re_chunk::Chunk::builder("test_entity".into())
                    .with_archetype(
                        re_chunk::RowId::new(),
                        re_log_types::TimePoint::default().with(
                            re_log_types::Timeline::new_sequence("blueprint"),
                            re_log_types::TimeInt::from_milliseconds(re_log_types::NonMinI64::MIN),
                        ),
                        &re_types::blueprint::archetypes::Background::new(
                            re_types::blueprint::components::BackgroundKind::SolidColor,
                        )
                        .with_color([255, 0, 0]),
                    )
                    .build()
                    .unwrap()
                    .to_arrow_msg()
                    .unwrap(),
            ),
            LogMsg::BlueprintActivationCommand(re_log_types::BlueprintActivationCommand {
                blueprint_id: store_id,
                make_active: true,
                make_default: true,
            }),
        ]
    }

    /// Strips `ARROW:extension*` metadata from Arrow message schemas so
    /// round-trip comparisons don't fail on metadata the encode/decode cycle
    /// is allowed to drop — TODO confirm which codec path drops it.
    fn clear_arrow_extension_metadata(messages: &mut Vec<LogMsg>) {
        for msg in messages {
            if let LogMsg::ArrowMsg(_, arrow_msg) = msg {
                for field in &mut arrow_msg.schema.fields {
                    field
                        .metadata
                        .retain(|k, _| !k.starts_with("ARROW:extension"));
                }
            }
        }
    }

    /// Round-trips the fake messages through every (compression, serializer)
    /// combination and checks the decoded output matches the input.
    #[test]
    fn test_encode_decode() {
        let rrd_version = CrateVersion::LOCAL;

        let messages = fake_log_messages();

        // All four supported encoding configurations.
        let options = [
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::MsgPack,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::MsgPack,
            },
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::Protobuf,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::Protobuf,
            },
        ];

        for options in options {
            let mut file = vec![];
            crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut file)
                .unwrap();

            let mut decoded_messages = Decoder::new(VersionPolicy::Error, &mut file.as_slice())
                .unwrap()
                .collect::<Result<Vec<LogMsg>, DecodeError>>()
                .unwrap();

            clear_arrow_extension_metadata(&mut decoded_messages);

            assert_eq!(messages, decoded_messages);
        }
    }

    /// Encodes two complete streams back-to-back into one buffer and checks
    /// that `Decoder::new_concatenated` yields both streams' messages.
    #[test]
    fn test_concatenated_streams() {
        let options = [
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::MsgPack,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::MsgPack,
            },
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::Protobuf,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::Protobuf,
            },
        ];

        for options in options {
            println!("{options:?}");

            let mut data = vec![];

            // First complete stream (header + messages + end marker).
            let messages = fake_log_messages();
            let writer = std::io::Cursor::new(&mut data);
            let mut encoder1 =
                crate::encoder::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
            for message in &messages {
                encoder1.append(message).unwrap();
            }
            encoder1.finish().unwrap();

            let written = data.len() as u64;

            // Second complete stream appended immediately after the first.
            let mut writer = std::io::Cursor::new(&mut data);
            writer.set_position(written);
            let mut encoder2 =
                crate::encoder::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
            for message in &messages {
                encoder2.append(message).unwrap();
            }
            encoder2.finish().unwrap();

            // A buffered reader is required for concatenated-stream detection.
            let decoder = Decoder::new_concatenated(
                VersionPolicy::Error,
                std::io::BufReader::new(data.as_slice()),
            )
            .unwrap();

            let mut decoded_messages = decoder.into_iter().collect::<Result<Vec<_>, _>>().unwrap();

            clear_arrow_extension_metadata(&mut decoded_messages);

            assert_eq!([messages.clone(), messages].concat(), decoded_messages);
        }
    }
}