use std::{
hash::{Hash as _, Hasher},
sync::Arc,
time::{Duration, Instant},
};
use arrow2::array::{Array as Arrow2Array, PrimitiveArray as Arrow2PrimitiveArray};
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline};
use re_types_core::{ComponentDescriptor, SizeBytes as _};
use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
/// Errors that can occur when creating or configuring a [`ChunkBatcher`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkBatcherError {
    /// A configuration value (read from an environment variable) could not be parsed.
    #[error("Failed to parse config: '{name}={value}': {err}")]
    ParseConfig {
        /// Name of the offending environment variable.
        name: &'static str,

        /// The raw value that was rejected.
        value: String,

        /// The underlying parse/conversion error.
        err: Box<dyn std::error::Error + Send + Sync>,
    },

    /// The background batching thread could not be spawned.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        /// Name that was given to the thread.
        name: &'static str,

        /// The underlying spawn error.
        err: Box<dyn std::error::Error + Send + Sync>,
    },
}

/// Result type used throughout the chunk batcher.
pub type ChunkBatcherResult<T> = Result<T, ChunkBatcherError>;
/// Optional callbacks that can be attached to a [`ChunkBatcher`] through its configuration.
#[derive(Clone, Default)]
pub struct BatcherHooks {
    /// Called by the batching thread right after a row has been appended, with every row that is
    /// currently pending for that row's entity path (the new one included).
    #[allow(clippy::type_complexity)]
    pub on_insert: Option<Arc<dyn Fn(&[PendingRow]) + Send + Sync>>,

    /// Release callback for Arrow chunks.
    ///
    /// NOTE(review): not invoked by the batching code in this file — presumably consumed by
    /// whoever builds on top of the batcher's configuration.
    pub on_release: Option<re_log_types::ArrowChunkReleaseCallback>,
}

impl BatcherHooks {
    /// No hooks at all; unlike `Default::default()`, usable in `const` contexts.
    pub const NONE: Self = Self {
        on_insert: None,
        on_release: None,
    };
}

impl PartialEq for BatcherHooks {
    /// Closures cannot be compared structurally: two `on_insert` hooks are equal only when they
    /// point to the very same allocation.
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive destructuring, so that adding a field forces this impl to be revisited.
        let Self {
            on_insert,
            on_release,
        } = self;

        let same_on_insert = match (on_insert.as_ref(), other.on_insert.as_ref()) {
            (None, None) => true,
            (Some(lhs), Some(rhs)) => Arc::ptr_eq(lhs, rhs),
            (Some(_), None) | (None, Some(_)) => false,
        };

        same_on_insert && *on_release == other.on_release
    }
}

impl std::fmt::Debug for BatcherHooks {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            on_insert,
            on_release,
        } = self;

        // The insertion closure has no useful `Debug` output: only report whether it is set.
        let on_insert_summary: Option<&str> = on_insert.as_ref().map(|_| "…");

        f.debug_struct("BatcherHooks")
            .field("on_insert", &on_insert_summary)
            .field("on_release", on_release)
            .finish()
    }
}
/// Thresholds and channel sizes controlling a [`ChunkBatcher`].
///
/// See [`ChunkBatcherConfig::DEFAULT`] and [`ChunkBatcherConfig::from_env`].
#[derive(Clone, Debug, PartialEq)]
pub struct ChunkBatcherConfig {
    /// Period of the batching thread's ticker. On every tick, all pending rows of all entities
    /// are flushed, unless a flush already happened for another reason since the previous tick.
    ///
    /// Overridable with `RERUN_FLUSH_TICK_SECS`.
    pub flush_tick: Duration,

    /// Flush an entity's pending rows once their estimated total size (`total_size_bytes`)
    /// reaches this many bytes.
    ///
    /// Overridable with `RERUN_FLUSH_NUM_BYTES`.
    pub flush_num_bytes: u64,

    /// Flush an entity's pending rows once there are at least this many of them.
    ///
    /// Overridable with `RERUN_FLUSH_NUM_ROWS`.
    pub flush_num_rows: u64,

    /// While building a chunk, if one of its time columns is unsorted, the chunk is cut once it
    /// holds this many rows.
    ///
    /// Overridable with `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED`.
    pub chunk_max_rows_if_unsorted: u64,

    /// Capacity of the command channel feeding the batching thread; `None` means unbounded.
    /// When bounded and full, sending commands (push/flush) blocks until there is room.
    pub max_commands_in_flight: Option<u64>,

    /// Capacity of the output chunk channel; `None` means unbounded.
    /// When bounded and full, the batching thread blocks until chunks are read.
    pub max_chunks_in_flight: Option<u64>,

    /// Optional callbacks, see [`BatcherHooks`].
    pub hooks: BatcherHooks,
}

impl Default for ChunkBatcherConfig {
    /// Same as [`ChunkBatcherConfig::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
impl ChunkBatcherConfig {
    /// Default configuration: flush every 8ms, or as soon as 1 MiB of data is pending for an
    /// entity. No row-count threshold; unbounded channels; no hooks.
    pub const DEFAULT: Self = Self {
        flush_tick: Duration::from_millis(8),
        flush_num_bytes: 1024 * 1024,
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
        hooks: BatcherHooks::NONE,
    };

    /// Always flushes: with thresholds of 0, every appended row triggers an immediate flush.
    pub const ALWAYS: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: 0,
        flush_num_rows: 0,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
        hooks: BatcherHooks::NONE,
    };

