Source code for galaxy.model.store

import abc
import contextlib
import datetime
import logging
import os
import shutil
import tarfile
import tempfile
from collections import defaultdict
from collections.abc import (
    Iterable,
    Iterator,
)
from dataclasses import dataclass
from enum import Enum
from json import (
    dump,
    dumps,
    load,
)
from tempfile import mkdtemp
from types import TracebackType
from typing import (
    Any,
    Callable,
    cast,
    Literal,
    Optional,
    TYPE_CHECKING,
    Union,
)
from urllib.parse import urlparse

from bdbag import bdbag_api as bdb
from boltons.iterutils import remap
from pydantic import (
    BaseModel,
    ConfigDict,
)
from rocrate.model.computationalworkflow import (
    ComputationalWorkflow,
    WorkflowDescription,
)
from rocrate.rocrate import ROCrate
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.scoping import scoped_session
from sqlalchemy.sql import expression
from typing_extensions import Protocol

from galaxy.datatypes.registry import Registry
from galaxy.exceptions import (
    MalformedContents,
    ObjectNotFound,
    RequestParameterInvalidException,
)
from galaxy.files import (
    ConfiguredFileSources,
    ProvidesFileSourcesUserContext,
)
from galaxy.files.uris import stream_url_to_file
from galaxy.model.base import ensure_object_added_to_session
from galaxy.model.mapping import GalaxyModelMapping
from galaxy.model.metadata import MetadataCollection
from galaxy.model.orm.util import (
    add_object_to_object_session,
    add_object_to_session,
    get_object_session,
)
from galaxy.model.tags import GalaxyTagHandler
from galaxy.objectstore import (
    BaseObjectStore,
    ObjectStore,
    persist_extra_files,
)
from galaxy.schema.bco import (
    BioComputeObjectCore,
    DescriptionDomain,
    DescriptionDomainUri,
    ErrorDomain,
    InputAndOutputDomain,
    InputAndOutputDomainUri,
    InputSubdomainItem,
    OutputSubdomainItem,
    ParametricDomain,
    ParametricDomainItem,
    PipelineStep,
    ProvenanceDomain,
    UsabilityDomain,
    XrefItem,
)
from galaxy.schema.bco.io_domain import Uri
from galaxy.schema.bco.util import (
    extension_domains,
    galaxy_execution_domain,
    get_contributors,
    write_to_file,
)
from galaxy.schema.schema import (
    DatasetStateField,
    ModelStoreFormat,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.util import (
    FILENAME_VALID_CHARS,
    in_directory,
    safe_makedirs,
)
from galaxy.util.bunch import Bunch
from galaxy.util.compression_utils import (
    CompressedFile,
    make_fast_zipfile,
)
from galaxy.util.path import StrPath
from ._bco_convert_utils import (
    bco_workflow_version,
    SoftwarePrerequisiteTracker,
)
from .ro_crate_utils import WorkflowRunCrateProfileBuilder
from ..custom_types import json_encoder
from ..item_attrs import (
    add_item_annotation,
    get_item_annotation_str,
)
from ... import model

if TYPE_CHECKING:
    from galaxy.managers.workflows import WorkflowContentsManager
    from galaxy.model import (
        HistoryItem,
        ImplicitCollectionJobs,
    )
    from galaxy.model.tags import GalaxyTagHandlerSession

log = logging.getLogger(__name__)

ObjectKeyType = Union[str, int]

ATTRS_FILENAME_HISTORY = "history_attrs.txt"
ATTRS_FILENAME_DATASETS = "datasets_attrs.txt"
ATTRS_FILENAME_JOBS = "jobs_attrs.txt"
ATTRS_FILENAME_IMPLICIT_COLLECTION_JOBS = "implicit_collection_jobs_attrs.txt"
ATTRS_FILENAME_COLLECTIONS = "collections_attrs.txt"
ATTRS_FILENAME_EXPORT = "export_attrs.txt"
ATTRS_FILENAME_LIBRARIES = "libraries_attrs.txt"
ATTRS_FILENAME_LIBRARY_FOLDERS = "library_folders_attrs.txt"
ATTRS_FILENAME_INVOCATIONS = "invocation_attrs.txt"
ATTRS_FILENAME_CONVERSIONS = "implicit_dataset_conversions.txt"
TRACEBACK = "traceback.txt"
GALAXY_EXPORT_VERSION = "2"

DICT_STORE_ATTRS_KEY_HISTORY = "history"
DICT_STORE_ATTRS_KEY_DATASETS = "datasets"
DICT_STORE_ATTRS_KEY_COLLECTIONS = "collections"
DICT_STORE_ATTRS_KEY_CONVERSIONS = "implicit_dataset_conversions"
DICT_STORE_ATTRS_KEY_JOBS = "jobs"
DICT_STORE_ATTRS_KEY_IMPLICIT_COLLECTION_JOBS = "implicit_collection_jobs"
DICT_STORE_ATTRS_KEY_LIBRARIES = "libraries"
DICT_STORE_ATTRS_KEY_INVOCATIONS = "invocations"


JsonDictT = dict[str, Any]


class StoreAppProtocol(Protocol):
    """Define the parts of a Galaxy-like app consumed by model store."""

    datatypes_registry: Registry
    object_store: BaseObjectStore
    security: IdEncodingHelper
    tag_handler: GalaxyTagHandler
    model: GalaxyModelMapping
    file_sources: ConfiguredFileSources
    workflow_contents_manager: "WorkflowContentsManager"


class ImportDiscardedDataType(Enum):
    # Don't allow discarded 'okay' datasets on import, datasets will be marked deleted.
    FORBID = "forbid"
    # Allow datasets to be imported as experimental DISCARDED datasets that are not deleted if file data unavailable.
    ALLOW = "allow"
    # Import all datasets as discarded regardless of whether file data is available in the store.
    FORCE = "force"


class DatasetAttributeImportModel(BaseModel):
    state: Optional[DatasetStateField] = None
    external_filename: Optional[str] = None
    _extra_files_path: Optional[str] = None
    file_size: Optional[int] = None
    object_store_id: Optional[str] = None
    total_size: Optional[int] = None
    created_from_basename: Optional[str] = None
    uuid: Optional[str] = None
    model_config = ConfigDict(extra="ignore")


DEFAULT_DISCARDED_DATA_TYPE = ImportDiscardedDataType.FORBID


class ImportOptions:
    allow_edit: bool
    allow_library_creation: bool
    allow_dataset_object_edit: bool
    discarded_data: ImportDiscardedDataType

    def __init__(
        self,
        allow_edit: bool = False,
        allow_library_creation: bool = False,
        allow_dataset_object_edit: Optional[bool] = None,
        discarded_data: ImportDiscardedDataType = DEFAULT_DISCARDED_DATA_TYPE,
    ) -> None:
        self.allow_edit = allow_edit
        self.allow_library_creation = allow_library_creation
        if allow_dataset_object_edit is None:
            self.allow_dataset_object_edit = allow_edit
        else:
            self.allow_dataset_object_edit = allow_dataset_object_edit
        self.discarded_data = discarded_data
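
# Illustrative sketch only (not part of this module): options for a library-creating
# import that keeps datasets whose file data is missing as DISCARDED instead of deleting them.
#
# import_options = ImportOptions(
#     allow_library_creation=True,
#     discarded_data=ImportDiscardedDataType.ALLOW,
# )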


class SessionlessContext:

    def __init__(self) -> None:
        self.objects: dict[type, dict] = defaultdict(dict)

    def execute(self, query: Any, *args, **kwargs) -> Any:
        pass

    def delete(self, obj: model.RepresentById) -> None:
        self.objects[obj.__class__].pop(obj.id, None)

    def scalars(self, query: Any, *args, **kwargs) -> Any:
        pass

    def commit(self) -> None:
        pass

    def flush(self) -> None:
        pass

    def add(self, obj: model.RepresentById) -> None:
        self.objects[obj.__class__][obj.id] = obj

    def query(self, model_class: type[model.RepresentById]) -> Bunch:
        def find(obj_id):
            return self.objects.get(model_class, {}).get(obj_id) or None

        def filter_by(*args, **kwargs):
            # TODO: Hack for history export archive, should support this too
            return Bunch(first=lambda: next(iter(self.objects.get(model_class, {None: None}))))

        return Bunch(find=find, get=find, filter_by=filter_by)

    def get(self, model_class: type[model.RepresentById], primary_key: Any):
        # patch for SQLAlchemy 2.0 compatibility
        return self.query(model_class).get(primary_key)


def replace_metadata_file(
    metadata: dict[str, Any],
    dataset_instance: model.DatasetInstance,
    sa_session: Union[SessionlessContext, scoped_session],
) -> dict[str, Any]:
    def remap_objects(p, k, obj):
        if isinstance(obj, dict) and "model_class" in obj and obj["model_class"] == "MetadataFile":
            metadata_file = model.MetadataFile(dataset=dataset_instance, uuid=obj["uuid"])
            sa_session.add(metadata_file)
            return (k, metadata_file)
        return (k, obj)

    return remap(metadata, remap_objects)
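
# Illustrative sketch only ("hda" stands in for an existing model.DatasetInstance):
# serialized MetadataFile dictionaries are swapped for model objects, other values pass through.
#
# sa_session = SessionlessContext()
# serialized_metadata = {"dbkey": "hg38", "bam_index": {"model_class": "MetadataFile", "uuid": "..."}}
# restored = replace_metadata_file(serialized_metadata, hda, sa_session)
# # restored["bam_index"] is now a model.MetadataFile attached to hda; restored["dbkey"] is unchanged.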


class ModelImportStore(metaclass=abc.ABCMeta):
    app: Optional[StoreAppProtocol]
    archive_dir: str
    sa_session: Union[scoped_session, SessionlessContext]

    def __init__(
        self,
        import_options: Optional[ImportOptions] = None,
        app: Optional[StoreAppProtocol] = None,
        user: Optional[model.User] = None,
        object_store: Optional[ObjectStore] = None,
        tag_handler: Optional["GalaxyTagHandlerSession"] = None,
    ) -> None:
        if object_store is None:
            if app is not None:
                object_store = app.object_store
        self.object_store = object_store
        self.app = app
        if app is not None:
            self.sa_session = app.model.session
            self.sessionless = False
        else:
            self.sa_session = SessionlessContext()
            self.sessionless = True
        self.user = user
        self.import_options = import_options or ImportOptions()
        self.dataset_state_serialized = True
        self.tag_handler = tag_handler
        if self.defines_new_history():
            self.import_history_encoded_id = self.new_history_properties().get("encoded_id")
        else:
            self.import_history_encoded_id = None

    @abc.abstractmethod
    def workflow_paths(self) -> Iterator[tuple[str, str]]: ...

    @abc.abstractmethod
    def defines_new_history(self) -> bool:
        """Does this store define a new history to create."""

    @abc.abstractmethod
    def new_history_properties(self) -> dict[str, Any]:
        """Dict of history properties if defines_new_history() is truthy."""

    @abc.abstractmethod
    def datasets_properties(self) -> list[dict[str, Any]]:
        """Return a list of HDA properties."""

    def library_properties(self) -> list[dict[str, Any]]:
        """Return a list of library properties."""
        return []

    @abc.abstractmethod
    def invocations_properties(self) -> list[dict[str, Any]]: ...

    @abc.abstractmethod
    def collections_properties(self) -> list[dict[str, Any]]:
        """Return a list of HDCA properties."""

    @abc.abstractmethod
    def implicit_dataset_conversion_properties(self) -> list[dict[str, Any]]:
        """Return a list of ImplicitlyConvertedDatasetAssociation properties."""

    @abc.abstractmethod
    def jobs_properties(self) -> list[dict[str, Any]]:
        """Return a list of jobs properties."""

    @abc.abstractmethod
    def implicit_collection_jobs_properties(self) -> list[dict[str, Any]]: ...

    @property
    @abc.abstractmethod
    def object_key(self) -> str:
        """Key used to connect objects in metadata.

        Legacy exports used 'hid' but associated objects may not be from the same history
        and a history may contain multiple objects with the same 'hid'.
        """

    @property
    def file_source_root(self) -> Optional[str]:
        """Source of valid file data."""
        return None

    def trust_hid(self, obj_attrs: dict[str, Any]) -> bool:
        """Trust HID when importing objects into a new History."""
        return (
            self.import_history_encoded_id is not None
            and obj_attrs.get("history_encoded_id") == self.import_history_encoded_id
        )

    @contextlib.contextmanager
    def target_history(
        self, default_history: Optional[model.History] = None, legacy_history_naming: bool = True
    ) -> Iterator[Optional[model.History]]:
        new_history = None

        if self.defines_new_history():
            history_properties = self.new_history_properties()
            history_name = history_properties.get("name")
            if history_name and legacy_history_naming:
                history_name = f"imported from archive: {history_name}"
            elif history_name:
                pass  # history_name = history_name
            else:
                history_name = "unnamed imported history"

            # Create history.
            new_history = model.History(name=history_name, user=self.user)
            new_history.importing = True
            hid_counter = history_properties.get("hid_counter")
            genome_build = history_properties.get("genome_build")

            # TODO: This seems like it shouldn't be imported, try to test and verify we can calculate this
            # and get away without it. -John
            if hid_counter:
                new_history.hid_counter = hid_counter
            if genome_build:
                new_history.genome_build = genome_build

            self._session_add(new_history)
            self._flush()

            if self.user:
                add_item_annotation(self.sa_session, self.user, new_history, history_properties.get("annotation"))

            history: Optional[model.History] = new_history
        else:
            history = default_history

        yield history

        if new_history is not None:
            # Done importing.
            new_history.importing = False
            self._flush()

    def perform_import(
        self, history: Optional[model.History] = None, new_history: bool = False, job: Optional[model.Job] = None
    ) -> "ObjectImportTracker":
        object_import_tracker = ObjectImportTracker()

        datasets_attrs = self.datasets_properties()
        collections_attrs = self.collections_properties()

        self._import_datasets(object_import_tracker, datasets_attrs, history, new_history, job)
        self._import_dataset_copied_associations(object_import_tracker, datasets_attrs)
        self._import_libraries(object_import_tracker)
        self._import_collection_instances(object_import_tracker, collections_attrs, history, new_history)
        self._import_collection_implicit_input_associations(object_import_tracker, collections_attrs)
        self._import_collection_copied_associations(object_import_tracker, collections_attrs)
        self._import_implicit_dataset_conversions(object_import_tracker)
        self._reassign_hids(object_import_tracker, history)
        self._import_jobs(object_import_tracker, history)
        self._import_implicit_collection_jobs(object_import_tracker)
        self._import_workflow_invocations(object_import_tracker, history)
        self._flush()
        return object_import_tracker

    def _attach_dataset_hashes(
        self,
        dataset_or_file_attrs: dict[str, Any],
        dataset_instance: model.DatasetInstance,
    ) -> None:
        if "hashes" in dataset_or_file_attrs:
            for hash_attrs in dataset_or_file_attrs["hashes"]:
                hash_obj = model.DatasetHash()
                hash_obj.hash_value = hash_attrs["hash_value"]
                hash_obj.hash_function = hash_attrs["hash_function"]
                hash_obj.extra_files_path = hash_attrs["extra_files_path"]
                dataset_instance.dataset.hashes.append(hash_obj)

    def _attach_dataset_sources(
        self,
        dataset_or_file_attrs: dict[str, Any],
        dataset_instance: model.DatasetInstance,
    ) -> None:
        if "sources" in dataset_or_file_attrs:
            for source_attrs in dataset_or_file_attrs["sources"]:
                source_obj = model.DatasetSource()
                source_obj.source_uri = source_attrs["source_uri"]
                transform_actions = source_attrs["transform"]
                recorded_requested_transform = "requested_transform" in source_attrs
                if recorded_requested_transform:
                    source_obj.requested_transform = source_attrs["requested_transform"]
                    if dataset_instance.state != "deferred":
                        source_obj.transform = transform_actions
                else:
                    # legacy transform actions - if this is a deferred dataset treat as requested
                    # transform actions
                    if dataset_instance.state == "deferred":
                        source_obj.requested_transform = transform_actions or []
                    else:
                        source_obj.transform = transform_actions
                source_obj.extra_files_path = source_attrs["extra_files_path"]
                for hash_attrs in source_attrs["hashes"]:
                    hash_obj = model.DatasetSourceHash()
                    hash_obj.hash_value = hash_attrs["hash_value"]
                    hash_obj.hash_function = hash_attrs["hash_function"]
                    source_obj.hashes.append(hash_obj)

                dataset_instance.dataset.sources.append(source_obj)

    def _import_datasets(
        self,
        object_import_tracker: "ObjectImportTracker",
        datasets_attrs: list[dict[str, Any]],
        history: Optional[model.History],
        new_history: bool,
        job: Optional[model.Job],
    ) -> None:
        object_key = self.object_key

        def handle_dataset_object_edit(dataset_instance, dataset_attrs):
            if "dataset" in dataset_attrs:
                assert self.import_options.allow_dataset_object_edit
                dataset_attributes = DatasetAttributeImportModel(**dataset_attrs["dataset"]).model_dump(
                    exclude_unset=True,
                )
                for attribute, value in dataset_attributes.items():
                    setattr(dataset_instance.dataset, attribute, value)
                if dataset_instance.dataset.purged:
                    dataset_instance.dataset.full_delete()
                self._attach_dataset_hashes(dataset_attrs["dataset"], dataset_instance)
                self._attach_dataset_sources(dataset_attrs["dataset"], dataset_instance)
                if "id" in dataset_attrs["dataset"] and self.import_options.allow_edit:
                    dataset_instance.dataset.id = dataset_attrs["dataset"]["id"]
                    for dataset_association in dataset_instance.dataset.history_associations:
                        if (
                            dataset_association is not dataset_instance
                            and dataset_association.extension == dataset_instance.extension
                            or dataset_association.extension == "auto"
                        ):
                            copy_dataset_instance_metadata_attributes(
                                source=dataset_instance, target=dataset_association
                            )
                if job:
                    dataset_instance.dataset.job_id = job.id

        for dataset_attrs in datasets_attrs:
            if "state" not in dataset_attrs:
                self.dataset_state_serialized = False

            if "id" in dataset_attrs and self.import_options.allow_edit and not self.sessionless:
                model_class = getattr(model, dataset_attrs["model_class"])
                dataset_instance = self.sa_session.get(model_class, dataset_attrs["id"])
                assert isinstance(dataset_instance, model.DatasetInstance)
                attributes = [
                    "name",
                    "extension",
                    "info",
                    "blurb",
                    "peek",
                    "designation",
                    "visible",
                    "metadata",
                    "tool_version",
                    "validated_state",
                    "validated_state_message",
                ]
                for attribute in attributes:
                    if attribute in dataset_attrs:
                        value = dataset_attrs[attribute]
                        if attribute == "metadata":
                            value = replace_metadata_file(value, dataset_instance, self.sa_session)
                        setattr(dataset_instance, attribute, value)

                handle_dataset_object_edit(dataset_instance, dataset_attrs)
            else:
                metadata_deferred = dataset_attrs.get("metadata_deferred", False)
                metadata = dataset_attrs.get("metadata")
                if metadata is None and not metadata_deferred:
                    raise MalformedContents("metadata_deferred must be true if no metadata found in dataset attributes")
                if metadata is None:
                    metadata = {"dbkey": "?"}

                model_class = dataset_attrs.get("model_class", "HistoryDatasetAssociation")
                if model_class == "HistoryDatasetAssociation":
                    # Check if this HDA should reuse a dataset from a copied-from HDA
                    reuse_dataset = None
                    copied_from_chain = dataset_attrs.get("copied_from_history_dataset_association_id_chain", [])
                    if copied_from_chain:
                        # Look for the source HDA in the current import set
                        copied_from_key = _copied_from_object_key(copied_from_chain, object_import_tracker.hdas_by_key)
                        if copied_from_key and copied_from_key in object_import_tracker.hdas_by_key:
                            source_hda = object_import_tracker.hdas_by_key[copied_from_key]
                            # Reuse the dataset from the source HDA
                            reuse_dataset = source_hda.dataset

                    # Create dataset and HDA.
                    dataset_instance = model.HistoryDatasetAssociation(
                        name=dataset_attrs["name"],
                        extension=dataset_attrs["extension"],
                        info=dataset_attrs["info"],
                        blurb=dataset_attrs["blurb"],
                        peek=dataset_attrs["peek"],
                        designation=dataset_attrs["designation"],
                        visible=dataset_attrs["visible"],
                        deleted=dataset_attrs.get("deleted", False),
                        dbkey=metadata["dbkey"],
                        tool_version=metadata.get("tool_version"),
                        metadata_deferred=metadata_deferred,
                        history=history,
                        create_dataset=reuse_dataset is None,
                        dataset=reuse_dataset,
                        flush=False,
                        sa_session=self.sa_session,
                    )
                    dataset_instance._metadata = metadata
                elif model_class == "LibraryDatasetDatasetAssociation":
                    # Create dataset and LDDA.
                    dataset_instance = model.LibraryDatasetDatasetAssociation(
                        name=dataset_attrs["name"],
                        extension=dataset_attrs["extension"],
                        info=dataset_attrs["info"],
                        blurb=dataset_attrs["blurb"],
                        peek=dataset_attrs["peek"],
                        designation=dataset_attrs["designation"],
                        visible=dataset_attrs["visible"],
                        deleted=dataset_attrs.get("deleted", False),
                        dbkey=metadata["dbkey"],
                        tool_version=metadata.get("tool_version"),
                        metadata_deferred=metadata_deferred,
                        user=self.user,
                        create_dataset=True,
                        flush=False,
                        sa_session=self.sa_session,
                    )
                else:
                    raise Exception("Unknown dataset instance type encountered")

                metadata = replace_metadata_file(metadata, dataset_instance, self.sa_session)
                if self.sessionless:
                    dataset_instance._metadata_collection = MetadataCollection(
                        dataset_instance, session=self.sa_session
                    )
                    dataset_instance._metadata = metadata
                else:
                    dataset_instance.metadata = metadata
                self._attach_raw_id_if_editing(dataset_instance, dataset_attrs)

                # Older style...
                if self.import_options.allow_edit:
                    if "uuid" in dataset_attrs:
                        dataset_instance.dataset.uuid = dataset_attrs["uuid"]
                    if "dataset_uuid" in dataset_attrs:
                        dataset_instance.dataset.uuid = dataset_attrs["dataset_uuid"]

                self._session_add(dataset_instance)

                if model_class == "HistoryDatasetAssociation":
                    hda = cast(model.HistoryDatasetAssociation, dataset_instance)
                    # don't use add_history to manage HID handling across full import to try to preserve
                    # HID structure.
                    hda.history = history
                    if new_history and self.trust_hid(dataset_attrs):
                        hda.hid = dataset_attrs["hid"]
                    else:
                        object_import_tracker.requires_hid.append(hda)
                else:
                    pass
                    # ldda = cast(model.LibraryDatasetDatasetAssociation, dataset_instance)
                    # ldda.user = self.user

                file_source_root = self.file_source_root

                # If dataset is in the dictionary - we will assert this dataset is tied to the Galaxy instance
                # and the import options are configured for allowing editing the dataset (e.g. for metadata setting).
                # Otherwise, we will check for "file" information instead of dataset information - currently this includes
                # "file_name", "extra_files_path".
                if "dataset" in dataset_attrs:
                    handle_dataset_object_edit(dataset_instance, dataset_attrs)
                else:
                    file_name = dataset_attrs.get("file_name")
                    if file_name:
                        assert file_source_root
                        # Do security check and move/copy dataset data.
                        archive_path = os.path.abspath(os.path.join(file_source_root, file_name))
                        if os.path.islink(archive_path):
                            raise MalformedContents(f"Invalid dataset path: {archive_path}")

                        temp_dataset_file_name = os.path.realpath(archive_path)

                        if not in_directory(temp_dataset_file_name, file_source_root):
                            raise MalformedContents(f"Invalid dataset path: {temp_dataset_file_name}")

                    has_good_source = False
                    file_metadata = dataset_attrs.get("file_metadata") or {}
                    if "sources" in file_metadata:
                        for source_attrs in file_metadata["sources"]:
                            extra_files_path = source_attrs["extra_files_path"]
                            if extra_files_path is None:
                                has_good_source = True

                    discarded_data = self.import_options.discarded_data
                    dataset_state = dataset_attrs.get("state", dataset_instance.states.OK)
                    if dataset_state == dataset_instance.states.DEFERRED:
                        dataset_instance.state = dataset_instance.states.DEFERRED
                        dataset_instance.deleted = False
                        if isinstance(dataset_instance, model.HistoryDatasetAssociation):
                            dataset_instance.purged = False
                        assert dataset_instance.dataset
                        dataset_instance.dataset.deleted = False
                        dataset_instance.dataset.purged = False
                    elif (
                        not file_name
                        or not os.path.exists(temp_dataset_file_name)
                        or discarded_data is ImportDiscardedDataType.FORCE
                    ):
                        is_discarded = not has_good_source
                        target_state = (
                            dataset_instance.states.DISCARDED if is_discarded else dataset_instance.states.DEFERRED
                        )
                        dataset_instance.state = target_state
                        deleted = is_discarded and (discarded_data == ImportDiscardedDataType.FORBID)
                        dataset_instance.deleted = deleted
                        if isinstance(dataset_instance, model.HistoryDatasetAssociation):
                            dataset_instance.purged = deleted
                        assert dataset_instance.dataset
                        dataset_instance.dataset.state = target_state
                        dataset_instance.dataset.deleted = deleted
                        dataset_instance.dataset.purged = deleted
                    else:
                        dataset_instance.state = dataset_state
                        if not self.object_store:
                            raise Exception(f"self.object_store is missing from {self}.")
                        if not dataset_instance.dataset.purged:
                            self.object_store.update_from_file(
                                dataset_instance.dataset, file_name=temp_dataset_file_name, create=True
                            )

                            # Import additional files if present. Histories exported previously might not have this attribute set.
                            dataset_extra_files_path = dataset_attrs.get("extra_files_path", None)
                            if dataset_extra_files_path:
                                assert file_source_root
                                dataset_extra_files_path = os.path.join(file_source_root, dataset_extra_files_path)
                                persist_extra_files(self.object_store, dataset_extra_files_path, dataset_instance)
                            # Only trust file size if the dataset is purged. If we keep the data we should check the file size.
                            dataset_instance.dataset.file_size = None
                            dataset_instance.dataset.set_total_size()  # update the filesize record in the database

                        if dataset_instance.deleted:
                            dataset_instance.dataset.deleted = True

                    self._attach_dataset_hashes(file_metadata, dataset_instance)
                    self._attach_dataset_sources(file_metadata, dataset_instance)
                    if "created_from_basename" in file_metadata:
                        dataset_instance.dataset.created_from_basename = file_metadata["created_from_basename"]

                if model_class == "HistoryDatasetAssociation" and self.user:
                    add_item_annotation(self.sa_session, self.user, dataset_instance, dataset_attrs["annotation"])
                    tag_list = dataset_attrs.get("tags")
                    if tag_list:
                        if not self.tag_handler:
                            raise Exception(f"Missing self.tag_handler on {self}.")
                        self.tag_handler.set_tags_from_list(
                            user=self.user, item=dataset_instance, new_tags_list=tag_list, flush=False
                        )

                if self.app:
                    # If dataset instance is discarded or deferred, don't attempt to regenerate
                    # metadata for it.
                    if dataset_instance.state == dataset_instance.states.OK:
                        regenerate_kwds: dict[str, Any] = {}
                        if job:
                            regenerate_kwds["user"] = job.user
                            regenerate_kwds["session_id"] = job.session_id
                        elif history:
                            user = history.user
                            regenerate_kwds["user"] = user
                            if user is None and history.galaxy_sessions[0]:
                                regenerate_kwds["session_id"] = history.galaxy_sessions[0].galaxy_session.id
                            else:
                                regenerate_kwds["session_id"] = None
                        else:
                            # Need a user to run library jobs to generate metadata...
                            pass
                        if not self.import_options.allow_edit:
                            # external import, metadata files need to be regenerated (as opposed to extended metadata dataset import)
                            if self.app.datatypes_registry.set_external_metadata_tool:
                                self.app.datatypes_registry.set_external_metadata_tool.regenerate_imported_metadata_if_needed(
                                    dataset_instance, history, **regenerate_kwds
                                )
                        else:
                            # Try to set metadata directly. @mvdbeek thinks we should only record the datasets
                            try:
                                if dataset_instance.has_metadata_files:
                                    dataset_instance.datatype.set_meta(dataset_instance)
                            except Exception:
                                log.debug(f"Metadata setting failed on {dataset_instance}", exc_info=True)
                                dataset_instance.state = dataset_instance.dataset.states.FAILED_METADATA

                if model_class == "HistoryDatasetAssociation":
                    if not isinstance(dataset_instance, model.HistoryDatasetAssociation):
                        raise Exception(
                            "Mismatch between model class and Python class, "
                            f"expected HistoryDatasetAssociation, got a {type(dataset_instance)}: {dataset_instance}"
                        )

                    if object_key in dataset_attrs:
                        object_import_tracker.hdas_by_key[dataset_attrs[object_key]] = dataset_instance
                    else:
                        assert "id" in dataset_attrs
                        object_import_tracker.hdas_by_id[dataset_attrs["id"]] = dataset_instance
                else:
                    if not isinstance(dataset_instance, model.LibraryDatasetDatasetAssociation):
                        raise Exception(
                            "Mismatch between model class and Python class, "
                            f"expected LibraryDatasetDatasetAssociation, got a {type(dataset_instance)}: {dataset_instance}"
                        )

                    if object_key in dataset_attrs:
                        object_import_tracker.lddas_by_key[dataset_attrs[object_key]] = dataset_instance
                    else:
                        assert "id" in dataset_attrs
                        object_import_tracker.lddas_by_key[dataset_attrs["id"]] = dataset_instance

    def _import_libraries(self, object_import_tracker: "ObjectImportTracker") -> None:
        object_key = self.object_key

        def import_folder(folder_attrs, root_folder=None):
            if root_folder:
                library_folder = root_folder
            else:
                name = folder_attrs["name"]
                description = folder_attrs["description"]
                genome_build = folder_attrs["genome_build"]
                deleted = folder_attrs["deleted"]
                library_folder = model.LibraryFolder(name=name, description=description, genome_build=genome_build)
                library_folder.deleted = deleted
                self._session_add(library_folder)

            for sub_folder_attrs in folder_attrs.get("folders", []):
                sub_folder = import_folder(sub_folder_attrs)
                library_folder.add_folder(sub_folder)

            for ld_attrs in folder_attrs.get("datasets", []):
                ld = model.LibraryDataset(
                    folder=library_folder, name=ld_attrs["name"], info=ld_attrs["info"], order_id=ld_attrs["order_id"]
                )
                if "ldda" in ld_attrs:
                    ldda = object_import_tracker.lddas_by_key[ld_attrs["ldda"][object_key]]
                    ld.library_dataset_dataset_association = ldda
                self._session_add(ld)

            self.sa_session.commit()
            return library_folder

        libraries_attrs = self.library_properties()
        for library_attrs in libraries_attrs:
            if (
                library_attrs["model_class"] == "LibraryFolder"
                and library_attrs.get("id")
                and not self.sessionless
                and self.import_options.allow_edit
            ):
                library_folder = self.sa_session.get(model.LibraryFolder, library_attrs["id"])
                import_folder(library_attrs, root_folder=library_folder)
            else:
                assert self.import_options.allow_library_creation
                name = library_attrs["name"]
                description = library_attrs["description"]
                synopsis = library_attrs["synopsis"]
                library = model.Library(name=name, description=description, synopsis=synopsis)
                self._session_add(library)
                object_import_tracker.libraries_by_key[library_attrs[object_key]] = library

                if "root_folder" in library_attrs:
                    library.root_folder = import_folder(library_attrs["root_folder"])

    def _import_collection_instances(
        self,
        object_import_tracker: "ObjectImportTracker",
        collections_attrs: list[dict[str, Any]],
        history: Optional[model.History],
        new_history: bool,
    ) -> None:
        object_key = self.object_key

        def import_collection(collection_attrs):
            def materialize_elements(dc):
                if "elements" not in collection_attrs:
                    return

                elements_attrs = collection_attrs["elements"]
                for element_attrs in elements_attrs:
                    dce = model.DatasetCollectionElement(
                        collection=dc,
                        element=model.DatasetCollectionElement.UNINITIALIZED_ELEMENT,
                        element_index=element_attrs["element_index"],
                        element_identifier=element_attrs["element_identifier"],
                        columns=element_attrs.get("columns"),
                    )
                    if "encoded_id" in element_attrs:
                        object_import_tracker.dces_by_key[element_attrs["encoded_id"]] = dce
                    if "hda" in element_attrs:
                        hda_attrs = element_attrs["hda"]
                        if object_key in hda_attrs:
                            hda_key = hda_attrs[object_key]
                            hdas_by_key = object_import_tracker.hdas_by_key
                            if hda_key in hdas_by_key:
                                hda = hdas_by_key[hda_key]
                            else:
                                raise KeyError(
                                    f"Failed to find exported hda with key [{hda_key}] of type [{object_key}] in [{hdas_by_key}]"
                                )
                        else:
                            hda_id = hda_attrs["id"]
                            hdas_by_id = object_import_tracker.hdas_by_id
                            if hda_id not in hdas_by_id:
                                raise Exception(f"Failed to find HDA with id [{hda_id}] in [{hdas_by_id}]")
                            hda = hdas_by_id[hda_id]
                        dce.hda = hda
                    elif "child_collection" in element_attrs:
                        dce.child_collection = import_collection(element_attrs["child_collection"])
                    else:
                        raise Exception("Unknown collection element type encountered.")

                dc.element_count = len(elements_attrs)

            if "id" in collection_attrs and self.import_options.allow_edit and not self.sessionless:
                dc = self.sa_session.get(model.DatasetCollection, collection_attrs["id"])
                attributes = [
                    "collection_type",
                    "populated_state",
                    "populated_state_message",
                    "column_definitions",
                    "element_count",
                ]
                for attribute in attributes:
                    if attribute in collection_attrs:
                        setattr(dc, attribute, collection_attrs.get(attribute))

                materialize_elements(dc)
            else:
                # create collection
                dc = model.DatasetCollection(collection_type=collection_attrs["type"])
                dc.populated_state = collection_attrs["populated_state"]
                dc.populated_state_message = collection_attrs.get("populated_state_message")
                dc.column_definitions = collection_attrs.get("column_definitions")
                self._attach_raw_id_if_editing(dc, collection_attrs)
                materialize_elements(dc)

            self._session_add(dc)
            return dc

        history_sa_session = get_object_session(history)
        for collection_attrs in collections_attrs:
            if "collection" in collection_attrs:
                dc = import_collection(collection_attrs["collection"])
                if "id" in collection_attrs and self.import_options.allow_edit and not self.sessionless:
                    hdca = self.sa_session.get(model.HistoryDatasetCollectionAssociation, collection_attrs["id"])
                    assert hdca is not None
                    # TODO: edit attributes...
                else:
                    hdca = model.HistoryDatasetCollectionAssociation(
                        collection=dc,
                        visible=True,
                        name=collection_attrs["display_name"],
                        implicit_output_name=collection_attrs.get("implicit_output_name"),
                    )
                    self._attach_raw_id_if_editing(hdca, collection_attrs)

                    add_object_to_session(hdca, history_sa_session)
                    hdca.history = history
                    if new_history and self.trust_hid(collection_attrs):
                        hdca.hid = collection_attrs["hid"]
                    else:
                        object_import_tracker.requires_hid.append(hdca)

                self._session_add(hdca)
                if object_key in collection_attrs:
                    object_import_tracker.hdcas_by_key[collection_attrs[object_key]] = hdca
                else:
                    assert "id" in collection_attrs
                    object_import_tracker.hdcas_by_id[collection_attrs["id"]] = hdca
            else:
                import_collection(collection_attrs)

    def _attach_raw_id_if_editing(
        self,
        obj: model.RepresentById,
        attrs: dict[str, Any],
    ) -> None:
        if self.sessionless and "id" in attrs and self.import_options.allow_edit:
            obj.id = attrs["id"]

    def _import_collection_implicit_input_associations(
        self, object_import_tracker: "ObjectImportTracker", collections_attrs: list[dict[str, Any]]
    ) -> None:
        object_key = self.object_key

        for collection_attrs in collections_attrs:
            if "id" in collection_attrs:
                # Existing object, not a new one, this property is immutable via model stores currently.
                continue
            hdca = object_import_tracker.hdcas_by_key[collection_attrs[object_key]]
            if "implicit_input_collections" in collection_attrs:
                implicit_input_collections = collection_attrs["implicit_input_collections"]
                for implicit_input_collection in implicit_input_collections:
                    name = implicit_input_collection["name"]
                    input_collection_identifier = implicit_input_collection["input_dataset_collection"]
                    if input_collection_identifier in object_import_tracker.hdcas_by_key:
                        input_dataset_collection = object_import_tracker.hdcas_by_key[input_collection_identifier]
                        hdca.add_implicit_input_collection(name, input_dataset_collection)

    def _import_dataset_copied_associations(
        self, object_import_tracker: "ObjectImportTracker", datasets_attrs: list[dict[str, Any]]
    ) -> None:
        object_key = self.object_key

        # Re-establish copied_from_history_dataset_association relationships so history extraction
        # has a greater chance of working in this history, for reproducibility.
        for dataset_attrs in datasets_attrs:
            if "id" in dataset_attrs:
                # Existing object, not a new one, this property is immutable via model stores currently.
                continue
            dataset_key = dataset_attrs[object_key]
            if dataset_key not in object_import_tracker.hdas_by_key:
                continue

            hda = object_import_tracker.hdas_by_key[dataset_key]
            copied_from_chain = dataset_attrs.get("copied_from_history_dataset_association_id_chain", [])
            copied_from_object_key = _copied_from_object_key(copied_from_chain, object_import_tracker.hdas_by_key)
            if not copied_from_object_key:
                continue

            # Re-establish the chain if we can.
            if copied_from_object_key in object_import_tracker.hdas_by_key:
                hda.copied_from_history_dataset_association = object_import_tracker.hdas_by_key[copied_from_object_key]
            else:
                # We're at the end of the chain and this HDA was copied from an HDA
                # outside the history. So when we find this job and are looking for inputs/outputs
                # attach to this node... unless we've already encountered another dataset
                # copied from that jobs output... in that case we are going to cheat and
                # say this dataset was copied from that one. It wasn't in the original Galaxy
                # instance but I think it is fine to pretend in order to create a DAG here.
                hda_copied_from_sinks = object_import_tracker.hda_copied_from_sinks
                if copied_from_object_key in hda_copied_from_sinks:
                    hda.copied_from_history_dataset_association = object_import_tracker.hdas_by_key[
                        hda_copied_from_sinks[copied_from_object_key]
                    ]
                else:
                    hda_copied_from_sinks[copied_from_object_key] = dataset_key

    def _import_collection_copied_associations(
        self, object_import_tracker: "ObjectImportTracker", collections_attrs: list[dict[str, Any]]
    ) -> None:
        object_key = self.object_key

        # Re-establish copied_from_history_dataset_collection_association relationships so history extraction
        # has a greater chance of working in this history, for reproducibility. Very similar to HDA code above
        # see comments there.
        for collection_attrs in collections_attrs:
            if "id" in collection_attrs:
                # Existing object, not a new one, this property is immutable via model stores currently.
                continue
            dataset_collection_key = collection_attrs[object_key]
            if dataset_collection_key not in object_import_tracker.hdcas_by_key:
                continue

            hdca = object_import_tracker.hdcas_by_key[dataset_collection_key]
            copied_from_chain = collection_attrs.get("copied_from_history_dataset_collection_association_id_chain", [])
            copied_from_object_key = _copied_from_object_key(copied_from_chain, object_import_tracker.hdcas_by_key)
            if not copied_from_object_key:
                continue

            # Re-establish the chain if we can, again see comments for hdas above for this to make more
            # sense.
            hdca_copied_from_sinks = object_import_tracker.hdca_copied_from_sinks
            if copied_from_object_key in object_import_tracker.hdcas_by_key:
                source_hdca = object_import_tracker.hdcas_by_key[copied_from_object_key]
                if source_hdca is not hdca:
                    # We may not have the copied source, in which case the first included HDCA in the chain
                    # acts as the source, so here we make sure we don't create a cycle.
                    hdca.copied_from_history_dataset_collection_association = source_hdca
            else:
                if copied_from_object_key in hdca_copied_from_sinks:
                    source_hdca = object_import_tracker.hdcas_by_key[hdca_copied_from_sinks[copied_from_object_key]]
                    if source_hdca is not hdca:
                        hdca.copied_from_history_dataset_collection_association = source_hdca
                else:
                    hdca_copied_from_sinks[copied_from_object_key] = dataset_collection_key

    def _reassign_hids(self, object_import_tracker: "ObjectImportTracker", history: Optional[model.History]) -> None:
        # assign HIDs for newly created objects that didn't match original history
        requires_hid = object_import_tracker.requires_hid
        requires_hid_len = len(requires_hid)
        if requires_hid_len > 0 and not self.sessionless:
            if not history:
                raise Exception("Optional history is required here.")
            for obj in requires_hid:
                history.stage_addition(obj)
            history.add_pending_items()

        if object_import_tracker.copy_hid_for:
            # in an if to avoid flush if unneeded
            for from_dataset, to_dataset in object_import_tracker.copy_hid_for:
                to_dataset.hid = from_dataset.hid
                self._session_add(to_dataset)
            self._flush()

    def _import_workflow_invocations(
        self, object_import_tracker: "ObjectImportTracker", history: Optional[model.History]
    ) -> None:
        #
        # Create jobs.
        #
        object_key = self.object_key

        for workflow_key, workflow_path in self.workflow_paths():
            workflows_directory = os.path.join(self.archive_dir, "workflows")
            if not self.app:
                raise Exception(f"Missing required self.app in {self}.")
            workflow = self.app.workflow_contents_manager.read_workflow_from_path(
                self.app, self.user, workflow_path, allow_in_directory=workflows_directory
            )
            object_import_tracker.workflows_by_key[workflow_key] = workflow

        invocations_attrs = self.invocations_properties()
        for invocation_attrs in invocations_attrs:
            assert not self.import_options.allow_edit
            imported_invocation = model.WorkflowInvocation()
            imported_invocation.history = history
            ensure_object_added_to_session(imported_invocation, object_in_session=history)
            workflow_key = invocation_attrs["workflow"]
            if workflow_key not in object_import_tracker.workflows_by_key:
                raise Exception(f"Failed to find key {workflow_key} in {object_import_tracker.workflows_by_key.keys()}")

            workflow = object_import_tracker.workflows_by_key[workflow_key]
            imported_invocation.workflow = workflow
            state = invocation_attrs["state"]
            if state in model.WorkflowInvocation.non_terminal_states:
                state = model.WorkflowInvocation.states.CANCELLED
            imported_invocation.state = state
            restore_times(imported_invocation, invocation_attrs)

            self._session_add(imported_invocation)
            self._flush()

            def attach_workflow_step(imported_object, attrs):
                order_index = attrs["order_index"]
                imported_object.workflow_step = workflow.step_by_index(order_index)  # noqa: B023

            for step_attrs in invocation_attrs["steps"]:
                imported_invocation_step = model.WorkflowInvocationStep()
                imported_invocation_step.workflow_invocation = imported_invocation
                ensure_object_added_to_session(imported_invocation_step, session=self.sa_session)
                attach_workflow_step(imported_invocation_step, step_attrs)
                restore_times(imported_invocation_step, step_attrs)
                imported_invocation_step.action = step_attrs["action"]
                # TODO: ensure terminal...
                imported_invocation_step.state = step_attrs["state"]

                if "job" in step_attrs:
                    job = object_import_tracker.jobs_by_key[step_attrs["job"][object_key]]
                    imported_invocation_step.job = job
                    ensure_object_added_to_session(imported_invocation_step, object_in_session=job)
                elif "implicit_collection_jobs" in step_attrs:
                    icj = object_import_tracker.implicit_collection_jobs_by_key[
                        step_attrs["implicit_collection_jobs"][object_key]
                    ]
                    imported_invocation_step.implicit_collection_jobs = icj

                # TODO: handle step outputs...
                output_dicts = step_attrs["outputs"]
                step_outputs = []
                for output_dict in output_dicts:
                    step_output = model.WorkflowInvocationStepOutputDatasetAssociation()
                    step_output.output_name = output_dict["output_name"]
                    dataset_link_attrs = output_dict["dataset"]
                    if dataset_link_attrs:
                        dataset = object_import_tracker.find_hda(dataset_link_attrs[object_key])
                        assert dataset
                        step_output.dataset = dataset
                    step_outputs.append(step_output)
                imported_invocation_step.output_datasets = step_outputs

                output_collection_dicts = step_attrs["output_collections"]
                step_output_collections = []
                for output_collection_dict in output_collection_dicts:
                    step_output_collection = model.WorkflowInvocationStepOutputDatasetCollectionAssociation()
                    step_output_collection.output_name = output_collection_dict["output_name"]
                    dataset_collection_link_attrs = output_collection_dict["dataset_collection"]
                    if dataset_collection_link_attrs:
                        dataset_collection = object_import_tracker.find_hdca(dataset_collection_link_attrs[object_key])
                        assert dataset_collection
                        step_output_collection.dataset_collection = dataset_collection
                    step_output_collections.append(step_output_collection)
                imported_invocation_step.output_dataset_collections = step_output_collections

            input_parameters = []
            for input_parameter_attrs in invocation_attrs["input_parameters"]:
                input_parameter = model.WorkflowRequestInputParameter()
                input_parameter.value = input_parameter_attrs["value"]
                input_parameter.name = input_parameter_attrs["name"]
                input_parameter.type = input_parameter_attrs["type"]
                input_parameter.workflow_invocation = imported_invocation
                self._session_add(input_parameter)
                input_parameters.append(input_parameter)
            # invocation_attrs["input_parameters"] = input_parameters

            step_states = []
            for step_state_attrs in invocation_attrs["step_states"]:
                step_state = model.WorkflowRequestStepState()
                step_state.value = step_state_attrs["value"]
                attach_workflow_step(step_state, step_state_attrs)
                step_state.workflow_invocation = imported_invocation
                self._session_add(step_state)
                step_states.append(step_state)

            input_step_parameters = []
            for input_step_parameter_attrs in invocation_attrs["input_step_parameters"]:
                input_step_parameter = model.WorkflowRequestInputStepParameter()
                input_step_parameter.parameter_value = input_step_parameter_attrs["parameter_value"]
                attach_workflow_step(input_step_parameter, input_step_parameter_attrs)
                input_step_parameter.workflow_invocation = imported_invocation
                self._session_add(input_step_parameter)
                input_step_parameters.append(input_step_parameter)

            input_datasets = []
            for input_dataset_attrs in invocation_attrs["input_datasets"]:
                input_dataset = model.WorkflowRequestToInputDatasetAssociation()
                attach_workflow_step(input_dataset, input_dataset_attrs)
                input_dataset.workflow_invocation = imported_invocation
                input_dataset.name = input_dataset_attrs["name"]
                dataset_link_attrs = input_dataset_attrs["dataset"]
                if dataset_link_attrs:
                    dataset = object_import_tracker.find_hda(dataset_link_attrs[object_key])
                    assert dataset
                    input_dataset.dataset = dataset
                self._session_add(input_dataset)
                input_datasets.append(input_dataset)

            input_dataset_collections = []
            for input_dataset_collection_attrs in invocation_attrs["input_dataset_collections"]:
                input_dataset_collection = model.WorkflowRequestToInputDatasetCollectionAssociation()
                attach_workflow_step(input_dataset_collection, input_dataset_collection_attrs)
                input_dataset_collection.workflow_invocation = imported_invocation
                input_dataset_collection.name = input_dataset_collection_attrs["name"]
                dataset_collection_link_attrs = input_dataset_collection_attrs["dataset_collection"]
                if dataset_collection_link_attrs:
                    dataset_collection = object_import_tracker.find_hdca(dataset_collection_link_attrs[object_key])
                    assert dataset_collection
                    input_dataset_collection.dataset_collection = dataset_collection
                self._session_add(input_dataset_collection)
                input_dataset_collections.append(input_dataset_collection)

            output_dataset_collections = []
            for output_dataset_collection_attrs in invocation_attrs["output_dataset_collections"]:
                output_dataset_collection = model.WorkflowInvocationOutputDatasetCollectionAssociation()
                output_dataset_collection.workflow_invocation = imported_invocation
                attach_workflow_step(output_dataset_collection, output_dataset_collection_attrs)
                workflow_output = output_dataset_collection_attrs["workflow_output"]
                label = workflow_output.get("label")
                workflow_output = workflow.workflow_output_for(label)
                output_dataset_collection.workflow_output = workflow_output
                output_dataset_collection_attrs = output_dataset_collection_attrs.get("dataset_collection") or {}
                if hdca_id := output_dataset_collection_attrs.get(object_key):
                    hdca = object_import_tracker.find_hdca(hdca_id)
                    if hdca:
                        output_dataset_collection.dataset_collection = hdca
                self._session_add(output_dataset_collection)
                output_dataset_collections.append(output_dataset_collection)

            output_datasets = []
            for output_dataset_attrs in invocation_attrs["output_datasets"]:
                output_dataset = model.WorkflowInvocationOutputDatasetAssociation()
                output_dataset.workflow_invocation = imported_invocation
                attach_workflow_step(output_dataset, output_dataset_attrs)
                workflow_output = output_dataset_attrs["workflow_output"]
                label = workflow_output.get("label")
                workflow_output = workflow.workflow_output_for(label)
                output_dataset.workflow_output = workflow_output
                output_dataset_dataset_attrs = output_dataset_attrs.get("dataset") or {}
                if dataset_id := output_dataset_dataset_attrs.get(object_key):
                    dataset = object_import_tracker.find_hda(dataset_id)
                    if dataset:
                        output_dataset.dataset = dataset
                self._session_add(output_dataset)
                output_datasets.append(output_dataset)

            output_values = []
            for output_value_attrs in invocation_attrs["output_values"]:
                output_value = model.WorkflowInvocationOutputValue()
                output_value.workflow_invocation = imported_invocation
                output_value.value = output_value_attrs["value"]
                attach_workflow_step(output_value, output_value_attrs)
                workflow_output = output_value_attrs["workflow_output"]
                label = workflow_output.get("label")
                workflow_output = workflow.workflow_output_for(label)
                output_value.workflow_output = workflow_output
                self._session_add(output_value)
                output_values.append(output_value)

            if object_key in invocation_attrs:
                object_import_tracker.invocations_by_key[invocation_attrs[object_key]] = imported_invocation

    def _import_jobs(self, object_import_tracker: "ObjectImportTracker", history: Optional[model.History]) -> None:
        self._flush()
        object_key = self.object_key

        _find_hda = object_import_tracker.find_hda
        _find_hdca = object_import_tracker.find_hdca
        _find_dce = object_import_tracker.find_dce

        #
        # Create jobs.
        #
        jobs_attrs = self.jobs_properties()
        # Create each job.
        history_sa_session = get_object_session(history)
        for job_attrs in jobs_attrs:
            if "id" in job_attrs and not self.sessionless:
                # only thing we allow editing currently is associations for incoming jobs.
                assert self.import_options.allow_edit
                job = self.sa_session.get(model.Job, job_attrs["id"])
                self._connect_job_io(job, job_attrs, _find_hda, _find_hdca, _find_dce)  # type: ignore[attr-defined]
                self._set_job_attributes(job, job_attrs, force_terminal=False)  # type: ignore[attr-defined]
                # Don't edit job
                continue

            imported_job = model.Job()
            imported_job.id = cast(int, job_attrs.get("id"))
            imported_job.user = self.user
            add_object_to_session(imported_job, history_sa_session)
            imported_job.history = history
            imported_job.imported = True
            imported_job.tool_id = job_attrs["tool_id"]
            imported_job.tool_version = job_attrs["tool_version"]
            self._set_job_attributes(imported_job, job_attrs, force_terminal=True)  # type: ignore[attr-defined]

            restore_times(imported_job, job_attrs)
            self._session_add(imported_job)

            # Connect jobs to input and output datasets.
            params = self._normalize_job_parameters(imported_job, job_attrs, _find_hda, _find_hdca, _find_dce)  # type: ignore[attr-defined]
            for name, value in params.items():
                # Transform parameter values when necessary.
                imported_job.add_parameter(name, dumps(value))

            self._connect_job_io(imported_job, job_attrs, _find_hda, _find_hdca, _find_dce)  # type: ignore[attr-defined]

            if object_key in job_attrs:
                object_import_tracker.jobs_by_key[job_attrs[object_key]] = imported_job

    def _import_implicit_dataset_conversions(self, object_import_tracker: "ObjectImportTracker") -> None:
        implicit_dataset_conversion_attrs = self.implicit_dataset_conversion_properties()
        for idc_attrs in implicit_dataset_conversion_attrs:
            # I don't know what metadata_safe does per se... should we copy this property or
            # just set it to False?
            metadata_safe = False
            idc = model.ImplicitlyConvertedDatasetAssociation(metadata_safe=metadata_safe, for_import=True)
            idc.type = idc_attrs["file_type"]
            # We may not have exported the parent, so only set the parent_hda attribute if we did.
            if (parent_hda_id := idc_attrs.get("parent_hda")) and (
                parent_hda := object_import_tracker.hdas_by_key.get(parent_hda_id)
            ):
                # exports created prior to 24.2 may not have a parent if the parent had been purged
                idc.parent_hda = parent_hda
            if idc_attrs.get("hda"):
                idc.dataset = object_import_tracker.hdas_by_key[idc_attrs["hda"]]

            # we have the dataset and the parent, let's ensure they end up with the same HID
            if idc.dataset and idc.parent_hda:
                try:
                    object_import_tracker.requires_hid.remove(idc.dataset)
                except ValueError:
                    pass  # we wanted to remove it anyway.
                # An HDA can be the parent of multiple implicitly converted datasets,
                # that's why we use [(source, target)] here
                object_import_tracker.copy_hid_for.append((idc.parent_hda, idc.dataset))

            self._session_add(idc)

    def _import_implicit_collection_jobs(self, object_import_tracker: "ObjectImportTracker") -> None:
        object_key = self.object_key

        implicit_collection_jobs_attrs = self.implicit_collection_jobs_properties()
        for icj_attrs in implicit_collection_jobs_attrs:
            icj = model.ImplicitCollectionJobs()
            icj.populated_state = icj_attrs["populated_state"]

            icj.jobs = []
            for order_index, job in enumerate(icj_attrs["jobs"]):
                icja = model.ImplicitCollectionJobsJobAssociation()
                add_object_to_object_session(icja, icj)
                icja.implicit_collection_jobs = icj
                if job in object_import_tracker.jobs_by_key:
                    job_instance = object_import_tracker.jobs_by_key[job]
                    add_object_to_object_session(icja, job_instance)
                    icja.job = job_instance
                icja.order_index = order_index
                icj.jobs.append(icja)
                self._session_add(icja)

            object_import_tracker.implicit_collection_jobs_by_key[icj_attrs[object_key]] = icj
            self._session_add(icj)

    def _session_add(self, obj: model.RepresentById) -> None:
        self.sa_session.add(obj)

    def _flush(self) -> None:
        self.sa_session.commit()
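
# Illustrative sketch only of the typical driver pattern (assumes a configured Galaxy "app",
# a "user", and "extracted_archive_dir" pointing at an unpacked export archive):
#
# import_options = ImportOptions(discarded_data=ImportDiscardedDataType.ALLOW)
# model_store = get_import_model_store_for_directory(
#     extracted_archive_dir, app=app, user=user, import_options=import_options
# )
# with model_store.target_history(default_history=None) as history:
#     tracker = model_store.perform_import(history, new_history=True)
# # tracker.hdas_by_key / tracker.hdcas_by_key then map export keys to the newly created objects.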


def _copied_from_object_key(
    copied_from_chain: list[ObjectKeyType],
    objects_by_key: Union[
        dict[ObjectKeyType, model.HistoryDatasetAssociation],
        dict[ObjectKeyType, model.HistoryDatasetCollectionAssociation],
    ],
) -> Optional[ObjectKeyType]:
    if len(copied_from_chain) == 0:
        return None

    # Okay this gets fun, we need the last thing in the chain to reconnect jobs
    # from outside the history to inputs/outputs in this history but there may
    # be cycles in the chain that lead outside the original history, so just eliminate
    # all IDs not from this history except the last one.
    filtered_copied_from_chain = []
    for i, copied_from_key in enumerate(copied_from_chain):
        filter_id = (i != len(copied_from_chain) - 1) and (copied_from_key not in objects_by_key)
        if not filter_id:
            filtered_copied_from_chain.append(copied_from_key)

    copied_from_chain = filtered_copied_from_chain
    if len(copied_from_chain) == 0:
        return None

    copied_from_object_key = copied_from_chain[0]
    return copied_from_object_key
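
# Illustrative sketch only (keys "b" and "d" stand in for HDAs present in the current import):
#
# objects_by_key = {"b": hda_b, "d": hda_d}
# _copied_from_object_key(["a", "b", "c", "d"], objects_by_key)  # -> "b" ("a" and "c" are filtered out)
# _copied_from_object_key(["x", "y"], objects_by_key)            # -> "y"; callers treat keys outside the
#                                                                #    import as copied-from "sinks"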


class ObjectImportTracker:
    """Keep track of new and existing imported objects.

    Needed to re-establish connections and such in multiple passes.
    """

    libraries_by_key: dict[ObjectKeyType, model.Library]
    hdas_by_key: dict[ObjectKeyType, model.HistoryDatasetAssociation]
    hdas_by_id: dict[int, model.HistoryDatasetAssociation]
    hdcas_by_key: dict[ObjectKeyType, model.HistoryDatasetCollectionAssociation]
    hdcas_by_id: dict[int, model.HistoryDatasetCollectionAssociation]
    dces_by_key: dict[ObjectKeyType, model.DatasetCollectionElement]
    dces_by_id: dict[int, model.DatasetCollectionElement]
    lddas_by_key: dict[ObjectKeyType, model.LibraryDatasetDatasetAssociation]
    hda_copied_from_sinks: dict[ObjectKeyType, ObjectKeyType]
    hdca_copied_from_sinks: dict[ObjectKeyType, ObjectKeyType]
    jobs_by_key: dict[ObjectKeyType, model.Job]
    requires_hid: list["HistoryItem"]
    copy_hid_for: list[tuple["HistoryItem", "HistoryItem"]]

    def __init__(self) -> None:
        self.libraries_by_key = {}
        self.hdas_by_key = {}
        self.hdas_by_id = {}
        self.hdcas_by_key = {}
        self.hdcas_by_id = {}
        self.dces_by_key = {}
        self.dces_by_id = {}
        self.lddas_by_key = {}
        self.hda_copied_from_sinks = {}
        self.hdca_copied_from_sinks = {}
        self.jobs_by_key = {}
        self.invocations_by_key: dict[str, model.WorkflowInvocation] = {}
        self.implicit_collection_jobs_by_key: dict[str, ImplicitCollectionJobs] = {}
        self.workflows_by_key: dict[str, model.Workflow] = {}
        self.requires_hid = []
        self.copy_hid_for = []
        self.new_history: Optional[model.History] = None

    def find_hda(
        self, input_key: ObjectKeyType, hda_id: Optional[int] = None
    ) -> Optional[model.HistoryDatasetAssociation]:
        hda = None
        if input_key in self.hdas_by_key:
            hda = self.hdas_by_key[input_key]
        elif isinstance(input_key, int) and input_key in self.hdas_by_id:
            # TODO: untangle this, I don't quite understand why we have hdas_by_key and hdas_by_id
            hda = self.hdas_by_id[input_key]
        if input_key in self.hda_copied_from_sinks:
            hda = self.hdas_by_key[self.hda_copied_from_sinks[input_key]]
        return hda

    def find_hdca(self, input_key: ObjectKeyType) -> Optional[model.HistoryDatasetCollectionAssociation]:
        hdca = None
        if input_key in self.hdcas_by_key:
            hdca = self.hdcas_by_key[input_key]
        elif isinstance(input_key, int) and input_key in self.hdcas_by_id:
            hdca = self.hdcas_by_id[input_key]
        if input_key in self.hdca_copied_from_sinks:
            hdca = self.hdcas_by_key[self.hdca_copied_from_sinks[input_key]]
        return hdca

    def find_dce(self, input_key: ObjectKeyType) -> Optional[model.DatasetCollectionElement]:
        dce = None
        if input_key in self.dces_by_key:
            dce = self.dces_by_key[input_key]
        elif isinstance(input_key, int) and input_key in self.dces_by_id:
            dce = self.dces_by_id[input_key]
        return dce


class FileTracebackException(Exception):

    def __init__(self, traceback: str, *args, **kwargs) -> None:
        self.traceback = traceback


def get_import_model_store_for_directory(
    archive_dir: str, **kwd
) -> Union["DirectoryImportModelStore1901", "DirectoryImportModelStoreLatest"]:
    traceback_file = os.path.join(archive_dir, TRACEBACK)
    if not os.path.isdir(archive_dir):
        raise Exception(
            f"Could not find import model store for directory [{archive_dir}] (full path [{os.path.abspath(archive_dir)}])"
        )
    if os.path.exists(os.path.join(archive_dir, ATTRS_FILENAME_EXPORT)):
        if os.path.exists(traceback_file):
            with open(traceback_file) as tb:
                raise FileTracebackException(traceback=tb.read())
        return DirectoryImportModelStoreLatest(archive_dir, **kwd)
    else:
        return DirectoryImportModelStore1901(archive_dir, **kwd)
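
# Illustrative sketch only ("unpacked_dir", "app", and "user" are assumed to exist):
# archives with export_attrs.txt use the latest store, older ones fall back to the 19.01 layout,
# and an archive that recorded an export-side failure surfaces its traceback.
#
# try:
#     store = get_import_model_store_for_directory(unpacked_dir, app=app, user=user)
# except FileTracebackException as e:
#     log.error("Import aborted, the export recorded a failure: %s", e.traceback)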


class DictImportModelStore(ModelImportStore):
    object_key = "encoded_id"

    def __init__(
        self,
        store_as_dict: dict[str, Any],
        **kwd,
    ) -> None:
        self._store_as_dict = store_as_dict
        super().__init__(**kwd)
        self.archive_dir = ""

    def defines_new_history(self) -> bool:
        return DICT_STORE_ATTRS_KEY_HISTORY in self._store_as_dict

    def new_history_properties(self) -> dict[str, Any]:
        return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_HISTORY) or {}

    def datasets_properties(self) -> list[dict[str, Any]]:
        return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_DATASETS) or []

    def collections_properties(self) -> list[dict[str, Any]]:
        return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_COLLECTIONS) or []

    def implicit_dataset_conversion_properties(self) -> list[dict[str, Any]]:
        return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_CONVERSIONS) or []

    def library_properties(self) -> list[dict[str, Any]]:
        return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_LIBRARIES) or []

    def jobs_properties(self) -> list[dict[str, Any]]:
        return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_JOBS) or []

    def implicit_collection_jobs_properties(self) -> list[dict[str, Any]]:
        return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_IMPLICIT_COLLECTION_JOBS) or []

    def invocations_properties(self) -> list[dict[str, Any]]:
        return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_INVOCATIONS) or []

    def workflow_paths(self) -> Iterator[tuple[str, str]]:
        return
        yield


def get_import_model_store_for_dict(
    as_dict: dict[str, Any],
    **kwd,
) -> DictImportModelStore:
    return DictImportModelStore(as_dict, **kwd)
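
# Illustrative sketch only: the dict keys mirror the DICT_STORE_ATTRS_KEY_* constants defined above.
#
# store_dict = {"datasets": [], "collections": [], "jobs": []}
# dict_store = get_import_model_store_for_dict(store_dict, app=app, user=user)
# assert not dict_store.defines_new_history()  # no "history" key in the dict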
[docs] class BaseDirectoryImportModelStore(ModelImportStore): @abc.abstractmethod def _normalize_job_parameters( self, imported_job: model.Job, job_attrs: dict[str, Any], _find_hda: Callable, _find_hdca: Callable, _find_dce: Callable, ) -> dict[str, Any]: ... @abc.abstractmethod def _connect_job_io( self, imported_job: model.Job, job_attrs: dict[str, Any], _find_hda: Callable, _find_hdca: Callable, _find_dce: Callable, ) -> None: ... @property def file_source_root(self) -> str: return self.archive_dir
[docs] def defines_new_history(self) -> bool:
    new_history_attributes = os.path.join(self.archive_dir, ATTRS_FILENAME_HISTORY)
    return os.path.exists(new_history_attributes)

[docs] def new_history_properties(self) -> dict[str, Any]:
    new_history_attributes = os.path.join(self.archive_dir, ATTRS_FILENAME_HISTORY)
    history_properties = load(open(new_history_attributes))
    return history_properties

[docs] def datasets_properties(self) -> list[dict[str, Any]]:
    datasets_attrs_file_name = os.path.join(self.archive_dir, ATTRS_FILENAME_DATASETS)
    datasets_attrs = load(open(datasets_attrs_file_name))
    provenance_file_name = f"{datasets_attrs_file_name}.provenance"
    if os.path.exists(provenance_file_name):
        provenance_attrs = load(open(provenance_file_name))
        datasets_attrs += provenance_attrs
    return datasets_attrs
[docs] def collections_properties(self) -> list[dict[str, Any]]: return self._read_list_if_exists(ATTRS_FILENAME_COLLECTIONS)
[docs] def implicit_dataset_conversion_properties(self) -> list[dict[str, Any]]: return self._read_list_if_exists(ATTRS_FILENAME_CONVERSIONS)
[docs] def library_properties( self, ) -> list[dict[str, Any]]: libraries_attrs = self._read_list_if_exists(ATTRS_FILENAME_LIBRARIES) libraries_attrs.extend(self._read_list_if_exists(ATTRS_FILENAME_LIBRARY_FOLDERS)) return libraries_attrs
[docs] def jobs_properties( self, ) -> list[dict[str, Any]]: return self._read_list_if_exists(ATTRS_FILENAME_JOBS)
[docs] def implicit_collection_jobs_properties(self) -> list[dict[str, Any]]:
    implicit_collection_jobs_attrs_file_name = os.path.join(
        self.archive_dir, ATTRS_FILENAME_IMPLICIT_COLLECTION_JOBS
    )
    try:
        return load(open(implicit_collection_jobs_attrs_file_name))
    except FileNotFoundError:
        return []
[docs] def invocations_properties( self, ) -> list[dict[str, Any]]: return self._read_list_if_exists(ATTRS_FILENAME_INVOCATIONS)
[docs] def workflow_paths(self) -> Iterator[tuple[str, str]]:
    workflows_directory = os.path.join(self.archive_dir, "workflows")
    if not os.path.exists(workflows_directory):
        return
    for name in os.listdir(workflows_directory):
        if name.endswith(".ga") or name.endswith(".abstract.cwl") or name.endswith(".html"):
            continue
        assert name.endswith(".gxwf.yml")
        workflow_key = name[0 : -len(".gxwf.yml")]
        yield workflow_key, os.path.join(workflows_directory, name)
def _set_job_attributes( self, imported_job: model.Job, job_attrs: dict[str, Any], force_terminal: bool = False ) -> None: ATTRIBUTES = ( "info", "exit_code", "traceback", "job_messages", "tool_stdout", "tool_stderr", "job_stdout", "job_stderr", "galaxy_version", ) for attribute in ATTRIBUTES: value = job_attrs.get(attribute) if value is not None: setattr(imported_job, attribute, value) if "stdout" in job_attrs: imported_job.tool_stdout = job_attrs.get("stdout") imported_job.tool_stderr = job_attrs.get("stderr") raw_state = job_attrs.get("state") if force_terminal and raw_state and raw_state not in model.Job.terminal_states: raw_state = model.Job.states.ERROR if raw_state: imported_job.set_state(raw_state) def _read_list_if_exists(self, file_name: str, required: bool = False) -> list[dict[str, Any]]: file_name = os.path.join(self.archive_dir, file_name) if os.path.exists(file_name): attrs = load(open(file_name)) else: if required: raise Exception(f"Failed to find file [{file_name}] in model store archive") attrs = [] return attrs
[docs] def restore_times(
    model_object: Union[model.Job, model.WorkflowInvocation, model.WorkflowInvocationStep], attrs: dict[str, Any]
) -> None:
    try:
        model_object.create_time = datetime.datetime.strptime(attrs["create_time"], "%Y-%m-%dT%H:%M:%S.%f")
    except Exception:
        pass
    try:
        model_object.update_time = datetime.datetime.strptime(attrs["update_time"], "%Y-%m-%dT%H:%M:%S.%f")
    except Exception:
        pass
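# Example (illustrative sketch): ``restore_times`` expects ISO-like timestamps
# with microseconds, matching the ``strptime`` format above. The job object and
# attribute values here are hypothetical.
#
#     restore_times(
#         imported_job,
#         {"create_time": "2024-01-02T03:04:05.000006", "update_time": "2024-01-02T03:05:06.000007"},
#     )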
[docs] class DirectoryImportModelStore1901(BaseDirectoryImportModelStore): object_key = "hid"
[docs] def __init__(self, archive_dir: str, **kwd) -> None:
    archive_dir = os.path.realpath(archive_dir)
    # BioBlend previous to 17.01 exported histories with an extra subdir.
    if not os.path.exists(os.path.join(archive_dir, ATTRS_FILENAME_HISTORY)):
        for d in os.listdir(archive_dir):
            if os.path.isdir(os.path.join(archive_dir, d)):
                archive_dir = os.path.join(archive_dir, d)
                break
    self.archive_dir = archive_dir
    super().__init__(**kwd)
def _connect_job_io( self, imported_job: model.Job, job_attrs: dict[str, Any], _find_hda: Callable, _find_hdca: Callable, _find_dce: Callable, ) -> None: for output_key in job_attrs["output_datasets"]: output_hda = _find_hda(output_key) if output_hda: if not self.dataset_state_serialized: # dataset state has not been serialized, get state from job output_hda.state = imported_job.state imported_job.add_output_dataset(output_hda.name, output_hda) if "input_mapping" in job_attrs: for input_name, input_key in job_attrs["input_mapping"].items(): input_hda = _find_hda(input_key) if input_hda: imported_job.add_input_dataset(input_name, input_hda) def _normalize_job_parameters( self, imported_job: model.Job, job_attrs: dict[str, Any], _find_hda: Callable, _find_hdca: Callable, _find_dce: Callable, ) -> dict[str, Any]: def remap_objects(p, k, obj): if isinstance(obj, dict) and obj.get("__HistoryDatasetAssociation__", False): imported_hda = _find_hda(obj[self.object_key]) if imported_hda: return (k, {"src": "hda", "id": imported_hda.id}) return (k, obj) params = job_attrs["params"] params = remap(params, remap_objects) return params
[docs] def trust_hid(self, obj_attrs: dict[str, Any]) -> bool: # We didn't do object tracking so we pretty much have to trust the HID and accept # that it will be wrong a lot. return True
[docs] class DirectoryImportModelStoreLatest(BaseDirectoryImportModelStore): object_key = "encoded_id"
[docs] def __init__(self, archive_dir: str, **kwd) -> None:
    archive_dir = os.path.realpath(archive_dir)
    self.archive_dir = archive_dir
    super().__init__(**kwd)
def _connect_job_io( self, imported_job: model.Job, job_attrs: dict[str, Any], _find_hda: Callable, _find_hdca: Callable, _find_dce: Callable, ) -> None: if imported_job.command_line is None: imported_job.command_line = job_attrs.get("command_line") if "input_dataset_mapping" in job_attrs: for input_name, input_keys in job_attrs["input_dataset_mapping"].items(): input_keys = input_keys or [] for input_key in input_keys: input_hda = _find_hda(input_key) if input_hda: imported_job.add_input_dataset(input_name, input_hda) if "input_dataset_collection_mapping" in job_attrs: for input_name, input_keys in job_attrs["input_dataset_collection_mapping"].items(): input_keys = input_keys or [] for input_key in input_keys: input_hdca = _find_hdca(input_key) if input_hdca: imported_job.add_input_dataset_collection(input_name, input_hdca) if "input_dataset_collection_element_mapping" in job_attrs: for input_name, input_keys in job_attrs["input_dataset_collection_element_mapping"].items(): input_keys = input_keys or [] for input_key in input_keys: input_dce = _find_dce(input_key) if input_dce: imported_job.add_input_dataset_collection_element(input_name, input_dce) if "output_dataset_mapping" in job_attrs: for output_name, output_keys in job_attrs["output_dataset_mapping"].items(): output_keys = output_keys or [] for output_key in output_keys: output_hda = _find_hda(output_key) if output_hda: if not self.dataset_state_serialized: # dataset state has not been serialized, get state from job output_hda.state = imported_job.state imported_job.add_output_dataset(output_name, output_hda) if "output_dataset_collection_mapping" in job_attrs: for output_name, output_keys in job_attrs["output_dataset_collection_mapping"].items(): output_keys = output_keys or [] for output_key in output_keys: output_hdca = _find_hdca(output_key) if output_hdca: imported_job.add_output_dataset_collection(output_name, output_hdca) def _normalize_job_parameters( self, imported_job: model.Job, job_attrs: dict[str, Any], _find_hda: Callable, _find_hdca: Callable, _find_dce: Callable, ) -> dict[str, Any]: def remap_objects(p, k, obj): if isinstance(obj, dict) and "src" in obj and obj["src"] in ["hda", "hdca", "dce"]: if obj["src"] == "hda": imported_hda = _find_hda(obj["id"]) if imported_hda: new_id = imported_hda.id else: new_id = None elif obj["src"] == "hdca": imported_hdca = _find_hdca(obj["id"]) if imported_hdca: new_id = imported_hdca.id else: new_id = None elif obj["src"] == "dce": imported_dce = _find_dce(obj["id"]) if imported_dce: new_id = imported_dce.id else: new_id = None else: raise NotImplementedError() new_obj = obj.copy() if not new_id and self.import_options.allow_edit: # We may not have exported all job parameters, such as dces, # but we shouldn't set the object_id to none in that case. new_id = obj["id"] new_obj["id"] = new_id return (k, new_obj) return (k, obj) params = job_attrs["params"] params = remap(params, remap_objects) return cast(dict[str, Any], params)
[docs] class BagArchiveImportModelStore(DirectoryImportModelStoreLatest):
[docs] def __init__(self, bag_archive: str, **kwd) -> None:
    archive_dir = tempfile.mkdtemp()
    bdb.extract_bag(bag_archive, output_path=archive_dir)
    # Why this line though...?
    archive_dir = os.path.join(archive_dir, os.listdir(archive_dir)[0])
    bdb.revert_bag(archive_dir)
    super().__init__(archive_dir, **kwd)
[docs] class ModelExportStore(metaclass=abc.ABCMeta):
[docs] @abc.abstractmethod def export_history( self, history: model.History, include_hidden: bool = False, include_deleted: bool = False ) -> None: """Export history to store."""
[docs] @abc.abstractmethod def export_library( self, library: model.Library, include_hidden: bool = False, include_deleted: bool = False ) -> None: """Export library to store."""
[docs] @abc.abstractmethod def export_library_folder( self, library_folder: model.LibraryFolder, include_hidden: bool = False, include_deleted: bool = False ) -> None: """Export library folder to store."""
[docs] @abc.abstractmethod def export_workflow_invocation(self, workflow_invocation, include_hidden=False, include_deleted=False): """Export workflow invocation to store."""
[docs] @abc.abstractmethod def add_dataset_collection( self, collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation] ): """Add Dataset Collection or HDCA to export store."""
[docs] @abc.abstractmethod def add_dataset(self, dataset: model.DatasetInstance, include_files: bool = True): """ Add HDA to export store. ``include_files`` controls whether file contents are exported as well. """
@abc.abstractmethod
def __enter__(self):
    """Export store should be used as context manager."""

@abc.abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb):
    """Export store should be used as context manager."""
[docs] class DirectoryModelExportStore(ModelExportStore): app: Optional[StoreAppProtocol] file_sources: Optional[ConfiguredFileSources]
[docs] def __init__( self, export_directory: StrPath, app: Optional[StoreAppProtocol] = None, file_sources: Optional[ConfiguredFileSources] = None, for_edit: bool = False, serialize_dataset_objects: Optional[bool] = None, export_files: Optional[str] = None, strip_metadata_files: bool = True, serialize_jobs: bool = True, user_context=None, ) -> None: """ :param export_directory: path to export directory. Will be created if it does not exist. :param app: Galaxy App or app-like object. Must be provided if `for_edit` and/or `serialize_dataset_objects` are True :param for_edit: Allow modifying existing HDA and dataset metadata during import. :param serialize_dataset_objects: If True will encode IDs using the host secret. Defaults `for_edit`. :param export_files: How files should be exported, can be 'symlink', 'copy' or None, in which case files will not be serialized. :param serialize_jobs: Include job data in model export. Not needed for set_metadata script. """ if not os.path.exists(export_directory): os.makedirs(export_directory) sessionless = False if app is not None: self.app = app security = app.security sessionless = False if file_sources is None: file_sources = app.file_sources else: sessionless = True security = IdEncodingHelper(id_secret="randomdoesntmatter") self.user_context = ProvidesFileSourcesUserContext(user_context) self.file_sources = file_sources self.serialize_jobs = serialize_jobs self.sessionless = sessionless self.security = security self.export_directory = export_directory self.serialization_options = model.SerializationOptions( for_edit=for_edit, serialize_dataset_objects=serialize_dataset_objects, strip_metadata_files=strip_metadata_files, serialize_files_handler=self, ) self.export_files = export_files self.included_datasets: dict[model.DatasetInstance, tuple[model.DatasetInstance, bool]] = {} self.dataset_implicit_conversions: dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation] = {} self.included_collections: dict[ Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation], Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation], ] = {} self.included_libraries: list[model.Library] = [] self.included_library_folders: list[model.LibraryFolder] = [] self.included_invocations: list[model.WorkflowInvocation] = [] self.collection_datasets: set[int] = set() self.dataset_id_to_path: dict[int, tuple[Optional[str], Optional[str]]] = {} self.job_output_dataset_associations: dict[int, dict[str, model.DatasetInstance]] = {}
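# Example (illustrative sketch): the export store is used as a context manager,
# and ``_finalize()`` runs on a clean exit to write the attribute files. The
# ``app``, ``history``, and target path below are hypothetical.
#
#     with DirectoryModelExportStore("/tmp/history_export", app=app, export_files="copy") as export_store:
#         export_store.export_history(history, include_hidden=False, include_deleted=False)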
@property def workflows_directory(self) -> str: return os.path.join(self.export_directory, "workflows")
[docs] def serialize_files(self, dataset: model.DatasetInstance, as_dict: JsonDictT) -> None: if self.export_files is None: return None add: Callable[[str, str], None] if self.export_files == "symlink": add = os.symlink elif self.export_files == "copy": def add(src, dest): if os.path.isdir(src): shutil.copytree(src, dest) else: shutil.copyfile(src, dest) else: raise Exception(f"Unknown export_files parameter type encountered {self.export_files}") export_directory = self.export_directory _, include_files = self.included_datasets[dataset] if not include_files: return file_name, extra_files_path = None, None try: _file_name = dataset.get_file_name() if os.path.exists(_file_name): file_name = _file_name except ObjectNotFound: pass if dataset.extra_files_path_exists(): extra_files_path = dataset.extra_files_path else: pass dir_name = "datasets" dir_path = os.path.join(export_directory, dir_name) if dataset.dataset.id in self.dataset_id_to_path: file_name, extra_files_path = self.dataset_id_to_path[dataset.dataset.id] if file_name is not None: as_dict["file_name"] = file_name if extra_files_path is not None: as_dict["extra_files_path"] = extra_files_path return conversion = self.dataset_implicit_conversions.get(dataset) conversion_key = self.serialization_options.get_identifier(self.security, conversion) if conversion else None if file_name: if not os.path.exists(dir_path): os.makedirs(dir_path) target_filename = get_export_dataset_filename( as_dict["name"], as_dict["extension"], as_dict["encoded_id"], conversion_key=conversion_key ) arcname = os.path.join(dir_name, target_filename) src = file_name dest = os.path.join(export_directory, arcname) add(src, dest) as_dict["file_name"] = arcname if extra_files_path: try: file_list = os.listdir(extra_files_path) except OSError: file_list = [] if len(file_list): extra_files_target_filename = get_export_dataset_extra_files_dir_name( as_dict["encoded_id"], conversion_key=conversion_key ) arcname = os.path.join(dir_name, extra_files_target_filename) add(extra_files_path, os.path.join(export_directory, arcname)) as_dict["extra_files_path"] = arcname else: as_dict["extra_files_path"] = "" self.dataset_id_to_path[dataset.dataset.id] = (as_dict.get("file_name"), as_dict.get("extra_files_path"))
[docs] def exported_key( self, obj: model.RepresentById, ) -> Union[str, int]: return self.serialization_options.get_identifier(self.security, obj)
def __enter__(self) -> "DirectoryModelExportStore": return self
[docs] def push_metadata_files(self):
    for dataset in self.included_datasets:
        for metadata_element in dataset.metadata.values():
            if isinstance(metadata_element, model.MetadataFile):
                metadata_element.update_from_file(metadata_element.get_file_name())
[docs] def export_job(self, job: model.Job, tool=None, include_job_data=True):
    self.export_jobs([job], include_job_data=include_job_data)
    if tool_source := getattr(tool, "tool_source", None):
        with open(os.path.join(self.export_directory, "tool.xml"), "w") as out:
            out.write(tool_source.to_string())
[docs] def export_jobs( self, jobs: Iterable[model.Job], jobs_attrs: Optional[list[dict[str, Any]]] = None, include_job_data: bool = True, ) -> list[dict[str, Any]]: """ Export jobs. ``include_job_data`` determines whether datasets associated with jobs should be exported as well. This should generally be ``True``, except when re-exporting a job (to store the generated command line) when running the set_meta script. """ jobs_attrs = jobs_attrs or [] for job in jobs: job_attrs = job.serialize(self.security, self.serialization_options) if include_job_data: # -- Get input, output datasets. -- input_dataset_mapping: dict[str, list[Union[str, int]]] = {} output_dataset_mapping: dict[str, list[Union[str, int]]] = {} input_dataset_collection_mapping: dict[str, list[Union[str, int]]] = {} input_dataset_collection_element_mapping: dict[str, list[Union[str, int]]] = {} output_dataset_collection_mapping: dict[str, list[Union[str, int]]] = {} implicit_output_dataset_collection_mapping: dict[str, list[Union[str, int]]] = {} for id_assoc in job.input_datasets: # Optional data inputs will not have a dataset. if id_assoc.dataset: name = id_assoc.name if name not in input_dataset_mapping: input_dataset_mapping[name] = [] input_dataset_mapping[name].append(self.exported_key(id_assoc.dataset)) if include_job_data: self.add_dataset(id_assoc.dataset) for od_assoc in job.output_datasets: # Optional data inputs will not have a dataset. if od_assoc.dataset: name = od_assoc.name if name not in output_dataset_mapping: output_dataset_mapping[name] = [] output_dataset_mapping[name].append(self.exported_key(od_assoc.dataset)) if include_job_data: self.add_dataset(od_assoc.dataset) for idc_assoc in job.input_dataset_collections: # Optional data inputs will not have a dataset. if idc_assoc.dataset_collection: name = idc_assoc.name if name not in input_dataset_collection_mapping: input_dataset_collection_mapping[name] = [] input_dataset_collection_mapping[name].append(self.exported_key(idc_assoc.dataset_collection)) if include_job_data: self.export_collection(idc_assoc.dataset_collection) for idce_assoc in job.input_dataset_collection_elements: if idce_assoc.dataset_collection_element: name = idce_assoc.name if name not in input_dataset_collection_element_mapping: input_dataset_collection_element_mapping[name] = [] input_dataset_collection_element_mapping[name].append( self.exported_key(idce_assoc.dataset_collection_element) ) if include_job_data: if idce_assoc.dataset_collection_element.is_collection: assert isinstance( idce_assoc.dataset_collection_element.element_object, model.DatasetCollection ) self.export_collection(idce_assoc.dataset_collection_element.element_object) else: assert isinstance( idce_assoc.dataset_collection_element.element_object, model.DatasetInstance ) self.add_dataset(idce_assoc.dataset_collection_element.element_object) for odci_assoc in job.output_dataset_collection_instances: # Optional data outputs will not have a dataset. 
# These are implicit outputs, we don't need to export them if odci_assoc.dataset_collection_instance: name = odci_assoc.name if name not in output_dataset_collection_mapping: output_dataset_collection_mapping[name] = [] output_dataset_collection_mapping[name].append( self.exported_key(odci_assoc.dataset_collection_instance) ) for odc_assoc in job.output_dataset_collections: if odc_assoc.dataset_collection: name = odc_assoc.name if name not in implicit_output_dataset_collection_mapping: implicit_output_dataset_collection_mapping[name] = [] implicit_output_dataset_collection_mapping[name].append( self.exported_key(odc_assoc.dataset_collection) ) if include_job_data: self.export_collection(odc_assoc.dataset_collection) job_attrs["input_dataset_mapping"] = input_dataset_mapping job_attrs["input_dataset_collection_mapping"] = input_dataset_collection_mapping job_attrs["input_dataset_collection_element_mapping"] = input_dataset_collection_element_mapping job_attrs["output_dataset_mapping"] = output_dataset_mapping job_attrs["output_dataset_collection_mapping"] = output_dataset_collection_mapping job_attrs["implicit_output_dataset_collection_mapping"] = implicit_output_dataset_collection_mapping jobs_attrs.append(job_attrs) jobs_attrs_filename = os.path.join(self.export_directory, ATTRS_FILENAME_JOBS) with open(jobs_attrs_filename, "w") as jobs_attrs_out: jobs_attrs_out.write(json_encoder.encode(jobs_attrs)) return jobs_attrs
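# Example (illustrative sketch): re-serializing a single job without pulling in
# its datasets, as is done when only the generated command line needs to be
# recorded for the set_metadata script; ``export_store`` and ``job`` are assumed
# to exist already.
#
#     export_store.export_jobs([job], include_job_data=False)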
[docs] def export_history( self, history: model.History, include_hidden: bool = False, include_deleted: bool = False ) -> None: app = self.app assert app, "exporting histories requires being bound to a session and Galaxy app object" export_directory = self.export_directory history_attrs = history.serialize(app.security, self.serialization_options) history_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_HISTORY) with open(history_attrs_filename, "w") as history_attrs_out: dump(history_attrs, history_attrs_out) sa_session = app.model.session # Write collections' attributes (including datasets list) to file. stmt_hdca = ( select(model.HistoryDatasetCollectionAssociation) .where(model.HistoryDatasetCollectionAssociation.history == history) .where(model.HistoryDatasetCollectionAssociation.deleted == expression.false()) ) collections = sa_session.scalars(stmt_hdca) for collection in collections: # filter this ? if not collection.populated: break if collection.state != "ok": break self.export_collection(collection, include_deleted=include_deleted) # Write datasets' attributes to file. actions_backref = model.Dataset.actions stmt_hda = ( select(model.HistoryDatasetAssociation) .where(model.HistoryDatasetAssociation.history == history) .join(model.Dataset) .options(joinedload(model.HistoryDatasetAssociation.dataset).joinedload(actions_backref)) .order_by(model.HistoryDatasetAssociation.hid) .where(model.Dataset.purged == expression.false()) ) datasets = sa_session.scalars(stmt_hda).unique() for dataset in datasets: # Add a new "annotation" attribute so that the user annotation for the dataset can be serialized without needing the user dataset.annotation = get_item_annotation_str(sa_session, history.user, dataset) # type: ignore[attr-defined] should_include_file = self._should_include_dataset( dataset, include_hidden=include_hidden, include_deleted=include_deleted ) if not dataset.deleted and dataset.id in self.collection_datasets: should_include_file = True if dataset not in self.included_datasets: if should_include_file: self._ensure_dataset_file_exists(dataset) if dataset.implicitly_converted_parent_datasets: # fetching 0th of list but I think this is just a mapping quirk - I can't imagine how there # would be more than one of these -John conversion = dataset.implicitly_converted_parent_datasets[0] self.add_implicit_conversion_dataset(dataset, should_include_file, conversion) else: self.add_dataset(dataset, include_files=should_include_file)
[docs] def export_library( self, library: model.Library, include_hidden: bool = False, include_deleted: bool = False ) -> None: self.included_libraries.append(library) root_folder = library.root_folder self.export_library_folder_contents(root_folder, include_hidden=include_hidden, include_deleted=include_deleted)
[docs] def export_library_folder(self, library_folder: model.LibraryFolder, include_hidden=False, include_deleted=False): self.included_library_folders.append(library_folder) self.export_library_folder_contents( library_folder, include_hidden=include_hidden, include_deleted=include_deleted )
[docs] def export_library_folder_contents( self, library_folder: model.LibraryFolder, include_hidden: bool = False, include_deleted: bool = False ) -> None: for library_dataset in library_folder.datasets: ldda = library_dataset.library_dataset_dataset_association should_include_file = (not ldda.visible or not include_hidden) and (not ldda.deleted or include_deleted) self.add_dataset(ldda, should_include_file) for folder in library_folder.folders: self.export_library_folder_contents(folder, include_hidden=include_hidden, include_deleted=include_deleted)
def _should_include_dataset( self, dataset: model.DatasetInstance, include_hidden: bool = False, include_deleted: bool = False ) -> bool: return (dataset.visible or include_hidden) and (not dataset.deleted or include_deleted)
[docs] def export_workflow_invocation( self, workflow_invocation: model.WorkflowInvocation, include_hidden: bool = False, include_deleted: bool = False ) -> None: self.included_invocations.append(workflow_invocation) for input_dataset in workflow_invocation.input_datasets: if input_dataset.dataset and self._should_include_dataset( input_dataset.dataset, include_hidden, include_deleted ): self.add_dataset(input_dataset.dataset) for output_dataset in workflow_invocation.output_datasets: if self._should_include_dataset(output_dataset.dataset, include_hidden, include_deleted): self.add_dataset(output_dataset.dataset) for input_dataset_collection in workflow_invocation.input_dataset_collections: if input_dataset_collection.dataset_collection: self.export_collection( input_dataset_collection.dataset_collection, include_hidden=include_hidden, include_deleted=include_deleted, ) for output_dataset_collection in workflow_invocation.output_dataset_collections: self.export_collection( output_dataset_collection.dataset_collection, include_hidden=include_hidden, include_deleted=include_deleted, ) for workflow_invocation_step in workflow_invocation.steps: for assoc in workflow_invocation_step.output_datasets: if self._should_include_dataset(assoc.dataset, include_hidden, include_deleted): self.add_dataset(assoc.dataset) for assoc in workflow_invocation_step.output_dataset_collections: self.export_collection( assoc.dataset_collection, include_hidden=include_hidden, include_deleted=include_deleted )
[docs] def add_job_output_dataset_associations( self, job_id: int, name: str, dataset_instance: model.DatasetInstance ) -> None: job_output_dataset_associations = self.job_output_dataset_associations if job_id not in job_output_dataset_associations: job_output_dataset_associations[job_id] = {} job_output_dataset_associations[job_id][name] = dataset_instance
[docs] def export_collection( self, collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation], include_deleted: bool = False, include_hidden: bool = False, ) -> None: self.add_dataset_collection(collection) # export datasets for this collection has_collection = ( collection.collection if isinstance(collection, model.HistoryDatasetCollectionAssociation) else collection ) for collection_dataset in has_collection.dataset_instances: # ignoring include_hidden since the datasets will default to hidden for this collection. if collection_dataset.deleted and not include_deleted: include_files = False else: include_files = True self.add_dataset(collection_dataset, include_files=include_files) self.collection_datasets.add(collection_dataset.id)
[docs] def add_dataset_collection( self, collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation] ) -> None: self.included_collections[collection] = collection
[docs] def add_implicit_conversion_dataset( self, dataset: model.DatasetInstance, include_files: bool, conversion: model.ImplicitlyConvertedDatasetAssociation, ) -> None: parent_hda = conversion.parent_hda if parent_hda and parent_hda not in self.included_datasets: # We should always include the parent of an implicit conversion # to avoid holes in the provenance. self.included_datasets[parent_hda] = (parent_hda, include_files) grand_parent_association = parent_hda.implicitly_converted_parent_datasets if grand_parent_association and (grand_parent_hda := grand_parent_association[0].parent_hda): self.add_implicit_conversion_dataset(grand_parent_hda, include_files, grand_parent_association[0]) self.included_datasets[dataset] = (dataset, include_files) self.dataset_implicit_conversions[dataset] = conversion
[docs] def add_dataset(self, dataset: model.DatasetInstance, include_files: bool = True) -> None: self.included_datasets[dataset] = (dataset, include_files)
def _ensure_dataset_file_exists(self, dataset: model.DatasetInstance) -> None: state = dataset.dataset.state if state in [model.Dataset.states.OK] and not dataset.get_file_name(): log.error( f"Dataset [{dataset.id}] does not exists on on object store [{dataset.dataset.object_store_id or 'None'}], while trying to export." ) raise Exception( f"Cannot export history dataset [{getattr(dataset, 'hid', '')}: {dataset.name}] with id {self.exported_key(dataset)}" ) def _finalize(self) -> None: export_directory = self.export_directory datasets_attrs = [] provenance_attrs = [] for dataset, include_files in self.included_datasets.values(): if include_files: datasets_attrs.append(dataset) else: provenance_attrs.append(dataset) def to_json(attributes): return json_encoder.encode([a.serialize(self.security, self.serialization_options) for a in attributes]) datasets_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_DATASETS) with open(datasets_attrs_filename, "w") as datasets_attrs_out: datasets_attrs_out.write(to_json(datasets_attrs)) with open(f"{datasets_attrs_filename}.provenance", "w") as provenance_attrs_out: provenance_attrs_out.write(to_json(provenance_attrs)) libraries_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_LIBRARIES) with open(libraries_attrs_filename, "w") as libraries_attrs_out: libraries_attrs_out.write(to_json(self.included_libraries)) library_folders_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_LIBRARY_FOLDERS) with open(library_folders_attrs_filename, "w") as library_folder_attrs_out: library_folder_attrs_out.write(to_json(self.included_library_folders)) collections_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_COLLECTIONS) with open(collections_attrs_filename, "w") as collections_attrs_out: collections_attrs_out.write(to_json(self.included_collections.values())) conversions_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_CONVERSIONS) with open(conversions_attrs_filename, "w") as conversions_attrs_out: conversions_attrs_out.write(to_json(self.dataset_implicit_conversions.values())) jobs_attrs = [] for job_id, job_output_dataset_associations in self.job_output_dataset_associations.items(): output_dataset_mapping: dict[str, list[Union[str, int]]] = {} for name, dataset in job_output_dataset_associations.items(): if name not in output_dataset_mapping: output_dataset_mapping[name] = [] output_dataset_mapping[name].append(self.exported_key(dataset)) jobs_attrs.append({"id": job_id, "output_dataset_mapping": output_dataset_mapping}) if self.serialize_jobs: # # Write jobs attributes file. # # Get all jobs associated with included HDAs. jobs_dict: dict[int, model.Job] = {} implicit_collection_jobs_dict = {} def record_job(job): if not job or job.id in jobs_dict: # No viable job or job already recorded. return jobs_dict[job.id] = job if icja := job.implicit_collection_jobs_association: implicit_collection_jobs = icja.implicit_collection_jobs implicit_collection_jobs_dict[implicit_collection_jobs.id] = implicit_collection_jobs def record_associated_jobs(obj): # Get the job object. job = None for assoc in getattr(obj, "creating_job_associations", []): # For mapped over jobs obj could be DatasetCollection, which has no creating_job_association job = assoc.job break record_job(job) for hda, _include_files in self.included_datasets.values(): # Get the associated job, if any. 
If this hda was copied from another, # we need to find the job that created the original hda if not isinstance(hda, (model.HistoryDatasetAssociation, model.LibraryDatasetDatasetAssociation)): raise Exception( f"Expected a HistoryDatasetAssociation or LibraryDatasetDatasetAssociation, but got a {type(hda)}: {hda}" ) job_hda = hda while job_hda.copied_from_history_dataset_association: # should this check library datasets as well? # record job (if one exists) even if dataset was copied # copy could have been created manually through UI/API or using database operation tool, # in which case we have a relevant job to export. record_associated_jobs(job_hda) job_hda = job_hda.copied_from_history_dataset_association record_associated_jobs(job_hda) for hdca in self.included_collections: record_associated_jobs(hdca) self.export_jobs(jobs_dict.values(), jobs_attrs=jobs_attrs) for invocation in self.included_invocations: for step in invocation.steps: for job in step.jobs: record_job(job) if step.implicit_collection_jobs: implicit_collection_jobs = step.implicit_collection_jobs implicit_collection_jobs_dict[implicit_collection_jobs.id] = implicit_collection_jobs # Get jobs' attributes. icjs_attrs = [] for icj in implicit_collection_jobs_dict.values(): icj_attrs = icj.serialize(self.security, self.serialization_options) icjs_attrs.append(icj_attrs) icjs_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_IMPLICIT_COLLECTION_JOBS) with open(icjs_attrs_filename, "w") as icjs_attrs_out: icjs_attrs_out.write(json_encoder.encode(icjs_attrs)) invocations_attrs = [] for invocation in self.included_invocations: invocation_attrs = invocation.serialize(self.security, self.serialization_options) workflows_directory = self.workflows_directory safe_makedirs(workflows_directory) workflow = invocation.workflow workflow_key = self.serialization_options.get_identifier(self.security, workflow) history = invocation.history assert invocation_attrs invocation_attrs["workflow"] = workflow_key if not self.app: raise Exception(f"Missing self.app in {self}.") self.app.workflow_contents_manager.store_workflow_artifacts( workflows_directory, workflow_key, workflow, user=history.user, history=history ) invocations_attrs.append(invocation_attrs) invocations_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_INVOCATIONS) with open(invocations_attrs_filename, "w") as invocations_attrs_out: dump(invocations_attrs, invocations_attrs_out) export_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_EXPORT) with open(export_attrs_filename, "w") as export_attrs_out: dump({"galaxy_export_version": GALAXY_EXPORT_VERSION}, export_attrs_out) def __exit__( self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] ) -> bool: if exc_type is None: self._finalize() # http://effbot.org/zone/python-with-statement.htm # Ignores TypeError exceptions return isinstance(exc_val, TypeError)
[docs] class WriteCrates: included_invocations: list[model.WorkflowInvocation] export_directory: StrPath included_datasets: dict[model.DatasetInstance, tuple[model.DatasetInstance, bool]] dataset_implicit_conversions: dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation] dataset_id_to_path: dict[int, tuple[Optional[str], Optional[str]]] @property @abc.abstractmethod def workflows_directory(self) -> str: ... def _generate_markdown_readme(self) -> str: markdown_parts: list[str] = [] if self._is_single_invocation_export(): invocation = self.included_invocations[0] name = invocation.workflow.name create_time = invocation.create_time markdown_parts.append("# Galaxy Workflow Invocation Export") markdown_parts.append("") markdown_parts.append(f"This crate describes the invocation of workflow {name} executed at {create_time}.") else: markdown_parts.append("# Galaxy Dataset Export") return "\n".join(markdown_parts) def _is_single_invocation_export(self) -> bool: return len(self.included_invocations) == 1 def _init_crate(self) -> ROCrate: is_invocation_export = self._is_single_invocation_export() if is_invocation_export: invocation_crate_builder = WorkflowRunCrateProfileBuilder(self) return invocation_crate_builder.build_crate() ro_crate = ROCrate() # TODO: allow user to set the name, or get the name of the history ro_crate.name = "Galaxy dataset export" ro_crate.description = ( "This is a Galaxy dataset export " f"including {len(self.included_datasets)} datasets. " "This RO-Crate was automatically generated by Galaxy." ) ro_crate.license = "" markdown_path = os.path.join(self.export_directory, "README.md") with open(markdown_path, "w") as f: f.write(self._generate_markdown_readme()) properties = { "name": "README.md", "encodingFormat": "text/markdown", "about": {"@id": "./"}, } ro_crate.add_file( markdown_path, dest_path="README.md", properties=properties, ) for dataset, _ in self.included_datasets.values(): if dataset.dataset.id in self.dataset_id_to_path: file_name, _ = self.dataset_id_to_path[dataset.dataset.id] if file_name is None: # The dataset was discarded or no longer exists. No file to export. # TODO: should this be registered in the crate as a special case? 
log.warning( "RO-Crate export: skipping dataset [%s] with state [%s] because file does not exist.", dataset.id, dataset.state, ) continue name = dataset.name encoding_format = dataset.datatype.get_mime() properties = { "name": name, "encodingFormat": encoding_format, } ro_crate.add_file( os.path.join(self.export_directory, file_name), dest_path=file_name, properties=properties, ) workflows_directory = self.workflows_directory if os.path.exists(workflows_directory): for filename in os.listdir(workflows_directory): is_computational_wf = not filename.endswith(".cwl") workflow_cls = ComputationalWorkflow if is_computational_wf else WorkflowDescription lang = "galaxy" if not filename.endswith(".cwl") else "cwl" dest_path = os.path.join("workflows", filename) is_main_entity = is_invocation_export and is_computational_wf ro_crate.add_workflow( source=os.path.join(workflows_directory, filename), dest_path=dest_path, main=is_main_entity, cls=workflow_cls, lang=lang, ) found_workflow_licenses = set() for workflow_invocation in self.included_invocations: workflow = workflow_invocation.workflow license = workflow.license if license: found_workflow_licenses.add(license) if len(found_workflow_licenses) == 1: ro_crate.license = next(iter(found_workflow_licenses)) # TODO: license per workflow # TODO: API options to license workflow outputs seprately # TODO: Export report as PDF and stick it in here return ro_crate
[docs] class WorkflowInvocationOnlyExportStore(DirectoryModelExportStore):
[docs] def export_history(self, history: model.History, include_hidden: bool = False, include_deleted: bool = False): """Export history to store.""" raise NotImplementedError()
[docs] def export_library(self, history, include_hidden=False, include_deleted=False): """Export library to store.""" raise NotImplementedError()
@property def only_invocation(self) -> model.WorkflowInvocation: assert len(self.included_invocations) == 1 return self.included_invocations[0]
[docs] @dataclass class BcoExportOptions: galaxy_url: str galaxy_version: str merge_history_metadata: bool = False override_environment_variables: Optional[dict[str, str]] = None override_empirical_error: Optional[dict[str, str]] = None override_algorithmic_error: Optional[dict[str, str]] = None override_xref: Optional[list[XrefItem]] = None
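# Example (illustrative sketch): options typically threaded through to a BCO
# export; the helper is hypothetical and the URL and version strings are
# placeholder values.
def _example_bco_export_options() -> BcoExportOptions:
    return BcoExportOptions(
        galaxy_url="https://usegalaxy.example.org/",
        galaxy_version="24.1",
        merge_history_metadata=True,
    )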
[docs] class FileSourceModelExportStore(abc.ABC, DirectoryModelExportStore): """ Export to file sources, from where data can be retrieved later on using a URI. """ file_source_uri: Optional[StrPath] = None # data can be retrieved later using this URI out_file: StrPath # the output file is written to this path, from which it is written to the file source
[docs] def __init__(self, uri, **kwds): temp_output_dir = tempfile.mkdtemp() self.temp_output_dir = temp_output_dir if "://" in str(uri): self.out_file = os.path.join(temp_output_dir, "out") self.file_source_uri = uri export_directory = os.path.join(temp_output_dir, "export") else: self.out_file = uri self.file_source_uri = None export_directory = temp_output_dir super().__init__(export_directory, **kwds)
@abc.abstractmethod
def _generate_output_file(self) -> None:
    """
    Generate the output file that will be uploaded to the file source.

    Produce an output file and save it to `self.out_file`. A common pattern for
    this method is to create an archive out of `self.export_directory`.

    This method runs after `DirectoryModelExportStore._finalize()`. Therefore,
    `self.export_directory` will already have been populated when it runs.
    """

def _finalize(self):
    super()._finalize()  # populate `self.export_directory`
    self._generate_output_file()  # generate the output file `self.out_file`
    if self.file_source_uri:  # upload output file to file source
        if not self.file_sources:
            raise Exception(f"Need self.file_sources but {type(self)} is missing it: {self.file_sources}.")
        file_source_uri = urlparse(str(self.file_source_uri))
        file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
        file_source = file_source_path.file_source
        assert os.path.exists(self.out_file)
        self.file_source_uri = f"{file_source_uri.scheme}://{file_source_uri.netloc}" + file_source.write_from(
            file_source_path.path, self.out_file, user_context=self.user_context
        )
    shutil.rmtree(self.temp_output_dir)
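# Example (illustrative sketch): a minimal concrete FileSourceModelExportStore.
# The only required hook is ``_generate_output_file``, which must turn the
# already-populated ``self.export_directory`` into ``self.out_file``. The class
# below is hypothetical and simply produces an uncompressed tar archive.
class _ExamplePlainTarExportStore(FileSourceModelExportStore):
    def _generate_output_file(self) -> None:
        # ``tar_export_directory`` is the same helper TarModelExportStore uses.
        tar_export_directory(self.export_directory, self.out_file, gzip=False)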
[docs] class BcoModelExportStore(FileSourceModelExportStore, WorkflowInvocationOnlyExportStore):
[docs] def __init__(self, uri, export_options: BcoExportOptions, **kwds): self.export_options = export_options super().__init__(uri, **kwds)
def _generate_output_file(self): core_biocompute_object, object_id = self._core_biocompute_object_and_object_id() write_to_file(object_id, core_biocompute_object, self.out_file) def _core_biocompute_object_and_object_id(self) -> tuple[BioComputeObjectCore, str]: assert self.app # need app.security to do anything... export_options = self.export_options workflow_invocation = self.only_invocation history = workflow_invocation.history workflow = workflow_invocation.workflow stored_workflow = workflow.stored_workflow def get_dataset_url(encoded_dataset_id: str): return f"{export_options.galaxy_url}api/datasets/{encoded_dataset_id}/display" # pull in the creator_metadata info from workflow if it exists contributors = get_contributors(workflow.creator_metadata) provenance_domain = ProvenanceDomain( name=workflow.name, version=bco_workflow_version(workflow), review=[], contributors=contributors, license=workflow.license or "", created=workflow_invocation.create_time.isoformat(), modified=workflow_invocation.update_time.isoformat(), ) keywords = [] for tag in stored_workflow.tags: keywords.append(tag.user_tname) if export_options.merge_history_metadata: for tag in history.tags: if tag.user_tname not in keywords: keywords.append(tag.user_tname) # metrics = {} ... TODO pipeline_steps: list[PipelineStep] = [] software_prerequisite_tracker = SoftwarePrerequisiteTracker() input_subdomain_items: list[InputSubdomainItem] = [] output_subdomain_items: list[OutputSubdomainItem] = [] for step in workflow_invocation.steps: workflow_step = step.workflow_step software_prerequisite_tracker.register_step(workflow_step) if workflow_step.type == "tool": workflow_outputs_list = set() output_list: list[DescriptionDomainUri] = [] input_list: list[DescriptionDomainUri] = [] for wo in workflow_step.workflow_outputs: workflow_outputs_list.add(wo.output_name) for job in step.jobs: for job_input in job.input_datasets: if hasattr(job_input.dataset, "dataset_id"): encoded_dataset_id = self.app.security.encode_id(job_input.dataset.dataset_id) url = get_dataset_url(encoded_dataset_id) input_uri_obj = DescriptionDomainUri( # TODO: that should maybe be a step prefix + element identifier where appropriate. 
filename=job_input.dataset.name, uri=url, access_time=job_input.dataset.create_time.isoformat(), ) input_list.append(input_uri_obj) for job_output in job.output_datasets: if hasattr(job_output.dataset, "dataset_id"): encoded_dataset_id = self.app.security.encode_id(job_output.dataset.dataset_id) url = get_dataset_url(encoded_dataset_id) output_obj = DescriptionDomainUri( filename=job_output.dataset.name, uri=url, access_time=job_output.dataset.create_time.isoformat(), ) output_list.append(output_obj) if job_output.name in workflow_outputs_list: output = OutputSubdomainItem( mediatype=job_output.dataset.extension, uri=InputAndOutputDomainUri( filename=job_output.dataset.name, uri=url, access_time=job_output.dataset.create_time.isoformat(), ), ) output_subdomain_items.append(output) step_index = workflow_step.order_index step_name = workflow_step.label or workflow_step.tool_id pipeline_step = PipelineStep( step_number=step_index, name=step_name, description=workflow_step.annotations[0].annotation if workflow_step.annotations else "", version=workflow_step.tool_version, prerequisite=[], input_list=input_list, output_list=output_list, ) pipeline_steps.append(pipeline_step) if workflow_step.type == "data_input" and step.output_datasets: for output_assoc in step.output_datasets: encoded_dataset_id = self.app.security.encode_id(output_assoc.dataset_id) url = get_dataset_url(encoded_dataset_id) input_obj = InputSubdomainItem( uri=Uri( uri=url, filename=workflow_step.label, access_time=workflow_step.update_time.isoformat(), ), ) input_subdomain_items.append(input_obj) if workflow_step.type == "data_collection_input" and step.output_dataset_collections: for output_dataset_collection_association in step.output_dataset_collections: encoded_dataset_id = self.app.security.encode_id( output_dataset_collection_association.dataset_collection_id ) url = f"{export_options.galaxy_url}api/dataset_collections/{encoded_dataset_id}/download" input_obj = InputSubdomainItem( uri=Uri( uri=url, filename=workflow_step.label, access_time=workflow_step.update_time.isoformat(), ), ) input_subdomain_items.append(input_obj) usability_domain_str: list[str] = [] for a in stored_workflow.annotations: usability_domain_str.append(a.annotation) if export_options.merge_history_metadata: for h in history.annotations: usability_domain_str.append(h.annotation) parametric_domain_items: list[ParametricDomainItem] = [] for inv_step in workflow_invocation.steps: try: tool_inputs = inv_step.workflow_step.tool_inputs if tool_inputs: for k, v in tool_inputs.items(): param, value, step_index = k, v, inv_step.workflow_step.order_index parametric_domain_items.append( ParametricDomainItem(param=str(param), value=str(value), step=str(step_index)) ) except Exception: continue encoded_workflow_id = self.app.security.encode_id(workflow.id) execution_domain = galaxy_execution_domain( export_options.galaxy_url, f"{export_options.galaxy_url}api/workflows?encoded_workflow_id={encoded_workflow_id}", software_prerequisite_tracker.software_prerequisites, export_options.override_environment_variables, ) extension_domain = extension_domains(export_options.galaxy_url, export_options.galaxy_version) error_domain = ErrorDomain( empirical_error=export_options.override_empirical_error or {}, algorithmic_error=export_options.override_algorithmic_error or {}, ) usability_domain = UsabilityDomain(root=usability_domain_str) description_domain = DescriptionDomain( keywords=keywords, xref=export_options.override_xref or [], platform=["Galaxy"], 
pipeline_steps=pipeline_steps, ) parametric_domain = ParametricDomain(root=parametric_domain_items) io_domain = InputAndOutputDomain( input_subdomain=input_subdomain_items, output_subdomain=output_subdomain_items, ) core = BioComputeObjectCore( description_domain=description_domain, error_domain=error_domain, execution_domain=execution_domain, extension_domain=extension_domain, io_domain=io_domain, parametric_domain=parametric_domain, provenance_domain=provenance_domain, usability_domain=usability_domain, ) encoded_invocation_id = self.app.security.encode_id(workflow_invocation.id) url = f"{export_options.galaxy_url}api/invocations/{encoded_invocation_id}" return core, url
[docs] class ROCrateModelExportStore(DirectoryModelExportStore, WriteCrates):
[docs] def __init__(self, crate_directory: StrPath, **kwds) -> None: self.crate_directory = crate_directory super().__init__(crate_directory, export_files="symlink", **kwds)
def _finalize(self) -> None: super()._finalize() ro_crate = self._init_crate() ro_crate.write(self.crate_directory)
[docs] class ROCrateArchiveModelExportStore(FileSourceModelExportStore, WriteCrates): def _generate_output_file(self): ro_crate = self._init_crate() ro_crate.write(self.export_directory) out_file_name = str(self.out_file) if out_file_name.endswith(".zip"): out_file = out_file_name[: -len(".zip")] else: out_file = out_file_name archive = make_fast_zipfile(base_name=out_file, base_dir=self.export_directory, root_dir=self.export_directory) shutil.move(archive, self.out_file)
[docs] class TarModelExportStore(FileSourceModelExportStore):
[docs] def __init__(self, uri: StrPath, gzip: bool = True, **kwds) -> None: self.gzip = gzip super().__init__(uri, **kwds)
def _generate_output_file(self): tar_export_directory(self.export_directory, self.out_file, self.gzip)
[docs] class BagDirectoryModelExportStore(DirectoryModelExportStore):
[docs] def __init__(self, out_directory: str, **kwds) -> None: self.out_directory = out_directory super().__init__(out_directory, **kwds)
def _finalize(self) -> None: super()._finalize() bdb.make_bag(self.out_directory)
[docs] class BagArchiveModelExportStore(FileSourceModelExportStore, BagDirectoryModelExportStore):
[docs] def __init__(self, uri: StrPath, bag_archiver: str = "tgz", **kwds) -> None: # bag_archiver in tgz, zip, tar self.bag_archiver = bag_archiver super().__init__(uri, **kwds)
def _generate_output_file(self): archive = bdb.archive_bag(self.export_directory, self.bag_archiver) shutil.move(archive, self.out_file)
[docs] def get_export_store_factory( app, download_format: str, export_files=None, bco_export_options: Optional[BcoExportOptions] = None, user_context=None, ) -> Callable[[StrPath], FileSourceModelExportStore]: export_store_class: type[FileSourceModelExportStore] export_store_class_kwds = { "app": app, "export_files": export_files, "serialize_dataset_objects": False, "user_context": user_context, } if download_format in ["tar.gz", "tgz"]: export_store_class = TarModelExportStore export_store_class_kwds["gzip"] = True elif download_format in ["tar"]: export_store_class = TarModelExportStore export_store_class_kwds["gzip"] = False elif download_format == "rocrate.zip": export_store_class = ROCrateArchiveModelExportStore elif download_format == "bco.json": export_store_class = BcoModelExportStore export_store_class_kwds["export_options"] = bco_export_options elif download_format.startswith("bag."): bag_archiver = download_format[len("bag.") :] if bag_archiver not in ["zip", "tar", "tgz"]: raise RequestParameterInvalidException(f"Unknown download format [{download_format}]") export_store_class = BagArchiveModelExportStore export_store_class_kwds["bag_archiver"] = bag_archiver else: raise RequestParameterInvalidException(f"Unknown download format [{download_format}]") return lambda path: export_store_class(path, **export_store_class_kwds)
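# Example (illustrative sketch): obtaining a factory for a gzipped tar download
# and exporting a history through it; the helper is hypothetical, and ``app``,
# ``history``, and the output path are assumed to come from a configured Galaxy
# instance.
def _example_download_history_as_tgz(app, history, out_path):
    export_store_factory = get_export_store_factory(app, "tar.gz", export_files="copy")
    with export_store_factory(out_path) as export_store:
        export_store.export_history(history)
    return out_path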
[docs] def tar_export_directory(export_directory: StrPath, out_file: StrPath, gzip: bool) -> None: tarfile_mode: Literal["w", "w:gz"] = "w:gz" if gzip else "w" with tarfile.open(out_file, tarfile_mode, dereference=True) as store_archive: for export_path in os.listdir(export_directory): store_archive.add(os.path.join(export_directory, export_path), arcname=export_path)
[docs] def get_export_dataset_filename(name: str, ext: str, encoded_id: str, conversion_key: Optional[str]) -> str:
    """
    Builds a filename for a dataset using its name and extension.
    """
    base = "".join(c in FILENAME_VALID_CHARS and c or "_" for c in name)
    if not conversion_key:
        return f"{base}_{encoded_id}.{ext}"
    else:
        return f"{base}_{encoded_id}_conversion_{conversion_key}.{ext}"

[docs] def get_export_dataset_extra_files_dir_name(encoded_id: str, conversion_key: Optional[str]) -> str:
    if not conversion_key:
        return f"extra_files_path_{encoded_id}"
    else:
        return f"extra_files_path_{encoded_id}_conversion_{conversion_key}"
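# Example (illustrative sketch): the naming patterns produced for exported
# datasets, assuming a dataset name made of filename-safe characters; the
# encoded id and conversion key are made-up values.
#
#     get_export_dataset_filename("dataset1", "txt", "abc123", conversion_key=None)
#         -> "dataset1_abc123.txt"
#     get_export_dataset_extra_files_dir_name("abc123", conversion_key="def456")
#         -> "extra_files_path_abc123_conversion_def456"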
[docs] def imported_store_for_metadata( directory: str, object_store: Optional[ObjectStore] = None ) -> BaseDirectoryImportModelStore: import_options = ImportOptions(allow_dataset_object_edit=True, allow_edit=True) import_model_store = get_import_model_store_for_directory( directory, import_options=import_options, object_store=object_store ) import_model_store.perform_import() return import_model_store
[docs] def source_to_import_store( source: Union[str, dict], app: StoreAppProtocol, import_options: Optional[ImportOptions], model_store_format: Optional[ModelStoreFormat] = None, user_context=None, ) -> ModelImportStore: galaxy_user = user_context.user if user_context else None if isinstance(source, dict): if model_store_format is not None: raise Exception( "Can only specify a model_store_format as an argument to source_to_import_store in conjuction with URIs" ) model_import_store: ModelImportStore = get_import_model_store_for_dict( source, import_options=import_options, app=app, user=galaxy_user, ) else: source_uri: str = str(source) delete = False tag_handler = app.tag_handler.create_tag_handler_session(galaxy_session=None) if source_uri.startswith("file://"): source_uri = source_uri[len("file://") :] if "://" in source_uri: user_context = ProvidesFileSourcesUserContext(user_context) source_uri = stream_url_to_file( source_uri, app.file_sources, prefix="gx_import_model_store", user_context=user_context ) delete = True target_path = source_uri if target_path.endswith(".json"): with open(target_path) as f: store_dict = load(f) assert isinstance(store_dict, dict) model_import_store = get_import_model_store_for_dict( store_dict, import_options=import_options, app=app, user=galaxy_user, ) elif os.path.isdir(target_path): model_import_store = get_import_model_store_for_directory( target_path, import_options=import_options, app=app, user=galaxy_user, tag_handler=tag_handler ) else: model_store_format = model_store_format or ModelStoreFormat.TGZ if ModelStoreFormat.is_compressed(model_store_format): try: temp_dir = mkdtemp() with CompressedFile(target_path) as cf: target_dir = cf.extract(temp_dir) finally: if delete: os.remove(target_path) model_import_store = get_import_model_store_for_directory( target_dir, import_options=import_options, app=app, user=galaxy_user, tag_handler=tag_handler ) elif ModelStoreFormat.is_bag(model_store_format): model_import_store = BagArchiveImportModelStore( target_path, import_options=import_options, app=app, user=galaxy_user ) else: raise Exception(f"Unknown model_store_format type encountered {model_store_format}") return model_import_store
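# Example (illustrative sketch): resolving a model-store archive on disk into an
# import store and running the import. The helper is hypothetical; ``app`` and
# the archive path are assumptions, and the archive is assumed to be a gzipped
# tarball.
def _example_import_model_store_archive(app, archive_path):
    model_import_store = source_to_import_store(
        archive_path,
        app=app,
        import_options=ImportOptions(allow_edit=False),
        model_store_format=ModelStoreFormat.TGZ,
    )
    model_import_store.perform_import()
    return model_import_store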
[docs] def payload_to_source_uri(payload) -> str: if payload.store_content_uri: source_uri = payload.store_content_uri else: store_dict = payload.store_dict assert isinstance(store_dict, dict) temp_dir = mkdtemp() import_json = os.path.join(temp_dir, "import_store.json") with open(import_json, "w") as f: dump(store_dict, f) source_uri = f"file://{import_json}" return source_uri
[docs] def copy_dataset_instance_metadata_attributes(source: model.DatasetInstance, target: model.DatasetInstance) -> None: target.metadata = source.metadata target.blurb = source.blurb target.peek = source.peek target.info = source.info target.tool_version = source.tool_version target.extension = source.extension