Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.util.admin_util

import logging
import time
from typing import Optional

from sqlalchemy import false, func

from galaxy import util, web
from galaxy.security.validate_user_input import validate_password
from galaxy.util import inflector
from galaxy.util.hash_util import new_secure_hash
from galaxy.web.form_builder import CheckboxField
from galaxy.web.legacy_framework.grids import Grid, GridOperation
from tool_shed.util.web_util import escape

# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Separate logger registered under the name 'COMPLIANCE'; this module writes
# a 'delete-user-event' record to it whenever a user is marked deleted.
compliance_log = logging.getLogger('COMPLIANCE')


class Admin:
    """Admin controller actions for managing users, roles and groups.

    The grid and grid-operation attributes below default to ``None`` and are
    expected to be overridden: the ``roles``, ``groups`` and ``users`` actions
    call the corresponding ``*_list_grid`` attribute to render their list
    views, and ``users`` appends the three operations to the user grid.
    """

    # Override these
    user_list_grid: Optional[Grid] = None
    role_list_grid: Optional[Grid] = None
    group_list_grid: Optional[Grid] = None
    delete_operation: Optional[GridOperation] = None
    undelete_operation: Optional[GridOperation] = None
    purge_operation: Optional[GridOperation] = None

[docs] @web.expose @web.require_admin def index(self, trans, **kwd): message = escape(kwd.get('message', '')) status = kwd.get('status', 'done') return trans.fill_template('/webapps/tool_shed/admin/index.mako', message=message, status=status)
[docs] @web.expose @web.require_admin def center(self, trans, **kwd): message = escape(kwd.get('message', '')) status = kwd.get('status', 'done') return trans.fill_template('/webapps/tool_shed/admin/center.mako', message=message, status=status)
[docs] @web.expose @web.require_admin def roles(self, trans, **kwargs): if 'operation' in kwargs: operation = kwargs['operation'].lower().replace('+', ' ') if operation == "roles": return self.role(trans, **kwargs) if operation == "create": return self.create_role(trans, **kwargs) if operation == "delete": return self.mark_role_deleted(trans, **kwargs) if operation == "undelete": return self.undelete_role(trans, **kwargs) if operation == "purge": return self.purge_role(trans, **kwargs) if operation == "manage users and groups": return self.manage_users_and_groups_for_role(trans, **kwargs) if operation == "manage role associations": # This is currently used only in the Tool Shed. return self.manage_role_associations(trans, **kwargs) if operation == "rename": return self.rename_role(trans, **kwargs) # Render the list view return self.role_list_grid(trans, **kwargs)
[docs] @web.expose @web.require_admin def create_role(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') name = util.restore_text(params.get('name', '')) description = util.restore_text(params.get('description', '')) in_users = util.listify(params.get('in_users', [])) out_users = util.listify(params.get('out_users', [])) in_groups = util.listify(params.get('in_groups', [])) out_groups = util.listify(params.get('out_groups', [])) create_group_for_role = params.get('create_group_for_role', '') create_group_for_role_checked = CheckboxField.is_checked(create_group_for_role) ok = True if params.get('create_role_button', False): if not name or not description: message = "Enter a valid name and a description." status = 'error' ok = False elif trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.name == name).first(): message = "Role names must be unique and a role with that name already exists, so choose another name." 
status = 'error' ok = False else: # Create the role role = trans.app.model.Role(name=name, description=description, type=trans.app.model.Role.types.ADMIN) trans.sa_session.add(role) # Create the UserRoleAssociations for user in [trans.sa_session.query(trans.app.model.User).get(x) for x in in_users]: ura = trans.app.model.UserRoleAssociation(user, role) trans.sa_session.add(ura) # Create the GroupRoleAssociations for group in [trans.sa_session.query(trans.app.model.Group).get(x) for x in in_groups]: gra = trans.app.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) if create_group_for_role_checked: # Create the group group = trans.app.model.Group(name=name) trans.sa_session.add(group) # Associate the group with the role gra = trans.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) num_in_groups = len(in_groups) + 1 else: num_in_groups = len(in_groups) trans.sa_session.flush() message = "Role '%s' has been created with %d associated users and %d associated groups. " \ % (role.name, len(in_users), num_in_groups) if create_group_for_role_checked: message += 'One of the groups associated with this role is the newly created group with the same name.' 
trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done')) if ok: for user in trans.sa_session.query(trans.app.model.User) \ .filter(trans.app.model.User.table.c.deleted == false()) \ .order_by(trans.app.model.User.table.c.email): out_users.append((user.id, user.email)) for group in trans.sa_session.query(trans.app.model.Group) \ .filter(trans.app.model.Group.table.c.deleted == false()) \ .order_by(trans.app.model.Group.table.c.name): out_groups.append((group.id, group.name)) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role_create.mako', name=name, description=description, in_users=in_users, out_users=out_users, in_groups=in_groups, out_groups=out_groups, create_group_for_role_checked=create_group_for_role_checked, message=message, status=status)
[docs] @web.expose @web.require_admin def rename_role(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') id = params.get('id', None) if not id: message = "No role ids received for renaming" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=message, status='error')) role = get_role(trans, id) if params.get('rename_role_button', False): old_name = role.name new_name = util.restore_text(params.name) new_description = util.restore_text(params.description) if not new_name: message = 'Enter a valid name' status = 'error' else: existing_role = trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.name == new_name).first() if existing_role and existing_role.id != role.id: message = 'A role with that name already exists' status = 'error' else: if not (role.name == new_name and role.description == new_description): role.name = new_name role.description = new_description trans.sa_session.add(role) trans.sa_session.flush() message = f"Role '{old_name}' has been renamed to '{new_name}'" return trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done')) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role_rename.mako', role=role, message=message, status=status)
[docs] @web.expose @web.require_admin def manage_users_and_groups_for_role(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') id = params.get('id', None) if not id: message = "No role ids received for managing users and groups" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=message, status='error')) role = get_role(trans, id) if params.get('role_members_edit_button', False): in_users = [trans.sa_session.query(trans.app.model.User).get(x) for x in util.listify(params.in_users)] if trans.webapp.name == 'galaxy': for ura in role.users: user = trans.sa_session.query(trans.app.model.User).get(ura.user_id) if user not in in_users: # Delete DefaultUserPermissions for previously associated users that have been removed from the role for dup in user.default_permissions: if role == dup.role: trans.sa_session.delete(dup) # Delete DefaultHistoryPermissions for previously associated users that have been removed from the role for history in user.histories: for dhp in history.default_permissions: if role == dhp.role: trans.sa_session.delete(dhp) trans.sa_session.flush() in_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(params.in_groups)] trans.app.security_agent.set_entity_role_associations(roles=[role], users=in_users, groups=in_groups) trans.sa_session.refresh(role) message = "Role '%s' has been updated with %d associated users and %d associated groups" % (role.name, len(in_users), len(in_groups)) trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status=status)) in_users = [] out_users = [] in_groups = [] out_groups = [] for user in trans.sa_session.query(trans.app.model.User) \ .filter(trans.app.model.User.table.c.deleted == false()) \ .order_by(trans.app.model.User.table.c.email): if user in [x.user for x in role.users]: in_users.append((user.id, 
user.email)) else: out_users.append((user.id, user.email)) for group in trans.sa_session.query(trans.app.model.Group) \ .filter(trans.app.model.Group.table.c.deleted == false()) \ .order_by(trans.app.model.Group.table.c.name): if group in [x.group for x in role.groups]: in_groups.append((group.id, group.name)) else: out_groups.append((group.id, group.name)) library_dataset_actions = {} if trans.webapp.name == 'galaxy' and len(role.dataset_actions) < 25: # Build a list of tuples that are LibraryDatasetDatasetAssociationss followed by a list of actions # whose DatasetPermissions is associated with the Role # [ ( LibraryDatasetDatasetAssociation [ action, action ] ) ] for dp in role.dataset_actions: for ldda in trans.sa_session.query(trans.app.model.LibraryDatasetDatasetAssociation) \ .filter(trans.app.model.LibraryDatasetDatasetAssociation.dataset_id == dp.dataset_id): root_found = False folder_path = '' folder = ldda.library_dataset.folder while not root_found: folder_path = f'{folder.name} / {folder_path}' if not folder.parent: root_found = True else: folder = folder.parent folder_path = f'{folder_path} {ldda.name}' library = trans.sa_session.query(trans.app.model.Library) \ .filter(trans.app.model.Library.table.c.root_folder_id == folder.id) \ .first() if library not in library_dataset_actions: library_dataset_actions[library] = {} try: library_dataset_actions[library][folder_path].append(dp.action) except Exception: library_dataset_actions[library][folder_path] = [dp.action] else: message = "Not showing associated datasets, there are too many." status = 'info' return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role.mako', role=role, in_users=in_users, out_users=out_users, in_groups=in_groups, out_groups=out_groups, library_dataset_actions=library_dataset_actions, message=message, status=status)
[docs] @web.expose @web.require_admin def mark_role_deleted(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No role ids received for deleting" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=message, status='error')) ids = util.listify(id) message = "Deleted %d roles: " % len(ids) for role_id in ids: role = get_role(trans, role_id) role.deleted = True trans.sa_session.add(role) trans.sa_session.flush() message += f" {role.name} " trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def undelete_role(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No role ids received for undeleting" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=message, status='error')) ids = util.listify(id) count = 0 undeleted_roles = "" for role_id in ids: role = get_role(trans, role_id) if not role.deleted: message = f"Role '{role.name}' has not been deleted, so it cannot be undeleted." trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='error')) role.deleted = False trans.sa_session.add(role) trans.sa_session.flush() count += 1 undeleted_roles += f" {role.name}" message = "Undeleted %d roles: %s" % (count, undeleted_roles) trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def purge_role(self, trans, **kwd): # This method should only be called for a Role that has previously been deleted. # Purging a deleted Role deletes all of the following from the database: # - UserRoleAssociations where role_id == Role.id # - DefaultUserPermissions where role_id == Role.id # - DefaultHistoryPermissions where role_id == Role.id # - GroupRoleAssociations where role_id == Role.id # - DatasetPermissionss where role_id == Role.id id = kwd.get('id', None) if not id: message = "No role ids received for purging" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='error')) ids = util.listify(id) message = "Purged %d roles: " % len(ids) for role_id in ids: role = get_role(trans, role_id) if not role.deleted: message = f"Role '{role.name}' has not been deleted, so it cannot be purged." trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='error')) # Delete UserRoleAssociations for ura in role.users: user = trans.sa_session.query(trans.app.model.User).get(ura.user_id) # Delete DefaultUserPermissions for associated users for dup in user.default_permissions: if role == dup.role: trans.sa_session.delete(dup) # Delete DefaultHistoryPermissions for associated users for history in user.histories: for dhp in history.default_permissions: if role == dhp.role: trans.sa_session.delete(dhp) trans.sa_session.delete(ura) # Delete GroupRoleAssociations for gra in role.groups: trans.sa_session.delete(gra) # Delete DatasetPermissionss for dp in role.dataset_actions: trans.sa_session.delete(dp) trans.sa_session.flush() message += f" {role.name} " trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def groups(self, trans, **kwargs): if 'operation' in kwargs: operation = kwargs['operation'].lower().replace('+', ' ') if operation == "groups": return self.group(trans, **kwargs) if operation == "create": return self.create_group(trans, **kwargs) if operation == "delete": return self.mark_group_deleted(trans, **kwargs) if operation == "undelete": return self.undelete_group(trans, **kwargs) if operation == "purge": return self.purge_group(trans, **kwargs) if operation == "manage users and roles": return self.manage_users_and_roles_for_group(trans, **kwargs) if operation == "rename": return self.rename_group(trans, **kwargs) # Render the list view return self.group_list_grid(trans, **kwargs)
[docs] @web.expose @web.require_admin def rename_group(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') id = params.get('id', None) if not id: message = "No group ids received for renaming" trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=message, status='error')) group = get_group(trans, id) if params.get('rename_group_button', False): old_name = group.name new_name = util.restore_text(params.name) if not new_name: message = 'Enter a valid name' status = 'error' else: existing_group = trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.name == new_name).first() if existing_group and existing_group.id != group.id: message = 'A group with that name already exists' status = 'error' else: if group.name != new_name: group.name = new_name trans.sa_session.add(group) trans.sa_session.flush() message = f"Group '{old_name}' has been renamed to '{new_name}'" return trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done')) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group_rename.mako', group=group, message=message, status=status)
[docs] @web.expose @web.require_admin def manage_users_and_roles_for_group(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') group = get_group(trans, params.id) if params.get('group_roles_users_edit_button', False): in_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(params.in_roles)] in_users = [trans.sa_session.query(trans.app.model.User).get(x) for x in util.listify(params.in_users)] trans.app.security_agent.set_entity_group_associations(groups=[group], roles=in_roles, users=in_users) trans.sa_session.refresh(group) message += "Group '%s' has been updated with %d associated roles and %d associated users" % (group.name, len(in_roles), len(in_users)) trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status=status)) in_roles = [] out_roles = [] in_users = [] out_users = [] for role in trans.sa_session.query(trans.app.model.Role) \ .filter(trans.app.model.Role.table.c.deleted == false()) \ .order_by(trans.app.model.Role.table.c.name): if role in [x.role for x in group.roles]: in_roles.append((role.id, role.name)) else: out_roles.append((role.id, role.name)) for user in trans.sa_session.query(trans.app.model.User) \ .filter(trans.app.model.User.table.c.deleted == false()) \ .order_by(trans.app.model.User.table.c.email): if user in [x.user for x in group.users]: in_users.append((user.id, user.email)) else: out_users.append((user.id, user.email)) message += 'Group %s is currently associated with %d roles and %d users' % (group.name, len(in_roles), len(in_users)) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group.mako', group=group, in_roles=in_roles, out_roles=out_roles, in_users=in_users, out_users=out_users, message=message, status=status)
[docs] @web.expose @web.require_admin def create_group(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') name = util.restore_text(params.get('name', '')) in_users = util.listify(params.get('in_users', [])) out_users = util.listify(params.get('out_users', [])) in_roles = util.listify(params.get('in_roles', [])) out_roles = util.listify(params.get('out_roles', [])) create_role_for_group = params.get('create_role_for_group', '') create_role_for_group_checked = CheckboxField.is_checked(create_role_for_group) ok = True if params.get('create_group_button', False): if not name: message = "Enter a valid name." status = 'error' ok = False elif trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.name == name).first(): message = "Group names must be unique and a group with that name already exists, so choose another name." status = 'error' ok = False else: # Create the group group = trans.app.model.Group(name=name) trans.sa_session.add(group) trans.sa_session.flush() # Create the UserRoleAssociations for user in [trans.sa_session.query(trans.app.model.User).get(x) for x in in_users]: uga = trans.app.model.UserGroupAssociation(user, group) trans.sa_session.add(uga) # Create the GroupRoleAssociations for role in [trans.sa_session.query(trans.app.model.Role).get(x) for x in in_roles]: gra = trans.app.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) if create_role_for_group_checked: # Create the role role = trans.app.model.Role(name=name, description=f'Role for group {name}') trans.sa_session.add(role) # Associate the role with the group gra = trans.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) num_in_roles = len(in_roles) + 1 else: num_in_roles = len(in_roles) trans.sa_session.flush() message = "Group '%s' has been created with %d associated users and %d associated roles. 
" \ % (group.name, len(in_users), num_in_roles) if create_role_for_group_checked: message += 'One of the roles associated with this group is the newly created role with the same name.' trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done')) if ok: for user in trans.sa_session.query(trans.app.model.User) \ .filter(trans.app.model.User.table.c.deleted == false()) \ .order_by(trans.app.model.User.table.c.email): out_users.append((user.id, user.email)) for role in trans.sa_session.query(trans.app.model.Role) \ .filter(trans.app.model.Role.table.c.deleted == false()) \ .order_by(trans.app.model.Role.table.c.name): out_roles.append((role.id, role.name)) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group_create.mako', name=name, in_users=in_users, out_users=out_users, in_roles=in_roles, out_roles=out_roles, create_role_for_group_checked=create_role_for_group_checked, message=message, status=status)
[docs] @web.expose @web.require_admin def mark_group_deleted(self, trans, **kwd): params = util.Params(kwd) id = params.get('id', None) if not id: message = "No group ids received for marking deleted" trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=message, status='error')) ids = util.listify(id) message = "Deleted %d groups: " % len(ids) for group_id in ids: group = get_group(trans, group_id) group.deleted = True trans.sa_session.add(group) trans.sa_session.flush() message += f" {group.name} " trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def undelete_group(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No group ids received for undeleting" trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=message, status='error')) ids = util.listify(id) count = 0 undeleted_groups = "" for group_id in ids: group = get_group(trans, group_id) if not group.deleted: message = f"Group '{group.name}' has not been deleted, so it cannot be undeleted." trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='error')) group.deleted = False trans.sa_session.add(group) trans.sa_session.flush() count += 1 undeleted_groups += f" {group.name}" message = "Undeleted %d groups: %s" % (count, undeleted_groups) trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def purge_group(self, trans, **kwd): # This method should only be called for a Group that has previously been deleted. # Purging a deleted Group simply deletes all UserGroupAssociations and GroupRoleAssociations. id = kwd.get('id', None) if not id: message = "No group ids received for purging" trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='error')) ids = util.listify(id) message = "Purged %d groups: " % len(ids) for group_id in ids: group = get_group(trans, group_id) if not group.deleted: # We should never reach here, but just in case there is a bug somewhere... message = f"Group '{group.name}' has not been deleted, so it cannot be purged." trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='error')) # Delete UserGroupAssociations for uga in group.users: trans.sa_session.delete(uga) # Delete GroupRoleAssociations for gra in group.roles: trans.sa_session.delete(gra) trans.sa_session.flush() message += f" {group.name} " trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def create_new_user(self, trans, **kwd): return trans.response.send_redirect(web.url_for(controller='user', action='create', cntrller='admin'))
[docs] @web.expose @web.require_admin def reset_user_password(self, trans, **kwd): user_id = kwd.get('id', None) if not user_id: message = "No users received for resetting passwords." trans.response.send_redirect(web.url_for(controller='admin', action='users', message=message, status='error')) user_ids = util.listify(user_id) if 'reset_user_password_button' in kwd: message = '' status = '' for user_id in user_ids: user = get_user(trans, user_id) password = kwd.get('password', None) confirm = kwd.get('confirm', None) message = validate_password(trans, password, confirm) if message: status = 'error' break else: user.set_password_cleartext(password) trans.sa_session.add(user) trans.sa_session.flush() if not message and not status: message = "Passwords reset for %d %s." % (len(user_ids), inflector.cond_plural(len(user_ids), 'user')) status = 'done' trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status=status)) users = [get_user(trans, user_id) for user_id in user_ids] if len(user_ids) > 1: user_id = ','.join(user_ids) return trans.fill_template('/webapps/tool_shed/admin/user/reset_password.mako', id=user_id, users=users, password='', confirm='')
[docs] @web.expose @web.require_admin def mark_user_deleted(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No user ids received for deleting" trans.response.send_redirect(web.url_for(controller='admin', action='users', message=message, status='error')) ids = util.listify(id) message = "Deleted %d users: " % len(ids) for user_id in ids: user = get_user(trans, user_id) user.deleted = True compliance_log.info(f'delete-user-event: {user_id}') # See lib/galaxy/webapps/tool_shed/controllers/admin.py pseudorandom_value = str(int(time.time())) email_hash = new_secure_hash(user.email + pseudorandom_value) uname_hash = new_secure_hash(user.username + pseudorandom_value) for role in user.all_roles(): print(role, self.app.config.redact_username_during_deletion, self.app.config.redact_email_during_deletion) if self.app.config.redact_username_during_deletion: role.name = role.name.replace(user.username, uname_hash) role.description = role.description.replace(user.username, uname_hash) if self.app.config.redact_email_during_deletion: role.name = role.name.replace(user.email, email_hash) role.description = role.description.replace(user.email, email_hash) if self.app.config.redact_email_during_deletion: user.email = email_hash if self.app.config.redact_username_during_deletion: user.username = uname_hash trans.sa_session.add(user) trans.sa_session.flush() message += f" {user.email} " trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def undelete_user(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No user ids received for undeleting" trans.response.send_redirect(web.url_for(controller='admin', action='users', message=message, status='error')) ids = util.listify(id) count = 0 undeleted_users = "" for user_id in ids: user = get_user(trans, user_id) if not user.deleted: message = f"User '{user.email}' has not been deleted, so it cannot be undeleted." trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='error')) user.deleted = False trans.sa_session.add(user) trans.sa_session.flush() count += 1 undeleted_users += f" {user.email}" message = "Undeleted %d users: %s" % (count, undeleted_users) trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def purge_user(self, trans, **kwd): # This method should only be called for a User that has previously been deleted. # We keep the User in the database ( marked as purged ), and stuff associated # with the user's private role in case we want the ability to unpurge the user # some time in the future. # Purging a deleted User deletes all of the following: # - History where user_id = User.id # - HistoryDatasetAssociation where history_id = History.id # - Dataset where HistoryDatasetAssociation.dataset_id = Dataset.id # - UserGroupAssociation where user_id == User.id # - UserRoleAssociation where user_id == User.id EXCEPT FOR THE PRIVATE ROLE # - UserAddress where user_id == User.id # Purging Histories and Datasets must be handled via the cleanup_datasets.py script id = kwd.get('id', None) if not id: message = "No user ids received for purging" trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='error')) ids = util.listify(id) message = "Purged %d users: " % len(ids) for user_id in ids: user = get_user(trans, user_id) if not user.deleted: # We should never reach here, but just in case there is a bug somewhere... message = f"User '{user.email}' has not been deleted, so it cannot be purged." 
trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='error')) private_role = trans.app.security_agent.get_private_user_role(user) # Delete History for h in user.active_histories: trans.sa_session.refresh(h) for hda in h.active_datasets: # Delete HistoryDatasetAssociation d = trans.sa_session.query(trans.app.model.Dataset).get(hda.dataset_id) # Delete Dataset if not d.deleted: d.deleted = True trans.sa_session.add(d) hda.deleted = True trans.sa_session.add(hda) h.deleted = True trans.sa_session.add(h) # Delete UserGroupAssociations for uga in user.groups: trans.sa_session.delete(uga) # Delete UserRoleAssociations EXCEPT FOR THE PRIVATE ROLE for ura in user.roles: if ura.role_id != private_role.id: trans.sa_session.delete(ura) # Delete UserAddresses for address in user.addresses: trans.sa_session.delete(address) # Purge the user user.purged = True trans.sa_session.add(user) trans.sa_session.flush() message += f"{user.email} " trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def users(self, trans, **kwd): if 'operation' in kwd: operation = kwd['operation'].lower() if operation == "roles": return self.user(trans, **kwd) elif operation == "reset password": return self.reset_user_password(trans, **kwd) elif operation == "delete": return self.mark_user_deleted(trans, **kwd) elif operation == "undelete": return self.undelete_user(trans, **kwd) elif operation == "purge": return self.purge_user(trans, **kwd) elif operation == "create": return self.create_new_user(trans, **kwd) elif operation == "manage roles and groups": return self.manage_roles_and_groups_for_user(trans, **kwd) if trans.app.config.allow_user_deletion: if self.delete_operation not in self.user_list_grid.operations: self.user_list_grid.operations.append(self.delete_operation) if self.undelete_operation not in self.user_list_grid.operations: self.user_list_grid.operations.append(self.undelete_operation) if self.purge_operation not in self.user_list_grid.operations: self.user_list_grid.operations.append(self.purge_operation) # Render the list view return self.user_list_grid(trans, **kwd)
[docs] @web.expose @web.require_admin def name_autocomplete_data(self, trans, q=None, limit=None, timestamp=None): """Return autocomplete data for user emails""" ac_data = "" for user in trans.sa_session.query(trans.app.model.User).filter_by(deleted=False).filter(func.lower(trans.app.model.User.email).like(f"{q.lower()}%")): ac_data = f"{ac_data + user.email}\n" return ac_data
[docs] @web.expose @web.require_admin def manage_roles_and_groups_for_user(self, trans, **kwd): user_id = kwd.get('id', None) message = '' status = '' if not user_id: message += f"Invalid user id ({str(user_id)}) received" trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='error')) user = get_user(trans, user_id) private_role = trans.app.security_agent.get_private_user_role(user) if kwd.get('user_roles_groups_edit_button', False): # Make sure the user is not dis-associating himself from his private role out_roles = kwd.get('out_roles', []) if out_roles: out_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(out_roles)] if private_role in out_roles: message += "You cannot eliminate a user's private role association. " status = 'error' in_roles = kwd.get('in_roles', []) if in_roles: in_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(in_roles)] out_groups = kwd.get('out_groups', []) if out_groups: out_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(out_groups)] in_groups = kwd.get('in_groups', []) if in_groups: in_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(in_groups)] if in_roles: trans.app.security_agent.set_entity_user_associations(users=[user], roles=in_roles, groups=in_groups) trans.sa_session.refresh(user) message += "User '%s' has been updated with %d associated roles and %d associated groups (private roles are not displayed)" % \ (user.email, len(in_roles), len(in_groups)) trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='done')) in_roles = [] out_roles = [] in_groups = [] out_groups = [] for role in trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.deleted == false()) \ .order_by(trans.app.model.Role.table.c.name): if role in [x.role for x in 
user.roles]: in_roles.append((role.id, role.name)) elif role.type != trans.app.model.Role.types.PRIVATE: # There is a 1 to 1 mapping between a user and a PRIVATE role, so private roles should # not be listed in the roles form fields, except for the currently selected user's private # role, which should always be in in_roles. The check above is added as an additional # precaution, since for a period of time we were including private roles in the form fields. out_roles.append((role.id, role.name)) for group in trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.deleted == false()) \ .order_by(trans.app.model.Group.table.c.name): if group in [x.group for x in user.groups]: in_groups.append((group.id, group.name)) else: out_groups.append((group.id, group.name)) message += "User '%s' is currently associated with %d roles and is a member of %d groups" % \ (user.email, len(in_roles), len(in_groups)) if not status: status = 'done' return trans.fill_template('/webapps/tool_shed/admin/user/user.mako', user=user, in_roles=in_roles, out_roles=out_roles, in_groups=in_groups, out_groups=out_groups, message=message, status=status)
# ---- Utility methods -------------------------------------------------------
def get_user(trans, user_id):
    """Get a User from the database by encoded id.

    The id is decoded with ``trans.security.decode_id`` before the lookup.
    If no user is found, the result of ``trans.show_error_message`` is
    returned instead of a User; the message quotes the id as it was passed in.
    """
    decoded_id = trans.security.decode_id(user_id)
    user = trans.sa_session.query(trans.model.User).get(decoded_id)
    if user:
        return user
    return trans.show_error_message(f"User not found for id ({str(user_id)})")

def get_role(trans, id):
    """Get a Role from the database by encoded id.

    The id is decoded with ``trans.security.decode_id`` before the lookup.
    If no role is found, the result of ``trans.show_error_message`` is
    returned instead of a Role; the message quotes the decoded id.
    """
    decoded_id = trans.security.decode_id(id)
    role = trans.sa_session.query(trans.model.Role).get(decoded_id)
    if role:
        return role
    return trans.show_error_message(f"Role not found for id ({str(decoded_id)})")

def get_group(trans, id):
    """Get a Group from the database by encoded id.

    The id is decoded with ``trans.security.decode_id`` before the lookup.
    If no group is found, the result of ``trans.show_error_message`` is
    returned instead of a Group; the message quotes the decoded id.
    """
    decoded_id = trans.security.decode_id(id)
    group = trans.sa_session.query(trans.model.Group).get(decoded_id)
    if group:
        return group
    return trans.show_error_message(f"Group not found for id ({str(decoded_id)})")