-
-
Notifications
You must be signed in to change notification settings - Fork 16.5k
[kv_offload] Merge FilterReusedOffloadingManager and add lifecycle hook to OffloadingManager
#41727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| # SPDX-FileCopyrightText: Copyright contributors to the vLLM project | ||
| from collections import OrderedDict | ||
| from collections.abc import Collection, Iterable | ||
| from typing import Literal | ||
|
|
||
|
|
@@ -198,3 +199,106 @@ def take_events(self) -> Iterable[OffloadingEvent]: | |
| if self.events is not None: | ||
| yield from self.events | ||
| self.events.clear() | ||
|
|
||
|
|
||
| class FilterReusedOffloadingManager(OffloadingManager): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was actually thinking to remove
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @orozery Do you want it to be done in this PR or in a separate PR? |
||
| """An :class:`OffloadingManager` decorator that skips storing blocks | ||
| whose reuse frequency is below *store_threshold*. | ||
|
|
||
| All methods are delegated to the *backing* manager. Two methods are | ||
| intercepted: | ||
|
|
||
| * ``lookup`` — records the visited key in an internal LRU | ||
| counter, then delegates to the backing manager. | ||
| * ``prepare_store`` — filters out keys that have not yet | ||
| crossed the threshold *before* calling the backing | ||
| ``prepare_store``. | ||
|
hickeyma marked this conversation as resolved.
|
||
|
|
||
| Args: | ||
| backing: The underlying ``OffloadingManager`` to delegate to. | ||
| store_threshold: A block must be seen at least this many times in | ||
| ``lookup()`` before it is eligible for offloading. Must be >= 2 | ||
| (a value of 1 would be equivalent to no filtering). | ||
| max_tracker_size: Maximum entries in the internal tracker's LRU table. | ||
| """ | ||
|
|
||
def __init__(
    self,
    backing: OffloadingManager,
    store_threshold: int = 2,
    max_tracker_size: int = 64_000,
):
    """Wrap *backing* and initialise an empty reuse tracker.

    Args:
        backing: Manager that receives every delegated call.
        store_threshold: Number of ``lookup()`` sightings a key needs
            before ``prepare_store`` forwards it.
        max_tracker_size: Upper bound on the number of keys tracked at
            any one time.

    Raises:
        ValueError: If ``store_threshold`` is smaller than 2 or
            ``max_tracker_size`` is smaller than 1.
    """
    # Validate both knobs up front; nothing is assigned on failure.
    if store_threshold < 2:
        threshold_error = (
            "FilterReusedOffloadingManager store_threshold must be >= 2, "
            f"got {store_threshold}"
        )
        raise ValueError(threshold_error)
    if max_tracker_size < 1:
        size_error = (
            "FilterReusedOffloadingManager max_tracker_size must be >= 1, "
            f"got {max_tracker_size}"
        )
        raise ValueError(size_error)

    # Insertion order doubles as recency order, so the oldest entry can be
    # dropped from the front in O(1).
    self.counts: OrderedDict[OffloadKey, int] = OrderedDict()
    self.max_tracker_size = max_tracker_size
    self.store_threshold = store_threshold
    self._backing = backing
|
|
||
| # ------------------------------------------------------------------ | ||
| # Intercepted methods | ||
| # ------------------------------------------------------------------ | ||
|
|
||
def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
    """Count one sighting of *key*, then forward the lookup.

    The key becomes the most-recently-used tracker entry. When a new key
    arrives and the tracker is already at ``max_tracker_size``, the
    least-recently-used entry is dropped first.

    Returns:
        Whatever the backing manager's ``lookup`` returns.
    """
    tracker = self.counts
    # Removing and re-adding the key places it at the MRU end.
    previous = tracker.pop(key, None)
    if previous is None:
        # Unseen key: make room by evicting the LRU entry if needed.
        if len(tracker) >= self.max_tracker_size:
            tracker.popitem(last=False)
        previous = 0
    tracker[key] = previous + 1
    return self._backing.lookup(key, req_context)
|
|
||
def prepare_store(
    self, keys: Collection[OffloadKey], req_context: ReqContext
) -> PrepareStoreOutput | None:
    """Forward only the keys that have reached ``store_threshold``.

    The filter runs *before* the backing manager is called, so keys that
    are dropped here are never handed to its ``prepare_store``.

    Returns:
        Whatever the backing manager's ``prepare_store`` returns for the
        surviving keys.
    """
    minimum = self.store_threshold
    sightings = self.counts
    passing = []
    for candidate in keys:
        # Keys the tracker has never seen count as zero sightings.
        if sightings.get(candidate, 0) >= minimum:
            passing.append(candidate)

    # An empty list is still forwarded on purpose; the backing manager is
    # expected to cope with it.
    return self._backing.prepare_store(passing, req_context)
|
|
||
| # ------------------------------------------------------------------ | ||
| # Delegated methods | ||
| # ------------------------------------------------------------------ | ||
|
|
||
def prepare_load(
    self, keys: Collection[OffloadKey], req_context: ReqContext
) -> LoadStoreSpec:
    """Pass ``prepare_load`` straight through to the backing manager."""
    delegate = self._backing
    return delegate.prepare_load(keys, req_context)
|
|
||
def touch(self, keys: Collection[OffloadKey]) -> None:
    """Pass ``touch`` straight through to the backing manager."""
    delegate = self._backing
    return delegate.touch(keys)
|
|
||
def complete_load(self, keys: Collection[OffloadKey]) -> None:
    """Pass ``complete_load`` straight through to the backing manager."""
    delegate = self._backing
    return delegate.complete_load(keys)
|
|
||
def complete_store(
    self, keys: Collection[OffloadKey], success: bool = True
) -> None:
    """Pass ``complete_store`` straight through to the backing manager.

    Both *keys* and *success* are forwarded positionally, unchanged.
    """
    delegate = self._backing
    return delegate.complete_store(keys, success)
|
|
||
def take_events(self) -> Iterable[OffloadingEvent]:
    """Hand back the backing manager's events without touching them."""
    delegate = self._backing
    return delegate.take_events()
|
|
||
def request_finished(self, req_id: str) -> bool:
    """Pass ``request_finished`` straight through to the backing manager."""
    delegate = self._backing
    return delegate.request_finished(req_id)
|
hickeyma marked this conversation as resolved.
|
||
This file was deleted.
Uh oh!
There was an error while loading. Please reload this page.