Skip to content

add initial/experimental support for installing extensions in parallel #3667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6b77925
add support for installing extensions in parallel
boegel Apr 21, 2021
c811d9c
Merge branch 'develop' into install_extensions
boegel Oct 13, 2021
5f86294
Merge branch 'develop' into install_extensions
boegel Oct 20, 2021
a1cf735
Merge branch 'develop' into install_extensions
boegel Oct 21, 2021
6053c0c
also update extensions progress bar when installing extensions in par…
boegel Oct 21, 2021
3fe1f68
mark support for installing extensions in parallel as experimental + …
boegel Oct 21, 2021
2e5c6ab
start extensions progress bar a bit earlier, also mention preparatory…
boegel Oct 21, 2021
3748d9c
add and use update_exts_progress_bar method to EasyBlock
boegel Oct 21, 2021
45a2627
fix formatting for extension progress bar when installing extensions …
boegel Oct 21, 2021
78faeab
update extensions progress bar with more detailed info when creating …
boegel Oct 21, 2021
fdc8a1a
only update extensions progress bar in init_ext_instances if there ac…
boegel Oct 22, 2021
a4502f1
Merge branch 'develop' into install_extensions
boegel Oct 22, 2021
137ac32
Merge branch 'develop' into install_extensions
boegel Oct 22, 2021
85273d0
use check_async_cmd in Extension.async_cmd_check
boegel Oct 22, 2021
b2938dd
remove import for unused complete_cmd from framework/extension.py
boegel Oct 22, 2021
805a461
Merge branch 'develop' into install_extensions
boegel Oct 25, 2021
447c1da
fix occasional failure in test_run_cmd_async
boegel Oct 25, 2021
d82ade9
check early for opt-in to using experimental feature when --parallel-…
boegel Oct 25, 2021
42c0bb3
tweak extensions progress bar label to also show 'X/Y done' when inst…
boegel Oct 25, 2021
c5d598f
inject short sleep before checking status of failing asynchronous com…
boegel Oct 26, 2021
73af425
drop 'parallel' argument for install_extensions, to avoid having to o…
boegel Oct 26, 2021
16e02ea
add run_async method to install extension asynchronously
boegel Oct 26, 2021
60c5d15
move printing of progress info on installing extensions in parallel a…
boegel Oct 26, 2021
3e186fb
return True in Extension.async_cmd_check if async_cmd_info is set to …
boegel Oct 26, 2021
0dd9061
add test for installing extensions in parallel
boegel Oct 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 220 additions & 56 deletions easybuild/framework/easyblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -1616,14 +1616,18 @@ def skip_extensions(self):
- use this to detect existing extensions and to remove them from self.ext_instances
- based on initial R version
"""
self.update_exts_progress_bar("skipping installed extensions")

# obtaining untemplated reference value is required here to support legacy string templates like name/version
exts_filter = self.cfg.get_ref('exts_filter')

if not exts_filter or len(exts_filter) == 0:
raise EasyBuildError("Skipping of extensions, but no exts_filter set in easyconfig")

exts_cnt = len(self.ext_instances)

res = []
for ext_inst in self.ext_instances:
for idx, ext_inst in enumerate(self.ext_instances):
cmd, stdin = resolve_exts_filter_template(exts_filter, ext_inst)
(cmdstdouterr, ec) = run_cmd(cmd, log_all=False, log_ok=False, simple=False, inp=stdin, regexp=False)
self.log.info("exts_filter result %s %s", cmdstdouterr, ec)
Expand All @@ -1634,8 +1638,203 @@ def skip_extensions(self):
else:
print_msg("skipping extension %s" % ext_inst.name, silent=self.silent, log=self.log)

self.update_exts_progress_bar("skipping installed extensions (%d/%d checked)" % (idx + 1, exts_cnt))

self.ext_instances = res

def install_extensions(self, install=True, parallel=False):
    """
    Install extensions, either sequentially or in parallel.

    The parallel code path is only taken when the 'parallel_extensions_install' build option is enabled
    *and* the 'parallel' argument is true; in all other cases extensions are installed sequentially.

    :param install: actually install extensions, don't just prepare environment for installing
    :param parallel: install extensions in parallel
    """
    self.log.debug("List of loaded modules: %s", self.modules_tool.list())

    # the build option is consulted first, the 'parallel' argument only if the option is enabled
    go_parallel = build_option('parallel_extensions_install') and parallel

    if not go_parallel:
        self.install_extensions_sequential(install=install)
        return

    self.log.experimental("installing extensions in parallel")
    self.install_extensions_parallel(install=install)

def install_extensions_sequential(self, install=True):
    """
    Install extensions one after the other, in the order of self.ext_instances.

    For every extension the progress bar label is refreshed, the toolchain build environment is prepared
    (unless this is a dry run), and - if requested - the extension is installed via prerun/run/postrun.

    :param install: actually install extensions, don't just prepare environment for installing
    """
    self.log.info("Installing extensions sequentially...")

    total = len(self.ext_instances)

    for pos, ext in enumerate(self.ext_instances, start=1):

        self.log.info("Starting extension %s", ext.name)

        # return to the original working directory first, to avoid running stuff from a dir that no longer exists
        change_dir(self.orig_workdir)

        progress_info = "Installing '%s' extension (%s/%s)" % (ext.name, pos, total)
        self.update_exts_progress_bar(progress_info)

        announcement = "installing extension %s %s (%d/%d)..." % (ext.name, ext.version or '', pos, total)
        print_msg(announcement, silent=self.silent, log=self.log)
        start_time = datetime.now()

        if self.dry_run:
            ext_details = (ext.name, ext.version, ext.__class__.__name__)
            self.dry_run_msg("\n* installing extension %s %s using '%s' easyblock\n" % ext_details)

        self.log.debug("List of loaded modules: %s", self.modules_tool.list())

        # the toolchain build environment is only prepared for real installations;
        # in a dry run the build environment is the same as for the parent
        if not self.dry_run:
            # modules for the toolchain are not reloaded, they are loaded already;
            # the (fake) module for the parent software gets loaded before installing extensions
            ext.toolchain.prepare(onlymod=self.cfg['onlytcmod'], silent=True, loadmod=False,
                                  rpath_filter_dirs=self.rpath_filter_dirs)
        else:
            self.dry_run_msg("defining build environment based on toolchain (options) and dependencies...")

        # real work
        if install:
            try:
                ext.prerun()
                with self.module_generator.start_module_creation():
                    extra_txt = ext.run()
                if extra_txt:
                    self.module_extra_extensions += extra_txt
                ext.postrun()
            finally:
                # report how long the installation took, also when it failed
                if not self.dry_run:
                    elapsed = datetime.now() - start_time
                    if elapsed.total_seconds() >= 1:
                        print_msg("\t... (took %s)", time2str(elapsed), log=self.log, silent=self.silent)
                    elif self.logdebug or build_option('trace'):
                        print_msg("\t... (took < 1 sec)", log=self.log, silent=self.silent)

        # this extension is done: advance the progress bar by one step
        self.update_exts_progress_bar(progress_info, progress_size=1)

def install_extensions_parallel(self, install=True):
"""
Install extensions in parallel.

