"""Code allowing tools to define extra files associated with an output datset."""
import abc
import logging
import operator
import os
import re
from collections.abc import Callable
from tempfile import NamedTemporaryFile
from typing import (
Any,
Optional,
TYPE_CHECKING,
Union,
)
from galaxy.model import (
Dataset,
DatasetInstance,
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
JOB_IO_NAME_MAX_LENGTH,
JobOutputNameTooLongError,
)
from galaxy.model.dataset_collections import builder
from galaxy.model.dataset_collections.structure import UninitializedTree
from galaxy.model.dataset_collections.type_description import COLLECTION_TYPE_DESCRIPTION_FACTORY
from galaxy.model.store.discover import (
discover_target_directory,
DiscoveredFile,
JsonCollectedDatasetMatch,
MaxDiscoveredFilesExceededError,
MetadataSourceProvider as AbstractMetadataSourceProvider,
ModelPersistenceContext,
PermissionProvider as AbstractPermissionProvider,
persist_elements_to_folder,
persist_elements_to_hdca,
persist_hdas,
RegexCollectedDatasetMatch,
SessionlessModelPersistenceContext,
UNSET,
)
from galaxy.objectstore import (
ObjectStore,
persist_extra_files,
)
from galaxy.tool_util.parser.output_collection_def import (
DEFAULT_DATASET_COLLECTOR_DESCRIPTION,
INPUT_DBKEY_TOKEN,
ToolProvidedMetadataDatasetCollection,
)
from galaxy.tool_util.parser.output_objects import (
ToolOutput,
ToolOutputCollection,
)
from galaxy.tool_util.provided_metadata import BaseToolProvidedMetadata
from galaxy.util import (
shrink_and_unicodify,
unicodify,
)
if TYPE_CHECKING:
from galaxy.model import (
Job,
LibraryFolder,
)
from galaxy.model.store import (
BaseDirectoryImportModelStore,
DirectoryModelExportStore,
)
from galaxy.schema.schema import JobState
# Token substituted into discovery regex patterns with a concrete dataset id
# (see DatasetCollector._pattern_for_dataset).
DATASET_ID_TOKEN = "DATASET_ID"
log = logging.getLogger(__name__)
# PermissionProvider and MetadataSourceProvider are abstractions over input data used to
# collect and produce dynamic outputs.
[docs]
class PermissionProvider(AbstractPermissionProvider):
    """Derives permissions for discovered datasets from a job's inputs.

    Permissions are computed lazily on first access and memoized for the
    lifetime of the provider.
    """

    def __init__(self, inp_data, security_agent, job):
        self._job = job
        self._security_agent = security_agent
        self._inp_data = inp_data
        self._user = job.user
        self._permissions = None

    @property
    def permissions(self):
        if self._permissions is None:
            # Derive from real (non-None) inputs when any exist; otherwise
            # fall back to the history's default permissions.
            valid_inputs = [dataset for dataset in self._inp_data.values() if dataset]
            if valid_inputs:
                derived = self._security_agent.guess_derived_permissions_for_datasets(valid_inputs)
            else:
                # No valid inputs, we will use history defaults
                derived = self._security_agent.history_get_default_permissions(self._job.history)
            self._permissions = derived
        return self._permissions

    def set_default_hda_permissions(self, primary_data):
        permissions = self.permissions
        if permissions is not UNSET:
            self._security_agent.set_all_dataset_permissions(primary_data.dataset, permissions, new=True, flush=False)

    def copy_dataset_permissions(self, init_from, primary_data):
        self._security_agent.copy_dataset_permissions(init_from.dataset, primary_data.dataset, flush=False)
