Skip to content

Commit 8e88f5a

Browse files
zifeif2 authored and anishshri-db committed
[SPARK-55999][SS] Enable forceSnapshotUploadOnLag by default
### What changes were proposed in this pull request?

1. Config default change (SQLConf.scala): forceSnapshotUploadOnLag default false → true
2. 3 SPARK-51358 tests fixed (StateStoreCoordinatorSuite.scala): Explicitly set forceSnapshotUploadOnLag=false so lag detection tests aren't interfered with by the forced remediation
3. Cleanup (StateStoreCoordinatorSuite.scala): Removed redundant forceSnapshotUploadOnLag -> "true"

### Why are the changes needed?

When a state store is lagging in uploading snapshots in the maintenance thread, turning on this feature allows the state store to upload the snapshot in the query execution thread, which improves query reliability.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The feature is tested in #52773

### Was this patch authored or co-authored using generative AI tooling?

yes

Closes #54847 from zifeif2/enable-force-snapshot.

Authored-by: zifeif2 <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent fb51fb3 commit 8e88f5a

File tree

3 files changed

+10
-5
lines changed

3 files changed

+10
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2863,7 +2863,7 @@ object SQLConf {
28632863
)
28642864
.version("4.2.0")
28652865
.booleanConf
2866-
.createWithDefault(false)
2866+
.createWithDefault(true)
28672867

28682868
val STATEFUL_SHUFFLE_PARTITIONS_INTERNAL =
28692869
buildConf("spark.sql.streaming.internal.stateStore.partitions")

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -440,7 +440,8 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
440440
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
441441
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
442442
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
443-
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
443+
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0",
444+
SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false"
444445
) {
445446
case (coordRef, spark) =>
446447
import spark.implicits._
@@ -477,7 +478,8 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
477478
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
478479
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "5",
479480
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0",
480-
SQLConf.STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT.key -> "5"
481+
SQLConf.STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT.key -> "5",
482+
SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false"
481483
) {
482484
case (coordRef, spark) =>
483485
import spark.implicits._
@@ -521,7 +523,8 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
521523
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
522524
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
523525
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
524-
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
526+
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0",
527+
SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false"
525528
) {
526529
case (coordRef, spark) =>
527530
import spark.implicits._
@@ -946,7 +949,6 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest {
946949
object StateStoreCoordinatorSuite {
947950
// Common configuration for SPARK-54063 tests
948951
private val spark54063CommonConfigs: Seq[(String, String)] = Seq(
949-
SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "true",
950952
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
951953
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
952954
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",

sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -562,6 +562,9 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
562562

563563
test("[SPARK-54063] STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG requires " +
564564
"STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG") {
565+
// Default values should work fine - both default to true
566+
assert(spark.sessionState.conf.stateStoreForceSnapshotUploadOnLag === true)
567+
565568
// This should work fine - both enabled
566569
withSQLConf(
567570
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",

0 commit comments

Comments
 (0)