diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 3e14d02b73da5..9a85169ad451a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -630,6 +630,7 @@ object StateStoreProvider extends Logging {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean,
       stateSchemaProvider: Option[StateSchemaProvider]): StateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, providerId.queryRunId.toString)
     val provider = create(storeConf.providerClass)
     provider.init(providerId.storeId, keySchema, valueSchema, keyStateEncoderSpec,
       useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey, stateSchemaProvider)
@@ -669,12 +670,8 @@ object StateStoreProvider extends Logging {
    */
   private[state] def getRunId(hadoopConf: Configuration): String = {
     val runId = hadoopConf.get(StreamExecution.RUN_ID_KEY)
-    if (runId != null) {
-      runId
-    } else {
-      assert(Utils.isTesting, "Failed to find query id/batch Id in task context")
-      UUID.randomUUID().toString
-    }
+    assert(runId != null)
+    runId
   }
 
   /**
@@ -968,7 +965,6 @@ object StateStore extends Logging {
     if (version < 0) {
       throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
     }
-    hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
       keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey,
       stateSchemaBroadcast)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
index 59c0af8afd198..8ee6d8762404c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.v2.state
 
+import java.util.UUID
+
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -71,6 +73,8 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
    */
   private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
@@ -78,7 +82,7 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index fca7d16012cee..56a6a1e641f48 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.io.{File, FileWriter}
 import java.nio.ByteOrder
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
@@ -28,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -588,6 +589,8 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
    */
   private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
@@ -595,7 +598,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index 22150ffde5db6..fd317903be96e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -26,7 +27,7 @@ import org.scalatest.Tag
 import org.apache.spark.{SparkContext, SparkException, TaskContext}
 import org.apache.spark.sql.{DataFrame, ForeachWriter}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -154,6 +155,7 @@ class CkptIdCollectingStateStoreProviderWrapper extends StateStoreProvider {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean = false,
       stateSchemaProvider: Option[StateSchemaProvider] = None): Unit = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     innerProvider.init(
       stateStoreId,
       keySchema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index b135086821886..99d7e255f9517 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamExecution}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -2098,6 +2098,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       useMultipleValuesPerKey: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
     val testStateSchemaProvider = new TestStateSchemaProvider
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId,
       keySchema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 4226ee94e98d3..aa4a50b853a42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -1058,6 +1058,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
       numOfVersToRetainInMemory: Int = SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get,
       hadoopConf: Configuration = new Configuration): HDFSBackedStateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     val sqlConf = getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)
     val provider = new HDFSBackedStateStoreProvider()
     provider.init(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index 093e8b991cc9b..909e888a3dd9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
-import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, ValueStateImplWithTTL}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StreamExecution, ValueStateImplWithTTL}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{TimeMode, TTLConfig, ValueState}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -461,6 +461,7 @@ abstract class StateVariableSuiteBase extends SharedSparkSession
       conf: Configuration = new Configuration,
       useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId, schemaForKeyRow, schemaForValueRow, keyStateEncoderSpec, useColumnFamilies,
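
Note: with this change, setting StreamExecution.RUN_ID_KEY moves from StateStore into StateStoreProvider.createAndInit, and getRunId now asserts the key is present instead of falling back to a random UUID under Utils.isTesting. Any test that calls provider.init directly therefore has to seed the Hadoop configuration itself. A minimal sketch of the pattern the updated suites follow (the surrounding provider setup is elided):

    import java.util.UUID

    import org.apache.hadoop.conf.Configuration

    import org.apache.spark.sql.execution.streaming.StreamExecution

    // Seed a run id before calling provider.init; getRunId now fails its
    // non-null assertion if the key is absent from the configuration.
    val conf = new Configuration()
    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
    // ... then pass `conf` as the hadoopConf argument to provider.init(...)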