pub mod shutdown;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::pin::Pin;
use re_byte_size::SizeBytes;
use re_log_encoding::codec::wire::decoder::Decode as _;
use re_log_types::TableMsg;
use re_protos::sdk_comms::v1alpha1::ReadTablesRequest;
use re_protos::sdk_comms::v1alpha1::ReadTablesResponse;
use re_protos::sdk_comms::v1alpha1::WriteMessagesRequest;
use re_protos::sdk_comms::v1alpha1::WriteTableRequest;
use re_protos::sdk_comms::v1alpha1::WriteTableResponse;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::Stream;
use tokio_stream::StreamExt as _;
use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tower_http::cors::CorsLayer;
use re_memory::MemoryLimit;
use re_protos::{
common::v1alpha1::{
DataframePart as DataframePartProto, StoreKind as StoreKindProto, TableId as TableIdProto,
},
log_msg::v1alpha1::LogMsg as LogMsgProto,
sdk_comms::v1alpha1::{
message_proxy_service_server, ReadMessagesRequest, ReadMessagesResponse,
WriteMessagesResponse,
},
};
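/// Default port of the Rerun gRPC message proxy server.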
pub const DEFAULT_SERVER_PORT: u16 = 9876;
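/// By default the server keeps an unbounded in-memory history of received messages.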
pub const DEFAULT_MEMORY_LIMIT: MemoryLimit = MemoryLimit::UNLIMITED;
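// Effectively unlimited per-message size (u32::MAX bytes) in both directions.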
const MAX_DECODING_MESSAGE_SIZE: usize = u32::MAX as usize;
const MAX_ENCODING_MESSAGE_SIZE: usize = MAX_DECODING_MESSAGE_SIZE;
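// Channel capacity: the number of `Msg` slots that fit in 16 MiB of inline struct size,
// rounded up to the next power of two.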
const MESSAGE_QUEUE_CAPACITY: usize =
(16 * 1024 * 1024 / std::mem::size_of::<Msg>()).next_power_of_two();
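/// Start a gRPC server that listens on `addr` until `shutdown` resolves.
///
/// Messages written by clients are buffered (subject to `memory_limit`) and re-broadcast
/// to every client that reads from the server, including the buffered history.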
pub async fn serve(
addr: SocketAddr,
memory_limit: MemoryLimit,
shutdown: shutdown::Shutdown,
) -> Result<(), tonic::transport::Error> {
serve_impl(addr, MessageProxy::new(memory_limit), shutdown).await
}
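/// Shared server setup: bind `addr`, layer CORS and gRPC-Web support on top of the
/// `MessageProxyService`, and serve until `shutdown` resolves.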
async fn serve_impl(
addr: SocketAddr,
message_proxy: MessageProxy,
shutdown: shutdown::Shutdown,
) -> Result<(), tonic::transport::Error> {
let tcp_listener = TcpListener::bind(addr)
.await
.unwrap_or_else(|err| panic!("failed to bind listener on {addr}: {err}"));
let incoming =
TcpIncoming::from_listener(tcp_listener, true, None).expect("failed to init listener");
let connect_addr = if addr.ip().is_loopback() || addr.ip().is_unspecified() {
format!("rerun+http://127.0.0.1:{}/proxy", addr.port())
} else {
format!("rerun+http://{addr}/proxy")
};
re_log::info!("Listening for gRPC connections on {addr}. Connect by running `rerun --connect {connect_addr}`");
let cors = CorsLayer::very_permissive();
let grpc_web = tonic_web::GrpcWebLayer::new();
let routes = {
let mut routes_builder = tonic::service::Routes::builder();
routes_builder.add_service(
re_protos::sdk_comms::v1alpha1::message_proxy_service_server::MessageProxyServiceServer::new(
message_proxy,
)
.max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
.max_encoding_message_size(MAX_ENCODING_MESSAGE_SIZE),
);
routes_builder.routes()
};
Server::builder()
        .accept_http1(true)
        .layer(cors)
        .layer(grpc_web)
        .add_routes(routes)
.serve_with_incoming_shutdown(incoming, shutdown.wait())
.await
}
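/// Like [`serve`], but additionally forwards every `LogMsg` received on `channel_rx`
/// to connected clients.
///
/// Runs until `shutdown` resolves; the forwarding task stops when the channel disconnects.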
pub async fn serve_from_channel(
addr: SocketAddr,
memory_limit: MemoryLimit,
shutdown: shutdown::Shutdown,
channel_rx: re_smart_channel::Receiver<re_log_types::LogMsg>,
) {
let message_proxy = MessageProxy::new(memory_limit);
let event_tx = message_proxy.event_tx.clone();
tokio::spawn(async move {
use re_smart_channel::SmartMessagePayload;
loop {
let msg = match channel_rx.try_recv() {
Ok(msg) => match msg.payload {
SmartMessagePayload::Msg(msg) => msg,
SmartMessagePayload::Flush { on_flush_done } => {
                        on_flush_done();
                        continue;
}
SmartMessagePayload::Quit(err) => {
if let Some(err) = err {
re_log::debug!("smart channel sender quit: {err}");
} else {
re_log::debug!("smart channel sender quit");
}
break;
}
},
Err(re_smart_channel::TryRecvError::Disconnected) => {
re_log::debug!("smart channel sender closed, closing receiver");
break;
}
Err(re_smart_channel::TryRecvError::Empty) => {
tokio::task::yield_now().await;
continue;
}
};
let msg = match re_log_encoding::protobuf_conversions::log_msg_to_proto(
msg,
re_log_encoding::Compression::LZ4,
) {
Ok(msg) => msg,
Err(err) => {
re_log::error!("failed to encode message: {err}");
continue;
}
};
if event_tx.send(Event::Message(msg)).await.is_err() {
re_log::debug!("shut down, closing sender");
break;
}
}
});
if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
re_log::error!("message proxy server crashed: {err}");
}
}
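/// Spawn the server plus a task that drains `rxs`, forwarding every received `LogMsg`
/// to connected clients. Returns immediately; both tasks run on the current tokio runtime.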
pub fn spawn_from_rx_set(
addr: SocketAddr,
memory_limit: MemoryLimit,
shutdown: shutdown::Shutdown,
rxs: re_smart_channel::ReceiveSet<re_log_types::LogMsg>,
) {
let message_proxy = MessageProxy::new(memory_limit);
let event_tx = message_proxy.event_tx.clone();
tokio::spawn(async move {
if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
re_log::error!("message proxy server crashed: {err}");
}
});
tokio::spawn(async move {
loop {
let Some(msg) = rxs.try_recv().and_then(|(_, m)| m.into_data()) else {
if rxs.is_empty() {
break;
}
tokio::task::yield_now().await;
continue;
};
let msg = match re_log_encoding::protobuf_conversions::log_msg_to_proto(
msg,
re_log_encoding::Compression::LZ4,
) {
Ok(msg) => msg,
Err(err) => {
re_log::error!("failed to encode message: {err}");
continue;
}
};
if event_tx.send(Event::Message(msg)).await.is_err() {
re_log::debug!("shut down, closing sender");
break;
}
}
});
}
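/// Spawn the server and return channels yielding every `LogMsg` and `TableMsg` that
/// clients write to it, decoded from their protobuf representation.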
pub fn spawn_with_recv(
addr: SocketAddr,
memory_limit: MemoryLimit,
shutdown: shutdown::Shutdown,
) -> (
re_smart_channel::Receiver<re_log_types::LogMsg>,
crossbeam::channel::Receiver<re_log_types::TableMsg>,
) {
let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
re_uri::Scheme::RerunHttp,
addr,
));
let (channel_log_tx, channel_log_rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::MessageProxy(uri.clone()),
re_smart_channel::SmartChannelSource::MessageProxy(uri),
);
let (channel_table_tx, channel_table_rx) = crossbeam::channel::unbounded();
let (message_proxy, mut broadcast_log_rx, mut broadcast_table_rx) =
MessageProxy::new_with_recv(memory_limit);
tokio::spawn(async move {
if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
re_log::error!("message proxy server crashed: {err}");
}
});
tokio::spawn(async move {
loop {
let msg = match broadcast_log_rx.recv().await {
Ok(msg) => re_log_encoding::protobuf_conversions::log_msg_from_proto(msg),
Err(broadcast::error::RecvError::Closed) => {
re_log::debug!("message proxy server shut down, closing receiver");
channel_log_tx.quit(None).ok();
break;
}
Err(broadcast::error::RecvError::Lagged(n)) => {
re_log::warn!(
"message proxy receiver dropped {n} messages due to backpressure"
);
continue;
}
};
match msg {
Ok(msg) => {
if channel_log_tx.send(msg).is_err() {
re_log::debug!(
"message proxy smart channel receiver closed, closing sender"
);
break;
}
}
Err(err) => {
re_log::error!("dropping LogMsg due to failed decode: {err}");
continue;
}
}
}
});
tokio::spawn(async move {
loop {
let msg = match broadcast_table_rx.recv().await {
Ok(msg) => msg.data.decode().map(|data| TableMsg {
id: msg.id.into(),
data,
}),
Err(broadcast::error::RecvError::Closed) => {
re_log::debug!("message proxy server shut down, closing receiver");
break;
}
Err(broadcast::error::RecvError::Lagged(n)) => {
re_log::warn!(
"message proxy receiver dropped {n} messages due to backpressure"
);
continue;
}
};
match msg {
Ok(msg) => {
if channel_table_tx.send(msg).is_err() {
re_log::debug!(
"message proxy smart channel receiver closed, closing sender"
);
break;
}
}
Err(err) => {
re_log::error!("dropping table due to failed decode: {err}");
continue;
}
}
}
});
(channel_log_rx, channel_table_rx)
}
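/// Commands handled by the [`EventLoop`]: a new read client asking for the history and
/// live subscriptions, or a freshly written log/table message.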
enum Event {
NewClient(
oneshot::Sender<(
Vec<Msg>,
broadcast::Receiver<LogMsgProto>,
broadcast::Receiver<TableMsgProto>,
)>,
),
Message(LogMsgProto),
Table(TableMsgProto),
}
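/// A table message still in protobuf form: its id plus the encoded dataframe.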
#[derive(Clone)]
struct TableMsgProto {
id: TableIdProto,
data: DataframePartProto,
}
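/// A buffered message, replayed to clients that connect after it was written.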
#[derive(Clone)]
enum Msg {
LogMsg(LogMsgProto),
Table(TableMsgProto),
}
impl Msg {
fn total_size_bytes(&self) -> u64 {
match self {
Self::LogMsg(log_msg) => log_msg.total_size_bytes(),
Self::Table(table) => table.total_size_bytes(),
}
}
}
impl From<LogMsgProto> for Msg {
fn from(value: LogMsgProto) -> Self {
Self::LogMsg(value)
}
}
impl From<TableMsgProto> for Msg {
fn from(value: TableMsgProto) -> Self {
Self::Table(value)
}
}
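/// Single task that owns the message history: it receives [`Event`]s, stores messages for
/// replay, and fans live messages out on the broadcast channels.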
struct EventLoop {
server_memory_limit: MemoryLimit,
broadcast_log_tx: broadcast::Sender<LogMsgProto>,
broadcast_table_tx: broadcast::Sender<TableMsgProto>,
event_rx: mpsc::Receiver<Event>,
ordered_message_queue: VecDeque<Msg>,
ordered_message_bytes: u64,
persistent_message_queue: VecDeque<LogMsgProto>,
}
impl EventLoop {
fn new(
server_memory_limit: MemoryLimit,
event_rx: mpsc::Receiver<Event>,
broadcast_log_tx: broadcast::Sender<LogMsgProto>,
broadcast_table_tx: broadcast::Sender<TableMsgProto>,
) -> Self {
Self {
server_memory_limit,
broadcast_log_tx,
broadcast_table_tx,
event_rx,
ordered_message_queue: Default::default(),
ordered_message_bytes: 0,
persistent_message_queue: Default::default(),
}
}
async fn run_in_place(mut self) {
loop {
let Some(event) = self.event_rx.recv().await else {
break;
};
match event {
Event::NewClient(channel) => self.handle_new_client(channel),
Event::Message(msg) => self.handle_msg(msg),
Event::Table(table) => self.handle_table(table),
}
}
}
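    /// Reply with the buffered history (persistent messages first, then ordered messages)
    /// plus fresh subscriptions to the live broadcast channels.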
fn handle_new_client(
&self,
channel: oneshot::Sender<(
Vec<Msg>,
broadcast::Receiver<LogMsgProto>,
broadcast::Receiver<TableMsgProto>,
)>,
) {
channel
.send((
self.persistent_message_queue
.iter()
.cloned()
.map(Msg::from)
.chain(self.ordered_message_queue.iter().cloned())
.collect(),
self.broadcast_log_tx.subscribe(),
self.broadcast_table_tx.subscribe(),
))
.ok();
}
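    /// Store infos, blueprint activation commands, and blueprint-store Arrow messages are kept
    /// forever so that late-joining clients still receive them; recording data goes into the
    /// size-limited ordered queue.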
fn handle_msg(&mut self, msg: LogMsgProto) {
self.broadcast_log_tx.send(msg.clone()).ok();
self.gc_if_using_too_much_ram();
let Some(inner) = &msg.msg else {
re_log::error!(
"{}",
re_protos::missing_field!(re_protos::log_msg::v1alpha1::LogMsg, "msg")
);
return;
};
use re_protos::log_msg::v1alpha1::log_msg::Msg;
match inner {
Msg::SetStoreInfo(..) | Msg::BlueprintActivationCommand(..) => {
self.persistent_message_queue.push_back(msg);
}
Msg::ArrowMsg(ref inner)
if inner
.store_id
.as_ref()
.is_some_and(|id| id.kind() == StoreKindProto::Blueprint) =>
{
self.persistent_message_queue.push_back(msg);
}
Msg::ArrowMsg(..) => {
let approx_size_bytes = msg.total_size_bytes();
self.ordered_message_bytes += approx_size_bytes;
self.ordered_message_queue.push_back(msg.into());
}
}
}
fn handle_table(&mut self, table: TableMsgProto) {
self.broadcast_table_tx.send(table.clone()).ok();
self.gc_if_using_too_much_ram();
let approx_size_bytes = table.total_size_bytes();
self.ordered_message_bytes += approx_size_bytes;
self.ordered_message_queue.push_back(Msg::Table(table));
}
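    /// Drop the oldest messages from the ordered (non-persistent) queue until the tracked
    /// size is back under the configured memory limit.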
fn gc_if_using_too_much_ram(&mut self) {
re_tracing::profile_function!();
let Some(max_bytes) = self.server_memory_limit.max_bytes else {
return;
};
let max_bytes = max_bytes as u64;
if max_bytes >= self.ordered_message_bytes {
return;
};
{
re_tracing::profile_scope!("Drop messages");
re_log::info_once!(
"Memory limit ({}) exceeded. Dropping old log messages from the server. Clients connecting after this will not see the full history.",
re_format::format_bytes(max_bytes as _)
);
let bytes_to_free = self.ordered_message_bytes - max_bytes;
let mut bytes_dropped = 0;
let mut messages_dropped = 0;
            while bytes_dropped < bytes_to_free {
                // Only drop from the ordered queue; persistent (blueprint) messages are never dropped.
                if let Some(msg) = self.ordered_message_queue.pop_front() {
                    bytes_dropped += msg.total_size_bytes();
                    messages_dropped += 1;
                } else {
                    break;
                }
            }
            // Keep the byte accounting in sync with the queue we just shrank, otherwise
            // every later message would trigger another round of dropping.
            self.ordered_message_bytes = self.ordered_message_bytes.saturating_sub(bytes_dropped);
re_log::trace!(
"Dropped {} bytes in {messages_dropped} message(s)",
re_format::format_bytes(bytes_dropped as _)
);
}
}
}
impl SizeBytes for TableMsgProto {
fn heap_size_bytes(&self) -> u64 {
let Self { id, data } = self;
id.heap_size_bytes() + data.heap_size_bytes()
}
}
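/// Handle implementing the `MessageProxyService` gRPC service: write RPCs push [`Event`]s to
/// the [`EventLoop`] task, read RPCs subscribe to its broadcast channels, prefixed by the
/// buffered history.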
pub struct MessageProxy {
_queue_task_handle: tokio::task::JoinHandle<()>,
event_tx: mpsc::Sender<Event>,
}
impl MessageProxy {
pub fn new(server_memory_limit: MemoryLimit) -> Self {
Self::new_with_recv(server_memory_limit).0
}
fn new_with_recv(
server_memory_limit: MemoryLimit,
) -> (
Self,
broadcast::Receiver<LogMsgProto>,
broadcast::Receiver<TableMsgProto>,
) {
let (event_tx, event_rx) = mpsc::channel(MESSAGE_QUEUE_CAPACITY);
let (broadcast_log_tx, broadcast_log_rx) = broadcast::channel(MESSAGE_QUEUE_CAPACITY);
let (broadcast_table_tx, broadcast_table_rx) = broadcast::channel(MESSAGE_QUEUE_CAPACITY);
let task_handle = tokio::spawn(async move {
EventLoop::new(
server_memory_limit,
event_rx,
broadcast_log_tx,
broadcast_table_tx,
)
.run_in_place()
.await;
});
(
Self {
_queue_task_handle: task_handle,
event_tx,
},
broadcast_log_rx,
broadcast_table_rx,
)
}
async fn push_msg(&self, msg: LogMsgProto) {
self.event_tx.send(Event::Message(msg)).await.ok();
}
async fn push_table(&self, table: TableMsgProto) {
self.event_tx.send(Event::Table(table)).await.ok();
}
async fn new_client_message_stream(&self) -> ReadMessagesStream {
let (sender, receiver) = oneshot::channel();
if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
re_log::error!("Error accepting new client: {err}");
return Box::pin(tokio_stream::empty());
};
let (history, log_channel, _) = match receiver.await {
Ok(v) => v,
Err(err) => {
re_log::error!("Error accepting new client: {err}");
return Box::pin(tokio_stream::empty());
}
};
let history = tokio_stream::iter(
history
.into_iter()
.filter_map(|log_msg| {
if let Msg::LogMsg(log_msg) = log_msg {
Some(ReadMessagesResponse {
log_msg: Some(log_msg),
})
} else {
None
}
})
.map(Ok),
);
let channel = BroadcastStream::new(log_channel).map(|result| {
result
.map(|log_msg| ReadMessagesResponse {
log_msg: Some(log_msg),
})
.map_err(|err| {
re_log::error!("Error reading message from broadcast channel: {err}");
tonic::Status::internal("internal channel error")
})
});
Box::pin(history.chain(channel))
}
async fn new_client_table_stream(&self) -> ReadTablesStream {
let (sender, receiver) = oneshot::channel();
if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
re_log::error!("Error accepting new client: {err}");
return Box::pin(tokio_stream::empty());
};
let (history, _, table_channel) = match receiver.await {
Ok(v) => v,
Err(err) => {
re_log::error!("Error accepting new client: {err}");
return Box::pin(tokio_stream::empty());
}
};
let history = tokio_stream::iter(
history
.into_iter()
.filter_map(|table| {
if let Msg::Table(table) = table {
Some(ReadTablesResponse {
id: Some(table.id),
data: Some(table.data),
})
} else {
None
}
})
.map(Ok),
);
let channel = BroadcastStream::new(table_channel).map(|result| {
result
.map(|table| ReadTablesResponse {
id: Some(table.id),
data: Some(table.data),
})
.map_err(|err| {
re_log::error!("Error reading message from broadcast channel: {err}");
tonic::Status::internal("internal channel error")
})
});
Box::pin(history.chain(channel))
}
}
type ReadMessagesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadMessagesResponse>> + Send>>;
type ReadTablesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadTablesResponse>> + Send>>;
#[tonic::async_trait]
impl message_proxy_service_server::MessageProxyService for MessageProxy {
async fn write_messages(
&self,
request: tonic::Request<tonic::Streaming<WriteMessagesRequest>>,
) -> tonic::Result<tonic::Response<WriteMessagesResponse>> {
let mut stream = request.into_inner();
loop {
match stream.message().await {
Ok(Some(WriteMessagesRequest {
log_msg: Some(log_msg),
})) => {
self.push_msg(log_msg).await;
}
Ok(Some(WriteMessagesRequest { log_msg: None })) => {
re_log::warn!("missing log_msg in WriteMessagesRequest");
}
Ok(None) => {
break;
}
Err(err) => {
re_log::error!("Error while receiving messages: {err}");
break;
}
}
}
Ok(tonic::Response::new(WriteMessagesResponse {}))
}
type ReadMessagesStream = ReadMessagesStream;
async fn read_messages(
&self,
_: tonic::Request<ReadMessagesRequest>,
) -> tonic::Result<tonic::Response<Self::ReadMessagesStream>> {
Ok(tonic::Response::new(self.new_client_message_stream().await))
}
type ReadTablesStream = ReadTablesStream;
async fn write_table(
&self,
request: tonic::Request<WriteTableRequest>,
) -> tonic::Result<tonic::Response<WriteTableResponse>> {
if let WriteTableRequest {
id: Some(id),
data: Some(data),
} = request.into_inner()
{
self.push_table(TableMsgProto { id, data }).await;
} else {
re_log::warn!("malformed `WriteTableRequest`");
}
Ok(tonic::Response::new(WriteTableResponse {}))
}
async fn read_tables(
&self,
_: tonic::Request<ReadTablesRequest>,
) -> tonic::Result<tonic::Response<Self::ReadTablesStream>> {
Ok(tonic::Response::new(self.new_client_table_stream().await))
}
}
#[cfg(test)]
mod tests {
use super::*;
use re_build_info::CrateVersion;
use re_chunk::RowId;
use re_log_encoding::protobuf_conversions::{log_msg_from_proto, log_msg_to_proto};
use re_log_encoding::Compression;
use re_log_types::{
ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource,
};
use re_protos::sdk_comms::v1alpha1::{
message_proxy_service_client::MessageProxyServiceClient,
message_proxy_service_server::MessageProxyServiceServer,
};
use similar_asserts::assert_eq;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use tonic::transport::server::TcpIncoming;
use tonic::transport::Channel;
use tonic::transport::Endpoint;
#[derive(Clone)]
struct Completion(Arc<CancellationToken>);
impl Drop for Completion {
fn drop(&mut self) {
self.finish();
}
}
impl Completion {
fn new() -> Self {
Self(Arc::new(CancellationToken::new()))
}
fn finish(&self) {
self.0.cancel();
}
async fn wait(&self) {
self.0.cancelled().await;
}
}
fn fake_log_stream_blueprint(n: usize) -> Vec<LogMsg> {
let store_id = StoreId::random(StoreKind::Blueprint);
let mut messages = Vec::new();
messages.push(LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *RowId::new(),
info: StoreInfo {
application_id: ApplicationId("test".to_owned()),
store_id: store_id.clone(),
cloned_from: None,
store_source: StoreSource::RustSdk {
rustc_version: String::new(),
llvm_version: String::new(),
},
store_version: Some(CrateVersion::LOCAL),
},
}));
for _ in 0..n {
messages.push(LogMsg::ArrowMsg(
store_id.clone(),
re_chunk::Chunk::builder("test_entity".into())
.with_archetype(
re_chunk::RowId::new(),
re_log_types::TimePoint::default().with(
re_log_types::Timeline::new_sequence("blueprint"),
re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
),
&re_types::blueprint::archetypes::Background::new(
re_types::blueprint::components::BackgroundKind::SolidColor,
)
.with_color([255, 0, 0]),
)
.build()
.unwrap()
.to_arrow_msg()
.unwrap(),
));
}
messages.push(LogMsg::BlueprintActivationCommand(
re_log_types::BlueprintActivationCommand {
blueprint_id: store_id,
make_active: true,
make_default: true,
},
));
messages
}
fn fake_log_stream_recording(n: usize) -> Vec<LogMsg> {
let store_id = StoreId::random(StoreKind::Recording);
let mut messages = Vec::new();
messages.push(LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *RowId::new(),
info: StoreInfo {
application_id: ApplicationId("test".to_owned()),
store_id: store_id.clone(),
cloned_from: None,
store_source: StoreSource::RustSdk {
rustc_version: String::new(),
llvm_version: String::new(),
},
store_version: Some(CrateVersion::LOCAL),
},
}));
for _ in 0..n {
messages.push(LogMsg::ArrowMsg(
store_id.clone(),
re_chunk::Chunk::builder("test_entity".into())
.with_archetype(
re_chunk::RowId::new(),
re_log_types::TimePoint::default().with(
re_log_types::Timeline::new_sequence("log_time"),
re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
),
&re_types::archetypes::Points2D::new([(0.0, 0.0), (1.0, 1.0), (2.0, 2.0)]),
)
.build()
.unwrap()
.to_arrow_msg()
.unwrap(),
));
}
messages
}
async fn setup() -> (Completion, SocketAddr) {
setup_with_memory_limit(MemoryLimit::UNLIMITED).await
}
async fn setup_with_memory_limit(memory_limit: MemoryLimit) -> (Completion, SocketAddr) {
let completion = Completion::new();
let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = tcp_listener.local_addr().unwrap();
tokio::spawn({
let completion = completion.clone();
async move {
tonic::transport::Server::builder()
.add_service(MessageProxyServiceServer::new(super::MessageProxy::new(
memory_limit,
)))
.serve_with_incoming_shutdown(
TcpIncoming::from_listener(tcp_listener, true, None).unwrap(),
completion.wait(),
)
.await
.unwrap();
}
});
(completion, addr)
}
async fn make_client(addr: SocketAddr) -> MessageProxyServiceClient<Channel> {
MessageProxyServiceClient::new(
Endpoint::from_shared(format!("http://{addr}"))
.unwrap()
.connect()
.await
.unwrap(),
)
}
async fn read_log_stream(
log_stream: &mut tonic::Response<tonic::Streaming<ReadMessagesResponse>>,
n: usize,
) -> Vec<LogMsg> {
let mut stream_ref = log_stream
.get_mut()
.map(|result| log_msg_from_proto(result.unwrap().log_msg.unwrap()).unwrap());
let mut messages = Vec::new();
for _ in 0..n {
messages.push(stream_ref.next().await.unwrap());
}
messages
}
#[tokio::test]
async fn pubsub_basic() {
let (completion, addr) = setup().await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_blueprint(3);
let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
client
.write_messages(tokio_stream::iter(
messages
.clone()
.into_iter()
.map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap())
.map(|msg| WriteMessagesRequest { log_msg: Some(msg) }),
))
.await
.unwrap();
let actual = read_log_stream(&mut log_stream, messages.len()).await;
assert_eq!(messages, actual);
assert!(matches!(messages[0], LogMsg::SetStoreInfo(..)));
assert!(matches!(actual[0], LogMsg::SetStoreInfo(..)));
completion.finish();
}
#[tokio::test]
async fn pubsub_history() {
let (completion, addr) = setup().await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_blueprint(3);
client
.write_messages(tokio_stream::iter(
messages
.clone()
.into_iter()
.map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap())
.map(|msg| WriteMessagesRequest { log_msg: Some(msg) }),
))
.await
.unwrap();
let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
let actual = read_log_stream(&mut log_stream, messages.len()).await;
assert_eq!(messages, actual);
completion.finish();
}
#[tokio::test]
async fn one_producer_many_consumers() {
let (completion, addr) = setup().await;
        let mut producer = make_client(addr).await;
        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
let messages = fake_log_stream_blueprint(3);
let mut log_streams = vec![];
for consumer in &mut consumers {
log_streams.push(
consumer
.read_messages(ReadMessagesRequest {})
.await
.unwrap(),
);
}
producer
.write_messages(tokio_stream::iter(
messages
.clone()
.into_iter()
.map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap())
.map(|msg| WriteMessagesRequest { log_msg: Some(msg) }),
))
.await
.unwrap();
for log_stream in &mut log_streams {
let actual = read_log_stream(log_stream, messages.len()).await;
assert_eq!(messages, actual);
}
completion.finish();
}
#[tokio::test]
async fn many_producers_many_consumers() {
let (completion, addr) = setup().await;
let mut producers = vec![make_client(addr).await, make_client(addr).await];
let mut consumers = vec![make_client(addr).await, make_client(addr).await];
let messages = fake_log_stream_blueprint(3);
let mut log_streams = vec![];
for consumer in &mut consumers {
log_streams.push(
consumer
.read_messages(ReadMessagesRequest {})
.await
.unwrap(),
);
}
for producer in &mut producers {
producer
.write_messages(tokio_stream::iter(
messages
.clone()
.into_iter()
.map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap())
.map(|msg| WriteMessagesRequest { log_msg: Some(msg) }),
))
.await
.unwrap();
}
let expected = [messages.clone(), messages.clone()].concat();
for log_stream in &mut log_streams {
let actual = read_log_stream(log_stream, expected.len()).await;
assert_eq!(actual, expected);
}
completion.finish();
}
#[tokio::test]
async fn memory_limit_drops_messages() {
let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
let mut client = make_client(addr).await;
let messages = fake_log_stream_recording(3);
client
.write_messages(tokio_stream::iter(
messages
.clone()
.into_iter()
.map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap())
.map(|msg| WriteMessagesRequest { log_msg: Some(msg) }),
))
.await
.unwrap();
let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
let mut actual = vec![];
loop {
let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
tokio::pin!(timeout_stream);
let timeout_result = timeout_stream.try_next().await;
match timeout_result {
Ok(Some(value)) => {
actual.push(log_msg_from_proto(value.unwrap().log_msg.unwrap()).unwrap());
}
Ok(None) | Err(_) => break,
}
}
assert_eq!(actual.len(), 2);
assert_eq!(&actual[0], &messages[0]);
assert_eq!(&actual[1], messages.last().unwrap());
completion.finish();
}
#[tokio::test]
async fn memory_limit_does_not_drop_blueprint() {
let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
let mut client = make_client(addr).await;
let messages = fake_log_stream_blueprint(3);
client
.write_messages(tokio_stream::iter(
messages
.clone()
.into_iter()
.map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap())
.map(|msg| WriteMessagesRequest { log_msg: Some(msg) }),
))
.await
.unwrap();
let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
let mut actual = vec![];
loop {
let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
tokio::pin!(timeout_stream);
let timeout_result = timeout_stream.try_next().await;
match timeout_result {
Ok(Some(value)) => {
actual.push(log_msg_from_proto(value.unwrap().log_msg.unwrap()).unwrap());
}
Ok(None) | Err(_) => break,
}
}
assert_eq!(messages, actual);
completion.finish();
}
#[tokio::test]
async fn memory_limit_does_not_interrupt_stream() {
let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_blueprint(3);
let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
client
.write_messages(tokio_stream::iter(
messages
.clone()
.into_iter()
.map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap())
.map(|msg| WriteMessagesRequest { log_msg: Some(msg) }),
))
.await
.unwrap();
let actual = read_log_stream(&mut log_stream, messages.len()).await;
assert_eq!(messages, actual);
completion.finish();
}
}