Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6edd728
use weaver-specific CWL extensions activation for schema validation
fmigneault Dec 2, 2022
9224de9
remove unused import
fmigneault Dec 2, 2022
12aac0e
fix loaded schema extensions format + improve test to ensure specific…
fmigneault Dec 2, 2022
c28d76c
[wip] define more CWL extension requirements
fmigneault Dec 2, 2022
73fc98c
add more CWL requirement schema definitions
fmigneault Dec 3, 2022
2bf181e
fix requirement schema class name
fmigneault Dec 3, 2022
12ebdfe
Merge branch 'master' into setup-cwl-extensions
fmigneault Dec 5, 2022
f4a85be
fix custom schema loaded for CWL validation
fmigneault Dec 5, 2022
04f2d44
Merge branch 'setup-cwl-extensions' of https://github.com/crim-ca/wea…
fmigneault Dec 5, 2022
ca768c9
Merge branch 'master' into setup-cwl-extensions
fmigneault Dec 5, 2022
4787a53
Merge branch 'master' into setup-cwl-extensions
fmigneault Dec 5, 2022
587ba29
Merge branch 'master' into setup-cwl-extensions
fmigneault Dec 5, 2022
cd7e7e6
fix resource reader reference for Python 3.10 support
fmigneault Dec 5, 2022
77619f2
fix linting
fmigneault Dec 5, 2022
7140214
fix load resource cross-python versions
fmigneault Dec 5, 2022
c514239
fix lint
fmigneault Dec 5, 2022
b15e966
fix cache handling during cwl extension test resolution
fmigneault Dec 5, 2022
7bc84db
use with open on module resource file + compat py36
fmigneault Dec 5, 2022
37111b7
fix typo
fmigneault Dec 6, 2022
4b3de3f
Merge branch 'master' into setup-cwl-extensions
fmigneault Dec 6, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ Changes:
--------
- Update Docker images to use more recent Python 3.10 by default instead of Python 3.7.
All CI pipeline, tests and validation checks are also performed with Python 3.10.
Unit and functional tests remain evaluated for all Python versions since 3.6 up to 3.11.
Unit and functional tests remain evaluated for all Python versions since 3.6 (legacy) up to 3.11 (experimental).
- Update to latest ``cwltool==3.1.20221201130942`` to provide ``v1.2`` extension definitions.
- Add `CWL` extensions activation for specific features supported by `Weaver` for more adequate schema validation.
- Add `Job` log message size checks to better control what gets logged during the `Application Package` execution to
avoid large documents causing problems when attempting to save them to the storage database.
- Update documentation with examples for ``cwltool:CUDARequirement``, ``ResourceRequirement`` and ``NetworkAccess``.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ cryptography
# use cwltool gpu-enabled support until integrated within the original tool
# (https://github.com/common-workflow-language/common-workflow-language/issues/587)
### git+https://github.com/crim-ca/cwltool@docker-gpu#egg=cwltool; python_version >= "3"
cwltool==3.1.20220913185150
cwltool==3.1.20221201130942
# defused required for json2xml
defusedxml
docker
Expand Down
96 changes: 94 additions & 2 deletions tests/processes/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- :mod:`tests.functional.wps_package`.
"""
import contextlib
import copy
import io
import logging
import os
Expand All @@ -14,6 +15,8 @@
import tempfile
from typing import TYPE_CHECKING

import cwltool.process
import mock
import pytest

from tests.utils import assert_equal_any_order
Expand All @@ -23,9 +26,12 @@
CWL_REQUIREMENT_APP_DOCKER,
CWL_REQUIREMENT_APP_DOCKER_GPU,
CWL_REQUIREMENT_CUDA,
CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS
CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS,
CWL_REQUIREMENT_CUDA_NAMESPACE,
CWL_REQUIREMENT_PROCESS_GENERATOR,
CWL_REQUIREMENT_TIME_LIMIT
)
from weaver.processes.wps_package import WpsPackage, _update_package_compatibility
from weaver.processes.wps_package import WpsPackage, _load_package_content, _update_package_compatibility
from weaver.wps.service import WorkerRequest

if TYPE_CHECKING:
Expand Down Expand Up @@ -385,3 +391,89 @@ def test_update_package_compatibility(original, expected):
expected = _combine(cwl_base, expected)
test_cwl = _update_package_compatibility(original)
assert_equal_requirements_any_order(test_cwl, expected)


def test_cwl_extension_requirements_no_error():
    """
    Validate that specific :term:`CWL` extensions supported by Weaver can be loaded.

    When initialized, the :term:`CWL` factory will validate the document requirement references by resolving against
    the registered definitions to ensure they are all correctly formatted and provide all necessary details.

    By default, only the "base" schemas for the specified ``cwlVersion`` in the :term:`CWL` document are employed.
    Therefore, even extensions supported by Weaver raise a validation error unless their schemas are registered.

    This test ensures that known extensions such as :data:`CWL_REQUIREMENT_CUDA` will be resolved without error.
    Unknown or unsupported definitions should however continue raising the validation error.
    """
    cwl = {
        "cwlVersion": "v1.2",
        "class": "CommandLineTool",
        "baseCommand": ["echo", "test"],
        "inputs": {},
        "outputs": {},
        "requirements": {CWL_REQUIREMENT_CUDA: dict(CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS)},
        "$namespaces": dict(CWL_REQUIREMENT_CUDA_NAMESPACE)
    }

    # default behaviour without loading supported extensions should fail validation
    # (the schema registration function is replaced by a no-operation for the duration of this check)
    with mock.patch("weaver.processes.wps_package._load_supported_schemas", side_effect=lambda: None):
        # mock caches to ensure that previous tests did not already perform schema registration,
        # making the "unknown" extensions for below test to actually be defined and valid in advance
        with mock.patch.dict("weaver.processes.wps_package.PACKAGE_SCHEMA_CACHE", {}, clear=True):
            with mock.patch.dict("cwltool.process.SCHEMA_CACHE", {}, clear=True):
                cwltool.process.use_standard_schema("v1.2")  # enforce standard CWL without any extension

                with pytest.raises(cwltool.process.ValidationException) as exc_info:
                    _load_package_content(cwl, "test")
                message = str(exc_info.value)
                # only the part after the namespace prefix is looked up in the validation error message
                assert all(
                    info in message for info in [
                        "checking field `requirements`",
                        "Field `class` contains undefined reference to",
                        CWL_REQUIREMENT_CUDA.split(":", 1)[-1],
                    ]
                ), "Validation error should have been caused by missing CWL CUDA extension schema, not something else."

    # no error expected once the supported schema extensions are applied
    # here we reset the caches again to ensure the standard schema are overridden by the custom selection of extensions
    with mock.patch.dict("weaver.processes.wps_package.PACKAGE_SCHEMA_CACHE", {}, clear=True):
        with mock.patch.dict("cwltool.process.SCHEMA_CACHE", {}, clear=True):
            _load_package_content(cwl, "test")

    # even though the extensions are now enabled,
    # validation should allow them only for the relevant versions where they are applicable
    cwl_old = copy.deepcopy(cwl)
    cwl_old["cwlVersion"] = "v1.0"
    cwl_old["requirements"] = {
        # note: 'TimeLimit' (v1.0) renamed to 'ToolTimeLimit' (v1.1 and beyond)
        CWL_REQUIREMENT_TIME_LIMIT: {"timelimit": 10}
    }
    with pytest.raises(cwltool.process.ValidationException) as exc_info:
        _load_package_content(cwl_old, "test")
    message = str(exc_info.value)
    assert all(
        info in message for info in [
            "checking field `requirements`",
            "Field `class` contains undefined reference to",
            CWL_REQUIREMENT_TIME_LIMIT.split(":", 1)[-1],
        ]
    ), "Validation error should have been caused by missing CWL ToolTimeLimit extension schema, not something else."

    # test unsupported schema extension to ensure still disallowed
    # (the nested 'run' is a copy of the document taken before its requirements are replaced)
    cwl["requirements"] = {
        CWL_REQUIREMENT_PROCESS_GENERATOR: {
            "class": "CommandLineTool",
            "run": copy.deepcopy(cwl),
        }
    }
    with pytest.raises(cwltool.process.ValidationException) as exc_info:
        _load_package_content(cwl, "test")
    message = str(exc_info.value)
    assert all(
        info in message for info in [
            "checking field `requirements`",
            "Field `class` contains undefined reference to",
            CWL_REQUIREMENT_PROCESS_GENERATOR,
        ]
    ), "Validation failure should have been caused by unsupported CWL extension schema, not something else."
34 changes: 32 additions & 2 deletions weaver/processes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class OpenSearchField(Constants):
LOCAL_FILE_SCHEME = "opensearchfile" # must be a valid url scheme parsable by urlparse


# Namespace of the CWL standard: short prefix, associated URL, and read-only {prefix: URL} mapping.
CWL_NAMESPACE_ID = "cwl"
CWL_NAMESPACE_URL = "https://w3id.org/cwl/cwl#"
CWL_NAMESPACE = MappingProxyType({CWL_NAMESPACE_ID: CWL_NAMESPACE_URL})

# Namespace of the 'cwltool' extensions: the prefix is employed to form names such as 'cwltool:CUDARequirement'.
CWL_TOOL_NAMESPACE_ID = "cwltool"
CWL_TOOL_NAMESPACE_URL = "http://commonwl.org/cwltool#"
CWL_TOOL_NAMESPACE = MappingProxyType({CWL_TOOL_NAMESPACE_ID: CWL_TOOL_NAMESPACE_URL})

# FIXME: convert to 'Constants' class
# CWL package (requirements/hints) corresponding to `ProcessType.APPLICATION`
CWL_REQUIREMENT_APP_BUILTIN = "BuiltinRequirement"
Expand Down Expand Up @@ -95,22 +103,38 @@ class OpenSearchField(Constants):
"""

