import os
from abc import (
ABCMeta,
abstractmethod,
)
from typing import (
Any,
Dict,
)
from galaxy.job_execution.setup import JobIO
from galaxy.model import Job
[docs]class ComputeEnvironment(metaclass=ABCMeta):
"""Definition of the job as it will be run on the (potentially) remote
compute server.
"""
[docs] @abstractmethod
def output_names(self):
"""Output unqualified filenames defined by job."""
[docs] @abstractmethod
def output_path_rewrite(self, dataset):
"""Output path for specified dataset."""
[docs] @abstractmethod
def unstructured_path_rewrite(self, path):
"""Rewrite loc file paths, etc.."""
[docs] @abstractmethod
def working_directory(self):
"""Job working directory (potentially remote)"""
[docs] @abstractmethod
def config_directory(self):
"""Directory containing config files (potentially remote)"""
[docs] @abstractmethod
def env_config_directory(self):
"""Working directory (possibly as environment variable evaluation)."""
[docs] @abstractmethod
def sep(self):
"""os.path.sep for the platform this job will execute in."""
[docs] @abstractmethod
def new_file_path(self):
"""Absolute path to dump new files for this job on compute server."""
[docs] @abstractmethod
def version_path(self):
"""Location of the version file for the underlying tool."""
[docs] @abstractmethod
def home_directory(self):
"""Home directory of target job - none if HOME should not be set."""
[docs] @abstractmethod
def tmp_directory(self):
"""Temp directory of target job - none if HOME should not be set."""
[docs] @abstractmethod
def galaxy_url(self):
"""URL to access Galaxy API from for this compute environment."""
[docs] @abstractmethod
def get_file_sources_dict(self) -> Dict[str, Any]:
"""Return file sources dict for current user."""
[docs]class SimpleComputeEnvironment:
[docs] def config_directory(self):
return os.path.join(self.working_directory(), "configs") # type: ignore[attr-defined]
[docs] def sep(self):
return os.path.sep
[docs]class SharedComputeEnvironment(SimpleComputeEnvironment, ComputeEnvironment):
"""Default ComputeEnvironment for job and task wrapper to pass
to ToolEvaluator - valid when Galaxy and compute share all the relevant
file systems.
"""
job_id: JobIO
job: Job
[docs] def __init__(self, job_io: JobIO, job: Job):
self.job_io = job_io
self.job = job
[docs] def get_file_sources_dict(self) -> Dict[str, Any]:
return self.job_io.file_sources_dict
[docs] def output_names(self):
return self.job_io.get_output_basenames()
[docs] def output_paths(self):
return self.job_io.get_output_fnames()
[docs] def output_path_rewrite(self, dataset):
return str(self.job_io.get_output_path(dataset))
[docs] def unstructured_path_rewrite(self, path):
return None
[docs] def working_directory(self):
return self.job_io.working_directory
[docs] def env_config_directory(self):
"""Working directory (possibly as environment variable evaluation)."""
return "$_GALAXY_JOB_DIR"
[docs] def new_file_path(self):
return self.job_io.new_file_path
[docs] def version_path(self):
return self.job_io.version_path
[docs] def home_directory(self):
return self.job_io.home_directory
[docs] def tmp_directory(self):
return self.job_io.tmp_directory
[docs] def galaxy_url(self):
return self.job_io.galaxy_url