Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.util.admin_util
import logging
import time
from typing import Optional
from sqlalchemy import false, func
from galaxy import util, web
from galaxy.security.validate_user_input import validate_password
from galaxy.util import inflector
from galaxy.util.hash_util import new_secure_hash
from galaxy.web.form_builder import CheckboxField
from galaxy.web.legacy_framework.grids import Grid, GridOperation
from tool_shed.util.web_util import escape
log = logging.getLogger(__name__)
compliance_log = logging.getLogger('COMPLIANCE')
[docs]class Admin:
    # Override these
    user_list_grid: Optional[Grid] = None
    role_list_grid: Optional[Grid] = None
    group_list_grid: Optional[Grid] = None
    delete_operation: Optional[GridOperation] = None
    undelete_operation: Optional[GridOperation] = None
    purge_operation: Optional[GridOperation] = None
[docs]    @web.expose
    @web.require_admin
    def index(self, trans, **kwd):
        message = escape(kwd.get('message', ''))
        status = kwd.get('status', 'done')
        return trans.fill_template('/webapps/tool_shed/admin/index.mako',
                                   message=message,
                                   status=status)
[docs]    @web.expose
    @web.require_admin
    def center(self, trans, **kwd):
        message = escape(kwd.get('message', ''))
        status = kwd.get('status', 'done')
        return trans.fill_template('/webapps/tool_shed/admin/center.mako',
                                   message=message,
                                   status=status)
[docs]    @web.expose
    @web.require_admin
    def roles(self, trans, **kwargs):
        if 'operation' in kwargs:
            operation = kwargs['operation'].lower().replace('+', ' ')
            if operation == "roles":
                return self.role(trans, **kwargs)
            if operation == "create":
                return self.create_role(trans, **kwargs)
            if operation == "delete":
                return self.mark_role_deleted(trans, **kwargs)
            if operation == "undelete":
                return self.undelete_role(trans, **kwargs)
            if operation == "purge":
                return self.purge_role(trans, **kwargs)
            if operation == "manage users and groups":
                return self.manage_users_and_groups_for_role(trans, **kwargs)
            if operation == "manage role associations":
                # This is currently used only in the Tool Shed.
                return self.manage_role_associations(trans, **kwargs)
            if operation == "rename":
                return self.rename_role(trans, **kwargs)
        # Render the list view
        return self.role_list_grid(trans, **kwargs)
[docs]    @web.expose
    @web.require_admin
    def create_role(self, trans, **kwd):
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        name = util.restore_text(params.get('name', ''))
        description = util.restore_text(params.get('description', ''))
        in_users = util.listify(params.get('in_users', []))
        out_users = util.listify(params.get('out_users', []))
        in_groups = util.listify(params.get('in_groups', []))
        out_groups = util.listify(params.get('out_groups', []))
        create_group_for_role = params.get('create_group_for_role', '')
        create_group_for_role_checked = CheckboxField.is_checked(create_group_for_role)
        ok = True
        if params.get('create_role_button', False):
            if not name or not description:
                message = "Enter a valid name and a description."
                status = 'error'
                ok = False
            elif trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.name == name).first():
                message = "Role names must be unique and a role with that name already exists, so choose another name."
                status = 'error'
                ok = False
            else:
                # Create the role
                role = trans.app.model.Role(name=name, description=description, type=trans.app.model.Role.types.ADMIN)
                trans.sa_session.add(role)
                # Create the UserRoleAssociations
                for user in [trans.sa_session.query(trans.app.model.User).get(x) for x in in_users]:
                    ura = trans.app.model.UserRoleAssociation(user, role)
                    trans.sa_session.add(ura)
                # Create the GroupRoleAssociations
                for group in [trans.sa_session.query(trans.app.model.Group).get(x) for x in in_groups]:
                    gra = trans.app.model.GroupRoleAssociation(group, role)
                    trans.sa_session.add(gra)
                if create_group_for_role_checked:
                    # Create the group
                    group = trans.app.model.Group(name=name)
                    trans.sa_session.add(group)
                    # Associate the group with the role
                    gra = trans.model.GroupRoleAssociation(group, role)
                    trans.sa_session.add(gra)
                    num_in_groups = len(in_groups) + 1
                else:
                    num_in_groups = len(in_groups)
                trans.sa_session.flush()
                message = "Role '%s' has been created with %d associated users and %d associated groups.  " \
                    % (role.name, len(in_users), num_in_groups)
                if create_group_for_role_checked:
                    message += 'One of the groups associated with this role is the newly created group with the same name.'
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='roles',
                                                         message=util.sanitize_text(message),
                                                         status='done'))
        if ok:
            for user in trans.sa_session.query(trans.app.model.User) \
                                        .filter(trans.app.model.User.table.c.deleted == false()) \
                                        .order_by(trans.app.model.User.table.c.email):
                out_users.append((user.id, user.email))
            for group in trans.sa_session.query(trans.app.model.Group) \
                                         .filter(trans.app.model.Group.table.c.deleted == false()) \
                                         .order_by(trans.app.model.Group.table.c.name):
                out_groups.append((group.id, group.name))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role_create.mako',
                                   name=name,
                                   description=description,
                                   in_users=in_users,
                                   out_users=out_users,
                                   in_groups=in_groups,
                                   out_groups=out_groups,
                                   create_group_for_role_checked=create_group_for_role_checked,
                                   message=message,
                                   status=status)
