Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.controllers.history
import logging
from collections import OrderedDict
from markupsafe import escape
from sqlalchemy import (
and_,
false,
null,
true
)
from sqlalchemy.orm import (
eagerload,
eagerload_all,
undefer
)
import galaxy.util
from galaxy import exceptions
from galaxy import managers
from galaxy import model
from galaxy import web
from galaxy.model.item_attrs import (
UsesAnnotations,
UsesItemRatings
)
from galaxy.util import (
listify,
Params,
parse_int,
sanitize_text,
string_as_bool,
unicodify
)
from galaxy.web import url_for
from galaxy.web.framework.helpers import (
grids,
iff,
time_ago
)
from galaxy.webapps.base.controller import (
BaseUIController,
ERROR,
ExportsHistoryMixin,
ImportsHistoryMixin,
INFO,
SharableMixin,
SUCCESS,
WARNING,
)
from ._create_history_template import render_item
log = logging.getLogger(__name__)
class HistoryListGrid(grids.Grid):
    """Grid of the current user's histories together with the operations offered on each row."""

    # Custom column types
    class HistoryListNameColumn(NameColumn):
        """Name column whose link triggers the "Switch" operation."""

        def get_link(self, trans, grid, history):
            """Return the link arguments for ``history``; deleted histories get no link."""
            if history.deleted:
                return None
            return {
                "operation": "Switch",
                "id": history.id,
                "use_panels": grid.use_panels,
                "async_compatible": True,
            }

    class DeletedColumn(grids.DeletedColumn):
        """Status column: current / deleted / permanently deleted."""

        def get_value(self, trans, grid, history):
            """Return the status text shown for ``history``."""
            # The current history takes precedence over any deletion state.
            if history == trans.history:
                return "<strong>current history</strong>"
            if history.purged:
                return "deleted permanently"
            if history.deleted:
                return "deleted"
            return ""

        def sort(self, trans, query, ascending, column_name=None):
            """Order by the purged flag in the requested direction, newest update first."""
            table_columns = self.model_class.table.c
            if ascending:
                purged_order = table_columns.purged.asc()
            else:
                purged_order = table_columns.purged.desc()
            return query.order_by(purged_order, table_columns.update_time.desc())

    def build_initial_query(self, trans, **kwargs):
        """Extend the base query so the shared-with count is loaded up front."""
        # Preload sharing information used when fetching data for the grid.
        base_query = super().build_initial_query(trans, **kwargs)
        return base_query.options(undefer("users_shared_with_count"))

    # Grid definition
    title = "Saved Histories"
    model_class = model.History
    default_sort_key = "-update_time"
    columns = [
        HistoryListNameColumn("Name", key="name", attach_popup=True, filterable="advanced"),
        ItemCountColumn("Items", key="item_count", sortable=False),
        grids.GridColumn("Datasets", key="datasets_by_state", sortable=False, nowrap=True, delayed=True),
        grids.IndividualTagsColumn(
            "Tags",
            key="tags",
            model_tag_association_class=model.HistoryTagAssociation,
            filterable="advanced",
            grid_name="HistoryListGrid",
        ),
        grids.SharingStatusColumn(
            "Sharing",
            key="sharing",
            filterable="advanced",
            sortable=False,
            use_shared_with_count=True,
        ),
        grids.GridColumn("Size on Disk", key="disk_size", sortable=False, delayed=True),
        grids.GridColumn("Created", key="create_time", format=time_ago),
        grids.GridColumn("Last Updated", key="update_time", format=time_ago),
        DeletedColumn("Status", key="deleted", filterable="advanced"),
    ]
    # Free-text search spans the name column (index 0) and the tags column (index 3).
    columns.append(
        grids.MulticolFilterColumn(
            "search history names and tags",
            cols_to_filter=[columns[0], columns[3]],
            key="free-text-search",
            visible=False,
            filterable="standard",
        )
    )
    global_actions = [
        grids.GridAction("Import from file", {"controller": "", "action": "histories/import"}),
    ]
    operations = [
        grids.GridOperation(
            "Switch",
            allow_multiple=False,
            condition=lambda item: not item.deleted,
            async_compatible=True,
        ),
        grids.GridOperation(
            "View",
            allow_multiple=False,
            url_args={"controller": "", "action": "histories/view"},
        ),
        grids.GridOperation(
            "Share or Publish",
            allow_multiple=False,
            condition=lambda item: not item.deleted,
            url_args={"controller": "", "action": "histories/sharing"},
        ),
        grids.GridOperation(
            "Change Permissions",
            allow_multiple=False,
            condition=lambda item: not item.deleted,
            url_args={"controller": "", "action": "histories/permissions"},
        ),
        grids.GridOperation(
            "Copy",
            allow_multiple=False,
            condition=lambda item: not item.deleted,
            async_compatible=False,
        ),
        grids.GridOperation(
            "Rename",
            condition=lambda item: not item.deleted,
            url_args={"controller": "", "action": "histories/rename"},
            target="top",
        ),
        grids.GridOperation(
            "Delete",
            condition=lambda item: not item.deleted,
            async_compatible=True,
        ),
        grids.GridOperation(
            "Delete Permanently",
            condition=lambda item: not item.purged,
            confirm="History contents will be removed from disk, this cannot be undone. Continue?",
            async_compatible=True,
        ),
        grids.GridOperation(
            "Undelete",
            condition=lambda item: item.deleted and not item.purged,
            async_compatible=True,
        ),
    ]
    standard_filters = [
        grids.GridColumnFilter("Active", args={"deleted": False}),
        grids.GridColumnFilter("Deleted", args={"deleted": True}),
        grids.GridColumnFilter("All", args={"deleted": 'All'}),
    ]
    default_filter = {"name": "All", "deleted": "False", "tags": "All", "sharing": "All"}
    num_rows_per_page = 15
    use_paging = True
    info_text = "Histories that have been deleted for more than a time period specified by the Galaxy administrator(s) may be permanently deleted."

    def apply_query_filter(self, trans, query, **kwargs):
        """Restrict the grid to the current user's histories that are not being imported."""
        return query.filter_by(user=trans.user, importing=False)
