Skip to content

Make URL Access Pluggable #5323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 15 additions & 118 deletions src/toil/batchSystems/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import importlib
import logging
import pkgutil
import warnings
Expand All @@ -21,6 +20,7 @@

from toil.lib.compatibility import deprecated
from toil.lib.memoize import memoize
import toil.lib.plugins

if TYPE_CHECKING:
from toil.batchSystems.abstractBatchSystem import AbstractBatchSystem
Expand All @@ -40,17 +40,14 @@ def add_batch_system_factory(

:param class_factory: A function that returns a batch system class (NOT an instance), which implements :class:`toil.batchSystems.abstractBatchSystem.AbstractBatchSystem`.
"""
_registry_keys.append(key)
_registry[key] = class_factory
toil.lib.plugins.register_plugin("batch_system", key, class_factory)


def get_batch_systems() -> Sequence[str]:
"""
Get the names of all the availsble batch systems.
Get the names of all the available batch systems.
"""
_load_all_plugins()

return _registry_keys
return toil.lib.plugins.get_plugin_names("batch_system")


def get_batch_system(key: str) -> type["AbstractBatchSystem"]:
Expand All @@ -60,8 +57,7 @@ def get_batch_system(key: str) -> type["AbstractBatchSystem"]:
:raises: KeyError if the key is not the name of a batch system, and
ImportError if the batch system's class cannot be loaded.
"""

return _registry[key]()
return toil.lib.plugins.get_plugin("batch_system", key)()


DEFAULT_BATCH_SYSTEM = "single_machine"
Expand Down Expand Up @@ -126,114 +122,15 @@ def kubernetes_batch_system_factory():


#####
# Registry implementation
#####

_registry: dict[str, Callable[[], type["AbstractBatchSystem"]]] = {
"aws_batch": aws_batch_batch_system_factory,
"single_machine": single_machine_batch_system_factory,
"grid_engine": gridengine_batch_system_factory,
"lsf": lsf_batch_system_factory,
"mesos": mesos_batch_system_factory,
"slurm": slurm_batch_system_factory,
"torque": torque_batch_system_factory,
"htcondor": htcondor_batch_system_factory,
"kubernetes": kubernetes_batch_system_factory,
}
_registry_keys = list(_registry.keys())

# We will load any packages starting with this prefix and let them call
# add_batch_system_factory()
_PLUGIN_NAME_PREFIX = "toil_batch_system_"


@memoize
def _load_all_plugins() -> None:
"""
Load all the batch system plugins that are installed.
"""

for finder, name, is_pkg in pkgutil.iter_modules():
# For all installed packages
if name.startswith(_PLUGIN_NAME_PREFIX):
# If it is a Toil batch system plugin, import it
importlib.import_module(name)


#####
# Deprecated API
# Registers all built-in batch system
#####

# We used to directly access these constants, but now the Right Way to use this
# module is add_batch_system_factory() to register and get_batch_systems() to
# get the list/get_batch_system() to get a class by name.


def __getattr__(name):
"""
Implement a fallback attribute getter to handle deprecated constants.

See <https://stackoverflow.com/a/48242860>.
"""
if name == "BATCH_SYSTEM_FACTORY_REGISTRY":
warnings.warn(
"BATCH_SYSTEM_FACTORY_REGISTRY is deprecated; use get_batch_system() or add_batch_system_factory()",
DeprecationWarning,
)
return _registry
elif name == "BATCH_SYSTEMS":
warnings.warn(
"BATCH_SYSTEMS is deprecated; use get_batch_systems()", DeprecationWarning
)
return _registry_keys
else:
raise AttributeError(f"Module {__name__} ahs no attribute {name}")


@deprecated(new_function_name="add_batch_system_factory")
def addBatchSystemFactory(
key: str, batchSystemFactory: Callable[[], type["AbstractBatchSystem"]]
):
"""
Deprecated method to add a batch system.
"""
return add_batch_system_factory(key, batchSystemFactory)


#####
# Testing utilities
#####

# We need a snapshot save/restore system for testing. We can't just tamper with
# the globals because module-level globals are their own references, so we
# can't touch this module's global name bindings from a client module.


def save_batch_system_plugin_state() -> (
tuple[list[str], dict[str, Callable[[], type["AbstractBatchSystem"]]]]
):
"""
Return a snapshot of the plugin registry that can be restored to remove
added plugins. Useful for testing the plugin system in-process with other
tests.
"""

snapshot = (list(_registry_keys), dict(_registry))
return snapshot


def restore_batch_system_plugin_state(
snapshot: tuple[list[str], dict[str, Callable[[], type["AbstractBatchSystem"]]]]
):
"""
Restore the batch system registry state to a snapshot from
save_batch_system_plugin_state().
"""

# We need to apply the snapshot without rebinding the names, because that
# won't affect modules that imported the names.
wanted_batch_systems, wanted_registry = snapshot
_registry_keys.clear()
_registry_keys.extend(wanted_batch_systems)
_registry.clear()
_registry.update(wanted_registry)
add_batch_system_factory("aws_batch", aws_batch_batch_system_factory)
add_batch_system_factory("single_machine", single_machine_batch_system_factory)
add_batch_system_factory("grid_engine", gridengine_batch_system_factory)
add_batch_system_factory("lsf", lsf_batch_system_factory)
add_batch_system_factory("mesos", mesos_batch_system_factory)
add_batch_system_factory("slurm", slurm_batch_system_factory)
add_batch_system_factory("torque", torque_batch_system_factory)
add_batch_system_factory("htcondor", htcondor_batch_system_factory)
add_batch_system_factory("kubernetes", kubernetes_batch_system_factory)
106 changes: 106 additions & 0 deletions src/toil/lib/plugins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright (C) 2015-2025 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Generic plugin system for Toil plugins.

Plugins come in Python packages named::

toil_{PLUGIN_TYPE}_{WHATEVER}

When looking for plugins, Toil will list all the Python packages with the right
name prefix for the given type of plugin, and load them. The plugin modules
then have an opportunity to import :meth:`register_plugin` and register
themselves.
"""

import importlib
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to have the copyright header at the top here like we have in the other Toil source files, and also maybe a note (module-level docstring?) about what this file is for.

from typing import Any, Literal, Union
import pkgutil
from toil.lib.memoize import memoize


PluginType = Union[Literal["batch_system"], Literal["url_access"]]
plugin_types: list[PluginType] = ["batch_system", "url_access"]

_registry: dict[str, dict[str, Any]] = {k: {} for k in plugin_types}

def register_plugin(
plugin_type: PluginType, plugin_name: str, plugin_being_registered: Any
) -> None:
"""
Adds a plugin to the registry for the given type of plugin.
Comment on lines +39 to +43
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the type annotations don't define it, it might be good to note here what the expected plugin values are meant to be for a given plugin type (i.e. functions that return a class implementing blah interface).

It also might be good to explain what plugin_name is used for: for batch systems it is the value the user passes to --batchSystem to actually use it, and for URLs it is the URL scheme value that the plugin gets called to handle. It's not really the "name of the plugin" like the variable name would suggest; you shouldn't pass "My Cool Plugin" or something.


:param plugin_name: For batch systems, this is the string the user will use
to select the batch system on the command line with ``--batchSystem``.
For URL access plugins, this is the URL scheme that the plugin
implements.
:param plugin_being_registered: This is a function that, when called,
imports and returns a plugin-provided class type. For batch systems,
the resulting type must extend
:class:`toil.batchSystems.abstractBatchSystem.AbstractBatchSystem`. For
URL access plugins, it must extend :class:`toil.lib.url.URLAccess`.
Note that the function used here should return the class itself; it
should not construct an instance of the class.
"""
_registry[plugin_type][plugin_name] = plugin_being_registered

def remove_plugin(plugin_type: PluginType, plugin_name: str) -> None:
    """
    Remove a plugin from the registry for the given type of plugin.

    Removal is best-effort: if nothing is registered under the given type
    and name, the call does nothing instead of raising.

    :param plugin_type: The kind of plugin to remove.
    :param plugin_name: The key the plugin was registered under.
    """
    # A missing type or a missing name is tolerated, so look both up with
    # defaults rather than indexing and catching KeyError.
    _registry.get(plugin_type, {}).pop(plugin_name, None)

def get_plugin_names(plugin_type: PluginType) -> list[str]:
    """
    List the names under which plugins of the given type are registered.

    Installed plugin packages of this type are loaded first, so that they
    have had the chance to register themselves.

    :param plugin_type: The kind of plugin to list.
    :returns: A new list of the registered names; changing it does not
        affect the registry.
    """
    _load_all_plugins(plugin_type)
    plugins_of_type = _registry[plugin_type]
    return list(plugins_of_type)

def get_plugin(plugin_type: PluginType, plugin_name: str) -> Any:
    """
    Look up the plugin class factory function registered under a name.

    Installed plugin packages of this type are loaded before the lookup.

    :param plugin_type: The kind of plugin to look up.
    :param plugin_name: The key the plugin was registered under.
    :returns: Whatever object was registered for that type and name.
    :raises: KeyError if plugin_name is not the name of a plugin of the given
        type.
    """
    _load_all_plugins(plugin_type)
    plugins_of_type = _registry[plugin_type]
    return plugins_of_type[plugin_name]


def _plugin_name_prefix(plugin_type: PluginType) -> str:
    """
    Get the package name prefix for the given plugin type.

    Any installed package whose name starts with this prefix is treated as
    a Toil plugin of that type.

    :param plugin_type: The kind of plugin to build the prefix for.
    :returns: The prefix, of the form ``toil_{plugin_type}_``.
    """
    return "toil_{}_".format(plugin_type)

@memoize
def _load_all_plugins(plugin_type: PluginType) -> None:
    """
    Import every installed module that is a plugin of the given type.

    A module counts as a plugin when its name starts with the prefix for
    ``plugin_type``. Importing it gives the module the opportunity to
    register itself.

    NOTE(review): presumably ``@memoize`` makes the scan run only once per
    plugin type -- confirm against toil.lib.memoize.

    :param plugin_type: The kind of plugin to load.
    """
    wanted_prefix = _plugin_name_prefix(plugin_type)
    for _finder, module_name, _is_pkg in pkgutil.iter_modules():
        # Walk all the top-level modules that can be imported.
        if not module_name.startswith(wanted_prefix):
            # Not a Toil plugin of this type; leave it alone.
            continue
        # It is a Toil plugin of the requested type, so import it.
        importlib.import_module(module_name)
90 changes: 57 additions & 33 deletions src/toil/lib/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from toil.lib.exceptions import UnimplementedURLException
from toil.lib.memoize import memoize
from toil.lib.plugins import register_plugin, get_plugin

try:
from botocore.exceptions import ProxyConnectionError
Expand Down Expand Up @@ -250,7 +251,7 @@ def _supports_url(cls, url: ParseResult, export: bool = False) -> bool:
@classmethod
def _find_url_implementation(
cls, url: ParseResult, export: bool = False
) -> Type["URLAccess"]:
) -> type["URLAccess"]:
"""
Returns the URLAccess subclass that supports the given URL.

Expand All @@ -259,38 +260,61 @@ def _find_url_implementation(
:param bool export: Determines if the url is supported for exporting

"""
for implementation in cls._url_access_classes():
if implementation._supports_url(url, export):
return implementation
try:
implementation_factory = get_plugin("url_access", url.scheme.lower())
except KeyError:
raise UnimplementedURLException(url, "export" if export else "import")

try:
implementation = cast(Type[URLAccess], implementation_factory())
except (ImportError, ProxyConnectionError):
logger.debug(
"Unable to import implementation for scheme '%s', as is expected if the corresponding extra was "
"omitted at installation time.",
url.scheme.lower(),
)
raise UnimplementedURLException(url, "export" if export else "import")

if implementation._supports_url(url, export):
return implementation
raise UnimplementedURLException(url, "export" if export else "import")

@staticmethod
@memoize
def _url_access_classes() -> list[Type["URLAccess"]]:
"""
A list of concrete URLAccess implementations whose dependencies are installed.
#####
# Built-in url access
#####

"""
url_access_class_names = (
"toil.jobStores.fileJobStore.FileJobStore",
"toil.jobStores.googleJobStore.GoogleJobStore",
"toil.jobStores.aws.jobStore.AWSJobStore",
"toil.jobStores.abstractJobStore.JobStoreSupport",
)
url_access_classes = []
for class_name in url_access_class_names:
module_name, class_name = class_name.rsplit(".", 1)
from importlib import import_module

try:
module = import_module(module_name)
except (ImportError, ProxyConnectionError):
logger.debug(
"Unable to import '%s' as is expected if the corresponding extra was "
"omitted at installation time.",
module_name,
)
else:
url_access_class = getattr(module, class_name)
url_access_classes.append(url_access_class)
return url_access_classes
def file_job_store_factory() -> type[URLAccess]:
    """
    Import and return the class registered to handle ``file`` URLs.

    The import happens only when the factory is called.

    :returns: The FileJobStore class itself, not an instance of it.
    """
    from toil.jobStores.fileJobStore import FileJobStore as implementation

    return implementation


def google_job_store_factory() -> type[URLAccess]:
    """
    Import and return the class registered to handle ``gs`` URLs.

    The import happens only when the factory is called.

    :returns: The GoogleJobStore class itself, not an instance of it.
    """
    from toil.jobStores.googleJobStore import GoogleJobStore as implementation

    return implementation


def aws_job_store_factory() -> type[URLAccess]:
    """
    Import and return the class registered to handle ``s3`` URLs.

    The import happens only when the factory is called.

    :returns: The AWSJobStore class itself, not an instance of it.
    """
    from toil.jobStores.aws.jobStore import AWSJobStore as implementation

    return implementation


def job_store_support_factory() -> type[URLAccess]:
    """
    Import and return the class registered to handle ``http``, ``https`` and
    ``ftp`` URLs.

    The import happens only when the factory is called.

    :returns: The JobStoreSupport class itself, not an instance of it.
    """
    from toil.jobStores.abstractJobStore import JobStoreSupport as implementation

    return implementation

# TODO: make sure mypy still passes and the tests still work;
# the _url_access_classes method can then be removed.

#####
# Registers all built-in urls
#####
register_plugin("url_access", "file", file_job_store_factory)
register_plugin("url_access", "gs", google_job_store_factory)
register_plugin("url_access", "s3", aws_job_store_factory)
register_plugin("url_access", "http", job_store_support_factory)
register_plugin("url_access", "https", job_store_support_factory)
register_plugin("url_access", "ftp", job_store_support_factory)
Loading
Loading