use std::collections::BTreeMap;

use arrow::datatypes::Schema as ArrowSchema;
use pyo3::{
    create_exception,
    exceptions::{PyConnectionError, PyRuntimeError, PyValueError},
    PyErr, PyResult, Python,
};
use tokio_stream::StreamExt as _;

use re_chunk::{LatestAtQuery, RangeQuery};
use re_chunk_store::ChunkStore;
use re_dataframe::ViewContentsSelector;
use re_grpc_client::redap::{get_chunks_response_to_chunk_and_partition_id, RedapClient};
use re_log_encoding::codec::wire::decoder::Decode as _;
use re_log_types::{ApplicationId, EntryId, StoreId, StoreInfo, StoreKind, StoreSource};
use re_protos::{
    catalog::v1alpha1::{
        ext::{DatasetEntry, EntryDetails, TableEntry},
        CreateDatasetEntryRequest, DeleteEntryRequest, EntryFilter, ReadDatasetEntryRequest,
        ReadTableEntryRequest,
    },
    common::v1alpha1::{IfDuplicateBehavior, TaskId},
    frontend::v1alpha1::{GetChunksRequest, GetDatasetSchemaRequest, RegisterWithDatasetRequest},
    manifest_registry::v1alpha1::ext::{DataSource, Query, QueryLatestAt, QueryRange},
};

use crate::catalog::to_py_err;
use crate::utils::wait_for_future;
create_exception!(catalog, ConnectionError, PyConnectionError);
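
/// Handle to a connection to a remote catalog server.
///
/// Wraps the gRPC client together with the origin it was created from.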
#[derive(Clone)]
pub struct ConnectionHandle {
origin: re_uri::Origin,
client: RedapClient,
}
impl ConnectionHandle {
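    /// Connects to the server at `origin`, blocking until the connection is established.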
pub fn new(py: Python<'_>, origin: re_uri::Origin) -> PyResult<Self> {
let client = wait_for_future(py, re_grpc_client::redap::client(origin.clone()))
.map_err(to_py_err)?;
Ok(Self { origin, client })
}
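
    /// Returns a clone of the underlying gRPC client (tonic-style clients are generally
    /// cheap to clone, as they wrap a shared channel).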
pub fn client(&self) -> RedapClient {
self.client.clone()
}
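
    /// The origin this handle is connected to.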
pub fn origin(&self) -> &re_uri::Origin {
&self.origin
}
}
impl ConnectionHandle {
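    /// Returns the details of all catalog entries matching `filter`.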
pub fn find_entries(
&mut self,
py: Python<'_>,
filter: EntryFilter,
) -> PyResult<Vec<EntryDetails>> {
let response = wait_for_future(
py,
self.client
.find_entries(re_protos::catalog::v1alpha1::FindEntriesRequest {
filter: Some(filter),
}),
)
.map_err(to_py_err)?;
let entries: Result<Vec<_>, _> = response
.into_inner()
.entries
.into_iter()
.map(TryInto::try_into)
.collect();
Ok(entries?)
}
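
    /// Deletes the catalog entry with the given id.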
pub fn delete_entry(&mut self, py: Python<'_>, entry_id: EntryId) -> PyResult<()> {
let _response = wait_for_future(
py,
self.client.delete_entry(DeleteEntryRequest {
id: Some(entry_id.into()),
}),
)
.map_err(to_py_err)?;
Ok(())
}
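
    /// Creates a new dataset entry with the given name and returns it.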
pub fn create_dataset(&mut self, py: Python<'_>, name: String) -> PyResult<DatasetEntry> {
let response = wait_for_future(
py,
self.client
.create_dataset_entry(CreateDatasetEntryRequest { name: Some(name) }),
)
.map_err(to_py_err)?;
        Ok(response
            .into_inner()
            .dataset
            .ok_or_else(|| PyRuntimeError::new_err("No dataset in response"))?
            .try_into()?)
}
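
    /// Fetches the dataset entry with the given id.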
pub fn read_dataset(&mut self, py: Python<'_>, entry_id: EntryId) -> PyResult<DatasetEntry> {
let response = wait_for_future(
py,
self.client.read_dataset_entry(ReadDatasetEntryRequest {
id: Some(entry_id.into()),
}),
)
.map_err(to_py_err)?;
        Ok(response
            .into_inner()
            .dataset
            .ok_or_else(|| PyRuntimeError::new_err("No dataset in response"))?
            .try_into()?)
}
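
    /// Fetches the table entry with the given id.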
pub fn read_table(&mut self, py: Python<'_>, entry_id: EntryId) -> PyResult<TableEntry> {
let response = wait_for_future(
py,
self.client.read_table_entry(ReadTableEntryRequest {
id: Some(entry_id.into()),
}),
)
.map_err(to_py_err)?;
        Ok(response
            .into_inner()
            .table
            .ok_or_else(|| PyRuntimeError::new_err("No table in response"))?
            .try_into()?)
}
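
    /// Fetches the Arrow schema of the dataset with the given id.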
pub fn get_dataset_schema(
&mut self,
py: Python<'_>,
entry_id: EntryId,
) -> PyResult<ArrowSchema> {
wait_for_future(py, async {
self.client
.get_dataset_schema(GetDatasetSchemaRequest {
dataset_id: Some(entry_id.into()),
})
.await
.map_err(to_py_err)?
.into_inner()
.schema()
.map_err(to_py_err)
})
}
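
    /// Registers the recording at `recording_uri` with the given dataset.
    ///
    /// Registration happens asynchronously on the server; the returned [`TaskId`] can be
    /// passed to [`Self::wait_for_task`]. Duplicate registrations are rejected
    /// (`IfDuplicateBehavior::Error`).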
pub fn register_with_dataset(
&mut self,
py: Python<'_>,
dataset_id: EntryId,
recording_uri: String,
) -> PyResult<TaskId> {
wait_for_future(py, async {
let response = self
.client
.register_with_dataset(RegisterWithDatasetRequest {
dataset_id: Some(dataset_id.into()),
data_sources: vec![DataSource::new_rrd(recording_uri)
.map_err(to_py_err)?
.into()],
on_duplicate: IfDuplicateBehavior::Error as i32,
})
.await
.map_err(to_py_err)?
.into_inner();
response
.id
.ok_or_else(|| PyValueError::new_err("received response without task id"))
})
}
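
    /// Waits up to `timeout` for the given task to complete, returning an error if the task
    /// failed or no status was received in time.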
pub fn wait_for_task(
&mut self,
py: Python<'_>,
task_id: &TaskId,
timeout: std::time::Duration,
) -> PyResult<()> {
wait_for_future(py, async {
let timeout: prost_types::Duration = timeout.try_into().map_err(|err| {
PyValueError::new_err(format!(
"failed to convert timeout to serialized duration: {err}"
))
})?;
let request = re_protos::redap_tasks::v1alpha1::QueryTasksOnCompletionRequest {
ids: vec![task_id.clone()],
timeout: Some(timeout),
};
let mut response_stream = self
.client
.query_tasks_on_completion(request)
.await
.map_err(to_py_err)?
.into_inner();
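
            // A single record batch describing the task's final status is expected on the
            // stream.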
let task_status = if let Some(response) = response_stream.next().await {
response
.map_err(to_py_err)?
.data
.ok_or_else(|| PyValueError::new_err("received response without data"))?
.decode()
.map_err(to_py_err)?
} else {
return Err(PyValueError::new_err("no response from task"));
};
            // `StringArray::value` panics on an out-of-bounds index, so guard against an
            // empty batch before reading the first row.
            let exec_status = task_status
                .column_by_name("exec_status")
                .and_then(|col| col.as_any().downcast_ref::<arrow::array::StringArray>())
                .and_then(|arr| (!arr.is_empty()).then(|| arr.value(0)))
                .ok_or_else(|| PyValueError::new_err("couldn't read exec_status column"))?;

            let msgs = task_status
                .column_by_name("msgs")
                .and_then(|col| col.as_any().downcast_ref::<arrow::array::StringArray>())
                .and_then(|arr| (!arr.is_empty()).then(|| arr.value(0)))
                .ok_or_else(|| PyValueError::new_err("couldn't read msgs column"))?;

            if exec_status == "success" {
                Ok(())
            } else {
                Err(PyValueError::new_err(format!("task failed: {msgs}")))
            }
})
}
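
    /// Streams all chunks matching the given dataframe query from the server and groups
    /// them into one in-memory [`ChunkStore`] per partition id.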
#[allow(clippy::too_many_arguments)]
pub fn get_chunks_for_dataframe_query(
&mut self,
py: Python<'_>,
dataset_id: EntryId,
contents: &Option<ViewContentsSelector>,
latest_at: Option<LatestAtQuery>,
range: Option<RangeQuery>,
partition_ids: &[impl AsRef<str> + Sync],
) -> PyResult<BTreeMap<String, ChunkStore>> {
let entity_paths = contents
.as_ref()
.map_or(vec![], |contents| contents.keys().collect::<Vec<_>>());
        let query = Query {
            latest_at: latest_at.map(|latest_at| QueryLatestAt {
                index: latest_at.timeline().to_string(),
                at: latest_at.at().as_i64(),
                fuzzy_descriptors: vec![],
            }),
            range: range.map(|range| QueryRange {
                index: range.timeline().to_string(),
                index_range: range.range,
                fuzzy_descriptors: vec![],
            }),
            columns_always_include_everything: false,
            columns_always_include_chunk_ids: false,
            columns_always_include_entity_paths: false,
            columns_always_include_byte_offsets: false,
            columns_always_include_static_indexes: false,
            columns_always_include_global_indexes: false,
            columns_always_include_component_indexes: false,
        };
let mut stores = BTreeMap::default();
wait_for_future(py, async {
let get_chunks_response_stream = self
.client
.get_chunks(GetChunksRequest {
dataset_id: Some(dataset_id.into()),
partition_ids: partition_ids
.iter()
.map(|id| id.as_ref().to_owned().into())
.collect(),
chunk_ids: vec![],
entity_paths: entity_paths
.into_iter()
.map(|p| (*p).clone().into())
.collect(),
query: Some(query.into()),
})
.await
.map_err(to_py_err)?
.into_inner();
let mut chunk_stream =
get_chunks_response_to_chunk_and_partition_id(get_chunks_response_stream);
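
            // Dispatch each incoming chunk to the store of its partition, creating stores
            // lazily as new partition ids show up.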
while let Some(chunk_and_partition_id) = chunk_stream.next().await {
let (chunk, partition_id) = chunk_and_partition_id.map_err(to_py_err)?;
let partition_id = partition_id.ok_or_else(|| {
PyValueError::new_err("Received chunk without a partition id")
})?;
let store = stores.entry(partition_id.clone()).or_insert_with(|| {
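                    // Only the partition id is known at this point, so synthesize a minimal
                    // `StoreInfo` from it, with a fresh recording store id.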
let store_info = StoreInfo {
application_id: ApplicationId::from(partition_id),
store_id: StoreId::random(StoreKind::Recording),
cloned_from: None,
store_source: StoreSource::Unknown,
store_version: None,
};
let mut store =
ChunkStore::new(store_info.store_id.clone(), Default::default());
store.set_info(store_info);
store
});
store
.insert_chunk(&std::sync::Arc::new(chunk))
.map_err(to_py_err)?;
}
Ok::<_, PyErr>(())
})?;
Ok(stores)
}
}