Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.controllers.history
import logging
from dateutil.parser import isoparse
from markupsafe import escape
from sqlalchemy import (
false,
null,
true,
)
from sqlalchemy.orm import (
joinedload,
undefer,
)
from galaxy import (
exceptions,
model,
web,
)
from galaxy.managers import histories
from galaxy.managers.sharable import SlugBuilder
from galaxy.model.item_attrs import (
UsesAnnotations,
UsesItemRatings,
)
from galaxy.structured_app import StructuredApp
from galaxy.util import (
listify,
parse_int,
sanitize_text,
string_as_bool,
unicodify,
)
from galaxy.web import (
expose_api_anonymous,
url_for,
)
from galaxy.web.framework.helpers import (
grids,
iff,
time_ago,
)
from galaxy.webapps.base.controller import (
BaseUIController,
ERROR,
INFO,
SharableMixin,
SUCCESS,
WARNING,
)
from ._create_history_template import render_item
from ..api import depends
log = logging.getLogger(__name__)
[docs]class HistoryListGrid(grids.Grid):
# Custom column types
[docs] class HistoryListNameColumn(NameColumn):
[docs] def get_link(self, trans, grid, history):
link = None
if not history.deleted:
link = dict(operation="Switch", id=history.id, use_panels=grid.use_panels, async_compatible=True)
return link
[docs] class DeletedColumn(grids.DeletedColumn):
[docs] def get_value(self, trans, grid, history):
if history == trans.history:
return "<strong>current history</strong>"
if history.purged:
return "deleted permanently"
elif history.deleted:
return "deleted"
return ""
[docs] def sort(self, trans, query, ascending, column_name=None):
if ascending:
query = query.order_by(self.model_class.table.c.purged.asc(), self.model_class.update_time.desc())
else:
query = query.order_by(self.model_class.table.c.purged.desc(), self.model_class.update_time.desc())
return query
[docs] def build_initial_query(self, trans, **kwargs):
# Override to preload sharing information used when fetching data for grid.
query = super().build_initial_query(trans, **kwargs)
query = query.options(undefer("users_shared_with_count"))
return query
# Grid definition
title = "Saved Histories"
model_class = model.History
default_sort_key = "-update_time"
columns = [
HistoryListNameColumn("Name", key="name", attach_popup=True, filterable="advanced"),
ItemCountColumn("Items", key="item_count", sortable=False),
grids.GridColumn("Datasets", key="datasets_by_state", sortable=False, nowrap=True, delayed=True),
grids.IndividualTagsColumn(
"Tags",
key="tags",
model_tag_association_class=model.HistoryTagAssociation,
filterable="advanced",
grid_name="HistoryListGrid",
),
grids.SharingStatusColumn(
"Sharing", key="sharing", filterable="advanced", sortable=False, use_shared_with_count=True
),
grids.GridColumn("Size on Disk", key="disk_size", sortable=False, delayed=True),
grids.GridColumn("Created", key="create_time", format=time_ago),
grids.GridColumn("Last Updated", key="update_time", format=time_ago),
DeletedColumn("Status", key="deleted", filterable="advanced"),
]
columns.append(
grids.MulticolFilterColumn(
"search history names and tags",
cols_to_filter=[columns[0], columns[3]],
key="free-text-search",
visible=False,
filterable="standard",
)
)
global_actions = [grids.GridAction("Import history", dict(controller="", action="histories/import"))]
operations = [
grids.GridOperation(
"Switch", allow_multiple=False, condition=(lambda item: not item.deleted), async_compatible=True
),
grids.GridOperation("View", allow_multiple=False, url_args=dict(controller="", action="histories/view")),
grids.GridOperation(
"Share or Publish",
allow_multiple=False,
condition=(lambda item: not item.deleted),
url_args=dict(controller="", action="histories/sharing"),
),
grids.GridOperation(
"Change Permissions",
allow_multiple=False,
condition=(lambda item: not item.deleted),
url_args=dict(controller="", action="histories/permissions"),
),
grids.GridOperation(
"Copy", allow_multiple=False, condition=(lambda item: not item.deleted), async_compatible=False
),
grids.GridOperation(
"Rename",
condition=(lambda item: not item.deleted),
url_args=dict(controller="", action="histories/rename"),
target="top",
),
grids.GridOperation("Delete", condition=(lambda item: not item.deleted), async_compatible=True),
grids.GridOperation(
"Delete Permanently",
condition=(lambda item: not item.purged),
confirm="History contents will be removed from disk, this cannot be undone. Continue?",
async_compatible=True,
),
grids.GridOperation(
"Undelete", condition=(lambda item: item.deleted and not item.purged), async_compatible=True
),
]
standard_filters = [
grids.GridColumnFilter("Active", args=dict(deleted=False)),
grids.GridColumnFilter("Deleted", args=dict(deleted=True)),
grids.GridColumnFilter("All", args=dict(deleted="All")),
]
default_filter = dict(name="All", deleted="False", tags="All", sharing="All")
num_rows_per_page = 15
use_paging = True
info_text = "Histories that have been deleted for more than a time period specified by the Galaxy administrator(s) may be permanently deleted."
[docs] def apply_query_filter(self, trans, query, **kwargs):
return query.filter_by(user=trans.user, importing=False)
[docs]class HistoryAllPublishedGrid(grids.Grid):
title = "Published Histories"
model_class = model.History
default_sort_key = "update_time"
default_filter = dict(public_url="All", username="All", tags="All")
use_paging = True
num_rows_per_page = 50
columns = [
NameURLColumn("Name", key="name", filterable="advanced"),
grids.OwnerAnnotationColumn(
"Annotation",
key="annotation",
model_annotation_association_class=model.HistoryAnnotationAssociation,
filterable="advanced",
),
grids.OwnerColumn("Owner", key="username", model_class=model.User, filterable="advanced"),
grids.CommunityRatingColumn("Community Rating", key="rating"),
grids.CommunityTagsColumn(
"Community Tags",
key="tags",
model_tag_association_class=model.HistoryTagAssociation,
filterable="advanced",
grid_name="PublicHistoryListGrid",
),
grids.ReverseSortColumn("Last Updated", key="update_time", format=time_ago),
]
columns.append(
grids.MulticolFilterColumn(
"Search name, annotation, owner, and tags",
cols_to_filter=[columns[0], columns[1], columns[2], columns[4]],
key="free-text-search",
visible=False,
filterable="standard",
)
)
[docs] def build_initial_query(self, trans, **kwargs):
# TODO: Tags are still loaded one at a time, consider doing this all at once:
# - joinedload would keep everything in one query but would explode the number of rows and potentially
# result in unneeded info transferred over the wire.
# - subqueryload("tags").subqueryload("tag") would probably be better under postgres but I'd
# like some performance data against a big database first - might cause problems?
# - Pull down only username from associated User table since that is all that is used
# (can be used during search). Need join in addition to the joinedload since it is used in
# the .count() query which doesn't respect the joinedload options (could eliminate this with #5523).
# - Undefer average_rating column to prevent loading individual ratings per-history.
# - Eager load annotations - this causes a left join which might be inefficient if there were
# potentially many items per history (like if joining HDAs for instance) but there should only
# be at most one so this is fine.
return (
trans.sa_session.query(self.model_class)
.join("user")
.options(joinedload("user").load_only("username"), joinedload("annotations"), undefer("average_rating"))
)
[docs] def apply_query_filter(self, trans, query, **kwargs):
# A public history is published, has a slug, and is not deleted.
return (
query.filter(self.model_class.published == true())
.filter(self.model_class.slug != null())
.filter(self.model_class.deleted == false())
)
[docs]class HistoryController(BaseUIController, SharableMixin, UsesAnnotations, UsesItemRatings):
history_manager: histories.HistoryManager = depends(histories.HistoryManager)
history_export_view: histories.HistoryExportView = depends(histories.HistoryExportView)
history_serializer: histories.HistorySerializer = depends(histories.HistorySerializer)
slug_builder: SlugBuilder = depends(SlugBuilder)
[docs] @web.expose
def list_as_xml(self, trans):
"""XML history list for functional tests"""
trans.response.set_content_type("text/xml")
return trans.fill_template("/history/list_as_xml.mako")
# ......................................................................... lists
stored_list_grid = HistoryListGrid()
shared_list_grid = SharedHistoryListGrid()
published_list_grid = HistoryAllPublishedGrid()
[docs] @web.expose
@web.json
def list_published(self, trans, **kwargs):
return self.published_list_grid(trans, **kwargs)
[docs] @web.legacy_expose_api
@web.require_login("work with multiple histories")
def list(self, trans, **kwargs):
"""List all available histories"""
current_history = trans.get_history()
message = kwargs.get("message")
status = kwargs.get("status")
if "operation" in kwargs:
operation = kwargs["operation"].lower()
history_ids = listify(kwargs.get("id", []))
# Display no message by default
status, message = None, None
# Load the histories and ensure they all belong to the current user
histories = []
for history_id in history_ids:
history = self.history_manager.get_owned(
self.decode_id(history_id), trans.user, current_history=trans.history
)
if history:
# Ensure history is owned by current user
if history.user_id is not None and trans.user:
assert trans.user.id == history.user_id, "History does not belong to current user"
histories.append(history)
else:
log.warning("Invalid history id '%r' passed to list", history_id)
if histories:
if operation == "switch":
status, message = self._list_switch(trans, histories)
# Take action to update UI to reflect history switch. If
# grid is using panels, it is standalone and hence a redirect
# to root is needed; if grid is not using panels, it is nested
# in the main Galaxy UI and refreshing the history frame
# is sufficient.
use_panels = kwargs.get("use_panels", False) == "True"
if use_panels:
return trans.response.send_redirect(url_for("/"))
else:
kwargs["refresh_frames"] = ["history"]
elif operation in ("delete", "delete permanently"):
status, message = self._list_delete(trans, histories, purge=(operation == "delete permanently"))
if current_history in histories:
# Deleted the current history, so a new, empty history was
# created automatically, and we need to refresh the history frame
kwargs["refresh_frames"] = ["history"]
elif operation == "undelete":
status, message = self._list_undelete(trans, histories)
trans.sa_session.flush()
# Render the list view
if message and status:
kwargs["message"] = sanitize_text(message)
kwargs["status"] = status
return self.stored_list_grid(trans, **kwargs)
def _list_delete(self, trans, histories, purge=False):
"""Delete histories"""
n_deleted = 0
deleted_current = False
message_parts = []
status = SUCCESS
current_history = trans.get_history()
for history in histories:
try:
if history.users_shared_with:
raise exceptions.ObjectAttributeInvalidException(
f"History ({history.name}) has been shared with others, unshare it before deleting it."
)
if purge:
self.history_manager.purge(history)
else:
self.history_manager.delete(history)
if history == current_history:
deleted_current = True
except Exception as e:
message_parts.append(unicodify(e))
status = ERROR
else:
trans.log_event(f"History ({history.name}) marked as deleted")
n_deleted += 1
if n_deleted:
part = "Deleted %d %s" % (n_deleted, iff(n_deleted != 1, "histories", "history"))
if purge and trans.app.config.allow_user_dataset_purge:
part += f" and removed {iff(n_deleted != 1, 'their', 'its')} dataset{iff(n_deleted != 1, 's', '')} from disk"
elif purge:
part += " but the datasets were not removed from disk because that feature is not enabled in this Galaxy instance"
message_parts.append(f"{part}. ")
if deleted_current:
# if this history is the current history for this session,
# - attempt to find the most recently used, undeleted history and switch to it.
# - If no suitable recent history is found, create a new one and switch
# note: this needs to come after commits above or will use an empty history that was deleted above
not_deleted_or_purged = [model.History.deleted == false(), model.History.purged == false()]
most_recent_history = self.history_manager.most_recent(user=trans.user, filters=not_deleted_or_purged)
if most_recent_history:
self.history_manager.set_current(trans, most_recent_history)
else:
trans.get_or_create_default_history()
message_parts.append("Your active history was deleted, a new empty history is now active. ")
status = INFO
return (status, " ".join(message_parts))
def _list_undelete(self, trans, histories):
"""Undelete histories"""
n_undeleted = 0
n_already_purged = 0
for history in histories:
if history.purged:
n_already_purged += 1
if history.deleted:
history.deleted = False
if not history.default_permissions:
# For backward compatibility - for a while we were deleting all DefaultHistoryPermissions on
# the history when we deleted the history. We are no longer doing this.
# Need to add default DefaultHistoryPermissions in case they were deleted when the history was deleted
default_action = trans.app.security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS
private_user_role = trans.app.security_agent.get_private_user_role(history.user)
default_permissions = {}
default_permissions[default_action] = [private_user_role]
trans.app.security_agent.history_set_default_permissions(history, default_permissions)
n_undeleted += 1
trans.log_event("History (%s) %d marked as undeleted" % (history.name, history.id))
status = SUCCESS
message_parts = []
if n_undeleted:
message_parts.append("Undeleted %d %s. " % (n_undeleted, iff(n_undeleted != 1, "histories", "history")))
if n_already_purged:
message_parts.append("%d histories have already been purged and cannot be undeleted." % n_already_purged)
status = WARNING
return status, "".join(message_parts)
def _list_switch(self, trans, histories):
"""Switch to a new different history"""
new_history = histories[0]
galaxy_session = trans.get_galaxy_session()
try:
association = (
trans.sa_session.query(trans.app.model.GalaxySessionToHistoryAssociation)
.filter_by(session_id=galaxy_session.id, history_id=new_history.id)
.first()
)
except Exception:
association = None
new_history.add_galaxy_session(galaxy_session, association=association)
trans.sa_session.add(new_history)
trans.sa_session.flush()
trans.set_history(new_history)
# No message
return None, None
[docs] @web.expose
def as_xml(self, trans, id=None, show_deleted=None, show_hidden=None):
"""
Return a history in xml format.
"""
if trans.app.config.require_login and not trans.user:
return trans.fill_template("/no_access.mako", message="Please log in to access Galaxy histories.")
if id:
history = self.history_manager.get_accessible(self.decode_id(id), trans.user, current_history=trans.history)
else:
history = trans.get_history(most_recent=True, create=True)
trans.response.set_content_type("text/xml")
return trans.fill_template_mako(
"history/as_xml.mako",
history=history,
show_deleted=string_as_bool(show_deleted),
show_hidden=string_as_bool(show_hidden),
)
[docs] @web.expose
@web.json
def display_structured(self, trans, id=None):
"""
Display a history as a nested structure showing the jobs and workflow
invocations that created each dataset (if any).
"""
# Get history
if id is None:
id = trans.history.id
else:
id = self.decode_id(id)
# Expunge history from the session to allow us to force a reload
# with a bunch of eager loaded joins
trans.sa_session.expunge(trans.history)
history = (
trans.sa_session.query(model.History)
.options(
joinedload("active_datasets")
.joinedload("creating_job_associations")
.joinedload("job")
.joinedload("workflow_invocation_step")
.joinedload("workflow_invocation")
.joinedload("workflow"),
)
.get(id)
)
if not (
history
and (
(history.user and trans.user and history.user.id == trans.user.id)
or (trans.history and history.id == trans.history.id)
or trans.user_is_admin
)
):
return trans.show_error_message("Cannot display history structure.")
# Resolve jobs and workflow invocations for the datasets in the history
# items is filled with items (hdas, jobs, or workflows) that go at the
# top level
items = []
# First go through and group hdas by job, if there is no job they get
# added directly to items
jobs = {}
for hda in history.active_datasets:
if hda.visible is False:
continue
# Follow "copied from ..." association until we get to the original
# instance of the dataset
original_hda = hda
# while original_hda.copied_from_history_dataset_association:
# original_hda = original_hda.copied_from_history_dataset_association
# Check if the job has a creating job, most should, datasets from
# before jobs were tracked, or from the upload tool before it
# created a job, may not
if not original_hda.creating_job_associations:
items.append((hda, None))
# Attach hda to correct job
# -- there should only be one creating_job_association, so this
# loop body should only be hit once
for assoc in original_hda.creating_job_associations:
job = assoc.job
if job in jobs:
jobs[job].append((hda, None))
else:
jobs[job] = [(hda, None)]
# Second, go through the jobs and connect to workflows
wf_invocations = {}
for job, hdas in jobs.items():
# Job is attached to a workflow step, follow it to the
# workflow_invocation and group
if job.workflow_invocation_step:
wf_invocation = job.workflow_invocation_step.workflow_invocation
if wf_invocation in wf_invocations:
wf_invocations[wf_invocation].append((job, hdas))
else:
wf_invocations[wf_invocation] = [(job, hdas)]
# Not attached to a workflow, add to items
else:
items.append((job, hdas))
# Finally, add workflow invocations to items, which should now
# contain all hdas with some level of grouping
items.extend(wf_invocations.items())
# Sort items by age
items.sort(key=(lambda x: x[0].create_time), reverse=True)
# logic taken from mako files
from galaxy.managers import hdas
hda_serializer = hdas.HDASerializer(trans.app)
hda_dicts = []
id_hda_dict_map = {}
for hda in history.active_datasets:
hda_dict = hda_serializer.serialize_to_view(hda, user=trans.user, trans=trans, view="detailed")
id_hda_dict_map[hda_dict["id"]] = hda_dict
hda_dicts.append(hda_dict)
html_template = ""
for entity, children in items:
html_template += render_item(trans, entity, children)
return {"name": history.name, "history_json": hda_dicts, "template": html_template}
[docs] @expose_api_anonymous
def view(self, trans, id=None, show_deleted=False, show_hidden=False, use_panels=True):
"""
View a history. If a history is importable, then it is viewable by any user.
"""
show_deleted = string_as_bool(show_deleted)
show_hidden = string_as_bool(show_hidden)
use_panels = string_as_bool(use_panels)
history_dictionary = {}
user_is_owner = False
if id:
history_to_view = self.history_manager.get_accessible(
self.decode_id(id), trans.user, current_history=trans.history
)
user_is_owner = history_to_view.user == trans.user
history_is_current = history_to_view == trans.history
else:
history_to_view = trans.history
user_is_owner = True
history_is_current = True
# include all datasets: hidden, deleted, and purged
history_dictionary = self.history_serializer.serialize_to_view(
history_to_view, view="dev-detailed", user=trans.user, trans=trans
)
return {
"history": history_dictionary,
"user_is_owner": user_is_owner,
"history_is_current": history_is_current,
"show_deleted": show_deleted,
"show_hidden": show_hidden,
"use_panels": use_panels,
"allow_user_dataset_purge": trans.app.config.allow_user_dataset_purge,
}
[docs] @web.require_login("use more than one Galaxy history")
@web.expose
def view_multiple(self, trans, include_deleted_histories=False, order="update_time", limit=10):
""" """
current_history_id = trans.security.encode_id(trans.history.id)
# TODO: allow specifying user_id for admin?
include_deleted_histories = string_as_bool(include_deleted_histories)
limit = parse_int(limit, min_val=1, default=10, allow_none=True)
return trans.fill_template_mako(
"history/view_multiple.mako",
current_history_id=current_history_id,
include_deleted_histories=include_deleted_histories,
order=order,
limit=limit,
)
[docs] @web.expose
def display_by_username_and_slug(self, trans, username, slug):
"""
Display history based on a username and slug.
"""
# Get history.
session = trans.sa_session
user = session.query(model.User).filter_by(username=username).first()
history = (
trans.sa_session.query(model.History)
.options(joinedload("tags"))
.options(joinedload("annotations"))
.filter_by(user=user, slug=slug, deleted=False)
.first()
)
if history is None:
raise web.httpexceptions.HTTPNotFound()
# Security check raises error if user cannot access history.
self.history_manager.error_unless_accessible(history, trans.user, current_history=trans.history)
# Get rating data.
user_item_rating = 0
if trans.get_user():
user_item_rating = self.get_user_item_rating(trans.sa_session, trans.get_user(), history)
if user_item_rating:
user_item_rating = user_item_rating.rating
else:
user_item_rating = 0
ave_item_rating, num_ratings = self.get_ave_item_rating_data(trans.sa_session, history)
# create ownership flag for template, dictify models
user_is_owner = trans.user == history.user
history_dictionary = self.history_serializer.serialize_to_view(
history, view="dev-detailed", user=trans.user, trans=trans
)
history_dictionary["annotation"] = self.get_item_annotation_str(trans.sa_session, history.user, history)
return trans.fill_template_mako(
"history/display.mako",
item=history,
item_data=[],
user_is_owner=user_is_owner,
history_dict=history_dictionary,
user_item_rating=user_item_rating,
ave_item_rating=ave_item_rating,
num_ratings=num_ratings,
)
[docs] @web.legacy_expose_api
@web.require_login("changing default permissions")
def permissions(self, trans, payload=None, **kwd):
"""
Sets the permissions on a history.
"""
history_id = kwd.get("id")
if not history_id:
return self.message_exception(trans, f"Invalid history id ({str(history_id)}) received")
history = self.history_manager.get_owned(self.decode_id(history_id), trans.user, current_history=trans.history)
if trans.request.method == "GET":
inputs = []
all_roles = trans.user.all_roles()
current_actions = history.default_permissions
for action_key, action in trans.app.model.Dataset.permitted_actions.items():
in_roles = set()
for a in current_actions:
if a.action == action.action:
in_roles.add(a.role)
inputs.append(
{
"type": "select",
"multiple": True,
"optional": True,
"individual": True,
"name": action_key,
"label": action.action,
"help": action.description,
"options": [(role.name, trans.security.encode_id(role.id)) for role in set(all_roles)],
"value": [trans.security.encode_id(role.id) for role in in_roles],
}
)
return {"title": "Change default dataset permissions for history '%s'" % history.name, "inputs": inputs}
else:
permissions = {}
for action_key, action in trans.app.model.Dataset.permitted_actions.items():
in_roles = payload.get(action_key) or []
in_roles = [
trans.sa_session.query(trans.app.model.Role).get(trans.security.decode_id(x)) for x in in_roles
]
permissions[trans.app.security_agent.get_action(action.action)] = in_roles
trans.app.security_agent.history_set_default_permissions(history, permissions)
return {"message": "Default history '%s' dataset permissions have been changed." % history.name}
[docs] @web.legacy_expose_api
@web.require_login("make datasets private")
def make_private(self, trans, history_id=None, all_histories=False, **kwd):
"""
Sets the datasets within a history to private. Also sets the default
permissions for the history to private, for future datasets.
"""
histories = []
all_histories = string_as_bool(all_histories)
if all_histories:
histories = trans.user.histories
elif history_id:
history = self.history_manager.get_owned(
self.decode_id(history_id), trans.user, current_history=trans.history
)
if history:
histories.append(history)
if not histories:
return self.message_exception(trans, "Invalid history or histories specified.")
private_role = trans.app.security_agent.get_private_user_role(trans.user)
user_roles = trans.user.all_roles()
private_permissions = {
trans.app.security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS: [private_role],
trans.app.security_agent.permitted_actions.DATASET_ACCESS: [private_role],
}
for history in histories:
# Set default role for history to private
trans.app.security_agent.history_set_default_permissions(history, private_permissions)
# Set private role for all datasets
for hda in history.datasets:
if (
not hda.dataset.library_associations
and not trans.app.security_agent.dataset_is_private_to_user(trans, hda.dataset)
and trans.app.security_agent.can_manage_dataset(user_roles, hda.dataset)
):
# If it's not private to me, and I can manage it, set fixed private permissions.
trans.app.security_agent.set_all_dataset_permissions(hda.dataset, private_permissions)
if not trans.app.security_agent.dataset_is_private_to_user(trans, hda.dataset):
raise exceptions.InternalServerError("An error occurred and the dataset is NOT private.")
return {
"message": f"Success, requested permissions have been changed in {'all histories' if all_histories else history.name}."
}
# ......................................................................... actions/orig. async
[docs] @web.expose
def purge_deleted_datasets(self, trans):
count = 0
if trans.app.config.allow_user_dataset_purge and trans.history:
for hda in trans.history.datasets:
if not hda.deleted or hda.purged:
continue
hda.purge_usage_from_quota(trans.user)
hda.purged = True
trans.sa_session.add(hda)
trans.log_event(f"HDA id {hda.id} has been purged")
trans.sa_session.flush()
if hda.dataset.user_can_purge:
try:
hda.dataset.full_delete()
trans.log_event(
f"Dataset id {hda.dataset.id} has been purged upon the the purge of HDA id {hda.id}"
)
trans.sa_session.add(hda.dataset)
except Exception:
log.exception(f"Unable to purge dataset ({hda.dataset.id}) on purge of hda ({hda.id}):")
count += 1
return trans.show_ok_message(
"%d datasets have been deleted permanently" % count, refresh_frames=["history"]
)
return trans.show_error_message("Cannot purge deleted datasets from this session.")
[docs] @web.expose
def resume_paused_jobs(self, trans, current=False, ids=None):
"""Resume paused jobs the active history -- this does not require a logged in user."""
if not ids and string_as_bool(current):
histories = [trans.get_history()]
refresh_frames = ["history"]
else:
raise NotImplementedError("You can currently only resume all the datasets of the current history.")
for history in histories:
history.resume_paused_jobs()
trans.sa_session.add(history)
trans.sa_session.flush()
return trans.show_ok_message("Your jobs have been resumed.", refresh_frames=refresh_frames)
# TODO: used in index.mako
[docs] @web.expose
@web.require_login("rate items")
@web.json
def rate_async(self, trans, id, rating):
"""Rate a history asynchronously and return updated community data."""
history = self.history_manager.get_accessible(self.decode_id(id), trans.user, current_history=trans.history)
if not history:
return trans.show_error_message("The specified history does not exist.")
# Rate history.
self.rate_item(trans.sa_session, trans.get_user(), history, rating)
return self.get_ave_item_rating_data(trans.sa_session, history)
# TODO: used in display_base.mako
[docs] @web.expose
@web.json
@web.require_login("get history name and link")
def get_name_and_link_async(self, trans, id=None):
"""Returns history's name and link."""
history = self.history_manager.get_accessible(self.decode_id(id), trans.user, current_history=trans.history)
if self.slug_builder.create_item_slug(trans.sa_session, history):
trans.sa_session.flush()
return_dict = {
"name": history.name,
"link": url_for(
controller="history",
action="display_by_username_and_slug",
username=history.user.username,
slug=history.slug,
),
}
return return_dict
# TODO: used in page/editor.mako
[docs] @web.expose
@web.require_login("set history's accessible flag")
def set_accessible_async(self, trans, id=None, accessible=False):
"""Set history's importable attribute and slug."""
history = self.history_manager.get_owned(self.decode_id(id), trans.user, current_history=trans.history)
# Only set if importable value would change; this prevents a change in the update_time unless attribute really changed.
importable = accessible in ["True", "true", "t", "T"]
if history and history.importable != importable:
if importable:
self._make_item_accessible(trans.sa_session, history)
else:
history.importable = importable
trans.sa_session.flush()
return
# TODO: used in page/editor.mako
[docs] @web.legacy_expose_api
@web.require_login("rename histories")
def rename(self, trans, payload=None, **kwd):
id = kwd.get("id")
if not id:
return self.message_exception(trans, "No history id received for renaming.")
user = trans.get_user()
id = listify(id)
histories = []
for history_id in id:
history = self.history_manager.get_owned(
self.decode_id(history_id), trans.user, current_history=trans.history
)
if history and history.user_id == user.id:
histories.append(history)
if trans.request.method == "GET":
return {
"title": "Change history name(s)",
"inputs": [
{"name": "name_%i" % i, "label": f"Current: {h.name}", "value": h.name}
for i, h in enumerate(histories)
],
}
else:
messages = []
for i, h in enumerate(histories):
cur_name = h.get_display_name()
new_name = payload.get("name_%i" % i)
# validate name is empty
if not isinstance(new_name, str) or not new_name.strip():
messages.append("You must specify a valid name for History '%s'." % cur_name)
# skip if not the owner
elif h.user_id != user.id:
messages.append("History '%s' does not appear to belong to you." % cur_name)
# skip if it wouldn't be a change
elif new_name != cur_name:
h.name = new_name
trans.sa_session.add(h)
trans.sa_session.flush()
trans.log_event(f"History renamed: id: {str(h.id)}, renamed to: {new_name}")
messages.append(f"History '{cur_name}' renamed to '{new_name}'.")
message = sanitize_text(" ".join(messages)) if messages else "History names remain unchanged."
return {"message": message, "status": "success"}
# ------------------------------------------------------------------------- current history
[docs] @web.expose
@web.require_login("switch to a history")
def switch_to_history(self, trans, hist_id=None):
"""Change the current user's current history to one with `hist_id`."""
# remains for backwards compat
self.set_as_current(trans, id=hist_id)
return trans.response.send_redirect(url_for("/"))
[docs] def get_item(self, trans, id):
return self.history_manager.get_owned(self.decode_id(id), trans.user, current_history=trans.history)
# TODO: override of base ui controller?
[docs] def history_data(self, trans, history):
"""Return the given history in a serialized, dictionary form."""
return self.history_serializer.serialize_to_view(history, view="dev-detailed", user=trans.user, trans=trans)
# TODO: combine these next two - poss. with a redirect flag
# @web.require_login( "switch to a history" )
[docs] @web.json
@web.do_not_cache
def set_as_current(self, trans, id):
"""Change the current user's current history to one with `id`."""
try:
history = self.history_manager.get_owned(self.decode_id(id), trans.user, current_history=trans.history)
trans.set_history(history)
return self.history_data(trans, history)
except exceptions.MessageException as msg_exc:
trans.response.status = msg_exc.status_code
return {"err_msg": msg_exc.err_msg, "err_code": msg_exc.err_code.code}
[docs] @web.json
@web.do_not_cache
def current_history_json(self, trans, since=None):
"""Return the current user's current history in a serialized, dictionary form."""
history = trans.get_history(most_recent=True, create=True)
if since and history.update_time <= isoparse(since):
# Should ideally be a 204 response, but would require changing web.json
# This endpoint should either give way to a proper API or a SSE loop
return
return self.history_data(trans, history)
[docs] @web.json
def create_new_current(self, trans, name=None):
"""Create a new, current history for the current user"""
new_history = trans.new_history(name)
return self.history_data(trans, new_history)
# TODO: /history/current to do all of the above: if ajax, return json; if post, read id and set to current