re_sorbet/
sorbet_columns.rsuse arrow::datatypes::{Field as ArrowField, Fields as ArrowFields};
use itertools::Itertools as _;
use nohash_hasher::IntSet;
use re_log_types::{EntityPath, TimelineName};
use crate::{
ColumnDescriptor, ColumnDescriptorRef, ColumnKind, ColumnSelector, ComponentColumnDescriptor,
ComponentColumnSelector, IndexColumnDescriptor, RowIdColumnDescriptor, SorbetError,
TimeColumnSelector,
};
/// Error produced when a column selector cannot be resolved to a single column
/// of a [`SorbetColumnDescriptors`].
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
// All variant names end in `Found`, which clippy would otherwise flag.
#[expect(clippy::enum_variant_names)]
pub enum ColumnSelectorResolveError {
/// No component column matched the selector; carries the requested component name.
#[error("Column for component '{0}' not found")]
ComponentNotFound(String),
/// More than one component column matched the selector; carries the requested component name.
#[error(
"Multiple columns were found for component '{0}'. Consider using a more specific selector."
)]
MultipleComponentColumnFound(String),
/// No index column exists for the requested timeline; carries the timeline name.
#[error("Index column for timeline '{0}' not found")]
TimelineNotFound(TimelineName),
}
/// Column descriptors grouped by kind: an optional row-id column, the index
/// columns, and the component columns.
///
/// Wherever the columns are flattened into a single sequence in this file,
/// the order is row id first, then indices, then components.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SorbetColumnDescriptors {
/// The row-id column, if there is one.
pub row_id: Option<RowIdColumnDescriptor>,
/// The index columns (looked up by timeline name when resolving selectors).
pub indices: Vec<IndexColumnDescriptor>,
/// The component columns.
pub components: Vec<ComponentColumnDescriptor>,
}
impl SorbetColumnDescriptors {
    /// Runs `sanity_check` on every component column descriptor.
    ///
    /// The row-id and index columns are not checked here.
    // NOTE(review): `#[track_caller]` suggests the per-component check panics on
    // failure and wants to report the outer call site — confirm against
    // `ComponentColumnDescriptor::sanity_check`.
    #[inline]
    #[track_caller]
    pub fn sanity_check(&self) {
        // Deliberately a plain loop: routing the call through a closure would
        // change the location seen by `#[track_caller]`.
        for component in &self.components {
            component.sanity_check();
        }
    }

    /// Total number of columns: the row-id column (if present), plus all index
    /// columns, plus all component columns.
    pub fn num_columns(&self) -> usize {
        // Exhaustive destructuring: adding a field to the struct becomes a
        // compile error here until it is accounted for.
        let Self {
            row_id,
            indices,
            components,
        } = self;

        usize::from(row_id.is_some()) + indices.len() + components.len()
    }

    /// The set of entity paths referenced by the component columns.
    pub fn entity_paths(&self) -> IntSet<EntityPath> {
        self.components
            .iter()
            .map(|col| col.entity_path.clone())
            .collect()
    }

    /// Iterates over borrowed descriptors of every column, in order:
    /// row id (if any), then indices, then components.
    pub fn descriptors(&self) -> impl Iterator<Item = ColumnDescriptorRef<'_>> + '_ {
        self.row_id
            .iter()
            .map(ColumnDescriptorRef::from)
            .chain(self.indices.iter().map(ColumnDescriptorRef::from))
            .chain(self.components.iter().map(ColumnDescriptorRef::from))
    }

    /// Owned copies of all index columns followed by all component columns.
    ///
    /// The row-id column is not included.
    pub fn indices_and_components(&self) -> Vec<ColumnDescriptor> {
        self.indices
            .iter()
            .cloned()
            .map(ColumnDescriptor::Time)
            .chain(
                self.components
                    .iter()
                    .cloned()
                    .map(ColumnDescriptor::Component),
            )
            .collect()
    }

    /// Looks up a column by position, counting index columns first and then
    /// component columns (the row-id column is not counted).
    ///
    /// Returns `None` when `index_ignoring_row_id` is past the last component
    /// column.
    pub fn get_index_or_component(&self, index_ignoring_row_id: usize) -> Option<ColumnDescriptor> {
        if let Some(index_column) = self.indices.get(index_ignoring_row_id) {
            Some(ColumnDescriptor::Time(index_column.clone()))
        } else {
            // `get` failed, so `index_ignoring_row_id >= self.indices.len()`
            // and the subtraction cannot underflow.
            self.components
                .get(index_ignoring_row_id - self.indices.len())
                .cloned()
                .map(ColumnDescriptor::Component)
        }
    }

