use std::sync::{atomic::AtomicU64, Arc};
use web_time::Instant;
pub use crossbeam::channel::{RecvError, RecvTimeoutError, SendError, TryRecvError};
mod receive_set;
mod receiver;
mod sender;
pub use receive_set::ReceiveSet;
pub use receiver::Receiver;
pub use sender::Sender;
/// Identifies what a smart channel is connected to.
///
/// This is the `source` argument of [`smart_channel`], which forwards it to the
/// [`Receiver`] it constructs.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)]
pub enum SmartChannelSource {
    /// Data associated with a filesystem path.
    File(std::path::PathBuf),

    /// An HTTP stream located at `url`.
    ///
    /// NOTE(review): `follow` presumably means "keep reading as the stream grows" —
    /// confirm against the code that consumes this flag.
    RrdHttpStream { url: String, follow: bool },

    /// Displayed as "Web event listener".
    ///
    /// NOTE(review): presumably data pushed in through a browser event — confirm.
    RrdWebEventListener,

    /// A JavaScript channel, identified by its name.
    JsChannel {
        channel_name: String,
    },

    /// Displayed as "SDK".
    Sdk,

    /// Displayed as "Standard input".
    Stdin,

    /// A stream addressed by a dataset data URI.
    RedapGrpcStream(re_uri::DatasetDataUri),

    /// A message proxy addressed by a proxy URI.
    MessageProxy(re_uri::ProxyUri),
}
impl std::fmt::Display for SmartChannelSource {
    /// Writes a human-readable description of the source.
    ///
    /// Path-, URL- and URI-carrying variants print that value; the remaining
    /// variants print a fixed label. Every arm except `JsChannel` delegates to the
    /// inner value's own `Display` impl using the caller's formatter.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use std::fmt::Display;

