Struct Decoder
pub struct Decoder {
    tape_decoder: TapeDecoder,
    decoder: Box<dyn ArrayDecoder>,
    batch_size: usize,
    is_field: bool,
    schema: Arc<Schema>,
}
A low-level interface for reading JSON data from a byte stream.
See Reader for a higher-level interface for interfacing with BufRead.
The push-based interface facilitates integration with sources that yield arbitrarily delimited byte ranges, such as BufRead, or a chunked byte stream received from object storage.
use std::io::BufRead;

use arrow_array::RecordBatch;
use arrow_json::ReaderBuilder;
use arrow_schema::{ArrowError, SchemaRef};

fn read_from_json<R: BufRead>(
    mut reader: R,
    schema: SchemaRef,
) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
    let mut next = move || {
        loop {
            // The decoder does not require buf to contain whole records
            let buf = reader.fill_buf()?;
            if buf.is_empty() {
                break; // Input exhausted
            }
            let read = buf.len();
            let decoded = decoder.decode(buf)?;
            // Consume only the bytes the decoder actually read
            reader.consume(decoded);
            if decoded != read {
                break; // Batch size reached
            }
        }
        decoder.flush()
    };
    Ok(std::iter::from_fn(move || next().transpose()))
}
Implementations
impl Decoder
pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>
Read JSON objects from buf, returning the number of bytes read.
This method returns once batch_size objects have been parsed since the last call to Self::flush, or once buf is exhausted. Any remaining bytes should be included in the next call to Self::decode.
There is no requirement that buf contains a whole number of records, facilitating integration with arbitrary byte streams, such as those yielded by BufRead.
pub fn serialize<S>(&mut self, rows: &[S]) -> Result<(), ArrowError>
where
    S: Serialize,
Serialize rows to this Decoder.
This provides a simple way to convert serde-compatible data structures into an arrow RecordBatch.
Custom conversion logic as described in arrow_array::builder will likely outperform this, especially where the schema is known at compile time; however, this provides a mechanism to get something up and running quickly.
It can be used with serde_json::Value:
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Float32Type;
use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use serde_json::json;

let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
decoder.serialize(&json).unwrap();
let batch = decoder.flush().unwrap().unwrap();
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 1);
let values = batch.column(0).as_primitive::<Float32Type>().values();
assert_eq!(values, &[2.3, 5.7]);
Or with arbitrary Serialize types:
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::{Float32Type, Int32Type};
use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use serde::Serialize;

#[derive(Serialize)]
struct MyStruct {
    int32: i32,
    float: f32,
}

let schema = Schema::new(vec![
    Field::new("int32", DataType::Int32, false),
    Field::new("float", DataType::Float32, false),
]);
let rows = vec![
    MyStruct { int32: 0, float: 3. },
    MyStruct { int32: 4, float: 67.53 },
];
let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
decoder.serialize(&rows).unwrap();
let batch = decoder.flush().unwrap().unwrap();
// Expect a batch containing two columns
let int32 = batch.column(0).as_primitive::<Int32Type>();
assert_eq!(int32.values(), &[0, 4]);
let float = batch.column(1).as_primitive::<Float32Type>();
assert_eq!(float.values(), &[3., 67.53]);
Or even complex nested types:
use std::collections::BTreeMap;
use std::sync::Arc;

use arrow_array::StructArray;
use arrow_cast::display::{ArrayFormatter, FormatOptions};
use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Fields, Schema};
use serde::Serialize;

#[derive(Serialize)]
struct MyStruct {
    int32: i32,
    list: Vec<f64>,
    nested: Vec<Option<Nested>>,
}

impl MyStruct {
    /// Returns the [`Fields`] for [`MyStruct`]
    fn fields() -> Fields {
        let nested = DataType::Struct(Nested::fields());
        Fields::from([
            Arc::new(Field::new("int32", DataType::Int32, false)),
            Arc::new(Field::new_list(
                "list",
                Field::new("element", DataType::Float64, false),
                false,
            )),
            Arc::new(Field::new_list(
                "nested",
                Field::new("element", nested, true),
                true,
            )),
        ])
    }
}

#[derive(Serialize)]
struct Nested {
    map: BTreeMap<String, Vec<String>>,
}

impl Nested {
    /// Returns the [`Fields`] for [`Nested`]
    fn fields() -> Fields {
        let element = Field::new("element", DataType::Utf8, false);
        Fields::from([
            Arc::new(Field::new_map(
                "map",
                "entries",
                Field::new("key", DataType::Utf8, false),
                Field::new_list("value", element, false),
                false, // sorted
                false, // nullable
            ))
        ])
    }
}

let data = vec![
    MyStruct {
        int32: 34,
        list: vec![1., 2., 34.],
        nested: vec![
            None,
            Some(Nested {
                map: vec![
                    ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
                    ("key2".to_string(), vec!["baz".to_string()]),
                ]
                .into_iter()
                .collect(),
            }),
        ],
    },
    MyStruct {
        int32: 56,
        list: vec![],
        nested: vec![],
    },
    MyStruct {
        int32: 24,
        list: vec![-1., 245.],
        nested: vec![None],
    },
];

let schema = Schema::new(MyStruct::fields());
let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
decoder.serialize(&data).unwrap();
let batch = decoder.flush().unwrap().unwrap();
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 3);

// Convert to StructArray to format
let s = StructArray::from(batch);
let options = FormatOptions::default().with_null("null");
let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
Note: this ignores any batch size setting and always decodes all rows.
pub fn has_partial_record(&self) -> bool
True if the decoder is currently part way through decoding a record.
pub fn len(&self) -> usize
The number of unflushed records, including the partially decoded record (if any).
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
Flushes the currently buffered data to a RecordBatch.
Returns Ok(None) if there is no buffered data, i.e. Self::is_empty is true.
Note: this will return an error if called part way through decoding a record, i.e. Self::has_partial_record is true.
Trait Implementations
Auto Trait Implementations
impl Freeze for Decoder
impl !RefUnwindSafe for Decoder
impl Send for Decoder
impl !Sync for Decoder
impl Unpin for Decoder
impl !UnwindSafe for Decoder
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CheckedAs for T
fn checked_as<Dst>(self) -> Option<Dst>
where
    T: CheckedCast<Dst>,
impl<Src, Dst> CheckedCastFrom<Src> for Dst
where
    Src: CheckedCast<Dst>,
fn checked_cast_from(src: Src) -> Option<Dst>
impl<T> Conv for T
impl<T> Downcast for T
where
    T: Any,
fn into_any(self: Box<T>) -> Box<dyn Any>
Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any's vtable from &Trait's.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Convert &mut Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &mut Any's vtable from &mut Trait's.
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.
impl<Src, Dst> LosslessTryInto<Dst> for Src
where
    Dst: LosslessTryFrom<Src>,
fn lossless_try_into(self) -> Option<Dst>
impl<Src, Dst> LossyInto<Dst> for Src
where
    Dst: LossyFrom<Src>,
fn lossy_into(self) -> Dst
impl<T> OverflowingAs for T
fn overflowing_as<Dst>(self) -> (Dst, bool)
where
    T: OverflowingCast<Dst>,
impl<Src, Dst> OverflowingCastFrom<Src> for Dst
where
    Src: OverflowingCast<Dst>,
fn overflowing_cast_from(src: Src) -> (Dst, bool)
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> SaturatingAs for T
fn saturating_as<Dst>(self) -> Dst
where
    T: SaturatingCast<Dst>,
impl<Src, Dst> SaturatingCastFrom<Src> for Dst
where
    Src: SaturatingCast<Dst>,
fn saturating_cast_from(src: Src) -> Dst
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.