Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.metadata.repository_metadata_manager
import logging
import tempfile
from typing import (
Any,
Dict,
List,
Optional,
)
from sqlalchemy import (
false,
select,
)
from galaxy import util
from galaxy.model.base import transaction
from galaxy.tool_shed.metadata.metadata_generator import (
BaseMetadataGenerator,
HandleResultT,
InvalidFileT,
)
from galaxy.util import inflector
from galaxy.web.form_builder import SelectField
from tool_shed.context import ProvidesRepositoriesContext
from tool_shed.repository_types import util as rt_util
from tool_shed.repository_types.metadata import TipOnly
from tool_shed.structured_app import ToolShedApp
from tool_shed.util import (
basic_util,
common_util,
hg_util,
metadata_util,
repository_util,
shed_util_common as suc,
tool_util,
)
from tool_shed.util.metadata_util import repository_metadata_by_changeset_revision
from tool_shed.webapp.model import (
Repository,
RepositoryMetadata,
User,
)
log = logging.getLogger(__name__)
class ToolShedMetadataGenerator(BaseMetadataGenerator):
    """A MetadataGenerator building on ToolShed's app and repository constructs."""

    app: ToolShedApp
    repository: Optional[Repository]
    # why is mypy making me re-annotate these things from the base class, it didn't
    # when they were in the same file
    invalid_file_tups: List[InvalidFileT]
    repository_clone_url: Optional[str]

    def __init__(
        self,
        trans: ProvidesRepositoriesContext,
        repository: Optional[Repository] = None,
        changeset_revision: Optional[str] = None,
        repository_clone_url: Optional[str] = None,
        shed_config_dict: Optional[Dict[str, Any]] = None,
        relative_install_dir=None,
        repository_files_dir=None,
        resetting_all_metadata_on_repository=False,
        updating_installed_repository=False,
        persist=False,
        metadata_dict=None,
        user=None,
    ):
        """
        Bind the generator to ``trans`` and, optionally, to a repository.

        When a repository is supplied, any of ``changeset_revision``,
        ``repository_clone_url``, ``relative_install_dir`` and
        ``repository_files_dir`` left as None is derived from it (tip revision,
        generated clone URL and the repository path respectively).  Without a
        repository these values are stored exactly as received.
        ``shed_config_dict`` and ``metadata_dict`` default to fresh empty dicts.
        """
        self.trans = trans
        self.app = trans.app
        self.user = user
        self.repository = repository
        has_repository = self.repository is not None
        if changeset_revision is None and has_repository:
            self.changeset_revision = self.repository.tip()
        else:
            self.changeset_revision = changeset_revision
        if repository_clone_url is None and has_repository:
            self.repository_clone_url = common_util.generate_clone_url_for(self.trans, self.repository)
        else:
            self.repository_clone_url = repository_clone_url
        self.shed_config_dict = {} if shed_config_dict is None else shed_config_dict
        if relative_install_dir is None and has_repository:
            relative_install_dir = self.repository.repo_path(self.app)
        if repository_files_dir is None and has_repository:
            repository_files_dir = self.repository.repo_path(self.app)
        self.metadata_dict = {} if metadata_dict is None else metadata_dict
        self.relative_install_dir = relative_install_dir
        self.repository_files_dir = repository_files_dir
        self.resetting_all_metadata_on_repository = resetting_all_metadata_on_repository
        self.updating_installed_repository = updating_installed_repository
        self.persist = persist
        self.invalid_file_tups = []
        self.sa_session = trans.app.model.session

    def set_repository(
        self, repository, relative_install_dir: Optional[str] = None, changeset_revision: Optional[str] = None
    ):
        """
        Re-target the generator at ``repository``.

        A missing ``relative_install_dir`` or ``changeset_revision`` is derived
        from the repository (its path and its tip) when a repository is given.
        The shed config dict is cleared and the remaining per-repository state
        is reset through ``_reset_attributes_after_repository_update``.
        """
        self.repository = repository
        if relative_install_dir is None and self.repository is not None:
            relative_install_dir = repository.repo_path(self.app)
        if changeset_revision is None and self.repository is not None:
            self.set_changeset_revision(self.repository.tip())
        else:
            self.set_changeset_revision(changeset_revision)
        self.shed_config_dict = {}
        self._reset_attributes_after_repository_update(relative_install_dir)

    def handle_repository_elem(self, repository_elem, only_if_compiling_contained_td=False) -> HandleResultT:
        """
        Process the received repository_elem which is a <repository> tag either from a
        repository_dependencies.xml file or a tool_dependencies.xml file.  If the former,
        we're generating repository dependencies metadata for a repository in the Tool Shed.
        If the latter, we're generating package dependency metadata within Galaxy or the
        Tool Shed.

        Returns a ``(repository_dependency_tup, is_valid, error_message)`` triple.  The
        tuple is a six item list: tool shed, name, owner, changeset revision,
        prior_installation_required and only_if_compiling_contained_td (the last two as
        strings).  ``error_message`` is empty when the definition is valid.
        """
        toolshed = repository_elem.get("toolshed", None)
        name = repository_elem.get("name", None)
        owner = repository_elem.get("owner", None)
        changeset_revision = repository_elem.get("changeset_revision", None)
        prior_installation_required = str(repository_elem.get("prior_installation_required", False))
        repository_dependency_tup = [
            toolshed,
            name,
            owner,
            changeset_revision,
            prior_installation_required,
            str(only_if_compiling_contained_td),
        ]
        if not toolshed:
            # Default to the current tool shed.
            toolshed = self.trans.repositories_hostname
            log.debug("Repository dependency definition names no tool shed, defaulting to %s", toolshed)
            repository_dependency_tup[0] = toolshed
        toolshed = common_util.remove_protocol_from_tool_shed_url(toolshed)
        # Shared tail of every "ignoring ..." message below; built once so the wording stays uniform.
        definition = (
            f"repository dependency definition for tool shed {toolshed}, name {name}, owner {owner}, "
            f"changeset revision {changeset_revision}"
        )
        if not suc.tool_shed_is_this_tool_shed(toolshed, trans=self.trans):
            # Repository dependencies are currently supported within a single tool shed.
            error_message = (
                "Repository dependencies are currently supported only within the same tool shed. "
                f"Ignoring {definition}. "
            )
            log.debug(error_message)
            return repository_dependency_tup, False, error_message
        try:
            user = get_user_by_username(self.sa_session, owner)
        except Exception:
            error_message = f"Ignoring {definition} because the owner is invalid."
            log.debug(error_message)
            return repository_dependency_tup, False, error_message
        try:
            repository = get_repository(self.sa_session, name, user.id)
        except Exception:
            error_message = f"Ignoring {definition} because the name is invalid. "
            log.debug(error_message)
            return repository_dependency_tup, False, error_message
        repo = repository.hg_repo
        # The received changeset_revision may be None since defining it in the dependency definition is optional.
        # If this is the case, the default will be to set its value to the repository dependency tip revision.
        # This probably occurs only when handling circular dependency definitions.
        tip_ctx = repo[repo.changelog.tip()]
        # Make sure the repo.changelog includes at least 1 revision.
        if changeset_revision is None and tip_ctx.rev() >= 0:
            repository_dependency_tup = [
                toolshed,
                name,
                owner,
                str(tip_ctx),
                prior_installation_required,
                str(only_if_compiling_contained_td),
            ]
            return repository_dependency_tup, True, ""
        # Find the specified changeset revision in the repository's changelog to see if it's valid.
        if not any(str(repo[changeset]) == changeset_revision for changeset in repo.changelog):
            error_message = f"Ignoring {definition} because the changeset revision is invalid. "
            log.debug(error_message)
            return repository_dependency_tup, False, error_message
        return repository_dependency_tup, True, ""
