Skip to content

Commit 4966fe9

Browse files
committed
[SPARK-53967][PYTHON] Avoid intermediate pandas dataframe creation in df.toPandas
### What changes were proposed in this pull request?

Avoid intermediate pandas dataframe creation in `df.toPandas`

before: batches -> table -> intermediate pdf -> result pdf (based on `pa.Table.to_pandas`)
after: batches -> table -> result pdf (based on `pa.ChunkedArray.to_pandas`)

### Why are the changes needed?

the intermediate pandas dataframe can be skipped

simple benchmark in my local

```
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

import time
from pyspark.sql import functions as sf

df = spark.range(1000000).select(
    (sf.col("id") % 2).alias("key"),
    sf.col("id").alias("v")
)
cols = {f"col_{i}": sf.lit(f"c{i}") for i in range(100)}
df = df.withColumns(cols)
df.cache()
df.count()

pdf = df.toPandas()  # warm up

start_arrow = time.perf_counter()
for i in range(100):
    pdf = df.toPandas()
time.perf_counter() - start_arrow
```

master: 304.49954012501985 secs
this PR: 285.2997682078276 secs

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #52680 from zhengruifeng/avoid_unnecessary_pdf_creation.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
1 parent 9de0a27 commit 4966fe9

File tree

1 file changed

+43
-37
lines changed

1 file changed

+43
-37
lines changed

python/pyspark/sql/pandas/conversion.py

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ def toPandas(self) -> "PandasDataFrameLike":
8080
from pyspark.sql.pandas.utils import require_minimum_pyarrow_version
8181

8282
require_minimum_pyarrow_version()
83-
to_arrow_schema(self.schema, prefers_large_types=jconf.arrowUseLargeVarTypes())
83+
arrow_schema = to_arrow_schema(
84+
self.schema, prefers_large_types=jconf.arrowUseLargeVarTypes()
85+
)
8486
except Exception as e:
8587
if jconf.arrowPySparkFallbackEnabled():
8688
msg = (
@@ -112,41 +114,40 @@ def toPandas(self) -> "PandasDataFrameLike":
112114

113115
self_destruct = jconf.arrowPySparkSelfDestructEnabled()
114116
batches = self._collect_as_arrow(split_batches=self_destruct)
115-
if len(batches) > 0:
116-
table = pa.Table.from_batches(batches)
117-
# Ensure only the table has a reference to the batches, so that
118-
# self_destruct (if enabled) is effective
119-
del batches
120-
# Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
121-
# values, but we should use datetime.date to match the behavior with when
122-
# Arrow optimization is disabled.
123-
pandas_options = {
124-
"date_as_object": True,
125-
"coerce_temporal_nanoseconds": True,
126-
}
127-
if self_destruct:
128-
# Configure PyArrow to use as little memory as possible:
129-
# self_destruct - free columns as they are converted
130-
# split_blocks - create a separate Pandas block for each column
131-
# use_threads - convert one column at a time
132-
pandas_options.update(
133-
{
134-
"self_destruct": True,
135-
"split_blocks": True,
136-
"use_threads": False,
137-
}
138-
)
139-
# Rename columns to avoid duplicated column names.
140-
pdf = table.rename_columns(
141-
[f"col_{i}" for i in range(table.num_columns)]
142-
).to_pandas(**pandas_options)
143117

144-
# Rename back to the original column names.
145-
pdf.columns = self.columns
118+
# Rename columns to avoid duplicated column names.
119+
temp_col_names = [f"col_{i}" for i in range(len(self.columns))]
120+
if len(batches) > 0:
121+
table = pa.Table.from_batches(batches).rename_columns(temp_col_names)
146122
else:
147-
pdf = pd.DataFrame(columns=self.columns)
123+
# empty dataset
124+
table = arrow_schema.empty_table().rename_columns(temp_col_names)
125+
126+
# Ensure only the table has a reference to the batches, so that
127+
# self_destruct (if enabled) is effective
128+
del batches
129+
130+
# Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
131+
# values, but we should use datetime.date to match the behavior with when
132+
# Arrow optimization is disabled.
133+
pandas_options = {
134+
"date_as_object": True,
135+
"coerce_temporal_nanoseconds": True,
136+
}
137+
if self_destruct:
138+
# Configure PyArrow to use as little memory as possible:
139+
# self_destruct - free columns as they are converted
140+
# split_blocks - create a separate Pandas block for each column
141+
# use_threads - convert one column at a time
142+
pandas_options.update(
143+
{
144+
"self_destruct": True,
145+
"split_blocks": True,
146+
"use_threads": False,
147+
}
148+
)
148149

149-
if len(pdf.columns) > 0:
150+
if len(self.columns) > 0:
150151
timezone = jconf.sessionLocalTimeZone()
151152
struct_in_pandas = jconf.pandasStructHandlingMode()
152153

@@ -155,21 +156,26 @@ def toPandas(self) -> "PandasDataFrameLike":
155156
error_on_duplicated_field_names = True
156157
struct_in_pandas = "dict"
157158

158-
return pd.concat(
159+
pdf = pd.concat(
159160
[
160161
_create_converter_to_pandas(
161162
field.dataType,
162163
field.nullable,
163164
timezone=timezone,
164165
struct_in_pandas=struct_in_pandas,
165166
error_on_duplicated_field_names=error_on_duplicated_field_names,
166-
)(pser)
167-
for (_, pser), field in zip(pdf.items(), self.schema.fields)
167+
)(arrow_col.to_pandas(**pandas_options))
168+
for arrow_col, field in zip(table.columns, self.schema.fields)
168169
],
169170
axis="columns",
170171
)
171172
else:
172-
return pdf
173+
# empty columns
174+
pdf = table.to_pandas(**pandas_options)
175+
176+
pdf.columns = self.columns
177+
return pdf
178+
173179
except Exception as e:
174180
# We might have to allow fallback here as well but multiple Spark jobs can
175181
# be executed. So, simply fail in this case for now.

0 commit comments

Comments (0)