Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.util.admin_util
import logging
import time
from typing import Optional
from sqlalchemy import (
false,
func,
select,
)
from galaxy import (
util,
web,
)
from galaxy.model import (
Library,
LibraryDatasetDatasetAssociation,
)
from galaxy.model.base import transaction
from galaxy.security.validate_user_input import validate_password
from galaxy.util import inflector
from galaxy.util.hash_util import new_secure_hash_v2
from galaxy.web.form_builder import CheckboxField
from galaxy.web.legacy_framework.grids import (
Grid,
GridOperation,
)
from tool_shed.util.web_util import escape
log = logging.getLogger(__name__)
compliance_log = logging.getLogger("COMPLIANCE")
[docs]class Admin:
# Admin controller logic for managing users, roles and groups.
# The attributes below default to None and are expected to be overridden
# before the corresponding list views are used.
# Override these
# Grids that users(), roles() and groups() call to render their list views.
user_list_grid: Optional[Grid] = None
role_list_grid: Optional[Grid] = None
group_list_grid: Optional[Grid] = None
# Operations that users() appends to user_list_grid.operations when
# trans.app.config.allow_user_deletion is set.
delete_operation: Optional[GridOperation] = None
undelete_operation: Optional[GridOperation] = None
purge_operation: Optional[GridOperation] = None
@web.expose
@web.require_admin
def index(self, trans, **kwd):
    """Render the Tool Shed admin index page.

    The optional ``message`` request parameter is escaped before rendering;
    ``status`` defaults to ``"done"``.
    """
    template_args = {
        "message": escape(kwd.get("message", "")),
        "status": kwd.get("status", "done"),
    }
    return trans.fill_template("/webapps/tool_shed/admin/index.mako", **template_args)
@web.expose
@web.require_admin
def center(self, trans, **kwd):
    """Render the Tool Shed admin center panel.

    Passes the escaped ``message`` parameter (default empty) and the ``status``
    parameter (default ``"done"``) through to the template.
    """
    raw_message = kwd.get("message", "")
    return trans.fill_template(
        "/webapps/tool_shed/admin/center.mako",
        message=escape(raw_message),
        status=kwd.get("status", "done"),
    )
@web.expose
@web.require_admin
def roles(self, trans, **kwargs):
    """Dispatch a role grid operation, or render the role list grid.

    The ``operation`` parameter is lower-cased and has ``+`` replaced by a
    space before being matched. Unrecognised or missing operations fall
    through to ``self.role_list_grid``.
    """
    # Operation label -> name of the handling method. Names are resolved with
    # getattr only when the operation is requested, so a handler that does not
    # exist is only an error when its operation is actually used.
    handler_names = {
        "roles": "role",
        "create": "create_role",
        "delete": "mark_role_deleted",
        "undelete": "undelete_role",
        "purge": "purge_role",
        "manage users and groups": "manage_users_and_groups_for_role",
        # This is currently used only in the Tool Shed.
        "manage role associations": "manage_role_associations",
        "rename": "rename_role",
    }
    if "operation" in kwargs:
        requested = kwargs["operation"].lower().replace("+", " ")
        handler_name = handler_names.get(requested)
        if handler_name is not None:
            return getattr(self, handler_name)(trans, **kwargs)
    # Render the list view
    return self.role_list_grid(trans, **kwargs)
@web.expose
@web.require_admin
def create_role(self, trans, **kwd):
    """Create an ADMIN-type role, optionally together with a same-named group.

    When ``create_role_button`` is submitted, the name and description are
    validated (both required, name must not already be in use). On success the
    role and its user/group associations are added and committed, and the
    request is redirected to the roles list. Otherwise the creation form is
    rendered; on a validation error the candidate user/group lists are not
    repopulated.
    """
    params = util.Params(kwd)
    message = util.restore_text(params.get("message", ""))
    status = params.get("status", "done")
    name = util.restore_text(params.get("name", ""))
    description = util.restore_text(params.get("description", ""))
    in_users = util.listify(params.get("in_users", []))
    out_users = util.listify(params.get("out_users", []))
    in_groups = util.listify(params.get("in_groups", []))
    out_groups = util.listify(params.get("out_groups", []))
    create_group_for_role = params.get("create_group_for_role", "")
    create_group_for_role_checked = CheckboxField.is_checked(create_group_for_role)
    ok = True
    if params.get("create_role_button", False):
        if not name or not description:
            message = "Enter a valid name and a description."
            status = "error"
            ok = False
        elif get_role_id(trans.sa_session, trans.app.model.Role, name):
            message = "Role names must be unique and a role with that name already exists, so choose another name."
            status = "error"
            ok = False
        else:
            # Create the role
            role = trans.app.model.Role(name=name, description=description, type=trans.app.model.Role.types.ADMIN)
            trans.sa_session.add(role)
            # Create the UserRoleAssociations
            for user in [trans.sa_session.get(trans.app.model.User, x) for x in in_users]:
                ura = trans.app.model.UserRoleAssociation(user, role)
                trans.sa_session.add(ura)
            # Create the GroupRoleAssociations
            for group in [trans.sa_session.get(trans.app.model.Group, x) for x in in_groups]:
                gra = trans.app.model.GroupRoleAssociation(group, role)
                trans.sa_session.add(gra)
            if create_group_for_role_checked:
                # Create the group
                group = trans.app.model.Group(name=name)
                trans.sa_session.add(group)
                # Associate the group with the role. Uses trans.app.model like
                # every other model lookup in this method.
                gra = trans.app.model.GroupRoleAssociation(group, role)
                trans.sa_session.add(gra)
                num_in_groups = len(in_groups) + 1
            else:
                num_in_groups = len(in_groups)
            with transaction(trans.sa_session):
                trans.sa_session.commit()
            message = "Role '%s' has been created with %d associated users and %d associated groups. " % (
                role.name,
                len(in_users),
                num_in_groups,
            )
            if create_group_for_role_checked:
                message += (
                    "One of the groups associated with this role is the newly created group with the same name."
                )
            # Explicit return (as rename_role does) so the form is never
            # rendered after a successful creation.
            return trans.response.send_redirect(
                web.url_for(controller="admin", action="roles", message=util.sanitize_text(message), status="done")
            )
    if ok:
        # Nothing was rejected: offer every current user and group as a candidate.
        for user in get_current_users(trans.sa_session, trans.app.model.User):
            out_users.append((user.id, user.email))
        for group in get_current_groups(trans.sa_session, trans.app.model.Group):
            out_groups.append((group.id, group.name))
    return trans.fill_template(
        "/webapps/tool_shed/admin/dataset_security/role/role_create.mako",
        name=name,
        description=description,
        in_users=in_users,
        out_users=out_users,
        in_groups=in_groups,
        out_groups=out_groups,
        create_group_for_role_checked=create_group_for_role_checked,
        message=message,
        status=status,
    )
