Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions renpybuild/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
import shutil
from pathlib import Path
import subprocess
import shutil

import jinja2

import renpybuild.run

from typing import Any


Expand Down Expand Up @@ -226,7 +223,9 @@ def set_names(self, kind : str, task : str, name : str):
else:
self.var("dlpa", "{{distlib}}/py{{ python }}-{{ platform }}-{{ arch }}")

renpybuild.run.build_environment(self)
from .run import build_environment

build_environment(self)

def expand(self, s : str, **kwargs) -> str:
"""
Expand Down Expand Up @@ -319,15 +318,20 @@ def run(self, command : str, verbose : bool=False, quiet : bool=False, **kwargs)
"""

command = self.expand(command, **kwargs)
renpybuild.run.run(command, self, verbose, quiet)

from .run import run

run(command, self, verbose, quiet)

def run_group(self):
"""
Creates a run_group. This is a context manager with a run method,
that allows multiple commands to be run in parallel.
"""

return renpybuild.run.RunGroup(self)
from .run import RunGroup

return RunGroup(self)

def clean(self, d : str="{{build}}"):
"""
Expand Down
156 changes: 109 additions & 47 deletions renpybuild/run.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import os
import re
import sys
import shlex
import subprocess
import sys
import sysconfig
import threading

import jinja2
from concurrent.futures import Future, ThreadPoolExecutor, as_completed

from .context import Context

# This caches the results of emsdk_environment.
emsdk_cache : dict[str, str] = { }
Expand Down Expand Up @@ -441,50 +442,34 @@ def run(command, context, verbose=False, quiet=False):
traceback.print_stack()
sys.exit(1)

class RunCommand(threading.Thread):

def __init__(self, command, context):
super().__init__()

command = context.expand(command)
self.command = shlex.split(command)

self.cwd = context.cwd
self.environ = context.environ.copy()

self.start()

class CommandResult:
"""Stores the result of a single command execution."""

def run(self):
result = subprocess.run(self.command, cwd=self.cwd, env=self.environ, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8")
self.output = result.stdout
self.code = result.returncode

def wait(self):
self.join()
def __init__(self, command_str: str, future: Future[tuple[str, int]]):
self.command_str = command_str
self.future = future
self.output: str = ""
self.code: int = 0
self.done: bool = False

def report(self):
print ("-" * 78)

for i in self.command:
if " " in i:
print(repr(i), end=" ")
else:
print(i, end=" ")

print()
print("-" * 78)
print(self.command_str)
print()
print(self.output)

if self.code != 0:
print()
print(f"Process failed with {self.code}.")

class RunGroup(object):

def __init__(self, context):
class RunGroup:
def __init__(self, context: Context, wait_all: bool = True):
self.executor = ThreadPoolExecutor()
self.context = context
self.tasks = [ ]
self.futures: list[CommandResult] = []
self.wait_all = wait_all

def __enter__(self):
return self
Expand All @@ -493,22 +478,99 @@ def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
return

for i in self.tasks:
i.wait()
# If there are no tasks to do, exit early
if not next(not f.done for f in self.futures):
return

good = [ i for i in self.tasks if i.code == 0 ]
bad = [ i for i in self.tasks if i.code != 0 ]
futures = [f.future for f in self.futures]
total = len(futures)
completed = 0
failed = 0

for i in good:
i.report()
stderr = sys.stderr
if stderr is None:
stderr = open(os.devnull, "w")

for i in bad:
i.report()
spinner_chars = "|/-\\"
spinner_i = 0
last_write_len = 0
interrupted = False

while futures:
try:
future = next(as_completed(futures, 0.1))
cmd_result = next(f for f in self.futures if f.future == future)
futures.remove(future)

# Clean last spinner output
print(f"\r{' ' * last_write_len}\r", end="", file=stderr)

cmd_result.output, cmd_result.code = future.result()
cmd_result.done = True
cmd_result.report()

if cmd_result.code == 0:
completed += 1
else:
failed += 1

except TimeoutError:
pass

except KeyboardInterrupt:
interrupted = True

# Update spinner output after new report or timeout
spinner_i = (spinner_i + 1) % len(spinner_chars)
failed_str = f" ({failed} failed)" if failed else ""
out_text = f"Run group working... {spinner_chars[spinner_i]} {completed}/{total}{failed_str}"
print(f"\r{out_text}", end="", file=stderr)
stderr.flush()
last_write_len = len(out_text)

if not self.wait_all and failed > 0:
break

if interrupted:
break

if interrupted:
self.executor.shutdown(wait=False, cancel_futures=True)
print("\nRun group interrupted.")
raise KeyboardInterrupt

# Clear the spinner line before exiting
print(f"\r{' ' * last_write_len}\r", end="", file=stderr)
stderr.flush()

if failed > 0:
if self.wait_all:
print(f"\n{failed} tasks failed.")

for result in (r for r in self.futures if r.code):
result.report()

if bad:
print()
print("{} tasks failed.".format(len(bad)))
sys.exit(1)

def run(self, command):
self.tasks.append(RunCommand(command, self.context))
def _execute_command(self, command: list[str], cwd: str, env: dict):
process = subprocess.run(
command,
cwd=cwd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="utf-8",
check=False,
)
return process.stdout, process.returncode

def run(self, command: str):
command = self.context.expand(command)
cmd_env = self.context.environ.copy()
future = self.executor.submit(
self._execute_command,
shlex.split(command),
str(self.context.cwd),
cmd_env,
)
self.futures.append(CommandResult(command, future))