1919
2020use crate :: config:: ExecutionConfig ;
2121use crate :: extensions:: { DftSessionStateBuilder , Extension } ;
22- use deltalake:: delta_datafusion:: DeltaTableFactory ;
22+ use datafusion:: catalog:: { Session , TableProviderFactory } ;
23+ use datafusion:: common:: DataFusionError ;
24+ use datafusion:: logical_expr:: logical_plan:: CreateExternalTable ;
25+ use deltalake:: table:: builder:: ensure_table_uri;
26+ use std:: collections:: HashMap ;
2327use std:: sync:: Arc ;
2428
2529#[ derive( Debug , Default ) ]
@@ -31,20 +35,66 @@ impl DeltaLakeExtension {
3135 }
3236}
3337
38+ /// A wrapper around delta-rs's table factory that normalizes option keys passed
39+ /// through DataFusion's `CREATE EXTERNAL TABLE ... OPTIONS (...)` clause.
40+ ///
41+ /// DataFusion's `parse_options_map` prepends `"format."` to any option key that
42+ /// does not contain a dot (e.g. `'aws_endpoint' '...'` becomes `"format.aws_endpoint"`
43+ /// in `cmd.options`). delta-rs's `S3ObjectStoreFactory` parses these keys via
44+ /// `AmazonS3ConfigKey::from_str`, which does not recognise the `"format."` prefix,
45+ /// so credentials and endpoint settings are silently ignored and the AWS SDK
46+ /// credential chain is invoked instead.
47+ ///
48+ /// This factory strips the leading `"format."` prefix before forwarding the options
49+ /// to `open_table_with_storage_options`, allowing callers to supply S3 credentials
50+ /// directly in the DDL without needing ambient environment variables.
51+ #[ derive( Debug , Default ) ]
52+ struct DeltaTableFactory { }
53+
54+ #[ async_trait:: async_trait]
55+ impl TableProviderFactory for DeltaTableFactory {
56+ async fn create (
57+ & self ,
58+ _ctx : & dyn Session ,
59+ cmd : & CreateExternalTable ,
60+ ) -> datafusion:: error:: Result < Arc < dyn datafusion:: catalog:: TableProvider > > {
61+ let table_url = ensure_table_uri ( & cmd. location )
62+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
63+
64+ let provider = if cmd. options . is_empty ( ) {
65+ deltalake:: open_table ( table_url)
66+ . await
67+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?
68+ } else {
69+ // DataFusion prepends "format." to option keys that don't contain a dot.
70+ // Strip that prefix so delta-rs can recognise the keys.
71+ let options: HashMap < String , String > = cmd
72+ . options
73+ . iter ( )
74+ . map ( |( k, v) | {
75+ let key = k
76+ . strip_prefix ( "format." )
77+ . map ( str:: to_string)
78+ . unwrap_or_else ( || k. clone ( ) ) ;
79+ ( key, v. clone ( ) )
80+ } )
81+ . collect ( ) ;
82+ deltalake:: open_table_with_storage_options ( table_url, options)
83+ . await
84+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?
85+ } ;
86+
87+ Ok ( Arc :: new ( provider) )
88+ }
89+ }
90+
3491#[ async_trait:: async_trait]
3592impl Extension for DeltaLakeExtension {
3693 async fn register (
3794 & self ,
3895 _config : ExecutionConfig ,
3996 builder : & mut DftSessionStateBuilder ,
4097 ) -> datafusion:: common:: Result < ( ) > {
41- // Register S3 handlers if s3 feature is enabled
42- // This is required for Delta Lake to recognize s3:// URLs
43- #[ cfg( feature = "s3" ) ]
44- {
45- deltalake:: aws:: register_handlers ( None ) ;
46- }
47-
4898 builder. add_table_factory ( "DELTATABLE" , Arc :: new ( DeltaTableFactory { } ) ) ;
4999 Ok ( ( ) )
50100 }
0 commit comments