Skip to content

Commit dd497fb

Browse files
committed
[SPARK-52597][SS][TESTS] Fix the execution failure of StateStoreBasicOperationsBenchmark
### What changes were proposed in this pull request? This pr has made the following changes to fix the `StateStoreBasicOperationsBenchmark`: 1. Following the suggestion from zecookiez, set "spark.sql.streaming.stateStore.coordinatorReportSnapshotUploadLag" to false when initializing `StateStoreConf`. 2. When initializing the `RocksDBStateStoreProvider`, populate the `StreamExecution.RUN_ID_KEY` for the incoming Hadoop `Configuration`. ### Why are the changes needed? Fix the execution failure of `StateStoreBasicOperationsBenchmark`: ``` build/sbt "sql/Test/runMain org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark" [error] Exception in thread "main" java.lang.AssertionError: assertion failed [error] at scala.Predef$.assert(Predef.scala:264) [error] at org.apache.spark.sql.execution.streaming.state.StateStoreProvider$.getRunId(StateStore.scala:673) [error] at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.init(RocksDBStateStoreProvider.scala:394) [error] at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.newRocksDBStateProvider(StateStoreBasicOperationsBenchmark.scala:484) [error] at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.$anonfun$runPutBenchmark$3(StateStoreBasicOperationsBenchmark.scala:92) [error] at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18) [error] at scala.collection.immutable.List.foreach(List.scala:334) [error] at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.$anonfun$runPutBenchmark$2(StateStoreBasicOperationsBenchmark.scala:87) [error] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) [error] at org.apache.spark.benchmark.BenchmarkBase.runBenchmark(BenchmarkBase.scala:42) [error] at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.runPutBenchmark(StateStoreBasicOperationsBenchmark.scala:83) [error] at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.runBenchmarkSuite(StateStoreBasicOperationsBenchmark.scala:55) [error] at org.apache.spark.benchmark.BenchmarkBase.main(BenchmarkBase.scala:72) [error] at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark.main(StateStoreBasicOperationsBenchmark.scala) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - It has been locally confirmed that the `StateStoreBasicOperationsBenchmark` can be executed successfully. ### Was this patch authored or co-authored using generative AI tooling? No Closes #51304 from LuciferYang/SPARK-52597. Authored-by: yangjie01 <[email protected]> Signed-off-by: yangjie01 <[email protected]>
1 parent 16c0ce9 commit dd497fb

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
package org.apache.spark.sql.execution.benchmark
1919

20+
import java.util.UUID
21+
2022
import scala.util.Random
2123

2224
import org.apache.hadoop.conf.Configuration
2325

2426
import org.apache.spark.benchmark.Benchmark
2527
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
28+
import org.apache.spark.sql.execution.streaming.StreamExecution
2629
import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, NoPrefixKeyStateEncoderSpec, RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
2730
import org.apache.spark.sql.internal.SQLConf
2831
import org.apache.spark.sql.types.{IntegerType, StructField, StructType, TimestampType}
@@ -477,11 +480,16 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
477480
val sqlConf = new SQLConf()
478481
sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows",
479482
trackTotalNumberOfRows.toString)
483+
sqlConf.setConfString("spark.sql.streaming.stateStore.coordinatorReportSnapshotUploadLag",
484+
false.toString)
480485
val storeConf = new StateStoreConf(sqlConf)
481486

487+
val configuration = new Configuration
488+
configuration.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
489+
482490
provider.init(
483491
storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
484-
useColumnFamilies = useColumnFamilies, storeConf, new Configuration,
492+
useColumnFamilies = useColumnFamilies, storeConf, configuration,
485493
useMultipleValuesPerKey = useMultipleValuesPerKey)
486494
provider
487495
}

0 commit comments

Comments
 (0)