diff --git a/tests/v1/kv_offload/cpu/test_manager.py b/tests/v1/kv_offload/cpu/test_manager.py index e043590a4184..f007eb76ba02 100644 --- a/tests/v1/kv_offload/cpu/test_manager.py +++ b/tests/v1/kv_offload/cpu/test_manager.py @@ -17,7 +17,6 @@ from vllm.v1.kv_offload.cpu.common import CPULoadStoreSpec from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager from vllm.v1.kv_offload.cpu.policies.arc import ARCCachePolicy -from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager def make_req_context(kv_transfer_params: dict | None = None) -> ReqContext: @@ -565,14 +564,14 @@ def test_full_scenario(self): def test_filter_reused_manager(): """ - Tests FilterReusedOffloadingManager with a CPUOffloadingManager. + Tests CPUOffloadingManager reuse filtering (store_threshold=2). """ - lru_manager = CPUOffloadingManager( - num_blocks=4, cache_policy="lru", enable_events=True - ) - - manager = FilterReusedOffloadingManager( - backing=lru_manager, store_threshold=2, max_tracker_size=3 + manager = CPUOffloadingManager( + num_blocks=4, + cache_policy="lru", + enable_events=True, + store_threshold=2, + max_tracker_size=3, ) # Lookup [1, 2] -> 1st time, added to tracker but not eligible for store yet diff --git a/vllm/v1/kv_offload/cpu/manager.py b/vllm/v1/kv_offload/cpu/manager.py index 80bcb568f99a..6e32dadc844b 100644 --- a/vllm/v1/kv_offload/cpu/manager.py +++ b/vllm/v1/kv_offload/cpu/manager.py @@ -1,5 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +from collections import OrderedDict from collections.abc import Collection, Iterable from typing import Literal @@ -37,6 +38,8 @@ def __init__( num_blocks: int, cache_policy: Literal["lru", "arc"] = "lru", enable_events: bool = False, + store_threshold: int = 1, + max_tracker_size: int = 64_000, ): self.medium: str = CPULoadStoreSpec.medium() self._num_blocks: int = num_blocks @@ -50,6 +53,11 @@ def __init__( f"Supported: {list(_CACHE_POLICIES)}" ) self._policy: CachePolicy = policy_cls(cache_capacity=num_blocks) + self.store_threshold: int = store_threshold + self.max_tracker_size: int = max_tracker_size + self.counts: OrderedDict[OffloadKey, int] | None = ( + OrderedDict() if store_threshold >= 2 else None + ) # --- block pool --- @@ -85,6 +93,14 @@ def _get_load_store_spec( # --- OffloadingManager interface --- def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None: + if self.counts is not None: + if key in self.counts: + self.counts.move_to_end(key) + self.counts[key] += 1 + else: + if len(self.counts) >= self.max_tracker_size: + self.counts.popitem(last=False) + self.counts[key] = 1 block = self._policy.get(key) if block is None: return False @@ -121,6 +137,8 @@ def prepare_store( keys: Collection[OffloadKey], req_context: ReqContext, ) -> PrepareStoreOutput | None: + if self.counts is not None: + keys = [k for k in keys if self.counts.get(k, 0) >= self.store_threshold] # filter out blocks that are already stored keys_to_store = [k for k in keys if self._policy.get(k) is None] diff --git a/vllm/v1/kv_offload/cpu/spec.py b/vllm/v1/kv_offload/cpu/spec.py index 54046d98f452..b90f51199098 100644 --- a/vllm/v1/kv_offload/cpu/spec.py +++ b/vllm/v1/kv_offload/cpu/spec.py @@ -15,7 +15,6 @@ from vllm.v1.kv_offload.cpu.common import CPULoadStoreSpec from vllm.v1.kv_offload.cpu.gpu_worker import CpuGpuOffloadingHandlers from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager -from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager from vllm.v1.kv_offload.worker.worker import OffloadingHandler @@ -60,26 +59,15 @@ def get_manager(self) -> OffloadingManager: enable_events = ( kv_events_config is not None and kv_events_config.enable_kv_cache_events ) - + store_threshold = int(self.extra_config.get("store_threshold", 0)) + max_tracker_size = int(self.extra_config.get("max_tracker_size", 64_000)) self._manager = CPUOffloadingManager( num_blocks=self.num_blocks, cache_policy=self.eviction_policy, # type: ignore[arg-type] enable_events=enable_events, + store_threshold=store_threshold, + max_tracker_size=max_tracker_size, ) - - # store_threshold: how many times a block must appear in lookup() - # before it is eligible for CPU offloading. Values < 2 disable - # filtering (a threshold of 1 equals no filter; 0 is the default). - store_threshold = int(self.extra_config.get("store_threshold", 0)) - if store_threshold >= 2: - max_tracker_size = int( - self.extra_config.get("max_tracker_size", 64_000) - ) - self._manager = FilterReusedOffloadingManager( - backing=self._manager, - store_threshold=store_threshold, - max_tracker_size=max_tracker_size, - ) return self._manager def get_handlers( diff --git a/vllm/v1/kv_offload/reuse_manager.py b/vllm/v1/kv_offload/reuse_manager.py deleted file mode 100644 index 6cb0a5f7591c..000000000000 --- a/vllm/v1/kv_offload/reuse_manager.py +++ /dev/null @@ -1,120 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -""" -Reuse-frequency gating for CPU KV-cache offload stores. - -FilterReusedOffloadingManager — OffloadingManager decorator that skips - storing blocks that have not yet been seen enough times. -""" - -from collections import OrderedDict -from collections.abc import Collection, Iterable - -from vllm.v1.kv_offload.base import ( - LoadStoreSpec, - OffloadingEvent, - OffloadingManager, - OffloadKey, - PrepareStoreOutput, - ReqContext, -) - - -class FilterReusedOffloadingManager(OffloadingManager): - """An :class:`OffloadingManager` decorator that skips storing blocks - whose reuse frequency is below *store_threshold*. - - All methods are delegated to the *backing* manager. Two methods are - intercepted: - - * ``prepare_store`` — filters out keys that have not yet - * ``lookup`` — records the visited key in an internal LRU - counter, then delegates to the backing manager. - crossed the threshold *before* calling the backing - ``prepare_store``. - - Args: - backing: The underlying ``OffloadingManager`` to delegate to. - store_threshold: A block must be seen at least this many times in - ``lookup()`` before it is eligible for offloading. Must be >= 2 - (a value of 1 would be equivalent to no filtering). - max_tracker_size: Maximum entries in the internal tracker's LRU table. - """ - - def __init__( - self, - backing: OffloadingManager, - store_threshold: int = 2, - max_tracker_size: int = 64_000, - ): - if store_threshold < 2: - raise ValueError( - "FilterReusedOffloadingManager store_threshold must be >= 2, " - f"got {store_threshold}" - ) - if max_tracker_size < 1: - raise ValueError( - "FilterReusedOffloadingManager max_tracker_size must be >= 1, " - f"got {max_tracker_size}" - ) - self._backing = backing - self.store_threshold = store_threshold - self.max_tracker_size = max_tracker_size - # Ordered so we can evict the LRU entry in O(1). - self.counts: OrderedDict[OffloadKey, int] = OrderedDict() - - # ------------------------------------------------------------------ - # Intercepted methods - # ------------------------------------------------------------------ - - def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None: - """Record the key, then delegate lookup to backing manager.""" - if key in self.counts: - self.counts.move_to_end(key) - self.counts[key] += 1 - else: - if len(self.counts) >= self.max_tracker_size: - self.counts.popitem(last=False) # evict LRU - self.counts[key] = 1 - return self._backing.lookup(key, req_context) - - def prepare_store( - self, keys: Collection[OffloadKey], req_context: ReqContext - ) -> PrepareStoreOutput | None: - """Filter out blocks below threshold, then delegate to backing. - - Filtering is evaluated *before* calling the backing manager's - ``prepare_store`` so that blocks that would be skipped do not - consume any CPU offload capacity. - """ - eligible = [ - key for key in keys if self.counts.get(key, 0) >= self.store_threshold - ] - - # Passing an empty list is intentional and safe — CPUOffloadingManager - # handles it correctly, returning a PrepareStoreOutput with empty lists. - # Delegate to the backing manager with only the eligible keys. - return self._backing.prepare_store(eligible, req_context) - - # ------------------------------------------------------------------ - # Delegated methods - # ------------------------------------------------------------------ - - def prepare_load( - self, keys: Collection[OffloadKey], req_context: ReqContext - ) -> LoadStoreSpec: - return self._backing.prepare_load(keys, req_context) - - def touch(self, keys: Collection[OffloadKey]) -> None: - return self._backing.touch(keys) - - def complete_load(self, keys: Collection[OffloadKey]) -> None: - return self._backing.complete_load(keys) - - def complete_store( - self, keys: Collection[OffloadKey], success: bool = True - ) -> None: - return self._backing.complete_store(keys, success) - - def take_events(self) -> Iterable[OffloadingEvent]: - return self._backing.take_events()