Commit cbcc7f8
[SPARK-55304][SS][PYTHON] Introduce support of Admission Control and Trigger.AvailableNow in Python data source - streaming reader
### What changes were proposed in this pull request?

This PR proposes to introduce support for Admission Control and Trigger.AvailableNow in the Python data source streaming reader.

To support Admission Control, we propose to change the `DataSourceStreamReader` interface as follows (side-by-side comparison):

| **Before** | **After** |
| :--- | :--- |
| `class DataSourceStreamReader(ABC):` | `class DataSourceStreamReader(ABC):` |
| `def initialOffset(self) -> dict` | `def initialOffset(self) -> dict` |
| `def latestOffset(self) -> dict` | `def latestOffset(self, start: dict, limit: ReadLimit) -> dict` |
| | `def getDefaultReadLimit(self) -> ReadLimit` (optional to implement; default = `ReadAllAvailable()`) |
| | `def reportLatestOffset(self) -> Optional[dict]` (optional to implement; default = `None`) |
| `def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]` | `def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]` |
| `@abstractmethod def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]` | `@abstractmethod def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]` |
| `def commit(self, end: dict) -> None` | `def commit(self, end: dict) -> None` |
| `def stop(self) -> None` | `def stop(self) -> None` |

The main changes are:

* The method signature for `latestOffset` has changed. The method remains mandatory.
* The method `getDefaultReadLimit` is added as optional.
* The method `reportLatestOffset` is added as optional.

This way, new implementations support Admission Control by default. The engine handles the old method signature via Python's built-in `inspect` module (similar to Java's reflection): if `latestOffset` is implemented without parameters, the engine falls back to treating the source as one that does not enable admission control. For all new sources, implementing `latestOffset` with parameters is strongly recommended.

The `ReadLimit` interface and its built-in implementations are available for source implementations to leverage: `ReadAllAvailable`, `ReadMinRows`, `ReadMaxRows`, `ReadMaxFiles`, `ReadMaxBytes`. We won't support custom implementations of the `ReadLimit` interface at this point, since that would require major effort and we don't see demand, but we can plan for it if strong demand arises.

We do not make any change to `SimpleDataSourceStreamReader` for Admission Control, since it is designed for small data fetches and can be considered as already limiting the data. We could still add `ReadLimit` support later if we see strong demand for limiting the fetch size via source options.

To support `Trigger.AvailableNow`, we propose to introduce a new interface:

```python
class SupportsTriggerAvailableNow(ABC):
    @abstractmethod
    def prepareForTriggerAvailableNow(self) -> None: ...
```

This interface can be mixed in with both `DataSourceStreamReader` and `SimpleDataSourceStreamReader`. It won't work with `DataSourceStreamReader` implementations that keep the old `latestOffset()` method signature, as mentioned above.

### Why are the changes needed?

This catches up with features already supported in the Scala DSv2 API; developers have reported that the missing features block them from implementing some data sources.

### Does this PR introduce _any_ user-facing change?

Yes. Users implementing a streaming reader via the Python data source API will be able to add support for Admission Control and Trigger.AvailableNow, which had been major missing features.

### How was this patch tested?

New UTs.

### Was this patch authored or co-authored using generative AI tooling?

Co-authored using claude-4.5-sonnet

Closes #54085 from HeartSaVioR/SPARK-55304.
Lead-authored-by: Jungtaek Lim <[email protected]> Co-authored-by: Jitesh Soni <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
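The backward-compatibility fallback described in the PR text — detecting whether `latestOffset` was implemented with the old zero-argument signature via the `inspect` module — can be sketched as follows. This is a minimal illustration, not Spark's actual engine code; the two reader classes are hypothetical stand-ins.

```python
import inspect


class OldReader:
    def latestOffset(self):  # legacy signature: no admission control
        return {"index": 10}


class NewReader:
    def latestOffset(self, start, limit):  # new signature: admission control
        return {"index": start["index"] + 5}


def supports_admission_control(reader) -> bool:
    # On a bound method, `self` is already excluded from the signature,
    # so the new signature declares exactly (start, limit).
    params = inspect.signature(reader.latestOffset).parameters
    return len(params) >= 2


print(supports_admission_control(OldReader()))  # False
print(supports_admission_control(NewReader()))  # True
```

If the check fails, the engine can keep calling the zero-argument form, which is how existing third-party sources continue to work unmodified.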
1 parent b75a329 commit cbcc7f8

File tree

10 files changed

+1276
-102
lines changed


python/pyspark/sql/datasource.py

Lines changed: 75 additions & 4 deletions

```diff
@@ -32,6 +32,7 @@
 )

 from pyspark.sql import Row
+from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadLimit
 from pyspark.sql.types import StructType
 from pyspark.errors import PySparkNotImplementedError

@@ -714,9 +715,35 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )

-    def latestOffset(self) -> dict:
+    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
         """
-        Returns the most recent offset available.
+        Returns the most recent offset available given a read limit. The start offset can be used
+        to figure out how much new data should be read given the limit.
+
+        The `start` will be provided from the return value of :meth:`initialOffset()` for
+        the very first micro-batch, and for subsequent micro-batches, the start offset is the
+        ending offset from the previous micro-batch. The source can return the `start` parameter
+        as it is, if there is no data to process.
+
+        :class:`ReadLimit` can be used by the source to limit the amount of data returned in this
+        call. The implementation should implement :meth:`getDefaultReadLimit()` to provide the
+        proper :class:`ReadLimit` if the source can limit the amount of data returned based on the
+        source options.
+
+        The engine can still call :meth:`latestOffset()` with :class:`ReadAllAvailable` even if the
+        source produces a different read limit from :meth:`getDefaultReadLimit()`, to respect the
+        semantic of the trigger. The source must always respect the read limit provided by the
+        engine; e.g. if the read limit is :class:`ReadAllAvailable`, the source must ignore the
+        read limit configured through options.
+
+        .. versionadded:: 4.2.0
+
+        Parameters
+        ----------
+        start : dict
+            The start offset of the microbatch to continue reading from.
+        limit : :class:`ReadLimit`
+            The limit on the amount of data to be returned by this call.

         Returns
         -------
@@ -726,14 +753,58 @@ def latestOffset(self) -> dict:

         Examples
         --------
-        >>> def latestOffset(self):
-        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+        >>> from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadMaxRows
+        >>> def latestOffset(self, start, limit):
+        ...     # Assume the source has 10 new records between start and latest offset
+        ...     if isinstance(limit, ReadAllAvailable):
+        ...         return {"index": start["index"] + 10}
+        ...     else:  # e.g., limit is ReadMaxRows(5)
+        ...         return {"index": start["index"] + min(10, limit.max_rows)}
         """
+        # NOTE: Previous Spark versions didn't have start offset and read limit parameters for
+        # this method. While Spark will ensure the backward compatibility for existing data
+        # sources, new data sources are strongly encouraged to implement this new method
+        # signature.
         raise PySparkNotImplementedError(
             errorClass="NOT_IMPLEMENTED",
             messageParameters={"feature": "latestOffset"},
         )

+    def getDefaultReadLimit(self) -> ReadLimit:
+        """
+        Returns the read limits potentially passed to the data source through options when
+        creating the data source. See the built-in implementations of :class:`ReadLimit` for
+        available read limits.
+
+        Implementing this method is optional. By default, it returns :class:`ReadAllAvailable`,
+        which means there is no limit on the amount of data returned by :meth:`latestOffset()`.
+
+        .. versionadded:: 4.2.0
+        """
+        return ReadAllAvailable()
+
+    def reportLatestOffset(self) -> Optional[dict]:
+        """
+        Returns the most recent offset available. The information is used to report the latest
+        offset in the streaming query status.
+        The source can return `None` if there is no data to process or the source does not
+        support this method.
+
+        .. versionadded:: 4.2.0
+
+        Returns
+        -------
+        dict or None
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.
+            Returns `None` if the source does not support reporting latest offset.
+
+        Examples
+        --------
+        >>> def reportLatestOffset(self):
+        ...     return {"partition-1": {"index": 100}, "partition-2": {"index": 200}}
+        """
+        return None
+
     def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
         """
         Returns a list of InputPartition given the start and end offsets. Each InputPartition
```
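The contract documented in the new `latestOffset(start, limit)` docstring — return `start` unchanged when there is no data, and cap the advance when the limit is a max-rows limit — can be sketched with a self-contained toy reader. The `ReadLimit` classes here are local stand-ins mirroring the shapes in this commit, and `CounterStreamReader` is a hypothetical source, not part of the PR.

```python
from dataclasses import dataclass


# Local stand-ins for pyspark.sql.streaming.datasource (shapes taken from the diff).
class ReadLimit: ...


@dataclass
class ReadAllAvailable(ReadLimit): ...


@dataclass
class ReadMaxRows(ReadLimit):
    max_rows: int


class CounterStreamReader:
    """Hypothetical reader over a monotonically growing counter source."""

    def __init__(self, available: int):
        self.available = available  # highest index currently available

    def initialOffset(self) -> dict:
        return {"index": 0}

    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
        pending = self.available - start["index"]
        if pending <= 0:
            return start  # no new data: return `start` as-is, per the contract
        if isinstance(limit, ReadMaxRows):
            pending = min(pending, limit.max_rows)  # honor the engine's limit
        return {"index": start["index"] + pending}


reader = CounterStreamReader(available=10)
print(reader.latestOffset({"index": 0}, ReadAllAvailable()))  # {'index': 10}
print(reader.latestOffset({"index": 0}, ReadMaxRows(3)))      # {'index': 3}
```

Note that when the engine passes `ReadAllAvailable` (e.g. for a trigger that must drain the source), the reader ignores any limit configured through its own options.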

python/pyspark/sql/datasource_internal.py

Lines changed: 56 additions & 28 deletions

```diff
@@ -19,16 +19,25 @@
 import json
 import copy
 from itertools import chain
-from typing import Iterator, List, Optional, Sequence, Tuple
+from typing import Iterator, List, Sequence, Tuple, Type, Dict

 from pyspark.sql.datasource import (
     DataSource,
     DataSourceStreamReader,
     InputPartition,
     SimpleDataSourceStreamReader,
 )
+from pyspark.sql.streaming.datasource import (
+    ReadAllAvailable,
+    ReadLimit,
+    ReadMaxBytes,
+    ReadMaxRows,
+    ReadMinRows,
+    ReadMaxFiles,
+)
 from pyspark.sql.types import StructType
 from pyspark.errors import PySparkNotImplementedError
+from pyspark.errors.exceptions.base import PySparkException


 def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
@@ -62,13 +71,9 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader):
     so that :class:`SimpleDataSourceStreamReader` can integrate with streaming engine like an
     ordinary :class:`DataSourceStreamReader`.

-    current_offset tracks the latest progress of the record prefetching, it is initialized to be
-    initialOffset() when query start for the first time or initialized to be the end offset of
-    the last planned batch when query restarts.
-
     When streaming engine calls latestOffset(), the wrapper calls read() that starts from
-    current_offset, prefetches and cache the data, then updates the current_offset to be
-    the end offset of the new data.
+    the start provided via the parameter of latestOffset(), prefetches and caches the data,
+    then returns the end offset of the new data.

     When streaming engine call planInputPartitions(start, end), the wrapper get the prefetched data
     from cache and send it to JVM along with the input partitions.
@@ -79,28 +84,26 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader):

     def __init__(self, simple_reader: SimpleDataSourceStreamReader):
         self.simple_reader = simple_reader
-        self.initial_offset: Optional[dict] = None
-        self.current_offset: Optional[dict] = None
         self.cache: List[PrefetchedCacheEntry] = []

     def initialOffset(self) -> dict:
-        if self.initial_offset is None:
-            self.initial_offset = self.simple_reader.initialOffset()
-        return self.initial_offset
-
-    def latestOffset(self) -> dict:
-        # when query start for the first time, use initial offset as the start offset.
-        if self.current_offset is None:
-            self.current_offset = self.initialOffset()
-        (iter, end) = self.simple_reader.read(self.current_offset)
-        self.cache.append(PrefetchedCacheEntry(self.current_offset, end, iter))
-        self.current_offset = end
+        return self.simple_reader.initialOffset()
+
+    def getDefaultReadLimit(self) -> ReadLimit:
+        # We do not consider providing different read limit on simple stream reader.
+        return ReadAllAvailable()
+
+    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
+        assert start is not None, "start offset should not be None"
+        assert isinstance(
+            limit, ReadAllAvailable
+        ), "simple stream reader does not support read limit"
+
+        (iter, end) = self.simple_reader.read(start)
+        self.cache.append(PrefetchedCacheEntry(start, end, iter))
         return end

     def commit(self, end: dict) -> None:
-        if self.current_offset is None:
-            self.current_offset = end
-
         end_idx = -1
         for idx, entry in enumerate(self.cache):
             if json.dumps(entry.end) == json.dumps(end):
@@ -112,11 +115,6 @@ def commit(self, end: dict) -> None:
         self.simple_reader.commit(end)

     def partitions(self, start: dict, end: dict) -> Sequence["InputPartition"]:
-        # when query restart from checkpoint, use the last committed offset as the start offset.
-        # This depends on the streaming engine calling planInputPartitions() of the last batch
-        # in offset log when query restart.
-        if self.current_offset is None:
-            self.current_offset = end
         if len(self.cache) > 0:
             assert self.cache[-1].end == end
         return [SimpleInputPartition(start, end)]
@@ -144,3 +142,33 @@ def read(
         self, input_partition: SimpleInputPartition  # type: ignore[override]
     ) -> Iterator[Tuple]:
         return self.simple_reader.readBetweenOffsets(input_partition.start, input_partition.end)
+
+
+class ReadLimitRegistry:
+    def __init__(self) -> None:
+        self._registry: Dict[str, Type[ReadLimit]] = {}
+        # Register built-in ReadLimit types
+        self.__register(ReadAllAvailable)
+        self.__register(ReadMinRows)
+        self.__register(ReadMaxRows)
+        self.__register(ReadMaxFiles)
+        self.__register(ReadMaxBytes)
+
+    def __register(self, read_limit_type: Type["ReadLimit"]) -> None:
+        name = read_limit_type.__name__
+        if name in self._registry:
+            raise PySparkException(f"ReadLimit type '{name}' is already registered.")
+        self._registry[name] = read_limit_type
+
+    def get(self, params_with_type: dict) -> ReadLimit:
+        type_name = params_with_type["_type"]
+        if type_name is None:
+            raise PySparkException("ReadLimit type name is missing.")
+
+        read_limit_type = self._registry.get(type_name)
+        if read_limit_type is None:
+            raise PySparkException("name '{}' is not registered.".format(type_name))
+
+        params_without_type = params_with_type.copy()
+        del params_without_type["_type"]
+        return read_limit_type(**params_without_type)
```
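`ReadLimitRegistry` rebuilds a `ReadLimit` dataclass from a dict carrying a `_type` discriminator plus the dataclass fields as keyword arguments. The round trip can be shown with a self-contained sketch: the dataclasses and the registry logic below are local re-declarations mirroring this commit, and `to_wire` is a hypothetical helper illustrating the assumed dict shape (the actual serialization lives elsewhere in the engine).

```python
from dataclasses import dataclass, asdict
from typing import Dict, Type


class ReadLimit: ...


@dataclass
class ReadAllAvailable(ReadLimit): ...


@dataclass
class ReadMaxRows(ReadLimit):
    max_rows: int


class ReadLimitRegistry:
    def __init__(self) -> None:
        self._registry: Dict[str, Type[ReadLimit]] = {}
        for t in (ReadAllAvailable, ReadMaxRows):
            self._registry[t.__name__] = t  # keyed by class name, as in the diff

    def get(self, params_with_type: dict) -> ReadLimit:
        params = params_with_type.copy()
        type_name = params.pop("_type")  # remaining keys become constructor kwargs
        return self._registry[type_name](**params)


def to_wire(limit: ReadLimit) -> dict:
    # Hypothetical inverse: tag the dataclass fields with the "_type" discriminator.
    return {"_type": type(limit).__name__, **asdict(limit)}


registry = ReadLimitRegistry()
wire = to_wire(ReadMaxRows(max_rows=100))
print(wire)                # {'_type': 'ReadMaxRows', 'max_rows': 100}
print(registry.get(wire))  # ReadMaxRows(max_rows=100)
```

Keying the registry by class name is what makes custom `ReadLimit` subclasses unsupported for now: only the five built-in names are ever registered.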
python/pyspark/sql/streaming/datasource.py (new file)

Lines changed: 119 additions & 0 deletions

```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from abc import ABC, abstractmethod
from dataclasses import dataclass


class ReadLimit:
    """
    Specifies limits on how much data to read from a streaming source when
    determining the latest offset.

    As of Spark 4.2.0, only built-in implementations of :class:`ReadLimit` are supported. Please
    refer to the following classes for the supported types:

    - :class:`ReadAllAvailable`
    - :class:`ReadMinRows`
    - :class:`ReadMaxRows`
    - :class:`ReadMaxFiles`
    - :class:`ReadMaxBytes`
    """


@dataclass
class ReadAllAvailable(ReadLimit):
    """
    A :class:`ReadLimit` that indicates to read all available data, regardless of the given
    source options.
    """


@dataclass
class ReadMinRows(ReadLimit):
    """
    A :class:`ReadLimit` that indicates to read a minimum of N rows. If fewer than N rows are
    available to read, the source should skip producing a new offset and wait until more data
    arrives.

    Note that this semantic does not work properly with Trigger.AvailableNow, since the source
    may end up waiting forever for more data to arrive. It is the source's responsibility to
    handle this case properly.
    """

    min_rows: int


@dataclass
class ReadMaxRows(ReadLimit):
    """
    A :class:`ReadLimit` that indicates to read a maximum of N rows. The source should not read
    more than N rows when determining the latest offset.
    """

    max_rows: int


@dataclass
class ReadMaxFiles(ReadLimit):
    """
    A :class:`ReadLimit` that indicates to read a maximum of N files. The source should not read
    more than N files when determining the latest offset.
    """

    max_files: int


@dataclass
class ReadMaxBytes(ReadLimit):
    """
    A :class:`ReadLimit` that indicates to read a maximum of N bytes. The source should not read
    more than N bytes when determining the latest offset.
    """

    max_bytes: int


class SupportsTriggerAvailableNow(ABC):
    """
    A mixin interface for streaming sources that support Trigger.AvailableNow. This interface
    can be added to both :class:`DataSourceStreamReader` and
    :class:`SimpleDataSourceStreamReader`.
    """

    @abstractmethod
    def prepareForTriggerAvailableNow(self) -> None:
        """
        This will be called at the beginning of streaming queries with Trigger.AvailableNow, to
        let the source record the offset for the latest data available at that time (a.k.a. the
        target offset for the query). The source must behave as if no new data arrives after the
        target offset, i.e., the source must not return an offset higher than the target offset
        when :meth:`DataSourceStreamReader.latestOffset()` is called.

        The source can extend the semantic of "current latest data" based on its own logic, but
        the extended semantic must not violate the expectation that the source will not read any
        data added after the time this method was called.

        Note that it is the source's responsibility to ensure that calls to
        :meth:`DataSourceStreamReader.latestOffset()` or
        :meth:`SimpleDataSourceStreamReader.read()` after this method eventually reach the
        target offset and finally return the same offset as the given start parameter, to
        indicate that there is no more data to read. This includes the case where the query is
        restarted and the source is asked to read from the offset journaled in the previous run;
        the source should take care of exceptional cases, like a new partition added during the
        restart, to ensure that the query run will complete at some point.
        """
        pass
```
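The `prepareForTriggerAvailableNow` contract — freeze a target offset and never report past it, even if new data arrives — can be sketched with a hypothetical reader. The mixin is re-declared locally so the snippet runs standalone; the counter source and its fields are illustrative assumptions, not part of this commit.

```python
from abc import ABC, abstractmethod


class SupportsTriggerAvailableNow(ABC):
    @abstractmethod
    def prepareForTriggerAvailableNow(self) -> None: ...


class CappedReader(SupportsTriggerAvailableNow):
    """Hypothetical reader that freezes its target offset for Trigger.AvailableNow."""

    def __init__(self) -> None:
        self.available = 0  # highest index the source has produced so far
        self.target = None  # frozen at prepare time, if any

    def prepareForTriggerAvailableNow(self) -> None:
        self.target = self.available  # record the "current latest data"

    def latestOffset(self, start: dict, limit) -> dict:
        end = self.available
        if self.target is not None:
            end = min(end, self.target)  # never report past the frozen target
        # Returning the start offset unchanged signals "no more data to read",
        # which is what lets an AvailableNow query terminate.
        return start if end <= start["index"] else {"index": end}


r = CappedReader()
r.available = 5
r.prepareForTriggerAvailableNow()
r.available = 9  # data arriving after prepare must be ignored
print(r.latestOffset({"index": 0}, None))  # {'index': 5}
print(r.latestOffset({"index": 5}, None))  # {'index': 5}: equals start, so done
```

The second call returning the start offset unchanged is the termination condition the docstring describes: the query drains up to the target and then stops.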

python/pyspark/sql/streaming/listener.py

Lines changed: 11 additions & 3 deletions

```diff
@@ -914,12 +914,20 @@ def fromJObject(cls, jprogress: "JavaObject") -> "SourceProgress":

     @classmethod
     def fromJson(cls, j: Dict[str, Any]) -> "SourceProgress":
+        def _to_json_string(value: Any) -> str:
+            """Convert offset value to JSON string. If already a string, return as-is.
+            If a dict/list, JSON-encode it."""
+            if isinstance(value, str):
+                return value
+            else:
+                return json.dumps(value)
+
         return cls(
             jdict=j,
             description=j["description"],
-            startOffset=str(j["startOffset"]),
-            endOffset=str(j["endOffset"]),
-            latestOffset=str(j["latestOffset"]),
+            startOffset=_to_json_string(j["startOffset"]),
+            endOffset=_to_json_string(j["endOffset"]),
+            latestOffset=_to_json_string(j["latestOffset"]),
             numInputRows=j["numInputRows"],
             inputRowsPerSecond=j["inputRowsPerSecond"],
             processedRowsPerSecond=j["processedRowsPerSecond"],
```