class HistoryAllPublishedGrid(grids.Grid):
    """Grid of every published history, searchable by name, annotation, owner and tags."""

    title = "Published Histories"
    model_class = model.History
    default_sort_key = "update_time"
    default_filter = {"public_url": "All", "username": "All", "tags": "All"}
    use_paging = True
    num_rows_per_page = 50
    columns = [
        NameURLColumn("Name", key="name", filterable="advanced"),
        grids.OwnerAnnotationColumn(
            "Annotation",
            key="annotation",
            model_annotation_association_class=model.HistoryAnnotationAssociation,
            filterable="advanced",
        ),
        grids.OwnerColumn("Owner", key="username", model_class=model.User, filterable="advanced"),
        grids.CommunityRatingColumn("Community Rating", key="rating"),
        grids.CommunityTagsColumn(
            "Community Tags",
            key="tags",
            model_tag_association_class=model.HistoryTagAssociation,
            filterable="advanced",
            grid_name="PublicHistoryListGrid",
        ),
        grids.ReverseSortColumn("Last Updated", key="update_time", format=time_ago),
    ]
    # Free-text search covers name (0), annotation (1), owner (2) and tags (4).
    columns.append(
        grids.MulticolFilterColumn(
            "Search name, annotation, owner, and tags",
            cols_to_filter=[columns[0], columns[1], columns[2], columns[4]],
            key="free-text-search",
            visible=False,
            filterable="standard",
        )
    )
    operations = []

    def build_initial_query(self, trans, **kwargs):
        """Build the base query, joined to the owner and with the grid's loader options applied."""
        # TODO: Tags are still loaded one at a time, consider doing this all at once:
        # - eagerload would keep everything in one query but would explode the number of rows and
        #   potentially result in unneeded info transferred over the wire.
        # - subqueryload("tags").subqueryload("tag") would probably be better under postgres but
        #   performance data against a big database is wanted first - might cause problems?
        # Loader options used below:
        # - Only the username is pulled from the associated User table since that is all that is
        #   used (can be used during search). The join is needed in addition to the eagerload since
        #   it is used in the .count() query which doesn't respect the eagerload options (could be
        #   eliminated with #5523).
        # - average_rating is undeferred to prevent loading individual ratings per-history.
        # - Annotations are eager loaded - this causes a left join which might be inefficient if
        #   there were potentially many items per history (like if joining HDAs for instance) but
        #   there should only be at most one so this is fine.
        loader_options = (
            eagerload("user").load_only("username"),
            eagerload("annotations"),
            undefer("average_rating"),
        )
        joined_query = trans.sa_session.query(self.model_class).join("user")
        return joined_query.options(*loader_options)

    def apply_query_filter(self, trans, query, **kwargs):
        """Keep only public histories: published, having a slug, and not deleted."""
        history_class = self.model_class
        is_published = history_class.published == true()
        has_slug = history_class.slug != null()
        is_not_deleted = history_class.deleted == false()
        return query.filter(is_published).filter(has_slug).filter(is_not_deleted)
[docs]class HistoryController(BaseUIController, SharableMixin, UsesAnnotations, UsesItemRatings,
ExportsHistoryMixin, ImportsHistoryMixin):
def __init__(self, app):
    """Initialise the base controller, then attach the history manager and serializer."""
    super().__init__(app)
    histories_module = managers.histories
    self.history_manager = histories_module.HistoryManager(app)
    self.history_serializer = histories_module.HistorySerializer(self.app)
@web.expose
def list_as_xml(self, trans):
    """Render the history list as XML (used by functional tests)."""
    response = trans.response
    response.set_content_type('text/xml')
    template_path = "/history/list_as_xml.mako"
    return trans.fill_template(template_path)
# ......................................................................... lists
# Grid instances created once at class-definition time. `list` renders
# `stored_list_grid` and `list_published` renders `published_list_grid`; each grid
# is invoked as a callable with (trans, **kwargs).
stored_list_grid = HistoryListGrid()
shared_list_grid = SharedHistoryListGrid()
published_list_grid = HistoryAllPublishedGrid()
@web.expose
@web.json
def list_published(self, trans, **kwargs):
    """Return the published-histories grid evaluated for this request."""
    grid = self.published_list_grid
    return grid(trans, **kwargs)
@web.legacy_expose_api
@web.require_login("work with multiple histories")
def list(self, trans, **kwargs):
    """
    List the current user's histories, optionally applying a grid operation first.

    Recognised ``kwargs``:

    - ``operation``: one of "switch", "delete", "delete permanently" or
      "undelete" (case-insensitive), applied to the histories named by ``id``.
    - ``id``: one encoded history id or a list of them.
    - ``use_panels``: the string ``'True'`` when the grid is shown standalone.
    - ``message`` / ``status``: passed through to the grid when no operation runs.

    Returns the rendered saved-histories grid, or a redirect to the root page
    after a "switch" from a standalone grid.

    Raises ``exceptions.ItemOwnershipException`` if a requested history does
    not belong to the current user.
    """
    current_history = trans.get_history()
    message = kwargs.get('message')
    status = kwargs.get('status')
    if 'operation' in kwargs:
        operation = kwargs['operation'].lower()
        history_ids = listify(kwargs.get('id', []))
        # Display no message by default
        status, message = None, None
        # Load the histories and ensure they all belong to the current user
        histories = []
        for history_id in history_ids:
            history = self.history_manager.get_owned(self.decode_id(history_id), trans.user, current_history=trans.history)
            if history:
                # Ensure history is owned by current user. This is an explicit raise
                # rather than an ``assert`` so the check still runs under ``python -O``.
                if history.user_id is not None and trans.user and trans.user.id != history.user_id:
                    raise exceptions.ItemOwnershipException("History does not belong to current user")
                histories.append(history)
            else:
                log.warning("Invalid history id '%r' passed to list", history_id)
        if histories:
            if operation == "switch":
                status, message = self._list_switch(trans, histories)
                # Take action to update UI to reflect history switch. If
                # grid is using panels, it is standalone and hence a redirect
                # to root is needed; if grid is not using panels, it is nested
                # in the main Galaxy UI and refreshing the history frame
                # is sufficient.
                use_panels = kwargs.get('use_panels', False) == 'True'
                if use_panels:
                    return trans.response.send_redirect(url_for("/"))
                else:
                    kwargs['refresh_frames'] = ['history']
            elif operation in ("delete", "delete permanently"):
                status, message = self._list_delete(trans, histories, purge=(operation == "delete permanently"))
                if current_history in histories:
                    # Deleted the current history, so a new, empty history was
                    # created automatically, and we need to refresh the history frame
                    kwargs['refresh_frames'] = ['history']
            elif operation == "undelete":
                status, message = self._list_undelete(trans, histories)
            trans.sa_session.flush()
    # Render the list view
    if message and status:
        kwargs['message'] = sanitize_text(message)
        kwargs['status'] = status
    return self.stored_list_grid(trans, **kwargs)
