Skip to content

[ADD] Forkserver as default multiprocessing strategy #223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion autoPyTorch/api/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
setup_logger,
start_log_server,
)
from autoPyTorch.utils.parallel import preload_modules
from autoPyTorch.utils.pipeline import get_configuration_space, get_dataset_requirements
from autoPyTorch.utils.single_thread_client import SingleThreadedClient
from autoPyTorch.utils.stopwatch import StopWatch


Expand Down Expand Up @@ -190,7 +192,16 @@ def __init__(

self.stop_logging_server = None # type: Optional[multiprocessing.synchronize.Event]

# Single core, local runs should use fork
# to prevent the __main__ requirements in
# examples. Nevertheless, multi-process runs
# have spawn as requirement to reduce the
# possibility of a deadlock
self._dask_client = None
self._multiprocessing_context = 'forkserver'
if self.n_jobs == 1:
self._multiprocessing_context = 'fork'
self._dask_client = SingleThreadedClient()

self.search_space_updates = search_space_updates
if search_space_updates is not None:
Expand Down Expand Up @@ -300,7 +311,8 @@ def _get_logger(self, name: str) -> PicklableClientLogger:
# under the above logging configuration setting
# We need to specify the logger_name so that received records
# are treated under the logger_name ROOT logger setting
context = multiprocessing.get_context('spawn')
context = multiprocessing.get_context(self._multiprocessing_context)
preload_modules(context)
self.stop_logging_server = context.Event()
port = context.Value('l') # be safe by using a long
port.value = -1
Expand Down Expand Up @@ -505,6 +517,7 @@ def _do_dummy_prediction(self) -> None:
stats = Stats(scenario_mock)
stats.start_timing()
ta = ExecuteTaFuncWithQueue(
pynisher_context=self._multiprocessing_context,
backend=self._backend,
seed=self.seed,
metric=self._metric,
Expand Down Expand Up @@ -599,6 +612,7 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs:
stats = Stats(scenario_mock)
stats.start_timing()
ta = ExecuteTaFuncWithQueue(
pynisher_context=self._multiprocessing_context,
backend=self._backend,
seed=self.seed,
metric=self._metric,
Expand Down Expand Up @@ -929,6 +943,7 @@ def _search(
random_state=self.seed,
precision=precision,
logger_port=self._logger_port,
pynisher_context=self._multiprocessing_context,
)
self._stopwatch.stop_task(ensemble_task_name)

Expand Down Expand Up @@ -969,6 +984,7 @@ def _search(
start_num_run=self._backend.get_next_num_run(peek=True),
search_space_updates=self.search_space_updates,
portfolio_selection=portfolio_selection,
pynisher_context=self._multiprocessing_context,
)
try:
run_history, self.trajectory, budget_type = \
Expand Down Expand Up @@ -1299,5 +1315,6 @@ def _print_debug_info_to_log(self) -> None:
self._logger.debug(' System: %s', platform.system())
self._logger.debug(' Machine: %s', platform.machine())
self._logger.debug(' Platform: %s', platform.platform())
self._logger.debug(' multiprocessing_context: %s', str(self._multiprocessing_context))
for key, value in vars(self).items():
self._logger.debug(f"\t{key}->{value}")
11 changes: 8 additions & 3 deletions autoPyTorch/ensemble/ensemble_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from autoPyTorch.pipeline.components.training.metrics.base import autoPyTorchMetric
from autoPyTorch.pipeline.components.training.metrics.utils import calculate_loss, calculate_score
from autoPyTorch.utils.logging_ import get_named_client_logger
from autoPyTorch.utils.parallel import preload_modules

Y_ENSEMBLE = 0
Y_TEST = 1
Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(
ensemble_memory_limit: Optional[int],
random_state: int,
logger_port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
pynisher_context: str = 'fork',
):
""" SMAC callback to handle ensemble building
Args:
Expand Down Expand Up @@ -111,6 +113,8 @@ def __init__(
read at most n new prediction files in each iteration
logger_port: int
port in where to publish a msg
pynisher_context: str
The multiprocessing context for pynisher. One of spawn/fork/forkserver.

Returns:
List[Tuple[int, float, float, float]]:
Expand All @@ -135,6 +139,7 @@ def __init__(
self.ensemble_memory_limit = ensemble_memory_limit
self.random_state = random_state
self.logger_port = logger_port
self.pynisher_context = pynisher_context

# Store something similar to SMAC's runhistory
self.history = [] # type: List[Dict[str, float]]
Expand All @@ -160,7 +165,6 @@ def __call__(
def build_ensemble(
self,
dask_client: dask.distributed.Client,
pynisher_context: str = 'spawn',
unit_test: bool = False
) -> None:

Expand Down Expand Up @@ -236,7 +240,7 @@ def build_ensemble(
iteration=self.iteration,
return_predictions=False,
priority=100,
pynisher_context=pynisher_context,
pynisher_context=self.pynisher_context,
logger_port=self.logger_port,
unit_test=unit_test,
))
Expand Down Expand Up @@ -585,11 +589,11 @@ def __init__(
def run(
self,
iteration: int,
pynisher_context: str,
time_left: Optional[float] = None,
end_at: Optional[float] = None,
time_buffer: int = 5,
return_predictions: bool = False,
pynisher_context: str = 'spawn', # only change for unit testing!
) -> Tuple[
List[Dict[str, float]],
int,
Expand Down Expand Up @@ -655,6 +659,7 @@ def run(
if wall_time_in_s < 1:
break
context = multiprocessing.get_context(pynisher_context)
preload_modules(context)

safe_ensemble_script = pynisher.enforce_limits(
wall_time_in_s=wall_time_in_s,
Expand Down
48 changes: 25 additions & 23 deletions autoPyTorch/evaluation/tae.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from autoPyTorch.utils.common import replace_string_bool_to_bool
from autoPyTorch.utils.hyperparameter_search_space_update import HyperparameterSearchSpaceUpdates
from autoPyTorch.utils.logging_ import PicklableClientLogger, get_named_client_logger
from autoPyTorch.utils.parallel import preload_modules


def fit_predict_try_except_decorator(
Expand Down Expand Up @@ -92,29 +93,29 @@ class ExecuteTaFuncWithQueue(AbstractTAFunc):
"""

def __init__(
self,
backend: Backend,
seed: int,
metric: autoPyTorchMetric,
cost_for_crash: float,
abort_on_first_run_crash: bool,
pipeline_config: typing.Optional[typing.Dict[str, typing.Any]] = None,
initial_num_run: int = 1,
stats: typing.Optional[Stats] = None,
run_obj: str = 'quality',
par_factor: int = 1,
output_y_hat_optimization: bool = True,
include: typing.Optional[typing.Dict[str, typing.Any]] = None,
exclude: typing.Optional[typing.Dict[str, typing.Any]] = None,
memory_limit: typing.Optional[int] = None,
disable_file_output: bool = False,
init_params: typing.Dict[str, typing.Any] = None,
budget_type: str = None,
ta: typing.Optional[typing.Callable] = None,
logger_port: int = None,
all_supported_metrics: bool = True,
pynisher_context: str = 'spawn',
search_space_updates: typing.Optional[HyperparameterSearchSpaceUpdates] = None
self,
backend: Backend,
seed: int,
metric: autoPyTorchMetric,
cost_for_crash: float,
abort_on_first_run_crash: bool,
pynisher_context: str,
pipeline_config: typing.Optional[typing.Dict[str, typing.Any]] = None,
initial_num_run: int = 1,
stats: typing.Optional[Stats] = None,
run_obj: str = 'quality',
par_factor: int = 1,
output_y_hat_optimization: bool = True,
include: typing.Optional[typing.Dict[str, typing.Any]] = None,
exclude: typing.Optional[typing.Dict[str, typing.Any]] = None,
memory_limit: typing.Optional[int] = None,
disable_file_output: bool = False,
init_params: typing.Dict[str, typing.Any] = None,
budget_type: str = None,
ta: typing.Optional[typing.Callable] = None,
logger_port: int = None,
all_supported_metrics: bool = True,
search_space_updates: typing.Optional[HyperparameterSearchSpaceUpdates] = None
):

eval_function = autoPyTorch.evaluation.train_evaluator.eval_function
Expand Down Expand Up @@ -249,6 +250,7 @@ def run(
) -> typing.Tuple[StatusType, float, float, typing.Dict[str, typing.Any]]:

context = multiprocessing.get_context(self.pynisher_context)
preload_modules(context)
queue: multiprocessing.queues.Queue = context.Queue()

if not (instance_specific is None or instance_specific == '0'):
Expand Down
9 changes: 7 additions & 2 deletions autoPyTorch/optimizer/smbo.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ def __init__(self,
ensemble_callback: typing.Optional[EnsembleBuilderManager] = None,
logger_port: typing.Optional[int] = None,
search_space_updates: typing.Optional[HyperparameterSearchSpaceUpdates] = None,
portfolio_selection: typing.Optional[str] = None
portfolio_selection: typing.Optional[str] = None,
pynisher_context: str = 'spawn',
):
"""
Interface to SMAC. This method calls the SMAC optimize method, and allows
Expand Down Expand Up @@ -156,6 +157,8 @@ def __init__(self,
Additional arguments to the smac scenario
get_smac_object_callback (typing.Optional[typing.Callable]):
Allows to create a user specified SMAC object
pynisher_context (str):
A string indicating the multiprocessing context to use
ensemble_callback (typing.Optional[EnsembleBuilderManager]):
A callback used in this scenario to start ensemble building subtasks
portfolio_selection (str), (default=None):
Expand Down Expand Up @@ -204,6 +207,7 @@ def __init__(self,
self.disable_file_output = disable_file_output
self.smac_scenario_args = smac_scenario_args
self.get_smac_object_callback = get_smac_object_callback
self.pynisher_context = pynisher_context

self.ensemble_callback = ensemble_callback

Expand Down Expand Up @@ -274,7 +278,8 @@ def run_smbo(self, func: typing.Optional[typing.Callable] = None
logger_port=self.logger_port,
all_supported_metrics=self.all_supported_metrics,
pipeline_config=self.pipeline_config,
search_space_updates=self.search_space_updates
search_space_updates=self.search_space_updates,
pynisher_context=self.pynisher_context,
)
ta = ExecuteTaFuncWithQueue
self.logger.info("Created TA")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import os
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The series of changes you made should be achieved by writing the following in each __init__.py:

from autoPyTorch.pipeline.components.<something 1>.<something 2>.<something 3> import base_<something 4>_choice

<something 5>Choice = base_<something 4>_choice.<something 5>Choice

If there are any intentions behind the changes, let me know the reasons as a response here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote this in the PR description, but let me state it here again. The problem is that forkserver, when communicating the pre-loaded modules from the parent process to the forked server process (the entity that spawns new processes), seems to require that __init__.py has the choices defined in it. I made this change to mimic how the pipeline is implemented in auto-sklearn.

Defining the choice in the init is not sufficient, as the problem is how Python organizes the loading of the modules. Yes, maybe we could do some intelligent sourcing of the choices via relative paths, but the criterion that I followed was to stay similar to auto-sklearn so that both projects share the same code (and solutions to problems).

Copy link
Collaborator

@nabenabe0928 nabenabe0928 May 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the response and sorry for the late reply.

I made this change to mimic how the pipeline is implemented in auto-sklearn.

It is not good practice to write a lot of code in __init__.py.
We should mimic good practices, not bad ones.

Defining the choice in the init is not sufficient

Could you give me the error you got when you run?
I do not get any errors when I modify the codes in the development branch accordingly.

from collections import OrderedDict
from typing import Any, Dict, List, Optional

import ConfigSpace.hyperparameters as CSH
from ConfigSpace.configuration_space import ConfigurationSpace

from autoPyTorch.pipeline.components.base_choice import autoPyTorchChoice
from autoPyTorch.pipeline.components.base_component import (
ThirdPartyComponents,
autoPyTorchComponent,
find_components,
)
from autoPyTorch.pipeline.components.preprocessing.image_preprocessing.normalise.base_normalizer import BaseNormalizer


# Directory containing this package's modules; scanned for normalizer implementations.
normalise_directory = os.path.split(__file__)[0]
# All BaseNormalizer components discovered in this package at import time.
_normalizers = find_components(__package__,
                               normalise_directory,
                               BaseNormalizer)

# Registry for normalizers contributed at runtime by third parties.
_addons = ThirdPartyComponents(BaseNormalizer)


def add_normalizer(normalizer: BaseNormalizer) -> None:
    """Register a third-party normalizer component so it becomes selectable."""
    _addons.add_component(normalizer)


class NormalizerChoice(autoPyTorchChoice):
    """Choice component that dynamically selects an image normalizer.

    Exposes every registered ``BaseNormalizer`` implementation — both the
    built-in ones discovered in this package and third-party add-ons — as a
    single categorical ``__choice__`` hyperparameter.
    """

    def get_components(self) -> Dict[str, autoPyTorchComponent]:
        """Collect every normalizer component available for selection.

        Returns:
            Dict[str, autoPyTorchComponent]: all ``BaseNormalizer`` components,
            with third-party add-ons listed after (and overriding) built-ins
            of the same name.
        """
        available = OrderedDict()
        available.update(_normalizers)
        available.update(_addons.components)
        return available

    def get_hyperparameter_search_space(self,
                                        dataset_properties: Optional[Dict[str, Any]] = None,
                                        default: Optional[str] = None,
                                        include: Optional[List[str]] = None,
                                        exclude: Optional[List[str]] = None) -> ConfigurationSpace:
        """Build the configuration space spanned by the normalizer choice.

        Args:
            dataset_properties: extra dataset properties, merged on top of
                ``self.dataset_properties``.
            default: name of the component used as the categorical default;
                chosen automatically from a preference list when ``None``.
            include: if given, restrict the choice to these component names.
            exclude: if given, remove these component names from the choice.

        Returns:
            ConfigurationSpace: a space holding the ``__choice__`` categorical
            hyperparameter plus each candidate component's own sub-space.

        Raises:
            ValueError: if no normalizer component is available, or a
                search-space update requests components that are not available.
        """
        cs = ConfigurationSpace()

        merged_properties = {**self.dataset_properties, **(dataset_properties or {})}

        available_preprocessors = self.get_available_components(
            dataset_properties=merged_properties,
            include=include,
            exclude=exclude)

        if not available_preprocessors:
            raise ValueError("no image normalizers found, please add an image normalizer")

        # No explicit default: take the first preferred component that survives
        # the include/exclude filters.
        if default is None:
            for candidate in ('ImageNormalizer', 'NoNormalizer'):
                if candidate not in available_preprocessors:
                    continue
                if include is not None and candidate not in include:
                    continue
                if exclude is not None and candidate in exclude:
                    continue
                default = candidate
                break

        # A '__choice__' search-space update overrides both the value range
        # and the default of the categorical hyperparameter.
        updates = self._get_search_space_updates()
        if '__choice__' not in updates:
            choice_hp = CSH.CategoricalHyperparameter('__choice__',
                                                      list(available_preprocessors.keys()),
                                                      default_value=default)
        else:
            choice_update = updates['__choice__']
            if not set(choice_update.value_range).issubset(available_preprocessors):
                raise ValueError("Expected given update for {} to have "
                                 "choices in {} got {}".format(self.__class__.__name__,
                                                               available_preprocessors,
                                                               choice_update.value_range))
            choice_hp = CSH.CategoricalHyperparameter('__choice__',
                                                      choice_update.value_range,
                                                      default_value=choice_update.default_value)
        cs.add_hyperparameter(choice_hp)

        # Attach each component's own sub-space, active only when selected.
        for name in choice_hp.choices:
            component_cs = available_preprocessors[name].\
                get_hyperparameter_search_space(merged_properties)
            cs.add_configuration_space(name, component_cs,
                                       parent_hyperparameter={'parent': choice_hp, 'value': name})

        self.configuration_space = cs
        self.dataset_properties = merged_properties
        return cs
Loading