[docs]    @web.expose
    @web.require_admin
    def rename_role(self, trans, **kwd):
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        id = params.get('id', None)
        if not id:
            message = "No role ids received for renaming"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=message,
                                                     status='error'))
        role = get_role(trans, id)
        if params.get('rename_role_button', False):
            old_name = role.name
            new_name = util.restore_text(params.name)
            new_description = util.restore_text(params.description)
            if not new_name:
                message = 'Enter a valid name'
                status = 'error'
            else:
                existing_role = trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.name == new_name).first()
                if existing_role and existing_role.id != role.id:
                    message = 'A role with that name already exists'
                    status = 'error'
                else:
                    if not (role.name == new_name and role.description == new_description):
                        role.name = new_name
                        role.description = new_description
                        trans.sa_session.add(role)
                        trans.sa_session.flush()
                        message = f"Role '{old_name}' has been renamed to '{new_name}'"
                    return trans.response.send_redirect(web.url_for(controller='admin',
                                                                    action='roles',
                                                                    message=util.sanitize_text(message),
                                                                    status='done'))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role_rename.mako',
                                   role=role,
                                   message=message,
                                   status=status)
[docs]    @web.expose
    @web.require_admin
    def manage_users_and_groups_for_role(self, trans, **kwd):
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        id = params.get('id', None)
        if not id:
            message = "No role ids received for managing users and groups"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=message,
                                                     status='error'))
        role = get_role(trans, id)
        if params.get('role_members_edit_button', False):
            in_users = [trans.sa_session.query(trans.app.model.User).get(x) for x in util.listify(params.in_users)]
            if trans.webapp.name == 'galaxy':
                for ura in role.users:
                    user = trans.sa_session.query(trans.app.model.User).get(ura.user_id)
                    if user not in in_users:
                        # Delete DefaultUserPermissions for previously associated users that have been removed from the role
                        for dup in user.default_permissions:
                            if role == dup.role:
                                trans.sa_session.delete(dup)
                        # Delete DefaultHistoryPermissions for previously associated users that have been removed from the role
                        for history in user.histories:
                            for dhp in history.default_permissions:
                                if role == dhp.role:
                                    trans.sa_session.delete(dhp)
                        trans.sa_session.flush()
            in_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(params.in_groups)]
            trans.app.security_agent.set_entity_role_associations(roles=[role], users=in_users, groups=in_groups)
            trans.sa_session.refresh(role)
            message = "Role '%s' has been updated with %d associated users and %d associated groups" % (role.name, len(in_users), len(in_groups))
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=util.sanitize_text(message),
                                                     status=status))
        in_users = []
        out_users = []
        in_groups = []
        out_groups = []
        for user in trans.sa_session.query(trans.app.model.User) \
                                    .filter(trans.app.model.User.table.c.deleted == false()) \
                                    .order_by(trans.app.model.User.table.c.email):
            if user in [x.user for x in role.users]:
                in_users.append((user.id, user.email))
            else:
                out_users.append((user.id, user.email))
        for group in trans.sa_session.query(trans.app.model.Group) \
                                     .filter(trans.app.model.Group.table.c.deleted == false()) \
                                     .order_by(trans.app.model.Group.table.c.name):
            if group in [x.group for x in role.groups]:
                in_groups.append((group.id, group.name))
            else:
                out_groups.append((group.id, group.name))
        library_dataset_actions = {}
        if trans.webapp.name == 'galaxy' and len(role.dataset_actions) < 25:
            # Build a list of tuples that are LibraryDatasetDatasetAssociationss followed by a list of actions
            # whose DatasetPermissions is associated with the Role
            # [ ( LibraryDatasetDatasetAssociation [ action, action ] ) ]
            for dp in role.dataset_actions:
                for ldda in trans.sa_session.query(trans.app.model.LibraryDatasetDatasetAssociation) \
                                            .filter(trans.app.model.LibraryDatasetDatasetAssociation.dataset_id == dp.dataset_id):
                    root_found = False
                    folder_path = ''
                    folder = ldda.library_dataset.folder
                    while not root_found:
                        folder_path = f'{folder.name} / {folder_path}'
                        if not folder.parent:
                            root_found = True
                        else:
                            folder = folder.parent
                    folder_path = f'{folder_path} {ldda.name}'
                    library = trans.sa_session.query(trans.app.model.Library) \
                                              .filter(trans.app.model.Library.table.c.root_folder_id == folder.id) \
                                              .first()
                    if library not in library_dataset_actions:
                        library_dataset_actions[library] = {}
                    try:
                        library_dataset_actions[library][folder_path].append(dp.action)
                    except Exception:
                        library_dataset_actions[library][folder_path] = [dp.action]
        else:
            message = "Not showing associated datasets, there are too many."
            status = 'info'
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role.mako',
                                   role=role,
                                   in_users=in_users,
                                   out_users=out_users,
                                   in_groups=in_groups,
                                   out_groups=out_groups,
                                   library_dataset_actions=library_dataset_actions,
                                   message=message,
                                   status=status)
