1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering},
    Arc,
};

use crossbeam::channel::{unbounded, Receiver, Sender};

use super::{AsyncDecoder, Chunk, Frame, OutputCallback, Result};

/// Messages sent from an [`AsyncDecoderWrapper`] to its decoder thread over the command channel.
enum Command {
    /// A chunk to pass on to [`SyncDecoder::submit_chunk`].
    ///
    /// The decoder thread skips it while a reset is still outstanding.
    Chunk(Chunk),
    /// Reset the decoder; queued by [`AsyncDecoder::reset`].
    Reset,
    /// Leave the decoder thread's loop; queued when the [`AsyncDecoderWrapper`] is dropped.
    Stop,
}

/// Shared flags that reach the decoder thread directly, without going through the command queue.
///
/// Cloning yields another handle to the *same* underlying atomics.
#[derive(Clone)]
struct Comms {
    /// Raised once the decoder thread is supposed to shut down.
    should_stop: Arc<AtomicBool>,

    /// Number of resets that have been requested but not yet handled.
    ///
    /// Goes up by one for every call to [`AsyncDecoder::reset`] and
    /// down by one each time the decoder thread receives [`Command::Reset`].
    num_outstanding_resets: Arc<AtomicU64>,
}

impl Default for Comms {
    /// Starts out with no stop request and no outstanding resets.
    fn default() -> Self {
        let should_stop = Arc::new(AtomicBool::new(false));
        let num_outstanding_resets = Arc::new(AtomicU64::new(0));

        Self {
            should_stop,
            num_outstanding_resets,
        }
    }
}

/// Blocking decoder of video chunks.
///
/// [`AsyncDecoderWrapper`] drives an implementation of this trait from a background thread.
// NOTE(review): this trait only exists when the `with_dav1d` cfg is set, yet the rest of this
// file refers to it unconditionally. Presumably the whole module is gated on the same cfg;
// confirm.
#[cfg(with_dav1d)]
pub trait SyncDecoder {
    /// Submit some work and read the results.
    ///
    /// Stop early if `should_stop` is `true` or turns `true`.
    ///
    /// * `should_stop` - flag the implementation should poll to abort early.
    /// * `chunk` - the piece of work to decode.
    /// * `on_output` - callback through which results are reported.
    fn submit_chunk(
        &mut self,
        should_stop: &std::sync::atomic::AtomicBool,
        chunk: Chunk,
        on_output: &OutputCallback,
    );

    /// Clear and reset everything.
    ///
    /// The default implementation does nothing.
    fn reset(&mut self) {}
}

/// Runs a [`SyncDecoder`] in a background thread, for non-blocking video decoding.
///
/// Work is handed to the thread through a command channel. Dropping the wrapper asks the
/// thread to stop but does not wait for it to finish.
pub struct AsyncDecoderWrapper {
    /// Where the decoding happens.
    ///
    /// The handle is only held on to; the `Drop` impl does not join it.
    _thread: std::thread::JoinHandle<()>,

    /// Commands sent to the decoder thread.
    command_tx: Sender<Command>,

    /// Instant communication to the decoder thread (circumventing the command queue).
    comms: Comms,
}

impl AsyncDecoderWrapper {
    /// Spawns a background thread that drives `sync_decoder` and returns the wrapper around it.
    ///
    /// * `debug_name` - used for the thread name and in the thread's log/context output.
    /// * `sync_decoder` - the blocking decoder; it is moved onto the new thread.
    /// * `on_output` - moved onto the new thread and handed to the decoder with every chunk.
    ///
    /// # Panics
    ///
    /// Panics if the decoder thread cannot be spawned.
    pub fn new(
        debug_name: String,
        mut sync_decoder: Box<dyn SyncDecoder + Send>,
        on_output: impl Fn(Result<Frame>) + Send + Sync + 'static,
    ) -> Self {
        re_tracing::profile_function!();

        let comms = Comms::default();
        let (command_tx, command_rx) = unbounded();

        // Has to be formatted before `debug_name` is moved into the thread body below.
        let thread_name = format!("decoder of {debug_name}");

        // The thread gets its own handle to the shared flags.
        let thread_comms = comms.clone();
        let thread_body = move || {
            econtext::econtext_data!("Video", debug_name.clone());

            decoder_thread(
                sync_decoder.as_mut(),
                &thread_comms,
                &command_rx,
                &on_output,
            );
            re_log::debug!("Closing decoder thread for {debug_name}");
        };

        let join_handle = std::thread::Builder::new()
            .name(thread_name)
            .spawn(thread_body)
            .expect("failed to spawn decoder thread");

        Self {
            _thread: join_handle,
            command_tx,
            comms,
        }
    }
}