def _list_delete(self, trans, histories, purge=False):
    """
    Delete (or, with ``purge``, permanently delete) each history in ``histories``.

    Histories shared with other users are refused. If the session's current
    history is among those removed, the most recent remaining history becomes
    current, or a default one is created.

    Returns a ``(status, message)`` tuple for display in the grid.
    """
    current_history = trans.get_history()
    status = SUCCESS
    message_parts = []
    n_deleted = 0
    deleted_current = False
    for history in histories:
        try:
            if history.users_shared_with:
                raise exceptions.ObjectAttributeInvalidException(
                    "History (%s) has been shared with others, unshare it before deleting it." % history.name
                )
            if purge:
                self.history_manager.purge(history)
            else:
                self.history_manager.delete(history)
            if history == current_history:
                deleted_current = True
        except Exception as e:
            # Record the failure and carry on with the remaining histories.
            message_parts.append(unicodify(e))
            status = ERROR
            continue
        trans.log_event("History (%s) marked as deleted" % history.name)
        n_deleted += 1

    if n_deleted:
        several = n_deleted != 1
        part = "Deleted %d %s" % (n_deleted, iff(several, "histories", "history"))
        if purge:
            if trans.app.config.allow_user_dataset_purge:
                part += " and removed {} dataset{} from disk".format(iff(several, "their", "its"), iff(several, 's', ''))
            else:
                part += " but the datasets were not removed from disk because that feature is not enabled in this Galaxy instance"
        message_parts.append("%s. " % part)

    if deleted_current:
        # if this history is the current history for this session,
        # - attempt to find the most recently used, undeleted history and switch to it.
        # - If no suitable recent history is found, create a new one and switch
        # note: this needs to come after commits above or will use an empty history that was deleted above
        still_usable = [model.History.deleted == false(), model.History.purged == false()]
        replacement = self.history_manager.most_recent(user=trans.user, filters=still_usable)
        if replacement:
            self.history_manager.set_current(trans, replacement)
        else:
            trans.get_or_create_default_history()
        message_parts.append("Your active history was deleted, a new empty history is now active. ")
        status = INFO
    return (status, " ".join(message_parts))
def _list_undelete(self, trans, histories):
    """
    Undelete every deleted, non-purged history in ``histories``.

    Purged histories are only counted and otherwise left untouched: the
    returned message states that they cannot be undeleted, so their
    ``deleted`` flag must not be cleared.

    Returns a ``(status, message)`` tuple; ``status`` is ``WARNING`` when at
    least one purged history was encountered, otherwise ``SUCCESS``.
    """
    n_undeleted = 0
    n_already_purged = 0
    for history in histories:
        if history.purged:
            n_already_purged += 1
            # Skip: a purged history is reported as not undeletable below.
            continue
        if history.deleted:
            history.deleted = False
            if not history.default_permissions:
                # For backward compatibility - for a while we were deleting all DefaultHistoryPermissions on
                # the history when we deleted the history. We are no longer doing this.
                # Need to add default DefaultHistoryPermissions in case they were deleted when the history was deleted
                default_action = trans.app.security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS
                private_user_role = trans.app.security_agent.get_private_user_role(history.user)
                default_permissions = {default_action: [private_user_role]}
                trans.app.security_agent.history_set_default_permissions(history, default_permissions)
            n_undeleted += 1
            trans.log_event("History (%s) %d marked as undeleted" % (history.name, history.id))
    status = SUCCESS
    message_parts = []
    if n_undeleted:
        message_parts.append("Undeleted %d %s. " % (n_undeleted, iff(n_undeleted != 1, "histories", "history")))
    if n_already_purged:
        message_parts.append("%d histories have already been purged and cannot be undeleted." % n_already_purged)
        status = WARNING
    return status, "".join(message_parts)
def _list_switch(self, trans, histories):
    """
    Make the first history in ``histories`` the current history of this session.

    Returns ``(None, None)`` because switching produces no status message.
    """
    target_history = histories[0]
    session_record = trans.get_galaxy_session()
    try:
        # Look for an existing link between this session and the target history.
        lookup = trans.sa_session.query(trans.app.model.GalaxySessionToHistoryAssociation)
        lookup = lookup.filter_by(session_id=session_record.id, history_id=target_history.id)
        association = lookup.first()
    except Exception:
        association = None
    target_history.add_galaxy_session(session_record, association=association)
    trans.sa_session.add(target_history)
    trans.sa_session.flush()
    trans.set_history(target_history)
    # No message
    return None, None
@web.expose
def as_xml(self, trans, id=None, show_deleted=None, show_hidden=None):
    """
    Render a history as XML.

    With ``id`` the (encoded) history is looked up through the accessibility
    check; without it the session's most recent history is used, created if
    necessary. Anonymous requests are refused when the instance requires login.
    """
    login_required = trans.app.config.require_login
    if login_required and not trans.user:
        return trans.fill_template('/no_access.mako', message='Please log in to access Galaxy histories.')
    if not id:
        history = trans.get_history(most_recent=True, create=True)
    else:
        history = self.history_manager.get_accessible(
            self.decode_id(id), trans.user, current_history=trans.history
        )
    trans.response.set_content_type('text/xml')
    template_args = {
        "history": history,
        "show_deleted": string_as_bool(show_deleted),
        "show_hidden": string_as_bool(show_hidden),
    }
    return trans.fill_template_mako("history/as_xml.mako", **template_args)
