"""
Manager and Serializer for TS repositories.
"""
import json
import logging
from collections import namedtuple
from time import strftime
from typing import (
Any,
Callable,
cast,
Dict,
List,
Optional,
Union,
)
from pydantic import BaseModel
from sqlalchemy import (
false,
select,
)
from sqlalchemy.orm import scoped_session
from galaxy import web
from galaxy.exceptions import (
ConfigDoesNotAllowException,
InsufficientPermissionsException,
InternalServerError,
MalformedContents,
ObjectNotFound,
RequestParameterInvalidException,
)
from galaxy.tool_shed.util import dependency_display
from galaxy.util import listify
from galaxy.util.tool_shed.encoding_util import tool_shed_encode
from tool_shed.context import (
ProvidesRepositoriesContext,
ProvidesUserContext,
)
from tool_shed.metadata import repository_metadata_manager
from tool_shed.repository_types import util as rt_util
from tool_shed.structured_app import ToolShedApp
from tool_shed.util import hg_util
from tool_shed.util.metadata_util import (
get_all_dependencies,
get_current_repository_metadata_for_changeset_revision,
get_metadata_revisions,
get_next_downloadable_changeset_revision,
get_repository_metadata_by_changeset_revision,
)
from tool_shed.util.readme_util import build_readme_files_dict
from tool_shed.util.repository_content_util import upload_tar
from tool_shed.util.repository_util import (
create_repository as low_level_create_repository,
get_repo_info_dict,
get_repositories_by_category,
get_repository_by_name_and_owner,
get_repository_in_tool_shed,
validate_repository_name,
)
from tool_shed.util.shed_util_common import (
count_repositories_in_category,
get_category,
)
from tool_shed.util.tool_util import generate_message_for_invalid_tools
from tool_shed.webapp.model import (
Repository,
RepositoryMetadata,
)
from tool_shed.webapp.search.repo_search import RepoSearch
from tool_shed_client.schema import (
CreateRepositoryRequest,
DetailedRepository,
ExtraRepoInfo,
LegacyInstallInfoTuple,
Repository as SchemaRepository,
RepositoryMetadataInstallInfoDict,
ResetMetadataOnRepositoryResponse,
)
from .categories import get_value_mapper as category_value_mapper
log = logging.getLogger(__name__)
[docs]def search(trans: ProvidesUserContext, q: str, page: int = 1, page_size: int = 10):
"""
Perform the search over TS repositories.
Note that search works over the Whoosh index which you have
to pre-create with scripts/tool_shed/build_ts_whoosh_index.sh manually.
Also TS config option toolshed_search_on has to be True and
whoosh_index_dir has to be specified.
"""
app = trans.app
conf = app.config
if not conf.toolshed_search_on:
raise ConfigDoesNotAllowException("Searching the TS through the API is turned off for this instance.")
if not conf.whoosh_index_dir:
raise ConfigDoesNotAllowException(
"There is no directory for the search index specified. Please contact the administrator."
)
search_term = q.strip()
if len(search_term) < 1:
raise RequestParameterInvalidException("The search term has to be at least one character long.")
repo_search = RepoSearch()
Boosts = namedtuple(
"Boosts",
[
"repo_name_boost",
"repo_description_boost",
"repo_long_description_boost",
"repo_homepage_url_boost",
"repo_remote_repository_url_boost",
"categories_boost",
"repo_owner_username_boost",
],
)
boosts = Boosts(
float(conf.get("repo_name_boost", 0.9)),
float(conf.get("repo_description_boost", 0.6)),
float(conf.get("repo_long_description_boost", 0.5)),
float(conf.get("repo_homepage_url_boost", 0.3)),
float(conf.get("repo_remote_repository_url_boost", 0.2)),
float(conf.get("categories_boost", 0.5)),
float(conf.get("repo_owner_username_boost", 0.3)),
)
results = repo_search.search(trans, search_term, page, page_size, boosts)
results["hostname"] = web.url_for("/", qualified=True)
return results
[docs]class UpdatesRequest(BaseModel):
name: Optional[str] = None
owner: Optional[str] = None
changeset_revision: str
hexlify: bool = True
[docs]def check_updates(app: ToolShedApp, request: UpdatesRequest) -> Union[str, Dict[str, Any]]:
name = request.name
owner = request.owner
changeset_revision = request.changeset_revision
hexlify_this = request.hexlify
repository = get_repository_by_name_and_owner(
app, name, owner, eagerload_columns=[Repository.downloadable_revisions]
)
if repository and repository.downloadable_revisions:
repository_metadata = get_repository_metadata_by_changeset_revision(
app, app.security.encode_id(repository.id), changeset_revision
)
tool_shed_status_dict = {}
# Handle repository deprecation.
tool_shed_status_dict["repository_deprecated"] = str(repository.deprecated)
tip_revision = repository.downloadable_revisions[0]
# Handle latest installable revision.
if changeset_revision == tip_revision:
tool_shed_status_dict["latest_installable_revision"] = "True"
else:
next_installable_revision = get_next_downloadable_changeset_revision(app, repository, changeset_revision)
if repository_metadata is None:
if next_installable_revision and next_installable_revision != changeset_revision:
tool_shed_status_dict["latest_installable_revision"] = "True"
else:
tool_shed_status_dict["latest_installable_revision"] = "False"
else:
if next_installable_revision and next_installable_revision != changeset_revision:
tool_shed_status_dict["latest_installable_revision"] = "False"
else:
tool_shed_status_dict["latest_installable_revision"] = "True"
# Handle revision updates.
if changeset_revision == tip_revision:
tool_shed_status_dict["revision_update"] = "False"
else:
if repository_metadata is None:
tool_shed_status_dict["revision_update"] = "True"
else:
tool_shed_status_dict["revision_update"] = "False"
# Handle revision upgrades.
metadata_revisions = [revision[1] for revision in get_metadata_revisions(app, repository)]
num_metadata_revisions = len(metadata_revisions)
for index, metadata_revision in enumerate(metadata_revisions):
if index == num_metadata_revisions:
tool_shed_status_dict["revision_upgrade"] = "False"
break
if metadata_revision == changeset_revision:
if num_metadata_revisions - index > 1:
tool_shed_status_dict["revision_upgrade"] = "True"
else:
tool_shed_status_dict["revision_upgrade"] = "False"
break
return tool_shed_encode(tool_shed_status_dict) if hexlify_this else json.dumps(tool_shed_status_dict)
return tool_shed_encode({}) if hexlify_this else json.dumps({})
[docs]def guid_to_repository(app: ToolShedApp, tool_id: str) -> "Repository":
# tool_id = remove_protocol_and_user_from_clone_url(tool_id)
shed, _, owner, name, rest = tool_id.split("/", 5)
return _get_repository_by_name_and_owner(app.model.context, name, owner, app.model.User)
[docs]def index_repositories(app: ToolShedApp, name: Optional[str], owner: Optional[str], deleted: bool):
return list(
_get_repositories_by_name_and_owner_and_deleted(app.model.context, name, owner, deleted, app.model.User)
)
[docs]def can_manage_repo(trans: ProvidesUserContext, repository: Repository) -> bool:
security_agent = trans.app.security_agent
return trans.user_is_admin or security_agent.user_can_administer_repository(trans.user, repository)
[docs]def can_update_repo(trans: ProvidesUserContext, repository: Repository) -> bool:
app = trans.app
security_agent = app.security_agent
return can_manage_repo(trans, repository) or security_agent.can_push(app, trans.user, repository)
[docs]def get_install_info(trans: ProvidesRepositoriesContext, name, owner, changeset_revision) -> LegacyInstallInfoTuple:
app = trans.app
value_mapper = get_value_mapper(app)
# Example URL:
# http://<xyz>/api/repositories/get_repository_revision_install_info?name=<n>&owner=<o>&changeset_revision=<cr>
if name and owner and changeset_revision:
# Get the repository information.
repository = get_repository_by_name_and_owner(
app, name, owner, eagerload_columns=[Repository.downloadable_revisions]
)
if repository is None:
log.debug(f"Cannot locate repository {name} owned by {owner}")
return {}, {}, {}
encoded_repository_id = app.security.encode_id(repository.id)
repository_dict: dict = repository.to_dict(view="element", value_mapper=value_mapper)
repository_dict["url"] = web.url_for(controller="repositories", action="show", id=encoded_repository_id)
# Get the repository_metadata information.
repository_metadata = get_repository_metadata_by_changeset_revision(
app, encoded_repository_id, changeset_revision
)
if repository_metadata is None:
# The changeset_revision column in the repository_metadata table has been updated with a new
# value value, so find the changeset_revision to which we need to update.
new_changeset_revision = get_next_downloadable_changeset_revision(app, repository, changeset_revision)
repository_metadata = get_repository_metadata_by_changeset_revision(
app, encoded_repository_id, new_changeset_revision
)
changeset_revision = new_changeset_revision
if repository_metadata is not None:
encoded_repository_metadata_id = app.security.encode_id(repository_metadata.id)
repository_metadata_dict: RepositoryMetadataInstallInfoDict = cast(
RepositoryMetadataInstallInfoDict,
repository_metadata.to_dict(view="collection", value_mapper=value_mapper),
)
repository_metadata_dict["url"] = web.url_for(
controller="repository_revisions", action="show", id=encoded_repository_metadata_id
)
if "tools" in repository_metadata.metadata:
repository_metadata_dict["valid_tools"] = repository_metadata.metadata["tools"]
# Get the repo_info_dict for installing the repository.
repo_info_dict: ExtraRepoInfo
(
repo_info_dict,
includes_tools,
includes_tool_dependencies,
includes_tools_for_display_in_tool_panel,
has_repository_dependencies,
has_repository_dependencies_only_if_compiling_contained_td,
) = get_repo_info_dict(trans, encoded_repository_id, changeset_revision)
return repository_dict, repository_metadata_dict, repo_info_dict
else:
log.debug(
"Unable to locate repository_metadata record for repository id %s and changeset_revision %s",
repository.id,
changeset_revision,
)
return repository_dict, {}, {}
else:
debug_msg = "Error in the Tool Shed repositories API in get_repository_revision_install_info: "
debug_msg += f"Invalid name {name} or owner {owner} or changeset_revision {changeset_revision} received."
log.debug(debug_msg)
return {}, {}, {}
[docs]def get_value_mapper(app: ToolShedApp) -> Dict[str, Callable]:
value_mapper = {
"id": app.security.encode_id,
"repository_id": app.security.encode_id,
"user_id": app.security.encode_id,
}
return value_mapper
[docs]def get_ordered_installable_revisions(
app: ToolShedApp, name: Optional[str], owner: Optional[str], tsr_id: Optional[str]
) -> List[str]:
eagerload_columns = [Repository.downloadable_revisions]
if None not in [name, owner]:
# Get the repository information.
repository = get_repository_by_name_and_owner(app, name, owner, eagerload_columns=eagerload_columns)
if repository is None:
raise ObjectNotFound(f"No repository named {name} found with owner {owner}")
elif tsr_id is not None:
repository = get_repository_in_tool_shed(app, tsr_id, eagerload_columns=eagerload_columns)
else:
error_message = "Error in the Tool Shed repositories API in get_ordered_installable_revisions: "
error_message += "invalid parameters received."
log.debug(error_message)
return []
return [revision[1] for revision in repository.installable_revisions(app, sort_revisions=True)]
[docs]def readmes(app: ToolShedApp, repository: Repository, changeset_revision: str) -> dict:
encoded_repository_id = app.security.encode_id(repository.id)
repository_metadata = get_repository_metadata_by_changeset_revision(app, encoded_repository_id, changeset_revision)
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
return build_readme_files_dict(app, repository, changeset_revision, repository_metadata.metadata)
return {}
[docs]def create_repository(trans: ProvidesUserContext, request: CreateRepositoryRequest) -> Repository:
app: ToolShedApp = trans.app
user = trans.user
assert user
category_ids = listify(request.category_ids)
name = request.name
if invalid_message := validate_repository_name(app, name, user):
raise RequestParameterInvalidException(invalid_message)
repo, _ = low_level_create_repository(
app=app,
name=name,
type=request.type_,
description=request.synopsis,
long_description=request.description,
user=user,
category_ids=category_ids,
remote_repository_url=request.remote_repository_url,
homepage_url=request.homepage_url,
)
return repo
[docs]def to_element_dict(app, repository: Repository, include_categories: bool = False) -> Dict[str, Any]:
value_mapper = get_value_mapper(app)
repository_dict = repository.to_dict(view="element", value_mapper=value_mapper)
if include_categories:
repository_dict["category_ids"] = [app.security.encode_id(x.category.id) for x in repository.categories]
return repository_dict
[docs]def repositories_by_category(
app: ToolShedApp,
category_id: str,
page: Optional[int] = None,
sort_key: str = "name",
sort_order: str = "asc",
installable: bool = True,
):
category = get_category(app, category_id)
category_dict: Dict[str, Any]
if category is None:
category_dict = dict(message=f"Unable to locate category record for id {str(id)}.", status="error")
return category_dict
category_dict = category.to_dict(view="element", value_mapper=category_value_mapper(app))
category_dict["repository_count"] = count_repositories_in_category(app, category_id)
repositories = get_repositories_by_category(
app, category.id, installable=installable, sort_order=sort_order, sort_key=sort_key, page=page
)
category_dict["repositories"] = repositories
return category_dict
[docs]def to_model(app, repository: Repository) -> SchemaRepository:
return SchemaRepository(**to_element_dict(app, repository))
[docs]def to_detailed_model(app, repository: Repository) -> DetailedRepository:
return DetailedRepository(**to_element_dict(app, repository))
def _get_repository_by_name_and_owner(session: scoped_session, name: str, owner: str, user_model):
stmt = (
select(Repository)
.where(Repository.deprecated == false())
.where(Repository.deleted == false())
.where(Repository.name == name)
.where(user_model.username == owner)
.where(Repository.user_id == user_model.id)
.limit(1)
)
return session.scalars(stmt).first()
def _get_repositories_by_name_and_owner_and_deleted(
session: scoped_session, name: Optional[str], owner: Optional[str], deleted: bool, user_model
):
stmt = select(Repository).where(Repository.deprecated == false()).where(Repository.deleted == deleted)
if owner is not None:
stmt = stmt.where(user_model.username == owner)
stmt = stmt.where(Repository.user_id == user_model.id)
if name is not None:
stmt = stmt.where(Repository.name == name)
stmt = stmt.order_by(Repository.name)
return session.scalars(stmt)