Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy_test.base.populators
"""Abstractions used by the Galaxy testing frameworks for interacting with the Galaxy API.
These abstractions are geared toward testing use cases and populating fixtures.
For a more general framework for working with the Galaxy API check out `bioblend
<https://github.com/galaxyproject/bioblend>`__.
The populators are broken into different categories of data one might want to populate
and work with (datasets, histories, libraries, and workflows). Within each populator
type, abstract classes describe high-level functionality that depends on abstract
HTTP verb executions (e.g. methods for executing GET, POST, DELETE). The abstract
classes are :class:`galaxy_test.base.populators.BaseDatasetPopulator`,
:class:`galaxy_test.base.populators.BaseWorkflowPopulator`, and
:class:`galaxy_test.base.populators.BaseDatasetCollectionPopulator`.
There are a few different concrete ways to supply these low-level verb executions.
For instance :class:`galaxy_test.base.populators.DatasetPopulator` implements the abstract
:class:`galaxy_test.base.populators.BaseDatasetPopulator` by leveraging a galaxy interactor
:class:`galaxy.tool_util.interactor.GalaxyInteractorApi`. It is non-intuitive
that the Galaxy testing framework uses the tool testing code inside Galaxy's code
base for a lot of heavy lifting. This is due to the API testing framework organically
growing from the tool testing framework that predated it and then the tool testing
framework being extracted for re-use in `Planemo <https://github.com/galaxyproject/planemo>`__, etc.
The other two concrete implementations of the populators are much more
direct and intuitive. :class:`galaxy_test.base.populators.GiDatasetPopulator`, et al.
are populators built based on Bioblend ``gi`` objects to build URLs and describe
API keys. :class:`galaxy_test.selenium.framework.SeleniumSessionDatasetPopulator`,
et al. are populators built based on Selenium sessions to leverage Galaxy cookies
for auth for instance.
All three of these implementations are now effectively light wrappers around
`requests <https://requests.readthedocs.io/>`__. Not leveraging requests directly
is a bit ugly and this ugliness again stems from these organically growing from a
framework that originally didn't use requests at all.
API tests and Selenium tests routinely use requests directly and that is totally fine,
requests should just be filtered through the verb abstractions if that functionality
is then added to populators to be shared across tests or across testing frameworks.
"""
import contextlib
import json
import os
import random
import string
import unittest
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from functools import wraps
from io import StringIO
from operator import itemgetter
from typing import Any, Callable, Dict, Optional
import requests
import yaml
from bioblend.galaxy import GalaxyClient
from gxformat2 import (
convert_and_import_workflow,
ImporterGalaxyInterface,
)
from gxformat2._yaml import ordered_load
from pkg_resources import resource_string
from requests.models import Response
from galaxy.tool_util.client.staging import InteractorStaging
from galaxy.tool_util.verify.test_data import TestDataResolver
from galaxy.tool_util.verify.wait import (
timeout_type,
TimeoutAssertionError,
wait_on as tool_util_wait_on,
)
from galaxy.util import (
DEFAULT_SOCKET_TIMEOUT,
unicodify,
)
from . import api_asserts
from .api import ApiTestInteractor
# Simple workflow that takes an input and calls the cat wrapper on it.
workflow_str = unicodify(resource_string(__name__, "data/test_workflow_1.ga"))
# Simple workflow that takes an input and filters with random lines twice in a
# row - first grabbing 8 lines at random and then 6.
workflow_random_x2_str = unicodify(resource_string(__name__, "data/test_workflow_2.ga"))
# Default timeout used by the wait_* helpers in this module.
DEFAULT_TIMEOUT = 60  # Secs to wait for state to turn ok
# Read once at import time; when truthy, errors in tests marked @flakey become skips.
SKIP_FLAKEY_TESTS_ON_ERROR = os.environ.get("GALAXY_TEST_SKIP_FLAKEY_TESTS_ON_ERROR", None)
def flakey(method):
    """Decorate a test method that is known to fail intermittently.

    If the wrapped method raises and ``SKIP_FLAKEY_TESTS_ON_ERROR`` is truthy,
    the error is converted into ``unittest.SkipTest``; otherwise the original
    error propagates. A ``unittest.SkipTest`` raised by the method is always
    re-raised untouched.

    :returns: whatever the wrapped method returns
    """
    @wraps(method)
    def wrapped_method(test_case, *args, **kwargs):
        try:
            # Hand the result back so decorating a method does not change its return value.
            return method(test_case, *args, **kwargs)
        except unittest.SkipTest:
            raise
        except Exception:
            if SKIP_FLAKEY_TESTS_ON_ERROR:
                raise unittest.SkipTest("Error encountered during test marked as @flakey.")
            raise
    return wrapped_method
def skip_without_tool(tool_id):
    """Decorate an API test method as requiring a specific tool.

    The decorated test is skipped when ``tool_id`` is not among the IDs
    returned by the target Galaxy's ``tools`` index.
    """
    def method_wrapper(method):
        def get_tool_ids(api_test_case):
            # Tools are listed in panels by default; in_panel=False flattens out the sections.
            response = api_test_case.galaxy_interactor.get("tools", data=dict(in_panel=False))
            return [tool["id"] for tool in response.json()]

        @wraps(method)
        def wrapped_method(api_test_case, *args, **kwargs):
            tool_missing = tool_id not in get_tool_ids(api_test_case)
            _raise_skip_if(tool_missing)
            return method(api_test_case, *args, **kwargs)
        return wrapped_method
    return method_wrapper
def skip_without_datatype(extension):
    """Decorate an API test method as requiring a specific datatype.

    Have test framework skip the test case if the datatype is unavailable.

    :returns: a decorator; the decorated method returns whatever the test returns
    """
    def has_datatype(api_test_case):
        index_response = api_test_case.galaxy_interactor.get("datatypes")
        assert index_response.status_code == 200, "Failed to fetch datatypes for target Galaxy."
        datatypes = index_response.json()
        assert isinstance(datatypes, list)
        return extension in datatypes

    def method_wrapper(method):
        @wraps(method)
        def wrapped_method(api_test_case, *args, **kwargs):
            _raise_skip_if(not has_datatype(api_test_case))
            # Propagate the result, as skip_without_tool does.
            return method(api_test_case, *args, **kwargs)
        return wrapped_method
    return method_wrapper
def is_site_up(url):
    """Return True only when a GET of ``url`` (10 second timeout) answers with HTTP 200.

    Any exception raised while making the request is treated as "site down".
    """
    try:
        status_code = requests.get(url, timeout=10).status_code
    except Exception:
        return False
    return status_code == 200
def skip_if_site_down(url):
    """Build a decorator that skips the test when ``url`` does not appear to be up.

    Availability is checked with ``is_site_up`` each time the decorated test runs.

    :returns: a decorator; the decorated method returns whatever the test returns
    """
    def method_wrapper(method):
        @wraps(method)
        def wrapped_method(api_test_case, *args, **kwargs):
            _raise_skip_if(not is_site_up(url), f"Test depends on [{url}] being up and it appears to be down.")
            # Propagate the result, as skip_without_tool does.
            return method(api_test_case, *args, **kwargs)
        return wrapped_method
    return method_wrapper
# Ready-made decorators that skip a test when the named external site is not reachable.
skip_if_toolshed_down = skip_if_site_down("https://toolshed.g2.bx.psu.edu")
skip_if_github_down = skip_if_site_down("https://github.com/")
def summarize_instance_history_on_error(method):
    """Decorate a test method so its history is summarized when it fails.

    On any ``Exception`` the test case's ``dataset_populator._summarize_history``
    is called with ``api_test_case.history_id`` and the error is re-raised.

    :returns: whatever the wrapped method returns
    """
    @wraps(method)
    def wrapped_method(api_test_case, *args, **kwds):
        try:
            # Hand the result back so decorating a method does not change its return value.
            return method(api_test_case, *args, **kwds)
        except Exception:
            api_test_case.dataset_populator._summarize_history(api_test_case.history_id)
            raise
    return wrapped_method
[docs]def uses_test_history(**test_history_kwd):
"""Can override require_new and cancel_executions using kwds to decorator.
"""
def method_wrapper(method):
@wraps(method)
def wrapped_method(api_test_case, *args, **kwds):
with api_test_case.dataset_populator.test_history(**test_history_kwd) as history_id:
method(api_test_case, history_id, *args, **kwds)
return wrapped_method
return method_wrapper
def _raise_skip_if(check, *args):
    """Raise ``unittest.SkipTest`` built from ``args`` when ``check`` is truthy.

    :param check: condition; nothing happens when it is falsy
    :param args: passed through to the ``SkipTest`` constructor (e.g. a reason)
    :raises unittest.SkipTest: when ``check`` is truthy
    """
    if check:
        # Use the standard library exception (the same type ``flakey`` re-raises)
        # rather than importing it from the unmaintained ``nose`` package.
        raise unittest.SkipTest(*args)
class BasePopulator(metaclass=ABCMeta):
    """Abstract HTTP verb interface that the concrete populators implement.

    Each verb takes a route on the target Galaxy instance and returns the
    resulting response object.
    """

    @abstractmethod
    def _post(self, route, data=None, files=None, headers=None, admin=False, json: bool = False) -> Response:
        """Issue a POST with ``data`` (and optional ``files``) against ``route``."""

    @abstractmethod
    def _put(self, route, data=None, headers=None, admin=False, json: bool = False) -> Response:
        """Issue a PUT with ``data`` against ``route``."""

    @abstractmethod
    def _get(self, route, data=None, headers=None, admin=False) -> Response:
        """Issue a GET against ``route``, sending ``data`` along with the request."""

    @abstractmethod
    def _delete(self, route, data=None, headers=None, admin=False, json: bool = False) -> Response:
        """Issue a DELETE against ``route``."""
[docs]class BaseDatasetPopulator(BasePopulator):
""" Abstract description of API operations optimized for testing
Galaxy - implementations must implement _get, _post and _delete.
"""
def new_dataset(self, history_id: str, content=None, wait: bool = False, **kwds) -> dict:
    """Create a new history dataset instance (HDA) through the upload tool.

    :param history_id: history to create the dataset in
    :param content: dataset content, forwarded to ``new_dataset_request``
    :param wait: forwarded to ``new_dataset_request``
    :returns: the first entry of the upload response's ``outputs`` list - a
              description of the new HDA whose ID is found under ``"id"``
    :raises AssertionError: if the upload request does not answer with HTTP 200
    """
    run_response = self.new_dataset_request(history_id, content=content, wait=wait, **kwds)
    assert run_response.status_code == 200, f"Failed to create new dataset with response: {run_response.text}"
    return run_response.json()["outputs"][0]
def new_dataset_request(self, history_id: str, content=None, wait: bool = False, **kwds) -> requests.Response:
    """Run the upload tool and return its raw response.

    ``content`` defaults to ``"TestData123"`` unless ``ftp_files`` is given in
    ``kwds``. With ``wait`` the tool run is waited on before returning, using
    ``kwds['assert_ok']`` (default True).
    """
    needs_default_content = content is None and "ftp_files" not in kwds
    if needs_default_content:
        content = "TestData123"
    upload_response = self.tools_post(self.upload_payload(history_id, content=content, **kwds))
    if wait:
        must_be_ok = kwds.get('assert_ok', True)
        self.wait_for_tool_run(history_id, upload_response, assert_ok=must_be_ok)
    return upload_response
def fetch(self, payload: dict, assert_ok: bool = True, timeout: timeout_type = DEFAULT_TIMEOUT, wait: Optional[bool] = None):
    """POST ``payload`` to ``tools/fetch`` and return the response.

    ``wait`` defaults to the value of ``assert_ok``. When waiting, the first
    job of the response is waited on; with ``assert_ok`` that job's state must
    be ``"ok"``.
    """
    fetch_response = self._post("tools/fetch", data=payload)
    should_wait = assert_ok if wait is None else wait
    if should_wait:
        submitted_job = self.check_run(fetch_response)
        self.wait_for_job(submitted_job["id"], timeout=timeout)
    if assert_ok:
        first_job = fetch_response.json()["jobs"][0]
        job_details = self.get_job_details(first_job["id"]).json()
        assert job_details["state"] == "ok", job_details
    return fetch_response