[docs]
def collect_dynamic_outputs(
    job_context: "BaseJobContext",
    output_collections: dict[str, Any],
):
    """Discover and persist outputs whose structure is only known at runtime.

    Two passes: first, "unnamed" outputs declared entirely in tool-provided
    metadata (e.g. galaxy.json) are persisted to their declared destination;
    second, declared output collections with ``dynamic_structure`` have their
    elements discovered on disk and populated.  ``output_collections`` is both
    read and extended (newly created HDCAs are added under their name).
    """
    # unmapped outputs do not correspond to explicit outputs of the tool, they were inferred entirely
    # from the tool provided metadata (e.g. galaxy.json).
    for unnamed_output_dict in job_context.tool_provided_metadata.get_unnamed_outputs():
        assert "destination" in unnamed_output_dict
        assert "elements" in unnamed_output_dict
        destination = unnamed_output_dict["destination"]
        elements = unnamed_output_dict["elements"]
        # If rows are specified at the collection level, add them to individual elements
        # This is a defensive check in case rows weren't already distributed in data_fetch.py
        if "rows" in unnamed_output_dict:
            rows_dict = unnamed_output_dict["rows"]
            for element in elements:
                element_name = element.get("name")
                if element_name and element_name in rows_dict and "row" not in element:
                    element["row"] = rows_dict[element_name]
        assert "type" in destination
        destination_type = destination["type"]
        assert destination_type in ["library_folder", "hdca", "hdas"]
        # three destination types we need to handle here - "library_folder" (place discovered files in a library folder),
        # "hdca" (place discovered files in a history dataset collection), and "hdas" (place discovered files in a history
        # as stand-alone datasets).
        if destination_type == "library_folder":
            # populate a library folder (needs to have already been created)
            library_folder = job_context.get_library_folder(destination)
            persist_elements_to_folder(job_context, elements, library_folder)
            job_context.persist_library_folder(library_folder)
        elif destination_type == "hdca":
            # create or populate a dataset collection in the history
            assert "collection_type" in unnamed_output_dict
            object_id = destination.get("object_id")
            if object_id:
                # Collection was precreated - look it up by id.
                hdca = job_context.get_hdca(object_id)
            else:
                # No precreated collection - build a fresh HDCA now.
                name = unnamed_output_dict.get("name", "unnamed collection")
                collection_type = unnamed_output_dict["collection_type"]
                collection_type_description = COLLECTION_TYPE_DESCRIPTION_FACTORY.for_collection_type(collection_type)
                structure = UninitializedTree(collection_type_description)
                hdca = job_context.create_hdca(name, structure)
                copy_collection_metadata_from_target_dict(hdca, unnamed_output_dict)
                output_collections[name] = hdca
                job_context.add_dataset_collection(hdca)
            error_message = unnamed_output_dict.get("error_message")
            if error_message:
                hdca.collection.handle_population_failed(error_message)
            else:
                persist_elements_to_hdca(job_context, elements, hdca, collector=DEFAULT_DATASET_COLLECTOR)
        elif destination_type == "hdas":
            persist_hdas(elements, job_context, final_job_state=job_context.final_job_state)
    for name, has_collection in output_collections.items():
        output_collection_def = job_context.output_collection_def(name)
        if not output_collection_def:
            continue
        if not output_collection_def.dynamic_structure:
            continue
        # Could be HDCA for normal jobs or a DC for mapping
        # jobs.
        if hasattr(has_collection, "collection"):
            collection = has_collection.collection
        else:
            collection = has_collection
        # We are adding dynamic collections, which may be precreated, but their actual state is still new!
        collection.populated_state = collection.populated_states.NEW
        # Clear any existing elements to avoid duplicates when re-populating
        collection.elements.clear()
        collection.element_count = None
        try:
            collection_builder = builder.BoundCollectionBuilder(collection)
            dataset_collectors = [
                dataset_collector(description) for description in output_collection_def.dataset_collector_descriptions
            ]
            output_name = output_collection_def.name
            filenames = job_context.find_files(output_name, collection, dataset_collectors)
            job_context.populate_collection_elements(
                collection,
                collection_builder,
                filenames,
                name=output_collection_def.name,
                metadata_source_name=output_collection_def.metadata_source,
                final_job_state=job_context.final_job_state,
                change_datatype_actions=job_context.change_datatype_actions,
            )
            collection_builder.populate()
        except MaxDiscoveredFilesExceededError:
            # Mark the collection as population-failed so it is not left in NEW,
            # then let the outer metadata/job handler record this in job_messages.
            collection.handle_population_failed("Job generated more than the maximum number of output datasets.")
            # Register the (failed) collection with the job context so that in
            # the extended-metadata path the updated populated_state is
            # serialized to the export store, and the host side imports the
            # FAILED collection state rather than leaving it stuck in NEW.
            job_context.add_dataset_collection(has_collection)
            raise
        except Exception:
            log.exception("Problem gathering output collection.")
            collection.handle_population_failed("Problem building datasets for collection.")
            job_context.add_dataset_collection(has_collection)