[docs]    @web.expose
    @web.require_admin
    def mark_role_deleted(self, trans, **kwd):
        id = kwd.get('id', None)
        if not id:
            message = "No role ids received for deleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(id)
        message = "Deleted %d roles: " % len(ids)
        for role_id in ids:
            role = get_role(trans, role_id)
            role.deleted = True
            trans.sa_session.add(role)
            trans.sa_session.flush()
            message += f" {role.name} "
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='roles',
                                                 message=util.sanitize_text(message),
                                                 status='done'))
[docs]    @web.expose
    @web.require_admin
    def undelete_role(self, trans, **kwd):
        id = kwd.get('id', None)
        if not id:
            message = "No role ids received for undeleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(id)
        count = 0
        undeleted_roles = ""
        for role_id in ids:
            role = get_role(trans, role_id)
            if not role.deleted:
                message = f"Role '{role.name}' has not been deleted, so it cannot be undeleted."
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='roles',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            role.deleted = False
            trans.sa_session.add(role)
            trans.sa_session.flush()
            count += 1
            undeleted_roles += f" {role.name}"
        message = "Undeleted %d roles: %s" % (count, undeleted_roles)
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='roles',
                                                 message=util.sanitize_text(message),
                                                 status='done'))
[docs]    @web.expose
    @web.require_admin
    def purge_role(self, trans, **kwd):
        # This method should only be called for a Role that has previously been deleted.
        # Purging a deleted Role deletes all of the following from the database:
        # - UserRoleAssociations where role_id == Role.id
        # - DefaultUserPermissions where role_id == Role.id
        # - DefaultHistoryPermissions where role_id == Role.id
        # - GroupRoleAssociations where role_id == Role.id
        # - DatasetPermissionss where role_id == Role.id
        id = kwd.get('id', None)
        if not id:
            message = "No role ids received for purging"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=util.sanitize_text(message),
                                                     status='error'))
        ids = util.listify(id)
        message = "Purged %d roles: " % len(ids)
        for role_id in ids:
            role = get_role(trans, role_id)
            if not role.deleted:
                message = f"Role '{role.name}' has not been deleted, so it cannot be purged."
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='roles',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            # Delete UserRoleAssociations
            for ura in role.users:
                user = trans.sa_session.query(trans.app.model.User).get(ura.user_id)
                # Delete DefaultUserPermissions for associated users
                for dup in user.default_permissions:
                    if role == dup.role:
                        trans.sa_session.delete(dup)
                # Delete DefaultHistoryPermissions for associated users
                for history in user.histories:
                    for dhp in history.default_permissions:
                        if role == dhp.role:
                            trans.sa_session.delete(dhp)
                trans.sa_session.delete(ura)
            # Delete GroupRoleAssociations
            for gra in role.groups:
                trans.sa_session.delete(gra)
            # Delete DatasetPermissionss
            for dp in role.dataset_actions:
                trans.sa_session.delete(dp)
            trans.sa_session.flush()
            message += f" {role.name} "
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='roles',
                                                 message=util.sanitize_text(message),
                                                 status='done'))
