2 changes: 2 additions & 0 deletions .gitignore
@@ -229,3 +229,5 @@ configs/development.yaml
# Docker
.dockerignore
Dockerfile.dev
discussion
tmp_test
8 changes: 7 additions & 1 deletion examples/offline_inference/qwen2_5_omni/end2end.py
@@ -496,7 +496,13 @@ def parse_args():
default=16000,
help="Sampling rate for audio loading (default: 16000).",
)

parser.add_argument("--worker-backend", type=str, default="process", choices=["process", "ray"], help="backend")
parser.add_argument(
"--ray-address",
type=str,
default=None,
help="Address of the Ray cluster.",
)
return parser.parse_args()
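The new flags wire the example to a Ray cluster. A minimal sketch of how they might be consumed downstream; the dispatch shown here is illustrative, not the script's actual logic:

```python
# Illustrative only: hypothetical dispatch on the new flags.
args = parse_args()
if args.worker_backend == "ray":
    import ray

    # address=None lets Ray start or auto-detect a local cluster;
    # pass --ray-address to join an existing one.
    ray.init(address=args.ray_address)
# otherwise the default process-based backend is used
```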


24 changes: 24 additions & 0 deletions vllm_omni/distributed/__init__.py
@@ -0,0 +1,24 @@
# SPDX-License-Identifier: Apache-2.0

from .connectors import (
ConnectorSpec,
MooncakeConnector,
OmniConnectorBase,
OmniConnectorFactory,
OmniTransferConfig,
SharedMemoryConnector,
load_omni_transfer_config,
)

__all__ = [
# Config
"ConnectorSpec",
"OmniTransferConfig",
# Connectors
"OmniConnectorBase",
"OmniConnectorFactory",
"MooncakeConnector",
"SharedMemoryConnector",
# Utilities
"load_omni_transfer_config",
]
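Downstream code can import the connector API from the package root, since the names above are re-exported (a quick sanity check using only names from `__all__`):

```python
# Names re-exported at the package root (see __all__ above).
from vllm_omni.distributed import (
    MooncakeConnector,
    OmniConnectorFactory,
    SharedMemoryConnector,
    load_omni_transfer_config,
)
```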
109 changes: 109 additions & 0 deletions vllm_omni/distributed/connectors/README.md
@@ -0,0 +1,109 @@
# VLLM-Omni Distributed Connectors

This guide explains how to configure and use distributed connectors in vllm-omni for multi-stage pipelines.

## 1. Overview

Connectors enable data transfer between pipeline stages (e.g., Thinker -> Talker).
Two connectors are currently supported:

* **SharedMemoryConnector (default)**: Uses system shared memory. Zero-copy with the lowest latency; best for **single-node** deployments. Auto-configured if no connectors are specified.
* **MooncakeConnector**: Built on [Mooncake](https://github.com/kvcache-ai/Mooncake), TCP/RDMA based; best for **multi-node** distributed deployments. Requires a Mooncake Master service.

## 2. Installation (Mooncake)

If using `MooncakeConnector`, install the library first:

```bash
# For CUDA-enabled systems (Recommended)
pip install mooncake-transfer-engine

# For non-CUDA systems
pip install mooncake-transfer-engine-non-cuda
```

## 3. Using MooncakeConnector

### 3.1 Start Mooncake Master

Start the master service on your primary node:

```bash
# Create the storage directory if you use Mooncake's SSD-backed storage (--root_fs_dir)
mkdir -p ./mc_storage

mooncake_master \
--rpc_port=50051 \
--enable_http_metadata_server=true \
--http_metadata_server_host=0.0.0.0 \
--http_metadata_server_port=8080 \
--metrics_port=9003 \
--root_fs_dir=./mc_storage/ \
--cluster_id=mc-local-1 &
```

### 3.2 Configuration (YAML)

Edit your stage config (e.g., `qwen2_5_omni.yaml`).

**Step 1: Define Connector in Global Runtime**

```yaml
runtime:
connectors:
connector_of_mooncake:
name: MooncakeConnector
extra:
host: "127.0.0.1" # Local Worker IP
metadata_server: "http://<MASTER_IP>:8080/metadata"
master: "<MASTER_IP>:50051"
segment: 512000000 # 512MB segment
localbuf: 64000000 # 64MB buffer
proto: "tcp" # "tcp" or "rdma"
```

**Step 2: Reference in Stages**

Explicitly link stages using `input_connectors` and `output_connectors`:

```yaml
stage_args:
- stage_id: 0
# ...
output_connectors:
to_stage_1: connector_of_mooncake

- stage_id: 1
# ...
input_connectors:
from_stage_0: connector_of_mooncake
```
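For orientation, here is a sketch of how a stage might resolve its connector from this config at startup. The call signatures of `load_omni_transfer_config` and `OmniConnectorFactory` below are assumptions for illustration, not the documented API:

```python
# Sketch only: the signatures below are assumed, not the library's actual API.
from vllm_omni.distributed import OmniConnectorFactory, load_omni_transfer_config

transfer_config = load_omni_transfer_config("qwen2_5_omni.yaml")  # assumed signature
spec = transfer_config.connectors["connector_of_mooncake"]        # assumed attribute
connector = OmniConnectorFactory.create_connector(spec)           # assumed method
```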

## 4. Using SharedMemoryConnector (Auto-Mode)

**Best for single-node deployments.**

If no explicit connectors are defined, the system automatically creates shared-memory connectors for every edge in `runtime.edges`.

### Threshold Configuration
By default, payloads larger than the **64KB** threshold are transferred via shared memory, while smaller ones are sent inline on the control queue.

To adjust this threshold, add the following to your `runtime.connectors` (the example below lowers it to 1KB):

```yaml
runtime:
connectors:
connector_of_shared_memory:
name: SharedMemoryConnector
extra:
shm_threshold_bytes: 1024 # 1KB threshold
```
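The routing rule itself is simple. A minimal sketch of the size check described above, using `pickle` as a stand-in for the connector's serializer (illustrative, not the actual implementation):

```python
# Illustrative sketch of the SHM threshold rule; pickle stands in for the
# connector's real serializer.
import pickle

def route_payload(obj: object, shm_threshold_bytes: int = 64 * 1024) -> str:
    """Pick a transport: shared memory above the threshold, inline queue below."""
    size = len(pickle.dumps(obj))
    if size > shm_threshold_bytes:
        return "shared_memory"  # write blob to SHM, pass a handle on the queue
    return "control_queue"      # small enough to send inline
```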

## 5. Summary

| Use Case | Recommended Connector | Configuration |
| :--- | :--- | :--- |
| **Single Node** | `SharedMemoryConnector` | **None** (Automatic) or Custom Threshold |
| **Multi Node** | `MooncakeConnector` | Explicit YAML + Mooncake Master |
36 changes: 36 additions & 0 deletions vllm_omni/distributed/connectors/__init__.py
@@ -0,0 +1,36 @@
# SPDX-License-Identifier: Apache-2.0

from .base import OmniConnectorBase
from .config import ConnectorSpec, OmniTransferConfig
from .factory import OmniConnectorFactory
from .mooncake_connector import MooncakeConnector
from .shm_connector import SharedMemoryConnector
from .utils import (
build_stage_connectors,
get_connectors_config_for_stage,
get_stage_connector_config,
initialize_connectors_from_config,
initialize_orchestrator_connectors,
load_omni_transfer_config,
)

__all__ = [
# Config
"ConnectorSpec",
"OmniTransferConfig",
# Base classes and implementations
"OmniConnectorBase",
# Factory
"OmniConnectorFactory",
# Specific implementations
"MooncakeConnector",
"SharedMemoryConnector",
# Utilities
"load_omni_transfer_config",
"initialize_connectors_from_config",
"get_connectors_config_for_stage",
# Manager helpers
"initialize_orchestrator_connectors",
"get_stage_connector_config",
"build_stage_connectors",
]
167 changes: 167 additions & 0 deletions vllm_omni/distributed/connectors/adapter.py
@@ -0,0 +1,167 @@
# SPDX-License-Identifier: Apache-2.0
# Temporary shim for compatibility with vllm_omni/entrypoints/omni_stage.py
# and vllm_omni/entrypoints/omni_llm.py.

import time
from typing import Any, Callable, Optional

from vllm_omni.distributed.connectors.logging import get_connector_logger

logger = get_connector_logger(__name__)


def try_send_via_connector(
connector: Any,
stage_id: int,
next_stage_id: int,
req_id: str,
next_inputs: Any,
sampling_params: Any,
original_prompt: Any,
next_stage_queue_submit_fn: Callable[[dict[str, Any]], None],
metrics: Any,
) -> bool:
"""
Attempts to send data via OmniConnector.
Returns True if successful, False otherwise.
Encapsulates the logic of preparing payload, sending via connector,
sending notification, and recording metrics.
"""
try:
t0 = time.time()

# Prepare data for connector
payload_data = {
"engine_inputs": next_inputs,
"sampling_params": sampling_params,
"metadata": {
"original_prompt": original_prompt,
"stage_transition": f"{stage_id}->{next_stage_id}",
"timestamp": time.time(),
},
}

# Send data via connector
success, serialized_size, metadata = connector.put(str(stage_id), str(next_stage_id), str(req_id), payload_data)

if success:
# Send lightweight notification via queue
notify_payload = {
"request_id": req_id,
"sampling_params": sampling_params,
"from_connector": True,
"from_stage": str(stage_id),
"to_stage": str(next_stage_id),
"sent_ts": time.time(),
}
# Merge connector metadata (e.g. shm handle or inline data) into queue payload
if metadata:
notify_payload["connector_metadata"] = metadata

next_stage_queue_submit_fn(notify_payload)

t1 = time.time()
tx_ms = (t1 - t0) * 1000.0

metrics.on_forward(
stage_id,
next_stage_id,
req_id,
serialized_size, # Use size from connector
float(tx_ms),
True, # Mark as using connector
)
return True
else:
# If put returned False, we let the caller handle fallback
return False

except Exception as e:
logger.warning(
"[Orchestrator] OmniConnector failed for req %s: %s; falling back to queue",
req_id,
e,
)
return False


def try_recv_via_connector(
task: dict[str, Any],
connectors: dict[Any, Any],
stage_id: int,
) -> tuple[Any, Optional[dict[str, Any]]]:
"""
Attempts to resolve input data from either connector or IPC.
Returns (engine_inputs, rx_metrics) or (None, None) if failed/skipped.
"""
rid = task["request_id"]

if task.get("from_connector"):
from_stage = task.get("from_stage")
to_stage = str(stage_id)

if not from_stage:
logger.error(
"[Stage-%s] 'from_connector' is true but 'from_stage' is missing for request %s", stage_id, rid
)
return None, None

# Get connector for this edge
connector_key = (from_stage, to_stage)
connector = connectors.get(connector_key)

if connector:
try:
# Get data from connector with timeout
_t_start = time.time()
connector_metadata = task.get("connector_metadata")
payload = connector.get(from_stage, to_stage, str(rid), metadata=connector_metadata)
_t_end = time.time()

if payload:
if isinstance(payload, tuple):
payload_data, serialized_size = payload
else:
payload_data = payload
serialized_size = len(connector.serialize_obj(payload_data))
else:
payload_data = None
serialized_size = 0

if payload_data and isinstance(payload_data, dict):
ein = payload_data.get("engine_inputs")
decode_ms = (_t_end - _t_start) * 1000.0

rx_metrics = {"rx_decode_time_ms": decode_ms, "rx_transfer_bytes": serialized_size}
return ein, rx_metrics
else:
logger.error(
"[Stage-%s] Failed to get data from connector for request %s or payload is empty", stage_id, rid
)
return None, None
except Exception as e:
logger.error("[Stage-%s] Error retrieving data from connector for request %s: %s", stage_id, rid, e)
return None, None
else:
logger.error(
"[Stage-%s] No connector found for edge %s -> %s for request %s", stage_id, from_stage, to_stage, rid
)
return None, None
else:
# Data comes from queue as usual (e.g. seed request for Stage-0)
# Since fallback logic is deprecated, we assume this is a direct inputs payload.
# We still need to decode it if it used SHM (via legacy stage_utils logic, or new shm_connector format)
# For Stage-0 specifically, 'engine_inputs' is often directly in the task dict.

# Try to use the new stage_utils which uses OmniSerializer
from vllm_omni.entrypoints.stage_utils import maybe_load_from_ipc_with_metrics

try:
ein, metrics = maybe_load_from_ipc_with_metrics(task, "engine_inputs", "engine_inputs_shm")
# If metrics are empty or zero, we might want to populate dummy metrics
return ein, metrics
except Exception:
# If engine_inputs is missing, it might be a different kind of payload,
# but for Stage-0 seed it should be there.
# We'll return None to let caller handle error if strictly required.
return None, None
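A hypothetical call site for the pair above; `connector`, `connectors`, `engine_inputs`, `stage1_queue`, `task`, and `metrics` are stand-ins, not objects defined in this module:

```python
# Hypothetical sender side (illustrative): forward stage-0 output to stage 1,
# falling back to the plain queue if the connector declines or fails.
sent = try_send_via_connector(
    connector=connector,
    stage_id=0,
    next_stage_id=1,
    req_id="req-42",
    next_inputs=engine_inputs,
    sampling_params=sampling_params,
    original_prompt=prompt,
    next_stage_queue_submit_fn=stage1_queue.put,
    metrics=metrics,
)
if not sent:
    # Fallback: ship the inputs inline on the queue.
    stage1_queue.put({"request_id": "req-42", "engine_inputs": engine_inputs})

# Hypothetical receiver side: resolve the inputs on stage 1.
ein, rx_metrics = try_recv_via_connector(task, connectors, stage_id=1)
```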