4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -188,6 +188,8 @@ strum_macros = "0.27.2"
tempfile = "3"
testcontainers-modules = { version = "0.14" }
tokio = { version = "1.48", features = ["macros", "rt", "sync"] }
tokio-stream = "0.1"
tokio-util = "0.7"
url = "2.5.7"
uuid = "1.20"
zstd = { version = "0.13", default-features = false }
@@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> {
projected,
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store),
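// `true` here is the new newline-delimited flag; it presumably mirrors the
// `newline_delimited` config option added in this PR (NDJSON when true).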
true,
);

let scan_config = FileScanConfigBuilder::new(
16 changes: 16 additions & 0 deletions datafusion/common/src/config.rs
@@ -3065,6 +3065,22 @@ config_namespace! {
/// If not specified, the default level for the compression algorithm is used.
pub compression_level: Option<u32>, default = None
pub schema_infer_max_rec: Option<usize>, default = None
/// Whether to read JSON files as newline-delimited JSON (NDJSON) or as a single JSON array.
///
/// When `true` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `false`, expects JSON array format:
/// ```text
/// [
/// {"key1": 1, "key2": "val"},
/// {"key1": 2, "key2": "vals"}
/// ]
/// ```
pub newline_delimited: bool, default = true
}
}
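
For context, a minimal end-to-end sketch of the new option, assembled from the `register_json` calls in the tests added below. The file paths and table names are illustrative, and the import path for `JsonReadOptions` is assumed from this PR's test code:

```rust
use datafusion::error::Result;
use datafusion::execution::options::JsonReadOptions; // path assumed from the PR's tests
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Default: newline-delimited JSON (NDJSON), one object per line.
    ctx.register_json("ndjson_table", "data/lines.json", JsonReadOptions::default())
        .await?;

    // New: treat the whole file as a single JSON array of objects.
    let array_opts = JsonReadOptions::default().newline_delimited(false);
    ctx.register_json("array_table", "data/array.json", array_opts)
        .await?;

    ctx.sql("SELECT * FROM array_table").await?.show().await?;
    Ok(())
}
```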

2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe/mod.rs
@@ -512,7 +512,7 @@ impl DataFrame {
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
/// let df = ctx.read_json("tests/data/unnest.json", JsonReadOptions::default()).await?;
/// // unnest: a JSON array column expands into multiple rows; nested struct fields flatten into separate columns
/// let df = df.unnest_columns(&["b","c","d"])?;
/// let expected = vec![
252 changes: 246 additions & 6 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -25,7 +25,7 @@ mod tests {
use super::*;

use crate::datasource::file_format::test_util::scan_format;
use crate::prelude::{NdJsonReadOptions, SessionConfig, SessionContext};
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use arrow::array::RecordBatch;
use arrow_schema::Schema;
@@ -46,12 +46,54 @@ mod tests {
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;

use crate::execution::options::JsonReadOptions;
use datafusion_common::Result;
use datafusion_datasource::file_compression_type::FileCompressionType;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
use regex::Regex;
use rstest::rstest;
// ==================== Test Helpers ====================

/// Create a temporary JSON file and return (TempDir, path)
fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
let tmp_dir = tempfile::TempDir::new().unwrap();
let path = format!("{}/test.json", tmp_dir.path().to_string_lossy());
std::fs::write(&path, content).unwrap();
(tmp_dir, path)
}

/// Infer schema from JSON array format file
async fn infer_json_array_schema(
content: &str,
) -> Result<arrow::datatypes::SchemaRef> {
let (_tmp_dir, path) = create_temp_json(content);
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;
let format = JsonFormat::default().with_newline_delimited(false);
format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await
}

/// Register a JSON array table and run a query
async fn query_json_array(content: &str, query: &str) -> Result<Vec<RecordBatch>> {
let (_tmp_dir, path) = create_temp_json(content);
let ctx = SessionContext::new();
let options = JsonReadOptions::default().newline_delimited(false);
ctx.register_json("test_table", &path, options).await?;
ctx.sql(query).await?.collect().await
}

/// Register a JSON array table and run a query, return formatted string
async fn query_json_array_str(content: &str, query: &str) -> Result<String> {
let result = query_json_array(content, query).await?;
Ok(batches_to_string(&result))
}

// ==================== Existing Tests ====================

#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -208,7 +208,7 @@ mod tests {
let ctx = SessionContext::new_with_config(config);

let table_path = "tests/data/1.json";
let options = NdJsonReadOptions::default();
let options = JsonReadOptions::default();

ctx.register_json("json_parallel", table_path, options)
.await?;
@@ -240,7 +240,7 @@ mod tests {
let ctx = SessionContext::new_with_config(config);

let table_path = "tests/data/empty.json";
let options = NdJsonReadOptions::default();
let options = JsonReadOptions::default();

ctx.register_json("json_parallel_empty", table_path, options)
.await?;
@@ -314,7 +314,6 @@ mod tests {
.digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());

let mut all_batches = RecordBatch::new_empty(schema.clone());
// We get RequiresMoreData after 2 batches because of how json::Decoder works
for _ in 0..2 {
let output = deserializer.next()?;
let DeserializerOutput::RecordBatch(batch) = output else {
@@ -358,7 +358,6 @@ mod tests {
let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
@@ -385,10 +385,210 @@ mod tests {
let df = ctx.read_batch(empty_batch.clone())?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
Ok(())
}

// ==================== JSON Array Format Tests ====================

#[tokio::test]
async fn test_json_array_schema_inference() -> Result<()> {
let schema = infer_json_array_schema(
r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": false}]"#,
)
.await?;

let fields: Vec<_> = schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect();
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
Ok(())
}

#[tokio::test]
async fn test_json_array_empty() -> Result<()> {
let schema = infer_json_array_schema("[]").await?;
assert_eq!(schema.fields().len(), 0);
Ok(())
}

#[tokio::test]
async fn test_json_array_nested_struct() -> Result<()> {
let schema = infer_json_array_schema(
r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#,
)
.await?;

let info_field = schema.field_with_name("info").unwrap();
assert!(matches!(info_field.data_type(), DataType::Struct(_)));
Ok(())
}

#[tokio::test]
async fn test_json_array_list_type() -> Result<()> {
let schema =
infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", "c"]}]"#).await?;

let tags_field = schema.field_with_name("tags").unwrap();
assert!(matches!(tags_field.data_type(), DataType::List(_)));
Ok(())
}

#[tokio::test]
async fn test_json_array_basic_query() -> Result<()> {
let result = query_json_array_str(
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": "test"}]"#,
"SELECT a, b FROM test_table ORDER BY a",
)
.await?;

assert_snapshot!(result, @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
| 3 | test |
+---+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_with_nulls() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, "name": "Charlie"}]"#,
"SELECT id, name FROM test_table ORDER BY id",
)
.await?;

assert_snapshot!(result, @r"
+----+---------+
| id | name |
+----+---------+
| 1 | Alice |
| 2 | |
| 3 | Charlie |
+----+---------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_unnest() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]"#,
"SELECT id, unnest(values) as value FROM test_table ORDER BY id, value",
)
.await?;

assert_snapshot!(result, @r"
+----+-------+
| id | value |
+----+-------+
| 1 | 10 |
| 1 | 20 |
| 1 | 30 |
| 2 | 40 |
| 2 | 50 |
+----+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_unnest_struct() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#,
"SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product",
)
.await?;

assert_snapshot!(result, @r"
+----+---------+-----+
| id | product | qty |
+----+---------+-----+
| 1 | A | 2 |
| 1 | B | 3 |
| 2 | C | 1 |
+----+---------+-----+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_nested_struct_access() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#,
"SELECT id, dept['name'] as dept_name, dept['head'] as head FROM test_table ORDER BY id",
)
.await?;

assert_snapshot!(result, @r"
+----+-------------+-------+
| id | dept_name | head |
+----+-------------+-------+
| 1 | Engineering | Alice |
| 2 | Sales | Bob |
+----+-------------+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_with_compression() -> Result<()> {
use flate2::Compression;
use flate2::write::GzEncoder;
use std::io::Write;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());

let file = std::fs::File::create(&path)?;
let mut encoder = GzEncoder::new(file, Compression::default());
encoder.write_all(
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(),
)?;
encoder.finish()?;

let ctx = SessionContext::new();
let options = JsonReadOptions::default()
.newline_delimited(false)
.file_compression_type(FileCompressionType::GZIP)
.file_extension(".json.gz");

ctx.register_json("test_table", &path, options).await?;
let result = ctx
.sql("SELECT a, b FROM test_table ORDER BY a")
.await?
.collect()
.await?;

assert_snapshot!(batches_to_string(&result), @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_list_of_structs() -> Result<()> {
let batches = query_json_array(
r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, "items": []}]"#,
"SELECT id, items FROM test_table ORDER BY id",
)
.await?;

assert_eq!(1, batches.len());
assert_eq!(2, batches[0].num_rows());
Ok(())
}
}