import logging
import os
import re
import shutil
from typing import (
Any,
cast,
Dict,
List,
Optional,
Tuple,
Union,
)
from urllib.error import HTTPError
from markupsafe import escape
from sqlalchemy import (
and_,
false,
or_,
)
from sqlalchemy.orm import joinedload
from galaxy import util
from galaxy.model.base import (
check_database_connection,
transaction,
)
from galaxy.model.scoped_session import install_model_scoped_session
from galaxy.model.tool_shed_install import ToolShedRepository
from galaxy.tool_shed.util import basic_util
from galaxy.util.tool_shed import (
common_util,
encoding_util,
)
from galaxy.util.tool_shed.tool_shed_registry import Registry
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Matches a string consisting of one or more lowercase ASCII letters, digits or underscores.
VALID_REPOSITORYNAME_RE = re.compile(r"^[a-z0-9\_]+$")
def check_for_updates(
    tool_shed_registry: Registry,
    install_model_context: install_model_scoped_session,
    repository_id: Optional[int] = None,
) -> Tuple[str, str]:
    """
    Refresh the stored tool shed status of installed repositories.

    When ``repository_id`` is None, every ``ToolShedRepository`` that is not flagged as
    deleted is checked; otherwise only the repository with that id is checked.

    Returns a ``(message, status)`` tuple. ``message`` contains HTML (repository names are
    escaped and wrapped in ``<b>`` tags). ``status`` is ``"error"`` only when a single
    requested repository could not be checked, and ``"ok"`` in every other case.
    """
    if repository_id is not None:
        # Single-repository mode.
        requested = install_model_context.get(ToolShedRepository, repository_id)
        assert requested
        ok, updated = _check_or_update_tool_shed_status_for_installed_repository(
            tool_shed_registry, install_model_context, requested
        )
        label = f"<b>{escape(str(requested.name))}</b>"
        if not ok:
            return f"Unable to retrieve status from the tool shed for repository {label}.", "error"
        if updated:
            return f"The tool shed status for repository {label} has been updated.", "ok"
        return f"The status has not changed in the tool shed for repository {label}.", "ok"

    # Bulk mode: walk every repository that is not deleted and tally the outcomes.
    checked_count = 0
    refreshed_count = 0
    unreachable_names = []
    active_repositories = install_model_context.query(ToolShedRepository).filter(
        ToolShedRepository.deleted == false()
    )
    for repository in active_repositories:
        ok, updated = _check_or_update_tool_shed_status_for_installed_repository(
            tool_shed_registry, install_model_context, repository
        )
        if ok:
            checked_count += 1
        else:
            unreachable_names.append(f"<b>{escape(str(repository.name))}</b>")
        if updated:
            refreshed_count += 1
    message_parts = [
        "Checked the status in the tool shed for %d repositories. " % checked_count,
        "Updated the tool shed status for %d repositories. " % refreshed_count,
    ]
    if unreachable_names:
        message_parts.append("Unable to retrieve status from the tool shed for the following repositories:\n")
        message_parts.append(", ".join(unreachable_names))
    return "".join(message_parts), "ok"
def _check_or_update_tool_shed_status_for_installed_repository(
    tool_shed_registry: Registry, install_model_context: install_model_scoped_session, repository: ToolShedRepository
) -> Tuple[bool, bool]:
    """
    Fetch the tool shed status for ``repository`` and persist it when it differs from the stored one.

    Returns ``(ok, updated)``: ``ok`` is False when no (truthy) status could be obtained,
    and ``updated`` is True only when a differing status was stored and committed.
    """
    fetched_status = get_tool_shed_status_for(tool_shed_registry, repository)
    if not fetched_status:
        return False, False
    if fetched_status != repository.tool_shed_status:
        # The status changed, so store the new value and commit it.
        repository.tool_shed_status = fetched_status
        install_model_context.add(repository)
        with transaction(install_model_context):
            install_model_context.commit()
        return True, True
    return True, False
def get_absolute_path_to_file_in_repository(repo_files_dir, file_name):
    """
    Return the absolute path of the first file under ``repo_files_dir`` whose name equals the
    stripped ``file_name``, or None when no such file is found.

    Directories whose path contains ``.hg`` are skipped.
    """
    wanted_name = basic_util.strip_path(file_name)
    for directory, _subdirs, entries in os.walk(repo_files_dir):
        if ".hg" in directory:
            continue
        if wanted_name in entries:
            return os.path.abspath(os.path.join(directory, wanted_name))
    return None
