re_datafusion/
partition_table.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::sync::Arc;

use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::{
    catalog::TableProvider,
    error::{DataFusionError, Result as DataFusionResult},
};
use tonic::transport::Channel;

use re_log_encoding::codec::wire::decoder::Decode as _;
use re_log_types::EntryId;
use re_protos::frontend::v1alpha1::GetPartitionTableSchemaRequest;
use re_protos::{
    frontend::v1alpha1::{
        frontend_service_client::FrontendServiceClient, ScanPartitionTableRequest,
    },
    manifest_registry::v1alpha1::ScanPartitionTableResponse,
};

use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};

/// Source of a dataset's partition table, fetched over gRPC from the frontend service.
///
/// Implements [`GrpcStreamToTable`], which lets [`GrpcStreamProvider`] expose it as a
/// DataFusion [`TableProvider`] (see [`PartitionTableProvider::into_provider`]).
#[derive(Debug, Clone)]
pub struct PartitionTableProvider {
    /// gRPC client used for the schema and scan requests.
    client: FrontendServiceClient<Channel>,

    /// Identifier of the dataset, sent as `dataset_id` in every request.
    dataset_id: EntryId,
}

impl PartitionTableProvider {
    /// Creates a provider that queries the partition table of `dataset_id` through `client`.
    ///
    /// No request is issued here; the arguments are only stored.
    pub fn new(client: FrontendServiceClient<Channel>, dataset_id: EntryId) -> Self {
        Self { dataset_id, client }
    }

    /// Consumes `self` and hands it to [`GrpcStreamProvider::prepare`], returning the result
    /// as a type-erased DataFusion [`TableProvider`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`GrpcStreamProvider::prepare`].
    pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
        let provider = GrpcStreamProvider::prepare(self).await?;
        Ok(provider)
    }
}

#[async_trait]
impl GrpcStreamToTable for PartitionTableProvider {
    type GrpcStreamData = ScanPartitionTableResponse;

    /// Requests the partition table schema for this provider's dataset and converts it
    /// into an Arrow [`SchemaRef`].
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::External`] if the gRPC call fails or if the response
    /// carries no schema. A failed schema conversion is propagated through `?`.
    async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
        let request = GetPartitionTableSchemaRequest {
            dataset_id: Some(self.dataset_id.into()),
        };

        Ok(Arc::new(
            self.client
                .get_partition_table_schema(request)
                .await
                .map_err(|err| DataFusionError::External(Box::new(err)))?
                .into_inner()
                .schema
                // Build the error lazily so nothing is boxed when the schema is present.
                .ok_or_else(|| {
                    DataFusionError::External(
                        "Schema missing from GetPartitionTableSchema response".into(),
                    )
                })?
                .try_into()?,
        ))
    }

    /// Issues the `ScanPartitionTable` request for this provider's dataset and returns the
    /// resulting response stream.
    ///
    /// No scan parameters are sent (`scan_parameters` is `None`).
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::External`] wrapping the gRPC status if the call fails.
    async fn send_streaming_request(
        &mut self,
    ) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
        let request = ScanPartitionTableRequest {
            dataset_id: Some(self.dataset_id.into()),
            scan_parameters: None,
        };

        self.client
            .scan_partition_table(request)
            .await
            .map_err(|err| DataFusionError::External(Box::new(err)))
    }

    /// Decodes the payload of a single streamed response into a [`RecordBatch`].
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Execution`] if the response has no `data` field, and
    /// [`DataFusionError::External`] if decoding the payload fails.
    fn process_response(
        &mut self,
        response: Self::GrpcStreamData,
    ) -> DataFusionResult<RecordBatch> {
        response
            .data
            // Built lazily: an eager `ok_or` would allocate this `String` on every call,
            // even when `data` is present.
            // NOTE(review): the message says "PartitionList" although the response type is
            // `ScanPartitionTableResponse`; left unchanged in case callers match on it.
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "DataFrame missing from PartitionList response".to_owned(),
                )
            })?
            .decode()
            .map_err(|err| DataFusionError::External(Box::new(err)))
    }
}