Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.services.base

import mimetypes
from tempfile import NamedTemporaryFile
from typing import (
    Any,
    cast,
    List,
    NamedTuple,
    Optional,
)

from celery.result import AsyncResult

from galaxy.exceptions import (
    AuthenticationRequired,
    ConfigDoesNotAllowException,
)
from galaxy.managers.base import (
    decode_with_security,
    encode_with_security,
    get_class,
    get_object,
    SortableManager,
)
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.model_stores import create_objects_from_store
from galaxy.model import User
from galaxy.model.store import (
    get_export_store_factory,
    ModelExportStore,
)
from galaxy.schema.fields import EncodedDatabaseIdField
from galaxy.schema.schema import AsyncTaskResultSummary
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.short_term_storage import (
    ShortTermStorageAllocator,
    ShortTermStorageTarget,
)
from galaxy.util import ready_name_for_url


[docs]def ensure_celery_tasks_enabled(config): if not config.enable_celery_tasks: raise ConfigDoesNotAllowException( "This operation requires asynchronous tasks to be enabled on the Galaxy server and they are not, please contact the server admin." )
[docs]class SecurityNotProvidedError(Exception): pass
[docs]class ServiceBase: """Base class with common logic and utils reused by other services. A service class: - Provides top level operations (`Index`, `Show`, `Delete`...) that are usually consumed directly by the API controllers or other services. - Uses a combination of managers to perform the operations and avoids accessing the database layer directly. - Can speak 'pydantic' and has rich type annotations to be explicit about the required parameters and outputs of each operation. """
[docs] def __init__(self, security: Optional[IdEncodingHelper] = None): self._security = security
@property def security(self) -> IdEncodingHelper: if self._security is None: raise SecurityNotProvidedError( "Security encoding helper must be set in the service constructor to encode/decode ids." ) return self._security
[docs] def decode_id(self, id: EncodedDatabaseIdField, kind: Optional[str] = None) -> int: """Decodes a previously encoded database ID.""" return decode_with_security(self.security, id, kind=kind)
[docs] def encode_id(self, id: int, kind: Optional[str] = None) -> EncodedDatabaseIdField: """Encodes a raw database ID.""" return encode_with_security(self.security, id, kind=kind)
[docs] def decode_ids(self, ids: List[EncodedDatabaseIdField]) -> List[int]: """ Decodes all encoded IDs in the given list. """ return [self.decode_id(id) for id in ids]
[docs] def encode_all_ids(self, rval, recursive: bool = False): """ Encodes all integer values in the dict rval whose keys are 'id' or end with '_id' It might be useful to turn this in to a decorator """ return self.security.encode_all_ids(rval, recursive=recursive)
[docs] def build_order_by(self, manager: SortableManager, order_by_query: Optional[str] = None): """Returns an ORM compatible order_by clause using the order attribute and the given manager. The manager has to implement the `parse_order_by` function to support all the sortable model attributes.""" ORDER_BY_SEP_CHAR = "," if order_by_query and ORDER_BY_SEP_CHAR in order_by_query: return [manager.parse_order_by(o) for o in order_by_query.split(ORDER_BY_SEP_CHAR)] return manager.parse_order_by(order_by_query)
[docs] def get_class(self, class_name): """ Returns the class object that a string denotes. Without this method, we'd have to do eval(<class_name>). """ return get_class(class_name)
[docs] def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None): """ Convenience method to get a model object with the specified checks. """ return get_object( trans, id, class_name, check_ownership=check_ownership, check_accessible=check_accessible, deleted=deleted )
[docs] def check_user_is_authenticated(self, trans: ProvidesUserContext): """Raises an exception if the request is anonymous.""" if trans.anonymous: raise AuthenticationRequired("API authentication required for this request")
[docs] def get_authenticated_user(self, trans: ProvidesUserContext) -> User: """Gets the authenticated user and prevents access from anonymous users.""" self.check_user_is_authenticated(trans) return cast(User, trans.user)
[docs]class ServedExportStore(NamedTuple): export_store: ModelExportStore export_target: Any
[docs]def model_store_storage_target( short_term_storage_allocator: ShortTermStorageAllocator, file_name: str, model_store_format: str ) -> ShortTermStorageTarget: cleaned_filename = ready_name_for_url(file_name) filename_with_extension = f"{cleaned_filename}.{model_store_format}" mime_type = mimetypes.guess_type(filename_with_extension)[0] or "application/octet-stream" return short_term_storage_allocator.new_target( filename_with_extension, mime_type, )
[docs]class ServesExportStores:
[docs] def serve_export_store(self, app, download_format: str): export_target = NamedTemporaryFile("wb") export_store = get_export_store_factory(app, download_format)(export_target.name) return ServedExportStore(export_store, export_target)
[docs]class ConsumesModelStores:
[docs] def create_objects_from_store( self, trans, payload, history=None, for_library=False, ): galaxy_user = None if isinstance(trans.user, User): galaxy_user = trans.user return create_objects_from_store( app=trans.app, galaxy_user=galaxy_user, payload=payload, history=history, for_library=for_library, )
[docs]def async_task_summary(async_result: AsyncResult) -> AsyncTaskResultSummary: name = None try: name = async_result.name except AttributeError: # if backend is disabled, we won't have this pass queue = None try: queue = async_result.queue except AttributeError: # if backend is disabled, we won't have this pass return AsyncTaskResultSummary( id=str(async_result.id), ignored=async_result.ignored, name=name, queue=queue, )