# FIXME: convert to 'Constants' class
CWL_REQUIREMENT_CUDA = "cwltool:CUDARequirement"
# NOTE: depending on the 'cwlVersion' of the document, some items are extensions or native to the standard specification
CWL_REQUIREMENT_CUDA = f"{CWL_TOOL_NAMESPACE_ID}:CUDARequirement"
CWL_REQUIREMENT_CUDA_NAMESPACE = CWL_TOOL_NAMESPACE
CWL_REQUIREMENT_ENV_VAR = "EnvVarRequirement"
CWL_REQUIREMENT_INIT_WORKDIR = "InitialWorkDirRequirement"
CWL_REQUIREMENT_INLINE_JAVASCRIPT = "InlineJavascriptRequirement"
CWL_REQUIREMENT_INPLACE_UPDATE = "InplaceUpdateRequirement"
CWL_REQUIREMENT_LOAD_LISTING = "LoadListingRequirement"
CWL_REQUIREMENT_MPI = "MPIRequirement" # no implication yet
CWL_REQUIREMENT_NETWORK_ACCESS = "NetworkAccess"
CWL_REQUIREMENT_PROCESS_GENERATOR = "ProcessGenerator"
CWL_REQUIREMENT_RESOURCE = "ResourceRequirement"
CWL_REQUIREMENT_SCATTER = "ScatterFeatureRequirement"
CWL_REQUIREMENT_SECRETS = "Secrets"
CWL_REQUIREMENT_TIME_LIMIT = "ToolTimeLimit"
CWL_REQUIREMENT_WORK_REUSE = "WorkReuse" # default is to reuse, employed to explicitly disable

