# Contains parameters that are used in Display Applications
import mimetypes
from typing import Optional
from urllib.parse import quote_plus
from galaxy.util import string_as_bool
from galaxy.util.bunch import Bunch
from galaxy.util.template import fill_template
# Default key under which the dataset being displayed is looked up in a parameter's
# ``other_values`` mapping; data parameters use it unless their element names another one.
DEFAULT_DATASET_NAME = "dataset"
class DisplayApplicationParameter:
    """Abstract Class for Display Application Parameters

    Subclasses set ``type`` and override :meth:`get_value`.
    """

    type: Optional[str] = None

    @classmethod
    def from_elem(cls, elem, link):
        """Instantiate the parameter class registered for ``elem``'s ``type`` attribute.

        Raises ``AssertionError`` if the attribute is missing or empty, and
        ``KeyError`` if ``parameter_type_to_class`` has no entry for it.
        """
        param_type = elem.get("type", None)
        assert param_type, "DisplayApplicationParameter requires a type"
        return parameter_type_to_class[param_type](elem, link)

    def __init__(self, elem, link):
        """Read the attributes shared by all parameter types from ``elem``.

        Raises ``AssertionError`` if ``elem`` carries no ``name`` attribute.
        """
        self.name = elem.get("name", None)
        assert self.name, "DisplayApplicationParameter requires a name"
        self.link = link
        # name used in url for display purposes defaults to name; e.g. want the form of file.ext,
        # where a '.' is not allowed as python variable name/keyword
        self.url = elem.get("url", self.name)
        self.mime_type = elem.get("mimetype", None)
        self.guess_mime_type = string_as_bool(elem.get("guess_mimetype", "False"))
        # only allow these to be viewed via direct url when explicitly set to viewable
        self.viewable = string_as_bool(elem.get("viewable", "False"))
        self.strip = string_as_bool(elem.get("strip", "False"))
        self.strip_https = string_as_bool(elem.get("strip_https", "False"))
        # Passing query param app_<name>=<value> to dataset controller allows override if this is true.
        self.allow_override = string_as_bool(elem.get("allow_override", "False"))
        self.allow_cors = string_as_bool(elem.get("allow_cors", "False"))

    def get_value(self, other_values, dataset_hash, user_hash, trans):
        """Return the parameter's value; must be overridden by subclasses.

        Raises ``NotImplementedError`` (an ``Exception`` subclass, so existing
        ``except Exception`` handlers still apply) naming the concrete class
        that lacks an implementation.
        """
        raise NotImplementedError(f"get_value() is unimplemented for {type(self).__name__}")

    def prepare(self, other_values, dataset_hash, user_hash, trans):
        """Prepare the parameter; the base implementation just returns :meth:`get_value`."""
        return self.get_value(other_values, dataset_hash, user_hash, trans)

    def ready(self, other_values):
        """Report whether the value can be used; always ``True`` in the base class."""
        return True

    def is_preparing(self, other_values):
        """Report whether preparation is in progress; always ``False`` in the base class."""
        return False

    def build_url(self, other_values):
        """Fill the ``url`` template using ``other_values`` as the template context."""
        return fill_template(self.url, context=other_values)
