1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
//! The external application that will be controlled by the extended viewer ui.

use core::f32;
use std::{
    f32::consts::{PI, TAU},
    net::ToSocketAddrs,
};

use custom_callback::comms::{app::ControlApp, protocol::Message};

use rerun::{
    external::{glam::Vec3, re_log},
    RecordingStream,
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

/// Runs the external control application until Ctrl-C is received.
///
/// Binds a [`ControlApp`] on `127.0.0.1:8888`, connects a recording stream over TCP to
/// `127.0.0.1:9877`, registers the two message handlers and spawns the
/// [`animated_snake`] task that logs the dynamic point cloud.
///
/// # Errors
///
/// Returns an error if binding the control app, resolving the viewer address,
/// connecting the recording stream, registering a handler, or waiting for Ctrl-C fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut app = ControlApp::bind("127.0.0.1:8888").await?.run();

    // Resolve the viewer address and propagate failures to the caller instead of
    // panicking: this function already returns a `Result`.
    let viewer_addr = "127.0.0.1:9877"
        .to_socket_addrs()?
        .next()
        .ok_or("no socket address resolved for 127.0.0.1:9877")?;
    let rec = rerun::RecordingStreamBuilder::new("rerun_example_custom_callback")
        .connect_tcp_opts(viewer_addr, None)?;

    // Add a handler for incoming messages. It gets its own clone of the recording
    // stream because `rec` itself is moved into the snake task below.
    let add_rec = rec.clone();
    app.add_handler(move |msg| handle_message(&add_rec, msg))?;

    // Spawn a task that logs the animated points every 100ms.
    // A channel is used to control the points' offset and radius from the control panel.
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let snake_handle = tokio::spawn(animated_snake(rx, rec));

    // Add a handler that forwards dynamic updates to the snake task.
    app.add_handler(move |msg| handle_dynamic_update(tx.clone(), msg))?;

    // Keep the server running until the user interrupts the process.
    tokio::signal::ctrl_c().await?;
    re_log::info!("Shutting down");
    // The snake task loops forever, so it has to be cancelled explicitly.
    snake_handle.abort();

    Ok(())
}

/// Forwards `Message::DynamicPosition` messages to the channel behind `tx`.
///
/// Every other message variant is ignored.
///
/// # Panics
///
/// Panics with "failed to send message" if sending on `tx` fails.
fn handle_dynamic_update(tx: UnboundedSender<Message>, message: &Message) {
    // Guard clause: only dynamic-position updates are of interest here.
    if !matches!(message, Message::DynamicPosition { .. }) {
        return;
    }

    let forwarded = message.clone();
    tx.send(forwarded).expect("failed to send message");
}

/// Logs the geometry described by `message` to the recording stream `rec`.
///
/// * `Message::Point3d` is logged as a single `Points3D` with the given radius.
/// * `Message::Box3d` is logged as a single `Boxes3D` with the given half size and center.
/// * `Message::Disconnect` only emits an info log line.
/// * Any other variant is ignored.
///
/// # Panics
///
/// Panics with "failed to handle message" if logging to `rec` returns an error.
fn handle_message(rec: &RecordingStream, message: &Message) {
    let outcome = match message {
        Message::Point3d {
            path,
            position,
            radius,
        } => {
            let points = rerun::Points3D::new([position]).with_radii([*radius]);
            rec.log(path.to_string(), &points)
        }
        Message::Box3d {
            path,
            half_size,
            position,
        } => {
            let boxes = rerun::Boxes3D::from_half_sizes([half_size]).with_centers([position]);
            rec.log(path.to_string(), &boxes)
        }
        Message::Disconnect => {
            re_log::info!("Client disconnected");
            Ok(())
        }
        // Remaining variants are not handled by this callback.
        _ => Ok(()),
    };

    outcome.expect("failed to handle message");
}

async fn animated_snake(mut rx: UnboundedReceiver<Message>, rec: RecordingStream) {
    let mut current_radius = 0.1;
    let mut current_offset = 0.5;

    let mut t = 0.0_f32;
    loop {
        // update the position and radius
        if let Ok(Message::DynamicPosition { radius, offset }) = rx.try_recv() {
            // ensure these values are never zero
            current_offset = offset.max(0.01);
            current_radius = radius.max(0.01);
        }

        let num_spheres = ((PI * current_offset) / current_radius.max(f32::EPSILON)).max(1.);
        let theta = TAU / num_spheres;

        let total_spheres = ((num_spheres as usize) / 3).max(1);
        let mut points = Vec::with_capacity(total_spheres);
        t -= (total_spheres - 1) as f32 * theta;

        for _ in 0..total_spheres {
            let x = current_offset * t.cos();
            let y = current_offset * t.sin();
            let z = 0.0;

            points.push(Vec3::new(x, y, z));

            t += theta;
        }

        // log the point
        rec.log(
            "dynamic".to_string(),
            &rerun::Points3D::new(points).with_radii(vec![current_radius; num_spheres as usize]),
        )
        .expect("failed to log dynamic");

        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }
}