-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-52188] Fix for StateDataSource where StreamExecution.RUN_ID_KEY is not set #50924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cc @Kimahriman PTAL |
LGTM |
@@ -2098,6 +2098,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid | |||
useMultipleValuesPerKey: Boolean = false): RocksDBStateStoreProvider = { | |||
val provider = new RocksDBStateStoreProvider() | |||
val testStateSchemaProvider = new TestStateSchemaProvider | |||
conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ericm-db - is it possible to also add a data source reader test ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we no longer have the isTesting check and unconditionally assert that runId should not be null, those tests would just fail.
Without this fix, all of those tests were failing with runId being equal to null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool thanks
cc @cloud-fan @HeartSaVioR Could one of you merge this in to 4.0/4.1/master |
@@ -968,7 +965,6 @@ object StateStore extends Logging { | |||
if (version < 0) { | |||
throw QueryExecutionErrors.unexpectedStateStoreVersion(version) | |||
} | |||
hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh this is redundant, nice finding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 nice finding.
Thanks! Merging to master/4.0. |
…Y is not set Setting the StreamExecution.RUN_ID_KEY in `StateStore.createAndInit` Trying to use the new statestore source to read a checkpoint using RocksDB fails with "assert failed: Failed to find query id/batch Id in task context". This happens here: https://github.com/apache/spark/blob/v4.0.0-rc6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala#L801 This doesn't fail in tests since the assertion specifically allows this case in tests. The RUN_ID_KEY never gets set on the path of loading the state store for the statestore source. No Unit tests No Closes #50924 from ericm-db/sds-fix. Authored-by: Eric Marnadi <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
…Y is not set ### What changes were proposed in this pull request? Setting the StreamExecution.RUN_ID_KEY in `StateStore.createAndInit` ### Why are the changes needed? Trying to use the new statestore source to read a checkpoint using RocksDB fails with "assert failed: Failed to find query id/batch Id in task context". This happens here: https://github.com/apache/spark/blob/v4.0.0-rc6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala#L801 This doesn't fail in tests since the assertion specifically allows this case in tests. The RUN_ID_KEY never gets set on the path of loading the state store for the statestore source. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#50924 from ericm-db/sds-fix. Authored-by: Eric Marnadi <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
### What changes were proposed in this pull request? #50924 removed the logic of generating a random id in `StateStoreProvider.getRunId` didn't update the comment. Following discussion here, update the comment #51304 (comment) ### Why are the changes needed? Code readability ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No need ### Was this patch authored or co-authored using generative AI tooling? No Closes #51307 from WweiL/SPARK-52188-followup. Authored-by: Wei Liu <[email protected]> Signed-off-by: yangjie01 <[email protected]>
What changes were proposed in this pull request?
Setting the StreamExecution.RUN_ID_KEY in
StateStore.createAndInit
Why are the changes needed?
Trying to use the new statestore source to read a checkpoint using RocksDB fails with "assert failed: Failed to find query id/batch Id in task context".
This happens here: https://github.com/apache/spark/blob/v4.0.0-rc6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala#L801
This doesn't fail in tests since the assertion specifically allows this case in tests. The RUN_ID_KEY never gets set on the path of loading the state store for the statestore source.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
No