class DisplayApplicationDataParameter(DisplayApplicationParameter):
    """Parameter that returns a file_name containing the requested content"""

    type = "data"

    def __init__(self, elem, link):
        """Read the data-specific attributes of ``elem`` on top of the common ones.

        Raises ``AssertionError`` when ``metadata`` is combined with either
        ``format`` or ``allow_extra_files_access``.
        """
        DisplayApplicationParameter.__init__(self, elem, link)
        format_attr = elem.get("format", None)
        # comma separated extension list; an absent or empty attribute is stored unchanged
        self.extensions = format_attr.split(",") if format_attr else format_attr
        self.metadata = elem.get("metadata", None)
        self.allow_extra_files_access = string_as_bool(elem.get("allow_extra_files_access", "False"))
        # 'dataset' is default name assigned to dataset to be displayed
        self.dataset = elem.get("dataset", DEFAULT_DATASET_NAME)
        assert (
            not self.extensions or not self.metadata
        ), "A format or a metadata can be defined for a DisplayApplicationParameter, but not both."
        assert (
            not self.allow_extra_files_access or not self.metadata
        ), "allow_extra_files_access or metadata can be defined for a DisplayApplicationParameter, but not both."
        # data params should be viewable
        self.viewable = string_as_bool(elem.get("viewable", "True"))
        self.force_url_param = string_as_bool(elem.get("force_url_param", "False"))
        self.force_conversion = string_as_bool(elem.get("force_conversion", "False"))

    @property
    def formats(self):
        """Tuple of the datatype classes registered for ``extensions``, or ``None`` without extensions."""
        if not self.extensions:
            return None
        lookup = self.link.display_application.app.datatypes_registry.get_datatype_by_extension
        return tuple(type(lookup(extension)) for extension in self.extensions)

    def _get_dataset_like_object(self, other_values):
        """Resolve the object to display, or ``None`` when nothing usable exists yet.

        Raises ``AssertionError`` when the base dataset is missing, the named
        metadata element is unknown, or no conversion path exists.
        """
        # this returned object has file_name, state, and states attributes equivalent to a DatasetAssociation
        data = other_values.get(self.dataset, None)
        assert data, "Base dataset could not be found in values provided to DisplayApplicationDataParameter"
        if isinstance(data, DisplayDataValueWrapper):
            data = data.value
        if data.state != data.states.OK:
            return None
        if self.metadata:
            metadata_file = getattr(data.metadata, self.metadata, None)
            assert metadata_file, f'Unknown metadata name "{self.metadata}" provided for dataset type "{data.ext}".'
            return Bunch(file_name=metadata_file.file_name, state=data.state, states=data.states, extension="data")
        if not self.extensions:
            return data
        if not self.force_conversion and isinstance(data.datatype, self.formats):
            # already one of the requested formats and no conversion was forced
            return data
        for extension in self.extensions:
            converted = data.get_converted_files_by_type(extension)
            if converted:
                return converted
        direct_match, target_ext, _ = data.find_conversion_destination(self.formats)
        assert direct_match or target_ext is not None, f"No conversion path found for data param: {self.name}"
        return None

    def get_value(self, other_values, dataset_hash, user_hash, trans):
        """Wrap the resolved dataset-like object, or return ``None`` when there is none."""
        resolved = self._get_dataset_like_object(other_values)
        if not resolved:
            return None
        return DisplayDataValueWrapper(resolved, self, other_values, dataset_hash, user_hash, trans)

    def prepare(self, other_values, dataset_hash, user_hash, trans):
        """Start a conversion when one is needed and not yet started, then return :meth:`get_value`.

        Raises ``Exception`` when an existing converted dataset is in the error state.
        """
        if not self._get_dataset_like_object(other_values) and self.formats:
            source = other_values.get(self.dataset, None)
            trans.sa_session.refresh(source)
            # start conversion
            # FIXME: Much of this is copied (more than once...); should be some abstract method elsewhere called from here
            # find target ext
            direct_match, target_ext, converted_dataset = source.find_conversion_destination(
                self.formats, converter_safe=True
            )
            if not direct_match and target_ext and not converted_dataset:
                if isinstance(source, DisplayDataValueWrapper):
                    source = source.value
                outputs = source.datatype.convert_dataset(
                    trans, source, target_ext, return_output=True, visible=False
                )
                new_data = next(iter(outputs.values()))
                new_data.hid = source.hid
                new_data.name = source.name
                trans.sa_session.add(new_data)
                assoc = trans.app.model.ImplicitlyConvertedDatasetAssociation(
                    parent=source, file_type=target_ext, dataset=new_data, metadata_safe=False
                )
                trans.sa_session.add(assoc)
                trans.sa_session.flush()
            elif (
                not direct_match
                and converted_dataset
                and converted_dataset.state == converted_dataset.states.ERROR
            ):
                raise Exception(f"Dataset conversion failed for data parameter: {self.name}")
        return self.get_value(other_values, dataset_hash, user_hash, trans)

    def is_preparing(self, other_values):
        """Return ``True`` while the resolved object is in the NEW, UPLOAD, QUEUED or RUNNING state."""
        value = self._get_dataset_like_object(other_values)
        if not value:
            return False
        return value.state in (value.states.NEW, value.states.UPLOAD, value.states.QUEUED, value.states.RUNNING)

    def ready(self, other_values):
        """Return ``True`` once the resolved object is OK; raise ``Exception`` if it is in the error state."""
        value = self._get_dataset_like_object(other_values)
        if not value:
            return False
        if value.state == value.states.OK:
            return True
        if value.state == value.states.ERROR:
            raise Exception(f"A data display parameter is in the error state: {self.name}")
        return False
class DisplayApplicationTemplateParameter(DisplayApplicationParameter):
    """Parameter that returns a string containing the requested content"""

    type = "template"

    def __init__(self, elem, link):
        """Store the element's text as the template source (empty string when there is none)."""
        DisplayApplicationParameter.__init__(self, elem, link)
        template_text = elem.text
        self.text = template_text if template_text else ""

    def get_value(self, other_values, dataset_hash, user_hash, trans):
        """Fill the template with ``other_values`` and wrap the result, stripped when ``strip`` is set."""
        rendered = fill_template(self.text, context=other_values)
        content = rendered.strip() if self.strip else rendered
        return DisplayParameterValueWrapper(content, self, other_values, dataset_hash, user_hash, trans)
