Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import absolute_import

from collections import defaultdict
from collections import defaultdict, namedtuple
import copy
import logging
import socket
Expand All @@ -17,9 +17,11 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
from kafka.protocol.types import Array
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
from kafka.version import __version__
Expand Down Expand Up @@ -1000,22 +1002,47 @@ def _describe_consumer_groups_process_response(self, response):
"""Process a DescribeGroupsResponse into a group description."""
if response.API_VERSION <= 3:
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
# hardcoding tuple indices here. Several Java examples,
# including KafkaAdminClient.java
group_description = response.groups[0]
error_code = group_description[0]
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
if type(response_field) == Array:
described_groups = response.__dict__[response_name]
described_groups_field_schema = response_field.array_of
for described_group in described_groups:
described_group_information_list = []
is_consumer_protocol_type = False
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
if group_information_name == 'protocol_type':
protocol_type = described_group_information
is_consumer_protocol_type = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the logic here in a little more detail? As currently written it's a bit confusing, so it's difficult for me to figure out what you're trying to do...

Copy link
Contributor Author

@Apurva007 Apurva007 Apr 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, it's checking whether the protocol_type returned by the broker is an empty string or "consumer"; if it is, the extra decoding of the member metadata and member assignment fields in the member array needs to be executed.
The Kafka Java implementation for this:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L2881
Hope it helps. Please let me know if you need more information.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. One suggestion: for your GitHub links, hit the y key on your keyboard and it will tie the link to a specific commit, so you don't have to worry about trunk changing under your feet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tip! I will use that next time. 😃

if type(group_information_field) == Array:
member_information_list = []
member_schema = group_information_field.array_of
for members in described_group_information:
member_information = []
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
if member_name == 'member_metadata' and is_consumer_protocol_type:
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
elif member_name == 'member_assignment' and is_consumer_protocol_type:
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
else:
member_information.append(member)
else:
member_info_tuple = MemberInformation._make(member_information)
member_information_list.append(member_info_tuple)
else:
described_group_information_list.append(member_information_list)
else:
described_group_information_list.append(described_group_information)
else:
if response.API_VERSION <=2:
described_group_information_list.append([])
group_description = GroupInformation._make(described_group_information_list)
error_code = group_description.error_code
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
if error_type is not Errors.NoError:
raise error_type(
"DescribeGroupsResponse failed with response '{}'."
.format(response))
# TODO Java checks the group protocol type, and if consumer
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
# the members' partition assignments... that hasn't yet been
# implemented here so just return the raw struct results
else:
raise NotImplementedError(
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
Expand Down
5 changes: 5 additions & 0 deletions kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
["offset", "timestamp"])

# One member of a described consumer group. The admin client builds these with
# MemberInformation._make(...) while processing a DescribeGroupsResponse.
# member_metadata and member_assignment hold the decoded
# ConsumerProtocolMemberMetadata / ConsumerProtocolMemberAssignment when the
# group's protocol type is the consumer protocol or empty; otherwise they hold
# the values exactly as they appeared in the response.
MemberInformation = namedtuple("MemberInformation",
["member_id", "client_id", "client_host", "member_metadata", "member_assignment"])

# One described group. The admin client builds these with
# GroupInformation._make(...); "members" is a list of MemberInformation tuples.
# NOTE(review): for DescribeGroupsResponse versions <= 2 the admin client
# appears to append an empty list as the final field, so authorized_operations
# would be [] there -- confirm against the response-processing code.
GroupInformation = namedtuple("GroupInformation",
["error_code", "group", "state", "protocol_type", "protocol", "members", "authorized_operations"])

# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
Expand Down
19 changes: 18 additions & 1 deletion test/test_admin_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from test.testutil import env_kafka_version

from kafka.errors import NoError
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)

Expand Down Expand Up @@ -138,3 +138,20 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):

with pytest.raises(ValueError):
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])

@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
    """Describing a group this test never created must raise.

    The call is expected to fail with GroupCoordinatorNotAvailableError
    rather than return a description for the group id 'test'.
    """
    with pytest.raises(GroupCoordinatorNotAvailableError):
        # The return value is irrelevant: the call itself has to raise.
        kafka_admin_client.describe_consumer_groups(['test'])

@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
"""Tests that the describe consumer group call returns valid consumer group information
"""
consumer = kafka_consumer_factory(group_id='testgrp', auto_offset_reset='earliest')
consumer.poll(timeout_ms=20)
output = kafka_admin_client.describe_consumer_groups(['testgrp'])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should probably have 2 consumer groups... I think that would catch the possible error above where it looks like only the value of the last consumer group is being returned.

Also, I'd probably have 2 consumers in one of the groups, and then verify that there are two member metadata entries for that consumer group.

So that'd take 3 consumers altogether, split between two groups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I will make that change.

assert output[0].group == 'testgrp'
assert output[0].members[0].member_metadata.subscription[0] == topic