Source code for galaxy.datatypes.display_applications.application

# Contains objects for using external display applications
import logging
from copy import deepcopy
from urllib.parse import quote_plus

from galaxy.util import (
    parse_xml,
    string_as_bool,
)
from galaxy.util.template import fill_template
from .parameters import (
    DEFAULT_DATASET_NAME,
    DisplayApplicationDataParameter,
    DisplayApplicationParameter,
)
from .util import encode_dataset_user

log = logging.getLogger(__name__)


[docs]def quote_plus_string(value, **kwds): # Simple helper to make sure value is a string when trying to quote # Object passed in might not be a bare string/bytes, if is e.g., a template result # Prevents e.g. issue of "quote_from_bytes() expected bytes" return quote_plus(str(value), **kwds)
[docs]class DynamicDisplayApplicationBuilder:
[docs] def __init__(self, elem, display_application, build_sites): filename = None data_table = None if elem.get("site_type", None) is not None: filename = build_sites.get(elem.get("site_type")) else: filename = elem.get("from_file", None) if filename is None: data_table_name = elem.get("from_data_table", None) if data_table_name: data_table = display_application.app.tool_data_tables.get(data_table_name, None) assert data_table is not None, f'Unable to find data table named "{data_table_name}".' assert filename is not None or data_table is not None, "Filename or data Table is required for dynamic_links." skip_startswith = elem.get("skip_startswith", None) separator = elem.get("separator", "\t") id_col = elem.get("id", None) try: id_col = int(id_col) except (TypeError, ValueError): if data_table: if id_col is None: id_col = data_table.columns.get("id", None) if id_col is None: id_col = data_table.columns.get("value", None) try: id_col = int(id_col) except (TypeError, ValueError): # id is set to a string or None, use column by that name if available id_col = data_table.columns.get(id_col, None) id_col = int(id_col) name_col = elem.get("name", None) try: name_col = int(name_col) except (TypeError, ValueError): if data_table: if name_col is None: name_col = data_table.columns.get("name", None) else: name_col = data_table.columns.get(name_col, None) else: name_col = None if name_col is None: name_col = id_col max_col = max(id_col, name_col) dynamic_params = {} if data_table is not None: max_col = max([max_col] + list(data_table.columns.values())) for key, value in data_table.columns.items(): dynamic_params[key] = {"column": value, "split": False, "separator": ","} for dynamic_param in elem.findall("dynamic_param"): name = dynamic_param.get("name") value = int(dynamic_param.get("value")) split = string_as_bool(dynamic_param.get("split", False)) param_separator = dynamic_param.get("separator", ",") max_col = max(max_col, value) dynamic_params[name] = {"column": value, "split": split, "separator": param_separator} if filename: data_iter = open(filename) elif data_table: version, data_iter = data_table.get_version_fields() display_application.add_data_table_watch(data_table.name, version) links = [] for line in data_iter: if isinstance(line, str): if not skip_startswith or not line.startswith(skip_startswith): line = line.rstrip("\n\r") if not line: continue fields = line.split(separator) else: continue else: fields = line if len(fields) > max_col: new_elem = deepcopy(elem) new_elem.set("id", fields[id_col]) new_elem.set("name", fields[name_col]) dynamic_values = {} for key, attributes in dynamic_params.items(): value = fields[attributes["column"]] if attributes["split"]: value = value.split(attributes["separator"]) dynamic_values[key] = value # now populate links.append( DisplayApplicationLink.from_elem(new_elem, display_application, other_values=dynamic_values) ) else: log.warning(f'Invalid dynamic display application link specified in {filename}: "{line}"') self.links = links
def __iter__(self): return iter(self.links)
[docs]class DisplayApplication:
[docs] @classmethod def from_file(cls, filename, app): return cls.from_elem(parse_xml(filename).getroot(), app, filename=filename)
[docs] @classmethod def from_elem(cls, elem, app, filename=None): att_dict = cls._get_attributes_from_elem(elem) rval = DisplayApplication( att_dict["id"], att_dict["name"], app, att_dict["version"], filename=filename, elem=elem ) rval._load_links_from_elem(elem) return rval
@classmethod def _get_attributes_from_elem(cls, elem): display_id = elem.get("id", None) assert display_id, "ID tag is required for a Display Application" name = elem.get("name", display_id) version = elem.get("version", None) return dict(id=display_id, name=name, version=version)
[docs] def __init__(self, display_id, name, app, version=None, filename=None, elem=None): self.id = display_id self.name = name self.app = app if version is None: version = "1.0.0" self.version = version self.links = {} self._filename = filename self._elem = elem self._data_table_versions = {}
def _load_links_from_elem(self, elem): for link_elem in elem.findall("link"): link = DisplayApplicationLink.from_elem(link_elem, self) if link: self.links[link.id] = link try: for dynamic_links in elem.findall("dynamic_links"): for link in DynamicDisplayApplicationBuilder( dynamic_links, self, self.app.datatypes_registry.build_sites ): self.links[link.id] = link except Exception as e: log.error("Error loading a set of Dynamic Display Application links: %s", e)
[docs] def filter_by_dataset(self, data, trans): self._check_and_reload() filtered = DisplayApplication(self.id, self.name, self.app, version=self.version) for link_name, link_value in self.links.items(): if link_value.filter_by_dataset(data, trans): filtered.links[link_name] = link_value return filtered
[docs] def reload(self): if self._filename: elem = parse_xml(self._filename).getroot() elif self._elem: elem = self._elem else: raise Exception(f"Unable to reload DisplayApplication {self.name}.") # All toolshed-specific attributes added by e.g the registry will remain attr_dict = self._get_attributes_from_elem(elem) # We will not allow changing the id at this time (we'll need to fix several mappings upstream to handle this case) assert attr_dict.get("id") == self.id, ValueError( "You cannot reload a Display application where the ID has changed. You will need to restart the server instead." ) # clear old links self.links = {} # clear data table versions: self._data_table_versions = {} # Set new attributes for key, value in attr_dict.items(): setattr(self, key, value) # Load new links self._load_links_from_elem(elem) return self
[docs] def add_data_table_watch(self, table_name, version=None): self._data_table_versions[table_name] = version
def _requires_reload(self): for key, value in self._data_table_versions.items(): table = self.app.tool_data_tables.get(key, None) if table and not table.is_current_version(value): return True return False def _check_and_reload(self): if self._requires_reload(): self.reload()