From 30c45fbaffd1038a6289a0e5267ebd4203dec978 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Tue, 28 Jan 2020 15:08:06 -0800 Subject: [PATCH 1/4] init --- .../Resources/people.json | 6 +- .../UdfTests/UdfComplexTypesTests.cs | 106 ++++++++++++++---- .../Command/SqlCommandExecutor.cs | 11 +- .../Microsoft.Spark/Sql/RowCollector.cs | 20 +--- .../Microsoft.Spark/Sql/RowConstructor.cs | 9 -- 5 files changed, 100 insertions(+), 52 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/Resources/people.json b/src/csharp/Microsoft.Spark.E2ETest/Resources/people.json index 9835d5c0e..e31b34076 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/Resources/people.json +++ b/src/csharp/Microsoft.Spark.E2ETest/Resources/people.json @@ -1,3 +1,3 @@ -{"name":"Michael", "ids":[1], "info":{"city":"Burdwan", "state":"Paschimbanga"}} -{"name":"Andy", "age":30, "ids":[3,5], "info":{"city":"Los Angeles", "state":"California"}} -{"name":"Justin", "age":19, "ids":[2,4], "info":{"city":"Seattle"}} +{"name":"Michael", "ids":[1], "info1":{"city":"Burdwan"}, "info2":{"state":"Paschimbanga"}, "info3":{"company":{"job":"Developer"}}} +{"name":"Andy", "age":30, "ids":[3,5], "info1":{"city":"Los Angeles"}, "info2":{"state":"California"}, "info3":{"company":{"job":"Developer"}}} +{"name":"Justin", "age":19, "ids":[2,4], "info1":{"city":"Seattle"}, "info2":{"state":"Washington"}, "info3":{"company":{"job":"Developer"}}} diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 57eff0019..b9e352843 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -136,20 +136,58 @@ public void TestUdfWithReturnAsMapType() [Fact] public void TestUdfWithRowType() { - Func udf = Udf( (row) => { string city = row.GetAs("city"); string state = row.GetAs("state"); return $"{city},{state}"; }); + // 
Single Row + { + Func udf = Udf( + (row) => + { + string city = row.GetAs("city"); + return $"{city}"; + }); - Row[] rows = _df.Select(udf(_df["info"])).Collect().ToArray(); - Assert.Equal(3, rows.Length); + Row[] rows = _df.Select(udf(_df["info1"])).Collect().ToArray(); + Assert.Equal(3, rows.Length); - var expected = new[] { "Burdwan,Paschimbanga", "Los Angeles,California", "Seattle," }; - string[] actual = rows.Select(x => x[0].ToString()).ToArray(); - Assert.Equal(expected, actual); + var expected = new[] { "Burdwan", "Los Angeles", "Seattle" }; + string[] actual = rows.Select(x => x[0].ToString()).ToArray(); + Assert.Equal(expected, actual); + } + + // Row is a part of top-level column. + { + Func udf = Udf( + (row1, row2, str) => + { + string city = row1.GetAs("city"); + string state = row2.GetAs("state"); + return $"{str}:{city},{state}"; + }); + + Row[] rows = _df.Select(udf(_df["info1"], _df["info2"], _df["name"])).Collect().ToArray(); + Assert.Equal(3, rows.Length); + + var expected = new[] { "Michael:Burdwan,Paschimbanga", "Andy:Los Angeles,California", "Justin:Seattle,Washington" }; + string[] actual = rows.Select(x => x[0].ToString()).ToArray(); + Assert.Equal(expected, actual); + } + + // Nested Row + { + Func udf = Udf( + (row) => + { + Row outerCol = row.GetAs("company"); + string job = outerCol.GetAs("job"); + return $"{job}"; + }); + + Row[] rows = _df.Select(udf(_df["info3"])).Collect().ToArray(); + Assert.Equal(3, rows.Length); + + var expected = new[] { "Developer", "Developer", "Developer" }; + string[] actual = rows.Select(x => x[0].ToString()).ToArray(); + Assert.Equal(expected, actual); + } } /// @@ -168,14 +206,40 @@ public void TestUdfWithReturnAsRowType() Func udf = Udf( str => new GenericRow(new object[] { 1, "abc" }), schema); - Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray(); + Row[] rows = _df.Select(udf(_df["name"]).As("col")).Collect().ToArray(); + Assert.Equal(3, rows.Length); + foreach (Row row in rows) + { + 
Assert.Equal(1, row.Size()); + Row outerCol = row.GetAs("col"); + Assert.Equal(2, outerCol.Size()); + Assert.Equal(1, outerCol.GetAs("col1")); + Assert.Equal("abc", outerCol.GetAs("col2")); + } + } + + // Generic row is a part of top-level column. + { + var schema = new StructType(new[] + { + new StructField("col1", new IntegerType()) + }); + Func udf = Udf( + str => new GenericRow(new object[] { 111 }), schema); + + Column nameCol = _df["name"]; + Row[] rows = _df.Select(udf(nameCol).As("col"), nameCol).Collect().ToArray(); Assert.Equal(3, rows.Length); foreach (Row row in rows) { Assert.Equal(2, row.Size()); - Assert.Equal(1, row.GetAs("col1")); - Assert.Equal("abc", row.GetAs("col2")); + Row col1 = row.GetAs("col"); + Assert.Equal(1, col1.Size()); + Assert.Equal(111, col1.GetAs("col1")); + + string col2 = row.GetAs("name"); + Assert.NotEmpty(col2); } } @@ -211,21 +275,23 @@ public void TestUdfWithReturnAsRowType() }), schema); - Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray(); + Row[] rows = _df.Select(udf(_df["name"]).As("col")).Collect().ToArray(); Assert.Equal(3, rows.Length); foreach (Row row in rows) { - Assert.Equal(3, row.Size()); - Assert.Equal(1, row.GetAs("col1")); + Assert.Equal(1, row.Size()); + Row outerCol = row.GetAs("col"); + Assert.Equal(3, outerCol.Size()); + Assert.Equal(1, outerCol.GetAs("col1")); Assert.Equal( new Row(new object[] { 1 }, subSchema1), - row.GetAs("col2")); + outerCol.GetAs("col2")); Assert.Equal( new Row( new object[] { "abc", new Row(new object[] { 10 }, subSchema1) }, subSchema2), - row.GetAs("col3")); + outerCol.GetAs("col3")); } } } diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 82fcee8fe..09f6fe055 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -135,8 +135,17 @@ protected override CommandExecutorStat 
ExecuteCore( for (int i = 0; i < inputRows.Length; ++i) { + object row = inputRows[i]; + // The following can happen if an UDF take Row object(s). + // The JVM Spark side sends a Row object that wraps all the columns used + // in the UDF, thus, it is normalized below (the extra layer is removed). + if (row is RowConstructor rowConstructor) + { + row = rowConstructor.GetRow().Values; + } + // Split id is not used for SQL UDFs, so 0 is passed. - outputRows.Add(commandRunner.Run(0, inputRows[i])); + outputRows.Add(commandRunner.Run(0, row)); } // The initial (estimated) buffer size for pickling rows is set to the size of diff --git a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs index c66687219..f48545ea6 100644 --- a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs +++ b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs @@ -2,7 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; using System.Collections.Generic; using System.IO; using Microsoft.Spark.Interop.Ipc; @@ -34,24 +33,7 @@ public IEnumerable Collect(ISocketWrapper socket) foreach (object unpickled in unpickledObjects) { - // Unpickled object can be either a RowConstructor object (not materialized), - // or a Row object (materialized). Refer to RowConstruct.construct() to see how - // Row objects are unpickled. 
- switch (unpickled) - { - case RowConstructor rc: - yield return rc.GetRow(); - break; - - case object[] objs when objs.Length == 1 && (objs[0] is Row row): - yield return row; - break; - - default: - throw new NotSupportedException( - string.Format("Unpickle type {0} is not supported", - unpickled.GetType())); - } + yield return (unpickled as RowConstructor).GetRow(); } } } diff --git a/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs b/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs index c01746f95..083e94cb5 100644 --- a/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs +++ b/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs @@ -65,15 +65,6 @@ public object construct(object[] args) s_schemaCache = new Dictionary(); } - // When a row is ready to be materialized, then construct() is called - // on the RowConstructor which represents the row. - if ((args.Length == 1) && (args[0] is RowConstructor rowConstructor)) - { - // Construct the Row and return args containing the Row. - args[0] = rowConstructor.GetRow(); - return args; - } - // Return a new RowConstructor where the args either represent the // schema or the row data. The parent becomes important when calling // GetRow() on the RowConstructor containing the row data. 
From e442db1e5a4be8e4f65d7e8985f15e4d29458b51 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Tue, 28 Jan 2020 15:18:32 -0800 Subject: [PATCH 2/4] fixes --- .../Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs | 5 ++++- .../Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index b9e352843..8aae98eb6 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -166,7 +166,10 @@ public void TestUdfWithRowType() Row[] rows = _df.Select(udf(_df["info1"], _df["info2"], _df["name"])).Collect().ToArray(); Assert.Equal(3, rows.Length); - var expected = new[] { "Michael:Burdwan,Paschimbanga", "Andy:Los Angeles,California", "Justin:Seattle,Washington" }; + var expected = new[] { + "Michael:Burdwan,Paschimbanga", + "Andy:Los Angeles,California", + "Justin:Seattle,Washington" }; string[] actual = rows.Select(x => x[0].ToString()).ToArray(); Assert.Equal(expected, actual); } diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 09f6fe055..a27fa88f9 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -136,7 +136,7 @@ protected override CommandExecutorStat ExecuteCore( for (int i = 0; i < inputRows.Length; ++i) { object row = inputRows[i]; - // The following can happen if an UDF take Row object(s). + // The following can happen if an UDF takes Row object(s). // The JVM Spark side sends a Row object that wraps all the columns used // in the UDF, thus, it is normalized below (the extra layer is removed). 
if (row is RowConstructor rowConstructor) From 9723c8dd621d04f3f42a9427acdca01c92d2219a Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Tue, 28 Jan 2020 18:35:58 -0800 Subject: [PATCH 3/4] resolve comments --- .../UdfTests/UdfComplexTypesTests.cs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 8aae98eb6..12101767d 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -141,8 +141,7 @@ public void TestUdfWithRowType() Func udf = Udf( (row) => { - string city = row.GetAs("city"); - return $"{city}"; + return row.GetAs("city"); }); Row[] rows = _df.Select(udf(_df["info1"])).Collect().ToArray(); @@ -153,7 +152,7 @@ public void TestUdfWithRowType() Assert.Equal(expected, actual); } - // Row is a part of top-level column. 
+ // Multiple Rows { Func udf = Udf( (row1, row2, str) => @@ -163,7 +162,10 @@ public void TestUdfWithRowType() return $"{str}:{city},{state}"; }); - Row[] rows = _df.Select(udf(_df["info1"], _df["info2"], _df["name"])).Collect().ToArray(); + Row[] rows = _df + .Select(udf(_df["info1"], _df["info2"], _df["name"])) + .Collect() + .ToArray(); Assert.Equal(3, rows.Length); var expected = new[] { @@ -180,8 +182,7 @@ public void TestUdfWithRowType() (row) => { Row outerCol = row.GetAs("company"); - string job = outerCol.GetAs("job"); - return $"{job}"; + return outerCol.GetAs("job"); }); Row[] rows = _df.Select(udf(_df["info3"])).Collect().ToArray(); From baf051cd266ed75603c780b5c5cf89db209288f1 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Tue, 28 Jan 2020 18:44:26 -0800 Subject: [PATCH 4/4] resolve comments --- .../Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 12101767d..4d8153829 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -139,10 +139,7 @@ public void TestUdfWithRowType() // Single Row { Func udf = Udf( - (row) => - { - return row.GetAs("city"); - }); + (row) => row.GetAs("city")); Row[] rows = _df.Select(udf(_df["info1"])).Collect().ToArray(); Assert.Equal(3, rows.Length);