[docs]class RepositoryMetadataManager(ToolShedMetadataGenerator):
def __init__(
    self,
    trans: ProvidesRepositoriesContext,
    repository=None,
    changeset_revision=None,
    repository_clone_url=None,
    shed_config_dict=None,
    relative_install_dir=None,
    repository_files_dir=None,
    resetting_all_metadata_on_repository=False,
    updating_installed_repository=False,
    persist=False,
    metadata_dict=None,
):
    """
    Set up the generator state for ``trans`` and define the comparison constants.

    All arguments are forwarded to the parent initializer, with ``trans.user``
    supplied as the user.  Afterwards the session is taken from
    ``trans.app.model.context`` and the string constants used as results of
    the ``compare_*`` methods are defined on the instance.
    """
    super().__init__(
        trans,
        repository=repository,
        changeset_revision=changeset_revision,
        repository_clone_url=repository_clone_url,
        shed_config_dict=shed_config_dict,
        relative_install_dir=relative_install_dir,
        repository_files_dir=repository_files_dir,
        resetting_all_metadata_on_repository=resetting_all_metadata_on_repository,
        updating_installed_repository=updating_installed_repository,
        persist=persist,
        metadata_dict=metadata_dict,
        user=trans.user,
    )
    self.app = trans.app
    self.user = trans.user
    self.sa_session = self.app.model.context
    # Repository metadata comparisons for changeset revisions.
    self.EQUAL, self.SUBSET = "equal", "subset"
    self.NO_METADATA = "no metadata"
    self.NOT_EQUAL_AND_NOT_SUBSET = "not equal and not subset"
    self.SUBSET_VALUES = [self.EQUAL, self.SUBSET]
def _add_tool_versions(self, id: int, repository_metadata, changeset_revisions):
# Build a dictionary of { 'tool id' : 'parent tool id' } pairs for each tool in repository_metadata.
metadata = repository_metadata.metadata
tool_versions_dict = {}
for tool_dict in metadata.get("tools", []):
# We have at least 2 changeset revisions to compare tool guids and tool ids.
parent_id = self._get_parent_id(
id, tool_dict["id"], tool_dict["version"], tool_dict["guid"], changeset_revisions
)
tool_versions_dict[tool_dict["guid"]] = parent_id
if tool_versions_dict:
repository_metadata.tool_versions = tool_versions_dict
self.sa_session.add(repository_metadata)
session = self.sa_session()
with transaction(session):
session.commit()
def build_repository_ids_select_field(
    self, name="repository_ids", multiple=True, display="checkboxes", my_writable=False
):
    """
    Build a select field listing the repositories whose metadata can be reset.

    Each option is labelled ``"<repository name> (<owner username>)"`` and
    carries the encoded repository id as its value.  The repositories come
    from ``get_repositories_for_setting_metadata`` with ordering enabled.
    """
    select_field = SelectField(name=name, multiple=multiple, display=display)
    candidates = self.get_repositories_for_setting_metadata(my_writable=my_writable, order=True)
    for repo in candidates:
        owner = str(repo.user.username)
        label = f"{str(repo.name)} ({owner})"
        value = f"{self.app.security.encode_id(repo.id)}"
        select_field.add_option(label, value)
    return select_field
def _clean_repository_metadata(self, changeset_revisions):
    """
    Delete the repository's metadata records that are no longer wanted.

    Every repository_metadata record of ``self.repository`` whose
    changeset_revision is not in ``changeset_revisions`` is deleted, with a
    commit after each deletion.  Records whose revision is in the list are
    left untouched by this method, including any duplicates.
    """
    assert self.repository
    records = get_repository_metadata(self.sa_session, self.repository.id)
    # Lazily filtered so each record is examined right before it is deleted.
    obsolete = (record for record in records if record.changeset_revision not in changeset_revisions)
    for record in obsolete:
        self.sa_session.delete(record)
        session = self.sa_session()
        with transaction(session):
            session.commit()
