Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.job_execution.datasets
"""
Utility classes allowing Job interface to reason about datasets.
"""
import os.path
from abc import (
ABCMeta,
abstractmethod,
)
def dataset_path_rewrites(dataset_paths):
    """Map real paths to the dataset paths that carry a rewrite.

    :param dataset_paths: iterable of objects exposing ``real_path`` and,
        optionally, ``false_path``.
    :returns: dict keyed by ``real_path`` holding only the entries whose
        ``false_path`` attribute is present and truthy. If several entries
        share a ``real_path`` the last one wins.
    """
    # ``getattr`` with a default tolerates objects that have no ``false_path``
    # attribute at all; those are treated the same as "not rewritten".
    return {
        dataset_path.real_path: dataset_path
        for dataset_path in dataset_paths
        if getattr(dataset_path, "false_path", None)
    }
class DatasetPath:
    """A dataset's real path plus optional job-specific ("false") paths.

    ``str()`` of an instance yields ``false_path`` when one is set and
    ``real_path`` otherwise.
    """

    def __init__(
        self,
        dataset_id,
        real_path,
        false_path=None,
        false_extra_files_path=None,
        false_metadata_path=None,
        mutable=True,
        dataset_uuid=None,
        object_store_id=None,
    ):
        """Store the identifying fields and paths exactly as given.

        :param dataset_id: identifier of the dataset.
        :param real_path: the dataset's actual path.
        :param false_path: replacement path, or None for no rewrite.
        :param false_extra_files_path: replacement extra-files path, if any.
        :param false_metadata_path: replacement metadata path, if any.
        :param mutable: stored as-is on the instance (defaults to True).
        :param dataset_uuid: optional UUID of the dataset.
        :param object_store_id: optional object store identifier.
        """
        self.dataset_id = dataset_id
        self.dataset_uuid = dataset_uuid
        self.object_store_id = object_store_id
        self.real_path = real_path
        self.false_path = false_path
        self.false_extra_files_path = false_extra_files_path
        self.false_metadata_path = false_metadata_path
        self.mutable = mutable

    def __str__(self):
        """Return ``false_path`` if it is not None, else ``real_path``."""
        if self.false_path is None:
            return self.real_path
        return self.false_path

    def with_path_for_job(self, false_path, false_extra_files_path=None, false_metadata_path=None):
        """
        Clone the dataset path but with a new false_path.

        :param false_path: replacement path for the clone. When None, no
            clone is made and ``self`` is returned unchanged (the other two
            arguments are ignored in that case).
        :param false_extra_files_path: replacement extra-files path for the clone.
        :param false_metadata_path: replacement metadata path for the clone.
        :returns: ``self`` or a new ``DatasetPath`` sharing this instance's
            ``dataset_id``, ``real_path``, ``mutable``, ``dataset_uuid`` and
            ``object_store_id``.
        """
        if false_path is None:
            return self
        return DatasetPath(
            dataset_id=self.dataset_id,
            real_path=self.real_path,
            false_path=false_path,
            false_extra_files_path=false_extra_files_path,
            false_metadata_path=false_metadata_path,
            mutable=self.mutable,
            # Carry the identity fields over too; without these the "clone"
            # would silently lose its UUID and object store id.
            dataset_uuid=self.dataset_uuid,
            object_store_id=self.object_store_id,
        )
class DatasetPathRewriter(metaclass=ABCMeta):
    """Used by runner to rewrite paths.

    Abstract interface: concrete rewriters must implement
    ``rewrite_dataset_path``.
    """

    @abstractmethod
    def rewrite_dataset_path(self, dataset, dataset_type):
        """
        Dataset type is 'input' or 'output'.
        Return None to indicate not to rewrite this path.

        :param dataset: the dataset whose path may be rewritten.
        :param dataset_type: ``"input"`` or ``"output"``.
        :returns: the replacement path, or None to leave the path alone.
        """
class NullDatasetPathRewriter(DatasetPathRewriter):
    """Used by default for jobwrapper, do not rewrite anything."""

    def rewrite_dataset_path(self, dataset, dataset_type):
        """Keep path the same.

        Always returns None, which tells the caller not to rewrite the path
        regardless of ``dataset`` or ``dataset_type``. Implementing this
        abstract method is what makes the class instantiable.
        """
        return None
