From e9d22e1704731e01b9f63e25fcf74b313009827a Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 9 Sep 2019 17:43:08 +0200 Subject: [PATCH 1/5] [SPARK-28939][SQL][BACKPORT-2.4] Propagate SQLConf for plans executed by toRdd --- .../apache/spark/sql/internal/SQLConf.scala | 11 +++- .../spark/sql/execution/QueryExecution.scala | 3 +- .../spark/sql/execution/SQLExecutionRDD.scala | 64 +++++++++++++++++++ .../internal/ExecutorSideSQLConfSuite.scala | 46 ++++++++++++- 4 files changed, 119 insertions(+), 5 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 523bb3f9b91ee..9aaf078e9b97d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -113,7 +113,9 @@ object SQLConf { * Returns the active config object within the current scope. If there is an active SparkSession, * the proper SQLConf associated with the thread's active session is used. If it's called from * tasks in the executor side, a SQLConf will be created from job local properties, which are set - * and propagated from the driver side. + * and propagated from the driver side, unless a `SQLConf` has been set in the scope by + * `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created + * from DataFrames. * * The way this works is a little bit convoluted, due to the fact that config was added initially * only for physical plans (and as a result not in sql/catalyst module). @@ -127,7 +129,12 @@ object SQLConf { */ def get: SQLConf = { if (TaskContext.get != null) { - new ReadOnlySQLConf(TaskContext.get()) + val conf = existingConf.get() + if (conf != null) { + conf + } else { + new ReadOnlySQLConf(TaskContext.get()) + } } else { val isSchedulerEventLoopThread = SparkContext.getActive .map(_.dagScheduler.eventProcessLoop.eventThread) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 64f49e2d0d4e6..377f95ac0e2df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -77,7 +77,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) /** Internal version of the RDD. Avoids copies and has no schema */ - lazy val toRdd: RDD[InternalRow] = executedPlan.execute() + lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD( + executedPlan.execute(), sparkSession.sessionState.conf) /** * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala new file mode 100644 index 0000000000000..7373da33e12ad --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import java.util.Properties + +import scala.collection.JavaConverters._ + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.internal.SQLConf + +/** + * It is just a wrapper over `sqlRDD`, which sets and makes effective all the configs from the + * captured `SQLConf`. + * Please notice that this means we may miss configurations set after the creation of this RDD and + * before its execution. + * + * @param sqlRDD the `RDD` generated by the SQL plan + * @param conf the `SQLConf` to apply to the execution of the SQL plan + */ +class SQLExecutionRDD( + var sqlRDD: RDD[InternalRow], @transient conf: SQLConf) extends RDD[InternalRow](sqlRDD) { + private val sqlConfigs = conf.getAllConfs + private lazy val sqlConfExecutorSide = { + val props = new Properties() + props.putAll(sqlConfigs.asJava) + val newConf = new SQLConf() + newConf.setConf(props) + newConf + } + + override val partitioner = firstParent[InternalRow].partitioner + + override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set + // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of + // this RDD. 
+ if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) { + SQLConf.withExistingConf(sqlConfExecutorSide) { + firstParent[InternalRow].iterator(split, context) + } + } else { + firstParent[InternalRow].iterator(split, context) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala index 5b4736ef4f7f3..8114fd12fdb11 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala @@ -17,8 +17,13 @@ package org.apache.spark.sql.internal -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan} import org.apache.spark.sql.execution.debug.codegenStringSeq import org.apache.spark.sql.functions.col import org.apache.spark.sql.test.SQLTestUtils @@ -98,4 +103,41 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils { } } } + + test("SPARK-28939: propagate SQLConf also in conversions to RDD") { + val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y") + val physicalPlan = SQLConfAssertPlan(confs) + val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan) + withSQLConf(confs: _*) { + // Force RDD evaluation to trigger asserts + dummyQueryExecution.toRdd.collect() + } + val dummyQueryExecution1 = FakeQueryExecution(spark, physicalPlan) + // Without setting the configs assertions fail + val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect()) + assert(e.getCause.isInstanceOf[NoSuchElementException]) + } +} + +case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode { + override protected def doExecute(): RDD[InternalRow] = { + sqlContext + .sparkContext + .parallelize(0 until 2, 2) + .mapPartitions { it => + val confs = SQLConf.get + confToCheck.foreach { case (key, expectedValue) => + assert(confs.getConfString(key) == expectedValue) + } + it.map(i => InternalRow.fromSeq(Seq(i))) + } + } + + override def output: Seq[Attribute] = Seq.empty +} + +case class FakeQueryExecution(spark: SparkSession, physicalPlan: SparkPlan) + extends QueryExecution(spark, LocalRelation()) { + override lazy val sparkPlan: SparkPlan = physicalPlan + override lazy val executedPlan: SparkPlan = physicalPlan } From d145b14ceb186d517b9622d8cf87c2d8b7135235 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 9 Sep 2019 18:21:09 +0200 Subject: [PATCH 2/5] fix --- .../scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala index 7373da33e12ad..b79afe3b86576 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.internal.SQLConf * @param conf the `SQLConf` to apply to the 
execution of the SQL plan */ class SQLExecutionRDD( - var sqlRDD: RDD[InternalRow], @transient conf: SQLConf) extends RDD[InternalRow](sqlRDD) { + var sqlRDD: RDD[InternalRow], conf: SQLConf) extends RDD[InternalRow](sqlRDD) { private val sqlConfigs = conf.getAllConfs private lazy val sqlConfExecutorSide = { val props = new Properties() From 43bd0215a32a412efbc20da20da69a392229d3d0 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 11 Sep 2019 11:12:57 +0200 Subject: [PATCH 3/5] introduce config --- docs/sql-migration-guide-upgrade.md | 6 ++++ .../apache/spark/sql/internal/SQLConf.scala | 8 +++++ .../spark/sql/execution/QueryExecution.scala | 10 ++++-- .../spark/sql/execution/SQLExecutionRDD.scala | 8 +---- .../internal/ExecutorSideSQLConfSuite.scala | 32 +++++++++++++------ 5 files changed, 45 insertions(+), 19 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index b703cb5ab44e0..c4f85bcca221c 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -7,6 +7,12 @@ displayTitle: Spark SQL Upgrading Guide * Table of contents {:toc} +## Upgrading from Spark SQL 2.4 to 2.4.5 + + - Starting from 2.4.5, SQL configurations are effective also when a Dataset is converted to an RDD and its + plan is executed due to action on the derived RDD. The previous buggy behavior can be restored setting + `spark.sql.legacy.rdd.applyConf` to `false`. + ## Upgrading from Spark SQL 2.4 to 2.4.1 - The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9aaf078e9b97d..5bff9ce1dbd78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1298,6 +1298,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val USE_CONF_ON_RDD_OPERATION = + buildConf("spark.sql.legacy.rdd.applyConf") + .internal() + .doc("When false, SQL configurations are disregarded when operations on a RDD derived from" + + " a dataframe are executed. 
This is the (buggy) behavior up to 2.4.3.") + .booleanConf + .createWithDefault(true) + val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter") .internal() .doc("When true, the apply function of the rule verifies whether the right node of the" + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 377f95ac0e2df..37353b873227f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand} import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _} import org.apache.spark.util.Utils @@ -77,8 +78,13 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) /** Internal version of the RDD. Avoids copies and has no schema */ - lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD( - executedPlan.execute(), sparkSession.sessionState.conf) + lazy val toRdd: RDD[InternalRow] = { + if (sparkSession.sessionState.conf.getConf(SQLConf.USE_CONF_ON_RDD_OPERATION)) { + new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf) + } else { + executedPlan.execute() + } + } /** * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala index b79afe3b86576..307e64b6bff7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala @@ -16,10 +16,6 @@ */ package org.apache.spark.sql.execution -import java.util.Properties - -import scala.collection.JavaConverters._ - import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -38,10 +34,8 @@ class SQLExecutionRDD( var sqlRDD: RDD[InternalRow], conf: SQLConf) extends RDD[InternalRow](sqlRDD) { private val sqlConfigs = conf.getAllConfs private lazy val sqlConfExecutorSide = { - val props = new Properties() - props.putAll(sqlConfigs.asJava) val newConf = new SQLConf() - newConf.setConf(props) + sqlConfigs.foreach { case (k, v) => newConf.setConfString(k, v) } newConf } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala index 8114fd12fdb11..ae7206b8f46c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala @@ -105,17 +105,29 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils { } test("SPARK-28939: propagate SQLConf also in conversions to RDD") { - val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y") - val physicalPlan = SQLConfAssertPlan(confs) 
- val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan) - withSQLConf(confs: _*) { - // Force RDD evaluation to trigger asserts - dummyQueryExecution.toRdd.collect() + withSQLConf(SQLConf.USE_CONF_ON_RDD_OPERATION.key -> "true") { + val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y") + val physicalPlan = SQLConfAssertPlan(confs) + val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan) + withSQLConf(confs: _*) { + // Force RDD evaluation to trigger asserts + dummyQueryExecution.toRdd.collect() + } + val dummyQueryExecution1 = FakeQueryExecution(spark, physicalPlan) + // Without setting the configs assertions fail + val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect()) + assert(e.getCause.isInstanceOf[NoSuchElementException]) + } + withSQLConf(SQLConf.USE_CONF_ON_RDD_OPERATION.key -> "false") { + val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y") + val physicalPlan = SQLConfAssertPlan(confs) + val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan) + withSQLConf(confs: _*) { + // Force RDD evaluation to trigger asserts + val e = intercept[SparkException](dummyQueryExecution.toRdd.collect()) + assert(e.getCause.isInstanceOf[NoSuchElementException]) + } } - val dummyQueryExecution1 = FakeQueryExecution(spark, physicalPlan) - // Without setting the configs assertions fail - val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect()) - assert(e.getCause.isInstanceOf[NoSuchElementException]) } } From a5eb604cc1b267869240f13b73f660b87dac24f1 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 12 Sep 2019 09:39:10 +0200 Subject: [PATCH 4/5] fix spark version, add deprecation --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5bff9ce1dbd78..a59ac183498e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1302,7 +1302,8 @@ object SQLConf { buildConf("spark.sql.legacy.rdd.applyConf") .internal() .doc("When false, SQL configurations are disregarded when operations on a RDD derived from" + - " a dataframe are executed. This is the (buggy) behavior up to 2.4.3.") + " a dataframe are executed. This is the (buggy) behavior up to 2.4.4. This config is " + + "deprecated and it will be removed in 3.0.0.") .booleanConf .createWithDefault(true) From 1b145e2158679dc27fce07a8ddf17f6341175afe Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 12 Sep 2019 12:30:43 +0200 Subject: [PATCH 5/5] Update sql-migration-guide-upgrade.md --- docs/sql-migration-guide-upgrade.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index c4f85bcca221c..a6fc42a4249d8 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -10,8 +10,9 @@ displayTitle: Spark SQL Upgrading Guide ## Upgrading from Spark SQL 2.4 to 2.4.5 - Starting from 2.4.5, SQL configurations are effective also when a Dataset is converted to an RDD and its - plan is executed due to action on the derived RDD. The previous buggy behavior can be restored setting - `spark.sql.legacy.rdd.applyConf` to `false`. + plan is executed due to action on the derived RDD. 
The previous behavior can be restored by setting
+  `spark.sql.legacy.rdd.applyConf` to `false`: in this case, SQL configurations are ignored for operations
+  performed on an RDD derived from a Dataset.
 
 ## Upgrading from Spark SQL 2.4 to 2.4.1
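
Illustration (not part of the patches above): a minimal Scala sketch of the code path this backport affects, assuming a running `SparkSession` named `spark`; the DataFrame contents and the final `count()` action are arbitrary examples, and the flag shown last is the one introduced in PATCH 3/5.

    // A Dataset converted to an RDD: the SQL plan will run lazily, outside a tracked SQL
    // execution (no SQLExecution.EXECUTION_ID_KEY local property is set on the tasks).
    val df = spark.range(10).selectExpr("id * 2 AS doubled")
    val derived = df.rdd

    // Action on the derived RDD. Before this change, SQL configurations set on the session
    // were not propagated to the executor-side SQLConf in this path; with SQLExecutionRDD,
    // the SQLConf captured when `toRdd` was built is made visible (via SQLConf.get) to the
    // plan's operators while the wrapped RDD is computed.
    derived.count()

    // The legacy (pre-2.4.5) behavior can be restored with the internal flag added by this patch:
    spark.conf.set("spark.sql.legacy.rdd.applyConf", "false")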