def get_installed_repository(
    app,
    tool_shed=None,
    name=None,
    owner=None,
    changeset_revision=None,
    installed_changeset_revision=None,
    repository_id=None,
    from_cache=False,
):
    """
    Look up an installed tool shed repository record.

    The record is identified either by ``repository_id`` or by the combination of tool shed,
    name and owner; the current and/or originally installed changeset revision narrow the
    match when they are given. When ``from_cache`` is true and the app exposes a truthy
    ``tool_shed_repository_cache``, the lookup is delegated to that cache instead of the database.

    Returns the first matching record of the database query, or whatever the cache returns.
    """
    check_database_connection(app.install_model.context)
    # The port, if one exists, is stored in the database, so only the protocol is removed.
    tool_shed = common_util.remove_protocol_from_tool_shed_url(tool_shed)
    if from_cache:
        repository_cache = getattr(app, "tool_shed_repository_cache", None)
        if repository_cache:
            return repository_cache.get_installed_repository(
                tool_shed=tool_shed,
                name=name,
                owner=owner,
                installed_changeset_revision=installed_changeset_revision,
                changeset_revision=changeset_revision,
                repository_id=repository_id,
            )
    repository_model = app.install_model.ToolShedRepository
    query = app.install_model.context.query(repository_model)
    if repository_id:
        criteria = [repository_model.id == repository_id]
    else:
        criteria = [
            repository_model.tool_shed == tool_shed,
            repository_model.name == name,
            repository_model.owner == owner,
        ]
    if changeset_revision is not None:
        criteria.append(repository_model.changeset_revision == changeset_revision)
    if installed_changeset_revision is not None:
        criteria.append(repository_model.installed_changeset_revision == installed_changeset_revision)
    return query.filter(and_(*criteria)).first()
def get_prior_import_or_install_required_dict(app, tsr_ids, repo_info_dicts):
    """
    Map each received tsr_id to the tsr_ids that must be imported or installed before it.

    Used in the Tool Shed when exporting a repository and its dependencies, and in Galaxy
    when a repository and its dependencies are being installed. Every key is one of the
    received ``tsr_ids``; every value is a list of ids, each also contained in ``tsr_ids``,
    whose repositories must be handled prior to the repository associated with the key.
    Ids for which nothing is found keep an empty list.
    """
    # Start with an empty (distinct) list for every received id.
    required_by_id = {tsr_id: [] for tsr_id in tsr_ids}
    # Inspect the repository dependencies of each repository about to be installed.
    for repo_info_dict in repo_info_dicts:
        repository, repository_dependencies = get_repository_and_repository_dependencies_from_repo_info_dict(
            app, repo_info_dict
        )
        if not repository:
            continue
        encoded_id = app.security.encode_id(repository.id)
        if encoded_id not in tsr_ids:
            continue
        # This record belongs to one of the repositories being installed, so record which of
        # its repository dependencies require prior installation.
        required_by_id[encoded_id] = get_repository_ids_requiring_prior_import_or_install(
            app, tsr_ids, repository_dependencies
        )
    return required_by_id
# Tool dependencies mapping: each key maps either to a single dict or to a list of dicts.
ToolDependenciesDictT = Dict[str, Union[Dict[str, Any], List[Dict[str, Any]]]]
# Older six-element repo_info tuple, which has no repository_dependencies element.
OldRepositoryTupleT = Tuple[str, str, str, str, str, ToolDependenciesDictT]
# Current seven-element repo_info tuple; repository_dependencies sits before tool_dependencies.
RepositoryTupleT = Tuple[str, str, str, str, str, Optional[Any], ToolDependenciesDictT]
AnyRepositoryTupleT = Union[OldRepositoryTupleT, RepositoryTupleT]


