Skip to content

Commit 2e51003

Browse files
committed
make PhysicalExprAdatperFactory::create fallible
1 parent b2c29ac commit 2e51003

8 files changed

Lines changed: 64 additions & 41 deletions

File tree

datafusion-examples/examples/custom_data_source/custom_file_casts.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,14 @@ impl PhysicalExprAdapterFactory for CustomCastPhysicalExprAdapterFactory {
156156
&self,
157157
logical_file_schema: SchemaRef,
158158
physical_file_schema: SchemaRef,
159-
) -> Arc<dyn PhysicalExprAdapter> {
159+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
160160
let inner = self
161161
.inner
162-
.create(logical_file_schema, Arc::clone(&physical_file_schema));
163-
Arc::new(CustomCastsPhysicalExprAdapter {
162+
.create(logical_file_schema, Arc::clone(&physical_file_schema))?;
163+
Ok(Arc::new(CustomCastsPhysicalExprAdapter {
164164
physical_file_schema,
165165
inner,
166-
})
166+
}))
167167
}
168168
}
169169

datafusion-examples/examples/custom_data_source/default_column_values.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,18 +278,18 @@ impl PhysicalExprAdapterFactory for DefaultValuePhysicalExprAdapterFactory {
278278
&self,
279279
logical_file_schema: SchemaRef,
280280
physical_file_schema: SchemaRef,
281-
) -> Arc<dyn PhysicalExprAdapter> {
281+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
282282
let default_factory = DefaultPhysicalExprAdapterFactory;
283283
let default_adapter = default_factory.create(
284284
Arc::clone(&logical_file_schema),
285285
Arc::clone(&physical_file_schema),
286-
);
286+
)?;
287287

288-
Arc::new(DefaultValuePhysicalExprAdapter {
288+
Ok(Arc::new(DefaultValuePhysicalExprAdapter {
289289
logical_file_schema,
290290
physical_file_schema,
291291
default_adapter,
292-
})
292+
}))
293293
}
294294
}
295295

datafusion-examples/examples/data_io/json_shredding.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,17 @@ impl PhysicalExprAdapterFactory for ShreddedJsonRewriterFactory {
275275
&self,
276276
logical_file_schema: SchemaRef,
277277
physical_file_schema: SchemaRef,
278-
) -> Arc<dyn PhysicalExprAdapter> {
278+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
279279
let default_factory = DefaultPhysicalExprAdapterFactory;
280280
let default_adapter = default_factory.create(
281281
Arc::clone(&logical_file_schema),
282282
Arc::clone(&physical_file_schema),
283-
);
283+
)?;
284284

285-
Arc::new(ShreddedJsonRewriter {
285+
Ok(Arc::new(ShreddedJsonRewriter {
286286
physical_file_schema,
287287
default_adapter,
288-
})
288+
}))
289289
}
290290
}
291291

datafusion/core/src/datasource/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,10 @@ mod tests {
149149
&self,
150150
_logical_file_schema: SchemaRef,
151151
physical_file_schema: SchemaRef,
152-
) -> Arc<dyn PhysicalExprAdapter> {
153-
Arc::new(TestPhysicalExprAdapter {
152+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
153+
Ok(Arc::new(TestPhysicalExprAdapter {
154154
physical_file_schema,
155-
})
155+
}))
156156
}
157157
}
158158

datafusion/core/tests/parquet/expr_adapter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,15 @@ impl PhysicalExprAdapterFactory for CustomPhysicalExprAdapterFactory {
6363
&self,
6464
logical_file_schema: SchemaRef,
6565
physical_file_schema: SchemaRef,
66-
) -> Arc<dyn PhysicalExprAdapter> {
67-
Arc::new(CustomPhysicalExprAdapter {
66+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
67+
Ok(Arc::new(CustomPhysicalExprAdapter {
6868
logical_file_schema: Arc::clone(&logical_file_schema),
6969
physical_file_schema: Arc::clone(&physical_file_schema),
7070
inner: Arc::new(DefaultPhysicalExprAdapter::new(
7171
logical_file_schema,
7272
physical_file_schema,
7373
)),
74-
})
74+
}))
7575
}
7676
}
7777

