Skip to content

Commit 75afa8e

Browse files
fangchenli authored and zhengruifeng committed
[SPARK-55229][SPARK-55231][PYTHON] Implement DataFrame.zipWithIndex in PySpark
### What changes were proposed in this pull request?

Implement `DataFrame.zipWithIndex` in PySpark Classic.

### Why are the changes needed?

This method was added in Scala earlier. We need to add it in PySpark Classic so users can use it in PySpark.

### Does this PR introduce _any_ user-facing change?

Yes, users can see and use this API in PySpark.

### How was this patch tested?

Unit tests added.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.6

Closes #54195 from fangchenli/pyspark-zip-with-index.

Authored-by: Fangchen Li <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
1 parent f3ad0f6 commit 75afa8e

File tree

4 files changed

+90
-0
lines changed

4 files changed

+90
-0
lines changed

python/pyspark/sql/classic/dataframe.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,9 @@ def explain(
280280
def exceptAll(self, other: ParentDataFrame) -> ParentDataFrame:
281281
return DataFrame(self._jdf.exceptAll(other._jdf), self.sparkSession)
282282

283+
def zipWithIndex(self, indexColName: str = "index") -> ParentDataFrame:
    """Append a consecutive 0-based Long index column by delegating to the
    JVM DataFrame's ``zipWithIndex`` implementation."""
    zipped_jdf = self._jdf.zipWithIndex(indexColName)
    return DataFrame(zipped_jdf, self.sparkSession)
285+
283286
def isLocal(self) -> bool:
284287
return self._jdf.isLocal()
285288

python/pyspark/sql/connect/dataframe.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,6 +1212,11 @@ def exceptAll(self, other: ParentDataFrame) -> ParentDataFrame:
12121212
res._cached_schema = self._merge_cached_schema(other)
12131213
return res
12141214

1215+
def zipWithIndex(self, indexColName: str = "index") -> ParentDataFrame:
    """Append a consecutive 0-based Long index column, computed via the
    ``distributed_sequence_id`` expression on the Connect server."""
    index_col = F._invoke_function("distributed_sequence_id").alias(indexColName)
    return self.select(F.col("*"), index_col)
1219+
12151220
def intersect(self, other: ParentDataFrame) -> ParentDataFrame:
12161221
self._check_same_session(other)
12171222
res = DataFrame(

python/pyspark/sql/dataframe.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,58 @@ def exceptAll(self, other: "DataFrame") -> "DataFrame":
782782
"""
783783
...
784784

785+
@dispatch_df_method
def zipWithIndex(self, indexColName: str = "index") -> "DataFrame":
    """Return a new :class:`DataFrame` with a column of consecutive 0-based
    Long indices appended, analogous to :meth:`RDD.zipWithIndex`.

    The index column becomes the last column of the result.

    .. versionadded:: 4.2.0

    Parameters
    ----------
    indexColName : str, default "index"
        Name to use for the appended index column.

    Returns
    -------
    :class:`DataFrame`
        The original columns plus the appended index column.

    Notes
    -----
    When the schema already contains a column named `indexColName`, the
    result carries duplicate column names: selecting that name raises
    `AMBIGUOUS_REFERENCE`, and writing the :class:`DataFrame` raises
    `COLUMN_ALREADY_EXISTS`.

    Examples
    --------
    >>> df = spark.createDataFrame(
    ...     [("a", 1), ("b", 2), ("c", 3)], ["letter", "number"])
    >>> df.zipWithIndex().show()
    +------+------+-----+
    |letter|number|index|
    +------+------+-----+
    |     a|     1|    0|
    |     b|     2|    1|
    |     c|     3|    2|
    +------+------+-----+

    Custom index column name:

    >>> df.zipWithIndex("row_id").show()
    +------+------+------+
    |letter|number|row_id|
    +------+------+------+
    |     a|     1|     0|
    |     b|     2|     1|
    |     c|     3|     2|
    +------+------+------+
    """
    ...
836+
785837
@dispatch_df_method
786838
def isLocal(self) -> bool:
787839
"""Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally

python/pyspark/sql/tests/test_dataframe.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,36 @@ def test_to_json(self):
12141214
self.assertIsInstance(df, DataFrame)
12151215
self.assertEqual(df.select("value").count(), 10)
12161216

1217+
def test_zip_with_index(self):
    """DataFrame.zipWithIndex appends a 0-based Long index column."""
    base_df = self.spark.createDataFrame(
        [("a", 1), ("b", 2), ("c", 3)], ["letter", "number"]
    )

    # Default index column name.
    indexed = base_df.zipWithIndex()
    self.assertEqual(indexed.columns, ["letter", "number", "index"])
    collected = indexed.collect()
    self.assertEqual(len(collected), 3)
    self.assertEqual(sorted(r["index"] for r in collected), [0, 1, 2])

    # Caller-provided index column name.
    indexed = base_df.zipWithIndex("row_id")
    self.assertEqual(indexed.columns, ["letter", "number", "row_id"])
    collected = indexed.collect()
    self.assertEqual(sorted(r["row_id"] for r in collected), [0, 1, 2])

    # Colliding column name: selecting the duplicate name is ambiguous.
    dup = base_df.zipWithIndex("letter")
    with self.assertRaises(AnalysisException) as ctx:
        dup.select("letter").collect()
    self.assertEqual(ctx.exception.getCondition(), "AMBIGUOUS_REFERENCE")

    # Colliding column name: writing the frame is rejected outright.
    with tempfile.TemporaryDirectory() as out_dir:
        with self.assertRaises(AnalysisException) as ctx:
            dup.write.parquet(out_dir)
        self.assertEqual(ctx.exception.getCondition(), "COLUMN_ALREADY_EXISTS")
12171247

12181248
class DataFrameTests(DataFrameTestsMixin, ReusedSQLTestCase):
12191249
pass

0 commit comments

Comments
 (0)