def tag_dataset(self, history_id, hda_id, tags):
    """PUT ``tags`` onto a history content item and return the decoded JSON response."""
    update_response = self._put(f"histories/{history_id}/contents/{hda_id}", {'tags': tags}, json=True)
    update_response.raise_for_status()
    return update_response.json()
def wait_for_tool_run(self, history_id: str, run_response: requests.Response, timeout: timeout_type = DEFAULT_TIMEOUT, assert_ok: bool = True):
    """Wait for the first job of ``run_response`` and then for the history; return ``run_response``."""
    first_job = self.check_run(run_response)
    self.wait_for_job(first_job["id"], timeout=timeout)
    self.wait_for_history(history_id, assert_ok=assert_ok, timeout=timeout)
    return run_response
def check_run(self, run_response: requests.Response) -> dict:
    """Assert that a tool run answered with HTTP 200 and return the first job it lists."""
    run_description = run_response.json()
    assert run_response.status_code == 200, run_description
    return run_description["jobs"][0]
def wait_for_history(self, history_id: str, assert_ok: bool = False, timeout: timeout_type = DEFAULT_TIMEOUT) -> str:
    """Wait on the state of ``history_id`` via ``wait_on_state`` and return its result.

    If the wait raises ``AssertionError`` the history is summarized before the
    error is re-raised.
    """
    def fetch_history():
        return self._get(f"histories/{history_id}")

    try:
        return wait_on_state(fetch_history, desc="history state", assert_ok=assert_ok, timeout=timeout)
    except AssertionError:
        self._summarize_history(history_id)
        raise
def wait_for_history_jobs(self, history_id: str, assert_ok: bool = False, timeout: timeout_type = DEFAULT_TIMEOUT):
    """Wait until ``history_id`` has no active jobs left.

    :param assert_ok: when True, additionally wait for the history itself with
                      ``assert_ok=True`` and return that result
    :param timeout: passed to the underlying wait helpers
    :raises TimeoutAssertionError: if jobs are still active after ``timeout``;
            the message lists the history's current jobs and the original
            timeout error is attached as the cause
    """
    def no_active_jobs():
        # None signals "not done yet" to wait_on (presumably it keeps polling
        # until a non-None value is returned - confirm against wait_on).
        return None if self.active_history_jobs(history_id) else True

    try:
        wait_on(no_active_jobs, "active jobs", timeout=timeout)
    except TimeoutAssertionError as e:
        jobs = self.history_jobs(history_id)
        message = f"Failed waiting on active jobs to complete, current jobs are [{jobs}]. {e}"
        # Chain the original timeout so its traceback is not lost.
        raise TimeoutAssertionError(message) from e
    if assert_ok:
        return self.wait_for_history(history_id, assert_ok=True, timeout=timeout)
def wait_for_job(self, job_id: str, assert_ok: bool = False, timeout: timeout_type = DEFAULT_TIMEOUT):
    """Wait on the state of job ``job_id`` via ``wait_on_state`` and return its result."""
    def fetch_job():
        return self.get_job_details(job_id)

    return wait_on_state(fetch_job, desc="job state", assert_ok=assert_ok, timeout=timeout)
def get_job_details(self, job_id: str, full: bool = False) -> Response:
    """GET ``jobs/<job_id>`` with ``full`` as a query parameter and return the raw response."""
    route = f"jobs/{job_id}?full={full}"
    return self._get(route)
def cancel_history_jobs(self, history_id: str, wait=True) -> None:
    """Call ``self.cancel_job`` for every active job of ``history_id``.

    NOTE(review): ``wait`` is accepted but never used here, and ``cancel_job``
    is not defined in the visible part of this class - confirm where it lives.
    """
    for job in self.active_history_jobs(history_id):
        self.cancel_job(job["id"])
def history_jobs(self, history_id: str) -> list:
    """Return the decoded ``jobs`` index for ``history_id``, ordered by creation time.

    The result is consumed as a list of job dictionaries (each carrying at
    least a ``state`` key) by ``active_history_jobs``.

    :raises AssertionError: if the request does not answer with HTTP 200
    """
    query_params = {"history_id": history_id, "order_by": "create_time"}
    jobs_response = self._get("jobs", query_params)
    assert jobs_response.status_code == 200
    return jobs_response.json()
def active_history_jobs(self, history_id: str) -> list:
    """Return the jobs of ``history_id`` whose state is new, upload, waiting, queued or running."""
    active_states = ("new", "upload", "waiting", "queued", "running")
    return [job for job in self.history_jobs(history_id) if job["state"] in active_states]
def delete_history(self, history_id: str) -> None:
    """DELETE ``histories/<history_id>`` and raise if the response reports an HTTP error."""
    self._delete(f"histories/{history_id}").raise_for_status()
def delete_dataset(self, history_id: str, content_id: str, purge: bool = False) -> Response:
    """DELETE a history content item, sending ``purge`` as data; the raw response is returned unchecked."""
    route = f"histories/{history_id}/contents/{content_id}"
    return self._delete(route, {'purge': purge})
def create_tool_from_path(self, tool_path: str) -> dict:
    """Create a dynamic tool from a tool file on disk.

    The tool directory sent along is the directory containing the absolute
    form of ``tool_path``.

    :returns: the decoded JSON body returned by ``_create_tool_raw`` (not a
              response object)
    """
    tool_directory = os.path.dirname(os.path.abspath(tool_path))
    payload = dict(
        src="from_path",
        path=tool_path,
        tool_directory=tool_directory,
    )
    return self._create_tool_raw(payload)
def create_tool(self, representation, tool_directory: Optional[str] = None) -> dict:
    """Create a dynamic tool from an in-memory representation.

    :param representation: tool description; a dict is serialized to JSON
                           first, anything else is sent as-is
    :param tool_directory: optional directory sent along with the request
    :returns: the decoded JSON body returned by ``_create_tool_raw`` (not a
              response object)
    """
    if isinstance(representation, dict):
        representation = json.dumps(representation)
    payload = dict(
        representation=representation,
        tool_directory=tool_directory,
    )
    return self._create_tool_raw(payload)
def _create_tool_raw(self, payload) -> dict:
    """POST ``payload`` to ``dynamic_tools`` and return the decoded JSON body.

    The request is first attempted with ``admin=True``; if that raises
    ``TypeError`` it is retried without the ``admin`` argument (presumably for
    ``_post`` implementations that do not accept it - confirm).

    :raises AssertionError: if the response status is not 200
    """
    try:
        create_response = self._post("dynamic_tools", data=payload, admin=True)
    except TypeError:
        create_response = self._post("dynamic_tools", data=payload)
    assert create_response.status_code == 200, create_response.text
    return create_response.json()
def list_dynamic_tools(self) -> list:
    """GET ``dynamic_tools`` as admin, assert HTTP 200 and return the decoded JSON."""
    index_response = self._get("dynamic_tools", admin=True)
    assert index_response.status_code == 200, index_response
    return index_response.json()
def show_dynamic_tool(self, uuid) -> dict:
    """GET ``dynamic_tools/<uuid>`` as admin, assert HTTP 200 and return the decoded JSON."""
    tool_response = self._get(f"dynamic_tools/{uuid}", admin=True)
    assert tool_response.status_code == 200, tool_response
    return tool_response.json()
def deactivate_dynamic_tool(self, uuid) -> dict:
    """DELETE ``dynamic_tools/<uuid>`` as admin and return the decoded JSON (status is not checked)."""
    return self._delete(f"dynamic_tools/{uuid}", admin=True).json()
def _summarize_history(self, history_id: str) -> None:
    """Hook for summarizing a target history - override to provide details.

    This base implementation does nothing.
    """
@contextlib.contextmanager
def test_history(self, cancel_executions: bool = True, require_new: bool = True, **kwds):
    """Context manager yielding a history ID for the duration of a test.

    Unless ``require_new`` is set, a history ID found under the
    ``GALAXY_TEST_HISTORY_ID`` key of ``kwds`` is reused; otherwise a new
    history is created. On exit the history's active jobs are cancelled when
    ``cancel_executions`` is set and ``GALAXY_TEST_NO_CLEANUP`` is absent from
    the environment. If the body raises an ``Exception`` the history is
    summarized first and the error is re-raised.
    """
    cleanup_enabled = "GALAXY_TEST_NO_CLEANUP" not in os.environ
    history_id = None

    def cancel_jobs_if_cleaning_up():
        # Reads history_id at call time, i.e. after it has been resolved below.
        if cleanup_enabled and cancel_executions:
            self.cancel_history_jobs(history_id)

    try:
        if not require_new:
            history_id = kwds.get("GALAXY_TEST_HISTORY_ID", None)
        if not history_id:
            history_id = self.new_history()
        yield history_id
        cancel_jobs_if_cleaning_up()
    except Exception:
        if history_id:
            self._summarize_history(history_id)
        cancel_jobs_if_cleaning_up()
        raise
def new_history(self, name="API Test History", **kwds) -> str:
    """POST a new history called ``name`` and return its ID; extra ``kwds`` are ignored."""
    create_response = self._post("histories", data=dict(name=name))
    assert "id" in create_response.json(), create_response.text
    return create_response.json()["id"]
def copy_history(self, history_id, name="API Test Copied History", **kwds) -> Response:
    """POST to ``histories`` with ``name``, ``history_id`` and any extra ``kwds``; return the raw response."""
    payload = {"name": name, "history_id": history_id}
    payload.update(kwds)
    return self._post("histories", data=payload)
def upload_payload(self, history_id: str, content: str = None, **kwds) -> dict:
    """Build the ``upload1`` tool payload for a single file.

    Recognized ``kwds``: ``name``, ``dbkey`` (``None`` omits it), ``file_type``,
    ``ftp_files`` (used when ``content`` is None), ``to_posix_lines``,
    ``space_to_tab``, ``auto_decompress`` and ``extra_inputs``. Objects with a
    ``read`` attribute are sent as file data, anything else as pasted content.
    """
    dbkey = kwds.get("dbkey", "?")
    # Insertion order is kept identical to the serialized form callers have always seen.
    upload_params = {'files_0|NAME': kwds.get("name", "Test_Dataset")}
    if dbkey is not None:
        upload_params['dbkey'] = dbkey
    upload_params['file_type'] = kwds.get("file_type", 'txt')

    if content is None:
        upload_params["files_0|ftp_files"] = kwds.get("ftp_files")
    elif hasattr(content, 'read'):
        upload_params["files_0|file_data"] = content
    else:
        upload_params['files_0|url_paste'] = content

    for option in ("to_posix_lines", "space_to_tab", "auto_decompress"):
        if option in kwds:
            upload_params[f"files_0|{option}"] = kwds[option]
    upload_params.update(kwds.get("extra_inputs", {}))

    return self.run_tool_payload(
        tool_id='upload1',
        inputs=upload_params,
        history_id=history_id,
        upload_type='upload_dataset'
    )
def get_remote_files(self, target: str = "ftp") -> dict:
    """GET ``remote_files`` for ``target``, raise on an HTTP error and return the decoded JSON."""
    listing_response = self._get("remote_files", data={"target": target})
    listing_response.raise_for_status()
    return listing_response.json()
def run_tool_payload(self, tool_id: str, inputs: dict, history_id: str, **kwds) -> dict:
    """Build the payload dictionary for running a tool.

    Entries of ``inputs`` whose key looks like ``files_<n>|file_data`` are left
    out of the JSON-encoded ``inputs`` and collected under ``__files`` instead
    (merged over any ``__files`` passed in ``kwds``). Neither the caller's
    ``inputs`` nor a caller-supplied ``__files`` dictionary is modified.

    :returns: dict with ``tool_id``, JSON-encoded ``inputs``, ``history_id``
              and every remaining entry of ``kwds``
    """
    plain_inputs = {}
    file_inputs = {}
    for key, value in inputs.items():
        if key.startswith("files_") and key.endswith("|file_data"):
            file_inputs[key] = value
        else:
            plain_inputs[key] = value
    if file_inputs:
        # Build a fresh dict so a caller-provided ``__files`` mapping is not mutated.
        kwds["__files"] = {**kwds.get("__files", {}), **file_inputs}
    return dict(
        tool_id=tool_id,
        inputs=json.dumps(plain_inputs),
        history_id=history_id,
        **kwds
    )
