From bda88d3bf0c92aba41e75a6d80ba5d524874e443 Mon Sep 17 00:00:00 2001 From: crivella Date: Tue, 1 Oct 2024 15:18:31 +0200 Subject: [PATCH 1/8] Added timeout to run_shell_cmd --- easybuild/tools/run.py | 83 ++++++++++++++++++++++++++++++++++++++---- test/framework/run.py | 43 ++++++++++++++++++++++ 2 files changed, 118 insertions(+), 8 deletions(-) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index 2e118e0860..461d790390 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -46,6 +46,7 @@ import subprocess import tempfile import time +import threading from collections import namedtuple from datetime import datetime @@ -355,12 +356,60 @@ def _answer_question(stdout, proc, qa_patterns, qa_wait_patterns): return match_found +def _read_pipe(pipe, output): + """Helper function to read from a pipe and store output in a list. + :param pipe: pipe to read from + :param output: list to store output in + """ + out = b'' + for line in iter(pipe.readline, b''): + _log.debug(f"Captured: {line.decode(errors='ignore').rstrip()}") + out += line + output.append(out) + +def read_pipe(pipe, timeout=None): + """Read from a pipe using a separate thread to avoid blocking and implement a timeout. + :param pipe: pipe to read from + :param timeout: timeout in seconds (default: None = no timeout) + + :return: data read from pipe + + :raises TimeoutError: when reading from pipe takes longer than specified timeout + """ + + output = [] + t = threading.Thread(target=_read_pipe, args=(pipe, output)) + t.start() + t.join(timeout) + if t.is_alive(): + raise TimeoutError() + return output[0] + +def terminate_process(proc, timeout=20): + """ + Terminate specified process (subprocess.Popen instance). + Attempt to terminate the process using proc.terminate(), and if that fails, use proc.kill(). 
+ + :param proc: process to terminate + :param timeout: timeout in seconds to wait for process to terminate + """ + proc.terminate() + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + _log.warning(f"Process did not terminate after {timeout} seconds, sending SIGKILL") + proc.kill() + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + raise EasyBuildError(f"Process `{proc.args}` did not terminate after {timeout} seconds, giving up") + @run_shell_cmd_cache def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=None, hidden=False, in_dry_run=False, verbose_dry_run=False, work_dir=None, use_bash=True, output_file=True, stream_output=None, asynchronous=False, task_id=None, with_hooks=True, - qa_patterns=None, qa_wait_patterns=None, qa_timeout=100): + timeout=None, qa_patterns=None, qa_wait_patterns=None, qa_timeout=100): """ Run specified (interactive) shell command, and capture output + exit code. @@ -378,6 +427,7 @@ def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=N :param asynchronous: indicate that command is being run asynchronously :param task_id: task ID for specified shell command (included in return value) :param with_hooks: trigger pre/post run_shell_cmd hooks (if defined) + :param timeout: timeout in seconds for command execution :param qa_patterns: list of 2-tuples with patterns for questions + corresponding answers :param qa_wait_patterns: list of strings with patterns for non-questions :param qa_timeout: amount of seconds to wait until more output is produced when there is no matching question @@ -524,16 +574,27 @@ def to_cmd_str(cmd): time_no_match = 0 prev_stdout = '' + # collect output piece-wise, while checking for questions to answer (if qa_patterns is provided) + start = time.time() while exit_code is None: - # collect output line by line, while checking for questions to answer (if qa_patterns is provided) - for line in iter(proc.stdout.readline, b''): - 
_log.debug(f"Captured stdout: {line.decode(errors='ignore').rstrip()}") - stdout += line + if timeout and time.time() - start > timeout: + error_msg = f"Timeout during `{cmd}` after {timeout} seconds!" + _log.warning(error_msg) + terminate_process(proc) + raise EasyBuildError(error_msg) + try: + t = timeout - (time.time() - start) if timeout else None + stdout += read_pipe(proc.stdout, timeout=t) or b'' + except TimeoutError: + pass # note: we assume that there won't be any questions in stderr output if split_stderr: - for line in iter(proc.stderr.readline, b''): - stderr += line + try: + t = timeout - (time.time() - start) if timeout else None + stderr += read_pipe(proc.stderr, timeout=t) or b'' + except TimeoutError: + pass if qa_patterns: # only check for question patterns if additional output is available @@ -565,7 +626,13 @@ def to_cmd_str(cmd): if split_stderr: stderr += proc.stderr.read() or b'' else: - (stdout, stderr) = proc.communicate(input=stdin) + try: + (stdout, stderr) = proc.communicate(input=stdin, timeout=timeout) + except subprocess.TimeoutExpired as err: + error_msg = f"Timeout during `{cmd}` after {timeout} seconds" + _log.warning(error_msg) + terminate_process(proc) + raise EasyBuildError(error_msg) # return output as a regular string rather than a byte sequence (and non-UTF-8 characters get stripped out) # getpreferredencoding normally gives 'utf-8' but can be ASCII (ANSI_X3.4-1968) diff --git a/test/framework/run.py b/test/framework/run.py index 1c5cf3a422..524383787a 100644 --- a/test/framework/run.py +++ b/test/framework/run.py @@ -1726,6 +1726,49 @@ def test_run_shell_cmd_eof_stdin(self): self.assertEqual(res.exit_code, 0, "Non-streaming output: Command timed out") self.assertEqual(res.output, inp) + def test_run_shell_cmd_timeout(self): + """Test use of run_shell_cmd with a timeout.""" + cmd = 'sleep 1; echo hello' + # Failure on process timeout + with self.mocked_stdout_stderr(): + self.assertErrorRegex( + EasyBuildError, "Timeout 
during `.*` after .* seconds", + run_shell_cmd, cmd, timeout=.5 + ) + + # Success + with self.mocked_stdout_stderr(): + res = run_shell_cmd(cmd, timeout=3) + self.assertEqual(res.exit_code, 0) + self.assertEqual(res.output, "hello\n") + + def test_run_shell_cmd_timeout_stream(self): + """Test use of run_shell_cmd with a timeout.""" + data = '0'*128 + # Failure on process timeout + cmd = f'while true; do echo {data} && sleep 0.1; done' + with self.mocked_stdout_stderr(): + self.assertErrorRegex( + EasyBuildError, "Timeout during `.*` after .* seconds", + run_shell_cmd, cmd, timeout=.5, stream_output=True + ) + + # Failure on stdout read timeout + cmd = 'cat -' + with self.mocked_stdout_stderr(): + self.assertErrorRegex( + EasyBuildError, "Timeout during `.*` after .* seconds", + run_shell_cmd, cmd, timeout=.5, stream_output=True + ) + + # Success + cmd = 'sleep .5 && echo hello' + with self.mocked_stdout_stderr(): + res = run_shell_cmd(cmd, timeout=1.5, stream_output=True) + + self.assertEqual(res.exit_code, 0) + self.assertEqual(res.output, "hello\n") + def test_run_cmd_async(self): """Test asynchronously running of a shell command via run_cmd + complete_cmd.""" From 1d47a017b6e20e4c9448cca477844c49e24a4876 Mon Sep 17 00:00:00 2001 From: crivella Date: Tue, 1 Oct 2024 15:35:36 +0200 Subject: [PATCH 2/8] linting --- easybuild/tools/run.py | 5 ++++- test/framework/run.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index 461d790390..6c6ba0751a 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -356,6 +356,7 @@ def _answer_question(stdout, proc, qa_patterns, qa_wait_patterns): return match_found + def _read_pipe(pipe, output): """Helper function to read from a pipe and store output in a list. 
:param pipe: pipe to read from @@ -367,6 +368,7 @@ def _read_pipe(pipe, output): out += line output.append(out) + def read_pipe(pipe, timeout=None): """Read from a pipe using a separate thread to avoid blocking and implement a timeout. :param pipe: pipe to read from @@ -385,6 +387,7 @@ def read_pipe(pipe, timeout=None): raise TimeoutError() return output[0] + def terminate_process(proc, timeout=20): """ Terminate specified process (subprocess.Popen instance). @@ -628,7 +631,7 @@ def to_cmd_str(cmd): else: try: (stdout, stderr) = proc.communicate(input=stdin, timeout=timeout) - except subprocess.TimeoutExpired as err: + except subprocess.TimeoutExpired: error_msg = f"Timeout during `{cmd}` after {timeout} seconds" _log.warning(error_msg) terminate_process(proc) diff --git a/test/framework/run.py b/test/framework/run.py index 524383787a..4d65965a7e 100644 --- a/test/framework/run.py +++ b/test/framework/run.py @@ -1760,7 +1760,7 @@ def test_run_shell_cmd_timeout_stream(self): EasyBuildError, "Timeout during `.*` after .* seconds", run_shell_cmd, cmd, timeout=.5, stream_output=True ) - + # Success cmd = 'sleep .5 && echo hello' with self.mocked_stdout_stderr(): From 979c51cb3761bd9d59647de64da2c7fd8974a139 Mon Sep 17 00:00:00 2001 From: crivella Date: Wed, 2 Oct 2024 13:15:27 +0200 Subject: [PATCH 3/8] Should not run `kill` if `terminate` worked --- easybuild/tools/run.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index 6c6ba0751a..2149af5987 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -401,11 +401,12 @@ def terminate_process(proc, timeout=20): proc.wait(timeout=timeout) except subprocess.TimeoutExpired: _log.warning(f"Process did not terminate after {timeout} seconds, sending SIGKILL") - proc.kill() - try: - proc.wait(timeout=timeout) - except subprocess.TimeoutExpired: - raise EasyBuildError(f"Process `{proc.args}` did not terminate after {timeout} seconds, 
giving up") + + proc.kill() + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + raise EasyBuildError(f"Process `{proc.args}` did not terminate after {timeout} seconds, giving up") @run_shell_cmd_cache From a0c104e865a2dea9ad02cb8b6f0faa5f1f171cce Mon Sep 17 00:00:00 2001 From: crivella Date: Wed, 2 Oct 2024 13:28:38 +0200 Subject: [PATCH 4/8] Avoid blocking test if running without timeout implementation --- test/framework/run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/framework/run.py b/test/framework/run.py index 4d65965a7e..eaae391e58 100644 --- a/test/framework/run.py +++ b/test/framework/run.py @@ -1746,7 +1746,7 @@ def test_run_shell_cmd_timeout_stream(self): """Test use of run_shell_cmd with a timeout.""" data = '0'*128 # Failure on process timeout - cmd = f'while true; do echo {data} && sleep 0.1; done' + cmd = f'for i in {{1..20}}; do echo {data} && sleep 0.1; done' with self.mocked_stdout_stderr(): self.assertErrorRegex( EasyBuildError, "Timeout during `.*` after .* seconds", @@ -1754,7 +1754,7 @@ def test_run_shell_cmd_timeout_stream(self): ) # Failure on stdout read timeout - cmd = 'cat -' + cmd = 'timeout 1 cat -' with self.mocked_stdout_stderr(): self.assertErrorRegex( EasyBuildError, "Timeout during `.*` after .* seconds", From 40a62073686266adb3976ba98e84b2fb3afeb8d1 Mon Sep 17 00:00:00 2001 From: crivella Date: Tue, 4 Mar 2025 16:46:32 +0100 Subject: [PATCH 5/8] Default return in read_pipe --- easybuild/tools/run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index 2149af5987..482527a94e 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -588,7 +588,7 @@ def to_cmd_str(cmd): raise EasyBuildError(error_msg) try: t = timeout - (time.time() - start) if timeout else None - stdout += read_pipe(proc.stdout, timeout=t) or b'' + stdout += read_pipe(proc.stdout, timeout=t) except TimeoutError: pass @@ -596,7 
+596,7 @@ def to_cmd_str(cmd): if split_stderr: try: t = timeout - (time.time() - start) if timeout else None - stderr += read_pipe(proc.stderr, timeout=t) or b'' + stderr += read_pipe(proc.stderr, timeout=t) except TimeoutError: pass From 7d71efb66760f3de1aa22a9385fef55c1386941e Mon Sep 17 00:00:00 2001 From: crivella Date: Tue, 4 Mar 2025 16:48:34 +0100 Subject: [PATCH 6/8] The blocking behavior of `cat -` was resolved by a5c232a --- test/framework/run.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/test/framework/run.py b/test/framework/run.py index eaae391e58..1198d45a97 100644 --- a/test/framework/run.py +++ b/test/framework/run.py @@ -1753,14 +1753,6 @@ def test_run_shell_cmd_timeout_stream(self): run_shell_cmd, cmd, timeout=.5, stream_output=True ) - # Failure on stdout read timeout - cmd = 'timeout 1 cat -' - with self.mocked_stdout_stderr(): - self.assertErrorRegex( - EasyBuildError, "Timeout during `.*` after .* seconds", - run_shell_cmd, cmd, timeout=.5, stream_output=True - ) - # Success cmd = 'sleep .5 && echo hello' with self.mocked_stdout_stderr(): From ffa9a697b798bf7f1415f651f0d0217d22e86ae5 Mon Sep 17 00:00:00 2001 From: crivella Date: Thu, 22 May 2025 13:44:46 +0200 Subject: [PATCH 7/8] Improved code readability by moving checks inside `read_pipe` now renamed to `read_process_pipe` --- easybuild/tools/run.py | 55 ++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index 482527a94e..d6f57d992c 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -369,22 +369,40 @@ def _read_pipe(pipe, output): output.append(out) -def read_pipe(pipe, timeout=None): - """Read from a pipe using a separate thread to avoid blocking and implement a timeout. 
- :param pipe: pipe to read from +def read_process_pipe(proc, pipe_name, start=None, timeout=None): + """Read from a pipe from a process using a separate thread to avoid blocking and implement a timeout. + :param proc: process to read from + :param pipe_name: name of the pipe to read from (stdout or stderr) + :param start: time when the process was started (used to calculate timeout) :param timeout: timeout in seconds (default: None = no timeout) :return: data read from pipe - :raises TimeoutError: when reading from pipe takes longer than specified timeout + :raises EasyBuildError: when reading from pipe takes longer than specified timeout """ + pipe = getattr(proc, pipe_name, None) + if pipe is None: + raise EasyBuildError(f"Pipe '{pipe_name}' not found in process '{proc}'. This is probably a bug.") + + error_msg = "Unexpected timeout error during read_process_pipe" + current_timeout = None + if start is not None and timeout is not None: + current_timeout = timeout - (time.time() - start) + error_msg = f"Timeout during `{proc.args}` after {timeout} seconds" + if current_timeout <= 0: + _log.warning(error_msg) + try: + terminate_process(proc) + except subprocess.TimeoutExpired as exc: + _log.warning(f"Failed to terminate process '{proc.args}': {exc}") + raise EasyBuildError(error_msg) output = [] t = threading.Thread(target=_read_pipe, args=(pipe, output)) t.start() - t.join(timeout) + t.join(current_timeout) if t.is_alive(): - raise TimeoutError() + raise EasyBuildError(error_msg) return output[0] @@ -395,6 +413,8 @@ def terminate_process(proc, timeout=20): :param proc: process to terminate :param timeout: timeout in seconds to wait for process to terminate + + :raises subprocess.TimeoutExpired: if process does not terminate within specified timeout """ proc.terminate() try: @@ -403,11 +423,7 @@ def terminate_process(proc, timeout=20): _log.warning(f"Process did not terminate after {timeout} seconds, sending SIGKILL") proc.kill() - try: - 
proc.wait(timeout=timeout) - except subprocess.TimeoutExpired: - raise EasyBuildError(f"Process `{proc.args}` did not terminate after {timeout} seconds, giving up") - + proc.wait(timeout=timeout) @run_shell_cmd_cache def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=None, @@ -581,24 +597,11 @@ def to_cmd_str(cmd): # collect output piece-wise, while checking for questions to answer (if qa_patterns is provided) start = time.time() while exit_code is None: - if timeout and time.time() - start > timeout: - error_msg = f"Timeout during `{cmd}` after {timeout} seconds!" - _log.warning(error_msg) - terminate_process(proc) - raise EasyBuildError(error_msg) - try: - t = timeout - (time.time() - start) if timeout else None - stdout += read_pipe(proc.stdout, timeout=t) - except TimeoutError: - pass + stdout += read_process_pipe(proc, 'stdout', start=start, timeout=timeout) # note: we assume that there won't be any questions in stderr output if split_stderr: - try: - t = timeout - (time.time() - start) if timeout else None - stderr += read_pipe(proc.stderr, timeout=t) - except TimeoutError: - pass + stderr += read_process_pipe(proc, 'stderr', start=start, timeout=timeout) if qa_patterns: # only check for question patterns if additional output is available From 87e78ef23860dab022aaffb1e897605f24aba782 Mon Sep 17 00:00:00 2001 From: crivella Date: Thu, 22 May 2025 13:46:42 +0200 Subject: [PATCH 8/8] lint --- easybuild/tools/run.py | 1 + 1 file changed, 1 insertion(+) diff --git a/easybuild/tools/run.py b/easybuild/tools/run.py index d6f57d992c..cd5b6e51df 100644 --- a/easybuild/tools/run.py +++ b/easybuild/tools/run.py @@ -425,6 +425,7 @@ def terminate_process(proc, timeout=20): proc.kill() proc.wait(timeout=timeout) + @run_shell_cmd_cache def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=None, hidden=False, in_dry_run=False, verbose_dry_run=False, work_dir=None, use_bash=True,