Skip to content

Commit d0c0056

Browse files
committed
initial commit
1 parent 2bd193a commit d0c0056

File tree

6 files changed

+233
-10
lines changed

6 files changed

+233
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1573,7 +1573,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
15731573

15741574
case u @ Union(children, _, _)
15751575
// if there are duplicate output columns, give them unique expr ids
1576-
if children.exists(c => c.output.map(_.exprId).distinct.length < c.output.length) =>
1576+
if (u.allChildrenCompatible &&
1577+
conf.getConf(SQLConf.ENFORCE_TYPE_COERCION_BEFORE_UNION_DEDUPLICATION)) &&
1578+
children.exists(c => c.output.map(_.exprId).distinct.length < c.output.length) =>
15771579
val newChildren = children.map { c =>
15781580
if (c.output.map(_.exprId).distinct.length < c.output.length) {
15791581
val existingExprIds = mutable.HashSet[ExprId]()

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -603,18 +603,23 @@ case class Union(
603603
}
604604

605605
override lazy val resolved: Boolean = {
606-
// allChildrenCompatible needs to be evaluated after childrenResolved
607-
def allChildrenCompatible: Boolean =
608-
children.tail.forall( child =>
609-
// compare the attribute number with the first child
610-
child.output.length == children.head.output.length &&
611-
// compare the data types with the first child
612-
child.output.zip(children.head.output).forall {
613-
case (l, r) => DataType.equalsStructurally(l.dataType, r.dataType, true)
614-
})
615606
children.length > 1 && !(byName || allowMissingCol) && childrenResolved && allChildrenCompatible
616607
}
617608

609+
/**
610+
* Checks whether the child outputs are compatible by using `DataType.equalsStructurally`. Do
611+
* that by comparing the size of the output with the size of the first child's output and by
612+
* comparing output data types with the data types of the first child's output.
613+
*
614+
* This method needs to be evaluated after `childrenResolved`.
615+
*/
616+
def allChildrenCompatible: Boolean = childrenResolved && children.tail.forall { child =>
617+
child.output.length == children.head.output.length &&
618+
child.output.zip(children.head.output).forall {
619+
case (l, r) => DataType.equalsStructurally(l.dataType, r.dataType, true)
620+
}
621+
}
622+
618623
override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): Union =
619624
copy(children = newChildren)
620625
}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5949,6 +5949,17 @@ object SQLConf {
59495949
.createWithDefault(2)
59505950
}
59515951

