Support for Row type Ser/De and exposing the CreateDataFrame API #338

Merged: 43 commits, merged on Jan 10, 2020

Changes from 9 commits

Commits (43)
044a01e
Initial commit for Row Ser/De and exposing CreateDataFrame
Niharikadutta Nov 15, 2019
b1d8dfc
Reverting changes to Basic.cs
Niharikadutta Nov 15, 2019
5c7dd03
Merge branch 'master' into nidutta/RowTypeSerDeSupport
Niharikadutta Nov 15, 2019
b6b82be
Added example in Basic.cs about how to use CreateDataFrame
Niharikadutta Nov 15, 2019
ae31dd2
Removing unnecessary comments
Niharikadutta Nov 15, 2019
9546229
Removing whitespace
Niharikadutta Nov 15, 2019
ca094d7
Added GenericRow support and some tests, removed comments and added C…
Niharikadutta Nov 16, 2019
3c58aa7
Exposing GenericRow tests and cleaned up comments.
Niharikadutta Dec 10, 2019
74ff143
Made GenericRow as internal
Niharikadutta Dec 10, 2019
236ea30
PR review comments removing whitespaces and commented out functions
Niharikadutta Dec 10, 2019
520528f
exposed CreateDataFrame API that does not take in a schema but infers…
Niharikadutta Dec 11, 2019
1ca7a6c
PR review comments - 2
Niharikadutta Dec 11, 2019
25deb3d
Update src/csharp/Microsoft.Spark/Sql/GenericRow.cs
Niharikadutta Dec 11, 2019
ac8a27f
PR review comment changes - 3
Niharikadutta Dec 11, 2019
fedff8d
merging latest from repo
Niharikadutta Dec 11, 2019
bac8899
Removed commented code
Niharikadutta Dec 11, 2019
b754147
Using composition in Row to avoid duplication with GenericRow
Niharikadutta Dec 12, 2019
3c3e677
Changed Row Ser/De to utilize existing framework and added jvm create…
Niharikadutta Dec 13, 2019
499fc47
Merge branch 'master' into nidutta/RowTypeSerDeSupport
Niharikadutta Dec 13, 2019
66f26c9
Fixed indentation and added E2E tests for CreateDataFrame
Niharikadutta Dec 14, 2019
8943ca8
PR review changes
Niharikadutta Dec 17, 2019
67f1375
Added logic to test properties of created DataFrame in test
Niharikadutta Dec 18, 2019
bb011d7
PR review changes
Niharikadutta Dec 18, 2019
f3eec4e
Sorted usings
Niharikadutta Dec 18, 2019
56dd332
Update src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/SparkSessionTe…
Niharikadutta Dec 20, 2019
aa4913c
Update src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/SparkSessionTe…
Niharikadutta Dec 20, 2019
68145a0
Update src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/SparkSessionTe…
Niharikadutta Dec 20, 2019
a940b9a
Update src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/SparkSessionTe…
Niharikadutta Dec 20, 2019
e1a46fc
PR review changes
Niharikadutta Dec 21, 2019
6bf02d8
Update src/csharp/Microsoft.Spark/Sql/SparkSession.cs
Niharikadutta Dec 26, 2019
bc8029a
Update src/csharp/Microsoft.Spark/Sql/SparkSession.cs
Niharikadutta Dec 26, 2019
7c5bce6
Update src/csharp/Microsoft.Spark/Sql/SparkSession.cs
Niharikadutta Dec 26, 2019
fcc0a13
Update src/csharp/Microsoft.Spark/Sql/SparkSession.cs
Niharikadutta Dec 26, 2019
fae4b2f
PR review changes - exposed CreateDataFrame for string and int withou…
Niharikadutta Jan 2, 2020
6a4bd4d
Made changes as per PR review comments
Niharikadutta Jan 5, 2020
4f5bfff
PR review comments changes
Niharikadutta Jan 8, 2020
09bdf4d
PR review comment changes
Niharikadutta Jan 9, 2020
149044a
PR review comments changes
Niharikadutta Jan 9, 2020
e281692
PR review changes
Niharikadutta Jan 10, 2020
ea84838
PR review changes
Niharikadutta Jan 10, 2020
defe8da
Merge branch 'master' into nidutta/RowTypeSerDeSupport
Niharikadutta Jan 10, 2020
c12a576
update
Niharikadutta Jan 10, 2020
7a45963
Merge branch 'nidutta/RowTypeSerDeSupport' of github.com:Niharikadutt…
Niharikadutta Jan 10, 2020
9 changes: 5 additions & 4 deletions examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Basic.cs
@@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Collections;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
@@ -30,9 +31,9 @@ public void Run(string[] args)
.Config("spark.some.config.option", "some-value")
.GetOrCreate();

// Need to explicitly specify the schema since pickling vs. arrow formatting
// will return different types. Pickling will turn longs into ints if the values fit.
// Same as the "age INT, name STRING" DDL-format string.
var inputSchema = new StructType(new[]
{
new StructField("age", new IntegerType()),
@@ -107,7 +108,7 @@ public void Run(string[] args)
joinedDf2.Show();

DataFrame joinedDf3 = df.Join(df, df["name"] == df["name"], "outer");
joinedDf3.Show();

spark.Stop();
}
44 changes: 44 additions & 0 deletions src/csharp/Microsoft.Spark.UnitTest/Sql/GenericRowTests.cs
@@ -0,0 +1,44 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Network;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using Microsoft.Spark.UnitTest.TestUtils;
using Microsoft.Spark.Utils;
using Moq;
using Razorvine.Pickle;
using Xunit;

namespace Microsoft.Spark.UnitTest
{
public class GenericRowTests
{
[Fact]
public void GenericRowTest()
{
var row = new GenericRow(new object[] { 1, "abc" });

// Validate Size().
Assert.Equal(2, row.Size());

// Validate [] operator.
Assert.Equal(1, row[0]);
Assert.Equal("abc", row[1]);

// Validate Get*(int).
Assert.Equal(1, row.Get(0));
Assert.Equal("abc", row.Get(1));
Assert.Equal(1, row.GetAs<int>(0));
Assert.ThrowsAny<Exception>(() => row.GetAs<string>(0));
Assert.Equal("abc", row.GetAs<string>(1));
Assert.ThrowsAny<Exception>(() => row.GetAs<int>(1));
}
}
}
16 changes: 16 additions & 0 deletions src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs
@@ -8,6 +8,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Microsoft.Spark.Sql;

namespace Microsoft.Spark.Interop.Ipc
{
@@ -25,6 +26,7 @@ internal class PayloadHelper
private static readonly byte[] s_byteArrayTypeId = new[] { (byte)'r' };
private static readonly byte[] s_intArrayTypeId = new[] { (byte)'l' };
Review comment (Member):

I'm not sure why this is called s_intArrayTypeId. Seems to make more sense if it was renamed to s_arrayTypeId.
private static readonly byte[] s_dictionaryTypeId = new[] { (byte)'e' };
private static readonly byte[] s_rowArrTypeId = new[] { (byte)'R' };

private static readonly ConcurrentDictionary<Type, bool> s_isDictionaryTable =
new ConcurrentDictionary<Type, bool>();
@@ -231,6 +233,15 @@ internal static void ConvertArgsToBytes(
SerDe.Write(destination, argProvider.Reference.Id);
break;

case IEnumerable<GenericRow> argRowArray:
SerDe.Write(destination, (int)argRowArray.Count());
foreach (GenericRow r in argRowArray)
{
SerDe.Write(destination, (int)r.Values.Length);
ConvertArgsToBytes(destination, r.Values, true);
}
break;

default:
throw new NotSupportedException(
string.Format($"Type {arg.GetType()} is not supported"));
@@ -283,6 +294,11 @@ internal static byte[] GetTypeId(Type type)
{
return s_intArrayTypeId;
}

if (type == typeof(IEnumerable<GenericRow>))
{
return s_rowArrTypeId;
}
break;
}

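The new IEnumerable&lt;GenericRow&gt; case above frames a row array as a row count, then for each row a value count followed by that row's values, each written recursively by ConvertArgsToBytes with its own type-id byte before the 'R' payload is handed to the JVM. A minimal stand-alone sketch of that framing, assuming SparkR-style type ids ('i' for int, 'c' for string); BinaryWriter is only a stand-in here, since the real SerDe writes big-endian integers:

using System;
using System.IO;

class RowArrayFramingSketch
{
    static void Main()
    {
        object[][] rows =
        {
            new object[] { 34, "alice" },
            new object[] { 28, "bob" },
        };

        using var stream = new MemoryStream();
        using var writer = new BinaryWriter(stream);

        writer.Write(rows.Length);        // row count, as in SerDe.Write(destination, argRowArray.Count())
        foreach (object[] row in rows)
        {
            writer.Write(row.Length);     // values in this row, as in SerDe.Write(destination, r.Values.Length)
            foreach (object value in row)
            {
                // The real code recurses via ConvertArgsToBytes(destination, r.Values, true),
                // which prefixes each value with a one-byte type id; the ids below are assumptions.
                switch (value)
                {
                    case int i:
                        writer.Write((byte)'i');
                        writer.Write(i);
                        break;
                    case string s:
                        writer.Write((byte)'c');
                        writer.Write(s);
                        break;
                }
            }
        }

        Console.WriteLine($"Framed {rows.Length} rows into {stream.Length} bytes.");
    }
}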
168 changes: 168 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/GenericRow.cs
@@ -0,0 +1,168 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Spark.Sql.Types;

namespace Microsoft.Spark.Sql
{
/// <summary>
/// Represents a row object in RDD, equivalent to GenericRow in Spark.
/// </summary>
public sealed class GenericRow
{
/// <summary>
/// Constructor for the GenericRow class.
/// </summary>
/// <param name="values">Column values for a row</param>
internal GenericRow(object[] values)
{
Values = values;
//TODO:
//Convert() -> implement type checking for not implemented exception
}

/// <summary>
/// Values representing this row.
/// </summary>
public object[] Values { get; }

/// <summary>
/// Returns the number of columns in this row.
/// </summary>
/// <returns>Number of columns in this row</returns>
public int Size() => Values.Length;

/// <summary>
/// Returns the column value at the given index.
/// </summary>
/// <param name="index">Index to look up</param>
/// <returns>A column value</returns>
public object this[int index] => Get(index);

/// <summary>
/// Returns the column value at the given index.
/// </summary>
/// <param name="index">Index to look up</param>
/// <returns>A column value</returns>
public object Get(int index)
{
if (index >= Size())
{
throw new IndexOutOfRangeException($"index ({index}) >= column counts ({Size()})");
}
else if (index < 0)
{
throw new IndexOutOfRangeException($"index ({index}) < 0");
}

return Values[index];
}

///// <summary>
///// Returns the column value whose column name is given.
///// </summary>
///// <param name="columnName">Column name to look up</param>
///// <returns>A column value</returns>
//public object Get(string columnName) =>
// Get(Schema.Fields.FindIndex(f => f.Name == columnName));

/// <summary>
/// Returns the string version of this row.
/// </summary>
/// <returns>String version of this row</returns>
public override string ToString()
{
var cols = new List<string>();
foreach (object item in Values)
{
cols.Add(item?.ToString() ?? string.Empty);
}

return $"[{(string.Join(",", cols.ToArray()))}]";
}

/// <summary>
/// Returns the column value at the given index, as a type T.
/// TODO: If the original type is "long" and its value can be
/// fit into the "int", Pickler will serialize the value as int.
/// Since the value is boxed, <see cref="GetAs{T}(int)"/> will throw an exception.
/// </summary>
/// <typeparam name="T">Type to convert to</typeparam>
/// <param name="index">Index to look up</param>
/// <returns>A column value as a type T</returns>
public T GetAs<T>(int index) => (T)Get(index);

///// <summary>
///// Returns the column value whose column name is given, as a type T.
///// TODO: If the original type is "long" and its value can be
///// fit into the "int", Pickler will serialize the value as int.
///// Since the value is boxed, <see cref="GetAs{T}(string)"/> will throw an exception.
///// </summary>
///// <typeparam name="T">Type to convert to</typeparam>
///// <param name="columnName">Column name to look up</param>
///// <returns>A column value as a type T</returns>
//public T GetAs<T>(string columnName) => (T)Get(columnName);

/// <summary>
/// Checks if the given object is the same as the current object.
/// </summary>
/// <param name="obj">Other object to compare against</param>
/// <returns>True if the other object is equal.</returns>
public override bool Equals(object obj)
{
if (obj is null)
{
return false;
}

if (ReferenceEquals(this, obj))
{
return true;
}

if (obj is GenericRow otherRow)
{
return Values.SequenceEqual(otherRow.Values);
}

return false;
}

/// <summary>
/// Returns the hash code of the current object.
/// </summary>
/// <returns>The hash code of the current object</returns>
public override int GetHashCode() => base.GetHashCode();

//TODO:
///// <summary>
///// Converts the values to .NET values. Currently, only the simple types such as
///// int, string, etc. are supported (which are already converted correctly by
///// the Pickler). Note that explicit type checks against the schema are not performed.
///// </summary>
//private void Convert()
//{
// foreach (object val in Values)
// {
// TypeCode valType = Type.GetTypeCode(val.GetType());
// if (valType == TypeCode.Object)
// {
// switch (valType)
// {
// case object[]:
// SerDe.Write(destination, (int)arg);
// break;

// case TypeCode.Int64:
// SerDe.Write(destination, (long)arg);
// break;
// }
// }
// }
//}
}
}
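The GetAs&lt;T&gt; remark in the class above has a practical consequence: values arrive boxed, so unboxing to a wider type than the one the pickler chose throws. A small self-contained illustration of that caveat (plain C#, no Spark dependency):

using System;

class GetAsCaveatSketch
{
    static void Main()
    {
        // The pickler may deliver an int even when the source column was a long,
        // so the boxed value's runtime type is int.
        object boxed = 1;

        try
        {
            long bad = (long)boxed;  // unboxing to a different type throws
            Console.WriteLine(bad);
        }
        catch (InvalidCastException)
        {
            Console.WriteLine("Cannot unbox a boxed int as long.");
        }

        long ok = Convert.ToInt64(boxed);  // convert instead of unboxing
        Console.WriteLine(ok);
    }
}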
1 change: 0 additions & 1 deletion src/csharp/Microsoft.Spark/Sql/Row.cs
@@ -23,7 +23,6 @@ internal Row(object[] values, StructType schema)
{
Values = values;
Schema = schema;

var schemaColumnCount = Schema.Fields.Count;
if (Size() != schemaColumnCount)
{
12 changes: 12 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/SparkSession.cs
@@ -3,10 +3,13 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Runtime.InteropServices.ComTypes;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Internal.Scala;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql.Streaming;
using Microsoft.Spark.Sql.Types;

namespace Microsoft.Spark.Sql
{
@@ -136,6 +139,15 @@ public SparkSession NewSession() =>
public DataFrame Table(string tableName) =>
new DataFrame((JvmObjectReference)_jvmObject.Invoke("table", tableName));

/// <summary>
/// Creates a DataFrame from the given data and schema.
/// </summary>
/// <param name="data">List of GenericRow objects</param>
/// <param name="schema">Schema as StructType</param>
/// <returns>DataFrame object</returns>
public DataFrame CreateDataFrame(IEnumerable<GenericRow> data, StructType schema) =>
new DataFrame((JvmObjectReference)_jvmObject.Invoke("createDataFrame", data, DataType.FromJson(_jvmObject.Jvm, schema.Json)));

/// <summary>
/// Executes a SQL query using Spark, returning the result as a DataFrame.
/// </summary>
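A minimal usage sketch of the new overload, assuming a running SparkSession named spark. Note that at this commit the GenericRow constructor is still internal (see the "Made GenericRow as internal" commit above), so the sketch assumes the publicly constructible surface the API implies:

using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

static class CreateDataFrameSketch
{
    public static void Run(SparkSession spark)
    {
        var schema = new StructType(new[]
        {
            new StructField("age", new IntegerType()),
            new StructField("name", new StringType()),
        });

        var data = new List<GenericRow>
        {
            new GenericRow(new object[] { 34, "alice" }),
            new GenericRow(new object[] { 28, "bob" }),
        };

        DataFrame df = spark.CreateDataFrame(data, schema);
        df.PrintSchema();
        df.Show();
    }
}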
SerDe.scala
@@ -9,7 +9,12 @@ package org.apache.spark.api.dotnet
import java.io.{DataInputStream, DataOutputStream}
import java.sql.{Date, Time, Timestamp}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow

import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

/**
* Functions to serialize and deserialize between CLR & JVM.
@@ -39,6 +44,7 @@ object SerDe {
case 'D' => readDate(dis)
case 't' => readTime(dis)
case 'j' => JVMObjectTracker.getObject(readString(dis))
case 'R' => readRowArr(dis)
case _ => throw new IllegalArgumentException(s"Invalid type $dataType")
}
}
@@ -90,6 +96,16 @@
t
}

def readRow(in: DataInputStream): Row = {
val rowLen = readInt(in)
var rowValues: ListBuffer[Any] = ListBuffer()
for (j <- 0 until rowLen) {
val elemType = readObjectType(in)
rowValues += readTypedObject(in, elemType)
}
Row.fromSeq(rowValues.toList)
}

def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
val len = readInt(in)
(0 until len).map(_ => readBytes(in)).toArray
@@ -120,6 +136,15 @@
(0 until len).map(_ => readString(in)).toArray
}

def readRowArr(in: DataInputStream): java.util.List[Row] = {
val arrLen = readInt(in)
val arr = new Array[Row](arrLen)
for (i <- 0 until arrLen) {
arr(i) = readRow(in)
}
ListBuffer(arr: _*)
}

def readList(dis: DataInputStream): Array[_] = {
val arrType = readObjectType(dis)
arrType match {
DotnetBackendHandler.scala
@@ -222,7 +222,7 @@ class DotnetBackendHandler(server: DotnetBackend)
return false
}

for (i <- 0 to numArgs - 1) {
for (i <- 0 until numArgs) {
val parameterType = parameterTypes(i)
var parameterWrapperType = parameterType
