Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.grids.repository_grids
import json
import logging
from markupsafe import escape as escape_html
from sqlalchemy import and_, false, or_, true
import tool_shed.grids.util as grids_util
import tool_shed.repository_types.util as rt_util
import tool_shed.util.shed_util_common as suc
from galaxy.webapps.reports.framework import grids
from tool_shed.util import hg_util, metadata_util, repository_util
from tool_shed.webapp import model
log = logging.getLogger(__name__)
[docs]class CategoryGrid(grids.Grid):
[docs] class RepositoriesColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, category):
category_name = str(category.name)
filter = trans.app.repository_grid_filter_manager.get_filter(trans)
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE:
return trans.app.repository_registry.certified_level_one_viewable_repositories_and_suites_by_category.get(category_name, 0)
elif filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE_SUITES:
return trans.app.repository_registry.certified_level_one_viewable_suites_by_category.get(category_name, 0)
elif filter == trans.app.repository_grid_filter_manager.filters.SUITES:
return trans.app.repository_registry.viewable_suites_by_category.get(category_name, 0)
else:
# The value filter is None.
return trans.app.repository_registry.viewable_repositories_and_suites_by_category.get(category_name, 0)
title = "Categories"
model_class = model.Category
template = '/webapps/tool_shed/category/grid.mako'
default_sort_key = "name"
columns = [
NameColumn("Name",
key="Category.name",
link=(lambda item: dict(operation="repositories_by_category", id=item.id)),
attach_popup=False),
DescriptionColumn("Description",
key="Category.description",
attach_popup=False),
RepositoriesColumn("Repositories",
model_class=model.Repository,
attach_popup=False)
]
# Override these
default_filter = {}
global_actions = []
operations = []
standard_filters = []
num_rows_per_page = 50
use_paging = False
[docs]class RepositoryGrid(grids.Grid):
[docs] class TypeColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository):
type_class = repository.get_type_class(trans.app)
return escape_html(type_class.label)
[docs] class HeadsColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, repository):
"""Display the current repository heads."""
repo = repository.hg_repo
heads = hg_util.get_repository_heads(repo)
multiple_heads = len(heads) > 1
if multiple_heads:
heads_str = '<font color="red">'
else:
heads_str = ''
for ctx in heads:
heads_str += '%s<br/>' % hg_util.get_revision_label_from_ctx(ctx, include_date=True)
heads_str.rstrip('<br/>')
if multiple_heads:
heads_str += '</font>'
return heads_str
[docs] class MetadataRevisionColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, repository):
"""Display a SelectField whose options are the changeset_revision strings of all metadata revisions of this repository."""
# A repository's metadata revisions may not all be installable, as some may contain only invalid tools.
select_field = grids_util.build_changeset_revision_select_field(trans, repository, downloadable=False)
if len(select_field.options) > 1:
tmpl = "<select name='%s'>" % select_field.name
for o in select_field.options:
tmpl += "<option value='%s'>%s</option>" % (o[1], o[0])
tmpl += "</select>"
return tmpl
elif len(select_field.options) == 1:
option_items = select_field.options[0][0]
rev_label, rev_date = option_items.split(' ')
rev_date = '<i><font color="#666666">%s</font></i>' % rev_date
return '%s %s' % (rev_label, rev_date)
return ''
[docs] class LatestInstallableRevisionColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, repository):
"""Display the latest installable revision label (may not be the repository tip)."""
select_field = grids_util.build_changeset_revision_select_field(trans, repository, downloadable=False)
if select_field.options:
return select_field.options[0][0]
return ''
[docs] class TipRevisionColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, repository):
"""Display the repository tip revision label."""
return escape_html(repository.revision())
[docs] class CategoryColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository):
rval = '<ul>'
if repository.categories:
for rca in repository.categories:
rval += '<li><a href="browse_repositories?operation=repositories_by_category&id=%s">%s</a></li>' \
% (trans.security.encode_id(rca.category.id), rca.category.name)
else:
rval += '<li>not set</li>'
rval += '</ul>'
return rval
[docs] class RepositoryCategoryColumn(grids.GridColumn):
[docs] def filter(self, trans, user, query, column_filter):
"""Modify query to filter by category."""
if column_filter == "All":
return query
return query.filter(model.Category.name == column_filter)
[docs] class UserColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository):
if repository.user:
return escape_html(repository.user.username)
return 'no user'
[docs] class EmailColumn(grids.TextColumn):
[docs] def filter(self, trans, user, query, column_filter):
if column_filter == 'All':
return query
return query.filter(and_(model.Repository.table.c.user_id == model.User.table.c.id,
model.User.table.c.email == column_filter))
[docs] class EmailAlertsColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository):
if trans.user and repository.email_alerts and trans.user.email in json.loads(repository.email_alerts):
return 'yes'
return ''
[docs] class DeprecatedColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository):
if repository.deprecated:
return 'yes'
return ''
title = "Repositories"
model_class = model.Repository
default_sort_key = "name"
use_hide_message = False
columns = [
NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
DescriptionColumn("Synopsis",
key="description",
attach_popup=False),
TypeColumn("Type"),
MetadataRevisionColumn("Metadata<br/>Revisions"),
UserColumn("Owner",
model_class=model.User,
link=(lambda item: dict(operation="repositories_by_user", id=item.id)),
attach_popup=False,
key="User.username"),
# Columns that are valid for filtering but are not visible.
EmailColumn("Email",
model_class=model.User,
key="email",
visible=False),
RepositoryCategoryColumn("Category",
model_class=model.Category,
key="Category.name",
visible=False)
]
columns.append(grids.MulticolFilterColumn("Search repository name, description",
cols_to_filter=[columns[0], columns[1]],
key="free-text-search",
visible=False,
filterable="standard"))
operations = []
standard_filters = []
default_filter = dict(deleted="False")
num_rows_per_page = 50
use_paging = True
allow_fetching_all_results = False
[docs] def build_initial_query(self, trans, **kwd):
filter = trans.app.repository_grid_filter_manager.get_filter(trans)
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE:
return trans.sa_session.query(model.Repository) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE_SUITES:
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.type == rt_util.REPOSITORY_SUITE_DEFINITION) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
else:
# The filter is None.
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
[docs]class DockerImageGrid(RepositoryGrid):
columns = [
RepositoryGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.DescriptionColumn("Synopsis",
key="description",
attach_popup=False),
RepositoryGrid.UserColumn("Owner",
model_class=model.User,
link=(lambda item: dict(operation="repositories_by_user", id=item.id)),
attach_popup=False,
key="User.username"),
RepositoryGrid.EmailAlertsColumn("Alert", attach_popup=False),
]
operations = [grids.GridOperation("Include in Docker image", allow_multiple=True)]
show_item_checkboxes = True
[docs]class EmailAlertsRepositoryGrid(RepositoryGrid):
columns = [
RepositoryGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.DescriptionColumn("Synopsis",
key="description",
attach_popup=False),
RepositoryGrid.UserColumn("Owner",
model_class=model.User,
link=(lambda item: dict(operation="repositories_by_user", id=item.id)),
attach_popup=False,
key="User.username"),
RepositoryGrid.EmailAlertsColumn("Alert", attach_popup=False),
# Columns that are valid for filtering but are not visible.
grids.DeletedColumn("Deleted",
key="deleted",
visible=False,
filterable="advanced")
]
operations = [grids.GridOperation("Receive email alerts", allow_multiple=True)]
global_actions = [
grids.GridAction("User preferences", dict(controller='user', action='index', cntrller='repository'))
]
[docs]class MatchedRepositoryGrid(grids.Grid):
# This grid filters out repositories that have been marked as deleted or deprecated.
[docs] class NameColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
return escape_html(repository_metadata.repository.name)
[docs] class DescriptionColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
return escape_html(repository_metadata.repository.description)
[docs] class RevisionColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
return repository_metadata.changeset_revision
[docs] class UserColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
if repository_metadata.repository.user:
return escape_html(repository_metadata.repository.user.username)
return 'no user'
# Grid definition
title = "Matching repositories"
model_class = model.RepositoryMetadata
default_sort_key = "Repository.name"
use_hide_message = False
columns = [
NameColumn("Repository name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=True),
DescriptionColumn("Synopsis",
attach_popup=False),
RevisionColumn("Revision"),
UserColumn("Owner",
model_class=model.User,
attach_popup=False)
]
operations = [grids.GridOperation("Install to Galaxy", allow_multiple=True)]
standard_filters = []
default_filter = {}
num_rows_per_page = 50
use_paging = False
[docs] def build_initial_query(self, trans, **kwd):
match_tuples = kwd.get('match_tuples', [])
clause_list = []
if match_tuples:
for match_tuple in match_tuples:
repository_id, changeset_revision = match_tuple
clause_list.append(and_(
model.RepositoryMetadata.repository_id == int(repository_id),
model.RepositoryMetadata.changeset_revision == changeset_revision))
return trans.sa_session.query(model.RepositoryMetadata) \
.join(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table) \
.filter(or_(*clause_list)) \
.order_by(model.Repository.name)
# Return an empty query
return trans.sa_session.query(model.RepositoryMetadata) \
.filter(model.RepositoryMetadata.id < 0)
[docs]class InstallMatchedRepositoryGrid(MatchedRepositoryGrid):
columns = [col for col in MatchedRepositoryGrid.columns]
# Override the NameColumn
columns[0] = MatchedRepositoryGrid.NameColumn("Name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False)
[docs]class MyWritableRepositoriesGrid(RepositoryGrid):
# This grid filters out repositories that have been marked as either deprecated or deleted.
title = 'Repositories I can change'
columns = [
RepositoryGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.TypeColumn("Type"),
RepositoryGrid.MetadataRevisionColumn("Metadata<br/>Revisions"),
RepositoryGrid.UserColumn("Owner",
model_class=model.User,
link=(lambda item: dict(operation="repositories_by_user", id=item.id)),
attach_popup=False,
key="User.username")
]
columns.append(grids.MulticolFilterColumn("Search repository name",
cols_to_filter=[columns[0]],
key="free-text-search",
visible=False,
filterable="standard"))
operations = []
[docs] def build_initial_query(self, trans, **kwd):
# TODO: improve performance by adding a db table associating users with repositories for which they have write access.
username = trans.user.username
clause_list = []
for repository in trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())):
allow_push = repository.allow_push()
if allow_push:
allow_push_usernames = allow_push.split(',')
if username in allow_push_usernames:
clause_list.append(model.Repository.table.c.id == repository.id)
if clause_list:
return trans.sa_session.query(model.Repository) \
.filter(or_(*clause_list)) \
.join(model.User.table)
# Return an empty query.
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.table.c.id < 0)
[docs]class RepositoriesByUserGrid(RepositoryGrid):
title = "Repositories by user"
columns = [
RepositoryGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.DescriptionColumn("Synopsis",
key="description",
attach_popup=False),
RepositoryGrid.TypeColumn("Type"),
RepositoryGrid.MetadataRevisionColumn("Metadata<br/>Revisions"),
RepositoryGrid.CategoryColumn("Category",
model_class=model.Category,
key="Category.name",
attach_popup=False)
]
operations = []
standard_filters = []
default_filter = dict(deleted="False")
[docs] def build_initial_query(self, trans, **kwd):
decoded_user_id = trans.security.decode_id(kwd['user_id'])
filter = trans.app.repository_grid_filter_manager.get_filter(trans)
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE:
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.table.c.user_id == decoded_user_id) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE_SUITES:
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.type == rt_util.REPOSITORY_SUITE_DEFINITION,
model.Repository.table.c.user_id == decoded_user_id)) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
else:
# The value of filter is None.
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false(),
model.Repository.table.c.user_id == decoded_user_id)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
[docs]class RepositoriesInCategoryGrid(RepositoryGrid):
title = "Category"
columns = [
RepositoryGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(controller="repository", operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.DescriptionColumn("Synopsis",
key="description",
attach_popup=False),
RepositoryGrid.TypeColumn("Type"),
RepositoryGrid.MetadataRevisionColumn("Metadata<br/>Revisions"),
RepositoryGrid.UserColumn("Owner",
model_class=model.User,
link=(lambda item: dict(controller="repository", operation="repositories_by_user", id=item.id)),
attach_popup=False,
key="User.username"),
# Columns that are valid for filtering but are not visible.
RepositoryGrid.EmailColumn("Email",
model_class=model.User,
key="email",
visible=False)
]
columns.append(grids.MulticolFilterColumn("Search repository name, description",
cols_to_filter=[columns[0], columns[1]],
key="free-text-search",
visible=False,
filterable="standard"))
operations = []
use_paging = False
[docs] def build_initial_query(self, trans, **kwd):
category_id = kwd.get('id', None)
filter = trans.app.repository_grid_filter_manager.get_filter(trans)
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE:
if category_id:
category = suc.get_category(trans.app, category_id)
if category:
return trans.sa_session.query(model.Repository) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table) \
.filter(model.Category.table.c.name == category.name)
return trans.sa_session.query(model.Repository) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE_SUITES:
if category_id:
category = suc.get_category(trans.app, category_id)
if category:
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.type == rt_util.REPOSITORY_SUITE_DEFINITION) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table) \
.filter(model.Category.table.c.name == category.name)
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.type == rt_util.REPOSITORY_SUITE_DEFINITION) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
else:
# The value of filter is None.
if category_id:
category = suc.get_category(trans.app, category_id)
if category:
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table) \
.filter(model.Category.table.c.name == category.name)
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
[docs]class RepositoriesIOwnGrid(RepositoryGrid):
title = "Repositories I own"
columns = [
RepositoryGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.TypeColumn("Type"),
RepositoryGrid.MetadataRevisionColumn("Metadata<br/>Revisions"),
RepositoryGrid.DeprecatedColumn("Deprecated")
]
columns.append(grids.MulticolFilterColumn("Search repository name",
cols_to_filter=[columns[0]],
key="free-text-search",
visible=False,
filterable="standard"))
operations = []
[docs] def build_initial_query(self, trans, **kwd):
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.user_id == trans.user.id)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
[docs]class RepositoriesICanAdministerGrid(RepositoryGrid):
title = "Repositories I can administer"
columns = [
RepositoryGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.UserColumn("Owner"),
RepositoryGrid.MetadataRevisionColumn("Metadata<br/>Revisions"),
RepositoryGrid.DeprecatedColumn("Deprecated")
]
columns.append(grids.MulticolFilterColumn("Search repository name",
cols_to_filter=[columns[0]],
key="free-text-search",
visible=False,
filterable="standard"))
operations = []
[docs] def build_initial_query(self, trans, **kwd):
"""
Retrieve all repositories for which the current user has been granted administrative privileges.
"""
current_user = trans.user
# Build up an or-based clause list containing role table records.
clause_list = []
# Include each of the user's roles.
for ura in current_user.roles:
clause_list.append(model.Role.table.c.id == ura.role_id)
# Include each role associated with each group of which the user is a member.
for uga in current_user.groups:
group = uga.group
for gra in group.roles:
clause_list.append(model.Role.table.c.id == gra.role_id)
# Filter out repositories for which the user does not have the administrative role either directly
# via a role association or indirectly via a group -> role association.
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.table.c.deleted == false()) \
.outerjoin(model.RepositoryRoleAssociation.table) \
.outerjoin(model.Role.table) \
.filter(or_(*clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
[docs]class RepositoriesMissingToolTestComponentsGrid(RepositoryGrid):
# This grid displays only the latest installable revision of each repository.
title = "Repositories with missing tool test components"
columns = [
RepositoryGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.LatestInstallableRevisionColumn("Latest Installable Revision"),
RepositoryGrid.UserColumn("Owner",
key="User.username",
model_class=model.User,
link=(lambda item: dict(operation="repositories_by_user", id=item.id)),
attach_popup=False)
]
columns.append(grids.MulticolFilterColumn("Search repository name",
cols_to_filter=[columns[0]],
key="free-text-search",
visible=False,
filterable="standard"))
operations = []
[docs] def build_initial_query(self, trans, **kwd):
# Filter by latest installable revisions that contain tools with missing tool test components.
revision_clause_list = []
for repository in trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())):
changeset_revision = \
grids_util.filter_by_latest_downloadable_changeset_revision_that_has_missing_tool_test_components(trans, repository)
if changeset_revision:
revision_clause_list.append(model.RepositoryMetadata.table.c.changeset_revision == changeset_revision)
if revision_clause_list:
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())) \
.join(model.RepositoryMetadata) \
.filter(or_(*revision_clause_list)) \
.join(model.User.table)
# Return an empty query.
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.table.c.id < 0)
[docs]class MyWritableRepositoriesMissingToolTestComponentsGrid(RepositoriesMissingToolTestComponentsGrid):
# This grid displays only the latest installable revision of each repository.
title = "Repositories I can change with missing tool test components"
columns = [col for col in RepositoriesMissingToolTestComponentsGrid.columns]
operations = []
[docs] def build_initial_query(self, trans, **kwd):
# First get all repositories that the current user is authorized to update.
username = trans.user.username
user_clause_list = []
for repository in trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())):
allow_push = repository.allow_push()
if allow_push:
allow_push_usernames = allow_push.split(',')
if username in allow_push_usernames:
user_clause_list.append(model.Repository.table.c.id == repository.id)
if user_clause_list:
# We have the list of repositories that the current user is authorized to update, so filter
# further by latest installable revisions that contain tools with missing tool test components.
revision_clause_list = []
for repository in trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())) \
.filter(or_(*user_clause_list)):
changeset_revision = \
grids_util.filter_by_latest_downloadable_changeset_revision_that_has_missing_tool_test_components(trans, repository)
if changeset_revision:
revision_clause_list.append(model.RepositoryMetadata.table.c.changeset_revision == changeset_revision)
if revision_clause_list:
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())) \
.join(model.User.table) \
.filter(or_(*user_clause_list)) \
.join(model.RepositoryMetadata) \
.filter(or_(*revision_clause_list))
# Return an empty query.
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.table.c.id < 0)
[docs]class DeprecatedRepositoriesIOwnGrid(RepositoriesIOwnGrid):
title = "Deprecated repositories I own"
columns = [
RepositoriesIOwnGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.TypeColumn("Type"),
RepositoriesIOwnGrid.MetadataRevisionColumn("Metadata<br/>Revisions"),
RepositoriesIOwnGrid.CategoryColumn("Category",
model_class=model.Category,
key="Category.name",
attach_popup=False),
]
columns.append(grids.MulticolFilterColumn("Search repository name",
cols_to_filter=[columns[0]],
key="free-text-search",
visible=False,
filterable="standard"))
[docs] def build_initial_query(self, trans, **kwd):
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.user_id == trans.user.id,
model.Repository.table.c.deprecated == true())) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table)
[docs]class RepositoriesWithInvalidToolsGrid(RepositoryGrid):
# This grid displays only the latest installable revision of each repository.
[docs] class InvalidToolConfigColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, repository):
# At the time this grid is displayed we know that the received repository will have invalid tools in its latest changeset revision
# that has associated metadata.
val = ''
repository_metadata = \
grids_util.get_latest_repository_metadata_if_it_includes_invalid_tools(trans, repository)
metadata = repository_metadata.metadata
invalid_tools = metadata.get('invalid_tools', [])
if invalid_tools:
for invalid_tool_config in invalid_tools:
href_str = '<a href="load_invalid_tool?repository_id=%s&tool_config=%s&changeset_revision=%s">%s</a>' % \
(trans.security.encode_id(repository.id), invalid_tool_config, repository_metadata.changeset_revision, invalid_tool_config)
val += href_str
val += '<br/>'
val = val.rstrip('<br/>')
return val
title = "Repositories with invalid tools"
columns = [
InvalidToolConfigColumn("Tool config"),
RepositoryGrid.NameColumn("Name",
key="name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryGrid.LatestInstallableRevisionColumn("Latest Metadata Revision"),
RepositoryGrid.UserColumn("Owner",
key="User.username",
model_class=model.User,
link=(lambda item: dict(operation="repositories_by_user", id=item.id)),
attach_popup=False)
]
operations = []
[docs] def build_initial_query(self, trans, **kwd):
# Filter by latest metadata revisions that contain invalid tools.
revision_clause_list = []
for repository in trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())):
changeset_revision = \
grids_util.filter_by_latest_metadata_changeset_revision_that_has_invalid_tools(trans, repository)
if changeset_revision:
revision_clause_list.append(model.RepositoryMetadata.table.c.changeset_revision == changeset_revision)
if revision_clause_list:
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())) \
.join(model.RepositoryMetadata) \
.filter(or_(*revision_clause_list)) \
.join(model.User.table)
# Return an empty query.
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.table.c.id < 0)
[docs]class MyWritableRepositoriesWithInvalidToolsGrid(RepositoriesWithInvalidToolsGrid):
# This grid displays only the latest installable revision of each repository.
title = "Repositories I can change with invalid tools"
columns = [col for col in RepositoriesWithInvalidToolsGrid.columns]
operations = []
[docs] def build_initial_query(self, trans, **kwd):
# First get all repositories that the current user is authorized to update.
username = trans.user.username
user_clause_list = []
for repository in trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())):
allow_push = repository.allow_push()
if allow_push:
allow_push_usernames = allow_push.split(',')
if username in allow_push_usernames:
user_clause_list.append(model.Repository.table.c.id == repository.id)
if user_clause_list:
# We have the list of repositories that the current user is authorized to update, so filter
# further by latest metadata revisions that contain invalid tools.
revision_clause_list = []
for repository in trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())) \
.filter(or_(*user_clause_list)):
changeset_revision = \
grids_util.filter_by_latest_metadata_changeset_revision_that_has_invalid_tools(trans, repository)
if changeset_revision:
revision_clause_list.append(model.RepositoryMetadata.table.c.changeset_revision == changeset_revision)
if revision_clause_list:
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deprecated == false(),
model.Repository.table.c.deleted == false())) \
.join(model.User.table) \
.filter(or_(*user_clause_list)) \
.join(model.RepositoryMetadata) \
.filter(or_(*revision_clause_list))
# Return an empty query.
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.table.c.id < 0)
[docs]class RepositoryMetadataGrid(grids.Grid):
[docs] class RepositoryNameColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
repository = repository_metadata.repository
return escape_html(repository.name)
[docs] class RepositoryTypeColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
repository = repository_metadata.repository
type_class = repository.get_type_class(trans.app)
return escape_html(type_class.label)
[docs] class RepositoryOwnerColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
repository = repository_metadata.repository
return escape_html(repository.user.username)
[docs] class ChangesetRevisionColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
repository = repository_metadata.repository
changeset_revision = repository_metadata.changeset_revision
changeset_revision_label = hg_util.get_revision_label(trans.app, repository, changeset_revision, include_date=True)
return changeset_revision_label
[docs] class MaliciousColumn(grids.BooleanColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
if repository_metadata.malicious:
return 'yes'
return ''
[docs] class DownloadableColumn(grids.BooleanColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
if repository_metadata.downloadable:
return 'yes'
return ''
[docs] class HasRepositoryDependenciesColumn(grids.BooleanColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
if repository_metadata.has_repository_dependencies:
return 'yes'
return ''
[docs] class IncludesDatatypesColumn(grids.BooleanColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
if repository_metadata.includes_datatypes:
return 'yes'
return ''
[docs] class IncludesToolsColumn(grids.BooleanColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
if repository_metadata.includes_tools:
return 'yes'
return ''
[docs] class IncludesToolDependenciesColumn(grids.BooleanColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
if repository_metadata.includes_tool_dependencies:
return 'yes'
return ''
[docs] class IncludesWorkflowsColumn(grids.BooleanColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
if repository_metadata.includes_workflows:
return 'yes'
return ''
title = "Repository metadata"
model_class = model.RepositoryMetadata
default_sort_key = "Repository.name"
columns = [
RepositoryNameColumn("Repository name",
key="Repository.name",
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False),
RepositoryNameColumn("Type"),
RepositoryOwnerColumn("Owner",
model_class=model.User,
attach_popup=False,
key="User.username")
]
columns.append(grids.MulticolFilterColumn("Search repository name, description",
cols_to_filter=[columns[0], columns[1]],
key="free-text-search",
visible=False,
filterable="standard"))
operations = []
standard_filters = []
default_filter = dict(malicious="False")
num_rows_per_page = 50
use_paging = True
allow_fetching_all_results = False
[docs] def build_initial_query(self, trans, **kwd):
return trans.sa_session.query(model.RepositoryMetadata) \
.join(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table)
[docs]class RepositoryDependenciesGrid(RepositoryMetadataGrid):
[docs] class RequiredRepositoryColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
rd_str = []
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
rd_dict = metadata.get('repository_dependencies', {})
if rd_dict:
rd_tups = rd_dict['repository_dependencies']
# "repository_dependencies": [["http://localhost:9009", "bwa059", "test", "a07baa797d53"]]
# Sort rd_tups by by required repository name.
sorted_rd_tups = sorted(rd_tups, key=lambda rd_tup: rd_tup[1])
for rd_tup in sorted_rd_tups:
name, owner, changeset_revision = rd_tup[1:4]
rd_line = ''
required_repository = repository_util.get_repository_by_name_and_owner(trans.app, name, owner)
if required_repository and not required_repository.deleted:
required_repository_id = trans.security.encode_id(required_repository.id)
required_repository_metadata = \
metadata_util.get_repository_metadata_by_repository_id_changeset_revision(trans.app,
required_repository_id,
changeset_revision)
if not required_repository_metadata:
updated_changeset_revision = \
metadata_util.get_next_downloadable_changeset_revision(trans.app, required_repository, changeset_revision)
required_repository_metadata = \
metadata_util.get_repository_metadata_by_repository_id_changeset_revision(trans.app,
required_repository_id,
updated_changeset_revision)
required_repository_metadata_id = trans.security.encode_id(required_repository_metadata.id)
rd_line += '<a href="browse_repository_dependencies?operation=view_or_manage_repository&id=%s">' % (required_repository_metadata_id)
rd_line += 'Repository <b>%s</b> revision <b>%s</b> owned by <b>%s</b>' % (escape_html(name), escape_html(owner), escape_html(changeset_revision))
if required_repository:
rd_line += '</a>'
rd_str.append(rd_line)
return '<br />'.join(rd_str)
title = "Valid repository dependency definitions in this tool shed"
default_sort_key = "Repository.name"
columns = [
RequiredRepositoryColumn("Repository dependency",
attach_popup=False),
RepositoryMetadataGrid.RepositoryNameColumn("Repository name",
model_class=model.Repository,
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False,
key="Repository.name"),
RepositoryMetadataGrid.RepositoryOwnerColumn("Owner",
model_class=model.User,
attach_popup=False,
key="User.username"),
RepositoryMetadataGrid.ChangesetRevisionColumn("Revision",
attach_popup=False)
]
columns.append(grids.MulticolFilterColumn("Search repository name, owner",
cols_to_filter=[columns[1], columns[2]],
key="free-text-search",
visible=False,
filterable="standard"))
[docs] def build_initial_query(self, trans, **kwd):
return trans.sa_session.query(model.RepositoryMetadata) \
.join(model.Repository) \
.filter(and_(model.RepositoryMetadata.table.c.has_repository_dependencies == true(),
model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table)
[docs]class DatatypesGrid(RepositoryMetadataGrid):
[docs] class DatatypesColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
datatype_list = []
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
datatype_dicts = metadata.get('datatypes', [])
if datatype_dicts:
# Create tuples of the attributes we want so we can sort them by extension.
datatype_tups = []
for datatype_dict in datatype_dicts:
# Example: {"display_in_upload": "true", "dtype": "galaxy.datatypes.blast:BlastXml", "extension": "blastxml", "mimetype": "application/xml"}
extension = datatype_dict.get('extension', '')
dtype = datatype_dict.get('dtype', '')
# For now we'll just display extension and dtype.
if extension and dtype:
datatype_tups.append((extension, dtype))
sorted_datatype_tups = sorted(datatype_tups, key=lambda datatype_tup: datatype_tup[0])
for datatype_tup in sorted_datatype_tups:
extension, datatype = datatype_tup[:2]
datatype_str = '<a href="browse_datatypes?operation=view_or_manage_repository&id=%s">' % trans.security.encode_id(repository_metadata.id)
datatype_str += '<b>%s:</b> %s' % (escape_html(extension), escape_html(datatype))
datatype_str += '</a>'
datatype_list.append(datatype_str)
return '<br />'.join(datatype_list)
title = "Custom datatypes in this tool shed"
default_sort_key = "Repository.name"
columns = [
DatatypesColumn("Datatype extension and class",
attach_popup=False),
RepositoryMetadataGrid.RepositoryNameColumn("Repository name",
model_class=model.Repository,
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False,
key="Repository.name"),
RepositoryMetadataGrid.RepositoryOwnerColumn("Owner",
model_class=model.User,
attach_popup=False,
key="User.username"),
RepositoryMetadataGrid.ChangesetRevisionColumn("Revision",
attach_popup=False)
]
columns.append(grids.MulticolFilterColumn("Search repository name, owner",
cols_to_filter=[columns[1], columns[2]],
key="free-text-search",
visible=False,
filterable="standard"))
[docs] def build_initial_query(self, trans, **kwd):
return trans.sa_session.query(model.RepositoryMetadata) \
.join(model.Repository) \
.filter(and_(model.RepositoryMetadata.table.c.includes_datatypes == true(),
model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table)
[docs]class ToolDependenciesGrid(RepositoryMetadataGrid):
[docs] class ToolDependencyColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
td_str = ''
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
tds_dict = metadata.get('tool_dependencies', {})
if tds_dict:
# Example: {"bwa/0.5.9": {"name": "bwa", "type": "package", "version": "0.5.9"}}
sorted_keys = sorted(tds_dict.keys())
num_keys = len(sorted_keys)
# Handle environment settings first.
if 'set_environment' in sorted_keys:
# Example: "set_environment": [{"name": "JAVA_JAR_FILE", "type": "set_environment"}]
env_dicts = tds_dict['set_environment']
num_env_dicts = len(env_dicts)
if num_env_dicts > 0:
td_str += '<a href="browse_datatypes?operation=view_or_manage_repository&id=%s">' % trans.security.encode_id(repository_metadata.id)
td_str += '<b>environment:</b> '
td_str += ', '.join(escape_html(env_dict['name']) for env_dict in env_dicts)
td_str += '</a><br/>'
for index, key in enumerate(sorted_keys):
if key == 'set_environment':
continue
td_dict = tds_dict[key]
# Example: {"name": "bwa", "type": "package", "version": "0.5.9"}
name = td_dict['name']
version = td_dict['version']
td_str += '<a href="browse_datatypes?operation=view_or_manage_repository&id=%s">' % trans.security.encode_id(repository_metadata.id)
td_str += '<b>%s</b> version <b>%s</b>' % (escape_html(name), escape_html(version))
td_str += '</a>'
if index < num_keys - 1:
td_str += '<br/>'
return td_str
title = "Tool dependency definitions in this tool shed"
default_sort_key = "Repository.name"
columns = [
ToolDependencyColumn("Tool dependency",
attach_popup=False),
RepositoryMetadataGrid.RepositoryNameColumn("Repository name",
model_class=model.Repository,
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False,
key="Repository.name"),
RepositoryMetadataGrid.RepositoryOwnerColumn("Owner",
model_class=model.User,
attach_popup=False,
key="User.username"),
RepositoryMetadataGrid.ChangesetRevisionColumn("Revision",
attach_popup=False)
]
columns.append(grids.MulticolFilterColumn("Search repository name, owner",
cols_to_filter=[columns[1], columns[2]],
key="free-text-search",
visible=False,
filterable="standard"))
[docs] def build_initial_query(self, trans, **kwd):
return trans.sa_session.query(model.RepositoryMetadata) \
.join(model.Repository) \
.filter(and_(model.RepositoryMetadata.table.c.includes_tool_dependencies == true(),
model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table)
[docs]class ToolsGrid(RepositoryMetadataGrid):
[docs] class ToolsColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository_metadata):
tool_line = []
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
tool_dicts = metadata.get('tools', [])
if tool_dicts:
# Create tuples of the attributes we want so we can sort them by extension.
tool_tups = []
for tool_dict in tool_dicts:
tool_id = tool_dict.get('id', '')
version = tool_dict.get('version', '')
# For now we'll just display tool id and version.
if tool_id and version:
tool_tups.append((tool_id, version))
sorted_tool_tups = sorted(tool_tups, key=lambda tool_tup: tool_tup[0])
for tool_tup in sorted_tool_tups:
tool_id, version = tool_tup[:2]
tool_str = '<a href="browse_datatypes?operation=view_or_manage_repository&id=%s">' % trans.security.encode_id(repository_metadata.id)
tool_str += '<b>%s:</b> %s' % (escape_html(tool_id), escape_html(version))
tool_str += '</a>'
tool_line.append(tool_str)
return '<br />'.join(tool_line)
title = "Valid tools in this tool shed"
default_sort_key = "Repository.name"
columns = [
ToolsColumn("Tool id and version",
attach_popup=False),
RepositoryMetadataGrid.RepositoryNameColumn("Repository name",
model_class=model.Repository,
link=(lambda item: dict(operation="view_or_manage_repository", id=item.id)),
attach_popup=False,
key="Repository.name"),
RepositoryMetadataGrid.RepositoryOwnerColumn("Owner",
model_class=model.User,
attach_popup=False,
key="User.username"),
RepositoryMetadataGrid.ChangesetRevisionColumn("Revision",
attach_popup=False)
]
columns.append(grids.MulticolFilterColumn("Search repository name, owner",
cols_to_filter=[columns[1], columns[2]],
key="free-text-search",
visible=False,
filterable="standard"))
[docs] def build_initial_query(self, trans, **kwd):
return trans.sa_session.query(model.RepositoryMetadata) \
.join(model.Repository) \
.filter(and_(model.RepositoryMetadata.table.c.includes_tools == true(),
model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table)
[docs]class ValidCategoryGrid(CategoryGrid):
[docs] class RepositoriesColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, category):
category_name = str(category.name)
filter = trans.app.repository_grid_filter_manager.get_filter(trans)
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE:
return trans.app.repository_registry.certified_level_one_viewable_repositories_and_suites_by_category.get(category_name, 0)
elif filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE_SUITES:
return trans.app.repository_registry.certified_level_one_viewable_suites_by_category.get(category_name, 0)
elif filter == trans.app.repository_grid_filter_manager.filters.SUITES:
return trans.app.repository_registry.viewable_valid_suites_by_category.get(category_name, 0)
else:
# The value filter is None.
return trans.app.repository_registry.viewable_valid_repositories_and_suites_by_category.get(category_name, 0)
title = "Categories of Valid Repositories"
model_class = model.Category
template = '/webapps/tool_shed/category/valid_grid.mako'
default_sort_key = "name"
columns = [
CategoryGrid.NameColumn("Name",
key="Category.name",
link=(lambda item: dict(operation="valid_repositories_by_category", id=item.id)),
attach_popup=False),
CategoryGrid.DescriptionColumn("Description",
key="Category.description",
attach_popup=False),
# Columns that are valid for filtering but are not visible.
RepositoriesColumn("Valid repositories",
model_class=model.Repository,
attach_popup=False)
]
# Override these
default_filter = {}
global_actions = []
operations = []
standard_filters = []
num_rows_per_page = 50
use_paging = False
[docs]class ValidRepositoryGrid(RepositoryGrid):
# This grid filters out repositories that have been marked as either deleted or deprecated.
[docs] class CategoryColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, repository):
rval = '<ul>'
if repository.categories:
for rca in repository.categories:
rval += '<li><a href="browse_repositories?operation=valid_repositories_by_category&id=%s">%s</a></li>' \
% (trans.security.encode_id(rca.category.id), rca.category.name)
else:
rval += '<li>not set</li>'
rval += '</ul>'
return rval
[docs] class RepositoryCategoryColumn(grids.GridColumn):
[docs] def filter(self, trans, user, query, column_filter):
"""Modify query to filter by category."""
if column_filter == "All":
return query
return query.filter(model.Category.name == column_filter)
[docs] class InstallableRevisionColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, repository):
"""Display a SelectField whose options are the changeset_revision strings of all download-able revisions of this repository."""
select_field = grids_util.build_changeset_revision_select_field(trans, repository, downloadable=True)
if len(select_field.options) > 1:
tmpl = "<select name='%s'>" % select_field.name
for o in select_field.options:
tmpl += "<option value='%s'>%s</option>" % (o[1], o[0])
tmpl += "</select>"
return tmpl
elif len(select_field.options) == 1:
return select_field.options[0][0]
return ''
title = "Valid Repositories"
columns = [
RepositoryGrid.NameColumn("Name",
key="name",
attach_popup=True),
RepositoryGrid.DescriptionColumn("Synopsis",
key="description",
attach_popup=False),
RepositoryGrid.TypeColumn("Type"),
InstallableRevisionColumn("Installable Revisions"),
RepositoryGrid.UserColumn("Owner",
model_class=model.User,
attach_popup=False),
# Columns that are valid for filtering but are not visible.
RepositoryCategoryColumn("Category",
model_class=model.Category,
key="Category.name",
visible=False)
]
columns.append(grids.MulticolFilterColumn("Search repository name, description",
cols_to_filter=[columns[0], columns[1]],
key="free-text-search",
visible=False,
filterable="standard"))
operations = []
[docs] def build_initial_query(self, trans, **kwd):
filter = trans.app.repository_grid_filter_manager.get_filter(trans)
if 'id' in kwd:
# The user is browsing categories of valid repositories, so filter the request by the received id,
# which is a category id.
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE:
return trans.sa_session.query(model.Repository) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.join(model.RepositoryCategoryAssociation.table) \
.join(model.Category.table) \
.filter(and_(model.Category.table.c.id == trans.security.decode_id(kwd['id']),
model.RepositoryMetadata.table.c.downloadable == true()))
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE_SUITES:
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.type == rt_util.REPOSITORY_SUITE_DEFINITION) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.join(model.RepositoryCategoryAssociation.table) \
.join(model.Category.table) \
.filter(and_(model.Category.table.c.id == trans.security.decode_id(kwd['id']),
model.RepositoryMetadata.table.c.downloadable == true()))
else:
# The value of filter is None.
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.RepositoryMetadata.table) \
.join(model.User.table) \
.join(model.RepositoryCategoryAssociation.table) \
.join(model.Category.table) \
.filter(and_(model.Category.table.c.id == trans.security.decode_id(kwd['id']),
model.RepositoryMetadata.table.c.downloadable == true()))
# The user performed a free text search on the ValidCategoryGrid.
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE:
return trans.sa_session.query(model.Repository) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table) \
.filter(model.RepositoryMetadata.table.c.downloadable == true())
if filter == trans.app.repository_grid_filter_manager.filters.CERTIFIED_LEVEL_ONE_SUITES:
return trans.sa_session.query(model.Repository) \
.filter(model.Repository.type == rt_util.REPOSITORY_SUITE_DEFINITION) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*trans.app.repository_registry.certified_level_one_clause_list)) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table) \
.filter(model.RepositoryMetadata.table.c.downloadable == true())
else:
# The value of filter is None.
return trans.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.RepositoryMetadata.table) \
.join(model.User.table) \
.outerjoin(model.RepositoryCategoryAssociation.table) \
.outerjoin(model.Category.table) \
.filter(model.RepositoryMetadata.table.c.downloadable == true())