Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.tools.tool_version_manager
import logging
from sqlalchemy import and_
from tool_shed.util import hg_util
from tool_shed.util import metadata_util
from tool_shed.util import repository_util
log = logging.getLogger(__name__)
[docs]class ToolVersionManager(object):
[docs] def get_tool_version(self, tool_id):
context = self.app.install_model.context
return context.query(self.app.install_model.ToolVersion) \
.filter(self.app.install_model.ToolVersion.table.c.tool_id == tool_id) \
.first()
[docs] def get_tool_version_association(self, parent_tool_version, tool_version):
"""
Return a ToolVersionAssociation if one exists that associates the two
received tool_versions. This function is called only from Galaxy.
"""
context = self.app.install_model.context
return context.query(self.app.install_model.ToolVersionAssociation) \
.filter(and_(self.app.install_model.ToolVersionAssociation.table.c.parent_id == parent_tool_version.id,
self.app.install_model.ToolVersionAssociation.table.c.tool_id == tool_version.id)) \
.first()
[docs] def get_version_lineage_for_tool(self, repository_id, repository_metadata, guid):
"""
Return the tool version lineage chain in descendant order for the received
guid contained in the received repsitory_metadata.tool_versions. This function
is called only from the Tool Shed.
"""
repository = repository_util.get_repository_by_id(self.app, repository_id)
repo = hg_util.get_repo_for_repository(self.app, repository=repository, repo_path=None, create=False)
# Initialize the tool lineage
version_lineage = [guid]
# Get all ancestor guids of the received guid.
current_child_guid = guid
for changeset in hg_util.reversed_upper_bounded_changelog(repo, repository_metadata.changeset_revision):
ctx = repo.changectx(changeset)
rm = metadata_util.get_repository_metadata_by_changeset_revision(self.app, repository_id, str(ctx))
if rm:
parent_guid = rm.tool_versions.get(current_child_guid, None)
if parent_guid:
version_lineage.append(parent_guid)
current_child_guid = parent_guid
# Get all descendant guids of the received guid.
current_parent_guid = guid
for changeset in hg_util.reversed_lower_upper_bounded_changelog(repo,
repository_metadata.changeset_revision,
repository.tip(self.app)):
ctx = repo.changectx(changeset)
rm = metadata_util.get_repository_metadata_by_changeset_revision(self.app, repository_id, str(ctx))
if rm:
tool_versions = rm.tool_versions
for child_guid, parent_guid in tool_versions.items():
if parent_guid == current_parent_guid:
version_lineage.insert(0, child_guid)
current_parent_guid = child_guid
break
return version_lineage