From 5285d39831fd075a6a8d554075d3cf2e8546488b Mon Sep 17 00:00:00 2001 From: HunterAP Date: Wed, 16 Jun 2021 00:15:12 -0400 Subject: [PATCH 01/28] Added initial rework of the concurrent.futures module --- stdlib/concurrent/futures/process.pyi | 135 ++++++++++++++++++++++++-- stdlib/concurrent/futures/thread.pyi | 53 ++++++---- 2 files changed, 162 insertions(+), 26 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index a66557671ce5..8044a07f3fa6 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -1,9 +1,117 @@ +import multiprocessing as mp +import multiprocessing.connection +import multiprocessing.queues as mpq import sys -from typing import Any, Callable, Optional, Tuple +import threading +import weakref +from collections.abc import Mapping +from concurrent.futures import _base +from types import TracebackType +from typing import Any, Callable, Iterable, Optional, Sequence, Tuple -from ._base import Executor +_threads_wakeups: weakref.WeakKeyDictionary +_global_shutdown: bool -EXTRA_QUEUED_CALLS: Any +class _ThreadWakeup: + _closed: bool + _reader: multiprocessing.connection.PipeConnection + _writer: multiprocessing.connection.PipeConnection + def __init__(self) -> None: ... + def close(self) -> None: ... + def wakeup(self) -> None: ... + def clear(self) -> None: ... + +def _python_exit() -> None: ... + +EXTRA_QUEUED_CALLS: int + +_MAX_WINDOWS_WORKERS: int + +class _RemoteTraceback(Exception): + def __init__(self, tb: TracebackType) -> None: ... + tb: TracebackType + def __str__(self) -> str: ... + +class _ExceptionWithTraceback: + exc: BaseException + tb: TracebackType + def __init__(self, exc: BaseException, tb: TracebackType) -> None: ... + def __reduce__(self) -> tuple: ... + +def _rebuild_exc(exc, tb) -> Exception: ... + +class _WorkItem(object): + future: _base.Future + fn: Callable + args: Iterable[Any] + kwargs: Mapping[str, Any] + def __init__(self, future: _base.Future, fn: Callable, args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... + +class _ResultItem(object): + work_id: int + exception: Exception + result: Any + def __init__(self, work_id: int, exception: Optional[Exception], result: Optional[Any] = ...) -> None: ... + +class _CallItem(object): + work_id: int + fn: Callable + args: Tuple[Any, ...] + kwargs: Mapping[str, Any] + def __init__( + self, work_id: int, fn: Callable = ..., args: Tuple[Any, ...] = ..., kwargs: Mapping[str, Any] = ... + ) -> None: ... + +class _SafeQueue(mpq.Queue): + pending_work_items: dict[int, _WorkItem] + shutdown_lock: threading.Lock + thread_wakeup: _ThreadWakeup + def __init__( + self, + max_size: Optional[int] = ..., + *, + ctx: mp.context.SpawnContext, + pending_work_items: dict[int, _WorkItem], + shutdown_lock: threading.Lock, + thread_wakeup: _ThreadWakeup, + ) -> None: ... + def _on_queue_feeder_error(self, e, obj) -> None: ... + +def _get_chunks(*iterables: Any, chunksize: int) -> tuple: ... +def _process_chunk(fn: Callable = ..., chunk: tuple = ...) -> Sequence: ... +def _sendback_result( + result_queue: mpq.Queue[_ResultItem], work_id: int, result: Optional[Any] = ..., exception: Optional[Exception] = ... +) -> None: ... +def _process_worker( + call_queue: mpq.Queue[_CallItem], result_queue: mpq.Queue[_ResultItem], initializer: Callable, initargs: Any +) -> None: ... + +class _ExecutorManagerThread(threading.Thread): + thread_wakeup: _ThreadWakeup + shutdown_lock: threading.Lock + executor_reference: weakref.ref + processes: Mapping + call_queue: mpq.Queue[_CallItem] + result_queue: mpq.Queue[_ResultItem] + work_ids_queue: mpq.Queue[int] + pending_work_items: dict[int, _WorkItem] + def __init__(self, executor: ProcessPoolExecutor) -> None: ... + def run(self) -> None: ... + def add_call_item_to_queue(self) -> None: ... + def wait_result_broken_or_wakeup(self) -> tuple[Any, bool, str]: ... + def process_result_item(self, result_item: Any) -> None: ... + def is_shutting_down(self) -> bool: ... + def terminate_broken(self, cause: str) -> None: ... + def flag_executor_shutting_down(self) -> None: ... + def shutdown_workers(self) -> None: ... + def join_executor_internals(self) -> None: ... + def get_n_children_alive(self) -> int: ... + +_system_limits_checked: bool +_system_limited: Optional[bool] + +def _check_system_limits() -> None: ... +def _chain_from_iterable_of_lists(iterable: Sequence) -> Any: ... if sys.version_info >= (3, 7): from ._base import BrokenExecutor @@ -14,15 +122,30 @@ else: if sys.version_info >= (3, 7): from multiprocessing.context import BaseContext - class ProcessPoolExecutor(Executor): + class ProcessPoolExecutor(_base.Executor): + _shutdown_thread: bool + _shutdown_lock: threading.Lock + _idle_worker_semaphore: threading.Semaphore + _broken: bool + _queue_count: int + _pending_work_items: dict[int, _WorkItem] + _cancel_pending_futures: bool + _executor_manager_thread_wakeup: _ThreadWakeup + _result_queue = mpq.SimpleQueue + _work_ids: mpq.Queue def __init__( self, max_workers: Optional[int] = ..., mp_context: Optional[BaseContext] = ..., - initializer: Optional[Callable[..., None]] = ..., + initializer: Optional[Callable] = ..., initargs: Tuple[Any, ...] = ..., ) -> None: ... + def _start_executor_manager_thread(self) -> None: ... + def _adjust_process_count(self) -> None: ... + def submit(self, fn, /, *args, **kwargs) -> _base.Future: ... + def map(self, fn: Callable, *iterables: Any, timeout: float = ..., chunksize: int = ...) -> Sequence[Any]: ... + def shutdown(self, wait: bool = ..., *, cancel_futures: bool = ...) -> None: ... else: - class ProcessPoolExecutor(Executor): + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers: Optional[int] = ...) -> None: ... diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 9826922ca74f..008cfb453b80 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -1,23 +1,42 @@ -import queue +import multiprocessing.queues as mpq import sys -from typing import Any, Callable, Generic, Iterable, Mapping, Optional, Tuple, TypeVar +import threading +import weakref +from collections.abc import Mapping +from concurrent.futures import _base +from types import TracebackType +from typing import Any, Callable, Iterable, Optional, Sequence, Tuple -from ._base import Executor, Future +_threads_queues: weakref.WeakKeyDictionary +_shutdown: bool +_global_shutdown_lock: threading.Lock -if sys.version_info >= (3, 7): - from ._base import BrokenExecutor - class BrokenThreadPool(BrokenExecutor): ... +def _python_exit() -> None: ... if sys.version_info >= (3, 9): from types import GenericAlias -_S = TypeVar("_S") +class _WorkItem(object): + future: _base.Future + fn: Callable + args: Iterable[Any] + kwargs: Mapping[str, Any] + def __init__(self, future: _base.Future, fn: Callable, args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... + def run(self) -> None: ... + if sys.version_info >= (3, 9): + def __class_getitem__(cls, item: Any) -> GenericAlias: ... + +def _worker(executor_reference: weakref.ref, work_queue: mpq.Queue[Any], initializer: Callable, initargs: Any) -> None: ... -class ThreadPoolExecutor(Executor): +if sys.version_info >= (3, 7): + from ._base import BrokenExecutor + class BrokenThreadPool(BrokenExecutor): ... + +class ThreadPoolExecutor(_base.Executor): if sys.version_info >= (3, 7): - _work_queue: queue.SimpleQueue[Any] + _work_queue: mpq.SimpleQueue[_WorkItem] else: - _work_queue: queue.Queue[Any] + _work_queue: mpq.Queue[_WorkItem] if sys.version_info >= (3, 7): def __init__( self, @@ -28,13 +47,7 @@ class ThreadPoolExecutor(Executor): ) -> None: ... else: def __init__(self, max_workers: Optional[int] = ..., thread_name_prefix: str = ...) -> None: ... - -class _WorkItem(Generic[_S]): - future: Future[_S] - fn: Callable[..., _S] - args: Iterable[Any] - kwargs: Mapping[str, Any] - def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... - def run(self) -> None: ... - if sys.version_info >= (3, 9): - def __class_getitem__(cls, item: Any) -> GenericAlias: ... + def submit(self, fn, /, *args, **kwargs) -> _base.Future: ... + def _adjust_process_count(self) -> None: ... + def _initializer_failed(self) -> None: ... + def shutdown(self, wait: bool = ..., *, cancel_futures: bool = ...) -> None: ... From 3d8d8ab823f036b83228a28904bc5401bb2b38b3 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Wed, 16 Jun 2021 01:25:57 -0400 Subject: [PATCH 02/28] Minor fixes --- stdlib/concurrent/futures/process.pyi | 45 ++++++++++++++------------- stdlib/concurrent/futures/thread.pyi | 15 +++++++-- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 8044a07f3fa6..e8ed4e55311f 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -6,6 +6,7 @@ import threading import weakref from collections.abc import Mapping from concurrent.futures import _base +from multiprocessing.context import BaseContext from types import TracebackType from typing import Any, Callable, Iterable, Optional, Sequence, Tuple @@ -120,19 +121,23 @@ if sys.version_info >= (3, 7): else: class BrokenProcessPool(RuntimeError): ... -if sys.version_info >= (3, 7): - from multiprocessing.context import BaseContext - class ProcessPoolExecutor(_base.Executor): - _shutdown_thread: bool - _shutdown_lock: threading.Lock - _idle_worker_semaphore: threading.Semaphore - _broken: bool - _queue_count: int - _pending_work_items: dict[int, _WorkItem] - _cancel_pending_futures: bool - _executor_manager_thread_wakeup: _ThreadWakeup - _result_queue = mpq.SimpleQueue - _work_ids: mpq.Queue +class ProcessPoolExecutor(_base.Executor): + _mp_context: Optional[BaseContext] + _initializer: Optional[Callable, None] = ... + _initargs: Optional[Any, None] = ... + _executor_manager_thread: _ThreadWakeup + _processes: {} + _shutdown_thread: bool + _shutdown_lock: threading.Lock + _idle_worker_semaphore: threading.Semaphore + _broken: bool + _queue_count: int + _pending_work_items: dict[int, _WorkItem] + _cancel_pending_futures: bool + _executor_manager_thread_wakeup: _ThreadWakeup + _result_queue = mpq.SimpleQueue + _work_ids: mpq.Queue + if sys.version_info >= (3, 7): def __init__( self, max_workers: Optional[int] = ..., @@ -140,12 +145,10 @@ if sys.version_info >= (3, 7): initializer: Optional[Callable] = ..., initargs: Tuple[Any, ...] = ..., ) -> None: ... - def _start_executor_manager_thread(self) -> None: ... - def _adjust_process_count(self) -> None: ... - def submit(self, fn, /, *args, **kwargs) -> _base.Future: ... - def map(self, fn: Callable, *iterables: Any, timeout: float = ..., chunksize: int = ...) -> Sequence[Any]: ... - def shutdown(self, wait: bool = ..., *, cancel_futures: bool = ...) -> None: ... - -else: - class ProcessPoolExecutor(_base.Executor): + else: def __init__(self, max_workers: Optional[int] = ...) -> None: ... + def _start_executor_manager_thread(self) -> None: ... + def _adjust_process_count(self) -> None: ... + def submit(self, fn: Callable, /, *args: Any, **kwargs: Any) -> _base.Future: ... + def map(self, fn: Callable, *iterables: Any, timeout: float = ..., chunksize: int = ...) -> Sequence[Any]: ... + def shutdown(self, wait: bool = ..., *, cancel_futures: bool = ...) -> None: ... diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 008cfb453b80..92d9401d5281 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -33,6 +33,15 @@ if sys.version_info >= (3, 7): class BrokenThreadPool(BrokenExecutor): ... class ThreadPoolExecutor(_base.Executor): + _max_workers: int + _idle_semaphore: threading.Semaphore + _threads: set + _broken: bool + _shutdown: bool + _shutdown_lock: threading.Lock + _thread_name_prefix: Optional[str] = ... + _initializer: Optional[Callable, None] = ... + _initargs: Optional[Any, None] = ... if sys.version_info >= (3, 7): _work_queue: mpq.SimpleQueue[_WorkItem] else: @@ -42,12 +51,12 @@ class ThreadPoolExecutor(_base.Executor): self, max_workers: Optional[int] = ..., thread_name_prefix: str = ..., - initializer: Optional[Callable[..., None]] = ..., + initializer: Optional[Callable, None] = ..., initargs: Tuple[Any, ...] = ..., ) -> None: ... else: def __init__(self, max_workers: Optional[int] = ..., thread_name_prefix: str = ...) -> None: ... - def submit(self, fn, /, *args, **kwargs) -> _base.Future: ... - def _adjust_process_count(self) -> None: ... + def submit(self, fn: Callable, /, *args: Any, **kwargs: Any) -> _base.Future: ... + def _adjust_thread_count(self) -> None: ... def _initializer_failed(self) -> None: ... def shutdown(self, wait: bool = ..., *, cancel_futures: bool = ...) -> None: ... From 69a5a03ba3b287953916c0008de070cbb17d2d3e Mon Sep 17 00:00:00 2001 From: HunterAP Date: Wed, 16 Jun 2021 11:35:41 -0400 Subject: [PATCH 03/28] Fixed some generics & changed to collections over typing for some types --- stdlib/concurrent/futures/_base.pyi | 3 ++ stdlib/concurrent/futures/process.pyi | 61 +++++++++++++++------------ stdlib/concurrent/futures/thread.pyi | 17 +++++--- 3 files changed, 48 insertions(+), 33 deletions(-) diff --git a/stdlib/concurrent/futures/_base.pyi b/stdlib/concurrent/futures/_base.pyi index ca4087948c3e..4cf64107fa36 100644 --- a/stdlib/concurrent/futures/_base.pyi +++ b/stdlib/concurrent/futures/_base.pyi @@ -10,6 +10,7 @@ from typing import ( Iterable, Iterator, List, + Mapping, Optional, Protocol, Sequence, @@ -29,6 +30,8 @@ RUNNING: str CANCELLED: str CANCELLED_AND_NOTIFIED: str FINISHED: str +_FUTURE_STATES: Sequence +_STATE_TO_DESCRIPTION_MAP: Mapping LOGGER: Logger class Error(Exception): ... diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index e8ed4e55311f..171be4e7fd2a 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -4,11 +4,15 @@ import multiprocessing.queues as mpq import sys import threading import weakref -from collections.abc import Mapping +from collections.abc import Generator, Iterable, Mapping, MutableMapping, Sequence from concurrent.futures import _base from multiprocessing.context import BaseContext from types import TracebackType -from typing import Any, Callable, Iterable, Optional, Sequence, Tuple +from typing import Any, Callable, Generic, Optional, Tuple, TypeVar + +_WI = TypeVar["_WI"] +_RI = TypeVar["_RI"] +_CI = TypeVar["_CI"] _threads_wakeups: weakref.WeakKeyDictionary _global_shutdown: bool @@ -41,30 +45,30 @@ class _ExceptionWithTraceback: def _rebuild_exc(exc, tb) -> Exception: ... -class _WorkItem(object): - future: _base.Future - fn: Callable +class _WorkItem(Generic[_WI]): + future: _base.Future[_WI] + fn: Callable[..., _WI] args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__(self, future: _base.Future, fn: Callable, args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... + def __init__( + self, future: _base.Future[_WI], fn: Callable[..., _WI], args: Iterable[Any], kwargs: Mapping[str, Any] + ) -> None: ... -class _ResultItem(object): +class _ResultItem(Generic[_RI]): work_id: int exception: Exception result: Any - def __init__(self, work_id: int, exception: Optional[Exception], result: Optional[Any] = ...) -> None: ... + def __init__(self, work_id: int, exception: Optional[Exception] = ..., result: Optional[Any] = ...) -> None: ... -class _CallItem(object): +class _CallItem(Generic[_CI]): work_id: int - fn: Callable - args: Tuple[Any, ...] + fn: Callable[..., _WI] + args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__( - self, work_id: int, fn: Callable = ..., args: Tuple[Any, ...] = ..., kwargs: Mapping[str, Any] = ... - ) -> None: ... + def __init__(self, work_id: int, fn: Callable[..., _WI], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... class _SafeQueue(mpq.Queue): - pending_work_items: dict[int, _WorkItem] + pending_work_items: MutableMapping[int, Generic[_WI]] shutdown_lock: threading.Lock thread_wakeup: _ThreadWakeup def __init__( @@ -72,19 +76,22 @@ class _SafeQueue(mpq.Queue): max_size: Optional[int] = ..., *, ctx: mp.context.SpawnContext, - pending_work_items: dict[int, _WorkItem], + pending_work_items: MutableMapping[int, Generic[_WI]], shutdown_lock: threading.Lock, thread_wakeup: _ThreadWakeup, ) -> None: ... def _on_queue_feeder_error(self, e, obj) -> None: ... -def _get_chunks(*iterables: Any, chunksize: int) -> tuple: ... +def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple, None, None]: ... def _process_chunk(fn: Callable = ..., chunk: tuple = ...) -> Sequence: ... def _sendback_result( - result_queue: mpq.Queue[_ResultItem], work_id: int, result: Optional[Any] = ..., exception: Optional[Exception] = ... + result_queue: mpq.SimpleQueue[Generic[_RI]], work_id: int, result: Optional[Any] = ..., exception: Optional[Exception] = ... ) -> None: ... def _process_worker( - call_queue: mpq.Queue[_CallItem], result_queue: mpq.Queue[_ResultItem], initializer: Callable, initargs: Any + call_queue: mpq.Queue[Generic[_CI]], + result_queue: mpq.SimpleQueue[Generic[_RI]], + initializer: Optional[Callable[..., None]] = ..., + initargs: Tuple[Any, ...] = ..., ) -> None: ... class _ExecutorManagerThread(threading.Thread): @@ -92,10 +99,10 @@ class _ExecutorManagerThread(threading.Thread): shutdown_lock: threading.Lock executor_reference: weakref.ref processes: Mapping - call_queue: mpq.Queue[_CallItem] - result_queue: mpq.Queue[_ResultItem] + call_queue: mpq.Queue[Generic[_CI]] + result_queue: mpq.SimpleQueue[Generic[_RI]] work_ids_queue: mpq.Queue[int] - pending_work_items: dict[int, _WorkItem] + pending_work_items: MutableMapping[int, Generic[_WI]] def __init__(self, executor: ProcessPoolExecutor) -> None: ... def run(self) -> None: ... def add_call_item_to_queue(self) -> None: ... @@ -123,8 +130,8 @@ else: class ProcessPoolExecutor(_base.Executor): _mp_context: Optional[BaseContext] - _initializer: Optional[Callable, None] = ... - _initargs: Optional[Any, None] = ... + _initializer: Optional[Callable[..., None]] = ... + _initargs: Tuple[Any, ...] = ... _executor_manager_thread: _ThreadWakeup _processes: {} _shutdown_thread: bool @@ -132,17 +139,17 @@ class ProcessPoolExecutor(_base.Executor): _idle_worker_semaphore: threading.Semaphore _broken: bool _queue_count: int - _pending_work_items: dict[int, _WorkItem] + _pending_work_items: MutableMapping[int, Generic[_WI]] _cancel_pending_futures: bool _executor_manager_thread_wakeup: _ThreadWakeup - _result_queue = mpq.SimpleQueue + _result_queue: mpq.SimpleQueue _work_ids: mpq.Queue if sys.version_info >= (3, 7): def __init__( self, max_workers: Optional[int] = ..., mp_context: Optional[BaseContext] = ..., - initializer: Optional[Callable] = ..., + initializer: Optional[Callable[..., None]] = ..., initargs: Tuple[Any, ...] = ..., ) -> None: ... else: diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 92d9401d5281..df7fc7dbfb9b 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -2,10 +2,10 @@ import multiprocessing.queues as mpq import sys import threading import weakref -from collections.abc import Mapping +from collections.abc import Generator, Iterable, Mapping, Sequence from concurrent.futures import _base from types import TracebackType -from typing import Any, Callable, Iterable, Optional, Sequence, Tuple +from typing import Any, Callable, Optional, Tuple _threads_queues: weakref.WeakKeyDictionary _shutdown: bool @@ -26,7 +26,12 @@ class _WorkItem(object): if sys.version_info >= (3, 9): def __class_getitem__(cls, item: Any) -> GenericAlias: ... -def _worker(executor_reference: weakref.ref, work_queue: mpq.Queue[Any], initializer: Callable, initargs: Any) -> None: ... +def _worker( + executor_reference: weakref.ref, + work_queue: mpq.Queue[Any], + initializer: Optional[Callable[..., None]], + initargs: Tuple[Any, ...], +) -> None: ... if sys.version_info >= (3, 7): from ._base import BrokenExecutor @@ -40,8 +45,8 @@ class ThreadPoolExecutor(_base.Executor): _shutdown: bool _shutdown_lock: threading.Lock _thread_name_prefix: Optional[str] = ... - _initializer: Optional[Callable, None] = ... - _initargs: Optional[Any, None] = ... + _initializer: Optional[Callable[..., None]] = ... + _initargs: Tuple[Any, ...] = ... if sys.version_info >= (3, 7): _work_queue: mpq.SimpleQueue[_WorkItem] else: @@ -51,7 +56,7 @@ class ThreadPoolExecutor(_base.Executor): self, max_workers: Optional[int] = ..., thread_name_prefix: str = ..., - initializer: Optional[Callable, None] = ..., + initializer: Optional[Callable[..., None]] = ..., initargs: Tuple[Any, ...] = ..., ) -> None: ... else: From c408496c0238688f2f4509936e417e58857b3cbd Mon Sep 17 00:00:00 2001 From: HunterAP Date: Wed, 16 Jun 2021 11:43:37 -0400 Subject: [PATCH 04/28] Switched thread to use queues instead of multiprocessing.queues --- stdlib/concurrent/futures/thread.pyi | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index df7fc7dbfb9b..5cf3e5f9d855 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -6,6 +6,9 @@ from collections.abc import Generator, Iterable, Mapping, Sequence from concurrent.futures import _base from types import TracebackType from typing import Any, Callable, Optional, Tuple +import queue + +_WI = TypeVar["_WI"] _threads_queues: weakref.WeakKeyDictionary _shutdown: bool @@ -16,9 +19,9 @@ def _python_exit() -> None: ... if sys.version_info >= (3, 9): from types import GenericAlias -class _WorkItem(object): - future: _base.Future - fn: Callable +class _WorkItem(Generic[_WI]): + future: _base.Future[_WI] + fn: Callable[..., _WI] args: Iterable[Any] kwargs: Mapping[str, Any] def __init__(self, future: _base.Future, fn: Callable, args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... @@ -28,7 +31,7 @@ class _WorkItem(object): def _worker( executor_reference: weakref.ref, - work_queue: mpq.Queue[Any], + work_queue: queue.SimpleQueue[Any], initializer: Optional[Callable[..., None]], initargs: Tuple[Any, ...], ) -> None: ... @@ -48,9 +51,9 @@ class ThreadPoolExecutor(_base.Executor): _initializer: Optional[Callable[..., None]] = ... _initargs: Tuple[Any, ...] = ... if sys.version_info >= (3, 7): - _work_queue: mpq.SimpleQueue[_WorkItem] + _work_queue: mpq.SimpleQueue[Generic[_WI]] else: - _work_queue: mpq.Queue[_WorkItem] + _work_queue: queue.Queue[Generic[_WI]] if sys.version_info >= (3, 7): def __init__( self, From cf486b150672682f95b3e9b3bcff531b16d97d74 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Wed, 16 Jun 2021 19:23:28 -0400 Subject: [PATCH 05/28] More fixes --- stdlib/concurrent/futures/_base.pyi | 20 +++----------------- stdlib/concurrent/futures/process.pyi | 2 +- stdlib/concurrent/futures/thread.pyi | 10 +++++----- 3 files changed, 9 insertions(+), 23 deletions(-) diff --git a/stdlib/concurrent/futures/_base.pyi b/stdlib/concurrent/futures/_base.pyi index 4cf64107fa36..c2bb44b9d5c0 100644 --- a/stdlib/concurrent/futures/_base.pyi +++ b/stdlib/concurrent/futures/_base.pyi @@ -1,23 +1,9 @@ import sys import threading from abc import abstractmethod +from collections.abc import Container, Iterable, Iterator, Mapping, MutableMapping, Sequence, Set from logging import Logger -from typing import ( - Any, - Callable, - Container, - Generic, - Iterable, - Iterator, - List, - Mapping, - Optional, - Protocol, - Sequence, - Set, - TypeVar, - overload, -) +from typing import Any, Callable, Generic, Optional, Protocol, TypeVar, overload if sys.version_info >= (3, 9): from types import GenericAlias @@ -102,7 +88,7 @@ def wait(fs: Iterable[Future[_T]], timeout: Optional[float] = ..., return_when: class _Waiter: event: threading.Event - finished_futures: List[Future[Any]] + finished_futures: Sequence[Future[Any]] def __init__(self) -> None: ... def add_result(self, future: Future[Any]) -> None: ... def add_exception(self, future: Future[Any]) -> None: ... diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 171be4e7fd2a..2ab4071c5077 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -33,8 +33,8 @@ EXTRA_QUEUED_CALLS: int _MAX_WINDOWS_WORKERS: int class _RemoteTraceback(Exception): + tb: str def __init__(self, tb: TracebackType) -> None: ... - tb: TracebackType def __str__(self) -> str: ... class _ExceptionWithTraceback: diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 5cf3e5f9d855..5c622d65b9a8 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -1,12 +1,11 @@ -import multiprocessing.queues as mpq +import queue import sys import threading import weakref from collections.abc import Generator, Iterable, Mapping, Sequence from concurrent.futures import _base from types import TracebackType -from typing import Any, Callable, Optional, Tuple -import queue +from typing import Any, Callable, Generic, Optional, Tuple, TypeVar _WI = TypeVar["_WI"] @@ -51,7 +50,7 @@ class ThreadPoolExecutor(_base.Executor): _initializer: Optional[Callable[..., None]] = ... _initargs: Tuple[Any, ...] = ... if sys.version_info >= (3, 7): - _work_queue: mpq.SimpleQueue[Generic[_WI]] + _work_queue: queue.SimpleQueue[Generic[_WI]] else: _work_queue: queue.Queue[Generic[_WI]] if sys.version_info >= (3, 7): @@ -64,7 +63,8 @@ class ThreadPoolExecutor(_base.Executor): ) -> None: ... else: def __init__(self, max_workers: Optional[int] = ..., thread_name_prefix: str = ...) -> None: ... - def submit(self, fn: Callable, /, *args: Any, **kwargs: Any) -> _base.Future: ... def _adjust_thread_count(self) -> None: ... def _initializer_failed(self) -> None: ... + def submit(self, fn: Callable, /, *args: Any, **kwargs: Any) -> _base.Future: ... + def map(self, fn: Callable, *iterables: Any, timeout: float = ..., chunksize: int = ...) -> Sequence[Any]: ... def shutdown(self, wait: bool = ..., *, cancel_futures: bool = ...) -> None: ... From e63acf4ddcbb655b0bb889c118c516a8ed2fd534 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 17 Jun 2021 04:27:27 -0400 Subject: [PATCH 06/28] More fixes following results from running tests locally --- stdlib/concurrent/futures/_base.pyi | 6 +-- stdlib/concurrent/futures/process.pyi | 77 ++++++++++++--------------- stdlib/concurrent/futures/thread.pyi | 47 ++++++++-------- 3 files changed, 63 insertions(+), 67 deletions(-) diff --git a/stdlib/concurrent/futures/_base.pyi b/stdlib/concurrent/futures/_base.pyi index c2bb44b9d5c0..5e95a94edecc 100644 --- a/stdlib/concurrent/futures/_base.pyi +++ b/stdlib/concurrent/futures/_base.pyi @@ -1,7 +1,7 @@ import sys import threading from abc import abstractmethod -from collections.abc import Container, Iterable, Iterator, Mapping, MutableMapping, Sequence, Set +from collections.abc import Container, Iterable, Iterator, Mapping, Sequence, Set from logging import Logger from typing import Any, Callable, Generic, Optional, Protocol, TypeVar, overload @@ -16,8 +16,8 @@ RUNNING: str CANCELLED: str CANCELLED_AND_NOTIFIED: str FINISHED: str -_FUTURE_STATES: Sequence -_STATE_TO_DESCRIPTION_MAP: Mapping +_FUTURE_STATES: Sequence[str] +_STATE_TO_DESCRIPTION_MAP: Mapping[str, str] LOGGER: Logger class Error(Exception): ... diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 2ab4071c5077..0e23b7ab99b4 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -1,26 +1,22 @@ import multiprocessing as mp -import multiprocessing.connection +import multiprocessing.connection as mpconn +import multiprocessing.context as mpcont import multiprocessing.queues as mpq import sys import threading import weakref -from collections.abc import Generator, Iterable, Mapping, MutableMapping, Sequence +from collections.abc import Generator, Iterable, Mapping, MutableMapping, MutableSequence from concurrent.futures import _base -from multiprocessing.context import BaseContext from types import TracebackType -from typing import Any, Callable, Generic, Optional, Tuple, TypeVar - -_WI = TypeVar["_WI"] -_RI = TypeVar["_RI"] -_CI = TypeVar["_CI"] +from typing import Any, Callable, Optional, Tuple, Union _threads_wakeups: weakref.WeakKeyDictionary _global_shutdown: bool class _ThreadWakeup: _closed: bool - _reader: multiprocessing.connection.PipeConnection - _writer: multiprocessing.connection.PipeConnection + _reader: mpconn.Connection + _writer: mpconn.Connection def __init__(self) -> None: ... def close(self) -> None: ... def wakeup(self) -> None: ... @@ -41,68 +37,66 @@ class _ExceptionWithTraceback: exc: BaseException tb: TracebackType def __init__(self, exc: BaseException, tb: TracebackType) -> None: ... - def __reduce__(self) -> tuple: ... + def __reduce__(self) -> Union[str, Tuple[Any, ...]]: ... def _rebuild_exc(exc, tb) -> Exception: ... -class _WorkItem(Generic[_WI]): - future: _base.Future[_WI] - fn: Callable[..., _WI] +class _WorkItem(object): + future: _base.Future + fn: Callable[..., Any] args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__( - self, future: _base.Future[_WI], fn: Callable[..., _WI], args: Iterable[Any], kwargs: Mapping[str, Any] - ) -> None: ... + def __init__(self, future: _base.Future, fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... -class _ResultItem(Generic[_RI]): +class _ResultItem(object): work_id: int exception: Exception result: Any def __init__(self, work_id: int, exception: Optional[Exception] = ..., result: Optional[Any] = ...) -> None: ... -class _CallItem(Generic[_CI]): +class _CallItem(object): work_id: int - fn: Callable[..., _WI] + fn: Callable[..., Any] args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__(self, work_id: int, fn: Callable[..., _WI], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... + def __init__(self, work_id: int, fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... class _SafeQueue(mpq.Queue): - pending_work_items: MutableMapping[int, Generic[_WI]] + pending_work_items: MutableMapping[int, _WorkItem] shutdown_lock: threading.Lock thread_wakeup: _ThreadWakeup def __init__( self, max_size: Optional[int] = ..., *, - ctx: mp.context.SpawnContext, - pending_work_items: MutableMapping[int, Generic[_WI]], + ctx: mpcont.BaseContext, + pending_work_items: MutableMapping[int, _WorkItem], shutdown_lock: threading.Lock, thread_wakeup: _ThreadWakeup, ) -> None: ... def _on_queue_feeder_error(self, e, obj) -> None: ... -def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple, None, None]: ... -def _process_chunk(fn: Callable = ..., chunk: tuple = ...) -> Sequence: ... +def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ... +def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ... def _sendback_result( - result_queue: mpq.SimpleQueue[Generic[_RI]], work_id: int, result: Optional[Any] = ..., exception: Optional[Exception] = ... + result_queue: mpq.SimpleQueue[_WorkItem], work_id: int, result: Optional[Any] = ..., exception: Optional[Exception] = ... ) -> None: ... def _process_worker( - call_queue: mpq.Queue[Generic[_CI]], - result_queue: mpq.SimpleQueue[Generic[_RI]], - initializer: Optional[Callable[..., None]] = ..., - initargs: Tuple[Any, ...] = ..., + call_queue: mpq.Queue[_CallItem], + result_queue: mpq.SimpleQueue[_ResultItem], + initializer: Optional[Callable[..., None]], + initargs: Tuple[Any, ...], ) -> None: ... class _ExecutorManagerThread(threading.Thread): thread_wakeup: _ThreadWakeup shutdown_lock: threading.Lock executor_reference: weakref.ref - processes: Mapping - call_queue: mpq.Queue[Generic[_CI]] - result_queue: mpq.SimpleQueue[Generic[_RI]] + processes: MutableMapping[int, mpcont.Process] + call_queue: mpq.Queue[_CallItem] + result_queue: mpq.SimpleQueue[_ResultItem] work_ids_queue: mpq.Queue[int] - pending_work_items: MutableMapping[int, Generic[_WI]] + pending_work_items: MutableMapping[int, _WorkItem] def __init__(self, executor: ProcessPoolExecutor) -> None: ... def run(self) -> None: ... def add_call_item_to_queue(self) -> None: ... @@ -119,7 +113,7 @@ _system_limits_checked: bool _system_limited: Optional[bool] def _check_system_limits() -> None: ... -def _chain_from_iterable_of_lists(iterable: Sequence) -> Any: ... +def _chain_from_iterable_of_lists(iterable: Iterable[MutableSequence[Any]]) -> Any: ... if sys.version_info >= (3, 7): from ._base import BrokenExecutor @@ -129,17 +123,17 @@ else: class BrokenProcessPool(RuntimeError): ... class ProcessPoolExecutor(_base.Executor): - _mp_context: Optional[BaseContext] + _mp_context: Optional[mpcont.BaseContext] _initializer: Optional[Callable[..., None]] = ... _initargs: Tuple[Any, ...] = ... _executor_manager_thread: _ThreadWakeup - _processes: {} + _processes: MutableMapping[int, mpcont.Process] _shutdown_thread: bool _shutdown_lock: threading.Lock _idle_worker_semaphore: threading.Semaphore _broken: bool _queue_count: int - _pending_work_items: MutableMapping[int, Generic[_WI]] + _pending_work_items: MutableMapping[int, _WorkItem] _cancel_pending_futures: bool _executor_manager_thread_wakeup: _ThreadWakeup _result_queue: mpq.SimpleQueue @@ -148,7 +142,7 @@ class ProcessPoolExecutor(_base.Executor): def __init__( self, max_workers: Optional[int] = ..., - mp_context: Optional[BaseContext] = ..., + mp_context: Optional[mpcont.BaseContext] = ..., initializer: Optional[Callable[..., None]] = ..., initargs: Tuple[Any, ...] = ..., ) -> None: ... @@ -156,6 +150,3 @@ class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers: Optional[int] = ...) -> None: ... def _start_executor_manager_thread(self) -> None: ... def _adjust_process_count(self) -> None: ... - def submit(self, fn: Callable, /, *args: Any, **kwargs: Any) -> _base.Future: ... - def map(self, fn: Callable, *iterables: Any, timeout: float = ..., chunksize: int = ...) -> Sequence[Any]: ... - def shutdown(self, wait: bool = ..., *, cancel_futures: bool = ...) -> None: ... diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 5c622d65b9a8..af9a0e86da81 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -2,12 +2,9 @@ import queue import sys import threading import weakref -from collections.abc import Generator, Iterable, Mapping, Sequence +from collections.abc import Iterable, Mapping, Set from concurrent.futures import _base -from types import TracebackType -from typing import Any, Callable, Generic, Optional, Tuple, TypeVar - -_WI = TypeVar["_WI"] +from typing import Any, Callable, Optional, Tuple _threads_queues: weakref.WeakKeyDictionary _shutdown: bool @@ -18,22 +15,33 @@ def _python_exit() -> None: ... if sys.version_info >= (3, 9): from types import GenericAlias -class _WorkItem(Generic[_WI]): - future: _base.Future[_WI] - fn: Callable[..., _WI] +class _WorkItem(object): + future: _base.Future[Any] + fn: Callable[..., Any] args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__(self, future: _base.Future, fn: Callable, args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... + def __init__( + self, future: _base.Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any] + ) -> None: ... def run(self) -> None: ... if sys.version_info >= (3, 9): def __class_getitem__(cls, item: Any) -> GenericAlias: ... -def _worker( - executor_reference: weakref.ref, - work_queue: queue.SimpleQueue[Any], - initializer: Optional[Callable[..., None]], - initargs: Tuple[Any, ...], -) -> None: ... +if sys.version_info >= (3, 7): + def _worker( + executor_reference: weakref.ref, + work_queue: queue.SimpleQueue[Any], + initializer: Optional[Callable[..., None]], + initargs: Tuple[Any, ...], + ) -> None: ... + +else: + def _worker( + executor_reference: weakref.ref, + work_queue: queue.Queue[Any], + initializer: Optional[Callable[..., None]], + initargs: Tuple[Any, ...], + ) -> None: ... if sys.version_info >= (3, 7): from ._base import BrokenExecutor @@ -42,7 +50,7 @@ if sys.version_info >= (3, 7): class ThreadPoolExecutor(_base.Executor): _max_workers: int _idle_semaphore: threading.Semaphore - _threads: set + _threads: Set[threading.Thread] _broken: bool _shutdown: bool _shutdown_lock: threading.Lock @@ -50,9 +58,9 @@ class ThreadPoolExecutor(_base.Executor): _initializer: Optional[Callable[..., None]] = ... _initargs: Tuple[Any, ...] = ... if sys.version_info >= (3, 7): - _work_queue: queue.SimpleQueue[Generic[_WI]] + _work_queue: queue.SimpleQueue[_WorkItem] else: - _work_queue: queue.Queue[Generic[_WI]] + _work_queue: queue.Queue[_WorkItem] if sys.version_info >= (3, 7): def __init__( self, @@ -65,6 +73,3 @@ class ThreadPoolExecutor(_base.Executor): def __init__(self, max_workers: Optional[int] = ..., thread_name_prefix: str = ...) -> None: ... def _adjust_thread_count(self) -> None: ... def _initializer_failed(self) -> None: ... - def submit(self, fn: Callable, /, *args: Any, **kwargs: Any) -> _base.Future: ... - def map(self, fn: Callable, *iterables: Any, timeout: float = ..., chunksize: int = ...) -> Sequence[Any]: ... - def shutdown(self, wait: bool = ..., *, cancel_futures: bool = ...) -> None: ... From 49150a626170eafdfa2fbccfca061300fb473e86 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 17:01:42 -0400 Subject: [PATCH 07/28] Tmp commit of changes --- stdlib/concurrent/futures/process.pyi | 6 +++--- stdlib/concurrent/futures/thread.pyi | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 352ee129d870..9e777325dbe2 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -10,7 +10,7 @@ from concurrent.futures import _base from types import TracebackType from typing import Any, Callable, Optional, Tuple, Union -_threads_wakeups: weakref.WeakKeyDictionary +_threads_wakeups: Mapping[Any, Any] _global_shutdown: bool class _ThreadWakeup: @@ -39,7 +39,7 @@ class _ExceptionWithTraceback: def __init__(self, exc: BaseException, tb: TracebackType) -> None: ... def __reduce__(self) -> Union[str, Tuple[Any, ...]]: ... -def _rebuild_exc(exc, tb) -> Exception: ... +def _rebuild_exc(exc: Exception, tb: str) -> Exception: ... class _WorkItem(object): future: _base.Future @@ -74,7 +74,7 @@ class _SafeQueue(mpq.Queue): shutdown_lock: threading.Lock, thread_wakeup: _ThreadWakeup, ) -> None: ... - def _on_queue_feeder_error(self, e, obj) -> None: ... + def _on_queue_feeder_error(self, e: Exception, obj) -> None: ... def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ... def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ... diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index ad5cb95553ef..c120efc82770 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -6,7 +6,7 @@ from collections.abc import Iterable, Mapping, Set from concurrent.futures import _base from typing import Any, Callable, Optional, Tuple -_threads_queues: weakref.WeakKeyDictionary +_threads_queues: Mapping[Any, Any] _shutdown: bool _global_shutdown_lock: threading.Lock From a7b8a870d32023d5b727fb6a1c9ba2e35e66191c Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 18:28:43 -0400 Subject: [PATCH 08/28] Minor flake8 fix --- stdlib/concurrent/futures/_base.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/concurrent/futures/_base.pyi b/stdlib/concurrent/futures/_base.pyi index e5738e59f91d..6dd6737ba97e 100644 --- a/stdlib/concurrent/futures/_base.pyi +++ b/stdlib/concurrent/futures/_base.pyi @@ -4,7 +4,7 @@ from _typeshed import Self from abc import abstractmethod from collections.abc import Container, Iterable, Iterator, Mapping, Sequence, Set from logging import Logger -from typing import Any, Callable, Container, Generic, Iterable, Iterator, Protocol, Sequence, Set, TypeVar, overload +from typing import Any, Callable, Generic, Protocol, TypeVar, overload if sys.version_info >= (3, 9): from types import GenericAlias From 6c5b37b8d81152bfe31191bc708dfcf255569c34 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 18:50:38 -0400 Subject: [PATCH 09/28] Fixing some issues --- stdlib/concurrent/futures/process.pyi | 12 ++++++------ stdlib/concurrent/futures/thread.pyi | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 9e777325dbe2..bc0b56bf37c3 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -42,11 +42,11 @@ class _ExceptionWithTraceback: def _rebuild_exc(exc: Exception, tb: str) -> Exception: ... class _WorkItem(object): - future: _base.Future + future: _base.Future[Any] fn: Callable[..., Any] args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__(self, future: _base.Future, fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... + def __init__(self, future: _base.Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... class _ResultItem(object): work_id: int @@ -74,7 +74,7 @@ class _SafeQueue(mpq.Queue): shutdown_lock: threading.Lock, thread_wakeup: _ThreadWakeup, ) -> None: ... - def _on_queue_feeder_error(self, e: Exception, obj) -> None: ... + def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ... def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ... @@ -91,7 +91,7 @@ def _process_worker( class _ExecutorManagerThread(threading.Thread): thread_wakeup: _ThreadWakeup shutdown_lock: threading.Lock - executor_reference: weakref.ref + executor_reference: weakref.ref[Any, Callable] processes: MutableMapping[int, mpcont.Process] call_queue: mpq.Queue[_CallItem] result_queue: mpq.SimpleQueue[_ResultItem] @@ -136,8 +136,8 @@ class ProcessPoolExecutor(_base.Executor): _pending_work_items: MutableMapping[int, _WorkItem] _cancel_pending_futures: bool _executor_manager_thread_wakeup: _ThreadWakeup - _result_queue: mpq.SimpleQueue - _work_ids: mpq.Queue + _result_queue: mpq.SimpleQueue[Any] + _work_ids: mpq.Queue[Any] if sys.version_info >= (3, 7): def __init__( self, diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index c120efc82770..0f38923a936b 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -29,7 +29,7 @@ class _WorkItem(object): if sys.version_info >= (3, 7): def _worker( - executor_reference: weakref.ref, + executor_reference: weakref.ref[Any, Callable], work_queue: queue.SimpleQueue[Any], initializer: Optional[Callable[..., None]], initargs: Tuple[Any, ...], @@ -37,7 +37,7 @@ if sys.version_info >= (3, 7): else: def _worker( - executor_reference: weakref.ref, + executor_reference: weakref.ref[Any, Callable], work_queue: queue.Queue[Any], initializer: Optional[Callable[..., None]], initargs: Tuple[Any, ...], From 8fe6f7d133c50bfeb2cdeb860d14afbf4fb181f9 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 19:37:57 -0400 Subject: [PATCH 10/28] Fixed a weakref.ref issue --- stdlib/concurrent/futures/process.pyi | 6 ++++-- stdlib/concurrent/futures/thread.pyi | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index bc0b56bf37c3..931d8eda74e9 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -46,7 +46,9 @@ class _WorkItem(object): fn: Callable[..., Any] args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__(self, future: _base.Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... + def __init__( + self, future: _base.Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any] + ) -> None: ... class _ResultItem(object): work_id: int @@ -91,7 +93,7 @@ def _process_worker( class _ExecutorManagerThread(threading.Thread): thread_wakeup: _ThreadWakeup shutdown_lock: threading.Lock - executor_reference: weakref.ref[Any, Callable] + executor_reference: weakref.ref[Any] processes: MutableMapping[int, mpcont.Process] call_queue: mpq.Queue[_CallItem] result_queue: mpq.SimpleQueue[_ResultItem] diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 0f38923a936b..c7c8d681d9fd 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -37,7 +37,7 @@ if sys.version_info >= (3, 7): else: def _worker( - executor_reference: weakref.ref[Any, Callable], + executor_reference: weakref.ref[Any], work_queue: queue.Queue[Any], initializer: Optional[Callable[..., None]], initargs: Tuple[Any, ...], From cb7843bc07c7d8c6c5fac98710b2293f98c2d6f7 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 19:45:10 -0400 Subject: [PATCH 11/28] Fixed one more weakref issue --- stdlib/concurrent/futures/thread.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index c7c8d681d9fd..b15115c5445e 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -29,7 +29,7 @@ class _WorkItem(object): if sys.version_info >= (3, 7): def _worker( - executor_reference: weakref.ref[Any, Callable], + executor_reference: weakref.ref[Any], work_queue: queue.SimpleQueue[Any], initializer: Optional[Callable[..., None]], initargs: Tuple[Any, ...], From 6c82496e251662d8ce09e0ca1d26d546873720bc Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 20:23:33 -0400 Subject: [PATCH 12/28] Fixed some issues with required version --- stdlib/concurrent/futures/process.pyi | 32 ++++++++++++++------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 931d8eda74e9..1fb727d59bcd 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -63,20 +63,21 @@ class _CallItem(object): kwargs: Mapping[str, Any] def __init__(self, work_id: int, fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... -class _SafeQueue(mpq.Queue): - pending_work_items: MutableMapping[int, _WorkItem] - shutdown_lock: threading.Lock - thread_wakeup: _ThreadWakeup - def __init__( - self, - max_size: Optional[int] = ..., - *, - ctx: mpcont.BaseContext, - pending_work_items: MutableMapping[int, _WorkItem], - shutdown_lock: threading.Lock, - thread_wakeup: _ThreadWakeup, - ) -> None: ... - def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... +if sys.version_info >= (3, 7): + class _SafeQueue(mpq.Queue): + pending_work_items: MutableMapping[int, _WorkItem] + shutdown_lock: threading.Lock + thread_wakeup: _ThreadWakeup + def __init__( + self, + max_size: Optional[int] = ..., + *, + ctx: mpcont.BaseContext, + pending_work_items: MutableMapping[int, _WorkItem], + shutdown_lock: threading.Lock, + thread_wakeup: _ThreadWakeup, + ) -> None: ... + def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ... def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ... @@ -150,5 +151,6 @@ class ProcessPoolExecutor(_base.Executor): ) -> None: ... else: def __init__(self, max_workers: int | None = ...) -> None: ... - def _start_executor_manager_thread(self) -> None: ... + if sys.version_info >= (3, 9): + def _start_executor_manager_thread(self) -> None: ... def _adjust_process_count(self) -> None: ... From bfdbc0342547aaefb643f613fb31f1dbe0bfa909 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 20:27:29 -0400 Subject: [PATCH 13/28] Fixed more python min version requirements --- stdlib/concurrent/futures/process.pyi | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 1fb727d59bcd..5009f70d49ad 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -68,15 +68,24 @@ if sys.version_info >= (3, 7): pending_work_items: MutableMapping[int, _WorkItem] shutdown_lock: threading.Lock thread_wakeup: _ThreadWakeup - def __init__( - self, - max_size: Optional[int] = ..., - *, - ctx: mpcont.BaseContext, - pending_work_items: MutableMapping[int, _WorkItem], - shutdown_lock: threading.Lock, - thread_wakeup: _ThreadWakeup, - ) -> None: ... + if sys.version_info >= (3, 9): + def __init__( + self, + max_size: Optional[int] = ..., + *, + ctx: mpcont.BaseContext, + pending_work_items: MutableMapping[int, _WorkItem], + shutdown_lock: threading.Lock, + thread_wakeup: _ThreadWakeup, + ) -> None: ... + else: + def __init__( + self, + max_size: Optional[int] = ..., + *, + ctx: mpcont.BaseContext, + pending_work_items: MutableMapping[int, _WorkItem] + ) -> None: ... def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ... From c5ec620d1f5d3fc2f3cc3d7432f621ce1f6da4f6 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 20:34:16 -0400 Subject: [PATCH 14/28] More min version fixes --- stdlib/concurrent/futures/process.pyi | 19 ++++++++++++------- stdlib/concurrent/futures/thread.pyi | 10 +++------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 5009f70d49ad..85e86771d8c9 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -84,7 +84,7 @@ if sys.version_info >= (3, 7): max_size: Optional[int] = ..., *, ctx: mpcont.BaseContext, - pending_work_items: MutableMapping[int, _WorkItem] + pending_work_items: MutableMapping[int, _WorkItem], ) -> None: ... def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... @@ -93,12 +93,17 @@ def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Gen def _sendback_result( result_queue: mpq.SimpleQueue[_WorkItem], work_id: int, result: Optional[Any] = ..., exception: Optional[Exception] = ... ) -> None: ... -def _process_worker( - call_queue: mpq.Queue[_CallItem], - result_queue: mpq.SimpleQueue[_ResultItem], - initializer: Optional[Callable[..., None]], - initargs: Tuple[Any, ...], -) -> None: ... + +if sys.version_info >= (3, 7): + def _process_worker( + call_queue: mpq.Queue[_CallItem], + result_queue: mpq.SimpleQueue[_ResultItem], + initializer: Optional[Callable[..., None]], + initargs: Tuple[Any, ...], + ) -> None: ... + +else: + def _process_worker(call_queue: mpq.Queue[_CallItem], result_queue: mpq.SimpleQueue[_ResultItem]) -> None: ... class _ExecutorManagerThread(threading.Thread): thread_wakeup: _ThreadWakeup diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index b15115c5445e..a9ee63ba8313 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -36,12 +36,7 @@ if sys.version_info >= (3, 7): ) -> None: ... else: - def _worker( - executor_reference: weakref.ref[Any], - work_queue: queue.Queue[Any], - initializer: Optional[Callable[..., None]], - initargs: Tuple[Any, ...], - ) -> None: ... + def _worker(executor_reference: weakref.ref[Any], work_queue: queue.Queue[Any]) -> None: ... if sys.version_info >= (3, 7): from ._base import BrokenExecutor @@ -72,4 +67,5 @@ class ThreadPoolExecutor(_base.Executor): else: def __init__(self, max_workers: int | None = ..., thread_name_prefix: str = ...) -> None: ... def _adjust_thread_count(self) -> None: ... - def _initializer_failed(self) -> None: ... + if sys.version_info >= (3, 7): + def _initializer_failed(self) -> None: ... From 4fb3b5c176b6c7f3e23b6c3e9ce1ceb03338d0de Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 20:39:20 -0400 Subject: [PATCH 15/28] Fixed misc error in workflow regarded outdated pip --- .github/workflows/tests.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 69ef378df314..d528892513ff 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -109,6 +109,8 @@ jobs: uses: actions/setup-python@v2 with: python-version: ${{ matrix.python-version }} + - name: Update pip + run: python -m pip install -U pip - name: Install dependencies run: pip install $(grep mypy== requirements-tests-py3.txt) - name: Run stubtest From 6dc70ccee5585c16bce90438d8e522a47994fa8b Mon Sep 17 00:00:00 2001 From: HunterAP Date: Fri, 27 Aug 2021 23:47:45 -0400 Subject: [PATCH 16/28] Replaced any usage of Optional and Union with proper form as described in the contributions guide --- stdlib/concurrent/futures/process.pyi | 24 ++++++++++-------------- stdlib/concurrent/futures/thread.pyi | 10 +++++----- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 85e86771d8c9..7efecb56a105 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -8,7 +8,7 @@ import weakref from collections.abc import Generator, Iterable, Mapping, MutableMapping, MutableSequence from concurrent.futures import _base from types import TracebackType -from typing import Any, Callable, Optional, Tuple, Union +from typing import Any, Callable, Tuple _threads_wakeups: Mapping[Any, Any] _global_shutdown: bool @@ -37,7 +37,7 @@ class _ExceptionWithTraceback: exc: BaseException tb: TracebackType def __init__(self, exc: BaseException, tb: TracebackType) -> None: ... - def __reduce__(self) -> Union[str, Tuple[Any, ...]]: ... + def __reduce__(self) -> str | Tuple[Any, ...]: ... def _rebuild_exc(exc: Exception, tb: str) -> Exception: ... @@ -54,7 +54,7 @@ class _ResultItem(object): work_id: int exception: Exception result: Any - def __init__(self, work_id: int, exception: Optional[Exception] = ..., result: Optional[Any] = ...) -> None: ... + def __init__(self, work_id: int, exception: Exception | None = ..., result: Any | None = ...) -> None: ... class _CallItem(object): work_id: int @@ -71,7 +71,7 @@ if sys.version_info >= (3, 7): if sys.version_info >= (3, 9): def __init__( self, - max_size: Optional[int] = ..., + max_size: int | None = ..., *, ctx: mpcont.BaseContext, pending_work_items: MutableMapping[int, _WorkItem], @@ -80,25 +80,21 @@ if sys.version_info >= (3, 7): ) -> None: ... else: def __init__( - self, - max_size: Optional[int] = ..., - *, - ctx: mpcont.BaseContext, - pending_work_items: MutableMapping[int, _WorkItem], + self, max_size: int | None = ..., *, ctx: mpcont.BaseContext, pending_work_items: MutableMapping[int, _WorkItem] ) -> None: ... def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ... def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ... def _sendback_result( - result_queue: mpq.SimpleQueue[_WorkItem], work_id: int, result: Optional[Any] = ..., exception: Optional[Exception] = ... + result_queue: mpq.SimpleQueue[_WorkItem], work_id: int, result: Any | None = ..., exception: Exception | None = ... ) -> None: ... if sys.version_info >= (3, 7): def _process_worker( call_queue: mpq.Queue[_CallItem], result_queue: mpq.SimpleQueue[_ResultItem], - initializer: Optional[Callable[..., None]], + initializer: Callable | None, initargs: Tuple[Any, ...], ) -> None: ... @@ -127,7 +123,7 @@ class _ExecutorManagerThread(threading.Thread): def get_n_children_alive(self) -> int: ... _system_limits_checked: bool -_system_limited: Optional[bool] +_system_limited: bool | None def _check_system_limits() -> None: ... def _chain_from_iterable_of_lists(iterable: Iterable[MutableSequence[Any]]) -> Any: ... @@ -140,8 +136,8 @@ else: class BrokenProcessPool(RuntimeError): ... class ProcessPoolExecutor(_base.Executor): - _mp_context: Optional[mpcont.BaseContext] - _initializer: Optional[Callable[..., None]] = ... + _mp_context: mpcont.BaseContext | None = ... + _initializer: Callable | None = ... _initargs: Tuple[Any, ...] = ... _executor_manager_thread: _ThreadWakeup _processes: MutableMapping[int, mpcont.Process] diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index a9ee63ba8313..07bbc1abbb19 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -4,7 +4,7 @@ import threading import weakref from collections.abc import Iterable, Mapping, Set from concurrent.futures import _base -from typing import Any, Callable, Optional, Tuple +from typing import Any, Callable, Tuple _threads_queues: Mapping[Any, Any] _shutdown: bool @@ -31,7 +31,7 @@ if sys.version_info >= (3, 7): def _worker( executor_reference: weakref.ref[Any], work_queue: queue.SimpleQueue[Any], - initializer: Optional[Callable[..., None]], + initializer: Callable[..., None], initargs: Tuple[Any, ...], ) -> None: ... @@ -49,8 +49,8 @@ class ThreadPoolExecutor(_base.Executor): _broken: bool _shutdown: bool _shutdown_lock: threading.Lock - _thread_name_prefix: Optional[str] = ... - _initializer: Optional[Callable[..., None]] = ... + _thread_name_prefix: str | None = ... + _initializer: Callable | None = ... _initargs: Tuple[Any, ...] = ... if sys.version_info >= (3, 7): _work_queue: queue.SimpleQueue[_WorkItem] @@ -61,7 +61,7 @@ class ThreadPoolExecutor(_base.Executor): self, max_workers: int | None = ..., thread_name_prefix: str = ..., - initializer: Callable[..., None] | None = ..., + initializer: Callable | None = ..., initargs: Tuple[Any, ...] = ..., ) -> None: ... else: From 261beda1aedd1e59b5e2549d765f9fadc16a88c3 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Sat, 28 Aug 2021 00:05:09 -0400 Subject: [PATCH 17/28] Fixed issue with using Callable definition --- stdlib/concurrent/futures/process.pyi | 4 ++-- stdlib/concurrent/futures/thread.pyi | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 7efecb56a105..f0936979950f 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -94,7 +94,7 @@ if sys.version_info >= (3, 7): def _process_worker( call_queue: mpq.Queue[_CallItem], result_queue: mpq.SimpleQueue[_ResultItem], - initializer: Callable | None, + initializer: Callable[..., None] | None, initargs: Tuple[Any, ...], ) -> None: ... @@ -137,7 +137,7 @@ else: class ProcessPoolExecutor(_base.Executor): _mp_context: mpcont.BaseContext | None = ... - _initializer: Callable | None = ... + _initializer: Callable[..., None] | None = ... _initargs: Tuple[Any, ...] = ... _executor_manager_thread: _ThreadWakeup _processes: MutableMapping[int, mpcont.Process] diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 07bbc1abbb19..9e151ff77727 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -50,7 +50,7 @@ class ThreadPoolExecutor(_base.Executor): _shutdown: bool _shutdown_lock: threading.Lock _thread_name_prefix: str | None = ... - _initializer: Callable | None = ... + _initializer: Callable[..., None] | None = ... _initargs: Tuple[Any, ...] = ... if sys.version_info >= (3, 7): _work_queue: queue.SimpleQueue[_WorkItem] @@ -61,7 +61,7 @@ class ThreadPoolExecutor(_base.Executor): self, max_workers: int | None = ..., thread_name_prefix: str = ..., - initializer: Callable | None = ..., + initializer: Callable[..., None] | None = ..., initargs: Tuple[Any, ...] = ..., ) -> None: ... else: From 580cdf8668ca30033966dfbb630b08190cf9ba0c Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 2 Sep 2021 01:43:53 -0400 Subject: [PATCH 18/28] Fixed last seen issues as per review --- stdlib/concurrent/futures/_base.pyi | 6 +- stdlib/concurrent/futures/process.pyi | 80 +++++++++++++-------------- 2 files changed, 43 insertions(+), 43 deletions(-) diff --git a/stdlib/concurrent/futures/_base.pyi b/stdlib/concurrent/futures/_base.pyi index 6dd6737ba97e..38f60ccd6a54 100644 --- a/stdlib/concurrent/futures/_base.pyi +++ b/stdlib/concurrent/futures/_base.pyi @@ -17,8 +17,8 @@ RUNNING: str CANCELLED: str CANCELLED_AND_NOTIFIED: str FINISHED: str -_FUTURE_STATES: Sequence[str] -_STATE_TO_DESCRIPTION_MAP: Mapping[str, str] +_FUTURE_STATES: list[str] +_STATE_TO_DESCRIPTION_MAP: dict[str, str] LOGGER: Logger class Error(Exception): ... @@ -89,7 +89,7 @@ def wait(fs: Iterable[Future[_T]], timeout: float | None = ..., return_when: str class _Waiter: event: threading.Event - finished_futures: Sequence[Future[Any]] + finished_futures: list[Future[Any]] def __init__(self) -> None: ... def add_result(self, future: Future[Any]) -> None: ... def add_exception(self, future: Future[Any]) -> None: ... diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index f0936979950f..8f932354b743 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -1,7 +1,6 @@ -import multiprocessing as mp -import multiprocessing.connection as mpconn -import multiprocessing.context as mpcont -import multiprocessing.queues as mpq +from multiprocessing.connection import Connection +from multiprocessing.context import BaseContext, Process +from multiprocessing.queues import Queue, SimpleQueue import sys import threading import weakref @@ -10,13 +9,13 @@ from concurrent.futures import _base from types import TracebackType from typing import Any, Callable, Tuple -_threads_wakeups: Mapping[Any, Any] +_threads_wakeups: MutableMapping[Any, Any] _global_shutdown: bool class _ThreadWakeup: _closed: bool - _reader: mpconn.Connection - _writer: mpconn.Connection + _reader: Connection + _writer: Connection def __init__(self) -> None: ... def close(self) -> None: ... def wakeup(self) -> None: ... @@ -64,7 +63,7 @@ class _CallItem(object): def __init__(self, work_id: int, fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... if sys.version_info >= (3, 7): - class _SafeQueue(mpq.Queue): + class _SafeQueue(Queue): pending_work_items: MutableMapping[int, _WorkItem] shutdown_lock: threading.Lock thread_wakeup: _ThreadWakeup @@ -73,54 +72,55 @@ if sys.version_info >= (3, 7): self, max_size: int | None = ..., *, - ctx: mpcont.BaseContext, + ctx: BaseContext, pending_work_items: MutableMapping[int, _WorkItem], shutdown_lock: threading.Lock, thread_wakeup: _ThreadWakeup, ) -> None: ... else: def __init__( - self, max_size: int | None = ..., *, ctx: mpcont.BaseContext, pending_work_items: MutableMapping[int, _WorkItem] + self, max_size: int | None = ..., *, ctx: .BaseContext, pending_work_items: MutableMapping[int, _WorkItem] ) -> None: ... def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ... def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ... def _sendback_result( - result_queue: mpq.SimpleQueue[_WorkItem], work_id: int, result: Any | None = ..., exception: Exception | None = ... + result_queue: SimpleQueue[_WorkItem], work_id: int, result: Any | None = ..., exception: Exception | None = ... ) -> None: ... if sys.version_info >= (3, 7): def _process_worker( - call_queue: mpq.Queue[_CallItem], - result_queue: mpq.SimpleQueue[_ResultItem], + call_queue: Queue[_CallItem], + result_queue: SimpleQueue[_ResultItem], initializer: Callable[..., None] | None, initargs: Tuple[Any, ...], ) -> None: ... else: - def _process_worker(call_queue: mpq.Queue[_CallItem], result_queue: mpq.SimpleQueue[_ResultItem]) -> None: ... - -class _ExecutorManagerThread(threading.Thread): - thread_wakeup: _ThreadWakeup - shutdown_lock: threading.Lock - executor_reference: weakref.ref[Any] - processes: MutableMapping[int, mpcont.Process] - call_queue: mpq.Queue[_CallItem] - result_queue: mpq.SimpleQueue[_ResultItem] - work_ids_queue: mpq.Queue[int] - pending_work_items: MutableMapping[int, _WorkItem] - def __init__(self, executor: ProcessPoolExecutor) -> None: ... - def run(self) -> None: ... - def add_call_item_to_queue(self) -> None: ... - def wait_result_broken_or_wakeup(self) -> tuple[Any, bool, str]: ... - def process_result_item(self, result_item: Any) -> None: ... - def is_shutting_down(self) -> bool: ... - def terminate_broken(self, cause: str) -> None: ... - def flag_executor_shutting_down(self) -> None: ... - def shutdown_workers(self) -> None: ... - def join_executor_internals(self) -> None: ... - def get_n_children_alive(self) -> int: ... + def _process_worker(call_queue: Queue[_CallItem], result_queue: SimpleQueue[_ResultItem]) -> None: ... + +if sys.version_info >= (3, 9): + class _ExecutorManagerThread(threading.Thread): + thread_wakeup: _ThreadWakeup + shutdown_lock: threading.Lock + executor_reference: weakref.ref[Any] + processes: MutableMapping[int, Process] + call_queue: Queue[_CallItem] + result_queue: SimpleQueue[_ResultItem] + work_ids_queue: Queue[int] + pending_work_items: MutableMapping[int, _WorkItem] + def __init__(self, executor: ProcessPoolExecutor) -> None: ... + def run(self) -> None: ... + def add_call_item_to_queue(self) -> None: ... + def wait_result_broken_or_wakeup(self) -> tuple[Any, bool, str]: ... + def process_result_item(self, result_item: Any) -> None: ... + def is_shutting_down(self) -> bool: ... + def terminate_broken(self, cause: str) -> None: ... + def flag_executor_shutting_down(self) -> None: ... + def shutdown_workers(self) -> None: ... + def join_executor_internals(self) -> None: ... + def get_n_children_alive(self) -> int: ... _system_limits_checked: bool _system_limited: bool | None @@ -136,11 +136,11 @@ else: class BrokenProcessPool(RuntimeError): ... class ProcessPoolExecutor(_base.Executor): - _mp_context: mpcont.BaseContext | None = ... + _mp_context: BaseContext | None = ... _initializer: Callable[..., None] | None = ... _initargs: Tuple[Any, ...] = ... _executor_manager_thread: _ThreadWakeup - _processes: MutableMapping[int, mpcont.Process] + _processes: MutableMapping[int, Process] _shutdown_thread: bool _shutdown_lock: threading.Lock _idle_worker_semaphore: threading.Semaphore @@ -149,13 +149,13 @@ class ProcessPoolExecutor(_base.Executor): _pending_work_items: MutableMapping[int, _WorkItem] _cancel_pending_futures: bool _executor_manager_thread_wakeup: _ThreadWakeup - _result_queue: mpq.SimpleQueue[Any] - _work_ids: mpq.Queue[Any] + _result_queue: SimpleQueue[Any] + _work_ids: Queue[Any] if sys.version_info >= (3, 7): def __init__( self, max_workers: int | None = ..., - mp_context: mpcont.BaseContext | None = ..., + mp_context: BaseContext | None = ..., initializer: Callable[..., None] | None = ..., initargs: Tuple[Any, ...] = ..., ) -> None: ... From 563b89bd03beb038f5e9f488ea2b67c93c96c43c Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 2 Sep 2021 02:12:39 -0400 Subject: [PATCH 19/28] Fixed some basic issues & more proper import calls --- stdlib/concurrent/futures/_base.pyi | 2 +- stdlib/concurrent/futures/process.pyi | 22 ++++++++++------------ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/stdlib/concurrent/futures/_base.pyi b/stdlib/concurrent/futures/_base.pyi index 38f60ccd6a54..3a7b56082281 100644 --- a/stdlib/concurrent/futures/_base.pyi +++ b/stdlib/concurrent/futures/_base.pyi @@ -2,7 +2,7 @@ import sys import threading from _typeshed import Self from abc import abstractmethod -from collections.abc import Container, Iterable, Iterator, Mapping, Sequence, Set +from collections.abc import Container, Iterable, Iterator, Sequence, Set from logging import Logger from typing import Any, Callable, Generic, Protocol, TypeVar, overload diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 8f932354b743..ec7b1d84feb4 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -1,11 +1,11 @@ -from multiprocessing.connection import Connection -from multiprocessing.context import BaseContext, Process -from multiprocessing.queues import Queue, SimpleQueue import sys import threading import weakref from collections.abc import Generator, Iterable, Mapping, MutableMapping, MutableSequence -from concurrent.futures import _base +from concurrent.futures._base import Executor, Future +from multiprocessing.connection import Connection +from multiprocessing.context import BaseContext, Process +from multiprocessing.queues import Queue, SimpleQueue from types import TracebackType from typing import Any, Callable, Tuple @@ -41,13 +41,11 @@ class _ExceptionWithTraceback: def _rebuild_exc(exc: Exception, tb: str) -> Exception: ... class _WorkItem(object): - future: _base.Future[Any] + future: Future[Any] fn: Callable[..., Any] args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__( - self, future: _base.Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any] - ) -> None: ... + def __init__(self, future: Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... class _ResultItem(object): work_id: int @@ -63,7 +61,7 @@ class _CallItem(object): def __init__(self, work_id: int, fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... if sys.version_info >= (3, 7): - class _SafeQueue(Queue): + class _SafeQueue(Queue[Future[Any]]): pending_work_items: MutableMapping[int, _WorkItem] shutdown_lock: threading.Lock thread_wakeup: _ThreadWakeup @@ -79,7 +77,7 @@ if sys.version_info >= (3, 7): ) -> None: ... else: def __init__( - self, max_size: int | None = ..., *, ctx: .BaseContext, pending_work_items: MutableMapping[int, _WorkItem] + self, max_size: int | None = ..., *, ctx: BaseContext, pending_work_items: MutableMapping[int, _WorkItem] ) -> None: ... def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... @@ -129,13 +127,13 @@ def _check_system_limits() -> None: ... def _chain_from_iterable_of_lists(iterable: Iterable[MutableSequence[Any]]) -> Any: ... if sys.version_info >= (3, 7): - from ._base import BrokenExecutor + from concurrent.futures._base import BrokenExecutor class BrokenProcessPool(BrokenExecutor): ... else: class BrokenProcessPool(RuntimeError): ... -class ProcessPoolExecutor(_base.Executor): +class ProcessPoolExecutor(Executor): _mp_context: BaseContext | None = ... _initializer: Callable[..., None] | None = ... _initargs: Tuple[Any, ...] = ... From c7fec2580e9419e21ba8b6723b2bf87dbcce1943 Mon Sep 17 00:00:00 2001 From: HunterAP23 Date: Thu, 2 Sep 2021 09:45:14 -0400 Subject: [PATCH 20/28] Update stdlib/concurrent/futures/process.pyi Co-authored-by: Sebastian Rittau --- stdlib/concurrent/futures/process.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index ec7b1d84feb4..3ec7ebde54d2 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -81,7 +81,7 @@ if sys.version_info >= (3, 7): ) -> None: ... def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... -def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ... +def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any, ...], None, None]: ... def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ... def _sendback_result( result_queue: SimpleQueue[_WorkItem], work_id: int, result: Any | None = ..., exception: Exception | None = ... From d83538c999783c18726d4dda34581dcd970f28c8 Mon Sep 17 00:00:00 2001 From: HunterAP23 Date: Thu, 2 Sep 2021 09:45:30 -0400 Subject: [PATCH 21/28] Update stdlib/concurrent/futures/process.pyi Co-authored-by: Sebastian Rittau --- stdlib/concurrent/futures/process.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 3ec7ebde54d2..40c0e148ee76 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -112,7 +112,7 @@ if sys.version_info >= (3, 9): def run(self) -> None: ... def add_call_item_to_queue(self) -> None: ... def wait_result_broken_or_wakeup(self) -> tuple[Any, bool, str]: ... - def process_result_item(self, result_item: Any) -> None: ... + def process_result_item(self, result_item) -> None: ... def is_shutting_down(self) -> bool: ... def terminate_broken(self, cause: str) -> None: ... def flag_executor_shutting_down(self) -> None: ... From 26186c019eacb7c74690ef3c9fb0c60a270b261e Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 2 Sep 2021 09:45:50 -0400 Subject: [PATCH 22/28] Minor fixes --- stdlib/concurrent/futures/process.pyi | 2 +- stdlib/concurrent/futures/thread.pyi | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index ec7b1d84feb4..3ec7ebde54d2 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -81,7 +81,7 @@ if sys.version_info >= (3, 7): ) -> None: ... def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... -def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any], None, None]: ... +def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any, ...], None, None]: ... def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ... def _sendback_result( result_queue: SimpleQueue[_WorkItem], work_id: int, result: Any | None = ..., exception: Exception | None = ... diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 9e151ff77727..0329fc6446b4 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -3,8 +3,8 @@ import sys import threading import weakref from collections.abc import Iterable, Mapping, Set -from concurrent.futures import _base -from typing import Any, Callable, Tuple +from concurrent.futures._base import Future, Executor +from typing import Any, Callable, Generic, Tuple _threads_queues: Mapping[Any, Any] _shutdown: bool @@ -15,13 +15,15 @@ def _python_exit() -> None: ... if sys.version_info >= (3, 9): from types import GenericAlias -class _WorkItem(object): - future: _base.Future[Any] - fn: Callable[..., Any] +_S = TypeVar("_S") + +class _WorkItem(Generic[_S]): + future: Future[_S] + fn: Callable[..., _S] args: Iterable[Any] kwargs: Mapping[str, Any] def __init__( - self, future: _base.Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any] + self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any] ) -> None: ... def run(self) -> None: ... if sys.version_info >= (3, 9): @@ -39,10 +41,10 @@ else: def _worker(executor_reference: weakref.ref[Any], work_queue: queue.Queue[Any]) -> None: ... if sys.version_info >= (3, 7): - from ._base import BrokenExecutor + from concurrent.futures._base import BrokenExecutor class BrokenThreadPool(BrokenExecutor): ... -class ThreadPoolExecutor(_base.Executor): +class ThreadPoolExecutor(Executor): _max_workers: int _idle_semaphore: threading.Semaphore _threads: Set[threading.Thread] From bc4139d35518fe4757cc36d24d5a88da65a18699 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 2 Sep 2021 12:44:50 -0400 Subject: [PATCH 23/28] More minor fixes --- stdlib/concurrent/futures/process.pyi | 33 +++++++++++++++------------ stdlib/concurrent/futures/thread.pyi | 13 +++++------ 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 40c0e148ee76..e11371eb0c75 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -2,12 +2,13 @@ import sys import threading import weakref from collections.abc import Generator, Iterable, Mapping, MutableMapping, MutableSequence -from concurrent.futures._base import Executor, Future from multiprocessing.connection import Connection from multiprocessing.context import BaseContext, Process from multiprocessing.queues import Queue, SimpleQueue from types import TracebackType -from typing import Any, Callable, Tuple +from typing import Any, Callable, Generic, Tuple, TypeVar + +from ._base import Executor, Future _threads_wakeups: MutableMapping[Any, Any] _global_shutdown: bool @@ -40,20 +41,22 @@ class _ExceptionWithTraceback: def _rebuild_exc(exc: Exception, tb: str) -> Exception: ... -class _WorkItem(object): - future: Future[Any] - fn: Callable[..., Any] +_S = TypeVar("_S") + +class _WorkItem(Generic[_S]): + future: Future[_S] + fn: Callable[..., _S] args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__(self, future: Future[Any], fn: Callable[..., Any], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... + def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... -class _ResultItem(object): +class _ResultItem: work_id: int exception: Exception result: Any def __init__(self, work_id: int, exception: Exception | None = ..., result: Any | None = ...) -> None: ... -class _CallItem(object): +class _CallItem: work_id: int fn: Callable[..., Any] args: Iterable[Any] @@ -62,7 +65,7 @@ class _CallItem(object): if sys.version_info >= (3, 7): class _SafeQueue(Queue[Future[Any]]): - pending_work_items: MutableMapping[int, _WorkItem] + pending_work_items: dict[int, _WorkItem[Any]] shutdown_lock: threading.Lock thread_wakeup: _ThreadWakeup if sys.version_info >= (3, 9): @@ -71,20 +74,20 @@ if sys.version_info >= (3, 7): max_size: int | None = ..., *, ctx: BaseContext, - pending_work_items: MutableMapping[int, _WorkItem], + pending_work_items: dict[int, _WorkItem[Any]], shutdown_lock: threading.Lock, thread_wakeup: _ThreadWakeup, ) -> None: ... else: def __init__( - self, max_size: int | None = ..., *, ctx: BaseContext, pending_work_items: MutableMapping[int, _WorkItem] + self, max_size: int | None = ..., *, ctx: BaseContext, pending_work_items: dict[int, _WorkItem[Any]] ) -> None: ... def _on_queue_feeder_error(self, e: Exception, obj: _CallItem) -> None: ... def _get_chunks(*iterables: Any, chunksize: int) -> Generator[Tuple[Any, ...], None, None]: ... def _process_chunk(fn: Callable[..., Any], chunk: Tuple[Any, None, None]) -> Generator[Any, None, None]: ... def _sendback_result( - result_queue: SimpleQueue[_WorkItem], work_id: int, result: Any | None = ..., exception: Exception | None = ... + result_queue: SimpleQueue[_WorkItem[Any]], work_id: int, result: Any | None = ..., exception: Exception | None = ... ) -> None: ... if sys.version_info >= (3, 7): @@ -107,7 +110,7 @@ if sys.version_info >= (3, 9): call_queue: Queue[_CallItem] result_queue: SimpleQueue[_ResultItem] work_ids_queue: Queue[int] - pending_work_items: MutableMapping[int, _WorkItem] + pending_work_items: dict[int, _WorkItem[Any]] def __init__(self, executor: ProcessPoolExecutor) -> None: ... def run(self) -> None: ... def add_call_item_to_queue(self) -> None: ... @@ -127,7 +130,7 @@ def _check_system_limits() -> None: ... def _chain_from_iterable_of_lists(iterable: Iterable[MutableSequence[Any]]) -> Any: ... if sys.version_info >= (3, 7): - from concurrent.futures._base import BrokenExecutor + from ._base import BrokenExecutor class BrokenProcessPool(BrokenExecutor): ... else: @@ -144,7 +147,7 @@ class ProcessPoolExecutor(Executor): _idle_worker_semaphore: threading.Semaphore _broken: bool _queue_count: int - _pending_work_items: MutableMapping[int, _WorkItem] + _pending_work_items: dict[int, _WorkItem[Any]] _cancel_pending_futures: bool _executor_manager_thread_wakeup: _ThreadWakeup _result_queue: SimpleQueue[Any] diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index 0329fc6446b4..d5c72bd3c9df 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -3,8 +3,9 @@ import sys import threading import weakref from collections.abc import Iterable, Mapping, Set -from concurrent.futures._base import Future, Executor -from typing import Any, Callable, Generic, Tuple +from typing import Any, Callable, Generic, Tuple, TypeVar + +from ._base import Executor, Future _threads_queues: Mapping[Any, Any] _shutdown: bool @@ -22,9 +23,7 @@ class _WorkItem(Generic[_S]): fn: Callable[..., _S] args: Iterable[Any] kwargs: Mapping[str, Any] - def __init__( - self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any] - ) -> None: ... + def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ... def run(self) -> None: ... if sys.version_info >= (3, 9): def __class_getitem__(cls, item: Any) -> GenericAlias: ... @@ -41,7 +40,7 @@ else: def _worker(executor_reference: weakref.ref[Any], work_queue: queue.Queue[Any]) -> None: ... if sys.version_info >= (3, 7): - from concurrent.futures._base import BrokenExecutor + from ._base import BrokenExecutor class BrokenThreadPool(BrokenExecutor): ... class ThreadPoolExecutor(Executor): @@ -55,7 +54,7 @@ class ThreadPoolExecutor(Executor): _initializer: Callable[..., None] | None = ... _initargs: Tuple[Any, ...] = ... if sys.version_info >= (3, 7): - _work_queue: queue.SimpleQueue[_WorkItem] + _work_queue: queue.SimpleQueue[_WorkItem[Any]] else: _work_queue: queue.Queue[_WorkItem] if sys.version_info >= (3, 7): From 7bf42263d076434fbdeeab1211e27a108ff3cc10 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 2 Sep 2021 13:57:05 -0400 Subject: [PATCH 24/28] Fixed up some issues & cleaned up imports --- stdlib/concurrent/futures/process.pyi | 22 +++++++++++----------- stdlib/concurrent/futures/thread.pyi | 18 +++++++++--------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index e11371eb0c75..4b218c12784c 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -1,12 +1,12 @@ import sys -import threading -import weakref from collections.abc import Generator, Iterable, Mapping, MutableMapping, MutableSequence from multiprocessing.connection import Connection from multiprocessing.context import BaseContext, Process from multiprocessing.queues import Queue, SimpleQueue +from threading import Lock, Semaphore, Thread from types import TracebackType -from typing import Any, Callable, Generic, Tuple, TypeVar +from typing import Any, Callable, Generic, Tuple, TypeVar, Union +from weakref import ref from ._base import Executor, Future @@ -66,7 +66,7 @@ class _CallItem: if sys.version_info >= (3, 7): class _SafeQueue(Queue[Future[Any]]): pending_work_items: dict[int, _WorkItem[Any]] - shutdown_lock: threading.Lock + shutdown_lock: Lock thread_wakeup: _ThreadWakeup if sys.version_info >= (3, 9): def __init__( @@ -75,7 +75,7 @@ if sys.version_info >= (3, 7): *, ctx: BaseContext, pending_work_items: dict[int, _WorkItem[Any]], - shutdown_lock: threading.Lock, + shutdown_lock: Lock, thread_wakeup: _ThreadWakeup, ) -> None: ... else: @@ -102,10 +102,10 @@ else: def _process_worker(call_queue: Queue[_CallItem], result_queue: SimpleQueue[_ResultItem]) -> None: ... if sys.version_info >= (3, 9): - class _ExecutorManagerThread(threading.Thread): + class _ExecutorManagerThread(Thread): thread_wakeup: _ThreadWakeup - shutdown_lock: threading.Lock - executor_reference: weakref.ref[Any] + shutdown_lock: Lock + executor_reference: ref[Any] processes: MutableMapping[int, Process] call_queue: Queue[_CallItem] result_queue: SimpleQueue[_ResultItem] @@ -115,7 +115,7 @@ if sys.version_info >= (3, 9): def run(self) -> None: ... def add_call_item_to_queue(self) -> None: ... def wait_result_broken_or_wakeup(self) -> tuple[Any, bool, str]: ... - def process_result_item(self, result_item) -> None: ... + def process_result_item(self, result_item: Union[int, _ResultItem]) -> None: ... def is_shutting_down(self) -> bool: ... def terminate_broken(self, cause: str) -> None: ... def flag_executor_shutting_down(self) -> None: ... @@ -143,8 +143,8 @@ class ProcessPoolExecutor(Executor): _executor_manager_thread: _ThreadWakeup _processes: MutableMapping[int, Process] _shutdown_thread: bool - _shutdown_lock: threading.Lock - _idle_worker_semaphore: threading.Semaphore + _shutdown_lock: Lock + _idle_worker_semaphore: Semaphore _broken: bool _queue_count: int _pending_work_items: dict[int, _WorkItem[Any]] diff --git a/stdlib/concurrent/futures/thread.pyi b/stdlib/concurrent/futures/thread.pyi index d5c72bd3c9df..7a35bfc6ed77 100644 --- a/stdlib/concurrent/futures/thread.pyi +++ b/stdlib/concurrent/futures/thread.pyi @@ -1,15 +1,15 @@ import queue import sys -import threading -import weakref from collections.abc import Iterable, Mapping, Set +from threading import Lock, Semaphore, Thread from typing import Any, Callable, Generic, Tuple, TypeVar +from weakref import ref from ._base import Executor, Future _threads_queues: Mapping[Any, Any] _shutdown: bool -_global_shutdown_lock: threading.Lock +_global_shutdown_lock: Lock def _python_exit() -> None: ... @@ -30,14 +30,14 @@ class _WorkItem(Generic[_S]): if sys.version_info >= (3, 7): def _worker( - executor_reference: weakref.ref[Any], + executor_reference: ref[Any], work_queue: queue.SimpleQueue[Any], initializer: Callable[..., None], initargs: Tuple[Any, ...], ) -> None: ... else: - def _worker(executor_reference: weakref.ref[Any], work_queue: queue.Queue[Any]) -> None: ... + def _worker(executor_reference: ref[Any], work_queue: queue.Queue[Any]) -> None: ... if sys.version_info >= (3, 7): from ._base import BrokenExecutor @@ -45,18 +45,18 @@ if sys.version_info >= (3, 7): class ThreadPoolExecutor(Executor): _max_workers: int - _idle_semaphore: threading.Semaphore - _threads: Set[threading.Thread] + _idle_semaphore: Semaphore + _threads: Set[Thread] _broken: bool _shutdown: bool - _shutdown_lock: threading.Lock + _shutdown_lock: Lock _thread_name_prefix: str | None = ... _initializer: Callable[..., None] | None = ... _initargs: Tuple[Any, ...] = ... if sys.version_info >= (3, 7): _work_queue: queue.SimpleQueue[_WorkItem[Any]] else: - _work_queue: queue.Queue[_WorkItem] + _work_queue: queue.Queue[_WorkItem[Any]] if sys.version_info >= (3, 7): def __init__( self, From 9bae25fe71cc41994c12077fc061d08d27204511 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 2 Sep 2021 14:01:00 -0400 Subject: [PATCH 25/28] Removed usage of Union --- stdlib/concurrent/futures/process.pyi | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stdlib/concurrent/futures/process.pyi b/stdlib/concurrent/futures/process.pyi index 4b218c12784c..4ae791361bb1 100644 --- a/stdlib/concurrent/futures/process.pyi +++ b/stdlib/concurrent/futures/process.pyi @@ -5,7 +5,7 @@ from multiprocessing.context import BaseContext, Process from multiprocessing.queues import Queue, SimpleQueue from threading import Lock, Semaphore, Thread from types import TracebackType -from typing import Any, Callable, Generic, Tuple, TypeVar, Union +from typing import Any, Callable, Generic, Tuple, TypeVar from weakref import ref from ._base import Executor, Future @@ -115,7 +115,7 @@ if sys.version_info >= (3, 9): def run(self) -> None: ... def add_call_item_to_queue(self) -> None: ... def wait_result_broken_or_wakeup(self) -> tuple[Any, bool, str]: ... - def process_result_item(self, result_item: Union[int, _ResultItem]) -> None: ... + def process_result_item(self, result_item: int | _ResultItem) -> None: ... def is_shutting_down(self) -> bool: ... def terminate_broken(self, cause: str) -> None: ... def flag_executor_shutting_down(self) -> None: ... From b350059b7d95898eb4db033918fd17848e4baa39 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 2 Sep 2021 14:32:33 -0400 Subject: [PATCH 26/28] Changed wait method to use Set of Future to work with mypy-primer for Optuna repo --- stdlib/concurrent/futures/_base.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/concurrent/futures/_base.pyi b/stdlib/concurrent/futures/_base.pyi index 3a7b56082281..728bbd1a5c13 100644 --- a/stdlib/concurrent/futures/_base.pyi +++ b/stdlib/concurrent/futures/_base.pyi @@ -85,7 +85,7 @@ class DoneAndNotDoneFutures(Sequence[Set[Future[_T]]]): @overload def __getitem__(self, s: slice) -> DoneAndNotDoneFutures[_T]: ... -def wait(fs: Iterable[Future[_T]], timeout: float | None = ..., return_when: str = ...) -> DoneAndNotDoneFutures[_T]: ... +def wait(fs: Set[Future[_T]], timeout: float | None = ..., return_when: str = ...) -> DoneAndNotDoneFutures[_T]: ... class _Waiter: event: threading.Event From 001c56e667311559528404975096c4f5c36e1984 Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 2 Sep 2021 15:10:37 -0400 Subject: [PATCH 27/28] Reverted change to wait method and DoneAndNotDoneFutures class --- stdlib/concurrent/futures/_base.pyi | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stdlib/concurrent/futures/_base.pyi b/stdlib/concurrent/futures/_base.pyi index 728bbd1a5c13..f6037ad03c07 100644 --- a/stdlib/concurrent/futures/_base.pyi +++ b/stdlib/concurrent/futures/_base.pyi @@ -75,7 +75,7 @@ class Executor: def as_completed(fs: Iterable[Future[_T]], timeout: float | None = ...) -> Iterator[Future[_T]]: ... # Ideally this would be a namedtuple, but mypy doesn't support generic tuple types. See #1976 -class DoneAndNotDoneFutures(Sequence[Set[Future[_T]]]): +class DoneAndNotDoneFutures(Sequence[_T]): done: Set[Future[_T]] not_done: Set[Future[_T]] def __new__(_cls, done: Set[Future[_T]], not_done: Set[Future[_T]]) -> DoneAndNotDoneFutures[_T]: ... @@ -85,7 +85,7 @@ class DoneAndNotDoneFutures(Sequence[Set[Future[_T]]]): @overload def __getitem__(self, s: slice) -> DoneAndNotDoneFutures[_T]: ... -def wait(fs: Set[Future[_T]], timeout: float | None = ..., return_when: str = ...) -> DoneAndNotDoneFutures[_T]: ... +def wait(fs: Iterable[Future[_T]], timeout: float | None = ..., return_when: str = ...) -> DoneAndNotDoneFutures[_T]: ... class _Waiter: event: threading.Event From d7020fef0df6939dfd185a6a73a826648e642b8d Mon Sep 17 00:00:00 2001 From: HunterAP Date: Thu, 2 Sep 2021 15:15:34 -0400 Subject: [PATCH 28/28] Fixed DoneAndNotDoneFutures again --- stdlib/concurrent/futures/_base.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/concurrent/futures/_base.pyi b/stdlib/concurrent/futures/_base.pyi index f6037ad03c07..3a7b56082281 100644 --- a/stdlib/concurrent/futures/_base.pyi +++ b/stdlib/concurrent/futures/_base.pyi @@ -75,7 +75,7 @@ class Executor: def as_completed(fs: Iterable[Future[_T]], timeout: float | None = ...) -> Iterator[Future[_T]]: ... # Ideally this would be a namedtuple, but mypy doesn't support generic tuple types. See #1976 -class DoneAndNotDoneFutures(Sequence[_T]): +class DoneAndNotDoneFutures(Sequence[Set[Future[_T]]]): done: Set[Future[_T]] not_done: Set[Future[_T]] def __new__(_cls, done: Set[Future[_T]], not_done: Set[Future[_T]]) -> DoneAndNotDoneFutures[_T]: ...