Lazy unpack records in Consumer Fetcher #2555

Merged: 12 commits, Mar 18, 2025
363 changes: 154 additions & 209 deletions kafka/consumer/fetcher.py

Large diffs are not rendered by default.
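The kafka/consumer/fetcher.py diff is the bulk of this PR but is not rendered above. As a rough, hypothetical sketch of the idea named in the title (not the actual fetcher code): instead of unpacking every record of a fetch response eagerly, the fetcher keeps the undecoded records and only decodes batches as the consumer drains them. The sketch below assumes the `MemoryRecords.has_next()` / `next_batch()` helpers from kafka/record/memory_records.py; `iter_records_lazily` and `take` are illustrative names.

```python
# Hypothetical sketch of lazy record unpacking; the real change lives in
# kafka/consumer/fetcher.py, which is not shown in this diff view.
import itertools

def iter_records_lazily(memory_records):
    """Decode batches one at a time instead of materializing the whole fetch."""
    while memory_records.has_next():
        batch = memory_records.next_batch()   # cheap: parses only the batch header
        for record in batch:                  # per-record decoding happens here
            yield record

def take(memory_records, max_records):
    # A poll(max_records=N) style caller only pays for the records it takes.
    return list(itertools.islice(iter_records_lazily(memory_records), max_records))
```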

22 changes: 9 additions & 13 deletions kafka/consumer/group.py
@@ -707,22 +707,18 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
# Before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
# responses to enable pipelining while the user is handling the
# fetched records.
if not partial:
futures = self._fetcher.send_fetches()
if len(futures):
self._client.poll(timeout_ms=0)

if records:
# Before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
# responses to enable pipelining while the user is handling the
# fetched records.
if not partial:
futures = self._fetcher.send_fetches()
if len(futures):
self._client.poll(timeout_ms=0)
return records

# Send any new fetches (won't resend pending fetches)
futures = self._fetcher.send_fetches()
if len(futures):
self._client.poll(timeout_ms=0)

self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000))
# after the long poll, we should check whether the group needs to rebalance
# prior to returning data so that the group can stabilize faster
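Read together, the hunk above moves the pipelining block inside the `if records:` branch: the extra `send_fetches()` plus non-blocking poll now runs only when records are actually being returned, while the no-records path falls through to the single fetch and long poll below. A condensed paraphrase of the resulting flow (not a verbatim copy of `_poll_once`):

```python
# Condensed paraphrase of the new _poll_once control flow from the hunk above.
records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
if records:
    # Pipeline the next fetch before returning, so the network round-trip
    # overlaps with the user's processing of this batch.
    if not partial:
        futures = self._fetcher.send_fetches()
        if len(futures):
            self._client.poll(timeout_ms=0)
    return records

# No buffered records: send any new fetches (pending ones are not resent),
# then block up to the remaining timeout.
futures = self._fetcher.send_fetches()
if len(futures):
    self._client.poll(timeout_ms=0)
self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000))
```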
5 changes: 0 additions & 5 deletions kafka/consumer/subscription_state.py
@@ -382,9 +382,6 @@ def __init__(self):
self._position = None # OffsetAndMetadata exposed to the user
self.highwater = None
self.drop_pending_record_batch = False
# The last message offset hint available from a record batch with
# magic=2 which includes deleted compacted messages
self.last_offset_from_record_batch = None

def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'
@@ -400,7 +397,6 @@ def await_reset(self, strategy):
self.awaiting_reset = True
self.reset_strategy = strategy
self._position = None
self.last_offset_from_record_batch = None
self.has_valid_position = False

def seek(self, offset):
@@ -409,7 +405,6 @@ def seek(self, offset):
self.reset_strategy = None
self.has_valid_position = True
self.drop_pending_record_batch = True
self.last_offset_from_record_batch = None

def pause(self):
self.paused = True
15 changes: 15 additions & 0 deletions kafka/record/abc.py
@@ -9,6 +9,11 @@
class ABCRecord(object):
__slots__ = ()

@abc.abstractproperty
def size_in_bytes(self):
""" Number of total bytes in record
"""

@abc.abstractproperty
def offset(self):
""" Absolute offset of record
@@ -40,6 +45,11 @@ def checksum(self):
be the checksum for v0 and v1 and None for v2 and above.
"""

@abc.abstractmethod
def validate_crc(self):
""" Return True if v0/v1 record matches checksum. noop/True for v2 records
"""

@abc.abstractproperty
def headers(self):
""" If supported by version list of key-value tuples, or empty list if
@@ -100,6 +110,11 @@ def __iter__(self):
if needed.
"""

@abc.abstractproperty
def magic(self):
""" Return magic value (0, 1, 2) for batch.
"""


@add_metaclass(abc.ABCMeta)
class ABCRecords(object):
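The new abstract members above give the consumer what it needs to drain batches lazily: per-record byte accounting (`size_in_bytes`), per-record integrity checks (`validate_crc`, a no-op True for v2), and the batch `magic`. A hedged sketch of how a caller might use them; `take_records` and `max_bytes` are illustrative names, not fetcher API:

```python
# Illustrative use of the new ABCRecord members; not actual fetcher.py code.
def take_records(batch, max_bytes):
    """Drain records from one batch until roughly max_bytes have been consumed."""
    taken, consumed = [], 0
    for record in batch:                  # ABCRecordBatch.__iter__
        if not record.validate_crc():     # real check for magic 0/1; always True for v2
            raise ValueError("Corrupt record at offset %d" % record.offset)
        taken.append(record)
        consumed += record.size_in_bytes  # new property: size without re-encoding
        if consumed >= max_bytes:
            break
    return taken
```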
26 changes: 17 additions & 9 deletions kafka/record/default_records.py
@@ -275,10 +275,10 @@ def _read_msg(

if self.is_control_batch:
return ControlRecord(
offset, timestamp, self.timestamp_type, key, value, headers)
length, offset, timestamp, self.timestamp_type, key, value, headers)
else:
return DefaultRecord(
offset, timestamp, self.timestamp_type, key, value, headers)
length, offset, timestamp, self.timestamp_type, key, value, headers)

def __iter__(self):
self._maybe_uncompress()
@@ -314,17 +314,22 @@ def validate_crc(self):

class DefaultRecord(ABCRecord):

__slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
__slots__ = ("_size_in_bytes", "_offset", "_timestamp", "_timestamp_type", "_key", "_value",
"_headers")

def __init__(self, offset, timestamp, timestamp_type, key, value, headers):
def __init__(self, size_in_bytes, offset, timestamp, timestamp_type, key, value, headers):
self._size_in_bytes = size_in_bytes
self._offset = offset
self._timestamp = timestamp
self._timestamp_type = timestamp_type
self._key = key
self._value = value
self._headers = headers

@property
def size_in_bytes(self):
return self._size_in_bytes

@property
def offset(self):
return self._offset
@@ -361,6 +366,9 @@ def headers(self):
def checksum(self):
return None

def validate_crc(self):
return True

def __repr__(self):
return (
"DefaultRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
@@ -371,16 +379,16 @@ def __repr__(self):


class ControlRecord(DefaultRecord):
__slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
__slots__ = ("_size_in_bytes", "_offset", "_timestamp", "_timestamp_type", "_key", "_value",
"_headers", "_version", "_type")

KEY_STRUCT = struct.Struct(
">h" # Current Version => Int16
"h" # Type => Int16 (0 indicates an abort marker, 1 indicates a commit)
)

def __init__(self, offset, timestamp, timestamp_type, key, value, headers):
super(ControlRecord, self).__init__(offset, timestamp, timestamp_type, key, value, headers)
def __init__(self, size_in_bytes, offset, timestamp, timestamp_type, key, value, headers):
super(ControlRecord, self).__init__(size_in_bytes, offset, timestamp, timestamp_type, key, value, headers)
(self._version, self._type) = self.KEY_STRUCT.unpack(self._key)

# see https://kafka.apache.org/documentation/#controlbatch
@@ -548,8 +556,8 @@ def write_header(self, use_compression_type=True):
0, # CRC will be set below, as we need a filled buffer for it
self._get_attributes(use_compression_type),
self._last_offset,
self._first_timestamp,
self._max_timestamp,
self._first_timestamp or 0,
self._max_timestamp or 0,
self._producer_id,
self._producer_epoch,
self._base_sequence,
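In short: DefaultRecord and ControlRecord now take the record's encoded size as a new leading constructor argument (threaded through from the length parsed in `_read_msg`), `validate_crc()` on a v2 record is simply True because the CRC lives on the batch, and `write_header()` now writes 0 instead of None when a batch has no timestamps. A quick construction with hypothetical values, purely to show the new surface:

```python
# Hypothetical values, only to illustrate the new constructor argument and methods.
rec = DefaultRecord(
    size_in_bytes=42,          # new first argument: encoded size of this record
    offset=100,
    timestamp=1710000000000,
    timestamp_type=0,
    key=b"k",
    value=b"v",
    headers=[],
)
assert rec.size_in_bytes == 42
assert rec.validate_crc() is True   # v2: no per-record CRC; the batch CRC covers it
assert rec.checksum is None         # unchanged behaviour
```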
43 changes: 33 additions & 10 deletions kafka/record/legacy_records.py
@@ -164,6 +164,10 @@ def timestamp_type(self):
def compression_type(self):
return self._attributes & self.CODEC_MASK

@property
def magic(self):
return self._magic

def validate_crc(self):
crc = calc_crc32(self._buffer[self.MAGIC_OFFSET:])
return self._crc == crc
@@ -232,6 +236,9 @@ def _read_key_value(self, pos):
value = self._buffer[pos:pos + value_size].tobytes()
return key, value

def _crc_bytes(self, msg_pos, length):
return self._buffer[msg_pos + self.MAGIC_OFFSET:msg_pos + self.LOG_OVERHEAD + length]

def __iter__(self):
if self._magic == 1:
key_offset = self.KEY_OFFSET_V1
@@ -255,7 +262,7 @@ def __iter__(self):
absolute_base_offset = -1

for header, msg_pos in headers:
offset, _, crc, _, attrs, timestamp = header
offset, length, crc, _, attrs, timestamp = header
# There should only ever be a single layer of compression
assert not attrs & self.CODEC_MASK, (
'MessageSet at offset %d appears double-compressed. This '
@@ -271,28 +278,36 @@
offset += absolute_base_offset

key, value = self._read_key_value(msg_pos + key_offset)
crc_bytes = self._crc_bytes(msg_pos, length)
yield LegacyRecord(
offset, timestamp, timestamp_type,
key, value, crc)
self._magic, offset, timestamp, timestamp_type,
key, value, crc, crc_bytes)
else:
key, value = self._read_key_value(key_offset)
crc_bytes = self._crc_bytes(0, len(self._buffer) - self.LOG_OVERHEAD)
yield LegacyRecord(
self._offset, self._timestamp, timestamp_type,
key, value, self._crc)
self._magic, self._offset, self._timestamp, timestamp_type,
key, value, self._crc, crc_bytes)


class LegacyRecord(ABCRecord):

__slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
"_crc")
__slots__ = ("_magic", "_offset", "_timestamp", "_timestamp_type", "_key", "_value",
"_crc", "_crc_bytes")

def __init__(self, offset, timestamp, timestamp_type, key, value, crc):
def __init__(self, magic, offset, timestamp, timestamp_type, key, value, crc, crc_bytes):
self._magic = magic
self._offset = offset
self._timestamp = timestamp
self._timestamp_type = timestamp_type
self._key = key
self._value = value
self._crc = crc
self._crc_bytes = crc_bytes

@property
def magic(self):
return self._magic

@property
def offset(self):
@@ -330,11 +345,19 @@ def headers(self):
def checksum(self):
return self._crc

def validate_crc(self):
crc = calc_crc32(self._crc_bytes)
return self._crc == crc

@property
def size_in_bytes(self):
return LegacyRecordBatchBuilder.estimate_size_in_bytes(self._magic, None, self._key, self._value)

def __repr__(self):
return (
"LegacyRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
"LegacyRecord(magic={!r} offset={!r}, timestamp={!r}, timestamp_type={!r},"
" key={!r}, value={!r}, crc={!r})".format(
self._offset, self._timestamp, self._timestamp_type,
self._magic, self._offset, self._timestamp, self._timestamp_type,
self._key, self._value, self._crc)
)

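For legacy magic 0/1 messages, `validate_crc()` now works per record: the batch iterator slices out exactly the bytes the message checksum covers (magic byte through the end of the value) and hands them to the record as `crc_bytes`, while `size_in_bytes` is estimated from the key and value via `LegacyRecordBatchBuilder.estimate_size_in_bytes`. A conceptual sketch of the per-record check, assuming the same `calc_crc32` helper the module already imports; the function and parameter names are illustrative:

```python
# Conceptual sketch of the per-record legacy CRC check added above.
from kafka.record.util import calc_crc32

def legacy_record_crc_ok(buffer, msg_pos, length, expected_crc, magic_offset, log_overhead):
    # The stored CRC covers everything after the CRC field itself:
    # magic, attributes, (timestamp for v1), key and value.
    crc_bytes = buffer[msg_pos + magic_offset : msg_pos + log_overhead + length]
    return calc_crc32(crc_bytes) == expected_crc
```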
8 changes: 6 additions & 2 deletions kafka/record/memory_records.py
@@ -115,7 +115,7 @@ class MemoryRecordsBuilder(object):
__slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed",
"_bytes_written")

def __init__(self, magic, compression_type, batch_size):
def __init__(self, magic, compression_type, batch_size, offset=0):
assert magic in [0, 1, 2], "Not supported magic"
assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
if magic >= 2:
@@ -130,10 +130,14 @@ def __init__(self, magic, compression_type, batch_size):
self._batch_size = batch_size
self._buffer = None

self._next_offset = 0
self._next_offset = offset
self._closed = False
self._bytes_written = 0

def skip(self, offsets_to_skip):
# Exposed for testing compacted records
self._next_offset += offsets_to_skip

def append(self, timestamp, key, value, headers=[]):
""" Append a message to the buffer.

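MemoryRecordsBuilder can now start at an arbitrary base offset, and `skip()` advances the next offset without writing anything, which the comment above flags as a hook for testing compacted logs (gaps in the offset sequence). A small usage sketch under those assumptions, with made-up keys, values, and timestamps:

```python
# Usage sketch for the new offset parameter and the skip() test hook.
from kafka.record.memory_records import MemoryRecordsBuilder

builder = MemoryRecordsBuilder(magic=2, compression_type=0,
                               batch_size=1024, offset=5)           # batch begins at offset 5
builder.append(timestamp=1710000000000, key=b"a", value=b"1")       # written at offset 5
builder.skip(3)                                                     # simulate offsets 6-8 removed by compaction
builder.append(timestamp=1710000000001, key=b"b", value=b"2")       # written at offset 9
builder.close()
```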