def compare_changeset_revisions(self, ancestor_changeset_revision, ancestor_metadata_dict):
    """
    Compare the contents of two changeset revisions to determine if a new repository
    metadata revision should be created.

    ``ancestor_metadata_dict`` is the metadata of ``ancestor_changeset_revision``; it is
    compared against ``self.metadata_dict``.  The result is one of ``self.NO_METADATA``,
    ``self.EQUAL``, ``self.SUBSET`` or ``self.NOT_EQUAL_AND_NOT_SUBSET``.  A new
    repository_metadata record is created only for the last of these.
    """

    def _sorted_guids(metadata):
        return sorted(tool["guid"] for tool in metadata.get("tools", []))

    def _repository_dependencies(metadata):
        return metadata.get("repository_dependencies", {}).get("repository_dependencies", [])

    current_metadata_dict = self.metadata_dict
    ancestor_guids = _sorted_guids(ancestor_metadata_dict)
    ancestor_rds = _repository_dependencies(ancestor_metadata_dict)
    ancestor_tds = ancestor_metadata_dict.get("tool_dependencies", {})
    ancestor_dm = ancestor_metadata_dict.get("data_manager", {})
    current_guids = _sorted_guids(current_metadata_dict)
    current_rds = _repository_dependencies(current_metadata_dict)
    current_tds = current_metadata_dict.get("tool_dependencies", {})
    current_dm = current_metadata_dict.get("data_manager", {})

    # Handle case where no metadata exists for either changeset.
    everything = (
        ancestor_rds,
        current_rds,
        ancestor_tds,
        current_tds,
        ancestor_guids,
        current_guids,
        ancestor_dm,
        current_dm,
    )
    if not any(everything):
        return self.NO_METADATA

    rd_result = self.compare_repository_dependencies(ancestor_rds, current_rds)
    td_result = self.compare_tool_dependencies(ancestor_tds, current_tds)
    dm_result = self.compare_data_manager(ancestor_dm, current_dm)
    category_results = (rd_result, td_result, dm_result)

    # Handle case where all metadata is the same.
    if ancestor_guids == current_guids and all(result == self.EQUAL for result in category_results):
        return self.EQUAL

    # Handle case where ancestor metadata is a subset of current metadata.
    if all(result in self.SUBSET_VALUES for result in category_results):
        if all(guid in current_guids for guid in ancestor_guids):
            return self.SUBSET
    return self.NOT_EQUAL_AND_NOT_SUBSET
def compare_data_manager(self, ancestor_metadata, current_metadata):
    """
    Determine if ancestor_metadata is the same as or a subset of current_metadata for data_managers.

    Both arguments are dicts; only the entries under their ``"data_managers"`` key are
    compared.  Returns ``self.EQUAL``, ``self.SUBSET`` or ``self.NOT_EQUAL_AND_NOT_SUBSET``.
    """

    def _comparable_entries(data_managers):
        # We do not check tool_guid or tool conf file name.  The result is a set, so
        # no ordering of the entries is needed; only data_tables is sorted, to make
        # the comparison independent of the order in which tables are listed.
        return {
            (
                name,
                tuple(sorted(value.get("data_tables", []))),
                value.get("guid"),
                value.get("version"),
                value.get("name"),
                value.get("id"),
            )
            for name, value in data_managers.items()
        }

    ancestor_entries = _comparable_entries(ancestor_metadata.get("data_managers", {}))
    current_entries = _comparable_entries(current_metadata.get("data_managers", {}))
    if not ancestor_entries.issubset(current_entries):
        return self.NOT_EQUAL_AND_NOT_SUBSET
    if ancestor_entries == current_entries:
        return self.EQUAL
    return self.SUBSET
def compare_repository_dependencies(self, ancestor_repository_dependencies, current_repository_dependencies):
    """
    Determine if ancestor_repository_dependencies is the same as or a subset of
    current_repository_dependencies.

    Each list holds six item dependency definitions, something like:
    [["http://localhost:9009", "emboss_datatypes", "test", "ab03a2a5f407", "False", "False"]].
    Tool shed URLs are compared without their protocol and the two trailing flags are
    compared as booleans.  Returns ``self.EQUAL``, ``self.SUBSET`` or
    ``self.NOT_EQUAL_AND_NOT_SUBSET``.
    """
    ancestor_count = len(ancestor_repository_dependencies)
    current_count = len(current_repository_dependencies)
    if ancestor_count > current_count:
        return self.NOT_EQUAL_AND_NOT_SUBSET
    for ancestor_tup in ancestor_repository_dependencies:
        a_shed, a_name, a_owner, a_revision, a_prior, a_only_if_compiling = ancestor_tup
        a_shed = common_util.remove_protocol_from_tool_shed_url(a_shed)
        for current_tup in current_repository_dependencies:
            c_shed, c_name, c_owner, c_revision, c_prior, c_only_if_compiling = current_tup
            c_shed = common_util.remove_protocol_from_tool_shed_url(c_shed)
            if (
                c_shed == a_shed
                and c_name == a_name
                and c_owner == a_owner
                and c_revision == a_revision
                and util.string_as_bool(c_prior) == util.string_as_bool(a_prior)
                and util.string_as_bool(c_only_if_compiling) == util.string_as_bool(a_only_if_compiling)
            ):
                break
        else:
            # No identical definition exists.  In some cases, the only difference between a
            # dependency definition in the lists is the changeset_revision value.  We'll check
            # to see if this is the case, and if the defined dependency is a repository that
            # has metadata set only on its tip.
            if not self.different_revision_defines_tip_only_repository_dependency(
                ancestor_tup, current_repository_dependencies
            ):
                return self.NOT_EQUAL_AND_NOT_SUBSET
            # NOTE(review): this returns SUBSET without examining the remaining ancestor
            # definitions - confirm that is intended.
            return self.SUBSET
    return self.EQUAL if ancestor_count == current_count else self.SUBSET
