Skip to content

Commit 0989788

Browse files
support extend block tables (#3824)
1 parent 6ef3b61 commit 0989788

File tree

2 files changed

+78
-0
lines changed

2 files changed

+78
-0
lines changed

fastdeploy/engine/request.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class RequestType(Enum):
4040
PREFILL = 0
4141
DECODE = 1
4242
PREEMPTED = 2
43+
EXTEND = 3
4344

4445

4546
@dataclass
@@ -141,6 +142,9 @@ def __init__(
141142
self.task_type = RequestType.PREFILL
142143
self.idx = None
143144
self.need_prefill_tokens = self.prompt_token_ids_len
145+
# extend block tables
146+
self.use_extend_tables = False
147+
self.extend_block_tables = []
144148

145149
@classmethod
146150
def from_dict(cls, d: dict):

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,18 @@ class ScheduledPreemptTask:
5555
task_type: RequestType = RequestType.PREEMPTED
5656

5757

58+
@dataclass
class ScheduledExtendBlocksTask:
    """
    Task for allocating new blocks to extend.

    Emitted by the scheduler when a running request has extend tables
    enabled and needs additional GPU blocks appended to its extended
    block table.
    """

    # Batch slot of the request (logged as "batch id" by the scheduler).
    idx: int
    # Identifier of the request this extension belongs to.
    request_id: str
    # Full extended block table: the prompt-cache blocks followed by the
    # newly allocated extension blocks.
    extend_block_tables: list[int]
    # Marks this scheduled task as an extend-block-tables operation.
    task_type: RequestType = RequestType.EXTEND
5870
class ResourceManagerV1(ResourceManager):
5971
"""
6072
Resource manager for scheduler v1.
@@ -80,6 +92,8 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
8092
self.to_be_rescheduled_request_id_set = set()
8193
main_process_metrics.max_batch_size.set(max_num_seqs)
8294

95+
self.using_extend_tables_req_id = set()
96+
8397
def allocated_slots(self, request: Request):
8498
return len(request.block_tables) * self.config.cache_config.block_size
8599

@@ -405,6 +419,57 @@ def schedule(self):
405419
break
406420
else:
407421
llm_logger.error("Unknown request status type")
422+
423+
# schedule when extend block tables is needed
424+
for req in self.running:
425+
num_prefill_blocks = req.need_prefill_tokens // self.config.cache_config.block_size
426+
# allocate
427+
if req.use_extend_tables and req.request_id not in self.using_extend_tables_req_id:
428+
llm_logger.info(
429+
f"req {req.request_id} at batch id {req.idx} with num_prefill_blocks {num_prefill_blocks} is going to enable extend tables"
430+
)
431+
self.using_extend_tables_req_id.add(req.request_id)
432+
if self.cache_manager.can_allocate_gpu_blocks(self.config.cache_config.enc_dec_block_num):
433+
req.extend_block_tables = req.block_tables[:num_prefill_blocks] # copy prompt cache
434+
req.extend_block_tables.extend(
435+
self.cache_manager.allocate_gpu_blocks(self.config.cache_config.enc_dec_block_num)
436+
)
437+
scheduled_reqs.append(
438+
ScheduledExtendBlocksTask(
439+
idx=req.idx, request_id=req.request_id, extend_block_tables=req.extend_block_tables
440+
)
441+
)
442+
llm_logger.info(f"extend blocks is {req.extend_block_tables}")
443+
else:
444+
continue
445+
# recycle
446+
elif not req.use_extend_tables and req.request_id in self.using_extend_tables_req_id:
447+
llm_logger.info(f"req {req.request_id} is going to disable extend tables")
448+
self.using_extend_tables_req_id.remove(req.request_id)
449+
self.cache_manager.recycle_gpu_blocks(req.extend_block_tables[num_prefill_blocks:])
450+
req.extend_block_tables = []
451+
452+
# allocate extend blocks when blocks is going to exhaust
453+
elif req.request_id in self.using_extend_tables_req_id:
454+
if (
455+
self.allocated_slots(req) - req.num_total_tokens
456+
<= self.config.cache_config.prealloc_dec_block_slot_num_threshold
457+
):
458+
llm_logger.info(
459+
f"req {req.request_id} is going to allocate more extend tables because allocated_slots {self.allocated_slots(req)} and prealloc_dec_block_slot_num_threshold {self.config.cache_config.prealloc_dec_block_slot_num_threshold} req.num_total_tokens {req.num_total_tokens}"
460+
)
461+
if self.cache_manager.can_allocate_gpu_blocks(self.config.cache_config.enc_dec_block_num):
462+
req.extend_block_tables.extend(
463+
self.cache_manager.allocate_gpu_blocks(self.config.cache_config.enc_dec_block_num)
464+
)
465+
scheduled_reqs.append(
466+
ScheduledExtendBlocksTask(
467+
idx=req.idx, request_id=req.request_id, extend_block_tables=req.extend_block_tables
468+
)
469+
)
470+
else:
471+
continue
472+
408473
if scheduled_reqs:
409474
task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
410475
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
@@ -488,6 +553,15 @@ def _free_blocks(self, request: Request):
488553
self.cache_manager.recycle_gpu_blocks(request.block_tables)
489554
request.block_tables = []
490555

556+
if request.request_id in self.using_extend_tables_req_id:
557+
num_prefill_blocks = request.need_prefill_tokens // self.config.cache_config.block_size
558+
self.using_extend_tables_req_id.remove(request.request_id)
559+
self.cache_manager.recycle_gpu_blocks(request.extend_block_tables[num_prefill_blocks:])
560+
llm_logger.info(
561+
f"req {request.request_id} recycle extend blocks {request.extend_block_tables[num_prefill_blocks:]}"
562+
)
563+
request.extend_block_tables = []
564+
491565
def finish_requests_async(self, request_ids: Union[str, Iterable[str]]):
492566
return self.finish_execution_pool.submit(self.finish_requests, request_ids)
493567

0 commit comments

Comments
 (0)