// re_datafusion/wasm_compat.rs

use datafusion::common::DataFusionError;

/// Awaits `f` and returns its output unchanged.
///
/// This is a no-op on non-Wasm targets, because the `tonic` futures are already `Send` there, so
/// no adaptation is needed. It exists so that callers can use the same function on every target;
/// see the Wasm version of this function for why the wrapper is needed at all.
///
/// # Errors
///
/// Returns whatever error `f` itself resolves to; no error is added or transformed here.
#[cfg(not(target_arch = "wasm32"))]
#[inline]
pub async fn make_future_send<F, T>(f: F) -> Result<T, DataFusionError>
where
    F: std::future::Future<Output = Result<T, DataFusionError>> + Send + 'static,
    T: Send + 'static,
{
    // `F: Send` is already required by the bounds, so the future can be awaited directly.
    f.await
}

/// Wrap a non-`Send` future so that callers get a `Send` future resolving to the same result.
///
/// `tonic` hands out non-`Send` futures on Wasm, whereas `DataFusion` only accepts `Send` ones.
/// To bridge the two, `f` is handed to [`wasm_bindgen_futures::spawn_local`] at call time and its
/// output is forwarded through a oneshot channel; the returned future merely waits on the
/// receiving end of that channel, which is `Send` as long as `T` is.
///
/// Dropping the returned future cancels the spawned work: the task notices that the receiver is
/// gone and finishes without completing `f`.
///
/// # Errors
///
/// Resolves to the error produced by `f`, if any. If the sending half of the channel goes away
/// without delivering a value, the channel's cancellation error is wrapped in
/// [`DataFusionError::External`].
#[cfg(target_arch = "wasm32")]
pub fn make_future_send<F, T>(
    f: F,
) -> impl std::future::Future<Output = Result<T, DataFusionError>> + Send + 'static
where
    F: std::future::Future<Output = Result<T, DataFusionError>> + 'static,
    T: Send + 'static,
{
    use futures::{pin_mut, FutureExt as _};
    use futures_util::future::{select, Either};

    let (mut result_tx, result_rx) = futures::channel::oneshot::channel();

    wasm_bindgen_futures::spawn_local(async move {
        // Resolves once the receiving half of the channel has been dropped.
        let receiver_dropped = result_tx.cancellation();

        // `select` requires `Unpin` futures, so pin both to the task's stack.
        pin_mut!(f, receiver_dropped);

        // Race the actual work against the receiver going away. If the receiver is dropped
        // first, nobody is waiting for the result anymore and there is nothing left to do.
        if let Either::Left((output, _)) = select(f, receiver_dropped).await {
            // The receiver may still disappear right before sending; that is not an error.
            let _ = result_tx.send(output);
        }
    });

    result_rx.map(|received| match received {
        Ok(output) => output,
        Err(canceled) => Err(DataFusionError::External(canceled.into())),
    })
}