def build_tool_state(self, tool_id: str, history_id: str):
    """POST to ``tools/<tool_id>/build`` for ``history_id``, raise on an HTTP error and return the decoded JSON."""
    build_response = self._post(f"tools/{tool_id}/build?history_id={history_id}")
    build_response.raise_for_status()
    return build_response.json()
def run_tool(self, tool_id: str, inputs: dict, history_id: str, assert_ok: bool = True, **kwds):
    """Run ``tool_id`` with ``inputs`` in ``history_id``.

    With ``assert_ok`` the response must be HTTP 200 and its decoded JSON is
    returned; otherwise the raw response is returned unchecked.
    """
    tool_response = self.tools_post(self.run_tool_payload(tool_id, inputs, history_id, **kwds))
    if not assert_ok:
        return tool_response
    api_asserts.assert_status_code_is(tool_response, 200)
    return tool_response.json()
def tools_post(self, payload: dict, url="tools") -> Response:
    """POST ``payload`` to ``url`` (``tools`` by default) and return the raw response."""
    return self._post(url, data=payload)
def get_history_dataset_content(self, history_id: str, wait=True, filename=None, type='text', raw=False, **kwds):
    """Return the displayed content of a history dataset.

    The dataset is selected from ``kwds`` by ``__history_content_id``. The
    response text is returned when ``type`` is ``'text'``, otherwise the raw
    response content. ``filename`` and ``raw`` are sent along when truthy.
    """
    dataset_id = self.__history_content_id(history_id, wait=wait, **kwds)
    display_params = {}
    if filename:
        display_params["filename"] = filename
    if raw:
        display_params['raw'] = True
    display_response = self._get_contents_request(history_id, f"/{dataset_id}/display", data=display_params)
    assert display_response.status_code == 200, display_response.text
    return display_response.text if type == 'text' else display_response.content
def get_history_dataset_details(self, history_id: str, **kwds) -> dict:
    """Return the decoded details of the dataset selected by ``kwds``; raises on an HTTP error."""
    selected_id = self.__history_content_id(history_id, **kwds)
    raw_details = self.get_history_dataset_details_raw(history_id, selected_id)
    raw_details.raise_for_status()
    return raw_details.json()
def get_history_dataset_details_raw(self, history_id: str, dataset_id: str) -> Response:
    """GET ``histories/<history_id>/contents/datasets/<dataset_id>`` and return the raw response."""
    return self._get_contents_request(history_id, f"/datasets/{dataset_id}")
def get_history_dataset_extra_files(self, history_id: str, **kwds) -> list:
    """Return the decoded ``extra_files`` listing of the dataset selected by ``kwds``."""
    selected_id = self.__history_content_id(history_id, **kwds)
    extra_files_response = self._get_contents_request(history_id, f"/{selected_id}/extra_files")
    assert extra_files_response.status_code == 200, extra_files_response.content
    return extra_files_response.json()
def get_history_collection_details(self, history_id: str, **kwds) -> dict:
    """Return the decoded details of the dataset collection selected by ``kwds``."""
    selected_id = self.__history_content_id(history_id, **kwds)
    collection_response = self._get_contents_request(history_id, f"/dataset_collections/{selected_id}")
    assert collection_response.status_code == 200, collection_response.content
    return collection_response.json()
def run_collection_creates_list(self, history_id: str, hdca_id: str) -> dict:
    """Run the ``collection_creates_list`` tool on collection ``hdca_id``.

    The history is waited on (asserting an ok state) before the tool is run.

    :returns: the decoded JSON tool response - ``run_tool`` is called with its
              default ``assert_ok=True`` and therefore does not return the raw
              response object
    """
    inputs = {
        "input1": {"src": "hdca", "id": hdca_id},
    }
    self.wait_for_history(history_id, assert_ok=True)
    return self.run_tool("collection_creates_list", inputs, history_id)
def run_exit_code_from_file(self, history_id: str, hdca_id: str) -> dict:
    """Run ``exit_code_from_file`` batched over collection ``hdca_id`` and return the decoded response.

    Neither the tool response nor the final history state is asserted to be ok.
    """
    batch_input = {'batch': True, 'values': [{"src": "hdca", "id": hdca_id}]}
    tool_response = self.run_tool("exit_code_from_file", {"input": batch_input}, history_id, assert_ok=False)
    decoded = tool_response.json()
    self.wait_for_history(history_id, assert_ok=False)
    return decoded
def __history_content_id(self, history_id: str, wait=True, **kwds) -> str:
    """Resolve the ID of a history content item from ``kwds``.

    Looked up in this order: ``dataset_id``, ``content_id``, ``dataset``
    (an object with an ``"id"`` key), ``hid`` (matched against the history
    contents; the last match wins). Without any of these the last item of the
    history contents is used, optionally restricted to
    ``history_content_type``.

    :param wait: wait for the history first, asserting ok unless
                 ``kwds['assert_ok']`` says otherwise
    :raises Exception: if ``hid`` is given but matches nothing
    :raises IndexError: if no item is left to pick the last one from
    """
    if wait:
        self.wait_for_history(history_id, assert_ok=kwds.get("assert_ok", True))

    for explicit_key in ("dataset_id", "content_id"):
        if explicit_key in kwds:
            return kwds[explicit_key]
    if "dataset" in kwds:
        return kwds["dataset"]["id"]

    hid = kwds.get("hid", None)  # If not hid, just grab last dataset
    history_contents = self._get_contents_request(history_id).json()
    if hid:
        history_content_id = None
        for history_item in history_contents:
            if history_item["hid"] == hid:
                history_content_id = history_item["id"]
        if history_content_id is None:
            raise Exception(f"Could not find content with HID [{hid}] in [{history_contents}]")
        return history_content_id

    # No hid specified - just grab most recent element of correct content type
    content_type = kwds.get('history_content_type')
    if content_type:
        history_contents = [c for c in history_contents if c['history_content_type'] == content_type]
    if not history_contents:
        # Same exception type indexing an empty list would raise, but with a message.
        raise IndexError(f"No matching contents found in history [{history_id}]")
    return history_contents[-1]["id"]
def _get_contents_request(self, history_id: str, suffix: str = "", data=None) -> Response:
    """GET ``histories/<history_id>/contents`` plus ``suffix``; ``data`` defaults to an empty dict."""
    request_data = {} if data is None else data
    route = f"histories/{history_id}/contents"
    if suffix:
        route += suffix
    return self._get(route, data=request_data)
def ds_entry(self, history_content: dict) -> dict:
    """Return a ``{"src", "id"}`` reference for ``history_content``.

    ``src`` is ``'hdca'`` when the item's ``history_content_type`` is
    ``"dataset_collection"`` and ``'hda'`` otherwise.
    """
    is_collection = history_content.get('history_content_type') == "dataset_collection"
    return {"src": 'hdca' if is_collection else 'hda', "id": history_content["id"]}
def dataset_storage_info(self, dataset_id: str) -> dict:
    """GET ``datasets/<dataset_id>/storage``, raise on an HTTP error and return the decoded JSON."""
    info_response = self._get(f"datasets/{dataset_id}/storage")
    info_response.raise_for_status()
    return info_response.json()
def get_roles(self) -> list:
    """GET ``roles`` as admin, assert HTTP 200 and return the decoded JSON."""
    index_response = self._get("roles", admin=True)
    assert index_response.status_code == 200
    return index_response.json()
def get_configuration(self, admin=False) -> Dict[str, Any]:
    """GET ``configuration`` (optionally as admin), assert an ok status and return the decoded JSON."""
    config_response = self._get("configuration", admin=admin)
    api_asserts.assert_status_code_is_ok(config_response)
    return config_response.json()
def user_email(self) -> str:
    """Return the email of the single user listed by the ``users`` index."""
    listed_users = self._get("users").json()
    assert len(listed_users) == 1
    return listed_users[0]["email"]
def user_id(self) -> str:
    """Return the ID of the single user listed by the ``users`` index."""
    listed_users = self._get("users").json()
    assert len(listed_users) == 1
    return listed_users[0]["id"]
def user_private_role_id(self) -> str:
    """Return the ID of the one role whose name equals the current user's email."""
    email = self.user_email()
    matching_roles = [role for role in self.get_roles() if role["name"] == email]
    assert len(matching_roles) == 1, f"Did not find exactly one role for email {email} - {matching_roles}"
    private_role = matching_roles[0]
    assert "id" in private_role, private_role
    return private_role["id"]
def create_role(self, user_ids: list, description: str = None) -> dict:
    """Create a randomly named role (prefix ``testpop``) for ``user_ids`` as admin.

    ``description`` falls back to ``"Test Role"``. Returns the decoded JSON
    after asserting HTTP 200.
    """
    role_name = self.get_random_name(prefix="testpop")
    role_payload = {
        "name": role_name,
        "description": description or "Test Role",
        "user_ids": user_ids,
    }
    create_response = self._post("roles", data=role_payload, admin=True, json=True)
    assert create_response.status_code == 200
    return create_response.json()
def create_quota(self, quota_payload: dict) -> dict:
    """POST ``quota_payload`` to ``quotas`` as admin, raise on an HTTP error and return the decoded JSON."""
    create_response = self._post("quotas", data=quota_payload, admin=True)
    create_response.raise_for_status()
    return create_response.json()
def get_quotas(self) -> list:
    """GET ``quotas`` as admin, raise on an HTTP error and return the decoded JSON."""
    index_response = self._get("quotas", admin=True)
    index_response.raise_for_status()
    return index_response.json()
def make_private(self, history_id: str, dataset_id: str) -> dict:
    """Restrict a dataset's ``access`` and ``manage`` permissions to the current user's private role.

    The permissions are PUT as admin; the decoded JSON is returned after
    asserting HTTP 200.
    """
    private_role_id = self.user_private_role_id()
    # Give both access and manage permission to the user's private role only.
    permissions = {
        "access": [private_role_id],
        "manage": [private_role_id],
    }
    permissions_url = f"histories/{history_id}/contents/{dataset_id}/permissions"
    update_response = self._put(permissions_url, permissions, admin=True, json=True)
    assert update_response.status_code == 200, update_response.content
    return update_response.json()
def validate_dataset(self, history_id, dataset_id):
    """PUT an empty payload to the dataset's ``validate`` route and return the decoded JSON.

    :raises AssertionError: if the response status is not 200
    """
    url = f"histories/{history_id}/contents/{dataset_id}/validate"
    # Go through the abstract verb instead of reaching into ``galaxy_interactor``,
    # so this works for every populator implementation, not only interactor-backed ones.
    update_response = self._put(url, {})
    assert update_response.status_code == 200, update_response.content
    return update_response.json()
def validate_dataset_and_wait(self, history_id, dataset_id):
    """Trigger dataset validation and wait until its ``validated_state`` is no longer ``'unknown'``.

    Returns whatever ``wait_on`` returns for the polling function below.
    """
    self.validate_dataset(history_id, dataset_id)

    def validated():
        details = self.get_history_dataset_details(history_id, dataset_id=dataset_id)
        state = details['validated_state']
        # None while the state is still unknown, the state itself afterwards.
        return None if state == 'unknown' else state

    return wait_on(validated, "dataset validation")
