diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index c46bc7f3a..ad209f70d 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -15,14 +15,13 @@
 from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
 import kafka.errors as Errors
 from kafka.errors import (
-    IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
+    IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, UnknownTopicOrPartitionError,
     UnrecognizedBrokerVersion, IllegalArgumentError)
 from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.admin import (
     CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
     ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
-    DeleteGroupsRequest, DescribeLogDirsRequest
-)
+    DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest)
 from kafka.protocol.commit import OffsetFetchRequest
 from kafka.protocol.find_coordinator import FindCoordinatorRequest
 from kafka.protocol.metadata import MetadataRequest
@@ -1115,8 +1114,118 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
                     .format(version))
         return self._send_request_to_controller(request)
 
-    # delete records protocol not yet implemented
-    # Note: send the request to the partition leaders
+    def _get_leader_for_partitions(self, partitions, timeout_ms=None):
+        """Find the ID of the leader node for every given topic partition.
+
+        Raises UnknownTopicOrPartitionError if no leader can be found for one or more of the
+        given partitions.
+
+        :param partitions: ``[TopicPartition]``: partitions for which to find leaders.
+        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
+            config.
+
+        :return: Dictionary with ``{leader_id: [partitions]}``
+        """
+        timeout_ms = self._validate_timeout(timeout_ms)
+
+        partitions = set(partitions)
+        topics = set(tp.topic for tp in partitions)
+
+        response = self._get_cluster_metadata(topics=topics).to_object()
+
+        leader2partitions = defaultdict(list)
+        valid_partitions = set()
+        for topic in response.get("topics", ()):
+            for partition in topic.get("partitions", ()):
+                t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
+                if t2p in partitions:
+                    leader2partitions[partition["leader"]].append(t2p)
+                    valid_partitions.add(t2p)
+
+        if len(partitions) != len(valid_partitions):
+            unknown = set(partitions) - valid_partitions
+            raise UnknownTopicOrPartitionError(
+                "The following partitions are not known: %s"
+                % ", ".join(str(x) for x in unknown)
+            )
+
+        return leader2partitions
+
+    def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
+        """Delete records whose offset is smaller than the given offset of the corresponding partition.
+
+        :param records_to_delete: ``{TopicPartition: int}``: For each partition, delete all records
+            with an offset smaller than the given offset; that offset becomes the partition's new
+            earliest available offset (low watermark).
+        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
+            config.
+        :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
+            this node. No check is performed verifying that this is indeed the leader for all
+            listed partitions: use with caution.
+
+        :return: Dictionary ``{TopicPartition: metadata}``, where metadata is the per-partition
+            entry returned by the broker. See DeleteRecordsResponse for possible fields.
+            error_code for all partitions is guaranteed to be zero, otherwise an exception is
+            raised.
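+
+        Example (an illustrative sketch; ``admin`` and ``'my_topic'`` below are placeholders,
+        not part of this API)::
+
+            from kafka.structs import TopicPartition
+
+            # Delete everything before offset 50 on partition 0, and truncate partition 1
+            # completely (an offset of -1 means "delete up to the current high watermark").
+            result = admin.delete_records({
+                TopicPartition('my_topic', 0): 50,
+                TopicPartition('my_topic', 1): -1,
+            }, timeout_ms=1000)
+            new_low_watermark = result[TopicPartition('my_topic', 0)]["low_watermark"]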
+        """
+        timeout_ms = self._validate_timeout(timeout_ms)
+        responses = []
+        version = self._client.api_version(DeleteRecordsRequest, max_version=0)
+        if version is None:
+            raise IncompatibleBrokerVersion("Broker does not support DeleteRecordsRequest")
+
+        # We want to make as few requests as possible
+        # If a single node serves as a partition leader for multiple partitions (and/or
+        # topics), we can send all of those in a single request.
+        # For that we store {leader -> {partitions for leader}}, and do 1 request per leader
+        if partition_leader_id is None:
+            leader2partitions = self._get_leader_for_partitions(
+                set(records_to_delete), timeout_ms
+            )
+        else:
+            leader2partitions = {partition_leader_id: set(records_to_delete)}
+
+        for leader, partitions in leader2partitions.items():
+            topic2partitions = defaultdict(list)
+            for partition in partitions:
+                topic2partitions[partition.topic].append(partition)
+
+            request = DeleteRecordsRequest[version](
+                topics=[
+                    (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions])
+                    for topic, partitions in topic2partitions.items()
+                ],
+                timeout_ms=timeout_ms
+            )
+            future = self._send_request_to_node(leader, request)
+            self._wait_for_futures([future])
+
+            responses.append(future.value.to_object())
+
+        partition2result = {}
+        partition2error = {}
+        for response in responses:
+            for topic in response["topics"]:
+                for partition in topic["partitions"]:
+                    tp = TopicPartition(topic["name"], partition["partition_index"])
+                    partition2result[tp] = partition
+                    if partition["error_code"] != 0:
+                        partition2error[tp] = partition["error_code"]
+
+        if partition2error:
+            if len(partition2error) == 1:
+                key, error = next(iter(partition2error.items()))
+                raise Errors.for_code(error)(
+                    "Error deleting records from topic %s partition %s" % (key.topic, key.partition)
+                )
+            else:
+                raise Errors.BrokerResponseError(
+                    "The following errors occurred when trying to delete records: "
+                    + ", ".join(
+                        "%s(partition=%d): %s" %
+                        (partition.topic, partition.partition, Errors.for_code(error).__name__)
+                        for partition, error in partition2error.items()
+                    )
+                )
+
+        return partition2result
 
     # create delegation token protocol not yet implemented
     # Note: send the request to the least_loaded_node()
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index 058325cb1..63604e576 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -179,6 +179,38 @@ class DeleteTopicsRequest_v3(Request):
     ]
 
 
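+# DeleteRecords (API key 21, KIP-107) deletes all records before a given per-partition offset;
+# requests must be sent to each partition's leader (see KafkaAdminClient.delete_records).
+# An offset of -1 truncates the partition to its current high watermark.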
+class DeleteRecordsResponse_v0(Response):
+    API_KEY = 21
+    API_VERSION = 0
+    SCHEMA = Schema(
+        ('throttle_time_ms', Int32),
+        ('topics', Array(
+            ('name', String('utf-8')),
+            ('partitions', Array(
+                ('partition_index', Int32),
+                ('low_watermark', Int64),
+                ('error_code', Int16))))),
+    )
+
+
+class DeleteRecordsRequest_v0(Request):
+    API_KEY = 21
+    API_VERSION = 0
+    RESPONSE_TYPE = DeleteRecordsResponse_v0
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('name', String('utf-8')),
+            ('partitions', Array(
+                ('partition_index', Int32),
+                ('offset', Int64))))),
+        ('timeout_ms', Int32)
+    )
+
+
+DeleteRecordsResponse = [DeleteRecordsResponse_v0]
+DeleteRecordsRequest = [DeleteRecordsRequest_v0]
+
+
 class ListGroupsResponse_v0(Response):
     API_KEY = 16
     API_VERSION = 0
diff --git a/test/conftest.py b/test/conftest.py
index bf1fa6687..ddd491517 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -73,11 +73,11 @@ def kafka_consumer_factory(kafka_broker, topic, request):
     """Return a KafkaConsumer factory fixture"""
     _consumer = [None]
 
-    def factory(**kafka_consumer_params):
+    def factory(topics=(topic,), **kafka_consumer_params):
         params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
         params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
         params.setdefault('auto_offset_reset', 'earliest')
-        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
+        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params))
         return _consumer[0]
 
     yield factory
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 2f6b76598..83b6ccaf2 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -1,3 +1,4 @@
+from kafka.structs import TopicPartition
 import pytest
 
 from logging import info
@@ -7,7 +8,9 @@
 from kafka.admin import (
     ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
 
-from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError)
+from kafka.errors import (
+    BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
+    GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError)
 
 
 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -315,3 +318,71 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa
     assert group1 not in consumergroups
     assert group2 in consumergroups
     assert group3 not in consumergroups
+
+@pytest.fixture(name="topic2")
+def _topic2(kafka_broker, request):
+    """Same as the `topic` fixture, but with a different name, for tests that need two topics."""
+    topic_name = '%s_%s' % (request.node.name, random_string(10))
+    kafka_broker.create_topics([topic_name])
+    return topic_name
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
+def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic, topic2):
+    t0p0 = TopicPartition(topic, 0)
+    t0p1 = TopicPartition(topic, 1)
+    t0p2 = TopicPartition(topic, 2)
+    t1p0 = TopicPartition(topic2, 0)
+    t1p1 = TopicPartition(topic2, 1)
+    t1p2 = TopicPartition(topic2, 2)
+
+    partitions = (t0p0, t0p1, t0p2, t1p0, t1p1, t1p2)
+
+    for p in partitions:
+        send_messages(range(0, 100), partition=p.partition, topic=p.topic)
+
+    consumer1 = kafka_consumer_factory(group_id=None, topics=())
+    consumer1.assign(partitions)
+    for _ in range(600):
+        next(consumer1)
+
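+    # An offset of -1 truncates the partition completely (deletes up to the high watermark),
+    # so t0p0 is expected to end up with low_watermark == 100 after 100 messages were produced.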
+    result = kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
+    assert result[t0p0] == {"low_watermark": 100, "error_code": 0, "partition_index": t0p0.partition}
+    assert result[t0p1] == {"low_watermark": 50, "error_code": 0, "partition_index": t0p1.partition}
+    assert result[t1p0] == {"low_watermark": 40, "error_code": 0, "partition_index": t1p0.partition}
+    assert result[t1p2] == {"low_watermark": 30, "error_code": 0, "partition_index": t1p2.partition}
+
+    consumer2 = kafka_consumer_factory(group_id=None, topics=())
+    consumer2.assign(partitions)
+    all_messages = consumer2.poll(max_records=600, timeout_ms=2000)
+    assert sum(len(x) for x in all_messages.values()) == 600 - 100 - 50 - 40 - 30
+    assert not consumer2.poll(max_records=1, timeout_ms=1000)  # ensure there are no delayed messages
+
+    assert not all_messages.get(t0p0, [])
+    assert [r.offset for r in all_messages[t0p1]] == list(range(50, 100))
+    assert [r.offset for r in all_messages[t0p2]] == list(range(100))
+
+    assert [r.offset for r in all_messages[t1p0]] == list(range(40, 100))
+    assert [r.offset for r in all_messages[t1p1]] == list(range(100))
+    assert [r.offset for r in all_messages[t1p2]] == list(range(30, 100))
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
+def test_delete_records_with_errors(kafka_admin_client, topic, send_messages):
+    sleep(1)  # sometimes the topic is not created yet...?
+    p0 = TopicPartition(topic, 0)
+    p1 = TopicPartition(topic, 1)
+    p2 = TopicPartition(topic, 2)
+    # verify that topic has been created
+    send_messages(range(0, 1), partition=p2.partition, topic=p2.topic)
+
+    with pytest.raises(UnknownTopicOrPartitionError):
+        kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1})
+    with pytest.raises(UnknownTopicOrPartitionError):
+        kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1})
+    with pytest.raises(OffsetOutOfRangeError):
+        kafka_admin_client.delete_records({p0: 1000})
+    with pytest.raises(BrokerResponseError):
+        kafka_admin_client.delete_records({p0: 1000, p1: 1000})