[docs]
class BaseJobContext(ModelPersistenceContext):
    """Abstract interface for discovering and persisting a job's outputs.

    Concrete subclasses bind discovery either to a live Galaxy session or to a
    sessionless model-store export (see SessionlessJobContext).
    """

    # Final job state used when creating discovered datasets.
    final_job_state: "JobState"
    # Discovery cap; float("inf") means unlimited.
    max_discovered_files: Union[int, float]
    tool_provided_metadata: BaseToolProvidedMetadata
    job_working_directory: str

    def add_dataset_collection(self, collection):
        """Hook invoked when a dynamic collection is created/failed; no-op by default."""
        pass

    def find_files(self, output_name, collection, dataset_collectors):
        """Collect discovered files for ``output_name``, counting each one
        against the discovery limit."""
        discovered_files: list[DiscoveredFile] = []
        for discovered_file in discover_files(
            output_name, self.tool_provided_metadata, dataset_collectors, self.job_working_directory, collection
        ):
            self.increment_discovered_file_count()
            discovered_files.append(discovered_file)
        return discovered_files

    @abc.abstractmethod
    def get_job_id(self) -> int: ...

    @property
    @abc.abstractmethod
    def change_datatype_actions(self) -> dict[str, Any]: ...

    @abc.abstractmethod
    def create_hdca(self, name: str, structure: UninitializedTree) -> HistoryDatasetCollectionAssociation: ...

    @abc.abstractmethod
    def get_hdca(self, object_id) -> HistoryDatasetCollectionAssociation: ...

    @abc.abstractmethod
    def get_library_folder(self, destination: dict[str, Any]) -> "LibraryFolder": ...

    @abc.abstractmethod
    def output_collection_def(self, name: str) -> Optional[ToolOutputCollection]: ...

    @abc.abstractmethod
    def output_def(self, name: str) -> Optional[ToolOutput]: ...
[docs]
class SessionlessJobContext(SessionlessModelPersistenceContext, BaseJobContext):
    """Job context used when output discovery runs without a database session.

    Discovered objects are written to an export model store directory; the
    host Galaxy later imports that store to apply the results.
    """

    export_store: Optional["DirectoryModelExportStore"]

    def __init__(
        self,
        metadata_params,
        tool_provided_metadata: BaseToolProvidedMetadata,
        object_store: Optional[ObjectStore],
        export_store: Optional["DirectoryModelExportStore"],
        import_store: "BaseDirectoryImportModelStore",
        working_directory: str,
        final_job_state: "JobState",
        max_discovered_files: Optional[int],
        job: Optional["Job"] = None,
    ):
        # TODO: use a metadata source provider... (pop from inputs and add parameter)
        super().__init__(object_store, export_store, working_directory)
        self.metadata_params = metadata_params
        self.tool_provided_metadata = tool_provided_metadata
        self.import_store = import_store
        self.final_job_state = final_job_state
        # None means "no limit" - model that explicitly as infinity.
        self.max_discovered_files = float("inf") if max_discovered_files is None else max_discovered_files
        self.discovered_file_count = 0
        self._job = job

    @property
    def job(self):
        return self._job

    @property
    def change_datatype_actions(self):
        return self.metadata_params.get("change_datatype_actions", {})

    @property
    def sa_session(self):
        return self.import_store.sa_session

    def output_collection_def(self, name):
        """Rehydrate the named output collection definition from metadata params."""
        tool_as_dict = self.metadata_params["tool"]
        output_collection_defs = tool_as_dict["output_collections"]
        if name not in output_collection_defs:
            return None
        output_collection_def_dict = output_collection_defs[name]
        output_collection_def = ToolOutputCollection.from_dict(name, output_collection_def_dict)
        return output_collection_def

    def output_def(self, name):
        """Rehydrate the named output definition from metadata params."""
        tool_as_dict = self.metadata_params["tool"]
        output_defs = tool_as_dict["outputs"]
        if name not in output_defs:
            return None
        output_def_dict = output_defs[name]
        output_def = ToolOutput.from_dict(name, output_def_dict)
        return output_def

    def job_id(self):
        return "non-session bound job"

    def get_hdca(self, object_id):
        hdca = self.sa_session.query(HistoryDatasetCollectionAssociation).find(int(object_id))
        if hdca:
            self._export_collection(hdca)
        return hdca

    def add_dataset_collection(self, collection):
        self._export_collection(collection)

    def _export_collection(self, collection) -> None:
        # Shared by get_hdca()/add_dataset_collection(): register the collection
        # and each of its datasets (including their files) with the export store
        # so the host Galaxy imports their updated states.
        assert self.export_store
        self.export_store.add_dataset_collection(collection)
        for collection_dataset in collection.dataset_instances:
            self.export_store.add_dataset(collection_dataset, include_files=True)
            self.export_store.collection_datasets.add(collection_dataset.id)

    def add_output_dataset_association(self, name, dataset_instance):
        if name and len(name) > JOB_IO_NAME_MAX_LENGTH:
            raise JobOutputNameTooLongError(
                f"Tool produced an output name that exceeds the {JOB_IO_NAME_MAX_LENGTH} character name length limit "
                f"(got {len(name)} characters), tool is likely broken"
            )
        assert self.export_store
        self.export_store.add_job_output_dataset_associations(self.get_job_id(), name, dataset_instance)

    def get_job_id(self):
        return self.metadata_params["job_id_tag"]

    def get_implicit_collection_jobs_association_id(self):
        return self.metadata_params.get("implicit_collection_jobs_association_id")