def setup_history_for_export_testing(self, history_name):
    """Create a history holding one tagged dataset and one deleted dataset.

    :param history_name: name for the new history
    :returns: the ID of the new history
    :raises AssertionError: if the tags were not applied or the second
            dataset is not reported as deleted
    """
    history_id = self.new_history(name=history_name)
    hda = self.new_dataset(history_id, content="1 2 3")
    tags = ['name:name']
    response = self.tag_dataset(history_id, hda['id'], tags=tags)
    assert response['tags'] == tags
    deleted_hda = self.new_dataset(history_id, content="1 2 3", wait=True)
    self.delete_dataset(history_id, deleted_hda["id"])
    # ``dataset_id`` is the keyword the content lookup understands; an ``id``
    # keyword is ignored there and silently selects "the last item" instead.
    deleted_details = self.get_history_dataset_details(history_id, dataset_id=deleted_hda["id"])
    assert deleted_details["deleted"]
    return history_id
def prepare_export(self, history_id, data):
    """Start a history export and wait for it to be ready.

    If the first PUT answers 202, the exports URL is PUT again (without data)
    until the status is no longer 202; that final response is asserted to be
    200 and returned. Otherwise the response must contain a ``job_id`` and the
    result of waiting on that job is returned instead.
    """
    url = f"histories/{history_id}/exports"
    initial_response = self._put(url, data, json=True)
    initial_response.raise_for_status()

    if initial_response.status_code != 202:
        job_desc = initial_response.json()
        assert "job_id" in job_desc
        return self.wait_for_job(job_desc["job_id"])

    def export_ready_response():
        poll_response = self._put(url)
        return None if poll_response.status_code == 202 else poll_response

    ready_response = wait_on(export_ready_response, desc="export ready")
    api_asserts.assert_status_code_is(ready_response, 200)
    return ready_response
def export_url(self, history_id, data, check_download=True):
    """Prepare an export and return its ``download_url``; with ``check_download`` the URL is fetched once."""
    export_description = self.prepare_export(history_id, data).json()
    api_asserts.assert_has_keys(export_description, "download_url")
    download_url = export_description["download_url"]
    if check_download:
        self.get_export_url(download_url)
    return download_url
def get_export_url(self, export_url):
    """GET ``export_url`` with the API key appended as ``key``, assert HTTP 200 and return the response."""
    keyed_url = f"{export_url}?key={self._api_key}"
    download_response = self._get(keyed_url)
    api_asserts.assert_status_code_is(download_response, 200)
    return download_response
def import_history(self, import_data):
    """POST ``import_data`` to ``histories`` and assert HTTP 200.

    An ``archive_file`` entry is removed from ``import_data`` (the caller's
    dictionary is modified) and, when truthy, sent as a file instead.
    """
    archive_file = import_data.pop("archive_file", None)
    files = {"archive_file": archive_file} if archive_file else {}
    import_response = self._post("histories", data=import_data, files=files)
    api_asserts.assert_status_code_is(import_response, 200)
def import_history_and_wait_for_name(self, import_data, history_name):
    """Import a history and wait for ``imported from archive: <history_name>`` to appear.

    Asserts no history of that name exists beforehand. The imported history
    is waited on and its ID returned.
    """
    import_name = f"imported from archive: {history_name}"

    def histories_by_name():
        return {history["name"]: history for history in self.get_histories()}

    def has_history_with_name():
        return histories_by_name().get(import_name, None)

    assert import_name not in histories_by_name()
    self.import_history(import_data)

    imported_history_id = wait_on(has_history_with_name, desc="import history")["id"]
    self.wait_for_history(imported_history_id)
    return imported_history_id
def rename_history(self, history_id, new_name):
    """PUT ``new_name`` as the name of ``history_id`` and return the raw, unchecked response."""
    return self._put(f"histories/{history_id}", {"name": new_name}, json=True)
def get_histories(self):
    """GET ``histories``, assert HTTP 200 and return the decoded JSON."""
    index_response = self._get("histories")
    api_asserts.assert_status_code_is(index_response, 200)
    return index_response.json()
def wait_on_history_length(self, history_id, wait_on_history_length):
    """Wait until the history holds exactly ``wait_on_history_length`` content items."""
    def history_has_length():
        # True once the length matches, None otherwise.
        return True if self.history_length(history_id) == wait_on_history_length else None

    wait_on(history_has_length, desc="import history population")
def history_length(self, history_id):
    """Return the number of items listed by ``histories/<history_id>/contents`` (asserts HTTP 200)."""
    contents_response = self._get(f"histories/{history_id}/contents")
    api_asserts.assert_status_code_is(contents_response, 200)
    return len(contents_response.json())
def reimport_history(self, history_id, history_name, wait_on_history_length, export_kwds, url, api_key):
    """Export ``history_id`` and import the resulting archive back by URL.

    The archive URL is built from ``url``, the export's download path and
    ``api_key``. When ``wait_on_history_length`` is truthy the imported
    history is waited on until it holds that many items. Returns the ID of
    the imported history.
    """
    # Export the history and build the URL the archive can be fetched from.
    download_path = self.export_url(history_id, export_kwds, check_download=True)
    archive_url = f"{url}{download_path}?key={api_key}"

    import_data = {"archive_source": archive_url, "archive_type": "url"}
    imported_history_id = self.import_history_and_wait_for_name(import_data, history_name)

    if wait_on_history_length:
        self.wait_on_history_length(imported_history_id, wait_on_history_length)
    return imported_history_id
def get_random_name(self, prefix=None, suffix=None, len=10):
    """Return ``prefix`` + ``len`` random lowercase letters/digits + ``suffix``.

    A falsy ``prefix`` or ``suffix`` contributes nothing.
    """
    # stolen from navigates_galaxy.py
    alphabet = string.ascii_lowercase + string.digits
    random_part = ''.join(random.choice(alphabet) for _ in range(len))
    return f"{prefix or ''}{random_part}{suffix or ''}"
def wait_for_dataset(self, history_id, dataset_id, assert_ok=False, timeout=DEFAULT_TIMEOUT):
    """Wait on the state of a history content item via ``wait_on_state`` and return its result."""
    def fetch_dataset():
        return self._get(f"histories/{history_id}/contents/{dataset_id}")

    return wait_on_state(fetch_dataset, desc="dataset state", assert_ok=assert_ok, timeout=timeout)
class GalaxyInteractorHttpMixin:
    """Implement the populator HTTP verbs by delegating to ``self.galaxy_interactor``."""

    galaxy_interactor: ApiTestInteractor

    @property
    def _api_key(self):
        """API key of the underlying interactor."""
        return self.galaxy_interactor.api_key

    def _post(self, route, data=None, files=None, headers=None, admin=False, json: bool = False) -> Response:
        """Delegate to the interactor's ``post``."""
        interactor = self.galaxy_interactor
        return interactor.post(route, data, files=files, admin=admin, headers=headers, json=json)

    def _put(self, route, data=None, headers=None, admin=False, json: bool = False):
        """Delegate to the interactor's ``put``."""
        interactor = self.galaxy_interactor
        return interactor.put(route, data, headers=headers, admin=admin, json=json)

    def _get(self, route, data=None, headers=None, admin=False):
        """Delegate to the interactor's ``get``; a missing ``data`` becomes an empty dict."""
        request_data = {} if data is None else data
        return self.galaxy_interactor.get(route, data=request_data, headers=headers, admin=admin)

    def _delete(self, route, data=None, headers=None, admin=False, json: bool = False):
        """Delegate to the interactor's ``delete``; a missing ``data`` becomes an empty dict."""
        request_data = {} if data is None else data
        return self.galaxy_interactor.delete(route, data=request_data, headers=headers, admin=admin, json=json)
class DatasetPopulator(GalaxyInteractorHttpMixin, BaseDatasetPopulator):
    """Dataset populator whose HTTP verbs are backed by a Galaxy interactor."""

    def _summarize_history(self, history_id):
        """Delegate history summarization to the interactor."""
        interactor = self.galaxy_interactor
        interactor._summarize_history(history_id)
[docs]class BaseWorkflowPopulator(BasePopulator):
def load_workflow(self, name: str, content: str = workflow_str, add_pja=False) -> dict:
    """Parse the JSON workflow ``content`` and set its name to ``name``.

    With ``add_pja`` a ``RenameDatasetAction`` post job action for output
    ``out_file1`` is attached to step ``"2"``.
    """
    workflow = json.loads(content)
    workflow["name"] = name
    if not add_pja:
        return workflow
    post_job_actions = workflow["steps"]["2"]["post_job_actions"]
    post_job_actions["RenameDatasetActionout_file1"] = {
        "action_type": "RenameDatasetAction",
        "output_name": "out_file1",
        "action_arguments": {"newname": "foo ${replaceme}"},
    }
    return workflow
def load_random_x2_workflow(self, name: str) -> dict:
    """Load the bundled "random lines twice" workflow under the given ``name``."""
    random_x2_content = workflow_random_x2_str
    return self.load_workflow(name, content=random_x2_content)
def load_workflow_from_resource(self, name: str, filename: Optional[str] = None) -> dict:
    """Load a workflow from a package resource; ``filename`` defaults to ``data/<name>.ga``."""
    resource_name = f"data/{name}.ga" if filename is None else filename
    resource_content = unicodify(resource_string(__name__, resource_name))
    return self.load_workflow(name, content=resource_content)
def simple_workflow(self, name: str, **create_kwds) -> str:
    """Create the default test workflow under ``name`` and return what ``create_workflow`` returns."""
    return self.create_workflow(self.load_workflow(name), **create_kwds)
def import_workflow_from_path(self, from_path: str) -> str:
    """POST ``from_path`` to ``workflows``, assert HTTP 200 and return the new workflow's ID."""
    import_response = self._post("workflows", data={"from_path": from_path})
    api_asserts.assert_status_code_is(import_response, 200)
    return import_response.json()["id"]
def create_workflow(self, workflow: dict, **create_kwds) -> str:
    """Upload ``workflow`` and return the ``id`` from the decoded response (status is not checked)."""
    return self.create_workflow_response(workflow, **create_kwds).json()["id"]
def create_workflow_response(self, workflow: dict, **create_kwds) -> Response:
    """POST the JSON-encoded ``workflow`` plus ``create_kwds`` to ``workflows/upload``; return the raw response."""
    upload_data = {"workflow": json.dumps(workflow)}
    upload_data.update(create_kwds)
    return self._post("workflows/upload", data=upload_data)
def upload_yaml_workflow(self, has_yaml, **kwds) -> str:
    """Import a YAML workflow via ``convert_and_import_workflow`` and return its ID.

    ``client_convert`` (popped from ``kwds``) defaults to the negation of
    ``round_trip_format_conversion`` and is passed on as ``convert``. When
    round-tripping is requested, the workflow is downloaded again as wrapped
    format2 YAML and that content is uploaded without client conversion.

    NOTE(review): the recursive call passes ``round_trip_conversion`` while
    this method reads ``round_trip_format_conversion`` - the recursion still
    ends because the latter then defaults to False; confirm the intended name.
    """
    round_trip = kwds.get("round_trip_format_conversion", False)
    kwds["convert"] = kwds.pop("client_convert", not round_trip)
    imported = convert_and_import_workflow(has_yaml, galaxy_interface=self, **kwds)
    workflow_id = imported["id"]
    if not round_trip:
        return workflow_id
    wrapped = self.download_workflow(workflow_id, style="format2_wrapped_yaml")
    assert "yaml_content" in wrapped, wrapped
    return self.upload_yaml_workflow(wrapped["yaml_content"], client_convert=False, round_trip_conversion=False)
def wait_for_invocation(self, workflow_id: str, invocation_id: str, timeout: timeout_type = DEFAULT_TIMEOUT, assert_ok: bool = True):
    """Wait on the state of ``workflows/<workflow_id>/usage/<invocation_id>`` via ``wait_on_state``."""
    invocation_url = f"workflows/{workflow_id}/usage/{invocation_id}"
    return wait_on_state(
        lambda: self._get(invocation_url),
        desc="workflow invocation state",
        timeout=timeout,
        assert_ok=assert_ok,
    )
