Skip to content

Commit 2af8622

Browse files
committed
CASSGO 1 Support for Native Protocol 5
Native Protocol 5 was introduced with the release of C* 4.0. This PR provides full support for a newer version including new format frames (segments), and new fields for QUERY, BATCH, and EXECUTE messages. Also, this PR brings changes to the Compressor interface to follow an append-like design. One more thing, it bumps Go version to the newer 1.19. Patch by Bohdan Siryk; Reviewed by João Reis, James Hartig for CASSGO-1 CASSGO-30
1 parent bf16ec3 commit 2af8622

18 files changed

+2176
-183
lines changed

.github/workflows/main.yml

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ jobs:
3737
go: [ '1.22', '1.23' ]
3838
cassandra_version: [ '4.0.13', '4.1.6' ]
3939
auth: [ "false" ]
40-
compressor: [ "snappy" ]
40+
compressor: [ "snappy", "lz4" ]
4141
tags: [ "cassandra", "integration", "ccm" ]
42+
proto_version: [ "4", "5" ]
43+
exclude:
44+
- proto_version: "5"
45+
compressor: "snappy"
4246
steps:
4347
- uses: actions/checkout@v2
4448
- uses: actions/setup-go@v2
@@ -102,7 +106,7 @@ jobs:
102106
ccm status
103107
ccm node1 nodetool status
104108
105-
args="-gocql.timeout=60s -runssl -proto=4 -rf=3 -clusterSize=3 -autowait=2000ms -compressor=${{ matrix.compressor }} -gocql.cversion=$VERSION -cluster=$(ccm liveset) ./..."
109+
args="-gocql.timeout=60s -runssl -proto=${{ matrix.proto_version }} -rf=3 -clusterSize=3 -autowait=2000ms -compressor=${{ matrix.compressor }} -gocql.cversion=$VERSION -cluster=$(ccm liveset) ./..."
106110
107111
echo "args=$args" >> $GITHUB_ENV
108112
echo "JVM_EXTRA_OPTS=$JVM_EXTRA_OPTS" >> $GITHUB_ENV
@@ -115,7 +119,7 @@ jobs:
115119
if: 'failure()'
116120
uses: actions/upload-artifact@v4
117121
with:
118-
name: ccm-cluster-cassandra-${{ matrix.cassandra_version }}-go-${{ matrix.go }}-tag-${{ matrix.tags }}
122+
name: ccm-cluster-cassandra-${{ matrix.cassandra_version }}-go-${{ matrix.go }}-tag-${{ matrix.tags }}-proto-version-${{ matrix.proto_version }}-compressor-${{ matrix.compressor }}
119123
path: /home/runner/.ccm/test
120124
retention-days: 5
121125
integration-auth-cassandra:
@@ -129,9 +133,12 @@ jobs:
129133
matrix:
130134
go: [ '1.22', '1.23' ]
131135
cassandra_version: [ '4.0.13' ]
132-
compressor: [ "snappy" ]
136+
compressor: [ "snappy", "lz4" ]
133137
tags: [ "integration" ]
134-
138+
proto_version: [ "4", "5" ]
139+
exclude:
140+
- proto_version: "5"
141+
compressor: "snappy"
135142
steps:
136143
- uses: actions/checkout@v3
137144
- uses: actions/setup-go@v4
@@ -193,7 +200,7 @@ jobs:
193200
ccm status
194201
ccm node1 nodetool status
195202
196-
args="-gocql.timeout=60s -runssl -proto=4 -rf=3 -clusterSize=1 -autowait=2000ms -compressor=${{ matrix.compressor }} -gocql.cversion=$VERSION -cluster=$(ccm liveset) ./..."
203+
args="-gocql.timeout=60s -runssl -proto=${{ matrix.proto_version }} -rf=3 -clusterSize=1 -autowait=2000ms -compressor=${{ matrix.compressor }} -gocql.cversion=$VERSION -cluster=$(ccm liveset) ./..."
197204
198205
echo "args=$args" >> $GITHUB_ENV
199206
echo "JVM_EXTRA_OPTS=$JVM_EXTRA_OPTS" >> $GITHUB_ENV

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
- Support of sending queries to the specific node with Query.SetHostID() (CASSGO-4)
1313

