// re_dataframe_ui/datafusion_adapter.rs
use std::sync::Arc;
use datafusion::common::{DataFusionError, TableReference};
use datafusion::functions::expr_fn::concat;
use datafusion::logical_expr::{col, lit};
use datafusion::prelude::SessionContext;
use parking_lot::Mutex;
use re_sorbet::{BatchType, SorbetBatch};
use re_viewer_context::AsyncRuntimeHandle;
use crate::table_blueprint::TableBlueprint;
use crate::RequestedObject;
/// Everything needed to run one table query: where to look the table up, which table to read,
/// and the blueprint describing how the result should be shaped.
#[derive(Clone)]
struct DataFusionQuery {
    /// Session context in which `table_ref` is resolved.
    session_ctx: Arc<SessionContext>,

    /// The table to read.
    table_ref: TableReference,

    /// Optional sorting and partition-link settings applied on top of the table.
    blueprint: TableBlueprint,
}
impl DataFusionQuery {
    /// Bundles a session context, a table reference and a blueprint into a query.
    fn new(
        session_ctx: Arc<SessionContext>,
        table_ref: TableReference,
        blueprint: TableBlueprint,
    ) -> Self {
        Self {
            session_ctx,
            table_ref,
            blueprint,
        }
    }

    /// Runs the query and converts the collected record batches into [`SorbetBatch`]es.
    ///
    /// The steps are, in order:
    /// 1. look the table up in the session context;
    /// 2. if the blueprint carries partition links, set the `column_name` column to the
    ///    concatenation of a URI prefix and the partition id column;
    /// 3. if the blueprint carries a sort specification, sort by that column;
    /// 4. collect the dataframe and convert each record batch.
    ///
    /// # Errors
    ///
    /// Any [`DataFusionError`] raised by the table lookup, column creation, sorting or
    /// collection is returned as-is. A failed sorbet conversion is wrapped in
    /// [`DataFusionError::External`].
    async fn execute(self) -> Result<Vec<SorbetBatch>, DataFusionError> {
        let Self {
            session_ctx,
            table_ref,
            blueprint,
        } = self;

        let mut df = session_ctx.table(table_ref).await?;

        // Exhaustive destructuring: adding a field to `TableBlueprint` forces a revisit here.
        let TableBlueprint {
            sort_by: sort_spec,
            partition_links: link_spec,
        } = &blueprint;

        if let Some(links) = link_spec {
            // The partition id gets appended to this prefix, row by row.
            let uri_prefix = format!(
                "{}/dataset/{}/data?partition_id=",
                links.origin, links.dataset_id
            );
            let prefix_expr = lit(uri_prefix);
            let partition_id_expr = col(&links.partition_id_column_name);
            let link_expr = concat(vec![prefix_expr, partition_id_expr]);

            df = df.with_column(&links.column_name, link_expr)?;
        }

        if let Some(spec) = sort_spec {
            let ascending = spec.direction.is_ascending();
            let nulls_first = true;
            let sort_expr = col(&spec.column).sort(ascending, nulls_first);

            df = df.sort(vec![sort_expr])?;
        }

        let collected = df.collect().await?;

        // Stops at the first batch that fails to convert.
        collected
            .iter()
            .map(|batch| {
                SorbetBatch::try_from_record_batch(batch, BatchType::Dataframe)
                    .map_err(|err| DataFusionError::External(err.into()))
            })
            .collect()
    }
}
impl PartialEq for DataFusionQuery {
    /// Two queries are equal when they share the *same* session context allocation (pointer
    /// identity, not structural equality) and have equal table references and blueprints.
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive destructuring: a new field cannot be silently left out of the comparison.
        let Self {
            session_ctx: other_ctx,
            table_ref: other_table,
            blueprint: other_blueprint,
        } = other;

        let same_session = Arc::ptr_eq(&self.session_ctx, other_ctx);

