Skip to content

Commit 50d1857

Browse files
committed
work towards faster start and stop (Supervisor#131)
1 parent b2eb9a6 commit 50d1857

File tree

3 files changed

+118
-88
lines changed

3 files changed

+118
-88
lines changed

supervisor/rpcinterface.py

Lines changed: 105 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import time
33
import datetime
44
import errno
5+
import types
56

67
from supervisor.compat import as_string
78
from supervisor.compat import unicode
@@ -27,7 +28,10 @@
2728
from supervisor.states import getSupervisorStateDescription
2829
from supervisor.states import ProcessStates
2930
from supervisor.states import getProcessStateDescription
30-
from supervisor.states import RUNNING_STATES
31+
from supervisor.states import (
32+
RUNNING_STATES,
33+
STOPPED_STATES,
34+
)
3135

3236
API_VERSION = '3.0'
3337

@@ -273,49 +277,56 @@ def startProcess(self, name, wait=True):
273277
except (NotExecutable, NoPermission) as why:
274278
raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])
275279

276-
started = []
280+
if process.get_state() in RUNNING_STATES:
281+
raise RPCError(Faults.ALREADY_STARTED, name)
277282

278-
startsecs = process.config.startsecs
283+
process.spawn()
279284

280-
def startit():
281-
if not started:
285+
# We call reap() in order to more quickly obtain the side effects of
286+
# process.finish(), which reap() eventually ends up calling. This
287+
# might be the case if the spawn() was successful but then the process
288+
# died before its startsecs elapsed or it exited with an unexpected
289+
# exit code. In particular, finish() may set spawnerr, which we can
290+
# check and immediately raise an RPCError, avoiding the need to
291+
# defer by returning a callback.
282292

283-
if process.get_state() in RUNNING_STATES:
284-
raise RPCError(Faults.ALREADY_STARTED, name)
293+
self.supervisord.reap()
285294

286-
process.spawn()
295+
if process.spawnerr:
296+
raise RPCError(Faults.SPAWN_ERROR, name)
287297

298+
# We call process.transition() in order to more quickly obtain its
299+
# side effects. In particular, it might set the process' state from
300+
# STARTING->RUNNING if the process has startsecs==0.
301+
process.transition()
302+
303+
if wait and process.get_state() != ProcessStates.RUNNING:
304+
# by default, this branch will almost always be hit for processes
305+
# with default startsecs configurations, because the default number
306+
# of startsecs for a process is "1", and the process will not have
307+
# entered the RUNNING state yet even though we've called
308+
# transition() on it. This is because a process is not considered
309+
# RUNNING until it has stayed up > startsecs.
310+
311+
def onwait():
288312
if process.spawnerr:
289313
raise RPCError(Faults.SPAWN_ERROR, name)
290314

291-
# we use a list here to fake out lexical scoping;
292-
# using a direct assignment to 'started' in the
293-
# function appears to not work (symptom: 2nd or 3rd
294-
# call through, it forgets about 'started', claiming
295-
# it's undeclared).
296-
started.append(time.time())
315+
state = process.get_state()
297316

298-
if not wait or not startsecs:
299-
return True
317+
if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
318+
raise RPCError(Faults.ABNORMAL_TERMINATION, name)
300319

301-
t = time.time()
302-
runtime = (t - started[0])
303-
state = process.get_state()
304-
305-
if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
306-
raise RPCError(Faults.ABNORMAL_TERMINATION, name)
320+
if state == ProcessStates.RUNNING:
321+
return True
307322

308-
if runtime < startsecs:
309323
return NOT_DONE_YET
310324

311-
if state == ProcessStates.RUNNING:
312-
return True
313-
314-
raise RPCError(Faults.ABNORMAL_TERMINATION, name)
325+
onwait.delay = 0.05
326+
onwait.rpcinterface = self
327+
return onwait # deferred
315328

316-
startit.delay = 0.05
317-
startit.rpcinterface = self
318-
return startit # deferred
329+
return True
319330

320331
def startProcessGroup(self, name, wait=True):
321332
""" Start all processes in the group named 'name'
@@ -373,36 +384,43 @@ def stopProcess(self, name, wait=True):
373384
group_name, process_name = split_namespec(name)
374385
return self.stopProcessGroup(group_name, wait)
375386

376-
stopped = []
377-
called = []
378-
379-
def killit():
380-
if not called:
381-
if process.get_state() not in RUNNING_STATES:
382-
raise RPCError(Faults.NOT_RUNNING)
383-
# use a mutable for lexical scoping; see startProcess
384-
called.append(1)
385-
386-
if not stopped:
387-
msg = process.stop()
388-
if msg is not None:
389-
raise RPCError(Faults.FAILED, msg)
390-
stopped.append(1)
391-
392-
if wait:
387+
if process.get_state() not in RUNNING_STATES:
388+
raise RPCError(Faults.NOT_RUNNING)
389+
390+
msg = process.stop()
391+
if msg is not None:
392+
raise RPCError(Faults.FAILED, msg)
393+
394+
# We'll try to reap any killed child. FWIW, reap calls waitpid, and
395+
# then, if waitpid returns a pid, calls finish() on the process with
396+
# that pid, which drains any I/O from the process' dispatchers and
397+
# changes the process' state. I chose to call reap without once=True
398+
# because we don't really care if we reap more than one child. Even if
399+
# we only reap one child, we may not even be reaping the child that we
400+
# just stopped (this is all async, and process.stop() may not work, and
401+
# we'll need to wait for SIGKILL during process.transition() as the
402+
# result of normal select looping).
403+
404+
self.supervisord.reap()
405+
406+
if wait and process.get_state() not in STOPPED_STATES:
407+
408+
def onwait():
409+
# process will eventually enter a stopped state by
410+
# virtue of the supervisord.reap() method being called
411+
# during normal operations
412+
self.supervisord.options.logger.info(
413+
'waiting for %s to stop' % process.config.name
414+
)
415+
if process.get_state() not in STOPPED_STATES:
393416
return NOT_DONE_YET
394-
else:
395-
return True
396-
397-
if process.get_state() not in (ProcessStates.STOPPED,
398-
ProcessStates.EXITED):
399-
return NOT_DONE_YET
400-
else:
401417
return True
402418

403-
killit.delay = 0.2
404-
killit.rpcinterface = self
405-
return killit # deferred
419+
onwait.delay = 0
420+
onwait.rpcinterface = self
421+
return onwait # deferred
422+
423+
return True
406424

407425
def stopProcessGroup(self, name, wait=True):
408426
""" Stop all processes in the process group named 'name'
@@ -792,46 +810,55 @@ def allfunc(
792810
callbacks=callbacks, # used only to fool scoping, never passed by caller
793811
results=results, # used only to fool scoping, never passed by caller
794812
):
813+
795814
if not callbacks:
796815