class OutputsToWorkingDirectoryPathRewriter(DatasetPathRewriter):
    """Rewrites all paths to place them in the specified working
    directory for normal jobs when Galaxy is configured with
    app.config.outputs_to_working_directory. Job runner base class
    is responsible for copying these out after job is complete.
    """

    def __init__(self, working_directory, outputs_directory_name):
        """Remember where rewritten outputs should be placed.

        :param working_directory: directory that rewritten output paths are
            rooted in (made absolute when a path is rewritten).
        :param outputs_directory_name: optional sub-directory name appended
            to ``working_directory``; None means no sub-directory.
        """
        self.working_directory = working_directory
        self.outputs_directory_name = outputs_directory_name

    def rewrite_dataset_path(self, dataset, dataset_type):
        """Return a working-directory path for outputs; leave other types alone.

        :param dataset: object whose ``dataset.uuid`` names the rewritten file.
        :param dataset_type: only ``"output"`` is rewritten.
        :returns: ``<abs working dir>[/<outputs dir>]/galaxy_dataset_<uuid>.dat``
            for outputs, None for any other dataset type.
        """
        if dataset_type != "output":
            return None
        base_output_directory = os.path.abspath(self.working_directory)
        if self.outputs_directory_name is not None:
            base_output_directory = os.path.join(base_output_directory, self.outputs_directory_name)
        # set false_path to uuid, no harm even if object store uses id
        return os.path.join(base_output_directory, f"galaxy_dataset_{dataset.dataset.uuid}.dat")
class TaskPathRewriter(DatasetPathRewriter):
    """Rewrites all paths to place them in the specified working
    directory for TaskWrapper. TaskWrapper is responsible for putting
    them there and pulling them out.
    """

    def __init__(self, working_directory, job_dataset_path_rewriter):
        """Wrap a job-level rewriter for use inside a task working directory.

        :param working_directory: directory every rewritten path is placed in.
        :param job_dataset_path_rewriter: rewriter consulted first to obtain
            the job-level path for a dataset.
        """
        self.working_directory = working_directory
        self.job_dataset_path_rewriter = job_dataset_path_rewriter

    def rewrite_dataset_path(self, dataset, dataset_type):
        """Return the dataset's file name relocated into the working directory.

        The wrapped job-level rewriter is asked for a path first; if it
        returns a falsy value, ``dataset.file_name`` is used instead. Only
        the base name of that path is kept and joined onto
        ``working_directory``.

        :param dataset: object exposing ``file_name``.
        :param dataset_type: passed through to the wrapped rewriter.
        :returns: path inside ``working_directory``.
        """
        # Read file_name up front, before consulting the wrapped rewriter,
        # so the order of attribute access and rewriter call is unchanged.
        dataset_file_name = dataset.file_name
        job_file_name = self.job_dataset_path_rewriter.rewrite_dataset_path(dataset, dataset_type)
        if not job_file_name:
            job_file_name = dataset_file_name
        return os.path.join(self.working_directory, os.path.basename(job_file_name))
def get_path_rewriter(
    outputs_to_working_directory, working_directory, outputs_directory, is_task
) -> DatasetPathRewriter:
    """Build the dataset path rewriter matching the given settings.

    :param outputs_to_working_directory: when truthy, outputs are rewritten
        via ``OutputsToWorkingDirectoryPathRewriter``; otherwise a
        ``NullDatasetPathRewriter`` is used.
    :param working_directory: working directory handed to the rewriters.
    :param outputs_directory: outputs sub-directory name handed to
        ``OutputsToWorkingDirectoryPathRewriter``.
    :param is_task: when truthy, the job-level rewriter is wrapped in a
        ``TaskPathRewriter``.
    :returns: the rewriter to use.
    """
    job_dataset_path_rewriter: DatasetPathRewriter
    if outputs_to_working_directory:
        job_dataset_path_rewriter = OutputsToWorkingDirectoryPathRewriter(working_directory, outputs_directory)
    else:
        job_dataset_path_rewriter = NullDatasetPathRewriter()

    if not is_task:
        return job_dataset_path_rewriter
    return TaskPathRewriter(working_directory, job_dataset_path_rewriter=job_dataset_path_rewriter)