[docs]    @web.expose
    @web.require_admin
    def groups(self, trans, **kwargs):
        if 'operation' in kwargs:
            operation = kwargs['operation'].lower().replace('+', ' ')
            if operation == "groups":
                return self.group(trans, **kwargs)
            if operation == "create":
                return self.create_group(trans, **kwargs)
            if operation == "delete":
                return self.mark_group_deleted(trans, **kwargs)
            if operation == "undelete":
                return self.undelete_group(trans, **kwargs)
            if operation == "purge":
                return self.purge_group(trans, **kwargs)
            if operation == "manage users and roles":
                return self.manage_users_and_roles_for_group(trans, **kwargs)
            if operation == "rename":
                return self.rename_group(trans, **kwargs)
        # Render the list view
        return self.group_list_grid(trans, **kwargs)
[docs]    @web.expose
    @web.require_admin
    def rename_group(self, trans, **kwd):
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        id = params.get('id', None)
        if not id:
            message = "No group ids received for renaming"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=message,
                                                     status='error'))
        group = get_group(trans, id)
        if params.get('rename_group_button', False):
            old_name = group.name
            new_name = util.restore_text(params.name)
            if not new_name:
                message = 'Enter a valid name'
                status = 'error'
            else:
                existing_group = trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.name == new_name).first()
                if existing_group and existing_group.id != group.id:
                    message = 'A group with that name already exists'
                    status = 'error'
                else:
                    if group.name != new_name:
                        group.name = new_name
                        trans.sa_session.add(group)
                        trans.sa_session.flush()
                        message = f"Group '{old_name}' has been renamed to '{new_name}'"
                    return trans.response.send_redirect(web.url_for(controller='admin',
                                                                    action='groups',
                                                                    message=util.sanitize_text(message),
                                                                    status='done'))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group_rename.mako',
                                   group=group,
                                   message=message,
                                   status=status)
[docs]    @web.expose
    @web.require_admin
    def manage_users_and_roles_for_group(self, trans, **kwd):
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        group = get_group(trans, params.id)
        if params.get('group_roles_users_edit_button', False):
            in_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(params.in_roles)]
            in_users = [trans.sa_session.query(trans.app.model.User).get(x) for x in util.listify(params.in_users)]
            trans.app.security_agent.set_entity_group_associations(groups=[group], roles=in_roles, users=in_users)
            trans.sa_session.refresh(group)
            message += "Group '%s' has been updated with %d associated roles and %d associated users" % (group.name, len(in_roles), len(in_users))
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=util.sanitize_text(message),
                                                     status=status))
        in_roles = []
        out_roles = []
        in_users = []
        out_users = []
        for role in trans.sa_session.query(trans.app.model.Role) \
                                    .filter(trans.app.model.Role.table.c.deleted == false()) \
                                    .order_by(trans.app.model.Role.table.c.name):
            if role in [x.role for x in group.roles]:
                in_roles.append((role.id, role.name))
            else:
                out_roles.append((role.id, role.name))
        for user in trans.sa_session.query(trans.app.model.User) \
                                    .filter(trans.app.model.User.table.c.deleted == false()) \
                                    .order_by(trans.app.model.User.table.c.email):
            if user in [x.user for x in group.users]:
                in_users.append((user.id, user.email))
            else:
                out_users.append((user.id, user.email))
        message += 'Group %s is currently associated with %d roles and %d users' % (group.name, len(in_roles), len(in_users))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group.mako',
                                   group=group,
                                   in_roles=in_roles,
                                   out_roles=out_roles,
                                   in_users=in_users,
                                   out_users=out_users,
                                   message=message,
                                   status=status)
