From d379eb6e557c70e4c8478db6e999d4d4275c29a0 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 30 Apr 2020 18:54:07 -0600 Subject: [PATCH 1/7] Add an arg converter for threading timeouts. --- Include/pythread.h | 6 ++++++ Python/thread.c | 29 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/Include/pythread.h b/Include/pythread.h index bb9d86412218ad..82c6b060da0c32 100644 --- a/Include/pythread.h +++ b/Include/pythread.h @@ -162,6 +162,12 @@ PyAPI_FUNC(int) PyThread_tss_set(Py_tss_t *key, void *value); PyAPI_FUNC(void *) PyThread_tss_get(Py_tss_t *key); #endif /* New in 3.7 */ +#ifndef Py_LIMITED_API +PyAPI_DATA(const PY_TIMEOUT_T) _PyThread_TIMEOUT_NOT_SET; +// This is for use with PyArg_ParseTupleAndKeywords(): +PyAPI_FUNC(int) _PyThread_timeout_arg_converter(PyObject *, void *); +#endif + #ifdef __cplusplus } #endif diff --git a/Python/thread.c b/Python/thread.c index a10f5728dc0ceb..b94c86d7ae9d44 100644 --- a/Python/thread.c +++ b/Python/thread.c @@ -224,3 +224,32 @@ PyThread_GetInfo(void) PyStructSequence_SET_ITEM(threadinfo, pos++, value); return threadinfo; } + +#define TIMEOUT_NOT_SET _PYTIME_FROMSECONDS(-1) +// This is equivalent to _PyTime_AsMicroseconds(timeout_not_set): +const PY_TIMEOUT_T _PyThread_TIMEOUT_NOT_SET = (TIMEOUT_NOT_SET - 999) / 1000; + +int +_PyThread_timeout_arg_converter(PyObject *arg, void *ptr) +{ + if (arg == NULL) { + *(PY_TIMEOUT_T *)ptr = _PyThread_TIMEOUT_NOT_SET; + return 0; + } + + _PyTime_t time = TIMEOUT_NOT_SET; + if (_PyTime_FromSecondsObject(&time, arg, _PyTime_ROUND_TIMEOUT) < 0) { + return -1; + } + if (time < 0 && time != TIMEOUT_NOT_SET) { + PyErr_SetString(PyExc_ValueError, "timeout value must be positive"); + return -1; + } + PY_TIMEOUT_T timeout = _PyTime_AsMicroseconds(time, _PyTime_ROUND_TIMEOUT); + if (timeout >= PY_TIMEOUT_MAX) { + PyErr_SetString(PyExc_OverflowError, "timeout value is too large"); + return -1; + } + *(PY_TIMEOUT_T *)ptr = timeout; + return 0; +} From 7a80a2914617d0d7ed1885877038553cc93d0be6 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 30 Apr 2020 19:41:04 -0600 Subject: [PATCH 2/7] Add _Py_DECREF_in_interpreter(). --- Include/cpython/pystate.h | 4 +++ Python/pystate.c | 69 +++++++++++++++++++++++++-------------- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/Include/cpython/pystate.h b/Include/cpython/pystate.h index f292da1d3c6c5e..9da089f36a1cd8 100644 --- a/Include/cpython/pystate.h +++ b/Include/cpython/pystate.h @@ -199,6 +199,10 @@ PyAPI_FUNC(const PyConfig*) _PyInterpreterState_GetConfig(PyInterpreterState *in PyAPI_FUNC(const PyConfig*) _Py_GetConfig(void); +/* cross-interpreter operations */ + +PyAPI_FUNC(int) _Py_DECREF_in_interpreter(PyInterpreterState *, PyObject *); + /* cross-interpreter data */ struct _xid; diff --git a/Python/pystate.c b/Python/pystate.c index dd95750027241b..89510b6f5c8b16 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1447,6 +1447,46 @@ PyGILState_Release(PyGILState_STATE oldstate) /* cross-interpreter data */ /**************************/ +/* cross-interpreter operations */ + +static void +_call_in_interpreter(struct _gilstate_runtime_state *gilstate, + PyInterpreterState *interp, + void (*func)(void *), void *arg) +{ + /* We would use Py_AddPendingCall() if it weren't specific to the + * main interpreter (see bpo-33608). In the meantime we take a + * naive approach. + */ + PyThreadState *save_tstate = NULL; + if (interp != _PyRuntimeGILState_GetThreadState(gilstate)->interp) { + // XXX Using the "head" thread isn't strictly correct. + PyThreadState *tstate = PyInterpreterState_ThreadHead(interp); + // XXX Possible GILState issues? + save_tstate = _PyThreadState_Swap(gilstate, tstate); + } + + func(arg); + + // Switch back. + if (save_tstate != NULL) { + _PyThreadState_Swap(gilstate, save_tstate); + } +} + +static int +_decref_pyobj(void *obj) +{ + Py_DECREF(obj); + return 0; +} + +int +_Py_DECREF_in_interpreter(PyInterpreterState *interp, PyObject *obj) +{ + return _PyEval_AddPendingCall(interp, _decref_pyobj, obj); +} + /* cross-interpreter data */ crossinterpdatafunc _PyCrossInterpreterData_Lookup(PyObject *); @@ -1539,31 +1579,6 @@ _release_xidata(void *arg) Py_XDECREF(data->obj); } -static void -_call_in_interpreter(struct _gilstate_runtime_state *gilstate, - PyInterpreterState *interp, - void (*func)(void *), void *arg) -{ - /* We would use Py_AddPendingCall() if it weren't specific to the - * main interpreter (see bpo-33608). In the meantime we take a - * naive approach. - */ - PyThreadState *save_tstate = NULL; - if (interp != _PyRuntimeGILState_GetThreadState(gilstate)->interp) { - // XXX Using the "head" thread isn't strictly correct. - PyThreadState *tstate = PyInterpreterState_ThreadHead(interp); - // XXX Possible GILState issues? - save_tstate = _PyThreadState_Swap(gilstate, tstate); - } - - func(arg); - - // Switch back. - if (save_tstate != NULL) { - _PyThreadState_Swap(gilstate, save_tstate); - } -} - void _PyCrossInterpreterData_Release(_PyCrossInterpreterData *data) { @@ -1796,6 +1811,10 @@ _register_builtins_for_crossinterpreter_data(struct _xidregistry *xidregistry) } } +/******************************/ +/* END cross-interpreter data */ +/******************************/ + _PyFrameEvalFunction _PyInterpreterState_GetEvalFrameFunc(PyInterpreterState *interp) From a3c2678d7537b0310885d862450a4e7698726e23 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 30 Apr 2020 18:56:18 -0600 Subject: [PATCH 3/7] Add a lock wrapper class. --- Modules/_xxsubinterpretersmodule.c | 135 +++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index 15e80559ec6f60..05cc4401eec1f5 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -3,6 +3,7 @@ /* low-level access to interpreter primitives */ #include "Python.h" +#include "pythread.h" #include "frameobject.h" #include "interpreteridobject.h" @@ -276,6 +277,137 @@ _sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass) } } +/* locks */ + +typedef struct _lockobj { + PyObject_HEAD + PyThread_type_lock lock; + PyInterpreterState *owner; + int done; +} _lockobj; + +static int +_lockobj_init(_lockobj *lock) +{ + lock->lock = PyThread_allocate_lock(); + if (lock->lock == NULL) { + return -1; + } + lock->done = 0; + lock->owner = _get_current(); + return 0; +} + +static void +_lockobj_free(_lockobj *lock) +{ + PyThread_free_lock(lock->lock); + PyObject_Del(lock); +} + +// This is cross-interpreter safe. +static int +_lockobj_acquire(_lockobj *lock) +{ + // Do not wait. + return PyThread_acquire_lock(lock->lock, 0); +} + +// This is cross-interpreter safe. +static void +_lockobj_release(_lockobj *lock) +{ + PyThread_release_lock(lock->lock); +} + +static PyObject * +_lockobj_call(PyObject *self, PyObject *args, PyObject *kwargs) +{ + static char *kwlist[] = {"timeout", NULL}; + PY_TIMEOUT_T timeout = _PyThread_TIMEOUT_NOT_SET; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O&:__call__", kwlist, + _PyThread_timeout_arg_converter, + &timeout)) { + return NULL; + } + _lockobj *lock = (_lockobj *)self; + + if (lock->done) { + Py_RETURN_TRUE; + } + + // Wait for the lock to be released. + _PyTime_t end = timeout > 0 ? _PyTime_GetMonotonicClock() + timeout : 0; + PyLockStatus r = PY_LOCK_FAILURE; + do { + _PyTime_t microseconds = _PyTime_AsMicroseconds(timeout, + _PyTime_ROUND_CEILING); + + /* first a simple non-blocking try without releasing the GIL */ + r = PyThread_acquire_lock_timed(lock->lock, 0, 0); + if (r == PY_LOCK_FAILURE && microseconds != 0) { + Py_BEGIN_ALLOW_THREADS + r = PyThread_acquire_lock_timed(lock, microseconds, 0); + Py_END_ALLOW_THREADS + } + + if (r == PY_LOCK_INTR) { + /* Run signal handlers if we were interrupted. Propagate + * exceptions from signal handlers, such as KeyboardInterrupt, by + * passing up PY_LOCK_INTR. */ + if (Py_MakePendingCalls() < 0) { + return NULL; + } + + /* If we're using a timeout, recompute the timeout after processing + * signals, since those can take time. */ + if (timeout > 0) { + timeout = end - _PyTime_GetMonotonicClock(); + + /* Check for negative values, since those mean block forever. + */ + if (timeout < 0) { + r = PY_LOCK_FAILURE; + } + } + } + } while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */ + if (r == PY_LOCK_FAILURE) { + Py_RETURN_FALSE; + } + + // Success! + _lockobj_release(lock); + lock->done = 1; + Py_RETURN_TRUE; +} + +static PyTypeObject _lockobjtype = { + PyVarObject_HEAD_INIT(&PyType_Type, 0) + .tp_name = "_xxsubinterpreters.lock", + .tp_doc = PyDoc_STR("a basic waitable wrapper around a mutex"), + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_basicsize = sizeof(_lockobj), + // functionality + .tp_new = NULL, // It cannot be instantiated from Python code. + .tp_dealloc = (destructor)_lockobj_free, + .tp_call = _lockobj_call, +}; + +static _lockobj * +_lockobj_new(void) +{ + _lockobj *lock = PyObject_New(_lockobj, &_lockobjtype); + if (lock == NULL) { + return NULL; + } + if (_lockobj_init(lock) != 0) { + PyMem_Free(lock); + return NULL; + } + return lock; +} + /* channel-specific code ****************************************************/ @@ -2618,6 +2750,9 @@ PyInit__xxsubinterpreters(void) if (PyType_Ready(&ChannelIDtype) != 0) { return NULL; } + if (PyType_Ready(&_lockobjtype) != 0) { + return NULL; + } /* Create the module */ PyObject *module = PyModule_Create(&interpretersmodule); From 98433f592bb834819d3d79751221cef0da2730b4 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 30 Apr 2020 19:22:51 -0600 Subject: [PATCH 4/7] Factor out _lockobj_free(). --- Modules/_xxsubinterpretersmodule.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index 05cc4401eec1f5..9059ecc42a90c1 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -299,7 +299,7 @@ _lockobj_init(_lockobj *lock) } static void -_lockobj_free(_lockobj *lock) +_lockobj_dealloc(_lockobj *lock) { PyThread_free_lock(lock->lock); PyObject_Del(lock); @@ -390,7 +390,7 @@ static PyTypeObject _lockobjtype = { .tp_basicsize = sizeof(_lockobj), // functionality .tp_new = NULL, // It cannot be instantiated from Python code. - .tp_dealloc = (destructor)_lockobj_free, + .tp_dealloc = (destructor)_lockobj_dealloc, .tp_call = _lockobj_call, }; @@ -408,6 +408,18 @@ _lockobj_new(void) return lock; } +static void +_lockobj_free(_lockobj *lock) +{ + _lockobj_release(lock); + if (lock->owner == NULL || lock->owner == _get_current()) { + Py_DECREF(lock); + } else { + int res = _Py_DECREF_in_interpreter(lock->owner, (PyObject *)lock); + assert(res == 0); + } +} + /* channel-specific code ****************************************************/ From b60ff284c0fb99550175d1cba49243f6261750ab Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 30 Apr 2020 19:24:20 -0600 Subject: [PATCH 5/7] Add _channelitem.recvlock. --- Modules/_xxsubinterpretersmodule.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index 9059ecc42a90c1..efdd7fa83ebe11 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -497,6 +497,7 @@ struct _channelitem; typedef struct _channelitem { _PyCrossInterpreterData *data; + _lockobj *recvlock; struct _channelitem *next; } _channelitem; @@ -509,6 +510,7 @@ _channelitem_new(void) return NULL; } item->data = NULL; + item->recvlock = NULL; item->next = NULL; return item; } @@ -521,6 +523,9 @@ _channelitem_clear(_channelitem *item) PyMem_Free(item->data); item->data = NULL; } + if (item->recvlock != NULL) { + _lockobj_free(item->recvlock); + } item->next = NULL; } @@ -546,6 +551,7 @@ _channelitem_popped(_channelitem *item) { _PyCrossInterpreterData *data = item->data; item->data = NULL; + // The lock (if any) is released here: _channelitem_free(item); return data; } From 070f508d4cb953f7c95580a0a103029fc8feca69 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 30 Apr 2020 19:26:24 -0600 Subject: [PATCH 6/7] Pass recvlock through from _channel_send() to the channelitem. --- Modules/_xxsubinterpretersmodule.c | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index efdd7fa83ebe11..d71b7010f3e056 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -593,13 +593,15 @@ _channelqueue_free(_channelqueue *queue) } static int -_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data) +_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data, + _lockobj *recvlock) { _channelitem *item = _channelitem_new(); if (item == NULL) { return -1; } item->data = data; + item->recvlock = recvlock; queue->count += 1; if (queue->first == NULL) { @@ -911,7 +913,7 @@ _channel_free(_PyChannelState *chan) static int _channel_add(_PyChannelState *chan, int64_t interp, - _PyCrossInterpreterData *data) + _PyCrossInterpreterData *data, _lockobj *recvlock) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -924,7 +926,7 @@ _channel_add(_PyChannelState *chan, int64_t interp, goto done; } - if (_channelqueue_put(chan->queue, data) != 0) { + if (_channelqueue_put(chan->queue, data, recvlock) != 0) { goto done; } @@ -1430,7 +1432,8 @@ _channel_destroy(_channels *channels, int64_t id) } static int -_channel_send(_channels *channels, int64_t id, PyObject *obj) +_channel_send(_channels *channels, int64_t id, PyObject *obj, + _lockobj *recvlock) { PyInterpreterState *interp = _get_current(); if (interp == NULL) { @@ -1464,7 +1467,8 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj) } // Add the data to the channel. - int res = _channel_add(chan, PyInterpreterState_GetID(interp), data); + int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, + recvlock); PyThread_release_lock(mutex); if (res != 0) { _PyCrossInterpreterData_Release(data); @@ -2562,7 +2566,7 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } - if (_channel_send(&_globals.channels, cid, obj) != 0) { + if (_channel_send(&_globals.channels, cid, obj, NULL) != 0) { return NULL; } Py_RETURN_NONE; From bdc89265099c138cabae9e65c5d0bd5a75a7a741 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 30 Apr 2020 19:26:55 -0600 Subject: [PATCH 7/7] Add channel_send_wait(). --- Modules/_xxsubinterpretersmodule.c | 39 ++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index d71b7010f3e056..f3713d72eb48a5 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -2577,6 +2577,43 @@ PyDoc_STRVAR(channel_send_doc, \n\ Add the object's data to the channel's queue."); +static PyObject * +channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", "obj", NULL}; + int64_t cid; + PyObject *obj; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist, + channel_id_converter, &cid, &obj)) { + return NULL; + } + + _lockobj *lock = _lockobj_new(); + if (lock == NULL) { + return NULL; + } + if (_lockobj_acquire(lock) != 0) { + PyErr_SetString(PyExc_RuntimeError, "could not acquire lock"); + _lockobj_dealloc(lock); + return NULL; + } + if (_channel_send(&_globals.channels, cid, obj, lock) != 0) { + _lockobj_dealloc(lock); + return NULL; + } + Py_INCREF(lock); + return (PyObject*)lock; +} + +PyDoc_STRVAR(channel_send_wait_doc, +"channel_send_wait(cid, obj)\n\ +\n\ +Add the object's data to the channel's queue.\n\ +\n\ +The returned callable will block until the object is received.\n\ +Note that it takes an optional 'timeout' arg like\n\ +threading.Lock.acquire() does."); + static PyObject * channel_recv(PyObject *self, PyObject *args, PyObject *kwds) { @@ -2729,6 +2766,8 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc}, {"channel_send", (PyCFunction)(void(*)(void))channel_send, METH_VARARGS | METH_KEYWORDS, channel_send_doc}, + {"channel_send_wait", (PyCFunction)(void(*)(void))channel_send_wait, + METH_VARARGS | METH_KEYWORDS, channel_send_wait_doc}, {"channel_recv", (PyCFunction)(void(*)(void))channel_recv, METH_VARARGS | METH_KEYWORDS, channel_recv_doc}, {"channel_close", (PyCFunction)(void(*)(void))channel_close,