Skip to content

Support UDF that returns RowType #376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
Jan 25, 2020
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
764ceab
init
elvaliuliuliu Dec 19, 2019
cfe4e34
Merge branch 'master' into elva/udfReturnRowType
elvaliuliuliu Dec 23, 2019
47fdb37
add tests
elvaliuliuliu Dec 24, 2019
8607af0
Merge branch 'elva/udfReturnRowType' of https://github.com/elvaliuliu…
elvaliuliuliu Dec 24, 2019
4c4349d
Merge branch 'master' into elva/udfReturnRowType
elvaliuliuliu Dec 30, 2019
7c965ee
change pickling method
elvaliuliuliu Jan 3, 2020
f0f1858
add null check
elvaliuliuliu Jan 4, 2020
c441ee2
Merge branch 'master' into elva/udfReturnRowType
elvaliuliuliu Jan 6, 2020
c729a2c
add separate createudf fun
elvaliuliuliu Jan 7, 2020
cf2f127
Merge branch 'elva/udfReturnRowType' of https://github.com/elvaliuliu…
elvaliuliuliu Jan 7, 2020
c2a426f
change test
elvaliuliuliu Jan 7, 2020
28f438a
add comments
elvaliuliuliu Jan 7, 2020
dff5b44
change CreateUdf
elvaliuliuliu Jan 8, 2020
68f8cc5
edit test
elvaliuliuliu Jan 8, 2020
c315231
change RowConstructor
elvaliuliuliu Jan 8, 2020
c8c55b4
test content and modify RowCollector
elvaliuliuliu Jan 8, 2020
ff531ab
resolve comments
elvaliuliuliu Jan 8, 2020
7248cd2
InvalidCastException
elvaliuliuliu Jan 8, 2020
5041fc0
revert
elvaliuliuliu Jan 8, 2020
6423e69
change RowCollector
elvaliuliuliu Jan 9, 2020
9585666
remove GenericRow.cs
elvaliuliuliu Jan 10, 2020
fee85b8
Merge branch 'master' into elva/udfReturnRowType
elvaliuliuliu Jan 10, 2020
cedefd7
add pickling for GenericRow
elvaliuliuliu Jan 22, 2020
0a61732
Merge branch 'master' into elva/udfReturnRowType
elvaliuliuliu Jan 22, 2020
630408a
Merge branch 'elva/udfReturnRowType' of https://github.com/elvaliuliu…
elvaliuliuliu Jan 22, 2020
6925831
register customer pickler for GenericRow
elvaliuliuliu Jan 22, 2020
13fcc33
add space
elvaliuliuliu Jan 22, 2020
f38cb60
reformatting
elvaliuliuliu Jan 23, 2020
e7d9d1d
update register for GenericRow
elvaliuliuliu Jan 23, 2020
ca15287
clean code
elvaliuliuliu Jan 24, 2020
9d60518
create new GenericRowPickler.cs
elvaliuliuliu Jan 24, 2020
5b750c7
Merge branch 'master' into elva/udfReturnRowType
elvaliuliuliu Jan 24, 2020
6da4243
add GenericRowPickler.cs
elvaliuliuliu Jan 24, 2020
b621154
Merge branch 'elva/udfReturnRowType' of https://github.com/elvaliuliu…
elvaliuliuliu Jan 24, 2020
7a725ba
clean up
elvaliuliuliu Jan 24, 2020
6a3cf26
clean up
elvaliuliuliu Jan 24, 2020
0740dd1
address comments
elvaliuliuliu Jan 24, 2020
eead2c1
clean up
elvaliuliuliu Jan 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 51 additions & 18 deletions src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfComplexTypesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,25 +158,58 @@ public void TestUdfWithRowType()
[Fact]
public void TestUdfWithReturnAsRowType()
{
// UDF with return as RowType throws a following exception:
// Unhandled Exception: System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation.
// --->System.ArgumentException: System.Object is not supported.
// at Microsoft.Spark.Utils.UdfUtils.GetReturnType(Type type) in Microsoft.Spark\Utils\UdfUtils.cs:line 142
// at Microsoft.Spark.Utils.UdfUtils.GetReturnType(Type type) in Microsoft.Spark\Utils\UdfUtils.cs:line 136
// at Microsoft.Spark.Sql.Functions.CreateUdf[TResult](String name, Delegate execute, PythonEvalType evalType) in Microsoft.Spark\Sql\Functions.cs:line 4053
// at Microsoft.Spark.Sql.Functions.CreateUdf[TResult](String name, Delegate execute) in Microsoft.Spark\Sql\Functions.cs:line 4040
// at Microsoft.Spark.Sql.Functions.Udf[T, TResult](Func`2 udf) in Microsoft.Spark\Sql\Functions.cs:line 3607
Assert.Throws<ArgumentException>(() => Udf<string, Row>(
(str) =>
{
var structFields = new List<StructField>()
// Single GenericRow
var schema1 = new StructType(new[]
{
new StructField("col1", new IntegerType()),
new StructField("col2", new StringType())
});
Func<Column, Column> udf1 = Udf<string>(
str => new GenericRow(new object[] { 1, "abc" }), schema1);

Row[] rows1 = _df.Select(udf1(_df["name"])).Collect().ToArray();
Assert.Equal(3, rows1.Length);

foreach (Row row in rows1)
{
Assert.Equal(2, row.Size());
Assert.Equal(1, row.GetAs<int>("col1"));
Assert.Equal("abc", row.GetAs<string>("col2"));
}

// Nested GenericRow
var subSchema1 = new StructType(new[]
{
new StructField("subCol1", new IntegerType())
});
var subSchema2 = new StructType(new[]
{
new StructField("subCol2", new StringType())
});
var schema2 = new StructType(new[]
{
new StructField("col1", subSchema1),
new StructField("col2", subSchema2)
});
Func<Column, Column> udf2 = Udf<string>(
str => new GenericRow(
new object[]
{
new StructField("name", new StringType()),
};
var schema = new StructType(structFields);
var row = new Row(new object[] { str }, schema);
return row;
}));
new GenericRow(new object[] { 1 }),
new GenericRow(new object[] { "abc" })
}), schema2);

