Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.tool_shed.controllers.user

from datetime import datetime

from markupsafe import escape
from sqlalchemy import (
    and_,
    true
)

from galaxy import (
    util,
    web
)
from galaxy.security.validate_user_input import validate_email, validate_password, validate_publicname
from galaxy.web import url_for
from galaxy.webapps.galaxy.controllers.user import User as BaseUser


[docs]class User(BaseUser):
[docs] @web.expose def index(self, trans, cntrller='user', **kwd): return trans.fill_template('/webapps/tool_shed/user/index.mako', cntrller=cntrller)
[docs] @web.expose def manage_user_info(self, trans, cntrller, **kwd): '''Manage a user's login, password, public username, type, addresses, etc.''' params = util.Params(kwd) user_id = params.get('id', None) if user_id: user = trans.sa_session.query(trans.app.model.User).get(trans.security.decode_id(user_id)) else: user = trans.user if not user: raise AssertionError("The user id (%s) is not valid" % str(user_id)) email = util.restore_text(params.get('email', user.email)) username = util.restore_text(params.get('username', '')) if not username: username = user.username message = escape(util.restore_text(params.get('message', ''))) status = params.get('status', 'done') return trans.fill_template('/webapps/tool_shed/user/manage_info.mako', cntrller=cntrller, user=user, email=email, username=username, message=message, status=status)
[docs] @web.expose @web.require_login() def api_keys(self, trans, cntrller, **kwd): params = util.Params(kwd) message = escape(util.restore_text(params.get('message', ''))) status = params.get('status', 'done') if params.get('new_api_key_button', False): self.create_api_key(trans, trans.user) message = "Generated a new web API key" status = "done" return trans.fill_template('/webapps/tool_shed/user/api_keys.mako', cntrller=cntrller, user=trans.user, message=message, status=status)
# For REMOTE_USER, we need the ability to just edit the username
[docs] @web.expose @web.require_login("to manage the public name") def edit_username(self, trans, cntrller, **kwd): params = util.Params(kwd) is_admin = cntrller == 'admin' and trans.user_is_admin message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') user_id = params.get('user_id', None) if user_id and is_admin: user = trans.sa_session.query(trans.app.model.User).get(trans.security.decode_id(user_id)) else: user = trans.user if user and params.get('change_username_button', False): username = kwd.get('username', '') if username: message = validate_publicname(trans, username, user) if message: status = 'error' else: user.username = username trans.sa_session.add(user) trans.sa_session.flush() message = 'The username has been updated with the changes.' return trans.fill_template('/webapps/tool_shed/user/username.mako', cntrller=cntrller, user=user, username=user.username, message=message, status=status)
[docs] @web.expose def edit_info(self, trans, cntrller, **kwd): """ Edit user information = username, email or password. """ params = util.Params(kwd) is_admin = cntrller == 'admin' and trans.user_is_admin message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') user_id = params.get('user_id', None) if user_id and is_admin: user = trans.sa_session.query(trans.app.model.User).get(trans.security.decode_id(user_id)) elif user_id and (not trans.user or trans.user.id != trans.security.decode_id(user_id)): message = 'Invalid user id' status = 'error' user = None else: user = trans.user if user and params.get('login_info_button', False): # Editing email and username email = util.restore_text(params.get('email', '')) username = util.restore_text(params.get('username', '')).lower() # Validate the new values for email and username message = validate_email(trans, email, user) if not message and username: message = validate_publicname(trans, username, user) if message: status = 'error' else: if (user.email != email): # The user's private role name must match the user's login ( email ) private_role = trans.app.security_agent.get_private_user_role(user) private_role.name = email private_role.description = 'Private role for ' + email # Change the email itself user.email = email trans.sa_session.add_all((user, private_role)) trans.sa_session.flush() if trans.webapp.name == 'galaxy' and trans.app.config.user_activation_on: user.active = False trans.sa_session.add(user) trans.sa_session.flush() is_activation_sent = self.send_verification_email(trans, user.email, user.username) if is_activation_sent: message = 'The login information has been updated with the changes.<br>Verification email has been sent to your new email address. Please verify it by clicking the activation link in the email.<br>Please check your spam/trash folder in case you cannot find the message.' else: message = 'Unable to send activation email, please contact your local Galaxy administrator.' if trans.app.config.error_email_to is not None: message += ' Contact: %s' % trans.app.config.error_email_to if (user.username != username): user.username = username trans.sa_session.add(user) trans.sa_session.flush() message = 'The login information has been updated with the changes.' elif user and params.get('edit_user_info_button', False): # Edit user information - webapp MUST BE 'galaxy' user_type_fd_id = params.get('user_type_fd_id', 'none') if user_type_fd_id not in ['none']: user_type_form_definition = trans.sa_session.query(trans.app.model.FormDefinition).get(trans.security.decode_id(user_type_fd_id)) elif user.values: user_type_form_definition = user.values.form_definition else: # User was created before any of the user_info forms were created user_type_form_definition = None if user_type_form_definition: values = self.get_form_values(trans, user, user_type_form_definition, **kwd) else: values = {} flush_needed = False if user.values: # Editing the user info of an existing user with existing user info user.values.content = values trans.sa_session.add(user.values) flush_needed = True elif values: form_values = trans.model.FormValues(user_type_form_definition, values) trans.sa_session.add(form_values) user.values = form_values flush_needed = True if flush_needed: trans.sa_session.add(user) trans.sa_session.flush() message = "The user information has been updated with the changes." if user and trans.webapp.name == 'galaxy' and is_admin: kwd['user_id'] = trans.security.encode_id(user.id) kwd['id'] = user_id if message: kwd['message'] = util.sanitize_text(message) if status: kwd['status'] = status return trans.response.send_redirect(web.url_for(controller='user', action='manage_user_info', cntrller=cntrller, **kwd))
[docs] @web.expose def change_password(self, trans, token=None, **kwd): """ Provides a form with which one can change their password. If token is provided, don't require current password. """ status = None message = kwd.get('message', '') user = None if kwd.get('change_password_button', False): password = kwd.get('password', '') confirm = kwd.get('confirm', '') current = kwd.get('current', '') token_result = None if token: # If a token was supplied, validate and set user token_result = trans.sa_session.query(trans.app.model.PasswordResetToken).get(token) if token_result and token_result.expiration_time > datetime.utcnow(): user = token_result.user else: return trans.show_error_message("Invalid or expired password reset token, please request a new one.") else: # The user is changing their own password, validate their current password (ok, message) = trans.app.auth_manager.check_change_password(trans.user, current) if ok: user = trans.user else: status = 'error' if user: # Validate the new password message = validate_password(trans, password, confirm) if message: status = 'error' else: # Save new password user.set_password_cleartext(password) # if we used a token, invalidate it and log the user in. if token_result: trans.handle_user_login(token_result.user) token_result.expiration_time = datetime.utcnow() trans.sa_session.add(token_result) # Invalidate all other sessions for other_galaxy_session in trans.sa_session.query(trans.app.model.GalaxySession) \ .filter(and_(trans.app.model.GalaxySession.table.c.user_id == user.id, trans.app.model.GalaxySession.table.c.is_valid == true(), trans.app.model.GalaxySession.table.c.id != trans.galaxy_session.id)): other_galaxy_session.is_valid = False trans.sa_session.add(other_galaxy_session) trans.sa_session.add(user) trans.sa_session.flush() trans.log_event("User change password") if kwd.get('display_top', False) == 'True': return trans.response.send_redirect(url_for('/', message='Password has been changed')) else: return trans.show_ok_message('The password has been changed and any other existing Galaxy sessions have been logged out (but jobs in histories in those sessions will not be interrupted).') return trans.fill_template('/webapps/tool_shed/user/change_password.mako', token=token, status=status, message=message, display_top=kwd.get('redirect_home', False))