        match self {
            // Variants that carry a location: print the location itself.
            Self::File(path) => Display::fmt(&path.display(), f),
            Self::RrdHttpStream { url, follow: _ } => Display::fmt(url, f),
            Self::RedapGrpcStream(uri) => Display::fmt(uri, f),
            Self::MessageProxy(uri) => Display::fmt(uri, f),

            Self::JsChannel { channel_name } => write!(f, "Javascript channel: {channel_name}"),

            // Variants described by a fixed label.
            Self::RrdWebEventListener => Display::fmt("Web event listener", f),
            Self::Sdk => Display::fmt("SDK", f),
            Self::Stdin => Display::fmt("Standard input", f),
        }
    }
}
impl SmartChannelSource {
    /// Returns `true` for the sources this crate classifies as network-backed.
    ///
    /// `RrdHttpStream`, `JsChannel`, `RedapGrpcStream` and `MessageProxy` count as
    /// network sources; `File`, `Sdk`, `RrdWebEventListener` and `Stdin` do not.
    pub fn is_network(&self) -> bool {
        // Kept exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::RrdHttpStream { .. }
            | Self::JsChannel { .. }
            | Self::RedapGrpcStream(_)
            | Self::MessageProxy(_) => true,

            Self::File(_) | Self::RrdWebEventListener | Self::Sdk | Self::Stdin => false,
        }
    }
}
/// Identifies who sent a message through a smart channel.
///
/// This is the `sender_source` argument of [`smart_channel`], which forwards it to
/// the [`Sender`]; each [`SmartMessage`] carries one in its `source` field.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SmartMessageSource {
    /// The sender is not known. Displayed as "unknown".
    Unknown,

    /// A file at the given path. Displayed with a `file://` prefix.
    File(std::path::PathBuf),

    /// An HTTP stream located at `url`.
    RrdHttpStream {
        url: String,
    },

    /// Displayed as "web_callback".
    ///
    /// NOTE(review): presumably a message delivered by a browser event callback — confirm.
    RrdWebEventCallback,

    /// Displayed as "javascript".
    ///
    /// NOTE(review): presumably a message pushed from JavaScript code — confirm.
    JsChannelPush,

    /// Displayed as "sdk".
    Sdk,

    /// Displayed as "stdin".
    Stdin,

    /// A stream addressed by a dataset data URI.
    RedapGrpcStream(re_uri::DatasetDataUri),

    /// A message proxy addressed by a proxy URI.
    MessageProxy(re_uri::ProxyUri),
}
impl std::fmt::Display for SmartMessageSource {
    /// Writes a short, machine-friendly identifier for the message source.
    ///
    /// File paths are rendered lossily with a `file://` prefix, URLs and URIs are
    /// written as-is, and the remaining variants use fixed lowercase labels.
    ///
    /// The text is written directly into the formatter rather than first being
    /// collected into a temporary `String`, so formatting a source does not
    /// allocate (except for the lossy conversion of a non-UTF-8 path). As before,
    /// width/padding flags on the caller's formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unknown => f.write_str("unknown"),
            Self::File(path) => write!(f, "file://{}", path.to_string_lossy()),
            Self::RrdHttpStream { url } => f.write_str(url),
            // `write!` formats each URI with default options, matching the previous
            // `to_string()` output.
            Self::MessageProxy(uri) => write!(f, "{uri}"),
            Self::RedapGrpcStream(uri) => write!(f, "{uri}"),
            Self::RrdWebEventCallback => f.write_str("web_callback"),
            Self::JsChannelPush => f.write_str("javascript"),
            Self::Sdk => f.write_str("sdk"),
            Self::Stdin => f.write_str("stdin"),
        }
    }
}
/// Statistics shared between the two ends of a smart channel.
///
/// [`smart_channel_with_stats`] hands the same `Arc<SharedStats>` to both the
/// [`Sender`] and the [`Receiver`] it creates.
#[derive(Default)]
pub(crate) struct SharedStats {
    /// Latency in nanoseconds; zero when created through `Default`.
    ///
    /// NOTE(review): presumably written by the receiving end when a message is
    /// taken off the channel — confirm in `receiver.rs`.
    latency_nanos: AtomicU64,
}
pub fn smart_channel<T: Send>(
sender_source: SmartMessageSource,
source: SmartChannelSource,
) -> (Sender<T>, Receiver<T>) {
let stats = Arc::new(SharedStats::default());
smart_channel_with_stats(sender_source, Arc::new(source), stats)
}
/// Creates a connected [`Sender`]/[`Receiver`] pair that reports into `stats`.
///
/// The pair is backed by an unbounded crossbeam channel. Both ends receive a
/// handle to the same `stats`; the sender is tagged with `sender_source` and the
/// receiver with `source`.
pub(crate) fn smart_channel_with_stats<T: Send>(
    sender_source: SmartMessageSource,
    source: Arc<SmartChannelSource>,
    stats: Arc<SharedStats>,
) -> (Sender<T>, Receiver<T>) {
    let (raw_tx, raw_rx) = crossbeam::channel::unbounded();

    // The sender gets its own handle to the stats; the receiver takes the original.
    (
        Sender::new(raw_tx, Arc::new(sender_source), Arc::clone(&stats)),
        Receiver::new(raw_rx, stats, source),
    )
}
/// The content carried by a [`SmartMessage`].
pub enum SmartMessagePayload<T: Send> {
    /// A regular message carrying a value of type `T`.
    Msg(T),

    /// A flush request carrying a one-shot completion callback.
    ///
    /// NOTE(review): `on_flush_done` is presumably invoked by the receiving side
    /// once everything sent before this marker has been handled — confirm.
    Flush {
        on_flush_done: Box<dyn FnOnce() + Send>,
    },

