Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.display_applications.parameters

# Contains parameters that are used in Display Applications
import mimetypes

from six.moves.urllib.parse import quote_plus

from galaxy.util import string_as_bool
from galaxy.util.bunch import Bunch
from galaxy.util.template import fill_template
from galaxy.web import url_for

DEFAULT_DATASET_NAME = 'dataset'


[docs]class DisplayApplicationParameter(object): """ Abstract Class for Display Application Parameters """ type = None
[docs] @classmethod def from_elem(cls, elem, link): param_type = elem.get('type', None) assert param_type, 'DisplayApplicationParameter requires a type' return parameter_type_to_class[param_type](elem, link)
[docs] def __init__(self, elem, link): self.name = elem.get('name', None) assert self.name, 'DisplayApplicationParameter requires a name' self.link = link self.url = elem.get('url', self.name) # name used in url for display purposes defaults to name; e.g. want the form of file.ext, where a '.' is not allowed as python variable name/keyword self.mime_type = elem.get('mimetype', None) self.guess_mime_type = string_as_bool(elem.get('guess_mimetype', 'False')) self.viewable = string_as_bool(elem.get('viewable', 'False')) # only allow these to be viewed via direct url when explicitly set to viewable self.strip = string_as_bool(elem.get('strip', 'False')) self.strip_https = string_as_bool(elem.get('strip_https', 'False')) self.allow_override = string_as_bool(elem.get('allow_override', 'False')) # Passing query param app_<name>=<value> to dataset controller allows override if this is true.
[docs] def get_value(self, other_values, dataset_hash, user_hash, trans): raise Exception('get_value() is unimplemented for DisplayApplicationDataParameter')
[docs] def prepare(self, other_values, dataset_hash, user_hash, trans): return self.get_value(other_values, dataset_hash, user_hash, trans)
[docs] def ready(self, other_values): return True
[docs] def is_preparing(self, other_values): return False
[docs] def build_url(self, other_values): return fill_template(self.url, context=other_values)
[docs]class DisplayApplicationDataParameter(DisplayApplicationParameter): """ Parameter that returns a file_name containing the requested content """ type = 'data'
[docs] def __init__(self, elem, link): DisplayApplicationParameter.__init__(self, elem, link) self.extensions = elem.get('format', None) if self.extensions: self.extensions = self.extensions.split(",") self.metadata = elem.get('metadata', None) self.allow_extra_files_access = string_as_bool(elem.get('allow_extra_files_access', 'False')) self.dataset = elem.get('dataset', DEFAULT_DATASET_NAME) # 'dataset' is default name assigned to dataset to be displayed assert not (self.extensions and self.metadata), 'A format or a metadata can be defined for a DisplayApplicationParameter, but not both.' assert not (self.allow_extra_files_access and self.metadata), 'allow_extra_files_access or metadata can be defined for a DisplayApplicationParameter, but not both.' self.viewable = string_as_bool(elem.get('viewable', 'True')) # data params should be viewable self.force_url_param = string_as_bool(elem.get('force_url_param', 'False')) self.force_conversion = string_as_bool(elem.get('force_conversion', 'False'))
@property def formats(self): if self.extensions: return tuple(map(type, map(self.link.display_application.app.datatypes_registry.get_datatype_by_extension, self.extensions))) return None def _get_dataset_like_object(self, other_values): # this returned object has file_name, state, and states attributes equivalent to a DatasetAssociation data = other_values.get(self.dataset, None) assert data, 'Base dataset could not be found in values provided to DisplayApplicationDataParameter' if isinstance(data, DisplayDataValueWrapper): data = data.value if self.metadata: rval = getattr(data.metadata, self.metadata, None) assert rval, 'Unknown metadata name (%s) provided for dataset type (%s).' % (self.metadata, data.datatype.__class__.name) return Bunch(file_name=rval.file_name, state=data.state, states=data.states, extension='data') elif self.extensions and (self.force_conversion or not isinstance(data.datatype, self.formats)): for ext in self.extensions: rval = data.get_converted_files_by_type(ext) if rval: return rval assert data.find_conversion_destination(self.formats)[0] is not None, "No conversion path found for data param: %s" % self.name return None return data
[docs] def get_value(self, other_values, dataset_hash, user_hash, trans): data = self._get_dataset_like_object(other_values) if data: return DisplayDataValueWrapper(data, self, other_values, dataset_hash, user_hash, trans) return None
[docs] def prepare(self, other_values, dataset_hash, user_hash, trans): data = self._get_dataset_like_object(other_values) if not data and self.formats: data = other_values.get(self.dataset, None) trans.sa_session.refresh(data) # start conversion # FIXME: Much of this is copied (more than once...); should be some abstract method elsewhere called from here # find target ext target_ext, converted_dataset = data.find_conversion_destination(self.formats, converter_safe=True) if target_ext and not converted_dataset: if isinstance(data, DisplayDataValueWrapper): data = data.value new_data = next(iter(data.datatype.convert_dataset(trans, data, target_ext, return_output=True, visible=False).values())) new_data.hid = data.hid new_data.name = data.name trans.sa_session.add(new_data) assoc = trans.app.model.ImplicitlyConvertedDatasetAssociation(parent=data, file_type=target_ext, dataset=new_data, metadata_safe=False) trans.sa_session.add(assoc) trans.sa_session.flush() elif converted_dataset and converted_dataset.state == converted_dataset.states.ERROR: raise Exception("Dataset conversion failed for data parameter: %s" % self.name) return self.get_value(other_values, dataset_hash, user_hash, trans)
[docs] def is_preparing(self, other_values): value = self._get_dataset_like_object(other_values) if value and value.state in (value.states.NEW, value.states.UPLOAD, value.states.QUEUED, value.states.RUNNING): return True return False
[docs] def ready(self, other_values): value = self._get_dataset_like_object(other_values) if value: if value.state == value.states.OK: return True elif value.state == value.states.ERROR: raise Exception('A data display parameter is in the error state: %s' % (self.name)) return False
[docs]class DisplayApplicationTemplateParameter(DisplayApplicationParameter): """ Parameter that returns a string containing the requested content """ type = 'template'
[docs] def __init__(self, elem, link): DisplayApplicationParameter.__init__(self, elem, link) self.text = elem.text or ''
[docs] def get_value(self, other_values, dataset_hash, user_hash, trans): value = fill_template(self.text, context=other_values) if self.strip: value = value.strip() return DisplayParameterValueWrapper(value, self, other_values, dataset_hash, user_hash, trans)
parameter_type_to_class = {DisplayApplicationDataParameter.type: DisplayApplicationDataParameter, DisplayApplicationTemplateParameter.type: DisplayApplicationTemplateParameter}
[docs]class DisplayParameterValueWrapper(object): ACTION_NAME = 'param'
[docs] def __init__(self, value, parameter, other_values, dataset_hash, user_hash, trans): self.value = value self.parameter = parameter self.other_values = other_values self.trans = trans self._dataset_hash = dataset_hash self._user_hash = user_hash self._url = self.parameter.build_url(self.other_values)
def __str__(self): return str(self.value)
[docs] def mime_type(self, action_param_extra=None): if self.parameter.mime_type is not None: return self.parameter.mime_type if self.parameter.guess_mime_type: mime, encoding = mimetypes.guess_type(self._url) if not mime: mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(".".split(self._url)[-1], None) if mime: return mime return 'text/plain'
@property def url(self): base_url = self.trans.request.base if self.parameter.strip_https and base_url[: 5].lower() == 'https': base_url = "http%s" % base_url[5:] return "%s%s" % (base_url, url_for(controller='dataset', action="display_application", dataset_id=self._dataset_hash, user_id=self._user_hash, app_name=quote_plus(self.parameter.link.display_application.id), link_name=quote_plus(self.parameter.link.id), app_action=self.action_name, action_param=self._url)) @property def action_name(self): return self.ACTION_NAME @property def qp(self): # returns quoted str contents return self.other_values['qp'](str(self)) def __getattr__(self, key): return getattr(self.value, key)
[docs]class DisplayDataValueWrapper(DisplayParameterValueWrapper): ACTION_NAME = 'data' def __str__(self): # string of data param is filename return str(self.value.file_name)
[docs] def mime_type(self, action_param_extra=None): if self.parameter.mime_type is not None: return self.parameter.mime_type if self.parameter.guess_mime_type: if action_param_extra: mime, encoding = mimetypes.guess_type(action_param_extra) else: mime, encoding = mimetypes.guess_type(self._url) if not mime: if action_param_extra: mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(".".split(action_param_extra)[-1], None) if not mime: mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(".".split(self._url)[-1], None) if mime: return mime if hasattr(self.value, 'get_mime'): return self.value.get_mime() return self.other_values[DEFAULT_DATASET_NAME].get_mime()
@property def action_name(self): if self.parameter.force_url_param: return super(DisplayParameterValueWrapper, self).action_name return self.ACTION_NAME @property def qp(self): # returns quoted url contents return self.other_values['qp'](self.url)