Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.iceberg.IcebergReflectionSuite
- name: "csv"
value: |
org.apache.comet.csv.CometCsvNativeReadSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.iceberg.IcebergReflectionSuite
- name: "csv"
value: |
org.apache.comet.csv.CometCsvNativeReadSuite
Expand Down
6 changes: 6 additions & 0 deletions dev/diffs/iceberg/1.10.0.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,12 @@ index 16fa726032..64e367cf47 100644
.getOrCreate();

catalog =
@@ -202,3 +222,4 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
protected boolean countDeletes() {
- return true;
+ // TODO: Enable once iceberg-rust exposes delete count metrics to Comet
+ return false;
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
index baf7fa8f88..665946ad82 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
Expand Down
6 changes: 6 additions & 0 deletions dev/diffs/iceberg/1.8.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,12 @@ index dda49b4946..529992de6b 100644
.getOrCreate();

catalog =
@@ -201,3 +221,4 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
protected boolean countDeletes() {
- return true;
+ // TODO: Enable once iceberg-rust exposes delete count metrics to Comet
+ return false;
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
index e5831b76e4..5c45a111d9 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
Expand Down
6 changes: 6 additions & 0 deletions dev/diffs/iceberg/1.9.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,12 @@ index dda49b4946..529992de6b 100644
.getOrCreate();

catalog =
@@ -201,3 +221,4 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
protected boolean countDeletes() {
- return true;
+ // TODO: Enable once iceberg-rust exposes delete count metrics to Comet
+ return false;
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
index e5831b76e4..5c45a111d9 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,18 @@ object IcebergReflection extends Logging {
val opsMethod = table.getClass.getDeclaredMethod("operations")
opsMethod.setAccessible(true)
val ops = opsMethod.invoke(table)
val currentMethod = ops.getClass.getDeclaredMethod("current")
currentMethod.setAccessible(true)
val metadata = currentMethod.invoke(ops)
val formatVersionMethod = metadata.getClass.getMethod("formatVersion")
Some(formatVersionMethod.invoke(metadata).asInstanceOf[Int])
findMethodInHierarchy(ops.getClass, "current")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is current the only method affected? operations (a few lines above this) is calling getDeclaredMethod also.
Also, are there other catalog implementations that could have similar issues but for other methods?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could always call replace all occurences of getDeclaredMethod with findMethodInHierarchy
Especially, those that assume the superclass's depth seems fragile

      val tasksMethod = scan.getClass.getSuperclass
        .getDeclaredMethod("tasks")
      val filterExpressionsMethod = scan.getClass.getSuperclass.getSuperclass
        .getDeclaredMethod("filterExpressions")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mbutrovich wdyt? Is there a potential performance impact here if we replace getDeclaredMethod with findMethodInHierarchy?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I tried that at some point but it's been at least 6 months, open to trying again.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking this over, I feel we can defer changing everything over to use findMethodInHierarchy until we hit an issue.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sgtm

.flatMap { currentMethod =>
val metadata = currentMethod.invoke(ops)
val formatVersionMethod = metadata.getClass.getMethod("formatVersion")
Some(formatVersionMethod.invoke(metadata).asInstanceOf[Int])
}
.orElse {
logError(
"Iceberg reflection failure: Failed to get format version: " +
"current() method not found in operations class hierarchy")
None
}
} catch {
case e: Exception =>
logError(s"Iceberg reflection failure: Failed to get format version: ${e.getMessage}")
Expand Down Expand Up @@ -327,9 +334,12 @@ object IcebergReflection extends Logging {
operationsMethod.setAccessible(true)
val operations = operationsMethod.invoke(table)

val currentMethod = operations.getClass.getDeclaredMethod("current")
currentMethod.setAccessible(true)
Some(currentMethod.invoke(operations))
findMethodInHierarchy(operations.getClass, "current").map(_.invoke(operations)).orElse {
logError(
"Iceberg reflection failure: Failed to get table metadata: " +
"current() method not found in operations class hierarchy")
None
}
} catch {
case e: Exception =>
logError(s"Iceberg reflection failure: Failed to get table metadata: ${e.getMessage}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,11 @@ case class CometScanRule(session: SparkSession)
// Check if table uses a FileIO implementation compatible with iceberg-rust

val fileIOCompatible = IcebergReflection.getFileIO(metadata.table) match {
case Some(fileIO)
if fileIO.getClass.getName == "org.apache.iceberg.inmemory.InMemoryFileIO" =>
fallbackReasons += "InMemoryFileIO is not supported by Comet's native reader"
false
case Some(_) =>
// InMemoryFileIO is now supported with table location fallback for REST catalogs
true
case None =>
fallbackReasons += "Could not check FileIO compatibility"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.iceberg

import java.util.Collections

import org.scalatest.funsuite.AnyFunSuite

import org.apache.iceberg.BaseMetastoreTableOperations
import org.apache.iceberg.BaseTable
import org.apache.iceberg.Schema
import org.apache.iceberg.TableMetadata
import org.apache.iceberg.io.FileIO
import org.apache.iceberg.types.Types

class IcebergReflectionSuite extends AnyFunSuite {

/** Mimics HiveTableOperations/GlueTableOperations which inherit current(). */
class StubTableOperations extends BaseMetastoreTableOperations {
override protected def tableName(): String = "test"
override def refresh(): TableMetadata = null
override def io(): FileIO = null
}

test("getTableMetadata succeeds when operations class inherits current()") {
val ops = new StubTableOperations()
val schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))
val expectedMetadata = TableMetadata.newTableMetadata(
schema,
org.apache.iceberg.PartitionSpec.unpartitioned(),
"file:///tmp/test-table",
Collections.emptyMap[String, String]())
val metadataField = classOf[BaseMetastoreTableOperations]
.getDeclaredField("currentMetadata")
metadataField.setAccessible(true)
metadataField.set(ops, expectedMetadata)
// current() checks shouldRefresh (default true) and calls refresh() instead of
// returning currentMetadata. Set to false so current() returns our stubbed metadata.
val refreshField = classOf[BaseMetastoreTableOperations]
.getDeclaredField("shouldRefresh")
refreshField.setAccessible(true)
refreshField.set(ops, false)

val table = new BaseTable(ops, "test-table")
val metadata = IcebergReflection.getTableMetadata(table)
assert(metadata.isDefined)
assert(metadata.get.isInstanceOf[TableMetadata])
}
}
Loading