@@ -109,7 +109,7 @@ impl CliApp {
109109 . do_get ( flight_info)
110110 . await ?;
111111 let flight_batch_stream = stream:: select_all ( streams) ;
112- self . print_any_stream ( flight_batch_stream) . await ;
112+ self . print_stream ( flight_batch_stream) . await ;
113113 Ok ( ( ) )
114114 }
115115 FlightSqlCommand :: GetDbSchemas {
@@ -127,7 +127,7 @@ impl CliApp {
127127 . do_get ( flight_info)
128128 . await ?;
129129 let flight_batch_stream = stream:: select_all ( streams) ;
130- self . print_any_stream ( flight_batch_stream) . await ;
130+ self . print_stream ( flight_batch_stream) . await ;
131131 Ok ( ( ) )
132132 }
133133
@@ -154,7 +154,7 @@ impl CliApp {
154154 . do_get ( flight_info)
155155 . await ?;
156156 let flight_batch_stream = stream:: select_all ( streams) ;
157- self . print_any_stream ( flight_batch_stream) . await ;
157+ self . print_stream ( flight_batch_stream) . await ;
158158 Ok ( ( ) )
159159 }
160160 FlightSqlCommand :: GetTableTypes => {
@@ -169,7 +169,7 @@ impl CliApp {
169169 . do_get ( flight_info)
170170 . await ?;
171171 let flight_batch_stream = stream:: select_all ( streams) ;
172- self . print_any_stream ( flight_batch_stream) . await ;
172+ self . print_stream ( flight_batch_stream) . await ;
173173 Ok ( ( ) )
174174 }
175175 FlightSqlCommand :: GetSqlInfo { info } => {
@@ -184,7 +184,7 @@ impl CliApp {
184184 . do_get ( flight_info)
185185 . await ?;
186186 let flight_batch_stream = stream:: select_all ( streams) ;
187- self . print_any_stream ( flight_batch_stream) . await ;
187+ self . print_stream ( flight_batch_stream) . await ;
188188 Ok ( ( ) )
189189 }
190190 FlightSqlCommand :: GetXdbcTypeInfo { data_type } => {
@@ -199,7 +199,7 @@ impl CliApp {
199199 . do_get ( flight_info)
200200 . await ?;
201201 let flight_batch_stream = stream:: select_all ( streams) ;
202- self . print_any_stream ( flight_batch_stream) . await ;
202+ self . print_stream ( flight_batch_stream) . await ;
203203 Ok ( ( ) )
204204 }
205205 }
@@ -403,6 +403,8 @@ impl CliApp {
403403 let stream = client. do_get ( ticket. into_request ( ) ) . await ?;
404404 if let Some ( output_path) = & self . args . output {
405405 self . output_stream ( stream, output_path) . await ?
406+ } else if self . args . json {
407+ self . print_json_stream ( stream) . await ;
406408 } else if let Some ( start) = start {
407409 self . exec_stream ( stream) . await ;
408410 let elapsed = start. elapsed ( ) ;
@@ -543,6 +545,8 @@ impl CliApp {
543545 . await ?;
544546 if let Some ( output_path) = & self . args . output {
545547 self . output_stream ( stream, output_path) . await ?;
548+ } else if self . args . json {
549+ self . print_json_stream ( stream) . await ;
546550 } else if let Some ( start) = start {
547551 self . exec_stream ( stream) . await ;
548552 let elapsed = start. elapsed ( ) ;
@@ -679,18 +683,114 @@ impl CliApp {
679683 }
680684 }
681685
682- async fn print_any_stream < S , E > ( & self , mut stream : S )
686+ #[ cfg( feature = "flightsql" ) ]
687+ async fn print_stream < S , E > ( & self , stream : S )
688+ where
689+ S : Stream < Item = Result < RecordBatch , E > > + Unpin ,
690+ E : Error ,
691+ {
692+ if self . args . json {
693+ self . print_json_stream ( stream) . await ;
694+ } else {
695+ self . print_any_stream ( stream) . await ;
696+ }
697+ }
698+
699+ async fn collect_stream < S , E > ( & self , mut stream : S ) -> Option < Vec < RecordBatch > >
683700 where
684701 S : Stream < Item = Result < RecordBatch , E > > + Unpin ,
685702 E : Error ,
686703 {
704+ let mut batches = Vec :: new ( ) ;
687705 while let Some ( maybe_batch) = stream. next ( ) . await {
688706 match maybe_batch {
689- Ok ( batch) => match pretty_format_batches ( & [ batch] ) {
690- Ok ( d) => println ! ( "{}" , d) ,
691- Err ( e) => println ! ( "Error formatting batch: {e}" ) ,
692- } ,
693- Err ( e) => println ! ( "Error executing SQL: {e}" ) ,
707+ Ok ( batch) => batches. push ( batch) ,
708+ Err ( e) => {
709+ println ! ( "Error executing SQL: {e}" ) ;
710+ return None ;
711+ }
712+ }
713+ }
714+ Some ( batches)
715+ }
716+
717+ async fn print_any_stream < S , E > ( & self , stream : S )
718+ where
719+ S : Stream < Item = Result < RecordBatch , E > > + Unpin ,
720+ E : Error ,
721+ {
722+ if self . args . concat {
723+ let Some ( batches) = self . collect_stream ( stream) . await else {
724+ return ;
725+ } ;
726+ if !batches. is_empty ( ) {
727+ let schema = batches[ 0 ] . schema ( ) ;
728+ match datafusion:: arrow:: compute:: concat_batches ( & schema, & batches) {
729+ Ok ( batch) => match pretty_format_batches ( & [ batch] ) {
730+ Ok ( d) => println ! ( "{}" , d) ,
731+ Err ( e) => println ! ( "Error formatting batch: {e}" ) ,
732+ } ,
733+ Err ( e) => println ! ( "Error concatenating batches: {e}" ) ,
734+ }
735+ }
736+ } else {
737+ let mut stream = stream;
738+ while let Some ( maybe_batch) = stream. next ( ) . await {
739+ match maybe_batch {
740+ Ok ( batch) => match pretty_format_batches ( & [ batch] ) {
741+ Ok ( d) => println ! ( "{}" , d) ,
742+ Err ( e) => println ! ( "Error formatting batch: {e}" ) ,
743+ } ,
744+ Err ( e) => println ! ( "Error executing SQL: {e}" ) ,
745+ }
746+ }
747+ }
748+ }
749+
750+ async fn print_json_stream < S , E > ( & self , stream : S )
751+ where
752+ S : Stream < Item = Result < RecordBatch , E > > + Unpin ,
753+ E : Error ,
754+ {
755+ if self . args . concat {
756+ let Some ( batches) = self . collect_stream ( stream) . await else {
757+ return ;
758+ } ;
759+ if !batches. is_empty ( ) {
760+ let schema = batches[ 0 ] . schema ( ) ;
761+ match datafusion:: arrow:: compute:: concat_batches ( & schema, & batches) {
762+ Ok ( batch) => {
763+ let mut writer = json:: writer:: LineDelimitedWriter :: new ( std:: io:: stdout ( ) ) ;
764+ if let Err ( e) = writer. write ( & batch) {
765+ println ! ( "Error formatting batch as JSON: {e}" ) ;
766+ return ;
767+ }
768+ if let Err ( e) = writer. finish ( ) {
769+ println ! ( "Error finishing JSON output: {e}" ) ;
770+ }
771+ }
772+ Err ( e) => println ! ( "Error concatenating batches: {e}" ) ,
773+ }
774+ }
775+ } else {
776+ let mut stream = stream;
777+ let mut writer = json:: writer:: LineDelimitedWriter :: new ( std:: io:: stdout ( ) ) ;
778+ while let Some ( maybe_batch) = stream. next ( ) . await {
779+ match maybe_batch {
780+ Ok ( batch) => {
781+ if let Err ( e) = writer. write ( & batch) {
782+ println ! ( "Error formatting batch as JSON: {e}" ) ;
783+ return ;
784+ }
785+ }
786+ Err ( e) => {
787+ println ! ( "Error executing SQL: {e}" ) ;
788+ return ;
789+ }
790+ }
791+ }
792+ if let Err ( e) = writer. finish ( ) {
793+ println ! ( "Error finishing JSON output: {e}" ) ;
694794 }
695795 }
696796 }
0 commit comments