# Maps the ``type`` attribute of a parameter element to the class implementing it.
parameter_type_to_class = {
    parameter_class.type: parameter_class
    for parameter_class in (DisplayApplicationDataParameter, DisplayApplicationTemplateParameter)
}
class DisplayParameterValueWrapper:
    """Pairs a parameter's computed value with the context needed to build its URL.

    Attributes that are not found on the wrapper are looked up on the wrapped ``value``.
    """

    ACTION_NAME = "param"

    def __init__(self, value, parameter, other_values, dataset_hash, user_hash, trans):
        self.value = value
        self.parameter = parameter
        self.other_values = other_values
        self.trans = trans
        self._dataset_hash = dataset_hash
        self._user_hash = user_hash
        self._url = self.parameter.build_url(self.other_values)

    def __str__(self):
        return str(self.value)

    def mime_type(self, action_param_extra=None):
        """Return the explicit mime type, else a guess from the url (if enabled), else ``text/plain``.

        ``action_param_extra`` is accepted for signature parity with the data wrapper and is not used here.
        """
        if self.parameter.mime_type is not None:
            return self.parameter.mime_type
        if self.parameter.guess_mime_type:
            mime, _ = mimetypes.guess_type(self._url)
            if not mime:
                # fall back to the datatypes registry, keyed by the text after the last '.'
                mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(self._url.split(".")[-1], None)
            if mime:
                return mime
        return "text/plain"

    @property
    def url(self):
        """Absolute url of the ``display_application`` dataset action serving this value."""
        base_url = self.trans.request.base
        if self.parameter.strip_https and base_url[:5].lower() == "https":
            # downgrade the scheme to http when the parameter asks for it
            base_url = f"http{base_url[5:]}"
        return "{}{}".format(
            base_url,
            self.trans.app.legacy_url_for(
                mapper=self.trans.app.legacy_mapper,
                controller="dataset",
                action="display_application",
                dataset_id=self._dataset_hash,
                user_id=self._user_hash,
                app_name=quote_plus(self.parameter.link.display_application.id),
                link_name=quote_plus(self.parameter.link.id),
                app_action=self.action_name,
                action_param=self._url,
                environ=self.trans.request.environ,
            ),
        )

    @property
    def action_name(self):
        """Name of the controller action used in :attr:`url`."""
        return self.ACTION_NAME

    @property
    def qp(self):
        # returns quoted str contents
        return self.other_values["qp"](str(self))

    def __getattr__(self, key):
        # Only called when normal lookup fails. If ``value`` itself is missing (an instance
        # created without __init__), delegating to it would re-enter this method without
        # bound, so report the attribute as absent instead.
        if key == "value":
            raise AttributeError(key)
        return getattr(self.value, key)


class DisplayDataValueWrapper(DisplayParameterValueWrapper):
    """Value wrapper for data parameters; its string form is the wrapped object's file name."""

    ACTION_NAME = "data"

    def __str__(self):
        # string of data param is filename
        return str(self.value.file_name)

    def mime_type(self, action_param_extra=None):
        """Return the explicit mime type, else a guess (if enabled), else the dataset's own mime type.

        When guessing, ``action_param_extra`` is tried before the parameter's url.
        """
        if self.parameter.mime_type is not None:
            return self.parameter.mime_type
        if self.parameter.guess_mime_type:
            if action_param_extra:
                mime, _ = mimetypes.guess_type(action_param_extra)
            else:
                mime, _ = mimetypes.guess_type(self._url)
            if not mime:
                # fall back to the datatypes registry, keyed by the text after the last '.'
                if action_param_extra:
                    mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(
                        action_param_extra.split(".")[-1], None
                    )
                if not mime:
                    mime = self.trans.app.datatypes_registry.get_mimetype_by_extension(self._url.split(".")[-1], None)
            if mime:
                return mime
        if hasattr(self.value, "get_mime"):
            return self.value.get_mime()
        return self.other_values[DEFAULT_DATASET_NAME].get_mime()

    @property
    def action_name(self):
        """``"param"`` when the parameter sets ``force_url_param``, otherwise ``"data"``."""
        if self.parameter.force_url_param:
            # Use the parent's constant directly: going through the parent's property would
            # still read this class's ACTION_NAME, and super(DisplayParameterValueWrapper, self)
            # resolves against ``object``, which has no ``action_name`` at all.
            return DisplayParameterValueWrapper.ACTION_NAME
        return self.ACTION_NAME

    @property
    def qp(self):
        # returns quoted url contents
        return self.other_values["qp"](self.url)