import json
import logging
import os
import string
import tempfile
from datetime import date
from mercurial import (
cmdutil,
commands,
mdiff,
patch,
)
from sqlalchemy import (
false,
null,
select,
)
import tool_shed.grids.repository_grids as repository_grids
import tool_shed.grids.util as grids_util
import tool_shed.repository_types.util as rt_util
from galaxy import (
util,
web,
)
from galaxy.managers.users import get_user_by_username
from galaxy.model.base import transaction
from galaxy.tool_shed.util import dependency_display
from galaxy.tools.repositories import ValidationContext
from galaxy.util.tool_shed import encoding_util
from galaxy.web.form_builder import (
CheckboxField,
SelectField,
)
from galaxy.web.legacy_framework import grids
from galaxy.webapps.base.controller import BaseUIController
from tool_shed.dependencies.repository import relation_builder
from tool_shed.managers.repositories import readmes
from tool_shed.metadata import repository_metadata_manager
from tool_shed.tools import (
tool_validator,
tool_version_manager,
)
from tool_shed.util import (
basic_util,
common_util,
hg_util,
metadata_util,
readme_util,
repository_util,
search_util,
shed_util_common as suc,
tool_util,
)
from tool_shed.util.web_util import escape
from tool_shed.utility_containers import ToolShedUtilityContainerManager
from tool_shed.webapp.framework.decorators import require_login
from tool_shed.webapp.model import (
Category,
Repository,
RepositoryCategoryAssociation,
RepositoryMetadata,
)
from tool_shed.webapp.util import ratings_util
log = logging.getLogger(__name__)
malicious_error = " This changeset cannot be downloaded because it potentially produces malicious behavior or contains inappropriate content."
malicious_error_can_push = " Correct this changeset as soon as possible, it potentially produces malicious behavior or contains inappropriate content."
[docs]def get_mercurial_default_options_dict(command):
"""Borrowed from repoman - get default parameters for a mercurial command."""
possible = cmdutil.findpossible(command, commands.table)
# Mercurial >= 3.4 returns a tuple whose first element is the old return dict
if type(possible) is tuple:
possible = possible[0]
if len(possible) != 1:
raise Exception(f'unable to find mercurial command "{command}"')
return {r[1].replace(b"-", b"_"): r[2] for r in next(iter(possible.values()))[1][1]}
[docs]class RepositoryController(BaseUIController, ratings_util.ItemRatings):
category_grid = repository_grids.CategoryGrid()
datatypes_grid = repository_grids.DatatypesGrid()
deprecated_repositories_i_own_grid = repository_grids.DeprecatedRepositoriesIOwnGrid()
email_alerts_repository_grid = repository_grids.EmailAlertsRepositoryGrid()
docker_image_grid = repository_grids.DockerImageGrid()
install_matched_repository_grid = repository_grids.InstallMatchedRepositoryGrid()
matched_repository_grid = repository_grids.MatchedRepositoryGrid()
my_writable_repositories_grid = repository_grids.MyWritableRepositoriesGrid()
my_writable_repositories_missing_tool_test_components_grid = (
repository_grids.MyWritableRepositoriesMissingToolTestComponentsGrid()
)
my_writable_repositories_with_invalid_tools_grid = repository_grids.MyWritableRepositoriesWithInvalidToolsGrid()
repositories_by_user_grid = repository_grids.RepositoriesByUserGrid()
repositories_i_own_grid = repository_grids.RepositoriesIOwnGrid()
repositories_i_can_administer_grid = repository_grids.RepositoriesICanAdministerGrid()
repositories_in_category_grid = repository_grids.RepositoriesInCategoryGrid()
repositories_missing_tool_test_components_grid = repository_grids.RepositoriesMissingToolTestComponentsGrid()
repositories_with_invalid_tools_grid = repository_grids.RepositoriesWithInvalidToolsGrid()
repository_dependencies_grid = repository_grids.RepositoryDependenciesGrid()
repository_grid = repository_grids.RepositoryGrid()
# The repository_metadata_grid is not currently displayed, but is sub-classed by several grids.
repository_metadata_grid = repository_grids.RepositoryMetadataGrid()
tool_dependencies_grid = repository_grids.ToolDependenciesGrid()
tools_grid = repository_grids.ToolsGrid()
valid_category_grid = repository_grids.ValidCategoryGrid()
valid_repository_grid = repository_grids.ValidRepositoryGrid()
def _redirect_if_necessary(self, trans, **kwd):
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation == "view_or_manage_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_or_manage_repository", **kwd)
)
elif operation == "repositories_by_user":
return trans.response.send_redirect(
web.url_for(controller="repository", action="browse_repositories_by_user", **kwd)
)
elif operation in ["mark as deprecated", "mark as not deprecated"]:
kwd["mark_deprecated"] = operation == "mark as deprecated"
return trans.response.send_redirect(web.url_for(controller="repository", action="deprecate", **kwd))
[docs] @web.expose
def browse_categories(self, trans, **kwd):
# The request came from the tool shed.
if "f-free-text-search" in kwd:
# Trick to enable searching repository name, description from the CategoryGrid.
# What we've done is rendered the search box for the RepositoryGrid on the grid.mako
# template for the CategoryGrid. See ~/templates/webapps/tool_shed/category/grid.mako.
# Since we are searching repositories and not categories, redirect to browse_repositories().
if "id" in kwd and "f-free-text-search" in kwd and kwd["id"] == kwd["f-free-text-search"]:
# The value of 'id' has been set to the search string, which is a repository name.
# We'll try to get the desired encoded repository id to pass on.
try:
repository_name = kwd["id"]
repository = repository_util.get_repository_by_name(trans.app, repository_name)
kwd["id"] = trans.security.encode_id(repository.id)
except Exception:
pass
return trans.response.send_redirect(
web.url_for(controller="repository", action="browse_repositories", **kwd)
)
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation in ["repositories_by_category", "repositories_by_user"]:
# Eliminate the current filters if any exist.
for k in list(kwd.keys()):
if k.startswith("f-"):
del kwd[k]
return trans.response.send_redirect(
web.url_for(controller="repository", action="browse_repositories", **kwd)
)
title = trans.app.repository_grid_filter_manager.get_grid_title(
trans, trailing_string="by Category", default="Repositories"
)
self.category_grid.title = title
return self.category_grid(trans, **kwd)
[docs] @web.expose
def browse_datatypes(self, trans, **kwd):
if "operation" in kwd:
operation = kwd["operation"].lower()
# The received id is a RepositoryMetadata id.
repository_metadata_id = kwd["id"]
repository_metadata = metadata_util.get_repository_metadata_by_id(trans.app, repository_metadata_id)
repository_id = trans.security.encode_id(repository_metadata.repository_id)
changeset_revision = repository_metadata.changeset_revision
new_kwd = dict(id=repository_id, changeset_revision=changeset_revision)
if operation == "view_or_manage_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_or_manage_repository", **new_kwd)
)
return self.datatypes_grid(trans, **kwd)
[docs] @web.expose
def browse_deprecated_repositories_i_own(self, trans, **kwd):
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation == "view_or_manage_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_or_manage_repository", **kwd)
)
selected_changeset_revision, repository = suc.get_repository_from_refresh_on_change(trans.app, **kwd)
if repository:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="browse_repositories",
operation="view_or_manage_repository",
id=trans.security.encode_id(repository.id),
changeset_revision=selected_changeset_revision,
)
)
return self.deprecated_repositories_i_own_grid(trans, **kwd)
[docs] @web.expose
def browse_my_writable_repositories(self, trans, **kwd):
if (_redir := self._redirect_if_necessary(trans, **kwd)) is not None:
return _redir
selected_changeset_revision, repository = suc.get_repository_from_refresh_on_change(trans.app, **kwd)
if repository:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="browse_repositories",
operation="view_or_manage_repository",
id=trans.security.encode_id(repository.id),
changeset_revision=selected_changeset_revision,
)
)
return self.my_writable_repositories_grid(trans, **kwd)
[docs] @web.expose
def browse_repositories(self, trans, **kwd):
# We add params to the keyword dict in this method in order to rename the param with an "f-" prefix,
# simulating filtering by clicking a search link. We have to take this approach because the "-"
# character is illegal in HTTP requests.
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation == "view_or_manage_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_or_manage_repository", **kwd)
)
elif operation == "edit_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="edit_repository", **kwd)
)
elif operation == "repositories_by_user":
return trans.response.send_redirect(
web.url_for(controller="repository", action="browse_repositories_by_user", **kwd)
)
elif operation == "repositories_by_category":
category_id = kwd.get("id", None)
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="browse_repositories_in_category",
id=category_id,
message=message,
status=status,
)
)
elif operation == "receive email alerts":
if trans.user:
if kwd["id"]:
kwd["caller"] = "browse_repositories"
return trans.response.send_redirect(
web.url_for(controller="repository", action="set_email_alerts", **kwd)
)
else:
kwd["message"] = "You must be logged in to set email alerts."
kwd["status"] = "error"
del kwd["operation"]
selected_changeset_revision, repository = suc.get_repository_from_refresh_on_change(trans.app, **kwd)
if repository:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="browse_repositories",
operation="view_or_manage_repository",
id=trans.security.encode_id(repository.id),
changeset_revision=selected_changeset_revision,
)
)
title = trans.app.repository_grid_filter_manager.get_grid_title(
trans, trailing_string="", default="Repositories"
)
self.repository_grid.title = title
return self.repository_grid(trans, **kwd)
[docs] @web.expose
def browse_repositories_by_user(self, trans, **kwd):
"""Display the list of repositories owned by a specified user."""
# Eliminate the current search filters if any exist.
for k in list(kwd.keys()):
if k.startswith("f-"):
del kwd[k]
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation == "view_or_manage_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_or_manage_repository", **kwd)
)
user_id = kwd.get("user_id", None)
if user_id is None:
# The received id is the repository id, so we need to get the id of the user that owns the repository.
repository_id = kwd.get("id", None)
if repository_id:
repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
user_id = trans.security.encode_id(repository.user.id)
kwd["user_id"] = user_id
else:
# The user selected a repository revision which results in a refresh_on_change.
selected_changeset_revision, repository = suc.get_repository_from_refresh_on_change(trans.app, **kwd)
if repository:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="view_or_manage_repository",
id=trans.security.encode_id(repository.id),
changeset_revision=selected_changeset_revision,
)
)
if user_id:
user = suc.get_user(trans.app, user_id)
trailing_string = ""
default = f"Repositories Owned by {str(user.username)}"
else:
trailing_string = ""
default = "Repositories"
title = trans.app.repository_grid_filter_manager.get_grid_title(
trans, trailing_string=trailing_string, default=default
)
self.repositories_by_user_grid.title = title
return self.repositories_by_user_grid(trans, **kwd)
[docs] @web.expose
def browse_repositories_i_can_administer(self, trans, **kwd):
if (_redir := self._redirect_if_necessary(trans, **kwd)) is not None:
return _redir
selected_changeset_revision, repository = suc.get_repository_from_refresh_on_change(trans.app, **kwd)
if repository:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="browse_repositories",
operation="view_or_manage_repository",
id=trans.security.encode_id(repository.id),
changeset_revision=selected_changeset_revision,
)
)
return self.repositories_i_can_administer_grid(trans, **kwd)
[docs] @web.expose
def browse_repositories_i_own(self, trans, **kwd):
if (_redir := self._redirect_if_necessary(trans, **kwd)) is not None:
return _redir
selected_changeset_revision, repository = suc.get_repository_from_refresh_on_change(trans.app, **kwd)
if repository:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="browse_repositories",
operation="view_or_manage_repository",
id=trans.security.encode_id(repository.id),
changeset_revision=selected_changeset_revision,
)
)
return self.repositories_i_own_grid(trans, **kwd)
[docs] @web.expose
def browse_repositories_in_category(self, trans, **kwd):
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation == "view_or_manage_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_or_manage_repository", **kwd)
)
if operation == "repositories_by_user":
user_id = kwd.get("user_id", None)
if user_id is None:
# The received id is the repository id, so we need to get the id of the user that owns the repository.
repository_id = kwd.get("id", None)
if repository_id:
repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
user_id = trans.security.encode_id(repository.user.id)
user = suc.get_user(trans.app, user_id)
self.repositories_by_user_grid.title = f"Repositories owned by {user.username}"
kwd["user_id"] = user_id
return self.repositories_by_user_grid(trans, **kwd)
selected_changeset_revision, repository = suc.get_repository_from_refresh_on_change(trans.app, **kwd)
if repository:
# The user selected a repository revision which results in a refresh_on_change.
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="view_or_manage_repository",
id=trans.security.encode_id(repository.id),
changeset_revision=selected_changeset_revision,
)
)
if category_id := kwd.get("id", None):
category = suc.get_category(trans.app, category_id)
if category:
trailing_string = f"in Category {str(category.name)}"
else:
trailing_string = "in Category"
else:
trailing_string = "in Category"
title = trans.app.repository_grid_filter_manager.get_grid_title(
trans, trailing_string=trailing_string, default="Repositories"
)
self.repositories_in_category_grid.title = title
return self.repositories_in_category_grid(trans, **kwd)
[docs] @web.expose
def browse_repository(self, trans, id, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
commit_message = escape(kwd.get("commit_message", "Deleted selected files"))
repository = repository_util.get_repository_in_tool_shed(trans.app, id)
repo_path = repository.repo_path(trans.app)
# Update repository files for browsing.
hg_util.update_repository(repo_path)
changeset_revision = repository.tip()
metadata = metadata_util.get_repository_metadata_by_repository_id_changeset_revision(
trans.app, id, changeset_revision, metadata_only=True
)
repository_type_select_field = rt_util.build_repository_type_select_field(trans, repository=repository)
return trans.fill_template(
"/webapps/tool_shed/repository/browse_repository.mako",
repository=repository,
changeset_revision=changeset_revision,
metadata=metadata,
commit_message=commit_message,
repository_type_select_field=repository_type_select_field,
message=message,
status=status,
)
[docs] @web.expose
def browse_repository_dependencies(self, trans, **kwd):
if "operation" in kwd:
operation = kwd["operation"].lower()
# The received id is a RepositoryMetadata id.
repository_metadata_id = kwd["id"]
repository_metadata = metadata_util.get_repository_metadata_by_id(trans.app, repository_metadata_id)
repository_id = trans.security.encode_id(repository_metadata.repository_id)
changeset_revision = repository_metadata.changeset_revision
new_kwd = dict(id=repository_id, changeset_revision=changeset_revision)
if operation == "browse_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="browse_repository", **new_kwd)
)
if operation == "view_or_manage_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_or_manage_repository", **new_kwd)
)
return self.repository_dependencies_grid(trans, **kwd)
[docs] @web.expose
def browse_valid_categories(self, trans, **kwd):
"""Filter repositories per category by those that are valid for installing into Galaxy."""
# The request came from Galaxy, so restrict category links to display only valid repository changeset revisions.
if galaxy_url := common_util.handle_galaxy_url(trans, **kwd):
kwd["galaxy_url"] = galaxy_url
if "f-free-text-search" in kwd:
if kwd["f-free-text-search"] == "All":
# The user performed a search, then clicked the "x" to eliminate the search criteria.
new_kwd = {}
return self.valid_category_grid(trans, **new_kwd)
# Since we are searching valid repositories and not categories, redirect to browse_valid_repositories().
if "id" in kwd and "f-free-text-search" in kwd and kwd["id"] == kwd["f-free-text-search"]:
# The value of 'id' has been set to the search string, which is a repository name.
# We'll try to get the desired encoded repository id to pass on.
try:
name = kwd["id"]
repository = repository_util.get_repository_by_name(trans.app, name)
kwd["id"] = trans.security.encode_id(repository.id)
except Exception:
pass
return self.browse_valid_repositories(trans, **kwd)
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation in ["valid_repositories_by_category", "valid_repositories_by_user"]:
# Eliminate the current filters if any exist.
for k in list(kwd.keys()):
if k.startswith("f-"):
del kwd[k]
return trans.response.send_redirect(
web.url_for(controller="repository", action="browse_valid_repositories", **kwd)
)
title = trans.app.repository_grid_filter_manager.get_grid_title(
trans, trailing_string="by Category", default="Categories of Valid Repositories"
)
self.valid_category_grid.title = title
return self.valid_category_grid(trans, **kwd)
[docs] @web.expose
def browse_valid_repositories(self, trans, **kwd):
"""Filter repositories to those that are installable into Galaxy."""
if galaxy_url := common_util.handle_galaxy_url(trans, **kwd):
kwd["galaxy_url"] = galaxy_url
repository_id = kwd.get("id", None)
if "f-free-text-search" in kwd:
if "f-Category.name" in kwd:
# The user browsed to a category and then entered a search string, so get the category associated with its value.
category_name = kwd["f-Category.name"]
category = suc.get_category_by_name(trans.app, category_name)
# Set the id value in kwd since it is required by the ValidRepositoryGrid.build_initial_query method.
kwd["id"] = trans.security.encode_id(category.id)
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation == "preview_tools_in_changeset":
repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
repository_metadata = metadata_util.get_latest_repository_metadata(
trans.app, repository.id, downloadable=True
)
latest_installable_changeset_revision = repository_metadata.changeset_revision
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="preview_tools_in_changeset",
repository_id=repository_id,
changeset_revision=latest_installable_changeset_revision,
)
)
elif operation == "valid_repositories_by_category":
# Eliminate the current filters if any exist.
for k in list(kwd.keys()):
if k.startswith("f-"):
del kwd[k]
category_id = kwd.get("id", None)
category = suc.get_category(trans.app, category_id)
kwd["f-Category.name"] = category.name
selected_changeset_revision, repository = suc.get_repository_from_refresh_on_change(trans.app, **kwd)
if repository:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="preview_tools_in_changeset",
repository_id=trans.security.encode_id(repository.id),
changeset_revision=selected_changeset_revision,
)
)
url_args = dict(
action="browse_valid_repositories", operation="preview_tools_in_changeset", repository_id=repository_id
)
self.valid_repository_grid.operations = [
grids.GridOperation("Preview and install", url_args=url_args, allow_multiple=False, async_compatible=False)
]
title = trans.app.repository_grid_filter_manager.get_grid_title(
trans, trailing_string="", default="Valid Repositories"
)
self.valid_repository_grid.title = title
return self.valid_repository_grid(trans, **kwd)
[docs] @web.expose
def create_galaxy_docker_image(self, trans, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
repository_ids = util.listify(kwd.get("id", ""))
if "operation" in kwd:
if repository_ids:
operation = kwd["operation"].lower()
if operation == "include in docker image":
repository_tups = []
for repository_id in repository_ids:
repository = repository_util.get_repository_by_id(trans.app, repository_id)
repository_tups.append(
(str(repository.name), str(repository.user.username), str(repository.type))
)
return trans.fill_template(
"/webapps/tool_shed/repository/docker_image_repositories.mako",
id=",".join(repository_ids),
repository_tups=repository_tups,
message=message,
status=status,
)
else:
# This can only occur when there is a multi-select grid with check boxes and an operation,
# and the user clicked the operation button without checking any of the check boxes.
kwd["message"] = "No items were selected."
kwd["status"] = "error"
elif kwd.get("create_docker_image_button", False):
tmp_image_dir = tempfile.mkdtemp(prefix="tmp-toolshed-cdidir")
docker_file_name = "Dockerfile"
docker_file_path = os.path.join(tmp_image_dir, docker_file_name)
tool_shed_url = tool_shed_url = web.url_for("/", qualified=True)
repository_string = ""
for repository_id in repository_ids:
repository = repository_util.get_repository_by_id(trans.app, repository_id)
template = basic_util.SELECTED_REPOSITORIES_TEMPLATE
repository_template = string.Template(template).safe_substitute(
tool_shed_url=tool_shed_url,
repository_owner=str(repository.user.username),
repository_name=str(repository.name),
)
repository_string = f"{repository_string}\n{repository_template}"
template = basic_util.DOCKER_IMAGE_TEMPLATE
docker_image_template = string.Template(template).safe_substitute(selected_repositories=repository_string)
docker_image_string = docker_image_template
trans.response.set_content_type("application/text/plain")
trans.response.headers["Content-Disposition"] = f'attachment; filename="{docker_file_name}"'
opened_file = open(docker_file_path, "w")
opened_file.write(docker_image_string)
opened_file.close()
opened_file = open(docker_file_path)
# Make sure the file is removed from disk after the contents have been downloaded.
os.unlink(docker_file_path)
docker_file_path, docker_file_name = os.path.split(docker_file_path)
basic_util.remove_dir(docker_file_path)
return opened_file
return self.docker_image_grid(trans, **kwd)
[docs] @web.expose
def create_repository(self, trans, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
categories = suc.get_categories(trans)
if not categories:
message = "No categories have been configured in this instance of the Galaxy Tool Shed. "
message += "An administrator needs to create some via the Administrator control panel before creating repositories."
status = "error"
return trans.response.send_redirect(
web.url_for(controller="repository", action="browse_repositories", message=message, status=status)
)
name = kwd.get("name", "").strip()
remote_repository_url = kwd.get("remote_repository_url", "")
homepage_url = kwd.get("homepage_url", "")
description = kwd.get("description", "")
long_description = kwd.get("long_description", "")
category_ids = util.listify(kwd.get("category_id", ""))
selected_categories = [trans.security.decode_id(id) for id in category_ids]
repository_type = kwd.get("repository_type", rt_util.UNRESTRICTED)
if kwd.get("create_repository_button", False):
error = False
message = repository_util.validate_repository_name(trans.app, name, trans.user)
if message:
error = True
if not description:
message = "Enter a description."
error = True
if error:
status = "error"
else:
repository, message = repository_util.create_repository(
trans.app,
name,
repository_type,
description,
long_description,
user=trans.user,
category_ids=category_ids,
remote_repository_url=remote_repository_url,
homepage_url=homepage_url,
)
trans.response.send_redirect(
web.url_for(
controller="repository",
action="manage_repository",
message=message,
id=trans.security.encode_id(repository.id),
)
)
repository_type_select_field = rt_util.build_repository_type_select_field(trans)
return trans.fill_template(
"/webapps/tool_shed/repository/create_repository.mako",
name=name,
remote_repository_url=remote_repository_url,
homepage_url=homepage_url,
description=description,
long_description=long_description,
selected_categories=selected_categories,
categories=categories,
repository_type_select_field=repository_type_select_field,
message=message,
status=status,
)
[docs] @web.expose
@require_login("deprecate repository")
def deprecate(self, trans, **kwd):
"""Mark a repository in the tool shed as deprecated or not deprecated."""
# Marking a repository in the tool shed as deprecated has no effect on any downloadable changeset
# revisions that may be associated with the repository. Revisions are not marked as not downlaodable
# because those that have installed the repository must be allowed to get updates.
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
repository_id = kwd.get("id", None)
repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
mark_deprecated = util.string_as_bool(kwd.get("mark_deprecated", False))
repository.deprecated = mark_deprecated
trans.sa_session.add(repository)
with transaction(trans.sa_session):
trans.sa_session.commit()
if mark_deprecated:
# Update the repository registry.
trans.app.repository_registry.remove_entry(repository)
message = f"The repository <b>{escape(repository.name)}</b> has been marked as deprecated."
else:
# Update the repository registry.
trans.app.repository_registry.add_entry(repository)
message = f"The repository <b>{escape(repository.name)}</b> has been marked as not deprecated."
trans.response.send_redirect(
web.url_for(
controller="repository",
action="browse_repositories",
operation="repositories_i_own",
message=message,
status=status,
)
)
[docs] @web.expose
def display_image_in_repository(self, trans, **kwd):
"""
Open an image file that is contained in repository or that is referenced by a URL for display. The image can be defined in
either a README.rst file contained in the repository or the help section of a Galaxy tool config that is contained in the repository.
The following image definitions are all supported. The former $PATH_TO_IMAGES is no longer required, and is now ignored.
.. image:: https://raw.github.com/galaxy/some_image.png
.. image:: $PATH_TO_IMAGES/some_image.png
.. image:: /static/images/some_image.gif
.. image:: some_image.jpg
.. image:: /deep/some_image.png
"""
repository_id = kwd.get("repository_id", None)
relative_path_to_image_file = kwd.get("image_file", None)
if repository_id and relative_path_to_image_file:
repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
if repository:
repo_files_dir = repository.repo_path(trans.app)
path_to_file = repository_util.get_absolute_path_to_file_in_repository(
repo_files_dir, relative_path_to_image_file
)
if os.path.exists(path_to_file):
file_name = os.path.basename(relative_path_to_image_file)
try:
extension = file_name.split(".")[-1]
except Exception:
extension = None
if extension:
mimetype = trans.app.datatypes_registry.get_mimetype_by_extension(extension)
if mimetype:
trans.response.set_content_type(mimetype)
return open(path_to_file, "rb")
return None
[docs] @web.expose
def download(self, trans, repository_id, changeset_revision, file_type, **kwd):
"""Download an archive of the repository files compressed as zip, gz or bz2."""
# FIXME: this will currently only download the repository tip, no matter which installable changeset_revision is being viewed.
# This should be enhanced to use the export method below, which accounts for the currently viewed changeset_revision.
repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
# Allow hgweb to handle the download. This requires the tool shed
# server account's .hgrc file to include the following setting:
# [web]
# allow_archive = bz2, gz, zip
file_type_str = basic_util.get_file_type_str(changeset_revision, file_type)
repository.times_downloaded += 1
trans.sa_session.add(repository)
with transaction(trans.sa_session):
trans.sa_session.commit()
tool_shed_url = web.url_for("/", qualified=True)
pathspec = ["repos", str(repository.user.username), str(repository.name), "archive", file_type_str]
download_url = util.build_url(tool_shed_url, pathspec=pathspec)
return trans.response.send_redirect(download_url)
[docs] @web.expose
def export_via_api(self, trans, **kwd):
"""Return an exported gzip compressed repository archive file opened for reading."""
encoded_repositories_archive_name = kwd.get("encoded_repositories_archive_name", None)
if encoded_repositories_archive_name:
repositories_archive_name = encoding_util.tool_shed_decode(encoded_repositories_archive_name)
opened_archive = open(repositories_archive_name)
# Make sure the file is removed from disk after the contents have been downloaded.
os.unlink(repositories_archive_name)
return opened_archive
return ""
[docs] @web.expose
def get_changeset_revision_and_ctx_rev(self, trans, **kwd):
"""Handle a request from a local Galaxy instance to retrieve the changeset revision hash to which an installed repository can be updated."""
def has_galaxy_utilities(repository_metadata):
has_galaxy_utilities_dict = dict(
includes_data_managers=False,
includes_datatypes=False,
includes_tools=False,
includes_tools_for_display_in_tool_panel=False,
has_repository_dependencies=False,
has_repository_dependencies_only_if_compiling_contained_td=False,
includes_tool_dependencies=False,
includes_workflows=False,
)
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
if "data_manager" in metadata:
has_galaxy_utilities_dict["includes_data_managers"] = True
if "datatypes" in metadata:
has_galaxy_utilities_dict["includes_datatypes"] = True
if "tools" in metadata:
has_galaxy_utilities_dict["includes_tools"] = True
if "tool_dependencies" in metadata:
has_galaxy_utilities_dict["includes_tool_dependencies"] = True
repository_dependencies_dict = metadata.get("repository_dependencies", {})
repository_dependencies = repository_dependencies_dict.get("repository_dependencies", [])
(
has_repository_dependencies,
has_repository_dependencies_only_if_compiling_contained_td,
) = repository_util.get_repository_dependency_types(repository_dependencies)
has_galaxy_utilities_dict["has_repository_dependencies"] = has_repository_dependencies
has_galaxy_utilities_dict["has_repository_dependencies_only_if_compiling_contained_td"] = (
has_repository_dependencies_only_if_compiling_contained_td
)
if "workflows" in metadata:
has_galaxy_utilities_dict["includes_workflows"] = True
return has_galaxy_utilities_dict
name = kwd.get("name", None)
owner = kwd.get("owner", None)
changeset_revision = kwd.get("changeset_revision", None)
repository = repository_util.get_repository_by_name_and_owner(trans.app, name, owner)
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
trans.app, trans.security.encode_id(repository.id), changeset_revision
)
has_galaxy_utilities_dict = has_galaxy_utilities(repository_metadata)
includes_data_managers = has_galaxy_utilities_dict["includes_data_managers"]
includes_datatypes = has_galaxy_utilities_dict["includes_datatypes"]
includes_tools = has_galaxy_utilities_dict["includes_tools"]
includes_tools_for_display_in_tool_panel = has_galaxy_utilities_dict["includes_tools_for_display_in_tool_panel"]
includes_tool_dependencies = has_galaxy_utilities_dict["includes_tool_dependencies"]
has_repository_dependencies = has_galaxy_utilities_dict["has_repository_dependencies"]
has_repository_dependencies_only_if_compiling_contained_td = has_galaxy_utilities_dict[
"has_repository_dependencies_only_if_compiling_contained_td"
]
includes_workflows = has_galaxy_utilities_dict["includes_workflows"]
repo = repository.hg_repo
# Default to the received changeset revision and ctx_rev.
update_to_ctx = hg_util.get_changectx_for_changeset(repo, changeset_revision)
ctx_rev = str(update_to_ctx.rev())
latest_changeset_revision = changeset_revision
update_dict = dict(
changeset_revision=changeset_revision,
ctx_rev=ctx_rev,
includes_data_managers=includes_data_managers,
includes_datatypes=includes_datatypes,
includes_tools=includes_tools,
includes_tools_for_display_in_tool_panel=includes_tools_for_display_in_tool_panel,
includes_tool_dependencies=includes_tool_dependencies,
has_repository_dependencies=has_repository_dependencies,
has_repository_dependencies_only_if_compiling_contained_td=has_repository_dependencies_only_if_compiling_contained_td,
includes_workflows=includes_workflows,
)
if changeset_revision == repository.tip():
# If changeset_revision is the repository tip, there are no additional updates.
return encoding_util.tool_shed_encode(update_dict)
else:
if repository_metadata:
# If changeset_revision is in the repository_metadata table for this repository, there are no additional updates.
return encoding_util.tool_shed_encode(update_dict)
else:
# The changeset_revision column in the repository_metadata table has been updated with a new changeset_revision value since the
# repository was installed. We need to find the changeset_revision to which we need to update.
update_to_changeset_hash = None
for changeset in repo.changelog:
includes_tools = False
has_repository_dependencies = False
has_repository_dependencies_only_if_compiling_contained_td = False
changeset_hash = str(repo[changeset])
if update_to_changeset_hash:
update_to_repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
trans.app, trans.security.encode_id(repository.id), changeset_hash
)
if update_to_repository_metadata:
has_galaxy_utilities_dict = has_galaxy_utilities(repository_metadata)
includes_data_managers = has_galaxy_utilities_dict["includes_data_managers"]
includes_datatypes = has_galaxy_utilities_dict["includes_datatypes"]
includes_tools = has_galaxy_utilities_dict["includes_tools"]
includes_tools_for_display_in_tool_panel = has_galaxy_utilities_dict[
"includes_tools_for_display_in_tool_panel"
]
includes_tool_dependencies = has_galaxy_utilities_dict["includes_tool_dependencies"]
has_repository_dependencies = has_galaxy_utilities_dict["has_repository_dependencies"]
has_repository_dependencies_only_if_compiling_contained_td = has_galaxy_utilities_dict[
"has_repository_dependencies_only_if_compiling_contained_td"
]
includes_workflows = has_galaxy_utilities_dict["includes_workflows"]
# We found a RepositoryMetadata record.
if changeset_hash == repository.tip():
# The current ctx is the repository tip, so use it.
update_to_ctx = hg_util.get_changectx_for_changeset(repo, changeset_hash)
latest_changeset_revision = changeset_hash
else:
update_to_ctx = hg_util.get_changectx_for_changeset(repo, update_to_changeset_hash)
latest_changeset_revision = update_to_changeset_hash
break
elif not update_to_changeset_hash and changeset_hash == changeset_revision:
# We've found the changeset in the changelog for which we need to get the next update.
update_to_changeset_hash = changeset_hash
update_dict["includes_data_managers"] = includes_data_managers
update_dict["includes_datatypes"] = includes_datatypes
update_dict["includes_tools"] = includes_tools
update_dict["includes_tools_for_display_in_tool_panel"] = includes_tools_for_display_in_tool_panel
update_dict["includes_tool_dependencies"] = includes_tool_dependencies
update_dict["includes_workflows"] = includes_workflows
update_dict["has_repository_dependencies"] = has_repository_dependencies
update_dict["has_repository_dependencies_only_if_compiling_contained_td"] = (
has_repository_dependencies_only_if_compiling_contained_td
)
update_dict["changeset_revision"] = str(latest_changeset_revision)
update_dict["ctx_rev"] = str(update_to_ctx.rev())
return encoding_util.tool_shed_encode(update_dict)
[docs] @web.expose
def get_ctx_rev(self, trans, **kwd):
"""Given a repository and changeset_revision, return the correct ctx.rev() value."""
repository_name = kwd["name"]
repository_owner = kwd["owner"]
changeset_revision = kwd["changeset_revision"]
repository = repository_util.get_repository_by_name_and_owner(trans.app, repository_name, repository_owner)
repo = repository.hg_repo
if ctx := hg_util.get_changectx_for_changeset(repo, changeset_revision):
return str(ctx.rev())
return ""
[docs] @web.json
@web.do_not_cache
def get_file_contents(self, trans, file_path, repository_id):
is_admin = trans.user_is_admin
return suc.get_repository_file_contents(trans.app, file_path, repository_id, is_admin)
[docs] @web.json
def get_latest_downloadable_changeset_revision(self, trans, **kwd):
"""
Return the latest installable changeset revision for the repository associated with the received
name and owner. This method is called from Galaxy when attempting to install the latest revision
of an installed repository.
"""
repository_name = kwd.get("name", None)
repository_owner = kwd.get("owner", None)
if repository_name is not None and repository_owner is not None:
repository = repository_util.get_repository_by_name_and_owner(trans.app, repository_name, repository_owner)
if repository:
return metadata_util.get_latest_downloadable_changeset_revision(trans.app, repository)
return hg_util.INITIAL_CHANGELOG_HASH
[docs] @web.json
def get_readme_files(self, trans, **kwd):
"""
This method is called when installing or re-installing a single repository into a Galaxy instance.
If the received changeset_revision includes one or more readme files, return them in a dictionary.
"""
repository_name = kwd.get("name", None)
repository_owner = kwd.get("owner", None)
changeset_revision = kwd.get("changeset_revision", None)
if repository_name is not None and repository_owner is not None and changeset_revision is not None:
repository = repository_util.get_repository_by_name_and_owner(trans.app, repository_name, repository_owner)
return readmes(trans.app, repository, changeset_revision)
return {}
[docs] @web.json
def get_repository_dependencies(self, trans, **kwd):
"""
Return an encoded dictionary of all repositories upon which the contents of the received repository
depends.
"""
name = kwd.get("name", None)
owner = kwd.get("owner", None)
changeset_revision = kwd.get("changeset_revision", None)
repository = repository_util.get_repository_by_name_and_owner(trans.app, name, owner)
# get_repository_dependencies( self, app, changeset, toolshed_url )
dependencies = repository.get_repository_dependencies(
trans.app, changeset_revision, web.url_for("/", qualified=True)
)
if dependencies:
return encoding_util.tool_shed_encode(dependencies)
return ""
[docs] @web.expose
def get_repository_id(self, trans, **kwd):
"""Given a repository name and owner, return the encoded repository id."""
repository_name = kwd["name"]
repository_owner = kwd["owner"]
repository = repository_util.get_repository_by_name_and_owner(trans.app, repository_name, repository_owner)
if repository:
return trans.security.encode_id(repository.id)
return ""
[docs] @web.expose
def get_repository_type(self, trans, **kwd):
"""Given a repository name and owner, return the type."""
repository_name = kwd["name"]
repository_owner = kwd["owner"]
repository = repository_util.get_repository_by_name_and_owner(trans.app, repository_name, repository_owner)
return str(repository.type)
[docs] @web.json
def get_required_repo_info_dict(self, trans, encoded_str=None):
"""
Retrieve and return a dictionary that includes a list of dictionaries that each contain all of the
information needed to install the list of repositories defined by the received encoded_str.
"""
repo_info_dict = {}
if encoded_str:
encoded_required_repository_str = encoding_util.tool_shed_decode(encoded_str)
encoded_required_repository_tups = encoded_required_repository_str.split(encoding_util.encoding_sep2)
decoded_required_repository_tups = []
for encoded_required_repository_tup in encoded_required_repository_tups:
decoded_required_repository_tups.append(
encoded_required_repository_tup.split(encoding_util.encoding_sep)
)
encoded_repository_ids = []
changeset_revisions = []
for required_repository_tup in decoded_required_repository_tups:
(
tool_shed,
name,
owner,
changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
) = common_util.parse_repository_dependency_tuple(required_repository_tup)
repository = repository_util.get_repository_by_name_and_owner(trans.app, name, owner)
encoded_repository_ids.append(trans.security.encode_id(repository.id))
changeset_revisions.append(changeset_revision)
if encoded_repository_ids and changeset_revisions:
repo_info_dict = json.loads(
self.get_repository_information(trans, encoded_repository_ids, changeset_revisions)
)
return repo_info_dict
[docs] @web.expose
def get_tool_dependencies_config_contents(self, trans, **kwd):
"""
Handle a request from a Galaxy instance to get the tool_dependencies.xml file contents for a
specified changeset revision.
"""
name = kwd.get("name", None)
owner = kwd.get("owner", None)
repository = repository_util.get_repository_by_name_and_owner(trans.app, name, owner)
# TODO: We're currently returning the tool_dependencies.xml file that is available on disk. We need
# to enhance this process to retrieve older versions of the tool-dependencies.xml file from the repository
# manafest.
repo_dir = repository.repo_path(trans.app)
# Get the tool_dependencies.xml file from disk.
tool_dependencies_config = hg_util.get_config_from_disk(rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME, repo_dir)
# Return the encoded contents of the tool_dependencies.xml file.
if tool_dependencies_config:
tool_dependencies_config_file = open(tool_dependencies_config, "rb")
contents = tool_dependencies_config_file.read()
tool_dependencies_config_file.close()
return contents
return ""
[docs] @web.expose
def help(self, trans, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
return trans.fill_template("/webapps/tool_shed/repository/help.mako", message=message, status=status, **kwd)
[docs] @web.expose
def index(self, trans, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
# See if there are any RepositoryMetadata records since menu items require them.
repository_metadata = get_first_repository_metadata(trans.sa_session)
# TODO: move the following to some in-memory register so these queries can be done once
# at startup. The in-memory register can then be managed during the current session.
can_administer_repositories = False
has_deprecated_repositories = False
if current_user := trans.user:
# See if the current user has any repositories that have been marked as deprecated.
for repository in current_user.active_repositories:
if repository.deprecated:
has_deprecated_repositories = True
break
# See if the current user can administer any repositories, but only if not an admin user.
if not trans.user_is_admin:
if current_user.active_repositories:
can_administer_repositories = True
else:
for repository in get_current_repositories(trans.sa_session):
if trans.app.security_agent.user_can_administer_repository(current_user, repository):
can_administer_repositories = True
break
# Route in may have been from a sharable URL, in whcih case we'll have a user_id and possibly a name
# The received user_id will be the id of the repository owner.
user_id = kwd.get("user_id", None)
repository_id = kwd.get("repository_id", None)
changeset_revision = kwd.get("changeset_revision", None)
self.validate_changeset_revision(trans, changeset_revision, repository_id)
return trans.fill_template(
"/webapps/tool_shed/index.mako",
repository_metadata=repository_metadata,
can_administer_repositories=can_administer_repositories,
has_deprecated_repositories=has_deprecated_repositories,
user_id=user_id,
repository_id=repository_id,
changeset_revision=changeset_revision,
message=message,
status=status,
)
[docs] @web.expose
@require_login("manage email alerts")
def manage_email_alerts(self, trans, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
new_repo_alert = kwd.get("new_repo_alert", "")
new_repo_alert_checked = CheckboxField.is_checked(new_repo_alert)
user = trans.user
if kwd.get("new_repo_alert_button", False):
user.new_repo_alert = new_repo_alert_checked
trans.sa_session.add(user)
with transaction(trans.sa_session):
trans.sa_session.commit()
if new_repo_alert_checked:
message = "You will receive email alerts for all new valid tool shed repositories."
else:
message = "You will not receive any email alerts for new valid tool shed repositories."
checked = new_repo_alert_checked or (user and user.new_repo_alert)
new_repo_alert_check_box = CheckboxField("new_repo_alert", value=checked)
email_alert_repositories = []
for repository in get_current_email_alert_repositories(trans.sa_session):
if user.email in repository.email_alerts:
email_alert_repositories.append(repository)
return trans.fill_template(
"/webapps/tool_shed/user/manage_email_alerts.mako",
new_repo_alert_check_box=new_repo_alert_check_box,
email_alert_repositories=email_alert_repositories,
message=message,
status=status,
)
[docs] @web.expose
@require_login("manage repository")
def manage_repository(self, trans, id, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
repository = repository_util.get_repository_in_tool_shed(trans.app, id)
repository_type = kwd.get("repository_type", str(repository.type))
repo = repository.hg_repo
repo_name = kwd.get("repo_name", repository.name)
changeset_revision = kwd.get("changeset_revision", repository.tip())
repository.share_url = repository_util.generate_sharable_link_for_repository_in_tool_shed(
repository, changeset_revision=changeset_revision
)
repository.clone_url = common_util.generate_clone_url_for_repository_in_tool_shed(trans.user, repository)
remote_repository_url = kwd.get("remote_repository_url", repository.remote_repository_url)
homepage_url = kwd.get("homepage_url", repository.homepage_url)
description = kwd.get("description", repository.description)
long_description = kwd.get("long_description", repository.long_description)
avg_rating, num_ratings = self.get_ave_item_rating_data(trans.sa_session, repository, webapp_model=trans.model)
alerts = kwd.get("alerts", "")
alerts_checked = CheckboxField.is_checked(alerts)
category_ids = util.listify(kwd.get("category_id", ""))
if repository.email_alerts:
email_alerts = json.loads(repository.email_alerts)
else:
email_alerts = []
allow_push = kwd.get("allow_push", "")
error = False
user = trans.user
if kwd.get("edit_repository_button", False):
update_kwds = dict(
name=repo_name,
description=description,
long_description=long_description,
remote_repository_url=remote_repository_url,
homepage_url=homepage_url,
type=repository_type,
)
repository, message = repository_util.update_repository(trans=trans, id=id, **update_kwds)
if repository is None:
return trans.response.send_redirect(
web.url_for(
controller="repository", action="view_repository", id=id, message=message, status="error"
)
)
elif kwd.get("manage_categories_button", False):
flush_needed = False
# Delete all currently existing categories.
for rca in repository.categories:
trans.sa_session.delete(rca)
with transaction(trans.sa_session):
trans.sa_session.commit()
if category_ids:
# Create category associations
for category_id in category_ids:
category = trans.sa_session.get(Category, trans.security.decode_id(category_id))
rca = RepositoryCategoryAssociation(repository, category)
trans.sa_session.add(rca)
with transaction(trans.sa_session):
trans.sa_session.commit()
message = "The repository information has been updated."
elif kwd.get("user_access_button", False):
if allow_push not in ["none"]:
remove_auth = kwd.get("remove_auth", "")
if remove_auth:
usernames = ""
else:
user_ids = util.listify(allow_push)
usernames = []
for user_id in user_ids:
user = trans.sa_session.get(trans.model.User, trans.security.decode_id(user_id))
usernames.append(user.username)
usernames = ",".join(usernames)
repository.set_allow_push(usernames, remove_auth=remove_auth)
message = "The repository information has been updated."
elif kwd.get("receive_email_alerts_button", False):
flush_needed = False
if alerts_checked:
if user.email not in email_alerts:
email_alerts.append(user.email)
repository.email_alerts = json.dumps(email_alerts)
flush_needed = True
else:
if user.email in email_alerts:
email_alerts.remove(user.email)
repository.email_alerts = json.dumps(email_alerts)
flush_needed = True
if flush_needed:
trans.sa_session.add(repository)
with transaction(trans.sa_session):
trans.sa_session.commit()
message = "The repository information has been updated."
if error:
status = "error"
allow_push_select_field = SelectField(name="allow_push", multiple=True)
if current_allow_push := repository.allow_push():
current_allow_push_list = current_allow_push.split(",")
else:
current_allow_push_list = []
options = []
for user in trans.sa_session.scalars(select(trans.model.User)):
if user.username not in current_allow_push_list:
options.append(user)
for obj in options:
label = obj.username
allow_push_select_field.add_option(label, trans.security.encode_id(obj.id))
checked = alerts_checked or user.email in email_alerts
alerts_check_box = CheckboxField("alerts", value=checked)
changeset_revision_select_field = grids_util.build_changeset_revision_select_field(
trans, repository, selected_value=changeset_revision, add_id_to_name=False, downloadable=False
)
revision_label = hg_util.get_revision_label(trans.app, repository, repository.tip(), include_date=False)
repository_metadata = None
metadata = None
is_malicious = False
repository_dependencies = None
if changeset_revision != hg_util.INITIAL_CHANGELOG_HASH:
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
trans.app, id, changeset_revision
)
if repository_metadata:
revision_label = hg_util.get_revision_label(
trans.app, repository, changeset_revision, include_date=False
)
metadata = repository_metadata.metadata
is_malicious = repository_metadata.malicious
else:
# There is no repository_metadata defined for the changeset_revision, so see if it was defined in a previous
# changeset in the changelog.
previous_changeset_revision = metadata_util.get_previous_metadata_changeset_revision(
trans.app, repository, changeset_revision, downloadable=False
)
if previous_changeset_revision != hg_util.INITIAL_CHANGELOG_HASH:
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
trans.app, id, previous_changeset_revision
)
if repository_metadata:
revision_label = hg_util.get_revision_label(
trans.app, repository, previous_changeset_revision, include_date=False
)
metadata = repository_metadata.metadata
is_malicious = repository_metadata.malicious
changeset_revision = previous_changeset_revision
if repository_metadata:
metadata = repository_metadata.metadata
# Get a dictionary of all repositories upon which the contents of the current repository_metadata record depend.
toolshed_base_url = str(web.url_for("/", qualified=True)).rstrip("/")
rb = relation_builder.RelationBuilder(trans.app, repository, repository_metadata, toolshed_base_url)
repository_dependencies = rb.get_repository_dependencies_for_changeset_revision()
if str(repository.type) != rt_util.REPOSITORY_SUITE_DEFINITION:
# Handle messaging for resetting repository type to the optimal value.
change_repository_type_message = rt_util.generate_message_for_repository_type_change(
trans.app, repository
)
if change_repository_type_message:
message += change_repository_type_message
status = "warning"
elif str(repository.type) != rt_util.TOOL_DEPENDENCY_DEFINITION:
# Handle messaging for resetting repository type to the optimal value.
change_repository_type_message = rt_util.generate_message_for_repository_type_change(
trans.app, repository
)
if change_repository_type_message:
message += change_repository_type_message
status = "warning"
else:
# Handle messaging for orphan tool dependency definitions.
dd = dependency_display.DependencyDisplayer(trans.app)
orphan_message = dd.generate_message_for_orphan_tool_dependencies(repository, metadata)
if orphan_message:
message += orphan_message
status = "warning"
if is_malicious:
if trans.app.security_agent.can_push(trans.app, trans.user, repository):
message += malicious_error_can_push
else:
message += malicious_error
status = "error"
repository_type_select_field = rt_util.build_repository_type_select_field(trans, repository=repository)
malicious_check_box = CheckboxField("malicious", value=is_malicious)
categories = suc.get_categories(trans.app)
selected_categories = [_rca.category_id for _rca in repository.categories]
tsucm = ToolShedUtilityContainerManager(trans.app)
containers_dict = tsucm.build_repository_containers(
repository, changeset_revision, repository_dependencies, repository_metadata
)
heads = hg_util.get_repository_heads(repo)
deprecated_repository_dependency_tups = metadata_util.get_repository_dependency_tups_from_repository_metadata(
trans.app, repository_metadata, deprecated_only=True
)
return trans.fill_template(
"/webapps/tool_shed/repository/manage_repository.mako",
repo_name=repo_name,
remote_repository_url=remote_repository_url,
homepage_url=homepage_url,
description=description,
long_description=long_description,
current_allow_push_list=current_allow_push_list,
allow_push_select_field=allow_push_select_field,
deprecated_repository_dependency_tups=deprecated_repository_dependency_tups,
repo=repo,
heads=heads,
repository=repository,
containers_dict=containers_dict,
repository_metadata=repository_metadata,
changeset_revision=changeset_revision,
changeset_revision_select_field=changeset_revision_select_field,
revision_label=revision_label,
selected_categories=selected_categories,
categories=categories,
metadata=metadata,
avg_rating=avg_rating,
num_ratings=num_ratings,
alerts_check_box=alerts_check_box,
malicious_check_box=malicious_check_box,
repository_type_select_field=repository_type_select_field,
message=message,
status=status,
)
[docs] @web.expose
@require_login("manage repository administrators")
def manage_repository_admins(self, trans, id, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
repository = repository_util.get_repository_in_tool_shed(trans.app, id)
changeset_revision = kwd.get("changeset_revision", repository.tip())
metadata = None
if changeset_revision != hg_util.INITIAL_CHANGELOG_HASH:
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
trans.app, id, changeset_revision
)
if repository_metadata:
metadata = repository_metadata.metadata
else:
# There is no repository_metadata defined for the changeset_revision, so see if it was defined
# in a previous changeset in the changelog.
previous_changeset_revision = metadata_util.get_previous_metadata_changeset_revision(
trans.app, repository, changeset_revision, downloadable=False
)
if previous_changeset_revision != hg_util.INITIAL_CHANGELOG_HASH:
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
trans.app, id, previous_changeset_revision
)
if repository_metadata:
metadata = repository_metadata.metadata
role = repository.admin_role
associations_dict = repository_util.handle_role_associations(trans.app, role, repository, **kwd)
in_users = associations_dict.get("in_users", [])
out_users = associations_dict.get("out_users", [])
in_groups = associations_dict.get("in_groups", [])
out_groups = associations_dict.get("out_groups", [])
message = associations_dict.get("message", "")
status = associations_dict.get("status", "done")
return trans.fill_template(
"/webapps/tool_shed/role/role.mako",
in_admin_controller=False,
repository=repository,
metadata=metadata,
changeset_revision=changeset_revision,
role=role,
in_users=in_users,
out_users=out_users,
in_groups=in_groups,
out_groups=out_groups,
message=message,
status=status,
)
[docs] @web.expose
@require_login("multi select email alerts")
def multi_select_email_alerts(self, trans, **kwd):
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation == "receive email alerts":
if trans.user:
if kwd["id"]:
kwd["caller"] = "multi_select_email_alerts"
return trans.response.send_redirect(
web.url_for(controller="repository", action="set_email_alerts", **kwd)
)
else:
kwd["message"] = "You must be logged in to set email alerts."
kwd["status"] = "error"
del kwd["operation"]
elif operation == "view_or_manage_repository":
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_or_manage_repository", **kwd)
)
self.email_alerts_repository_grid.title = "Set email alerts for repository changes"
return self.email_alerts_repository_grid(trans, **kwd)
[docs] @web.expose
def next_installable_changeset_revision(self, trans, **kwd):
"""
Handle a request from a Galaxy instance where the changeset_revision defined for a repository
in a dependency definition file is older than the changeset_revision associated with the installed
repository.
"""
name = kwd.get("name", None)
owner = kwd.get("owner", None)
changeset_revision = kwd.get("changeset_revision", None)
repository = repository_util.get_repository_by_name_and_owner(trans.app, name, owner)
# Get the next installable changeset_revision beyond the received changeset_revision.
next_changeset_revision = metadata_util.get_next_downloadable_changeset_revision(
trans.app, repository, changeset_revision
)
if next_changeset_revision and next_changeset_revision != changeset_revision:
return next_changeset_revision
return ""
[docs] @web.json
@web.do_not_cache
def open_folder(self, trans, folder_path, repository_id):
is_admin = trans.user_is_admin
return suc.open_repository_files_folder(trans.app, folder_path, repository_id, is_admin)
[docs] @web.expose
def previous_changeset_revisions(self, trans, from_tip=False, **kwd):
"""
Handle a request from a local Galaxy instance. This method will handle two scenarios: (1) the
repository was previously installed using an older changeset_revsion, but later the repository
was updated in the tool shed and the Galaxy admin is trying to install the latest changeset
revision of the same repository instead of updating the one that was previously installed. (2)
the admin is attempting to get updates for an installed repository that has a repository dependency
and both the repository and its dependency have available updates. In this case, the from_tip
parameter will be True because the repository dependency definition may define a changeset hash
for the dependency that is newer than the installed changeset revision of the dependency (this is
due to the behavior of "Tool dependency definition" repositories, whose metadata is always the tip),
so the complete list of changset hashes in the changelog must be returned.
"""
name = kwd.get("name", None)
owner = kwd.get("owner", None)
if name is not None and owner is not None:
repository = repository_util.get_repository_by_name_and_owner(trans.app, name, owner)
from_tip = util.string_as_bool(from_tip)
if from_tip:
changeset_revision = repository.tip()
else:
changeset_revision = kwd.get("changeset_revision", None)
if changeset_revision is not None:
repo = repository.hg_repo
# Get the lower bound changeset revision.
lower_bound_changeset_revision = metadata_util.get_previous_metadata_changeset_revision(
trans.app, repository, changeset_revision, downloadable=True
)
# Build the list of changeset revision hashes.
changeset_hashes = []
for changeset in hg_util.reversed_lower_upper_bounded_changelog(
repo, lower_bound_changeset_revision, changeset_revision
):
changeset_hashes.append(str(repo[changeset]))
if changeset_hashes:
changeset_hashes_str = ",".join(changeset_hashes)
return changeset_hashes_str
return ""
[docs] @web.expose
@require_login("rate repositories")
def rate_repository(self, trans, **kwd):
"""Rate a repository and return updated rating data."""
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
id = kwd.get("id", None)
if not id:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="browse_repositories",
message="Select a repository to rate",
status="error",
)
)
repository = repository_util.get_repository_in_tool_shed(trans.app, id)
changeset_revision = repository.tip()
if repository.user == trans.user:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="browse_repositories",
message="You are not allowed to rate your own repository",
status="error",
)
)
if kwd.get("rate_button", False):
rating = int(kwd.get("rating", "0"))
comment = kwd.get("comment", "")
rating = self.rate_item(trans, trans.user, repository, rating, comment)
avg_rating, num_ratings = self.get_ave_item_rating_data(trans.sa_session, repository, webapp_model=trans.model)
rra = self.get_user_item_rating(trans.sa_session, trans.user, repository, webapp_model=trans.model)
metadata = metadata_util.get_repository_metadata_by_repository_id_changeset_revision(
trans.app, id, changeset_revision, metadata_only=True
)
repository_type_select_field = rt_util.build_repository_type_select_field(trans, repository=repository)
revision_label = hg_util.get_revision_label(trans.app, repository, changeset_revision, include_date=True)
return trans.fill_template(
"/webapps/tool_shed/repository/rate_repository.mako",
repository=repository,
metadata=metadata,
revision_label=revision_label,
avg_rating=avg_rating,
num_ratings=num_ratings,
rra=rra,
repository_type_select_field=repository_type_select_field,
message=message,
status=status,
)
[docs] @web.expose
@require_login("set email alerts")
def set_email_alerts(self, trans, **kwd):
"""Set email alerts for selected repositories."""
# This method is called from multiple grids, so the caller must be passed.
caller = kwd["caller"]
if user := trans.user:
repository_ids = util.listify(kwd.get("id", ""))
total_alerts_added = 0
total_alerts_removed = 0
flush_needed = False
for repository_id in repository_ids:
repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
if repository.email_alerts:
email_alerts = json.loads(repository.email_alerts)
else:
email_alerts = []
if user.email in email_alerts:
email_alerts.remove(user.email)
repository.email_alerts = json.dumps(email_alerts)
trans.sa_session.add(repository)
flush_needed = True
total_alerts_removed += 1
else:
email_alerts.append(user.email)
repository.email_alerts = json.dumps(email_alerts)
trans.sa_session.add(repository)
flush_needed = True
total_alerts_added += 1
if flush_needed:
with transaction(trans.sa_session):
trans.sa_session.commit()
message = "Total alerts added: %d, total alerts removed: %d" % (total_alerts_added, total_alerts_removed)
kwd["message"] = message
kwd["status"] = "done"
del kwd["operation"]
return trans.response.send_redirect(web.url_for(controller="repository", action=caller, **kwd))
[docs] @web.expose
@require_login("set repository as malicious")
def set_malicious(self, trans, id, ctx_str, **kwd):
malicious = kwd.get("malicious", "")
if kwd.get("malicious_button", False):
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(trans.app, id, ctx_str)
malicious_checked = CheckboxField.is_checked(malicious)
repository_metadata.malicious = malicious_checked
trans.sa_session.add(repository_metadata)
with transaction(trans.sa_session):
trans.sa_session.commit()
if malicious_checked:
message = "The repository tip has been defined as malicious."
else:
message = "The repository tip has been defined as <b>not</b> malicious."
status = "done"
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="manage_repository",
id=id,
changeset_revision=ctx_str,
malicious=malicious,
message=message,
status=status,
)
)
[docs] @web.expose
def sharable_owner(self, trans, owner):
"""Support for sharable URL for each repository owner's tools, e.g. http://example.org/view/owner."""
try:
user = get_user_by_username(trans.model.session, owner, trans.model.User)
except Exception:
user = None
if user:
user_id = trans.security.encode_id(user.id)
return trans.response.send_redirect(web.url_for(controller="repository", action="index", user_id=user_id))
else:
return trans.show_error_message(
"The tool shed <b>{}</b> contains no repositories owned by <b>{}</b>.".format(
web.url_for("/", qualified=True).rstrip("/"), owner
)
)
[docs] @web.expose
def sharable_repository(self, trans, owner, name):
"""Support for sharable URL for a specified repository, e.g. http://example.org/view/owner/name."""
try:
repository = repository_util.get_repository_by_name_and_owner(trans.app, name, owner)
except Exception:
repository = None
if repository:
repository_id = trans.security.encode_id(repository.id)
return trans.response.send_redirect(
web.url_for(controller="repository", action="index", repository_id=repository_id)
)
else:
# If the owner is valid, then show all of their repositories.
try:
user = get_user_by_username(trans.model.session, owner, trans.model.User)
except Exception:
user = None
if user:
user_id = trans.security.encode_id(user.id)
message = (
f"This list of repositories owned by <b>{owner}</b>, does not include one named <b>{name}</b>."
)
return trans.response.send_redirect(
web.url_for(
controller="repository", action="index", user_id=user_id, message=message, status="error"
)
)
else:
return trans.show_error_message(
"The tool shed <b>{}</b> contains no repositories named <b>{}</b> with owner <b>{}</b>.".format(
web.url_for("/", qualified=True).rstrip("/"), name, owner
)
)
[docs] @web.expose
def sharable_repository_revision(self, trans, owner, name, changeset_revision):
"""Support for sharable URL for a specified repository revision, e.g. http://example.org/view/owner/name/changeset_revision."""
try:
repository = repository_util.get_repository_by_name_and_owner(trans.app, name, owner)
except Exception:
repository = None
if repository:
repository_id = trans.security.encode_id(repository.id)
repository_metadata = metadata_util.get_repository_metadata_by_repository_id_changeset_revision(
trans.app, repository_id, changeset_revision
)
if not repository_metadata:
# Get updates to the received changeset_revision if any exist.
upper_bound_changeset_revision = metadata_util.get_next_downloadable_changeset_revision(
trans.app, repository, changeset_revision
)
if upper_bound_changeset_revision and upper_bound_changeset_revision != changeset_revision:
changeset_revision = upper_bound_changeset_revision
repository_metadata = metadata_util.get_repository_metadata_by_repository_id_changeset_revision(
trans.app, repository_id, changeset_revision
)
if repository_metadata:
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="index",
repository_id=repository_id,
changeset_revision=changeset_revision,
)
)
else:
message = f"The change log for the repository named <b>{escape(str(name))}</b> owned by <b>{escape(str(owner))}</b> does not include revision <b>{escape(str(changeset_revision))}</b>."
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="index",
repository_id=repository_id,
message=message,
status="error",
)
)
else:
# See if the owner is valid.
return trans.response.send_redirect(
web.url_for(controller="repository", action="sharable_owner", owner=owner)
)
[docs] @web.expose
def updated_changeset_revisions(self, trans, **kwd):
"""
Handle a request from a local Galaxy instance to retrieve the list of changeset revisions to which an
installed repository can be updated. This method will return a string of comma-separated changeset revision
hashes for all available updates to the received changeset revision. Among other things , this method
handles the scenario where an installed tool shed repository's tool_dependency definition file defines a
changeset revision for a complex repository dependency that is outdated. In other words, a defined changeset
revision is older than the current changeset revision for the required repository, making it impossible to
discover the repository without knowledge of revisions to which it could have been updated.
"""
name = kwd.get("name", None)
owner = kwd.get("owner", None)
changeset_revision = kwd.get("changeset_revision", None)
if name and owner and changeset_revision:
return metadata_util.get_updated_changeset_revisions(trans.app, name, owner, changeset_revision)
return ""
[docs] @web.expose
def view_changelog(self, trans, id, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
repository = repository_util.get_repository_in_tool_shed(trans.app, id)
repo = repository.hg_repo
changesets = []
for changeset in repo.changelog:
ctx = repo[changeset]
if metadata_util.get_repository_metadata_by_changeset_revision(trans.app, id, str(ctx)):
has_metadata = True
else:
has_metadata = False
change_dict = {
"ctx": ctx,
"rev": str(ctx.rev()),
"date": date,
"display_date": hg_util.get_readable_ctx_date(ctx),
"description": ctx.description(),
"files": ctx.files(),
"user": ctx.user(),
"parent": ctx.parents()[0],
"has_metadata": has_metadata,
}
# Make sure we'll view latest changeset first.
changesets.insert(0, change_dict)
metadata = metadata_util.get_repository_metadata_by_repository_id_changeset_revision(
trans.app, id, repository.tip(), metadata_only=True
)
return trans.fill_template(
"/webapps/tool_shed/repository/view_changelog.mako",
repository=repository,
metadata=metadata,
changesets=changesets,
message=message,
status=status,
)
[docs] @web.expose
def view_changeset(self, trans, id, ctx_str, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
repository = repository_util.get_repository_in_tool_shed(trans.app, id)
repo = repository.hg_repo
ctx = hg_util.get_changectx_for_changeset(repo, ctx_str)
if ctx is None:
message = f"Repository does not include changeset revision '{str(ctx_str)}'."
status = "error"
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_changelog", id=id, message=message, status=status)
)
ctx_parent = ctx.parents()[0]
if ctx.children():
ctx_child = ctx.children()[0]
else:
ctx_child = None
diffs = []
options_dict = get_mercurial_default_options_dict(b"diff")
# Not quite sure if the following settings make any difference, but with a combination of them and the size check on each
# diff, we don't run out of memory when viewing the changelog of the cisortho2 repository on the test tool shed.
options_dict = {util.unicodify(k): util.unicodify(v) for k, v in options_dict.items()}
options_dict["maxfile"] = basic_util.MAXDIFFSIZE
options_dict["maxtotal"] = basic_util.MAXDIFFSIZE
diffopts = mdiff.diffopts(**options_dict)
for diff in patch.diff(repo, node1=ctx_parent.node(), node2=ctx.node(), opts=diffopts):
if len(diff) > basic_util.MAXDIFFSIZE:
diff = util.shrink_string_by_size(diff, basic_util.MAXDIFFSIZE)
diffs.append(basic_util.to_html_string(diff))
modified, added, removed, deleted, unknown, ignored, clean = repo.status(
node1=ctx_parent.node(), node2=ctx.node()
)
anchors = modified + added + removed + deleted + unknown + ignored + clean
anchors = util.unicodify(anchors)
metadata = metadata_util.get_repository_metadata_by_repository_id_changeset_revision(
trans.app, id, ctx_str, metadata_only=True
)
# For rendering the prev button.
if ctx_parent:
ctx_parent_date = hg_util.get_readable_ctx_date(ctx_parent)
ctx_parent_rev = ctx_parent.rev()
if ctx_parent_rev < 0:
prev = None
else:
prev = f"<b>{ctx_parent_rev}:{ctx_parent}</b> <i>({ctx_parent_date})</i>"
else:
prev = None
if ctx_child:
ctx_child_date = hg_util.get_readable_ctx_date(ctx_child)
ctx_child_rev = ctx_child.rev()
next = f"<b>{ctx_child_rev}:{ctx_child}</b> <i>({ctx_child_date})</i>"
else:
next = None
return trans.fill_template(
"/webapps/tool_shed/repository/view_changeset.mako",
repository=repository,
metadata=metadata,
prev=prev,
next=next,
ctx=ctx,
ctx_parent=ctx_parent,
ctx_child=ctx_child,
anchors=anchors,
modified=modified,
added=added,
removed=removed,
deleted=deleted,
unknown=unknown,
ignored=ignored,
clean=clean,
diffs=diffs,
message=message,
status=status,
)
[docs] @web.expose
def view_or_manage_repository(self, trans, **kwd):
if repository_id := kwd.get("id", None):
repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
user = trans.user
if repository:
if user is not None and (
trans.user_is_admin or trans.app.security_agent.user_can_administer_repository(user, repository)
):
return trans.response.send_redirect(
web.url_for(controller="repository", action="manage_repository", **kwd)
)
else:
return trans.response.send_redirect(
web.url_for(controller="repository", action="view_repository", **kwd)
)
return trans.show_error_message(f"Invalid repository id '{repository_id}' received.")
return trans.show_error_message("The repository id was not received.")
[docs] @web.expose
def view_repository(self, trans, id, **kwd):
message = escape(kwd.get("message", ""))
status = kwd.get("status", "done")
repository = repository_util.get_repository_in_tool_shed(trans.app, id)
repo = repository.hg_repo
avg_rating, num_ratings = self.get_ave_item_rating_data(trans.sa_session, repository, webapp_model=trans.model)
changeset_revision = kwd.get("changeset_revision", repository.tip())
self.validate_changeset_revision(trans, changeset_revision, id)
repository.share_url = repository_util.generate_sharable_link_for_repository_in_tool_shed(
repository, changeset_revision=changeset_revision
)
repository.clone_url = common_util.generate_clone_url_for_repository_in_tool_shed(trans.user, repository)
alerts = kwd.get("alerts", "")
alerts_checked = CheckboxField.is_checked(alerts)
if repository.email_alerts:
email_alerts = json.loads(repository.email_alerts)
else:
email_alerts = []
repository_dependencies = None
user = trans.user
if user and kwd.get("receive_email_alerts_button", False):
flush_needed = False
if alerts_checked:
if user.email not in email_alerts:
email_alerts.append(user.email)
repository.email_alerts = json.dumps(email_alerts)
flush_needed = True
else:
if user.email in email_alerts:
email_alerts.remove(user.email)
repository.email_alerts = json.dumps(email_alerts)
flush_needed = True
if flush_needed:
trans.sa_session.add(repository)
with transaction(trans.sa_session):
trans.sa_session.commit()
checked = alerts_checked or (user and user.email in email_alerts)
alerts_check_box = CheckboxField("alerts", value=checked)
changeset_revision_select_field = grids_util.build_changeset_revision_select_field(
trans, repository, selected_value=changeset_revision, add_id_to_name=False, downloadable=False
)
revision_label = hg_util.get_revision_label(trans.app, repository, changeset_revision, include_date=False)
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
trans.app, id, changeset_revision
)
if repository_metadata:
metadata = repository_metadata.metadata
# Get a dictionary of all repositories upon which the contents of the current repository_metadata record depend.
toolshed_base_url = str(web.url_for("/", qualified=True)).rstrip("/")
rb = relation_builder.RelationBuilder(trans.app, repository, repository_metadata, toolshed_base_url)
repository_dependencies = rb.get_repository_dependencies_for_changeset_revision()
if str(repository.type) != rt_util.TOOL_DEPENDENCY_DEFINITION:
# Handle messaging for orphan tool dependency definitions.
dd = dependency_display.DependencyDisplayer(trans.app)
orphan_message = dd.generate_message_for_orphan_tool_dependencies(repository, metadata)
if orphan_message:
message += orphan_message
status = "warning"
else:
metadata = None
is_malicious = metadata_util.is_malicious(trans.app, id, repository.tip())
if is_malicious:
if trans.app.security_agent.can_push(trans.app, trans.user, repository):
message += malicious_error_can_push
else:
message += malicious_error
status = "error"
tsucm = ToolShedUtilityContainerManager(trans.app)
containers_dict = tsucm.build_repository_containers(
repository, changeset_revision, repository_dependencies, repository_metadata
)
repository_type_select_field = rt_util.build_repository_type_select_field(trans, repository=repository)
heads = hg_util.get_repository_heads(repo)
return trans.fill_template(
"/webapps/tool_shed/repository/view_repository.mako",
repo=repo,
heads=heads,
repository=repository,
repository_metadata=repository_metadata,
metadata=metadata,
containers_dict=containers_dict,
avg_rating=avg_rating,
num_ratings=num_ratings,
alerts_check_box=alerts_check_box,
changeset_revision=changeset_revision,
changeset_revision_select_field=changeset_revision_select_field,
revision_label=revision_label,
repository_type_select_field=repository_type_select_field,
message=message,
status=status,
)
[docs] def validate_changeset_revision(self, trans, changeset_revision, repository_id):
"""In case changeset revision is invalid send them to the repository page"""
if changeset_revision:
repository = repository_util.get_repository_in_tool_shed(trans.app, repository_id)
repo = repository.hg_repo
if not hg_util.get_changectx_for_changeset(repo, changeset_revision):
message = "Invalid changeset revision"
return trans.response.send_redirect(
web.url_for(
controller="repository",
action="index",
repository_id=repository_id,
message=message,
status="error",
)
)
[docs]def get_current_repositories(session):
stmt = select(Repository).where(Repository.deleted == false())
return session.scalars(stmt)
[docs]def get_current_email_alert_repositories(session):
stmt = (
select(Repository)
.where(Repository.deleted == false())
.where(Repository.email_alerts != null())
.order_by(Repository.name)
)
return session.scalars(stmt)