def compare_tool_dependencies(self, ancestor_tool_dependencies, current_tool_dependencies):
    """
    Determine if ancestor_tool_dependencies is the same as or a subset of current_tool_dependencies.

    The tool_dependencies dictionary looks something like:
    {'bwa/0.5.9': {'readme': 'some string', 'version': '0.5.9', 'type': 'package', 'name': 'bwa'}}
    Only the keys are compared.  Returns ``self.EQUAL``, ``self.SUBSET`` or
    ``self.NOT_EQUAL_AND_NOT_SUBSET``.
    """
    ancestor_count = len(ancestor_tool_dependencies)
    current_count = len(current_tool_dependencies)
    if ancestor_count > current_count:
        return self.NOT_EQUAL_AND_NOT_SUBSET
    # For a key present on both sides the only values that could have changed are "readme"
    # or "type".  Changing the readme makes no difference.  Changing the type will change
    # the installation process, but for now we'll assume it was a typo, so new metadata
    # shouldn't be generated.
    if any(td_key not in current_tool_dependencies for td_key in ancestor_tool_dependencies.keys()):
        return self.NOT_EQUAL_AND_NOT_SUBSET
    # At this point we know that the ancestor is at least a subset of the current dependencies.
    return self.EQUAL if ancestor_count == current_count else self.SUBSET
def create_or_update_repository_metadata(self, changeset_revision, metadata_dict):
    """
    Create or update a repository_metadata record in the tool shed.

    The record for ``self.repository`` at ``changeset_revision`` is looked up; if it
    exists its columns are overwritten, otherwise a new record is built.  The record
    is flagged downloadable when the metadata defines repository dependencies, tools
    or tool dependencies.  The record is committed and returned.
    """
    has_repository_dependencies = False
    has_repository_dependencies_only_if_compiling_contained_td = False
    includes_tools = False
    includes_tool_dependencies = False
    if metadata_dict:
        repository_dependencies = metadata_dict.get("repository_dependencies", {}).get(
            "repository_dependencies", []
        )
        (
            has_repository_dependencies,
            has_repository_dependencies_only_if_compiling_contained_td,
        ) = repository_util.get_repository_dependency_types(repository_dependencies)
        includes_tools = "tools" in metadata_dict
        includes_tool_dependencies = "tool_dependencies" in metadata_dict
    downloadable = bool(
        has_repository_dependencies
        or has_repository_dependencies_only_if_compiling_contained_td
        or includes_tools
        or includes_tool_dependencies
    )
    assert self.repository
    # Columns written identically whether the record is updated or newly created.
    column_values = {
        "metadata": metadata_dict,
        "downloadable": downloadable,
        "has_repository_dependencies": has_repository_dependencies,
        "includes_datatypes": False,
        "includes_tools": includes_tools,
        "includes_tool_dependencies": includes_tool_dependencies,
        "includes_workflows": False,
    }
    repository_metadata = repository_metadata_by_changeset_revision(
        self.app.model, self.repository.id, changeset_revision
    )
    if repository_metadata:
        for column, value in column_values.items():
            setattr(repository_metadata, column, value)
    else:
        repository_metadata = self.app.model.RepositoryMetadata(
            repository_id=self.repository.id,
            changeset_revision=changeset_revision,
            **column_values,
        )
    assert repository_metadata
    # Always set the default values for the following columns.  When resetting all metadata
    # on a repository this will reset the values.
    repository_metadata.missing_test_components = False
    self.sa_session.add(repository_metadata)
    session = self.sa_session()
    with transaction(session):
        session.commit()
    return repository_metadata
def different_revision_defines_tip_only_repository_dependency(self, rd_tup, repository_dependencies):
    """
    Determine if the only difference between rd_tup and a dependency definition in the list of
    repository_dependencies is the changeset_revision value.

    The first definition in ``repository_dependencies`` with the same tool shed (protocol
    ignored), name and owner as ``rd_tup`` decides the result: True when the class registered
    for that repository's type is a ``TipOnly`` instance, False otherwise.  False is also
    returned when no definition matches.
    """
    wanted_shed, wanted_name, wanted_owner, _, _, _ = common_util.parse_repository_dependency_tuple(rd_tup)
    wanted_shed = common_util.remove_protocol_from_tool_shed_url(wanted_shed)
    for candidate in repository_dependencies:
        shed, name, owner, _, _, _ = common_util.parse_repository_dependency_tuple(candidate)
        shed = common_util.remove_protocol_from_tool_shed_url(shed)
        if (wanted_shed, wanted_name, wanted_owner) != (shed, name, owner):
            continue
        # Determine if the repository represented by the dependency tuple is an instance of
        # the repository type TipOnly.
        required_repository = repository_util.get_repository_by_name_and_owner(self.app, name, owner)
        type_class = self.app.repository_types_registry.get_class_by_label(required_repository.type)
        return isinstance(type_class, TipOnly)
    return False
def _get_parent_id(self, id: int, old_id, version, guid, changeset_revisions):
parent_id = None
# Compare from most recent to oldest.
changeset_revisions.reverse()
for changeset_revision in changeset_revisions:
repository_metadata = repository_metadata_by_changeset_revision(self.app.model, id, changeset_revision)
assert repository_metadata
metadata = repository_metadata.metadata
tools_dicts = metadata.get("tools", [])
for tool_dict in tools_dicts:
if tool_dict["guid"] == guid:
# The tool has not changed between the compared changeset revisions.
continue
if tool_dict["id"] == old_id and tool_dict["version"] != version:
# The tool version is different, so we've found the parent.
return tool_dict["guid"]
if parent_id is None:
# The tool did not change through all of the changeset revisions.
return old_id
def get_repositories_for_setting_metadata(self, my_writable=False, order=True):
    """
    Return a list of repositories for resetting metadata.  The order parameter
    is used for displaying the list of repositories ordered alphabetically for display on
    a page.  When called from the Tool Shed API, order is False.

    With ``my_writable`` the result is limited to repositories of the types
    repository_suite_definition and tool_dependency_definition plus those whose
    allow_push list names the current user; otherwise all current repositories are returned.
    """
    if not my_writable:
        return get_current_repositories(self.sa_session, order)
    username = self.user.username
    selected_ids = []
    for repo in get_current_repositories(self.sa_session):
        # Always reset metadata on all repositories of types repository_suite_definition and
        # tool_dependency_definition.
        if repo.type in [rt_util.REPOSITORY_SUITE_DEFINITION, rt_util.TOOL_DEPENDENCY_DEFINITION]:
            selected_ids.append(repo.id)
            continue
        # Include all repositories that are writable by the current user.
        pushers = repo.allow_push()
        if pushers and username in pushers.split(","):
            selected_ids.append(repo.id)
    return get_filtered_repositories(self.sa_session, selected_ids, order) if selected_ids else []
