Skip to content

Commit 13cebf8

Browse files
authored
FFI_TableOptions are using default values only (#20721)
## Which issue does this PR close? - Closes #20704 ## Rationale for this change FFI_TableOptions fails with a warning that is getting swallowed in the unit tests. ## What changes are included in this PR? Correctly check format for table options. ## Are these changes tested? Unit tests updated. ## Are there any user-facing changes? None, internal only. ## Context Related to #20705 but targetting `main`.
1 parent 00e36e8 commit 13cebf8

1 file changed

Lines changed: 95 additions & 12 deletions

File tree

  • datafusion/ffi/src/session

datafusion/ffi/src/session/mod.rs

Lines changed: 95 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use arrow_schema::SchemaRef;
2626
use arrow_schema::ffi::FFI_ArrowSchema;
2727
use async_ffi::{FfiFuture, FutureExt};
2828
use async_trait::async_trait;
29-
use datafusion_common::config::{ConfigOptions, TableOptions};
29+
use datafusion_common::config::{ConfigFileType, ConfigOptions, TableOptions};
3030
use datafusion_common::{DFSchema, DataFusionError};
3131
use datafusion_execution::TaskContext;
3232
use datafusion_execution::config::SessionConfig;
@@ -240,20 +240,38 @@ unsafe extern "C" fn window_functions_fn_wrapper(
240240
.collect()
241241
}
242242

243-
fn table_options_to_rhash(options: &TableOptions) -> RHashMap<RString, RString> {
244-
options
243+
fn table_options_to_rhash(mut options: TableOptions) -> RHashMap<RString, RString> {
244+
// It is important that we mutate options here and set current format
245+
// to None so that when we call `entries()` we get ALL format entries.
246+
// We will pass current_format as a special case and strip it on the
247+
// other side of the boundary.
248+
let current_format = options.current_format.take();
249+
let mut options: HashMap<RString, RString> = options
245250
.entries()
246251
.into_iter()
247252
.filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into())))
248-
.collect()
253+
.collect();
254+
if let Some(current_format) = current_format {
255+
options.insert(
256+
"datafusion_ffi.table_current_format".into(),
257+
match current_format {
258+
ConfigFileType::JSON => "json",
259+
ConfigFileType::PARQUET => "parquet",
260+
ConfigFileType::CSV => "csv",
261+
}
262+
.into(),
263+
);
264+
}
265+
266+
options.into()
249267
}
250268

251269
unsafe extern "C" fn table_options_fn_wrapper(
252270
session: &FFI_SessionRef,
253271
) -> RHashMap<RString, RString> {
254272
let session = session.inner();
255273
let table_options = session.table_options();
256-
table_options_to_rhash(table_options)
274+
table_options_to_rhash(table_options.clone())
257275
}
258276

259277
unsafe extern "C" fn default_table_options_fn_wrapper(
@@ -262,7 +280,7 @@ unsafe extern "C" fn default_table_options_fn_wrapper(
262280
let session = session.inner();
263281
let table_options = session.default_table_options();
264282

265-
table_options_to_rhash(&table_options)
283+
table_options_to_rhash(table_options)
266284
}
267285

268286
unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) -> FFI_TaskContext {
@@ -438,15 +456,70 @@ impl Clone for FFI_SessionRef {
438456
}
439457

440458
fn table_options_from_rhashmap(options: RHashMap<RString, RString>) -> TableOptions {
441-
let options = options
459+
let mut options: HashMap<String, String> = options
442460
.into_iter()
443461
.map(|kv_pair| (kv_pair.0.into_string(), kv_pair.1.into_string()))
444462
.collect();
463+
let current_format = options.remove("datafusion_ffi.table_current_format");
464+
465+
let mut table_options = TableOptions::default();
466+
let formats = [
467+
ConfigFileType::CSV,
468+
ConfigFileType::JSON,
469+
ConfigFileType::PARQUET,
470+
];
471+
for format in formats {
472+
// It is imperative that if new enum variants are added below that they be
473+
// included in the formats list above and in the extension check below.
474+
let format_name = match &format {
475+
ConfigFileType::CSV => "csv",
476+
ConfigFileType::PARQUET => "parquet",
477+
ConfigFileType::JSON => "json",
478+
};
479+
let format_options: HashMap<String, String> = options
480+
.iter()
481+
.filter_map(|(k, v)| {
482+
let (prefix, key) = k.split_once(".")?;
483+
if prefix == format_name {
484+
Some((format!("format.{key}"), v.to_owned()))
485+
} else {
486+
None
487+
}
488+
})
489+
.collect();
490+
if !format_options.is_empty() {
491+
table_options.current_format = Some(format.clone());
492+
table_options
493+
.alter_with_string_hash_map(&format_options)
494+
.unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
495+
}
496+
}
497+
498+
let extension_options: HashMap<String, String> = options
499+
.iter()
500+
.filter_map(|(k, v)| {
501+
let (prefix, _) = k.split_once(".")?;
502+
if !["json", "parquet", "csv"].contains(&prefix) {
503+
Some((k.to_owned(), v.to_owned()))
504+
} else {
505+
None
506+
}
507+
})
508+
.collect();
509+
if !extension_options.is_empty() {
510+
table_options
511+
.alter_with_string_hash_map(&extension_options)
512+
.unwrap_or_else(|err| log::warn!("Error parsing table options: {err}"));
513+
}
445514

446-
TableOptions::from_string_hash_map(&options).unwrap_or_else(|err| {
447-
log::warn!("Error parsing default table options: {err}");
448-
TableOptions::default()
449-
})
515+
table_options.current_format =
516+
current_format.and_then(|format| match format.as_str() {
517+
"csv" => Some(ConfigFileType::CSV),
518+
"parquet" => Some(ConfigFileType::PARQUET),
519+
"json" => Some(ConfigFileType::JSON),
520+
_ => None,
521+
});
522+
table_options
450523
}
451524

452525
#[async_trait]
@@ -556,6 +629,7 @@ mod tests {
556629
use std::sync::Arc;
557630

558631
use arrow_schema::{DataType, Field, Schema};
632+
use datafusion::execution::SessionStateBuilder;
559633
use datafusion_common::DataFusionError;
560634
use datafusion_expr::col;
561635
use datafusion_expr::registry::FunctionRegistry;
@@ -566,7 +640,16 @@ mod tests {
566640
#[tokio::test]
567641
async fn test_ffi_session() -> Result<(), DataFusionError> {
568642
let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
569-
let state = ctx.state();
643+
let mut table_options = TableOptions::default();
644+
table_options.csv.has_header = Some(true);
645+
table_options.json.schema_infer_max_rec = Some(10);
646+
table_options.parquet.global.coerce_int96 = Some("123456789".into());
647+
table_options.current_format = Some(ConfigFileType::JSON);
648+
649+
let state = SessionStateBuilder::new_from_existing(ctx.state())
650+
.with_table_options(table_options)
651+
.build();
652+
570653
let logical_codec = FFI_LogicalExtensionCodec::new(
571654
Arc::new(DefaultLogicalExtensionCodec {}),
572655
None,

0 commit comments

Comments
 (0)