From 88e5f405b97dbdd6ae2d6192e75a3f20ec4a1dda Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 28 Feb 2025 15:09:00 -0800 Subject: [PATCH 1/6] Implement Incremental Fetch Sessions / KIP-227 --- kafka/consumer/fetcher.py | 314 ++++++++++++++++++++++++++++++++------ 1 file changed, 265 insertions(+), 49 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c6886c490..bcd75ed19 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -57,7 +57,8 @@ class Fetcher(six.Iterator): 'check_crcs': True, 'iterator_refetch_records': 1, # undocumented -- interface may change 'metric_group_prefix': 'consumer', - 'retry_backoff_ms': 100 + 'retry_backoff_ms': 100, + 'enable_incremental_fetch_sessions': True, } def __init__(self, client, subscriptions, metrics, **configs): @@ -110,6 +111,7 @@ def __init__(self, client, subscriptions, metrics, **configs): self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) self._isolation_level = READ_UNCOMMITTED + self._session_handlers = {} def send_fetches(self): """Send FetchRequests for all assigned partitions that do not already have @@ -119,11 +121,11 @@ def send_fetches(self): List of Futures: each future resolves to a FetchResponse """ futures = [] - for node_id, request in six.iteritems(self._create_fetch_requests()): + for node_id, (request, fetch_offsets) in six.iteritems(self._create_fetch_requests()): if self._client.ready(node_id): log.debug("Sending FetchRequest to node %s", node_id) future = self._client.send(node_id, request, wakeup=False) - future.add_callback(self._handle_fetch_response, request, time.time()) + future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.time()) future.add_errback(self._handle_fetch_error, node_id) futures.append(future) self._fetch_futures.extend(futures) @@ -680,11 +682,11 @@ def _create_fetch_requests(self): FetchRequests skipped if no leader, or node has requests in flight Returns: - dict: {node_id: FetchRequest, ...} (version depends on client api_versions) + dict: {node_id: (FetchRequest, {TopicPartition: fetch_offset}), ...} (version depends on client api_versions) """ # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() - version = self._client.api_version(FetchRequest, max_version=6) + version = self._client.api_version(FetchRequest, max_version=7) fetchable = collections.defaultdict(lambda: collections.defaultdict(list)) for partition in self._fetchable_partitions(): @@ -708,70 +710,102 @@ def _create_fetch_requests(self): " Requesting metadata update", partition) self._client.cluster.request_update() - elif self._client.in_flight_request_count(node_id) == 0: - if version < 5: - partition_info = ( - partition.partition, - position, - self.config['max_partition_fetch_bytes'] - ) - else: - partition_info = ( - partition.partition, - position, - -1, # log_start_offset is used internally by brokers / replicas only - self.config['max_partition_fetch_bytes'], - ) - fetchable[node_id][partition.topic].append(partition_info) - log.debug("Adding fetch request for partition %s at offset %d", - partition, position) - else: + elif self._client.in_flight_request_count(node_id) > 0: log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s", partition, node_id) + continue + + if version < 5: + partition_info = ( + partition.partition, + position, + 
self.config['max_partition_fetch_bytes'] + ) + else: + partition_info = ( + partition.partition, + position, + -1, # log_start_offset is used internally by brokers / replicas only + self.config['max_partition_fetch_bytes'], + ) + fetchable[node_id][partition.topic].append(partition_info) + log.debug("Adding fetch request for partition %s at offset %d", + partition, position) requests = {} for node_id, partition_data in six.iteritems(fetchable): - # As of version == 3 partitions will be returned in order as - # they are requested, so to avoid starvation with - # `fetch_max_bytes` option we need this shuffle - # NOTE: we do have partition_data in random order due to usage - # of unordered structures like dicts, but that does not - # guarantee equal distribution, and starting in Python3.6 - # dicts retain insert order. - partition_data = list(partition_data.items()) - random.shuffle(partition_data) + if version >= 7 and self.config['enable_incremental_fetch_sessions']: + if node_id not in self._session_handlers: + self._session_handlers[node_id] = FetchSessionHandler(node_id) + next_partitions = {TopicPartition(topic, partition_info[0]): partition_info + for topic, partitions in six.iteritems(partition_data) + for partition_info in partitions} + session = self._session_handlers[node_id].build_next(next_partitions) + else: + # No incremental fetch support + session = FetchRequestData(partition_data, [], FetchMetadata.LEGACY) + # As of version == 3 partitions will be returned in order as + # they are requested, so to avoid starvation with + # `fetch_max_bytes` option we need this shuffle + # NOTE: we do have partition_data in random order due to usage + # of unordered structures like dicts, but that does not + # guarantee equal distribution, and starting in Python3.6 + # dicts retain insert order. 
+ partition_data = list(partition_data.items()) + random.shuffle(partition_data) if version <= 2: - requests[node_id] = FetchRequest[version]( + request = FetchRequest[version]( -1, # replica_id self.config['fetch_max_wait_ms'], self.config['fetch_min_bytes'], partition_data) elif version == 3: - requests[node_id] = FetchRequest[version]( + request = FetchRequest[version]( -1, # replica_id self.config['fetch_max_wait_ms'], self.config['fetch_min_bytes'], self.config['fetch_max_bytes'], partition_data) - else: - # through v6 - requests[node_id] = FetchRequest[version]( + elif version <= 6: + request = FetchRequest[version]( -1, # replica_id self.config['fetch_max_wait_ms'], self.config['fetch_min_bytes'], self.config['fetch_max_bytes'], self._isolation_level, partition_data) + else: + # Through v8 + request = FetchRequest[version]( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + self.config['fetch_max_bytes'], + self._isolation_level, + session.id, + session.epoch, + session.to_send, + session.to_forget) + + fetch_offsets = {} + for topic, partitions in six.iteritems(partition_data): + for partition_data in partitions: + partition, offset = partition_data[:2] + fetch_offsets[TopicPartition(topic, partition)] = offset + + requests[node_id] = (request, fetch_offsets) + return requests - def _handle_fetch_response(self, request, send_time, response): + def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response): """The callback for fetch completion""" - fetch_offsets = {} - for topic, partitions in request.topics: - for partition_data in partitions: - partition, offset = partition_data[:2] - fetch_offsets[TopicPartition(topic, partition)] = offset + if response.API_VERSION >= 7 and self.config['enable_incremental_fetch_sessions']: + if node_id not in self._session_handlers: + log.error("Unable to find fetch session handler for node %s. Ignoring fetch response", node_id) + return + if not self._session_handlers[node_id].handle_response(response): + return partitions = set([TopicPartition(topic, partition_data[0]) for topic, partitions in response.topics @@ -784,6 +818,7 @@ def _handle_fetch_response(self, request, send_time, response): random.shuffle(partitions) for partition_data in partitions: tp = TopicPartition(topic, partition_data[0]) + fetch_offset = fetch_offsets[tp] completed_fetch = CompletedFetch( tp, fetch_offsets[tp], response.API_VERSION, @@ -797,12 +832,10 @@ def _handle_fetch_response(self, request, send_time, response): self._sensors.fetch_latency.record((time.time() - send_time) * 1000) def _handle_fetch_error(self, node_id, exception): - log.log( - logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR, - 'Fetch to node %s failed: %s', - node_id, - exception - ) + level = logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR + log.log(level, 'Fetch to node %s failed: %s', node_id, exception) + if node_id in self._session_handlers: + self._session_handlers[node_id].handle_error(exception) def _parse_fetched_data(self, completed_fetch): tp = completed_fetch.topic_partition @@ -940,6 +973,189 @@ def take(self, n=None): return res +class FetchSessionHandler(object): + """ + FetchSessionHandler maintains the fetch session state for connecting to a broker. + + Using the protocol outlined by KIP-227, clients can create incremental fetch sessions. 
+    These sessions allow the client to fetch information about a set of partitions over
+    and over, without explicitly enumerating all the partitions in the request and the
+    response.
+
+    FetchSessionHandler tracks the partitions which are in the session. It also
+    determines which partitions need to be included in each fetch request, and what
+    the attached fetch session metadata should be for each request.
+    """
+
+    def __init__(self, node_id):
+        self.node_id = node_id
+        self.next_metadata = FetchMetadata.INITIAL
+        self.session_partitions = {}
+
+    def build_next(self, next_partitions):
+        if self.next_metadata.is_full:
+            log.debug("Built full fetch %s for node %s with %s partition(s).",
+                      self.next_metadata, self.node_id, len(next_partitions))
+            self.session_partitions = next_partitions
+            return FetchRequestData(next_partitions, [], self.next_metadata)
+
+        prev_tps = set(self.session_partitions.keys())
+        next_tps = set(next_partitions.keys())
+        log.debug("Building incremental partitions from next: %s, previous: %s", next_tps, prev_tps)
+        added = next_tps - prev_tps
+        for tp in added:
+            self.session_partitions[tp] = next_partitions[tp]
+        removed = prev_tps - next_tps
+        for tp in removed:
+            self.session_partitions.pop(tp)
+        altered = set()
+        for tp in next_tps & prev_tps:
+            if next_partitions[tp] != self.session_partitions[tp]:
+                self.session_partitions[tp] = next_partitions[tp]
+                altered.add(tp)
+
+        log.debug("Built incremental fetch %s for node %s. Added %s, altered %s, removed %s out of %s",
+                  self.next_metadata, self.node_id, added, altered, removed, self.session_partitions.keys())
+        to_send = {tp: next_partitions[tp] for tp in (added | altered)}
+        return FetchRequestData(to_send, removed, self.session_partitions.copy(), self.next_metadata)
+
+    def handle_response(self, response):
+        if response.error_code != Errors.NoError.errno:
+            error_type = Errors.for_code(response.error_code)
+            log.info("Node %s was unable to process the fetch request with %s: %s.",
+                     self.node_id, self.next_metadata, error_type())
+            if error_type is Errors.FetchSessionIdNotFoundError:
+                self.next_metadata = FetchMetadata.INITIAL
+            else:
+                self.next_metadata = self.next_metadata.next_close_existing()
+            return False
+
+        response_tps = self._response_partitions(response)
+        session_tps = set(self.session_partitions.keys())
+        if self.next_metadata.is_full:
+            if response_tps != session_tps:
+                log.info("Node %s sent an invalid full fetch response with extra %s / omitted %s",
+                         self.node_id, response_tps - session_tps, session_tps - response_tps)
+                self.next_metadata = FetchMetadata.INITIAL
+                return False
+            elif response.session_id == FetchMetadata.INVALID_SESSION_ID:
+                log.debug("Node %s sent a full fetch response with %s partitions",
+                          self.node_id, len(response_tps))
+                self.next_metadata = FetchMetadata.INITIAL
+                return True
+            else:
+                # The server created a new incremental fetch session.
+ log.debug("Node %s sent a full fetch response that created a new incremental fetch session %s" + " with %s response partitions", + self.node_id, response.session_id, + len(response_tps)) + self.next_metadata = FetchMetadata.new_incremental(response.session_id) + return True + else: + if response_tps - session_tps: + log.info("Node %s sent an invalid incremental fetch response with extra partitions %s", + self.node_id, response_tps - session_tps) + self.next_metadata = self.next_metadata.next_close_existing() + return False + elif response.session_id == FetchMetadata.INVALID_SESSION_ID: + # The incremental fetch session was closed by the server. + log.debug("Node %s sent an incremental fetch response closing session %s" + " with %s response partitions (%s implied)", + self.node_id, self.next_metadata.session_id, + len(response_tps), len(self.session_partitions) - len(response_tps)) + self.next_metadata = FetchMetadata.INITIAL + return True + else: + # The incremental fetch session was continued by the server. + log.debug("Node %s sent an incremental fetch response for session %s" + " with %s response partitions (%s implied)", + self.node_id, response.session_id, + len(response_tps), len(self.session_partitions) - len(response_tps)) + self.next_metadata = self.next_metadata.next_incremental() + return True + + def handle_error(self, _exception): + self.next_metadata = self.next_metadata.next_close_existing() + + def _response_partitions(self, response): + return {TopicPartition(topic, partition_data[0]) + for topic, partitions in response.topics + for partition_data in partitions} + + +class FetchMetadata(object): + __slots__ = ('session_id', 'epoch') + + MAX_EPOCH = 2147483647 + INVALID_SESSION_ID = 0 # used by clients with no session. + INITIAL_EPOCH = 0 # client wants to create or recreate a session. + FINAL_EPOCH = -1 # client wants to close any existing session, and not create a new one. + + def __init__(self, session_id, epoch): + self.session_id = session_id + self.epoch = epoch + + @property + def is_full(self): + return self.epoch == self.INITIAL_EPOCH or self.epoch == self.FINAL_EPOCH + + @classmethod + def next_epoch(cls, prev_epoch): + if prev_epoch < 0: + return cls.FINAL_EPOCH + elif prev_epoch == cls.MAX_EPOCH: + return 1 + else: + return prev_epoch + 1 + + def next_close_existing(self): + return self.__class__(self.session_id, self.INITIAL_EPOCH) + + @classmethod + def new_incremental(cls, session_id): + return cls(session_id, cls.next_epoch(cls.INITIAL_EPOCH)) + + def next_incremental(self): + return self.__class__(self.session_id, self.next_epoch(self.epoch)) + +FetchMetadata.INITIAL = FetchMetadata(FetchMetadata.INVALID_SESSION_ID, FetchMetadata.INITIAL_EPOCH) +FetchMetadata.LEGACY = FetchMetadata(FetchMetadata.INVALID_SESSION_ID, FetchMetadata.FINAL_EPOCH) + + +class FetchRequestData(object): + __slots__ = ('_to_send', '_to_forget', '_metadata') + + def __init__(self, to_send, to_forget, metadata): + self._to_send = to_send # {TopicPartition: (partition, ...)} + self._to_forget = to_forget + self._metadata = metadata + + @property + def metadata(self): + return self._metadata + + @property + def id(self): + return self._metadata.session_id + + @property + def epoch(self): + return self._metadata.epoch + + @property + def to_send(self): + # Return as list of [(topic, [(partition, ...), ...]), ...] 
+        # so it can be passed directly to encoder
+        partition_data = collections.defaultdict(list)
+        for tp, partition_info in six.iteritems(self._to_send):
+            partition_data[tp.topic].append(partition_info)
+        return list(partition_data.items())
+
+    @property
+    def to_forget(self):
+        return self._to_forget
+
+
 class FetchResponseMetricAggregator(object):
     """
     Since we parse the message data for each partition from each fetch

From 0cf95798dfb175cfe8975578dca7a22d2d7a43f8 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sat, 1 Mar 2025 18:06:33 -0800
Subject: [PATCH 2/6] Fixup FetchRequestData init; to_forget encoding

---
 kafka/consumer/fetcher.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index bcd75ed19..0d1270027 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -1017,7 +1017,7 @@ def build_next(self, next_partitions):
         log.debug("Built incremental fetch %s for node %s. Added %s, altered %s, removed %s out of %s",
                   self.next_metadata, self.node_id, added, altered, removed, self.session_partitions.keys())
         to_send = {tp: next_partitions[tp] for tp in (added | altered)}
-        return FetchRequestData(to_send, removed, self.session_partitions.copy(), self.next_metadata)
+        return FetchRequestData(to_send, removed, self.next_metadata)

     def handle_response(self, response):
         if response.error_code != Errors.NoError.errno:
@@ -1127,7 +1127,7 @@ class FetchRequestData(object):

     def __init__(self, to_send, to_forget, metadata):
         self._to_send = to_send  # {TopicPartition: (partition, ...)}
-        self._to_forget = to_forget
+        self._to_forget = to_forget  # {TopicPartition}
         self._metadata = metadata

     @property
@@ -1153,7 +1153,12 @@ def to_send(self):

     @property
     def to_forget(self):
-        return self._to_forget
+        # Return as list of [(topic, (partition, ...)), ...]
+        # so it can be passed directly to encoder
+        partition_data = collections.defaultdict(list)
+        for tp in six.iteritems(self._to_forget):
+            partition_data[tp.topic].append(tp.partition)
+        return list(partition_data.items())


 class FetchResponseMetricAggregator(object):

From d07a8df477a17601df746b2985fdba5b147f930d Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 2 Mar 2025 08:48:43 -0800
Subject: [PATCH 3/6] more fixes for FetchRequestData

---
 kafka/consumer/fetcher.py | 23 +++++++++++------------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 0d1270027..93e6f7b4d 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -734,16 +734,16 @@ def _create_fetch_requests(self):

         requests = {}
         for node_id, partition_data in six.iteritems(fetchable):
+            next_partitions = {TopicPartition(topic, partition_info[0]): partition_info
+                               for topic, partitions in six.iteritems(partition_data)
+                               for partition_info in partitions}
             if version >= 7 and self.config['enable_incremental_fetch_sessions']:
                 if node_id not in self._session_handlers:
                     self._session_handlers[node_id] = FetchSessionHandler(node_id)
-                next_partitions = {TopicPartition(topic, partition_info[0]): partition_info
-                                   for topic, partitions in six.iteritems(partition_data)
-                                   for partition_info in partitions}
                 session = self._session_handlers[node_id].build_next(next_partitions)
             else:
                 # No incremental fetch support
-                session = FetchRequestData(partition_data, [], FetchMetadata.LEGACY)
+                session = FetchRequestData(next_partitions, None, FetchMetadata.LEGACY)
             # As of version == 3 partitions will be returned in order as
             # they are requested, so to avoid starvation with
@@ -789,10 +789,9 @@ def _create_fetch_requests(self):
                     session.to_forget)

             fetch_offsets = {}
-            for topic, partitions in six.iteritems(partition_data):
-                for partition_data in partitions:
-                    partition, offset = partition_data[:2]
-                    fetch_offsets[TopicPartition(topic, partition)] = offset
+            for tp, partition_data in six.iteritems(next_partitions):
+                offset = partition_data[1]
+                fetch_offsets[tp] = offset

             requests[node_id] = (request, fetch_offsets)

@@ -997,7 +996,7 @@ def build_next(self, next_partitions):
             log.debug("Built full fetch %s for node %s with %s partition(s).",
                       self.next_metadata, self.node_id, len(next_partitions))
             self.session_partitions = next_partitions
-            return FetchRequestData(next_partitions, [], self.next_metadata)
+            return FetchRequestData(next_partitions, None, self.next_metadata)

         prev_tps = set(self.session_partitions.keys())
         next_tps = set(next_partitions.keys())
@@ -1126,8 +1125,8 @@ class FetchRequestData(object):
     __slots__ = ('_to_send', '_to_forget', '_metadata')

     def __init__(self, to_send, to_forget, metadata):
-        self._to_send = to_send  # {TopicPartition: (partition, ...)}
-        self._to_forget = to_forget  # {TopicPartition}
+        self._to_send = to_send or dict()  # {TopicPartition: (partition, ...)}
+        self._to_forget = to_forget or set()  # {TopicPartition}
         self._metadata = metadata

     @property
@@ -1156,7 +1155,7 @@ def to_forget(self):
         # Return as list of [(topic, (partition, ...)), ...]
         # so it can be passed directly to encoder
         partition_data = collections.defaultdict(list)
-        for tp in six.iteritems(self._to_forget):
+        for tp in self._to_forget:
             partition_data[tp.topic].append(tp.partition)
         return list(partition_data.items())

From 4a3b3e73fbeb800ac9da1199d911c04e2101102b Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Sun, 2 Mar 2025 08:48:55 -0800
Subject: [PATCH 4/6] fix fetcher tests

---
 test/test_fetcher.py | 47 +++++++++++++++++++++----------------------
 1 file changed, 22 insertions(+), 25 deletions(-)

diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index e74369289..256c24fda 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -79,8 +79,17 @@ def test_send_fetches(fetcher, topic, mocker):
         ])])
     ]

-    mocker.patch.object(fetcher, '_create_fetch_requests',
-                        return_value=dict(enumerate(fetch_requests)))
+    def build_fetch_offsets(request):
+        fetch_offsets = {}
+        for topic, partitions in request.topics:
+            for partition_data in partitions:
+                partition, offset = partition_data[:2]
+                fetch_offsets[TopicPartition(topic, partition)] = offset
+        return fetch_offsets
+
+    mocker.patch.object(
+        fetcher, '_create_fetch_requests',
+        return_value=(dict(enumerate(map(lambda r: (r, build_fetch_offsets(r)), fetch_requests)))))
     mocker.patch.object(fetcher._client, 'ready', return_value=True)
     mocker.patch.object(fetcher._client, 'send')
@@ -100,8 +109,8 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
     fetcher._client._api_versions = BROKER_API_VERSIONS[api_version]
     mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0)
     by_node = fetcher._create_fetch_requests()
-    requests = by_node.values()
-    assert set([r.API_VERSION for r in requests]) == set([fetch_version])
+    requests_and_offsets = by_node.values()
+    assert set([r.API_VERSION for (r, _offsets) in requests_and_offsets]) == set([fetch_version])


 def test_update_fetch_positions(fetcher, topic, mocker):
@@ -345,19 +354,15 @@ def test_fetched_records(fetcher, topic, mocker):
     assert partial is False


-@pytest.mark.parametrize(("fetch_request", "fetch_response", "num_partitions"), [
+@pytest.mark.parametrize(("fetch_offsets", "fetch_response", "num_partitions"), [
     (
-        FetchRequest[0](
-            -1, 100, 100,
-            [('foo', [(0, 0, 1000),])]),
+        {TopicPartition('foo', 0): 0},
         FetchResponse[0](
             [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
         1,
     ),
     (
-        FetchRequest[1](
-            -1, 100, 100,
-            [('foo', [(0, 0, 1000), (1, 0, 1000),])]),
+        {TopicPartition('foo', 0): 0, TopicPartition('foo', 1): 0},
         FetchResponse[1](
             0,
             [("foo", [
                 (0, 0, 1000, [(0, b'xxx'),]),
                 (1, 0, 1000, [(0, b'xxx'),]),
             ]),]),
         2,
     ),
     (
-        FetchRequest[2](
-            -1, 100, 100,
-            [('foo', [(0, 0, 1000),])]),
+        {TopicPartition('foo', 0): 0},
         FetchResponse[2](
             0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
         1,
     ),
     (
-        FetchRequest[3](
-            -1, 100, 100, 10000,
-            [('foo', [(0, 0, 1000),])]),
+        {TopicPartition('foo', 0): 0},
         FetchResponse[3](
             0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
         1,
     ),
     (
-        FetchRequest[4](
-            -1, 100, 100, 10000, 0,
-            [('foo', [(0, 0, 1000),])]),
+        {TopicPartition('foo', 0): 0},
         FetchResponse[4](
             0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]),
         1,
     ),
     (
         # This may only be used in broker-broker api calls
-        FetchRequest[5](
-            -1, 100, 100, 10000, 0,
-            [('foo', [(0, 0, 1000),])]),
+        {TopicPartition('foo', 0): 0},
         FetchResponse[5](
             0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]),
         1,
     ),
 ])
-def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions):
-
fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response) +def test__handle_fetch_response(fetcher, fetch_offsets, fetch_response, num_partitions): + fetcher._handle_fetch_response(0, fetch_offsets, time.time(), fetch_response) assert len(fetcher._completed_fetches) == num_partitions From 08fc2869ac80a9e4f08529ea6c63dd40cf2cfc37 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 2 Mar 2025 08:58:33 -0800 Subject: [PATCH 5/6] simplify fetchable; push partition randomization to FetchRequestData --- kafka/consumer/fetcher.py | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 93e6f7b4d..a01457b4a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -687,7 +687,7 @@ def _create_fetch_requests(self): # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() version = self._client.api_version(FetchRequest, max_version=7) - fetchable = collections.defaultdict(lambda: collections.defaultdict(list)) + fetchable = collections.defaultdict(dict) for partition in self._fetchable_partitions(): node_id = self._client.cluster.leader_for_partition(partition) @@ -728,15 +728,12 @@ def _create_fetch_requests(self): -1, # log_start_offset is used internally by brokers / replicas only self.config['max_partition_fetch_bytes'], ) - fetchable[node_id][partition.topic].append(partition_info) + fetchable[node_id][partition] = partition_info log.debug("Adding fetch request for partition %s at offset %d", partition, position) requests = {} - for node_id, partition_data in six.iteritems(fetchable): - next_partitions = {TopicPartition(topic, partition_info[0]): partition_info - for topic, partitions in six.iteritems(partition_data) - for partition_info in partitions} + for node_id, next_partitions in six.iteritems(fetchable): if version >= 7 and self.config['enable_incremental_fetch_sessions']: if node_id not in self._session_handlers: self._session_handlers[node_id] = FetchSessionHandler(node_id) @@ -744,29 +741,20 @@ def _create_fetch_requests(self): else: # No incremental fetch support session = FetchRequestData(next_partitions, None, FetchMetadata.LEGACY) - # As of version == 3 partitions will be returned in order as - # they are requested, so to avoid starvation with - # `fetch_max_bytes` option we need this shuffle - # NOTE: we do have partition_data in random order due to usage - # of unordered structures like dicts, but that does not - # guarantee equal distribution, and starting in Python3.6 - # dicts retain insert order. 
partition_data = list(partition_data.items())
-            random.shuffle(partition_data)

             if version <= 2:
                 request = FetchRequest[version](
                     -1,  # replica_id
                     self.config['fetch_max_wait_ms'],
                     self.config['fetch_min_bytes'],
-                    partition_data)
+                    session.to_send)
             elif version == 3:
                 request = FetchRequest[version](
                     -1,  # replica_id
                     self.config['fetch_max_wait_ms'],
                     self.config['fetch_min_bytes'],
                     self.config['fetch_max_bytes'],
-                    partition_data)
+                    session.to_send)
             elif version <= 6:
                 request = FetchRequest[version](
                     -1,  # replica_id
                     self.config['fetch_max_wait_ms'],
                     self.config['fetch_min_bytes'],
                     self.config['fetch_max_bytes'],
                     self._isolation_level,
-                    partition_data)
+                    session.to_send)
             else:
                 # Through v8
                 request = FetchRequest[version](
@@ -1148,7 +1136,14 @@ def to_send(self):
         partition_data = collections.defaultdict(list)
         for tp, partition_info in six.iteritems(self._to_send):
             partition_data[tp.topic].append(partition_info)
-        return list(partition_data.items())
+        # As of version == 3 partitions will be returned in order as
+        # they are requested, so to avoid starvation with
+        # `fetch_max_bytes` option we need this shuffle
+        # NOTE: we do have partition_data in random order due to usage
+        # of unordered structures like dicts, but that does not
+        # guarantee equal distribution, and starting in Python3.6
+        # dicts retain insert order.
+        return random.sample(list(partition_data.items()), k=len(partition_data))

     @property
     def to_forget(self):

From d6fc4cd20ad593cd076bf01a4f11df08425babcc Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Mon, 3 Mar 2025 12:45:52 -0800
Subject: [PATCH 6/6] Document enable_incremental_fetch_sessions config

---
 kafka/consumer/fetcher.py | 2 ++
 kafka/consumer/group.py   | 3 +++
 2 files changed, 5 insertions(+)

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index a01457b4a..795aaf1bb 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -69,6 +69,8 @@ def __init__(self, client, subscriptions, metrics, **configs):
             raw message key and returns a deserialized key.
         value_deserializer (callable, optional): Any callable that takes a
             raw message value and returns a deserialized value.
+        enable_incremental_fetch_sessions (bool): Use incremental fetch sessions
+            when available / supported by kafka broker. See KIP-227. Default: True.
         fetch_min_bytes (int): Minimum amount of data the server should
             return for a fetch request, otherwise wait up to
             fetch_max_wait_ms for more data to accumulate. Default: 1.
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 38d758578..f150c4bd6 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -60,6 +60,8 @@ class KafkaConsumer(six.Iterator):
             raw message key and returns a deserialized key.
         value_deserializer (callable): Any callable that takes a raw message
             value and returns a deserialized value.
+        enable_incremental_fetch_sessions (bool): Use incremental fetch sessions
+            when available / supported by kafka broker. See KIP-227. Default: True.
         fetch_min_bytes (int): Minimum amount of data the server should
             return for a fetch request, otherwise wait up to
             fetch_max_wait_ms for more data to accumulate. Default: 1.
@@ -266,6 +268,7 @@ class KafkaConsumer(six.Iterator):
     'group_id': None,
     'key_deserializer': None,
     'value_deserializer': None,
+    'enable_incremental_fetch_sessions': True,
     'fetch_max_wait_ms': 500,
     'fetch_min_bytes': 1,
     'fetch_max_bytes': 52428800,
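
A short illustrative addendum, not part of the patch series itself: the walkthrough below sketches how the session machinery added above behaves, assuming the new classes are importable from kafka.consumer.fetcher once the series is applied. The session id (42), the topic name, and the partition tuples are made-up values; the tuples follow the v7 layout (partition, offset, log_start_offset, max_bytes) built by _create_fetch_requests().

from kafka.consumer.fetcher import FetchMetadata, FetchSessionHandler
from kafka.structs import TopicPartition

# A client starts with no session: id 0 (INVALID_SESSION_ID) and epoch 0
# (INITIAL_EPOCH), so is_full is True and the first request enumerates
# every partition.
meta = FetchMetadata.INITIAL
assert meta.is_full

# A successful full fetch response carries a broker-assigned session id;
# subsequent requests become incremental, starting at epoch 1.
meta = FetchMetadata.new_incremental(42)
assert (meta.session_id, meta.epoch) == (42, 1) and not meta.is_full

# Each successful incremental response advances the epoch, wrapping past
# MAX_EPOCH back to 1 because 0 and -1 are reserved (create vs. close).
assert meta.next_incremental().epoch == 2
assert FetchMetadata(42, FetchMetadata.MAX_EPOCH).next_incremental().epoch == 1

# On an error or an invalid response the handler keeps the id but resets the
# epoch to 0: the next request closes the old session and is full again.
assert meta.next_close_existing().is_full

# FetchSessionHandler turns successive desired-partition maps into request
# payloads. The very first build is a full fetch:
handler = FetchSessionHandler(node_id=0)
tp0, tp1 = TopicPartition('foo', 0), TopicPartition('foo', 1)
full = handler.build_next({tp0: (0, 0, -1, 1048576), tp1: (1, 0, -1, 1048576)})
assert full.metadata.is_full

# handle_response() normally promotes the metadata; emulate a broker that
# accepted the session under id 42:
handler.next_metadata = FetchMetadata.new_incremental(42)

# The next build sends only what changed: tp0's offset advanced and tp1 is
# no longer wanted, so tp1 goes into to_forget (grouped by topic).
incr = handler.build_next({tp0: (0, 50, -1, 1048576)})
assert incr.to_send == [('foo', [(0, 50, -1, 1048576)])]
assert incr.to_forget == [('foo', [1])]

These to_send / to_forget shapes are exactly what the v7+ branch in _create_fetch_requests() passes to the FetchRequest constructor as session.to_send and session.to_forget.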