Source code for galaxy.visualization.plugins.resource_parser

"""
Deserialize Galaxy resources (hdas, ldas, datasets, genomes, etc.) from
a dictionary of string data/ids (often from a query string).
"""
import json
import logging
import weakref

import galaxy.exceptions
import galaxy.util
from galaxy.managers import (
    hdas as hda_manager,
    visualizations as visualization_manager
)
from galaxy.util import bunch

log = logging.getLogger(__name__)


[docs]class ResourceParser(object): """ Given a parameter dictionary (often a converted query string) and a configuration dictionary (curr. only VisualizationsRegistry uses this), convert the entries in the parameter dictionary into resources (Galaxy models, primitive types, lists of either, etc.) and return in a new dictionary. The keys used to store the new values can optionally be re-mapped to new keys (e.g. dataset_id="NNN" -> hda=<HistoryDatasetAssociation>). """ primitive_parsers = { 'str' : lambda param: galaxy.util.sanitize_html.sanitize_html(param), 'bool' : lambda param: galaxy.util.string_as_bool(param), 'int' : int, 'float' : float, # 'date' : lambda param: , 'json' : (lambda param: json.loads( galaxy.util.sanitize_html.sanitize_html(param))), }
[docs] def __init__(self, app, *args, **kwargs): self.app = weakref.ref(app) self.managers = self._init_managers(app)
def _init_managers(self, app): return bunch.Bunch( visualization=visualization_manager.VisualizationManager(app), hda=hda_manager.HDAManager(app) )
[docs] def parse_parameter_dictionary(self, trans, param_config_dict, query_params, param_modifiers=None): """ Parse all expected params from the query dictionary `query_params`. If param is required and not present, raises a `KeyError`. """ # log.debug( 'parse_parameter_dictionary, query_params:\n%s', query_params ) # parse the modifiers first since they modify the params coming next # TODO: this is all really for hda_ldda - which we could replace with model polymorphism params_that_modify_other_params = self.parse_parameter_modifiers( trans, param_modifiers, query_params) resources = {} for param_name, param_config in param_config_dict.items(): # optionally rename the variable returned, defaulting to the original name var_name_in_template = param_config.get('var_name_in_template', param_name) # if the param is present, get its value, any param modifiers for that param, and parse it into a resource # use try catch here and not caller to fall back on the default value or re-raise if required resource = None query_val = query_params.get(param_name, None) if query_val is not None: try: target_param_modifiers = params_that_modify_other_params.get(param_name, None) resource = self.parse_parameter(trans, param_config, query_val, param_modifiers=target_param_modifiers) except Exception as exception: if trans.debug: raise else: log.warning('Exception parsing visualization param from query: %s, %s, (%s) %s', param_name, query_val, str(type(exception)), str(exception)) resource = None # here - we've either had no value in the query_params or there was a failure to parse # so: error if required, otherwise get a default (which itself defaults to None) if resource is None: if param_config['required']: raise KeyError('required param %s not found in URL' % (param_name)) resource = self.parse_parameter_default(trans, param_config) resources[var_name_in_template] = resource return resources
[docs] def parse_config(self, trans, param_config_dict, query_params): """ Return `query_params` dict parsing only JSON serializable params. Complex params such as models, etc. are left as the original query value. Keys in `query_params` not found in the `param_config_dict` will not be returned. """ # log.debug( 'parse_config, query_params:\n%s', query_params ) config = {} for param_name, param_config in param_config_dict.items(): config_val = query_params.get(param_name, None) if config_val is not None and param_config['type'] in self.primitive_parsers: try: config_val = self.parse_parameter(trans, param_config, config_val) except Exception as exception: log.warning('Exception parsing visualization param from query: ' + '%s, %s, (%s) %s' % (param_name, config_val, str(type(exception)), str(exception))) config_val = None # here - we've either had no value in the query_params or there was a failure to parse # so: if there's a default and it's not None, add it to the config if config_val is None: if param_config.get('default', None) is None: continue config_val = self.parse_parameter_default(trans, param_config) config[param_name] = config_val return config
# TODO: I would LOVE to rip modifiers out completely
[docs] def parse_parameter_modifiers(self, trans, param_modifiers, query_params): """ Parse and return parameters that are meant to modify other parameters, be grouped with them, or are needed to successfully parse other parameters. """ # only one level of modification - down that road lies madness # parse the modifiers out of query_params first since they modify the other params coming next parsed_modifiers = {} if not param_modifiers: return parsed_modifiers # precondition: expects a two level dictionary # { target_param_name -> { param_modifier_name -> { param_modifier_data }}} for target_param_name, modifier_dict in param_modifiers.items(): parsed_modifiers[target_param_name] = target_modifiers = {} for modifier_name, modifier_config in modifier_dict.items(): query_val = query_params.get(modifier_name, None) if query_val is not None: modifier = self.parse_parameter(trans, modifier_config, query_val) target_modifiers[modifier_name] = modifier else: # TODO: required attr? target_modifiers[modifier_name] = self.parse_parameter_default(trans, modifier_config) return parsed_modifiers
[docs] def parse_parameter_default(self, trans, param_config): """ Parse any default values for the given param, defaulting the default to `None`. """ # currently, *default* default is None, so this is quaranteed to be part of the dictionary default = param_config['default'] # if default is None, do not attempt to parse it if default is None: return default # otherwise, parse (currently param_config['default'] is a string just like query param and needs to be parsed) # this saves us the trouble of parsing the default when the config file is read # (and adding this code to the xml parser) return self.parse_parameter(trans, param_config, default)
[docs] def parse_parameter(self, trans, expected_param_data, query_param, recurse=True, param_modifiers=None): """ Use data in `expected_param_data` to parse `query_param` from a string into a resource usable directly by a template. """ param_type = expected_param_data.get('type') # constrain_to = expected_param_data.get( 'constrain_to' ) csv = expected_param_data.get('csv') parsed_param = None # handle recursion for csv values if csv and recurse: parsed_param = [] query_param_list = galaxy.util.listify(query_param) for query_param in query_param_list: parsed_param.append(self._parse_param(trans, expected_param_data, query_param, recurse=False)) return parsed_param if param_type in self.primitive_parsers: # TODO: what about param modifiers on primitives? parsed_param = self.primitive_parsers[param_type](query_param) # TODO: constrain_to: this gets complicated - remove? # db models elif param_type == 'visualization': # ?: is this even used anymore/anywhere? decoded_visualization_id = self._decode_id(query_param) parsed_param = self.managers.visualization.get_accessible(decoded_visualization_id, trans.user) elif param_type == 'dataset': decoded_dataset_id = self._decode_id(query_param) parsed_param = self.managers.hda.get_accessible(decoded_dataset_id, trans.user) elif param_type == 'hda_or_ldda': encoded_dataset_id = query_param # needs info from another param... hda_ldda = param_modifiers.get('hda_ldda') if hda_ldda == 'hda': decoded_dataset_id = self._decode_id(encoded_dataset_id) parsed_param = self.managers.hda.get_accessible(decoded_dataset_id, trans.user) else: parsed_param = self.managers.ldda.get(trans, encoded_dataset_id) # TODO: ideally this would check v. a list of valid dbkeys elif param_type == 'dbkey': dbkey = query_param parsed_param = galaxy.util.sanitize_html.sanitize_html(dbkey) return parsed_param
def _decode_id(self, id): try: return self.app().security.decode_id(str(id)) except (ValueError, TypeError): raise galaxy.exceptions.MalformedId( "Malformed id ( %s ) specified, unable to decode" % (str(id)), id=str(id) )