Skip to content

Commit 90c5d1f

Browse files
authored
feat: DH-21522: allow column region optimizations in Predicate Pushdown filtering (#7666)
This PR refactors predicate pushdown filtering for regioned column sources to support both table-location and per-column-region optimizations, enabling more granular (region-level) pushdown planning and execution.

**Changes:**
- Introduces a new `RegionedPushdownAction` model (Location vs. Region actions) and new regioned pushdown filter context types.
- Updates regioned pushdown execution to run per-region and merge results, enabling column-region pushdown participation.
- Refactors `ParquetTableLocation` pushdown logic from an internal enum to the new action-based API and updates the engine interfaces accordingly.

**Code Coverage Summary:**
- `ImmutableConstant[Type]Source`
  - `makePushdownFilterContext` / `estimatePushdownFilterCost` at 100%
  - `pushdownFilter` effectively 100% (empty-selection detection is not tested and might not be reachable without a dedicated test)
1 parent f0b0041 commit 90c5d1f

95 files changed

Lines changed: 3558 additions & 1526 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

engine/chunk/src/main/java/io/deephaven/chunk/WritableBooleanChunk.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
import io.deephaven.chunk.attributes.Any;
1111
import io.deephaven.chunk.util.pools.MultiChunkPool;
1212

13+
import io.deephaven.function.ArraySort;
1314
import io.deephaven.util.type.TypeUtils;
15+
import org.jetbrains.annotations.NotNull;
1416

1517
import java.util.Arrays;
1618
// region FillWithNullValueImports

engine/chunk/src/main/java/io/deephaven/chunk/WritableObjectChunk.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import io.deephaven.chunk.util.pools.MultiChunkPool;
1212

1313
import io.deephaven.function.ArraySort;
14+
import io.deephaven.util.type.TypeUtils;
15+
import org.jetbrains.annotations.NotNull;
1416

1517
import java.util.Arrays;
1618
// region FillWithNullValueImports

engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void estimatePushdownFilterCost(
164164
final LongConsumer onComplete,
165165
final Consumer<Exception> onError) {
166166
// Default to having no benefit by pushing down.
167-
onComplete.accept(Long.MAX_VALUE);
167+
onComplete.accept(PushdownResult.UNSUPPORTED_ACTION_COST);
168168
}
169169

170170
@Override

engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import java.util.stream.Collectors;
3737
import java.util.stream.Stream;
3838

39+
import static io.deephaven.engine.table.impl.PushdownResult.UNSUPPORTED_ACTION_COST;
40+
3941
/**
4042
* The AbstractFilterExecution incorporates the idea that we have an added and modified RowSet to filter and that there
4143
* are a resulting pair of added and modified rows representing what was filtered. There is also the possibility that we
@@ -210,7 +212,7 @@ private class StatelessFilter implements Comparable<StatelessFilter>, SafeClosea
210212
/**
211213
* The cost of the pushdown filter operation.
212214
*/
213-
public long pushdownFilterCost = Long.MAX_VALUE;
215+
public long pushdownFilterCost = UNSUPPORTED_ACTION_COST;
214216
/**
215217
* The result of the pushdown filter operation, or null if pushdown is not supported.
216218
*/
@@ -250,12 +252,13 @@ public StatelessFilter(
250252

251253
/**
252254
* Schedules pushdown filter cost estimation for {@link #pushdownMatcher}. After {@link #pushdownFilterCost} has
253-
* been set (or set to {@link Long#MAX_VALUE} if pushdown is not supported), {@code onComplete} will be called.
255+
* been set (or set to {@link PushdownResult#UNSUPPORTED_ACTION_COST} if pushdown is not supported),
256+
* {@code onComplete} will be called.
254257
*/
255258
public void scheduleUpdatePushdownFilterCost(final RowSet selection, final Runnable onComplete,
256259
final Consumer<Exception> onError) {
257260
if (pushdownMatcher == null) {
258-
pushdownFilterCost = Long.MAX_VALUE;
261+
pushdownFilterCost = UNSUPPORTED_ACTION_COST;
259262
onComplete.run();
260263
return;
261264
}
@@ -443,7 +446,7 @@ private void executeStatelessFilter(
443446
};
444447

445448
final RowSet input = localInput.get();
446-
if (sf.pushdownMatcher != null && sf.pushdownFilterCost < Long.MAX_VALUE) {
449+
if (sf.pushdownMatcher != null && sf.pushdownFilterCost != UNSUPPORTED_ACTION_COST) {
447450
// Execute the pushdown filter and return.
448451
sf.pushdownMatcher.pushdownFilter(sf.filter, input, usePrev, sf.context,
449452
costCeiling, jobScheduler(), onPushdownComplete, filterNec);

0 commit comments

Comments
 (0)