Skip to content

Replace select() usage for a Poller class witch uses either kqueue(), poll() or select() backend #129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Aug 10, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions supervisor/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import resource
import stat
import pkg_resources
import select
import glob
import platform
import warnings
Expand Down Expand Up @@ -51,6 +50,7 @@
from supervisor import loggers
from supervisor import states
from supervisor import xmlrpc
from supervisor import poller

mydir = os.path.abspath(os.path.dirname(__file__))
version_txt = os.path.join(mydir, 'version.txt')
Expand Down Expand Up @@ -432,6 +432,7 @@ def __init__(self):
self.process_group_configs = []
self.parse_warnings = []
self.signal_receiver = SignalReceiver()
self.poller = poller.Poller(self)

def version(self, dummy):
"""Print version to stdout and exit(0).
Expand Down Expand Up @@ -983,6 +984,11 @@ def server_configs_from_parser(self, parser):
return configs

def daemonize(self):
self.poller.before_daemonize()
self._daemonize()
self.poller.after_daemonize()

def _daemonize(self):
# To daemonize, we need to become the leader of our own session
# (process) group. If we do not, signals sent to our
# parent process will also be sent to us. This might be bad because
Expand Down Expand Up @@ -1156,9 +1162,6 @@ def cleanup_fds(self):
except OSError:
pass

def select(self, r, w, x, timeout):
return select.select(r, w, x, timeout)

def kill(self, pid, signal):
os.kill(pid, signal)

Expand Down
208 changes: 208 additions & 0 deletions supervisor/poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import select
import errno
import time
import signal

class BasePoller:

def __init__(self, options):
self.options = options
self.initialize()

def initialize(self):
pass

def register_readable(self, fd):
raise NotImplementedError

def register_writable(self, fd):
raise NotImplementedError

def unregister(self, fd):
raise NotImplementedError

def poll(self, timeout):
raise NotImplementedError

def before_daemonize(self):
pass

def after_daemonize(self):
pass


class SelectPoller(BasePoller):

def initialize(self):
self._select = select
self._init_fdsets()

def register_readable(self, fd):
self.readable.add(fd)

def register_writable(self, fd):
self.writable.add(fd)

def unregister(self, fd):
if fd in self.readable:
self.readable.remove(fd)
if fd in self.writable:
self.writable.remove(fd)

def unregister_all(self):
self._init_fdsets()

def poll(self, timeout):
try:
r, w, x = self._select.select(self.readable, self.writable, [], timeout)
except select.error, err:
if err[0] == errno.EINTR:
self.options.logger.blather('EINTR encountered in poll')
return [], []
if err[0] == errno.EBADF:
self.options.logger.blather('EBADF encountered in poll')
self.unregister_all()
return [], []
raise
return r, w

def _init_fdsets(self):
self.readable = set()
self.writable = set()

class PollPoller(BasePoller):

def initialize(self):
self._poller = select.poll()
self.READ = select.POLLIN | select.POLLPRI | select.POLLHUP
self.WRITE = select.POLLOUT

def register_readable(self, fd):
self._poller.register(fd, self.READ)

def register_writable(self, fd):
self._poller.register(fd, self.WRITE)

def unregister(self, fd):
self._poller.unregister(fd)

def poll(self, timeout):
fds = self._poll_fds(timeout)
readables, writables = [], []
for fd, eventmask in fds:
if self._ignore_invalid(fd, eventmask):
continue
if eventmask & self.READ:
readables.append(fd)
if eventmask & self.WRITE:
writables.append(fd)
return readables, writables

def _poll_fds(self, timeout):
try:
return self._poller.poll(timeout * 1000)
except select.error, err:
if err[0] == errno.EINTR:
self.options.logger.blather('EINTR encountered in poll')
return []
raise

def _ignore_invalid(self, fd, eventmask):
if eventmask & select.POLLNVAL:
# POLLNVAL means `fd` value is invalid, not open.
# When a process quits it's `fd`s are closed so there
# is no more reason to keep this `fd` registered
# If the process restarts it's `fd`s are registered again
self.unregister(fd)
return True
return False

class KQueuePoller(BasePoller):
'''
Wrapper for select.kqueue()/kevent()
'''

max_events = 1000

def initialize(self):
self._kqueue = select.kqueue()
self.readables = set()
self.writables = set()

def register_readable(self, fd):
self.readables.add(fd)
kevent = select.kevent(fd, filter=select.KQ_FILTER_READ,
flags=select.KQ_EV_ADD)
self._kqueue_control(fd, kevent)

def register_writable(self, fd):
self.writables.add(fd)
kevent = select.kevent(fd, filter=select.KQ_FILTER_WRITE,
flags=select.KQ_EV_ADD)
self._kqueue_control(fd, kevent)

def unregister(self, fd):
kevent = select.kevent(fd, filter=(select.KQ_FILTER_READ | select.KQ_FILTER_WRITE),
flags=select.KQ_EV_DELETE)
self._forget_fd(fd)
self._kqueue_control(fd, kevent)

def _kqueue_control(self, fd, kevent):
try:
self._kqueue.control([kevent], 0)
except OSError, error:
if error.errno == errno.EBADF:
self.options.logger.blather('EBADF encountered in kqueue. '
'Invalid file descriptor %s' % fd)
else:
raise

def _forget_fd(self, fd):
for collection in (self.readables, self.writables):
try:
collection.remove(fd)
except KeyError:
pass

def poll(self, timeout):
readables, writables = [], []

try:
kevents = self._kqueue.control(None, self.max_events, timeout)
except OSError, error:
if error.errno == errno.EINTR:
self.options.logger.blather('EINTR encountered in poll')
return readables, writables
raise

for kevent in kevents:
if kevent.filter == select.KQ_FILTER_READ:
readables.append(kevent.ident)
if kevent.filter == select.KQ_FILTER_WRITE:
writables.append(kevent.ident)

return readables, writables

def before_daemonize(self):
self._kqueue.close()
self._kqueue = None

def after_daemonize(self):
self._kqueue = select.kqueue()
for fd in self.readables:
self.register_readable(fd)
for fd in self.writables:
self.register_writable(fd)

def implements_poll():
return hasattr(select, 'poll')

def implements_kqueue():
return hasattr(select, 'kqueue')

if implements_kqueue():
Poller = KQueuePoller
elif implements_poll():
Poller = PollPoller
else:
Poller = SelectPoller
17 changes: 4 additions & 13 deletions supervisor/supervisord.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,22 +206,13 @@ def runforever(self):
# killing everything), it's OK to swtop or reload
raise asyncore.ExitNow

r, w, x = [], [], []

for fd, dispatcher in combined_map.items():
if dispatcher.readable():
r.append(fd)
self.options.poller.register_readable(fd)
if dispatcher.writable():
w.append(fd)

try:
r, w, x = self.options.select(r, w, x, timeout)
except select.error, err:
r = w = x = []
if err[0] == errno.EINTR:
self.options.logger.blather('EINTR encountered in select')
else:
raise
self.options.poller.register_writable(fd)

r, w = self.options.poller.poll(timeout)

for fd in r:
if combined_map.has_key(fd):
Expand Down
22 changes: 14 additions & 8 deletions supervisor/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ def __init__(self):
self.privsdropped = None
self.logs_reopened = False
self.environment_processed = False
self.select_result = [], [], []
self.select_error = None
self.write_accept = None
self.write_error = None
self.tempfile_name = '/foo/bar'
Expand All @@ -69,6 +67,7 @@ def __init__(self):
self.changed_directory = False
self.chdir_error = None
self.umaskset = None
self.poller = DummyPoller(self)

def getLogger(self, *args, **kw):
logger = DummyLogger()
Expand Down Expand Up @@ -220,12 +219,6 @@ def process_environment(self):
def mktempfile(self, prefix, suffix, dir):
return self.tempfile_name

def select(self, r, w, x, timeout):
import select
if self.select_error:
raise select.error(self.select_error)
return self.select_result

def remove(self, path):
import os
if self.remove_error:
Expand Down Expand Up @@ -1043,6 +1036,19 @@ def __init__(self, serial='abc'):

def __str__(self):
return 'dummy event'

class DummyPoller:
def __init__(self, options):
self.result = [], []

def register_readable(self, fd):
pass

def register_writable(self, fd):
pass

def poll(self, timeout):
return self.result

def dummy_handler(event, result):
pass
Expand Down
8 changes: 8 additions & 0 deletions supervisor/tests/test_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1586,6 +1586,14 @@ def test_dropPrivileges_nonroot_different_user(self):
msg = instance.dropPrivileges(42)
self.assertEqual(msg, "Can't drop privilege as nonroot user")

def test_daemonize_notifies_poller_before_and_after_fork(self):
instance = self._makeOne()
instance._daemonize = lambda: None
instance.poller = Mock()
instance.daemonize()
instance.poller.before_daemonize.assert_called_once_with()
instance.poller.after_daemonize.assert_called_once_with()

class TestProcessConfig(unittest.TestCase):
def _getTargetClass(self):
from supervisor.options import ProcessConfig
Expand Down
Loading