Skip to content

Add option to execute battles on specific cores #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions algobattle/battle.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class FightHandler:
_solver: Solver
_battle: "Battle"
_ui: FightUiProxy
_set_cpus: str | None = None

def _saved(self, fight: Fight) -> Fight:
self._battle.fight_results.append(fight)
Expand Down Expand Up @@ -139,6 +140,7 @@ async def run(
cpus=cpus_generator,
battle_input=generator_battle_input,
battle_output=generator_battle_output,
set_cpus=self._set_cpus,
ui=ui.generator,
)
ui.update("generator", gen_result.info)
Expand All @@ -153,6 +155,7 @@ async def run(
cpus=cpus_solver,
battle_input=solver_battle_input,
battle_output=solver_battle_output,
set_cpus=self._set_cpus,
ui=ui.solver,
)
ui.update("solver", sol_result.info)
Expand Down
23 changes: 19 additions & 4 deletions algobattle/docker_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class DockerConfig(BaseModel):

build_timeout: float | None = None
safe_build: bool = False
set_cpus: str | list[str] | None = None
generator: RunParameters = RunParameters()
solver: RunParameters = RunParameters()
advanced_run_params: "AdvancedRunArgs | None" = None
Expand Down Expand Up @@ -199,6 +200,7 @@ async def run(
timeout: float | None = None,
memory: int | None = None,
cpus: int = 1,
set_cpus: str | None = None,
ui: ProgramUiProxy | None = None,
) -> float:
"""Runs a docker image.
Expand All @@ -209,6 +211,8 @@ async def run(
timeout: Timeout in seconds.
memory: Memory limit in MB.
cpus: Number of physical cpus the container can use.
set_cpus: Which cpus to execute the container on. Either a comma separated list or a hyphen-separated range.
A value of `None` means the container can use any core (but still only `cpus` many of them).
ui: Interface to update the ui with new data about the executing program.

Raises:
Expand All @@ -222,8 +226,8 @@ async def run(
"""
name = f"algobattle_{uuid1().hex[:8]}"
if memory is not None:
memory = int(memory * 1000000)
cpus = int(cpus * 1000000000)
memory = memory * 1_000_000
cpus = cpus * 1_000_000_000

mounts = []
if input_dir is not None:
Expand All @@ -243,6 +247,7 @@ async def run(
nano_cpus=cpus,
detach=True,
mounts=mounts,
cpuset_cpus=set_cpus,
**self.run_kwargs,
),
)
Expand Down Expand Up @@ -410,6 +415,7 @@ async def _run(
cpus: int = ...,
battle_input: Encodable | None = None,
battle_output: type[Encodable] | None = None,
set_cpus: str | None = None,
ui: ProgramUiProxy | None = None,
) -> GeneratorResult | SolverResult:
"""Execute the program, processing input and output data."""
Expand Down Expand Up @@ -450,7 +456,9 @@ async def _run(
)