def history_invocations(self, history_id: str) -> list:
    """Return the decoded JSON of GET ``invocations`` filtered by ``history_id``.

    Asserts that the response status code is 200.
    """
    query = dict(history_id=history_id)
    response = self._get("invocations", query)
    api_asserts.assert_status_code_is(response, 200)
    invocations = response.json()
    return invocations
def wait_for_history_workflows(self, history_id, assert_ok=True, timeout=DEFAULT_TIMEOUT, expected_invocation_count=None):
    """Wait for every workflow invocation listed for ``history_id``.

    :param assert_ok: forwarded to ``wait_for_workflow`` for each invocation.
    :param timeout: used for the optional invocation-count wait and for each
        per-invocation ``wait_for_workflow`` call.
    :param expected_invocation_count: when not ``None``, first wait until the
        history lists exactly this many invocations.
    """
    if expected_invocation_count is not None:
        def invocation_count():
            invocations = self.history_invocations(history_id)
            if len(invocations) == expected_invocation_count:
                return True
            # Falls through returning None; presumably wait_on keeps polling
            # while the callable returns None - confirm against tool_util wait_on.

        # Honor the caller's timeout here too instead of silently using the default.
        wait_on(invocation_count, f"{expected_invocation_count} history invocations", timeout=timeout)
    for invocation in self.history_invocations(history_id):
        workflow_id = invocation["workflow_id"]
        invocation_id = invocation["id"]
        self.wait_for_workflow(workflow_id, invocation_id, history_id, timeout=timeout, assert_ok=assert_ok)
def wait_for_workflow(self, workflow_id, invocation_id, history_id, assert_ok=True, timeout=DEFAULT_TIMEOUT):
    """Wait for an invocation to finish scheduling, then for the history's jobs.

    ``assert_ok`` and ``timeout`` are passed to both waits.
    """
    wait_kwds = dict(assert_ok=assert_ok, timeout=timeout)
    self.wait_for_invocation(workflow_id, invocation_id, **wait_kwds)
    self.dataset_populator.wait_for_history_jobs(history_id, **wait_kwds)
def get_invocation(self, invocation_id, step_details=False):
    """Fetch ``invocations/<invocation_id>`` and return the decoded JSON.

    ``step_details`` is sent as request data. HTTP errors are raised through
    the response's ``raise_for_status``.
    """
    response = self._get(
        f"invocations/{invocation_id}",
        data={'step_details': step_details},
    )
    response.raise_for_status()
    return response.json()
def get_biocompute_object(self, invocation_id):
    """Fetch ``invocations/<invocation_id>/biocompute`` and return the decoded JSON.

    HTTP errors are raised through the response's ``raise_for_status``.
    """
    response = self._get(f"invocations/{invocation_id}/biocompute")
    response.raise_for_status()
    biocompute_object = response.json()
    return biocompute_object
def validate_biocompute_object(self, bco, expected_schema_version='https://w3id.org/ieee/ieee-2791-schema/2791object.json'):
    """Check that ``bco`` has the expected keys and ``spec_version``.

    This is a key-presence check only, not a schema validation.
    """
    # TODO: actually use jsonref and jsonschema to validate this someday
    top_level_keys = (
        "object_id",
        "spec_version",
        "etag",
        "provenance_domain",
        "usability_domain",
        "description_domain",
        "execution_domain",
        "parametric_domain",
        "io_domain",
        "error_domain",
    )
    description_keys = ("keywords", "xref", "platform", "pipeline_steps")
    execution_keys = (
        "script_access_type",
        "script",
        "script_driver",
        "software_prerequisites",
        "external_data_endpoints",
        "environment_variables",
    )

    api_asserts.assert_has_keys(bco, *top_level_keys)
    assert bco['spec_version'] == expected_schema_version
    api_asserts.assert_has_keys(bco['description_domain'], *description_keys)
    api_asserts.assert_has_keys(bco['execution_domain'], *execution_keys)
    for parameter in bco['parametric_domain']:
        api_asserts.assert_has_keys(parameter, "param", "value", "step")
    api_asserts.assert_has_keys(bco['io_domain'], "input_subdomain", "output_subdomain")
def invoke_workflow_raw(self, workflow_id, request: dict) -> Response:
    """POST ``request`` to ``workflows/<workflow_id>/usage`` and return the raw response.

    No status check is performed here.
    """
    return self._post(f"workflows/{workflow_id}/usage", data=request)
def invoke_workflow(self, history_id: str, workflow_id: str, inputs: Optional[dict] = None, request: Optional[dict] = None, assert_ok: bool = True):
    """Invoke ``workflow_id`` targeting the history ``history_id``.

    :param inputs: optional inputs, sent JSON-encoded with
        ``inputs_by="step_index"``; omitted from the request when empty.
    :param request: optional base request dict. A supplied dict is updated
        in place with the ``history`` (and, if any, ``inputs``) entries.
    :param assert_ok: when true, assert a 200 response and return the
        invocation id; otherwise return the raw response object.
    """
    if inputs is None:
        inputs = {}
    if request is None:
        request = {}
    # Must be a plain string: a stray trailing comma here previously turned
    # the value into a 1-tuple.
    request["history"] = f"hist_id={history_id}"
    if inputs:
        request["inputs"] = json.dumps(inputs)
        request["inputs_by"] = 'step_index'
    invocation_response = self.invoke_workflow_raw(workflow_id, request)
    if not assert_ok:
        return invocation_response
    api_asserts.assert_status_code_is(invocation_response, 200)
    return invocation_response.json()["id"]
def workflow_report_json(self, workflow_id: str, invocation_id: str) -> dict:
    """Return the decoded JSON report for an invocation.

    Fetches ``workflows/<workflow_id>/invocations/<invocation_id>/report``
    and asserts a 200 status code.
    """
    report_route = f"workflows/{workflow_id}/invocations/{invocation_id}/report"
    report_response = self._get(report_route)
    api_asserts.assert_status_code_is(report_response, 200)
    return report_response.json()
def download_workflow(self, workflow_id: str, style: Optional[str] = None, history_id: Optional[str] = None) -> dict:
    """Download a workflow via ``workflows/<workflow_id>/download``.

    ``style`` and ``history_id`` are sent as request data only when they are
    not ``None``. Asserts a 200 status code. For style ``format2`` the
    response text is parsed with ``ordered_load``; any other style returns
    the decoded JSON.
    """
    optional_params = (("style", style), ("history_id", history_id))
    params = {key: value for key, value in optional_params if value is not None}
    response = self._get(f"workflows/{workflow_id}/download", data=params)
    api_asserts.assert_status_code_is(response, 200)
    if style == "format2":
        return ordered_load(response.text)
    return response.json()
def update_workflow(self, workflow_id: str, workflow_object: dict) -> Response:
    """PUT ``workflow_object`` to ``workflows/<workflow_id>`` and return the raw response.

    The object is sent under the ``workflow`` key with ``json=True``. No
    status check is performed here.
    """
    payload = {"workflow": workflow_object}
    return self._put(f'workflows/{workflow_id}', payload, json=True)
def refactor_workflow(self, workflow_id: str, actions: list, dry_run: Optional[bool] = None, style: Optional[str] = None) -> Response:
    """PUT refactoring ``actions`` to ``workflows/<workflow_id>/refactor``.

    ``style`` and ``dry_run`` are included in the payload only when they are
    not ``None``. Returns the raw response without a status check.
    """
    payload: Dict[str, Any] = {"actions": actions}
    for key, value in (("style", style), ("dry_run", dry_run)):
        if value is not None:
            payload[key] = value
    return self._put(f'workflows/{workflow_id}/refactor', payload, json=True)
@contextlib.contextmanager
def export_for_update(self, workflow_id):
    """Context manager yielding a downloaded workflow for in-place editing.

    When the ``with`` body finishes without raising, the (possibly modified)
    object is sent back through ``update_workflow`` and HTTP errors are
    raised through the response's ``raise_for_status``.
    """
    editable_workflow = self.download_workflow(workflow_id)
    yield editable_workflow
    update_response = self.update_workflow(workflow_id, editable_workflow)
    update_response.raise_for_status()
def run_workflow(self, has_workflow, test_data=None, history_id=None, wait=True, source_type=None, jobs_descriptions=None, expected_response=200, assert_ok=True, client_convert=None, round_trip_format_conversion=False, raw_yaml=False):
    """High-level wrapper around workflow API, etc. to invoke format 2 workflows.

    Uploads ``has_workflow``, stages ``test_data`` into a history with
    ``load_data_dict`` and invokes the workflow.

    :param test_data: dict (or YAML text) describing inputs. When ``None``
        it is taken from the ``test_data`` key of ``jobs_descriptions``,
        which in turn defaults to ``has_workflow`` parsed as YAML. The keys
        ``step_parameters`` and ``replacement_parameters`` are split off and
        sent as request parameters. A dict passed by the caller is not
        modified at the top level.
    :param history_id: target history; a new one is created when ``None``.
    :param wait: wait for the invocation and its jobs before returning.
    :param expected_response: status code the invocation request must return.
    :param client_convert: defaults to ``not round_trip_format_conversion``.
    :returns: a ``RunJobsSummary`` on success; the decoded response JSON
        when ``expected_response`` is not 200; ``None`` when the response
        JSON carries no ``id``.
    """
    workflow_populator = self
    if client_convert is None:
        client_convert = not round_trip_format_conversion

    workflow_id = workflow_populator.upload_yaml_workflow(
        has_workflow,
        source_type=source_type,
        client_convert=client_convert,
        round_trip_format_conversion=round_trip_format_conversion,
        raw_yaml=raw_yaml
    )

    if test_data is None:
        if jobs_descriptions is None:
            assert source_type != "path"
            jobs_descriptions = yaml.safe_load(has_workflow)
        test_data = jobs_descriptions.get("test_data", {})

    if isinstance(test_data, dict):
        # Shallow copy: the pops below would otherwise strip keys from the
        # caller's dict, breaking reuse of the same test_data across runs.
        test_data = test_data.copy()
    else:
        test_data = yaml.safe_load(test_data)

    parameters = test_data.pop('step_parameters', {})
    replacement_parameters = test_data.pop("replacement_parameters", {})
    if history_id is None:
        history_id = self.dataset_populator.new_history()
    inputs, label_map, has_uploads = load_data_dict(history_id, test_data, self.dataset_populator, self.dataset_collection_populator)

    workflow_request = dict(
        history=f"hist_id={history_id}",
        workflow_id=workflow_id,
    )
    workflow_request["inputs"] = json.dumps(label_map)
    workflow_request["inputs_by"] = 'name'
    if parameters:
        workflow_request["parameters"] = json.dumps(parameters)
        workflow_request["parameters_normalized"] = True
    if replacement_parameters:
        workflow_request["replacement_params"] = json.dumps(replacement_parameters)

    if has_uploads:
        # Uploaded inputs must be ready before the workflow is invoked.
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)

    invocation_response = workflow_populator.invoke_workflow_raw(workflow_id, workflow_request)
    api_asserts.assert_status_code_is(invocation_response, expected_response)
    invocation = invocation_response.json()
    if expected_response != 200:
        assert not assert_ok
        return invocation

    invocation_id = invocation.get('id')
    if invocation_id:
        # Wait for workflow to become fully scheduled and then for all jobs
        # complete.
        if wait:
            workflow_populator.wait_for_workflow(workflow_id, invocation_id, history_id, assert_ok=assert_ok)
        jobs = self.dataset_populator.history_jobs(history_id)
        return RunJobsSummary(
            history_id=history_id,
            workflow_id=workflow_id,
            invocation_id=invocation_id,
            inputs=inputs,
            jobs=jobs,
            invocation=invocation,
            workflow_request=workflow_request
        )
def dump_workflow(self, workflow_id, style=None):
    """Print a downloaded workflow to stdout.

    For style ``format2_wrapped_yaml`` the ``yaml_content`` entry is printed
    as is; otherwise the workflow is printed as indented, key-sorted JSON.
    """
    downloaded = self.download_workflow(workflow_id, style=style)
    if style == "format2_wrapped_yaml":
        text = downloaded["yaml_content"]
    else:
        text = json.dumps(downloaded, sort_keys=True, indent=2)
    print(text)