@web.expose
@web.json
def display_structured(self, trans, id=None):
    """
    Display a history as a nested structure showing the jobs and workflow
    invocations that created each dataset (if any).

    ``id`` is an encoded history id; when omitted the session's current
    history is used. The history is shown only to its owner, to the session
    it is current for, or to an admin.

    Returns a dict with the history ``name``, the serialized active datasets
    (``history_json``) and the rendered ``template`` markup, or the error
    message response when the history cannot be displayed.
    """
    # Get history
    if id is None:
        if not trans.history:
            # Nothing to fall back to: there is no current history in this session.
            return trans.show_error_message("Cannot display history structure.")
        id = trans.history.id
    else:
        id = self.decode_id(id)
    # Expunge history from the session to allow us to force a reload
    # with a bunch of eager loaded joins
    if trans.history:
        trans.sa_session.expunge(trans.history)
    history = trans.sa_session.query(model.History).options(
        eagerload_all('active_datasets.creating_job_associations.job.workflow_invocation_step.workflow_invocation.workflow'),
    ).get(id)
    if not (history and ((history.user and trans.user and history.user.id == trans.user.id)
                         or (trans.history and history.id == trans.history.id)
                         or trans.user_is_admin)):
        return trans.show_error_message("Cannot display history structure.")
    # Resolve jobs and workflow invocations for the datasets in the history
    # items is filled with items (hdas, jobs, or workflows) that go at the
    # top level
    items = []
    # First go through and group hdas by job, if there is no job they get
    # added directly to items
    jobs = OrderedDict()
    for hda in history.active_datasets:
        if hda.visible is False:
            continue
        # Check if the hda has a creating job, most should, datasets from
        # before jobs were tracked, or from the upload tool before it
        # created a job, may not
        if not hda.creating_job_associations:
            items.append((hda, None))
        # Attach hda to correct job
        # -- there should only be one creating_job_association, so this
        #    loop body should only be hit once
        for assoc in hda.creating_job_associations:
            jobs.setdefault(assoc.job, []).append((hda, None))
    # Second, go through the jobs and connect to workflows
    wf_invocations = OrderedDict()
    for job, job_hdas in jobs.items():
        if job.workflow_invocation_step:
            # Job is attached to a workflow step, follow it to the
            # workflow_invocation and group
            wf_invocation = job.workflow_invocation_step.workflow_invocation
            wf_invocations.setdefault(wf_invocation, []).append((job, job_hdas))
        else:
            # Not attached to a workflow, add to items
            items.append((job, job_hdas))
    # Finally, add workflow invocations to items, which should now
    # contain all hdas with some level of grouping
    items.extend(wf_invocations.items())
    # Sort items by age
    items.sort(key=lambda item: item[0].create_time, reverse=True)
    # logic taken from mako files
    from galaxy.managers import hdas
    hda_serializer = hdas.HDASerializer(trans.app)
    hda_dicts = [
        hda_serializer.serialize_to_view(hda, user=trans.user, trans=trans, view='detailed')
        for hda in history.active_datasets
    ]
    html_template = ''.join(render_item(trans, entity, children) for entity, children in items)
    return {
        'name': history.name,
        'history_json': hda_dicts,
        'template': html_template
    }
@web.expose
@web.json
def view(self, trans, id=None, show_deleted=False, show_hidden=False, use_panels=True):
    """
    Return the data needed to view a history.

    With ``id`` the (encoded) history is fetched through the accessibility
    check; without it the session's current history is used. Any failure
    while loading or serializing is logged and reported as an error message.
    """
    show_deleted = string_as_bool(show_deleted)
    show_hidden = string_as_bool(show_hidden)
    use_panels = string_as_bool(use_panels)
    try:
        if not id:
            history_to_view = trans.history
            user_is_owner = True
            history_is_current = True
        else:
            history_to_view = self.history_manager.get_accessible(
                self.decode_id(id), trans.user, current_history=trans.history
            )
            user_is_owner = history_to_view.user == trans.user
            history_is_current = history_to_view == trans.history
        # include all datasets: hidden, deleted, and purged
        history_dictionary = self.history_serializer.serialize_to_view(
            history_to_view, view='dev-detailed', user=trans.user, trans=trans
        )
    except Exception as exc:
        if trans.user:
            user_id = str(trans.user.id)
        else:
            user_id = '(anonymous)'
        log.exception('Error bootstrapping history for user %s', user_id)
        if isinstance(exc, exceptions.ItemAccessibilityException):
            error_msg = 'You do not have permission to view this history.'
        else:
            error_msg = ('An error occurred getting the history data from the server. '
                         'Please contact a Galaxy administrator if the problem persists.')
        return trans.show_error_message(error_msg, use_panels=use_panels)

    purge_allowed = trans.app.config.allow_user_dataset_purge
    return {
        "history": history_dictionary,
        "user_is_owner": user_is_owner,
        "history_is_current": history_is_current,
        "show_deleted": show_deleted,
        "show_hidden": show_hidden,
        "use_panels": use_panels,
        "allow_user_dataset_purge": purge_allowed,
    }
@web.require_login("use more than one Galaxy history")
@web.expose
def view_multiple(self, trans, include_deleted_histories=False, order='update_time', limit=10):
    """
    Render the multi-history view template.

    ``include_deleted_histories`` is coerced to a boolean and ``limit`` to an
    integer of at least 1 (default 10, ``None`` allowed); ``order`` is passed
    to the template unchanged.
    """
    # TODO: allow specifying user_id for admin?
    template_args = {
        "current_history_id": trans.security.encode_id(trans.history.id),
        "include_deleted_histories": string_as_bool(include_deleted_histories),
        "order": order,
        "limit": parse_int(limit, min_val=1, default=10, allow_none=True),
    }
    return trans.fill_template_mako("history/view_multiple.mako", **template_args)