    /// A quit notification, optionally carrying an error.
    ///
    /// NOTE(review): presumably the error is the reason the sender stopped — confirm.
    Quit(Option<Box<dyn std::error::Error + Send>>),
}
impl<T: Send> std::fmt::Debug for SmartMessagePayload<T> {
    /// Prints only the variant name; the contents are never shown, which is why
    /// `T` does not need to implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant_label = match self {
            Self::Msg(_) => "Msg(_)",
            Self::Flush { .. } => "Flush",
            Self::Quit(_) => "Quit",
        };
        f.write_str(variant_label)
    }
}
impl<T: Send + PartialEq> PartialEq for SmartMessagePayload<T> {
    /// Two payloads are equal only when both are `Msg` and their contents are equal.
    ///
    /// `Flush` and `Quit` never compare equal to anything, not even to another
    /// payload of the same variant.
    fn eq(&self, rhs: &Self) -> bool {
        matches!(
            (self, rhs),
            (Self::Msg(lhs_msg), Self::Msg(rhs_msg)) if lhs_msg == rhs_msg
        )
    }
}
/// A payload together with the timestamp and source it is tagged with.
///
/// Equality is derived field by field, so `time` and `source` take part in the
/// comparison alongside `payload`.
#[derive(Debug, PartialEq)]
pub struct SmartMessage<T: Send> {
    /// Timestamp attached to the message.
    ///
    /// NOTE(review): presumably the moment the message was sent — confirm in `sender.rs`.
    pub time: Instant,

    /// Identifies who sent the message.
    pub source: Arc<SmartMessageSource>,

    /// The content of the message.
    pub payload: SmartMessagePayload<T>,
}
impl<T: Send> SmartMessage<T> {
    /// Borrows the user data, if this message carries any.
    ///
    /// Returns `Some` only for a `Msg` payload; `Flush` and `Quit` yield `None`.
    pub fn data(&self) -> Option<&T> {
        if let SmartMessagePayload::Msg(inner) = &self.payload {
            Some(inner)
        } else {
            None
        }
    }

    /// Consumes the message and returns the user data, if it carries any.
    ///
    /// Returns `Some` only for a `Msg` payload; `Flush` and `Quit` yield `None`.
    pub fn into_data(self) -> Option<T> {
        if let SmartMessagePayload::Msg(inner) = self.payload {
            Some(inner)
        } else {
            None
        }
    }
}
#[test]
fn test_smart_channel() {
    let (sender, receiver) = smart_channel(SmartMessageSource::Sdk, SmartChannelSource::Sdk);

    // A fresh channel is empty and has no latency recorded.
    assert_eq!(sender.len(), 0);
    assert_eq!(receiver.len(), 0);
    assert_eq!(sender.latency_nanos(), 0);

    // Queuing a message is visible from both ends, but latency stays at zero
    // until the message is actually received.
    sender.send(42).unwrap();
    assert_eq!(sender.len(), 1);
    assert_eq!(receiver.len(), 1);
    assert_eq!(sender.latency_nanos(), 0);

    // Let the message sit in the queue so the reported latency ends up well above
    // the 1 ms threshold checked below.
    std::thread::sleep(std::time::Duration::from_millis(10));

    assert_eq!(receiver.recv().map(|msg| msg.into_data()), Ok(Some(42)));
    assert_eq!(sender.len(), 0);
    assert_eq!(receiver.len(), 0);
    assert!(sender.latency_nanos() > 1_000_000);
}
#[test]
fn test_smart_channel_connected() {
    let (first_sender, receiver) =
        smart_channel(SmartMessageSource::Sdk, SmartChannelSource::Sdk);

    // Nothing sent yet: the channel is empty but connected.
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
    assert!(receiver.is_connected());

    // Cloning the sender changes nothing from the receiver's point of view.
    let second_sender = first_sender.clone();
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
    assert!(receiver.is_connected());

    // A message sent through the clone arrives on the same receiver.
    second_sender.send(42).unwrap();
    assert_eq!(receiver.try_recv().map(|msg| msg.into_data()), Ok(Some(42)));
    assert!(receiver.is_connected());

    // Dropping one of two senders keeps the channel connected.
    drop(first_sender);
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
    assert!(receiver.is_connected());

    // Dropping the last sender disconnects it.
    drop(second_sender);
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    assert!(!receiver.is_connected());
}