Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,9 @@ def _fetch_request():
else:
time.sleep(0.005)

except RuntimeError as e:
if "cannot schedule new futures after shutdown" in str(e):
break
except Exception as e:
err_msg = "Error happend while insert task to engine: {}, {}.".format(e, str(traceback.format_exc()))
self.llm_logger.error(err_msg)
Expand Down
22 changes: 20 additions & 2 deletions fastdeploy/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@ def start(self, api_server_pid=None):
Initializes the engine and starts its sub-services.
If `api_server_pid` is defined, will launch a thread
to keep getting request from zmq_server.

NOTE: To clarify the launch order of the components of the LLM engine:
1. First, launch splitwise scheduler (if necessary) and expert services (if necessary).
2. Then, launch common engine, which includes some background threads that insert tasks and receive outputs.
3. Most importantly, launch workers and cache services. Their launch order is listed as follows.

| Profile | Mixed | PrefixCache | Cache -> Worker | Worker -> Cache |
|---------|-------|-------------|-----------------|-----------------|
| 1 | 1 | 1 | 0 | 1 |
| 1 | 1 | 0 | 0 | 0 |
| 1 | 0 | 1 | 0 | 1 |
| 1 | 0 | 0 | 0 | 1 |
| 0 | 1 | 1 | 0 | 1 |
| 0 | 1 | 0 | 0 | 0 |
| 0 | 0 | 1 | 1 | 0 |
| 0 | 0 | 0 | 1 | 0 |

4. Finally, inform the user that the engine has successfully started.

"""
assert not self.is_started, "The engine is already started."
start_time = time.time()
Expand All @@ -109,7 +128,6 @@ def start(self, api_server_pid=None):
self.ipc_signal_suffix = self.cfg.parallel_config.engine_worker_queue_port[0]
self._init_worker_signals()

# Launch components: scheduler, cache_manager, expert_service et.al.
self.launch_components()

self.engine.start()
Expand Down Expand Up @@ -151,7 +169,7 @@ def check_worker_initialize_status_func(res: dict):
# and then start the cache manager
if self.do_profile:
self._stop_profile()
elif self.cfg.cache_config.enable_prefix_caching:
elif self.cfg.scheduler_config.splitwise_role == "mixed" and self.cfg.cache_config.enable_prefix_caching:
device_ids = self.cfg.parallel_config.device_ids.split(",")
self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix)

Expand Down
2 changes: 2 additions & 0 deletions fastdeploy/inter_communicator/ipc_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,15 @@ def __init__(
name = name + f".{suffix}"

if create:
llm_logger.debug(f"creating ipc signal: {name}")
if shared_memory_exists(name):
llm_logger.warning(f"ShareMemory: {name} already exists, delete it")
SharedMemory(name=name, create=False).unlink()
self.shm = SharedMemory(create=True, size=array.nbytes, name=name)
self.value: np.ndarray = np.ndarray(array.shape, dtype=array.dtype, buffer=self.shm.buf)
self.value[:] = array # Initialize with input array data
else:
llm_logger.debug(f"attaching ipc signal: {name}")
self.shm = SharedMemory(name=name)
self.value: np.ndarray = np.ndarray(array.shape, dtype=array.dtype, buffer=self.shm.buf)

Expand Down
4 changes: 2 additions & 2 deletions fastdeploy/worker/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
StructuredOutputsConfig,
)
from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue
from fastdeploy.inter_communicator import ExistTaskStatus, IPCSignal, ModelWeightsStatus
from fastdeploy.inter_communicator import ExistTaskStatus, IPCSignal, ModelWeightsStatus, shared_memory_exists
from fastdeploy.model_executor.layers.quantization import parse_quant_config
from fastdeploy.model_executor.utils import v1_loader_support
from fastdeploy.platforms import current_platform
Expand Down Expand Up @@ -427,7 +427,7 @@ def graph_optimize_and_warm_up_model(self) -> None:
array=prefilled_step_idx_data,
dtype=np.int32,
suffix=gpu_id,
create=False,
create=not shared_memory_exists(prefilled_step_name),
)
step_shm_value.value[0] = -1

Expand Down
Loading