@web.expose
@web.require_admin
def rename_role(self, trans, **kwd):
    """Rename a role and/or change its description.

    Without ``rename_role_button`` the rename form is rendered. With it, the
    new name is validated: it must be non-empty and must not belong to a
    different role. On success the request is redirected to the roles list;
    on a validation error the form is rendered again with the error message.
    """
    params = util.Params(kwd)
    message = util.restore_text(params.get("message", ""))
    status = params.get("status", "done")
    role_id = params.get("id", None)
    if not role_id:
        message = "No role ids received for renaming"
        # Return explicitly so a missing id never reaches get_role().
        return trans.response.send_redirect(
            web.url_for(controller="admin", action="roles", message=message, status="error")
        )
    role = get_role(trans, role_id)
    if params.get("rename_role_button", False):
        old_name = role.name
        new_name = util.restore_text(params.name)
        new_description = util.restore_text(params.description)
        if not new_name:
            message = "Enter a valid name"
            status = "error"
        else:
            # create_role treats a falsy get_role_id() result as "no role has
            # this name", so only a truthy id that differs from this role's id
            # is a real conflict. Comparing the raw result with role.id
            # rejected every rename to an unused name.
            existing_role_id = get_role_id(trans.sa_session, trans.app.model.Role, new_name)
            if existing_role_id and existing_role_id != role.id:
                message = "A role with that name already exists"
                status = "error"
            else:
                if not (role.name == new_name and role.description == new_description):
                    role.name = new_name
                    role.description = new_description
                    trans.sa_session.add(role)
                    with transaction(trans.sa_session):
                        trans.sa_session.commit()
                    message = f"Role '{old_name}' has been renamed to '{new_name}'"
                return trans.response.send_redirect(
                    web.url_for(
                        controller="admin", action="roles", message=util.sanitize_text(message), status="done"
                    )
                )
    return trans.fill_template(
        "/webapps/tool_shed/admin/dataset_security/role/role_rename.mako", role=role, message=message, status=status
    )
@web.expose
@web.require_admin
def manage_users_and_groups_for_role(self, trans, **kwd):
    """Edit, or display, the users and groups associated with a role.

    With ``role_members_edit_button`` the submitted ``in_users``/``in_groups``
    become the role's associations and the request is redirected to the roles
    list. Otherwise the membership form is rendered, listing current users and
    groups split into associated ("in") and unassociated ("out").
    """
    params = util.Params(kwd)
    message = util.restore_text(params.get("message", ""))
    status = params.get("status", "done")
    role_id = params.get("id", None)
    if not role_id:
        message = "No role ids received for managing users and groups"
        # Return explicitly so a missing id never reaches get_role().
        return trans.response.send_redirect(
            web.url_for(controller="admin", action="roles", message=message, status="error")
        )
    role = get_role(trans, role_id)
    if params.get("role_members_edit_button", False):
        in_users = [trans.sa_session.get(trans.app.model.User, x) for x in util.listify(params.in_users)]
        if trans.webapp.name == "galaxy":
            for ura in role.users:
                user = trans.sa_session.get(trans.app.model.User, ura.user_id)
                if user not in in_users:
                    # Delete DefaultUserPermissions for previously associated users that have been removed from the role
                    for dup in user.default_permissions:
                        if role == dup.role:
                            trans.sa_session.delete(dup)
                    # Delete DefaultHistoryPermissions for previously associated users that have been removed from the role
                    for history in user.histories:
                        for dhp in history.default_permissions:
                            if role == dhp.role:
                                trans.sa_session.delete(dhp)
                    with transaction(trans.sa_session):
                        trans.sa_session.commit()
        in_groups = [trans.sa_session.get(trans.app.model.Group, x) for x in util.listify(params.in_groups)]
        trans.app.security_agent.set_entity_role_associations(roles=[role], users=in_users, groups=in_groups)
        trans.sa_session.refresh(role)
        message = "Role '%s' has been updated with %d associated users and %d associated groups" % (
            role.name,
            len(in_users),
            len(in_groups),
        )
        return trans.response.send_redirect(
            web.url_for(controller="admin", action="roles", message=util.sanitize_text(message), status=status)
        )
    in_users = []
    out_users = []
    in_groups = []
    out_groups = []
    # Build the membership lists once instead of once per candidate.
    role_users = [x.user for x in role.users]
    role_groups = [x.group for x in role.groups]
    for user in get_current_users(trans.sa_session, trans.app.model.User):
        if user in role_users:
            in_users.append((user.id, user.email))
        else:
            out_users.append((user.id, user.email))
    for group in get_current_groups(trans.sa_session, trans.app.model.Group):
        if group in role_groups:
            in_groups.append((group.id, group.name))
        else:
            out_groups.append((group.id, group.name))
    library_dataset_actions = {}
    if trans.webapp.name == "galaxy" and len(role.dataset_actions) < 25:
        # Map each library to {"folder / path dataset-name": [action, ...]} for
        # every DatasetPermissions row associated with the role.
        for dp in role.dataset_actions:
            for ldda in get_ldda_by_dataset(trans.sa_session, dp.dataset_id):
                # Walk up to the root folder, prepending each folder name.
                folder_path = ""
                folder = ldda.library_dataset.folder
                while True:
                    folder_path = f"{folder.name} / {folder_path}"
                    if not folder.parent:
                        break
                    folder = folder.parent
                folder_path = f"{folder_path} {ldda.name}"
                # `folder` is now the root folder, which identifies the library.
                library = get_library_by_folder(trans.sa_session, folder.id)
                actions_by_path = library_dataset_actions.setdefault(library, {})
                actions_by_path.setdefault(folder_path, []).append(dp.action)
    else:
        # NOTE(review): this branch is also taken whenever the webapp is not
        # "galaxy", regardless of how many dataset actions exist. Confirm the
        # message is intended in that case.
        message = "Not showing associated datasets, there are too many."
        status = "info"
    return trans.fill_template(
        "/webapps/tool_shed/admin/dataset_security/role/role.mako",
        role=role,
        in_users=in_users,
        out_users=out_users,
        in_groups=in_groups,
        out_groups=out_groups,
        library_dataset_actions=library_dataset_actions,
        message=message,
        status=status,
    )
