re_redap_browser/
tables_session_context.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
//! Helper to maintain a [`SessionContext`] with the tables of a remote server.

use std::sync::Arc;

use datafusion::common::DataFusionError;
use datafusion::prelude::SessionContext;

use re_dataframe_ui::{DataFusionTableWidget, RequestedObject};
use re_datafusion::{PartitionTableProvider, TableEntryTableProvider};
use re_grpc_client::redap;
use re_grpc_client::redap::ConnectionError;
use re_log_types::EntryId;
use re_protos::catalog::v1alpha1::ext::EntryDetails;
use re_protos::catalog::v1alpha1::{EntryFilter, EntryKind, FindEntriesRequest};
use re_protos::external::prost;
use re_protos::TypeConversionError;
use re_viewer_context::AsyncRuntimeHandle;

/// Errors that can occur while populating a [`TablesSessionContext`].
///
/// Every variant is a `#[error(transparent)]` wrapper: the message and source are those of the
/// wrapped error, and the `#[from]` conversions allow propagating each of them with `?`.
#[derive(Debug, thiserror::Error)]
#[expect(clippy::enum_variant_names)]
pub enum SessionContextError {
    /// A gRPC call returned an error [`tonic::Status`].
    #[error(transparent)]
    TonicError(#[from] tonic::Status),

    /// Wraps a [`ConnectionError`] from `re_grpc_client::redap`.
    #[error(transparent)]
    ConnectionError(#[from] ConnectionError),

    /// Wraps a [`DataFusionError`], e.g. one returned when registering a table.
    #[error(transparent)]
    DataFusionError(#[from] DataFusionError),

    /// A protobuf message could not be converted to its native counterpart, or an entry had an
    /// unspecified kind.
    #[error(transparent)]
    TypeConversionError(#[from] TypeConversionError),
}

/// Bookkeeping record for a table that was registered in the [`SessionContext`].
struct Table {
    /// Id of the remote entry backing this table. Stored but not read anywhere yet, hence the
    /// `dead_code` expectation.
    #[expect(dead_code)]
    entry_id: EntryId,

    /// Name under which the table was registered; also the name used to deregister it.
    name: String,
}

/// Wrapper over a [`SessionContext`] that contains all the tables registered in the remote server,
/// including the table entries and the partition tables of the dataset entries.
//TODO(ab): add support for local caching of table data
pub struct TablesSessionContext {
    /// The session context into which the remote tables get registered.
    pub ctx: Arc<SessionContext>,

    /// Origin of the remote server whose entries are registered.
    origin: re_uri::Origin,

    /// Outcome of the latest registration request: the list of registered tables, or the error
    /// that interrupted the registration.
    registered_tables: RequestedObject<Result<Vec<Table>, SessionContextError>>,
}

impl TablesSessionContext {
    pub fn new(
        runtime: &AsyncRuntimeHandle,
        egui_ctx: &egui::Context,
        origin: re_uri::Origin,
    ) -> Self {
        let ctx = Arc::new(SessionContext::new());

        let registered_tables = {
            RequestedObject::new_with_repaint(
                runtime,
                egui_ctx.clone(),
                register_all_table_entries(ctx.clone(), origin.clone()),
            )
        };

        Self {
            ctx,

            origin,
            registered_tables,
        }
    }

    /// Rebuild the entire session context and drop related [`DataFusionTableWidget`] caches.
    pub fn refresh(&mut self, runtime: &AsyncRuntimeHandle, egui_ctx: &egui::Context) {
        if let Some(Ok(tables)) = self.registered_tables.try_as_ref() {
            for table in tables {
                let table_name = table.name.as_str();

                DataFusionTableWidget::clear_state(egui_ctx, &self.ctx, table_name);
                let _ = self.ctx.deregister_table(table_name);
            }
        }

        self.registered_tables = RequestedObject::new_with_repaint(
            runtime,
            egui_ctx.clone(),
            register_all_table_entries(self.ctx.clone(), self.origin.clone()),
        );
    }

    pub fn on_frame_start(&mut self) {
        self.registered_tables.on_frame_start();
    }
}

/// Register a table provider in `ctx` for every supported entry of the server at `origin`.
///
/// Dataset entries are registered through a [`PartitionTableProvider`] and table entries
/// through a [`TableEntryTableProvider`], each under the entry's name. Dataset views and table
/// views are skipped.
///
/// The registration is all-or-nothing: if anything fails partway through, the tables that this
/// call already registered are deregistered again before the error is returned. Otherwise they
/// would linger in `ctx` without being tracked by the caller, and a later attempt to register
/// the same names would be rejected.
///
/// # Errors
///
/// Fails if the connection cannot be established, if listing or converting the entries fails,
/// if an entry has an unspecified kind, if a table provider cannot be created, or if a table
/// cannot be registered in `ctx`.
async fn register_all_table_entries(
    ctx: Arc<SessionContext>,
    origin: re_uri::Origin,
) -> Result<Vec<Table>, SessionContextError> {
    let mut registered_tables = vec![];

    match try_register_all_table_entries(&ctx, origin, &mut registered_tables).await {
        Ok(()) => Ok(registered_tables),

        Err(err) => {
            // Roll back the partial registration. This is best-effort: the original error is
            // the one worth reporting, so deregistration failures are ignored.
            for table in &registered_tables {
                let _ = ctx.deregister_table(table.name.as_str());
            }

            Err(err)
        }
    }
}

/// Fallible part of [`register_all_table_entries`].
///
/// Every table that gets registered in `ctx` is appended to `registered_tables` right away, so
/// that the caller knows exactly what to roll back if an error is returned.
async fn try_register_all_table_entries(
    ctx: &SessionContext,
    origin: re_uri::Origin,
    registered_tables: &mut Vec<Table>,
) -> Result<(), SessionContextError> {
    let mut client = redap::client(origin.clone()).await?;

    // A filter with no criteria set: request every entry.
    let entries = client
        .find_entries(FindEntriesRequest {
            filter: Some(EntryFilter {
                id: None,
                name: None,
                entry_kind: None,
            }),
        })
        .await?
        .into_inner()
        .entries
        .into_iter()
        .map(TryInto::try_into)
        .collect::<Result<Vec<EntryDetails>, _>>()?;

    for entry in entries {
        let table_provider = match entry.kind {
            EntryKind::Dataset => Some(
                PartitionTableProvider::new(redap::client(origin.clone()).await?, entry.id)
                    .into_provider()
                    .await?,
            ),

            EntryKind::Table => Some(
                TableEntryTableProvider::new(redap::client(origin.clone()).await?, entry.id)
                    .into_provider()
                    .await?,
            ),

            // TODO(ab): these do not exist yet
            EntryKind::DatasetView | EntryKind::TableView => None,

            EntryKind::Unspecified => {
                return Err(
                    TypeConversionError::from(prost::UnknownEnumValue(entry.kind as i32)).into(),
                );
            }
        };

        if let Some(table_provider) = table_provider {
            ctx.register_table(&entry.name, table_provider)?;

            registered_tables.push(Table {
                entry_id: entry.id,
                name: entry.name,
            });
        }
    }

    Ok(())
}