Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.job_execution.datasets

"""
Utility classes allowing Job interface to reason about datasets.
"""
import os.path
from abc import (
    ABCMeta,
    abstractmethod,
)


[docs]def dataset_path_rewrites(dataset_paths): dataset_paths_with_rewrites = [path for path in dataset_paths if getattr(path, "false_path", None)] return {dp.real_path: dp for dp in dataset_paths_with_rewrites}
[docs]class DatasetPath:
[docs] def __init__( self, dataset_id, real_path, false_path=None, false_extra_files_path=None, false_metadata_path=None, mutable=True, dataset_uuid=None, object_store_id=None, ): self.dataset_id = dataset_id self.dataset_uuid = dataset_uuid self.object_store_id = object_store_id self.real_path = real_path self.false_path = false_path self.false_extra_files_path = false_extra_files_path self.false_metadata_path = false_metadata_path self.mutable = mutable
def __str__(self): if self.false_path is None: return self.real_path else: return self.false_path
[docs] def with_path_for_job(self, false_path, false_extra_files_path=None, false_metadata_path=None): """ Clone the dataset path but with a new false_path. """ dataset_path = self if false_path is not None: dataset_path = DatasetPath( dataset_id=self.dataset_id, real_path=self.real_path, false_path=false_path, false_extra_files_path=false_extra_files_path, false_metadata_path=false_metadata_path, mutable=self.mutable, ) return dataset_path
[docs]class DatasetPathRewriter(metaclass=ABCMeta): """Used by runner to rewrite paths."""
[docs] @abstractmethod def rewrite_dataset_path(self, dataset, dataset_type): """ Dataset type is 'input' or 'output'. Return None to indicate not to rewrite this path. """
[docs]class NullDatasetPathRewriter(DatasetPathRewriter): """Used by default for jobwrapper, do not rewrite anything."""
[docs] def rewrite_dataset_path(self, dataset, dataset_type): """Keep path the same.""" return None
[docs]class OutputsToWorkingDirectoryPathRewriter(DatasetPathRewriter): """Rewrites all paths to place them in the specified working directory for normal jobs when Galaxy is configured with app.config.outputs_to_working_directory. Job runner base class is responsible for copying these out after job is complete. """
[docs] def __init__(self, working_directory, outputs_directory_name): self.working_directory = working_directory self.outputs_directory_name = outputs_directory_name
[docs] def rewrite_dataset_path(self, dataset, dataset_type): """Keep path the same.""" if dataset_type == "output": base_output_directory = os.path.abspath(self.working_directory) if self.outputs_directory_name is not None: base_output_directory = os.path.join(base_output_directory, self.outputs_directory_name) # set false_path to uuid, no harm even if object store uses id false_path = os.path.join(base_output_directory, f"galaxy_dataset_{dataset.dataset.uuid}.dat") return false_path else: return None
[docs]class TaskPathRewriter(DatasetPathRewriter): """Rewrites all paths to place them in the specified working directory for TaskWrapper. TaskWrapper is responsible for putting them there and pulling them out. """
[docs] def __init__(self, working_directory, job_dataset_path_rewriter): self.working_directory = working_directory self.job_dataset_path_rewriter = job_dataset_path_rewriter
[docs] def rewrite_dataset_path(self, dataset, dataset_type): """ """ dataset_file_name = dataset.file_name job_file_name = self.job_dataset_path_rewriter.rewrite_dataset_path(dataset, dataset_type) or dataset_file_name return os.path.join(self.working_directory, os.path.basename(job_file_name))
[docs]def get_path_rewriter( outputs_to_working_directory, working_directory, outputs_directory, is_task ) -> DatasetPathRewriter: job_dataset_path_rewriter: DatasetPathRewriter = ( OutputsToWorkingDirectoryPathRewriter(working_directory, outputs_directory) if outputs_to_working_directory else NullDatasetPathRewriter() ) if is_task: return TaskPathRewriter(working_directory, job_dataset_path_rewriter=job_dataset_path_rewriter) return job_dataset_path_rewriter