@web.expose
@web.require_admin
def mark_role_deleted(self, trans, **kwd):
    """Flag each requested role as deleted, then redirect to the roles list.

    ``id`` may hold one or several role ids. Each role is committed
    individually and its name is appended to the status message.
    """
    requested = kwd.get("id", None)
    if not requested:
        message = "No role ids received for deleting"
        trans.response.send_redirect(
            web.url_for(controller="admin", action="roles", message=message, status="error")
        )
    role_ids = util.listify(requested)
    message_parts = ["Deleted %d roles: " % len(role_ids)]
    for role_id in role_ids:
        role = get_role(trans, role_id)
        role.deleted = True
        trans.sa_session.add(role)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        message_parts.append(f" {role.name} ")
    message = "".join(message_parts)
    trans.response.send_redirect(
        web.url_for(controller="admin", action="roles", message=util.sanitize_text(message), status="done")
    )
@web.expose
@web.require_admin
def undelete_role(self, trans, **kwd):
    """Clear the deleted flag on each requested role and redirect to the roles list.

    A role that is not currently marked deleted triggers an error redirect
    naming that role.
    """
    requested = kwd.get("id", None)
    if not requested:
        message = "No role ids received for undeleting"
        trans.response.send_redirect(
            web.url_for(controller="admin", action="roles", message=message, status="error")
        )
    restored_names = []
    for role_id in util.listify(requested):
        role = get_role(trans, role_id)
        if not role.deleted:
            message = f"Role '{role.name}' has not been deleted, so it cannot be undeleted."
            trans.response.send_redirect(
                web.url_for(controller="admin", action="roles", message=util.sanitize_text(message), status="error")
            )
        role.deleted = False
        trans.sa_session.add(role)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        restored_names.append(f" {role.name}")
    message = "Undeleted %d roles: %s" % (len(restored_names), "".join(restored_names))
    trans.response.send_redirect(
        web.url_for(controller="admin", action="roles", message=util.sanitize_text(message), status="done")
    )
[docs] @web.expose
@web.require_admin
def purge_role(self, trans, **kwd):
# Purge one or more roles (ids taken from kwd["id"]) and redirect to the
# roles list with a summary message.
# NOTE(review): indentation is missing from this listing, so the nesting of
# the statements below (in particular which loop the commit belongs to) is
# inferred, not shown. Confirm against the real source before editing.
# This method should only be called for a Role that has previously been deleted.
# Purging a deleted Role deletes all of the following from the database:
# - UserRoleAssociations where role_id == Role.id
# - DefaultUserPermissions where role_id == Role.id
# - DefaultHistoryPermissions where role_id == Role.id
# - GroupRoleAssociations where role_id == Role.id
# - DatasetPermissionss where role_id == Role.id
id = kwd.get("id", None)
if not id:
message = "No role ids received for purging"
trans.response.send_redirect(
web.url_for(controller="admin", action="roles", message=util.sanitize_text(message), status="error")
)
# `id` may be a single id or several; listify normalises it for the loop.
ids = util.listify(id)
message = "Purged %d roles: " % len(ids)
for role_id in ids:
role = get_role(trans, role_id)
# A role that is not marked deleted replaces the summary with an error and
# issues an error redirect.
if not role.deleted:
message = f"Role '{role.name}' has not been deleted, so it cannot be purged."
trans.response.send_redirect(
web.url_for(controller="admin", action="roles", message=util.sanitize_text(message), status="error")
)
# Delete UserRoleAssociations
for ura in role.users:
user = trans.sa_session.get(trans.app.model.User, ura.user_id)
# Delete DefaultUserPermissions for associated users
for dup in user.default_permissions:
if role == dup.role:
trans.sa_session.delete(dup)
# Delete DefaultHistoryPermissions for associated users
for history in user.histories:
for dhp in history.default_permissions:
if role == dhp.role:
trans.sa_session.delete(dhp)
trans.sa_session.delete(ura)
# Delete GroupRoleAssociations
for gra in role.groups:
trans.sa_session.delete(gra)
# Delete DatasetPermissionss
for dp in role.dataset_actions:
trans.sa_session.delete(dp)
# Commit the pending deletes, then record the role name in the summary.
with transaction(trans.sa_session):
trans.sa_session.commit()
message += f" {role.name} "
trans.response.send_redirect(
web.url_for(controller="admin", action="roles", message=util.sanitize_text(message), status="done")
)
@web.expose
@web.require_admin
def groups(self, trans, **kwargs):
    """Dispatch a group grid operation, or render the group list grid.

    The ``operation`` parameter is lower-cased and has ``+`` replaced by a
    space before matching. Anything unrecognised falls through to
    ``self.group_list_grid``.
    """
    # (operation label, handler method name). Handlers are looked up with
    # getattr only when their label matches.
    dispatch = (
        ("groups", "group"),
        ("create", "create_group"),
        ("delete", "mark_group_deleted"),
        ("undelete", "undelete_group"),
        ("purge", "purge_group"),
        ("manage users and roles", "manage_users_and_roles_for_group"),
        ("rename", "rename_group"),
    )
    if "operation" in kwargs:
        requested = kwargs["operation"].lower().replace("+", " ")
        for label, method_name in dispatch:
            if requested == label:
                return getattr(self, method_name)(trans, **kwargs)
    # Render the list view
    return self.group_list_grid(trans, **kwargs)