5952+
val ENFORCE_TYPE_COERCION_BEFORE_UNION_DEDUPLICATION =
5953+
buildConf("spark.sql.enforceTypeCoercionBeforeUnionDeduplication.enabled")
5954+
.internal()
5955+
.doc(
5956+
"When set to true, we enforce type coercion to run before deduplication of UNION " +
5957+
"children outputs. Otherwise, order is relative to rule ordering."
5958+
)
5959+
.version("4.1.0")
5960+
.booleanConf
5961+
.createWithDefault(true)
5962+
59525963
/**
59535964
* Holds information about keys that have been deprecated.
59545965
*

sql/core/src/test/resources/sql-tests/analyzer-results/union.sql.out

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,24 @@ CreateViewCommand `t2`, VALUES (1.0, 1), (2.0, 4) tbl(c1, c2), false, true, Loca
1515
+- LocalRelation [c1#x, c2#x]
1616

1717

18+
-- !query
19+
CREATE TABLE parquetTable (col1 INT, col2 INT, col3 INT, col4 INT) USING parquet
20+
-- !query analysis
21+
CreateDataSourceTableCommand `spark_catalog`.`default`.`parquetTable`, false
22+
23+
24+
-- !query
25+
CREATE TABLE csvTable (col1 INT, col2 INT, col3 INT, col4 INT) USING csv
26+
-- !query analysis
27+
CreateDataSourceTableCommand `spark_catalog`.`default`.`csvTable`, false
28+
29+
30+
-- !query
31+
CREATE TABLE jsonTable (col1 INT, col2 INT, col3 INT, col4 INT) USING json
32+
-- !query analysis
33+
CreateDataSourceTableCommand `spark_catalog`.`default`.`jsonTable`, false
34+
35+
1836
-- !query
1937
SELECT *
2038
FROM (SELECT * FROM t1
@@ -241,6 +259,63 @@ Aggregate [sum(v#x) AS sum(v)#x]
241259
+- LocalRelation [v#x]
242260

243261

262+
-- !query
263+
SELECT col1, col2, col3, NULLIF('','') AS col4
264+
FROM parquetTable
265+
UNION ALL
266+
SELECT col2, col2, null AS col3, col4
267+
FROM parquetTable
268+
-- !query analysis
269+
Union false, false
270+
:- Project [col1#x, col2#x, col3#x, cast(col4#x as bigint) AS col4#xL]
271+
: +- Project [col1#x, col2#x, col3#x, nullif(, ) AS col4#x]
272+
: +- SubqueryAlias spark_catalog.default.parquettable
273+
: +- Relation spark_catalog.default.parquettable[col1#x,col2#x,col3#x,col4#x] parquet
274+
+- Project [col2#x, col2#x AS col2#x, col3#x, col4#xL]
275+
+- Project [col2#x, col2#x, cast(col3#x as int) AS col3#x, cast(col4#x as bigint) AS col4#xL]
276+
+- Project [col2#x, col2#x, null AS col3#x, col4#x]
277+
+- SubqueryAlias spark_catalog.default.parquettable
278+
+- Relation spark_catalog.default.parquettable[col1#x,col2#x,col3#x,col4#x] parquet
279+
280+
281+
-- !query
282+
SELECT col1, col2, col3, NULLIF('','') AS col4
283+
FROM csvTable
284+
UNION ALL
285+
SELECT col2, col2, null AS col3, col4
286+
FROM csvTable
287+
-- !query analysis
288+
Union false, false
289+
:- Project [col1#x, col2#x, col3#x, cast(col4#x as bigint) AS col4#xL]
290+
: +- Project [col1#x, col2#x, col3#x, nullif(, ) AS col4#x]
291+
: +- SubqueryAlias spark_catalog.default.csvtable
292+
: +- Relation spark_catalog.default.csvtable[col1#x,col2#x,col3#x,col4#x] csv
293+
+- Project [col2#x, col2#x AS col2#x, col3#x, col4#xL]
294+
+- Project [col2#x, col2#x, cast(col3#x as int) AS col3#x, cast(col4#x as bigint) AS col4#xL]
295+
+- Project [col2#x, col2#x, null AS col3#x, col4#x]
296+
+- SubqueryAlias spark_catalog.default.csvtable
297+
+- Relation spark_catalog.default.csvtable[col1#x,col2#x,col3#x,col4#x] csv
298+
299+
300+
-- !query
301+
SELECT col1, col2, col3, NULLIF('','') AS col4
302+
FROM jsonTable
303+
UNION ALL
304+
SELECT col2, col2, null AS col3, col4
305+
FROM jsonTable
306+
-- !query analysis
307+
Union false, false
308+
:- Project [col1#x, col2#x, col3#x, cast(col4#x as bigint) AS col4#xL]
309+
: +- Project [col1#x, col2#x, col3#x, nullif(, ) AS col4#x]
310+
: +- SubqueryAlias spark_catalog.default.jsontable
311+
: +- Relation spark_catalog.default.jsontable[col1#x,col2#x,col3#x,col4#x] json
312+
+- Project [col2#x, col2#x AS col2#x, col3#x, col4#xL]
313+
+- Project [col2#x, col2#x, cast(col3#x as int) AS col3#x, cast(col4#x as bigint) AS col4#xL]
314+
+- Project [col2#x, col2#x, null AS col3#x, col4#x]
315+
+- SubqueryAlias spark_catalog.default.jsontable
316+
+- Relation spark_catalog.default.jsontable[col1#x,col2#x,col3#x,col4#x] json
317+
318+
244319
-- !query
245320
DROP VIEW IF EXISTS t1
246321
-- !query analysis
@@ -275,3 +350,24 @@ DropTempViewCommand p2
275350
DROP VIEW IF EXISTS p3
276351
-- !query analysis
277352
DropTempViewCommand p3
353+
354+
355+
-- !query
356+
DROP TABLE IF EXISTS parquetTable
357+
-- !query analysis
358+
DropTable true, false
359+
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.parquetTable
360+
361+
362+
-- !query
363+
DROP TABLE IF EXISTS csvTable
364+
-- !query analysis
365+
DropTable true, false
366+
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.csvTable
367+
368+
369+
-- !query
370+
DROP TABLE IF EXISTS jsonTable
371+
-- !query analysis
372+
DropTable true, false
373+
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.jsonTable

sql/core/src/test/resources/sql-tests/inputs/union.sql

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
CREATE OR REPLACE TEMPORARY VIEW t1 AS VALUES (1, 'a'), (2, 'b') tbl(c1, c2);
22
CREATE OR REPLACE TEMPORARY VIEW t2 AS VALUES (1.0, 1), (2.0, 4) tbl(c1, c2);
3+
CREATE TABLE parquetTable (col1 INT, col2 INT, col3 INT, col4 INT) USING parquet;
4+
CREATE TABLE csvTable (col1 INT, col2 INT, col3 INT, col4 INT) USING csv;
5+
CREATE TABLE jsonTable (col1 INT, col2 INT, col3 INT, col4 INT) USING json;
36

47
-- Simple Union
58
SELECT *
@@ -59,10 +62,32 @@ SELECT SUM(t.v) FROM (
5962
SELECT v + v AS v FROM t3
6063
) t;
6164

65+
-- SPARK-52462: UNION should produce consistent results with different underlying table providers.
66+
SELECT col1, col2, col3, NULLIF('','') AS col4
67+
FROM parquetTable
68+
UNION ALL
69+
SELECT col2, col2, null AS col3, col4
70+
FROM parquetTable;
71+
72+
SELECT col1, col2, col3, NULLIF('','') AS col4
73+
FROM csvTable
74+
UNION ALL
75+
SELECT col2, col2, null AS col3, col4
76+
FROM csvTable;
77+
78+
SELECT col1, col2, col3, NULLIF('','') AS col4
79+
FROM jsonTable
80+
UNION ALL
81+
SELECT col2, col2, null AS col3, col4
82+
FROM jsonTable;
83+
6284
-- Clean-up
6385
DROP VIEW IF EXISTS t1;
6486
DROP VIEW IF EXISTS t2;
6587
DROP VIEW IF EXISTS t3;
6688
DROP VIEW IF EXISTS p1;
6789
DROP VIEW IF EXISTS p2;
6890
DROP VIEW IF EXISTS p3;
91+
DROP TABLE IF EXISTS parquetTable;
92+
DROP TABLE IF EXISTS csvTable;
93+
DROP TABLE IF EXISTS jsonTable;

sql/core/src/test/resources/sql-tests/results/union.sql.out

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,30 @@ struct<>
1515

1616

1717

18+
-- !query
19+
CREATE TABLE parquetTable (col1 INT, col2 INT, col3 INT, col4 INT) USING parquet
20+
-- !query schema
21+
struct<>
22+
-- !query output
23+
24+
25+
26+
-- !query
27+
CREATE TABLE csvTable (col1 INT, col2 INT, col3 INT, col4 INT) USING csv
28+
-- !query schema
29+
struct<>
30+
-- !query output
31+
32+
33+
34+
-- !query
35+
CREATE TABLE jsonTable (col1 INT, col2 INT, col3 INT, col4 INT) USING json
36+
-- !query schema
37+
struct<>
38+
-- !query output
39+
40+
41+
1842
-- !query
1943
SELECT *
2044
FROM (SELECT * FROM t1
@@ -200,6 +224,42 @@ struct<sum(v):decimal(21,0)>
200224
3
201225

202226

227+
-- !query
228+
SELECT col1, col2, col3, NULLIF('','') AS col4
229+
FROM parquetTable
230+
UNION ALL
231+
SELECT col2, col2, null AS col3, col4
232+
FROM parquetTable
233+
-- !query schema
234+
struct<col1:int,col2:int,col3:int,col4:bigint>
235+
-- !query output
236+
237+
238+
239+
-- !query
240+
SELECT col1, col2, col3, NULLIF('','') AS col4
241+
FROM csvTable
242+
UNION ALL
243+
SELECT col2, col2, null AS col3, col4
244+
FROM csvTable
245+
-- !query schema
246+
struct<col1:int,col2:int,col3:int,col4:bigint>
247+
-- !query output
248+
249+
250+
251+
-- !query
252+
SELECT col1, col2, col3, NULLIF('','') AS col4
253+
FROM jsonTable
254+
UNION ALL
255+
SELECT col2, col2, null AS col3, col4
256+
FROM jsonTable
257+
-- !query schema
258+
struct<col1:int,col2:int,col3:int,col4:bigint>
259+
-- !query output
260+
261+
262+
203263
-- !query
204264
DROP VIEW IF EXISTS t1
205265
-- !query schema
@@ -246,3 +306,27 @@ DROP VIEW IF EXISTS p3
246306
struct<>
247307
-- !query output
248308

309+
310+
311+
-- !query
312+
DROP TABLE IF EXISTS parquetTable
313+
-- !query schema
314+
struct<>
315+
-- !query output
316+
317+
318+
319+
-- !query
320+
DROP TABLE IF EXISTS csvTable
321+
-- !query schema
322+
struct<>
323+
-- !query output
324+
325+
326+
327+
-- !query
328+
DROP TABLE IF EXISTS jsonTable
329+
-- !query schema
330+
struct<>
331+
-- !query output
332+

0 commit comments

Comments
 (0)