
Commit 365df24

xiaohong42root and root authored
[fully_async, rollout, trainer, tool, cfg] fix: ROCm async training compatibility for AMD MI300X (#6062)
## What does this PR do?

Fix multiple issues that prevent fully async FSDP2 training from working on AMD ROCm platforms (MI300X series).

**Environment:**

- AMD Instinct MI3xx (8× GPU, 192 GB HBM each), ROCm 7.2, PyTorch 2.10+rocm7.2, vLLM v0.18.1rc1
- Cross-validated on NVIDIA H20 with CUDA (no regression observed)

**Training curves (MI3xx vs H20) and the training script will be attached in PR comments.**

[dapo_7b_fully_async.sh](https://github.com/user-attachments/files/26700697/dapo_7b_fully_async.sh)

<img width="2234" height="1181" alt="qwen2 5_7b_fully_async_1" src="https://github.com/user-attachments/assets/81bd6651-9f1e-4450-b8b9-68149264536f" />

### Checklist Before Starting

- [x] Search for similar PRs: https://github.com/verl-project/verl/pulls?q=is%3Apr+rocm+async
- [x] Format: `[{modules}] {type}: {description}`

### Test

Validated by fully async FSDP2 DAPO/GRPO RL + ReTool training on AMD MI3xx:

- 140+ training steps completed without errors, deadlocks, or OOM
- Reward improved from -0.8 to 0 over 12+ hours of training
- Cross-validated on NVIDIA H20: all changes are platform-safe (AMD-specific env vars are ignored on NVIDIA/Ascend; ZMQ handle changes use platform-independent rank logic)

### API and Usage Example

No API changes. All fixes are internal implementation details.

### Design & Code Changes

1. **Add `HSA_NO_SCRATCH_RECLAIM` env var** (`constants_ppo.py`)
   - Required by AMD RCCL on MI300X; without it, FSDP initialization fails with `ncclSystemError`
   - Added alongside the existing platform-specific vars (HCCL for Ascend); ignored on non-AMD platforms
2. **Fix `numpy.bool_` JSON serialization** (`ray_trainer.py`)
   - Add a `default=str` fallback to `json.dumps`, since numpy 2.x `bool_` is not a Python `bool` subclass
3. **ZMQ IPC handle: use `(replica_rank, local_rank)` instead of GPU UUID** (`vllm_rollout.py`, `utils.py`, `vllm_async_server.py`)
   - On ROCm, `CheckpointEngineWorker` and the vLLM worker see different GPU UUIDs due to different `CUDA_VISIBLE_DEVICES`/`HIP_VISIBLE_DEVICES` settings
   - The sender uses `rollout_rank % local_world_size` to derive the node-local rank, matching the vLLM worker's `local_rank` on every node (fixes the multi-node mismatch)
   - The `replica_rank` prefix avoids socket collisions when multiple replicas share a node
   - The `VERL_REPLICA_RANK` env var is set in `vLLMHttpServer.__init__` and inherited by vLLM worker subprocesses
   - Both `vLLMColocateWorkerExtension` and `vLLMOmniColocateWorkerExtension` are updated
4. **Clean up stale ZMQ IPC socket files** (`bucketed_weight_transfer.py`)
   - Remove leftover `.sock` files before `bind()` and after `_cleanup()` to prevent `Address already in use` on restart
5. **Fix Hydra searchpath** (`fully_async_ppo_trainer.yaml`)
   - Use `pkg://verl.trainer.config` instead of `file://verl/trainer/config` so the config resolves under editable installs
6. **Sandbox Ray actor reuse** (`sandbox_fusion_tools.py`)
   - Add `name` and `get_if_exists=True` to prevent duplicate `ExecutionWorker` actor creation

### Platform Compatibility

All changes are safe for non-AMD platforms:

- `HSA_NO_SCRATCH_RECLAIM`: AMD-specific env var, silently ignored on NVIDIA/Ascend
- ZMQ handle changes use platform-independent rank arithmetic; `VERL_REPLICA_RANK` defaults to `"0"` for single-replica setups
- The other changes (json `default=str`, IPC cleanup, Hydra `pkg://`, actor reuse) are pure logic improvements

### Checklist Before Submitting

> [!IMPORTANT]
> Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

- [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- [x] Add / update [the documentation](https://github.com/volcengine/verl/tree/main/docs). The official documents will be compiled after the merge.
- [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: these fixes target ROCm-specific runtime behavior (HIP memory management, RCCL env vars, GPU UUID mismatch) that cannot be reproduced in CI without AMD GPU hardware.
- [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
- [x] If your PR is related to the `recipe` submodule, please also update the reference to the submodule commit via `git submodule update --remote` or `cd recipe && git pull origin main`.

---------

Co-authored-by: root <root@mi308-ccs-aus-e04-40.prov.aus.ccs.cpe.ice.amd.com>
1 parent f2b1c98 commit 365df24

8 files changed

Lines changed: 39 additions & 12 deletions


verl/experimental/fully_async_policy/config/fully_async_ppo_trainer.yaml

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,6 +1,6 @@
 hydra:
   searchpath:
-    - file://verl/trainer/config
+    - pkg://verl.trainer.config
 
 defaults:
   - ppo_trainer
```

verl/tools/sandbox_fusion_tools.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -90,7 +90,7 @@ def init_execution_pool(
     if mode == PoolMode.ThreadMode:
         return (
             ray.remote(ExecutionWorker)
-            .options(max_concurrency=num_workers)
+            .options(name="sandbox-execution-pool", get_if_exists=True, max_concurrency=num_workers)
             .remote(enable_global_rate_limit=enable_global_rate_limit, rate_limit=rate_limit)
         )
     else:
```
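The `name` + `get_if_exists=True` combination makes actor creation idempotent: the first caller creates the named actor, later callers get a handle to the same one. A minimal sketch of that get-or-create semantics, using a plain dict registry in place of Ray's internal actor name registry (this is an illustration of the contract, not Ray's implementation):

```python
# Hypothetical stand-in for Ray's named-actor registry.
_registry: dict[str, object] = {}

def get_or_create(name: str, factory):
    """Return the object registered under `name`, creating it exactly once."""
    if name not in _registry:
        _registry[name] = factory()
    return _registry[name]

calls = []
pool_a = get_or_create("sandbox-execution-pool", lambda: calls.append(1) or object())
pool_b = get_or_create("sandbox-execution-pool", lambda: calls.append(1) or object())
assert pool_a is pool_b   # second caller reuses the first instance
assert len(calls) == 1    # the factory ran exactly once
```

Without this, every tool instance spawning its own `ExecutionWorker` would duplicate rate-limit state and waste resources.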

verl/trainer/constants_ppo.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -31,6 +31,7 @@
         # https://www.hiascend.com/document/detail/zh/canncommercial/83RC1/maintenref/envvar/envref_07_0143.html
         "HCCL_HOST_SOCKET_PORT_RANGE": "auto",
         "HCCL_NPU_SOCKET_PORT_RANGE": "auto",
+        "HSA_NO_SCRATCH_RECLAIM": "1",
     },
 }
```
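These platform-specific entries work as harmless defaults: a key only matters on the platform whose runtime reads it, and user-set values should win. A hedged sketch of that merge behavior (the `merged_env` helper is illustrative, not verl's actual runtime-env plumbing):

```python
# Illustrative dict mirroring the platform-specific entries in constants_ppo.py.
PLATFORM_ENV = {
    "HCCL_HOST_SOCKET_PORT_RANGE": "auto",   # Ascend HCCL, ignored elsewhere
    "HCCL_NPU_SOCKET_PORT_RANGE": "auto",    # Ascend HCCL, ignored elsewhere
    "HSA_NO_SCRATCH_RECLAIM": "1",           # AMD RCCL on MI300X, ignored elsewhere
}

def merged_env(base: dict) -> dict:
    """Hypothetical merge: apply platform defaults without clobbering user settings."""
    env = dict(base)
    for key, value in PLATFORM_ENV.items():
        env.setdefault(key, value)
    return env

assert merged_env({})["HSA_NO_SCRATCH_RECLAIM"] == "1"
assert merged_env({"HSA_NO_SCRATCH_RECLAIM": "0"})["HSA_NO_SCRATCH_RECLAIM"] == "0"
```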

verl/trainer/ppo/ray_trainer.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -422,7 +422,7 @@ def _dump_generations(self, inputs, outputs, gts, scores, reward_extra_infos_dic
         lines = []
         for i in range(n):
             entry = {k: v[i] for k, v in base_data.items()}
-            lines.append(json.dumps(entry, ensure_ascii=False))
+            lines.append(json.dumps(entry, ensure_ascii=False, default=str))
 
         with open(filename, "w") as f:
             f.write("\n".join(lines) + "\n")
```
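To see why the fallback is needed: `numpy.bool_` is not a subclass of Python's `bool`, so the stock JSON encoder rejects it, while `default=str` stringifies any value the encoder cannot handle natively. A minimal reproduction (the `entry` dict is made up for illustration):

```python
import json
import numpy as np

entry = {"prompt": "2+2=", "correct": np.bool_(True)}

# Without a fallback, the numpy scalar raises TypeError
# ("Object of type bool_ is not JSON serializable").
try:
    json.dumps(entry)
    raised = False
except TypeError:
    raised = True
assert raised

# default=str routes any non-serializable value through str().
line = json.dumps(entry, ensure_ascii=False, default=str)
assert line == '{"prompt": "2+2=", "correct": "True"}'
```

The trade-off is that such values land in the dump as strings (`"True"` rather than `true`), which is acceptable for human-readable generation logs.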

verl/workers/rollout/vllm_rollout/bucketed_weight_transfer.py

Lines changed: 12 additions & 0 deletions
```diff
@@ -155,6 +155,12 @@ async def async_send_weights(self, weights):
 
     def _init_socket(self):
         """Initialize ZMQ REQ socket and bind."""
+        if self.zmq_handle.startswith("ipc://"):
+            ipc_path = self.zmq_handle[len("ipc://") :]
+            try:
+                os.remove(ipc_path)
+            except OSError:
+                pass
         self.socket = self.zmq_context.socket(zmq.REQ)
         self.socket.bind(self.zmq_handle)
 
@@ -185,6 +191,12 @@ def _cleanup(self):
         if self.socket is not None:
             self.socket.close()
             self.socket = None
+        if self.zmq_handle.startswith("ipc://"):
+            ipc_path = self.zmq_handle[len("ipc://") :]
+            try:
+                os.remove(ipc_path)
+            except OSError:
+                pass
         del self.buffer
         self.buffer = None
         if self.shm is not None:
```
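ZMQ `ipc://` transports are backed by a filesystem socket file, and a crashed run can leave that file behind, so the next `bind()` fails with `Address already in use`. The cleanup logic above can be isolated as a small standalone helper (the function name and demo path here are illustrative, not verl identifiers):

```python
import os
import tempfile

def remove_stale_ipc_socket(zmq_handle: str) -> None:
    """Best-effort removal of a leftover ipc:// socket file before bind()."""
    if zmq_handle.startswith("ipc://"):
        try:
            os.remove(zmq_handle[len("ipc://"):])
        except OSError:
            pass  # no stale file, or already removed: both are fine

# Simulate a stale socket file left by a crashed run.
stale = os.path.join(tempfile.gettempdir(), "rl-colocate-demo.sock")
open(stale, "w").close()
remove_stale_ipc_socket(f"ipc://{stale}")
assert not os.path.exists(stale)
remove_stale_ipc_socket(f"ipc://{stale}")  # second call is a harmless no-op
```

Running it both before `bind()` and after `_cleanup()` makes restarts idempotent from either direction.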

verl/workers/rollout/vllm_rollout/utils.py

Lines changed: 14 additions & 8 deletions
```diff
@@ -264,10 +264,13 @@ def _update_weights(self, weights: list[tuple[str, torch.Tensor]], peft_config:
         self.model_runner.model.load_weights(weights)
 
     def _get_zmq_handle(self) -> str:
-        """Get ZMQ handle for communication."""
-        if not hasattr(self, "device_uuid") or not self.device_uuid:
-            self.device_uuid = get_device_uuid(self.device.index)
-        return f"ipc:///tmp/rl-colocate-zmq-{self.device_uuid}.sock"
+        """Get ZMQ handle for communication.
+        Uses replica_rank + local_rank to form handle so it matches the sender side
+        regardless of CUDA_VISIBLE_DEVICES differences, and avoids collisions
+        when multiple replicas share the same node.
+        """
+        replica_rank = os.environ.get("VERL_REPLICA_RANK", "0")
+        return f"ipc:///tmp/rl-colocate-zmq-replica-{replica_rank}-rank-{self.local_rank}.sock"
 
 
 class vLLMOmniColocateWorkerExtension(_OmniWorkerBase):
@@ -330,10 +333,13 @@ def _update_weights(self, weights: list[tuple[str, torch.Tensor]], peft_config:
         self.load_weights(weights)
 
     def _get_zmq_handle(self) -> str:
-        """Get ZMQ handle for communication."""
-        if not hasattr(self, "device_uuid") or not self.device_uuid:
-            self.device_uuid = get_device_uuid(self.device.index)
-        return f"ipc:///tmp/rl-colocate-zmq-{self.device_uuid}.sock"
+        """Get ZMQ handle for communication.
+        Uses replica_rank + local_rank to form handle so it matches the sender side
+        regardless of CUDA_VISIBLE_DEVICES differences, and avoids collisions
+        when multiple replicas share the same node.
+        """
+        replica_rank = os.environ.get("VERL_REPLICA_RANK", "0")
+        return f"ipc:///tmp/rl-colocate-zmq-replica-{replica_rank}-rank-{self.local_rank}.sock"
 
 
 class SuppressSignalInThread:
```

verl/workers/rollout/vllm_rollout/vllm_async_server.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -108,6 +108,7 @@ def __init__(
             cuda_visible_devices (str): cuda visible devices.
         """
         os.environ[get_visible_devices_keyword()] = cuda_visible_devices
+        os.environ["VERL_REPLICA_RANK"] = str(replica_rank)
 
         self.config = self._init_config(config)
         self.model_config = self._init_model_config(model_config)
```

verl/workers/rollout/vllm_rollout/vllm_rollout.py

Lines changed: 8 additions & 1 deletion
```diff
@@ -95,7 +95,14 @@ def __init__(
         self.sleep_level = VLLM_SLEEP_LEVEL
 
         self.device_uuid = get_device_uuid(get_device_id())
-        self.zmq_handle = f"ipc:///tmp/rl-colocate-zmq-{self.device_uuid}.sock"
+        # Use replica_rank + node-local rank to form ZMQ handle instead of GPU UUID,
+        # because CheckpointEngineWorker and vLLM worker may see different GPU UUIDs
+        # when CUDA_VISIBLE_DEVICES differs between processes (common on ROCm/AMD).
+        # Must use node-local rank (not rollout_rank) so it matches vLLM worker's
+        # local_rank on every node. Include replica_rank to avoid collisions when
+        # multiple replicas share a node.
+        local_rank = self.rollout_rank % local_world_size
+        self.zmq_handle = f"ipc:///tmp/rl-colocate-zmq-replica-{self.replica_rank}-rank-{local_rank}.sock"
 
         self.use_shm = not is_support_ipc()
         if self.use_shm:
```
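The fix works because both endpoints now derive the socket path from pure rank arithmetic, which is identical across processes even when their device visibility differs. A self-contained sketch of the two derivations (function names are illustrative; the path format and the `VERL_REPLICA_RANK` default of `"0"` match the diffs above):

```python
import os

def sender_zmq_handle(replica_rank: int, rollout_rank: int, local_world_size: int) -> str:
    # Node-local rank: the value the vLLM worker sees as local_rank on that node.
    local_rank = rollout_rank % local_world_size
    return f"ipc:///tmp/rl-colocate-zmq-replica-{replica_rank}-rank-{local_rank}.sock"

def worker_zmq_handle(local_rank: int) -> str:
    # The worker reads its replica rank from the env var set by vLLMHttpServer.
    replica_rank = os.environ.get("VERL_REPLICA_RANK", "0")
    return f"ipc:///tmp/rl-colocate-zmq-replica-{replica_rank}-rank-{local_rank}.sock"

# Second node of an 8-GPU-per-node replica: global rollout rank 9 maps to
# node-local rank 1, so sender and worker agree on the socket path even though
# their CUDA_VISIBLE_DEVICES (and hence visible GPU UUIDs) differ.
os.environ["VERL_REPLICA_RANK"] = "0"
assert sender_zmq_handle(0, 9, 8) == worker_zmq_handle(1)
```

GPU-UUID-based paths break exactly here: each process's UUID lookup depends on its own `CUDA_VISIBLE_DEVICES`/`HIP_VISIBLE_DEVICES`, while rank arithmetic is visibility-independent.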
