1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
use arrow2::array::Array as Arrow2Array;
use re_log_types::{TimeInt, Timeline};
use re_types_core::ComponentName;
use crate::{Chunk, RowId};
// ---
/// A query at a given time, for a given timeline.
///
/// Get the latest version of the data available at this time.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct LatestAtQuery {
timeline: Timeline,
at: TimeInt,
}
impl std::fmt::Debug for LatestAtQuery {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"<latest-at {} on {:?}>",
self.timeline.typ().format_utc(self.at),
self.timeline.name(),
))
}
}
impl LatestAtQuery {
/// The returned query is guaranteed to never include [`TimeInt::STATIC`].
#[inline]
pub fn new(timeline: Timeline, at: impl TryInto<TimeInt>) -> Self {
let at = at.try_into().unwrap_or(TimeInt::MIN);
Self { timeline, at }
}
#[inline]
pub const fn latest(timeline: Timeline) -> Self {
Self {
timeline,
at: TimeInt::MAX,
}
}
#[inline]
pub fn timeline(&self) -> Timeline {
self.timeline
}
#[inline]
pub fn at(&self) -> TimeInt {
self.at
}
}
// ---
impl Chunk {
/// Runs a [`LatestAtQuery`] filter on a [`Chunk`].
///
/// This behaves as a row-based filter: the result is a new [`Chunk`] that is vertically
/// sliced to only contain the row relevant for the specified `query`.
///
/// The resulting [`Chunk`] is guaranteed to contain all the same columns has the queried
/// chunk: there is no horizontal slicing going on.
///
/// An empty [`Chunk`] (i.e. 0 rows, but N columns) is returned if the `query` yields nothing.
///
/// Because the resulting chunk doesn't discard any column information, you can find extra relevant
/// information by inspecting the data, for examples timestamps on other timelines.
/// See [`Self::timeline_sliced`] and [`Self::component_sliced`] if you do want to filter this
/// extra data.
pub fn latest_at(&self, query: &LatestAtQuery, component_name: ComponentName) -> Self {
if self.is_empty() {
return self.clone();
}
re_tracing::profile_function!(format!("{query:?}"));
let Some(component_list_array) = self.get_first_component(&component_name) else {
return self.emptied();
};
let mut index = None;
let is_static = self.is_static();
let is_sorted_by_row_id = self.is_sorted();
if is_static {
if is_sorted_by_row_id {
// Static, row-sorted chunk
for i in (0..self.num_rows()).rev() {
if !component_list_array.is_valid(i) {
continue;
}
index = Some(i);
break;
}
} else {
// Static, row-unsorted chunk
let mut closest_row_id = RowId::ZERO;
for (i, row_id) in self.row_ids().enumerate() {
if !component_list_array.is_valid(i) {
continue;
}
let is_closer_row_id = row_id > closest_row_id;
if is_closer_row_id {
closest_row_id = row_id;
index = Some(i);
}
}
}
} else {
let Some(time_column) = self.timelines.get(&query.timeline()) else {
return self.emptied();
};
let is_sorted_by_time = time_column.is_sorted();
let times = time_column.times_raw();
if is_sorted_by_time {
// Temporal, row-sorted, time-sorted chunk
let i = times
.partition_point(|&time| time <= query.at().as_i64())
.saturating_sub(1);
for i in (0..=i).rev() {
if !component_list_array.is_valid(i) {
continue;
}
index = Some(i);
break;
}
} else {
// Temporal, unsorted chunk
let mut closest_data_time = TimeInt::MIN;
let mut closest_row_id = RowId::ZERO;
for (i, row_id) in self.row_ids().enumerate() {
if !component_list_array.is_valid(i) {
continue;
}
let data_time = TimeInt::new_temporal(times[i]);
let is_closer_time = data_time > closest_data_time && data_time <= query.at();
let is_same_time_but_closer_row_id =
data_time == closest_data_time && row_id > closest_row_id;
if is_closer_time || is_same_time_but_closer_row_id {
closest_data_time = data_time;
closest_row_id = row_id;
index = Some(i);
}
}
}
}
index.map_or_else(|| self.emptied(), |i| self.row_sliced(i, 1))
}
}