Skip to content

Commit 724bb05

Browse files
authored
Add use_existing_consumer option for NATS async consumer (#1074)
1 parent 6f5b67d commit 724bb05

File tree

3 files changed

+37
-6
lines changed

3 files changed

+37
-6
lines changed

internal/configtypes/types.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,12 @@ type NatsJetStreamConsumerConfig struct {
925925
Token string `mapstructure:"token" json:"token" toml:"token" yaml:"token"`
926926
// StreamName is the name of the NATS JetStream stream to use.
927927
StreamName string `mapstructure:"stream_name" json:"stream_name" toml:"stream_name" yaml:"stream_name"`
928+
// UseExistingConsumer when enabled tells Centrifugo to use an existing consumer with
929+
// durable_consumer_name instead of creating a new one. When on, these fields are ignored:
930+
// deliver_policy, subjects, max_ack_pending, and all other consumer-creation-related options
931+
// which may be added later (like ack wait, etc.). The existing consumer's configuration defines
932+
// all behavior, and Centrifugo will fail to start if the consumer does not already exist.
933+
UseExistingConsumer bool `mapstructure:"use_existing_consumer" default:"false" json:"use_existing_consumer" toml:"use_existing_consumer" yaml:"use_existing_consumer"`
928934
// Subjects is the list of NATS subjects (topics) to filter.
929935
Subjects []string `mapstructure:"subjects" json:"subjects" toml:"subjects" yaml:"subjects"`
930936
// DurableConsumerName sets the name of the durable JetStream consumer to use.
@@ -971,8 +977,10 @@ func (cfg NatsJetStreamConsumerConfig) Validate() error {
971977
if cfg.DurableConsumerName == "" {
972978
return errors.New("durable_consumer_name is required for consumer")
973979
}
974-
if cfg.DeliverPolicy != "new" && cfg.DeliverPolicy != "all" {
975-
return errors.New("deliver_policy must be either 'new' or 'all'")
980+
if !cfg.UseExistingConsumer {
981+
if cfg.DeliverPolicy != "new" && cfg.DeliverPolicy != "all" {
982+
return errors.New("deliver_policy must be either 'new' or 'all'")
983+
}
976984
}
977985
if cfg.PublicationDataMode.Enabled && cfg.PublicationDataMode.ChannelsHeader == "" {
978986
return errors.New("channels_header is required for publication data mode")

internal/consuming/nats_jetstream.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,17 @@ func createJetStreamConsumer(ctx context.Context, nc *nats.Conn, cfg NatsJetStre
106106
if err != nil {
107107
return nil, fmt.Errorf("failed to create JetStream context: %w", err)
108108
}
109+
110+
if cfg.UseExistingConsumer {
111+
// If UseExistingConsumer is enabled, just look up the existing consumer by DurableConsumerName
112+
consumer, err := js.Consumer(ctx, cfg.StreamName, cfg.DurableConsumerName)
113+
if err != nil {
114+
return nil, fmt.Errorf("failed to lookup existing consumer %q: %w", cfg.DurableConsumerName, err)
115+
}
116+
return consumer, nil
117+
}
118+
119+
// By default, create or update the consumer
109120
deliverPolicy := jetstream.DeliverNewPolicy
110121
if cfg.DeliverPolicy == "all" {
111122
deliverPolicy = jetstream.DeliverAllPolicy

internal/consuming/nats_jetstream_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,21 @@ import (
1818
func TestNatsJetStreamConsumer(t *testing.T) {
1919
t.Parallel()
2020
testCases := []struct {
21-
name string
21+
name string
22+
useExistingConsumer bool
2223
}{
23-
{name: "basic"},
24+
{name: "basic", useExistingConsumer: false},
25+
{name: "use_existing_consumer", useExistingConsumer: true},
2426
}
2527

2628
for _, tc := range testCases {
2729
t.Run(tc.name, func(t *testing.T) {
28-
testNatsJetStreamConsumer(t)
30+
testNatsJetStreamConsumer(t, tc.useExistingConsumer)
2931
})
3032
}
3133
}
3234

33-
func testNatsJetStreamConsumer(t *testing.T) {
35+
func testNatsJetStreamConsumer(t *testing.T, useExistingConsumer bool) {
3436
url := "nats://localhost:4222"
3537
subject := "test.subject." + uuid.NewString()
3638
durableConsumerName := "test-durable-" + uuid.NewString()
@@ -50,6 +52,15 @@ func testNatsJetStreamConsumer(t *testing.T) {
5052
})
5153
require.NoError(t, err)
5254

55+
// If useExistingConsumer is true, create the consumer manually first
56+
if useExistingConsumer {
57+
_, err = js.AddConsumer(streamName, &nats.ConsumerConfig{
58+
Durable: durableConsumerName,
59+
Name: durableConsumerName,
60+
})
61+
require.NoError(t, err)
62+
}
63+
5364
var receivedNum atomic.Int64
5465

5566
createConsumer := func(name string, done chan struct{}) *NatsJetStreamConsumer {
@@ -68,6 +79,7 @@ func testNatsJetStreamConsumer(t *testing.T) {
6879
DurableConsumerName: durableConsumerName, // shared durable name
6980
DeliverPolicy: "new",
7081
MethodHeader: "test-method",
82+
UseExistingConsumer: useExistingConsumer,
7183
}
7284

7385
consumer, err := NewNatsJetStreamConsumer(cfg, dispatcher, testCommon(prometheus.NewRegistry()))

0 commit comments

Comments
 (0)