Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.util.review_util

import logging

from sqlalchemy import and_

from galaxy.util.odict import odict
from tool_shed.util import hg_util

log = logging.getLogger( __name__ )


[docs]def can_browse_repository_reviews( app, user, repository ): """ Determine if there are any reviews of the received repository for which the current user has permission to browse any component reviews. """ if user: for review in repository.reviews: for component_review in review.component_reviews: if app.security_agent.user_can_browse_component_review( app, repository, component_review, user ): return True return False
[docs]def changeset_revision_reviewed_by_user( user, repository, changeset_revision ): """Determine if the current changeset revision has been reviewed by the current user.""" for review in repository.reviews: if review.changeset_revision == changeset_revision and review.user == user: return True return False
[docs]def get_component( app, id ): """Get a component from the database.""" sa_session = app.model.context.current return sa_session.query( app.model.Component ).get( app.security.decode_id( id ) )
[docs]def get_component_review( app, id ): """Get a component_review from the database""" sa_session = app.model.context.current return sa_session.query( app.model.ComponentReview ).get( app.security.decode_id( id ) )
[docs]def get_component_by_name( app, name ): """Get a component from the database via a name.""" sa_session = app.model.context.current return sa_session.query( app.model.Component ) \ .filter( app.model.Component.table.c.name == name ) \ .first()
[docs]def get_component_review_by_repository_review_id_component_id( app, repository_review_id, component_id ): """Get a component_review from the database via repository_review_id and component_id.""" sa_session = app.model.context.current return sa_session.query( app.model.ComponentReview ) \ .filter( and_( app.model.ComponentReview.table.c.repository_review_id == app.security.decode_id( repository_review_id ), app.model.ComponentReview.table.c.component_id == app.security.decode_id( component_id ) ) ) \ .first()
[docs]def get_components( app ): sa_session = app.model.context.current return sa_session.query( app.model.Component ) \ .order_by( app.model.Component.name ) \ .all()
[docs]def get_previous_repository_reviews( app, repository, changeset_revision ): """ Return an ordered dictionary of repository reviews up to and including the received changeset revision. """ repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False ) reviewed_revision_hashes = [ review.changeset_revision for review in repository.reviews ] previous_reviews_dict = odict() for changeset in hg_util.reversed_upper_bounded_changelog( repo, changeset_revision ): previous_changeset_revision = str( repo.changectx( changeset ) ) if previous_changeset_revision in reviewed_revision_hashes: previous_rev, previous_changeset_revision_label = \ hg_util.get_rev_label_from_changeset_revision( repo, previous_changeset_revision ) revision_reviews = get_reviews_by_repository_id_changeset_revision( app, app.security.encode_id( repository.id ), previous_changeset_revision ) previous_reviews_dict[ previous_changeset_revision ] = \ dict( changeset_revision_label=previous_changeset_revision_label, reviews=revision_reviews ) return previous_reviews_dict
[docs]def get_review( app, id ): """Get a repository_review from the database via id.""" sa_session = app.model.context.current return sa_session.query( app.model.RepositoryReview ).get( app.security.decode_id( id ) )
[docs]def get_review_by_repository_id_changeset_revision_user_id( app, repository_id, changeset_revision, user_id ): """ Get a repository_review from the database via repository id, changeset_revision and user_id. """ sa_session = app.model.context.current return sa_session.query( app.model.RepositoryReview ) \ .filter( and_( app.model.RepositoryReview.repository_id == app.security.decode_id( repository_id ), app.model.RepositoryReview.changeset_revision == changeset_revision, app.model.RepositoryReview.user_id == app.security.decode_id( user_id ) ) ) \ .first()
[docs]def get_reviews_by_repository_id_changeset_revision( app, repository_id, changeset_revision ): """Get all repository_reviews from the database via repository id and changeset_revision.""" sa_session = app.model.context.current return sa_session.query( app.model.RepositoryReview ) \ .filter( and_( app.model.RepositoryReview.repository_id == app.security.decode_id( repository_id ), app.model.RepositoryReview.changeset_revision == changeset_revision ) ) \ .all()
[docs]def has_previous_repository_reviews( app, repository, changeset_revision ): """ Determine if a repository has a changeset revision review prior to the received changeset revision. """ repo = hg_util.get_repo_for_repository( app, repository=repository, repo_path=None, create=False ) reviewed_revision_hashes = [ review.changeset_revision for review in repository.reviews ] for changeset in hg_util.reversed_upper_bounded_changelog( repo, changeset_revision ): previous_changeset_revision = str( repo.changectx( changeset ) ) if previous_changeset_revision in reviewed_revision_hashes: return True return False