[docs]    @web.expose
    @web.require_admin
    def create_group(self, trans, **kwd):
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        name = util.restore_text(params.get('name', ''))
        in_users = util.listify(params.get('in_users', []))
        out_users = util.listify(params.get('out_users', []))
        in_roles = util.listify(params.get('in_roles', []))
        out_roles = util.listify(params.get('out_roles', []))
        create_role_for_group = params.get('create_role_for_group', '')
        create_role_for_group_checked = CheckboxField.is_checked(create_role_for_group)
        ok = True
        if params.get('create_group_button', False):
            if not name:
                message = "Enter a valid name."
                status = 'error'
                ok = False
            elif trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.name == name).first():
                message = "Group names must be unique and a group with that name already exists, so choose another name."
                status = 'error'
                ok = False
            else:
                # Create the group
                group = trans.app.model.Group(name=name)
                trans.sa_session.add(group)
                trans.sa_session.flush()
                # Create the UserRoleAssociations
                for user in [trans.sa_session.query(trans.app.model.User).get(x) for x in in_users]:
                    uga = trans.app.model.UserGroupAssociation(user, group)
                    trans.sa_session.add(uga)
                # Create the GroupRoleAssociations
                for role in [trans.sa_session.query(trans.app.model.Role).get(x) for x in in_roles]:
                    gra = trans.app.model.GroupRoleAssociation(group, role)
                    trans.sa_session.add(gra)
                if create_role_for_group_checked:
                    # Create the role
                    role = trans.app.model.Role(name=name, description=f'Role for group {name}')
                    trans.sa_session.add(role)
                    # Associate the role with the group
                    gra = trans.model.GroupRoleAssociation(group, role)
                    trans.sa_session.add(gra)
                    num_in_roles = len(in_roles) + 1
                else:
                    num_in_roles = len(in_roles)
                trans.sa_session.flush()
                message = "Group '%s' has been created with %d associated users and %d associated roles.  " \
                    % (group.name, len(in_users), num_in_roles)
                if create_role_for_group_checked:
                    message += 'One of the roles associated with this group is the newly created role with the same name.'
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='groups',
                                                         message=util.sanitize_text(message),
                                                         status='done'))
        if ok:
            for user in trans.sa_session.query(trans.app.model.User) \
                                        .filter(trans.app.model.User.table.c.deleted == false()) \
                                        .order_by(trans.app.model.User.table.c.email):
                out_users.append((user.id, user.email))
            for role in trans.sa_session.query(trans.app.model.Role) \
                                        .filter(trans.app.model.Role.table.c.deleted == false()) \
                                        .order_by(trans.app.model.Role.table.c.name):
                out_roles.append((role.id, role.name))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group_create.mako',
                                   name=name,
                                   in_users=in_users,
                                   out_users=out_users,
                                   in_roles=in_roles,
                                   out_roles=out_roles,
                                   create_role_for_group_checked=create_role_for_group_checked,
                                   message=message,
                                   status=status)
[docs]    @web.expose
    @web.require_admin
    def mark_group_deleted(self, trans, **kwd):
        params = util.Params(kwd)
        id = params.get('id', None)
        if not id:
            message = "No group ids received for marking deleted"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(id)
        message = "Deleted %d groups: " % len(ids)
        for group_id in ids:
            group = get_group(trans, group_id)
            group.deleted = True
            trans.sa_session.add(group)
            trans.sa_session.flush()
            message += f" {group.name} "
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='groups',
                                                 message=util.sanitize_text(message),
                                                 status='done'))
[docs]    @web.expose
    @web.require_admin
    def undelete_group(self, trans, **kwd):
        id = kwd.get('id', None)
        if not id:
            message = "No group ids received for undeleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(id)
        count = 0
        undeleted_groups = ""
        for group_id in ids:
            group = get_group(trans, group_id)
            if not group.deleted:
                message = f"Group '{group.name}' has not been deleted, so it cannot be undeleted."
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='groups',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            group.deleted = False
            trans.sa_session.add(group)
            trans.sa_session.flush()
            count += 1
            undeleted_groups += f" {group.name}"
        message = "Undeleted %d groups: %s" % (count, undeleted_groups)
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='groups',
                                                 message=util.sanitize_text(message),
                                                 status='done'))