797816
for group, process in processes:
798817
name = make_namespec(group.config.name, process.config.name)
799818
if predicate(process):
800819
try:
801820
callback = func(name, **extra_kwargs)
802-
callbacks.append((group, process, callback))
803821
except RPCError as e:
804822
results.append({'name':process.config.name,
805823
'group':group.config.name,
806824
'status':e.code,
807825
'description':e.text})
808826
continue
827+
if isinstance(callback, types.FunctionType):
828+
callbacks.append((group, process, callback))
829+
else:
830+
results.append(
831+
{'name':process.config.name,
832+
'group':group.config.name,
833+
'status':Faults.SUCCESS,
834+
'description':'OK'}
835+
)
809836

810837
if not callbacks:
811838
return results
812839

813-
group, process, callback = callbacks.pop(0)
840+
for struct in callbacks[:]:
814841

815-
try:
816-
value = callback()
817-
except RPCError as e:
818-
results.append(
819-
{'name':process.config.name,
820-
'group':group.config.name,
821-
'status':e.code,
822-
'description':e.text})
823-
return NOT_DONE_YET
842+
group, process, cb = struct
824843

825-
if value is NOT_DONE_YET:
826-
# push it back into the queue; it will finish eventually
827-
callbacks.append((group, process, callback))
828-
else:
829-
results.append(
830-
{'name':process.config.name,
831-
'group':group.config.name,
832-
'status':Faults.SUCCESS,
833-
'description':'OK'}
834-
)
844+
try:
845+
value = cb()
846+
except RPCError as e:
847+
results.append(
848+
{'name':process.config.name,
849+
'group':group.config.name,
850+
'status':e.code,
851+
'description':e.text})
852+
value = None
853+
854+
if value is not NOT_DONE_YET:
855+
results.append(
856+
{'name':process.config.name,
857+
'group':group.config.name,
858+
'status':Faults.SUCCESS,
859+
'description':'OK'}
860+
)
861+
callbacks.remove(struct)
835862

836863
if callbacks:
837864
return NOT_DONE_YET

supervisor/tests/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,9 @@ def set_procattr(self, process_name, attr_name, val, group_name=None):
10221022
process = self.process_groups[group_name].processes[process_name]
10231023
setattr(process, attr_name, val)
10241024

1025+
def reap(self):
1026+
self.reaped = True
1027+
10251028
class DummyDispatcher:
10261029
write_event_handled = False
10271030
read_event_handled = False

supervisor/tests/test_rpcinterfaces.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,10 @@ def test_startProcess_already_started(self):
316316
supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
317317
supervisord.set_procattr('foo', 'pid', 10)
318318
interface = self._makeOne(supervisord)
319-
callback = interface.startProcess('foo')
320-
self._assertRPCError(xmlrpc.Faults.ALREADY_STARTED,
321-
callback)
319+
self._assertRPCError(
320+
xmlrpc.Faults.ALREADY_STARTED,
321+
interface.startProcess, 'foo'
322+
)
322323

323324
def test_startProcess_bad_group_name(self):
324325
options = DummyOptions()
@@ -372,26 +373,25 @@ def test_startProcess_spawnerr(self):
372373
process = supervisord.process_groups['foo'].processes['foo']
373374
process.spawnerr = 'abc'
374375
interface = self._makeOne(supervisord)
375-
callback = interface.startProcess('foo')
376-
self._assertRPCError(xmlrpc.Faults.SPAWN_ERROR, callback)
376+
self._assertRPCError(
377+
xmlrpc.Faults.SPAWN_ERROR,
378+
interface.startProcess,
379+
'foo'
380+
)
377381

378382
def test_startProcess(self):
379-
from supervisor import http
380383
options = DummyOptions()
381384
pconfig = DummyPConfig(options, 'foo', __file__, autostart=False,
382385
startsecs=.01)
383386
from supervisor.process import ProcessStates
384387
supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
385388
supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED)
386389
interface = self._makeOne(supervisord)
387-
callback = interface.startProcess('foo')
388-
self.assertEqual(callback(), http.NOT_DONE_YET)
390+
result = interface.startProcess('foo')
389391
process = supervisord.process_groups['foo'].processes['foo']
390392
self.assertEqual(process.spawned, True)
391393
self.assertEqual(interface.update_text, 'startProcess')
392394
process.state = ProcessStates.RUNNING
393-
time.sleep(.02)
394-
result = callback()
395395
self.assertEqual(result, True)
396396

397397
def test_startProcess_nowait(self):

0 commit comments

Comments
 (0)