datafusion/datasource-parquet/src/opener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ impl FileOpener for ParquetOpener {
412412
let rewriter = expr_adapter_factory.create(
413413
Arc::clone(&logical_file_schema),
414414
Arc::clone(&physical_file_schema),
415-
);
415+
)?;
416416
let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
417417
predicate = predicate
418418
.map(|p| simplifier.simplify(rewriter.rewrite(p)?))

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,7 @@ mod test {
739739
let expr = logical2physical(&expr, &table_schema);
740740
let expr = DefaultPhysicalExprAdapterFactory {}
741741
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
742+
.expect("creating expr adapter")
742743
.rewrite(expr)
743744
.expect("rewriting expression");
744745
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
@@ -778,6 +779,7 @@ mod test {
778779
// Rewrite the expression to add CastExpr for type coercion
779780
let expr = DefaultPhysicalExprAdapterFactory {}
780781
.create(Arc::new(table_schema), Arc::clone(&file_schema))
782+
.expect("creating expr adapter")
781783
.rewrite(expr)
782784
.expect("rewriting expression");
783785
let candidate = FilterCandidateBuilder::new(expr, file_schema)

datafusion/physical-expr-adapter/src/schema_rewriter.rs

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@ where
141141
/// &self,
142142
/// logical_file_schema: SchemaRef,
143143
/// physical_file_schema: SchemaRef,
144-
/// ) -> Arc<dyn PhysicalExprAdapter> {
145-
/// Arc::new(CustomPhysicalExprAdapter {
144+
/// ) -> Result<Arc<dyn PhysicalExprAdapter>> {
145+
/// Ok(Arc::new(CustomPhysicalExprAdapter {
146146
/// logical_file_schema,
147147
/// physical_file_schema,
148-
/// })
148+
/// }))
149149
/// }
150150
/// }
151151
/// ```
@@ -178,7 +178,7 @@ pub trait PhysicalExprAdapterFactory: Send + Sync + std::fmt::Debug {
178178
&self,
179179
logical_file_schema: SchemaRef,
180180
physical_file_schema: SchemaRef,
181-
) -> Arc<dyn PhysicalExprAdapter>;
181+
) -> Result<Arc<dyn PhysicalExprAdapter>>;
182182
}
183183

184184
#[derive(Debug, Clone)]
@@ -189,11 +189,11 @@ impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory {
189189
&self,
190190
logical_file_schema: SchemaRef,
191191
physical_file_schema: SchemaRef,
192-
) -> Arc<dyn PhysicalExprAdapter> {
193-
Arc::new(DefaultPhysicalExprAdapter {
192+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
193+
Ok(Arc::new(DefaultPhysicalExprAdapter {
194194
logical_file_schema,
195195
physical_file_schema,
196-
})
196+
}))
197197
}
198198
}
199199