@web.expose
def display_by_username_and_slug(self, trans, username, slug):
    """
    Display the non-deleted history identified by an owner ``username`` and ``slug``.

    Raises ``HTTPNotFound`` when no such history exists; the accessibility
    check raises if the requesting user may not see it.
    """
    # Get history.
    session = trans.sa_session
    owner = session.query(model.User).filter_by(username=username).first()
    history_query = trans.sa_session.query(model.History)
    history_query = history_query.options(eagerload('tags')).options(eagerload('annotations'))
    history = history_query.filter_by(user=owner, slug=slug, deleted=False).first()
    if history is None:
        raise web.httpexceptions.HTTPNotFound()
    # Security check raises error if user cannot access history.
    self.history_manager.error_unless_accessible(history, trans.user, current_history=trans.history)

    # Get rating data; the user's own rating stays 0 when anonymous or unrated.
    user_item_rating = 0
    if trans.get_user():
        rating_record = self.get_user_item_rating(trans.sa_session, trans.get_user(), history)
        if rating_record:
            user_item_rating = rating_record.rating
    ave_item_rating, num_ratings = self.get_ave_item_rating_data(trans.sa_session, history)

    # create ownership flag for template, dictify models
    user_is_owner = trans.user == history.user
    history_dictionary = self.history_serializer.serialize_to_view(
        history, view='dev-detailed', user=trans.user, trans=trans
    )
    history_dictionary['annotation'] = self.get_item_annotation_str(trans.sa_session, history.user, history)
    return trans.stream_template_mako(
        "history/display.mako",
        item=history,
        item_data=[],
        user_is_owner=user_is_owner,
        history_dict=history_dictionary,
        user_item_rating=user_item_rating,
        ave_item_rating=ave_item_rating,
        num_ratings=num_ratings,
    )
@web.legacy_expose_api
@web.require_login("changing default permissions")
def permissions(self, trans, payload=None, **kwd):
    """
    Show (GET) or change (any other method) the default dataset permissions of a history.

    The encoded history id is read from ``kwd['id']``. On GET a form
    description with one multi-select per permitted dataset action is
    returned; otherwise ``payload`` maps action keys to lists of encoded
    role ids and the history's default permissions are replaced.
    """
    history_id = kwd.get('id')
    if not history_id:
        return self.message_exception(trans, 'Invalid history id (%s) received' % str(history_id))
    history = self.history_manager.get_owned(self.decode_id(history_id), trans.user, current_history=trans.history)

    if trans.request.method == 'GET':
        all_roles = trans.user.all_roles()
        current_actions = history.default_permissions
        inputs = []
        for action_key, action in trans.app.model.Dataset.permitted_actions.items():
            # Roles currently granted this action on the history.
            in_roles = {entry.role for entry in current_actions if entry.action == action.action}
            inputs.append({
                'type': 'select',
                'multiple': True,
                'optional': True,
                'individual': True,
                'name': action_key,
                'label': action.action,
                'help': action.description,
                'options': [(role.name, trans.security.encode_id(role.id)) for role in set(all_roles)],
                'value': [trans.security.encode_id(role.id) for role in in_roles],
            })
        return {'title': 'Change default dataset permissions for history \'%s\'' % history.name, 'inputs': inputs}

    permissions = {}
    for action_key, action in trans.app.model.Dataset.permitted_actions.items():
        encoded_role_ids = payload.get(action_key) or []
        roles = [
            trans.sa_session.query(trans.app.model.Role).get(trans.security.decode_id(encoded))
            for encoded in encoded_role_ids
        ]
        permissions[trans.app.security_agent.get_action(action.action)] = roles
    trans.app.security_agent.history_set_default_permissions(history, permissions)
    return {'message': 'Default history \'%s\' dataset permissions have been changed.' % history.name}
@web.legacy_expose_api
@web.require_login("make datasets private")
def make_private(self, trans, history_id=None, all_histories=False, **kwd):
    """
    Make the datasets of one history, or of all the user's histories, private.

    The default permissions of each history are also set to private so that
    future datasets start out private.
    """
    all_histories = string_as_bool(all_histories)
    histories = []
    if all_histories:
        histories = trans.user.histories
    elif history_id:
        owned = self.history_manager.get_owned(self.decode_id(history_id), trans.user, current_history=trans.history)
        if owned:
            histories.append(owned)
    if not histories:
        return self.message_exception(trans, 'Invalid history or histories specified.')

    security_agent = trans.app.security_agent
    private_role = security_agent.get_private_user_role(trans.user)
    user_roles = trans.user.all_roles()
    private_permissions = {
        security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS: [private_role],
        security_agent.permitted_actions.DATASET_ACCESS: [private_role],
    }
    for history in histories:
        # Set default role for history to private
        security_agent.history_set_default_permissions(history, private_permissions)
        # Set private role for all datasets
        for hda in history.datasets:
            dataset = hda.dataset
            if dataset.library_associations:
                continue
            if security_agent.dataset_is_private_to_user(trans, dataset):
                continue
            if not security_agent.can_manage_dataset(user_roles, dataset):
                continue
            # If it's not private to me, and I can manage it, set fixed private permissions.
            security_agent.set_all_dataset_permissions(dataset, private_permissions)
            if not security_agent.dataset_is_private_to_user(trans, dataset):
                raise exceptions.InternalServerError('An error occurred and the dataset is NOT private.')
    # ``history`` is the last history processed by the loop above.
    return {'message': 'Success, requested permissions have been changed in %s.' % ("all histories" if all_histories else history.name)}
def _get_histories(self, trans, ids):
    """
    Return the histories owned by the current user for the encoded ``ids``.

    ``ids`` may be a single id or several; when empty, the current history is used.
    """
    if not ids:
        # Default to the current history
        ids = trans.security.encode_id(trans.history.id)
    return [
        self.history_manager.get_owned(self.decode_id(encoded_id), trans.user, current_history=trans.history)
        for encoded_id in listify(ids)
    ]
