import logging
import os
from json import loads

import yaml
from markupsafe import escape
from paste.httpexceptions import (
from sqlalchemy import (
from sqlalchemy.orm import eagerload, undefer

from galaxy import model, util, web
from galaxy.datatypes.interval import Bed
from galaxy.managers.hdas import HDAManager
from galaxy.managers.sharable import SlugBuilder
from galaxy.model.item_attrs import UsesAnnotations, UsesItemRatings
from galaxy.structured_app import StructuredApp
from galaxy.util import sanitize_text, unicodify
from galaxy.util.sanitize_html import sanitize_html
from galaxy.visualization.data_providers.genome import RawBedDataProvider
from galaxy.visualization.data_providers.phyloviz import PhylovizDataProvider
from galaxy.visualization.genomes import decode_dbkey
from galaxy.visualization.genomes import GenomeRegion
from galaxy.visualization.plugins import registry
from galaxy.web.framework.helpers import grids, time_ago
from galaxy.webapps.base.controller import (
from ..api import depends

log = logging.getLogger(__name__)

# -- Grids --
[docs]class HistoryDatasetsSelectionGrid(grids.Grid):
[docs] class DbKeyColumn(grids.GridColumn):
[docs] def filter(self, trans, user, query, dbkey): """ Filter by dbkey through a raw SQL b/c metadata is a BLOB. """ dbkey_user, dbkey = decode_dbkey(dbkey) dbkey = dbkey.replace("'", "\\'") return query.filter(or_(text("metadata like '%%\"dbkey\": [\"%s\"]%%'" % dbkey, "metadata like '%%\"dbkey\": \"%s\"%%'" % dbkey)))
[docs] class HistoryColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, hda): return escape(hda.history.name)
[docs] def sort(self, trans, query, ascending, column_name=None): """Sort query using this column.""" return grids.GridColumn.sort(self, trans, query, ascending, column_name="history_id")
available_tracks = None title = "Add Datasets" model_class = model.HistoryDatasetAssociation default_filter = {"deleted": "False", "shared": "All"} default_sort_key = "-hid" columns = [ grids.GridColumn("Id", key="hid"), grids.TextColumn("Name", key="name", model_class=model.HistoryDatasetAssociation), grids.TextColumn("Type", key="extension", model_class=model.HistoryDatasetAssociation), grids.TextColumn("history_id", key="history_id", model_class=model.HistoryDatasetAssociation, visible=False), HistoryColumn("History", key="history", visible=True), DbKeyColumn("Build", key="dbkey", model_class=model.HistoryDatasetAssociation, visible=True, sortable=False) ] columns.append( grids.MulticolFilterColumn("Search name and filetype", cols_to_filter=[columns[1], columns[2]], key="free-text-search", visible=False, filterable="standard") )
[docs] def build_initial_query(self, trans, **kwargs): return trans.sa_session.query(self.model_class).join(model.History.table).join(model.Dataset.table)
[docs] def apply_query_filter(self, trans, query, **kwargs): if self.available_tracks is None: self.available_tracks = trans.app.datatypes_registry.get_available_tracks() return query.filter(model.History.user == trans.user) \ .filter(model.HistoryDatasetAssociation.extension.in_(self.available_tracks)) \ .filter(model.Dataset.state == model.Dataset.states.OK) \ .filter(model.HistoryDatasetAssociation.deleted == false()) \ .filter(model.HistoryDatasetAssociation.visible == true())
[docs]class LibraryDatasetsSelectionGrid(grids.Grid): available_tracks = None title = "Add Datasets" model_class = model.LibraryDatasetDatasetAssociation default_filter = {"deleted": "False"} default_sort_key = "-id" columns = [ grids.GridColumn("Id", key="id"), grids.TextColumn("Name", key="name", model_class=model.LibraryDatasetDatasetAssociation), grids.TextColumn("Type", key="extension", model_class=model.LibraryDatasetDatasetAssociation), ] columns.append( grids.MulticolFilterColumn("Search name and filetype", cols_to_filter=[columns[1], columns[2]], key="free-text-search", visible=False, filterable="standard") )
[docs] def build_initial_query(self, trans, **kwargs): return trans.sa_session.query(self.model_class).join(model.Dataset.table)
[docs] def apply_query_filter(self, trans, query, **kwargs): if self.available_tracks is None: self.available_tracks = trans.app.datatypes_registry.get_available_tracks() return query.filter(model.LibraryDatasetDatasetAssociation.user == trans.user) \ .filter(model.LibraryDatasetDatasetAssociation.extension.in_(self.available_tracks)) \ .filter(model.Dataset.state == model.Dataset.states.OK) \ .filter(model.LibraryDatasetDatasetAssociation.deleted == false()) \ .filter(model.LibraryDatasetDatasetAssociation.visible == true())
[docs]class TracksterSelectionGrid(grids.Grid): title = "Insert into visualization" model_class = model.Visualization default_sort_key = "-update_time" use_paging = False show_item_checkboxes = True columns = [ grids.TextColumn("Title", key="title", model_class=model.Visualization, filterable="standard"), grids.TextColumn("Build", key="dbkey", model_class=model.Visualization), grids.GridColumn("Last Updated", key="update_time", format=time_ago) ]
[docs] def build_initial_query(self, trans, **kwargs): return trans.sa_session.query(self.model_class)
[docs] def apply_query_filter(self, trans, query, **kwargs): return query.filter(self.model_class.user_id == trans.user.id) \ .filter(self.model_class.deleted == false()) \ .filter(self.model_class.type == "trackster")
[docs]class VisualizationListGrid(grids.Grid):
[docs] def get_url_args(item): """ Returns dictionary used to create item link. """ url_kwargs = dict(controller='visualization', id=item.id) # TODO: hack to build link to saved visualization - need trans in this function instead in order to do # link_data = trans.app.visualizations_registry.get_visualizations( trans, item ) if item.type in registry.VisualizationsRegistry.BUILT_IN_VISUALIZATIONS: url_kwargs['action'] = item.type else: url_kwargs['__route_name__'] = 'saved_visualization' url_kwargs['visualization_name'] = item.type url_kwargs['action'] = 'saved' return url_kwargs
[docs] def get_display_name(self, trans, item): if trans.app.visualizations_registry and item.type in trans.app.visualizations_registry.plugins: plugin = trans.app.visualizations_registry.plugins[item.type] return plugin.config.get('name', item.type) return item.type
# Grid definition title = "Saved Visualizations" model_class = model.Visualization default_sort_key = "-update_time" default_filter = dict(title="All", deleted="False", tags="All", sharing="All") columns = [ grids.TextColumn("Title", key="title", attach_popup=True, link=get_url_args), grids.TextColumn("Type", method='get_display_name'), grids.TextColumn("Build", key="dbkey"), grids.IndividualTagsColumn("Tags", key="tags", model_tag_association_class=model.VisualizationTagAssociation, filterable="advanced", grid_name="VisualizationListGrid"), grids.SharingStatusColumn("Sharing", key="sharing", filterable="advanced", sortable=False), grids.GridColumn("Created", key="create_time", format=time_ago), grids.GridColumn("Last Updated", key="update_time", format=time_ago), ] columns.append( grids.MulticolFilterColumn( "Search", cols_to_filter=[columns[0], columns[2]], key="free-text-search", visible=False, filterable="standard") ) operations = [ grids.GridOperation("Open", allow_multiple=False, url_args=get_url_args), grids.GridOperation("Edit Attributes", allow_multiple=False, url_args=dict(controller="", action='visualizations/edit')), grids.GridOperation("Copy", allow_multiple=False, condition=(lambda item: not item.deleted)), grids.GridOperation("Share or Publish", allow_multiple=False, condition=(lambda item: not item.deleted), url_args=dict(controller="", action="visualizations/sharing")), grids.GridOperation("Delete", condition=(lambda item: not item.deleted), confirm="Are you sure you want to delete this visualization?"), ]
[docs] def apply_query_filter(self, trans, query, **kwargs): return query.filter_by(user=trans.user, deleted=False)
[docs]class VisualizationAllPublishedGrid(grids.Grid): # Grid definition use_panels = True title = "Published Visualizations" model_class = model.Visualization default_sort_key = "update_time" default_filter = dict(title="All", username="All") columns = [ grids.PublicURLColumn("Title", key="title", filterable="advanced"), grids.OwnerAnnotationColumn("Annotation", key="annotation", model_annotation_association_class=model.VisualizationAnnotationAssociation, filterable="advanced"), grids.OwnerColumn("Owner", key="username", model_class=model.User, filterable="advanced"), grids.CommunityRatingColumn("Community Rating", key="rating"), grids.CommunityTagsColumn("Community Tags", key="tags", model_tag_association_class=model.VisualizationTagAssociation, filterable="advanced", grid_name="VisualizationAllPublishedGrid"), grids.ReverseSortColumn("Last Updated", key="update_time", format=time_ago) ] columns.append( grids.MulticolFilterColumn( "Search title, annotation, owner, and tags", cols_to_filter=[columns[0], columns[1], columns[2], columns[4]], key="free-text-search", visible=False, filterable="standard") )
[docs] def build_initial_query(self, trans, **kwargs): # See optimization description comments and TODO for tags in matching public histories query. return trans.sa_session.query(self.model_class).join("user").options(eagerload("user").load_only("username"), eagerload("annotations"), undefer("average_rating"))
[docs] def apply_query_filter(self, trans, query, **kwargs): return query.filter(self.model_class.deleted == false()).filter(self.model_class.published == true())
[docs]class VisualizationController(BaseUIController, SharableMixin, UsesVisualizationMixin, UsesAnnotations, UsesItemRatings): _visualization_list_grid = VisualizationListGrid() _published_list_grid = VisualizationAllPublishedGrid() _history_datasets_grid = HistoryDatasetsSelectionGrid() _library_datasets_grid = LibraryDatasetsSelectionGrid() _tracks_grid = TracksterSelectionGrid() hda_manager: HDAManager = depends(HDAManager) slug_builder: SlugBuilder = depends(SlugBuilder)
[docs] def __init__(self, app: StructuredApp): super().__init__(app)
# # -- Functions for listing visualizations. -- #
[docs] @web.expose @web.json @web.require_login("see all available libraries") def list_libraries(self, trans, **kwargs): """List all libraries that can be used for selecting datasets.""" return self._libraries_grid(trans, **kwargs)
[docs] @web.expose @web.json @web.require_login("see a history's datasets that can added to this visualization") def list_history_datasets(self, trans, **kwargs): """List a history's datasets that can be added to a visualization.""" kwargs['show_item_checkboxes'] = 'True' return self._history_datasets_grid(trans, **kwargs)
[docs] @web.expose @web.json @web.require_login("see a history's datasets that can added to this visualization") def list_library_datasets(self, trans, **kwargs): """List a library's datasets that can be added to a visualization.""" kwargs['show_item_checkboxes'] = 'True' return self._library_datasets_grid(trans, **kwargs)
[docs] @web.expose @web.json def list_tracks(self, trans, **kwargs): return self._tracks_grid(trans, **kwargs)
[docs] @web.expose @web.json def list_published(self, trans, *args, **kwargs): grid = self._published_list_grid(trans, **kwargs) grid['shared_by_others'] = self._get_shared(trans) return grid
[docs] @web.legacy_expose_api @web.require_login("use Galaxy visualizations", use_panels=True) def list(self, trans, **kwargs): message = kwargs.get('message') status = kwargs.get('status') if 'operation' in kwargs and 'id' in kwargs: session = trans.sa_session operation = kwargs['operation'].lower() ids = util.listify(kwargs['id']) for id in ids: item = session.query(model.Visualization).get(self.decode_id(id)) if operation == "delete": item.deleted = True if operation == "copy": self.copy(trans, **kwargs) session.flush() kwargs['embedded'] = True if message and status: kwargs['message'] = sanitize_text(message) kwargs['status'] = status grid = self._visualization_list_grid(trans, **kwargs) grid['shared_by_others'] = self._get_shared(trans) return grid
def _get_shared(self, trans): """Identify shared visualizations""" shared_by_others = trans.sa_session \ .query(model.VisualizationUserShareAssociation) \ .filter_by(user=trans.get_user()) \ .join(model.Visualization.table) \ .filter(model.Visualization.deleted == false()) \ .order_by(desc(model.Visualization.update_time)) \ .all() return [{'username': v.visualization.user.username, 'slug': v.visualization.slug, 'title': v.visualization.title} for v in shared_by_others] # # -- Functions for operating on visualizations. -- #
[docs] @web.expose @web.require_login("use Galaxy visualizations", use_panels=True) def index(self, trans, *args, **kwargs): """ Lists user's saved visualizations. """ return self.list(trans, *args, **kwargs)
[docs] @web.expose @web.require_login() def copy(self, trans, id, **kwargs): visualization = self.get_visualization(trans, id, check_ownership=False) user = trans.get_user() owner = (visualization.user == user) new_title = "Copy of '%s'" % visualization.title if not owner: new_title += " shared by %s" % visualization.user.email copied_viz = visualization.copy(user=trans.user, title=new_title) # Persist session = trans.sa_session session.add(copied_viz) session.flush() # Display the management page trans.set_message('Created new visualization with name "%s"' % copied_viz.title) return
[docs] @web.expose @web.require_login("use Galaxy visualizations") def set_accessible_async(self, trans, id=None, accessible=False): """ Set visualization's importable attribute and slug. """ visualization = self.get_visualization(trans, id) # Only set if importable value would change; this prevents a change in the update_time unless attribute really changed. importable = accessible in ['True', 'true', 't', 'T'] if visualization and visualization.importable != importable: if importable: self._make_item_accessible(trans.sa_session, visualization) else: visualization.importable = importable trans.sa_session.flush() return
[docs] @web.expose @web.require_login("rate items") @web.json def rate_async(self, trans, id, rating): """ Rate a visualization asynchronously and return updated community data. """ visualization = self.get_visualization(trans, id, check_ownership=False, check_accessible=True) if not visualization: return trans.show_error_message("The specified visualization does not exist.") # Rate visualization. self.rate_item(trans.sa_session, trans.get_user(), visualization, rating) return self.get_ave_item_rating_data(trans.sa_session, visualization)
[docs] @web.expose @web.require_login("share Galaxy visualizations") def imp(self, trans, id): """ Import a visualization into user's workspace. """ # Set referer message. referer = trans.request.referer if referer: referer_message = "<a href='%s'>return to the previous page</a>" % escape(referer) else: referer_message = "<a href='%s'>go to Galaxy's start page</a>" % web.url_for('/') # Do import. session = trans.sa_session visualization = self.get_visualization(trans, id, check_ownership=False) if visualization.importable is False: return trans.show_error_message("The owner of this visualization has disabled imports via this link.<br>You can %s" % referer_message, use_panels=True) elif visualization.deleted: return trans.show_error_message("You can't import this visualization because it has been deleted.<br>You can %s" % referer_message, use_panels=True) else: # Create imported visualization via copy. # TODO: need to handle custom db keys. imported_visualization = visualization.copy(user=trans.user, title="imported: " + visualization.title) # Persist session = trans.sa_session session.add(imported_visualization) session.flush() # Redirect to load galaxy frames. return trans.show_ok_message( message="""Visualization "%s" has been imported. <br>You can <a href="%s">start using this visualization</a> or %s.""" % (visualization.title, web.url_for('/visualizations/list'), referer_message), use_panels=True)
[docs] @web.expose @web.require_login("share Galaxy visualizations") def share(self, trans, id=None, email="", use_panels=False): """ Handle sharing a visualization with a particular user. """ msg = mtype = None visualization = self.get_visualization(trans, id, check_ownership=True) if email: other = trans.sa_session.query(model.User) \ .filter(and_(model.User.table.c.email == email, model.User.table.c.deleted == false())) \ .first() if not other: mtype = "error" msg = ("User '%s' does not exist" % escape(email)) elif other == trans.get_user(): mtype = "error" msg = ("You cannot share a visualization with yourself") elif trans.sa_session.query(model.VisualizationUserShareAssociation) \ .filter_by(user=other, visualization=visualization).count() > 0: mtype = "error" msg = ("Visualization already shared with '%s'" % escape(email)) else: share = model.VisualizationUserShareAssociation() share.visualization = visualization share.user = other session = trans.sa_session session.add(share) self.slug_builder.create_item_slug(session, visualization) session.flush() viz_title = escape(visualization.title) other_email = escape(other.email) trans.set_message(f"Visualization '{viz_title}' shared with user '{other_email}'") return trans.response.send_redirect(web.url_for("/visualizations/sharing?id=%s" % id)) return trans.fill_template("/ind_share_base.mako", message=msg, messagetype=mtype, item=visualization, email=email, use_panels=use_panels)
[docs] @web.expose def display_by_username_and_slug(self, trans, username, slug): """ Display visualization based on a username and slug. """ # Get visualization. session = trans.sa_session user = session.query(model.User).filter_by(username=username).first() visualization = trans.sa_session.query(model.Visualization).filter_by(user=user, slug=slug, deleted=False).first() if visualization is None: raise web.httpexceptions.HTTPNotFound() # Security check raises error if user cannot access visualization. self.security_check(trans, visualization, check_ownership=False, check_accessible=True) # Get rating data. user_item_rating = 0 if trans.get_user(): user_item_rating = self.get_user_item_rating(trans.sa_session, trans.get_user(), visualization) if user_item_rating: user_item_rating = user_item_rating.rating else: user_item_rating = 0 ave_item_rating, num_ratings = self.get_ave_item_rating_data(trans.sa_session, visualization) # Fork to template based on visualization.type (registry or builtin). if((trans.app.visualizations_registry and visualization.type in trans.app.visualizations_registry.plugins) and (visualization.type not in trans.app.visualizations_registry.BUILT_IN_VISUALIZATIONS)): # if a registry visualization, load a version of display.mako that will load the vis into an iframe :( # TODO: simplest path from A to B but not optimal - will be difficult to do reg visualizations any other way # TODO: this will load the visualization twice (once above, once when the iframe src calls 'saved') encoded_visualization_id = trans.security.encode_id(visualization.id) return trans.stream_template_mako('visualization/display_in_frame.mako', item=visualization, encoded_visualization_id=encoded_visualization_id, user_item_rating=user_item_rating, ave_item_rating=ave_item_rating, num_ratings=num_ratings, content_only=True) visualization_config = self.get_visualization_config(trans, visualization) return trans.stream_template_mako("visualization/display.mako", item=visualization, item_data=visualization_config, user_item_rating=user_item_rating, ave_item_rating=ave_item_rating, num_ratings=num_ratings, content_only=True)
[docs] @web.expose def get_item_content_async(self, trans, id): """ Returns item content in HTML format. """ # Get visualization, making sure it's accessible. visualization = self.get_visualization(trans, id, check_ownership=False, check_accessible=True) if visualization is None: raise web.httpexceptions.HTTPNotFound() # Return content. visualization_config = self.get_visualization_config(trans, visualization) return trans.fill_template_mako("visualization/item_content.mako", encoded_id=trans.security.encode_id(visualization.id), item=visualization, item_data=visualization_config, content_only=True)
[docs] @web.json def save(self, trans, vis_json=None, type=None, id=None, title=None, dbkey=None, annotation=None): """ Save a visualization; if visualization does not have an ID, a new visualization is created. Returns JSON of visualization. """ # Get visualization attributes from kwargs or from config. vis_config = loads(vis_json) vis_type = type or vis_config['type'] vis_id = id or vis_config.get('id', None) vis_title = title or vis_config.get('title', None) vis_dbkey = dbkey or vis_config.get('dbkey', None) vis_annotation = annotation or vis_config.get('annotation', None) return self.save_visualization(trans, vis_config, vis_type, vis_id, vis_title, vis_dbkey, vis_annotation)
[docs] @web.legacy_expose_api @web.require_login("edit visualizations") def edit(self, trans, payload=None, **kwd): """ Edit a visualization's attributes. """ id = kwd.get('id') if not id: return self.message_exception(trans, 'No visualization id received for editing.') trans_user = trans.get_user() v = self.get_visualization(trans, id, check_ownership=True) if trans.request.method == 'GET': if v.slug is None: self.slug_builder.create_item_slug(trans.sa_session, v) return { 'title': 'Edit visualization attributes', 'inputs': [{ 'name': 'title', 'label': 'Name', 'value': v.title }, { 'name': 'slug', 'label': 'Identifier', 'value': v.slug, 'help': 'A unique identifier that will be used for public links to this visualization. This field can only contain lowercase letters, numbers, and dashes (-).' }, { 'name': 'dbkey', 'label': 'Build', 'type': 'select', 'optional': True, 'value': v.dbkey, 'options': trans.app.genomes.get_dbkeys(trans_user, chrom_info=True), 'help': 'Parameter to associate your visualization with a database key.' }, { 'name': 'annotation', 'label': 'Annotation', 'value': self.get_item_annotation_str(trans.sa_session, trans.user, v), 'help': 'A description of the visualization. The annotation is shown alongside published visualizations.' }] } else: v_title = payload.get('title') v_slug = payload.get('slug') v_dbkey = payload.get('dbkey') v_annotation = payload.get('annotation') if not v_title: return self.message_exception(trans, 'Please provide a visualization name is required.') elif not v_slug: return self.message_exception(trans, 'Please provide a unique identifier.') elif not self._is_valid_slug(v_slug): return self.message_exception(trans, 'Visualization identifier can only contain lowercase letters, numbers, and dashes (-).') elif v_slug != v.slug and trans.sa_session.query(model.Visualization).filter_by(user=v.user, slug=v_slug, deleted=False).first(): return self.message_exception(trans, 'Visualization id must be unique.') else: v.title = v_title v.slug = v_slug v.dbkey = v_dbkey if v_annotation: v_annotation = sanitize_html(v_annotation) self.add_item_annotation(trans.sa_session, trans_user, v, v_annotation) trans.sa_session.add(v) trans.sa_session.flush() return {'message': 'Attributes of \'%s\' successfully saved.' % v.title, 'status': 'success'}
# ------------------------- registry.
[docs] @web.expose @web.require_login("use Galaxy visualizations", use_panels=True) def render(self, trans, visualization_name, embedded=None, **kwargs): """ Render the appropriate visualization template, parsing the `kwargs` into appropriate variables and resources (such as ORM models) based on this visualizations `param` data in visualizations_conf.xml. URL: /visualization/show/{visualization_name} """ plugin = self._get_plugin_from_registry(trans, visualization_name) try: return plugin.render(trans=trans, embedded=embedded, **kwargs) except Exception as exception: self._handle_plugin_error(trans, visualization_name, exception)
def _get_plugin_from_registry(self, trans, visualization_name): """ Get the named plugin from the registry. :raises HTTPNotFound: if registry has been turned off in config. :raises HTTPNotFound: if visualization_name isn't a registered plugin. """ if not trans.app.visualizations_registry: raise HTTPNotFound('No visualization registry (possibly disabled in galaxy.ini)') return trans.app.visualizations_registry.get_plugin(visualization_name) def _handle_plugin_error(self, trans, visualization_name, exception): """ Log, raise if debugging; log and show html message if not. """ log.exception('error rendering visualization (%s)', visualization_name) if trans.debug: raise exception return trans.show_error_message( "There was an error rendering the visualization. " + "Contact your Galaxy administrator if the problem persists." + "<br/>Details: " + unicodify(exception), use_panels=False)
[docs] @web.expose @web.require_login("use Galaxy visualizations", use_panels=True) def saved(self, trans, id=None, revision=None, type=None, config=None, title=None, **kwargs): """ Save (on POST) or load (on GET) a visualization then render. """ # TODO: consider merging saved and render at this point (could break saved URLs, tho) if trans.request.method == 'POST': self._POST_to_saved(trans, id=id, revision=revision, type=type, config=config, title=title, **kwargs) # check the id and load the saved visualization if id is None: return HTTPBadRequest('A valid visualization id is required to load a visualization') visualization = self.get_visualization(trans, id, check_ownership=False, check_accessible=True) # re-add title to kwargs for passing to render if title: kwargs['title'] = title plugin = self._get_plugin_from_registry(trans, visualization.type) try: return plugin.render_saved(visualization, trans=trans, **kwargs) except Exception as exception: self._handle_plugin_error(trans, visualization.type, exception)
def _POST_to_saved(self, trans, id=None, revision=None, type=None, config=None, title=None, **kwargs): """ Save the visualiztion info (revision, type, config, title, etc.) to the Visualization at `id` or to a new Visualization if `id` is None. Uses POST/redirect/GET after a successful save, redirecting to GET. """ DEFAULT_VISUALIZATION_NAME = 'Unnamed Visualization' # post to saved in order to save a visualization if type is None or config is None: return HTTPBadRequest('A visualization type and config are required to save a visualization') if isinstance(config, str): config = loads(config) title = title or DEFAULT_VISUALIZATION_NAME # TODO: allow saving to (updating) a specific revision - should be part of UsesVisualization # TODO: would be easier if this returned the visualization directly # check security if posting to existing visualization if id is not None: self.get_visualization(trans, id, check_ownership=True, check_accessible=False) # ??: on not owner: error raised, but not returned (status = 200) # TODO: there's no security check in save visualization (if passed an id) returned = self.save_visualization(trans, config, type, id, title) # redirect to GET to prevent annoying 'Do you want to post again?' dialog on page reload render_url = web.url_for(controller='visualization', action='saved', id=returned.get('vis_id')) return trans.response.send_redirect(render_url) # # Visualizations. #
[docs] @web.expose @web.require_login() def trackster(self, trans, **kwargs): """ Display browser for the visualization denoted by id and add the datasets listed in `dataset_ids`. """ # define app configuration app = {"jscript": "trackster"} # get dataset to add id = kwargs.get("id", None) # get dataset to add new_dataset_id = kwargs.get("dataset_id", None) # set up new browser if no id provided if not id: # use dbkey from dataset to be added or from incoming parameter dbkey = None if new_dataset_id: decoded_id = self.decode_id(new_dataset_id) hda = self.hda_manager.get_owned(decoded_id, trans.user, current_history=trans.user) dbkey = hda.dbkey if dbkey == '?': dbkey = kwargs.get("dbkey", None) # save database key app['default_dbkey'] = dbkey else: # load saved visualization vis = self.get_visualization(trans, id, check_ownership=False, check_accessible=True) app['viz_config'] = self.get_visualization_config(trans, vis) # backup id app['id'] = id # add dataset id app['add_dataset'] = new_dataset_id # check for gene region gene_region = GenomeRegion.from_str(kwargs.get("gene_region", "")) # update gene region of saved visualization if user parses a new gene region in the url if gene_region.chrom is not None: app['gene_region'] = { 'chrom': gene_region.chrom, 'start': gene_region.start, 'end': gene_region.end } # fill template return trans.fill_template('galaxy.panels.mako', config={'right_panel': True, 'app': app, 'bundle': 'extended'})
[docs] @web.expose def circster(self, trans, id=None, hda_ldda=None, dataset_id=None, dbkey=None): """ Display a circster visualization. """ # Get dataset to add. dataset = None if dataset_id: dataset = self.get_hda_or_ldda(trans, hda_ldda, dataset_id) # Get/create vis. if id: # Display existing viz. vis = self.get_visualization(trans, id, check_ownership=False, check_accessible=True) dbkey = vis.dbkey else: # Create new viz. if not dbkey: # If dbkey not specified, use dataset's dbkey. dbkey = dataset.dbkey if not dbkey or dbkey == '?': # Circster requires a valid dbkey. return trans.show_error_message("You must set the dataset's dbkey to view it. You can set " "a dataset's dbkey by clicking on the pencil icon and editing " "its attributes.", use_panels=True) vis = self.create_visualization(trans, type="genome", dbkey=dbkey, save=False) # Get the vis config and work with it from here on out. Working with the # config is only possible because the config structure of trackster/genome # visualizations is well known. viz_config = self.get_visualization_config(trans, vis) # Add dataset if specified. if dataset: viz_config['tracks'].append(self.get_new_track_config(trans, dataset)) # Get genome info. chroms_info = self.app.genomes.chroms(trans, dbkey=dbkey) genome = {'dbkey': dbkey, 'chroms_info': chroms_info} # Add genome-wide data to each track in viz. tracks = viz_config.get('tracks', []) for track in tracks: dataset_dict = track['dataset'] dataset = self.get_hda_or_ldda(trans, dataset_dict['hda_ldda'], dataset_dict['id']) genome_data = self._get_genome_data(trans, dataset, dbkey) if not isinstance(genome_data, str): track['preloaded_data'] = genome_data # define app configuration for generic mako template app = { 'jscript': "circster", 'viz_config': viz_config, 'genome': genome } # fill template return trans.fill_template('galaxy.panels.mako', config={'app': app, 'bundle': 'extended'})
[docs] @web.expose def sweepster(self, trans, id=None, hda_ldda=None, dataset_id=None, regions=None): """ Displays a sweepster visualization using the incoming parameters. If id is available, get the visualization with the given id; otherwise, create a new visualization using a given dataset and regions. """ regions = regions or '{}' # Need to create history if necessary in order to create tool form. trans.get_history(most_recent=True, create=True) if id: # Loading a shared visualization. viz = self.get_visualization(trans, id) viz_config = self.get_visualization_config(trans, viz) decoded_id = self.decode_id(viz_config['dataset_id']) dataset = self.hda_manager.get_owned(decoded_id, trans.user, current_history=trans.history) else: # Loading new visualization. dataset = self.get_hda_or_ldda(trans, hda_ldda, dataset_id) job = self.hda_manager.creating_job(dataset) viz_config = { 'dataset_id': dataset_id, 'tool_id': job.tool_id, 'regions': loads(regions) } # Add tool, dataset attributes to config based on id. tool = trans.app.toolbox.get_tool(viz_config['tool_id']) viz_config['tool'] = tool.to_dict(trans, io_details=True) viz_config['dataset'] = trans.security.encode_dict_ids(dataset.to_dict()) return trans.fill_template_mako("visualization/sweepster.mako", config=viz_config)
[docs] def get_item(self, trans, id): return self.get_visualization(trans, id)
[docs] @web.expose def phyloviz(self, trans, id=None, dataset_id=None, tree_index=0, **kwargs): config = None data = None # if id, then this is a saved visualization; get its config and the dataset_id from there if id: visualization = self.get_visualization(trans, id) config = self.get_visualization_config(trans, visualization) dataset_id = config.get('dataset_id', None) # get the hda if we can, then its data using the phyloviz parsers if dataset_id: decoded_id = self.decode_id(dataset_id) hda = self.hda_manager.get_accessible(decoded_id, trans.user) hda = self.hda_manager.error_if_uploading(hda) else: return trans.show_message("Phyloviz couldn't find a dataset_id") pd = PhylovizDataProvider(original_dataset=hda) data = pd.get_data(tree_index=tree_index) # ensure at least a default configuration (gen. an new/unsaved visualization) if not config: config = { 'dataset_id': dataset_id, 'title': hda.display_name(), 'ext': hda.datatype.file_ext, 'treeIndex': tree_index, 'saved_visualization': False } return trans.fill_template_mako("visualization/phyloviz.mako", data=data, config=config)
[docs] @web.expose @web.require_login("run Galaxy Interactive Environments") def gie_list(self, trans, **kwargs): if not hasattr(self, 'gie_image_map'): self.gie_image_map = {} for gie_dir in self.app.config.gie_dirs: gie_list = os.listdir(gie_dir) for gie in gie_list: gie_path = os.path.join(gie_dir, gie) if not os.path.isdir(gie_path): continue if not os.path.exists(self._gie_config_dir(gie_path)): continue if os.path.exists(self._gie_config_dir(gie_path, 'allowed_images.yml')): image_file = self._gie_config_dir(gie_path, 'allowed_images.yml') elif os.path.exists(self._gie_config_dir(gie_path, 'allowed_images.yml.sample')): image_file = self._gie_config_dir(gie_path, 'allowed_images.yml.sample') else: continue with open(image_file) as handle: self.gie_image_map[gie] = yaml.safe_load(handle) return trans.fill_template_mako( "visualization/gie.mako", gie_image_map=self.gie_image_map, history=trans.get_history(), )
def _gie_config_dir(self, gie_path, *args): nargs = [gie_path, 'config'] if len(args) > 0: nargs += args return os.path.join(*nargs)
[docs] @web.json def bookmarks_from_dataset(self, trans, hda_id=None, ldda_id=None): if hda_id: hda_ldda = "hda" dataset_id = hda_id elif ldda_id: hda_ldda = "ldda" dataset_id = ldda_id dataset = self.get_hda_or_ldda(trans, hda_ldda, dataset_id) rows = [] if isinstance(dataset.datatype, Bed): data = RawBedDataProvider(original_dataset=dataset).get_iterator() for i, line in enumerate(data): if (i > 500): break fields = line.split() location = name = "{}:{}-{}".format(fields[0], fields[1], fields[2]) if len(fields) > 3: name = fields[4] rows.append([location, name]) return {'data': rows}