From 764ceab7c8faf3965b5431137935551f8591ddfd Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Thu, 19 Dec 2019 03:07:18 -0800 Subject: [PATCH 01/28] init --- .../Command/SqlCommandExecutor.cs | 2 +- src/csharp/Microsoft.Spark/Sql/Functions.cs | 193 +++++++++++++++++- .../Microsoft.Spark/Utils/PythonSerDe.cs | 55 ++++- 3 files changed, 246 insertions(+), 4 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 82fcee8fe..f1b11903a 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -165,7 +165,7 @@ private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) if (s_outputBuffer == null) s_outputBuffer = new byte[sizeHint]; - Pickler pickler = s_pickler ?? (s_pickler = new Pickler(false)); + Pickler pickler = s_pickler ?? (s_pickler = PythonSerDe.CreatePickler()); pickler.dumps(rows, ref s_outputBuffer, out int bytesWritten); if (bytesWritten <= 0) diff --git a/src/csharp/Microsoft.Spark/Sql/Functions.cs b/src/csharp/Microsoft.Spark/Sql/Functions.cs index 260a38352..e57644c68 100644 --- a/src/csharp/Microsoft.Spark/Sql/Functions.cs +++ b/src/csharp/Microsoft.Spark/Sql/Functions.cs @@ -8,6 +8,7 @@ using Microsoft.Spark.Interop; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql.Expressions; +using Microsoft.Spark.Sql.Types; using Microsoft.Spark.Utils; namespace Microsoft.Spark.Sql @@ -3797,6 +3798,188 @@ public static Func(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf)).Apply10; } + /// Creates a UDF from the specified delegate. + /// The UDF function implementation. + /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. 
+ /// + public static Func Udf(Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply0; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// The UDF function implementation. + /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. + /// + public static Func Udf(Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply1; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// Specifies the type of the second argument to the UDF. + /// The UDF function implementation. + /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. + /// + public static Func Udf(Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply2; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// Specifies the type of the second argument to the UDF. + /// Specifies the type of the third argument to the UDF. + /// The UDF function implementation. + /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. + /// + public static Func Udf( + Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply3; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// Specifies the type of the second argument to the UDF. + /// Specifies the type of the third argument to the UDF. + /// Specifies the type of the fourth argument to the UDF. + /// The UDF function implementation. 
+ /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. + /// + public static Func Udf( + Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply4; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// Specifies the type of the second argument to the UDF. + /// Specifies the type of the third argument to the UDF. + /// Specifies the type of the fourth argument to the UDF. + /// Specifies the type of the fifth argument to the UDF. + /// The UDF function implementation. + /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. + /// + public static Func Udf( + Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply5; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// Specifies the type of the second argument to the UDF. + /// Specifies the type of the third argument to the UDF. + /// Specifies the type of the fourth argument to the UDF. + /// Specifies the type of the fifth argument to the UDF. + /// Specifies the type of the sixth argument to the UDF. + /// The UDF function implementation. + /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. + /// + public static Func Udf( + Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply6; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// Specifies the type of the second argument to the UDF. + /// Specifies the type of the third argument to the UDF. + /// Specifies the type of the fourth argument to the UDF. + /// Specifies the type of the fifth argument to the UDF. 
+ /// Specifies the type of the sixth argument to the UDF. + /// Specifies the type of the seventh argument to the UDF. + /// The UDF function implementation. + /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. + /// + public static Func Udf( + Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply7; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// Specifies the type of the second argument to the UDF. + /// Specifies the type of the third argument to the UDF. + /// Specifies the type of the fourth argument to the UDF. + /// Specifies the type of the fifth argument to the UDF. + /// Specifies the type of the sixth argument to the UDF. + /// Specifies the type of the seventh argument to the UDF. + /// Specifies the type of the eighth argument to the UDF. + /// The UDF function implementation. + /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. + /// + public static Func Udf( + Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply8; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// Specifies the type of the second argument to the UDF. + /// Specifies the type of the third argument to the UDF. + /// Specifies the type of the fourth argument to the UDF. + /// Specifies the type of the fifth argument to the UDF. + /// Specifies the type of the sixth argument to the UDF. + /// Specifies the type of the seventh argument to the UDF. + /// Specifies the type of the eighth argument to the UDF. + /// Specifies the type of the ninth argument to the UDF. + /// The UDF function implementation. 
+ /// Schema associated with this row + /// A delegate that when invoked will return a for the result of the UDF. + public static Func Udf( + Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply9; + } + + /// Creates a UDF from the specified delegate. + /// Specifies the type of the first argument to the UDF. + /// Specifies the type of the second argument to the UDF. + /// Specifies the type of the third argument to the UDF. + /// Specifies the type of the fourth argument to the UDF. + /// Specifies the type of the fifth argument to the UDF. + /// Specifies the type of the sixth argument to the UDF. + /// Specifies the type of the seventh argument to the UDF. + /// Specifies the type of the eighth argument to the UDF. + /// Specifies the type of the ninth argument to the UDF. + /// Specifies the type of the tenth argument to the UDF. + /// The UDF function implementation. + /// Schema associated with this row + /// + /// A delegate that returns a for the result of the UDF. + /// + public static Func Udf( + Func udf, StructType returnType) + { + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply10; + } + /// Creates a Vector UDF from the specified delegate. /// Specifies the type of the first argument to the UDF. /// Specifies the return type of the UDF. 
@@ -4071,6 +4254,11 @@ private static UserDefinedFunction CreateUdf(string name, Delegate exec return CreateUdf(name, execute, UdfUtils.PythonEvalType.SQL_BATCHED_UDF); } + private static UserDefinedFunction CreateUdf(string name, Delegate execute, StructType returnType) + { + return CreateUdf(name, execute, UdfUtils.PythonEvalType.SQL_BATCHED_UDF, returnType); + } + private static UserDefinedFunction CreateVectorUdf(string name, Delegate execute) { return CreateUdf(name, execute, UdfUtils.PythonEvalType.SQL_SCALAR_PANDAS_UDF); @@ -4079,7 +4267,8 @@ private static UserDefinedFunction CreateVectorUdf(string name, Delegat private static UserDefinedFunction CreateUdf( string name, Delegate execute, - UdfUtils.PythonEvalType evalType) + UdfUtils.PythonEvalType evalType, + StructType returnType = null) { return UserDefinedFunction.Create( name, @@ -4088,7 +4277,7 @@ private static UserDefinedFunction CreateUdf( CommandSerDe.SerializedMode.Row, CommandSerDe.SerializedMode.Row), evalType, - UdfUtils.GetReturnType(typeof(TResult))); + returnType.Json ?? UdfUtils.GetReturnType(typeof(TResult))); } private static Column ApplyFunction(string funcName) diff --git a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs index d5a374d55..a3461300c 100644 --- a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs +++ b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs @@ -6,6 +6,7 @@ using System.Buffers; using System.Diagnostics; using System.IO; +using System.Text; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; using Razorvine.Pickle; @@ -56,7 +57,7 @@ internal static object[] GetUnpickledObjects(Stream stream, int messageLength) var unpickler = new Unpickler(); object unpickledItems = unpickler.loads( - new ReadOnlyMemory(buffer, 0, messageLength), + new ReadOnlyMemory(buffer, 0, messageLength), stackCapacity: 102); // Spark sends batches of 100 rows, and +2 is for markers. 
s_rowConstructor.Reset(); Debug.Assert(unpickledItems != null); @@ -67,5 +68,57 @@ internal static object[] GetUnpickledObjects(Stream stream, int messageLength) ArrayPool.Shared.Return(buffer); } } + + /// + /// Custom pickler for GenericRow objects. + /// Refer to + /// spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala + /// + internal class GenericRowPickler : IObjectPickler + { + private readonly string _module = "pyspark.sql.types"; + + public void Register() + { + Pickler.registerCustomPickler(GetType(), this); + Pickler.registerCustomPickler(typeof(GenericRow), this); + } + + public void pickle(object o, Stream stream, Pickler currentPickler) + { + if (o.Equals(this)) + { + SerDe.Write(stream, Opcodes.GLOBAL); + SerDe.Write(stream, Encoding.UTF8.GetBytes( + $"{_module}\n_create_row_inbound_converter\n")); + } + else + { + if (!(o is GenericRow genericRow)) + { + throw new InvalidOperationException("A GenericRow object is expected."); + } + + currentPickler.save(this); + SerDe.Write(stream, Opcodes.TUPLE1); + SerDe.Write(stream, Opcodes.REDUCE); + + SerDe.Write(stream, Opcodes.MARK); + for (int i = 0; i < genericRow.Size(); ++i) + { + currentPickler.save(genericRow.Get(i)); + } + + SerDe.Write(stream, Opcodes.TUPLE); + SerDe.Write(stream, Opcodes.REDUCE); + } + } + } + + internal static Pickler CreatePickler() + { + new GenericRowPickler().Register(); + return new Pickler(); + } } } From 47fdb374b89da59c9c25da92b8d6e26ab43f844e Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Mon, 23 Dec 2019 17:30:48 -0800 Subject: [PATCH 02/28] add tests --- .../UdfTests/UdfComplexTypesTests.cs | 28 ++++++------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 43f189b5e..88535dde6 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ 
b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -158,25 +158,15 @@ public void TestUdfWithRowType() [Fact] public void TestUdfWithReturnAsRowType() { - // UDF with return as RowType throws a following exception: - // Unhandled Exception: System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. - // --->System.ArgumentException: System.Object is not supported. - // at Microsoft.Spark.Utils.UdfUtils.GetReturnType(Type type) in Microsoft.Spark\Utils\UdfUtils.cs:line 142 - // at Microsoft.Spark.Utils.UdfUtils.GetReturnType(Type type) in Microsoft.Spark\Utils\UdfUtils.cs:line 136 - // at Microsoft.Spark.Sql.Functions.CreateUdf[TResult](String name, Delegate execute, PythonEvalType evalType) in Microsoft.Spark\Sql\Functions.cs:line 4053 - // at Microsoft.Spark.Sql.Functions.CreateUdf[TResult](String name, Delegate execute) in Microsoft.Spark\Sql\Functions.cs:line 4040 - // at Microsoft.Spark.Sql.Functions.Udf[T, TResult](Func`2 udf) in Microsoft.Spark\Sql\Functions.cs:line 3607 - Assert.Throws(() => Udf( - (str) => - { - var structFields = new List() - { - new StructField("name", new StringType()), - }; - var schema = new StructType(structFields); - var row = new Row(new object[] { str }, schema); - return row; - })); + var schema = new StructType(new[] + { + new StructField("name", new StringType()) + }); + Func udf = Udf( + str => new GenericRow(new object[] { "Hello" + str }), schema); + + Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray(); + Assert.Equal(3, rows.Length); } } } From 7c965ee19c526cdafec14777c2f77a64b4fda306 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Fri, 3 Jan 2020 00:30:48 -0800 Subject: [PATCH 03/28] change pickling method --- .../Command/SqlCommandExecutor.cs | 7 +- src/csharp/Microsoft.Spark/Sql/GenericRow.cs | 103 ++++++++++++++++++ .../Microsoft.Spark/Utils/PythonSerDe.cs | 53 --------- 3 files changed, 109 insertions(+), 54 deletions(-) create mode 100644 
src/csharp/Microsoft.Spark/Sql/GenericRow.cs diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index f1b11903a..0921b43f7 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -165,7 +165,12 @@ private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) if (s_outputBuffer == null) s_outputBuffer = new byte[sizeHint]; - Pickler pickler = s_pickler ?? (s_pickler = PythonSerDe.CreatePickler()); + if (rows.FirstOrDefault() is GenericRow) + { + rows = rows.Select(r => (object)(r as GenericRow).Values).AsEnumerable(); + } + + Pickler pickler = s_pickler ?? (s_pickler = new Pickler(false)); pickler.dumps(rows, ref s_outputBuffer, out int bytesWritten); if (bytesWritten <= 0) diff --git a/src/csharp/Microsoft.Spark/Sql/GenericRow.cs b/src/csharp/Microsoft.Spark/Sql/GenericRow.cs new file mode 100644 index 000000000..1e623d9a4 --- /dev/null +++ b/src/csharp/Microsoft.Spark/Sql/GenericRow.cs @@ -0,0 +1,103 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Microsoft.Spark.Sql +{ + /// + /// Represents a row object in RDD, equivalent to GenericRow in Spark. + /// + public sealed class GenericRow + { + /// + /// Constructor for the GenericRow class. + /// + /// Column values for a row + public GenericRow(object[] values) + { + Values = values; + } + + /// + /// Values representing this row. + /// + public object[] Values { get; } + + /// + /// Returns the number of columns in this row. + /// + /// Number of columns in this row + public int Size() => Values.Length; + + /// + /// Returns the column value at the given index. 
+ /// + /// Index to look up + /// A column value + public object this[int index] => Get(index); + + /// + /// Returns the column value at the given index. + /// + /// Index to look up + /// A column value + public object Get(int index) + { + if (index >= Size()) + { + throw new IndexOutOfRangeException($"index ({index}) >= column counts ({Size()})"); + } + else if (index < 0) + { + throw new IndexOutOfRangeException($"index ({index}) < 0)"); + } + + return Values[index]; + } + + /// + /// Returns the string version of this row. + /// + /// String version of this row + public override string ToString() + { + var cols = new List(); + foreach (object item in Values) + { + cols.Add(item?.ToString() ?? string.Empty); + } + + return $"[{(string.Join(",", cols.ToArray()))}]"; + } + + /// + /// Returns the column value at the given index, as a type T. + /// TODO: If the original type is "long" and its value can be + /// fit into the "int", Pickler will serialize the value as int. + /// Since the value is boxed, will throw an exception. + /// + /// Type to convert to + /// Index to look up + /// A column value as a type T + public T GetAs(int index) => (T)Get(index); + + /// + /// Checks if the given object is same as the current object. + /// + /// Other object to compare against + /// True if the other object is equal. + public override bool Equals(object obj) => + ReferenceEquals(this, obj) || + ((obj is GenericRow row) && Values.SequenceEqual(row.Values)); + + /// + /// Returns the hash code of the current object. 
+ /// + /// The hash code of the current object + public override int GetHashCode() => base.GetHashCode(); + } +} diff --git a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs index a3461300c..ffbf6ca16 100644 --- a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs +++ b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs @@ -6,7 +6,6 @@ using System.Buffers; using System.Diagnostics; using System.IO; -using System.Text; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; using Razorvine.Pickle; @@ -68,57 +67,5 @@ internal static object[] GetUnpickledObjects(Stream stream, int messageLength) ArrayPool.Shared.Return(buffer); } } - - /// - /// Custom pickler for GenericRow objects. - /// Refer to - /// spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala - /// - internal class GenericRowPickler : IObjectPickler - { - private readonly string _module = "pyspark.sql.types"; - - public void Register() - { - Pickler.registerCustomPickler(GetType(), this); - Pickler.registerCustomPickler(typeof(GenericRow), this); - } - - public void pickle(object o, Stream stream, Pickler currentPickler) - { - if (o.Equals(this)) - { - SerDe.Write(stream, Opcodes.GLOBAL); - SerDe.Write(stream, Encoding.UTF8.GetBytes( - $"{_module}\n_create_row_inbound_converter\n")); - } - else - { - if (!(o is GenericRow genericRow)) - { - throw new InvalidOperationException("A GenericRow object is expected."); - } - - currentPickler.save(this); - SerDe.Write(stream, Opcodes.TUPLE1); - SerDe.Write(stream, Opcodes.REDUCE); - - SerDe.Write(stream, Opcodes.MARK); - for (int i = 0; i < genericRow.Size(); ++i) - { - currentPickler.save(genericRow.Get(i)); - } - - SerDe.Write(stream, Opcodes.TUPLE); - SerDe.Write(stream, Opcodes.REDUCE); - } - } - } - - internal static Pickler CreatePickler() - { - new GenericRowPickler().Register(); - return new Pickler(); - } } } From f0f18583812f18244d7e968337c00fddfee24401 Mon Sep 17 
00:00:00 2001 From: Elva Liu Date: Fri, 3 Jan 2020 18:21:39 -0800 Subject: [PATCH 04/28] add null check --- src/csharp/Microsoft.Spark/Sql/Functions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Microsoft.Spark/Sql/Functions.cs b/src/csharp/Microsoft.Spark/Sql/Functions.cs index e57644c68..3099b60fb 100644 --- a/src/csharp/Microsoft.Spark/Sql/Functions.cs +++ b/src/csharp/Microsoft.Spark/Sql/Functions.cs @@ -4277,7 +4277,7 @@ private static UserDefinedFunction CreateUdf( CommandSerDe.SerializedMode.Row, CommandSerDe.SerializedMode.Row), evalType, - returnType.Json ?? UdfUtils.GetReturnType(typeof(TResult))); + returnType?.Json ?? UdfUtils.GetReturnType(typeof(TResult))); } private static Column ApplyFunction(string funcName) From c729a2c1637e26cdf7adc12c05c96f1f2fe5175f Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Mon, 6 Jan 2020 16:33:42 -0800 Subject: [PATCH 05/28] add separate createudf fun --- src/csharp/Microsoft.Spark/Sql/Functions.cs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Sql/Functions.cs b/src/csharp/Microsoft.Spark/Sql/Functions.cs index 3099b60fb..9ea0e8a33 100644 --- a/src/csharp/Microsoft.Spark/Sql/Functions.cs +++ b/src/csharp/Microsoft.Spark/Sql/Functions.cs @@ -4256,7 +4256,7 @@ private static UserDefinedFunction CreateUdf(string name, Delegate exec private static UserDefinedFunction CreateUdf(string name, Delegate execute, StructType returnType) { - return CreateUdf(name, execute, UdfUtils.PythonEvalType.SQL_BATCHED_UDF, returnType); + return CreateUdfWithReturnType(name, execute, UdfUtils.PythonEvalType.SQL_BATCHED_UDF, returnType); } private static UserDefinedFunction CreateVectorUdf(string name, Delegate execute) @@ -4265,10 +4265,25 @@ private static UserDefinedFunction CreateVectorUdf(string name, Delegat } private static UserDefinedFunction CreateUdf( + string name, + Delegate execute, + UdfUtils.PythonEvalType evalType) + { + return 
UserDefinedFunction.Create( + name, + CommandSerDe.Serialize( + execute, + CommandSerDe.SerializedMode.Row, + CommandSerDe.SerializedMode.Row), + evalType, + UdfUtils.GetReturnType(typeof(TResult))); + } + + private static UserDefinedFunction CreateUdfWithReturnType( string name, Delegate execute, UdfUtils.PythonEvalType evalType, - StructType returnType = null) + StructType returnType) { return UserDefinedFunction.Create( name, @@ -4277,7 +4292,7 @@ private static UserDefinedFunction CreateUdf( CommandSerDe.SerializedMode.Row, CommandSerDe.SerializedMode.Row), evalType, - returnType?.Json ?? UdfUtils.GetReturnType(typeof(TResult))); + returnType.Json); } private static Column ApplyFunction(string funcName) From c2a426f2faf80a1ee7def0ea4d7eb946f6a9534a Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Mon, 6 Jan 2020 22:23:45 -0800 Subject: [PATCH 06/28] change test --- .../UdfTests/UdfComplexTypesTests.cs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 88535dde6..193d7ab7f 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -160,13 +160,17 @@ public void TestUdfWithReturnAsRowType() { var schema = new StructType(new[] { - new StructField("name", new StringType()) + new StructField("col1", new IntegerType()), + new StructField("col2", new StringType()) }); Func udf = Udf( - str => new GenericRow(new object[] { "Hello" + str }), schema); + str => new GenericRow(new object[] { 1, "abc" }), schema); - Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray(); - Assert.Equal(3, rows.Length); + Assert.Throws( + () => _df.Select(udf(_df["name"])).Collect().ToArray()); + + // Show() works here. See the example below. 
+ _df.Select(udf(_df["name"])).Show(); } } } From 28f438a363ff93ff3d92f437c79611e0d11f8cf2 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Tue, 7 Jan 2020 00:44:59 -0800 Subject: [PATCH 07/28] add comments --- .../UdfTests/UdfComplexTypesTests.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 193d7ab7f..0cd5cc219 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -166,6 +166,12 @@ public void TestUdfWithReturnAsRowType() Func udf = Udf( str => new GenericRow(new object[] { 1, "abc" }), schema); + // UDF with return as RowType throws a following exception: + // Unhandled Exception: System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. + // --->System.NullReferenceException: Object reference not set to an instance of an object. 
+ // at Microsoft.Spark.Sql.RowCollector.Collect(ISocketWrapper socket) + MoveNext() in Microsoft.Spark\Sql\RowCollector.cs:line 36 + // at Microsoft.Spark.Sql.DataFrame.GetRows(String funcName) + MoveNext() in Microsoft.Spark\Sql\DataFrame.cs:line 891 + // at Microsoft.Spark.Examples.Sql.Batch.Basic.Run(String[] args) in Microsoft.Spark.CSharp.Examples\Sql\Batch\Basic.cs:line 54 Assert.Throws( () => _df.Select(udf(_df["name"])).Collect().ToArray()); From dff5b4444e100b80431c0065633683fee6098031 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Tue, 7 Jan 2020 23:46:34 -0800 Subject: [PATCH 08/28] change CreateUdf --- src/csharp/Microsoft.Spark/Sql/Functions.cs | 53 ++++++++++----------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Sql/Functions.cs b/src/csharp/Microsoft.Spark/Sql/Functions.cs index 9ea0e8a33..79c523e16 100644 --- a/src/csharp/Microsoft.Spark/Sql/Functions.cs +++ b/src/csharp/Microsoft.Spark/Sql/Functions.cs @@ -3806,7 +3806,7 @@ public static Func public static Func Udf(Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply0; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply0; } /// Creates a UDF from the specified delegate. @@ -3818,7 +3818,7 @@ public static Func Udf(Func udf, StructType returnType) /// public static Func Udf(Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply1; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply1; } /// Creates a UDF from the specified delegate. 
@@ -3831,7 +3831,7 @@ public static Func Udf(Func udf, StructType re /// public static Func Udf(Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply2; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply2; } /// Creates a UDF from the specified delegate. @@ -3846,7 +3846,7 @@ public static Func Udf(Func public static Func Udf( Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply3; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply3; } /// Creates a UDF from the specified delegate. @@ -3862,7 +3862,7 @@ public static Func Udf( public static Func Udf( Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply4; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply4; } /// Creates a UDF from the specified delegate. @@ -3879,7 +3879,7 @@ public static Func Udf( public static Func Udf( Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply5; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply5; } /// Creates a UDF from the specified delegate. @@ -3897,7 +3897,7 @@ public static Func Udf Udf( Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply6; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply6; } /// Creates a UDF from the specified delegate. 
@@ -3916,7 +3916,7 @@ public static Func Udf Udf( Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply7; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply7; } /// Creates a UDF from the specified delegate. @@ -3936,7 +3936,7 @@ public static Func Udf( Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply8; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply8; } /// Creates a UDF from the specified delegate. @@ -3955,7 +3955,7 @@ public static Func Udf( Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply9; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply9; } /// Creates a UDF from the specified delegate. @@ -3977,7 +3977,7 @@ public static Func Udf( Func udf, StructType returnType) { - return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply10; + return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply10; } /// Creates a Vector UDF from the specified delegate. 
@@ -4254,9 +4254,9 @@ private static UserDefinedFunction CreateUdf(string name, Delegate exec return CreateUdf(name, execute, UdfUtils.PythonEvalType.SQL_BATCHED_UDF); } - private static UserDefinedFunction CreateUdf(string name, Delegate execute, StructType returnType) + private static UserDefinedFunction CreateUdf(string name, Delegate execute, StructType returnType) { - return CreateUdfWithReturnType(name, execute, UdfUtils.PythonEvalType.SQL_BATCHED_UDF, returnType); + return CreateUdf(name, execute, UdfUtils.PythonEvalType.SQL_BATCHED_UDF, returnType); } private static UserDefinedFunction CreateVectorUdf(string name, Delegate execute) @@ -4267,23 +4267,21 @@ private static UserDefinedFunction CreateVectorUdf(string name, Delegat private static UserDefinedFunction CreateUdf( string name, Delegate execute, - UdfUtils.PythonEvalType evalType) - { - return UserDefinedFunction.Create( - name, - CommandSerDe.Serialize( - execute, - CommandSerDe.SerializedMode.Row, - CommandSerDe.SerializedMode.Row), - evalType, - UdfUtils.GetReturnType(typeof(TResult))); - } + UdfUtils.PythonEvalType evalType) => + CreateUdf(name, execute, evalType, UdfUtils.GetReturnType(typeof(TResult))); + + private static UserDefinedFunction CreateUdf( + string name, + Delegate execute, + UdfUtils.PythonEvalType evalType, + StructType returnType) => + CreateUdf(name, execute, evalType, returnType.Json); - private static UserDefinedFunction CreateUdfWithReturnType( + private static UserDefinedFunction CreateUdf( string name, Delegate execute, UdfUtils.PythonEvalType evalType, - StructType returnType) + string returnType) { return UserDefinedFunction.Create( name, @@ -4291,8 +4289,7 @@ private static UserDefinedFunction CreateUdfWithReturnType( execute, CommandSerDe.SerializedMode.Row, CommandSerDe.SerializedMode.Row), - evalType, - returnType.Json); + evalType, returnType); } private static Column ApplyFunction(string funcName) From 68f8cc5f2df31217e051d28ccf3b8a5584d44f84 Mon Sep 17 00:00:00 
2001 From: Elva Liu Date: Wed, 8 Jan 2020 00:12:45 -0800 Subject: [PATCH 09/28] edit test --- .../UdfTests/UdfComplexTypesTests.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 0cd5cc219..462477c66 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -172,11 +172,13 @@ public void TestUdfWithReturnAsRowType() // at Microsoft.Spark.Sql.RowCollector.Collect(ISocketWrapper socket) + MoveNext() in Microsoft.Spark\Sql\RowCollector.cs:line 36 // at Microsoft.Spark.Sql.DataFrame.GetRows(String funcName) + MoveNext() in Microsoft.Spark\Sql\DataFrame.cs:line 891 // at Microsoft.Spark.Examples.Sql.Batch.Basic.Run(String[] args) in Microsoft.Spark.CSharp.Examples\Sql\Batch\Basic.cs:line 54 - Assert.Throws( - () => _df.Select(udf(_df["name"])).Collect().ToArray()); + // Assert.Throws( + // () => _df.Select(udf(_df["name"])).Collect().ToArray()); + + _df.Select(udf(_df["name"])).Collect().ToArray(); // Show() works here. See the example below. - _df.Select(udf(_df["name"])).Show(); + // _df.Select(udf(_df["name"])).Show(); } } } From c315231bbfcc0c8dd7178bafbab64b588c74210b Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Wed, 8 Jan 2020 14:12:19 -0800 Subject: [PATCH 10/28] change RowConstructor --- src/csharp/Microsoft.Spark/Sql/RowConstructor.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs b/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs index c01746f95..af596bc77 100644 --- a/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs +++ b/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs @@ -69,9 +69,7 @@ public object construct(object[] args) // on the RowConstructor which represents the row. 
if ((args.Length == 1) && (args[0] is RowConstructor rowConstructor)) { - // Construct the Row and return args containing the Row. - args[0] = rowConstructor.GetRow(); - return args; + return rowConstructor.GetRow(); } // Return a new RowConstructor where the args either represent the From c8c55b4107c0fc0240fe7baa71f5cad18003edcb Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Wed, 8 Jan 2020 14:54:41 -0800 Subject: [PATCH 11/28] test content and modify RowCollector --- .../UdfTests/UdfComplexTypesTests.cs | 20 ++++++++----------- .../Microsoft.Spark/Sql/RowCollector.cs | 16 ++++++++++++++- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 462477c66..c7d76c0c9 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -166,19 +166,15 @@ public void TestUdfWithReturnAsRowType() Func udf = Udf( str => new GenericRow(new object[] { 1, "abc" }), schema); - // UDF with return as RowType throws a following exception: - // Unhandled Exception: System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation. - // --->System.NullReferenceException: Object reference not set to an instance of an object. 
- // at Microsoft.Spark.Sql.RowCollector.Collect(ISocketWrapper socket) + MoveNext() in Microsoft.Spark\Sql\RowCollector.cs:line 36 - // at Microsoft.Spark.Sql.DataFrame.GetRows(String funcName) + MoveNext() in Microsoft.Spark\Sql\DataFrame.cs:line 891 - // at Microsoft.Spark.Examples.Sql.Batch.Basic.Run(String[] args) in Microsoft.Spark.CSharp.Examples\Sql\Batch\Basic.cs:line 54 - // Assert.Throws( - // () => _df.Select(udf(_df["name"])).Collect().ToArray()); - - _df.Select(udf(_df["name"])).Collect().ToArray(); + Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray(); + Assert.Equal(3, rows.Length); - // Show() works here. See the example below. - // _df.Select(udf(_df["name"])).Show(); + foreach (Row row in rows) + { + Assert.Equal(2, row.Size()); + Assert.Equal(1, row[0]); + Assert.Equal("abc", row[1]); + } } } } diff --git a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs index f48545ea6..cbf2a223f 100644 --- a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs +++ b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. 
+using System; using System.Collections.Generic; using System.IO; using Microsoft.Spark.Interop.Ipc; @@ -33,7 +34,20 @@ public IEnumerable Collect(ISocketWrapper socket) foreach (object unpickled in unpickledObjects) { - yield return (unpickled as RowConstructor).GetRow(); + switch (unpickled) + { + case RowConstructor rc: + yield return rc.GetRow(); + break; + + case Row row: + yield return row; + break; + + default: + throw new NotSupportedException( + string.Format($"Unpickle type {unpickled.GetType()} is not supported")); + } } } } From ff531ab41e5f048ade22f1e08d4de566ce63ff8a Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Wed, 8 Jan 2020 15:18:25 -0800 Subject: [PATCH 12/28] resolve comments --- src/csharp/Microsoft.Spark/Sql/Functions.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Sql/Functions.cs b/src/csharp/Microsoft.Spark/Sql/Functions.cs index 79c523e16..d79531f58 100644 --- a/src/csharp/Microsoft.Spark/Sql/Functions.cs +++ b/src/csharp/Microsoft.Spark/Sql/Functions.cs @@ -3829,7 +3829,8 @@ public static Func Udf(Func udf, StructType re /// /// A delegate that returns a for the result of the UDF. 
/// - public static Func Udf(Func udf, StructType returnType) + public static Func Udf( + Func udf, StructType returnType) { return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply2; } @@ -4289,7 +4290,8 @@ private static UserDefinedFunction CreateUdf( execute, CommandSerDe.SerializedMode.Row, CommandSerDe.SerializedMode.Row), - evalType, returnType); + evalType, + returnType); } private static Column ApplyFunction(string funcName) From 7248cd20b9bafd686fcf904a6adb3e091eb8f82b Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Wed, 8 Jan 2020 15:31:52 -0800 Subject: [PATCH 13/28] InvalidCastException --- .../Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index c7d76c0c9..86cc6f728 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -40,7 +40,7 @@ public void TestUdfWithArrayType() // at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.SingleCommandRunner.Run(Int32 splitId, Object input) in Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 239 // at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 139 Func udf = Udf(array => string.Join(',', array)); - Assert.Throws(() => _df.Select(udf(_df["ids"])).Show()); + Assert.Throws(() => _df.Select(udf(_df["ids"])).Show()); // Currently, there is a workaround to support ArrayType using ArrayList. 
Func workingUdf = Udf( @@ -92,7 +92,7 @@ public void TestUdfWithMapType() dict => dict.Count.ToString()); DataFrame df = _df.WithColumn("NameIdsMap", Map(_df["name"], _df["ids"])); - Assert.Throws(() => df.Select(udf(df["NameIdsMap"])).Show()); + Assert.Throws(() => df.Select(udf(df["NameIdsMap"])).Show()); // Currently, there is a workaround to support MapType using Hashtable. Func workingUdf = Udf( From 5041fc09072a1119cd43925e8b1ac471d182d509 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Wed, 8 Jan 2020 15:51:44 -0800 Subject: [PATCH 14/28] revert --- .../Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 86cc6f728..c7d76c0c9 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -40,7 +40,7 @@ public void TestUdfWithArrayType() // at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.SingleCommandRunner.Run(Int32 splitId, Object input) in Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 239 // at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 139 Func udf = Udf(array => string.Join(',', array)); - Assert.Throws(() => _df.Select(udf(_df["ids"])).Show()); + Assert.Throws(() => _df.Select(udf(_df["ids"])).Show()); // Currently, there is a workaround to support ArrayType using ArrayList. 
Func workingUdf = Udf( @@ -92,7 +92,7 @@ public void TestUdfWithMapType() dict => dict.Count.ToString()); DataFrame df = _df.WithColumn("NameIdsMap", Map(_df["name"], _df["ids"])); - Assert.Throws(() => df.Select(udf(df["NameIdsMap"])).Show()); + Assert.Throws(() => df.Select(udf(df["NameIdsMap"])).Show()); // Currently, there is a workaround to support MapType using Hashtable. Func workingUdf = Udf( From 6423e69dc46eccc9431099e6438ca5c147cfde42 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Thu, 9 Jan 2020 15:20:26 -0800 Subject: [PATCH 15/28] change RowCollector --- src/csharp/Microsoft.Spark/Sql/RowCollector.cs | 13 +++++++++++-- src/csharp/Microsoft.Spark/Sql/RowConstructor.cs | 4 +++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs index cbf2a223f..7b59d87cd 100644 --- a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs +++ b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs @@ -34,19 +34,28 @@ public IEnumerable Collect(ISocketWrapper socket) foreach (object unpickled in unpickledObjects) { + // yield return (unpickled as RowConstructor).GetRow(); switch (unpickled) { case RowConstructor rc: yield return rc.GetRow(); break; - case Row row: + case object[] objs: + if ((objs.Length != 1) || !(objs[0] is Row row)) + { + throw new NotSupportedException( + string.Format("Expected single Row in unpickled type {0}", + unpickled.GetType())); + } + yield return row; break; default: throw new NotSupportedException( - string.Format($"Unpickle type {unpickled.GetType()} is not supported")); + string.Format("Unpickle type {0} is not supported", + unpickled.GetType())); } } } diff --git a/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs b/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs index af596bc77..c01746f95 100644 --- a/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs +++ b/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs @@ -69,7 +69,9 @@ public object 
construct(object[] args) // on the RowConstructor which represents the row. if ((args.Length == 1) && (args[0] is RowConstructor rowConstructor)) { - return rowConstructor.GetRow(); + // Construct the Row and return args containing the Row. + args[0] = rowConstructor.GetRow(); + return args; } // Return a new RowConstructor where the args either represent the From 95856669119a06d5581e8d338ded79e7239510e4 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Fri, 10 Jan 2020 14:53:31 -0800 Subject: [PATCH 16/28] remove GenericRow.cs --- src/csharp/Microsoft.Spark/Sql/GenericRow.cs | 103 ------------------- 1 file changed, 103 deletions(-) delete mode 100644 src/csharp/Microsoft.Spark/Sql/GenericRow.cs diff --git a/src/csharp/Microsoft.Spark/Sql/GenericRow.cs b/src/csharp/Microsoft.Spark/Sql/GenericRow.cs deleted file mode 100644 index 1e623d9a4..000000000 --- a/src/csharp/Microsoft.Spark/Sql/GenericRow.cs +++ /dev/null @@ -1,103 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System; -using System.Collections.Generic; -using System.Linq; - -namespace Microsoft.Spark.Sql -{ - /// - /// Represents a row object in RDD, equivalent to GenericRow in Spark. - /// - public sealed class GenericRow - { - /// - /// Constructor for the GenericRow class. - /// - /// Column values for a row - public GenericRow(object[] values) - { - Values = values; - } - - /// - /// Values representing this row. - /// - public object[] Values { get; } - - /// - /// Returns the number of columns in this row. - /// - /// Number of columns in this row - public int Size() => Values.Length; - - /// - /// Returns the column value at the given index. - /// - /// Index to look up - /// A column value - public object this[int index] => Get(index); - - /// - /// Returns the column value at the given index. 
- /// - /// Index to look up - /// A column value - public object Get(int index) - { - if (index >= Size()) - { - throw new IndexOutOfRangeException($"index ({index}) >= column counts ({Size()})"); - } - else if (index < 0) - { - throw new IndexOutOfRangeException($"index ({index}) < 0)"); - } - - return Values[index]; - } - - /// - /// Returns the string version of this row. - /// - /// String version of this row - public override string ToString() - { - var cols = new List(); - foreach (object item in Values) - { - cols.Add(item?.ToString() ?? string.Empty); - } - - return $"[{(string.Join(",", cols.ToArray()))}]"; - } - - /// - /// Returns the column value at the given index, as a type T. - /// TODO: If the original type is "long" and its value can be - /// fit into the "int", Pickler will serialize the value as int. - /// Since the value is boxed, will throw an exception. - /// - /// Type to convert to - /// Index to look up - /// A column value as a type T - public T GetAs(int index) => (T)Get(index); - - /// - /// Checks if the given object is same as the current object. - /// - /// Other object to compare against - /// True if the other object is equal. - public override bool Equals(object obj) => - ReferenceEquals(this, obj) || - ((obj is GenericRow row) && Values.SequenceEqual(row.Values)); - - /// - /// Returns the hash code of the current object. 
- /// - /// The hash code of the current object - public override int GetHashCode() => base.GetHashCode(); - } -} From cedefd7161cb7feb904dfaaf7dc671ed45a15828 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Tue, 21 Jan 2020 19:19:28 -0800 Subject: [PATCH 17/28] add pickling for GenericRow --- .../UdfTests/UdfComplexTypesTests.cs | 51 ++++++++++++++++--- .../Command/SqlCommandExecutor.cs | 24 ++++++++- 2 files changed, 65 insertions(+), 10 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index c7d76c0c9..05274ea39 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -158,22 +158,57 @@ public void TestUdfWithRowType() [Fact] public void TestUdfWithReturnAsRowType() { - var schema = new StructType(new[] + // Single GenericRow + var schema1 = new StructType(new[] { new StructField("col1", new IntegerType()), new StructField("col2", new StringType()) }); - Func udf = Udf( - str => new GenericRow(new object[] { 1, "abc" }), schema); + Func udf1 = Udf( + str => new GenericRow(new object[] { 1, "abc" }), schema1); - Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray(); - Assert.Equal(3, rows.Length); + Row[] rows1 = _df.Select(udf1(_df["name"])).Collect().ToArray(); + Assert.Equal(3, rows1.Length); + + foreach (Row row in rows1) + { + Assert.Equal(2, row.Size()); + Assert.Equal(1, row.GetAs("col1")); + Assert.Equal("abc", row.GetAs("col2")); + } - foreach (Row row in rows) + // Nested GenericRow + var subSchema1 = new StructType(new[] + { + new StructField("subCol1", new IntegerType()) + }); + var subSchema2 = new StructType(new[] + { + new StructField("subCol2", new StringType()) + }); + var schema2 = new StructType(new[] + { + new StructField("col1", subSchema1), + new StructField("col2", subSchema2) + }); + Func udf2 = Udf( + str => new 
GenericRow( + new object[] + { + new GenericRow(new object[] { 1 }), + new GenericRow(new object[] { "abc" }) + }), schema2); + + Row[] rows2 = _df.Select(udf2(_df["name"])).Collect().ToArray(); + Assert.Equal(3, rows2.Length); + + foreach (Row row in rows2) { Assert.Equal(2, row.Size()); - Assert.Equal(1, row[0]); - Assert.Equal("abc", row[1]); + Assert.IsType(row.Get("col1")); + Assert.IsType(row.Get("col2")); + Assert.Equal(new Row(new object[] { 1 }, subSchema1), row.GetAs("col1")); + Assert.Equal(new Row(new object[] { "abc" }, subSchema2), row.GetAs("col2")); } } } diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 0921b43f7..424e6264b 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -165,13 +165,33 @@ private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) if (s_outputBuffer == null) s_outputBuffer = new byte[sizeHint]; + // Cast GenericRow to object if needed. + var newRows = new List(); if (rows.FirstOrDefault() is GenericRow) { - rows = rows.Select(r => (object)(r as GenericRow).Values).AsEnumerable(); + for (int i = 0; i < rows.Count(); ++i) + { + object[] cols = (rows.ElementAt(i) as GenericRow).Values; + if (cols.FirstOrDefault() is GenericRow) + { + object[] newCols = new object[cols.Length]; + for (int j = 0; j < cols.Length; ++j) + { + newCols[j] = (cols[j] as GenericRow).Values; + } + newRows.Add(newCols); + } + else + { + newRows.Add(cols); + } + } } Pickler pickler = s_pickler ?? (s_pickler = new Pickler(false)); - pickler.dumps(rows, ref s_outputBuffer, out int bytesWritten); + pickler.dumps( + newRows.Count() == 0 ? 
rows : newRows, + ref s_outputBuffer, out int bytesWritten); if (bytesWritten <= 0) { From 6925831daad290faeb728dda706479fbe165e504 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Wed, 22 Jan 2020 01:51:39 -0800 Subject: [PATCH 18/28] register customer pickler for GenericRow --- .../Command/SqlCommandExecutor.cs | 29 ++----------------- .../Microsoft.Spark/Sql/RowCollector.cs | 2 +- .../Microsoft.Spark/Utils/PythonSerDe.cs | 26 +++++++++++++++++ 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 424e6264b..1daf49436 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -165,33 +165,8 @@ private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) if (s_outputBuffer == null) s_outputBuffer = new byte[sizeHint]; - // Cast GenericRow to object if needed. - var newRows = new List(); - if (rows.FirstOrDefault() is GenericRow) - { - for (int i = 0; i < rows.Count(); ++i) - { - object[] cols = (rows.ElementAt(i) as GenericRow).Values; - if (cols.FirstOrDefault() is GenericRow) - { - object[] newCols = new object[cols.Length]; - for (int j = 0; j < cols.Length; ++j) - { - newCols[j] = (cols[j] as GenericRow).Values; - } - newRows.Add(newCols); - } - else - { - newRows.Add(cols); - } - } - } - - Pickler pickler = s_pickler ?? (s_pickler = new Pickler(false)); - pickler.dumps( - newRows.Count() == 0 ? rows : newRows, - ref s_outputBuffer, out int bytesWritten); + Pickler pickler = s_pickler ?? 
(s_pickler = PythonSerDe.CreatePickler()); + pickler.dumps(rows,ref s_outputBuffer, out int bytesWritten); if (bytesWritten <= 0) { diff --git a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs index 7b59d87cd..0ada405de 100644 --- a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs +++ b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs @@ -34,13 +34,13 @@ public IEnumerable Collect(ISocketWrapper socket) foreach (object unpickled in unpickledObjects) { - // yield return (unpickled as RowConstructor).GetRow(); switch (unpickled) { case RowConstructor rc: yield return rc.GetRow(); break; + // Unpickled object contains single Row case object[] objs: if ((objs.Length != 1) || !(objs[0] is Row row)) { diff --git a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs index ffbf6ca16..8b43ceb1b 100644 --- a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs +++ b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs @@ -67,5 +67,31 @@ internal static object[] GetUnpickledObjects(Stream stream, int messageLength) ArrayPool.Shared.Return(buffer); } } + + /// + /// Custom pickler for GenericRow objects. + /// + internal class GenericRowPickler : IObjectPickler + { + public void pickle(object o, Stream outs, Pickler currentPickler) + { + GenericRow row = (GenericRow)o; + currentPickler.save(row.Values); + } + + public void Register() + { + Pickler.registerCustomPickler(typeof(GenericRow), new GenericRowPickler()); + } + } + + /// + /// Register custom pickler for GenericRow object and create pickler. 
+ /// + internal static Pickler CreatePickler() + { + new GenericRowPickler().Register(); + return new Pickler(); + } } } From 13fcc33841f3080c23d71a61671f2e5fd8e51c94 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Wed, 22 Jan 2020 14:25:15 -0800 Subject: [PATCH 19/28] add space --- src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 1daf49436..f1b11903a 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -166,7 +166,7 @@ private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) s_outputBuffer = new byte[sizeHint]; Pickler pickler = s_pickler ?? (s_pickler = PythonSerDe.CreatePickler()); - pickler.dumps(rows,ref s_outputBuffer, out int bytesWritten); + pickler.dumps(rows, ref s_outputBuffer, out int bytesWritten); if (bytesWritten <= 0) { From f38cb6056bd9c768e7d938fe39d3e32451636906 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Thu, 23 Jan 2020 11:24:06 -0800 Subject: [PATCH 20/28] reformatting --- .../UdfTests/UdfComplexTypesTests.cs | 91 ++++++++++--------- 1 file changed, 47 insertions(+), 44 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index 05274ea39..bb34e4f99 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -159,56 +159,59 @@ public void TestUdfWithRowType() public void TestUdfWithReturnAsRowType() { // Single GenericRow - var schema1 = new StructType(new[] { - new StructField("col1", new IntegerType()), - new StructField("col2", new StringType()) - }); - Func udf1 = Udf( - str => new GenericRow(new object[] { 1, 
"abc" }), schema1); + var schema = new StructType(new[] + { + new StructField("col1", new IntegerType()), + new StructField("col2", new StringType()) + }); + Func udf1 = Udf( + str => new GenericRow(new object[] { 1, "abc" }), schema); - Row[] rows1 = _df.Select(udf1(_df["name"])).Collect().ToArray(); - Assert.Equal(3, rows1.Length); + Row[] rows = _df.Select(udf1(_df["name"])).Collect().ToArray(); + Assert.Equal(3, rows.Length); - foreach (Row row in rows1) - { - Assert.Equal(2, row.Size()); - Assert.Equal(1, row.GetAs("col1")); - Assert.Equal("abc", row.GetAs("col2")); + foreach (Row row in rows) + { + Assert.Equal(2, row.Size()); + Assert.Equal(1, row.GetAs("col1")); + Assert.Equal("abc", row.GetAs("col2")); + } } - // Nested GenericRow - var subSchema1 = new StructType(new[] - { - new StructField("subCol1", new IntegerType()) - }); - var subSchema2 = new StructType(new[] - { - new StructField("subCol2", new StringType()) - }); - var schema2 = new StructType(new[] - { - new StructField("col1", subSchema1), - new StructField("col2", subSchema2) - }); - Func udf2 = Udf( - str => new GenericRow( - new object[] - { - new GenericRow(new object[] { 1 }), - new GenericRow(new object[] { "abc" }) - }), schema2); - - Row[] rows2 = _df.Select(udf2(_df["name"])).Collect().ToArray(); - Assert.Equal(3, rows2.Length); - - foreach (Row row in rows2) { - Assert.Equal(2, row.Size()); - Assert.IsType(row.Get("col1")); - Assert.IsType(row.Get("col2")); - Assert.Equal(new Row(new object[] { 1 }, subSchema1), row.GetAs("col1")); - Assert.Equal(new Row(new object[] { "abc" }, subSchema2), row.GetAs("col2")); + var subSchema1 = new StructType(new[] + { + new StructField("subCol1", new IntegerType()) + }); + var subSchema2 = new StructType(new[] + { + new StructField("subCol2", new StringType()) + }); + var schema = new StructType(new[] + { + new StructField("col1", subSchema1), + new StructField("col2", subSchema2) + }); + Func udf2 = Udf( + str => new GenericRow( + new object[] + { + 
new GenericRow(new object[] { 1 }), + new GenericRow(new object[] { "abc" }) + }), schema); + + Row[] rows = _df.Select(udf2(_df["name"])).Collect().ToArray(); + Assert.Equal(3, rows.Length); + + foreach (Row row in rows) + { + Assert.Equal(2, row.Size()); + Assert.IsType(row.Get("col1")); + Assert.IsType(row.Get("col2")); + Assert.Equal(new Row(new object[] { 1 }, subSchema1), row.GetAs("col1")); + Assert.Equal(new Row(new object[] { "abc" }, subSchema2), row.GetAs("col2")); + } } } } From e7d9d1d7f4ee791b011684a3d5b5df9c256adec7 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Thu, 23 Jan 2020 11:39:31 -0800 Subject: [PATCH 21/28] update register for GenericRow --- .../Command/SqlCommandExecutor.cs | 2 +- src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs | 17 +++-------------- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index f1b11903a..82fcee8fe 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -165,7 +165,7 @@ private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) if (s_outputBuffer == null) s_outputBuffer = new byte[sizeHint]; - Pickler pickler = s_pickler ?? (s_pickler = PythonSerDe.CreatePickler()); + Pickler pickler = s_pickler ?? 
(s_pickler = new Pickler(false)); pickler.dumps(rows, ref s_outputBuffer, out int bytesWritten); if (bytesWritten <= 0) diff --git a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs index 8b43ceb1b..8fb156338 100644 --- a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs +++ b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs @@ -35,6 +35,9 @@ static PythonSerDe() s_rowConstructor = new RowConstructor(); Unpickler.registerConstructor( "pyspark.sql.types", "_create_row_inbound_converter", s_rowConstructor); + + // Register custom pickler for GenericRow object. + Pickler.registerCustomPickler(typeof(GenericRow), new GenericRowPickler()); } /// @@ -78,20 +81,6 @@ public void pickle(object o, Stream outs, Pickler currentPickler) GenericRow row = (GenericRow)o; currentPickler.save(row.Values); } - - public void Register() - { - Pickler.registerCustomPickler(typeof(GenericRow), new GenericRowPickler()); - } - } - - /// - /// Register custom pickler for GenericRow object and create pickler. 
- /// - internal static Pickler CreatePickler() - { - new GenericRowPickler().Register(); - return new Pickler(); } } } From ca15287ecb3689d1688c00bbb39e533d82d3989b Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Fri, 24 Jan 2020 00:50:24 -0800 Subject: [PATCH 22/28] clean code --- .../UdfTests/UdfComplexTypesTests.cs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index bb34e4f99..dd0680208 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -165,10 +165,10 @@ public void TestUdfWithReturnAsRowType() new StructField("col1", new IntegerType()), new StructField("col2", new StringType()) }); - Func udf1 = Udf( + Func udf = Udf( str => new GenericRow(new object[] { 1, "abc" }), schema); - Row[] rows = _df.Select(udf1(_df["name"])).Collect().ToArray(); + Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray(); Assert.Equal(3, rows.Length); foreach (Row row in rows) @@ -193,7 +193,7 @@ public void TestUdfWithReturnAsRowType() new StructField("col1", subSchema1), new StructField("col2", subSchema2) }); - Func udf2 = Udf( + Func udf = Udf( str => new GenericRow( new object[] { @@ -201,7 +201,7 @@ public void TestUdfWithReturnAsRowType() new GenericRow(new object[] { "abc" }) }), schema); - Row[] rows = _df.Select(udf2(_df["name"])).Collect().ToArray(); + Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray(); Assert.Equal(3, rows.Length); foreach (Row row in rows) @@ -210,7 +210,9 @@ public void TestUdfWithReturnAsRowType() Assert.IsType(row.Get("col1")); Assert.IsType(row.Get("col2")); Assert.Equal(new Row(new object[] { 1 }, subSchema1), row.GetAs("col1")); - Assert.Equal(new Row(new object[] { "abc" }, subSchema2), row.GetAs("col2")); + Assert.Equal( + new Row(new object[] { "abc" }, 
subSchema2), + row.GetAs("col2")); } } } From 9d6051848dbfca82e79451587836d49fbf5fb6f3 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Fri, 24 Jan 2020 00:59:08 -0800 Subject: [PATCH 23/28] create new GenericRowPickler.cs --- src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs index 8fb156338..aed075963 100644 --- a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs +++ b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs @@ -70,17 +70,5 @@ internal static object[] GetUnpickledObjects(Stream stream, int messageLength) ArrayPool.Shared.Return(buffer); } } - - /// - /// Custom pickler for GenericRow objects. - /// - internal class GenericRowPickler : IObjectPickler - { - public void pickle(object o, Stream outs, Pickler currentPickler) - { - GenericRow row = (GenericRow)o; - currentPickler.save(row.Values); - } - } } } From 6da4243e12fef30da945c1b24dbbe1b03f664ead Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Fri, 24 Jan 2020 01:56:41 -0800 Subject: [PATCH 24/28] add GenericRowPickler.cs --- .../Microsoft.Spark/Sql/GenericRowPickler.cs | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 src/csharp/Microsoft.Spark/Sql/GenericRowPickler.cs diff --git a/src/csharp/Microsoft.Spark/Sql/GenericRowPickler.cs b/src/csharp/Microsoft.Spark/Sql/GenericRowPickler.cs new file mode 100644 index 000000000..92deb7fb0 --- /dev/null +++ b/src/csharp/Microsoft.Spark/Sql/GenericRowPickler.cs @@ -0,0 +1,21 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.IO; +using Razorvine.Pickle; + +namespace Microsoft.Spark.Sql +{ + /// + /// Custom pickler for GenericRow objects. 
+ /// + internal class GenericRowPickler : IObjectPickler + { + public void pickle(object o, Stream outs, Pickler currentPickler) + { + GenericRow row = (GenericRow)o; + currentPickler.save(row.Values); + } + } +} From 7a725ba4c3cf58a0f662e99f85b3c3ef6b009238 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Fri, 24 Jan 2020 01:59:39 -0800 Subject: [PATCH 25/28] clean up --- .../Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index dd0680208..e7f9d825d 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -207,8 +207,6 @@ public void TestUdfWithReturnAsRowType() foreach (Row row in rows) { Assert.Equal(2, row.Size()); - Assert.IsType(row.Get("col1")); - Assert.IsType(row.Get("col2")); Assert.Equal(new Row(new object[] { 1 }, subSchema1), row.GetAs("col1")); Assert.Equal( new Row(new object[] { "abc" }, subSchema2), From 6a3cf26de23795e91d98d6d8a292a3fa97cdcf9c Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Fri, 24 Jan 2020 11:50:05 -0800 Subject: [PATCH 26/28] clean up --- src/csharp/Microsoft.Spark/Sql/GenericRowPickler.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Sql/GenericRowPickler.cs b/src/csharp/Microsoft.Spark/Sql/GenericRowPickler.cs index 92deb7fb0..bd78df402 100644 --- a/src/csharp/Microsoft.Spark/Sql/GenericRowPickler.cs +++ b/src/csharp/Microsoft.Spark/Sql/GenericRowPickler.cs @@ -14,8 +14,7 @@ internal class GenericRowPickler : IObjectPickler { public void pickle(object o, Stream outs, Pickler currentPickler) { - GenericRow row = (GenericRow)o; - currentPickler.save(row.Values); + currentPickler.save(((GenericRow)o).Values); } } } From 0740dd1255a7cfde7590653128a110c3d2b85db6 Mon Sep 17 00:00:00 2001 From: 
Elva Liu Date: Fri, 24 Jan 2020 14:31:00 -0800 Subject: [PATCH 27/28] address comments --- .../UdfTests/UdfComplexTypesTests.cs | 36 +++++++++++++------ .../Microsoft.Spark/Sql/RowCollector.cs | 13 +++---- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index e7f9d825d..d27d4fc7b 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -48,7 +48,7 @@ public void TestUdfWithArrayType() Row[] rows = _df.Select(workingUdf(_df["ids"])).Collect().ToArray(); Assert.Equal(3, rows.Length); - + var expected = new[] { "1", "3,5", "2,4" }; string[] rowsToArray = rows.Select(x => x[0].ToString()).ToArray(); Assert.Equal(expected, rowsToArray); @@ -178,39 +178,55 @@ public void TestUdfWithReturnAsRowType() Assert.Equal("abc", row.GetAs("col2")); } } + // Nested GenericRow { var subSchema1 = new StructType(new[] { - new StructField("subCol1", new IntegerType()) + new StructField("col1", new IntegerType()), }); var subSchema2 = new StructType(new[] { - new StructField("subCol2", new StringType()) + new StructField("col1", new StringType()), + new StructField("col2", subSchema1), }); var schema = new StructType(new[] { - new StructField("col1", subSchema1), - new StructField("col2", subSchema2) + new StructField("col1", new IntegerType()), + new StructField("col2", subSchema1), + new StructField("col3", subSchema2) }); + Func udf = Udf( str => new GenericRow( new object[] { + 1, new GenericRow(new object[] { 1 }), - new GenericRow(new object[] { "abc" }) - }), schema); + new GenericRow(new object[] + { + "abc", + new GenericRow(new object[] { 10 }) + }) + }), + schema); Row[] rows = _df.Select(udf(_df["name"])).Collect().ToArray(); Assert.Equal(3, rows.Length); foreach (Row row in rows) { - Assert.Equal(2, row.Size()); - 
Assert.Equal(new Row(new object[] { 1 }, subSchema1), row.GetAs("col1")); + Assert.Equal(3, row.Size()); + Assert.Equal(1, row.GetAs("col1")); Assert.Equal( - new Row(new object[] { "abc" }, subSchema2), + new Row(new object[] { 1 }, subSchema1), row.GetAs("col2")); + Row col3 = row.GetAs("col3"); + Assert.Equal( + new Row( + new object[] { "abc", new Row(new object[] { 10 }, subSchema1) }, + subSchema2), + row.GetAs("col3")); } } } diff --git a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs index 0ada405de..df3d23ed9 100644 --- a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs +++ b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs @@ -34,21 +34,16 @@ public IEnumerable Collect(ISocketWrapper socket) foreach (object unpickled in unpickledObjects) { + // Unpickled object can be either a RowConstructor object (not materialized), or + // a Row object (materialized). Refer to RowConstruct.construct() to see how + // Row objects are unpickled. switch (unpickled) { case RowConstructor rc: yield return rc.GetRow(); break; - // Unpickled object contains single Row - case object[] objs: - if ((objs.Length != 1) || !(objs[0] is Row row)) - { - throw new NotSupportedException( - string.Format("Expected single Row in unpickled type {0}", - unpickled.GetType())); - } - + case object[] objs when objs.Length == 1 && (objs[0] is Row row): yield return row; break; From eead2c1abfbce256409e711da642f072eb4c9275 Mon Sep 17 00:00:00 2001 From: Elva Liu Date: Fri, 24 Jan 2020 15:14:18 -0800 Subject: [PATCH 28/28] clean up --- .../Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs | 1 - src/csharp/Microsoft.Spark/Sql/RowCollector.cs | 4 ++-- src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs index d27d4fc7b..57eff0019 100644 --- 
a/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs @@ -221,7 +221,6 @@ public void TestUdfWithReturnAsRowType() Assert.Equal( new Row(new object[] { 1 }, subSchema1), row.GetAs("col2")); - Row col3 = row.GetAs("col3"); Assert.Equal( new Row( new object[] { "abc", new Row(new object[] { 10 }, subSchema1) }, diff --git a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs index df3d23ed9..c66687219 100644 --- a/src/csharp/Microsoft.Spark/Sql/RowCollector.cs +++ b/src/csharp/Microsoft.Spark/Sql/RowCollector.cs @@ -34,8 +34,8 @@ public IEnumerable Collect(ISocketWrapper socket) foreach (object unpickled in unpickledObjects) { - // Unpickled object can be either a RowConstructor object (not materialized), or - // a Row object (materialized). Refer to RowConstruct.construct() to see how + // Unpickled object can be either a RowConstructor object (not materialized), + // or a Row object (materialized). Refer to RowConstructor.construct() to see how // Row objects are unpickled. switch (unpickled) { diff --git a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs index aed075963..0a674f668 100644 --- a/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs +++ b/src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs @@ -36,7 +36,7 @@ static PythonSerDe() Unpickler.registerConstructor( "pyspark.sql.types", "_create_row_inbound_converter", s_rowConstructor); - // Register custom pickler for GenericRow object. + // Register custom pickler for GenericRow objects. Pickler.registerCustomPickler(typeof(GenericRow), new GenericRowPickler()); }