# Result record returned by ``run_workflow`` for a successful invocation.
RunJobsSummary = namedtuple(
    'RunJobsSummary',
    [
        'history_id',
        'workflow_id',
        'invocation_id',
        'inputs',
        'jobs',
        'invocation',
        'workflow_request',
    ],
)
class WorkflowPopulator(GalaxyInteractorHttpMixin, BaseWorkflowPopulator, ImporterGalaxyInterface):
    """Workflow populator configured from a Galaxy interactor object."""

    def __init__(self, galaxy_interactor):
        """Store the interactor and build dataset / collection populators from it."""
        self.galaxy_interactor = galaxy_interactor
        self.dataset_populator = DatasetPopulator(galaxy_interactor)
        self.dataset_collection_populator = DatasetCollectionPopulator(galaxy_interactor)

    # Required for ImporterGalaxyInterface interface - so we can recursively import
    # nested workflows.
    def import_workflow(self, workflow, **kwds):
        """POST ``workflow`` (JSON-encoded) to ``workflows`` and return the decoded response.

        Extra keywords are merged into the posted data. Asserts a 200 status.
        """
        payload = {'workflow': json.dumps(workflow, indent=4)}
        payload.update(**kwds)
        response = self._post("workflows", data=payload)
        assert response.status_code == 200, response.content
        return response.json()

    def import_tool(self, tool):
        """Import a tool definition via POST to ``dynamic_tools``.

        Asserts a 200 status and returns the decoded JSON response.
        """
        response = self._import_tool_response(tool)
        assert response.status_code == 200, response
        return response.json()

    def _import_tool_response(self, tool):
        """POST the JSON-encoded ``tool`` as ``representation`` (admin request)."""
        payload = {'representation': json.dumps(tool, indent=4)}
        return self._post("dynamic_tools", data=payload, admin=True)

    def scaling_workflow_yaml(self, **kwd):
        """Return a generated scaling workflow serialized with ``yaml.dump``."""
        return yaml.dump(self._scale_workflow_dict(**kwd))

    def _scale_workflow_dict(self, workflow_type="simple", **kwd):
        """Pick the workflow generator for ``workflow_type`` (simple is the fallback)."""
        builders = {
            "two_outputs": self._scale_workflow_dict_two_outputs,
            "wave_simple": self._scale_workflow_dict_wave,
        }
        build = builders.get(workflow_type, self._scale_workflow_dict_simple)
        return build(**kwd)

    def _scale_workflow_dict_simple(self, **kwd):
        """Input collection followed by a linear chain of ``cat`` steps."""
        collection_size = kwd.get("collection_size", 2)
        workflow_depth = kwd.get("workflow_depth", 3)

        steps = [
            {"tool_id": "create_input_collection", "state": {"collection_size": collection_size}, "label": "wf_input"},
            {"tool_id": "cat", "state": {"input1": self._link("wf_input", "output")}, "label": "cat_0"}
        ]
        steps.extend(
            {
                "tool_id": "cat",
                "state": {"input1": self._link(f"cat_{depth}/out_file1")},
                "label": f"cat_{depth + 1}",
            }
            for depth in range(workflow_depth)
        )
        return {
            "class": "GalaxyWorkflow",
            "inputs": {},
            "steps": steps,
        }

    def _scale_workflow_dict_two_outputs(self, **kwd):
        """Input collection followed by ``cat`` steps wired to two inputs each."""
        collection_size = kwd.get("collection_size", 10)
        workflow_depth = kwd.get("workflow_depth", 10)

        steps = [
            {"tool_id": "create_input_collection", "state": {"collection_size": collection_size}, "label": "wf_input"},
            {"tool_id": "cat", "state": {"input1": self._link("wf_input"), "input2": self._link("wf_input")}, "label": "cat_0"}
        ]
        for depth in range(workflow_depth):
            first_link = self._link(f"cat_{depth}#out_file1")
            second_link = self._link(f"cat_{depth}#out_file2")
            # NOTE(review): unlike the simple variant these steps get no label.
            steps.append({"tool_id": "cat", "state": {"input1": first_link, "input2": second_link}})
        return {
            "class": "GalaxyWorkflow",
            "inputs": {},
            "steps": steps,
        }

    def _scale_workflow_dict_wave(self, **kwd):
        """Input collection followed by alternating ``split`` / ``cat_list`` steps."""
        collection_size = kwd.get("collection_size", 10)
        workflow_depth = kwd.get("workflow_depth", 10)

        steps = [
            {"tool_id": "create_input_collection", "state": {"collection_size": collection_size}, "label": "wf_input"},
            {"tool_id": "cat_list", "state": {"input1": self._link("wf_input", "output")}, "label": "step_1"},
        ]
        for offset in range(workflow_depth):
            step = offset + 2
            # Odd steps use cat_list, even steps use split.
            if step % 2 == 1:
                tool_id, upstream_output = "cat_list", "output"
            else:
                tool_id, upstream_output = "split", "out_file1"
            steps.append({
                "tool_id": tool_id,
                "state": {"input1": self._link(f"step_{step - 1}", upstream_output)},
                "label": f"step_{step}",
            })
        return {
            "class": "GalaxyWorkflow",
            "inputs": {},
            "steps": steps,
        }

    @staticmethod
    def _link(link, output_name=None):
        """Build a ``{"$link": ...}`` reference, appending ``/output_name`` when given."""
        target = link if output_name is None else f"{link}/{output_name}"
        return {"$link": target}
class LibraryPopulator:
    """Helpers for creating and inspecting data libraries through a Galaxy interactor."""

    def __init__(self, galaxy_interactor):
        """Store the interactor and build a ``DatasetPopulator`` from it."""
        self.galaxy_interactor = galaxy_interactor
        self.dataset_populator = DatasetPopulator(galaxy_interactor)

    def get_libraries(self):
        """Return the decoded JSON of GET ``libraries``."""
        get_response = self.galaxy_interactor.get("libraries")
        return get_response.json()

    def new_private_library(self, name):
        """Create a library and set all its permissions to the user's private role."""
        library = self.new_library(name)
        library_id = library["id"]
        role_id = self.user_private_role_id()
        self.set_permissions(library_id, role_id)
        return library

    def new_library(self, name):
        """Create a library called ``name`` (admin request) and return the decoded response."""
        data = dict(name=name)
        create_response = self.galaxy_interactor.post("libraries", data=data, admin=True, json=True)
        return create_response.json()

    def get_permissions(
        self,
        library_id,
        scope: Optional[str] = "current",
        is_library_access: Optional[bool] = False,
        page: Optional[int] = 1,
        page_limit: Optional[int] = 1000,
        q: Optional[str] = None,
        admin: Optional[bool] = True
    ):
        """Return the decoded JSON of GET ``libraries/<library_id>/permissions``.

        All arguments except ``admin`` are placed in the query string; ``q``
        is appended only when truthy. Asserts a 200 status code.
        """
        query = f"&q={q}" if q else ""
        response = self.galaxy_interactor.get(
            f"libraries/{library_id}/permissions?scope={scope}&is_library_access={is_library_access}&page={page}&page_limit={page_limit}{query}",
            admin=admin,
        )
        api_asserts.assert_status_code_is(response, 200)
        return response.json()

    def set_permissions(self, library_id, role_id=None):
        """Old legacy way of setting permissions."""
        perm_list = role_id or []
        permissions = {
            "LIBRARY_ACCESS_in": perm_list,
            "LIBRARY_MODIFY_in": perm_list,
            "LIBRARY_ADD_in": perm_list,
            "LIBRARY_MANAGE_in": perm_list,
        }
        response = self.galaxy_interactor.post(f"libraries/{library_id}/permissions", data=permissions, admin=True, json=True)
        api_asserts.assert_status_code_is(response, 200)

    def set_permissions_with_action(self, library_id, role_id=None, action=None):
        """Set library permissions using the ``action``-style payload.

        ``action`` defaults to ``"set_permissions"``; ``role_id`` is applied to
        the access, add, manage and modify lists. Asserts a 200 status code.
        """
        perm_list = role_id or []
        action = action or "set_permissions"
        permissions = {
            "action": action,
            "access_ids[]": perm_list,
            "add_ids[]": perm_list,
            "manage_ids[]": perm_list,
            "modify_ids[]": perm_list,
        }
        response = self.galaxy_interactor.post(f"libraries/{library_id}/permissions", data=permissions, admin=True, json=True)
        api_asserts.assert_status_code_is(response, 200)

    def user_email(self):
        # deprecated - use DatasetPopulator
        return self.dataset_populator.user_email()

    def user_private_role_id(self):
        # deprecated - use DatasetPopulator
        return self.dataset_populator.user_private_role_id()

    def create_dataset_request(self, library, **kwds):
        """Build the ``(payload, files)`` pair for a library dataset creation request.

        Recognized keywords: ``upload_option`` (default ``"upload_file"``),
        ``folder_id`` (default: the library's ``root_folder_id``), ``name``,
        ``file_type``, ``db_key``, ``link_data``, and depending on the upload
        option ``file`` / ``contents``, ``paths`` or ``server_dir``.

        :returns: tuple of the form data dict and the files dict; the files
            dict is empty for every upload option except ``upload_file``.
        :raises KeyError: if ``paths`` / ``server_dir`` is missing for the
            ``upload_paths`` / ``upload_directory`` options.
        """
        upload_option = kwds.get("upload_option", "upload_file")
        create_data = {
            "folder_id": kwds.get("folder_id", library["root_folder_id"]),
            "create_type": "file",
            "files_0|NAME": kwds.get("name", "NewFile"),
            "upload_option": upload_option,
            "file_type": kwds.get("file_type", "auto"),
            "db_key": kwds.get("db_key", "?"),
        }
        if kwds.get("link_data"):
            create_data["link_data_only"] = "link_to_files"
        # Default to no file parts so an upload_option without a dedicated
        # branch below no longer fails with UnboundLocalError on return.
        files = {}
        if upload_option == "upload_file":
            files = {
                "files_0|file_data": kwds.get("file", StringIO(kwds.get("contents", "TestData"))),
            }
        elif upload_option == "upload_paths":
            create_data["filesystem_paths"] = kwds["paths"]
        elif upload_option == "upload_directory":
            create_data["server_dir"] = kwds["server_dir"]
        return create_data, files

    def new_library_dataset(self, name, **create_dataset_kwds):
        """Create a private library with one dataset and wait for the dataset.

        Returns the library dataset description fetched after the wait.
        """
        library = self.new_private_library(name)
        payload, files = self.create_dataset_request(library, **create_dataset_kwds)
        dataset = self.raw_library_contents_create(library["id"], payload, files=files).json()[0]
        return self.wait_on_library_dataset(library["id"], dataset["id"])

    def wait_on_library_dataset(self, library_id, dataset_id):
        """Wait (asserting an ok state) on a library dataset, then return its JSON."""
        def show():
            return self.galaxy_interactor.get(f"libraries/{library_id}/contents/{dataset_id}")

        wait_on_state(show, assert_ok=True, timeout=DEFAULT_TIMEOUT)
        return show().json()

    def raw_library_contents_create(self, library_id, payload, files=None):
        """POST ``payload`` / ``files`` to ``libraries/<library_id>/contents``; return the raw response."""
        if files is None:
            files = {}
        url_rel = f"libraries/{library_id}/contents"
        return self.galaxy_interactor.post(url_rel, payload, files=files)

    def show_ld(self, library_id, library_dataset_id):
        """Return the JSON for one library dataset, raising on HTTP errors."""
        response = self.galaxy_interactor.get(f"libraries/{library_id}/contents/{library_dataset_id}")
        response.raise_for_status()
        return response.json()

    def show_ldda(self, ldda_id):
        """Return the JSON of GET ``datasets/<ldda_id>?hda_ldda=ldda``, raising on HTTP errors."""
        response = self.galaxy_interactor.get(f"datasets/{ldda_id}?hda_ldda=ldda")
        response.raise_for_status()
        return response.json()

    def new_library_dataset_in_private_library(self, library_name="private_dataset", wait=True):
        """Create a private library holding a single small ``txt`` dataset.

        :returns: ``(library, library_dataset)``; with ``wait`` true the
            dataset is the description fetched after waiting on it.
        """
        library = self.new_private_library(library_name)
        payload, files = self.create_dataset_request(library, file_type="txt", contents="create_test")
        create_response = self.galaxy_interactor.post(f"libraries/{library['id']}/contents", payload, files=files)
        api_asserts.assert_status_code_is(create_response, 200)
        library_datasets = create_response.json()
        assert len(library_datasets) == 1
        library_dataset = library_datasets[0]
        if wait:
            library_dataset = self.wait_on_library_dataset(library["id"], library_dataset["id"])
        return library, library_dataset

    def get_library_contents_with_path(self, library_id, path):
        """Return the JSON of the first library content whose ``name`` equals ``path``.

        :raises Exception: if no content entry matches.
        """
        all_contents_response = self.galaxy_interactor.get(f"libraries/{library_id}/contents")
        api_asserts.assert_status_code_is(all_contents_response, 200)
        all_contents = all_contents_response.json()
        matching = [c for c in all_contents if c["name"] == path]
        if len(matching) == 0:
            raise Exception(f"Failed to find library contents with path [{path}], contents are {all_contents}")
        get_response = self.galaxy_interactor.get(matching[0]["url"])
        api_asserts.assert_status_code_is(get_response, 200)
        return get_response.json()

    def setup_fetch_to_folder(self, test_name):
        """Create a history and private library for a fetch-to-library-folder test.

        :returns: ``(history_id, library, destination)`` where ``destination``
            targets the library's root folder.
        """
        history_id = self.dataset_populator.new_history()
        library = self.new_private_library(test_name)
        # Drops the first character of the root folder id (presumably a
        # type prefix - confirm against the API's id encoding).
        folder_id = library["root_folder_id"][1:]
        destination = {"type": "library_folder", "library_folder_id": folder_id}
        return history_id, library, destination
