Skip to content

Commit 98cac51

Browse files
committed
Implement project node
Implement project node. Improve error handling and tidy up the code in general.
1 parent c50892e commit 98cac51

23 files changed

+328
-104
lines changed

csharp/src/Apache.Arrow.Acero/CLib.cs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public struct GArrowStringScalar { }
4444
public struct GArrowInt8Scalar { }
4545
public struct GArrowSortOptions { }
4646
public struct GArrowSortKey { }
47+
public struct GArrowProjectNodeOptions { }
48+
public struct GArrowInt32Scalar { }
4749

4850
public enum GArrowJoinType
4951
{
@@ -66,16 +68,16 @@ public enum GArrowSortOrder
6668
public const string DllName = "libarrow-glib-1300.dll";
6769

6870
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_new")]
69-
public static extern unsafe GArrowExecutePlan* garrow_execute_plan_new(GError** error);
71+
public static extern unsafe GArrowExecutePlan* garrow_execute_plan_new(out GError** error);
7072

7173
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_source_node_options_new_record_batch")]
7274
public static extern unsafe GArrowSourceNodeOptions* garrow_source_node_options_new_record_batch(GArrowRecordBatch* record_batch);
7375

7476
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_record_batch_import")]
75-
public static extern unsafe GArrowRecordBatch* garrow_record_batch_import(CArrowArray* c_abi_array, GArrowSchema* schema, GError** error);
77+
public static extern unsafe GArrowRecordBatch* garrow_record_batch_import(CArrowArray* c_abi_array, GArrowSchema* schema, out GError** error);
7678

7779
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_schema_import")]
78-
public static extern unsafe GArrowSchema* garrow_schema_import(CArrowSchema* c_abi_schema, GError** error);
80+
public static extern unsafe GArrowSchema* garrow_schema_import(CArrowSchema* c_abi_schema, out GError** error);
7981

8082
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_schema_get_field")]
8183
public static extern unsafe GArrowField* garrow_schema_get_field(GArrowSchema* schema, uint i);
@@ -84,22 +86,22 @@ public enum GArrowSortOrder
8486
public static extern unsafe bool garrow_field_is_nullable(GArrowField* field);
8587

8688
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_hash_join_node_options_new")]
87-
public static extern unsafe GArrowHashJoinNodeOptions* garrow_hash_join_node_options_new(GArrowJoinType type, IntPtr left_keys, uint n_left_keys, IntPtr right_keys, uint n_right_keys, GError** error);
89+
public static extern unsafe GArrowHashJoinNodeOptions* garrow_hash_join_node_options_new(GArrowJoinType type, IntPtr left_keys, uint n_left_keys, IntPtr right_keys, uint n_right_keys, out GError** error);
8890

8991
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_hash_join_node")]
90-
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_hash_join_node(GArrowExecutePlan* plan, GArrowExecuteNode* left, GArrowExecuteNode* right, GArrowHashJoinNodeOptions* options, GError** error);
92+
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_hash_join_node(GArrowExecutePlan* plan, GArrowExecuteNode* left, GArrowExecuteNode* right, GArrowHashJoinNodeOptions* options, out GError** error);
9193

9294
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_source_node")]
93-
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_source_node(GArrowExecutePlan* plan, GArrowSourceNodeOptions* options, GError** error);
95+
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_source_node(GArrowExecutePlan* plan, GArrowSourceNodeOptions* options, out GError** error);
9496

9597
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_validate")]
96-
public static extern unsafe bool garrow_execute_plan_validate(GArrowExecutePlan* plan, GError** error);
98+
public static extern unsafe bool garrow_execute_plan_validate(GArrowExecutePlan* plan, out GError** error);
9799

98100
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_sink_node_options_new")]
99101
public static extern unsafe GArrowSinkNodeOptions* garrow_sink_node_options_new();
100102

101103
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_sink_node")]
102-
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_sink_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowSinkNodeOptions* options, GError** error);
104+
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_sink_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowSinkNodeOptions* options, out GError** error);
103105

104106
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_start")]
105107
public static extern unsafe void garrow_execute_plan_start(GArrowExecutePlan* plan);
@@ -111,16 +113,16 @@ public enum GArrowSortOrder
111113
public static extern unsafe GArrowRecordBatchReader* garrow_sink_node_options_get_reader(GArrowSinkNodeOptions* options, GArrowSchema* schema);
112114

113115
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_record_batch_reader_export")]
114-
public static extern unsafe CArrowArrayStream* garrow_record_batch_reader_export(GArrowRecordBatchReader* reader, GError** error);
116+
public static extern unsafe CArrowArrayStream* garrow_record_batch_reader_export(GArrowRecordBatchReader* reader, out GError** error);
115117

