Source code for galaxy.tool_util.client.staging
"""Client for staging inputs for Galaxy Tools and Workflows.
Implement as a connector to serve a bridge between galactic_job_json
utility and a Galaxy API library.
"""
import abc
import json
import logging
import os

import yaml

from galaxy.tool_util.cwl.util import (
    DirectoryUploadTarget,
    FileLiteralTarget,
    FileUploadTarget,
    galactic_job_json,
    path_or_uri_to_uri,
)
log = logging.getLogger(__name__)
UPLOAD_TOOL_ID = "upload1"
LOAD_TOOLS_FROM_PATH = True
DEFAULT_USE_FETCH_API = True
DEFAULT_FILE_TYPE = "auto"
DEFAULT_DBKEY = "?"


class StagingInterace(metaclass=abc.ABCMeta):
    """Client that parses a job input and populates files into the Galaxy API.

    Abstract class that must override _post (and optionally other things such
    as _attach_file, _log, etc.) to adapt to bioblend (for Planemo) or to the
    tool test interactor infrastructure.
    """

    @abc.abstractmethod
    def _post(self, api_path, payload, files_attached=False):
        """Make a POST to the Galaxy API along the supplied path."""

    def _attach_file(self, path):
        return open(path, 'rb')

    def _tools_post(self, payload, files_attached=False):
        tool_response = self._post("tools", payload, files_attached=files_attached)
        for job in tool_response.get("jobs", []):
            self._handle_job(job)
        return tool_response

    def _fetch_post(self, payload, files_attached=False):
        tool_response = self._post("tools/fetch", payload, files_attached=files_attached)
        for job in tool_response.get("jobs", []):
            self._handle_job(job)
        return tool_response

    def _handle_job(self, job_response):
        """Implementers can decide here whether or not to wait for job(s) individually."""

    def stage(self, tool_or_workflow, history_id, job=None, job_path=None, use_path_paste=LOAD_TOOLS_FROM_PATH, to_posix_lines=True):
        files_attached = [False]
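        # files_attached is a single-element list rather than a bare boolean so
        # the nested upload closures below can flip it without ``nonlocal``.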

        def upload_func_fetch(upload_target):

            def _attach_file(upload_payload, uri, index=0):
                uri = path_or_uri_to_uri(uri)
                is_path = uri.startswith("file://")
                if not is_path or use_path_paste:
                    return {"src": "url", "url": uri}
                else:
                    files_attached[0] = True
                    path = uri[len("file://"):]
                    upload_payload["__files"]["files_%s|file_data" % index] = self._attach_file(path)
                    return {"src": "files"}

            fetch_payload = None
            if isinstance(upload_target, FileUploadTarget):
                file_path = upload_target.path
                file_type = upload_target.properties.get('filetype', None) or DEFAULT_FILE_TYPE
                dbkey = upload_target.properties.get('dbkey', None) or DEFAULT_DBKEY
                fetch_payload = _fetch_payload(
                    history_id,
                    file_type=file_type,
                    dbkey=dbkey,
                    to_posix_lines=to_posix_lines,
                )
                name = _file_path_to_name(file_path)
                if file_path is not None:
                    src = _attach_file(fetch_payload, file_path)
                    fetch_payload["targets"][0]["elements"][0].update(src)
                if upload_target.composite_data:
                    composite_items = []
                    for i, composite_data in enumerate(upload_target.composite_data):
                        composite_item_src = _attach_file(fetch_payload, composite_data, index=i)
                        composite_items.append(composite_item_src)
                    fetch_payload["targets"][0]["elements"][0]["src"] = "composite"
                    fetch_payload["targets"][0]["elements"][0]["composite"] = {
                        "items": composite_items,
                    }
                tags = upload_target.properties.get("tags")
                if tags:
                    fetch_payload["targets"][0]["elements"][0]["tags"] = tags
                fetch_payload["targets"][0]["elements"][0]["name"] = name
            elif isinstance(upload_target, FileLiteralTarget):
                fetch_payload = _fetch_payload(history_id)
                # For file literals - take them as is - never convert line endings.
                fetch_payload["targets"][0]["elements"][0].update({
                    "src": "pasted",
                    "paste_content": upload_target.contents,
                    "to_posix_lines": False,
                })
                tags = upload_target.properties.get("tags")
                if tags:
                    fetch_payload["targets"][0]["elements"][0]["tags"] = tags
            elif isinstance(upload_target, DirectoryUploadTarget):
                fetch_payload = _fetch_payload(history_id, file_type="directory")
                fetch_payload["targets"][0].pop("elements")
                tar_path = upload_target.path
                src = _attach_file(fetch_payload, tar_path)
                fetch_payload["targets"][0]["elements_from"] = src
            else:
                content = json.dumps(upload_target.object)
                fetch_payload = _fetch_payload(history_id, file_type="expression.json")
                fetch_payload["targets"][0]["elements"][0].update({
                    "src": "pasted",
                    "paste_content": content,
                })
                tags = upload_target.properties.get("tags")
                if tags:
                    fetch_payload["targets"][0]["elements"][0]["tags"] = tags
            return self._fetch_post(fetch_payload, files_attached=files_attached[0])
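
        # The fetch path above stages everything through a single tools/fetch
        # request; the legacy path below drives the upload1 tool directly.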
        # Save legacy upload_func to target older Galaxy servers
        def upload_func(upload_target):

            def _attach_file(upload_payload, uri, index=0):
                uri = path_or_uri_to_uri(uri)
                is_path = uri.startswith("file://")
                if not is_path or use_path_paste:
                    upload_payload["inputs"]["files_%d|url_paste" % index] = uri
                else:
                    files_attached[0] = True
                    path = uri[len("file://"):]
                    upload_payload["__files"]["files_%d|file_data" % index] = self._attach_file(path)

            if isinstance(upload_target, FileUploadTarget):
                file_path = upload_target.path
                file_type = upload_target.properties.get('filetype', None) or DEFAULT_FILE_TYPE
                dbkey = upload_target.properties.get('dbkey', None) or DEFAULT_DBKEY
                upload_payload = _upload_payload(
                    history_id,
                    file_type=file_type,
                    dbkey=dbkey,
                    to_posix_lines=to_posix_lines,
                )
                name = _file_path_to_name(file_path)
                upload_payload["inputs"]["files_0|auto_decompress"] = False
                upload_payload["inputs"]["auto_decompress"] = False
                if file_path is not None:
                    _attach_file(upload_payload, file_path)
                upload_payload["inputs"]["files_0|NAME"] = name
                if upload_target.secondary_files:
                    _attach_file(upload_payload, upload_target.secondary_files, index=1)
                    upload_payload["inputs"]["files_1|type"] = "upload_dataset"
                    upload_payload["inputs"]["files_1|auto_decompress"] = True
                    upload_payload["inputs"]["file_count"] = "2"
                    upload_payload["inputs"]["force_composite"] = "True"
                # galaxy.exceptions.RequestParameterInvalidException: Not input source type
                # defined for input '{'class': 'File', 'filetype': 'imzml', 'composite_data':
                # ['Example_Continuous.imzML', 'Example_Continuous.ibd']}'.
                if upload_target.composite_data:
                    for i, composite_data in enumerate(upload_target.composite_data):
                        upload_payload["inputs"]["files_%s|type" % i] = "upload_dataset"
                        _attach_file(upload_payload, composite_data, index=i)
                self._log("upload_payload is %s" % upload_payload)
                return self._tools_post(upload_payload, files_attached=files_attached[0])
            elif isinstance(upload_target, FileLiteralTarget):
                # For file literals - take them as is - never convert line endings.
                payload = _upload_payload(history_id, file_type="auto", auto_decompress=False, to_posix_lines=False)
                payload["inputs"]["files_0|url_paste"] = upload_target.contents
                return self._tools_post(payload)
            elif isinstance(upload_target, DirectoryUploadTarget):
                tar_path = upload_target.tar_path
                upload_payload = _upload_payload(
                    history_id,
                    file_type="tar",
                )
                upload_payload["inputs"]["files_0|auto_decompress"] = False
                _attach_file(upload_payload, tar_path)
                tar_upload_response = self._tools_post(upload_payload, files_attached=files_attached[0])
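                # Directories need two jobs on the legacy path: the tar is
                # uploaded as a plain dataset first, then unpacked into a
                # directory by the CONVERTER_tar_to_directory tool below.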
                convert_payload = dict(
                    tool_id="CONVERTER_tar_to_directory",
                    tool_inputs={"input1": {"src": "hda", "id": tar_upload_response["outputs"][0]["id"]}},
                    history_id=history_id,
                )
                convert_response = self._tools_post(convert_payload)
                assert "outputs" in convert_response, convert_response
                return convert_response
            else:
                content = json.dumps(upload_target.object)
                payload = _upload_payload(history_id, file_type="expression.json")
                payload["inputs"]["files_0|url_paste"] = content
                return self._tools_post(payload)

        def create_collection_func(element_identifiers, collection_type):
            payload = {
                "name": "dataset collection",
                "instance_type": "history",
                "history_id": history_id,
                "element_identifiers": element_identifiers,
                "collection_type": collection_type,
                "fields": None if collection_type != "record" else "auto",
            }
            return self._post("dataset_collections", payload)

        if job_path is not None:
            assert job is None
            with open(job_path) as f:
                job = yaml.safe_load(f)
            job_dir = os.path.dirname(os.path.abspath(job_path))
        else:
            assert job is not None
            # Figure out what "." should be here instead.
            job_dir = "."

        if self.use_fetch_api:
            upload = upload_func_fetch
        else:
            upload = upload_func
        job_dict, datasets = galactic_job_json(
            job,
            job_dir,
            upload,
            create_collection_func,
            tool_or_workflow,
        )
        return job_dict, datasets

    # extension point for planemo to override logging
    def _log(self, message):
        log.debug(message)

    @abc.abstractproperty
    def use_fetch_api(self):
        """Return true if this client should use the (modern) data fetch API."""
class InteractorStaging(StagingInterace):

    def __init__(self, galaxy_interactor, use_fetch_api=DEFAULT_USE_FETCH_API):
        self.galaxy_interactor = galaxy_interactor
        self._use_fetch_api = use_fetch_api

    def _post(self, api_path, payload, files_attached=False):
        response = self.galaxy_interactor._post(api_path, payload, json=True)
        assert response.status_code == 200, response.text
        return response.json()

    def _handle_job(self, job_response):
        self.galaxy_interactor.wait_for_job(job_response["id"])

    @property
    def use_fetch_api(self):
        return self._use_fetch_api


def _file_path_to_name(file_path):
    if file_path is not None:
        name = os.path.basename(file_path)
    else:
        name = "defaultname"
    return name


def _upload_payload(history_id, tool_id=UPLOAD_TOOL_ID, file_type=DEFAULT_FILE_TYPE, dbkey=DEFAULT_DBKEY, **kwd):
    """Adapted from the bioblend tools client."""
    payload = {}
    payload["history_id"] = history_id
    payload["tool_id"] = tool_id
    tool_input = {}
    tool_input["file_type"] = file_type
    tool_input["dbkey"] = dbkey
    if not kwd.get('to_posix_lines', True):
        tool_input['files_0|to_posix_lines'] = False
    elif kwd.get('space_to_tab', False):
        tool_input['files_0|space_to_tab'] = 'Yes'
    if 'file_name' in kwd:
        tool_input["files_0|NAME"] = kwd['file_name']
    tool_input["files_0|type"] = "upload_dataset"
    payload["inputs"] = tool_input
    payload["__files"] = {}
    return payload
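# With defaults and no extra keyword arguments, the payload above comes out as:
#     {"history_id": history_id, "tool_id": "upload1", "__files": {},
#      "inputs": {"file_type": "auto", "dbkey": "?", "files_0|type": "upload_dataset"}}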


def _fetch_payload(history_id, file_type=DEFAULT_FILE_TYPE, dbkey=DEFAULT_DBKEY, **kwd):
    element = {
        "ext": file_type,
        "dbkey": dbkey,
    }
    for arg in ['to_posix_lines', 'space_to_tab']:
        if arg in kwd:
            element[arg] = kwd[arg]
    if 'file_name' in kwd:
        element['name'] = kwd['file_name']
    target = {
        "destination": {"type": "hdas"},
        "elements": [element],
        "auto_decompress": False,
    }
    targets = [target]
    payload = {
        "history_id": history_id,
        "targets": targets,
        "__files": {},
    }
    return payload
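# With defaults, the fetch payload above comes out as:
#     {"history_id": history_id, "__files": {},
#      "targets": [{"destination": {"type": "hdas"}, "auto_decompress": False,
#                   "elements": [{"ext": "auto", "dbkey": "?"}]}]}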