@web.expose
@web.require_admin
def rename_group(self, trans, **kwd):
    """Rename a group.

    Without ``rename_group_button`` the rename form is rendered. With it, the
    new name must be non-empty and must not belong to a different group. On
    success the request is redirected to the groups list; on a validation
    error the form is rendered again with the error message.
    """
    params = util.Params(kwd)
    message = util.restore_text(params.get("message", ""))
    status = params.get("status", "done")
    group_id = params.get("id", None)
    if not group_id:
        message = "No group ids received for renaming"
        # Return explicitly so a missing id never reaches get_group().
        return trans.response.send_redirect(
            web.url_for(controller="admin", action="groups", message=message, status="error")
        )
    group = get_group(trans, group_id)
    if params.get("rename_group_button", False):
        old_name = group.name
        new_name = util.restore_text(params.name)
        if not new_name:
            message = "Enter a valid name"
            status = "error"
        else:
            # create_group treats a falsy get_group_id() result as "no group
            # has this name", so only a truthy id that differs from this
            # group's id is a real conflict. Comparing the raw result with
            # group.id rejected every rename to an unused name.
            existing_group_id = get_group_id(trans.sa_session, trans.app.model.Group, new_name)
            if existing_group_id and existing_group_id != group.id:
                message = "A group with that name already exists"
                status = "error"
            else:
                if group.name != new_name:
                    group.name = new_name
                    trans.sa_session.add(group)
                    with transaction(trans.sa_session):
                        trans.sa_session.commit()
                    message = f"Group '{old_name}' has been renamed to '{new_name}'"
                return trans.response.send_redirect(
                    web.url_for(
                        controller="admin", action="groups", message=util.sanitize_text(message), status="done"
                    )
                )
    return trans.fill_template(
        "/webapps/tool_shed/admin/dataset_security/group/group_rename.mako",
        group=group,
        message=message,
        status=status,
    )
@web.expose
@web.require_admin
def manage_users_and_roles_for_group(self, trans, **kwd):
    """Edit, or display, the roles and users associated with a group.

    With ``group_roles_users_edit_button`` the submitted ``in_roles`` and
    ``in_users`` become the group's associations and the request is redirected
    to the groups list. Otherwise the membership form is rendered, listing
    current roles and users split into associated ("in") and unassociated
    ("out").
    """
    params = util.Params(kwd)
    message = util.restore_text(params.get("message", ""))
    status = params.get("status", "done")
    group = get_group(trans, params.id)
    if params.get("group_roles_users_edit_button", False):
        in_roles = [trans.sa_session.get(trans.app.model.Role, x) for x in util.listify(params.in_roles)]
        in_users = [trans.sa_session.get(trans.app.model.User, x) for x in util.listify(params.in_users)]
        trans.app.security_agent.set_entity_group_associations(groups=[group], roles=in_roles, users=in_users)
        trans.sa_session.refresh(group)
        message += "Group '%s' has been updated with %d associated roles and %d associated users" % (
            group.name,
            len(in_roles),
            len(in_users),
        )
        # Explicit return (as rename_group does) so the form is never rendered
        # after a successful update.
        return trans.response.send_redirect(
            web.url_for(controller="admin", action="groups", message=util.sanitize_text(message), status=status)
        )
    in_roles = []
    out_roles = []
    in_users = []
    out_users = []
    # Build the membership lists once instead of once per candidate.
    group_roles = [x.role for x in group.roles]
    group_users = [x.user for x in group.users]
    for role in get_current_roles(trans.sa_session, trans.app.model.Role):
        if role in group_roles:
            in_roles.append((role.id, role.name))
        else:
            out_roles.append((role.id, role.name))
    for user in get_current_users(trans.sa_session, trans.app.model.User):
        if user in group_users:
            in_users.append((user.id, user.email))
        else:
            out_users.append((user.id, user.email))
    message += "Group %s is currently associated with %d roles and %d users" % (
        group.name,
        len(in_roles),
        len(in_users),
    )
    return trans.fill_template(
        "/webapps/tool_shed/admin/dataset_security/group/group.mako",
        group=group,
        in_roles=in_roles,
        out_roles=out_roles,
        in_users=in_users,
        out_users=out_users,
        message=message,
        status=status,
    )
@web.expose
@web.require_admin
def create_group(self, trans, **kwd):
    """Create a group, optionally together with a same-named role.

    When ``create_group_button`` is submitted, the name is validated (required
    and not already in use). On success the group is committed, its user/role
    associations are added and committed, and the request is redirected to the
    groups list. Otherwise the creation form is rendered; on a validation
    error the candidate user/role lists are not repopulated.
    """
    params = util.Params(kwd)
    message = util.restore_text(params.get("message", ""))
    status = params.get("status", "done")
    name = util.restore_text(params.get("name", ""))
    in_users = util.listify(params.get("in_users", []))
    out_users = util.listify(params.get("out_users", []))
    in_roles = util.listify(params.get("in_roles", []))
    out_roles = util.listify(params.get("out_roles", []))
    create_role_for_group = params.get("create_role_for_group", "")
    create_role_for_group_checked = CheckboxField.is_checked(create_role_for_group)
    ok = True
    if params.get("create_group_button", False):
        if not name:
            message = "Enter a valid name."
            status = "error"
            ok = False
        elif get_group_id(trans.sa_session, trans.app.model.Group, name):
            message = (
                "Group names must be unique and a group with that name already exists, so choose another name."
            )
            status = "error"
            ok = False
        else:
            # Create the group
            group = trans.app.model.Group(name=name)
            trans.sa_session.add(group)
            with transaction(trans.sa_session):
                trans.sa_session.commit()
            # Create the UserGroupAssociations
            for user in [trans.sa_session.get(trans.app.model.User, x) for x in in_users]:
                uga = trans.app.model.UserGroupAssociation(user, group)
                trans.sa_session.add(uga)
            # Create the GroupRoleAssociations
            for role in [trans.sa_session.get(trans.app.model.Role, x) for x in in_roles]:
                gra = trans.app.model.GroupRoleAssociation(group, role)
                trans.sa_session.add(gra)
            if create_role_for_group_checked:
                # Create the role
                role = trans.app.model.Role(name=name, description=f"Role for group {name}")
                trans.sa_session.add(role)
                # Associate the role with the group. Uses trans.app.model like
                # every other model lookup in this method.
                gra = trans.app.model.GroupRoleAssociation(group, role)
                trans.sa_session.add(gra)
                num_in_roles = len(in_roles) + 1
            else:
                num_in_roles = len(in_roles)
            with transaction(trans.sa_session):
                trans.sa_session.commit()
            message = "Group '%s' has been created with %d associated users and %d associated roles. " % (
                group.name,
                len(in_users),
                num_in_roles,
            )
            if create_role_for_group_checked:
                message += (
                    "One of the roles associated with this group is the newly created role with the same name."
                )
            # Explicit return (as rename_group does) so the form is never
            # rendered after a successful creation.
            return trans.response.send_redirect(
                web.url_for(controller="admin", action="groups", message=util.sanitize_text(message), status="done")
            )
    if ok:
        # Nothing was rejected: offer every current user and role as a candidate.
        for user in get_current_users(trans.sa_session, trans.app.model.User):
            out_users.append((user.id, user.email))
        for role in get_current_roles(trans.sa_session, trans.app.model.Role):
            out_roles.append((role.id, role.name))
    return trans.fill_template(
        "/webapps/tool_shed/admin/dataset_security/group/group_create.mako",
        name=name,
        in_users=in_users,
        out_users=out_users,
        in_roles=in_roles,
        out_roles=out_roles,
        create_role_for_group_checked=create_role_for_group_checked,
        message=message,
        status=status,
    )
