import abc
import contextlib
import datetime
import logging
import os
import shutil
import tarfile
import tempfile
from collections import defaultdict
from collections.abc import (
Iterable,
Iterator,
)
from dataclasses import dataclass
from enum import Enum
from json import (
dump,
dumps,
load,
)
from tempfile import mkdtemp
from types import TracebackType
from typing import (
Any,
Callable,
cast,
Literal,
Optional,
TYPE_CHECKING,
Union,
)
from urllib.parse import urlparse
from bdbag import bdbag_api as bdb
from boltons.iterutils import remap
from pydantic import (
BaseModel,
ConfigDict,
)
from rocrate.model.computationalworkflow import (
ComputationalWorkflow,
WorkflowDescription,
)
from rocrate.rocrate import ROCrate
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.scoping import scoped_session
from sqlalchemy.sql import expression
from typing_extensions import Protocol
from galaxy.datatypes.registry import Registry
from galaxy.exceptions import (
MalformedContents,
ObjectNotFound,
RequestParameterInvalidException,
)
from galaxy.files import (
ConfiguredFileSources,
ProvidesFileSourcesUserContext,
)
from galaxy.files.uris import stream_url_to_file
from galaxy.model.base import ensure_object_added_to_session
from galaxy.model.mapping import GalaxyModelMapping
from galaxy.model.metadata import MetadataCollection
from galaxy.model.orm.util import (
add_object_to_object_session,
add_object_to_session,
get_object_session,
)
from galaxy.model.tags import GalaxyTagHandler
from galaxy.objectstore import (
BaseObjectStore,
ObjectStore,
persist_extra_files,
)
from galaxy.schema.bco import (
BioComputeObjectCore,
DescriptionDomain,
DescriptionDomainUri,
ErrorDomain,
InputAndOutputDomain,
InputAndOutputDomainUri,
InputSubdomainItem,
OutputSubdomainItem,
ParametricDomain,
ParametricDomainItem,
PipelineStep,
ProvenanceDomain,
UsabilityDomain,
XrefItem,
)
from galaxy.schema.bco.io_domain import Uri
from galaxy.schema.bco.util import (
extension_domains,
galaxy_execution_domain,
get_contributors,
write_to_file,
)
from galaxy.schema.schema import (
DatasetStateField,
ModelStoreFormat,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.util import (
FILENAME_VALID_CHARS,
in_directory,
safe_makedirs,
)
from galaxy.util.bunch import Bunch
from galaxy.util.compression_utils import (
CompressedFile,
make_fast_zipfile,
)
from galaxy.util.path import StrPath
from ._bco_convert_utils import (
bco_workflow_version,
SoftwarePrerequisiteTracker,
)
from .ro_crate_utils import WorkflowRunCrateProfileBuilder
from ..custom_types import json_encoder
from ..item_attrs import (
add_item_annotation,
get_item_annotation_str,
)
from ... import model
if TYPE_CHECKING:
from galaxy.managers.workflows import WorkflowContentsManager
from galaxy.model import (
HistoryItem,
ImplicitCollectionJobs,
)
from galaxy.model.tags import GalaxyTagHandlerSession
log = logging.getLogger(__name__)
ObjectKeyType = Union[str, int]
ATTRS_FILENAME_HISTORY = "history_attrs.txt"
ATTRS_FILENAME_DATASETS = "datasets_attrs.txt"
ATTRS_FILENAME_JOBS = "jobs_attrs.txt"
ATTRS_FILENAME_IMPLICIT_COLLECTION_JOBS = "implicit_collection_jobs_attrs.txt"
ATTRS_FILENAME_COLLECTIONS = "collections_attrs.txt"
ATTRS_FILENAME_EXPORT = "export_attrs.txt"
ATTRS_FILENAME_LIBRARIES = "libraries_attrs.txt"
ATTRS_FILENAME_LIBRARY_FOLDERS = "library_folders_attrs.txt"
ATTRS_FILENAME_INVOCATIONS = "invocation_attrs.txt"
ATTRS_FILENAME_CONVERSIONS = "implicit_dataset_conversions.txt"
TRACEBACK = "traceback.txt"
GALAXY_EXPORT_VERSION = "2"
DICT_STORE_ATTRS_KEY_HISTORY = "history"
DICT_STORE_ATTRS_KEY_DATASETS = "datasets"
DICT_STORE_ATTRS_KEY_COLLECTIONS = "collections"
DICT_STORE_ATTRS_KEY_CONVERSIONS = "implicit_dataset_conversions"
DICT_STORE_ATTRS_KEY_JOBS = "jobs"
DICT_STORE_ATTRS_KEY_IMPLICIT_COLLECTION_JOBS = "implicit_collection_jobs"
DICT_STORE_ATTRS_KEY_LIBRARIES = "libraries"
DICT_STORE_ATTRS_KEY_INVOCATIONS = "invocations"
JsonDictT = dict[str, Any]
class StoreAppProtocol(Protocol):
"""Define the parts of a Galaxy-like app consumed by model store."""
datatypes_registry: Registry
object_store: BaseObjectStore
security: IdEncodingHelper
tag_handler: GalaxyTagHandler
model: GalaxyModelMapping
file_sources: ConfiguredFileSources
workflow_contents_manager: "WorkflowContentsManager"
class ImportDiscardedDataType(Enum):
    # Don't allow discarded 'okay' datasets on import; such datasets will be marked deleted.
FORBID = "forbid"
    # Allow datasets to be imported as experimental DISCARDED datasets that are not deleted if file data is unavailable.
ALLOW = "allow"
# Import all datasets as discarded regardless of whether file data is available in the store.
FORCE = "force"
class DatasetAttributeImportModel(BaseModel):
state: Optional[DatasetStateField] = None
external_filename: Optional[str] = None
_extra_files_path: Optional[str] = None
file_size: Optional[int] = None
object_store_id: Optional[str] = None
total_size: Optional[int] = None
created_from_basename: Optional[str] = None
uuid: Optional[str] = None
model_config = ConfigDict(extra="ignore")
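# Illustrative sketch (editor's note, not part of the original module): the model above is used
# with ``model_dump(exclude_unset=True)`` so that only attributes actually present in the export
# are copied onto an existing dataset object. The example values below are hypothetical.
#
#     attrs = DatasetAttributeImportModel(file_size=1024, created_from_basename="reads.fastq").model_dump(
#         exclude_unset=True
#     )
#     # -> {"file_size": 1024, "created_from_basename": "reads.fastq"}; unset fields are omitted,
#     # and extra keys in the source dictionary are ignored.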
DEFAULT_DISCARDED_DATA_TYPE = ImportDiscardedDataType.FORBID
class ImportOptions:
allow_edit: bool
allow_library_creation: bool
allow_dataset_object_edit: bool
discarded_data: ImportDiscardedDataType
def __init__(
self,
allow_edit: bool = False,
allow_library_creation: bool = False,
allow_dataset_object_edit: Optional[bool] = None,
discarded_data: ImportDiscardedDataType = DEFAULT_DISCARDED_DATA_TYPE,
) -> None:
self.allow_edit = allow_edit
self.allow_library_creation = allow_library_creation
if allow_dataset_object_edit is None:
self.allow_dataset_object_edit = allow_edit
else:
self.allow_dataset_object_edit = allow_dataset_object_edit
self.discarded_data = discarded_data
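# Illustrative sketch (editor's note, not part of the original module): constructing import options
# for a typical archive import. Note that ``allow_dataset_object_edit`` falls back to the value of
# ``allow_edit`` when it is not given explicitly.
#
#     options = ImportOptions(allow_edit=False, discarded_data=ImportDiscardedDataType.ALLOW)
#     assert options.allow_dataset_object_edit is False  # inherited from allow_edit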
class SessionlessContext:
def __init__(self) -> None:
self.objects: dict[type, dict] = defaultdict(dict)
def execute(self, query: Any, *args, **kwargs) -> Any:
pass
def delete(self, obj: model.RepresentById) -> None:
self.objects[obj.__class__].pop(obj.id, None)
def scalars(self, query: Any, *args, **kwargs) -> Any:
pass
def commit(self) -> None:
pass
def flush(self) -> None:
pass
def add(self, obj: model.RepresentById) -> None:
self.objects[obj.__class__][obj.id] = obj
def query(self, model_class: type[model.RepresentById]) -> Bunch:
def find(obj_id):
return self.objects.get(model_class, {}).get(obj_id) or None
def filter_by(*args, **kwargs):
# TODO: Hack for history export archive, should support this too
return Bunch(first=lambda: next(iter(self.objects.get(model_class, {None: None}))))
return Bunch(find=find, get=find, filter_by=filter_by)
def get(self, model_class: type[model.RepresentById], primary_key: Any): # patch for SQLAlchemy 2.0 compatibility
return self.query(model_class).get(primary_key)
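# Illustrative sketch (editor's note, not part of the original module): SessionlessContext mimics
# just enough of a SQLAlchemy session for the import code paths, keeping objects in an in-memory
# dict keyed by class and id. The HDA construction below is simplified for illustration; any
# object with an integer ``id`` behaves the same way.
#
#     ctx = SessionlessContext()
#     hda = model.HistoryDatasetAssociation()
#     hda.id = 42
#     ctx.add(hda)
#     assert ctx.get(model.HistoryDatasetAssociation, 42) is hda
#     ctx.delete(hda)
#     assert ctx.get(model.HistoryDatasetAssociation, 42) is None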
class ModelImportStore(metaclass=abc.ABCMeta):
app: Optional[StoreAppProtocol]
archive_dir: str
sa_session: Union[scoped_session, SessionlessContext]
def __init__(
self,
import_options: Optional[ImportOptions] = None,
app: Optional[StoreAppProtocol] = None,
user: Optional[model.User] = None,
object_store: Optional[ObjectStore] = None,
tag_handler: Optional["GalaxyTagHandlerSession"] = None,
) -> None:
if object_store is None:
if app is not None:
object_store = app.object_store
self.object_store = object_store
self.app = app
if app is not None:
self.sa_session = app.model.session
self.sessionless = False
else:
self.sa_session = SessionlessContext()
self.sessionless = True
self.user = user
self.import_options = import_options or ImportOptions()
self.dataset_state_serialized = True
self.tag_handler = tag_handler
if self.defines_new_history():
self.import_history_encoded_id = self.new_history_properties().get("encoded_id")
else:
self.import_history_encoded_id = None
@abc.abstractmethod
def workflow_paths(self) -> Iterator[tuple[str, str]]: ...
@abc.abstractmethod
def defines_new_history(self) -> bool:
"""Does this store define a new history to create."""
@abc.abstractmethod
def new_history_properties(self) -> dict[str, Any]:
"""Dict of history properties if defines_new_history() is truthy."""
@abc.abstractmethod
def datasets_properties(self) -> list[dict[str, Any]]:
"""Return a list of HDA properties."""
def library_properties(self) -> list[dict[str, Any]]:
"""Return a list of library properties."""
return []
@abc.abstractmethod
def invocations_properties(self) -> list[dict[str, Any]]: ...
@abc.abstractmethod
def collections_properties(self) -> list[dict[str, Any]]:
"""Return a list of HDCA properties."""
@abc.abstractmethod
def implicit_dataset_conversion_properties(self) -> list[dict[str, Any]]:
"""Return a list of ImplicitlyConvertedDatasetAssociation properties."""
@abc.abstractmethod
def jobs_properties(self) -> list[dict[str, Any]]:
"""Return a list of jobs properties."""
@abc.abstractmethod
def implicit_collection_jobs_properties(self) -> list[dict[str, Any]]: ...
@property
@abc.abstractmethod
def object_key(self) -> str:
"""Key used to connect objects in metadata.
Legacy exports used 'hid' but associated objects may not be from the same history
and a history may contain multiple objects with the same 'hid'.
"""
@property
def file_source_root(self) -> Optional[str]:
"""Source of valid file data."""
return None
def trust_hid(self, obj_attrs: dict[str, Any]) -> bool:
"""Trust HID when importing objects into a new History."""
return (
self.import_history_encoded_id is not None
and obj_attrs.get("history_encoded_id") == self.import_history_encoded_id
)
@contextlib.contextmanager
def target_history(
self, default_history: Optional[model.History] = None, legacy_history_naming: bool = True
) -> Iterator[Optional[model.History]]:
new_history = None
if self.defines_new_history():
history_properties = self.new_history_properties()
history_name = history_properties.get("name")
if history_name and legacy_history_naming:
history_name = f"imported from archive: {history_name}"
elif history_name:
pass # history_name = history_name
else:
history_name = "unnamed imported history"
# Create history.
new_history = model.History(name=history_name, user=self.user)
new_history.importing = True
hid_counter = history_properties.get("hid_counter")
genome_build = history_properties.get("genome_build")
# TODO: This seems like it shouldn't be imported, try to test and verify we can calculate this
# and get away without it. -John
if hid_counter:
new_history.hid_counter = hid_counter
if genome_build:
new_history.genome_build = genome_build
self._session_add(new_history)
self._flush()
if self.user:
add_item_annotation(self.sa_session, self.user, new_history, history_properties.get("annotation"))
history: Optional[model.History] = new_history
else:
history = default_history
yield history
if new_history is not None:
# Done importing.
new_history.importing = False
self._flush()
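    # Illustrative sketch (editor's note, not part of the original module): how a caller might
    # drive an import inside the target_history() context. ``store`` is assumed to be a concrete
    # ModelImportStore subclass, and ``perform_import`` is the assumed entry point that dispatches
    # to the _import_* helpers below; both names sit outside the excerpt shown here.
    #
    #     with store.target_history(default_history=current_history) as history:
    #         store.perform_import(history, new_history=store.defines_new_history())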
def _attach_dataset_hashes(
self,
dataset_or_file_attrs: dict[str, Any],
dataset_instance: model.DatasetInstance,
) -> None:
if "hashes" in dataset_or_file_attrs:
for hash_attrs in dataset_or_file_attrs["hashes"]:
hash_obj = model.DatasetHash()
hash_obj.hash_value = hash_attrs["hash_value"]
hash_obj.hash_function = hash_attrs["hash_function"]
hash_obj.extra_files_path = hash_attrs["extra_files_path"]
dataset_instance.dataset.hashes.append(hash_obj)
def _attach_dataset_sources(
self,
dataset_or_file_attrs: dict[str, Any],
dataset_instance: model.DatasetInstance,
) -> None:
if "sources" in dataset_or_file_attrs:
for source_attrs in dataset_or_file_attrs["sources"]:
source_obj = model.DatasetSource()
source_obj.source_uri = source_attrs["source_uri"]
transform_actions = source_attrs["transform"]
recorded_requested_transform = "requested_transform" in source_attrs
if recorded_requested_transform:
source_obj.requested_transform = source_attrs["requested_transform"]
if dataset_instance.state != "deferred":
source_obj.transform = transform_actions
else:
# legacy transform actions - if this is a deferred dataset treat as requested
# transform actions
if dataset_instance.state == "deferred":
source_obj.requested_transform = transform_actions or []
else:
source_obj.transform = transform_actions
source_obj.extra_files_path = source_attrs["extra_files_path"]
for hash_attrs in source_attrs["hashes"]:
hash_obj = model.DatasetSourceHash()
hash_obj.hash_value = hash_attrs["hash_value"]
hash_obj.hash_function = hash_attrs["hash_function"]
source_obj.hashes.append(hash_obj)
dataset_instance.dataset.sources.append(source_obj)
def _import_datasets(
self,
object_import_tracker: "ObjectImportTracker",
datasets_attrs: list[dict[str, Any]],
history: Optional[model.History],
new_history: bool,
job: Optional[model.Job],
) -> None:
object_key = self.object_key
def handle_dataset_object_edit(dataset_instance, dataset_attrs):
if "dataset" in dataset_attrs:
assert self.import_options.allow_dataset_object_edit
dataset_attributes = DatasetAttributeImportModel(**dataset_attrs["dataset"]).model_dump(
exclude_unset=True,
)
for attribute, value in dataset_attributes.items():
setattr(dataset_instance.dataset, attribute, value)
if dataset_instance.dataset.purged:
dataset_instance.dataset.full_delete()
self._attach_dataset_hashes(dataset_attrs["dataset"], dataset_instance)
self._attach_dataset_sources(dataset_attrs["dataset"], dataset_instance)
if "id" in dataset_attrs["dataset"] and self.import_options.allow_edit:
dataset_instance.dataset.id = dataset_attrs["dataset"]["id"]
for dataset_association in dataset_instance.dataset.history_associations:
if (
dataset_association is not dataset_instance
and dataset_association.extension == dataset_instance.extension
or dataset_association.extension == "auto"
):
copy_dataset_instance_metadata_attributes(
source=dataset_instance, target=dataset_association
)
if job:
dataset_instance.dataset.job_id = job.id
for dataset_attrs in datasets_attrs:
if "state" not in dataset_attrs:
self.dataset_state_serialized = False
if "id" in dataset_attrs and self.import_options.allow_edit and not self.sessionless:
model_class = getattr(model, dataset_attrs["model_class"])
dataset_instance = self.sa_session.get(model_class, dataset_attrs["id"])
assert isinstance(dataset_instance, model.DatasetInstance)
attributes = [
"name",
"extension",
"info",
"blurb",
"peek",
"designation",
"visible",
"metadata",
"tool_version",
"validated_state",
"validated_state_message",
]
for attribute in attributes:
if attribute in dataset_attrs:
value = dataset_attrs[attribute]
if attribute == "metadata":
value = replace_metadata_file(value, dataset_instance, self.sa_session)
setattr(dataset_instance, attribute, value)
handle_dataset_object_edit(dataset_instance, dataset_attrs)
else:
metadata_deferred = dataset_attrs.get("metadata_deferred", False)
metadata = dataset_attrs.get("metadata")
if metadata is None and not metadata_deferred:
raise MalformedContents("metadata_deferred must be true if no metadata found in dataset attributes")
if metadata is None:
metadata = {"dbkey": "?"}
model_class = dataset_attrs.get("model_class", "HistoryDatasetAssociation")
if model_class == "HistoryDatasetAssociation":
# Check if this HDA should reuse a dataset from a copied-from HDA
reuse_dataset = None
copied_from_chain = dataset_attrs.get("copied_from_history_dataset_association_id_chain", [])
if copied_from_chain:
# Look for the source HDA in the current import set
copied_from_key = _copied_from_object_key(copied_from_chain, object_import_tracker.hdas_by_key)
if copied_from_key and copied_from_key in object_import_tracker.hdas_by_key:
source_hda = object_import_tracker.hdas_by_key[copied_from_key]
# Reuse the dataset from the source HDA
reuse_dataset = source_hda.dataset
# Create dataset and HDA.
dataset_instance = model.HistoryDatasetAssociation(
name=dataset_attrs["name"],
extension=dataset_attrs["extension"],
info=dataset_attrs["info"],
blurb=dataset_attrs["blurb"],
peek=dataset_attrs["peek"],
designation=dataset_attrs["designation"],
visible=dataset_attrs["visible"],
deleted=dataset_attrs.get("deleted", False),
dbkey=metadata["dbkey"],
tool_version=metadata.get("tool_version"),
metadata_deferred=metadata_deferred,
history=history,
create_dataset=reuse_dataset is None,
dataset=reuse_dataset,
flush=False,
sa_session=self.sa_session,
)
dataset_instance._metadata = metadata
elif model_class == "LibraryDatasetDatasetAssociation":
# Create dataset and LDDA.
dataset_instance = model.LibraryDatasetDatasetAssociation(
name=dataset_attrs["name"],
extension=dataset_attrs["extension"],
info=dataset_attrs["info"],
blurb=dataset_attrs["blurb"],
peek=dataset_attrs["peek"],
designation=dataset_attrs["designation"],
visible=dataset_attrs["visible"],
deleted=dataset_attrs.get("deleted", False),
dbkey=metadata["dbkey"],
tool_version=metadata.get("tool_version"),
metadata_deferred=metadata_deferred,
user=self.user,
create_dataset=True,
flush=False,
sa_session=self.sa_session,
)
else:
raise Exception("Unknown dataset instance type encountered")
metadata = replace_metadata_file(metadata, dataset_instance, self.sa_session)
if self.sessionless:
dataset_instance._metadata_collection = MetadataCollection(
dataset_instance, session=self.sa_session
)
dataset_instance._metadata = metadata
else:
dataset_instance.metadata = metadata
self._attach_raw_id_if_editing(dataset_instance, dataset_attrs)
# Older style...
if self.import_options.allow_edit:
if "uuid" in dataset_attrs:
dataset_instance.dataset.uuid = dataset_attrs["uuid"]
if "dataset_uuid" in dataset_attrs:
dataset_instance.dataset.uuid = dataset_attrs["dataset_uuid"]
self._session_add(dataset_instance)
if model_class == "HistoryDatasetAssociation":
hda = cast(model.HistoryDatasetAssociation, dataset_instance)
# don't use add_history to manage HID handling across full import to try to preserve
# HID structure.
hda.history = history
if new_history and self.trust_hid(dataset_attrs):
hda.hid = dataset_attrs["hid"]
else:
object_import_tracker.requires_hid.append(hda)
else:
pass
# ldda = cast(model.LibraryDatasetDatasetAssociation, dataset_instance)
# ldda.user = self.user
file_source_root = self.file_source_root
            # If "dataset" is in the dictionary - we will assert this dataset is tied to the Galaxy instance
            # and that the import options allow editing the dataset (e.g. for metadata setting).
            # Otherwise, we will check for "file" information instead of dataset information - currently this
            # includes "file_name" and "extra_files_path".
if "dataset" in dataset_attrs:
handle_dataset_object_edit(dataset_instance, dataset_attrs)
else:
file_name = dataset_attrs.get("file_name")
if file_name:
assert file_source_root
# Do security check and move/copy dataset data.
archive_path = os.path.abspath(os.path.join(file_source_root, file_name))
if os.path.islink(archive_path):
raise MalformedContents(f"Invalid dataset path: {archive_path}")
temp_dataset_file_name = os.path.realpath(archive_path)
if not in_directory(temp_dataset_file_name, file_source_root):
raise MalformedContents(f"Invalid dataset path: {temp_dataset_file_name}")
has_good_source = False
file_metadata = dataset_attrs.get("file_metadata") or {}
if "sources" in file_metadata:
for source_attrs in file_metadata["sources"]:
extra_files_path = source_attrs["extra_files_path"]
if extra_files_path is None:
has_good_source = True
discarded_data = self.import_options.discarded_data
dataset_state = dataset_attrs.get("state", dataset_instance.states.OK)
if dataset_state == dataset_instance.states.DEFERRED:
dataset_instance.state = dataset_instance.states.DEFERRED
dataset_instance.deleted = False
if isinstance(dataset_instance, model.HistoryDatasetAssociation):
dataset_instance.purged = False
assert dataset_instance.dataset
dataset_instance.dataset.deleted = False
dataset_instance.dataset.purged = False
elif (
not file_name
or not os.path.exists(temp_dataset_file_name)
or discarded_data is ImportDiscardedDataType.FORCE
):
is_discarded = not has_good_source
target_state = (
dataset_instance.states.DISCARDED if is_discarded else dataset_instance.states.DEFERRED
)
dataset_instance.state = target_state
deleted = is_discarded and (discarded_data == ImportDiscardedDataType.FORBID)
dataset_instance.deleted = deleted
if isinstance(dataset_instance, model.HistoryDatasetAssociation):
dataset_instance.purged = deleted
assert dataset_instance.dataset
dataset_instance.dataset.state = target_state
dataset_instance.dataset.deleted = deleted
dataset_instance.dataset.purged = deleted
else:
dataset_instance.state = dataset_state
if not self.object_store:
raise Exception(f"self.object_store is missing from {self}.")
if not dataset_instance.dataset.purged:
self.object_store.update_from_file(
dataset_instance.dataset, file_name=temp_dataset_file_name, create=True
)
# Import additional files if present. Histories exported previously might not have this attribute set.
dataset_extra_files_path = dataset_attrs.get("extra_files_path", None)
if dataset_extra_files_path:
assert file_source_root
dataset_extra_files_path = os.path.join(file_source_root, dataset_extra_files_path)
persist_extra_files(self.object_store, dataset_extra_files_path, dataset_instance)
# Only trust file size if the dataset is purged. If we keep the data we should check the file size.
dataset_instance.dataset.file_size = None
dataset_instance.dataset.set_total_size() # update the filesize record in the database
if dataset_instance.deleted:
dataset_instance.dataset.deleted = True
self._attach_dataset_hashes(file_metadata, dataset_instance)
self._attach_dataset_sources(file_metadata, dataset_instance)
if "created_from_basename" in file_metadata:
dataset_instance.dataset.created_from_basename = file_metadata["created_from_basename"]
if model_class == "HistoryDatasetAssociation" and self.user:
add_item_annotation(self.sa_session, self.user, dataset_instance, dataset_attrs["annotation"])
tag_list = dataset_attrs.get("tags")
if tag_list:
if not self.tag_handler:
raise Exception(f"Missing self.tag_handler on {self}.")
self.tag_handler.set_tags_from_list(
user=self.user, item=dataset_instance, new_tags_list=tag_list, flush=False
)
if self.app:
# If dataset instance is discarded or deferred, don't attempt to regenerate
# metadata for it.
if dataset_instance.state == dataset_instance.states.OK:
regenerate_kwds: dict[str, Any] = {}
if job:
regenerate_kwds["user"] = job.user
regenerate_kwds["session_id"] = job.session_id
elif history:
user = history.user
regenerate_kwds["user"] = user
if user is None and history.galaxy_sessions[0]:
regenerate_kwds["session_id"] = history.galaxy_sessions[0].galaxy_session.id
else:
regenerate_kwds["session_id"] = None
else:
# Need a user to run library jobs to generate metadata...
pass
if not self.import_options.allow_edit:
# external import, metadata files need to be regenerated (as opposed to extended metadata dataset import)
if self.app.datatypes_registry.set_external_metadata_tool:
self.app.datatypes_registry.set_external_metadata_tool.regenerate_imported_metadata_if_needed(
dataset_instance, history, **regenerate_kwds
)
else:
# Try to set metadata directly. @mvdbeek thinks we should only record the datasets
try:
if dataset_instance.has_metadata_files:
dataset_instance.datatype.set_meta(dataset_instance)
except Exception:
log.debug(f"Metadata setting failed on {dataset_instance}", exc_info=True)
dataset_instance.state = dataset_instance.dataset.states.FAILED_METADATA
if model_class == "HistoryDatasetAssociation":
if not isinstance(dataset_instance, model.HistoryDatasetAssociation):
raise Exception(
"Mismatch between model class and Python class, "
f"expected HistoryDatasetAssociation, got a {type(dataset_instance)}: {dataset_instance}"
)
if object_key in dataset_attrs:
object_import_tracker.hdas_by_key[dataset_attrs[object_key]] = dataset_instance
else:
assert "id" in dataset_attrs
object_import_tracker.hdas_by_id[dataset_attrs["id"]] = dataset_instance
else:
if not isinstance(dataset_instance, model.LibraryDatasetDatasetAssociation):
raise Exception(
"Mismatch between model class and Python class, "
f"expected LibraryDatasetDatasetAssociation, got a {type(dataset_instance)}: {dataset_instance}"
)
if object_key in dataset_attrs:
object_import_tracker.lddas_by_key[dataset_attrs[object_key]] = dataset_instance
else:
assert "id" in dataset_attrs
object_import_tracker.lddas_by_key[dataset_attrs["id"]] = dataset_instance
def _import_libraries(self, object_import_tracker: "ObjectImportTracker") -> None:
object_key = self.object_key
def import_folder(folder_attrs, root_folder=None):
if root_folder:
library_folder = root_folder
else:
name = folder_attrs["name"]
description = folder_attrs["description"]
genome_build = folder_attrs["genome_build"]
deleted = folder_attrs["deleted"]
library_folder = model.LibraryFolder(name=name, description=description, genome_build=genome_build)
library_folder.deleted = deleted
self._session_add(library_folder)
for sub_folder_attrs in folder_attrs.get("folders", []):
sub_folder = import_folder(sub_folder_attrs)
library_folder.add_folder(sub_folder)
for ld_attrs in folder_attrs.get("datasets", []):
ld = model.LibraryDataset(
folder=library_folder, name=ld_attrs["name"], info=ld_attrs["info"], order_id=ld_attrs["order_id"]
)
if "ldda" in ld_attrs:
ldda = object_import_tracker.lddas_by_key[ld_attrs["ldda"][object_key]]
ld.library_dataset_dataset_association = ldda
self._session_add(ld)
self.sa_session.commit()
return library_folder
libraries_attrs = self.library_properties()
for library_attrs in libraries_attrs:
if (
library_attrs["model_class"] == "LibraryFolder"
and library_attrs.get("id")
and not self.sessionless
and self.import_options.allow_edit
):
library_folder = self.sa_session.get(model.LibraryFolder, library_attrs["id"])
import_folder(library_attrs, root_folder=library_folder)
else:
assert self.import_options.allow_library_creation
name = library_attrs["name"]
description = library_attrs["description"]
synopsis = library_attrs["synopsis"]
library = model.Library(name=name, description=description, synopsis=synopsis)
self._session_add(library)
object_import_tracker.libraries_by_key[library_attrs[object_key]] = library
if "root_folder" in library_attrs:
library.root_folder = import_folder(library_attrs["root_folder"])
def _import_collection_instances(
self,
object_import_tracker: "ObjectImportTracker",
collections_attrs: list[dict[str, Any]],
history: Optional[model.History],
new_history: bool,
) -> None:
object_key = self.object_key
def import_collection(collection_attrs):
def materialize_elements(dc):
if "elements" not in collection_attrs:
return
elements_attrs = collection_attrs["elements"]
for element_attrs in elements_attrs:
dce = model.DatasetCollectionElement(
collection=dc,
element=model.DatasetCollectionElement.UNINITIALIZED_ELEMENT,
element_index=element_attrs["element_index"],
element_identifier=element_attrs["element_identifier"],
columns=element_attrs.get("columns"),
)
if "encoded_id" in element_attrs:
object_import_tracker.dces_by_key[element_attrs["encoded_id"]] = dce
if "hda" in element_attrs:
hda_attrs = element_attrs["hda"]
if object_key in hda_attrs:
hda_key = hda_attrs[object_key]
hdas_by_key = object_import_tracker.hdas_by_key
if hda_key in hdas_by_key:
hda = hdas_by_key[hda_key]
else:
raise KeyError(
f"Failed to find exported hda with key [{hda_key}] of type [{object_key}] in [{hdas_by_key}]"
)
else:
hda_id = hda_attrs["id"]
hdas_by_id = object_import_tracker.hdas_by_id
if hda_id not in hdas_by_id:
raise Exception(f"Failed to find HDA with id [{hda_id}] in [{hdas_by_id}]")
hda = hdas_by_id[hda_id]
dce.hda = hda
elif "child_collection" in element_attrs:
dce.child_collection = import_collection(element_attrs["child_collection"])
else:
raise Exception("Unknown collection element type encountered.")
dc.element_count = len(elements_attrs)
if "id" in collection_attrs and self.import_options.allow_edit and not self.sessionless:
dc = self.sa_session.get(model.DatasetCollection, collection_attrs["id"])
attributes = [
"collection_type",
"populated_state",
"populated_state_message",
"column_definitions",
"element_count",
]
for attribute in attributes:
if attribute in collection_attrs:
setattr(dc, attribute, collection_attrs.get(attribute))
materialize_elements(dc)
else:
# create collection
dc = model.DatasetCollection(collection_type=collection_attrs["type"])
dc.populated_state = collection_attrs["populated_state"]
dc.populated_state_message = collection_attrs.get("populated_state_message")
dc.column_definitions = collection_attrs.get("column_definitions")
self._attach_raw_id_if_editing(dc, collection_attrs)
materialize_elements(dc)
self._session_add(dc)
return dc
history_sa_session = get_object_session(history)
for collection_attrs in collections_attrs:
if "collection" in collection_attrs:
dc = import_collection(collection_attrs["collection"])
if "id" in collection_attrs and self.import_options.allow_edit and not self.sessionless:
hdca = self.sa_session.get(model.HistoryDatasetCollectionAssociation, collection_attrs["id"])
assert hdca is not None
# TODO: edit attributes...
else:
hdca = model.HistoryDatasetCollectionAssociation(
collection=dc,
visible=True,
name=collection_attrs["display_name"],
implicit_output_name=collection_attrs.get("implicit_output_name"),
)
self._attach_raw_id_if_editing(hdca, collection_attrs)
add_object_to_session(hdca, history_sa_session)
hdca.history = history
if new_history and self.trust_hid(collection_attrs):
hdca.hid = collection_attrs["hid"]
else:
object_import_tracker.requires_hid.append(hdca)
self._session_add(hdca)
if object_key in collection_attrs:
object_import_tracker.hdcas_by_key[collection_attrs[object_key]] = hdca
else:
assert "id" in collection_attrs
object_import_tracker.hdcas_by_id[collection_attrs["id"]] = hdca
else:
import_collection(collection_attrs)
def _attach_raw_id_if_editing(
self,
obj: model.RepresentById,
attrs: dict[str, Any],
) -> None:
if self.sessionless and "id" in attrs and self.import_options.allow_edit:
obj.id = attrs["id"]
def _import_collection_implicit_input_associations(
self, object_import_tracker: "ObjectImportTracker", collections_attrs: list[dict[str, Any]]
) -> None:
object_key = self.object_key
for collection_attrs in collections_attrs:
if "id" in collection_attrs:
# Existing object, not a new one, this property is immutable via model stores currently.
continue
hdca = object_import_tracker.hdcas_by_key[collection_attrs[object_key]]
if "implicit_input_collections" in collection_attrs:
implicit_input_collections = collection_attrs["implicit_input_collections"]
for implicit_input_collection in implicit_input_collections:
name = implicit_input_collection["name"]
input_collection_identifier = implicit_input_collection["input_dataset_collection"]
if input_collection_identifier in object_import_tracker.hdcas_by_key:
input_dataset_collection = object_import_tracker.hdcas_by_key[input_collection_identifier]
hdca.add_implicit_input_collection(name, input_dataset_collection)
def _import_dataset_copied_associations(
self, object_import_tracker: "ObjectImportTracker", datasets_attrs: list[dict[str, Any]]
) -> None:
object_key = self.object_key
# Re-establish copied_from_history_dataset_association relationships so history extraction
# has a greater chance of working in this history, for reproducibility.
for dataset_attrs in datasets_attrs:
if "id" in dataset_attrs:
                # Existing object, not a new one, this property is immutable via model stores currently.
continue
dataset_key = dataset_attrs[object_key]
if dataset_key not in object_import_tracker.hdas_by_key:
continue
hda = object_import_tracker.hdas_by_key[dataset_key]
copied_from_chain = dataset_attrs.get("copied_from_history_dataset_association_id_chain", [])
copied_from_object_key = _copied_from_object_key(copied_from_chain, object_import_tracker.hdas_by_key)
if not copied_from_object_key:
continue
# Re-establish the chain if we can.
if copied_from_object_key in object_import_tracker.hdas_by_key:
hda.copied_from_history_dataset_association = object_import_tracker.hdas_by_key[copied_from_object_key]
else:
# We're at the end of the chain and this HDA was copied from an HDA
# outside the history. So when we find this job and are looking for inputs/outputs
# attach to this node... unless we've already encountered another dataset
                # copied from that job's output... in that case we are going to cheat and
# say this dataset was copied from that one. It wasn't in the original Galaxy
# instance but I think it is fine to pretend in order to create a DAG here.
hda_copied_from_sinks = object_import_tracker.hda_copied_from_sinks
if copied_from_object_key in hda_copied_from_sinks:
hda.copied_from_history_dataset_association = object_import_tracker.hdas_by_key[
hda_copied_from_sinks[copied_from_object_key]
]
else:
hda_copied_from_sinks[copied_from_object_key] = dataset_key
def _import_collection_copied_associations(
self, object_import_tracker: "ObjectImportTracker", collections_attrs: list[dict[str, Any]]
) -> None:
object_key = self.object_key
# Re-establish copied_from_history_dataset_collection_association relationships so history extraction
# has a greater chance of working in this history, for reproducibility. Very similar to HDA code above
# see comments there.
for collection_attrs in collections_attrs:
if "id" in collection_attrs:
# Existing object, not a new one, this property is immutable via model stores currently.
continue
dataset_collection_key = collection_attrs[object_key]
if dataset_collection_key not in object_import_tracker.hdcas_by_key:
continue
hdca = object_import_tracker.hdcas_by_key[dataset_collection_key]
copied_from_chain = collection_attrs.get("copied_from_history_dataset_collection_association_id_chain", [])
copied_from_object_key = _copied_from_object_key(copied_from_chain, object_import_tracker.hdcas_by_key)
if not copied_from_object_key:
continue
# Re-establish the chain if we can, again see comments for hdas above for this to make more
# sense.
hdca_copied_from_sinks = object_import_tracker.hdca_copied_from_sinks
if copied_from_object_key in object_import_tracker.hdcas_by_key:
source_hdca = object_import_tracker.hdcas_by_key[copied_from_object_key]
if source_hdca is not hdca:
# We may not have the copied source, in which case the first included HDCA in the chain
# acts as the source, so here we make sure we don't create a cycle.
hdca.copied_from_history_dataset_collection_association = source_hdca
else:
if copied_from_object_key in hdca_copied_from_sinks:
source_hdca = object_import_tracker.hdcas_by_key[hdca_copied_from_sinks[copied_from_object_key]]
if source_hdca is not hdca:
hdca.copied_from_history_dataset_collection_association = source_hdca
else:
hdca_copied_from_sinks[copied_from_object_key] = dataset_collection_key
def _reassign_hids(self, object_import_tracker: "ObjectImportTracker", history: Optional[model.History]) -> None:
# assign HIDs for newly created objects that didn't match original history
requires_hid = object_import_tracker.requires_hid
requires_hid_len = len(requires_hid)
if requires_hid_len > 0 and not self.sessionless:
if not history:
raise Exception("Optional history is required here.")
for obj in requires_hid:
history.stage_addition(obj)
history.add_pending_items()
if object_import_tracker.copy_hid_for:
# in an if to avoid flush if unneeded
for from_dataset, to_dataset in object_import_tracker.copy_hid_for:
to_dataset.hid = from_dataset.hid
self._session_add(to_dataset)
self._flush()
def _import_workflow_invocations(
self, object_import_tracker: "ObjectImportTracker", history: Optional[model.History]
) -> None:
#
# Create jobs.
#
object_key = self.object_key
for workflow_key, workflow_path in self.workflow_paths():
workflows_directory = os.path.join(self.archive_dir, "workflows")
if not self.app:
raise Exception(f"Missing require self.app in {self}.")
workflow = self.app.workflow_contents_manager.read_workflow_from_path(
self.app, self.user, workflow_path, allow_in_directory=workflows_directory
)
object_import_tracker.workflows_by_key[workflow_key] = workflow
invocations_attrs = self.invocations_properties()
for invocation_attrs in invocations_attrs:
assert not self.import_options.allow_edit
imported_invocation = model.WorkflowInvocation()
imported_invocation.history = history
ensure_object_added_to_session(imported_invocation, object_in_session=history)
workflow_key = invocation_attrs["workflow"]
if workflow_key not in object_import_tracker.workflows_by_key:
raise Exception(f"Failed to find key {workflow_key} in {object_import_tracker.workflows_by_key.keys()}")
workflow = object_import_tracker.workflows_by_key[workflow_key]
imported_invocation.workflow = workflow
state = invocation_attrs["state"]
if state in model.WorkflowInvocation.non_terminal_states:
state = model.WorkflowInvocation.states.CANCELLED
imported_invocation.state = state
restore_times(imported_invocation, invocation_attrs)
self._session_add(imported_invocation)
self._flush()
def attach_workflow_step(imported_object, attrs):
order_index = attrs["order_index"]
imported_object.workflow_step = workflow.step_by_index(order_index) # noqa: B023
for step_attrs in invocation_attrs["steps"]:
imported_invocation_step = model.WorkflowInvocationStep()
imported_invocation_step.workflow_invocation = imported_invocation
ensure_object_added_to_session(imported_invocation_step, session=self.sa_session)
attach_workflow_step(imported_invocation_step, step_attrs)
restore_times(imported_invocation_step, step_attrs)
imported_invocation_step.action = step_attrs["action"]
# TODO: ensure terminal...
imported_invocation_step.state = step_attrs["state"]
if "job" in step_attrs:
job = object_import_tracker.jobs_by_key[step_attrs["job"][object_key]]
imported_invocation_step.job = job
ensure_object_added_to_session(imported_invocation_step, object_in_session=job)
elif "implicit_collection_jobs" in step_attrs:
icj = object_import_tracker.implicit_collection_jobs_by_key[
step_attrs["implicit_collection_jobs"][object_key]
]
imported_invocation_step.implicit_collection_jobs = icj
# TODO: handle step outputs...
output_dicts = step_attrs["outputs"]
step_outputs = []
for output_dict in output_dicts:
step_output = model.WorkflowInvocationStepOutputDatasetAssociation()
step_output.output_name = output_dict["output_name"]
dataset_link_attrs = output_dict["dataset"]
if dataset_link_attrs:
dataset = object_import_tracker.find_hda(dataset_link_attrs[object_key])
assert dataset
step_output.dataset = dataset
step_outputs.append(step_output)
imported_invocation_step.output_datasets = step_outputs
output_collection_dicts = step_attrs["output_collections"]
step_output_collections = []
for output_collection_dict in output_collection_dicts:
step_output_collection = model.WorkflowInvocationStepOutputDatasetCollectionAssociation()
step_output_collection.output_name = output_collection_dict["output_name"]
dataset_collection_link_attrs = output_collection_dict["dataset_collection"]
if dataset_collection_link_attrs:
dataset_collection = object_import_tracker.find_hdca(dataset_collection_link_attrs[object_key])
assert dataset_collection
step_output_collection.dataset_collection = dataset_collection
step_output_collections.append(step_output_collection)
imported_invocation_step.output_dataset_collections = step_output_collections
input_parameters = []
for input_parameter_attrs in invocation_attrs["input_parameters"]:
input_parameter = model.WorkflowRequestInputParameter()
input_parameter.value = input_parameter_attrs["value"]
input_parameter.name = input_parameter_attrs["name"]
input_parameter.type = input_parameter_attrs["type"]
input_parameter.workflow_invocation = imported_invocation
self._session_add(input_parameter)
input_parameters.append(input_parameter)
# invocation_attrs["input_parameters"] = input_parameters
step_states = []
for step_state_attrs in invocation_attrs["step_states"]:
step_state = model.WorkflowRequestStepState()
step_state.value = step_state_attrs["value"]
attach_workflow_step(step_state, step_state_attrs)
step_state.workflow_invocation = imported_invocation
self._session_add(step_state)
step_states.append(step_state)
input_step_parameters = []
for input_step_parameter_attrs in invocation_attrs["input_step_parameters"]:
input_step_parameter = model.WorkflowRequestInputStepParameter()
input_step_parameter.parameter_value = input_step_parameter_attrs["parameter_value"]
attach_workflow_step(input_step_parameter, input_step_parameter_attrs)
input_step_parameter.workflow_invocation = imported_invocation
self._session_add(input_step_parameter)
input_step_parameters.append(input_step_parameter)
input_datasets = []
for input_dataset_attrs in invocation_attrs["input_datasets"]:
input_dataset = model.WorkflowRequestToInputDatasetAssociation()
attach_workflow_step(input_dataset, input_dataset_attrs)
input_dataset.workflow_invocation = imported_invocation
input_dataset.name = input_dataset_attrs["name"]
dataset_link_attrs = input_dataset_attrs["dataset"]
if dataset_link_attrs:
dataset = object_import_tracker.find_hda(dataset_link_attrs[object_key])
assert dataset
input_dataset.dataset = dataset
self._session_add(input_dataset)
input_datasets.append(input_dataset)
input_dataset_collections = []
for input_dataset_collection_attrs in invocation_attrs["input_dataset_collections"]:
input_dataset_collection = model.WorkflowRequestToInputDatasetCollectionAssociation()
attach_workflow_step(input_dataset_collection, input_dataset_collection_attrs)
input_dataset_collection.workflow_invocation = imported_invocation
input_dataset_collection.name = input_dataset_collection_attrs["name"]
dataset_collection_link_attrs = input_dataset_collection_attrs["dataset_collection"]
if dataset_collection_link_attrs:
dataset_collection = object_import_tracker.find_hdca(dataset_collection_link_attrs[object_key])
assert dataset_collection
input_dataset_collection.dataset_collection = dataset_collection
self._session_add(input_dataset_collection)
input_dataset_collections.append(input_dataset_collection)
output_dataset_collections = []
for output_dataset_collection_attrs in invocation_attrs["output_dataset_collections"]:
output_dataset_collection = model.WorkflowInvocationOutputDatasetCollectionAssociation()
output_dataset_collection.workflow_invocation = imported_invocation
attach_workflow_step(output_dataset_collection, output_dataset_collection_attrs)
workflow_output = output_dataset_collection_attrs["workflow_output"]
label = workflow_output.get("label")
workflow_output = workflow.workflow_output_for(label)
output_dataset_collection.workflow_output = workflow_output
output_dataset_collection_attrs = output_dataset_collection_attrs.get("dataset_collection") or {}
if hdca_id := output_dataset_collection_attrs.get(object_key):
hdca = object_import_tracker.find_hdca(hdca_id)
if hdca:
output_dataset_collection.dataset_collection = hdca
self._session_add(output_dataset_collection)
output_dataset_collections.append(output_dataset_collection)
output_datasets = []
for output_dataset_attrs in invocation_attrs["output_datasets"]:
output_dataset = model.WorkflowInvocationOutputDatasetAssociation()
output_dataset.workflow_invocation = imported_invocation
attach_workflow_step(output_dataset, output_dataset_attrs)
workflow_output = output_dataset_attrs["workflow_output"]
label = workflow_output.get("label")
workflow_output = workflow.workflow_output_for(label)
output_dataset.workflow_output = workflow_output
output_dataset_dataset_attrs = output_dataset_attrs.get("dataset") or {}
if dataset_id := output_dataset_dataset_attrs.get(object_key):
dataset = object_import_tracker.find_hda(dataset_id)
if dataset:
output_dataset.dataset = dataset
self._session_add(output_dataset)
output_datasets.append(output_dataset)
output_values = []
for output_value_attrs in invocation_attrs["output_values"]:
output_value = model.WorkflowInvocationOutputValue()
output_value.workflow_invocation = imported_invocation
output_value.value = output_value_attrs["value"]
attach_workflow_step(output_value, output_value_attrs)
workflow_output = output_value_attrs["workflow_output"]
label = workflow_output.get("label")
workflow_output = workflow.workflow_output_for(label)
output_value.workflow_output = workflow_output
self._session_add(output_value)
output_values.append(output_value)
if object_key in invocation_attrs:
object_import_tracker.invocations_by_key[invocation_attrs[object_key]] = imported_invocation
def _import_jobs(self, object_import_tracker: "ObjectImportTracker", history: Optional[model.History]) -> None:
self._flush()
object_key = self.object_key
_find_hda = object_import_tracker.find_hda
_find_hdca = object_import_tracker.find_hdca
_find_dce = object_import_tracker.find_dce
#
# Create jobs.
#
jobs_attrs = self.jobs_properties()
# Create each job.
history_sa_session = get_object_session(history)
for job_attrs in jobs_attrs:
if "id" in job_attrs and not self.sessionless:
# only thing we allow editing currently is associations for incoming jobs.
assert self.import_options.allow_edit
job = self.sa_session.get(model.Job, job_attrs["id"])
self._connect_job_io(job, job_attrs, _find_hda, _find_hdca, _find_dce) # type: ignore[attr-defined]
self._set_job_attributes(job, job_attrs, force_terminal=False) # type: ignore[attr-defined]
# Don't edit job
continue
imported_job = model.Job()
imported_job.id = cast(int, job_attrs.get("id"))
imported_job.user = self.user
add_object_to_session(imported_job, history_sa_session)
imported_job.history = history
imported_job.imported = True
imported_job.tool_id = job_attrs["tool_id"]
imported_job.tool_version = job_attrs["tool_version"]
self._set_job_attributes(imported_job, job_attrs, force_terminal=True) # type: ignore[attr-defined]
restore_times(imported_job, job_attrs)
self._session_add(imported_job)
# Connect jobs to input and output datasets.
params = self._normalize_job_parameters(imported_job, job_attrs, _find_hda, _find_hdca, _find_dce) # type: ignore[attr-defined]
for name, value in params.items():
# Transform parameter values when necessary.
imported_job.add_parameter(name, dumps(value))
self._connect_job_io(imported_job, job_attrs, _find_hda, _find_hdca, _find_dce) # type: ignore[attr-defined]
if object_key in job_attrs:
object_import_tracker.jobs_by_key[job_attrs[object_key]] = imported_job
def _import_implicit_dataset_conversions(self, object_import_tracker: "ObjectImportTracker") -> None:
implicit_dataset_conversion_attrs = self.implicit_dataset_conversion_properties()
for idc_attrs in implicit_dataset_conversion_attrs:
# I don't know what metadata_safe does per se... should we copy this property or
# just set it to False?
metadata_safe = False
idc = model.ImplicitlyConvertedDatasetAssociation(metadata_safe=metadata_safe, for_import=True)
idc.type = idc_attrs["file_type"]
# We may not have exported the parent, so only set the parent_hda attribute if we did.
if (parent_hda_id := idc_attrs.get("parent_hda")) and (
parent_hda := object_import_tracker.hdas_by_key.get(parent_hda_id)
):
# exports created prior to 24.2 may not have a parent if the parent had been purged
idc.parent_hda = parent_hda
if idc_attrs.get("hda"):
idc.dataset = object_import_tracker.hdas_by_key[idc_attrs["hda"]]
            # we have the dataset and the parent, let's ensure they end up with the same HID
if idc.dataset and idc.parent_hda:
try:
object_import_tracker.requires_hid.remove(idc.dataset)
except ValueError:
pass # we wanted to remove it anyway.
                # An HDA can be the parent of multiple implicitly converted datasets,
                # which is why we use [(source, target)] pairs here
object_import_tracker.copy_hid_for.append((idc.parent_hda, idc.dataset))
self._session_add(idc)
def _import_implicit_collection_jobs(self, object_import_tracker: "ObjectImportTracker") -> None:
object_key = self.object_key
implicit_collection_jobs_attrs = self.implicit_collection_jobs_properties()
for icj_attrs in implicit_collection_jobs_attrs:
icj = model.ImplicitCollectionJobs()
icj.populated_state = icj_attrs["populated_state"]
icj.jobs = []
for order_index, job in enumerate(icj_attrs["jobs"]):
icja = model.ImplicitCollectionJobsJobAssociation()
add_object_to_object_session(icja, icj)
icja.implicit_collection_jobs = icj
if job in object_import_tracker.jobs_by_key:
job_instance = object_import_tracker.jobs_by_key[job]
add_object_to_object_session(icja, job_instance)
icja.job = job_instance
icja.order_index = order_index
icj.jobs.append(icja)
self._session_add(icja)
object_import_tracker.implicit_collection_jobs_by_key[icj_attrs[object_key]] = icj
self._session_add(icj)
def _session_add(self, obj: model.RepresentById) -> None:
self.sa_session.add(obj)
def _flush(self) -> None:
self.sa_session.commit()
def _copied_from_object_key(
copied_from_chain: list[ObjectKeyType],
objects_by_key: Union[
dict[ObjectKeyType, model.HistoryDatasetAssociation],
dict[ObjectKeyType, model.HistoryDatasetCollectionAssociation],
],
) -> Optional[ObjectKeyType]:
if len(copied_from_chain) == 0:
return None
# Okay this gets fun, we need the last thing in the chain to reconnect jobs
# from outside the history to inputs/outputs in this history but there may
# be cycles in the chain that lead outside the original history, so just eliminate
# all IDs not from this history except the last one.
filtered_copied_from_chain = []
for i, copied_from_key in enumerate(copied_from_chain):
filter_id = (i != len(copied_from_chain) - 1) and (copied_from_key not in objects_by_key)
if not filter_id:
filtered_copied_from_chain.append(copied_from_key)
copied_from_chain = filtered_copied_from_chain
if len(copied_from_chain) == 0:
return None
copied_from_object_key = copied_from_chain[0]
return copied_from_object_key
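# Worked example (editor's note, not part of the original module): only keys present in this
# import are kept, except that the final link of the chain always survives so jobs from outside
# the history can still be reattached. With hypothetical keys:
#
#     _copied_from_object_key(["a", "b", "c"], {"b": hda_b})   # -> "b"   ("a" filtered out)
#     _copied_from_object_key(["x", "y"], {})                  # -> "y"   (last element always kept)
#     _copied_from_object_key([], {})                          # -> None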
class ObjectImportTracker:
"""Keep track of new and existing imported objects.
Needed to re-establish connections and such in multiple passes.
"""
libraries_by_key: dict[ObjectKeyType, model.Library]
hdas_by_key: dict[ObjectKeyType, model.HistoryDatasetAssociation]
hdas_by_id: dict[int, model.HistoryDatasetAssociation]
hdcas_by_key: dict[ObjectKeyType, model.HistoryDatasetCollectionAssociation]
hdcas_by_id: dict[int, model.HistoryDatasetCollectionAssociation]
dces_by_key: dict[ObjectKeyType, model.DatasetCollectionElement]
dces_by_id: dict[int, model.DatasetCollectionElement]
lddas_by_key: dict[ObjectKeyType, model.LibraryDatasetDatasetAssociation]
hda_copied_from_sinks: dict[ObjectKeyType, ObjectKeyType]
hdca_copied_from_sinks: dict[ObjectKeyType, ObjectKeyType]
jobs_by_key: dict[ObjectKeyType, model.Job]
requires_hid: list["HistoryItem"]
copy_hid_for: list[tuple["HistoryItem", "HistoryItem"]]
def __init__(self) -> None:
self.libraries_by_key = {}
self.hdas_by_key = {}
self.hdas_by_id = {}
self.hdcas_by_key = {}
self.hdcas_by_id = {}
self.dces_by_key = {}
self.dces_by_id = {}
self.lddas_by_key = {}
self.hda_copied_from_sinks = {}
self.hdca_copied_from_sinks = {}
self.jobs_by_key = {}
self.invocations_by_key: dict[str, model.WorkflowInvocation] = {}
self.implicit_collection_jobs_by_key: dict[str, ImplicitCollectionJobs] = {}
self.workflows_by_key: dict[str, model.Workflow] = {}
self.requires_hid = []
self.copy_hid_for = []
self.new_history: Optional[model.History] = None
def find_hda(
self, input_key: ObjectKeyType, hda_id: Optional[int] = None
) -> Optional[model.HistoryDatasetAssociation]:
hda = None
if input_key in self.hdas_by_key:
hda = self.hdas_by_key[input_key]
elif isinstance(input_key, int) and input_key in self.hdas_by_id:
            # TODO: untangle this, I don't quite understand why we have both hdas_by_key and hdas_by_id
hda = self.hdas_by_id[input_key]
if input_key in self.hda_copied_from_sinks:
hda = self.hdas_by_key[self.hda_copied_from_sinks[input_key]]
return hda
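    # Illustrative sketch (editor's note, not part of the original module): lookups first try the
    # export's object key, then fall back to a raw database id, and finally follow the
    # hda_copied_from_sinks redirection established while importing copied datasets. The key and
    # ``imported_hda`` object below are hypothetical.
    #
    #     tracker = ObjectImportTracker()
    #     tracker.hdas_by_key["abc123"] = imported_hda
    #     tracker.hda_copied_from_sinks["def456"] = "abc123"
    #     assert tracker.find_hda("abc123") is imported_hda
    #     assert tracker.find_hda("def456") is imported_hda   # redirected via the sink mapping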
def find_hdca(self, input_key: ObjectKeyType) -> Optional[model.HistoryDatasetCollectionAssociation]:
hdca = None
if input_key in self.hdcas_by_key:
hdca = self.hdcas_by_key[input_key]
elif isinstance(input_key, int) and input_key in self.hdcas_by_id:
hdca = self.hdcas_by_id[input_key]
if input_key in self.hdca_copied_from_sinks:
hdca = self.hdcas_by_key[self.hdca_copied_from_sinks[input_key]]
return hdca
def find_dce(self, input_key: ObjectKeyType) -> Optional[model.DatasetCollectionElement]:
dce = None
if input_key in self.dces_by_key:
dce = self.dces_by_key[input_key]
elif isinstance(input_key, int) and input_key in self.dces_by_id:
dce = self.dces_by_id[input_key]
return dce
class FileTracebackException(Exception):
def __init__(self, traceback: str, *args, **kwargs) -> None:
self.traceback = traceback
def get_import_model_store_for_directory(
archive_dir: str, **kwd
) -> Union["DirectoryImportModelStore1901", "DirectoryImportModelStoreLatest"]:
traceback_file = os.path.join(archive_dir, TRACEBACK)
if not os.path.isdir(archive_dir):
raise Exception(
f"Could not find import model store for directory [{archive_dir}] (full path [{os.path.abspath(archive_dir)}])"
)
if os.path.exists(os.path.join(archive_dir, ATTRS_FILENAME_EXPORT)):
if os.path.exists(traceback_file):
with open(traceback_file) as tb:
raise FileTracebackException(traceback=tb.read())
return DirectoryImportModelStoreLatest(archive_dir, **kwd)
else:
return DirectoryImportModelStore1901(archive_dir, **kwd)
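# Illustrative sketch (editor's note, not part of the original module): selecting and using a
# directory-backed store. The archive path and the ``app``/``user`` objects are hypothetical;
# the presence of ATTRS_FILENAME_EXPORT decides between DirectoryImportModelStoreLatest and
# DirectoryImportModelStore1901.
#
#     store = get_import_model_store_for_directory("/tmp/extracted_archive", app=app, user=user)
#     datasets = store.datasets_properties()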
class DictImportModelStore(ModelImportStore):
object_key = "encoded_id"
def __init__(
self,
store_as_dict: dict[str, Any],
**kwd,
) -> None:
self._store_as_dict = store_as_dict
super().__init__(**kwd)
self.archive_dir = ""
def defines_new_history(self) -> bool:
return DICT_STORE_ATTRS_KEY_HISTORY in self._store_as_dict
def new_history_properties(self) -> dict[str, Any]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_HISTORY) or {}
def datasets_properties(
self,
) -> list[dict[str, Any]]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_DATASETS) or []
def collections_properties(self) -> list[dict[str, Any]]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_COLLECTIONS) or []
def implicit_dataset_conversion_properties(self) -> list[dict[str, Any]]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_CONVERSIONS) or []
def library_properties(
self,
) -> list[dict[str, Any]]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_LIBRARIES) or []
def jobs_properties(self) -> list[dict[str, Any]]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_JOBS) or []
def implicit_collection_jobs_properties(self) -> list[dict[str, Any]]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_IMPLICIT_COLLECTION_JOBS) or []
def invocations_properties(self) -> list[dict[str, Any]]:
return self._store_as_dict.get(DICT_STORE_ATTRS_KEY_INVOCATIONS) or []
def workflow_paths(self) -> Iterator[tuple[str, str]]:
        return
        yield  # unreachable; the bare "yield" makes this method an empty generator
def get_import_model_store_for_dict(
as_dict: dict[str, Any],
**kwd,
) -> DictImportModelStore:
return DictImportModelStore(as_dict, **kwd)
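# Illustrative sketch (editor's note, not part of the original module): building a store from an
# in-memory dictionary. The top-level keys mirror the DICT_STORE_ATTRS_KEY_* constants defined
# above; the payload shown is a hypothetical minimal history, and ``app``/``user`` are assumed.
#
#     store = get_import_model_store_for_dict(
#         {
#             "history": {"name": "example history"},
#             "datasets": [],
#         },
#         app=app,
#         user=user,
#     )
#     assert store.defines_new_history()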
class BaseDirectoryImportModelStore(ModelImportStore):
@abc.abstractmethod
def _normalize_job_parameters(
self,
imported_job: model.Job,
job_attrs: dict[str, Any],
_find_hda: Callable,
_find_hdca: Callable,
_find_dce: Callable,
) -> dict[str, Any]: ...
@abc.abstractmethod
def _connect_job_io(
self,
imported_job: model.Job,
job_attrs: dict[str, Any],
_find_hda: Callable,
_find_hdca: Callable,
_find_dce: Callable,
) -> None: ...
@property
def file_source_root(self) -> str:
return self.archive_dir
def defines_new_history(self) -> bool:
new_history_attributes = os.path.join(self.archive_dir, ATTRS_FILENAME_HISTORY)
return os.path.exists(new_history_attributes)
def new_history_properties(self) -> dict[str, Any]:
new_history_attributes = os.path.join(self.archive_dir, ATTRS_FILENAME_HISTORY)
history_properties = load(open(new_history_attributes))
return history_properties
def datasets_properties(self) -> list[dict[str, Any]]:
datasets_attrs_file_name = os.path.join(self.archive_dir, ATTRS_FILENAME_DATASETS)
datasets_attrs = load(open(datasets_attrs_file_name))
provenance_file_name = f"{datasets_attrs_file_name}.provenance"
if os.path.exists(provenance_file_name):
provenance_attrs = load(open(provenance_file_name))
datasets_attrs += provenance_attrs
return datasets_attrs
def collections_properties(self) -> list[dict[str, Any]]:
return self._read_list_if_exists(ATTRS_FILENAME_COLLECTIONS)
def implicit_dataset_conversion_properties(self) -> list[dict[str, Any]]:
return self._read_list_if_exists(ATTRS_FILENAME_CONVERSIONS)
def library_properties(
self,
) -> list[dict[str, Any]]:
libraries_attrs = self._read_list_if_exists(ATTRS_FILENAME_LIBRARIES)
libraries_attrs.extend(self._read_list_if_exists(ATTRS_FILENAME_LIBRARY_FOLDERS))
return libraries_attrs
def jobs_properties(
self,
) -> list[dict[str, Any]]:
return self._read_list_if_exists(ATTRS_FILENAME_JOBS)
def implicit_collection_jobs_properties(self) -> list[dict[str, Any]]:
implicit_collection_jobs_attrs_file_name = os.path.join(
self.archive_dir, ATTRS_FILENAME_IMPLICIT_COLLECTION_JOBS
)
        try:
            with open(implicit_collection_jobs_attrs_file_name) as f:
                return load(f)
        except FileNotFoundError:
            return []
def invocations_properties(
self,
) -> list[dict[str, Any]]:
return self._read_list_if_exists(ATTRS_FILENAME_INVOCATIONS)
def workflow_paths(self) -> Iterator[tuple[str, str]]:
workflows_directory = os.path.join(self.archive_dir, "workflows")
if not os.path.exists(workflows_directory):
return
for name in os.listdir(workflows_directory):
if name.endswith(".ga") or name.endswith(".abstract.cwl") or name.endswith(".html"):
continue
assert name.endswith(".gxwf.yml")
workflow_key = name[0 : -len(".gxwf.yml")]
yield workflow_key, os.path.join(workflows_directory, name)
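    # For reference, a directory store's "workflows/" folder may hold several
    # artifacts per workflow; only the ".gxwf.yml" files drive this iterator
    # (illustrative layout):
    #
    #     workflows/
    #         <workflow_key>.gxwf.yml       # yielded by workflow_paths()
    #         <workflow_key>.ga             # skipped
    #         <workflow_key>.abstract.cwl   # skipped
    #         <workflow_key>.html           # skipped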
def _set_job_attributes(
self, imported_job: model.Job, job_attrs: dict[str, Any], force_terminal: bool = False
) -> None:
ATTRIBUTES = (
"info",
"exit_code",
"traceback",
"job_messages",
"tool_stdout",
"tool_stderr",
"job_stdout",
"job_stderr",
"galaxy_version",
)
for attribute in ATTRIBUTES:
value = job_attrs.get(attribute)
if value is not None:
setattr(imported_job, attribute, value)
if "stdout" in job_attrs:
imported_job.tool_stdout = job_attrs.get("stdout")
imported_job.tool_stderr = job_attrs.get("stderr")
raw_state = job_attrs.get("state")
if force_terminal and raw_state and raw_state not in model.Job.terminal_states:
raw_state = model.Job.states.ERROR
if raw_state:
imported_job.set_state(raw_state)
    def _read_list_if_exists(self, file_name: str, required: bool = False) -> list[dict[str, Any]]:
        file_name = os.path.join(self.archive_dir, file_name)
        if os.path.exists(file_name):
            with open(file_name) as f:
                attrs = load(f)
        else:
            if required:
                raise Exception(f"Failed to find file [{file_name}] in model store archive")
            attrs = []
        return attrs
def restore_times(
model_object: Union[model.Job, model.WorkflowInvocation, model.WorkflowInvocationStep], attrs: dict[str, Any]
) -> None:
try:
model_object.create_time = datetime.datetime.strptime(attrs["create_time"], "%Y-%m-%dT%H:%M:%S.%f")
except Exception:
pass
try:
model_object.update_time = datetime.datetime.strptime(attrs["update_time"], "%Y-%m-%dT%H:%M:%S.%f")
except Exception:
pass
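# Illustrative call: ``restore_times`` expects ISO-8601 timestamps with
# microseconds, matching the "%Y-%m-%dT%H:%M:%S.%f" format above; missing or
# unparsable values are silently ignored.
#
#     restore_times(job, {"create_time": "2024-01-15T10:30:00.123456",
#                         "update_time": "2024-01-15T10:45:00.000001"})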
class DirectoryImportModelStore1901(BaseDirectoryImportModelStore):
object_key = "hid"
def __init__(self, archive_dir: str, **kwd) -> None:
archive_dir = os.path.realpath(archive_dir)
        # BioBlend versions prior to 17.01 exported histories with an extra subdirectory.
if not os.path.exists(os.path.join(archive_dir, ATTRS_FILENAME_HISTORY)):
for d in os.listdir(archive_dir):
if os.path.isdir(os.path.join(archive_dir, d)):
archive_dir = os.path.join(archive_dir, d)
break
self.archive_dir = archive_dir
super().__init__(**kwd)
def _connect_job_io(
self,
imported_job: model.Job,
job_attrs: dict[str, Any],
_find_hda: Callable,
_find_hdca: Callable,
_find_dce: Callable,
) -> None:
for output_key in job_attrs["output_datasets"]:
output_hda = _find_hda(output_key)
if output_hda:
if not self.dataset_state_serialized:
# dataset state has not been serialized, get state from job
output_hda.state = imported_job.state
imported_job.add_output_dataset(output_hda.name, output_hda)
if "input_mapping" in job_attrs:
for input_name, input_key in job_attrs["input_mapping"].items():
input_hda = _find_hda(input_key)
if input_hda:
imported_job.add_input_dataset(input_name, input_hda)
def _normalize_job_parameters(
self,
imported_job: model.Job,
job_attrs: dict[str, Any],
_find_hda: Callable,
_find_hdca: Callable,
_find_dce: Callable,
) -> dict[str, Any]:
def remap_objects(p, k, obj):
if isinstance(obj, dict) and obj.get("__HistoryDatasetAssociation__", False):
imported_hda = _find_hda(obj[self.object_key])
if imported_hda:
return (k, {"src": "hda", "id": imported_hda.id})
return (k, obj)
params = job_attrs["params"]
params = remap(params, remap_objects)
return params
def trust_hid(self, obj_attrs: dict[str, Any]) -> bool:
        # This legacy format did no object tracking, so we have to trust the HID
        # and accept that it will often be wrong.
return True
class DirectoryImportModelStoreLatest(BaseDirectoryImportModelStore):
object_key = "encoded_id"
def __init__(self, archive_dir: str, **kwd) -> None:
archive_dir = os.path.realpath(archive_dir)
self.archive_dir = archive_dir
super().__init__(**kwd)
def _connect_job_io(
self,
imported_job: model.Job,
job_attrs: dict[str, Any],
_find_hda: Callable,
_find_hdca: Callable,
_find_dce: Callable,
) -> None:
if imported_job.command_line is None:
imported_job.command_line = job_attrs.get("command_line")
if "input_dataset_mapping" in job_attrs:
for input_name, input_keys in job_attrs["input_dataset_mapping"].items():
input_keys = input_keys or []
for input_key in input_keys:
input_hda = _find_hda(input_key)
if input_hda:
imported_job.add_input_dataset(input_name, input_hda)
if "input_dataset_collection_mapping" in job_attrs:
for input_name, input_keys in job_attrs["input_dataset_collection_mapping"].items():
input_keys = input_keys or []
for input_key in input_keys:
input_hdca = _find_hdca(input_key)
if input_hdca:
imported_job.add_input_dataset_collection(input_name, input_hdca)
if "input_dataset_collection_element_mapping" in job_attrs:
for input_name, input_keys in job_attrs["input_dataset_collection_element_mapping"].items():
input_keys = input_keys or []
for input_key in input_keys:
input_dce = _find_dce(input_key)
if input_dce:
imported_job.add_input_dataset_collection_element(input_name, input_dce)
if "output_dataset_mapping" in job_attrs:
for output_name, output_keys in job_attrs["output_dataset_mapping"].items():
output_keys = output_keys or []
for output_key in output_keys:
output_hda = _find_hda(output_key)
if output_hda:
if not self.dataset_state_serialized:
# dataset state has not been serialized, get state from job
output_hda.state = imported_job.state
imported_job.add_output_dataset(output_name, output_hda)
if "output_dataset_collection_mapping" in job_attrs:
for output_name, output_keys in job_attrs["output_dataset_collection_mapping"].items():
output_keys = output_keys or []
for output_key in output_keys:
output_hdca = _find_hdca(output_key)
if output_hdca:
imported_job.add_output_dataset_collection(output_name, output_hdca)
def _normalize_job_parameters(
self,
imported_job: model.Job,
job_attrs: dict[str, Any],
_find_hda: Callable,
_find_hdca: Callable,
_find_dce: Callable,
) -> dict[str, Any]:
def remap_objects(p, k, obj):
if isinstance(obj, dict) and "src" in obj and obj["src"] in ["hda", "hdca", "dce"]:
if obj["src"] == "hda":
imported_hda = _find_hda(obj["id"])
if imported_hda:
new_id = imported_hda.id
else:
new_id = None
elif obj["src"] == "hdca":
imported_hdca = _find_hdca(obj["id"])
if imported_hdca:
new_id = imported_hdca.id
else:
new_id = None
elif obj["src"] == "dce":
imported_dce = _find_dce(obj["id"])
if imported_dce:
new_id = imported_dce.id
else:
new_id = None
else:
raise NotImplementedError()
new_obj = obj.copy()
if not new_id and self.import_options.allow_edit:
                    # We may not have exported all job parameters (e.g. DCEs),
                    # but we shouldn't set the object id to None in that case.
new_id = obj["id"]
new_obj["id"] = new_id
return (k, new_obj)
return (k, obj)
params = job_attrs["params"]
params = remap(params, remap_objects)
return cast(dict[str, Any], params)
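    # Illustrative before/after for the remapping above (ids are made up):
    # an exported reference such as
    #
    #     {"input1": {"src": "hda", "id": "f2db41e1fa331b3e"}}
    #
    # becomes, after import,
    #
    #     {"input1": {"src": "hda", "id": 42}}  # database id of the imported HDA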
class BagArchiveImportModelStore(DirectoryImportModelStoreLatest):
def __init__(self, bag_archive: str, **kwd) -> None:
archive_dir = tempfile.mkdtemp()
bdb.extract_bag(bag_archive, output_path=archive_dir)
        # The extracted bag archive contains a single top-level directory; descend into it.
archive_dir = os.path.join(archive_dir, os.listdir(archive_dir)[0])
bdb.revert_bag(archive_dir)
super().__init__(archive_dir, **kwd)
class ModelExportStore(metaclass=abc.ABCMeta):
@abc.abstractmethod
def export_history(
self, history: model.History, include_hidden: bool = False, include_deleted: bool = False
) -> None:
"""Export history to store."""
@abc.abstractmethod
def export_library(
self, library: model.Library, include_hidden: bool = False, include_deleted: bool = False
) -> None:
"""Export library to store."""
@abc.abstractmethod
def export_library_folder(
self, library_folder: model.LibraryFolder, include_hidden: bool = False, include_deleted: bool = False
) -> None:
"""Export library folder to store."""
@abc.abstractmethod
def export_workflow_invocation(self, workflow_invocation, include_hidden=False, include_deleted=False):
"""Export workflow invocation to store."""
@abc.abstractmethod
def add_dataset_collection(
self, collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]
):
"""Add Dataset Collection or HDCA to export store."""
@abc.abstractmethod
def add_dataset(self, dataset: model.DatasetInstance, include_files: bool = True):
"""
Add HDA to export store.
``include_files`` controls whether file contents are exported as well.
"""
@abc.abstractmethod
def __enter__(self):
"""Export store should be used as context manager."""
@abc.abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb):
"""Export store should be used as context manager."""
class DirectoryModelExportStore(ModelExportStore):
app: Optional[StoreAppProtocol]
file_sources: Optional[ConfiguredFileSources]
def __init__(
self,
export_directory: StrPath,
app: Optional[StoreAppProtocol] = None,
file_sources: Optional[ConfiguredFileSources] = None,
for_edit: bool = False,
serialize_dataset_objects: Optional[bool] = None,
export_files: Optional[str] = None,
strip_metadata_files: bool = True,
serialize_jobs: bool = True,
user_context=None,
) -> None:
"""
:param export_directory: path to export directory. Will be created if it does not exist.
:param app: Galaxy App or app-like object. Must be provided if `for_edit` and/or `serialize_dataset_objects` are True
:param for_edit: Allow modifying existing HDA and dataset metadata during import.
        :param serialize_dataset_objects: If True, encode IDs using the host secret. Defaults to the value of ``for_edit``.
:param export_files: How files should be exported, can be 'symlink', 'copy' or None, in which case files
will not be serialized.
:param serialize_jobs: Include job data in model export. Not needed for set_metadata script.
"""
if not os.path.exists(export_directory):
os.makedirs(export_directory)
sessionless = False
if app is not None:
self.app = app
security = app.security
if file_sources is None:
file_sources = app.file_sources
else:
sessionless = True
security = IdEncodingHelper(id_secret="randomdoesntmatter")
self.user_context = ProvidesFileSourcesUserContext(user_context)
self.file_sources = file_sources
self.serialize_jobs = serialize_jobs
self.sessionless = sessionless
self.security = security
self.export_directory = export_directory
self.serialization_options = model.SerializationOptions(
for_edit=for_edit,
serialize_dataset_objects=serialize_dataset_objects,
strip_metadata_files=strip_metadata_files,
serialize_files_handler=self,
)
self.export_files = export_files
self.included_datasets: dict[model.DatasetInstance, tuple[model.DatasetInstance, bool]] = {}
self.dataset_implicit_conversions: dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation] = {}
self.included_collections: dict[
Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation],
Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation],
] = {}
self.included_libraries: list[model.Library] = []
self.included_library_folders: list[model.LibraryFolder] = []
self.included_invocations: list[model.WorkflowInvocation] = []
self.collection_datasets: set[int] = set()
self.dataset_id_to_path: dict[int, tuple[Optional[str], Optional[str]]] = {}
self.job_output_dataset_associations: dict[int, dict[str, model.DatasetInstance]] = {}
@property
def workflows_directory(self) -> str:
return os.path.join(self.export_directory, "workflows")
def serialize_files(self, dataset: model.DatasetInstance, as_dict: JsonDictT) -> None:
if self.export_files is None:
return None
add: Callable[[str, str], None]
if self.export_files == "symlink":
add = os.symlink
elif self.export_files == "copy":
def add(src, dest):
if os.path.isdir(src):
shutil.copytree(src, dest)
else:
shutil.copyfile(src, dest)
else:
raise Exception(f"Unknown export_files parameter type encountered {self.export_files}")
export_directory = self.export_directory
_, include_files = self.included_datasets[dataset]
if not include_files:
return
file_name, extra_files_path = None, None
try:
_file_name = dataset.get_file_name()
if os.path.exists(_file_name):
file_name = _file_name
except ObjectNotFound:
pass
        if dataset.extra_files_path_exists():
            extra_files_path = dataset.extra_files_path
dir_name = "datasets"
dir_path = os.path.join(export_directory, dir_name)
if dataset.dataset.id in self.dataset_id_to_path:
file_name, extra_files_path = self.dataset_id_to_path[dataset.dataset.id]
if file_name is not None:
as_dict["file_name"] = file_name
if extra_files_path is not None:
as_dict["extra_files_path"] = extra_files_path
return
conversion = self.dataset_implicit_conversions.get(dataset)
conversion_key = self.serialization_options.get_identifier(self.security, conversion) if conversion else None
if file_name:
if not os.path.exists(dir_path):
os.makedirs(dir_path)
target_filename = get_export_dataset_filename(
as_dict["name"], as_dict["extension"], as_dict["encoded_id"], conversion_key=conversion_key
)
arcname = os.path.join(dir_name, target_filename)
src = file_name
dest = os.path.join(export_directory, arcname)
add(src, dest)
as_dict["file_name"] = arcname
if extra_files_path:
try:
file_list = os.listdir(extra_files_path)
except OSError:
file_list = []
if len(file_list):
extra_files_target_filename = get_export_dataset_extra_files_dir_name(
as_dict["encoded_id"], conversion_key=conversion_key
)
arcname = os.path.join(dir_name, extra_files_target_filename)
add(extra_files_path, os.path.join(export_directory, arcname))
as_dict["extra_files_path"] = arcname
else:
as_dict["extra_files_path"] = ""
self.dataset_id_to_path[dataset.dataset.id] = (as_dict.get("file_name"), as_dict.get("extra_files_path"))
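    # After serialization, ``as_dict`` carries archive-relative paths, e.g.
    # (illustrative values; the extra-files directory name comes from
    # get_export_dataset_extra_files_dir_name):
    #
    #     {"file_name": "datasets/<name>_<encoded_id>.<ext>",
    #      "extra_files_path": "datasets/<extra files dir for encoded_id>"}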
def exported_key(
self,
obj: model.RepresentById,
) -> Union[str, int]:
return self.serialization_options.get_identifier(self.security, obj)
def __enter__(self) -> "DirectoryModelExportStore":
return self
def export_job(self, job: model.Job, tool=None, include_job_data=True):
self.export_jobs([job], include_job_data=include_job_data)
if tool_source := getattr(tool, "tool_source", None):
with open(os.path.join(self.export_directory, "tool.xml"), "w") as out:
out.write(tool_source.to_string())
def export_jobs(
self,
jobs: Iterable[model.Job],
jobs_attrs: Optional[list[dict[str, Any]]] = None,
include_job_data: bool = True,
) -> list[dict[str, Any]]:
"""
Export jobs.
``include_job_data`` determines whether datasets associated with jobs should be exported as well.
This should generally be ``True``, except when re-exporting a job (to store the generated command line)
when running the set_meta script.
"""
jobs_attrs = jobs_attrs or []
for job in jobs:
job_attrs = job.serialize(self.security, self.serialization_options)
if include_job_data:
# -- Get input, output datasets. --
input_dataset_mapping: dict[str, list[Union[str, int]]] = {}
output_dataset_mapping: dict[str, list[Union[str, int]]] = {}
input_dataset_collection_mapping: dict[str, list[Union[str, int]]] = {}
input_dataset_collection_element_mapping: dict[str, list[Union[str, int]]] = {}
output_dataset_collection_mapping: dict[str, list[Union[str, int]]] = {}
implicit_output_dataset_collection_mapping: dict[str, list[Union[str, int]]] = {}
for id_assoc in job.input_datasets:
# Optional data inputs will not have a dataset.
if id_assoc.dataset:
name = id_assoc.name
if name not in input_dataset_mapping:
input_dataset_mapping[name] = []
input_dataset_mapping[name].append(self.exported_key(id_assoc.dataset))
if include_job_data:
self.add_dataset(id_assoc.dataset)
for od_assoc in job.output_datasets:
                    # Optional data outputs will not have a dataset.
if od_assoc.dataset:
name = od_assoc.name
if name not in output_dataset_mapping:
output_dataset_mapping[name] = []
output_dataset_mapping[name].append(self.exported_key(od_assoc.dataset))
if include_job_data:
self.add_dataset(od_assoc.dataset)
for idc_assoc in job.input_dataset_collections:
                    # Optional collection inputs will not have a dataset collection.
if idc_assoc.dataset_collection:
name = idc_assoc.name
if name not in input_dataset_collection_mapping:
input_dataset_collection_mapping[name] = []
input_dataset_collection_mapping[name].append(self.exported_key(idc_assoc.dataset_collection))
if include_job_data:
self.export_collection(idc_assoc.dataset_collection)
for idce_assoc in job.input_dataset_collection_elements:
if idce_assoc.dataset_collection_element:
name = idce_assoc.name
if name not in input_dataset_collection_element_mapping:
input_dataset_collection_element_mapping[name] = []
input_dataset_collection_element_mapping[name].append(
self.exported_key(idce_assoc.dataset_collection_element)
)
if include_job_data:
if idce_assoc.dataset_collection_element.is_collection:
assert isinstance(
idce_assoc.dataset_collection_element.element_object, model.DatasetCollection
)
self.export_collection(idce_assoc.dataset_collection_element.element_object)
else:
assert isinstance(
idce_assoc.dataset_collection_element.element_object, model.DatasetInstance
)
self.add_dataset(idce_assoc.dataset_collection_element.element_object)
for odci_assoc in job.output_dataset_collection_instances:
                    # Optional data outputs will not have a dataset collection instance.
                    # These are implicit outputs; we don't need to export them here.
if odci_assoc.dataset_collection_instance:
name = odci_assoc.name
if name not in output_dataset_collection_mapping:
output_dataset_collection_mapping[name] = []
output_dataset_collection_mapping[name].append(
self.exported_key(odci_assoc.dataset_collection_instance)
)
for odc_assoc in job.output_dataset_collections:
if odc_assoc.dataset_collection:
name = odc_assoc.name
if name not in implicit_output_dataset_collection_mapping:
implicit_output_dataset_collection_mapping[name] = []
implicit_output_dataset_collection_mapping[name].append(
self.exported_key(odc_assoc.dataset_collection)
)
if include_job_data:
self.export_collection(odc_assoc.dataset_collection)
job_attrs["input_dataset_mapping"] = input_dataset_mapping
job_attrs["input_dataset_collection_mapping"] = input_dataset_collection_mapping
job_attrs["input_dataset_collection_element_mapping"] = input_dataset_collection_element_mapping
job_attrs["output_dataset_mapping"] = output_dataset_mapping
job_attrs["output_dataset_collection_mapping"] = output_dataset_collection_mapping
job_attrs["implicit_output_dataset_collection_mapping"] = implicit_output_dataset_collection_mapping
jobs_attrs.append(job_attrs)
jobs_attrs_filename = os.path.join(self.export_directory, ATTRS_FILENAME_JOBS)
with open(jobs_attrs_filename, "w") as jobs_attrs_out:
jobs_attrs_out.write(json_encoder.encode(jobs_attrs))
return jobs_attrs
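    # Sketch of a single entry written to ATTRS_FILENAME_JOBS (other serialized
    # job fields elided; encoded ids are made up):
    #
    #     {
    #         "id": "<encoded job id>",
    #         "input_dataset_mapping": {"input1": ["<encoded hda id>"]},
    #         "output_dataset_mapping": {"out_file1": ["<encoded hda id>"]},
    #         ...
    #     }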
def export_history(
self, history: model.History, include_hidden: bool = False, include_deleted: bool = False
) -> None:
app = self.app
assert app, "exporting histories requires being bound to a session and Galaxy app object"
export_directory = self.export_directory
history_attrs = history.serialize(app.security, self.serialization_options)
history_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_HISTORY)
with open(history_attrs_filename, "w") as history_attrs_out:
dump(history_attrs, history_attrs_out)
sa_session = app.model.session
# Write collections' attributes (including datasets list) to file.
stmt_hdca = (
select(model.HistoryDatasetCollectionAssociation)
.where(model.HistoryDatasetCollectionAssociation.history == history)
.where(model.HistoryDatasetCollectionAssociation.deleted == expression.false())
)
collections = sa_session.scalars(stmt_hdca)
for collection in collections:
            # TODO: filter in the query? Note the breaks below abort collection export at the first unpopulated or non-ok collection.
if not collection.populated:
break
if collection.state != "ok":
break
self.export_collection(collection, include_deleted=include_deleted)
# Write datasets' attributes to file.
actions_backref = model.Dataset.actions
stmt_hda = (
select(model.HistoryDatasetAssociation)
.where(model.HistoryDatasetAssociation.history == history)
.join(model.Dataset)
.options(joinedload(model.HistoryDatasetAssociation.dataset).joinedload(actions_backref))
.order_by(model.HistoryDatasetAssociation.hid)
.where(model.Dataset.purged == expression.false())
)
datasets = sa_session.scalars(stmt_hda).unique()
for dataset in datasets:
# Add a new "annotation" attribute so that the user annotation for the dataset can be serialized without needing the user
dataset.annotation = get_item_annotation_str(sa_session, history.user, dataset) # type: ignore[attr-defined]
should_include_file = self._should_include_dataset(
dataset, include_hidden=include_hidden, include_deleted=include_deleted
)
if not dataset.deleted and dataset.id in self.collection_datasets:
should_include_file = True
if dataset not in self.included_datasets:
if should_include_file:
self._ensure_dataset_file_exists(dataset)
if dataset.implicitly_converted_parent_datasets:
                    # Take the first element of the list; this is just a mapping quirk and
                    # there should never be more than one of these. -John
conversion = dataset.implicitly_converted_parent_datasets[0]
self.add_implicit_conversion_dataset(dataset, should_include_file, conversion)
else:
self.add_dataset(dataset, include_files=should_include_file)
def export_library(
self, library: model.Library, include_hidden: bool = False, include_deleted: bool = False
) -> None:
self.included_libraries.append(library)
root_folder = library.root_folder
self.export_library_folder_contents(root_folder, include_hidden=include_hidden, include_deleted=include_deleted)
def export_library_folder(self, library_folder: model.LibraryFolder, include_hidden=False, include_deleted=False):
self.included_library_folders.append(library_folder)
self.export_library_folder_contents(
library_folder, include_hidden=include_hidden, include_deleted=include_deleted
)
def export_library_folder_contents(
self, library_folder: model.LibraryFolder, include_hidden: bool = False, include_deleted: bool = False
) -> None:
        for library_dataset in library_folder.datasets:
            ldda = library_dataset.library_dataset_dataset_association
            should_include_file = (ldda.visible or include_hidden) and (not ldda.deleted or include_deleted)
            self.add_dataset(ldda, include_files=should_include_file)
for folder in library_folder.folders:
self.export_library_folder_contents(folder, include_hidden=include_hidden, include_deleted=include_deleted)
def _should_include_dataset(
self, dataset: model.DatasetInstance, include_hidden: bool = False, include_deleted: bool = False
) -> bool:
return (dataset.visible or include_hidden) and (not dataset.deleted or include_deleted)
def export_workflow_invocation(
self, workflow_invocation: model.WorkflowInvocation, include_hidden: bool = False, include_deleted: bool = False
) -> None:
self.included_invocations.append(workflow_invocation)
for input_dataset in workflow_invocation.input_datasets:
if input_dataset.dataset and self._should_include_dataset(
input_dataset.dataset, include_hidden, include_deleted
):
self.add_dataset(input_dataset.dataset)
for output_dataset in workflow_invocation.output_datasets:
if self._should_include_dataset(output_dataset.dataset, include_hidden, include_deleted):
self.add_dataset(output_dataset.dataset)
for input_dataset_collection in workflow_invocation.input_dataset_collections:
if input_dataset_collection.dataset_collection:
self.export_collection(
input_dataset_collection.dataset_collection,
include_hidden=include_hidden,
include_deleted=include_deleted,
)
for output_dataset_collection in workflow_invocation.output_dataset_collections:
self.export_collection(
output_dataset_collection.dataset_collection,
include_hidden=include_hidden,
include_deleted=include_deleted,
)
for workflow_invocation_step in workflow_invocation.steps:
for assoc in workflow_invocation_step.output_datasets:
if self._should_include_dataset(assoc.dataset, include_hidden, include_deleted):
self.add_dataset(assoc.dataset)
for assoc in workflow_invocation_step.output_dataset_collections:
self.export_collection(
assoc.dataset_collection, include_hidden=include_hidden, include_deleted=include_deleted
)
def add_job_output_dataset_associations(
self, job_id: int, name: str, dataset_instance: model.DatasetInstance
) -> None:
job_output_dataset_associations = self.job_output_dataset_associations
if job_id not in job_output_dataset_associations:
job_output_dataset_associations[job_id] = {}
job_output_dataset_associations[job_id][name] = dataset_instance
def export_collection(
self,
collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation],
include_deleted: bool = False,
include_hidden: bool = False,
) -> None:
self.add_dataset_collection(collection)
# export datasets for this collection
has_collection = (
collection.collection if isinstance(collection, model.HistoryDatasetCollectionAssociation) else collection
)
for collection_dataset in has_collection.dataset_instances:
# ignoring include_hidden since the datasets will default to hidden for this collection.
if collection_dataset.deleted and not include_deleted:
include_files = False
else:
include_files = True
self.add_dataset(collection_dataset, include_files=include_files)
self.collection_datasets.add(collection_dataset.id)
def add_dataset_collection(
self, collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]
) -> None:
self.included_collections[collection] = collection
def add_implicit_conversion_dataset(
self,
dataset: model.DatasetInstance,
include_files: bool,
conversion: model.ImplicitlyConvertedDatasetAssociation,
) -> None:
parent_hda = conversion.parent_hda
if parent_hda and parent_hda not in self.included_datasets:
# We should always include the parent of an implicit conversion
# to avoid holes in the provenance.
self.included_datasets[parent_hda] = (parent_hda, include_files)
grand_parent_association = parent_hda.implicitly_converted_parent_datasets
if grand_parent_association and (grand_parent_hda := grand_parent_association[0].parent_hda):
self.add_implicit_conversion_dataset(grand_parent_hda, include_files, grand_parent_association[0])
self.included_datasets[dataset] = (dataset, include_files)
self.dataset_implicit_conversions[dataset] = conversion
def add_dataset(self, dataset: model.DatasetInstance, include_files: bool = True) -> None:
self.included_datasets[dataset] = (dataset, include_files)
def _ensure_dataset_file_exists(self, dataset: model.DatasetInstance) -> None:
        state = dataset.dataset.state
        if state == model.Dataset.states.OK and not dataset.get_file_name():
            log.error(
                f"Dataset [{dataset.id}] does not exist in object store [{dataset.dataset.object_store_id or 'None'}] while trying to export."
            )
            raise Exception(
                f"Cannot export history dataset [{getattr(dataset, 'hid', '')}: {dataset.name}] with id {self.exported_key(dataset)}"
            )
def _finalize(self) -> None:
export_directory = self.export_directory
datasets_attrs = []
provenance_attrs = []
for dataset, include_files in self.included_datasets.values():
if include_files:
datasets_attrs.append(dataset)
else:
provenance_attrs.append(dataset)
def to_json(attributes):
return json_encoder.encode([a.serialize(self.security, self.serialization_options) for a in attributes])
datasets_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_DATASETS)
with open(datasets_attrs_filename, "w") as datasets_attrs_out:
datasets_attrs_out.write(to_json(datasets_attrs))
with open(f"{datasets_attrs_filename}.provenance", "w") as provenance_attrs_out:
provenance_attrs_out.write(to_json(provenance_attrs))
libraries_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_LIBRARIES)
with open(libraries_attrs_filename, "w") as libraries_attrs_out:
libraries_attrs_out.write(to_json(self.included_libraries))
library_folders_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_LIBRARY_FOLDERS)
with open(library_folders_attrs_filename, "w") as library_folder_attrs_out:
library_folder_attrs_out.write(to_json(self.included_library_folders))
collections_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_COLLECTIONS)
with open(collections_attrs_filename, "w") as collections_attrs_out:
collections_attrs_out.write(to_json(self.included_collections.values()))
conversions_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_CONVERSIONS)
with open(conversions_attrs_filename, "w") as conversions_attrs_out:
conversions_attrs_out.write(to_json(self.dataset_implicit_conversions.values()))
jobs_attrs = []
for job_id, job_output_dataset_associations in self.job_output_dataset_associations.items():
output_dataset_mapping: dict[str, list[Union[str, int]]] = {}
for name, dataset in job_output_dataset_associations.items():
if name not in output_dataset_mapping:
output_dataset_mapping[name] = []
output_dataset_mapping[name].append(self.exported_key(dataset))
jobs_attrs.append({"id": job_id, "output_dataset_mapping": output_dataset_mapping})
if self.serialize_jobs:
#
# Write jobs attributes file.
#
# Get all jobs associated with included HDAs.
jobs_dict: dict[int, model.Job] = {}
implicit_collection_jobs_dict = {}
def record_job(job):
if not job or job.id in jobs_dict:
# No viable job or job already recorded.
return
jobs_dict[job.id] = job
if icja := job.implicit_collection_jobs_association:
implicit_collection_jobs = icja.implicit_collection_jobs
implicit_collection_jobs_dict[implicit_collection_jobs.id] = implicit_collection_jobs
def record_associated_jobs(obj):
# Get the job object.
job = None
for assoc in getattr(obj, "creating_job_associations", []):
                    # For mapped-over jobs, obj could be a DatasetCollection, which has no creating_job_associations.
job = assoc.job
break
record_job(job)
for hda, _include_files in self.included_datasets.values():
# Get the associated job, if any. If this hda was copied from another,
# we need to find the job that created the original hda
if not isinstance(hda, (model.HistoryDatasetAssociation, model.LibraryDatasetDatasetAssociation)):
raise Exception(
f"Expected a HistoryDatasetAssociation or LibraryDatasetDatasetAssociation, but got a {type(hda)}: {hda}"
)
job_hda = hda
while job_hda.copied_from_history_dataset_association: # should this check library datasets as well?
# record job (if one exists) even if dataset was copied
# copy could have been created manually through UI/API or using database operation tool,
# in which case we have a relevant job to export.
record_associated_jobs(job_hda)
job_hda = job_hda.copied_from_history_dataset_association
record_associated_jobs(job_hda)
for hdca in self.included_collections:
record_associated_jobs(hdca)
self.export_jobs(jobs_dict.values(), jobs_attrs=jobs_attrs)
for invocation in self.included_invocations:
for step in invocation.steps:
for job in step.jobs:
record_job(job)
if step.implicit_collection_jobs:
implicit_collection_jobs = step.implicit_collection_jobs
implicit_collection_jobs_dict[implicit_collection_jobs.id] = implicit_collection_jobs
# Get jobs' attributes.
icjs_attrs = []
for icj in implicit_collection_jobs_dict.values():
icj_attrs = icj.serialize(self.security, self.serialization_options)
icjs_attrs.append(icj_attrs)
icjs_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_IMPLICIT_COLLECTION_JOBS)
with open(icjs_attrs_filename, "w") as icjs_attrs_out:
icjs_attrs_out.write(json_encoder.encode(icjs_attrs))
invocations_attrs = []
for invocation in self.included_invocations:
invocation_attrs = invocation.serialize(self.security, self.serialization_options)
workflows_directory = self.workflows_directory
safe_makedirs(workflows_directory)
workflow = invocation.workflow
workflow_key = self.serialization_options.get_identifier(self.security, workflow)
history = invocation.history
assert invocation_attrs
invocation_attrs["workflow"] = workflow_key
if not self.app:
raise Exception(f"Missing self.app in {self}.")
self.app.workflow_contents_manager.store_workflow_artifacts(
workflows_directory, workflow_key, workflow, user=history.user, history=history
)
invocations_attrs.append(invocation_attrs)
invocations_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_INVOCATIONS)
with open(invocations_attrs_filename, "w") as invocations_attrs_out:
dump(invocations_attrs, invocations_attrs_out)
export_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_EXPORT)
with open(export_attrs_filename, "w") as export_attrs_out:
dump({"galaxy_export_version": GALAXY_EXPORT_VERSION}, export_attrs_out)
def __exit__(
self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> bool:
if exc_type is None:
self._finalize()
# http://effbot.org/zone/python-with-statement.htm
# Ignores TypeError exceptions
return isinstance(exc_val, TypeError)
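# Illustrative usage as a context manager; ``app`` and ``history`` are assumed
# to exist. On a clean exit, __exit__ triggers _finalize(), which writes the
# attrs files described above into the export directory.
#
#     with DirectoryModelExportStore("/tmp/export", app=app, export_files="copy") as export_store:
#         export_store.export_history(history, include_deleted=True)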
class WriteCrates:
included_invocations: list[model.WorkflowInvocation]
export_directory: StrPath
included_datasets: dict[model.DatasetInstance, tuple[model.DatasetInstance, bool]]
dataset_implicit_conversions: dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation]
dataset_id_to_path: dict[int, tuple[Optional[str], Optional[str]]]
@property
@abc.abstractmethod
def workflows_directory(self) -> str: ...
def _generate_markdown_readme(self) -> str:
markdown_parts: list[str] = []
if self._is_single_invocation_export():
invocation = self.included_invocations[0]
name = invocation.workflow.name
create_time = invocation.create_time
markdown_parts.append("# Galaxy Workflow Invocation Export")
markdown_parts.append("")
markdown_parts.append(f"This crate describes the invocation of workflow {name} executed at {create_time}.")
else:
markdown_parts.append("# Galaxy Dataset Export")
return "\n".join(markdown_parts)
def _is_single_invocation_export(self) -> bool:
return len(self.included_invocations) == 1
def _init_crate(self) -> ROCrate:
is_invocation_export = self._is_single_invocation_export()
if is_invocation_export:
invocation_crate_builder = WorkflowRunCrateProfileBuilder(self)
return invocation_crate_builder.build_crate()
ro_crate = ROCrate()
# TODO: allow user to set the name, or get the name of the history
ro_crate.name = "Galaxy dataset export"
ro_crate.description = (
"This is a Galaxy dataset export "
f"including {len(self.included_datasets)} datasets. "
"This RO-Crate was automatically generated by Galaxy."
)
ro_crate.license = ""
markdown_path = os.path.join(self.export_directory, "README.md")
with open(markdown_path, "w") as f:
f.write(self._generate_markdown_readme())
properties = {
"name": "README.md",
"encodingFormat": "text/markdown",
"about": {"@id": "./"},
}
ro_crate.add_file(
markdown_path,
dest_path="README.md",
properties=properties,
)
for dataset, _ in self.included_datasets.values():
if dataset.dataset.id in self.dataset_id_to_path:
file_name, _ = self.dataset_id_to_path[dataset.dataset.id]
if file_name is None:
# The dataset was discarded or no longer exists. No file to export.
# TODO: should this be registered in the crate as a special case?
log.warning(
"RO-Crate export: skipping dataset [%s] with state [%s] because file does not exist.",
dataset.id,
dataset.state,
)
continue
name = dataset.name
encoding_format = dataset.datatype.get_mime()
properties = {
"name": name,
"encodingFormat": encoding_format,
}
ro_crate.add_file(
os.path.join(self.export_directory, file_name),
dest_path=file_name,
properties=properties,
)
workflows_directory = self.workflows_directory
if os.path.exists(workflows_directory):
for filename in os.listdir(workflows_directory):
                is_computational_wf = not filename.endswith(".cwl")
                workflow_cls = ComputationalWorkflow if is_computational_wf else WorkflowDescription
                lang = "galaxy" if is_computational_wf else "cwl"
dest_path = os.path.join("workflows", filename)
is_main_entity = is_invocation_export and is_computational_wf
ro_crate.add_workflow(
source=os.path.join(workflows_directory, filename),
dest_path=dest_path,
main=is_main_entity,
cls=workflow_cls,
lang=lang,
)
found_workflow_licenses = set()
for workflow_invocation in self.included_invocations:
workflow = workflow_invocation.workflow
license = workflow.license
if license:
found_workflow_licenses.add(license)
if len(found_workflow_licenses) == 1:
ro_crate.license = next(iter(found_workflow_licenses))
# TODO: license per workflow
        # TODO: API options to license workflow outputs separately
# TODO: Export report as PDF and stick it in here
return ro_crate
class WorkflowInvocationOnlyExportStore(DirectoryModelExportStore):
def export_history(self, history: model.History, include_hidden: bool = False, include_deleted: bool = False):
"""Export history to store."""
raise NotImplementedError()
    def export_library(self, library, include_hidden=False, include_deleted=False):
"""Export library to store."""
raise NotImplementedError()
@property
def only_invocation(self) -> model.WorkflowInvocation:
assert len(self.included_invocations) == 1
return self.included_invocations[0]
@dataclass
class BcoExportOptions:
galaxy_url: str
galaxy_version: str
merge_history_metadata: bool = False
override_environment_variables: Optional[dict[str, str]] = None
override_empirical_error: Optional[dict[str, str]] = None
override_algorithmic_error: Optional[dict[str, str]] = None
override_xref: Optional[list[XrefItem]] = None
class FileSourceModelExportStore(abc.ABC, DirectoryModelExportStore):
"""
    Export to a file source, from which the data can later be retrieved using a URI.
"""
file_source_uri: Optional[StrPath] = None
# data can be retrieved later using this URI
out_file: StrPath
# the output file is written to this path, from which it is written to the file source
def __init__(self, uri, **kwds):
temp_output_dir = tempfile.mkdtemp()
self.temp_output_dir = temp_output_dir
if "://" in str(uri):
self.out_file = os.path.join(temp_output_dir, "out")
self.file_source_uri = uri
export_directory = os.path.join(temp_output_dir, "export")
else:
self.out_file = uri
self.file_source_uri = None
export_directory = temp_output_dir
super().__init__(export_directory, **kwds)
@abc.abstractmethod
def _generate_output_file(self) -> None:
"""
Generate the output file that will be uploaded to the file source.
        Produce an output file and save it to `self.out_file`. A common pattern is for this method to create an
        archive from `self.export_directory`.
This method runs after `DirectoryModelExportStore._finalize()`. Therefore, `self.export_directory` will already
have been populated when it runs.
"""
def _finalize(self):
super()._finalize() # populate `self.export_directory`
self._generate_output_file() # generate the output file `self.out_file`
if self.file_source_uri:
# upload output file to file source
if not self.file_sources:
raise Exception(f"Need self.file_sources but {type(self)} is missing it: {self.file_sources}.")
file_source_uri = urlparse(str(self.file_source_uri))
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(self.out_file)
self.file_source_uri = f"{file_source_uri.scheme}://{file_source_uri.netloc}" + file_source.write_from(
file_source_path.path, self.out_file, user_context=self.user_context
)
shutil.rmtree(self.temp_output_dir)
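    # Illustrative behavior of the URI handling above, using the
    # TarModelExportStore subclass defined below (paths/URIs are made up):
    #
    #     TarModelExportStore("gxfiles://mysource/exports/out.tgz", app=app)
    #         # archives to a temp file, then uploads it to the file source
    #     TarModelExportStore("/tmp/out.tgz", app=app)
    #         # writes the archive directly to the local path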
class BcoModelExportStore(FileSourceModelExportStore, WorkflowInvocationOnlyExportStore):
def __init__(self, uri, export_options: BcoExportOptions, **kwds):
self.export_options = export_options
super().__init__(uri, **kwds)
def _generate_output_file(self):
core_biocompute_object, object_id = self._core_biocompute_object_and_object_id()
write_to_file(object_id, core_biocompute_object, self.out_file)
def _core_biocompute_object_and_object_id(self) -> tuple[BioComputeObjectCore, str]:
assert self.app # need app.security to do anything...
export_options = self.export_options
workflow_invocation = self.only_invocation
history = workflow_invocation.history
workflow = workflow_invocation.workflow
stored_workflow = workflow.stored_workflow
def get_dataset_url(encoded_dataset_id: str):
return f"{export_options.galaxy_url}api/datasets/{encoded_dataset_id}/display"
# pull in the creator_metadata info from workflow if it exists
contributors = get_contributors(workflow.creator_metadata)
provenance_domain = ProvenanceDomain(
name=workflow.name,
version=bco_workflow_version(workflow),
review=[],
contributors=contributors,
license=workflow.license or "",
created=workflow_invocation.create_time.isoformat(),
modified=workflow_invocation.update_time.isoformat(),
)
keywords = []
for tag in stored_workflow.tags:
keywords.append(tag.user_tname)
if export_options.merge_history_metadata:
for tag in history.tags:
if tag.user_tname not in keywords:
keywords.append(tag.user_tname)
# metrics = {} ... TODO
pipeline_steps: list[PipelineStep] = []
software_prerequisite_tracker = SoftwarePrerequisiteTracker()
input_subdomain_items: list[InputSubdomainItem] = []
output_subdomain_items: list[OutputSubdomainItem] = []
for step in workflow_invocation.steps:
workflow_step = step.workflow_step
software_prerequisite_tracker.register_step(workflow_step)
if workflow_step.type == "tool":
workflow_outputs_list = set()
output_list: list[DescriptionDomainUri] = []
input_list: list[DescriptionDomainUri] = []
for wo in workflow_step.workflow_outputs:
workflow_outputs_list.add(wo.output_name)
for job in step.jobs:
for job_input in job.input_datasets:
if hasattr(job_input.dataset, "dataset_id"):
encoded_dataset_id = self.app.security.encode_id(job_input.dataset.dataset_id)
url = get_dataset_url(encoded_dataset_id)
input_uri_obj = DescriptionDomainUri(
# TODO: that should maybe be a step prefix + element identifier where appropriate.
filename=job_input.dataset.name,
uri=url,
access_time=job_input.dataset.create_time.isoformat(),
)
input_list.append(input_uri_obj)
for job_output in job.output_datasets:
if hasattr(job_output.dataset, "dataset_id"):
encoded_dataset_id = self.app.security.encode_id(job_output.dataset.dataset_id)
url = get_dataset_url(encoded_dataset_id)
output_obj = DescriptionDomainUri(
filename=job_output.dataset.name,
uri=url,
access_time=job_output.dataset.create_time.isoformat(),
)
output_list.append(output_obj)
if job_output.name in workflow_outputs_list:
output = OutputSubdomainItem(
mediatype=job_output.dataset.extension,
uri=InputAndOutputDomainUri(
filename=job_output.dataset.name,
uri=url,
access_time=job_output.dataset.create_time.isoformat(),
),
)
output_subdomain_items.append(output)
step_index = workflow_step.order_index
step_name = workflow_step.label or workflow_step.tool_id
pipeline_step = PipelineStep(
step_number=step_index,
name=step_name,
description=workflow_step.annotations[0].annotation if workflow_step.annotations else "",
version=workflow_step.tool_version,
prerequisite=[],
input_list=input_list,
output_list=output_list,
)
pipeline_steps.append(pipeline_step)
if workflow_step.type == "data_input" and step.output_datasets:
for output_assoc in step.output_datasets:
encoded_dataset_id = self.app.security.encode_id(output_assoc.dataset_id)
url = get_dataset_url(encoded_dataset_id)
input_obj = InputSubdomainItem(
uri=Uri(
uri=url,
filename=workflow_step.label,
access_time=workflow_step.update_time.isoformat(),
),
)
input_subdomain_items.append(input_obj)
if workflow_step.type == "data_collection_input" and step.output_dataset_collections:
for output_dataset_collection_association in step.output_dataset_collections:
encoded_dataset_id = self.app.security.encode_id(
output_dataset_collection_association.dataset_collection_id
)
url = f"{export_options.galaxy_url}api/dataset_collections/{encoded_dataset_id}/download"
input_obj = InputSubdomainItem(
uri=Uri(
uri=url,
filename=workflow_step.label,
access_time=workflow_step.update_time.isoformat(),
),
)
input_subdomain_items.append(input_obj)
usability_domain_str: list[str] = []
for a in stored_workflow.annotations:
usability_domain_str.append(a.annotation)
if export_options.merge_history_metadata:
for h in history.annotations:
usability_domain_str.append(h.annotation)
parametric_domain_items: list[ParametricDomainItem] = []
for inv_step in workflow_invocation.steps:
try:
tool_inputs = inv_step.workflow_step.tool_inputs
if tool_inputs:
for k, v in tool_inputs.items():
param, value, step_index = k, v, inv_step.workflow_step.order_index
parametric_domain_items.append(
ParametricDomainItem(param=str(param), value=str(value), step=str(step_index))
)
except Exception:
continue
encoded_workflow_id = self.app.security.encode_id(workflow.id)
execution_domain = galaxy_execution_domain(
export_options.galaxy_url,
f"{export_options.galaxy_url}api/workflows?encoded_workflow_id={encoded_workflow_id}",
software_prerequisite_tracker.software_prerequisites,
export_options.override_environment_variables,
)
extension_domain = extension_domains(export_options.galaxy_url, export_options.galaxy_version)
error_domain = ErrorDomain(
empirical_error=export_options.override_empirical_error or {},
algorithmic_error=export_options.override_algorithmic_error or {},
)
usability_domain = UsabilityDomain(root=usability_domain_str)
description_domain = DescriptionDomain(
keywords=keywords,
xref=export_options.override_xref or [],
platform=["Galaxy"],
pipeline_steps=pipeline_steps,
)
parametric_domain = ParametricDomain(root=parametric_domain_items)
io_domain = InputAndOutputDomain(
input_subdomain=input_subdomain_items,
output_subdomain=output_subdomain_items,
)
core = BioComputeObjectCore(
description_domain=description_domain,
error_domain=error_domain,
execution_domain=execution_domain,
extension_domain=extension_domain,
io_domain=io_domain,
parametric_domain=parametric_domain,
provenance_domain=provenance_domain,
usability_domain=usability_domain,
)
encoded_invocation_id = self.app.security.encode_id(workflow_invocation.id)
url = f"{export_options.galaxy_url}api/invocations/{encoded_invocation_id}"
return core, url
class ROCrateModelExportStore(DirectoryModelExportStore, WriteCrates):
def __init__(self, crate_directory: StrPath, **kwds) -> None:
self.crate_directory = crate_directory
super().__init__(crate_directory, export_files="symlink", **kwds)
def _finalize(self) -> None:
super()._finalize()
ro_crate = self._init_crate()
ro_crate.write(self.crate_directory)
class ROCrateArchiveModelExportStore(FileSourceModelExportStore, WriteCrates):
def _generate_output_file(self):
ro_crate = self._init_crate()
ro_crate.write(self.export_directory)
out_file_name = str(self.out_file)
if out_file_name.endswith(".zip"):
out_file = out_file_name[: -len(".zip")]
else:
out_file = out_file_name
archive = make_fast_zipfile(base_name=out_file, base_dir=self.export_directory, root_dir=self.export_directory)
shutil.move(archive, self.out_file)
class TarModelExportStore(FileSourceModelExportStore):
def __init__(self, uri: StrPath, gzip: bool = True, **kwds) -> None:
self.gzip = gzip
super().__init__(uri, **kwds)
def _generate_output_file(self):
tar_export_directory(self.export_directory, self.out_file, self.gzip)
class BagDirectoryModelExportStore(DirectoryModelExportStore):
def __init__(self, out_directory: str, **kwds) -> None:
self.out_directory = out_directory
super().__init__(out_directory, **kwds)
def _finalize(self) -> None:
super()._finalize()
bdb.make_bag(self.out_directory)
class BagArchiveModelExportStore(FileSourceModelExportStore, BagDirectoryModelExportStore):
def __init__(self, uri: StrPath, bag_archiver: str = "tgz", **kwds) -> None:
        # bag_archiver must be one of: tgz, zip, tar
self.bag_archiver = bag_archiver
super().__init__(uri, **kwds)
def _generate_output_file(self):
archive = bdb.archive_bag(self.export_directory, self.bag_archiver)
shutil.move(archive, self.out_file)
def get_export_store_factory(
app,
download_format: str,
export_files=None,
bco_export_options: Optional[BcoExportOptions] = None,
user_context=None,
) -> Callable[[StrPath], FileSourceModelExportStore]:
export_store_class: type[FileSourceModelExportStore]
export_store_class_kwds = {
"app": app,
"export_files": export_files,
"serialize_dataset_objects": False,
"user_context": user_context,
}
if download_format in ["tar.gz", "tgz"]:
export_store_class = TarModelExportStore
export_store_class_kwds["gzip"] = True
elif download_format in ["tar"]:
export_store_class = TarModelExportStore
export_store_class_kwds["gzip"] = False
elif download_format == "rocrate.zip":
export_store_class = ROCrateArchiveModelExportStore
elif download_format == "bco.json":
export_store_class = BcoModelExportStore
export_store_class_kwds["export_options"] = bco_export_options
elif download_format.startswith("bag."):
bag_archiver = download_format[len("bag.") :]
if bag_archiver not in ["zip", "tar", "tgz"]:
raise RequestParameterInvalidException(f"Unknown download format [{download_format}]")
export_store_class = BagArchiveModelExportStore
export_store_class_kwds["bag_archiver"] = bag_archiver
else:
raise RequestParameterInvalidException(f"Unknown download format [{download_format}]")
return lambda path: export_store_class(path, **export_store_class_kwds)
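# Illustrative usage (``app`` and ``invocation`` are assumed to exist):
#
#     factory = get_export_store_factory(app, "rocrate.zip", export_files="symlink")
#     with factory("/tmp/invocation.rocrate.zip") as export_store:
#         export_store.export_workflow_invocation(invocation)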
def tar_export_directory(export_directory: StrPath, out_file: StrPath, gzip: bool) -> None:
tarfile_mode: Literal["w", "w:gz"] = "w:gz" if gzip else "w"
with tarfile.open(out_file, tarfile_mode, dereference=True) as store_archive:
for export_path in os.listdir(export_directory):
store_archive.add(os.path.join(export_directory, export_path), arcname=export_path)
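# Illustrative usage (paths made up):
#
#     tar_export_directory("/tmp/export", "/tmp/export.tar.gz", gzip=True)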
def get_export_dataset_filename(name: str, ext: str, encoded_id: str, conversion_key: Optional[str]) -> str:
"""
    Builds a filename for a dataset using its name and extension.
"""
base = "".join(c in FILENAME_VALID_CHARS and c or "_" for c in name)
if not conversion_key:
return f"{base}_{encoded_id}.{ext}"
else:
return f"{base}_{encoded_id}_conversion_{conversion_key}.{ext}"
def source_to_import_store(
source: Union[str, dict],
app: StoreAppProtocol,
import_options: Optional[ImportOptions],
model_store_format: Optional[ModelStoreFormat] = None,
user_context=None,
) -> ModelImportStore:
galaxy_user = user_context.user if user_context else None
if isinstance(source, dict):
if model_store_format is not None:
raise Exception(
"Can only specify a model_store_format as an argument to source_to_import_store in conjuction with URIs"
)
model_import_store: ModelImportStore = get_import_model_store_for_dict(
source,
import_options=import_options,
app=app,
user=galaxy_user,
)
else:
source_uri: str = str(source)
delete = False
tag_handler = app.tag_handler.create_tag_handler_session(galaxy_session=None)
if source_uri.startswith("file://"):
source_uri = source_uri[len("file://") :]
if "://" in source_uri:
user_context = ProvidesFileSourcesUserContext(user_context)
source_uri = stream_url_to_file(
source_uri, app.file_sources, prefix="gx_import_model_store", user_context=user_context
)
delete = True
target_path = source_uri
if target_path.endswith(".json"):
with open(target_path) as f:
store_dict = load(f)
assert isinstance(store_dict, dict)
model_import_store = get_import_model_store_for_dict(
store_dict,
import_options=import_options,
app=app,
user=galaxy_user,
)
elif os.path.isdir(target_path):
model_import_store = get_import_model_store_for_directory(
target_path, import_options=import_options, app=app, user=galaxy_user, tag_handler=tag_handler
)
else:
model_store_format = model_store_format or ModelStoreFormat.TGZ
if ModelStoreFormat.is_compressed(model_store_format):
try:
temp_dir = mkdtemp()
with CompressedFile(target_path) as cf:
target_dir = cf.extract(temp_dir)
finally:
if delete:
os.remove(target_path)
model_import_store = get_import_model_store_for_directory(
target_dir, import_options=import_options, app=app, user=galaxy_user, tag_handler=tag_handler
)
elif ModelStoreFormat.is_bag(model_store_format):
model_import_store = BagArchiveImportModelStore(
target_path, import_options=import_options, app=app, user=galaxy_user
)
else:
raise Exception(f"Unknown model_store_format type encountered {model_store_format}")
return model_import_store
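# Illustrative dispatch (paths/URIs made up): each call resolves to an
# appropriate ModelImportStore subclass.
#
#     source_to_import_store(store_dict, app, import_options)         # dict store
#     source_to_import_store("/tmp/export_dir", app, import_options)  # directory store
#     source_to_import_store("https://example.org/export.tgz", app, import_options)
#     # downloaded via file sources, extracted, then imported as a directory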
def payload_to_source_uri(payload) -> str:
if payload.store_content_uri:
source_uri = payload.store_content_uri
else:
store_dict = payload.store_dict
assert isinstance(store_dict, dict)
temp_dir = mkdtemp()
import_json = os.path.join(temp_dir, "import_store.json")
with open(import_json, "w") as f:
dump(store_dict, f)
source_uri = f"file://{import_json}"
return source_uri
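# Illustrative round trip with source_to_import_store (payload shape assumed:
# either ``store_content_uri`` or ``store_dict`` is set):
#
#     source_uri = payload_to_source_uri(payload)
#     import_store = source_to_import_store(source_uri, app, import_options=None)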