Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.controllers.root
"""
Contains the main interface in the Universe class
"""
import logging
from webob.exc import HTTPNotFound
from galaxy import web
from galaxy.managers.histories import (
HistoryManager,
HistorySerializer,
)
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.structured_app import StructuredApp
from galaxy.util import unicodify
from galaxy.webapps.base import controller
from ..api import depends
log = logging.getLogger(__name__)
# =============================================================================
class RootController(controller.JSAppLauncher, UsesAnnotations):
    """
    Controller class that maps to the URL root of Galaxy (i.e. '/').

    Inherits from ``controller.JSAppLauncher`` and the ``UsesAnnotations``
    mixin. The two class attributes below are declared through the
    ``depends`` helper imported from ``..api``.
    """

    # Used by ``index`` to fetch a history for the current user (``get_owned``).
    history_manager: HistoryManager = depends(HistoryManager)
    # Declared next to the manager; not referenced by the methods in this listing.
    history_serializer: HistorySerializer = depends(HistorySerializer)
@web.expose
def default(self, trans, target1=None, target2=None, **kwd):
    """
    Fallback for any URL that does not match a controller method.

    None of the arguments are inspected; the method always raises
    ``HTTPNotFound``.
    """
    not_found = HTTPNotFound("This link may not be followed from within Galaxy.")
    raise not_found
@web.expose
def index(self, trans, tool_id=None, workflow_id=None, history_id=None, m_c=None, m_a=None, **kwd):
    """
    Root and entry point for the client-side web app.

    :type   tool_id: str or None
    :param  tool_id: load center panel with given tool if not None
    :type   workflow_id: encoded id or None
    :param  workflow_id: load center panel with given workflow if not None
    :type   history_id: encoded id or None
    :param  history_id: switch current history to given history if not None
    :type   m_c: str or None
    :param  m_c: controller name (e.g. 'user')
    :type   m_a: str or None
    :param  m_a: controller method/action (e.g. 'dbkeys')

    If m_c and m_a are present, the center panel will be loaded using the
    controller and action as a url: (e.g. 'user/dbkeys').

    Only ``history_id`` is read by this method body; the remaining
    parameters are accepted but not used here (presumably they are picked
    up by the client from the URL -- TODO confirm).
    """
    self._check_require_login(trans)

    # The current history is read up front even though the value is not used
    # afterwards; the attribute access itself is kept unchanged.
    current_history = trans.history  # noqa: F841

    if history_id:
        # A history id was sent: decode it, fetch the history for the current
        # user through the history manager, and make it the current history.
        decoded_id = trans.security.decode_id(history_id)
        requested_history = self.history_manager.get_owned(decoded_id, trans.user)
        trans.set_history(requested_history)

    return self._bootstrapped_client(trans)
@web.expose
def login(self, trans, redirect=None, is_logout_redirect=False, **kwd):
    """
    User login path for the client-side app.

    Either redirects straight to the single configured OIDC provider or
    renders the "login" template.

    :param redirect: passed through to the login template unchanged
    :param is_logout_redirect: the direct OIDC redirect is only attempted
        when this is the ``False`` object itself (identity check), so any
        explicitly supplied value -- e.g. a string taken from the query
        string -- skips it
    """
    config = trans.app.config

    # Redirect directly to the OIDC provider only if 1) enable_oidc is set,
    # 2) exactly one OIDC provider is configured, 3) auth_conf.xml declares no
    # authenticators, and 4) this request is not a post-logout redirect.
    use_single_oidc_provider = (
        config.enable_oidc
        and len(config.oidc) == 1
        and len(trans.app.auth_manager.authenticators) == 0
        and is_logout_redirect is False
    )
    if use_single_oidc_provider:
        provider = next(iter(config.oidc.keys()))
        success, _message, redirect_uri = trans.app.authnz_manager.authenticate(provider, trans)
        if success:
            return trans.response.send_redirect(redirect_uri)
        # On failure fall through and show the regular login page.

    template_args = {
        "redirect": redirect,
        # an installation may have its own welcome_url - show it here if they've set that
        "welcome_url": web.url_for(controller="root", action="welcome"),
        "show_welcome_with_login": config.show_welcome_with_login,
    }
    return self.template(trans, "login", **template_args)
