diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7527a1f39..1fa2c32ec 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -2,6 +2,7 @@ import collections import copy +import itertools import logging import random import sys @@ -363,176 +364,50 @@ def _append(self, drained, part, max_records, update_offsets): return 0 tp = part.topic_partition - fetch_offset = part.fetch_offset if not self._subscriptions.is_assigned(tp): # this can happen when a rebalance happened before # fetched records are returned to the consumer's poll call log.debug("Not returning fetched records for partition %s" " since it is no longer assigned", tp) + elif not self._subscriptions.is_fetchable(tp): + # this can happen when a partition is paused before + # fetched records are returned to the consumer's poll call + log.debug("Not returning fetched records for assigned partition" + " %s since it is no longer fetchable", tp) + else: # note that the position should always be available # as long as the partition is still assigned position = self._subscriptions.assignment[tp].position - if not self._subscriptions.is_fetchable(tp): - # this can happen when a partition is paused before - # fetched records are returned to the consumer's poll call - log.debug("Not returning fetched records for assigned partition" - " %s since it is no longer fetchable", tp) - - elif fetch_offset == position.offset: - # we are ensured to have at least one record since we already checked for emptiness + if part.next_fetch_offset == position.offset: part_records = part.take(max_records) - next_offset = part_records[-1].offset + 1 - leader_epoch = part_records[-1].leader_epoch - - log.log(0, "Returning fetched records at offset %d for assigned" - " partition %s and update position to %s (leader epoch %s)", position.offset, - tp, next_offset, leader_epoch) - - for record in part_records: - drained[tp].append(record) - - if update_offsets: + log.debug("Returning fetched records at offset %d for assigned" + " partition %s", position.offset, tp) + drained[tp].extend(part_records) + # We want to increment subscription position if (1) we're using consumer.poll(), + # or (2) we didn't return any records (consumer iterator will update position + # when each message is yielded). There may be edge cases where we re-fetch records + # that we'll end up skipping, but for now we'll live with that. 
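+                # Record per-partition fetch lag: highwater mark minus the next
+                # offset we will fetch (skipped when the highwater is unknown).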
+ highwater = self._subscriptions.assignment[tp].highwater + if highwater is not None: + self._sensors.records_fetch_lag.record(highwater - part.next_fetch_offset) + if update_offsets or not part_records: # TODO: save leader_epoch - self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', -1) + log.debug("Updating fetch position for assigned partition %s to %s (leader epoch %s)", + tp, part.next_fetch_offset, part.leader_epoch) + self._subscriptions.assignment[tp].position = OffsetAndMetadata(part.next_fetch_offset, '', -1) return len(part_records) else: # these records aren't next in line based on the last consumed # position, ignore them they must be from an obsolete request log.debug("Ignoring fetched records for %s at offset %s since" - " the current position is %d", tp, part.fetch_offset, + " the current position is %d", tp, part.next_fetch_offset, position.offset) - part.discard() + part.drain() return 0 - def _message_generator(self): - """Iterate over fetched_records""" - while self._next_partition_records or self._completed_fetches: - - if not self._next_partition_records: - completion = self._completed_fetches.popleft() - self._next_partition_records = self._parse_fetched_data(completion) - continue - - # Send additional FetchRequests when the internal queue is low - # this should enable moderate pipelining - if len(self._completed_fetches) <= self.config['iterator_refetch_records']: - self.send_fetches() - - tp = self._next_partition_records.topic_partition - - # We can ignore any prior signal to drop pending record batches - # because we are starting from a fresh one where fetch_offset == position - # i.e., the user seek()'d to this position - self._subscriptions.assignment[tp].drop_pending_record_batch = False - - for msg in self._next_partition_records.take(): - - # Because we are in a generator, it is possible for - # subscription state to change between yield calls - # so we need to re-check on each loop - # this should catch assignment changes, pauses - # and resets via seek_to_beginning / seek_to_end - if not self._subscriptions.is_fetchable(tp): - log.debug("Not returning fetched records for partition %s" - " since it is no longer fetchable", tp) - self._next_partition_records = None - break - - # If there is a seek during message iteration, - # we should stop unpacking this record batch and - # wait for a new fetch response that aligns with the - # new seek position - elif self._subscriptions.assignment[tp].drop_pending_record_batch: - log.debug("Skipping remainder of record batch for partition %s", tp) - self._subscriptions.assignment[tp].drop_pending_record_batch = False - self._next_partition_records = None - break - - # Compressed messagesets may include earlier messages - elif msg.offset < self._subscriptions.assignment[tp].position.offset: - log.debug("Skipping message offset: %s (expecting %s)", - msg.offset, - self._subscriptions.assignment[tp].position.offset) - continue - - self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, '', -1) - yield msg - - self._next_partition_records = None - - def _unpack_records(self, tp, records): - try: - batch = records.next_batch() - while batch is not None: - - # Try DefaultsRecordBatch / message log format v2 - # base_offset, last_offset_delta, and control batches - try: - batch_offset = batch.base_offset + batch.last_offset_delta - leader_epoch = batch.leader_epoch - self._subscriptions.assignment[tp].last_offset_from_record_batch = batch_offset - # Control batches have a 
single record indicating whether a transaction - # was aborted or committed. - # When isolation_level is READ_COMMITTED (currently unsupported) - # we should also skip all messages from aborted transactions - # For now we only support READ_UNCOMMITTED and so we ignore the - # abort/commit signal. - if batch.is_control_batch: - batch = records.next_batch() - continue - except AttributeError: - leader_epoch = -1 - pass - - for record in batch: - key_size = len(record.key) if record.key is not None else -1 - value_size = len(record.value) if record.value is not None else -1 - key = self._deserialize( - self.config['key_deserializer'], - tp.topic, record.key) - value = self._deserialize( - self.config['value_deserializer'], - tp.topic, record.value) - headers = record.headers - header_size = sum( - len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in - headers) if headers else -1 - yield ConsumerRecord( - tp.topic, tp.partition, leader_epoch, record.offset, record.timestamp, - record.timestamp_type, key, value, headers, record.checksum, - key_size, value_size, header_size) - - batch = records.next_batch() - - # If unpacking raises StopIteration, it is erroneously - # caught by the generator. We want all exceptions to be raised - # back to the user. See Issue 545 - except StopIteration: - log.exception('StopIteration raised unpacking messageset') - raise RuntimeError('StopIteration raised unpacking messageset') - - def __iter__(self): # pylint: disable=non-iterator-returned - return self - - def __next__(self): - if not self._iterator: - self._iterator = self._message_generator() - try: - return next(self._iterator) - except StopIteration: - self._iterator = None - raise - - def _deserialize(self, f, topic, bytes_): - if not f: - return bytes_ - if isinstance(f, Deserializer): - return f.deserialize(topic, bytes_) - return f(bytes_) - def _send_list_offsets_requests(self, timestamps): """Fetch offsets for each partition in timestamps dict. This may send request to multiple nodes, based on who is Leader for partition. 
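Note on the iterator plumbing removed above: record unpacking now lives in the new PartitionRecords class introduced further down in this patch, which wraps an itertools.dropwhile / islice pipeline instead of materializing a list per fetch. A minimal runnable sketch of that skip-then-take pattern follows; FakeRecord and LazyPartitionRecords are illustrative stand-ins, not code from this patch.

import itertools
from collections import namedtuple

# Illustrative stand-in for a decoded record; only the offset matters here.
FakeRecord = namedtuple('FakeRecord', ['offset', 'value'])


class LazyPartitionRecords(object):
    """Sketch of the skip-then-take iteration used by the new PartitionRecords."""

    def __init__(self, fetch_offset, records):
        self.fetch_offset = fetch_offset
        self.next_fetch_offset = fetch_offset
        # dropwhile lazily discards leading records of a compressed batch
        # that precede the requested fetch offset
        self.record_iterator = itertools.dropwhile(
            lambda r: r.offset < fetch_offset, self._unpack(records))

    def _unpack(self, records):
        for record in records:
            self.next_fetch_offset = record.offset + 1
            yield record

    def take(self, n=None):
        # islice(it, 0, None) drains the whole iterator when n is None
        return list(itertools.islice(self.record_iterator, 0, n))


batch = [FakeRecord(offset, b'msg') for offset in range(120, 130)]
records = LazyPartitionRecords(123, batch)
assert [r.offset for r in records.take(2)] == [123, 124]
assert records.next_fetch_offset == 125

In the patch itself the predicate is PartitionRecords._maybe_skip_record (which also logs each skipped offset), and take() is what Fetcher._append() calls with max_records.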
@@ -711,16 +586,6 @@ def _create_fetch_requests(self): for partition in self._fetchable_partitions(): node_id = self._client.cluster.leader_for_partition(partition) - # advance position for any deleted compacted messages if required - if self._subscriptions.assignment[partition].last_offset_from_record_batch: - next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_record_batch + 1 - if next_offset_from_batch_header > self._subscriptions.assignment[partition].position.offset: - log.debug( - "Advance position for partition %s from %s to %s (last record batch location plus one)" - " to correct for deleted compacted messages and/or transactional control records", - partition, self._subscriptions.assignment[partition].position.offset, next_offset_from_batch_header) - self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, '', -1) - position = self._subscriptions.assignment[partition].position # fetch if there is a leader and no in-flight requests @@ -856,12 +721,9 @@ def _handle_fetch_error(self, node_id, exception): def _parse_fetched_data(self, completed_fetch): tp = completed_fetch.topic_partition fetch_offset = completed_fetch.fetched_offset - num_bytes = 0 - records_count = 0 - parsed_records = None - error_code, highwater = completed_fetch.partition_data[:2] error_type = Errors.for_code(error_code) + parsed_records = None try: if not self._subscriptions.is_fetchable(tp): @@ -890,13 +752,12 @@ def _parse_fetched_data(self, completed_fetch): log.debug("Adding fetched record for partition %s with" " offset %d to buffered record list", tp, position.offset) - unpacked = list(self._unpack_records(tp, records)) - parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked) - if unpacked: - last_offset = unpacked[-1].offset - self._sensors.records_fetch_lag.record(highwater - last_offset) - num_bytes = records.valid_bytes() - records_count = len(unpacked) + parsed_records = self.PartitionRecords(fetch_offset, tp, records, + self.config['key_deserializer'], + self.config['value_deserializer'], + self.config['check_crcs'], + completed_fetch.metric_aggregator) + return parsed_records elif records.size_in_bytes() > 0: # we did not read a single message from a non-empty # buffer because that message's size is larger than @@ -911,7 +772,6 @@ def _parse_fetched_data(self, completed_fetch): record_too_large_partitions, self.config['max_partition_fetch_bytes']), record_too_large_partitions) - self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count) elif error_type in (Errors.NotLeaderForPartitionError, Errors.ReplicaNotAvailableError, @@ -942,52 +802,125 @@ def _parse_fetched_data(self, completed_fetch): raise error_type('Unexpected error while fetching data') finally: - completed_fetch.metric_aggregator.record(tp, num_bytes, records_count) + if parsed_records is None: + completed_fetch.metric_aggregator.record(tp, 0, 0) + + return None - return parsed_records + def close(self): + if self._next_partition_records is not None: + self._next_partition_records.drain() class PartitionRecords(object): - def __init__(self, fetch_offset, tp, messages): + def __init__(self, fetch_offset, tp, records, key_deserializer, value_deserializer, check_crcs, metric_aggregator): self.fetch_offset = fetch_offset self.topic_partition = tp - self.messages = messages + self.leader_epoch = -1 + self.next_fetch_offset = fetch_offset + self.bytes_read = 0 + self.records_read = 0 + self.metric_aggregator = metric_aggregator + 
self.check_crcs = check_crcs + self.record_iterator = itertools.dropwhile( + self._maybe_skip_record, + self._unpack_records(tp, records, key_deserializer, value_deserializer)) + + def _maybe_skip_record(self, record): # When fetching an offset that is in the middle of a # compressed batch, we will get all messages in the batch. # But we want to start 'take' at the fetch_offset # (or the next highest offset in case the message was compacted) - for i, msg in enumerate(messages): - if msg.offset < fetch_offset: - log.debug("Skipping message offset: %s (expecting %s)", - msg.offset, fetch_offset) - else: - self.message_idx = i - break - + if record.offset < self.fetch_offset: + log.debug("Skipping message offset: %s (expecting %s)", + record.offset, self.fetch_offset) + return True else: - self.message_idx = 0 - self.messages = None + return False - # For truthiness evaluation we need to define __len__ or __nonzero__ - def __len__(self): - if self.messages is None or self.message_idx >= len(self.messages): - return 0 - return len(self.messages) - self.message_idx + # For truthiness evaluation + def __bool__(self): + return self.record_iterator is not None - def discard(self): - self.messages = None + def drain(self): + if self.record_iterator is not None: + self.record_iterator = None + self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read) def take(self, n=None): - if not len(self): - return [] - if n is None or n > len(self): - n = len(self) - next_idx = self.message_idx + n - res = self.messages[self.message_idx:next_idx] - self.message_idx = next_idx - # fetch_offset should be incremented by 1 to parallel the - # subscription position (also incremented by 1) - self.fetch_offset = max(self.fetch_offset, res[-1].offset + 1) - return res + return list(itertools.islice(self.record_iterator, 0, n)) + + def _unpack_records(self, tp, records, key_deserializer, value_deserializer): + try: + batch = records.next_batch() + last_batch = None + while batch is not None: + last_batch = batch + + if self.check_crcs and not batch.validate_crc(): + raise Errors.CorruptRecordException( + "Record batch for partition %s at offset %s failed crc check" % ( + self.topic_partition, batch.base_offset)) + + # Try DefaultsRecordBatch / message log format v2 + # base_offset, last_offset_delta, and control batches + if batch.magic == 2: + self.leader_epoch = batch.leader_epoch + # Control batches have a single record indicating whether a transaction + # was aborted or committed. + # When isolation_level is READ_COMMITTED (currently unsupported) + # we should also skip all messages from aborted transactions + # For now we only support READ_UNCOMMITTED and so we ignore the + # abort/commit signal. 
+ if batch.is_control_batch: + self.next_fetch_offset = next(batch).offset + 1 + batch = records.next_batch() + continue + + for record in batch: + if self.check_crcs and not record.validate_crc(): + raise Errors.CorruptRecordException( + "Record for partition %s at offset %s failed crc check" % ( + self.topic_partition, record.offset)) + key_size = len(record.key) if record.key is not None else -1 + value_size = len(record.value) if record.value is not None else -1 + key = self._deserialize(key_deserializer, tp.topic, record.key) + value = self._deserialize(value_deserializer, tp.topic, record.value) + headers = record.headers + header_size = sum( + len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in + headers) if headers else -1 + self.records_read += 1 + self.bytes_read += record.size_in_bytes + self.next_fetch_offset = record.offset + 1 + yield ConsumerRecord( + tp.topic, tp.partition, self.leader_epoch, record.offset, record.timestamp, + record.timestamp_type, key, value, headers, record.checksum, + key_size, value_size, header_size) + + batch = records.next_batch() + else: + # Message format v2 preserves the last offset in a batch even if the last record is removed + # through compaction. By using the next offset computed from the last offset in the batch, + # we ensure that the offset of the next fetch will point to the next batch, which avoids + # unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck + # fetching the same batch repeatedly). + if last_batch and last_batch.magic == 2: + self.next_fetch_offset = last_batch.base_offset + last_batch.last_offset_delta + 1 + self.drain() + + # If unpacking raises StopIteration, it is erroneously + # caught by the generator. We want all exceptions to be raised + # back to the user. See Issue 545 + except StopIteration: + log.exception('StopIteration raised unpacking messageset') + raise RuntimeError('StopIteration raised unpacking messageset') + + def _deserialize(self, f, topic, bytes_): + if not f: + return bytes_ + if isinstance(f, Deserializer): + return f.deserialize(topic, bytes_) + return f(bytes_) class FetchSessionHandler(object): @@ -1196,6 +1129,14 @@ def to_forget(self): return list(partition_data.items()) +class FetchMetrics(object): + __slots__ = ('total_bytes', 'total_records') + + def __init__(self): + self.total_bytes = 0 + self.total_records = 0 + + class FetchResponseMetricAggregator(object): """ Since we parse the message data for each partition from each fetch @@ -1206,8 +1147,8 @@ class FetchResponseMetricAggregator(object): def __init__(self, sensors, partitions): self.sensors = sensors self.unrecorded_partitions = partitions - self.total_bytes = 0 - self.total_records = 0 + self.fetch_metrics = FetchMetrics() + self.topic_fetch_metrics = collections.defaultdict(FetchMetrics) def record(self, partition, num_bytes, num_records): """ @@ -1216,13 +1157,17 @@ def record(self, partition, num_bytes, num_records): have reported, we write the metric. 
""" self.unrecorded_partitions.remove(partition) - self.total_bytes += num_bytes - self.total_records += num_records + self.fetch_metrics.total_bytes += num_bytes + self.fetch_metrics.total_records += num_records + self.topic_fetch_metrics[partition.topic].total_bytes += num_bytes + self.topic_fetch_metrics[partition.topic].total_records += num_records # once all expected partitions from the fetch have reported in, record the metrics if not self.unrecorded_partitions: - self.sensors.bytes_fetched.record(self.total_bytes) - self.sensors.records_fetched.record(self.total_records) + self.sensors.bytes_fetched.record(self.fetch_metrics.total_bytes) + self.sensors.records_fetched.record(self.fetch_metrics.total_records) + for topic, metrics in six.iteritems(self.topic_fetch_metrics): + self.sensors.record_topic_fetch_metrics(topic, metrics.total_bytes, metrics.total_records) class FetchManagerMetrics(object): diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 3fccf4755..fa5b8ea8b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -707,22 +707,18 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True): # If data is available already, e.g. from a previous network client # poll() call to commit, then just return it immediately records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) + # Before returning the fetched records, we can send off the + # next round of fetches and avoid block waiting for their + # responses to enable pipelining while the user is handling the + # fetched records. + if not partial: + futures = self._fetcher.send_fetches() + if len(futures): + self._client.poll(timeout_ms=0) + if records: - # Before returning the fetched records, we can send off the - # next round of fetches and avoid block waiting for their - # responses to enable pipelining while the user is handling the - # fetched records. 
- if not partial: - futures = self._fetcher.send_fetches() - if len(futures): - self._client.poll(timeout_ms=0) return records - # Send any new fetches (won't resend pending fetches) - futures = self._fetcher.send_fetches() - if len(futures): - self._client.poll(timeout_ms=0) - self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000)) # after the long poll, we should check whether the group needs to rebalance # prior to returning data so that the group can stabilize faster diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 0cfcfd2d4..abe37fb86 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -382,9 +382,6 @@ def __init__(self): self._position = None # OffsetAndMetadata exposed to the user self.highwater = None self.drop_pending_record_batch = False - # The last message offset hint available from a record batch with - # magic=2 which includes deleted compacted messages - self.last_offset_from_record_batch = None def _set_position(self, offset): assert self.has_valid_position, 'Valid position required' @@ -400,7 +397,6 @@ def await_reset(self, strategy): self.awaiting_reset = True self.reset_strategy = strategy self._position = None - self.last_offset_from_record_batch = None self.has_valid_position = False def seek(self, offset): @@ -409,7 +405,6 @@ def seek(self, offset): self.reset_strategy = None self.has_valid_position = True self.drop_pending_record_batch = True - self.last_offset_from_record_batch = None def pause(self): self.paused = True diff --git a/kafka/record/abc.py b/kafka/record/abc.py index 73f91a039..df7178562 100644 --- a/kafka/record/abc.py +++ b/kafka/record/abc.py @@ -9,6 +9,11 @@ class ABCRecord(object): __slots__ = () + @abc.abstractproperty + def size_in_bytes(self): + """ Number of total bytes in record + """ + @abc.abstractproperty def offset(self): """ Absolute offset of record @@ -40,6 +45,11 @@ def checksum(self): be the checksum for v0 and v1 and None for v2 and above. """ + @abc.abstractmethod + def validate_crc(self): + """ Return True if v0/v1 record matches checksum. noop/True for v2 records + """ + @abc.abstractproperty def headers(self): """ If supported by version list of key-value tuples, or empty list if @@ -100,6 +110,11 @@ def __iter__(self): if needed. """ + @abc.abstractproperty + def magic(self): + """ Return magic value (0, 1, 2) for batch. 
+ """ + @add_metaclass(abc.ABCMeta) class ABCRecords(object): diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 14732cb06..0d69d72a2 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -275,10 +275,10 @@ def _read_msg( if self.is_control_batch: return ControlRecord( - offset, timestamp, self.timestamp_type, key, value, headers) + length, offset, timestamp, self.timestamp_type, key, value, headers) else: return DefaultRecord( - offset, timestamp, self.timestamp_type, key, value, headers) + length, offset, timestamp, self.timestamp_type, key, value, headers) def __iter__(self): self._maybe_uncompress() @@ -314,10 +314,11 @@ def validate_crc(self): class DefaultRecord(ABCRecord): - __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value", + __slots__ = ("_size_in_bytes", "_offset", "_timestamp", "_timestamp_type", "_key", "_value", "_headers") - def __init__(self, offset, timestamp, timestamp_type, key, value, headers): + def __init__(self, size_in_bytes, offset, timestamp, timestamp_type, key, value, headers): + self._size_in_bytes = size_in_bytes self._offset = offset self._timestamp = timestamp self._timestamp_type = timestamp_type @@ -325,6 +326,10 @@ def __init__(self, offset, timestamp, timestamp_type, key, value, headers): self._value = value self._headers = headers + @property + def size_in_bytes(self): + return self._size_in_bytes + @property def offset(self): return self._offset @@ -361,6 +366,9 @@ def headers(self): def checksum(self): return None + def validate_crc(self): + return True + def __repr__(self): return ( "DefaultRecord(offset={!r}, timestamp={!r}, timestamp_type={!r}," @@ -371,7 +379,7 @@ def __repr__(self): class ControlRecord(DefaultRecord): - __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value", + __slots__ = ("_size_in_bytes", "_offset", "_timestamp", "_timestamp_type", "_key", "_value", "_headers", "_version", "_type") KEY_STRUCT = struct.Struct( @@ -379,8 +387,8 @@ class ControlRecord(DefaultRecord): "h" # Type => Int16 (0 indicates an abort marker, 1 indicates a commit) ) - def __init__(self, offset, timestamp, timestamp_type, key, value, headers): - super(ControlRecord, self).__init__(offset, timestamp, timestamp_type, key, value, headers) + def __init__(self, size_in_bytes, offset, timestamp, timestamp_type, key, value, headers): + super(ControlRecord, self).__init__(size_in_bytes, offset, timestamp, timestamp_type, key, value, headers) (self._version, self._type) = self.KEY_STRUCT.unpack(self._key) # see https://kafka.apache.org/documentation/#controlbatch @@ -548,8 +556,8 @@ def write_header(self, use_compression_type=True): 0, # CRC will be set below, as we need a filled buffer for it self._get_attributes(use_compression_type), self._last_offset, - self._first_timestamp, - self._max_timestamp, + self._first_timestamp or 0, + self._max_timestamp or 0, self._producer_id, self._producer_epoch, self._base_sequence, diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index 2f8523fcb..920b4fcc6 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -164,6 +164,10 @@ def timestamp_type(self): def compression_type(self): return self._attributes & self.CODEC_MASK + @property + def magic(self): + return self._magic + def validate_crc(self): crc = calc_crc32(self._buffer[self.MAGIC_OFFSET:]) return self._crc == crc @@ -232,6 +236,9 @@ def _read_key_value(self, pos): value = self._buffer[pos:pos + 
value_size].tobytes() return key, value + def _crc_bytes(self, msg_pos, length): + return self._buffer[msg_pos + self.MAGIC_OFFSET:msg_pos + self.LOG_OVERHEAD + length] + def __iter__(self): if self._magic == 1: key_offset = self.KEY_OFFSET_V1 @@ -255,7 +262,7 @@ def __iter__(self): absolute_base_offset = -1 for header, msg_pos in headers: - offset, _, crc, _, attrs, timestamp = header + offset, length, crc, _, attrs, timestamp = header # There should only ever be a single layer of compression assert not attrs & self.CODEC_MASK, ( 'MessageSet at offset %d appears double-compressed. This ' @@ -271,28 +278,36 @@ def __iter__(self): offset += absolute_base_offset key, value = self._read_key_value(msg_pos + key_offset) + crc_bytes = self._crc_bytes(msg_pos, length) yield LegacyRecord( - offset, timestamp, timestamp_type, - key, value, crc) + self._magic, offset, timestamp, timestamp_type, + key, value, crc, crc_bytes) else: key, value = self._read_key_value(key_offset) + crc_bytes = self._crc_bytes(0, len(self._buffer) - self.LOG_OVERHEAD) yield LegacyRecord( - self._offset, self._timestamp, timestamp_type, - key, value, self._crc) + self._magic, self._offset, self._timestamp, timestamp_type, + key, value, self._crc, crc_bytes) class LegacyRecord(ABCRecord): - __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value", - "_crc") + __slots__ = ("_magic", "_offset", "_timestamp", "_timestamp_type", "_key", "_value", + "_crc", "_crc_bytes") - def __init__(self, offset, timestamp, timestamp_type, key, value, crc): + def __init__(self, magic, offset, timestamp, timestamp_type, key, value, crc, crc_bytes): + self._magic = magic self._offset = offset self._timestamp = timestamp self._timestamp_type = timestamp_type self._key = key self._value = value self._crc = crc + self._crc_bytes = crc_bytes + + @property + def magic(self): + return self._magic @property def offset(self): @@ -330,11 +345,19 @@ def headers(self): def checksum(self): return self._crc + def validate_crc(self): + crc = calc_crc32(self._crc_bytes) + return self._crc == crc + + @property + def size_in_bytes(self): + return LegacyRecordBatchBuilder.estimate_size_in_bytes(self._magic, None, self._key, self._value) + def __repr__(self): return ( - "LegacyRecord(offset={!r}, timestamp={!r}, timestamp_type={!r}," + "LegacyRecord(magic={!r} offset={!r}, timestamp={!r}, timestamp_type={!r}," " key={!r}, value={!r}, crc={!r})".format( - self._offset, self._timestamp, self._timestamp_type, + self._magic, self._offset, self._timestamp, self._timestamp_type, self._key, self._value, self._crc) ) diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index fc2ef2d6b..72baea547 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -115,7 +115,7 @@ class MemoryRecordsBuilder(object): __slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed", "_bytes_written") - def __init__(self, magic, compression_type, batch_size): + def __init__(self, magic, compression_type, batch_size, offset=0): assert magic in [0, 1, 2], "Not supported magic" assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type" if magic >= 2: @@ -130,10 +130,14 @@ def __init__(self, magic, compression_type, batch_size): self._batch_size = batch_size self._buffer = None - self._next_offset = 0 + self._next_offset = offset self._closed = False self._bytes_written = 0 + def skip(self, offsets_to_skip): + # Exposed for testing compacted records + self._next_offset += offsets_to_skip + def 
append(self, timestamp, key, value, headers=[]): """ Append a message to the buffer. diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 3bf334e06..a22f78657 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -52,9 +52,9 @@ def fetcher(client, subscription_state, topic): return Fetcher(client, subscription_state, Metrics()) -def _build_record_batch(msgs, compression=0): +def _build_record_batch(msgs, compression=0, offset=0, magic=2): builder = MemoryRecordsBuilder( - magic=1, compression_type=0, batch_size=9999999) + magic=magic, compression_type=0, batch_size=9999999, offset=offset) for msg in msgs: key, value, timestamp = msg builder.append(key=key, value=value, timestamp=timestamp, headers=[]) @@ -443,8 +443,7 @@ def test__handle_fetch_error(fetcher, caplog, exception, log_level): assert caplog.records[0].levelname == logging.getLevelName(log_level) -def test__unpack_records(fetcher): - fetcher.config['check_crcs'] = False +def test__unpack_records(mocker): tp = TopicPartition('foo', 0) messages = [ (None, b"a", None), @@ -452,7 +451,8 @@ def test__unpack_records(fetcher): (None, b"c", None), ] memory_records = MemoryRecords(_build_record_batch(messages)) - records = list(fetcher._unpack_records(tp, memory_records)) + part_records = Fetcher.PartitionRecords(0, tp, memory_records, None, None, False, mocker.MagicMock()) + records = list(part_records.record_iterator) assert len(records) == 3 assert all(map(lambda x: isinstance(x, ConsumerRecord), records)) assert records[0].value == b'a' @@ -463,24 +463,6 @@ def test__unpack_records(fetcher): assert records[2].offset == 2 -def test__message_generator(fetcher, topic, mocker): - fetcher.config['check_crcs'] = False - tp = TopicPartition(topic, 0) - msgs = [] - for i in range(10): - msgs.append((None, b"foo", None)) - completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, _build_record_batch(msgs)], - mocker.MagicMock() - ) - fetcher._completed_fetches.append(completed_fetch) - for i in range(10): - msg = next(fetcher) - assert isinstance(msg, ConsumerRecord) - assert msg.offset == i - assert msg.value == b'foo' - - def test__parse_fetched_data(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) @@ -493,7 +475,8 @@ def test__parse_fetched_data(fetcher, topic, mocker): ) partition_record = fetcher._parse_fetched_data(completed_fetch) assert isinstance(partition_record, fetcher.PartitionRecords) - assert len(partition_record) == 10 + assert partition_record + assert len(partition_record.take()) == 10 def test__parse_fetched_data__paused(fetcher, topic, mocker): @@ -563,7 +546,7 @@ def test__parse_fetched_data__out_of_range(fetcher, topic, mocker): assert fetcher._subscriptions.assignment[tp].awaiting_reset is True -def test_partition_records_offset(): +def test_partition_records_offset(mocker): """Test that compressed messagesets are handle correctly when fetch offset is in the middle of the message list """ @@ -571,39 +554,45 @@ def test_partition_records_offset(): batch_end = 130 fetch_offset = 123 tp = TopicPartition('foo', 0) - messages = [ConsumerRecord(tp.topic, tp.partition, -1, i, - None, None, 'key', 'value', [], 'checksum', 0, 0, -1) - for i in range(batch_start, batch_end)] - records = Fetcher.PartitionRecords(fetch_offset, None, messages) - assert len(records) > 0 + messages = [(None, b'msg', None) for i in range(batch_start, batch_end)] + memory_records = MemoryRecords(_build_record_batch(messages, offset=batch_start)) + records = 
Fetcher.PartitionRecords(fetch_offset, tp, memory_records, None, None, False, mocker.MagicMock()) + assert records + assert records.next_fetch_offset == fetch_offset msgs = records.take(1) assert msgs[0].offset == fetch_offset - assert records.fetch_offset == fetch_offset + 1 + assert records.next_fetch_offset == fetch_offset + 1 msgs = records.take(2) assert len(msgs) == 2 - assert len(records) > 0 - records.discard() - assert len(records) == 0 + assert records + assert records.next_fetch_offset == fetch_offset + 3 + records.drain() + assert not records -def test_partition_records_empty(): - records = Fetcher.PartitionRecords(0, None, []) - assert len(records) == 0 +def test_partition_records_empty(mocker): + tp = TopicPartition('foo', 0) + memory_records = MemoryRecords(_build_record_batch([])) + records = Fetcher.PartitionRecords(0, tp, memory_records, None, None, False, mocker.MagicMock()) + msgs = records.take() + assert len(msgs) == 0 + assert not records -def test_partition_records_no_fetch_offset(): +def test_partition_records_no_fetch_offset(mocker): batch_start = 0 batch_end = 100 fetch_offset = 123 tp = TopicPartition('foo', 0) - messages = [ConsumerRecord(tp.topic, tp.partition, -1, i, - None, None, 'key', 'value', None, 'checksum', 0, 0, -1) - for i in range(batch_start, batch_end)] - records = Fetcher.PartitionRecords(fetch_offset, None, messages) - assert len(records) == 0 + messages = [(None, b'msg', None) for i in range(batch_start, batch_end)] + memory_records = MemoryRecords(_build_record_batch(messages, offset=batch_start)) + records = Fetcher.PartitionRecords(fetch_offset, tp, memory_records, None, None, False, mocker.MagicMock()) + msgs = records.take() + assert len(msgs) == 0 + assert not records -def test_partition_records_compacted_offset(): +def test_partition_records_compacted_offset(mocker): """Test that messagesets are handle correctly when the fetch offset points to a message that has been compacted """ @@ -611,10 +600,17 @@ def test_partition_records_compacted_offset(): batch_end = 100 fetch_offset = 42 tp = TopicPartition('foo', 0) - messages = [ConsumerRecord(tp.topic, tp.partition, -1, i, - None, None, 'key', 'value', None, 'checksum', 0, 0, -1) - for i in range(batch_start, batch_end) if i != fetch_offset] - records = Fetcher.PartitionRecords(fetch_offset, None, messages) - assert len(records) == batch_end - fetch_offset - 1 - msgs = records.take(1) + builder = MemoryRecordsBuilder( + magic=2, compression_type=0, batch_size=9999999) + + for i in range(batch_start, batch_end): + if i == fetch_offset: + builder.skip(1) + else: + builder.append(key=None, value=b'msg', timestamp=None, headers=[]) + builder.close() + memory_records = MemoryRecords(builder.buffer()) + records = Fetcher.PartitionRecords(fetch_offset, tp, memory_records, None, None, False, mocker.MagicMock()) + msgs = records.take() + assert len(msgs) == batch_end - fetch_offset - 1 assert msgs[0].offset == fetch_offset + 1
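For reference, a minimal runnable sketch of the deferred metric aggregation implemented by the new FetchMetrics value object and the per-topic defaultdict in FetchResponseMetricAggregator. The names below (_FetchMetrics, _AggregatorSketch, TP) are illustrative stand-ins, and the real code updates Sensor objects rather than returning a dict.

import collections

# Illustrative stand-in; the real code uses kafka.structs.TopicPartition.
TP = collections.namedtuple('TP', ['topic', 'partition'])


class _FetchMetrics(object):
    """Mirror of the new FetchMetrics value object: plain byte/record counters."""
    def __init__(self):
        self.total_bytes = 0
        self.total_records = 0


class _AggregatorSketch(object):
    """Defers metric flushing until every partition in a fetch response has reported."""

    def __init__(self, partitions):
        self.unrecorded_partitions = set(partitions)
        self.fetch_metrics = _FetchMetrics()
        self.topic_fetch_metrics = collections.defaultdict(_FetchMetrics)

    def record(self, partition, num_bytes, num_records):
        self.unrecorded_partitions.remove(partition)
        self.fetch_metrics.total_bytes += num_bytes
        self.fetch_metrics.total_records += num_records
        self.topic_fetch_metrics[partition.topic].total_bytes += num_bytes
        self.topic_fetch_metrics[partition.topic].total_records += num_records
        if not self.unrecorded_partitions:
            # the real aggregator updates the bytes_fetched / records_fetched
            # sensors here, plus one per-topic sensor per topic_fetch_metrics entry
            return dict(self.topic_fetch_metrics)


agg = _AggregatorSketch({TP('foo', 0), TP('foo', 1)})
assert agg.record(TP('foo', 0), 100, 3) is None   # still waiting on partition 1
flushed = agg.record(TP('foo', 1), 50, 1)         # all partitions reported -> flush
assert flushed['foo'].total_bytes == 150 and flushed['foo'].total_records == 4

This is also why PartitionRecords.drain() must run on every path (including Fetcher.close() and the parsed_records-is-None branch of _parse_fetched_data): until every partition in the fetch response has reported, the aggregator never flushes the fetch-level sensors.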