Skip to content

Commit 9f77f8b

Browse files
committed
api: add support of a batch insert request
Draft changes: add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW . Closes #399
1 parent 592db69 commit 9f77f8b

File tree

6 files changed

+299
-6
lines changed

6 files changed

+299
-6
lines changed

CHANGELOG.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
99
## [Unreleased]
1010

1111
### Added
12-
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13-
connection and ctx is not canceled;
14-
also added logs for error case of `ConnectionPool.tryConnect()` calls in
12+
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13+
connection and ctx is not canceled;
14+
also added logs for error case of `ConnectionPool.tryConnect()` calls in
1515
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
1616
- Methods that are implemented but not included in the pooler interface (#395).
1717
- Implemented stringer methods for pool.Role (#405).
18+
- Support the IPROTO_INSERT_ARROW request (#399).
1819

1920
### Changed
2021

arrow/arrow.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package arrow
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
7+
"github.com/vmihailenco/msgpack/v5"
8+
)
9+
10+
// Arrow MessagePack extension type
11+
const arrowExtId = 8
12+
13+
// Arrow struct wraps a raw arrow data buffer.
14+
type Arrow struct {
15+
data []byte
16+
}
17+
18+
// MakeArrow returns a new arrow.Arrow object that contains
19+
// wrapped a raw arrow data buffer.
20+
func MakeArrow(arrow []byte) (Arrow, error) {
21+
if len(arrow) == 0 {
22+
return Arrow{}, fmt.Errorf("no Arrow data")
23+
}
24+
return Arrow{arrow}, nil
25+
}
26+
27+
// ToArrow returns a []byte that contains Arrow raw data.
28+
func (a *Arrow) ToArrow() []byte {
29+
return a.data
30+
}
31+
32+
func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
33+
arrow := Arrow{
34+
data: make([]byte, 0, extLen),
35+
}
36+
n, err := d.Buffered().Read(arrow.data)
37+
if err != nil {
38+
return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
39+
}
40+
if n < extLen || n != len(arrow.data) {
41+
return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
42+
}
43+
44+
v.Set(reflect.ValueOf(arrow))
45+
return nil
46+
}
47+
48+
func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
49+
if v.IsValid() {
50+
return v.Interface().(Arrow).data, nil
51+
}
52+
53+
return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
54+
}
55+
56+
func init() {
57+
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
58+
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
59+
}