Extensions are taken from a queue that starts out as a copy of self.ext_instances.
Installations are started in the background via ext.run(asynchronous=True) for as long as
fewer than self.cfg['parallel'] installations are running, and ext.async_cmd_check() is used
to find out which running installations have completed.
Outside of dry run mode, an extension is only started once all names listed in its
required_deps are known to be installed; until then it is put back in the queue.

:param install: actually install extensions, don't just prepare environment for installing
:raises EasyBuildError: if an extension with pending dependencies requires a dependency
                        that is not among the names of the extensions in self.exts_all
"""
self.log.info("Installing extensions in parallel...")

# extensions for which an installation was started that has not been found completed yet
running_exts = []
# NOTE: this empty list is replaced a few lines below, before it is ever read
installed_ext_names = []

all_ext_names = [x['name'] for x in self.exts_all]
self.log.debug("List of names of all extensions: %s", all_ext_names)

# take into account that some extensions may be installed already:
# every name in self.exts_all without an instance in self.ext_instances is treated as installed
to_install_ext_names = [x.name for x in self.ext_instances]
installed_ext_names = [n for n in all_ext_names if n not in to_install_ext_names]

# the total count covers all extensions, including the ones treated as installed already
exts_cnt = len(all_ext_names)
# work on a copy, so self.ext_instances itself is not modified by popping/inserting below
exts_queue = self.ext_instances[:]

def update_exts_progress_bar_helper(running_exts, progress_size):
"""
Helper function to update extensions progress bar.

The label lists the names of the running extensions, followed by an 'X/Y done' counter
based on installed_ext_names and exts_cnt from the enclosing method.

