Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.util.admin_util
import logging
import time
from sqlalchemy import false, func
from galaxy import util, web
from galaxy.security.validate_user_input import validate_password
from galaxy.util import inflector
from galaxy.util.hash_util import new_secure_hash
from galaxy.web.form_builder import CheckboxField
from tool_shed.util.web_util import escape
log = logging.getLogger(__name__)
compliance_log = logging.getLogger('COMPLIANCE')
class Admin:
    """Administrative actions for managing roles, groups and users.

    The class defines no ``__init__``; it relies on names supplied from
    outside this block: the grid/operation placeholders below, ``self.app``
    (read by ``mark_user_deleted``) and the ``role``, ``group``, ``user`` and
    ``manage_role_associations`` handlers the dispatch methods call.
    NOTE(review): presumably mixed into a concrete controller that provides
    those -- confirm against the subclasses.

    Many methods call ``trans.response.send_redirect`` without returning its
    result and then keep going.
    NOTE(review): this only makes sense if ``send_redirect`` aborts the
    request (e.g. by raising) -- confirm before relying on code after it.
    """
    # Override these
    user_list_grid = None
    role_list_grid = None
    group_list_grid = None
    delete_operation = None
    undelete_operation = None
    purge_operation = None

    @web.expose
    @web.require_admin
    def index(self, trans, **kwd):
        """Render the admin index page with an escaped ``message`` and a ``status``."""
        message = escape(kwd.get('message', ''))
        status = kwd.get('status', 'done')
        return trans.fill_template('/webapps/tool_shed/admin/index.mako',
                                   message=message,
                                   status=status)

    @web.expose
    @web.require_admin
    def center(self, trans, **kwd):
        """Render the admin center panel with an escaped ``message`` and a ``status``."""
        message = escape(kwd.get('message', ''))
        status = kwd.get('status', 'done')
        return trans.fill_template('/webapps/tool_shed/admin/center.mako',
                                   message=message,
                                   status=status)

    @web.expose
    @web.require_admin
    def roles(self, trans, **kwargs):
        """Dispatch a role grid operation, or render the role grid.

        The ``operation`` keyword is lower-cased and has ``+`` replaced by a
        space before it is matched. An absent or unmatched operation falls
        through to ``self.role_list_grid``.
        """
        if 'operation' in kwargs:
            operation = kwargs['operation'].lower().replace('+', ' ')
            handler_names = {
                "roles": "role",
                "create": "create_role",
                "delete": "mark_role_deleted",
                "undelete": "undelete_role",
                "purge": "purge_role",
                "manage users and groups": "manage_users_and_groups_for_role",
                # This is currently used only in the Tool Shed.
                "manage role associations": "manage_role_associations",
                "rename": "rename_role",
            }
            handler_name = handler_names.get(operation)
            if handler_name is not None:
                # Looked up lazily so only the requested handler has to exist.
                return getattr(self, handler_name)(trans, **kwargs)
        # Render the list view
        return self.role_list_grid(trans, **kwargs)

    @web.expose
    @web.require_admin
    def create_role(self, trans, **kwd):
        """Create a role (optionally with a same-named group) or render the form.

        When ``create_role_button`` is submitted with a non-empty, unused name
        and a description, the role and its user/group associations are
        created and the request is redirected to the role grid. Otherwise the
        creation form is rendered; on a validation error the lists of
        available users and groups are not repopulated.
        """
        model = trans.app.model
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        name = util.restore_text(params.get('name', ''))
        description = util.restore_text(params.get('description', ''))
        in_users = util.listify(params.get('in_users', []))
        out_users = util.listify(params.get('out_users', []))
        in_groups = util.listify(params.get('in_groups', []))
        out_groups = util.listify(params.get('out_groups', []))
        create_group_for_role = params.get('create_group_for_role', '')
        create_group_for_role_checked = CheckboxField.is_checked(create_group_for_role)
        ok = True
        if params.get('create_role_button', False):
            if not name or not description:
                message = "Enter a valid name and a description."
                status = 'error'
                ok = False
            elif trans.sa_session.query(model.Role).filter(model.Role.table.c.name == name).first():
                message = "Role names must be unique and a role with that name already exists, so choose another name."
                status = 'error'
                ok = False
            else:
                # Create the role
                role = model.Role(name=name, description=description, type=model.Role.types.ADMIN)
                trans.sa_session.add(role)
                # Create the UserRoleAssociations
                for user in [trans.sa_session.query(model.User).get(x) for x in in_users]:
                    trans.sa_session.add(model.UserRoleAssociation(user, role))
                # Create the GroupRoleAssociations
                for group in [trans.sa_session.query(model.Group).get(x) for x in in_groups]:
                    trans.sa_session.add(model.GroupRoleAssociation(group, role))
                num_in_groups = len(in_groups)
                if create_group_for_role_checked:
                    # Create the group
                    group = model.Group(name=name)
                    trans.sa_session.add(group)
                    # Associate the group with the role (same model namespace
                    # as every other lookup in this method).
                    trans.sa_session.add(model.GroupRoleAssociation(group, role))
                    num_in_groups += 1
                trans.sa_session.flush()
                message = "Role '%s' has been created with %d associated users and %d associated groups. " \
                    % (role.name, len(in_users), num_in_groups)
                if create_group_for_role_checked:
                    message += 'One of the groups associated with this role is the newly created group with the same name.'
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='roles',
                                                         message=util.sanitize_text(message),
                                                         status='done'))
        if ok:
            for user in trans.sa_session.query(model.User) \
                                        .filter(model.User.table.c.deleted == false()) \
                                        .order_by(model.User.table.c.email):
                out_users.append((user.id, user.email))
            for group in trans.sa_session.query(model.Group) \
                                         .filter(model.Group.table.c.deleted == false()) \
                                         .order_by(model.Group.table.c.name):
                out_groups.append((group.id, group.name))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role_create.mako',
                                   name=name,
                                   description=description,
                                   in_users=in_users,
                                   out_users=out_users,
                                   in_groups=in_groups,
                                   out_groups=out_groups,
                                   create_group_for_role_checked=create_group_for_role_checked,
                                   message=message,
                                   status=status)

    @web.expose
    @web.require_admin
    def rename_role(self, trans, **kwd):
        """Rename a role and/or change its description, or render the rename form.

        A submitted rename is rejected when the new name is empty or already
        belongs to a different role. A valid submission redirects to the role
        grid even when nothing actually changed.
        """
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        encoded_id = params.get('id', None)
        if not encoded_id:
            message = "No role ids received for renaming"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=message,
                                                     status='error'))
        role = get_role(trans, encoded_id)
        if params.get('rename_role_button', False):
            old_name = role.name
            new_name = util.restore_text(params.name)
            new_description = util.restore_text(params.description)
            if not new_name:
                message = 'Enter a valid name'
                status = 'error'
            else:
                existing_role = trans.sa_session.query(trans.app.model.Role) \
                                                .filter(trans.app.model.Role.table.c.name == new_name) \
                                                .first()
                if existing_role and existing_role.id != role.id:
                    message = 'A role with that name already exists'
                    status = 'error'
                else:
                    if not (role.name == new_name and role.description == new_description):
                        role.name = new_name
                        role.description = new_description
                        trans.sa_session.add(role)
                        trans.sa_session.flush()
                        message = "Role '{}' has been renamed to '{}'".format(old_name, new_name)
                    return trans.response.send_redirect(web.url_for(controller='admin',
                                                                    action='roles',
                                                                    message=util.sanitize_text(message),
                                                                    status='done'))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role_rename.mako',
                                   role=role,
                                   message=message,
                                   status=status)

    @web.expose
    @web.require_admin
    def manage_users_and_groups_for_role(self, trans, **kwd):
        """Update a role's users and groups, or render its membership page.

        On ``role_members_edit_button`` the role's associations are replaced
        through the security agent. In the 'galaxy' webapp the default
        user/history permissions of users dropped from the role are deleted
        first. Otherwise the page is rendered with users and groups split into
        associated ("in") and available ("out") lists.
        """
        model = trans.app.model
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        encoded_id = params.get('id', None)
        if not encoded_id:
            message = "No role ids received for managing users and groups"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=message,
                                                     status='error'))
        role = get_role(trans, encoded_id)
        if params.get('role_members_edit_button', False):
            in_users = [trans.sa_session.query(model.User).get(x) for x in util.listify(params.in_users)]
            if trans.webapp.name == 'galaxy':
                for ura in role.users:
                    user = trans.sa_session.query(model.User).get(ura.user_id)
                    if user not in in_users:
                        # Delete DefaultUserPermissions for previously associated users that have been removed from the role
                        for dup in user.default_permissions:
                            if role == dup.role:
                                trans.sa_session.delete(dup)
                        # Delete DefaultHistoryPermissions for previously associated users that have been removed from the role
                        for history in user.histories:
                            for dhp in history.default_permissions:
                                if role == dhp.role:
                                    trans.sa_session.delete(dhp)
                        trans.sa_session.flush()
            in_groups = [trans.sa_session.query(model.Group).get(x) for x in util.listify(params.in_groups)]
            trans.app.security_agent.set_entity_role_associations(roles=[role], users=in_users, groups=in_groups)
            trans.sa_session.refresh(role)
            message = "Role '%s' has been updated with %d associated users and %d associated groups" % (role.name, len(in_users), len(in_groups))
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=util.sanitize_text(message),
                                                     status=status))
        in_users = []
        out_users = []
        in_groups = []
        out_groups = []
        # Build the membership lists once instead of once per candidate row.
        role_users = [x.user for x in role.users]
        for user in trans.sa_session.query(model.User) \
                                    .filter(model.User.table.c.deleted == false()) \
                                    .order_by(model.User.table.c.email):
            if user in role_users:
                in_users.append((user.id, user.email))
            else:
                out_users.append((user.id, user.email))
        role_groups = [x.group for x in role.groups]
        for group in trans.sa_session.query(model.Group) \
                                     .filter(model.Group.table.c.deleted == false()) \
                                     .order_by(model.Group.table.c.name):
            if group in role_groups:
                in_groups.append((group.id, group.name))
            else:
                out_groups.append((group.id, group.name))
        library_dataset_actions = {}
        if trans.webapp.name == 'galaxy' and len(role.dataset_actions) < 25:
            # Build a mapping { Library: { "folder / path dataset name": [ action, action ] } }
            # from the DatasetPermissions associated with the Role.
            for dp in role.dataset_actions:
                for ldda in trans.sa_session.query(model.LibraryDatasetDatasetAssociation) \
                                            .filter(model.LibraryDatasetDatasetAssociation.dataset_id == dp.dataset_id):
                    # Walk up to the root folder, prepending each folder name.
                    folder = ldda.library_dataset.folder
                    folder_path = ''
                    while True:
                        folder_path = '{} / {}'.format(folder.name, folder_path)
                        if not folder.parent:
                            break
                        folder = folder.parent
                    folder_path = '{} {}'.format(folder_path, ldda.name)
                    library = trans.sa_session.query(model.Library) \
                                              .filter(model.Library.table.c.root_folder_id == folder.id) \
                                              .first()
                    library_dataset_actions.setdefault(library, {}) \
                                           .setdefault(folder_path, []) \
                                           .append(dp.action)
        else:
            # NOTE(review): this branch also runs whenever the webapp is not
            # 'galaxy', replacing any incoming message -- confirm that is intended.
            message = "Not showing associated datasets, there are too many."
            status = 'info'
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/role/role.mako',
                                   role=role,
                                   in_users=in_users,
                                   out_users=out_users,
                                   in_groups=in_groups,
                                   out_groups=out_groups,
                                   library_dataset_actions=library_dataset_actions,
                                   message=message,
                                   status=status)

    @web.expose
    @web.require_admin
    def mark_role_deleted(self, trans, **kwd):
        """Flag each role named in ``id`` as deleted, then redirect to the role grid."""
        encoded_ids = kwd.get('id', None)
        if not encoded_ids:
            message = "No role ids received for deleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(encoded_ids)
        message = "Deleted %d roles: " % len(ids)
        for role_id in ids:
            role = get_role(trans, role_id)
            role.deleted = True
            trans.sa_session.add(role)
            trans.sa_session.flush()
            message += " %s " % role.name
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='roles',
                                                 message=util.sanitize_text(message),
                                                 status='done'))

    @web.expose
    @web.require_admin
    def undelete_role(self, trans, **kwd):
        """Clear the deleted flag on each role named in ``id``.

        A role that is not currently deleted triggers an error redirect.
        """
        encoded_ids = kwd.get('id', None)
        if not encoded_ids:
            message = "No role ids received for undeleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(encoded_ids)
        count = 0
        undeleted_roles = ""
        for role_id in ids:
            role = get_role(trans, role_id)
            if not role.deleted:
                message = "Role '%s' has not been deleted, so it cannot be undeleted." % role.name
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='roles',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            role.deleted = False
            trans.sa_session.add(role)
            trans.sa_session.flush()
            count += 1
            undeleted_roles += " %s" % role.name
        message = "Undeleted %d roles: %s" % (count, undeleted_roles)
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='roles',
                                                 message=util.sanitize_text(message),
                                                 status='done'))

    @web.expose
    @web.require_admin
    def purge_role(self, trans, **kwd):
        """Purge each previously deleted role named in ``id``.

        The role row itself is kept; its associations are removed.
        """
        # This method should only be called for a Role that has previously been deleted.
        # Purging a deleted Role deletes all of the following from the database:
        # - UserRoleAssociations where role_id == Role.id
        # - DefaultUserPermissions where role_id == Role.id
        # - DefaultHistoryPermissions where role_id == Role.id
        # - GroupRoleAssociations where role_id == Role.id
        # - DatasetPermissionss where role_id == Role.id
        encoded_ids = kwd.get('id', None)
        if not encoded_ids:
            message = "No role ids received for purging"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='roles',
                                                     message=util.sanitize_text(message),
                                                     status='error'))
        ids = util.listify(encoded_ids)
        message = "Purged %d roles: " % len(ids)
        for role_id in ids:
            role = get_role(trans, role_id)
            if not role.deleted:
                message = "Role '%s' has not been deleted, so it cannot be purged." % role.name
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='roles',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            # Delete UserRoleAssociations
            for ura in role.users:
                user = trans.sa_session.query(trans.app.model.User).get(ura.user_id)
                # Delete DefaultUserPermissions for associated users
                for dup in user.default_permissions:
                    if role == dup.role:
                        trans.sa_session.delete(dup)
                # Delete DefaultHistoryPermissions for associated users
                for history in user.histories:
                    for dhp in history.default_permissions:
                        if role == dhp.role:
                            trans.sa_session.delete(dhp)
                trans.sa_session.delete(ura)
            # Delete GroupRoleAssociations
            for gra in role.groups:
                trans.sa_session.delete(gra)
            # Delete DatasetPermissionss
            for dp in role.dataset_actions:
                trans.sa_session.delete(dp)
            trans.sa_session.flush()
            message += " %s " % role.name
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='roles',
                                                 message=util.sanitize_text(message),
                                                 status='done'))

    @web.expose
    @web.require_admin
    def groups(self, trans, **kwargs):
        """Dispatch a group grid operation, or render the group grid.

        The ``operation`` keyword is lower-cased and has ``+`` replaced by a
        space before it is matched. An absent or unmatched operation falls
        through to ``self.group_list_grid``.
        """
        if 'operation' in kwargs:
            operation = kwargs['operation'].lower().replace('+', ' ')
            handler_names = {
                "groups": "group",
                "create": "create_group",
                "delete": "mark_group_deleted",
                "undelete": "undelete_group",
                "purge": "purge_group",
                "manage users and roles": "manage_users_and_roles_for_group",
                "rename": "rename_group",
            }
            handler_name = handler_names.get(operation)
            if handler_name is not None:
                # Looked up lazily so only the requested handler has to exist.
                return getattr(self, handler_name)(trans, **kwargs)
        # Render the list view
        return self.group_list_grid(trans, **kwargs)

    @web.expose
    @web.require_admin
    def rename_group(self, trans, **kwd):
        """Rename a group, or render the rename form.

        A submitted rename is rejected when the new name is empty or already
        belongs to a different group. A valid submission redirects to the
        group grid even when the name did not change.
        """
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        encoded_id = params.get('id', None)
        if not encoded_id:
            message = "No group ids received for renaming"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=message,
                                                     status='error'))
        group = get_group(trans, encoded_id)
        if params.get('rename_group_button', False):
            old_name = group.name
            new_name = util.restore_text(params.name)
            if not new_name:
                message = 'Enter a valid name'
                status = 'error'
            else:
                existing_group = trans.sa_session.query(trans.app.model.Group) \
                                                 .filter(trans.app.model.Group.table.c.name == new_name) \
                                                 .first()
                if existing_group and existing_group.id != group.id:
                    message = 'A group with that name already exists'
                    status = 'error'
                else:
                    if group.name != new_name:
                        group.name = new_name
                        trans.sa_session.add(group)
                        trans.sa_session.flush()
                        message = "Group '{}' has been renamed to '{}'".format(old_name, new_name)
                    return trans.response.send_redirect(web.url_for(controller='admin',
                                                                    action='groups',
                                                                    message=util.sanitize_text(message),
                                                                    status='done'))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group_rename.mako',
                                   group=group,
                                   message=message,
                                   status=status)

    @web.expose
    @web.require_admin
    def manage_users_and_roles_for_group(self, trans, **kwd):
        """Update a group's roles and users, or render its membership page.

        On ``group_roles_users_edit_button`` the group's associations are
        replaced through the security agent. Otherwise the page is rendered
        with roles and users split into associated ("in") and available
        ("out") lists.
        """
        model = trans.app.model
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        encoded_id = params.get('id', None)
        if not encoded_id:
            # Same guard as the other id-driven actions, so a missing id is
            # reported instead of being passed on to be decoded.
            message = "No group ids received for managing users and roles"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=message,
                                                     status='error'))
        group = get_group(trans, encoded_id)
        if params.get('group_roles_users_edit_button', False):
            in_roles = [trans.sa_session.query(model.Role).get(x) for x in util.listify(params.in_roles)]
            in_users = [trans.sa_session.query(model.User).get(x) for x in util.listify(params.in_users)]
            trans.app.security_agent.set_entity_group_associations(groups=[group], roles=in_roles, users=in_users)
            trans.sa_session.refresh(group)
            message += "Group '%s' has been updated with %d associated roles and %d associated users" % (group.name, len(in_roles), len(in_users))
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=util.sanitize_text(message),
                                                     status=status))
        in_roles = []
        out_roles = []
        in_users = []
        out_users = []
        # Build the membership lists once instead of once per candidate row.
        group_roles = [x.role for x in group.roles]
        for role in trans.sa_session.query(model.Role) \
                                    .filter(model.Role.table.c.deleted == false()) \
                                    .order_by(model.Role.table.c.name):
            if role in group_roles:
                in_roles.append((role.id, role.name))
            else:
                out_roles.append((role.id, role.name))
        group_users = [x.user for x in group.users]
        for user in trans.sa_session.query(model.User) \
                                    .filter(model.User.table.c.deleted == false()) \
                                    .order_by(model.User.table.c.email):
            if user in group_users:
                in_users.append((user.id, user.email))
            else:
                out_users.append((user.id, user.email))
        message += 'Group %s is currently associated with %d roles and %d users' % (group.name, len(in_roles), len(in_users))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group.mako',
                                   group=group,
                                   in_roles=in_roles,
                                   out_roles=out_roles,
                                   in_users=in_users,
                                   out_users=out_users,
                                   message=message,
                                   status=status)

    @web.expose
    @web.require_admin
    def create_group(self, trans, **kwd):
        """Create a group (optionally with a same-named role) or render the form.

        When ``create_group_button`` is submitted with a non-empty, unused
        name, the group and its user/role associations are created and the
        request is redirected to the group grid. Otherwise the creation form
        is rendered; on a validation error the lists of available users and
        roles are not repopulated.
        """
        model = trans.app.model
        params = util.Params(kwd)
        message = util.restore_text(params.get('message', ''))
        status = params.get('status', 'done')
        name = util.restore_text(params.get('name', ''))
        in_users = util.listify(params.get('in_users', []))
        out_users = util.listify(params.get('out_users', []))
        in_roles = util.listify(params.get('in_roles', []))
        out_roles = util.listify(params.get('out_roles', []))
        create_role_for_group = params.get('create_role_for_group', '')
        create_role_for_group_checked = CheckboxField.is_checked(create_role_for_group)
        ok = True
        if params.get('create_group_button', False):
            if not name:
                message = "Enter a valid name."
                status = 'error'
                ok = False
            elif trans.sa_session.query(model.Group).filter(model.Group.table.c.name == name).first():
                message = "Group names must be unique and a group with that name already exists, so choose another name."
                status = 'error'
                ok = False
            else:
                # Create the group
                group = model.Group(name=name)
                trans.sa_session.add(group)
                trans.sa_session.flush()
                # Create the UserGroupAssociations
                for user in [trans.sa_session.query(model.User).get(x) for x in in_users]:
                    trans.sa_session.add(model.UserGroupAssociation(user, group))
                # Create the GroupRoleAssociations
                for role in [trans.sa_session.query(model.Role).get(x) for x in in_roles]:
                    trans.sa_session.add(model.GroupRoleAssociation(group, role))
                num_in_roles = len(in_roles)
                if create_role_for_group_checked:
                    # Create the role
                    role = model.Role(name=name, description='Role for group %s' % name)
                    trans.sa_session.add(role)
                    # Associate the role with the group (same model namespace
                    # as every other lookup in this method).
                    trans.sa_session.add(model.GroupRoleAssociation(group, role))
                    num_in_roles += 1
                trans.sa_session.flush()
                message = "Group '%s' has been created with %d associated users and %d associated roles. " \
                    % (group.name, len(in_users), num_in_roles)
                if create_role_for_group_checked:
                    message += 'One of the roles associated with this group is the newly created role with the same name.'
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='groups',
                                                         message=util.sanitize_text(message),
                                                         status='done'))
        if ok:
            for user in trans.sa_session.query(model.User) \
                                        .filter(model.User.table.c.deleted == false()) \
                                        .order_by(model.User.table.c.email):
                out_users.append((user.id, user.email))
            for role in trans.sa_session.query(model.Role) \
                                        .filter(model.Role.table.c.deleted == false()) \
                                        .order_by(model.Role.table.c.name):
                out_roles.append((role.id, role.name))
        return trans.fill_template('/webapps/tool_shed/admin/dataset_security/group/group_create.mako',
                                   name=name,
                                   in_users=in_users,
                                   out_users=out_users,
                                   in_roles=in_roles,
                                   out_roles=out_roles,
                                   create_role_for_group_checked=create_role_for_group_checked,
                                   message=message,
                                   status=status)

    @web.expose
    @web.require_admin
    def mark_group_deleted(self, trans, **kwd):
        """Flag each group named in ``id`` as deleted, then redirect to the group grid."""
        params = util.Params(kwd)
        encoded_ids = params.get('id', None)
        if not encoded_ids:
            message = "No group ids received for marking deleted"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(encoded_ids)
        message = "Deleted %d groups: " % len(ids)
        for group_id in ids:
            group = get_group(trans, group_id)
            group.deleted = True
            trans.sa_session.add(group)
            trans.sa_session.flush()
            message += " %s " % group.name
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='groups',
                                                 message=util.sanitize_text(message),
                                                 status='done'))

    @web.expose
    @web.require_admin
    def undelete_group(self, trans, **kwd):
        """Clear the deleted flag on each group named in ``id``.

        A group that is not currently deleted triggers an error redirect.
        """
        encoded_ids = kwd.get('id', None)
        if not encoded_ids:
            message = "No group ids received for undeleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(encoded_ids)
        count = 0
        undeleted_groups = ""
        for group_id in ids:
            group = get_group(trans, group_id)
            if not group.deleted:
                message = "Group '%s' has not been deleted, so it cannot be undeleted." % group.name
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='groups',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            group.deleted = False
            trans.sa_session.add(group)
            trans.sa_session.flush()
            count += 1
            undeleted_groups += " %s" % group.name
        message = "Undeleted %d groups: %s" % (count, undeleted_groups)
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='groups',
                                                 message=util.sanitize_text(message),
                                                 status='done'))

    @web.expose
    @web.require_admin
    def purge_group(self, trans, **kwd):
        """Purge each previously deleted group named in ``id``.

        The group row itself is kept; its associations are removed.
        """
        # This method should only be called for a Group that has previously been deleted.
        # Purging a deleted Group simply deletes all UserGroupAssociations and GroupRoleAssociations.
        encoded_ids = kwd.get('id', None)
        if not encoded_ids:
            message = "No group ids received for purging"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='groups',
                                                     message=util.sanitize_text(message),
                                                     status='error'))
        ids = util.listify(encoded_ids)
        message = "Purged %d groups: " % len(ids)
        for group_id in ids:
            group = get_group(trans, group_id)
            if not group.deleted:
                # We should never reach here, but just in case there is a bug somewhere...
                message = "Group '%s' has not been deleted, so it cannot be purged." % group.name
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='groups',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            # Delete UserGroupAssociations
            for uga in group.users:
                trans.sa_session.delete(uga)
            # Delete GroupRoleAssociations
            for gra in group.roles:
                trans.sa_session.delete(gra)
            trans.sa_session.flush()
            message += " %s " % group.name
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='groups',
                                                 message=util.sanitize_text(message),
                                                 status='done'))

    @web.expose
    @web.require_admin
    def create_new_user(self, trans, **kwd):
        """Redirect to the ``user`` controller's ``create`` action on behalf of the admin."""
        return trans.response.send_redirect(web.url_for(controller='user',
                                                        action='create',
                                                        cntrller='admin'))

    @web.expose
    @web.require_admin
    def reset_user_password(self, trans, **kwd):
        """Reset the password of each user named in ``id``, or render the form.

        On ``reset_user_password_button`` the submitted password/confirmation
        pair is validated for each user in turn. The first validation message
        stops the loop and is reported with status 'error'; users handled
        before that point keep their new password.
        """
        user_id = kwd.get('id', None)
        if not user_id:
            message = "No users received for resetting passwords."
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=message,
                                                     status='error'))
        user_ids = util.listify(user_id)
        if 'reset_user_password_button' in kwd:
            message = ''
            status = ''
            for encoded_id in user_ids:
                user = get_user(trans, encoded_id)
                password = kwd.get('password', None)
                confirm = kwd.get('confirm', None)
                message = validate_password(trans, password, confirm)
                if message:
                    status = 'error'
                    break
                user.set_password_cleartext(password)
                trans.sa_session.add(user)
                trans.sa_session.flush()
            if not message and not status:
                message = "Passwords reset for %d %s." % (len(user_ids), inflector.cond_plural(len(user_ids), 'user'))
                status = 'done'
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=util.sanitize_text(message),
                                                     status=status))
        users = [get_user(trans, encoded_id) for encoded_id in user_ids]
        if len(user_ids) > 1:
            # The form carries all ids back as one comma-separated value.
            user_id = ','.join(user_ids)
        return trans.fill_template('/webapps/tool_shed/admin/user/reset_password.mako',
                                   id=user_id,
                                   users=users,
                                   password='',
                                   confirm='')

    @web.expose
    @web.require_admin
    def mark_user_deleted(self, trans, **kwd):
        """Flag each user named in ``id`` as deleted, optionally redacting identity.

        Depending on ``self.app.config.redact_username_during_deletion`` and
        ``redact_email_during_deletion``, the username/email are replaced by a
        hash (salted with the current time) on the user and inside the name
        and description of each of the user's roles. The status message
        reports ``user.email`` as it stands after redaction.
        """
        encoded_ids = kwd.get('id', None)
        if not encoded_ids:
            message = "No user ids received for deleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(encoded_ids)
        message = "Deleted %d users: " % len(ids)
        for user_id in ids:
            user = get_user(trans, user_id)
            user.deleted = True
            compliance_log.info('delete-user-event: %s', user_id)
            # See lib/galaxy/webapps/tool_shed/controllers/admin.py
            redact_username = self.app.config.redact_username_during_deletion
            redact_email = self.app.config.redact_email_during_deletion
            pseudorandom_value = str(int(time.time()))
            email_hash = new_secure_hash(user.email + pseudorandom_value)
            uname_hash = new_secure_hash(user.username + pseudorandom_value)
            for role in user.all_roles():
                # Diagnostic output goes to the module logger, not stdout.
                log.debug("Redacting role %s (redact_username=%s, redact_email=%s)",
                          role, redact_username, redact_email)
                if redact_username:
                    role.name = role.name.replace(user.username, uname_hash)
                    # A role without a description has nothing to redact.
                    if role.description is not None:
                        role.description = role.description.replace(user.username, uname_hash)
                if redact_email:
                    role.name = role.name.replace(user.email, email_hash)
                    if role.description is not None:
                        role.description = role.description.replace(user.email, email_hash)
            # Redact the user only after the roles: the role text is matched
            # against the still-unredacted username and email.
            if redact_email:
                user.email = email_hash
            if redact_username:
                user.username = uname_hash
            trans.sa_session.add(user)
            trans.sa_session.flush()
            message += " %s " % user.email
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='users',
                                                 message=util.sanitize_text(message),
                                                 status='done'))

    @web.expose
    @web.require_admin
    def undelete_user(self, trans, **kwd):
        """Clear the deleted flag on each user named in ``id``.

        A user that is not currently deleted triggers an error redirect.
        """
        encoded_ids = kwd.get('id', None)
        if not encoded_ids:
            message = "No user ids received for undeleting"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=message,
                                                     status='error'))
        ids = util.listify(encoded_ids)
        count = 0
        undeleted_users = ""
        for user_id in ids:
            user = get_user(trans, user_id)
            if not user.deleted:
                message = "User '%s' has not been deleted, so it cannot be undeleted." % user.email
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='users',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            user.deleted = False
            trans.sa_session.add(user)
            trans.sa_session.flush()
            count += 1
            undeleted_users += " %s" % user.email
        message = "Undeleted %d users: %s" % (count, undeleted_users)
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='users',
                                                 message=util.sanitize_text(message),
                                                 status='done'))

    @web.expose
    @web.require_admin
    def purge_user(self, trans, **kwd):
        """Purge each previously deleted user named in ``id``.

        The user row is kept and flagged ``purged``; active histories, their
        active datasets and the underlying datasets are flagged deleted, and
        group, non-private role and address associations are removed.
        """
        # This method should only be called for a User that has previously been deleted.
        # We keep the User in the database ( marked as purged ), and stuff associated
        # with the user's private role in case we want the ability to unpurge the user
        # some time in the future.
        # Purging a deleted User deletes all of the following:
        # - History where user_id = User.id
        # - HistoryDatasetAssociation where history_id = History.id
        # - Dataset where HistoryDatasetAssociation.dataset_id = Dataset.id
        # - UserGroupAssociation where user_id == User.id
        # - UserRoleAssociation where user_id == User.id EXCEPT FOR THE PRIVATE ROLE
        # - UserAddress where user_id == User.id
        # Purging Histories and Datasets must be handled via the cleanup_datasets.py script
        encoded_ids = kwd.get('id', None)
        if not encoded_ids:
            message = "No user ids received for purging"
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=util.sanitize_text(message),
                                                     status='error'))
        ids = util.listify(encoded_ids)
        message = "Purged %d users: " % len(ids)
        for user_id in ids:
            user = get_user(trans, user_id)
            if not user.deleted:
                # We should never reach here, but just in case there is a bug somewhere...
                message = "User '%s' has not been deleted, so it cannot be purged." % user.email
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='users',
                                                         message=util.sanitize_text(message),
                                                         status='error'))
            private_role = trans.app.security_agent.get_private_user_role(user)
            # Delete History
            for h in user.active_histories:
                trans.sa_session.refresh(h)
                for hda in h.active_datasets:
                    # Delete HistoryDatasetAssociation
                    d = trans.sa_session.query(trans.app.model.Dataset).get(hda.dataset_id)
                    # Delete Dataset
                    if not d.deleted:
                        d.deleted = True
                        trans.sa_session.add(d)
                    hda.deleted = True
                    trans.sa_session.add(hda)
                h.deleted = True
                trans.sa_session.add(h)
            # Delete UserGroupAssociations
            for uga in user.groups:
                trans.sa_session.delete(uga)
            # Delete UserRoleAssociations EXCEPT FOR THE PRIVATE ROLE
            for ura in user.roles:
                if ura.role_id != private_role.id:
                    trans.sa_session.delete(ura)
            # Delete UserAddresses
            for address in user.addresses:
                trans.sa_session.delete(address)
            # Purge the user
            user.purged = True
            trans.sa_session.add(user)
            trans.sa_session.flush()
            message += "%s " % user.email
        trans.response.send_redirect(web.url_for(controller='admin',
                                                 action='users',
                                                 message=util.sanitize_text(message),
                                                 status='done'))

    @web.expose
    @web.require_admin
    def users(self, trans, **kwd):
        """Dispatch a user grid operation, or render the user grid.

        The ``operation`` keyword is lower-cased before it is matched. When no
        handler matches, the delete/undelete/purge grid operations are added
        to ``self.user_list_grid`` (if the configuration allows user deletion
        and they are not already present) and the grid is rendered.
        """
        if 'operation' in kwd:
            operation = kwd['operation'].lower()
            handler_names = {
                "roles": "user",
                "reset password": "reset_user_password",
                "delete": "mark_user_deleted",
                "undelete": "undelete_user",
                "purge": "purge_user",
                "create": "create_new_user",
                "manage roles and groups": "manage_roles_and_groups_for_user",
            }
            handler_name = handler_names.get(operation)
            if handler_name is not None:
                # Looked up lazily so only the requested handler has to exist.
                return getattr(self, handler_name)(trans, **kwd)
        if trans.app.config.allow_user_deletion:
            grid_operations = self.user_list_grid.operations
            for grid_operation in (self.delete_operation, self.undelete_operation, self.purge_operation):
                if grid_operation not in grid_operations:
                    grid_operations.append(grid_operation)
        # Render the list view
        return self.user_list_grid(trans, **kwd)

    @web.expose
    @web.require_admin
    def name_autocomplete_data(self, trans, q=None, limit=None, timestamp=None):
        """Return autocomplete data for user emails.

        Returns the emails of non-deleted users whose lower-cased email starts
        with the lower-cased ``q``, one per line, each followed by a newline.
        Returns an empty string when ``q`` is ``None``.
        NOTE(review): ``limit`` and ``timestamp`` are accepted but unused, and
        LIKE wildcards inside ``q`` are not escaped.
        """
        if q is None:
            # Nothing to match against; the default used to crash on ``None.lower()``.
            return ""
        user_model = trans.app.model.User
        prefix = q.lower() + "%"
        matches = trans.sa_session.query(user_model) \
                                  .filter_by(deleted=False) \
                                  .filter(func.lower(user_model.email).like(prefix))
        return "".join(user.email + "\n" for user in matches)

    @web.expose
    @web.require_admin
    def manage_roles_and_groups_for_user(self, trans, **kwd):
        """Update a user's roles and groups, or render the user's membership page.

        On ``user_roles_groups_edit_button`` with a non-empty ``in_roles`` the
        user's associations are replaced through the security agent and the
        request is redirected to the user grid. Otherwise the page is rendered
        with roles and groups split into associated ("in") and available
        ("out") lists; private roles of other users are never offered.
        """
        model = trans.app.model
        user_id = kwd.get('id', None)
        message = ''
        status = ''
        if not user_id:
            message += "Invalid user id (%s) received" % str(user_id)
            trans.response.send_redirect(web.url_for(controller='admin',
                                                     action='users',
                                                     message=util.sanitize_text(message),
                                                     status='error'))
        user = get_user(trans, user_id)
        private_role = trans.app.security_agent.get_private_user_role(user)
        if kwd.get('user_roles_groups_edit_button', False):
            # Make sure the user is not dis-associating himself from his private role
            out_roles = kwd.get('out_roles', [])
            if out_roles:
                out_roles = [trans.sa_session.query(model.Role).get(x) for x in util.listify(out_roles)]
            if private_role in out_roles:
                # NOTE(review): the update below still runs and the redirect
                # reports 'done', so this error status is only visible when
                # in_roles is empty -- confirm the intended behaviour.
                message += "You cannot eliminate a user's private role association. "
                status = 'error'
            in_roles = kwd.get('in_roles', [])
            if in_roles:
                in_roles = [trans.sa_session.query(model.Role).get(x) for x in util.listify(in_roles)]
            out_groups = kwd.get('out_groups', [])
            if out_groups:
                out_groups = [trans.sa_session.query(model.Group).get(x) for x in util.listify(out_groups)]
            in_groups = kwd.get('in_groups', [])
            if in_groups:
                in_groups = [trans.sa_session.query(model.Group).get(x) for x in util.listify(in_groups)]
            if in_roles:
                trans.app.security_agent.set_entity_user_associations(users=[user], roles=in_roles, groups=in_groups)
                trans.sa_session.refresh(user)
                message += "User '%s' has been updated with %d associated roles and %d associated groups (private roles are not displayed)" % \
                    (user.email, len(in_roles), len(in_groups))
                trans.response.send_redirect(web.url_for(controller='admin',
                                                         action='users',
                                                         message=util.sanitize_text(message),
                                                         status='done'))
        in_roles = []
        out_roles = []
        in_groups = []
        out_groups = []
        # Build the membership lists once instead of once per candidate row.
        user_roles = [x.role for x in user.roles]
        for role in trans.sa_session.query(model.Role) \
                                    .filter(model.Role.table.c.deleted == false()) \
                                    .order_by(model.Role.table.c.name):
            if role in user_roles:
                in_roles.append((role.id, role.name))
            elif role.type != model.Role.types.PRIVATE:
                # There is a 1 to 1 mapping between a user and a PRIVATE role, so private roles should
                # not be listed in the roles form fields, except for the currently selected user's private
                # role, which should always be in in_roles. The check above is added as an additional
                # precaution, since for a period of time we were including private roles in the form fields.
                out_roles.append((role.id, role.name))
        user_groups = [x.group for x in user.groups]
        for group in trans.sa_session.query(model.Group) \
                                     .filter(model.Group.table.c.deleted == false()) \
                                     .order_by(model.Group.table.c.name):
            if group in user_groups:
                in_groups.append((group.id, group.name))
            else:
                out_groups.append((group.id, group.name))
        message += "User '%s' is currently associated with %d roles and is a member of %d groups" % \
            (user.email, len(in_roles), len(in_groups))
        if not status:
            status = 'done'
        return trans.fill_template('/webapps/tool_shed/admin/user/user.mako',
                                   user=user,
                                   in_roles=in_roles,
                                   out_roles=out_roles,
                                   in_groups=in_groups,
                                   out_groups=out_groups,
                                   message=message,
                                   status=status)
