Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release, if it exists there, or view the top of the latest release's documentation.

Source code for galaxy.tool_util.client.staging

"""Client for staging inputs for Galaxy Tools and Workflows.

Implement as a connector to serve a bridge between galactic_job_json
utility and a Galaxy API library.
"""

import abc
import json
import logging
import os
from typing import (
    Any,
    BinaryIO,
    Dict,
    List,
    Optional,
    Tuple,
    TYPE_CHECKING,
)

import yaml
from typing_extensions import Literal

from galaxy.tool_util.cwl.util import (
    DirectoryUploadTarget,
    FileLiteralTarget,
    FileUploadTarget,
    galactic_job_json,
    ObjectUploadTarget,
    path_or_uri_to_uri,
    UploadTarget,
)

if TYPE_CHECKING:
    from galaxy_test.base.api import ApiTestInteractor

log = logging.getLogger(__name__)

# Tool id of the legacy upload tool targeted when the fetch API is not used.
UPLOAD_TOOL_ID = "upload1"
# Default for ``stage``'s ``use_path_paste``: pass local ``file://`` URIs to
# the server as URLs instead of uploading the file contents.
LOAD_TOOLS_FROM_PATH = True
# Default for whether clients use the modern ``tools/fetch`` API.
DEFAULT_USE_FETCH_API = True
# Default datatype extension used for uploads.
DEFAULT_FILE_TYPE = "auto"
# Default (unspecified) genome build / dbkey.
DEFAULT_DBKEY = "?"
# By default do not ask the server to decompress uploaded files.
DEFAULT_DECOMPRESS = False