arrow/arrow_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package arrow_test
2+
3+
import (
4+
"encoding/hex"
5+
"log"
6+
"os"
7+
"strconv"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
"github.com/tarantool/go-tarantool/v2"
13+
"github.com/tarantool/go-tarantool/v2/arrow"
14+
"github.com/tarantool/go-tarantool/v2/test_helpers"
15+
)
16+
17+
var isArrowSupported = false
18+
19+
var server = "127.0.0.1:3013"
20+
var dialer = tarantool.NetDialer{
21+
Address: server,
22+
User: "test",
23+
Password: "test",
24+
}
25+
var space = "testArrow"
26+
27+
var opts = tarantool.Opts{
28+
//! Timeout: 5 * time.Second,
29+
}
30+
31+
func skipIfArrowUnsupported(t *testing.T) {
32+
t.Helper()
33+
if !isArrowSupported {
34+
t.Skip("Skipping test for Tarantool without Arrow support in msgpack")
35+
}
36+
}
37+
38+
// TestInsert based on Tarantool test.
39+
// See: https://github.com/tarantool/tarantool/blob/master/test/box-luatest/gh_10508_iproto_insert_arrow_test.lua
40+
func TestInsert_invalid(t *testing.T) {
41+
skipIfArrowUnsupported(t)
42+
43+
arrows := []struct {
44+
arrow string
45+
expected string
46+
}{
47+
{
48+
"80",
49+
"Missing mandatory field 'SPACE_ID' in request",
50+
},
51+
{
52+
"8110cd0200",
53+
"Missing mandatory field 'ARROW' in request",
54+
},
55+
{
56+
"8210cd020036c0",
57+
"Invalid MsgPack - packet body",
58+
},
59+
{
60+
"8210cd020036c70000",
61+
"Invalid MsgPack - packet body",
62+
},
63+
64+
//? What is r[box.iproto.key.ERROR][0][1][6].method|details
65+
//? {
66+
//? "8210cd020036c70008",
67+
//? "Failed to decode Arrow IPC data",
68+
//? },
69+
{
70+
"8210cd020036c8011008ffffffff70000000040000009effffff0400010004000000" +
71+
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
72+
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
73+
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
74+
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
75+
"01000000000000003400000008000000000000000200000000000000000000000000" +
76+
"00000000000000000000000000000800000000000000000000000100000001000000" +
77+
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
78+
"00000000000000000000",
79+
"memtx does not support arrow format",
80+
},
81+
}
82+
83+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
84+
defer conn.Close()
85+
86+
for i, a := range arrows {
87+
t.Run(strconv.Itoa(i), func(t *testing.T) {
88+
data, err := hex.DecodeString(a.arrow)
89+
require.NoError(t, err)
90+
91+
arr, err := arrow.MakeArrow(data)
92+
require.NoError(t, err)
93+
req := tarantool.NewInsertArrowRequest(space).Arrow(arr)
94+
95+
_, err = conn.Do(req).Get()
96+
//? fmt.Printf("%d: %s\n", i, err.Error())
97+
require.ErrorContains(t, err, a.expected)
98+
})
99+
}
100+
101+
}
102+
103+
// runTestMain is a body of TestMain function
104+
// (see https://pkg.go.dev/testing#hdr-Main).
105+
// Using defer + os.Exit is not works so TestMain body
106+
// is a separate function, see
107+
// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
108+
func runTestMain(m *testing.M) int {
109+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 3, 0)
110+
if err != nil {
111+
log.Fatalf("Failed to extract Tarantool version: %s", err)
112+
}
113+
isArrowSupported = !isLess
114+
115+
if !isArrowSupported {
116+
log.Println("Skipping insert Arrow tests...")
117+
return m.Run()
118+
}
119+
120+
instance, err := test_helpers.StartTarantool(test_helpers.StartOpts{
121+
Dialer: dialer,
122+
InitScript: "config.lua",
123+
Listen: server,
124+
WaitStart: 100 * time.Millisecond,
125+
ConnectRetry: 10,
126+
RetryTimeout: 500 * time.Millisecond,
127+
})
128+
defer test_helpers.StopTarantoolWithCleanup(instance)
129+
130+
if err != nil {
131+
log.Printf("Failed to prepare test Tarantool: %s", err)
132+
return 1
133+
}
134+
135+
return m.Run()
136+
}
137+
138+
func TestMain(m *testing.M) {
139+
code := runTestMain(m)
140+
os.Exit(code)
141+
}

arrow/config.lua

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
--? local uuid = require('uuid')
2+
--? local msgpack = require('msgpack')
3+
4+
-- Do not set listen for now so connector won't be
5+
-- able to send requests until everything is configured.
6+
box.cfg{
7+
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
8+
}
9+
10+
box.schema.user.create('test', { password = 'test' , if_not_exists = true })
11+
box.schema.user.grant('test', 'execute', 'universe', nil, { if_not_exists = true })
12+
13+
--? local uuid_msgpack_supported = pcall(msgpack.encode, uuid.new())
14+
--? if not uuid_msgpack_supported then
15+
--? error('UUID unsupported, use Tarantool 2.4.1 or newer')
16+
--? end
17+
18+
local s = box.schema.space.create('testArrow', {
19+
id = 524,
20+
if_not_exists = true,
21+
})
22+
s:create_index('primary', {
23+
type = 'tree',
24+
parts = {{ field = 1, type = 'integer' }},
25+
if_not_exists = true
26+
})
27+
s:truncate()
28+
29+
box.schema.user.grant('test', 'read,write', 'space', 'testArrow', { if_not_exists = true })
30+
31+
-- Set listen only when every other thing is configured.
32+
box.cfg{
33+
listen = os.getenv("TEST_TNT_LISTEN"),
34+
}

