Source code for galaxy.webapps.base.webapp

"""
"""

import datetime
import inspect
import logging
import os
import re
import socket
import time
from http.cookies import CookieError
from typing import (
    Any,
    Dict,
    Optional,
)
from urllib.parse import urlparse

import mako.lookup
import mako.runtime
from apispec import APISpec
from paste.urlmap import URLMap
from sqlalchemy import (
    and_,
    select,
    true,
)
from sqlalchemy.orm.exc import NoResultFound

from galaxy import util
from galaxy.exceptions import (
    AuthenticationFailed,
    ConfigurationError,
    MalformedId,
    MessageException,
    RequestParameterMissingException,
)
from galaxy.managers import context
from galaxy.managers.session import GalaxySessionManager
from galaxy.managers.users import UserManager
from galaxy.model.base import (
    ensure_object_added_to_session,
    transaction,
)
from galaxy.structured_app import (
    BasicSharedApp,
    MinimalApp,
)
from galaxy.util import (
    asbool,
    safe_makedirs,
    unicodify,
)
from galaxy.util.sanitize_html import sanitize_html
from galaxy.version import VERSION
from galaxy.web.framework import (
    base,
    helpers,
    url_for,
)
from galaxy.web.framework.middleware.static import CacheableStaticURLParser as Static

try:
    from importlib.resources import files  # type: ignore[attr-defined]
except ImportError:
    # Python < 3.9
    from importlib_resources import files  # type: ignore[no-redef]

log = logging.getLogger(__name__)


UCSC_SERVERS = (
    "hgw1.cse.ucsc.edu",
    "hgw2.cse.ucsc.edu",
    "hgw3.cse.ucsc.edu",
    "hgw4.cse.ucsc.edu",
    "hgw5.cse.ucsc.edu",
    "hgw6.cse.ucsc.edu",
    "hgw7.cse.ucsc.edu",
    "hgw8.cse.ucsc.edu",
    "hgw1.soe.ucsc.edu",
    "hgw2.soe.ucsc.edu",
    "hgw3.soe.ucsc.edu",
    "hgw4.soe.ucsc.edu",
    "hgw5.soe.ucsc.edu",
    "hgw6.soe.ucsc.edu",
    "hgw7.soe.ucsc.edu",
    "hgw8.soe.ucsc.edu",
)

TOOL_RUNNER_SESSION_COOKIE = "galaxytoolrunnersession"


