Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.display_applications.parameters

# Contains parameters that are used in Display Applications
import mimetypes
from typing import Optional
from urllib.parse import quote_plus

from galaxy.util import string_as_bool
from galaxy.util.bunch import Bunch
from galaxy.util.template import fill_template

DEFAULT_DATASET_NAME = "dataset"


[docs]class DisplayApplicationParameter: """Abstract Class for Display Application Parameters""" type: Optional[str] = None
[docs] @classmethod def from_elem(cls, elem, link): param_type = elem.get("type", None) assert param_type, "DisplayApplicationParameter requires a type" return parameter_type_to_class[param_type](elem, link)
[docs] def __init__(self, elem, link): self.name = elem.get("name", None) assert self.name, "DisplayApplicationParameter requires a name" self.link = link self.url = elem.get( "url", self.name ) # name used in url for display purposes defaults to name; e.g. want the form of file.ext, where a '.' is not allowed as python variable name/keyword self.mime_type = elem.get("mimetype", None) self.guess_mime_type = string_as_bool(elem.get("guess_mimetype", "False")) self.viewable = string_as_bool( elem.get("viewable", "False") ) # only allow these to be viewed via direct url when explicitly set to viewable self.strip = string_as_bool(elem.get("strip", "False")) self.strip_https = string_as_bool(elem.get("strip_https", "False")) self.allow_override = string_as_bool( elem.get("allow_override", "False") ) # Passing query param app_<name>=<value> to dataset controller allows override if this is true. self.allow_cors = string_as_bool(elem.get("allow_cors", "False"))
[docs] def get_value(self, other_values, dataset_hash, user_hash, trans): raise Exception("get_value() is unimplemented for DisplayApplicationDataParameter")
[docs] def prepare(self, other_values, dataset_hash, user_hash, trans): return self.get_value(other_values, dataset_hash, user_hash, trans)
[docs] def ready(self, other_values): return True
[docs] def is_preparing(self, other_values): return False
[docs] def build_url(self, other_values): return fill_template(self.url, context=other_values)
[docs]class DisplayApplicationDataParameter(DisplayApplicationParameter): """Parameter that returns a file_name containing the requested content""" type = "data"
[docs] def __init__(self, elem, link): DisplayApplicationParameter.__init__(self, elem, link) self.extensions = elem.get("format", None) if self.extensions: self.extensions = self.extensions.split(",") self.metadata = elem.get("metadata", None) self.allow_extra_files_access = string_as_bool(elem.get("allow_extra_files_access", "False")) self.dataset = elem.get( "dataset", DEFAULT_DATASET_NAME ) # 'dataset' is default name assigned to dataset to be displayed assert not ( self.extensions and self.metadata ), "A format or a metadata can be defined for a DisplayApplicationParameter, but not both." assert not ( self.allow_extra_files_access and self.metadata ), "allow_extra_files_access or metadata can be defined for a DisplayApplicationParameter, but not both." self.viewable = string_as_bool(elem.get("viewable", "True")) # data params should be viewable self.force_url_param = string_as_bool(elem.get("force_url_param", "False")) self.force_conversion = string_as_bool(elem.get("force_conversion", "False"))
@property def formats(self): if self.extensions: return tuple( map( type, map( self.link.display_application.app.datatypes_registry.get_datatype_by_extension, self.extensions ), ) ) return None def _get_dataset_like_object(self, other_values): # this returned object has file_name, state, and states attributes equivalent to a DatasetAssociation data = other_values.get(self.dataset, None) assert data, "Base dataset could not be found in values provided to DisplayApplicationDataParameter" if isinstance(data, DisplayDataValueWrapper): data = data.value if self.metadata: rval = getattr(data.metadata, self.metadata, None) assert rval, f'Unknown metadata name "{self.metadata}" provided for dataset type "{data.ext}".' return Bunch(file_name=rval.file_name, state=data.state, states=data.states, extension="data") elif self.extensions and (self.force_conversion or not isinstance(data.datatype, self.formats)): for ext in self.extensions: rval = data.get_converted_files_by_type(ext) if rval: return rval direct_match, target_ext, converted_dataset = data.find_conversion_destination(self.formats) assert direct_match or target_ext is not None, f"No conversion path found for data param: {self.name}" return None return data
[docs] def get_value(self, other_values, dataset_hash, user_hash, trans): data = self._get_dataset_like_object(other_values) if data: return DisplayDataValueWrapper(data, self, other_values, dataset_hash, user_hash, trans) return None
[docs] def prepare(self, other_values, dataset_hash, user_hash, trans): data = self._get_dataset_like_object(other_values) if not data and self.formats: data = other_values.get(self.dataset, None) trans.sa_session.refresh(data) # start conversion # FIXME: Much of this is copied (more than once...); should be some abstract method elsewhere called from here # find target ext direct_match, target_ext, converted_dataset = data.find_conversion_destination( self.formats, converter_safe=True ) if not direct_match: if target_ext and not converted_dataset: if isinstance(data, DisplayDataValueWrapper): data = data.value new_data = next( iter( data.datatype.convert_dataset( trans, data, target_ext, return_output=True, visible=False ).values() ) ) new_data.hid = data.hid new_data.name = data.name trans.sa_session.add(new_data) assoc = trans.app.model.ImplicitlyConvertedDatasetAssociation( parent=data, file_type=target_ext, dataset=new_data, metadata_safe=False ) trans.sa_session.add(assoc) trans.sa_session.flush() elif converted_dataset and converted_dataset.state == converted_dataset.states.ERROR: raise Exception(f"Dataset conversion failed for data parameter: {self.name}") return self.get_value(other_values, dataset_hash, user_hash, trans)
[docs] def is_preparing(self, other_values): value = self._get_dataset_like_object(other_values) if value and value.state in (value.states.NEW, value.states.UPLOAD, value.states.QUEUED, value.states.RUNNING): return True return False
[docs] def ready(self, other_values): value = self._get_dataset_like_object(other_values) if value: if value.state == value.states.OK: return True elif value.state == value.states.ERROR: raise Exception(f"A data display parameter is in the error state: {self.name}") return False
[docs]class DisplayApplicationTemplateParameter(DisplayApplicationParameter): """Parameter that returns a string containing the requested content""" type = "template"
[docs] def __init__(self, elem, link): DisplayApplicationParameter.__init__(self, elem, link) self.text = elem.text or ""
[docs] def get_value(self, other_values, dataset_hash, user_hash, trans): value = fill_template(self.text, context=other_values) if self.strip: value = value.strip() return DisplayParameterValueWrapper(value, self, other_values, dataset_hash, user_hash, trans)
parameter_type_to_class = { DisplayApplicationDataParameter.type: DisplayApplicationDataParameter, DisplayApplicationTemplateParameter.type: DisplayApplicationTemplateParameter, }
[docs]class DisplayParameterValueWrapper: ACTION_NAME = "param"
[docs] def __init__(self, value, parameter, other_values, dataset_hash, user_hash, trans): self.value = value self.parameter = parameter self.other_values = other_values self.trans = trans self._dataset_hash = dataset_hash self._user_hash = user_hash self._url = self.parameter.build_url(self.other_values)
def __str__(self): return str(self.value)
[docs] def mime_type(self, action_param_extra=None): if self.parameter.mime_type is not None: return self.parameter.mime_type if self.parameter.guess_mime_type: mime, encoding = mimetypes.guess_type(self._url) if not mime: mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(".".split(self._url)[-1], None) if mime: return mime return "text/plain"
@property def url(self): base_url = self.trans.request.base if self.parameter.strip_https and base_url[:5].lower() == "https": base_url = f"http{base_url[5:]}" return "{}{}".format( base_url, self.trans.app.url_for( controller="dataset", action="display_application", dataset_id=self._dataset_hash, user_id=self._user_hash, app_name=quote_plus(self.parameter.link.display_application.id), link_name=quote_plus(self.parameter.link.id), app_action=self.action_name, action_param=self._url, ), ) @property def action_name(self): return self.ACTION_NAME @property def qp(self): # returns quoted str contents return self.other_values["qp"](str(self)) def __getattr__(self, key): return getattr(self.value, key)
[docs]class DisplayDataValueWrapper(DisplayParameterValueWrapper): ACTION_NAME = "data" def __str__(self): # string of data param is filename return str(self.value.file_name)
[docs] def mime_type(self, action_param_extra=None): if self.parameter.mime_type is not None: return self.parameter.mime_type if self.parameter.guess_mime_type: if action_param_extra: mime, encoding = mimetypes.guess_type(action_param_extra) else: mime, encoding = mimetypes.guess_type(self._url) if not mime: if action_param_extra: mime = self.trans.app.datatypes_registry.get_mimetype_by_extension( ".".split(action_param_extra)[-1], None ) if not mime: mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(".".split(self._url)[-1], None) if mime: return mime if hasattr(self.value, "get_mime"): return self.value.get_mime() return self.other_values[DEFAULT_DATASET_NAME].get_mime()
@property def action_name(self): if self.parameter.force_url_param: return super(DisplayParameterValueWrapper, self).action_name return self.ACTION_NAME @property def qp(self): # returns quoted url contents return self.other_values["qp"](self.url)