[docs]    @web.expose
    @web.require_admin
    def purge_group(self, trans, **kwd):
        # This method should only be called for a Group that has previously been deleted.
        # Purging a deleted Group simply deletes all UserGroupAssociations and GroupRoleAssociations.
        id = kwd.get('id', None)
        if not id:
            message = "No group ids received for purging"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=util.sanitize_text(message),
                                                     status='error'))
        ids = util.listify(id)
        message = "Purged %d groups: " % len(ids)
        for group_id in ids:
            group = get_group(trans, group_id)
            if not group.deleted:
                # We should never reach here, but just in case there is a bug somewhere...
                message = f"Group '{group.name}' has not been deleted, so it cannot be purged."
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='groups',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            # Delete UserGroupAssociations
            for uga in group.users:
                trans.sa_session.delete(uga)
            # Delete GroupRoleAssociations
            for gra in group.roles:
                trans.sa_session.delete(gra)
            trans.sa_session.flush()
            message += f" {group.name} "
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='groups',
                                                 message=util.sanitize_text(message),
                                                 status='done'))
[docs]    @web.expose
    @web.require_admin
    def create_new_user(self, trans, **kwd):
        return trans.response.send_redirect(web.url_for(controller='user',
                                                        action='create',
                                                        cntrller='admin'))
[docs]    @web.expose
    @web.require_admin
    def reset_user_password(self, trans, **kwd):
        user_id = kwd.get('id', None)
        if not user_id:
            message = "No users received for resetting passwords."
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=message,
                                                     status='error'))
        user_ids = util.listify(user_id)
        if 'reset_user_password_button' in kwd:
            message = ''
            status = ''
            for user_id in user_ids:
                user = get_user(trans, user_id)
                password = kwd.get('password', None)
                confirm = kwd.get('confirm', None)
                message = validate_password(trans, password, confirm)
                if message:
                    status = 'error'
                    break
                else:
                    user.set_password_cleartext(password)
                    trans.sa_session.add(user)
                    trans.sa_session.flush()
            if not message and not status:
                message = "Passwords reset for %d %s." % (len(user_ids), inflector.cond_plural(len(user_ids), 'user'))
                status = 'done'
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=util.sanitize_text(message),
                                                     status=status))
        users = [get_user(trans, user_id) for user_id in user_ids]
        if len(user_ids) > 1:
            user_id = ','.join(user_ids)
        return trans.fill_template('/webapps/tool_shed/admin/user/reset_password.mako',
                                   id=user_id,
                                   users=users,
                                   password='',
                                   confirm='')
[docs]    @web.expose
    @web.require_admin
    def mark_user_deleted(self, trans, **kwd):
        id = kwd.get('id', None)
        if not id:
            message = "No user ids received for deleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(id)
        message = "Deleted %d users: " % len(ids)
        for user_id in ids:
            user = get_user(trans, user_id)
            user.deleted = True
            compliance_log.info(f'delete-user-event: {user_id}')
            # See lib/galaxy/webapps/tool_shed/controllers/admin.py
            pseudorandom_value = str(int(time.time()))
            email_hash = new_secure_hash(user.email + pseudorandom_value)
            uname_hash = new_secure_hash(user.username + pseudorandom_value)
            for role in user.all_roles():
                print(role, self.app.config.redact_username_during_deletion, self.app.config.redact_email_during_deletion)
                if self.app.config.redact_username_during_deletion:
                    role.name = role.name.replace(user.username, uname_hash)
                    role.description = role.description.replace(user.username, uname_hash)
                if self.app.config.redact_email_during_deletion:
                    role.name = role.name.replace(user.email, email_hash)
                    role.description = role.description.replace(user.email, email_hash)
            if self.app.config.redact_email_during_deletion:
                user.email = email_hash
            if self.app.config.redact_username_during_deletion:
                user.username = uname_hash
            trans.sa_session.add(user)
            trans.sa_session.flush()
            message += f" {user.email} "
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='users',
                                                 message=util.sanitize_text(message),
                                                 status='done'))
