Skip to content

feat(arrow): Streamline Apache Arrow extension types #823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,34 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

replace github.com/apache/arrow/go/v12 => github.com/cloudquery/arrow/go/v12 v12.0.0-20230425184555-43f156fcdec9

require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
)

replace github.com/apache/arrow/go/v12 => github.com/cloudquery/arrow/go/v12 v12.0.0-20230417154311-f9add0212acd

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect; indirect // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect; indirect // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect; indirect // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect; indirect // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect; indirect // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudquery/arrow/go/v12 v12.0.0-20230417154311-f9add0212acd h1:G093N165IqQvq84MK3Ozi7QwfAWyfYywXijkOyxGJdI=
github.com/cloudquery/arrow/go/v12 v12.0.0-20230417154311-f9add0212acd/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg=
github.com/cloudquery/arrow/go/v12 v12.0.0-20230425184555-43f156fcdec9 h1:DOmgyWSIXR8FLBS23UgIwCkEE/JPBWztzTjFHyj5lCw=
github.com/cloudquery/arrow/go/v12 v12.0.0-20230425184555-43f156fcdec9/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
Expand Down
2 changes: 1 addition & 1 deletion plugins/destination/plugin_testing_overwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
SourceName: sourceName,
SyncTime: secondSyncTime,
MaxRows: 1,
StableUUID: *u,
StableUUID: u,
}
updatedResource := testdata.GenTestData(table, opts)[0]
// write second time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
opts = testdata.GenTestDataOptions{
SourceName: sourceName,
SyncTime: secondSyncTime,
StableUUID: *u,
StableUUID: u,
MaxRows: 1,
}
updatedResources := testdata.GenTestData(table, opts)[0]
Expand Down
8 changes: 4 additions & 4 deletions schema/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func CQTypesToRecord(mem memory.Allocator, c []CQTypes, arrowSchema *arrow.Schem
}
case TypeInet:
if c[j][i].(*Inet).Status == Present {
bldr.Field(i).(*types.InetBuilder).Append(*c[j][i].(*Inet).IPNet)
bldr.Field(i).(*types.InetBuilder).Append(c[j][i].(*Inet).IPNet)
} else {
bldr.Field(i).(*types.InetBuilder).AppendNull()
}
Expand All @@ -407,14 +407,14 @@ func CQTypesToRecord(mem memory.Allocator, c []CQTypes, arrowSchema *arrow.Schem
listBldr := bldr.Field(i).(*array.ListBuilder)
listBldr.Append(true)
for _, e := range c[j][i].(*InetArray).Elements {
listBldr.ValueBuilder().(*types.InetBuilder).Append(*e.IPNet)
listBldr.ValueBuilder().(*types.InetBuilder).Append(e.IPNet)
}
} else {
bldr.Field(i).(*array.ListBuilder).AppendNull()
}
case TypeCIDR:
if c[j][i].(*CIDR).Status == Present {
bldr.Field(i).(*types.InetBuilder).Append(*c[j][i].(*CIDR).IPNet)
bldr.Field(i).(*types.InetBuilder).Append(c[j][i].(*CIDR).IPNet)
} else {
bldr.Field(i).(*types.InetBuilder).AppendNull()
}
Expand All @@ -423,7 +423,7 @@ func CQTypesToRecord(mem memory.Allocator, c []CQTypes, arrowSchema *arrow.Schem
listBldr := bldr.Field(i).(*array.ListBuilder)
listBldr.Append(true)
for _, e := range c[j][i].(*CIDRArray).Elements {
listBldr.ValueBuilder().(*types.InetBuilder).Append(*e.IPNet)
listBldr.ValueBuilder().(*types.InetBuilder).Append(e.IPNet)
}
} else {
bldr.Field(i).(*array.ListBuilder).AppendNull()
Expand Down
6 changes: 3 additions & 3 deletions testdata/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,19 @@ func GenTestData(sc *arrow.Schema, opts GenTestDataOptions) []arrow.Record {
if err != nil {
panic(err)
}
bldr.Field(i).(*types.InetBuilder).Append(*ipnet)
bldr.Field(i).(*types.InetBuilder).Append(ipnet)
} else if arrow.TypeEqual(c.Type, arrow.ListOf(types.ExtensionTypes.Inet)) {
bldr.Field(i).(*array.ListBuilder).Append(true)
_, ipnet, err := net.ParseCIDR("192.0.2.1/24")
if err != nil {
panic(err)
}
bldr.Field(i).(*array.ListBuilder).ValueBuilder().(*types.InetBuilder).Append(*ipnet)
bldr.Field(i).(*array.ListBuilder).ValueBuilder().(*types.InetBuilder).Append(ipnet)
_, ipnet, err = net.ParseCIDR("192.0.2.1/24")
if err != nil {
panic(err)
}
bldr.Field(i).(*array.ListBuilder).ValueBuilder().(*types.InetBuilder).Append(*ipnet)
bldr.Field(i).(*array.ListBuilder).ValueBuilder().(*types.InetBuilder).Append(ipnet)
} else if arrow.TypeEqual(c.Type, types.ExtensionTypes.Mac) {
mac, err := net.ParseMAC("aa:bb:cc:dd:ee:ff")
if err != nil {
Expand Down
81 changes: 44 additions & 37 deletions types/inet.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,28 @@ type InetBuilder struct {
*array.ExtensionBuilder
}

func NewInetBuilder(bldr *array.ExtensionBuilder) *InetBuilder {
b := &InetBuilder{
ExtensionBuilder: bldr,
}
return b
func NewInetBuilder(builder *array.ExtensionBuilder) *InetBuilder {
return &InetBuilder{ExtensionBuilder: builder}
}

func (b *InetBuilder) Append(v net.IPNet) {
func (b *InetBuilder) Append(v *net.IPNet) {
if v == nil {
b.AppendNull()
return
}
b.ExtensionBuilder.Builder.(*array.StringBuilder).Append(v.String())
}

func (b *InetBuilder) UnsafeAppend(v net.IPNet) {
func (b *InetBuilder) UnsafeAppend(v *net.IPNet) {
b.ExtensionBuilder.Builder.(*array.StringBuilder).UnsafeAppend([]byte(v.String()))
}

func (b *InetBuilder) AppendValues(v []net.IPNet, valid []bool) {
func (b *InetBuilder) AppendValues(v []*net.IPNet, valid []bool) {
data := make([]string, len(v))
for i, v := range v {
if !valid[i] {
continue
}
data[i] = v.String()
}
b.ExtensionBuilder.Builder.(*array.StringBuilder).AppendValues(data, valid)
Expand All @@ -48,7 +52,7 @@ func (b *InetBuilder) AppendValueFromString(s string) error {
if err != nil {
return err
}
b.Append(*data)
b.Append(data)
return nil
}

Expand All @@ -58,20 +62,18 @@ func (b *InetBuilder) UnmarshalOne(dec *json.Decoder) error {
return err
}

var val net.IPNet
var val *net.IPNet
switch v := t.(type) {
case string:
_, data, err := net.ParseCIDR(v)
_, val, err = net.ParseCIDR(v)
if err != nil {
return err
}
val = *data
case []byte:
_, data, err := net.ParseCIDR(string(v))
_, val, err = net.ParseCIDR(string(v))
if err != nil {
return err
}
val = *data
case nil:
b.AppendNull()
return nil
Expand Down Expand Up @@ -116,7 +118,7 @@ type InetArray struct {
array.ExtensionArrayBase
}

func (a InetArray) String() string {
func (a *InetArray) String() string {
arr := a.Storage().(*array.String)
o := new(strings.Builder)
o.WriteString("[")
Expand All @@ -126,33 +128,39 @@ func (a InetArray) String() string {
}
switch {
case a.IsNull(i):
o.WriteString("(null)")
o.WriteString(array.NullValueStr)
default:
fmt.Fprintf(o, "\"%s\"", arr.Value(i))
fmt.Fprintf(o, "%q", a.ValueStr(i))
}
}
o.WriteString("]")
return o.String()
}

func (a *InetArray) Value(i int) *net.IPNet {
if a.IsNull(i) {
return nil
}
_, ipnet, err := net.ParseCIDR(a.Storage().(*array.String).Value(i))
if err != nil {
panic(fmt.Errorf("invalid ip+net: %w", err))
}

return ipnet
}

func (a *InetArray) ValueStr(i int) string {
arr := a.Storage().(*array.String)
switch {
case a.IsNull(i):
return "(null)"
return array.NullValueStr
default:
return arr.Value(i)
return a.Value(i).String()
}
}

func (a *InetArray) GetOneForMarshal(i int) any {
arr := a.Storage().(*array.String)
if a.IsValid(i) {
_, ipnet, err := net.ParseCIDR(arr.Value(i))
if err != nil {
panic(fmt.Errorf("invalid ip+net: %w", err))
}
return ipnet.String()
if val := a.Value(i); val != nil {
return val.String()
}
return nil
}
Expand All @@ -166,27 +174,26 @@ type InetType struct {
// NewInetType is a convenience function to create an instance of InetType
// with the correct storage type
func NewInetType() *InetType {
return &InetType{
ExtensionBase: arrow.ExtensionBase{
Storage: &arrow.StringType{}}}
return &InetType{ExtensionBase: arrow.ExtensionBase{Storage: &arrow.StringType{}}}
}

func (InetType) ArrayType() reflect.Type {
// ArrayType returns TypeOf(InetArray{}) for constructing Inet arrays
func (*InetType) ArrayType() reflect.Type {
return reflect.TypeOf(InetArray{})
}

func (InetType) ExtensionName() string {
func (*InetType) ExtensionName() string {
return "inet"
}

// Serialize returns "inet-serialized" for testing proper metadata passing
func (InetType) Serialize() string {
func (*InetType) Serialize() string {
return "inet-serialized"
}

// Deserialize expects storageType to be StringType and the data to be
// "inet-serialized" in order to correctly create a InetType for testing deserialize.
func (InetType) Deserialize(storageType arrow.DataType, data string) (arrow.ExtensionType, error) {
func (*InetType) Deserialize(storageType arrow.DataType, data string) (arrow.ExtensionType, error) {
if data != "inet-serialized" {
return nil, fmt.Errorf("type identifier did not match: '%s'", data)
}
Expand All @@ -196,11 +203,11 @@ func (InetType) Deserialize(storageType arrow.DataType, data string) (arrow.Exte
return NewInetType(), nil
}

// InetType are equal if both are named "inet"
func (u InetType) ExtensionEquals(other arrow.ExtensionType) bool {
// ExtensionEquals returns true if both extensions have the same name
func (u *InetType) ExtensionEquals(other arrow.ExtensionType) bool {
return u.ExtensionName() == other.ExtensionName()
}

func (InetType) NewBuilder(bldr *array.ExtensionBuilder) array.Builder {
func (*InetType) NewBuilder(bldr *array.ExtensionBuilder) array.Builder {
return NewInetBuilder(bldr)
}
6 changes: 3 additions & 3 deletions types/inet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"github.com/stretchr/testify/require"
)

func mustParseInet(s string) net.IPNet {
func mustParseInet(s string) *net.IPNet {
_, ipnet, err := net.ParseCIDR(s)
if err != nil {
panic(err)
}
return *ipnet
return ipnet
}

func TestInetBuilder(t *testing.T) {
Expand All @@ -31,7 +31,7 @@ func TestInetBuilder(t *testing.T) {
require.Equal(t, 4, b.Len(), "unexpected Len()")
require.Equal(t, 2, b.NullN(), "unexpected NullN()")

values := []net.IPNet{
values := []*net.IPNet{
mustParseInet("192.168.0.0/26"),
mustParseInet("192.168.0.0/27"),
}
Expand Down
Loading