Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.display_applications.parameters

# Contains parameters that are used in Display Applications
import mimetypes
from dataclasses import dataclass
from typing import (
    Callable,
    Optional,
    TYPE_CHECKING,
    Union,
)
from urllib.parse import quote_plus

from galaxy.datatypes.data import Data
from galaxy.model import DatasetInstance
from galaxy.model.base import transaction
from galaxy.schema.schema import DatasetState
from galaxy.util import string_as_bool
from galaxy.util.template import fill_template

if TYPE_CHECKING:
    from galaxy.datatypes.registry import Registry
# Default key under which the dataset being displayed is looked up in a
# parameter's ``other_values`` mapping when the parameter element does not
# name one via its "dataset" attribute.
DEFAULT_DATASET_NAME = "dataset"


class DisplayApplicationParameter:
    """Abstract Class for Display Application Parameters

    Concrete parameter classes set ``type`` and override :meth:`get_value`.
    :meth:`from_elem` selects the class by the element's ``type`` attribute.
    """

    type: Optional[str] = None

    @classmethod
    def from_elem(cls, elem, link):
        """Instantiate the parameter class registered for ``elem``'s type.

        :param elem: parameter element; its ``type`` attribute selects the class
        :param link: owning display link, passed through to the constructor
        :raises AssertionError: if the element has no ``type`` attribute
        :raises KeyError: if the type is not in ``parameter_type_to_class``
        """
        param_type = elem.get("type", None)
        assert param_type, "DisplayApplicationParameter requires a type"
        return parameter_type_to_class[param_type](elem, link)

    def __init__(self, elem, link):
        """Read the attributes shared by all parameter types from ``elem``.

        :raises AssertionError: if the element has no ``name`` attribute
        """
        self.name = elem.get("name", None)
        assert self.name, "DisplayApplicationParameter requires a name"
        self.link = link
        # name used in url for display purposes defaults to name; e.g. want the
        # form of file.ext, where a '.' is not allowed as python variable
        # name/keyword
        self.url = elem.get("url", self.name)
        self.mime_type = elem.get("mimetype", None)
        self.guess_mime_type = string_as_bool(elem.get("guess_mimetype", "False"))
        # only allow these to be viewed via direct url when explicitly set to
        # viewable
        self.viewable = string_as_bool(elem.get("viewable", "False"))
        self.strip = string_as_bool(elem.get("strip", "False"))
        self.strip_https = string_as_bool(elem.get("strip_https", "False"))
        # Passing query param app_<name>=<value> to dataset controller allows
        # override if this is true.
        self.allow_override = string_as_bool(elem.get("allow_override", "False"))
        self.allow_cors = string_as_bool(elem.get("allow_cors", "False"))

    def get_value(self, other_values, dataset_hash, user_hash, trans):
        """Return the parameter's value; must be overridden by subclasses.

        :raises NotImplementedError: always, naming the concrete class that
            failed to provide an implementation
        """
        # NotImplementedError is still an Exception, so callers that catch the
        # generic Exception raised previously keep working.
        raise NotImplementedError(f"get_value() is unimplemented for {type(self).__name__}")

    def prepare(self, other_values, dataset_hash, user_hash, trans):
        """Prepare the parameter; the base implementation just computes its value."""
        return self.get_value(other_values, dataset_hash, user_hash, trans)

    def ready(self, other_values):
        """Return whether the value is available; always ``True`` in the base class."""
        return True

    def is_preparing(self, other_values):
        """Return whether preparation is in progress; always ``False`` in the base class."""
        return False

    def build_url(self, other_values):
        """Fill the ``url`` template using ``other_values`` as the template context."""
        return fill_template(self.url, context=other_values)
@dataclass
class DatasetLikeObject:
    """Lightweight record carrying the dataset attributes a data parameter reads.

    It is built when the displayed content is a metadata file rather than the
    dataset itself, so that consumers can treat both cases alike.
    """

    # Zero-argument callable returning the path of the file to serve.
    get_file_name: Callable
    # State copied from the dataset that owns the content.
    state: DatasetState
    # File extension reported for the content.
    extension: str
    # Display name of the owning dataset.
    name: str
    # Genome build of the owning dataset, if any.
    dbkey: Optional[str]
    # Datatype instance of the owning dataset.
    datatype: Data
