From c6e31fada3af7d94715e3e08047aa8aa38e0ad12 Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Tue, 14 Feb 2012 02:27:52 -0200 Subject: [PATCH 01/12] Start replacing select.select() for select.poll() --- supervisor/supervisord.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/supervisor/supervisord.py b/supervisor/supervisord.py index 85b82d16f..2ac5271aa 100755 --- a/supervisor/supervisord.py +++ b/supervisor/supervisord.py @@ -182,6 +182,7 @@ def runforever(self): timeout = 1 # this cannot be fewer than the smallest TickEvent (5) socket_map = self.options.get_socket_map() + poller = select.poll() while 1: combined_map = {} @@ -214,8 +215,18 @@ def runforever(self): if dispatcher.writable(): w.append(fd) + # fixme: find out the bitmask + # http://docs.python.org/library/select.html#select.poll.register + # default is: POLLIN | POLLPRI | POLLOUT + # + # I don't think there is a problem on registering the same file + # descriptor multiple times, since all it does is add the file + # descriptor to an internal dict, so duplicate registers just + # override existing ones + poller.register(fd) + try: - r, w, x = self.options.select(r, w, x, timeout) + poll_events = poller.poll(timeout * 1000) except select.error, err: r = w = x = [] if err[0] == errno.EINTR: @@ -223,6 +234,12 @@ def runforever(self): else: raise + for fd, bitmask in poll_events: + if bitmask & select.POLLIN or bitmask & select.POLLPRI: + r.append(fd) + if bitmask & select.POLLOUT: + w.append(fd) + for fd in r: if combined_map.has_key(fd): try: From 9d6cdf76697b07dc327f05bbd8805220510ae53f Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Thu, 16 Feb 2012 01:48:39 -0200 Subject: [PATCH 02/12] Move poll() logic to it's own class, and adapt Supervisor to use it. Also fixing the poll() usage, my previous commit was registering all file descriptors to all events, now register just on the expected events. --- supervisor/poller.py | 39 ++++++++++++ supervisor/supervisord.py | 36 ++--------- supervisor/tests/test_poller.py | 94 ++++++++++++++++++++++++++++ supervisor/tests/test_supervisord.py | 37 +++++------ 4 files changed, 156 insertions(+), 50 deletions(-) create mode 100644 supervisor/poller.py create mode 100644 supervisor/tests/test_poller.py diff --git a/supervisor/poller.py b/supervisor/poller.py new file mode 100644 index 000000000..0deb33f27 --- /dev/null +++ b/supervisor/poller.py @@ -0,0 +1,39 @@ +import select +import errno + +class Poller: + ''' + Wrapper for select.poll() + ''' + + READ = select.POLLIN | select.POLLPRI + WRITE = select.POLLOUT + + def __init__(self, options): + self.options = options + self._poller = select.poll() + + def register_readable(self, fd): + self._poller.register(fd, self.READ) + + def register_writable(self, fd): + self._poller.register(fd, self.WRITE) + + def poll(self, timeout): + fds = self._poll_fds(timeout) + readables, writables = [], [] + for fd, eventmask in fds: + if eventmask & self.READ: + readables.append(fd) + if eventmask & self.WRITE: + writables.append(fd) + return readables, writables + + def _poll_fds(self, timeout): + try: + return self._poller.poll(timeout) + except select.error, err: + if err[0] == errno.EINTR: + self.options.logger.blather('EINTR encountered in select') + return [] + raise diff --git a/supervisor/supervisord.py b/supervisor/supervisord.py index 2ac5271aa..c316e2da5 100755 --- a/supervisor/supervisord.py +++ b/supervisor/supervisord.py @@ -43,6 +43,7 @@ from supervisor import events from supervisor.states import SupervisorStates from supervisor.states import getProcessStateDescription +from supervisor.poller import Poller class Supervisor: stopping = False # set after we detect that we are handling a stop request @@ -52,6 +53,7 @@ class Supervisor: def __init__(self, options): self.options = options + self.poller = Poller(options) self.process_groups = {} self.ticks = {} @@ -182,7 +184,6 @@ def runforever(self): timeout = 1 # this cannot be fewer than the smallest TickEvent (5) socket_map = self.options.get_socket_map() - poller = select.poll() while 1: combined_map = {} @@ -207,38 +208,13 @@ def runforever(self): # killing everything), it's OK to swtop or reload raise asyncore.ExitNow - r, w, x = [], [], [] - for fd, dispatcher in combined_map.items(): if dispatcher.readable(): - r.append(fd) + self.poller.register_readable(fd) if dispatcher.writable(): - w.append(fd) - - # fixme: find out the bitmask - # http://docs.python.org/library/select.html#select.poll.register - # default is: POLLIN | POLLPRI | POLLOUT - # - # I don't think there is a problem on registering the same file - # descriptor multiple times, since all it does is add the file - # descriptor to an internal dict, so duplicate registers just - # override existing ones - poller.register(fd) - - try: - poll_events = poller.poll(timeout * 1000) - except select.error, err: - r = w = x = [] - if err[0] == errno.EINTR: - self.options.logger.blather('EINTR encountered in select') - else: - raise - - for fd, bitmask in poll_events: - if bitmask & select.POLLIN or bitmask & select.POLLPRI: - r.append(fd) - if bitmask & select.POLLOUT: - w.append(fd) + self.poller.register_writable(fd) + + r, w = self.poller.poll(timeout * 1000) for fd in r: if combined_map.has_key(fd): diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py new file mode 100644 index 000000000..7a7a9a6e0 --- /dev/null +++ b/supervisor/tests/test_poller.py @@ -0,0 +1,94 @@ +import sys +import unittest +import errno +import select + +from supervisor.poller import Poller +from supervisor.tests.base import DummyOptions + +class PollerTests(unittest.TestCase): + + def _makeOne(self, options): + return Poller(options) + + def test_register_readable(self): + select_poll = DummySelectPoll() + poller = self._makeOne(DummyOptions()) + poller._poller = select_poll + poller.register_readable(6) + poller.register_readable(7) + self.assertEqual(select_poll.registered_as_readable, [6,7]) + + def test_register_writable(self): + select_poll = DummySelectPoll() + poller = self._makeOne(DummyOptions()) + poller._poller = select_poll + poller.register_writable(6) + self.assertEqual(select_poll.registered_as_writable, [6]) + + def test_poll_returns_readables_and_writables(self): + select_poll = DummySelectPoll(result={'readables': [6,7], 'writables': [7,8]}) + poller = self._makeOne(DummyOptions()) + poller._poller = select_poll + poller.register_readable(6) + poller.register_readable(7) + poller.register_writable(8) + readables, writables = poller.poll(1000) + self.assertEqual(readables, [6,7]) + self.assertEqual(writables, [7,8]) + + def test_poll_ignore_eintr(self): + select_poll = DummySelectPoll(error=errno.EINTR) + options = DummyOptions() + poller = self._makeOne(options) + poller._poller = select_poll + poller.register_readable(9) + poller.poll(1000) + self.assertEqual(options.logger.data[0], 'EINTR encountered in select') + + def test_poll_uncaught_exception(self): + select_poll = DummySelectPoll(error=errno.EBADF) + options = DummyOptions() + poller = self._makeOne(options) + poller._poller = select_poll + poller.register_readable(9) + self.assertRaises(select.error, poller.poll, (1000,)) + + +class DummySelectPoll: + def __init__(self, result=None, error=None): + self.result = result or {'readables': [], 'writables': []} + self.error = error + self.registered_as_readable = [] + self.registered_as_writable = [] + + def register(self, fd, eventmask): + if eventmask == Poller.READ: + self.registered_as_readable.append(fd) + elif eventmask == Poller.WRITE: + self.registered_as_writable.append(fd) + else: + raise ValueError("Registered a fd on unknown eventmask: '{0}'".format(eventmask)) + + def poll(self, timeout): + self._raise_error_if_defined() + return self._format_expected_result() + + def _raise_error_if_defined(self): + if self.error: + raise select.error(self.error) + + def _format_expected_result(self): + fds = [] + for fd in self.result['readables']: + fds.append((fd, Poller.READ)) + for fd in self.result['writables']: + fds.append((fd, Poller.WRITE)) + return fds + + +def test_suite(): + return unittest.findTestCases(sys.modules[__name__]) + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') diff --git a/supervisor/tests/test_supervisord.py b/supervisor/tests/test_supervisord.py index 9bc5322f4..64c2204f5 100644 --- a/supervisor/tests/test_supervisord.py +++ b/supervisor/tests/test_supervisord.py @@ -371,26 +371,10 @@ def test_runforever_calls_tick(self): supervisord.runforever() self.assertEqual(len(supervisord.ticks), 3) - def test_runforever_select_eintr(self): - options = DummyOptions() - import errno - options.select_error = errno.EINTR - supervisord = self._makeOne(options) - options.test = True - supervisord.runforever() - self.assertEqual(options.logger.data[0], 'EINTR encountered in select') - - def test_runforever_select_uncaught_exception(self): - options = DummyOptions() - import errno - options.select_error = errno.EBADF - supervisord = self._makeOne(options) - import select - options.test = True - self.assertRaises(select.error, supervisord.runforever) - - def test_runforever_select_dispatchers(self): + def test_runforever_poll_dispatchers(self): options = DummyOptions() + poller = DummyPoller(options) + poller.result = [6], [7, 8] supervisord = self._makeOne(options) pconfig = DummyPConfig(options, 'foo', '/bin/foo',) process = DummyProcess(pconfig) @@ -401,7 +385,7 @@ def test_runforever_select_dispatchers(self): error = DummyDispatcher(writable=True, error=OSError) pgroup.dispatchers = {6:readable, 7:writable, 8:error} supervisord.process_groups = {'foo': pgroup} - options.select_result = [6], [7, 8], [] + supervisord.poller = poller options.test = True supervisord.runforever() self.assertEqual(pgroup.transitioned, True) @@ -521,6 +505,19 @@ def callback(event): self.assertEqual(len(L), 6) self.assertEqual(L[-1].__class__, events.Tick3600Event) +class DummyPoller: + def __init__(self, options): + self.result = [] + + def register_readable(self, fd): + pass + + def register_writable(self, fd): + pass + + def poll(self, timeout): + return self.result + def test_suite(): return unittest.findTestCases(sys.modules[__name__]) From 41152394076426273459f5daac560708b6ead6b7 Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Thu, 16 Feb 2012 22:25:07 -0200 Subject: [PATCH 03/12] Remove select() method from Options class used by Supervisor --- supervisor/options.py | 4 ---- supervisor/tests/base.py | 8 -------- supervisor/tests/test_supervisord.py | 4 +++- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/supervisor/options.py b/supervisor/options.py index c9f3afb87..8be7d6dc6 100644 --- a/supervisor/options.py +++ b/supervisor/options.py @@ -13,7 +13,6 @@ import resource import stat import pkg_resources -import select import glob import platform import warnings @@ -1156,9 +1155,6 @@ def cleanup_fds(self): except OSError: pass - def select(self, r, w, x, timeout): - return select.select(r, w, x, timeout) - def kill(self, pid, signal): os.kill(pid, signal) diff --git a/supervisor/tests/base.py b/supervisor/tests/base.py index 51de48f6c..10d8b0b69 100644 --- a/supervisor/tests/base.py +++ b/supervisor/tests/base.py @@ -54,8 +54,6 @@ def __init__(self): self.privsdropped = None self.logs_reopened = False self.environment_processed = False - self.select_result = [], [], [] - self.select_error = None self.write_accept = None self.write_error = None self.tempfile_name = '/foo/bar' @@ -220,12 +218,6 @@ def process_environment(self): def mktempfile(self, prefix, suffix, dir): return self.tempfile_name - def select(self, r, w, x, timeout): - import select - if self.select_error: - raise select.error(self.select_error) - return self.select_result - def remove(self, path): import os if self.remove_error: diff --git a/supervisor/tests/test_supervisord.py b/supervisor/tests/test_supervisord.py index 64c2204f5..308b707fe 100644 --- a/supervisor/tests/test_supervisord.py +++ b/supervisor/tests/test_supervisord.py @@ -395,6 +395,8 @@ def test_runforever_poll_dispatchers(self): def test_runforever_select_dispatcher_exitnow(self): options = DummyOptions() + poller = DummyPoller(options) + poller.result = [6], [] supervisord = self._makeOne(options) pconfig = DummyPConfig(options, 'foo', '/bin/foo',) process = DummyProcess(pconfig) @@ -404,7 +406,7 @@ def test_runforever_select_dispatcher_exitnow(self): exitnow = DummyDispatcher(readable=True, error=asyncore.ExitNow) pgroup.dispatchers = {6:exitnow} supervisord.process_groups = {'foo': pgroup} - options.select_result = [6], [], [] + supervisord.poller = poller options.test = True self.assertRaises(asyncore.ExitNow, supervisord.runforever) From 33fc3071cb98e650631f6a00b825222c2bf9fc39 Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Fri, 17 Feb 2012 01:14:28 -0200 Subject: [PATCH 04/12] Ignores and unregisters closed file descriptors on poll() --- supervisor/poller.py | 16 +++++++++++++++ supervisor/tests/test_poller.py | 35 +++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/supervisor/poller.py b/supervisor/poller.py index 0deb33f27..31447d7d7 100644 --- a/supervisor/poller.py +++ b/supervisor/poller.py @@ -1,5 +1,6 @@ import select import errno +import time class Poller: ''' @@ -19,10 +20,15 @@ def register_readable(self, fd): def register_writable(self, fd): self._poller.register(fd, self.WRITE) + def unregister(self, fd): + self._poller.unregister(fd) + def poll(self, timeout): fds = self._poll_fds(timeout) readables, writables = [], [] for fd, eventmask in fds: + if self._ignore_invalid(fd, eventmask): + continue if eventmask & self.READ: readables.append(fd) if eventmask & self.WRITE: @@ -37,3 +43,13 @@ def _poll_fds(self, timeout): self.options.logger.blather('EINTR encountered in select') return [] raise + + def _ignore_invalid(self, fd, eventmask): + if eventmask & select.POLLNVAL: + # POLLNVAL means `fd` value is invalid, not open. + # When a process quits it's `fd`s are closed so there + # is no more reason to keep this `fd` registered + # If the process restarts it's `fd`s are registered again + self.unregister(fd) + return True + return False diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py index 7a7a9a6e0..0e6d838d1 100644 --- a/supervisor/tests/test_poller.py +++ b/supervisor/tests/test_poller.py @@ -27,7 +27,9 @@ def test_register_writable(self): self.assertEqual(select_poll.registered_as_writable, [6]) def test_poll_returns_readables_and_writables(self): - select_poll = DummySelectPoll(result={'readables': [6,7], 'writables': [7,8]}) + select_poll = DummySelectPoll(result=[(6, Poller.READ), + (7, Poller.READ), + (8, Poller.WRITE)]) poller = self._makeOne(DummyOptions()) poller._poller = select_poll poller.register_readable(6) @@ -35,7 +37,7 @@ def test_poll_returns_readables_and_writables(self): poller.register_writable(8) readables, writables = poller.poll(1000) self.assertEqual(readables, [6,7]) - self.assertEqual(writables, [7,8]) + self.assertEqual(writables, [8]) def test_poll_ignore_eintr(self): select_poll = DummySelectPoll(error=errno.EINTR) @@ -54,13 +56,24 @@ def test_poll_uncaught_exception(self): poller.register_readable(9) self.assertRaises(select.error, poller.poll, (1000,)) + def test_poll_ignores_and_unregisters_closed_fd(self): + select_poll = DummySelectPoll(result=[(6, select.POLLNVAL), + (7, Poller.READ)]) + poller = self._makeOne(DummyOptions()) + poller._poller = select_poll + poller.register_readable(6) + poller.register_readable(7) + readables, writables = poller.poll(1000) + self.assertEqual(readables, [7]) + self.assertEqual(select_poll.unregistered, [6]) class DummySelectPoll: def __init__(self, result=None, error=None): - self.result = result or {'readables': [], 'writables': []} + self.result = result or [] self.error = error self.registered_as_readable = [] self.registered_as_writable = [] + self.unregistered = [] def register(self, fd, eventmask): if eventmask == Poller.READ: @@ -70,21 +83,13 @@ def register(self, fd, eventmask): else: raise ValueError("Registered a fd on unknown eventmask: '{0}'".format(eventmask)) - def poll(self, timeout): - self._raise_error_if_defined() - return self._format_expected_result() + def unregister(self, fd): + self.unregistered.append(fd) - def _raise_error_if_defined(self): + def poll(self, timeout): if self.error: raise select.error(self.error) - - def _format_expected_result(self): - fds = [] - for fd in self.result['readables']: - fds.append((fd, Poller.READ)) - for fd in self.result['writables']: - fds.append((fd, Poller.WRITE)) - return fds + return self.result def test_suite(): From 09c70c70de52199a2b463b7f6cf1c914fa88503e Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Wed, 7 Mar 2012 22:31:31 -0300 Subject: [PATCH 05/12] Adjust log message to be more explicit --- supervisor/poller.py | 2 +- supervisor/tests/test_poller.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/supervisor/poller.py b/supervisor/poller.py index 31447d7d7..09e0ac18c 100644 --- a/supervisor/poller.py +++ b/supervisor/poller.py @@ -40,7 +40,7 @@ def _poll_fds(self, timeout): return self._poller.poll(timeout) except select.error, err: if err[0] == errno.EINTR: - self.options.logger.blather('EINTR encountered in select') + self.options.logger.blather('EINTR encountered in poll') return [] raise diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py index 0e6d838d1..431b5197a 100644 --- a/supervisor/tests/test_poller.py +++ b/supervisor/tests/test_poller.py @@ -46,7 +46,7 @@ def test_poll_ignore_eintr(self): poller._poller = select_poll poller.register_readable(9) poller.poll(1000) - self.assertEqual(options.logger.data[0], 'EINTR encountered in select') + self.assertEqual(options.logger.data[0], 'EINTR encountered in poll') def test_poll_uncaught_exception(self): select_poll = DummySelectPoll(error=errno.EBADF) From aaea2736a8b18fefde4d57a0261e94a4e7872cf3 Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Wed, 7 Mar 2012 22:42:54 -0300 Subject: [PATCH 06/12] Handle POLLHUP on poll() --- supervisor/poller.py | 2 +- supervisor/tests/test_poller.py | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/supervisor/poller.py b/supervisor/poller.py index 09e0ac18c..c1da83055 100644 --- a/supervisor/poller.py +++ b/supervisor/poller.py @@ -7,7 +7,7 @@ class Poller: Wrapper for select.poll() ''' - READ = select.POLLIN | select.POLLPRI + READ = select.POLLIN | select.POLLPRI | select.POLLHUP WRITE = select.POLLOUT def __init__(self, options): diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py index 431b5197a..09a891234 100644 --- a/supervisor/tests/test_poller.py +++ b/supervisor/tests/test_poller.py @@ -27,16 +27,18 @@ def test_register_writable(self): self.assertEqual(select_poll.registered_as_writable, [6]) def test_poll_returns_readables_and_writables(self): - select_poll = DummySelectPoll(result=[(6, Poller.READ), - (7, Poller.READ), - (8, Poller.WRITE)]) + select_poll = DummySelectPoll(result=[(6, select.POLLIN), + (7, select.POLLPRI), + (8, select.POLLOUT), + (9, select.POLLHUP)]) poller = self._makeOne(DummyOptions()) poller._poller = select_poll poller.register_readable(6) poller.register_readable(7) poller.register_writable(8) + poller.register_readable(9) readables, writables = poller.poll(1000) - self.assertEqual(readables, [6,7]) + self.assertEqual(readables, [6,7,9]) self.assertEqual(writables, [8]) def test_poll_ignore_eintr(self): From 0bf114540af82d76227a07282d6049ae16d52a07 Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Sun, 22 Apr 2012 02:19:14 -0300 Subject: [PATCH 07/12] Add kqueue() support for poller --- supervisor/poller.py | 81 +++++++++++++++++++++-- supervisor/supervisord.py | 2 +- supervisor/tests/test_poller.py | 111 ++++++++++++++++++++++++++++++-- 3 files changed, 185 insertions(+), 9 deletions(-) diff --git a/supervisor/poller.py b/supervisor/poller.py index c1da83055..71f987257 100644 --- a/supervisor/poller.py +++ b/supervisor/poller.py @@ -1,8 +1,31 @@ import select import errno import time +import signal -class Poller: +class BasePoller: + + def __init__(self, options): + self.options = options + self.initialize() + + def initialize(self): + pass + + def register_readable(self, fd): + raise NotImplementedError + + def register_writable(self, fd): + raise NotImplementedError + + def unregister(self, fd): + raise NotImplementedError + + def poll(self, timeout): + raise NotImplementedError + + +class PollPoller(BasePoller): ''' Wrapper for select.poll() ''' @@ -10,8 +33,7 @@ class Poller: READ = select.POLLIN | select.POLLPRI | select.POLLHUP WRITE = select.POLLOUT - def __init__(self, options): - self.options = options + def initialize(self): self._poller = select.poll() def register_readable(self, fd): @@ -37,7 +59,7 @@ def poll(self, timeout): def _poll_fds(self, timeout): try: - return self._poller.poll(timeout) + return self._poller.poll(timeout * 1000) except select.error, err: if err[0] == errno.EINTR: self.options.logger.blather('EINTR encountered in poll') @@ -53,3 +75,54 @@ def _ignore_invalid(self, fd, eventmask): self.unregister(fd) return True return False + + +class KQueuePoller(BasePoller): + ''' + Wrapper for select.kqueue()/kevent() + ''' + + max_events = 1000 + + def initialize(self): + self._kqueue = select.kqueue() + + def register_readable(self, fd): + kevent = select.kevent(fd, filter=select.KQ_FILTER_READ, + flags=select.KQ_EV_ADD) + self._kqueue.control([kevent], 0) + + def register_writable(self, fd): + kevent = select.kevent(fd, filter=select.KQ_FILTER_WRITE, + flags=select.KQ_EV_ADD) + self._kqueue.control([kevent], 0) + + def unregister(self, fd): + kevent = select.kevent(fd, filter=(select.KQ_FILTER_READ | select.KQ_FILTER_WRITE), + flags=select.KQ_EV_DELETE) + self._kqueue.control([kevent], 0) + + def poll(self, timeout): + readables, writables = [], [] + + try: + kevents = self._kqueue.control(None, self.max_events, timeout) + except OSError, error: + if error.errno == errno.EINTR: + self.options.logger.blather('EINTR encountered in poll') + return readables, writables + raise + + for kevent in kevents: + if kevent.filter == select.KQ_FILTER_READ: + readables.append(kevent.ident) + if kevent.filter == select.KQ_FILTER_WRITE: + writables.append(kevent.ident) + + return readables, writables + + +if hasattr(select, "kqueue"): + Poller = KQueuePoller +else: + Poller = PollPoller diff --git a/supervisor/supervisord.py b/supervisor/supervisord.py index c316e2da5..a6daefb80 100755 --- a/supervisor/supervisord.py +++ b/supervisor/supervisord.py @@ -214,7 +214,7 @@ def runforever(self): if dispatcher.writable(): self.poller.register_writable(fd) - r, w = self.poller.poll(timeout * 1000) + r, w = self.poller.poll(timeout) for fd in r: if combined_map.has_key(fd): diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py index 09a891234..5d77ca09a 100644 --- a/supervisor/tests/test_poller.py +++ b/supervisor/tests/test_poller.py @@ -3,10 +3,73 @@ import errno import select -from supervisor.poller import Poller +from supervisor.poller import Poller, KQueuePoller from supervisor.tests.base import DummyOptions -class PollerTests(unittest.TestCase): +class KQueuePollerTests(unittest.TestCase): + + def _makeOne(self, options): + return KQueuePoller(options) + + def test_register_readable(self): + kqueue = DummyKQueue() + poller = self._makeOne(DummyOptions()) + poller._kqueue = kqueue + poller.register_readable(6) + self.assertEqual(len(kqueue.registered_kevents), 1) + self.assertReadEventAdded(kqueue, kqueue.registered_kevents[0], 6) + + def test_register_writable(self): + kqueue = DummyKQueue() + poller = self._makeOne(DummyOptions()) + poller._kqueue = kqueue + poller.register_writable(7) + self.assertEqual(len(kqueue.registered_kevents), 1) + self.assertWriteEventAdded(kqueue, kqueue.registered_kevents[0], 7) + + def test_poll_returns_readables_and_writables(self): + kqueue = DummyKQueue(result=[(6, select.KQ_FILTER_READ), + (7, select.KQ_FILTER_READ), + (8, select.KQ_FILTER_WRITE)]) + poller = self._makeOne(DummyOptions()) + poller._kqueue = kqueue + poller.register_readable(6) + poller.register_readable(7) + poller.register_writable(8) + readables, writables = poller.poll(1000) + self.assertEqual(readables, [6,7]) + self.assertEqual(writables, [8]) + + def test_poll_ignores_eintr(self): + kqueue = DummyKQueue(raise_errno=errno.EINTR) + options = DummyOptions() + poller = self._makeOne(options) + poller._kqueue = kqueue + poller.register_readable(6) + poller.poll(1000) + self.assertEqual(options.logger.data[0], 'EINTR encountered in poll') + + def test_poll_uncaught_exception(self): + kqueue = DummyKQueue(raise_errno=errno.EBADF) + options = DummyOptions() + poller = self._makeOne(options) + poller._kqueue = kqueue + poller.register_readable(6) + self.assertRaises(OSError, poller.poll, (1000,)) + + def assertReadEventAdded(self, kqueue, kevent, fd): + self.assertEventAdded(kqueue, kevent, fd, select.KQ_FILTER_READ) + + def assertWriteEventAdded(self, kqueue, kevent, fd): + self.assertEventAdded(kqueue, kevent, fd, select.KQ_FILTER_WRITE) + + def assertEventAdded(self, kqueue, kevent, fd, filter_spec): + self.assertEqual(kevent.ident, fd) + self.assertEqual(kevent.filter, filter_spec) + self.assertEqual(kevent.flags, select.KQ_EV_ADD) + + +class PollerPollTests(unittest.TestCase): def _makeOne(self, options): return Poller(options) @@ -41,7 +104,7 @@ def test_poll_returns_readables_and_writables(self): self.assertEqual(readables, [6,7,9]) self.assertEqual(writables, [8]) - def test_poll_ignore_eintr(self): + def test_poll_ignores_eintr(self): select_poll = DummySelectPoll(error=errno.EINTR) options = DummyOptions() poller = self._makeOne(options) @@ -69,7 +132,7 @@ def test_poll_ignores_and_unregisters_closed_fd(self): self.assertEqual(readables, [7]) self.assertEqual(select_poll.unregistered, [6]) -class DummySelectPoll: +class DummySelectPoll(object): def __init__(self, result=None, error=None): self.result = result or [] self.error = error @@ -94,6 +157,46 @@ def poll(self, timeout): return self.result +class DummyKQueue(object): + def __init__(self, result=None, raise_errno=None): + self.result = result or [] + self.errno = raise_errno + self.registered_kevents = [] + self.registered_flags = [] + + def control(self, kevents, max_events, timeout=None): + if kevents is None: # being called on poll() + self.assert_max_events_on_poll(max_events) + self.raise_error() + return self.build_result() + + self.assert_max_events_on_register(max_events) + self.registered_kevents.extend(kevents) + + def raise_error(self): + if self.errno: + ex = OSError() + ex.errno = self.errno + raise ex + + def build_result(self): + return [FakeKEvent(ident, filter) for ident,filter in self.result] + + def assert_max_events_on_poll(self, max_events): + assert max_events == KQueuePoller.max_events, ( + "`max_events` parameter of `kqueue.control() should be %d" + % KQueuePoller.max_events) + + def assert_max_events_on_register(self, max_events): + assert max_events == 0, ( + "`max_events` parameter of `kqueue.control()` should be 0 on register") + +class FakeKEvent(object): + def __init__(self, ident, filter): + self.ident = ident + self.filter = filter + + def test_suite(): return unittest.findTestCases(sys.modules[__name__]) From c58271a5f750fecf564b796718bff888bc97876d Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Sun, 17 Jun 2012 23:47:44 -0300 Subject: [PATCH 08/12] Add select() support for poller when no poll() or kqueue() is available. Make all tests pass for py>=2.5 --- supervisor/poller.py | 56 ++++++++++++++--- supervisor/tests/test_poller.py | 106 +++++++++++++++++++++++++++++--- 2 files changed, 147 insertions(+), 15 deletions(-) diff --git a/supervisor/poller.py b/supervisor/poller.py index 71f987257..81fcb205d 100644 --- a/supervisor/poller.py +++ b/supervisor/poller.py @@ -24,17 +24,49 @@ def unregister(self, fd): def poll(self, timeout): raise NotImplementedError +class SelectPoller(BasePoller): -class PollPoller(BasePoller): - ''' - Wrapper for select.poll() - ''' + def initialize(self): + self._select = select + self.readable = [] + self.writable = [] + + def register_readable(self, fd): + self.readable.append(fd) - READ = select.POLLIN | select.POLLPRI | select.POLLHUP - WRITE = select.POLLOUT + def register_writable(self, fd): + self.writable.append(fd) + + def unregister(self, fd): + if fd in self.readable: + self.readable.remove(fd) + if fd in self.writable: + self.writable.remove(fd) + + def unregister_all(self): + self.writable = [] + self.readable = [] + + def poll(self, timeout): + try: + r, w, x = self._select.select(self.readable, self.writable, [], timeout) + except select.error, err: + if err[0] == errno.EINTR: + self.options.logger.blather('EINTR encountered in poll') + return [], [] + if err[0] == errno.EBADF: + self.options.logger.blather('EBADF encountered in poll') + self.unregister_all() + return [], [] + raise + return r, w + +class PollPoller(BasePoller): def initialize(self): self._poller = select.poll() + self.READ = select.POLLIN | select.POLLPRI | select.POLLHUP + self.WRITE = select.POLLOUT def register_readable(self, fd): self._poller.register(fd, self.READ) @@ -122,7 +154,15 @@ def poll(self, timeout): return readables, writables -if hasattr(select, "kqueue"): +def implements_poll(): + return hasattr(select, 'poll') + +def implements_kqueue(): + return hasattr(select, 'kqueue') + +if implements_kqueue(): Poller = KQueuePoller -else: +elif implements_poll(): Poller = PollPoller +else: + Poller = SelectPoller diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py index 5d77ca09a..b79f14a96 100644 --- a/supervisor/tests/test_poller.py +++ b/supervisor/tests/test_poller.py @@ -3,10 +3,76 @@ import errno import select -from supervisor.poller import Poller, KQueuePoller +from supervisor.poller import SelectPoller, PollPoller, KQueuePoller +from supervisor.poller import implements_poll, implements_kqueue from supervisor.tests.base import DummyOptions -class KQueuePollerTests(unittest.TestCase): +# this base class is used instead of unittest.TestCase to hide +# a TestCase subclass from test runner when the implementation is +# not available +SkipTestCase = object + +class SelectPollerTests(unittest.TestCase): + + def _makeOne(self, options): + return SelectPoller(options) + + def test_register_readable(self): + poller = self._makeOne(DummyOptions()) + poller.register_readable(6) + poller.register_readable(7) + self.assertEqual(poller.readable, [6,7]) + + def test_register_writable(self): + poller = self._makeOne(DummyOptions()) + poller.register_writable(6) + poller.register_writable(7) + self.assertEqual(poller.writable, [6,7]) + + def test_poll_returns_readables_and_writables(self): + _select = DummySelect(result={'readables': [6], + 'writables': [8]}) + poller = self._makeOne(DummyOptions()) + poller._select = _select + poller.register_readable(6) + poller.register_readable(7) + poller.register_writable(8) + readables, writables = poller.poll(1) + self.assertEqual(readables, [6]) + self.assertEqual(writables, [8]) + + def test_poll_ignores_eintr(self): + _select = DummySelect(error=errno.EINTR) + options = DummyOptions() + poller = self._makeOne(options) + poller._select = _select + poller.register_readable(6) + poller.poll(1) + self.assertEqual(options.logger.data[0], 'EINTR encountered in poll') + + def test_poll_ignores_ebadf(self): + _select = DummySelect(error=errno.EBADF) + options = DummyOptions() + poller = self._makeOne(options) + poller._select = _select + poller.register_readable(6) + poller.poll(1) + self.assertEqual(options.logger.data[0], 'EBADF encountered in poll') + + def test_poll_uncaught_exception(self): + _select = DummySelect(error=errno.EPERM) + options = DummyOptions() + poller = self._makeOne(options) + poller._select = _select + poller.register_readable(6) + self.assertRaises(select.error, poller.poll, (1,)) + +if implements_kqueue(): + KQueuePollerTestsBase = unittest.TestCase +else: + KQueuePollerTestsBase = SkipTestCase + +class KQueuePollerTests(KQueuePollerTestsBase): def _makeOne(self, options): return KQueuePoller(options) @@ -69,10 +135,15 @@ def assertEventAdded(self, kqueue, kevent, fd, filter_spec): self.assertEqual(kevent.flags, select.KQ_EV_ADD) -class PollerPollTests(unittest.TestCase): +if implements_poll(): + PollerPollTestsBase = unittest.TestCase +else: + PollerPollTestsBase = SkipTestCase + +class PollerPollTests(PollerPollTestsBase): def _makeOne(self, options): - return Poller(options) + return PollPoller(options) def test_register_readable(self): select_poll = DummySelectPoll() @@ -123,7 +194,7 @@ def test_poll_uncaught_exception(self): def test_poll_ignores_and_unregisters_closed_fd(self): select_poll = DummySelectPoll(result=[(6, select.POLLNVAL), - (7, Poller.READ)]) + (7, select.POLLPRI)]) poller = self._makeOne(DummyOptions()) poller._poller = select_poll poller.register_readable(6) @@ -132,7 +203,25 @@ def test_poll_ignores_and_unregisters_closed_fd(self): self.assertEqual(readables, [7]) self.assertEqual(select_poll.unregistered, [6]) +class DummySelect(object): + ''' + Fake implementation of select.select() + ''' + def __init__(self, result=None, error=None): + result = result or {} + self.readables = result.get('readables', []) + self.writables = result.get('writables', []) + self.error = error + + def select(self, r, w, x, timeout): + if self.error: + raise select.error(self.error) + return self.readables, self.writables, [] + class DummySelectPoll(object): + ''' + Fake implementation of select.poll() + ''' def __init__(self, result=None, error=None): self.result = result or [] self.error = error @@ -141,9 +230,9 @@ def __init__(self, result=None, error=None): self.unregistered = [] def register(self, fd, eventmask): - if eventmask == Poller.READ: + if eventmask == select.POLLIN | select.POLLPRI | select.POLLHUP: self.registered_as_readable.append(fd) - elif eventmask == Poller.WRITE: + elif eventmask == select.POLLOUT: self.registered_as_writable.append(fd) else: raise ValueError("Registered a fd on unknown eventmask: '{0}'".format(eventmask)) @@ -158,6 +247,9 @@ def poll(self, timeout): class DummyKQueue(object): + ''' + Fake implementation of select.kqueue() + ''' def __init__(self, result=None, raise_errno=None): self.result = result or [] self.errno = raise_errno From 84290dc0ee120f9ab4d6782c2666a1ab6b63986a Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Mon, 18 Jun 2012 01:23:32 -0300 Subject: [PATCH 09/12] Handle EBADF on kqueue poller backend --- supervisor/poller.py | 16 +++++++++-- supervisor/tests/test_poller.py | 49 ++++++++++++++++++++++++--------- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/supervisor/poller.py b/supervisor/poller.py index 81fcb205d..215fdca48 100644 --- a/supervisor/poller.py +++ b/supervisor/poller.py @@ -122,17 +122,27 @@ def initialize(self): def register_readable(self, fd): kevent = select.kevent(fd, filter=select.KQ_FILTER_READ, flags=select.KQ_EV_ADD) - self._kqueue.control([kevent], 0) + self._kqueue_control(fd, kevent) def register_writable(self, fd): kevent = select.kevent(fd, filter=select.KQ_FILTER_WRITE, flags=select.KQ_EV_ADD) - self._kqueue.control([kevent], 0) + self._kqueue_control(fd, kevent) def unregister(self, fd): kevent = select.kevent(fd, filter=(select.KQ_FILTER_READ | select.KQ_FILTER_WRITE), flags=select.KQ_EV_DELETE) - self._kqueue.control([kevent], 0) + self._kqueue_control(fd, kevent) + + def _kqueue_control(self, fd, kevent): + try: + self._kqueue.control([kevent], 0) + except OSError as error: + if error.errno == errno.EBADF: + self.options.logger.blather('EBADF encountered in kqueue. ' + 'Invalid file descriptor %s' % fd) + else: + raise def poll(self, timeout): readables, writables = [], [] diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py index b79f14a96..9d2db72fb 100644 --- a/supervisor/tests/test_poller.py +++ b/supervisor/tests/test_poller.py @@ -58,6 +58,8 @@ def test_poll_ignores_ebadf(self): poller.register_readable(6) poller.poll(1) self.assertEqual(options.logger.data[0], 'EBADF encountered in poll') + self.assertEqual(poller.readable, []) + self.assertEqual(poller.writable, []) def test_poll_uncaught_exception(self): _select = DummySelect(error=errno.EPERM) @@ -65,7 +67,7 @@ def test_poll_uncaught_exception(self): poller = self._makeOne(options) poller._select = _select poller.register_readable(6) - self.assertRaises(select.error, poller.poll, (1,)) + self.assertRaises(select.error, poller.poll, 1) if implements_kqueue(): KQueuePollerTestsBase = unittest.TestCase @@ -107,7 +109,7 @@ def test_poll_returns_readables_and_writables(self): self.assertEqual(writables, [8]) def test_poll_ignores_eintr(self): - kqueue = DummyKQueue(raise_errno=errno.EINTR) + kqueue = DummyKQueue(raise_errno_poll=errno.EINTR) options = DummyOptions() poller = self._makeOne(options) poller._kqueue = kqueue @@ -115,13 +117,32 @@ def test_poll_ignores_eintr(self): poller.poll(1000) self.assertEqual(options.logger.data[0], 'EINTR encountered in poll') + def test_register_readable_and_writable_ignores_ebadf(self): + _kqueue = DummyKQueue(raise_errno_register=errno.EBADF) + options = DummyOptions() + poller = self._makeOne(options) + poller._kqueue = _kqueue + poller.register_readable(6) + poller.register_writable(7) + self.assertEqual(options.logger.data[0], + 'EBADF encountered in kqueue. Invalid file descriptor 6') + self.assertEqual(options.logger.data[1], + 'EBADF encountered in kqueue. Invalid file descriptor 7') + + def test_register_uncaught_exception(self): + _kqueue = DummyKQueue(raise_errno_register=errno.ENOMEM) + options = DummyOptions() + poller = self._makeOne(options) + poller._kqueue = _kqueue + self.assertRaises(OSError, poller.register_readable, 5) + def test_poll_uncaught_exception(self): - kqueue = DummyKQueue(raise_errno=errno.EBADF) + kqueue = DummyKQueue(raise_errno_poll=errno.EINVAL) options = DummyOptions() poller = self._makeOne(options) poller._kqueue = kqueue poller.register_readable(6) - self.assertRaises(OSError, poller.poll, (1000,)) + self.assertRaises(OSError, poller.poll, 1000) def assertReadEventAdded(self, kqueue, kevent, fd): self.assertEventAdded(kqueue, kevent, fd, select.KQ_FILTER_READ) @@ -190,7 +211,7 @@ def test_poll_uncaught_exception(self): poller = self._makeOne(options) poller._poller = select_poll poller.register_readable(9) - self.assertRaises(select.error, poller.poll, (1000,)) + self.assertRaises(select.error, poller.poll, 1000) def test_poll_ignores_and_unregisters_closed_fd(self): select_poll = DummySelectPoll(result=[(6, select.POLLNVAL), @@ -250,26 +271,28 @@ class DummyKQueue(object): ''' Fake implementation of select.kqueue() ''' - def __init__(self, result=None, raise_errno=None): + def __init__(self, result=None, raise_errno_poll=None, raise_errno_register=None): self.result = result or [] - self.errno = raise_errno + self.errno_poll = raise_errno_poll + self.errno_register = raise_errno_register self.registered_kevents = [] self.registered_flags = [] def control(self, kevents, max_events, timeout=None): if kevents is None: # being called on poll() self.assert_max_events_on_poll(max_events) - self.raise_error() + self.raise_error(self.errno_poll) return self.build_result() self.assert_max_events_on_register(max_events) + self.raise_error(self.errno_register) self.registered_kevents.extend(kevents) - def raise_error(self): - if self.errno: - ex = OSError() - ex.errno = self.errno - raise ex + def raise_error(self, err): + if not err: return + ex = OSError() + ex.errno = err + raise ex def build_result(self): return [FakeKEvent(ident, filter) for ident,filter in self.result] From c9c40763b7d2a8014e344723cdc3f2c97283d041 Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Mon, 18 Jun 2012 01:47:41 -0300 Subject: [PATCH 10/12] Use set instead of list for readables and writables on SelectPoller --- supervisor/poller.py | 16 +++++++++------- supervisor/tests/test_poller.py | 8 ++++---- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/supervisor/poller.py b/supervisor/poller.py index 215fdca48..82aa04779 100644 --- a/supervisor/poller.py +++ b/supervisor/poller.py @@ -28,14 +28,13 @@ class SelectPoller(BasePoller): def initialize(self): self._select = select - self.readable = [] - self.writable = [] + self._init_fdsets() def register_readable(self, fd): - self.readable.append(fd) + self.readable.add(fd) def register_writable(self, fd): - self.writable.append(fd) + self.writable.add(fd) def unregister(self, fd): if fd in self.readable: @@ -44,8 +43,7 @@ def unregister(self, fd): self.writable.remove(fd) def unregister_all(self): - self.writable = [] - self.readable = [] + self._init_fdsets() def poll(self, timeout): try: @@ -61,6 +59,10 @@ def poll(self, timeout): raise return r, w + def _init_fdsets(self): + self.readable = set() + self.writable = set() + class PollPoller(BasePoller): def initialize(self): @@ -137,7 +139,7 @@ def unregister(self, fd): def _kqueue_control(self, fd, kevent): try: self._kqueue.control([kevent], 0) - except OSError as error: + except OSError, error: if error.errno == errno.EBADF: self.options.logger.blather('EBADF encountered in kqueue. ' 'Invalid file descriptor %s' % fd) diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py index 9d2db72fb..b6fdb7d7b 100644 --- a/supervisor/tests/test_poller.py +++ b/supervisor/tests/test_poller.py @@ -21,13 +21,13 @@ def test_register_readable(self): poller = self._makeOne(DummyOptions()) poller.register_readable(6) poller.register_readable(7) - self.assertEqual(poller.readable, [6,7]) + self.assertEqual(sorted(poller.readable), [6,7]) def test_register_writable(self): poller = self._makeOne(DummyOptions()) poller.register_writable(6) poller.register_writable(7) - self.assertEqual(poller.writable, [6,7]) + self.assertEqual(sorted(poller.writable), [6,7]) def test_poll_returns_readables_and_writables(self): _select = DummySelect(result={'readables': [6], @@ -58,8 +58,8 @@ def test_poll_ignores_ebadf(self): poller.register_readable(6) poller.poll(1) self.assertEqual(options.logger.data[0], 'EBADF encountered in poll') - self.assertEqual(poller.readable, []) - self.assertEqual(poller.writable, []) + self.assertEqual(list(poller.readable), []) + self.assertEqual(list(poller.writable), []) def test_poll_uncaught_exception(self): _select = DummySelect(error=errno.EPERM) From c6beefd8afbdac064ef90202067b845560c42400 Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Mon, 18 Jun 2012 03:04:12 -0300 Subject: [PATCH 11/12] Restore kqueue after daemonize because it's not inherited on fork() --- supervisor/options.py | 7 +++++++ supervisor/poller.py | 21 +++++++++++++++++++++ supervisor/supervisord.py | 8 +++----- supervisor/tests/base.py | 14 ++++++++++++++ supervisor/tests/test_options.py | 8 ++++++++ supervisor/tests/test_poller.py | 24 ++++++++++++++++++++++++ supervisor/tests/test_supervisord.py | 21 ++------------------- 7 files changed, 79 insertions(+), 24 deletions(-) diff --git a/supervisor/options.py b/supervisor/options.py index 8be7d6dc6..415538197 100644 --- a/supervisor/options.py +++ b/supervisor/options.py @@ -50,6 +50,7 @@ from supervisor import loggers from supervisor import states from supervisor import xmlrpc +from supervisor import poller mydir = os.path.abspath(os.path.dirname(__file__)) version_txt = os.path.join(mydir, 'version.txt') @@ -431,6 +432,7 @@ def __init__(self): self.process_group_configs = [] self.parse_warnings = [] self.signal_receiver = SignalReceiver() + self.poller = poller.Poller(self) def version(self, dummy): """Print version to stdout and exit(0). @@ -982,6 +984,11 @@ def server_configs_from_parser(self, parser): return configs def daemonize(self): + self.poller.before_daemonize() + self._daemonize() + self.poller.after_daemonize() + + def _daemonize(self): # To daemonize, we need to become the leader of our own session # (process) group. If we do not, signals sent to our # parent process will also be sent to us. This might be bad because diff --git a/supervisor/poller.py b/supervisor/poller.py index 82aa04779..3b5dc863c 100644 --- a/supervisor/poller.py +++ b/supervisor/poller.py @@ -24,6 +24,13 @@ def unregister(self, fd): def poll(self, timeout): raise NotImplementedError + def before_daemonize(self): + pass + + def after_daemonize(self): + pass + + class SelectPoller(BasePoller): def initialize(self): @@ -120,13 +127,17 @@ class KQueuePoller(BasePoller): def initialize(self): self._kqueue = select.kqueue() + self.readables = set() + self.writables = set() def register_readable(self, fd): + self.readables.add(fd) kevent = select.kevent(fd, filter=select.KQ_FILTER_READ, flags=select.KQ_EV_ADD) self._kqueue_control(fd, kevent) def register_writable(self, fd): + self.writables.add(fd) kevent = select.kevent(fd, filter=select.KQ_FILTER_WRITE, flags=select.KQ_EV_ADD) self._kqueue_control(fd, kevent) @@ -165,6 +176,16 @@ def poll(self, timeout): return readables, writables + def before_daemonize(self): + self._kqueue.close() + self._kqueue = None + + def after_daemonize(self): + self._kqueue = select.kqueue() + for fd in self.readables: + self.register_readable(fd) + for fd in self.writables: + self.register_writable(fd) def implements_poll(): return hasattr(select, 'poll') diff --git a/supervisor/supervisord.py b/supervisor/supervisord.py index a6daefb80..50d5ec46a 100755 --- a/supervisor/supervisord.py +++ b/supervisor/supervisord.py @@ -43,7 +43,6 @@ from supervisor import events from supervisor.states import SupervisorStates from supervisor.states import getProcessStateDescription -from supervisor.poller import Poller class Supervisor: stopping = False # set after we detect that we are handling a stop request @@ -53,7 +52,6 @@ class Supervisor: def __init__(self, options): self.options = options - self.poller = Poller(options) self.process_groups = {} self.ticks = {} @@ -210,11 +208,11 @@ def runforever(self): for fd, dispatcher in combined_map.items(): if dispatcher.readable(): - self.poller.register_readable(fd) + self.options.poller.register_readable(fd) if dispatcher.writable(): - self.poller.register_writable(fd) + self.options.poller.register_writable(fd) - r, w = self.poller.poll(timeout) + r, w = self.options.poller.poll(timeout) for fd in r: if combined_map.has_key(fd): diff --git a/supervisor/tests/base.py b/supervisor/tests/base.py index 10d8b0b69..fc5241f29 100644 --- a/supervisor/tests/base.py +++ b/supervisor/tests/base.py @@ -67,6 +67,7 @@ def __init__(self): self.changed_directory = False self.chdir_error = None self.umaskset = None + self.poller = DummyPoller(self) def getLogger(self, *args, **kw): logger = DummyLogger() @@ -1035,6 +1036,19 @@ def __init__(self, serial='abc'): def __str__(self): return 'dummy event' + +class DummyPoller: + def __init__(self, options): + self.result = [], [] + + def register_readable(self, fd): + pass + + def register_writable(self, fd): + pass + + def poll(self, timeout): + return self.result def dummy_handler(event, result): pass diff --git a/supervisor/tests/test_options.py b/supervisor/tests/test_options.py index 0e618ebd3..c86a2e4a3 100644 --- a/supervisor/tests/test_options.py +++ b/supervisor/tests/test_options.py @@ -1586,6 +1586,14 @@ def test_dropPrivileges_nonroot_different_user(self): msg = instance.dropPrivileges(42) self.assertEqual(msg, "Can't drop privilege as nonroot user") + def test_daemonize_notifies_poller_before_and_after_fork(self): + instance = self._makeOne() + instance._daemonize = lambda: None + instance.poller = Mock() + instance.daemonize() + instance.poller.before_daemonize.assert_called_once_with() + instance.poller.after_daemonize.assert_called_once_with() + class TestProcessConfig(unittest.TestCase): def _getTargetClass(self): from supervisor.options import ProcessConfig diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py index b6fdb7d7b..b64651c8a 100644 --- a/supervisor/tests/test_poller.py +++ b/supervisor/tests/test_poller.py @@ -2,6 +2,7 @@ import unittest import errno import select +from mock import Mock from supervisor.poller import SelectPoller, PollPoller, KQueuePoller from supervisor.poller import implements_poll, implements_kqueue @@ -84,6 +85,7 @@ def test_register_readable(self): poller = self._makeOne(DummyOptions()) poller._kqueue = kqueue poller.register_readable(6) + self.assertEqual(list(poller.readables), [6]) self.assertEqual(len(kqueue.registered_kevents), 1) self.assertReadEventAdded(kqueue, kqueue.registered_kevents[0], 6) @@ -92,6 +94,7 @@ def test_register_writable(self): poller = self._makeOne(DummyOptions()) poller._kqueue = kqueue poller.register_writable(7) + self.assertEqual(list(poller.writables), [7]) self.assertEqual(len(kqueue.registered_kevents), 1) self.assertWriteEventAdded(kqueue, kqueue.registered_kevents[0], 7) @@ -144,6 +147,27 @@ def test_poll_uncaught_exception(self): poller.register_readable(6) self.assertRaises(OSError, poller.poll, 1000) + def test_before_daemonize_closes_kqueue(self): + mock_kqueue = Mock() + options = DummyOptions() + poller = self._makeOne(options) + poller._kqueue = mock_kqueue + poller.before_daemonize() + mock_kqueue.close.assert_called_once_with() + self.assertEqual(poller._kqueue, None) + + def test_after_daemonize_restores_kqueue(self): + options = DummyOptions() + poller = self._makeOne(options) + poller.readables = [1] + poller.writables = [3] + poller.register_readable = Mock() + poller.register_writable = Mock() + poller.after_daemonize() + self.assertTrue(isinstance(poller._kqueue, select.kqueue)) + poller.register_readable.assert_called_with(1) + poller.register_writable.assert_called_with(3) + def assertReadEventAdded(self, kqueue, kevent, fd): self.assertEventAdded(kqueue, kevent, fd, select.KQ_FILTER_READ) diff --git a/supervisor/tests/test_supervisord.py b/supervisor/tests/test_supervisord.py index 308b707fe..b8e06b3e0 100644 --- a/supervisor/tests/test_supervisord.py +++ b/supervisor/tests/test_supervisord.py @@ -373,8 +373,7 @@ def test_runforever_calls_tick(self): def test_runforever_poll_dispatchers(self): options = DummyOptions() - poller = DummyPoller(options) - poller.result = [6], [7, 8] + options.poller.result = [6], [7, 8] supervisord = self._makeOne(options) pconfig = DummyPConfig(options, 'foo', '/bin/foo',) process = DummyProcess(pconfig) @@ -385,7 +384,6 @@ def test_runforever_poll_dispatchers(self): error = DummyDispatcher(writable=True, error=OSError) pgroup.dispatchers = {6:readable, 7:writable, 8:error} supervisord.process_groups = {'foo': pgroup} - supervisord.poller = poller options.test = True supervisord.runforever() self.assertEqual(pgroup.transitioned, True) @@ -395,8 +393,7 @@ def test_runforever_poll_dispatchers(self): def test_runforever_select_dispatcher_exitnow(self): options = DummyOptions() - poller = DummyPoller(options) - poller.result = [6], [] + options.poller.result = [6], [] supervisord = self._makeOne(options) pconfig = DummyPConfig(options, 'foo', '/bin/foo',) process = DummyProcess(pconfig) @@ -406,7 +403,6 @@ def test_runforever_select_dispatcher_exitnow(self): exitnow = DummyDispatcher(readable=True, error=asyncore.ExitNow) pgroup.dispatchers = {6:exitnow} supervisord.process_groups = {'foo': pgroup} - supervisord.poller = poller options.test = True self.assertRaises(asyncore.ExitNow, supervisord.runforever) @@ -507,19 +503,6 @@ def callback(event): self.assertEqual(len(L), 6) self.assertEqual(L[-1].__class__, events.Tick3600Event) -class DummyPoller: - def __init__(self, options): - self.result = [] - - def register_readable(self, fd): - pass - - def register_writable(self, fd): - pass - - def poll(self, timeout): - return self.result - def test_suite(): return unittest.findTestCases(sys.modules[__name__]) From b442130f7f22983855f777bd423c46daaef5159d Mon Sep 17 00:00:00 2001 From: Igor Sobreira Date: Mon, 18 Jun 2012 19:00:10 -0300 Subject: [PATCH 12/12] Remove file descriptor from sets on unregister() for kqueue poller --- supervisor/poller.py | 9 +++++++- supervisor/tests/test_poller.py | 38 ++++++++++++++++++++++++--------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/supervisor/poller.py b/supervisor/poller.py index 3b5dc863c..86e347ce0 100644 --- a/supervisor/poller.py +++ b/supervisor/poller.py @@ -117,7 +117,6 @@ def _ignore_invalid(self, fd, eventmask): return True return False - class KQueuePoller(BasePoller): ''' Wrapper for select.kqueue()/kevent() @@ -145,6 +144,7 @@ def register_writable(self, fd): def unregister(self, fd): kevent = select.kevent(fd, filter=(select.KQ_FILTER_READ | select.KQ_FILTER_WRITE), flags=select.KQ_EV_DELETE) + self._forget_fd(fd) self._kqueue_control(fd, kevent) def _kqueue_control(self, fd, kevent): @@ -157,6 +157,13 @@ def _kqueue_control(self, fd, kevent): else: raise + def _forget_fd(self, fd): + for collection in (self.readables, self.writables): + try: + collection.remove(fd) + except KeyError: + pass + def poll(self, timeout): readables, writables = [], [] diff --git a/supervisor/tests/test_poller.py b/supervisor/tests/test_poller.py index b64651c8a..d8ea84868 100644 --- a/supervisor/tests/test_poller.py +++ b/supervisor/tests/test_poller.py @@ -87,7 +87,7 @@ def test_register_readable(self): poller.register_readable(6) self.assertEqual(list(poller.readables), [6]) self.assertEqual(len(kqueue.registered_kevents), 1) - self.assertReadEventAdded(kqueue, kqueue.registered_kevents[0], 6) + self.assertReadEventAdded(kqueue.registered_kevents[0], 6) def test_register_writable(self): kqueue = DummyKQueue() @@ -96,7 +96,21 @@ def test_register_writable(self): poller.register_writable(7) self.assertEqual(list(poller.writables), [7]) self.assertEqual(len(kqueue.registered_kevents), 1) - self.assertWriteEventAdded(kqueue, kqueue.registered_kevents[0], 7) + self.assertWriteEventAdded(kqueue.registered_kevents[0], 7) + + def test_unregister(self): + kqueue = DummyKQueue() + poller = self._makeOne(DummyOptions()) + poller._kqueue = kqueue + poller.register_writable(7) + poller.register_readable(8) + poller.unregister(7) + poller.unregister(100) # not registered, ignore error + self.assertEqual(list(poller.writables), []) + self.assertEqual(list(poller.readables), [8]) + self.assertWriteEventAdded(kqueue.registered_kevents[0], 7) + self.assertReadEventAdded(kqueue.registered_kevents[1], 8) + self.assertDeletedEvent(kqueue.registered_kevents[2], 7) def test_poll_returns_readables_and_writables(self): kqueue = DummyKQueue(result=[(6, select.KQ_FILTER_READ), @@ -168,16 +182,20 @@ def test_after_daemonize_restores_kqueue(self): poller.register_readable.assert_called_with(1) poller.register_writable.assert_called_with(3) - def assertReadEventAdded(self, kqueue, kevent, fd): - self.assertEventAdded(kqueue, kevent, fd, select.KQ_FILTER_READ) + def assertReadEventAdded(self, kevent, fd): + self.assertKevent(kevent, fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) + + def assertWriteEventAdded(self, kevent, fd): + self.assertKevent(kevent, fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) - def assertWriteEventAdded(self, kqueue, kevent, fd): - self.assertEventAdded(kqueue, kevent, fd, select.KQ_FILTER_WRITE) + def assertDeletedEvent(self, kevent, fd): + self.assertKevent(kevent, fd, select.KQ_FILTER_READ | select.KQ_FILTER_WRITE, + select.KQ_EV_DELETE) - def assertEventAdded(self, kqueue, kevent, fd, filter_spec): - self.assertEqual(kevent.ident, fd) - self.assertEqual(kevent.filter, filter_spec) - self.assertEqual(kevent.flags, select.KQ_EV_ADD) + def assertKevent(self, kevent, ident, filter, flags): + self.assertEqual(kevent.ident, ident) + self.assertEqual(kevent.filter, filter) + self.assertEqual(kevent.flags, flags) if implements_poll():