116118
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_record_batch_reader_import")]
117-
public static extern unsafe GArrowRecordBatchReader* garrow_record_batch_reader_import(CArrowArrayStream* c_abi_array_stream, GError** error);
119+
public static extern unsafe GArrowRecordBatchReader* garrow_record_batch_reader_import(CArrowArrayStream* c_abi_array_stream, out GError** error);
118120

119121
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_source_node_options_new_record_batch_reader")]
120122
public static extern unsafe GArrowSourceNodeOptions* garrow_source_node_options_new_record_batch_reader(GArrowRecordBatchReader* reader);
121123

122124
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_filter_node")]
123-
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_filter_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowFilterNodeOptions* options, GError** error);
125+
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_filter_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowFilterNodeOptions* options, out GError** error);
124126

125127
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_filter_node_options_new")]
126128
public static extern unsafe GArrowFilterNodeOptions* garrow_filter_node_options_new(IntPtr expression);
@@ -129,7 +131,7 @@ public enum GArrowSortOrder
129131
public static extern unsafe GArrowCallExpression* garrow_call_expression_new(IntPtr function, IntPtr arguments, GArrowFunctionOptions* options);
130132

131133
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_field_expression_new")]
132-
public static extern unsafe GArrowFieldExpression* garrow_field_expression_new(IntPtr reference, GError** error);
134+
public static extern unsafe GArrowFieldExpression* garrow_field_expression_new(IntPtr reference, out GError** error);
133135

134136
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_literal_expression_new")]
135137
public static extern unsafe GArrowFieldExpression* garrow_literal_expression_new(GArrowDatum* datum);
@@ -156,10 +158,20 @@ public enum GArrowSortOrder
156158
public static extern unsafe GArrowSortOptions* garrow_sort_options_new(GList* sort_keys);
157159

158160
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_sort_key_new")]
159-
public static extern unsafe GArrowSortKey* garrow_sort_key_new(IntPtr target, GArrowSortOrder order, GError** error);
161+
public static extern unsafe GArrowSortKey* garrow_sort_key_new(IntPtr target, GArrowSortOrder order, out GError** error);
160162

161163
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_node")]
162164
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_node(GArrowExecutePlan* plan, IntPtr factory_name, IntPtr inputs, IntPtr options, out GError **error);
165+
166+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_project_node_options_new")]
167+
public static extern unsafe GArrowProjectNodeOptions* garrow_project_node_options_new(GList* expressions, IntPtr names, int n_names);
168+
169+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_execute_plan_build_project_node")]
170+
public static extern unsafe GArrowExecuteNode* garrow_execute_plan_build_project_node(GArrowExecutePlan* plan, GArrowExecuteNode* input, GArrowProjectNodeOptions* options, out GError** error);
171+
172+
173+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl, EntryPoint = "garrow_int32_scalar_new")]
174+
public static extern unsafe GArrowInt32Scalar* garrow_int32_scalar_new(int value);
163175
}
164176

165177
[StructLayout(LayoutKind.Sequential)]

csharp/src/Apache.Arrow.Acero/Declaration.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public static IArrowArrayStream ToRecordBatchReader(Declaration declaration, Sch
7171
new List<Declaration>
7272
{
7373
declaration,
74-
new Declaration("table_sink", sinkNodeOptions)
74+
new Declaration("sink", sinkNodeOptions)
7575
});
7676

