Skip to content

Commit 233a984

Browse files
JSON output (#370)
Closes #369
1 parent c93782b commit 233a984

3 files changed

Lines changed: 191 additions & 12 deletions

File tree

src/args.rs

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,20 @@ pub struct DftArgs {
7474
#[clap(long, short, help = "Only show how long the query took to run")]
7575
pub time: bool,
7676

77+
#[clap(
78+
long,
79+
short = 'j',
80+
help = "Output query results as line-delimited JSON"
81+
)]
82+
pub json: bool,
83+
84+
#[clap(
85+
long,
86+
short = 'C',
87+
help = "Concatenate all result batches into a single batch before printing"
88+
)]
89+
pub concat: bool,
90+
7791
#[clap(long, short, help = "Benchmark the provided query")]
7892
pub bench: bool,
7993

src/cli/mod.rs

Lines changed: 112 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -109,7 +109,7 @@ impl CliApp {
109109
.do_get(flight_info)
110110
.await?;
111111
let flight_batch_stream = stream::select_all(streams);
112-
self.print_any_stream(flight_batch_stream).await;
112+
self.print_stream(flight_batch_stream).await;
113113
Ok(())
114114
}
115115
FlightSqlCommand::GetDbSchemas {
@@ -127,7 +127,7 @@ impl CliApp {
127127
.do_get(flight_info)
128128
.await?;
129129
let flight_batch_stream = stream::select_all(streams);
130-
self.print_any_stream(flight_batch_stream).await;
130+
self.print_stream(flight_batch_stream).await;
131131
Ok(())
132132
}
133133

@@ -154,7 +154,7 @@ impl CliApp {
154154
.do_get(flight_info)
155155
.await?;
156156
let flight_batch_stream = stream::select_all(streams);
157-
self.print_any_stream(flight_batch_stream).await;
157+
self.print_stream(flight_batch_stream).await;
158158
Ok(())
159159
}
160160
FlightSqlCommand::GetTableTypes => {
@@ -169,7 +169,7 @@ impl CliApp {
169169
.do_get(flight_info)
170170
.await?;
171171
let flight_batch_stream = stream::select_all(streams);
172-
self.print_any_stream(flight_batch_stream).await;
172+
self.print_stream(flight_batch_stream).await;
173173
Ok(())
174174
}
175175
FlightSqlCommand::GetSqlInfo { info } => {
@@ -184,7 +184,7 @@ impl CliApp {
184184
.do_get(flight_info)
185185
.await?;
186186
let flight_batch_stream = stream::select_all(streams);
187-
self.print_any_stream(flight_batch_stream).await;
187+
self.print_stream(flight_batch_stream).await;
188188
Ok(())
189189
}
190190
FlightSqlCommand::GetXdbcTypeInfo { data_type } => {
@@ -199,7 +199,7 @@ impl CliApp {
199199
.do_get(flight_info)
200200
.await?;
201201
let flight_batch_stream = stream::select_all(streams);
202-
self.print_any_stream(flight_batch_stream).await;
202+
self.print_stream(flight_batch_stream).await;
203203
Ok(())
204204
}
205205
}
@@ -403,6 +403,8 @@ impl CliApp {
403403
let stream = client.do_get(ticket.into_request()).await?;
404404
if let Some(output_path) = &self.args.output {
405405
self.output_stream(stream, output_path).await?
406+
} else if self.args.json {
407+
self.print_json_stream(stream).await;
406408
} else if let Some(start) = start {
407409
self.exec_stream(stream).await;
408410
let elapsed = start.elapsed();
@@ -543,6 +545,8 @@ impl CliApp {
543545
.await?;
544546
if let Some(output_path) = &self.args.output {
545547
self.output_stream(stream, output_path).await?;
548+
} else if self.args.json {
549+
self.print_json_stream(stream).await;
546550
} else if let Some(start) = start {
547551
self.exec_stream(stream).await;
548552
let elapsed = start.elapsed();
@@ -679,18 +683,114 @@ impl CliApp {
679683
}
680684
}
681685

682-
async fn print_any_stream<S, E>(&self, mut stream: S)
686+
/// Prints a stream of record batches in the output format chosen on the
/// command line.
///
/// When `self.args.json` is set the stream is handed to
/// `print_json_stream`; otherwise it goes to `print_any_stream`.
#[cfg(feature = "flightsql")]
async fn print_stream<S, E>(&self, stream: S)
where
    S: Stream<Item = Result<RecordBatch, E>> + Unpin,
    E: Error,
{
    // Guard clause: the non-JSON path is the default.
    if !self.args.json {
        return self.print_any_stream(stream).await;
    }
    self.print_json_stream(stream).await
}
698+
699+
/// Drains `stream` and returns every record batch it yielded, in arrival order.
///
/// Returns `Some(batches)` once the stream is exhausted (the vector is empty
/// when the stream produced nothing). On the first `Err` item the error is
/// printed to stdout as `Error executing SQL: ...` and `None` is returned, so
/// any batches gathered up to that point are discarded.
///
/// NOTE(review): this span is a rendered diff; the rows whose gutter marker
/// ends in `-` belong to the previous `print_any_stream` body and are not part
/// of this function after the commit.
async fn collect_stream<S, E>(&self, mut stream: S) -> Option<Vec<RecordBatch>>
683700
where
684701
S: Stream<Item = Result<RecordBatch, E>> + Unpin,
685702
E: Error,
686703
{
704+
let mut batches = Vec::new();
687705
while let Some(maybe_batch) = stream.next().await {
688706
match maybe_batch {
689-
Ok(batch) => match pretty_format_batches(&[batch]) {
690-
Ok(d) => println!("{}", d),
691-
Err(e) => println!("Error formatting batch: {e}"),
692-
},
693-
Err(e) => println!("Error executing SQL: {e}"),
707+
Ok(batch) => batches.push(batch),
708+
Err(e) => {
709+
println!("Error executing SQL: {e}");
710+
return None;
711+
}
712+
}
713+
}
714+
Some(batches)
715+
}
716+
717+
async fn print_any_stream<S, E>(&self, stream: S)
718+
where
719+
S: Stream<Item = Result<RecordBatch, E>> + Unpin,
720+
E: Error,
721+
{
722+
if self.args.concat {
723+
let Some(batches) = self.collect_stream(stream).await else {
724+
return;
725+
};
726+
if !batches.is_empty() {
727+
let schema = batches[0].schema();
728+
match datafusion::arrow::compute::concat_batches(&schema, &batches) {
729+
Ok(batch) => match pretty_format_batches(&[batch]) {
730+
Ok(d) => println!("{}", d),
731+
Err(e) => println!("Error formatting batch: {e}"),
732+
},
733+
Err(e) => println!("Error concatenating batches: {e}"),
734+
}
735+
}
736+
} else {
737+
let mut stream = stream;
738+
while let Some(maybe_batch) = stream.next().await {
739+
match maybe_batch {
740+
Ok(batch) => match pretty_format_batches(&[batch]) {
741+
Ok(d) => println!("{}", d),
742+
Err(e) => println!("Error formatting batch: {e}"),
743+
},
744+
Err(e) => println!("Error executing SQL: {e}"),
745+
}
746+
}
747+
}
748+
}
749+
750+
async fn print_json_stream<S, E>(&self, stream: S)
751+
where
752+
S: Stream<Item = Result<RecordBatch, E>> + Unpin,
753+
E: Error,
754+
{
755+
if self.args.concat {
756+
let Some(batches) = self.collect_stream(stream).await else {
757+
return;
758+
};
759+
if !batches.is_empty() {
760+
let schema = batches[0].schema();
761+
match datafusion::arrow::compute::concat_batches(&schema, &batches) {
762+
Ok(batch) => {
763+
let mut writer = json::writer::LineDelimitedWriter::new(std::io::stdout());
764+
if let Err(e) = writer.write(&batch) {
765+
println!("Error formatting batch as JSON: {e}");
766+
return;
767+
}
768+
if let Err(e) = writer.finish() {
769+
println!("Error finishing JSON output: {e}");
770+
}
771+
}
772+
Err(e) => println!("Error concatenating batches: {e}"),
773+
}
774+
}
775+
} else {
776+
let mut stream = stream;
777+
let mut writer = json::writer::LineDelimitedWriter::new(std::io::stdout());
778+
while let Some(maybe_batch) = stream.next().await {
779+
match maybe_batch {
780+
Ok(batch) => {
781+
if let Err(e) = writer.write(&batch) {
782+
println!("Error formatting batch as JSON: {e}");
783+
return;
784+
}
785+
}
786+
Err(e) => {
787+
println!("Error executing SQL: {e}");
788+
return;
789+
}
790+
}
791+
}
792+
if let Err(e) = writer.finish() {
793+
println!("Error finishing JSON output: {e}");
694794
}
695795
}
696796
}

tests/cli_cases/basic.rs

Lines changed: 65 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -492,6 +492,71 @@ fn test_output_parquet() {
492492
assert.stdout(contains_str(expected));
493493
}
494494

495+
/// Runs `dft -c <query> -j` for a single-row query and checks that the row
/// appears on stdout as one JSON object.
#[test]
fn test_json_output() {
    let mut cmd = Command::cargo_bin("dft").unwrap();
    cmd.args(["-c", "SELECT 1 AS id, 'hello' AS name", "-j"]);

    cmd.assert()
        .success()
        .stdout(contains_str(r#"{"id":1,"name":"hello"}"#));
}
507+
508+
/// Runs `dft -c <query> -j` for a two-row query and checks that each row
/// appears on stdout as its own JSON object.
#[test]
fn test_json_output_multiple_rows() {
    let query = "SELECT * FROM (VALUES (1, 'a'), (2, 'b')) AS t(id, val)";
    let mut cmd = Command::cargo_bin("dft").unwrap();
    cmd.arg("-c").arg(query).arg("-j");

    let assert = cmd.assert().success();
    assert
        .stdout(contains_str(r#"{"id":1,"val":"a"}"#))
        .stdout(contains_str(r#"{"id":2,"val":"b"}"#));
}
522+
523+
/// Runs `dft -c <query> -C` for a two-row query and checks that stdout
/// contains a single pretty-printed table holding both rows.
///
/// NOTE(review): the expected table below is shown as rendered on the diff
/// page; column padding inside the raw string may have been collapsed by the
/// page extraction — confirm against the repository before relying on it.
#[test]
524+
fn test_concat_output() {
525+
let assert = Command::cargo_bin("dft")
526+
.unwrap()
527+
.arg("-c")
528+
.arg("SELECT * FROM (VALUES (1, 'a'), (2, 'b')) AS t(id, val)")
529+
.arg("-C")
530+
.assert()
531+
.success();
532+
533+
// With concat the result is a single table with all rows
534+
let expected = r#"
535+
+----+-----+
536+
| id | val |
537+
+----+-----+
538+
| 1 | a |
539+
| 2 | b |
540+
+----+-----+"#;
541+
assert.stdout(contains_str(expected));
542+
}
543+
544+
/// Runs `dft -c <query> -j -C` for a two-row query and checks that both rows
/// still appear on stdout as JSON objects when the two flags are combined.
#[test]
fn test_json_and_concat_output() {
    let query = "SELECT * FROM (VALUES (1, 'a'), (2, 'b')) AS t(id, val)";
    let mut cmd = Command::cargo_bin("dft").unwrap();
    cmd.args(["-c", query, "-j", "-C"]);

    let assert = cmd.assert().success();
    assert
        .stdout(contains_str(r#"{"id":1,"val":"a"}"#))
        .stdout(contains_str(r#"{"id":2,"val":"b"}"#));
}
}
559+
495560
#[test]
496561
#[cfg(feature = "vortex")]
497562
fn test_output_vortex() {

0 commit comments

Comments
 (0)