[docs]
def collect_primary_datasets(job_context: BaseJobContext, output: dict[str, DatasetInstance], input_ext):
    """Discover "primary" datasets produced alongside each declared output.

    For each named output, scan with the output's dataset collectors.  The
    first discovered file may replace the declared output itself (when the
    collector has ``assign_primary_output``); the rest become new datasets in
    the history.  Returns ``{output_name: {designation: dataset}}``.
    """
    job_working_directory = job_context.job_working_directory
    # Loop through output file names, looking for generated primary
    # datasets in form specified by discover dataset patterns or in tool provided metadata.
    new_outdata_name = None
    primary_datasets: dict[str, dict[str, DatasetInstance]] = {}
    storage_callbacks: list[Callable] = []
    for name, outdata in output.items():
        primary_output_assigned = False
        dataset_collectors = [DEFAULT_DATASET_COLLECTOR]
        output_def = job_context.output_def(name)
        if output_def is not None:
            dataset_collectors = [
                dataset_collector(description) for description in output_def.dataset_collector_descriptions
            ]
        # Keyed by path so the same file is only considered once.
        filenames = {}
        for discovered_file in discover_files(
            name, job_context.tool_provided_metadata, dataset_collectors, job_working_directory, outdata
        ):
            job_context.increment_discovered_file_count()
            filenames[discovered_file.path] = discovered_file
        for filename_index, (filename, discovered_file) in enumerate(filenames.items()):
            extra_file_collector = discovered_file.collector
            fields_match = discovered_file.match
            if not fields_match:
                # Before I guess pop() would just have thrown an IndexError
                raise Exception(f"Problem parsing metadata fields for file {filename}")
            designation = fields_match.designation
            ext = fields_match.ext
            if ext == "input":
                ext = input_ext
            ext = ext.lower()
            dbkey = fields_match.dbkey
            if dbkey == INPUT_DBKEY_TOKEN:
                dbkey = job_context.input_dbkey
            if filename_index == 0 and extra_file_collector.assign_primary_output:
                # First discovered file replaces the declared output itself.
                new_outdata_name = fields_match.name or f"{outdata.name} ({designation})"
                outdata.change_datatype(ext)
                outdata.dbkey = dbkey
                outdata.designation = designation
                outdata.dataset.external_filename = None  # resets filename_override
                # Move data from temp location to dataset location
                if not outdata.dataset.purged:
                    assert job_context.object_store
                    job_context.object_store.update_from_file(outdata.dataset, file_name=filename, create=True)
                primary_output_assigned = True
                continue
            if name not in primary_datasets:
                primary_datasets[name] = {}
            visible = fields_match.visible
            # Create new primary dataset
            new_primary_name = fields_match.name or f"{outdata.name} ({designation})"
            info = outdata.info
            # TODO: should be able to disambiguate files in different directories...
            new_primary_filename = os.path.split(filename)[-1]
            new_primary_datasets_attributes = job_context.tool_provided_metadata.get_new_dataset_meta_by_basename(
                name, new_primary_filename
            )
            extra_files = None
            if new_primary_datasets_attributes:
                extra_files_path = new_primary_datasets_attributes.get("extra_files", None)
                if extra_files_path:
                    extra_files = os.path.join(job_working_directory, extra_files_path)
            primary_data = job_context.create_dataset(
                ext,
                designation,
                visible,
                dbkey,
                new_primary_name,
                filename,
                extra_files=extra_files,
                info=info,
                init_from=outdata,
                dataset_attributes=new_primary_datasets_attributes,
                creating_job_id=job_context.get_job_id() if job_context else None,
                storage_callbacks=storage_callbacks,
                purged=outdata.dataset.purged,
            )
            try:
                # Associate new dataset with job
                job_context.add_output_dataset_association(f"__new_primary_file_{name}|{designation}__", primary_data)
            except JobOutputNameTooLongError:
                # Name too long for the association table - discard the dataset
                # but still attach it to the history before propagating.
                primary_data.dataset.state = Dataset.states.DISCARDED
                primary_data.dataset.file_size = 0
                job_context.add_datasets_to_history([primary_data], for_output_dataset=outdata)
                raise
            job_context.add_datasets_to_history([primary_data], for_output_dataset=outdata)
            # Add dataset to return dict
            primary_datasets[name][designation] = primary_data
        if primary_output_assigned:
            outdata.name = new_outdata_name
            outdata.init_meta()
            if not outdata.dataset.purged:
                try:
                    outdata.set_meta()
                except Exception:
                    # We don't want to fail here on a single "bad" discovered dataset
                    log.debug("set meta failed for %s", outdata, exc_info=True)
                    outdata.state = HistoryDatasetAssociation.states.FAILED_METADATA
                outdata.set_peek()
            outdata.discovered = True  # type: ignore[attr-defined]
            sa_session = job_context.sa_session
            if sa_session:
                sa_session.add(outdata)
    # Move discovered outputs to storage and set metadata / peeks
    for callback in storage_callbacks:
        callback()
    return primary_datasets