7777
sinkNode.AddToPlan(plan);
@@ -89,15 +89,15 @@ private ExecNode AddToPlan(ExecPlan plan)
8989
{
9090
var nodes = new List<ExecNode>();
9191

92-
foreach (var input in Inputs)
92+
foreach (Declaration input in Inputs)
9393
{
94-
var node = input.AddToPlan(plan);
94+
ExecNode node = input.AddToPlan(plan);
9595
nodes.Add(node);
9696
}
9797

9898
switch (_factoryName)
9999
{
100-
case "table_sink":
100+
case "sink":
101101
return new SinkNode(_options as SinkNodeOptions, plan, nodes);
102102

103103
case "record_batch_source":
@@ -115,6 +115,9 @@ private ExecNode AddToPlan(ExecPlan plan)
115115
case "union":
116116
return new UnionNode(plan, nodes);
117117

118+
case "project":
119+
return new ProjectNode(_options as ProjectNodeOptions, plan, nodes);
120+
118121
default:
119122
throw new Exception($"Unknown factory {_factoryName}");
120123
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using static Apache.Arrow.Acero.CLib;
3+
4+
namespace Apache.Arrow.Acero
5+
{
6+
/// <summary>
/// Helper for turning an error pointer produced by a native call into a managed exception.
/// </summary>
public static class ExceptionUtil
7+
{
8+
/// <summary>
/// Throws a <see cref="GLib.GException"/> built from <paramref name="error"/> when the
/// pointer is non-null; returns normally when it is null.
/// </summary>
/// <param name="error">Pointer value to test; it is handed to the exception constructor as an IntPtr.</param>
/// <exception cref="GLib.GException">Thrown when <paramref name="error"/> is not IntPtr.Zero.</exception>
// NOTE(review): the raw pointer value itself (not a dereferenced one) is passed to GException,
// so this presumably expects the GError* written by the native call even though the parameter
// is typed GError** - confirm against the P/Invoke signatures that fill it in.
public static unsafe void ThrowOnError(GError** error)
9+
{
10+
if ((IntPtr)error != IntPtr.Zero)
11+
throw new GLib.GException((IntPtr)error);
12+
}
13+
}
14+
}

csharp/src/Apache.Arrow.Acero/ExecNode.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,5 @@ namespace Apache.Arrow.Acero
2020
/// <summary>
/// Abstract base class for plan nodes; each subclass exposes its native node pointer through GetPtr.
/// </summary>
public abstract class ExecNode
2121
{
2222
/// <summary>
/// Returns the GArrowExecuteNode pointer held by the concrete node.
/// </summary>
public abstract unsafe GArrowExecuteNode* GetPtr();
23-
2423
}
2524
}

csharp/src/Apache.Arrow.Acero/ExecPlan.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16+
using static Apache.Arrow.Acero.CLib;
17+
1618
namespace Apache.Arrow.Acero
1719
{
1820
public class ExecPlan
@@ -21,12 +23,22 @@ public class ExecPlan
2123

2224
/// <summary>
/// Creates the native plan with garrow_execute_plan_new and stores the returned pointer in _planPtr.
/// In the updated lines the error output of the call is passed to ExceptionUtil.ThrowOnError.
/// </summary>
public unsafe ExecPlan()
2325
{
24-
_planPtr = CLib.garrow_execute_plan_new(null);
26+
// Receives the error value written by the native call.
GError** error;
27+
28+
_planPtr = CLib.garrow_execute_plan_new(out error);
29+
30+
// Throws if the native call reported an error (non-null pointer).
ExceptionUtil.ThrowOnError(error);
2531
}
2632

2733
/// <summary>
/// Calls garrow_execute_plan_validate on the plan pointer and returns its boolean result.
/// In the updated lines the error output is checked with ExceptionUtil.ThrowOnError before returning.
/// </summary>
/// <returns>The value returned by garrow_execute_plan_validate.</returns>
public unsafe bool Validate()
2834
{
29-
return CLib.garrow_execute_plan_validate(_planPtr, null);
35+
// Receives the error value written by the native call.
GError** error;
36+
37+
var valid = CLib.garrow_execute_plan_validate(_planPtr, out error);
38+
39+
// A non-null error takes precedence over the boolean result: it is thrown before returning.
ExceptionUtil.ThrowOnError(error);
40+
41+
return valid;
3042
}
3143

3244
public unsafe void StartProducing()

csharp/src/Apache.Arrow.Acero/ExportUtil.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
using Apache.Arrow.C;
1717
using Apache.Arrow.Ipc;
18+
using static Apache.Arrow.Acero.CLib;
1819

1920
namespace Apache.Arrow.Acero
2021
{
@@ -29,7 +30,10 @@ internal static class ExportUtil
2930
CArrowSchemaExporter.ExportSchema(schema, cSchema);
3031

3132
// Import CArrowSchema into a GArrowSchema
32-
var gSchema = CLib.garrow_schema_import(cSchema, null);
33+
GError** error;
34+
var gSchema = CLib.garrow_schema_import(cSchema, out error);
35+
36+
ExceptionUtil.ThrowOnError(error);
3337

3438
return gSchema;
3539
}
@@ -45,7 +49,10 @@ internal static class ExportUtil
4549
CArrowArrayExporter.ExportRecordBatch(recordBatch, cArray);
4650

4751
// import the CArrowArray into a gArrowRecordBatch
48-
var gRecordBatch = CLib.garrow_record_batch_import(cArray, schemaPtr, null);
52+
GError** error;
53+
var gRecordBatch = CLib.garrow_record_batch_import(cArray, schemaPtr, out error);
54+
55+
ExceptionUtil.ThrowOnError(error);
4956

5057
return gRecordBatch;
5158
}
@@ -59,7 +66,10 @@ internal static class ExportUtil
5966
CArrowArrayStreamExporter.ExportArrayStream(recordBatchReader, cArrayStream);
6067

6168
// next import the c arrow as a gArrowRecordBatch
62-
var gRecordBatchReader = CLib.garrow_record_batch_reader_import(cArrayStream, null);
69+
GError** error;
70+
var gRecordBatchReader = CLib.garrow_record_batch_reader_import(cArrayStream, out error);
71+
72+
ExceptionUtil.ThrowOnError(error);
6373

6474
return gRecordBatchReader;
6575
}

