Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.deps.requirements
import copy
import os
from typing import (
    Any,
    Callable,
    cast,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
)
from typing_extensions import (
    get_args,
    Literal,
)
from galaxy.tool_util_models.tool_source import JavascriptRequirement
from galaxy.util import (
    asbool,
    string_as_bool,
    xml_text,
)
from galaxy.util.oset import OrderedSet
DEFAULT_REQUIREMENT_TYPE = "package"
DEFAULT_REQUIREMENT_VERSION = None
[docs]
class ToolRequirement:
    """
    Represents an external requirement that must be available for the tool to
    run (for example, a program, package, or library).  Requirements can
    optionally assert a specific version.
    """
[docs]
    def __init__(
        self,
        name: str,
        type: Optional[str] = None,
        version: Optional[str] = None,
        specs: Optional[Iterable["RequirementSpecification"]] = None,
    ) -> None:
        if specs is None:
            specs = []
        self.name = name
        self.type = type
        self.version = version
        self.specs = specs
[docs]
    def to_dict(self) -> Dict[str, Any]:
        specs = [s.to_dict() for s in self.specs]
        return dict(name=self.name, type=self.type, version=self.version, specs=specs)
[docs]
    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "ToolRequirement":
        version = d.get("version")
        name = d["name"]
        type = d.get("type")
        specs = [RequirementSpecification.from_dict(s) for s in d.get("specs", [])]
        return cls(name=name, type=type, version=version, specs=specs)
    def __eq__(self, other: Any) -> bool:
        return (
            self.name == other.name
            and self.type == other.type
            and self.version == other.version
            and self.specs == other.specs
        )
    def __hash__(self) -> int:
        return hash((self.name, self.type, self.version, frozenset(self.specs)))
    def __str__(self) -> str:
        return f"ToolRequirement[{self.name},version={self.version},type={self.type},specs={self.specs}]"
    __repr__ = __str__
[docs]
class RequirementSpecification:
    """Refine a requirement using a URI."""
[docs]
    def __init__(self, uri: str, version: Optional[str] = None) -> None:
        self.uri = uri
        self.version = version
    @property
    def specifies_version(self) -> bool:
        return self.version is not None
    @property
    def short_name(self) -> str:
        return self.uri.split("/")[-1]
[docs]
    @classmethod
    def from_dict(cls, dict: Dict) -> "RequirementSpecification":
        uri = dict["uri"]
        version = dict.get("version", None)
        return cls(uri=uri, version=version)
    def __eq__(self, other: Any) -> bool:
        return self.uri == other.uri and self.version == other.version
    def __hash__(self) -> int:
        return hash((self.uri, self.version))
[docs]
class ToolRequirements:
    """
    Represents all requirements (packages, env vars) needed to run a tool.
    """
[docs]
    def __init__(self, tool_requirements: Optional[List[Union[ToolRequirement, Dict[str, Any]]]] = None) -> None:
        if tool_requirements:
            if not isinstance(tool_requirements, list):
                raise ToolRequirementsException("ToolRequirements Constructor expects a list")
            self.tool_requirements = OrderedSet(
                r if isinstance(r, ToolRequirement) else ToolRequirement.from_dict(r) for r in tool_requirements
            )
        else:
            self.tool_requirements = OrderedSet()
[docs]
    @classmethod
    def from_list(cls, requirements: List[Union[ToolRequirement, Dict[str, Any]]]) -> "ToolRequirements":
        return cls(requirements)
    @property
    def resolvable(self) -> "ToolRequirements":
        return ToolRequirements([r for r in self.tool_requirements if r.type in {"package", "set_environment"}])
    @property
    def packages(self) -> "ToolRequirements":
        return ToolRequirements([r for r in self.tool_requirements if r.type == "package"])
[docs]
    def append(self, requirement: Union[ToolRequirement, Dict[str, Any]]) -> None:
        if not isinstance(requirement, ToolRequirement):
            requirement = ToolRequirement.from_dict(requirement)
        self.tool_requirements.add(requirement)
    def __eq__(self, other: Any) -> bool:
        return (
            len(self.tool_requirements & other.tool_requirements)
            == len(self.tool_requirements)
            == len(other.tool_requirements)
        )
    def __iter__(self) -> Iterator[ToolRequirement]:
        yield from self.tool_requirements
    def __getitem__(self, ii) -> ToolRequirement:
        return list(self.tool_requirements)[ii]
    def __len__(self) -> int:
        return len(self.tool_requirements)
    def __hash__(self) -> int:
        return sum(r.__hash__() for r in self.tool_requirements)
