Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.api.group_users

"""
API operations on Group objects.
"""
import logging
from galaxy.web.base.controller import BaseAPIController, url_for
from galaxy import web

log = logging.getLogger(__name__)


class GroupUsersAPIController(BaseAPIController):
    """Admin-only API for listing, showing, adding and removing a group's users."""

    @staticmethod
    def _load(trans, model_class, decoded_id):
        """Return the ``model_class`` row identified by ``decoded_id``, or None.

        A failing lookup is logged and reported as None so that the caller can
        answer with an "invalid id" response instead of raising.
        """
        try:
            return trans.sa_session.query(model_class).get(decoded_id)
        except Exception:
            log.exception("Error loading %s with id %s", model_class.__name__, decoded_id)
            return None

    @staticmethod
    def _invalid_id(trans, kind, encoded_id):
        """Set the response status to 400 and return the "invalid id" message."""
        trans.response.status = 400
        return "Invalid %s id ( %s ) specified." % (kind, str(encoded_id))

    @staticmethod
    def _user_item(group_id, user_id, user):
        """Build the dictionary describing ``user`` as a member of the group."""
        return dict(id=user_id,
                    email=user.email,
                    url=url_for('group_user', group_id=group_id, id=user_id))

    @web.expose_api
    @web.require_admin
    def index(self, trans, group_id, **kwd):
        """
        GET /api/groups/{encoded_group_id}/users
        Displays a collection (list) of the users that belong to a group.

        Returns a list of ``{id, email, url}`` dictionaries.  An unknown group
        yields a 400 status with a message; a failure while building the list
        yields a 500 status with a message.
        """
        decoded_group_id = trans.security.decode_id(group_id)
        group = self._load(trans, trans.app.model.Group, decoded_group_id)
        if group is None:
            return self._invalid_id(trans, "group", group_id)
        try:
            rval = [self._user_item(group_id, trans.security.encode_id(uga.user.id), uga.user)
                    for uga in group.users]
        except Exception as e:
            rval = "Error in group API at listing users"
            log.error("%s: %s", rval, e)
            trans.response.status = 500
        return rval

    @web.expose_api
    @web.require_admin
    def show(self, trans, id, group_id, **kwd):
        """
        GET /api/groups/{encoded_group_id}/users/{encoded_user_id}
        Displays information about a group user.

        Returns an ``{id, email, url}`` dictionary when the user is a member of
        the group, otherwise a message string.  An unknown group or user yields
        a 400 status with a message.
        """
        user_id = id
        decoded_group_id = trans.security.decode_id(group_id)
        decoded_user_id = trans.security.decode_id(user_id)
        group = self._load(trans, trans.app.model.Group, decoded_group_id)
        if group is None:
            return self._invalid_id(trans, "group", group_id)
        user = self._load(trans, trans.app.model.User, decoded_user_id)
        if user is None:
            return self._invalid_id(trans, "user", user_id)
        # Read the labels up front so the error handler never has to touch the
        # model objects again.
        group_name, user_email = group.name, user.email
        try:
            if any(uga.user == user for uga in group.users):
                item = self._user_item(group_id, user_id, user)
            else:
                # TODO: the original author flagged this path ("Fix This"); it
                # still answers with a plain string and the default status.
                item = "user %s not in group %s" % (user_email, group_name)
        except Exception as e:
            item = "Error in group_user API group %s user %s" % (group_name, user_email)
            log.error("%s: %s", item, e)
        return item

    @web.expose_api
    @web.require_admin
    def update(self, trans, id, group_id, **kwd):
        """
        PUT /api/groups/{encoded_group_id}/users/{encoded_user_id}
        Adds a user to a group

        Returns the ``{id, email, url}`` dictionary for the user, whether the
        membership already existed or was just created.  An unknown group or
        user yields a 400 status with a message.
        """
        user_id = id
        decoded_group_id = trans.security.decode_id(group_id)
        decoded_user_id = trans.security.decode_id(user_id)
        group = self._load(trans, trans.app.model.Group, decoded_group_id)
        if group is None:
            return self._invalid_id(trans, "group", group_id)
        user = self._load(trans, trans.app.model.User, decoded_user_id)
        if user is None:
            return self._invalid_id(trans, "user", user_id)
        group_name, user_email = group.name, user.email
        try:
            if not any(uga.user == user for uga in group.users):
                # Only create the association when the user is not yet a member.
                uga = trans.app.model.UserGroupAssociation(user, group)
                trans.sa_session.add(uga)
                trans.sa_session.flush()
            item = self._user_item(group_id, user_id, user)
        except Exception as e:
            item = "Error in group_user API Adding user %s to group %s" % (user_email, group_name)
            log.error("%s: %s", item, e)
        return item

    @web.expose_api
    @web.require_admin
    def delete(self, trans, id, group_id, **kwd):
        """
        DELETE /api/groups/{encoded_group_id}/users/{encoded_user_id}
        Removes a user from a group

        Returns the ``{id, email, url}`` dictionary of the removed user, or a
        message string when the user was not a member of the group.  An unknown
        group or user yields a 400 status with a message.
        """
        user_id = id
        decoded_group_id = trans.security.decode_id(group_id)
        decoded_user_id = trans.security.decode_id(user_id)
        group = self._load(trans, trans.app.model.Group, decoded_group_id)
        if group is None:
            return self._invalid_id(trans, "group", group_id)
        user = self._load(trans, trans.app.model.User, decoded_user_id)
        if user is None:
            return self._invalid_id(trans, "user", user_id)
        group_name, user_email = group.name, user.email
        try:
            # Snapshot the matches first so the collection is not being
            # iterated while associations are deleted.
            memberships = [uga for uga in group.users if uga.user == user]
            if memberships:
                for uga in memberships:
                    trans.sa_session.delete(uga)
                trans.sa_session.flush()
                item = self._user_item(group_id, user_id, user)
            else:
                item = "user %s not in group %s" % (user_email, group_name)
        except Exception as e:
            item = "Error in group_user API Removing user %s from group %s" % (user_email, group_name)
            log.error("%s: %s", item, e)
        return item