[docs]
def discover_files(output_name, tool_provided_metadata, extra_file_collectors, job_working_directory, matchable):
    """Yield a DiscoveredFile for every output file the collectors locate.

    When the (single) collector discovers via tool-provided metadata, entries
    come straight from that metadata; otherwise the collectors' regex patterns
    are walked over the working directory.
    """
    if extra_file_collectors and extra_file_collectors[0].discover_via == "tool_provided_metadata":
        # just load entries from tool provided metadata...
        assert len(extra_file_collectors) == 1
        extra_file_collector = extra_file_collectors[0]
        target_directory = discover_target_directory(extra_file_collector.directory, job_working_directory)
        for dataset in tool_provided_metadata.get_new_datasets(output_name):
            filename = dataset["filename"]
            path = os.path.join(target_directory, filename)
            yield DiscoveredFile(
                path,
                extra_file_collector,
                JsonCollectedDatasetMatch(dataset, extra_file_collector, filename, path=path),
            )
    else:
        for match, collector in walk_over_file_collectors(extra_file_collectors, job_working_directory, matchable):
            yield DiscoveredFile(match.path, collector, match)
[docs]
def walk_over_file_collectors(extra_file_collectors, job_working_directory, matchable):
    """Yield ``(match, collector)`` pairs for every hit of each pattern collector."""
    for collector in extra_file_collectors:
        assert collector.discover_via == "pattern"
        matches = walk_over_extra_files(collector.directory, collector, job_working_directory, matchable)
        for match in matches:
            yield match, collector
