1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
use std::{fmt, net::SocketAddr, thread::JoinHandle};

use crossbeam::channel::{select, Receiver, Sender};

use re_log_types::LogMsg;

#[derive(Debug, PartialEq, Eq)]
struct FlushedMsg;

/// Sent to prematurely quit (before flushing).
#[derive(Debug, PartialEq, Eq)]
struct QuitMsg;

/// Sent to prematurely quit (before flushing).
#[derive(Debug, PartialEq, Eq)]
enum InterruptMsg {
    /// Switch to a mode where we drop messages if disconnected.
    ///
    /// Sending this before a flush ensures we won't get stuck trying to send
    /// messages to a closed endpoint, but we will still send all messages to an open endpoint.
    DropIfDisconnected,

    /// Quite immediately, dropping any unsent message.
    Quit,
}

enum MsgMsg {
    LogMsg(LogMsg),
    Flush,
}

enum PacketMsg {
    Packet(Vec<u8>),
    Flush,
}

/// Send [`LogMsg`]es to a server over TCP.
///
/// The messages are encoded and sent on separate threads
/// so that calling [`Client::send`] is non-blocking.
pub struct Client {
    msg_tx: Sender<MsgMsg>,
    flushed_rx: Receiver<FlushedMsg>,
    encode_quit_tx: Sender<QuitMsg>,
    send_quit_tx: Sender<InterruptMsg>,
    encode_join: Option<JoinHandle<()>>,
    send_join: Option<JoinHandle<()>>,

    /// Only used for diagnostics, not for communication after `new()`.
    addr: SocketAddr,
}

impl Client {
    /// Connect via TCP to this log server.
    ///
    /// `flush_timeout` is the minimum time the `TcpClient` will wait during a
    /// flush before potentially dropping data. Note: Passing `None` here can
    /// cause a call to `flush` to block indefinitely if a connection cannot be
    /// established.
    pub fn new(addr: SocketAddr, flush_timeout: Option<std::time::Duration>) -> Self {
        re_log::debug!("Connecting to remote {addr}…");

        // TODO(emilk): keep track of how much memory is in each pipe
        // and apply back-pressure to not use too much RAM.
        let (msg_tx, msg_rx) = crossbeam::channel::unbounded();
        let (packet_tx, packet_rx) = crossbeam::channel::unbounded();
        let (flushed_tx, flushed_rx) = crossbeam::channel::unbounded();
        let (encode_quit_tx, encode_quit_rx) = crossbeam::channel::unbounded();
        let (send_quit_tx, send_quit_rx) = crossbeam::channel::unbounded();

        // We don't compress the stream because we assume the SDK
        // and server are on the same machine and compression
        // can be expensive, see https://github.com/rerun-io/rerun/issues/2216
        let encoding_options = re_log_encoding::EncodingOptions::UNCOMPRESSED;

        let encode_join = std::thread::Builder::new()
            .name("msg_encoder".into())
            .spawn(move || {
                msg_encode(encoding_options, &msg_rx, &encode_quit_rx, &packet_tx);
            })
            .expect("Failed to spawn thread");

        let send_join = std::thread::Builder::new()
            .name("tcp_sender".into())
            .spawn(move || {
                tcp_sender(addr, flush_timeout, &packet_rx, &send_quit_rx, &flushed_tx);
            })
            .expect("Failed to spawn thread");

        Self {
            msg_tx,
            flushed_rx,
            encode_quit_tx,
            send_quit_tx,
            encode_join: Some(encode_join),
            send_join: Some(send_join),
            addr,
        }
    }

    pub fn send(&self, log_msg: LogMsg) {
        self.send_msg_msg(MsgMsg::LogMsg(log_msg));
    }

    /// Stall until all messages so far has been sent.
    pub fn flush(&self) {
        re_log::debug!("Flushing message queue…");
        if self.msg_tx.send(MsgMsg::Flush).is_err() {
            re_log::debug!("Flush failed: already shut down.");
            return;
        }

        match self.flushed_rx.recv() {
            Ok(FlushedMsg) => {
                re_log::debug!("Flush complete.");
            }
            Err(_) => {
                // This can happen on Ctrl-C
                re_log::warn!("Failed to flush pipeline - not all messages were sent.");
            }
        }
    }

    /// Switch to a mode where we drop messages if disconnected.
    ///
    /// Calling this before a flush (or drop) ensures we won't get stuck trying to send
    /// messages to a closed endpoint, but we will still send all messages to an open endpoint.
    pub fn drop_if_disconnected(&self) {
        self.send_quit_tx
            .send(InterruptMsg::DropIfDisconnected)
            .ok();
    }

    fn send_msg_msg(&self, msg: MsgMsg) {
        // ignoring errors, because Ctrl-C can shut down the receiving end.
        self.msg_tx.send(msg).ok();
    }
}

impl Drop for Client {
    /// Wait until everything has been sent.
    fn drop(&mut self) {
        re_log::debug!("Shutting down the client connection…");
        self.flush();
        // First shut down the encoder:
        self.encode_quit_tx.send(QuitMsg).ok();
        self.encode_join.take().map(|j| j.join().ok());
        // Then the other threads:
        self.send_quit_tx.send(InterruptMsg::Quit).ok();
        self.send_join.take().map(|j| j.join().ok());
        re_log::debug!("TCP client has shut down.");
    }
}

impl fmt::Debug for Client {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The other fields are all channels and join handles, so they are not usefully printable.
        f.debug_struct("Client")
            .field("addr", &self.addr)
            .finish_non_exhaustive()
    }
}