@@ -232,7 +232,8 @@ impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory {
232232
/// # logical_file_schema: &Schema,
233233
/// # ) -> datafusion_common::Result<()> {
234234
/// let factory = DefaultPhysicalExprAdapterFactory;
235-
/// let adapter = factory.create(Arc::new(logical_file_schema.clone()), Arc::new(physical_file_schema.clone()));
235+
/// let adapter =
236+
/// factory.create(Arc::new(logical_file_schema.clone()), Arc::new(physical_file_schema.clone()))?;
236237
/// let adapted_predicate = adapter.rewrite(predicate)?;
237238
/// # Ok(())
238239
/// # }
@@ -571,7 +572,7 @@ impl BatchAdapterFactory {
571572
pub fn make_adapter(&self, source_schema: SchemaRef) -> Result<BatchAdapter> {
572573
let expr_adapter = self
573574
.expr_adapter_factory
574-
.create(Arc::clone(&self.target_schema), Arc::clone(&source_schema));
575+
.create(Arc::clone(&self.target_schema), Arc::clone(&source_schema))?;
575576

576577
let simplifier = PhysicalExprSimplifier::new(&self.target_schema);
577578

@@ -647,7 +648,9 @@ mod tests {
647648
let (physical_schema, logical_schema) = create_test_schema();
648649

649650
let factory = DefaultPhysicalExprAdapterFactory;
650-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
651+
let adapter = factory
652+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
653+
.unwrap();
651654
let column_expr = Arc::new(Column::new("a", 0));
652655

653656
let result = adapter.rewrite(column_expr).unwrap();
@@ -660,7 +663,9 @@ mod tests {
660663
fn test_rewrite_multi_column_expr_with_type_cast() {
661664
let (physical_schema, logical_schema) = create_test_schema();
662665
let factory = DefaultPhysicalExprAdapterFactory;
663-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
666+
let adapter = factory
667+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
668+
.unwrap();
664669

665670
// Create a complex expression: (a + 5) OR (c > 0.0) that tests the recursive case of the rewriter
666671
let column_a = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
@@ -725,7 +730,9 @@ mod tests {
725730
)]);
726731

727732
let factory = DefaultPhysicalExprAdapterFactory;
728-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
733+
let adapter = factory
734+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
735+
.unwrap();
729736
let column_expr = Arc::new(Column::new("data", 0));
730737

731738
let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
@@ -763,7 +770,9 @@ mod tests {
763770
)]);
764771

765772
let factory = DefaultPhysicalExprAdapterFactory;
766-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
773+
let adapter = factory
774+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
775+
.unwrap();
767776
let column_expr = Arc::new(Column::new("data", 0));
768777

769778
let result = adapter.rewrite(column_expr).unwrap();
@@ -803,7 +812,9 @@ mod tests {
803812
let (physical_schema, logical_schema) = create_test_schema();
804813

805814
let factory = DefaultPhysicalExprAdapterFactory;
806-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
815+
let adapter = factory
816+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
817+
.unwrap();
807818
let column_expr = Arc::new(Column::new("c", 2));
808819

809820
let result = adapter.rewrite(column_expr)?;
@@ -827,7 +838,9 @@ mod tests {
827838
]);
828839

829840
let factory = DefaultPhysicalExprAdapterFactory;
830-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
841+
let adapter = factory
842+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
843+
.unwrap();
831844
let column_expr = Arc::new(Column::new("b", 1));
832845

833846
let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
@@ -843,7 +856,9 @@ mod tests {
843856
]);
844857

845858
let factory = DefaultPhysicalExprAdapterFactory;
846-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
859+
let adapter = factory
860+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
861+
.unwrap();
847862
let column_expr = Arc::new(Column::new("b", 1));
848863

849864
let result = adapter.rewrite(column_expr).unwrap();
@@ -909,7 +924,9 @@ mod tests {
909924
let (physical_schema, logical_schema) = create_test_schema();
910925

911926
let factory = DefaultPhysicalExprAdapterFactory;
912-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
927+
let adapter = factory
928+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
929+
.unwrap();
913930
let column_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
914931

915932
let result = adapter.rewrite(Arc::clone(&column_expr))?;
@@ -933,7 +950,9 @@ mod tests {
933950
]);
934951

935952
let factory = DefaultPhysicalExprAdapterFactory;
936-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
953+
let adapter = factory
954+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
955+
.unwrap();
937956
let column_expr = Arc::new(Column::new("b", 1));
938957

939958
let result = adapter.rewrite(column_expr);
@@ -991,8 +1010,9 @@ mod tests {
9911010
];
9921011

9931012
let factory = DefaultPhysicalExprAdapterFactory;
994-
let adapter =
995-
factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));
1013+
let adapter = factory
1014+
.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema))
1015+
.unwrap();
9961016

9971017
let adapted_projection = projection
9981018
.into_iter()
@@ -1093,8 +1113,9 @@ mod tests {
10931113
let projection = vec![col("data", &logical_schema).unwrap()];
10941114

10951115
let factory = DefaultPhysicalExprAdapterFactory;
1096-
let adapter =
1097-
factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));
1116+
let adapter = factory
1117+
.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema))
1118+
.unwrap();
10981119

10991120
let adapted_projection = projection
11001121
.into_iter()

0 commit comments

Comments
 (0)