Skip to content

Commit ad0bd33

Browse files
authored
Merge pull request #18 from ndrean/main
[Fix] nkey jetstream
2 parents 606325e + 26ffbcd commit ad0bd33

File tree

4 files changed

+222
-102
lines changed

4 files changed

+222
-102
lines changed

.github/workflows/ci.yml

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -118,23 +118,6 @@ jobs:
118118
119119
kill $SERVER_PID || true
120120
121-
# Run Unit Tests without NATS server
122-
# mem leaks removed & ConnError handled
123-
- name: Run Unit Tests without NATS Server
124-
run: zig build test --summary all
125-
126-
# Run Unit Tests with NATS server
127-
- name: Run Unit Tests with NATS Server
128-
run: |
129-
# Start NATS in background with JetStream enabled (port 4222)
130-
nats-server -js -p 4222 &
131-
SERVER_PID=$!
132-
sleep 2
133-
134-
zig build test --summary all
135-
136-
# Kill the server
137-
kill $SERVER_PID || true
138121
139122
# TLS Upgrade
140123
- name: Test TLS Upgrade
@@ -150,9 +133,9 @@ jobs:
150133
-subj "/CN=localhost" \
151134
-addext "subjectAltName=DNS:localhost,IP:127.0.0.1"
152135
153-
# 3. Start NATS with TLS (Port 4222)
136+
# 3. Start NATS with TLS (Port 4227)
154137
# We use the generated keys from int-test/
155-
nats-server --tls --tlscert=int-test/localhost+2.pem --tlskey=int-test/localhost+2-key.pem -DV -p 4222 &
138+
nats-server --tls --tlscert=int-test/localhost+2.pem --tlskey=int-test/localhost+2-key.pem -DV -p 4227 &
156139
SERVER_PID=$!
157140
158141
# Give the server a moment to initialize TLS
@@ -162,4 +145,22 @@ jobs:
162145
zig build tls
163146
164147
# 5. Cleanup
148+
kill $SERVER_PID || true
149+
150+
# Run Unit Tests without NATS server
151+
# mem leaks removed & ConnError handled
152+
- name: Run Unit Tests without NATS Server
153+
run: zig build test --summary all
154+
155+
# Run Unit Tests with NATS server
156+
- name: Run Unit Tests with NATS Server
157+
run: |
158+
# Start NATS in background with JetStream enabled (port 4228)
159+
nats-server -js -p 4228 &
160+
SERVER_PID=$!
161+
sleep 2
162+
163+
zig build test --summary all
164+
165+
# Kill the server
165166
kill $SERVER_PID || true

src/consumer_tests.zig