    /// Never flushes on its own (maximal tick and thresholds): data only comes out on an
    /// explicit flush or when the batcher shuts down.
    pub const NEVER: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: u64::MAX,
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
        hooks: BatcherHooks::NONE,
    };

    /// Overrides `flush_tick`; value in (possibly fractional) seconds, e.g. `0.3`.
    pub const ENV_FLUSH_TICK: &'static str = "RERUN_FLUSH_TICK_SECS";

    /// Overrides `flush_num_bytes`; anything `re_format::parse_bytes` accepts, or a plain integer.
    pub const ENV_FLUSH_NUM_BYTES: &'static str = "RERUN_FLUSH_NUM_BYTES";

    /// Overrides `flush_num_rows`; a plain integer.
    pub const ENV_FLUSH_NUM_ROWS: &'static str = "RERUN_FLUSH_NUM_ROWS";

    /// Overrides `chunk_max_rows_if_unsorted`; a plain integer.
    pub const ENV_CHUNK_MAX_ROWS_IF_UNSORTED: &'static str = "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED";

    /// Deprecated alias of [`Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED`].
    #[deprecated(note = "use `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` instead")]
    const ENV_MAX_CHUNK_ROWS_IF_UNSORTED: &'static str = "RERUN_MAX_CHUNK_ROWS_IF_UNSORTED";

    /// Builds a config from [`Self::DEFAULT`], overridden by the `RERUN_*` environment variables.
    ///
    /// # Errors
    ///
    /// See [`Self::apply_env`].
    #[inline]
    pub fn from_env() -> ChunkBatcherResult<Self> {
        Self::default().apply_env()
    }

    /// Returns a copy of `self` with every set `RERUN_*` environment variable listed above
    /// applied on top; unset variables leave the corresponding field untouched.
    ///
    /// If both `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` and its deprecated alias
    /// `RERUN_MAX_CHUNK_ROWS_IF_UNSORTED` are set, the deprecated one is applied last and wins.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkBatcherError::ParseConfig`] if a set variable cannot be parsed, or if the
    /// flush tick is not a representable duration (negative, NaN, infinite or too large).
    pub fn apply_env(&self) -> ChunkBatcherResult<Self> {
        let mut new = self.clone();

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_TICK) {
            let flush_duration_secs: f64 =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                })?;

            // `Duration::from_secs_f64` panics on negative, NaN or overflowing input: report
            // such user-provided values as configuration errors instead of crashing.
            new.flush_tick = Duration::try_from_secs_f64(flush_duration_secs).map_err(|err| {
                ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                }
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {
            if let Some(num_bytes) = re_format::parse_bytes(&s) {
                // Human-readable size.
                // NOTE(review): a negative size would silently become positive here.
                new.flush_num_bytes = num_bytes.unsigned_abs();
            } else {
                // Fall back to a plain integer.
                new.flush_num_bytes = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_NUM_BYTES,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
            }
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) {
            new.flush_num_rows = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                name: Self::ENV_FLUSH_NUM_ROWS,
                value: s.clone(),
                err: Box::new(err),
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        // Deprecated alias, still honored (and applied last) for backward compatibility.
        #[allow(deprecated)]
        if let Ok(s) = std::env::var(Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        Ok(new)
    }
}
#[test]
fn chunk_batcher_config() {
    // Every environment variable read by `ChunkBatcherConfig::apply_env`.
    const ENV_VARS: [&str; 5] = [
        "RERUN_FLUSH_TICK_SECS",
        "RERUN_FLUSH_NUM_BYTES",
        "RERUN_FLUSH_NUM_ROWS",
        "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED",
        "RERUN_MAX_CHUNK_ROWS_IF_UNSORTED",
    ];

    fn clear_env() {
        for name in ENV_VARS {
            std::env::remove_var(name);
        }
    }

    // Clears the variables again when dropped -- even if an assertion below panics -- so that
    // this test never leaks configuration into the rest of the test process.
    struct ClearEnvOnDrop;
    impl Drop for ClearEnvOnDrop {
        fn drop(&mut self) {
            clear_env();
        }
    }

    // Start from a clean slate: values inherited from the outer environment (in particular the
    // deprecated variable, which is applied last and therefore wins) would skew the assertions.
    clear_env();
    let _clear_env_guard = ClearEnvOnDrop;

    std::env::set_var("RERUN_FLUSH_TICK_SECS", "0.3");
    std::env::set_var("RERUN_FLUSH_NUM_BYTES", "42");
    std::env::set_var("RERUN_FLUSH_NUM_ROWS", "666");
    std::env::set_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED", "7777");

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 7777,
        ..Default::default()
    };
    assert_eq!(expected, config);

    // The deprecated alias overrides the new variable when both are set.
    std::env::set_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED", "9999");

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 9999,
        ..Default::default()
    };
    assert_eq!(expected, config);
}
/// Batches rows pushed from any thread into [`Chunk`]s, on a dedicated background thread.
///
/// Cheap to clone: all clones share the same background thread. That thread is shut down (after
/// a final flush of pending rows) once the last clone is dropped.
#[derive(Clone)]
pub struct ChunkBatcher {
    inner: Arc<ChunkBatcherInner>,
}

/// State shared by every [`ChunkBatcher`] clone; its `Drop` impl shuts down the batching thread.
struct ChunkBatcherInner {
    /// Sending end of the command channel consumed by the batching thread.
    tx_cmds: Sender<Command>,

    /// Receiving end of the output chunk channel.
    ///
    /// `Option` only so that `Drop` can take it; it is `Some` for the rest of the batcher's life.
    rx_chunks: Option<Receiver<Chunk>>,

    /// Handle of the batching thread; `Option` only so that `Drop` can take and join it.
    cmds_to_chunks_handle: Option<std::thread::JoinHandle<()>>,
}
impl Drop for ChunkBatcherInner {
    /// Shuts down the batching thread and waits for it to exit.
    fn drop(&mut self) {
        // Drop our chunk receiver first. If it was the last receiver, the output channel becomes
        // disconnected, so the batching thread's final sends fail instead of blocking forever on a
        // full bounded channel.
        // NOTE(review): receivers handed out by `ChunkBatcher::chunks` keep the channel alive, and
        // the warning below fires even if such a receiver will still read the buffered chunks.
        if let Some(rx_chunks) = self.rx_chunks.take() {
            if !rx_chunks.is_empty() {
                re_log::warn!("Dropping data");
            }
        }

        // Ask the thread to flush what it still holds and exit, then wait for it.
        // Failures (thread already gone, or it panicked) are deliberately ignored.
        self.tx_cmds.send(Command::Shutdown).ok();
        if let Some(handle) = self.cmds_to_chunks_handle.take() {
            handle.join().ok();
        }
    }
}
/// Messages sent from [`ChunkBatcher`] handles to the batching thread.
enum Command {
    /// Forward an already-built chunk to the output channel as-is, bypassing batching.
    AppendChunk(Chunk),

    /// Queue a row for the given entity path.
    AppendRow(EntityPath, PendingRow),

    /// Flush every pending row. Nothing is ever sent on the sender: it is dropped once the flush
    /// is done, which disconnects (and thereby wakes up) the matching receiver.
    Flush(Sender<()>),

    /// Stop the batching thread once whatever is still pending has been flushed.
    Shutdown,
}

