From e958fbc8cb638b9b405b87446222fa837575b779 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 11 Sep 2023 15:54:25 -0600 Subject: [PATCH 1/9] Add RecvChannel.close() and SendChannel.close(). --- Lib/test/support/interpreters.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index eeff3abe0324e5..21731bc19b8f64 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -171,6 +171,9 @@ def recv_nowait(self, default=_NOT_SET): else: return _channels.recv(self._id, default) + def close(self): + _channels.close(self._id, recv=True) + class SendChannel(_ChannelEnd): """The sending end of a cross-interpreter channel.""" @@ -196,3 +199,6 @@ def send_nowait(self, obj): # None. This should be fixed when channel_send_wait() is added. # See bpo-32604 and gh-19829. return _channels.send(self._id, obj) + + def close(self): + _channels.close(self._id, send=True) From 74b3003f0a666e74e277b3d705d8ba57990c3775 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 11 Sep 2023 16:11:38 -0600 Subject: [PATCH 2/9] Expose additional exception types. --- Lib/test/support/interpreters.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index 21731bc19b8f64..d497e508a6b401 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -7,7 +7,8 @@ # aliases: from _xxsubinterpreters import is_shareable from _xxinterpchannels import ( - ChannelError, ChannelNotFoundError, ChannelEmptyError, + ChannelError, ChannelNotFoundError, ChannelClosedError, + ChannelEmptyError, ChannelNotEmptyError, ) From e424ab556ef40ed12007fbe158e76077f916e404 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 12 Sep 2023 08:55:09 -0600 Subject: [PATCH 3/9] Call _channels._channel_id() in _ChannelEnd.__init__(). --- Lib/test/support/interpreters.py | 21 +++++++++++++++++---- Modules/_xxinterpchannelsmodule.c | 25 +++++++++++++++---------- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index d497e508a6b401..a13ab0e17c7487 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -118,10 +118,16 @@ def list_all_channels(): class _ChannelEnd: """The base class for RecvChannel and SendChannel.""" - def __init__(self, id): - if not isinstance(id, (int, _channels.ChannelID)): - raise TypeError(f'id must be an int, got {id!r}') - self._id = id + _end = None + + def __init__(self, cid): + if self._end == 'send': + cid = _channels._channel_id(cid, send=True) + elif self._end == 'recv': + cid = _channels._channel_id(cid, recv=True) + else: + raise NotImplementedError(self._end) + self._id = cid def __repr__(self): return f'{type(self).__name__}(id={int(self._id)})' @@ -148,6 +154,8 @@ def id(self): class RecvChannel(_ChannelEnd): """The receiving end of a cross-interpreter channel.""" + _end = 'recv' + def recv(self, *, _sentinel=object(), _delay=10 / 1000): # 10 milliseconds """Return the next object from the channel. @@ -179,6 +187,8 @@ def close(self): class SendChannel(_ChannelEnd): """The sending end of a cross-interpreter channel.""" + _end = 'send' + def send(self, obj): """Send the object (i.e. its data) to the channel's receiving end. @@ -203,3 +213,6 @@ def send_nowait(self, obj): def close(self): _channels.close(self._id, send=True) + + +_channels._register_end_types(SendChannel, RecvChannel) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index 6096f88421a73a..bf23b011240036 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -1529,17 +1529,20 @@ typedef struct channelid { struct channel_id_converter_data { PyObject *module; int64_t cid; + int end; }; static int channel_id_converter(PyObject *arg, void *ptr) { int64_t cid; + int end = 0; struct channel_id_converter_data *data = ptr; module_state *state = get_module_state(data->module); assert(state != NULL); if (PyObject_TypeCheck(arg, state->ChannelIDType)) { cid = ((channelid *)arg)->id; + end = ((channelid *)arg)->end; } else if (PyIndex_Check(arg)) { cid = PyLong_AsLongLong(arg); @@ -1559,6 +1562,7 @@ channel_id_converter(PyObject *arg, void *ptr) return 0; } data->cid = cid; + data->end = end; return 1; } @@ -1600,6 +1604,7 @@ _channelid_new(PyObject *mod, PyTypeObject *cls, { static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL}; int64_t cid; + int end; struct channel_id_converter_data cid_data = { .module = mod, }; @@ -1614,6 +1619,7 @@ _channelid_new(PyObject *mod, PyTypeObject *cls, return NULL; } cid = cid_data.cid; + end = cid_data.end; // Handle "send" and "recv". if (send == 0 && recv == 0) { @@ -1621,14 +1627,17 @@ _channelid_new(PyObject *mod, PyTypeObject *cls, "'send' and 'recv' cannot both be False"); return NULL; } - - int end = 0; - if (send == 1) { + else if (send == 1) { if (recv == 0 || recv == -1) { end = CHANNEL_SEND; } + else { + assert(recv == 1); + end = 0; + } } else if (recv == 1) { + assert(send == 0 || send == -1); end = CHANNEL_RECV; } @@ -2346,13 +2355,9 @@ channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } PyTypeObject *cls = state->ChannelIDType; - PyObject *mod = get_module_from_owned_type(cls); - if (mod == NULL) { - return NULL; - } - PyObject *cid = _channelid_new(mod, cls, args, kwds); - Py_DECREF(mod); - return cid; + assert(get_module_from_owned_type(cls) == self); + + return _channelid_new(self, cls, args, kwds); } static PyMethodDef module_functions[] = { From 90d024d399d5a66b7184b3c300984abbc7406d86 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 12 Sep 2023 08:58:00 -0600 Subject: [PATCH 4/9] Factor out _get_current_module_state(). --- Modules/_xxinterpchannelsmodule.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index bf23b011240036..cf37b708682665 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -218,6 +218,21 @@ get_module_state(PyObject *mod) return state; } +static module_state * +_get_current_module_state(void) +{ + PyObject *mod = _get_current_module(); + if (mod == NULL) { + // XXX import it? + PyErr_SetString(PyExc_RuntimeError, + MODULE_NAME " module not imported yet"); + return NULL; + } + module_state *state = get_module_state(mod); + Py_DECREF(mod); + return state; +} + static int traverse_module_state(module_state *state, visitproc visit, void *arg) { From 996596ba9fe0902703a47c0f91151608aa69f616 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 12 Sep 2023 09:00:16 -0600 Subject: [PATCH 5/9] Make RecvChannel and SendChannel shareable. --- Modules/_xxinterpchannelsmodule.c | 122 ++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index cf37b708682665..92cb41baafbe6b 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -198,6 +198,9 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags) /* module state *************************************************************/ typedef struct { + PyTypeObject *send_channel_type; + PyTypeObject *recv_channel_type; + /* heap types */ PyTypeObject *ChannelIDType; @@ -252,6 +255,9 @@ traverse_module_state(module_state *state, visitproc visit, void *arg) static int clear_module_state(module_state *state) { + Py_CLEAR(state->send_channel_type); + Py_CLEAR(state->recv_channel_type); + /* heap types */ if (state->ChannelIDType != NULL) { (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType); @@ -1967,6 +1973,91 @@ static PyType_Spec ChannelIDType_spec = { }; +/* SendChannel and RecvChannel classes */ + +// XXX Use a new __xid__ protocol instead? + +static PyTypeObject * +_get_current_channel_end_type(int end) +{ + module_state *state = _get_current_module_state(); + if (state == NULL) { + return NULL; + } + PyTypeObject *cls; + if (end == CHANNEL_SEND) { + cls = state->send_channel_type; + } + else { + assert(end == CHANNEL_RECV); + cls = state->recv_channel_type; + } + if (cls == NULL) { + // XXX Use some other exception type? + PyErr_SetString(PyExc_RuntimeError, "interpreters module not imported yet"); + return NULL; + } + return cls; +} + +static PyObject * +_channel_end_from_xid(_PyCrossInterpreterData *data) +{ + channelid *cid = (channelid *)_channelid_from_xid(data); + if (cid == NULL) { + return NULL; + } + PyTypeObject *cls = _get_current_channel_end_type(cid->end); + if (cls == NULL) { + return NULL; + } + PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)cid); + Py_DECREF(cid); + return obj; +} + +static int +_channel_end_shared(PyThreadState *tstate, PyObject *obj, + _PyCrossInterpreterData *data) +{ + PyObject *cidobj = PyObject_GetAttrString(obj, "_id"); + if (cidobj == NULL) { + return -1; + } + if (_channelid_shared(tstate, cidobj, data) < 0) { + return -1; + } + data->new_object = _channel_end_from_xid; + return 0; +} + +static int +set_channel_end_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv) +{ + module_state *state = get_module_state(mod); + if (state == NULL) { + return -1; + } + + if (state->send_channel_type != NULL + || state->recv_channel_type != NULL) + { + PyErr_SetString(PyExc_TypeError, "already registered"); + return -1; + } + state->send_channel_type = (PyTypeObject *)Py_NewRef(send); + state->recv_channel_type = (PyTypeObject *)Py_NewRef(recv); + + if (_PyCrossInterpreterData_RegisterClass(send, _channel_end_shared)) { + return -1; + } + if (_PyCrossInterpreterData_RegisterClass(recv, _channel_end_shared)) { + return -1; + } + + return 0; +} + /* module level code ********************************************************/ /* globals is the process-global state for the module. It holds all @@ -2375,6 +2466,35 @@ channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds) return _channelid_new(self, cls, args, kwds); } +static PyObject * +channel__register_end_types(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"send", "recv", NULL}; + PyObject *send; + PyObject *recv; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "OO:_register_end_types", kwlist, + &send, &recv)) { + return NULL; + } + if (!PyType_Check(send)) { + PyErr_SetString(PyExc_TypeError, "expected a type for 'send'"); + return NULL; + } + if (!PyType_Check(recv)) { + PyErr_SetString(PyExc_TypeError, "expected a type for 'recv'"); + return NULL; + } + PyTypeObject *cls_send = (PyTypeObject *)send; + PyTypeObject *cls_recv = (PyTypeObject *)recv; + + if (set_channel_end_types(self, cls_send, cls_recv) < 0) { + return NULL; + } + + Py_RETURN_NONE; +} + static PyMethodDef module_functions[] = { {"create", channel_create, METH_NOARGS, channel_create_doc}, @@ -2394,6 +2514,8 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, channel_release_doc}, {"_channel_id", _PyCFunction_CAST(channel__channel_id), METH_VARARGS | METH_KEYWORDS, NULL}, + {"_register_end_types", _PyCFunction_CAST(channel__register_end_types), + METH_VARARGS | METH_KEYWORDS, NULL}, {NULL, NULL} /* sentinel */ }; From 56fec100f8402b4efc9f65bfe9d60c07ccb6fb7d Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 19 Sep 2023 10:12:52 -0600 Subject: [PATCH 6/9] Import the interpreters module if needed. --- Modules/_xxinterpchannelsmodule.c | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index 92cb41baafbe6b..c2228835cdc3c3 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -1993,9 +1993,21 @@ _get_current_channel_end_type(int end) cls = state->recv_channel_type; } if (cls == NULL) { - // XXX Use some other exception type? - PyErr_SetString(PyExc_RuntimeError, "interpreters module not imported yet"); - return NULL; + PyObject *highlevel = PyImport_ImportModule("interpreters"); + if (highlevel == NULL) { + PyErr_Clear(); + highlevel = PyImport_ImportModule("test.support.interpreters"); + if (highlevel == NULL) { + return NULL; + } + } + if (end == CHANNEL_SEND) { + cls = state->send_channel_type; + } + else { + cls = state->recv_channel_type; + } + assert(cls != NULL); } return cls; } From b3f29ceaa82074e3fba6c3e77e14520ced7921b6 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 19 Sep 2023 10:16:45 -0600 Subject: [PATCH 7/9] Use _get_current_channel_end_type() in _channel_from_cid(). --- Modules/_xxinterpchannelsmodule.c | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index c2228835cdc3c3..d5be76f1f0e38e 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -1803,21 +1803,12 @@ channelid_richcompare(PyObject *self, PyObject *other, int op) return res; } +static PyTypeObject * _get_current_channel_end_type(int end); + static PyObject * _channel_from_cid(PyObject *cid, int end) { - PyObject *highlevel = PyImport_ImportModule("interpreters"); - if (highlevel == NULL) { - PyErr_Clear(); - highlevel = PyImport_ImportModule("test.support.interpreters"); - if (highlevel == NULL) { - return NULL; - } - } - const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" : - "SendChannel"; - PyObject *cls = PyObject_GetAttrString(highlevel, clsname); - Py_DECREF(highlevel); + PyObject *cls = (PyObject *)_get_current_channel_end_type(end); if (cls == NULL) { return NULL; } From cac4a7686ffffe5b877391ce5470eb09129863a5 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 2 Oct 2023 13:35:33 -0600 Subject: [PATCH 8/9] Add tests. --- Lib/test/test_interpreters.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py index 9c0dac7d6c61fb..8aa6d5b83604db 100644 --- a/Lib/test/test_interpreters.py +++ b/Lib/test/test_interpreters.py @@ -574,6 +574,22 @@ def test_list_all(self): after = set(interpreters.list_all_channels()) self.assertEqual(after, created) + def test_shareable(self): + rch, sch = interpreters.create_channel() + + self.assertTrue( + interpreters.is_shareable(rch)) + self.assertTrue( + interpreters.is_shareable(sch)) + + sch.send_nowait(rch) + sch.send_nowait(sch) + rch2 = rch.recv() + sch2 = rch.recv() + + self.assertEqual(rch2, rch) + self.assertEqual(sch2, sch) + class TestRecvChannelAttrs(TestBase): From a8f6e5e63856a9eac1f6f7ccd3adff90bd26ed13 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 2 Oct 2023 13:58:57 -0600 Subject: [PATCH 9/9] Fix _ChannelEnd.__init__(). --- Lib/test/support/interpreters.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index a13ab0e17c7487..d2beba31e80283 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -122,9 +122,9 @@ class _ChannelEnd: def __init__(self, cid): if self._end == 'send': - cid = _channels._channel_id(cid, send=True) + cid = _channels._channel_id(cid, send=True, force=True) elif self._end == 'recv': - cid = _channels._channel_id(cid, recv=True) + cid = _channels._channel_id(cid, recv=True, force=True) else: raise NotImplementedError(self._end) self._id = cid