galaxy_test.base package

Submodules

galaxy_test.base.api module

galaxy_test.base.api.celery_config()[source]
class galaxy_test.base.api.UsesCeleryTasks[source]

Bases: object

classmethod handle_galaxy_config_kwds(config)[source]
celery_worker_parameters()[source]
celery_parameters()[source]
class galaxy_test.base.api.UsesApiTestCaseMixin[source]

Bases: object

url: str
tearDown()[source]
class galaxy_test.base.api.ApiTestInteractor(test_case, api_key=None)[source]

Bases: TestCaseGalaxyInteractor

Specialized variant of the API interactor (originally developed for tool functional tests) for testing the API generally.

__init__(test_case, api_key=None)[source]
get(*args, **kwds)[source]
head(*args, **kwds)[source]
post(*args, **kwds)[source]
delete(*args, **kwds)[source]
patch(*args, **kwds)[source]
put(*args, **kwds)[source]

galaxy_test.base.api_asserts module

Utility methods for making assertions about Galaxy API responses, etc…

galaxy_test.base.api_asserts.assert_status_code_is(response: Response, expected_status_code: int)[source]

Assert that the supplied response has the expected status code.

galaxy_test.base.api_asserts.assert_status_code_is_ok(response: Response)[source]

Assert that the supplied response is okay.

The easier alternative response.raise_for_status() might be preferable generally.
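As a rough illustration of what these two status assertions check, the sketch below re-implements the idea against a stand-in response object. FakeResponse and the `_sketch` function names are hypothetical and are not part of galaxy_test; treating any 2xx code as "okay" is an assumption of this sketch rather than a statement about Galaxy's implementation.

```python
from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Hypothetical stand-in exposing only the attributes the sketch reads."""

    status_code: int
    text: str = ""


def assert_status_code_is_sketch(response, expected_status_code):
    """Illustrative re-implementation, not Galaxy's actual code."""
    if response.status_code != expected_status_code:
        raise AssertionError(
            f"Expected status code {expected_status_code}, "
            f"got {response.status_code}. Body: {response.text}"
        )


def assert_status_code_is_ok_sketch(response):
    """Accept any 2xx status code (an assumption made for this sketch)."""
    if response.status_code // 100 != 2:
        raise AssertionError(
            f"Expected a 2xx status code, got {response.status_code}. "
            f"Body: {response.text}"
        )


# Both calls pass silently; a mismatch would raise AssertionError.
assert_status_code_is_sketch(FakeResponse(404, "not found"), 404)
assert_status_code_is_ok_sketch(FakeResponse(200))
```

Including the response body in the failure message is a design choice of the sketch: it tends to make a failing API test easier to diagnose than a bare status code.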

galaxy_test.base.api_asserts.assert_has_keys(response: dict, *keys: str)[source]

Assert that the supplied response (dict) has the supplied keys.

galaxy_test.base.api_asserts.assert_not_has_keys(response: dict, *keys: str)[source]

Assert that the supplied response (dict) does not have the supplied keys.
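The key assertions above can be pictured with a small self-contained sketch. The `_sketch` function names and the example payload are hypothetical; the real helpers may format their failure messages differently.

```python
def assert_has_keys_sketch(response: dict, *keys: str) -> None:
    """Fail if any of the given keys is missing from the response dict."""
    for key in keys:
        if key not in response:
            raise AssertionError(f"Response {response!r} is missing key {key!r}")


def assert_not_has_keys_sketch(response: dict, *keys: str) -> None:
    """Fail if any of the given keys is present in the response dict."""
    for key in keys:
        if key in response:
            raise AssertionError(f"Response {response!r} unexpectedly has key {key!r}")


# Hypothetical payload standing in for a decoded JSON API response.
history = {"id": "abc123", "name": "API Test History"}
assert_has_keys_sketch(history, "id", "name")
assert_not_has_keys_sketch(history, "password")
```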

galaxy_test.base.api_asserts.assert_error_code_is(response: Union[Response, dict], error_code: Union[int, ErrorCode])[source]

Assert that the supplied response has the supplied Galaxy error code.

Galaxy error codes can be imported from galaxy.exceptions.error_codes to test against.
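The signature accepts either a response object or an already-decoded dict, and either an int or an error code object. A minimal sketch of that normalization follows. It assumes the error payload carries the code under an "err_code" key; ExampleErrorCode, its numeric values, FakeResponse, and the `_sketch` function name are all made up for illustration.

```python
from enum import IntEnum


class ExampleErrorCode(IntEnum):
    """Made-up codes; real ones come from galaxy.exceptions.error_codes."""

    MISSING_PARAMETER = 1001
    MALFORMED_ID = 1002


class FakeResponse:
    """Hypothetical stand-in exposing a json() method."""

    def __init__(self, payload: dict) -> None:
        self._payload = payload

    def json(self) -> dict:
        return self._payload


def assert_error_code_is_sketch(response, error_code) -> None:
    """Illustrative only: normalize both arguments, then compare."""
    as_dict = response if isinstance(response, dict) else response.json()
    if "err_code" not in as_dict:  # assumed key name
        raise AssertionError(f"No err_code in response {as_dict!r}")
    if as_dict["err_code"] != int(error_code):
        raise AssertionError(
            f"Expected error code {int(error_code)}, got {as_dict['err_code']}"
        )


payload = {"err_code": 1001, "err_msg": "missing parameter"}
assert_error_code_is_sketch(payload, ExampleErrorCode.MISSING_PARAMETER)
assert_error_code_is_sketch(FakeResponse(payload), 1001)
```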

galaxy_test.base.api_asserts.assert_object_id_error(response: Response)[source]
galaxy_test.base.api_asserts.assert_error_message_contains(response: Union[Response, dict], expected_contains: str)[source]
galaxy_test.base.api_asserts.assert_has_key(response: dict, *keys: str)

Assert that the supplied response (dict) has the supplied keys.

galaxy_test.base.api_util module

galaxy_test.base.api_util.get_admin_api_key() str[source]

Test admin API key to use for functional tests.

This key should be configured as an admin API key and should be able to create additional users and keys.

galaxy_test.base.api_util.get_user_api_key() Optional[str][source]

Test user API key to use for functional tests.

If set, this should drive API-based testing. If not set, an admin API key will be used to create a new user and API key for tests.
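The fallback described here (use the configured user key if there is one, otherwise create a user and key with admin rights) might look roughly like the sketch below. The environment variable name, the `_sketch` function, and resolve_test_api_key are hypothetical; this page does not state how galaxy_test actually reads its configuration.

```python
import os
from typing import Callable, Mapping, Optional

# Hypothetical variable name, chosen only for this sketch.
USER_KEY_VAR = "EXAMPLE_TEST_USER_API_KEY"


def get_user_api_key_sketch(environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the configured user key, or None when it is not set."""
    return environ.get(USER_KEY_VAR)


def resolve_test_api_key(
    environ: Mapping[str, str],
    create_user_key: Callable[[], str],
) -> str:
    """Prefer the configured user key; otherwise create one.

    create_user_key stands in for "use the admin key to create a new
    user and API key" and is only called when no user key is set.
    """
    user_key = get_user_api_key_sketch(environ)
    if user_key is not None:
        return user_key
    return create_user_key()


# With a key configured, the factory is never called.
print(resolve_test_api_key({USER_KEY_VAR: "configured"}, lambda: "created"))
# Without one, the factory supplies the key.
print(resolve_test_api_key({}, lambda: "created"))
```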

galaxy_test.base.constants module

Just constants useful for testing across test types.

galaxy_test.base.env module

Base utilities for working with Galaxy test environments.

galaxy_test.base.env.setup_keep_outdir() str[source]
galaxy_test.base.env.target_url_parts() Tuple[str, Optional[str], str][source]
galaxy_test.base.env.get_ip_address(ifname: str) str[source]

galaxy_test.base.instrument module

Utilities to help instrument tool tests.

Including a structured data nose plugin that allows storing arbitrary structured data on a per test case basis. It is used by tool tests to store inputs, output problems, job tests, etc., but could easily be used by other test types in a different way.

galaxy_test.base.instrument.register_job_data(data)[source]
galaxy_test.base.instrument.fetch_job_data()[source]
class galaxy_test.base.instrument.StructuredTestDataPlugin[source]

Bases: Plugin

name = 'structureddata'
options(parser, env)[source]

Register commandline options.

Implement this method for normal options behavior with protection from OptionConflictErrors. If you override this method and want the default --with-$name option to be registered, be sure to call super().

configure(options, conf)[source]

Configure the plugin and system, based on selected options.

The base plugin class sets the plugin to enabled if the enable option for the plugin (self.enableOpt) is true.

finalize(result)[source]
addError(test, *args, **kwds)
addFailure(test, *args, **kwds)
addSuccess(test, *args, **kwds)
report(stream)[source]

galaxy_test.base.interactor module

class galaxy_test.base.interactor.TestCaseGalaxyInteractor(functional_test_case, test_user=None, api_key=None)[source]

Bases: GalaxyInteractorApi

__init__(functional_test_case, test_user=None, api_key=None)[source]

galaxy_test.base.nose_util module

Utilities for dealing with nose.

There was some duplication between Galaxy, Tool Shed, and Install/Test; this module tries to reduce it.

galaxy_test.base.nose_util.run(test_config, plugins=None)[source]

galaxy_test.base.populators module

Abstractions used by the Galaxy testing frameworks for interacting with the Galaxy API.

These abstractions are geared toward testing use cases and populating fixtures. For a more general framework for working with the Galaxy API, check out bioblend.

The populators are broken into different categories of data one might want to populate and work with (datasets, histories, libraries, and workflows). Within each populator type, abstract classes describe high-level functionality that depends on abstract HTTP verb executions (e.g. methods for executing GET, POST, DELETE). The abstract classes are galaxy_test.base.populators.BaseDatasetPopulator, galaxy_test.base.populators.BaseWorkflowPopulator, and galaxy_test.base.populators.BaseDatasetCollectionPopulator.

There are a few different concrete ways to supply these low-level verb executions. For instance, galaxy_test.base.populators.DatasetPopulator implements the abstract galaxy_test.base.populators.BaseDatasetPopulator by leveraging a Galaxy interactor, galaxy.tool_util.interactor.GalaxyInteractorApi. It is non-intuitive that the Galaxy testing framework uses the tool testing code inside Galaxy’s code base for a lot of heavy lifting. This is due to the API testing framework organically growing from the tool testing framework that predated it and then the tool testing framework being extracted for re-use in Planemo, etc.

The other two concrete implementations of the populators are much more direct and intuitive. galaxy_test.base.populators.GiDatasetPopulator, et al. are populators built on Bioblend gi objects to build URLs and describe API keys. galaxy_test.selenium.framework.SeleniumSessionDatasetPopulator, et al. are populators built on Selenium sessions to leverage Galaxy cookies for auth, for instance.

All three of these implementations are now effectively light wrappers around requests. Not leveraging requests directly is a bit ugly and this ugliness again stems from these organically growing from a framework that originally didn’t use requests at all.

API tests and Selenium tests routinely use requests directly, and that is totally fine. Requests should just be filtered through the verb abstractions if that functionality is then added to populators to be shared across tests or across testing frameworks.
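The layering described in this module overview, with high-level helpers written only against abstract HTTP verbs and concrete subclasses supplying those verbs, can be sketched as follows. Every class and method name here is hypothetical and heavily simplified; the in-memory backend merely stands in for an interactor, a bioblend gi object, or a Selenium session.

```python
from abc import ABC, abstractmethod
from itertools import count
from typing import Any, Dict, List, Optional


class BaseHistoryPopulatorSketch(ABC):
    """High-level helpers that rely only on abstract verb executions."""

    @abstractmethod
    def _get(self, route: str) -> Any: ...

    @abstractmethod
    def _post(self, route: str, data: Optional[dict] = None) -> Any: ...

    @abstractmethod
    def _delete(self, route: str) -> Any: ...

    def new_history(self, name: str = "API Test History") -> str:
        return self._post("histories", {"name": name})["id"]

    def list_history_names(self) -> List[str]:
        return [history["name"] for history in self._get("histories")]

    def delete_history(self, history_id: str) -> None:
        self._delete(f"histories/{history_id}")


class InMemoryHistoryPopulator(BaseHistoryPopulatorSketch):
    """Concrete verbs backed by a dict instead of a running server."""

    def __init__(self) -> None:
        self._histories: Dict[str, dict] = {}
        self._ids = count(1)

    def _get(self, route: str) -> Any:
        if route != "histories":
            raise ValueError(f"Unsupported route: {route}")
        return list(self._histories.values())

    def _post(self, route: str, data: Optional[dict] = None) -> Any:
        history = {"id": f"h{next(self._ids)}", **(data or {})}
        self._histories[history["id"]] = history
        return history

    def _delete(self, route: str) -> Any:
        # Route looks like "histories/<id>"; drop the matching entry.
        return self._histories.pop(route.split("/", 1)[1])


populator = InMemoryHistoryPopulator()
history_id = populator.new_history("demo")
print(populator.list_history_names())
populator.delete_history(history_id)
```

Because the helpers never touch the transport directly, the same high-level method can be reused by any backend that provides the three verbs.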

galaxy_test.base.populators.flakey(method)[source]
galaxy_test.base.populators.skip_without_tool(tool_id)[source]

Decorate an API test method as requiring a specific tool.

Have test framework skip the test case if the tool is unavailable.
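A decorator of this kind can be sketched with the standard unittest skip mechanism. The available_tool_ids method is a hypothetical stand-in for however the test case discovers the tools on the target server, and the `_sketch` decorator is not the galaxy_test implementation.

```python
import unittest
from functools import wraps


def skip_without_tool_sketch(tool_id):
    """Skip the decorated test method when tool_id is unavailable."""

    def decorator(method):
        @wraps(method)
        def wrapper(self, *args, **kwds):
            # available_tool_ids() is a hypothetical lookup on the test case.
            if tool_id not in self.available_tool_ids():
                raise unittest.SkipTest(f"Tool '{tool_id}' is not available")
            return method(self, *args, **kwds)

        return wrapper

    return decorator


class ExampleToolTest(unittest.TestCase):
    def available_tool_ids(self):
        return {"cat1"}

    @skip_without_tool_sketch("cat1")
    def test_with_installed_tool(self):
        self.assertIn("cat1", self.available_tool_ids())

    @skip_without_tool_sketch("missing_tool")
    def test_with_missing_tool(self):
        self.fail("never reached, the decorator skips first")


# Run both tests programmatically and collect the outcome.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(ExampleToolTest)
result = unittest.TestResult()
suite.run(result)
print(result.testsRun, len(result.skipped))
```

The check happens at call time rather than at import time, so the lookup only runs once a test instance exists.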

galaxy_test.base.populators.skip_without_asgi(method)[source]
galaxy_test.base.populators.skip_without_datatype(extension)[source]

Decorate an API test method as requiring a specific datatype.

Have test framework skip the test case if the datatype is unavailable.

galaxy_test.base.populators.is_site_up(url)[source]
galaxy_test.base.populators.skip_if_site_down(url)[source]
galaxy_test.base.populators.skip_if_toolshed_down(method)
galaxy_test.base.populators.skip_if_github_down(method)
galaxy_test.base.populators.summarize_instance_history_on_error(method)[source]
galaxy_test.base.populators.uses_test_history(**test_history_kwd)[source]

require_new and cancel_executions can be overridden using keyword arguments to the decorator.

galaxy_test.base.populators.conformance_tests_gen(directory, filename='conformance_tests.yaml')[source]
class galaxy_test.base.populators.CwlRun(dataset_populator, history_id)[source]

Bases: object

__init__(dataset_populator, history_id)[source]
class galaxy_test.base.populators.CwlToolRun(dataset_populator, history_id, run_response)[source]

Bases: CwlRun

__init__(dataset_populator, history_id, run_response)[source]
property job_id
wait()[source]
class galaxy_test.base.populators.CwlWorkflowRun(dataset_populator, workflow_populator, history_id, workflow_id, invocation_id)[source]

Bases: CwlRun

__init__(dataset_populator, workflow_populator, history_id, workflow_id, invocation_id)[source]
wait()[source]
class galaxy_test.base.populators.BasePopulator[source]

Bases: object

galaxy_interactor: ApiTestInteractor
class galaxy_test.base.populators.BaseDatasetPopulator[source]

Bases: BasePopulator

Abstract description of API operations optimized for testing Galaxy - implementations must implement _get, _post and _delete.

new_dataset(history_id: str, content=None, wait: bool = False, fetch_data=True, to_posix_lines=True, auto_decompress=True, **kwds) dict[source]

Create a new history dataset instance (HDA) and return its ID.

Returns

the HDA id of the new object

new_dataset_request(history_id: str, content=None, wait: bool = False, fetch_data=True, **kwds) Response[source]

Lower-level dataset creation that returns the upload tool response object.

fetch(payload: dict, assert_ok: bool = True, timeout: Union[int, float] = 60, wait: Optional[bool] = None)[source]
fetch_hdas(history_id: str, items: List[Dict[str, Any]], wait: bool = True) List[Dict[str, Any]][source]
fetch_hda(history_id, item: Dict[str, Any], wait: bool = True) Dict[str, Any][source]
create_deferred_hda(history_id, uri: str, ext: Optional[str] = None) Dict[str, Any][source]
tag_dataset(history_id, hda_id, tags)[source]
create_from_store_raw(payload: Dict[str, Any]) Response[source]
create_from_store_raw_async(payload: Dict[str, Any]) Response[source]
create_from_store(store_dict: Optional[Dict[str, Any]] = None, store_path: Optional[str] = None, model_store_format: Optional[str] = None) Dict[str, Any][source]
create_from_store_async(store_dict: Optional[Dict[str, Any]] = None, store_path: Optional[str] = None, model_store_format: Optional[str] = None) Dict[str, Any][source]
create_contents_from_store_raw(history_id: str, payload: Dict[str, Any]) Response[source]
create_contents_from_store(history_id: str, store_dict: Optional[Dict[str, Any]] = None, store_path: Optional[str] = None) List[Dict[str, Any]][source]
download_contents_to_store(history_id: str, history_content: Dict[str, Any], extension='.tgz') str[source]
reupload_contents(history_content: Dict[str, Any])[source]
wait_for_tool_run(history_id: str, run_response: Response, timeout: Union[int, float] = 60, assert_ok: bool = True)[source]
check_run(run_response: Response) dict[source]
wait_for_history(history_id: str, assert_ok: bool = False, timeout: Union[int, float] = 60) str[source]
wait_for_history_jobs(history_id: str, assert_ok: bool = False, timeout: Union[int, float] = 60)[source]
wait_for_jobs(jobs: Union[List[dict], List[str]], assert_ok: bool = False, timeout: Union[int, float] = 60, ok_states=None)[source]
wait_for_job(job_id: str, assert_ok: bool = False, timeout: Union[int, float] = 60, ok_states=None)[source]
get_job_details(job_id: str, full: bool = False) Response[source]
cancel_history_jobs(history_id: str, wait=True) None[source]
history_jobs(history_id: str) List[Dict[str, Any]][source]
active_history_jobs(history_id: str) list[source]
cancel_job(job_id: str) Response[source]
delete_history(history_id: str) None[source]
delete_dataset(history_id: str, content_id: str, purge: bool = False, stop_job: bool = False, wait_for_purge: bool = False, use_query_params: bool = False) Response[source]
wait_for_purge(history_id, content_id)[source]
create_tool_from_path(tool_path: str) Dict[str, Any][source]
create_tool(representation, tool_directory: Optional[str] = None) Dict[str, Any][source]
list_dynamic_tools() list[source]
show_dynamic_tool(uuid) dict[source]
deactivate_dynamic_tool(uuid) dict[source]
test_history(cancel_executions: bool = True, require_new: bool = True, **kwds)[source]
new_history(name='API Test History', **kwds) str[source]
copy_history(history_id, name='API Test Copied History', **kwds) Response[source]
fetch_payload(history_id: str, content: str, auto_decompress: bool = False, file_type: str = 'txt', dbkey: str = '?', name: str = 'Test_Dataset', **kwds) dict[source]
upload_payload(history_id: str, content: Optional[str] = None, **kwds) dict[source]
get_remote_files(target: str = 'ftp') dict[source]
run_tool_payload(tool_id: Optional[str], inputs: dict, history_id: str, **kwds) dict[source]
build_tool_state(tool_id: str, history_id: str)[source]
run_tool_raw(tool_id: Optional[str], inputs: dict, history_id: str, **kwds) Response[source]
run_tool(tool_id: str, inputs: dict, history_id: str, **kwds)[source]
tools_post(payload: dict, url='tools') Response[source]
materialize_dataset_instance(history_id: str, id: str, source: str = 'hda')[source]
get_history_dataset_content(history_id: str, wait=True, filename=None, type='text', raw=False, **kwds)[source]
get_history_dataset_source_transform_actions(history_id: str, **kwd) Set[str][source]
get_history_dataset_details(history_id: str, **kwds) dict[source]
get_history_dataset_details_raw(history_id: str, dataset_id: str) Response[source]
get_history_dataset_extra_files(history_id: str, **kwds) list[source]
get_history_collection_details(history_id: str, **kwds) dict[source]
run_collection_creates_list(history_id: str, hdca_id: str) Response[source]
run_exit_code_from_file(history_id: str, hdca_id: str) dict[source]
get_history_contents(history_id: str) List[Dict[str, Any]][source]
ds_entry(history_content: dict) dict[source]
dataset_storage_info(dataset_id: str) Dict[str, Any][source]
dataset_storage_info_raw(dataset_id: str) Response[source]
get_roles() list[source]
get_configuration(admin=False) Dict[str, Any][source]
user_email() str[source]
user_id() str[source]
user_private_role_id() str[source]
create_role(user_ids: list, description: Optional[str] = None) dict[source]
create_quota(quota_payload: dict) dict[source]
get_quotas() list[source]
make_private(history_id: str, dataset_id: str) dict[source]
make_public(history_id: str) dict[source]
validate_dataset(history_id: str, dataset_id: str) Dict[str, Any][source]
validate_dataset_and_wait(history_id, dataset_id) Optional[str][source]
setup_history_for_export_testing(history_name)[source]
prepare_export(history_id, data)[source]
export_url(history_id: str, data, check_download: bool = True) str[source]
get_export_url(export_url) Response[source]
import_history(import_data)[source]
wait_for_history_with_name(history_name: str, desc: str) Dict[str, Any][source]
import_history_and_wait_for_name(import_data, history_name)[source]
history_names() Dict[str, Dict][source]
rename_history(history_id, new_name)[source]
get_histories()[source]
wait_on_history_length(history_id: str, wait_on_history_length: int)[source]
wait_on_download(download_request_response: Response) Response[source]
assert_download_request_ok(download_request_response: Response) str[source]

Assert response is valid and okay and extract storage request ID.

wait_for_download_ready(storage_request_id: str)[source]
wait_on_task(async_task_response: Response)[source]
wait_on_task_id(task_id: str)[source]
wait_on_download_request(storage_request_id: str) Response[source]
history_length(history_id)[source]
reimport_history(history_id, history_name, wait_on_history_length, export_kwds, task_based=False)[source]
get_random_name(prefix=None, suffix=None, len=10)[source]
wait_for_dataset(history_id: str, dataset_id: str, assert_ok: bool = False, timeout: Union[int, float] = 60) str[source]
new_page(slug: str = 'mypage', title: str = 'MY PAGE', content_format: str = 'html', content: Optional[str] = None) Dict[str, Any][source]
new_page_raw(slug: str = 'mypage', title: str = 'MY PAGE', content_format: str = 'html', content: Optional[str] = None) Response[source]
new_page_payload(slug: str = 'mypage', title: str = 'MY PAGE', content_format: str = 'html', content: Optional[str] = None) Dict[str, str][source]
export_history_to_uri_async(history_id: str, target_uri: str, model_store_format: str = 'tgz', include_files: bool = True)[source]
import_history_from_uri_async(target_uri: str, model_store_format: str)[source]
galaxy_interactor: ApiTestInteractor
class galaxy_test.base.populators.GalaxyInteractorHttpMixin[source]

Bases: object

galaxy_interactor: ApiTestInteractor
class galaxy_test.base.populators.DatasetPopulator(galaxy_interactor)[source]

Bases: GalaxyInteractorHttpMixin, BaseDatasetPopulator

__init__(galaxy_interactor)[source]
galaxy_interactor: ApiTestInteractor
class galaxy_test.base.populators.BaseWorkflowPopulator[source]

Bases: BasePopulator

dataset_populator: BaseDatasetPopulator
dataset_collection_populator: BaseDatasetCollectionPopulator
load_workflow(name: str, content: str = '{\n    "a_galaxy_workflow": "true", \n    "annotation": "simple workflow",\n    "format-version": "0.1", \n    "name": "TestWorkflow1", \n    "steps": {\n        "0": {\n            "annotation": "input1 description", \n            "id": 0, \n            "input_connections": {}, \n            "inputs": [\n                {\n                    "description": "input1 description", \n                    "name": "WorkflowInput1"\n                }\n            ], \n            "name": "Input dataset", \n            "outputs": [], \n            "position": {\n                "left": 199.55555772781372, \n                "top": 200.66666460037231\n            }, \n            "tool_errors": null, \n            "tool_id": null, \n            "tool_state": "{\\"name\\": \\"WorkflowInput1\\"}", \n            "tool_version": null, \n            "type": "data_input", \n            "user_outputs": []\n        }, \n        "1": {\n            "annotation": "", \n            "id": 1, \n            "input_connections": {}, \n            "inputs": [\n                {\n                    "description": "", \n                    "name": "WorkflowInput2"\n                }\n            ], \n            "name": "Input dataset", \n            "outputs": [], \n            "position": {\n                "left": 206.22221422195435, \n                "top": 327.33335161209106\n            }, \n            "tool_errors": null, \n            "tool_id": null, \n            "tool_state": "{\\"name\\": \\"WorkflowInput2\\"}", \n            "tool_version": null, \n            "type": "data_input", \n            "user_outputs": []\n        }, \n        "2": {\n            "annotation": "", \n            "id": 2, \n            "input_connections": {\n                "input1": {\n                    "id": 0, \n                    "output_name": "output"\n                }, \n                "queries_0|input2": {\n                    "id": 1, \n            
        "output_name": "output"\n                }\n            }, \n            "inputs": [], \n            "name": "Concatenate datasets", \n            "outputs": [\n                {\n                    "name": "out_file1", \n                    "type": "input"\n                }\n            ], \n            "position": {\n                "left": 419.33335876464844, \n                "top": 200.44446563720703\n            }, \n            "post_job_actions": {}, \n            "tool_errors": null, \n            "tool_id": "cat1", \n            "tool_state": "{\\"__page__\\": 0, \\"__rerun_remap_job_id__\\": null, \\"input1\\": \\"null\\", \\"queries\\": \\"[{\\\\\\"input2\\\\\\": null, \\\\\\"__index__\\\\\\": 0}]\\"}", \n            "tool_version": "1.0.0", \n            "type": "tool", \n            "user_outputs": []\n        }\n    }\n}\n', add_pja=False) dict[source]
load_random_x2_workflow(name: str) dict[source]
load_workflow_from_resource(name: str, filename: Optional[str] = None) dict[source]
simple_workflow(name: str, **create_kwds) str[source]
import_workflow_from_path_raw(from_path: str, object_id: Optional[str] = None) Response[source]
import_workflow_from_path(from_path: str, object_id: Optional[str] = None) str[source]
create_workflow(workflow: Dict[str, Any], **create_kwds) str[source]
create_workflow_response(workflow: Dict[str, Any], **create_kwds) Response[source]
upload_yaml_workflow(has_yaml, **kwds) str[source]
wait_for_invocation(workflow_id: str, invocation_id: str, timeout: Union[int, float] = 60, assert_ok: bool = True) str[source]
workflow_invocations(workflow_id: str) List[Dict[str, Any]][source]
history_invocations(history_id: str) List[Dict[str, Any]][source]
wait_for_history_workflows(history_id: str, assert_ok: bool = True, timeout: Union[int, float] = 60, expected_invocation_count: Optional[int] = None) None[source]
wait_for_workflow(workflow_id: str, invocation_id: str, history_id: str, assert_ok: bool = True, timeout: Union[int, float] = 60) None[source]

Wait for a workflow invocation to completely schedule and then history to be complete.

get_invocation(invocation_id, step_details=False)[source]
download_invocation_to_store(invocation_id, extension='tgz')[source]
download_invocation_to_uri(invocation_id, target_uri, extension='tgz')[source]
create_invocation_from_store_raw(history_id: str, store_dict: Optional[Dict[str, Any]] = None, store_path: Optional[str] = None, model_store_format: Optional[str] = None) Response[source]
create_invocation_from_store(history_id: str, store_dict: Optional[Dict[str, Any]] = None, store_path: Optional[str] = None, model_store_format: Optional[str] = None) Response[source]
get_biocompute_object(invocation_id)[source]
validate_biocompute_object(bco, expected_schema_version='https://w3id.org/ieee/ieee-2791-schema/2791object.json')[source]
invoke_workflow_raw(workflow_id: str, request: dict, assert_ok: bool = False) Response[source]
invoke_workflow(workflow_id: str, history_id: Optional[str] = None, inputs: Optional[dict] = None, request: Optional[dict] = None, inputs_by: str = 'step_index') Response[source]
invoke_workflow_and_assert_ok(workflow_id: str, history_id: Optional[str] = None, inputs: Optional[dict] = None, request: Optional[dict] = None, inputs_by: str = 'step_index') str[source]
invoke_workflow_and_wait(workflow_id: str, history_id: Optional[str] = None, inputs: Optional[dict] = None, request: Optional[dict] = None) Response[source]
workflow_report_json(workflow_id: str, invocation_id: str) dict[source]
download_workflow(workflow_id: str, style: Optional[str] = None, history_id: Optional[str] = None) dict[source]
set_tags(workflow_id: str, tags: List[str]) None[source]
update_workflow(workflow_id: str, workflow_object: dict) Response[source]
refactor_workflow(workflow_id: str, actions: list, dry_run: Optional[bool] = None, style: Optional[str] = None) Response[source]
export_for_update(workflow_id)[source]
run_workflow(has_workflow, test_data=None, history_id=None, wait=True, source_type=None, jobs_descriptions=None, expected_response=200, assert_ok=True, client_convert=None, round_trip_format_conversion=False, invocations=1, raw_yaml=False)[source]

High-level wrapper around workflow API, etc. to invoke format 2 workflows.

dump_workflow(workflow_id, style=None)[source]
workflow_inputs(workflow_id: str) Dict[str, Dict[str, Any]][source]
build_ds_map(workflow_id: str, label_map: Dict[str, Any]) str[source]
setup_workflow_run(workflow: Optional[Dict[str, Any]] = None, inputs_by: str = 'step_id', history_id: Optional[str] = None, workflow_id: Optional[str] = None) Tuple[Dict[str, Any], str, str][source]
wait_for_invocation_and_jobs(history_id: str, workflow_id: str, invocation_id: str, assert_ok: bool = True) None[source]
index(show_shared: Optional[bool] = None, show_published: Optional[bool] = None, sort_by: Optional[str] = None, sort_desc: Optional[bool] = None, limit: Optional[int] = None, offset: Optional[int] = None, search: Optional[str] = None, skip_step_counts: Optional[bool] = None)[source]
index_ids(show_shared: Optional[bool] = None, show_published: Optional[bool] = None, sort_by: Optional[str] = None, sort_desc: Optional[bool] = None, limit: Optional[int] = None, offset: Optional[int] = None, search: Optional[str] = None)[source]
share_with_user(workflow_id: str, user_id_or_email: str)[source]
class galaxy_test.base.populators.RunJobsSummary(history_id, workflow_id, invocation_id, inputs, jobs, invocation, workflow_request)[source]

Bases: tuple

property history_id

Alias for field number 0

property workflow_id

Alias for field number 1

property invocation_id

Alias for field number 2

property inputs

Alias for field number 3

property jobs

Alias for field number 4

property invocation

Alias for field number 5

property workflow_request

Alias for field number 6
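The "Alias for field number N" entries are what a named tuple's generated documentation looks like: each property is a named view onto a tuple position. The sketch below builds an equivalent type with collections.namedtuple to show both access styles. The field values are placeholders, and the assumption that the real class is defined this way is inferred only from the field list and the tuple base shown above.

```python
from collections import namedtuple

# Same field order as documented above (positions 0 through 6).
RunJobsSummarySketch = namedtuple(
    "RunJobsSummarySketch",
    [
        "history_id",
        "workflow_id",
        "invocation_id",
        "inputs",
        "jobs",
        "invocation",
        "workflow_request",
    ],
)

summary = RunJobsSummarySketch(
    history_id="hist1",
    workflow_id="wf1",
    invocation_id="inv1",
    inputs={},
    jobs=[],
    invocation={"state": "scheduled"},
    workflow_request={},
)

# Attribute access and positional access refer to the same values.
print(summary.history_id == summary[0])

# Being a tuple, it also unpacks positionally.
history_id, workflow_id, invocation_id, *rest = summary
```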

class galaxy_test.base.populators.WorkflowPopulator(galaxy_interactor)[source]

Bases: GalaxyInteractorHttpMixin, BaseWorkflowPopulator, ImporterGalaxyInterface

__init__(galaxy_interactor)[source]
galaxy_interactor: ApiTestInteractor
import_workflow(workflow, **kwds) Dict[str, Any][source]

Import a workflow via POST /api/workflows or comparable interface into Galaxy.

import_tool(tool) Dict[str, Any][source]

Import a workflow via POST /api/workflows or comparable interface into Galaxy.

scaling_workflow_yaml(**kwd)[source]
class galaxy_test.base.populators.CwlPopulator(dataset_populator: DatasetPopulator, workflow_populator: WorkflowPopulator)[source]

Bases: object

__init__(dataset_populator: DatasetPopulator, workflow_populator: WorkflowPopulator)[source]
get_conformance_test(version: str, doc: str)[source]
run_cwl_job(artifact: str, job_path: Optional[str] = None, job: Optional[Dict] = None, test_data_directory: Optional[str] = None, history_id: Optional[str] = None, assert_ok=True)[source]
Parameters

artifact – CWL tool id, or (absolute or relative) path to a CWL tool or workflow file

run_conformance_test(version: str, doc: str)[source]
class galaxy_test.base.populators.LibraryPopulator(galaxy_interactor)[source]

Bases: object

__init__(galaxy_interactor)[source]
get_libraries()[source]
new_private_library(name)[source]
create_from_store_raw(payload: Dict[str, Any]) Response[source]
create_from_store(store_dict: Optional[Dict[str, Any]] = None, store_path: Optional[str] = None) List[Dict[str, Any]][source]
new_library(name)[source]
fetch_single_url_to_folder(file_type='auto', assert_ok=True)[source]
get_permissions(library_id, scope: Optional[str] = 'current', is_library_access: Optional[bool] = False, page: Optional[int] = 1, page_limit: Optional[int] = 1000, q: Optional[str] = None, admin: Optional[bool] = True)[source]
set_permissions(library_id, role_id=None)[source]

Legacy way of setting permissions.

set_permissions_with_action(library_id, role_id=None, action=None)[source]
set_access_permission(library_id, role_id, action=None)[source]
set_add_permission(library_id, role_id, action=None)[source]
set_manage_permission(library_id, role_id, action=None)[source]
set_modify_permission(library_id, role_id, action=None)[source]
user_email()[source]
user_private_role_id()[source]
create_dataset_request(library, **kwds)[source]
new_library_dataset(name, **create_dataset_kwds)[source]
wait_on_library_dataset(library_id, dataset_id)[source]
raw_library_contents_create(library_id, payload, files=None)[source]
show_ld(library_id, library_dataset_id)[source]
show_ldda(ldda_id)[source]
new_library_dataset_in_private_library(library_name='private_dataset', wait=True)[source]
get_library_contents(library_id: str) List[Dict[str, Any]][source]
get_library_contents_with_path(library_id: str, path: str) Dict[str, Any][source]
setup_fetch_to_folder(test_name)[source]
class galaxy_test.base.populators.BaseDatasetCollectionPopulator[source]

Bases: object

dataset_populator: BaseDatasetPopulator
create_list_from_pairs(history_id, pairs, name='Dataset Collection from pairs')[source]
nested_collection_identifiers(history_id, collection_type)[source]
create_nested_collection(history_id, collection_type, name=None, collection=None, element_identifiers=None)[source]

Create a nested collection either from collection or using collection_type.

create_list_of_pairs_in_history(history_id, **kwds)[source]
create_list_of_list_in_history(history_id, **kwds)[source]
create_pair_in_history(history_id, wait=False, **kwds)[source]
create_list_in_history(history_id, wait=False, **kwds)[source]
upload_collection(history_id, collection_type, elements, wait=False, **kwds)[source]
create_list_payload(history_id, **kwds)[source]
create_pair_payload(history_id, **kwds)[source]
wait_for_fetched_collection(fetch_response: Union[Dict[str, Any], Response])[source]
pair_identifiers(history_id, contents=None)[source]
list_identifiers(history_id, contents=None)[source]
wait_for_dataset_collection(create_payload: dict, assert_ok: bool = False, timeout: Union[int, float] = 60) None[source]
class galaxy_test.base.populators.DatasetCollectionPopulator(galaxy_interactor: ApiTestInteractor)[source]

Bases: BaseDatasetCollectionPopulator

__init__(galaxy_interactor: ApiTestInteractor)[source]
dataset_populator: BaseDatasetPopulator
galaxy_test.base.populators.load_data_dict(history_id: str, test_data: Dict[str, Any], dataset_populator: BaseDatasetPopulator, dataset_collection_populator: BaseDatasetCollectionPopulator) Tuple[Dict[str, Any], Dict[str, Any], bool][source]

Load a dictionary as inputs to a workflow (test data focused).

galaxy_test.base.populators.stage_inputs(galaxy_interactor, history_id, job, use_path_paste=True, use_fetch_api=True, to_posix_lines=True, tool_or_workflow='workflow', job_dir=None)[source]

Alternative to load_data_dict that uses production-style workflow inputs.

galaxy_test.base.populators.stage_rules_example(galaxy_interactor, history_id, example)[source]

Wrapper around stage_inputs for staging collections defined by rules spec DSL.

galaxy_test.base.populators.wait_on_state(state_func: Callable, desc: str = 'state', skip_states=None, ok_states=None, assert_ok: bool = False, timeout: Union[int, float] = 60) str[source]
class galaxy_test.base.populators.GiHttpMixin[source]

Bases: object

Mixin for adapting Galaxy testing populators helpers to bioblend.

class galaxy_test.base.populators.GiDatasetPopulator(gi)[source]

Bases: GiHttpMixin, BaseDatasetPopulator

Implementation of BaseDatasetPopulator backed by bioblend.

__init__(gi)[source]

Construct a dataset populator from a bioblend GalaxyInstance.

class galaxy_test.base.populators.GiDatasetCollectionPopulator(gi)[source]

Bases: GiHttpMixin, BaseDatasetCollectionPopulator

Implementation of BaseDatasetCollectionPopulator backed by bioblend.

__init__(gi)[source]

Construct a dataset collection populator from a bioblend GalaxyInstance.

class galaxy_test.base.populators.GiWorkflowPopulator(gi)[source]

Bases: GiHttpMixin, BaseWorkflowPopulator

Implementation of BaseWorkflowPopulator backed by bioblend.

__init__(gi)[source]

Construct a workflow populator from a bioblend GalaxyInstance.

galaxy_test.base.populators.wait_on(function: Callable, desc: str, timeout: Union[int, float] = 60)[source]
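A polling helper with this shape (a callable, a description for error messages, and a timeout) is commonly written as a loop with a deadline. The sketch below assumes the callable returns None while the condition is not yet met. That convention, the delta parameter, and the choice of TimeoutError are assumptions of the sketch, not documented behavior of wait_on.

```python
import time
from typing import Callable, Optional, TypeVar, Union

T = TypeVar("T")


def wait_on_sketch(
    function: Callable[[], Optional[T]],
    desc: str,
    timeout: Union[int, float] = 60,
    delta: float = 0.25,  # hypothetical polling interval in seconds
) -> T:
    """Call function until it returns a non-None value or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        value = function()
        if value is not None:
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Timed out waiting on {desc}.")
        time.sleep(delta)


# Simulate a state that becomes available on the third poll.
polls = iter([None, None, "ok"])
print(wait_on_sketch(lambda: next(polls), "example state", timeout=5, delta=0.01))
```

Using time.monotonic for the deadline keeps the timeout unaffected by changes to the system clock while the loop is running.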

galaxy_test.base.rules_test_data module

galaxy_test.base.rules_test_data.check_example_1(hdca, dataset_populator)[source]
galaxy_test.base.rules_test_data.check_example_2(hdca, dataset_populator)[source]
galaxy_test.base.rules_test_data.check_example_3(hdca, dataset_populator)[source]
galaxy_test.base.rules_test_data.check_example_4(hdca, dataset_populator)[source]
galaxy_test.base.rules_test_data.check_example_5(hdca, dataset_populator)[source]
galaxy_test.base.rules_test_data.check_example_6(hdca, dataset_populator)[source]

galaxy_test.base.testcase module

class galaxy_test.base.testcase.FunctionalTestCase(methodName='runTest')[source]

Bases: TestCase

Base class for tests targeting actual Galaxy servers.

Subclasses should override galaxy_driver_class if a Galaxy server needs to be launched to run the test; this base class assumes a server is already running.

galaxy_driver_class: Optional[type] = None
history_id: Optional[str]
host: str
port: Optional[str]
url: str
keepOutdir: str
test_data_resolver: TestDataResolver
setUp() None[source]

Hook method for setting up the test fixture before exercising it.

classmethod setUpClass()[source]

Configure and start Galaxy for a test.

classmethod tearDownClass()[source]

Shut down the Galaxy server and clean up the temp directory.

get_filename(filename: str) str[source]

galaxy_test.base.uses_shed module

class galaxy_test.base.uses_shed.UsesShed[source]

Bases: object

shed_tools_dir: ClassVar[str]
shed_tool_data_dir: ClassVar[str]
conda_tmp_prefix: ClassVar[str]
galaxy_interactor: GalaxyInteractorApi
classmethod configure_shed(config)[source]
classmethod configure_shed_and_conda(config)[source]
setup_shed_config()[source]
reset_shed_tools()[source]
delete_repo_request(payload)[source]
install_repo_request(payload)[source]
repository_operation(operation, owner, name, changeset, tool_shed_url='https://toolshed.g2.bx.psu.edu')[source]
install_repository(owner, name, changeset, tool_shed_url='https://toolshed.g2.bx.psu.edu')[source]
uninstall_repository(owner, name, changeset, tool_shed_url='https://toolshed.g2.bx.psu.edu')[source]

galaxy_test.base.workflow_fixtures module