Merged
10 changes: 8 additions & 2 deletions docs/source/contributor-guide/adding_a_new_operator.md
@@ -553,8 +553,14 @@ For operators that run in the JVM:
Example pattern from `CometExecRule.scala`:

```scala
-case s: ShuffleExchangeExec if nativeShuffleSupported(s) =>
-  CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+case s: ShuffleExchangeExec =>
+  CometShuffleExchangeExec.shuffleSupported(s) match {
+    case Some(CometNativeShuffle) =>
+      CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+    case Some(CometColumnarShuffle) =>
+      CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+    case None => s
+  }
```

## Common Patterns and Helpers
67 changes: 0 additions & 67 deletions spark/src/main/scala/org/apache/comet/CometFallback.scala

This file was deleted.

@@ -200,22 +200,29 @@ object CometSparkSessionExtensions extends Logging {
}

  /**
-  * Attaches explain information to a TreeNode, rolling up the corresponding information tags
-  * from any child nodes. For now, we are using this to attach the reasons why certain Spark
-  * operators or expressions are disabled.
+  * Record a fallback reason on a `TreeNode` (a Spark operator or expression) explaining why
+  * Comet cannot accelerate it. Reasons recorded here are surfaced in extended explain output
+  * (see `ExtendedExplainInfo`) and, when `COMET_LOG_FALLBACK_REASONS` is enabled, logged as
+  * warnings. The reasons are also rolled up from child nodes so that the operator that remains
+  * in the Spark plan carries the reasons from its converted-away subtree.
   *
+  * Call this in any code path where Comet decides not to convert a given node - serde `convert`
+  * methods returning `None`, unsupported data types, disabled configs, etc. Do not use this for
+  * informational messages that are not fallback reasons: anything tagged here is treated by the
+  * rules as a signal that the node falls back to Spark.
+  *
   * @param node
-  *   The node to attach the explain information to. Typically a SparkPlan
+  *   The Spark operator or expression that is falling back to Spark.
   * @param info
-  *   Information text. Optional, may be null or empty. If not provided, then only information
-  *   from child nodes will be included.
+  *   The fallback reason. Optional, may be null or empty - pass empty only when the call is used
+  *   purely to roll up reasons from `exprs`.
   * @param exprs
-  *   Child nodes. Information attached in these nodes will be be included in the information
-  *   attached to @node
+  *   Child nodes whose own fallback reasons should be rolled up into `node`. Pass the
+  *   sub-expressions or child operators whose failure caused `node` to fall back.
   * @tparam T
-  *   The type of the TreeNode. Typically SparkPlan, AggregateExpression, or Expression
+  *   The type of the TreeNode. Typically `SparkPlan`, `AggregateExpression`, or `Expression`.
   * @return
-  *   The node with information (if any) attached
+  *   `node` with fallback reasons attached (as a side effect on its tag map).
   */
def withInfo[T <: TreeNode[_]](node: T, info: String, exprs: T*): T = {
// support existing approach of passing in multiple infos in a newline-delimited string
@@ -228,22 +235,24 @@
}

  /**
-  * Attaches explain information to a TreeNode, rolling up the corresponding information tags
-  * from any child nodes. For now, we are using this to attach the reasons why certain Spark
-  * operators or expressions are disabled.
+  * Record one or more fallback reasons on a `TreeNode` and roll up reasons from any child nodes.
+  * This is the set-valued form of [[withInfo]]; see that overload for the full contract.
   *
+  * Reasons are accumulated (never overwritten) on the node's `EXTENSION_INFO` tag and are
+  * surfaced in extended explain output. When `COMET_LOG_FALLBACK_REASONS` is enabled, each new
+  * reason is also emitted as a warning.
+  *
   * @param node
-  *   The node to attach the explain information to. Typically a SparkPlan
+  *   The Spark operator or expression that is falling back to Spark.
   * @param info
-  *   Information text. May contain zero or more strings. If not provided, then only information
-  *   from child nodes will be included.
+  *   The fallback reasons for this node. May be empty when the call is used purely to roll up
+  *   child reasons.
   * @param exprs
-  *   Child nodes. Information attached in these nodes will be be included in the information
-  *   attached to @node
+  *   Child nodes whose own fallback reasons should be rolled up into `node`.
   * @tparam T
-  *   The type of the TreeNode. Typically SparkPlan, AggregateExpression, or Expression
+  *   The type of the TreeNode. Typically `SparkPlan`, `AggregateExpression`, or `Expression`.
   * @return
-  *   The node with information (if any) attached
+  *   `node` with fallback reasons attached (as a side effect on its tag map).
   */
def withInfos[T <: TreeNode[_]](node: T, info: Set[String], exprs: T*): T = {
if (CometConf.COMET_LOG_FALLBACK_REASONS.get()) {
@@ -259,25 +268,27 @@
}

  /**
-  * Attaches explain information to a TreeNode, rolling up the corresponding information tags
-  * from any child nodes
+  * Roll up fallback reasons from `exprs` onto `node` without adding a new reason of its own. Use
+  * this when a parent operator is itself falling back and wants to preserve the reasons recorded
+  * on its child expressions/operators so they appear together in explain output.
   *
   * @param node
-  *   The node to attach the explain information to. Typically a SparkPlan
+  *   The parent operator or expression falling back to Spark.
   * @param exprs
-  *   Child nodes. Information attached in these nodes will be be included in the information
-  *   attached to @node
+  *   Child nodes whose fallback reasons should be aggregated onto `node`.
   * @tparam T
-  *   The type of the TreeNode. Typically SparkPlan, AggregateExpression, or Expression
+  *   The type of the TreeNode. Typically `SparkPlan`, `AggregateExpression`, or `Expression`.
   * @return
-  *   The node with information (if any) attached
+  *   `node` with the rolled-up reasons attached (as a side effect on its tag map).
   */
def withInfo[T <: TreeNode[_]](node: T, exprs: T*): T = {
withInfos(node, Set.empty, exprs: _*)
}

  /**
-  * Checks whether a TreeNode has any explain information attached
+  * True if any fallback reason has been recorded on `node` (via [[withInfo]] / [[withInfos]]).
+  * Callers that need to short-circuit when a prior rule pass has already decided a node falls
+  * back can use this as the sticky signal.
   */
def hasExplainInfo(node: TreeNode[_]): Boolean = {
node.getTagValue(CometExplainInfo.EXTENSION_INFO).exists(_.nonEmpty)
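The accumulate-and-roll-up contract the new doc comments describe can be sketched without Spark. The snippet below is a minimal, Spark-free model: `Node`, `FallbackInfo`, and the string-keyed tag map are stand-ins for Spark's `TreeNode`, `CometSparkSessionExtensions`, and the `EXTENSION_INFO` tag; only the behavior (reasons are accumulated, never overwritten, and rolled up from children) is illustrated.

```scala
import scala.collection.mutable

// Stand-in for Spark's TreeNode: all we need is a mutable tag map.
class Node(val name: String) {
  val tags: mutable.Map[String, Set[String]] = mutable.Map.empty
}

object FallbackInfo {
  private val EXTENSION_INFO = "EXTENSION_INFO"

  // Accumulate `info` plus the children's reasons onto `node`'s tag;
  // existing reasons are kept, mirroring the "never overwritten" contract.
  def withInfos(node: Node, info: Set[String], children: Node*): Node = {
    val childReasons = children.flatMap(_.tags.getOrElse(EXTENSION_INFO, Set.empty)).toSet
    val existing = node.tags.getOrElse(EXTENSION_INFO, Set.empty)
    node.tags(EXTENSION_INFO) = existing ++ info.filter(_.nonEmpty) ++ childReasons
    node
  }

  // True once any non-empty reason has been recorded - the "sticky" signal.
  def hasExplainInfo(node: Node): Boolean =
    node.tags.get(EXTENSION_INFO).exists(_.nonEmpty)
}
```

For example, rolling a child's `"unsupported type"` reason up onto a falling-back parent leaves both reasons visible on the parent, and a later call adds to (rather than replaces) that set.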
23 changes: 12 additions & 11 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -98,17 +98,18 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
-    plan.transformUp {
-      case s: ShuffleExchangeExec if CometShuffleExchangeExec.nativeShuffleSupported(s) =>
-        // Switch to use Decimal128 regardless of precision, since Arrow native execution
-        // doesn't support Decimal32 and Decimal64 yet.
-        conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
-        CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
-
-      case s: ShuffleExchangeExec if CometShuffleExchangeExec.columnarShuffleSupported(s) =>
-        // Columnar shuffle for regular Spark operators (not Comet) and Comet operators
-        // (if configured)
-        CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+    plan.transformUp { case s: ShuffleExchangeExec =>
+      CometShuffleExchangeExec.shuffleSupported(s) match {
+        case Some(CometNativeShuffle) =>
+          // Switch to use Decimal128 regardless of precision, since Arrow native execution
+          // doesn't support Decimal32 and Decimal64 yet.
+          conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
+          CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+        case Some(CometColumnarShuffle) =>
+          CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+        case None =>
+          s
+      }
    }
  }
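The shape of this refactor - replacing two guarded `case` patterns with one pattern plus an exhaustive `match` on an `Option[ShuffleType]` - can be sketched in isolation. Everything below is a simplified stand-in: `Shuffle`, its fields, and the support conditions are invented for illustration; only `Option`-returning dispatch with `None` meaning "leave the Spark node unchanged" reflects the pattern in the diff.

```scala
sealed trait ShuffleType
case object CometNativeShuffle extends ShuffleType
case object CometColumnarShuffle extends ShuffleType

// Hypothetical stand-in for ShuffleExchangeExec; fields are invented.
final case class Shuffle(hashPartitioned: Boolean, simpleTypesOnly: Boolean)

// One function owns the entire support decision, so callers cannot
// accidentally check native support but forget the columnar fallback.
def shuffleSupported(s: Shuffle): Option[ShuffleType] =
  if (s.hashPartitioned && s.simpleTypesOnly) Some(CometNativeShuffle)
  else if (s.simpleTypesOnly) Some(CometColumnarShuffle)
  else None

// Exhaustive match: the compiler flags any newly added ShuffleType.
def rewrite(s: Shuffle): String = shuffleSupported(s) match {
  case Some(CometNativeShuffle)   => "CometShuffleExchangeExec(native)"
  case Some(CometColumnarShuffle) => "CometShuffleExchangeExec(columnar)"
  case None                       => "ShuffleExchangeExec" // Spark node kept as-is
}
```

Centralizing the decision in one `Option`-returning method also gives the `None` branch a natural place to attach a fallback reason via `withInfo`.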