impl AsyncDecoder for AsyncDecoderWrapper {
    /// Queues `chunk` for the decoder thread without waiting for it to be decoded.
    ///
    /// A failed send is ignored on purpose, so this always returns `Ok(())`.
    // NOTE: The interface is all `&mut self` to avoid certain types of races.
    fn submit_chunk(&mut self, chunk: Chunk) -> Result<()> {
        re_tracing::profile_function!();

        let command = Command::Chunk(chunk);
        self.command_tx.send(command).ok();

        Ok(())
    }

    /// Resets the decoder.
    ///
    /// This does not block. Chunks passed to `submit_chunk` earlier that are still waiting
    /// in the queue are skipped by the decoder thread instead of being decoded.
    // NOTE: The interface is all `&mut self` to avoid certain types of races.
    fn reset(&mut self) -> Result<()> {
        re_tracing::profile_function!();

        // Bump the counter *before* queueing the command: the decoder thread must already
        // see the outstanding reset while it works through whatever is queued ahead of it.
        let outstanding_resets = &self.comms.num_outstanding_resets;
        outstanding_resets.fetch_add(1, Ordering::Release);

        let command = Command::Reset;
        self.command_tx.send(command).ok();

        Ok(())
    }
}

impl Drop for AsyncDecoderWrapper {
    /// Asks the decoder thread to shut down, without waiting for it to actually do so.
    fn drop(&mut self) {
        re_tracing::profile_function!();

        let Self {
            comms, command_tx, ..
        } = self;

        // Raise the flag before queueing `Stop`, so that the decoder thread
        // is sure to observe it by the time it dequeues the command.
        comms.should_stop.store(true, Ordering::Release);
        command_tx.send(Command::Stop).ok();

        // NOTE: the thread is intentionally not joined here; it winds down on its own.
    }
}

/// Body of the decoder thread: pulls commands off `command_rx` and applies them to `decoder`.
///
/// The loop ends when any of the following happens:
/// * `comms.should_stop` is found set after receiving a command,
/// * a [`Command::Stop`] is received,
/// * `command_rx.recv()` returns an error.
///
/// While `comms.num_outstanding_resets` is non-zero, received chunks are dropped without
/// being decoded.
fn decoder_thread(
    decoder: &mut dyn SyncDecoder,
    comms: &Comms,
    command_rx: &Receiver<Command>,
    on_output: &OutputCallback,
) {
    #![allow(clippy::debug_assert_with_mut_call)]

    loop {
        let Ok(command) = command_rx.recv() else {
            break;
        };

        // The stop flag bypasses the queue, so check it before acting on any command.
        if comms.should_stop.load(Ordering::Acquire) {
            re_log::debug!("Should stop");
            return;
        }

        // A non-zero count means a `Reset` is still queued somewhere behind this command;
        // everything up to that `Reset` is stale and must not be decoded.
        let reset_pending = comms.num_outstanding_resets.load(Ordering::Acquire) != 0;

        match command {
            Command::Chunk(_) if reset_pending => {}
            Command::Chunk(chunk) => {
                decoder.submit_chunk(&comms.should_stop, chunk, on_output);
            }
            Command::Reset => {
                decoder.reset();
                comms.num_outstanding_resets.fetch_sub(1, Ordering::Release);
            }
            Command::Stop => {
                re_log::debug!("Stop");
                return;
            }
        }
    }

    re_log::debug!("Disconnected");
}