request.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ import (
1313
"github.com/vmihailenco/msgpack/v5"
1414
)
1515

16+
// INSERT Arrow request.
17+
//
18+
// FIXME: replace with iproto.IPROTO_INSERT_ARROW when iproto will released.
19+
// https://github.com/tarantool/go-replica/issues/30
20+
const iprotoInsertArrowType = iproto.Type(17)
21+
1622
type spaceEncoder struct {
1723
Id uint32
1824
Name string
@@ -1156,6 +1162,50 @@ func (req *InsertRequest) Context(ctx context.Context) *InsertRequest {
11561162
return req
11571163
}
11581164

1165+
// InsertArrowRequest helps you to create an insert request object for execution
1166+
// by a Connection.
1167+
type InsertArrowRequest struct {
1168+
spaceRequest
1169+
arrow interface{}
1170+
}
1171+
1172+
// NewInsertArrowRequest returns a new empty InsertArrowRequest.
1173+
func NewInsertArrowRequest(space interface{}) *InsertArrowRequest {
1174+
req := new(InsertArrowRequest)
1175+
req.rtype = iprotoInsertArrowType
1176+
req.setSpace(space)
1177+
req.arrow = []interface{}{}
1178+
return req
1179+
}
1180+
1181+
// Arrow sets the arrow for insertion the insert arrow request.
1182+
// Note: default value is nil.
1183+
func (req *InsertArrowRequest) Arrow(arrow interface{}) *InsertArrowRequest {
1184+
req.arrow = arrow
1185+
return req
1186+
}
1187+
1188+
// Body fills an msgpack.Encoder with the insert arrow request body.
1189+
func (req *InsertArrowRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
1190+
spaceEnc, err := newSpaceEncoder(res, req.space)
1191+
if err != nil {
1192+
return err
1193+
}
1194+
1195+
return fillInsert(enc, spaceEnc, req.arrow)
1196+
}
1197+
1198+
// Context sets a passed context to the request.
1199+
//
1200+
// Pay attention that when using context with request objects,
1201+
// the timeout option for Connection does not affect the lifetime
1202+
// of the request. For those purposes use context.WithTimeout() as
1203+
// the root context.
1204+
func (req *InsertArrowRequest) Context(ctx context.Context) *InsertArrowRequest {
1205+
req.ctx = ctx
1206+
return req
1207+
}
1208+
11591209
// ReplaceRequest helps you to create a replace request object for execution
11601210
// by a Connection.
11611211
type ReplaceRequest struct {

test_helpers/main.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,22 @@ func atoiUint64(str string) (uint64, error) {
120120
return res, nil
121121
}
122122

123+
func getTarantoolExec() string {
124+
125+
if tar_bin := os.Getenv("TARANTOOL_BIN"); tar_bin != "" {
126+
return tar_bin
127+
}
128+
129+
return "tarantool"
130+
}
131+
123132
// IsTarantoolVersionLess checks if tarantool version is less
124133
// than passed <major.minor.patch>. Returns error if failed
125134
// to extract version.
126135
func IsTarantoolVersionLess(majorMin uint64, minorMin uint64, patchMin uint64) (bool, error) {
127136
var major, minor, patch uint64
128137

129-
out, err := exec.Command("tarantool", "--version").Output()
138+
out, err := exec.Command(getTarantoolExec(), "--version").Output()
130139

131140
if err != nil {
132141
return true, err
@@ -202,8 +211,7 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) {
202211
return inst, err
203212
}
204213
}
205-
206-
inst.Cmd = exec.Command("tarantool", startOpts.InitScript)
214+
inst.Cmd = exec.Command(getTarantoolExec(), startOpts.InitScript)
207215

208216
inst.Cmd.Env = append(
209217
os.Environ(),

0 commit comments

Comments
 (0)