@web.expose
@web.require_admin
def mark_group_deleted(self, trans, **kwd):
    """Flag each requested group as deleted, then redirect to the groups list.

    ``id`` may hold one or several group ids. Each group is committed
    individually and its name is appended to the status message.
    """
    requested = util.Params(kwd).get("id", None)
    if not requested:
        message = "No group ids received for marking deleted"
        trans.response.send_redirect(
            web.url_for(controller="admin", action="groups", message=message, status="error")
        )
    group_ids = util.listify(requested)
    message_parts = ["Deleted %d groups: " % len(group_ids)]
    for group_id in group_ids:
        group = get_group(trans, group_id)
        group.deleted = True
        trans.sa_session.add(group)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        message_parts.append(f" {group.name} ")
    message = "".join(message_parts)
    trans.response.send_redirect(
        web.url_for(controller="admin", action="groups", message=util.sanitize_text(message), status="done")
    )
@web.expose
@web.require_admin
def undelete_group(self, trans, **kwd):
    """Clear the deleted flag on each requested group and redirect to the groups list.

    A group that is not currently marked deleted triggers an error redirect
    naming that group.
    """
    requested = kwd.get("id", None)
    if not requested:
        message = "No group ids received for undeleting"
        trans.response.send_redirect(
            web.url_for(controller="admin", action="groups", message=message, status="error")
        )
    restored_names = []
    for group_id in util.listify(requested):
        group = get_group(trans, group_id)
        if not group.deleted:
            message = f"Group '{group.name}' has not been deleted, so it cannot be undeleted."
            trans.response.send_redirect(
                web.url_for(
                    controller="admin", action="groups", message=util.sanitize_text(message), status="error"
                )
            )
        group.deleted = False
        trans.sa_session.add(group)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        restored_names.append(f" {group.name}")
    message = "Undeleted %d groups: %s" % (len(restored_names), "".join(restored_names))
    trans.response.send_redirect(
        web.url_for(controller="admin", action="groups", message=util.sanitize_text(message), status="done")
    )
@web.expose
@web.require_admin
def purge_group(self, trans, **kwd):
    """Remove the user and role associations of each requested group.

    This method should only be called for a Group that has previously been
    deleted. Purging a deleted Group simply deletes all UserGroupAssociations
    and GroupRoleAssociations; each group is committed individually and the
    request is then redirected to the groups list.
    """
    requested = kwd.get("id", None)
    if not requested:
        message = "No group ids received for purging"
        trans.response.send_redirect(
            web.url_for(controller="admin", action="groups", message=util.sanitize_text(message), status="error")
        )
    group_ids = util.listify(requested)
    message = "Purged %d groups: " % len(group_ids)
    for group_id in group_ids:
        group = get_group(trans, group_id)
        if not group.deleted:
            # We should never reach here, but just in case there is a bug somewhere...
            message = f"Group '{group.name}' has not been deleted, so it cannot be purged."
            trans.response.send_redirect(
                web.url_for(
                    controller="admin", action="groups", message=util.sanitize_text(message), status="error"
                )
            )
        # UserGroupAssociations first, then GroupRoleAssociations.
        for user_assoc in group.users:
            trans.sa_session.delete(user_assoc)
        for role_assoc in group.roles:
            trans.sa_session.delete(role_assoc)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        message = message + f" {group.name} "
    trans.response.send_redirect(
        web.url_for(controller="admin", action="groups", message=util.sanitize_text(message), status="done")
    )
@web.expose
@web.require_admin
def create_new_user(self, trans, **kwd):
    """Redirect to the ``user`` controller's ``create`` action in admin mode."""
    target = web.url_for(controller="user", action="create", cntrller="admin")
    return trans.response.send_redirect(target)
@web.expose
@web.require_admin
def reset_user_password(self, trans, **kwd):
    """Reset the password of one or more users, or render the reset form.

    With ``reset_user_password_button`` the submitted ``password``/``confirm``
    pair is validated and applied to each user in turn; the first validation
    failure stops processing and is reported with status ``"error"``. Without
    the button, the reset form is rendered for the selected users.
    """
    user_id = kwd.get("id", None)
    if not user_id:
        message = "No users received for resetting passwords."
        trans.response.send_redirect(
            web.url_for(controller="admin", action="users", message=message, status="error")
        )
    user_ids = util.listify(user_id)
    if "reset_user_password_button" in kwd:
        message = ""
        status = ""
        password = kwd.get("password", None)
        confirm = kwd.get("confirm", None)
        for user_id in user_ids:
            user = get_user(trans, user_id)
            message = validate_password(trans, password, confirm)
            if message:
                status = "error"
                break
            user.set_password_cleartext(password)
            trans.sa_session.add(user)
            with transaction(trans.sa_session):
                trans.sa_session.commit()
        if not (message or status):
            how_many = len(user_ids)
            message = "Passwords reset for %d %s." % (how_many, inflector.cond_plural(how_many, "user"))
            status = "done"
        trans.response.send_redirect(
            web.url_for(controller="admin", action="users", message=util.sanitize_text(message), status=status)
        )
    users = [get_user(trans, each_id) for each_id in user_ids]
    if len(user_ids) > 1:
        # Several users: pass their ids to the form as one comma-separated value.
        user_id = ",".join(user_ids)
    return trans.fill_template(
        "/webapps/tool_shed/admin/user/reset_password.mako", id=user_id, users=users, password="", confirm=""
    )
