diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 62527838f..25f032015 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -418,6 +418,15 @@ def _send_request_to_controller(self, request): @staticmethod def _convert_new_topic_request(new_topic): + """ + Build the tuple required by CreateTopicsRequest from a NewTopic object. + + :param new_topic: A NewTopic instance containing name, partition count, replication factor, + replica assignments, and config entries. + :return: A tuple in the form: + (topic_name, num_partitions, replication_factor, [(partition_id, [replicas])...], + [(config_key, config_value)...]) + """ return ( new_topic.name, new_topic.num_partitions, @@ -515,16 +524,34 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): return future.value def list_topics(self): + """ + Retrieve a list of all topic names in the cluster. + + :return: A list of topic name strings. + """ metadata = self._get_cluster_metadata(topics=None) obj = metadata.to_object() return [t['topic'] for t in obj['topics']] def describe_topics(self, topics=None): + """ + Fetch metadata for the specified topics or all topics if None. + + :param topics: (Optional) A list of topic names. If None, metadata for all + topics is retrieved. + :return: A list of dicts describing each topic (including partition info). + """ metadata = self._get_cluster_metadata(topics=topics) obj = metadata.to_object() return obj['topics'] def describe_cluster(self): + """ + Fetch cluster-wide metadata such as the list of brokers, the controller ID, + and the cluster ID. + + :return: A dict with cluster-wide metadata, excluding topic details. + """ metadata = self._get_cluster_metadata() obj = metadata.to_object() obj.pop('topics') # We have 'describe_topics' for this @@ -532,6 +559,13 @@ def describe_cluster(self): @staticmethod def _convert_describe_acls_response_to_acls(describe_response): + """ + Convert a DescribeAclsResponse into a list of ACL objects and a KafkaError. + + :param describe_response: The response object from the DescribeAclsRequest. + :return: A tuple of (list_of_acl_objects, error) where error is an instance + of KafkaError (NoError if successful). + """ version = describe_response.API_VERSION error = Errors.for_code(describe_response.error_code) @@ -617,6 +651,12 @@ def describe_acls(self, acl_filter): @staticmethod def _convert_create_acls_resource_request_v0(acl): + """ + Convert an ACL object into the CreateAclsRequest v0 format. + + :param acl: An ACL object with resource pattern and permissions. + :return: A tuple: (resource_type, resource_name, principal, host, operation, permission_type). + """ return ( acl.resource_pattern.resource_type, @@ -629,7 +669,12 @@ def _convert_create_acls_resource_request_v0(acl): @staticmethod def _convert_create_acls_resource_request_v1(acl): - + """ + Convert an ACL object into the CreateAclsRequest v1 format. + + :param acl: An ACL object with resource pattern and permissions. + :return: A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type). + """ return ( acl.resource_pattern.resource_type, acl.resource_pattern.resource_name, @@ -642,6 +687,17 @@ def _convert_create_acls_resource_request_v1(acl): @staticmethod def _convert_create_acls_response_to_acls(acls, create_response): + """ + Parse CreateAclsResponse and correlate success/failure with original ACL objects. + + :param acls: A list of ACL objects that were requested for creation. + :param create_response: The broker's CreateAclsResponse object. + :return: A dict with: + { + 'succeeded': [list of ACL objects successfully created], + 'failed': [(acl_object, KafkaError), ...] + } + """ version = create_response.API_VERSION creations_error = [] @@ -701,6 +757,12 @@ def create_acls(self, acls): @staticmethod def _convert_delete_acls_resource_request_v0(acl): + """ + Convert an ACLFilter object into the DeleteAclsRequest v0 format. + + :param acl: An ACLFilter object identifying the ACLs to be deleted. + :return: A tuple: (resource_type, resource_name, principal, host, operation, permission_type). + """ return ( acl.resource_pattern.resource_type, acl.resource_pattern.resource_name, @@ -712,6 +774,12 @@ def _convert_delete_acls_resource_request_v0(acl): @staticmethod def _convert_delete_acls_resource_request_v1(acl): + """ + Convert an ACLFilter object into the DeleteAclsRequest v1 format. + + :param acl: An ACLFilter object identifying the ACLs to be deleted. + :return: A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type). + """ return ( acl.resource_pattern.resource_type, acl.resource_pattern.resource_name, @@ -724,6 +792,14 @@ def _convert_delete_acls_resource_request_v1(acl): @staticmethod def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response): + """ + Parse the DeleteAclsResponse and map the results back to each input ACLFilter. + + :param acl_filters: A list of ACLFilter objects that were provided in the request. + :param delete_response: The response from the DeleteAclsRequest. + :return: A list of tuples of the form: + (acl_filter, [(matching_acl, KafkaError), ...], filter_level_error). + """ version = delete_response.API_VERSION filter_result_list = [] for i, filter_responses in enumerate(delete_response.filter_responses): @@ -795,6 +871,12 @@ def delete_acls(self, acl_filters): @staticmethod def _convert_describe_config_resource_request(config_resource): + """ + Convert a ConfigResource into the format required by DescribeConfigsRequest. + + :param config_resource: A ConfigResource with resource_type, name, and optional config keys. + :return: A tuple: (resource_type, resource_name, [list_of_config_keys] or None). + """ return ( config_resource.resource_type, config_resource.name, @@ -881,6 +963,12 @@ def describe_configs(self, config_resources, include_synonyms=False): @staticmethod def _convert_alter_config_resource_request(config_resource): + """ + Convert a ConfigResource into the format required by AlterConfigsRequest. + + :param config_resource: A ConfigResource with resource_type, name, and config (key, value) pairs. + :return: A tuple: (resource_type, resource_name, [(config_key, config_value), ...]). + """ return ( config_resource.resource_type, config_resource.name, @@ -930,6 +1018,13 @@ def alter_configs(self, config_resources): @staticmethod def _convert_create_partitions_request(topic_name, new_partitions): + """ + Convert a NewPartitions object into the tuple format for CreatePartitionsRequest. + + :param topic_name: The name of the existing topic. + :param new_partitions: A NewPartitions instance with total_count and new_assignments. + :return: A tuple: (topic_name, (total_count, [list_of_assignments])). + """ return ( topic_name, ( @@ -1311,6 +1406,12 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None): return results def _convert_delete_groups_response(self, response): + """ + Parse the DeleteGroupsResponse, mapping group IDs to their respective errors. + + :param response: A DeleteGroupsResponse object from the broker. + :return: A list of (group_id, KafkaError) for each deleted group. + """ if response.API_VERSION <= 1: results = [] for group_id, error_code in response.results: @@ -1322,12 +1423,12 @@ def _convert_delete_groups_response(self, response): .format(response.API_VERSION)) def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id): - """Send a DeleteGroups request to a broker. - - :param group_ids: The consumer group ids of the groups which are to be deleted. - :param group_coordinator_id: The node_id of the broker which is the coordinator for - all the groups. - :return: A message future + """ + Send a DeleteGroupsRequest to the specified broker (the group coordinator). + + :param group_ids: A list of consumer group IDs to be deleted. + :param group_coordinator_id: The node_id of the broker coordinating these groups. + :return: A future representing the in-flight DeleteGroupsRequest. """ version = self._matching_api_version(DeleteGroupsRequest) if version <= 1: @@ -1339,6 +1440,12 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id): return self._send_request_to_node(group_coordinator_id, request) def _wait_for_futures(self, futures): + """ + Block until all futures complete. If any fail, raise the encountered exception. + + :param futures: A list of Future objects awaiting results. + :raises: The first encountered exception if a future fails. + """ while not all(future.succeeded() for future in futures): for future in futures: self._client.poll(future=future)