1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
use std::cell::RefCell;
use std::ops::ControlFlow;
use std::sync::Arc;

use re_log::ResultExt as _;
use re_log_types::LogMsg;

/// Stream an rrd file from a HTTP server.
///
/// If `follow_if_http` is `true`, and the url is an HTTP source, the viewer will open the stream
/// in `Following` mode rather than `Playing` mode.
///
/// `on_msg` can be used to wake up the UI thread on Wasm.
pub fn stream_rrd_from_http_to_channel(
    url: String,
    follow: bool,
    on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> re_smart_channel::Receiver<LogMsg> {
    let (tx, rx) = re_smart_channel::smart_channel(
        re_smart_channel::SmartMessageSource::RrdHttpStream { url: url.clone() },
        re_smart_channel::SmartChannelSource::RrdHttpStream {
            url: url.clone(),
            follow,
        },
    );
    stream_rrd_from_http(
        url.clone(),
        Arc::new(move |msg| {
            if let Some(on_msg) = &on_msg {
                on_msg();
            }
            match msg {
                HttpMessage::LogMsg(msg) => {
                    if tx.send(msg).is_ok() {
                        ControlFlow::Continue(())
                    } else {
                        re_log::info_once!("Closing connection to {url}");
                        ControlFlow::Break(())
                    }
                }
                HttpMessage::Success => {
                    tx.quit(None).warn_on_err_once("Failed to send quit marker");
                    ControlFlow::Break(())
                }
                HttpMessage::Failure(err) => {
                    tx.quit(Some(err))
                        .warn_on_err_once("Failed to send quit marker");
                    ControlFlow::Break(())
                }
            }
        }),
    );
    rx
}

/// An intermediate message when decoding an rrd file fetched over HTTP.
pub enum HttpMessage {
    /// The next [`LogMsg`] in the decoding stream.
    LogMsg(LogMsg),

    /// Everything has been successfully decoded. End of stream.
    Success,

    /// Something went wrong. End of stream.
    Failure(Box<dyn std::error::Error + Send + Sync>),
}

pub type HttpMessageCallback = dyn Fn(HttpMessage) -> ControlFlow<()> + Send + Sync;

pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
    re_log::debug!("Downloading .rrd file from {url:?}…");

    ehttp::streaming::fetch(ehttp::Request::get(&url), {
        let version_policy = crate::decoder::VersionPolicy::Warn;
        let decoder = RefCell::new(StreamDecoder::new(version_policy));
        move |part| match part {
            Ok(part) => match part {
                ehttp::streaming::Part::Response(ehttp::PartialResponse {
                    ok,
                    status,
                    status_text,
                    ..
                }) => {
                    if ok {
                        re_log::debug!("Decoding .rrd file from {url:?}…");
                        ControlFlow::Continue(())
                    } else {
                        on_msg(HttpMessage::Failure(
                            format!("Failed to fetch .rrd file from {url}: {status} {status_text}")
                                .into(),
                        ))
                    }
                }
                ehttp::streaming::Part::Chunk(chunk) => {
                    if chunk.is_empty() {
                        re_log::debug!("Finished decoding .rrd file from {url:?}…");
                        return on_msg(HttpMessage::Success);
                    }

                    re_tracing::profile_scope!("decoding_rrd_stream");
                    decoder.borrow_mut().push_chunk(chunk);
                    loop {
                        match decoder.borrow_mut().try_read() {
                            Ok(message) => match message {
                                Some(message) => {
                                    // only return if the callback asks us to
                                    if on_msg(HttpMessage::LogMsg(message)).is_break() {
                                        return ControlFlow::Break(());
                                    }
                                }
                                None => return ControlFlow::Continue(()),
                            },
                            Err(err) => {
                                return on_msg(HttpMessage::Failure(
                                    format!("Failed to fetch .rrd file from {url}: {err}").into(),
                                ))
                            }
                        }
                    }
                }
            },
            Err(err) => on_msg(HttpMessage::Failure(
                format!("Failed to fetch .rrd file from {url}: {err}").into(),
            )),
        }
    });
}

