Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.display_applications.application

# Contains objects for using external display applications
import logging
from collections import OrderedDict
from copy import deepcopy

from six import string_types
from six.moves.urllib.parse import quote_plus

from galaxy.util import (
    parse_xml,
    string_as_bool
)
from galaxy.util.template import fill_template
from .parameters import (
    DEFAULT_DATASET_NAME,
    DisplayApplicationDataParameter,
    DisplayApplicationParameter
)
from .util import encode_dataset_user

log = logging.getLogger(__name__)


[docs]def quote_plus_string(value, **kwds): # Simple helper to make sure value is a string when trying to quote # Object passed in might not be a bare string/bytes, if is e.g., a template result # Prevents e.g. issue of "quote_from_bytes() expected bytes" return quote_plus(str(value), **kwds)
[docs]class DynamicDisplayApplicationBuilder(object):
[docs] def __init__(self, elem, display_application, build_sites): filename = None data_table = None if elem.get('site_type', None) is not None: filename = build_sites.get(elem.get('site_type')) else: filename = elem.get('from_file', None) if filename is None: data_table_name = elem.get('from_data_table', None) if data_table_name: data_table = display_application.app.tool_data_tables.get(data_table_name, None) assert data_table is not None, 'Unable to find data table named "%s".' % data_table_name assert filename is not None or data_table is not None, 'Filename or data Table is required for dynamic_links.' skip_startswith = elem.get('skip_startswith', None) separator = elem.get('separator', '\t') id_col = elem.get('id', None) try: id_col = int(id_col) except (TypeError, ValueError): if data_table: if id_col is None: id_col = data_table.columns.get('id', None) if id_col is None: id_col = data_table.columns.get('value', None) try: id_col = int(id_col) except (TypeError, ValueError): # id is set to a string or None, use column by that name if available id_col = data_table.columns.get(id_col, None) id_col = int(id_col) name_col = elem.get('name', None) try: name_col = int(name_col) except (TypeError, ValueError): if data_table: if name_col is None: name_col = data_table.columns.get('name', None) else: name_col = data_table.columns.get(name_col, None) else: name_col = None if name_col is None: name_col = id_col max_col = max(id_col, name_col) dynamic_params = {} if data_table is not None: max_col = max([max_col] + list(data_table.columns.values())) for key, value in data_table.columns.items(): dynamic_params[key] = {'column': value, 'split': False, 'separator': ','} for dynamic_param in elem.findall('dynamic_param'): name = dynamic_param.get('name') value = int(dynamic_param.get('value')) split = string_as_bool(dynamic_param.get('split', False)) param_separator = dynamic_param.get('separator', ',') max_col = max(max_col, value) dynamic_params[name] = {'column': value, 'split': split, 'separator': param_separator} if filename: data_iter = open(filename) elif data_table: version, data_iter = data_table.get_version_fields() display_application.add_data_table_watch(data_table.name, version) links = [] for line in data_iter: if isinstance(line, string_types): if not skip_startswith or not line.startswith(skip_startswith): line = line.rstrip('\n\r') if not line: continue fields = line.split(separator) else: continue else: fields = line if len(fields) > max_col: new_elem = deepcopy(elem) new_elem.set('id', fields[id_col]) new_elem.set('name', fields[name_col]) dynamic_values = {} for key, attributes in dynamic_params.items(): value = fields[attributes['column']] if attributes['split']: value = value.split(attributes['separator']) dynamic_values[key] = value # now populate links.append(DisplayApplicationLink.from_elem(new_elem, display_application, other_values=dynamic_values)) else: log.warning('Invalid dynamic display application link specified in %s: "%s"' % (filename, line)) self.links = links
def __iter__(self): return iter(self.links)
[docs]class DisplayApplication(object):
[docs] @classmethod def from_file(cls, filename, app): return cls.from_elem(parse_xml(filename).getroot(), app, filename=filename)
[docs] @classmethod def from_elem(cls, elem, app, filename=None): att_dict = cls._get_attributes_from_elem(elem) rval = DisplayApplication(att_dict['id'], att_dict['name'], app, att_dict['version'], filename=filename, elem=elem) rval._load_links_from_elem(elem) return rval
@classmethod def _get_attributes_from_elem(cls, elem): display_id = elem.get('id', None) assert display_id, "ID tag is required for a Display Application" name = elem.get('name', display_id) version = elem.get('version', None) return dict(id=display_id, name=name, version=version)
[docs] def __init__(self, display_id, name, app, version=None, filename=None, elem=None): self.id = display_id self.name = name self.app = app if version is None: version = "1.0.0" self.version = version self.links = OrderedDict() self._filename = filename self._elem = elem self._data_table_versions = {}
def _load_links_from_elem(self, elem): for link_elem in elem.findall('link'): link = DisplayApplicationLink.from_elem(link_elem, self) if link: self.links[link.id] = link try: for dynamic_links in elem.findall('dynamic_links'): for link in DynamicDisplayApplicationBuilder(dynamic_links, self, self.app.datatypes_registry.build_sites): self.links[link.id] = link except Exception as e: log.error("Error loading a set of Dynamic Display Application links: %s", e)
[docs] def filter_by_dataset(self, data, trans): self._check_and_reload() filtered = DisplayApplication(self.id, self.name, self.app, version=self.version) for link_name, link_value in self.links.items(): if link_value.filter_by_dataset(data, trans): filtered.links[link_name] = link_value return filtered
[docs] def reload(self): if self._filename: elem = parse_xml(self._filename).getroot() elif self._elem: elem = self._elem else: raise Exception("Unable to reload DisplayApplication %s." % (self.name)) # All toolshed-specific attributes added by e.g the registry will remain attr_dict = self._get_attributes_from_elem(elem) # We will not allow changing the id at this time (we'll need to fix several mappings upstream to handle this case) assert attr_dict.get('id') == self.id, ValueError("You cannot reload a Display application where the ID has changed. You will need to restart the server instead.") # clear old links self.links = {} # clear data table versions: self._data_table_versions = {} # Set new attributes for key, value in attr_dict.items(): setattr(self, key, value) # Load new links self._load_links_from_elem(elem) return self
[docs] def add_data_table_watch(self, table_name, version=None): self._data_table_versions[table_name] = version
def _requires_reload(self): for key, value in self._data_table_versions.items(): table = self.app.tool_data_tables.get(key, None) if table and not table.is_current_version(value): return True return False def _check_and_reload(self): if self._requires_reload(): self.reload()