def get_repo_info_tuple_contents(repo_info_tuple: AnyRepositoryTupleT) -> RepositoryTupleT:
    """
    Normalize a repo_info_tuple to its seven-element form.

    Take care in handling the repo_info_tuple as it evolves over time as new tool shed
    features are introduced. A six-element tuple is expanded by inserting None for
    ``repository_dependencies``; a seven-element tuple is returned element for element.

    Returns ``(description, repository_clone_url, changeset_revision, ctx_rev,
    repository_owner, repository_dependencies, tool_dependencies)``.

    Raises ``ValueError`` when the received sequence has neither six nor seven elements.
    """
    element_count = len(repo_info_tuple)
    if element_count == 6:
        (
            description,
            repository_clone_url,
            changeset_revision,
            ctx_rev,
            repository_owner,
            tool_dependencies,
        ) = cast(OldRepositoryTupleT, repo_info_tuple)
        repository_dependencies = None
    elif element_count == 7:
        (
            description,
            repository_clone_url,
            changeset_revision,
            ctx_rev,
            repository_owner,
            repository_dependencies,
            tool_dependencies,
        ) = cast(RepositoryTupleT, repo_info_tuple)
    else:
        # Without this branch the names below would be unbound and the return statement
        # would fail with an opaque UnboundLocalError.
        raise ValueError(f"repo_info_tuple must contain 6 or 7 elements, but it contains {element_count}")
    return (
        description,
        repository_clone_url,
        changeset_revision,
        ctx_rev,
        repository_owner,
        repository_dependencies,
        tool_dependencies,
    )
def get_repository_admin_role_name(repository_name, repository_owner):
    """Return the admin role name built as ``<repository_name>_<repository_owner>_admin``."""
    return "{}_{}_admin".format(repository_name, repository_owner)
def get_repository_and_repository_dependencies_from_repo_info_dict(app, repo_info_dict):
    """
    Return ``(repository, repository_dependencies)`` for the received repo_info_dict.

    The first key of ``repo_info_dict`` is taken as the repository name and its value as the
    repo_info_tuple. The repository is a tool_shed_repository record when ``app`` has an
    ``install_model`` attribute, and a repository record looked up by name and owner otherwise.
    """
    repository_name = list(repo_info_dict)[0]
    (
        _description,
        clone_url,
        changeset_revision,
        _ctx_rev,
        owner,
        repository_dependencies,
        _tool_dependencies,
    ) = get_repo_info_tuple_contents(repo_info_dict[repository_name])
    if not hasattr(app, "install_model"):
        # We're in the tool shed.
        return get_repository_by_name_and_owner(app, repository_name, owner), repository_dependencies
    # In a tool shed client (Galaxy, or something that installs repositories like Galaxy).
    tool_shed = get_tool_shed_from_clone_url(clone_url)
    repository = get_repository_for_dependency_relationship(
        app, tool_shed, repository_name, owner, changeset_revision
    )
    return repository, repository_dependencies
def get_repository_by_id(app, id):
    """
    Get a repository from the database via its encoded id.

    A tool shed client queries ``ToolShedRepository`` through the install model context;
    otherwise ``Repository`` is queried through the app's model session.
    """
    if is_tool_shed_client(app):
        session, repository_class = app.install_model.context, app.install_model.ToolShedRepository
    else:
        session, repository_class = app.model.session, app.model.Repository
    return session.query(repository_class).get(app.security.decode_id(id))
def get_repository_by_name_and_owner(app, name, owner, eagerload_columns=None):
    """
    Get a repository from the database via name and owner.

    In a tool shed client the installed repository's ``name`` and ``owner`` columns are
    matched. In the tool shed the repository is matched against the user whose username is
    ``owner``, and ``eagerload_columns`` (when given) are passed to ``joinedload``.
    Returns the first match of the query.
    """
    base_query = get_repository_query(app)
    if not is_tool_shed_client(app):
        # We're in the tool shed.
        repository_model = app.model.Repository
        user_model = app.model.User
        shed_query = base_query.filter(
            and_(
                repository_model.name == name,
                user_model.username == owner,
                repository_model.user_id == user_model.id,
            )
        )
        if eagerload_columns:
            shed_query = shed_query.options(joinedload(*eagerload_columns))
        return shed_query.first()
    installed_model = app.install_model.ToolShedRepository
    return base_query.filter(and_(installed_model.name == name, installed_model.owner == owner)).first()
def get_repository_by_name(app, name):
    """Return the first repository record whose name equals ``name``."""
    matching_name = get_repository_query(app).filter_by(name=name)
    return matching_name.first()
