
Seeking on a consumer from the rebalance listener causes consumer to silently discard records #2310


Closed
rvesse opened this issue Mar 23, 2022 · 0 comments · Fixed by #2555


rvesse commented Mar 23, 2022

I discovered what appears to be a multi-threading bug that can be triggered by seeking from a rebalance listener. After the seek, the consumer returns the record at the target offset and then silently resets its position to a later offset, potentially skipping many records.

Enabling Python logging shows that the fetcher incorrectly concludes there are compacted/deleted records to skip and resets the position, i.e. the following code block in the fetcher is triggered:

# advance position for any deleted compacted messages if required
if self._subscriptions.assignment[partition].last_offset_from_message_batch:
    next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1
    if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
        log.debug(
            "Advance position for partition %s from %s to %s (last message batch location plus one)"
            " to correct for deleted compacted messages",
            partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
        self._subscriptions.assignment[partition].position = next_offset_from_batch_header
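To make the effect of that check concrete, here is a minimal, self-contained model of it. It is a sketch, not kafka-python code: `PartitionState` and `advance_for_compaction` are hypothetical names, and the scenario assumes that `last_offset_from_message_batch` still holds a value left over from an earlier batch when the seek happens. That is one way the log message above could fire, not a confirmed root cause.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PartitionState:
    # Hypothetical stand-in for the per-partition assignment state
    # (only the two fields the quoted check reads).
    position: int
    last_offset_from_message_batch: Optional[int] = None


def advance_for_compaction(state: PartitionState) -> int:
    """Simplified copy of the quoted check; returns the resulting position."""
    if state.last_offset_from_message_batch:
        next_offset = state.last_offset_from_message_batch + 1
        if next_offset > state.position:
            # Jump forward, assuming offsets below next_offset were compacted away.
            state.position = next_offset
    return state.position


# Assumed scenario: a batch ending at offset 9 was seen earlier, so the
# batch-header field holds 9, and nothing clears it when seek(0) runs.
state = PartitionState(position=10, last_offset_from_message_batch=9)
state.position = 0   # the seek from the rebalance listener
state.position = 1   # the record at offset 0 has been returned
print(advance_for_compaction(state))  # 10, so offsets 1..9 are never returned
```

With a fresh `PartitionState` (the batch-header field left as `None`) the check is a no-op, which is why, in this model, the jump only appears when that value is stale relative to the new position.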

I understand that the package is not fully thread-safe, so this probably should not be expected to work, but I wanted to document it in case anyone else runs into it.

Code to Reproduce

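from kafka import ConsumerRebalanceListener, KafkaConsumer, KafkaProducer
from kafka.structs import OffsetAndMetadata, TopicPartition


def str_to_bytes(value: str) -> bytes: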
    return value.encode("UTF-8")


def bytes_to_str(value: bytes) -> str:
    return value.decode("UTF-8")


def consumer_fetch_misses_records():
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                             key_serializer=str_to_bytes,
                             value_serializer=str_to_bytes)
    for i in range(1, 11):
        producer.send(topic="test", key=str(i), value=str(i))

    producer.close()

    consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test",
                             key_deserializer=bytes_to_str,
                             value_deserializer=bytes_to_str)
    partition = TopicPartition("test", 0)
    listener = TestRebalanceListener(consumer)
    consumer.subscribe("test", listener=listener)

    for i, record in enumerate(consumer):
        print(f"[{record.topic}-{record.partition}#{record.offset}] {record.key}: {record.value}")

class TestRebalanceListener(ConsumerRebalanceListener):

    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        pass

    def on_partitions_assigned(self, assigned):
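        for partition in assigned:
            # Seeking (and committing) from inside the callback is what
            # leads to the skipped records described above.
            self.consumer.seek(partition, 0)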
            self.consumer.commit(offsets=dict([(partition, OffsetAndMetadata(0, "Reset to beginning"))]))


if __name__ == "__main__":
    consumer_fetch_misses_records()

Running this produces the following output:

[test-0#0] 1: 1

It should instead produce all 10 records, i.e.:

[test-0#0] 1: 1
[test-0#0] 1: 1
[test-0#1] 2: 2
[test-0#2] 3: 3
[test-0#3] 4: 4
[test-0#4] 5: 5
[test-0#5] 6: 6
[test-0#6] 7: 7
[test-0#7] 8: 8
[test-0#8] 9: 9
[test-0#9] 10: 10
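Because the skip is silent, one way to catch it in a test is to compare the offsets actually consumed from a partition against the contiguous range that was produced. The helper below, `missing_offsets`, is a hypothetical name used for illustration; it assumes a single partition with no compaction and no transaction control records, so that the offsets are contiguous.

```python
from typing import Iterable, List


def missing_offsets(consumed: Iterable[int], start: int, end: int) -> List[int]:
    """Return the offsets in [start, end) that never appeared in `consumed`.

    Assumes one partition whose offsets are contiguous (no compaction,
    no transaction control records), so any gap means a skipped record.
    """
    seen = set(consumed)
    return [offset for offset in range(start, end) if offset not in seen]


# Offsets printed by the reproduction versus the 10 records produced.
observed = [0]
print(missing_offsets(observed, 0, 10))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Duplicated offsets (a record delivered twice) are not reported as gaps by this check; detecting redelivery would need a count per offset instead of a set.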

Workaround

The workaround I found is to have the rebalance listener only set a boolean flag when the assignment changes, and then have the main loop perform the seek (and commit) itself, e.g.:

# Reuses the str_to_bytes/bytes_to_str helpers from the reproduction code.
def consumer_fetch_misses_records():
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                             key_serializer=str_to_bytes,
                             value_serializer=str_to_bytes)
    for i in range(1, 11):
        producer.send(topic="test", key=str(i), value=str(i))

    producer.close()

    consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test",
                             key_deserializer=bytes_to_str,
                             value_deserializer=bytes_to_str)
    partition = TopicPartition("test", 0)
    listener = TestRebalanceListener()
    consumer.subscribe("test", listener=listener)

    for i, record in enumerate(consumer):
        # Apply the seek here, in the poll loop, rather than inside the callback.
        if listener.should_seek:
            consumer.seek(partition, 0)
            consumer.commit(offsets=dict([(partition, OffsetAndMetadata(0, "Reset to beginning"))]))
            listener.should_seek = False
        print(f"[{record.topic}-{record.partition}#{record.offset}] {record.key}: {record.value}")


class TestRebalanceListener(ConsumerRebalanceListener):

    def __init__(self):
        self.should_seek = False

    def on_partitions_revoked(self, revoked):
        pass

    def on_partitions_assigned(self, assigned):
        # Only record that a seek is needed; the main loop performs it.
        self.should_seek = assigned is not None
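The workaround above hard-codes a single partition. A minimal sketch of one way to generalise it is to have the listener remember which partitions were assigned instead of a single boolean, and let the poll loop apply the seeks. Assumptions: `DeferredSeekListener`, `apply_pending_seeks` and `FakeConsumer` are hypothetical names; the listener is a plain class (in real code it would subclass kafka-python's `ConsumerRebalanceListener`); `TopicPartition` is a local stand-in; and `consumer` is anything with a `seek(partition, offset)` method, so the logic runs without a broker. The commit from the original workaround is omitted.

```python
from collections import namedtuple
from typing import List, Set

# Local stand-in for kafka's TopicPartition so the sketch runs without kafka-python.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class DeferredSeekListener:
    """Records assignments; never touches the consumer from inside the callback.

    In real code this would subclass kafka-python's ConsumerRebalanceListener.
    """

    def __init__(self) -> None:
        self.pending: Set[TopicPartition] = set()

    def on_partitions_revoked(self, revoked) -> None:
        # Drop pending seeks for partitions we no longer own.
        self.pending.difference_update(revoked)

    def on_partitions_assigned(self, assigned) -> None:
        self.pending.update(assigned)


def apply_pending_seeks(consumer, listener: DeferredSeekListener, offset: int = 0) -> List[TopicPartition]:
    """Called from the main loop: seek each newly assigned partition, then clear."""
    applied = sorted(listener.pending)
    for tp in applied:
        consumer.seek(tp, offset)
    listener.pending.clear()
    return applied


class FakeConsumer:
    """Hypothetical test double that only records seek calls."""

    def __init__(self) -> None:
        self.seeks = []

    def seek(self, partition, offset) -> None:
        self.seeks.append((partition, offset))


listener = DeferredSeekListener()
listener.on_partitions_assigned([TopicPartition("test", 0), TopicPartition("test", 1)])
consumer = FakeConsumer()
apply_pending_seeks(consumer, listener)
print(consumer.seeks)
```

As in the original loop, if the pending seeks are applied after a record has already been obtained from the iterator, that record was fetched before the seek and will still be handled.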