Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.model.store

import abc
import contextlib
import datetime
import os
import shutil
import tarfile
import tempfile
from json import (
    dump,
    dumps,
    load,
)
from uuid import uuid4

from bdbag import bdbag_api as bdb
from boltons.iterutils import remap
from sqlalchemy.orm import joinedload
from sqlalchemy.sql import expression

from galaxy.exceptions import MalformedContents, ObjectNotFound
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.util import FILENAME_VALID_CHARS
from galaxy.util import in_directory
from galaxy.util.bunch import Bunch
from galaxy.util.path import safe_walk
from ..custom_types import json_encoder
from ..item_attrs import add_item_annotation, get_item_annotation_str
from ... import model

ATTRS_FILENAME_HISTORY = 'history_attrs.txt'
ATTRS_FILENAME_DATASETS = 'datasets_attrs.txt'
ATTRS_FILENAME_JOBS = 'jobs_attrs.txt'
ATTRS_FILENAME_IMPLICIT_COLLECTION_JOBS = 'implicit_collection_jobs_attrs.txt'
ATTRS_FILENAME_COLLECTIONS = 'collections_attrs.txt'
ATTRS_FILENAME_EXPORT = 'export_attrs.txt'
ATTRS_FILENAME_LIBRARIES = 'libraries_attrs.txt'
GALAXY_EXPORT_VERSION = "2"


