3838//! let a = Int32Array::from(vec![1, 2, 3]);
3939//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
4040//!
41- //! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
41+ //! let json_rows = json::writer::record_batches_to_json_rows(&[batch]).unwrap() ;
4242//! assert_eq!(
4343//! serde_json::Value::Object(json_rows[1].clone()),
4444//! serde_json::json!({"a": 2}),
@@ -110,64 +110,61 @@ use serde_json::Value;
110110
111111use crate :: array:: * ;
112112use crate :: datatypes:: * ;
113- use crate :: error:: Result ;
113+ use crate :: error:: { ArrowError , Result } ;
114114use crate :: record_batch:: RecordBatch ;
115115
116- fn primitive_array_to_json < T : ArrowPrimitiveType > ( array : & ArrayRef ) -> Vec < Value > {
117- as_primitive_array :: < T > ( array)
116+ fn primitive_array_to_json < T : ArrowPrimitiveType > (
117+ array : & ArrayRef ,
118+ ) -> Result < Vec < Value > > {
119+ Ok ( as_primitive_array :: < T > ( array)
118120 . iter ( )
119121 . map ( |maybe_value| match maybe_value {
120122 Some ( v) => v. into_json_value ( ) . unwrap_or ( Value :: Null ) ,
121123 None => Value :: Null ,
122124 } )
123- . collect ( )
125+ . collect ( ) )
124126}
125127
126128fn struct_array_to_jsonmap_array (
127129 array : & StructArray ,
128130 row_count : usize ,
129- ) -> Vec < JsonMap < String , Value > > {
131+ ) -> Result < Vec < JsonMap < String , Value > > > {
130132 let inner_col_names = array. column_names ( ) ;
131133
132134 let mut inner_objs = iter:: repeat ( JsonMap :: new ( ) )
133135 . take ( row_count)
134136 . collect :: < Vec < JsonMap < String , Value > > > ( ) ;
135137
136- array
137- . columns ( )
138- . iter ( )
139- . enumerate ( )
140- . for_each ( |( j, struct_col) | {
141- set_column_for_json_rows (
142- & mut inner_objs,
143- row_count,
144- struct_col,
145- inner_col_names[ j] ,
146- ) ;
147- } ) ;
148-
149- inner_objs
138+ for ( j, struct_col) in array. columns ( ) . iter ( ) . enumerate ( ) {
139+ set_column_for_json_rows (
140+ & mut inner_objs,
141+ row_count,
142+ struct_col,
143+ inner_col_names[ j] ,
144+ ) ?
145+ }
146+ Ok ( inner_objs)
150147}
151148
152149/// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON [`serde_json::Value`]'s
153- pub fn array_to_json_array ( array : & ArrayRef ) -> Vec < Value > {
150+ pub fn array_to_json_array ( array : & ArrayRef ) -> Result < Vec < Value > > {
154151 match array. data_type ( ) {
155- DataType :: Null => iter:: repeat ( Value :: Null ) . take ( array. len ( ) ) . collect ( ) ,
156- DataType :: Boolean => as_boolean_array ( array)
152+ DataType :: Null => Ok ( iter:: repeat ( Value :: Null ) . take ( array. len ( ) ) . collect ( ) ) ,
153+ DataType :: Boolean => Ok ( as_boolean_array ( array)
157154 . iter ( )
158155 . map ( |maybe_value| match maybe_value {
159156 Some ( v) => v. into ( ) ,
160157 None => Value :: Null ,
161158 } )
162- . collect ( ) ,
159+ . collect ( ) ) ,
163160
164- DataType :: Utf8 => as_string_array ( array)
161+ DataType :: Utf8 => Ok ( as_string_array ( array)
165162 . iter ( )
166163 . map ( |maybe_value| match maybe_value {
167164 Some ( v) => v. into ( ) ,
168165 None => Value :: Null ,
169166 } )
170- . collect ( ) ,
167+ . collect ( ) ) ,
171168 DataType :: Int8 => primitive_array_to_json :: < Int8Type > ( array) ,
172169 DataType :: Int16 => primitive_array_to_json :: < Int16Type > ( array) ,
173170 DataType :: Int32 => primitive_array_to_json :: < Int32Type > ( array) ,
@@ -181,28 +178,26 @@ pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
181178 DataType :: List ( _) => as_list_array ( array)
182179 . iter ( )
183180 . map ( |maybe_value| match maybe_value {
184- Some ( v) => Value :: Array ( array_to_json_array ( & v) ) ,
185- None => Value :: Null ,
181+ Some ( v) => Ok ( Value :: Array ( array_to_json_array ( & v) ? ) ) ,
182+ None => Ok ( Value :: Null ) ,
186183 } )
187184 . collect ( ) ,
188185 DataType :: LargeList ( _) => as_large_list_array ( array)
189186 . iter ( )
190187 . map ( |maybe_value| match maybe_value {
191- Some ( v) => Value :: Array ( array_to_json_array ( & v) ) ,
192- None => Value :: Null ,
188+ Some ( v) => Ok ( Value :: Array ( array_to_json_array ( & v) ? ) ) ,
189+ None => Ok ( Value :: Null ) ,
193190 } )
194191 . collect ( ) ,
195192 DataType :: Struct ( _) => {
196193 let jsonmaps =
197- struct_array_to_jsonmap_array ( as_struct_array ( array) , array. len ( ) ) ;
198- jsonmaps. into_iter ( ) . map ( Value :: Object ) . collect ( )
199- }
200- _ => {
201- panic ! (
202- "Unsupported datatype for array conversion: {:#?}" ,
203- array. data_type( )
204- ) ;
194+ struct_array_to_jsonmap_array ( as_struct_array ( array) , array. len ( ) ) ?;
195+ Ok ( jsonmaps. into_iter ( ) . map ( Value :: Object ) . collect ( ) )
205196 }
197+ t => Err ( ArrowError :: JsonError ( format ! (
198+ "data type {:?} not supported" ,
199+ t
200+ ) ) ) ,
206201 }
207202}
208203
@@ -261,37 +256,37 @@ fn set_column_for_json_rows(
261256 row_count : usize ,
262257 array : & ArrayRef ,
263258 col_name : & str ,
264- ) {
259+ ) -> Result < ( ) > {
265260 match array. data_type ( ) {
266261 DataType :: Int8 => {
267- set_column_by_primitive_type :: < Int8Type > ( rows, row_count, array, col_name)
262+ set_column_by_primitive_type :: < Int8Type > ( rows, row_count, array, col_name) ;
268263 }
269264 DataType :: Int16 => {
270- set_column_by_primitive_type :: < Int16Type > ( rows, row_count, array, col_name)
265+ set_column_by_primitive_type :: < Int16Type > ( rows, row_count, array, col_name) ;
271266 }
272267 DataType :: Int32 => {
273- set_column_by_primitive_type :: < Int32Type > ( rows, row_count, array, col_name)
268+ set_column_by_primitive_type :: < Int32Type > ( rows, row_count, array, col_name) ;
274269 }
275270 DataType :: Int64 => {
276- set_column_by_primitive_type :: < Int64Type > ( rows, row_count, array, col_name)
271+ set_column_by_primitive_type :: < Int64Type > ( rows, row_count, array, col_name) ;
277272 }
278273 DataType :: UInt8 => {
279- set_column_by_primitive_type :: < UInt8Type > ( rows, row_count, array, col_name)
274+ set_column_by_primitive_type :: < UInt8Type > ( rows, row_count, array, col_name) ;
280275 }
281276 DataType :: UInt16 => {
282- set_column_by_primitive_type :: < UInt16Type > ( rows, row_count, array, col_name)
277+ set_column_by_primitive_type :: < UInt16Type > ( rows, row_count, array, col_name) ;
283278 }
284279 DataType :: UInt32 => {
285- set_column_by_primitive_type :: < UInt32Type > ( rows, row_count, array, col_name)
280+ set_column_by_primitive_type :: < UInt32Type > ( rows, row_count, array, col_name) ;
286281 }
287282 DataType :: UInt64 => {
288- set_column_by_primitive_type :: < UInt64Type > ( rows, row_count, array, col_name)
283+ set_column_by_primitive_type :: < UInt64Type > ( rows, row_count, array, col_name) ;
289284 }
290285 DataType :: Float32 => {
291- set_column_by_primitive_type :: < Float32Type > ( rows, row_count, array, col_name)
286+ set_column_by_primitive_type :: < Float32Type > ( rows, row_count, array, col_name) ;
292287 }
293288 DataType :: Float64 => {
294- set_column_by_primitive_type :: < Float64Type > ( rows, row_count, array, col_name)
289+ set_column_by_primitive_type :: < Float64Type > ( rows, row_count, array, col_name) ;
295290 }
296291 DataType :: Null => {
297292 // when value is null, we simply skip setting the key
@@ -444,7 +439,7 @@ fn set_column_for_json_rows(
444439 }
445440 DataType :: Struct ( _) => {
446441 let inner_objs =
447- struct_array_to_jsonmap_array ( as_struct_array ( array) , row_count) ;
442+ struct_array_to_jsonmap_array ( as_struct_array ( array) , row_count) ? ;
448443 rows. iter_mut ( )
449444 . take ( row_count)
450445 . zip ( inner_objs. into_iter ( ) )
@@ -457,34 +452,34 @@ fn set_column_for_json_rows(
457452 rows. iter_mut ( )
458453 . zip ( listarr. iter ( ) )
459454 . take ( row_count)
460- . for_each ( |( row, maybe_value) | {
455+ . try_for_each ( |( row, maybe_value) | -> Result < ( ) > {
461456 if let Some ( v) = maybe_value {
462457 row. insert (
463458 col_name. to_string ( ) ,
464- Value :: Array ( array_to_json_array ( & v) ) ,
459+ Value :: Array ( array_to_json_array ( & v) ? ) ,
465460 ) ;
466461 }
467- } ) ;
462+ Ok ( ( ) )
463+ } ) ?;
468464 }
469465 DataType :: LargeList ( _) => {
470466 let listarr = as_large_list_array ( array) ;
471467 rows. iter_mut ( )
472468 . zip ( listarr. iter ( ) )
473469 . take ( row_count)
474- . for_each ( |( row, maybe_value) | {
470+ . try_for_each ( |( row, maybe_value) | -> Result < ( ) > {
475471 if let Some ( v) = maybe_value {
476- row. insert (
477- col_name. to_string ( ) ,
478- Value :: Array ( array_to_json_array ( & v) ) ,
479- ) ;
472+ let val = array_to_json_array ( & v) ?;
473+ row. insert ( col_name. to_string ( ) , Value :: Array ( val) ) ;
480474 }
481- } ) ;
475+ Ok ( ( ) )
476+ } ) ?;
482477 }
483478 DataType :: Dictionary ( _, value_type) => {
484479 let slice = array. slice ( 0 , row_count) ;
485480 let hydrated = crate :: compute:: kernels:: cast:: cast ( & slice, value_type)
486481 . expect ( "cannot cast dictionary to underlying values" ) ;
487- set_column_for_json_rows ( rows, row_count, & hydrated, col_name)
482+ set_column_for_json_rows ( rows, row_count, & hydrated, col_name) ? ;
488483 }
489484 DataType :: Map ( _, _) => {
490485 let maparr = as_map_array ( array) ;
@@ -494,11 +489,14 @@ fn set_column_for_json_rows(
494489
495490 // Keys have to be strings to convert to json.
496491 if !matches ! ( keys. data_type( ) , DataType :: Utf8 ) {
497- panic ! ( "Unsupported datatype: {:#?}" , array. data_type( ) ) ;
492+ return Err ( ArrowError :: JsonError ( format ! (
493+ "data type {:?} not supported in nested map for json writer" ,
494+ keys. data_type( )
495+ ) ) ) ;
498496 }
499497
500498 let keys = as_string_array ( & keys) ;
501- let values = array_to_json_array ( & values) ;
499+ let values = array_to_json_array ( & values) ? ;
502500
503501 let mut kv = keys. iter ( ) . zip ( values. into_iter ( ) ) ;
504502
@@ -522,34 +520,38 @@ fn set_column_for_json_rows(
522520 }
523521 }
524522 _ => {
525- panic ! ( "Unsupported datatype: {:#?}" , array. data_type( ) ) ;
523+ return Err ( ArrowError :: JsonError ( format ! (
524+ "data type {:?} not supported in nested map for json writer" ,
525+ array. data_type( )
526+ ) ) )
526527 }
527528 }
529+ Ok ( ( ) )
528530}
529531
530532/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
531533/// [`JsonMap`]s (objects)
532534pub fn record_batches_to_json_rows (
533535 batches : & [ RecordBatch ] ,
534- ) -> Vec < JsonMap < String , Value > > {
536+ ) -> Result < Vec < JsonMap < String , Value > > > {
535537 let mut rows: Vec < JsonMap < String , Value > > = iter:: repeat ( JsonMap :: new ( ) )
536538 . take ( batches. iter ( ) . map ( |b| b. num_rows ( ) ) . sum ( ) )
537539 . collect ( ) ;
538540
539541 if !rows. is_empty ( ) {
540542 let schema = batches[ 0 ] . schema ( ) ;
541543 let mut base = 0 ;
542- batches . iter ( ) . for_each ( | batch| {
544+ for batch in batches {
543545 let row_count = batch. num_rows ( ) ;
544- batch. columns ( ) . iter ( ) . enumerate ( ) . for_each ( | ( j , col ) | {
546+ for ( j , col ) in batch. columns ( ) . iter ( ) . enumerate ( ) {
545547 let col_name = schema. field ( j) . name ( ) ;
546- set_column_for_json_rows ( & mut rows[ base..] , row_count, col, col_name) ;
547- } ) ;
548+ set_column_for_json_rows ( & mut rows[ base..] , row_count, col, col_name) ?
549+ }
548550 base += row_count;
549- } ) ;
551+ }
550552 }
551553
552- rows
554+ Ok ( rows)
553555}
554556
555557/// This trait defines how to format a sequence of JSON objects to a
@@ -683,7 +685,7 @@ where
683685
684686 /// Convert the [`RecordBatch`] into JSON rows, and write them to the output
685687 pub fn write_batches ( & mut self , batches : & [ RecordBatch ] ) -> Result < ( ) > {
686- for row in record_batches_to_json_rows ( batches) {
688+ for row in record_batches_to_json_rows ( batches) ? {
687689 self . write_row ( & Value :: Object ( row) ) ?;
688690 }
689691 Ok ( ( ) )
0 commit comments