Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.managers.repositories
"""
Manager and Serializer for TS repositories.
"""
import logging
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from galaxy.exceptions import (InconsistentDatabase, InternalServerError,
RequestParameterInvalidException)
log = logging.getLogger(__name__)
# =============================================================================
[docs]class RepoManager:
"""
Interface/service object for interacting with TS repositories.
"""
[docs] def get(self, trans, decoded_repo_id):
"""
Get the repo from the DB.
:param decoded_repo_id: decoded repo id
:type decoded_repo_id: int
:returns: the requested repo
:rtype: Repository
"""
try:
repo = trans.sa_session.query(trans.app.model.Repository).filter(trans.app.model.Repository.table.c.id == decoded_repo_id).one()
except MultipleResultsFound:
raise InconsistentDatabase('Multiple repositories found with the same id.')
except NoResultFound:
raise RequestParameterInvalidException('No repository found with the id provided.')
except Exception:
raise InternalServerError('Error loading from the database.')
return repo
[docs] def list_by_owner(self, trans, user_id):
"""
Return a list of of repositories owned by a given TS user from the DB.
:returns: query that will emit repositories owned by given user
:rtype: sqlalchemy query
"""
query = trans.sa_session.query(trans.app.model.Repository).filter(trans.app.model.Repository.table.c.user_id == user_id)
return query
[docs] def delete(self, trans, group, undelete=False):
"""
Mark given group deleted/undeleted based on the flag.
"""