basic log streaming #49

Merged: 52 commits, Sep 5, 2024

Commits (showing changes from all commits)
e47c02d
basic log streaming
pmcurtin Jul 22, 2024
ce8480c
change logging structure
pmcurtin Jul 23, 2024
9c5e804
improvements to worker logging
pmcurtin Jul 24, 2024
c794bbd
hostnames instead of agent-i
pmcurtin Jul 24, 2024
6f109ed
types and formatting
pmcurtin Jul 24, 2024
5bb1933
format, again...
pmcurtin Jul 24, 2024
930dfa2
fix ci test
pmcurtin Jul 24, 2024
46e93cf
format
pmcurtin Jul 24, 2024
c466254
refactoring
pmcurtin Jul 31, 2024
b366b32
stdout logging
pmcurtin Jul 31, 2024
1c3f8f9
create log directory, fix CI test
pmcurtin Aug 12, 2024
5c84428
docstring, switch log naming
pmcurtin Aug 13, 2024
621c116
fix docstring
pmcurtin Aug 13, 2024
777917e
docstrings and docs
pmcurtin Aug 16, 2024
83231b6
use autodoc
pmcurtin Aug 16, 2024
d2aec56
final docs fix for logging
pmcurtin Aug 16, 2024
7b6da6a
Merge branch 'main' into logging
pmcurtin Aug 16, 2024
acef4fb
logging cosmetics
apoorvkh Aug 17, 2024
444b461
refactoring
apoorvkh Aug 17, 2024
707c3e4
fix test
pmcurtin Aug 17, 2024
ce1c067
LogSpec -> LogMap
pmcurtin Aug 17, 2024
ee8e6e8
switch to LogMap in CI test
pmcurtin Aug 17, 2024
ce11416
updates to LogMap api
apoorvkh Aug 18, 2024
5e973d9
adding mapping to logmap
apoorvkh Aug 18, 2024
2e4fd71
LogMap implements __and__
apoorvkh Aug 20, 2024
752598f
environment.Auto class
apoorvkh Aug 20, 2024
ab79473
changes to streamed log record handling
apoorvkh Aug 22, 2024
c7532e1
dummy auto builder for logging handlers
apoorvkh Aug 23, 2024
aefdd4d
Update logging_utils.py
apoorvkh Aug 23, 2024
9ed5e68
Merge branch 'main' of github.com:apoorvkh/torchrunx into logging
apoorvkh Aug 23, 2024
bea3276
ignore vscode configs
pmcurtin Aug 23, 2024
2e4b3be
fix CI test. make default handlers
pmcurtin Aug 23, 2024
5ed20b6
fix formatting and typing
pmcurtin Aug 23, 2024
e99c91a
removed "Auto" class, just using Literal["auto"]
apoorvkh Aug 30, 2024
b8b5445
adjustments to logging
apoorvkh Aug 31, 2024
66cea38
add agent stream capture, flushing
pmcurtin Sep 2, 2024
5397fd6
StreamLogger to LoggingStream(StringIO)
apoorvkh Sep 2, 2024
4baa151
use default ruff lint rules
apoorvkh Sep 2, 2024
1c5e86e
extend-select import linting rules
apoorvkh Sep 2, 2024
92fe35c
linting
apoorvkh Sep 2, 2024
5cd1385
added log level for std streams
apoorvkh Sep 2, 2024
a640c2a
ThreadingTCPServer to TCPServer
apoorvkh Sep 2, 2024
805a285
refactoring handler build functions
apoorvkh Sep 3, 2024
61451c3
switch to Filter class
apoorvkh Sep 3, 2024
a13f23a
more refactoring for logging utils
apoorvkh Sep 3, 2024
abc7267
env variables in launcher for controlling logging
apoorvkh Sep 3, 2024
90e032e
using pathlib
apoorvkh Sep 3, 2024
f744391
overriding shutdown() with timeout
apoorvkh Sep 3, 2024
a0f106c
Merge branch 'main' of github.com:apoorvkh/torchrunx into logging
apoorvkh Sep 3, 2024
ad7456b
linting and typing
apoorvkh Sep 3, 2024
1d42ccd
fix CI test
pmcurtin Sep 5, 2024
7df0948
DEBUG -> NOTSET
pmcurtin Sep 5, 2024
8 changes: 3 additions & 5 deletions .gitignore
@@ -1,9 +1,7 @@
torchrunx_logs/
.pixi/
logs/
test_logs/
_build/
out/
output/
.ruff_cache/
.vscode/

# Byte-compiled / optimized / DLL files
__pycache__/
16 changes: 12 additions & 4 deletions docs/source/advanced.rst
@@ -16,21 +16,29 @@ In addition to ``torchrunx.launch``, we provide the ``torchrunx.Launcher`` datac
launcher.run(distributed_function, {})

.. autoclass:: torchrunx.Launcher

.. autofunction:: torchrunx.Launcher.run
:members:
.. .. autofunction:: torchrunx.Launcher.run

Logging
-------

All logs are generated in the folder provided as the ``logs`` argument to :mod:`torchrunx.launch`. Each worker agent generates a log, named based on the current date and time, followed by the agent hostname. Each worker also has a log, named identically to their agent's log file except for the addition of the worker's local rank at the end of the name. Each agent includes the output from local worker 0 in its log. The launcher renders agent 0's log to ``stdout`` in real time.
Logs are generated at the worker and agent level, and are specified to :mod:`torchrunx.launch` via the ``log_spec`` argument. By default, a :mod:`torchrunx.DefaultLogSpec` is instantiated, causing logs at the worker and agent levels to be logged to files under ``'./logs'``, and the rank 0 worker's output streams are streamed to the launcher ``stdout``. Logs are prefixed with a timestamp by default. Agent logs have the format ``{timestamp}-{agent hostname}.log`` and workers have the format ``{timestamp}-{agent hostname}[{worker local rank}].log``.

Custom logging classes can be subclassed from the :mod:`torchrunx.LogSpec` class. Any subclass must have a ``get_map`` method returning a dictionary mapping logger names to lists of :mod:`logging.Handler` objects, in order to be passed to :mod:`torchrunx.launch`. The logger names are of the format ``{agent hostname}`` for agents and ``{agent hostname}[{worker local rank}]`` for workers. The :mod:`torchrunx.DefaultLogSpec` maps all the loggers to :mod:`logging.Filehandler` object pointing to the files mentioned in the previous paragraph. It additionally maps the global rank 0 worker to a :mod:`logging.StreamHandler`, which writes logs the launcher's ``stdout`` stream.

.. autoclass:: torchrunx.LogSpec
:members:

.. autoclass:: torchrunx.DefaultLogSpec
:members:

..
TODO: example log structure

Worker environment
------------------

The :mod:`torchrunx.launch` ``env_vars`` argument allows the user to specify which evnironmental variables should be copied to the agents from the launcher environment. By default, it attempts to copy variables related to Python and important packages/technologies that **torchrunx** uses such as PyTorch, NCCL, CUDA, and more. Strings provided are matched with the names of environmental variables using ``fnmatch`` - standard UNIX filename pattern matching. The variables are inserted into the agent environments, and then copied to workers' environments when they are spawned.
The :mod:`torchrunx.launch` ``env_vars`` argument allows the user to specify which environmental variables should be copied to the agents from the launcher environment. By default, it attempts to copy variables related to Python and important packages/technologies that **torchrunx** uses such as PyTorch, NCCL, CUDA, and more. Strings provided are matched with the names of environmental variables using ``fnmatch`` - standard UNIX filename pattern matching. The variables are inserted into the agent environments, and then copied to workers' environments when they are spawned.

:mod:`torchrunx.launch` also accepts the ``env_file`` argument, which is designed to expose more advanced environmental configuration to the user. When a file is provided as this argument, the launcher will source the file on each node before executing the agent. This allows for custom bash scripts to be provided in the environmental variables, and allows for node-specific environmental variables to be set.

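For reference, a minimal sketch of the custom-logging pattern described in the advanced.rst changes above. This is not part of the diff: the class name SharedFileLogSpec, the hostname "node1", and the worker count are hypothetical, the import path is assumed from the docs text, and later commits in this PR rename LogSpec to LogMap and refactor the handler-building API, so treat the names as illustrative only.

import logging

from torchrunx import LogSpec  # assumed import path, per the docs above


class SharedFileLogSpec(LogSpec):
    """Hypothetical example: route one agent and its two workers to a single shared file."""

    def get_map(self) -> dict[str, list[logging.Handler]]:
        shared_handler = logging.FileHandler("all_output.log")
        # Logger names are "{agent hostname}" for agents and
        # "{agent hostname}[{worker local rank}]" for workers, per the docs above.
        return {
            "node1": [shared_handler],
            "node1[0]": [shared_handler],
            "node1[1]": [shared_handler],
        }


# Per the docs above, launch(..., log_spec=SharedFileLogSpec()) would then route
# that node's agent and worker output to all_output.log.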
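Similarly, a small standard-library-only illustration of the ``fnmatch``-style matching that the ``env_vars`` paragraph above refers to (the patterns and variable names here are made up, not defaults taken from the library):

from fnmatch import fnmatch

# Hypothetical env_vars-style patterns and candidate environment variable names.
patterns = ["PYTHON*", "CUDA*", "NCCL*"]
candidates = ["PYTHONPATH", "CUDA_VISIBLE_DEVICES", "NCCL_DEBUG", "MY_SECRET_TOKEN"]

# A variable is copied into the agent (and then worker) environments
# if its name matches any of the provided patterns.
copied = [name for name in candidates if any(fnmatch(name, p) for p in patterns)]
print(copied)  # ['PYTHONPATH', 'CUDA_VISIBLE_DEVICES', 'NCCL_DEBUG']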
637 changes: 341 additions & 296 deletions pixi.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -41,7 +41,7 @@ include = ["pyproject.toml", "src/**/*.py", "tests/**/*.py"]
line-length = 100
src = ["src", "tests"]
[tool.ruff.lint]
select = ["E", "F", "I"]
extend-select = ["I"]

[tool.pyright]
include = ["src", "tests"]
6 changes: 4 additions & 2 deletions src/torchrunx/__init__.py
@@ -1,4 +1,6 @@
from .environment import auto_hosts, auto_workers, slurm_hosts, slurm_workers
from .launcher import Launcher, launch

__all__ = ["Launcher", "launch", "slurm_hosts", "slurm_workers", "auto_hosts", "auto_workers"]
__all__ = [
"Launcher",
"launch",
]
7 changes: 6 additions & 1 deletion src/torchrunx/__main__.py
@@ -7,6 +7,7 @@
parser = ArgumentParser()
parser.add_argument("--launcher-hostname", type=str)
parser.add_argument("--launcher-port", type=int)
parser.add_argument("--logger-port", type=int)
parser.add_argument("--world-size", type=int)
parser.add_argument("--rank", type=int)
args = parser.parse_args()
@@ -18,4 +19,8 @@
rank=args.rank,
)

main(launcher_agent_group)
main(
launcher_agent_group,
logger_hostname=args.launcher_hostname,
logger_port=args.logger_port,
)
147 changes: 82 additions & 65 deletions src/torchrunx/agent.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import datetime
import logging
import os
import socket
import sys
@@ -14,6 +15,7 @@
from torch.distributed.elastic.multiprocessing import start_processes
from typing_extensions import Self

from .logging_utils import log_records_to_socket, redirect_stdio_to_logger
from .utils import (
AgentPayload,
AgentStatus,
@@ -26,14 +28,16 @@
@dataclass
class WorkerArgs:
function: Callable
master_hostname: str
master_port: int
logger_hostname: str
logger_port: int
main_agent_hostname: str
main_agent_port: int
backend: Literal["mpi", "gloo", "nccl", "ucc", None]
rank: int
local_rank: int
local_world_size: int
world_size: int
log_file: os.PathLike
hostname: str
timeout: int

def to_bytes(self) -> bytes:
@@ -44,114 +48,125 @@ def from_bytes(cls, serialized: bytes) -> Self:
return cloudpickle.loads(serialized)


class WorkerTee(object):
def __init__(self, name: os.PathLike | str, mode: str):
self.file = open(name, mode)
self.stdout = sys.stdout
sys.stdout = self
def entrypoint(serialized_worker_args: bytes):
worker_args = WorkerArgs.from_bytes(serialized_worker_args)

def __enter__(self):
return self
logger = logging.getLogger()

def __exit__(self, exception_type, exception_value, exception_traceback):
self.__del__()
log_records_to_socket(
logger=logger,
hostname=worker_args.hostname,
worker_rank=worker_args.local_rank,
logger_hostname=worker_args.logger_hostname,
logger_port=worker_args.logger_port,
)

def __del__(self):
sys.stdout = self.stdout
self.file.close()
redirect_stdio_to_logger(logger)

def write(self, data):
self.file.write(data)
self.stdout.write(data)
store = dist.TCPStore( # pyright: ignore[reportPrivateImportUsage]
host_name=worker_args.main_agent_hostname,
port=worker_args.main_agent_port,
world_size=worker_args.world_size,
is_master=(worker_args.rank == 0),
)

def flush(self):
self.file.flush()
backend = worker_args.backend
if backend is None:
backend = "nccl" if torch.cuda.is_available() else "gloo"

logger.debug(f"using backend: {backend}")

def entrypoint(serialized_worker_args: bytes):
worker_args = WorkerArgs.from_bytes(serialized_worker_args)
dist.init_process_group(
backend=backend,
world_size=worker_args.world_size,
rank=worker_args.rank,
store=store,
timeout=datetime.timedelta(seconds=worker_args.timeout),
)

os.environ["RANK"] = str(worker_args.rank)
os.environ["LOCAL_RANK"] = str(worker_args.local_rank)
os.environ["LOCAL_WORLD_SIZE"] = str(worker_args.local_world_size)
os.environ["WORLD_SIZE"] = str(worker_args.world_size)
os.environ["MASTER_ADDR"] = worker_args.main_agent_hostname
os.environ["MASTER_PORT"] = str(worker_args.main_agent_port)

logger.debug(f"executing function: {worker_args.function}")

with WorkerTee(worker_args.log_file, "w"):
store = dist.TCPStore( # pyright: ignore[reportPrivateImportUsage]
host_name=worker_args.master_hostname,
port=worker_args.master_port,
world_size=worker_args.world_size,
is_master=(worker_args.rank == 0),
)

backend = worker_args.backend
if backend is None:
backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(
backend=backend,
world_size=worker_args.world_size,
rank=worker_args.rank,
store=store,
timeout=datetime.timedelta(seconds=worker_args.timeout),
)

os.environ["RANK"] = str(worker_args.rank)
os.environ["LOCAL_RANK"] = str(worker_args.local_rank)
os.environ["LOCAL_WORLD_SIZE"] = str(worker_args.local_world_size)
os.environ["WORLD_SIZE"] = str(worker_args.world_size)
os.environ["MASTER_ADDR"] = worker_args.master_hostname
os.environ["MASTER_PORT"] = str(worker_args.master_port)

return worker_args.function()


def main(launcher_agent_group: LauncherAgentGroup):
r = worker_args.function()

# flush streams
sys.stdout.flush()
sys.stderr.flush()

return r


def main(launcher_agent_group: LauncherAgentGroup, logger_hostname: str, logger_port: int):
agent_rank = launcher_agent_group.rank - 1

payload = AgentPayload(
hostname=socket.getfqdn(),
port=get_open_port(),
process_id=os.getpid(),
)
# DefaultLogsSpecs(log_dir=None, tee=Std.ALL, local_ranks_filter={0}),

all_payloads = launcher_agent_group.sync_payloads(payload=payload)
launcher_payload: LauncherPayload = all_payloads[0] # pyright: ignore[reportAssignmentType]
main_agent_payload: AgentPayload = all_payloads[1] # pyright: ignore[reportAssignmentType]

hostname = launcher_payload.hostnames[agent_rank]
worker_world_size = launcher_payload.worker_world_size
worker_global_ranks = launcher_payload.worker_global_ranks[agent_rank]
worker_log_files = launcher_payload.worker_log_files[agent_rank]
num_workers = len(worker_global_ranks)

logger = logging.getLogger()

log_records_to_socket(
logger=logger,
hostname=hostname,
worker_rank=None,
logger_hostname=logger_hostname,
logger_port=logger_port,
)

redirect_stdio_to_logger(logger)

if torch.__version__ >= "2.3":
# DefaultLogsSpecs only exists in torch >= 2.3
from torch.distributed.elastic.multiprocessing import DefaultLogsSpecs

log_arg = DefaultLogsSpecs(log_dir=tempfile.mkdtemp())
log_kwargs = {"logs_specs": DefaultLogsSpecs(log_dir=tempfile.mkdtemp())}
else:
log_arg = tempfile.mkdtemp()
log_kwargs = {"log_dir": tempfile.mkdtemp()}

# spawn workers

ctx = start_processes(
f"{hostname}_",
entrypoint,
{
name=f"{hostname}_",
entrypoint=entrypoint,
args={
i: (
WorkerArgs(
function=launcher_payload.fn,
master_hostname=main_agent_payload.hostname,
master_port=main_agent_payload.port,
logger_hostname=logger_hostname,
logger_port=logger_port,
main_agent_hostname=main_agent_payload.hostname,
main_agent_port=main_agent_payload.port,
backend=launcher_payload.backend,
rank=worker_global_ranks[i],
local_rank=i,
local_world_size=num_workers,
world_size=worker_world_size,
log_file=worker_log_files[i],
hostname=launcher_payload.hostnames[agent_rank],
timeout=launcher_payload.timeout,
).to_bytes(),
)
for i in range(num_workers)
},
{i: {} for i in range(num_workers)},
log_arg, # type: ignore
envs={i: {} for i in range(num_workers)},
**log_kwargs, # pyright: ignore [reportArgumentType]
)
logger.info("starting processes")

try:
status = AgentStatus()
@@ -172,3 +187,5 @@ def main(launcher_agent_group: LauncherAgentGroup):
raise
finally:
ctx.close()
sys.stdout.flush()
sys.stderr.flush()
2 changes: 1 addition & 1 deletion src/torchrunx/environment.py
@@ -40,7 +40,7 @@ def slurm_workers() -> int:
# TODO: is it possible to allocate uneven GPUs across nodes?
return len(os.environ["SLURM_JOB_GPUS"].split(","))
elif "SLURM_GPUS_PER_NODE" in os.environ:
return int(os.environ['SLURM_GPUS_PER_NODE'])
return int(os.environ["SLURM_GPUS_PER_NODE"])
else:
# TODO: should we assume that we plan to do one worker per CPU?
return int(os.environ["SLURM_CPUS_ON_NODE"])