Merged
96 changes: 46 additions & 50 deletions nipype/interfaces/base.py
@@ -9,11 +9,10 @@
Requires Packages to be installed
"""
from __future__ import print_function, division, unicode_literals, absolute_import
from future import standard_library
standard_library.install_aliases()
import gc

from builtins import range, object, open, str, bytes

from configparser import NoOptionError
from copy import deepcopy
import datetime
from datetime import datetime as dt
@@ -26,7 +25,6 @@
import select
import subprocess as sp
import sys
import time
from textwrap import wrap
from warnings import warn
import simplejson as json
@@ -43,6 +41,8 @@
traits, Undefined, TraitDictObject, TraitListObject, TraitError, isdefined,
File, Directory, DictStrStr, has_metadata, ImageFile)
from ..external.due import due
from future import standard_library
standard_library.install_aliases()

nipype_version = Version(__version__)
iflogger = logging.getLogger('interface')
@@ -58,6 +58,7 @@
class Str(traits.Unicode):
"""Replacement for the default traits.Str based in bytes"""


traits.Str = Str


@@ -634,16 +635,16 @@ def __deepcopy__(self, memo):
return memo[id_self]
dup_dict = deepcopy(self.get(), memo)
# access all keys
for key in self.copyable_trait_names():
if key in self.__dict__.keys():
_ = getattr(self, key)
# for key in self.copyable_trait_names():
# if key in self.__dict__.keys():
# _ = getattr(self, key)
# clone once
dup = self.clone_traits(memo=memo)
for key in self.copyable_trait_names():
try:
_ = getattr(dup, key)
except:
pass
# for key in self.copyable_trait_names():
Review comment (Member): Dynamic traits won't work without these. I'm not sure we have a test for it, but this is the relevant issue and PR: enthought/traits#373 (a sketch of the pattern these loops implemented follows this hunk).
# try:
# _ = getattr(dup, key)
# except:
# pass
# clone twice
dup = self.clone_traits(memo=memo)
dup.trait_set(**dup_dict)
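
The commented-out getattr loops above are what the review comment refers to. Below is a minimal, illustrative sketch of the pattern they implement, assuming a traits.HasTraits-based spec; the helper name is hypothetical, not nipype API.

def _touch_then_clone(spec, memo=None):
    # Touch every copyable trait so dynamically added (lazy) traits are
    # instantiated before cloning -- the concern behind enthought/traits#373.
    for key in spec.copyable_trait_names():
        try:
            getattr(spec, key)
        except Exception:
            pass  # some traits may not be readable before they are set
    # Clone only once all traits exist on the instance
    return spec.clone_traits(memo=memo)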
@@ -1260,6 +1261,7 @@ class SimpleInterface(BaseInterface):
>>> os.chdir(old.strpath)

"""

def __init__(self, from_file=None, resource_monitor=None, **inputs):
super(SimpleInterface, self).__init__(
from_file=from_file, resource_monitor=resource_monitor, **inputs)
@@ -1387,8 +1389,7 @@ def run_command(runtime, output=None, timeout=0.01):
shell=True,
cwd=runtime.cwd,
env=env,
close_fds=True,
)
close_fds=True)
result = {
'stdout': [],
'stderr': [],
@@ -1427,12 +1428,7 @@ def _process(drain=0):
temp.sort()
result['merged'] = [r[1] for r in temp]

if output == 'allatonce':
stdout, stderr = proc.communicate()
result['stdout'] = read_stream(stdout, logger=iflogger)
result['stderr'] = read_stream(stderr, logger=iflogger)

elif output.startswith('file'):
if output.startswith('file'):
proc.wait()
if outfile is not None:
stdout.flush()
@@ -1452,12 +1448,18 @@ def _process(drain=0):
result['merged'] = result['stdout']
result['stdout'] = []
else:
proc.communicate() # Discard stdout and stderr
stdout, stderr = proc.communicate()
Review comment (Member): Maybe make these stdoutstr, stdinstr, to be consistent with if output.startswith('file'), so that the same del statement will work in all cases. Alternatively, it might be simplest to just run gc.collect() in CommandLine._run_interface, after this function is called and everything is allowed to go out of scope (see the sketch after this hunk).
if output == 'allatonce': # Discard stdout and stderr otherwise
result['stdout'] = read_stream(stdout, logger=iflogger)
result['stderr'] = read_stream(stderr, logger=iflogger)

runtime.returncode = proc.returncode
proc.terminate() # Ensure we are done
gc.collect() # Force GC for a cleanup
Review comment (Member): Goal here is just to clean up proc internals, not stdout/stderr, etc.?
runtime.stderr = '\n'.join(result['stderr'])
runtime.stdout = '\n'.join(result['stdout'])
runtime.merged = '\n'.join(result['merged'])
runtime.returncode = proc.returncode
return runtime
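
A hedged sketch of the second alternative suggested in the review above: rather than deleting the stdout/stderr strings inside run_command, let them fall out of scope and collect from the caller. The method below only loosely mirrors CommandLine._run_interface and assumes run_command() from this module; it is not the merged implementation.

import gc

def _run_interface(self, runtime, correct_return_codes=(0,)):
    runtime = run_command(runtime, output=self.terminal_output)
    # Everything allocated inside run_command (stdout/stderr buffers, the
    # Popen object) is now unreachable; force a collection to release it.
    gc.collect()
    return runtime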


@@ -1467,21 +1469,26 @@ def get_dependencies(name, environ):
Uses otool on darwin, ldd on linux. Currently doesn't support windows.

"""
cmd = None
if sys.platform == 'darwin':
proc = sp.Popen('otool -L `which %s`' % name,
stdout=sp.PIPE,
stderr=sp.PIPE,
shell=True,
env=environ)
cmd = 'otool -L `which {}`'.format
elif 'linux' in sys.platform:
proc = sp.Popen('ldd `which %s`' % name,
stdout=sp.PIPE,
stderr=sp.PIPE,
shell=True,
env=environ)
else:
cmd = 'ldd -L `which {}`'.format

if cmd is None:
return 'Platform %s not supported' % sys.platform
o, e = proc.communicate()

try:
proc = sp.Popen(
cmd(name), stdout=sp.PIPE, stderr=sp.PIPE, shell=True,
env=environ, close_fds=True)
o, e = proc.communicate()
Review comment (Member): I'd replace e with _, since it's unused (see the sketch after this hunk).
proc.terminate()
gc.collect()
except:
iflogger.warning(
'Could not get linked libraries for "%s".', name)
return 'Failed collecting dependencies'
return o.rstrip()
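
For reference, a self-contained sketch of the refactored helper with the reviewer's suggestion applied (stderr unpacked into _). It mirrors the diff above, not necessarily the code that was ultimately merged.

import gc
import subprocess as sp
import sys

def get_dependencies(name, environ):
    """List linked libraries of `name` via otool (Darwin) or ldd (Linux)."""
    if sys.platform == 'darwin':
        cmd = 'otool -L `which {}`'.format
    elif 'linux' in sys.platform:
        cmd = 'ldd -L `which {}`'.format
    else:
        return 'Platform %s not supported' % sys.platform
    try:
        proc = sp.Popen(cmd(name), stdout=sp.PIPE, stderr=sp.PIPE,
                        shell=True, env=environ, close_fds=True)
        o, _ = proc.communicate()  # stderr is unused, so discard it
        proc.terminate()
        gc.collect()
    except Exception:
        return 'Failed collecting dependencies'
    return o.rstrip()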


@@ -1572,6 +1579,9 @@ def __init__(self, command=None, terminal_output=None, **inputs):
# Set command. Input argument takes precedence
self._cmd = command or getattr(self, '_cmd', None)

# Store dependencies in runtime object
self._ldd = str2bool(config.get('execution', 'get_linked_libs', 'true'))

if self._cmd is None:
raise Exception("Missing command")

@@ -1619,21 +1629,6 @@ def raise_exception(self, runtime):
def _get_environ(self):
return getattr(self.inputs, 'environ', {})

def version_from_command(self, flag='-v'):
cmdname = self.cmd.split()[0]
env = dict(os.environ)
if _exists_in_path(cmdname, env):
out_environ = self._get_environ()
env.update(out_environ)
proc = sp.Popen(' '.join((cmdname, flag)),
shell=True,
env=env,
stdout=sp.PIPE,
stderr=sp.PIPE,
)
o, e = proc.communicate()
return o
Review comment (Member): This is an API change. Even if not used internally, someone may depend on this method. Does nipype have a process for deprecating?

Reply (Contributor Author): Maybe we should start using https://pypi.python.org/pypi/deprecation? (A hedged sketch appears at the end of this file's diff.)
def _run_interface(self, runtime, correct_return_codes=(0,)):
"""Execute command via subprocess

@@ -1664,7 +1659,8 @@ def _run_interface(self, runtime, correct_return_codes=(0,)):
(self.cmd.split()[0], runtime.hostname))

runtime.command_path = cmd_path
runtime.dependencies = get_dependencies(executable_name, runtime.environ)
runtime.dependencies = (get_dependencies(executable_name, runtime.environ)
if self._ldd else '<skipped>')
runtime = run_command(runtime, output=self.terminal_output)
if runtime.returncode is None or \
runtime.returncode not in correct_return_codes:
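On the deprecation thread above: a hedged sketch of the contributor's suggestion using the third-party deprecation package (https://pypi.python.org/pypi/deprecation). Decorator arguments, version numbers, and the details message are illustrative, not an actual nipype decision.

import deprecation

import nipype


@deprecation.deprecated(deprecated_in='1.0', removed_in='2.0',
                        current_version=nipype.__version__,
                        details='Use the interface version property instead.')
def version_from_command(self, flag='-v'):
    """Kept for backward compatibility; emits a deprecation warning when called."""
    ...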
13 changes: 7 additions & 6 deletions nipype/pipeline/engine/workflows.py
@@ -62,6 +62,7 @@

logger = logging.getLogger('workflow')


class Workflow(EngineBase):
"""Controls the setup and execution of a pipeline of processes."""

@@ -196,7 +197,7 @@ def connect(self, *args, **kwargs):
# determine their inputs/outputs depending on
# connection settings. Skip these modules in the check
if dest in connected_ports[destnode]:
raise Exception("""
raise Exception("""\
Trying to connect %s:%s to %s:%s but input '%s' of node '%s' is already
connected.
""" % (srcnode, source, destnode, dest, dest, destnode))
@@ -297,7 +298,7 @@ def disconnect(self, *args):
remove = []
for edge in conn:
if edge in ed_conns:
idx = ed_conns.index(edge)
# idx = ed_conns.index(edge)
remove.append((edge[0], edge[1]))

logger.debug('disconnect(): remove list %s', to_str(remove))
@@ -426,7 +427,7 @@ def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical',
base_dir = os.getcwd()
base_dir = make_output_dir(base_dir)
if graph2use in ['hierarchical', 'colored']:
if self.name[:1].isdigit(): # these graphs break if int
if self.name[:1].isdigit(): # these graphs break if int
raise ValueError('{} graph failed, workflow name cannot begin '
'with a number'.format(graph2use))
dotfilename = op.join(base_dir, dotfilename)
@@ -646,7 +647,7 @@ def _write_report_info(self, workingdir, name, graph):
# Avoid RuntimeWarning: divide by zero encountered in log10
num_nodes = len(nodes)
if num_nodes > 0:
index_name = np.ceil(np.log10(num_nodes)).astype(int)
index_name = np.ceil(np.log10(num_nodes)).astype(int)
else:
index_name = 0
template = '%%0%dd_' % index_name
@@ -794,10 +795,10 @@ def _get_outputs(self):
setattr(outputdict, node.name, outputs)
return outputdict

def _set_input(self, object, name, newvalue):
def _set_input(self, objekt, name, newvalue):
"""Trait callback function to update a node input
"""
object.traits()[name].node.set_input(name, newvalue)
objekt.traits()[name].node.set_input(name, newvalue)

def _set_node_input(self, node, param, source, sourceinfo):
"""Set inputs of a node given the edge connection"""
2 changes: 1 addition & 1 deletion nipype/pipeline/plugins/multiproc.py
@@ -127,7 +127,7 @@ def __init__(self, plugin_args=None):

# Instantiate different thread pools for non-daemon processes
logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
'non' if non_daemon else '', self.processors, self.memory_gb)
'non' * int(non_daemon), self.processors, self.memory_gb)

NipypePool = NonDaemonPool if non_daemon else Pool
try:
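
The updated log call relies on string repetition rather than a conditional expression: 'non' * int(non_daemon) evaluates to 'non' when non_daemon is True and to '' when it is False. A quick illustration in plain Python, not nipype code:

for non_daemon in (True, False):
    mode = '%sdaemon' % ('non' * int(non_daemon))
    print('MultiProcPlugin starting in "%s" mode' % mode)
# -> MultiProcPlugin starting in "nondaemon" mode
# -> MultiProcPlugin starting in "daemon" mode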