
Source code for galaxy.tool_util.client.staging

"""Client for staging inputs for Galaxy Tools and Workflows.

Implemented as a connector that bridges the galactic_job_json
utility and a Galaxy API library.
"""

import abc
import json
import logging
import os
from typing import (
    Any,
    BinaryIO,
    Dict,
    List,
    Optional,
    Tuple,
    TYPE_CHECKING,
)

import yaml
from typing_extensions import Literal

from galaxy.tool_util.cwl.util import (
    DirectoryUploadTarget,
    FileLiteralTarget,
    FileUploadTarget,
    galactic_job_json,
    ObjectUploadTarget,
    path_or_uri_to_uri,
    UploadTarget,
)

if TYPE_CHECKING:
    from galaxy_test.base.api import ApiTestInteractor

log = logging.getLogger(__name__)

UPLOAD_TOOL_ID = "upload1"
LOAD_TOOLS_FROM_PATH = True
DEFAULT_USE_FETCH_API = True
DEFAULT_FILE_TYPE = "auto"
DEFAULT_DBKEY = "?"
DEFAULT_DECOMPRESS = False


class StagingInterface(metaclass=abc.ABCMeta):
    """Client that parses a job input and populates files into the Galaxy API.

    Abstract class whose implementations must override _post (and optionally
    other things such as _attach_file, _log, etc.) to adapt to bioblend (for
    Planemo) or to the tool test interactor infrastructure.
    """

    @abc.abstractmethod
    def _post(self, api_path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Make a POST to the Galaxy API along the supplied path."""

    def _attach_file(self, path: str) -> BinaryIO:
        return open(path, "rb")

    def _tools_post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        tool_response = self._post("tools", payload)
        for job in tool_response.get("jobs", []):
            self._handle_job(job)
        return tool_response

    def _fetch_post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        tool_response = self._post("tools/fetch", payload)
        for job in tool_response.get("jobs", []):
            self._handle_job(job)
        return tool_response

    @abc.abstractmethod
    def _handle_job(self, job_response: Dict[str, Any]):
        """Implementers can decide whether or not to wait for job(s) individually here."""

    def stage(
        self,
        tool_or_workflow: Literal["tool", "workflow"],
        history_id: str,
        job: Optional[Dict[str, Any]] = None,
        job_path: Optional[str] = None,
        use_path_paste: bool = LOAD_TOOLS_FROM_PATH,
        to_posix_lines: bool = True,
        job_dir: str = ".",
    ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        def upload_func_fetch(upload_target: UploadTarget) -> Dict[str, Any]:
            def _attach_file(upload_payload: Dict[str, Any], uri: str, index: int = 0) -> Dict[str, str]:
                uri = path_or_uri_to_uri(uri)
                is_path = uri.startswith("file://")
                if not is_path or use_path_paste:
                    return {"src": "url", "url": uri}
                else:
                    path = uri[len("file://") :]
                    upload_payload["__files"][f"files_{index}|file_data"] = self._attach_file(path)
                    return {"src": "files"}

            fetch_payload = None
            if isinstance(upload_target, FileUploadTarget):
                file_path = upload_target.path
                file_type = upload_target.properties.get("filetype", None) or DEFAULT_FILE_TYPE
                dbkey = upload_target.properties.get("dbkey", None) or DEFAULT_DBKEY
                fetch_payload = _fetch_payload(
                    history_id,
                    file_type=file_type,
                    dbkey=dbkey,
                    to_posix_lines=to_posix_lines,
                    decompress=upload_target.properties.get("decompress") or DEFAULT_DECOMPRESS,
                )
                name = _file_path_to_name(file_path)
                if file_path is not None:
                    src = _attach_file(fetch_payload, file_path)
                    fetch_payload["targets"][0]["elements"][0].update(src)
                if upload_target.composite_data:
                    composite_items = []
                    for i, composite_data in enumerate(upload_target.composite_data):
                        composite_item_src = _attach_file(fetch_payload, composite_data, index=i)
                        composite_items.append(composite_item_src)
                    fetch_payload["targets"][0]["elements"][0]["src"] = "composite"
                    fetch_payload["targets"][0]["elements"][0]["composite"] = {
                        "items": composite_items,
                    }
                tags = upload_target.properties.get("tags")
                if tags:
                    fetch_payload["targets"][0]["elements"][0]["tags"] = tags
                fetch_payload["targets"][0]["elements"][0]["name"] = name
            elif isinstance(upload_target, FileLiteralTarget):
                fetch_payload = _fetch_payload(history_id)
                # For file literals - take them as is - never convert line endings.
fetch_payload["targets"][0]["elements"][0].update( { "src": "pasted", "paste_content": upload_target.contents, "to_posix_lines": False, } ) tags = upload_target.properties.get("tags") if tags: fetch_payload["targets"][0]["elements"][0]["tags"] = tags elif isinstance(upload_target, DirectoryUploadTarget): fetch_payload = _fetch_payload(history_id, file_type="directory") fetch_payload["targets"][0].pop("elements") tar_path = upload_target.tar_path src = _attach_file(fetch_payload, tar_path) fetch_payload["targets"][0]["elements_from"] = src elif isinstance(upload_target, ObjectUploadTarget): content = json.dumps(upload_target.object) fetch_payload = _fetch_payload(history_id, file_type="expression.json") fetch_payload["targets"][0]["elements"][0].update( { "src": "pasted", "paste_content": content, } ) tags = upload_target.properties.get("tags") if tags: fetch_payload["targets"][0]["elements"][0]["tags"] = tags else: raise ValueError(f"Unsupported type for upload_target: {type(upload_target)}") return self._fetch_post(fetch_payload) # Save legacy upload_func to target older Galaxy servers def upload_func(upload_target: UploadTarget) -> Dict[str, Any]: def _attach_file(upload_payload: Dict[str, Any], uri: str, index: int = 0) -> None: uri = path_or_uri_to_uri(uri) is_path = uri.startswith("file://") if not is_path or use_path_paste: upload_payload["inputs"]["files_%d|url_paste" % index] = uri else: path = uri[len("file://") :] upload_payload["__files"]["files_%d|file_data" % index] = self._attach_file(path) if isinstance(upload_target, FileUploadTarget): file_path = upload_target.path file_type = upload_target.properties.get("filetype", None) or DEFAULT_FILE_TYPE dbkey = upload_target.properties.get("dbkey", None) or DEFAULT_DBKEY upload_payload = _upload_payload( history_id, file_type=file_type, to_posix_lines=dbkey, ) name = _file_path_to_name(file_path) upload_payload["inputs"]["files_0|auto_decompress"] = False upload_payload["inputs"]["auto_decompress"] = False if file_path is not None: _attach_file(upload_payload, file_path) upload_payload["inputs"]["files_0|NAME"] = name if upload_target.secondary_files: _attach_file(upload_payload, upload_target.secondary_files, index=1) upload_payload["inputs"]["files_1|type"] = "upload_dataset" upload_payload["inputs"]["files_1|auto_decompress"] = True upload_payload["inputs"]["file_count"] = "2" upload_payload["inputs"]["force_composite"] = "True" # galaxy.exceptions.RequestParameterInvalidException: Not input source type # defined for input '{'class': 'File', 'filetype': 'imzml', 'composite_data': # ['Example_Continuous.imzML', 'Example_Continuous.ibd']}'.\n"}]] if upload_target.composite_data: for i, composite_data in enumerate(upload_target.composite_data): upload_payload["inputs"][f"files_{i}|type"] = "upload_dataset" _attach_file(upload_payload, composite_data, index=i) self._log(f"upload_payload is {upload_payload}") return self._tools_post(upload_payload) elif isinstance(upload_target, FileLiteralTarget): # For file literals - take them as is - never convert line endings. 
                payload = _upload_payload(history_id, file_type="auto", auto_decompress=False, to_posix_lines=False)
                payload["inputs"]["files_0|url_paste"] = upload_target.contents
                return self._tools_post(payload)
            elif isinstance(upload_target, DirectoryUploadTarget):
                tar_path = upload_target.tar_path
                upload_payload = _upload_payload(
                    history_id,
                    file_type="tar",
                )
                upload_payload["inputs"]["files_0|auto_decompress"] = False
                _attach_file(upload_payload, tar_path)
                tar_upload_response = self._tools_post(upload_payload)
                convert_payload = dict(
                    tool_id="CONVERTER_tar_to_directory",
                    tool_inputs={"input1": {"src": "hda", "id": tar_upload_response["outputs"][0]["id"]}},
                    history_id=history_id,
                )
                convert_response = self._tools_post(convert_payload)
                assert "outputs" in convert_response, convert_response
                return convert_response
            elif isinstance(upload_target, ObjectUploadTarget):
                content = json.dumps(upload_target.object)
                payload = _upload_payload(history_id, file_type="expression.json")
                payload["files_0|url_paste"] = content
                return self._tools_post(payload)
            else:
                raise ValueError(f"Unsupported type for upload_target: {type(upload_target)}")

        def create_collection_func(element_identifiers: List[Dict[str, Any]], collection_type: str) -> Dict[str, Any]:
            payload = {
                "name": "dataset collection",
                "instance_type": "history",
                "history_id": history_id,
                "element_identifiers": element_identifiers,
                "collection_type": collection_type,
                "fields": None if collection_type != "record" else "auto",
            }
            return self._post("dataset_collections", payload)

        if job_path is not None:
            assert job is None
            with open(job_path) as f:
                job = yaml.safe_load(f)
            job_dir = os.path.dirname(os.path.abspath(job_path))
        else:
            assert job is not None

        if self.use_fetch_api:
            upload = upload_func_fetch
        else:
            upload = upload_func
        return galactic_job_json(
            job,
            job_dir,
            upload,
            create_collection_func,
            tool_or_workflow,
        )

    # extension point for planemo to override logging
    def _log(self, message):
        log.debug(message)

    @property
    @abc.abstractmethod
    def use_fetch_api(self):
        """Return true if this client should use the (modern) data fetch API."""
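
The abstract contract above (_post, _handle_job, and use_fetch_api) is all a concrete client has to supply. As an illustration only, here is a minimal sketch of a standalone client built directly on requests; the class name RequestsStaging, its constructor arguments, and the use of the x-api-key header are assumptions made for the example, not part of this module. It sidesteps multipart file attachments by relying on path pasting, so request bodies stay plain JSON.

# Illustrative sketch only (not part of this module); names and the header-based
# API key are assumptions for the example.
import requests


class RequestsStaging(StagingInterface):
    def __init__(self, galaxy_url: str, api_key: str, use_fetch_api: bool = DEFAULT_USE_FETCH_API) -> None:
        self._api_url = f"{galaxy_url.rstrip('/')}/api"
        self._headers = {"x-api-key": api_key}  # assumption: key is accepted as a header
        self._use_fetch_api = use_fetch_api

    def _post(self, api_path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        # With path pasting / URL inputs the "__files" map stays empty, so the
        # payload can be sent as plain JSON.
        payload = dict(payload)
        payload.pop("__files", None)
        response = requests.post(f"{self._api_url}/{api_path}", json=payload, headers=self._headers)
        response.raise_for_status()
        return response.json()

    def _handle_job(self, job_response: Dict[str, Any]):
        # A production client would poll the job via the API until it reaches a
        # terminal state; this sketch only records the id.
        self._log(f"created job {job_response['id']}")

    @property
    def use_fetch_api(self):
        return self._use_fetch_api

The in-tree implementation below, InteractorStaging, adapts the same contract to the test framework's ApiTestInteractor instead.
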
class InteractorStaging(StagingInterface):
    def __init__(self, galaxy_interactor: "ApiTestInteractor", use_fetch_api: bool = DEFAULT_USE_FETCH_API) -> None:
        self.galaxy_interactor = galaxy_interactor
        self._use_fetch_api = use_fetch_api

    def _post(self, api_path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        response = self.galaxy_interactor._post(api_path, payload, json=True)
        assert response.status_code == 200, response.text
        return response.json()

    def _handle_job(self, job_response: Dict[str, Any]):
        self.galaxy_interactor.wait_for_job(job_response["id"])

    @property
    def use_fetch_api(self):
        return self._use_fetch_api

def _file_path_to_name(file_path: Optional[str]) -> str:
    if file_path is not None:
        name = os.path.basename(file_path)
    else:
        name = "defaultname"
    return name


def _upload_payload(
    history_id: str, file_type: str = DEFAULT_FILE_TYPE, dbkey: str = DEFAULT_DBKEY, **kwd
) -> Dict[str, Any]:
    """Adapted from BioBlend tools client."""
    payload: Dict[str, Any] = {}
    payload["history_id"] = history_id
    payload["tool_id"] = UPLOAD_TOOL_ID
    tool_input: Dict[str, Any] = {}
    tool_input["file_type"] = file_type
    tool_input["dbkey"] = dbkey
    if not kwd.get("to_posix_lines", True):
        tool_input["files_0|to_posix_lines"] = False
    elif kwd.get("space_to_tab", False):
        tool_input["files_0|space_to_tab"] = "Yes"
    if "file_name" in kwd:
        tool_input["files_0|NAME"] = kwd["file_name"]
    tool_input["files_0|type"] = "upload_dataset"
    payload["inputs"] = tool_input
    payload["__files"] = {}
    return payload


def _fetch_payload(history_id, file_type=DEFAULT_FILE_TYPE, dbkey=DEFAULT_DBKEY, **kwd):
    element = {
        "ext": file_type,
        "dbkey": dbkey,
    }
    for arg in ["to_posix_lines", "space_to_tab"]:
        if arg in kwd:
            element[arg] = kwd[arg]
    if "file_name" in kwd:
        element["name"] = kwd["file_name"]
    if "decompress" in kwd:
        element["auto_decompress"] = kwd["decompress"]
    target = {
        "destination": {"type": "hdas"},
        "elements": [element],
        "auto_decompress": False,
    }
    targets = [target]
    payload = {"history_id": history_id, "targets": targets, "__files": {}}
    return payload
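
End to end, a caller hands stage() a CWL-style job document (or a path to one) and gets back the job rewritten against the datasets created through these payload helpers, plus a list describing those datasets. A usage sketch, reusing the hypothetical RequestsStaging client from the example above; the URL, API key, history id, input name, and file path are all placeholders:

# Illustrative usage only; every concrete value below is a placeholder.
staging = RequestsStaging("https://galaxy.example.org", api_key="...")
job = {
    # CWL-style file entry of the kind galactic_job_json turns into an upload
    "input1": {"class": "File", "path": "reads.fastq", "filetype": "fastqsanger"},
}
rewritten_job, datasets = staging.stage(
    "tool",
    history_id="<history id>",
    job=job,
    use_path_paste=True,  # let the server read local paths directly; no multipart upload needed
)
# rewritten_job now references the created Galaxy datasets instead of local paths.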