
Commit c09ac23

ericm-db authored and HeartSaVioR committed
[SPARK-52188] Fix for StateDataSource where StreamExecution.RUN_ID_KEY is not set
### What changes were proposed in this pull request?

Set `StreamExecution.RUN_ID_KEY` in `StateStore.createAndInit`.

### Why are the changes needed?

Trying to use the new state store source to read a checkpoint using RocksDB fails with "assert failed: Failed to find query id/batch Id in task context". This happens here: https://github.com/apache/spark/blob/v4.0.0-rc6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala#L801

This doesn't fail in tests, since the assertion specifically allows this case in tests. The RUN_ID_KEY never gets set on the path that loads the state store for the state store source.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50924 from ericm-db/sds-fix.

Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
1 parent af6499f · commit c09ac23

File tree: 7 files changed (+22, -14 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 3 additions & 7 deletions
@@ -630,6 +630,7 @@ object StateStoreProvider extends Logging {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean,
       stateSchemaProvider: Option[StateSchemaProvider]): StateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, providerId.queryRunId.toString)
     val provider = create(storeConf.providerClass)
     provider.init(providerId.storeId, keySchema, valueSchema, keyStateEncoderSpec,
       useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey, stateSchemaProvider)
@@ -669,12 +670,8 @@ object StateStoreProvider extends Logging {
    */
   private[state] def getRunId(hadoopConf: Configuration): String = {
     val runId = hadoopConf.get(StreamExecution.RUN_ID_KEY)
-    if (runId != null) {
-      runId
-    } else {
-      assert(Utils.isTesting, "Failed to find query id/batch Id in task context")
-      UUID.randomUUID().toString
-    }
+    assert(runId != null)
+    runId
   }
 
   /**
@@ -968,7 +965,6 @@ object StateStore extends Logging {
     if (version < 0) {
       throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
     }
-    hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
       keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey,
       stateSchemaBroadcast)
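
To make the control flow concrete, here is a minimal standalone sketch of the mechanism this hunk changes. It is an illustration under simplified assumptions, not Spark's actual code: `RUN_ID_KEY` below is a placeholder for the real `StreamExecution.RUN_ID_KEY` constant, `RunIdPropagationSketch` is a hypothetical name, and both method bodies keep only the lines relevant to this fix.

```scala
import java.util.UUID
import org.apache.hadoop.conf.Configuration

// Sketch only: the constant is a stand-in for StreamExecution.RUN_ID_KEY.
object RunIdPropagationSketch {
  val RUN_ID_KEY = "placeholder.run.id.key"

  // After this commit, the run ID is stamped onto the Hadoop conf at the one
  // choke point every provider passes through, before provider.init is called.
  def createAndInit(queryRunId: UUID, hadoopConf: Configuration): Unit = {
    hadoopConf.set(RUN_ID_KEY, queryRunId.toString)
    // ... create the provider and call provider.init(..., hadoopConf, ...)
  }

  // getRunId can therefore assert unconditionally; the old fallback to a
  // random UUID under Utils.isTesting is what masked this bug in tests.
  def getRunId(hadoopConf: Configuration): String = {
    val runId = hadoopConf.get(RUN_ID_KEY)
    assert(runId != null, "run ID must be set before the provider is initialized")
    runId
  }
}
```

The design point: setting the key in `createAndInit` covers every caller, including the state data source, whereas the old call site in `StateStore` sat on a load path that the state data source never takes.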

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala

Lines changed: 6 additions & 2 deletions
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.v2.state
 
+import java.util.UUID
+
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -71,14 +73,16 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
    */
   private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
       valueSchema,
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
 

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala

Lines changed: 5 additions & 2 deletions
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.io.{File, FileWriter}
 import java.nio.ByteOrder
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
@@ -28,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -588,14 +589,16 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
    */
   private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
       valueSchema,
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
 

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala

Lines changed: 3 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -26,7 +27,7 @@ import org.scalatest.Tag
 import org.apache.spark.{SparkContext, SparkException, TaskContext}
 import org.apache.spark.sql.{DataFrame, ForeachWriter}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -154,6 +155,7 @@ class CkptIdCollectingStateStoreProviderWrapper extends StateStoreProvider {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean = false,
       stateSchemaProvider: Option[StateSchemaProvider] = None): Unit = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     innerProvider.init(
       stateStoreId,
       keySchema,

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -33,7 +33,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamExecution}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -2098,6 +2098,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     useMultipleValuesPerKey: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
     val testStateSchemaProvider = new TestStateSchemaProvider
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId,
       keySchema,

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -1058,6 +1058,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
     numOfVersToRetainInMemory: Int = SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get,
     hadoopConf: Configuration = new Configuration): HDFSBackedStateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     val sqlConf = getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)
     val provider = new HDFSBackedStateStoreProvider()
     provider.init(

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
-import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, ValueStateImplWithTTL}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StreamExecution, ValueStateImplWithTTL}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{TimeMode, TTLConfig, ValueState}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -461,6 +461,7 @@ abstract class StateVariableSuiteBase extends SharedSparkSession
     conf: Configuration = new Configuration,
     useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId, schemaForKeyRow, schemaForValueRow, keyStateEncoderSpec,
       useColumnFamilies,