def get_repository_dependency_types(repository_dependencies):
    """
    Inspect the received repository_dependencies tuples and classify them.

    Returns ``(has_repository_dependencies, has_repository_dependencies_only_if_compiling_contained_td)``:

    - the first flag is True when at least one dependency is defined with
      ``only_if_compiling_contained_td`` as False;
    - the second flag is True when at least one dependency is defined with
      ``only_if_compiling_contained_td`` as True.

    The input is traversed once, so one-shot iterables are handled as well as lists.
    """
    has_repository_dependencies = False
    has_repository_dependencies_only_if_compiling_contained_td = False
    for rd_tup in repository_dependencies:
        # Only the last of the six parsed components matters here.
        (
            _tool_shed,
            _name,
            _owner,
            _changeset_revision,
            _prior_installation_required,
            only_if_compiling_contained_td,
        ) = common_util.parse_repository_dependency_tuple(rd_tup)
        if util.asbool(only_if_compiling_contained_td):
            has_repository_dependencies_only_if_compiling_contained_td = True
        else:
            has_repository_dependencies = True
        if has_repository_dependencies and has_repository_dependencies_only_if_compiling_contained_td:
            # Both kinds have been seen; nothing further can change the result.
            break
    return has_repository_dependencies, has_repository_dependencies_only_if_compiling_contained_td
def get_repository_for_dependency_relationship(app, tool_shed, name, owner, changeset_revision):
    """
    Return an installed tool_shed_repository database record that is defined by either the
    current changeset revision or the installed_changeset_revision.

    Lookups are attempted in this order, stopping at the first hit: by installed changeset
    revision, by current changeset revision, via ``repository_was_previously_installed``, and
    finally by the revision the tool shed reports for ``next_installable_changeset_revision``.

    Raises ``Exception`` when the tool shed (after protocol removal), name, owner or
    changeset_revision is None.
    """
    # This function is used only in Galaxy, not the Tool Shed. The port (if one exists) is stored in the database.
    tool_shed = common_util.remove_protocol_from_tool_shed_url(tool_shed)
    if any(value is None for value in (tool_shed, name, owner, changeset_revision)):
        raise Exception(
            "Unable to retrieve the repository record from the database because one or more of the following "
            f"required parameters is None: tool_shed: {tool_shed}, name: {name}, owner: {owner}, changeset_revision: {changeset_revision}"
        )
    lookup_kwargs = dict(app=app, tool_shed=tool_shed, name=name, owner=owner)

    repository = get_installed_repository(installed_changeset_revision=changeset_revision, **lookup_kwargs)
    if repository:
        return repository

    repository = get_installed_repository(changeset_revision=changeset_revision, **lookup_kwargs)
    if repository:
        return repository

    tool_shed_url = common_util.get_tool_shed_url_from_tool_shed_registry(app, tool_shed)
    clone_url = os.path.join(tool_shed_url, "repos", owner, name)
    seven_element_tuple = (None, clone_url, changeset_revision, None, owner, None, None)
    repository, _previous_revision = repository_was_previously_installed(
        app, tool_shed_url, name, seven_element_tuple
    )
    if repository:
        return repository

    # The received changeset_revision is no longer installable, so get the next changeset_revision
    # in the repository's changelog in the tool shed that is associated with repository_metadata.
    tool_shed_url = common_util.get_tool_shed_url_from_tool_shed_registry(app, tool_shed)
    next_revision = util.url_get(
        tool_shed_url,
        auth=app.tool_shed_registry.url_auth(tool_shed_url),
        pathspec=["repository", "next_installable_changeset_revision"],
        params=dict(name=name, owner=owner, changeset_revision=changeset_revision),
    )
    if next_revision:
        return get_installed_repository(changeset_revision=next_revision, **lookup_kwargs)
    return repository
def get_repository_ids_requiring_prior_import_or_install(app, tsr_ids, repository_dependencies):
    """
    Return the encoded ids of repositories that must be imported / installed first.

    Used in the Tool Shed when exporting a repository and its dependencies, and in Galaxy
    when a repository and its dependencies are being installed. Each dependency tuple in
    ``repository_dependencies`` is inspected (the ``description`` and ``root_key`` entries are
    skipped). A dependency contributes its encoded repository id only when its
    ``only_if_compiling_contained_td`` flag is false, its ``prior_installation_required`` flag
    is true, its repository record can be found, and the encoded id is contained in ``tsr_ids``.
    """
    prior_tsr_ids = []
    if not repository_dependencies:
        return prior_tsr_ids
    for key, rd_tups in repository_dependencies.items():
        if key in ["description", "root_key"]:
            continue
        for rd_tup in rd_tups:
            (
                tool_shed,
                name,
                owner,
                changeset_revision,
                prior_installation_required,
                only_if_compiling_contained_td,
            ) = common_util.parse_repository_dependency_tuple(rd_tup)
            # A dependency flagged only_if_compiling_contained_td is not required to be
            # installed prior to the dependent repository even if prior_installation_required
            # is True: its only meaningful content is its contained tool dependency, and the
            # tool dependency compilation framework will install the repository dependency
            # prior to compiling the dependent repository's tool dependency.
            if util.asbool(only_if_compiling_contained_td):
                continue
            if not util.asbool(prior_installation_required):
                continue
            if is_tool_shed_client(app):
                # The port, if one exists, is stored in the database.
                tool_shed = common_util.remove_protocol_from_tool_shed_url(tool_shed)
                repository = get_repository_for_dependency_relationship(
                    app, tool_shed, name, owner, changeset_revision
                )
            else:
                repository = get_repository_by_name_and_owner(app, name, owner)
            if not repository:
                continue
            encoded_repository_id = app.security.encode_id(repository.id)
            if encoded_repository_id in tsr_ids:
                prior_tsr_ids.append(encoded_repository_id)
    return prior_tsr_ids
