[SPARK-52597][SS][TESTS] Fix the execution failure of StateStoreBasicOperationsBenchmark #51304

Closed
wants to merge 1 commit into from

LuciferYang
Contributor

@LuciferYang LuciferYang commented Jun 27, 2025

What changes were proposed in this pull request?

This PR makes the following changes to fix StateStoreBasicOperationsBenchmark:

  1. Following the suggestion from @zecookiez, set "spark.sql.streaming.stateStore.coordinatorReportSnapshotUploadLag" to false when initializing StateStoreConf.
  2. When initializing the RocksDBStateStoreProvider, set StreamExecution.RUN_ID_KEY on the Hadoop Configuration that is passed in.

Why are the changes needed?

Fix the execution failure of StateStoreBasicOperationsBenchmark:

build/sbt "sql/Test/runMain org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark"
[error] Exception in thread "main" java.lang.AssertionError: assertion failed
[error] 	at scala.Predef$.assert(Predef.scala:264)
[error] 	at org.apache.spark.sql.execution.streaming.state.StateStoreProvider$.getRunId(StateStore.scala:673)
[error] 	at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.init(RocksDBStateStoreProvider.scala:394)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.newRocksDBStateProvider(StateStoreBasicOperationsBenchmark.scala:484)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.$anonfun$runPutBenchmark$3(StateStoreBasicOperationsBenchmark.scala:92)
[error] 	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
[error] 	at scala.collection.immutable.List.foreach(List.scala:334)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.$anonfun$runPutBenchmark$2(StateStoreBasicOperationsBenchmark.scala:87)
[error] 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[error] 	at org.apache.spark.benchmark.BenchmarkBase.runBenchmark(BenchmarkBase.scala:42)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.runPutBenchmark(StateStoreBasicOperationsBenchmark.scala:83)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.runBenchmarkSuite(StateStoreBasicOperationsBenchmark.scala:55)
[error] 	at org.apache.spark.benchmark.BenchmarkBase.main(BenchmarkBase.scala:72)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark.main(StateStoreBasicOperationsBenchmark.scala)
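
The failure mode in the trace above can be sketched in isolation. This is a minimal, self-contained illustration, not Spark's code: `RunIdSketch`, its `getRunId`, and the mutable `Map` standing in for the Hadoop Configuration are all hypothetical, and the key string is an assumed value for StreamExecution.RUN_ID_KEY.

```scala
import scala.collection.mutable

object RunIdSketch {
  // Assumed to match the value behind StreamExecution.RUN_ID_KEY; illustrative only.
  val RunIdKey = "sql.streaming.runId"

  // Hypothetical stand-in for StateStoreProvider.getRunId: the key must be present.
  def getRunId(conf: mutable.Map[String, String]): String = {
    val runId = conf.get(RunIdKey)
    assert(runId.isDefined)
    runId.get
  }

  def main(args: Array[String]): Unit = {
    // A plain map stands in for the Hadoop Configuration here.
    val conf = mutable.Map.empty[String, String]

    // Without a run ID the lookup fails with an AssertionError, as in the trace.
    try {
      getRunId(conf)
    } catch {
      case e: AssertionError => println(s"lookup failed: ${e.getMessage}")
    }

    // The fix: set a random run ID before the provider is initialized.
    conf(RunIdKey) = java.util.UUID.randomUUID().toString
    println(s"run ID: ${getRunId(conf)}")
  }
}
```

In the benchmark itself, the equivalent step is setting the run ID on the Configuration before it is handed to the provider's init.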

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • Confirmed locally that StateStoreBasicOperationsBenchmark runs successfully.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Jun 27, 2025
val storeConf = new StateStoreConf(sqlConf)

val configuration = new Configuration
configuration.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)

Contributor Author

@LuciferYang LuciferYang Jun 27, 2025

This change is also needed; otherwise, the benchmark still fails with:

build/sbt "sql/Test/runMain org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark"
[error] Exception in thread "main" java.lang.AssertionError: assertion failed
[error] 	at scala.Predef$.assert(Predef.scala:264)
[error] 	at org.apache.spark.sql.execution.streaming.state.StateStoreProvider$.getRunId(StateStore.scala:673)
[error] 	at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.init(RocksDBStateStoreProvider.scala:394)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.newRocksDBStateProvider(StateStoreBasicOperationsBenchmark.scala:484)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.$anonfun$runPutBenchmark$3(StateStoreBasicOperationsBenchmark.scala:92)
[error] 	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
[error] 	at scala.collection.immutable.List.foreach(List.scala:334)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.$anonfun$runPutBenchmark$2(StateStoreBasicOperationsBenchmark.scala:87)
[error] 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[error] 	at org.apache.spark.benchmark.BenchmarkBase.runBenchmark(BenchmarkBase.scala:42)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.runPutBenchmark(StateStoreBasicOperationsBenchmark.scala:83)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark$.runBenchmarkSuite(StateStoreBasicOperationsBenchmark.scala:55)
[error] 	at org.apache.spark.benchmark.BenchmarkBase.main(BenchmarkBase.scala:72)
[error] 	at org.apache.spark.sql.execution.benchmark.StateStoreBasicOperationsBenchmark.main(StateStoreBasicOperationsBenchmark.scala)

Contributor

Actually, according to the original comment, a random runId should be set when it's not found:

* Get the runId from the provided hadoopConf. If it is not found, generate a random UUID.

Contributor

If we go by the comment, maybe we should generate a random one there and log a warning, so we could remove this random generation at test time?
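
For reference, the fallback suggested above (generate a random ID and log a warning when none is set) might look like the sketch below. It is hypothetical: `LenientRunIdSketch`, `getRunIdOrGenerate`, the `warn` callback, and the key string are illustrative names and values, and an immutable `Map` stands in for the Hadoop Configuration.

```scala
import java.util.UUID

object LenientRunIdSketch {
  // Assumed to match the value behind StreamExecution.RUN_ID_KEY; illustrative only.
  val RunIdKey = "sql.streaming.runId"

  // Returns the configured run ID, or generates a random one and reports a warning.
  // The generated ID is not written back, so each call without a key yields a new ID.
  def getRunIdOrGenerate(
      conf: Map[String, String],
      warn: String => Unit = msg => Console.err.println(msg)): String =
    conf.getOrElse(RunIdKey, {
      val generated = UUID.randomUUID().toString
      warn(s"$RunIdKey is not set; using random run ID $generated")
      generated
    })

  def main(args: Array[String]): Unit = {
    // No key set: a warning is printed and a random UUID is returned.
    println(getRunIdOrGenerate(Map.empty))
    // Key set: the configured value is returned and no warning is emitted.
    println(getRunIdOrGenerate(Map(RunIdKey -> "configured-id")))
  }
}
```

A lenient lookup like this would hide a missing run ID outside of tests, which is one reason to prefer setting the ID explicitly in the benchmark.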

Contributor Author

Going by the description, yes, but the implementation doesn't match it. It looks like the comment wasn't updated when @ericm-db added the assertion.

@ericm-db Maybe we should submit a follow-up to SPARK-52188 to fix this comment?

Contributor Author

> If we go by the comment, maybe we should generate a random one there and log a warning, so we could remove this random generation at test time?

Based on the submission timestamps, it appears that the assertions were added after the comments. Since I'm not very familiar with this part of the code, I hope the original author, @ericm-db, can come up with a solution.

Contributor Author

@LuciferYang LuciferYang Jun 27, 2025

It seems that in the original logic, a UUID was generated automatically only in test scenarios. So I think we should set it manually here; the comment is simply outdated. WDYT, @WweiL?

Contributor

I see, thank you for the explanation! Yeah, I agree with you. Let me do a follow-up to fix the comment there.

Contributor Author

Thank you @WweiL ~

Contributor

@WweiL WweiL Jun 27, 2025

Can you merge this, thank you!

#51307

Contributor Author

> Can you merge this, thank you!
>
> #51307

done

@LuciferYang
Contributor Author

LuciferYang commented Jun 27, 2025

I've submitted a job that runs the benchmark via GitHub Actions to verify the effect of the changes:

image

@LuciferYang
Contributor Author

cc @HeartSaVioR @zecookiez

Contributor

@zecookiez zecookiez left a comment

Looks good, just saw that the benchmark job is producing proper output too. Thanks for putting in this change! 😃

Contributor

@WweiL WweiL left a comment

LGTM

LuciferYang pushed a commit that referenced this pull request Jun 27, 2025
### What changes were proposed in this pull request?

#50924 removed the logic that generates a random ID in `StateStoreProvider.getRunId` but didn't update the comment. Following the discussion in #51304 (comment), update the comment.

### Why are the changes needed?

Code readability

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Not needed (comment-only change).

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51307 from WweiL/SPARK-52188-followup.

Authored-by: Wei Liu <[email protected]>
Signed-off-by: yangjie01 <[email protected]>

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1, thanks for the quick turnaround, everyone!

@LuciferYang
Contributor Author

Merged into master. Thanks @HeartSaVioR @zecookiez and @WweiL
