Warning

This document is for an in-development version of Galaxy. Alternatively, you can view this page in the latest release (if it exists there) or go to the top of the latest release's documentation.

Source code for galaxy.tool_util.biotools.source

import functools
import json
import os
from typing import (
    Callable,
    Dict,
    List,
    Optional,
)
from urllib.parse import quote

from galaxy.util import (
    DEFAULT_SOCKET_TIMEOUT,
    requests,
)
from .interface import BiotoolsEntry


class BiotoolsMetadataSource:
    """Interface for objects that resolve a bio.tools reference to an entry.

    The implementation here has no backing data, so every lookup yields
    ``None``.
    """

    def get_biotools_metadata(self, biotools_reference: str) -> Optional[BiotoolsEntry]:
        """Look up the entry for ``biotools_reference``.

        :param biotools_reference: identifier of the tool to look up.
        :returns: the matching entry, or ``None`` when nothing is available.
        """
        return None
class GitContentBiotoolsMetadataSource(BiotoolsMetadataSource):
    """Parse entries from a repository clone of https://github.com/bio-tools/content."""

    def __init__(self, content_directory):
        """Remember where the local content checkout lives.

        :param content_directory: path to the top of the clone; entries are
            looked up beneath its ``data`` sub-directory.
        """
        self._content_directory = content_directory

    def get_biotools_metadata(self, biotools_reference: str) -> Optional[BiotoolsEntry]:
        """Return a BiotoolsEntry if available.

        The entry is read from
        ``<content_directory>/data/<biotools_reference>/<biotools_reference>.biotools.json``.

        :param biotools_reference: identifier naming both the entry directory
            and the JSON file inside it.
        :returns: the parsed entry, or ``None`` when that file does not exist.
        :raises ValueError: if the file exists but does not contain valid JSON
            (``json.JSONDecodeError`` is a ``ValueError``).
        """
        path = os.path.join(self._content_directory, "data", biotools_reference, f"{biotools_reference}.biotools.json")
        if not os.path.exists(path):
            return None
        # Read bytes rather than text: json.load then detects the UTF-8/16/32
        # encoding itself instead of relying on the platform's locale default,
        # which mis-decodes non-ASCII metadata on non-UTF-8 systems.
        with open(path, "rb") as f:
            content_json = json.load(f)
        return BiotoolsEntry.from_json(content_json)
class InMemoryCache:
    """Minimal dictionary-backed cache exposing ``get(key, createfunc)``."""

    # Declared on the class, so this one dictionary is shared by every
    # instance that does not assign its own ``backend`` attribute.
    backend: Dict[str, Optional[str]] = {}

    def get(self, key: str, createfunc: Callable[[], Optional[str]]):
        """Return the value stored for ``key``, creating it on first use.

        :param key: cache key.
        :param createfunc: zero-argument callable invoked only when ``key``
            has no stored value; whatever it returns (``None`` included) is
            stored and returned.
        :returns: the stored value for ``key``.
        """
        store = self.backend
        if key in store:
            return store[key]
        value = createfunc()
        store[key] = value
        return value
class ApiBiotoolsMetadataSource(BiotoolsMetadataSource):
    """Parse entries from bio.tools API."""

    def __init__(self, cache=None):
        """Set up the source with a cache for raw API responses.

        :param cache: object providing ``get(key=..., createfunc=...)``; when
            falsy, a new ``InMemoryCache`` is used instead.
        """
        self._cache = cache or InMemoryCache()

    def _raw_get_metadata(self, biotools_reference) -> Optional[str]:
        """Fetch the raw JSON text for one tool from the bio.tools API.

        :param biotools_reference: identifier of the tool to fetch.
        :returns: the response body as text, or ``None`` if the request
            failed for any reason (connection error, timeout, error status).
        """
        # Percent-encode the identifier (including "/") so that characters
        # such as "/", "?" or "#" stay inside the single path segment instead
        # of changing which endpoint or query is requested.
        tool_id = quote(str(biotools_reference), safe="")
        api_url = f"https://bio.tools/api/tool/{tool_id}?format=json"
        try:
            req = requests.get(api_url, timeout=DEFAULT_SOCKET_TIMEOUT)
            req.raise_for_status()
            # Decode the body with the encoding detected from its content.
            req.encoding = req.apparent_encoding
            return req.text
        except Exception:
            # Deliberately best-effort: a failed lookup means "no metadata",
            # it must not propagate to the caller.
            return None

    def get_biotools_metadata(self, biotools_reference: str) -> Optional[BiotoolsEntry]:
        """Return the entry for ``biotools_reference`` via the cache/API.

        The cache is asked for the raw response under the key
        ``biotools_reference`` and is handed a factory that performs the API
        request when needed.

        :param biotools_reference: identifier of the tool to look up.
        :returns: the parsed entry, or ``None`` when no content was obtained.
        """
        createfunc = functools.partial(self._raw_get_metadata, biotools_reference)
        content = self._cache.get(key=biotools_reference, createfunc=createfunc)
        if content is None:
            return None
        return BiotoolsEntry.from_json(json.loads(content))
class CascadingBiotoolsMetadataSource(BiotoolsMetadataSource):
    """Try several metadata sources in order and return the first hit."""

    def __init__(self, use_api=False, cache=None, content_directory: Optional[str] = None):
        """Assemble the ordered list of sources to consult.

        :param use_api: when truthy, add the bio.tools API source.
        :param cache: cache handed to the API source.
        :param content_directory: when truthy, add a source reading from this
            local content checkout; it is consulted before the API source.
        """
        chain: List[BiotoolsMetadataSource] = []
        if content_directory:
            chain.append(GitContentBiotoolsMetadataSource(content_directory))
        if use_api:
            chain.append(ApiBiotoolsMetadataSource(cache=cache))
        self._sources = chain

    def get_biotools_metadata(self, biotools_reference: str) -> Optional[BiotoolsEntry]:
        """Return the first non-``None`` entry produced by the sources.

        :param biotools_reference: identifier of the tool to look up.
        :returns: the first entry found, or ``None`` if every source misses
            (or no source is configured).
        """
        # Lazy generator: later sources are only queried if earlier ones miss.
        lookups = (source.get_biotools_metadata(biotools_reference) for source in self._sources)
        return next((entry for entry in lookups if entry is not None), None)
class BiotoolsMetadataSourceConfig:
    """Settings read by ``get_biotools_metadata_source``."""

    # Whether the bio.tools API source should be enabled.
    use_api: bool = False

    # Path to a local content checkout; no local source when left as None.
    content_directory: Optional[str] = None

    # Cache object passed through to the API source.
    cache = None
def get_biotools_metadata_source(metadata_source_config: BiotoolsMetadataSourceConfig) -> BiotoolsMetadataSource:
    """Build the metadata source described by ``metadata_source_config``.

    :param metadata_source_config: object supplying the ``use_api``,
        ``content_directory`` and ``cache`` settings.
    :returns: a ``CascadingBiotoolsMetadataSource`` configured from them.
    """
    settings = {
        "use_api": metadata_source_config.use_api,
        "content_directory": metadata_source_config.content_directory,
        "cache": metadata_source_config.cache,
    }
    return CascadingBiotoolsMetadataSource(**settings)