Skip to content

Commit 33773ca

Browse files
committed
wip
1 parent d50dabd commit 33773ca

File tree

3 files changed

+27
-3
lines changed

3 files changed

+27
-3
lines changed

Lib/concurrent/futures/_base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ def submit(self, fn, /, *args, **kwargs):
562562
"""
563563
raise NotImplementedError()
564564

565-
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
565+
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None, as_completed=False):
566566
"""Returns an iterator equivalent to map(fn, iter).
567567
568568
Args:
@@ -579,6 +579,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
579579
iterables pauses until a result is yielded from the buffer.
580580
If None, all input elements are eagerly collected, and a task is
581581
submitted for each.
582+
as_completed: Set to yield the results as they become available.
582583
583584
Returns:
584585
An iterator equivalent to: map(func, *iterables) but the calls may

Lib/concurrent/futures/process.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ def submit(self, fn, /, *args, **kwargs):
813813
return f
814814
submit.__doc__ = _base.Executor.submit.__doc__
815815

816-
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
816+
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None, as_completed=False):
817817
"""Returns an iterator equivalent to map(fn, iter).
818818
819819
Args:
@@ -829,6 +829,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
829829
iterables pauses until a result is yielded from the buffer.
830830
If None, all input elements are eagerly collected, and a task is
831831
submitted for each.
832+
as_completed: Set to yield the results as they become available.
832833
833834
Returns:
834835
An iterator equivalent to: map(func, *iterables) but the calls may
@@ -845,7 +846,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
845846
results = super().map(partial(_process_chunk, fn),
846847
itertools.batched(zip(*iterables), chunksize),
847848
timeout=timeout,
848-
buffersize=buffersize)
849+
buffersize=buffersize
850+
as_completed=as_completed)
849851
return _chain_from_iterable_of_lists(results)
850852

851853
def shutdown(self, wait=True, *, cancel_futures=False):

Lib/test/test_concurrent_futures/executor.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ def make_dummy_object(_):
2626
return MyObject()
2727

2828

29+
def sleep_and_return(seconds: int):
30+
time.sleep(seconds)
31+
return seconds
32+
33+
2934
class ExecutorTest:
3035

3136
# Executor.shutdown() and context manager usage is tested by
@@ -168,6 +173,22 @@ def test_map_buffersize_when_error(self):
168173
msg="ints should be either processed, or buffered, or not fetched.",
169174
)
170175

176+
def test_map_as_completed(self):
177+
ints = [4, 2, 1, 0]
178+
self.assertListEqual(
179+
list(self.executor.map(sleep_and_return, ints, as_completed=True)),
180+
[0, 1, 2, 4],
181+
msg="should yield in First Done First Out if `as_completed=True`.",
182+
)
183+
184+
def test_map_buffersize_as_completed(self):
185+
ints = [4, 2, 1, 0]
186+
self.assertListEqual(
187+
list(self.executor.map(sleep_and_return, ints, buffersize=2, as_completed=True)),
188+
[2, 1, 0, 4],
189+
msg="should yield in First Done First Out within buffer if `buffersize` is set and `as_completed=True`.",
190+
)
191+
171192
def test_shutdown_race_issue12456(self):
172193
# Issue #12456: race condition at shutdown where trying to post a
173194
# sentinel in the call queue blocks (the queue is full while processes

0 commit comments

Comments
 (0)