class DisplayApplicationDataParameter(DisplayApplicationParameter):
    """Parameter that returns a file_name containing the requested content"""

    type = "data"

    def __init__(self, elem, link):
        """Read the data-specific attributes (format, metadata, ...) from ``elem``."""
        super().__init__(elem, link)
        format_attr = elem.get("format", None)
        # A non-empty "format" attribute is a comma separated extension list.
        self.extensions = format_attr.split(",") if format_attr else format_attr
        self.metadata = elem.get("metadata", None)
        self.allow_extra_files_access = string_as_bool(elem.get("allow_extra_files_access", "False"))
        # 'dataset' is default name assigned to dataset to be displayed
        self.dataset = elem.get("dataset", DEFAULT_DATASET_NAME)
        assert (
            not self.extensions or not self.metadata
        ), "A format or a metadata can be defined for a DisplayApplicationParameter, but not both."
        assert (
            not self.allow_extra_files_access or not self.metadata
        ), "allow_extra_files_access or metadata can be defined for a DisplayApplicationParameter, but not both."
        # data params should be viewable
        self.viewable = string_as_bool(elem.get("viewable", "True"))
        self.force_url_param = string_as_bool(elem.get("force_url_param", "False"))
        self.force_conversion = string_as_bool(elem.get("force_conversion", "False"))

    @property
    def datatypes_registry(self) -> "Registry":
        """Datatypes registry of the application owning this parameter's link."""
        return self.link.display_application.app.datatypes_registry

    @property
    def formats(self):
        """Tuple of datatype classes for ``extensions``, or ``None`` if none are set."""
        if not self.extensions:
            return None
        datatype_for = self.datatypes_registry.get_datatype_by_extension
        return tuple(type(datatype_for(ext)) for ext in self.extensions)

    def _get_dataset_like_object(self, other_values) -> Optional[Union[DatasetLikeObject, DatasetInstance]]:
        """Resolve the object whose file backs this parameter.

        Returns the base dataset, an existing converted dataset, or a
        ``DatasetLikeObject`` wrapping a metadata file. Returns ``None`` when
        the required dataset is not in the OK state or no converted dataset
        exists yet.
        """
        dataset = other_values.get(self.dataset, None)
        assert dataset, "Base dataset could not be found in values provided to DisplayApplicationDataParameter"
        if isinstance(dataset, DisplayDataValueWrapper):
            dataset = dataset.value
        if dataset.state != DatasetState.OK:
            return None

        if self.metadata:
            metadata_file = getattr(dataset.metadata, self.metadata, None)
            if not metadata_file:
                # May have to look at converted datasets
                for association in dataset.implicitly_converted_datasets:
                    candidate = association.dataset
                    if candidate.state != DatasetState.OK:
                        return None
                    metadata_file = getattr(candidate.metadata, self.metadata, None)
                    if metadata_file:
                        dataset = candidate
                        break
            assert (
                metadata_file
            ), f'Unknown metadata name "{self.metadata}" provided for dataset type "{dataset.ext}".'
            return DatasetLikeObject(
                get_file_name=metadata_file.get_file_name,
                state=dataset.state,
                extension="data",
                dbkey=dataset.dbkey,
                name=dataset.name,
                datatype=dataset.datatype,
            )

        wants_converted = (
            self.formats
            and self.extensions
            and (self.force_conversion or not isinstance(dataset.datatype, self.formats))
        )
        if not wants_converted:
            return dataset

        # Prefer a conversion that already exists for any accepted extension.
        for ext in self.extensions:
            converted = dataset.get_converted_files_by_type(ext)
            if converted:
                return converted
        direct_match, target_ext, _ = self.datatypes_registry.find_conversion_destination_for_dataset_by_extensions(
            dataset.extension, self.extensions
        )
        assert direct_match or target_ext is not None, f"No conversion path found for data param: {self.name}"
        return None

    def get_value(self, other_values, dataset_hash, user_hash, trans):
        """Wrap the resolved dataset-like object, or return ``None`` if unavailable."""
        resolved = self._get_dataset_like_object(other_values)
        if not resolved:
            return None
        return DisplayDataValueWrapper(resolved, self, other_values, dataset_hash, user_hash, trans)

    def prepare(self, other_values, dataset_hash, user_hash, trans):
        """Start a dataset conversion if one is needed, then return the current value.

        :raises Exception: if an existing converted dataset is in the error state
        """
        resolved = self._get_dataset_like_object(other_values)
        if not resolved and self.formats:
            source = other_values.get(self.dataset, None)
            # start conversion
            # FIXME: Much of this is copied (more than once...); should be some abstract method elsewhere called from here
            # find target ext
            (
                direct_match,
                target_ext,
                converted_dataset,
            ) = self.datatypes_registry.find_conversion_destination_for_dataset_by_extensions(
                source.extension, self.extensions
            )
            if not direct_match and target_ext and not converted_dataset:
                if isinstance(source, DisplayDataValueWrapper):
                    source = source.value
                conversion_outputs = source.datatype.convert_dataset(
                    trans, source, target_ext, return_output=True, visible=False
                )
                new_data = next(iter(conversion_outputs.values()))
                new_data.hid = source.hid
                new_data.name = source.name
                trans.sa_session.add(new_data)
                assoc = trans.app.model.ImplicitlyConvertedDatasetAssociation(
                    parent=source, file_type=target_ext, dataset=new_data, metadata_safe=False
                )
                trans.sa_session.add(assoc)
                with transaction(trans.sa_session):
                    trans.sa_session.commit()
            elif not direct_match and converted_dataset and converted_dataset.state == DatasetState.ERROR:
                raise Exception(f"Dataset conversion failed for data parameter: {self.name}")
        return self.get_value(other_values, dataset_hash, user_hash, trans)

    def is_preparing(self, other_values):
        """Return ``True`` while the resolved object is new, uploading, queued or running."""
        resolved = self._get_dataset_like_object(other_values)
        pending_states = (DatasetState.NEW, DatasetState.UPLOAD, DatasetState.QUEUED, DatasetState.RUNNING)
        return bool(resolved and resolved.state in pending_states)

    def ready(self, other_values):
        """Return ``True`` once the resolved object is in the OK state.

        :raises Exception: if the resolved object is in the error state
        """
        resolved = self._get_dataset_like_object(other_values)
        if not resolved:
            return False
        if resolved.state == DatasetState.OK:
            return True
        if resolved.state == DatasetState.ERROR:
            raise Exception(f"A data display parameter is in the error state: {self.name}")
        return False
