Skip to content

Commit aae3e0f

Browse files
authored
refactor: make PhysicalExprAdatperFactory::create fallible (#20017)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19956 . ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes, covered by existing unit tests. ## Are there any user-facing changes? `PhysicalExprAdapterFactory::create` now returns a result and I think this is a breaking change. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent adb8c8a commit aae3e0f

8 files changed

Lines changed: 64 additions & 41 deletions

File tree

datafusion-examples/examples/custom_data_source/custom_file_casts.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,14 @@ impl PhysicalExprAdapterFactory for CustomCastPhysicalExprAdapterFactory {
156156
&self,
157157
logical_file_schema: SchemaRef,
158158
physical_file_schema: SchemaRef,
159-
) -> Arc<dyn PhysicalExprAdapter> {
159+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
160160
let inner = self
161161
.inner
162-
.create(logical_file_schema, Arc::clone(&physical_file_schema));
163-
Arc::new(CustomCastsPhysicalExprAdapter {
162+
.create(logical_file_schema, Arc::clone(&physical_file_schema))?;
163+
Ok(Arc::new(CustomCastsPhysicalExprAdapter {
164164
physical_file_schema,
165165
inner,
166-
})
166+
}))
167167
}
168168
}
169169

datafusion-examples/examples/custom_data_source/default_column_values.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,18 +278,18 @@ impl PhysicalExprAdapterFactory for DefaultValuePhysicalExprAdapterFactory {
278278
&self,
279279
logical_file_schema: SchemaRef,
280280
physical_file_schema: SchemaRef,
281-
) -> Arc<dyn PhysicalExprAdapter> {
281+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
282282
let default_factory = DefaultPhysicalExprAdapterFactory;
283283
let default_adapter = default_factory.create(
284284
Arc::clone(&logical_file_schema),
285285
Arc::clone(&physical_file_schema),
286-
);
286+
)?;
287287

288-
Arc::new(DefaultValuePhysicalExprAdapter {
288+
Ok(Arc::new(DefaultValuePhysicalExprAdapter {
289289
logical_file_schema,
290290
physical_file_schema,
291291
default_adapter,
292-
})
292+
}))
293293
}
294294
}
295295

datafusion-examples/examples/data_io/json_shredding.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,17 @@ impl PhysicalExprAdapterFactory for ShreddedJsonRewriterFactory {
275275
&self,
276276
logical_file_schema: SchemaRef,
277277
physical_file_schema: SchemaRef,
278-
) -> Arc<dyn PhysicalExprAdapter> {
278+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
279279
let default_factory = DefaultPhysicalExprAdapterFactory;
280280
let default_adapter = default_factory.create(
281281
Arc::clone(&logical_file_schema),
282282
Arc::clone(&physical_file_schema),
283-
);
283+
)?;
284284

285-
Arc::new(ShreddedJsonRewriter {
285+
Ok(Arc::new(ShreddedJsonRewriter {
286286
physical_file_schema,
287287
default_adapter,
288-
})
288+
}))
289289
}
290290
}
291291

datafusion/core/src/datasource/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,10 @@ mod tests {
149149
&self,
150150
_logical_file_schema: SchemaRef,
151151
physical_file_schema: SchemaRef,
152-
) -> Arc<dyn PhysicalExprAdapter> {
153-
Arc::new(TestPhysicalExprAdapter {
152+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
153+
Ok(Arc::new(TestPhysicalExprAdapter {
154154
physical_file_schema,
155-
})
155+
}))
156156
}
157157
}
158158

datafusion/core/tests/parquet/expr_adapter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,15 @@ impl PhysicalExprAdapterFactory for CustomPhysicalExprAdapterFactory {
6363
&self,
6464
logical_file_schema: SchemaRef,
6565
physical_file_schema: SchemaRef,
66-
) -> Arc<dyn PhysicalExprAdapter> {
67-
Arc::new(CustomPhysicalExprAdapter {
66+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
67+
Ok(Arc::new(CustomPhysicalExprAdapter {
6868
logical_file_schema: Arc::clone(&logical_file_schema),
6969
physical_file_schema: Arc::clone(&physical_file_schema),
7070
inner: Arc::new(DefaultPhysicalExprAdapter::new(
7171
logical_file_schema,
7272
physical_file_schema,
7373
)),
74-
})
74+
}))
7575
}
7676
}
7777