[docs]class WebApplication(base.WebApplication): """ Base WSGI application instantiated for all Galaxy webapps. A web application that: * adds API and UI controllers by scanning given directories and importing all modules found there. * has a security object. * builds mako template lookups. * generates GalaxyWebTransactions. """ injection_aware: bool = False
[docs] def __init__( self, galaxy_app: MinimalApp, session_cookie: str = "galaxysession", name: Optional[str] = None ) -> None: super().__init__() self.name = name galaxy_app.is_webapp = True self.set_transaction_factory(lambda e: self.transaction_chooser(e, galaxy_app, session_cookie)) # Mako support self.mako_template_lookup = self.create_mako_template_lookup(galaxy_app, name) # Security helper self.security = galaxy_app.security # We need this to set the REQUEST_ID contextvar in model.base *BEFORE* a GalaxyWebTransaction is created. # This will ensure a SQLAlchemy session is request-scoped for legacy (non-fastapi) endpoints. self._model = galaxy_app.model
[docs] def build_apispec(self): """ Traverse all route paths starting with "api" and create an APISpec instance. """ # API specification builder apispec = APISpec( title=self.name, version=VERSION, openapi_version="3.0.2", ) RE_URL = re.compile( r""" (?::\(|{) (\w*) (?::.*)? (?:\)|})""", re.X, ) DEFAULT_API_RESOURCE_NAMES = ("index", "create", "new", "update", "edit", "show", "delete") for rule in self.mapper.matchlist: if rule.routepath.endswith(".:(format)") or not rule.routepath.startswith("api/"): continue # Try to replace routes various ways to encode variables with simple swagger {form} swagger_path = "/%s" % RE_URL.sub(r"{\1}", rule.routepath) controller = rule.defaults.get("controller", "") action = rule.defaults.get("action", "") # Get the list of methods for the route methods = [] if rule.conditions: m = rule.conditions.get("method", []) methods = [m] if isinstance(m, str) else m # Find the controller class if controller not in self.api_controllers: # Only happens when removing a controller after porting to FastAPI. raise Exception(f"No controller class found for '{controller}', remove from buildapp.py ?") controller_class = self.api_controllers[controller] if not hasattr(controller_class, action): if action not in DEFAULT_API_RESOURCE_NAMES: # There's a manually specified action that points to a function that doesn't exist anymore raise Exception( f"No action found for {action} in class {controller_class}, remove from buildapp.py ?" ) continue action_method = getattr(controller_class, action) operations = {} # Add methods that have routes but are not documents for method in methods: if method.lower() not in operations: operations[method.lower()] = { "description": f"This route has not yet been ported to FastAPI. The documentation may not be complete.\n{action_method.__doc__}", "tags": ["undocumented"], } # Store the swagger path apispec.path(path=swagger_path, operations=operations) return apispec
[docs] def create_mako_template_lookup(self, galaxy_app, name): paths = [] base_package = ( "tool_shed.webapp" if galaxy_app.name == "tool_shed" else "galaxy.webapps.base" ) # reports has templates in galaxy package base_template_path = files(base_package) / "templates" # First look in webapp specific directory if name is not None: paths.append(base_template_path / "webapps" / name) # Then look in root directory paths.append(base_template_path) # Create TemplateLookup with a small cache return mako.lookup.TemplateLookup( directories=paths, module_directory=galaxy_app.config.template_cache_path, collection_size=500 )
[docs] def handle_controller_exception(self, e, trans, method, **kwargs): if isinstance(e, TypeError): method_signature = inspect.signature(method) required_parameters = { p.name for p in method_signature.parameters.values() if p.name != "trans" and p.default is inspect._empty } missing_required_parameters = required_parameters.difference(kwargs) if missing_required_parameters: e = RequestParameterMissingException( f"Required parameter(s) {', '.join(missing_required_parameters)} not provided in request." ) if isinstance(e, MessageException): # In the case of a controller exception, sanitize to make sure # unsafe html input isn't reflected back to the user trans.response.status = e.status_code return trans.show_message(sanitize_html(e.err_msg), e.type)
[docs] def make_body_iterable(self, trans, body): return base.WebApplication.make_body_iterable(self, trans, body)
[docs] def transaction_chooser(self, environ, galaxy_app: BasicSharedApp, session_cookie: str): return GalaxyWebTransaction(environ, galaxy_app, self, session_cookie)
[docs] def add_ui_controllers(self, package_name, app): """ Search for UI controllers in `package_name` and add them to the webapp. """ from galaxy.webapps.base.controller import BaseUIController for name, module in base.walk_controller_modules(package_name): # Look for a controller inside the modules for key in dir(module): T = getattr(module, key) if inspect.isclass(T) and T is not BaseUIController and issubclass(T, BaseUIController): controller = self._instantiate_controller(T, app) self.add_ui_controller(name, controller)
[docs] def add_api_controllers(self, package_name, app): """ Search for UI controllers in `package_name` and add them to the webapp. """ from galaxy.webapps.base.controller import BaseAPIController for name, module in base.walk_controller_modules(package_name): for key in dir(module): T = getattr(module, key) # Exclude classes such as BaseAPIController and BaseTagItemsController if inspect.isclass(T) and not key.startswith("Base") and issubclass(T, BaseAPIController): # By default use module_name, but allow controller to override name controller_name = getattr(T, "controller_name", name) controller = self._instantiate_controller(T, app) self.add_api_controller(controller_name, controller)
def _instantiate_controller(self, T, app): """Extension point, allow apps to construct controllers differently, really just used to stub out actual controllers for routes testing. """ controller = None if self.injection_aware: controller = app.resolve_or_none(T) if controller is not None: for key, value in T.__dict__.items(): if hasattr(value, "galaxy_type_depends"): value_type = value.galaxy_type_depends setattr(controller, key, app[value_type]) if controller is None: controller = T(app) return controller
[docs]def config_allows_origin(origin_raw, config): # boil origin header down to hostname origin = urlparse(origin_raw).hostname # singular match def matches_allowed_origin(origin, allowed_origin): if isinstance(allowed_origin, str): return origin == allowed_origin match = allowed_origin.match(origin) return match and match.group() == origin # localhost uses no origin header (== null) if not origin: return False # check for '*' or compare to list of allowed for allowed_origin in config.allowed_origin_hostnames: if allowed_origin == "*" or matches_allowed_origin(origin, allowed_origin): return True return False
[docs]def url_builder(*args, **kwargs) -> str: """Wrapper around the WSGI version of the function for reversing URLs.""" kwargs.update(kwargs.pop("query_params", {})) return url_for(*args, **kwargs)
[docs]class GalaxyWebTransaction(base.DefaultWebTransaction, context.ProvidesHistoryContext): """ Encapsulates web transaction specific state for the Galaxy application (specifically the user's "cookie" session and history) """
[docs] def __init__( self, environ: Dict[str, Any], app: BasicSharedApp, webapp: WebApplication, session_cookie: Optional[str] = None ) -> None: self._app = app self.webapp = webapp self.user_manager = app[UserManager] self.session_manager = app[GalaxySessionManager] super().__init__(environ) config = self.app.config self.debug = asbool(config.get("debug", False)) if x_frame_options := getattr(config, "x_frame_options", None): self.response.headers["X-Frame-Options"] = x_frame_options # Flag indicating whether we are in workflow building mode (means # that the current history should not be used for parameter values # and such). self.workflow_building_mode = False self.__user = None self.galaxy_session = None self.error_message = None self.host = self.request.host # set any cross origin resource sharing headers if configured to do so self.set_cors_headers() if self.environ.get("is_api_request", False): # With API requests, if there's a key, use it and associate the # user with the transaction. # If not, check for an active session but do not create one. # If an error message is set here, it's sent back using # trans.show_error in the response -- in expose_api. assert session_cookie self.error_message = self._authenticate_api(session_cookie) elif self.app.name == "reports": self.galaxy_session = None else: # This is a web request, get or create session. assert session_cookie self._ensure_valid_session(session_cookie) if hasattr(self.app, "authnz_manager") and self.app.authnz_manager: self.app.authnz_manager.refresh_expiring_oidc_tokens(self) # type: ignore[attr-defined] if self.galaxy_session: # When we've authenticated by session, we have to check the # following. # Prevent deleted users from accessing Galaxy if config.use_remote_user and self.galaxy_session.user.deleted: self.response.send_redirect(url_for("/static/user_disabled.html")) if config.require_login: self._ensure_logged_in_user(session_cookie) if config.session_duration: # TODO DBTODO All ajax calls from the client need to go through # a single point of control where we can do things like # redirect/etc. This is API calls as well as something like 40 # @web.json requests that might not get handled well on the # clientside. # # Make sure we're not past the duration, and either log out or # update timestamp. now = datetime.datetime.now() if self.galaxy_session.last_action: expiration_time = self.galaxy_session.last_action + datetime.timedelta( minutes=config.session_duration ) else: expiration_time = now self.galaxy_session.last_action = now - datetime.timedelta(seconds=1) self.sa_session.add(self.galaxy_session) with transaction(self.sa_session): self.sa_session.commit() if expiration_time < now: # Expiration time has passed. self.handle_user_logout() if self.environ.get("is_api_request", False): self.response.status = 401 self.user = None self.galaxy_session = None else: self.response.send_redirect( url_for( controller="root", action="login", message="You have been logged out due to inactivity. Please log in again to continue using Galaxy.", status="info", use_panels=True, ) ) else: self.galaxy_session.last_action = now self.sa_session.add(self.galaxy_session) with transaction(self.sa_session): self.sa_session.commit()
@property def app(self): return self._app @property def url_builder(self): return url_builder
[docs] def set_cors_allow(self, name=None, value=None): acr = "Access-Control-Request-" if name is None: for key in self.request.headers.keys(): if key.startswith(acr): self.set_cors_allow(name=key[len(acr) :], value=value) else: resp_name = f"Access-Control-Allow-{name}" if value is None: value = self.request.headers.get(acr + name, None) if value: self.response.headers[resp_name] = value elif resp_name in self.response.headers: del self.response.headers[resp_name]
[docs] def set_cors_origin(self, origin=None): if origin is None: origin = self.request.headers.get("Origin", None) if origin: self.response.headers["Access-Control-Allow-Origin"] = origin elif "Access-Control-Allow-Origin" in self.response.headers: del self.response.headers["Access-Control-Allow-Origin"]
[docs] def set_cors_headers(self): """Allow CORS requests if configured to do so by echoing back the request's 'Origin' header (if any) as the response header 'Access-Control-Allow-Origin' Preflight OPTIONS requests to the API work by routing all OPTIONS requests to a single method in the authenticate API (options method), setting CORS headers, and responding OK. NOTE: raising some errors (such as httpexceptions), will remove the header (e.g. client will get both CORS error and 404 inside that) """ # do not set any access control headers if not configured for it (common case) if not self.app.config.get("allowed_origin_hostnames", None): return # do not set any access control headers if there's no origin header on the request origin_header = self.request.headers.get("Origin", None) if not origin_header: return # check against the list of allowed strings/regexp hostnames, echo original if cleared if config_allows_origin(origin_header, self.app.config): self.set_cors_origin(origin=origin_header) else: self.response.status = 400
[docs] def get_user(self): """Return the current user if logged in or None.""" user = self.__user if not user and self.galaxy_session: user = self.galaxy_session.user self.__user = user return user
[docs] def set_user(self, user): """Set the current user.""" if self.galaxy_session: if user and not user.bootstrap_admin_user: self.galaxy_session.user = user self.sa_session.add(self.galaxy_session) with transaction(self.sa_session): self.sa_session.commit() self.__user = user
user = property(get_user, set_user) def _set_cookie(self, value, name="galaxysession", path="/", age=90, version="1", encode_value=False): """Convenience method for setting a session cookie""" # The galaxysession cookie value must be a high entropy 128 bit random number encrypted # using a server secret key. Any other value is invalid and could pose security issues. if encode_value: value = self.security.encode_guid(value) self.response.cookies[name] = unicodify(value) self.response.cookies[name]["path"] = path self.response.cookies[name]["max-age"] = 3600 * 24 * age # 90 days tstamp = time.localtime(time.time() + 3600 * 24 * age) self.response.cookies[name]["expires"] = time.strftime("%a, %d-%b-%Y %H:%M:%S GMT", tstamp) self.response.cookies[name]["version"] = version https = self.request.environ["wsgi.url_scheme"] == "https" if https: self.response.cookies[name]["secure"] = True try: self.response.cookies[name]["httponly"] = True except CookieError as e: log.warning(f"Error setting httponly attribute in cookie '{name}': {e}") if self.app.config.cookie_domain is not None: self.response.cookies[name]["domain"] = self.app.config.cookie_domain def _authenticate_api(self, session_cookie: str) -> Optional[str]: """ Authenticate for the API via key or session (if available). """ oidc_access_token = self.request.headers.get("Authorization", None) oidc_token_supplied = ( self.environ.get("is_api_request", False) and oidc_access_token and "Bearer " in oidc_access_token ) api_key = self.request.params.get("key", None) or self.request.headers.get("x-api-key", None) secure_id = self.get_cookie(name=session_cookie) api_key_supplied = self.environ.get("is_api_request", False) and api_key if api_key_supplied: # Sessionless API transaction, we just need to associate a user. try: user = self.user_manager.by_api_key(api_key) except AuthenticationFailed as e: return str(e) self.set_user(user) elif secure_id: # API authentication via active session # Associate user using existing session # This will throw an exception under remote auth with anon users. try: self._ensure_valid_session(session_cookie) except Exception: log.exception( "Exception during Session-based API authentication, this was most likely an attempt to use an anonymous cookie under remote authentication (so, no user), which we don't support." ) self.user = None self.galaxy_session = None elif oidc_token_supplied: # Sessionless API transaction with oidc token, we just need to associate a user. oidc_access_token = oidc_access_token.replace("Bearer ", "") try: user = self.user_manager.by_oidc_access_token(oidc_access_token) except AuthenticationFailed as e: return str(e) self.set_user(user) else: # Anonymous API interaction -- anything but @expose_api_anonymous will fail past here. self.user = None self.galaxy_session = None return None def _ensure_valid_session(self, session_cookie: str, create: bool = True) -> None: """ Ensure that a valid Galaxy session exists and is available as trans.session (part of initialization) """ # Try to load an existing session secure_id = self.get_cookie(name=session_cookie) galaxy_session = None prev_galaxy_session = None user_for_new_session = None invalidate_existing_session = False # Track whether the session has changed so we can avoid calling flush # in the most common case (session exists and is valid). galaxy_session_requires_flush = False if secure_id: # Decode the cookie value to get the session_key try: session_key = self.security.decode_guid(secure_id) except MalformedId: # Invalid session key, we're going to create a new one. # IIRC we did this when we switched to python 3 and clients # were sending sessioncookies that started with a stringified # bytestring, e.g 'b"0123456789abcdef"'. Maybe we need to drop # this exception catching, but then it'd be tricky to invalidate # a faulty session key log.debug("Received invalid session key '{secure_id}', setting a new session key") session_key = None if session_key: # We do NOT catch exceptions here, if the database is down the request should fail, # and we should not generate a new session. galaxy_session = self.session_manager.get_session_from_session_key(session_key=session_key) if not galaxy_session: session_key = None # If remote user is in use it can invalidate the session and in some # cases won't have a cookie set above, so we need to check some things # now. if self.app.config.use_remote_user: remote_user_email = self.environ.get(self.app.config.remote_user_header, None) if galaxy_session: if remote_user_email and galaxy_session.user is None: # No user, associate galaxy_session.user = self.user_manager.get_or_create_remote_user(remote_user_email) galaxy_session_requires_flush = True elif ( remote_user_email and galaxy_session.user.email != remote_user_email and ( not self.app.config.allow_user_impersonation or remote_user_email not in self.app.config.admin_users_list ) ): # Session exists but is not associated with the correct # remote user, and the currently set remote_user is not a # potentially impersonating admin. invalidate_existing_session = True user_for_new_session = self.user_manager.get_or_create_remote_user(remote_user_email) log.warning( "User logged in as '%s' externally, but has a cookie as '%s' invalidating session", remote_user_email, galaxy_session.user.email, ) elif remote_user_email: # No session exists, get/create user for new session user_for_new_session = self.user_manager.get_or_create_remote_user(remote_user_email) if (galaxy_session and galaxy_session.user is None) and user_for_new_session is None: raise Exception("Remote Authentication Failure - user is unknown and/or not supplied.") else: if galaxy_session is not None and galaxy_session.user and galaxy_session.user.external: # Remote user support is not enabled, but there is an existing # session with an external user, invalidate invalidate_existing_session = True log.warning( "User '%s' is an external user with an existing session, invalidating session since external auth is disabled", galaxy_session.user.email, ) elif galaxy_session is not None and galaxy_session.user is not None and galaxy_session.user.deleted: invalidate_existing_session = True log.warning(f"User '{galaxy_session.user.email}' is marked deleted, invalidating session") # Do we need to invalidate the session for some reason? if invalidate_existing_session: assert galaxy_session prev_galaxy_session = galaxy_session prev_galaxy_session.is_valid = False galaxy_session = None # No relevant cookies, or couldn't find, or invalid, so create a new session if galaxy_session is None: galaxy_session = self.__create_new_session(prev_galaxy_session, user_for_new_session) galaxy_session_requires_flush = True self.galaxy_session = galaxy_session self.__update_session_cookie(name=session_cookie) else: self.galaxy_session = galaxy_session if self.webapp.name == "galaxy": self.get_or_create_default_history() # Do we need to flush the session? if galaxy_session_requires_flush: self.sa_session.add(galaxy_session) # FIXME: If prev_session is a proper relation this would not # be needed. if prev_galaxy_session: self.sa_session.add(prev_galaxy_session) with transaction(self.sa_session): self.sa_session.commit() def _ensure_logged_in_user(self, session_cookie: str) -> None: # The value of session_cookie can be one of # 'galaxysession' or 'galaxycommunitysession' # Currently this method does nothing unless session_cookie is 'galaxysession' assert self.galaxy_session if session_cookie == "galaxysession" and self.galaxy_session.user is None: # TODO: re-engineer to eliminate the use of allowed_paths # as maintenance overhead is far too high. allowed_paths = [ # client app route # TODO: might be better as '/:username/login', '/:username/logout' url_for(controller="root", action="login"), url_for(controller="login", action="start"), # mako app routes url_for(controller="user", action="login"), url_for(controller="user", action="logout"), url_for(controller="user", action="reset_password"), url_for(controller="user", action="change_password"), # TODO: do any of these still need to bypass require login? url_for(controller="user", action="api_keys"), url_for(controller="user", action="create"), url_for(controller="user", action="index"), url_for(controller="user", action="manage_user_info"), url_for(controller="user", action="set_default_permissions"), ] # append the welcome url to allowed paths if we'll show it at the login screen if self.app.config.show_welcome_with_login: allowed_paths.append(url_for(controller="root", action="welcome")) # prevent redirect when UCSC server attempts to get dataset contents as 'anon' user display_as = url_for(controller="root", action="display_as") if self.app.datatypes_registry.get_display_sites("ucsc") and self.request.path == display_as: try: host = socket.gethostbyaddr(self.environ["REMOTE_ADDR"])[0] except (OSError, socket.herror, socket.gaierror, socket.timeout): host = None if host in UCSC_SERVERS: return # prevent redirect for external, enabled display applications getting dataset contents external_display_path = url_for(controller="", action="display_application") if self.request.path.startswith(external_display_path): request_path_split = self.request.path.split("/") try: if ( self.app.datatypes_registry.display_applications.get(request_path_split[-5]) and request_path_split[-4] in self.app.datatypes_registry.display_applications.get(request_path_split[-5]).links and request_path_split[-3] != "None" ): return except IndexError: pass authnz_controller_base = url_for(controller="authnz", action="index") if self.request.path.startswith(authnz_controller_base): # All authnz requests pass through return # redirect to root if the path is not in the list above if self.request.path not in allowed_paths: login_url = url_for(controller="root", action="login", redirect=self.request.path) self.response.send_redirect(login_url) def __create_new_session(self, prev_galaxy_session=None, user_for_new_session=None): """ Create a new GalaxySession for this request, possibly with a connection to a previous session (in `prev_galaxy_session`) and an existing user (in `user_for_new_session`). Caller is responsible for flushing the returned session. """ return create_new_session( self, prev_galaxy_session=prev_galaxy_session, user_for_new_session=user_for_new_session ) @property def cookie_path(self): # Cookies for non-root paths should not end with `/` -> https://stackoverflow.com/questions/36131023/setting-a-slash-on-cookie-path return (self.app.config.cookie_path or url_for("/")).rstrip("/") or "/" def __update_session_cookie(self, name="galaxysession"): """ Update the session cookie to match the current session. """ self.set_cookie(self.security.encode_guid(self.galaxy_session.session_key), name=name, path=self.cookie_path)
[docs] def check_user_library_import_dir(self, user): if getattr(self.app.config, "user_library_import_dir_auto_creation", False): # try to create a user library import directory try: safe_makedirs(os.path.join(self.app.config.user_library_import_dir, user.email)) except ConfigurationError as e: self.log_event(unicodify(e))
[docs] def user_checks(self, user): """ This could contain more checks around a user upon login """ self.check_user_library_import_dir(user)
def _associate_user_history(self, user, prev_galaxy_session=None): """ Associate the user's last accessed history (if exists) with their new session """ history = None set_permissions = False try: users_last_session = user.current_galaxy_session except Exception: users_last_session = None if ( prev_galaxy_session and prev_galaxy_session.current_history and not prev_galaxy_session.current_history.deleted and not prev_galaxy_session.current_history.empty and (prev_galaxy_session.current_history.user is None or prev_galaxy_session.current_history.user == user) ): # If the previous galaxy session had a history, associate it with the new session, but only if it didn't # belong to a different user. history = prev_galaxy_session.current_history if prev_galaxy_session.user is None: # Increase the user's disk usage by the amount of the previous history's datasets if they didn't already # own it. for hda in history.datasets: user.adjust_total_disk_usage(hda.quota_amount(user), hda.dataset.quota_source_info.label) # Only set default history permissions if the history is from the previous session and anonymous set_permissions = True elif self.galaxy_session.current_history: history = self.galaxy_session.current_history if ( not history and users_last_session and users_last_session.current_history and not users_last_session.current_history.deleted ): history = users_last_session.current_history if history not in self.galaxy_session.histories: self.galaxy_session.add_history(history) if not history: history = self.new_history() if history.user is None: history.user = user self.galaxy_session.current_history = history if set_permissions: self.app.security_agent.history_set_default_permissions( history, dataset=True, bypass_manage_permission=True ) self.sa_session.add_all((prev_galaxy_session, self.galaxy_session, history))
[docs] def handle_user_login(self, user): """ Login a new user (possibly newly created) - do some 'system' checks (if any) for this user - create a new session - associate new session with user - if old session had a history and it was not associated with a user, associate it with the new session, otherwise associate the current session's history with the user - add the disk usage of the current session to the user's total disk usage """ self.user_checks(user) self.app.security_agent.create_user_role(user, self.app) # Set the previous session prev_galaxy_session = self.galaxy_session prev_galaxy_session.is_valid = False # Define a new current_session self.galaxy_session = self.__create_new_session(prev_galaxy_session, user) if self.webapp.name == "galaxy": cookie_name = "galaxysession" self._associate_user_history(user, prev_galaxy_session) else: cookie_name = "galaxycommunitysession" self.sa_session.add_all((prev_galaxy_session, self.galaxy_session)) with transaction(self.sa_session): self.sa_session.commit() # This method is not called from the Galaxy reports, so the cookie will always be galaxysession self.__update_session_cookie(name=cookie_name)
[docs] def handle_user_logout(self, logout_all=False): """ Logout the current user: - invalidate the current session - create a new session with no user associated """ prev_galaxy_session = self.galaxy_session prev_galaxy_session.is_valid = False self.galaxy_session = self.__create_new_session(prev_galaxy_session) self.sa_session.add_all((prev_galaxy_session, self.galaxy_session)) galaxy_user_id = prev_galaxy_session.user_id if logout_all and galaxy_user_id is not None: stmt = select(self.app.model.GalaxySession).filter( and_( self.app.model.GalaxySession.user_id == galaxy_user_id, self.app.model.GalaxySession.is_valid == true(), self.app.model.GalaxySession.id != prev_galaxy_session.id, ) ) for other_galaxy_session in self.sa_session.scalars(stmt): other_galaxy_session.is_valid = False self.sa_session.add(other_galaxy_session) with transaction(self.sa_session): self.sa_session.commit() if self.webapp.name == "galaxy": # This method is not called from the Galaxy reports, so the cookie will always be galaxysession self.__update_session_cookie(name="galaxysession") elif self.webapp.name == "tool_shed": self.__update_session_cookie(name="galaxycommunitysession")
[docs] def get_galaxy_session(self): """ Return the current galaxy session """ return self.galaxy_session
[docs] def get_history(self, create=False, most_recent=False): """ Load the current history. - If that isn't available, we find the most recently updated history. - If *that* isn't available, we get or create the default history. Transactions will not always have an active history (API requests), so None is a valid response. """ history = None if self.galaxy_session: if hasattr(self.galaxy_session, "current_history"): history = self.galaxy_session.current_history if not history and most_recent: history = self.get_most_recent_history() if not history and util.string_as_bool(create): history = self.get_or_create_default_history() return history
[docs] def set_history(self, history): if history and not history.deleted: self.galaxy_session.current_history = history self.sa_session.add(self.galaxy_session) with transaction(self.sa_session): self.sa_session.commit()
@property def history(self): return self.get_history()
[docs] def get_or_create_default_history(self): """ Gets or creates a default history and associates it with the current session. """ # Just return the current history if one exists and is not deleted. history = self.galaxy_session.current_history if history and not history.deleted: return history # Look for an existing history that has the default name, is not # deleted, and is empty. If this exists, we associate it with the # current session and return it. user = self.galaxy_session.user if user: stmt = select(self.app.model.History).filter_by( user=user, name=self.app.model.History.default_name, deleted=False ) unnamed_histories = self.sa_session.scalars(stmt) for history in unnamed_histories: if history.empty: self.set_history(history) return history # No suitable history found, create a new one. return self.new_history()
[docs] def get_most_recent_history(self): """ Gets the most recently updated history. """ # There must be a user to fetch histories, and without a user we have # no recent history. user = self.get_user() if not user: return None try: stmt = ( select(self.app.model.History) .filter_by(user=user, deleted=False) .order_by(self.app.model.History.update_time.desc()) .limit(1) ) recent_history = self.sa_session.scalars(stmt).first() except NoResultFound: return None self.set_history(recent_history) return recent_history
[docs] def new_history(self, name=None): """ Create a new history and associate it with the current session and its associated user (if set). """ # Create new history history = self.app.model.History() if name: history.name = name # Associate with session history.add_galaxy_session(self.galaxy_session) # Make it the session's current history self.galaxy_session.current_history = history # Associate with user if self.galaxy_session.user: history.user = self.galaxy_session.user # Track genome_build with history history.genome_build = self.app.genome_builds.default_value # Set the user's default history permissions self.app.security_agent.history_set_default_permissions(history) # Save self.sa_session.add_all((self.galaxy_session, history)) with transaction(self.sa_session): self.sa_session.commit() return history
@base.lazy_property def template_context(self): return dict()
[docs] def set_message(self, message, type=None): """ Convenience method for setting the 'message' and 'message_type' element of the template context. """ self.template_context["message"] = message if type: self.template_context["status"] = type
[docs] def get_message(self): """ Convenience method for getting the 'message' element of the template context. """ return self.template_context["message"]
[docs] def show_message(self, message, type="info", refresh_frames=None, cont=None, use_panels=False, active_view=""): """ Convenience method for displaying a simple page with a single message. `type`: one of "error", "warning", "info", or "done"; determines the type of dialog box and icon displayed with the message `refresh_frames`: names of frames in the interface that should be refreshed when the message is displayed """ refresh_frames = refresh_frames or [] return self.fill_template( "message.mako", status=type, message=message, refresh_frames=refresh_frames, cont=cont, use_panels=use_panels, active_view=active_view, )
[docs] def show_error_message(self, message, refresh_frames=None, use_panels=False, active_view=""): """ Convenience method for displaying an error message. See `show_message`. """ refresh_frames = refresh_frames or [] return self.show_message(message, "error", refresh_frames, use_panels=use_panels, active_view=active_view)
[docs] def show_ok_message(self, message, refresh_frames=None, use_panels=False, active_view=""): """ Convenience method for displaying an ok message. See `show_message`. """ refresh_frames = refresh_frames or [] return self.show_message(message, "done", refresh_frames, use_panels=use_panels, active_view=active_view)
[docs] def show_warn_message(self, message, refresh_frames=None, use_panels=False, active_view=""): """ Convenience method for displaying an warn message. See `show_message`. """ refresh_frames = refresh_frames or [] return self.show_message(message, "warning", refresh_frames, use_panels=use_panels, active_view=active_view)
@property def session_csrf_token(self): token = "" if self.galaxy_session: token = self.security.encode_id(self.galaxy_session.id, kind="csrf") return token
[docs] def check_csrf_token(self, payload): session_csrf_token = payload.get("session_csrf_token") if not session_csrf_token: return "No session token set, denying request." elif session_csrf_token != self.session_csrf_token: return "Wrong session token found, denying request."
[docs] def fill_template(self, filename, **kwargs): """ Fill in a template, putting any keyword arguments on the context. """ # call get_user so we can invalidate sessions from external users, # if external auth has been disabled. self.get_user() assert filename.endswith(".mako") return self.fill_template_mako(filename, **kwargs)
[docs] def fill_template_mako(self, filename, template_lookup=None, **kwargs): template_lookup = template_lookup or self.webapp.mako_template_lookup template = template_lookup.get_template(filename) data = dict( caller=self, t=self, trans=self, h=helpers, util=util, request=self.request, response=self.response, app=self.app, ) data.update(self.template_context) data.update(kwargs) return template.render(**data)
[docs] def qualified_url_for_path(self, path): return url_for(path, qualified=True)
[docs]def create_new_session(trans, prev_galaxy_session=None, user_for_new_session=None): """ Create a new GalaxySession for this request, possibly with a connection to a previous session (in `prev_galaxy_session`) and an existing user (in `user_for_new_session`). Caller is responsible for flushing the returned session. """ session_key = trans.security.get_new_guid() galaxy_session = trans.app.model.GalaxySession( session_key=session_key, is_valid=True, remote_host=trans.request.remote_host, remote_addr=trans.request.remote_addr, referer=trans.request.headers.get("Referer", None), ) if prev_galaxy_session: # Invalidated an existing session for some reason, keep track galaxy_session.prev_session_id = prev_galaxy_session.id if user_for_new_session: # The new session should be associated with the user galaxy_session.user = user_for_new_session ensure_object_added_to_session(galaxy_session, object_in_session=user_for_new_session) return galaxy_session
[docs]def default_url_path(path): return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
[docs]def build_url_map(app, global_conf, **local_conf): urlmap = URLMap() # Merge the global and local configurations conf = global_conf.copy() conf.update(local_conf) # Get cache time in seconds cache_time = conf.get("static_cache_time", None) if cache_time is not None: cache_time = int(cache_time) # Send to dynamic app by default urlmap["/"] = app def get_static_from_config(option_name, default_path): config_val = conf.get(option_name, default_url_path(default_path)) per_host_config_option = f"{option_name}_by_host" per_host_config = conf.get(per_host_config_option) return Static(config_val, cache_time, directory_per_host=per_host_config) # Define static mappings from config urlmap["/static"] = get_static_from_config("static_dir", "static/") urlmap["/images"] = get_static_from_config("static_images_dir", "static/images") urlmap["/static/scripts"] = get_static_from_config("static_scripts_dir", "static/scripts/") urlmap["/static/welcome.html"] = get_static_from_config("static_welcome_html", "static/welcome.html") urlmap["/favicon.ico"] = get_static_from_config("static_favicon_dir", "static/favicon.ico") urlmap["/robots.txt"] = get_static_from_config("static_robots_txt", "static/robots.txt") if "static_local_dir" in conf: urlmap["/static_local"] = Static(conf["static_local_dir"], cache_time) return urlmap