Commit 69e9f94

Remove max_tasks_per_child=1 limitation from processes executor (#515)
* Allow max_tasks_per_child to be set for the `processes` executor; default to not set.
* Parameterize test_mem_utilization to run on lithops and processes executors.
* Run slow tests on PRs.
1 parent 59c593d commit 69e9f94
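
In practice, the new option flows through the executor plumbing shown in the diffs below. A minimal usage sketch, assuming the `create_executor` interface exercised by the updated tests (options are passed as a plain dict):

    from cubed.runtime.create import create_executor

    # Default behaviour after this commit: worker processes are reused
    # across tasks, since max_tasks_per_child is no longer forced to 1.
    processes = create_executor("processes")

    # Opt back in to one task per process (Python 3.11+ only), trading
    # process churn for stronger per-task peak-memory isolation.
    single_task = create_executor("processes", dict(max_tasks_per_child=1))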

File tree: 4 files changed, +104 −56 lines


.github/workflows/slow-tests.yml

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 name: Slow tests
 
 on:
+  pull_request:
   schedule:
     # Every weekday at 03:49 UTC, see https://crontab.guru/
     - cron: "49 3 * * 1-5"

cubed/runtime/executors/local.py

Lines changed: 11 additions & 3 deletions
@@ -179,11 +179,19 @@ async def async_execute_dag(
     if spec is not None:
         check_runtime_memory(spec, max_workers)
     if use_processes:
+        max_tasks_per_child = kwargs.pop("max_tasks_per_child", None)
         context = multiprocessing.get_context("spawn")
         # max_tasks_per_child is only supported from Python 3.11
-        concurrent_executor = ProcessPoolExecutor(
-            max_workers=max_workers, mp_context=context, max_tasks_per_child=1
-        )
+        if max_tasks_per_child is None:
+            concurrent_executor = ProcessPoolExecutor(
+                max_workers=max_workers, mp_context=context
+            )
+        else:
+            concurrent_executor = ProcessPoolExecutor(
+                max_workers=max_workers,
+                mp_context=context,
+                max_tasks_per_child=max_tasks_per_child,
+            )
     else:
         concurrent_executor = ThreadPoolExecutor(max_workers=max_workers)
     try:
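
The branch above exists because `ProcessPoolExecutor` only accepts the `max_tasks_per_child` keyword from Python 3.11 onwards, so the keyword must be omitted entirely when the caller has not asked for it. As a standalone illustration of the standard-library behaviour the option enables (a sketch, not Cubed code):

    import multiprocessing
    import os
    from concurrent.futures import ProcessPoolExecutor


    def task(_):
        return os.getpid()


    if __name__ == "__main__":
        # Python 3.11+: with max_tasks_per_child=1 each worker process
        # executes a single task and is then replaced by a fresh one.
        context = multiprocessing.get_context("spawn")
        with ProcessPoolExecutor(
            max_workers=2, mp_context=context, max_tasks_per_child=1
        ) as pool:
            pids = list(pool.map(task, range(4)))
        print(len(set(pids)))  # expect 4: a new process per task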

cubed/tests/test_mem_utilization.py

Lines changed: 90 additions & 48 deletions
@@ -1,32 +1,60 @@
 import math
+import platform
 import shutil
+import sys
 from functools import partial, reduce
 
 import pytest
 
-from cubed.core.ops import partial_reduce
-from cubed.core.optimization import multiple_inputs_optimize_dag
-
-pytest.importorskip("lithops")
-
 import cubed
 import cubed.array_api as xp
 import cubed.random
 from cubed.backend_array_api import namespace as nxp
+from cubed.core.ops import partial_reduce
+from cubed.core.optimization import multiple_inputs_optimize_dag
 from cubed.extensions.history import HistoryCallback
 from cubed.extensions.mem_warn import MemoryWarningCallback
-from cubed.runtime.executors.lithops import LithopsExecutor
+from cubed.runtime.create import create_executor
 from cubed.tests.utils import LITHOPS_LOCAL_CONFIG
 
+ALLOWED_MEM = 2_000_000_000
+
+EXECUTORS = {}
+
+if platform.system() != "Windows":
+    EXECUTORS["processes"] = create_executor("processes")
+
+    # Run with max_tasks_per_child=1 so that each task is run in a new process,
+    # allowing us to perform a stronger check on peak memory
+    if sys.version_info >= (3, 11):
+        executor_options = dict(max_tasks_per_child=1)
+        EXECUTORS["processes-single-task"] = create_executor(
+            "processes", executor_options
+        )
+
+try:
+    executor_options = dict(config=LITHOPS_LOCAL_CONFIG, wait_dur_sec=0.1)
+    EXECUTORS["lithops"] = create_executor("lithops", executor_options)
+except ImportError:
+    pass
+
 
 @pytest.fixture()
 def spec(tmp_path, reserved_mem):
-    return cubed.Spec(tmp_path, allowed_mem=2_000_000_000, reserved_mem=reserved_mem)
+    return cubed.Spec(tmp_path, allowed_mem=ALLOWED_MEM, reserved_mem=reserved_mem)
+
+
+@pytest.fixture(
+    scope="module",
+    params=EXECUTORS.values(),
+    ids=EXECUTORS.keys(),
+)
+def executor(request):
+    return request.param
 
 
 @pytest.fixture(scope="module")
-def reserved_mem():
-    executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG)
+def reserved_mem(executor):
     res = cubed.measure_reserved_mem(executor) * 1.1  # add some wiggle room
     return round_up_to_multiple(res, 10_000_000)  # round up to nearest multiple of 10MB
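
Note that `reserved_mem` now depends on the parameterized `executor` fixture, so reserved memory is measured once per executor rather than being hard-coded to Lithops. The same measurement can be reproduced outside pytest; a minimal sketch mirroring the fixture:

    import cubed
    from cubed.runtime.create import create_executor

    executor = create_executor("processes")
    res = cubed.measure_reserved_mem(executor) * 1.1  # wiggle room, as in the fixture
    print(round(res))  # bytes reserved by the runtime before any task data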

@@ -40,58 +68,58 @@ def round_up_to_multiple(x, multiple=10):
 
 
 @pytest.mark.slow
-def test_index(tmp_path, spec):
+def test_index(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = a[1:, :]
-    run_operation(tmp_path, "index", b)
+    run_operation(tmp_path, executor, "index", b)
 
 
 @pytest.mark.slow
-def test_index_step(tmp_path, spec):
+def test_index_step(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = a[::2, :]
-    run_operation(tmp_path, "index_step", b)
+    run_operation(tmp_path, executor, "index_step", b)
 
 
 # Creation Functions
 
 
 @pytest.mark.slow
-def test_eye(tmp_path, spec):
+def test_eye(tmp_path, spec, executor):
     a = xp.eye(10000, 10000, chunks=(5000, 5000), spec=spec)
-    run_operation(tmp_path, "eye", a)
+    run_operation(tmp_path, executor, "eye", a)
 
 
 @pytest.mark.slow
-def test_tril(tmp_path, spec):
+def test_tril(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.tril(a)
-    run_operation(tmp_path, "tril", b)
+    run_operation(tmp_path, executor, "tril", b)
 
 
 # Elementwise Functions
 
 
 @pytest.mark.slow
-def test_add(tmp_path, spec):
+def test_add(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.add(a, b)
-    run_operation(tmp_path, "add", c)
+    run_operation(tmp_path, executor, "add", c)
 
 
 @pytest.mark.slow
-def test_add_reduce_left(tmp_path, spec):
+def test_add_reduce_left(tmp_path, spec, executor):
     # Perform the `add` operation repeatedly on pairs of arrays, also known as fold left.
     # See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
     #
@@ -111,11 +139,13 @@ def test_add_reduce_left(tmp_path, spec):
     ]
     result = reduce(lambda x, y: xp.add(x, y), arrs)
     opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
-    run_operation(tmp_path, "add_reduce_left", result, optimize_function=opt_fn)
+    run_operation(
+        tmp_path, executor, "add_reduce_left", result, optimize_function=opt_fn
+    )
 
 
 @pytest.mark.slow
-def test_add_reduce_right(tmp_path, spec):
+def test_add_reduce_right(tmp_path, spec, executor):
     # Perform the `add` operation repeatedly on pairs of arrays, also known as fold right.
     # See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
     #
@@ -137,23 +167,25 @@ def test_add_reduce_right(tmp_path, spec):
     ]
     result = reduce(lambda x, y: xp.add(y, x), reversed(arrs))
     opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
-    run_operation(tmp_path, "add_reduce_right", result, optimize_function=opt_fn)
+    run_operation(
+        tmp_path, executor, "add_reduce_right", result, optimize_function=opt_fn
+    )
 
 
 @pytest.mark.slow
-def test_negative(tmp_path, spec):
+def test_negative(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.negative(a)
-    run_operation(tmp_path, "negative", b)
+    run_operation(tmp_path, executor, "negative", b)
 
 
 # Linear Algebra Functions
 
 
 @pytest.mark.slow
-def test_matmul(tmp_path, spec):
+def test_matmul(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -163,20 +195,20 @@ def test_matmul(tmp_path, spec):
     c = xp.astype(a, xp.float32)
     d = xp.astype(b, xp.float32)
     e = xp.matmul(c, d)
-    run_operation(tmp_path, "matmul", e)
+    run_operation(tmp_path, executor, "matmul", e)
 
 
 @pytest.mark.slow
-def test_matrix_transpose(tmp_path, spec):
+def test_matrix_transpose(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.matrix_transpose(a)
-    run_operation(tmp_path, "matrix_transpose", b)
+    run_operation(tmp_path, executor, "matrix_transpose", b)
 
 
 @pytest.mark.slow
-def test_tensordot(tmp_path, spec):
+def test_tensordot(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -186,14 +218,14 @@ def test_tensordot(tmp_path, spec):
     c = xp.astype(a, xp.float32)
     d = xp.astype(b, xp.float32)
     e = xp.tensordot(c, d, axes=1)
-    run_operation(tmp_path, "tensordot", e)
+    run_operation(tmp_path, executor, "tensordot", e)
 
 
 # Manipulation Functions
 
 
 @pytest.mark.slow
-def test_concat(tmp_path, spec):
+def test_concat(tmp_path, spec, executor):
     # Note 'a' has one fewer element in axis=0 to force chunking to cross array boundaries
     a = cubed.random.random(
         (9999, 10000), chunks=(5000, 5000), spec=spec
@@ -202,81 +234,80 @@ def test_concat(tmp_path, spec):
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.concat((a, b), axis=0)
-    run_operation(tmp_path, "concat", c)
+    run_operation(tmp_path, executor, "concat", c)
 
 
 @pytest.mark.slow
-def test_reshape(tmp_path, spec):
+def test_reshape(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     # need intermediate reshape due to limitations in Dask's reshape_rechunk
     b = xp.reshape(a, (5000, 2, 10000))
     c = xp.reshape(b, (5000, 20000))
-    run_operation(tmp_path, "reshape", c)
+    run_operation(tmp_path, executor, "reshape", c)
 
 
 @pytest.mark.slow
-def test_stack(tmp_path, spec):
+def test_stack(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.stack((a, b), axis=0)
-    run_operation(tmp_path, "stack", c)
+    run_operation(tmp_path, executor, "stack", c)
 
 
 # Searching Functions
 
 
 @pytest.mark.slow
-def test_argmax(tmp_path, spec):
+def test_argmax(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.argmax(a, axis=0)
-    run_operation(tmp_path, "argmax", b)
+    run_operation(tmp_path, executor, "argmax", b)
 
 
 # Statistical Functions
 
 
 @pytest.mark.slow
-def test_max(tmp_path, spec):
+def test_max(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.max(a, axis=0)
-    run_operation(tmp_path, "max", b)
+    run_operation(tmp_path, executor, "max", b)
 
 
 @pytest.mark.slow
-def test_mean(tmp_path, spec):
+def test_mean(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.mean(a, axis=0)
-    run_operation(tmp_path, "mean", b)
+    run_operation(tmp_path, executor, "mean", b)
 
 
 @pytest.mark.slow
-def test_sum_partial_reduce(tmp_path, spec):
+def test_sum_partial_reduce(tmp_path, spec, executor):
     a = cubed.random.random(
         (40000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = partial_reduce(a, nxp.sum, split_every={0: 8})
-    run_operation(tmp_path, "sum_partial_reduce", b)
+    run_operation(tmp_path, executor, "sum_partial_reduce", b)
 
 
 # Internal functions
 
 
-def run_operation(tmp_path, name, result_array, *, optimize_function=None):
+def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None):
     # result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False)
     # result_array.visualize(f"cubed-{name}", optimize_function=optimize_function)
-    executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG)
     hist = HistoryCallback()
     mem_warn = MemoryWarningCallback()
     # use store=None to write to temporary zarr
@@ -291,8 +322,19 @@ def run_operation(tmp_path, name, result_array, *, optimize_function=None):
     df = hist.stats_df
     print(df)
 
+    # check peak memory does not exceed allowed mem
+    assert (df["peak_measured_mem_end_mb_max"] <= ALLOWED_MEM // 1_000_000).all()
+
+    # check change in peak memory is no more than projected mem
+    assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all()
+
     # check projected_mem_utilization does not exceed 1
-    assert (df["projected_mem_utilization"] <= 1.0).all()
+    # except on processes executor that runs multiple tasks in a process
+    if (
+        executor.name != "processes"
+        or executor.kwargs.get("max_tasks_per_child", None) == 1
+    ):
+        assert (df["projected_mem_utilization"] <= 1.0).all()
 
     # delete temp files for this test immediately since they are so large
     shutil.rmtree(tmp_path)
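
The conditional at the end encodes the reasoning behind the stronger check: when a worker process is reused, its memory high-water mark persists across tasks, so a later task's utilization can read above 1.0 even though every task stayed within its own projection. Illustrative numbers only, under the assumption that utilization is computed from the absolute peak rather than the per-task delta:

    # Assumption (not from the diff): utilization = absolute peak / projected mem.
    task_a_peak_mb = 900                         # earlier task in the same worker
    task_b_projected_mb = 500
    task_b_peak_mb = max(task_a_peak_mb, 450)    # RSS high-water mark persists
    print(task_b_peak_mb / task_b_projected_mb)  # 1.8 > 1.0, yet B's delta is fine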

cubed/tests/utils.py

Lines changed: 2 additions & 5 deletions
@@ -1,5 +1,4 @@
 import platform
-import sys
 from typing import Iterable
 
 import networkx as nx
@@ -29,10 +28,8 @@
     # ThreadsExecutor calls `peak_measured_mem` which is not supported on Windows
     ALL_EXECUTORS.append(create_executor("threads"))
 
-    # ProcessesExecutor uses an API available from 3.11 onwards (max_tasks_per_child)
-    if sys.version_info >= (3, 11):
-        ALL_EXECUTORS.append(create_executor("processes"))
-        MAIN_EXECUTORS.append(create_executor("processes"))
+    ALL_EXECUTORS.append(create_executor("processes"))
+    MAIN_EXECUTORS.append(create_executor("processes"))
 
 try:
     ALL_EXECUTORS.append(create_executor("beam"))
