Skip to content

Commit b090b21

Browse files
authored
Cover sticky assignor's metadata method with tests (#2161)
1 parent 83b7b27 commit b090b21

File tree

2 files changed

+40
-45
lines changed

2 files changed

+40
-45
lines changed

kafka/coordinator/assignors/sticky/sticky_assignor.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -648,15 +648,19 @@ def parse_member_metadata(cls, metadata):
648648

649649
@classmethod
650650
def metadata(cls, topics):
651-
if cls.member_assignment is None:
651+
return cls._metadata(topics, cls.member_assignment, cls.generation)
652+
653+
@classmethod
654+
def _metadata(cls, topics, member_assignment_partitions, generation=-1):
655+
if member_assignment_partitions is None:
652656
log.debug("No member assignment available")
653657
user_data = b''
654658
else:
655659
log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation))
656660
partitions_by_topic = defaultdict(list)
657-
for topic_partition in cls.member_assignment: # pylint: disable=not-an-iterable
661+
for topic_partition in member_assignment_partitions:
658662
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
659-
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), cls.generation)
663+
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
660664
user_data = data.encode()
661665
return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)
662666

test/test_assignors.py

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def test_sticky_assignor1(mocker):
111111
del subscriptions['C1']
112112
member_metadata = {}
113113
for member, topics in six.iteritems(subscriptions):
114-
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
114+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
115115

116116
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
117117
expected_assignment = {
@@ -154,7 +154,7 @@ def test_sticky_assignor2(mocker):
154154
}
155155
member_metadata = {}
156156
for member, topics in six.iteritems(subscriptions):
157-
member_metadata[member] = build_metadata(topics, [])
157+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
158158

159159
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
160160
expected_assignment = {
@@ -167,7 +167,7 @@ def test_sticky_assignor2(mocker):
167167
del subscriptions['C0']
168168
member_metadata = {}
169169
for member, topics in six.iteritems(subscriptions):
170-
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
170+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
171171

172172
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
173173
expected_assignment = {
@@ -326,7 +326,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
326326
}
327327
member_metadata = {}
328328
for member, topics in six.iteritems(subscriptions):
329-
member_metadata[member] = build_metadata(
329+
member_metadata[member] = StickyPartitionAssignor._metadata(
330330
topics, assignment[member].partitions() if member in assignment else []
331331
)
332332

@@ -338,7 +338,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
338338
}
339339
member_metadata = {}
340340
for member, topics in six.iteritems(subscriptions):
341-
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
341+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
342342

343343
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
344344
verify_validity_and_balance(subscriptions, assignment)
@@ -367,7 +367,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
367367
}
368368
member_metadata = {}
369369
for member, topics in six.iteritems(subscriptions):
370-
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
370+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
371371

372372
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
373373
expected_assignment = {
@@ -382,7 +382,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
382382
}
383383
member_metadata = {}
384384
for member, topics in six.iteritems(subscriptions):
385-
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
385+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
386386

387387
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
388388
expected_assignment = {
@@ -413,7 +413,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker):
413413
del subscriptions['C10']
414414
member_metadata = {}
415415
for member, topics in six.iteritems(subscriptions):
416-
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
416+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
417417

418418
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
419419
verify_validity_and_balance(subscriptions, assignment)
@@ -435,7 +435,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker):
435435
subscriptions['C10'] = {'t'}
436436
member_metadata = {}
437437
for member, topics in six.iteritems(subscriptions):
438-
member_metadata[member] = build_metadata(
438+
member_metadata[member] = StickyPartitionAssignor._metadata(
439439
topics, assignment[member].partitions() if member in assignment else []
440440
)
441441
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -462,7 +462,7 @@ def test_sticky_same_subscriptions(mocker):
462462
del subscriptions['C5']
463463
member_metadata = {}
464464
for member, topics in six.iteritems(subscriptions):
465-
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
465+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
466466
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
467467
verify_validity_and_balance(subscriptions, assignment)
468468
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
@@ -488,7 +488,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker):
488488

489489
member_metadata = {}
490490
for member, topics in six.iteritems(subscriptions):
491-
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
491+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
492492

493493
for i in range(50):
494494
member = 'C{}'.format(randint(1, n_consumers))
@@ -517,7 +517,7 @@ def test_new_subscription(mocker):
517517
subscriptions['C0'].add('t1')
518518
member_metadata = {}
519519
for member, topics in six.iteritems(subscriptions):
520-
member_metadata[member] = build_metadata(topics, [])
520+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
521521

522522
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
523523
verify_validity_and_balance(subscriptions, assignment)
@@ -540,7 +540,7 @@ def test_move_existing_assignments(mocker):
540540

541541
member_metadata = {}
542542
for member, topics in six.iteritems(subscriptions):
543-
member_metadata[member] = build_metadata(topics, member_assignments[member])
543+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, member_assignments[member])
544544