def new_metadata_required_for_utilities(self):
    """
    This method compares the last stored repository_metadata record associated with self.repository
    against the contents of self.metadata_dict and returns True or False for the union set of Galaxy
    utilities contained in both metadata dictionaries.  The metadata contained in self.metadata_dict
    may not be a subset of that contained in the last stored repository_metadata record associated with
    self.repository because one or more Galaxy utilities may have been deleted from self.repository in
    the new tip.

    All four checks (repository dependencies, tools, tool dependencies, data managers) are run;
    True is returned when any of them asks for a new record.
    """
    assert self.repository
    latest_metadata = metadata_util.get_latest_repository_metadata(
        self.app, self.repository.id, downloadable=False
    )
    checks = (
        self.new_repository_dependency_metadata_required,
        self.new_tool_metadata_required,
        self.new_tool_dependency_metadata_required,
        self.new_data_manager_required,
    )
    # A list, not a generator: every check is evaluated before the results are combined.
    outcomes = [check(latest_metadata) for check in checks]
    return any(outcomes)
def new_repository_dependency_metadata_required(self, repository_metadata):
    """
    Compare the last saved metadata for each repository dependency in the repository
    with the new metadata in self.metadata_dict to determine if a new repository_metadata
    table record is required or if the last saved metadata record can be updated for
    repository_dependencies instead.

    Returns True when a new record is required.  A stored record whose metadata is empty
    or None is treated like one without repository dependencies (False), matching the
    handling in the tool and tool dependency checks.
    """
    if not repository_metadata:
        # There is no saved repository metadata, so a new record is needed exactly when
        # self.metadata_dict defines repository dependencies.
        return "repository_dependencies" in self.metadata_dict
    metadata = repository_metadata.metadata
    if not metadata or "repository_dependencies" not in metadata:
        return False
    saved_repository_dependencies = metadata["repository_dependencies"]["repository_dependencies"]
    new_repository_dependencies_metadata = self.metadata_dict.get("repository_dependencies", None)
    if not new_repository_dependencies_metadata:
        # The repository_dependencies.xml file must have been deleted, so create a new
        # repository_metadata record so we always have access to the deleted file.
        return True
    new_repository_dependencies = new_repository_dependencies_metadata["repository_dependencies"]
    # TODO: We used to also return True when a new definition was missing from the saved
    # ones, to handle deleted repository dependency definitions.  That erroneously returned
    # True in cases where it should not have, usually where multiple single files were
    # uploaded when a single tarball should have been.  Support for deleted definitions is
    # needed to guarantee reproducibility, but must be done in a better way.
    # The saved metadata must be a subset of the new metadata.
    for saved_repository_dependency in saved_repository_dependencies:
        if saved_repository_dependency in new_repository_dependencies:
            continue
        # In some cases, the only difference between a dependency definition in the lists
        # is the changeset_revision value.  We'll check to see if this is the case, and if
        # the defined dependency is a repository that has metadata set only on its tip.
        if not self.different_revision_defines_tip_only_repository_dependency(
            saved_repository_dependency, new_repository_dependencies
        ):
            return True
    return False
def new_data_manager_required(self, repository_metadata):
    """
    Decide whether the data manager metadata calls for a new repository_metadata record.

    When both self.metadata_dict and the stored metadata are non-empty, the answer is
    whether ``compare_data_manager`` reports anything other than ``self.EQUAL``.
    Otherwise the answer is whether the stored metadata has a truthy ``"data_managers"``
    entry.
    """
    # NOTE(review): compare_changeset_revisions reads the nested "data_manager" entry
    # before calling compare_data_manager, whereas this method passes whole metadata
    # dicts and looks up "data_managers" at the top level - confirm which is intended.
    saved_metadata = repository_metadata.metadata if repository_metadata else None
    if self.metadata_dict and saved_metadata:
        return self.compare_data_manager(self.metadata_dict, saved_metadata) != self.EQUAL
    return bool(saved_metadata and saved_metadata.get("data_managers"))
def new_tool_metadata_required(self, repository_metadata):
    """
    Compare the last saved metadata for each tool in the repository with the new metadata in
    self.metadata_dict to determine if a new repository_metadata table record is required, or if
    the last saved metadata record can be updated instead.

    Returns True when a new record is required: there is no stored record at all, a tool
    id is stored with a different version, or the new metadata has a tool id that is not stored.
    """
    if "tools" not in self.metadata_dict:
        # self.metadata_dict includes no metadata for tools, so a new repository_metadata table
        # record is not needed.
        return False
    if not repository_metadata:
        # There is no stored repository metadata, so we need to create a new repository_metadata
        # table record.
        return True
    metadata = repository_metadata.metadata
    if not metadata or "tools" not in metadata:
        # Nothing is stored for tools (or nothing at all), so the stored record can simply
        # be updated.
        return False
    new_tools = self.metadata_dict["tools"]
    saved_tools = metadata["tools"]
    # The metadata for one or more tools was successfully generated in the past for this
    # repository, so we first compare the version string for each tool id in
    # self.metadata_dict with what was previously saved.
    for new_tool in new_tools:
        for saved_tool in saved_tools:
            if new_tool["id"] == saved_tool["id"] and new_tool["version"] != saved_tool["version"]:
                return True
    # So far, a new metadata record is not required, but we still have to check to see if
    # any new tool ids exist in self.metadata_dict that are not in the saved metadata.  We do
    # this because if a new tarball was uploaded to a repository that included tools, it
    # may have removed existing tool files if they were not included in the uploaded tarball.
    # The ids are collected once into a set rather than rebuilt inside the nested loop.
    saved_tool_ids = {saved_tool["id"] for saved_tool in saved_tools}
    return any(new_tool["id"] not in saved_tool_ids for new_tool in new_tools)