#[cfg(target_arch = "wasm32")]
// TODO(#6330): remove unwrap()
#[allow(clippy::unwrap_used)]
mod web_event_listener {
    use super::HttpMessageCallback;
    use js_sys::Uint8Array;
    use std::sync::Arc;
    use wasm_bindgen::{closure::Closure, JsCast, JsValue};
    use web_sys::MessageEvent;

    /// Install an event-listener on `window` which will decode the incoming event as an rrd
    ///
    /// From javascript you can send an rrd using:
    /// ``` ignore
    /// var rrd = new Uint8Array(…); // Get an RRD from somewhere
    /// window.postMessage(rrd, "*")
    /// ```
    pub fn stream_rrd_from_event_listener(on_msg: Arc<HttpMessageCallback>) {
        let window = web_sys::window().expect("no global `window` exists");
        let closure = Closure::wrap(Box::new({
            move |event: JsValue| match event.dyn_into::<MessageEvent>() {
                Ok(message_event) => {
                    let uint8_array = Uint8Array::new(&message_event.data());
                    let result: Vec<u8> = uint8_array.to_vec();
                    crate::stream_rrd_from_http::decode_rrd(result, Arc::clone(&on_msg));
                }
                Err(js_val) => {
                    re_log::error!("Incoming event was not a MessageEvent. {:?}", js_val);
                }
            }
        }) as Box<dyn FnMut(_)>);
        window
            .add_event_listener_with_callback("message", closure.as_ref().unchecked_ref())
            .unwrap();
        closure.forget();
    }
}

#[cfg(target_arch = "wasm32")]
pub use web_event_listener::stream_rrd_from_event_listener;

#[cfg(target_arch = "wasm32")]
// TODO(#6330): remove unwrap()
#[allow(clippy::unwrap_used)]
pub mod web_decode {
    use super::{HttpMessage, HttpMessageCallback};
    use std::sync::Arc;

    pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<HttpMessageCallback>) {
        wasm_bindgen_futures::spawn_local(decode_rrd_async(rrd_bytes, on_msg));
    }

    /// Decodes the file in chunks, with an yield between each chunk.
    ///
    /// This is cooperative multi-tasking.
    async fn decode_rrd_async(rrd_bytes: Vec<u8>, on_msg: Arc<HttpMessageCallback>) {
        let mut last_yield = web_time::Instant::now();

        let version_policy = crate::decoder::VersionPolicy::Warn;
        match crate::decoder::Decoder::new(version_policy, rrd_bytes.as_slice()) {
            Ok(decoder) => {
                for msg in decoder {
                    match msg {
                        Ok(msg) => {
                            on_msg(HttpMessage::LogMsg(msg));
                        }
                        Err(err) => {
                            re_log::warn_once!("Failed to decode message: {err}");
                        }
                    }

                    on_msg(HttpMessage::Success);

                    if last_yield.elapsed() > web_time::Duration::from_millis(10) {
                        // yield to the ui task
                        yield_().await;
                        last_yield = web_time::Instant::now();
                    }
                }
            }
            Err(err) => {
                on_msg(HttpMessage::Failure(
                    format!("Failed to decode .rrd: {err}").into(),
                ));
            }
        }
    }

    // Yield to other tasks
    async fn yield_() {
        // TODO(emilk): create a better async yield function. See https://github.com/rustwasm/wasm-bindgen/issues/3359
        sleep_ms(1).await;
    }

    // Hack to get async sleep on wasm
    async fn sleep_ms(millis: i32) {
        let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
            web_sys::window()
                .unwrap()
                .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, millis)
                .expect("Failed to call set_timeout");
        };
        let p = js_sys::Promise::new(&mut cb);
        wasm_bindgen_futures::JsFuture::from(p).await.unwrap();
    }
}

#[cfg(target_arch = "wasm32")]
use web_decode::decode_rrd;

use crate::decoder::stream::StreamDecoder;