    /// Resolves a [`ColumnSelector`] to the column it designates.
    ///
    /// Dispatches to [`Self::resolve_index_column_selector`] or
    /// [`Self::resolve_component_column_selector`] depending on the selector
    /// kind.
    ///
    /// # Errors
    ///
    /// Propagates the [`ColumnSelectorResolveError`] of the selected resolver.
    pub fn resolve_selector(
        &self,
        column_selector: &ColumnSelector,
    ) -> Result<ColumnDescriptorRef<'_>, ColumnSelectorResolveError> {
        match column_selector {
            ColumnSelector::Time(selector) => self
                .resolve_index_column_selector(selector)
                .map(ColumnDescriptorRef::Time),

            ColumnSelector::Component(selector) => self
                .resolve_component_column_selector(selector)
                .map(ColumnDescriptorRef::Component),
        }
    }

    /// Finds the first index column whose timeline name equals the selector's
    /// timeline.
    ///
    /// # Errors
    ///
    /// [`ColumnSelectorResolveError::TimelineNotFound`] if no index column has
    /// that timeline name.
    pub fn resolve_index_column_selector(
        &self,
        index_column_selector: &TimeColumnSelector,
    ) -> Result<&IndexColumnDescriptor, ColumnSelectorResolveError> {
        self.indices
            .iter()
            .find(|column| column.timeline_name() == index_column_selector.timeline)
            .ok_or_else(|| {
                ColumnSelectorResolveError::TimelineNotFound(index_column_selector.timeline)
            })
    }

    /// Finds the component column designated by the selector.
    ///
    /// Resolution happens in two passes, both restricted to columns whose
    /// entity path equals the selector's:
    ///
    /// 1. The first column whose component name is exactly equal to the
    ///    selector's component name wins.
    /// 2. Otherwise, columns are tested with `component_name.matches(..)`, and
    ///    the result is accepted only if exactly one column matches.
    ///
    /// # Errors
    ///
    /// * [`ColumnSelectorResolveError::ComponentNotFound`] if neither pass
    ///   finds a column.
    /// * [`ColumnSelectorResolveError::MultipleComponentColumnFound`] if the
    ///   second pass finds more than one column.
    pub fn resolve_component_column_selector(
        &self,
        component_column_selector: &ComponentColumnSelector,
    ) -> Result<&ComponentColumnDescriptor, ColumnSelectorResolveError> {
        let ComponentColumnSelector {
            entity_path,
            component_name,
        } = component_column_selector;

        // Pass 1: an exact name match is always unambiguous enough to return.
        let exact_match = self.components.iter().find(|column| {
            column.component_name.as_str() == component_name && &column.entity_path == entity_path
        });

        if let Some(exact_match) = exact_match {
            return Ok(exact_match);
        }

        // Pass 2: looser matching, but only accepted if it is unique.
        let mut partial_matches = self.components.iter().filter(|column| {
            column.component_name.matches(component_name) && &column.entity_path == entity_path
        });

        let first_match = partial_matches.next();

        if partial_matches.next().is_some() {
            return Err(ColumnSelectorResolveError::MultipleComponentColumnFound(
                component_name.clone(),
            ));
        }

        // Build the error lazily so the success path does not allocate a
        // `String` that is immediately thrown away.
        first_match
            .ok_or_else(|| ColumnSelectorResolveError::ComponentNotFound(component_name.clone()))
    }

    /// Converts every column to an Arrow field, in order: row id (if any),
    /// then indices, then components.
    ///
    /// `batch_type` is forwarded to each component column's `to_arrow_field`.
    pub fn arrow_fields(&self, batch_type: crate::BatchType) -> Vec<ArrowField> {
        let Self {
            row_id,
            indices,
            components,
        } = self;

        let mut fields: Vec<ArrowField> = Vec::with_capacity(self.num_columns());

        if let Some(row_id) = row_id {
            fields.push(row_id.to_arrow_field());
        }

        fields.extend(indices.iter().map(|column| column.to_arrow_field()));
        fields.extend(
            components
                .iter()
                .map(|column| column.to_arrow_field(batch_type)),
        );

        fields
    }

    /// Consumes `self` and returns it with only the component columns for
    /// which `keep` returns `true`.
    ///
    /// The row-id and index columns are left untouched.
    #[must_use]
    #[inline]
    pub fn filter_components(mut self, keep: impl Fn(&ComponentColumnDescriptor) -> bool) -> Self {
        self.components.retain(keep);
        self
    }
}
impl SorbetColumnDescriptors {
pub fn try_from_arrow_fields(
chunk_entity_path: Option<&EntityPath>,
fields: &ArrowFields,
) -> Result<Self, SorbetError> {
let mut row_ids = Vec::new();
let mut indices = Vec::new();
let mut components = Vec::new();
for field in fields {
let field = field.as_ref();
let column_kind = ColumnKind::try_from(field)?;
match column_kind {
ColumnKind::RowId => {
if indices.is_empty() && components.is_empty() {
row_ids.push(RowIdColumnDescriptor::try_from(field)?);
} else {
let err = format!(
"RowId column must be the first column; but the columns were: {:?}",
fields.iter().map(|f| f.name()).collect_vec()
);
return Err(SorbetError::custom(err));
}
}
ColumnKind::Index => {
if components.is_empty() {
indices.push(IndexColumnDescriptor::try_from(field)?);
} else {
return Err(SorbetError::custom(
"Index columns must come before any data columns",
));
}
}
ColumnKind::Component => {
components.push(ComponentColumnDescriptor::from_arrow_field(
chunk_entity_path,
field,
));
}
}
}
if row_ids.len() > 1 {
return Err(SorbetError::custom(
"Multiple row_id columns are not supported",
));
}
Ok(Self {
row_id: row_ids.pop(),
indices,
components,
})
}
pub fn try_from_arrow_fields_forgiving(
chunk_entity_path: Option<&EntityPath>,
fields: &ArrowFields,
) -> Result<Self, SorbetError> {
let mut row_ids = Vec::new();
let mut indices = Vec::new();
let mut components = Vec::new();
for field in fields {
let field = field.as_ref();
let column_kind = ColumnKind::try_from(field)?;
match column_kind {
ColumnKind::RowId => {
row_ids.push(RowIdColumnDescriptor::try_from(field)?);
}
ColumnKind::Index => {
indices.push(IndexColumnDescriptor::try_from(field)?);
}
ColumnKind::Component => {
components.push(ComponentColumnDescriptor::from_arrow_field(
chunk_entity_path,
field,
));
}
}
}
Ok(Self {
row_id: row_ids.pop(),
indices,
components,
})
}
}