1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
use super::{MessageHeader, MessageKind};
use crate::codec::arrow::decode_arrow;
use crate::codec::CodecError;
use crate::decoder::DecodeError;
use re_log_types::LogMsg;
use re_protos::missing_field;

pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option<LogMsg>), DecodeError> {
    use re_protos::external::prost::Message;
    use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo};

    let mut read_bytes = 0u64;
    let header = MessageHeader::decode(data)?;
    read_bytes += std::mem::size_of::<MessageHeader>() as u64 + header.len;

    let mut buf = vec![0; header.len as usize];
    data.read_exact(&mut buf[..])?;

    let msg = match header.kind {
        MessageKind::SetStoreInfo => {
            let set_store_info = SetStoreInfo::decode(&buf[..])?;
            Some(LogMsg::SetStoreInfo(set_store_info.try_into()?))
        }
        MessageKind::ArrowMsg => {
            let arrow_msg = ArrowMsg::decode(&buf[..])?;
            if arrow_msg.encoding() != Encoding::ArrowIpc {
                return Err(DecodeError::Codec(CodecError::UnsupportedEncoding));
            }

            let (schema, chunk) = decode_arrow(
                &arrow_msg.payload,
                arrow_msg.uncompressed_size as usize,
                arrow_msg.compression().into(),
            )?;

            let store_id: re_log_types::StoreId = arrow_msg
                .store_id
                .ok_or_else(|| missing_field!(re_protos::log_msg::v0::ArrowMsg, "store_id"))?
                .into();

            let chunk = re_chunk::Chunk::from_transport(&re_chunk::TransportChunk {
                schema,
                data: chunk,
            })?;

            Some(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?))
        }
        MessageKind::BlueprintActivationCommand => {
            let blueprint_activation_command = BlueprintActivationCommand::decode(&buf[..])?;
            Some(LogMsg::BlueprintActivationCommand(
                blueprint_activation_command.try_into()?,
            ))
        }
        MessageKind::End => None,
    };

    Ok((read_bytes, msg))
}