[docs]class ImportOptions:
[docs] def __init__(self, allow_edit=False, allow_library_creation=False, allow_dataset_object_edit=None): self.allow_edit = allow_edit self.allow_library_creation = allow_library_creation if allow_dataset_object_edit is None: allow_dataset_object_edit = allow_edit self.allow_dataset_object_edit = allow_dataset_object_edit
[docs]class SessionlessContext:
[docs] def __init__(self): self.objects = []
[docs] def flush(self): pass
[docs] def add(self, obj): self.objects.append(obj)
[docs] def query(self, model_class): def find(obj_id): for obj in self.objects: if isinstance(obj, model_class) and obj.id == obj_id: return obj return None return Bunch(find=find)
[docs]class ModelImportStore(metaclass=abc.ABCMeta):
[docs] def __init__(self, import_options=None, app=None, user=None, object_store=None): if object_store is None: if app is not None: object_store = app.object_store self.object_store = object_store self.app = app if app is not None: self.sa_session = app.model.context.current self.sessionless = False else: self.sa_session = SessionlessContext() self.sessionless = True self.user = user self.import_options = import_options or ImportOptions() self.dataset_state_serialized = True
[docs] @abc.abstractmethod def defines_new_history(self): """Does this store define a new history to create."""
[docs] @abc.abstractmethod def new_history_properties(self): """Dict of history properties if defines_new_history() is truthy."""
[docs] @abc.abstractmethod def datasets_properties(self): """Return a list of HDA properties."""
[docs] def library_properties(self): """Return a list of library properties.""" return []
[docs] @abc.abstractmethod def collections_properties(self): """Return a list of HDCA properties."""
[docs] @abc.abstractmethod def jobs_properties(self): """Return a list of jobs properties."""
@abc.abstractproperty def object_key(self): """Key used to connect objects in metadata. Legacy exports used 'hid' but associated objects may not be from the same history and a history may contain multiple objects with the same 'hid'. """
[docs] def trust_hid(self, obj_attrs): """Trust HID when importing objects into a new History."""
[docs] @contextlib.contextmanager def target_history(self, default_history=None): new_history = None if self.defines_new_history(): history_properties = self.new_history_properties() history_name = history_properties.get('name') if history_name: history_name = 'imported from archive: %s' % history_name else: history_name = 'unnamed imported history' # Create history. new_history = model.History(name=history_name, user=self.user) new_history.importing = True hid_counter = history_properties.get('hid_counter') genome_build = history_properties.get('genome_build') # TODO: This seems like it shouldn't be imported, try to test and verify we can calculate this # and get away without it. -John if hid_counter: new_history.hid_counter = hid_counter if genome_build: new_history.genome_build = genome_build self._session_add(new_history) self._flush() if self.user: add_item_annotation(self.sa_session, self.user, new_history, history_properties.get('annotation')) history = new_history else: history = default_history yield history if new_history is not None: # Done importing. new_history.importing = False self._flush()
[docs] def perform_import(self, history=None, new_history=False, job=None): object_import_tracker = ObjectImportTracker() datasets_attrs = self.datasets_properties() collections_attrs = self.collections_properties() self._import_datasets(object_import_tracker, datasets_attrs, history, new_history, job) self._import_dataset_copied_associations(object_import_tracker, datasets_attrs) self._import_libraries(object_import_tracker) self._import_collection_instances(object_import_tracker, collections_attrs, history, new_history) self._import_collection_implicit_input_associations(object_import_tracker, collections_attrs) self._import_collection_copied_associations(object_import_tracker, collections_attrs) self._reassign_hids(object_import_tracker, history) self._import_jobs(object_import_tracker, history) self._import_implicit_collection_jobs(object_import_tracker) self._flush()
def _import_datasets(self, object_import_tracker, datasets_attrs, history, new_history, job): object_key = self.object_key for dataset_attrs in datasets_attrs: if 'state' not in dataset_attrs: self.dataset_state_serialized = False def handle_dataset_object_edit(dataset_instance): if "dataset" in dataset_attrs: assert self.import_options.allow_dataset_object_edit dataset_attributes = [ "state", "deleted", "purged", "external_filename", "_extra_files_path", "file_size", "object_store_id", "total_size", "created_from_basename", "uuid" ] for attribute in dataset_attributes: if attribute in dataset_attrs["dataset"]: setattr(dataset_instance.dataset, attribute, dataset_attrs["dataset"][attribute]) if "hashes" in dataset_attrs["dataset"]: for hash_attrs in dataset_attrs["dataset"]["hashes"]: hash_obj = model.DatasetHash() hash_obj.hash_value = hash_attrs["hash_value"] hash_obj.hash_function = hash_attrs["hash_function"] hash_obj.extra_files_path = hash_attrs["extra_files_path"] dataset_instance.dataset.hashes.append(hash_obj) if 'id' in dataset_attrs["dataset"] and self.import_options.allow_edit: dataset_instance.dataset.id = dataset_attrs["dataset"]['id'] if 'id' in dataset_attrs and self.import_options.allow_edit and not self.sessionless: hda = self.sa_session.query(model.HistoryDatasetAssociation).get(dataset_attrs["id"]) attributes = [ "name", "extension", "info", "blurb", "peek", "designation", "visible", "metadata", ] for attribute in attributes: if attribute in dataset_attrs: value = dataset_attrs[attribute] if attribute == "metadata": def remap_objects(p, k, obj): if isinstance(obj, dict) and "model_class" in obj and obj["model_class"] == "MetadataFile": return (k, model.MetadataFile(dataset=hda, uuid=obj["uuid"])) return (k, obj) value = remap(value, remap_objects) setattr(hda, attribute, value) handle_dataset_object_edit(hda) self._flush() else: metadata = dataset_attrs['metadata'] model_class = dataset_attrs.get("model_class", "HistoryDatasetAssociation") if model_class == "HistoryDatasetAssociation": # Create dataset and HDA. dataset_instance = model.HistoryDatasetAssociation(name=dataset_attrs['name'], extension=dataset_attrs['extension'], info=dataset_attrs['info'], blurb=dataset_attrs['blurb'], peek=dataset_attrs['peek'], designation=dataset_attrs['designation'], visible=dataset_attrs['visible'], deleted=dataset_attrs.get('deleted', False), dbkey=metadata['dbkey'], metadata=metadata, history=history, create_dataset=True, flush=False, sa_session=self.sa_session) elif model_class == "LibraryDatasetDatasetAssociation": # Create dataset and HDA. dataset_instance = model.LibraryDatasetDatasetAssociation(name=dataset_attrs['name'], extension=dataset_attrs['extension'], info=dataset_attrs['info'], blurb=dataset_attrs['blurb'], peek=dataset_attrs['peek'], designation=dataset_attrs['designation'], visible=dataset_attrs['visible'], deleted=dataset_attrs.get('deleted', False), dbkey=metadata['dbkey'], metadata=metadata, create_dataset=True, sa_session=self.sa_session) else: raise Exception("Unknown dataset instance type encountered") self._attach_raw_id_if_editing(dataset_instance, dataset_attrs) # Older style... if 'uuid' in dataset_attrs: dataset_instance.dataset.uuid = dataset_attrs["uuid"] if 'dataset_uuid' in dataset_attrs: dataset_instance.dataset.uuid = dataset_attrs["dataset_uuid"] self._session_add(dataset_instance) self._flush() if model_class == "HistoryDatasetAssociation": # don't use add_history to manage HID handling across full import to try to preserve # HID structure. dataset_instance.history = history if new_history and self.trust_hid(dataset_attrs): dataset_instance.hid = dataset_attrs['hid'] else: object_import_tracker.requires_hid.append(dataset_instance) self._flush() if 'dataset' in dataset_attrs: handle_dataset_object_edit(dataset_instance) else: file_name = dataset_attrs.get('file_name') if file_name: # Do security check and move/copy dataset data. archive_path = os.path.abspath(os.path.join(self.archive_dir, file_name)) if os.path.islink(archive_path): raise MalformedContents("Invalid dataset path: %s" % archive_path) temp_dataset_file_name = \ os.path.realpath(archive_path) if not in_directory(temp_dataset_file_name, self.archive_dir): raise MalformedContents("Invalid dataset path: %s" % temp_dataset_file_name) if not file_name or not os.path.exists(temp_dataset_file_name): dataset_instance.state = dataset_instance.states.DISCARDED dataset_instance.deleted = True dataset_instance.purged = True dataset_instance.dataset.deleted = True dataset_instance.dataset.purged = True else: dataset_instance.state = dataset_attrs.get('state', dataset_instance.states.OK) self.object_store.update_from_file(dataset_instance.dataset, file_name=temp_dataset_file_name, create=True) # Import additional files if present. Histories exported previously might not have this attribute set. dataset_extra_files_path = dataset_attrs.get('extra_files_path', None) if dataset_extra_files_path: dir_name = dataset_instance.dataset.extra_files_path_name dataset_extra_files_path = os.path.join(self.archive_dir, dataset_extra_files_path) for root, _dirs, files in safe_walk(dataset_extra_files_path): extra_dir = os.path.join(dir_name, root.replace(dataset_extra_files_path, '', 1).lstrip(os.path.sep)) extra_dir = os.path.normpath(extra_dir) for extra_file in files: source = os.path.join(root, extra_file) if not in_directory(source, self.archive_dir): raise MalformedContents("Invalid dataset path: %s" % source) self.object_store.update_from_file( dataset_instance.dataset, extra_dir=extra_dir, alt_name=extra_file, file_name=source, create=True) dataset_instance.dataset.set_total_size() # update the filesize record in the database if dataset_instance.deleted: dataset_instance.dataset.deleted = True if model_class == "HistoryDatasetAssociation" and self.user: add_item_annotation(self.sa_session, self.user, dataset_instance, dataset_attrs['annotation']) tag_list = dataset_attrs.get('tags') if tag_list: tag_handler = model.tags.GalaxyTagHandler(sa_session=self.sa_session) tag_handler.set_tags_from_list(user=self.user, item=dataset_instance, new_tags_list=tag_list) if self.app: self.app.datatypes_registry.set_external_metadata_tool.regenerate_imported_metadata_if_needed( dataset_instance, history, job ) if model_class == "HistoryDatasetAssociation": if object_key in dataset_attrs: object_import_tracker.hdas_by_key[dataset_attrs[object_key]] = dataset_instance else: assert 'id' in dataset_attrs object_import_tracker.hdas_by_id[dataset_attrs['id']] = dataset_instance else: if object_key in dataset_attrs: object_import_tracker.lddas_by_key[dataset_attrs[object_key]] = dataset_instance else: assert 'id' in dataset_attrs object_import_tracker.lddas_by_key[dataset_attrs['id']] = dataset_instance def _import_libraries(self, object_import_tracker): object_key = self.object_key libraries_attrs = self.library_properties() for library_attrs in libraries_attrs: assert self.import_options.allow_library_creation name = library_attrs['name'] description = library_attrs['description'] synopsis = library_attrs['synopsis'] library = model.Library(name=name, description=description, synopsis=synopsis) self._session_add(library) def import_folder(folder_attrs): name = folder_attrs['name'] description = folder_attrs['description'] genome_build = folder_attrs['genome_build'] deleted = folder_attrs['deleted'] library_folder = model.LibraryFolder( name=name, description=description, genome_build=genome_build ) library_folder.deleted = deleted self._session_add(library_folder) self._flush() for sub_folder_attrs in folder_attrs.get("folders", []): sub_folder = import_folder(sub_folder_attrs) library_folder.add_folder(sub_folder) for ld_attrs in folder_attrs.get("datasets", []): ld = model.LibraryDataset( folder=library_folder, name=ld_attrs['name'], info=ld_attrs['info'], order_id=ld_attrs['order_id'] ) if 'ldda' in ld_attrs: ldda = object_import_tracker.lddas_by_key[ld_attrs["ldda"][object_key]] ld.library_dataset_dataset_association = ldda self._session_add(ld) self._flush() return library_folder if 'root_folder' in library_attrs: library.root_folder = import_folder(library_attrs['root_folder']) object_import_tracker.libraries_by_key[library_attrs[object_key]] = library def _import_collection_instances(self, object_import_tracker, collections_attrs, history, new_history): object_key = self.object_key def import_collection(collection_attrs): def materialize_elements(dc): if "elements" not in collection_attrs: return elements_attrs = collection_attrs['elements'] for element_attrs in elements_attrs: dce = model.DatasetCollectionElement(collection=dc, element=model.DatasetCollectionElement.UNINITIALIZED_ELEMENT, element_index=element_attrs['element_index'], element_identifier=element_attrs['element_identifier']) if 'encoded_id' in element_attrs: object_import_tracker.dces_by_key[element_attrs['encoded_id']] = dce if 'hda' in element_attrs: hda_attrs = element_attrs['hda'] if object_key in hda_attrs: hda_key = hda_attrs[object_key] hdas_by_key = object_import_tracker.hdas_by_key if hda_key in hdas_by_key: hda = hdas_by_key[hda_key] else: raise KeyError(f"Failed to find exported hda with key [{hda_key}] of type [{object_key}] in [{hdas_by_key}]") else: hda_id = hda_attrs["id"] hdas_by_id = object_import_tracker.hdas_by_id if hda_id not in hdas_by_id: raise Exception(f"Failed to find HDA with id [{hda_id}] in [{hdas_by_id}]") hda = hdas_by_id[hda_id] dce.hda = hda elif 'child_collection' in element_attrs: dce.child_collection = import_collection(element_attrs['child_collection']) else: raise Exception("Unknown collection element type encountered.") self._session_add(dce) if "id" in collection_attrs and self.import_options.allow_edit and not self.sessionless: dc = self.sa_session.query(model.DatasetCollection).get(collection_attrs["id"]) attributes = [ "collection_type", "populated_state", "element_count", ] for attribute in attributes: if attribute in collection_attrs: setattr(dc, attribute, collection_attrs[attribute]) materialize_elements(dc) else: # create collection dc = model.DatasetCollection(collection_type=collection_attrs['type']) dc.populated_state = collection_attrs["populated_state"] self._attach_raw_id_if_editing(dc, collection_attrs) # TODO: element_count... materialize_elements(dc) self._session_add(dc) return dc for collection_attrs in collections_attrs: if 'collection' in collection_attrs: dc = import_collection(collection_attrs["collection"]) if 'id' in collection_attrs and self.import_options.allow_edit and not self.sessionless: hdca = self.sa_session.query(model.HistoryDatasetCollectionAssociation).get(collection_attrs["id"]) # TODO: edit attributes... else: hdca = model.HistoryDatasetCollectionAssociation(collection=dc, visible=True, name=collection_attrs['display_name'], implicit_output_name=collection_attrs.get("implicit_output_name")) self._attach_raw_id_if_editing(hdca, collection_attrs) hdca.history = history if new_history and self.trust_hid(collection_attrs): hdca.hid = collection_attrs['hid'] else: object_import_tracker.requires_hid.append(hdca) self._session_add(hdca) if object_key in collection_attrs: object_import_tracker.hdcas_by_key[collection_attrs[object_key]] = hdca else: assert 'id' in collection_attrs object_import_tracker.hdcas_by_id[collection_attrs['id']] = hdca else: import_collection(collection_attrs) def _attach_raw_id_if_editing(self, obj, attrs): if self.sessionless and 'id' in attrs and self.import_options.allow_edit: obj.id = attrs['id'] def _import_collection_implicit_input_associations(self, object_import_tracker, collections_attrs): object_key = self.object_key for collection_attrs in collections_attrs: if "id" in collection_attrs: # Existing object, not a new one, this property is immutable via model stores currently. continue hdca = object_import_tracker.hdcas_by_key[collection_attrs[object_key]] if "implicit_input_collections" in collection_attrs: implicit_input_collections = collection_attrs["implicit_input_collections"] for implicit_input_collection in implicit_input_collections: name = implicit_input_collection["name"] input_collection_identifier = implicit_input_collection["input_dataset_collection"] if input_collection_identifier in object_import_tracker.hdcas_by_key: input_dataset_collection = object_import_tracker.hdcas_by_key[input_collection_identifier] hdca.add_implicit_input_collection(name, input_dataset_collection) def _import_dataset_copied_associations(self, object_import_tracker, datasets_attrs): object_key = self.object_key # Re-establish copied_from_history_dataset_association relationships so history extraction # has a greater chance of working in this history, for reproducibility. for dataset_attrs in datasets_attrs: if "id" in dataset_attrs: # Existing object, not a new one, this property is not immutable via model stores currently. continue dataset_key = dataset_attrs[object_key] if dataset_key not in object_import_tracker.hdas_by_key: continue hda = object_import_tracker.hdas_by_key[dataset_key] copied_from_chain = dataset_attrs.get("copied_from_history_dataset_association_id_chain", []) copied_from_object_key = _copied_from_object_key(copied_from_chain, object_import_tracker.hdas_by_key) if not copied_from_object_key: continue # Re-establish the chain if we can. if copied_from_object_key in object_import_tracker.hdas_by_key: hda.copied_from_history_dataset_association = object_import_tracker.hdas_by_key[copied_from_object_key] else: # We're at the end of the chain and this HDA was copied from an HDA # outside the history. So when we find this job and are looking for inputs/outputs # attach to this node... unless we've already encountered another dataset # copied from that jobs output... in that case we are going to cheat and # say this dataset was copied from that one. It wasn't in the original Galaxy # instance but I think it is fine to pretend in order to create a DAG here. hda_copied_from_sinks = object_import_tracker.hda_copied_from_sinks if copied_from_object_key in hda_copied_from_sinks: hda.copied_from_history_dataset_association = object_import_tracker.hdas_by_key[hda_copied_from_sinks[copied_from_object_key]] else: hda_copied_from_sinks[copied_from_object_key] = dataset_key def _import_collection_copied_associations(self, object_import_tracker, collections_attrs): object_key = self.object_key # Re-establish copied_from_history_dataset_collection_association relationships so history extraction # has a greater chance of working in this history, for reproducibility. Very similar to HDA code above # see comments there. for collection_attrs in collections_attrs: if "id" in collection_attrs: # Existing object, not a new one, this property is immutable via model stores currently. continue dataset_collection_key = collection_attrs[object_key] if dataset_collection_key not in object_import_tracker.hdcas_by_key: continue hdca = object_import_tracker.hdcas_by_key[dataset_collection_key] copied_from_chain = collection_attrs.get("copied_from_history_dataset_collection_association_id_chain", []) copied_from_object_key = _copied_from_object_key(copied_from_chain, object_import_tracker.hdcas_by_key) if not copied_from_object_key: continue # Re-establish the chain if we can, again see comments for hdas above for this to make more # sense. hdca_copied_from_sinks = object_import_tracker.hdca_copied_from_sinks if copied_from_object_key in object_import_tracker.hdcas_by_key: hdca.copied_from_history_dataset_collection_association = object_import_tracker.hdcas_by_key[copied_from_object_key] else: if copied_from_object_key in hdca_copied_from_sinks: hdca.copied_from_history_dataset_association = object_import_tracker.hdcas_by_key[hdca_copied_from_sinks[copied_from_object_key]] else: hdca_copied_from_sinks[copied_from_object_key] = dataset_collection_key def _reassign_hids(self, object_import_tracker, history): # assign HIDs for newly created objects that didn't match original history requires_hid = object_import_tracker.requires_hid requires_hid_len = len(requires_hid) if requires_hid_len > 0 and not self.sessionless: base = history._next_hid(n=requires_hid_len) for i, obj in enumerate(requires_hid): obj.hid = base + i self._flush() def _import_jobs(self, object_import_tracker, history): object_key = self.object_key def _find_hda(input_key): hda = None if input_key in object_import_tracker.hdas_by_key: hda = object_import_tracker.hdas_by_key[input_key] if input_key in object_import_tracker.hda_copied_from_sinks: hda = object_import_tracker.hdas_by_key[object_import_tracker.hda_copied_from_sinks[input_key]] return hda def _find_hdca(input_key): hdca = None if input_key in object_import_tracker.hdcas_by_key: hdca = object_import_tracker.hdcas_by_key[input_key] if input_key in object_import_tracker.hdca_copied_from_sinks: hdca = object_import_tracker.hdcas_by_key[object_import_tracker.hdca_copied_from_sinks[input_key]] return hdca def _find_dce(input_key): dce = None if input_key in object_import_tracker.dces_by_key: dce = object_import_tracker.dces_by_key[input_key] return dce # # Create jobs. # jobs_attrs = self.jobs_properties() # Create each job. for job_attrs in jobs_attrs: if 'id' in job_attrs: # only thing we allow editing currently is associations for incoming jobs. assert self.import_options.allow_edit assert not self.sessionless job = self.sa_session.query(model.Job).get(job_attrs["id"]) self._connect_job_io(job, job_attrs, _find_hda, _find_hdca, _find_dce) self._flush() continue imported_job = model.Job() imported_job.user = self.user imported_job.history = history imported_job.imported = True imported_job.tool_id = job_attrs['tool_id'] imported_job.tool_version = job_attrs['tool_version'] raw_state = job_attrs['state'] if raw_state not in model.Job.terminal_states: raw_state = model.Job.states.ERROR imported_job.set_state(raw_state) imported_job.info = job_attrs.get('info', None) imported_job.exit_code = job_attrs.get('exit_code', None) imported_job.traceback = job_attrs.get('traceback', None) if 'stdout' in job_attrs: # Pre 19.05 export. imported_job.tool_stdout = job_attrs.get('stdout', None) imported_job.tool_stderr = job_attrs.get('stderr', None) else: # Post 19.05 export with separated I/O imported_job.tool_stdout = job_attrs.get('tool_stdout', None) imported_job.job_stdout = job_attrs.get('job_stdout', None) imported_job.tool_stderr = job_attrs.get('tool_stderr', None) imported_job.job_stderr = job_attrs.get('job_stderr', None) imported_job.command_line = job_attrs.get('command_line', None) try: imported_job.create_time = datetime.datetime.strptime(job_attrs["create_time"], "%Y-%m-%dT%H:%M:%S.%f") imported_job.update_time = datetime.datetime.strptime(job_attrs["update_time"], "%Y-%m-%dT%H:%M:%S.%f") except Exception: pass self._session_add(imported_job) self._flush() # Connect jobs to input and output datasets. params = self._normalize_job_parameters(imported_job, job_attrs, _find_hda, _find_hdca, _find_dce) for name, value in params.items(): # Transform parameter values when necessary. imported_job.add_parameter(name, dumps(value)) self._connect_job_io(imported_job, job_attrs, _find_hda, _find_hdca, _find_dce) self._flush() if object_key in job_attrs: object_import_tracker.jobs_by_key[job_attrs[object_key]] = imported_job def _import_implicit_collection_jobs(self, object_import_tracker): implicit_collection_jobs_attrs = self.implicit_collection_jobs_properties() for icj_attrs in implicit_collection_jobs_attrs: icj = model.ImplicitCollectionJobs() icj.populated_state = icj_attrs["populated_state"] icj.jobs = [] for order_index, job in enumerate(icj_attrs["jobs"]): icja = model.ImplicitCollectionJobsJobAssociation() icja.implicit_collection_jobs = icj if job in object_import_tracker.jobs_by_key: icja.job = object_import_tracker.jobs_by_key[job] icja.order_index = order_index icj.jobs.append(icja) self._session_add(icja) self._session_add(icj) self._flush() def _session_add(self, obj): self.sa_session.add(obj) def _flush(self): self.sa_session.flush()
def _copied_from_object_key(copied_from_chain, objects_by_key): if len(copied_from_chain) == 0: return None # Okay this gets fun, we need the last thing in the chain to reconnect jobs # from outside the history to inputs/outputs in this history but there may # be cycles in the chain that lead outside the original history, so just eliminate # all IDs not from this history except the last one. filtered_copied_from_chain = [] for i, copied_from_key in enumerate(copied_from_chain): filter_id = (i != len(copied_from_chain) - 1) and (copied_from_key not in objects_by_key) if not filter_id: filtered_copied_from_chain.append(copied_from_key) copied_from_chain = filtered_copied_from_chain if len(copied_from_chain) == 0: return None copied_from_object_key = copied_from_chain[0] return copied_from_object_key
[docs]class ObjectImportTracker: """Keep track of new and existing imported objects. Needed to re-establish connections and such in multiple passes. """
[docs] def __init__(self): self.libraries_by_key = {} self.hdas_by_key = {} self.hdas_by_id = {} self.hdcas_by_key = {} self.hdcas_by_id = {} self.dces_by_key = {} self.dces_by_id = {} self.lddas_by_key = {} self.hda_copied_from_sinks = {} self.hdca_copied_from_sinks = {} self.jobs_by_key = {} self.requires_hid = []
[docs]def get_import_model_store_for_directory(archive_dir, **kwd): assert os.path.isdir(archive_dir) if os.path.exists(os.path.join(archive_dir, ATTRS_FILENAME_EXPORT)): return DirectoryImportModelStoreLatest(archive_dir, **kwd) else: return DirectoryImportModelStore1901(archive_dir, **kwd)
[docs]class BaseDirectoryImportModelStore(ModelImportStore):
[docs] def defines_new_history(self): new_history_attributes = os.path.join(self.archive_dir, ATTRS_FILENAME_HISTORY) return os.path.exists(new_history_attributes)
[docs] def new_history_properties(self): new_history_attributes = os.path.join(self.archive_dir, ATTRS_FILENAME_HISTORY) history_properties = load(open(new_history_attributes)) return history_properties
[docs] def datasets_properties(self): datasets_attrs_file_name = os.path.join(self.archive_dir, ATTRS_FILENAME_DATASETS) datasets_attrs = load(open(datasets_attrs_file_name)) provenance_file_name = datasets_attrs_file_name + ".provenance" if os.path.exists(provenance_file_name): provenance_attrs = load(open(provenance_file_name)) datasets_attrs += provenance_attrs return datasets_attrs
[docs] def collections_properties(self): collections_attrs_file_name = os.path.join(self.archive_dir, ATTRS_FILENAME_COLLECTIONS) if os.path.exists(collections_attrs_file_name): collections_attrs = load(open(collections_attrs_file_name)) else: collections_attrs = [] return collections_attrs
[docs] def library_properties(self): libraries_attrs_file_name = os.path.join(self.archive_dir, ATTRS_FILENAME_LIBRARIES) if os.path.exists(libraries_attrs_file_name): libraries_attrs = load(open(libraries_attrs_file_name)) else: libraries_attrs = [] return libraries_attrs
[docs] def jobs_properties(self): jobs_attr_file_name = os.path.join(self.archive_dir, ATTRS_FILENAME_JOBS) try: return load(open(jobs_attr_file_name)) except FileNotFoundError: return []
[docs] def implicit_collection_jobs_properties(self): implicit_collection_jobs_attrs_file_name = os.path.join(self.archive_dir, ATTRS_FILENAME_IMPLICIT_COLLECTION_JOBS) try: return load(open(implicit_collection_jobs_attrs_file_name)) except FileNotFoundError: return []
[docs]class DirectoryImportModelStore1901(BaseDirectoryImportModelStore): object_key = 'hid'
[docs] def __init__(self, archive_dir, **kwd): super().__init__(**kwd) archive_dir = os.path.realpath(archive_dir) # Bioblend previous to 17.01 exported histories with an extra subdir. if not os.path.exists(os.path.join(archive_dir, ATTRS_FILENAME_HISTORY)): for d in os.listdir(archive_dir): if os.path.isdir(os.path.join(archive_dir, d)): archive_dir = os.path.join(archive_dir, d) break self.archive_dir = archive_dir
def _connect_job_io(self, imported_job, job_attrs, _find_hda, _find_hdca, _find_dce): for output_key in job_attrs['output_datasets']: output_hda = _find_hda(output_key) if output_hda: if not self.dataset_state_serialized: # dataset state has not been serialized, get state from job output_hda.state = imported_job.state imported_job.add_output_dataset(output_hda.name, output_hda) if 'input_mapping' in job_attrs: for input_name, input_key in job_attrs['input_mapping'].items(): input_hda = _find_hda(input_key) if input_hda: imported_job.add_input_dataset(input_name, input_hda) def _normalize_job_parameters(self, imported_job, job_attrs, _find_hda, _find_hdca, _find_dce): def remap_objects(p, k, obj): if isinstance(obj, dict) and obj.get('__HistoryDatasetAssociation__', False): imported_hda = _find_hda(obj[self.object_key]) if imported_hda: return (k, {"src": "hda", "id": imported_hda.id}) return (k, obj) params = job_attrs['params'] params = remap(params, remap_objects) return params
[docs] def trust_hid(self, obj_attrs): # We didn't do object tracking so we pretty much have to trust the HID and accept # that it will be wrong a lot. return True
[docs]class DirectoryImportModelStoreLatest(BaseDirectoryImportModelStore): object_key = 'encoded_id'
[docs] def __init__(self, archive_dir, **kwd): super().__init__(**kwd) archive_dir = os.path.realpath(archive_dir) self.archive_dir = archive_dir if self.defines_new_history(): self.import_history_encoded_id = self.new_history_properties().get("encoded_id") else: self.import_history_encoded_id = None
def _connect_job_io(self, imported_job, job_attrs, _find_hda, _find_hdca, _find_dce): if 'input_dataset_mapping' in job_attrs: for input_name, input_keys in job_attrs['input_dataset_mapping'].items(): input_keys = input_keys or [] for input_key in input_keys: input_hda = _find_hda(input_key) if input_hda: imported_job.add_input_dataset(input_name, input_hda) if 'input_dataset_collection_mapping' in job_attrs: for input_name, input_keys in job_attrs['input_dataset_collection_mapping'].items(): input_keys = input_keys or [] for input_key in input_keys: input_hdca = _find_hdca(input_key) if input_hdca: imported_job.add_input_dataset_collection(input_name, input_hdca) if 'input_dataset_collection_element_mapping' in job_attrs: for input_name, input_keys in job_attrs['input_dataset_collection_element_mapping'].items(): input_keys = input_keys or [] for input_key in input_keys: input_dce = _find_dce(input_key) if input_dce: imported_job.add_input_dataset_collection_element(input_name, input_dce) if 'output_dataset_mapping' in job_attrs: for output_name, output_keys in job_attrs['output_dataset_mapping'].items(): output_keys = output_keys or [] for output_key in output_keys: output_hda = _find_hda(output_key) if output_hda: if not self.dataset_state_serialized: # dataset state has not been serialized, get state from job output_hda.state = imported_job.state imported_job.add_output_dataset(output_name, output_hda) if 'output_dataset_collection_mapping' in job_attrs: for output_name, output_keys in job_attrs['output_dataset_collection_mapping'].items(): output_keys = output_keys or [] for output_key in output_keys: output_hdca = _find_hdca(output_key) if output_hdca: imported_job.add_output_dataset_collection(output_name, output_hdca)
[docs] def trust_hid(self, obj_attrs): return self.import_history_encoded_id and obj_attrs.get("history_encoded_id") == self.import_history_encoded_id
def _normalize_job_parameters(self, imported_job, job_attrs, _find_hda, _find_hdca, _find_dce): def remap_objects(p, k, obj): if isinstance(obj, dict) and "src" in obj and obj["src"] in ["hda", "hdca", "dce"]: if obj["src"] == "hda": imported_hda = _find_hda(obj["id"]) if imported_hda: new_id = imported_hda.id else: new_id = None elif obj["src"] == "hdca": imported_hdca = _find_hdca(obj["id"]) if imported_hdca: new_id = imported_hdca.id else: new_id = None elif obj["src"] == "dce": imported_dce = _find_dce(obj["id"]) if imported_dce: new_id = imported_dce.id else: new_id = None else: raise NotImplementedError() new_obj = obj.copy() new_obj["id"] = new_id return (k, new_obj) return (k, obj) params = job_attrs['params'] params = remap(params, remap_objects) return params
[docs]class BagArchiveImportModelStore(DirectoryImportModelStoreLatest):
[docs] def __init__(self, bag_archive, **kwd): archive_dir = tempfile.mkdtemp() bdb.extract_bag(bag_archive, output_path=archive_dir) # Why this line though...? archive_dir = os.path.join(archive_dir, os.listdir(archive_dir)[0]) bdb.revert_bag(archive_dir) super().__init__(archive_dir, **kwd)
[docs]class ModelExportStore(metaclass=abc.ABCMeta):
[docs] @abc.abstractmethod def export_history(self, history, include_hidden=False, include_deleted=False): """Export history to store."""
[docs] @abc.abstractmethod def add_dataset_collection(self, collection): """Add HDCA to export store."""
[docs] @abc.abstractmethod def add_dataset(self, dataset, include_files=True): """Add HDA to export store."""
@abc.abstractmethod def __enter__(self): """Export store should be used as context manager.""" @abc.abstractmethod def __exit__(self, exc_type, exc_val, exc_tb): """Export store should be used as context manager."""
[docs]class DirectoryModelExportStore(ModelExportStore):
[docs] def __init__(self, export_directory, app=None, for_edit=False, serialize_dataset_objects=None, export_files=None, strip_metadata_files=True, serialize_jobs=True): """ :param export_directory: path to export directory. Will be created if it does not exist. :param app: Galaxy App or app-like object. Must be provided if `for_edit` and/or `serialize_dataset_objects` are True :param for_edit: Allow modifying existing HDA and dataset metadata during import. :param serialize_dataset_objects: If True will encode IDs using the host secret. Defaults `for_edit`. :param export_files: How files should be exported, can be 'symlink', 'copy' or None, in which case files will not be serialized. :param serialize_jobs: Include job data in model export. Not needed for set_metadata script. """ if not os.path.exists(export_directory): os.makedirs(export_directory) if app is not None: self.app = app security = app.security sessionless = False else: sessionless = True security = IdEncodingHelper(id_secret="randomdoesntmatter") self.serialize_jobs = serialize_jobs self.sessionless = sessionless self.security = security self.export_directory = export_directory self.serialization_options = model.SerializationOptions( for_edit=for_edit, serialize_dataset_objects=serialize_dataset_objects, strip_metadata_files=strip_metadata_files, serialize_files_handler=self, ) self.export_files = export_files self.included_datasets = {} self.included_collections = [] self.included_libraries = [] self.included_library_folders = [] self.collection_datasets = {} self.collections_attrs = [] self.dataset_id_to_path = {} self.job_output_dataset_associations = {}
[docs] def serialize_files(self, dataset, as_dict): if self.export_files is None: return None elif self.export_files == "symlink": add = os.symlink elif self.export_files == "copy": def add(src, dest): if os.path.isdir(src): shutil.copytree(src, dest) else: shutil.copyfile(src, dest) export_directory = self.export_directory _, include_files = self.included_datasets[dataset.id] if not include_files: return file_name, extra_files_path = None, None try: _file_name = dataset.file_name if os.path.exists(_file_name): file_name = _file_name except ObjectNotFound: pass if dataset.extra_files_path_exists(): extra_files_path = dataset.extra_files_path else: pass dir_name = 'datasets' dir_path = os.path.join(export_directory, dir_name) dataset_hid = as_dict['hid'] assert dataset_hid, as_dict if dataset.dataset.id in self.dataset_id_to_path: file_name, extra_files_path = self.dataset_id_to_path[dataset.dataset.id] if file_name is not None: as_dict['file_name'] = file_name if extra_files_path is not None: as_dict['extra_files_path'] = extra_files_path return if file_name: if not os.path.exists(dir_path): os.makedirs(dir_path) target_filename = get_export_dataset_filename(as_dict['name'], as_dict['extension'], dataset_hid) arcname = os.path.join(dir_name, target_filename) src = file_name dest = os.path.join(export_directory, arcname) add(src, dest) as_dict['file_name'] = arcname if extra_files_path: try: file_list = os.listdir(extra_files_path) except OSError: file_list = [] if len(file_list): arcname = os.path.join(dir_name, 'extra_files_path_%s' % dataset_hid) add(extra_files_path, os.path.join(export_directory, arcname)) as_dict['extra_files_path'] = arcname else: as_dict['extra_files_path'] = '' self.dataset_id_to_path[dataset.dataset.id] = (as_dict.get("file_name"), as_dict.get("extra_files_path"))
[docs] def exported_key(self, obj): return self.serialization_options.get_identifier(self.security, obj)
def __enter__(self): return self
[docs] def export_history(self, history, include_hidden=False, include_deleted=False): app = self.app export_directory = self.export_directory history_attrs = history.serialize(app.security, self.serialization_options) history_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_HISTORY) with open(history_attrs_filename, 'w') as history_attrs_out: dump(history_attrs, history_attrs_out) sa_session = app.model.context.current # Write collections' attributes (including datasets list) to file. query = (sa_session.query(model.HistoryDatasetCollectionAssociation) .filter(model.HistoryDatasetCollectionAssociation.history == history) .filter(model.HistoryDatasetCollectionAssociation.deleted == expression.false())) collections = query.all() for collection in collections: # filter this ? if not collection.populated: break if collection.state != 'ok': break self.add_dataset_collection(collection) # export jobs for these datasets for collection_dataset in collection.dataset_instances: if collection_dataset.deleted and not include_deleted: include_files = False else: include_files = True self.add_dataset(collection_dataset, include_files=include_files) self.collection_datasets[collection_dataset.id] = True # Write datasets' attributes to file. query = (sa_session.query(model.HistoryDatasetAssociation) .filter(model.HistoryDatasetAssociation.history == history) .join("dataset") .options(joinedload("dataset").joinedload("actions")) .order_by(model.HistoryDatasetAssociation.hid) .filter(model.Dataset.purged == expression.false())) datasets = query.all() for dataset in datasets: dataset.annotation = get_item_annotation_str(sa_session, history.user, dataset) add_dataset = (not dataset.visible or not include_hidden) and (not dataset.deleted or include_deleted) if dataset.id in self.collection_datasets: add_dataset = True if dataset.id not in self.included_datasets: self.add_dataset(dataset, include_files=add_dataset)
[docs] def export_library(self, library, include_hidden=False, include_deleted=False): self.included_libraries.append(library) self.included_library_folders.append(library.root_folder) def collect_datasets(library_folder): for library_dataset in library_folder.datasets: ldda = library_dataset.library_dataset_dataset_association add_dataset = (not ldda.visible or not include_hidden) and (not ldda.deleted or include_deleted) # TODO: competing IDs here between ldda and hdas - fix this! self.included_datasets[ldda.id] = (ldda, add_dataset) for folder in library_folder.folders: collect_datasets(folder) collect_datasets(library.root_folder)
[docs] def add_job_output_dataset_associations(self, job_id, name, dataset_instance): job_output_dataset_associations = self.job_output_dataset_associations if job_id not in job_output_dataset_associations: job_output_dataset_associations[job_id] = {} job_output_dataset_associations[job_id][name] = dataset_instance
[docs] def add_dataset_collection(self, collection): self.collections_attrs.append(collection) self.included_collections.append(collection)
[docs] def add_dataset(self, dataset, include_files=True): dataset_id = dataset.id if dataset_id is None: # Better be a sessionless export, just assign a random ID # won't be able to de-duplicate datasets. This could be fixed # by using object identity or attaching something to the object # like temp_id used in serialization. assert self.sessionless dataset_id = uuid4().hex self.included_datasets[dataset_id] = (dataset, include_files)
def _finalize(self): export_directory = self.export_directory datasets_attrs = [] provenance_attrs = [] for dataset, include_files in self.included_datasets.values(): if include_files: datasets_attrs.append(dataset) else: provenance_attrs.append(dataset) def to_json(attributes): return json_encoder.encode([a.serialize(self.security, self.serialization_options) for a in attributes]) datasets_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_DATASETS) with open(datasets_attrs_filename, 'w') as datasets_attrs_out: datasets_attrs_out.write(to_json(datasets_attrs)) with open(datasets_attrs_filename + ".provenance", 'w') as provenance_attrs_out: provenance_attrs_out.write(to_json(provenance_attrs)) libraries_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_LIBRARIES) with open(libraries_attrs_filename, 'w') as libraries_attrs_out: libraries_attrs_out.write(to_json(self.included_libraries)) collections_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_COLLECTIONS) with open(collections_attrs_filename, 'w') as collections_attrs_out: collections_attrs_out.write(to_json(self.collections_attrs)) jobs_attrs = [] for job_id, job_output_dataset_associations in self.job_output_dataset_associations.items(): output_dataset_mapping = {} for name, dataset in job_output_dataset_associations.items(): if name not in output_dataset_mapping: output_dataset_mapping[name] = [] output_dataset_mapping[name].append(self.exported_key(dataset)) jobs_attrs.append({"id": job_id, 'output_dataset_mapping': output_dataset_mapping}) if self.serialize_jobs: # # Write jobs attributes file. # # Get all jobs associated with included HDAs. jobs_dict = {} implicit_collection_jobs_dict = {} def record_associated_jobs(obj): # Get the job object. job = None for assoc in getattr(obj, 'creating_job_associations', []): # For mapped over jobs obj could be DatasetCollection, which has no creating_job_association job = assoc.job break if not job: # No viable job. return jobs_dict[job.id] = job icja = job.implicit_collection_jobs_association if icja: implicit_collection_jobs = icja.implicit_collection_jobs implicit_collection_jobs_dict[implicit_collection_jobs.id] = implicit_collection_jobs for hda, _include_files in self.included_datasets.values(): # Get the associated job, if any. If this hda was copied from another, # we need to find the job that created the origial hda job_hda = hda while job_hda.copied_from_history_dataset_association: # should this check library datasets as well? job_hda = job_hda.copied_from_history_dataset_association if not job_hda.creating_job_associations: # No viable HDA found. continue record_associated_jobs(job_hda) for hdca in self.included_collections: record_associated_jobs(hdca) # Get jobs' attributes. for job in jobs_dict.values(): if self.serialization_options.for_edit: continue job_attrs = job.serialize(self.security, self.serialization_options) # -- Get input, output datasets. -- input_dataset_mapping = {} output_dataset_mapping = {} input_dataset_collection_mapping = {} input_dataset_collection_element_mapping = {} output_dataset_collection_mapping = {} implicit_output_dataset_collection_mapping = {} for assoc in job.input_datasets: # Optional data inputs will not have a dataset. if assoc.dataset: name = assoc.name if name not in input_dataset_mapping: input_dataset_mapping[name] = [] input_dataset_mapping[name].append(self.exported_key(assoc.dataset)) for assoc in job.output_datasets: # Optional data inputs will not have a dataset. if assoc.dataset: name = assoc.name if name not in output_dataset_mapping: output_dataset_mapping[name] = [] output_dataset_mapping[name].append(self.exported_key(assoc.dataset)) for assoc in job.input_dataset_collections: # Optional data inputs will not have a dataset. if assoc.dataset_collection: name = assoc.name if name not in input_dataset_collection_mapping: input_dataset_collection_mapping[name] = [] input_dataset_collection_mapping[name].append(self.exported_key(assoc.dataset_collection)) for assoc in job.input_dataset_collection_elements: if assoc.dataset_collection_element: name = assoc.name if name not in input_dataset_collection_element_mapping: input_dataset_collection_element_mapping[name] = [] input_dataset_collection_element_mapping[name].append(self.exported_key(assoc.dataset_collection_element)) for assoc in job.output_dataset_collection_instances: # Optional data outputs will not have a dataset. if assoc.dataset_collection_instance: name = assoc.name if name not in output_dataset_collection_mapping: output_dataset_collection_mapping[name] = [] output_dataset_collection_mapping[name].append(self.exported_key(assoc.dataset_collection_instance)) for assoc in job.output_dataset_collections: if assoc.dataset_collection: name = assoc.name if name not in implicit_output_dataset_collection_mapping: implicit_output_dataset_collection_mapping[name] = [] implicit_output_dataset_collection_mapping[name].append(self.exported_key(assoc.dataset_collection)) job_attrs['input_dataset_mapping'] = input_dataset_mapping job_attrs['input_dataset_collection_mapping'] = input_dataset_collection_mapping job_attrs['input_dataset_collection_element_mapping'] = input_dataset_collection_element_mapping job_attrs['output_dataset_mapping'] = output_dataset_mapping job_attrs['output_dataset_collection_mapping'] = output_dataset_collection_mapping job_attrs['implicit_output_dataset_collection_mapping'] = implicit_output_dataset_collection_mapping jobs_attrs.append(job_attrs) icjs_attrs = [] for icj in implicit_collection_jobs_dict.values(): icj_attrs = icj.serialize(self.security, self.serialization_options) icjs_attrs.append(icj_attrs) icjs_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_IMPLICIT_COLLECTION_JOBS) with open(icjs_attrs_filename, 'w') as icjs_attrs_out: icjs_attrs_out.write(json_encoder.encode(icjs_attrs)) export_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_EXPORT) with open(export_attrs_filename, 'w') as export_attrs_out: dump({"galaxy_export_version": GALAXY_EXPORT_VERSION}, export_attrs_out) jobs_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_JOBS) with open(jobs_attrs_filename, 'w') as jobs_attrs_out: jobs_attrs_out.write(json_encoder.encode(jobs_attrs)) def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is None: self._finalize() # http://effbot.org/zone/python-with-statement.htm # Ignores TypeError exceptions return isinstance(exc_val, TypeError)
[docs]class TarModelExportStore(DirectoryModelExportStore):
[docs] def __init__(self, out_file, gzip=True, **kwds): self.gzip = gzip self.out_file = out_file temp_output_dir = tempfile.mkdtemp() super().__init__(temp_output_dir, **kwds)
def _finalize(self): super()._finalize() tar_export_directory(self.export_directory, self.out_file, self.gzip) shutil.rmtree(self.export_directory)
[docs]class BagDirectoryModelExportStore(DirectoryModelExportStore):
[docs] def __init__(self, out_directory, **kwds): self.out_directory = out_directory super().__init__(out_directory, **kwds)
def _finalize(self): super()._finalize() bdb.make_bag(self.out_directory)
[docs]class BagArchiveModelExportStore(BagDirectoryModelExportStore):
[docs] def __init__(self, out_file, bag_archiver="tgz", **kwds): # bag_archiver in tgz, zip, tar self.bag_archiver = bag_archiver self.out_file = out_file temp_output_dir = tempfile.mkdtemp() super().__init__(temp_output_dir, **kwds)
def _finalize(self): super()._finalize() rval = bdb.archive_bag(self.export_directory, self.bag_archiver) shutil.move(rval, self.out_file) shutil.rmtree(self.export_directory)
[docs]def tar_export_directory(export_directory, out_file, gzip): tarfile_mode = "w" if gzip: tarfile_mode += ":gz" with tarfile.open(out_file, tarfile_mode, dereference=True) as history_archive: for export_path in os.listdir(export_directory): history_archive.add(os.path.join(export_directory, export_path), arcname=export_path)
[docs]def get_export_dataset_filename(name, ext, hid): """ Builds a filename for a dataset using its name an extension. """ base = ''.join(c in FILENAME_VALID_CHARS and c or '_' for c in name) return base + f"_{hid}.{ext}"
[docs]def imported_store_for_metadata(directory, object_store=None): import_options = ImportOptions(allow_dataset_object_edit=True, allow_edit=True) import_model_store = get_import_model_store_for_directory(directory, import_options=import_options, object_store=object_store) import_model_store.perform_import() return import_model_store