@web.expose
@web.require_admin
def mark_user_deleted(self, trans, **kwd):
    """Flag each requested user as deleted, optionally redacting identifiers.

    For every id a compliance log entry is written. If
    ``redact_username_during_deletion`` / ``redact_email_during_deletion`` are
    set on ``self.app.config``, occurrences of the username/email in the
    user's role names and descriptions, and then the user's own fields, are
    replaced with a hash. Each user is committed individually before the
    request is redirected to the users list.
    """
    requested = kwd.get("id", None)
    if not requested:
        message = "No user ids received for deleting"
        trans.response.send_redirect(
            web.url_for(controller="admin", action="users", message=message, status="error")
        )
    ids = util.listify(requested)
    message = "Deleted %d users: " % len(ids)
    for user_id in ids:
        user = get_user(trans, user_id)
        user.deleted = True
        compliance_log.info(f"delete-user-event: {user_id}")
        # See lib/galaxy/webapps/tool_shed/controllers/admin.py
        pseudorandom_value = str(int(time.time()))
        email_hash = new_secure_hash_v2(user.email + pseudorandom_value)
        uname_hash = new_secure_hash_v2(user.username + pseudorandom_value)
        redact_username = self.app.config.redact_username_during_deletion
        redact_email = self.app.config.redact_email_during_deletion
        for role in user.all_roles():
            # Was a bare print() to stdout; route it through the module logger.
            log.debug(
                "Redacting role %s (redact_username=%s, redact_email=%s)",
                role,
                redact_username,
                redact_email,
            )
            if redact_username:
                role.name = role.name.replace(user.username, uname_hash)
                role.description = role.description.replace(user.username, uname_hash)
            if redact_email:
                role.name = role.name.replace(user.email, email_hash)
                role.description = role.description.replace(user.email, email_hash)
        # Redact the user's own fields only after the roles, because the role
        # replacements above search for the original username/email.
        if redact_email:
            user.email = email_hash
        if redact_username:
            user.username = uname_hash
        trans.sa_session.add(user)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        message += f" {user.email} "
    trans.response.send_redirect(
        web.url_for(controller="admin", action="users", message=util.sanitize_text(message), status="done")
    )
@web.expose
@web.require_admin
def undelete_user(self, trans, **kwd):
    """Clear the deleted flag on each requested user and redirect to the users list.

    A user that is not currently marked deleted triggers an error redirect
    naming that user's email.
    """
    requested = kwd.get("id", None)
    if not requested:
        message = "No user ids received for undeleting"
        trans.response.send_redirect(
            web.url_for(controller="admin", action="users", message=message, status="error")
        )
    restored_emails = []
    for user_id in util.listify(requested):
        user = get_user(trans, user_id)
        if not user.deleted:
            message = f"User '{user.email}' has not been deleted, so it cannot be undeleted."
            trans.response.send_redirect(
                web.url_for(controller="admin", action="users", message=util.sanitize_text(message), status="error")
            )
        user.deleted = False
        trans.sa_session.add(user)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        restored_emails.append(f" {user.email}")
    message = "Undeleted %d users: %s" % (len(restored_emails), "".join(restored_emails))
    trans.response.send_redirect(
        web.url_for(controller="admin", action="users", message=util.sanitize_text(message), status="done")
    )
[docs] @web.expose
@web.require_admin
def purge_user(self, trans, **kwd):
# Purge one or more users (ids taken from kwd["id"]) and redirect to the
# users list with a summary message.
# NOTE(review): indentation is missing from this listing, so the nesting of
# the statements below (in particular which loop each add/commit belongs to)
# is inferred, not shown. Confirm against the real source before editing.
# This method should only be called for a User that has previously been deleted.
# We keep the User in the database ( marked as purged ), and stuff associated
# with the user's private role in case we want the ability to unpurge the user
# some time in the future.
# Purging a deleted User deletes all of the following:
# - History where user_id = User.id
# - HistoryDatasetAssociation where history_id = History.id
# - Dataset where HistoryDatasetAssociation.dataset_id = Dataset.id
# - UserGroupAssociation where user_id == User.id
# - UserRoleAssociation where user_id == User.id EXCEPT FOR THE PRIVATE ROLE
# - UserAddress where user_id == User.id
# Purging Histories and Datasets must be handled via the cleanup_datasets.py script
id = kwd.get("id", None)
if not id:
message = "No user ids received for purging"
trans.response.send_redirect(
web.url_for(controller="admin", action="users", message=util.sanitize_text(message), status="error")
)
# `id` may be a single id or several; listify normalises it for the loop.
ids = util.listify(id)
message = "Purged %d users: " % len(ids)
for user_id in ids:
user = get_user(trans, user_id)
if not user.deleted:
# We should never reach here, but just in case there is a bug somewhere...
message = f"User '{user.email}' has not been deleted, so it cannot be purged."
trans.response.send_redirect(
web.url_for(controller="admin", action="users", message=util.sanitize_text(message), status="error")
)
# Looked up so the private role's association can be kept below.
private_role = trans.app.security_agent.get_private_user_role(user)
# Delete History
# Histories, their dataset associations and datasets are only flagged
# deleted here (deleted = True); no rows are removed.
for h in user.active_histories:
trans.sa_session.refresh(h)
for hda in h.active_datasets:
# Delete HistoryDatasetAssociation
d = trans.sa_session.get(trans.app.model.Dataset, hda.dataset_id)
# Delete Dataset
if not d.deleted:
d.deleted = True
trans.sa_session.add(d)
hda.deleted = True
trans.sa_session.add(hda)
h.deleted = True
trans.sa_session.add(h)
# Delete UserGroupAssociations
for uga in user.groups:
trans.sa_session.delete(uga)
# Delete UserRoleAssociations EXCEPT FOR THE PRIVATE ROLE
for ura in user.roles:
if ura.role_id != private_role.id:
trans.sa_session.delete(ura)
# Delete UserAddresses
for address in user.addresses:
trans.sa_session.delete(address)
# Purge the user
user.purged = True
trans.sa_session.add(user)
with transaction(trans.sa_session):
trans.sa_session.commit()
message += f"{user.email} "
trans.response.send_redirect(
web.url_for(controller="admin", action="users", message=util.sanitize_text(message), status="done")
)
@web.expose
@web.require_admin
def users(self, trans, **kwd):
    """Dispatch a user grid operation, or render the user list grid.

    The ``operation`` parameter is lower-cased and has ``+`` replaced by a
    space before matching, as ``roles`` and ``groups`` already do, so that
    multi-word operations such as ``reset+password`` are recognised. When
    ``allow_user_deletion`` is enabled, the delete, undelete and purge
    operations are added to the grid if not already present.
    """
    # Operation label -> name of the handling method; resolved with getattr
    # only when the operation is actually requested.
    handler_names = {
        "roles": "user",
        "reset password": "reset_user_password",
        "delete": "mark_user_deleted",
        "undelete": "undelete_user",
        "purge": "purge_user",
        "create": "create_new_user",
        "manage roles and groups": "manage_roles_and_groups_for_user",
    }
    if "operation" in kwd:
        operation = kwd["operation"].lower().replace("+", " ")
        handler_name = handler_names.get(operation)
        if handler_name is not None:
            return getattr(self, handler_name)(trans, **kwd)
    if trans.app.config.allow_user_deletion:
        # The grid is a shared attribute, so only append operations once.
        for operation in (self.delete_operation, self.undelete_operation, self.purge_operation):
            if operation not in self.user_list_grid.operations:
                self.user_list_grid.operations.append(operation)
    # Render the list view
    return self.user_list_grid(trans, **kwd)
