use std::sync::{atomic::Ordering::Relaxed, Arc};
use web_time::Instant;
use crate::{SendError, SharedStats, SmartMessage, SmartMessagePayload, SmartMessageSource};
/// Sending half of a smart channel.
///
/// Bundles a crossbeam sender of [`SmartMessage`]s with the
/// [`SmartMessageSource`] attached to the messages it sends and a handle to
/// the channel's [`SharedStats`].
pub struct Sender<T: Send> {
    /// Underlying channel carrying the wrapped messages.
    tx: crossbeam::channel::Sender<SmartMessage<T>>,
    /// Source attached to messages sent through this handle.
    source: Arc<SmartMessageSource>,
    /// Statistics handle; clones of this sender point at the same instance.
    stats: Arc<SharedStats>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add a
// `T: Clone` bound, yet cloning a sender only clones the channel handle and
// two `Arc`s and never a `T`. This keeps senders cloneable for any `T: Send`.
impl<T: Send> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
            source: Arc::clone(&self.source),
            stats: Arc::clone(&self.stats),
        }
    }
}
impl<T: Send> Sender<T> {
pub(crate) fn new(
tx: crossbeam::channel::Sender<SmartMessage<T>>,
source: Arc<SmartMessageSource>,
stats: Arc<SharedStats>,
) -> Self {
Self { tx, source, stats }
}
pub fn clone_as(&self, source: SmartMessageSource) -> Self {
Self {
tx: self.tx.clone(),
source: Arc::new(source),
stats: Arc::clone(&self.stats),
}
}
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
self.send_at(
Instant::now(),
Arc::clone(&self.source),
SmartMessagePayload::Msg(msg),
)
.map_err(|SendError(msg)| match msg {
SmartMessagePayload::Msg(msg) => SendError(msg),
SmartMessagePayload::Flush { .. } | SmartMessagePayload::Quit(_) => unreachable!(),
})
}
pub fn send_at(
&self,
time: Instant,
source: Arc<SmartMessageSource>,
payload: SmartMessagePayload<T>,
) -> Result<(), SendError<SmartMessagePayload<T>>> {
debug_assert!(!matches!(*source, SmartMessageSource::Unknown));
self.tx
.send(SmartMessage {
time,
source,
payload,
})
.map_err(|SendError(msg)| SendError(msg.payload))
}
#[cfg(not(target_arch = "wasm32"))]
pub fn flush_blocking(&self) -> Result<(), SendError<()>> {
let (tx, rx) = std::sync::mpsc::sync_channel(0); self.tx
.send(SmartMessage {
time: Instant::now(),
source: Arc::clone(&self.source),
payload: SmartMessagePayload::Flush {
on_flush_done: Box::new(move || {
tx.send(()).ok();
}),
},
})
.map_err(|_ignored| SendError(()))?;
rx.recv().map_err(|_ignored| SendError(()))
}
pub fn quit(
&self,
err: Option<Box<dyn std::error::Error + Send>>,
) -> Result<(), SendError<SmartMessage<T>>> {
debug_assert!(!matches!(*self.source, SmartMessageSource::Unknown));
self.tx.send(SmartMessage {
time: Instant::now(),
source: Arc::clone(&self.source),
payload: SmartMessagePayload::Quit(err),
})
}
#[inline]
pub fn is_empty(&self) -> bool {
self.tx.is_empty()
}
#[inline]
pub fn len(&self) -> usize {
self.tx.len()
}
pub fn latency_ns(&self) -> u64 {
self.stats.latency_ns.load(Relaxed)
}
pub fn latency_sec(&self) -> f32 {
self.latency_ns() as f32 / 1e9
}
}