csharp/src/Apache.Arrow.Acero/Expression.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,11 @@ namespace Apache.Arrow.Acero
2020
/// <summary>
/// Abstract base class for expressions that expose a native pointer through GetPtr.
/// </summary>
public abstract class Expression
2121
{
2222
/// <summary>
/// Returns the pointer that is passed to native calls such as garrow_expression_to_string.
/// </summary>
public abstract IntPtr GetPtr();
23+
24+
/// <summary>
/// Returns the string that garrow_expression_to_string produces for this expression's pointer,
/// converted to a managed string with StringUtil.PtrToStringUtf8.
/// </summary>
// NOTE(review): the native string pointer is not released in this method; if
// garrow_expression_to_string transfers ownership to the caller this leaks - confirm.
public override unsafe string ToString()
25+
{
26+
var strPtr = CLib.garrow_expression_to_string(GetPtr());
27+
return StringUtil.PtrToStringUtf8((byte*)strPtr);
28+
}
2329
}
2430
}

csharp/src/Apache.Arrow.Acero/FieldExpression.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// limitations under the License.
1515

1616
using System;
17+
using static Apache.Arrow.Acero.CLib;
1718

1819
namespace Apache.Arrow.Acero
1920
{
@@ -25,7 +26,11 @@ public unsafe FieldExpression(string field)
2526
{
2627
var reference = (nint)StringUtil.ToCStringUtf8(field);
2728

28-
_ptr = (nint)CLib.garrow_field_expression_new(reference, null);
29+
GError** error;
30+
31+
_ptr = (IntPtr)CLib.garrow_field_expression_new(reference, out error);
32+
33+
ExceptionUtil.ThrowOnError(error);
2934
}
3035

3136
public override IntPtr GetPtr()

csharp/src/Apache.Arrow.Acero/FilterNode.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// limitations under the License.
1515

1616
using System.Collections.Generic;
17+
using static Apache.Arrow.Acero.CLib;
1718

1819
namespace Apache.Arrow.Acero
1920
{
@@ -22,8 +23,12 @@ public class FilterNode : ExecNode
2223
private unsafe CLib.GArrowExecuteNode* _nodePtr;
2324

2425
/// <summary>
/// Builds the native filter node with garrow_execute_plan_build_filter_node and stores it in _nodePtr.
/// Only the first entry of <paramref name="nodes"/> is used as the input node.
/// In the updated lines the error output is checked with ExceptionUtil.ThrowOnError.
/// </summary>
/// <param name="options">Filter options whose native pointer is passed to the build call.</param>
/// <param name="plan">Plan whose native pointer is passed to the build call.</param>
/// <param name="nodes">Input nodes; nodes[0] supplies the input pointer.</param>
public unsafe FilterNode(FilterNodeOptions options, ExecPlan plan, List<ExecNode> nodes)
25-
{
26-
_nodePtr = CLib.garrow_execute_plan_build_filter_node(plan.GetPtr(), nodes[0].GetPtr(), options.GetPtr(), null);
26+
{
27+
// Receives the error value written by the native call.
GError** error;
28+
29+
_nodePtr = CLib.garrow_execute_plan_build_filter_node(plan.GetPtr(), nodes[0].GetPtr(), options.GetPtr(), out error);
30+
31+
// Throws if the native call reported an error (non-null pointer).
ExceptionUtil.ThrowOnError(error);
2732
}
2833

2934
public override unsafe CLib.GArrowExecuteNode* GetPtr()

csharp/src/Apache.Arrow.Acero/FilterNodeOptions.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
using System;
1716
using static Apache.Arrow.Acero.CLib;
1817

1918
namespace Apache.Arrow.Acero
@@ -25,11 +24,6 @@ public class FilterNodeOptions : ExecNodeOptions
2524
/// <summary>
/// Creates native filter node options from the expression's pointer via
/// garrow_filter_node_options_new and stores the result in _optionsPtr.
/// </summary>
/// <param name="expr">Expression whose GetPtr() value is passed to the native constructor.</param>
public unsafe FilterNodeOptions(Expression expr)
2625
{
2726
_optionsPtr = CLib.garrow_filter_node_options_new(expr.GetPtr());
28-
29-
var strPtr = CLib.garrow_expression_to_string(expr.GetPtr());
30-
var str = StringUtil.PtrToStringUtf8((byte*)strPtr);
31-
32-
Console.WriteLine(str);
3327
}
3428

3529
internal unsafe GArrowFilterNodeOptions* GetPtr()

0 commit comments

Comments
 (0)