# ---- Tool related -----------------------------------------------------
@web.expose
def display_as(self, trans, id=None, display_app=None, **kwd):
    """
    Return a dataset in a format that can be displayed in ``display_app``.

    :param id: unencoded HistoryDatasetAssociation id, looked up directly
        through the SQLAlchemy session
    :param display_app: display application name, forwarded to
        ``as_display_type``
    :param kwd: extra request parameters. ``authz_method`` selects the access
        check ('rbac', the default, or 'display_at'); the whole of ``kwd``
        (including ``authz_method``) is forwarded to ``as_display_type``
    :returns: the output of ``as_display_type`` when access is granted,
        otherwise a plain error string (dataset missing or access denied)
    """
    from html import escape

    # TODO: unencoded id
    data = trans.sa_session.query(self.app.model.HistoryDatasetAssociation).get(id)
    if not data:
        # ``id`` arrives from the request, so it may be a string or None; the
        # former "%d" formatting raised TypeError here instead of reporting
        # the problem. Escape it because the value is echoed in the response.
        return f"No data with id={escape(str(id))}"

    authz_method = kwd.get("authz_method", "rbac")

    # Role-based check against the current user's roles.
    if authz_method == "rbac" and trans.app.security_agent.can_access_dataset(
        trans.get_current_user_roles(), data.dataset
    ):
        trans.response.set_content_type(data.get_mime())
        trans.log_event(f"Formatted dataset id {id} for display at {display_app}")
        return data.as_display_type(display_app, **kwd)

    # Host-based check against the address the request came from.
    if authz_method == "display_at" and trans.app.host_security_agent.allow_action(
        trans.request.remote_addr, data.permitted_actions.DATASET_ACCESS, dataset=data
    ):
        trans.response.set_content_type(data.get_mime())
        return data.as_display_type(display_app, **kwd)

    return "You are not allowed to access this dataset."
# ---- History management -----------------------------------------------
@web.expose
def clear_history(self, trans):
    """
    Clear the current history for a user.

    Every dataset in the history returned by ``trans.get_history()`` is
    flagged as deleted and has ``clear_associated_files()`` called on it; the
    session is flushed once afterwards and the client is redirected to
    "/index".
    """
    # TODO: unused? (seems to only be used in TwillTestCase)
    history = trans.get_history()
    for dataset in history.datasets:
        dataset.deleted = True
        dataset.clear_associated_files()
    trans.sa_session.flush()
    trans.log_event(f"History id {history.id} cleared")
    # Return the redirect like ``login`` and ``welcome`` do, so the result is
    # not dropped should ``send_redirect`` return rather than raise.
    return trans.response.send_redirect(web.url_for("/index"))
@web.expose
def history_import(self, trans, id=None, confirm=False, **kwd):
    """
    Copy the history with the given id into the requester's session.

    :param id: unencoded id of the history to import
    :param confirm: for anonymous users whose current history already holds
        datasets, the import only proceeds when this is truthy
    :returns: the result of one of ``trans.show_error_message``,
        ``trans.show_ok_message`` or ``trans.show_warn_message``

    Logged-in users get a copy owned by them; it becomes the current history
    only when the current history has no datasets. Anonymous users get an
    unowned copy that always becomes the current history.
    """
    # TODO: unused?
    # TODO: unencoded id
    # NOTE(review): nothing visible in this method checks that the requester
    # is allowed to read the history being imported -- confirm upstream checks.
    user = trans.get_user()
    user_history = trans.get_history()
    if not id:
        return trans.show_error_message("You must specify a history you want to import.")
    import_history = trans.sa_session.query(trans.app.model.History).get(id)
    if not import_history:
        return trans.show_error_message("The specified history does not exist.")

    if user:
        if import_history.user_id == user.id:
            return trans.show_error_message("You cannot import your own history.")
        new_history = import_history.copy(target_user=trans.user)
        return self._finish_history_import(
            trans, new_history, owner_id=user.id, user_history=user_history, always_switch=False
        )

    if not user_history.datasets or confirm:
        new_history = import_history.copy()
        return self._finish_history_import(
            trans, new_history, owner_id=None, user_history=user_history, always_switch=True
        )

    # Anonymous user with a non-empty current history and no confirmation yet.
    # The literal's continuation lines are part of the emitted message and are
    # kept exactly as they were.
    return trans.show_warn_message(
        """
Warning! If you import this history, you will lose your current
history. Click <a href="%s">here</a> to confirm.
"""
        % web.url_for(controller="root", action="history_import", id=id, confirm=True)
    )