class StagingInterface(metaclass=abc.ABCMeta):
    """Client that parses a job input and populates files into the Galaxy API.

    Abstract class that must override ``_post`` (and optionally other hooks
    such as ``_attach_file``, ``_log``, etc.) to adapt to bioblend (for
    Planemo) or to the tool test interactor infrastructure.
    """

    @abc.abstractmethod
    def _post(self, api_path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Make a post to the Galaxy API along supplied path."""

    def _attach_file(self, path: str) -> BinaryIO:
        """Open ``path`` for binary reading so it can be attached to a payload."""
        return open(path, "rb")

    def _tools_post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` to the legacy ``tools`` endpoint and handle launched jobs."""
        return self._post_and_handle_jobs("tools", payload)

    def _fetch_post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` to the ``tools/fetch`` endpoint and handle launched jobs."""
        return self._post_and_handle_jobs("tools/fetch", payload)

    def _post_and_handle_jobs(self, api_path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        # Shared implementation for the two tool endpoints: submit the payload,
        # then let the concrete subclass decide what to do with each job that
        # the submission launched.
        tool_response = self._post(api_path, payload)
        for job in tool_response.get("jobs", []):
            self._handle_job(job)
        return tool_response

    @abc.abstractmethod
    def _handle_job(self, job_response):
        """Implementer can decide if to wait for job(s) individually or not here."""
[docs] def stage( self, tool_or_workflow: Literal["tool", "workflow"], history_id: str, job: Optional[Dict[str, Any]] = None, job_path: Optional[str] = None, use_path_paste: bool = LOAD_TOOLS_FROM_PATH, to_posix_lines: bool = True, job_dir: str = ".", ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: def upload_func_fetch(upload_target: UploadTarget) -> Dict[str, Any]: def _attach_file(upload_payload: Dict[str, Any], uri: str, index: int = 0) -> Dict[str, str]: uri = path_or_uri_to_uri(uri) is_path = uri.startswith("file://") if not is_path or use_path_paste: return {"src": "url", "url": uri} else: path = uri[len("file://") :] upload_payload["__files"][f"files_{index}|file_data"] = self._attach_file(path) return {"src": "files"} fetch_payload = None if isinstance(upload_target, FileUploadTarget): file_path = upload_target.path file_type = upload_target.properties.get("filetype", None) or DEFAULT_FILE_TYPE dbkey = upload_target.properties.get("dbkey", None) or DEFAULT_DBKEY fetch_payload = _fetch_payload( history_id, file_type=file_type, dbkey=dbkey, to_posix_lines=to_posix_lines, decompress=upload_target.properties.get("decompress") or DEFAULT_DECOMPRESS, ) name = _file_path_to_name(file_path) if file_path is not None: src = _attach_file(fetch_payload, file_path) fetch_payload["targets"][0]["elements"][0].update(src) if upload_target.composite_data: composite_items = [] for i, composite_data in enumerate(upload_target.composite_data): composite_item_src = _attach_file(fetch_payload, composite_data, index=i) composite_items.append(composite_item_src) fetch_payload["targets"][0]["elements"][0]["src"] = "composite" fetch_payload["targets"][0]["elements"][0]["composite"] = { "items": composite_items, } tags = upload_target.properties.get("tags") if tags: fetch_payload["targets"][0]["elements"][0]["tags"] = tags fetch_payload["targets"][0]["elements"][0]["name"] = name elif isinstance(upload_target, FileLiteralTarget): fetch_payload = _fetch_payload(history_id) # For 
file literals - take them as is - never convert line endings. fetch_payload["targets"][0]["elements"][0].update( { "src": "pasted", "paste_content": upload_target.contents, "to_posix_lines": False, } ) tags = upload_target.properties.get("tags") if tags: fetch_payload["targets"][0]["elements"][0]["tags"] = tags elif isinstance(upload_target, DirectoryUploadTarget): fetch_payload = _fetch_payload(history_id, file_type="directory") fetch_payload["targets"][0].pop("elements") tar_path = upload_target.tar_path src = _attach_file(fetch_payload, tar_path) fetch_payload["targets"][0]["elements_from"] = src elif isinstance(upload_target, ObjectUploadTarget): content = json.dumps(upload_target.object) fetch_payload = _fetch_payload(history_id, file_type="expression.json") fetch_payload["targets"][0]["elements"][0].update( { "src": "pasted", "paste_content": content, } ) tags = upload_target.properties.get("tags") if tags: fetch_payload["targets"][0]["elements"][0]["tags"] = tags else: raise ValueError(f"Unsupported type for upload_target: {type(upload_target)}") return self._fetch_post(fetch_payload) # Save legacy upload_func to target older Galaxy servers def upload_func(upload_target: UploadTarget) -> Dict[str, Any]: def _attach_file(upload_payload: Dict[str, Any], uri: str, index: int = 0) -> None: uri = path_or_uri_to_uri(uri) is_path = uri.startswith("file://") if not is_path or use_path_paste: upload_payload["inputs"]["files_%d|url_paste" % index] = uri else: path = uri[len("file://") :] upload_payload["__files"]["files_%d|file_data" % index] = self._attach_file(path) if isinstance(upload_target, FileUploadTarget): file_path = upload_target.path file_type = upload_target.properties.get("filetype", None) or DEFAULT_FILE_TYPE dbkey = upload_target.properties.get("dbkey", None) or DEFAULT_DBKEY upload_payload = _upload_payload( history_id, file_type=file_type, to_posix_lines=dbkey, ) name = _file_path_to_name(file_path) upload_payload["inputs"]["files_0|auto_decompress"] 
= False upload_payload["inputs"]["auto_decompress"] = False if file_path is not None: _attach_file(upload_payload, file_path) upload_payload["inputs"]["files_0|NAME"] = name if upload_target.secondary_files: _attach_file(upload_payload, upload_target.secondary_files, index=1) upload_payload["inputs"]["files_1|type"] = "upload_dataset" upload_payload["inputs"]["files_1|auto_decompress"] = True upload_payload["inputs"]["file_count"] = "2" upload_payload["inputs"]["force_composite"] = "True" # galaxy.exceptions.RequestParameterInvalidException: Not input source type # defined for input '{'class': 'File', 'filetype': 'imzml', 'composite_data': # ['Example_Continuous.imzML', 'Example_Continuous.ibd']}'.\n"}]] if upload_target.composite_data: for i, composite_data in enumerate(upload_target.composite_data): upload_payload["inputs"][f"files_{i}|type"] = "upload_dataset" _attach_file(upload_payload, composite_data, index=i) self._log(f"upload_payload is {upload_payload}") return self._tools_post(upload_payload) elif isinstance(upload_target, FileLiteralTarget): # For file literals - take them as is - never convert line endings. 
payload = _upload_payload(history_id, file_type="auto", auto_decompress=False, to_posix_lines=False) payload["inputs"]["files_0|url_paste"] = upload_target.contents return self._tools_post(payload) elif isinstance(upload_target, DirectoryUploadTarget): tar_path = upload_target.tar_path upload_payload = _upload_payload( history_id, file_type="tar", ) upload_payload["inputs"]["files_0|auto_decompress"] = False _attach_file(upload_payload, tar_path) tar_upload_response = self._tools_post(upload_payload) convert_payload = dict( tool_id="CONVERTER_tar_to_directory", tool_inputs={"input1": {"src": "hda", "id": tar_upload_response["outputs"][0]["id"]}}, history_id=history_id, ) convert_response = self._tools_post(convert_payload) assert "outputs" in convert_response, convert_response return convert_response elif isinstance(upload_target, ObjectUploadTarget): content = json.dumps(upload_target.object) payload = _upload_payload(history_id, file_type="expression.json") payload["files_0|url_paste"] = content return self._tools_post(payload) else: raise ValueError(f"Unsupported type for upload_target: {type(upload_target)}") def create_collection_func(element_identifiers: List[Dict[str, Any]], collection_type: str) -> Dict[str, Any]: payload = { "name": "dataset collection", "instance_type": "history", "history_id": history_id, "element_identifiers": element_identifiers, "collection_type": collection_type, "fields": None if collection_type != "record" else "auto", } return self._post("dataset_collections", payload) if job_path is not None: assert job is None with open(job_path) as f: job = yaml.safe_load(f) job_dir = os.path.dirname(os.path.abspath(job_path)) else: assert job is not None if self.use_fetch_api: upload = upload_func_fetch else: upload = upload_func return galactic_job_json( job, job_dir, upload, create_collection_func, tool_or_workflow, )
# extension point for planemo to override logging def _log(self, message): log.debug(message) @property @abc.abstractmethod def use_fetch_api(self): """Return true is this should use (modern) data fetch API."""
class InteractorStaging(StagingInterface):
    """Staging client backed by a test ``ApiTestInteractor``.

    Posts through the interactor and waits for every launched job.
    """

    def __init__(self, galaxy_interactor: "ApiTestInteractor", use_fetch_api: bool = DEFAULT_USE_FETCH_API) -> None:
        self.galaxy_interactor = galaxy_interactor
        self._use_fetch_api = use_fetch_api

    def _post(self, api_path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON and return the decoded response body."""
        api_response = self.galaxy_interactor._post(api_path, payload, json=True)
        assert api_response.status_code == 200, api_response.text
        return api_response.json()

    def _handle_job(self, job_response):
        # Block until the submitted job has finished.
        self.galaxy_interactor.wait_for_job(job_response["id"])

    @property
    def use_fetch_api(self):
        # Fixed at construction time via the ``use_fetch_api`` argument.
        return self._use_fetch_api
def _file_path_to_name(file_path: Optional[str]) -> str:
    """Derive a dataset name from ``file_path``, falling back to a default."""
    return os.path.basename(file_path) if file_path is not None else "defaultname"


def _upload_payload(
    history_id: str, file_type: str = DEFAULT_FILE_TYPE, dbkey: str = DEFAULT_DBKEY, **kwd
) -> Dict[str, Any]:
    """Adapted from BioBlend tools client.

    Build a ``tools`` API payload for the legacy ``upload1`` tool.
    Recognized extra keywords: ``to_posix_lines``, ``space_to_tab``,
    ``file_name``.
    """
    tool_input: Dict[str, Any] = {
        "file_type": file_type,
        "dbkey": dbkey,
    }
    # Mirrors BioBlend: space_to_tab only applies when posix-line conversion
    # is left enabled.
    if not kwd.get("to_posix_lines", True):
        tool_input["files_0|to_posix_lines"] = False
    elif kwd.get("space_to_tab", False):
        tool_input["files_0|space_to_tab"] = "Yes"
    if "file_name" in kwd:
        tool_input["files_0|NAME"] = kwd["file_name"]
    tool_input["files_0|type"] = "upload_dataset"
    return {
        "history_id": history_id,
        "tool_id": UPLOAD_TOOL_ID,
        "inputs": tool_input,
        "__files": {},
    }


def _fetch_payload(
    history_id: str, file_type: str = DEFAULT_FILE_TYPE, dbkey: str = DEFAULT_DBKEY, **kwd
) -> Dict[str, Any]:
    """Build a ``tools/fetch`` API payload with a single HDA target element.

    Recognized extra keywords: ``to_posix_lines``, ``space_to_tab``,
    ``file_name``, ``decompress``.
    """
    element: Dict[str, Any] = {
        "ext": file_type,
        "dbkey": dbkey,
    }
    for passthrough in ("to_posix_lines", "space_to_tab"):
        if passthrough in kwd:
            element[passthrough] = kwd[passthrough]
    if "file_name" in kwd:
        element["name"] = kwd["file_name"]
    if "decompress" in kwd:
        element["auto_decompress"] = kwd["decompress"]
    target: Dict[str, Any] = {
        "destination": {"type": "hdas"},
        "elements": [element],
        # Target-level decompression stays off; per-element behavior is
        # driven by the ``decompress`` keyword above.
        "auto_decompress": False,
    }
    return {"history_id": history_id, "targets": [target], "__files": {}}