Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.controllers.visualization
import logging
from json import loads
from markupsafe import escape
from paste.httpexceptions import (
HTTPBadRequest,
HTTPNotFound,
)
from sqlalchemy import (
desc,
false,
or_,
text,
true,
)
from sqlalchemy.orm import (
joinedload,
undefer,
)
from galaxy import (
model,
util,
web,
)
from galaxy.managers.hdas import HDAManager
from galaxy.managers.sharable import SlugBuilder
from galaxy.model.base import transaction
from galaxy.model.item_attrs import (
UsesAnnotations,
UsesItemRatings,
)
from galaxy.structured_app import StructuredApp
from galaxy.util import (
sanitize_text,
unicodify,
)
from galaxy.util.sanitize_html import sanitize_html
from galaxy.visualization.data_providers.phyloviz import PhylovizDataProvider
from galaxy.visualization.genomes import (
decode_dbkey,
GenomeRegion,
)
from galaxy.visualization.plugins import registry
from galaxy.web.framework.helpers import (
grids,
time_ago,
)
from galaxy.webapps.base.controller import (
BaseUIController,
SharableMixin,
UsesVisualizationMixin,
)
from ..api import depends
log = logging.getLogger(__name__)
#
# -- Grids --
#
[docs]class HistoryDatasetsSelectionGrid(grids.Grid):
[docs] class DbKeyColumn(grids.GridColumn):
[docs] def filter(self, trans, user, query, dbkey):
"""Filter by dbkey through a raw SQL b/c metadata is a BLOB."""
dbkey_user, dbkey = decode_dbkey(dbkey)
dbkey = dbkey.replace("'", "\\'")
return query.filter(
or_(text(f'metadata like \'%"dbkey": ["{dbkey}"]%\'', f'metadata like \'%"dbkey": "{dbkey}"%\''))
)
[docs] class HistoryColumn(grids.GridColumn):
[docs] def sort(self, trans, query, ascending, column_name=None):
"""Sort query using this column."""
return grids.GridColumn.sort(self, trans, query, ascending, column_name="history_id")
available_tracks = None
title = "Add Datasets"
model_class = model.HistoryDatasetAssociation
default_filter = {"deleted": "False", "shared": "All"}
default_sort_key = "-hid"
columns = [
grids.GridColumn("Id", key="hid"),
grids.TextColumn("Name", key="name", model_class=model.HistoryDatasetAssociation),
grids.TextColumn("Type", key="extension", model_class=model.HistoryDatasetAssociation),
grids.TextColumn("history_id", key="history_id", model_class=model.HistoryDatasetAssociation, visible=False),
HistoryColumn("History", key="history", visible=True),
DbKeyColumn("Build", key="dbkey", model_class=model.HistoryDatasetAssociation, visible=True, sortable=False),
]
columns.append(
grids.MulticolFilterColumn(
"Search name and filetype",
cols_to_filter=[columns[1], columns[2]],
key="free-text-search",
visible=False,
filterable="standard",
)
)
[docs] def build_initial_query(self, trans, **kwargs):
return trans.sa_session.query(self.model_class).join(model.History.table).join(model.Dataset.table)
[docs] def apply_query_filter(self, trans, query, **kwargs):
if self.available_tracks is None:
self.available_tracks = trans.app.datatypes_registry.get_available_tracks()
return (
query.filter(model.History.user == trans.user)
.filter(model.HistoryDatasetAssociation.extension.in_(self.available_tracks))
.filter(model.Dataset.state == model.Dataset.states.OK)
.filter(model.HistoryDatasetAssociation.deleted == false())
.filter(model.HistoryDatasetAssociation.visible == true())
)
[docs]class LibraryDatasetsSelectionGrid(grids.Grid):
available_tracks = None
title = "Add Datasets"
model_class = model.LibraryDatasetDatasetAssociation
default_filter = {"deleted": "False"}
default_sort_key = "-id"
columns = [
grids.GridColumn("Id", key="id"),
grids.TextColumn("Name", key="name", model_class=model.LibraryDatasetDatasetAssociation),
grids.TextColumn("Type", key="extension", model_class=model.LibraryDatasetDatasetAssociation),
]
columns.append(
grids.MulticolFilterColumn(
"Search name and filetype",
cols_to_filter=[columns[1], columns[2]],
key="free-text-search",
visible=False,
filterable="standard",
)
)
[docs] def build_initial_query(self, trans, **kwargs):
return trans.sa_session.query(self.model_class).join(model.Dataset.table)
[docs] def apply_query_filter(self, trans, query, **kwargs):
if self.available_tracks is None:
self.available_tracks = trans.app.datatypes_registry.get_available_tracks()
return (
query.filter(model.LibraryDatasetDatasetAssociation.user == trans.user)
.filter(model.LibraryDatasetDatasetAssociation.extension.in_(self.available_tracks))
.filter(model.Dataset.state == model.Dataset.states.OK)
.filter(model.LibraryDatasetDatasetAssociation.deleted == false())
.filter(model.LibraryDatasetDatasetAssociation.visible == true())
)
[docs]class TracksterSelectionGrid(grids.Grid):
title = "Insert into visualization"
model_class = model.Visualization
default_sort_key = "-update_time"
use_paging = False
show_item_checkboxes = True
columns = [
grids.TextColumn("Title", key="title", model_class=model.Visualization, filterable="standard"),
grids.TextColumn("Build", key="dbkey", model_class=model.Visualization),
grids.GridColumn("Last Updated", key="update_time", format=time_ago),
]
[docs] def build_initial_query(self, trans, **kwargs):
return trans.sa_session.query(self.model_class)
[docs] def apply_query_filter(self, trans, query, **kwargs):
return (
query.filter(self.model_class.user_id == trans.user.id)
.filter(self.model_class.deleted == false())
.filter(self.model_class.type == "trackster")
)
[docs]class VisualizationListGrid(grids.Grid):
[docs] def get_url_args(item):
"""
Returns dictionary used to create item link.
"""
url_kwargs = dict(controller="visualization", id=item.id)
# TODO: hack to build link to saved visualization - need trans in this function instead in order to do
# link_data = trans.app.visualizations_registry.get_visualizations( trans, item )
if item.type in registry.VisualizationsRegistry.BUILT_IN_VISUALIZATIONS:
url_kwargs["action"] = item.type
else:
url_kwargs["__route_name__"] = "saved_visualization"
url_kwargs["visualization_name"] = item.type
url_kwargs["action"] = "saved"
return url_kwargs
[docs] def get_display_name(self, trans, item):
if trans.app.visualizations_registry and item.type in trans.app.visualizations_registry.plugins:
plugin = trans.app.visualizations_registry.plugins[item.type]
return plugin.config.get("name", item.type)
return item.type
# Grid definition
title = "Saved Visualizations"
model_class = model.Visualization
default_sort_key = "-update_time"
default_filter = dict(title="All", deleted="False", tags="All", sharing="All")
columns = [
grids.TextColumn("Title", key="title", attach_popup=True, link=get_url_args),
grids.TextColumn("Type", method="get_display_name"),
grids.TextColumn("Build", key="dbkey"),
grids.IndividualTagsColumn(
"Tags",
key="tags",
model_tag_association_class=model.VisualizationTagAssociation,
filterable="advanced",
grid_name="VisualizationListGrid",
),
grids.SharingStatusColumn("Sharing", key="sharing", filterable="advanced", sortable=False),
grids.GridColumn("Created", key="create_time", format=time_ago),
grids.GridColumn("Last Updated", key="update_time", format=time_ago),
]
columns.append(
grids.MulticolFilterColumn(
"Search",
cols_to_filter=[columns[0], columns[2]],
key="free-text-search",
visible=False,
filterable="standard",
)
)
operations = [
grids.GridOperation("Open", allow_multiple=False, url_args=get_url_args),
grids.GridOperation(
"Edit Attributes", allow_multiple=False, url_args=dict(controller="", action="visualizations/edit")
),
grids.GridOperation("Copy", allow_multiple=False, condition=(lambda item: not item.deleted)),
grids.GridOperation(
"Share or Publish",
allow_multiple=False,
condition=(lambda item: not item.deleted),
url_args=dict(controller="", action="visualizations/sharing"),
),
grids.GridOperation(
"Delete",
condition=(lambda item: not item.deleted),
confirm="Are you sure you want to delete this visualization?",
),
]
[docs] def apply_query_filter(self, trans, query, **kwargs):
return query.filter_by(user=trans.user, deleted=False)
[docs]class VisualizationAllPublishedGrid(grids.Grid):
# Grid definition
use_panels = True
title = "Published Visualizations"
model_class = model.Visualization
default_sort_key = "update_time"
default_filter = dict(title="All", username="All")
columns = [
grids.PublicURLColumn("Title", key="title", filterable="advanced"),
grids.OwnerAnnotationColumn(
"Annotation",
key="annotation",
model_annotation_association_class=model.VisualizationAnnotationAssociation,
filterable="advanced",
),
grids.OwnerColumn("Owner", key="username", model_class=model.User, filterable="advanced"),
grids.CommunityRatingColumn("Community Rating", key="rating"),
grids.CommunityTagsColumn(
"Community Tags",
key="tags",
model_tag_association_class=model.VisualizationTagAssociation,
filterable="advanced",
grid_name="VisualizationAllPublishedGrid",
),
grids.ReverseSortColumn("Last Updated", key="update_time", format=time_ago),
]
columns.append(
grids.MulticolFilterColumn(
"Search title, annotation, owner, and tags",
cols_to_filter=[columns[0], columns[1], columns[2], columns[4]],
key="free-text-search",
visible=False,
filterable="standard",
)
)
[docs] def build_initial_query(self, trans, **kwargs):
# See optimization description comments and TODO for tags in matching public histories query.
return (
trans.sa_session.query(self.model_class)
.join(self.model_class.user)
.options(
joinedload(self.model_class.user).load_only("username"),
joinedload(self.model_class.annotations),
undefer("average_rating"),
)
)
[docs] def apply_query_filter(self, trans, query, **kwargs):
return query.filter(self.model_class.deleted == false()).filter(self.model_class.published == true())
[docs]class VisualizationController(
BaseUIController, SharableMixin, UsesVisualizationMixin, UsesAnnotations, UsesItemRatings
):
_visualization_list_grid = VisualizationListGrid()
_published_list_grid = VisualizationAllPublishedGrid()
_history_datasets_grid = HistoryDatasetsSelectionGrid()
_library_datasets_grid = LibraryDatasetsSelectionGrid()
_tracks_grid = TracksterSelectionGrid()
hda_manager: HDAManager = depends(HDAManager)
slug_builder: SlugBuilder = depends(SlugBuilder)
#
# -- Functions for listing visualizations. --
#
[docs] @web.expose
@web.json
@web.require_login("see all available libraries")
def list_libraries(self, trans, **kwargs):
"""List all libraries that can be used for selecting datasets."""
return self._libraries_grid(trans, **kwargs)
[docs] @web.expose
@web.json
@web.require_login("see a history's datasets that can added to this visualization")
def list_history_datasets(self, trans, **kwargs):
"""List a history's datasets that can be added to a visualization."""
kwargs["show_item_checkboxes"] = "True"
return self._history_datasets_grid(trans, **kwargs)
[docs] @web.expose
@web.json
@web.require_login("see a history's datasets that can added to this visualization")
def list_library_datasets(self, trans, **kwargs):
"""List a library's datasets that can be added to a visualization."""
kwargs["show_item_checkboxes"] = "True"
return self._library_datasets_grid(trans, **kwargs)
[docs] @web.expose
@web.json
def list_tracks(self, trans, **kwargs):
return self._tracks_grid(trans, **kwargs)
[docs] @web.expose
@web.json
def list_published(self, trans, *args, **kwargs):
grid = self._published_list_grid(trans, **kwargs)
grid["shared_by_others"] = self._get_shared(trans)
return grid
[docs] @web.legacy_expose_api
@web.require_login("use Galaxy visualizations", use_panels=True)
def list(self, trans, **kwargs):
message = kwargs.get("message")
status = kwargs.get("status")
if "operation" in kwargs and "id" in kwargs:
session = trans.sa_session
operation = kwargs["operation"].lower()
ids = util.listify(kwargs["id"])
for id in ids:
if operation == "delete":
item = self.get_visualization(trans, id)
item.deleted = True
if operation == "copy":
self.copy(trans, **kwargs)
with transaction(session):
session.commit()
kwargs["embedded"] = True
if message and status:
kwargs["message"] = sanitize_text(message)
kwargs["status"] = status
grid = self._visualization_list_grid(trans, **kwargs)
grid["shared_by_others"] = self._get_shared(trans)
return grid
def _get_shared(self, trans):
"""Identify shared visualizations"""
shared_by_others = (
trans.sa_session.query(model.VisualizationUserShareAssociation)
.filter_by(user=trans.get_user())
.join(model.Visualization.table)
.filter(model.Visualization.deleted == false())
.order_by(desc(model.Visualization.update_time))
.all()
)
return [
{"username": v.visualization.user.username, "slug": v.visualization.slug, "title": v.visualization.title}
for v in shared_by_others
]
#
# -- Functions for operating on visualizations. --
#
[docs] @web.expose
@web.require_login("use Galaxy visualizations", use_panels=True)
def index(self, trans, *args, **kwargs):
"""Lists user's saved visualizations."""
return self.list(trans, *args, **kwargs)
[docs] @web.expose
@web.require_login()
def copy(self, trans, id, **kwargs):
visualization = self.get_visualization(trans, id, check_ownership=False, check_accessible=True)
user = trans.get_user()
owner = visualization.user == user
new_title = f"Copy of '{visualization.title}'"
if not owner:
new_title += f" shared by {visualization.user.email}"
copied_viz = visualization.copy(user=trans.user, title=new_title)
# Persist
session = trans.sa_session
session.add(copied_viz)
with transaction(session):
session.commit()
# Display the management page
trans.set_message(f'Created new visualization with name "{copied_viz.title}"')
return
[docs] @web.expose
@web.require_login("share Galaxy visualizations")
def imp(self, trans, id, **kwargs):
"""Import a visualization into user's workspace."""
# Set referer message.
referer = trans.request.referer
if referer and not referer.startswith(f"{trans.request.application_url}{web.url_for('/login')}"):
referer_message = f"<a href='{escape(referer)}'>return to the previous page</a>"
else:
referer_message = f"<a href='{web.url_for('/')}'>go to Galaxy's start page</a>"
# Do import.
session = trans.sa_session
visualization = self.get_visualization(trans, id, check_ownership=False, check_accessible=True)
if visualization.importable is False:
return trans.show_error_message(
f"The owner of this visualization has disabled imports via this link.<br>You can {referer_message}",
use_panels=True,
)
elif visualization.deleted:
return trans.show_error_message(
f"You can't import this visualization because it has been deleted.<br>You can {referer_message}",
use_panels=True,
)
else:
# Create imported visualization via copy.
# TODO: need to handle custom db keys.
imported_visualization = visualization.copy(user=trans.user, title=f"imported: {visualization.title}")
# Persist
session = trans.sa_session
session.add(imported_visualization)
with transaction(session):
session.commit()
# Redirect to load galaxy frames.
return trans.show_ok_message(
message="""Visualization "{}" has been imported. <br>You can <a href="{}">start using this visualization</a> or {}.""".format(
visualization.title, web.url_for("/visualizations/list"), referer_message
),
use_panels=True,
)
[docs] @web.expose
def display_by_username_and_slug(self, trans, username, slug, **kwargs):
"""Display visualization based on a username and slug."""
# Get visualization.
session = trans.sa_session
user = session.query(model.User).filter_by(username=username).first()
visualization = (
trans.sa_session.query(model.Visualization).filter_by(user=user, slug=slug, deleted=False).first()
)
if visualization is None:
raise web.httpexceptions.HTTPNotFound()
# Security check raises error if user cannot access visualization.
self.security_check(trans, visualization, check_ownership=False, check_accessible=True)
# Encode page identifier.
visualization_id = trans.security.encode_id(visualization.id)
# Redirect to client.
return trans.response.send_redirect(
web.url_for(
controller="published",
action="visualization",
id=visualization_id,
)
)
[docs] @web.json
def save(self, trans, vis_json=None, type=None, id=None, title=None, dbkey=None, annotation=None, **kwargs):
"""
Save a visualization; if visualization does not have an ID, a new
visualization is created. Returns JSON of visualization.
"""
# Get visualization attributes from kwargs or from config.
vis_config = loads(vis_json)
vis_type = type or vis_config["type"]
vis_id = id or vis_config.get("id", None)
vis_title = title or vis_config.get("title", None)
vis_dbkey = dbkey or vis_config.get("dbkey", None)
vis_annotation = annotation or vis_config.get("annotation", None)
return self.save_visualization(trans, vis_config, vis_type, vis_id, vis_title, vis_dbkey, vis_annotation)
[docs] @web.legacy_expose_api
@web.require_login("edit visualizations")
def edit(self, trans, payload=None, **kwd):
"""
Edit a visualization's attributes.
"""
id = kwd.get("id")
if not id:
return self.message_exception(trans, "No visualization id received for editing.")
trans_user = trans.get_user()
v = self.get_visualization(trans, id, check_ownership=True)
if trans.request.method == "GET":
if v.slug is None:
self.slug_builder.create_item_slug(trans.sa_session, v)
return {
"title": "Edit visualization attributes",
"inputs": [
{"name": "title", "label": "Name", "value": v.title},
{
"name": "slug",
"label": "Identifier",
"value": v.slug,
"help": "A unique identifier that will be used for public links to this visualization. This field can only contain lowercase letters, numbers, and dashes (-).",
},
{
"name": "dbkey",
"label": "Build",
"type": "select",
"optional": True,
"value": v.dbkey,
"options": trans.app.genomes.get_dbkeys(trans_user, chrom_info=True),
"help": "Parameter to associate your visualization with a database key.",
},
{
"name": "annotation",
"label": "Annotation",
"value": self.get_item_annotation_str(trans.sa_session, trans.user, v),
"help": "A description of the visualization. The annotation is shown alongside published visualizations.",
},
],
}
else:
v_title = payload.get("title")
v_slug = payload.get("slug")
v_dbkey = payload.get("dbkey")
v_annotation = payload.get("annotation")
if not v_title:
return self.message_exception(trans, "Please provide a visualization name is required.")
elif not v_slug:
return self.message_exception(trans, "Please provide a unique identifier.")
elif not self._is_valid_slug(v_slug):
return self.message_exception(
trans, "Visualization identifier can only contain lowercase letters, numbers, and dashes (-)."
)
elif (
v_slug != v.slug
and trans.sa_session.query(model.Visualization)
.filter_by(user=v.user, slug=v_slug, deleted=False)
.first()
):
return self.message_exception(trans, "Visualization id must be unique.")
else:
v.title = v_title
v.slug = v_slug
v.dbkey = v_dbkey
if v_annotation:
v_annotation = sanitize_html(v_annotation)
self.add_item_annotation(trans.sa_session, trans_user, v, v_annotation)
trans.sa_session.add(v)
with transaction(trans.sa_session):
trans.sa_session.commit()
return {"message": "Attributes of '%s' successfully saved." % v.title, "status": "success"}
# ------------------------- registry.
[docs] @web.expose
@web.require_login("use Galaxy visualizations", use_panels=True)
def render(self, trans, visualization_name, embedded=None, **kwargs):
"""
Render the appropriate visualization template, parsing the `kwargs`
into appropriate variables and resources (such as ORM models)
based on this visualizations `param` data in visualizations_conf.xml.
URL: /visualization/show/{visualization_name}
"""
plugin = self._get_plugin_from_registry(trans, visualization_name)
try:
return plugin.render(trans=trans, embedded=embedded, **kwargs)
except Exception as exception:
self._handle_plugin_error(trans, visualization_name, exception)
def _get_plugin_from_registry(self, trans, visualization_name):
"""
Get the named plugin from the registry.
:raises HTTPNotFound: if registry has been turned off in config.
:raises HTTPNotFound: if visualization_name isn't a registered plugin.
"""
if not trans.app.visualizations_registry:
raise HTTPNotFound("No visualization registry (possibly disabled in galaxy.ini)")
return trans.app.visualizations_registry.get_plugin(visualization_name)
def _handle_plugin_error(self, trans, visualization_name, exception):
"""
Log, raise if debugging; log and show html message if not.
"""
log.exception("error rendering visualization (%s)", visualization_name)
if trans.debug:
raise exception
return trans.show_error_message(
"There was an error rendering the visualization. "
+ "Contact your Galaxy administrator if the problem persists."
+ "<br/>Details: "
+ unicodify(exception),
use_panels=False,
)
[docs] @web.expose
@web.require_login("use Galaxy visualizations", use_panels=True)
def saved(self, trans, id=None, revision=None, type=None, config=None, title=None, **kwargs):
"""
Save (on POST) or load (on GET) a visualization then render.
"""
# TODO: consider merging saved and render at this point (could break saved URLs, tho)
if trans.request.method == "POST":
self._POST_to_saved(trans, id=id, revision=revision, type=type, config=config, title=title, **kwargs)
# check the id and load the saved visualization
if id is None:
return HTTPBadRequest("A valid visualization id is required to load a visualization")
visualization = self.get_visualization(trans, id, check_ownership=False, check_accessible=True)
# re-add title to kwargs for passing to render
if title:
kwargs["title"] = title
plugin = self._get_plugin_from_registry(trans, visualization.type)
try:
return plugin.render_saved(visualization, trans=trans, **kwargs)
except Exception as exception:
self._handle_plugin_error(trans, visualization.type, exception)
def _POST_to_saved(self, trans, id=None, revision=None, type=None, config=None, title=None, **kwargs):
"""
Save the visualiztion info (revision, type, config, title, etc.) to
the Visualization at `id` or to a new Visualization if `id` is None.
Uses POST/redirect/GET after a successful save, redirecting to GET.
"""
DEFAULT_VISUALIZATION_NAME = "Unnamed Visualization"
# post to saved in order to save a visualization
if type is None or config is None:
return HTTPBadRequest("A visualization type and config are required to save a visualization")
if isinstance(config, str):
config = loads(config)
title = title or DEFAULT_VISUALIZATION_NAME
# TODO: allow saving to (updating) a specific revision - should be part of UsesVisualization
# TODO: would be easier if this returned the visualization directly
# check security if posting to existing visualization
if id is not None:
self.get_visualization(trans, id, check_ownership=True, check_accessible=False)
# ??: on not owner: error raised, but not returned (status = 200)
# TODO: there's no security check in save visualization (if passed an id)
returned = self.save_visualization(trans, config, type, id, title)
# redirect to GET to prevent annoying 'Do you want to post again?' dialog on page reload
render_url = web.url_for(controller="visualization", action="saved", id=returned.get("vis_id"))
return trans.response.send_redirect(render_url)
#
# Visualizations.
#
[docs] @web.expose
@web.require_login()
def trackster(self, trans, **kwargs):
"""
Display browser for the visualization denoted by id and add the datasets listed in `dataset_ids`.
"""
# define app configuration
app = {"jscript": "trackster"}
# get dataset to add
id = kwargs.get("id", None)
# get dataset to add
new_dataset_id = kwargs.get("dataset_id", None)
# set up new browser if no id provided
if not id:
# use dbkey from dataset to be added or from incoming parameter
dbkey = None
if new_dataset_id:
decoded_id = self.decode_id(new_dataset_id)
hda = self.hda_manager.get_owned(decoded_id, trans.user, current_history=trans.user)
dbkey = hda.dbkey
if dbkey == "?":
dbkey = kwargs.get("dbkey", None)
# save database key
app["default_dbkey"] = dbkey
else:
# load saved visualization
vis = self.get_visualization(trans, id, check_ownership=False, check_accessible=True)
app["viz_config"] = self.get_visualization_config(trans, vis)
# backup id
app["id"] = id
# add dataset id
app["add_dataset"] = new_dataset_id
# check for gene region
gene_region = GenomeRegion.from_str(kwargs.get("gene_region", ""))
# update gene region of saved visualization if user parses a new gene region in the url
if gene_region.chrom is not None:
app["gene_region"] = {"chrom": gene_region.chrom, "start": gene_region.start, "end": gene_region.end}
# fill template
return trans.fill_template("visualization/trackster.mako", config={"app": app, "bundle": "extended"})
[docs] @web.expose
def circster(self, trans, id=None, hda_ldda=None, dataset_id=None, dbkey=None, **kwargs):
"""
Display a circster visualization.
"""
# Get dataset to add.
dataset = None
if dataset_id:
dataset = self.get_hda_or_ldda(trans, hda_ldda, dataset_id)
# Get/create vis.
if id:
# Display existing viz.
vis = self.get_visualization(trans, id, check_ownership=False, check_accessible=True)
dbkey = vis.dbkey
else:
# Create new viz.
if not dbkey:
# If dbkey not specified, use dataset's dbkey.
dbkey = dataset.dbkey
if not dbkey or dbkey == "?":
# Circster requires a valid dbkey.
return trans.show_error_message(
"You must set the dataset's dbkey to view it. You can set "
"a dataset's dbkey by clicking on the pencil icon and editing "
"its attributes.",
use_panels=True,
)
vis = self.create_visualization(trans, type="genome", dbkey=dbkey, save=False)
# Get the vis config and work with it from here on out. Working with the
# config is only possible because the config structure of trackster/genome
# visualizations is well known.
viz_config = self.get_visualization_config(trans, vis)
# Add dataset if specified.
if dataset:
viz_config["tracks"].append(self.get_new_track_config(trans, dataset))
# Get genome info.
chroms_info = self.app.genomes.chroms(trans, dbkey=dbkey)
genome = {"dbkey": dbkey, "chroms_info": chroms_info}
# Add genome-wide data to each track in viz.
tracks = viz_config.get("tracks", [])
for track in tracks:
dataset_dict = track["dataset"]
dataset = self.get_hda_or_ldda(trans, dataset_dict["hda_ldda"], dataset_dict["id"])
genome_data = self._get_genome_data(trans, dataset, dbkey)
if not isinstance(genome_data, str):
track["preloaded_data"] = genome_data
# define app configuration for generic mako template
app = {"jscript": "circster", "viz_config": viz_config, "genome": genome}
# fill template
return trans.fill_template("visualization/trackster.mako", config={"app": app, "bundle": "extended"})
[docs] @web.expose
def sweepster(self, trans, id=None, hda_ldda=None, dataset_id=None, regions=None, **kwargs):
"""
Displays a sweepster visualization using the incoming parameters. If id is available,
get the visualization with the given id; otherwise, create a new visualization using
a given dataset and regions.
"""
regions = regions or "{}"
# Need to create history if necessary in order to create tool form.
trans.get_history(most_recent=True, create=True)
if id:
# Loading a shared visualization.
viz = self.get_visualization(trans, id)
viz_config = self.get_visualization_config(trans, viz)
decoded_id = self.decode_id(viz_config["dataset_id"])
dataset = self.hda_manager.get_owned(decoded_id, trans.user, current_history=trans.history)
else:
# Loading new visualization.
dataset = self.get_hda_or_ldda(trans, hda_ldda, dataset_id)
job = self.hda_manager.creating_job(dataset)
viz_config = {"dataset_id": dataset_id, "tool_id": job.tool_id, "regions": loads(regions)}
# Add tool, dataset attributes to config based on id.
tool = trans.app.toolbox.get_tool(viz_config["tool_id"])
viz_config["tool"] = tool.to_dict(trans, io_details=True)
viz_config["dataset"] = trans.security.encode_dict_ids(dataset.to_dict())
return trans.fill_template_mako("visualization/sweepster.mako", config=viz_config)
[docs] @web.expose
def phyloviz(self, trans, id=None, dataset_id=None, tree_index=0, **kwargs):
config = None
data = None
# if id, then this is a saved visualization; get its config and the dataset_id from there
if id:
visualization = self.get_visualization(trans, id)
config = self.get_visualization_config(trans, visualization)
dataset_id = config.get("dataset_id", None)
# get the hda if we can, then its data using the phyloviz parsers
if dataset_id:
decoded_id = self.decode_id(dataset_id)
hda = self.hda_manager.get_accessible(decoded_id, trans.user)
hda = self.hda_manager.error_if_uploading(hda)
else:
return trans.show_message("Phyloviz couldn't find a dataset_id")
pd = PhylovizDataProvider(original_dataset=hda)
data = pd.get_data(tree_index=tree_index)
# ensure at least a default configuration (gen. an new/unsaved visualization)
if not config:
config = {
"dataset_id": dataset_id,
"title": hda.display_name(),
"ext": hda.datatype.file_ext,
"treeIndex": tree_index,
"saved_visualization": False,
}
return trans.fill_template_mako("visualization/phyloviz.mako", data=data, config=config)