class DisplayApplicationTemplateParameter(DisplayApplicationParameter):
    """Parameter that returns a string containing the requested content"""

    type = "template"

    def __init__(self, elem, link):
        """Store the element's text as the template; a missing text becomes ``""``."""
        super().__init__(elem, link)
        template_text = elem.text
        self.text = template_text if template_text else ""

    def get_value(self, other_values, dataset_hash, user_hash, trans):
        """Fill the template with ``other_values`` and wrap the resulting string.

        Surrounding whitespace is removed first when ``strip`` is set.
        """
        rendered = fill_template(self.text, context=other_values)
        value = rendered.strip() if self.strip else rendered
        return DisplayParameterValueWrapper(value, self, other_values, dataset_hash, user_hash, trans)
# Maps each parameter ``type`` string to the class implementing it; consulted
# by ``DisplayApplicationParameter.from_elem``.
parameter_type_to_class = {
    parameter_class.type: parameter_class
    for parameter_class in (
        DisplayApplicationDataParameter,
        DisplayApplicationTemplateParameter,
    )
}
class DisplayParameterValueWrapper:
    """Wraps a computed parameter value together with the context needed to
    build its URL and mime type.

    Attributes that are not defined on the wrapper are delegated to the
    wrapped ``value``.
    """

    ACTION_NAME = "param"

    def __init__(self, value, parameter, other_values, dataset_hash, user_hash, trans):
        """Store the value and its context and pre-compute the parameter's URL name."""
        self.value = value
        self.parameter = parameter
        self.other_values = other_values
        self.trans = trans
        self._dataset_hash = dataset_hash
        self._user_hash = user_hash
        self._url = self.parameter.build_url(self.other_values)

    def __str__(self):
        return str(self.value)

    def mime_type(self, action_param_extra=None):
        """Return the mime type to serve this value with.

        An explicit parameter mime type wins; otherwise, when guessing is
        enabled, the type is guessed from the URL name and then from the
        datatypes registry by extension. Falls back to ``text/plain``.
        ``action_param_extra`` is accepted for interface compatibility and is
        not used here.
        """
        if self.parameter.mime_type is not None:
            return self.parameter.mime_type
        if self.parameter.guess_mime_type:
            mime, _ = mimetypes.guess_type(self._url)
            if not mime:
                mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(self._url.split(".")[-1], None)
            if mime:
                return mime
        return "text/plain"

    @property
    def url(self):
        """Absolute URL of the dataset controller's ``display_application`` action
        for this parameter; ``https`` is downgraded to ``http`` when the
        parameter sets ``strip_https``.
        """
        base_url = self.trans.request.base
        if self.parameter.strip_https and base_url[:5].lower() == "https":
            base_url = f"http{base_url[5:]}"
        return "{}{}".format(
            base_url,
            self.trans.app.url_for(
                controller="dataset",
                action="display_application",
                dataset_id=self._dataset_hash,
                user_id=self._user_hash,
                app_name=quote_plus(self.parameter.link.display_application.id),
                link_name=quote_plus(self.parameter.link.id),
                app_action=self.action_name,
                action_param=self._url,
            ),
        )

    @property
    def action_name(self):
        """Action name passed as ``app_action`` when building :attr:`url`."""
        return self.ACTION_NAME

    @property
    def qp(self):
        # returns quoted str contents
        return self.other_values["qp"](str(self))

    def __getattr__(self, key):
        """Delegate unknown attributes to the wrapped value.

        :raises AttributeError: if ``value`` itself has not been set yet
        """
        # __getattr__ only runs when normal lookup fails. If "value" is the
        # missing name (instance created without __init__, e.g. by copy or
        # unpickling), reading self.value below would re-enter this method
        # forever, so fail with a regular AttributeError instead.
        if key == "value":
            raise AttributeError(key)
        return getattr(self.value, key)
