Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.managers.repositories

"""
Manager and Serializer for TS repositories.
"""
import json
import logging
from collections import namedtuple
from time import strftime
from typing import (
    Any,
    Callable,
    cast,
    Dict,
    List,
    Optional,
    Union,
)

from pydantic import BaseModel
from sqlalchemy import (
    false,
    select,
)
from sqlalchemy.orm import scoped_session

from galaxy import web
from galaxy.exceptions import (
    ConfigDoesNotAllowException,
    InsufficientPermissionsException,
    InternalServerError,
    MalformedContents,
    ObjectNotFound,
    RequestParameterInvalidException,
)
from galaxy.tool_shed.util import dependency_display
from galaxy.util import listify
from galaxy.util.tool_shed.encoding_util import tool_shed_encode
from tool_shed.context import (
    ProvidesRepositoriesContext,
    ProvidesUserContext,
)
from tool_shed.metadata import repository_metadata_manager
from tool_shed.repository_types import util as rt_util
from tool_shed.structured_app import ToolShedApp
from tool_shed.util import hg_util
from tool_shed.util.metadata_util import (
    get_all_dependencies,
    get_current_repository_metadata_for_changeset_revision,
    get_metadata_revisions,
    get_next_downloadable_changeset_revision,
    get_repository_metadata_by_changeset_revision,
)
from tool_shed.util.readme_util import build_readme_files_dict
from tool_shed.util.repository_content_util import upload_tar
from tool_shed.util.repository_util import (
    create_repository as low_level_create_repository,
    get_repo_info_dict,
    get_repositories_by_category,
    get_repository_by_name_and_owner,
    get_repository_in_tool_shed,
    validate_repository_name,
)
from tool_shed.util.shed_util_common import (
    count_repositories_in_category,
    get_category,
)
from tool_shed.util.tool_util import generate_message_for_invalid_tools
from tool_shed.webapp.model import (
    Repository,
    RepositoryMetadata,
)
from tool_shed.webapp.search.repo_search import RepoSearch
from tool_shed_client.schema import (
    CreateRepositoryRequest,
    DetailedRepository,
    ExtraRepoInfo,
    LegacyInstallInfoTuple,
    Repository as SchemaRepository,
    RepositoryMetadataInstallInfoDict,
    ResetMetadataOnRepositoryResponse,
)
from .categories import get_value_mapper as category_value_mapper

log = logging.getLogger(__name__)