def get_repository_owner(cleaned_repository_url):
    """
    Given a "cleaned" repository clone URL, return the owner of the repository.

    The owner is the first path segment that follows ``/repos/``; any leading slashes in
    front of it are ignored. Raises ``IndexError`` when the URL does not contain ``/repos/``.
    """
    path_after_repos = cleaned_repository_url.split("/repos/")[1]
    owner, _separator, _remainder = path_after_repos.lstrip("/").partition("/")
    return owner
def get_repository_owner_from_clone_url(repository_clone_url):
    """Return the repository owner for a clone URL, after removing its protocol and user parts."""
    return get_repository_owner(common_util.remove_protocol_and_user_from_clone_url(repository_clone_url))
def get_repository_query(app):
    """
    Return a query over the repository model that applies to ``app``.

    A tool shed client gets a ``ToolShedRepository`` query from the install model context;
    otherwise a ``Repository`` query from the app's model context is returned.
    """
    if is_tool_shed_client(app):
        return app.install_model.context.query(app.install_model.ToolShedRepository)
    return app.model.context.query(app.model.Repository)
def get_role_by_id(app, role_id):
    """Get a Role from the database by its encoded id."""
    role_query = app.model.session.query(app.model.Role)
    return role_query.get(app.security.decode_id(role_id))
def get_tool_shed_status_for(tool_shed_registry: Registry, repository: ToolShedRepository):
    """
    Ask the tool shed for the status of an installed repository.

    Returns the decoded status dict from ``status_for_installed_repository``. When that request
    fails with an ``HTTPError``, the older ``check_for_updates`` endpoint is tried and a dict
    with a single ``revision_update`` key is returned (``"false"`` if that request fails too).
    Any other error is logged and an empty dict is returned.
    """
    shed_url = tool_shed_registry.get_tool_shed_url(str(repository.tool_shed))
    assert shed_url
    query_params: Dict[str, Any] = {
        "name": repository.name,
        "owner": repository.owner,
        "changeset_revision": repository.changeset_revision,
    }
    try:
        encoded_status = util.url_get(
            shed_url,
            auth=tool_shed_registry.url_auth(shed_url),
            pathspec=["repository", "status_for_installed_repository"],
            params=query_params,
        )
        return encoding_util.tool_shed_decode(encoded_status)
    except HTTPError as http_error:
        # This should handle backward compatibility to the Galaxy 12/20/12 release, when updates
        # for an installed revision were only reported using a boolean value.
        log.debug(
            "Error attempting to get tool shed status for installed repository %s: %s\nAttempting older 'check_for_updates' method.\n",
            repository.name,
            http_error,
        )
        query_params["from_update_manager"] = True
        try:
            # The response will be 'true' or 'false', depending upon whether there is an update
            # available for the installed revision.
            update_available = util.url_get(
                shed_url,
                auth=tool_shed_registry.url_auth(shed_url),
                pathspec=["repository", "check_for_updates"],
                params=query_params,
            )
            return {"revision_update": update_available}
        except Exception:
            # The required tool shed may be unavailable, so default the revision_update value to 'false'.
            return {"revision_update": "false"}
    except Exception:
        log.exception("Error attempting to get tool shed status for installed repository %s", str(repository.name))
        return {}
