import logging
from typing import Set
from sqlalchemy import (
false,
func,
true,
)
from typing_extensions import TypedDict
from galaxy import (
model,
util,
web,
)
from galaxy.exceptions import ActionInputError
from galaxy.managers.quotas import QuotaManager
from galaxy.model.base import transaction
from galaxy.model.index_filter_util import (
raw_text_column_filter,
text_column_filter,
)
from galaxy.security.validate_user_input import validate_password
from galaxy.structured_app import StructuredApp
from galaxy.util.search import (
FilteredTerm,
parse_filters_structured,
RawTextTerm,
)
from galaxy.web import url_for
from galaxy.web.framework.helpers import (
grids,
time_ago,
)
from galaxy.webapps.base import controller
log = logging.getLogger(__name__)
[docs]class UserListGrid(grids.GridData):
[docs] class StatusColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, user):
if user.purged:
return "Purged"
elif user.deleted:
return "Deleted"
return "Available"
[docs] class GroupsColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, user):
if user.groups:
return len(user.groups)
return 0
[docs] class RolesColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, user):
if user.roles:
return len(user.roles)
return 0
[docs] class LastLoginColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, user):
if user.galaxy_sessions:
return self.format(user.current_galaxy_session.update_time)
return "never"
[docs] def sort(self, trans, query, ascending, column_name=None):
last_login_subquery = (
trans.sa_session.query(
model.GalaxySession.table.c.user_id,
func.max(model.GalaxySession.table.c.update_time).label("last_login"),
)
.group_by(model.GalaxySession.table.c.user_id)
.subquery()
)
query = query.outerjoin(last_login_subquery, model.User.table.c.id == last_login_subquery.c.user_id)
if not ascending:
query = query.order_by((last_login_subquery.c.last_login).desc().nullslast())
else:
query = query.order_by((last_login_subquery.c.last_login).asc().nullsfirst())
return query
[docs] class DiskUsageColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, user):
return user.get_disk_usage(nice_size=True)
[docs] def sort(self, trans, query, ascending, column_name=None):
if column_name is None:
column_name = self.key
column = self.model_class.table.c.get(column_name)
if column is None:
column = getattr(self.model_class, column_name)
if ascending:
query = query.order_by(func.coalesce(column, 0).asc())
else:
query = query.order_by(func.coalesce(column, 0).desc())
return query
# Grid definition
title = "Users"
title_id = "users-grid"
model_class = model.User
default_sort_key = "email"
columns = [
grids.GridColumn("Email", key="email"),
grids.GridColumn("User Name", key="username"),
LastLoginColumn("Last Login", key="last_login", format=time_ago),
DiskUsageColumn("Disk Usage", key="disk_usage"),
StatusColumn("Status", key="status"),
grids.GridColumn("Created", key="create_time"),
grids.GridColumn("Activated", key="active", escape=False),
GroupsColumn("Groups", key="groups"),
RolesColumn("Roles", key="roles"),
grids.GridColumn("External", key="external", escape=False),
grids.GridColumn("Deleted", key="deleted", escape=False),
grids.GridColumn("Purged", key="purged", escape=False),
]
[docs] def apply_query_filter(self, query, **kwargs):
INDEX_SEARCH_FILTERS = {
"email": "email",
"username": "username",
"is": "is",
}
deleted = False
purged = False
if search_query := kwargs.get("search"):
parsed_search = parse_filters_structured(search_query, INDEX_SEARCH_FILTERS)
for term in parsed_search.terms:
if isinstance(term, FilteredTerm):
key = term.filter
q = term.text
if key == "email":
query = query.filter(text_column_filter(self.model_class.email, term))
elif key == "username":
query = query.filter(text_column_filter(self.model_class.username, term))
elif key == "is":
if q == "deleted":
deleted = True
elif q == "purged":
purged = True
elif isinstance(term, RawTextTerm):
query = query.filter(
raw_text_column_filter(
[
self.model_class.email,
self.model_class.username,
],
term,
)
)
if purged:
query = query.filter(self.model_class.purged == true())
else:
query = query.filter(self.model_class.deleted == (true() if deleted else false()))
return query
[docs]class RoleListGrid(grids.GridData):
[docs] class GroupsColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, role):
if role.groups:
return len(role.groups)
return 0
[docs] class UsersColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, role):
if role.users:
return len(role.users)
return 0
# Grid definition
title = "Roles"
title_id = "roles-grid"
model_class = model.Role
default_sort_key = "name"
columns = [
grids.GridColumn("Name", key="name"),
grids.GridColumn("Description", key="description"),
grids.GridColumn("Type", key="type"),
GroupsColumn("Groups", key="groups"),
UsersColumn("Users", key="users"),
grids.GridColumn("Deleted", key="deleted", escape=False),
grids.GridColumn("Purged", key="purged", escape=False),
grids.GridColumn("Last Updated", key="update_time"),
]
[docs] def apply_query_filter(self, query, **kwargs):
INDEX_SEARCH_FILTERS = {
"description": "description",
"name": "name",
"is": "is",
}
deleted = False
query = query.filter(self.model_class.type != self.model_class.types.PRIVATE)
if search_query := kwargs.get("search"):
parsed_search = parse_filters_structured(search_query, INDEX_SEARCH_FILTERS)
for term in parsed_search.terms:
if isinstance(term, FilteredTerm):
key = term.filter
q = term.text
if key == "name":
query = query.filter(text_column_filter(self.model_class.name, term))
if key == "description":
query = query.filter(text_column_filter(self.model_class.description, term))
elif key == "is":
if q == "deleted":
deleted = True
elif isinstance(term, RawTextTerm):
query = query.filter(
raw_text_column_filter(
[
self.model_class.description,
self.model_class.name,
],
term,
)
)
query = query.filter(self.model_class.deleted == (true() if deleted else false()))
return query
[docs]class GroupListGrid(grids.GridData):
[docs] class RolesColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, group):
if group.roles:
return len(group.roles)
return 0
[docs] class UsersColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, group):
if group.users:
return len(group.users)
return 0
# Grid definition
title = "Groups"
title_id = "groups-grid"
model_class = model.Group
default_sort_key = "name"
columns = [
grids.GridColumn("Name", key="name"),
UsersColumn("Users", key="users"),
RolesColumn("Roles", key="roles"),
grids.GridColumn("Deleted", key="deleted", escape=False),
grids.GridColumn("Last Updated", key="update_time"),
]
[docs] def apply_query_filter(self, query, **kwargs):
INDEX_SEARCH_FILTERS = {
"name": "name",
"is": "is",
}
deleted = False
if search_query := kwargs.get("search"):
parsed_search = parse_filters_structured(search_query, INDEX_SEARCH_FILTERS)
for term in parsed_search.terms:
if isinstance(term, FilteredTerm):
key = term.filter
q = term.text
if key == "name":
query = query.filter(text_column_filter(self.model_class.name, term))
elif key == "is":
if q == "deleted":
deleted = True
elif isinstance(term, RawTextTerm):
query = query.filter(
raw_text_column_filter(
[
self.model_class.name,
],
term,
)
)
query = query.filter(self.model_class.deleted == (true() if deleted else false()))
return query
[docs]class QuotaListGrid(grids.GridData):
[docs] class AmountColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, quota):
return quota.operation + quota.display_amount
[docs] class DefaultTypeColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, quota):
if quota.default:
return quota.default[0].type
return None
[docs] class UsersColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, quota):
if quota.users:
return len(quota.users)
return 0
[docs] class GroupsColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, quota):
if quota.groups:
return len(quota.groups)
return 0
# Grid definition
title = "Quotas"
model_class = model.Quota
default_sort_key = "name"
columns = [
grids.GridColumn("Name", key="name"),
grids.GridColumn("Description", key="description"),
AmountColumn("Amount", key="amount", model_class=model.Quota),
UsersColumn("Users", key="users"),
GroupsColumn("Groups", key="groups"),
grids.GridColumn("Source", key="quota_source_label", escape=False),
DefaultTypeColumn("Type", key="default_type"),
grids.GridColumn("Deleted", key="deleted", escape=False),
grids.GridColumn("Updated", key="update_time"),
]
[docs] def apply_query_filter(self, query, **kwargs):
INDEX_SEARCH_FILTERS = {
"name": "name",
"description": "description",
"is": "is",
}
deleted = False
if search_query := kwargs.get("search"):
parsed_search = parse_filters_structured(search_query, INDEX_SEARCH_FILTERS)
for term in parsed_search.terms:
if isinstance(term, FilteredTerm):
key = term.filter
q = term.text
if key == "name":
query = query.filter(text_column_filter(self.model_class.name, term))
if key == "description":
query = query.filter(text_column_filter(self.model_class.description, term))
elif key == "is":
if q == "deleted":
deleted = True
elif isinstance(term, RawTextTerm):
query = query.filter(
raw_text_column_filter(
[
self.model_class.name,
self.model_class.description,
],
term,
)
)
query = query.filter(self.model_class.deleted == (true() if deleted else false()))
return query
# TODO: Convert admin UI to use the API and drop this.
[docs]class DatatypesEntryT(TypedDict):
status: str
keys: list
data: list
message: str
[docs]class AdminGalaxy(controller.JSAppLauncher):
user_list_grid = UserListGrid()
role_list_grid = RoleListGrid()
group_list_grid = GroupListGrid()
quota_list_grid = QuotaListGrid()
[docs] def __init__(self, app: StructuredApp):
super().__init__(app)
self.quota_manager: QuotaManager = QuotaManager(app)
[docs] @web.expose
@web.json
@web.require_admin
def data_tables_list(self, trans, **kwd):
data = []
message = kwd.get("message", "")
status = kwd.get("status", "done")
sorted_data_tables = sorted(trans.app.tool_data_tables.get_tables().items())
for _data_table_elem_name, data_table in sorted_data_tables:
for filename, file_dict in data_table.filenames.items():
file_missing = ["file missing"] if not file_dict.get("found") else []
data.append(
{
"name": data_table.name,
"filename": filename,
"tool_data_path": file_dict.get("tool_data_path"),
"errors": ", ".join(file_missing + list(file_dict.get("errors", []))),
}
)
return {"data": data, "message": message, "status": status}
[docs] @web.expose
@web.json
@web.require_admin
def data_types_list(self, trans, **kwd) -> DatatypesEntryT:
datatypes = []
keys: Set[str] = set()
message = kwd.get("message", "")
status = kwd.get("status", "done")
for dtype in sorted(trans.app.datatypes_registry.datatype_elems, key=lambda dt: dt.get("extension")):
attrib = dict(dtype.attrib)
datatypes.append(attrib)
keys |= set(attrib.keys())
return {"keys": list(keys), "data": datatypes, "message": message, "status": status}
[docs] @web.expose
@web.json
@web.require_admin
def users_list(self, trans, **kwd):
return self.user_list_grid(trans, **kwd)
[docs] @web.legacy_expose_api
@web.require_admin
def quotas_list(self, trans, payload=None, **kwargs):
return self.quota_list_grid(trans, **kwargs)
[docs] @web.legacy_expose_api
@web.require_admin
def create_quota(self, trans, payload=None, **kwd):
if trans.request.method == "GET":
all_users = []
all_groups = []
labels = trans.app.object_store.get_quota_source_map().get_quota_source_labels()
label_options = [("Default Quota", "__default__")]
label_options.extend([(label, label) for label in labels])
for user in (
trans.sa_session.query(trans.app.model.User)
.filter(trans.app.model.User.table.c.deleted == false())
.order_by(trans.app.model.User.table.c.email)
):
all_users.append((user.email, trans.security.encode_id(user.id)))
for group in (
trans.sa_session.query(trans.app.model.Group)
.filter(trans.app.model.Group.deleted == false())
.order_by(trans.app.model.Group.name)
):
all_groups.append((group.name, trans.security.encode_id(group.id)))
default_options = [("No", "no")]
for type_ in trans.app.model.DefaultQuotaAssociation.types:
default_options.append((f"Yes, {type_}", type_))
rval = {
"title": "Create Quota",
"inputs": [
{"name": "name", "label": "Name"},
{"name": "description", "label": "Description"},
{"name": "amount", "label": "Amount", "help": 'Examples: "10000MB", "99 gb", "0.2T", "unlimited"'},
{
"name": "operation",
"label": "Assign, increase by amount, or decrease by amount?",
"options": [("=", "="), ("+", "+"), ("-", "-")],
},
{
"name": "default",
"label": "Is this quota a default for a class of users (if yes, what type)?",
"options": default_options,
"help": "Warning: Any users or groups associated with this quota will be disassociated.",
},
],
}
if len(label_options) > 1:
rval["inputs"].append(
{
"name": "quota_source_label",
"label": "Apply quota to labeled object stores.",
"options": label_options,
}
)
rval["inputs"].extend(
[
build_select_input("in_groups", "Groups", all_groups, []),
build_select_input("in_users", "Users", all_users, []),
]
)
return rval
else:
try:
quota_source_label = payload.get("quota_source_label")
if quota_source_label == "__default__":
payload["quota_source_label"] = None
quota, message = self.quota_manager.create_quota(payload, decode_id=trans.security.decode_id)
return {"message": message}
except ActionInputError as e:
return self.message_exception(trans, e.err_msg)
[docs] @web.legacy_expose_api
@web.require_admin
def rename_quota(self, trans, payload=None, **kwd):
id = kwd.get("id")
if not id:
return self.message_exception(trans, "No quota id received for renaming.")
quota = get_quota(trans, id)
if trans.request.method == "GET":
return {
"title": f"Change quota name and description for '{quota.name}'",
"inputs": [
{"name": "name", "label": "Name", "value": quota.name},
{"name": "description", "label": "Description", "value": quota.description},
],
}
else:
try:
return {"message": self.quota_manager.rename_quota(quota, util.Params(payload))}
except ActionInputError as e:
return self.message_exception(trans, e.err_msg)
[docs] @web.legacy_expose_api
@web.require_admin
def manage_users_and_groups_for_quota(self, trans, payload=None, **kwd):
quota_id = kwd.get("id")
if not quota_id:
return self.message_exception(trans, f"Invalid quota id ({str(quota_id)}) received")
quota = get_quota(trans, quota_id)
if trans.request.method == "GET":
in_users = []
all_users = []
in_groups = []
all_groups = []
for user in (
trans.sa_session.query(trans.app.model.User)
.filter(trans.app.model.User.table.c.deleted == false())
.order_by(trans.app.model.User.table.c.email)
):
if user in [x.user for x in quota.users]:
in_users.append(trans.security.encode_id(user.id))
all_users.append((user.email, trans.security.encode_id(user.id)))
for group in (
trans.sa_session.query(trans.app.model.Group)
.filter(trans.app.model.Group.deleted == false())
.order_by(trans.app.model.Group.name)
):
if group in [x.group for x in quota.groups]:
in_groups.append(trans.security.encode_id(group.id))
all_groups.append((group.name, trans.security.encode_id(group.id)))
return {
"title": f"Quota '{quota.name}'",
"message": "Quota '%s' is currently associated with %d user(s) and %d group(s)."
% (quota.name, len(in_users), len(in_groups)),
"status": "info",
"inputs": [
build_select_input("in_groups", "Groups", all_groups, in_groups),
build_select_input("in_users", "Users", all_users, in_users),
],
}
else:
try:
return {
"message": self.quota_manager.manage_users_and_groups_for_quota(
quota, util.Params(payload), decode_id=trans.security.decode_id
)
}
except ActionInputError as e:
return self.message_exception(trans, e.err_msg)
[docs] @web.legacy_expose_api
@web.require_admin
def edit_quota(self, trans, payload=None, **kwd):
id = kwd.get("id")
if not id:
return self.message_exception(trans, "No quota id received for renaming.")
quota = get_quota(trans, id)
if trans.request.method == "GET":
return {
"title": f"Edit quota size for '{quota.name}'",
"inputs": [
{
"name": "amount",
"label": "Amount",
"value": quota.display_amount,
"help": 'Examples: "10000MB", "99 gb", "0.2T", "unlimited"',
},
{
"name": "operation",
"label": "Assign, increase by amount, or decrease by amount?",
"options": [("=", "="), ("+", "+"), ("-", "-")],
"value": quota.operation,
},
],
}
else:
try:
return {"message": self.quota_manager.edit_quota(quota, util.Params(payload))}
except ActionInputError as e:
return self.message_exception(trans, e.err_msg)
[docs] @web.legacy_expose_api
@web.require_admin
def set_quota_default(self, trans, payload=None, **kwd):
id = kwd.get("id")
if not id:
return self.message_exception(trans, "No quota id received for renaming.")
quota = get_quota(trans, id)
if trans.request.method == "GET":
default_value = quota.default[0].type if quota.default else "no"
default_options = [("No", "no")]
for typ in trans.app.model.DefaultQuotaAssociation.types.__members__.values():
default_options.append((f"Yes, {typ}", typ))
return {
"title": f"Set quota default for '{quota.name}'",
"inputs": [
{
"name": "default",
"label": "Is this quota a default for a class of users (if yes, what type)?",
"options": default_options,
"value": default_value,
"help": "Warning: Any users or groups associated with this quota will be disassociated.",
}
],
}
else:
try:
return {"message": self.quota_manager.set_quota_default(quota, util.Params(payload))}
except ActionInputError as e:
return self.message_exception(trans, e.err_msg)
[docs] @web.expose
@web.require_admin
def impersonate(self, trans, **kwd):
if not trans.app.config.allow_user_impersonation:
return trans.show_error_message("User impersonation is not enabled in this instance of Galaxy.")
user = None
if (user_id := kwd.get("id", None)) is not None:
try:
user = trans.sa_session.query(trans.app.model.User).get(trans.security.decode_id(user_id))
if user:
trans.handle_user_logout()
trans.handle_user_login(user)
return trans.show_message(
f"You are now logged in as {user.email}, <a target=\"_top\" href=\"{url_for(controller='root')}\">return to the home page</a>",
use_panels=True,
)
except Exception:
log.exception("Error fetching user for impersonation")
return trans.response.send_redirect(
web.url_for(controller="admin", action="users", message="Invalid user selected", status="error")
)
[docs] @web.expose
@web.json
@web.require_admin
def roles_list(self, trans, **kwargs):
return self.role_list_grid(trans, **kwargs)
[docs] @web.legacy_expose_api
@web.require_admin
def create_role(self, trans, payload=None, **kwd):
if trans.request.method == "GET":
all_users = []
all_groups = []
for user in (
trans.sa_session.query(trans.app.model.User)
.filter(trans.app.model.User.table.c.deleted == false())
.order_by(trans.app.model.User.table.c.email)
):
all_users.append((user.email, trans.security.encode_id(user.id)))
for group in (
trans.sa_session.query(trans.app.model.Group)
.filter(trans.app.model.Group.deleted == false())
.order_by(trans.app.model.Group.name)
):
all_groups.append((group.name, trans.security.encode_id(group.id)))
return {
"title": "Create Role",
"inputs": [
{"name": "name", "label": "Name"},
{"name": "description", "label": "Description"},
build_select_input("in_groups", "Groups", all_groups, []),
build_select_input("in_users", "Users", all_users, []),
{
"name": "auto_create",
"label": "Create a new group of the same name for this role:",
"type": "boolean",
"optional": True,
},
],
}
else:
name = util.restore_text(payload.get("name", ""))
description = util.restore_text(payload.get("description", ""))
auto_create_checked = payload.get("auto_create")
in_users = [
trans.sa_session.query(trans.app.model.User).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_users"))
]
in_groups = [
trans.sa_session.query(trans.app.model.Group).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_groups"))
]
if not name or not description:
return self.message_exception(trans, "Enter a valid name and a description.")
elif trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.name == name).first():
return self.message_exception(
trans, "Role names must be unique and a role with that name already exists, so choose another name."
)
elif None in in_users or None in in_groups:
return self.message_exception(trans, "One or more invalid user/group id has been provided.")
else:
# Create the role
role = trans.app.model.Role(name=name, description=description, type=trans.app.model.Role.types.ADMIN)
trans.sa_session.add(role)
# Create the UserRoleAssociations
for user in in_users:
ura = trans.app.model.UserRoleAssociation(user, role)
trans.sa_session.add(ura)
# Create the GroupRoleAssociations
for group in in_groups:
gra = trans.app.model.GroupRoleAssociation(group, role)
trans.sa_session.add(gra)
if auto_create_checked:
# Check if role with same name already exists
if trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.name == name).first():
return self.message_exception(
trans,
"A group with that name already exists, so choose another name or disable group creation.",
)
# Create the group
group = trans.app.model.Group(name=name)
trans.sa_session.add(group)
# Associate the group with the role
gra = trans.model.GroupRoleAssociation(group, role)
trans.sa_session.add(gra)
num_in_groups = len(in_groups) + 1
else:
num_in_groups = len(in_groups)
with transaction(trans.sa_session):
trans.sa_session.commit()
message = f"Role '{role.name}' has been created with {len(in_users)} associated users and {num_in_groups} associated groups."
if auto_create_checked:
message += (
"One of the groups associated with this role is the newly created group with the same name."
)
return {"message": message}
[docs] @web.legacy_expose_api
@web.require_admin
def rename_role(self, trans, payload=None, **kwd):
id = kwd.get("id")
if not id:
return self.message_exception(trans, "No role id received for renaming.")
role = get_role(trans, id)
if trans.request.method == "GET":
return {
"title": f"Change role name and description for '{role.name}'",
"inputs": [
{"name": "name", "label": "Name", "value": role.name},
{"name": "description", "label": "Description", "value": role.description},
],
}
else:
old_name = role.name
new_name = util.restore_text(payload.get("name"))
new_description = util.restore_text(payload.get("description"))
if not new_name:
return self.message_exception(trans, "Enter a valid role name.")
else:
existing_role = (
trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.name == new_name).first()
)
if existing_role and existing_role.id != role.id:
return self.message_exception(trans, "A role with that name already exists.")
else:
if not (role.name == new_name and role.description == new_description):
role.name = new_name
role.description = new_description
trans.sa_session.add(role)
with transaction(trans.sa_session):
trans.sa_session.commit()
return {"message": f"Role '{old_name}' has been renamed to '{new_name}'."}
[docs] @web.legacy_expose_api
@web.require_admin
def manage_users_and_groups_for_role(self, trans, payload=None, **kwd):
role_id = kwd.get("id")
if not role_id:
return self.message_exception(trans, f"Invalid role id ({str(role_id)}) received")
role = get_role(trans, role_id)
if trans.request.method == "GET":
in_users = []
all_users = []
in_groups = []
all_groups = []
for user in (
trans.sa_session.query(trans.app.model.User)
.filter(trans.app.model.User.table.c.deleted == false())
.order_by(trans.app.model.User.table.c.email)
):
if user in [x.user for x in role.users]:
in_users.append(trans.security.encode_id(user.id))
all_users.append((user.email, trans.security.encode_id(user.id)))
for group in (
trans.sa_session.query(trans.app.model.Group)
.filter(trans.app.model.Group.deleted == false())
.order_by(trans.app.model.Group.name)
):
if group in [x.group for x in role.groups]:
in_groups.append(trans.security.encode_id(group.id))
all_groups.append((group.name, trans.security.encode_id(group.id)))
return {
"title": f"Role '{role.name}'",
"message": "Role '%s' is currently associated with %d user(s) and %d group(s)."
% (role.name, len(in_users), len(in_groups)),
"status": "info",
"inputs": [
build_select_input("in_groups", "Groups", all_groups, in_groups),
build_select_input("in_users", "Users", all_users, in_users),
],
}
else:
in_users = [
trans.sa_session.query(trans.app.model.User).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_users"))
]
in_groups = [
trans.sa_session.query(trans.app.model.Group).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_groups"))
]
if None in in_users or None in in_groups:
return self.message_exception(trans, "One or more invalid user/group id has been provided.")
for ura in role.users:
user = trans.sa_session.query(trans.app.model.User).get(ura.user_id)
if user not in in_users:
# Delete DefaultUserPermissions for previously associated users that have been removed from the role
for dup in user.default_permissions:
if role == dup.role:
trans.sa_session.delete(dup)
# Delete DefaultHistoryPermissions for previously associated users that have been removed from the role
for history in user.histories:
for dhp in history.default_permissions:
if role == dhp.role:
trans.sa_session.delete(dhp)
with transaction(trans.sa_session):
trans.sa_session.commit()
trans.app.security_agent.set_entity_role_associations(roles=[role], users=in_users, groups=in_groups)
trans.sa_session.refresh(role)
return {
"message": f"Role '{role.name}' has been updated with {len(in_users)} associated users and {len(in_groups)} associated groups."
}
[docs] @web.legacy_expose_api
@web.require_admin
def groups_list(self, trans, **kwargs):
return self.group_list_grid(trans, **kwargs)
[docs] @web.legacy_expose_api
@web.require_admin
def rename_group(self, trans, payload=None, **kwd):
id = kwd.get("id")
if not id:
return self.message_exception(trans, "No group id received for renaming.")
group = get_group(trans, id)
if trans.request.method == "GET":
return {
"title": f"Change group name for '{group.name}'",
"inputs": [{"name": "name", "label": "Name", "value": group.name}],
}
else:
old_name = group.name
new_name = util.restore_text(payload.get("name"))
if not new_name:
return self.message_exception(trans, "Enter a valid group name.")
else:
existing_group = (
trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.name == new_name).first()
)
if existing_group and existing_group.id != group.id:
return self.message_exception(trans, "A group with that name already exists.")
else:
if not (group.name == new_name):
group.name = new_name
trans.sa_session.add(group)
with transaction(trans.sa_session):
trans.sa_session.commit()
return {"message": f"Group '{old_name}' has been renamed to '{new_name}'."}
[docs] @web.legacy_expose_api
@web.require_admin
def manage_users_and_roles_for_group(self, trans, payload=None, **kwd):
group_id = kwd.get("id")
if not group_id:
return self.message_exception(trans, f"Invalid group id ({str(group_id)}) received")
group = get_group(trans, group_id)
if trans.request.method == "GET":
in_users = []
all_users = []
in_roles = []
all_roles = []
for user in (
trans.sa_session.query(trans.app.model.User)
.filter(trans.app.model.User.table.c.deleted == false())
.order_by(trans.app.model.User.table.c.email)
):
if user in [x.user for x in group.users]:
in_users.append(trans.security.encode_id(user.id))
all_users.append((user.email, trans.security.encode_id(user.id)))
for role in (
trans.sa_session.query(trans.app.model.Role)
.filter(trans.app.model.Role.deleted == false())
.order_by(trans.app.model.Role.name)
):
if role in [x.role for x in group.roles]:
in_roles.append(trans.security.encode_id(role.id))
all_roles.append((role.name, trans.security.encode_id(role.id)))
return {
"title": f"Group '{group.name}'",
"message": "Group '%s' is currently associated with %d user(s) and %d role(s)."
% (group.name, len(in_users), len(in_roles)),
"status": "info",
"inputs": [
build_select_input("in_roles", "Roles", all_roles, in_roles),
build_select_input("in_users", "Users", all_users, in_users),
],
}
else:
in_users = [
trans.sa_session.query(trans.app.model.User).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_users"))
]
in_roles = [
trans.sa_session.query(trans.app.model.Role).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_roles"))
]
if None in in_users or None in in_roles:
return self.message_exception(trans, "One or more invalid user/role id has been provided.")
trans.app.security_agent.set_entity_group_associations(groups=[group], users=in_users, roles=in_roles)
trans.sa_session.refresh(group)
return {
"message": f"Group '{group.name}' has been updated with {len(in_users)} associated users and {len(in_roles)} associated roles."
}
[docs] @web.legacy_expose_api
@web.require_admin
def create_group(self, trans, payload=None, **kwd):
if trans.request.method == "GET":
all_users = []
all_roles = []
for user in (
trans.sa_session.query(trans.app.model.User)
.filter(trans.app.model.User.table.c.deleted == false())
.order_by(trans.app.model.User.table.c.email)
):
all_users.append((user.email, trans.security.encode_id(user.id)))
for role in (
trans.sa_session.query(trans.app.model.Role)
.filter(trans.app.model.Role.deleted == false())
.order_by(trans.app.model.Role.name)
):
all_roles.append((role.name, trans.security.encode_id(role.id)))
return {
"title": "Create Group",
"title_id": "create-group",
"inputs": [
{"name": "name", "label": "Name"},
build_select_input("in_roles", "Roles", all_roles, []),
build_select_input("in_users", "Users", all_users, []),
{
"name": "auto_create",
"label": "Create a new role of the same name for this group:",
"type": "boolean",
"optional": True,
},
],
}
else:
name = util.restore_text(payload.get("name", ""))
auto_create_checked = payload.get("auto_create")
in_users = [
trans.sa_session.query(trans.app.model.User).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_users"))
]
in_roles = [
trans.sa_session.query(trans.app.model.Role).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_roles"))
]
if not name:
return self.message_exception(trans, "Enter a valid name.")
elif trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.name == name).first():
return self.message_exception(
trans,
"Group names must be unique and a group with that name already exists, so choose another name.",
)
elif None in in_users or None in in_roles:
return self.message_exception(trans, "One or more invalid user/role id has been provided.")
else:
# Create the role
group = trans.app.model.Group(name=name)
trans.sa_session.add(group)
# Create the UserRoleAssociations
for user in in_users:
uga = trans.app.model.UserGroupAssociation(user, group)
trans.sa_session.add(uga)
# Create the GroupRoleAssociations
for role in in_roles:
gra = trans.app.model.GroupRoleAssociation(group, role)
trans.sa_session.add(gra)
if auto_create_checked:
# Check if role with same name already exists
if trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.name == name).first():
return self.message_exception(
trans,
"A role with that name already exists, so choose another name or disable role creation.",
)
# Create the role
role = trans.app.model.Role(name=name, description=f"Role for group {name}")
trans.sa_session.add(role)
# Associate the group with the role
gra = trans.model.GroupRoleAssociation(group, role)
trans.sa_session.add(gra)
num_in_roles = len(in_roles) + 1
else:
num_in_roles = len(in_roles)
with transaction(trans.sa_session):
trans.sa_session.commit()
message = "Group '%s' has been created with %d associated users and %d associated roles." % (
group.name,
len(in_users),
num_in_roles,
)
if auto_create_checked:
message += (
"One of the roles associated with this group is the newly created role with the same name."
)
return {"message": message}
[docs] @web.expose
@web.require_admin
def create_new_user(self, trans, **kwd):
return trans.response.send_redirect(web.url_for(controller="user", action="create", cntrller="admin"))
[docs] @web.legacy_expose_api
@web.require_admin
def reset_user_password(self, trans, payload=None, **kwd):
users = {user_id: get_user(trans, user_id) for user_id in util.listify(kwd.get("id"))}
if users:
if trans.request.method == "GET":
return {
"message": f"Changes password(s) for: {', '.join(user.email for user in users.values())}.",
"status": "info",
"inputs": [
{"name": "password", "label": "New password", "type": "password"},
{"name": "confirm", "label": "Confirm password", "type": "password"},
],
}
else:
password = payload.get("password")
confirm = payload.get("confirm")
message = validate_password(trans, password, confirm)
if message:
return self.message_exception(trans, message)
for user in users.values():
user.set_password_cleartext(password)
trans.sa_session.add(user)
with transaction(trans.sa_session):
trans.sa_session.commit()
return {"message": "Passwords reset for %d user(s)." % len(users)}
else:
return self.message_exception(trans, "Please specify user ids.")
[docs] @web.legacy_expose_api
@web.require_admin
def manage_roles_and_groups_for_user(self, trans, payload=None, **kwd):
user_id = kwd.get("id")
if not user_id:
return self.message_exception(trans, f"Invalid user id ({str(user_id)}) received")
user = get_user(trans, user_id)
if trans.request.method == "GET":
in_roles = []
all_roles = []
in_groups = []
all_groups = []
for role in (
trans.sa_session.query(trans.app.model.Role)
.filter(trans.app.model.Role.deleted == false())
.order_by(trans.app.model.Role.name)
):
if role in [x.role for x in user.roles]:
in_roles.append(trans.security.encode_id(role.id))
if role.type != trans.app.model.Role.types.PRIVATE:
# There is a 1 to 1 mapping between a user and a PRIVATE role, so private roles should
# not be listed in the roles form fields, except for the currently selected user's private
# role, which should always be in in_roles. The check above is added as an additional
# precaution, since for a period of time we were including private roles in the form fields.
all_roles.append((role.name, trans.security.encode_id(role.id)))
for group in (
trans.sa_session.query(trans.app.model.Group)
.filter(trans.app.model.Group.deleted == false())
.order_by(trans.app.model.Group.name)
):
if group in [x.group for x in user.groups]:
in_groups.append(trans.security.encode_id(group.id))
all_groups.append((group.name, trans.security.encode_id(group.id)))
return {
"title": f"Roles and groups for '{user.email}'",
"message": f"User '{user.email}' is currently associated with {len(in_roles) - 1} role(s) and is a member of {len(in_groups)} group(s).",
"status": "info",
"inputs": [
build_select_input("in_roles", "Roles", all_roles, in_roles),
build_select_input("in_groups", "Groups", all_groups, in_groups),
],
}
else:
in_roles = [
trans.sa_session.query(trans.app.model.Role).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_roles"))
]
in_groups = [
trans.sa_session.query(trans.app.model.Group).get(trans.security.decode_id(x))
for x in util.listify(payload.get("in_groups"))
]
if None in in_groups or None in in_roles:
return self.message_exception(trans, "One or more invalid role/group id has been provided.")
# make sure the user is not dis-associating himself from his private role
private_role = trans.app.security_agent.get_private_user_role(user)
if private_role not in in_roles:
in_roles.append(private_role)
trans.app.security_agent.set_entity_user_associations(users=[user], roles=in_roles, groups=in_groups)
trans.sa_session.refresh(user)
return {
"message": f"User '{user.email}' has been updated with {len(in_roles) - 1} associated roles and {len(in_groups)} associated groups (private roles are not displayed)."
}
# ---- Utility methods -------------------------------------------------------
[docs]def get_user(trans, user_id):
"""Get a User from the database by id."""
user = trans.sa_session.query(trans.model.User).get(trans.security.decode_id(user_id))
if not user:
return trans.show_error_message(f"User not found for id ({str(user_id)})")
return user
[docs]def get_role(trans, id):
"""Get a Role from the database by id."""
# Load user from database
id = trans.security.decode_id(id)
role = trans.sa_session.query(trans.model.Role).get(id)
if not role:
return trans.show_error_message(f"Role not found for id ({str(id)})")
return role
[docs]def get_group(trans, id):
"""Get a Group from the database by id."""
# Load user from database
id = trans.security.decode_id(id)
group = trans.sa_session.query(trans.model.Group).get(id)
if not group:
return trans.show_error_message(f"Group not found for id ({str(id)})")
return group
[docs]def get_quota(trans, id):
"""Get a Quota from the database by id."""
# Load user from database
id = trans.security.decode_id(id)
quota = trans.sa_session.query(trans.model.Quota).get(id)
return quota