@web.expose
@web.require_admin
def name_autocomplete_data(self, trans, q=None, limit=None, timestamp=None):
    """Return autocomplete data for user emails.

    The emails returned by ``get_user_emails_by_prefix`` for ``q`` are joined
    into one newline-separated string. ``limit`` and ``timestamp`` are
    accepted but not used.
    """
    matching_emails = get_user_emails_by_prefix(trans.sa_session, trans.app.model.User, q)
    separator = "\n"
    return separator.join(matching_emails)
[docs] @web.expose
@web.require_admin
def manage_roles_and_groups_for_user(self, trans, **kwd):
# Show one user's role and group associations and, when the edit form was submitted,
# update them.
#
# Request parameters read from ``kwd``:
#   id                            - user id, handed to get_user()
#   user_roles_groups_edit_button - presumably set when the edit form is submitted
#   in_roles / out_roles          - role ids, each loaded through trans.sa_session.get()
#   in_groups / out_groups        - group ids, loaded the same way
#
# Ends either with a redirect to the admin "users" action or by rendering user.mako.
user_id = kwd.get("id", None)
message = ""
status = ""
# Without an id there is nothing to look up: redirect to the user list with an error message.
if not user_id:
message += f"Invalid user id ({str(user_id)}) received"
trans.response.send_redirect(
web.url_for(controller="admin", action="users", message=util.sanitize_text(message), status="error")
)
# NOTE(review): the lookup below still runs unless send_redirect() does not return
# (presumably it raises to end the request) - confirm.
user = get_user(trans, user_id)
# Fetched so the form handling below can detect an attempt to remove the private role.
private_role = trans.app.security_agent.get_private_user_role(user)
if kwd.get("user_roles_groups_edit_button", False):
# Make sure the user is not dis-associating himself from his private role
out_roles = kwd.get("out_roles", [])
# Submitted values are ids; each id from util.listify(...) is loaded as a model object.
if out_roles:
out_roles = [trans.sa_session.get(trans.app.model.Role, x) for x in util.listify(out_roles)]
if private_role in out_roles:
message += "You cannot eliminate a user's private role association. "
status = "error"
# NOTE(review): only message/status are set here; nothing visible in this method stops the
# update below from being applied afterwards - confirm that this is intended.
in_roles = kwd.get("in_roles", [])
if in_roles:
in_roles = [trans.sa_session.get(trans.app.model.Role, x) for x in util.listify(in_roles)]
out_groups = kwd.get("out_groups", [])
if out_groups:
out_groups = [trans.sa_session.get(trans.app.model.Group, x) for x in util.listify(out_groups)]
in_groups = kwd.get("in_groups", [])
if in_groups:
in_groups = [trans.sa_session.get(trans.app.model.Group, x) for x in util.listify(in_groups)]
# Associations are only written when at least one in_role was submitted.
if in_roles:
trans.app.security_agent.set_entity_user_associations(users=[user], roles=in_roles, groups=in_groups)
# Reload the user so the email used in the message reflects the stored state.
trans.sa_session.refresh(user)
message += (
"User '%s' has been updated with %d associated roles and %d associated groups (private roles are not displayed)"
% (user.email, len(in_roles), len(in_groups))
)
trans.response.send_redirect(
web.url_for(controller="admin", action="users", message=util.sanitize_text(message), status="done")
)
# Rebuild the lists for the template: from here on they hold (id, name) tuples
# rather than model objects.
in_roles = []
out_roles = []
in_groups = []
out_groups = []
for role in get_current_roles(trans.sa_session, trans.app.model.Role):
# NOTE(review): the list of the user's roles is rebuilt for every role examined;
# it could be computed once before the loop.
if role in [x.role for x in user.roles]:
in_roles.append((role.id, role.name))
elif role.type != trans.app.model.Role.types.PRIVATE:
# There is a 1 to 1 mapping between a user and a PRIVATE role, so private roles should
# not be listed in the roles form fields, except for the currently selected user's private
# role, which should always be in in_roles. The check above is added as an additional
# precaution, since for a period of time we were including private roles in the form fields.
out_roles.append((role.id, role.name))
for group in get_current_groups(trans.sa_session, trans.app.model.Group):
# Same split for groups: groups the user belongs to go to in_groups, all others to out_groups.
if group in [x.group for x in user.groups]:
in_groups.append((group.id, group.name))
else:
out_groups.append((group.id, group.name))
message += "User '%s' is currently associated with %d roles and is a member of %d groups" % (
user.email,
len(in_roles),
len(in_groups),
)
# An empty status means no error was recorded above.
if not status:
status = "done"
return trans.fill_template(
"/webapps/tool_shed/admin/user/user.mako",
user=user,
in_roles=in_roles,
out_roles=out_roles,
in_groups=in_groups,
out_groups=out_groups,
message=message,
status=status,
)
# ---- Utility methods -------------------------------------------------------
def get_user(trans, user_id):
    """Look up a User by its encoded id.

    ``user_id`` is passed through ``trans.security.decode_id`` and the result is used as
    the key for ``trans.sa_session.get``. When nothing is found, the value returned by
    ``trans.show_error_message`` is returned instead of a user; the message quotes the
    id exactly as it was received.
    """
    load = trans.sa_session.get
    user_model = trans.model.User
    found = load(user_model, trans.security.decode_id(user_id))
    if found:
        return found
    return trans.show_error_message(f"User not found for id ({str(user_id)})")
