diff --git a/src/csharp/Microsoft.Spark.E2ETest/Resources/people.json b/src/csharp/Microsoft.Spark.E2ETest/Resources/people.json
index 9835d5c0e..e31b34076 100644
--- a/src/csharp/Microsoft.Spark.E2ETest/Resources/people.json
+++ b/src/csharp/Microsoft.Spark.E2ETest/Resources/people.json
@@ -1,3 +1,3 @@
-{"name":"Michael", "ids":[1], "info":{"city":"Burdwan", "state":"Paschimbanga"}}
-{"name":"Andy", "age":30, "ids":[3,5], "info":{"city":"Los Angeles", "state":"California"}}
-{"name":"Justin", "age":19, "ids":[2,4], "info":{"city":"Seattle"}}
+{"name":"Michael", "ids":[1], "info1":{"city":"Burdwan"}, "info2":{"state":"Paschimbanga"}, "info3":{"company":{"job":"Developer"}}}
+{"name":"Andy", "age":30, "ids":[3,5], "info1":{"city":"Los Angeles"}, "info2":{"state":"California"}, "info3":{"company":{"job":"Developer"}}}
+{"name":"Justin", "age":19, "ids":[2,4], "info1":{"city":"Seattle"}, "info2":{"state":"Washington"}, "info3":{"company":{"job":"Developer"}}}
diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs
index 57eff0019..4d8153829 100644
--- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs
+++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs
@@ -136,20 +136,59 @@ public void TestUdfWithReturnAsMapType()
         [Fact]
         public void TestUdfWithRowType()
         {
-            Func<Column, Column> udf = Udf<Row, string>(
-                (row) =>
-                {
-                    string city = row.GetAs<string>("city");
-                    string state = row.GetAs<string>("state");
-                    return $"{city},{state}";
-                });
+            // Single Row
+            {
+                Func<Column, Column> udf = Udf<Row, string>(
+                    (row) => row.GetAs<string>("city"));
 
-            Row[] rows = _df.Select(udf(_df["info"])).Collect().ToArray();
-            Assert.Equal(3, rows.Length);
+                Row[] rows = _df.Select(udf(_df["info1"])).Collect().ToArray();
+                Assert.Equal(3, rows.Length);
 
-            var expected = new[] { "Burdwan,Paschimbanga", "Los Angeles,California", "Seattle," };
-            string[] actual = rows.Select(x => x[0].ToString()).ToArray();
-            Assert.Equal(expected, actual);
+                var expected = new[] { "Burdwan", "Los Angeles", "Seattle" };
+                string[] actual = rows.Select(x => x[0].ToString()).ToArray();
+                Assert.Equal(expected, actual);
+            }
+
+            // Multiple Rows
+            {
+                Func<Column, Column, Column, Column> udf = Udf<Row, Row, string, string>(
+                    (row1, row2, str) =>
+                    {
+                        string city = row1.GetAs<string>("city");
+                        string state = row2.GetAs<string>("state");
+                        return $"{str}:{city},{state}";
+                    });
+
+                Row[] rows = _df
+                    .Select(udf(_df["info1"], _df["info2"], _df["name"]))
+                    .Collect()
+                    .ToArray();
+                Assert.Equal(3, rows.Length);
+
+                var expected = new[] {
+                    "Michael:Burdwan,Paschimbanga",
+                    "Andy:Los Angeles,California",
+                    "Justin:Seattle,Washington" };
+                string[] actual = rows.Select(x => x[0].ToString()).ToArray();
+                Assert.Equal(expected, actual);
+            }
+
+            // Nested Row
+            {
+                Func<Column, Column> udf = Udf<Row, string>(
+                    (row) =>
+                    {
+                        Row outerCol = row.GetAs<Row>("company");
+                        return outerCol.GetAs<string>("job");
+                    });
+
+                Row[] rows = _df.Select(udf(_df["info3"])).Collect().ToArray();
+                Assert.Equal(3, rows.Length);
+
+                var expected = new[] { "Developer", "Developer", "Developer" };
+                string[] actual = rows.Select(x => x[0].ToString()).ToArray();
+                Assert.Equal(expected, actual);
+            }
         }
 
         /// <summary>
@@ -168,14 +207,40 @@ public void TestUdfWithReturnAsRowType()
                 Func<Column, Column> udf = Udf<string>(
                     str => new GenericRow(new object[] { 1, "abc" }), schema);
 
-                Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray();
+                Row[] rows = _df.Select(udf(_df["name"]).As("col")).Collect().ToArray();
+                Assert.Equal(3, rows.Length);
+                foreach (Row row in rows)
+                {
+                    Assert.Equal(1, row.Size());
+                    Row outerCol = row.GetAs<Row>("col");
+                    Assert.Equal(2, outerCol.Size());
+                    Assert.Equal(1, outerCol.GetAs<int>("col1"));
+                    Assert.Equal("abc", outerCol.GetAs<string>("col2"));
+                }
+            }
+
+            // Generic row is a part of top-level column.
+            {
+                var schema = new StructType(new[]
+                {
+                    new StructField("col1", new IntegerType())
+                });
+                Func<Column, Column> udf = Udf<string>(
+                    str => new GenericRow(new object[] { 111 }), schema);
+
+                Column nameCol = _df["name"];
+                Row[] rows = _df.Select(udf(nameCol).As("col"), nameCol).Collect().ToArray();
                 Assert.Equal(3, rows.Length);
                 foreach (Row row in rows)
                 {
                     Assert.Equal(2, row.Size());
-                    Assert.Equal(1, row.GetAs<int>("col1"));
-                    Assert.Equal("abc", row.GetAs<string>("col2"));
+                    Row col1 = row.GetAs<Row>("col");
+                    Assert.Equal(1, col1.Size());
+                    Assert.Equal(111, col1.GetAs<int>("col1"));
+
+                    string col2 = row.GetAs<string>("name");
+                    Assert.NotEmpty(col2);
                 }
             }
 
@@ -211,21 +276,23 @@ public void TestUdfWithReturnAsRowType()
                         }),
                     schema);
 
-                Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray();
+                Row[] rows = _df.Select(udf(_df["name"]).As("col")).Collect().ToArray();
                 Assert.Equal(3, rows.Length);
                 foreach (Row row in rows)
                 {
-                    Assert.Equal(3, row.Size());
-                    Assert.Equal(1, row.GetAs<int>("col1"));
+                    Assert.Equal(1, row.Size());
+                    Row outerCol = row.GetAs<Row>("col");
+                    Assert.Equal(3, outerCol.Size());
+                    Assert.Equal(1, outerCol.GetAs<int>("col1"));
                     Assert.Equal(
                         new Row(new object[] { 1 }, subSchema1),
-                        row.GetAs<Row>("col2"));
+                        outerCol.GetAs<Row>("col2"));
                     Assert.Equal(
                         new Row(
                             new object[] { "abc", new Row(new object[] { 10 }, subSchema1) },
                             subSchema2),
-                        row.GetAs<Row>("col3"));
+                        outerCol.GetAs<Row>("col3"));
                 }
             }
         }
diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs
index 82fcee8fe..a27fa88f9 100644
--- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs
+++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs
@@ -135,8 +135,17 @@ protected override CommandExecutorStat ExecuteCore(
 
                 for (int i = 0; i < inputRows.Length; ++i)
                 {
+                    object row = inputRows[i];
+                    // The following can happen if a UDF takes Row object(s).
+                    // The JVM Spark side sends a Row object that wraps all the columns used
+                    // in the UDF; thus, it is normalized below (the extra layer is removed).
+                    if (row is RowConstructor rowConstructor)
+                    {
+                        row = rowConstructor.GetRow().Values;
+                    }
+
                     // Split id is not used for SQL UDFs, so 0 is passed.
-                    outputRows.Add(commandRunner.Run(0, inputRows[i]));
+                    outputRows.Add(commandRunner.Run(0, row));
                 }
 
                 // The initial (estimated) buffer size for pickling rows is set to the size of
diff --git a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs
index c66687219..f48545ea6 100644
--- a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs
+++ b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs
@@ -2,7 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using System;
 using System.Collections.Generic;
 using System.IO;
 using Microsoft.Spark.Interop.Ipc;
@@ -34,24 +33,7 @@ public IEnumerable<Row> Collect(ISocketWrapper socket)
 
             foreach (object unpickled in unpickledObjects)
             {
-                // Unpickled object can be either a RowConstructor object (not materialized),
-                // or a Row object (materialized). Refer to RowConstruct.construct() to see how
-                // Row objects are unpickled.
-                switch (unpickled)
-                {
-                    case RowConstructor rc:
-                        yield return rc.GetRow();
-                        break;
-
-                    case object[] objs when objs.Length == 1 && (objs[0] is Row row):
-                        yield return row;
-                        break;
-
-                    default:
-                        throw new NotSupportedException(
-                            string.Format("Unpickle type {0} is not supported",
-                                unpickled.GetType()));
-                }
+                yield return (unpickled as RowConstructor).GetRow();
             }
         }
     }
diff --git a/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs b/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs
index c01746f95..083e94cb5 100644
--- a/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs
+++ b/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs
@@ -65,15 +65,6 @@ public object construct(object[] args)
                 s_schemaCache = new Dictionary<string, StructType>();
             }
 
-            // When a row is ready to be materialized, then construct() is called
-            // on the RowConstructor which represents the row.
-            if ((args.Length == 1) && (args[0] is RowConstructor rowConstructor))
-            {
-                // Construct the Row and return args containing the Row.
-                args[0] = rowConstructor.GetRow();
-                return args;
-            }
-
             // Return a new RowConstructor where the args either represent the
             // schema or the row data. The parent becomes important when calling
             // GetRow() on the RowConstructor containing the row data.
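
Usage note (not part of the patch): the net effect of this change is that a .NET for Spark UDF can take struct columns as Row arguments, because the worker now unwraps the RowConstructor that the JVM side pickles around the input columns. Below is a minimal, self-contained sketch of that usage; it assumes a working local SparkSession and the people.json layout from the test resources above, and it mirrors the "Single Row" test case rather than reproducing code from this diff.

using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

class RowUdfExample
{
    static void Main()
    {
        // Assumes the usual spark-submit/worker setup for .NET for Spark.
        SparkSession spark = SparkSession.Builder().GetOrCreate();

        // Assumes people.json matches the test fixture above, where "info1"
        // is a struct column containing a "city" field.
        DataFrame df = spark.Read().Json("people.json");

        // The UDF receives the struct column as a Row and reads one of its
        // fields by name.
        Func<Column, Column> cityUdf = Udf<Row, string>(
            row => row.GetAs<string>("city"));

        df.Select(cityUdf(df["info1"]).As("city")).Show();
    }
}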