impl Command {
    /// Builds a [`Command::Flush`] along with the receiver that disconnects once it completes.
    fn flush() -> (Self, Receiver<()>) {
        // Zero-capacity channel used as a oneshot.
        let (done_tx, done_rx) = crossbeam::channel::bounded(0);
        (Self::Flush(done_tx), done_rx)
    }
}
impl ChunkBatcher {
    /// Creates a new [`ChunkBatcher`] using the given `config`.
    ///
    /// This spawns the `ChunkBatcher::cmds_to_chunks` background thread, which turns pushed data
    /// into [`Chunk`]s that can be read through [`Self::chunks`].
    ///
    /// # Errors
    ///
    /// Returns [`ChunkBatcherError::SpawnThread`] if the background thread cannot be spawned.
    #[must_use = "Batching threads will automatically shutdown when this object is dropped"]
    #[allow(clippy::needless_pass_by_value)]
    pub fn new(config: ChunkBatcherConfig) -> ChunkBatcherResult<Self> {
        // `None` means unbounded; otherwise senders block once `cap` messages are in flight.
        let (tx_cmds, rx_cmd) = config.max_commands_in_flight.map_or_else(
            crossbeam::channel::unbounded,
            |cap| crossbeam::channel::bounded(cap as _),
        );
        let (tx_chunk, rx_chunks) = config.max_chunks_in_flight.map_or_else(
            crossbeam::channel::unbounded,
            |cap| crossbeam::channel::bounded(cap as _),
        );

        const THREAD_NAME: &str = "ChunkBatcher::cmds_to_chunks";
        let thread_config = config.clone();
        let cmds_to_chunks_handle = std::thread::Builder::new()
            .name(THREAD_NAME.to_owned())
            .spawn(move || batching_thread(thread_config, rx_cmd, tx_chunk))
            .map_err(|err| ChunkBatcherError::SpawnThread {
                name: THREAD_NAME,
                err: Box::new(err),
            })?;

        re_log::debug!(?config, "creating new chunk batcher");

        Ok(Self {
            inner: Arc::new(ChunkBatcherInner {
                tx_cmds,
                rx_chunks: Some(rx_chunks),
                cmds_to_chunks_handle: Some(cmds_to_chunks_handle),
            }),
        })
    }

    /// Pushes an already-built chunk; it is forwarded to the output channel as-is.
    pub fn push_chunk(&self, chunk: Chunk) {
        self.inner.push_chunk(chunk);
    }

    /// Queues a row for `entity_path`, to be batched with the other rows of that entity.
    #[inline]
    pub fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.inner.push_row(entity_path, row);
    }

    /// Asks the batching thread to flush everything that is pending, without waiting for it.
    #[inline]
    pub fn flush_async(&self) {
        self.inner.flush_async();
    }

    /// Asks the batching thread to flush everything that is pending, and blocks until it has.
    #[inline]
    pub fn flush_blocking(&self) {
        self.inner.flush_blocking();
    }

    /// Returns a receiver for the output chunks.
    ///
    /// All receivers share a single channel: each chunk is delivered to only one of them.
    pub fn chunks(&self) -> Receiver<Chunk> {
        // `rx_chunks` is only ever taken in `Drop`, so it is always `Some` while `self` lives.
        #[allow(clippy::unwrap_used)]
        self.inner.rx_chunks.clone().unwrap()
    }
}
impl ChunkBatcherInner {
    fn push_chunk(&self, chunk: Chunk) {
        self.send_cmd(Command::AppendChunk(chunk));
    }

    fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.send_cmd(Command::AppendRow(entity_path, row));
    }

    fn flush_async(&self) {
        // Keep only the command: the completion receiver is dropped at the end of this
        // statement, since nobody is going to wait on it.
        let flush_cmd = Command::flush().0;
        self.send_cmd(flush_cmd);
    }

    fn flush_blocking(&self) {
        let (flush_cmd, done) = Command::flush();
        self.send_cmd(flush_cmd);

        // Nothing is ever sent on `done`. `recv` returns (with a disconnection error) once the
        // batching thread drops the sender after flushing. It returns right away if the command
        // could not be delivered, because the undelivered command (and its sender) is dropped.
        done.recv().ok();
    }

    fn send_cmd(&self, cmd: Command) {
        // Only fails once the batching thread's receiver is gone; deliberately ignored.
        self.tx_cmds.send(cmd).ok();
    }
}
/// Body of the `ChunkBatcher::cmds_to_chunks` background thread.
///
/// Receives [`Command`]s on `rx_cmd`, accumulates [`PendingRow`]s per entity path, and sends the
/// resulting [`Chunk`]s on `tx_chunk`. An entity's pending rows are flushed when:
/// - its pending row count reaches `config.flush_num_rows`,
/// - otherwise, its estimated pending size reaches `config.flush_num_bytes`,
/// - a `config.flush_tick` tick fires, unless another flush happened since the previous tick
///   (all entities are flushed at once),
/// - a [`Command::Flush`] is received (all entities),
/// - the loop ends, on [`Command::Shutdown`] or once every command sender is gone (all entities).
///
/// Already-built chunks ([`Command::AppendChunk`]) are forwarded immediately, unbatched.
#[allow(clippy::needless_pass_by_value)]
fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chunk: Sender<Chunk>) {
    let rx_tick = crossbeam::channel::tick(config.flush_tick);

    // Rows waiting to be turned into chunks, for a single entity path.
    struct Accumulator {
        // Set to `Instant::now()` on creation and on every `reset`.
        // NOTE(review): never read within this function.
        latest: Instant,

        entity_path: EntityPath,
        pending_rows: Vec<PendingRow>,

        // Running total of `total_size_bytes()` over `pending_rows`.
        pending_num_bytes: u64,
    }

    impl Accumulator {
        fn new(entity_path: EntityPath) -> Self {
            Self {
                entity_path,
                latest: Instant::now(),
                pending_rows: Default::default(),
                pending_num_bytes: Default::default(),
            }
        }

        // Clears all pending state after a flush.
        fn reset(&mut self) {
            self.latest = Instant::now();
            self.pending_rows.clear();
            self.pending_num_bytes = 0;
        }
    }

    // One accumulator per entity path, created lazily on the first row for that path.
    let mut accs: IntMap<EntityPath, Accumulator> = IntMap::default();

    // Appends `row` to `acc`, keeping the byte estimate in sync.
    fn do_push_row(acc: &mut Accumulator, row: PendingRow) {
        acc.pending_num_bytes += row.total_size_bytes();
        acc.pending_rows.push(row);
    }

    // Turns all of `acc`'s pending rows into chunks, sends them on `tx_chunk`, then resets `acc`.
    // No-op when nothing is pending. `reason` is only used for logging.
    fn do_flush_all(
        acc: &mut Accumulator,
        tx_chunk: &Sender<Chunk>,
        reason: &str,
        chunk_max_rows_if_unsorted: u64,
    ) {
        let rows = std::mem::take(&mut acc.pending_rows);
        if rows.is_empty() {
            return;
        }

        re_log::trace!(
            "Flushing {} rows and {} bytes. Reason: {reason}",
            rows.len(),
            re_format::format_bytes(acc.pending_num_bytes as _)
        );

        let chunks =
            PendingRow::many_into_chunks(acc.entity_path.clone(), chunk_max_rows_if_unsorted, rows);
        for chunk in chunks {
            // A chunk that fails to build is logged and dropped; the others are still sent.
            let chunk = match chunk {
                Ok(chunk) => chunk,
                Err(err) => {
                    re_log::error!(%err, "corrupt chunk detected, dropping");
                    continue;
                }
            };

            // Only fails once every chunk receiver is gone; deliberately ignored.
            tx_chunk.send(chunk).ok();
        }

        acc.reset();
    }

    re_log::trace!(
        "Flushing every: {:.2}s, {} rows, {}",
        config.flush_tick.as_secs_f64(),
        config.flush_num_rows,
        re_format::format_bytes(config.flush_num_bytes as _),
    );

    // Set whenever rows are flushed for a reason other than a tick, so that the next tick is
    // skipped instead of flushing again right away. At most one tick is skipped per such flush.
    let mut skip_next_tick = false;

    use crossbeam::select;
    loop {
        select! {
            recv(rx_cmd) -> cmd => {
                let Ok(cmd) = cmd else {
                    // Every command sender is gone: treat it like a shutdown.
                    break;
                };

                match cmd {
                    Command::AppendChunk(chunk) => {
                        // Already-built chunks bypass batching entirely.
                        tx_chunk.send(chunk).ok();
                    },
                    Command::AppendRow(entity_path, row) => {
                        let acc = accs.entry(entity_path.clone())
                            .or_insert_with(|| Accumulator::new(entity_path));
                        do_push_row(acc, row);

                        // NOTE: inside this `if let`, `config` shadows the batcher config: it is
                        // the `on_insert` hook, called with every row pending for this entity.
                        if let Some(config) = config.hooks.on_insert.as_ref() {
                            config(&acc.pending_rows);
                        }

                        // Thresholds only consider this entity; the row threshold is checked first.
                        if acc.pending_rows.len() as u64 >= config.flush_num_rows {
                            do_flush_all(acc, &tx_chunk, "rows", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        } else if acc.pending_num_bytes >= config.flush_num_bytes {
                            do_flush_all(acc, &tx_chunk, "bytes", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        }
                    },
                    Command::Flush(oneshot) => {
                        skip_next_tick = true;
                        for acc in accs.values_mut() {
                            do_flush_all(acc, &tx_chunk, "manual", config.chunk_max_rows_if_unsorted);
                        }
                        // Nothing is ever sent on `oneshot`: dropping it is what wakes up the
                        // receiver returned by `Command::flush`.
                        drop(oneshot);
                    },
                    Command::Shutdown => break,
                };
            },
            recv(rx_tick) -> _ => {
                if skip_next_tick {
                    skip_next_tick = false;
                } else {
                    // A single ticker for all entities: every accumulator is flushed at once.
                    for acc in accs.values_mut() {
                        do_flush_all(acc, &tx_chunk, "tick", config.chunk_max_rows_if_unsorted);
                    }
                }
            },
        };
    }

    // Shutdown: stop accepting commands, then flush whatever is still pending.
    drop(rx_cmd);
    for acc in accs.values_mut() {
        do_flush_all(
            acc,
            &tx_chunk,
            "shutdown",
            config.chunk_max_rows_if_unsorted,
        );
    }
    // This thread owns the only chunk sender created in `ChunkBatcher::new`. Dropping it
    // disconnects the output channel, so readers see the end of the stream once it is drained.
    drop(tx_chunk);
}
/// A single row of data that has not been batched into a [`Chunk`] yet.
#[derive(Debug, Clone)]
pub struct PendingRow {
    /// Identifier of the row; rows are ordered by it when batched.
    pub row_id: RowId,

    /// The time(s) associated with the row, one per timeline (empty for static data).
    pub timepoint: TimePoint,

    /// The row's data: one Arrow array per component.
    pub components: IntMap<ComponentDescriptor, Box<dyn Arrow2Array>>,
}

impl PendingRow {
    /// Creates a new row with a freshly generated [`RowId`].
    #[inline]
    pub fn new(
        timepoint: TimePoint,
        components: IntMap<ComponentDescriptor, Box<dyn Arrow2Array>>,
    ) -> Self {
        Self {
            row_id: RowId::new(),
            timepoint,
            components,
        }
    }
}
impl re_types_core::SizeBytes for PendingRow {
    /// Heap footprint of the row: the sum of the heap sizes of its fields.
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring, so that a new field cannot silently be left out of the count.
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        let field_sizes = [
            row_id.heap_size_bytes(),
            timepoint.heap_size_bytes(),
            components.heap_size_bytes(),
        ];
        field_sizes.into_iter().sum()
    }
}
impl PendingRow {
    /// Turns this single row into a single-row [`Chunk`] for `entity_path`.
    ///
    /// Each timeline of the row becomes a one-element time column. Each component array becomes
    /// a one-element list array; components for which `crate::util::arrays_to_list_array_opt`
    /// returns `None` are left out.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Chunk::from_native_row_ids`].
    pub fn into_chunk(self, entity_path: EntityPath) -> ChunkResult<Chunk> {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        let timelines = timepoint
            .into_iter()
            .map(|(timeline, time)| {
                let times = Arrow2PrimitiveArray::<i64>::from_vec(vec![time.as_i64()]);
                // A single time value is trivially sorted.
                let time_column = TimeColumn::new(Some(true), timeline, times);
                (timeline, time_column)
            })
            .collect();

        let mut per_name = ChunkComponents::default();
        for (component_desc, array) in components {
            let list_array = crate::util::arrays_to_list_array_opt(&[Some(&*array as _)]);
            if let Some(list_array) = list_array {
                per_name.insert_descriptor(component_desc, list_array);
            }
        }

        Chunk::from_native_row_ids(
            ChunkId::new(),
            entity_path,
            // Presumably the row-sortedness flag: a single row is trivially sorted.
            Some(true),
            &[row_id],
            timelines,
            per_name,
        )
    }

    /// Turns many rows of the same `entity_path` into one or more [`Chunk`]s.
    ///
    /// Rows are:
    /// 1. sorted by [`RowId`], so every resulting chunk holds rows in row-ID order;
    /// 2. grouped by their set of timelines;
    /// 3. within each such group, grouped by the datatypes of their component arrays;
    /// 4. within each such sub-group, appended to a chunk that is cut early whenever one of its
    ///    time columns is unsorted and it already holds at least `chunk_max_rows_if_unsorted`
    ///    rows.
    ///
    /// A component missing from some rows of a chunk is stored as `None` entries for those rows.
    ///
    /// Steps 1 and 2 run eagerly; the rest runs lazily as the returned iterator is consumed.
    /// Each item is the result of [`Chunk::from_native_row_ids`] for one chunk.
    pub fn many_into_chunks(
        entity_path: EntityPath,
        chunk_max_rows_if_unsorted: u64,
        mut rows: Vec<Self>,
    ) -> impl Iterator<Item = ChunkResult<Chunk>> {
        re_tracing::profile_function!();

        // Row-ID order is the global order of the data, whatever the timelines say.
        {
            re_tracing::profile_scope!("sort rows");
            rows.sort_by_key(|row| row.row_id);
        }

        // One micro-batch per set of timelines. Rows are pushed in row-ID order, so each
        // micro-batch stays sorted.
        // NOTE(review): groups are keyed on a 64-bit hash; distinct timeline sets that collide
        // would share a group.
        let mut per_timeline_set: IntMap<u64 /* hash of the timeline set */, Vec<Self>> =
            Default::default();
        {
            re_tracing::profile_scope!("compute timeline sets");
            for row in rows {
                // Fresh default hasher per row: rows whose timelines iterate identically get the
                // same key.
                let mut hasher = ahash::AHasher::default();
                row.timepoint
                    .timelines()
                    .for_each(|timeline| timeline.hash(&mut hasher));
                per_timeline_set
                    .entry(hasher.finish())
                    .or_default()
                    .push(row);
            }
        }

        per_timeline_set.into_values().flat_map(move |rows| {
            re_tracing::profile_scope!("iterate per timeline set");

            // Split each micro-batch further: one sub-batch per sequence of component datatypes.
            // Only datatypes are hashed (not component descriptors), in the iteration order of
            // `row.components`. Rows with different components but identical datatype sequences
            // therefore share a sub-batch, which produces sparse columns below.
            let mut per_datatype_set: IntMap<u64 /* hash of the datatype sequence */, Vec<Self>> =
                Default::default();
            {
                re_tracing::profile_scope!("compute datatype sets");
                for row in rows {
                    let mut hasher = ahash::AHasher::default();
                    row.components
                        .values()
                        .for_each(|array| array.data_type().hash(&mut hasher));
                    per_datatype_set
                        .entry(hasher.finish())
                        .or_default()
                        .push(row);
                }
            }

            let entity_path = entity_path.clone();
            per_datatype_set.into_values().flat_map(move |rows| {
                re_tracing::profile_scope!("iterate per datatype set");

                let mut row_ids: Vec<RowId> = Vec::with_capacity(rows.len());
                let mut timelines: IntMap<Timeline, PendingTimeColumn> = IntMap::default();

                // Every component seen in this sub-batch, each with an empty list of per-row
                // arrays. It serves as the template for each new chunk's columns.
                let mut all_components: IntMap<ComponentDescriptor, Vec<Option<&dyn Arrow2Array>>> =
                    IntMap::default();
                for row in &rows {
                    for component_desc in row.components.keys() {
                        all_components.entry(component_desc.clone()).or_default();
                    }
                }

                let mut chunks = Vec::new();

                let mut components = all_components.clone();
                for row in &rows {
                    let Self {
                        row_id,
                        timepoint: row_timepoint,
                        components: row_components,
                    } = row;

                    // Before adding this row: if any time column accumulated so far is unsorted
                    // and the chunk is already at the threshold, emit it and start a new one.
                    // Once the state has been taken, later timelines of this same row see an empty
                    // chunk, so no second cut happens for this row.
                    for (&timeline, _) in row_timepoint {
                        let time_column = timelines
                            .entry(timeline)
                            .or_insert_with(|| PendingTimeColumn::new(timeline));

                        if !row_ids.is_empty() && row_ids.len() as u64 >= chunk_max_rows_if_unsorted
                            && !time_column.is_sorted
                        {
                            chunks.push(Chunk::from_native_row_ids(
                                ChunkId::new(),
                                entity_path.clone(),
                                Some(true),
                                &std::mem::take(&mut row_ids),
                                std::mem::take(&mut timelines)
                                    .into_iter()
                                    .map(|(timeline, time_column)| (timeline, time_column.finish()))
                                    .collect(),
                                {
                                    let mut per_name = ChunkComponents::default();
                                    for (component_desc, arrays) in std::mem::take(&mut components)
                                    {
                                        let list_array =
                                            crate::util::arrays_to_list_array_opt(&arrays);
                                        if let Some(list_array) = list_array {
                                            per_name.insert_descriptor(component_desc, list_array);
                                        }
                                    }
                                    per_name
                                },
                            ));

                            // Fresh, empty per-component lists for the next chunk.
                            components = all_components.clone();
                        }
                    }

                    row_ids.push(*row_id);

                    for (&timeline, &time) in row_timepoint {
                        let time_column = timelines
                            .entry(timeline)
                            .or_insert_with(|| PendingTimeColumn::new(timeline));
                        time_column.push(time);
                    }

                    // One entry per component for every row: `None` when the row lacks that
                    // component, so the resulting list arrays are sparse.
                    for (component_desc, arrays) in &mut components {
                        arrays.push(
                            row_components
                                .get(component_desc)
                                .map(|array| &**array as &dyn Arrow2Array),
                        );
                    }
                }

                // Whatever remains forms the last chunk of this sub-batch.
                chunks.push(Chunk::from_native_row_ids(
                    ChunkId::new(),
                    entity_path.clone(),
                    Some(true),
                    &std::mem::take(&mut row_ids),
                    timelines
                        .into_iter()
                        .map(|(timeline, time_column)| (timeline, time_column.finish()))
                        .collect(),
                    {
                        let mut per_name = ChunkComponents::default();
                        for (component_desc, arrays) in components {
                            let list_array = crate::util::arrays_to_list_array_opt(&arrays);
                            if let Some(list_array) = list_array {
                                per_name.insert_descriptor(component_desc, list_array);
                            }
                        }
                        per_name
                    },
                ));

                chunks
            })
        })
    }
}
/// Builds a [`TimeColumn`] one value at a time, tracking sortedness and time range on the way.
struct PendingTimeColumn {
    timeline: Timeline,
    times: Vec<i64>,
    is_sorted: bool,
    time_range: ResolvedTimeRange,
}

impl PendingTimeColumn {
    /// An empty column for `timeline`: sorted, with an empty time range.
    fn new(timeline: Timeline) -> Self {
        Self {
            timeline,
            times: Vec::new(),
            is_sorted: true,
            time_range: ResolvedTimeRange::EMPTY,
        }
    }

    /// Appends `time`, updating sortedness and the covered time range.
    fn push(&mut self, time: TimeInt) {
        let value = time.as_i64();

        // An empty column compares against `TimeInt::MIN`. Equal consecutive values still count
        // as sorted.
        let previous = self.times.last().copied().unwrap_or(TimeInt::MIN.as_i64());
        self.is_sorted &= previous <= value;

        let range = &mut self.time_range;
        range.set_min(TimeInt::min(range.min(), time));
        range.set_max(TimeInt::max(range.max(), time));

        self.times.push(value);
    }

    /// Converts the accumulated values into a [`TimeColumn`] of the timeline's datatype.
    fn finish(self) -> TimeColumn {
        let Self {
            timeline,
            times,
            is_sorted,
            time_range,
        } = self;

        let times = Arrow2PrimitiveArray::<i64>::from_vec(times).to(timeline.datatype());
        TimeColumn {
            timeline,
            times,
            is_sorted,
            time_range,
        }
    }
}
#[cfg(test)]
mod tests {
use crossbeam::channel::TryRecvError;
use re_log_types::example_components::{MyPoint, MyPoint64};
use re_types_core::{Component as _, Loggable as _};
use super::*;
#[test]
fn simple() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let timeline1 = Timeline::new_temporal("log_time");
let timepoint1 = TimePoint::default().with(timeline1, 42);
let timepoint2 = TimePoint::default().with(timeline1, 43);
let timepoint3 = TimePoint::default().with(timeline1, 44);
let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
let components1 = [(MyPoint::descriptor(), points1.clone())];
let components2 = [(MyPoint::descriptor(), points2.clone())];
let components3 = [(MyPoint::descriptor(), points3.clone())];
let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());
let entity_path1: EntityPath = "a/b/c".into();
batcher.push_row(entity_path1.clone(), row1.clone());
batcher.push_row(entity_path1.clone(), row2.clone());
batcher.push_row(entity_path1.clone(), row3.clone());
let chunks_rx = batcher.chunks();
drop(batcher); let mut chunks = Vec::new();
loop {
let chunk = match chunks_rx.try_recv() {
Ok(chunk) => chunk,
Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
Err(TryRecvError::Disconnected) => break,
};
chunks.push(chunk);
}
chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
eprintln!("Chunks:");
for chunk in &chunks {
eprintln!("{chunk}");
}
assert_eq!(1, chunks.len());
{
let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
let expected_timelines = [(
timeline1,
TimeColumn::new(
Some(true),
timeline1,
Arrow2PrimitiveArray::from_vec(vec![42, 43, 44]),
),
)];
let expected_components = [(
MyPoint::descriptor(),
crate::util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some))
.unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
entity_path1.clone(),
None,
&expected_row_ids,
expected_timelines.into_iter().collect(),
expected_components.into_iter().collect(),
)?;
eprintln!("Expected:\n{expected_chunk}");
eprintln!("Got:\n{}", chunks[0]);
assert_eq!(expected_chunk, chunks[0]);
}
Ok(())
}
#[test]
fn simple_static() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let timeless = TimePoint::default();
let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
let components1 = [(MyPoint::descriptor(), points1.clone())];
let components2 = [(MyPoint::descriptor(), points2.clone())];
let components3 = [(MyPoint::descriptor(), points3.clone())];
let row1 = PendingRow::new(timeless.clone(), components1.into_iter().collect());
let row2 = PendingRow::new(timeless.clone(), components2.into_iter().collect());
let row3 = PendingRow::new(timeless.clone(), components3.into_iter().collect());
let entity_path1: EntityPath = "a/b/c".into();
batcher.push_row(entity_path1.clone(), row1.clone());
batcher.push_row(entity_path1.clone(), row2.clone());
batcher.push_row(entity_path1.clone(), row3.clone());
let chunks_rx = batcher.chunks();
drop(batcher); let mut chunks = Vec::new();
loop {
let chunk = match chunks_rx.try_recv() {
Ok(chunk) => chunk,
Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
Err(TryRecvError::Disconnected) => break,
};
chunks.push(chunk);
}
chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
eprintln!("Chunks:");
for chunk in &chunks {
eprintln!("{chunk}");
}
assert_eq!(1, chunks.len());
{
let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
let expected_timelines = [];
let expected_components = [(
MyPoint::descriptor(),
crate::util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some))
.unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
entity_path1.clone(),
None,
&expected_row_ids,
expected_timelines.into_iter().collect(),
expected_components.into_iter().collect(),
)?;
eprintln!("Expected:\n{expected_chunk}");
eprintln!("Got:\n{}", chunks[0]);
assert_eq!(expected_chunk, chunks[0]);
}
Ok(())
}
#[test]
fn different_entities() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let timeline1 = Timeline::new_temporal("log_time");
let timepoint1 = TimePoint::default().with(timeline1, 42);
let timepoint2 = TimePoint::default().with(timeline1, 43);
let timepoint3 = TimePoint::default().with(timeline1, 44);
let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
let components1 = [(MyPoint::descriptor(), points1.clone())];
let components2 = [(MyPoint::descriptor(), points2.clone())];
let components3 = [(MyPoint::descriptor(), points3.clone())];
let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());
let entity_path1: EntityPath = "ent1".into();
let entity_path2: EntityPath = "ent2".into();
batcher.push_row(entity_path1.clone(), row1.clone());
batcher.push_row(entity_path2.clone(), row2.clone());
batcher.push_row(entity_path1.clone(), row3.clone());
let chunks_rx = batcher.chunks();
drop(batcher); let mut chunks = Vec::new();
loop {
let chunk = match chunks_rx.try_recv() {
Ok(chunk) => chunk,
Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
Err(TryRecvError::Disconnected) => break,
};
chunks.push(chunk);
}
chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
eprintln!("Chunks:");
for chunk in &chunks {
eprintln!("{chunk}");
}
assert_eq!(2, chunks.len());
{
let expected_row_ids = vec![row1.row_id, row3.row_id];
let expected_timelines = [(
timeline1,
TimeColumn::new(
Some(true),
timeline1,
Arrow2PrimitiveArray::from_vec(vec![42, 44]),
),
)];
let expected_components = [(
MyPoint::descriptor(),
crate::util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
entity_path1.clone(),
None,
&expected_row_ids,
expected_timelines.into_iter().collect(),
expected_components.into_iter().collect(),
)?;
eprintln!("Expected:\n{expected_chunk}");
eprintln!("Got:\n{}", chunks[0]);
assert_eq!(expected_chunk, chunks[0]);
}
{
let expected_row_ids = vec![row2.row_id];
let expected_timelines = [(
timeline1,
TimeColumn::new(
Some(true),
timeline1,
Arrow2PrimitiveArray::from_vec(vec![43]),
),
)];
let expected_components = [(
MyPoint::descriptor(),
crate::util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[1].id,
entity_path2.clone(),
None,
&expected_row_ids,
expected_timelines.into_iter().collect(),
expected_components.into_iter().collect(),
)?;
eprintln!("Expected:\n{expected_chunk}");
eprintln!("Got:\n{}", chunks[1]);
assert_eq!(expected_chunk, chunks[1]);
}
Ok(())
}
#[test]
fn different_timelines() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let timeline1 = Timeline::new_temporal("log_time");
let timeline2 = Timeline::new_sequence("frame_nr");
let timepoint1 = TimePoint::default().with(timeline1, 42);
let timepoint2 = TimePoint::default()
.with(timeline1, 43)
.with(timeline2, 1000);
let timepoint3 = TimePoint::default()
.with(timeline1, 44)
.with(timeline2, 1001);
let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
let components1 = [(MyPoint::descriptor(), points1.clone())];
let components2 = [(MyPoint::descriptor(), points2.clone())];
let components3 = [(MyPoint::descriptor(), points3.clone())];
let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());
let entity_path1: EntityPath = "a/b/c".into();
batcher.push_row(entity_path1.clone(), row1.clone());
batcher.push_row(entity_path1.clone(), row2.clone());
batcher.push_row(entity_path1.clone(), row3.clone());
let chunks_rx = batcher.chunks();
drop(batcher); let mut chunks = Vec::new();
loop {
let chunk = match chunks_rx.try_recv() {
Ok(chunk) => chunk,
Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
Err(TryRecvError::Disconnected) => break,
};
chunks.push(chunk);
}
chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
eprintln!("Chunks:");
for chunk in &chunks {
eprintln!("{chunk}");
}
assert_eq!(2, chunks.len());
{
let expected_row_ids = vec![row1.row_id];
let expected_timelines = [(
timeline1,
TimeColumn::new(
Some(true),
timeline1,
Arrow2PrimitiveArray::from_vec(vec![42]),
),
)];
let expected_components = [(
MyPoint::descriptor(),
crate::util::arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
entity_path1.clone(),
None,
&expected_row_ids,
expected_timelines.into_iter().collect(),
expected_components.into_iter().collect(),
)?;
eprintln!("Expected:\n{expected_chunk}");
eprintln!("Got:\n{}", chunks[0]);
assert_eq!(expected_chunk, chunks[0]);
}
{
let expected_row_ids = vec![row2.row_id, row3.row_id];
let expected_timelines = [
(
timeline1,
TimeColumn::new(
Some(true),
timeline1,
Arrow2PrimitiveArray::from_vec(vec![43, 44]),
),
),
(
timeline2,
TimeColumn::new(
Some(true),
timeline2,
Arrow2PrimitiveArray::from_vec(vec![1000, 1001]),
),
),
];
let expected_components = [(
MyPoint::descriptor(),
crate::util::arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[1].id,
entity_path1.clone(),
None,
&expected_row_ids,
expected_timelines.into_iter().collect(),
expected_components.into_iter().collect(),
)?;
eprintln!("Expected:\n{expected_chunk}");
eprintln!("Got:\n{}", chunks[1]);
assert_eq!(expected_chunk, chunks[1]);
}
Ok(())
}
#[test]
fn different_datatypes() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let timeline1 = Timeline::new_temporal("log_time");
let timepoint1 = TimePoint::default().with(timeline1, 42);
let timepoint2 = TimePoint::default().with(timeline1, 43);
let timepoint3 = TimePoint::default().with(timeline1, 44);
let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
let points2 =
MyPoint64::to_arrow2([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?;
let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
let components1 = [(MyPoint::descriptor(), points1.clone())];
let components2 = [(MyPoint::descriptor(), points2.clone())]; let components3 = [(MyPoint::descriptor(), points3.clone())];
let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());
let entity_path1: EntityPath = "a/b/c".into();
batcher.push_row(entity_path1.clone(), row1.clone());
batcher.push_row(entity_path1.clone(), row2.clone());
batcher.push_row(entity_path1.clone(), row3.clone());
let chunks_rx = batcher.chunks();
drop(batcher); let mut chunks = Vec::new();
loop {
let chunk = match chunks_rx.try_recv() {
Ok(chunk) => chunk,
Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
Err(TryRecvError::Disconnected) => break,
};
chunks.push(chunk);
}
chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
eprintln!("Chunks:");
for chunk in &chunks {
eprintln!("{chunk}");
}
assert_eq!(2, chunks.len());
{
let expected_row_ids = vec![row1.row_id, row3.row_id];
let expected_timelines = [(
timeline1,
TimeColumn::new(
Some(true),
timeline1,
Arrow2PrimitiveArray::from_vec(vec![42, 44]),
),
)];
let expected_components = [(
MyPoint::descriptor(),
crate::util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
entity_path1.clone(),
None,
&expected_row_ids,
expected_timelines.into_iter().collect(),
expected_components.into_iter().collect(),
)?;
eprintln!("Expected:\n{expected_chunk}");
eprintln!("Got:\n{}", chunks[0]);
assert_eq!(expected_chunk, chunks[0]);
}
{
let expected_row_ids = vec![row2.row_id];
let expected_timelines = [(
timeline1,
TimeColumn::new(
Some(true),
timeline1,
Arrow2PrimitiveArray::from_vec(vec![43]),
),
)];
let expected_components = [(
MyPoint::descriptor(),
crate::util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[1].id,
entity_path1.clone(),
None,
&expected_row_ids,
expected_timelines.into_iter().collect(),
expected_components.into_iter().collect(),
)?;
eprintln!("Expected:\n{expected_chunk}");
eprintln!("Got:\n{}", chunks[1]);
assert_eq!(expected_chunk, chunks[1]);
}
Ok(())
}
/// Four rows are logged out of order on both timelines. The unsorted-chunk row budget (1000) is
/// well above the number of rows pushed (4), so the test expects exactly one chunk back. That
/// chunk must hold all four rows in push order, with both time columns flagged as not sorted.
#[test]
fn unsorted_timeline_below_threshold() -> anyhow::Result<()> {
    let batcher = ChunkBatcher::new(ChunkBatcherConfig {
        chunk_max_rows_if_unsorted: 1000,
        ..ChunkBatcherConfig::NEVER
    })?;

    let log_time = Timeline::new_temporal("log_time");
    let frame_nr = Timeline::new_temporal("frame_nr");

    let timepoint1 = TimePoint::default().with(frame_nr, 1000).with(log_time, 42);
    let timepoint2 = TimePoint::default().with(frame_nr, 1001).with(log_time, 43);
    let timepoint3 = TimePoint::default().with(frame_nr, 1002).with(log_time, 44);
    let timepoint4 = TimePoint::default().with(frame_nr, 1003).with(log_time, 45);

    let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
    let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
    let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
    let points4 =
        MyPoint::to_arrow2([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

    // The row carrying the *latest* timepoint comes first, so the time columns end up unsorted.
    // Rows are built (and later pushed) strictly in order row1..row4.
    let row1 = PendingRow::new(
        timepoint4,
        [(MyPoint::descriptor(), points1.clone())].into_iter().collect(),
    );
    let row2 = PendingRow::new(
        timepoint1,
        [(MyPoint::descriptor(), points2.clone())].into_iter().collect(),
    );
    let row3 = PendingRow::new(
        timepoint2,
        [(MyPoint::descriptor(), points3.clone())].into_iter().collect(),
    );
    let row4 = PendingRow::new(
        timepoint3,
        [(MyPoint::descriptor(), points4.clone())].into_iter().collect(),
    );

    let entity_path: EntityPath = "a/b/c".into();
    for row in [&row1, &row2, &row3, &row4] {
        batcher.push_row(entity_path.clone(), row.clone());
    }

    // Dropping the batcher is expected to flush everything still pending and then disconnect the
    // channel. An `Empty` result before `Disconnected` therefore counts as a failure.
    let chunks_rx = batcher.chunks();
    drop(batcher);

    let mut chunks = Vec::new();
    loop {
        match chunks_rx.try_recv() {
            Ok(chunk) => chunks.push(chunk),
            Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
            Err(TryRecvError::Disconnected) => break,
        }
    }

    chunks.sort_by_key(|c| c.row_id_range().unwrap().0);

    eprintln!("Chunks:");
    for chunk in &chunks {
        eprintln!("{chunk}");
    }

    assert_eq!(1, chunks.len());

    let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id, row4.row_id];

    let log_time_column = TimeColumn::new(
        Some(false),
        log_time,
        Arrow2PrimitiveArray::from_vec(vec![45, 42, 43, 44]),
    );
    let frame_nr_column = TimeColumn::new(
        Some(false),
        frame_nr,
        Arrow2PrimitiveArray::from_vec(vec![1003, 1000, 1001, 1002]),
    );
    let expected_timelines = [(log_time, log_time_column), (frame_nr, frame_nr_column)];

    let expected_points = crate::util::arrays_to_list_array_opt(
        &[&*points1, &*points2, &*points3, &*points4].map(Some),
    )
    .unwrap();
    let expected_components = [(MyPoint::descriptor(), expected_points)];

    let expected_chunk = Chunk::from_native_row_ids(
        chunks[0].id,
        entity_path.clone(),
        None,
        &expected_row_ids,
        expected_timelines.into_iter().collect(),
        expected_components.into_iter().collect(),
    )?;

    eprintln!("Expected:\n{expected_chunk}");
    eprintln!("Got:\n{}", chunks[0]);
    assert_eq!(expected_chunk, chunks[0]);

    Ok(())
}
/// Four rows are logged out of order on both timelines, but the unsorted-chunk row budget is
/// only 3. The test expects two chunks back:
/// - The first holds row1..row3 with both time columns flagged as not sorted.
/// - The second holds row4 alone, with both time columns flagged as sorted.
#[test]
fn unsorted_timeline_above_threshold() -> anyhow::Result<()> {
    let batcher = ChunkBatcher::new(ChunkBatcherConfig {
        chunk_max_rows_if_unsorted: 3,
        ..ChunkBatcherConfig::NEVER
    })?;

    let log_time = Timeline::new_temporal("log_time");
    let frame_nr = Timeline::new_temporal("frame_nr");

    let timepoint1 = TimePoint::default().with(frame_nr, 1000).with(log_time, 42);
    let timepoint2 = TimePoint::default().with(frame_nr, 1001).with(log_time, 43);
    let timepoint3 = TimePoint::default().with(frame_nr, 1002).with(log_time, 44);
    let timepoint4 = TimePoint::default().with(frame_nr, 1003).with(log_time, 45);

    let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
    let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
    let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
    let points4 =
        MyPoint::to_arrow2([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

    // The row carrying the *latest* timepoint comes first, so the time columns end up unsorted.
    // Rows are built (and later pushed) strictly in order row1..row4.
    let row1 = PendingRow::new(
        timepoint4,
        [(MyPoint::descriptor(), points1.clone())].into_iter().collect(),
    );
    let row2 = PendingRow::new(
        timepoint1,
        [(MyPoint::descriptor(), points2.clone())].into_iter().collect(),
    );
    let row3 = PendingRow::new(
        timepoint2,
        [(MyPoint::descriptor(), points3.clone())].into_iter().collect(),
    );
    let row4 = PendingRow::new(
        timepoint3,
        [(MyPoint::descriptor(), points4.clone())].into_iter().collect(),
    );

    let entity_path: EntityPath = "a/b/c".into();
    for row in [&row1, &row2, &row3, &row4] {
        batcher.push_row(entity_path.clone(), row.clone());
    }

    // Dropping the batcher is expected to flush everything still pending and then disconnect the
    // channel. An `Empty` result before `Disconnected` therefore counts as a failure.
    let chunks_rx = batcher.chunks();
    drop(batcher);

    let mut chunks = Vec::new();
    loop {
        match chunks_rx.try_recv() {
            Ok(chunk) => chunks.push(chunk),
            Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
            Err(TryRecvError::Disconnected) => break,
        }
    }

    // Chunks may arrive in any order; put them in row-ID order so they can be checked by index.
    chunks.sort_by_key(|c| c.row_id_range().unwrap().0);

    eprintln!("Chunks:");
    for chunk in &chunks {
        eprintln!("{chunk}");
    }

    assert_eq!(2, chunks.len());

    // First chunk: the three rows that fit within the unsorted-row budget.
    {
        let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];

        let log_time_column = TimeColumn::new(
            Some(false),
            log_time,
            Arrow2PrimitiveArray::from_vec(vec![45, 42, 43]),
        );
        let frame_nr_column = TimeColumn::new(
            Some(false),
            frame_nr,
            Arrow2PrimitiveArray::from_vec(vec![1003, 1000, 1001]),
        );
        let expected_timelines = [(log_time, log_time_column), (frame_nr, frame_nr_column)];

        let expected_points =
            crate::util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some))
                .unwrap();
        let expected_components = [(MyPoint::descriptor(), expected_points)];

        let expected_chunk = Chunk::from_native_row_ids(
            chunks[0].id,
            entity_path.clone(),
            None,
            &expected_row_ids,
            expected_timelines.into_iter().collect(),
            expected_components.into_iter().collect(),
        )?;

        eprintln!("Expected:\n{expected_chunk}");
        eprintln!("Got:\n{}", chunks[0]);
        assert_eq!(expected_chunk, chunks[0]);
    }

    // Second chunk: the leftover row on its own, which is trivially sorted.
    {
        let expected_row_ids = vec![row4.row_id];

        let log_time_column = TimeColumn::new(
            Some(true),
            log_time,
            Arrow2PrimitiveArray::from_vec(vec![44]),
        );
        let frame_nr_column = TimeColumn::new(
            Some(true),
            frame_nr,
            Arrow2PrimitiveArray::from_vec(vec![1002]),
        );
        let expected_timelines = [(log_time, log_time_column), (frame_nr, frame_nr_column)];

        let expected_points =
            crate::util::arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap();
        let expected_components = [(MyPoint::descriptor(), expected_points)];

        let expected_chunk = Chunk::from_native_row_ids(
            chunks[1].id,
            entity_path.clone(),
            None,
            &expected_row_ids,
            expected_timelines.into_iter().collect(),
            expected_components.into_iter().collect(),
        )?;

        eprintln!("Expected:\n{expected_chunk}");
        eprintln!("Got:\n{}", chunks[1]);
        assert_eq!(expected_chunk, chunks[1]);
    }

    Ok(())
}
}