From d547abc9a10bb8eb0c0a6642ddffab2c3b34aca8 Mon Sep 17 00:00:00 2001
From: Ruslan
Date: Tue, 2 Jun 2020 00:36:47 +0600
Subject: [PATCH 01/11] feat AdminClient: support delete_records

Authored-by: Ruslan
Cherrypicked-by: Arsen Kitov
---
 kafka/admin/client.py   | 58 ++++++++++++++++++++++++++++++++++++++++-
 kafka/protocol/admin.py | 33 ++++++++++++++++++++++-
 2 files changed, 89 insertions(+), 2 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index c46bc7f3a..5b2d78170 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -21,7 +21,7 @@
 from kafka.protocol.admin import (
     CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
     ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
-    DeleteGroupsRequest, DescribeLogDirsRequest
+    DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest)
 )
 from kafka.protocol.commit import OffsetFetchRequest
 from kafka.protocol.find_coordinator import FindCoordinatorRequest
@@ -1115,6 +1115,62 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
                 .format(version))
         return self._send_request_to_controller(request)
 
+    def delete_records(self, records_to_delete, timeout_ms=None):
+        """Delete records whose offset is smaller than the given offset of the corresponding partition.
+
+        :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
+            given partitions.
+
+        :return: List of DeleteRecordsResponse
+        """
+        timeout_ms = self._validate_timeout(timeout_ms)
+        version = self._matching_api_version(MetadataRequest)
+
+        topics = set()
+
+        for topic2partition in records_to_delete:
+            topics.add(topic2partition.topic)
+
+        request = MetadataRequest[version](
+            topics=list(topics),
+            allow_auto_topic_creation=False
+        )
+
+        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+        self._wait_for_futures([future])
+        response = future.value
+
+        version = self._matching_api_version(DeleteRecordsRequest)
+
+        PARTITIONS_INFO = 3
+        NAME = 1
+        PARTITION_INDEX = 1
+        LEADER = 2
+
+        partition2leader = dict()
+
+        for topic in response.topics:
+            for partition in topic[PARTITIONS_INFO]:
+                t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
+                partition2leader[t2p] = partition[LEADER]
+
+        responses = []
+
+        for topic2partition in records_to_delete:
+            request = DeleteRecordsRequest[version](
+                topics=[(topic2partition.topic, [(topic2partition.partition, records_to_delete[topic2partition])])],
+                timeout_ms=timeout_ms
+            )
+            # Sending separate request for each partition leader
+            future = self._send_request_to_node(partition2leader[topic2partition], request)
+            self._wait_for_futures([future])
+
+            response = future.value
+            responses.append(response)
+
+        return responses
+
     # delete records protocol not yet implemented
     # Note: send the request to the partition leaders
 
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index 058325cb1..d7c50ad00 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -179,6 +179,38 @@ class DeleteTopicsRequest_v3(Request):
     ]
 
 
+class DeleteRecordsResponse_v0(Response):
+    API_KEY = 21
+    API_VERSION = 0
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('name', String('utf-8')),
+            ('partitions', Array(
+                ('partition_index', Int32),
+                ('low_watermark', Int64),
+                ('error_code', Int16))))),
+        ('throttle_time_ms', Int32)
+    )
+
+
+class DeleteRecordsRequest_v0(Request):
+    API_KEY = 21
+    API_VERSION = 0
+    RESPONSE_TYPE = DeleteRecordsResponse_v0
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('name', String('utf-8')),
+            ('partitions', Array(
+                ('partition_index', Int32),
+                ('offset', Int64))))),
+        ('timeout_ms', Int32)
+    )
+
+
+DeleteRecordsResponse = [DeleteRecordsResponse_v0]
+DeleteRecordsRequest = [DeleteRecordsRequest_v0]
+
+
 class ListGroupsResponse_v0(Response):
     API_KEY = 16
     API_VERSION = 0
@@ -829,7 +861,6 @@ class CreatePartitionsRequest_v1(Request):
     CreatePartitionsResponse_v0,
     CreatePartitionsResponse_v1,
 ]
-
 class DeleteGroupsResponse_v0(Response):
     API_KEY = 42
     API_VERSION = 0
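
Usage sketch for the API introduced above (illustrative only, not part of the
patch series; it assumes a reachable broker on localhost:9092 and an existing
topic named 'my_topic'):

    from kafka.admin import KafkaAdminClient
    from kafka.structs import TopicPartition

    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
    # Ask the partition leader to discard all messages below offset 42.
    # At this stage of the series the call returns raw DeleteRecordsResponse
    # objects, one per request sent.
    responses = admin.delete_records({TopicPartition('my_topic', 0): 42})
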
From 4285d905d965d557a2a0cf4d319def92cf59e4aa Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Sun, 19 Mar 2023 13:57:01 +0100
Subject: [PATCH 02/11] address review comments

---
 kafka/admin/client.py          |  3 ---
 kafka/protocol/admin.py        |  1 +
 test/conftest.py               |  4 ++--
 test/test_admin_integration.py | 25 +++++++++++++++++++++++++
 4 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 5b2d78170..7c1da595b 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1171,9 +1171,6 @@ def delete_records(self, records_to_delete, timeout_ms=None):
 
         return responses
 
-    # delete records protocol not yet implemented
-    # Note: send the request to the partition leaders
-
     # create delegation token protocol not yet implemented
     # Note: send the request to the least_loaded_node()
 
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index d7c50ad00..86ad02f16 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -861,6 +861,7 @@ class CreatePartitionsRequest_v1(Request):
     CreatePartitionsResponse_v0,
     CreatePartitionsResponse_v1,
 ]
+
 class DeleteGroupsResponse_v0(Response):
     API_KEY = 42
     API_VERSION = 0
 
diff --git a/test/conftest.py b/test/conftest.py
index bf1fa6687..ddd491517 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -73,11 +73,11 @@ def kafka_consumer_factory(kafka_broker, topic, request):
     """Return a KafkaConsumer factory fixture"""
     _consumer = [None]
 
-    def factory(**kafka_consumer_params):
+    def factory(topics=(topic,), **kafka_consumer_params):
         params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
         params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
         params.setdefault('auto_offset_reset', 'earliest')
-        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
+        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params))
         return _consumer[0]
 
     yield factory
 
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 2f6b76598..b93637ffe 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -1,3 +1,4 @@
+from kafka.structs import TopicPartition
 import pytest
 
 from logging import info
@@ -315,3 +316,27 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa
     assert group1 not in consumergroups
     assert group2 in consumergroups
     assert group3 not in consumergroups
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
+def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic):
+    p0 = TopicPartition(topic, 0)
+    p1 = TopicPartition(topic, 1)
+    p2 = TopicPartition(topic, 2)
+
+    for p in (p0, p1, p2):
+        send_messages(range(0, 100), partition=p.partition, topic=p.topic)
+
+    consumer1 = kafka_consumer_factory(group_id=None, topics=())
+    consumer1.assign([p0, p1, p2])
+    for _ in range(300):
+        next(consumer1)
+
+    kafka_admin_client.delete_records({p0: -1, p1: 50})
+
+    consumer2 = kafka_consumer_factory(group_id=None, topics=())
+    consumer2.assign([p0, p1, p2])
+    all_messages = consumer2.poll(max_records=300, timeout_ms=1000)
+    assert not consumer2.poll(max_records=1, timeout_ms=1000)  # ensure we read everything
+    assert not all_messages.get(p0, [])
+    assert [r.offset for r in all_messages[p1]] == list(range(50, 100))
+    assert [r.offset for r in all_messages[p2]] == list(range(100))
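
Why the conftest change above matters for the new test: with `topics=()` the
factory builds a consumer with no subscription, so tests can use manual
partition assignment instead. A minimal sketch using the fixtures this suite
assumes:

    consumer = kafka_consumer_factory(group_id=None, topics=())  # no subscribe()
    consumer.assign([TopicPartition(topic, 0)])  # manual assignment instead
    record = next(consumer)
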
From 9629b676511a297e872bd786f778dbd7d93156cc Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Sun, 19 Mar 2023 16:58:24 +0100
Subject: [PATCH 03/11] fix: do not send unnecessary duplicate requests

---
 kafka/admin/client.py          | 24 +++++++++++-------
 test/test_admin_integration.py | 46 +++++++++++++++++++++++-----------
 2 files changed, 47 insertions(+), 23 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 7c1da595b..f2ce4d8f8 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1128,8 +1128,8 @@ def delete_records(self, records_to_delete, timeout_ms=None):
 
         topics = set()
 
-        for topic2partition in records_to_delete:
-            topics.add(topic2partition.topic)
+        for topic_partition in records_to_delete:
+            topics.add(topic_partition.topic)
 
         request = MetadataRequest[version](
             topics=list(topics),
             allow_auto_topic_creation=False
@@ -1148,22 +1148,28 @@ def delete_records(self, records_to_delete, timeout_ms=None):
         PARTITION_INDEX = 1
         LEADER = 2
 
-        partition2leader = dict()
-
+        # We want to make as few requests as possible
+        # If a single node serves as a partition leader for multiple partitions (and/or
+        # topics), we can send all of those in a single request.
+        # For that we store {leader -> {topic1 -> [p0, p1], topic2 -> [p0, p1]}}
+        leader2topic2partitions = defaultdict(lambda: defaultdict(list))
         for topic in response.topics:
             for partition in topic[PARTITIONS_INFO]:
                 t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
-                partition2leader[t2p] = partition[LEADER]
+                if t2p in records_to_delete:
+                    leader2topic2partitions[partition[LEADER]][t2p.topic].append(t2p)
 
         responses = []
 
-        for topic2partition in records_to_delete:
+        for leader, topic2partitions in leader2topic2partitions.items():
             request = DeleteRecordsRequest[version](
-                topics=[(topic2partition.topic, [(topic2partition.partition, records_to_delete[topic2partition])])],
+                topics=[
+                    (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions])
+                    for topic, partitions in topic2partitions.items()
+                ],
                 timeout_ms=timeout_ms
             )
-            # Sending separate request for each partition leader
-            future = self._send_request_to_node(partition2leader[topic2partition], request)
+            future = self._send_request_to_node(leader, request)
             self._wait_for_futures([future])
 
             response = future.value
 
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index b93637ffe..2999514c2 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -317,26 +317,44 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa
     assert group2 in consumergroups
     assert group3 not in consumergroups
 
+@pytest.fixture(name="topic2")
+def _topic2(kafka_broker, request):
+    """Same as `topic` fixture, but a different name if you need two topics."""
+    topic_name = '%s_%s' % (request.node.name, random_string(10))
+    kafka_broker.create_topics([topic_name])
+    return topic_name
+
 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
-def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic):
-    p0 = TopicPartition(topic, 0)
-    p1 = TopicPartition(topic, 1)
-    p2 = TopicPartition(topic, 2)
+def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic, topic2):
+    t0p0 = TopicPartition(topic, 0)
+    t0p1 = TopicPartition(topic, 1)
+    t0p2 = TopicPartition(topic, 2)
+    t1p0 = TopicPartition(topic2, 0)
+    t1p1 = TopicPartition(topic2, 1)
+    t1p2 = TopicPartition(topic2, 2)
+
+    partitions = (t0p0, t0p1, t0p2, t1p0, t1p1, t1p2)
 
-    for p in (p0, p1, p2):
+    for p in partitions:
         send_messages(range(0, 100), partition=p.partition, topic=p.topic)
 
     consumer1 = kafka_consumer_factory(group_id=None, topics=())
-    consumer1.assign([p0, p1, p2])
-    for _ in range(300):
+    consumer1.assign(partitions)
+    for _ in range(600):
         next(consumer1)
 
-    kafka_admin_client.delete_records({p0: -1, p1: 50})
+    kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
 
     consumer2 = kafka_consumer_factory(group_id=None, topics=())
-    consumer2.assign([p0, p1, p2])
-    all_messages = consumer2.poll(max_records=300, timeout_ms=1000)
-    assert not consumer2.poll(max_records=1, timeout_ms=1000)  # ensure we read everything
-    assert not all_messages.get(p0, [])
-    assert [r.offset for r in all_messages[p1]] == list(range(50, 100))
-    assert [r.offset for r in all_messages[p2]] == list(range(100))
+    consumer2.assign(partitions)
+    all_messages = consumer2.poll(max_records=600, timeout_ms=2000)
+    assert sum(len(x) for x in all_messages.values()) == 600 - 100 - 50 - 40 - 30
+    assert not consumer2.poll(max_records=1, timeout_ms=1000)  # ensure there are no delayed messages
+
+    assert not all_messages.get(t0p0, [])
+    assert [r.offset for r in all_messages[t0p1]] == list(range(50, 100))
+    assert [r.offset for r in all_messages[t0p2]] == list(range(100))
+
+    assert [r.offset for r in all_messages[t1p0]] == list(range(40, 100))
+    assert [r.offset for r in all_messages[t1p1]] == list(range(100))
+    assert [r.offset for r in all_messages[t1p2]] == list(range(30, 100))
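
The batching introduced above, restated as a standalone sketch (the
`leader_for` lookup is hypothetical; the real code derives it from a
MetadataRequest to the least-loaded node):

    from collections import defaultdict

    def group_by_leader(records_to_delete, leader_for):
        # Build {leader_id: {topic: [TopicPartition, ...]}} so that a single
        # DeleteRecordsRequest per leader covers all of its partitions.
        grouped = defaultdict(lambda: defaultdict(list))
        for tp in records_to_delete:
            grouped[leader_for(tp)][tp.topic].append(tp)
        return grouped
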
From ae4187fcb6dbee94d15133d9f8bc5f796487ccd7 Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Sun, 19 Mar 2023 18:29:30 +0100
Subject: [PATCH 04/11] tests & cleanup

---
 kafka/admin/client.py          | 84 +++++++++++++++++++++++++++++-----
 test/test_admin_integration.py | 20 +++++++-
 2 files changed, 91 insertions(+), 13 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index f2ce4d8f8..a0ac60bba 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -15,7 +15,7 @@
 from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
 import kafka.errors as Errors
 from kafka.errors import (
-    IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
+    IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, UnknownTopicOrPartitionError,
     UnrecognizedBrokerVersion, IllegalArgumentError)
 from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.admin import (
@@ -1115,20 +1115,24 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
                 .format(version))
         return self._send_request_to_controller(request)
 
-    def delete_records(self, records_to_delete, timeout_ms=None):
-        """Delete records whose offset is smaller than the given offset of the corresponding partition.
+    def _get_leader_for_partitions(self, partitions, timeout_ms=None):
+        """Finds ID of the leader node for every given topic partition.
 
-        :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
-            given partitions.
+        Will raise UnknownTopicOrPartitionError if for some partition no leader can be found.
 
-        :return: List of DeleteRecordsResponse
+        :param partitions: ``[TopicPartition]``: partitions for which to find leaders.
+        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
+            config.
+
+        :return: Dictionary with ``{leader_id -> {partitions}}``
         """
         timeout_ms = self._validate_timeout(timeout_ms)
         version = self._matching_api_version(MetadataRequest)
+        partitions = set(partitions)
 
         topics = set()
 
-        for topic_partition in records_to_delete:
+        for topic_partition in partitions:
             topics.add(topic_partition.topic)
 
         request = MetadataRequest[version](
@@ -1152,21 +1156,60 @@ def delete_records(self, records_to_delete, timeout_ms=None):
         # We want to make as few requests as possible
         # If a single node serves as a partition leader for multiple partitions (and/or
         # topics), we can send all of those in a single request.
         # For that we store {leader -> {topic1 -> [p0, p1], topic2 -> [p0, p1]}}
-        leader2topic2partitions = defaultdict(lambda: defaultdict(list))
+        leader2partitions = defaultdict(list)
+        valid_partitions = set()
         for topic in response.topics:
             for partition in topic[PARTITIONS_INFO]:
                 t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
-                if t2p in records_to_delete:
-                    leader2topic2partitions[partition[LEADER]][t2p.topic].append(t2p)
+                if t2p in partitions:
+                    leader2partitions[partition[LEADER]].append(t2p)
+                    valid_partitions.add(t2p)
+
+        if len(partitions) != len(valid_partitions):
+            unknown = set(partitions) - valid_partitions
+            raise UnknownTopicOrPartitionError(
+                "The following partitions are not known: %s" % ", "
+                .join(str(x) for x in unknown)
+            )
+
+        return leader2partitions
+
+    def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
+        """Delete records whose offset is smaller than the given offset of the corresponding partition.
+
+        Note: if partition
+
+        :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
+            given partitions.
+        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
+            config.
+        :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
+            this node. No check is performed verifying that this is indeed the leader for all
+            listed partitions, use with caution.
+
+        :return: List of DeleteRecordsResponse
+        """
+        timeout_ms = self._validate_timeout(timeout_ms)
         responses = []
+        version = self._matching_api_version(DeleteRecordsRequest)
+
+        if partition_leader_id is None:
+            leader2partitions = self._get_leader_for_partitions(
+                set(records_to_delete), timeout_ms
+            )
+        else:
+            leader2partitions = {partition_leader_id: set(records_to_delete)}
+
+        for leader, partitions in leader2partitions.items():
+            topic2partitions = defaultdict(list)
+            for partition in partitions:
+                topic2partitions[partition.topic].append(partition)
 
-        for leader, topic2partitions in leader2topic2partitions.items():
             request = DeleteRecordsRequest[version](
                 topics=[
                     (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions])
                     for topic, partitions in topic2partitions.items()
-                ],
+                ],
                 timeout_ms=timeout_ms
             )
             future = self._send_request_to_node(leader, request)
             self._wait_for_futures([future])
 
             response = future.value
             responses.append(response)
 
+        partition2error = {}
+        for response in responses:
+            for topic in getattr(response, 'topics', ()):
+                for partition in getattr(topic, 'partitions', ()):
+                    if getattr(partition, 'error_code', 0) != 0:
+                        tp = TopicPartition(topic, partition['partition_index'])
+                        partition2error[tp] =partition['error_code']
+
+        if partition2error:
+            if len(partition2error) == 1:
+                raise Errors.for_code(partition2error[0])()
+            else:
+                raise Errors.BrokerResponseError(
+                    "The following errors occurred when trying to delete records: "
+                    ", ".join("%s: %s" % (partition, error) for partition, error in partition2error.items())
+                )
+
         return responses
 
     # create delegation token protocol not yet implemented
     # Note: send the request to the least_loaded_node()
 
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 2999514c2..75231b933 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -8,7 +8,9 @@
 from kafka.admin import (
     ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
-from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError)
+from kafka.errors import (
+    KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
+    GroupIdNotFoundError, UnknownTopicOrPartitionError)
 
 
 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -358,3 +360,19 @@ def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_message
     assert [r.offset for r in all_messages[t1p0]] == list(range(40, 100))
     assert [r.offset for r in all_messages[t1p1]] == list(range(100))
     assert [r.offset for r in all_messages[t1p2]] == list(range(30, 100))
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
+def test_delete_records_with_errors(kafka_admin_client, topic):
+    sleep(1)  # sometimes the topic is not created yet...?
+    p0 = TopicPartition(topic, 0)
+    # verify that topic has been created
+    kafka_admin_client.delete_records({p0: 10}, timeout_ms=1000)
+
+    with pytest.raises(UnknownTopicOrPartitionError):
+        kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1})
+    with pytest.raises(UnknownTopicOrPartitionError):
+        kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1})
+
+
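
With the validation added in this patch, unknown partitions now surface as a
typed exception. An illustrative sketch (it assumes an existing
KafkaAdminClient named `admin`; the topic literal is hypothetical):

    from kafka.errors import UnknownTopicOrPartitionError
    from kafka.structs import TopicPartition

    try:
        admin.delete_records({TopicPartition('no_such_topic', 0): -1})
    except UnknownTopicOrPartitionError:
        pass  # metadata lookup found no leader for this partition
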
From 2996c6a2bb9b47e4cdd3735b136ebd6eab369898 Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Sun, 19 Mar 2023 18:42:07 +0100
Subject: [PATCH 05/11] fmt

---
 kafka/admin/client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index a0ac60bba..20497d98b 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1224,7 +1224,7 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
                 for partition in getattr(topic, 'partitions', ()):
                     if getattr(partition, 'error_code', 0) != 0:
                         tp = TopicPartition(topic, partition['partition_index'])
-                        partition2error[tp] =partition['error_code']
+                        partition2error[tp] = partition['error_code']
 
         if partition2error:
             if len(partition2error) == 1:

From ebc582eaa26871f8edc9e7e32e37dbd54c74e388 Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Mon, 20 Mar 2023 20:19:29 +0100
Subject: [PATCH 06/11] typo: typo in docs

---
 kafka/admin/client.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 20497d98b..6f9e78c25 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1177,8 +1177,6 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
         """Delete records whose offset is smaller than the given offset of the corresponding partition.
 
-        Note: if partition
-
         :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
             given partitions.
         :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
From b08c9d975d488b6834497cbf60ab5434c4ae27be Mon Sep 17 00:00:00 2001
From: Arsen Kitov
Date: Tue, 21 Mar 2023 08:21:37 +0100
Subject: [PATCH 07/11] better response structure, better tests, reuse more code, fix protocol

---
 kafka/admin/client.py          | 79 +++++++++++++++------------------
 kafka/protocol/admin.py        |  2 +-
 test/test_admin_integration.py | 20 ++++++---
 3 files changed, 51 insertions(+), 50 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 6f9e78c25..025730f51 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1127,49 +1127,26 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
         :return: Dictionary with ``{leader_id -> {partitions}}``
         """
         timeout_ms = self._validate_timeout(timeout_ms)
-        version = self._matching_api_version(MetadataRequest)
         partitions = set(partitions)
-
-        topics = set()
-
-        for topic_partition in partitions:
-            topics.add(topic_partition.topic)
+        topics = set(tp.topic for tp in partitions)
 
-        request = MetadataRequest[version](
-            topics=list(topics),
-            allow_auto_topic_creation=False
-        )
-
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
-
-        self._wait_for_futures([future])
-        response = future.value
-
-        version = self._matching_api_version(DeleteRecordsRequest)
-
-        PARTITIONS_INFO = 3
-        NAME = 1
-        PARTITION_INDEX = 1
-        LEADER = 2
+        response = self._get_cluster_metadata(topics=topics).to_object()
 
         leader2partitions = defaultdict(list)
         valid_partitions = set()
-        for topic in response.topics:
-            for partition in topic[PARTITIONS_INFO]:
-                t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
+        for topic in response.get("topics", ()):
+            for partition in topic.get("partitions", ()):
+                t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
                 if t2p in partitions:
-                    leader2partitions[partition[LEADER]].append(t2p)
+                    leader2partitions[partition["leader"]].append(t2p)
                     valid_partitions.add(t2p)
 
         if len(partitions) != len(valid_partitions):
             unknown = set(partitions) - valid_partitions
             raise UnknownTopicOrPartitionError(
-                "The following partitions are not known: %s" % ", "
-                .join(str(x) for x in unknown)
+                "The following partitions are not known: %s"
+                % ", ".join(str(x) for x in unknown)
             )
 
         return leader2partitions
 
     def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
         """Delete records whose offset is smaller than the given offset of the corresponding partition.
 
         :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
             given partitions.
         :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
             config.
         :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
             this node. No check is performed verifying that this is indeed the leader for all
-            listed partitions, use with caution.
+            listed partitions: use with caution.
 
-        :return: List of DeleteRecordsResponse
+        :return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker.
+            See DeleteRecordsResponse for possible fields. error_code for all partitions is
+            guaranteed to be zero, otherwise an exception is raised.
""" timeout_ms = self._validate_timeout(timeout_ms) responses = [] version = self._matching_api_version(DeleteRecordsRequest) + # We want to make as few requests as possible + # If a single node serves as a partition leader for multiple partitions (and/or + # topics), we can send all of those in a single request. + # For that we store {leader -> {partitions for leader}}, and do 1 request per leader if partition_leader_id is None: leader2partitions = self._get_leader_for_partitions( set(records_to_delete), timeout_ms @@ -1213,27 +1196,35 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id future = self._send_request_to_node(leader, request) self._wait_for_futures([future]) - response = future.value - responses.append(response) + responses.append(future.value.to_object()) + partition2result = {} partition2error = {} for response in responses: - for topic in getattr(response, 'topics', ()): - for partition in getattr(topic, 'partitions', ()): - if getattr(partition, 'error_code', 0) != 0: - tp = TopicPartition(topic, partition['partition_index']) - partition2error[tp] = partition['error_code'] + for topic in response["topics"]: + for partition in topic["partitions"]: + tp = TopicPartition(topic["name"], partition["partition_index"]) + partition2result[tp] = partition + if partition["error_code"] != 0: + partition2error[tp] = partition["error_code"] if partition2error: if len(partition2error) == 1: - raise Errors.for_code(partition2error[0])() + key, error = next(iter(partition2error.items())) + raise Errors.for_code(error)( + "Error deleting records from topic %s partition %s" % (key.topic, key.partition) + ) else: raise Errors.BrokerResponseError( - "The following errors occured when trying to delete records: " - ", ".join("%s: %s" % (partition, error) for partition, error in partition2error.items()) + "The following errors occured when trying to delete records: " + + ", ".join( + "%s(partition=%d): %s" % + (partition.topic, partition.partition, Errors.for_code(error).__name__) + for partition, error in partition2error.items() + ) ) - return responses + return partition2result # create delegation token protocol not yet implemented # Note: send the request to the least_loaded_node() diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 86ad02f16..63604e576 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -183,13 +183,13 @@ class DeleteRecordsResponse_v0(Response): API_KEY = 21 API_VERSION = 0 SCHEMA = Schema( + ('throttle_time_ms', Int32), ('topics', Array( ('name', String('utf-8')), ('partitions', Array( ('partition_index', Int32), ('low_watermark', Int64), ('error_code', Int16))))), - ('throttle_time_ms', Int32) ) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 75231b933..77b4938b2 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -9,8 +9,8 @@ from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) from kafka.errors import ( - KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, - GroupIdNotFoundError, UnknownTopicOrPartitionError) + BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, + GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -345,7 +345,11 @@ def 
     for _ in range(600):
         next(consumer1)
 
-    kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
+    result = kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
+    assert result[t0p0] == {"low_watermark": 100, "error_code": 0, "partition_index": t0p0.partition}
+    assert result[t0p1] == {"low_watermark": 50, "error_code": 0, "partition_index": t0p1.partition}
+    assert result[t1p0] == {"low_watermark": 60, "error_code": 0, "partition_index": t1p0.partition}
+    assert result[t1p2] == {"low_watermark": 70, "error_code": 0, "partition_index": t1p2.partition}
 
     consumer2 = kafka_consumer_factory(group_id=None, topics=())
     consumer2.assign(partitions)
@@ -367,16 +371,22 @@ def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_message
 
 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
-def test_delete_records_with_errors(kafka_admin_client, topic):
+def test_delete_records_with_errors(kafka_admin_client, topic, send_messages):
     sleep(1)  # sometimes the topic is not created yet...?
     p0 = TopicPartition(topic, 0)
+    p1 = TopicPartition(topic, 1)
+    p2 = TopicPartition(topic, 2)
     # verify that topic has been created
-    kafka_admin_client.delete_records({p0: 10}, timeout_ms=1000)
+    send_messages(range(0, 1), partition=p2.partition, topic=p2.topic)
 
     with pytest.raises(UnknownTopicOrPartitionError):
         kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1})
     with pytest.raises(UnknownTopicOrPartitionError):
         kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1})
+    with pytest.raises(OffsetOutOfRangeError):
+        kafka_admin_client.delete_records({p0: 1000})
+    with pytest.raises(BrokerResponseError):
+        kafka_admin_client.delete_records({p0: 1000, p1: 1000})
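
After this patch delete_records returns per-partition metadata rather than raw
responses. A sketch of consuming the new return value (topic name and offset
are hypothetical; the field names mirror what the tests assert):

    result = admin.delete_records({TopicPartition('my_topic', 0): 42})
    info = result[TopicPartition('my_topic', 0)]
    # e.g. {"partition_index": 0, "low_watermark": 42, "error_code": 0}
    assert info["error_code"] == 0
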
From 3d56f9be20d6628f2d6f609bb6a5043708f1bbf1 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Fri, 14 Mar 2025 13:12:42 -0700
Subject: [PATCH 08/11] whitespace

---
 kafka/admin/client.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 025730f51..fcf3be86f 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1121,9 +1121,9 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
         Will raise UnknownTopicOrPartitionError if for some partition no leader can be found.
 
         :param partitions: ``[TopicPartition]``: partitions for which to find leaders.
-        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from 
+        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
             config.
- 
+
         :return: Dictionary with ``{leader_id -> {partitions}}``
         """
         timeout_ms = self._validate_timeout(timeout_ms)
@@ -1145,7 +1145,7 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
         if len(partitions) != len(valid_partitions):
             unknown = set(partitions) - valid_partitions
             raise UnknownTopicOrPartitionError(
-                "The following partitions are not known: %s" 
+                "The following partitions are not known: %s"
                 % ", ".join(str(x) for x in unknown)
             )
 
@@ -1156,14 +1156,14 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
         :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
             given partitions.
-        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from 
+        :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
             config.
         :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
             this node. No check is performed verifying that this is indeed the leader for all
             listed partitions: use with caution.
 
         :return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker.
-            See DeleteRecordsResponse for possible fields. error_code for all partitions is 
+            See DeleteRecordsResponse for possible fields. error_code for all partitions is
             guaranteed to be zero, otherwise an exception is raised.
         """
         timeout_ms = self._validate_timeout(timeout_ms)
         responses = []
         version = self._matching_api_version(DeleteRecordsRequest)
 
         # We want to make as few requests as possible
-        # If a single node serves as a partition leader for multiple partitions (and/or 
+        # If a single node serves as a partition leader for multiple partitions (and/or
         # topics), we can send all of those in a single request.
         # For that we store {leader -> {partitions for leader}}, and do 1 request per leader
@@ -1218,8 +1218,8 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
                 raise Errors.BrokerResponseError(
                     "The following errors occurred when trying to delete records: " +
                     ", ".join(
-                        "%s(partition=%d): %s" % 
-                        (partition.topic, partition.partition, Errors.for_code(error).__name__) 
+                        "%s(partition=%d): %s" %
+                        (partition.topic, partition.partition, Errors.for_code(error).__name__)
                         for partition, error in partition2error.items()
                     )
                 )

From 62a35103fe4c496407d1d84c651401ba71191589 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Fri, 14 Mar 2025 13:13:02 -0700
Subject: [PATCH 09/11] Update for client.api_version()

---
 kafka/admin/client.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index fcf3be86f..b4cc051c9 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1168,7 +1168,9 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
         """
         timeout_ms = self._validate_timeout(timeout_ms)
         responses = []
-        version = self._matching_api_version(DeleteRecordsRequest)
+        version = self._client.api_version(DeleteRecordsRequest, max_version=0)
+        if version is None:
+            raise IncompatibleBrokerVersion("Broker does not support DeleteRecordsRequest")
 
         # We want to make as few requests as possible
         # If a single node serves as a partition leader for multiple partitions (and/or

From 5361f2abb9a85b3e3e9bf3aad4957bc3cfd7d3e9 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Fri, 14 Mar 2025 13:15:45 -0700
Subject: [PATCH 10/11] fix merge conflict

---
 kafka/admin/client.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index b4cc051c9..ad209f70d 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -22,7 +22,6 @@
     CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
     ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
     DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest)
-)
 from kafka.protocol.commit import OffsetFetchRequest
 from kafka.protocol.find_coordinator import FindCoordinatorRequest
 from kafka.protocol.metadata import MetadataRequest
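
Background for the final fixup below: the broker sets a partition's low
watermark to the offset requested, or to the high watermark when -1 is passed,
so the earlier test expectations of 60 and 70 were wrong. In the test's own
terms:

    # 100 messages at offsets 0..99 per partition:
    # delete_records({tp: 40}) removes offsets 0..39  -> low_watermark == 40
    # delete_records({tp: -1}) removes everything     -> low_watermark == 100
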
From 85a40bd05a2985340dc652740c50b8bed360a08a Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Fri, 14 Mar 2025 13:24:54 -0700
Subject: [PATCH 11/11] fixup low_watermark in tests

---
 test/test_admin_integration.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 77b4938b2..83b6ccaf2 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -348,8 +348,8 @@ def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_message
     result = kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
     assert result[t0p0] == {"low_watermark": 100, "error_code": 0, "partition_index": t0p0.partition}
     assert result[t0p1] == {"low_watermark": 50, "error_code": 0, "partition_index": t0p1.partition}
-    assert result[t1p0] == {"low_watermark": 60, "error_code": 0, "partition_index": t1p0.partition}
-    assert result[t1p2] == {"low_watermark": 70, "error_code": 0, "partition_index": t1p2.partition}
+    assert result[t1p0] == {"low_watermark": 40, "error_code": 0, "partition_index": t1p0.partition}
+    assert result[t1p2] == {"low_watermark": 30, "error_code": 0, "partition_index": t1p2.partition}
 
     consumer2 = kafka_consumer_factory(group_id=None, topics=())
     consumer2.assign(partitions)
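
End-to-end sketch of the merged feature (illustrative; the broker address,
topic name, and offsets are assumptions, not values from the patches):

    from kafka import KafkaProducer
    from kafka.admin import KafkaAdminClient
    from kafka.structs import TopicPartition

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    for i in range(100):
        # 100 messages at offsets 0..99 in partition 0
        producer.send('my_topic', b'message %d' % i, partition=0)
    producer.flush()

    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
    tp = TopicPartition('my_topic', 0)
    result = admin.delete_records({tp: 50}, timeout_ms=1000)
    assert result[tp]["low_watermark"] == 50  # offsets 0..49 are gone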