Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.base.webapp

"""
"""

import datetime
import inspect
import logging
import os
import re
import socket
import time
from http.cookies import CookieError
from typing import (
    Any,
    Dict,
    Optional,
)
from urllib.parse import urlparse

import mako.lookup
import mako.runtime
from apispec import APISpec
from paste.urlmap import URLMap
from sqlalchemy import (
    and_,
    select,
    true,
)
from sqlalchemy.orm.exc import NoResultFound

from galaxy import util
from galaxy.exceptions import (
    AuthenticationFailed,
    ConfigurationError,
    MalformedId,
    MessageException,
    RequestParameterMissingException,
)
from galaxy.managers import context
from galaxy.managers.session import GalaxySessionManager
from galaxy.managers.users import UserManager
from galaxy.model.base import (
    ensure_object_added_to_session,
    transaction,
)
from galaxy.structured_app import (
    BasicSharedApp,
    MinimalApp,
)
from galaxy.util import (
    asbool,
    safe_makedirs,
    unicodify,
)
from galaxy.util.sanitize_html import sanitize_html
from galaxy.version import VERSION
from galaxy.web.framework import (
    base,
    helpers,
    url_for,
)
from galaxy.web.framework.middleware.static import CacheableStaticURLParser as Static

try:
    from importlib.resources import files  # type: ignore[attr-defined]
except ImportError:
    # Python < 3.9
    from importlib_resources import files  # type: ignore[no-redef]

log = logging.getLogger(__name__)


UCSC_SERVERS = (
    "hgw1.cse.ucsc.edu",
    "hgw2.cse.ucsc.edu",
    "hgw3.cse.ucsc.edu",
    "hgw4.cse.ucsc.edu",
    "hgw5.cse.ucsc.edu",
    "hgw6.cse.ucsc.edu",
    "hgw7.cse.ucsc.edu",
    "hgw8.cse.ucsc.edu",
    "hgw1.soe.ucsc.edu",
    "hgw2.soe.ucsc.edu",
    "hgw3.soe.ucsc.edu",
    "hgw4.soe.ucsc.edu",
    "hgw5.soe.ucsc.edu",
    "hgw6.soe.ucsc.edu",
    "hgw7.soe.ucsc.edu",
    "hgw8.soe.ucsc.edu",
)

TOOL_RUNNER_SESSION_COOKIE = "galaxytoolrunnersession"