def _get_users(self, trans, user, emails_or_ids):
    """
    Resolve e-mail addresses or encoded user ids into user objects.

    Entries containing ``@`` are looked up by e-mail among non-deleted users;
    anything else is treated as an encoded id. Blank entries are ignored.

    Returns ``(send_to_users, send_to_err)``: the resolved users (never
    ``user`` itself) and the concatenated error text for rejected entries.
    """
    send_to_users = []
    errors = []
    for raw_entry in listify(emails_or_ids):
        entry = raw_entry.strip()
        if not entry:
            continue
        candidate = None
        if '@' in entry:
            candidate = self.user_manager.by_email(
                entry, filters=[trans.app.model.User.table.c.deleted == false()]
            )
        else:
            try:
                candidate = self.user_manager.by_id(self.decode_id(entry))
                if candidate.deleted:
                    candidate = None
            # TODO: in an ideal world, we would let this bubble up to web.expose which would handle it
            except exceptions.MalformedId:
                candidate = None

        if not candidate:
            errors.append("%s is not a valid Galaxy user. " % entry)
        elif candidate == user:
            errors.append("You cannot send histories to yourself. ")
        else:
            send_to_users.append(candidate)
    return send_to_users, "".join(errors)
def _populate(self, trans, histories_for_sharing, other, send_to_err):
    """
    Merge the users and histories in ``other`` into ``histories_for_sharing``.

    Histories already shared with a given user are skipped and noted in
    ``send_to_err``. No security checking on datasets is performed.

    ``histories_for_sharing`` looks like:
        { userA: [ historyX, historyY ], userB: [ historyY ] }
    ``other`` looks like:
        { userA: {historyX : [hda, hda], historyY : [hda]}, userB: {historyY : [hda]} }

    Returns ``(histories_for_sharing, send_to_err)``.
    """
    share_class = trans.app.model.HistoryUserShareAssociation
    for send_to_user, history_dict in other.items():
        for history in history_dict:
            # Make sure the current history has not already been shared with the current send_to_user
            existing_shares = trans.sa_session.query(share_class).filter(
                and_(share_class.table.c.user_id == send_to_user.id,
                     share_class.table.c.history_id == history.id)
            ).count()
            if existing_shares > 0:
                send_to_err += "History ({}) already shared with user ({})".format(history.name, send_to_user.email)
                continue
            # Build the dict that will be used for sharing
            user_histories = histories_for_sharing.setdefault(send_to_user, [])
            if history not in user_histories:
                user_histories.append(history)
    return histories_for_sharing, send_to_err
def _populate_restricted(self, trans, user, histories, send_to_users, action, send_to_err, unique=False):
    """
    Classify each dataset of ``histories`` by what sharing it with ``send_to_users`` requires.

    The user may be attempting to share histories whose datasets cannot all be accessed by
    other users. If this is the case, the user sharing the histories can:
      1) action=='public': choose to make the datasets public if permitted to do so
      2) action=='private': automatically create a new "sharing role" allowing protected
         datasets to be accessed only by the desired users

    Returns ``(can_change, cannot_change, no_change_needed, unique_no_change_needed,
    send_to_err)``. With ``unique`` true, ``can_change`` and ``cannot_change`` are keyed for
    display:
        { historyX : [hda, hda], historyY : [hda] }
    otherwise they are keyed for sharing, as ``no_change_needed`` always is:
        { userA: {historyX : [hda, hda], historyY : [hda]}, userB: {historyY : [hda]} }
    ``unique_no_change_needed`` is always keyed by history.
    """
    can_change = {}
    cannot_change = {}
    no_change_needed = {}
    unique_no_change_needed = {}
    security_agent = trans.app.security_agent
    share_class = trans.app.model.HistoryUserShareAssociation
    user_roles = user.all_roles()
    user_has_chosen = action in ["private", "public"]
    for history in histories:
        for send_to_user in send_to_users:
            # Make sure the current history has not already been shared with the current send_to_user
            existing_shares = trans.sa_session.query(share_class).filter(
                and_(share_class.table.c.user_id == send_to_user.id,
                     share_class.table.c.history_id == history.id)
            ).count()
            if existing_shares > 0:
                send_to_err += "History ({}) already shared with user ({})".format(history.name, send_to_user.email)
                continue
            # Only deal with datasets that have not been purged
            for hda in history.activatable_datasets:
                if security_agent.can_access_dataset(send_to_user.all_roles(), hda.dataset):
                    # The no_change_needed dictionary is a special case. If both of can_change
                    # and cannot_change are empty, no_change_needed will be used for sharing.
                    # Otherwise unique_no_change_needed will be used for displaying, so both
                    # need to be populated.
                    unique_no_change_needed.setdefault(history, []).append(hda)
                    no_change_needed.setdefault(send_to_user, {}).setdefault(history, []).append(hda)
                    continue
                # The user with which we are sharing the history does not have access
                # permission on the current dataset.
                if security_agent.can_manage_dataset(user_roles, hda.dataset):
                    # The current user has authority to change permissions on the current dataset
                    # because they have permission to manage permissions on the dataset.
                    # NOTE: ( gvk )There may be problems if the dataset also has an ldda, but I don't think so
                    # because the user with which we are sharing will not have the "manage permission" permission
                    # on the dataset in their history. Keep an eye on this though...
                    target = can_change
                else:
                    if user_has_chosen:
                        # The user has made a choice, so 'unique' doesn't apply. Don't change stuff
                        # that the user doesn't have permission to change
                        continue
                    target = cannot_change
                if unique:
                    # Display form: unique histories only
                    target.setdefault(history, []).append(hda)
                else:
                    # Sharing form: keyed by recipient, then by history
                    target.setdefault(send_to_user, {}).setdefault(history, []).append(hda)
    return can_change, cannot_change, no_change_needed, unique_no_change_needed, send_to_err
