Skip to content

Added missing docstrings in admin/client.py #2487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 21, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 114 additions & 7 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,15 @@ def _send_request_to_controller(self, request):

@staticmethod
def _convert_new_topic_request(new_topic):
"""
Build the tuple required by CreateTopicsRequest from a NewTopic object.

:param new_topic: A NewTopic instance containing name, partition count, replication factor,
replica assignments, and config entries.
:return: A tuple in the form:
(topic_name, num_partitions, replication_factor, [(partition_id, [replicas])...],
[(config_key, config_value)...])
"""
return (
new_topic.name,
new_topic.num_partitions,
Expand Down Expand Up @@ -515,23 +524,48 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
return future.value

def list_topics(self):
"""
Retrieve a list of all topic names in the cluster.

:return: A list of topic name strings.
"""
metadata = self._get_cluster_metadata(topics=None)
obj = metadata.to_object()
return [t['topic'] for t in obj['topics']]

def describe_topics(self, topics=None):
"""
Fetch metadata for the specified topics or all topics if None.

:param topics: (Optional) A list of topic names. If None, metadata for all
topics is retrieved.
:return: A list of dicts describing each topic (including partition info).
"""
metadata = self._get_cluster_metadata(topics=topics)
obj = metadata.to_object()
return obj['topics']

def describe_cluster(self):
"""
Fetch cluster-wide metadata such as the list of brokers, the controller ID,
and the cluster ID.

:return: A dict with cluster-wide metadata, excluding topic details.
"""
metadata = self._get_cluster_metadata()
obj = metadata.to_object()
obj.pop('topics') # We have 'describe_topics' for this
return obj

@staticmethod
def _convert_describe_acls_response_to_acls(describe_response):
"""
Convert a DescribeAclsResponse into a list of ACL objects and a KafkaError.

:param describe_response: The response object from the DescribeAclsRequest.
:return: A tuple of (list_of_acl_objects, error) where error is an instance
of KafkaError (NoError if successful).
"""
version = describe_response.API_VERSION

error = Errors.for_code(describe_response.error_code)
Expand Down Expand Up @@ -617,6 +651,12 @@ def describe_acls(self, acl_filter):

@staticmethod
def _convert_create_acls_resource_request_v0(acl):
"""
Convert an ACL object into the CreateAclsRequest v0 format.

:param acl: An ACL object with resource pattern and permissions.
:return: A tuple: (resource_type, resource_name, principal, host, operation, permission_type).
"""

return (
acl.resource_pattern.resource_type,
Expand All @@ -629,7 +669,12 @@ def _convert_create_acls_resource_request_v0(acl):

@staticmethod
def _convert_create_acls_resource_request_v1(acl):

"""
Convert an ACL object into the CreateAclsRequest v1 format.

:param acl: An ACL object with resource pattern and permissions.
:return: A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type).
"""
return (
acl.resource_pattern.resource_type,
acl.resource_pattern.resource_name,
Expand All @@ -642,6 +687,17 @@ def _convert_create_acls_resource_request_v1(acl):

@staticmethod
def _convert_create_acls_response_to_acls(acls, create_response):
"""
Parse CreateAclsResponse and correlate success/failure with original ACL objects.

:param acls: A list of ACL objects that were requested for creation.
:param create_response: The broker's CreateAclsResponse object.
:return: A dict with:
{
'succeeded': [list of ACL objects successfully created],
'failed': [(acl_object, KafkaError), ...]
}
"""
version = create_response.API_VERSION

creations_error = []
Expand Down Expand Up @@ -701,6 +757,12 @@ def create_acls(self, acls):

@staticmethod
def _convert_delete_acls_resource_request_v0(acl):
"""
Convert an ACLFilter object into the DeleteAclsRequest v0 format.

:param acl: An ACLFilter object identifying the ACLs to be deleted.
:return: A tuple: (resource_type, resource_name, principal, host, operation, permission_type).
"""
return (
acl.resource_pattern.resource_type,
acl.resource_pattern.resource_name,
Expand All @@ -712,6 +774,12 @@ def _convert_delete_acls_resource_request_v0(acl):

@staticmethod
def _convert_delete_acls_resource_request_v1(acl):
"""
Convert an ACLFilter object into the DeleteAclsRequest v1 format.

:param acl: An ACLFilter object identifying the ACLs to be deleted.
:return: A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type).
"""
return (
acl.resource_pattern.resource_type,
acl.resource_pattern.resource_name,
Expand All @@ -724,6 +792,14 @@ def _convert_delete_acls_resource_request_v1(acl):

@staticmethod
def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response):
"""
Parse the DeleteAclsResponse and map the results back to each input ACLFilter.

:param acl_filters: A list of ACLFilter objects that were provided in the request.
:param delete_response: The response from the DeleteAclsRequest.
:return: A list of tuples of the form:
(acl_filter, [(matching_acl, KafkaError), ...], filter_level_error).
"""
version = delete_response.API_VERSION
filter_result_list = []
for i, filter_responses in enumerate(delete_response.filter_responses):
Expand Down Expand Up @@ -795,6 +871,12 @@ def delete_acls(self, acl_filters):

@staticmethod
def _convert_describe_config_resource_request(config_resource):
"""
Convert a ConfigResource into the format required by DescribeConfigsRequest.

:param config_resource: A ConfigResource with resource_type, name, and optional config keys.
:return: A tuple: (resource_type, resource_name, [list_of_config_keys] or None).
"""
return (
config_resource.resource_type,
config_resource.name,
Expand Down Expand Up @@ -881,6 +963,12 @@ def describe_configs(self, config_resources, include_synonyms=False):

@staticmethod
def _convert_alter_config_resource_request(config_resource):
"""
Convert a ConfigResource into the format required by AlterConfigsRequest.

:param config_resource: A ConfigResource with resource_type, name, and config (key, value) pairs.
:return: A tuple: (resource_type, resource_name, [(config_key, config_value), ...]).
"""
return (
config_resource.resource_type,
config_resource.name,
Expand Down Expand Up @@ -930,6 +1018,13 @@ def alter_configs(self, config_resources):

@staticmethod
def _convert_create_partitions_request(topic_name, new_partitions):
"""
Convert a NewPartitions object into the tuple format for CreatePartitionsRequest.

:param topic_name: The name of the existing topic.
:param new_partitions: A NewPartitions instance with total_count and new_assignments.
:return: A tuple: (topic_name, (total_count, [list_of_assignments])).
"""
return (
topic_name,
(
Expand Down Expand Up @@ -1311,6 +1406,12 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
return results

def _convert_delete_groups_response(self, response):
"""
Parse the DeleteGroupsResponse, mapping group IDs to their respective errors.

:param response: A DeleteGroupsResponse object from the broker.
:return: A list of (group_id, KafkaError) for each deleted group.
"""
if response.API_VERSION <= 1:
results = []
for group_id, error_code in response.results:
Expand All @@ -1322,12 +1423,12 @@ def _convert_delete_groups_response(self, response):
.format(response.API_VERSION))

def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
"""Send a DeleteGroups request to a broker.

:param group_ids: The consumer group ids of the groups which are to be deleted.
:param group_coordinator_id: The node_id of the broker which is the coordinator for
all the groups.
:return: A message future
"""
Send a DeleteGroupsRequest to the specified broker (the group coordinator).

:param group_ids: A list of consumer group IDs to be deleted.
:param group_coordinator_id: The node_id of the broker coordinating these groups.
:return: A future representing the in-flight DeleteGroupsRequest.
"""
version = self._matching_api_version(DeleteGroupsRequest)
if version <= 1:
Expand All @@ -1339,6 +1440,12 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
return self._send_request_to_node(group_coordinator_id, request)

def _wait_for_futures(self, futures):
"""
Block until all futures complete. If any fail, raise the encountered exception.

:param futures: A list of Future objects awaiting results.
:raises: The first encountered exception if a future fails.
"""
while not all(future.succeeded() for future in futures):
for future in futures:
self._client.poll(future=future)
Expand Down