Two kafka sources with different consumer group names #198

@butorine

We have a service that reads from two Kafka topics on the same Kafka cluster using two different consumer groups. This is needed because the topics are encoded differently. It is configured like so:

    const kafkaJsonConfig: IKafkaBrokerConfiguration & IKafkaSubscriptionConfiguration = {
        broker: "commonBroker",
        group: "consumerGroup_json",
        topics: "topic1",
        encoder: ProtoJsonMessageEncoder(),
    };

    const kafkaProtoConfig: IKafkaBrokerConfiguration & IKafkaSubscriptionConfiguration = {
        broker: "commonBroker",
        group: "consumerGroup_proto",
        topics: "topic2",
        encoder: ProtoMessageEncoder(),
    };
    ...
    .input()
        .add(kafkaSource(kafkaJsonConfig))
        .add(kafkaSource(kafkaProtoConfig))
        .done()
    ...

This works well most of the time, but when the Kafka cluster goes down and then recovers, this service loses its connection to the cluster and never recovers, unlike our other, more traditional services. The errors we see look like this:

    {"level":"ERROR","timestamp":"2020-09-27T13:12:11.194Z","logger":"kafkajs","message":"[Connection] Response JoinGroup(key: 11, version: 2)","broker":"[broker]","clientId":"[client ID]","error":"The coordinator is loading and hence can't process requests for this group","correlationId":0,"size":24}
    {"level":"ERROR","timestamp":"2020-09-27T13:12:11.194Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSProtocolError: The coordinator is loading and hence can't process requests for this group","groupId":"consumerGroup_json","stack":"KafkaJSProtocolError: The coordinator is loading and hence can't process requests for this group
    at createErrorFromCode (/usr/src/node_modules/kafkajs/src/protocol/error.js:537:10)
    at Object.parse (/usr/src/node_modules/kafkajs/src/protocol/requests/joinGroup/v0/response.js:37:11)
    at Connection.send (/usr/src/node_modules/kafkajs/src/network/connection.js:311:35)
    at runMicrotasks (<anonymous>)
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
    at async Broker.joinGroup (/usr/src/node_modules/kafkajs/src/broker/index.js:351:12)
    at async ConsumerGroup.join (/usr/src/node_modules/kafkajs/src/consumer/consumerGroup.js:93:23)
    at async /usr/src/node_modules/kafkajs/src/consumer/runner.js:51:9
    at async Runner.start (/usr/src/node_modules/kafkajs/src/consumer/runner.js:105:7)
    at async start (/usr/src/node_modules/kafkajs/src/consumer/index.js:230:7)"}
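
For discussion, here is a rough sketch of the recovery behavior we would expect: when starting a consumer fails with the "coordinator is loading" error shown in the log above, wait and try again with exponential backoff instead of staying crashed. Everything in it is hypothetical (`isCoordinatorLoading`, `startWithRetry`, the attempt count and the delays are made up for illustration); it is not the kafkajs API or this framework's implementation, and it matches on the error message only because that is what appears in our logs.

```typescript
// Hypothetical helper for illustration only; not part of kafkajs or the framework.
// The message fragment is taken from the log output in this issue.
const COORDINATOR_LOADING = "The coordinator is loading";

// Returns true when the error looks like the transient "coordinator is loading" failure.
function isCoordinatorLoading(err: unknown): boolean {
    return err instanceof Error && err.message.includes(COORDINATOR_LOADING);
}

// Calls `start` until it succeeds, retrying only the transient error above.
// Resolves with the number of attempts used; rethrows any other error immediately,
// and rethrows the transient error once `maxAttempts` is reached.
async function startWithRetry(
    start: () => Promise<void>,
    maxAttempts: number = 5,
    baseDelayMs: number = 1000,
    sleep: (ms: number) => Promise<void> = (ms) =>
        new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<number> {
    for (let attempt = 1; ; attempt++) {
        try {
            await start();
            return attempt;
        } catch (err) {
            if (!isCoordinatorLoading(err) || attempt >= maxAttempts) {
                throw err;
            }
            // Exponential backoff: baseDelayMs, 2x, 4x, ...
            await sleep(baseDelayMs * 2 ** (attempt - 1));
        }
    }
}
```

In our setup each of the two sources would need this kind of handling independently, since each consumer group has its own coordinator and either one can hit the error on its own.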

Labels: M-kafka (this issue is related to the kafka module), P2 (Priority 2), enhancement (new feature or request)