Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.util.admin_util

import logging
import time

from sqlalchemy import false, func

from galaxy import util, web
from galaxy.security.validate_user_input import validate_password
from galaxy.util import inflector
from galaxy.util.hash_util import new_secure_hash
from galaxy.web.form_builder import CheckboxField
from tool_shed.util.web_util import escape

log = logging.getLogger(__name__)
compliance_log = logging.getLogger('COMPLIANCE')


[docs]class Admin: # Override these user_list_grid = None role_list_grid = None group_list_grid = None delete_operation = None undelete_operation = None purge_operation = None
[docs] @web.expose @web.require_admin def index(self, trans, **kwd): message = escape(kwd.get('message', '')) status = kwd.get('status', 'done') return trans.fill_template('/webapps/tool_shed/admin/index.mako', message=message, status=status)
[docs] @web.expose @web.require_admin def center(self, trans, **kwd): message = escape(kwd.get('message', '')) status = kwd.get('status', 'done') return trans.fill_template('/webapps/tool_shed/admin/center.mako', message=message, status=status)
[docs] @web.expose @web.require_admin def roles(self, trans, **kwargs): if 'operation' in kwargs: operation = kwargs['operation'].lower().replace('+', ' ') if operation == "roles": return self.role(trans, **kwargs) if operation == "create": return self.create_role(trans, **kwargs) if operation == "delete": return self.mark_role_deleted(trans, **kwargs) if operation == "undelete": return self.undelete_role(trans, **kwargs) if operation == "purge": return self.purge_role(trans, **kwargs) if operation == "manage users and groups": return self.manage_users_and_groups_for_role(trans, **kwargs) if operation == "manage role associations": # This is currently used only in the Tool Shed. return self.manage_role_associations(trans, **kwargs) if operation == "rename": return self.rename_role(trans, **kwargs) # Render the list view return self.role_list_grid(trans, **kwargs)
[docs] @web.expose @web.require_admin def create_role(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') name = util.restore_text(params.get('name', '')) description = util.restore_text(params.get('description', '')) in_users = util.listify(params.get('in_users', [])) out_users = util.listify(params.get('out_users', [])) in_groups = util.listify(params.get('in_groups', [])) out_groups = util.listify(params.get('out_groups', [])) create_group_for_role = params.get('create_group_for_role', '') create_group_for_role_checked = CheckboxField.is_checked(create_group_for_role) ok = True if params.get('create_role_button', False): if not name or not description: message = "Enter a valid name and a description." status = 'error' ok = False elif trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.name == name).first(): message = "Role names must be unique and a role with that name already exists, so choose another name." status = 'error' ok = False else: # Create the role role = trans.app.model.Role(name=name, description=description, type=trans.app.model.Role.types.ADMIN) trans.sa_session.add(role) # Create the UserRoleAssociations for user in [trans.sa_session.query(trans.app.model.User).get(x) for x in in_users]: ura = trans.app.model.UserRoleAssociation(user, role) trans.sa_session.add(ura) # Create the GroupRoleAssociations for group in [trans.sa_session.query(trans.app.model.Group).get(x) for x in in_groups]: gra = trans.app.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) if create_group_for_role_checked: # Create the group group = trans.app.model.Group(name=name) trans.sa_session.add(group) # Associate the group with the role gra = trans.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) num_in_groups = len(in_groups) + 1 else: num_in_groups = len(in_groups) trans.sa_session.flush() message = "Role '%s' has been created with %d associated users and %d associated groups. " \ % (role.name, len(in_users), num_in_groups) if create_group_for_role_checked: message += 'One of the groups associated with this role is the newly created group with the same name.' trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done')) if ok: for user in trans.sa_session.query(trans.app.model.User) \ .filter(trans.app.model.User.table.c.deleted == false()) \ .order_by(trans.app.model.User.table.c.email): out_users.append((user.id, user.email)) for group in trans.sa_session.query(trans.app.model.Group) \ .filter(trans.app.model.Group.table.c.deleted == false()) \ .order_by(trans.app.model.Group.table.c.name): out_groups.append((group.id, group.name)) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role_create.mako', name=name, description=description, in_users=in_users, out_users=out_users, in_groups=in_groups, out_groups=out_groups, create_group_for_role_checked=create_group_for_role_checked, message=message, status=status)
[docs] @web.expose @web.require_admin def rename_role(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') id = params.get('id', None) if not id: message = "No role ids received for renaming" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=message, status='error')) role = get_role(trans, id) if params.get('rename_role_button', False): old_name = role.name new_name = util.restore_text(params.name) new_description = util.restore_text(params.description) if not new_name: message = 'Enter a valid name' status = 'error' else: existing_role = trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.name == new_name).first() if existing_role and existing_role.id != role.id: message = 'A role with that name already exists' status = 'error' else: if not (role.name == new_name and role.description == new_description): role.name = new_name role.description = new_description trans.sa_session.add(role) trans.sa_session.flush() message = f"Role '{old_name}' has been renamed to '{new_name}'" return trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done')) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role_rename.mako', role=role, message=message, status=status)
[docs] @web.expose @web.require_admin def manage_users_and_groups_for_role(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') id = params.get('id', None) if not id: message = "No role ids received for managing users and groups" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=message, status='error')) role = get_role(trans, id) if params.get('role_members_edit_button', False): in_users = [trans.sa_session.query(trans.app.model.User).get(x) for x in util.listify(params.in_users)] if trans.webapp.name == 'galaxy': for ura in role.users: user = trans.sa_session.query(trans.app.model.User).get(ura.user_id) if user not in in_users: # Delete DefaultUserPermissions for previously associated users that have been removed from the role for dup in user.default_permissions: if role == dup.role: trans.sa_session.delete(dup) # Delete DefaultHistoryPermissions for previously associated users that have been removed from the role for history in user.histories: for dhp in history.default_permissions: if role == dhp.role: trans.sa_session.delete(dhp) trans.sa_session.flush() in_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(params.in_groups)] trans.app.security_agent.set_entity_role_associations(roles=[role], users=in_users, groups=in_groups) trans.sa_session.refresh(role) message = "Role '%s' has been updated with %d associated users and %d associated groups" % (role.name, len(in_users), len(in_groups)) trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status=status)) in_users = [] out_users = [] in_groups = [] out_groups = [] for user in trans.sa_session.query(trans.app.model.User) \ .filter(trans.app.model.User.table.c.deleted == false()) \ .order_by(trans.app.model.User.table.c.email): if user in [x.user for x in role.users]: in_users.append((user.id, user.email)) else: out_users.append((user.id, user.email)) for group in trans.sa_session.query(trans.app.model.Group) \ .filter(trans.app.model.Group.table.c.deleted == false()) \ .order_by(trans.app.model.Group.table.c.name): if group in [x.group for x in role.groups]: in_groups.append((group.id, group.name)) else: out_groups.append((group.id, group.name)) library_dataset_actions = {} if trans.webapp.name == 'galaxy' and len(role.dataset_actions) < 25: # Build a list of tuples that are LibraryDatasetDatasetAssociationss followed by a list of actions # whose DatasetPermissions is associated with the Role # [ ( LibraryDatasetDatasetAssociation [ action, action ] ) ] for dp in role.dataset_actions: for ldda in trans.sa_session.query(trans.app.model.LibraryDatasetDatasetAssociation) \ .filter(trans.app.model.LibraryDatasetDatasetAssociation.dataset_id == dp.dataset_id): root_found = False folder_path = '' folder = ldda.library_dataset.folder while not root_found: folder_path = f'{folder.name} / {folder_path}' if not folder.parent: root_found = True else: folder = folder.parent folder_path = f'{folder_path} {ldda.name}' library = trans.sa_session.query(trans.app.model.Library) \ .filter(trans.app.model.Library.table.c.root_folder_id == folder.id) \ .first() if library not in library_dataset_actions: library_dataset_actions[library] = {} try: library_dataset_actions[library][folder_path].append(dp.action) except Exception: library_dataset_actions[library][folder_path] = [dp.action] else: message = "Not showing associated datasets, there are too many." status = 'info' return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role.mako', role=role, in_users=in_users, out_users=out_users, in_groups=in_groups, out_groups=out_groups, library_dataset_actions=library_dataset_actions, message=message, status=status)
[docs] @web.expose @web.require_admin def mark_role_deleted(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No role ids received for deleting" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=message, status='error')) ids = util.listify(id) message = "Deleted %d roles: " % len(ids) for role_id in ids: role = get_role(trans, role_id) role.deleted = True trans.sa_session.add(role) trans.sa_session.flush() message += " %s " % role.name trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def undelete_role(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No role ids received for undeleting" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=message, status='error')) ids = util.listify(id) count = 0 undeleted_roles = "" for role_id in ids: role = get_role(trans, role_id) if not role.deleted: message = "Role '%s' has not been deleted, so it cannot be undeleted." % role.name trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='error')) role.deleted = False trans.sa_session.add(role) trans.sa_session.flush() count += 1 undeleted_roles += " %s" % role.name message = "Undeleted %d roles: %s" % (count, undeleted_roles) trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def purge_role(self, trans, **kwd): # This method should only be called for a Role that has previously been deleted. # Purging a deleted Role deletes all of the following from the database: # - UserRoleAssociations where role_id == Role.id # - DefaultUserPermissions where role_id == Role.id # - DefaultHistoryPermissions where role_id == Role.id # - GroupRoleAssociations where role_id == Role.id # - DatasetPermissionss where role_id == Role.id id = kwd.get('id', None) if not id: message = "No role ids received for purging" trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='error')) ids = util.listify(id) message = "Purged %d roles: " % len(ids) for role_id in ids: role = get_role(trans, role_id) if not role.deleted: message = "Role '%s' has not been deleted, so it cannot be purged." % role.name trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='error')) # Delete UserRoleAssociations for ura in role.users: user = trans.sa_session.query(trans.app.model.User).get(ura.user_id) # Delete DefaultUserPermissions for associated users for dup in user.default_permissions: if role == dup.role: trans.sa_session.delete(dup) # Delete DefaultHistoryPermissions for associated users for history in user.histories: for dhp in history.default_permissions: if role == dhp.role: trans.sa_session.delete(dhp) trans.sa_session.delete(ura) # Delete GroupRoleAssociations for gra in role.groups: trans.sa_session.delete(gra) # Delete DatasetPermissionss for dp in role.dataset_actions: trans.sa_session.delete(dp) trans.sa_session.flush() message += " %s " % role.name trans.response.send_redirect(web.url_for(controller='admin', action='roles', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def groups(self, trans, **kwargs): if 'operation' in kwargs: operation = kwargs['operation'].lower().replace('+', ' ') if operation == "groups": return self.group(trans, **kwargs) if operation == "create": return self.create_group(trans, **kwargs) if operation == "delete": return self.mark_group_deleted(trans, **kwargs) if operation == "undelete": return self.undelete_group(trans, **kwargs) if operation == "purge": return self.purge_group(trans, **kwargs) if operation == "manage users and roles": return self.manage_users_and_roles_for_group(trans, **kwargs) if operation == "rename": return self.rename_group(trans, **kwargs) # Render the list view return self.group_list_grid(trans, **kwargs)
[docs] @web.expose @web.require_admin def rename_group(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') id = params.get('id', None) if not id: message = "No group ids received for renaming" trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=message, status='error')) group = get_group(trans, id) if params.get('rename_group_button', False): old_name = group.name new_name = util.restore_text(params.name) if not new_name: message = 'Enter a valid name' status = 'error' else: existing_group = trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.name == new_name).first() if existing_group and existing_group.id != group.id: message = 'A group with that name already exists' status = 'error' else: if group.name != new_name: group.name = new_name trans.sa_session.add(group) trans.sa_session.flush() message = f"Group '{old_name}' has been renamed to '{new_name}'" return trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done')) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group_rename.mako', group=group, message=message, status=status)
[docs] @web.expose @web.require_admin def manage_users_and_roles_for_group(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') group = get_group(trans, params.id) if params.get('group_roles_users_edit_button', False): in_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(params.in_roles)] in_users = [trans.sa_session.query(trans.app.model.User).get(x) for x in util.listify(params.in_users)] trans.app.security_agent.set_entity_group_associations(groups=[group], roles=in_roles, users=in_users) trans.sa_session.refresh(group) message += "Group '%s' has been updated with %d associated roles and %d associated users" % (group.name, len(in_roles), len(in_users)) trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status=status)) in_roles = [] out_roles = [] in_users = [] out_users = [] for role in trans.sa_session.query(trans.app.model.Role) \ .filter(trans.app.model.Role.table.c.deleted == false()) \ .order_by(trans.app.model.Role.table.c.name): if role in [x.role for x in group.roles]: in_roles.append((role.id, role.name)) else: out_roles.append((role.id, role.name)) for user in trans.sa_session.query(trans.app.model.User) \ .filter(trans.app.model.User.table.c.deleted == false()) \ .order_by(trans.app.model.User.table.c.email): if user in [x.user for x in group.users]: in_users.append((user.id, user.email)) else: out_users.append((user.id, user.email)) message += 'Group %s is currently associated with %d roles and %d users' % (group.name, len(in_roles), len(in_users)) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group.mako', group=group, in_roles=in_roles, out_roles=out_roles, in_users=in_users, out_users=out_users, message=message, status=status)
[docs] @web.expose @web.require_admin def create_group(self, trans, **kwd): params = util.Params(kwd) message = util.restore_text(params.get('message', '')) status = params.get('status', 'done') name = util.restore_text(params.get('name', '')) in_users = util.listify(params.get('in_users', [])) out_users = util.listify(params.get('out_users', [])) in_roles = util.listify(params.get('in_roles', [])) out_roles = util.listify(params.get('out_roles', [])) create_role_for_group = params.get('create_role_for_group', '') create_role_for_group_checked = CheckboxField.is_checked(create_role_for_group) ok = True if params.get('create_group_button', False): if not name: message = "Enter a valid name." status = 'error' ok = False elif trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.name == name).first(): message = "Group names must be unique and a group with that name already exists, so choose another name." status = 'error' ok = False else: # Create the group group = trans.app.model.Group(name=name) trans.sa_session.add(group) trans.sa_session.flush() # Create the UserRoleAssociations for user in [trans.sa_session.query(trans.app.model.User).get(x) for x in in_users]: uga = trans.app.model.UserGroupAssociation(user, group) trans.sa_session.add(uga) # Create the GroupRoleAssociations for role in [trans.sa_session.query(trans.app.model.Role).get(x) for x in in_roles]: gra = trans.app.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) if create_role_for_group_checked: # Create the role role = trans.app.model.Role(name=name, description='Role for group %s' % name) trans.sa_session.add(role) # Associate the role with the group gra = trans.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) num_in_roles = len(in_roles) + 1 else: num_in_roles = len(in_roles) trans.sa_session.flush() message = "Group '%s' has been created with %d associated users and %d associated roles. " \ % (group.name, len(in_users), num_in_roles) if create_role_for_group_checked: message += 'One of the roles associated with this group is the newly created role with the same name.' trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done')) if ok: for user in trans.sa_session.query(trans.app.model.User) \ .filter(trans.app.model.User.table.c.deleted == false()) \ .order_by(trans.app.model.User.table.c.email): out_users.append((user.id, user.email)) for role in trans.sa_session.query(trans.app.model.Role) \ .filter(trans.app.model.Role.table.c.deleted == false()) \ .order_by(trans.app.model.Role.table.c.name): out_roles.append((role.id, role.name)) return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group_create.mako', name=name, in_users=in_users, out_users=out_users, in_roles=in_roles, out_roles=out_roles, create_role_for_group_checked=create_role_for_group_checked, message=message, status=status)
[docs] @web.expose @web.require_admin def mark_group_deleted(self, trans, **kwd): params = util.Params(kwd) id = params.get('id', None) if not id: message = "No group ids received for marking deleted" trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=message, status='error')) ids = util.listify(id) message = "Deleted %d groups: " % len(ids) for group_id in ids: group = get_group(trans, group_id) group.deleted = True trans.sa_session.add(group) trans.sa_session.flush() message += " %s " % group.name trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def undelete_group(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No group ids received for undeleting" trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=message, status='error')) ids = util.listify(id) count = 0 undeleted_groups = "" for group_id in ids: group = get_group(trans, group_id) if not group.deleted: message = "Group '%s' has not been deleted, so it cannot be undeleted." % group.name trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='error')) group.deleted = False trans.sa_session.add(group) trans.sa_session.flush() count += 1 undeleted_groups += " %s" % group.name message = "Undeleted %d groups: %s" % (count, undeleted_groups) trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def purge_group(self, trans, **kwd): # This method should only be called for a Group that has previously been deleted. # Purging a deleted Group simply deletes all UserGroupAssociations and GroupRoleAssociations. id = kwd.get('id', None) if not id: message = "No group ids received for purging" trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='error')) ids = util.listify(id) message = "Purged %d groups: " % len(ids) for group_id in ids: group = get_group(trans, group_id) if not group.deleted: # We should never reach here, but just in case there is a bug somewhere... message = "Group '%s' has not been deleted, so it cannot be purged." % group.name trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='error')) # Delete UserGroupAssociations for uga in group.users: trans.sa_session.delete(uga) # Delete GroupRoleAssociations for gra in group.roles: trans.sa_session.delete(gra) trans.sa_session.flush() message += " %s " % group.name trans.response.send_redirect(web.url_for(controller='admin', action='groups', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def create_new_user(self, trans, **kwd): return trans.response.send_redirect(web.url_for(controller='user', action='create', cntrller='admin'))
[docs] @web.expose @web.require_admin def reset_user_password(self, trans, **kwd): user_id = kwd.get('id', None) if not user_id: message = "No users received for resetting passwords." trans.response.send_redirect(web.url_for(controller='admin', action='users', message=message, status='error')) user_ids = util.listify(user_id) if 'reset_user_password_button' in kwd: message = '' status = '' for user_id in user_ids: user = get_user(trans, user_id) password = kwd.get('password', None) confirm = kwd.get('confirm', None) message = validate_password(trans, password, confirm) if message: status = 'error' break else: user.set_password_cleartext(password) trans.sa_session.add(user) trans.sa_session.flush() if not message and not status: message = "Passwords reset for %d %s." % (len(user_ids), inflector.cond_plural(len(user_ids), 'user')) status = 'done' trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status=status)) users = [get_user(trans, user_id) for user_id in user_ids] if len(user_ids) > 1: user_id = ','.join(user_ids) return trans.fill_template('/webapps/tool_shed/admin/user/reset_password.mako', id=user_id, users=users, password='', confirm='')
[docs] @web.expose @web.require_admin def mark_user_deleted(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No user ids received for deleting" trans.response.send_redirect(web.url_for(controller='admin', action='users', message=message, status='error')) ids = util.listify(id) message = "Deleted %d users: " % len(ids) for user_id in ids: user = get_user(trans, user_id) user.deleted = True compliance_log.info('delete-user-event: %s' % user_id) # See lib/galaxy/webapps/tool_shed/controllers/admin.py pseudorandom_value = str(int(time.time())) email_hash = new_secure_hash(user.email + pseudorandom_value) uname_hash = new_secure_hash(user.username + pseudorandom_value) for role in user.all_roles(): print(role, self.app.config.redact_username_during_deletion, self.app.config.redact_email_during_deletion) if self.app.config.redact_username_during_deletion: role.name = role.name.replace(user.username, uname_hash) role.description = role.description.replace(user.username, uname_hash) if self.app.config.redact_email_during_deletion: role.name = role.name.replace(user.email, email_hash) role.description = role.description.replace(user.email, email_hash) if self.app.config.redact_email_during_deletion: user.email = email_hash if self.app.config.redact_username_during_deletion: user.username = uname_hash trans.sa_session.add(user) trans.sa_session.flush() message += " %s " % user.email trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def undelete_user(self, trans, **kwd): id = kwd.get('id', None) if not id: message = "No user ids received for undeleting" trans.response.send_redirect(web.url_for(controller='admin', action='users', message=message, status='error')) ids = util.listify(id) count = 0 undeleted_users = "" for user_id in ids: user = get_user(trans, user_id) if not user.deleted: message = "User '%s' has not been deleted, so it cannot be undeleted." % user.email trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='error')) user.deleted = False trans.sa_session.add(user) trans.sa_session.flush() count += 1 undeleted_users += " %s" % user.email message = "Undeleted %d users: %s" % (count, undeleted_users) trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def purge_user(self, trans, **kwd): # This method should only be called for a User that has previously been deleted. # We keep the User in the database ( marked as purged ), and stuff associated # with the user's private role in case we want the ability to unpurge the user # some time in the future. # Purging a deleted User deletes all of the following: # - History where user_id = User.id # - HistoryDatasetAssociation where history_id = History.id # - Dataset where HistoryDatasetAssociation.dataset_id = Dataset.id # - UserGroupAssociation where user_id == User.id # - UserRoleAssociation where user_id == User.id EXCEPT FOR THE PRIVATE ROLE # - UserAddress where user_id == User.id # Purging Histories and Datasets must be handled via the cleanup_datasets.py script id = kwd.get('id', None) if not id: message = "No user ids received for purging" trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='error')) ids = util.listify(id) message = "Purged %d users: " % len(ids) for user_id in ids: user = get_user(trans, user_id) if not user.deleted: # We should never reach here, but just in case there is a bug somewhere... message = "User '%s' has not been deleted, so it cannot be purged." % user.email trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='error')) private_role = trans.app.security_agent.get_private_user_role(user) # Delete History for h in user.active_histories: trans.sa_session.refresh(h) for hda in h.active_datasets: # Delete HistoryDatasetAssociation d = trans.sa_session.query(trans.app.model.Dataset).get(hda.dataset_id) # Delete Dataset if not d.deleted: d.deleted = True trans.sa_session.add(d) hda.deleted = True trans.sa_session.add(hda) h.deleted = True trans.sa_session.add(h) # Delete UserGroupAssociations for uga in user.groups: trans.sa_session.delete(uga) # Delete UserRoleAssociations EXCEPT FOR THE PRIVATE ROLE for ura in user.roles: if ura.role_id != private_role.id: trans.sa_session.delete(ura) # Delete UserAddresses for address in user.addresses: trans.sa_session.delete(address) # Purge the user user.purged = True trans.sa_session.add(user) trans.sa_session.flush() message += "%s " % user.email trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='done'))
[docs] @web.expose @web.require_admin def users(self, trans, **kwd): if 'operation' in kwd: operation = kwd['operation'].lower() if operation == "roles": return self.user(trans, **kwd) elif operation == "reset password": return self.reset_user_password(trans, **kwd) elif operation == "delete": return self.mark_user_deleted(trans, **kwd) elif operation == "undelete": return self.undelete_user(trans, **kwd) elif operation == "purge": return self.purge_user(trans, **kwd) elif operation == "create": return self.create_new_user(trans, **kwd) elif operation == "manage roles and groups": return self.manage_roles_and_groups_for_user(trans, **kwd) if trans.app.config.allow_user_deletion: if self.delete_operation not in self.user_list_grid.operations: self.user_list_grid.operations.append(self.delete_operation) if self.undelete_operation not in self.user_list_grid.operations: self.user_list_grid.operations.append(self.undelete_operation) if self.purge_operation not in self.user_list_grid.operations: self.user_list_grid.operations.append(self.purge_operation) # Render the list view return self.user_list_grid(trans, **kwd)
[docs] @web.expose @web.require_admin def name_autocomplete_data(self, trans, q=None, limit=None, timestamp=None): """Return autocomplete data for user emails""" ac_data = "" for user in trans.sa_session.query(trans.app.model.User).filter_by(deleted=False).filter(func.lower(trans.app.model.User.email).like(q.lower() + "%")): ac_data = ac_data + user.email + "\n" return ac_data
[docs] @web.expose @web.require_admin def manage_roles_and_groups_for_user(self, trans, **kwd): user_id = kwd.get('id', None) message = '' status = '' if not user_id: message += "Invalid user id (%s) received" % str(user_id) trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='error')) user = get_user(trans, user_id) private_role = trans.app.security_agent.get_private_user_role(user) if kwd.get('user_roles_groups_edit_button', False): # Make sure the user is not dis-associating himself from his private role out_roles = kwd.get('out_roles', []) if out_roles: out_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(out_roles)] if private_role in out_roles: message += "You cannot eliminate a user's private role association. " status = 'error' in_roles = kwd.get('in_roles', []) if in_roles: in_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(in_roles)] out_groups = kwd.get('out_groups', []) if out_groups: out_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(out_groups)] in_groups = kwd.get('in_groups', []) if in_groups: in_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(in_groups)] if in_roles: trans.app.security_agent.set_entity_user_associations(users=[user], roles=in_roles, groups=in_groups) trans.sa_session.refresh(user) message += "User '%s' has been updated with %d associated roles and %d associated groups (private roles are not displayed)" % \ (user.email, len(in_roles), len(in_groups)) trans.response.send_redirect(web.url_for(controller='admin', action='users', message=util.sanitize_text(message), status='done')) in_roles = [] out_roles = [] in_groups = [] out_groups = [] for role in trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.deleted == false()) \ .order_by(trans.app.model.Role.table.c.name): if role in [x.role for x in user.roles]: in_roles.append((role.id, role.name)) elif role.type != trans.app.model.Role.types.PRIVATE: # There is a 1 to 1 mapping between a user and a PRIVATE role, so private roles should # not be listed in the roles form fields, except for the currently selected user's private # role, which should always be in in_roles. The check above is added as an additional # precaution, since for a period of time we were including private roles in the form fields. out_roles.append((role.id, role.name)) for group in trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.deleted == false()) \ .order_by(trans.app.model.Group.table.c.name): if group in [x.group for x in user.groups]: in_groups.append((group.id, group.name)) else: out_groups.append((group.id, group.name)) message += "User '%s' is currently associated with %d roles and is a member of %d groups" % \ (user.email, len(in_roles), len(in_groups)) if not status: status = 'done' return trans.fill_template('/webapps/tool_shed/admin/user/user.mako', user=user, in_roles=in_roles, out_roles=out_roles, in_groups=in_groups, out_groups=out_groups, message=message, status=status)
# ---- Utility methods -------------------------------------------------------
[docs]def get_user(trans, user_id): """Get a User from the database by id.""" user = trans.sa_session.query(trans.model.User).get(trans.security.decode_id(user_id)) if not user: return trans.show_error_message("User not found for id (%s)" % str(user_id)) return user
[docs]def get_role(trans, id): """Get a Role from the database by id.""" # Load user from database id = trans.security.decode_id(id) role = trans.sa_session.query(trans.model.Role).get(id) if not role: return trans.show_error_message("Role not found for id (%s)" % str(id)) return role
[docs]def get_group(trans, id): """Get a Group from the database by id.""" # Load user from database id = trans.security.decode_id(id) group = trans.sa_session.query(trans.model.Group).get(id) if not group: return trans.show_error_message("Group not found for id (%s)" % str(id)) return group