[docs]
    def to_dict(self) -> List[Dict[str, Any]]:
        return [r.to_dict() for r in self.tool_requirements]
DEFAULT_CONTAINER_TYPE = "docker"
DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES = False
DEFAULT_CONTAINER_SHELL = "/bin/sh"  # Galaxy assumes bash, but containers are usually thinner.
[docs]
class ContainerDescription:
[docs]
    def __init__(
        self,
        identifier: str,
        type: str = DEFAULT_CONTAINER_TYPE,
        resolve_dependencies: bool = DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES,
        shell: str = DEFAULT_CONTAINER_SHELL,
    ) -> None:
        # Force to lowercase because container image names must be lowercase.
        # Cached singularity images include the path on disk, so only lowercase
        # the image identifier portion. Note, the tag can also contain upper case
        # letters.
        self.identifier = identifier
        if identifier:
            parts = identifier.rsplit(os.sep, 1)
            if ":" in parts[-1]:
                name, tag = parts[-1].rsplit(":", 1)
                parts[-1] = f"{name.lower()}:{tag}"
            else:
                parts[-1] = parts[-1].lower()
            self.identifier = os.sep.join(parts)
        self.type = type
        self.resolve_dependencies = resolve_dependencies
        self.shell = shell
        self.explicit = False
[docs]
    def to_dict(self, *args, **kwds) -> Dict[str, Any]:
        return dict(
            identifier=self.identifier,
            type=self.type,
            resolve_dependencies=self.resolve_dependencies,
            shell=self.shell,
        )
[docs]
    @classmethod
    def from_dict(cls, dict: Dict[str, Any]) -> "ContainerDescription":
        identifier = dict["identifier"]
        type = dict.get("type", DEFAULT_CONTAINER_TYPE)
        resolve_dependencies = dict.get("resolve_dependencies", DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES)
        shell = dict.get("shell", DEFAULT_CONTAINER_SHELL)
        return cls(
            identifier=identifier,
            type=type,
            resolve_dependencies=resolve_dependencies,
            shell=shell,
        )
    def __str__(self) -> str:
        return f"ContainerDescription[identifier={self.identifier},type={self.type}]"
ResourceType = Literal[
    "cores_min",
    "cores_max",
    "ram_min",
    "ram_max",
    "tmpdir_min",
    "tmpdir_max",
    "cuda_version_min",
    "cuda_compute_capability",
    "gpu_memory_min",
    "cuda_device_count_min",
    "cuda_device_count_max",
    "shm_size",
]
VALID_RESOURCE_TYPES = get_args(ResourceType)
[docs]
class ResourceRequirement:
[docs]
    def __init__(self, value_or_expression: Union[int, float, str], resource_type: ResourceType) -> None:
        self.value_or_expression = value_or_expression
        if not resource_type:
            raise ValueError("Missing resource requirement type")
        if resource_type not in VALID_RESOURCE_TYPES:
            raise ValueError(f"Invalid resource requirement type '{resource_type}'")
        self.resource_type = resource_type
        try:
            float(self.value_or_expression)
            self.runtime_required = False
        except ValueError:
            self.runtime_required = True
[docs]
    def to_dict(self) -> Dict[str, Any]:
        return {"resource_type": self.resource_type, "value_or_expression": self.value_or_expression}
[docs]
    def get_value(self, runtime: Optional[Dict] = None, js_evaluator: Optional[Callable] = None) -> float:
        if self.runtime_required:
            # TODO: hook up evaluator
            # return js_evaluator(self.value_or_expression, runtime)
            raise NotImplementedError(
                f"{self.value_or_expression} is not an integer or float value, expressions currently not implemented"
            )
        return float(self.value_or_expression)
