Skip to content

Mixing CPU and GPU workers results in poor scheduling decisions #9128

@vianneylanguille

Description

@vianneylanguille

Describe the issue:
From the docs there is no restriction to use SpecCluster with heterogeneous workers (e.g. different resources).
However it does not seem to be properly handling resources.
The test below (play with cluster_type variable) show the behavior between a speccluster with CPU/GPU resources (with virtually unlimited CPU resources, set to 100) a specclsuter with two workers without resources and 1 with GPU, and a Local Cluster

It turns out there is actually not a good handling of mixed resources
there was such an issue reported #7887 .

if confirmed, it would be worth indicating it in the doc that resources have to be uniform

Minimal Complete Verifiable Example:

# -*- coding: utf-8 -*-

from dask.distributed import SpecCluster, Worker, Scheduler, wait, LocalCluster
from dataclasses import dataclass
import numpy as np
import time, webbrowser


@dataclass
class Test():    
    def propagate(self, t, f = 0.1, f3=None):  
        time.sleep(f)
        return t

cluster_type = 'Local'

# clsuter with defined resources both CPU and GPU
if cluster_type == 'SpecCluster_with_CPUGPU_resources':
    
    scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
    workers = {
        'my-worker_CPU': {"cls": Worker, "options": {"nthreads": 10, 'resources':{"CPU":100}}},
        'my-worker_CPU2': {"cls": Worker, "options": {"nthreads": 10, 'resources':{"CPU":100}}},
        'my-worker_GPU': {"cls": Worker, "options": {"nthreads": 1, 'resources':{"GPU":1}}},
    }
    cluster = SpecCluster(scheduler=scheduler, workers=workers, asynchronous=False)
    res_CPU = {'CPU': 1}
    res_GPU = {'GPU': 1}
    
# cluster with only GPU resources defined
elif cluster_type == 'SpecCluster_with_GPU_resources':
    
    scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
    workers = {
        'my-worker_CPU': {"cls": Worker, "options": {"nthreads": 10}},
        'my-worker_CPU2': {"cls": Worker, "options": {"nthreads": 10}},
        'my-worker_GPU': {"cls": Worker, "options": {"nthreads": 10, 'resources':{"GPU":1}}},
    }
    cluster = SpecCluster(scheduler=scheduler, workers=workers, asynchronous=False)
    res_CPU = None
    res_GPU = {'GPU': 1}
# local cluster
else:    
    cluster = LocalCluster(n_workers=3)
    res_CPU = None
    res_GPU = None
    
    
client = cluster.get_client()
webbrowser.open(client.dashboard_link)
test_object = Test()
futures = []

tt = np.arange(0, 365*86400, 10)
tt = [10, 20]

for i in range(200):
    
    f = client.submit(func=test_object.propagate, t=[i, i+1], resources = res_CPU)
    f2 = client.submit(func=test_object.propagate, f=0.001, t=[i+1, i+2], resources = res_GPU)    
    f3 = client.submit(func=test_object.propagate, t=f, f3=f2, resources = res_CPU)
    
    futures.append(f3)
    
res = client.compute(futures)
    
wait(futures)
    

Anything else we need to know?:

Environment:

  • Dask version:
  • Python version:
  • Operating System:
  • Install method (conda, pip, source):

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething is broken

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions