# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import time
from collections import deque
from dataclasses import dataclass

import numpy as np
import torch

from vllm import _custom_ops as ops
from vllm.logger import init_logger
from vllm.utils.math_utils import cdiv
from vllm.utils.platform_utils import is_pin_memory_available
from vllm.v1.kv_offload.base import (
    BlockIDsLoadStoreSpec,
    CanonicalKVCacheRef,
    CanonicalKVCaches,
    GPULoadStoreSpec,
)
from vllm.v1.kv_offload.cpu.shared_offload_region import SharedOffloadRegion
from vllm.v1.kv_offload.worker.worker import (
    OffloadingHandler,
    TransferResult,
    TransferSpec,
)

logger = init_logger(__name__)


@dataclass
class Transfer:
    job_id: int
    stream: torch.cuda.Stream
    start_event: torch.Event
    end_event: torch.Event
    num_bytes: int


def compute_sub_block_ptrs(
    block_ids: np.ndarray,
    block_size_factor: int,
    output: np.ndarray,
    tensor: torch.Tensor,
    skip_count: int = 0,
):
    """
    Compute byte pointers for sub-blocks of the given block IDs.

    Each block in block_ids contains block_size_factor sub-blocks.
    The pointer for sub-block j of block b is:
        base_ptr + b * row_stride + j * sub_block_size
    where sub_block_size = tensor.shape[1] // block_size_factor (gpu page size).
    This handles tensors where row_stride != block_size_factor * sub_block_size
    (e.g. non-contiguous CPU tensors).

    Args:
        block_ids: array of block IDs at the tensor's native granularity.
        block_size_factor: number of sub-blocks per block.
        output: pre-allocated int64 array to write pointers into.
        tensor: the source or destination tensor.
        skip_count: sub-blocks to skip in the first block.
    """
    assert skip_count < block_size_factor
    num_sub_blocks = len(output)
    base_ptr = tensor.data_ptr()
    row_stride = tensor.stride(0)

    if block_size_factor == 1:
        # Fast path: 1:1 mapping, no sub-block expansion needed.
        output[:] = base_ptr + block_ids[:num_sub_blocks] * row_stride
        return

    # Vectorized expansion for block_size_factor > 1.
    assert tensor.shape[1] % block_size_factor == 0
    sub_block_size = tensor.shape[1] // block_size_factor
    sub_offsets = np.arange(block_size_factor, dtype=np.int64) * sub_block_size

    # (num_blocks, 1) + (1, block_size_factor) -> (num_blocks, block_size_factor)
    all_ptrs = (
        base_ptr + block_ids.astype(np.int64)[:, np.newaxis] * row_stride
    ) + sub_offsets[np.newaxis, :]

    # Flatten and apply skip_count / truncation
    flat = all_ptrs.ravel()
    output[:] = flat[skip_count : skip_count + num_sub_blocks]
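

# Illustrative sketch (not part of the original module): a tiny worked example of
# compute_sub_block_ptrs. The tensor shape, block IDs, and skip_count below are
# made-up values chosen only to show the pointer arithmetic.
#
#   cpu = torch.zeros((4, 8), dtype=torch.int8)   # 4 blocks, 8 bytes per block
#   out = np.empty(3, dtype=np.int64)
#   # block_size_factor=2 -> sub_block_size = 8 // 2 = 4 bytes
#   compute_sub_block_ptrs(np.array([1, 2]), 2, out, cpu, skip_count=1)
#   base, stride = cpu.data_ptr(), cpu.stride(0)  # stride(0) == 8 here
#   # out == [base + 1*stride + 4, base + 2*stride + 0, base + 2*stride + 4]
#   # i.e. sub-block 1 of block 1, then both sub-blocks of block 2.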


def pin_mmap_region(region: SharedOffloadRegion) -> None:
    """Register the entire mmap as CUDA pinned memory via cudaHostRegister."""
    rank = region.rank
    base_ptr = region._base.data_ptr()
    result = torch.cuda.cudart().cudaHostRegister(base_ptr, region.total_size_bytes, 0)
    if result.value != 0:
        logger.warning(
            "cudaHostRegister failed for rank=%d (code=%d); "
            "transfers will still work but may be slower (unpinned DMA)",
            rank,
            result.value,
        )
    else:
        logger.debug(
            "cudaHostRegister rank=%d %.2f GB",
            rank,
            region.total_size_bytes / 1e9,
        )
        region.is_pinned = True
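

# Companion sketch (an assumption, not in the original file): if the mmap is
# registered above, a shutdown path could unregister it again. This assumes
# torch's cudart binding also exposes cudaHostUnregister; the function name
# below is hypothetical.
#
#   def unpin_mmap_region(region: SharedOffloadRegion) -> None:
#       if getattr(region, "is_pinned", False):
#           torch.cuda.cudart().cudaHostUnregister(region._base.data_ptr())
#           region.is_pinned = False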


class SingleDirectionOffloadingHandler(OffloadingHandler):
    """
    SingleDirectionOffloadingHandler handles transfers for a single direction,
    either CPU->GPU or GPU->CPU.

    Transfers are guaranteed to be executed in order of their submission.
    Each transfer uses a unique CUDA stream, and its stream will start
    executing only after the streams of previous transfers have finished.
    """

    def __init__(
        self,
        gpu_tensors: list[torch.Tensor],
        cpu_tensors: list[torch.Tensor],
        block_size_factor: int,
        kv_cache_groups_data_refs: list[list[CanonicalKVCacheRef]],
        gpu_to_cpu: bool,
    ):
        """
        Initialize a SingleDirectionOffloadingHandler.

        Args:
            gpu_tensors: list of GPU KV cache tensors.
                Each of shape (num_gpu_blocks, gpu_page_size_bytes) with dtype int8.
            cpu_tensors: list of CPU KV cache tensors.
                Each of shape (num_cpu_blocks, cpu_page_size_bytes) with dtype int8.
                Order should match gpu_tensors.
            block_size_factor: number of GPU-sized sub-blocks per CPU block
                (cpu_page_size_bytes == gpu_page_size_bytes * block_size_factor).
            kv_cache_groups_data_refs: list of CanonicalKVCacheRef per group.
            gpu_to_cpu: if True, transfer from GPU to CPU; otherwise CPU to GPU.
        """
        assert len(gpu_tensors) == len(cpu_tensors)
        assert len(gpu_tensors) > 0

        # assert input tensors are as expected
        for gpu_tensor, cpu_tensor in zip(gpu_tensors, cpu_tensors):
            assert gpu_tensor.dtype == torch.int8
            assert gpu_tensor.ndim == 2
            assert gpu_tensor.is_cuda
            assert cpu_tensor.dtype == torch.int8
            assert cpu_tensor.ndim == 2
            assert cpu_tensor.device.type == "cpu"

            _, gpu_page_size = gpu_tensor.shape
            _, cpu_page_size = cpu_tensor.shape
            assert cpu_page_size == gpu_page_size * block_size_factor

        self.src_tensors: list[torch.Tensor] = (
            gpu_tensors if gpu_to_cpu else cpu_tensors
        )
        self.dst_tensors: list[torch.Tensor] = (
            cpu_tensors if gpu_to_cpu else gpu_tensors
        )
        self.gpu_to_cpu: bool = gpu_to_cpu
        self.kv_cache_groups_data_refs = kv_cache_groups_data_refs

        # GPU blocks may be smaller than CPU blocks:
        # cpu_page_size = gpu_page_size * block_size_factor.
        self.src_block_size_factor = 1 if self.gpu_to_cpu else block_size_factor
        self.dst_block_size_factor = block_size_factor if self.gpu_to_cpu else 1
        self.transfer_type = ("GPU", "CPU") if self.gpu_to_cpu else ("CPU", "GPU")

        # job_id -> end event of that job's transfer
        self._transfer_events: dict[int, torch.Event] = {}
        # FIFO queue of in-flight transfers
        self._transfers: deque[Transfer] = deque()
        # list of CUDA streams available for re-use
        self._stream_pool: list[torch.cuda.Stream] = []
        # list of CUDA events available for re-use
        self._event_pool: list[torch.Event] = []

    def transfer_async(self, job_id: int, transfer_spec: TransferSpec) -> bool:
        src_spec, dst_spec = transfer_spec
        assert isinstance(src_spec, BlockIDsLoadStoreSpec)
        assert isinstance(dst_spec, BlockIDsLoadStoreSpec)

        src_blocks = src_spec.block_ids
        dst_blocks = dst_spec.block_ids
        assert src_blocks.ndim == 1
        assert dst_blocks.ndim == 1

        num_src_blocks = len(src_blocks)
        num_dst_blocks = len(dst_blocks)

        # There are 2 types of transfers:
        # 1. GPU -> CPU
        # 2. CPU -> GPU
        #
        # Transfers are aligned to whole CPU blocks, EXCEPT MAYBE for the
        # first and last block, i.e. the first and last CPU blocks can match
        # against a smaller (byte-wise) set of GPU blocks.
        # In such cases, we may need to skip some gpu-sized sub-blocks,
        # and start reading/writing from the middle of the first CPU block.
        # If we have multiple KV cache groups (when using HMA with hybrid models),
        # we may have a partial first/last CPU block per each group.
        # The group_sizes field of the GPU spec encodes the number of GPU
        # blocks in each group.
        # The block_indices field maps each group of blocks to its logical
        # offset inside the request, counting in GPU blocks.
        # This allows us to find the correct starting position
        # in the matching first CPU block.
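        #
        # Illustrative worked example (made-up numbers, added for clarity; not
        # part of the original comment): with block_size_factor=4 on a CPU->GPU
        # load, the CPU side is the source, so src_block_size_factor=4 and
        # dst_block_size_factor=1. For a group with block_idx=2, group_size=6:
        #   src_logical_blocks_to_skip = 2 % 4 = 2   (start mid CPU block)
        #   src_logical_blocks_count   = 6 + 2 = 8
        #   src_blocks_count           = cdiv(8, 4) = 2 CPU blocks consumed
        #   dst_logical_blocks_to_skip = 2 % 1 = 0
        #   dst_blocks_count           = cdiv(6, 1) = 6 GPU blocks written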

        # extract group_sizes from the GPU spec
        gpu_spec = src_spec if self.gpu_to_cpu else dst_spec
        assert isinstance(gpu_spec, GPULoadStoreSpec)
        group_sizes = gpu_spec.group_sizes
        assert len(group_sizes) == len(self.kv_cache_groups_data_refs)

        # extract block indices from the GPU spec
        block_indices = gpu_spec.block_indices
        assert len(block_indices) == len(self.kv_cache_groups_data_refs)

        num_copy_ops = 0
        for group_size, group_data_refs in zip(
            group_sizes, self.kv_cache_groups_data_refs
        ):
            num_copy_ops += group_size * len(group_data_refs)

        all_src = np.empty(num_copy_ops, dtype=np.int64)
        all_dst = np.empty(num_copy_ops, dtype=np.int64)
        all_sizes = np.empty(num_copy_ops, dtype=np.int64)

        src_offset = 0
        dst_offset = 0
        op_idx = 0
        # count total number of bytes copied
        num_transfer_bytes = 0
        for group_size, block_idx, group_data_refs in zip(
            group_sizes, block_indices, self.kv_cache_groups_data_refs
        ):
            if group_size == 0:
                continue

            src_logical_blocks_to_skip = block_idx % self.src_block_size_factor
            dst_logical_blocks_to_skip = block_idx % self.dst_block_size_factor
            src_logical_blocks_count = group_size + src_logical_blocks_to_skip
            dst_logical_blocks_count = group_size + dst_logical_blocks_to_skip

            dst_blocks_count = cdiv(
                dst_logical_blocks_count, self.dst_block_size_factor
            )
            dst_end_offset = dst_offset + dst_blocks_count
            assert dst_end_offset <= num_dst_blocks

            src_blocks_count = cdiv(
                src_logical_blocks_count, self.src_block_size_factor
            )
            src_end_offset = src_offset + src_blocks_count
            assert src_end_offset <= num_src_blocks

            group_src = src_blocks[src_offset:src_end_offset]
            group_dst = dst_blocks[dst_offset:dst_end_offset]

            for data_ref in group_data_refs:
                t_idx = data_ref.tensor_idx
                end_idx = op_idx + group_size
                compute_sub_block_ptrs(
                    group_src,
                    self.src_block_size_factor,
                    all_src[op_idx:end_idx],
                    self.src_tensors[t_idx],
                    skip_count=src_logical_blocks_to_skip,
                )
                compute_sub_block_ptrs(
                    group_dst,
                    self.dst_block_size_factor,
                    all_dst[op_idx:end_idx],
                    self.dst_tensors[t_idx],
                    skip_count=dst_logical_blocks_to_skip,
                )
                all_sizes[op_idx:end_idx] = data_ref.page_size_bytes
                num_transfer_bytes += group_size * data_ref.page_size_bytes
                op_idx = end_idx

            src_offset = src_end_offset
            dst_offset = dst_end_offset

        assert src_offset == num_src_blocks
        assert dst_offset == num_dst_blocks
        assert op_idx == num_copy_ops

        batch_src = torch.from_numpy(all_src)
        batch_dst = torch.from_numpy(all_dst)
        batch_sizes = torch.from_numpy(all_sizes)

        stream = self._stream_pool.pop() if self._stream_pool else torch.cuda.Stream()
        start_event = (
            self._event_pool.pop()
            if self._event_pool
            else torch.Event(enable_timing=True)
        )
        end_event = (
            self._event_pool.pop()
            if self._event_pool
            else torch.Event(enable_timing=True)
        )

        if self.gpu_to_cpu:
            # wait for model computation to finish before offloading
            stream.wait_stream(torch.cuda.current_stream())

        if self._transfers:
            last_transfer: Transfer = self._transfers[-1]
            last_event = last_transfer.end_event
            # ensure the job will start only after the previous one completes
            stream.wait_event(last_event)

        # CPU->GPU reads from host pinned memory, which is never written
        # by a concurrent GPU stream, so CU_MEMCPY_SRC_ACCESS_ORDER_ANY is
        # safe and lets the driver pipeline source reads. GPU->CPU reads
        # from the live GPU KV cache, which the compute stream keeps
        # writing; we must keep STREAM ordering so source reads are gated
        # by the transfer stream's wait_stream(compute) barrier.
        is_src_access_order_any = not self.gpu_to_cpu

        with torch.cuda.stream(stream):
            start_event.record(stream)
            if num_copy_ops > 0:
                ops.swap_blocks_batch(
                    batch_src,
                    batch_dst,
                    batch_sizes,
                    is_src_access_order_any=is_src_access_order_any,
                )
            end_event.record(stream)

        self._transfer_events[job_id] = end_event
        self._transfers.append(
            Transfer(
                job_id=job_id,
                stream=stream,
                start_event=start_event,
                end_event=end_event,
                num_bytes=num_transfer_bytes,
            )
        )

        # success
        return True

    def get_finished(self) -> list[TransferResult]:
        results: list[TransferResult] = []
        while self._transfers and self._transfers[0].end_event.query():
            transfer = self._transfers.popleft()
            transfer_time = (
                transfer.start_event.elapsed_time(transfer.end_event) * 1e-3
            )  # elapsed_time is in milliseconds
            result = TransferResult(
                job_id=transfer.job_id,
                success=True,
                transfer_size=transfer.num_bytes,
                transfer_time=transfer_time,
                transfer_type=self.transfer_type,
            )
            results.append(result)
            self._stream_pool.append(transfer.stream)
            self._event_pool.append(transfer.end_event)
            self._event_pool.append(transfer.start_event)
            del self._transfer_events[transfer.job_id]
        return results

    def wait(self, job_ids: set[int]):
        for job_id in job_ids:
            event = self._transfer_events.get(job_id)
            if event is not None:
                event.synchronize()

    def shutdown(self) -> None:
        while self._transfers:
            transfer = self._transfers.popleft()
            transfer.end_event.synchronize()
        self._transfer_events.clear()
        self._stream_pool.clear()
        self._event_pool.clear()
        self.src_tensors.clear()
        self.dst_tensors.clear()
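

# Illustrative lifecycle sketch (commented out; the handler instance, specs, and
# job id below are hypothetical placeholders, not values defined in this module):
#
#   handler = SingleDirectionOffloadingHandler(..., gpu_to_cpu=True)
#   submitted = handler.transfer_async(job_id=0, transfer_spec=(src_spec, dst_spec))
#   # Poll until the driver signals completion; each result reports bytes and seconds.
#   done = handler.get_finished()
#   # Or block on specific jobs, e.g. before freeing their GPU blocks:
#   handler.wait({0})
#   handler.shutdown()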


class CpuGpuOffloadingHandlers:
    def __init__(
        self,
        kv_caches: CanonicalKVCaches,
        block_size_factor: int,
        num_cpu_blocks: int,
        mmap_region: SharedOffloadRegion | None = None,
    ):
        pin_memory = is_pin_memory_available()
        logger.info("Allocating %d CPU tensors...", len(kv_caches.tensors))

        self._mmap_region = mmap_region
        if mmap_region is not None and pin_memory:
            pin_mmap_region(mmap_region)

        gpu_tensors: list[torch.Tensor] = []
        cpu_tensors: list[torch.Tensor] = []
        for kv_cache_tensor in kv_caches.tensors:
            gpu_page_size_bytes = kv_cache_tensor.page_size_bytes
            gpu_tensor = kv_cache_tensor.tensor.view(torch.int8).view(
                (-1, gpu_page_size_bytes)
            )
            cpu_page_size_bytes = gpu_page_size_bytes * block_size_factor
            if mmap_region is not None:
                cpu_tensor = mmap_region.create_next_view(cpu_page_size_bytes)
            else:
                t0 = time.monotonic()
                cpu_tensor = torch.zeros(
                    (num_cpu_blocks, cpu_page_size_bytes),
                    dtype=torch.int8,
                    device="cpu",
                    pin_memory=pin_memory,
                )
                logger.debug(
                    "torch.zeros pinned tensor %d×%d (%.2f GB): %.3f s",
                    num_cpu_blocks,
                    cpu_page_size_bytes,
                    num_cpu_blocks * cpu_page_size_bytes / 1e9,
                    time.monotonic() - t0,
                )
            gpu_tensors.append(gpu_tensor)
            cpu_tensors.append(cpu_tensor)

        self.gpu_to_cpu_handler = SingleDirectionOffloadingHandler(
            gpu_tensors=gpu_tensors,
            cpu_tensors=cpu_tensors,
            block_size_factor=block_size_factor,
            kv_cache_groups_data_refs=kv_caches.group_data_refs,
            gpu_to_cpu=True,
        )
        self.cpu_to_gpu_handler = SingleDirectionOffloadingHandler(
            gpu_tensors=gpu_tensors,
            cpu_tensors=cpu_tensors,
            block_size_factor=block_size_factor,
            kv_cache_groups_data_refs=kv_caches.group_data_refs,
            gpu_to_cpu=False,
        )
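

# Hypothetical wiring sketch (an assumption; the surrounding worker code is not part
# of this file): a per-rank worker would typically construct CpuGpuOffloadingHandlers
# once and hand each direction to whatever component dispatches TransferSpecs. The
# block_size_factor and num_cpu_blocks values below are illustrative only.
#
#   handlers = CpuGpuOffloadingHandlers(kv_caches, block_size_factor=4,
#                                       num_cpu_blocks=1024, mmap_region=None)
#   offload_handler = handlers.gpu_to_cpu_handler  # used to store KV to CPU
#   onload_handler = handlers.cpu_to_gpu_handler   # used to load KV back to GPU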