Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.visualization.plugins.config_parser
import logging
import galaxy.model
from galaxy.util.xml_macros import load
log = logging.getLogger(__name__)
[docs]class ParsingException(ValueError):
"""
An exception class for errors that occur during parsing of the visualizations
framework configuration XML file.
"""
[docs]class VisualizationsConfigParser:
"""
Class that parses a visualizations configuration XML file.
Each visualization will get the following info:
- how to load a visualization:
-- how to find the proper template
-- how to convert query string into DB models
- when/how to generate a link to the visualization
-- what provides the data
-- what information needs to be added to the query string
"""
#: what are the allowed 'entry_point_type' for entry_point elements
ALLOWED_ENTRY_POINT_TYPES = ['mako', 'html', 'script', 'chart']
#: what are the allowed href targets when clicking on a visualization anchor
VALID_RENDER_TARGETS = ['galaxy_main', '_top', '_blank']
[docs] def __init__(self):
# what parsers should be used for sub-components
self.data_source_parser = DataSourceParser()
self.param_parser = ParamParser()
self.param_modifier_parser = ParamModifierParser()
[docs] def parse_file(self, xml_filepath):
"""
Parse the given XML file for visualizations data.
:returns: visualization config dictionary
"""
xml_tree = load(xml_filepath)
visualization = self.parse_visualization(xml_tree.getroot())
return visualization
[docs] def parse_visualization(self, xml_tree):
"""
Parse the template, name, and any data_sources and params from the
given `xml_tree` for a visualization.
"""
returned = {}
# main tag specifies plugin type (visualization or
# interactive_enviornment).
returned['plugin_type'] = xml_tree.tag
# a text display name for end user links
returned['name'] = xml_tree.attrib.get('name', None)
if not returned['name']:
raise ParsingException('Visualization needs a name attribute.')
# allow manually turning off a vis by checking for a disabled property
if 'disabled' in xml_tree.attrib:
log.info('Visualizations plugin disabled: %s. Skipping...', returned['name'])
return None
# record the embeddable flag - defaults to false
# this is a design by contract promise that the visualization can be rendered inside another page
# often by rendering only a DOM fragment. Since this is an advanced feature that requires a bit more
# work from the creator's side - it defaults to False
returned['embeddable'] = False
if 'embeddable' in xml_tree.attrib:
returned['embeddable'] = xml_tree.attrib.get('embeddable', False) == 'true'
# a (for now) text description of what the visualization does
description = xml_tree.find('description')
returned['description'] = description.text.strip() if description is not None else None
# data_sources are the kinds of objects/data associated with the visualization
# e.g. views on HDAs can use this to find out what visualizations are applicable to them
data_sources = []
data_sources_confs = xml_tree.find('data_sources')
for data_source_conf in data_sources_confs.findall('data_source'):
data_source = self.data_source_parser.parse(data_source_conf)
if data_source:
data_sources.append(data_source)
# data_sources are not required
if not data_sources:
raise ParsingException('No valid data_sources for visualization')
returned['data_sources'] = data_sources
# TODO: this is effectively required due to param_confs.findall( 'param' )
# parameters spell out how to convert query string params into resources and data
# that will be parsed, fetched, etc. and passed to the template
# list or dict? ordered or not?
params = {}
param_confs = xml_tree.find('params')
param_elements = param_confs.findall('param') if param_confs is not None else []
for param_conf in param_elements:
param = self.param_parser.parse(param_conf)
if param:
params[param_conf.text] = param
# params are not required
if params:
returned['params'] = params
# param modifiers provide extra information for other params (e.g. hda_ldda='hda' -> dataset_id is an hda id)
# store these modifiers in a 2-level dictionary { target_param: { param_modifier_key: { param_mod_data }
# ugh - wish we didn't need these
param_modifiers = {}
param_modifier_elements = param_confs.findall('param_modifier') if param_confs is not None else []
for param_modifier_conf in param_modifier_elements:
param_modifier = self.param_modifier_parser.parse(param_modifier_conf)
# param modifiers map accrd. to the params they modify (for faster lookup)
target_param = param_modifier_conf.get('modifies')
param_modifier_key = param_modifier_conf.text
if param_modifier and target_param in params:
# multiple params can modify a single, other param,
# so store in a sub-dict, initializing if this is the first
if target_param not in param_modifiers:
param_modifiers[target_param] = {}
param_modifiers[target_param][param_modifier_key] = param_modifier
# not required
if param_modifiers:
returned['param_modifiers'] = param_modifiers
# entry_point: how will this plugin render/load? mako, script tag, or static html file?
returned['entry_point'] = self.parse_entry_point(xml_tree)
# link_text: the string to use for the text of any links/anchors to this visualization
link_text = xml_tree.find('link_text')
if link_text is not None and link_text.text:
returned['link_text'] = link_text
# render_target: where in the browser to open the rendered visualization
# defaults to: galaxy_main
render_target = xml_tree.find('render_target')
if((render_target is not None and render_target.text) and
(render_target.text in self.VALID_RENDER_TARGETS)):
returned['render_target'] = render_target.text
else:
returned['render_target'] = 'galaxy_main'
# consider unifying the above into its own element and parsing method
# load optional custom configuration specifiers
specs_section = xml_tree.find('specs')
if specs_section is not None:
returned['specs'] = DictParser(specs_section)
# load group specifiers
groups_section = xml_tree.find('groups')
if groups_section is not None:
returned['groups'] = ListParser(groups_section)
# load settings specifiers
settings_section = xml_tree.find('settings')
if settings_section is not None:
returned['settings'] = ListParser(settings_section)
return returned
[docs] def parse_entry_point(self, xml_tree):
"""
Parse the config file for an appropriate entry point: a mako template, a script tag,
or an html file, returning as dictionary with: `type`, `file`, and `attr`ibutes of
the element.
"""
# (older) mako-only syntax: the template to use in rendering the visualization
template = xml_tree.find('template')
if template is not None and template.text:
log.info('template syntax is deprecated: use entry_point instead')
return {
'type' : 'mako',
'file' : template.text,
'attr' : {}
}
# need one of the two: (the deprecated) template or entry_point
entry_point = xml_tree.find('entry_point')
if entry_point is None:
raise ParsingException('template or entry_point required')
# parse by returning a sub-object and simply copying any attributes unused here
entry_point_attrib = dict(entry_point.attrib)
entry_point_type = entry_point_attrib.pop('entry_point_type', 'mako')
if entry_point_type not in self.ALLOWED_ENTRY_POINT_TYPES:
raise ParsingException('Unknown entry_point type: ' + entry_point_type)
return {
'type' : entry_point_type,
'file' : entry_point.text,
'attr' : entry_point_attrib
}
# -------------------------------------------------------------------
[docs]class DataSourceParser:
"""
Component class of VisualizationsConfigParser that parses data_source elements
within visualization elements.
data_sources are (in the extreme) any object that can be used to produce
data for the visualization to consume (e.g. HDAs, LDDAs, Jobs, Users, etc.).
There can be more than one data_source associated with a visualization.
"""
# these are the allowed classes to associate visualizations with (as strings)
# any model_class element not in this list will throw a parsing ParsingExcepion
ALLOWED_MODEL_CLASSES = [
'Visualization',
'HistoryDatasetAssociation',
'LibraryDatasetDatasetAssociation'
]
ATTRIBUTE_SPLIT_CHAR = '.'
# these are the allowed object attributes to use in data source tests
# any attribute element not in this list will throw a parsing ParsingExcepion
ALLOWED_DATA_SOURCE_ATTRIBUTES = [
'datatype'
]
[docs] def parse(self, xml_tree):
"""
Return a visualization data_source dictionary parsed from the given
XML element.
"""
returned = {}
# model_class (required, only one) - look up and convert model_class to actual galaxy model class
model_class = self.parse_model_class(xml_tree.find('model_class'))
if not model_class:
raise ParsingException('data_source needs a model class')
returned['model_class'] = model_class
# tests (optional, 0 or more) - data for boolean test: 'is the visualization usable by this object?'
# when no tests are given, default to isinstance( object, model_class )
returned['tests'] = self.parse_tests(xml_tree.findall('test'))
# to_params (optional, 0 or more) - tells the registry to set certain params based on the model_clas, tests
returned['to_params'] = {}
to_params = self.parse_to_params(xml_tree.findall('to_param'))
if to_params:
returned['to_params'] = to_params
return returned
[docs] def parse_model_class(self, xml_tree):
"""
Convert xml model_class element to a galaxy model class
(or None if model class is not found).
This element is required and only the first element is used.
The model_class string must be in ALLOWED_MODEL_CLASSES.
"""
if xml_tree is None or not xml_tree.text:
raise ParsingException('data_source entry requires a model_class')
if xml_tree.text not in self.ALLOWED_MODEL_CLASSES:
# log.debug( 'available data_source model_classes: %s' %( str( self.ALLOWED_MODEL_CLASSES ) ) )
raise ParsingException('Invalid data_source model_class: %s' % (xml_tree.text))
# look up the model from the model module returning an empty data_source if not found
model_class = getattr(galaxy.model, xml_tree.text, None)
return model_class
def _build_getattr_lambda(self, attr_name_list):
"""
Recursively builds a compound lambda function of getattr's
from the attribute names given in `attr_name_list`.
"""
if len(attr_name_list) == 0:
# identity - if list is empty, return object itself
return lambda o: o
next_attr_name = attr_name_list[-1]
if len(attr_name_list) == 1:
# recursive base case
return lambda o: getattr(o, next_attr_name)
# recursive case
return lambda o: getattr(self._build_getattr_lambda(attr_name_list[:-1])(o), next_attr_name)
[docs] def parse_tests(self, xml_tree_list):
"""
Returns a list of test dictionaries that the registry can use
against a given object to determine if the visualization can be
used with the object.
"""
# tests should NOT include expensive operations: reading file data, running jobs, etc.
# do as much here as possible to reduce the overhead of seeing if a visualization is applicable
# currently tests are or'd only (could be and'd or made into compound boolean tests)
tests = []
if not xml_tree_list:
return tests
for test_elem in xml_tree_list:
test_type = test_elem.get('type', 'eq')
test_result = test_elem.text.strip() if test_elem.text else None
if not test_type or not test_result:
log.warning('Skipping test. Needs both type attribute and text node to be parsed: ' +
'{}, {}'.format(test_type, test_elem.text))
continue
test_result = test_result.strip()
# test_attr can be a dot separated chain of object attributes (e.g. dataset.datatype) - convert to list
# TODO: too dangerous - constrain these to some allowed list
# TODO: does this err if no test_attr - it should...
test_attr = test_elem.get('test_attr')
test_attr = test_attr.split(self.ATTRIBUTE_SPLIT_CHAR) if isinstance(test_attr, str) else []
# log.debug( 'test_type: %s, test_attr: %s, test_result: %s', test_type, test_attr, test_result )
# build a lambda function that gets the desired attribute to test
getter = self._build_getattr_lambda(test_attr)
# result type should tell the registry how to convert the result before the test
test_result_type = test_elem.get('result_type', 'string')
# test functions should be sent an object to test, and the parsed result expected from the test
if test_type == 'isinstance':
# is test_attr attribute an instance of result
# TODO: wish we could take this further but it would mean passing in the datatypes_registry
def test_fn(o, result):
return isinstance(getter(o), result)
elif test_type == 'has_dataprovider':
# does the object itself have a datatype attr and does that datatype have the given dataprovider
def test_fn(o, result):
return (hasattr(getter(o), 'has_dataprovider') and
getter(o).has_dataprovider(result))
elif test_type == 'has_attribute':
# does the object itself have attr in 'result' (no equivalence checking)
def test_fn(o, result):
return hasattr(getter(o), result)
elif test_type == 'not_eq':
def test_fn(o, result):
return str(getter(o)) != result
else:
# default to simple (string) equilavance (coercing the test_attr to a string)
def test_fn(o, result):
return str(getter(o)) == result
tests.append({
'type' : test_type,
'result' : test_result,
'result_type' : test_result_type,
'fn' : test_fn
})
return tests
[docs] def parse_to_params(self, xml_tree_list):
"""
Given a list of `to_param` elements, returns a dictionary that allows
the registry to convert the data_source into one or more appropriate
params for the visualization.
"""
to_param_dict = {}
if not xml_tree_list:
return to_param_dict
for element in xml_tree_list:
# param_name required
param_name = element.text
if not param_name:
raise ParsingException('to_param requires text (the param name)')
param = {}
# assign is a shortcut param_attr that assigns a value to a param (as text)
assign = element.get('assign')
if assign is not None:
param['assign'] = assign
# param_attr is the attribute of the object (that the visualization will be applied to)
# that should be converted into a query param (e.g. param_attr="id" -> dataset_id)
# TODO:?? use the build attr getter here?
# simple (1 lvl) attrs for now
param_attr = element.get('param_attr')
if param_attr is not None:
param['param_attr'] = param_attr
# element must have either param_attr or assign? what about no params (the object itself)
if not param_attr and not assign:
raise ParsingException('to_param requires either assign or param_attr attributes: %s', param_name)
# TODO: consider making the to_param name an attribute (param="hda_ldda") and the text what would
# be used for the conversion - this would allow CDATA values to be passed
# <to_param param="json" type="assign"><![CDATA[{ "one": 1, "two": 2 }]]></to_param>
if param:
to_param_dict[param_name] = param
return to_param_dict
[docs]class ListParser(list):
"""
Converts a xml structure into an array
See: http://code.activestate.com/recipes/410469-xml-as-dictionary/
"""
[docs] def __init__(self, aList):
for element in aList:
if len(element) > 0:
if element.tag == element[0].tag:
self.append(ListParser(element))
else:
self.append(DictParser(element))
elif element.text:
text = element.text.strip()
if text:
self.append(text)
[docs]class DictParser(dict):
"""
Converts a xml structure into a dictionary
See: http://code.activestate.com/recipes/410469-xml-as-dictionary/
"""
[docs] def __init__(self, parent_element):
if parent_element.items():
self.update(dict(parent_element.items()))
for element in parent_element:
if len(element) > 0:
if element.tag == element[0].tag:
aDict = ListParser(element)
else:
aDict = DictParser(element)
if element.items():
aDict.update(dict(element.items()))
self.update({element.tag: aDict})
elif element.items():
self.update({element.tag: dict(element.items())})
else:
self.update({element.tag: element.text})
[docs]class ParamParser:
"""
Component class of VisualizationsConfigParser that parses param elements
within visualization elements.
params are parameters that will be parsed (based on their `type`, etc.)
and sent to the visualization template by controllers.visualization.render.
"""
DEFAULT_PARAM_TYPE = 'str'
[docs] def parse(self, xml_tree):
"""
Parse a visualization parameter from the given `xml_tree`.
"""
returned = {}
# don't store key, just check it
param_key = xml_tree.text
if not param_key:
raise ParsingException('Param entry requires text')
returned['type'] = self.parse_param_type(xml_tree)
# is the parameter required in the template and,
# if not, what is the default value?
required = xml_tree.get('required') == "true"
returned['required'] = required
if not required:
# default defaults to None
default = None
if 'default' in xml_tree.attrib:
default = xml_tree.get('default')
# convert default based on param_type here
returned['default'] = default
# does the param have to be within a list of certain values
# NOTE: the interpretation of this list is deferred till parsing and based on param type
# e.g. it could be 'val in constrain_to', or 'constrain_to is min, max for number', etc.
# TODO: currently unused
constrain_to = xml_tree.get('constrain_to')
if constrain_to:
returned['constrain_to'] = constrain_to.split(',')
# is the param a comma-separated-value list?
returned['csv'] = xml_tree.get('csv') == "true"
# remap keys in the params/query string to the var names used in the template
var_name_in_template = xml_tree.get('var_name_in_template')
if var_name_in_template:
returned['var_name_in_template'] = var_name_in_template
return returned
[docs] def parse_param_type(self, xml_tree):
"""
Parse a param type from the given `xml_tree`.
"""
# default to string as param_type
param_type = xml_tree.get('type') or self.DEFAULT_PARAM_TYPE
# TODO: set parsers and validaters, convert here
return param_type
[docs]class ParamModifierParser(ParamParser):
"""
Component class of VisualizationsConfigParser that parses param_modifier
elements within visualization elements.
param_modifiers are params from a dictionary (such as a query string)
that are not standalone but modify the parsing/conversion of a separate
(normal) param (e.g. 'hda_ldda' can equal 'hda' or 'ldda' and control
whether a visualizations 'dataset_id' param is for an HDA or LDDA).
"""
[docs] def parse(self, element):
# modifies is required
modifies = element.get('modifies')
if not modifies:
raise ParsingException('param_modifier entry requires a target param key (attribute "modifies")')
returned = super().parse(element)
return returned