[docs]class UpdatesRequest(BaseModel): name: Optional[str] = None owner: Optional[str] = None changeset_revision: str hexlify: bool = True
[docs]def check_updates(app: ToolShedApp, request: UpdatesRequest) -> Union[str, Dict[str, Any]]: name = request.name owner = request.owner changeset_revision = request.changeset_revision hexlify_this = request.hexlify repository = get_repository_by_name_and_owner( app, name, owner, eagerload_columns=[Repository.downloadable_revisions] ) if repository and repository.downloadable_revisions: repository_metadata = get_repository_metadata_by_changeset_revision( app, app.security.encode_id(repository.id), changeset_revision ) tool_shed_status_dict = {} # Handle repository deprecation. tool_shed_status_dict["repository_deprecated"] = str(repository.deprecated) tip_revision = repository.downloadable_revisions[0] # Handle latest installable revision. if changeset_revision == tip_revision: tool_shed_status_dict["latest_installable_revision"] = "True" else: next_installable_revision = get_next_downloadable_changeset_revision(app, repository, changeset_revision) if repository_metadata is None: if next_installable_revision and next_installable_revision != changeset_revision: tool_shed_status_dict["latest_installable_revision"] = "True" else: tool_shed_status_dict["latest_installable_revision"] = "False" else: if next_installable_revision and next_installable_revision != changeset_revision: tool_shed_status_dict["latest_installable_revision"] = "False" else: tool_shed_status_dict["latest_installable_revision"] = "True" # Handle revision updates. if changeset_revision == tip_revision: tool_shed_status_dict["revision_update"] = "False" else: if repository_metadata is None: tool_shed_status_dict["revision_update"] = "True" else: tool_shed_status_dict["revision_update"] = "False" # Handle revision upgrades. metadata_revisions = [revision[1] for revision in get_metadata_revisions(app, repository)] num_metadata_revisions = len(metadata_revisions) for index, metadata_revision in enumerate(metadata_revisions): if index == num_metadata_revisions: tool_shed_status_dict["revision_upgrade"] = "False" break if metadata_revision == changeset_revision: if num_metadata_revisions - index > 1: tool_shed_status_dict["revision_upgrade"] = "True" else: tool_shed_status_dict["revision_upgrade"] = "False" break return tool_shed_encode(tool_shed_status_dict) if hexlify_this else json.dumps(tool_shed_status_dict) return tool_shed_encode({}) if hexlify_this else json.dumps({})
[docs]def guid_to_repository(app: ToolShedApp, tool_id: str) -> "Repository": # tool_id = remove_protocol_and_user_from_clone_url(tool_id) shed, _, owner, name, rest = tool_id.split("/", 5) return _get_repository_by_name_and_owner(app.model.context, name, owner, app.model.User)
[docs]def index_tool_ids(app: ToolShedApp, tool_ids: List[str]) -> Dict[str, Any]: repository_found = [] all_metadata = dict() for tool_id in tool_ids: repository = guid_to_repository(app, tool_id) owner = repository.user.username name = repository.name repository = _get_repository_by_name_and_owner(app.model.context.current, name, owner, app.model.User) if not repository: log.warning(f"Repository {owner}/{name} does not exist, skipping") continue for changeset, changehash in repository.installable_revisions(app): metadata = get_current_repository_metadata_for_changeset_revision(app, repository, changehash) tools: Optional[List[Dict[str, Any]]] = metadata.metadata.get("tools") if not tools: log.warning(f"Repository {owner}/{name}/{changehash} does not contain valid tools, skipping") continue for tool_metadata in tools: if tool_metadata["guid"] in tool_ids: repository_found.append("%d:%s" % (int(changeset), changehash)) metadata = get_current_repository_metadata_for_changeset_revision(app, repository, changehash) if metadata is None: continue metadata_dict = metadata.to_dict( value_mapper={"id": app.security.encode_id, "repository_id": app.security.encode_id} ) metadata_dict["repository"] = repository.to_dict(value_mapper={"id": app.security.encode_id}) if metadata.has_repository_dependencies: metadata_dict["repository_dependencies"] = get_all_dependencies( app, metadata, processed_dependency_links=[] ) else: metadata_dict["repository_dependencies"] = [] if metadata.includes_tool_dependencies: metadata_dict["tool_dependencies"] = repository.get_tool_dependencies(app, changehash) else: metadata_dict["tool_dependencies"] = {} if metadata.includes_tools: metadata_dict["tools"] = metadata.metadata["tools"] all_metadata[f"{int(changeset)}:{changehash}"] = metadata_dict if repository_found: all_metadata["current_changeset"] = repository_found[0] # all_metadata[ 'found_changesets' ] = repository_found return all_metadata else: return {}
[docs]def index_repositories(app: ToolShedApp, name: Optional[str], owner: Optional[str], deleted: bool): return list( _get_repositories_by_name_and_owner_and_deleted(app.model.context, name, owner, deleted, app.model.User) )
[docs]def can_manage_repo(trans: ProvidesUserContext, repository: Repository) -> bool: security_agent = trans.app.security_agent return trans.user_is_admin or security_agent.user_can_administer_repository(trans.user, repository)
[docs]def can_update_repo(trans: ProvidesUserContext, repository: Repository) -> bool: app = trans.app security_agent = app.security_agent return can_manage_repo(trans, repository) or security_agent.can_push(app, trans.user, repository)
[docs]def get_repository_metadata_for_management( trans: ProvidesUserContext, encoded_repository_id: str, changeset_revision: str ) -> RepositoryMetadata: repository = get_repository_in_tool_shed(trans.app, encoded_repository_id) if not can_manage_repo(trans, repository): raise InsufficientPermissionsException("Cannot manage target repository") revisions = [r for r in repository.metadata_revisions if r.changeset_revision == changeset_revision] if len(revisions) != 1: raise ObjectNotFound() repository_metadata = revisions[0] return repository_metadata
[docs]def get_install_info(trans: ProvidesRepositoriesContext, name, owner, changeset_revision) -> LegacyInstallInfoTuple: app = trans.app value_mapper = get_value_mapper(app) # Example URL: # http://<xyz>/api/repositories/get_repository_revision_install_info?name=<n>&owner=<o>&changeset_revision=<cr> if name and owner and changeset_revision: # Get the repository information. repository = get_repository_by_name_and_owner( app, name, owner, eagerload_columns=[Repository.downloadable_revisions] ) if repository is None: log.debug(f"Cannot locate repository {name} owned by {owner}") return {}, {}, {} encoded_repository_id = app.security.encode_id(repository.id) repository_dict: dict = repository.to_dict(view="element", value_mapper=value_mapper) repository_dict["url"] = web.url_for(controller="repositories", action="show", id=encoded_repository_id) # Get the repository_metadata information. repository_metadata = get_repository_metadata_by_changeset_revision( app, encoded_repository_id, changeset_revision ) if repository_metadata is None: # The changeset_revision column in the repository_metadata table has been updated with a new # value value, so find the changeset_revision to which we need to update. new_changeset_revision = get_next_downloadable_changeset_revision(app, repository, changeset_revision) repository_metadata = get_repository_metadata_by_changeset_revision( app, encoded_repository_id, new_changeset_revision ) changeset_revision = new_changeset_revision if repository_metadata is not None: encoded_repository_metadata_id = app.security.encode_id(repository_metadata.id) repository_metadata_dict: RepositoryMetadataInstallInfoDict = cast( RepositoryMetadataInstallInfoDict, repository_metadata.to_dict(view="collection", value_mapper=value_mapper), ) repository_metadata_dict["url"] = web.url_for( controller="repository_revisions", action="show", id=encoded_repository_metadata_id ) if "tools" in repository_metadata.metadata: repository_metadata_dict["valid_tools"] = repository_metadata.metadata["tools"] # Get the repo_info_dict for installing the repository. repo_info_dict: ExtraRepoInfo ( repo_info_dict, includes_tools, includes_tool_dependencies, includes_tools_for_display_in_tool_panel, has_repository_dependencies, has_repository_dependencies_only_if_compiling_contained_td, ) = get_repo_info_dict(trans, encoded_repository_id, changeset_revision) return repository_dict, repository_metadata_dict, repo_info_dict else: log.debug( "Unable to locate repository_metadata record for repository id %s and changeset_revision %s", repository.id, changeset_revision, ) return repository_dict, {}, {} else: debug_msg = "Error in the Tool Shed repositories API in get_repository_revision_install_info: " debug_msg += f"Invalid name {name} or owner {owner} or changeset_revision {changeset_revision} received." log.debug(debug_msg) return {}, {}, {}
[docs]def get_value_mapper(app: ToolShedApp) -> Dict[str, Callable]: value_mapper = { "id": app.security.encode_id, "repository_id": app.security.encode_id, "user_id": app.security.encode_id, } return value_mapper
[docs]def get_ordered_installable_revisions( app: ToolShedApp, name: Optional[str], owner: Optional[str], tsr_id: Optional[str] ) -> List[str]: eagerload_columns = [Repository.downloadable_revisions] if None not in [name, owner]: # Get the repository information. repository = get_repository_by_name_and_owner(app, name, owner, eagerload_columns=eagerload_columns) if repository is None: raise ObjectNotFound(f"No repository named {name} found with owner {owner}") elif tsr_id is not None: repository = get_repository_in_tool_shed(app, tsr_id, eagerload_columns=eagerload_columns) else: error_message = "Error in the Tool Shed repositories API in get_ordered_installable_revisions: " error_message += "invalid parameters received." log.debug(error_message) return [] return [revision[1] for revision in repository.installable_revisions(app, sort_revisions=True)]
[docs]def get_repository_metadata_dict(app: ToolShedApp, id: str, recursive: bool, downloadable_only: bool) -> Dict[str, Any]: all_metadata = {} repository = get_repository_in_tool_shed(app, id, eagerload_columns=[Repository.downloadable_revisions]) for changeset, changehash in get_metadata_revisions( app, repository, sort_revisions=True, downloadable=downloadable_only ): metadata = get_current_repository_metadata_for_changeset_revision(app, repository, changehash) if metadata is None: continue metadata_dict = metadata.to_dict( value_mapper={"id": app.security.encode_id, "repository_id": app.security.encode_id} ) metadata_dict["repository"] = repository.to_dict(value_mapper={"id": app.security.encode_id}) if metadata.has_repository_dependencies and recursive: metadata_dict["repository_dependencies"] = get_all_dependencies( app, metadata, processed_dependency_links=[] ) else: metadata_dict["repository_dependencies"] = [] if metadata.includes_tools: metadata_dict["tools"] = metadata.metadata["tools"] metadata_dict["invalid_tools"] = metadata.metadata.get("invalid_tools", []) all_metadata[f"{int(changeset)}:{changehash}"] = metadata_dict return all_metadata
[docs]def readmes(app: ToolShedApp, repository: Repository, changeset_revision: str) -> dict: encoded_repository_id = app.security.encode_id(repository.id) repository_metadata = get_repository_metadata_by_changeset_revision(app, encoded_repository_id, changeset_revision) if repository_metadata: metadata = repository_metadata.metadata if metadata: return build_readme_files_dict(app, repository, changeset_revision, repository_metadata.metadata) return {}
[docs]def reset_metadata_on_repository(trans: ProvidesUserContext, repository_id) -> ResetMetadataOnRepositoryResponse: app: ToolShedApp = trans.app def handle_repository(trans, start_time, repository): results = dict(start_time=start_time, repository_status=[]) try: rmm = repository_metadata_manager.RepositoryMetadataManager( trans, repository=repository, resetting_all_metadata_on_repository=True, updating_installed_repository=False, persist=False, ) rmm.reset_all_metadata_on_repository_in_tool_shed() rmm_invalid_file_tups = rmm.get_invalid_file_tups() if rmm_invalid_file_tups: message = generate_message_for_invalid_tools( app, rmm_invalid_file_tups, repository, None, as_html=False ) results["status"] = "warning" else: message = ( f"Successfully reset metadata on repository {repository.name} owned by {repository.user.username}" ) results["status"] = "ok" except Exception as e: message = ( f"Error resetting metadata on repository {repository.name} owned by {repository.user.username}: {e}" ) results["status"] = "error" status = f"{repository.name} : {message}" results["repository_status"].append(status) return results if repository_id is not None: repository = get_repository_in_tool_shed(app, repository_id) start_time = strftime("%Y-%m-%d %H:%M:%S") log.debug(f"{start_time}...resetting metadata on repository {repository.name}") results = handle_repository(trans, start_time, repository) stop_time = strftime("%Y-%m-%d %H:%M:%S") results["stop_time"] = stop_time return ResetMetadataOnRepositoryResponse(**results)
[docs]def create_repository(trans: ProvidesUserContext, request: CreateRepositoryRequest) -> Repository: app: ToolShedApp = trans.app user = trans.user assert user category_ids = listify(request.category_ids) name = request.name invalid_message = validate_repository_name(app, name, user) if invalid_message: raise RequestParameterInvalidException(invalid_message) repo, _ = low_level_create_repository( app=app, name=name, type=request.type_, description=request.synopsis, long_description=request.description, user=user, category_ids=category_ids, remote_repository_url=request.remote_repository_url, homepage_url=request.homepage_url, ) return repo
[docs]def to_element_dict(app, repository: Repository, include_categories: bool = False) -> Dict[str, Any]: value_mapper = get_value_mapper(app) repository_dict = repository.to_dict(view="element", value_mapper=value_mapper) if include_categories: repository_dict["category_ids"] = [app.security.encode_id(x.category.id) for x in repository.categories] return repository_dict
[docs]def repositories_by_category( app: ToolShedApp, category_id: str, page: Optional[int] = None, sort_key: str = "name", sort_order: str = "asc", installable: bool = True, ): category = get_category(app, category_id) category_dict: Dict[str, Any] if category is None: category_dict = dict(message=f"Unable to locate category record for id {str(id)}.", status="error") return category_dict category_dict = category.to_dict(view="element", value_mapper=category_value_mapper(app)) category_dict["repository_count"] = count_repositories_in_category(app, category_id) repositories = get_repositories_by_category( app, category.id, installable=installable, sort_order=sort_order, sort_key=sort_key, page=page ) category_dict["repositories"] = repositories return category_dict
[docs]def to_model(app, repository: Repository) -> SchemaRepository: return SchemaRepository(**to_element_dict(app, repository))
[docs]def to_detailed_model(app, repository: Repository) -> DetailedRepository: return DetailedRepository(**to_element_dict(app, repository))
[docs]def upload_tar_and_set_metadata( trans: ProvidesRepositoriesContext, host: str, repository: Repository, uploaded_file, commit_message: str, dry_run: bool = False, ): app = trans.app user = trans.user assert user repo_dir = repository.repo_path(app) tip = repository.tip() tar_response = upload_tar( trans, user.username, repository, uploaded_file, commit_message, ) ( ok, message, _, content_alert_str, _, _, ) = tar_response if ok: # Update the repository files for browsing. hg_util.update_repository(repo_dir) # Get the new repository tip. if tip == repository.tip(): raise MalformedContents("No changes to repository.") else: rmm = repository_metadata_manager.RepositoryMetadataManager(trans, repository=repository) _, error_message = rmm.set_repository_metadata_due_to_new_tip(host, content_alert_str=content_alert_str) if error_message: raise InternalServerError(error_message) dd = dependency_display.DependencyDisplayer(app) if str(repository.type) not in [ rt_util.REPOSITORY_SUITE_DEFINITION, rt_util.TOOL_DEPENDENCY_DEFINITION, ]: # Provide a warning message if a tool_dependencies.xml file is provided, but tool dependencies # weren't loaded due to a requirement tag mismatch or some other problem. Tool dependency # definitions can define orphan tool dependencies (no relationship to any tools contained in the # repository), so warning messages are important because orphans are always valid. The repository # owner must be warned in case they did not intend to define an orphan dependency, but simply # provided incorrect information (tool shed, name owner, changeset_revision) for the definition. if repository.metadata_revisions: # A repository's metadata revisions are order descending by update_time, so the zeroth revision # will be the tip just after an upload. metadata_dict = repository.metadata_revisions[0].metadata else: metadata_dict = {} orphan_message = dd.generate_message_for_orphan_tool_dependencies(repository, metadata_dict) if orphan_message: message += orphan_message else: raise InternalServerError(message) return message
def _get_repository_by_name_and_owner(session: scoped_session, name: str, owner: str, user_model): stmt = ( select(Repository) .where(Repository.deprecated == false()) .where(Repository.deleted == false()) .where(Repository.name == name) .where(user_model.username == owner) .where(Repository.user_id == user_model.id) .limit(1) ) return session.scalars(stmt).first() def _get_repositories_by_name_and_owner_and_deleted( session: scoped_session, name: Optional[str], owner: Optional[str], deleted: bool, user_model ): stmt = select(Repository).where(Repository.deprecated == false()).where(Repository.deleted == deleted) if owner is not None: stmt = stmt.where(user_model.username == owner) stmt = stmt.where(Repository.user_id == user_model.id) if name is not None: stmt = stmt.where(Repository.name == name) stmt = stmt.order_by(Repository.name) return session.scalars(stmt)