# ---- Utility methods -------------------------------------------------------
def get_user(trans, user_id):
    """Fetch the User identified by the encoded ``user_id``.

    The id is decoded with ``trans.security.decode_id`` and looked up through
    ``trans.sa_session``. When no row is found, the result of
    ``trans.show_error_message`` is returned in place of a User, with a
    message quoting the id as it was received.
    """
    decoded_id = trans.security.decode_id(user_id)
    found = trans.sa_session.query(trans.model.User).get(decoded_id)
    if found:
        return found
    return trans.show_error_message("User not found for id (%s)" % str(user_id))
def get_role(trans, id):
    """Get a Role from the database by its encoded id.

    The id is decoded with ``trans.security.decode_id`` and looked up through
    ``trans.sa_session``. When no row is found, the result of
    ``trans.show_error_message`` is returned in place of a Role.

    The error message quotes the id exactly as it was received, matching
    ``get_user``, rather than the decoded value.
    """
    # Load role from database. The parameter is deliberately not rebound, so
    # the received id stays available for the error message.
    decoded_id = trans.security.decode_id(id)
    role = trans.sa_session.query(trans.model.Role).get(decoded_id)
    if not role:
        return trans.show_error_message("Role not found for id (%s)" % str(id))
    return role
def get_group(trans, id):
    """Get a Group from the database by its encoded id.

    The id is decoded with ``trans.security.decode_id`` and looked up through
    ``trans.sa_session``. When no row is found, the result of
    ``trans.show_error_message`` is returned in place of a Group.

    The error message quotes the id exactly as it was received, matching
    ``get_user``, rather than the decoded value.
    """
    # Load group from database. The parameter is deliberately not rebound, so
    # the received id stays available for the error message.
    decoded_id = trans.security.decode_id(id)
    group = trans.sa_session.query(trans.model.Group).get(decoded_id)
    if not group:
        return trans.show_error_message("Group not found for id (%s)" % str(id))
    return group