def new_tool_dependency_metadata_required(self, repository_metadata):
    """
    Compare the last saved metadata for each tool dependency in the repository with the new
    metadata in self.metadata_dict to determine if a new repository_metadata table record is
    required or if the last saved metadata record can be updated for tool_dependencies instead.

    Returns True when a new record is required.
    """
    if not repository_metadata:
        # There is no saved repository metadata, so a new record is needed exactly when
        # self.metadata_dict defines tool dependencies.
        return "tool_dependencies" in self.metadata_dict
    stored = repository_metadata.metadata
    if not stored or "tool_dependencies" not in stored:
        # The stored metadata has nothing on tool dependencies, so the existing
        # repository metadata can be updated.
        return False
    saved_tool_dependencies = stored["tool_dependencies"]
    new_tool_dependencies = self.metadata_dict.get("tool_dependencies", None)
    if not new_tool_dependencies:
        # The tool_dependencies.xml file must have been deleted, so create a new
        # repository_metadata record so we always have access to the deleted file.
        return True
    # TODO: We used to also return True when a new definition was missing from the saved
    # ones, to handle deleted tool dependency definitions.  That erroneously returned True
    # in cases where it should not have, usually where multiple single files were uploaded
    # when a single tarball should have been.  Support for deleted definitions is needed to
    # guarantee reproducibility, but must be done in a better way.
    # The saved metadata must be a subset of the new metadata.
    return any(saved_key not in new_tool_dependencies for saved_key in saved_tool_dependencies)
def reset_all_metadata_on_repository_in_tool_shed(self, repository_clone_url=None):
    """Reset all metadata on a single repository in a tool shed.

    Every changeset returned by ``self.repository.get_changesets_for_setting_metadata`` is
    cloned into its own temporary directory and metadata is generated for it. The metadata of
    each changeset is compared with that of its ancestor; a repository_metadata record is
    written when the ancestor's metadata is neither equal to nor a subset of the current
    metadata, and again when the end of the change log is reached. Afterwards the
    repository_metadata records whose changeset revision was not written here are cleaned up
    and tool version lineage is reset.

    :param repository_clone_url: URL to clone the repository from. When omitted,
        ``self.repository_clone_url`` is used.
    """
    assert self.repository
    log.debug(f"Resetting all metadata on repository: {self.repository.name}")
    repo = self.repository.hg_repo
    # The list of changeset_revisions refers to repository_metadata records that have been created
    # or updated. When the following loop completes, we'll delete all repository_metadata records
    # for this repository that do not have a changeset_revision value in this list.
    changeset_revisions: List[Optional[str]] = []
    # When a new repository_metadata record is created, it always uses the values of
    # metadata_changeset_revision and metadata_dict.
    metadata_changeset_revision = None
    metadata_dict = None
    ancestor_changeset_revision = None
    ancestor_metadata_dict = None
    for changeset in self.repository.get_changesets_for_setting_metadata(self.app):
        work_dir = tempfile.mkdtemp(prefix="tmp-toolshed-ramorits")
        # try/finally guarantees the temporary clone is removed even when cloning or
        # metadata generation raises part-way through a changeset.
        try:
            ctx = repo[changeset]
            ctx_rev = str(ctx.rev())
            log.debug("Cloning repository changeset revision: %s", ctx_rev)
            # Validate the URL that is actually used, so an explicitly supplied
            # repository_clone_url works even when the instance has none set.
            clone_url = repository_clone_url or self.repository_clone_url
            assert clone_url
            cloned_ok, error_message = hg_util.clone_repository(clone_url, work_dir, ctx_rev)
            if not cloned_ok:
                # The changeset is still skipped (as before), but the reason is no longer lost.
                log.warning("Unable to clone repository changeset revision %s: %s", ctx_rev, error_message)
                continue
            log.debug("Generating metadata for changeset revision: %s", ctx_rev)
            self.set_changeset_revision(str(ctx))
            self.set_repository_files_dir(work_dir)
            self.generate_metadata_for_changeset_revision()
            if self.metadata_dict:
                if metadata_changeset_revision is None and metadata_dict is None:
                    # We're at the first change set in the change log.
                    metadata_changeset_revision = self.changeset_revision
                    metadata_dict = self.metadata_dict
                if ancestor_changeset_revision:
                    # Compare metadata from ancestor and current. The value of comparison will be one of:
                    # self.NO_METADATA - no metadata for either ancestor or current, so continue from current
                    # self.EQUAL - ancestor metadata is equivalent to current metadata, so continue from current
                    # self.SUBSET - ancestor metadata is a subset of current metadata, so continue from current
                    # self.NOT_EQUAL_AND_NOT_SUBSET - ancestor metadata is neither equal to nor a subset of current
                    # metadata, so persist ancestor metadata.
                    log.info("amd %s", ancestor_metadata_dict)
                    comparison = self.compare_changeset_revisions(
                        ancestor_changeset_revision, ancestor_metadata_dict
                    )
                    log.info("comparison %s", comparison)
                    if comparison in (self.NO_METADATA, self.EQUAL, self.SUBSET):
                        ancestor_changeset_revision = self.changeset_revision
                        ancestor_metadata_dict = self.metadata_dict
                    elif comparison == self.NOT_EQUAL_AND_NOT_SUBSET:
                        metadata_changeset_revision = ancestor_changeset_revision
                        metadata_dict = ancestor_metadata_dict
                        self.create_or_update_repository_metadata(metadata_changeset_revision, metadata_dict)
                        changeset_revisions.append(metadata_changeset_revision)
                        ancestor_changeset_revision = self.changeset_revision
                        ancestor_metadata_dict = self.metadata_dict
                else:
                    # We're at the beginning of the change log.
                    ancestor_changeset_revision = self.changeset_revision
                    ancestor_metadata_dict = self.metadata_dict
                if not ctx.children():
                    # We're at the end of the change log.
                    metadata_changeset_revision = self.changeset_revision
                    metadata_dict = self.metadata_dict
                    self.create_or_update_repository_metadata(metadata_changeset_revision, metadata_dict)
                    changeset_revisions.append(metadata_changeset_revision)
                    ancestor_changeset_revision = None
                    ancestor_metadata_dict = None
            elif ancestor_metadata_dict:
                # We reach here only if self.metadata_dict is empty and ancestor_metadata_dict is not.
                if not ctx.children():
                    # We're at the end of the change log.
                    self.create_or_update_repository_metadata(metadata_changeset_revision, metadata_dict)
                    changeset_revisions.append(metadata_changeset_revision)
                    ancestor_changeset_revision = None
                    ancestor_metadata_dict = None
        finally:
            basic_util.remove_dir(work_dir)
    # Delete all repository_metadata records for this repository that do not have a changeset_revision
    # value in changeset_revisions.
    self._clean_repository_metadata(changeset_revisions)
    # Set tool version information for all downloadable changeset revisions. Get the list of changeset
    # revisions from the changelog.
    self._reset_all_tool_versions(repo)
