1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
impl From<re_protos::log_msg::v0::Compression> for crate::Compression {
    fn from(value: re_protos::log_msg::v0::Compression) -> Self {
        match value {
            re_protos::log_msg::v0::Compression::None => Self::Off,
            re_protos::log_msg::v0::Compression::Lz4 => Self::LZ4,
        }
    }
}

impl From<crate::Compression> for re_protos::log_msg::v0::Compression {
    fn from(value: crate::Compression) -> Self {
        match value {
            crate::Compression::Off => Self::None,
            crate::Compression::LZ4 => Self::Lz4,
        }
    }
}

#[cfg(feature = "decoder")]
pub fn log_msg_from_proto(
    message: re_protos::log_msg::v0::LogMsg,
) -> Result<re_log_types::LogMsg, crate::decoder::DecodeError> {
    use crate::codec::{arrow::decode_arrow, CodecError};
    use crate::decoder::DecodeError;
    use re_protos::{
        log_msg::v0::{log_msg::Msg, Encoding},
        missing_field,
    };

    match message.msg {
        Some(Msg::SetStoreInfo(set_store_info)) => Ok(re_log_types::LogMsg::SetStoreInfo(
            set_store_info.try_into()?,
        )),
        Some(Msg::ArrowMsg(arrow_msg)) => {
            if arrow_msg.encoding() != Encoding::ArrowIpc {
                return Err(DecodeError::Codec(CodecError::UnsupportedEncoding));
            }

            let batch = decode_arrow(
                &arrow_msg.payload,
                arrow_msg.uncompressed_size as usize,
                arrow_msg.compression().into(),
            )?;

            let store_id: re_log_types::StoreId = arrow_msg
                .store_id
                .ok_or_else(|| missing_field!(re_protos::log_msg::v0::ArrowMsg, "store_id"))?
                .into();

            let chunk = re_chunk::Chunk::from_record_batch(batch)?;

            Ok(re_log_types::LogMsg::ArrowMsg(
                store_id,
                chunk.to_arrow_msg()?,
            ))
        }
        Some(Msg::BlueprintActivationCommand(blueprint_activation_command)) => {
            Ok(re_log_types::LogMsg::BlueprintActivationCommand(
                blueprint_activation_command.try_into()?,
            ))
        }
        None => Err(missing_field!(re_protos::log_msg::v0::LogMsg, "msg").into()),
    }
}

#[cfg(feature = "encoder")]
pub fn log_msg_to_proto(
    message: re_log_types::LogMsg,
    compression: crate::Compression,
) -> Result<re_protos::log_msg::v0::LogMsg, crate::encoder::EncodeError> {
    use crate::codec::arrow::encode_arrow;
    use re_protos::log_msg::v0::{
        ArrowMsg, BlueprintActivationCommand, LogMsg as ProtoLogMsg, SetStoreInfo,
    };

    let proto_msg = match message {
        re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
            let set_store_info: SetStoreInfo = set_store_info.into();
            ProtoLogMsg {
                msg: Some(re_protos::log_msg::v0::log_msg::Msg::SetStoreInfo(
                    set_store_info,
                )),
            }
        }
        re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => {
            let payload = encode_arrow(&arrow_msg.batch, compression)?;
            let arrow_msg = ArrowMsg {
                store_id: Some(store_id.into()),
                compression: match compression {
                    crate::Compression::Off => re_protos::log_msg::v0::Compression::None as i32,
                    crate::Compression::LZ4 => re_protos::log_msg::v0::Compression::Lz4 as i32,
                },
                uncompressed_size: payload.uncompressed_size as i32,
                encoding: re_protos::log_msg::v0::Encoding::ArrowIpc as i32,
                payload: payload.data,
            };
            ProtoLogMsg {
                msg: Some(re_protos::log_msg::v0::log_msg::Msg::ArrowMsg(arrow_msg)),
            }
        }
        re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
            let blueprint_activation_command: BlueprintActivationCommand =
                blueprint_activation_command.into();
            ProtoLogMsg {
                msg: Some(
                    re_protos::log_msg::v0::log_msg::Msg::BlueprintActivationCommand(
                        blueprint_activation_command,
                    ),
                ),
            }
        }
    };

    Ok(proto_msg)
}