14+
- Support for Native Protocol 5. Following protocol changes exposed new API
15+
Query.SetKeyspace(), Query.WithNowInSeconds(), Batch.SetKeyspace(), Batch.WithNowInSeconds() (CASSGO-1)
16+
1417
### Changed
1518

1619
- Move lz4 compressor to lz4 package within the gocql module (CASSGO-32)
@@ -41,6 +44,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4144

4245
- Refactor HostInfo creation and ConnectAddress() method (CASSGO-45)
4346

47+
- gocql.Compressor interface changes to follow append-like design. Bumped Go version to 1.19 (CASSGO-1)
48+
4449
### Fixed
4550
- Cassandra version unmarshal fix (CASSGO-49)
4651

batch_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
package gocql
2929

3030
import (
31+
"github.com/stretchr/testify/require"
3132
"testing"
3233
"time"
3334
)
@@ -86,3 +87,84 @@ func TestBatch_WithTimestamp(t *testing.T) {
8687
t.Errorf("got ts %d, expected %d", storedTs, micros)
8788
}
8889
}
90+
91+
func TestBatch_WithNowInSeconds(t *testing.T) {
92+
session := createSession(t)
93+
defer session.Close()
94+
95+
if session.cfg.ProtoVersion < protoVersion5 {
96+
t.Skip("Batch now in seconds are only available on protocol >= 5")
97+
}
98+
99+
if err := createTable(session, `CREATE TABLE IF NOT EXISTS batch_now_in_seconds (id int primary key, val text)`); err != nil {
100+
t.Fatal(err)
101+
}
102+
103+
b := session.NewBatch(LoggedBatch)
104+
b.WithNowInSeconds(0)
105+
b.Query("INSERT INTO batch_now_in_seconds (id, val) VALUES (?, ?) USING TTL 20", 1, "val")
106+
if err := session.ExecuteBatch(b); err != nil {
107+
t.Fatal(err)
108+
}
109+
110+
var remainingTTL int
111+
err := session.Query(`SELECT TTL(val) FROM batch_now_in_seconds WHERE id = ?`, 1).
112+
WithNowInSeconds(10).
113+
Scan(&remainingTTL)
114+
if err != nil {
115+
t.Fatal(err)
116+
}
117+
118+
require.Equal(t, remainingTTL, 10)
119+
}
120+
121+
func TestBatch_SetKeyspace(t *testing.T) {
122+
session := createSession(t)
123+
defer session.Close()
124+
125+
if session.cfg.ProtoVersion < protoVersion5 {
126+
t.Skip("keyspace for BATCH message is not supported in protocol < 5")
127+
}
128+
129+
const keyspaceStmt = `
130+
CREATE KEYSPACE IF NOT EXISTS gocql_keyspace_override_test
131+
WITH replication = {
132+
'class': 'SimpleStrategy',
133+
'replication_factor': '1'
134+
};
135+
`
136+
137+
err := session.Query(keyspaceStmt).Exec()
138+
if err != nil {
139+
t.Fatal(err)
140+
}
141+
142+
err = createTable(session, "CREATE TABLE IF NOT EXISTS gocql_keyspace_override_test.batch_keyspace(id int, value text, PRIMARY KEY (id))")
143+
if err != nil {
144+
t.Fatal(err)
145+
}
146+
147+
ids := []int{1, 2}
148+
texts := []string{"val1", "val2"}
149+
150+
b := session.NewBatch(LoggedBatch).SetKeyspace("gocql_keyspace_override_test")
151+
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[0], texts[0])
152+
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[1], texts[1])
153+
err = session.ExecuteBatch(b)
154+
if err != nil {
155+
t.Fatal(err)
156+
}
157+
158+
var (
159+
id int
160+
text string
161+
)
162+
163+
iter := session.Query("SELECT * FROM gocql_keyspace_override_test.batch_keyspace").Iter()
164+
defer iter.Close()
165+
166+
for i := 0; iter.Scan(&id, &text); i++ {
167+
require.Equal(t, id, ids[i])
168+
require.Equal(t, text, texts[i])
169+
}
170+
}

0 commit comments

Comments
 (0)