def repository_was_previously_installed(app, tool_shed_url, repository_name, repo_info_tuple, from_tip=False):
    """
    Find out whether a repository is already installed into Galaxy.

    This covers, for example, the case where the repository was previously installed using an
    older changeset revision, was later updated in the tool shed, and the latest changeset
    revision of the same repository is now being installed instead of updating the one that
    was previously installed. The database is consulted rather than the disk, since the
    repository may currently be uninstalled.

    Returns ``(tool_shed_repository, changeset_revision)`` for the first installed record
    found - first for the changeset revision in ``repo_info_tuple``, then for each previous
    changeset revision reported by the tool shed - or ``(None, None)`` when nothing matches.
    """
    tool_shed_url = common_util.get_tool_shed_url_from_tool_shed_registry(app, tool_shed_url)
    (
        _description,
        repository_clone_url,
        changeset_revision,
        _ctx_rev,
        repository_owner,
        _repository_dependencies,
        _tool_dependencies,
    ) = get_repo_info_tuple_contents(repo_info_tuple)
    tool_shed = get_tool_shed_from_clone_url(repository_clone_url)

    def installed_at(revision):
        # Look up the installed record whose installed_changeset_revision equals ``revision``.
        return get_installed_repository(
            app,
            tool_shed=tool_shed,
            name=repository_name,
            owner=repository_owner,
            installed_changeset_revision=revision,
        )

    # See if the repository can be located using the received changeset_revision.
    located = installed_at(changeset_revision)
    if located:
        return located, changeset_revision

    # Get all previous changeset revisions from the tool shed for the repository back to, but
    # excluding, the previous valid changeset revision to see if one of them was installed.
    request_params = {
        "name": repository_name,
        "owner": repository_owner,
        "changeset_revision": changeset_revision,
        "from_tip": str(from_tip),
    }
    response_text = util.url_get(
        tool_shed_url,
        auth=app.tool_shed_registry.url_auth(tool_shed_url),
        pathspec=["repository", "previous_changeset_revisions"],
        params=request_params,
    )
    if response_text:
        for previous_changeset_revision in util.listify(response_text):
            located = installed_at(previous_changeset_revision)
            if located:
                return located, previous_changeset_revision
    return None, None
def set_repository_attributes(app, repository, status, error_message, deleted, uninstalled, remove_from_disk=False):
    """
    Update the state columns of an installed repository record and commit the change.

    When ``remove_from_disk`` is true and the repository reports an installation path, that
    directory is removed first; a failure to remove it is only logged. ``error_message``,
    ``status``, ``deleted`` and ``uninstalled`` are then stored on the record.
    """
    install_path = repository.repo_path(app) if remove_from_disk else None
    if install_path:
        clone_dir = os.path.abspath(install_path)
        try:
            shutil.rmtree(clone_dir)
        except Exception as e:
            # Best effort: the database record is still updated when the directory can't be removed.
            log.debug("Error removing repository installation directory %s: %s", clone_dir, util.unicodify(e))
        else:
            log.debug("Removed repository installation directory: %s", clone_dir)
    repository.error_message = error_message
    repository.status = status
    repository.deleted = deleted
    repository.uninstalled = uninstalled
    install_session = app.install_model.context
    install_session.add(repository)
    with transaction(install_session):
        install_session.commit()
# Names exported by this module for `from <module> import *`.
# NOTE(review): several names listed here (e.g. "create_or_update_tool_shed_repository",
# "get_tool_shed_from_clone_url", "get_tool_shed_status_for_installed_repository",
# "is_tool_shed_client") are not defined in the portion of the file visible here - confirm
# that they are defined or imported elsewhere in the module.
__all__ = (
"check_for_updates",
"create_or_update_tool_shed_repository",
"extract_components_from_tuple",
"generate_tool_shed_repository_install_dir",
"get_absolute_path_to_file_in_repository",
"get_ids_of_tool_shed_repositories_being_installed",
"get_installed_repository",
"get_installed_tool_shed_repository",
"get_prior_import_or_install_required_dict",
"get_repo_info_tuple_contents",
"get_repository_admin_role_name",
"get_repository_and_repository_dependencies_from_repo_info_dict",
"get_repository_by_id",
"get_repository_by_name",
"get_repository_by_name_and_owner",
"get_repository_dependency_types",
"get_repository_for_dependency_relationship",
"get_repository_ids_requiring_prior_import_or_install",
"get_repository_owner",
"get_repository_owner_from_clone_url",
"get_repository_query",
"get_role_by_id",
"get_tool_shed_from_clone_url",
"get_tool_shed_repository_by_id",
"get_tool_shed_status_for_installed_repository",
"is_tool_shed_client",
"repository_was_previously_installed",
"set_repository_attributes",
)