[docs]
def resource_requirements_from_list(requirements: Iterable[Dict[str, Any]]) -> List[ResourceRequirement]:
    cwl_to_galaxy = {
        "coresMin": "cores_min",
        "coresMax": "cores_max",
        "ramMin": "ram_min",
        "ramMax": "ram_max",
        "tmpdirMin": "tmpdir_min",
        "tmpdirMax": "tmpdir_max",
        "cudaVersionMin": "cuda_version_min",
        "cudaComputeCapability": "cuda_compute_capability",
        "gpuMemoryMin": "gpu_memory_min",
        "cudaDeviceCountMin": "cuda_device_count_min",
        "cudaDeviceCountMax": "cuda_device_count_max",
        "ShmSize": "shm_size",
    }
    rr = []
    for r in requirements:
        if r.get("class") == "ResourceRequirement":
            valid_key_set = set(cwl_to_galaxy.keys())
        elif r.get("type") == "resource":
            valid_key_set = set(cwl_to_galaxy.values())
        else:
            continue
        for key in valid_key_set.intersection(set(r.keys())):
            value = r[key]
            key = cast(ResourceType, cwl_to_galaxy.get(key, key))
            if value is not None:
                # all resoure requirement fields are optional
                rr.append(ResourceRequirement(value_or_expression=value, resource_type=key))
    return rr
[docs]
class BaseCredential:
[docs]
    def __init__(
        self,
        name: str,
        inject_as_env: str,
        optional: bool = False,
        label: str = "",
        description: str = "",
    ) -> None:
        self.name = name
        self.inject_as_env = inject_as_env
        self.optional = optional
        self.label = label
        self.description = description
        if not self.name:
            raise ValueError("Missing credential (secret/variable) name")
        if not self.inject_as_env:
            raise ValueError("Missing inject_as_env")
[docs]
    def to_dict(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "optional": self.optional,
            "label": self.label,
            "description": self.description,
        }
[docs]
class Secret(BaseCredential):
[docs]
    @classmethod
    def from_element(cls, elem) -> "Secret":
        return cls(
            name=elem.get("name"),
            inject_as_env=elem.get("inject_as_env"),
            optional=string_as_bool(elem.get("optional", "false")),
            label=elem.get("label", ""),
            description=elem.get("description", ""),
        )
[docs]
class Variable(BaseCredential):
[docs]
    @classmethod
    def from_element(cls, elem) -> "Variable":
        return cls(
            name=elem.get("name"),
            inject_as_env=elem.get("inject_as_env"),
            optional=string_as_bool(elem.get("optional", "false")),
            label=elem.get("label", ""),
            description=elem.get("description", ""),
        )
[docs]
class CredentialsRequirement:
[docs]
    def __init__(
        self,
        name: str,
        version: str,
        label: str = "",
        description: str = "",
        optional: bool = False,
        secrets: Optional[List[Secret]] = None,
        variables: Optional[List[Variable]] = None,
    ) -> None:
        self.name = name
        self.version = version
        self.label = label
        self.description = description
        self.optional = optional
        self.secrets = secrets if secrets is not None else []
        self.variables = variables if variables is not None else []
        if not self.name:
            raise ValueError("Missing user credentials name")
        if not self.version:
            raise ValueError("Missing version")
[docs]
    def to_dict(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "version": self.version,
            "label": self.label,
            "description": self.description,
            "optional": self.optional,
            "secrets": [s.to_dict() for s in self.secrets],
            "variables": [v.to_dict() for v in self.variables],
        }
[docs]
    @classmethod
    def from_dict(cls, dict: Dict[str, Any]) -> "CredentialsRequirement":
        name = dict["name"]
        version = dict["version"]
        label = dict.get("label", "")
        description = dict.get("description", "")
        optional = dict.get("optional", False)
        secrets = [Secret.from_element(s) for s in dict.get("secrets", [])]
        variables = [Variable.from_element(v) for v in dict.get("variables", [])]
        return cls(
            name=name,
            version=version,
            label=label,
            description=description,
            optional=optional,
            secrets=secrets,
            variables=variables,
        )