def _share_histories(self, trans, user, send_to_err, histories=None):
    """Create a share association for every (user, history) pair, then redirect.

    :param trans: the current transaction
    :param user: accepted for interface compatibility; not used by this method
    :param send_to_err: error text accumulated so far; extended when nothing can be shared
    :param histories: mapping that looks like
        ``{ userA: [ historyX, historyY ], userB: [ historyY ] }``
    :returns: a redirect to the sharing page of the first shared history, or
        a redirect to the history list carrying an error message when no
        share association was created
    """
    histories = histories or {}
    shared_histories = []
    for send_to_user, send_to_user_histories in histories.items():
        for history in send_to_user_histories:
            share = trans.app.model.HistoryUserShareAssociation()
            share.history = history
            share.user = send_to_user
            trans.sa_session.add(share)
            self.create_item_slug(trans.sa_session, history)
            trans.sa_session.flush()
            if history not in shared_histories:
                shared_histories.append(history)
    if not shared_histories:
        # Covers both an empty mapping and a mapping whose history lists are
        # all empty (the latter used to fail on shared_histories[0]).
        send_to_err += "No users have been specified or no histories can be sent without changing permissions or associating a sharing role. "
        # Pass the query arguments as keywords so they get URL-encoded; the
        # message contains spaces and may contain other reserved characters.
        return trans.response.send_redirect(web.url_for("/histories/list", status="error", message=send_to_err))
    first_shared_id = trans.security.encode_id(shared_histories[0].id)
    return trans.response.send_redirect(web.url_for("/histories/sharing", id=first_shared_id))
# ......................................................................... actions/orig. async
[docs] @web.expose
def purge_deleted_datasets(self, trans):
    """Purge every deleted, not yet purged dataset of the current history.

    Each matching HDA is marked as purged and flushed; when the underlying
    dataset reports ``user_can_purge`` it is fully deleted as well.  A failure
    to delete the underlying dataset is logged and does not stop the loop.

    :returns: an "ok" message with the number of purged HDAs, or an error
        message when user purging is disabled in the config or there is no
        current history
    """
    if not (trans.app.config.allow_user_dataset_purge and trans.history):
        return trans.show_error_message("Cannot purge deleted datasets from this session.")
    count = 0
    for hda in trans.history.datasets:
        # Only datasets that are deleted but not yet purged are of interest.
        if not hda.deleted or hda.purged:
            continue
        hda.purge_usage_from_quota(trans.user)
        hda.purged = True
        trans.sa_session.add(hda)
        trans.log_event("HDA id %s has been purged" % hda.id)
        trans.sa_session.flush()
        if hda.dataset.user_can_purge:
            try:
                hda.dataset.full_delete()
                trans.log_event("Dataset id {} has been purged upon the the purge of HDA id {}".format(hda.dataset.id, hda.id))
                trans.sa_session.add(hda.dataset)
            except Exception:
                log.exception('Unable to purge dataset ({}) on purge of hda ({}):'.format(hda.dataset.id, hda.id))
        count += 1
    # The underlying dataset is added to the session *after* the per-HDA flush
    # above, so without this final flush the change made for the last purged
    # dataset would never be flushed by this method.
    trans.sa_session.flush()
    return trans.show_ok_message("%d datasets have been deleted permanently" % count, refresh_frames=['history'])
[docs] @web.expose
def resume_paused_jobs(self, trans, current=False, ids=None):
    """Resume the paused jobs of the active history -- this does not require a logged in user."""
    # Only the "current history, no explicit ids" form is supported.
    if ids or not string_as_bool(current):
        raise NotImplementedError("You can currently only resume all the datasets of the current history.")
    active_history = trans.get_history()
    active_history.resume_paused_jobs()
    trans.sa_session.add(active_history)
    trans.sa_session.flush()
    frames_to_refresh = ['history']
    return trans.show_ok_message("Your jobs have been resumed.", refresh_frames=frames_to_refresh)
# TODO: used in index.mako
[docs] @web.expose
@web.require_login("rate items")
@web.json
def rate_async(self, trans, id, rating):
    """Rate a history asynchronously and return the updated community rating data."""
    decoded_id = self.decode_id(id)
    target = self.history_manager.get_accessible(decoded_id, trans.user, current_history=trans.history)
    if target:
        # Store the rating, then report the refreshed average rating data.
        self.rate_item(trans.sa_session, trans.get_user(), target, rating)
        return self.get_ave_item_rating_data(trans.sa_session, target)
    return trans.show_error_message("The specified history does not exist.")
# TODO: used in display_base.mako
[docs] @web.expose
def export_archive(self, trans, id=None, gzip=True, include_hidden=False, include_deleted=False, preview=False):
    """Export a history to an archive.

    :param id: encoded id of the history to export; when empty, the current
        history (``trans.history``) is used
    :param gzip: forwarded to ``queue_history_export``
    :param include_hidden: forwarded to ``queue_history_export``
    :param include_deleted: forwarded to ``queue_history_export``
    :param preview: when truthy and an up-to-date export is ready, show a
        message containing the download link instead of serving the archive
    :returns: the served archive, or an HTML message describing the export
        state; an error message when there is no history to export
    """
    #
    # Get history to export.
    #
    if id:
        history = self.history_manager.get_accessible(self.decode_id(id), trans.user, current_history=trans.history)
    else:
        # Use current history.
        history = trans.history
    # Check for a missing history *before* touching history.id; a session
    # without a current history must produce the error message, not a crash.
    if not history:
        return trans.show_error_message("This history does not exist or you cannot export this history.")
    if not id:
        id = trans.security.encode_id(history.id)
    url = url_for(controller='history', action="export_archive", id=id, qualified=True)
    # The history name is user-supplied and ends up in an HTML message.
    name = escape(history.name)
    # If history has already been exported and it has not changed since export, stream it.
    jeha = history.latest_export
    if jeha and jeha.up_to_date:
        if jeha.ready:
            if preview:
                return trans.show_message("History Ready: '%(n)s'. Use this link to download "
                                          "the archive or import it to another Galaxy server: "
                                          "<a href='%(u)s'>%(u)s</a>" % ({'n': name, 'u': url}))
            return self.serve_ready_history_export(trans, jeha)
        elif jeha.preparing:
            return trans.show_message("Still exporting history %(n)s; please check back soon. Link: <a href='%(s)s'>%(s)s</a>"
                                      % ({'n': name, 's': url}))
    # No usable export exists (or it is stale): queue a new one.
    self.queue_history_export(trans, history, gzip=gzip, include_hidden=include_hidden, include_deleted=include_deleted)
    return trans.show_message("Exporting History '%(n)s'. You will need to <a href='%(share)s' target='_top'>make this history 'accessible'</a> in order to import this to another galaxy sever. <br/>"
                              "Use this link to download the archive or import it to another Galaxy server: "
                              "<a href='%(u)s'>%(u)s</a>" % ({'share': url_for('/histories/sharing', id=id), 'n': name, 'u': url}))