fn msg_encode(
    encoding_options: re_log_encoding::EncodingOptions,
    msg_rx: &Receiver<MsgMsg>,
    quit_rx: &Receiver<QuitMsg>,
    packet_tx: &Sender<PacketMsg>,
) {
    loop {
        select! {
            recv(msg_rx) -> msg_msg => {
                let Ok(msg_msg) = msg_msg else {
                    re_log::debug!("Shutting down msg_encode thread: channel has closed");
                    return; // channel has closed
                };

                let packet_msg = match &msg_msg {
                    MsgMsg::LogMsg(log_msg) => {
                        match re_log_encoding::encoder::encode_to_bytes(
                            re_build_info::CrateVersion::LOCAL,
                            encoding_options, std::iter::once(log_msg),
                        ) {
                            Ok(packet) => {
                                re_log::trace!("Encoded message of size {}", packet.len());
                                Some(PacketMsg::Packet(packet))
                            }
                            Err(err) => {
                                re_log::error_once!("Failed to encode log message: {err}");
                                None
                            }
                        }
                    }
                    MsgMsg::Flush => Some(PacketMsg::Flush),
                };

                if let Some(packet_msg) = packet_msg {
                    if packet_tx.send(packet_msg).is_err() {
                        re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition.");
                        return;
                    }
                }
            }
            recv(quit_rx) -> _quit_msg => {
                re_log::debug!("Shutting down msg_encode thread: quit received");
                return;
            }
        }
    }
}

fn tcp_sender(
    addr: SocketAddr,
    flush_timeout: Option<std::time::Duration>,
    packet_rx: &Receiver<PacketMsg>,
    quit_rx: &Receiver<InterruptMsg>,
    flushed_tx: &Sender<FlushedMsg>,
) {
    let mut tcp_client = crate::tcp_client::TcpClient::new(addr, flush_timeout);
    // Once this flag has been set, we will drop all messages if the tcp_client is
    // no longer connected.
    let mut drop_if_disconnected = false;

    loop {
        select! {
            recv(packet_rx) -> packet_msg => {
                if let Ok(packet_msg) = packet_msg {
                    match packet_msg {
                        PacketMsg::Packet(packet) => {
                            match send_until_success(&mut tcp_client, drop_if_disconnected, &packet, quit_rx) {
                                Some(InterruptMsg::Quit) => {return;}
                                Some(InterruptMsg::DropIfDisconnected) => {
                                    drop_if_disconnected = true;
                                }
                                None => {}
                            }
                        }
                        PacketMsg::Flush => {
                            tcp_client.flush();
                            flushed_tx
                                .send(FlushedMsg)
                                .expect("Main thread should still be alive");
                        }
                    }
                } else {
                    re_log::debug!("Shutting down tcp_sender thread: packet_rx channel has closed");
                    return; // channel has closed
                }
            },
            recv(quit_rx) -> quit_msg => { match quit_msg {
                // Don't terminate on receiving a `DropIfDisconnected`. It's a soft-quit that allows
                // us to flush the pipeline.
                Ok(InterruptMsg::DropIfDisconnected) => {
                    drop_if_disconnected = true;
                }
                Ok(InterruptMsg::Quit) => {
                    re_log::debug!("Shutting down tcp_sender thread: received Quit message");
                    return;
                }
                Err(_) => {
                    re_log::debug!("Shutting down tcp_sender thread: quit_rx channel has closed");
                    return;
                }
            }}
        }
    }
}

fn send_until_success(
    tcp_client: &mut crate::tcp_client::TcpClient,
    drop_if_disconnected: bool,
    packet: &[u8],
    quit_rx: &Receiver<InterruptMsg>,
) -> Option<InterruptMsg> {
    // Early exit if tcp_client is disconnected
    if drop_if_disconnected && tcp_client.has_timed_out_for_flush() {
        re_log::warn_once!("Dropping messages because tcp client has timed out.");
        return None;
    }

    if let Err(err) = tcp_client.send(packet) {
        if drop_if_disconnected && tcp_client.has_timed_out_for_flush() {
            re_log::warn_once!("Dropping messages because tcp client has timed out.");
            return None;
        }
        // If this is the first time we fail to send the message, produce a warning.
        re_log::debug!("Failed to send message: {err}");

        let mut attempts = 1;
        let mut sleep_ms = 100;

        loop {
            select! {
                recv(quit_rx) -> _quit_msg => {
                    re_log::warn_once!("Dropping messages because tcp client has timed out or quitting.");
                    return Some(_quit_msg.unwrap_or(InterruptMsg::Quit));
                }
                default(std::time::Duration::from_millis(sleep_ms)) => {
                    if let Err(new_err) = tcp_client.send(packet) {
                        attempts += 1;
                        if attempts == 3 {
                            re_log::warn!("Failed to send message after {attempts} attempts: {err}");
                        }

                        if drop_if_disconnected && tcp_client.has_timed_out_for_flush() {
                            re_log::warn_once!("Dropping messages because tcp client has timed out.");
                            return None;
                        }

                        const MAX_SLEEP_MS : u64 = 3000;

                        sleep_ms = (sleep_ms * 2).min(MAX_SLEEP_MS);

                        // Only produce subsequent warnings once we've saturated the back-off
                        if sleep_ms == MAX_SLEEP_MS && new_err.to_string() != err.to_string() {
                            re_log::warn!("Still failing to send message after {attempts} attempts: {err}");
                        }
                    } else {
                        return None;
                    }
                }
            }
        }
    } else {
        None
    }
}