try:
runtime = await self.image.run(input, output, timeout=timeout, memory=space, cpus=cpus, ui=ui)
runtime = await self.image.run(
input, output, timeout=timeout, memory=space, cpus=cpus, ui=ui, set_cpus=set_cpus
)
except ExecutionError as e:
return result_class(
ProgramRunInfo(
Expand Down Expand Up @@ -566,6 +574,7 @@ async def run(
cpus: int = ...,
battle_input: Encodable | None = None,
battle_output: type[Encodable] | None = None,
set_cpus: str | None = None,
ui: ProgramUiProxy | None = None,
) -> GeneratorResult:
"""Executes the generator and parses its output into a problem instance.
Expand All @@ -577,6 +586,8 @@ async def run(
cpus: Number of physical cpus the generator can use.
battle_input: Additional data that will be given to the generator.
battle_output: Class that will be used to parse additional data the generator outputs.
set_cpus: Which cpus to execute the container on. Either a comma separated list or a hyphen-separated range.
A value of `None` means the container can use any core (but still only `cpus` many of them).
ui: Interface the program execution uses to update the ui.

Returns:
Expand All @@ -592,6 +603,7 @@ async def run(
cpus=cpus,
battle_input=battle_input,
battle_output=battle_output,
set_cpus=set_cpus,
ui=ui,
),
)
Expand Down Expand Up @@ -632,6 +644,7 @@ async def run(
cpus: int = ...,
battle_input: Encodable | None = None,
battle_output: type[Encodable] | None = None,
set_cpus: str | None = None,
ui: ProgramUiProxy | None = None,
) -> SolverResult:
"""Executes the solver on the given problem instance and parses its output into a problem solution.
Expand All @@ -643,6 +656,8 @@ async def run(
cpus: Number of physical cpus the solver can use.
battle_input: Additional data that will be given to the solver.
battle_output: Class that will be used to parse additional data the solver outputs.
set_cpus: Which cpus to execute the container on. Either a comma separated list or a hyphen-separated range.
A value of `None` means the container can use any core (but still only `cpus` many of them).
ui: Interface the program execution uses to update the ui.

Returns:
Expand All @@ -658,6 +673,7 @@ async def run(
cpus=cpus,
battle_input=battle_input,
battle_output=battle_output,
set_cpus=set_cpus,
ui=ui,
),
)
Expand Down Expand Up @@ -710,7 +726,6 @@ class _UlimitArgs(TypedDict):
cpu_rt_period: int | None = None
cpu_rt_runtime: int | None = None
cpu_shares: int | None = None
cpuset_cpus: str | None = None
cpuset_mems: str | None = None
device_cgroup_rules: list[str] | None = None
device_read_bps: list[_DeviceRate] | None = None
Expand Down
28 changes: 23 additions & 5 deletions algobattle/match.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from itertools import combinations
from pathlib import Path
import tomllib
from typing import Mapping, Self, overload
from typing import Mapping, Self, cast, overload

from pydantic import validator, Field
from anyio import create_task_group, CapacityLimiter, TASK_STATUS_IGNORED
Expand Down Expand Up @@ -49,6 +49,14 @@ def val_battle_configs(cls, vals):
out[name] = battle_cls.BattleConfig.parse_obj(data)
return out

@validator("docker")
def val_set_cpus(cls, v: DockerConfig, values) -> DockerConfig:
"""Validates that each battle that is being executed is assigned some cpu cores."""
if isinstance(v.set_cpus, list) and values["parallel_battles"] > len(v.set_cpus):
raise ValueError("Number of parallel battles exceeds the number of set_cpu specifier strings.")
else:
return v

@classmethod
def from_file(cls, file: Path) -> Self:
"""Parses a config object from a toml file."""
Expand All @@ -75,16 +83,20 @@ async def _run_battle(
matchup: Matchup,
config: Battle.BattleConfig,
problem: type[Problem],
cpus: list[str | None],
ui: "Ui",
limiter: CapacityLimiter,
*,
task_status: TaskStatus = TASK_STATUS_IGNORED,
) -> None:
async with limiter:
set_cpus = cpus.pop()
ui.start_battle(matchup)
task_status.started()
battle_ui = ui.get_battle_observer(matchup)
handler = FightHandler(matchup.generator.generator, matchup.solver.solver, battle, battle_ui.fight_ui)
handler = FightHandler(
matchup.generator.generator, matchup.solver.solver, battle, battle_ui.fight_ui, set_cpus
)
try:
await battle.run_battle(
handler,
Expand All @@ -94,8 +106,8 @@ async def _run_battle(
)
except Exception as e:
battle.run_exception = str_with_traceback(e)
finally:
ui.battle_completed(matchup)
cpus.append(set_cpus)
ui.battle_completed(matchup)

@classmethod
async def run(
Expand All @@ -122,6 +134,7 @@ async def run(
Image.run_kwargs = config.docker.advanced_run_params.to_docker_args()
if config.docker.advanced_build_params is not None:
Image.run_kwargs = config.docker.advanced_build_params.to_docker_args()

with TeamHandler.build(config.teams, problem, config.docker) as teams:
result = cls(
active_teams=[t.name for t in teams.active],
Expand All @@ -132,11 +145,16 @@ async def run(
battle_config = config.battle[config.battle_type]
limiter = CapacityLimiter(config.parallel_battles)
current_default_thread_limiter().total_tokens = config.parallel_battles
set_cpus = config.docker.set_cpus
if isinstance(set_cpus, list):
match_cpus = cast(list[str | None], set_cpus[: config.parallel_battles])
else:
match_cpus = [set_cpus] * config.parallel_battles
async with create_task_group() as tg:
for matchup in teams.matchups:
battle = battle_cls()
result.results[matchup.generator.name][matchup.solver.name] = battle
await tg.start(result._run_battle, battle, matchup, battle_config, problem, ui, limiter)
await tg.start(result._run_battle, battle, matchup, battle_config, problem, match_cpus, ui, limiter)
return result

@overload
Expand Down