# TODO: used in this file and index.mako
[docs] @web.expose
@web.json
@web.require_login("get history name and link")
def get_name_and_link_async(self, trans, id=None):
    """Return a dict holding the history's name and its username/slug link."""
    decoded_id = self.decode_id(id)
    history = self.history_manager.get_accessible(decoded_id, trans.user, current_history=trans.history)
    # Flush only when create_item_slug reports a truthy result.
    slug_created = self.create_item_slug(trans.sa_session, history)
    if slug_created:
        trans.sa_session.flush()
    name = history.name
    link = url_for(
        controller='history',
        action="display_by_username_and_slug",
        username=history.user.username,
        slug=history.slug,
    )
    return {"name": name, "link": link}
# TODO: used in page/editor.mako
[docs] @web.expose
@web.require_login("set history's accessible flag")
def set_accessible_async(self, trans, id=None, accessible=False):
    """Set the history's importable attribute (and slug, when made accessible)."""
    decoded_id = self.decode_id(id)
    history = self.history_manager.get_owned(decoded_id, trans.user, current_history=trans.history)
    wants_importable = accessible in ('True', 'true', 't', 'T')
    # Nothing to do unless the importable value would really change; this
    # avoids touching update_time for a no-op request.
    if not history or history.importable == wants_importable:
        return
    if wants_importable:
        self._make_item_accessible(trans.sa_session, history)
    else:
        history.importable = wants_importable
    trans.sa_session.flush()
    return
# TODO: used in page/editor.mako
[docs] @web.legacy_expose_api
@web.require_login("rename histories")
def rename(self, trans, payload=None, **kwd):
    """Describe (GET) or apply (any other method) new names for histories.

    The encoded history id(s) are read from ``kwd['id']``.  A GET request
    returns a form description with one ``name_<i>`` input per history; any
    other request reads the ``name_<i>`` values from ``payload`` and renames
    the histories whose name actually changes.
    """
    encoded_ids = kwd.get('id')
    if not encoded_ids:
        return self.message_exception(trans, 'No history id received for renaming.')
    user = trans.get_user()
    histories = []
    for encoded_id in listify(encoded_ids):
        candidate = self.history_manager.get_owned(self.decode_id(encoded_id), trans.user, current_history=trans.history)
        if candidate and candidate.user_id == user.id:
            histories.append(candidate)
    if trans.request.method == 'GET':
        inputs = []
        for index, hist in enumerate(histories):
            inputs.append({
                'name': 'name_%i' % index,
                'label': 'Current: %s' % hist.name,
                'value': hist.name,
            })
        return {'title': 'Change history name(s)', 'inputs': inputs}
    messages = []
    for index, hist in enumerate(histories):
        cur_name = hist.get_display_name()
        new_name = payload.get('name_%i' % index)
        # reject a missing, non-string or blank name
        if not isinstance(new_name, str) or not new_name.strip():
            messages.append("You must specify a valid name for History '%s'." % cur_name)
            continue
        # skip if not the owner
        if hist.user_id != user.id:
            messages.append("History '%s' does not appear to belong to you." % cur_name)
            continue
        # skip if it wouldn't be a change
        if new_name == cur_name:
            continue
        hist.name = new_name
        trans.sa_session.add(hist)
        trans.sa_session.flush()
        trans.log_event('History renamed: id: {}, renamed to: {}'.format(str(hist.id), new_name))
        messages.append("History '" + cur_name + "' renamed to '" + new_name + "'.")
    if messages:
        message = sanitize_text(' '.join(messages))
    else:
        message = 'History names remain unchanged.'
    return {'message': message, 'status': 'success'}
# ------------------------------------------------------------------------- current history
[docs] @web.expose
@web.require_login("switch to a history")
def switch_to_history(self, trans, hist_id=None):
    """Make the history with `hist_id` the current one, then redirect to the root URL."""
    # kept only for backwards compatibility; the work is done by set_as_current
    self.set_as_current(trans, id=hist_id)
    root_url = url_for("/")
    return trans.response.send_redirect(root_url)
def get_item(self, trans, id):
    """Return what the history manager's ``get_owned`` yields for the encoded `id`."""
    decoded_id = self.decode_id(id)
    return self.history_manager.get_owned(decoded_id, trans.user, current_history=trans.history)
# TODO: override of base ui controller?
def history_data(self, trans, history):
    """Serialize the given history to a dictionary using the 'dev-detailed' view."""
    serialized = self.history_serializer.serialize_to_view(
        history,
        view='dev-detailed',
        user=trans.user,
        trans=trans,
    )
    return serialized
# TODO: combine these next two - poss. with a redirect flag
# @web.require_login( "switch to a history" )
[docs] @web.json
@web.do_not_cache
def set_as_current(self, trans, id):
    """Make the history with the encoded `id` the current history and return it serialized.

    When a ``MessageException`` is raised along the way, its code is set as
    the response status and a dict with the error message and code is
    returned instead.
    """
    try:
        decoded_id = self.decode_id(id)
        selected = self.history_manager.get_owned(decoded_id, trans.user, current_history=trans.history)
        trans.set_history(selected)
        return self.history_data(trans, selected)
    except exceptions.MessageException as msg_exc:
        error_code = msg_exc.err_code.code
        trans.response.status = error_code
        return {'err_msg': msg_exc.err_msg, 'err_code': msg_exc.err_code.code}
[docs] @web.json
@web.do_not_cache
def current_history_json(self, trans):
    """Return the current history (most recent, created on demand) in serialized form."""
    current = trans.get_history(most_recent=True, create=True)
    serialized = self.history_data(trans, current)
    return serialized
[docs] @web.json
def create_new_current(self, trans, name=None):
    """Create a new history via ``trans.new_history`` and return it in serialized form."""
    created = trans.new_history(name)
    serialized = self.history_data(trans, created)
    return serialized
# TODO: /history/current to do all of the above: if ajax, return json; if post, read id and set to current