[train] Hard-deprecate MosaicTrainer and remove SklearnTrainer (#42814)
The first diff replaces the full `MosaicTrainer` implementation (255 lines) with a 30-line hard-deprecation stub that raises on any use:

```diff
@@ -1,255 +1,30 @@
-import inspect
-import warnings
-from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Type
-
-from composer.loggers.logger_destination import LoggerDestination
-from composer.trainer import Trainer
-
-from ray.train import Checkpoint, DataConfig, RunConfig, ScalingConfig
-from ray.train.mosaic._mosaic_utils import RayLogger
-from ray.train.torch import TorchConfig, TorchTrainer
-from ray.train.trainer import GenDataset
-from ray.util import PublicAPI
-
-if TYPE_CHECKING:
-    from ray.data.preprocessor import Preprocessor
-
-
-@PublicAPI(stability="alpha")
-class MosaicTrainer(TorchTrainer):
-    """A Trainer for data parallel Mosaic Composers on PyTorch training.
-
-    This Trainer runs the ``composer.trainer.Trainer.fit()`` method on multiple
-    Ray Actors. The training is carried out in a distributed fashion through PyTorch
-    DDP. These actors already have the necessary torch process group already
-    configured for distributed PyTorch training.
-
-    The training function ran on every Actor will first run the
-    specified ``trainer_init_per_worker`` function to obtain an instantiated
-    ``composer.Trainer`` object. The ``trainer_init_per_worker`` function
-    will have access to preprocessed train and evaluation datasets.
-
-    Example:
-
-        ..
-            TODO(yunxuan): Enable the test after we resolve the mosaicml dependency issue
-
-        .. testcode::
-            :skipif: True
-
-            import torch.utils.data
-            import torchvision
-            from torchvision import transforms, datasets
-
-            from composer.models.tasks import ComposerClassifier
-            import composer.optim
-            from composer.algorithms import LabelSmoothing
-
-            import ray
-            import ray.train as train
-            from ray.train import ScalingConfig
-            from ray.train.mosaic import MosaicTrainer
-
-            def trainer_init_per_worker(config):
-                # prepare the model for distributed training and wrap with
-                # ComposerClassifier for Composer Trainer compatibility
-                model = torchvision.models.resnet18(num_classes=10)
-                model = ComposerClassifier(ray.train.torch.prepare_model(model))
-
-                # prepare train/test dataset
-                mean = (0.507, 0.487, 0.441)
-                std = (0.267, 0.256, 0.276)
-                cifar10_transforms = transforms.Compose(
-                    [transforms.ToTensor(), transforms.Normalize(mean, std)]
-                )
-                data_directory = "~/data"
-                train_dataset = datasets.CIFAR10(
-                    data_directory,
-                    train=True,
-                    download=True,
-                    transform=cifar10_transforms
-                )
-
-                # prepare train dataloader
-                batch_size_per_worker = BATCH_SIZE // session.get_world_size()
-                train_dataloader = torch.utils.data.DataLoader(
-                    train_dataset,
-                    batch_size=batch_size_per_worker
-                )
-                train_dataloader = ray.train.torch.prepare_data_loader(train_dataloader)
-
-                # prepare optimizer
-                optimizer = composer.optim.DecoupledSGDW(
-                    model.parameters(),
-                    lr=0.05,
-                    momentum=0.9,
-                    weight_decay=2.0e-3,
-                )
-
-                return composer.trainer.Trainer(
-                    model=model,
-                    train_dataloader=train_dataloader,
-                    optimizers=optimizer,
-                    **config
-                )
-
-            scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
-            trainer_init_config = {
-                "max_duration": "1ba",
-                "algorithms": [LabelSmoothing()],
-            }
-
-            trainer = MosaicTrainer(
-                trainer_init_per_worker=trainer_init_per_worker,
-                trainer_init_config=trainer_init_config,
-                scaling_config=scaling_config,
-            )
-
-            trainer.fit()
-
-        .. testoutput::
-            :hide:
-
-            ...
-
-    Args:
-        trainer_init_per_worker: The function that returns an instantiated
-            ``composer.Trainer`` object and takes in configuration
-            dictionary (``config``) as an argument. This dictionary is based on
-            ``trainer_init_config`` and is modified for Ray - Composer integration.
-        datasets: Any Datasets to use for training. At the moment, we do not support
-            passing datasets to the trainer and using the dataset shards in the trainer
-            loop. Instead, configure and load the datasets inside
-            ``trainer_init_per_worker`` function
-        trainer_init_config: Configurations to pass into ``trainer_init_per_worker`` as
-            kwargs. Although the kwargs can be hard-coded in the
-            ``trainer_init_per_worker``, using the config allows the flexibility of
-            reusing the same worker init function while changing the trainer arguments.
-            For example, when hyperparameter tuning you can reuse the
-            same ``trainer_init_per_worker`` function with different hyperparameter
-            values rather than having multiple ``trainer_init_per_worker`` functions
-            with different hard-coded hyperparameter values.
-        torch_config: Configuration for setting up the PyTorch backend. If set to
-            None, use the default configuration. This replaces the ``backend_config``
-            arg of ``DataParallelTrainer``. Same as in ``TorchTrainer``.
-        scaling_config: Configuration for how to scale data parallel training.
-        dataset_config: Configuration for dataset ingest.
-        run_config: Configuration for the execution of the training run.
-        resume_from_checkpoint: A ``ray.train.Checkpoint`` to resume training from.
-    """
-
-    def __init__(
-        self,
-        trainer_init_per_worker: Callable[[Optional[Dict]], Trainer],
-        *,
-        datasets: Optional[Dict[str, GenDataset]] = None,
-        trainer_init_config: Optional[Dict] = None,
-        torch_config: Optional[TorchConfig] = None,
-        scaling_config: Optional[ScalingConfig] = None,
-        dataset_config: Optional[DataConfig] = None,
-        run_config: Optional[RunConfig] = None,
-        preprocessor: Optional["Preprocessor"] = None,
-        resume_from_checkpoint: Optional[Checkpoint] = None,
-    ):
-        warnings.warn(
-            "This MosaicTrainer will be deprecated in Ray 2.8. "
-            "It is recommended to use the TorchTrainer instead.",
-            DeprecationWarning,
-        )
-
-        self._validate_trainer_init_per_worker(
-            trainer_init_per_worker, "trainer_init_per_worker"
-        )
-
-        self._validate_datasets(datasets)
-        self._validate_trainer_init_config(trainer_init_config)
-
-        if resume_from_checkpoint:
-            # TODO(ml-team): Reenable after Mosaic checkpointing is supported
-            raise NotImplementedError
-
-        super().__init__(
-            train_loop_per_worker=_mosaic_train_loop_per_worker,
-            train_loop_config=self._create_trainer_init_config(
-                trainer_init_per_worker, trainer_init_config
-            ),
-            torch_config=torch_config,
-            scaling_config=scaling_config,
-            dataset_config=dataset_config,
-            run_config=run_config,
-            datasets=datasets,
-            preprocessor=preprocessor,
-            resume_from_checkpoint=resume_from_checkpoint,
-        )
-
-    @classmethod
-    def _create_trainer_init_config(
-        cls,
-        trainer_init_per_worker: Callable[[Optional[Dict]], Trainer],
-        trainer_init_config: Optional[Dict[str, Any]],
-    ) -> Dict[str, Any]:
-        trainer_init_config = trainer_init_config.copy() if trainer_init_config else {}
-        if "_trainer_init_per_worker" in trainer_init_config:
-            raise ValueError(
-                "'_trainer_init_per_worker' is a reserved key in `trainer_init_config`."
-            )
-        trainer_init_config["_trainer_init_per_worker"] = trainer_init_per_worker
-        return trainer_init_config
-
-    @classmethod
-    def restore(cls: Type["MosaicTrainer"], **kwargs) -> "MosaicTrainer":
-        # TODO(ml-team): Reenable after Mosaic checkpointing is supported
-        raise NotImplementedError
-
-    def _validate_trainer_init_per_worker(
-        self, trainer_init_per_worker: Callable, fn_name: str
-    ) -> None:
-        num_params = len(inspect.signature(trainer_init_per_worker).parameters)
-        if num_params != 1:
-            raise ValueError(
-                f"{fn_name} should take in at most 1 argument (`config`), "
-                f"but it accepts {num_params} arguments instead."
-            )
-
-    def _validate_datasets(self, datasets) -> None:
-        if not (datasets is None or len(datasets) == 0):
-            raise ValueError(
-                "MosaicTrainer does not support providing dataset shards \
-                to `trainer_init_per_worker`. Instead of passing in the dataset into \
-                MosaicTrainer, define a dataloader and use `prepare_dataloader` \
-                inside the `trainer_init_per_worker`."
-            )
-
-    def _validate_trainer_init_config(self, config) -> None:
-        if config is not None and "loggers" in config:
-            warnings.warn(
-                "Composer's Loggers (any subclass of LoggerDestination) are \
-                not supported for MosaicComposer. Use Ray provided loggers instead"
-            )
-
-
-def _mosaic_train_loop_per_worker(config):
-    """Per-worker training loop for Mosaic Composers."""
-    trainer_init_per_worker = config.pop("_trainer_init_per_worker")
-
-    # Replace Composer's Loggers with RayLogger
-    ray_logger = RayLogger(keys=config.pop("log_keys", []))
-
-    # initialize Composer trainer
-    trainer: Trainer = trainer_init_per_worker(config)
-
-    # Remove Composer's Loggers if there are any added in the trainer_init_per_worker
-    # this removes the logging part of the loggers
-    filtered_callbacks = list()
-    for callback in trainer.state.callbacks:
-        if not isinstance(callback, LoggerDestination):
-            filtered_callbacks.append(callback)
-    filtered_callbacks.append(ray_logger)
-    trainer.state.callbacks = filtered_callbacks
-
-    # this prevents data to be routed to all the Composer Loggers
-    trainer.logger.destinations = (ray_logger,)
-
-    # call the trainer
-    trainer.fit()
+from ray.util.annotations import Deprecated
+
+_DEPRECATION_MESSAGE = (
+    "`ray.train.mosaic.MosaicTrainer` is deprecated. "
+    "Use `ray.train.torch.TorchTrainer` instead. "
+    "See this issue for a migration example: "
+    "https://github.com/ray-project/ray/issues/42257"
+)
+
+
+# TODO(justinvyu): [code_removal] Delete in Ray 2.11.
+@Deprecated
+class MosaicTrainer:
+    """Deprecated. See this issue for a migration example:
+    https://github.com/ray-project/ray/issues/42257
+    """
+
+    def __new__(cls, *args, **kwargs):
+        raise DeprecationWarning(_DEPRECATION_MESSAGE)
+
+    def __init__(self, *args, **kwargs):
+        raise DeprecationWarning(_DEPRECATION_MESSAGE)
+
+    @classmethod
+    def restore(cls, *args, **kwargs):
+        raise DeprecationWarning(_DEPRECATION_MESSAGE)
+
+    @classmethod
+    def can_restore(cls, *args, **kwargs):
+        raise DeprecationWarning(_DEPRECATION_MESSAGE)
```
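For context, the stub turns the earlier soft deprecation warning into a hard failure: `DeprecationWarning` is an `Exception` subclass, so raising it aborts construction outright. A minimal sketch of the resulting user-facing behavior (hypothetical usage, assuming a Ray build containing this change):

```python
# Sketch of the post-deprecation behavior (hypothetical usage).
# MosaicTrainer.__new__ raises before __init__ ever runs, so any
# construction attempt fails immediately with the migration message.
from ray.train.mosaic import MosaicTrainer

try:
    MosaicTrainer(trainer_init_per_worker=lambda config: None)
except DeprecationWarning as err:
    print(err)  # Points at TorchTrainer and the migration issue.
```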
Another file in the PR was deleted in its entirety (its contents are not shown here).
The next diff keeps `SklearnPredictor` but inlines the `_set_cpu_params` helper that previously lived in the removed `ray.train.sklearn._sklearn_utils` module:

```diff
@@ -9,14 +9,29 @@
 from ray.air.util.data_batch_conversion import _unwrap_ndarray_object_type_if_needed
 from ray.train.predictor import Predictor
 from ray.train.sklearn import SklearnCheckpoint
-from ray.train.sklearn._sklearn_utils import _set_cpu_params
 from ray.util.annotations import PublicAPI
 from ray.util.joblib import register_ray
 
 if TYPE_CHECKING:
     from ray.data.preprocessor import Preprocessor
 
 
+# thread_count is a catboost parameter
+SKLEARN_CPU_PARAM_NAMES = ["n_jobs", "thread_count"]
+
+
+def _set_cpu_params(estimator: BaseEstimator, num_cpus: int) -> None:
+    """Sets all CPU-related params to num_cpus (incl. nested)."""
+    cpu_params = {
+        param: num_cpus
+        for param in estimator.get_params(deep=True)
+        if any(
+            param.endswith(cpu_param_name) for cpu_param_name in SKLEARN_CPU_PARAM_NAMES
+        )
+    }
+    estimator.set_params(**cpu_params)
+
+
 @PublicAPI(stability="alpha")
 class SklearnPredictor(Predictor):
     """A predictor for scikit-learn compatible estimators.
```
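As a usage sketch of the inlined helper (a hypothetical example, not from the PR; it assumes the `_set_cpu_params` defined above is in scope, and scikit-learn >= 1.2, where `BaggingClassifier`'s sub-estimator argument is named `estimator`): `get_params(deep=True)` also returns nested parameters such as `estimator__n_jobs`, so the suffix match updates CPU settings recursively.

```python
# Hypothetical demo of _set_cpu_params: every parameter ending in
# "n_jobs" or "thread_count", nested ones included, is set to num_cpus.
from sklearn.ensemble import BaggingClassifier
from sklearn.neighbors import KNeighborsClassifier

clf = BaggingClassifier(KNeighborsClassifier(n_jobs=1), n_jobs=1)
_set_cpu_params(clf, num_cpus=4)

# Both the outer n_jobs and the nested estimator__n_jobs are now 4.
print(clf.n_jobs, clf.get_params()["estimator__n_jobs"])
```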
Review discussion on `SklearnPredictor`:

Member: Do we still need `SklearnPredictor`?

Author (Contributor): We still have all the predictors around, but they're not shown in the docs. Maybe we can remove them all at once at some point?
Further review discussion:

Reviewer: We won't be able to have a migration example because of the threading issue though, right?

Author: Yeah, I was planning to just keep it as "not possible for now", then update the issue later. Or should I just remove the migration link for the Mosaic trainer?

Reviewer: Oh yeah, we can have a GH issue that says it's not supported right now (the current one points to sklearn).