[SPARK-55441][SQL] Types Framework - Phase 1c - Client Integration #54905
davidm-db wants to merge 19 commits into apache:master
Conversation
Resolved review threads:
sql/api/src/main/scala/org/apache/spark/sql/types/ops/ClientTypeOps.scala
sql/api/src/main/scala/org/apache/spark/sql/types/ops/TimeTypeApiOps.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/CatalystTypeOps.scala
.../src/main/scala/org/apache/spark/sql/connect/client/arrow/types/ops/TimeTypeConnectOps.scala
...nect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
...t/common/src/main/scala/org/apache/spark/sql/connect/common/LiteralValueProtoConverter.scala
sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
...onnect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala
...nnect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
```scala
        Map("typeInfo" -> s"${literalCase.name}(${literalCase.getNumber})"))
    }
    builder.build()
    ProtoTypeOps
```

there doesn't seem to be a way to nicely/cleanly reduce the diff here
cloud-fan left a comment:

Review
Prior state and problem: Adding a new data type required manually editing 13+ client-side infrastructure files with diverse dispatch patterns.
Design approach: Phase 1c extends the Types Framework with 4 new optional traits (ClientTypeOps, CatalystTypeOps, ProtoTypeOps, ConnectArrowTypeOps) that centralize client-facing operations per type. Each integration file wraps its original method body in a *Default helper and adds a framework dispatch. TimeType serves as the reference implementation.
Key design decisions:
- Feature flag gating: server-side ops check `typesFrameworkEnabled`; Connect client Arrow ops skip the flag (the client must handle whatever the server sends).
- Visibility strategy: `CatalystTypeOps` uses the type-safe `ArrowFieldWriter` (widened to `private[sql]`), while `ConnectArrowTypeOps` uses `Any` for all parameters/returns due to the `private[arrow]` visibility of the concrete types.
- Reverse lookup registration: forward dispatch goes through a single ops instance, but reverse lookups (Arrow→DataType, proto→DataType, value→proto) each need separate factory methods with per-type matching.

Implementation sketch: 13 integration points consistently use `XxxTypeOps(dt).map(_.method).getOrElse(default(...))`. Per-type ops classes: `TimeTypeApiOps` (sql/api), `TimeTypeOps` (catalyst), `TimeTypeConnectOps` (connect).
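The dispatch pattern can be illustrated with a self-contained sketch. All names here (`FormatOps`, `Integration`, `formatDefault`, the toy `DataType` hierarchy, and the inlined `typesFrameworkEnabled` flag) are hypothetical stand-ins, not the actual Spark classes:

```scala
// Minimal model of the framework dispatch pattern:
// XxxTypeOps(dt).map(_.method).getOrElse(methodDefault(...))

// Stand-in for Spark's DataType hierarchy (hypothetical).
sealed trait DataType
case object IntegerType extends DataType
case object TimeType extends DataType

// Optional per-type ops trait; only framework-managed types provide one.
trait FormatOps {
  def formatExternal(value: Any): String
}

object FormatOps {
  // Stand-in for SqlApiConf.get.typesFrameworkEnabled (assumed flag).
  val typesFrameworkEnabled: Boolean = true

  // Factory returns Some(ops) only for framework-managed types.
  def apply(dt: DataType): Option[FormatOps] = {
    if (!typesFrameworkEnabled) return None
    dt match {
      case TimeType => Some(new FormatOps {
        def formatExternal(value: Any): String = s"TIME '$value'"
      })
      case _ => None // all other types fall back to the default path
    }
  }
}

object Integration {
  // Original method body, extracted into a *Default helper as fallback.
  private def formatDefault(value: Any, dt: DataType): String = s"$value"

  // Integration point: framework dispatch first, default as fallback.
  def format(value: Any, dt: DataType): String =
    FormatOps(dt).map(_.formatExternal(value)).getOrElse(formatDefault(value, dt))
}
```

The key property is that `Integration.format` never needs editing when a new type is added; only the factory gains a match arm.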
```scala
 */
object ProtoTypeOps {

  def apply(dt: DataType): Option[ProtoTypeOps] = {
```
Adding a second framework-managed type requires 5 separate match-clause additions in this file (apply, toLiteralProtoForValue, toCatalystType, getScalaConverterForKind, getProtoDataTypeFromLiteral). Three of those dispatch by proto enum cases and can be consolidated.
Concrete suggestion — extract a shared opsForKindCase lookup and add a toCatalystTypeFromProto method to the trait:
```scala
// Add to ProtoTypeOps trait:
def toCatalystTypeFromProto(t: proto.DataType): DataType

// In companion object — single KindCase → ops mapping:
private def opsForKindCase(
    kindCase: proto.DataType.KindCase): Option[ProtoTypeOps] = kindCase match {
  case proto.DataType.KindCase.TIME => Some(new TimeTypeConnectOps(TimeType()))
  // Add new framework types here — single registration for all KindCase lookups
  case _ => None
}

// Then toCatalystType, getScalaConverterForKind, getProtoDataTypeFromLiteral all delegate:
def toCatalystType(t: proto.DataType): Option[DataType] = {
  if (!SqlApiConf.get.typesFrameworkEnabled) return None
  opsForKindCase(t.getKindCase).map(_.toCatalystTypeFromProto(t))
}

def getScalaConverterForKind(
    kindCase: proto.DataType.KindCase): Option[proto.Expression.Literal => Any] = {
  if (!SqlApiConf.get.typesFrameworkEnabled) return None
  opsForKindCase(kindCase).map(_.getScalaConverter)
}

def getProtoDataTypeFromLiteral(literal: proto.Expression.Literal): Option[proto.DataType] = {
  if (!SqlApiConf.get.typesFrameworkEnabled) return None
  // LiteralTypeCase maps 1:1 to KindCase for framework types
  opsForKindCase(literalCaseToKindCase(literal.getLiteralTypeCase))
    .map(_.getProtoDataTypeFromLiteral(literal))
}
```

This reduces per-type registration in this file from 5 match points to 3 (apply, opsForKindCase, toLiteralProtoForValue).
Applied your suggestion. Extracted a shared `opsForKindCase` lookup and added `toCatalystTypeFromProto` to the trait. `toCatalystType`, `getScalaConverterForKind`, and `getProtoDataTypeFromLiteral` all delegate through the single `opsForKindCase` registration point now. Per-type registration in this file is reduced from 5 match blocks to 3 (`apply`, `opsForKindCase`, `toLiteralProtoForValue`). Added a `literalCaseToKindCase` helper for the LiteralTypeCase-to-KindCase mapping.
```scala
// the arrow package cast to the expected types.

/** Creates an Arrow serializer for writing values to a vector. Returns a Serializer. */
def createArrowSerializer(vector: Any): Any
```
All three methods use `Any` for both parameters and return types — type mismatches between the ops implementation and the call site (e.g., casting to the wrong vector type) are only caught at runtime as `ClassCastException`. `CatalystTypeOps` avoided this by widening `ArrowFieldWriter`/`TimeWriter` to `private[sql]`. Could the Connect Arrow types (`Serializer`, `Deserializer`, `ArrowVectorReader`) similarly be widened to `private[connect]`?
Widened `FieldSerializer` to `private[connect]`, and `ArrowVectorReader` and `TimeVectorReader` to `private[connect]`. All three return types are now proper: `ArrowSerializer.Serializer`, `ArrowDeserializers.Deserializer[Any]`, `ArrowVectorReader`. Call sites no longer need `.asInstanceOf` casts on the return value.

For parameters, 1 of 3 is also proper (`createArrowVectorReader` takes `FieldVector`). The other 2 (`createArrowSerializer`, `createArrowDeserializer`) take `AnyRef` because the existing caller APIs (`serializerFor(encoder, v: AnyRef)`, `deserializerFor(encoder, data: AnyRef)`) use `AnyRef` — these signatures predate our work and use pattern matching to determine the concrete vector type. Using `AnyRef` avoids an unnecessary intermediate cast.

Net result: 6 `Any` positions reduced to 2 `AnyRef` parameters. The remaining 2 are constrained by the existing caller APIs, not by visibility.
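The tradeoff this thread discusses, `Any`-typed ops versus widened-visibility typed signatures, can be sketched with hypothetical stand-in classes (shown here for the parameter position; none of these are the actual Connect types):

```scala
// Hypothetical stand-ins for Arrow vector classes (not real Connect types).
final class IntVector { def put(i: Int): Unit = () }
final class TimeVector { def put(nanos: Long): Unit = () }

trait UntypedOps {
  // Any-typed: the implementation must cast internally, so passing the
  // wrong vector type only fails at runtime with a ClassCastException.
  def createSerializer(vector: Any): Long => Unit
}

trait TypedOps {
  // Widened visibility lets the concrete type appear in the signature,
  // so passing an IntVector here is a compile-time error instead.
  def createSerializer(vector: TimeVector): Long => Unit
}

object TimeUntypedOps extends UntypedOps {
  def createSerializer(vector: Any): Long => Unit = {
    val tv = vector.asInstanceOf[TimeVector] // throws if caller passed the wrong vector
    nanos => tv.put(nanos)
  }
}

object TimeTypedOps extends TypedOps {
  def createSerializer(vector: TimeVector): Long => Unit =
    nanos => vector.put(nanos)
}
```

With the untyped variant, `TimeUntypedOps.createSerializer(new IntVector)` compiles but throws at runtime; with the typed variant, the equivalent call does not compile at all.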
```scala
    binaryFormatter: BinaryFormatter): String = a match {
  case (null, _) => if (nested) "null" else "NULL"
  case (value, dt) =>
    ClientTypeOps(dt).map(_.formatExternal(value, nested)).getOrElse {
```
Two concerns with this dispatch:

1. Bypasses session formatters. `formatExternal` uses the ops instance's own `FractionTimeFormatter`, ignoring the `formatters: TimeFormatters` parameter. Currently both use the same `new FractionTimeFormatter()` so output matches, but if session-level time formatting becomes configurable, the framework path would silently ignore it.
2. Per-value allocation. `ClientTypeOps(dt)` → `TypeApiOps(dt)` creates a new `TimeTypeApiOps` (including a new `@transient lazy val timeFormatter`) on every call. For a result set with N TimeType values, that's N instances + N formatters, versus the original code, which reuses a single `formatters.time` for all values.

Consider passing the session formatter through the ops interface, or caching the ops instance per DataType.
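The second suggestion (caching the ops instance per DataType) could look roughly like this. `CachedOps`, `DType`, and the case objects are hypothetical stand-ins, and the sketch assumes the cache keys have value-based equality:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the DataType hierarchy.
sealed trait DType
case object TimeDType extends DType
case object IntDType extends DType

trait CachedOps {
  // Per-instance state that is expensive to build, e.g. a formatter.
  lazy val formatter: String => String = identity
}

object CachedOps {
  // One cached Option[ops] per DType, shared across all callers.
  private val cache = new ConcurrentHashMap[DType, Option[CachedOps]]()

  private def create(dt: DType): Option[CachedOps] = dt match {
    case TimeDType => Some(new CachedOps {})
    case _ => None // non-framework types cache a None
  }

  // computeIfAbsent guarantees at most one ops instance per key.
  def apply(dt: DType): Option[CachedOps] =
    cache.computeIfAbsent(dt, create(_))
}
```

With this shape, formatting N values of the same type allocates one ops instance (and one formatter) instead of N; parameterized types would key the cache on the full type instance.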
1. TimeType currently has two parallel implementations — the original hardcoded branches and the framework ops. The framework implementation is how a new type would be built from scratch (formatter owned by the ops class, no dependency on external formatter pipelines). This should be the correct design for new types going forward.
   - The concern about bypassing session formatters doesn't apply here — `FractionTimeFormatter` is fully stateless (no session configuration, no timezone, no legacy parser policy). Unlike `TimestampFormatter` (which takes `sessionLocalTimeZone`) or `DateFormatter` (which checks `legacyTimeParserPolicy`), the time formatter has zero session state. Both paths — the original `formatters.time.format(lt)` and the framework's `formatExternal` — create the same stateless `FractionTimeFormatter` and produce identical output.
   - For future types that DO need session-configurable formatting, the ops can read session state inside `formatExternal` at call time via `SqlApiConf.get`, matching how `HiveResult.getTimeFormatters` reads `SQLConf.get.sessionLocalTimeZone`. The framework design handles this without API changes.
2. Re allocation: tracked for Phase 1d — singleton caching of ops instances uniformly across all factory companions. This was already noted during work on other phases and I'll fix it all together in the upcoming phase.
```diff
-def getCommonJDBCType(dt: DataType): Option[JdbcType] = {
+// Uses .orElse (not .getOrElse) because this method returns Option[JdbcType].
+def getCommonJDBCType(dt: DataType): Option[JdbcType] =
+  ClientTypeOps(dt).map(ops => JdbcType(ops.jdbcTypeName, ops.getJdbcType))
```
When the framework is enabled, `getCommonJDBCType(TimeType)` now returns `Some(JdbcType("TIME", java.sql.Types.TIME))`. Before this PR, the default path had no TimeType case and returned `None`. This is a new capability (not a regression), but the PR description's "Does this PR introduce any user-facing change?" says No. Since the flag defaults to true in tests, this changes test-environment behavior.
Removed JDBC entirely from this PR. `getCommonJDBCType` is reverted to the original code (no framework dispatch), and the JDBC methods are removed from `ClientTypeOps`. The framework should be a transparent refactoring with no behavior change. JDBC requires types from `sql/core` that `ClientTypeOps` (in `sql/api`) can't reference, and partial support (DDL without getter/setter) is worse than no support. A separate `JdbcTypeOps` trait in `sql/core` is planned for when full JDBC support is ready.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/ops/CatalystTypeOps.scala
Surface Areas Detected

Findings:
1. Arrow NANOSECOND unit maps to MICROS_PRECISION — semantic mismatch
2. Per-row object allocation in HiveResult hot path
3. toLiteralProto hardcodes DEFAULT_PRECISION instead of using instance precision
4. ConnectArrowTypeOps uses Any throughout — type safety completely lost
5. Null safety gap in HiveResult framework dispatch
6. Redundant TimeType branches in *Default methods — no verification of equivalence
7. SerializerBuildHelper / DeserializerBuildHelper dispatch by enc.dataType ignoring encoder type
8. Undocumented JDBC behavior change for TimeType
9. Repeated new TimeTypeConnectOps(TimeType()) allocations across 5+ sites
10. No new tests despite 966 lines of new code
11. ClientTypeOps.apply does not check feature flag — inconsistent gating
12. createFieldWriterDefault unnecessarily private[sql]
13. ProtoTypeOps companion has 5 separate registration points per type

Summary: 13 total findings.
@dejankrak-db All findings should be either addressed or replied to now.

- Finding 1 (Arrow precision mismatch): Pre-existing behavior — the original
- Finding 2 (Per-row allocation in HiveResult): Valid concern. Singleton caching of ops instances across all factory companions is tracked for Phase 1d (Validation & Performance).
- Finding 3 (toLiteralProto DEFAULT_PRECISION): Not a bug. The "generic" literal path receives a
- Finding 4 (ConnectArrowTypeOps Any types): Already fixed in a previous iteration — widened
- Finding 5 (Null safety in HiveResult): The null case is the first match arm and always runs before framework dispatch. This ordering is structural (same as all other type cases in the match) — not something that can be accidentally refactored away without changing the entire method signature.
- Finding 6 (Redundant TimeType in defaults): Intentional — project guideline: "Don't remove existing TimeType code. Its explicit case branches remain as safety net and fallback when the feature flag is off." Equivalence testing is tracked for Phase 1d/Phase 2.
- Finding 7 (Serializer dispatch order vs. encoder type):
- Finding 8 (JDBC behavior change): Removed JDBC from this PR entirely —
- Finding 9 (Repeated allocations): Tracked for Phase 1d — singleton caching across all factory companions uniformly.
- Finding 10 (No new tests): Tracked for Phase 2 (Testing Framework). The feature flag is enabled by default in tests, so the entire existing test suite validates the framework path.
- Finding 11 (ClientTypeOps.apply flag check): By design —
- Finding 12 (createFieldWriterDefault visibility): Fixed — changed from
- Finding 13 (ProtoTypeOps 5 registration points): Already fixed while addressing one of Wenchen's comments — consolidated to 3 via a shared
if the final design is to have one interface per module, then shall we merge
@cloud-fan Merged the two ops classes into a single one. All other refactors we talked about, like optional functionalities, will be investigated and done as a follow-up, since they don't really fit the topic of this pull request.
```scala
 * TimeTypeApiOps for a reference implementation
 * @since 4.2.0
 */
trait ClientTypeOps { self: TypeApiOps =>
```
`TypeApiOps` and `ClientTypeOps` are both for the client side? Shall we merge them into one? I'm OK to leave additional work for follow-up PRs, but not to commit a draft version and rewrite it in a follow-up.
That's true. It's also true for `CatalystTypeOps` and `TypeOps`, which are both on the catalyst side. However, in both cases, one is optional and the other is mandatory. I thought of merging it like this to enable functionality, and following up with a refactor where I would introduce the concept of optional functions (returning Options) in the ops classes, since that is framework design rather than enabling functionality.

If you think we should do it here, I can; I was just explaining my reasoning.
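The split discussed in this thread (a mandatory base ops trait plus an optional client-facing mix-in with a self-type) can be sketched in isolation. Everything below except the two trait shapes is a hypothetical stand-in, not the actual Spark code:

```scala
// Mandatory base ops: every framework-managed type implements this.
trait TypeApiOps {
  def typeName: String
}

// Optional client-facing mix-in. The self-type guarantees that any
// ClientTypeOps instance is also a TypeApiOps, so it can reuse base members.
trait ClientTypeOps { self: TypeApiOps =>
  def formatExternal(value: Any): String = s"$typeName($value)"
}

// Reference implementation mixing both in (hypothetical type).
class TimeTypeApiOps extends TypeApiOps with ClientTypeOps {
  def typeName: String = "TIME"
}

object ClientTypeOps {
  // Returns Some only when the type's ops also support client operations.
  def apply(ops: TypeApiOps): Option[ClientTypeOps] = ops match {
    case c: ClientTypeOps => Some(c)
    case _ => None
  }
}
```

The self-type keeps the optional trait from being instantiated standalone, while the `Option`-returning factory is what lets "mandatory" and "optional" coexist without merging the traits.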
What changes were proposed in this pull request?
Extends the Types Framework (Phase 1a: #54223) with optional traits for client-facing infrastructure dispatch. Adds 4 new traits and integrates 12 files so that new framework-managed types get client support automatically — zero edits to infrastructure files.
New traits:

- `CatalystTypeOps` (catalyst) — serializer/deserializer expression building, Arrow field writers
- `ClientTypeOps` (sql/api) — Arrow conversion, Python interop, Hive formatting, Thrift type ID
- `ProtoTypeOps` (connect/common) — Spark Connect proto DataType and Literal conversions
- `ConnectArrowTypeOps` (connect/common) — Connect Arrow serde (no feature flag — the client must handle whatever the server sends)

TimeType serves as the reference implementation. One new per-type class (`TimeTypeConnectOps`) handles both Connect traits; the existing `TimeTypeApiOps` and `TimeTypeOps` gain mix-ins for `ClientTypeOps` and `CatalystTypeOps` respectively.

Integration pattern: Every integration point uses `XxxTypeOps(dt).map(_.method).getOrElse(methodDefault(...))` — framework dispatch first, original code extracted to a private `*Default` helper as fallback. This minimizes the diff on existing code (match bodies stay at their original indentation in the helper).

Trait method design: Methods that may need extra context for future complex types use overloading — a simple abstract version (most types override this) and a concrete extended version with additional parameters that delegates to the simple one by default. For example, `formatExternal(value)` for most types, with `formatExternal(value, nested)` available for types that need nesting-aware formatting.

Visibility changes: `ArrowFieldWriter` and `TimeWriter` widened from `private[arrow]` to `private[sql]` so `CatalystTypeOps` can declare/instantiate them. `ArrowSerializer.FieldSerializer`, `ArrowVectorReader`, and `TimeVectorReader` widened from `private[arrow]` to `private[connect]` so `ConnectArrowTypeOps` can use proper return types instead of `Any`.

Why are the changes needed?
Without this, adding a new data type requires manually editing 13+ client integration files with diverse patterns. With these traits, a new type implements the relevant ops methods and all client infrastructure dispatches through the framework automatically.
JDBC Data Source is intentionally excluded from this PR. JDBC type mapping (`JdbcUtils`) requires types from `sql/core` that `ClientTypeOps` (in `sql/api`) can't reference, and partial support (DDL without read/write) would be worse than no support. A separate `JdbcTypeOps` trait in `sql/core` is planned to cover all four JDBC methods (`getCommonJDBCType`, `getCatalystType`, `makeGetter`, `makeSetter`) as a complete unit.

Does this PR introduce any user-facing change?
No. The framework is gated behind `spark.sql.types.framework.enabled` (default `false` in production, `true` in tests). Existing behavior is unchanged — all original TimeType branches remain as fallback in the `*Default` methods.

How was this patch tested?
The feature flag is enabled by default in test environments (`Utils.isTesting`), so the entire existing test suite validates the framework code path. No new tests are added in this PR because the framework delegates to the same underlying logic that existing tests already cover.

In subsequent phases, the testing focus will be on:
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6 (claude-opus-4-6)