Skip to content

Improve sticky assignor's test coverage #2161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,15 +648,19 @@ def parse_member_metadata(cls, metadata):

@classmethod
def metadata(cls, topics):
if cls.member_assignment is None:
return cls._metadata(topics, cls.member_assignment, cls.generation)

@classmethod
def _metadata(cls, topics, member_assignment_partitions, generation=-1):
if member_assignment_partitions is None:
log.debug("No member assignment available")
user_data = b''
else:
log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation))
partitions_by_topic = defaultdict(list)
for topic_partition in cls.member_assignment: # pylint: disable=not-an-iterable
for topic_partition in member_assignment_partitions:
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), cls.generation)
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
user_data = data.encode()
return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)

Expand Down
75 changes: 33 additions & 42 deletions test/test_assignors.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def test_sticky_assignor1(mocker):
del subscriptions['C1']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand Down Expand Up @@ -154,7 +154,7 @@ def test_sticky_assignor2(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, [])
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand All @@ -167,7 +167,7 @@ def test_sticky_assignor2(mocker):
del subscriptions['C0']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand Down Expand Up @@ -326,7 +326,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(
member_metadata[member] = StickyPartitionAssignor._metadata(
topics, assignment[member].partitions() if member in assignment else []
)

Expand All @@ -338,7 +338,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand Down Expand Up @@ -367,7 +367,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand All @@ -382,7 +382,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand Down Expand Up @@ -413,7 +413,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker):
del subscriptions['C10']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand All @@ -435,7 +435,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker):
subscriptions['C10'] = {'t'}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(
member_metadata[member] = StickyPartitionAssignor._metadata(
topics, assignment[member].partitions() if member in assignment else []
)
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -462,7 +462,7 @@ def test_sticky_same_subscriptions(mocker):
del subscriptions['C5']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
Expand All @@ -488,7 +488,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker):

member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

for i in range(50):
member = 'C{}'.format(randint(1, n_consumers))
Expand Down Expand Up @@ -517,7 +517,7 @@ def test_new_subscription(mocker):
subscriptions['C0'].add('t1')
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, [])
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand All @@ -540,7 +540,7 @@ def test_move_existing_assignments(mocker):

member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, member_assignments[member])
member_metadata[member] = StickyPartitionAssignor._metadata(topics, member_assignments[member])

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand Down Expand Up @@ -570,7 +570,7 @@ def test_stickiness(mocker):
del subscriptions['C1']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand Down Expand Up @@ -625,7 +625,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

cluster = create_cluster(mocker, topics={}, topics_partitions={})
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -645,7 +645,7 @@ def test_conflicting_previous_assignments(mocker):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
# assume both C1 and C2 have partition 1 assigned to them in generation 1
member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand Down Expand Up @@ -676,7 +676,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu

member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand All @@ -687,9 +687,9 @@ def test_assignment_with_multiple_generations1(mocker):
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})

member_metadata = {
'C1': build_metadata({'t'}, []),
'C2': build_metadata({'t'}, []),
'C3': build_metadata({'t'}, []),
'C1': StickyPartitionAssignor._metadata({'t'}, []),
'C2': StickyPartitionAssignor._metadata({'t'}, []),
'C3': StickyPartitionAssignor._metadata({'t'}, []),
}

assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -699,8 +699,8 @@ def test_assignment_with_multiple_generations1(mocker):
assert len(assignment1['C3'].assignment[0][1]) == 2

member_metadata = {
'C1': build_metadata({'t'}, assignment1['C1'].partitions()),
'C2': build_metadata({'t'}, assignment1['C2'].partitions()),
'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()),
'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions()),
}

assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -712,8 +712,8 @@ def test_assignment_with_multiple_generations1(mocker):
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()

member_metadata = {
'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
}

assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -727,9 +727,9 @@ def test_assignment_with_multiple_generations2(mocker):
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})

member_metadata = {
'C1': build_metadata({'t'}, []),
'C2': build_metadata({'t'}, []),
'C3': build_metadata({'t'}, []),
'C1': StickyPartitionAssignor._metadata({'t'}, []),
'C2': StickyPartitionAssignor._metadata({'t'}, []),
'C3': StickyPartitionAssignor._metadata({'t'}, []),
}

assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -739,7 +739,7 @@ def test_assignment_with_multiple_generations2(mocker):
assert len(assignment1['C3'].assignment[0][1]) == 2

member_metadata = {
'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1),
'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1),
}

assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -749,9 +749,9 @@ def test_assignment_with_multiple_generations2(mocker):
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()

member_metadata = {
'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1),
'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions(), 1),
'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
}

assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -778,7 +778,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
}
member_metadata = {}
for member in six.iterkeys(member_assignments):
member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member])
member_metadata[member] = StickyPartitionAssignor._metadata({'t'}, member_assignments[member], member_generations[member])

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment)
Expand All @@ -788,19 +788,10 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
def make_member_metadata(subscriptions):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, [])
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
return member_metadata


def build_metadata(topics, member_assignment_partitions, generation=-1):
partitions_by_topic = defaultdict(list)
for topic_partition in member_assignment_partitions:
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
user_data = data.encode()
return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data)


def assert_assignment(result_assignment, expected_assignment):
assert result_assignment == expected_assignment
assert set(result_assignment) == set(expected_assignment)
Expand Down