Source code for galaxy.job_execution.compute_environment

import os
from abc import (
    ABCMeta,
    abstractmethod,
)
from typing import (
    Any,
    Dict,
)

from galaxy.job_execution.setup import JobIO
from galaxy.model import Job


def dataset_path_to_extra_path(path: str) -> str:
    """Derive the extra-files directory path that accompanies a dataset path.

    The trailing ``len(".dat")`` characters of ``path`` are removed and
    ``_files`` is appended in their place.

    NOTE: the strip is unconditional - the final four characters are dropped
    whether or not ``path`` actually ends in ``.dat``.

    :param path: path to a dataset file (expected to end in ``.dat``).
    :returns: the matching ``..._files`` path.
    """
    suffix_length = len(".dat")
    stem = path[:-suffix_length]
    return stem + "_files"
class ComputeEnvironment(metaclass=ABCMeta):
    """Definition of the job as it will be run on the (potentially)
    remote compute server.

    This is a purely abstract interface: every method below is declared with
    ``@abstractmethod`` and has no implementation, so a concrete subclass
    must provide all of them before it can be instantiated.
    """

    @abstractmethod
    def output_names(self):
        """Output unqualified filenames defined by job."""

    @abstractmethod
    def input_path_rewrite(self, dataset):
        """Input path for specified dataset."""

    @abstractmethod
    def output_path_rewrite(self, dataset):
        """Output path for specified dataset."""

    @abstractmethod
    def input_extra_files_rewrite(self, dataset):
        """Input extra files path rewrite for specified dataset."""

    @abstractmethod
    def output_extra_files_rewrite(self, dataset):
        """Output extra files path rewrite for specified dataset."""

    @abstractmethod
    def input_metadata_rewrite(self, dataset, metadata_value):
        """Input metadata path rewrite for specified dataset."""

    @abstractmethod
    def unstructured_path_rewrite(self, path):
        """Rewrite loc file paths, etc.."""

    @abstractmethod
    def working_directory(self):
        """Job working directory (potentially remote)"""

    @abstractmethod
    def config_directory(self):
        """Directory containing config files (potentially remote)"""

    @abstractmethod
    def env_config_directory(self):
        """Working directory (possibly as environment variable evaluation)."""

    @abstractmethod
    def sep(self):
        """os.path.sep for the platform this job will execute in."""

    @abstractmethod
    def new_file_path(self):
        """Absolute path to dump new files for this job on compute server."""

    @abstractmethod
    def tool_directory(self):
        """Absolute path to tool files for this job on compute server."""

    @abstractmethod
    def version_path(self):
        """Location of the version file for the underlying tool."""

    @abstractmethod
    def home_directory(self):
        """Home directory of target job - none if HOME should not be set."""

    @abstractmethod
    def tmp_directory(self):
        """Temp directory of target job - none if a temp directory should not be set."""

    @abstractmethod
    def galaxy_url(self):
        """URL to access Galaxy API from for this compute environment."""

    @abstractmethod
    def get_file_sources_dict(self) -> Dict[str, Any]:
        """Return file sources dict for current user."""
class SimpleComputeEnvironment:
    """Mixin supplying local-filesystem defaults for two environment methods.

    The class defines no ``working_directory`` of its own; it relies on
    whatever it is combined with to provide that method.
    """

    def config_directory(self):
        """Return the ``configs`` directory inside the working directory."""
        job_directory = self.working_directory()  # type: ignore[attr-defined]
        return os.path.join(job_directory, "configs")

    def sep(self):
        """Return the path separator of the platform running this code."""
        separator = os.path.sep
        return separator
class SharedComputeEnvironment(SimpleComputeEnvironment, ComputeEnvironment):
    """Default ComputeEnvironment for job and task wrapper to pass
    to ToolEvaluator - valid when Galaxy and compute share all the relevant
    file systems.

    Most methods simply forward to the wrapped ``JobIO`` instance; the two
    generic rewrite hooks (``input_metadata_rewrite`` and
    ``unstructured_path_rewrite``) always return ``None``.
    """

    # Declared attribute names match what __init__ actually assigns.
    job_io: JobIO
    job: Job

    def __init__(self, job_io: JobIO, job: Job):
        """Store the job I/O helper and the job model object.

        :param job_io: source of every path and setting exposed by this class.
        :param job: the job being described; kept on the instance as ``job``.
        """
        self.job_io = job_io
        self.job = job

    def get_file_sources_dict(self) -> Dict[str, Any]:
        """Return ``job_io.file_sources_dict``."""
        return self.job_io.file_sources_dict

    def output_names(self):
        """Return ``job_io.get_output_basenames()``."""
        return self.job_io.get_output_basenames()

    def output_paths(self):
        """Return ``job_io.get_output_fnames()``."""
        return self.job_io.get_output_fnames()

    def input_path_rewrite(self, dataset):
        """Return the input path ``job_io`` reports for ``dataset``, as a string."""
        return str(self.job_io.get_input_path(dataset))

    def output_path_rewrite(self, dataset):
        """Return the output path ``job_io`` reports for ``dataset``, as a string."""
        return str(self.job_io.get_output_path(dataset))

    def input_extra_files_rewrite(self, dataset):
        """Return the extra-files path derived from the dataset's input path."""
        input_path_rewrite = self.input_path_rewrite(dataset)
        return dataset_path_to_extra_path(input_path_rewrite)

    def output_extra_files_rewrite(self, dataset):
        """Return the extra-files path derived from the dataset's output path."""
        output_path_rewrite = self.output_path_rewrite(dataset)
        return dataset_path_to_extra_path(output_path_rewrite)

    def input_metadata_rewrite(self, dataset, metadata_value):
        """Always return ``None``; both arguments are ignored."""
        return None

    def unstructured_path_rewrite(self, path):
        """Always return ``None``; ``path`` is ignored."""
        return None

    def working_directory(self):
        """Return ``job_io.working_directory``."""
        return self.job_io.working_directory

    def env_config_directory(self):
        """Working directory (possibly as environment variable evaluation)."""
        # Literal shell variable reference; it is not expanded here.
        return "$_GALAXY_JOB_DIR"

    def new_file_path(self):
        """Return ``job_io.new_file_path``."""
        return self.job_io.new_file_path

    def version_path(self):
        """Return ``job_io.version_path``."""
        return self.job_io.version_path

    def tool_directory(self):
        """Return ``job_io.tool_directory``."""
        return self.job_io.tool_directory

    def home_directory(self):
        """Return ``job_io.home_directory``."""
        return self.job_io.home_directory

    def tmp_directory(self):
        """Return ``job_io.tmp_directory``."""
        return self.job_io.tmp_directory

    def galaxy_url(self):
        """Return ``job_io.galaxy_url``."""
        return self.job_io.galaxy_url