Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.workflow.trs_proxy

import logging
import os
import urllib.parse

import requests
import yaml

from galaxy.exceptions import MessageException
from galaxy.util import (
    asbool,
    DEFAULT_SOCKET_TIMEOUT,
)
from galaxy.util.search import parse_filters

log = logging.getLogger(__name__)

# Make configurable via YAML,JSON
DEFAULT_TRS_SERVERS = [
    {
        "id": "dockstore",
        "api_url": "https://dockstore.org/api",
        "link_url": "https://dockstore.org",
        "label": "Dockstore",
        "doc": "Dockstore is an open platform used by the GA4GH for sharing Docker-based tools and workflows.",
    },
]
GA4GH_GALAXY_DESCRIPTOR = "GALAXY"


[docs]def parse_search_kwds(search_query): filters = { "organization": "organization", "o": "organization", "name": "name", "n": "name", } keyed_terms, description_term = parse_filters(search_query, filters) query_kwd = { "toolClass": "Workflow", "descriptorType": "GALAXY", } if description_term and description_term.strip(): query_kwd["description"] = description_term, if keyed_terms is not None: for (key, value) in keyed_terms: query_kwd[key] = value return query_kwd
[docs]class TrsProxy:
[docs] def __init__(self, config=None): config_file = getattr(config, "trs_servers_config_file", None) if config_file and os.path.exists(config_file): with open(config_file) as f: server_list = yaml.safe_load(f) else: server_list = DEFAULT_TRS_SERVERS self._server_list = server_list if server_list else [] self._server_dict = {t["id"]: t for t in self._server_list}
[docs] def get_servers(self): return self._server_list
[docs] def get_tools(self, trs_server, **kwd): query_kwd = {} for key in ["toolClass", "descriptorType", "description", "name", "organization"]: value = kwd.pop(key, None) if value is not None: query_kwd[key] = value trs_api_url = self._get_api_endpoint(trs_server, **kwd) return self._get(trs_api_url, params=query_kwd)
[docs] def get_tool(self, trs_server, tool_id, **kwd): trs_api_url = self._get_tool_api_endpoint(trs_server, tool_id, **kwd) return self._get(trs_api_url)
[docs] def get_versions(self, trs_server, tool_id, **kwd): trs_api_url = f"{self._get_tool_api_endpoint(trs_server, tool_id, **kwd)}/versions" return self._get(trs_api_url)
[docs] def get_version(self, trs_server, tool_id, version_id, **kwd): trs_api_url = f"{self._get_tool_api_endpoint(trs_server, tool_id, **kwd)}/versions/{version_id}" return self._get(trs_api_url)
[docs] def get_version_descriptor(self, trs_server, tool_id, version_id, **kwd): trs_api_url = f"{self._get_tool_api_endpoint(trs_server, tool_id, **kwd)}/versions/{version_id}/{GA4GH_GALAXY_DESCRIPTOR}/descriptor" return self._get(trs_api_url)["content"]
def _quote(self, tool_id, **kwd): if asbool(kwd.get("tool_id_b64_encoded", False)): import base64 tool_id = base64.b64decode(tool_id) tool_id = urllib.parse.quote_plus(tool_id) return tool_id def _get(self, url, params=None): response = requests.get(url, params=params, timeout=DEFAULT_SOCKET_TIMEOUT) if response.ok: return response.json() else: code = response.status_code message = response.text try: trs_error_dict = response.json() code = int(trs_error_dict["code"]) message = trs_error_dict["message"] except Exception: pass raise MessageException.from_code(code, message) def _get_api_endpoint(self, trs_server, **kwd): trs_url = self._server_dict[trs_server]["api_url"] trs_api_endpoint = f"{trs_url}/ga4gh/trs/v2/tools" return trs_api_endpoint def _get_tool_api_endpoint(self, trs_server, tool_id, **kwd): tool_id = self._quote(tool_id, **kwd) trs_api_url = f"{self._get_api_endpoint(trs_server, **kwd)}/{tool_id}" return trs_api_url