Skip to content

Commit 1809824

Browse files
andygrove and claude
committed
feat: Set ffi_safe flag conditionally for native_iceberg_compat scans
For native_iceberg_compat scans that have no partition columns and no missing columns, set arrow_ffi_safe=true on the Scan protobuf. In this case all Arrow arrays come from parquet file data with non-reused buffers, so a cheap clone suffices instead of a deep copy on the native side. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent aa5afd6 commit 1809824

1 file changed

Lines changed: 24 additions & 0 deletions

File tree

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1904,6 +1904,30 @@ case class CometSortMergeJoinExec(
19041904
}
19051905

19061906
object CometScanWrapper extends CometSink[SparkPlan] {

  /**
   * Converts the wrapped Spark plan into a native operator, then conditionally marks
   * `native_iceberg_compat` scans as Arrow-FFI-safe.
   *
   * A scan is FFI-safe when it has no partition columns and no columns missing from the
   * Parquet data schema: in that case every Arrow array handed across the FFI boundary
   * comes directly from Parquet file data with non-reused buffers, so the native side can
   * take a cheap clone instead of a deep copy.
   *
   * @param op the Spark plan node being converted
   * @param builder protobuf operator builder carrying already-populated fields
   * @param childOp native operators for the children of `op`
   * @return the converted native operator, or `None` if the parent conversion failed
   */
  override def convert(
      op: SparkPlan,
      builder: Operator.Builder,
      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
    val result = super.convert(op, builder, childOp: _*)
    result.map { operator =>
      op match {
        case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
          // Hoisted to a Set so the `exists` below is O(required) instead of
          // O(required * data) repeated linear scans over the array of field names.
          val dataFieldNames = scan.relation.dataSchema.fieldNames.toSet
          val hasPartitionColumns = scan.relation.partitionSchema.nonEmpty
          // A "missing" column is one the query requires but the Parquet data schema
          // does not provide; such columns are synthesized and may reuse buffers.
          val hasMissingColumns =
            scan.requiredSchema.fields.exists(field => !dataFieldNames.contains(field.name))
          val ffiSafe = !hasPartitionColumns && !hasMissingColumns
          if (ffiSafe) {
            // Protobuf messages are immutable: rebuild the Scan with the flag set,
            // then rebuild the enclosing operator around the new Scan.
            val scanProto = operator.getScan.toBuilder.setArrowFfiSafe(true).build()
            operator.toBuilder.setScan(scanProto).build()
          } else {
            operator
          }
        // Non-scan operators (and other scan implementations) pass through unchanged.
        case _ => operator
      }
    }
  }
19071931
  // Factory hook: wraps the converted native operator and its originating Spark plan
  // in a CometScanWrapper exec node.
  override def createExec(nativeOp: Operator, op: SparkPlan): CometNativeExec = {
    CometScanWrapper(nativeOp, op)
  }

0 commit comments

Comments
 (0)