Lines changed: 16 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,17 @@ const DontDeleteConsumer = !DeleteConsumer;
88
test "create/consume/delete consumer" {
99
const STREAM = "ORDERS_TEST1"; // Unique stream name to avoid parallel test conflicts
1010

11-
deleteStream(STREAM) catch |err| {
12-
if (err == error.ConnectionRefused) return error.SkipZigTest;
13-
return err;
14-
};
11+
deleteStream(STREAM) catch return error.SkipZigTest;
1512

16-
createStream(STREAM) catch |err| {
17-
if (err == error.ConnectionRefused) return error.SkipZigTest;
18-
return err;
19-
};
13+
createStream(STREAM) catch return error.SkipZigTest;
2014

2115
{ // ephemeral consumer
2216
var conf: ConsumerConfig = .{};
2317
// Match the stream's subject pattern
2418
var filter_buf: [64]u8 = undefined;
2519
const filter = try std.fmt.bufPrint(&filter_buf, "{s}.*", .{STREAM});
2620
conf.filter_subject = filter;
27-
var consumer: Consumer = try Consumer.START(std.testing.allocator, .{}, STREAM, &conf);
21+
var consumer: Consumer = Consumer.START(std.testing.allocator, .{}, STREAM, &conf) catch return error.SkipZigTest;
2822

2923
try testing.expectEqual(null, consumer.CONSUME(protocol.SECNS * 1));
3024

@@ -39,7 +33,7 @@ test "create/consume/delete consumer" {
3933
var filter_buf2: [64]u8 = undefined;
4034
const filter2 = try std.fmt.bufPrint(&filter_buf2, "{s}.*", .{STREAM});
4135
conf.filter_subject = filter2;
42-
var consumer: Consumer = try Consumer.START(std.testing.allocator, .{}, STREAM, &conf);
36+
var consumer: Consumer = Consumer.START(std.testing.allocator, .{}, STREAM, &conf) catch return error.SkipZigTest;
4337

4438
try testing.expectEqual(null, consumer.CONSUME(protocol.SECNS * 1));
4539

@@ -52,19 +46,14 @@ test "create/consume/delete consumer" {
5246
test "publish/consume ephemeral consumer" {
5347
const STREAM = "ORDERS_TEST2"; // Unique stream name to avoid parallel test conflicts
5448

55-
createStream(STREAM) catch |err| {
56-
if (err == error.ConnectionRefused) return error.SkipZigTest;
57-
return err;
58-
};
49+
createStream(STREAM) catch return error.SkipZigTest;
5950

60-
purgeStream(STREAM) catch |err| {
61-
if (err == error.ConnectionRefused) return error.SkipZigTest;
62-
return err;
63-
};
51+
purgeStream(STREAM) catch return error.SkipZigTest;
6452

6553
defer _deleteStream(STREAM);
6654

6755
var submitter: JetStream = try JetStream.CONNECT(std.testing.allocator, DefaultConnectOpts);
56+
defer submitter.DISCONNECT();
6857

6958
// ephemeral consumer
7059
var filter_buf: [64]u8 = undefined;
@@ -89,7 +78,6 @@ test "publish/consume ephemeral consumer" {
8978
try submitter.PUBLISH(subject, null, "1");
9079
try submitter.PUBLISH(subject, null, "2");
9180
try submitter.PUBLISH(subject, null, "3");
92-
submitter.DISCONNECT();
9381

9482
order = try consumer.CONSUME(protocol.SECNS * 2);
9583
try testing.expectEqual(std.mem.eql(u8, "1", order.?.letter.getPayload().?), true);
@@ -119,19 +107,14 @@ test "publish/consume ephemeral consumer" {
119107
test "publish/consume durable consumer" {
120108
const STREAM = "ORDERS_TEST3"; // Unique stream name to avoid parallel test conflicts
121109

122-
createStream(STREAM) catch |err| {
123-
if (err == error.ConnectionRefused) return error.SkipZigTest;
124-
return err;
125-
};
110+
createStream(STREAM) catch return error.SkipZigTest;
126111

127-
purgeStream(STREAM) catch |err| {
128-
if (err == error.ConnectionRefused) return error.SkipZigTest;
129-
return err;
130-
};
112+
purgeStream(STREAM) catch return error.SkipZigTest;
131113

132114
defer _deleteStream(STREAM);
133115

134116
var js: JetStream = try JetStream.CONNECT(std.testing.allocator, .{});
117+
defer js.DISCONNECT();
135118

136119
var filter_buf: [64]u8 = undefined;
137120
const filter = try std.fmt.bufPrint(&filter_buf, "{s}.*", .{STREAM});
@@ -157,7 +140,6 @@ test "publish/consume durable consumer" {
157140
try js.PUBLISH(subject, null, "1");
158141
try js.PUBLISH(subject, null, "2");
159142
try js.PUBLISH(subject, null, "3");
160-
js.DISCONNECT();
161143

162144
order = try consumer.CONSUME(protocol.SECNS * 2);
163145
try testing.expectEqual(std.mem.eql(u8, "1", order.?.letter.getPayload().?), true);
@@ -182,14 +164,8 @@ test "publish/consume durable consumer" {
182164
test "publish/consume two consumers" {
183165
const STREAM = "ORDERS_TEST4"; // Unique stream name to avoid parallel test conflicts
184166

185-
createStream(STREAM) catch |err| {
186-
if (err == error.ConnectionRefused) return error.SkipZigTest;
187-
return err;
188-
};
189-
purgeStream(STREAM) catch |err| {
190-
if (err == error.ConnectionRefused) return error.SkipZigTest;
191-
return err;
192-
};
167+
createStream(STREAM) catch return error.SkipZigTest;
168+
purgeStream(STREAM) catch return error.SkipZigTest;
193169
defer _deleteStream(STREAM);
194170

195171
var subject_received_buf: [64]u8 = undefined;
@@ -199,8 +175,8 @@ test "publish/consume two consumers" {
199175
const subject_processed = try std.fmt.bufPrint(&subject_processed_buf, "{s}.processed", .{STREAM});
200176

201177
var js: JetStream = try JetStream.CONNECT(std.testing.allocator, .{});
178+
defer js.DISCONNECT();
202179
try js.PUBLISH(subject_received, null, "1");
203-
js.DISCONNECT();
204180

205181
var conf: ConsumerConfig = .{
206182
.durable_name = "NEW",
@@ -237,14 +213,8 @@ test "publish/consume two consumers" {
237213
test "publish/consume/subscribe" {
238214
const STREAM = "ORDERS_TEST5"; // Unique stream name to avoid parallel test conflicts
239215

240-
createStream(STREAM) catch |err| {
241-
if (err == error.ConnectionRefused) return error.SkipZigTest;
242-
return err;
243-
};
244-
purgeStream(STREAM) catch |err| {
245-
if (err == error.ConnectionRefused) return error.SkipZigTest;
246-
return err;
247-
};
216+
createStream(STREAM) catch return error.SkipZigTest;
217+
purgeStream(STREAM) catch return error.SkipZigTest;
248218
defer _deleteStream(STREAM);
249219

250220
var stream_pattern_buf: [64]u8 = undefined;
@@ -265,9 +235,9 @@ test "publish/consume/subscribe" {
265235
const subject_completed = try std.fmt.bufPrint(&subject_completed_buf, "{s}.completed", .{STREAM});
266236

267237
var js: JetStream = try JetStream.CONNECT(std.testing.allocator, DefaultConnectOpts);
238+
defer js.DISCONNECT();
268239
try js.PUBLISH(subject_received, null, "1");
269240
mcount += 1;
270-
js.DISCONNECT();
271241

272242
var conf: ConsumerConfig = .{
273243
.durable_name = "NEW",

0 commit comments

Comments
 (0)