Source code for galaxy_test.base.populators
import contextlib
import json
import os
import random
import string
import unittest
from collections import namedtuple
from functools import wraps
from operator import itemgetter
try:
from nose.tools import nottest
except ImportError:
def nottest(x):
return x
import requests
import yaml
from gxformat2 import (
convert_and_import_workflow,
ImporterGalaxyInterface,
)
from gxformat2._yaml import ordered_load
from pkg_resources import resource_string
from six import StringIO
from galaxy.tool_util.client.staging import InteractorStaging
from galaxy.tool_util.verify.test_data import TestDataResolver
from galaxy.tool_util.verify.wait import (
TimeoutAssertionError,
wait_on as tool_util_wait_on,
)
from galaxy.util import unicodify
from . import api_asserts
# Simple workflow that takes an input and calls the cat wrapper on it.
workflow_str = unicodify(resource_string(__name__, "data/test_workflow_1.ga"))
# Simple workflow that takes an input and filters it with random lines twice in
# a row - first grabbing 8 lines at random and then 6.
workflow_random_x2_str = unicodify(resource_string(__name__, "data/test_workflow_2.ga"))
DEFAULT_TIMEOUT = 60 # Secs to wait for state to turn ok
SKIP_FLAKEY_TESTS_ON_ERROR = os.environ.get("GALAXY_TEST_SKIP_FLAKEY_TESTS_ON_ERROR", None)
[docs]def flakey(method):
@wraps(method)
def wrapped_method(test_case, *args, **kwargs):
try:
method(test_case, *args, **kwargs)
except unittest.SkipTest:
raise
except Exception:
if SKIP_FLAKEY_TESTS_ON_ERROR:
raise unittest.SkipTest("Error encountered during test marked as @flakey.")
else:
raise
return wrapped_method
[docs]def skip_without_tool(tool_id):
"""Decorate an API test method as requiring a specific tool.
Have test framework skip the test case if the tool is unavailable.
"""
def method_wrapper(method):
def get_tool_ids(api_test_case):
index = api_test_case.galaxy_interactor.get("tools", data=dict(in_panel=False))
tools = index.json()
# Request the flat (non-panel) tool listing so ids can be read directly.
tool_ids = [itemgetter("id")(_) for _ in tools]
return tool_ids
@wraps(method)
def wrapped_method(api_test_case, *args, **kwargs):
_raise_skip_if(tool_id not in get_tool_ids(api_test_case))
return method(api_test_case, *args, **kwargs)
return wrapped_method
return method_wrapper
[docs]def skip_without_datatype(extension):
"""Decorate an API test method as requiring a specific datatype.
Have test framework skip the test case if the datatype is unavailable.
"""
def has_datatype(api_test_case):
index_response = api_test_case.galaxy_interactor.get("datatypes")
assert index_response.status_code == 200, "Failed to fetch datatypes for target Galaxy."
datatypes = index_response.json()
assert isinstance(datatypes, list)
return extension in datatypes
def method_wrapper(method):
@wraps(method)
def wrapped_method(api_test_case, *args, **kwargs):
_raise_skip_if(not has_datatype(api_test_case))
method(api_test_case, *args, **kwargs)
return wrapped_method
return method_wrapper
[docs]def skip_if_site_down(url):
def site_down():
try:
response = requests.get(url, timeout=10)
return response.status_code != 200
except Exception:
# A failed request (e.g. a connection error) also indicates the site is down.
return True
def method_wrapper(method):
@wraps(method)
def wrapped_method(api_test_case, *args, **kwargs):
_raise_skip_if(site_down(), "Test depends on [%s] being up and it appears to be down." % url)
method(api_test_case, *args, **kwargs)
return wrapped_method
return method_wrapper
skip_if_toolshed_down = skip_if_site_down("https://toolshed.g2.bx.psu.edu")
skip_if_github_down = skip_if_site_down("https://github.com/")
[docs]def summarize_instance_history_on_error(method):
@wraps(method)
def wrapped_method(api_test_case, *args, **kwds):
try:
method(api_test_case, *args, **kwds)
except Exception:
api_test_case.dataset_populator._summarize_history(api_test_case.history_id)
raise
return wrapped_method
[docs]@nottest
def uses_test_history(**test_history_kwd):
"""Can override require_new and cancel_executions using kwds to decorator.
"""
def method_wrapper(method):
@wraps(method)
def wrapped_method(api_test_case, *args, **kwds):
with api_test_case.dataset_populator.test_history(**test_history_kwd) as history_id:
method(api_test_case, history_id, *args, **kwds)
return wrapped_method
return method_wrapper
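# Illustrative sketch (not part of the original module): how the decorators
# above are typically combined on an API test method. The real API test base
# class supplies ``galaxy_interactor`` and ``dataset_populator``; a minimal
# constructor stands in for it here so the sketch stays self-contained.
class _ExampleDecoratedTests:

    def __init__(self, galaxy_interactor):
        self.galaxy_interactor = galaxy_interactor
        self.dataset_populator = DatasetPopulator(galaxy_interactor)

    @skip_without_tool("cat1")
    @flakey
    @uses_test_history(require_new=False)
    def test_cat1_on_simple_input(self, history_id):
        # uses_test_history injects history_id, skip_without_tool consults the
        # tool index, and flakey turns unexpected errors into skips when
        # GALAXY_TEST_SKIP_FLAKEY_TESTS_ON_ERROR is set.
        hda = self.dataset_populator.new_dataset(history_id, content="1\t2\t3")
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        return hda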
def _raise_skip_if(check, *args):
if check:
from nose.plugins.skip import SkipTest
raise SkipTest(*args)
# Deprecated mixin, use dataset populator instead.
# TODO: Rework existing tests to target DatasetPopulator in a setup method instead.
[docs]class TestsDatasets:
def _new_dataset(self, history_id, content='TestData123', **kwds):
return DatasetPopulator(self.galaxy_interactor).new_dataset(history_id, content=content, **kwds)
def _wait_for_history(self, history_id, assert_ok=False):
return DatasetPopulator(self.galaxy_interactor).wait_for_history(history_id, assert_ok=assert_ok)
def _new_history(self, **kwds):
return DatasetPopulator(self.galaxy_interactor).new_history(**kwds)
def _upload_payload(self, history_id, content, **kwds):
return DatasetPopulator(self.galaxy_interactor).upload_payload(history_id, content, **kwds)
def _run_tool_payload(self, tool_id, inputs, history_id, **kwds):
return DatasetPopulator(self.galaxy_interactor).run_tool_payload(tool_id, inputs, history_id, **kwds)
[docs]class BaseDatasetPopulator:
""" Abstract description of API operations optimized for testing
Galaxy - implementations must implement _get, _post and _delete.
"""
[docs] def new_dataset(self, history_id, content=None, wait=False, **kwds):
run_response = self.new_dataset_request(history_id, content=content, wait=wait, **kwds)
assert run_response.status_code == 200, "Failed to create new dataset with response: %s" % run_response.content
return run_response.json()["outputs"][0]
[docs] def new_dataset_request(self, history_id, content=None, wait=False, **kwds):
if content is None and "ftp_files" not in kwds:
content = "TestData123"
payload = self.upload_payload(history_id, content=content, **kwds)
run_response = self.tools_post(payload)
if wait:
self.wait_for_tool_run(history_id, run_response, assert_ok=kwds.get('assert_ok', True))
return run_response
[docs] def fetch(self, payload, assert_ok=True, timeout=DEFAULT_TIMEOUT, wait=None):
tool_response = self._post("tools/fetch", data=payload)
if wait is None:
wait = assert_ok
if wait:
job = self.check_run(tool_response)
self.wait_for_job(job["id"], timeout=timeout)
if assert_ok:
job = tool_response.json()["jobs"][0]
details = self.get_job_details(job["id"]).json()
assert details["state"] == "ok", details
return tool_response
[docs] def wait_for_tool_run(self, history_id, run_response, timeout=DEFAULT_TIMEOUT, assert_ok=True):
job = self.check_run(run_response)
self.wait_for_job(job["id"], timeout=timeout)
self.wait_for_history(history_id, assert_ok=assert_ok, timeout=timeout)
return run_response
[docs] def check_run(self, run_response):
run = run_response.json()
assert run_response.status_code == 200, run
job = run["jobs"][0]
return job
[docs] def wait_for_history(self, history_id, assert_ok=False, timeout=DEFAULT_TIMEOUT):
try:
return wait_on_state(lambda: self._get("histories/%s" % history_id), desc="history state", assert_ok=assert_ok, timeout=timeout)
except AssertionError:
self._summarize_history(history_id)
raise
[docs] def wait_for_history_jobs(self, history_id, assert_ok=False, timeout=DEFAULT_TIMEOUT):
def has_active_jobs():
active_jobs = self.active_history_jobs(history_id)
if len(active_jobs) == 0:
return True
else:
return None
try:
wait_on(has_active_jobs, "active jobs", timeout=timeout)
except TimeoutAssertionError as e:
jobs = self.history_jobs(history_id)
message = "Failed waiting on active jobs to complete, current jobs are [{}]. {}".format(jobs, e)
raise TimeoutAssertionError(message)
if assert_ok:
return self.wait_for_history(history_id, assert_ok=True, timeout=timeout)
[docs] def wait_for_job(self, job_id, assert_ok=False, timeout=DEFAULT_TIMEOUT):
return wait_on_state(lambda: self.get_job_details(job_id), desc="job state", assert_ok=assert_ok, timeout=timeout)
[docs] def get_job_details(self, job_id, full=False):
return self._get("jobs/{}?full={}".format(job_id, full))
[docs] def cancel_history_jobs(self, history_id, wait=True):
active_jobs = self.active_history_jobs(history_id)
for active_job in active_jobs:
self.cancel_job(active_job["id"])
[docs] def history_jobs(self, history_id):
query_params = {"history_id": history_id, "order_by": "create_time"}
jobs_response = self._get("jobs", query_params)
assert jobs_response.status_code == 200
return jobs_response.json()
[docs] def active_history_jobs(self, history_id):
all_history_jobs = self.history_jobs(history_id)
active_jobs = [j for j in all_history_jobs if j["state"] in ["new", "upload", "waiting", "queued", "running"]]
return active_jobs
[docs] def delete_dataset(self, history_id, content_id):
delete_response = self._delete("histories/{}/contents/{}".format(history_id, content_id))
return delete_response
[docs] def create_tool_from_path(self, tool_path):
tool_directory = os.path.dirname(os.path.abspath(tool_path))
payload = dict(
src="from_path",
path=tool_path,
tool_directory=tool_directory,
)
return self._create_tool_raw(payload)
[docs] def create_tool(self, representation, tool_directory=None):
if isinstance(representation, dict):
representation = json.dumps(representation)
payload = dict(
representation=representation,
tool_directory=tool_directory,
)
return self._create_tool_raw(payload)
def _create_tool_raw(self, payload):
try:
create_response = self._post("dynamic_tools", data=payload, admin=True)
except TypeError:
create_response = self._post("dynamic_tools", data=payload)
assert create_response.status_code == 200, create_response.text
return create_response.json()
[docs] def list_dynamic_tools(self):
list_response = self._get("dynamic_tools", admin=True)
assert list_response.status_code == 200, list_response
return list_response.json()
[docs] def show_dynamic_tool(self, uuid):
show_response = self._get("dynamic_tools/%s" % uuid, admin=True)
assert show_response.status_code == 200, show_response
return show_response.json()
[docs] def deactivate_dynamic_tool(self, uuid):
delete_response = self._delete("dynamic_tools/%s" % uuid, admin=True)
return delete_response.json()
def _summarize_history(self, history_id):
pass
[docs] @contextlib.contextmanager
def test_history(self, **kwds):
cleanup = "GALAXY_TEST_NO_CLEANUP" not in os.environ
def wrap_up():
cancel_executions = kwds.get("cancel_executions", True)
if cleanup and cancel_executions:
self.cancel_history_jobs(history_id)
require_new = kwds.get("require_new", True)
try:
history_id = None
if not require_new:
history_id = kwds.get("GALAXY_TEST_HISTORY_ID", None)
history_id = history_id or self.new_history()
yield history_id
wrap_up()
except Exception:
self._summarize_history(history_id)
wrap_up()
raise
[docs] def new_history(self, **kwds):
name = kwds.get("name", "API Test History")
create_history_response = self._post("histories", data=dict(name=name))
assert "id" in create_history_response.json(), create_history_response.text
history_id = create_history_response.json()["id"]
return history_id
[docs] def upload_payload(self, history_id, content=None, **kwds):
name = kwds.get("name", "Test_Dataset")
dbkey = kwds.get("dbkey", "?")
file_type = kwds.get("file_type", 'txt')
upload_params = {
'files_0|NAME': name,
'dbkey': dbkey,
'file_type': file_type,
}
if dbkey is None:
del upload_params["dbkey"]
if content is None:
upload_params["files_0|ftp_files"] = kwds.get("ftp_files")
elif hasattr(content, 'read'):
upload_params["files_0|file_data"] = content
else:
upload_params['files_0|url_paste'] = content
if "to_posix_lines" in kwds:
upload_params["files_0|to_posix_lines"] = kwds["to_posix_lines"]
if "space_to_tab" in kwds:
upload_params["files_0|space_to_tab"] = kwds["space_to_tab"]
if "auto_decompress" in kwds:
upload_params["files_0|auto_decompress"] = kwds["auto_decompress"]
upload_params.update(kwds.get("extra_inputs", {}))
return self.run_tool_payload(
tool_id='upload1',
inputs=upload_params,
history_id=history_id,
upload_type='upload_dataset'
)
[docs] def get_remote_files(self, target="ftp"):
return self._get("remote_files", data={"target": target}).json()
[docs] def run_tool_payload(self, tool_id, inputs, history_id, **kwds):
# Remove files_%d|file_data parameters from inputs dict and attach
# as __files dictionary.
for key, value in list(inputs.items()):
if key.startswith("files_") and key.endswith("|file_data"):
if "__files" not in kwds:
kwds["__files"] = {}
kwds["__files"][key] = value
del inputs[key]
return dict(
tool_id=tool_id,
inputs=json.dumps(inputs),
history_id=history_id,
**kwds
)
[docs] def run_tool(self, tool_id, inputs, history_id, assert_ok=True, **kwds):
payload = self.run_tool_payload(tool_id, inputs, history_id, **kwds)
tool_response = self.tools_post(payload)
if assert_ok:
api_asserts.assert_status_code_is(tool_response, 200)
return tool_response.json()
else:
return tool_response
[docs] def tools_post(self, payload, url="tools"):
tool_response = self._post(url, data=payload)
return tool_response
[docs] def get_history_dataset_content(self, history_id, wait=True, filename=None, type='text', raw=False, **kwds):
dataset_id = self.__history_content_id(history_id, wait=wait, **kwds)
data = {}
if filename:
data["filename"] = filename
if raw:
data['raw'] = True
display_response = self._get_contents_request(history_id, "/%s/display" % dataset_id, data=data)
assert display_response.status_code == 200, display_response.text
if type == 'text':
return display_response.text
else:
return display_response.content
[docs] def get_history_dataset_details(self, history_id, **kwds):
dataset_id = self.__history_content_id(history_id, **kwds)
details_response = self._get_contents_request(history_id, "/datasets/%s" % dataset_id)
assert details_response.status_code == 200
return details_response.json()
[docs] def get_history_dataset_extra_files(self, history_id, **kwds):
dataset_id = self.__history_content_id(history_id, **kwds)
details_response = self._get_contents_request(history_id, "/%s/extra_files" % dataset_id)
assert details_response.status_code == 200, details_response.content
return details_response.json()
[docs] def get_history_collection_details(self, history_id, **kwds):
hdca_id = self.__history_content_id(history_id, **kwds)
details_response = self._get_contents_request(history_id, "/dataset_collections/%s" % hdca_id)
assert details_response.status_code == 200, details_response.content
return details_response.json()
[docs] def run_collection_creates_list(self, history_id, hdca_id):
inputs = {
"input1": {"src": "hdca", "id": hdca_id},
}
self.wait_for_history(history_id, assert_ok=True)
return self.run_tool("collection_creates_list", inputs, history_id)
[docs] def run_exit_code_from_file(self, history_id, hdca_id):
exit_code_inputs = {
"input": {'batch': True, 'values': [{"src": "hdca", "id": hdca_id}]},
}
response = self.run_tool("exit_code_from_file", exit_code_inputs, history_id, assert_ok=False).json()
self.wait_for_history(history_id, assert_ok=False)
return response
def __history_content_id(self, history_id, wait=True, **kwds):
if wait:
assert_ok = kwds.get("assert_ok", True)
self.wait_for_history(history_id, assert_ok=assert_ok)
# kwds may supply a 'dataset' object, a 'dataset_id', a 'content_id' or an
# 'hid'; otherwise the last dataset in the history is fetched.
if "dataset_id" in kwds:
history_content_id = kwds["dataset_id"]
elif "content_id" in kwds:
history_content_id = kwds["content_id"]
elif "dataset" in kwds:
history_content_id = kwds["dataset"]["id"]
else:
hid = kwds.get("hid", None) # If not hid, just grab last dataset
history_contents = self._get_contents_request(history_id).json()
if hid:
history_content_id = None
for history_item in history_contents:
if history_item["hid"] == hid:
history_content_id = history_item["id"]
if history_content_id is None:
raise Exception("Could not find content with HID [{}] in [{}]".format(hid, history_contents))
else:
# No hid specified - just grab most recent element.
history_content_id = history_contents[-1]["id"]
return history_content_id
def _get_contents_request(self, history_id, suffix="", data=None):
if data is None:
data = {}
url = "histories/%s/contents" % history_id
if suffix:
url = "{}{}".format(url, suffix)
return self._get(url, data=data)
[docs] def ds_entry(self, history_content):
src = 'hda'
if 'history_content_type' in history_content and history_content['history_content_type'] == "dataset_collection":
src = 'hdca'
return dict(src=src, id=history_content["id"])
[docs] def get_roles(self):
roles_response = self.galaxy_interactor.get("roles", admin=True)
assert roles_response.status_code == 200
return roles_response.json()
[docs] def user_email(self):
users_response = self.galaxy_interactor.get("users")
users = users_response.json()
assert len(users) == 1
return users[0]["email"]
[docs] def user_id(self):
users_response = self.galaxy_interactor.get("users")
users = users_response.json()
assert len(users) == 1
return users[0]["id"]
[docs] def user_private_role_id(self):
user_email = self.user_email()
roles = self.get_roles()
users_roles = [r for r in roles if r["name"] == user_email]
assert len(users_roles) == 1
return users_roles[0]["id"]
[docs] def create_role(self, user_ids, description=None):
payload = {
"name": self.get_random_name(prefix="testpop"),
"description": description or "Test Role",
"user_ids": json.dumps(user_ids),
}
role_response = self.galaxy_interactor.post("roles", data=payload, admin=True)
assert role_response.status_code == 200
return role_response.json()[0]
[docs] def make_private(self, history_id, dataset_id):
role_id = self.user_private_role_id()
# Restrict access and grant manage permission to the user's private role.
payload = {
"access": json.dumps([role_id]),
"manage": json.dumps([role_id]),
}
url = "histories/{}/contents/{}/permissions".format(history_id, dataset_id)
update_response = self.galaxy_interactor._put(url, payload, admin=True)
assert update_response.status_code == 200, update_response.content
return update_response.json()
[docs] def validate_dataset(self, history_id, dataset_id):
url = "histories/{}/contents/{}/validate".format(history_id, dataset_id)
update_response = self.galaxy_interactor._put(url, {})
assert update_response.status_code == 200, update_response.content
return update_response.json()
[docs] def validate_dataset_and_wait(self, history_id, dataset_id):
self.validate_dataset(history_id, dataset_id)
def validated():
metadata = self.get_history_dataset_details(history_id, dataset_id=dataset_id)
validated_state = metadata['validated_state']
if validated_state == 'unknown':
return
else:
return validated_state
return wait_on(
validated,
"dataset validation"
)
[docs] def export_url(self, history_id, data, check_download=True):
url = "histories/%s/exports" % history_id
put_response = self._put(url, data)
api_asserts.assert_status_code_is(put_response, 202)
def export_ready_response():
put_response = self._put(url)
if put_response.status_code == 202:
return None
return put_response
put_response = wait_on(export_ready_response, desc="export ready")
api_asserts.assert_status_code_is(put_response, 200)
response = put_response.json()
api_asserts.assert_has_keys(response, "download_url")
download_url = response["download_url"]
if check_download:
self.get_export_url(download_url)
return download_url
[docs] def get_export_url(self, export_url):
full_download_url = "{}?key={}".format(export_url, self._api_key)
download_response = self._get(full_download_url)
api_asserts.assert_status_code_is(download_response, 200)
return download_response
[docs] def import_history(self, import_data):
files = {}
archive_file = import_data.pop("archive_file", None)
if archive_file:
files["archive_file"] = archive_file
import_response = self._post("histories", data=import_data, files=files)
api_asserts.assert_status_code_is(import_response, 200)
[docs] def import_history_and_wait_for_name(self, import_data, history_name):
def history_names():
return {h["name"]: h for h in self.get_histories()}
import_name = "imported from archive: %s" % history_name
assert import_name not in history_names()
self.import_history(import_data)
def has_history_with_name():
histories = history_names()
return histories.get(import_name, None)
imported_history = wait_on(has_history_with_name, desc="import history")
imported_history_id = imported_history["id"]
self.wait_for_history(imported_history_id)
return imported_history_id
[docs] def rename_history(self, history_id, new_name):
update_url = "histories/%s" % history_id
put_response = self._put(update_url, {"name": new_name})
return put_response
[docs] def get_histories(self):
history_index_response = self._get("histories")
api_asserts.assert_status_code_is(history_index_response, 200)
return history_index_response.json()
[docs] def wait_on_history_length(self, history_id, wait_on_history_length):
def history_has_length():
history_length = self.history_length(history_id)
return None if history_length != wait_on_history_length else True
wait_on(history_has_length, desc="import history population")
[docs] def history_length(self, history_id):
contents_response = self._get("histories/%s/contents" % history_id)
api_asserts.assert_status_code_is(contents_response, 200)
contents = contents_response.json()
return len(contents)
[docs] def reimport_history(self, history_id, history_name, wait_on_history_length, export_kwds, url, api_key):
# Export the history.
download_path = self.export_url(history_id, export_kwds, check_download=True)
# Create download for history
full_download_url = "{}{}?key={}".format(url, download_path, api_key)
import_data = dict(archive_source=full_download_url, archive_type="url")
imported_history_id = self.import_history_and_wait_for_name(import_data, history_name)
if wait_on_history_length:
self.wait_on_history_length(imported_history_id, wait_on_history_length)
return imported_history_id
[docs] def get_random_name(self, prefix=None, suffix=None, len=10):
# stolen from navigates_galaxy.py
return '{}{}{}'.format(
prefix or '',
''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(len)),
suffix or '',
)
[docs]class DatasetPopulator(BaseDatasetPopulator):
@property
def _api_key(self):
return self.galaxy_interactor.api_key
def _post(self, route, data=None, files=None, admin=False):
return self.galaxy_interactor.post(route, data, files=files, admin=admin)
def _put(self, route, data=None):
return self.galaxy_interactor.put(route, data)
def _get(self, route, data=None, admin=False):
if data is None:
data = {}
return self.galaxy_interactor.get(route, data=data, admin=admin)
def _delete(self, route, data=None, admin=False):
if data is None:
data = {}
return self.galaxy_interactor.delete(route, data=data, admin=admin)
def _summarize_history(self, history_id):
self.galaxy_interactor._summarize_history(history_id)
[docs] def wait_for_dataset(self, history_id, dataset_id, assert_ok=False, timeout=DEFAULT_TIMEOUT):
return wait_on_state(lambda: self._get("histories/{}/contents/{}".format(history_id, dataset_id)), desc="dataset state", assert_ok=assert_ok, timeout=timeout)
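# Illustrative sketch (not part of the original module): minimal end-to-end use
# of DatasetPopulator, assuming ``galaxy_interactor`` is the API interactor
# object the test framework normally provides.
def _example_dataset_populator_usage(galaxy_interactor):
    populator = DatasetPopulator(galaxy_interactor)
    with populator.test_history() as history_id:
        # new_dataset() builds an upload1 payload and waits for the job when wait=True
        hda = populator.new_dataset(history_id, content="1 2 3", wait=True)
        # read the uploaded content back through the history contents API
        return populator.get_history_dataset_content(history_id, dataset_id=hda["id"])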
[docs]class BaseWorkflowPopulator:
[docs] def load_workflow(self, name, content=workflow_str, add_pja=False):
workflow = json.loads(content)
workflow["name"] = name
if add_pja:
tool_step = workflow["steps"]["2"]
tool_step["post_job_actions"]["RenameDatasetActionout_file1"] = dict(
action_type="RenameDatasetAction",
output_name="out_file1",
action_arguments=dict(newname="foo ${replaceme}"),
)
return workflow
[docs] def load_random_x2_workflow(self, name):
return self.load_workflow(name, content=workflow_random_x2_str)
[docs] def load_workflow_from_resource(self, name, filename=None):
if filename is None:
filename = "data/%s.ga" % name
content = unicodify(resource_string(__name__, filename))
return self.load_workflow(name, content=content)
[docs] def simple_workflow(self, name, **create_kwds):
workflow = self.load_workflow(name)
return self.create_workflow(workflow, **create_kwds)
[docs] def import_workflow_from_path(self, from_path):
data = dict(
from_path=from_path
)
import_response = self._post("workflows", data=data)
api_asserts.assert_status_code_is(import_response, 200)
return import_response.json()["id"]
[docs] def create_workflow(self, workflow, **create_kwds):
upload_response = self.create_workflow_response(workflow, **create_kwds)
uploaded_workflow_id = upload_response.json()["id"]
return uploaded_workflow_id
[docs] def create_workflow_response(self, workflow, **create_kwds):
data = dict(
workflow=json.dumps(workflow),
**create_kwds
)
upload_response = self._post("workflows/upload", data=data)
return upload_response
[docs] def upload_yaml_workflow(self, has_yaml, **kwds):
round_trip_conversion = kwds.get("round_trip_format_conversion", False)
client_convert = kwds.pop("client_convert", not round_trip_conversion)
kwds["convert"] = client_convert
workflow = convert_and_import_workflow(has_yaml, galaxy_interface=self, **kwds)
workflow_id = workflow["id"]
if round_trip_conversion:
workflow_yaml_wrapped = self.download_workflow(workflow_id, style="format2_wrapped_yaml")
assert "yaml_content" in workflow_yaml_wrapped, workflow_yaml_wrapped
round_trip_converted_content = workflow_yaml_wrapped["yaml_content"]
workflow_id = self.upload_yaml_workflow(round_trip_converted_content, client_convert=False, round_trip_conversion=False)
return workflow_id
[docs] def wait_for_invocation(self, workflow_id, invocation_id, timeout=DEFAULT_TIMEOUT):
url = "workflows/{}/usage/{}".format(workflow_id, invocation_id)
return wait_on_state(lambda: self._get(url), desc="workflow invocation state", timeout=timeout)
[docs] def history_invocations(self, history_id):
history_invocations_response = self._get("invocations", {"history_id": history_id})
api_asserts.assert_status_code_is(history_invocations_response, 200)
return history_invocations_response.json()
[docs] def wait_for_history_workflows(self, history_id, assert_ok=True, timeout=DEFAULT_TIMEOUT, expected_invocation_count=None):
if expected_invocation_count is not None:
def invocation_count():
invocations = self.history_invocations(history_id)
if len(invocations) == expected_invocation_count:
return True
wait_on(invocation_count, "%s history invocations" % expected_invocation_count)
for invocation in self.history_invocations(history_id):
workflow_id = invocation["workflow_id"]
invocation_id = invocation["id"]
self.wait_for_workflow(workflow_id, invocation_id, history_id, timeout=timeout, assert_ok=assert_ok)
[docs] def wait_for_workflow(self, workflow_id, invocation_id, history_id, assert_ok=True, timeout=DEFAULT_TIMEOUT):
""" Wait for a workflow invocation to completely schedule and then history
to be complete. """
self.wait_for_invocation(workflow_id, invocation_id, timeout=timeout)
self.dataset_populator.wait_for_history_jobs(history_id, assert_ok=assert_ok, timeout=timeout)
[docs] def get_invocation(self, invocation_id):
r = self._get("invocations/%s" % invocation_id)
r.raise_for_status()
return r.json()
[docs] def get_biocompute_object(self, invocation_id):
bco_response = self._get("invocations/%s/biocompute" % invocation_id)
bco_response.raise_for_status()
return bco_response.json()
[docs] def validate_biocompute_object(self, bco, expected_schema_version='https://w3id.org/ieee/ieee-2791-schema/2791object.json'):
# TODO: actually use jsonref and jsonschema to validate this someday
api_asserts.assert_has_keys(bco, "object_id", "spec_version", "etag", "provenance_domain", "usability_domain", "description_domain", "execution_domain", "parametric_domain", "io_domain", "error_domain")
assert bco['spec_version'] == expected_schema_version
api_asserts.assert_has_keys(bco['description_domain'], "keywords", "xref", "platform", "pipeline_steps")
api_asserts.assert_has_keys(bco['execution_domain'], "script_access_type", "script", "script_driver", "software_prerequisites", "external_data_endpoints", "environment_variables")
for p in bco['parametric_domain']:
api_asserts.assert_has_keys(p, "param", "value", "step")
api_asserts.assert_has_keys(bco['io_domain'], "input_subdomain", "output_subdomain")
[docs] def invoke_workflow_raw(self, workflow_id, request):
url = "workflows/%s/usage" % (workflow_id)
invocation_response = self._post(url, data=request)
return invocation_response
[docs] def invoke_workflow(self, history_id, workflow_id, inputs=None, request=None, assert_ok=True):
if inputs is None:
inputs = {}
if request is None:
request = {}
request["history"] = "hist_id=%s" % history_id,
if inputs:
request["inputs"] = json.dumps(inputs)
request["inputs_by"] = 'step_index'
invocation_response = self.invoke_workflow_raw(workflow_id, request)
if assert_ok:
api_asserts.assert_status_code_is(invocation_response, 200)
invocation_id = invocation_response.json()["id"]
return invocation_id
else:
return invocation_response
[docs] def workflow_report_json(self, workflow_id, invocation_id):
response = self._get("workflows/{}/invocations/{}/report".format(workflow_id, invocation_id))
api_asserts.assert_status_code_is(response, 200)
return response.json()
[docs] def download_workflow(self, workflow_id, style=None):
params = {}
if style is not None:
params["style"] = style
response = self._get("workflows/%s/download" % workflow_id, data=params)
api_asserts.assert_status_code_is(response, 200)
if style != "format2":
return response.json()
else:
return ordered_load(response.text)
[docs] def update_workflow(self, workflow_id, workflow_object):
data = dict(
workflow=workflow_object
)
raw_url = 'workflows/%s' % workflow_id
put_response = self.galaxy_interactor._put(raw_url, data=json.dumps(data))
return put_response
[docs] @contextlib.contextmanager
def export_for_update(self, workflow_id):
workflow_object = self.download_workflow(workflow_id)
yield workflow_object
self.update_workflow(workflow_id, workflow_object)
[docs] def run_workflow(self, has_workflow, test_data=None, history_id=None, wait=True, source_type=None, jobs_descriptions=None, expected_response=200, assert_ok=True, client_convert=None, round_trip_format_conversion=False, raw_yaml=False):
"""High-level wrapper around workflow API, etc. to invoke format 2 workflows."""
workflow_populator = self
if client_convert is None:
client_convert = not round_trip_format_conversion
workflow_id = workflow_populator.upload_yaml_workflow(has_workflow, source_type=source_type, client_convert=client_convert, round_trip_format_conversion=round_trip_format_conversion, raw_yaml=raw_yaml)
if test_data is None:
if jobs_descriptions is None:
assert source_type != "path"
jobs_descriptions = yaml.safe_load(has_workflow)
test_data = jobs_descriptions.get("test_data", {})
if not isinstance(test_data, dict):
test_data = yaml.safe_load(test_data)
parameters = test_data.pop('step_parameters', {})
replacement_parameters = test_data.pop("replacement_parameters", {})
inputs, label_map, has_uploads = load_data_dict(history_id, test_data, self.dataset_populator, self.dataset_collection_populator)
workflow_request = dict(
history="hist_id=%s" % history_id,
workflow_id=workflow_id,
)
workflow_request["inputs"] = json.dumps(label_map)
workflow_request["inputs_by"] = 'name'
if parameters:
workflow_request["parameters"] = json.dumps(parameters)
workflow_request["parameters_normalized"] = True
if replacement_parameters:
workflow_request["replacement_params"] = json.dumps(replacement_parameters)
if has_uploads:
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
invocation_response = workflow_populator.invoke_workflow_raw(workflow_id, workflow_request)
api_asserts.assert_status_code_is(invocation_response, expected_response)
invocation = invocation_response.json()
if expected_response != 200:
assert not assert_ok
return invocation
invocation_id = invocation.get('id')
if invocation_id:
# Wait for the workflow to become fully scheduled and then for all jobs to
# complete.
if wait:
workflow_populator.wait_for_workflow(workflow_id, invocation_id, history_id, assert_ok=assert_ok)
jobs = self.dataset_populator.history_jobs(history_id)
return RunJobsSummary(
history_id=history_id,
workflow_id=workflow_id,
invocation_id=invocation_id,
inputs=inputs,
jobs=jobs,
invocation=invocation,
workflow_request=workflow_request
)
[docs] def dump_workflow(self, workflow_id, style=None):
raw_workflow = self.download_workflow(workflow_id, style=style)
if style == "format2_wrapped_yaml":
print(raw_workflow["yaml_content"])
else:
print(json.dumps(raw_workflow, sort_keys=True, indent=2))
RunJobsSummary = namedtuple('RunJobsSummary', ['history_id', 'workflow_id', 'invocation_id', 'inputs', 'jobs', 'invocation', 'workflow_request'])
[docs]class WorkflowPopulator(BaseWorkflowPopulator, ImporterGalaxyInterface):
[docs] def __init__(self, galaxy_interactor):
self.galaxy_interactor = galaxy_interactor
self.dataset_populator = DatasetPopulator(galaxy_interactor)
self.dataset_collection_populator = DatasetCollectionPopulator(galaxy_interactor)
def _post(self, route, data=None, admin=False):
if data is None:
data = {}
return self.galaxy_interactor.post(route, data, admin=admin)
def _get(self, route, data=None):
if data is None:
data = {}
return self.galaxy_interactor.get(route, data=data)
# Required by the ImporterGalaxyInterface interface - so we can recursively
# import nested workflows.
[docs] def import_workflow(self, workflow, **kwds):
workflow_str = json.dumps(workflow, indent=4)
data = {
'workflow': workflow_str,
}
data.update(**kwds)
upload_response = self._post("workflows", data=data)
assert upload_response.status_code == 200, upload_response.content
return upload_response.json()
[docs] def import_tool(self, tool):
""" Import a workflow via POST /api/workflows or
comparable interface into Galaxy.
"""
upload_response = self._import_tool_response(tool)
assert upload_response.status_code == 200, upload_response
return upload_response.json()
def _import_tool_response(self, tool):
tool_str = json.dumps(tool, indent=4)
data = {
'representation': tool_str
}
upload_response = self._post("dynamic_tools", data=data, admin=True)
return upload_response
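# Illustrative sketch (not part of the original module): invoking run_workflow()
# with an inline Format 2 workflow whose ``test_data`` block describes the
# inputs. The tool id and test-data file name below are placeholders.
_EXAMPLE_FORMAT2_WORKFLOW = """
class: GalaxyWorkflow
inputs:
  the_input: data
steps:
  cat:
    tool_id: cat1
    in:
      input1: the_input
test_data:
  the_input:
    value: 1.txt
    type: File
"""


def _example_run_workflow(galaxy_interactor, history_id):
    workflow_populator = WorkflowPopulator(galaxy_interactor)
    summary = workflow_populator.run_workflow(_EXAMPLE_FORMAT2_WORKFLOW, history_id=history_id)
    # summary is a RunJobsSummary namedtuple carrying the invocation, jobs, etc.
    return summary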
[docs]class LibraryPopulator:
[docs] def __init__(self, galaxy_interactor):
self.galaxy_interactor = galaxy_interactor
self.dataset_populator = DatasetPopulator(galaxy_interactor)
[docs] def get_libraries(self):
get_response = self.galaxy_interactor.get("libraries")
return get_response.json()
[docs] def new_private_library(self, name):
library = self.new_library(name)
library_id = library["id"]
role_id = self.user_private_role_id()
self.set_permissions(library_id, role_id)
return library
[docs] def new_library(self, name):
data = dict(name=name)
create_response = self.galaxy_interactor.post("libraries", data=data, admin=True)
return create_response.json()
[docs] def set_permissions(self, library_id, role_id=None):
if role_id:
perm_list = json.dumps(role_id)
else:
perm_list = json.dumps([])
permissions = dict(
LIBRARY_ACCESS_in=perm_list,
LIBRARY_MODIFY_in=perm_list,
LIBRARY_ADD_in=perm_list,
LIBRARY_MANAGE_in=perm_list,
)
response = self.galaxy_interactor.post("libraries/%s/permissions" % library_id, data=permissions, admin=True)
api_asserts.assert_status_code_is(response, 200)
[docs] def user_email(self):
# deprecated - use DatasetPopulator
return self.dataset_populator.user_email()
[docs] def user_private_role_id(self):
# deprecated - use DatasetPopulator
return self.dataset_populator.user_private_role_id()
[docs] def create_dataset_request(self, library, **kwds):
upload_option = kwds.get("upload_option", "upload_file")
create_data = {
"folder_id": kwds.get("folder_id", library["root_folder_id"]),
"create_type": "file",
"files_0|NAME": kwds.get("name", "NewFile"),
"upload_option": upload_option,
"file_type": kwds.get("file_type", "auto"),
"db_key": kwds.get("db_key", "?"),
}
if kwds.get("link_data"):
create_data["link_data_only"] = "link_to_files"
if upload_option == "upload_file":
files = {
"files_0|file_data": kwds.get("file", StringIO(kwds.get("contents", "TestData"))),
}
elif upload_option == "upload_paths":
create_data["filesystem_paths"] = kwds["paths"]
files = {}
elif upload_option == "upload_directory":
create_data["server_dir"] = kwds["server_dir"]
files = {}
return create_data, files
[docs] def new_library_dataset(self, name, **create_dataset_kwds):
library = self.new_private_library(name)
payload, files = self.create_dataset_request(library, **create_dataset_kwds)
dataset = self.raw_library_contents_create(library["id"], payload, files=files).json()[0]
return self.wait_on_library_dataset(library, dataset)
[docs] def wait_on_library_dataset(self, library, dataset):
def show():
return self.galaxy_interactor.get("libraries/{}/contents/{}".format(library["id"], dataset["id"]))
wait_on_state(show, assert_ok=True, timeout=DEFAULT_TIMEOUT)
return show().json()
[docs] def raw_library_contents_create(self, library_id, payload, files=None):
if files is None:
files = {}
url_rel = "libraries/%s/contents" % library_id
return self.galaxy_interactor.post(url_rel, payload, files=files)
[docs] def show_ldda(self, library_id, library_dataset_id):
return self.galaxy_interactor.get("libraries/{}/contents/{}".format(library_id, library_dataset_id))
[docs] def new_library_dataset_in_private_library(self, library_name="private_dataset", wait=True):
library = self.new_private_library(library_name)
payload, files = self.create_dataset_request(library, file_type="txt", contents="create_test")
create_response = self.galaxy_interactor.post("libraries/%s/contents" % library["id"], payload, files=files)
api_asserts.assert_status_code_is(create_response, 200)
library_datasets = create_response.json()
assert len(library_datasets) == 1
library_dataset = library_datasets[0]
if wait:
def show():
return self.show_ldda(library["id"], library_dataset["id"])
wait_on_state(show, assert_ok=True)
library_dataset = show().json()
return library, library_dataset
[docs] def get_library_contents_with_path(self, library_id, path):
all_contents_response = self.galaxy_interactor.get("libraries/%s/contents" % library_id)
api_asserts.assert_status_code_is(all_contents_response, 200)
all_contents = all_contents_response.json()
matching = [c for c in all_contents if c["name"] == path]
if len(matching) == 0:
raise Exception("Failed to find library contents with path [{}], contents are {}".format(path, all_contents))
get_response = self.galaxy_interactor.get(matching[0]["url"])
api_asserts.assert_status_code_is(get_response, 200)
return get_response.json()
[docs] def setup_fetch_to_folder(self, test_name):
history_id = self.dataset_populator.new_history()
library = self.new_private_library(test_name)
folder_id = library["root_folder_id"][1:]
destination = {"type": "library_folder", "library_folder_id": folder_id}
return history_id, library, destination
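# Illustrative sketch (not part of the original module): creating a private
# library containing one uploaded text dataset, assuming ``galaxy_interactor``
# has admin credentials (library creation posts with admin=True).
def _example_private_library(galaxy_interactor):
    library_populator = LibraryPopulator(galaxy_interactor)
    library, library_dataset = library_populator.new_library_dataset_in_private_library("example library")
    return library["id"], library_dataset["id"]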
[docs]class BaseDatasetCollectionPopulator:
[docs] def create_list_from_pairs(self, history_id, pairs, name="Dataset Collection from pairs"):
return self.create_nested_collection(history_id=history_id,
collection=pairs,
collection_type='list:paired',
name=name)
[docs] def nested_collection_identifiers(self, history_id, collection_type):
rank_types = list(reversed(collection_type.split(":")))
assert len(rank_types) > 0
rank_type_0 = rank_types[0]
if rank_type_0 == "list":
identifiers = self.list_identifiers(history_id)
else:
identifiers = self.pair_identifiers(history_id)
nested_collection_type = rank_type_0
for i, rank_type in enumerate(reversed(rank_types[1:])):
name = "test_level_%d" % (i + 1) if rank_type == "list" else "paired"
identifiers = [dict(
src="new_collection",
name=name,
collection_type=nested_collection_type,
element_identifiers=identifiers,
)]
nested_collection_type = "{}:{}".format(rank_type, nested_collection_type)
return identifiers
[docs] def create_nested_collection(self, history_id, collection_type, name=None, collection=None, element_identifiers=None):
"""Create a nested collection either from collection or using collection_type)."""
assert collection_type is not None
name = name or "Test %s" % collection_type
if collection is not None:
assert element_identifiers is None
element_identifiers = []
for i, pair in enumerate(collection):
element_identifiers.append(dict(
name="test%d" % i,
src="hdca",
id=pair
))
if element_identifiers is None:
element_identifiers = self.nested_collection_identifiers(history_id, collection_type)
payload = dict(
instance_type="history",
history_id=history_id,
element_identifiers=json.dumps(element_identifiers),
collection_type=collection_type,
name=name,
)
return self.__create(payload)
[docs] def create_list_of_pairs_in_history(self, history_id, **kwds):
return self.upload_collection(history_id, "list:paired", elements=[
{
"name": "test0",
"elements": [
{"src": "pasted", "paste_content": "TestData123", "name": "forward"},
{"src": "pasted", "paste_content": "TestData123", "name": "reverse"},
]
}
])
[docs] def create_list_of_list_in_history(self, history_id, **kwds):
# create_nested_collection can generate a nested collection from plain datasets;
# this function instead builds one up recursively from history HDCAs.
collection_type = kwds.pop('collection_type', 'list:list')
collection_types = collection_type.split(':')
list = self.create_list_in_history(history_id, **kwds).json()['id']
current_collection_type = 'list'
for collection_type in collection_types[1:]:
current_collection_type = "{}:{}".format(current_collection_type, collection_type)
response = self.create_nested_collection(history_id=history_id,
collection_type=current_collection_type,
name=current_collection_type,
collection=[list])
list = response.json()['id']
return response
[docs] def create_pair_in_history(self, history_id, **kwds):
payload = self.create_pair_payload(
history_id,
instance_type="history",
**kwds
)
return self.__create(payload)
[docs] def create_list_in_history(self, history_id, **kwds):
payload = self.create_list_payload(
history_id,
instance_type="history",
**kwds
)
return self.__create(payload)
[docs] def upload_collection(self, history_id, collection_type, elements, **kwds):
payload = self.__create_payload_fetch(history_id, collection_type, contents=elements, **kwds)
return self.__create(payload)
[docs] def create_list_payload(self, history_id, **kwds):
return self.__create_payload(history_id, identifiers_func=self.list_identifiers, collection_type="list", **kwds)
[docs] def create_pair_payload(self, history_id, **kwds):
return self.__create_payload(history_id, identifiers_func=self.pair_identifiers, collection_type="paired", **kwds)
def __create_payload(self, *args, **kwds):
direct_upload = kwds.pop("direct_upload", False)
if direct_upload:
return self.__create_payload_fetch(*args, **kwds)
else:
return self.__create_payload_collection(*args, **kwds)
def __create_payload_fetch(self, history_id, collection_type, **kwds):
files = []
contents = None
if "contents" in kwds:
contents = kwds["contents"]
del kwds["contents"]
elements = []
if contents is None:
if collection_type == "paired":
contents = [("forward", "TestData123"), ("reverse", "TestData123")]
elif collection_type == "list":
contents = ["TestData123", "TestData123", "TestData123"]
else:
raise Exception("Unknown collection_type %s" % collection_type)
if isinstance(contents, list):
for i, contents_level in enumerate(contents):
# If given a full collection definition pass as is.
if isinstance(contents_level, dict):
elements.append(contents_level)
continue
element = {"src": "pasted", "ext": "txt"}
# Otherwise this is the older style - either bare contents or a tuple of
# (element identifier, contents) - convert it to the fetch API form.
if isinstance(contents_level, tuple):
# (element_identifier, contents)
element_identifier = contents_level[0]
dataset_contents = contents_level[1]
else:
dataset_contents = contents_level
if collection_type == "list":
element_identifier = "data%d" % i
elif collection_type == "paired" and i == 0:
element_identifier = "forward"
else:
element_identifier = "reverse"
element["name"] = element_identifier
element["paste_content"] = dataset_contents
elements.append(element)
name = kwds.get("name", "Test Dataset Collection")
files_request_part = {}
for i, content in enumerate(files):
files_request_part["files_%d|file_data" % i] = StringIO(content)
targets = [{
"destination": {"type": "hdca"},
"elements": elements,
"collection_type": collection_type,
"name": name,
}]
payload = dict(
history_id=history_id,
targets=json.dumps(targets),
__files=files_request_part,
)
return payload
[docs] def wait_for_fetched_collection(self, fetch_response):
self.dataset_populator.wait_for_job(fetch_response["jobs"][0]["id"], assert_ok=True)
initial_dataset_collection = fetch_response["outputs"][0]
dataset_collection = self.dataset_populator.get_history_collection_details(initial_dataset_collection["history_id"], hid=initial_dataset_collection["hid"])
return dataset_collection
def __create_payload_collection(self, history_id, identifiers_func, collection_type, **kwds):
contents = None
if "contents" in kwds:
contents = kwds["contents"]
del kwds["contents"]
if "element_identifiers" not in kwds:
kwds["element_identifiers"] = json.dumps(identifiers_func(history_id, contents=contents))
if "name" not in kwds:
kwds["name"] = "Test Dataset Collection"
payload = dict(
history_id=history_id,
collection_type=collection_type,
**kwds
)
return payload
[docs] def pair_identifiers(self, history_id, contents=None):
hda1, hda2 = self.__datasets(history_id, count=2, contents=contents)
element_identifiers = [
dict(name="forward", src="hda", id=hda1["id"]),
dict(name="reverse", src="hda", id=hda2["id"]),
]
return element_identifiers
[docs] def list_identifiers(self, history_id, contents=None):
count = 3 if contents is None else len(contents)
# Contents can be a list of strings (with name auto-assigned here) or a list of
# 2-tuples of form (name, dataset_content).
if contents and isinstance(contents[0], tuple):
hdas = self.__datasets(history_id, count=count, contents=[c[1] for c in contents])
def hda_to_identifier(i, hda):
return dict(name=contents[i][0], src="hda", id=hda["id"])
else:
hdas = self.__datasets(history_id, count=count, contents=contents)
def hda_to_identifier(i, hda):
return dict(name="data%d" % (i + 1), src="hda", id=hda["id"])
element_identifiers = [hda_to_identifier(i, hda) for (i, hda) in enumerate(hdas)]
return element_identifiers
def __create(self, payload):
# Create a collection - either from existing datasets using the collection
# creation API or from direct uploads with the fetch API. Dispatch on the
# "targets" keyword in the payload to decide which to use.
if "targets" not in payload:
return self._create_collection(payload)
else:
return self.dataset_populator.fetch(payload)
def __datasets(self, history_id, count, contents=None):
datasets = []
for i in range(count):
new_kwds = {}
if contents:
new_kwds["content"] = contents[i]
datasets.append(self.dataset_populator.new_dataset(history_id, **new_kwds))
return datasets
[docs] def wait_for_dataset_collection(self, create_payload, assert_ok=False, timeout=DEFAULT_TIMEOUT):
for element in create_payload["elements"]:
if element['element_type'] == 'hda':
self.dataset_populator.wait_for_dataset(history_id=element['object']['history_id'],
dataset_id=element['object']['id'],
assert_ok=assert_ok,
timeout=timeout)
elif element['element_type'] == 'dataset_collection':
self.wait_for_dataset_collection(element['object'], assert_ok=assert_ok, timeout=timeout)
[docs]class DatasetCollectionPopulator(BaseDatasetCollectionPopulator):
[docs] def __init__(self, galaxy_interactor):
self.galaxy_interactor = galaxy_interactor
self.dataset_populator = DatasetPopulator(galaxy_interactor)
def _create_collection(self, payload):
create_response = self.galaxy_interactor.post("dataset_collections", data=payload)
return create_response
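# Illustrative sketch (not part of the original module): building collections in
# a history with DatasetCollectionPopulator, assuming ``galaxy_interactor`` and
# an existing ``history_id``.
def _example_collections(galaxy_interactor, history_id):
    collection_populator = DatasetCollectionPopulator(galaxy_interactor)
    # a list of three pasted text datasets created through the fetch API
    fetch_response = collection_populator.create_list_in_history(
        history_id, contents=["a", "b", "c"], direct_upload=True
    ).json()
    list_hdca = collection_populator.wait_for_fetched_collection(fetch_response)
    # a forward/reverse pair built from two new datasets via the legacy collection API
    pair_response = collection_populator.create_pair_in_history(history_id)
    return list_hdca, pair_response.json()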
[docs]def load_data_dict(history_id, test_data, dataset_populator, dataset_collection_populator):
"""Load a dictionary as inputs to a workflow (test data focused)."""
def open_test_data(test_dict, mode="rb"):
test_data_resolver = TestDataResolver()
filename = test_data_resolver.get_filename(test_dict["value"])
return open(filename, mode)
def read_test_data(test_dict):
return open_test_data(test_dict, mode="r").read()
inputs = {}
label_map = {}
has_uploads = False
for key, value in test_data.items():
is_dict = isinstance(value, dict)
if is_dict and ("elements" in value or value.get("type", None) in ["list:paired", "list", "paired"]):
elements_data = value.get("elements", [])
elements = []
for element_data in elements_data:
# Adapt differences between test_data dict and fetch API description.
if "name" not in element_data:
identifier = element_data["identifier"]
element_data["name"] = identifier
input_type = element_data.get("type", "raw")
content = None
if input_type == "File":
content = read_test_data(element_data)
else:
content = element_data["content"]
if content is not None:
element_data["src"] = "pasted"
element_data["paste_content"] = content
elements.append(element_data)
# TODO: make this collection_type
collection_type = value["type"]
new_collection_kwds = {}
if "name" in value:
new_collection_kwds["name"] = value["name"]
if collection_type == "list:paired":
fetch_response = dataset_collection_populator.create_list_of_pairs_in_history(history_id, contents=elements, **new_collection_kwds).json()
elif collection_type == "list":
fetch_response = dataset_collection_populator.create_list_in_history(history_id, contents=elements, direct_upload=True, **new_collection_kwds).json()
else:
fetch_response = dataset_collection_populator.create_pair_in_history(history_id, contents=elements or None, direct_upload=True, **new_collection_kwds).json()
hdca_output = fetch_response["outputs"][0]
hdca = dataset_populator.ds_entry(hdca_output)
hdca["hid"] = hdca_output["hid"]
label_map[key] = hdca
inputs[key] = hdca
has_uploads = True
elif is_dict and "type" in value:
input_type = value["type"]
if input_type == "File":
content = open_test_data(value)
new_dataset_kwds = {
"content": content
}
if "name" in value:
new_dataset_kwds["name"] = value["name"]
if "file_type" in value:
new_dataset_kwds["file_type"] = value["file_type"]
hda = dataset_populator.new_dataset(history_id, **new_dataset_kwds)
label_map[key] = dataset_populator.ds_entry(hda)
has_uploads = True
elif input_type == "raw":
label_map[key] = value["value"]
inputs[key] = value["value"]
elif not is_dict:
has_uploads = True
hda = dataset_populator.new_dataset(history_id, content=value)
label_map[key] = dataset_populator.ds_entry(hda)
inputs[key] = hda
else:
raise ValueError("Invalid test_data def %s" % test_data)
return inputs, label_map, has_uploads
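# Illustrative sketch (not part of the original module): the shape of a
# ``test_data`` dictionary accepted by load_data_dict. Keys are workflow input
# labels; "File" entries are resolved with TestDataResolver, "raw" entries are
# passed through unchanged, and list/paired entries become collections. The
# file name and labels below are placeholders.
_EXAMPLE_TEST_DATA = {
    "input_dataset": {"type": "File", "value": "1.txt", "file_type": "txt"},
    "num_lines": {"type": "raw", "value": 3},
    "input_pair": {
        "type": "paired",
        "elements": [
            {"identifier": "forward", "content": "CONTENT_F"},
            {"identifier": "reverse", "content": "CONTENT_R"},
        ],
    },
}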
[docs]def stage_inputs(galaxy_interactor, history_id, job, use_path_paste=True, use_fetch_api=True, to_posix_lines=True):
"""Alternative to load_data_dict that uses production-style workflow inputs."""
inputs, datasets = InteractorStaging(galaxy_interactor, use_fetch_api=use_fetch_api).stage(
"workflow", history_id=history_id, job=job, use_path_paste=use_path_paste, to_posix_lines=to_posix_lines
)
return inputs, datasets
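# Illustrative sketch (not part of the original module): the production-style
# ``job`` dictionary stage_inputs() expects - the same structure used by Galaxy
# workflow test job files. Paths and labels below are placeholders.
_EXAMPLE_STAGING_JOB = {
    "the_input": {"class": "File", "path": "test-data/1.txt"},
    "the_collection": {
        "class": "Collection",
        "collection_type": "list",
        "elements": [
            {"class": "File", "identifier": "el1", "path": "test-data/2.txt"},
        ],
    },
}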
[docs]def stage_rules_example(galaxy_interactor, history_id, example):
"""Wrapper around stage_inputs for staging collections defined by rules spec DSL."""
input_dict = example["test_data"].copy()
input_dict["collection_type"] = input_dict.pop("type")
input_dict["class"] = "Collection"
inputs, _ = stage_inputs(galaxy_interactor, history_id=history_id, job={"input": input_dict})
return inputs
[docs]def wait_on_state(state_func, desc="state", skip_states=None, assert_ok=False, timeout=DEFAULT_TIMEOUT):
def get_state():
response = state_func()
assert response.status_code == 200, "Failed to fetch state update while waiting. [%s]" % response.content
state = response.json()["state"]
if state in skip_states:
return None
else:
if assert_ok:
assert state == "ok", "Final state - %s - not okay." % state
return state
if skip_states is None:
skip_states = ["running", "queued", "new", "ready"]
try:
return wait_on(get_state, desc=desc, timeout=timeout)
except TimeoutAssertionError as e:
response = state_func()
raise TimeoutAssertionError("{} Current response containing state [{}].".format(e, response.json()))
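# Illustrative sketch (not part of the original module): wait_on_state() takes a
# zero-argument callable returning a requests-style response whose JSON body has
# a "state" field - for example a dataset show request issued through a
# populator's ``_get`` helper.
def _example_wait_on_dataset_state(populator, history_id, dataset_id):
    return wait_on_state(
        lambda: populator._get("histories/{}/contents/{}".format(history_id, dataset_id)),
        desc="dataset state",
        assert_ok=True,
    )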
[docs]class GiPostGetMixin:
"""Mixin for adapting Galaxy testing populators helpers to bioblend."""
@property
def _api_key(self):
return self._gi.key
def _api_url(self):
return self._gi.url
def _get(self, route, data=None):
if data is None:
data = {}
return self._gi.make_get_request(self._url(route), data=data)
def _post(self, route, data=None):
if data is None:
data = {}
data = data.copy()
data['key'] = self._gi.key
return requests.post(self._url(route), data=data)
def _put(self, route, data=None):
if data is None:
data = {}
data = data.copy()
data['key'] = self._gi.key
return requests.put(self._url(route), data=data)
def _delete(self, route, data=None):
if data is None:
data = {}
data = data.copy()
data['key'] = self._gi.key
return requests.delete(self._url(route), data=data)
def _url(self, route):
if route.startswith("/api/"):
route = route[len("/api/"):]
return self._api_url() + "/" + route
[docs]class GiDatasetPopulator(BaseDatasetPopulator, GiPostGetMixin):
"""Implementation of BaseDatasetPopulator backed by bioblend."""
[docs] def __init__(self, gi):
"""Construct a dataset populator from a bioblend GalaxyInstance."""
self._gi = gi
[docs]class GiDatasetCollectionPopulator(BaseDatasetCollectionPopulator, GiPostGetMixin):
"""Implementation of BaseDatasetCollectionPopulator backed by bioblend."""
[docs] def __init__(self, gi):
"""Construct a dataset collection populator from a bioblend GalaxyInstance."""
self._gi = gi
self.dataset_populator = GiDatasetPopulator(gi)
self.dataset_collection_populator = GiDatasetCollectionPopulator(gi)
def _create_collection(self, payload):
create_response = self._post("dataset_collections", data=payload)
return create_response
[docs]class GiWorkflowPopulator(BaseWorkflowPopulator, GiPostGetMixin):
"""Implementation of BaseWorkflowPopulator backed by bioblend."""
[docs] def __init__(self, gi):
"""Construct a workflow populator from a bioblend GalaxyInstance."""
self._gi = gi
self.dataset_populator = GiDatasetPopulator(gi)
[docs]def wait_on(function, desc, timeout=DEFAULT_TIMEOUT):
return tool_util_wait_on(function, desc, timeout)
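# Illustrative sketch (not part of the original module): driving the bioblend
# backed populators against a running Galaxy. The URL and API key are
# placeholders and bioblend must be installed separately.
def _example_bioblend_populators(galaxy_url="http://localhost:8080", api_key="<api key>"):
    from bioblend.galaxy import GalaxyInstance
    gi = GalaxyInstance(galaxy_url, key=api_key)
    dataset_populator = GiDatasetPopulator(gi)
    history_id = dataset_populator.new_history(name="populator example")
    dataset_populator.new_dataset(history_id, content="example", wait=True)
    return history_id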