Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.managers.model_stores

from typing import Optional

from galaxy import model
from galaxy.exceptions import RequestParameterInvalidException
from galaxy.jobs.manager import JobManager
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.histories import HistoryManager
from galaxy.managers.users import UserManager
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.model.store import (
    ImportDiscardedDataType,
    ImportOptions,
    ObjectImportTracker,
    source_to_import_store,
)
from galaxy.schema.schema import HistoryContentType
from galaxy.schema.tasks import (
    GenerateHistoryContentDownload,
    GenerateHistoryDownload,
    GenerateInvocationDownload,
    ImportModelStoreTaskRequest,
    SetupHistoryExportJob,
    WriteHistoryContentTo,
    WriteHistoryTo,
    WriteInvocationTo,
)
from galaxy.structured_app import MinimalManagerApp
from galaxy.web.short_term_storage import (
    ShortTermStorageMonitor,
    storage_context,
)


[docs]class ModelStoreUserContext(ProvidesUserContext):
[docs] def __init__(self, app: MinimalManagerApp, user: model.User) -> None: self._app = app self._user = user
@property def app(self): return self._app @property def url_builder(self): raise NotImplementedError("URL builder not available in ModelStore context.")
[docs] def get_user(self): return self._user
[docs] def set_user(self, user): raise NotImplementedError("Cannot change user from ModelStore context.")
user = property(get_user, set_user)
[docs]class ModelStoreManager:
[docs] def __init__( self, app: MinimalManagerApp, history_manager: HistoryManager, sa_session: galaxy_scoped_session, job_manager: JobManager, short_term_storage_monitor: ShortTermStorageMonitor, user_manager: UserManager, ): self._app = app self._sa_session = sa_session self._job_manager = job_manager self._history_manager = history_manager self._short_term_storage_monitor = short_term_storage_monitor self._user_manager = user_manager
[docs] def setup_history_export_job(self, request: SetupHistoryExportJob): history_id = request.history_id job_id = request.job_id include_hidden = request.include_hidden include_deleted = request.include_deleted store_directory = request.store_directory history = self._sa_session.query(model.History).get(history_id) # symlink files on export, on worker files will tarred up in a dereferenced manner. with model.store.DirectoryModelExportStore( store_directory, app=self._app, export_files="symlink" ) as export_store: export_store.export_history(history, include_hidden=include_hidden, include_deleted=include_deleted) job = self._sa_session.query(model.Job).get(job_id) job.state = model.Job.states.NEW self._sa_session.flush() self._job_manager.enqueue(job)
[docs] def prepare_history_download(self, request: GenerateHistoryDownload): model_store_format = request.model_store_format history = self._history_manager.by_id(request.history_id) export_files = "symlink" if request.include_files else None include_hidden = request.include_hidden include_deleted = request.include_deleted with storage_context( request.short_term_storage_request_id, self._short_term_storage_monitor ) as short_term_storage_target: with model.store.get_export_store_factory(self._app, model_store_format, export_files=export_files)( short_term_storage_target.path ) as export_store: export_store.export_history(history, include_hidden=include_hidden, include_deleted=include_deleted)
[docs] def prepare_history_content_download(self, request: GenerateHistoryContentDownload): model_store_format = request.model_store_format export_files = "symlink" if request.include_files else None with storage_context( request.short_term_storage_request_id, self._short_term_storage_monitor ) as short_term_storage_target: with model.store.get_export_store_factory(self._app, model_store_format, export_files=export_files)( short_term_storage_target.path ) as export_store: if request.content_type == HistoryContentType.dataset: hda = self._sa_session.query(model.HistoryDatasetAssociation).get(request.content_id) export_store.add_dataset(hda) else: hdca = self._sa_session.query(model.HistoryDatasetCollectionAssociation).get(request.content_id) export_store.export_collection( hdca, include_hidden=request.include_hidden, include_deleted=request.include_deleted )
[docs] def prepare_invocation_download(self, request: GenerateInvocationDownload): model_store_format = request.model_store_format export_files = "symlink" if request.include_files else None with storage_context( request.short_term_storage_request_id, self._short_term_storage_monitor ) as short_term_storage_target: with model.store.get_export_store_factory(self._app, model_store_format, export_files=export_files)( short_term_storage_target.path ) as export_store: invocation = self._sa_session.query(model.WorkflowInvocation).get(request.invocation_id) export_store.export_workflow_invocation( invocation, include_hidden=request.include_hidden, include_deleted=request.include_deleted )
[docs] def write_invocation_to(self, request: WriteInvocationTo): model_store_format = request.model_store_format export_files = "symlink" if request.include_files else None target_uri = request.target_uri user_context = self._build_user_context(request.user.user_id) with model.store.get_export_store_factory( self._app, model_store_format, export_files=export_files, user_context=user_context, )(target_uri) as export_store: invocation = self._sa_session.query(model.WorkflowInvocation).get(request.invocation_id) export_store.export_workflow_invocation( invocation, include_hidden=request.include_hidden, include_deleted=request.include_deleted )
[docs] def write_history_content_to(self, request: WriteHistoryContentTo): model_store_format = request.model_store_format export_files = "symlink" if request.include_files else None target_uri = request.target_uri user_context = self._build_user_context(request.user.user_id) with model.store.get_export_store_factory( self._app, model_store_format, export_files=export_files, user_context=user_context )(target_uri) as export_store: if request.content_type == HistoryContentType.dataset: hda = self._sa_session.query(model.HistoryDatasetAssociation).get(request.content_id) export_store.add_dataset(hda) else: hdca = self._sa_session.query(model.HistoryDatasetCollectionAssociation).get(request.content_id) export_store.export_collection( hdca, include_hidden=request.include_hidden, include_deleted=request.include_deleted )
[docs] def write_history_to(self, request: WriteHistoryTo): model_store_format = request.model_store_format export_files = "symlink" if request.include_files else None target_uri = request.target_uri user_context = self._build_user_context(request.user.user_id) with model.store.get_export_store_factory( self._app, model_store_format, export_files=export_files, user_context=user_context )(target_uri) as export_store: history = self._history_manager.by_id(request.history_id) export_store.export_history( history, include_hidden=request.include_hidden, include_deleted=request.include_deleted )
[docs] def import_model_store(self, request: ImportModelStoreTaskRequest): import_options = ImportOptions( allow_library_creation=request.for_library, ) history_id = request.history_id if history_id: history = self._sa_session.query(model.History).get(history_id) else: history = None user_context = self._build_user_context(request.user.user_id) model_import_store = source_to_import_store( request.source_uri, self._app, import_options, model_store_format=request.model_store_format, user_context=user_context, ) new_history = history is None and not request.for_library if new_history: if not model_import_store.defines_new_history(): raise RequestParameterInvalidException("Supplied model store doesn't define new history to import.") with model_import_store.target_history(legacy_history_naming=False) as new_history: object_tracker = model_import_store.perform_import(new_history, new_history=True) object_tracker.new_history = new_history else: object_tracker = model_import_store.perform_import( history=history, new_history=new_history, ) return object_tracker
def _build_user_context(self, user_id: int): user = self._user_manager.by_id(user_id) user_context = ModelStoreUserContext(self._app, user) return user_context
[docs]def create_objects_from_store( app: MinimalManagerApp, galaxy_user: Optional[model.User], payload, history: Optional[model.History] = None, for_library: bool = False, ) -> ObjectImportTracker: import_options = ImportOptions( discarded_data=ImportDiscardedDataType.FORCE, allow_library_creation=for_library, ) user_context = ModelStoreUserContext(app, galaxy_user) if galaxy_user is not None else None model_import_store = source_to_import_store( payload.store_content_uri or payload.store_dict, app=app, import_options=import_options, model_store_format=payload.model_store_format, user_context=user_context, ) new_history = history is None and not for_library if new_history: if not model_import_store.defines_new_history(): raise RequestParameterInvalidException("Supplied model store doesn't define new history to import.") with model_import_store.target_history(legacy_history_naming=False) as new_history: object_tracker = model_import_store.perform_import(new_history, new_history=True) object_tracker.new_history = new_history else: object_tracker = model_import_store.perform_import( history=history, new_history=new_history, ) return object_tracker