[docs]    @web.expose
    @web.require_admin
    def undelete_user(self, trans, **kwd):
        id = kwd.get('id', None)
        if not id:
            message = "No user ids received for undeleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(id)
        count = 0
        undeleted_users = ""
        for user_id in ids:
            user = get_user(trans, user_id)
            if not user.deleted:
                message = f"User '{user.email}' has not been deleted, so it cannot be undeleted."
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='users',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            user.deleted = False
            trans.sa_session.add(user)
            trans.sa_session.flush()
            count += 1
            undeleted_users += f" {user.email}"
        message = "Undeleted %d users: %s" % (count, undeleted_users)
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='users',
                                                 message=util.sanitize_text(message),
                                                 status='done'))
[docs]    @web.expose
    @web.require_admin
    def purge_user(self, trans, **kwd):
        # This method should only be called for a User that has previously been deleted.
        # We keep the User in the database ( marked as purged ), and stuff associated
        # with the user's private role in case we want the ability to unpurge the user
        # some time in the future.
        # Purging a deleted User deletes all of the following:
        # - History where user_id = User.id
        #    - HistoryDatasetAssociation where history_id = History.id
        #    - Dataset where HistoryDatasetAssociation.dataset_id = Dataset.id
        # - UserGroupAssociation where user_id == User.id
        # - UserRoleAssociation where user_id == User.id EXCEPT FOR THE PRIVATE ROLE
        # - UserAddress where user_id == User.id
        # Purging Histories and Datasets must be handled via the cleanup_datasets.py script
        id = kwd.get('id', None)
        if not id:
            message = "No user ids received for purging"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=util.sanitize_text(message),
                                                     status='error'))
        ids = util.listify(id)
        message = "Purged %d users: " % len(ids)
        for user_id in ids:
            user = get_user(trans, user_id)
            if not user.deleted:
                # We should never reach here, but just in case there is a bug somewhere...
                message = f"User '{user.email}' has not been deleted, so it cannot be purged."
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='users',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            private_role = trans.app.security_agent.get_private_user_role(user)
            # Delete History
            for h in user.active_histories:
                trans.sa_session.refresh(h)
                for hda in h.active_datasets:
                    # Delete HistoryDatasetAssociation
                    d = trans.sa_session.query(trans.app.model.Dataset).get(hda.dataset_id)
                    # Delete Dataset
                    if not d.deleted:
                        d.deleted = True
                        trans.sa_session.add(d)
                    hda.deleted = True
                    trans.sa_session.add(hda)
                h.deleted = True
                trans.sa_session.add(h)
            # Delete UserGroupAssociations
            for uga in user.groups:
                trans.sa_session.delete(uga)
            # Delete UserRoleAssociations EXCEPT FOR THE PRIVATE ROLE
            for ura in user.roles:
                if ura.role_id != private_role.id:
                    trans.sa_session.delete(ura)
            # Delete UserAddresses
            for address in user.addresses:
                trans.sa_session.delete(address)
            # Purge the user
            user.purged = True
            trans.sa_session.add(user)
            trans.sa_session.flush()
            message += f"{user.email} "
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='users',
                                                 message=util.sanitize_text(message),
                                                 status='done'))
[docs]    @web.expose
    @web.require_admin
    def users(self, trans, **kwd):
        if 'operation' in kwd:
            operation = kwd['operation'].lower()
            if operation == "roles":
                return self.user(trans, **kwd)
            elif operation == "reset password":
                return self.reset_user_password(trans, **kwd)
            elif operation == "delete":
                return self.mark_user_deleted(trans, **kwd)
            elif operation == "undelete":
                return self.undelete_user(trans, **kwd)
            elif operation == "purge":
                return self.purge_user(trans, **kwd)
            elif operation == "create":
                return self.create_new_user(trans, **kwd)
            elif operation == "manage roles and groups":
                return self.manage_roles_and_groups_for_user(trans, **kwd)
        if trans.app.config.allow_user_deletion:
            if self.delete_operation not in self.user_list_grid.operations:
                self.user_list_grid.operations.append(self.delete_operation)
            if self.undelete_operation not in self.user_list_grid.operations:
                self.user_list_grid.operations.append(self.undelete_operation)
            if self.purge_operation not in self.user_list_grid.operations:
                self.user_list_grid.operations.append(self.purge_operation)
        # Render the list view
        return self.user_list_grid(trans, **kwd)
