Skip to content

Commit 9c74d01

Browse files
committed
Address martin-g review comments on JSON arrays PR apache#19924
1. Forward stream read errors to result_tx instead of only logging them
2. Switch byte channel from std::sync::mpsc to tokio::sync::mpsc to avoid blocking a tokio worker thread when the buffer is full
3. Add validate_complete() after parsing to detect malformed JSON
4. Use workspace = true for serde_json dep; remove unused log dep
5. Use PathBuf::join() instead of hardcoded "/" in test paths
6. Detect leading non-whitespace content before '[' in JSON array parser

https://claude.ai/code/session_01UGW59c5Fu8isLiNhU191uy
1 parent 828deb2 commit 9c74d01

5 files changed

Lines changed: 91 additions & 31 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ mod tests {
5959
/// Create a temporary JSON file and return (TempDir, path)
6060
fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
6161
let tmp_dir = tempfile::TempDir::new().unwrap();
62-
let path = format!("{}/test.json", tmp_dir.path().to_string_lossy());
62+
let path = tmp_dir.path().join("test.json");
6363
std::fs::write(&path, content).unwrap();
64-
(tmp_dir, path)
64+
(tmp_dir, path.to_string_lossy().to_string())
6565
}
6666

6767
/// Infer schema from JSON array format file
@@ -395,7 +395,8 @@ mod tests {
395395
async fn test_write_empty_json_from_sql() -> Result<()> {
396396
let ctx = SessionContext::new();
397397
let tmp_dir = tempfile::TempDir::new()?;
398-
let path = format!("{}/empty_sql.json", tmp_dir.path().to_string_lossy());
398+
let path = tmp_dir.path().join("empty_sql.json");
399+
let path = path.to_string_lossy().to_string();
399400
let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
400401
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
401402
.await?;
@@ -421,7 +422,8 @@ mod tests {
421422
)?;
422423

423424
let tmp_dir = tempfile::TempDir::new()?;
424-
let path = format!("{}/empty_batch.json", tmp_dir.path().to_string_lossy());
425+
let path = tmp_dir.path().join("empty_batch.json");
426+
let path = path.to_string_lossy().to_string();
425427
let df = ctx.read_batch(empty_batch.clone())?;
426428
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
427429
.await?;
@@ -586,7 +588,8 @@ mod tests {
586588
use std::io::Write;
587589

588590
let tmp_dir = tempfile::TempDir::new()?;
589-
let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());
591+
let path = tmp_dir.path().join("array.json.gz");
592+
let path = path.to_string_lossy().to_string();
590593

591594
let file = std::fs::File::create(&path)?;
592595
let mut encoder = GzEncoder::new(file, Compression::default());

datafusion/datasource-json/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ datafusion-physical-expr-common = { workspace = true }
4343
datafusion-physical-plan = { workspace = true }
4444
datafusion-session = { workspace = true }
4545
futures = { workspace = true }
46-
log = "0.4.29"
4746
object_store = { workspace = true }
48-
serde_json = "1.0.149"
47+
serde_json = { workspace = true }
4948
tokio = { workspace = true }
5049
tokio-stream = { workspace = true, features = ["sync"] }
5150

datafusion/datasource-json/src/source.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -358,14 +358,18 @@ impl FileOpener for JsonOpener {
358358
let decompressed_stream =
359359
file_compression_type.convert_stream(s.boxed())?;
360360

361-
// Channel for bytes: async producer -> sync consumer
361+
// Channel for bytes: async producer -> blocking consumer
362+
// Uses tokio::sync::mpsc so the async send never blocks a
363+
// tokio worker thread; the consumer calls blocking_recv()
364+
// inside spawn_blocking.
362365
let (byte_tx, byte_rx) =
363-
std::sync::mpsc::sync_channel::<bytes::Bytes>(
366+
tokio::sync::mpsc::channel::<bytes::Bytes>(
364367
CHANNEL_BUFFER_SIZE,
365368
);
366369

367370
// Channel for results: sync producer -> async consumer
368371
let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
372+
let error_tx = result_tx.clone();
369373

370374
// Async task: read from object store stream and send bytes to channel
371375
// Store the SpawnedTask to keep it alive until stream is dropped
@@ -374,12 +378,16 @@ impl FileOpener for JsonOpener {
374378
while let Some(chunk) = decompressed_stream.next().await {
375379
match chunk {
376380
Ok(bytes) => {
377-
if byte_tx.send(bytes).is_err() {
381+
if byte_tx.send(bytes).await.is_err() {
378382
break; // Consumer dropped
379383
}
380384
}
381385
Err(e) => {
382-
log::error!("Error reading JSON stream: {e}");
386+
let _ = error_tx
387+
.send(Err(arrow::error::ArrowError::ExternalError(
388+
Box::new(e),
389+
)))
390+
.await;
383391
break;
384392
}
385393
}
@@ -391,14 +399,14 @@ impl FileOpener for JsonOpener {
391399
// Store the SpawnedTask to keep it alive until stream is dropped
392400
let parse_task = SpawnedTask::spawn_blocking(move || {
393401
let channel_reader = ChannelReader::new(byte_rx);
394-
let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
402+
let mut ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
395403
channel_reader,
396404
JSON_CONVERTER_BUFFER_SIZE,
397405
);
398406

399407
match ReaderBuilder::new(schema)
400408
.with_batch_size(batch_size)
401-
.build(ndjson_reader)
409+
.build(&mut ndjson_reader)
402410
{
403411
Ok(arrow_reader) => {
404412
for batch_result in arrow_reader {
@@ -412,6 +420,13 @@ impl FileOpener for JsonOpener {
412420
let _ = result_tx.blocking_send(Err(e));
413421
}
414422
}
423+
424+
// Validate the JSON array was properly formed
425+
if let Err(e) = ndjson_reader.validate_complete() {
426+
let _ = result_tx.blocking_send(Err(
427+
arrow::error::ArrowError::JsonError(e.to_string()),
428+
));
429+
}
415430
// result_tx dropped here, closes the stream
416431
});
417432

datafusion/datasource-json/src/utils.rs

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! Utility types for JSON processing
1919
2020
use std::io::{BufRead, Read};
21-
use std::sync::mpsc::Receiver;
2221

2322
use bytes::Bytes;
2423

@@ -130,6 +129,8 @@ pub struct JsonArrayToNdjsonReader<R: Read> {
130129
output_filled: usize,
131130
/// Whether trailing non-whitespace content was detected after ']'
132131
has_trailing_content: bool,
132+
/// Whether leading non-whitespace content was detected before '['
133+
has_leading_content: bool,
133134
}
134135

135136
impl<R: Read> JsonArrayToNdjsonReader<R> {
@@ -156,6 +157,7 @@ impl<R: Read> JsonArrayToNdjsonReader<R> {
156157
output_pos: 0,
157158
output_filled: 0,
158159
has_trailing_content: false,
160+
has_leading_content: false,
159161
}
160162
}
161163

@@ -169,6 +171,12 @@ impl<R: Read> JsonArrayToNdjsonReader<R> {
169171
/// - Missing closing `]`
170172
/// - Unexpected trailing content after `]`
171173
pub fn validate_complete(&self) -> std::io::Result<()> {
174+
if self.has_leading_content {
175+
return Err(std::io::Error::new(
176+
std::io::ErrorKind::InvalidData,
177+
"Malformed JSON: unexpected leading content before '['",
178+
));
179+
}
172180
if self.depth != 0 {
173181
return Err(std::io::Error::new(
174182
std::io::ErrorKind::InvalidData,
@@ -204,8 +212,9 @@ impl<R: Read> JsonArrayToNdjsonReader<R> {
204212
// Looking for the opening '[', skip whitespace
205213
if byte == b'[' {
206214
self.state = JsonArrayState::InArray;
215+
} else if !byte.is_ascii_whitespace() {
216+
self.has_leading_content = true;
207217
}
208-
// Skip whitespace and the '[' itself
209218
None
210219
}
211220
JsonArrayState::InArray => {
@@ -376,7 +385,7 @@ impl<R: Read> BufRead for JsonArrayToNdjsonReader<R> {
376385
// │ byte_tx.send(chunk) │
377386
// └─────────────────────────────────────────────────────────────────────────┘
378387
// │
379-
// ▼ std::sync::mpsc::sync_channel<Bytes>
388+
// ▼ tokio::sync::mpsc::channel<Bytes>
380389
// │ (bounded, ~32MB buffer)
381390
// ▼
382391
// ┌─────────────────────────────────────────────────────────────────────────┐
@@ -401,19 +410,21 @@ impl<R: Read> BufRead for JsonArrayToNdjsonReader<R> {
401410
// - Miscellaneous: ~4MB
402411
//
403412

404-
/// A synchronous `Read` implementation that receives bytes from a channel.
413+
/// A synchronous `Read` implementation that receives bytes from an async channel.
405414
///
406415
/// This enables true streaming between async and sync contexts without
407-
/// loading the entire file into memory.
416+
/// loading the entire file into memory. Uses `tokio::sync::mpsc::Receiver`
417+
/// with `blocking_recv()` so the async producer never blocks a tokio worker
418+
/// thread, while the sync consumer (running in `spawn_blocking`) safely blocks.
408419
pub struct ChannelReader {
409-
rx: Receiver<Bytes>,
420+
rx: tokio::sync::mpsc::Receiver<Bytes>,
410421
current: Option<Bytes>,
411422
pos: usize,
412423
}
413424

414425
impl ChannelReader {
415-
/// Create a new ChannelReader from a receiver.
416-
pub fn new(rx: Receiver<Bytes>) -> Self {
426+
/// Create a new ChannelReader from a tokio mpsc receiver.
427+
pub fn new(rx: tokio::sync::mpsc::Receiver<Bytes>) -> Self {
417428
Self {
418429
rx,
419430
current: None,
@@ -437,13 +448,13 @@ impl Read for ChannelReader {
437448
}
438449

439450
// Current chunk exhausted, get next from channel
440-
match self.rx.recv() {
441-
Ok(bytes) => {
451+
match self.rx.blocking_recv() {
452+
Some(bytes) => {
442453
self.current = Some(bytes);
443454
self.pos = 0;
444455
// Loop back to read from new chunk
445456
}
446-
Err(_) => return Ok(0), // Channel closed = EOF
457+
None => return Ok(0), // Channel closed = EOF
447458
}
448459
}
449460
}
@@ -596,6 +607,39 @@ mod tests {
596607
);
597608
}
598609

610+
#[test]
611+
fn test_json_array_with_leading_junk() {
612+
let input = r#"junk[{"a":1}, {"a":2}]"#;
613+
let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
614+
let mut output = String::new();
615+
reader.read_to_string(&mut output).unwrap();
616+
617+
// Should still extract the valid array content
618+
assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
619+
620+
// But validation should catch the leading junk
621+
let result = reader.validate_complete();
622+
assert!(result.is_err());
623+
let err_msg = result.unwrap_err().to_string();
624+
assert!(
625+
err_msg.contains("leading content"),
626+
"Expected leading content error, got: {err_msg}"
627+
);
628+
}
629+
630+
#[test]
631+
fn test_json_array_with_leading_whitespace_ok() {
632+
let input = r#"
633+
[{"a":1}, {"a":2}]"#;
634+
let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
635+
let mut output = String::new();
636+
reader.read_to_string(&mut output).unwrap();
637+
assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
638+
639+
// Leading whitespace should be fine
640+
reader.validate_complete().unwrap();
641+
}
642+
599643
#[test]
600644
fn test_validate_complete_valid_with_trailing_whitespace() {
601645
let input = r#"[{"a":1},{"a":2}]
@@ -690,11 +734,11 @@ mod tests {
690734
/// Test ChannelReader
691735
#[test]
692736
fn test_channel_reader() {
693-
let (tx, rx) = std::sync::mpsc::sync_channel(4);
737+
let (tx, rx) = tokio::sync::mpsc::channel(4);
694738

695-
// Send some chunks
696-
tx.send(Bytes::from("Hello, ")).unwrap();
697-
tx.send(Bytes::from("World!")).unwrap();
739+
// Send some chunks (try_send is non-async)
740+
tx.try_send(Bytes::from("Hello, ")).unwrap();
741+
tx.try_send(Bytes::from("World!")).unwrap();
698742
drop(tx); // Close channel
699743

700744
let mut reader = ChannelReader::new(rx);
@@ -707,9 +751,9 @@ mod tests {
707751
/// Test ChannelReader with small reads
708752
#[test]
709753
fn test_channel_reader_small_reads() {
710-
let (tx, rx) = std::sync::mpsc::sync_channel(4);
754+
let (tx, rx) = tokio::sync::mpsc::channel(4);
711755

712-
tx.send(Bytes::from("ABCDEFGHIJ")).unwrap();
756+
tx.try_send(Bytes::from("ABCDEFGHIJ")).unwrap();
713757
drop(tx);
714758

715759
let mut reader = ChannelReader::new(rx);

0 commit comments

Comments (0)