def _reset_all_tool_versions(self, repo):
    """Reset tool version lineage for those changeset revisions that include valid tools.

    For each such revision a ``{tool guid: parent tool id}`` dictionary is built, stored on
    the revision's repository_metadata record as ``tool_versions`` and committed.
    """
    assert self.repository
    # Only revisions whose stored metadata contains tools are returned here.
    revisions_with_tools = _get_changeset_revisions_that_contain_tools(self.app, repo, self.repository)
    for position, revision in enumerate(revisions_with_tools):
        record = repository_metadata_by_changeset_revision(self.app.model, self.repository.id, revision)
        assert record
        tools = record.metadata["tools"]
        if position == 0:
            # The first changeset_revision is a special case because it will have no ancestor
            # changeset_revisions in which to match tools. The parent tool id for tools in the
            # first changeset_revision will be the "old_id" in the tool config.
            lineage = {tool["guid"]: tool["id"] for tool in tools}
        else:
            lineage = {}
            for tool in tools:
                # Parents are looked up only among the revisions that precede this one.
                lineage[tool["guid"]] = self._get_parent_id(
                    self.repository.id,
                    tool["id"],
                    tool["version"],
                    tool["guid"],
                    revisions_with_tools[0:position],
                )
        if not lineage:
            continue
        record.tool_versions = lineage
        self.sa_session.add(record)
        session = self.sa_session()
        with transaction(session):
            session.commit()
def reset_metadata_on_selected_repositories(self, **kwd):
    """
    Inspect the repository changelog to reset metadata for all appropriate changeset revisions.
    This method is called from both Galaxy and the Tool Shed.

    :param kwd: must carry ``repository_ids``, the ids of the repositories to reset
        (normalized with ``util.listify``).
    :returns: a ``(message, status)`` tuple. ``status`` is "error" when no repository id was
        supplied and "done" otherwise; per-repository failures are counted in ``message``
        and logged rather than raised.
    """
    repository_ids = util.listify(kwd.get("repository_ids", None))
    if not repository_ids:
        return "Select at least one repository to on which to reset all metadata.", "error"
    successful_count = 0
    unsuccessful_count = 0
    for repository_id in repository_ids:
        # Reset on every iteration: if the lookup below raises, the except handler must not
        # hit an unbound name (first iteration) or report the previous repository's name.
        repository = None
        try:
            repository = repository_util.get_repository_in_tool_shed(self.app, repository_id)
            self.set_repository(repository)
            self.resetting_all_metadata_on_repository = True
            self.reset_all_metadata_on_repository_in_tool_shed()
            if self.invalid_file_tups:
                # Kept in its own local so it cannot be mistaken for the summary returned below.
                invalid_tools_message = tool_util.generate_message_for_invalid_tools(
                    self.app, self.invalid_file_tups, repository, None, as_html=False
                )
                log.debug(invalid_tools_message)
                unsuccessful_count += 1
            else:
                log.debug(
                    "Successfully reset metadata on repository %s owned by %s",
                    repository.name,
                    repository.user.username,
                )
                successful_count += 1
        except Exception:
            # Best effort: one failing repository must not abort the remaining ones.
            if repository is not None:
                log.exception("Error attempting to reset metadata on repository %s", str(repository.name))
            else:
                log.exception("Error attempting to reset metadata on repository with id %s", repository_id)
            unsuccessful_count += 1
    message = "Successfully reset metadata on %d %s. " % (
        successful_count,
        inflector.cond_plural(successful_count, "repository"),
    )
    if unsuccessful_count:
        message += "Error setting metadata on %d %s - see the paster log for details. " % (
            unsuccessful_count,
            inflector.cond_plural(unsuccessful_count, "repository"),
        )
    return message, "done"
def set_repository(self, repository, repository_clone_url=None):
    """Set the repository via the base class and record the URL it is cloned from.

    When ``repository_clone_url`` is not supplied (or is empty), the URL is generated with
    ``common_util.generate_clone_url_for`` from ``self.trans`` and ``repository``.
    """
    super().set_repository(repository)
    if repository_clone_url:
        clone_url = repository_clone_url
    else:
        clone_url = common_util.generate_clone_url_for(self.trans, repository)
    self.repository_clone_url = clone_url
