Source code for galaxy.webapps.galaxy.controllers.user

"""
Contains the user interface in the Universe class
"""

import logging
from datetime import (
    datetime,
    timedelta,
)
from urllib.parse import unquote

from markupsafe import escape
from sqlalchemy.orm.exc import NoResultFound

from galaxy import (
    util,
    web,
)
from galaxy.exceptions import Conflict
from galaxy.managers import users
from galaxy.managers.users import get_user_by_email
from galaxy.security.validate_user_input import (
    validate_email,
    validate_publicname,
)
from galaxy.structured_app import StructuredApp
from galaxy.web import (
    expose_api_anonymous_and_sessionless,
    url_for,
)
from galaxy.webapps.base.controller import (
    BaseUIController,
    UsesFormDefinitionsMixin,
)
from ..api import depends

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)


def _filtered_registration_params_dict(payload):
    """Return a new dict holding only the registration fields of ``payload``.

    Only the keys ``email``, ``username``, ``password``, ``confirm`` and
    ``subscribe`` are copied; every other key is dropped. ``payload`` itself
    is not modified.
    """
    registration_keys = ["email", "username", "password", "confirm", "subscribe"]
    kept = {}
    for name, value in payload.items():
        if name in registration_keys:
            kept[name] = value
    return kept


class User(BaseUIController, UsesFormDefinitionsMixin):
    """Web controller for login, logout, registration, activation and password handling."""

    # Injected user manager used for lookups, registration, activation and password changes.
    user_manager: users.UserManager = depends(users.UserManager)
    installed_len_files = None

    def __init__(self, app: StructuredApp):
        """Initialise the controller with the application object ``app``."""
        super().__init__(app)

    def __handle_role_and_group_auto_creation(
        self,
        trans,
        user,
        roles,
        auto_create_roles=False,
        auto_create_groups=False,
        auto_assign_roles_to_groups_only=False,
    ):
        """
        Create and assign roles and groups for an auto-registered user.

        :param trans: the current transaction
        :param user: the newly created user
        :param roles: iterable of role names; each name is also used as the group name
        :param auto_create_roles: look up each role and create it when it does not exist
        :param auto_create_groups: look up each group, create it when it does not exist,
            and associate ``user`` with it
        :param auto_assign_roles_to_groups_only: when true, associate the role with the
            group instead of with the user
        """
        for role_name in roles:
            role = None
            group = None
            if auto_create_roles:
                try:
                    # first try to find the role
                    role = trans.app.security_agent.get_role(role_name)
                except NoResultFound:
                    # or create it
                    role, _ = trans.app.security_agent.create_role(
                        role_name,
                        "Auto created upon user registration",
                        [],
                        [],
                        create_group_for_role=auto_create_groups,
                    )
                    if auto_create_groups:
                        trans.log_event("Created role and group for auto-registered user.")
                    else:
                        trans.log_event("Created role for auto-registered user.")
            if auto_create_groups:
                # only create a group if not existing yet
                group = (
                    self.sa_session.query(trans.app.model.Group)
                    .filter(trans.app.model.Group.name == role_name)
                    .first()
                )
                # ``first()`` returns None for an empty result (it never raises
                # NoResultFound), so a missing group must be detected explicitly.
                if group is None:
                    group = trans.app.model.Group(name=role_name)
                    self.sa_session.add(group)
                trans.app.security_agent.associate_user_group(user, group)

            if auto_assign_roles_to_groups_only and group and role:
                trans.log_event("Assigning role to group only")
                trans.app.security_agent.associate_group_role(group, role)
            elif not auto_assign_roles_to_groups_only and role:
                trans.log_event("Assigning role to newly created user")
                trans.app.security_agent.associate_user_role(user, role)

    def __autoregistration(self, trans, login, password):
        """
        Does the autoregistration if enabled.

        :returns: a ``(message, user)`` tuple; ``message`` is empty on success and
            ``user`` is ``None`` when no account was created
        """
        try:
            autoreg = trans.app.auth_manager.check_auto_registration(trans, login, password)
        except Conflict as conflict:
            return f"Auto-registration failed, {conflict}", None
        user = None
        if autoreg["auto_reg"]:
            email = autoreg["email"]
            username = autoreg["username"]
            # Join only the non-empty validation messages so the result carries
            # no stray leading/trailing whitespace.
            validation_messages = (
                validate_email(trans, email, allow_empty=True),
                validate_publicname(trans, username),
            )
            message = " ".join(filter(None, validation_messages))
            if not message:
                user = self.user_manager.create(email=email, username=username, password="")
                if trans.app.config.user_activation_on:
                    self.user_manager.send_activation_email(trans, email, username)
                # The handle_user_login() method has a call to the history_set_default_permissions() method
                # (needed when logging in with a history), user needs to have default permissions set before logging in
                if not trans.user_is_admin:
                    trans.handle_user_login(user)
                    trans.log_event("User (auto) created a new account")
                    trans.log_event("User logged in")
                if "attributes" in autoreg and "roles" in autoreg["attributes"]:
                    self.__handle_role_and_group_auto_creation(
                        trans,
                        user,
                        autoreg["attributes"]["roles"],
                        auto_create_groups=autoreg["auto_create_groups"],
                        auto_create_roles=autoreg["auto_create_roles"],
                        auto_assign_roles_to_groups_only=autoreg["auto_assign_roles_to_groups_only"],
                    )
            else:
                message = f"Auto-registration failed, contact your local Galaxy administrator. {message}"
        else:
            message = "No such user or invalid password."
        return message, user

    @expose_api_anonymous_and_sessionless
    def login(self, trans, payload=None, **kwd):
        """Log a user in; ``payload`` carries ``login``, ``password`` and optionally ``redirect``."""
        payload = payload or {}
        return self.__validate_login(trans, payload, **kwd)

    def __validate_login(self, trans, payload=None, **kwd):
        """
        Handle Galaxy Log in

        Falls back to ``kwd`` when ``payload`` is empty. Returns either the result of
        ``message_exception`` for a rejected login, or a dict with a ``message`` key
        (plus ``redirect``, or ``status``/``expired_user`` for password-expiry cases).
        """
        if not payload:
            payload = kwd
        message = trans.check_csrf_token(payload)
        if message:
            return self.message_exception(trans, message)
        login = payload.get("login")
        password = payload.get("password")
        redirect = payload.get("redirect")
        if not login or not password:
            return self.message_exception(trans, "Please specify a username and password.")
        user = self.user_manager.get_user_by_identity(login)
        log.debug("trans.app.config.auth_config_file: %s", trans.app.config.auth_config_file)
        if user is None:
            message, user = self.__autoregistration(trans, login, password)
            if message:
                return self.message_exception(trans, message)
        elif user.deleted:
            message = (
                "This account has been marked deleted, contact your local Galaxy administrator to restore the account."
            )
            if trans.app.config.error_email_to is not None:
                message += f" Contact: {trans.app.config.error_email_to}."
            return self.message_exception(trans, message, sanitize=False)
        elif user.external:
            message = "This account was created for use with an external authentication method, contact your local Galaxy administrator to activate it."
            if trans.app.config.error_email_to is not None:
                message += f" Contact: {trans.app.config.error_email_to}."
            return self.message_exception(trans, message, sanitize=False)
        elif not trans.app.auth_manager.check_password(user, password, trans.request):
            return self.message_exception(trans, "Invalid password.")
        elif trans.app.config.user_activation_on and not user.active:
            # activation is ON and the user is INACTIVE
            grace_period_on = trans.app.config.activation_grace_period != 0
            if grace_period_on and not self.is_outside_grace_period(trans, user.create_time):
                # User is within the grace period, let them log in.
                trans.handle_user_login(user)
                trans.log_event("User logged in")
            else:
                # Grace period is off, or the user is outside of it. Login is disabled
                # and the user will have the activation email resent.
                message, _ = self.resend_activation_email(trans, user.email, user.username)
                return self.message_exception(trans, message, sanitize=False)
        else:
            # activation is OFF
            pw_expires = getattr(trans.app.config, "password_expiration_period", None)
            # Take a single timestamp so the expiry checks below agree with each other.
            now = datetime.today()
            if pw_expires and user.last_password_change < now - pw_expires:
                # Password is expired, we don't log them in.
                return {
                    "message": "Your password has expired. Please reset or change it to access Galaxy.",
                    "status": "warning",
                    "expired_user": trans.security.encode_id(user.id),
                }
            trans.handle_user_login(user)
            trans.log_event("User logged in")
            # NOTE(review): this threshold fires once the password is older than a tenth
            # of the expiration period, not when a tenth of the period remains - confirm intent.
            if pw_expires and user.last_password_change < now - timedelta(days=pw_expires.days / 10):
                # If password is about to expire, modify message to state that.
                # Remaining time is the expiry instant minus now.
                remaining = user.last_password_change + pw_expires - now
                return {"message": f"Your password will expire in {remaining.days} day(s).", "status": "warning"}
        return {"message": "Success.", "redirect": self.__get_redirect_url(redirect)}

    @web.expose
    def resend_verification(self, trans, **kwargs):
        """
        Exposed function for use outside of the class. E.g. when user click on the resend link in the masthead.
        """
        message, status = self.resend_activation_email(trans, None, None)
        if status:
            return trans.show_ok_message(message)
        return trans.show_error_message(message)

    def resend_activation_email(self, trans, email, username):
        """
        Function resends the verification email in case user wants to log in with an inactive account or he clicks the resend link.

        :param email: address to send to; taken from ``trans.user`` when ``None``
        :param username: public name; taken from ``trans.user`` when ``None``
        :returns: ``(message, is_activation_sent)``; the second element is ``None``
            when a value is missing and there is no session user to take it from
        """
        if email is None or username is None:
            # User is coming from outside registration form, load the missing values from trans
            if not trans.user:
                return "No session found, cannot send activation email.", None
            if email is None:
                email = trans.user.email
            if username is None:
                username = trans.user.username
        is_activation_sent = self.user_manager.send_activation_email(trans, email, username)
        home_url = url_for("/")
        home_link = f'<a target="_top" href="{home_url}">Return to the home page</a>.'
        if is_activation_sent:
            message = f"This account has not been activated yet. The activation link has been sent again. Please check your email address <b>{escape(email)}</b> including the spam/trash folder. {home_link}"
        else:
            message = f"This account has not been activated yet but we are unable to send the activation link. Please contact your local Galaxy administrator. {home_link}"
            if trans.app.config.error_email_to is not None:
                message += f" Error contact: {trans.app.config.error_email_to}."
        return message, is_activation_sent

    def is_outside_grace_period(self, trans, create_time):
        """
        Function checks whether the user is outside the config-defined grace period for inactive accounts.

        :param create_time: account creation time; compared against ``datetime.utcnow()``,
            so it is presumably a naive UTC datetime (confirm against the model)
        :returns: ``True`` when more than ``activation_grace_period`` hours have passed
            since ``create_time`` or when the configured grace period equals 0
        """
        # Activation is forced and the user is not active yet. Check the grace period.
        activation_grace_period = trans.app.config.activation_grace_period
        delta = timedelta(hours=int(activation_grace_period))
        time_difference = datetime.utcnow() - create_time
        return time_difference > delta or activation_grace_period == 0

    @web.expose
    @web.json
    def logout(self, trans, logout_all=False, **kwd):
        """End the current session after a CSRF check and return a JSON-serialisable dict."""
        if message := trans.check_csrf_token(kwd):
            return self.message_exception(trans, message)
        # Since logging an event requires a session, we'll log prior to ending the session
        trans.log_event("User logged out")
        trans.handle_user_logout(logout_all=logout_all)
        success_response = {"message": "Success."}
        # This is a little weird as a response.
        if trans.app.config.use_remote_user and trans.app.config.remote_user_logout_href:
            success_response["redirect_uri"] = trans.app.config.remote_user_logout_href
        return success_response

    @expose_api_anonymous_and_sessionless
    def create(self, trans, payload=None, **kwd):
        """Register a new account from ``payload`` (or ``kwd``) and log it in unless an admin is acting."""
        if not payload:
            payload = kwd
        message = trans.check_csrf_token(payload)
        if message:
            return self.message_exception(trans, message)
        user, message = self.user_manager.register(trans, **_filtered_registration_params_dict(payload))
        if message:
            return self.message_exception(trans, message, sanitize=False)
        elif user and not trans.user_is_admin:
            trans.handle_user_login(user)
            trans.log_event("User created a new account")
            trans.log_event("User logged in")
        return {"message": "Success."}

    @web.expose
    def activate(self, trans, **kwd):
        """
        Check whether token fits the user and then activate the user's account.
        """
        params = util.Params(kwd, sanitize=False)
        email = params.get("email", None)
        if email is not None:
            email = unquote(email)
        activation_token = params.get("activation_token", None)
        index_url = web.url_for(controller="root", action="index")
        # The same error is shown for every failure mode of the activation link.
        invalid_link_message = f"You are using an invalid activation link. Try to log in and we will send you a new activation email. <br><a href='{index_url}'>Go to login page.</a>"

        if email is None or activation_token is None:
            # We don't have the email or activation_token, show error.
            return trans.show_error_message(invalid_link_message)

        # Find the user
        user = get_user_by_email(trans.sa_session, email)
        if not user:
            # Probably wrong email address
            return trans.show_error_message(invalid_link_message)
        # If the user is active already don't try to activate
        if user.active is True:
            return trans.show_ok_message(
                f"Your account is already active. Nothing has changed. <br><a href='{index_url}'>Go to login page.</a>"
            )
        # Only the first 64 characters of the supplied token are compared.
        if user.activation_token == activation_token[:64]:
            user.activation_token = None
            self.user_manager.activate(user)
            return trans.show_ok_message(
                f"Your account has been successfully activated! <br><a href='{index_url}'>Go to login page.</a>"
            )
        # Tokens don't match. Activation is denied.
        return trans.show_error_message(invalid_link_message)

    @expose_api_anonymous_and_sessionless
    def change_password(self, trans, payload=None, **kwd):
        """
        Allows to change own password.

        :type   payload: dict
        :param  payload: dictionary structure containing:
            * id:               encoded user id
            * current:          current user password
            * token:            temporary token to change password (instead of id and current)
            * password:         new password
            * confirm:          new password (confirmation)
        """
        payload = payload or {}
        user, message = self.user_manager.change_password(trans, **payload)
        if user is None:
            return self.message_exception(trans, message)
        trans.handle_user_login(user)
        return {"message": "Password has been changed."}

    @expose_api_anonymous_and_sessionless
    def reset_password(self, trans, payload=None, **kwd):
        """Reset the user's password. Send an email with token that allows a password change."""
        payload = payload or {}
        if message := self.user_manager.send_reset_email(trans, payload):
            return self.message_exception(trans, message)
        return {"message": "Reset link has been sent to your email."}

    def __get_redirect_url(self, redirect):
        """
        Return a safe post-login redirect target.

        :returns: ``None`` when ``redirect`` is empty or the string ``"None"``; the
            qualified Galaxy root URL when ``redirect`` does not match the root URL
            (compared with ``compare_path=False``) or matches the logout URL;
            otherwise ``redirect`` unchanged
        """
        if not redirect or redirect == "None":
            return None
        root_url = url_for("/", qualified=True)
        # compare urls, to prevent a redirect from pointing (directly) outside of galaxy
        if not util.compare_urls(root_url, redirect, compare_path=False):
            log.warning("Redirect URL is outside of Galaxy, will redirect to Galaxy root instead: %s", redirect)
            return root_url
        # ... or to enter a logout/login loop
        logout_url = url_for(controller="user", action="logout", qualified=True)
        if util.compare_urls(logout_url, redirect):
            return root_url
        return redirect