class BaseDatasetCollectionPopulator:
    """Abstract helpers for creating dataset collections in a history.

    Concrete subclasses provide ``_create_collection`` and a
    ``dataset_populator`` attribute used to create and wait on datasets.
    """

    def create_list_from_pairs(self, history_id, pairs, name="Dataset Collection from pairs"):
        """Create a ``list:paired`` collection from existing collection ids in ``pairs``."""
        return self.create_nested_collection(history_id=history_id,
                                             collection=pairs,
                                             collection_type='list:paired',
                                             name=name)

    def nested_collection_identifiers(self, history_id, collection_type):
        """Build nested ``element_identifiers`` for ``collection_type``.

        New datasets are created for the innermost rank; each outer rank
        wraps the result in a single ``new_collection`` element.
        """
        rank_types = list(reversed(collection_type.split(":")))
        assert len(rank_types) > 0
        rank_type_0 = rank_types[0]
        if rank_type_0 == "list":
            identifiers = self.list_identifiers(history_id)
        else:
            identifiers = self.pair_identifiers(history_id)
        nested_collection_type = rank_type_0

        for i, rank_type in enumerate(reversed(rank_types[1:])):
            name = "test_level_%d" % (i + 1) if rank_type == "list" else "paired"
            identifiers = [dict(
                src="new_collection",
                name=name,
                collection_type=nested_collection_type,
                element_identifiers=identifiers,
            )]
            nested_collection_type = f"{rank_type}:{nested_collection_type}"
        return identifiers

    def create_nested_collection(self, history_id, collection_type, name=None, collection=None, element_identifiers=None):
        """Create a nested collection either from collection or using collection_type)."""
        assert collection_type is not None
        name = name or f"Test {collection_type}"
        if collection is not None:
            assert element_identifiers is None
            element_identifiers = []
            for i, pair in enumerate(collection):
                element_identifiers.append(dict(
                    name="test%d" % i,
                    src="hdca",
                    id=pair
                ))
        if element_identifiers is None:
            element_identifiers = self.nested_collection_identifiers(history_id, collection_type)

        payload = dict(
            instance_type="history",
            history_id=history_id,
            element_identifiers=json.dumps(element_identifiers),
            collection_type=collection_type,
            name=name,
        )
        return self.__create(payload)

    def create_list_of_pairs_in_history(self, history_id, **kwds):
        """Upload a fixed one-element ``list:paired`` collection.

        NOTE(review): ``kwds`` is accepted but not used here.
        """
        return self.upload_collection(history_id, "list:paired", elements=[
            {
                "name": "test0",
                "elements": [
                    {"src": "pasted", "paste_content": "TestData123", "name": "forward"},
                    {"src": "pasted", "paste_content": "TestData123", "name": "reverse"},
                ]
            }
        ])

    def create_list_of_list_in_history(self, history_id, **kwds):
        """Create a nested collection by wrapping a list in one outer level per extra rank.

        ``collection_type`` (default ``list:list``) is taken out of ``kwds``;
        the remaining keywords go to ``create_list_in_history``. Returns the
        response of the last creation request, which for a single-rank
        ``collection_type`` is the response for the initial list itself.
        """
        # create_nested_collection will generate nested collection from just datasets,
        # this function uses recursive generation of history hdcas.
        collection_type = kwds.pop('collection_type', 'list:list')
        collection_types = collection_type.split(':')
        response = self.create_list_in_history(history_id, **kwds)
        current_collection_id = response.json()['id']
        current_collection_type = 'list'
        for rank_type in collection_types[1:]:
            current_collection_type = f"{current_collection_type}:{rank_type}"
            response = self.create_nested_collection(history_id=history_id,
                                                     collection_type=current_collection_type,
                                                     name=current_collection_type,
                                                     collection=[current_collection_id])
            current_collection_id = response.json()['id']
        return response

    def create_pair_in_history(self, history_id, **kwds):
        """Create a ``paired`` collection in the history and return the response."""
        payload = self.create_pair_payload(
            history_id,
            instance_type="history",
            **kwds
        )
        return self.__create(payload)

    def create_list_in_history(self, history_id, **kwds):
        """Create a ``list`` collection in the history and return the response."""
        payload = self.create_list_payload(
            history_id,
            instance_type="history",
            **kwds
        )
        return self.__create(payload)

    def upload_collection(self, history_id, collection_type, elements, **kwds):
        """Create a collection from ``elements`` using a fetch-style payload."""
        payload = self.__create_payload_fetch(history_id, collection_type, contents=elements, **kwds)
        return self.__create(payload)

    def create_list_payload(self, history_id, **kwds):
        """Build (without sending) the payload for a ``list`` collection."""
        return self.__create_payload(history_id, identifiers_func=self.list_identifiers, collection_type="list", **kwds)

    def create_pair_payload(self, history_id, **kwds):
        """Build (without sending) the payload for a ``paired`` collection."""
        return self.__create_payload(history_id, identifiers_func=self.pair_identifiers, collection_type="paired", **kwds)

    def __create_payload(self, *args, **kwds):
        # ``direct_upload`` selects the fetch-style payload over the
        # collection-creation payload built from existing datasets.
        direct_upload = kwds.pop("direct_upload", False)
        if direct_upload:
            return self.__create_payload_fetch(*args, **kwds)
        else:
            return self.__create_payload_collection(*args, **kwds)

    def __create_payload_fetch(self, history_id, collection_type, **kwds):
        # Build a fetch payload (recognized later by its "targets" key) with
        # one pasted element per entry of ``contents``.
        contents = kwds.pop("contents", None)

        elements = []
        if contents is None:
            if collection_type == "paired":
                contents = [("forward", "TestData123"), ("reverse", "TestData123")]
            elif collection_type == "list":
                contents = ["TestData123", "TestData123", "TestData123"]
            else:
                raise Exception(f"Unknown collection_type {collection_type}")

        if isinstance(contents, list):
            for i, contents_level in enumerate(contents):
                # If given a full collection definition pass as is.
                if isinstance(contents_level, dict):
                    elements.append(contents_level)
                    continue

                element = {"src": "pasted", "ext": "txt"}
                # Else older style list of contents or element ID and contents,
                # convert to fetch API.
                if isinstance(contents_level, tuple):
                    # (element_identifier, contents)
                    element_identifier = contents_level[0]
                    dataset_contents = contents_level[1]
                else:
                    dataset_contents = contents_level
                    if collection_type == "list":
                        element_identifier = "data%d" % i
                    elif collection_type == "paired" and i == 0:
                        element_identifier = "forward"
                    else:
                        element_identifier = "reverse"
                element["name"] = element_identifier
                element["paste_content"] = dataset_contents
                elements.append(element)

        name = kwds.get("name", "Test Dataset Collection")

        targets = [{
            "destination": {"type": "hdca"},
            "elements": elements,
            "collection_type": collection_type,
            "name": name,
        }]
        # All content is pasted inline, so there are never file parts to
        # attach; the key is kept with an empty dict as before.
        payload = dict(
            history_id=history_id,
            targets=json.dumps(targets),
            __files={},
        )
        return payload

    def wait_for_fetched_collection(self, fetch_response):
        """Wait for the first job of ``fetch_response`` and return details of its first output collection."""
        self.dataset_populator.wait_for_job(fetch_response["jobs"][0]["id"], assert_ok=True)
        initial_dataset_collection = fetch_response["outputs"][0]
        dataset_collection = self.dataset_populator.get_history_collection_details(initial_dataset_collection["history_id"], hid=initial_dataset_collection["hid"])
        return dataset_collection

    def __create_payload_collection(self, history_id, identifiers_func, collection_type, **kwds):
        # Payload for the collection-creation route; datasets are created via
        # ``identifiers_func`` unless ``element_identifiers`` is supplied.
        contents = kwds.pop("contents", None)

        if "element_identifiers" not in kwds:
            kwds["element_identifiers"] = json.dumps(identifiers_func(history_id, contents=contents))

        if "name" not in kwds:
            kwds["name"] = "Test Dataset Collection"

        payload = dict(
            history_id=history_id,
            collection_type=collection_type,
            **kwds
        )
        return payload

    def pair_identifiers(self, history_id, contents=None):
        """Create two datasets and return ``forward`` / ``reverse`` element identifiers."""
        hda1, hda2 = self.__datasets(history_id, count=2, contents=contents)

        element_identifiers = [
            dict(name="forward", src="hda", id=hda1["id"]),
            dict(name="reverse", src="hda", id=hda2["id"]),
        ]
        return element_identifiers

    def list_identifiers(self, history_id, contents=None):
        """Create datasets (three by default) and return list element identifiers."""
        count = 3 if contents is None else len(contents)
        # Contents can be a list of strings (with name auto-assigned here) or a list of
        # 2-tuples of form (name, dataset_content).
        if contents and isinstance(contents[0], tuple):
            hdas = self.__datasets(history_id, count=count, contents=[c[1] for c in contents])

            def hda_to_identifier(i, hda):
                return dict(name=contents[i][0], src="hda", id=hda["id"])
        else:
            hdas = self.__datasets(history_id, count=count, contents=contents)

            def hda_to_identifier(i, hda):
                return dict(name="data%d" % (i + 1), src="hda", id=hda["id"])
        element_identifiers = [hda_to_identifier(i, hda) for (i, hda) in enumerate(hdas)]
        return element_identifiers

    def __create(self, payload):
        # Create a collection - either from existing datasets using collection creation API
        # or from direct uploads with the fetch API. Dispatch on "targets" keyword in payload
        # to decide which to use.
        if "targets" not in payload:
            return self._create_collection(payload)
        else:
            return self.dataset_populator.fetch(payload)

    def __datasets(self, history_id, count, contents=None):
        # Create ``count`` datasets, using contents[i] as content when given.
        datasets = []
        for i in range(count):
            new_kwds = {}
            if contents:
                new_kwds["content"] = contents[i]
            datasets.append(self.dataset_populator.new_dataset(history_id, **new_kwds))
        return datasets

    def wait_for_dataset_collection(self, create_payload, assert_ok=False, timeout=DEFAULT_TIMEOUT):
        """Wait on every dataset in a collection description, recursing into nested collections."""
        for element in create_payload["elements"]:
            if element['element_type'] == 'hda':
                self.dataset_populator.wait_for_dataset(history_id=element['object']['history_id'],
                                                        dataset_id=element['object']['id'],
                                                        assert_ok=assert_ok,
                                                        timeout=timeout)
            elif element['element_type'] == 'dataset_collection':
                self.wait_for_dataset_collection(element['object'], assert_ok=assert_ok, timeout=timeout)

    @abstractmethod
    def _create_collection(self, payload: dict) -> Response:
        """Create collection from specified payload."""
