use arrow::{
array::{
Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray,
RecordBatch as ArrowRecordBatch, StructArray as ArrowStructArray,
},
datatypes::{
DataType as ArrowDatatype, Field as ArrowField, Fields as ArrowFields,
Schema as ArrowSchema, TimeUnit as ArrowTimeUnit,
},
};
use itertools::Itertools;
use nohash_hasher::IntMap;
use re_arrow_util::{arrow_util::into_arrow_ref, ArrowArrayDowncastRef as _};
use re_byte_size::SizeBytes as _;
use re_log_types::{EntityPath, Timeline};
use re_types_core::{
    arrow_helpers::as_array_ref, Component as _, ComponentDescriptor, Loggable as _,
};
use tap::Tap as _;

use crate::{chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, RowId, TimeColumn};
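
/// Schema-level and field-level Arrow metadata: a plain string-to-string map.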
pub type ArrowMetadata = std::collections::HashMap<String, String>;
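
/// A [`Chunk`] that is ready for transport: a thin wrapper around an [`ArrowRecordBatch`].
///
/// All chunk-level information (id, entity path, sortedness, heap size) lives in the schema
/// metadata, and all column-level information (kind, archetype, sortedness) in the field
/// metadata, making the batch self-describing.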
#[derive(Debug, Clone)]
pub struct TransportChunk {
batch: ArrowRecordBatch,
}
impl std::fmt::Display for TransportChunk {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
re_format_arrow::format_record_batch_with_width(self, f.width()).fmt(f)
}
}
impl AsRef<ArrowRecordBatch> for TransportChunk {
#[inline]
fn as_ref(&self) -> &ArrowRecordBatch {
&self.batch
}
}
impl std::ops::Deref for TransportChunk {
type Target = ArrowRecordBatch;
#[inline]
fn deref(&self) -> &ArrowRecordBatch {
&self.batch
}
}
impl From<ArrowRecordBatch> for TransportChunk {
#[inline]
fn from(batch: ArrowRecordBatch) -> Self {
Self { batch }
}
}
impl From<TransportChunk> for ArrowRecordBatch {
#[inline]
fn from(chunk: TransportChunk) -> Self {
chunk.batch
}
}
impl TransportChunk {
    /// The key used to identify a Rerun [`ChunkId`] in chunk-level [`ArrowSchema`] metadata.
    pub const CHUNK_METADATA_KEY_ID: &'static str = "rerun.id";

    /// The key used to identify a Rerun [`EntityPath`] in chunk-level [`ArrowSchema`] metadata.
    pub const CHUNK_METADATA_KEY_ENTITY_PATH: &'static str = "rerun.entity_path";

    /// The key used to identify the in-memory size of the chunk, in bytes, in chunk-level
    /// [`ArrowSchema`] metadata.
    pub const CHUNK_METADATA_KEY_HEAP_SIZE_BYTES: &'static str = "rerun.heap_size_bytes";

    /// The marker used to identify whether a chunk is sorted by row ID, in chunk-level
    /// [`ArrowSchema`] metadata.
    ///
    /// The associated value is irrelevant: if this marker is present, the chunk is sorted.
    pub const CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID: &'static str = "rerun.is_sorted";

    /// The key used to identify the kind of a Rerun column in field-level metadata.
    pub const FIELD_METADATA_KEY_KIND: &'static str = "rerun.kind";

    /// The value of [`Self::FIELD_METADATA_KEY_KIND`] for time columns.
    pub const FIELD_METADATA_VALUE_KIND_TIME: &'static str = "time";

    /// The value of [`Self::FIELD_METADATA_KEY_KIND`] for control columns (e.g. row IDs).
    pub const FIELD_METADATA_VALUE_KIND_CONTROL: &'static str = "control";

    /// The value of [`Self::FIELD_METADATA_KEY_KIND`] for data (component) columns.
    pub const FIELD_METADATA_VALUE_KIND_DATA: &'static str = "data";

    /// The key used to identify the archetype name in field-level metadata.
    pub const FIELD_METADATA_KEY_ARCHETYPE_NAME: &'static str = "rerun.archetype_name";

    /// The key used to identify the archetype field name in field-level metadata.
    pub const FIELD_METADATA_KEY_ARCHETYPE_FIELD_NAME: &'static str = "rerun.archetype_field_name";

    /// The marker used to identify whether a time column is sorted, in field-level metadata.
    ///
    /// The associated value is irrelevant: if this marker is present, the column is sorted.
    pub const FIELD_METADATA_MARKER_IS_SORTED_BY_TIME: &'static str =
        Self::CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID;
    /// Returns the chunk-level metadata entry for a Rerun [`ChunkId`].
    #[inline]
    pub fn chunk_metadata_id(id: ChunkId) -> ArrowMetadata {
        [(
            Self::CHUNK_METADATA_KEY_ID.to_owned(),
            format!("{:X}", id.as_u128()),
        )]
        .into()
    }

    /// Returns the chunk-level metadata entry for the in-memory size of the chunk, in bytes.
    #[inline]
    pub fn chunk_metadata_heap_size_bytes(heap_size_bytes: u64) -> ArrowMetadata {
        [(
            Self::CHUNK_METADATA_KEY_HEAP_SIZE_BYTES.to_owned(),
            heap_size_bytes.to_string(),
        )]
        .into()
    }

    /// Returns the chunk-level metadata entry for a Rerun [`EntityPath`].
    #[inline]
    pub fn chunk_metadata_entity_path(entity_path: &EntityPath) -> ArrowMetadata {
        [(
            Self::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(),
            entity_path.to_string(),
        )]
        .into()
    }

    /// Returns the chunk-level marker for a chunk that is sorted by row ID.
    #[inline]
    pub fn chunk_metadata_is_sorted() -> ArrowMetadata {
        [(
            Self::CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID.to_owned(),
            String::new(),
        )]
        .into()
    }

    /// Returns the field-level metadata entry for a Rerun time column.
    #[inline]
    pub fn field_metadata_time_column() -> ArrowMetadata {
        [(
            Self::FIELD_METADATA_KEY_KIND.to_owned(),
            Self::FIELD_METADATA_VALUE_KIND_TIME.to_owned(),
        )]
        .into()
    }

    /// Returns the field-level metadata entry for a Rerun control column.
    #[inline]
    pub fn field_metadata_control_column() -> ArrowMetadata {
        [(
            Self::FIELD_METADATA_KEY_KIND.to_owned(),
            Self::FIELD_METADATA_VALUE_KIND_CONTROL.to_owned(),
        )]
        .into()
    }

    /// Returns the field-level metadata entry for a Rerun data (component) column.
    #[inline]
    pub fn field_metadata_data_column() -> ArrowMetadata {
        [(
            Self::FIELD_METADATA_KEY_KIND.to_owned(),
            Self::FIELD_METADATA_VALUE_KIND_DATA.to_owned(),
        )]
        .into()
    }

    /// Returns the field-level marker for a time column that is sorted.
    #[inline]
    pub fn field_metadata_is_sorted() -> ArrowMetadata {
        [(
            Self::FIELD_METADATA_MARKER_IS_SORTED_BY_TIME.to_owned(),
            String::new(),
        )]
        .into()
    }

    /// Returns the field-level metadata entries carrying a [`ComponentDescriptor`]'s archetype
    /// name and archetype field name, when present.
#[inline]
pub fn field_metadata_component_descriptor(
component_desc: &ComponentDescriptor,
) -> ArrowMetadata {
component_desc
.archetype_name
.iter()
.copied()
.map(|archetype_name| {
(
Self::FIELD_METADATA_KEY_ARCHETYPE_NAME.to_owned(),
archetype_name.to_string(),
)
})
.chain(component_desc.archetype_field_name.iter().copied().map(
|archetype_field_name| {
(
Self::FIELD_METADATA_KEY_ARCHETYPE_FIELD_NAME.to_owned(),
archetype_field_name.to_string(),
)
},
))
.collect()
}
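
    /// Recovers a [`ComponentDescriptor`] from an [`ArrowField`]'s name and metadata.
    ///
    /// This is the inverse of [`Self::field_metadata_component_descriptor`], except that the
    /// component name is carried by the field *name* rather than the field metadata.
    ///
    /// A minimal sketch of the relationship (the names and `datatype` below are illustrative
    /// only):
    ///
    /// ```ignore
    /// let desc = ComponentDescriptor {
    ///     archetype_name: Some("rerun.archetypes.Points3D".into()),
    ///     archetype_field_name: Some("positions".into()),
    ///     component_name: "rerun.components.Position3D".into(),
    /// };
    /// let field = ArrowField::new(desc.component_name.to_string(), datatype, true)
    ///     .with_metadata(TransportChunk::field_metadata_component_descriptor(&desc));
    /// assert_eq!(desc, TransportChunk::component_descriptor_from_field(&field));
    /// ```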
#[inline]
pub fn component_descriptor_from_field(field: &ArrowField) -> ComponentDescriptor {
ComponentDescriptor {
archetype_name: field
.metadata()
.get(Self::FIELD_METADATA_KEY_ARCHETYPE_NAME)
.cloned()
.map(Into::into),
component_name: field.name().clone().into(),
archetype_field_name: field
.metadata()
.get(Self::FIELD_METADATA_KEY_ARCHETYPE_FIELD_NAME)
.cloned()
.map(Into::into),
}
}
}
impl TransportChunk {
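    /// The [`ChunkId`] of the chunk, parsed from the schema metadata.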
#[inline]
pub fn id(&self) -> ChunkResult<ChunkId> {
if let Some(id) = self.metadata().get(Self::CHUNK_METADATA_KEY_ID) {
let id = u128::from_str_radix(id, 16).map_err(|err| ChunkError::Malformed {
reason: format!("cannot deserialize chunk id: {err}"),
})?;
Ok(ChunkId::from_u128(id))
} else {
            Err(ChunkError::Malformed {
reason: format!("chunk id missing from metadata ({:?})", self.metadata()),
})
}
}
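
    /// The [`EntityPath`] of the chunk, parsed from the schema metadata.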
#[inline]
pub fn entity_path(&self) -> ChunkResult<EntityPath> {
        match self.metadata().get(Self::CHUNK_METADATA_KEY_ENTITY_PATH) {
            Some(entity_path) => Ok(EntityPath::parse_forgiving(entity_path)),
            None => Err(ChunkError::Malformed {
                reason: format!(
                    "entity path missing from metadata ({:?})",
                    self.metadata()
                ),
            }),
        }
}
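
    /// The in-memory size of the chunk in bytes, if it was recorded in the schema metadata.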
#[inline]
pub fn heap_size_bytes(&self) -> Option<u64> {
self.metadata()
.get(Self::CHUNK_METADATA_KEY_HEAP_SIZE_BYTES)
.and_then(|s| s.parse::<u64>().ok())
}
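
    /// The fields of the underlying schema.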
#[inline]
pub fn fields(&self) -> &ArrowFields {
&self.schema_ref().fields
}
    /// The schema-level metadata of the underlying record batch.
    #[inline]
    pub fn metadata(&self) -> &ArrowMetadata {
        &self.batch.schema_ref().metadata
    }
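
    /// Whether the chunk is marked as sorted by row ID in its schema metadata.
    ///
    /// The marker is a guarantee, not a requirement: a chunk may be sorted without carrying it.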
#[inline]
pub fn is_sorted(&self) -> bool {
self.metadata()
.contains_key(Self::CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID)
}
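
    /// Iterates all columns whose field metadata tags them with the given `kind`, in schema
    /// order.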
#[inline]
fn columns_of_kind<'a>(
&'a self,
kind: &'a str,
) -> impl Iterator<Item = (&'a ArrowField, &'a ArrowArrayRef)> + 'a {
self.fields().iter().enumerate().filter_map(|(i, field)| {
let actual_kind = field.metadata().get(Self::FIELD_METADATA_KEY_KIND);
(actual_kind.map(|s| s.as_str()) == Some(kind))
.then(|| {
self.batch
.columns()
.get(i)
.map(|column| (field.as_ref(), column))
})
.flatten()
})
}
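
    /// Iterates all control columns (e.g. row IDs).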
#[inline]
pub fn controls(&self) -> impl Iterator<Item = (&ArrowField, &ArrowArrayRef)> {
self.columns_of_kind(Self::FIELD_METADATA_VALUE_KIND_CONTROL)
}
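
    /// Iterates all data (component) columns.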
#[inline]
pub fn components(&self) -> impl Iterator<Item = (&ArrowField, &ArrowArrayRef)> {
self.columns_of_kind(Self::FIELD_METADATA_VALUE_KIND_DATA)
}
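
    /// Iterates all time (index) columns.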
#[inline]
pub fn timelines(&self) -> impl Iterator<Item = (&ArrowField, &ArrowArrayRef)> {
self.columns_of_kind(Self::FIELD_METADATA_VALUE_KIND_TIME)
}
#[inline]
pub fn num_controls(&self) -> usize {
self.controls().count()
}
#[inline]
pub fn num_timelines(&self) -> usize {
self.timelines().count()
}
#[inline]
pub fn num_components(&self) -> usize {
self.components().count()
}
}
impl Chunk {
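    /// Prepares the [`Chunk`] for transport by encoding it as an [`ArrowRecordBatch`].
    ///
    /// Chunk-level information goes into the schema metadata; column-level information goes
    /// into the per-field metadata. Columns are ordered as: row IDs, then timelines (sorted by
    /// name), then components (sorted by name).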
pub fn to_record_batch(&self) -> ChunkResult<ArrowRecordBatch> {
self.sanity_check()?;
re_tracing::profile_function!(format!(
"num_columns={} num_rows={}",
self.num_columns(),
self.num_rows()
));
let Self {
id,
entity_path,
            heap_size_bytes: _, // read back via `self.heap_size_bytes()` below
            is_sorted,
row_ids,
timelines,
components,
} = self;
let mut fields: Vec<ArrowField> = vec![];
let mut metadata = std::collections::HashMap::default();
let mut columns: Vec<ArrowArrayRef> =
Vec::with_capacity(1 + timelines.len() + components.len());
{
re_tracing::profile_scope!("metadata");
metadata.extend(TransportChunk::chunk_metadata_id(*id));
metadata.extend(TransportChunk::chunk_metadata_entity_path(entity_path));
metadata.extend(TransportChunk::chunk_metadata_heap_size_bytes(
self.heap_size_bytes(),
));
if *is_sorted {
metadata.extend(TransportChunk::chunk_metadata_is_sorted());
}
}
{
re_tracing::profile_scope!("row ids");
fields.push(
ArrowField::new(
RowId::descriptor().to_string(),
RowId::arrow_datatype().clone(),
false,
)
.with_metadata(
TransportChunk::field_metadata_control_column().tap_mut(|metadata| {
metadata.insert(
"ARROW:extension:name".to_owned(),
re_tuid::Tuid::ARROW_EXTENSION_NAME.to_owned(),
);
}),
),
);
columns.push(into_arrow_ref(row_ids.clone()));
}
{
re_tracing::profile_scope!("timelines");
let mut timelines = timelines
.iter()
.map(|(timeline, info)| {
let TimeColumn {
timeline: _,
times: _,
is_sorted,
time_range: _,
} = info;
                    let nullable = false; // timelines within a chunk are always dense
                    let field =
ArrowField::new(timeline.name().to_string(), timeline.datatype(), nullable)
.with_metadata({
let mut metadata = TransportChunk::field_metadata_time_column();
if *is_sorted {
metadata.extend(TransportChunk::field_metadata_is_sorted());
}
metadata
});
let times = info.times_array();
(field, times)
})
.collect_vec();
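            // Sort by name so that logically identical chunks always serialize to the same
            // schema, regardless of insertion order.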
timelines
.sort_by(|(field1, _times1), (field2, _times2)| field1.name().cmp(field2.name()));
for (field, times) in timelines {
fields.push(field);
columns.push(times);
}
}
{
re_tracing::profile_scope!("components");
let mut components = components
.values()
.flat_map(|per_desc| per_desc.iter())
.map(|(component_desc, list_array)| {
let list_array = ArrowListArray::from(list_array.clone());
let field = ArrowField::new(
component_desc.component_name.to_string(),
list_array.data_type().clone(),
true,
)
.with_metadata({
let mut metadata = TransportChunk::field_metadata_data_column();
metadata.extend(TransportChunk::field_metadata_component_descriptor(
component_desc,
));
metadata
});
(field, as_array_ref(list_array))
})
.collect_vec();
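            // Same rationale as for timelines: keep the schema deterministic.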
components
.sort_by(|(field1, _data1), (field2, _data2)| field1.name().cmp(field2.name()));
for (field, data) in components {
fields.push(field);
columns.push(data);
}
}
let schema = ArrowSchema::new_with_metadata(fields, metadata);
Ok(ArrowRecordBatch::try_new(schema.into(), columns)?)
}
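
    /// Prepares the [`Chunk`] for transport. Thin convenience wrapper around
    /// [`Self::to_record_batch`].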
pub fn to_transport(&self) -> ChunkResult<TransportChunk> {
let record_batch = self.to_record_batch()?;
Ok(TransportChunk::from(record_batch))
}
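
    /// Decodes an [`ArrowRecordBatch`] back into a native [`Chunk`].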
pub fn from_record_batch(batch: ArrowRecordBatch) -> ChunkResult<Self> {
Self::from_transport(&TransportChunk::from(batch))
}
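
    /// Decodes a [`TransportChunk`] back into a native [`Chunk`].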
pub fn from_transport(transport: &TransportChunk) -> ChunkResult<Self> {
re_tracing::profile_function!(format!(
"num_columns={} num_rows={}",
transport.num_columns(),
transport.num_rows()
));
let (id, entity_path, is_sorted) = {
re_tracing::profile_scope!("metadata");
(
transport.id()?,
transport.entity_path()?,
transport.is_sorted(),
)
};
let row_ids = {
re_tracing::profile_scope!("row ids");
let Some(row_ids) = transport.controls().find_map(|(field, column)| {
(field.name() == RowId::descriptor().component_name.as_str()).then_some(column)
}) else {
return Err(ChunkError::Malformed {
reason: format!("missing row_id column ({:?})", transport.schema_ref()),
});
};
ArrowArrayRef::from(row_ids.clone())
.downcast_array_ref::<ArrowStructArray>()
.ok_or_else(|| ChunkError::Malformed {
reason: format!(
"RowId data has the wrong datatype: expected {:?} but got {:?} instead",
RowId::arrow_datatype(),
*row_ids.data_type(),
),
})?
.clone()
};
let timelines = {
re_tracing::profile_scope!("timelines");
let mut timelines = IntMap::default();
for (field, column) in transport.timelines() {
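                // The column's datatype tells sequence timelines (plain `Int64`) apart from
                // temporal ones (nanosecond timestamps).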
let timeline = match column.data_type() {
ArrowDatatype::Int64 => Timeline::new_sequence(field.name().as_str()),
ArrowDatatype::Timestamp(ArrowTimeUnit::Nanosecond, None) => {
Timeline::new_temporal(field.name().as_str())
}
_ => {
return Err(ChunkError::Malformed {
reason: format!(
"time column '{}' is not deserializable ({:?})",
field.name(),
column.data_type()
),
});
}
};
let times = TimeColumn::read_array(&ArrowArrayRef::from(column.clone())).map_err(
|err| ChunkError::Malformed {
reason: format!("Bad time column '{}': {err}", field.name()),
},
)?;
let is_sorted = field
.metadata()
.contains_key(TransportChunk::FIELD_METADATA_MARKER_IS_SORTED_BY_TIME);
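                // `None` (rather than `Some(false)`) when the marker is absent: sortedness is
                // then left for `TimeColumn::new` to determine.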
let time_column = TimeColumn::new(is_sorted.then_some(true), timeline, times);
if timelines.insert(timeline, time_column).is_some() {
return Err(ChunkError::Malformed {
reason: format!(
"time column '{}' was specified more than once",
field.name(),
),
});
}
}
timelines
};
let components = {
let mut components = ChunkComponents::default();
for (field, column) in transport.components() {
let column = column
.downcast_array_ref::<ArrowListArray>()
.ok_or_else(|| ChunkError::Malformed {
reason: format!(
"The outer array in a chunked component batch must be a sparse list, got {:?}",
column.data_type(),
),
})?;
let component_desc = TransportChunk::component_descriptor_from_field(field);
if components
.insert_descriptor(component_desc, column.clone())
.is_some()
{
return Err(ChunkError::Malformed {
reason: format!(
"component column '{}' was specified more than once",
field.name(),
),
});
}
}
components
};
let mut res = Self::new(
id,
entity_path,
is_sorted.then_some(true),
row_ids,
timelines,
components,
)?;
if let Some(heap_size_bytes) = transport.heap_size_bytes() {
res.heap_size_bytes = heap_size_bytes.into();
}
Ok(res)
}
}
impl Chunk {
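    /// Converts an [`re_log_types::ArrowMsg`] into a [`Chunk`].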
#[inline]
pub fn from_arrow_msg(msg: &re_log_types::ArrowMsg) -> ChunkResult<Self> {
let re_log_types::ArrowMsg {
chunk_id: _,
timepoint_max: _,
batch,
on_release: _,
} = msg;
Self::from_record_batch(batch.clone())
}
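
    /// Converts the [`Chunk`] into an [`re_log_types::ArrowMsg`], ready to be sent over the
    /// wire.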
#[inline]
pub fn to_arrow_msg(&self) -> ChunkResult<re_log_types::ArrowMsg> {
re_tracing::profile_function!();
self.sanity_check()?;
let transport = self.to_transport()?;
Ok(re_log_types::ArrowMsg {
chunk_id: re_tuid::Tuid::from_u128(self.id().as_u128()),
timepoint_max: self.timepoint_max(),
batch: transport.into(),
on_release: None,
})
}
}
#[cfg(test)]
mod tests {
use nohash_hasher::IntMap;
use re_arrow_util::arrow_util;
use re_log_types::{
example_components::{MyColor, MyPoint},
Timeline,
};
use super::*;
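
    /// Round-trips a chunk through its transport representation several times, checking that
    /// chunks both with and without timelines survive unchanged.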
#[test]
fn roundtrip() -> anyhow::Result<()> {
let entity_path = EntityPath::parse_forgiving("a/b/c");
let timeline1 = Timeline::new_temporal("log_time");
let timelines1 = std::iter::once((
timeline1,
TimeColumn::new(Some(true), timeline1, vec![42, 43, 44, 45].into()),
))
.collect();
        let timelines2 = IntMap::default(); // static chunk: no timelines at all

        let points1 = MyPoint::to_arrow([
MyPoint::new(1.0, 2.0),
MyPoint::new(3.0, 4.0),
MyPoint::new(5.0, 6.0),
])?;
let points2 = None;
let points3 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0)])?;
let points4 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
let colors1 = MyColor::to_arrow([
MyColor::from_rgb(1, 2, 3),
MyColor::from_rgb(4, 5, 6),
MyColor::from_rgb(7, 8, 9),
])?;
let colors2 = MyColor::to_arrow([MyColor::from_rgb(10, 20, 30)])?;
let colors3 = None;
let colors4 = None;
let components = [
(MyPoint::descriptor(), {
let list_array = arrow_util::arrays_to_list_array_opt(&[
Some(&*points1),
points2,
Some(&*points3),
Some(&*points4),
])
.unwrap();
assert_eq!(4, list_array.len());
list_array
}),
            (MyColor::descriptor(), {
let list_array = arrow_util::arrays_to_list_array_opt(&[
Some(&*colors1),
Some(&*colors2),
colors3,
colors4,
])
.unwrap();
assert_eq!(4, list_array.len());
list_array
}),
];
let row_ids = vec![RowId::new(), RowId::new(), RowId::new(), RowId::new()];
for timelines in [timelines1, timelines2] {
let chunk_original = Chunk::from_native_row_ids(
ChunkId::new(),
entity_path.clone(),
None,
&row_ids,
timelines.clone(),
components.clone().into_iter().collect(),
)?;
let mut chunk_before = chunk_original.clone();
for _ in 0..3 {
let chunk_in_transport = chunk_before.to_transport()?;
let chunk_after = Chunk::from_record_batch(chunk_in_transport.clone().into())?;
assert_eq!(
chunk_in_transport.entity_path()?,
*chunk_original.entity_path()
);
assert_eq!(
chunk_in_transport.entity_path()?,
*chunk_after.entity_path()
);
assert_eq!(
chunk_in_transport.heap_size_bytes(),
Some(chunk_after.heap_size_bytes()),
);
assert_eq!(
chunk_in_transport.num_columns(),
chunk_original.num_columns()
);
assert_eq!(chunk_in_transport.num_columns(), chunk_after.num_columns());
assert_eq!(chunk_in_transport.num_rows(), chunk_original.num_rows());
assert_eq!(chunk_in_transport.num_rows(), chunk_after.num_rows());
assert_eq!(
chunk_in_transport.num_controls(),
chunk_original.num_controls()
);
assert_eq!(
chunk_in_transport.num_controls(),
chunk_after.num_controls()
);
assert_eq!(
chunk_in_transport.num_timelines(),
chunk_original.num_timelines()
);
assert_eq!(
chunk_in_transport.num_timelines(),
chunk_after.num_timelines()
);
assert_eq!(
chunk_in_transport.num_components(),
chunk_original.num_components()
);
assert_eq!(
chunk_in_transport.num_components(),
chunk_after.num_components()
);
eprintln!("{chunk_before}");
eprintln!("{chunk_in_transport}");
eprintln!("{chunk_after}");
assert_eq!(chunk_before, chunk_after);
assert!(chunk_before.are_equal(&chunk_after));
chunk_before = chunk_after;
}
}
Ok(())
}
}