CWL_REQUIREMENT_FEATURES = frozenset([
CWL_REQUIREMENT_CUDA,
CWL_REQUIREMENT_ENV_VAR,
CWL_REQUIREMENT_INIT_WORKDIR,
CWL_REQUIREMENT_INPLACE_UPDATE,
CWL_REQUIREMENT_INLINE_JAVASCRIPT,
CWL_REQUIREMENT_LOAD_LISTING,
# CWL_REQUIREMENT_MPI, # no implication yet
CWL_REQUIREMENT_NETWORK_ACCESS,
CWL_REQUIREMENT_RESOURCE, # FIXME: perform pre-check on job submit? (https://github.com/crim-ca/weaver/issues/138)
# CWL_REQUIREMENT_PROCESS_GENERATOR, # explicitly unsupported, works against Weaver's behavior
CWL_REQUIREMENT_RESOURCE, # FIXME: perform pre-check on job submit? (https://github.com/crim-ca/weaver/issues/138)
CWL_REQUIREMENT_SCATTER,
# CWL_REQUIREMENT_SECRETS, # FIXME: support CWL Secrets (https://github.com/crim-ca/weaver/issues/511)
CWL_REQUIREMENT_TIME_LIMIT,
CWL_REQUIREMENT_WORK_REUSE, # allow it, but makes sense only for Workflow steps if cwltool handles it by itself
])
"""
Set of :term:`CWL` requirements that corresponds to extra functionalities not completely defining
Expand Down Expand Up @@ -200,9 +224,15 @@ class ProcessSchema(Constants):
CWL_REQUIREMENT_ENV_VAR,
CWL_REQUIREMENT_INIT_WORKDIR,
CWL_REQUIREMENT_INLINE_JAVASCRIPT,
CWL_REQUIREMENT_INPLACE_UPDATE,
CWL_REQUIREMENT_LOAD_LISTING,
CWL_REQUIREMENT_MPI,
CWL_REQUIREMENT_NETWORK_ACCESS,
CWL_REQUIREMENT_RESOURCE,
CWL_REQUIREMENT_SCATTER,
CWL_REQUIREMENT_SECRETS,
CWL_REQUIREMENT_TIME_LIMIT,
CWL_REQUIREMENT_WORK_REUSE,
]
ProcessSchemaType = Literal[ProcessSchema.OGC, ProcessSchema.OLD]
WPS_ComplexType = Literal[WPS_COMPLEX, WPS_COMPLEX_DATA, WPS_REFERENCE]
Expand Down
66 changes: 66 additions & 0 deletions weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import yaml
from cwltool.context import LoadingContext, RuntimeContext
from cwltool.factory import Factory as CWLFactory, WorkflowStatus as CWLException
from cwltool.process import use_custom_schema
from pyramid.httpexceptions import HTTPOk, HTTPServiceUnavailable
from pywps import Process
from pywps.inout.basic import SOURCE_TYPE
Expand Down Expand Up @@ -68,9 +69,11 @@
CWL_REQUIREMENT_APP_WPS1,
CWL_REQUIREMENT_CUDA,
CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS,
CWL_REQUIREMENT_CUDA_NAMESPACE,
CWL_REQUIREMENT_ENV_VAR,
CWL_REQUIREMENT_RESOURCE,
CWL_REQUIREMENTS_SUPPORTED,
CWL_TOOL_NAMESPACE_URL,
PACKAGE_COMPLEX_TYPES,
PACKAGE_DIRECTORY_TYPE,
PACKAGE_EXTENSIONS,
Expand Down Expand Up @@ -114,6 +117,7 @@
get_settings,
list_directory_recursive,
null,
open_module_resource_file,
request_extra,
setup_loggers
)
Expand Down Expand Up @@ -193,6 +197,8 @@
PACKAGE_PROGRESS_PREP_OUT = 98
PACKAGE_PROGRESS_DONE = 100

PACKAGE_SCHEMA_CACHE = {} # type: Dict[str, Tuple[str, str]]


def get_status_location_log_path(status_location, out_dir=None):
# type: (str, Optional[str]) -> str
Expand Down Expand Up @@ -388,6 +394,8 @@ def _update_package_compatibility(package):
package["requirements"] = r_list
if h_list:
package["hints"] = h_list
package.setdefault("$namespaces", {})
package["$namespaces"].update(CWL_REQUIREMENT_CUDA_NAMESPACE.copy())
LOGGER.warning(
"CWL package definition updated using '%s' backward-compatibility definition.\n%s",
CWL_REQUIREMENT_APP_DOCKER_GPU,
Expand All @@ -396,6 +404,63 @@ def _update_package_compatibility(package):
return package


def _load_supported_schemas():
    # type: () -> None
    """
    Loads :term:`CWL` schemas supported by `Weaver` to avoid validation errors when provided in requirements.

    Use a similar strategy as :func:`cwltool.main.setup_schema`, but skipping the :term:`CLI` context and limiting
    loaded schema definitions to those that `Weaver` allows. Drops extensions that could cause misbehaving
    functionalities when other :term:`Process` types than :term:`CWL`-based :term:`Application Package` are used.

    For each released :term:`CWL` version, the extension schema bundled with ``cwltool`` is read, its ``$graph``
    is filtered to keep only the ``$import`` entries and the definitions named in ``CWL_REQUIREMENTS_SUPPORTED``,
    and the result is registered with :func:`cwltool.process.use_custom_schema`. Versions already present in
    ``PACKAGE_SCHEMA_CACHE`` are skipped.

    This operation must be called before the :class:`CWLFactory` attempts loading and validating a :term:`CWL` document.
    """
    # names are compared without their namespace prefix (e.g.: 'cwltool:CUDARequirement' -> 'CUDARequirement')
    # a set is employed since membership is checked for every schema definition of every version
    schema_supported = {name.rsplit(":", 1)[-1] for name in CWL_REQUIREMENTS_SUPPORTED}
    # base URL of the schema is the namespace without its fragment, identical for all versions
    schema_base = CWL_TOOL_NAMESPACE_URL.split("#", 1)[0]

    # explicitly omit dev versions, only released versions allowed
    extension_resources = {
        "v1.0": "extensions.yml",
        "v1.1": "extensions-v1.1.yml",
        "v1.2": "extensions-v1.2.yml",
    }
    for version, ext_version_file in extension_resources.items():
        # use our own cache on top of cwltool cache to distinguish between 'v1.x' names
        # pointing at "CWL standard", "cwltool-flavored extensions" or "weaver-flavored extensions"
        if version in PACKAGE_SCHEMA_CACHE:
            LOGGER.debug("Reusing cached CWL %s schema extensions.", version)
            continue
        LOGGER.debug("Loading CWL %s schema extensions...", version)
        with open_module_resource_file(cwltool, ext_version_file) as r_file:
            schema = yaml.safe_load(r_file)

        extensions_supported = []
        extensions_imports = []
        extensions_enabled = set()
        extensions_dropped = set()
        for ext in schema["$graph"]:
            # unnamed import directives are preserved as is, they are not extension definitions by themselves
            if "name" not in ext and "$import" in ext:
                extensions_imports.append(ext)
                continue
            ext_name = ext["name"]
            if ext_name in schema_supported:
                extensions_enabled.add(ext_name)
                extensions_supported.append(ext)
            else:
                extensions_dropped.add(ext_name)
        LOGGER.debug(
            "Configuring CWL %s schema extensions:\n  Enabled: %s\n  Dropped: %s",
            version, sorted(extensions_enabled), sorted(extensions_dropped),
        )
        # imports are placed first, followed by the retained definitions in their original order
        schema["$graph"] = extensions_imports + extensions_supported

        schema_data = bytes2str(yaml.safe_dump(schema, encoding="utf-8", sort_keys=False))
        use_custom_schema(version, schema_base, schema_data)
        PACKAGE_SCHEMA_CACHE[version] = (schema_base, schema_data)


@overload
def _load_package_content(package_dict, # type: CWL
package_name=PACKAGE_DEFAULT_FILE_NAME, # type: str
Expand Down Expand Up @@ -498,6 +563,7 @@ def _load_package_content(package_dict, # type: CWL
if only_dump_file:
return

_load_supported_schemas()
factory = CWLFactory(loading_context=loading_context, runtime_context=runtime_context)
package = factory.make(tmp_json_cwl) # type: CWLFactoryCallable
shutil.rmtree(tmp_dir)
Expand Down
29 changes: 27 additions & 2 deletions weaver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from concurrent.futures import ALL_COMPLETED, CancelledError, ThreadPoolExecutor, as_completed, wait as wait_until
from copy import deepcopy
from datetime import datetime
from pkgutil import get_loader
from typing import TYPE_CHECKING, overload
from urllib.parse import ParseResult, unquote, urlparse, urlunsplit

Expand Down Expand Up @@ -64,13 +65,15 @@
from weaver.xml_util import HTML_TREE_BUILDER, XML

if TYPE_CHECKING:
from types import FrameType
import importlib.abc
from types import FrameType, ModuleType
from typing import (
Any,
AnyStr,
Callable,
Dict,
List,
IO,
Iterable,
Iterator,
MutableMapping,
Expand Down Expand Up @@ -990,6 +993,28 @@ def import_target(target, default_root=None):
return getattr(mod, target, None)


def open_module_resource_file(module, file_path):
    # type: (Union[str, ModuleType], str) -> IO[bytes]
    """
    Opens a resource (data file) from an installed module.

    The resource is always opened in binary mode, regardless of the Python version and of the strategy
    employed to locate it, such that callers obtain the same kind of stream in every case.

    :param module: Imported module, or importable name of the module, that contains the resource.
    :param file_path: Path of the resource relative to the module directory.
    :returns: File stream handler to read contents as needed. The caller is responsible for closing it.
    :raises ImportError: If the module is provided by name and cannot be imported.
    :raises AttributeError:
        If no resource reader is available and the module is not a package (no ``__path__`` attribute).
    """
    import importlib  # local import to keep this function self-contained

    # resolve the module by name, since the direct file access below needs the actual module object
    if isinstance(module, str):
        module = importlib.import_module(module)
    loader = getattr(getattr(module, "__spec__", None), "loader", None) or getattr(module, "__loader__", None)

    # Python <=3.6, no 'get_resource_reader' or 'open_resource' on loader/reader
    # Python >=3.10, no 'open_resource' directly on loader
    # Python 3.7-3.9, both permitted in combination
    get_reader = getattr(loader, "get_resource_reader", None)
    # the loader protocol expects the package name as argument to obtain its resource reader
    reader = get_reader(module.__name__) if callable(get_reader) else loader
    open_resource = getattr(reader, "open_resource", None)
    if callable(open_resource):
        return open_resource(file_path)

    # no resource reader available, access the file directly from the package directory
    path = os.path.join(module.__path__[0], file_path)
    return open(path, mode="rb")


def now(tz_name=None):
# type: (Optional[str]) -> datetime
"""
Expand Down Expand Up @@ -2917,7 +2942,7 @@ def fetch_reference(reference, # type: str
"""
if reference.endswith("/"):
path = fetch_directory(reference, out_dir, out_method=out_method, settings=settings, **option_kwargs)
path = path if out_listing else (f"{os.path.realpath(out_dir)}/")
path = path if out_listing else f"{os.path.realpath(out_dir)}/"
else:
path = fetch_file(reference, out_dir, out_method=out_method, settings=settings, **option_kwargs)
return [path] if out_listing and isinstance(path, str) else path
Expand Down
Loading