[SPARK-28939][SQL][2.4] Propagate SQLConf for plans executed by toRdd#25734
[SPARK-28939][SQL][2.4] Propagate SQLConf for plans executed by toRdd#25734mgaido91 wants to merge 5 commits into
Conversation
|
cc @cloud-fan @hvanhovell @maropu @viirya I haven't yet added the flag requested by @hvanhovell as I have some doubts about it as I expressed in the other PR. I'll add if you think it is needed. |
|
Test build #110354 has finished for PR 25734 at commit
|
|
Test build #110356 has finished for PR 25734 at commit
|
|
Hi, All. |
|
#25738 is ready. |
| private val sqlConfigs = conf.getAllConfs | ||
| private lazy val sqlConfExecutorSide = { | ||
| val props = new Properties() | ||
| props.putAll(sqlConfigs.asJava) |
|
hi all, I am going to update this once the followups are done, meanwhile do you all agree adding the flag proposed by @hvanhovell ? If so, I'll add it when I update this PR. Thanks. |
|
Yes. I'm +1 for @hvanhovell 's advice for the flag. Please add a document for that flag, too. |
|
+1 for the flag to keep the legacy behaivour. Is that document @dongjoon-hyun suggested the migration guide? Yea, we need to update that, too. |
|
ok thanks, but then shouldn't we add a note in the migration guide for 3.0 too? |
|
Test build #110471 has finished for PR 25734 at commit
|
| buildConf("spark.sql.legacy.rdd.applyConf") | ||
| .internal() | ||
| .doc("When false, SQL configurations are disregarded when operations on a RDD derived from" + | ||
| " a dataframe are executed. This is the (buggy) behavior up to 2.4.3.") |
|
|
||
| - Starting from 2.4.5, SQL configurations are effective also when a Dataset is converted to an RDD and its | ||
| plan is executed due to action on the derived RDD. The previous buggy behavior can be restored setting | ||
| `spark.sql.legacy.rdd.applyConf` to `false`. |
There was a problem hiding this comment.
Hi, @hvanhovell , @cloud-fan and @gatorsmile . As @mgaido91 asked here, this PR will add this flag only at branch-2.4. In this case, is it okay this config will be added and deprecated at 2.4.5 and will be removed at 3.0.0?
For me, we don't need to add this configuration to master. Did I understand correctly?
There was a problem hiding this comment.
I don't think we need to add the flag to master. We are allowed to break behavior there.
| ## Upgrading from Spark SQL 2.4 to 2.4.5 | ||
|
|
||
| - Starting from 2.4.5, SQL configurations are effective also when a Dataset is converted to an RDD and its | ||
| plan is executed due to action on the derived RDD. The previous buggy behavior can be restored setting |
There was a problem hiding this comment.
I'd personally refrain from using the term buggy. Please explain what the previous behavior was.
|
Test build #110506 has finished for PR 25734 at commit
|
|
Test build #110512 has finished for PR 25734 at commit
|
|
Retest this please. |
|
Test build #113545 has finished for PR 25734 at commit
|
dongjoon-hyun
left a comment
There was a problem hiding this comment.
+1, LGTM. Merged to branch-2.4.
Sorry for being late, @mgaido91 .
cc @gatorsmile
### What changes were proposed in this pull request?
The PR proposes to create a custom `RDD` which enables to propagate `SQLConf` also in cases not tracked by SQL execution, as it happens when a `Dataset` is converted to and RDD either using `.rdd` or `.queryExecution.toRdd` and then the returned RDD is used to invoke actions on it.
In this way, SQL configs are effective also in these cases, while earlier they were ignored.
### Why are the changes needed?
Without this patch, all the times `.rdd` or `.queryExecution.toRdd` are used, all the SQL configs set are ignored. An example of a reproducer can be:
```
withSQLConf(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "false") {
val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as field_$i"): _*)
df.createOrReplaceTempView("spark64kb")
val data = spark.sql("select * from spark64kb limit 10")
// Subexpression elimination is used here, despite it should have been disabled
data.describe()
}
```
### Why are the changes needed?
Without this patch, all the times `.rdd` or `.queryExecution.toRdd` are used, all the SQL configs set are ignored. An example of a reproducer can be:
```
withSQLConf(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "false") {
val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as field_$i"): _*)
df.createOrReplaceTempView("spark64kb")
val data = spark.sql("select * from spark64kb limit 10")
// Subexpression elimination is used here, despite it should have been disabled
data.describe()
}
```
### Does this PR introduce any user-facing change?
When a user calls `.queryExecution.toRdd`, a `SQLExecutionRDD` is returned wrapping the `RDD` of the execute. When `.rdd` is used, an additional `SQLExecutionRDD` is present in the hierarchy.
### How was this patch tested?
added UT
Closes #25734 from mgaido91/SPARK-28939_2.4.
Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
In this way, SQL configs are effective also in these cases, while earlier they were ignored.
Why are the changes needed?
Without this patch, all the times
.rddor.queryExecution.toRddare used, all the SQL configs set are ignored. An example of a reproducer can be:Why are the changes needed?
Without this patch, all the times
.rddor.queryExecution.toRddare used, all the SQL configs set are ignored. An example of a reproducer can be:Does this PR introduce any user-facing change?
When a user calls
.queryExecution.toRdd, aSQLExecutionRDDis returned wrapping theRDDof the execute. When.rddis used, an additionalSQLExecutionRDDis present in the hierarchy.How was this patch tested?
added UT