[backport] Fixes for the latest dask. (#11291) #11429


Closed · wants to merge 5 commits into release_2.1.0 from backport-11291

Conversation

@trivialfis (Member) commented Apr 23, 2025

@trivialfis (Member, Author) commented Apr 23, 2025

My Dask installation:

>>> dask.__version__
'2025.2.0'
>>> distributed.__version__
'2025.2.0'
>>> dask_cuda.__version__
'25.04.00'

CPU: All pass

GPU log:
=================================================== FAILURES ====================================================
____________________________________ TestDistributedGPU.test_dask_dataframe _____________________________________
self = <test_distributed.test_gpu_with_dask.test_gpu_with_dask.TestDistributedGPU object at 0x7d160459f560>
local_cuda_client = <Client: 'tcp://127.0.0.1:45799' processes=2 threads=2, memory=125.01 GiB>

    @pytest.mark.skipif(**tm.no_dask_cudf())
    def test_dask_dataframe(self, local_cuda_client: Client) -> None:
>       run_with_dask_dataframe(dxgb.DaskDMatrix, local_cuda_client)

tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py:252: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py:115: in run_with_dask_dataframe
    has_null = X.isnull().values.any().compute()
../../../.anaconda/envs/xgboost_dev/lib/python3.12/site-packages/dask/base.py:374: in compute
    (result,) = compute(self, traverse=False, **kwargs)
../../../.anaconda/envs/xgboost_dev/lib/python3.12/site-packages/dask/base.py:655: in compute
    dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
../../../.anaconda/envs/xgboost_dev/lib/python3.12/site-packages/dask/base.py:424: in collections_to_dsk
    dsk = opt(dsk, keys, **kwargs)
../../../.anaconda/envs/xgboost_dev/lib/python3.12/site-packages/dask/array/optimization.py:62: in optimize
    dsk = fuse_linear_task_spec(dsk, keys=keys)
../../../.anaconda/envs/xgboost_dev/lib/python3.12/site-packages/dask/_task_spec.py:1072: in fuse_linear_task_spec
    result[renamed_key] = Task.fuse(*linear_chain, key=renamed_key)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = ('to_string_dtype-fused-values-any-28afeb47fab28713e15c8dc7dcc8b57d', 0, 0)
tasks = (<Task ('_to_string_dtype-fused-values-185adba3bbf694c18e775d029684175f', 0) _execute_subgraph(...)>, <Task ('any-28afeb47fab28713e15c8dc7dcc8b57d', 0, 0) any(..., ...)>)
all_keys = {('_to_string_dtype-fused-values-185adba3bbf694c18e775d029684175f', 0), ('any-28afeb47fab28713e15c8dc7dcc8b57d', 0, 0)}
all_deps = {'Booster-510b2311389449a39d54839ab6a8f4df', ('_to_string_dtype-fused-values-185adba3bbf694c18e775d029684175f', 0, 0), ('from-dask-array-fe13aea7f90d68a3fbb86cb9fd68cd08', 0), ('from-dask-array-cf6220ea0cf73b81a1dffcbe17e25be7', 0)}

    @staticmethod
    def fuse(*tasks: GraphNode, key: KeyType | None = None) -> GraphNode:
        """Fuse a set of tasks into a single task.
    
        The tasks are fused into a single task that will execute the tasks in a
        subgraph. The internal tasks are no longer accessible from the outside.
    
        All provided tasks must form a valid subgraph that will reduce to a
        single key. If multiple outputs are possible with the provided tasks, an
        exception will be raised.
    
        The tasks will not be rewritten but instead a new Task will be created
        that will merely reference the old task objects. This way, Task objects
        may be reused in multiple fused tasks.
    
        Parameters
        ----------
        key : KeyType | None, optional
            The key of the new Task object. If None provided, the key of the
            final task will be used.
    
        See also
        --------
        GraphNode.substitute : Easer substitution of dependencies
        """
        if any(t.key is None for t in tasks):
            raise ValueError("Cannot fuse tasks with missing keys")
        if len(tasks) == 1:
            return tasks[0].substitute({}, key=key)
        all_keys = set()
        all_deps: set[KeyType] = set()
        for t in tasks:
            all_deps.update(t.dependencies)
            all_keys.add(t.key)
        external_deps = tuple(sorted(all_deps - all_keys, key=hash))
        leafs = all_keys - all_deps
        if len(leafs) > 1:
>           raise ValueError(f"Cannot fuse tasks with multiple outputs {leafs}")
E           ValueError: Cannot fuse tasks with multiple outputs {('any-28afeb47fab28713e15c8dc7dcc8b57d', 0, 0), ('_to_string_dtype-fused-values-185adba3bbf694c18e775d029684175f', 0)}
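For context, the leaf check quoted above trips because the consumer records its dependency under a three-element key (`('_to_string_dtype-…', 0, 0)` in `all_deps`) while the producing task is keyed with two elements (`('_to_string_dtype-…', 0)` in `all_keys`), so the set difference cancels nothing and both tasks look like graph outputs. A standalone sketch of that check, with hypothetical keys:

```python
# Simplified re-run of the leaf detection from dask's Task.fuse, using
# hypothetical keys. The consumer declares a dependency on ('a', 0, 0),
# but the producing task is keyed ('a', 0); the mismatched index means
# the set subtraction leaves both tasks as apparent outputs ("leafs").
all_keys = {("a", 0), ("b", 0, 0)}  # keys of the tasks being fused
all_deps = {("a", 0, 0)}            # dependency recorded with an extra index

leafs = all_keys - all_deps
print(len(leafs))  # 2 -> triggers "Cannot fuse tasks with multiple outputs"
```

With consistent keys (`all_deps = {("a", 0)}`), only `("b", 0, 0)` would remain and fusion would proceed.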

@trivialfis (Member, Author) commented Apr 24, 2025

Python 3.12.0 | packaged by conda-forge | (main, Oct  3 2023, 08:43:22) [GCC 12.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import dask
>>> import dask_cudf
>>> dask.__version__
'2024.12.1'
>>> dask_cudf.__version__
'25.02.02'

GPU: Same error as above.

CPU tests:

FAILED tests/test_distributed/test_with_dask/test_with_dask.py::test_boost_from_prediction[hist] - TypeError: '<' not supported between instances of 'str' and 'int'
FAILED tests/test_distributed/test_with_dask/test_with_dask.py::test_boost_from_prediction[approx] - TypeError: '<' not supported between instances of 'str' and 'int'
FAILED tests/test_distributed/test_with_dask/test_with_dask.py::test_worker_left - assert False
FAILED tests/test_distributed/test_with_dask/test_with_dask.py::test_worker_restarted - assert False
CPU log
======================================================================================= FAILURES =======================================================================================
___________________________________________________________________________ test_boost_from_prediction[hist] ___________________________________________________________________________

tree_method = 'hist', client = <Client: 'tcp://127.0.0.1:33751' processes=2 threads=64, memory=125.66 GiB>

    @pytest.mark.parametrize("tree_method", ["hist", "approx"])
    def test_boost_from_prediction(tree_method: str, client: "Client") -> None:
        from sklearn.datasets import load_breast_cancer, load_digits
    
        X_, y_ = load_breast_cancer(return_X_y=True)
        X, y = dd.from_array(X_, chunksize=200), dd.from_array(y_, chunksize=200)
>       run_boost_from_prediction(X, y, tree_method, "cpu", client)

tests/test_distributed/test_with_dask/test_with_dask.py:590: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/test_distributed/test_with_dask/test_with_dask.py:541: in run_boost_from_prediction
    X, y, margin = deterministic_repartition(client, X, y, margin)
tests/test_distributed/test_with_dask/test_with_dask.py:174: in deterministic_repartition
    if any(X.map_partitions(lambda x: _is_cudf_df(x)).compute()):
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_collection.py:479: in compute
    out = out.optimize(fuse=fuse)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_collection.py:594: in optimize
    return new_collection(self.expr.optimize(fuse=fuse))
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_expr.py:93: in optimize
    return optimize(self, **kwargs)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_expr.py:3118: in optimize
    return optimize_until(expr, stage)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_expr.py:3069: in optimize_until
    expr = result.simplify()
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_core.py:397: in simplify
    new = expr.simplify_once(dependents=dependents, simplified={})
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_core.py:375: in simplify_once
    new = operand.simplify_once(
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_core.py:375: in simplify_once
    new = operand.simplify_once(
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_core.py:358: in simplify_once
    out = child._simplify_up(expr, dependents)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_concat.py:245: in _simplify_up
    if all(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

.0 = <zip object at 0x7c92e6019ec0>

    if all(
>       sorted(cols) == sorted(get_columns_or_name(frame))
        for frame, cols in zip(self._frames, columns_frame)
    ):
E   TypeError: '<' not supported between instances of 'str' and 'int'

../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_concat.py:246: TypeError
---------------------------------------------------------------------------------- Captured log setup ----------------------------------------------------------------------------------
INFO     distributed.scheduler:scheduler.py:5924 Receive client connection: Client-8c1b43b8-20e1-11f0-92b4-00be43bdbb7d
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:51076
---------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------
WARNING  distributed.shuffle._scheduler_plugin:_scheduler_plugin.py:214 Shuffle 3953d8509ca656fbbc7016a3dc951d61 initialized by task ('shuffle-transfer-3953d8509ca656fbbc7016a3dc951d61', 2) executed on worker tcp://127.0.0.1:35501
WARNING  distributed.shuffle._scheduler_plugin:_scheduler_plugin.py:541 Shuffle 3953d8509ca656fbbc7016a3dc951d61 deactivated due to stimulus 'task-finished-1745481354.213109'
INFO     distributed.worker:worker.py:3171 Run out-of-band function '_start_tracker'
-------------------------------------------------------------------------------- Captured log teardown ---------------------------------------------------------------------------------
INFO     distributed.scheduler:scheduler.py:5969 Remove client Client-8c1b43b8-20e1-11f0-92b4-00be43bdbb7d
INFO     distributed.core:core.py:908 Received 'close-stream' from tcp://127.0.0.1:51076; closing.
INFO     distributed.scheduler:scheduler.py:5969 Remove client Client-8c1b43b8-20e1-11f0-92b4-00be43bdbb7d
INFO     distributed.scheduler:scheduler.py:5961 Close client connection: Client-8c1b43b8-20e1-11f0-92b4-00be43bdbb7d
__________________________________________________________________________ test_boost_from_prediction[approx] __________________________________________________________________________

tree_method = 'approx', client = <Client: 'tcp://127.0.0.1:33751' processes=2 threads=64, memory=125.66 GiB>

    @pytest.mark.parametrize("tree_method", ["hist", "approx"])
    def test_boost_from_prediction(tree_method: str, client: "Client") -> None:
        from sklearn.datasets import load_breast_cancer, load_digits
    
        X_, y_ = load_breast_cancer(return_X_y=True)
        X, y = dd.from_array(X_, chunksize=200), dd.from_array(y_, chunksize=200)
>       run_boost_from_prediction(X, y, tree_method, "cpu", client)

tests/test_distributed/test_with_dask/test_with_dask.py:590: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/test_distributed/test_with_dask/test_with_dask.py:541: in run_boost_from_prediction
    X, y, margin = deterministic_repartition(client, X, y, margin)
tests/test_distributed/test_with_dask/test_with_dask.py:174: in deterministic_repartition
    if any(X.map_partitions(lambda x: _is_cudf_df(x)).compute()):
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_collection.py:479: in compute
    out = out.optimize(fuse=fuse)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_collection.py:594: in optimize
    return new_collection(self.expr.optimize(fuse=fuse))
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_expr.py:93: in optimize
    return optimize(self, **kwargs)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_expr.py:3118: in optimize
    return optimize_until(expr, stage)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_expr.py:3069: in optimize_until
    expr = result.simplify()
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_core.py:397: in simplify
    new = expr.simplify_once(dependents=dependents, simplified={})
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_core.py:375: in simplify_once
    new = operand.simplify_once(
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_core.py:375: in simplify_once
    new = operand.simplify_once(
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_core.py:358: in simplify_once
    out = child._simplify_up(expr, dependents)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_concat.py:245: in _simplify_up
    if all(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

.0 = <zip object at 0x7c92e61f2400>

    if all(
>       sorted(cols) == sorted(get_columns_or_name(frame))
        for frame, cols in zip(self._frames, columns_frame)
    ):
E   TypeError: '<' not supported between instances of 'str' and 'int'

../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/dask_expr/_concat.py:246: TypeError
---------------------------------------------------------------------------------- Captured log setup ----------------------------------------------------------------------------------
INFO     distributed.scheduler:scheduler.py:5924 Receive client connection: Client-8c946088-20e1-11f0-92b4-00be43bdbb7d
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:51088
---------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------
WARNING  distributed.shuffle._scheduler_plugin:_scheduler_plugin.py:214 Shuffle ea6a4627e6e8b8121c51e225b66efd97 initialized by task ('shuffle-transfer-ea6a4627e6e8b8121c51e225b66efd97', 2) executed on worker tcp://127.0.0.1:35501
WARNING  distributed.shuffle._scheduler_plugin:_scheduler_plugin.py:541 Shuffle ea6a4627e6e8b8121c51e225b66efd97 deactivated due to stimulus 'task-finished-1745481354.8945446'
INFO     distributed.worker:worker.py:3171 Run out-of-band function '_start_tracker'
-------------------------------------------------------------------------------- Captured log teardown ---------------------------------------------------------------------------------
INFO     distributed.scheduler:scheduler.py:5969 Remove client Client-8c946088-20e1-11f0-92b4-00be43bdbb7d
INFO     distributed.core:core.py:908 Received 'close-stream' from tcp://127.0.0.1:51088; closing.
INFO     distributed.scheduler:scheduler.py:5969 Remove client Client-8c946088-20e1-11f0-92b4-00be43bdbb7d
INFO     distributed.scheduler:scheduler.py:5961 Close client connection: Client-8c946088-20e1-11f0-92b4-00be43bdbb7d
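The `TypeError` in both parametrizations is plain Python 3 behaviour: `sorted` cannot order `str` against `int`, which is what `dask_expr`'s `sorted(cols)` hits when a frame carries mixed integer and string column labels. A minimal illustration (the type-keyed workaround below is an assumption for demonstration, not dask code):

```python
# Python 3 refuses to order str against int; this is exactly what
# sorted(cols) raises inside dask_expr._concat when labels are mixed.
cols = ["margin", 0, 1]
try:
    sorted(cols)
except TypeError as exc:
    print(exc)  # '<' not supported between instances of 'int' and 'str'

# A type-insensitive sort key sidesteps the comparison (illustrative only):
stable = sorted(cols, key=lambda c: (type(c).__name__, repr(c)))
print(stable)  # -> [0, 1, 'margin']
```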
___________________________________________________________________________________ test_worker_left ___________________________________________________________________________________

args = (), kwds = {}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)

../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/contextlib.py:81: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/contextlib.py:80: in inner
    with self._recreate_cm():
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/distributed/utils_test.py:1785: in clean
    with check_process_leak(check=processes):
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/distributed/utils_test.py:1685: in check_process_leak
    term_or_kill_active_children(timeout=term_timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

timeout = 3

    def term_or_kill_active_children(timeout: float) -> None:
        """Send SIGTERM to get_mp_context().active_children(), wait up to 3 seconds for processes
        to die, then send SIGKILL to the survivors
        """
        children = get_mp_context().active_children()
        for proc in children:
            proc.terminate()
    
        children = wait_active_children(timeout=timeout)
        for proc in children:
            proc.kill()
    
        children = wait_active_children(timeout=30)
        if children:  # pragma: nocover
            logger.warning("Leaked unkillable children processes: %s", children)
            # It should be impossible to ignore SIGKILL on Linux/MacOSX
>           assert WINDOWS
E           assert False

../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/distributed/utils_test.py:1666: AssertionError
---------------------------------------------------------------------------------- Captured log call -----------------------------------------------------------------------------------
INFO     distributed.core:core.py:893 Connection to tcp://127.0.0.1:51062 has been closed.
INFO     distributed.scheduler:scheduler.py:5969 Remove client Client-worker-8baee832-20e1-11f0-9647-00be43bdbb7d
INFO     distributed.core:core.py:893 Connection to tcp://127.0.0.1:37736 has been closed.
INFO     distributed.scheduler:scheduler.py:5969 Remove client Client-worker-84b251b4-20e1-11f0-964c-00be43bdbb7d
INFO     distributed.scheduler:scheduler.py:5961 Close client connection: Client-worker-8baee832-20e1-11f0-9647-00be43bdbb7d
INFO     distributed.scheduler:scheduler.py:5961 Close client connection: Client-worker-84b251b4-20e1-11f0-964c-00be43bdbb7d
INFO     distributed.core:core.py:893 Connection to tcp://127.0.0.1:37698 has been closed.
INFO     distributed.scheduler:scheduler.py:5433 Remove worker addr: tcp://127.0.0.1:41829 name: 0 (stimulus_id='handle-worker-cleanup-1745481478.8347006')
INFO     distributed.core:core.py:893 Connection to tcp://127.0.0.1:37704 has been closed.
INFO     distributed.scheduler:scheduler.py:5433 Remove worker addr: tcp://127.0.0.1:35501 name: 1 (stimulus_id='handle-worker-cleanup-1745481478.8508594')
INFO     distributed.scheduler:scheduler.py:5568 Lost all workers
WARNING  distributed.process:process.py:270 [<AsyncProcess Dask Worker process (from Nanny)>] process 136775 exit status was already read will report exitcode 255
INFO     distributed.nanny:nanny.py:805 Worker process 136775 was killed by unknown signal
WARNING  distributed.nanny:nanny.py:568 Restarting worker
WARNING  distributed.process:process.py:270 [<AsyncProcess Dask Worker process (from Nanny)>] process 136780 exit status was already read will report exitcode 255
INFO     distributed.nanny:nanny.py:805 Worker process 136780 was killed by unknown signal
WARNING  distributed.nanny:nanny.py:568 Restarting worker
INFO     distributed.scheduler:scheduler.py:4576 Register worker addr: tcp://127.0.0.1:34077 name: 0
INFO     distributed.scheduler:scheduler.py:6170 Starting worker compute stream, tcp://127.0.0.1:34077
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:59048
INFO     distributed.scheduler:scheduler.py:4576 Register worker addr: tcp://127.0.0.1:41439 name: 1
INFO     distributed.scheduler:scheduler.py:6170 Starting worker compute stream, tcp://127.0.0.1:41439
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:59050
INFO     distributed.core:core.py:893 Connection to tcp://127.0.0.1:59050 has been closed.
INFO     distributed.scheduler:scheduler.py:5433 Remove worker addr: tcp://127.0.0.1:41439 name: 1 (stimulus_id='handle-worker-cleanup-1745481481.7751205')
INFO     distributed.core:core.py:893 Connection to tcp://127.0.0.1:59048 has been closed.
INFO     distributed.scheduler:scheduler.py:5433 Remove worker addr: tcp://127.0.0.1:34077 name: 0 (stimulus_id='handle-worker-cleanup-1745481481.7753751')
INFO     distributed.scheduler:scheduler.py:5568 Lost all workers
WARNING  distributed.process:process.py:270 [<AsyncProcess Dask Worker process (from Nanny)>] process 143249 exit status was already read will report exitcode 255
INFO     distributed.nanny:nanny.py:805 Worker process 143249 was killed by unknown signal
WARNING  distributed.nanny:nanny.py:568 Restarting worker
WARNING  distributed.process:process.py:270 [<AsyncProcess Dask Worker process (from Nanny)>] process 143245 exit status was already read will report exitcode 255
INFO     distributed.nanny:nanny.py:805 Worker process 143245 was killed by unknown signal
WARNING  distributed.nanny:nanny.py:568 Restarting worker
INFO     distributed.scheduler:scheduler.py:4576 Register worker addr: tcp://127.0.0.1:46279 name: 1
INFO     distributed.scheduler:scheduler.py:6170 Starting worker compute stream, tcp://127.0.0.1:46279
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:59078
INFO     distributed.scheduler:scheduler.py:4576 Register worker addr: tcp://127.0.0.1:39509 name: 0
INFO     distributed.scheduler:scheduler.py:6170 Starting worker compute stream, tcp://127.0.0.1:39509
INFO     distributed.core:core.py:883 Starting established connection to tcp://127.0.0.1:59082
WARNING  distributed.utils_test:utils_test.py:1664 Leaked unkillable children processes: [<SpawnProcess name='Dask Worker process (from Nanny)' pid=143273 parent=135860 started daemon>, <SpawnProcess name='Dask Worker process (from Nanny)' pid=143269 parent=135860 started daemon>]
________________________________________________________________________________ test_worker_restarted _________________________________________________________________________________

args = (), kwds = {}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)

../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/contextlib.py:81: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/contextlib.py:80: in inner
    with self._recreate_cm():
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/distributed/utils_test.py:1785: in clean
    with check_process_leak(check=processes):
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/distributed/utils_test.py:1685: in check_process_leak
    term_or_kill_active_children(timeout=term_timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

timeout = 3

    def term_or_kill_active_children(timeout: float) -> None:
        """Send SIGTERM to get_mp_context().active_children(), wait up to 3 seconds for processes
        to die, then send SIGKILL to the survivors
        """
        children = get_mp_context().active_children()
        for proc in children:
            proc.terminate()
    
        children = wait_active_children(timeout=timeout)
        for proc in children:
            proc.kill()
    
        children = wait_active_children(timeout=30)
        if children:  # pragma: nocover
            logger.warning("Leaked unkillable children processes: %s", children)
            # It should be impossible to ignore SIGKILL on Linux/MacOSX
>           assert WINDOWS
E           assert False

../../../.anaconda/envs/xgboost_dev_2502/lib/python3.12/site-packages/distributed/utils_test.py:1666: AssertionError
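The two `assert False` failures come from `check_process_leak`: worker child processes survived both signals during teardown. The escalation pattern the quoted `term_or_kill_active_children` helper implements can be sketched standalone (hypothetical demo process, not distributed's code):

```python
# Standalone sketch of the SIGTERM-then-SIGKILL escalation used by
# distributed's term_or_kill_active_children (hypothetical demo child).
import multiprocessing as mp
import time

if __name__ == "__main__":
    proc = mp.Process(target=time.sleep, args=(60,), daemon=True)
    proc.start()
    proc.terminate()              # polite SIGTERM first
    proc.join(timeout=3)          # give it a moment to exit
    if proc.is_alive():           # escalate for survivors
        proc.kill()               # SIGKILL
        proc.join(timeout=30)
    print(proc.is_alive())        # False unless the child is unkillable
```

The test failure corresponds to the final branch: children that are still alive after SIGKILL, which should be impossible outside Windows.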

@trivialfis trivialfis changed the base branch from release_2.1.0 to release_3.0.0 April 26, 2025 07:30
@trivialfis trivialfis changed the base branch from release_3.0.0 to release_2.1.0 April 26, 2025 07:30
@trivialfis trivialfis closed this May 23, 2025
@trivialfis trivialfis deleted the backport-11291 branch May 23, 2025 19:55