class DatasetCollectionPopulator(BaseDatasetCollectionPopulator):
    """Collection populator that sends its requests through a Galaxy interactor."""

    def __init__(self, galaxy_interactor: ApiTestInteractor):
        """Store the interactor and build a ``DatasetPopulator`` from it."""
        self.galaxy_interactor = galaxy_interactor
        self.dataset_populator = DatasetPopulator(galaxy_interactor)

    def _create_collection(self, payload: dict) -> Response:
        """POST ``payload`` to ``dataset_collections`` and return the raw response."""
        return self.galaxy_interactor.post("dataset_collections", data=payload)
def load_data_dict(history_id, test_data, dataset_populator, dataset_collection_populator):
    """Load a dictionary as inputs to a workflow (test data focused).

    Each ``test_data`` value is handled by shape:

    * a dict with ``elements`` or a truthy ``collection_type`` becomes a
      collection created through ``dataset_collection_populator``;
    * a dict with ``type`` ``"File"`` becomes a dataset whose content is an
      open test-data file; ``type`` ``"raw"`` passes ``value`` through as is;
    * any non-dict value becomes the content of a new dataset.

    Element dicts inside ``test_data`` are updated in place while being
    adapted for upload.

    :returns: ``(inputs, label_map, has_uploads)``.
    :raises ValueError: for a dict value matching none of the shapes above.
    """

    def open_test_data(test_dict, mode="rb"):
        # Returns an open handle; the caller is responsible for it.
        test_data_resolver = TestDataResolver()
        filename = test_data_resolver.get_filename(test_dict["value"])
        return open(filename, mode)

    def read_test_data(test_dict):
        # Close the handle once the text has been read instead of leaking it.
        with open_test_data(test_dict, mode="r") as handle:
            return handle.read()

    inputs = {}
    label_map = {}
    has_uploads = False

    for key, value in test_data.items():
        is_dict = isinstance(value, dict)
        if is_dict and ("elements" in value or value.get('collection_type')):
            elements_data = value.get("elements", [])
            elements = []
            for element_data in elements_data:
                # Adapt differences between test_data dict and fetch API description.
                if "name" not in element_data:
                    identifier = element_data["identifier"]
                    element_data["name"] = identifier
                input_type = element_data.get("type", "raw")
                content = None
                if input_type == "File":
                    content = read_test_data(element_data)
                else:
                    content = element_data["content"]
                if content is not None:
                    element_data["src"] = "pasted"
                    element_data["paste_content"] = content

                elements.append(element_data)
            new_collection_kwds = {}
            if "name" in value:
                new_collection_kwds["name"] = value["name"]
            collection_type = value.get('collection_type', '')
            if collection_type == "list:paired":
                fetch_response = dataset_collection_populator.create_list_of_pairs_in_history(history_id, contents=elements, **new_collection_kwds).json()
            elif collection_type and ':' in collection_type:
                # Wrapped so the result has the same "outputs" shape as the
                # other branches.
                fetch_response = {'outputs': [dataset_collection_populator.create_nested_collection(history_id, collection_type=collection_type, **new_collection_kwds).json()]}
            elif collection_type == "list":
                fetch_response = dataset_collection_populator.create_list_in_history(history_id, contents=elements, direct_upload=True, **new_collection_kwds).json()
            else:
                fetch_response = dataset_collection_populator.create_pair_in_history(history_id, contents=elements or None, direct_upload=True, **new_collection_kwds).json()
            hdca_output = fetch_response["outputs"][0]
            hdca = dataset_populator.ds_entry(hdca_output)
            hdca["hid"] = hdca_output["hid"]
            label_map[key] = hdca
            inputs[key] = hdca
            has_uploads = True
        elif is_dict and "type" in value:
            input_type = value["type"]
            if input_type == "File":
                # The open handle itself is handed to new_dataset as content,
                # so it is intentionally not closed here.
                content = open_test_data(value)
                new_dataset_kwds = {
                    "content": content
                }
                if "name" in value:
                    new_dataset_kwds["name"] = value["name"]
                if "file_type" in value:
                    new_dataset_kwds["file_type"] = value["file_type"]
                hda = dataset_populator.new_dataset(history_id, **new_dataset_kwds)
                label_map[key] = dataset_populator.ds_entry(hda)
                has_uploads = True
            elif input_type == "raw":
                label_map[key] = value["value"]
                inputs[key] = value["value"]
        elif not is_dict:
            has_uploads = True
            hda = dataset_populator.new_dataset(history_id, content=value)
            label_map[key] = dataset_populator.ds_entry(hda)
            inputs[key] = hda
        else:
            raise ValueError(f"Invalid test_data def {test_data}")

    return inputs, label_map, has_uploads
def stage_inputs(galaxy_interactor, history_id, job, use_path_paste=True, use_fetch_api=True, to_posix_lines=True):
    """Alternative to load_data_dict that uses production-style workflow inputs.

    Staging is delegated to ``InteractorStaging``; returns the
    ``(inputs, datasets)`` pair it produces.
    """
    staging = InteractorStaging(galaxy_interactor, use_fetch_api=use_fetch_api)
    inputs, datasets = staging.stage(
        "workflow",
        history_id=history_id,
        job=job,
        use_path_paste=use_path_paste,
        to_posix_lines=to_posix_lines,
    )
    return inputs, datasets
def stage_rules_example(galaxy_interactor, history_id, example):
    """Wrapper around stage_inputs for staging collections defined by rules spec DSL.

    Works on a shallow copy of ``example["test_data"]``: its ``type`` entry
    is moved to ``collection_type`` and ``class`` is set to ``Collection``
    before staging it as the job input named ``input``.
    """
    collection_spec = example["test_data"].copy()
    collection_type = collection_spec.pop("type")
    collection_spec["collection_type"] = collection_type
    collection_spec["class"] = "Collection"
    staged_inputs, _ = stage_inputs(
        galaxy_interactor,
        history_id=history_id,
        job={"input": collection_spec},
    )
    return staged_inputs
def wait_on_state(state_func: Callable, desc="state", skip_states=None, ok_states=None, assert_ok=False, timeout=DEFAULT_TIMEOUT) -> str:
    """Poll ``state_func`` until its JSON ``state`` leaves ``skip_states``.

    :param state_func: callable returning a response; each poll asserts a
        200 status code and reads ``state`` from the decoded JSON.
    :param skip_states: states treated as "not finished yet"; a default list
        of in-progress states is used when ``None``.
    :param ok_states: states accepted when ``assert_ok`` is true; defaults
        to ``["ok", "scheduled"]``.
    :returns: the first state outside ``skip_states`` (via ``wait_on``).
    :raises TimeoutAssertionError: on timeout, with the latest response JSON
        appended to the message.
    """
    if skip_states is None:
        skip_states = ["running", "queued", "new", "ready", "stop", "stopped", "setting_metadata"]
    if ok_states is None:
        ok_states = ["ok", "scheduled"]

    def get_state():
        response = state_func()
        assert response.status_code == 200, f"Failed to fetch state update while waiting. [{response.content}]"
        state = response.json()["state"]
        if state in skip_states:
            return None
        if assert_ok:
            assert state in ok_states, f"Final state - {state} - not okay."
        return state

    try:
        return wait_on(get_state, desc=desc, timeout=timeout)
    except TimeoutAssertionError as e:
        latest_response = state_func()
        raise TimeoutAssertionError(f"{e} Current response containing state [{latest_response.json()}].")
class GiHttpMixin:
    """Mixin for adapting Galaxy testing populators helpers to bioblend.

    Expects the host class to set ``_gi``. For the write verbs the API key
    is added to a copy of the request data. The ``admin`` and ``json``
    arguments are accepted for signature compatibility with callers and are
    not used by these implementations.
    """

    _gi: GalaxyClient

    @property
    def _api_key(self):
        return self._gi.key

    def _api_url(self):
        return self._gi.url

    def _get(self, route, data=None):
        """GET ``route`` through the bioblend client's ``make_get_request``."""
        if data is None:
            data = {}
        return self._gi.make_get_request(self._url(route), data=data)

    def _post(self, route, data=None, files=None, headers=None, admin=False, json: bool = False) -> Response:
        """POST ``data`` (plus API key) and any ``files`` to ``route``."""
        if data is None:
            data = {}
        data = data.copy()
        data['key'] = self._gi.key
        # ``files`` used to be accepted but silently dropped; forward it so
        # uploads actually reach the server.
        return requests.post(self._url(route), data=data, files=files, headers=headers, timeout=DEFAULT_SOCKET_TIMEOUT)

    def _put(self, route, data=None, headers=None, admin=False, json: bool = False):
        """PUT ``data`` (plus API key) to ``route``."""
        if data is None:
            data = {}
        data = data.copy()
        data['key'] = self._gi.key
        return requests.put(self._url(route), data=data, headers=headers, timeout=DEFAULT_SOCKET_TIMEOUT)

    def _delete(self, route, data=None, headers=None, admin=False, json: bool = False):
        """DELETE ``route``, sending ``data`` (plus API key)."""
        if data is None:
            data = {}
        data = data.copy()
        data['key'] = self._gi.key
        return requests.delete(self._url(route), data=data, headers=headers, timeout=DEFAULT_SOCKET_TIMEOUT)

    def _url(self, route):
        """Join ``route`` onto the client's URL, dropping a leading ``/api/``."""
        if route.startswith("/api/"):
            route = route[len("/api/"):]
        return f"{self._api_url()}/{route}"
class GiDatasetPopulator(BaseDatasetPopulator, GiHttpMixin):
    """Dataset populator whose HTTP verbs come from ``GiHttpMixin`` (bioblend)."""

    def __init__(self, gi):
        """Keep the bioblend ``GalaxyInstance`` used for every request."""
        self._gi = gi
class GiDatasetCollectionPopulator(BaseDatasetCollectionPopulator, GiHttpMixin):
    """Implementation of BaseDatasetCollectionPopulator backed by bioblend."""

    def __init__(self, gi):
        """Construct a dataset collection populator from a bioblend GalaxyInstance."""
        self._gi = gi
        self.dataset_populator = GiDatasetPopulator(gi)
        # This object is the collection populator. Constructing another
        # GiDatasetCollectionPopulator here recursed without end, so the
        # attribute simply points back at this instance.
        self.dataset_collection_populator = self

    def _create_collection(self, payload):
        """POST ``payload`` to ``dataset_collections`` and return the raw response."""
        create_response = self._post("dataset_collections", data=payload)
        return create_response


class GiWorkflowPopulator(BaseWorkflowPopulator, GiHttpMixin):
    """Implementation of BaseWorkflowPopulator backed by bioblend."""

    def __init__(self, gi):
        """Construct a workflow populator from a bioblend GalaxyInstance."""
        self._gi = gi
        self.dataset_populator = GiDatasetPopulator(gi)
        # run_workflow reads self.dataset_collection_populator, which was
        # never set on this class before.
        self.dataset_collection_populator = GiDatasetCollectionPopulator(gi)
def wait_on(function, desc, timeout=DEFAULT_TIMEOUT):
    """Delegate to ``tool_util_wait_on`` with the same arguments and return its result."""
    result = tool_util_wait_on(function, desc, timeout)
    return result