use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};
use crossbeam::channel::{unbounded, Receiver, Sender};
use super::{AsyncDecoder, Chunk, Frame, OutputCallback, Result};
/// Message sent over the command channel from [`AsyncDecoderWrapper`] to its decoder thread.
enum Command {
/// A chunk to pass to the decoder's `submit_chunk`; the decoder thread skips it
/// while at least one reset is still outstanding.
Chunk(Chunk),
/// Makes the decoder thread call the decoder's `reset` and then decrement the
/// outstanding-reset counter.
Reset,
/// Makes the decoder thread return from its loop.
Stop,
}
/// State shared between an `AsyncDecoderWrapper` and its decoder thread.
///
/// Both fields live behind an [`Arc`], so cloning a `Comms` yields another handle to
/// the *same* atomics rather than an independent copy.
///
/// `Default` is derived: `Arc<T>`, `AtomicBool` and `AtomicU64` all implement
/// `Default`, so a fresh `Comms` starts with `should_stop == false` and zero
/// outstanding resets, exactly like the previous hand-written implementation.
#[derive(Clone, Default)]
struct Comms {
    /// Set to `true` when the wrapper is dropped. The decoder thread checks it after
    /// every received command and also hands it to `SyncDecoder::submit_chunk`.
    should_stop: Arc<AtomicBool>,

    /// Number of resets that were requested but not yet processed by the decoder
    /// thread. Incremented by `reset` before the `Reset` command is sent and
    /// decremented by the decoder thread once it has handled that command; while it
    /// is non-zero, queued chunks are skipped.
    num_outstanding_resets: Arc<AtomicU64>,
}
/// A decoder that is driven synchronously: the decoder thread calls these methods
/// directly, one command at a time.
// NOTE(review): this trait is gated on `with_dav1d`, while `AsyncDecoderWrapper::new`
// references `SyncDecoder` without a visible gate — presumably the whole module is
// compiled under the same cfg; confirm.
#[cfg(with_dav1d)]
pub trait SyncDecoder {
/// Submits one chunk for decoding.
///
/// * `should_stop` - the wrapper's stop flag, which is set to `true` when the wrapper
///   is dropped. Presumably meant to let an implementation bail out of long-running
///   work early — confirm against the implementations.
/// * `chunk` - the chunk to decode.
/// * `on_output` - output callback; presumably receives each decoded frame or error
///   (the wrapper constructs it from an `Fn(Result<Frame>)`).
fn submit_chunk(
&mut self,
should_stop: &std::sync::atomic::AtomicBool,
chunk: Chunk,
on_output: &OutputCallback,
);
/// Resets the decoder. The default implementation does nothing.
fn reset(&mut self) {}
}
/// Runs a [`SyncDecoder`] on its own thread and feeds it through a command channel.
pub struct AsyncDecoderWrapper {
/// Handle of the decoder thread spawned in `new`. It is only stored; nothing in the
/// visible code joins it.
_thread: std::thread::JoinHandle<()>,
/// Sending half of the command channel that the decoder thread receives from.
command_tx: Sender<Command>,
/// Stop flag and outstanding-reset counter shared with the decoder thread.
comms: Comms,
}
impl AsyncDecoderWrapper {
    /// Spawns a thread named `"decoder of {debug_name}"` that drives `sync_decoder`.
    ///
    /// * `debug_name` - used for the thread name, the `econtext` entry and the log
    ///   message emitted when the thread closes.
    /// * `sync_decoder` - the decoder that the thread will own and call into.
    /// * `on_output` - callback handed to the decoder on every submitted chunk.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to spawn the thread.
    pub fn new(
        debug_name: String,
        mut sync_decoder: Box<dyn SyncDecoder + Send>,
        on_output: impl Fn(Result<Frame>) + Send + Sync + 'static,
    ) -> Self {
        re_tracing::profile_function!();

        let comms = Comms::default();
        let (command_tx, command_rx) = unbounded();

        // Computed up front, because `debug_name` is moved into the thread body below.
        let thread_name = format!("decoder of {debug_name}");

        // The thread gets its own handle to the shared atomics.
        let thread_comms = comms.clone();
        let thread_body = move || {
            econtext::econtext_data!("Video", debug_name.clone());
            decoder_thread(sync_decoder.as_mut(), &thread_comms, &command_rx, &on_output);
            re_log::debug!("Closing decoder thread for {debug_name}");
        };

        let join_handle = std::thread::Builder::new()
            .name(thread_name)
            .spawn(thread_body)
            .expect("failed to spawn decoder thread");

        Self {
            _thread: join_handle,
            command_tx,
            comms,
        }
    }
}
impl AsyncDecoder for AsyncDecoderWrapper {
    /// Queues `chunk` for the decoder thread.
    ///
    /// Always returns `Ok(())`: if the send fails, the error is discarded.
    fn submit_chunk(&mut self, chunk: Chunk) -> Result<()> {
        re_tracing::profile_function!();

        let command = Command::Chunk(chunk);
        self.command_tx.send(command).ok();

        Ok(())
    }

