Modify RowConstructor to work with WithColumn #214

Merged · 17 commits · Sep 9, 2019

Changes from all commits

56 changes: 56 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs
@@ -50,6 +50,62 @@ public void TestCollect()
             Assert.Equal(19, row3.GetAs<int>("age"));
         }
 
+        [Fact]
+        public void TestWithColumn()
+        {
+            Func<Column, Column> sizeNameAgeUdf = Udf<Row, string>(
+                r =>
+                {
+                    string name = r.GetAs<string>("name");
+                    int? age = r.GetAs<int?>("age");
+                    if (age.HasValue)
+                    {
+                        return $"{r.Size()},{name},{age.Value}";
+                    }
+
+                    return $"{r.Size()},{name},{string.Empty}";
+                });
+
+            string[] allCols = _df.Columns().ToArray();
+            DataFrame nameAgeColDF =
+                _df.WithColumn("NameAgeCol", Struct(allCols[0], allCols.Skip(1).ToArray()));
+            DataFrame sizeNameAgeColDF =
+                nameAgeColDF.WithColumn("SizeNameAgeCol", sizeNameAgeUdf(nameAgeColDF["NameAgeCol"]));
+
+            Row[] originalDFRows = _df.Collect().ToArray();
+            Assert.Equal(3, originalDFRows.Length);
+
+            Row[] sizeNameAgeColDFRows = sizeNameAgeColDF.Collect().ToArray();
+            Assert.Equal(3, sizeNameAgeColDFRows.Length);
+
+            {
+                Row row = sizeNameAgeColDFRows[0];
+                Assert.Equal("Michael", row.GetAs<string>("name"));
+                Assert.Null(row.Get("age"));
+                Assert.IsType<Row>(row.Get("NameAgeCol"));
+                Assert.Equal(originalDFRows[0], row.GetAs<Row>("NameAgeCol"));
+                Assert.Equal("2,Michael,", row.GetAs<string>("SizeNameAgeCol"));
+            }
+
+            {
+                Row row = sizeNameAgeColDFRows[1];
+                Assert.Equal("Andy", row.GetAs<string>("name"));
+                Assert.Equal(30, row.GetAs<int>("age"));
+                Assert.IsType<Row>(row.Get("NameAgeCol"));
+                Assert.Equal(originalDFRows[1], row.GetAs<Row>("NameAgeCol"));
+                Assert.Equal("2,Andy,30", row.GetAs<string>("SizeNameAgeCol"));
+            }
+
+            {
+                Row row = sizeNameAgeColDFRows[2];
+                Assert.Equal("Justin", row.GetAs<string>("name"));
+                Assert.Equal(19, row.GetAs<int>("age"));
+                Assert.IsType<Row>(row.Get("NameAgeCol"));
+                Assert.Equal(originalDFRows[2], row.GetAs<Row>("NameAgeCol"));
+                Assert.Equal("2,Justin,19", row.GetAs<string>("SizeNameAgeCol"));
+            }
+        }
+
         [Fact]
         public void TestUDF()
         {
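In isolation, the pattern this test exercises reduces to the sketch below. It is illustrative only and not part of the PR: it assumes an existing SparkSession named spark, the people.json sample data used by the E2E tests (columns "name" and "age"), and a `using static Microsoft.Spark.Sql.Functions;` directive.

    // Illustrative only (not part of this PR): a UDF that receives an entire
    // row through a struct column. Assumes `spark` is an existing SparkSession
    // and people.json has "name" and "age" columns, as in the E2E test data.
    DataFrame df = spark.Read().Json("people.json");

    // Pack the existing columns into a single struct column...
    df = df.WithColumn("NameAgeCol", Struct("name", "age"));

    // ...which the UDF then receives as a Row.
    Func<Column, Column> describe = Udf<Row, string>(
        r => $"{r.Size()} fields, name={r.GetAs<string>("name")}");
    df.Select(describe(df["NameAgeCol"])).Show();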
4 changes: 0 additions & 4 deletions src/csharp/Microsoft.Spark/Sql/Row.cs
@@ -170,10 +170,6 @@ private void Convert()
                 {
                     throw new NotImplementedException();
                 }
-                else if (field.DataType is StructType)
-                {
-                    throw new NotImplementedException();
-                }
                 else if (field.DataType is DecimalType)
                 {
                     throw new NotImplementedException();
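With this branch removed, rows whose fields are themselves rows no longer throw during conversion; nested values are instead materialized by RowConstructor.GetRow(), as the RowConstructor.cs changes below implement.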
131 changes: 96 additions & 35 deletions src/csharp/Microsoft.Spark/Sql/RowConstructor.cs
@@ -3,6 +3,8 @@
 // See the LICENSE file in the project root for more information.
 
 using System;
+using System.Collections.Generic;
+using System.Diagnostics;
 using Microsoft.Spark.Sql.Types;
 using Razorvine.Pickle;
 
@@ -16,74 +18,133 @@ namespace Microsoft.Spark.Sql
     internal sealed class RowConstructor : IObjectConstructor
     {
         /// <summary>
-        /// Schema of the rows being received. Note that this is thread local variable
-        /// because one RowConstructor object is registered to the Unpickler and there
-        /// could be multiple threads unpickling the data using the same object registered.
+        /// Cache of the schemas of the rows being received. Multiple schemas may
+        /// be sent per batch if the row contains nested rows. Note that this is a
+        /// thread-local variable, because one RowConstructor object is registered
+        /// to the Unpickler and multiple threads could be unpickling data through
+        /// that same registered object.
         /// </summary>
         [ThreadStatic]
-        private static StructType s_currentSchema;
+        private static IDictionary<string, StructType> s_schemaCache;
 
         /// <summary>
-        /// Stores values passed from construct().
+        /// The RowConstructor that created this instance.
         /// </summary>
-        private object[] _values;
+        private readonly RowConstructor _parent;
 
         /// <summary>
-        /// Stores the schema for a row.
+        /// Stores the args passed from construct().
         /// </summary>
-        private StructType _schema;
+        private readonly object[] _args;
 
-        /// <summary>
-        /// Returns a string representation of this object.
-        /// </summary>
-        /// <returns>A string representation of this object</returns>
-        public override string ToString()
+        public RowConstructor() : this(null, null)
         {
-            return string.Format("{{{0}}}", string.Join(",", _values));
+        }
+
+        public RowConstructor(RowConstructor parent, object[] args)
+        {
+            _parent = parent;
+            _args = args;
         }
 
         /// <summary>
         /// Used by Unpickler to pass unpickled data for handling.
         /// </summary>
         /// <param name="args">Unpickled data</param>
-        /// <returns>New RowConstructor object capturing unpickled data</returns>
+        /// <returns>New RowConstructor object capturing the args data</returns>
         public object construct(object[] args)
         {
             // Every first call to construct() contains the schema data. When
             // a new RowConstructor object is returned from this function,
             // construct() is called on the returned object with the actual
-            // row data.
+            // row data. The original RowConstructor object may be reused by the
+            // Unpickler, and each subsequent construct() call can contain either
+            // the schema data or a RowConstructor object that contains row data.
+            if (s_schemaCache is null)
+            {
+                s_schemaCache = new Dictionary<string, StructType>();
+            }
 
-            // Cache the schema to avoid parsing it for each construct() call.
-            if (s_currentSchema is null)
+            // When a row is ready to be materialized, construct() is called
+            // with the RowConstructor that represents the row.
+            if ((args.Length == 1) && (args[0] is RowConstructor rowConstructor))
             {
-                s_currentSchema = (StructType)DataType.ParseDataType((string)args[0]);
+                // Construct the Row and return args containing the Row.
+                args[0] = rowConstructor.GetRow();
+                return args;
             }
 
-            // Note that on the first call, _values will be set to schema passed in,
-            // but the subsequent construct() call on this object will replace
-            // _values with the actual row data.
-            return new RowConstructor { _values = args, _schema = s_currentSchema };
+            // Return a new RowConstructor whose args represent either the
+            // schema or the row data. The parent becomes important when calling
+            // GetRow() on the RowConstructor containing the row data:
+            //
+            // - When args is the schema, return a new RowConstructor whose
+            //   parent is the calling RowConstructor object.
+            //
+            // - When args is the row data, construct() is being called on a
+            //   RowConstructor object that contains the schema for that row,
+            //   so the new RowConstructor's parent is the schema-containing
+            //   RowConstructor.
+            return new RowConstructor(this, args);
         }
 
         /// <summary>
-        /// Construct a Row object from unpickled data.
-        /// Resets the cached row schema.
+        /// Construct a Row object from unpickled data. This is only to be called
+        /// on a RowConstructor that contains the row data.
         /// </summary>
         /// <returns>A row object with unpickled data</returns>
         public Row GetRow()
         {
-            // Reset the current schema to ensure that the subsequent construct()
-            // will use the correct schema if rows from different tables are being
-            // unpickled. This means that if pickled data is received per row, the
-            // JSON schema will be parsed for each row. In Spark, rows are batched
-            // such that the schema will be parsed once for a batch.
-            // One optimization is to introduce Reset() to reset the s_currentSchema,
-            // but this has a burden of remembering to call Reset() on the user side,
-            // thus not done.
-            s_currentSchema = null;
+            Debug.Assert(_parent != null);
+
+            // An entry of a Row (row1) may itself be a Row (row2). In that case
+            // the entry is a RowConstructor that contains the data for row2, so
+            // call GetRow() on it to materialize row2 and replace the
+            // RowConstructor entry in row1.
+            for (int i = 0; i < _args.Length; ++i)
+            {
+                if (_args[i] is RowConstructor rowConstructor)
+                {
+                    _args[i] = rowConstructor.GetRow();
+                }
+            }
 
-            return new Row(_values, _schema);
+            return new Row(_args, _parent.GetSchema());
+        }
+
+        /// <summary>
+        /// Clears the schema cache. Spark sends rows in batches, and each row
+        /// arrives with an accompanying set of schemas and row entries; caching
+        /// ensures that each schema is parsed and converted to a StructType only
+        /// once per batch. A new batch may contain rows from a different table,
+        /// so calling <c>Reset</c> after each batch also keeps the cache from
+        /// growing too large.
+        /// </summary>
+        internal void Reset()
+        {
+            s_schemaCache?.Clear();
+        }
+
+        /// <summary>
+        /// Gets the StructType for the schema string contained in args, parsing
+        /// and caching it on first use. Calling this is only valid if the child
+        /// args contain the row values.
+        /// </summary>
+        /// <returns>The schema as a StructType</returns>
+        private StructType GetSchema()
+        {
+            Debug.Assert(s_schemaCache != null);
+            Debug.Assert((_args != null) && (_args.Length == 1) && (_args[0] is string));
+            var schemaString = (string)_args[0];
+            if (!s_schemaCache.TryGetValue(schemaString, out StructType schema))
+            {
+                schema = (StructType)DataType.ParseDataType(schemaString);
+                s_schemaCache.Add(schemaString, schema);
+            }
+
+            return schema;
         }
     }
 }
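To make the construct()/GetRow() handshake concrete, the following is a rough trace of the calls the Unpickler ends up making for a single two-field row. It is a sketch rather than runnable sample code (RowConstructor is internal to Microsoft.Spark), and schemaJson stands in for the JSON schema string that PySpark pickles alongside the row values.

    // Rough trace of the protocol; `schemaJson` is a stand-in for the schema
    // JSON string sent ahead of the row values.
    var registered = new RowConstructor();

    // 1. The schema arrives first. The returned constructor captures the schema
    //    string in its args, with `registered` as its parent.
    var schemaCtor = (RowConstructor)registered.construct(new object[] { schemaJson });

    // 2. The row values arrive next. The returned constructor captures the values,
    //    with `schemaCtor` as its parent so that GetRow() can find the schema.
    var rowCtor = (RowConstructor)schemaCtor.construct(new object[] { "Andy", 30 });

    // 3. When the row is complete, construct() is called with the row-holding
    //    constructor as the single argument, and the Row is materialized. Had any
    //    entry of rowCtor itself been a RowConstructor (a nested row), GetRow()
    //    would have materialized it first.
    object[] result = (object[])registered.construct(new object[] { rowCtor });
    Row row = (Row)result[0];    // schema parsed once, then cached in s_schemaCache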
12 changes: 11 additions & 1 deletion src/csharp/Microsoft.Spark/Utils/PythonSerDe.cs
@@ -18,14 +18,23 @@ namespace Microsoft.Spark.Utils
     /// </summary>
     internal class PythonSerDe
     {
+        // One RowConstructor object is registered to the Unpickler, and
+        // there could be multiple threads unpickling row data using this
+        // object. However, this is not an issue, because the fields reused
+        // by this object are instantiated on a per-thread basis and are
+        // therefore not shared between threads.
+        private static readonly RowConstructor s_rowConstructor;
+
         static PythonSerDe()
         {
             // Custom picklers used in PySpark implementation.
             // Refer to spark/python/pyspark/sql/types.py.
             Unpickler.registerConstructor(
                 "pyspark.sql.types", "_parse_datatype_json_string", new StringConstructor());
+
+            s_rowConstructor = new RowConstructor();
             Unpickler.registerConstructor(
-                "pyspark.sql.types", "_create_row_inbound_converter", new RowConstructor());
+                "pyspark.sql.types", "_create_row_inbound_converter", s_rowConstructor);
         }
 
         /// <summary>
@@ -49,6 +58,7 @@ internal static object[] GetUnpickledObjects(Stream stream, int messageLength)
             object unpickledItems = unpickler.loads(
                 new ReadOnlyMemory<byte>(buffer, 0, messageLength),
                 stackCapacity: 102); // Spark sends batches of 100 rows, and +2 is for markers.
+            s_rowConstructor.Reset();
             Debug.Assert(unpickledItems != null);
             return (unpickledItems as object[]);
         }
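Putting the pieces together, a simplified consumer of this path might look like the sketch below. The stream handling is elided, and the cast to Row assumes the batch contains pickled row data; inside GetUnpickledObjects, the shared constructor's Reset() runs after loads(), so each new batch starts with an empty schema cache.

    // Simplified sketch, assuming `stream` holds one pickled batch of rows and
    // `messageLength` is its length in bytes. Any schema that repeats within
    // the batch is parsed only once thanks to the per-batch schema cache.
    object[] objects = PythonSerDe.GetUnpickledObjects(stream, messageLength);
    foreach (object o in objects)
    {
        var row = (Row)o;
        Console.WriteLine(row.Get(0));
    }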