:param running_exts: extensions that are currently being installed
:param progress_size: amount of progress to pass on to update_exts_progress_bar
"""
running_exts_cnt = len(running_exts)
if running_exts_cnt > 1:
progress_info = "Installing %d extensions: " % running_exts_cnt
elif running_exts_cnt == 1:
progress_info = "Installing extension "
else:
progress_info = "Not installing extensions (yet)"

# when nothing is running, the join yields an empty string and only the counter is appended
progress_info += ', '.join(e.name for e in running_exts)
progress_info += " (%d/%d done)" % (len(installed_ext_names), exts_cnt)
self.update_exts_progress_bar(progress_info, progress_size=progress_size)

# keep going until nothing is queued and nothing is running anymore
iter_id = 0
while exts_queue or running_exts:

iter_id += 1

# always go back to original work dir to avoid running stuff from a dir that no longer exists
change_dir(self.orig_workdir)

# check for extension installations that have completed
if running_exts:
self.log.info("Checking for completed extension installations (%d running)...", len(running_exts))
# iterate over a copy, since completed extensions are removed from running_exts in the loop
for ext in running_exts[:]:
# in dry run mode every running extension is considered completed right away
if self.dry_run or ext.async_cmd_check():
self.log.info("Installation of %s completed!", ext.name)
ext.postrun()
running_exts.remove(ext)
installed_ext_names.append(ext.name)
update_exts_progress_bar_helper(running_exts, 1)
else:
self.log.debug("Installation of %s is still running...", ext.name)

# print progress info every now and then
# NOTE: 'iter_id % 1' is always 0, so this condition currently holds every time it is evaluated
# NOTE(review): presumably this runs once per pass of the outer loop rather than only
# when extensions are running - nesting is not visible in this view, confirm
if iter_id % 1 == 0:
msg = "%d out of %d extensions installed (%d queued, %d running: %s)"
installed_cnt, queued_cnt, running_cnt = len(installed_ext_names), len(exts_queue), len(running_exts)
# mention at most 3 running extensions by name, to keep the message short
if running_cnt <= 3:
running_ext_names = ', '.join(x.name for x in running_exts)
else:
running_ext_names = ', '.join(x.name for x in running_exts[:3]) + ", ..."
print_msg(msg % (installed_cnt, exts_cnt, queued_cnt, running_cnt, running_ext_names), log=self.log)

# try to start as many extension installations as we can, taking into account number of available cores,
# but only consider first 100 extensions still in the queue
max_iter = min(100, len(exts_queue))

for _ in range(max_iter):

# stop as soon as the queue is empty or self.cfg['parallel'] installations are running
if not (exts_queue and len(running_exts) < self.cfg['parallel']):
break

# check whether extension at top of the queue is ready to install
ext = exts_queue.pop(0)

# required dependencies that are not known to be installed (yet)
pending_deps = [x for x in ext.required_deps if x not in installed_ext_names]

# in dry run mode nothing is actually started, and pending dependencies are not considered:
# the extension is only reported and marked as running
if self.dry_run:
tup = (ext.name, ext.version, ext.__class__.__name__)
msg = "\n* installing extension %s %s using '%s' easyblock\n" % tup
self.dry_run_msg(msg)
running_exts.append(ext)

# if some of the required dependencies are not installed yet, requeue this extension
elif pending_deps:

# make sure all required dependencies are actually going to be installed,
# to avoid getting stuck in an infinite loop!
missing_deps = [x for x in ext.required_deps if x not in all_ext_names]
if missing_deps:
raise EasyBuildError("Missing required dependencies for %s are not going to be installed: %s",
ext.name, ', '.join(missing_deps))
else:
self.log.info("Required dependencies missing for extension %s (%s), adding it back to queue...",
ext.name, ', '.join(pending_deps))
# purposely adding extension back in the queue at Nth place rather than at the end,
# since we assume that the required dependencies will be installed soon...
# (max_iter was computed before popping; an index past the end of the list appends)
exts_queue.insert(max_iter, ext)

else:
tup = (ext.name, ext.version or '')
print_msg("starting installation of extension %s %s..." % tup, silent=self.silent, log=self.log)

# don't reload modules for toolchain, there is no need since they will be loaded already;
# the (fake) module for the parent software gets loaded before installing extensions
ext.toolchain.prepare(onlymod=self.cfg['onlytcmod'], silent=True, loadmod=False,
rpath_filter_dirs=self.rpath_filter_dirs)
if install:
ext.prerun()
# start the installation in the background, completion is checked for via ext.async_cmd_check()
ext.run(asynchronous=True)
running_exts.append(ext)
self.log.info("Started installation of extension %s in the background...", ext.name)
# refresh the label only (progress size 0), no extension has completed at this point
# NOTE(review): presumably also reached when 'install' is false - nesting is not visible here, confirm
update_exts_progress_bar_helper(running_exts, 0)

#
# MISCELLANEOUS UTILITY FUNCTIONS
#
Expand Down Expand Up @@ -2307,7 +2506,11 @@ def init_ext_instances(self):
error_msg = "Improper default extension class specification, should be string: %s (%s)"
raise EasyBuildError(error_msg, exts_defaultclass, type(exts_defaultclass))

for ext in self.exts:
exts_cnt = len(self.exts)

self.update_exts_progress_bar("creating internal datastructures for extensions")

for idx, ext in enumerate(self.exts):
ext_name = ext['name']
self.log.debug("Creating class instance for extension %s...", ext_name)

Expand Down Expand Up @@ -2368,6 +2571,15 @@ def init_ext_instances(self):
self.log.debug("Installing extension %s with class %s (from %s)", ext_name, class_name, mod_path)

self.ext_instances.append(inst)
pbar_label = "creating internal datastructures for extensions "
pbar_label += "(%d/%d done)" % (idx + 1, exts_cnt)
self.update_exts_progress_bar(pbar_label)

def update_exts_progress_bar(self, info, progress_size=0):
    """
    Update the progress bar for extensions.

    :param info: text to pass as label for the extensions progress bar
    :param progress_size: amount of progress made that should be reported (0 by default)
    """
    update_progress_bar(PROGRESS_BAR_EXTENSIONS, progress_size=progress_size, label=info)

def extensions_step(self, fetch=False, install=True):
"""
Expand All @@ -2390,9 +2602,12 @@ def extensions_step(self, fetch=False, install=True):

