|
38 | 38 | "dataset = \"images\"\n", |
39 | 39 | "\n", |
40 | 40 | "num_workers = None\n", |
41 | | - "cluster_kwargs = {}\n", |
| 41 | + "cluster_kwargs = {\n", |
| 42 | + " \"preexec_commands\": (\n", |
| 43 | + " \"singularity exec ~/nanshe_nanshe_workflow_latest.img \\\\\",\n", |
| 44 | + " ),\n", |
| 45 | + "}\n", |
42 | 46 | "client_kwargs = {}\n", |
43 | 47 | "\n", |
44 | 48 | "\n", |
|
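This hunk moves the hard-coded singularity `preexec_commands` out of the cluster constructor and into the user-editable `cluster_kwargs` dictionary. The trailing backslash in the command string is a shell line continuation, so the worker launch command that follows it in the generated job script runs inside the container. Presumably the notebook then forwards both dictionaries to the imported `startup_distributed` helper; a minimal sketch of that call, assuming the helper accepts `cluster_kwargs`/`client_kwargs` keywords (the keyword names are an assumption, not confirmed by this diff):

```python
# Sketch only: assumes startup_distributed forwards these dicts to the
# cluster and distributed.Client constructors, respectively.
client = startup_distributed(
    num_workers,
    cluster_kwargs=cluster_kwargs,  # e.g. the singularity preexec_commands above
    client_kwargs=client_kwargs,
)
```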
80 | 84 | "metadata": {}, |
81 | 85 | "outputs": [], |
82 | 86 | "source": [ |
83 | | - "import time\n", |
84 | | - "import distributed\n", |
85 | | - "import dask_drmaa\n", |
86 | | - "from nanshe_workflow.par import set_num_workers\n", |
87 | | - "\n", |
88 | | - "\n", |
89 | | - "def startup_distributed(nworkers):\n", |
90 | | - " nworkers = int(nworkers)\n", |
91 | | - "\n", |
92 | | - " if dask_drmaa:\n", |
93 | | - " cluster = dask_drmaa.DRMAACluster(\n", |
94 | | - " preexec_commands=(\n", |
95 | | - " \"singularity exec ~/nanshe_nanshe_workflow_latest.img \\\\\",\n", |
96 | | - " ),\n", |
97 | | - " template={\"jobEnvironment\": os.environ}\n", |
98 | | - " )\n", |
99 | | - " cluster.start_workers(nworkers)\n", |
100 | | - " else:\n", |
101 | | - " # Either `dask_drmaa` is unavailable or DRMAA cannot start.\n", |
102 | | - " # Fallback to a local Distributed client instead.\n", |
103 | | - " cluster = distributed.LocalCluster(\n", |
104 | | - " n_workers=nworkers, threads_per_worker=1\n", |
105 | | - " )\n", |
106 | | - "\n", |
107 | | - " client = distributed.Client(cluster)\n", |
108 | | - " while (\n", |
109 | | - " (client.status == \"running\") and\n", |
110 | | - " (len(client.scheduler_info()[\"workers\"]) < nworkers)\n", |
111 | | - " ):\n", |
112 | | - " time.sleep(1)\n", |
113 | | - "\n", |
114 | | - " return client\n", |
115 | | - "\n", |
| 87 | + "from nanshe_workflow.par import set_num_workers, startup_distributed\n", |
116 | 88 | "\n", |
117 | 89 | "num_workers = set_num_workers(num_workers)\n", |
118 | 90 | "\n", |
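This hunk replaces the inline `startup_distributed` definition with an import from `nanshe_workflow.par`; the refactored helper itself is not shown in the diff. Below is a minimal sketch of what it presumably looks like, reconstructed from the removed cell. Three details are assumptions rather than confirmed source: the `cluster_kwargs`/`client_kwargs` parameters, the guarded `dask_drmaa` import (the removed inline version imported it unconditionally, which would make the `if dask_drmaa:` fallback unreachable whenever the import fails), and the `import os` needed for the `os.environ` reference.

```python
import os
import time

import distributed

try:
    import dask_drmaa
except ImportError:
    # No DRMAA bindings available; fall back to a local cluster below.
    dask_drmaa = None


def startup_distributed(nworkers, cluster_kwargs=None, client_kwargs=None):
    """Start a Distributed client, preferring a DRMAA-managed cluster."""
    nworkers = int(nworkers)
    cluster_kwargs = dict(cluster_kwargs or {})
    client_kwargs = dict(client_kwargs or {})

    if dask_drmaa:
        cluster = dask_drmaa.DRMAACluster(
            template={"jobEnvironment": os.environ},
            **cluster_kwargs
        )
        cluster.start_workers(nworkers)
    else:
        # Either `dask_drmaa` is unavailable or DRMAA cannot start.
        # Fall back to a local Distributed cluster instead.
        cluster = distributed.LocalCluster(
            n_workers=nworkers, threads_per_worker=1
        )

    client = distributed.Client(cluster, **client_kwargs)
    # Block until the requested number of workers has connected.
    while (
        (client.status == "running") and
        (len(client.scheduler_info()["workers"]) < nworkers)
    ):
        time.sleep(1)

    return client
```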
|