Skip to content

Commit 5418a64

Browse files
committed
better response structure, better tests, reuse more code, fix protocol
1 parent f9afb4e commit 5418a64

File tree

2 files changed

+48
-48
lines changed

2 files changed

+48
-48
lines changed

kafka/admin/client.py

+35-44
Original file line numberDiff line numberDiff line change
@@ -973,49 +973,26 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
973973
:return: Dictionary with ``{leader_id -> {partitions}}``
974974
"""
975975
timeout_ms = self._validate_timeout(timeout_ms)
976-
version = self._matching_api_version(MetadataRequest)
977976

978977
partitions = set(partitions)
979-
topics = set()
980-
981-
for topic_partition in partitions:
982-
topics.add(topic_partition.topic)
983-
984-
request = MetadataRequest[version](
985-
topics=list(topics),
986-
allow_auto_topic_creation=False
987-
)
988-
989-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
990-
991-
self._wait_for_futures([future])
992-
response = future.value
993-
994-
version = self._matching_api_version(DeleteRecordsRequest)
978+
topics = set(tp.topic for tp in partitions)
995979

996-
PARTITIONS_INFO = 3
997-
NAME = 1
998-
PARTITION_INDEX = 1
999-
LEADER = 2
980+
response = self._get_cluster_metadata(topics=topics).to_object()
1000981

1001-
# We want to make as few requests as possible
1002-
# If a single node serves as a partition leader for multiple partitions (and/or
1003-
# topics), we can send all of those in a single request.
1004-
# For that we store {leader -> {topic1 -> [p0, p1], topic2 -> [p0, p1]}}
1005982
leader2partitions = defaultdict(list)
1006983
valid_partitions = set()
1007-
for topic in response.topics:
1008-
for partition in topic[PARTITIONS_INFO]:
1009-
t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
984+
for topic in response.get("topics", ()):
985+
for partition in topic.get("partitions", ()):
986+
t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
1010987
if t2p in partitions:
1011-
leader2partitions[partition[LEADER]].append(t2p)
988+
leader2partitions[partition["leader"]].append(t2p)
1012989
valid_partitions.add(t2p)
1013990

1014991
if len(partitions) != len(valid_partitions):
1015992
unknown = set(partitions) - valid_partitions
1016993
raise UnknownTopicOrPartitionError(
1017-
"The following partitions are not known: %s" % ", "
1018-
.join(str(x) for x in unknown)
994+
"The following partitions are not known: %s"
995+
% ", ".join(str(x) for x in unknown)
1019996
)
1020997

1021998
return leader2partitions
@@ -1029,14 +1006,20 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
10291006
config.
10301007
:param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
10311008
this node. No check is performed verifying that this is indeed the leader for all
1032-
listed partitions, use with caution.
1009+
listed partitions: use with caution.
10331010
1034-
:return: List of DeleteRecordsResponse
1011+
:return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker.
1012+
See DeleteRecordsResponse for possible fields. error_code for all partitions is
1013+
guaranteed to be zero, otherwise an exception is raised.
10351014
"""
10361015
timeout_ms = self._validate_timeout(timeout_ms)
10371016
responses = []
10381017
version = self._matching_api_version(DeleteRecordsRequest)
10391018

1019+
# We want to make as few requests as possible
1020+
# If a single node serves as a partition leader for multiple partitions (and/or
1021+
# topics), we can send all of those in a single request.
1022+
# For that we store {leader -> {partitions for leader}}, and do 1 request per leader
10401023
if partition_leader_id is None:
10411024
leader2partitions = self._get_leader_for_partitions(
10421025
set(records_to_delete), timeout_ms
@@ -1059,27 +1042,35 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
10591042
future = self._send_request_to_node(leader, request)
10601043
self._wait_for_futures([future])
10611044

1062-
response = future.value
1063-
responses.append(response)
1045+
responses.append(future.value.to_object())
10641046

1047+
partition2result = {}
10651048
partition2error = {}
10661049
for response in responses:
1067-
for topic in getattr(response, 'topics', ()):
1068-
for partition in getattr(topic, 'partitions', ()):
1069-
if getattr(partition, 'error_code', 0) != 0:
1070-
tp = TopicPartition(topic, partition['partition_index'])
1071-
partition2error[tp] = partition['error_code']
1050+
for topic in response["topics"]:
1051+
for partition in topic["partitions"]:
1052+
tp = TopicPartition(topic["name"], partition["partition_index"])
1053+
partition2result[tp] = partition
1054+
if partition["error_code"] != 0:
1055+
partition2error[tp] = partition["error_code"]
10721056

10731057
if partition2error:
10741058
if len(partition2error) == 1:
1075-
raise Errors.for_code(partition2error[0])()
1059+
key, error = next(iter(partition2error.items()))
1060+
raise Errors.for_code(error)(
1061+
"Error deleting records from topic %s partition %s" % (key.topic, key.partition)
1062+
)
10761063
else:
10771064
raise Errors.BrokerResponseError(
1078-
"The following errors occured when trying to delete records: "
1079-
", ".join("%s: %s" % (partition, error) for partition, error in partition2error.items())
1065+
"The following errors occured when trying to delete records: " +
1066+
", ".join(
1067+
"%s(partition=%d): %s" %
1068+
(partition.topic, partition.partition, Errors.for_code(error).__name__)
1069+
for partition, error in partition2error.items()
1070+
)
10801071
)
10811072

1082-
return responses
1073+
return partition2result
10831074

10841075
# create delegation token protocol not yet implemented
10851076
# Note: send the request to the least_loaded_node()

test/test_admin_integration.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
from kafka.admin import (
1010
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
1111
from kafka.errors import (
12-
KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
13-
GroupIdNotFoundError, UnknownTopicOrPartitionError)
12+
BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
13+
GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError)
1414

1515

1616
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -342,7 +342,11 @@ def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_message
342342
for _ in range(600):
343343
next(consumer1)
344344

345-
kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
345+
result = kafka_admin_client.delete_records({t0p0: -1, t0p1: 50, t1p0: 40, t1p2: 30}, timeout_ms=1000)
346+
assert result[t0p0] == {"low_watermark": 100, "error_code": 0, "partition_index": t0p0.partition}
347+
assert result[t0p1] == {"low_watermark": 50, "error_code": 0, "partition_index": t0p1.partition}
348+
assert result[t1p0] == {"low_watermark": 60, "error_code": 0, "partition_index": t1p0.partition}
349+
assert result[t1p2] == {"low_watermark": 70, "error_code": 0, "partition_index": t1p2.partition}
346350

347351
consumer2 = kafka_consumer_factory(group_id=None, topics=())
348352
consumer2.assign(partitions)
@@ -363,13 +367,18 @@ def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_message
363367
def test_delete_records_with_errors(kafka_admin_client, topic):
364368
sleep(1) # sometimes the topic is not created yet...?
365369
p0 = TopicPartition(topic, 0)
370+
p1 = TopicPartition(topic, 1)
366371
# verify that topic has been created
367-
kafka_admin_client.delete_records({p0: 10}, timeout_ms=1000)
372+
kafka_admin_client.delete_records({p0: -1}, timeout_ms=1000)
368373

369374
with pytest.raises(UnknownTopicOrPartitionError):
370375
kafka_admin_client.delete_records({TopicPartition(topic, 9999): -1})
371376
with pytest.raises(UnknownTopicOrPartitionError):
372377
kafka_admin_client.delete_records({TopicPartition("doesntexist", 0): -1})
378+
with pytest.raises(OffsetOutOfRangeError):
379+
kafka_admin_client.delete_records({p0: 1000})
380+
with pytest.raises(BrokerResponseError):
381+
kafka_admin_client.delete_records({p0: 1000, p1: 1000})
373382

374383

375384

0 commit comments

Comments
 (0)