-
Notifications
You must be signed in to change notification settings - Fork 34
Description
FISSION:
fission version
client:
fission/core:
BuildDate: "2025-01-25T15:57:43Z"
GitCommit: 2853498a
Version: v1.21.0
server:
fission/core:
BuildDate: "2025-01-25T15:57:43Z"
GitCommit: 2853498a
Version: v1.21.0
KUBERNETES:
minikube version
minikube version: v1.35.0
Describe the bug
Using this Keda-NATS-JetStream MQTrigger:
fission mqt create \
--name flow-origin-trigger \
--function flow-origin \
--mqtype nats-jetstream \
--mqtkind keda \
--topic netflow-raw \
--resptopic netflow-labeled \
--errortopic netflow-error \
--contenttype='application/protobuf' \
--metadata stream=netflow-stream \
--metadata consumer=flow-origin-consumer \
--metadata natsServerMonitoringEndpoint=nats.nats.svc.cluster.local:8222 \
--metadata natsServer=nats://nats.nats.svc.cluster.local:4222 \
--metadata lagThreshold=1 \
--metadata account='$G'
And this push-based Consumer:
apiVersion: jetstream.nats.io/v1beta2
kind: Consumer
metadata:
name: flow-origin-consumer
namespace: nats
spec:
streamName: netflow-stream
durableName: flow-origin-consumer
deliverPolicy: last
ackPolicy: explicit
ackWait: 30s
maxDeliver: 3
deliverSubject: flow-origin-delivery
filterSubject: netflow-raw
I expect Keda to scale my triggers as lagThreshold increases beyond the specified level (set to 1, for testing). Multiple triggers are indeed created, but only one is Running, the rest are in a failed state.
To Reproduce
Use config as above
Expected result
I expect multiple triggers to be created and to each receive the same data from the consumer
Actual result
All but one trigger fail with this error:
kubectl logs flow-origin-trigger-7bb64c9754-6zglx
{"level":"fatal","ts":1744215779.0369778,"caller":"nats-jetstream-http-connector/main.go:72","msg":"error occurred while parsing metadata","error":"consumer is already bound to a subscription","stacktrace":"main.main\n\tgithub.com/fission/keda-connectors/nats-jetstream-http-connector/main.go:72\nruntime.main\n\truntime/proc.go:272"}
By default, a JetStream server manages the state of the consumer and expects only one active subscription to be actively receiving messages for a given push consumer at any point. I think that's why only one trigger is running and the rest fail with "consumer is already bound to a subscription".
But NATS JetStream allows one to create a Queue Groups so that a push-based consumer can distribute messages among subscribers in a round-robin fashion. This prevents multiple independent subscriptions from trying to bind to the same consumer. But when I apply the documented means of creating a Queue Group (specifying a 'spec.deliverGroup' in the consumer), my triggers give me this error:
{"level":"fatal","ts":1744214822.7898843,"caller":"nats-jetstream-http-connector/main.go:72","msg":"error occurred while parsing metadata","error":"cannot create a subscription for a consumer with a deliver group \"flow-origin-group\"","stacktrace":"main.main\n\tgithub.com/fission/keda-connectors/nats-jetstream-http-connector/main.go:72\nruntime.main\n\truntime/proc.go:272"}
Additional context
Not only does the fission keda-nats-http-connector not allow multiple subscribers to the Consumer, it also doesn't allow for a round-robin based group of subscribers.
I had Grok look at main.go, and it said:
The current main.go uses Subscribe instead of QueueSubscribe, so it doesn’t join the deliverGroup. Each pod tries to own the consumer exclusively, crashing with "cannot create a subscription for a consumer with a deliver group".
It might be that main.go needs to be written such that it allows mqtriggers to be part of a deliverGroup