def get_role(trans, id):
    """Look up a Role by its encoded id.

    ``id`` is passed through ``trans.security.decode_id`` before the lookup. When nothing
    is found, the value returned by ``trans.show_error_message`` is returned instead of
    a role; the message quotes the decoded id.
    """
    # Load the role from the database using the decoded key.
    decoded_id = trans.security.decode_id(id)
    found = trans.sa_session.get(trans.model.Role, decoded_id)
    if found:
        return found
    return trans.show_error_message(f"Role not found for id ({str(decoded_id)})")
def get_group(trans, id):
    """Look up a Group by its encoded id.

    ``id`` is passed through ``trans.security.decode_id`` before the lookup. When nothing
    is found, the value returned by ``trans.show_error_message`` is returned instead of
    a group; the message quotes the decoded id.
    """
    # Load the group from the database using the decoded key.
    decoded_id = trans.security.decode_id(id)
    found = trans.sa_session.get(trans.model.Group, decoded_id)
    if found:
        return found
    return trans.show_error_message(f"Group not found for id ({str(decoded_id)})")
def get_role_id(session, role_model, name):
    """Return the id of one role whose name equals ``name``.

    The query is limited to a single row and the first scalar of the result is
    returned (``None`` when there is no match, per SQLAlchemy's ``ScalarResult.first``).
    """
    name_matches = role_model.name == name
    query = select(role_model.id).where(name_matches).limit(1)
    matching_ids = session.scalars(query)
    return matching_ids.first()
def get_group_id(session, group_model, name):
    """Return the id of one group whose name equals ``name``.

    The query is limited to a single row and the first scalar of the result is
    returned (``None`` when there is no match, per SQLAlchemy's ``ScalarResult.first``).
    """
    name_matches = group_model.name == name
    query = select(group_model.id).where(name_matches).limit(1)
    matching_ids = session.scalars(query)
    return matching_ids.first()
def get_current_users(session, user_model):
    """Select the users not marked as deleted, ordered by email.

    Returns whatever ``session.scalars`` produces for the query; the query is not
    consumed here.
    """
    not_deleted = user_model.deleted == false()
    query = select(user_model).where(not_deleted).order_by(user_model.email)
    return session.scalars(query)
def get_current_groups(session, group_model):
    """Select the groups not marked as deleted, ordered by name.

    Returns whatever ``session.scalars`` produces for the query; the query is not
    consumed here.
    """
    not_deleted = group_model.deleted == false()
    query = select(group_model).where(not_deleted).order_by(group_model.name)
    return session.scalars(query)
def get_current_roles(session, role_model):
    """Select the roles not marked as deleted, ordered by name.

    Returns whatever ``session.scalars`` produces for the query; the query is not
    consumed here.
    """
    not_deleted = role_model.deleted == false()
    query = select(role_model).where(not_deleted).order_by(role_model.name)
    return session.scalars(query)
def get_ldda_by_dataset(session, dataset_id):
    """Select the LibraryDatasetDatasetAssociation rows whose ``dataset_id`` equals ``dataset_id``.

    Returns whatever ``session.scalars`` produces for the query; no ordering or limit
    is applied.
    """
    ldda_model = LibraryDatasetDatasetAssociation
    query = select(ldda_model).where(ldda_model.dataset_id == dataset_id)
    return session.scalars(query)
def get_library_by_folder(session, folder_id):
    """Return one Library whose ``folder_id`` equals ``folder_id``.

    The query is limited to a single row and the first scalar of the result is
    returned (``None`` when there is no match, per SQLAlchemy's ``ScalarResult.first``).
    """
    folder_matches = Library.folder_id == folder_id
    query = select(Library).where(folder_matches).limit(1)
    libraries = session.scalars(query)
    return libraries.first()
def get_user_emails_by_prefix(session, user_model, prefix):
    """Select the emails of non-deleted users whose email starts with ``prefix``.

    The match is case-insensitive: the stored email and ``prefix`` are both lower-cased
    before being compared with ``LIKE '<prefix>%'``.

    :param session: object providing ``scalars(stmt)`` (presumably a SQLAlchemy session).
    :param user_model: mapped class exposing ``email`` and ``deleted`` columns.
    :param prefix: leading text to match. ``None`` is treated like an empty string,
        which places no restriction on the email.
    :returns: whatever ``session.scalars`` produces for the query.
    """
    # ``name_autocomplete_data`` defaults its ``q`` argument to ``None`` and forwards it
    # here unchanged; without this guard ``prefix.lower()`` raises AttributeError.
    if prefix is None:
        prefix = ""
    # NOTE(review): ``%`` and ``_`` inside ``prefix`` are not escaped, so they act as
    # LIKE wildcards rather than literal characters.
    pattern = f"{prefix.lower()}%"
    stmt = (
        select(user_model.email)
        .where(user_model.deleted == false())
        .where(func.lower(user_model.email).like(pattern))
    )
    return session.scalars(stmt)