Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,6 @@ def gcs_actor_scheduling_enabled():
)

RAY_GC_MIN_COLLECT_INTERVAL = env_float("RAY_GC_MIN_COLLECT_INTERVAL_S", 5)

WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR = "WARN_BLOCKING_GET_INSIDE_ASYNC"
DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC = True
23 changes: 18 additions & 5 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2808,7 +2808,7 @@ def show_in_dashboard(message: str, key: str = "", dtype: str = "text"):


# Global variable to make sure we only send out the warning once.
blocking_get_inside_async_warned = False
blocking_get_inside_async_logged = False


@overload
Expand Down Expand Up @@ -2905,15 +2905,28 @@ def get(
worker.check_connected()

if hasattr(worker, "core_worker") and worker.core_worker.current_actor_is_asyncio():
global blocking_get_inside_async_warned
if not blocking_get_inside_async_warned:
logger.warning(
global blocking_get_inside_async_logged
if not blocking_get_inside_async_logged:
from ray._private.ray_constants import (
DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC,
WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR,
env_bool,
)

blocking_get_inside_async_text = (
"Using blocking ray.get inside async actor. "
"This blocks the event loop. Please use `await` "
"on object ref with asyncio.gather if you want to "
"yield execution to the event loop instead."
)
blocking_get_inside_async_warned = True
if env_bool(
WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR,
DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC,
):
logger.warning(blocking_get_inside_async_text)
else:
logger.debug(blocking_get_inside_async_text)
blocking_get_inside_async_logged = True

with profiling.profile("ray.get"):
# TODO(sang): Should make ObjectRefGenerator
Expand Down
11 changes: 10 additions & 1 deletion python/ray/train/v2/_internal/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import os
from typing import Dict

from ray._private.ray_constants import env_bool, env_set_by_user
from ray._private.ray_constants import (
WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR,
env_bool,
env_set_by_user,
)

# Unsupported configs can use this value to detect if the user has set it.
_UNSUPPORTED = "UNSUPPORTED"
Expand Down Expand Up @@ -76,9 +80,13 @@
# GET_ACTOR_TIMEOUT_S_ENV_VAR * CONTROLLERS_TO_POLL_PER_ITERATION_ENV_VAR should be
# way less than STATE_ACTOR_RECONCILIATION_INTERVAL_S_ENV_VAR.
CONTROLLERS_TO_POLL_PER_ITERATION: int = 5

# Environment variable for Train execution callbacks
RAY_TRAIN_CALLBACKS_ENV_VAR = "RAY_TRAIN_CALLBACKS"

# Ray Train does not warn by default when using blocking ray.get inside async actor.
DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC_VALUE = "0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this "0" instead of False?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC is a bool because it's the default value used by the env_bool function, which returns a bool

DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC_VALUE is a string because DataParallelTrainer explicitly sets os.environ to it, and os.environ is a dict from strings to strings.

I agree it's confusing though so lmk if there's a cleaner way to do this.


# Environment variables to propagate from the driver to the controller,
# and then from the controller to the workers.
ENV_VARS_TO_PROPAGATE = {
Expand All @@ -93,6 +101,7 @@
ENABLE_WORKER_STRUCTURED_LOGGING_ENV_VAR,
ENABLE_STATE_ACTOR_RECONCILIATION_ENV_VAR,
STATE_ACTOR_RECONCILIATION_INTERVAL_S_ENV_VAR,
WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR,
}


Expand Down
14 changes: 13 additions & 1 deletion python/ray/train/v2/api/data_parallel_trainer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import logging
import os
import signal
import sys
import threading
from typing import Any, Callable, Dict, List, Optional, Union

import ray
from ray._common.usage import usage_lib
from ray._private.ray_constants import env_bool
from ray._private.ray_constants import (
WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR,
env_bool,
)
from ray.actor import ActorHandle
from ray.air._internal.usage import tag_train_v2_trainer
from ray.train import (
Expand Down Expand Up @@ -38,6 +42,7 @@
from ray.train.v2._internal.callbacks.state_manager import StateManagerCallback
from ray.train.v2._internal.callbacks.user_callback import UserCallbackHandler
from ray.train.v2._internal.constants import (
DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC_VALUE,
METRICS_ENABLED_ENV_VAR,
get_env_vars_to_propagate,
)
Expand Down Expand Up @@ -104,9 +109,16 @@ def __init__(
if metadata is not None:
raise DeprecationWarning(_GET_METADATA_DEPRECATION_MESSAGE)

self._set_default_env_vars()
usage_lib.record_library_usage("train")
tag_train_v2_trainer(self)

def _set_default_env_vars(self):
if WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR not in os.environ:
os.environ[
WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR
] = DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC_VALUE

def _get_train_func(self) -> Callable[[], None]:
return construct_train_func(
self.train_loop_per_worker,
Expand Down
20 changes: 19 additions & 1 deletion python/ray/train/v2/tests/test_data_parallel_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
import pytest

import ray
from ray._private.ray_constants import WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR
from ray.tests.client_test_utils import create_remote_signal_actor
from ray.train import BackendConfig, Checkpoint, RunConfig, ScalingConfig, UserCallback
from ray.train.backend import Backend
from ray.train.constants import RAY_CHDIR_TO_TRIAL_DIR, _get_ray_train_session_dir
from ray.train.tests.util import create_dict_checkpoint
from ray.train.v2._internal.constants import is_v2_enabled
from ray.train.v2._internal.constants import (
DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC_VALUE,
is_v2_enabled,
)
from ray.train.v2.api.data_parallel_trainer import DataParallelTrainer
from ray.train.v2.api.exceptions import WorkerGroupError
from ray.train.v2.api.result import Result
Expand Down Expand Up @@ -305,6 +309,20 @@ def test_sigint_abort(ray_start_4_cpus, spam_sigint):
process.join()


@pytest.mark.parametrize("env_var_set", [True, False])
def test_set_default_env_vars(env_var_set, monkeypatch):
if env_var_set:
monkeypatch.setenv(WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR, "1")
DataParallelTrainer(lambda: "not used")
if env_var_set:
assert os.environ[WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR] == "1"
else:
assert (
os.environ[WARN_BLOCKING_GET_INSIDE_ASYNC_ENV_VAR]
== DEFAULT_WARN_BLOCKING_GET_INSIDE_ASYNC_VALUE
)


if __name__ == "__main__":
import sys

Expand Down