def set_repository_metadata(self, host, content_alert_str="", **kwd):
    """
    Set metadata using the self.repository's current disk files, returning specific error
    messages (if any) to alert the repository owner that the changeset has problems.

    Returns a ``(message, status)`` tuple. ``status`` is "error" when invalid files were
    found, or when a repository with a single changeset produced no metadata; otherwise
    it is "done" and ``message`` is empty.
    """
    assert self.repository
    repository_id = self.repository.id
    repo = self.repository.hg_repo
    message, status = "", "done"
    self.generate_metadata_for_changeset_revision()
    if self.metadata_dict:
        repository_metadata = None
        repository_type_class = self.app.repository_types_registry.get_class_by_label(self.repository.type)
        tip_only = isinstance(repository_type_class, TipOnly)
        if not tip_only and self.new_metadata_required_for_utilities():
            # Create a new repository_metadata table row.
            repository_metadata = self.create_or_update_repository_metadata(
                self.repository.tip(), self.metadata_dict
            )
            # If this is the first record stored for this repository, see if we need to send any email alerts.
            # NOTE(review): the content_alert_str argument of this method is not forwarded here;
            # an empty string is always passed - confirm that this is intentional.
            if len(self.repository.downloadable_revisions) == 1:
                suc.handle_email_alerts(
                    self.app, host, self.repository, content_alert_str="", new_repo_alert=True, admin_only=False
                )
        else:
            # Update the latest stored repository metadata with the contents and attributes of self.metadata_dict.
            repository_metadata = metadata_util.get_latest_repository_metadata(
                self.app, repository_id, downloadable=False
            )
            if not repository_metadata:
                # There are no metadata records associated with the repository.
                repository_metadata = self.create_or_update_repository_metadata(
                    self.repository.tip(), self.metadata_dict
                )
            else:
                is_downloadable = metadata_util.is_downloadable(self.metadata_dict)
                # Update the last saved repository_metadata table row.
                repository_metadata.changeset_revision = self.repository.tip()
                repository_metadata.metadata = self.metadata_dict
                repository_metadata.downloadable = is_downloadable
                repository_metadata.includes_datatypes = False
                # We don't store information about the special type of repository dependency that is needed only for
                # compiling a tool dependency defined for the dependent repository.
                dependencies_section = self.metadata_dict.get("repository_dependencies", {})
                dependency_tups = dependencies_section.get("repository_dependencies", [])
                has_repository_dependencies, _ = repository_util.get_repository_dependency_types(dependency_tups)
                repository_metadata.has_repository_dependencies = has_repository_dependencies
                repository_metadata.includes_tool_dependencies = "tool_dependencies" in self.metadata_dict
                repository_metadata.includes_tools = "tools" in self.metadata_dict
                repository_metadata.includes_workflows = False
                repository_metadata.missing_test_components = False
                self.sa_session.add(repository_metadata)
                session = self.sa_session()
                with transaction(session):
                    session.commit()
        if "tools" in self.metadata_dict and repository_metadata and status != "error":
            # Set tool versions on the new downloadable change set. The order of the list of changesets is
            # critical, so we use the repo's changelog.
            changelog_revisions = (str(repo[changeset]) for changeset in repo.changelog)
            changeset_revisions = [
                revision
                for revision in changelog_revisions
                if repository_metadata_by_changeset_revision(self.app.model, repository_id, revision)
            ]
            self._add_tool_versions(repository_id, repository_metadata, changeset_revisions)
    elif len(repo) == 1 and not self.invalid_file_tups:
        message = "Revision <b>%s</b> includes no Galaxy utilities for which metadata can " % str(
            self.repository.tip()
        )
        message += "be defined so this revision cannot be automatically installed into a local Galaxy instance."
        status = "error"
    if self.invalid_file_tups:
        # Invalid files take precedence over any message built above.
        message = tool_util.generate_message_for_invalid_tools(
            self.app, self.invalid_file_tups, self.repository, self.metadata_dict
        )
        status = "error"
    return message, status
def set_repository_metadata_due_to_new_tip(self, host, content_alert_str=None, **kwd):
    """Set metadata on the tip of self.repository in the tool shed.

    Delegates to ``set_repository_metadata`` and returns its two results in swapped
    order, i.e. ``(status, error_message)``.
    """
    outcome = self.set_repository_metadata(host, content_alert_str=content_alert_str, **kwd)
    error_message, status = outcome
    return status, error_message
def _get_changeset_revisions_that_contain_tools(app: "ToolShedApp", repo, repository) -> List[str]:
    """Return, in changelog order, the changeset revisions whose stored metadata has tools.

    A revision is included only when a repository_metadata record exists for it and that
    record's metadata has a non-empty "tools" entry.
    """
    revisions_with_tools = []
    for changeset in repo.changelog:
        revision = str(repo[changeset])
        record = repository_metadata_by_changeset_revision(app.model, repository.id, revision)
        if not record:
            continue
        stored_metadata = record.metadata
        if stored_metadata and stored_metadata.get("tools", None):
            revisions_with_tools.append(revision)
    return revisions_with_tools
def get_user_by_username(session, username):
    """Return the User whose username equals ``username``.

    The result is fetched with ``scalar_one()``, so exactly one matching row is expected.
    """
    matches_username = User.username == username
    query = select(User).where(matches_username)
    return session.execute(query).scalar_one()
def get_repository(session, name, user_id):
    """Return the Repository named ``name`` that belongs to the user with id ``user_id``.

    The result is fetched with ``scalar_one()``, so exactly one matching row is expected.
    """
    by_name = select(Repository).where(Repository.name == name)
    by_name_and_owner = by_name.where(Repository.user_id == user_id)
    result = session.execute(by_name_and_owner)
    return result.scalar_one()
def get_repository_metadata(session, repository_id):
    """Return the RepositoryMetadata rows of one repository as a scalars result.

    Rows are ordered by changeset revision and, within a revision, by update time descending.
    """
    belongs_to_repository = RepositoryMetadata.repository_id == repository_id
    ordering = (
        RepositoryMetadata.changeset_revision,
        RepositoryMetadata.update_time.desc(),  # type: ignore[attr-defined]  # mapped attribute
    )
    query = select(RepositoryMetadata).where(belongs_to_repository).order_by(*ordering)
    return session.scalars(query)
def get_current_repositories(session, order=False):
    """Return all repositories that are not marked deleted as a scalars result.

    When ``order`` is truthy the rows are ordered by repository name, then user id.
    """
    query = select(Repository).where(Repository.deleted == false())
    final_query = query.order_by(Repository.name, Repository.user_id) if order else query
    return session.scalars(final_query)
def get_filtered_repositories(session, repo_ids, order):
    """Return the repositories whose id is in ``repo_ids`` as a scalars result.

    When ``order`` is truthy the rows are ordered by repository name, then user id.
    """
    query = select(Repository).where(Repository.id.in_(repo_ids))
    final_query = query.order_by(Repository.name, Repository.user_id) if order else query
    return session.scalars(final_query)