datafusion/datasource-parquet/src/opener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ impl FileOpener for ParquetOpener {
412412
let rewriter = expr_adapter_factory.create(
413413
Arc::clone(&logical_file_schema),
414414
Arc::clone(&physical_file_schema),
415-
);
415+
)?;
416416
let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
417417
predicate = predicate
418418
.map(|p| simplifier.simplify(rewriter.rewrite(p)?))

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,7 @@ mod test {
739739
let expr = logical2physical(&expr, &table_schema);
740740
let expr = DefaultPhysicalExprAdapterFactory {}
741741
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
742+
.expect("creating expr adapter")
742743
.rewrite(expr)
743744
.expect("rewriting expression");
744745
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
@@ -778,6 +779,7 @@ mod test {
778779
// Rewrite the expression to add CastExpr for type coercion
779780
let expr = DefaultPhysicalExprAdapterFactory {}
780781
.create(Arc::new(table_schema), Arc::clone(&file_schema))
782+
.expect("creating expr adapter")
781783
.rewrite(expr)
782784
.expect("rewriting expression");
783785
let candidate = FilterCandidateBuilder::new(expr, file_schema)

datafusion/physical-expr-adapter/src/schema_rewriter.rs

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@ where
141141
/// &self,
142142
/// logical_file_schema: SchemaRef,
143143
/// physical_file_schema: SchemaRef,
144-
/// ) -> Arc<dyn PhysicalExprAdapter> {
145-
/// Arc::new(CustomPhysicalExprAdapter {
144+
/// ) -> Result<Arc<dyn PhysicalExprAdapter>> {
145+
/// Ok(Arc::new(CustomPhysicalExprAdapter {
146146
/// logical_file_schema,
147147
/// physical_file_schema,
148-
/// })
148+
/// }))
149149
/// }
150150
/// }
151151
/// ```
@@ -178,7 +178,7 @@ pub trait PhysicalExprAdapterFactory: Send + Sync + std::fmt::Debug {
178178
&self,
179179
logical_file_schema: SchemaRef,
180180
physical_file_schema: SchemaRef,
181-
) -> Arc<dyn PhysicalExprAdapter>;
181+
) -> Result<Arc<dyn PhysicalExprAdapter>>;
182182
}
183183

184184
#[derive(Debug, Clone)]
@@ -189,11 +189,11 @@ impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory {
189189
&self,
190190
logical_file_schema: SchemaRef,
191191
physical_file_schema: SchemaRef,
192-
) -> Arc<dyn PhysicalExprAdapter> {
193-
Arc::new(DefaultPhysicalExprAdapter {
192+
) -> Result<Arc<dyn PhysicalExprAdapter>> {
193+
Ok(Arc::new(DefaultPhysicalExprAdapter {
194194
logical_file_schema,
195195
physical_file_schema,
196-
})
196+
}))
197197
}
198198
}
199199

