From fadd1ba4a1b27d54c9f985723a4d4e79702639bc Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Mon, 15 Aug 2022 15:05:34 +0200 Subject: [PATCH 01/18] add parallel model runner and update running traditional classifiers --- autoPyTorch/api/base_task.py | 214 +++++++++---------- autoPyTorch/evaluation/abstract_evaluator.py | 4 + autoPyTorch/utils/parallel_model_runner.py | 195 +++++++++++++++++ 3 files changed, 295 insertions(+), 118 deletions(-) create mode 100644 autoPyTorch/utils/parallel_model_runner.py diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index c5468eae7..4b02dbc00 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -69,6 +69,7 @@ start_log_server, ) from autoPyTorch.utils.parallel import preload_modules +from ..utils.parallel_model_runner import run_models_on_dataset from autoPyTorch.utils.pipeline import get_configuration_space, get_dataset_requirements from autoPyTorch.utils.results_manager import MetricResults, ResultsManager, SearchResults from autoPyTorch.utils.results_visualizer import ColorLabelSettings, PlotSettingParams, ResultsVisualizer @@ -799,113 +800,36 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: assert self._dask_client is not None self._logger.info("Starting to create traditional classifier predictions.") - starttime = time.time() # Initialise run history for the traditional classifiers - run_history = RunHistory() memory_limit = self._memory_limit if memory_limit is not None: memory_limit = int(math.ceil(memory_limit)) available_classifiers = get_available_traditional_learners() - dask_futures = [] - - total_number_classifiers = len(available_classifiers) - for n_r, classifier in enumerate(available_classifiers): - - # Only launch a task if there is time - start_time = time.time() - if time_left >= func_eval_time_limit_secs: - self._logger.info(f"{n_r}: Started fitting {classifier} with cutoff={func_eval_time_limit_secs}") - scenario_mock = unittest.mock.Mock() - scenario_mock.wallclock_limit = time_left - # This stats object is a hack - maybe the SMAC stats object should - # already be generated here! - stats = Stats(scenario_mock) - stats.start_timing() - ta = ExecuteTaFuncWithQueue( - pynisher_context=self._multiprocessing_context, - backend=self._backend, - seed=self.seed, - multi_objectives=["cost"], - metric=self._metric, - logger_port=self._logger_port, - cost_for_crash=get_cost_of_crash(self._metric), - abort_on_first_run_crash=False, - initial_num_run=self._backend.get_next_num_run(), - stats=stats, - memory_limit=memory_limit, - disable_file_output=self._disable_file_output, - all_supported_metrics=self._all_supported_metrics, - ) - dask_futures.append([ - classifier, - self._dask_client.submit( - ta.run, config=classifier, - cutoff=func_eval_time_limit_secs, - ) - ]) - - # When managing time, we need to take into account the allocated time resources, - # which are dependent on the number of cores. 'dask_futures' is a proxy to the number - # of workers /n_jobs that we have, in that if there are 4 cores allocated, we can run at most - # 4 task in parallel. Every 'cutoff' seconds, we generate up to 4 tasks. - # If we only have 4 workers and there are 4 futures in dask_futures, it means that every - # worker has a task. We would not like to launch another job until a worker is available. 
To this - # end, the following if-statement queries the number of active jobs, and forces to wait for a job - # completion via future.result(), so that a new worker is available for the next iteration. - if len(dask_futures) >= self.n_jobs: - - # How many workers to wait before starting fitting the next iteration - workers_to_wait = 1 - if n_r >= total_number_classifiers - 1 or time_left <= func_eval_time_limit_secs: - # If on the last iteration, flush out all tasks - workers_to_wait = len(dask_futures) - - while workers_to_wait >= 1: - workers_to_wait -= 1 - # We launch dask jobs only when there are resources available. - # This allow us to control time allocation properly, and early terminate - # the traditional machine learning pipeline - cls, future = dask_futures.pop(0) - status, cost, runtime, additional_info = future.result() - if status == StatusType.SUCCESS: - self._logger.info( - "Fitting {} took {} [sec] and got performance: {}.\n" - "additional info:\n{}".format(cls, runtime, cost, dict_repr(additional_info)) - ) - configuration = additional_info['pipeline_configuration'] - origin = additional_info['configuration_origin'] - additional_info.pop('pipeline_configuration') - run_history.add(config=configuration, cost=cost, - time=runtime, status=status, seed=self.seed, - starttime=starttime, endtime=starttime + runtime, - origin=origin, additional_info=additional_info) - else: - if additional_info.get('exitcode') == -6: - self._logger.error( - "Traditional prediction for {} failed with run state {},\n" - "because the provided memory limits were too tight.\n" - "Please increase the 'ml_memory_limit' and try again.\n" - "If you still get the problem, please open an issue\n" - "and paste the additional info.\n" - "Additional info:\n{}".format(cls, str(status), dict_repr(additional_info)) - ) - else: - self._logger.error( - "Traditional prediction for {} failed with run state {}.\nAdditional info:\n{}".format( - cls, str(status), dict_repr(additional_info) - ) - ) - - # In the case of a serial execution, calling submit halts the run for a resource - # dynamically adjust time in this case - time_left -= int(time.time() - start_time) - - # Exit if no more time is available for a new classifier - if time_left < func_eval_time_limit_secs: - self._logger.warning("Not enough time to fit all traditional machine learning models." 
- "Please consider increasing the run time to further improve performance.") - break + model_configs = [(classifier, self.pipeline_options[self.pipeline_options['budget_type']]) for classifier in available_classifiers] + + run_history, _ = run_models_on_dataset( + time_left=time_left, + func_eval_time_limit_secs=func_eval_time_limit_secs, + model_configs=model_configs, + logger=self._logger, + logger_port=self._logger_port, + metric=self._metric, + dask_client=self._dask_client, + backend=self._backend, + memory_limit=self._memory_limit, + disable_file_output=self._disable_file_output, + all_supported_metrics=self._all_supported_metrics, + include=self.include_components, + exclude=self.exclude_components, + search_space_updates=self.search_space_updates, + pipeline_options=self.pipeline_options, + seed=self.seed, + multiprocessing_context=self._multiprocessing_context, + n_jobs=self.n_jobs, + current_search_space=self.search_space, + smac_initial_run=self._backend.get_next_num_run() + ) self._logger.debug("Run history traditional: {}".format(run_history)) # add run history of traditional to api run history @@ -1378,8 +1302,25 @@ def _get_fit_dictionary( def refit( self, - dataset: BaseDataset, - split_id: int = 0 + dataset: Optional[BaseDataset] = None, + X_train: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, + y_train: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, + X_test: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, + y_test: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, + dataset_name: Optional[str] = None, + resampling_strategy: Optional[Union[HoldoutValTypes, CrossValTypes, NoResamplingStrategyTypes]] = None, + resampling_strategy_args: Optional[Dict[str, Any]] = None, + run_time_limit_secs: int = 60, + memory_limit: Optional[int] = None, + eval_metric: Optional[str] = None, + all_supported_metrics: bool = False, + budget_type: Optional[str] = None, + include_components: Optional[Dict[str, Any]] = None, + exclude_components: Optional[Dict[str, Any]] = None, + search_space_updates: Optional[HyperparameterSearchSpaceUpdates] = None, + budget: Optional[float] = None, + pipeline_options: Optional[Dict] = None, + disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, ) -> "BaseTask": """ Refit all models found with fit to new data. @@ -1406,6 +1347,21 @@ def refit( self """ + if dataset is None: + if ( + X_train is not None + and y_train is not None + ): + raise ValueError("No dataset provided, must provide X_train, y_train tensors") + dataset = self.get_dataset(X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + resampling_strategy=resampling_strategy, + resampling_strategy_args=resampling_strategy_args, + dataset_name=dataset_name + ) + self.dataset_name = dataset.dataset_name if self._logger is None: @@ -1419,26 +1375,48 @@ def refit( dataset_properties = dataset.get_dataset_properties(dataset_requirements) self._backend.save_datamanager(dataset) + include_components = self.include_components if include_components is None else include_components + exclude_components = self.exclude_components if exclude_components is None else exclude_components + search_space_updates = self.search_space_updates if search_space_updates is None else search_space_updates + + scenario_mock = unittest.mock.Mock() + scenario_mock.wallclock_limit = run_time_limit_secs + # This stats object is a hack - maybe the SMAC stats object should + # already be generated here! 
+ stats = Stats(scenario_mock) + + if memory_limit is None and getattr(self, '_memory_limit', None) is not None: + memory_limit = self._memory_limit + + metric = get_metrics(dataset_properties=dataset_properties, + names=[eval_metric] if eval_metric is not None else None, + all_supported_metrics=False).pop() + + pipeline_options = self.pipeline_options.copy().update(pipeline_options) if pipeline_options is not None \ + else self.pipeline_options.copy() + + assert pipeline_options is not None + + if budget_type is not None: + pipeline_options.update({'budget_type': budget_type}) + else: + budget_type = pipeline_options['budget_type'] + + budget = budget if budget is not None else pipeline_options[budget_type] + + if disable_file_output is None: + disable_file_output = getattr(self, '_disable_file_output', []) + + stats.start_timing() + if self.models_ is None or len(self.models_) == 0 or self.ensemble_ is None: self._load_models() - # Refit is not applicable when ensemble_size is set to zero. - if self.ensemble_ is None: - raise ValueError("Refit can only be called if 'ensemble_size != 0'") - + configurations = [] for identifier in self.models_: model = self.models_[identifier] - # this updates the model inplace, it can then later be used in - # predict method - - # try to fit the model. If it fails, shuffle the data. This - # could alleviate the problem in algorithms that depend on - # the ordering of the data. - X = self._get_fit_dictionary( - dataset_properties=copy.copy(dataset_properties), - dataset=dataset, - split_id=split_id) - fit_and_suppress_warnings(self._logger, model, X, y=None) + configurations.append(model.config) + self._clean_logger() diff --git a/autoPyTorch/evaluation/abstract_evaluator.py b/autoPyTorch/evaluation/abstract_evaluator.py index d20a96b75..a03980d4a 100644 --- a/autoPyTorch/evaluation/abstract_evaluator.py +++ b/autoPyTorch/evaluation/abstract_evaluator.py @@ -805,6 +805,10 @@ def finish_up(self, loss: Dict[str, float], train_loss: Dict[str, float], if test_loss is not None: additional_run_info['test_loss'] = test_loss + # Add information to additional info that can be useful for other functionalities + additional_run_info['configuration'] = self.configuration if not isinstance(self.configuration, Configuration) else self.configuration.get_dictionary() + additional_run_info['budget'] = self.budget + rval_dict = {'loss': cost, 'additional_run_info': additional_run_info, 'status': status} diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py new file mode 100644 index 000000000..a92a21986 --- /dev/null +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -0,0 +1,195 @@ +import time +import math +from typing import Any, Dict, List, Tuple, Union +import unittest + +from ConfigSpace.configuration_space import Configuration, ConfigurationSpace + +import dask.distributed + +from smac.runhistory.runhistory import RunHistory +from smac.stats.stats import Stats +from smac.tae import StatusType + +from autoPyTorch.evaluation.tae import ExecuteTaFuncWithQueue, get_cost_of_crash +from autoPyTorch.automl_common.common.utils.backend import Backend +from autoPyTorch.utils.common import dict_repr + + +def run_models_on_dataset( + time_left: int, + func_eval_time_limit_secs: int, + model_configs: List[Tuple[Union[str, Configuration]]], + logger, + logger_port, + metric, + dask_client: dask.distributed.Client, + backend: Backend, + memory_limit: int, + disable_file_output, + all_supported_metrics: bool, + include, + exclude, + 
search_space_updates, + pipeline_options, + seed: int, + multiprocessing_context, + n_jobs: int, + current_search_space: ConfigurationSpace, + smac_initial_run: int +) -> RunHistory: + """ + Runs models specified by `model_configs` on dask parallel infrastructure. + + Args: + time_left (int): _description_ + func_eval_time_limit_secs (int): _description_ + model_configs (List[Tuple[Union[str, Configuration]]]): _description_ + logger (_type_): _description_ + logger_port (_type_): _description_ + metric (_type_): _description_ + dask_client (dask.distributed.Client): _description_ + backend (Backend): _description_ + memory_limit (int): _description_ + disable_file_output (_type_): _description_ + all_supported_metrics (bool): _description_ + include (_type_): _description_ + exclude (_type_): _description_ + search_space_updates (_type_): _description_ + pipeline_options (_type_): _description_ + seed (int): _description_ + multiprocessing_context (_type_): _description_ + n_jobs (int): _description_ + current_search_space (ConfigurationSpace): _description_ + smac_initial_run (int): _description_ + + Returns: + RunHistory: _description_ + """ + starttime = time.time() + run_history = RunHistory() + memory_limit = memory_limit + if memory_limit is not None: + memory_limit = int(math.ceil(memory_limit)) + model_identifiers = [] + total_models = len(model_configs) + dask_futures = [] + for n_r, (config, budget) in enumerate(model_configs): + + # Only launch a task if there is time + start_time = time.time() + if time_left >= func_eval_time_limit_secs: + logger.info(f"{n_r}: Started fitting {config} with cutoff={func_eval_time_limit_secs}") + scenario_mock = unittest.mock.Mock() + scenario_mock.wallclock_limit = time_left + # This stats object is a hack - maybe the SMAC stats object should + # already be generated here! + stats = Stats(scenario_mock) + stats.start_timing() + + if isinstance(config, Configuration): + config.config_id = n_r + init_num_run = smac_initial_run + else: + init_num_run = smac_initial_run + n_r + + ta = ExecuteTaFuncWithQueue( + pynisher_context=multiprocessing_context, + backend=backend, + seed=seed, + metric=metric, + multi_objectives=["cost"], + logger_port=logger_port, + pipeline_config=pipeline_options, + cost_for_crash=get_cost_of_crash(metric), + abort_on_first_run_crash=False, + initial_num_run=init_num_run, + stats=stats, + memory_limit=memory_limit, + disable_file_output=disable_file_output, + all_supported_metrics=all_supported_metrics, + include=include, + exclude=exclude, + search_space_updates=search_space_updates + ) + dask_futures.append([ + config, + dask_client.submit( + ta.run, config=config, + cutoff=func_eval_time_limit_secs, + budget=budget + ) + ]) + + # When managing time, we need to take into account the allocated time resources, + # which are dependent on the number of cores. 'dask_futures' is a proxy to the number + # of workers /n_jobs that we have, in that if there are 4 cores allocated, we can run at most + # 4 task in parallel. Every 'cutoff' seconds, we generate up to 4 tasks. + # If we only have 4 workers and there are 4 futures in dask_futures, it means that every + # worker has a task. We would not like to launch another job until a worker is available. To this + # end, the following if-statement queries the number of active jobs, and forces to wait for a job + # completion via future.result(), so that a new worker is available for the next iteration. 
+ if len(dask_futures) >= n_jobs: + + # How many workers to wait before starting fitting the next iteration + workers_to_wait = 1 + if n_r >= total_models - 1 or time_left <= func_eval_time_limit_secs: + # If on the last iteration, flush out all tasks + workers_to_wait = len(dask_futures) + + while workers_to_wait >= 1: + workers_to_wait -= 1 + # We launch dask jobs only when there are resources available. + # This allow us to control time allocation properly, and early terminate + # the traditional machine learning pipeline + cls, future = dask_futures.pop(0) + status, cost, runtime, additional_info = future.result() + + if status == StatusType.SUCCESS: + logger.info( + "Fitting {} took {} [sec] and got performance: {}.\n" + "additional info:\n{}".format(cls, runtime, cost, dict_repr(additional_info)) + ) + origin = additional_info['configuration_origin'] + config = additional_info['configuration'] + budget = additional_info['budget'] + + # indicates the finished model is part of autopytorch search space + if isinstance(config, dict): + configuration = Configuration(current_search_space, config) + else: + # we assume that it is a traditional model and `pipeline_configuration` specifies the configuration. + configuration = additional_info.pop('pipeline_configuration') + + run_history.add(config=configuration, cost=cost, + time=runtime, status=status, seed=seed, + starttime=starttime, endtime=starttime + runtime, + origin=origin, additional_info=additional_info) + model_identifiers.append((seed, additional_info['num_run'], float(budget))) + else: + if additional_info.get('exitcode') == -6: + logger.error( + "Traditional prediction for {} failed with run state {},\n" + "because the provided memory limits were too tight.\n" + "Please increase the 'ml_memory_limit' and try again.\n" + "If you still get the problem, please open an issue\n" + "and paste the additional info.\n" + "Additional info:\n{}".format(cls, str(status), dict_repr(additional_info)) + ) + else: + logger.error( + "Traditional prediction for {} failed with run state {}.\nAdditional info:\n{}".format( + cls, str(status), dict_repr(additional_info) + ) + ) + model_identifiers.append(None) + # In the case of a serial execution, calling submit halts the run for a resource + # dynamically adjust time in this case + time_left -= int(time.time() - start_time) + + # Exit if no more time is available for a new classifier + if time_left < func_eval_time_limit_secs: + logger.warning("Not enough time to fit all machine learning models." 
+ "Please consider increasing the run time to further improve performance.") + break + return run_history, model_identifiers From dc191285709424e81c7d256b86535c72256cfaf1 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Mon, 15 Aug 2022 15:30:15 +0200 Subject: [PATCH 02/18] update pipeline config to pipeline options --- autoPyTorch/api/base_task.py | 59 ++++++++++++------- autoPyTorch/evaluation/abstract_evaluator.py | 20 +++---- autoPyTorch/evaluation/tae.py | 18 +++--- autoPyTorch/evaluation/test_evaluator.py | 10 ++-- ...time_series_forecasting_train_evaluator.py | 12 ++-- autoPyTorch/evaluation/train_evaluator.py | 12 ++-- autoPyTorch/optimizer/smbo.py | 8 +-- autoPyTorch/utils/parallel_model_runner.py | 4 +- test/test_api/test_base_api.py | 18 +++--- test/test_api/utils.py | 8 +-- .../test_abstract_evaluator.py | 2 +- test/test_evaluation/test_evaluators.py | 8 +-- .../test_forecasting_evaluators.py | 10 ++-- 13 files changed, 104 insertions(+), 85 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index 4b02dbc00..d2dd4faf5 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -34,11 +34,13 @@ from autoPyTorch import metrics from autoPyTorch.automl_common.common.utils.backend import Backend, create from autoPyTorch.constants import ( + CLASSIFICATION_TASKS, FORECASTING_BUDGET_TYPE, FORECASTING_TASKS, REGRESSION_TASKS, STRING_TO_OUTPUT_TYPES, STRING_TO_TASK_TYPES, + TABULAR_TASKS, TIMESERIES_FORECASTING, ) from autoPyTorch.data.base_validator import BaseInputValidator @@ -52,7 +54,7 @@ ) from autoPyTorch.ensemble.ensemble_builder import EnsembleBuilderManager from autoPyTorch.ensemble.singlebest_ensemble import SingleBest -from autoPyTorch.evaluation.abstract_evaluator import fit_and_suppress_warnings +from autoPyTorch.evaluation.abstract_evaluator import MyTraditionalTabularClassificationPipeline, MyTraditionalTabularRegressionPipeline, fit_and_suppress_warnings from autoPyTorch.evaluation.tae import ExecuteTaFuncWithQueue, get_cost_of_crash from autoPyTorch.evaluation.utils import DisableFileOutputParameters from autoPyTorch.optimizer.smbo import AutoMLSMBO @@ -444,14 +446,14 @@ def ensemble_performance_history(self) -> List[Dict[str, Any]]: def trajectory(self) -> Optional[List]: return self._results_manager.trajectory - def set_pipeline_config(self, **pipeline_config_kwargs: Any) -> None: + def set_pipeline_options(self, **pipeline_options_kwargs: Any) -> None: """ Check whether arguments are valid and then sets them to the current pipeline configuration. Args: - **pipeline_config_kwargs: Valid config options include "num_run", + **pipeline_options_kwargs: Valid config options include "num_run", "device", "budget_type", "epochs", "runtime", "torch_num_threads", "early_stopping", "use_tensorboard_logger", "metrics_during_training" @@ -460,7 +462,7 @@ def set_pipeline_config(self, **pipeline_config_kwargs: Any) -> None: None """ unknown_keys = [] - for option, value in pipeline_config_kwargs.items(): + for option, value in pipeline_options_kwargs.items(): if option in self.pipeline_options.keys(): pass else: @@ -471,7 +473,7 @@ def set_pipeline_config(self, **pipeline_config_kwargs: Any) -> None: " expected arguments to be in {}". 
format(unknown_keys, self.pipeline_options.keys())) - self.pipeline_options.update(pipeline_config_kwargs) + self.pipeline_options.update(pipeline_options_kwargs) def get_pipeline_options(self) -> dict: """ @@ -1196,7 +1198,7 @@ def _search( all_supported_metrics=self._all_supported_metrics, smac_scenario_args=smac_scenario_args, get_smac_object_callback=get_smac_object_callback, - pipeline_config=self.pipeline_options, + pipeline_options=self.pipeline_options, min_budget=min_budget, max_budget=max_budget, ensemble_callback=proc_ensemble, @@ -1310,14 +1312,12 @@ def refit( dataset_name: Optional[str] = None, resampling_strategy: Optional[Union[HoldoutValTypes, CrossValTypes, NoResamplingStrategyTypes]] = None, resampling_strategy_args: Optional[Dict[str, Any]] = None, + total_walltime_limit: int = 120, run_time_limit_secs: int = 60, memory_limit: Optional[int] = None, eval_metric: Optional[str] = None, all_supported_metrics: bool = False, budget_type: Optional[str] = None, - include_components: Optional[Dict[str, Any]] = None, - exclude_components: Optional[Dict[str, Any]] = None, - search_space_updates: Optional[HyperparameterSearchSpaceUpdates] = None, budget: Optional[float] = None, pipeline_options: Optional[Dict] = None, disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, @@ -1332,8 +1332,8 @@ def refit( given. This method may also be used together with holdout to avoid only using 66% of the training data to fit the final model. - Refit uses the estimator pipeline_config attribute, which the user - can interact via the get_pipeline_config()/set_pipeline_config() + Refit uses the estimator pipeline_options attribute, which the user + can interact via the get_pipeline_options()/set_pipeline_options() methods. Args: @@ -1375,10 +1375,6 @@ def refit( dataset_properties = dataset.get_dataset_properties(dataset_requirements) self._backend.save_datamanager(dataset) - include_components = self.include_components if include_components is None else include_components - exclude_components = self.exclude_components if exclude_components is None else exclude_components - search_space_updates = self.search_space_updates if search_space_updates is None else search_space_updates - scenario_mock = unittest.mock.Mock() scenario_mock.wallclock_limit = run_time_limit_secs # This stats object is a hack - maybe the SMAC stats object should @@ -1412,11 +1408,34 @@ def refit( if self.models_ is None or len(self.models_) == 0 or self.ensemble_ is None: self._load_models() - configurations = [] + model_configs = [] for identifier in self.models_: model = self.models_[identifier] - configurations.append(model.config) + budget = identifier[-1] # identifier is num_run, seed, budget + model_configs.append(budget, model.config) + _, model_identifiers = run_models_on_dataset( + time_left=total_walltime_limit, + func_eval_time_limit_secs=run_time_limit_secs, + model_configs=model_configs, + logger=self._logger, + logger_port=self._logger_port, + metric=metric, + dask_client=self._dask_client, + backend=self._backend, + memory_limit=memory_limit, + disable_file_output=disable_file_output, + all_supported_metrics=all_supported_metrics, + include=self.include_components, + exclude=self.exclude_components, + search_space_updates=self.search_space_updates, + pipeline_options=pipeline_options, + seed=self.seed, + multiprocessing_context=self._multiprocessing_context, + n_jobs=self.n_jobs, + current_search_space=self.search_space, + smac_initial_run=self._backend.get_next_num_run() + ) 
self._clean_logger() @@ -1451,8 +1470,8 @@ def fit_pipeline( A pipeline configuration can be specified if None, uses default - Fit uses the estimator pipeline_config attribute, which the user - can interact via the get_pipeline_config()/set_pipeline_config() + Fit uses the estimator pipeline_options attribute, which the user + can interact via the get_pipeline_options()/set_pipeline_options() methods. Args: @@ -1644,7 +1663,7 @@ def fit_pipeline( include=include_components, exclude=exclude_components, search_space_updates=search_space_updates, - pipeline_config=pipeline_options, + pipeline_options=pipeline_options, pynisher_context=self._multiprocessing_context, ) diff --git a/autoPyTorch/evaluation/abstract_evaluator.py b/autoPyTorch/evaluation/abstract_evaluator.py index a03980d4a..b0ff086c4 100644 --- a/autoPyTorch/evaluation/abstract_evaluator.py +++ b/autoPyTorch/evaluation/abstract_evaluator.py @@ -347,7 +347,7 @@ class AbstractEvaluator(object): An evaluator is an object that: + constructs a pipeline (i.e. a classification or regression estimator) for a given - pipeline_config and run settings (budget, seed) + pipeline_options and run settings (budget, seed) + Fits and trains this pipeline (TrainEvaluator) or tests a given configuration (TestEvaluator) @@ -369,7 +369,7 @@ class AbstractEvaluator(object): The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type. Currently, only epoch and time are allowed. - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. @@ -430,7 +430,7 @@ def __init__(self, backend: Backend, budget: float, configuration: Union[int, str, Configuration], budget_type: str = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, seed: int = 1, output_y_hat_optimization: bool = True, num_run: Optional[int] = None, @@ -523,10 +523,10 @@ def __init__(self, backend: Backend, self._init_params = init_params assert self.pipeline_class is not None, "Could not infer pipeline class" - pipeline_config = pipeline_config if pipeline_config is not None \ + pipeline_options = pipeline_options if pipeline_options is not None \ else self.pipeline_class.get_default_pipeline_options() - self.budget_type = pipeline_config['budget_type'] if budget_type is None else budget_type - self.budget = pipeline_config[self.budget_type] if budget == 0 else budget + self.budget_type = pipeline_options['budget_type'] if budget_type is None else budget_type + self.budget = pipeline_options[self.budget_type] if budget == 0 else budget self.num_run = 0 if num_run is None else num_run @@ -539,7 +539,7 @@ def __init__(self, backend: Backend, port=logger_port, ) - self._init_fit_dictionary(logger_port=logger_port, pipeline_config=pipeline_config, metrics_dict=metrics_dict) + self._init_fit_dictionary(logger_port=logger_port, pipeline_options=pipeline_options, metrics_dict=metrics_dict) self.Y_optimization: Optional[np.ndarray] = None self.Y_actual_train: Optional[np.ndarray] = None self.pipelines: Optional[List[BaseEstimator]] = None @@ -597,7 +597,7 @@ def _init_datamanager_info( def _init_fit_dictionary( self, logger_port: int, - pipeline_config: Dict[str, Any], + pipeline_options: Dict[str, Any], metrics_dict: Optional[Dict[str, List[str]]] = None, ) -> None: """ @@ -608,7 +608,7 @@ def _init_fit_dictionary( 
Logging is performed using a socket-server scheme to be robust against many parallel entities that want to write to the same file. This integer states the socket port for the communication channel. - pipeline_config (Dict[str, Any]): + pipeline_options (Dict[str, Any]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. @@ -634,7 +634,7 @@ def _init_fit_dictionary( 'optimize_metric': self.metric.name }) - self.fit_dictionary.update(pipeline_config) + self.fit_dictionary.update(pipeline_options) # If the budget is epochs, we want to limit that in the fit dictionary if self.budget_type == 'epochs': self.fit_dictionary['epochs'] = self.budget diff --git a/autoPyTorch/evaluation/tae.py b/autoPyTorch/evaluation/tae.py index b1650113c..3eaea6720 100644 --- a/autoPyTorch/evaluation/tae.py +++ b/autoPyTorch/evaluation/tae.py @@ -123,7 +123,7 @@ def __init__( abort_on_first_run_crash: bool, pynisher_context: str, multi_objectives: List[str], - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, initial_num_run: int = 1, stats: Optional[Stats] = None, run_obj: str = 'quality', @@ -198,13 +198,13 @@ def __init__( self.disable_file_output = disable_file_output self.init_params = init_params - self.budget_type = pipeline_config['budget_type'] if pipeline_config is not None else budget_type + self.budget_type = pipeline_options['budget_type'] if pipeline_options is not None else budget_type - self.pipeline_config: Dict[str, Union[int, str, float]] = dict() - if pipeline_config is None: - pipeline_config = replace_string_bool_to_bool(json.load(open( + self.pipeline_options: Dict[str, Union[int, str, float]] = dict() + if pipeline_options is None: + pipeline_options = replace_string_bool_to_bool(json.load(open( os.path.join(os.path.dirname(__file__), '../configs/default_pipeline_options.json')))) - self.pipeline_config.update(pipeline_config) + self.pipeline_options.update(pipeline_options) self.logger_port = logger_port if self.logger_port is None: @@ -225,7 +225,7 @@ def __init__( def _check_and_get_default_budget(self) -> float: budget_type_choices_tabular = ('epochs', 'runtime') budget_choices = { - budget_type: float(self.pipeline_config.get(budget_type, np.inf)) + budget_type: float(self.pipeline_options.get(budget_type, np.inf)) for budget_type in budget_type_choices_tabular } @@ -234,7 +234,7 @@ def _check_and_get_default_budget(self) -> float: budget_type_choices = budget_type_choices_tabular + FORECASTING_BUDGET_TYPE # budget is defined by epochs by default - budget_type = str(self.pipeline_config.get('budget_type', 'epochs')) + budget_type = str(self.pipeline_options.get('budget_type', 'epochs')) if self.budget_type is not None: budget_type = self.budget_type @@ -361,7 +361,7 @@ def run( init_params=init_params, budget=budget, budget_type=self.budget_type, - pipeline_config=self.pipeline_config, + pipeline_options=self.pipeline_options, logger_port=self.logger_port, all_supported_metrics=self.all_supported_metrics, search_space_updates=self.search_space_updates diff --git a/autoPyTorch/evaluation/test_evaluator.py b/autoPyTorch/evaluation/test_evaluator.py index 4d5b0ae91..12b7bc31d 100644 --- a/autoPyTorch/evaluation/test_evaluator.py +++ b/autoPyTorch/evaluation/test_evaluator.py @@ -51,7 +51,7 @@ class TestEvaluator(AbstractEvaluator): The amount of epochs/time a configuration is allowed to run. 
budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. @@ -113,7 +113,7 @@ def __init__( budget: float, configuration: Union[int, str, Configuration], budget_type: str = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, seed: int = 1, output_y_hat_optimization: bool = False, num_run: Optional[int] = None, @@ -141,7 +141,7 @@ def __init__( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates ) @@ -206,7 +206,7 @@ def eval_test_function( include: Optional[Dict[str, Any]], exclude: Optional[Dict[str, Any]], disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, budget_type: str = None, init_params: Optional[Dict[str, Any]] = None, logger_port: Optional[int] = None, @@ -230,7 +230,7 @@ def eval_test_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates) evaluator.fit_predict_and_loss() diff --git a/autoPyTorch/evaluation/time_series_forecasting_train_evaluator.py b/autoPyTorch/evaluation/time_series_forecasting_train_evaluator.py index 0940d1e9a..07a87ede4 100644 --- a/autoPyTorch/evaluation/time_series_forecasting_train_evaluator.py +++ b/autoPyTorch/evaluation/time_series_forecasting_train_evaluator.py @@ -40,7 +40,7 @@ class TimeSeriesForecastingTrainEvaluator(TrainEvaluator): The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. 
@@ -106,7 +106,7 @@ def __init__(self, backend: Backend, queue: Queue, metric: autoPyTorchMetric, budget: float, budget_type: str = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, configuration: Optional[Configuration] = None, seed: int = 1, output_y_hat_optimization: bool = True, @@ -138,7 +138,7 @@ def __init__(self, backend: Backend, queue: Queue, logger_port=logger_port, keep_models=keep_models, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates ) self.datamanager = backend.load_datamanager() @@ -456,7 +456,7 @@ def forecasting_eval_train_function( include: Optional[Dict[str, Any]], exclude: Optional[Dict[str, Any]], disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, budget_type: str = None, init_params: Optional[Dict[str, Any]] = None, logger_port: Optional[int] = None, @@ -490,7 +490,7 @@ def forecasting_eval_train_function( The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. @@ -550,7 +550,7 @@ def forecasting_eval_train_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates, max_budget=max_budget, min_num_test_instances=min_num_test_instances, diff --git a/autoPyTorch/evaluation/train_evaluator.py b/autoPyTorch/evaluation/train_evaluator.py index 142af6bcc..e88c8eaca 100644 --- a/autoPyTorch/evaluation/train_evaluator.py +++ b/autoPyTorch/evaluation/train_evaluator.py @@ -60,7 +60,7 @@ class TrainEvaluator(AbstractEvaluator): The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. 
@@ -121,7 +121,7 @@ def __init__(self, backend: Backend, queue: Queue, budget: float, configuration: Union[int, str, Configuration], budget_type: str = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, seed: int = 1, output_y_hat_optimization: bool = True, num_run: Optional[int] = None, @@ -149,7 +149,7 @@ def __init__(self, backend: Backend, queue: Queue, budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates ) @@ -420,7 +420,7 @@ def eval_train_function( include: Optional[Dict[str, Any]], exclude: Optional[Dict[str, Any]], disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, budget_type: str = None, init_params: Optional[Dict[str, Any]] = None, logger_port: Optional[int] = None, @@ -452,7 +452,7 @@ def eval_train_function( The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. @@ -506,7 +506,7 @@ def eval_train_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates, ) evaluator.fit_predict_and_loss() diff --git a/autoPyTorch/optimizer/smbo.py b/autoPyTorch/optimizer/smbo.py index 53eae4696..92bf7bb87 100644 --- a/autoPyTorch/optimizer/smbo.py +++ b/autoPyTorch/optimizer/smbo.py @@ -111,7 +111,7 @@ def __init__(self, watcher: StopWatch, n_jobs: int, dask_client: Optional[dask.distributed.Client], - pipeline_config: Dict[str, Any], + pipeline_options: Dict[str, Any], start_num_run: int = 1, seed: int = 1, resampling_strategy: Union[HoldoutValTypes, @@ -227,7 +227,7 @@ def __init__(self, self.backend = backend self.all_supported_metrics = all_supported_metrics - self.pipeline_config = pipeline_config + self.pipeline_options = pipeline_options # the configuration space self.config_space = config_space @@ -326,7 +326,7 @@ def run_smbo(self, func: Optional[Callable] = None ta=func, logger_port=self.logger_port, all_supported_metrics=self.all_supported_metrics, - pipeline_config=self.pipeline_config, + pipeline_options=self.pipeline_options, search_space_updates=self.search_space_updates, pynisher_context=self.pynisher_context, ) @@ -376,7 +376,7 @@ def run_smbo(self, func: Optional[Callable] = None ) scenario_dict.update(self.smac_scenario_args) - budget_type = self.pipeline_config['budget_type'] + budget_type = self.pipeline_options['budget_type'] if budget_type in FORECASTING_BUDGET_TYPE: if STRING_TO_TASK_TYPES.get(self.task_type, -1) != TIMESERIES_FORECASTING: raise ValueError('Forecasting Budget type is only available for forecasting task!') diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py index a92a21986..11e04bbe7 100644 --- a/autoPyTorch/utils/parallel_model_runner.py +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -100,7 +100,7 @@ def run_models_on_dataset( metric=metric, multi_objectives=["cost"], 
logger_port=logger_port, - pipeline_config=pipeline_options, + pipeline_options=pipeline_options, cost_for_crash=get_cost_of_crash(metric), abort_on_first_run_crash=False, initial_num_run=init_num_run, @@ -159,7 +159,7 @@ def run_models_on_dataset( configuration = Configuration(current_search_space, config) else: # we assume that it is a traditional model and `pipeline_configuration` specifies the configuration. - configuration = additional_info.pop('pipeline_configuration') + configuration = additional_info.pop('pipeline_optionsuration') run_history.add(config=configuration, cost=cost, time=runtime, status=status, seed=seed, diff --git a/test/test_api/test_base_api.py b/test/test_api/test_base_api.py index bb8f9c061..ac01af25e 100644 --- a/test/test_api/test_base_api.py +++ b/test/test_api/test_base_api.py @@ -27,7 +27,7 @@ def test_nonsupported_arguments(fit_dictionary_tabular): api = BaseTask() with pytest.raises(ValueError, match=r".*Invalid configuration arguments given.*"): - api.set_pipeline_config(unsupported=True) + api.set_pipeline_options(unsupported=True) with pytest.raises(ValueError, match=r".*No search space initialised and no dataset.*"): api.get_search_space() api.resampling_strategy = None @@ -95,7 +95,7 @@ def test_show_models(fit_dictionary_tabular): assert re.search(expected, api.show_models()) is not None -def test_set_pipeline_config(): +def test_set_pipeline_options(): # checks if we can correctly change the pipeline options BaseTask.__abstractmethods__ = set() estimator = BaseTask() @@ -103,7 +103,7 @@ def test_set_pipeline_config(): "budget_type": "epochs", "epochs": 51, "runtime": 360} - estimator.set_pipeline_config(**pipeline_options) + estimator.set_pipeline_options(**pipeline_options) assert pipeline_options.items() <= estimator.get_pipeline_options().items() @@ -118,12 +118,12 @@ def test_pipeline_get_budget(fit_dictionary_tabular, min_budget, max_budget, bud estimator = BaseTask(task_type='tabular_classification', ensemble_size=0) # Fixture pipeline config - default_pipeline_config = { + default_pipeline_options = { 'device': 'cpu', 'budget_type': 'epochs', 'epochs': 50, 'runtime': 3600, 'torch_num_threads': 1, 'early_stopping': 20, 'use_tensorboard_logger': False, 'metrics_during_training': True, 'optimize_metric': 'accuracy' } - default_pipeline_config.update(expected) + default_pipeline_options.update(expected) # Create pre-requisites dataset = fit_dictionary_tabular['backend'].load_datamanager() @@ -141,7 +141,7 @@ def test_pipeline_get_budget(fit_dictionary_tabular, min_budget, max_budget, bud enable_traditional_pipeline=False, total_walltime_limit=20, func_eval_time_limit_secs=10, load_models=False) - assert list(smac_mock.call_args)[1]['ta_kwargs']['pipeline_config'] == default_pipeline_config + assert list(smac_mock.call_args)[1]['ta_kwargs']['pipeline_options'] == default_pipeline_options assert list(smac_mock.call_args)[1]['max_budget'] == max_budget assert list(smac_mock.call_args)[1]['initial_budget'] == min_budget @@ -174,12 +174,12 @@ def test_pipeline_get_budget_forecasting(fit_dictionary_forecasting, min_budget, BaseTask.__abstractmethods__ = set() estimator = BaseTask(task_type='time_series_forecasting', ensemble_size=0) # Fixture pipeline config - default_pipeline_config = { + default_pipeline_options = { 'device': 'cpu', 'budget_type': 'epochs', 'epochs': 50, 'runtime': 3600, 'torch_num_threads': 1, 'early_stopping': 20, 'use_tensorboard_logger': False, 'metrics_during_training': True, 'optimize_metric': 'mean_MASE_forecasting' } - 
default_pipeline_config.update(expected) + default_pipeline_options.update(expected) # Create pre-requisites dataset = fit_dictionary_forecasting['backend'].load_datamanager() @@ -198,6 +198,6 @@ def test_pipeline_get_budget_forecasting(fit_dictionary_forecasting, min_budget, total_walltime_limit=20, func_eval_time_limit_secs=10, memory_limit=8192, load_models=False) - assert list(smac_mock.call_args)[1]['ta_kwargs']['pipeline_config'] == default_pipeline_config + assert list(smac_mock.call_args)[1]['ta_kwargs']['pipeline_options'] == default_pipeline_options assert list(smac_mock.call_args)[1]['max_budget'] == max_budget assert list(smac_mock.call_args)[1]['initial_budget'] == min_budget diff --git a/test/test_api/utils.py b/test/test_api/utils.py index bbee9a3c4..22e8c2a63 100644 --- a/test/test_api/utils.py +++ b/test/test_api/utils.py @@ -94,7 +94,7 @@ def dummy_eval_train_function( include, exclude, disable_file_output, - pipeline_config=None, + pipeline_options=None, budget_type=None, init_params=None, logger_port=None, @@ -118,7 +118,7 @@ def dummy_eval_train_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates, ) evaluator.fit_predict_and_loss() @@ -137,7 +137,7 @@ def dummy_forecasting_eval_train_function( include, exclude, disable_file_output, - pipeline_config=None, + pipeline_options=None, budget_type=None, init_params=None, logger_port=None, @@ -163,7 +163,7 @@ def dummy_forecasting_eval_train_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates, max_budget=max_budget, min_num_test_instances=min_num_test_instances, diff --git a/test/test_evaluation/test_abstract_evaluator.py b/test/test_evaluation/test_abstract_evaluator.py index a0be2c3f3..bb9df88e7 100644 --- a/test/test_evaluation/test_abstract_evaluator.py +++ b/test/test_evaluation/test_abstract_evaluator.py @@ -307,7 +307,7 @@ def test_error_unsupported_budget_type(self): backend=backend, output_y_hat_optimization=False, queue=queue_mock, - pipeline_config={'budget_type': "error", 'error': 0}, + pipeline_options={'budget_type': "error", 'error': 0}, metric=accuracy, budget=0, configuration=1) diff --git a/test/test_evaluation/test_evaluators.py b/test/test_evaluation/test_evaluators.py index 2ca32af10..7261a81fa 100644 --- a/test/test_evaluation/test_evaluators.py +++ b/test/test_evaluation/test_evaluators.py @@ -102,7 +102,7 @@ def test_holdout(self, pipeline_mock): queue_ = multiprocessing.Queue() evaluator = TrainEvaluator(backend_api, queue_, configuration=configuration, metric=accuracy, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -141,7 +141,7 @@ def test_cv(self, pipeline_mock): queue_ = multiprocessing.Queue() evaluator = TrainEvaluator(backend_api, queue_, configuration=configuration, metric=accuracy, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -246,7 +246,7 @@ def 
test_predict_proba_binary_classification(self, mock): queue_ = multiprocessing.Queue() evaluator = TrainEvaluator(self.backend_mock, queue_, configuration=configuration, metric=accuracy, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}) evaluator.fit_predict_and_loss() Y_optimization_pred = self.backend_mock.save_numrun_to_dir.call_args_list[0][1][ @@ -283,7 +283,7 @@ def test_additional_metrics_during_training(self, pipeline_mock): queue_ = multiprocessing.Queue() evaluator = TrainEvaluator(backend_api, queue_, configuration=configuration, metric=accuracy, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}, all_supported_metrics=True) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}, all_supported_metrics=True) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) diff --git a/test/test_evaluation/test_forecasting_evaluators.py b/test/test_evaluation/test_forecasting_evaluators.py index 580402d5c..66c026e9c 100644 --- a/test/test_evaluation/test_forecasting_evaluators.py +++ b/test/test_evaluation/test_forecasting_evaluators.py @@ -60,7 +60,7 @@ def test_budget_type_choices(self, pipeline_mock): queue_, configuration=configuration, metric=mean_MASE_forecasting, budget=0, - pipeline_config={'budget_type': budget_type, + pipeline_options={'budget_type': budget_type, budget_type: 0.1}, min_num_test_instances=100) self.assertTrue('epochs' not in evaluator.fit_dictionary) @@ -93,7 +93,7 @@ def test_holdout(self, pipeline_mock): queue_, configuration=configuration, metric=mean_MASE_forecasting, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}, + pipeline_options={'budget_type': 'epochs', 'epochs': 50}, min_num_test_instances=100) self.assertTrue('epochs' in evaluator.fit_dictionary) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) @@ -148,7 +148,7 @@ def test_cv(self, pipeline_mock): queue_, configuration=configuration, metric=mean_MASE_forecasting, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}) + pipeline_optionss={'budget_type': 'epochs', 'epochs': 50}) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -197,7 +197,7 @@ def test_proxy_val_set(self, pipeline_mock): queue_, configuration=configuration, metric=mean_MASE_forecasting, budget=0.3, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}, + pipeline_options={'budget_type': 'epochs', 'epochs': 50}, min_num_test_instances=1) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -247,7 +247,7 @@ def test_finish_up(self, pipeline_mock, queue_mock): queue_mock, configuration=configuration, metric=mean_MASE_forecasting, budget=0.3, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}, + pipeline_options={'budget_type': 'epochs', 'epochs': 50}, min_num_test_instances=1) val_splits = D.splits[0][1] From ce0611bbcb2872fae71acff056673dcc2e22d917 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Tue, 16 Aug 2022 15:49:48 +0200 Subject: [PATCH 03/18] working refit function --- autoPyTorch/api/base_task.py | 71 +++++++++++++------ autoPyTorch/ensemble/abstract_ensemble.py | 13 ++++ autoPyTorch/utils/parallel_model_runner.py | 4 +- .../example_tabular_classification.py | 40 +++++++++-- 4 files changed, 99 insertions(+), 29 deletions(-) diff --git 
a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index d2dd4faf5..c66cda7ea 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -53,6 +53,7 @@ ResamplingStrategies, ) from autoPyTorch.ensemble.ensemble_builder import EnsembleBuilderManager +from ..ensemble.ensemble_selection import EnsembleSelection from autoPyTorch.ensemble.singlebest_ensemble import SingleBest from autoPyTorch.evaluation.abstract_evaluator import MyTraditionalTabularClassificationPipeline, MyTraditionalTabularRegressionPipeline, fit_and_suppress_warnings from autoPyTorch.evaluation.tae import ExecuteTaFuncWithQueue, get_cost_of_crash @@ -637,7 +638,10 @@ def _close_dask_client(self) -> None: self._is_dask_client_internally_created = False del self._is_dask_client_internally_created - def _load_models(self) -> bool: + def _load_models( + self, + resampling_strategy: Optional[Union[CrossValTypes, HoldoutValTypes, NoResamplingStrategyTypes]] = None + ) -> bool: """ Loads the models saved in the temporary directory @@ -646,7 +650,8 @@ def _load_models(self) -> bool: Returns: None """ - if self.resampling_strategy is None: + resampling_strategy = resampling_strategy if resampling_strategy is not None else self.resampling_strategy + if resampling_strategy is None: raise ValueError("Resampling strategy is needed to determine what models to load") self.ensemble_ = self._backend.load_ensemble(self.seed) @@ -657,10 +662,10 @@ def _load_models(self) -> bool: if self.ensemble_: identifiers = self.ensemble_.get_selected_model_identifiers() self.models_ = self._backend.load_models_by_identifiers(identifiers) - if isinstance(self.resampling_strategy, CrossValTypes): + if isinstance(resampling_strategy, CrossValTypes): self.cv_models_ = self._backend.load_cv_models_by_identifiers(identifiers) - if isinstance(self.resampling_strategy, CrossValTypes): + if isinstance(resampling_strategy, CrossValTypes): if len(self.cv_models_) == 0: raise ValueError('No models fitted!') @@ -1310,7 +1315,7 @@ def refit( X_test: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, y_test: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, dataset_name: Optional[str] = None, - resampling_strategy: Optional[Union[HoldoutValTypes, CrossValTypes, NoResamplingStrategyTypes]] = None, + resampling_strategy: Union[HoldoutValTypes, CrossValTypes, NoResamplingStrategyTypes] = NoResamplingStrategyTypes.no_resampling, resampling_strategy_args: Optional[Dict[str, Any]] = None, total_walltime_limit: int = 120, run_time_limit_secs: int = 60, @@ -1349,8 +1354,8 @@ def refit( if dataset is None: if ( - X_train is not None - and y_train is not None + X_train is None + and y_train is None ): raise ValueError("No dataset provided, must provide X_train, y_train tensors") dataset = self.get_dataset(X_train=X_train, @@ -1365,7 +1370,9 @@ def refit( self.dataset_name = dataset.dataset_name if self._logger is None: - self._logger = self._get_logger(str(self.dataset_name)) + self._logger = self._get_logger(f"RefitLogger-{self.dataset_name}") + + self._logger.debug(f"Starting refit") dataset_requirements = get_dataset_requirements( info=dataset.get_required_dataset_info(), @@ -1411,10 +1418,11 @@ def refit( model_configs = [] for identifier in self.models_: model = self.models_[identifier] - budget = identifier[-1] # identifier is num_run, seed, budget - model_configs.append(budget, model.config) - - _, model_identifiers = run_models_on_dataset( + budget = identifier[-1] # identifier is seed, num_run, budget + 
model_configs.append((model.config, budget)) + + self._logger.debug(f"Refitting {model_configs}") + run_history, _ = run_models_on_dataset( time_left=total_walltime_limit, func_eval_time_limit_secs=run_time_limit_secs, model_configs=model_configs, @@ -1436,7 +1444,28 @@ def refit( current_search_space=self.search_space, smac_initial_run=self._backend.get_next_num_run() ) + replace_old_identifiers_to_refit_identifiers = {} + + self._logger.debug(f"Finished refit training") + old_identifier_index = None + for run_key, run_value in run_history.data.items(): + config = run_value.additional_info['configuration'] + refit_identifier = (self.seed, run_value.additional_info['num_run'], run_key.budget) + for i, (configuration, _) in enumerate(model_configs): + if isinstance(configuration, Configuration): + configuration = configuration.get_dictionary() + self._logger.debug(f"Matching {config} with {configuration}") + if config == configuration: + old_identifier_index = i + break + if old_identifier_index is not None: + replace_old_identifiers_to_refit_identifiers[list(self.models_.keys())[old_identifier_index]] = refit_identifier + else: + self._logger.warning(f"Refit for {config} failed. Updating ensemble weights accordingly.") + old_identifier_index = None + self.ensemble_.update_identifiers(replace_old_identifiers_to_refit_identifiers) + self._load_models(resampling_strategy=resampling_strategy) self._clean_logger() return self @@ -1578,8 +1607,8 @@ def fit_pipeline( if dataset is None: if ( - X_train is not None - and y_train is not None + X_train is None + and y_train is None ): raise ValueError("No dataset provided, must provide X_train, y_train tensors") dataset = self.get_dataset(X_train=X_train, @@ -1600,22 +1629,22 @@ def fit_pipeline( # search process, it makes sense to set it to 0 configuration.__setattr__('config_id', 0) + include_components = self.include_components if include_components is None else include_components + exclude_components = self.exclude_components if exclude_components is None else exclude_components + search_space_updates = self.search_space_updates if search_space_updates is None else search_space_updates + # get dataset properties dataset_requirements = get_dataset_requirements( info=dataset.get_required_dataset_info(), - include=self.include_components, - exclude=self.exclude_components, - search_space_updates=self.search_space_updates) + include=include_components, + exclude=exclude_components, + search_space_updates=search_space_updates) dataset_properties = dataset.get_dataset_properties(dataset_requirements) self._backend.save_datamanager(dataset) if self._logger is None: self._logger = self._get_logger(dataset.dataset_name) - include_components = self.include_components if include_components is None else include_components - exclude_components = self.exclude_components if exclude_components is None else exclude_components - search_space_updates = self.search_space_updates if search_space_updates is None else search_space_updates - scenario_mock = unittest.mock.Mock() scenario_mock.wallclock_limit = run_time_limit_secs # This stats object is a hack - maybe the SMAC stats object should diff --git a/autoPyTorch/ensemble/abstract_ensemble.py b/autoPyTorch/ensemble/abstract_ensemble.py index 072b6d260..9ad2060e0 100644 --- a/autoPyTorch/ensemble/abstract_ensemble.py +++ b/autoPyTorch/ensemble/abstract_ensemble.py @@ -1,4 +1,5 @@ from abc import ABCMeta, abstractmethod +from copy import copy from typing import Any, Dict, List, Tuple, Union import numpy as np @@ 
-9,6 +10,9 @@ class AbstractEnsemble(object): __metaclass__ = ABCMeta + def __init__(self): + self.identifiers_: List[Tuple[int, int, float]] = [] + @abstractmethod def fit( self, @@ -76,3 +80,12 @@ def get_validation_performance(self) -> float: Returns: Score """ + + def update_identifiers( + self, + replace_identifiers_mapping: Dict[Tuple[int, int, float], Tuple[int, int, float]] + ) -> None: + identifiers = copy(self.identifiers_) + for i, identifier in enumerate(self.identifiers_): + identifiers[i] = replace_identifiers_mapping.get(identifier, identifier) + self.identifiers_ = identifiers diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py index 11e04bbe7..117dfc7ef 100644 --- a/autoPyTorch/utils/parallel_model_runner.py +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -37,7 +37,7 @@ def run_models_on_dataset( n_jobs: int, current_search_space: ConfigurationSpace, smac_initial_run: int -) -> RunHistory: +) -> Union[RunHistory, List[Tuple[int, int, float]]]: """ Runs models specified by `model_configs` on dask parallel infrastructure. @@ -159,7 +159,7 @@ def run_models_on_dataset( configuration = Configuration(current_search_space, config) else: # we assume that it is a traditional model and `pipeline_configuration` specifies the configuration. - configuration = additional_info.pop('pipeline_optionsuration') + configuration = additional_info.pop('pipeline_configuration') run_history.add(config=configuration, cost=cost, time=runtime, status=status, seed=seed, diff --git a/examples/20_basics/example_tabular_classification.py b/examples/20_basics/example_tabular_classification.py index 636281eff..b22f92dda 100644 --- a/examples/20_basics/example_tabular_classification.py +++ b/examples/20_basics/example_tabular_classification.py @@ -3,13 +3,15 @@ Tabular Classification ====================== -The following example shows how to fit a sample classification model -with AutoPyTorch +The following example shows how to fit a simple classification ensemble +with AutoPyTorch and refit the found ensemble. """ import os import tempfile as tmp import warnings +from autoPyTorch.datasets.resampling_strategy import CrossValTypes, NoResamplingStrategyTypes + os.environ['JOBLIB_TEMP_FOLDER'] = tmp.gettempdir() os.environ['OMP_NUM_THREADS'] = '1' os.environ['OPENBLAS_NUM_THREADS'] = '1' @@ -62,13 +64,39 @@ ) ############################################################################ -# Print the final ensemble performance -# ==================================== +# Print the final ensemble performance before refit +# ================================================= + y_pred = api.predict(X_test) score = api.score(y_pred, y_test) print(score) -# Print the final ensemble built by AutoPyTorch -print(api.show_models()) # Print statistics from search print(api.sprint_statistics()) + +########################################################################### +# Refit the models on the full dataset. 
+# ===================================== + +api.refit( + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + dataset_name="Australian", + # you can change the resampling strategy to + # for example, CrossValTypes.k_fold_cross_validation + # to fit k fold models and have a voting classifier + # resampling_strategy=CrossValTypes.k_fold_cross_validation +) + +############################################################################ +# Print the final ensemble performance after refit +# ================================================ + +y_pred = api.predict(X_test) +score = api.score(y_pred, y_test) +print(score) + +# Print the final ensemble built by AutoPyTorch +print(api.show_models()) From 20c4cd35450574f145c84ca836caf627c2c42040 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Tue, 16 Aug 2022 16:46:43 +0200 Subject: [PATCH 04/18] fix mypy and flake --- autoPyTorch/api/base_task.py | 23 +++--- autoPyTorch/evaluation/abstract_evaluator.py | 3 +- autoPyTorch/utils/parallel_model_runner.py | 76 ++++++++++--------- .../test_forecasting_evaluators.py | 2 +- 4 files changed, 56 insertions(+), 48 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index c66cda7ea..c8b4cb8d0 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -34,13 +34,11 @@ from autoPyTorch import metrics from autoPyTorch.automl_common.common.utils.backend import Backend, create from autoPyTorch.constants import ( - CLASSIFICATION_TASKS, FORECASTING_BUDGET_TYPE, FORECASTING_TASKS, REGRESSION_TASKS, STRING_TO_OUTPUT_TYPES, STRING_TO_TASK_TYPES, - TABULAR_TASKS, TIMESERIES_FORECASTING, ) from autoPyTorch.data.base_validator import BaseInputValidator @@ -53,9 +51,7 @@ ResamplingStrategies, ) from autoPyTorch.ensemble.ensemble_builder import EnsembleBuilderManager -from ..ensemble.ensemble_selection import EnsembleSelection from autoPyTorch.ensemble.singlebest_ensemble import SingleBest -from autoPyTorch.evaluation.abstract_evaluator import MyTraditionalTabularClassificationPipeline, MyTraditionalTabularRegressionPipeline, fit_and_suppress_warnings from autoPyTorch.evaluation.tae import ExecuteTaFuncWithQueue, get_cost_of_crash from autoPyTorch.evaluation.utils import DisableFileOutputParameters from autoPyTorch.optimizer.smbo import AutoMLSMBO @@ -72,7 +68,7 @@ start_log_server, ) from autoPyTorch.utils.parallel import preload_modules -from ..utils.parallel_model_runner import run_models_on_dataset +from autoPyTorch.utils.parallel_model_runner import run_models_on_dataset from autoPyTorch.utils.pipeline import get_configuration_space, get_dataset_requirements from autoPyTorch.utils.results_manager import MetricResults, ResultsManager, SearchResults from autoPyTorch.utils.results_visualizer import ColorLabelSettings, PlotSettingParams, ResultsVisualizer @@ -813,8 +809,9 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: if memory_limit is not None: memory_limit = int(math.ceil(memory_limit)) available_classifiers = get_available_traditional_learners() - model_configs = [(classifier, self.pipeline_options[self.pipeline_options['budget_type']]) for classifier in available_classifiers] - + model_configs = [(classifier, self.pipeline_options[self.pipeline_options['budget_type']]) + for classifier in available_classifiers] + run_history, _ = run_models_on_dataset( time_left=time_left, func_eval_time_limit_secs=func_eval_time_limit_secs, @@ -1315,7 +1312,8 @@ def refit( X_test: Optional[Union[List, pd.DataFrame, 
np.ndarray]] = None, y_test: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, dataset_name: Optional[str] = None, - resampling_strategy: Union[HoldoutValTypes, CrossValTypes, NoResamplingStrategyTypes] = NoResamplingStrategyTypes.no_resampling, + resampling_strategy: Union[ + HoldoutValTypes, CrossValTypes, NoResamplingStrategyTypes] = NoResamplingStrategyTypes.no_resampling, resampling_strategy_args: Optional[Dict[str, Any]] = None, total_walltime_limit: int = 120, run_time_limit_secs: int = 60, @@ -1372,7 +1370,7 @@ def refit( if self._logger is None: self._logger = self._get_logger(f"RefitLogger-{self.dataset_name}") - self._logger.debug(f"Starting refit") + self._logger.debug("Starting refit") dataset_requirements = get_dataset_requirements( info=dataset.get_required_dataset_info(), @@ -1420,7 +1418,7 @@ def refit( model = self.models_[identifier] budget = identifier[-1] # identifier is seed, num_run, budget model_configs.append((model.config, budget)) - + self._logger.debug(f"Refitting {model_configs}") run_history, _ = run_models_on_dataset( time_left=total_walltime_limit, @@ -1446,7 +1444,7 @@ def refit( ) replace_old_identifiers_to_refit_identifiers = {} - self._logger.debug(f"Finished refit training") + self._logger.debug("Finished refit training") old_identifier_index = None for run_key, run_value in run_history.data.items(): config = run_value.additional_info['configuration'] @@ -1459,7 +1457,8 @@ def refit( old_identifier_index = i break if old_identifier_index is not None: - replace_old_identifiers_to_refit_identifiers[list(self.models_.keys())[old_identifier_index]] = refit_identifier + replace_old_identifiers_to_refit_identifiers[ + list(self.models_.keys())[old_identifier_index]] = refit_identifier else: self._logger.warning(f"Refit for {config} failed. 
Updating ensemble weights accordingly.") old_identifier_index = None diff --git a/autoPyTorch/evaluation/abstract_evaluator.py b/autoPyTorch/evaluation/abstract_evaluator.py index b0ff086c4..69fb090f5 100644 --- a/autoPyTorch/evaluation/abstract_evaluator.py +++ b/autoPyTorch/evaluation/abstract_evaluator.py @@ -806,7 +806,8 @@ def finish_up(self, loss: Dict[str, float], train_loss: Dict[str, float], additional_run_info['test_loss'] = test_loss # Add information to additional info that can be useful for other functionalities - additional_run_info['configuration'] = self.configuration if not isinstance(self.configuration, Configuration) else self.configuration.get_dictionary() + additional_run_info['configuration'] = self.configuration \ + if not isinstance(self.configuration, Configuration) else self.configuration.get_dictionary() additional_run_info['budget'] = self.budget rval_dict = {'loss': cost, diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py index 117dfc7ef..9ce577e82 100644 --- a/autoPyTorch/utils/parallel_model_runner.py +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -1,7 +1,8 @@ -import time +import logging import math -from typing import Any, Dict, List, Tuple, Union +import time import unittest +from typing import Dict, List, Optional, Tuple, Union from ConfigSpace.configuration_space import Configuration, ConfigurationSpace @@ -11,67 +12,72 @@ from smac.stats.stats import Stats from smac.tae import StatusType -from autoPyTorch.evaluation.tae import ExecuteTaFuncWithQueue, get_cost_of_crash from autoPyTorch.automl_common.common.utils.backend import Backend +from autoPyTorch.evaluation.tae import ExecuteTaFuncWithQueue, get_cost_of_crash +from autoPyTorch.evaluation.utils import DisableFileOutputParameters +from autoPyTorch.pipeline.components.training.metrics.base import autoPyTorchMetric from autoPyTorch.utils.common import dict_repr +from autoPyTorch.utils.hyperparameter_search_space_update import HyperparameterSearchSpaceUpdates +from autoPyTorch.utils.logging_ import PicklableClientLogger def run_models_on_dataset( time_left: int, func_eval_time_limit_secs: int, - model_configs: List[Tuple[Union[str, Configuration]]], - logger, - logger_port, - metric, + model_configs: List[Tuple[str, Configuration]], + logger: PicklableClientLogger, + metric: autoPyTorchMetric, dask_client: dask.distributed.Client, backend: Backend, - memory_limit: int, - disable_file_output, all_supported_metrics: bool, - include, - exclude, - search_space_updates, - pipeline_options, seed: int, - multiprocessing_context, + multiprocessing_context: str, n_jobs: int, current_search_space: ConfigurationSpace, - smac_initial_run: int -) -> Union[RunHistory, List[Tuple[int, int, float]]]: + smac_initial_run: int, + include: Optional[Dict[str, List[str]]] = None, + exclude: Optional[Dict[str, List[str]]] = None, + search_space_updates: Optional[HyperparameterSearchSpaceUpdates] = None, + logger_port: Optional[int] = logging.handlers.DEFAULT_TCP_LOGGING_PORT, + memory_limit: Optional[int] = None, + disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, + pipeline_options: Optional[Dict] = None, +) -> Tuple[RunHistory, List[Optional[Tuple[int, int, float]]]]: """ Runs models specified by `model_configs` on dask parallel infrastructure. 
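For orientation while reading this refactor, here is a minimal sketch (not part of the patch) of how the runner is driven, mirroring the call sites in `_do_traditional_prediction` and `refit` elsewhere in this series. It assumes `backend`, `logger`, `metric`, `dask_client` and `search_space` have already been created by the calling task; the numeric budgets and time limits are placeholders, and at this point in the series the function still returns a `(run_history, model_identifiers)` pair (a later commit reduces it to just the `RunHistory`).

    # Illustrative only -- `backend`, `logger`, `metric`, `dask_client` and
    # `search_space` are assumed to come from the calling BaseTask instance.
    # Each entry of model_configs is a (configuration, budget) pair: either a
    # traditional learner name or a Configuration sampled from the search space.
    model_configs = [
        ('lgb', 50),
        (search_space.sample_configuration(), 50),
    ]

    run_history, model_identifiers = run_models_on_dataset(
        time_left=120,
        func_eval_time_limit_secs=60,
        model_configs=model_configs,
        logger=logger,
        metric=metric,
        dask_client=dask_client,
        backend=backend,
        all_supported_metrics=False,
        seed=1,
        multiprocessing_context='fork',
        n_jobs=1,
        current_search_space=search_space,
        smac_initial_run=backend.get_next_num_run(),
    )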
Args: time_left (int): _description_ func_eval_time_limit_secs (int): _description_ - model_configs (List[Tuple[Union[str, Configuration]]]): _description_ - logger (_type_): _description_ - logger_port (_type_): _description_ - metric (_type_): _description_ + model_configs (List[Tuple[str, Configuration]]): _description_ + logger (PicklableClientLogger): _description_ + metric (autoPyTorchMetric): _description_ dask_client (dask.distributed.Client): _description_ backend (Backend): _description_ memory_limit (int): _description_ disable_file_output (_type_): _description_ all_supported_metrics (bool): _description_ - include (_type_): _description_ - exclude (_type_): _description_ - search_space_updates (_type_): _description_ pipeline_options (_type_): _description_ seed (int): _description_ - multiprocessing_context (_type_): _description_ + multiprocessing_context (str): _description_ n_jobs (int): _description_ current_search_space (ConfigurationSpace): _description_ smac_initial_run (int): _description_ + include (Optional[Dict[str, List[str]]], optional): _description_. Defaults to None. + exclude (Optional[Dict[str, List[str]]], optional): _description_. Defaults to None. + search_space_updates (Optional[HyperparameterSearchSpaceUpdates], optional): _description_. Defaults to None. + logger_port (Optional[int], optional): _description_. Defaults to logging.handlers.DEFAULT_TCP_LOGGING_PORT. Returns: - RunHistory: _description_ + Union[RunHistory, List[Tuple[int, int, float]]]: _description_ """ + starttime = time.time() run_history = RunHistory() memory_limit = memory_limit if memory_limit is not None: memory_limit = int(math.ceil(memory_limit)) - model_identifiers = [] + model_identifiers: List[Optional[Tuple[int, int, float]]] = [] total_models = len(model_configs) dask_futures = [] for n_r, (config, budget) in enumerate(model_configs): @@ -149,23 +155,24 @@ def run_models_on_dataset( logger.info( "Fitting {} took {} [sec] and got performance: {}.\n" "additional info:\n{}".format(cls, runtime, cost, dict_repr(additional_info)) - ) - origin = additional_info['configuration_origin'] - config = additional_info['configuration'] - budget = additional_info['budget'] + ) + origin: str = additional_info['configuration_origin'] + current_config: Union[str, dict] = additional_info['configuration'] + current_budget: float = additional_info['budget'] # indicates the finished model is part of autopytorch search space - if isinstance(config, dict): - configuration = Configuration(current_search_space, config) + if isinstance(current_config, dict): + configuration = Configuration(current_search_space, current_config) # type: ignore[misc] else: - # we assume that it is a traditional model and `pipeline_configuration` specifies the configuration. + # we assume that it is a traditional model and `pipeline_configuration` + # specifies the configuration. configuration = additional_info.pop('pipeline_configuration') run_history.add(config=configuration, cost=cost, time=runtime, status=status, seed=seed, starttime=starttime, endtime=starttime + runtime, origin=origin, additional_info=additional_info) - model_identifiers.append((seed, additional_info['num_run'], float(budget))) + model_identifiers.append((seed, additional_info['num_run'], float(current_budget))) else: if additional_info.get('exitcode') == -6: logger.error( @@ -192,4 +199,5 @@ def run_models_on_dataset( logger.warning("Not enough time to fit all machine learning models." 
"Please consider increasing the run time to further improve performance.") break + return run_history, model_identifiers diff --git a/test/test_evaluation/test_forecasting_evaluators.py b/test/test_evaluation/test_forecasting_evaluators.py index 66c026e9c..472d66501 100644 --- a/test/test_evaluation/test_forecasting_evaluators.py +++ b/test/test_evaluation/test_forecasting_evaluators.py @@ -61,7 +61,7 @@ def test_budget_type_choices(self, pipeline_mock): configuration=configuration, metric=mean_MASE_forecasting, budget=0, pipeline_options={'budget_type': budget_type, - budget_type: 0.1}, + budget_type: 0.1}, min_num_test_instances=100) self.assertTrue('epochs' not in evaluator.fit_dictionary) if budget_type == 'resolution': From 916d185a986e8dfb2398624e7a445d392001695d Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Tue, 16 Aug 2022 18:23:06 +0200 Subject: [PATCH 05/18] suggestions from review --- autoPyTorch/ensemble/abstract_ensemble.py | 2 +- autoPyTorch/utils/parallel_model_runner.py | 119 +++++++++++------- .../example_tabular_classification.py | 8 +- 3 files changed, 80 insertions(+), 49 deletions(-) diff --git a/autoPyTorch/ensemble/abstract_ensemble.py b/autoPyTorch/ensemble/abstract_ensemble.py index 9ad2060e0..0a3174ea9 100644 --- a/autoPyTorch/ensemble/abstract_ensemble.py +++ b/autoPyTorch/ensemble/abstract_ensemble.py @@ -85,7 +85,7 @@ def update_identifiers( self, replace_identifiers_mapping: Dict[Tuple[int, int, float], Tuple[int, int, float]] ) -> None: - identifiers = copy(self.identifiers_) + identifiers = self.identifiers_.copy() for i, identifier in enumerate(self.identifiers_): identifiers[i] = replace_identifiers_mapping.get(identifier, identifier) self.identifiers_ = identifiers diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py index 9ce577e82..c66bf8152 100644 --- a/autoPyTorch/utils/parallel_model_runner.py +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -74,12 +74,11 @@ def run_models_on_dataset( starttime = time.time() run_history = RunHistory() - memory_limit = memory_limit if memory_limit is not None: memory_limit = int(math.ceil(memory_limit)) model_identifiers: List[Optional[Tuple[int, int, float]]] = [] total_models = len(model_configs) - dask_futures = [] + dask_futures: List[dask.distributed.Future] = [] for n_r, (config, budget) in enumerate(model_configs): # Only launch a task if there is time @@ -148,48 +147,16 @@ def run_models_on_dataset( # We launch dask jobs only when there are resources available. # This allow us to control time allocation properly, and early terminate # the traditional machine learning pipeline - cls, future = dask_futures.pop(0) - status, cost, runtime, additional_info = future.result() - - if status == StatusType.SUCCESS: - logger.info( - "Fitting {} took {} [sec] and got performance: {}.\n" - "additional info:\n{}".format(cls, runtime, cost, dict_repr(additional_info)) - ) - origin: str = additional_info['configuration_origin'] - current_config: Union[str, dict] = additional_info['configuration'] - current_budget: float = additional_info['budget'] - - # indicates the finished model is part of autopytorch search space - if isinstance(current_config, dict): - configuration = Configuration(current_search_space, current_config) # type: ignore[misc] - else: - # we assume that it is a traditional model and `pipeline_configuration` - # specifies the configuration. 
- configuration = additional_info.pop('pipeline_configuration') - - run_history.add(config=configuration, cost=cost, - time=runtime, status=status, seed=seed, - starttime=starttime, endtime=starttime + runtime, - origin=origin, additional_info=additional_info) - model_identifiers.append((seed, additional_info['num_run'], float(current_budget))) - else: - if additional_info.get('exitcode') == -6: - logger.error( - "Traditional prediction for {} failed with run state {},\n" - "because the provided memory limits were too tight.\n" - "Please increase the 'ml_memory_limit' and try again.\n" - "If you still get the problem, please open an issue\n" - "and paste the additional info.\n" - "Additional info:\n{}".format(cls, str(status), dict_repr(additional_info)) - ) - else: - logger.error( - "Traditional prediction for {} failed with run state {}.\nAdditional info:\n{}".format( - cls, str(status), dict_repr(additional_info) - ) - ) - model_identifiers.append(None) + _process_result( + current_search_space=current_search_space, + dask_futures=dask_futures, + run_history=run_history, + model_identifiers=model_identifiers, + seed=seed, + starttime=starttime, + logger=logger) + + # In the case of a serial execution, calling submit halts the run for a resource # dynamically adjust time in this case time_left -= int(time.time() - start_time) @@ -201,3 +168,67 @@ def run_models_on_dataset( break return run_history, model_identifiers + + +def _process_result( + dask_futures: List[dask.distributed.Future], + current_search_space: ConfigurationSpace, + run_history: RunHistory, + model_identifiers: List[Tuple[int, int, float]], + seed: int, + starttime: float, + logger: PicklableClientLogger +) -> None: + """ + Update run_history and model_identifiers in-place using results of the + latest finishing model. + + Args: + dask_futures (List[dask.distributed.Future]): _description_ + run_history (RunHistory): _description_ + model_identifiers (List[Tuple[int, int, float]]): _description_ + seed (int): _description_ + starttime (float): _description_ + logger (PicklableClientLogger): _description_ + """ + cls, future = dask_futures.pop(0) + status, cost, runtime, additional_info = future.result() + if status == StatusType.SUCCESS: + logger.info( + "Fitting {} took {} [sec] and got performance: {}.\n" + "additional info:\n{}".format(cls, runtime, cost, dict_repr(additional_info)) + ) + origin: str = additional_info['configuration_origin'] + current_config: Union[str, dict] = additional_info['configuration'] + current_budget: float = additional_info['budget'] + + # indicates the finished model is part of autopytorch search space + if isinstance(current_config, dict): + configuration = Configuration(current_search_space, current_config) # type: ignore[misc] + else: + # we assume that it is a traditional model and `pipeline_configuration` + # specifies the configuration. 
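To follow the data flow here: the keys consumed by this branch are the ones `finish_up` attaches in `abstract_evaluator.py` earlier in this series (`configuration`, `budget`) plus the run metadata the evaluator already reported. A rough sketch of the two shapes being distinguished, with all values invented for illustration:

    # Shape of `additional_info` for a successful run (illustrative values only).
    # Search-space run: 'configuration' is a dict and is rebuilt as a Configuration
    # against `current_search_space`.
    additional_info_search_space_run = {
        'configuration': {'network_backbone:__choice__': 'ShapedMLPBackbone'},
        'configuration_origin': 'Random Search',   # becomes `origin` in run_history.add
        'budget': 5.0,
        'num_run': 12,
    }

    # Traditional learner: 'configuration' is just the learner name, and the fitted
    # pipeline's own configuration travels separately as 'pipeline_configuration'
    # (shown here as a plain dict standing in for the real Configuration object).
    additional_info_traditional_run = {
        'configuration': 'lgb',
        'pipeline_configuration': {'model_trainer:__choice__': 'lgb'},
        'configuration_origin': 'traditional',
        'budget': 5.0,
        'num_run': 13,
    }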
+ configuration = additional_info.pop('pipeline_configuration') + + run_history.add(config=configuration, cost=cost, + time=runtime, status=status, seed=seed, + starttime=starttime, endtime=starttime + runtime, + origin=origin, additional_info=additional_info) + model_identifiers.append((seed, additional_info['num_run'], float(current_budget))) + else: + model_identifiers.append(None) + if additional_info.get('exitcode') == -6: + logger.error( + "Traditional prediction for {} failed with run state {},\n" + "because the provided memory limits were too tight.\n" + "Please increase the 'ml_memory_limit' and try again.\n" + "If you still get the problem, please open an issue\n" + "and paste the additional info.\n" + "Additional info:\n{}".format(cls, str(status), dict_repr(additional_info)) + ) + else: + logger.error( + "Traditional prediction for {} failed with run state {}.\nAdditional info:\n{}".format( + cls, str(status), dict_repr(additional_info) + ) + ) diff --git a/examples/20_basics/example_tabular_classification.py b/examples/20_basics/example_tabular_classification.py index b22f92dda..7c7cd68f7 100644 --- a/examples/20_basics/example_tabular_classification.py +++ b/examples/20_basics/example_tabular_classification.py @@ -42,10 +42,10 @@ api = TabularClassificationTask( # To maintain logs of the run, you can uncomment the # Following lines - # temporary_directory='./tmp/autoPyTorch_example_tmp_01', - # output_directory='./tmp/autoPyTorch_example_out_01', - # delete_tmp_folder_after_terminate=False, - # delete_output_folder_after_terminate=False, + temporary_directory='./tmp/autoPyTorch_example_tmp_01', + output_directory='./tmp/autoPyTorch_example_out_01', + delete_tmp_folder_after_terminate=False, + delete_output_folder_after_terminate=False, seed=42, ) From c7111050bb37247f4076c85dfd36d2021fd67a71 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Tue, 16 Aug 2022 18:28:56 +0200 Subject: [PATCH 06/18] fix mypy and flake --- autoPyTorch/ensemble/abstract_ensemble.py | 1 - autoPyTorch/utils/parallel_model_runner.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/autoPyTorch/ensemble/abstract_ensemble.py b/autoPyTorch/ensemble/abstract_ensemble.py index 0a3174ea9..0f04fe38a 100644 --- a/autoPyTorch/ensemble/abstract_ensemble.py +++ b/autoPyTorch/ensemble/abstract_ensemble.py @@ -1,5 +1,4 @@ from abc import ABCMeta, abstractmethod -from copy import copy from typing import Any, Dict, List, Tuple, Union import numpy as np diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py index c66bf8152..98e51d45b 100644 --- a/autoPyTorch/utils/parallel_model_runner.py +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -156,7 +156,6 @@ def run_models_on_dataset( starttime=starttime, logger=logger) - # In the case of a serial execution, calling submit halts the run for a resource # dynamically adjust time in this case time_left -= int(time.time() - start_time) @@ -174,7 +173,7 @@ def _process_result( dask_futures: List[dask.distributed.Future], current_search_space: ConfigurationSpace, run_history: RunHistory, - model_identifiers: List[Tuple[int, int, float]], + model_identifiers: List[Optional[Tuple[int, int, float]]], seed: int, starttime: float, logger: PicklableClientLogger From f7de6109ded0e4451831cd3eb8da0c5d70795493 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Tue, 16 Aug 2022 18:44:49 +0200 Subject: [PATCH 07/18] suggestions from review --- autoPyTorch/api/base_task.py | 2 +- 
examples/20_basics/example_tabular_classification.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index c8b4cb8d0..ff30d7ba8 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -1460,7 +1460,7 @@ def refit( replace_old_identifiers_to_refit_identifiers[ list(self.models_.keys())[old_identifier_index]] = refit_identifier else: - self._logger.warning(f"Refit for {config} failed. Updating ensemble weights accordingly.") + self._logger.warning(f"Refit for {config} failed. Model fitted during search will be used instead.") old_identifier_index = None self.ensemble_.update_identifiers(replace_old_identifiers_to_refit_identifiers) diff --git a/examples/20_basics/example_tabular_classification.py b/examples/20_basics/example_tabular_classification.py index 7c7cd68f7..b22f92dda 100644 --- a/examples/20_basics/example_tabular_classification.py +++ b/examples/20_basics/example_tabular_classification.py @@ -42,10 +42,10 @@ api = TabularClassificationTask( # To maintain logs of the run, you can uncomment the # Following lines - temporary_directory='./tmp/autoPyTorch_example_tmp_01', - output_directory='./tmp/autoPyTorch_example_out_01', - delete_tmp_folder_after_terminate=False, - delete_output_folder_after_terminate=False, + # temporary_directory='./tmp/autoPyTorch_example_tmp_01', + # output_directory='./tmp/autoPyTorch_example_out_01', + # delete_tmp_folder_after_terminate=False, + # delete_output_folder_after_terminate=False, seed=42, ) From ecabb4d84d1e7276aea162d3c2dd15da73b63f76 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Tue, 16 Aug 2022 19:39:23 +0200 Subject: [PATCH 08/18] finish documentation --- autoPyTorch/api/base_task.py | 8 +- autoPyTorch/utils/parallel_model_runner.py | 142 +++++++++++++++------ 2 files changed, 106 insertions(+), 44 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index ff30d7ba8..6855e5e92 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -812,7 +812,7 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: model_configs = [(classifier, self.pipeline_options[self.pipeline_options['budget_type']]) for classifier in available_classifiers] - run_history, _ = run_models_on_dataset( + run_history = run_models_on_dataset( time_left=time_left, func_eval_time_limit_secs=func_eval_time_limit_secs, model_configs=model_configs, @@ -832,7 +832,7 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: multiprocessing_context=self._multiprocessing_context, n_jobs=self.n_jobs, current_search_space=self.search_space, - smac_initial_run=self._backend.get_next_num_run() + initial_num_run=self._backend.get_next_num_run() ) self._logger.debug("Run history traditional: {}".format(run_history)) @@ -1420,7 +1420,7 @@ def refit( model_configs.append((model.config, budget)) self._logger.debug(f"Refitting {model_configs}") - run_history, _ = run_models_on_dataset( + run_history = run_models_on_dataset( time_left=total_walltime_limit, func_eval_time_limit_secs=run_time_limit_secs, model_configs=model_configs, @@ -1440,7 +1440,7 @@ def refit( multiprocessing_context=self._multiprocessing_context, n_jobs=self.n_jobs, current_search_space=self.search_space, - smac_initial_run=self._backend.get_next_num_run() + initial_num_run=self._backend.get_next_num_run() ) replace_old_identifiers_to_refit_identifiers = {} diff --git 
a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py index 98e51d45b..00cb0e78e 100644 --- a/autoPyTorch/utils/parallel_model_runner.py +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -34,7 +34,7 @@ def run_models_on_dataset( multiprocessing_context: str, n_jobs: int, current_search_space: ConfigurationSpace, - smac_initial_run: int, + initial_num_run: int, include: Optional[Dict[str, List[str]]] = None, exclude: Optional[Dict[str, List[str]]] = None, search_space_updates: Optional[HyperparameterSearchSpaceUpdates] = None, @@ -42,41 +42,104 @@ def run_models_on_dataset( memory_limit: Optional[int] = None, disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, pipeline_options: Optional[Dict] = None, -) -> Tuple[RunHistory, List[Optional[Tuple[int, int, float]]]]: +) -> RunHistory: """ Runs models specified by `model_configs` on dask parallel infrastructure. Args: - time_left (int): _description_ - func_eval_time_limit_secs (int): _description_ - model_configs (List[Tuple[str, Configuration]]): _description_ - logger (PicklableClientLogger): _description_ - metric (autoPyTorchMetric): _description_ - dask_client (dask.distributed.Client): _description_ - backend (Backend): _description_ - memory_limit (int): _description_ - disable_file_output (_type_): _description_ - all_supported_metrics (bool): _description_ - pipeline_options (_type_): _description_ - seed (int): _description_ - multiprocessing_context (str): _description_ - n_jobs (int): _description_ - current_search_space (ConfigurationSpace): _description_ - smac_initial_run (int): _description_ - include (Optional[Dict[str, List[str]]], optional): _description_. Defaults to None. - exclude (Optional[Dict[str, List[str]]], optional): _description_. Defaults to None. - search_space_updates (Optional[HyperparameterSearchSpaceUpdates], optional): _description_. Defaults to None. - logger_port (Optional[int], optional): _description_. Defaults to logging.handlers.DEFAULT_TCP_LOGGING_PORT. + time_left (int): + Time limit in seconds for the search of appropriate models. + By increasing this value, autopytorch has a higher + chance of finding better models. + func_eval_time_limit_secs (int): + Time limit for a single call to the machine learning model. + Model fitting will be terminated if the machine + learning algorithm runs over the time limit. Set + this value high enough so that typical machine + learning algorithms can be fit on the training + data. + Set to np.inf in case no time limit is desired. + model_configs (List[Tuple[str, Configuration]]): + List containing the configuration and the budget for the model to be evaluated. + logger (PicklableClientLogger): + Logger + metric (autoPyTorchMetric): + autoPyTorchMetric to be used for evaluation. + dask_client (dask.distributed.Client): + dask client where the function evaluation jobs are submitted. + backend (Backend): + Current backend object where the data is stored. The backend + is used to interact with the disk. + all_supported_metrics (bool): + If True, all metrics supporting current task will be calculated + for each pipeline. + seed (int): + Seed to be used for reproducibility. + multiprocessing_context (str): + context used for spawning child processes. + n_jobs (int): + Number of consecutive processes to spawn. + current_search_space (ConfigurationSpace): + The search space of the neural networks which will be used to instantiate Configuration objects. 
+ initial_num_run (int): + Initial num run for running the models. + include (Optional[Dict[str, List[str]]]): + Dictionary containing components to include. Key is the node + name and Value is an Iterable of the names of the components + to include. Only these components will be present in the + search space. Defaults to None. + exclude (Optional[Dict[str, List[str]]]): + Dictionary containing components to exclude. Key is the node + name and Value is an Iterable of the names of the components + to exclude. All except these components will be present in + the search space. Defaults to None. + search_space_updates (Optional[HyperparameterSearchSpaceUpdates]): + Search space updates that can be used to modify the search + space of particular components or choice modules of the pipeline. + Defaults to None. + logger_port (Optional[int]): + Port used to create the logging server. Defaults to logging.handlers.DEFAULT_TCP_LOGGING_PORT. + memory_limit (Optional[int]): + Memory limit in MB for the machine learning algorithm. + Autopytorch will stop fitting the machine learning algorithm + if it tries to allocate more than memory_limit MB. If None + is provided, no memory limit is set. In case of multi-processing, + memory_limit will be per job. This memory limit also applies to + the ensemble creation process. Defaults to None. + disable_file_output (Optional[List[Union[str, DisableFileOutputParameters]]]): + Used as a list to pass more fine-grained + information on what to save. Must be a member of `DisableFileOutputParameters`. + Allowed elements in the list are: + + + `y_optimization`: + do not save the predictions for the optimization set, + which would later on be used to build an ensemble. Note that SMAC + optimizes a metric evaluated on the optimization set. + + `pipeline`: + do not save any individual pipeline files + + `pipelines`: + In case of cross validation, disables saving the joint model of the + pipelines fit on each fold. + + `y_test`: + do not save the predictions for the test set. + + `all`: + do not save any of the above. + For more information check `autoPyTorch.evaluation.utils.DisableFileOutputParameters`. + Defaults to None. + pipeline_options (Optional[Dict]): + Valid config options include "device", + "torch_num_threads", "early_stopping", "use_tensorboard_logger", + "metrics_during_training". 
Returns: - Union[RunHistory, List[Tuple[int, int, float]]]: _description_ + RunHistory: + run_history: + Run History of training all the models in model_configs """ - starttime = time.time() run_history = RunHistory() if memory_limit is not None: memory_limit = int(math.ceil(memory_limit)) - model_identifiers: List[Optional[Tuple[int, int, float]]] = [] total_models = len(model_configs) dask_futures: List[dask.distributed.Future] = [] for n_r, (config, budget) in enumerate(model_configs): @@ -94,9 +157,9 @@ def run_models_on_dataset( if isinstance(config, Configuration): config.config_id = n_r - init_num_run = smac_initial_run + init_num_run = initial_num_run else: - init_num_run = smac_initial_run + n_r + init_num_run = initial_num_run + n_r ta = ExecuteTaFuncWithQueue( pynisher_context=multiprocessing_context, @@ -151,7 +214,6 @@ def run_models_on_dataset( current_search_space=current_search_space, dask_futures=dask_futures, run_history=run_history, - model_identifiers=model_identifiers, seed=seed, starttime=starttime, logger=logger) @@ -166,29 +228,32 @@ def run_models_on_dataset( "Please consider increasing the run time to further improve performance.") break - return run_history, model_identifiers + return run_history def _process_result( dask_futures: List[dask.distributed.Future], current_search_space: ConfigurationSpace, run_history: RunHistory, - model_identifiers: List[Optional[Tuple[int, int, float]]], seed: int, starttime: float, logger: PicklableClientLogger ) -> None: """ - Update run_history and model_identifiers in-place using results of the + Update run_history in-place using results of the latest finishing model. Args: - dask_futures (List[dask.distributed.Future]): _description_ - run_history (RunHistory): _description_ - model_identifiers (List[Tuple[int, int, float]]): _description_ - seed (int): _description_ - starttime (float): _description_ - logger (PicklableClientLogger): _description_ + dask_futures (List[dask.distributed.Future]): + List of dask futures which are used to get the results of a finished run. + run_history (RunHistory): + RunHistory object to be appended with the finished run + seed (int): + Seed used for reproducibility. + starttime (float): + starttime of the runs. + logger (PicklableClientLogger): + Logger. 
""" cls, future = dask_futures.pop(0) status, cost, runtime, additional_info = future.result() @@ -199,7 +264,6 @@ def _process_result( ) origin: str = additional_info['configuration_origin'] current_config: Union[str, dict] = additional_info['configuration'] - current_budget: float = additional_info['budget'] # indicates the finished model is part of autopytorch search space if isinstance(current_config, dict): @@ -213,9 +277,7 @@ def _process_result( time=runtime, status=status, seed=seed, starttime=starttime, endtime=starttime + runtime, origin=origin, additional_info=additional_info) - model_identifiers.append((seed, additional_info['num_run'], float(current_budget))) else: - model_identifiers.append(None) if additional_info.get('exitcode') == -6: logger.error( "Traditional prediction for {} failed with run state {},\n" From 5071ac6870482ae7a66ef7c81226fb648e8a245a Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Wed, 17 Aug 2022 15:45:07 +0200 Subject: [PATCH 09/18] fix tests --- autoPyTorch/api/base_task.py | 5 +++++ test/test_api/utils.py | 5 +++++ test/test_evaluation/test_evaluators.py | 4 ++++ test/test_evaluation/test_forecasting_evaluators.py | 5 ++++- test/test_utils/test_parallel_model_runner.py | 4 ++++ 5 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 test/test_utils/test_parallel_model_runner.py diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index 6855e5e92..0e2a8e727 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -1372,6 +1372,11 @@ def refit( self._logger.debug("Starting refit") + if self.n_jobs == 1: + self._dask_client = SingleThreadedClient() + else: + self._create_dask_client() + dataset_requirements = get_dataset_requirements( info=dataset.get_required_dataset_info(), include=self.include_components, diff --git a/test/test_api/utils.py b/test/test_api/utils.py index 22e8c2a63..701c455c1 100644 --- a/test/test_api/utils.py +++ b/test/test_api/utils.py @@ -63,6 +63,11 @@ def _fit_and_predict(self, pipeline, fold: int, train_indices, test_indices=test_indices, ) + # the configuration is used in refit where + # pipeline.config is used to retrieve the + # original configuration. 
+ pipeline.config = self.configuration + if add_pipeline_to_self: self.pipeline = pipeline else: diff --git a/test/test_evaluation/test_evaluators.py b/test/test_evaluation/test_evaluators.py index 7261a81fa..0f0f15cdc 100644 --- a/test/test_evaluation/test_evaluators.py +++ b/test/test_evaluation/test_evaluators.py @@ -97,6 +97,7 @@ def test_holdout(self, pipeline_mock): pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() @@ -136,6 +137,7 @@ def test_cv(self, pipeline_mock): pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() @@ -278,6 +280,7 @@ def test_additional_metrics_during_training(self, pipeline_mock): D = get_binary_classification_datamanager() configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() @@ -339,6 +342,7 @@ def test_no_resampling(self, pipeline_mock): pipeline_mock.get_default_pipeline_options.return_value = {'budget_type': 'epochs', 'epochs': 10} configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, 'autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() diff --git a/test/test_evaluation/test_forecasting_evaluators.py b/test/test_evaluation/test_forecasting_evaluators.py index 472d66501..5eea055df 100644 --- a/test/test_evaluation/test_forecasting_evaluators.py +++ b/test/test_evaluation/test_forecasting_evaluators.py @@ -85,6 +85,7 @@ def test_holdout(self, pipeline_mock): pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() @@ -140,6 +141,7 @@ def test_cv(self, pipeline_mock): pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() @@ -148,7 +150,7 @@ def test_cv(self, pipeline_mock): queue_, configuration=configuration, metric=mean_MASE_forecasting, budget=0, - pipeline_optionss={'budget_type': 'epochs', 'epochs': 50}) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -189,6 +191,7 @@ def test_proxy_val_set(self, pipeline_mock): pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D 
queue_ = multiprocessing.Queue() diff --git a/test/test_utils/test_parallel_model_runner.py b/test/test_utils/test_parallel_model_runner.py new file mode 100644 index 000000000..92e875780 --- /dev/null +++ b/test/test_utils/test_parallel_model_runner.py @@ -0,0 +1,4 @@ +import pytest + +def test_run_models_on_dataset(backend, ): + pass \ No newline at end of file From 56f4a73a85ff5a0f1cdf34aa9b5a9e81bc34e876 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Wed, 17 Aug 2022 16:46:21 +0200 Subject: [PATCH 10/18] add test for parallel model runner --- autoPyTorch/utils/parallel_model_runner.py | 10 +-- test/test_utils/test_parallel_model_runner.py | 68 ++++++++++++++++++- 2 files changed, 71 insertions(+), 7 deletions(-) diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py index 00cb0e78e..0216017c3 100644 --- a/autoPyTorch/utils/parallel_model_runner.py +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -29,12 +29,12 @@ def run_models_on_dataset( metric: autoPyTorchMetric, dask_client: dask.distributed.Client, backend: Backend, - all_supported_metrics: bool, seed: int, multiprocessing_context: str, - n_jobs: int, current_search_space: ConfigurationSpace, - initial_num_run: int, + n_jobs: int = 1, + initial_num_run: int = 1, + all_supported_metrics: bool = False, include: Optional[Dict[str, List[str]]] = None, exclude: Optional[Dict[str, List[str]]] = None, search_space_updates: Optional[HyperparameterSearchSpaceUpdates] = None, @@ -280,7 +280,7 @@ def _process_result( else: if additional_info.get('exitcode') == -6: logger.error( - "Traditional prediction for {} failed with run state {},\n" + "Prediction for {} failed with run state {},\n" "because the provided memory limits were too tight.\n" "Please increase the 'ml_memory_limit' and try again.\n" "If you still get the problem, please open an issue\n" @@ -289,7 +289,7 @@ def _process_result( ) else: logger.error( - "Traditional prediction for {} failed with run state {}.\nAdditional info:\n{}".format( + "Prediction for {} failed with run state {}.\nAdditional info:\n{}".format( cls, str(status), dict_repr(additional_info) ) ) diff --git a/test/test_utils/test_parallel_model_runner.py b/test/test_utils/test_parallel_model_runner.py index 92e875780..e554805af 100644 --- a/test/test_utils/test_parallel_model_runner.py +++ b/test/test_utils/test_parallel_model_runner.py @@ -1,4 +1,68 @@ import pytest +import unittest.mock -def test_run_models_on_dataset(backend, ): - pass \ No newline at end of file +from ConfigSpace import Configuration + +from smac.tae import StatusType + +from autoPyTorch.pipeline.components.training.metrics.utils import get_metrics +from autoPyTorch.utils.single_thread_client import SingleThreadedClient +from autoPyTorch.utils.logging_ import PicklableClientLogger +from autoPyTorch.utils.parallel_model_runner import run_models_on_dataset +from autoPyTorch.utils.pipeline import get_configuration_space, get_dataset_requirements + +from test.test_evaluation.evaluation_util import get_binary_classification_datamanager +from test.test_api.utils import dummy_eval_train_function + + +@unittest.mock.patch('autoPyTorch.evaluation.tae.eval_train_function', + new=dummy_eval_train_function) +def test_run_models_on_dataset(backend): + dataset = get_binary_classification_datamanager() + backend.save_datamanager(dataset) + # Search for a good configuration + dataset_requirements = get_dataset_requirements( + info=dataset.get_required_dataset_info() + ) + dataset_properties = 
dataset.get_dataset_properties(dataset_requirements) + search_space = get_configuration_space(info=dataset_properties) + num_random_configs = 5 + model_configurations = [(search_space.sample_configuration(), 1) for _ in range(num_random_configs)] + # Add a traditional model + model_configurations.append(('lgb', 1)) + + metric = get_metrics(dataset_properties=dataset_properties, + names=["accuracy"], + all_supported_metrics=False).pop() + logger = unittest.mock.Mock(spec=PicklableClientLogger) + + dask_client = SingleThreadedClient() + + runhistory = run_models_on_dataset( + time_left=15, + func_eval_time_limit_secs=5, + model_configs=model_configurations, + logger=logger, + metric=metric, + dask_client=dask_client, + backend=backend, + seed=1, + multiprocessing_context="fork", + current_search_space=search_space, + ) + + has_successful_model = False + has_matching_config = False + # assert atleast 1 successfully fitted model + for run_key, run_value in runhistory.data.items(): + if run_value.status == StatusType.SUCCESS: + has_successful_model = True + configuration = run_value.additional_info['configuration'] + for (config, _) in model_configurations: + if isinstance(config, Configuration): + config = config.get_dictionary() + if config == configuration: + has_matching_config = True + + assert has_successful_model, "Atleast 1 model should be successfully trained" + assert has_matching_config, "Configurations should match with the passed model configurations" From 52a8f422dfbd06b737d553380b152560a6e9c58b Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Wed, 17 Aug 2022 16:52:45 +0200 Subject: [PATCH 11/18] fix flake --- examples/20_basics/example_tabular_classification.py | 2 +- test/test_utils/test_parallel_model_runner.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/examples/20_basics/example_tabular_classification.py b/examples/20_basics/example_tabular_classification.py index b22f92dda..291e017ac 100644 --- a/examples/20_basics/example_tabular_classification.py +++ b/examples/20_basics/example_tabular_classification.py @@ -10,7 +10,7 @@ import tempfile as tmp import warnings -from autoPyTorch.datasets.resampling_strategy import CrossValTypes, NoResamplingStrategyTypes +from autoPyTorch.datasets.resampling_strategy import CrossValTypes os.environ['JOBLIB_TEMP_FOLDER'] = tmp.gettempdir() os.environ['OMP_NUM_THREADS'] = '1' diff --git a/test/test_utils/test_parallel_model_runner.py b/test/test_utils/test_parallel_model_runner.py index e554805af..a0a163f6e 100644 --- a/test/test_utils/test_parallel_model_runner.py +++ b/test/test_utils/test_parallel_model_runner.py @@ -1,18 +1,16 @@ -import pytest import unittest.mock +from test.test_api.utils import dummy_eval_train_function +from test.test_evaluation.evaluation_util import get_binary_classification_datamanager from ConfigSpace import Configuration from smac.tae import StatusType from autoPyTorch.pipeline.components.training.metrics.utils import get_metrics -from autoPyTorch.utils.single_thread_client import SingleThreadedClient from autoPyTorch.utils.logging_ import PicklableClientLogger from autoPyTorch.utils.parallel_model_runner import run_models_on_dataset from autoPyTorch.utils.pipeline import get_configuration_space, get_dataset_requirements - -from test.test_evaluation.evaluation_util import get_binary_classification_datamanager -from test.test_api.utils import dummy_eval_train_function +from autoPyTorch.utils.single_thread_client import SingleThreadedClient 
@unittest.mock.patch('autoPyTorch.evaluation.tae.eval_train_function', @@ -32,8 +30,8 @@ def test_run_models_on_dataset(backend): model_configurations.append(('lgb', 1)) metric = get_metrics(dataset_properties=dataset_properties, - names=["accuracy"], - all_supported_metrics=False).pop() + names=["accuracy"], + all_supported_metrics=False).pop() logger = unittest.mock.Mock(spec=PicklableClientLogger) dask_client = SingleThreadedClient() From dec24357a79a3de5e3abae53cd9e9ac93cd22969 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Thu, 18 Aug 2022 00:44:37 +0200 Subject: [PATCH 12/18] fix tests --- autoPyTorch/api/base_task.py | 130 ++++++++++++++---- .../example_tabular_classification.py | 8 +- test/test_api/test_api.py | 73 ++++++++-- 3 files changed, 172 insertions(+), 39 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index 0e2a8e727..ca68333f9 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -636,7 +636,6 @@ def _close_dask_client(self) -> None: def _load_models( self, - resampling_strategy: Optional[Union[CrossValTypes, HoldoutValTypes, NoResamplingStrategyTypes]] = None ) -> bool: """ @@ -646,9 +645,9 @@ def _load_models( Returns: None """ - resampling_strategy = resampling_strategy if resampling_strategy is not None else self.resampling_strategy - if resampling_strategy is None: + if self.resampling_strategy is None: raise ValueError("Resampling strategy is needed to determine what models to load") + self.ensemble_ = self._backend.load_ensemble(self.seed) # If no ensemble is loaded, try to get the best performing model @@ -658,10 +657,10 @@ def _load_models( if self.ensemble_: identifiers = self.ensemble_.get_selected_model_identifiers() self.models_ = self._backend.load_models_by_identifiers(identifiers) - if isinstance(resampling_strategy, CrossValTypes): + if isinstance(self.resampling_strategy, CrossValTypes): self.cv_models_ = self._backend.load_cv_models_by_identifiers(identifiers) - if isinstance(resampling_strategy, CrossValTypes): + if isinstance(self.resampling_strategy, CrossValTypes): if len(self.cv_models_) == 0: raise ValueError('No models fitted!') @@ -1312,8 +1311,7 @@ def refit( X_test: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, y_test: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, dataset_name: Optional[str] = None, - resampling_strategy: Union[ - HoldoutValTypes, CrossValTypes, NoResamplingStrategyTypes] = NoResamplingStrategyTypes.no_resampling, + resampling_strategy: ResamplingStrategies = NoResamplingStrategyTypes.no_resampling, resampling_strategy_args: Optional[Dict[str, Any]] = None, total_walltime_limit: int = 120, run_time_limit_secs: int = 60, @@ -1326,26 +1324,93 @@ def refit( disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, ) -> "BaseTask": """ - Refit all models found with fit to new data. - - Necessary when using cross-validation. During training, autoPyTorch - fits each model k times on the dataset, but does not keep any trained - model and can therefore not be used to predict for new data points. - This methods fits all models found during a call to fit on the data - given. This method may also be used together with holdout to avoid - only using 66% of the training data to fit the final model. + Fit all the models found in the ensemble on the whole training set X_train. + Therefore, we recommend using `NoResamplingStrategy` to be able to do that. 
Nevertheless, it + is still able to fit using other splitting techniques such as hold out or cross validation. Refit uses the estimator pipeline_options attribute, which the user can interact via the get_pipeline_options()/set_pipeline_options() methods. Args: - dataset (Dataset): - The argument that will provide the dataset splits. It can either - be a dictionary with the splits, or the dataset object which can - generate the splits based on different restrictions. - split_id (int): - split id to fit on. + dataset (BaseDataset): + An object of the appropriate child class of `BaseDataset`, + that will be used to fit the pipeline + X_train, y_train, X_test, y_test: Union[np.ndarray, List, pd.DataFrame] + A pair of features (X_train) and targets (y_train) used to fit a + pipeline. Additionally, a holdout of this pairs (X_test, y_test) can + be provided to track the generalization performance of each stage. + dataset_name (Optional[str]): + Name of the dataset, if None, random value is used. + resampling_strategy (ResamplingStrategies): + Strategy to split the training data. Defaults to + NoResamplingStrategyTypes.no_resampling. + resampling_strategy_args (Optional[Dict[str, Any]]): + Arguments required for the chosen resampling strategy. If None, uses + the default values provided in DEFAULT_RESAMPLING_PARAMETERS + in ```datasets/resampling_strategy.py```. + dataset_name (Optional[str]): + name of the dataset, used as experiment name. + total_walltime_limit (int): + Total time that can be used by all the models to be refitted. Defaults to 120. + run_time_limit_secs (int: default=60): + Time limit for a single call to the machine learning model. + Model fitting will be terminated if the machine learning algorithm + runs over the time limit. Set this value high enough so that + typical machine learning algorithms can be fit on the training + data. + memory_limit (Optional[int]): + Memory limit in MB for the machine learning algorithm. autopytorch + will stop fitting the machine learning algorithm if it tries + to allocate more than memory_limit MB. If None is provided, + no memory limit is set. In case of multi-processing, memory_limit + will be per job. This memory limit also applies to the ensemble + creation process. + eval_metric (Optional[str]): + Name of the metric that is used to evaluate a pipeline. + all_supported_metrics (bool: default=True): + if True, all metrics supporting current task will be calculated + for each pipeline and results will be available via cv_results + budget_type (str): + Type of budget to be used when fitting the pipeline. + It can be one of: + + + `epochs`: The training of each pipeline will be terminated after + a number of epochs have passed. This number of epochs is determined by the + budget argument of this method. + + `runtime`: The training of each pipeline will be terminated after + a number of seconds have passed. This number of seconds is determined by the + budget argument of this method. The overall fitting time of a pipeline is + controlled by func_eval_time_limit_secs. 'runtime' only controls the allocated + time to train a pipeline, but it does not consider the overall time it takes + to create a pipeline (data loading and preprocessing, other i/o operations, etc.). + budget (Optional[float]): + Budget to fit a single run of the pipeline. 
If not + provided, uses the default in the pipeline config + pipeline_options (Optional[Dict]): + Valid config options include "device", + "torch_num_threads", "early_stopping", "use_tensorboard_logger", + "metrics_during_training" + disable_file_output (Optional[List[Union[str, DisableFileOutputParameters]]]): + Used as a list to pass more fine-grained + information on what to save. Must be a member of `DisableFileOutputParameters`. + Allowed elements in the list are: + + + `y_optimization`: + do not save the predictions for the optimization set, + which would later on be used to build an ensemble. Note that SMAC + optimizes a metric evaluated on the optimization set. + + `pipeline`: + do not save any individual pipeline files + + `pipelines`: + In case of cross validation, disables saving the joint model of the + pipelines fit on each fold. + + `y_test`: + do not save the predictions for the test set. + + `all`: + do not save any of the above. + For more information check `autoPyTorch.evaluation.utils.DisableFileOutputParameters`. + Returns: self """ @@ -1367,6 +1432,9 @@ def refit( self.dataset_name = dataset.dataset_name + # Used when loading models + self.resampling_strategy = resampling_strategy + if self._logger is None: self._logger = self._get_logger(f"RefitLogger-{self.dataset_name}") @@ -1383,6 +1451,7 @@ def refit( exclude=self.exclude_components, search_space_updates=self.search_space_updates) dataset_properties = dataset.get_dataset_properties(dataset_requirements) + self._backend.save_datamanager(dataset) scenario_mock = unittest.mock.Mock() @@ -1447,13 +1516,13 @@ def refit( current_search_space=self.search_space, initial_num_run=self._backend.get_next_num_run() ) + replace_old_identifiers_to_refit_identifiers = {} self._logger.debug("Finished refit training") old_identifier_index = None - for run_key, run_value in run_history.data.items(): + for _, run_value in run_history.data.items(): config = run_value.additional_info['configuration'] - refit_identifier = (self.seed, run_value.additional_info['num_run'], run_key.budget) for i, (configuration, _) in enumerate(model_configs): if isinstance(configuration, Configuration): configuration = configuration.get_dictionary() @@ -1462,14 +1531,21 @@ def refit( old_identifier_index = i break if old_identifier_index is not None: - replace_old_identifiers_to_refit_identifiers[ - list(self.models_.keys())[old_identifier_index]] = refit_identifier + old_identifier = list(self.models_.keys())[old_identifier_index] + refit_identifier = (self.seed, run_value.additional_info['num_run'], old_identifier[2]) + replace_old_identifiers_to_refit_identifiers[old_identifier] = refit_identifier else: - self._logger.warning(f"Refit for {config} failed. Model fitted during search will be used instead.") + warnings.warn(f"Refit for {config} failed. 
Model fitted during search will be used instead.") old_identifier_index = None self.ensemble_.update_identifiers(replace_old_identifiers_to_refit_identifiers) - self._load_models(resampling_strategy=resampling_strategy) + self.run_history.update(run_history, DataOrigin.EXTERNAL_SAME_INSTANCES) + run_history.save_json(os.path.join(self._backend.internals_directory, 'refit_run_history.json'), + save_external=True) + # store ensemble for later use, with large iteration + self._backend.save_ensemble(self.ensemble_, 10**8, self.seed) + + self._load_models() self._clean_logger() return self diff --git a/examples/20_basics/example_tabular_classification.py b/examples/20_basics/example_tabular_classification.py index 291e017ac..74a37c94b 100644 --- a/examples/20_basics/example_tabular_classification.py +++ b/examples/20_basics/example_tabular_classification.py @@ -42,10 +42,10 @@ api = TabularClassificationTask( # To maintain logs of the run, you can uncomment the # Following lines - # temporary_directory='./tmp/autoPyTorch_example_tmp_01', - # output_directory='./tmp/autoPyTorch_example_out_01', - # delete_tmp_folder_after_terminate=False, - # delete_output_folder_after_terminate=False, + temporary_directory='./tmp/autoPyTorch_example_tmp_01', + output_directory='./tmp/autoPyTorch_example_out_01', + delete_tmp_folder_after_terminate=False, + delete_output_folder_after_terminate=False, seed=42, ) diff --git a/test/test_api/test_api.py b/test/test_api/test_api.py index 465d74c6b..157496138 100644 --- a/test/test_api/test_api.py +++ b/test/test_api/test_api.py @@ -46,6 +46,43 @@ HOLDOUT_NUM_SPLITS = 1 +def refit_test_estimator( + estimator, + X_train, + y_train, + X_test, + y_test, +): + estimator.refit( + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test + ) + + # Check if the refit models are actually used in the new ensemble. 
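The identifier bookkeeping in the refit() hunk above maps each searched model's (seed, num_run, budget) key to a new key that keeps the seed and budget but points at the refitted num_run. A minimal, self-contained sketch of that mapping is below; build_identifier_mapping and its inputs are illustrative names, not part of the codebase.

from typing import Dict, List, Tuple

Identifier = Tuple[int, int, float]  # (seed, num_run, budget)


def build_identifier_mapping(old_identifiers: List[Identifier],
                             refit_num_runs: List[int]) -> Dict[Identifier, Identifier]:
    # Keep the seed and budget of the searched model, swap in the refitted num_run,
    # mirroring the shape of replace_old_identifiers_to_refit_identifiers above.
    mapping: Dict[Identifier, Identifier] = {}
    for old_id, new_num_run in zip(old_identifiers, refit_num_runs):
        seed, _, budget = old_id
        mapping[old_id] = (seed, new_num_run, budget)
    return mapping


# e.g. the model that ran as num_run 5 during search and was refitted as num_run 12:
assert build_identifier_mapping([(42, 5, 50.0)], [12]) == {(42, 5, 50.0): (42, 12, 50.0)}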
+ refit_ensemble_identifiers = estimator.ensemble_.get_selected_model_identifiers() + refit_run_history_path = os.path.join(estimator._backend.internals_directory, 'refit_run_history.json') + assert os.path.exists(refit_run_history_path) + + refit_run_history: RunHistory = RunHistory() + refit_run_history.update_from_json(refit_run_history_path, estimator.search_space) + + all_refit_runs_in_new_ensemble = [] + model_num_runs = [] + for run_key, run_value in refit_run_history.data.items(): + any_refit_run_in_new_ensemble = False + num_run = run_value.additional_info["num_run"] + model_num_runs.append(num_run) + for identifier in refit_ensemble_identifiers: + if num_run == identifier[1]: + any_refit_run_in_new_ensemble = True + break + all_refit_runs_in_new_ensemble.append(any_refit_run_in_new_ensemble) + + assert all(all_refit_runs_in_new_ensemble), "All successful runs in the refit should be a part of the new ensemble" + + # Test # ==== @unittest.mock.patch('autoPyTorch.evaluation.tae.eval_train_function', @@ -186,8 +223,9 @@ def test_tabular_classification(openml_id, resampling_strategy, backend, resampl # Ensemble Builder produced an ensemble estimator.ensemble_ is not None + ensemble_identifiers = estimator.ensemble_.identifiers_ # There should be a weight for each element of the ensemble - assert len(estimator.ensemble_.identifiers_) == len(estimator.ensemble_.weights_) + assert len(ensemble_identifiers) == len(estimator.ensemble_.weights_) y_pred = estimator.predict(X_test) assert np.shape(y_pred)[0] == np.shape(X_test)[0] @@ -207,6 +245,15 @@ def test_tabular_classification(openml_id, resampling_strategy, backend, resampl successful_num_run) assert 'train_loss' in incumbent_results + # Test refit on dummy data + refit_test_estimator( + estimator=estimator, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test + ) + # Check that we can pickle dump_file = os.path.join(estimator._backend.temporary_directory, 'dump.pkl') @@ -217,9 +264,6 @@ def test_tabular_classification(openml_id, resampling_strategy, backend, resampl restored_estimator = pickle.load(f) restored_estimator.predict(X_test) - # Test refit on dummy data - estimator.refit(dataset=backend.load_datamanager()) - # Make sure that a configuration space is stored in the estimator assert isinstance(estimator.get_search_space(), CS.ConfigurationSpace) @@ -387,6 +431,15 @@ def test_tabular_regression(openml_name, resampling_strategy, backend, resamplin successful_num_run) assert 'train_loss' in incumbent_results, estimator.run_history.data + # Test refit on dummy data + refit_test_estimator( + estimator=estimator, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test + ) + # Check that we can pickle dump_file = os.path.join(estimator._backend.temporary_directory, 'dump.pkl') @@ -397,9 +450,6 @@ def test_tabular_regression(openml_name, resampling_strategy, backend, resamplin restored_estimator = pickle.load(f) restored_estimator.predict(X_test) - # Test refit on dummy data - estimator.refit(dataset=backend.load_datamanager()) - # Make sure that a configuration space is stored in the estimator assert isinstance(estimator.get_search_space(), CS.ConfigurationSpace) @@ -580,7 +630,14 @@ def test_time_series_forecasting(forecasting_toy_dataset, resampling_strategy, b assert np.shape(y_pred) == np.shape(y_test) # Test refit on dummy data - estimator.refit(dataset=backend.load_datamanager()) + refit_test_estimator( + estimator=estimator, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test 
+ ) + # Make sure that a configuration space is stored in the estimator assert isinstance(estimator.get_search_space(), CS.ConfigurationSpace) From 2e4f0e8a2eac5bb22f7f1c92c211bb7edea0310a Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Fri, 19 Aug 2022 11:33:19 +0200 Subject: [PATCH 13/18] fix traditional prediction for refit --- autoPyTorch/api/base_task.py | 7 ++- autoPyTorch/evaluation/abstract_evaluator.py | 3 +- .../setup/traditional_ml/base_model.py | 11 +++-- .../base_traditional_learner.py | 27 ++++++----- .../traditional_learner/learners.py | 46 ++++++++++++------- .../20_basics/example_tabular_regression.py | 38 ++++++++++++--- 6 files changed, 89 insertions(+), 43 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index ca68333f9..1f36c009b 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -1435,8 +1435,7 @@ def refit( # Used when loading models self.resampling_strategy = resampling_strategy - if self._logger is None: - self._logger = self._get_logger(f"RefitLogger-{self.dataset_name}") + self._logger = self._get_logger("RefitLogger") self._logger.debug("Starting refit") @@ -1542,6 +1541,7 @@ def refit( self.run_history.update(run_history, DataOrigin.EXTERNAL_SAME_INSTANCES) run_history.save_json(os.path.join(self._backend.internals_directory, 'refit_run_history.json'), save_external=True) + # store ensemble for later use, with large iteration self._backend.save_ensemble(self.ensemble_, 10**8, self.seed) @@ -1848,8 +1848,7 @@ def predict( # Parallelize predictions across models with n_jobs processes. # Each process computes predictions in chunks of batch_size rows. - if self._logger is None: - self._logger = self._get_logger("Predict-Logger") + self._logger = self._get_logger("Predict-Logger") if self.ensemble_ is None and not self._load_models(): raise ValueError("No ensemble found. 
Either fit has not yet " diff --git a/autoPyTorch/evaluation/abstract_evaluator.py b/autoPyTorch/evaluation/abstract_evaluator.py index 69fb090f5..069228726 100644 --- a/autoPyTorch/evaluation/abstract_evaluator.py +++ b/autoPyTorch/evaluation/abstract_evaluator.py @@ -195,7 +195,8 @@ def get_additional_run_info(self) -> Dict[str, Any]: Can be found in autoPyTorch/pipeline/components/setup/traditional_ml/estimator_configs """ return {'pipeline_configuration': self.configuration, - 'trainer_configuration': self.pipeline.named_steps['model_trainer'].choice.model.get_config()} + 'trainer_configuration': self.pipeline.named_steps['model_trainer'].choice.model.get_config(), + 'configuration_origin': 'traditional'} def get_pipeline_representation(self) -> Dict[str, str]: return self.pipeline.get_pipeline_representation() diff --git a/autoPyTorch/pipeline/components/setup/traditional_ml/base_model.py b/autoPyTorch/pipeline/components/setup/traditional_ml/base_model.py index 7d26c5481..8b4723066 100644 --- a/autoPyTorch/pipeline/components/setup/traditional_ml/base_model.py +++ b/autoPyTorch/pipeline/components/setup/traditional_ml/base_model.py @@ -52,8 +52,7 @@ def __init__( self.add_fit_requirements([ FitRequirement('X_train', (np.ndarray, list, pd.DataFrame), user_defined=False, dataset_property=False), FitRequirement('y_train', (np.ndarray, list, pd.Series,), user_defined=False, dataset_property=False), - FitRequirement('train_indices', (np.ndarray, list), user_defined=False, dataset_property=False), - FitRequirement('val_indices', (np.ndarray, list), user_defined=False, dataset_property=False)]) + FitRequirement('train_indices', (np.ndarray, list), user_defined=False, dataset_property=False)]) def fit(self, X: Dict[str, Any], y: Any = None) -> autoPyTorchSetupComponent: """ @@ -90,8 +89,14 @@ def fit(self, X: Dict[str, Any], y: Any = None) -> autoPyTorchSetupComponent: # train model blockPrint() + val_indices = X.get('val_indices', None) + X_val = None + y_val = None + if val_indices is not None: + X_val = X['X_train'][val_indices] + y_val = X['y_train'][val_indices] self.fit_output = self.model.fit(X['X_train'][X['train_indices']], X['y_train'][X['train_indices']], - X['X_train'][X['val_indices']], X['y_train'][X['val_indices']]) + X_val, y_val) enablePrint() # infer diff --git a/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/base_traditional_learner.py b/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/base_traditional_learner.py index 9c0166a9f..eaf40feb3 100644 --- a/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/base_traditional_learner.py +++ b/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/base_traditional_learner.py @@ -68,6 +68,8 @@ def __init__(self, self.is_classification = STRING_TO_TASK_TYPES[task_type] not in REGRESSION_TASKS + self.has_val_set = False + self.metric = get_metrics(dataset_properties={'task_type': task_type, 'output_type': output_type}, names=[optimize_metric] if optimize_metric is not None else None)[0] @@ -132,8 +134,8 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> None: """ Method that fits the underlying estimator Args: @@ -152,8 +154,8 @@ def _fit(self, def fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> Dict[str, Any]: + X_val: 
Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> Dict[str, Any]: """ Fit the model (possible using the validation set for early stopping) and return the results on the training and validation set. @@ -172,7 +174,10 @@ def fit(self, X_train: np.ndarray, Dictionary containing the results. see _get_results() """ X_train = self._preprocess(X_train) - X_val = self._preprocess(X_val) + + if X_val is not None: + self.has_val_set = True + X_val = self._preprocess(X_val) self._prepare_model(X_train, y_train) @@ -253,14 +258,14 @@ def _get_results(self, Dictionary containing the results """ pred_train = self.predict(X_train, predict_proba=self.is_classification, preprocess=False) - pred_val = self.predict(X_val, predict_proba=self.is_classification, preprocess=False) results = dict() - - results["val_preds"] = pred_val.tolist() - results["labels"] = y_val.tolist() - results["train_score"] = self.metric(y_train, pred_train) - results["val_score"] = self.metric(y_val, pred_val) + + if self.has_val_set: + pred_val = self.predict(X_val, predict_proba=self.is_classification, preprocess=False) + results["labels"] = y_val.tolist() + results["val_preds"] = pred_val.tolist() + results["val_score"] = self.metric(y_val, pred_val) return results diff --git a/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/learners.py b/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/learners.py index 220c52dcd..fca02aa32 100644 --- a/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/learners.py +++ b/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/learners.py @@ -45,8 +45,10 @@ def _prepare_model(self, X_train: np.ndarray, y_train: np.ndarray ) -> None: - early_stopping = 150 if X_train.shape[0] > 10000 else max(round(150 * 10000 / X_train.shape[0]), 10) - self.config["early_stopping_rounds"] = early_stopping + + if self.has_val_set: + early_stopping = 150 if X_train.shape[0] > 10000 else max(round(150 * 10000 / X_train.shape[0]), 10) + self.config["early_stopping_rounds"] = early_stopping if not self.is_classification: self.model = LGBMRegressor(**self.config, random_state=self.random_state) else: @@ -57,11 +59,14 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None ) -> None: assert self.model is not None, "No model found. Can't fit without preparing the model" - self.model.fit(X_train, y_train, eval_set=[(X_val, y_val)]) + eval_set = None + if self.has_val_set: + eval_set = [(X_val, y_val)] + self.model.fit(X_train, y_train, eval_set=eval_set) def predict(self, X_test: np.ndarray, predict_proba: bool = False, @@ -125,15 +130,21 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None + ) -> None: assert self.model is not None, "No model found. 
Can't fit without preparing the model" - early_stopping = 150 if X_train.shape[0] > 10000 else max(round(150 * 10000 / X_train.shape[0]), 10) categoricals = [ind for ind in range(X_train.shape[1]) if isinstance(X_train[0, ind], str)] X_train_pooled = Pool(data=X_train, label=y_train, cat_features=categoricals) - X_val_pooled = Pool(data=X_val, label=y_val, cat_features=categoricals) + X_val_pooled = None + if self.has_val_set: + X_val_pooled = Pool(data=X_val, label=y_val, cat_features=categoricals) + early_stopping: Optional[int] = 150 if X_train.shape[0] > 10000 else max( + round(150 * 10000 / X_train.shape[0]), 10) + else: + early_stopping = None self.model.fit(X_train_pooled, eval_set=X_val_pooled, @@ -189,8 +200,9 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None + ) -> None: assert self.model is not None, "No model found. Can't fit without preparing the model" self.model.fit(X_train, y_train) @@ -244,8 +256,8 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> None: assert self.model is not None, "No model found. Can't fit without preparing the model" self.model.fit(X_train, y_train) if self.config["warm_start"]: @@ -303,8 +315,8 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> None: assert self.model is not None, "No model found. Can't fit without preparing the model" self.model.fit(X_train, y_train) @@ -346,8 +358,8 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> None: assert self.model is not None, "No model found. Can't fit without preparing the model" self.model.fit(X_train, y_train) diff --git a/examples/20_basics/example_tabular_regression.py b/examples/20_basics/example_tabular_regression.py index 127f26829..0215fe485 100644 --- a/examples/20_basics/example_tabular_regression.py +++ b/examples/20_basics/example_tabular_regression.py @@ -53,16 +53,40 @@ ) ############################################################################ -# Print the final ensemble performance -# ==================================== +# Print the final ensemble performance before refit +# ================================================= y_pred = api.predict(X_test) - -# Rescale the Neural Network predictions into the original target range score = api.score(y_pred, y_test) - print(score) -# Print the final ensemble built by AutoPyTorch -print(api.show_models()) # Print statistics from search print(api.sprint_statistics()) + +########################################################################### +# Refit the models on the full dataset. 
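The traditional-learner changes above all apply one guard: the validation split is now optional, and anything that depends on it (eval_set, early stopping) is configured only when validation data was actually passed. A rough standalone sketch of that pattern using LightGBM's scikit-learn API, in the spirit of the diff; fit_with_optional_val and the parameter values are illustrative, not the project's code.

from typing import Optional

import numpy as np
from lightgbm import LGBMClassifier


def fit_with_optional_val(X_train: np.ndarray, y_train: np.ndarray,
                          X_val: Optional[np.ndarray] = None,
                          y_val: Optional[np.ndarray] = None) -> LGBMClassifier:
    has_val_set = X_val is not None and y_val is not None
    params = {"n_estimators": 200}  # illustrative value
    if has_val_set:
        # early stopping is only meaningful when there is something to validate on
        params["early_stopping_rounds"] = 10
    model = LGBMClassifier(**params)
    eval_set = [(X_val, y_val)] if has_val_set else None
    model.fit(X_train, y_train, eval_set=eval_set)
    return model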
+# ===================================== + +api.refit( + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + dataset_name="Boston", + total_walltime_limit=1000, + run_time_limit_secs=200 + # you can change the resampling strategy to + # for example, CrossValTypes.k_fold_cross_validation + # to fit k fold models and have a voting classifier + # resampling_strategy=CrossValTypes.k_fold_cross_validation +) + +############################################################################ +# Print the final ensemble performance after refit +# ================================================ + +y_pred = api.predict(X_test) +score = api.score(y_pred, y_test) +print(score) + +# Print the final ensemble built by AutoPyTorch +print(api.show_models()) From 5779043e0385d9280bf73302c2b6e455e57eb39d Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Fri, 19 Aug 2022 11:36:37 +0200 Subject: [PATCH 14/18] suggestions from review --- autoPyTorch/api/base_task.py | 2 +- examples/20_basics/example_tabular_classification.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index 1f36c009b..db048f00a 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -820,7 +820,7 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: metric=self._metric, dask_client=self._dask_client, backend=self._backend, - memory_limit=self._memory_limit, + memory_limit=memory_limit, disable_file_output=self._disable_file_output, all_supported_metrics=self._all_supported_metrics, include=self.include_components, diff --git a/examples/20_basics/example_tabular_classification.py b/examples/20_basics/example_tabular_classification.py index 74a37c94b..291e017ac 100644 --- a/examples/20_basics/example_tabular_classification.py +++ b/examples/20_basics/example_tabular_classification.py @@ -42,10 +42,10 @@ api = TabularClassificationTask( # To maintain logs of the run, you can uncomment the # Following lines - temporary_directory='./tmp/autoPyTorch_example_tmp_01', - output_directory='./tmp/autoPyTorch_example_out_01', - delete_tmp_folder_after_terminate=False, - delete_output_folder_after_terminate=False, + # temporary_directory='./tmp/autoPyTorch_example_tmp_01', + # output_directory='./tmp/autoPyTorch_example_out_01', + # delete_tmp_folder_after_terminate=False, + # delete_output_folder_after_terminate=False, seed=42, ) From a72d8e2201472c0ac4e79f7a788ed189c20cdd73 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Mon, 22 Aug 2022 15:48:13 +0200 Subject: [PATCH 15/18] add warning for failed processing of results --- autoPyTorch/utils/parallel_model_runner.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py index 0216017c3..456862644 100644 --- a/autoPyTorch/utils/parallel_model_runner.py +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -1,3 +1,4 @@ +from distutils.command.config import config import logging import math import time @@ -271,12 +272,17 @@ def _process_result( else: # we assume that it is a traditional model and `pipeline_configuration` # specifies the configuration. 
- configuration = additional_info.pop('pipeline_configuration') + configuration = additional_info.pop('pipeline_configuration', None) - run_history.add(config=configuration, cost=cost, - time=runtime, status=status, seed=seed, - starttime=starttime, endtime=starttime + runtime, - origin=origin, additional_info=additional_info) + if configuration is not None: + run_history.add(config=configuration, cost=cost, + time=runtime, status=status, seed=seed, + starttime=starttime, endtime=starttime + runtime, + origin=origin, additional_info=additional_info) + else: + logger.warning(f"Something went wrong while processing the results of {current_config}." + f"with additional_info: {additional_info} and status_type: {status}. " + f"Refer to the log file for more information.\nSkipping for now.") else: if additional_info.get('exitcode') == -6: logger.error( From 1a73f056edb49ffdd48521d79fc19205b4832364 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Mon, 22 Aug 2022 16:18:21 +0200 Subject: [PATCH 16/18] remove unnecessary change --- autoPyTorch/utils/parallel_model_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py index 456862644..d4237f683 100644 --- a/autoPyTorch/utils/parallel_model_runner.py +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -1,4 +1,3 @@ -from distutils.command.config import config import logging import math import time From 2801c7a77a74c4b3038a4982f2b7e4d43102f923 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Mon, 22 Aug 2022 16:25:24 +0200 Subject: [PATCH 17/18] update autopytorch version number --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 7a8e7bac6..422c6f24d 100755 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ # noinspection PyInterpreter setuptools.setup( name="autoPyTorch", - version="0.2", + version="0.2.1", author="AutoML Freiburg", author_email="eddiebergmanhs@gmail.com", description=("Auto-PyTorch searches neural architectures using smac"), From 6b5bcdab0696a92b438959d71a384d7b310f9ff2 Mon Sep 17 00:00:00 2001 From: Ravin Kohli Date: Tue, 23 Aug 2022 15:34:57 +0200 Subject: [PATCH 18/18] update autopytorch version number and the example file --- autoPyTorch/__version__.py | 2 +- examples/20_basics/example_tabular_regression.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/autoPyTorch/__version__.py b/autoPyTorch/__version__.py index 94b9a71f5..36509b4a7 100644 --- a/autoPyTorch/__version__.py +++ b/autoPyTorch/__version__.py @@ -1,4 +1,4 @@ """Version information.""" # The following line *must* be the last in the module, exactly as formatted: -__version__ = "0.2" +__version__ = "0.2.1" diff --git a/examples/20_basics/example_tabular_regression.py b/examples/20_basics/example_tabular_regression.py index 0215fe485..6357d23e1 100644 --- a/examples/20_basics/example_tabular_regression.py +++ b/examples/20_basics/example_tabular_regression.py @@ -50,6 +50,7 @@ optimize_metric='r2', total_walltime_limit=300, func_eval_time_limit_secs=50, + dataset_name="Boston" ) ############################################################################ @@ -72,8 +73,8 @@ X_test=X_test, y_test=y_test, dataset_name="Boston", - total_walltime_limit=1000, - run_time_limit_secs=200 + total_walltime_limit=500, + run_time_limit_secs=50 # you can change the resampling strategy to # for example, CrossValTypes.k_fold_cross_validation # to fit k fold models and have a voting classifier
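The result-processing guard added to the parallel model runner above reduces to: only add a run to the run history when a usable configuration came back, otherwise warn and skip. A condensed sketch of that control flow; process_result and its arguments are illustrative stand-ins, not the module's real signature.

import logging
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


def extract_configuration(additional_info: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    # Traditional models report their configuration via additional_info;
    # if the key is missing, the run cannot be added to the run history.
    return additional_info.pop('pipeline_configuration', None)


def process_result(additional_info: Dict[str, Any], status: str) -> bool:
    configuration = extract_configuration(additional_info)
    if configuration is None:
        logger.warning(
            "Something went wrong while processing a result "
            f"(status={status}, additional_info={additional_info}); skipping it."
        )
        return False
    # ...the real runner would now call run_history.add(config=configuration, ...)
    return True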