[docs]
def parse_requirements_from_lists(
    software_requirements: List[Union[ToolRequirement, Dict[str, Any]]],
    containers: Iterable[Dict[str, Any]],
    resource_requirements: Iterable[Dict[str, Any]],
    javascript_requirements: List[Dict[str, Any]],
    credentials: Iterable[Dict[str, Any]],
) -> Tuple[
    ToolRequirements,
    List[ContainerDescription],
    List[ResourceRequirement],
    List[JavascriptRequirement],
    List[CredentialsRequirement],
]:
    return (
        ToolRequirements.from_list(software_requirements),
        [ContainerDescription.from_dict(c) for c in containers],
        resource_requirements_from_list(resource_requirements),
        [JavascriptRequirement(**r) for r in javascript_requirements],
        [CredentialsRequirement.from_dict(s) for s in credentials],
    )
[docs]
def parse_requirements_from_xml(xml_root, parse_resources_and_credentials: bool = False):
    """
    Parses requirements, containers and optionally resource requirements from Xml tree.
    >>> from galaxy.util import parse_xml_string
    >>> def load_requirements(contents, parse_resources_and_credentials=False):
    ...     contents_document = '''<tool><requirements>%s</requirements></tool>'''
    ...     root = parse_xml_string(contents_document % contents)
    ...     return parse_requirements_from_xml(root, parse_resources_and_credentials=parse_resources_and_credentials)
    >>> reqs, containers = load_requirements('''<requirement>bwa</requirement>''')
    >>> reqs[0].name
    'bwa'
    >>> reqs[0].version is None
    True
    >>> reqs[0].type
    'package'
    >>> reqs, containers = load_requirements('''<requirement type="binary" version="1.3.3">cufflinks</requirement>''')
    >>> reqs[0].name
    'cufflinks'
    >>> reqs[0].version
    '1.3.3'
    >>> reqs[0].type
    'binary'
    """
    requirements_elem = xml_root.find("requirements")
    requirement_elems = []
    container_elems = []
    if requirements_elem is not None:
        requirement_elems = requirements_elem.findall("requirement")
        container_elems = requirements_elem.findall("container")
    requirements = ToolRequirements()
    for requirement_elem in requirement_elems:
        name = xml_text(requirement_elem)
        type = requirement_elem.get("type", DEFAULT_REQUIREMENT_TYPE)
        version = requirement_elem.get("version", DEFAULT_REQUIREMENT_VERSION)
        requirement = ToolRequirement(name=name, type=type, version=version)
        requirements.append(requirement)
    containers = [container_from_element(c) for c in container_elems]
    if parse_resources_and_credentials:
        resource_elems = requirements_elem.findall("resource") if requirements_elem is not None else []
        resources = [resource_from_element(r) for r in resource_elems]
        javascript_requirements: List[Dict[str, Any]] = []
        credentials_elems = requirements_elem.findall("credentials") if requirements_elem is not None else []
        credentials = [credentials_from_element(s) for s in credentials_elems]
        return requirements, containers, resources, javascript_requirements, credentials
    return requirements, containers
[docs]
def resource_from_element(resource_elem) -> ResourceRequirement:
    value_or_expression = xml_text(resource_elem)
    resource_type = resource_elem.get("type")
    return ResourceRequirement(value_or_expression=value_or_expression, resource_type=resource_type)
[docs]
def container_from_element(container_elem) -> ContainerDescription:
    identifier = xml_text(container_elem)
    type = container_elem.get("type", DEFAULT_CONTAINER_TYPE)
    resolve_dependencies = asbool(container_elem.get("resolve_dependencies", DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES))
    shell = container_elem.get("shell", DEFAULT_CONTAINER_SHELL)
    container = ContainerDescription(
        identifier=identifier,
        type=type,
        resolve_dependencies=resolve_dependencies,
        shell=shell,
    )
    return container
[docs]
def credentials_from_element(credentials_elem) -> CredentialsRequirement:
    name = credentials_elem.get("name")
    version = credentials_elem.get("version")
    label = credentials_elem.get("label", "")
    description = credentials_elem.get("description", "")
    optional = string_as_bool(credentials_elem.get("optional", "false"))
    secrets = [Secret.from_element(elem) for elem in credentials_elem.findall("secret")]
    variables = [Variable.from_element(elem) for elem in credentials_elem.findall("variable")]
    return CredentialsRequirement(
        name=name,
        version=version,
        label=label,
        description=description,
        optional=optional,
        secrets=secrets,
        variables=variables,
    )