Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.job_execution.datasets
"""
Utility classes allowing Job interface to reason about datasets.
"""
import os.path
from abc import (
ABCMeta,
abstractmethod
)
def dataset_path_rewrites(dataset_paths):
    """
    Index the rewritten dataset paths by their real path.

    Only entries whose ``false_path`` attribute exists and is truthy are
    kept; every other entry is ignored. The returned dictionary maps each
    kept entry's ``real_path`` to the entry itself.
    """
    rewrites = {}
    for candidate in dataset_paths:
        # Entries without a usable false_path are not rewritten.
        if not getattr(candidate, "false_path", None):
            continue
        rewrites[candidate.real_path] = candidate
    return rewrites
class DatasetPath:
    """
    Describes where a dataset lives and, optionally, where a job should
    see it instead.

    ``real_path`` is always set; ``false_path`` (with the matching
    ``false_extra_files_path`` and ``false_metadata_path``) is an optional
    alternate location. Converting the object to a string yields
    ``false_path`` when one is set and ``real_path`` otherwise.
    """

    def __init__(
        self,
        dataset_id,
        real_path,
        false_path=None,
        false_extra_files_path=None,
        false_metadata_path=None,
        mutable=True,
        dataset_uuid=None,
        object_store_id=None,
    ):
        """
        Store the identifying information and paths for one dataset.

        :param dataset_id: identifier of the dataset
        :param real_path: the dataset's actual path
        :param false_path: alternate path to use instead of ``real_path``
        :param false_extra_files_path: alternate extra files path
        :param false_metadata_path: alternate metadata path
        :param mutable: stored as-is on the instance
        :param dataset_uuid: uuid of the dataset, if known
        :param object_store_id: id of the object store, if known
        """
        self.dataset_id = dataset_id
        self.dataset_uuid = dataset_uuid
        self.object_store_id = object_store_id
        self.real_path = real_path
        self.false_path = false_path
        self.false_extra_files_path = false_extra_files_path
        self.false_metadata_path = false_metadata_path
        self.mutable = mutable

    def __str__(self):
        """Return ``false_path`` when set, otherwise ``real_path``."""
        if self.false_path is None:
            return self.real_path
        else:
            return self.false_path

    def with_path_for_job(self, false_path, false_extra_files_path=None, false_metadata_path=None):
        """
        Clone the dataset path but with a new false_path.

        When ``false_path`` is ``None`` no clone is made and ``self`` is
        returned unchanged. Otherwise a new ``DatasetPath`` is returned
        that keeps this object's identity (``dataset_id``,
        ``dataset_uuid``, ``object_store_id``), ``real_path`` and
        ``mutable`` flag, and uses the supplied false paths.
        """
        if false_path is None:
            return self
        return DatasetPath(
            dataset_id=self.dataset_id,
            real_path=self.real_path,
            false_path=false_path,
            false_extra_files_path=false_extra_files_path,
            false_metadata_path=false_metadata_path,
            mutable=self.mutable,
            # Forward the identifying fields too; otherwise the clone would
            # silently fall back to the constructor defaults (None).
            dataset_uuid=self.dataset_uuid,
            object_store_id=self.object_store_id,
        )
class DatasetPathRewriter(metaclass=ABCMeta):
    """Abstract interface used by runners to rewrite dataset paths."""

    @abstractmethod
    def rewrite_dataset_path(self, dataset, dataset_type):
        """
        Produce the rewritten path for ``dataset``.

        ``dataset_type`` is either 'input' or 'output'. Implementations
        return None to signal that the path must not be rewritten.
        """
class NullDatasetPathRewriter:
    """
    Default rewriter for the job wrapper: it never rewrites any path.
    """

    def rewrite_dataset_path(self, dataset, dataset_type):
        """
        Leave the path untouched by always answering None, regardless
        of ``dataset`` or ``dataset_type``.
        """
        return None
class OutputsToWorkingDirectoryPathRewriter:
    """
    Rewrites output paths so they land inside the given working
    directory; used for normal jobs when Galaxy is configured with
    app.config.outputs_to_working_directory. The job runner base class
    is responsible for copying the files out once the job is complete.
    """

    def __init__(self, working_directory, outputs_directory_name):
        """
        Remember the working directory and the optional name of the
        sub-directory (may be None) that receives the outputs.
        """
        self.outputs_directory_name = outputs_directory_name
        self.working_directory = working_directory

    def rewrite_dataset_path(self, dataset, dataset_type):
        """
        Return a path under the working directory for 'output' datasets.

        Any other ``dataset_type`` yields None, meaning the path is not
        rewritten.
        """
        if dataset_type != 'output':
            return None

        target_directory = os.path.abspath(self.working_directory)
        subdirectory = self.outputs_directory_name
        if subdirectory is not None:
            target_directory = os.path.join(target_directory, subdirectory)
        # The file name is built from the uuid; no harm even if the object
        # store uses the id.
        file_name = "galaxy_dataset_%s.dat" % dataset.dataset.uuid
        return os.path.join(target_directory, file_name)
class TaskPathRewriter:
    """
    Rewrites every path so it points into the given working directory
    for TaskWrapper. TaskWrapper is responsible for putting the files
    there and pulling them out.
    """

    def __init__(self, working_directory, job_dataset_path_rewriter):
        """
        Keep the task working directory and the job-level rewriter that
        is consulted before the task-level rewrite is applied.
        """
        self.job_dataset_path_rewriter = job_dataset_path_rewriter
        self.working_directory = working_directory

    def rewrite_dataset_path(self, dataset, dataset_type):
        """
        Return the dataset's location inside the task working directory.

        The job-level rewriter is asked first; when it gives no (truthy)
        path, the dataset's own ``file_name`` is used. Only the base name
        of that path is kept and joined onto the working directory.
        """
        fallback_name = dataset.file_name
        source_name = self.job_dataset_path_rewriter.rewrite_dataset_path(dataset, dataset_type)
        if not source_name:
            source_name = fallback_name
        base_name = os.path.basename(source_name)
        return os.path.join(self.working_directory, base_name)