use std::str::FromStr as _;
use pyo3::{
exceptions::{PyLookupError, PyRuntimeError},
pyclass, pymethods,
types::PyAnyMethods as _,
FromPyObject, Py, PyAny, PyResult, Python,
};
use re_log_types::EntryId;
use re_protos::catalog::v1alpha1::EntryFilter;
use crate::catalog::{to_py_err, ConnectionHandle, PyDataset, PyEntry, PyEntryId, PyTable};
#[pyclass(name = "CatalogClient")]
pub struct PyCatalogClient {
#[expect(dead_code)]
origin: re_uri::Origin,
connection: ConnectionHandle,
datafusion_ctx: Option<Py<PyAny>>,
}
impl PyCatalogClient {
pub fn connection(&self) -> &ConnectionHandle {
&self.connection
}
}
#[pymethods]
impl PyCatalogClient {
#[new]
fn new(py: Python<'_>, addr: String) -> PyResult<Self> {
let origin = addr.as_str().parse::<re_uri::Origin>().map_err(to_py_err)?;
let connection = ConnectionHandle::new(py, origin.clone())?;
let datafusion_ctx = py
.import("datafusion")
.and_then(|datafusion| Ok(datafusion.getattr("SessionContext")?.call0()?.unbind()))
.ok();
Ok(Self {
origin,
connection,
datafusion_ctx,
})
}
fn entries(self_: Py<Self>, py: Python<'_>) -> PyResult<Vec<Py<PyEntry>>> {
let mut connection = self_.borrow(py).connection.clone();
let entry_details = connection.find_entries(
py,
EntryFilter {
id: None,
name: None,
entry_kind: None,
},
)?;
entry_details
.into_iter()
.map(|details| {
let id = Py::new(py, PyEntryId::from(details.id))?;
Py::new(
py,
PyEntry {
client: self_.clone_ref(py),
id,
details,
},
)
})
.collect()
}
fn get_dataset(
self_: Py<Self>,
name_or_id: EntryIdLike,
py: Python<'_>,
) -> PyResult<Py<PyDataset>> {
let mut connection = self_.borrow(py).connection.clone();
let id = name_or_id.resolve(&mut connection, py)?;
let entry_id = id.borrow(py).id;
let client = self_.clone_ref(py);
let dataset_entry = connection.read_dataset(py, entry_id)?;
let entry = PyEntry {
client,
id,
details: dataset_entry.details,
};
let dataset = PyDataset {
dataset_handle: dataset_entry.handle,
};
Py::new(py, (dataset, entry))
}
fn create_dataset(self_: Py<Self>, py: Python<'_>, name: &str) -> PyResult<Py<PyDataset>> {
let mut connection = self_.borrow_mut(py).connection.clone();
let dataset_entry = connection.create_dataset(py, name.to_owned())?;
let entry_id = Py::new(py, PyEntryId::from(dataset_entry.details.id))?;
let entry = PyEntry {
client: self_.clone_ref(py),
id: entry_id,
details: dataset_entry.details,
};
let dataset = PyDataset {
dataset_handle: dataset_entry.handle,
};
Py::new(py, (dataset, entry))
}
fn get_table(
self_: Py<Self>,
name_or_id: EntryIdLike,
py: Python<'_>,
) -> PyResult<Py<PyTable>> {
let mut connection = self_.borrow(py).connection.clone();
let id = name_or_id.resolve(&mut connection, py)?;
let entry_id = id.borrow(py).id;
let client = self_.clone_ref(py);
let dataset_entry = connection.read_table(py, entry_id)?;
let entry = PyEntry {
client,
id,
details: dataset_entry.details,
};
let table = PyTable::default();
Py::new(py, (table, entry))
}
fn entries_table(self_: Py<Self>, py: Python<'_>) -> PyResult<Py<PyTable>> {
Self::get_table(self_, EntryIdLike::Str("__entries".to_owned()), py)
}
#[getter]
pub fn ctx(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
if let Some(datafusion_ctx) = &self.datafusion_ctx {
Ok(datafusion_ctx.clone_ref(py))
} else {
Err(PyRuntimeError::new_err(
"DataFusion context not available (the `datafusion` package may need to be installed)".to_owned(),
))
}
}
}
#[derive(FromPyObject)]
enum EntryIdLike {
Str(String),
Id(Py<PyEntryId>),
}
impl EntryIdLike {
fn resolve(self, connection: &mut ConnectionHandle, py: Python<'_>) -> PyResult<Py<PyEntryId>> {
match self {
Self::Str(name_or_id) => {
let mut entry_details = connection.find_entries(
py,
EntryFilter {
id: None,
name: Some(name_or_id.clone()),
entry_kind: None,
},
)?;
if entry_details.is_empty() {
if let Ok(entry_id) = EntryId::from_str(name_or_id.as_str()) {
entry_details = connection.find_entries(
py,
EntryFilter {
id: Some(entry_id.into()),
name: None,
entry_kind: None,
},
)?;
}
}
if entry_details.is_empty() {
return Err(PyLookupError::new_err(
"No entry found with name or id 'name_or_id'",
));
}
Py::new(
py,
PyEntryId {
id: entry_details[0].id,
},
)
}
Self::Id(id) => Ok(id),
}
}
}