fake_mod_data = self.load_fake_module(purge=True, extra_modules=build_dep_mods)

start_progress_bar(PROGRESS_BAR_EXTENSIONS, len(self.cfg['exts_list']))

self.prepare_for_extensions()

if fetch:
self.update_exts_progress_bar("fetching extension sources/patches")
self.exts = self.collect_exts_file_info(fetch_files=True)

self.exts_all = self.exts[:] # retain a copy of all extensions, regardless of filtering/skipping
Expand All @@ -2411,65 +2626,14 @@ def extensions_step(self, fetch=False, install=True):
if self.skip:
self.skip_extensions()

exts_cnt = len(self.ext_instances)

start_progress_bar(PROGRESS_BAR_EXTENSIONS, exts_cnt)

for idx, ext in enumerate(self.ext_instances):

self.log.debug("Starting extension %s" % ext.name)

# always go back to original work dir to avoid running stuff from a dir that no longer exists
change_dir(self.orig_workdir)

progress_label = "Installing '%s' extension" % ext.name
update_progress_bar(PROGRESS_BAR_EXTENSIONS, label=progress_label)

tup = (ext.name, ext.version or '', idx + 1, exts_cnt)
print_msg("installing extension %s %s (%d/%d)..." % tup, silent=self.silent)

start_time = datetime.now()

if self.dry_run:
tup = (ext.name, ext.version, ext.__class__.__name__)
msg = "\n* installing extension %s %s using '%s' easyblock\n" % tup
self.dry_run_msg(msg)

self.log.debug("List of loaded modules: %s", self.modules_tool.list())

# prepare toolchain build environment, but only when not doing a dry run
# since in that case the build environment is the same as for the parent
if self.dry_run:
self.dry_run_msg("defining build environment based on toolchain (options) and dependencies...")
else:
# don't reload modules for toolchain, there is no need since they will be loaded already;
# the (fake) module for the parent software gets loaded before installing extensions
ext.toolchain.prepare(onlymod=self.cfg['onlytcmod'], silent=True, loadmod=False,
rpath_filter_dirs=self.rpath_filter_dirs)

# real work
if install:
try:
ext.prerun()
with self.module_generator.start_module_creation():
txt = ext.run()
if txt:
self.module_extra_extensions += txt
ext.postrun()
finally:
if not self.dry_run:
ext_duration = datetime.now() - start_time
if ext_duration.total_seconds() >= 1:
print_msg("\t... (took %s)", time2str(ext_duration), log=self.log, silent=self.silent)
elif self.logdebug or build_option('trace'):
print_msg("\t... (took < 1 sec)", log=self.log, silent=self.silent)

stop_progress_bar(PROGRESS_BAR_EXTENSIONS, visible=False)
self.install_extensions(install=install)

# cleanup (unload fake module, remove fake module dir)
if fake_mod_data:
self.clean_up_fake_module(fake_mod_data)

stop_progress_bar(PROGRESS_BAR_EXTENSIONS, visible=False)

def package_step(self):
"""Package installed software (e.g., into an RPM), if requested, using selected package tool."""

Expand Down
Loading