[docs]    @web.expose
    @web.require_admin
    def name_autocomplete_data(self, trans, q=None, limit=None, timestamp=None):
        """Return autocomplete data for user emails"""
        ac_data = ""
        for user in trans.sa_session.query(trans.app.model.User).filter_by(deleted=False).filter(func.lower(trans.app.model.User.email).like(f"{q.lower()}%")):
            ac_data = f"{ac_data + user.email}\n"
        return ac_data
[docs]    @web.expose
    @web.require_admin
    def manage_roles_and_groups_for_user(self, trans, **kwd):
        user_id = kwd.get('id', None)
        message = ''
        status = ''
        if not user_id:
            message += f"Invalid user id ({str(user_id)}) received"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=util.sanitize_text(message),
                                                     status='error'))
        user = get_user(trans, user_id)
        private_role = trans.app.security_agent.get_private_user_role(user)
        if kwd.get('user_roles_groups_edit_button', False):
            # Make sure the user is not dis-associating himself from his private role
            out_roles = kwd.get('out_roles', [])
            if out_roles:
                out_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(out_roles)]
            if private_role in out_roles:
                message += "You cannot eliminate a user's private role association.  "
                status = 'error'
            in_roles = kwd.get('in_roles', [])
            if in_roles:
                in_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(in_roles)]
            out_groups = kwd.get('out_groups', [])
            if out_groups:
                out_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(out_groups)]
            in_groups = kwd.get('in_groups', [])
            if in_groups:
                in_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(in_groups)]
            if in_roles:
                trans.app.security_agent.set_entity_user_associations(users=[user], roles=in_roles, groups=in_groups)
                trans.sa_session.refresh(user)
                message += "User '%s' has been updated with %d associated roles and %d associated groups (private roles are not displayed)" % \
                    (user.email, len(in_roles), len(in_groups))
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='users',
                                                         message=util.sanitize_text(message),
                                                         status='done'))
        in_roles = []
        out_roles = []
        in_groups = []
        out_groups = []
        for role in trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.deleted == false()) \
                .order_by(trans.app.model.Role.table.c.name):
            if role in [x.role for x in user.roles]:
                in_roles.append((role.id, role.name))
            elif role.type != trans.app.model.Role.types.PRIVATE:
                # There is a 1 to 1 mapping between a user and a PRIVATE role, so private roles should
                # not be listed in the roles form fields, except for the currently selected user's private
                # role, which should always be in in_roles.  The check above is added as an additional
                # precaution, since for a period of time we were including private roles in the form fields.
                out_roles.append((role.id, role.name))
        for group in trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.deleted == false()) \
                .order_by(trans.app.model.Group.table.c.name):
            if group in [x.group for x in user.groups]:
                in_groups.append((group.id, group.name))
            else:
                out_groups.append((group.id, group.name))
        message += "User '%s' is currently associated with %d roles and is a member of %d groups" % \
            (user.email, len(in_roles), len(in_groups))
        if not status:
            status = 'done'
        return trans.fill_template('/webapps/tool_shed/admin/user/user.mako',
                                   user=user,
                                   in_roles=in_roles,
                                   out_roles=out_roles,
                                   in_groups=in_groups,
                                   out_groups=out_groups,
                                   message=message,
                                   status=status)
# ---- Utility methods -------------------------------------------------------
[docs]def get_user(trans, user_id):
    """Get a User from the database by id."""
    user = trans.sa_session.query(trans.model.User).get(trans.security.decode_id(user_id))
    if not user:
        return trans.show_error_message(f"User not found for id ({str(user_id)})")
    return user
[docs]def get_role(trans, id):
    """Get a Role from the database by id."""
    # Load user from database
    id = trans.security.decode_id(id)
    role = trans.sa_session.query(trans.model.Role).get(id)
    if not role:
        return trans.show_error_message(f"Role not found for id ({str(id)})")
    return role
[docs]def get_group(trans, id):
    """Get a Group from the database by id."""
    # Load user from database
    id = trans.security.decode_id(id)
    group = trans.sa_session.query(trans.model.Group).get(id)
    if not group:
        return trans.show_error_message(f"Group not found for id ({str(id)})")
    return group