use std::sync::Arc;
use crossbeam::channel::Select;
use parking_lot::Mutex;
use crate::{Receiver, RecvError, SmartChannelSource, SmartMessage};
/// A group of [`Receiver`]s that can be polled together as if they were one channel.
///
/// The receivers are stored in a `Vec` behind a mutex, which is why every method
/// of this type only needs `&self`.
pub struct ReceiveSet<T>
where
    T: Send,
{
    /// All receivers currently in the set, in insertion order.
    receivers: Mutex<Vec<Receiver<T>>>,
}
impl<T> Default for ReceiveSet<T>
where
    T: Send,
{
    /// Builds a set that holds no receivers.
    fn default() -> Self {
        Self::new(Vec::default())
    }
}
impl<T: Send> ReceiveSet<T> {
/// Creates a set that starts out holding the given `receivers`.
pub fn new(receivers: Vec<Receiver<T>>) -> Self {
    let receivers = Mutex::new(receivers);
    Self { receivers }
}
/// Appends the receiver `r` to the end of the set.
pub fn add(&self, r: Receiver<T>) {
    re_tracing::profile_function!();
    self.receivers.lock().push(r);
}
/// Drops every receiver whose `source()` compares equal to `source`.
pub fn remove(&self, source: &SmartChannelSource) {
    let mut guard = self.receivers.lock();
    guard.retain(|receiver| receiver.source() != source);
}
/// Keeps only the receivers for which `f` returns `true`; all others are dropped.
pub fn retain(&self, f: impl FnMut(&Receiver<T>) -> bool) {
    let mut guard = self.receivers.lock();
    guard.retain(f);
}
/// Removes every receiver from the set.
pub fn clear(&self) {
    let mut guard = self.receivers.lock();
    guard.clear();
}
/// Drops every receiver whose source is an `RrdHttpStream` or `WsClient`
/// pointing at `uri`.
///
/// Receivers with any other kind of source are always kept.
#[cfg(target_arch = "wasm32")]
pub fn remove_by_uri(&self, uri: &str) {
    let mut guard = self.receivers.lock();
    guard.retain(|receiver| {
        let source = receiver.source();
        if let SmartChannelSource::RrdHttpStream { url, .. } = source {
            return url != uri;
        }
        if let SmartChannelSource::WsClient { ws_server_url } = source {
            return ws_server_url != uri;
        }
        // Not a URI-based source: nothing to compare against, so keep it.
        true
    });
}
/// Returns the sources of all receivers that still report being connected.
///
/// As a side effect, receivers whose `is_connected()` is `false` are removed
/// from the set before the list is built.
pub fn sources(&self) -> Vec<Arc<SmartChannelSource>> {
    re_tracing::profile_function!();

    let mut guard = self.receivers.lock();
    guard.retain(|receiver| receiver.is_connected());

    let mut sources = Vec::with_capacity(guard.len());
    for receiver in guard.iter() {
        sources.push(Arc::clone(&receiver.source));
    }
    sources
}
/// Returns `true` if the set still holds at least one connected receiver.
///
/// This is the negation of [`Self::is_empty`], so it prunes disconnected
/// receivers in the same way.
pub fn is_connected(&self) -> bool {
    let empty = self.is_empty();
    !empty
}
/// Returns `true` if any connected receiver has a `SmartChannelSource::TcpServer` source.
///
/// Goes through [`Self::sources`], so disconnected receivers are pruned first.
pub fn accepts_tcp_connections(&self) -> bool {
    re_tracing::profile_function!();

    let sources = self.sources();
    for source in &sources {
        if let SmartChannelSource::TcpServer { .. } = **source {
            return true;
        }
    }
    false
}
/// Returns `true` if no connected receiver is left in the set.
///
/// Receivers whose `is_connected()` is `false` are removed before the check.
pub fn is_empty(&self) -> bool {
    re_tracing::profile_function!();

    let mut guard = self.receivers.lock();
    guard.retain(|receiver| receiver.is_connected());
    guard.is_empty()
}
/// Returns the largest `latency_ns()` among all receivers, or `0` if the set is empty.
pub fn latency_ns(&self) -> u64 {
    re_tracing::profile_function!();

    let guard = self.receivers.lock();
    guard
        .iter()
        .map(|receiver| receiver.latency_ns())
        .max()
        .unwrap_or(0)
}
/// Returns the sum of `len()` over every receiver in the set.
pub fn queue_len(&self) -> usize {
    re_tracing::profile_function!();

    let guard = self.receivers.lock();
    let mut total = 0;
    for receiver in guard.iter() {
        total += receiver.len();
    }
    total
}
/// Blocks until a message arrives on any receiver in the set.
///
/// # Errors
///
/// Returns [`RecvError`] once no connected receiver is left in the set
/// (including when the set was empty to begin with).
///
/// # Locking
///
/// The internal mutex is held for the whole call, including while blocked
/// waiting for a message, so other methods of this set called from other
/// threads will wait until `recv` returns.
pub fn recv(&self) -> Result<SmartMessage<T>, RecvError> {
    re_tracing::profile_function!();

    let mut rx = self.receivers.lock();

    loop {
        // Drop receivers that already know they are disconnected.
        rx.retain(|r| r.is_connected());
        if rx.is_empty() {
            return Err(RecvError);
        }

        // Wait on all raw channels at once.
        let mut sel = Select::new();
        for r in rx.iter() {
            sel.recv(&r.rx);
        }

        let oper = sel.select();
        let index = oper.index();
        if let Ok(msg) = oper.recv(&rx[index].rx) {
            return Ok(msg);
        }

        // The selected operation failed on the raw channel, bypassing the
        // `Receiver`-level receive methods. Poll every receiver through
        // `Receiver::try_recv`, the same fallback `try_recv` and `recv_timeout`
        // use, so the `retain` above can prune dead receivers on the next
        // iteration instead of re-selecting the same failed channel forever.
        // NOTE(review): assumes `Receiver::try_recv` is what updates
        // `is_connected()` on disconnect; confirm against `Receiver`.
        for r in rx.iter() {
            if let Ok(msg) = r.try_recv() {
                return Ok(msg);
            }
        }
    }
}
/// Attempts to receive a message without blocking.
///
/// Returns the message together with the source of the receiver it came from,
/// or `None` if the set holds no connected receiver or nothing could be received.
pub fn try_recv(&self) -> Option<(Arc<SmartChannelSource>, SmartMessage<T>)> {
    re_tracing::profile_function!();

    let mut receivers = self.receivers.lock();
    receivers.retain(|receiver| receiver.is_connected());
    if receivers.is_empty() {
        return None;
    }

    // Register every raw channel with a single select.
    let mut select = Select::new();
    for receiver in receivers.iter() {
        select.recv(&receiver.rx);
    }

    // No operation is ready right now: nothing to receive.
    let ready = match select.try_select() {
        Ok(operation) => operation,
        Err(_) => return None,
    };

    let chosen = &receivers[ready.index()];
    if let Ok(msg) = ready.recv(&chosen.rx) {
        return Some((Arc::clone(&chosen.source), msg));
    }

    // The selected operation failed. Poll each receiver through its own
    // `try_recv`, in order, and hand back the first message found.
    // NOTE(review): presumably this is also what lets receivers notice a
    // disconnect; confirm in `Receiver::try_recv`.
    receivers.iter().find_map(|receiver| {
        receiver
            .try_recv()
            .ok()
            .map(|msg| (Arc::clone(&receiver.source), msg))
    })
}
/// Waits up to `timeout` for a message from any receiver in the set.
///
/// Returns the message together with the source of the receiver it came from.
/// Returns `None` if the set holds no connected receiver, if the wait times out,
/// or if the selected receiver failed and polling the others found nothing.
pub fn recv_timeout(
    &self,
    timeout: std::time::Duration,
) -> Option<(Arc<SmartChannelSource>, SmartMessage<T>)> {
    re_tracing::profile_function!();

    let mut receivers = self.receivers.lock();
    receivers.retain(|receiver| receiver.is_connected());
    if receivers.is_empty() {
        return None;
    }

    // Register every raw channel with a single select.
    let mut select = Select::new();
    for receiver in receivers.iter() {
        select.recv(&receiver.rx);
    }

    // Timed out without any operation becoming ready.
    let ready = match select.select_timeout(timeout) {
        Ok(operation) => operation,
        Err(_) => return None,
    };

    let chosen = &receivers[ready.index()];
    if let Ok(msg) = ready.recv(&chosen.rx) {
        return Some((Arc::clone(&chosen.source), msg));
    }

    // The selected operation failed. Poll each receiver through its own
    // `try_recv`, in order, and hand back the first message found.
    // NOTE(review): presumably this is also what lets receivers notice a
    // disconnect; confirm in `Receiver::try_recv`.
    receivers.iter().find_map(|receiver| {
        receiver
            .try_recv()
            .ok()
            .map(|msg| (Arc::clone(&receiver.source), msg))
    })
}
}
/// Walks a [`ReceiveSet`] through its lifecycle with two channels: empty set,
/// adding receivers, receiving one message, and dropping each sender.
#[test]
fn test_receive_set() {
    use crate::{smart_channel, SmartMessageSource};

    let timeout = std::time::Duration::from_millis(100);

    let (tx_file, rx_file) = smart_channel::<i32>(
        SmartMessageSource::File("path".into()),
        SmartChannelSource::File("path".into()),
    );
    let (tx_sdk, rx_sdk) = smart_channel::<i32>(SmartMessageSource::Sdk, SmartChannelSource::Sdk);

    let set = ReceiveSet::default();

    // Both the non-blocking and the timed receive must come back empty-handed.
    let assert_nothing_to_receive = || {
        assert_eq!(set.try_recv(), None);
        assert_eq!(set.recv_timeout(timeout), None);
    };

    // Empty set.
    assert_nothing_to_receive();
    assert_eq!(set.sources(), vec![]);

    // One idle receiver.
    set.add(rx_file);
    assert_nothing_to_receive();
    assert_eq!(
        set.sources(),
        vec![Arc::new(SmartChannelSource::File("path".into()))]
    );

    // Two idle receivers, listed in insertion order.
    set.add(rx_sdk);
    assert_nothing_to_receive();
    assert_eq!(
        set.sources(),
        vec![
            Arc::new(SmartChannelSource::File("path".into())),
            Arc::new(SmartChannelSource::Sdk)
        ]
    );

    // A single message is delivered once, tagged with the source it came from.
    tx_sdk.send(42).unwrap();
    assert_eq!(set.try_recv().unwrap().0, Arc::new(SmartChannelSource::Sdk));
    assert_nothing_to_receive();
    assert_eq!(set.sources().len(), 2);

    // Dropping a sender makes its receiver disappear from the set.
    drop(tx_sdk);
    assert_nothing_to_receive();
    assert_eq!(
        set.sources(),
        vec![Arc::new(SmartChannelSource::File("path".into()))]
    );

    // Dropping the last sender leaves the set empty.
    drop(tx_file);
    assert_nothing_to_receive();
    assert_eq!(set.sources(), vec![]);
}