Skip to content

Commit fe18186

Browse files
ericm-dbHeartSaVioR
authored andcommitted
[SPARK-52188] Fix for StateDataSource where StreamExecution.RUN_ID_KEY is not set
Setting the StreamExecution.RUN_ID_KEY in `StateStore.createAndInit` Trying to use the new statestore source to read a checkpoint using RocksDB fails with "assert failed: Failed to find query id/batch Id in task context". This happens here: https://github.com/apache/spark/blob/v4.0.0-rc6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala#L801 This doesn't fail in tests since the assertion specifically allows this case in tests. The RUN_ID_KEY never gets set on the path of loading the state store for the statestore source. No Unit tests No Closes #50924 from ericm-db/sds-fix. Authored-by: Eric Marnadi <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 942e95e commit fe18186

File tree

8 files changed

+22
-15
lines changed

8 files changed

+22
-15
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.sql.execution.streaming.state
1919

2020
import java.io._
21-
import java.util.UUID
2221
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
2322

2423
import scala.util.control.NonFatal
@@ -795,12 +794,8 @@ object RocksDBStateStoreProvider {
795794

796795
private def getRunId(hadoopConf: Configuration): String = {
797796
val runId = hadoopConf.get(StreamExecution.RUN_ID_KEY)
798-
if (runId != null) {
799-
runId
800-
} else {
801-
assert(Utils.isTesting, "Failed to find query id/batch Id in task context")
802-
UUID.randomUUID().toString
803-
}
797+
assert(runId != null)
798+
runId
804799
}
805800

806801
// Native operation latencies report as latency in microseconds

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ object StateStoreProvider {
529529
hadoopConf: Configuration,
530530
useMultipleValuesPerKey: Boolean,
531531
stateSchemaProvider: Option[StateSchemaProvider]): StateStoreProvider = {
532+
hadoopConf.set(StreamExecution.RUN_ID_KEY, providerId.queryRunId.toString)
532533
val provider = create(storeConf.providerClass)
533534
provider.init(providerId.storeId, keySchema, valueSchema, keyStateEncoderSpec,
534535
useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey, stateSchemaProvider)
@@ -822,7 +823,6 @@ object StateStore extends Logging {
822823
if (version < 0) {
823824
throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
824825
}
825-
hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
826826
val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
827827
keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey,
828828
stateSchemaBroadcast)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package org.apache.spark.sql.execution.datasources.v2.state
1919

20+
import java.util.UUID
21+
2022
import org.apache.hadoop.conf.Configuration
2123
import org.scalatest.Assertions
2224

2325
import org.apache.spark.sql.Row
24-
import org.apache.spark.sql.execution.streaming.MemoryStream
26+
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
2527
import org.apache.spark.sql.execution.streaming.state._
2628
import org.apache.spark.sql.functions.col
2729
import org.apache.spark.sql.internal.SQLConf
@@ -71,14 +73,16 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
7173
*/
7274
private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
7375
val provider = newStateStoreProvider()
76+
val conf = new Configuration
77+
conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
7478
provider.init(
7579
StateStoreId(checkpointDir, 0, 0),
7680
keySchema,
7781
valueSchema,
7882
NoPrefixKeyStateEncoderSpec(keySchema),
7983
useColumnFamilies = false,
8084
StateStoreConf(spark.sessionState.conf),
81-
new Configuration)
85+
conf)
8286
provider
8387
}
8488

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
1818

1919
import java.io.{File, FileWriter}
2020
import java.nio.ByteOrder
21+
import java.util.UUID
2122

2223
import org.apache.hadoop.conf.Configuration
2324
import org.scalatest.Assertions
@@ -28,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
2829
import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
2930
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
3031
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
31-
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
32+
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog, StreamExecution}
3233
import org.apache.spark.sql.execution.streaming.state._
3334
import org.apache.spark.sql.functions.col
3435
import org.apache.spark.sql.internal.SQLConf
@@ -588,14 +589,16 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
588589
*/
589590
private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
590591
val provider = newStateStoreProvider()
592+
val conf = new Configuration()
593+
conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
591594
provider.init(
592595
StateStoreId(checkpointDir, 0, 0),
593596
keySchema,
594597
valueSchema,
595598
NoPrefixKeyStateEncoderSpec(keySchema),
596599
useColumnFamilies = false,
597600
StateStoreConf(spark.sessionState.conf),
598-
new Configuration)
601+
conf)
599602
provider
600603
}
601604

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.execution.streaming.state
1919

2020
import java.io.File
21+
import java.util.UUID
2122

2223
import org.apache.hadoop.conf.Configuration
2324
import org.apache.hadoop.fs.Path
@@ -26,7 +27,7 @@ import org.scalatest.Tag
2627
import org.apache.spark.{SparkContext, SparkException, TaskContext}
2728
import org.apache.spark.sql.{DataFrame, ForeachWriter}
2829
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
29-
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream}
30+
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, StreamExecution}
3031
import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper
3132
import org.apache.spark.sql.functions._
3233
import org.apache.spark.sql.internal.SQLConf
@@ -154,6 +155,7 @@ class CkptIdCollectingStateStoreProviderWrapper extends StateStoreProvider {
154155
hadoopConf: Configuration,
155156
useMultipleValuesPerKey: Boolean = false,
156157
stateSchemaProvider: Option[StateSchemaProvider] = None): Unit = {
158+
hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
157159
innerProvider.init(
158160
stateStoreId,
159161
keySchema,

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.sql.SparkSession
3333
import org.apache.spark.sql.catalyst.InternalRow
3434
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
3535
import org.apache.spark.sql.catalyst.util.quietly
36-
import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
36+
import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamExecution}
3737
import org.apache.spark.sql.internal.SQLConf
3838
import org.apache.spark.sql.test.SharedSparkSession
3939
import org.apache.spark.sql.types._
@@ -2097,6 +2097,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
20972097
useMultipleValuesPerKey: Boolean = false): RocksDBStateStoreProvider = {
20982098
val provider = new RocksDBStateStoreProvider()
20992099
val testStateSchemaProvider = new TestStateSchemaProvider
2100+
conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
21002101
provider.init(
21012102
storeId,
21022103
keySchema,

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
10201020
minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
10211021
numOfVersToRetainInMemory: Int = SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get,
10221022
hadoopConf: Configuration = new Configuration): HDFSBackedStateStoreProvider = {
1023+
hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
10231024
val sqlConf = getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)
10241025
val provider = new HDFSBackedStateStoreProvider()
10251026
provider.init(

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfter
2828
import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
2929
import org.apache.spark.sql.Encoders
3030
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
31-
import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, ValueStateImplWithTTL}
31+
import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StreamExecution, ValueStateImplWithTTL}
3232
import org.apache.spark.sql.internal.SQLConf
3333
import org.apache.spark.sql.streaming.{TimeMode, TTLConfig, ValueState}
3434
import org.apache.spark.sql.test.SharedSparkSession
@@ -460,6 +460,7 @@ abstract class StateVariableSuiteBase extends SharedSparkSession
460460
conf: Configuration = new Configuration,
461461
useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
462462
val provider = new RocksDBStateStoreProvider()
463+
conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
463464
provider.init(
464465
storeId, schemaForKeyRow, schemaForValueRow, keyStateEncoderSpec,
465466
useColumnFamilies,

0 commit comments

Comments
 (0)