545545
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
546546
verify_validity_and_balance(subscriptions, assignment)
@@ -570,7 +570,7 @@ def test_stickiness(mocker):
570570
del subscriptions['C1']
571571
member_metadata = {}
572572
for member, topics in six.iteritems(subscriptions):
573-
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
573+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
574574

575575
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
576576
verify_validity_and_balance(subscriptions, assignment)
@@ -625,7 +625,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker):
625625
}
626626
member_metadata = {}
627627
for member, topics in six.iteritems(subscriptions):
628-
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
628+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
629629

630630
cluster = create_cluster(mocker, topics={}, topics_partitions={})
631631
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -645,7 +645,7 @@ def test_conflicting_previous_assignments(mocker):
645645
member_metadata = {}
646646
for member, topics in six.iteritems(subscriptions):
647647
# assume both C1 and C2 have partition 1 assigned to them in generation 1
648-
member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)
648+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)
649649

650650
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
651651
verify_validity_and_balance(subscriptions, assignment)
@@ -676,7 +676,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu
676676

677677
member_metadata = {}
678678
for member, topics in six.iteritems(subscriptions):
679-
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
679+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
680680

681681
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
682682
verify_validity_and_balance(subscriptions, assignment)
@@ -687,9 +687,9 @@ def test_assignment_with_multiple_generations1(mocker):
687687
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
688688

689689
member_metadata = {
690-
'C1': build_metadata({'t'}, []),
691-
'C2': build_metadata({'t'}, []),
692-
'C3': build_metadata({'t'}, []),
690+
'C1': StickyPartitionAssignor._metadata({'t'}, []),
691+
'C2': StickyPartitionAssignor._metadata({'t'}, []),
692+
'C3': StickyPartitionAssignor._metadata({'t'}, []),
693693
}
694694

695695
assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -699,8 +699,8 @@ def test_assignment_with_multiple_generations1(mocker):
699699
assert len(assignment1['C3'].assignment[0][1]) == 2
700700

701701
member_metadata = {
702-
'C1': build_metadata({'t'}, assignment1['C1'].partitions()),
703-
'C2': build_metadata({'t'}, assignment1['C2'].partitions()),
702+
'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()),
703+
'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions()),
704704
}
705705

706706
assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -712,8 +712,8 @@ def test_assignment_with_multiple_generations1(mocker):
712712
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
713713

714714
member_metadata = {
715-
'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
716-
'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
715+
'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
716+
'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
717717
}
718718

719719
assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -727,9 +727,9 @@ def test_assignment_with_multiple_generations2(mocker):
727727
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
728728

729729
member_metadata = {
730-
'C1': build_metadata({'t'}, []),
731-
'C2': build_metadata({'t'}, []),
732-
'C3': build_metadata({'t'}, []),
730+
'C1': StickyPartitionAssignor._metadata({'t'}, []),
731+
'C2': StickyPartitionAssignor._metadata({'t'}, []),
732+
'C3': StickyPartitionAssignor._metadata({'t'}, []),
733733
}
734734

735735
assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -739,7 +739,7 @@ def test_assignment_with_multiple_generations2(mocker):
739739
assert len(assignment1['C3'].assignment[0][1]) == 2
740740

741741
member_metadata = {
742-
'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1),
742+
'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1),
743743
}
744744

745745
assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -749,9 +749,9 @@ def test_assignment_with_multiple_generations2(mocker):
749749
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
750750

751751
member_metadata = {
752-
'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1),
753-
'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
754-
'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
752+
'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions(), 1),
753+
'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
754+
'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
755755
}
756756

757757
assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -778,7 +778,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
778778
}
779779
member_metadata = {}
780780
for member in six.iterkeys(member_assignments):
781-
member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member])
781+
member_metadata[member] = StickyPartitionAssignor._metadata({'t'}, member_assignments[member], member_generations[member])
782782

783783
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
784784
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment)
@@ -788,19 +788,10 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
788788
def make_member_metadata(subscriptions):
789789
member_metadata = {}
790790
for member, topics in six.iteritems(subscriptions):
791-
member_metadata[member] = build_metadata(topics, [])
791+
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
792792
return member_metadata
793793

794794

795-
def build_metadata(topics, member_assignment_partitions, generation=-1):
796-
partitions_by_topic = defaultdict(list)
797-
for topic_partition in member_assignment_partitions:
798-
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
799-
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
800-
user_data = data.encode()
801-
return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data)
802-
803-
804795
def assert_assignment(result_assignment, expected_assignment):
805796
assert result_assignment == expected_assignment
806797
assert set(result_assignment) == set(expected_assignment)

0 commit comments

Comments
 (0)