// re_sorbet/sorbet_columns.rs
use arrow::datatypes::{Field as ArrowField, Fields as ArrowFields};
use nohash_hasher::IntSet;
use re_log_types::EntityPath;
use crate::{
ColumnDescriptor, ColumnDescriptorRef, ColumnKind, ComponentColumnDescriptor,
IndexColumnDescriptor, RowIdColumnDescriptor, SorbetError,
};
/// Column descriptors grouped by kind: an optional row-id column, the index
/// columns, and the component columns.
///
/// `descriptors` and `arrow_fields` emit the columns in exactly that order, and
/// `try_from_arrow_fields` rejects arrow fields that are not laid out that way.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SorbetColumnDescriptors {
/// The row-id column, if there is one. At most one is supported.
pub row_id: Option<RowIdColumnDescriptor>,
/// The index columns, in column order.
pub indices: Vec<IndexColumnDescriptor>,
/// The component columns, in column order.
pub components: Vec<ComponentColumnDescriptor>,
}
impl SorbetColumnDescriptors {
#[inline]
#[track_caller]
pub fn sanity_check(&self) {
for component in &self.components {
component.sanity_check();
}
}
pub fn num_columns(&self) -> usize {
let Self {
row_id,
indices,
components,
} = self;
row_id.is_some() as usize + indices.len() + components.len()
}
pub fn entity_paths(&self) -> IntSet<EntityPath> {
self.components
.iter()
.map(|col| col.entity_path.clone())
.collect()
}
pub fn descriptors(&self) -> impl Iterator<Item = ColumnDescriptorRef<'_>> + '_ {
self.row_id
.iter()
.map(ColumnDescriptorRef::from)
.chain(self.indices.iter().map(ColumnDescriptorRef::from))
.chain(self.components.iter().map(ColumnDescriptorRef::from))
}
pub fn indices_and_components(&self) -> Vec<ColumnDescriptor> {
itertools::chain!(
self.indices.iter().cloned().map(ColumnDescriptor::Time),
self.components
.iter()
.cloned()
.map(ColumnDescriptor::Component),
)
.collect()
}
pub fn get_index_or_component(&self, index_ignoring_row_id: usize) -> Option<ColumnDescriptor> {
if index_ignoring_row_id < self.indices.len() {
Some(ColumnDescriptor::Time(
self.indices[index_ignoring_row_id].clone(),
))
} else {
self.components
.get(index_ignoring_row_id - self.indices.len())
.cloned()
.map(ColumnDescriptor::Component)
}
}
pub fn arrow_fields(&self, batch_type: crate::BatchType) -> Vec<ArrowField> {
let Self {
row_id,
indices,
components,
} = self;
let mut fields: Vec<ArrowField> = Vec::with_capacity(self.num_columns());
if let Some(row_id) = row_id {
fields.push(row_id.to_arrow_field());
}
fields.extend(indices.iter().map(|column| column.to_arrow_field()));
fields.extend(
components
.iter()
.map(|column| column.to_arrow_field(batch_type)),
);
fields
}
#[must_use]
#[inline]
pub fn filter_components(mut self, keep: impl Fn(&ComponentColumnDescriptor) -> bool) -> Self {
self.components.retain(keep);
self
}
}
impl SorbetColumnDescriptors {
    /// Builds the column descriptors from a list of arrow fields.
    ///
    /// Each field is classified with [`ColumnKind::try_from`] and parsed into
    /// the matching descriptor. `chunk_entity_path` is forwarded to
    /// [`ComponentColumnDescriptor::from_arrow_field`] for component columns.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// * a field cannot be classified or parsed,
    /// * a row-id column appears after an index or component column,
    /// * an index column appears after a component column,
    /// * there is more than one row-id column.
    pub fn try_from_arrow_fields(
        chunk_entity_path: Option<&EntityPath>,
        fields: &ArrowFields,
    ) -> Result<Self, SorbetError> {
        let mut row_ids = Vec::new();
        let mut indices = Vec::new();
        let mut components = Vec::new();

        for field in fields {
            let field = field.as_ref();

            match ColumnKind::try_from(field)? {
                ColumnKind::RowId => {
                    let seen_other_columns = !indices.is_empty() || !components.is_empty();
                    if seen_other_columns {
                        return Err(SorbetError::custom("RowId column must be the first column"));
                    }
                    row_ids.push(RowIdColumnDescriptor::try_from(field)?);
                }

                ColumnKind::Index => {
                    if !components.is_empty() {
                        return Err(SorbetError::custom(
                            "Index columns must come before any data columns",
                        ));
                    }
                    indices.push(IndexColumnDescriptor::try_from(field)?);
                }

                ColumnKind::Component => {
                    let descriptor =
                        ComponentColumnDescriptor::from_arrow_field(chunk_entity_path, field);
                    components.push(descriptor);
                }
            }
        }

        // Duplicate row-id columns are only rejected once every field has
        // been parsed, so a parse error in a later field takes precedence.
        match row_ids.len() {
            0 | 1 => Ok(Self {
                row_id: row_ids.pop(),
                indices,
                components,
            }),
            _ => Err(SorbetError::custom(
                "Multiple row_id columns are not supported",
            )),
        }
    }
}