[docs]class WebApplication(base.WebApplication): """ Base WSGI application instantiated for all Galaxy webapps. A web application that: * adds API and UI controllers by scanning given directories and importing all modules found there. * has a security object. * builds mako template lookups. * generates GalaxyWebTransactions. """ injection_aware: bool = False
[docs] def __init__( self, galaxy_app: MinimalApp, session_cookie: str = "galaxysession", name: Optional[str] = None ) -> None: super().__init__() self.name = name galaxy_app.is_webapp = True self.set_transaction_factory(lambda e: self.transaction_chooser(e, galaxy_app, session_cookie)) # Mako support self.mako_template_lookup = self.create_mako_template_lookup(galaxy_app, name) # Security helper self.security = galaxy_app.security # We need this to set the REQUEST_ID contextvar in model.base *BEFORE* a GalaxyWebTransaction is created. # This will ensure a SQLAlchemy session is request-scoped for legacy (non-fastapi) endpoints. self.session_factories.append(galaxy_app.model)
[docs] def build_apispec(self): """ Traverse all route paths starting with "api" and create an APISpec instance. """ # API specification builder apispec = APISpec( title=self.name, version=VERSION, openapi_version="3.0.2", ) RE_URL = re.compile( r""" (?::\(|{) (\w*) (?::.*)? (?:\)|})""", re.X, ) DEFAULT_API_RESOURCE_NAMES = ("index", "create", "new", "update", "edit", "show", "delete") for rule in self.mapper.matchlist: if rule.routepath.endswith(".:(format)") or not rule.routepath.startswith("api/"): continue # Try to replace routes various ways to encode variables with simple swagger {form} swagger_path = "/%s" % RE_URL.sub(r"{\1}", rule.routepath) controller = rule.defaults.get("controller", "") action = rule.defaults.get("action", "") # Get the list of methods for the route methods = [] if rule.conditions: m = rule.conditions.get("method", []) methods = [m] if isinstance(m, str) else m # Find the controller class if controller not in self.api_controllers: # Only happens when removing a controller after porting to FastAPI. raise Exception(f"No controller class found for '{controller}', remove from buildapp.py ?") controller_class = self.api_controllers[controller] if not hasattr(controller_class, action): if action not in DEFAULT_API_RESOURCE_NAMES: # There's a manually specified action that points to a function that doesn't exist anymore raise Exception( f"No action found for {action} in class {controller_class}, remove from buildapp.py ?" ) continue action_method = getattr(controller_class, action) operations = {} # Add methods that have routes but are not documents for method in methods: if method.lower() not in operations: operations[method.lower()] = { "description": f"This route has not yet been ported to FastAPI. The documentation may not be complete.\n{action_method.__doc__}", "tags": ["undocumented"], } # Store the swagger path apispec.path(path=swagger_path, operations=operations) return apispec
[docs] def create_mako_template_lookup(self, galaxy_app, name): paths = [] base_package = ( "tool_shed.webapp" if galaxy_app.name == "tool_shed" else "galaxy.webapps.base" ) # reports has templates in galaxy package base_template_path = files(base_package) / "templates" # First look in webapp specific directory if name is not None: paths.append(base_template_path / "webapps" / name) # Then look in root directory paths.append(base_template_path) # Create TemplateLookup with a small cache return mako.lookup.TemplateLookup( directories=paths, module_directory=galaxy_app.config.template_cache_path, collection_size=500 )
[docs] def handle_controller_exception(self, e, trans, method, **kwargs): if isinstance(e, TypeError): method_signature = inspect.signature(method) required_parameters = { p.name for p in method_signature.parameters.values() if p.name != "trans" and p.default is inspect._empty } missing_required_parameters = required_parameters.difference(kwargs) if missing_required_parameters: e = RequestParameterMissingException( f"Required parameter(s) {', '.join(missing_required_parameters)} not provided in request." ) if isinstance(e, MessageException): # In the case of a controller exception, sanitize to make sure # unsafe html input isn't reflected back to the user trans.response.status = e.status_code return trans.show_message(sanitize_html(e.err_msg), e.type)
[docs] def make_body_iterable(self, trans, body): return base.WebApplication.make_body_iterable(self, trans, body)
[docs] def transaction_chooser(self, environ, galaxy_app: BasicSharedApp, session_cookie: str): return GalaxyWebTransaction(environ, galaxy_app, self, session_cookie)
[docs] def add_ui_controllers(self, package_name, app): """ Search for UI controllers in `package_name` and add them to the webapp. """ from galaxy.webapps.base.controller import BaseUIController for name, module in base.walk_controller_modules(package_name): # Look for a controller inside the modules for key in dir(module): T = getattr(module, key) if inspect.isclass(T) and T is not BaseUIController and issubclass(T, BaseUIController): controller = self._instantiate_controller(T, app) self.add_ui_controller(name, controller)
[docs] def add_api_controllers(self, package_name, app): """ Search for UI controllers in `package_name` and add them to the webapp. """ from galaxy.webapps.base.controller import BaseAPIController for name, module in base.walk_controller_modules(package_name): for key in dir(module): T = getattr(module, key) # Exclude classes such as BaseAPIController and BaseTagItemsController if inspect.isclass(T) and not key.startswith("Base") and issubclass(T, BaseAPIController): # By default use module_name, but allow controller to override name controller_name = getattr(T, "controller_name", name) controller = self._instantiate_controller(T, app) self.add_api_controller(controller_name, controller)
def _instantiate_controller(self, T, app): """Extension point, allow apps to construct controllers differently, really just used to stub out actual controllers for routes testing. """ controller = None if self.injection_aware: controller = app.resolve_or_none(T) if controller is not None: for key, value in T.__dict__.items(): if hasattr(value, "galaxy_type_depends"): value_type = value.galaxy_type_depends setattr(controller, key, app[value_type]) if controller is None: controller = T(app) return controller
[docs]def config_allows_origin(origin_raw, config): # boil origin header down to hostname origin = urlparse(origin_raw).hostname # singular match def matches_allowed_origin(origin, allowed_origin): if isinstance(allowed_origin, str): return origin == allowed_origin match = allowed_origin.match(origin) return match and match.group() == origin # localhost uses no origin header (== null) if not origin: return False # check for '*' or compare to list of allowed for allowed_origin in config.allowed_origin_hostnames: if allowed_origin == "*" or matches_allowed_origin(origin, allowed_origin): return True return False
[docs]def url_builder(*args, **kwargs) -> str: """Wrapper around the WSGI version of the function for reversing URLs.""" kwargs.update(kwargs.pop("query_params", {})) return url_for(*args, **kwargs)
[docs]class GalaxyWebTransaction(base.DefaultWebTransaction, context.ProvidesHistoryContext): """ Encapsulates web transaction specific state for the Galaxy application (specifically the user's "cookie" session and history) """
[docs] def __init__( self, environ: Dict[str, Any], app: BasicSharedApp, webapp: WebApplication, session_cookie: Optional[str] = None ) -> None: self._app = app self.webapp = webapp self.user_manager = app[UserManager] self.session_manager = app[GalaxySessionManager] super().__init__(environ) config = self.app.config self.debug = asbool(config.get("debug", False)) if x_frame_options := getattr(config, "x_frame_options", None): self.response.headers["X-Frame-Options"] = x_frame_options # Flag indicating whether we are in workflow building mode (means # that the current history should not be used for parameter values # and such). self.workflow_building_mode = False self.__user = None self.galaxy_session = None self.error_message = None self.host = self.request.host # set any cross origin resource sharing headers if configured to do so self.set_cors_headers() if self.environ.get("is_api_request", False): # With API requests, if there's a key, use it and associate the # user with the transaction. # If not, check for an active session but do not create one. # If an error message is set here, it's sent back using # trans.show_error in the response -- in expose_api. assert session_cookie self.error_message = self._authenticate_api(session_cookie) elif self.app.name == "reports": self.galaxy_session = None else: # This is a web request, get or create session. assert session_cookie self._ensure_valid_session(session_cookie) if hasattr(self.app, "authnz_manager") and self.app.authnz_manager: self.app.authnz_manager.refresh_expiring_oidc_tokens(self) # type: ignore[attr-defined] if self.galaxy_session: # When we've authenticated by session, we have to check the # following. # Prevent deleted users from accessing Galaxy if config.use_remote_user and self.galaxy_session.user.deleted: self.response.send_redirect(url_for("/static/user_disabled.html")) if config.require_login: self._ensure_logged_in_user(session_cookie) if config.session_duration: # TODO DBTODO All ajax calls from the client need to go through # a single point of control where we can do things like # redirect/etc. This is API calls as well as something like 40 # @web.json requests that might not get handled well on the # clientside. # # Make sure we're not past the duration, and either log out or # update timestamp. now = datetime.datetime.now() if self.galaxy_session.last_action: expiration_time = self.galaxy_session.last_action + datetime.timedelta( minutes=config.session_duration ) else: expiration_time = now self.galaxy_session.last_action = now - datetime.timedelta(seconds=1) self.sa_session.add(self.galaxy_session) with transaction(self.sa_session): self.sa_session.commit() if expiration_time < now: # Expiration time has passed. self.handle_user_logout() if self.environ.get("is_api_request", False): self.response.status = 401 self.user = None self.galaxy_session = None else: self.response.send_redirect( url_for( controller="root", action="login", message="You have been logged out due to inactivity. Please log in again to continue using Galaxy.", status="info", use_panels=True, ) ) else: self.galaxy_session.last_action = now self.sa_session.add(self.galaxy_session) with transaction(self.sa_session): self.sa_session.commit()
@property def app(self): return self._app @property def url_builder(self): return url_builder
[docs] def set_cors_allow(self, name=None, value=None): acr = "Access-Control-Request-" if name is None: for key in self.request.headers.keys(): if key.startswith(acr): self.set_cors_allow(name=key[len(acr) :], value=value) else: resp_name = f"Access-Control-Allow-{name}" if value is None: value = self.request.headers.get(acr + name, None) if value: self.response.headers[resp_name] = value elif resp_name in self.response.headers: del self.response.headers[resp_name]
[docs] def set_cors_origin(self, origin=None): if origin is None: origin = self.request.headers.get("Origin", None) if origin: self.response.headers["Access-Control-Allow-Origin"] = origin elif "Access-Control-Allow-Origin" in self.response.headers: del self.response.headers["Access-Control-Allow-Origin"]
[docs] def set_cors_headers(self): """Allow CORS requests if configured to do so by echoing back the request's 'Origin' header (if any) as the response header 'Access-Control-Allow-Origin' Preflight OPTIONS requests to the API work by routing all OPTIONS requests to a single method in the authenticate API (options method), setting CORS headers, and responding OK. NOTE: raising some errors (such as httpexceptions), will remove the header (e.g. client will get both CORS error and 404 inside that) """ # do not set any access control headers if not configured for it (common case) if not self.app.config.get("allowed_origin_hostnames", None): return # do not set any access control headers if there's no origin header on the request origin_header = self.request.headers.get("Origin", None) if not origin_header: return # check against the list of allowed strings/regexp hostnames, echo original if cleared if config_allows_origin(origin_header, self.app.config): self.set_cors_origin(origin=origin_header) else: self.response.status = 400
[docs] def get_user(self): """Return the current user if logged in or None.""" user = self.__user if not user and self.galaxy_session: user = self.galaxy_session.user self.__user = user return user
[docs] def set_user(self, user): """Set the current user.""" if self.galaxy_session: if user and not user.bootstrap_admin_user: self.galaxy_session.user = user self.sa_session.add(self.galaxy_session) with transaction(self.sa_session): self.sa_session.commit() self.__user = user
user = property(get_user, set_user) def _set_cookie(self, value, name="galaxysession", path="/", age=90, version="1", encode_value=False): """Convenience method for setting a session cookie""" # The galaxysession cookie value must be a high entropy 128 bit random number encrypted # using a server secret key. Any other value is invalid and could pose security issues. if encode_value: value = self.security.encode_guid(value) self.response.cookies[name] = unicodify(value) self.response.cookies[name]["path"] = path self.response.cookies[name]["max-age"] = 3600 * 24 * age # 90 days tstamp = time.localtime(time.time() + 3600 * 24 * age) self.response.cookies[name]["expires"] = time.strftime("%a, %d-%b-%Y %H:%M:%S GMT", tstamp) self.response.cookies[name]["version"] = version https = self.request.environ["wsgi.url_scheme"] == "https" if https: self.response.cookies[name]["secure"] = True try: self.response.cookies[name]["httponly"] = True except CookieError as e: log.warning(f"Error setting httponly attribute in cookie '{name}': {e}") if self.app.config.cookie_domain is not None: self.response.cookies[name]["domain"] = self.app.config.cookie_domain def _authenticate_api(self, session_cookie: str) -> Optional[str]: """ Authenticate for the API via key or session (if available). """ oidc_access_token = self.request.headers.get("Authorization", None) oidc_token_supplied = ( self.environ.get("is_api_request", False) and oidc_access_token and "Bearer " in oidc_access_token ) api_key = self.request.params.get("key", None) or self.request.headers.get("x-api-key", None) secure_id = self.get_cookie(name=session_cookie) api_key_supplied = self.environ.get("is_api_request", False) and api_key if api_key_supplied: # Sessionless API transaction, we just need to associate a user. try: user = self.user_manager.by_api_key(api_key) except AuthenticationFailed as e: return str(e) self.set_user(user) elif secure_id: # API authentication via active session # Associate user using existing session # This will throw an exception under remote auth with anon users. try: self._ensure_valid_session(session_cookie) except Exception: log.exception( "Exception during Session-based API authentication, this was most likely an attempt to use an anonymous cookie under remote authentication (so, no user), which we don't support." ) self.user = None self.galaxy_session = None elif oidc_token_supplied: # Sessionless API transaction with oidc token, we just need to associate a user. oidc_access_token = oidc_access_token.replace("Bearer ", "") try: user = self.user_manager.by_oidc_access_token(oidc_access_token) except AuthenticationFailed as e: return str(e) self.set_user(user) else: # Anonymous API interaction -- anything but @expose_api_anonymous will fail past here. self.user = None self.galaxy_session = None return None def _ensure_valid_session(self, session_cookie: str, create: bool = True) -> None: """ Ensure that a valid Galaxy session exists and is available as trans.session (part of initialization) """ # Try to load an existing session secure_id = self.get_cookie(name=session_cookie) galaxy_session = None prev_galaxy_session = None user_for_new_session = None invalidate_existing_session = False # Track whether the session has changed so we can avoid calling flush # in the most common case (session exists and is valid). galaxy_session_requires_flush = False if secure_id: # Decode the cookie value to get the session_key try: session_key = self.security.decode_guid(secure_id) except MalformedId: # Invalid session key, we're going to create a new one. # IIRC we did this when we switched to python 3 and clients # were sending sessioncookies that started with a stringified # bytestring, e.g 'b"0123456789abcdef"'. Maybe we need to drop # this exception catching, but then it'd be tricky to invalidate # a faulty session key log.debug("Received invalid session key '{secure_id}', setting a new session key") session_key = None if session_key: # We do NOT catch exceptions here, if the database is down the request should fail, # and we should not generate a new session. galaxy_session = self.session_manager.get_session_from_session_key(session_key=session_key) if not galaxy_session: session_key = None # If remote user is in use it can invalidate the session and in some # cases won't have a cookie set above, so we need to check some things # now. if self.app.config.use_remote_user: remote_user_email = self.environ.get(self.app.config.remote_user_header, None) if galaxy_session: if remote_user_email and galaxy_session.user is None: # No user, associate galaxy_session.user = self.user_manager.get_or_create_remote_user(remote_user_email) galaxy_session_requires_flush = True elif ( remote_user_email and galaxy_session.user.email != remote_user_email and ( not self.app.config.allow_user_impersonation or remote_user_email not in self.app.config.admin_users_list ) ): # Session exists but is not associated with the correct # remote user, and the currently set remote_user is not a # potentially impersonating admin. invalidate_existing_session = True user_for_new_session = self.user_manager.get_or_create_remote_user(remote_user_email) log.warning( "User logged in as '%s' externally, but has a cookie as '%s' invalidating session", remote_user_email, galaxy_session.user.email, ) elif remote_user_email: # No session exists, get/create user for new session user_for_new_session = self.user_manager.get_or_create_remote_user(remote_user_email) if (galaxy_session and galaxy_session.user is None) and user_for_new_session is None: raise Exception("Remote Authentication Failure - user is unknown and/or not supplied.") else: if galaxy_session is not None and galaxy_session.user and galaxy_session.user.external: # Remote user support is not enabled, but there is an existing # session with an external user, invalidate invalidate_existing_session = True log.warning( "User '%s' is an external user with an existing session, invalidating session since external auth is disabled", galaxy_session.user.email, ) elif galaxy_session is not None and galaxy_session.user is not None and galaxy_session.user.deleted: invalidate_existing_session = True log.warning(f"User '{galaxy_session.user.email}' is marked deleted, invalidating session") # Do we need to invalidate the session for some reason? if invalidate_existing_session: assert galaxy_session prev_galaxy_session = galaxy_session prev_galaxy_session.is_valid = False galaxy_session = None # No relevant cookies, or couldn't find, or invalid, so create a new session if galaxy_session is None: galaxy_session = self.__create_new_session(prev_galaxy_session, user_for_new_session) galaxy_session_requires_flush = True self.galaxy_session = galaxy_session self.__update_session_cookie(name=session_cookie) else: self.galaxy_session = galaxy_session if self.webapp.name == "galaxy": self.get_or_create_default_history() # Do we need to flush the session? if galaxy_session_requires_flush: self.sa_session.add(galaxy_session) # FIXME: If prev_session is a proper relation this would not # be needed. if prev_galaxy_session: self.sa_session.add(prev_galaxy_session) with transaction(self.sa_session): self.sa_session.commit() def _ensure_logged_in_user(self, session_cookie: str) -> None: # The value of session_cookie can be one of # 'galaxysession' or 'galaxycommunitysession' # Currently this method does nothing unless session_cookie is 'galaxysession' assert self.galaxy_session if session_cookie == "galaxysession" and self.galaxy_session.user is None: # TODO: re-engineer to eliminate the use of allowed_paths # as maintenance overhead is far too high. allowed_paths = [ # client app route # TODO: might be better as '/:username/login', '/:username/logout' url_for(controller="root", action="login"), url_for(controller="login", action="start"), # mako app routes url_for(controller="user", action="login"), url_for(controller="user", action="logout"), url_for(controller="user", action="reset_password"), url_for(controller="user", action="change_password"), # TODO: do any of these still need to bypass require login? url_for(controller="user", action="api_keys"), url_for(controller="user", action="create"), url_for(controller="user", action="index"), url_for(controller="user", action="manage_user_info"), url_for(controller="user", action="set_default_permissions"), ] # append the welcome url to allowed paths if we'll show it at the login screen if self.app.config.show_welcome_with_login: allowed_paths.append(url_for(controller="root", action="welcome")) # prevent redirect when UCSC server attempts to get dataset contents as 'anon' user display_as = url_for(controller="root", action="display_as") if self.app.datatypes_registry.get_display_sites("ucsc") and self.request.path == display_as: try: host = socket.gethostbyaddr(self.environ["REMOTE_ADDR"])[0] except (OSError, socket.herror, socket.gaierror, socket.timeout): host = None if host in UCSC_SERVERS: return # prevent redirect for external, enabled display applications getting dataset contents external_display_path = url_for(controller="", action="display_application") if self.request.path.startswith(external_display_path): request_path_split = self.request.path.split("/") try: if ( self.app.datatypes_registry.display_applications.get(request_path_split[-5]) and request_path_split[-4] in self.app.datatypes_registry.display_applications.get(request_path_split[-5]).links and request_path_split[-3] != "None" ): return except IndexError: pass authnz_controller_base = url_for(controller="authnz", action="index") if self.request.path.startswith(authnz_controller_base): # All authnz requests pass through return # redirect to root if the path is not in the list above if self.request.path not in allowed_paths: login_url = url_for(controller="root", action="login", redirect=self.request.path) self.response.send_redirect(login_url) def __create_new_session(self, prev_galaxy_session=None, user_for_new_session=None): """ Create a new GalaxySession for this request, possibly with a connection to a previous session (in `prev_galaxy_session`) and an existing user (in `user_for_new_session`). Caller is responsible for flushing the returned session. """ return create_new_session( self, prev_galaxy_session=prev_galaxy_session, user_for_new_session=user_for_new_session ) @property def cookie_path(self): # Cookies for non-root paths should not end with `/` -> https://stackoverflow.com/questions/36131023/setting-a-slash-on-cookie-path return (self.app.config.cookie_path or url_for("/")).rstrip("/") or "/" def __update_session_cookie(self, name="galaxysession"): """ Update the session cookie to match the current session. """ self.set_cookie(self.security.encode_guid(self.galaxy_session.session_key), name=name, path=self.cookie_path)
[docs] def check_user_library_import_dir(self, user): if getattr(self.app.config, "user_library_import_dir_auto_creation", False): # try to create a user library import directory try: safe_makedirs(os.path.join(self.app.config.user_library_import_dir, user.email)) except ConfigurationError as e: self.log_event(unicodify(e))
[docs] def user_checks(self, user): """ This could contain more checks around a user upon login """ self.check_user_library_import_dir(user)
def _associate_user_history(self, user, prev_galaxy_session=None): """ Associate the user's last accessed history (if exists) with their new session """ history = None set_permissions = False try: users_last_session = user.current_galaxy_session except Exception: users_last_session = None if ( prev_galaxy_session and prev_galaxy_session.current_history and not prev_galaxy_session.current_history.deleted and not prev_galaxy_session.current_history.empty and (prev_galaxy_session.current_history.user is None or prev_galaxy_session.current_history.user == user) ): # If the previous galaxy session had a history, associate it with the new session, but only if it didn't # belong to a different user. history = prev_galaxy_session.current_history if prev_galaxy_session.user is None: # Increase the user's disk usage by the amount of the previous history's datasets if they didn't already # own it. for hda in history.datasets: user.adjust_total_disk_usage(hda.quota_amount(user), hda.dataset.quota_source_info.label) # Only set default history permissions if the history is from the previous session and anonymous set_permissions = True elif self.galaxy_session.current_history: history = self.galaxy_session.current_history if ( not history and users_last_session and users_last_session.current_history and not users_last_session.current_history.deleted ): history = users_last_session.current_history if history not in self.galaxy_session.histories: self.galaxy_session.add_history(history) if not history: history = self.new_history() if history.user is None: history.user = user self.galaxy_session.current_history = history if set_permissions: self.app.security_agent.history_set_default_permissions( history, dataset=True, bypass_manage_permission=True ) self.sa_session.add_all((prev_galaxy_session, self.galaxy_session, history))
[docs] def handle_user_login(self, user): """ Login a new user (possibly newly created) - do some 'system' checks (if any) for this user - create a new session - associate new session with user - if old session had a history and it was not associated with a user, associate it with the new session, otherwise associate the current session's history with the user - add the disk usage of the current session to the user's total disk usage """ self.user_checks(user) self.app.security_agent.create_user_role(user, self.app) # Set the previous session prev_galaxy_session = self.galaxy_session prev_galaxy_session.is_valid = False # Define a new current_session self.galaxy_session = self.__create_new_session(prev_galaxy_session, user) if self.webapp.name == "galaxy": cookie_name = "galaxysession" self._associate_user_history(user, prev_galaxy_session) else: cookie_name = "galaxycommunitysession" self.sa_session.add_all((prev_galaxy_session, self.galaxy_session)) with transaction(self.sa_session): self.sa_session.commit() # This method is not called from the Galaxy reports, so the cookie will always be galaxysession self.__update_session_cookie(name=cookie_name)
[docs] def handle_user_logout(self, logout_all=False): """ Logout the current user: - invalidate the current session - create a new session with no user associated """ prev_galaxy_session = self.galaxy_session prev_galaxy_session.is_valid = False self.galaxy_session = self.__create_new_session(prev_galaxy_session) self.sa_session.add_all((prev_galaxy_session, self.galaxy_session)) galaxy_user_id = prev_galaxy_session.user_id if logout_all and galaxy_user_id is not None: stmt = select(self.app.model.GalaxySession).filter( and_( self.app.model.GalaxySession.user_id == galaxy_user_id, self.app.model.GalaxySession.is_valid == true(), self.app.model.GalaxySession.id != prev_galaxy_session.id, ) ) for other_galaxy_session in self.sa_session.scalars(stmt): other_galaxy_session.is_valid = False self.sa_session.add(other_galaxy_session) with transaction(self.sa_session): self.sa_session.commit() if self.webapp.name == "galaxy": # This method is not called from the Galaxy reports, so the cookie will always be galaxysession self.__update_session_cookie(name="galaxysession") elif self.webapp.name == "tool_shed": self.__update_session_cookie(name="galaxycommunitysession")
[docs] def get_galaxy_session(self): """ Return the current galaxy session """ return self.galaxy_session
[docs] def get_history(self, create=False, most_recent=False): """ Load the current history. - If that isn't available, we find the most recently updated history. - If *that* isn't available, we get or create the default history. Transactions will not always have an active history (API requests), so None is a valid response. """ history = None if self.galaxy_session: if hasattr(self.galaxy_session, "current_history"): history = self.galaxy_session.current_history if not history and most_recent: history = self.get_most_recent_history() if not history and util.string_as_bool(create): history = self.get_or_create_default_history() return history
[docs] def set_history(self, history): if history and not history.deleted: self.galaxy_session.current_history = history self.sa_session.add(self.galaxy_session) with transaction(self.sa_session): self.sa_session.commit()
@property def history(self): return self.get_history()
[docs] def get_or_create_default_history(self): """ Gets or creates a default history and associates it with the current session. """ # Just return the current history if one exists and is not deleted. history = self.galaxy_session.current_history if history and not history.deleted: return history # Look for an existing history that has the default name, is not # deleted, and is empty. If this exists, we associate it with the # current session and return it. user = self.galaxy_session.user if user: stmt = select(self.app.model.History).filter_by( user=user, name=self.app.model.History.default_name, deleted=False ) unnamed_histories = self.sa_session.scalars(stmt) for history in unnamed_histories: if history.empty: self.set_history(history) return history # Don't create new history if login required and user is anonymous if self.app.config.require_login and not self.user: return None # No suitable history found, create a new one. return self.new_history()
[docs] def get_most_recent_history(self): """ Gets the most recently updated history. """ # There must be a user to fetch histories, and without a user we have # no recent history. user = self.get_user() if not user: return None try: stmt = ( select(self.app.model.History) .filter_by(user=user, deleted=False) .order_by(self.app.model.History.update_time.desc()) .limit(1) ) recent_history = self.sa_session.scalars(stmt).first() except NoResultFound: return None self.set_history(recent_history) return recent_history
[docs] def new_history(self, name=None): """ Create a new history and associate it with the current session and its associated user (if set). """ # Create new history history = self.app.model.History() if name: history.name = name # Associate with session history.add_galaxy_session(self.galaxy_session) # Make it the session's current history self.galaxy_session.current_history = history # Associate with user if self.galaxy_session.user: history.user = self.galaxy_session.user # Track genome_build with history history.genome_build = self.app.genome_builds.default_value # Set the user's default history permissions self.app.security_agent.history_set_default_permissions(history) # Save self.sa_session.add_all((self.galaxy_session, history)) with transaction(self.sa_session): self.sa_session.commit() return history
@base.lazy_property def template_context(self): return dict()
[docs] def set_message(self, message, type=None): """ Convenience method for setting the 'message' and 'message_type' element of the template context. """ self.template_context["message"] = message if type: self.template_context["status"] = type
[docs] def get_message(self): """ Convenience method for getting the 'message' element of the template context. """ return self.template_context["message"]
[docs] def show_message(self, message, type="info", refresh_frames=None, cont=None, use_panels=False, active_view=""): """ Convenience method for displaying a simple page with a single message. `type`: one of "error", "warning", "info", or "done"; determines the type of dialog box and icon displayed with the message `refresh_frames`: names of frames in the interface that should be refreshed when the message is displayed """ refresh_frames = refresh_frames or [] return self.fill_template( "message.mako", status=type, message=message, refresh_frames=refresh_frames, cont=cont, use_panels=use_panels, active_view=active_view, )
[docs] def show_error_message(self, message, refresh_frames=None, use_panels=False, active_view=""): """ Convenience method for displaying an error message. See `show_message`. """ refresh_frames = refresh_frames or [] return self.show_message(message, "error", refresh_frames, use_panels=use_panels, active_view=active_view)
[docs] def show_ok_message(self, message, refresh_frames=None, use_panels=False, active_view=""): """ Convenience method for displaying an ok message. See `show_message`. """ refresh_frames = refresh_frames or [] return self.show_message(message, "done", refresh_frames, use_panels=use_panels, active_view=active_view)
[docs] def show_warn_message(self, message, refresh_frames=None, use_panels=False, active_view=""): """ Convenience method for displaying an warn message. See `show_message`. """ refresh_frames = refresh_frames or [] return self.show_message(message, "warning", refresh_frames, use_panels=use_panels, active_view=active_view)
@property def session_csrf_token(self): token = "" if self.galaxy_session: token = self.security.encode_id(self.galaxy_session.id, kind="csrf") return token
[docs] def check_csrf_token(self, payload): session_csrf_token = payload.get("session_csrf_token") if not session_csrf_token: return "No session token set, denying request." elif session_csrf_token != self.session_csrf_token: return "Wrong session token found, denying request."
[docs] def fill_template(self, filename, **kwargs): """ Fill in a template, putting any keyword arguments on the context. """ # call get_user so we can invalidate sessions from external users, # if external auth has been disabled. self.get_user() assert filename.endswith(".mako") return self.fill_template_mako(filename, **kwargs)
[docs] def fill_template_mako(self, filename, template_lookup=None, **kwargs): template_lookup = template_lookup or self.webapp.mako_template_lookup template = template_lookup.get_template(filename) data = dict( caller=self, t=self, trans=self, h=helpers, util=util, request=self.request, response=self.response, app=self.app, ) data.update(self.template_context) data.update(kwargs) return template.render(**data)
[docs] def qualified_url_for_path(self, path): return url_for(path, qualified=True)
[docs]def create_new_session(trans, prev_galaxy_session=None, user_for_new_session=None): """ Create a new GalaxySession for this request, possibly with a connection to a previous session (in `prev_galaxy_session`) and an existing user (in `user_for_new_session`). Caller is responsible for flushing the returned session. """ session_key = trans.security.get_new_guid() galaxy_session = trans.app.model.GalaxySession( session_key=session_key, is_valid=True, remote_host=trans.request.remote_host, remote_addr=trans.request.remote_addr, referer=trans.request.headers.get("Referer", None), ) if prev_galaxy_session: # Invalidated an existing session for some reason, keep track galaxy_session.prev_session_id = prev_galaxy_session.id if user_for_new_session: # The new session should be associated with the user galaxy_session.user = user_for_new_session ensure_object_added_to_session(galaxy_session, object_in_session=user_for_new_session) return galaxy_session
[docs]def default_url_path(path): return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
[docs]def build_url_map(app, global_conf, **local_conf): urlmap = URLMap() # Merge the global and local configurations conf = global_conf.copy() conf.update(local_conf) # Get cache time in seconds cache_time = conf.get("static_cache_time", None) if cache_time is not None: cache_time = int(cache_time) # Send to dynamic app by default urlmap["/"] = app def get_static_from_config(option_name, default_path): config_val = conf.get(option_name, default_url_path(default_path)) per_host_config_option = f"{option_name}_by_host" per_host_config = conf.get(per_host_config_option) return Static(config_val, cache_time, directory_per_host=per_host_config) # Define static mappings from config urlmap["/static"] = get_static_from_config("static_dir", "static/") urlmap["/images"] = get_static_from_config("static_images_dir", "static/images") urlmap["/static/scripts"] = get_static_from_config("static_scripts_dir", "static/scripts/") urlmap["/static/welcome.html"] = get_static_from_config("static_welcome_html", "static/welcome.html") urlmap["/favicon.ico"] = get_static_from_config("static_favicon_dir", "static/favicon.ico") urlmap["/robots.txt"] = get_static_from_config("static_robots_txt", "static/robots.txt") if "static_local_dir" in conf: urlmap["/static_local"] = Static(conf["static_local_dir"], cache_time) return urlmap