Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.controllers.history

import logging

from dateutil.parser import isoparse
from sqlalchemy import select

from galaxy import (
    exceptions,
    model,
    web,
)
from galaxy.managers import histories
from galaxy.managers.sharable import SlugBuilder
from galaxy.model import Role
from galaxy.model.base import transaction
from galaxy.model.item_attrs import (
    UsesAnnotations,
    UsesItemRatings,
)
from galaxy.structured_app import StructuredApp
from galaxy.util import (
    listify,
    sanitize_text,
    string_as_bool,
)
from galaxy.web import (
    expose_api_anonymous,
    url_for,
)
from galaxy.webapps.base.controller import (
    BaseUIController,
    SharableMixin,
)
from ..api import depends

log = logging.getLogger(__name__)


[docs]class HistoryController(BaseUIController, SharableMixin, UsesAnnotations, UsesItemRatings): history_manager: histories.HistoryManager = depends(histories.HistoryManager) history_serializer: histories.HistorySerializer = depends(histories.HistorySerializer) slug_builder: SlugBuilder = depends(SlugBuilder)
[docs] def __init__(self, app: StructuredApp): super().__init__(app)
[docs] @web.expose def index(self, trans): return ""
[docs] @expose_api_anonymous def view(self, trans, id=None, show_deleted=False, show_hidden=False, use_panels=True): """ View a history. If a history is importable, then it is viewable by any user. """ show_deleted = string_as_bool(show_deleted) show_hidden = string_as_bool(show_hidden) use_panels = string_as_bool(use_panels) history_dictionary = {} user_is_owner = False if id: history_to_view = self.history_manager.get_accessible( self.decode_id(id), trans.user, current_history=trans.history ) user_is_owner = history_to_view.user == trans.user history_is_current = history_to_view == trans.history else: history_to_view = trans.history if not history_to_view: raise exceptions.RequestParameterMissingException( "No 'id' parameter provided for history, and user does not have a current history." ) user_is_owner = True history_is_current = True # include all datasets: hidden, deleted, and purged history_dictionary = self.history_serializer.serialize_to_view( history_to_view, view="dev-detailed", user=trans.user, trans=trans ) return { "history": history_dictionary, "user_is_owner": user_is_owner, "history_is_current": history_is_current, "show_deleted": show_deleted, "show_hidden": show_hidden, "use_panels": use_panels, "allow_user_dataset_purge": trans.app.config.allow_user_dataset_purge, }
[docs] @web.expose def display_by_username_and_slug(self, trans, username, slug, **kwargs): """ Display history based on a username and slug. """ # Get history. session = trans.sa_session user = session.scalars(select(model.User).filter_by(username=username).limit(1)).first() history = session.scalars( select(model.History) .filter_by(user=user, slug=slug, deleted=False) # return public histories first if slug is not unique .order_by(model.History.importable.desc()) .limit(1) ).first() if history is None: raise web.httpexceptions.HTTPNotFound() # Security check raises error if user cannot access history. self.history_manager.error_unless_accessible(history, trans.user, current_history=trans.history) # Encode history id. history_id = trans.security.encode_id(history.id) # Redirect to client. return trans.response.send_redirect( web.url_for( controller="published", action="history", id=history_id, ) )
[docs] @web.legacy_expose_api @web.require_login("changing default permissions") def permissions(self, trans, payload=None, **kwd): """ Sets the permissions on a history. """ history_id = kwd.get("id") if not history_id: return self.message_exception(trans, f"Invalid history id ({str(history_id)}) received") history = self.history_manager.get_owned(self.decode_id(history_id), trans.user, current_history=trans.history) if trans.request.method == "GET": inputs = [] all_roles = trans.user.all_roles() current_actions = history.default_permissions for action_key, action in trans.app.model.Dataset.permitted_actions.items(): in_roles = set() for a in current_actions: if a.action == action.action: in_roles.add(a.role) inputs.append( { "type": "select", "multiple": True, "optional": True, "individual": True, "name": action_key, "label": action.action, "help": action.description, "options": [(role.name, trans.security.encode_id(role.id)) for role in set(all_roles)], "value": [trans.security.encode_id(role.id) for role in in_roles], } ) return {"title": "Change default dataset permissions for history '%s'" % history.name, "inputs": inputs} else: self.history_manager.error_unless_mutable(history) permissions = {} for action_key, action in trans.app.model.Dataset.permitted_actions.items(): in_roles = payload.get(action_key) or [] in_roles = [trans.sa_session.get(Role, trans.security.decode_id(x)) for x in in_roles] permissions[trans.app.security_agent.get_action(action.action)] = in_roles trans.app.security_agent.history_set_default_permissions(history, permissions) return {"message": "Default history '%s' dataset permissions have been changed." % history.name}
[docs] @web.legacy_expose_api @web.require_login("make datasets private") def make_private(self, trans, history_id=None, all_histories=False, **kwd): """ Sets the datasets within a history to private. Also sets the default permissions for the history to private, for future datasets. """ histories = [] all_histories = string_as_bool(all_histories) if all_histories: histories = trans.user.histories elif history_id: history = self.history_manager.get_owned( self.decode_id(history_id), trans.user, current_history=trans.history ) if history: histories.append(history) if not histories: return self.message_exception(trans, "Invalid history or histories specified.") private_role = trans.app.security_agent.get_private_user_role(trans.user) user_roles = trans.user.all_roles() private_permissions = { trans.app.security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS: [private_role], trans.app.security_agent.permitted_actions.DATASET_ACCESS: [private_role], } for history in histories: self.history_manager.error_unless_mutable(history) # Set default role for history to private trans.app.security_agent.history_set_default_permissions(history, private_permissions) # Set private role for all datasets for hda in history.datasets: if ( not hda.dataset.library_associations and not trans.app.security_agent.dataset_is_private_to_user(trans, hda.dataset) and trans.app.security_agent.can_manage_dataset(user_roles, hda.dataset) ): # If it's not private to me, and I can manage it, set fixed private permissions. trans.app.security_agent.set_all_dataset_permissions(hda.dataset, private_permissions) if not trans.app.security_agent.dataset_is_private_to_user(trans, hda.dataset): raise exceptions.InternalServerError("An error occurred and the dataset is NOT private.") return { "message": f"Success, requested permissions have been changed in {'all histories' if all_histories else history.name}." }
# ......................................................................... actions/orig. async
[docs] @web.expose def purge_deleted_datasets(self, trans): count = 0 if trans.app.config.allow_user_dataset_purge and trans.history: for hda in trans.history.datasets: if not hda.deleted or hda.purged: continue hda.purge_usage_from_quota(trans.user, hda.dataset.quota_source_info) hda.purged = True trans.sa_session.add(hda) trans.log_event(f"HDA id {hda.id} has been purged") with transaction(trans.sa_session): trans.sa_session.commit() if hda.dataset.user_can_purge: try: hda.dataset.full_delete() trans.log_event( f"Dataset id {hda.dataset.id} has been purged upon the purge of HDA id {hda.id}" ) trans.sa_session.add(hda.dataset) except Exception: log.exception(f"Unable to purge dataset ({hda.dataset.id}) on purge of hda ({hda.id}):") count += 1 return trans.show_ok_message( "%d datasets have been deleted permanently" % count, refresh_frames=["history"] ) return trans.show_error_message("Cannot purge deleted datasets from this session.")
[docs] @web.expose def resume_paused_jobs(self, trans, current=False, ids=None, **kwargs): """Resume paused jobs the active history -- this does not require a logged in user.""" if not ids and string_as_bool(current): histories = [trans.get_history()] refresh_frames = ["history"] else: raise NotImplementedError("You can currently only resume all the datasets of the current history.") for history in histories: history.resume_paused_jobs() trans.sa_session.add(history) with transaction(trans.sa_session): trans.sa_session.commit() return trans.show_ok_message("Your jobs have been resumed.", refresh_frames=refresh_frames)
# TODO: used in index.mako
[docs] @web.legacy_expose_api @web.require_login("rename histories") def rename(self, trans, payload=None, **kwd): id = kwd.get("id") if not id: return self.message_exception(trans, "No history id received for renaming.") user = trans.get_user() id = listify(id) histories = [] for history_id in id: history = self.history_manager.get_mutable( self.decode_id(history_id), trans.user, current_history=trans.history ) if history and history.user_id == user.id: histories.append(history) if trans.request.method == "GET": return { "title": "Change history name(s)", "inputs": [ {"name": "name_%i" % i, "label": f"Current: {h.name}", "value": h.name} for i, h in enumerate(histories) ], } else: messages = [] for i, h in enumerate(histories): cur_name = h.get_display_name() new_name = payload.get("name_%i" % i) # validate name is empty if not isinstance(new_name, str) or not new_name.strip(): messages.append("You must specify a valid name for History '%s'." % cur_name) # skip if not the owner elif h.user_id != user.id: messages.append("History '%s' does not appear to belong to you." % cur_name) # skip if it wouldn't be a change elif new_name != cur_name: h.name = new_name trans.sa_session.add(h) with transaction(trans.sa_session): trans.sa_session.commit() trans.log_event(f"History renamed: id: {str(h.id)}, renamed to: {new_name}") messages.append(f"History '{cur_name}' renamed to '{new_name}'.") message = sanitize_text(" ".join(messages)) if messages else "History names remain unchanged." return {"message": message, "status": "success"}
# ------------------------------------------------------------------------- current history
[docs] @web.expose @web.require_login("switch to a history") def switch_to_history(self, trans, hist_id=None, **kwargs): """Change the current user's current history to one with `hist_id`.""" # remains for backwards compat self.set_as_current(trans, id=hist_id) return trans.response.send_redirect(url_for("/"))
[docs] def get_item(self, trans, id): return self.history_manager.get_owned(self.decode_id(id), trans.user, current_history=trans.history)
# TODO: override of base ui controller?
[docs] def history_data(self, trans, history): """Return the given history in a serialized, dictionary form.""" return self.history_serializer.serialize_to_view(history, view="dev-detailed", user=trans.user, trans=trans)
# TODO: combine these next two - poss. with a redirect flag # @web.require_login( "switch to a history" )
[docs] @web.json @web.do_not_cache def set_as_current(self, trans, id, **kwargs): """Change the current user's current history to one with `id`.""" try: history = self.history_manager.get_mutable(self.decode_id(id), trans.user, current_history=trans.history) trans.set_history(history) return self.history_data(trans, history) except exceptions.MessageException as msg_exc: trans.response.status = msg_exc.status_code return {"err_msg": msg_exc.err_msg, "err_code": msg_exc.err_code.code}
[docs] @web.json @web.do_not_cache def current_history_json(self, trans, since=None, **kwargs): """Return the current user's current history in a serialized, dictionary form.""" history = trans.get_history(most_recent=True, create=True) if since and history.update_time <= isoparse(since): # Should ideally be a 204 response, but would require changing web.json # This endpoint should either give way to a proper API or a SSE loop return return self.history_data(trans, history)
[docs] @web.json def create_new_current(self, trans, name=None, **kwargs): """Create a new, current history for the current user""" new_history = trans.new_history(name) return self.history_data(trans, new_history)
# TODO: /history/current to do all of the above: if ajax, return json; if post, read id and set to current