def _finish_history_import(self, trans, new_history, owner_id, user_history, always_switch):
    """Rename, persist and report a freshly copied history (shared tail of ``history_import``).

    :param new_history: the copy produced by ``History.copy``
    :param owner_id: value stored in ``new_history.user_id`` (``None`` for anonymous)
    :param user_history: the requester's current history
    :param always_switch: when true the copy always becomes the current
        history; otherwise only if ``user_history`` has no datasets
    """
    new_history.name = f"imported: {new_history.name}"
    new_history.user_id = owner_id
    galaxy_session = trans.get_galaxy_session()
    # Best-effort lookup of an existing session/history association; any
    # failure is treated the same as "no association found".
    try:
        association = (
            trans.sa_session.query(trans.app.model.GalaxySessionToHistoryAssociation)
            .filter_by(session_id=galaxy_session.id, history_id=new_history.id)
            .first()
        )
    except Exception:
        association = None
    new_history.add_galaxy_session(galaxy_session, association=association)
    trans.sa_session.add(new_history)
    trans.sa_session.flush()
    # Evaluated after the flush, as before, for the logged-in case.
    if always_switch or not user_history.datasets:
        trans.set_history(new_history)
    trans.log_event(f"History imported, id: {new_history.id}, name: '{new_history.name}': ")
    # NOTE(review): the history name is interpolated into markup unescaped --
    # confirm that show_ok_message sanitizes its argument.
    return trans.show_ok_message(
        """
History "{}" has been imported. Click <a href="{}">here</a>
to begin.""".format(
            new_history.name, web.url_for("/")
        )
    )
@web.expose
def history_new(self, trans, name=None):
    """
    Create a new history with the given name and refresh the history panel.

    :param name: name handed to ``trans.new_history``
    :returns: the result of ``trans.show_message`` asking for the "history"
        frame to be refreshed
    """
    trans.new_history(name=name)
    created_id = trans.history.id
    trans.log_event(f"Created new History, id: {created_id!s}.")
    frames_to_refresh = ["history"]
    return trans.show_message("New history created", refresh_frames=frames_to_refresh)
@web.expose
def history_add_to(
    self,
    trans,
    history_id=None,
    file_data=None,
    name="Data Added to History",
    info=None,
    ext="txt",
    dbkey="?",
    copy_access_from=None,
    **kwd,
):
    """
    Add a POSTed file to a History.

    :param history_id: unencoded id of the target history
    :param file_data: uploaded file; its ``.file`` attribute is rewound and
        copied into the new dataset's file
    :param name: name of the new dataset
    :param info: info text of the new dataset
    :param ext: extension (datatype) of the new dataset
    :param dbkey: dbkey of the new dataset
    :param copy_access_from: optional unencoded HistoryDatasetAssociation id
        whose dataset permissions are copied; otherwise the target history's
        default permissions are applied
    :returns: the result of ``trans.show_ok_message`` on success, or of
        ``trans.show_error_message`` if anything raised
    """
    import shutil

    # TODO: unencoded id
    # NOTE(review): nothing visible here checks that the requester may modify
    # the target history -- confirm this is enforced elsewhere.
    try:
        history = trans.sa_session.query(trans.app.model.History).get(history_id)
        data = trans.app.model.HistoryDatasetAssociation(
            name=name, info=info, extension=ext, dbkey=dbkey, create_dataset=True, sa_session=trans.sa_session
        )
        if copy_access_from:
            source_hda = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get(copy_access_from)
            trans.app.security_agent.copy_dataset_permissions(source_hda.dataset, data.dataset)
        else:
            permissions = trans.app.security_agent.history_get_default_permissions(history)
            trans.app.security_agent.set_all_dataset_permissions(data.dataset, permissions)
        trans.sa_session.add(data)
        trans.sa_session.flush()

        # Stream the upload to disk instead of reading it fully into memory;
        # the context manager closes the file even if the copy fails.
        file_data.file.seek(0)
        with open(data.file_name, "wb") as data_file:
            shutil.copyfileobj(file_data.file, data_file)

        data.state = data.states.OK
        data.set_size()
        data.init_meta()
        data.set_meta()
        trans.sa_session.flush()
        history.add_dataset(data)
        trans.sa_session.flush()
        data.set_peek()
        trans.sa_session.flush()
        # Log the history the dataset was actually added to. The current
        # history (trans.history) can be a different one, or absent, in which
        # case a successful upload used to be reported as a failure.
        trans.log_event("Added dataset %d to history %d" % (data.id, history.id))
        return trans.show_ok_message(f"Dataset {data.hid} added to history {history_id}.")
    except Exception as e:
        msg = f"Failed to add dataset to history: {unicodify(e)}"
        # log.exception keeps the traceback alongside the message.
        log.exception(msg)
        trans.log_event(msg)
        return trans.show_error_message("Adding File to History has Failed")
@web.expose
def welcome(self, trans):
    """
    Redirect to the configured welcome page.

    The target is the "welcome_url" config value looked up for the request's
    host via ``config_value_for_host``.
    """
    host_welcome_url = trans.app.config.config_value_for_host("welcome_url", trans.host)
    target = web.url_for(host_welcome_url)
    return trans.response.send_redirect(target)