Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.job_execution.datasets

"""
Utility classes allowing Job interface to reason about datasets.
"""
import os.path
from abc import (
    ABCMeta,
    abstractmethod
)

import six


def dataset_path_rewrites(dataset_paths):
    """
    Map ``real_path`` to dataset path for every entry that has a rewrite.

    Entries that lack a ``false_path`` attribute, or whose ``false_path`` is
    falsy, are left out of the result. When several entries share the same
    ``real_path`` the last one wins.
    """
    return {
        candidate.real_path: candidate
        for candidate in dataset_paths
        if getattr(candidate, "false_path", None)
    }
class DatasetPath(object):
    """
    A dataset's real path together with optional job-specific overrides.

    ``str()`` of an instance yields ``false_path`` when one is set and
    ``real_path`` otherwise.
    """

    def __init__(
        self,
        dataset_id,
        real_path,
        false_path=None,
        false_extra_files_path=None,
        false_metadata_path=None,
        mutable=True,
        dataset_uuid=None,
        object_store_id=None,
    ):
        self.dataset_id = dataset_id
        self.dataset_uuid = dataset_uuid
        self.object_store_id = object_store_id
        self.real_path = real_path
        self.false_path = false_path
        self.false_extra_files_path = false_extra_files_path
        self.false_metadata_path = false_metadata_path
        self.mutable = mutable

    def __str__(self):
        # The rewritten path takes precedence whenever it has been set.
        if self.false_path is None:
            return self.real_path
        else:
            return self.false_path

    def with_path_for_job(self, false_path, false_extra_files_path=None, false_metadata_path=None):
        """
        Clone the dataset path but with a new false_path.

        Returns ``self`` unchanged when ``false_path`` is None. Otherwise a
        new ``DatasetPath`` is returned that keeps this instance's
        ``dataset_id``, ``real_path``, ``mutable``, ``dataset_uuid`` and
        ``object_store_id`` and takes the supplied false paths.
        """
        if false_path is None:
            return self
        return DatasetPath(
            dataset_id=self.dataset_id,
            real_path=self.real_path,
            false_path=false_path,
            false_extra_files_path=false_extra_files_path,
            false_metadata_path=false_metadata_path,
            mutable=self.mutable,
            # Carry the identifying fields over so the clone still refers to
            # the same dataset instead of silently resetting them to None.
            dataset_uuid=self.dataset_uuid,
            object_store_id=self.object_store_id,
        )
@six.add_metaclass(ABCMeta)
class DatasetPathRewriter(object):
    """Abstract interface the runner uses to rewrite dataset paths."""

    @abstractmethod
    def rewrite_dataset_path(self, dataset, dataset_type):
        """
        Produce the rewritten path for ``dataset``.

        ``dataset_type`` is 'input' or 'output'. Implementations return None
        to signal that the path should not be rewritten.
        """
class NullDatasetPathRewriter(object):
    """No-op rewriter, used by default for jobwrapper; leaves every path as is."""

    def rewrite_dataset_path(self, dataset, dataset_type):
        """Always return None, i.e. never rewrite the path."""
        return None
class OutputsToWorkingDirectoryPathRewriter(object):
    """
    Rewriter that relocates output paths into a job's working directory.

    Used for normal jobs when Galaxy is configured with
    app.config.outputs_to_working_directory. The job runner base class is
    responsible for copying the files out once the job is complete.
    """

    def __init__(self, working_directory, outputs_directory_name):
        self.working_directory = working_directory
        self.outputs_directory_name = outputs_directory_name

    def rewrite_dataset_path(self, dataset, dataset_type):
        """
        Return a path under the working directory for output datasets.

        Any ``dataset_type`` other than 'output' yields None (no rewrite).
        For outputs the result is the absolute working directory, followed
        by ``outputs_directory_name`` when that is not None, followed by a
        ``galaxy_dataset_<uuid>.dat`` file name.
        """
        if dataset_type != 'output':
            return None

        target_directory = os.path.abspath(self.working_directory)
        if self.outputs_directory_name is not None:
            target_directory = os.path.join(target_directory, self.outputs_directory_name)

        # The file is named after the uuid; no harm even if the object store
        # uses the id.
        target_name = "galaxy_dataset_%s.dat" % dataset.dataset.uuid
        return os.path.join(target_directory, target_name)
class TaskPathRewriter(object):
    """
    Rewriter that places every path in the working directory of a TaskWrapper.

    TaskWrapper is responsible for putting the files there and pulling them
    back out.
    """

    def __init__(self, working_directory, job_dataset_path_rewriter):
        self.working_directory = working_directory
        self.job_dataset_path_rewriter = job_dataset_path_rewriter

    def rewrite_dataset_path(self, dataset, dataset_type):
        """
        Return the working-directory path for ``dataset``.

        The base name comes from the wrapped job rewriter's result, falling
        back to ``dataset.file_name`` when that result is falsy.
        """
        # Read the dataset's own file name up front, before delegating.
        original_name = dataset.file_name
        rewritten_name = self.job_dataset_path_rewriter.rewrite_dataset_path(dataset, dataset_type)
        chosen_name = rewritten_name or original_name
        return os.path.join(self.working_directory, os.path.basename(chosen_name))