Row[] rows2 = _df.Select(udf2(_df["name"])).Collect().ToArray();
Assert.Equal(3, rows2.Length);

foreach (Row row in rows2)
{
Assert.Equal(2, row.Size());
Assert.IsType<Row>(row.Get("col1"));
Assert.IsType<Row>(row.Get("col2"));
Assert.Equal(new Row(new object[] { 1 }, subSchema1), row.GetAs<Row>("col1"));
Assert.Equal(new Row(new object[] { "abc" }, subSchema2), row.GetAs<Row>("col2"));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private void WriteOutput(Stream stream, IEnumerable<object> rows, int sizeHint)
if (s_outputBuffer == null)
s_outputBuffer = new byte[sizeHint];

Pickler pickler = s_pickler ?? (s_pickler = new Pickler(false));
Pickler pickler = s_pickler ?? (s_pickler = PythonSerDe.CreatePickler());
pickler.dumps(rows, ref s_outputBuffer, out int bytesWritten);

if (bytesWritten <= 0)
Expand Down
207 changes: 205 additions & 2 deletions src/csharp/Microsoft.Spark/Sql/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;
using Microsoft.Spark.Utils;

namespace Microsoft.Spark.Sql
Expand Down Expand Up @@ -3797,6 +3798,189 @@ public static Func<Column, Column, Column, Column, Column, Column, Column, Colum
return CreateUdf<TResult>(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf)).Apply10;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column> Udf(Func<GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply0;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T">Specifies the type of the first argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column, Column> Udf<T>(Func<T, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply1;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T1">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column, Column, Column> Udf<T1, T2>(
Func<T1, T2, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply2;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T1">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument to the UDF.</typeparam>
/// <typeparam name="T3">Specifies the type of the third argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column, Column, Column, Column> Udf<T1, T2, T3>(
Func<T1, T2, T3, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply3;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T1">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument to the UDF.</typeparam>
/// <typeparam name="T3">Specifies the type of the third argument to the UDF.</typeparam>
/// <typeparam name="T4">Specifies the type of the fourth argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column, Column, Column, Column, Column> Udf<T1, T2, T3, T4>(
Func<T1, T2, T3, T4, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply4;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T1">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument to the UDF.</typeparam>
/// <typeparam name="T3">Specifies the type of the third argument to the UDF.</typeparam>
/// <typeparam name="T4">Specifies the type of the fourth argument to the UDF.</typeparam>
/// <typeparam name="T5">Specifies the type of the fifth argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column, Column, Column, Column, Column, Column> Udf<T1, T2, T3, T4, T5>(
Func<T1, T2, T3, T4, T5, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply5;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T1">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument to the UDF.</typeparam>
/// <typeparam name="T3">Specifies the type of the third argument to the UDF.</typeparam>
/// <typeparam name="T4">Specifies the type of the fourth argument to the UDF.</typeparam>
/// <typeparam name="T5">Specifies the type of the fifth argument to the UDF.</typeparam>
/// <typeparam name="T6">Specifies the type of the sixth argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column, Column, Column, Column, Column, Column, Column> Udf<T1, T2, T3, T4, T5, T6>(
Func<T1, T2, T3, T4, T5, T6, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply6;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T1">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument to the UDF.</typeparam>
/// <typeparam name="T3">Specifies the type of the third argument to the UDF.</typeparam>
/// <typeparam name="T4">Specifies the type of the fourth argument to the UDF.</typeparam>
/// <typeparam name="T5">Specifies the type of the fifth argument to the UDF.</typeparam>
/// <typeparam name="T6">Specifies the type of the sixth argument to the UDF.</typeparam>
/// <typeparam name="T7">Specifies the type of the seventh argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column, Column, Column, Column, Column, Column, Column, Column> Udf<T1, T2, T3, T4, T5, T6, T7>(
Func<T1, T2, T3, T4, T5, T6, T7, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply7;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T1">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument to the UDF.</typeparam>
/// <typeparam name="T3">Specifies the type of the third argument to the UDF.</typeparam>
/// <typeparam name="T4">Specifies the type of the fourth argument to the UDF.</typeparam>
/// <typeparam name="T5">Specifies the type of the fifth argument to the UDF.</typeparam>
/// <typeparam name="T6">Specifies the type of the sixth argument to the UDF.</typeparam>
/// <typeparam name="T7">Specifies the type of the seventh argument to the UDF.</typeparam>
/// <typeparam name="T8">Specifies the type of the eighth argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column, Column, Column, Column, Column, Column, Column, Column, Column> Udf<T1, T2, T3, T4, T5, T6, T7, T8>(
Func<T1, T2, T3, T4, T5, T6, T7, T8, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply8;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T1">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument to the UDF.</typeparam>
/// <typeparam name="T3">Specifies the type of the third argument to the UDF.</typeparam>
/// <typeparam name="T4">Specifies the type of the fourth argument to the UDF.</typeparam>
/// <typeparam name="T5">Specifies the type of the fifth argument to the UDF.</typeparam>
/// <typeparam name="T6">Specifies the type of the sixth argument to the UDF.</typeparam>
/// <typeparam name="T7">Specifies the type of the seventh argument to the UDF.</typeparam>
/// <typeparam name="T8">Specifies the type of the eighth argument to the UDF.</typeparam>
/// <typeparam name="T9">Specifies the type of the ninth argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>A delegate that when invoked will return a <see cref="Column"/> for the result of the UDF.</returns>
public static Func<Column, Column, Column, Column, Column, Column, Column, Column, Column, Column> Udf<T1, T2, T3, T4, T5, T6, T7, T8, T9>(
Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply9;
}

/// <summary>Creates a UDF from the specified delegate.</summary>
/// <typeparam name="T1">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument to the UDF.</typeparam>
/// <typeparam name="T3">Specifies the type of the third argument to the UDF.</typeparam>
/// <typeparam name="T4">Specifies the type of the fourth argument to the UDF.</typeparam>
/// <typeparam name="T5">Specifies the type of the fifth argument to the UDF.</typeparam>
/// <typeparam name="T6">Specifies the type of the sixth argument to the UDF.</typeparam>
/// <typeparam name="T7">Specifies the type of the seventh argument to the UDF.</typeparam>
/// <typeparam name="T8">Specifies the type of the eighth argument to the UDF.</typeparam>
/// <typeparam name="T9">Specifies the type of the ninth argument to the UDF.</typeparam>
/// <typeparam name="T10">Specifies the type of the tenth argument to the UDF.</typeparam>
/// <param name="udf">The UDF function implementation.</param>
/// <param name="returnType">Schema associated with this row</param>
/// <returns>
/// A delegate that returns a <see cref="Column"/> for the result of the UDF.
/// </returns>
public static Func<Column, Column, Column, Column, Column, Column, Column, Column, Column, Column, Column> Udf<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(
Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, GenericRow> udf, StructType returnType)
{
return CreateUdf(udf.Method.ToString(), UdfUtils.CreateUdfWrapper(udf), returnType).Apply10;
}

/// <summary>Creates a Vector UDF from the specified delegate.</summary>
/// <typeparam name="T">Specifies the type of the first argument to the UDF.</typeparam>
/// <typeparam name="TResult">Specifies the return type of the UDF.</typeparam>
Expand Down Expand Up @@ -4071,6 +4255,11 @@ private static UserDefinedFunction CreateUdf<TResult>(string name, Delegate exec
return CreateUdf<TResult>(name, execute, UdfUtils.PythonEvalType.SQL_BATCHED_UDF);
}

private static UserDefinedFunction CreateUdf(string name, Delegate execute, StructType returnType)
{
return CreateUdf(name, execute, UdfUtils.PythonEvalType.SQL_BATCHED_UDF, returnType);
}

private static UserDefinedFunction CreateVectorUdf<TResult>(string name, Delegate execute)
{
return CreateUdf<TResult>(name, execute, UdfUtils.PythonEvalType.SQL_SCALAR_PANDAS_UDF);
Expand All @@ -4079,7 +4268,21 @@ private static UserDefinedFunction CreateVectorUdf<TResult>(string name, Delegat
private static UserDefinedFunction CreateUdf<TResult>(
string name,
Delegate execute,
UdfUtils.PythonEvalType evalType)
UdfUtils.PythonEvalType evalType) =>
CreateUdf(name, execute, evalType, UdfUtils.GetReturnType(typeof(TResult)));

private static UserDefinedFunction CreateUdf(
string name,
Delegate execute,
UdfUtils.PythonEvalType evalType,
StructType returnType) =>
CreateUdf(name, execute, evalType, returnType.Json);

private static UserDefinedFunction CreateUdf(
string name,
Delegate execute,
UdfUtils.PythonEvalType evalType,
string returnType)
{
return UserDefinedFunction.Create(
name,
Expand All @@ -4088,7 +4291,7 @@ private static UserDefinedFunction CreateUdf<TResult>(
CommandSerDe.SerializedMode.Row,
CommandSerDe.SerializedMode.Row),
evalType,
UdfUtils.GetReturnType(typeof(TResult)));
returnType);
}

private static Column ApplyFunction(string funcName)
Expand Down
Loading