@@ -232,7 +232,8 @@ impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory {
232232
/// # logical_file_schema: &Schema,
233233
/// # ) -> datafusion_common::Result<()> {
234234
/// let factory = DefaultPhysicalExprAdapterFactory;
235-
/// let adapter = factory.create(Arc::new(logical_file_schema.clone()), Arc::new(physical_file_schema.clone()));
235+
/// let adapter =
236+
/// factory.create(Arc::new(logical_file_schema.clone()), Arc::new(physical_file_schema.clone()))?;
236237
/// let adapted_predicate = adapter.rewrite(predicate)?;
237238
/// # Ok(())
238239
/// # }
@@ -571,7 +572,7 @@ impl BatchAdapterFactory {
571572
pub fn make_adapter(&self, source_schema: SchemaRef) -> Result<BatchAdapter> {
572573
let expr_adapter = self
573574
.expr_adapter_factory
574-
.create(Arc::clone(&self.target_schema), Arc::clone(&source_schema));
575+
.create(Arc::clone(&self.target_schema), Arc::clone(&source_schema))?;
575576

576577
let simplifier = PhysicalExprSimplifier::new(&self.target_schema);
577578

@@ -647,7 +648,9 @@ mod tests {
647648
let (physical_schema, logical_schema) = create_test_schema();
648649

649650
let factory = DefaultPhysicalExprAdapterFactory;
650-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
651+
let adapter = factory
652+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
653+
.unwrap();
651654
let column_expr = Arc::new(Column::new("a", 0));
652655

653656
let result = adapter.rewrite(column_expr).unwrap();
@@ -660,7 +663,9 @@ mod tests {
660663
fn test_rewrite_multi_column_expr_with_type_cast() {
661664
let (physical_schema, logical_schema) = create_test_schema();
662665
let factory = DefaultPhysicalExprAdapterFactory;
663-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
666+
let adapter = factory
667+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
668+
.unwrap();
664669

665670
// Create a complex expression: (a + 5) OR (c > 0.0) that tests the recursive case of the rewriter
666671
let column_a = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
@@ -725,7 +730,9 @@ mod tests {
725730
)]);
726731

727732
let factory = DefaultPhysicalExprAdapterFactory;
728-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
733+
let adapter = factory
734+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
735+
.unwrap();
729736
let column_expr = Arc::new(Column::new("data", 0));
730737

731738
let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
@@ -763,7 +770,9 @@ mod tests {
763770
)]);
764771

765772
let factory = DefaultPhysicalExprAdapterFactory;
766-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
773+
let adapter = factory
774+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
775+
.unwrap();
767776
let column_expr = Arc::new(Column::new("data", 0));
768777

769778
let result = adapter.rewrite(column_expr).unwrap();
@@ -803,7 +812,9 @@ mod tests {
803812
let (physical_schema, logical_schema) = create_test_schema();
804813

805814
let factory = DefaultPhysicalExprAdapterFactory;
806-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
815+
let adapter = factory
816+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
817+
.unwrap();
807818
let column_expr = Arc::new(Column::new("c", 2));
808819

809820
let result = adapter.rewrite(column_expr)?;
@@ -827,7 +838,9 @@ mod tests {
827838
]);
828839

829840
let factory = DefaultPhysicalExprAdapterFactory;
830-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
841+
let adapter = factory
842+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
843+
.unwrap();
831844
let column_expr = Arc::new(Column::new("b", 1));
832845

833846
let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
@@ -843,7 +856,9 @@ mod tests {
843856
]);
844857

845858
let factory = DefaultPhysicalExprAdapterFactory;
846-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
859+
let adapter = factory
860+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
861+
.unwrap();
847862
let column_expr = Arc::new(Column::new("b", 1));
848863

849864
let result = adapter.rewrite(column_expr).unwrap();
@@ -909,7 +924,9 @@ mod tests {
909924
let (physical_schema, logical_schema) = create_test_schema();
910925

911926
let factory = DefaultPhysicalExprAdapterFactory;
912-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
927+
let adapter = factory
928+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
929+
.unwrap();
913930
let column_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
914931

915932
let result = adapter.rewrite(Arc::clone(&column_expr))?;
@@ -933,7 +950,9 @@ mod tests {
933950
]);
934951

935952
let factory = DefaultPhysicalExprAdapterFactory;
936-
let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
953+
let adapter = factory
954+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
955+
.unwrap();
937956
let column_expr = Arc::new(Column::new("b", 1));
938957

939958
let result = adapter.rewrite(column_expr);
@@ -991,8 +1010,9 @@ mod tests {
9911010
];
9921011

9931012
let factory = DefaultPhysicalExprAdapterFactory;
994-
let adapter =
995-
factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));
1013+
let adapter = factory
1014+
.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema))
1015+
.unwrap();
9961016

9971017
let adapted_projection = projection
9981018
.into_iter()
@@ -1093,8 +1113,9 @@ mod tests {
10931113
let projection = vec![col("data", &logical_schema).unwrap()];
10941114

10951115
let factory = DefaultPhysicalExprAdapterFactory;
1096-
let adapter =
1097-
factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));
1116+
let adapter = factory
1117+
.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema))
1118+
.unwrap();
10981119

10991120
let adapted_projection = projection
11001121
.into_iter()

0 commit comments

Comments
 (0)