Source code for tool_shed.managers.repositories
"""
Manager and Serializer for TS repositories.
"""
import json
import logging
from collections import namedtuple
from time import strftime
from typing import (
Any,
Callable,
cast,
Dict,
List,
Optional,
Union,
)
from pydantic import BaseModel
from sqlalchemy import (
false,
select,
)
from sqlalchemy.orm import scoped_session
from galaxy import web
from galaxy.exceptions import (
ConfigDoesNotAllowException,
InsufficientPermissionsException,
InternalServerError,
MalformedContents,
ObjectNotFound,
RequestParameterInvalidException,
)
from galaxy.tool_shed.util import dependency_display
from galaxy.util import listify
from galaxy.util.tool_shed.encoding_util import tool_shed_encode
from tool_shed.context import (
ProvidesRepositoriesContext,
ProvidesUserContext,
)
from tool_shed.metadata import repository_metadata_manager
from tool_shed.repository_types import util as rt_util
from tool_shed.structured_app import ToolShedApp
from tool_shed.util import hg_util
from tool_shed.util.metadata_util import (
get_all_dependencies,
get_current_repository_metadata_for_changeset_revision,
get_metadata_revisions,
get_next_downloadable_changeset_revision,
get_repository_metadata_by_changeset_revision,
)
from tool_shed.util.readme_util import build_readme_files_dict
from tool_shed.util.repository_content_util import upload_tar
from tool_shed.util.repository_util import (
create_repository as low_level_create_repository,
get_repo_info_dict,
get_repositories_by_category,
get_repository_by_name_and_owner,
get_repository_in_tool_shed,
validate_repository_name,
)
from tool_shed.util.shed_util_common import (
count_repositories_in_category,
get_category,
)
from tool_shed.util.tool_util import generate_message_for_invalid_tools
from tool_shed.webapp.model import (
Repository,
RepositoryMetadata,
)
from tool_shed.webapp.search.repo_search import RepoSearch
from tool_shed_client.schema import (
CreateRepositoryRequest,
DetailedRepository,
ExtraRepoInfo,
LegacyInstallInfoTuple,
Repository as SchemaRepository,
RepositoryMetadataInstallInfoDict,
ResetMetadataOnRepositoryResponse,
)
from .categories import get_value_mapper as category_value_mapper
log = logging.getLogger(__name__)
def search(trans: ProvidesUserContext, q: str, page: int = 1, page_size: int = 10):
"""
Perform the search over TS repositories.
Note that the search runs over the Whoosh index, which has to be
pre-created manually with scripts/tool_shed/build_ts_whoosh_index.sh.
The TS config option toolshed_search_on also has to be True, and
whoosh_index_dir has to be specified.
"""
app = trans.app
conf = app.config
if not conf.toolshed_search_on:
raise ConfigDoesNotAllowException("Searching the TS through the API is turned off for this instance.")
if not conf.whoosh_index_dir:
raise ConfigDoesNotAllowException(
"There is no directory for the search index specified. Please contact the administrator."
)
search_term = q.strip()
if len(search_term) < 1:
raise RequestParameterInvalidException("The search term has to be at least one character long.")
repo_search = RepoSearch()
Boosts = namedtuple(
"Boosts",
[
"repo_name_boost",
"repo_description_boost",
"repo_long_description_boost",
"repo_homepage_url_boost",
"repo_remote_repository_url_boost",
"categories_boost",
"repo_owner_username_boost",
],
)
boosts = Boosts(
float(conf.get("repo_name_boost", 0.9)),
float(conf.get("repo_description_boost", 0.6)),
float(conf.get("repo_long_description_boost", 0.5)),
float(conf.get("repo_homepage_url_boost", 0.3)),
float(conf.get("repo_remote_repository_url_boost", 0.2)),
float(conf.get("categories_boost", 0.5)),
float(conf.get("repo_owner_username_boost", 0.3)),
)
results = repo_search.search(trans, search_term, page, page_size, boosts)
results["hostname"] = web.url_for("/", qualified=True)
return results
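# Usage sketch (illustrative): how a caller holding a user-scoped ``trans`` might drive the
# search; the query string below is an arbitrary placeholder.
def _example_search(trans: ProvidesUserContext) -> Dict[str, Any]:
    # Assumes toolshed_search_on is enabled and whoosh_index_dir points at a pre-built index,
    # as described in the docstring of search() above.
    return search(trans, q="alignment", page=1, page_size=10)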
class UpdatesRequest(BaseModel):
name: Optional[str] = None
owner: Optional[str] = None
changeset_revision: str
hexlify: bool = True
def check_updates(app: ToolShedApp, request: UpdatesRequest) -> Union[str, Dict[str, Any]]:
name = request.name
owner = request.owner
changeset_revision = request.changeset_revision
hexlify_this = request.hexlify
repository = get_repository_by_name_and_owner(
app, name, owner, eagerload_columns=[Repository.downloadable_revisions]
)
if repository and repository.downloadable_revisions:
repository_metadata = get_repository_metadata_by_changeset_revision(
app, app.security.encode_id(repository.id), changeset_revision
)
tool_shed_status_dict = {}
# Handle repository deprecation.
tool_shed_status_dict["repository_deprecated"] = str(repository.deprecated)
tip_revision = repository.downloadable_revisions[0]
# Handle latest installable revision.
if changeset_revision == tip_revision:
tool_shed_status_dict["latest_installable_revision"] = "True"
else:
next_installable_revision = get_next_downloadable_changeset_revision(app, repository, changeset_revision)
if repository_metadata is None:
if next_installable_revision and next_installable_revision != changeset_revision:
tool_shed_status_dict["latest_installable_revision"] = "True"
else:
tool_shed_status_dict["latest_installable_revision"] = "False"
else:
if next_installable_revision and next_installable_revision != changeset_revision:
tool_shed_status_dict["latest_installable_revision"] = "False"
else:
tool_shed_status_dict["latest_installable_revision"] = "True"
# Handle revision updates.
if changeset_revision == tip_revision:
tool_shed_status_dict["revision_update"] = "False"
else:
if repository_metadata is None:
tool_shed_status_dict["revision_update"] = "True"
else:
tool_shed_status_dict["revision_update"] = "False"
# Handle revision upgrades.
metadata_revisions = [revision[1] for revision in get_metadata_revisions(app, repository)]
num_metadata_revisions = len(metadata_revisions)
for index, metadata_revision in enumerate(metadata_revisions):
if index == num_metadata_revisions:
tool_shed_status_dict["revision_upgrade"] = "False"
break
if metadata_revision == changeset_revision:
if num_metadata_revisions - index > 1:
tool_shed_status_dict["revision_upgrade"] = "True"
else:
tool_shed_status_dict["revision_upgrade"] = "False"
break
return tool_shed_encode(tool_shed_status_dict) if hexlify_this else json.dumps(tool_shed_status_dict)
return tool_shed_encode({}) if hexlify_this else json.dumps({})
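# Usage sketch (illustrative): build an UpdatesRequest and ask whether a revision is still
# current; the repository name, owner, and changeset hash below are placeholders.
def _example_check_updates(app: ToolShedApp) -> Union[str, Dict[str, Any]]:
    request = UpdatesRequest(
        name="example_repo",
        owner="example_owner",
        changeset_revision="0123456789ab",
        hexlify=False,  # return a plain JSON string instead of a tool-shed-encoded one
    )
    # With hexlify=False the result is a JSON string carrying flags such as
    # "latest_installable_revision" and "revision_update" as string values.
    return check_updates(app, request)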
def guid_to_repository(app: ToolShedApp, tool_id: str) -> "Repository":
# tool_id = remove_protocol_and_user_from_clone_url(tool_id)
shed, _, owner, name, rest = tool_id.split("/", 4)  # GUID layout: <shed>/repos/<owner>/<name>/<rest>
return _get_repository_by_name_and_owner(app.model.context, name, owner, app.model.User)
def index_tool_ids(app: ToolShedApp, tool_ids: List[str]) -> Dict[str, Any]:
repository_found = []
all_metadata = {}
for tool_id in tool_ids:
repository = guid_to_repository(app, tool_id)
owner = repository.user.username
name = repository.name
assert name
repository = _get_repository_by_name_and_owner(app.model.context, name, owner, app.model.User)
if not repository:
log.warning(f"Repository {owner}/{name} does not exist, skipping")
continue
for changeset, changehash in repository.installable_revisions(app):
metadata = get_current_repository_metadata_for_changeset_revision(app, repository, changehash)
tools: Optional[List[Dict[str, Any]]] = metadata.metadata.get("tools")
if not tools:
log.warning(f"Repository {owner}/{name}/{changehash} does not contain valid tools, skipping")
continue
for tool_metadata in tools:
if tool_metadata["guid"] in tool_ids:
repository_found.append(f"{int(changeset)}:{changehash}")
metadata = get_current_repository_metadata_for_changeset_revision(app, repository, changehash)
if metadata is None:
continue
metadata_dict = metadata.to_dict(
value_mapper={"id": app.security.encode_id, "repository_id": app.security.encode_id}
)
metadata_dict["repository"] = repository.to_dict(value_mapper={"id": app.security.encode_id})
if metadata.has_repository_dependencies:
metadata_dict["repository_dependencies"] = get_all_dependencies(
app, metadata, processed_dependency_links=[]
)
else:
metadata_dict["repository_dependencies"] = []
if metadata.includes_tool_dependencies:
metadata_dict["tool_dependencies"] = repository.get_tool_dependencies(app, changehash)
else:
metadata_dict["tool_dependencies"] = {}
if metadata.includes_tools:
metadata_dict["tools"] = metadata.metadata["tools"]
all_metadata[f"{int(changeset)}:{changehash}"] = metadata_dict
if repository_found:
all_metadata["current_changeset"] = repository_found[0]
# all_metadata[ 'found_changesets' ] = repository_found
return all_metadata
else:
return {}
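# Illustrative sketch: the mapping returned above is keyed by "<changeset>:<changehash>"
# (plus a "current_changeset" entry when a requested tool id was found); the GUID below is
# a placeholder.
def _example_index_tool_ids(app: ToolShedApp) -> None:
    guid = "example.toolshed.org/repos/example_owner/example_repo/example_tool/1.0.0"
    all_metadata = index_tool_ids(app, [guid])
    for key, metadata_dict in all_metadata.items():
        if key == "current_changeset":
            continue
        log.debug("revision %s includes tools: %s", key, bool(metadata_dict.get("tools")))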
def index_repositories(app: ToolShedApp, name: Optional[str], owner: Optional[str], deleted: bool):
return list(
_get_repositories_by_name_and_owner_and_deleted(app.model.context, name, owner, deleted, app.model.User)
)
def can_manage_repo(trans: ProvidesUserContext, repository: Repository) -> bool:
security_agent = trans.app.security_agent
return trans.user_is_admin or security_agent.user_can_administer_repository(trans.user, repository)
def can_update_repo(trans: ProvidesUserContext, repository: Repository) -> bool:
app = trans.app
security_agent = app.security_agent
return can_manage_repo(trans, repository) or security_agent.can_push(app, trans.user, repository)
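# Illustrative guard (sketch): mirrors ensure_can_manage further below, but for update
# permissions rather than management permissions.
def _example_require_update_permission(trans: ProvidesUserContext, repository: Repository) -> None:
    if not can_update_repo(trans, repository):
        raise InsufficientPermissionsException("You do not have permission to update this repository.")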
def get_repository_metadata_for_management(
trans: ProvidesUserContext, encoded_repository_id: str, changeset_revision: str
) -> RepositoryMetadata:
repository = get_repository_in_tool_shed(trans.app, encoded_repository_id)
ensure_can_manage(trans, repository, "Cannot manage target repository")
revisions = [r for r in repository.metadata_revisions if r.changeset_revision == changeset_revision]
if len(revisions) != 1:
raise ObjectNotFound()
repository_metadata = revisions[0]
return repository_metadata
def get_install_info(trans: ProvidesRepositoriesContext, name, owner, changeset_revision) -> LegacyInstallInfoTuple:
app = trans.app
value_mapper = get_value_mapper(app)
# Example URL:
# http://<xyz>/api/repositories/get_repository_revision_install_info?name=<n>&owner=<o>&changeset_revision=<cr>
if name and owner and changeset_revision:
# Get the repository information.
repository = get_repository_by_name_and_owner(
app, name, owner, eagerload_columns=[Repository.downloadable_revisions]
)
if repository is None:
log.debug(f"Cannot locate repository {name} owned by {owner}")
return {}, {}, {}
encoded_repository_id = app.security.encode_id(repository.id)
repository_dict: dict = repository.to_dict(view="element", value_mapper=value_mapper)
repository_dict["url"] = web.url_for(controller="repositories", action="show", id=encoded_repository_id)
# Get the repository_metadata information.
repository_metadata = get_repository_metadata_by_changeset_revision(
app, encoded_repository_id, changeset_revision
)
if repository_metadata is None:
# The changeset_revision column in the repository_metadata table has been updated with a new
# value, so find the changeset_revision to which we need to update.
new_changeset_revision = get_next_downloadable_changeset_revision(app, repository, changeset_revision)
repository_metadata = get_repository_metadata_by_changeset_revision(
app, encoded_repository_id, new_changeset_revision
)
changeset_revision = new_changeset_revision
if repository_metadata is not None:
encoded_repository_metadata_id = app.security.encode_id(repository_metadata.id)
repository_metadata_dict: RepositoryMetadataInstallInfoDict = cast(
RepositoryMetadataInstallInfoDict,
repository_metadata.to_dict(view="collection", value_mapper=value_mapper),
)
repository_metadata_dict["url"] = web.url_for(
controller="repository_revisions", action="show", id=encoded_repository_metadata_id
)
if "tools" in repository_metadata.metadata:
repository_metadata_dict["valid_tools"] = repository_metadata.metadata["tools"]
# Get the repo_info_dict for installing the repository.
repo_info_dict: ExtraRepoInfo
(
repo_info_dict,
includes_tools,
includes_tool_dependencies,
includes_tools_for_display_in_tool_panel,
has_repository_dependencies,
has_repository_dependencies_only_if_compiling_contained_td,
) = get_repo_info_dict(trans, encoded_repository_id, changeset_revision)
return repository_dict, repository_metadata_dict, repo_info_dict
else:
log.debug(
"Unable to locate repository_metadata record for repository id %s and changeset_revision %s",
repository.id,
changeset_revision,
)
return repository_dict, {}, {}
else:
debug_msg = "Error in the Tool Shed repositories API in get_repository_revision_install_info: "
debug_msg += f"Invalid name {name} or owner {owner} or changeset_revision {changeset_revision} received."
log.debug(debug_msg)
return {}, {}, {}
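# Usage sketch (illustrative): the legacy install-info tuple bundles the repository dict, the
# repository_metadata dict, and the repo_info dict consumed by installing Galaxy instances;
# the name, owner, and revision below are placeholders.
def _example_get_install_info(trans: ProvidesRepositoriesContext) -> LegacyInstallInfoTuple:
    return get_install_info(trans, name="example_repo", owner="example_owner", changeset_revision="0123456789ab")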
def get_value_mapper(app: ToolShedApp) -> Dict[str, Callable]:
value_mapper = {
"id": app.security.encode_id,
"repository_id": app.security.encode_id,
"user_id": app.security.encode_id,
}
return value_mapper
def get_ordered_installable_revisions(
app: ToolShedApp, name: Optional[str], owner: Optional[str], tsr_id: Optional[str]
) -> List[str]:
eagerload_columns = [Repository.downloadable_revisions]
if None not in [name, owner]:
# Get the repository information.
repository = get_repository_by_name_and_owner(app, name, owner, eagerload_columns=eagerload_columns)
if repository is None:
raise ObjectNotFound(f"No repository named {name} found with owner {owner}")
elif tsr_id is not None:
repository = get_repository_in_tool_shed(app, tsr_id, eagerload_columns=eagerload_columns)
else:
error_message = "Error in the Tool Shed repositories API in get_ordered_installable_revisions: "
error_message += "invalid parameters received."
log.debug(error_message)
return []
return [revision[1] for revision in repository.installable_revisions(app, sort_revisions=True)]
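# Usage sketch (illustrative): either name/owner or an encoded repository id (tsr_id) selects
# the repository; the returned list contains the installable changeset hashes.
def _example_ordered_revisions(app: ToolShedApp) -> List[str]:
    return get_ordered_installable_revisions(app, name="example_repo", owner="example_owner", tsr_id=None)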
def get_repository_metadata_dict(app: ToolShedApp, id: str, recursive: bool, downloadable_only: bool) -> Dict[str, Any]:
all_metadata = {}
repository = get_repository_in_tool_shed(app, id, eagerload_columns=[Repository.downloadable_revisions])
for changeset, changehash in get_metadata_revisions(
app, repository, sort_revisions=True, downloadable=downloadable_only
):
metadata = get_current_repository_metadata_for_changeset_revision(app, repository, changehash)
if metadata is None:
continue
metadata_dict = metadata.to_dict(
value_mapper={"id": app.security.encode_id, "repository_id": app.security.encode_id}
)
metadata_dict["repository"] = repository.to_dict(
value_mapper={"id": app.security.encode_id, "user_id": app.security.encode_id}
)
if metadata.has_repository_dependencies and recursive:
metadata_dict["repository_dependencies"] = get_all_dependencies(
app, metadata, processed_dependency_links=[]
)
else:
metadata_dict["repository_dependencies"] = []
if metadata.includes_tools:
metadata_dict["tools"] = metadata.metadata["tools"]
metadata_dict["invalid_tools"] = metadata.metadata.get("invalid_tools", [])
all_metadata[f"{int(changeset)}:{changehash}"] = metadata_dict
return all_metadata
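# Illustrative sketch: each entry built above maps a "<changeset>:<changehash>" key to the
# serialized metadata, including the owning repository and, when requested, its dependencies.
def _example_log_repository_metadata(app: ToolShedApp, encoded_repository_id: str) -> None:
    all_metadata = get_repository_metadata_dict(app, encoded_repository_id, recursive=True, downloadable_only=True)
    for revision_key, metadata_dict in all_metadata.items():
        log.debug(
            "%s: %d repository dependencies, %d tools",
            revision_key,
            len(metadata_dict["repository_dependencies"]),
            len(metadata_dict.get("tools", [])),
        )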
def readmes(app: ToolShedApp, repository: Repository, changeset_revision: str) -> dict:
encoded_repository_id = app.security.encode_id(repository.id)
repository_metadata = get_repository_metadata_by_changeset_revision(app, encoded_repository_id, changeset_revision)
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
return build_readme_files_dict(app, repository, changeset_revision, repository_metadata.metadata)
return {}
def reset_metadata_on_repository(trans: ProvidesUserContext, repository_id) -> ResetMetadataOnRepositoryResponse:
app: ToolShedApp = trans.app
def handle_repository(trans, start_time, repository):
results = dict(start_time=start_time, repository_status=[])
try:
rmm = repository_metadata_manager.RepositoryMetadataManager(
trans,
repository=repository,
resetting_all_metadata_on_repository=True,
updating_installed_repository=False,
persist=False,
)
rmm.reset_all_metadata_on_repository_in_tool_shed()
rmm_invalid_file_tups = rmm.get_invalid_file_tups()
if rmm_invalid_file_tups:
message = generate_message_for_invalid_tools(
app, rmm_invalid_file_tups, repository, None, as_html=False
)
results["status"] = "warning"
else:
message = (
f"Successfully reset metadata on repository {repository.name} owned by {repository.user.username}"
)
results["status"] = "ok"
except Exception as e:
message = (
f"Error resetting metadata on repository {repository.name} owned by {repository.user.username}: {e}"
)
results["status"] = "error"
status = f"{repository.name} : {message}"
results["repository_status"].append(status)
return results
if repository_id is not None:
repository = get_repository_in_tool_shed(app, repository_id)
start_time = strftime("%Y-%m-%d %H:%M:%S")
log.debug(f"{start_time}...resetting metadata on repository {repository.name}")
results = handle_repository(trans, start_time, repository)
stop_time = strftime("%Y-%m-%d %H:%M:%S")
results["stop_time"] = stop_time
return ResetMetadataOnRepositoryResponse(**results)
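# Usage sketch (illustrative): reset metadata for a single repository by its encoded id and
# log the per-repository status messages collected above.
def _example_reset_metadata(trans: ProvidesUserContext, encoded_repository_id: str) -> None:
    response = reset_metadata_on_repository(trans, encoded_repository_id)
    for status in response.repository_status:
        log.debug(status)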
def create_repository(trans: ProvidesUserContext, request: CreateRepositoryRequest) -> Repository:
app: ToolShedApp = trans.app
user = trans.user
assert user
category_ids = listify(request.category_ids)
name = request.name
if invalid_message := validate_repository_name(app, name, user):
raise RequestParameterInvalidException(invalid_message)
repo, _ = low_level_create_repository(
app=app,
name=name,
type=request.type_,
description=request.synopsis,
long_description=request.description,
user=user,
category_ids=category_ids,
remote_repository_url=request.remote_repository_url,
homepage_url=request.homepage_url,
)
return repo
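# Usage sketch (illustrative): create a repository from an already-validated request payload
# and serialize it for an API response (to_model is defined further below).
def _example_create_and_serialize(trans: ProvidesUserContext, request: CreateRepositoryRequest) -> SchemaRepository:
    repository = create_repository(trans, request)
    return to_model(trans.app, repository)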
def to_element_dict(app, repository: Repository, include_categories: bool = False) -> Dict[str, Any]:
value_mapper = get_value_mapper(app)
repository_dict = repository.to_dict(view="element", value_mapper=value_mapper)
if include_categories:
repository_dict["category_ids"] = [app.security.encode_id(x.category.id) for x in repository.categories]
return repository_dict
def repositories_by_category(
app: ToolShedApp,
category_id: str,
page: Optional[int] = None,
sort_key: str = "name",
sort_order: str = "asc",
installable: bool = True,
):
category = get_category(app, category_id)
category_dict: Dict[str, Any]
if category is None:
category_dict = dict(message=f"Unable to locate category record for id {category_id}.", status="error")
return category_dict
category_dict = category.to_dict(view="element", value_mapper=category_value_mapper(app))
category_dict["repository_count"] = count_repositories_in_category(app, category_id)
repositories = get_repositories_by_category(
app, category.id, installable=installable, sort_order=sort_order, sort_key=sort_key, page=page
)
category_dict["repositories"] = repositories
return category_dict
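# Illustrative sketch: the category dict assembled above carries the serialized category, a
# repository count, and one page of repositories; the encoded id below is a placeholder.
def _example_category_listing(app: ToolShedApp, encoded_category_id: str) -> None:
    category_dict = repositories_by_category(app, encoded_category_id, page=1)
    log.debug("category %s: %s repositories", category_dict.get("name"), category_dict.get("repository_count"))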
def to_model(app, repository: Repository) -> SchemaRepository:
return SchemaRepository(**to_element_dict(app, repository))
def to_detailed_model(app, repository: Repository) -> DetailedRepository:
return DetailedRepository(**to_element_dict(app, repository))
def upload_tar_and_set_metadata(
trans: ProvidesRepositoriesContext,
host: str,
repository: Repository,
uploaded_file,
commit_message: str,
dry_run: bool = False,
):
app = trans.app
user = trans.user
assert user
assert user.username
repo_dir = repository.repo_path(app)
tip = repository.tip()
tar_response = upload_tar(
trans,
user.username,
repository,
uploaded_file,
commit_message,
)
(
ok,
message,
_,
content_alert_str,
_,
_,
) = tar_response
if ok:
# Update the repository files for browsing.
hg_util.update_repository(repo_dir)
# Get the new repository tip.
if tip == repository.tip():
raise MalformedContents("No changes to repository.")
else:
rmm = repository_metadata_manager.RepositoryMetadataManager(trans, repository=repository)
_, error_message = rmm.set_repository_metadata_due_to_new_tip(host, content_alert_str=content_alert_str)
if error_message:
raise InternalServerError(error_message)
dd = dependency_display.DependencyDisplayer(app)
if str(repository.type) not in [
rt_util.REPOSITORY_SUITE_DEFINITION,
rt_util.TOOL_DEPENDENCY_DEFINITION,
]:
# Provide a warning message if a tool_dependencies.xml file is provided, but tool dependencies
# weren't loaded due to a requirement tag mismatch or some other problem. Tool dependency
# definitions can define orphan tool dependencies (no relationship to any tools contained in the
# repository), so warning messages are important because orphans are always valid. The repository
# owner must be warned in case they did not intend to define an orphan dependency, but simply
# provided incorrect information (tool shed, name, owner, changeset_revision) for the definition.
if repository.metadata_revisions:
# A repository's metadata revisions are ordered descending by update_time, so the zeroth revision
# will be the tip just after an upload.
metadata_dict = repository.metadata_revisions[0].metadata
else:
metadata_dict = {}
orphan_message = dd.generate_message_for_orphan_tool_dependencies(repository, metadata_dict)
if orphan_message:
message += orphan_message
else:
raise InternalServerError(message)
return message
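# Usage sketch (illustrative): push a new tarball and let the manager refresh metadata for the
# new tip; uploaded_file is an open file-like object, the commit message is a placeholder, and
# deriving host via web.url_for is an assumption that mirrors the hostname handling in search().
def _example_upload(trans: ProvidesRepositoriesContext, repository: Repository, uploaded_file) -> str:
    return upload_tar_and_set_metadata(
        trans,
        host=web.url_for("/", qualified=True),
        repository=repository,
        uploaded_file=uploaded_file,
        commit_message="Update example tool to version 1.0.1",
    )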
def ensure_can_manage(trans: ProvidesUserContext, repository: Repository, error_message: Optional[str] = None) -> None:
if not can_manage_repo(trans, repository):
error_message = error_message or "You do not have permission to update this repository."
raise InsufficientPermissionsException(error_message)
def _get_repository_by_name_and_owner(session: scoped_session, name: str, owner: str, user_model):
stmt = (
select(Repository)
.where(Repository.deprecated == false())
.where(Repository.deleted == false())
.where(Repository.name == name)
.where(user_model.username == owner)
.where(Repository.user_id == user_model.id)
.limit(1)
)
return session.scalars(stmt).first()
def _get_repositories_by_name_and_owner_and_deleted(
session: scoped_session, name: Optional[str], owner: Optional[str], deleted: bool, user_model
):
stmt = select(Repository).where(Repository.deprecated == false()).where(Repository.deleted == deleted)
if owner is not None:
stmt = stmt.where(user_model.username == owner)
stmt = stmt.where(Repository.user_id == user_model.id)
if name is not None:
stmt = stmt.where(Repository.name == name)
stmt = stmt.order_by(Repository.name)
return session.scalars(stmt)