Skip to content

Commit e9d22e1

Browse files
committed
[SPARK-28939][SQL][BACKPORT-2.4] Propagate SQLConf for plans executed by toRdd
1 parent 483dcf5 commit e9d22e1

4 files changed

Lines changed: 119 additions & 5 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ object SQLConf {
113113
* Returns the active config object within the current scope. If there is an active SparkSession,
114114
* the proper SQLConf associated with the thread's active session is used. If it's called from
115115
* tasks in the executor side, a SQLConf will be created from job local properties, which are set
116-
* and propagated from the driver side.
116+
* and propagated from the driver side, unless a `SQLConf` has been set in the scope by
117+
* `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created
118+
* from DataFrames.
117119
*
118120
* The way this works is a little bit convoluted, due to the fact that config was added initially
119121
* only for physical plans (and as a result not in sql/catalyst module).
@@ -127,7 +129,12 @@ object SQLConf {
127129
*/
128130
def get: SQLConf = {
129131
if (TaskContext.get != null) {
130-
new ReadOnlySQLConf(TaskContext.get())
132+
val conf = existingConf.get()
133+
if (conf != null) {
134+
conf
135+
} else {
136+
new ReadOnlySQLConf(TaskContext.get())
137+
}
131138
} else {
132139
val isSchedulerEventLoopThread = SparkContext.getActive
133140
.map(_.dagScheduler.eventProcessLoop.eventThread)

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
7777
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
7878

7979
/** Internal version of the RDD. Avoids copies and has no schema */
80-
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
80+
lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
81+
executedPlan.execute(), sparkSession.sessionState.conf)
8182

8283
/**
8384
* Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.execution
18+
19+
import java.util.Properties
20+
21+
import scala.collection.JavaConverters._
22+
23+
import org.apache.spark.{Partition, TaskContext}
24+
import org.apache.spark.rdd.RDD
25+
import org.apache.spark.sql.catalyst.InternalRow
26+
import org.apache.spark.sql.internal.SQLConf
27+
28+
/**
 * A thin wrapper over `sqlRDD` which makes all the configs from the captured `SQLConf` effective
 * while the partitions of the wrapped RDD are computed.
 * Please note that this means we may miss configurations set after the creation of this RDD and
 * before its execution.
 *
 * @param sqlRDD the `RDD` generated by the SQL plan
 * @param conf the `SQLConf` to apply to the execution of the SQL plan
 */
class SQLExecutionRDD(
    var sqlRDD: RDD[InternalRow], @transient conf: SQLConf) extends RDD[InternalRow](sqlRDD) {
  // Key/value snapshot read from `conf` when this RDD is constructed. `conf` itself is marked
  // `@transient`, so this snapshot is what the lazily built conf below is derived from.
  private val sqlConfigs = conf.getAllConfs

  // Built on first use inside `compute` and handed to `SQLConf.withExistingConf`.
  private lazy val sqlConfExecutorSide = {
    val props = new Properties()
    // Use `setProperty` instead of the `putAll` inherited from `Hashtable`: the
    // `java.util.Properties` documentation discourages `put`/`putAll`, and `setProperty` keeps
    // every entry typed as a pair of strings.
    sqlConfigs.foreach { case (key, value) => props.setProperty(key, value) }
    val newConf = new SQLConf()
    newConf.setConf(props)
    newConf
  }

  // Partitioning and partitions are taken unchanged from the wrapped RDD.
  override val partitioner = firstParent[InternalRow].partitioner

  override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions

  /**
   * Computes `split` by delegating to the wrapped RDD, installing the captured configs first
   * when the task is not part of a tracked SQL execution.
   *
   * @param split the partition to compute
   * @param context the context of the running task
   * @return the rows produced by the wrapped RDD for `split`
   */
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
    // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
    // this RDD.
    if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
      SQLConf.withExistingConf(sqlConfExecutorSide) {
        firstParent[InternalRow].iterator(split, context)
      }
    } else {
      firstParent[InternalRow].iterator(split, context)
    }
  }
}

sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@
1717

1818
package org.apache.spark.sql.internal
1919

20-
import org.apache.spark.SparkFunSuite
21-
import org.apache.spark.sql.{AnalysisException, SparkSession}
20+
import org.apache.spark.{SparkException, SparkFunSuite}
21+
import org.apache.spark.rdd.RDD
22+
import org.apache.spark.sql.SparkSession
23+
import org.apache.spark.sql.catalyst.InternalRow
24+
import org.apache.spark.sql.catalyst.expressions.Attribute
25+
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
26+
import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan}
2227
import org.apache.spark.sql.execution.debug.codegenStringSeq
2328
import org.apache.spark.sql.functions.col
2429
import org.apache.spark.sql.test.SQLTestUtils
@@ -98,4 +103,41 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
98103
}
99104
}
100105
}
106+
107+
test("SPARK-28939: propagate SQLConf also in conversions to RDD") {
  val expectedConfs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y")
  val assertingPlan = SQLConfAssertPlan(expectedConfs)

  // With the configs set on the session, collecting the RDD runs the assertions inside the
  // plan and they are expected to pass.
  val executionWithConfs = FakeQueryExecution(spark, assertingPlan)
  withSQLConf(expectedConfs: _*) {
    executionWithConfs.toRdd.collect()
  }

  // Without the configs set, the job is expected to fail with a `SparkException` whose cause
  // is a `NoSuchElementException`.
  val executionWithoutConfs = FakeQueryExecution(spark, assertingPlan)
  val failure = intercept[SparkException] {
    executionWithoutConfs.toRdd.collect()
  }
  assert(failure.getCause.isInstanceOf[NoSuchElementException])
}
120+
}
121+
122+
/**
 * Leaf plan used by the tests in this file: while its partitions are evaluated, it asserts that
 * `SQLConf.get` returns the expected value for every key in `confToCheck`.
 *
 * @param confToCheck the (key, expected value) pairs to verify
 */
case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {
  override protected def doExecute(): RDD[InternalRow] = {
    // Two integers spread over two partitions.
    val numbers = sqlContext.sparkContext.parallelize(0 until 2, 2)
    numbers.mapPartitions { partition =>
      val activeConf = SQLConf.get
      for ((key, expectedValue) <- confToCheck) {
        assert(activeConf.getConfString(key) == expectedValue)
      }
      partition.map(value => InternalRow.fromSeq(Seq(value)))
    }
  }

  override def output: Seq[Attribute] = Seq.empty
}
138+
139+
/**
 * `QueryExecution` over an empty `LocalRelation` whose `sparkPlan` and `executedPlan` are both
 * overridden to return the supplied `physicalPlan`.
 *
 * @param spark the session passed to `QueryExecution`
 * @param physicalPlan the plan returned by `sparkPlan` and `executedPlan`
 */
case class FakeQueryExecution(spark: SparkSession, physicalPlan: SparkPlan)
    extends QueryExecution(spark, LocalRelation()) {

  override lazy val executedPlan: SparkPlan = physicalPlan

  override lazy val sparkPlan: SparkPlan = physicalPlan
}

0 commit comments

Comments
 (0)