1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use std::fmt::Display;

use re_log_encoding::protobuf_conversions::log_msg_from_proto;
use re_log_types::LogMsg;
use re_protos::sdk_comms::v0::message_proxy_client::MessageProxyClient;
use re_protos::sdk_comms::v0::Empty;
use tokio_stream::StreamExt;
use url::Url;

use crate::StreamError;
use crate::TonicStatusError;

pub fn stream(
    url: String,
    on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> Result<re_smart_channel::Receiver<LogMsg>, InvalidMessageProxyAddress> {
    re_log::debug!("Loading {url} via gRPC…");

    let parsed_url = MessageProxyAddress::parse(&url)?;

    let (tx, rx) = re_smart_channel::smart_channel(
        re_smart_channel::SmartMessageSource::MessageProxy { url: url.clone() },
        re_smart_channel::SmartChannelSource::MessageProxy { url: url.clone() },
    );

    crate::spawn_future(async move {
        if let Err(err) = stream_async(parsed_url, tx, on_msg).await {
            re_log::error!(
                "Error while streaming from {url}: {}",
                re_error::format_ref(&err)
            );
        }
    });

    Ok(rx)
}

struct MessageProxyAddress(String);

impl MessageProxyAddress {
    fn parse(url: &str) -> Result<Self, InvalidMessageProxyAddress> {
        let Some(url) = url.strip_prefix("temp") else {
            let scheme = url.split_once("://").map(|(a, _)| a).ok_or("unknown");
            return Err(InvalidMessageProxyAddress {
                url: url.to_owned(),
                msg: format!(
                    "Invalid scheme {scheme:?}, expected {:?}",
                    // TODO(#8761): URL prefix
                    "temp"
                ),
            });
        };
        let url = format!("http{url}");

        let _ = Url::parse(&url).map_err(|err| InvalidMessageProxyAddress {
            url: url.clone(),
            msg: err.to_string(),
        })?;

        Ok(Self(url))
    }

    fn to_http(&self) -> String {
        self.0.clone()
    }
}

impl Display for MessageProxyAddress {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Display::fmt(&self.0, f)
    }
}

#[derive(Debug, thiserror::Error)]
#[error("invalid message proxy address {url:?}: {msg}")]
pub struct InvalidMessageProxyAddress {
    pub url: String,
    pub msg: String,
}

async fn stream_async(
    url: MessageProxyAddress,
    tx: re_smart_channel::Sender<LogMsg>,
    on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> Result<(), StreamError> {
    let mut client = {
        let url = url.to_http();

        #[cfg(target_arch = "wasm32")]
        let tonic_client = {
            tonic_web_wasm_client::Client::new_with_options(
                url,
                tonic_web_wasm_client::options::FetchOptions::new(),
            )
        };

        #[cfg(not(target_arch = "wasm32"))]
        let tonic_client = { tonic::transport::Endpoint::new(url)?.connect().await? };

        // TODO(#8411): figure out the right size for this
        MessageProxyClient::new(tonic_client).max_decoding_message_size(usize::MAX)
    };

    re_log::debug!("Streaming messages from gRPC endpoint {url}");

    let mut stream = client
        .read_messages(Empty {})
        .await
        .map_err(TonicStatusError)?
        .into_inner();

    loop {
        match stream.try_next().await {
            Ok(Some(msg)) => {
                let msg = log_msg_from_proto(msg)?;
                if tx.send(msg).is_err() {
                    re_log::debug!("gRPC stream smart channel closed");
                    break;
                }
                if let Some(on_msg) = &on_msg {
                    on_msg();
                }
            }

            // Stream closed
            Ok(None) => {
                re_log::debug!("gRPC stream disconnected");
                break;
            }

            Err(_) => {
                re_log::debug!("gRPC stream timed out");
                break;
            }
        }
    }

    Ok(())
}