53 changes: 26 additions & 27 deletions fastdeploy/config.py
@@ -275,7 +275,6 @@ def __init__(
From old wersion worker args
TODO(gongshaotian): Reclassify
"""
- self.max_num_seqs: int = 34
# Set default block num for profile run
self.total_block_num: int = 2000
# block size
@@ -297,7 +296,6 @@ def __init__(
# Do profile or not
self.do_profile: bool = False

- self.max_num_batched_tokens: int = 2048
# splitwise role
self.splitwise_role: str = "mixed"
# guided decoding backend
@@ -1109,8 +1107,6 @@ def __init__(
speculative_config: SpeculativeConfig = None,
tokenizer: str = None,
max_model_len: int = 8192,
- max_num_seqs: int = 8,
- max_num_batched_tokens: Optional[int] = None,
ips: str = None,
use_warmup: bool = False,
engine_worker_queue_port: str = "8002",
@@ -1143,19 +1139,18 @@ def __init__(
self.moba_attention_config: Optional[MobaAttentionConfig] = moba_attention_config
# Initialize cuda graph capture list
if self.graph_opt_config.cudagraph_capture_sizes is None:
- self.graph_opt_config._set_cudagraph_sizes(max_num_seqs=self.parallel_config.max_num_seqs)
+ self.graph_opt_config._set_cudagraph_sizes(max_num_seqs=self.scheduler_config.max_num_seqs)

if self.graph_opt_config.cudagraph_only_prefill:
self.graph_opt_config.init_with_cudagrpah_size(max_capture_size=512)
else:
- self.graph_opt_config.init_with_cudagrpah_size(max_capture_size=self.parallel_config.max_num_seqs)
+ self.graph_opt_config.init_with_cudagrpah_size(max_capture_size=self.scheduler_config.max_num_seqs)

# TODO(wangmingkai02): change graph_opt_level=2 when using static mode with cinn
if self.graph_opt_config.graph_opt_level == 2:
self.graph_opt_config.graph_opt_level = 1

self.tokenizer = tokenizer
- self.max_num_batched_tokens = max_num_batched_tokens
self.ips = ips
self.tool_parser = tool_parser

@@ -1177,7 +1172,6 @@ def __init__(
self.node_rank = idx

self.max_model_len = max_model_len
- self.max_num_seqs = max_num_seqs
self.limit_mm_per_prompt = limit_mm_per_prompt
self.mm_processor_kwargs = mm_processor_kwargs
self.use_warmup = use_warmup
@@ -1243,22 +1237,22 @@ def postprocess(self):

self.paddle_commit_id = paddle.version.commit

- if self.max_num_batched_tokens is None:
+ if self.scheduler_config.max_num_batched_tokens is None:
if int(envs.ENABLE_V1_KVCACHE_SCHEDULER):
if paddle.is_compiled_with_xpu():
- self.max_num_batched_tokens = self.max_model_len
+ self.scheduler_config.max_num_batched_tokens = self.max_model_len
else:
- self.max_num_batched_tokens = 8192  # if set to max_model_len, it's easy to be OOM
+ self.scheduler_config.max_num_batched_tokens = 8192  # if set to max_model_len, it's easy to be OOM
else:
if self.cache_config.enable_chunked_prefill:
- self.max_num_batched_tokens = 2048
+ self.scheduler_config.max_num_batched_tokens = 2048
else:
- self.max_num_batched_tokens = self.max_model_len
+ self.scheduler_config.max_num_batched_tokens = self.max_model_len

if self.long_prefill_token_threshold == 0:
self.long_prefill_token_threshold = int(self.max_model_len * 0.04)

- self.cache_config.postprocess(self.max_num_batched_tokens, self.max_num_seqs)
+ self.cache_config.postprocess(self.scheduler_config.max_num_batched_tokens, self.scheduler_config.max_num_seqs)
self.cache_config.max_block_num_per_seq = int(self.max_model_len // self.cache_config.block_size)

if self.guided_decoding_backend == "auto":
@@ -1272,19 +1266,24 @@ def check(self):
"""
check the legality of config
"""
- assert self.max_num_seqs <= 256, (
-     "The parameter `max_num_seqs` is not allowed to exceed 256, " f"but now it's {self.max_num_seqs}."
+ assert self.scheduler_config.max_num_seqs <= 256, (
+     "The parameter `max_num_seqs` is not allowed to exceed 256, "
+     f"but now it's {self.scheduler_config.max_num_seqs}."
)
assert self.nnode >= 1, f"nnode: {self.nnode} should no less than 1"
assert self.max_model_len >= 16, f"max_model_len: {self.max_model_len} should be larger than 16"
- assert self.max_num_seqs >= 1, f"max_num_seqs: {self.max_num_seqs} should be larger than 1"
- assert self.max_num_batched_tokens >= self.max_num_seqs, (
-     f"max_num_batched_tokens: {self.max_num_batched_tokens} "
-     f"should be larger than or equal to max_num_seqs: {self.max_num_seqs}"
+ assert (
+     self.scheduler_config.max_num_seqs >= 1
+ ), f"max_num_seqs: {self.scheduler_config.max_num_seqs} should be larger than 1"
+ assert self.scheduler_config.max_num_batched_tokens >= self.scheduler_config.max_num_seqs, (
+     f"max_num_batched_tokens: {self.scheduler_config.max_num_batched_tokens} "
+     f"should be larger than or equal to max_num_seqs: {self.scheduler_config.max_num_seqs}"
)
- assert self.max_num_batched_tokens <= self.max_model_len * self.max_num_seqs, (
-     f"max_num_batched_tokens: {self.max_num_batched_tokens} should be larger"
-     f"than or equal to max_num_seqs: {self.max_num_seqs} * max_model_len: {self.max_model_len}"
+ assert (
+     self.scheduler_config.max_num_batched_tokens <= self.max_model_len * self.scheduler_config.max_num_seqs
+ ), (
+     f"max_num_batched_tokens: {self.scheduler_config.max_num_batched_tokens} should be larger"
+     f"than or equal to max_num_seqs: {self.scheduler_config.max_num_seqs} * max_model_len: {self.max_model_len}"
)
assert (
self.max_num_partial_prefills >= 1
@@ -1305,13 +1304,13 @@ def check(self):

if not self.cache_config.enable_chunked_prefill:
if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
- assert self.max_num_batched_tokens >= self.max_model_len, (
-     f"max_num_batched_tokens: {self.max_num_batched_tokens} "
+ assert self.scheduler_config.max_num_batched_tokens >= self.max_model_len, (
+     f"max_num_batched_tokens: {self.scheduler_config.max_num_batched_tokens} "
f"should be larger than or equal to max_model_len: {self.max_model_len}"
)
else:
- assert self.max_num_batched_tokens >= self.cache_config.block_size, (
-     f"max_num_batched_tokens: {self.max_num_batched_tokens} "
+ assert self.scheduler_config.max_num_batched_tokens >= self.cache_config.block_size, (
+     f"max_num_batched_tokens: {self.scheduler_config.max_num_batched_tokens} "
f"should be larger than or equal to block_size: {self.cache_config.block_size}"
)

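Note: with the hunks above, the fallback for an unset max_num_batched_tokens now lives on scheduler_config. A minimal sketch of the decision table, written as a standalone helper with hypothetical names (the real postprocess() reads envs.ENABLE_V1_KVCACHE_SCHEDULER and paddle.is_compiled_with_xpu() at runtime):

    def resolve_max_num_batched_tokens(
        max_model_len: int,
        use_v1_kvcache_scheduler: bool,
        compiled_with_xpu: bool,
        enable_chunked_prefill: bool,
    ) -> int:
        # Applied only when the user left max_num_batched_tokens as None.
        if use_v1_kvcache_scheduler:
            # XPU keeps the full context window; other targets cap at 8192,
            # since defaulting to max_model_len makes OOM too easy.
            return max_model_len if compiled_with_xpu else 8192
        # Legacy scheduler: chunked prefill works in 2048-token chunks;
        # otherwise an entire prompt must fit into a single batch.
        return 2048 if enable_chunked_prefill else max_model_len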
14 changes: 2 additions & 12 deletions fastdeploy/engine/args_utils.py
@@ -932,23 +932,15 @@ def create_scheduler_config(self) -> SchedulerConfig:
"""
prefix = "scheduler_"
prefix_len = len(prefix)
- extra_params = [
-     "max_model_len",
-     "enable_chunked_prefill",
-     "max_num_partial_prefills",
-     "max_long_partial_prefills",
-     "long_prefill_token_threshold",
- ]
-
all = asdict(self)
params = dict()
for k, v in all.items():
if k[:prefix_len] == prefix:
params[k[prefix_len:]] = v
- elif k in extra_params:
+ else:
params[k] = v

- return SchedulerConfig(**params)
+ return SchedulerConfig(params)

def create_graph_optimization_config(self) -> GraphOptimizationConfig:
"""
@@ -1048,9 +1040,7 @@ def create_engine_config(self) -> FDConfig:
load_config=load_cfg,
parallel_config=parallel_cfg,
max_model_len=self.max_model_len,
- max_num_seqs=self.max_num_seqs,
speculative_config=speculative_cfg,
- max_num_batched_tokens=self.max_num_batched_tokens,
ips=self.ips,
use_warmup=self.use_warmup,
engine_worker_queue_port=self.engine_worker_queue_port,
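Note: create_scheduler_config now forwards every engine arg instead of an allow-list: keys with the scheduler_ prefix are stripped, everything else passes through by name, and SchedulerConfig receives one plain dict rather than **kwargs. A rough sketch of the mapping with a toy dict (not the full FastDeploy arg set):

    args = {"scheduler_name": "local", "max_model_len": 8192, "max_num_seqs": 34}
    prefix = "scheduler_"
    params = {}
    for k, v in args.items():
        if k.startswith(prefix):
            params[k[len(prefix):]] = v  # "scheduler_name" -> "name"
        else:
            params[k] = v  # forwarded unchanged
    # params == {"name": "local", "max_model_len": 8192, "max_num_seqs": 34}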
10 changes: 5 additions & 5 deletions fastdeploy/engine/common_engine.py
@@ -71,7 +71,7 @@ def __init__(self, cfg, start_queue=True):

if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager = ResourceManagerV1(
- cfg.max_num_seqs,
+ cfg.scheduler_config.max_num_seqs,
cfg,
cfg.parallel_config.tensor_parallel_size,
cfg.splitwise_role,
@@ -83,7 +83,7 @@ def __init__(self, cfg, start_queue=True):
)
else:
self.resource_manager = ResourceManager(
- cfg.max_num_seqs,
+ cfg.scheduler_config.max_num_seqs,
cfg,
cfg.parallel_config.tensor_parallel_size,
cfg.splitwise_role,
@@ -109,7 +109,7 @@ def __init__(self, cfg, start_queue=True):
self.partial_chunked_tokens = [0] * (self.cfg.max_num_partial_prefills + 1)
for idx in range(1, self.cfg.max_num_partial_prefills + 1):
self.partial_chunked_tokens[idx] = (
- (self.cfg.max_num_batched_tokens // idx)
+ (self.cfg.scheduler_config.max_num_batched_tokens // idx)
// self.cfg.cache_config.block_size
* self.cfg.cache_config.block_size
)
@@ -356,7 +356,7 @@ def update_tokens(idx, chunk_size, update_chunk=False):
requests_chunk = [[] for _ in range(len(requests))]
chunk_request_num = len(current_request_size)
while chunk_request_num >= 1:
- remain_batched_tokens = self.cfg.max_num_batched_tokens
+ remain_batched_tokens = self.cfg.scheduler_config.max_num_batched_tokens
for idx in range(len(current_request_size)):
if current_request_size[idx] <= 0:
continue
@@ -496,7 +496,7 @@ def _insert_task_to_worker(self):
available_blocks=self.resource_manager.available_block_num(),
block_size=self.cfg.cache_config.block_size,
reserved_output_blocks=self.cfg.cache_config.enc_dec_block_num,
- max_num_batched_tokens=self.cfg.max_num_batched_tokens,
+ max_num_batched_tokens=self.cfg.scheduler_config.max_num_batched_tokens,
batch=num_prefill_batch,
)

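Note: partial_chunked_tokens splits the (now scheduler-owned) token budget across up to max_num_partial_prefills concurrent prefills and rounds each share down to whole cache blocks. A worked example under assumed values (budget 2048, block size 64, three partial prefills):

    max_num_batched_tokens = 2048  # assumed scheduler budget
    block_size = 64                # assumed KV-cache block size
    max_num_partial_prefills = 3   # assumed

    partial_chunked_tokens = [0] * (max_num_partial_prefills + 1)
    for idx in range(1, max_num_partial_prefills + 1):
        # Divide the budget idx ways, then align down to the block size.
        partial_chunked_tokens[idx] = (max_num_batched_tokens // idx) // block_size * block_size

    # -> [0, 2048, 1024, 640]; e.g. 2048 // 3 == 682, aligned down to 640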
4 changes: 2 additions & 2 deletions fastdeploy/engine/engine.py
@@ -467,7 +467,7 @@ def _start_worker_service(self):
ips = ",".join(self.cfg.ips)
arguments = (
f" --devices {self.cfg.device_ids} {py_script}"
f" --max_num_seqs {self.cfg.max_num_seqs} --max_model_len {self.cfg.max_model_len}"
f" --max_num_seqs {self.cfg.scheduler_config.max_num_seqs} --max_model_len {self.cfg.max_model_len}"
f" --gpu_memory_utilization {self.cfg.cache_config.gpu_memory_utilization}"
f" --model {self.cfg.model_config.model!s}"
f" --device_ids {self.cfg.device_ids}"
@@ -480,7 +480,7 @@ def _start_worker_service(self):
f" --eos_tokens_lens {self.data_processor.eos_token_id_len}"
f" --pad_token_id {self.data_processor.pad_token_id}"
f" --engine_pid {self.cfg.engine_worker_queue_port[0]}"
f" --max_num_batched_tokens {self.cfg.max_num_batched_tokens}"
f" --max_num_batched_tokens {self.cfg.scheduler_config.max_num_batched_tokens}"
f" --splitwise_role {self.cfg.splitwise_role}"
f" --kv_cache_ratio {self.cfg.cache_config.kv_cache_ratio}"
f" --expert_parallel_size {self.cfg.parallel_config.expert_parallel_size}"
2 changes: 1 addition & 1 deletion fastdeploy/engine/sched/resource_manager_v1.py
@@ -289,7 +289,7 @@ def schedule(self):
with self.lock:
scheduled_reqs: list[Request] = []
preempted_reqs: list[Request] = []
- token_budget = self.config.max_num_batched_tokens
+ token_budget = self.config.scheduler_config.max_num_batched_tokens

# First, schedule the RUNNING requests.
req_index = 0
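Note: schedule() spends scheduler_config.max_num_batched_tokens as a per-step token budget over the RUNNING queue. A simplified sketch of the budget loop (hypothetical Request shape; the real method also handles preemption, prefill chunking, and the waiting queue):

    def schedule_step(running_reqs, max_num_batched_tokens):
        token_budget = max_num_batched_tokens
        scheduled = []
        for req in running_reqs:
            # Grant each request at most what is left of this step's budget.
            num_new_tokens = min(req.num_tokens_needed, token_budget)
            if num_new_tokens == 0:
                break  # budget exhausted
            scheduled.append((req, num_new_tokens))
            token_budget -= num_new_tokens
        return scheduled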
@@ -210,7 +210,7 @@ def __init__(
):
super().__init__(fd_config=fd_config)
self.vocab_size = fd_config.model_config.vocab_size
- self.batch_size = fd_config.parallel_config.max_num_seqs
+ self.batch_size = fd_config.scheduler_config.max_num_seqs

self.any_whitespace = not fd_config.parallel_config.disable_any_whitespace

2 changes: 1 addition & 1 deletion fastdeploy/model_executor/layers/attention/attention.py
@@ -152,7 +152,7 @@ def __init__(

self.cache_k_block_means = paddle.zeros(
[
- fd_config.parallel_config.max_num_seqs,
+ fd_config.scheduler_config.max_num_seqs,
moba_max_seq_length // moba_block_size,
self.kv_num_heads,
self.head_dim,
@@ -156,7 +156,7 @@ def __init__(
self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False)
self.max_partition_size: int = int(os.getenv("FLAGS_max_partition_size", "32768"))
self.zero_seq_enc_lens_for_decode = paddle.zeros(
- shape=[fd_config.parallel_config.max_num_seqs, 1], dtype=paddle.int32
+ shape=[fd_config.scheduler_config.max_num_seqs, 1], dtype=paddle.int32
)

def get_attntion_meta(self):
@@ -77,7 +77,7 @@ def __init__(
assert fd_config.moba_attention_config is not None, "moba_attention_config is None"
self.block_size = fd_config.parallel_config.block_size
self.max_seq_len = fd_config.parallel_config.max_model_len
- self.max_num_seqs = fd_config.parallel_config.max_num_seqs
+ self.max_num_seqs = fd_config.scheduler_config.max_num_seqs
self.kv_num_heads = kv_num_heads
self.num_heads = num_heads
self.head_dim = fd_config.model_config.head_dim
@@ -86,7 +86,7 @@ def __init__(
self.attention_metadata: GCUFlashAttnMetadata = None
self.block_size = fd_config.cache_config.block_size
self.max_seq_len = fd_config.parallel_config.max_model_len
- self.max_num_seqs = fd_config.parallel_config.max_num_seqs
+ self.max_num_seqs = fd_config.scheduler_config.max_num_seqs

self.causal = getattr(fd_config.model_config, "causal", True)

@@ -84,7 +84,7 @@ def __init__(
self.attention_metadata: GCUMemEfficientAttnMetadata = None
self.block_size = fd_config.cache_config.block_size
self.max_seq_len = fd_config.parallel_config.max_model_len
- self.max_num_seqs = fd_config.parallel_config.max_num_seqs
+ self.max_num_seqs = fd_config.scheduler_config.max_num_seqs

self.causal = getattr(fd_config.model_config, "causal", True)

2 changes: 1 addition & 1 deletion fastdeploy/model_executor/layers/sample/sampler.py
@@ -221,7 +221,7 @@ def __init__(self, fd_config: FDConfig = None):
):
early_stopper_cls = get_early_stopper_cls_from_stragegy(fd_config.early_stop_config.strategy)
self.early_stopper = early_stopper_cls()
- self.early_stopper.initialize(fd_config.parallel_config.max_num_seqs, fd_config.early_stop_config)
+ self.early_stopper.initialize(fd_config.scheduler_config.max_num_seqs, fd_config.early_stop_config)

def set_reasoning_parser(self, reasoning_parser: Optional[ReasoningParser] = None):
"""set reasoning parser"""
6 changes: 4 additions & 2 deletions fastdeploy/model_executor/models/deepseek_v3.py
@@ -607,9 +607,11 @@ def __init__(self, fd_config: FDConfig):
num_embeddings=fd_config.model_config.vocab_size,
prefix="lm_head",
)
- self.position_ids_buffer = paddle.empty([fd_config.parallel_config.max_num_batched_tokens], dtype=paddle.int32)
+ self.position_ids_buffer = paddle.empty(
+     [fd_config.scheduler_config.max_num_batched_tokens], dtype=paddle.int32
+ )
self.mask_encoder_batch_buffer = paddle.empty(
- [fd_config.parallel_config.max_num_batched_tokens, 1], dtype=paddle.int32
+ [fd_config.scheduler_config.max_num_batched_tokens, 1], dtype=paddle.int32
)

@classmethod
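Note: both buffers are preallocated to the per-step token ceiling, so sizing them from the scheduler's budget (rather than parallel_config) keeps them in sync with what the scheduler can actually batch. A toy shape check under an assumed budget of 8192:

    import paddle

    max_num_batched_tokens = 8192  # assumed scheduler budget
    position_ids_buffer = paddle.empty([max_num_batched_tokens], dtype=paddle.int32)
    mask_encoder_batch_buffer = paddle.empty([max_num_batched_tokens, 1], dtype=paddle.int32)
    # One position id per batched token; one encoder-mask flag per token.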
25 changes: 15 additions & 10 deletions fastdeploy/scheduler/config.py
@@ -202,13 +202,12 @@ class SchedulerConfig:
Creates appropriate config based on scheduler type (local/global).
"""

def __init__(self, name="local", **kwargs):
def __init__(self, args):
"""
Initialize scheduler configuration factory.

Args:
- name: Scheduler type ("local" for LocalScheduler or "global" for GlobalScheduler)
- **kwargs: Configuration parameters for the specific scheduler type
+ args: Configuration parameters for the specific scheduler type

Initializes:
- Appropriate config object based on scheduler type
@@ -217,17 +216,23 @@ def __init__(self, name="local", **kwargs):
Raises:
Exception: If invalid scheduler type is specified
"""
- self.name = name
+ self.name = "local"  # "local" for LocalScheduler or "global" for GlobalScheduler
+ self.max_num_batched_tokens = 2048
+ self.max_num_seqs = 34
+ self.config = None

if name == "local":
self.config = LocalSchedulerConfig(**kwargs)
for key, value in args.items():
if hasattr(self, key):
setattr(self, key, value)

if name == "global":
self.config = GlobalSchedulerConfig(**kwargs)
if self.name == "local":
self.config = LocalSchedulerConfig(**args)

if name == "splitwise":
self.config = SplitWiseSchedulerConfig(**kwargs)
if self.name == "global":
self.config = GlobalSchedulerConfig(**args)

if self.name == "splitwise":
self.config = SplitWiseSchedulerConfig(**args)

def check(self):
"""
Expand Down
3 changes: 2 additions & 1 deletion fastdeploy/spec_decode/base.py
@@ -50,8 +50,9 @@ def __init__(self, cfg: FDConfig):
self.speculative_config = self.cfg.speculative_config
self.cache_config = self.cfg.cache_config
self.quant_config = self.cfg.quant_config
+ self.scheduler_config = self.cfg.scheduler_config

- self.max_num_seqs = self.parallel_config.max_num_seqs
+ self.max_num_seqs = self.scheduler_config.max_num_seqs
self.max_model_len = self.parallel_config.max_model_len
self.speculative_method = self.speculative_config.method
self.max_draft_token_num = self.speculative_config.num_speculative_tokens