Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.dependencies.repository.relation_builder
import logging
import tool_shed.util.repository_util
from galaxy.util import (
asbool,
listify,
)
from tool_shed.util import (
common_util,
container_util,
metadata_util,
shed_util_common as suc,
)
log = logging.getLogger(__name__)
[docs]class RelationBuilder:
[docs] def __init__(self, app, repository, repository_metadata, tool_shed_url, trans=None):
self.all_repository_dependencies = {}
self.app = app
self.circular_repository_dependencies = []
self.repository = repository
self.repository_metadata = repository_metadata
self.handled_key_rd_dicts = []
self.key_rd_dicts_to_be_processed = []
self.tool_shed_url = tool_shed_url
self.trans = trans
[docs] def can_add_to_key_rd_dicts(self, key_rd_dict, key_rd_dicts):
"""Handle the case where an update to the changeset revision was done."""
k = next(iter(key_rd_dict))
rd = key_rd_dict[k]
partial_rd = rd[0:3]
for kr_dict in key_rd_dicts:
key = next(iter(kr_dict))
if key == k:
repository_dependency = kr_dict[key]
if repository_dependency[0:3] == partial_rd:
return False
return True
[docs] def filter_only_if_compiling_contained_td(self, key_rd_dict):
"""
Return a copy of the received key_rd_dict with repository dependencies that are needed
only_if_compiling_contained_td filtered out of the list of repository dependencies for
each rd_key.
"""
filtered_key_rd_dict = {}
for rd_key, required_rd_tup in key_rd_dict.items():
(
tool_shed,
name,
owner,
changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
) = common_util.parse_repository_dependency_tuple(required_rd_tup)
if not asbool(only_if_compiling_contained_td):
filtered_key_rd_dict[rd_key] = required_rd_tup
return filtered_key_rd_dict
[docs] def get_prior_installation_required_and_only_if_compiling_contained_td(self):
"""
This method is called from the tool shed and never Galaxy. If self.all_repository_dependencies
contains a repository dependency tuple that is associated with self.repository, return the
value of the tuple's prior_installation_required component.
"""
cleaned_toolshed_base_url = common_util.remove_protocol_from_tool_shed_url(self.tool_shed_url)
if self.all_repository_dependencies:
for rd_key, rd_tups in self.all_repository_dependencies.items():
if rd_key in ["root_key", "description"]:
continue
for rd_tup in rd_tups:
(
rd_toolshed,
rd_name,
rd_owner,
rd_changeset_revision,
rd_prior_installation_required,
rd_only_if_compiling_contained_td,
) = common_util.parse_repository_dependency_tuple(rd_tup)
cleaned_rd_toolshed = common_util.remove_protocol_from_tool_shed_url(rd_toolshed)
if (
cleaned_rd_toolshed == cleaned_toolshed_base_url
and rd_name == self.repository.name
and rd_owner == self.repository.user.username
and rd_changeset_revision == self.repository_metadata.changeset_revision
):
return rd_prior_installation_required, rd_only_if_compiling_contained_td
elif self.repository_metadata:
# Get the list of changeset revisions from the tool shed to which self.repository may be updated.
metadata = self.repository_metadata.metadata
current_changeset_revision = str(self.repository_metadata.changeset_revision)
# Get the changeset revision to which the current value of required_repository_changeset_revision
# should be updated if it's not current.
text = metadata_util.get_updated_changeset_revisions(
self.app,
name=str(self.repository.name),
owner=str(self.repository.user.username),
changeset_revision=current_changeset_revision,
)
if text:
valid_changeset_revisions = listify(text)
if current_changeset_revision not in valid_changeset_revisions:
valid_changeset_revisions.append(current_changeset_revision)
else:
valid_changeset_revisions = [current_changeset_revision]
repository_dependencies_dict = metadata["repository_dependencies"]
rd_tups = repository_dependencies_dict.get("repository_dependencies", [])
for rd_tup in rd_tups:
(
rd_toolshed,
rd_name,
rd_owner,
rd_changeset_revision,
rd_prior_installation_required,
rd_only_if_compiling_contained_td,
) = common_util.parse_repository_dependency_tuple(rd_tup)
cleaned_rd_toolshed = common_util.remove_protocol_from_tool_shed_url(rd_toolshed)
if (
cleaned_rd_toolshed == cleaned_toolshed_base_url
and rd_name == self.repository.name
and rd_owner == self.repository.user.username
and rd_changeset_revision in valid_changeset_revisions
):
return rd_prior_installation_required, rd_only_if_compiling_contained_td
# Default both prior_installation_required and only_if_compiling_contained_td to False.
return "False", "False"
[docs] def get_key_for_repository_changeset_revision(self):
# The received toolshed_base_url must include the port, but doesn't have to include the protocol.
(
prior_installation_required,
only_if_compiling_contained_td,
) = self.get_prior_installation_required_and_only_if_compiling_contained_td()
# Create a key with the value of prior_installation_required defaulted to False.
key = container_util.generate_repository_dependencies_key_for_repository(
self.tool_shed_url,
self.repository.name,
self.repository.user.username,
self.repository_metadata.changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
)
return key
[docs] def get_repository_dependencies_for_changeset_revision(self):
"""
Return a dictionary of all repositories upon which the contents of self.repository_metadata
record depend. The dictionary keys are name-spaced values consisting of:
self.tool_shed_url/repository_name/repository_owner/changeset_revision
and the values are lists of repository_dependency tuples consisting of:
( self.tool_shed_url, repository_name, repository_owner, changeset_revision ).
This method ensures that all required repositories to the nth degree are returned.
"""
# Assume the current repository does not have repository dependencies defined for it.
current_repository_key = None
metadata = self.repository_metadata.metadata
if metadata:
# The value of self.tool_shed_url must include the port, but doesn't have to include
# the protocol.
if "repository_dependencies" in metadata:
current_repository_key = self.get_key_for_repository_changeset_revision()
repository_dependencies_dict = metadata["repository_dependencies"]
if not self.all_repository_dependencies:
self.initialize_all_repository_dependencies(current_repository_key, repository_dependencies_dict)
# Handle the repository dependencies defined in the current repository, if any, and populate
# the various repository dependency objects for this round of processing.
current_repository_key_rd_dicts = self.populate_repository_dependency_objects_for_processing(
current_repository_key, repository_dependencies_dict
)
if current_repository_key:
if current_repository_key_rd_dicts:
# There should be only a single current_repository_key_rd_dict in this list.
current_repository_key_rd_dict = current_repository_key_rd_dicts[0]
# Handle circular repository dependencies.
if not self.in_circular_repository_dependencies(current_repository_key_rd_dict):
if current_repository_key in self.all_repository_dependencies:
self.handle_current_repository_dependency(current_repository_key)
elif self.key_rd_dicts_to_be_processed:
self.handle_next_repository_dependency()
elif self.key_rd_dicts_to_be_processed:
self.handle_next_repository_dependency()
elif self.key_rd_dicts_to_be_processed:
self.handle_next_repository_dependency()
self.all_repository_dependencies = self.prune_invalid_repository_dependencies(self.all_repository_dependencies)
return self.all_repository_dependencies
[docs] def get_repository_dependency_as_key(self, repository_dependency):
(
tool_shed,
name,
owner,
changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
) = common_util.parse_repository_dependency_tuple(repository_dependency)
return container_util.generate_repository_dependencies_key_for_repository(
tool_shed, name, owner, changeset_revision, prior_installation_required, only_if_compiling_contained_td
)
[docs] def get_updated_changeset_revisions_for_repository_dependencies(self, key_rd_dicts):
updated_key_rd_dicts = []
for key_rd_dict in key_rd_dicts:
key = next(iter(key_rd_dict))
repository_dependency = key_rd_dict[key]
(
rd_toolshed,
rd_name,
rd_owner,
rd_changeset_revision,
rd_prior_installation_required,
rd_only_if_compiling_contained_td,
) = common_util.parse_repository_dependency_tuple(repository_dependency)
tool_shed_is_this_tool_shed = suc.tool_shed_is_this_tool_shed(rd_toolshed, trans=self.trans)
if tool_shed_is_this_tool_shed:
repository = tool_shed.util.repository_util.get_repository_by_name_and_owner(
self.app, rd_name, rd_owner
)
if repository:
repository_id = self.app.security.encode_id(repository.id)
repository_metadata = metadata_util.get_repository_metadata_by_repository_id_changeset_revision(
self.app, repository_id, rd_changeset_revision
)
if repository_metadata:
# The repository changeset_revision is installable, so no updates are available.
new_key_rd_dict = {}
new_key_rd_dict[key] = repository_dependency
updated_key_rd_dicts.append(key_rd_dict)
else:
# The repository changeset_revision is no longer installable, so see if there's been an update.
changeset_revision = metadata_util.get_next_downloadable_changeset_revision(
self.app, repository, rd_changeset_revision
)
if changeset_revision != rd_changeset_revision:
repository_metadata = (
metadata_util.get_repository_metadata_by_repository_id_changeset_revision(
self.app, repository_id, changeset_revision
)
)
if repository_metadata:
new_key_rd_dict = {}
new_key_rd_dict[key] = [
rd_toolshed,
rd_name,
rd_owner,
repository_metadata.changeset_revision,
rd_prior_installation_required,
rd_only_if_compiling_contained_td,
]
# We have the updated changeset revision.
updated_key_rd_dicts.append(new_key_rd_dict)
else:
repository_components_tuple = container_util.get_components_from_key(key)
components_list = tool_shed.util.repository_util.extract_components_from_tuple(
repository_components_tuple
)
(
toolshed,
repository_name,
repository_owner,
repository_changeset_revision,
) = components_list[0:4]
# For backward compatibility to the 12/20/12 Galaxy release.
if len(components_list) in (4, 5):
rd_only_if_compiling_contained_td = "False"
log.debug(
"The revision %s defined for repository %s owned by %s is invalid, so repository "
"dependencies defined for repository %s will be ignored.",
rd_changeset_revision,
rd_name,
rd_owner,
repository_name,
)
else:
repository_components_tuple = container_util.get_components_from_key(key)
components_list = tool_shed.util.repository_util.extract_components_from_tuple(
repository_components_tuple
)
toolshed, repository_name, repository_owner, repository_changeset_revision = components_list[0:4]
log.debug(
"The revision %s defined for repository %s owned by %s is invalid, "
"so repository dependencies defined for repository %s will be ignored.",
rd_changeset_revision,
rd_name,
rd_owner,
repository_name,
)
return updated_key_rd_dicts
[docs] def handle_circular_repository_dependency(self, repository_key, repository_dependency):
all_repository_dependencies_root_key = self.all_repository_dependencies["root_key"]
repository_dependency_as_key = self.get_repository_dependency_as_key(repository_dependency)
self.update_circular_repository_dependencies(
repository_key, repository_dependency, self.all_repository_dependencies[repository_dependency_as_key]
)
if all_repository_dependencies_root_key != repository_dependency_as_key:
self.all_repository_dependencies[repository_key] = [repository_dependency]
[docs] def handle_current_repository_dependency(self, current_repository_key):
current_repository_key_rd_dicts = []
for rd in self.all_repository_dependencies[current_repository_key]:
rd_copy = [str(item) for item in rd]
new_key_rd_dict = {}
new_key_rd_dict[current_repository_key] = rd_copy
current_repository_key_rd_dicts.append(new_key_rd_dict)
if current_repository_key_rd_dicts:
self.handle_key_rd_dicts_for_repository(current_repository_key, current_repository_key_rd_dicts)
return self.get_repository_dependencies_for_changeset_revision()
[docs] def handle_key_rd_dicts_for_repository(self, current_repository_key, repository_key_rd_dicts):
key_rd_dict = repository_key_rd_dicts.pop(0)
repository_dependency = key_rd_dict[current_repository_key]
(
toolshed,
name,
owner,
changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
) = common_util.parse_repository_dependency_tuple(repository_dependency)
if suc.tool_shed_is_this_tool_shed(toolshed, trans=self.trans):
required_repository = tool_shed.util.repository_util.get_repository_by_name_and_owner(self.app, name, owner)
self.repository = required_repository
repository_id = self.app.security.encode_id(required_repository.id)
required_repository_metadata = metadata_util.get_repository_metadata_by_repository_id_changeset_revision(
self.app, repository_id, changeset_revision
)
self.repository_metadata = required_repository_metadata
if required_repository_metadata:
# The required_repository_metadata changeset_revision is installable.
required_metadata = required_repository_metadata.metadata
if required_metadata:
for current_repository_key_rd_dict in repository_key_rd_dicts:
if not self.in_key_rd_dicts(current_repository_key_rd_dict, self.key_rd_dicts_to_be_processed):
# Add the current repository_dependency into self.key_rd_dicts_to_be_processed.
self.key_rd_dicts_to_be_processed.append(current_repository_key_rd_dict)
if not self.in_key_rd_dicts(key_rd_dict, self.handled_key_rd_dicts):
# Add the current repository_dependency into self.handled_key_rd_dicts.
self.handled_key_rd_dicts.append(key_rd_dict)
if self.in_key_rd_dicts(key_rd_dict, self.key_rd_dicts_to_be_processed):
# Remove the current repository from self.key_rd_dicts_to_be_processed.
self.key_rd_dicts_to_be_processed = self.remove_from_key_rd_dicts(
key_rd_dict, self.key_rd_dicts_to_be_processed
)
else:
# The repository is in a different tool shed, so build an url and send a request.
error_message = "Repository dependencies are currently supported only within the same Tool Shed. "
error_message += "Ignoring repository dependency definition for tool shed "
error_message += f"{toolshed}, name {name}, owner {owner}, changeset revision {changeset_revision}"
log.debug(error_message)
[docs] def handle_next_repository_dependency(self):
next_repository_key_rd_dict = self.key_rd_dicts_to_be_processed.pop(0)
next_repository_key_rd_dicts = [next_repository_key_rd_dict]
next_repository_key = next(iter(next_repository_key_rd_dict))
self.handle_key_rd_dicts_for_repository(next_repository_key, next_repository_key_rd_dicts)
return self.get_repository_dependencies_for_changeset_revision()
[docs] def in_all_repository_dependencies(self, repository_key, repository_dependency):
"""
Return True if { repository_key : repository_dependency } is in self.all_repository_dependencies.
"""
for key, val in self.all_repository_dependencies.items():
if key != repository_key:
continue
if repository_dependency in val:
return True
return False
[docs] def in_circular_repository_dependencies(self, repository_key_rd_dict):
"""
Return True if any combination of a circular dependency tuple is the key : value pair defined
in the received repository_key_rd_dict. This means that each circular dependency tuple is converted
into the key : value pair for comparison.
"""
for tup in self.circular_repository_dependencies:
rd_0, rd_1 = tup
rd_0_as_key = self.get_repository_dependency_as_key(rd_0)
rd_1_as_key = self.get_repository_dependency_as_key(rd_1)
if rd_0_as_key in repository_key_rd_dict and repository_key_rd_dict[rd_0_as_key] == rd_1:
return True
if rd_1_as_key in repository_key_rd_dict and repository_key_rd_dict[rd_1_as_key] == rd_0:
return True
return False
[docs] def in_key_rd_dicts(self, key_rd_dict, key_rd_dicts):
"""Return True if key_rd_dict is contained in the list of key_rd_dicts."""
k = next(iter(key_rd_dict))
v = key_rd_dict[k]
for key_rd_dict in key_rd_dicts:
for key, val in key_rd_dict.items():
if key == k and val == v:
return True
return False
[docs] def initialize_all_repository_dependencies(self, current_repository_key, repository_dependencies_dict):
"""Initialize the self.all_repository_dependencies dictionary."""
# It's safe to assume that current_repository_key in this case will have a value.
self.all_repository_dependencies["root_key"] = current_repository_key
self.all_repository_dependencies[current_repository_key] = []
# Store the value of the 'description' key only once, the first time through this recursive method.
description = repository_dependencies_dict.get("description", None)
self.all_repository_dependencies["description"] = description
[docs] def is_circular_repository_dependency(self, repository_key, repository_dependency):
"""
Return True if the received repository_dependency is a key in self.all_repository_dependencies
whose list of repository dependencies includes the received repository_key.
"""
repository_dependency_as_key = self.get_repository_dependency_as_key(repository_dependency)
repository_key_as_repository_dependency = repository_key.split(container_util.STRSEP)
for key, val in self.all_repository_dependencies.items():
if key != repository_dependency_as_key:
continue
if repository_key_as_repository_dependency in val:
return True
return False
[docs] def populate_repository_dependency_objects_for_processing(
self, current_repository_key, repository_dependencies_dict
):
"""
The process that discovers all repository dependencies for a specified repository's changeset
revision uses this method to populate the following items for the current processing loop:
filtered_current_repository_key_rd_dicts, self.key_rd_dicts_to_be_processed,
self.handled_key_rd_dicts, self.all_repository_dependencies. Each processing loop may discover
more repository dependencies, so this method is repeatedly called until all repository
dependencies have been discovered.
"""
current_repository_key_rd_dicts = []
filtered_current_repository_key_rd_dicts = []
for rd_tup in repository_dependencies_dict["repository_dependencies"]:
new_key_rd_dict = {}
new_key_rd_dict[current_repository_key] = rd_tup
current_repository_key_rd_dicts.append(new_key_rd_dict)
if current_repository_key_rd_dicts and current_repository_key:
# Remove all repository dependencies that point to a revision within its own repository.
current_repository_key_rd_dicts = self.remove_repository_dependency_reference_to_self(
current_repository_key_rd_dicts
)
current_repository_key_rd_dicts = self.get_updated_changeset_revisions_for_repository_dependencies(
current_repository_key_rd_dicts
)
for key_rd_dict in current_repository_key_rd_dicts:
# Filter out repository dependencies that are required only if compiling the dependent
# repository's tool dependency.
# TODO: this temporary work-around should be removed when the underlying framework
# support for handling only_if_compiling_contained_td-flagged repositories is completed.
key_rd_dict = self.filter_only_if_compiling_contained_td(key_rd_dict)
if key_rd_dict:
is_circular = False
in_handled_key_rd_dicts = self.in_key_rd_dicts(key_rd_dict, self.handled_key_rd_dicts)
in_key_rd_dicts_to_be_processed = self.in_key_rd_dicts(key_rd_dict, self.key_rd_dicts_to_be_processed)
if not in_handled_key_rd_dicts and not in_key_rd_dicts_to_be_processed:
filtered_current_repository_key_rd_dicts.append(key_rd_dict)
repository_dependency = key_rd_dict[current_repository_key]
if current_repository_key in self.all_repository_dependencies:
# Add all repository dependencies for the current repository into its entry
# in self.all_repository_dependencies.
all_repository_dependencies_val = self.all_repository_dependencies[current_repository_key]
if repository_dependency not in all_repository_dependencies_val:
all_repository_dependencies_val.append(repository_dependency)
self.all_repository_dependencies[current_repository_key] = all_repository_dependencies_val
elif not self.in_all_repository_dependencies(current_repository_key, repository_dependency):
# Handle circular repository dependencies.
if self.is_circular_repository_dependency(current_repository_key, repository_dependency):
is_circular = True
self.handle_circular_repository_dependency(current_repository_key, repository_dependency)
else:
self.all_repository_dependencies[current_repository_key] = [repository_dependency]
if not is_circular and self.can_add_to_key_rd_dicts(key_rd_dict, self.key_rd_dicts_to_be_processed):
new_key_rd_dict = {}
new_key_rd_dict[current_repository_key] = repository_dependency
self.key_rd_dicts_to_be_processed.append(new_key_rd_dict)
return filtered_current_repository_key_rd_dicts
[docs] def prune_invalid_repository_dependencies(self, repository_dependencies):
"""
Eliminate all invalid entries in the received repository_dependencies dictionary. An entry
is invalid if the value_list of the key/value pair is empty. This occurs when an invalid
combination of tool shed, name , owner, changeset_revision is used and a repository_metadata
record is not found.
"""
valid_repository_dependencies = {}
description = repository_dependencies.get("description", None)
root_key = repository_dependencies.get("root_key", None)
if root_key is None:
return valid_repository_dependencies
for key, value in repository_dependencies.items():
if key in ["description", "root_key"]:
continue
if value:
valid_repository_dependencies[key] = value
if valid_repository_dependencies:
valid_repository_dependencies["description"] = description
valid_repository_dependencies["root_key"] = root_key
return valid_repository_dependencies
[docs] def remove_from_key_rd_dicts(self, key_rd_dict, key_rd_dicts):
"""Eliminate the key_rd_dict from the list of key_rd_dicts if it is contained in the list."""
k = next(iter(key_rd_dict))
v = key_rd_dict[k]
clean_key_rd_dicts = []
for krd_dict in key_rd_dicts:
key = next(iter(krd_dict))
val = krd_dict[key]
if key == k and val == v:
continue
clean_key_rd_dicts.append(krd_dict)
return clean_key_rd_dicts
[docs] def remove_repository_dependency_reference_to_self(self, key_rd_dicts):
"""Remove all repository dependencies that point to a revision within its own repository."""
clean_key_rd_dicts = []
key = next(iter(key_rd_dicts[0]))
repository_tup = key.split(container_util.STRSEP)
(
rd_toolshed,
rd_name,
rd_owner,
rd_changeset_revision,
rd_prior_installation_required,
rd_only_if_compiling_contained_td,
) = common_util.parse_repository_dependency_tuple(repository_tup)
cleaned_rd_toolshed = common_util.remove_protocol_from_tool_shed_url(rd_toolshed)
for key_rd_dict in key_rd_dicts:
k = next(iter(key_rd_dict))
repository_dependency = key_rd_dict[k]
(
toolshed,
name,
owner,
changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
) = common_util.parse_repository_dependency_tuple(repository_dependency)
cleaned_toolshed = common_util.remove_protocol_from_tool_shed_url(toolshed)
if cleaned_rd_toolshed == cleaned_toolshed and rd_name == name and rd_owner == owner:
debug_msg = f"Removing repository dependency for repository {name} owned by {owner} "
debug_msg += "since it refers to a revision within itself."
log.debug(debug_msg)
else:
new_key_rd_dict = {}
new_key_rd_dict[key] = repository_dependency
clean_key_rd_dicts.append(new_key_rd_dict)
return clean_key_rd_dicts
[docs] def update_circular_repository_dependencies(self, repository_key, repository_dependency, repository_dependencies):
repository_key_as_repository_dependency = repository_key.split(container_util.STRSEP)
if repository_key_as_repository_dependency in repository_dependencies:
found = False
for tup in self.circular_repository_dependencies:
if repository_dependency in tup and repository_key_as_repository_dependency in tup:
# The circular dependency has already been included.
found = True
if not found:
new_circular_tup = [repository_dependency, repository_key_as_repository_dependency]
self.circular_repository_dependencies.append(new_circular_tup)