    /// Requests a decoder reset.
    ///
    /// Chunks still queued ahead of the `Reset` command are skipped by the decoder
    /// thread, because it sees a non-zero outstanding-reset counter when it reaches
    /// them. Always returns `Ok(())`: if the send fails, the error is discarded.
    fn reset(&mut self) -> Result<()> {
        re_tracing::profile_function!();

        // Bump the counter before enqueueing the command, so the increment is already
        // published by the time the decoder thread can receive `Reset`.
        let outstanding_resets = &self.comms.num_outstanding_resets;
        outstanding_resets.fetch_add(1, Ordering::Release);

        let command = Command::Reset;
        self.command_tx.send(command).ok();

        Ok(())
    }
}
impl Drop for AsyncDecoderWrapper {
    /// Asks the decoder thread to stop; does not join it.
    fn drop(&mut self) {
        re_tracing::profile_function!();

        let Self {
            comms, command_tx, ..
        } = self;

        // Raise the flag before sending `Stop`, so the flag is already set whenever
        // the decoder thread wakes up for this (or any later) command.
        comms.should_stop.store(true, Ordering::Release);

        // A failed send is ignored.
        command_tx.send(Command::Stop).ok();
    }
}
/// Body of the decoder thread: receives commands and applies them to `decoder`.
///
/// The loop ends when any of the following happens:
/// * a command is received while `comms.should_stop` is set,
/// * a [`Command::Stop`] is received,
/// * the channel is disconnected.
///
/// While `comms.num_outstanding_resets` is non-zero, received chunks are dropped
/// instead of being submitted to the decoder.
fn decoder_thread(
    decoder: &mut dyn SyncDecoder,
    comms: &Comms,
    command_rx: &Receiver<Command>,
    on_output: &OutputCallback,
) {
    // NOTE(review): there is no `debug_assert!` in this body — this allow looks
    // removable; confirm before deleting.
    #![allow(clippy::debug_assert_with_mut_call)]

    // Blocks for each command; the iterator ends once the channel is disconnected.
    for command in command_rx.iter() {
        let stop_requested = comms.should_stop.load(Ordering::Acquire);
        if stop_requested {
            re_log::debug!("Should stop");
            return;
        }

        // Sampled once per command, before the command is dispatched.
        let reset_pending = comms.num_outstanding_resets.load(Ordering::Acquire) > 0;

        match command {
            Command::Stop => {
                re_log::debug!("Stop");
                return;
            }
            Command::Reset => {
                decoder.reset();
                comms.num_outstanding_resets.fetch_sub(1, Ordering::Release);
            }
            // A reset is still on its way: discard chunks queued ahead of it.
            Command::Chunk(_) if reset_pending => {}
            Command::Chunk(chunk) => {
                decoder.submit_chunk(&comms.should_stop, chunk, on_output);
            }
        }
    }

    re_log::debug!("Disconnected");
}