        same_session && self.table_ref == *other_table && self.blueprint == *other_blueprint
    }
}
/// A [`RequestedObject`] holding the outcome of a `DataFusionQuery::execute` call: either the
/// resulting sorbet batches or the error that made the query fail.
type RequestedSorbetBatches = RequestedObject<Result<Vec<SorbetBatch>, DataFusionError>>;
/// Per-table query state, stored in egui's temporary data under `id`.
///
/// Cloning is shallow with respect to the in-flight request: `requested_sorbet_batches` is
/// behind an [`Arc`], so every clone observes the same request.
#[derive(Clone)]
pub struct DataFusionAdapter {
    /// Key under which this adapter is stored in egui's data map.
    id: egui::Id,

    /// The query whose results are being requested.
    query: DataFusionQuery,

    /// Batches of the previous request, saved by `update_query` when that request had
    /// succeeded at the time the query changed. `None` until that happens.
    // NOTE(review): presumably used to keep displaying data while a new query runs; confirm
    // against the call sites.
    pub last_sorbet_batches: Option<Vec<SorbetBatch>>,

    /// The current request for the query results, shared between clones of this adapter.
    pub requested_sorbet_batches: Arc<Mutex<RequestedSorbetBatches>>,
}
impl DataFusionAdapter {
    /// Removes the adapter stored under `id` from egui's data map, if any.
    pub fn clear_state(egui_ctx: &egui::Context, id: egui::Id) {
        egui_ctx.data_mut(|map| {
            map.remove::<Self>(id);
        });
    }

    /// Returns the adapter stored under `id`, creating and storing it first if it is missing.
    ///
    /// On creation, a query is built from `session_ctx`, `table_ref` and `initial_blueprint`
    /// and its execution is handed to a new [`RequestedObject`]. When an adapter already
    /// exists, those three arguments are ignored and the stored adapter is returned unchanged.
    ///
    /// In both cases `on_frame_start` is called on the current request before returning.
    pub fn get(
        runtime: &AsyncRuntimeHandle,
        ui: &egui::Ui,
        session_ctx: &Arc<SessionContext>,
        table_ref: TableReference,
        id: egui::Id,
        initial_blueprint: TableBlueprint,
    ) -> Self {
        let cached = ui.data(|map| map.get_temp::<Self>(id));

        let adapter = match cached {
            Some(existing) => existing,
            None => {
                let query =
                    DataFusionQuery::new(Arc::clone(session_ctx), table_ref, initial_blueprint);

                // The request gets its own copy of the query; the adapter keeps the original.
                let request = RequestedObject::new_with_repaint(
                    runtime,
                    ui.ctx().clone(),
                    query.clone().execute(),
                );

                let created = Self {
                    id,
                    query,
                    last_sorbet_batches: None,
                    requested_sorbet_batches: Arc::new(Mutex::new(request)),
                };

                ui.data_mut(|map| {
                    map.insert_temp(id, created.clone());
                });

                created
            }
        };

        adapter.requested_sorbet_batches.lock().on_frame_start();

        adapter
    }

    /// The blueprint of the current query.
    pub fn blueprint(&self) -> &TableBlueprint {
        &self.query.blueprint
    }

    /// Switches to `new_blueprint` and stores the adapter back into egui's data map.
    ///
    /// When `new_blueprint` differs from the current one, the query is re-executed through a
    /// fresh [`RequestedObject`] that replaces the current request. Before the replacement,
    /// the batches of the current request are copied into `last_sorbet_batches` if that
    /// request has completed successfully; otherwise `last_sorbet_batches` is left untouched.
    ///
    /// The adapter is written back under its id whether or not the blueprint changed.
    pub fn update_query(
        mut self,
        runtime: &AsyncRuntimeHandle,
        ui: &egui::Ui,
        new_blueprint: TableBlueprint,
    ) {
        let blueprint_changed = self.query.blueprint != new_blueprint;

        if blueprint_changed {
            self.query.blueprint = new_blueprint;

            let mut request = self.requested_sorbet_batches.lock();

            // Save the previous results before the request holding them is overwritten.
            if let Some(Ok(previous_batches)) = request.try_as_ref() {
                self.last_sorbet_batches = Some(previous_batches.clone());
            }

            let rerun = self.query.clone().execute();
            *request = RequestedObject::new_with_repaint(runtime, ui.ctx().clone(), rerun);
        }

        let id = self.id;
        ui.data_mut(|map| {
            map.insert_temp(id, self);
        });
    }
}