Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.api.group_users
"""
API operations on Group objects.
"""
import logging
from galaxy import web
from galaxy.util import unicodify
from galaxy.webapps.base.controller import BaseAPIController, url_for
log = logging.getLogger(__name__)
[docs]class GroupUsersAPIController(BaseAPIController):
[docs] @web.require_admin
@web.legacy_expose_api
def index(self, trans, group_id, **kwd):
"""
GET /api/groups/{encoded_group_id}/users
Displays a collection (list) of groups.
"""
decoded_group_id = trans.security.decode_id(group_id)
try:
group = trans.sa_session.query(trans.app.model.Group).get(decoded_group_id)
except Exception:
group = None
if not group:
trans.response.status = 400
return "Invalid group id ( %s ) specified." % str(group_id)
rval = []
try:
for uga in group.users:
user = uga.user
encoded_id = trans.security.encode_id(user.id)
rval.append(dict(id=encoded_id,
email=user.email,
url=url_for('group_user', group_id=group_id, id=encoded_id, )))
except Exception as e:
rval = "Error in group API at listing users"
log.error(rval + ": %s", unicodify(e))
trans.response.status = 500
return rval
[docs] @web.require_admin
@web.legacy_expose_api
def show(self, trans, id, group_id, **kwd):
"""
GET /api/groups/{encoded_group_id}/users/{encoded_user_id}
Displays information about a group user.
"""
user_id = id
decoded_group_id = trans.security.decode_id(group_id)
decoded_user_id = trans.security.decode_id(user_id)
item = None
try:
group = trans.sa_session.query(trans.app.model.Group).get(decoded_group_id)
user = trans.sa_session.query(trans.app.model.User).get(decoded_user_id)
for uga in group.users:
if uga.user == user:
item = dict(id=user_id,
email=user.email,
url=url_for('group_user', group_id=group_id, id=user_id)) # TODO Fix This
if not item:
item = "user {} not in group {}".format(user.email, group.name)
except Exception as e:
item = "Error in group_user API group {} user {}".format(group.name, user.email)
log.error(item + ": %s", unicodify(e))
return item
[docs] @web.require_admin
@web.legacy_expose_api
def update(self, trans, id, group_id, **kwd):
"""
PUT /api/groups/{encoded_group_id}/users/{encoded_user_id}
Adds a user to a group
"""
user_id = id
decoded_group_id = trans.security.decode_id(group_id)
decoded_user_id = trans.security.decode_id(user_id)
item = None
try:
group = trans.sa_session.query(trans.app.model.Group).get(decoded_group_id)
user = trans.sa_session.query(trans.app.model.User).get(decoded_user_id)
for uga in group.users:
if uga.user == user:
item = dict(id=user_id,
email=user.email,
url=url_for('group_user', group_id=group_id, id=user_id))
if not item:
uga = trans.app.model.UserGroupAssociation(user, group)
# Add UserGroupAssociations
trans.sa_session.add(uga)
trans.sa_session.flush()
item = dict(id=user_id,
email=user.email,
url=url_for('group_user', group_id=group_id, id=user_id))
except Exception as e:
item = "Error in group_user API Adding user {} to group {}".format(user.email, group.name)
log.error(item + ": %s", unicodify(e))
return item
[docs] @web.require_admin
@web.legacy_expose_api
def delete(self, trans, id, group_id, **kwd):
"""
DELETE /api/groups/{encoded_group_id}/users/{encoded_user_id}
Removes a user from a group
"""
user_id = id
decoded_group_id = trans.security.decode_id(group_id)
decoded_user_id = trans.security.decode_id(user_id)
try:
group = trans.sa_session.query(trans.app.model.Group).get(decoded_group_id)
user = trans.sa_session.query(trans.app.model.User).get(decoded_user_id)
for uga in group.users:
if uga.user == user:
trans.sa_session.delete(uga)
trans.sa_session.flush()
item = dict(id=user_id,
email=user.email,
url=url_for('group_user', group_id=group_id, id=user_id))
if not item:
item = "user {} not in group {}".format(user.email, group.name)
except Exception as e:
item = "Error in group_user API Removing user {} from group {}".format(user.email, group.name)
log.error(item + ": %s", unicodify(e))
return item