[docs]
def dataset_collector(dataset_collection_description):
    """Build the collector appropriate for *dataset_collection_description*."""
    if dataset_collection_description is DEFAULT_DATASET_COLLECTOR_DESCRIPTION:
        # The default description is compared with 'is' and 'in' elsewhere,
        # so its collector must stay a singleton as well.
        return DEFAULT_DATASET_COLLECTOR
    if dataset_collection_description.discover_via == "pattern":
        return DatasetCollector(dataset_collection_description)
    return ToolMetadataDatasetCollector(dataset_collection_description)
[docs]
class DatasetCollector:
    """Discovers output datasets by matching filenames against a regex pattern."""

    def __init__(self, dataset_collection_description):
        # dataset_collection_description is an abstract description
        # built from the tool parsing module - see galaxy.tool_util.parser.output_collection_def
        description = dataset_collection_description
        self.discover_via = description.discover_via
        self.sort_key = description.sort_key
        self.sort_reverse = description.sort_reverse
        self.sort_comp = description.sort_comp
        self.pattern = description.pattern
        self.default_dbkey = description.default_dbkey
        self.default_ext = description.default_ext
        self.default_visible = description.default_visible
        self.directory = description.directory
        self.assign_primary_output = description.assign_primary_output
        self.recurse = description.recurse
        self.match_relative_path = description.match_relative_path

    def _pattern_for_dataset(self, dataset_instance=None):
        # Substitute the DATASET_ID token with the concrete id when known,
        # otherwise with a digit wildcard.
        replacement = str(dataset_instance.id) if dataset_instance else r"\d+"
        return self.pattern.replace(DATASET_ID_TOKEN, replacement)

    def match(self, dataset_instance, filename, path=None, parent_paths=None):
        """Return a RegexCollectedDatasetMatch for *filename*, or None if it doesn't match."""
        pattern = self._pattern_for_dataset(dataset_instance)
        if self.match_relative_path and parent_paths:
            filename = os.path.join(*parent_paths, filename)
        re_match = re.match(pattern, filename)
        if re_match is None:
            return None
        return RegexCollectedDatasetMatch(re_match, self, filename, path=path)

    def sort(self, matches):
        """Return *matches* ordered according to the description's sort options."""
        assert self.sort_key in ["filename", "dbkey", "name", "designation"]
        assert self.sort_comp in ["lexical", "numeric"]
        key = operator.attrgetter(self.sort_key)
        if self.sort_comp == "numeric":
            key = _compose(int, key)
        return sorted(matches, key=key, reverse=self.sort_reverse)
def _compose(f, g):
return lambda x: f(g(x))
# Module-level singleton collectors: DEFAULT_DATASET_COLLECTOR is compared by
# identity in dataset_collector(), so it must only be constructed once.
DEFAULT_DATASET_COLLECTOR = DatasetCollector(DEFAULT_DATASET_COLLECTOR_DESCRIPTION)
DEFAULT_TOOL_PROVIDED_DATASET_COLLECTOR = ToolMetadataDatasetCollector(ToolProvidedMetadataDatasetCollection())
[docs]
def read_exit_code_from(exit_code_file, id_tag):
    """Read exit code reported for a Galaxy job.

    Returns 0 when the file is unreadable or does not contain an integer.
    """
    try:
        # This should be an 8-bit exit code, but read ahead anyway.
        # Use a context manager so the handle is not leaked.
        with open(exit_code_file) as fh:
            exit_code_str = fh.read(32)
    except Exception:
        # By default, the exit code is 0, which typically indicates success.
        exit_code_str = "0"
    try:
        # Decode the exit code. If it's bogus, then just use 0.
        exit_code = int(exit_code_str)
    except ValueError:
        galaxy_id_tag = id_tag
        log.warning(f"({galaxy_id_tag}) Exit code '{exit_code_str}' invalid. Using 0.")
        exit_code = 0
    return exit_code
[docs]
def default_exit_code_file(files_dir, id_tag):
    """Return the conventional exit-code file path for job *id_tag* under *files_dir*."""
    exit_code_name = f"galaxy_{id_tag}.ec"
    return os.path.join(files_dir, exit_code_name)
[docs]
def collect_shrinked_content_from_path(path):
    """Return the shrunk, unicodified content of *path*, or None when it does not exist."""
    try:
        with open(path, "rb") as fh:
            raw = fh.read()
    except FileNotFoundError:
        return None
    return shrink_and_unicodify(raw.strip())