class DisplayDataValueWrapper(DisplayParameterValueWrapper):
    """Value wrapper for data parameters; its string form is the file name of
    the wrapped dataset-like object.
    """

    ACTION_NAME = "data"

    def __str__(self):
        # string of data param is filename
        return str(self.value.get_file_name())

    def mime_type(self, action_param_extra=None):
        """Return the mime type to serve the data with.

        An explicit parameter mime type wins. When guessing is enabled,
        ``action_param_extra`` (if given) is tried before the URL name, first
        through ``mimetypes`` and then through the datatypes registry by
        extension. Otherwise the wrapped value's ``get_mime()`` is used, or
        that of the default dataset in ``other_values`` when the value has no
        such method.
        """
        if self.parameter.mime_type is not None:
            return self.parameter.mime_type
        if self.parameter.guess_mime_type:
            registry = self.trans.app.datatypes_registry
            mime, _ = mimetypes.guess_type(action_param_extra if action_param_extra else self._url)
            if not mime:
                if action_param_extra:
                    mime = registry.get_mimetype_by_extension(action_param_extra.split(".")[-1], None)
                if not mime:
                    mime = registry.get_mimetype_by_extension(self._url.split(".")[-1], None)
            if mime:
                return mime
        if hasattr(self.value, "get_mime"):
            return self.value.get_mime()
        return self.other_values[DEFAULT_DATASET_NAME].get_mime()

    @property
    def action_name(self):
        """Action name used in the URL: the parent's when the parameter sets
        ``force_url_param``, this class's otherwise.
        """
        if self.parameter.force_url_param:
            # Use the parent's constant directly. super(Parent, self) starts
            # the lookup *after* the parent and so finds nothing, and the
            # parent's own property would return this class's ACTION_NAME.
            return DisplayParameterValueWrapper.ACTION_NAME
        return self.ACTION_NAME

    @property
    def qp(self):
        # returns quoted url contents
        return self.other_values["qp"](self.url)