Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_workflows

import json
import os
import shutil
import time
from json import dumps
from tempfile import mkdtemp
from typing import Any, cast, Dict, Optional, Tuple, Union
from uuid import uuid4

import pytest
from requests import delete, get, put

from galaxy.exceptions import error_codes
from galaxy_test.base import rules_test_data
from galaxy_test.base.populators import (
    DatasetCollectionPopulator,
    DatasetPopulator,
    RunJobsSummary,
    skip_without_tool,
    wait_on,
    WorkflowPopulator
)
from galaxy_test.base.workflow_fixtures import (
    WORKFLOW_NESTED_REPLACEMENT_PARAMETER,
    WORKFLOW_NESTED_RUNTIME_PARAMETER,
    WORKFLOW_NESTED_SIMPLE,
    WORKFLOW_ONE_STEP_DEFAULT,
    WORKFLOW_OPTIONAL_FALSE_INPUT_COLLECTION,
    WORKFLOW_OPTIONAL_FALSE_INPUT_DATA,
    WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION,
    WORKFLOW_OPTIONAL_TRUE_INPUT_DATA,
    WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT,
    WORKFLOW_PARAMETER_INPUT_INTEGER_OPTIONAL,
    WORKFLOW_PARAMETER_INPUT_INTEGER_REQUIRED,
    WORKFLOW_RENAME_ON_INPUT,
    WORKFLOW_RUNTIME_PARAMETER_AFTER_PAUSE,
    WORKFLOW_WITH_CUSTOM_REPORT_1,
    WORKFLOW_WITH_CUSTOM_REPORT_1_TEST_DATA,
    WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION,
    WORKFLOW_WITH_OUTPUT_COLLECTION,
    WORKFLOW_WITH_OUTPUT_COLLECTION_MAPPING,
    WORKFLOW_WITH_RULES_1,
)
from ._framework import ApiTestCase


WORKFLOW_SIMPLE = """
class: GalaxyWorkflow
name: Simple Workflow
inputs:
  input1: data
outputs:
  wf_output_1:
    outputSource: first_cat/out_file1
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: input1
"""

NESTED_WORKFLOW_AUTO_LABELS_MODERN_SYNTAX = """
class: GalaxyWorkflow
inputs:
  outer_input: data
outputs:
  outer_output:
    outputSource: second_cat/out_file1
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: outer_input
  nested_workflow:
    run:
      class: GalaxyWorkflow
      inputs:
        - id: inner_input
      outputs:
        - outputSource: 1/out_file1
      steps:
        random:
          tool_id: random_lines1
          state:
            num_lines: 1
            input:
              $link: inner_input
            seed_source:
              seed_source_selector: set_seed
              seed: asdf
    in:
      inner_input: first_cat/out_file1
  second_cat:
    tool_id: cat1
    in:
      input1: nested_workflow/1:out_file1
      queries_0|input2: nested_workflow/1:out_file1
"""


[docs]class BaseWorkflowsApiTestCase(ApiTestCase): # TODO: Find a new file for this class.
[docs] def setUp(self): super().setUp() self.workflow_populator = WorkflowPopulator(self.galaxy_interactor) self.dataset_populator = DatasetPopulator(self.galaxy_interactor) self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
def _assert_user_has_workflow_with_name(self, name): names = self._workflow_names() assert name in names, f"No workflows with name {name} in users workflows <{names}>" def _workflow_names(self): index_response = self._get("workflows") self._assert_status_code_is(index_response, 200) names = [w["name"] for w in index_response.json()] return names
[docs] def import_workflow(self, workflow, **kwds): upload_response = self.workflow_populator.import_workflow(workflow, **kwds) return upload_response
def _upload_yaml_workflow(self, has_yaml, **kwds) -> str: return self.workflow_populator.upload_yaml_workflow(has_yaml, **kwds) def _setup_workflow_run(self, workflow: Optional[Dict[str, Any]] = None, inputs_by: str = 'step_id', history_id: Optional[str] = None, workflow_id: Optional[str] = None) -> Tuple[Dict[str, Any], str, str]: return self.workflow_populator.setup_workflow_run( workflow, inputs_by, history_id, workflow_id ) def _ds_entry(self, history_content): return self.dataset_populator.ds_entry(history_content) def _invocation_details(self, workflow_id, invocation_id, **kwds): invocation_details_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}", data=kwds) self._assert_status_code_is(invocation_details_response, 200) invocation_details = invocation_details_response.json() return invocation_details def _run_jobs(self, has_workflow, history_id=None, **kwds) -> Union[Dict[str, Any], RunJobsSummary]: if history_id is None: history_id = self.history_id return self.workflow_populator.run_workflow(has_workflow, history_id=history_id, **kwds) def _run_workflow(self, has_workflow, history_id=None, **kwds) -> RunJobsSummary: if history_id is None: history_id = self.history_id assert "expected_response" not in kwds run_summary = self.workflow_populator.run_workflow(has_workflow, history_id=history_id, **kwds) return cast(RunJobsSummary, run_summary) def _history_jobs(self, history_id): return self._get("jobs", {"history_id": history_id, "order_by": "create_time"}).json() def _assert_history_job_count(self, history_id, n): jobs = self._history_jobs(history_id) self.assertEqual(len(jobs), n) def _download_workflow(self, workflow_id, style=None, history_id=None): return self.workflow_populator.download_workflow(workflow_id, style=style, history_id=history_id) def _assert_is_runtime_input(self, tool_state_value): if not isinstance(tool_state_value, dict): tool_state_value = json.loads(tool_state_value) assert isinstance(tool_state_value, dict) assert "__class__" in tool_state_value assert tool_state_value["__class__"] == "RuntimeValue"
[docs]class ChangeDatatypeTestCase: dataset_populator: DatasetPopulator workflow_populator: WorkflowPopulator
[docs] def test_assign_column_pja(self): with self.dataset_populator.test_history() as history_id: self.workflow_populator.run_workflow(""" class: GalaxyWorkflow inputs: input1: data steps: first_cat: tool_id: cat in: input1: input1 outputs: out_file1: change_datatype: bed set_columns: chromCol: 1 endCol: 2 startCol: 3 """, test_data=""" input1: value: 1.bed type: File """, history_id=history_id) details_dataset_new_col = self.dataset_populator.get_history_dataset_details(history_id, hid=2, wait=True, assert_ok=True) assert details_dataset_new_col["history_content_type"] == "dataset", details_dataset_new_col assert details_dataset_new_col['metadata_endCol'] == 2 assert details_dataset_new_col['metadata_startCol'] == 3
# Workflow API TODO: # - Allow history_id as param to workflow run action. (hist_id) # - Allow post to workflows/<workflow_id>/run in addition to posting to # /workflows with id in payload. # - Much more testing obviously, always more testing.
[docs]class WorkflowsApiTestCase(BaseWorkflowsApiTestCase, ChangeDatatypeTestCase):
[docs] def test_show_valid(self): workflow_id = self.workflow_populator.simple_workflow("dummy") workflow_id = self.workflow_populator.simple_workflow("test_regular") show_response = self._get(f"workflows/{workflow_id}", {"style": "instance"}) workflow = show_response.json() self._assert_looks_like_instance_workflow_representation(workflow) assert len(workflow["steps"]) == 3 self.assertEqual(sorted(step["id"] for step in workflow["steps"].values()), [0, 1, 2]) show_response = self._get(f"workflows/{workflow_id}", {"legacy": True}) workflow = show_response.json() self._assert_looks_like_instance_workflow_representation(workflow) assert len(workflow["steps"]) == 3 # Can't reay say what the legacy IDs are but must be greater than 3 because dummy # workflow was created first in this instance. self.assertNotEqual(sorted(step["id"] for step in workflow["steps"].values()), [0, 1, 2])
[docs] def test_show_invalid_key_is_400(self): show_response = self._get(f"workflows/{self._random_key()}") self._assert_status_code_is(show_response, 400)
[docs] def test_cannot_show_private_workflow(self): workflow_id = self.workflow_populator.simple_workflow("test_not_importable") with self._different_user(): show_response = self._get(f"workflows/{workflow_id}") self._assert_status_code_is(show_response, 403) # Try as anonymous user workflows_url = self._api_url(f"workflows/{workflow_id}") assert get(workflows_url).status_code == 403
[docs] def test_cannot_download_private_workflow(self): workflow_id = self.workflow_populator.simple_workflow("test_not_downloadable") with self._different_user(): with pytest.raises(AssertionError) as excinfo: self._download_workflow(workflow_id) assert '403' in str(excinfo.value) workflows_url = self._api_url(f"workflows/{workflow_id}/download") assert get(workflows_url).status_code == 403
[docs] def test_anon_can_download_importable_workflow(self): workflow_id = self.workflow_populator.simple_workflow("test_downloadable", importable=True) workflows_url = self._api_url(f"workflows/{workflow_id}/download") response = get(workflows_url) response.raise_for_status() assert response.json()["a_galaxy_workflow"] == "true"
[docs] def test_anon_can_download_public_workflow(self): workflow_id = self.workflow_populator.simple_workflow("test_downloadable", publish=True) workflows_url = self._api_url(f"workflows/{workflow_id}/download") response = get(workflows_url) response.raise_for_status() assert response.json()['a_galaxy_workflow'] == 'true'
[docs] def test_delete(self): workflow_id = self.workflow_populator.simple_workflow("test_delete") workflow_name = "test_delete" self._assert_user_has_workflow_with_name(workflow_name) workflow_url = self._api_url(f"workflows/{workflow_id}", use_key=True) delete_response = delete(workflow_url) self._assert_status_code_is(delete_response, 200) # Make sure workflow is no longer in index by default. assert workflow_name not in self._workflow_names()
[docs] def test_other_cannot_delete(self): workflow_id = self.workflow_populator.simple_workflow("test_other_delete") with self._different_user(): workflow_url = self._api_url(f"workflows/{workflow_id}", use_key=True) delete_response = delete(workflow_url) self._assert_status_code_is(delete_response, 403)
[docs] def test_index(self): index_response = self._get("workflows") self._assert_status_code_is(index_response, 200) assert isinstance(index_response.json(), list)
[docs] def test_index_deleted(self): workflow_id = self.workflow_populator.simple_workflow("test_delete") workflow_index = self._get("workflows").json() assert [w for w in workflow_index if w['id'] == workflow_id] workflow_url = self._api_url(f"workflows/{workflow_id}", use_key=True) delete_response = delete(workflow_url) self._assert_status_code_is(delete_response, 200) workflow_index = self._get("workflows").json() assert not [w for w in workflow_index if w['id'] == workflow_id] workflow_index = self._get("workflows?show_deleted=true").json() assert [w for w in workflow_index if w['id'] == workflow_id]
[docs] def test_index_hidden(self): workflow_id = self.workflow_populator.simple_workflow("test_delete") workflow_index = self._get("workflows").json() workflow = [w for w in workflow_index if w['id'] == workflow_id][0] workflow['hidden'] = True update_response = self.workflow_populator.update_workflow(workflow_id, workflow) self._assert_status_code_is(update_response, 200) assert update_response.json()['hidden'] workflow_index = self._get("workflows").json() assert not [w for w in workflow_index if w['id'] == workflow_id] workflow_index = self._get("workflows?show_hidden=true").json() assert [w for w in workflow_index if w['id'] == workflow_id]
[docs] def test_upload(self): self.__test_upload(use_deprecated_route=False)
[docs] def test_upload_deprecated(self): self.__test_upload(use_deprecated_route=True)
[docs] def test_import_tools_requires_admin(self): response = self.__test_upload(import_tools=True, assert_ok=False) assert response.status_code == 403
def __test_upload(self, use_deprecated_route=False, name="test_import", workflow=None, assert_ok=True, import_tools=False): if workflow is None: workflow = self.workflow_populator.load_workflow(name=name) data = dict( workflow=dumps(workflow), ) if import_tools: data["import_tools"] = import_tools if use_deprecated_route: route = "workflows/upload" else: route = "workflows" upload_response = self._post(route, data=data) if assert_ok: self._assert_status_code_is(upload_response, 200) self._assert_user_has_workflow_with_name(name) return upload_response
[docs] def test_get_tool_predictions(self): request = {"tool_sequence": "Cut1", "remote_model_url": "https://github.com/galaxyproject/galaxy-test-data/raw/master/tool_recommendation_model.hdf5"} actual_recommendations = ['Filter1', 'cat1', 'addValue', 'comp1', 'Grep1'] route = "workflows/get_tool_predictions" response = self._post(route, data=request) recommendation_response = response.json() is_empty = bool(recommendation_response["current_tool"]) if is_empty is False: self._assert_status_code_is(response, 400) else: # check Ok response from the API self._assert_status_code_is(response, 200) recommendation_response = response.json() # check the input tool sequence assert recommendation_response["current_tool"] == request["tool_sequence"] # check non-empty predictions list predicted_tools = recommendation_response["predicted_data"]["children"] assert len(predicted_tools) > 0 # check for the correct predictions for tool in predicted_tools: assert tool["tool_id"] in actual_recommendations break
[docs] def test_update(self): original_workflow = self.workflow_populator.load_workflow(name="test_import") uuids = {} labels = {} for order_index, step_dict in original_workflow["steps"].items(): uuid = str(uuid4()) step_dict["uuid"] = uuid uuids[order_index] = uuid label = f"label_{order_index}" step_dict["label"] = label labels[order_index] = label def check_label_and_uuid(order_index, step_dict): assert order_index in uuids assert order_index in labels self.assertEqual(uuids[order_index], step_dict["uuid"]) self.assertEqual(labels[order_index], step_dict["label"]) upload_response = self.__test_upload(workflow=original_workflow) workflow_id = upload_response.json()["id"] def update(workflow_object): put_response = self._update_workflow(workflow_id, workflow_object) self._assert_status_code_is(put_response, 200) return put_response workflow_content = self._download_workflow(workflow_id) steps = workflow_content["steps"] def tweak_step(step): order_index, step_dict = step check_label_and_uuid(order_index, step_dict) assert step_dict['position']['top'] != 1 assert step_dict['position']['left'] != 1 step_dict['position'] = {'top': 1, 'left': 1} map(tweak_step, steps.items()) update(workflow_content) def check_step(step): order_index, step_dict = step check_label_and_uuid(order_index, step_dict) assert step_dict['position']['top'] == 1 assert step_dict['position']['left'] == 1 updated_workflow_content = self._download_workflow(workflow_id) map(check_step, updated_workflow_content['steps'].items()) # Re-update against original workflow... update(original_workflow) updated_workflow_content = self._download_workflow(workflow_id) # Make sure the positions have been updated. map(tweak_step, updated_workflow_content['steps'].items())
[docs] def test_update_tags(self): workflow_object = self.workflow_populator.load_workflow(name="test_import") upload_response = self.__test_upload(workflow=workflow_object) workflow = upload_response.json() workflow['tags'] = ['a_tag', 'b_tag'] update_response = self._update_workflow(workflow['id'], workflow).json() assert update_response['tags'] == ['a_tag', 'b_tag'] del workflow['tags'] update_response = self._update_workflow(workflow['id'], workflow).json() assert update_response['tags'] == ['a_tag', 'b_tag'] workflow['tags'] = [] update_response = self._update_workflow(workflow['id'], workflow).json() assert update_response['tags'] == []
[docs] def test_update_name(self): original_name = "test update name" workflow_object = self.workflow_populator.load_workflow(name=original_name) workflow_object["license"] = "AAL" upload_response = self.__test_upload(workflow=workflow_object, name=original_name) workflow = upload_response.json() workflow_id = workflow['id'] assert workflow['name'] == original_name workflow_dict = self.workflow_populator.download_workflow(workflow_id) assert workflow_dict["license"] == "AAL" data = {"name": "my cool new name"} update_response = self._update_workflow(workflow['id'], data).json() assert update_response['name'] == "my cool new name" workflow_dict = self.workflow_populator.download_workflow(workflow_id) assert workflow_dict["license"] == "AAL"
[docs] def test_refactor(self): workflow_id = self.workflow_populator.upload_yaml_workflow(""" class: GalaxyWorkflow inputs: test_input: data steps: first_cat: tool_id: cat in: input1: test_input """) actions = [ {"action_type": "update_step_label", "step": {"order_index": 0}, "label": "new_label"}, ] # perform refactoring as dry run refactor_response = self.workflow_populator.refactor_workflow(workflow_id, actions, dry_run=True) refactor_response.raise_for_status() assert refactor_response.json()["workflow"]["steps"]["0"]["label"] == "new_label" # perform refactoring as dry run but specify editor style response refactor_response = self.workflow_populator.refactor_workflow(workflow_id, actions, dry_run=True, style="editor") refactor_response.raise_for_status() assert refactor_response.json()["workflow"]["steps"]["0"]["label"] == "new_label" # download the original workflow and make sure the dry run didn't modify that label workflow_dict = self.workflow_populator.download_workflow(workflow_id) assert workflow_dict["steps"]["0"]["label"] == "test_input" refactor_response = self.workflow_populator.refactor_workflow(workflow_id, actions) refactor_response.raise_for_status() assert refactor_response.json()["workflow"]["steps"]["0"]["label"] == "new_label" # this time dry_run was default of False, so the label is indeed changed workflow_dict = self.workflow_populator.download_workflow(workflow_id) assert workflow_dict["steps"]["0"]["label"] == "new_label"
[docs] def test_update_no_tool_id(self): workflow_object = self.workflow_populator.load_workflow(name="test_import") upload_response = self.__test_upload(workflow=workflow_object) workflow_id = upload_response.json()["id"] del workflow_object["steps"]["2"]["tool_id"] put_response = self._update_workflow(workflow_id, workflow_object) self._assert_status_code_is(put_response, 400)
[docs] def test_update_missing_tool(self): # Create allows missing tools, update doesn't currently... workflow_object = self.workflow_populator.load_workflow(name="test_import") upload_response = self.__test_upload(workflow=workflow_object) workflow_id = upload_response.json()["id"] workflow_object["steps"]["2"]["tool_id"] = "cat-not-found" put_response = self._update_workflow(workflow_id, workflow_object) self._assert_status_code_is(put_response, 400)
[docs] def test_require_unique_step_uuids(self): workflow_dup_uuids = self.workflow_populator.load_workflow(name="test_import") uuid0 = str(uuid4()) for step_dict in workflow_dup_uuids["steps"].values(): step_dict["uuid"] = uuid0 response = self.workflow_populator.create_workflow_response(workflow_dup_uuids) self._assert_status_code_is(response, 400)
[docs] def test_require_unique_step_labels(self): workflow_dup_label = self.workflow_populator.load_workflow(name="test_import") for step_dict in workflow_dup_label["steps"].values(): step_dict["label"] = "my duplicated label" response = self.workflow_populator.create_workflow_response(workflow_dup_label) self._assert_status_code_is(response, 400)
[docs] def test_import_deprecated(self): workflow_id = self.workflow_populator.simple_workflow("test_import_published_deprecated", publish=True) with self._different_user(): other_import_response = self.__import_workflow(workflow_id) self._assert_status_code_is(other_import_response, 200) self._assert_user_has_workflow_with_name("imported: test_import_published_deprecated")
[docs] def test_import_export_dynamic(self): workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow steps: - type: input label: input1 - tool_id: cat1 label: first_cat state: input1: $link: 0 - label: embed1 run: class: GalaxyTool command: echo 'hello world 2' > $output1 outputs: output1: format: txt - tool_id: cat1 state: input1: $link: first_cat/out_file1 queries: input2: $link: embed1/output1 test_data: input1: "hello world" """) downloaded_workflow = self._download_workflow(workflow_id) # The _upload_yaml_workflow entry point uses an admin key, but if we try to # do the raw re-import as a regular user we expect a 403 error. response = self.workflow_populator.create_workflow_response(downloaded_workflow) self._assert_status_code_is(response, 403)
[docs] def test_import_annotations(self): workflow_id = self.workflow_populator.simple_workflow("test_import_annotations", publish=True) with self._different_user(): other_import_response = self.__import_workflow(workflow_id) self._assert_status_code_is(other_import_response, 200) # Test annotations preserved during upload and copied over during # import. other_id = other_import_response.json()["id"] imported_workflow = self._show_workflow(other_id) assert imported_workflow["annotation"] == "simple workflow" step_annotations = {step["annotation"] for step in imported_workflow["steps"].values()} assert "input1 description" in step_annotations
[docs] def test_import_subworkflows(self): def get_subworkflow_content_id(workflow_id): workflow_contents = self._download_workflow(workflow_id, style="editor") steps = workflow_contents['steps'] subworkflow_step = next(s for s in steps.values() if s["type"] == "subworkflow") return subworkflow_step['content_id'] workflow_id = self._upload_yaml_workflow(WORKFLOW_NESTED_SIMPLE, publish=True) subworkflow_content_id = get_subworkflow_content_id(workflow_id) instance_response = self._get(f"workflows/{subworkflow_content_id}?instance=true") self._assert_status_code_is(instance_response, 200) subworkflow = instance_response.json() assert subworkflow['inputs']['0']['label'] == 'inner_input' assert subworkflow['name'] == 'Workflow' assert subworkflow['hidden'] with self._different_user(): other_import_response = self.__import_workflow(workflow_id) self._assert_status_code_is(other_import_response, 200) imported_workflow_id = other_import_response.json()["id"] imported_subworkflow_content_id = get_subworkflow_content_id(imported_workflow_id) assert subworkflow_content_id != imported_subworkflow_content_id
[docs] def test_subworkflow_inputs_optional_editor(self): workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow steps: nested_workflow: run: class: GalaxyWorkflow inputs: - id: inner_input optional: true outputs: - outputSource: inner_input/output steps: [] """) workflow_contents = self._download_workflow(workflow_id, style="editor") assert workflow_contents['steps']['0']['inputs'][0]['optional']
[docs] def test_not_importable_prevents_import(self): workflow_id = self.workflow_populator.simple_workflow("test_not_importportable") with self._different_user(): other_import_response = self.__import_workflow(workflow_id) self._assert_status_code_is(other_import_response, 403)
[docs] def test_anonymous_published(self): def anonymous_published_workflows(): workflows_url = self._api_url("workflows?show_published=True") return get(workflows_url).json() names = [w["name"] for w in anonymous_published_workflows()] assert "test published example" not in names workflow_id = self.workflow_populator.simple_workflow("test published example", publish=True) names = [w["name"] for w in anonymous_published_workflows()] assert "test published example" in names ids = [w["id"] for w in anonymous_published_workflows()] assert workflow_id in ids
[docs] def test_import_published(self): workflow_id = self.workflow_populator.simple_workflow("test_import_published", publish=True) with self._different_user(): other_import_response = self.__import_workflow(workflow_id, deprecated_route=True) self._assert_status_code_is(other_import_response, 200) self._assert_user_has_workflow_with_name("imported: test_import_published")
[docs] def test_export(self): uploaded_workflow_id = self.workflow_populator.simple_workflow("test_for_export") downloaded_workflow = self._download_workflow(uploaded_workflow_id) assert downloaded_workflow["name"] == "test_for_export" steps = downloaded_workflow["steps"] assert len(steps) == 3 assert "0" in steps first_step = steps["0"] self._assert_has_keys(first_step, "inputs", "outputs") inputs = first_step["inputs"] assert len(inputs) > 0, first_step first_input = inputs[0] assert first_input["name"] == "WorkflowInput1" assert first_input["description"] == "input1 description" self._assert_has_keys(downloaded_workflow, "a_galaxy_workflow", "format-version", "annotation", "uuid", "steps") for step in downloaded_workflow["steps"].values(): self._assert_has_keys( step, 'id', 'type', 'tool_id', 'tool_version', 'name', 'tool_state', 'annotation', 'inputs', 'workflow_outputs', 'outputs' ) if step['type'] == "tool": self._assert_has_keys(step, "post_job_actions")
[docs] def test_export_format2(self): uploaded_workflow_id = self.workflow_populator.simple_workflow("test_for_export_format2") downloaded_workflow = self._download_workflow(uploaded_workflow_id, style="format2") assert downloaded_workflow["class"] == "GalaxyWorkflow"
[docs] def test_export_editor(self): uploaded_workflow_id = self.workflow_populator.simple_workflow("test_for_export") downloaded_workflow = self._download_workflow(uploaded_workflow_id, style="editor") self._assert_has_keys(downloaded_workflow, "name", "steps", "upgrade_messages") for step in downloaded_workflow["steps"].values(): self._assert_has_keys( step, 'id', 'type', 'content_id', 'name', 'tool_state', 'tooltip', 'inputs', 'outputs', 'config_form', 'annotation', 'post_job_actions', 'workflow_outputs', 'uuid', 'label', )
[docs] @skip_without_tool('output_filter_with_input') def test_export_editor_filtered_outputs(self): template = """ class: GalaxyWorkflow steps: - tool_id: output_filter_with_input state: produce_out_1: {produce_out_1} filter_text_1: {filter_text_1} produce_collection: false produce_paired_collection: false """ workflow_id = self._upload_yaml_workflow(template.format(produce_out_1='false', filter_text_1='false')) downloaded_workflow = self._download_workflow(workflow_id, style="editor") outputs = downloaded_workflow['steps']['0']['outputs'] assert len(outputs) == 1 assert outputs[0]['name'] == 'out_3' workflow_id = self._upload_yaml_workflow(template.format(produce_out_1='true', filter_text_1='false')) downloaded_workflow = self._download_workflow(workflow_id, style="editor") outputs = downloaded_workflow['steps']['0']['outputs'] assert len(outputs) == 2 assert outputs[0]['name'] == 'out_1' assert outputs[1]['name'] == 'out_3' workflow_id = self._upload_yaml_workflow(template.format(produce_out_1='true', filter_text_1='foo')) downloaded_workflow = self._download_workflow(workflow_id, style="editor") outputs = downloaded_workflow['steps']['0']['outputs'] assert len(outputs) == 3 assert outputs[0]['name'] == 'out_1' assert outputs[1]['name'] == 'out_2' assert outputs[2]['name'] == 'out_3'
[docs] @skip_without_tool('output_filter_exception_1') def test_export_editor_filtered_outputs_exception_handling(self): workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow steps: - tool_id: output_filter_exception_1 """) downloaded_workflow = self._download_workflow(workflow_id, style="editor") outputs = downloaded_workflow['steps']['0']['outputs'] assert len(outputs) == 2 assert outputs[0]['name'] == 'out_1' assert outputs[1]['name'] == 'out_2'
[docs] @skip_without_tool('collection_type_source') def test_export_editor_collection_type_source(self): workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow inputs: - id: text_input1 type: collection collection_type: "list:paired" steps: - tool_id: collection_type_source in: input_collect: text_input1 """) downloaded_workflow = self._download_workflow(workflow_id, style="editor") steps = downloaded_workflow['steps'] assert len(steps) == 2 # Non-subworkflow collection_type_source tools will be handled by the client, # so collection_type should be None here. assert steps['1']['outputs'][0]['collection_type'] is None
[docs] @skip_without_tool('collection_type_source') def test_export_editor_subworkflow_collection_type_source(self): workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow inputs: outer_input: data steps: inner_workflow: run: class: GalaxyWorkflow inputs: inner_input: type: collection collection_type: "list:paired" outputs: workflow_output: outputSource: collection_type_source/list_output steps: collection_type_source: tool_id: collection_type_source in: input_collect: inner_input in: inner_input: outer_input """) downloaded_workflow = self._download_workflow(workflow_id, style="editor") steps = downloaded_workflow['steps'] assert len(steps) == 2 assert steps['1']['type'] == 'subworkflow' assert steps['1']['outputs'][0]['collection_type'] == 'list:paired'
[docs] def test_import_missing_tool(self): workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_missing_tool") workflow_id = self.workflow_populator.create_workflow(workflow) workflow_description = self._show_workflow(workflow_id) steps = workflow_description["steps"] missing_tool_steps = [v for v in steps.values() if v['tool_id'] == 'cat_missing_tool'] assert len(missing_tool_steps) == 1
[docs] def test_import_no_tool_id(self): # Import works with missing tools, but not with absent content/tool id. workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_missing_tool") del workflow["steps"]["2"]["tool_id"] create_response = self.__test_upload(workflow=workflow, assert_ok=False) self._assert_status_code_is(create_response, 400)
[docs] def test_import_export_with_runtime_inputs(self): workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_with_runtime_input") workflow_id = self.workflow_populator.create_workflow(workflow) downloaded_workflow = self._download_workflow(workflow_id) assert len(downloaded_workflow["steps"]) == 2 runtime_step = downloaded_workflow["steps"]["1"] for runtime_input in runtime_step["inputs"]: if runtime_input["name"] == "num_lines": break assert runtime_input["description"].startswith("runtime parameter for tool") tool_state = json.loads(runtime_step["tool_state"]) assert "num_lines" in tool_state self._assert_is_runtime_input(tool_state["num_lines"])
[docs] @skip_without_tool("cat1") def test_run_workflow_by_index(self): self.__run_cat_workflow(inputs_by='step_index')
[docs] @skip_without_tool("cat1") def test_run_workflow_by_uuid(self): self.__run_cat_workflow(inputs_by='step_uuid')
[docs] @skip_without_tool("cat1") def test_run_workflow_by_uuid_implicitly(self): self.__run_cat_workflow(inputs_by='uuid_implicitly')
[docs] @skip_without_tool("cat1") def test_run_workflow_by_name(self): self.__run_cat_workflow(inputs_by='name')
[docs] @skip_without_tool("cat1") def test_run_workflow(self): self.__run_cat_workflow(inputs_by='step_id')
[docs] @skip_without_tool("multiple_versions") def test_run_versioned_tools(self): with self.dataset_populator.test_history() as history_01_id: workflow_version_01 = self._upload_yaml_workflow(""" class: GalaxyWorkflow steps: multiple: tool_id: multiple_versions tool_version: "0.1" state: inttest: 0 """) self.workflow_populator.invoke_workflow_and_wait(workflow_version_01, history_id=history_01_id) with self.dataset_populator.test_history() as history_02_id: workflow_version_02 = self._upload_yaml_workflow(""" class: GalaxyWorkflow steps: multiple: tool_id: multiple_versions tool_version: "0.2" state: inttest: 1 """) self.workflow_populator.invoke_workflow_and_wait(workflow_version_02, history_id=history_02_id)
def __run_cat_workflow(self, inputs_by): workflow = self.workflow_populator.load_workflow(name="test_for_run") workflow["steps"]["0"]["uuid"] = str(uuid4()) workflow["steps"]["1"]["uuid"] = str(uuid4()) workflow_request, _, workflow_id = self._setup_workflow_run(workflow, inputs_by=inputs_by) invocation_id = self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request, assert_ok=True) invocation = self._invocation_details(workflow_id, invocation_id) assert invocation["state"] == "scheduled", invocation
[docs] @skip_without_tool("collection_creates_pair") def test_workflow_run_output_collections(self) -> None: with self.dataset_populator.test_history() as history_id: self._run_workflow(WORKFLOW_WITH_OUTPUT_COLLECTION, history_id=history_id) self.assertEqual("a\nc\nb\nd\n", self.dataset_populator.get_history_dataset_content(history_id, hid=0))
[docs] @skip_without_tool("job_properties") @skip_without_tool("identifier_multiple_in_conditional") def test_workflow_resume_from_failed_step(self): workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow steps: job_props: tool_id: job_properties state: thebool: true failbool: true identifier: tool_id: identifier_multiple_in_conditional state: outer_cond: cond_param_outer: true inner_cond: cond_param_inner: true input1: $link: 0/out_file1 thedata: null cat: tool_id: cat1 in: input1: identifier/output1 queries_0|input2: identifier/output1 """) with self.dataset_populator.test_history() as history_id: self.workflow_populator.invoke_workflow_and_wait(workflow_id, history_id=history_id, assert_ok=False) failed_dataset_one = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=False) assert failed_dataset_one['state'] == 'error', failed_dataset_one paused_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, wait=True, assert_ok=False) assert paused_dataset['state'] == 'paused', paused_dataset inputs = {"thebool": "false", "failbool": "false", "rerun_remap_job_id": failed_dataset_one['creating_job']} self.dataset_populator.run_tool( tool_id='job_properties', inputs=inputs, history_id=history_id, ) unpaused_dataset_1 = self.dataset_populator.get_history_dataset_details(history_id, hid=5, wait=True, assert_ok=False) assert unpaused_dataset_1['state'] == 'ok' self.dataset_populator.wait_for_history(history_id, assert_ok=False) unpaused_dataset_2 = self.dataset_populator.get_history_dataset_details(history_id, hid=6, wait=True, assert_ok=False) assert unpaused_dataset_2['state'] == 'ok'
[docs] @skip_without_tool("job_properties") @skip_without_tool("collection_creates_list") def test_workflow_resume_from_failed_step_with_hdca_input(self): workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow steps: job_props: tool_id: job_properties state: thebool: true failbool: true list_in_list_out: tool_id: collection_creates_list in: input1: job_props/list_output identifier: tool_id: identifier_collection in: input1: list_in_list_out/list_output """) with self.dataset_populator.test_history() as history_id: invocation_id = self.__invoke_workflow(workflow_id, history_id=history_id) self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id, assert_ok=False) failed_dataset_one = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=False) assert failed_dataset_one['state'] == 'error', failed_dataset_one paused_colletion = self.dataset_populator.get_history_collection_details(history_id, hid=7, wait=True, assert_ok=False) first_paused_element = paused_colletion['elements'][0]['object'] assert first_paused_element['state'] == 'paused', first_paused_element dependent_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=8, wait=True, assert_ok=False) assert dependent_dataset['state'] == 'paused' inputs = {"thebool": "false", "failbool": "false", "rerun_remap_job_id": failed_dataset_one['creating_job']} self.dataset_populator.run_tool( tool_id='job_properties', inputs=inputs, history_id=history_id, ) paused_colletion = self.dataset_populator.get_history_collection_details(history_id, hid=7, wait=True, assert_ok=False) first_paused_element = paused_colletion['elements'][0]['object'] assert first_paused_element['state'] == 'ok' self.dataset_populator.wait_for_history(history_id, assert_ok=False) dependent_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=8, wait=True, assert_ok=False) assert dependent_dataset['name'].startswith('identifier_collection') assert dependent_dataset['state'] == 'ok'
[docs] @skip_without_tool("fail_identifier") @skip_without_tool("identifier_collection") def test_workflow_resume_with_mapped_over_input(self): with self.dataset_populator.test_history() as history_id: self._run_workflow(""" class: GalaxyWorkflow inputs: input_datasets: collection steps: fail_identifier_1: tool_id: fail_identifier state: failbool: true in: input1: input_datasets identifier: tool_id: identifier_collection in: input1: fail_identifier_1/out_file1 test_data: input_datasets: collection_type: list elements: - identifier: fail value: 1.fastq type: File - identifier: success value: 1.fastq type: File """, history_id=history_id, assert_ok=False, wait=True) history_contents = self.dataset_populator._get_contents_request(history_id=history_id).json() first_input = history_contents[1] assert first_input['history_content_type'] == 'dataset' paused_dataset = history_contents[-1] failed_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, assert_ok=False) assert paused_dataset['state'] == 'paused', paused_dataset assert failed_dataset['state'] == 'error', failed_dataset inputs = {"input1": {'values': [{'src': 'hda', 'id': first_input['id']}] }, "failbool": "false", "rerun_remap_job_id": failed_dataset['creating_job']} run_dict = self.dataset_populator.run_tool( tool_id='fail_identifier', inputs=inputs, history_id=history_id, ) unpaused_dataset = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=False) assert unpaused_dataset['state'] == 'ok' contents = self.dataset_populator.get_history_dataset_content(history_id, hid=7, assert_ok=False) assert contents == 'fail\nsuccess\n', contents replaced_hda_id = run_dict['outputs'][0]['id'] replaced_hda = self.dataset_populator.get_history_dataset_details(history_id, dataset_id=replaced_hda_id, wait=True, assert_ok=False) assert not replaced_hda['visible'], replaced_hda
[docs] def test_workflow_resume_with_mapped_over_collection_input(self): # Test that replacement and resume also works if the failed job re-run works on a input DCE with self.dataset_populator.test_history() as history_id: job_summary = self._run_workflow(""" class: GalaxyWorkflow inputs: input_collection: collection steps: - tool_id: collection_creates_list_of_pairs state: failbool: true in: input1: source: input_collection - tool_id: collection_creates_list_of_pairs state: failbool: false in: input1: source: 1/list_output test_data: input_collection: collection_type: "list:list:paired" """, history_id=history_id, assert_ok=False, wait=True) invocation = self.workflow_populator.get_invocation(job_summary.invocation_id, step_details=True) # TODO: return steps sorted by order_index ? Why don't we do that ?? invocation['steps'].sort(key=lambda step: step['order_index']) failed_step = invocation['steps'][1] assert failed_step['jobs'][0]['state'] == 'error' failed_hdca_id = failed_step['output_collections']['list_output']['id'] failed_hdca = self.dataset_populator.get_history_collection_details(history_id=history_id, content_id=failed_hdca_id, assert_ok=False) assert failed_hdca['elements'][0]['object']['elements'][0]['object']['elements'][0]['object']['state'] == 'error' paused_step = invocation['steps'][2] # job not created, input in error state assert paused_step['jobs'][0]['state'] == 'paused' input_hdca = self.dataset_populator.get_history_collection_details(history_id=history_id, content_id=job_summary.inputs['input_collection']['id'], assert_ok=False) # now re-run errored job inputs = {"input1": {'values': [{'src': 'dce', 'id': input_hdca['elements'][0]['id']}] }, "failbool": "false", "rerun_remap_job_id": failed_step['jobs'][0]['id']} run_response = self.dataset_populator.run_tool( tool_id='collection_creates_list_of_pairs', inputs=inputs, history_id=history_id, ) assert not run_response["output_collections"][0]["visible"] self.dataset_populator.wait_for_job(paused_step['jobs'][0]['id']) invocation = self.workflow_populator.get_invocation(job_summary.invocation_id, step_details=True) rerun_step = invocation['steps'][1] assert rerun_step['jobs'][0]['state'] == 'ok' replaced_hdca = self.dataset_populator.get_history_collection_details(history_id=history_id, content_id=failed_hdca_id, assert_ok=False) assert replaced_hdca['elements'][0]['object']['elements'][0]['object']['elements'][0]['object']['state'] == 'ok'
[docs] @skip_without_tool('multi_data_optional') def test_workflow_list_list_multi_data_map_over(self): # Test that a list:list is reduced to list with a multiple="true" data input with self.dataset_populator.test_history() as history_id: workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow inputs: input_datasets: collection steps: multi_data_optional: tool_id: multi_data_optional in: input1: input_datasets """) with self.dataset_populator.test_history() as history_id: hdca_id = self.dataset_collection_populator.create_list_of_list_in_history(history_id).json() self.dataset_populator.wait_for_history(history_id, assert_ok=True) inputs = { '0': self._ds_entry(hdca_id), } invocation_id = self.__invoke_workflow(workflow_id, inputs=inputs, history_id=history_id) self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) output_collection = self.dataset_populator.get_history_collection_details(history_id, hid=6) assert output_collection['collection_type'] == 'list' assert output_collection['job_source_type'] == 'ImplicitCollectionJobs'
[docs] @skip_without_tool("cat_list") @skip_without_tool("collection_creates_pair") def test_workflow_run_output_collection_mapping(self): workflow_id = self._upload_yaml_workflow(WORKFLOW_WITH_OUTPUT_COLLECTION_MAPPING) with self.dataset_populator.test_history() as history_id: hdca1 = self.dataset_collection_populator.create_list_in_history(history_id, contents=["a\nb\nc\nd\n", "e\nf\ng\nh\n"]).json() self.dataset_populator.wait_for_history(history_id, assert_ok=True) inputs = { '0': self._ds_entry(hdca1), } invocation_id = self.__invoke_workflow(workflow_id, inputs=inputs, history_id=history_id) self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) self.assertEqual("a\nc\nb\nd\ne\ng\nf\nh\n", self.dataset_populator.get_history_dataset_content(history_id, hid=0))
[docs] @skip_without_tool("cat_list") @skip_without_tool("collection_split_on_column") def test_workflow_run_dynamic_output_collections(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION, history_id=history_id, assert_ok=True, wait=True) details = self.dataset_populator.get_history_dataset_details(history_id, hid=0) last_item_hid = details["hid"] assert last_item_hid == 7, f"Expected 7 history items, got {last_item_hid}" content = self.dataset_populator.get_history_dataset_content(history_id, hid=0) self.assertEqual("10.0\n30.0\n20.0\n40.0\n", content)
[docs] @skip_without_tool("collection_split_on_column") @skip_without_tool("min_repeat") def test_workflow_run_dynamic_output_collections_2(self): # A more advanced output collection workflow, testing regression of # https://github.com/galaxyproject/galaxy/issues/776 with self.dataset_populator.test_history() as history_id: workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow inputs: test_input_1: data test_input_2: data test_input_3: data steps: split_up: tool_id: collection_split_on_column in: input1: test_input_2 min_repeat: tool_id: min_repeat in: queries_0|input: test_input_1 queries2_0|input2: split_up/split_output """) hda1 = self.dataset_populator.new_dataset(history_id, content="samp1\t10.0\nsamp2\t20.0\n") hda2 = self.dataset_populator.new_dataset(history_id, content="samp1\t20.0\nsamp2\t40.0\n") hda3 = self.dataset_populator.new_dataset(history_id, content="samp1\t30.0\nsamp2\t60.0\n") self.dataset_populator.wait_for_history(history_id, assert_ok=True) inputs = { '0': self._ds_entry(hda1), '1': self._ds_entry(hda2), '2': self._ds_entry(hda3), } invocation_id = self.__invoke_workflow(workflow_id, inputs=inputs, history_id=history_id) self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) collection_details = self.dataset_populator.get_history_collection_details(history_id, hid=7) assert collection_details["populated_state"] == "ok" content = self.dataset_populator.get_history_dataset_content(history_id, hid=11) self.assertEqual(content.strip(), "samp1\t10.0\nsamp2\t20.0")
[docs] @skip_without_tool("cat") @skip_without_tool("collection_split_on_column") def test_workflow_run_dynamic_output_collections_3(self): # Test a workflow that create a list:list:list followed by a mapping step. with self.dataset_populator.test_history() as history_id: workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow inputs: text_input1: data text_input2: data steps: cat_inputs: tool_id: cat1 in: input1: text_input1 queries_0|input2: text_input2 split_up_1: tool_id: collection_split_on_column in: input1: cat_inputs/out_file1 split_up_2: tool_id: collection_split_on_column in: input1: split_up_1/split_output cat_output: tool_id: cat in: input1: split_up_2/split_output """) hda1 = self.dataset_populator.new_dataset(history_id, content="samp1\t10.0\nsamp2\t20.0\n") hda2 = self.dataset_populator.new_dataset(history_id, content="samp1\t30.0\nsamp2\t40.0\n") self.dataset_populator.wait_for_history(history_id, assert_ok=True) inputs = { '0': self._ds_entry(hda1), '1': self._ds_entry(hda2), } invocation_id = self.__invoke_workflow(workflow_id, inputs=inputs, history_id=history_id) self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
[docs] @skip_without_tool("cat1") @skip_without_tool("__FLATTEN__") def test_workflow_input_tags(self): workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_with_input_tags") workflow_id = self.workflow_populator.create_workflow(workflow) downloaded_workflow = self._download_workflow(workflow_id) count = 0 tag_test = ["tag1", "tag2"] for step in downloaded_workflow["steps"]: current = json.loads(downloaded_workflow["steps"][step]["tool_state"]) assert current["tag"] == tag_test[count] count += 1
[docs] @skip_without_tool('column_param') def test_empty_file_data_column_specified(self): # Regression test for https://github.com/galaxyproject/galaxy/pull/10981 with self.dataset_populator.test_history() as history_id: self._run_jobs("""class: GalaxyWorkflow steps: empty_output: tool_id: empty_output outputs: out_file1: change_datatype: tabular column_param: tool_id: column_param in: input1: empty_output/out_file1 state: col: 2 col_names: 'B' """, history_id=history_id)
[docs] @skip_without_tool('column_param_list') def test_comma_separated_columns(self): # Regression test for https://github.com/galaxyproject/galaxy/pull/10981 with self.dataset_populator.test_history() as history_id: self._run_jobs("""class: GalaxyWorkflow steps: empty_output: tool_id: empty_output outputs: out_file1: change_datatype: tabular column_param_list: tool_id: column_param_list in: input1: empty_output/out_file1 state: col: '2,3' col_names: 'B' """, history_id=history_id)
[docs] @skip_without_tool('column_param') def test_runtime_data_column_parameter(self): with self.dataset_populator.test_history() as history_id: self._run_jobs("""class: GalaxyWorkflow inputs: bed_input: data steps: cat1: tool_id: cat1 in: input1: bed_input column_param_list: tool_id: column_param in: input1: cat1/out_file1 state: col: 9 col_names: notacolumn test_data: step_parameters: '2': 'col': 1 'col_names': 'c1: chr1' bed_input: value: 1.bed file_type: bed type: File """, history_id=history_id)
[docs] @skip_without_tool("mapper") @skip_without_tool("pileup") def test_workflow_metadata_validation_0(self): # Testing regression of # https://github.com/galaxyproject/galaxy/issues/1514 with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input_fastqs: collection reference: data steps: map_over_mapper: tool_id: mapper in: input1: input_fastqs reference: reference pileup: tool_id: pileup in: input1: map_over_mapper/out_file1 reference: reference test_data: input_fastqs: collection_type: list elements: - identifier: samp1 value: 1.fastq type: File - identifier: samp2 value: 1.fastq type: File reference: value: 1.fasta type: File """, history_id=history_id)
[docs] def test_run_subworkflow_simple(self) -> None: with self.dataset_populator.test_history() as history_id: summary = self._run_workflow(WORKFLOW_NESTED_SIMPLE, test_data=""" outer_input: value: 1.bed type: File """, history_id=history_id) invocation_id = summary.invocation_id content = self.dataset_populator.get_history_dataset_content(history_id) self.assertEqual("chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", content) steps = self.workflow_populator.get_invocation(invocation_id)['steps'] assert sum(1 for step in steps if step['subworkflow_invocation_id'] is None) == 3 subworkflow_invocation_id = [step['subworkflow_invocation_id'] for step in steps if step['subworkflow_invocation_id']][0] subworkflow_invocation = self.workflow_populator.get_invocation(subworkflow_invocation_id) assert [step for step in subworkflow_invocation['steps'] if step['order_index'] == 0][0]['workflow_step_label'] == 'inner_input' assert [step for step in subworkflow_invocation['steps'] if step['order_index'] == 1][0]['workflow_step_label'] == 'random_lines' bco = self.workflow_populator.get_biocompute_object(invocation_id) self.workflow_populator.validate_biocompute_object(bco)
[docs] @skip_without_tool("random_lines1") def test_run_subworkflow_runtime_parameters(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(WORKFLOW_NESTED_RUNTIME_PARAMETER, test_data=""" step_parameters: '1': '1|num_lines': 2 outer_input: value: 1.bed type: File """, history_id=history_id) content = self.dataset_populator.get_history_dataset_content(history_id) assert len([x for x in content.split("\n") if x]) == 2
[docs] @skip_without_tool("cat") def test_run_subworkflow_replacement_parameters(self): with self.dataset_populator.test_history() as history_id: test_data = """ replacement_parameters: replaceme: moocow outer_input: value: 1.bed type: File """ self._run_jobs(WORKFLOW_NESTED_REPLACEMENT_PARAMETER, test_data=test_data, history_id=history_id) details = self.dataset_populator.get_history_dataset_details(history_id) assert details["name"] == "moocow suffix"
[docs] @skip_without_tool("create_2") def test_placements_from_text_inputs(self): with self.dataset_populator.test_history() as history_id: run_def = """ class: GalaxyWorkflow inputs: [] steps: create_2: tool_id: create_2 state: sleep_time: 0 outputs: out_file1: rename: "${replaceme} name" out_file2: rename: "${replaceme} name 2" test_data: replacement_parameters: replaceme: moocow """ self._run_jobs(run_def, history_id=history_id) details = self.dataset_populator.get_history_dataset_details(history_id) assert details["name"] == "moocow name 2" run_def = """ class: GalaxyWorkflow inputs: replaceme: text steps: create_2: tool_id: create_2 state: sleep_time: 0 outputs: out_file1: rename: "${replaceme} name" out_file2: rename: "${replaceme} name 2" test_data: replaceme: value: moocow type: raw """ self._run_jobs(run_def, history_id=history_id) details = self.dataset_populator.get_history_dataset_details(history_id) assert details["name"] == "moocow name 2", details["name"]
[docs] @skip_without_tool("random_lines1") def test_run_runtime_parameters_after_pause(self): with self.dataset_populator.test_history() as history_id: workflow_run_description = f"""{WORKFLOW_RUNTIME_PARAMETER_AFTER_PAUSE} test_data: step_parameters: '2': 'num_lines': 2 input1: value: 1.bed type: File """ job_summary = self._run_workflow(workflow_run_description, history_id=history_id, wait=False) uploaded_workflow_id, invocation_id = job_summary.workflow_id, job_summary.invocation_id # Wait for at least one scheduling step. self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id) # Make sure the history didn't enter a failed state in there. self.dataset_populator.wait_for_history(history_id, assert_ok=True) # Assert the workflow hasn't finished scheduling, we can be pretty sure we # are at the pause step in this case then. self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id) # Review the paused steps to allow the workflow to continue. self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=1, action=True) # Wait for the workflow to finish scheduling and ensure both the invocation # and the history are in valid states. invocation_scheduled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'scheduled') assert invocation_scheduled, "Workflow state is not scheduled..." self.dataset_populator.wait_for_history(history_id, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) assert len([x for x in content.split("\n") if x]) == 2
[docs] def test_run_subworkflow_auto_labels(self): def run_test(workflow_text): with self.dataset_populator.test_history() as history_id: test_data = """ outer_input: value: 1.bed type: File """ summary = self._run_workflow(workflow_text, test_data=test_data, history_id=history_id) jobs = summary.jobs assert len(jobs) == 4, "4 jobs expected, got %d jobs" % len(jobs) content = self.dataset_populator.get_history_dataset_content(history_id) self.assertEqual( "chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", content) run_test(NESTED_WORKFLOW_AUTO_LABELS_MODERN_SYNTAX)
[docs] @skip_without_tool("cat1") @skip_without_tool("collection_paired_test") def test_workflow_run_zip_collections(self): with self.dataset_populator.test_history() as history_id: workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow inputs: test_input_1: data test_input_2: data steps: first_cat: tool_id: cat1 in: input1: test_input_1 zip_it: tool_id: "__ZIP_COLLECTION__" in: input_forward: first_cat/out_file1 input_reverse: test_input_2 concat_pair: tool_id: collection_paired_test in: f1: zip_it/output """) hda1 = self.dataset_populator.new_dataset(history_id, content="samp1\t10.0\nsamp2\t20.0\n") hda2 = self.dataset_populator.new_dataset(history_id, content="samp1\t20.0\nsamp2\t40.0\n") self.dataset_populator.wait_for_history(history_id, assert_ok=True) inputs = { '0': self._ds_entry(hda1), '1': self._ds_entry(hda2), } invocation_id = self.__invoke_workflow(workflow_id, inputs=inputs, history_id=history_id) self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) content = self.dataset_populator.get_history_dataset_content(history_id) self.assertEqual(content.strip(), "samp1\t10.0\nsamp2\t20.0\nsamp1\t20.0\nsamp2\t40.0")
[docs] @skip_without_tool("collection_paired_test") def test_workflow_flatten(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow steps: nested: tool_id: collection_creates_dynamic_nested state: sleep_time: 0 foo: 'dummy' flatten: tool_id: '__FLATTEN__' state: input: $link: nested/list_output join_identifier: '-' """, test_data={}, history_id=history_id) details = self.dataset_populator.get_history_collection_details(history_id, hid=14) assert details['collection_type'] == "list" elements = details["elements"] identifiers = [e['element_identifier'] for e in elements] assert len(identifiers) == 6 assert "oe1-ie1" in identifiers
[docs] @skip_without_tool("collection_paired_test") def test_workflow_flatten_with_mapped_over_execution(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(r""" class: GalaxyWorkflow inputs: input_fastqs: collection steps: split_up: tool_id: collection_split_on_column in: input1: input_fastqs flatten: tool_id: '__FLATTEN__' in: input: split_up/split_output join_identifier: '-' test_data: input_fastqs: collection_type: list elements: - identifier: samp1 content: "0\n1" """, history_id=history_id) history = self._get(f'histories/{history_id}/contents').json() flattened_collection = history[-1] assert flattened_collection['history_content_type'] == 'dataset_collection' assert flattened_collection['collection_type'] == 'list' assert flattened_collection['element_count'] == 2 nested_collection = self.dataset_populator.get_history_collection_details(history_id, hid=3) assert nested_collection['collection_type'] == 'list:list' assert nested_collection['element_count'] == 1 assert nested_collection['elements'][0]['object']['populated'] assert nested_collection['elements'][0]['object']['element_count'] == 2
[docs] @skip_without_tool("cat") def test_workflow_invocation_report_1(self): test_data = """ input_1: value: 1.bed type: File """ with self.dataset_populator.test_history() as history_id: summary = self._run_workflow(""" class: GalaxyWorkflow inputs: input_1: data outputs: output_1: outputSource: first_cat/out_file1 steps: first_cat: tool_id: cat in: input1: input_1 """, test_data=test_data, history_id=history_id) workflow_id = summary.workflow_id invocation_id = summary.invocation_id report_json = self.workflow_populator.workflow_report_json(workflow_id, invocation_id) assert "markdown" in report_json self._assert_has_keys(report_json, "markdown", "render_format") assert report_json["render_format"] == "markdown" markdown_content = report_json["markdown"] assert "## Workflow Outputs" in markdown_content assert "## Workflow Inputs" in markdown_content assert "## About This Report" not in markdown_content
[docs] @skip_without_tool("cat") def test_workflow_invocation_report_custom(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow( WORKFLOW_WITH_CUSTOM_REPORT_1, test_data=WORKFLOW_WITH_CUSTOM_REPORT_1_TEST_DATA, history_id=history_id ) workflow_id = summary.workflow_id invocation_id = summary.invocation_id downloaded_workflow = self._download_workflow(workflow_id) assert "report" in downloaded_workflow report_config = downloaded_workflow["report"] assert "markdown" in report_config report_json = self.workflow_populator.workflow_report_json(workflow_id, invocation_id) assert "markdown" in report_json, f"markdown not in report json {report_json}" self._assert_has_keys(report_json, "markdown", "render_format") assert report_json["render_format"] == "markdown" markdown_content = report_json["markdown"] assert "## Workflow Outputs" in markdown_content assert "\n```galaxy\nhistory_dataset_display(history_dataset_id=" in markdown_content assert "## Workflow Inputs" in markdown_content assert "## About This Report" in markdown_content
[docs] @skip_without_tool("cat1") def test_export_invocation_bco(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow(WORKFLOW_SIMPLE, test_data={"input1": "hello world"}, history_id=history_id) invocation_id = summary.invocation_id bco = self.workflow_populator.get_biocompute_object(invocation_id) self.workflow_populator.validate_biocompute_object(bco) self.assertEqual(bco['provenance_domain']['name'], "Simple Workflow")
[docs] @skip_without_tool("__APPLY_RULES__") def test_workflow_run_apply_rules(self): with self.dataset_populator.test_history() as history_id: self._run_workflow(WORKFLOW_WITH_RULES_1, history_id=history_id, wait=True, assert_ok=True, round_trip_format_conversion=True) output_content = self.dataset_populator.get_history_collection_details(history_id, hid=6) rules_test_data.check_example_2(output_content, self.dataset_populator)
[docs] def test_filter_failed_mapping(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow(""" class: GalaxyWorkflow inputs: input_c: collection steps: mixed_collection: tool_id: exit_code_from_file state: input: $link: input_c filtered_collection: tool_id: "__FILTER_FAILED_DATASETS__" state: input: $link: mixed_collection/out_file1 cat: tool_id: cat1 state: input1: $link: filtered_collection """, test_data=""" input_c: collection_type: list elements: - identifier: i1 content: "0" - identifier: i2 content: "1" """, history_id=history_id, wait=True, assert_ok=False) jobs = summary.jobs def filter_jobs_by_tool(tool_id): return [j for j in summary.jobs if j["tool_id"] == tool_id] assert len(filter_jobs_by_tool("__DATA_FETCH__")) == 1, jobs assert len(filter_jobs_by_tool("exit_code_from_file")) == 2, jobs assert len(filter_jobs_by_tool("__FILTER_FAILED_DATASETS__")) == 1, jobs # Follow proves one job was filtered out of the result of cat1 assert len(filter_jobs_by_tool("cat1")) == 1, jobs
[docs] def test_workflow_request(self): workflow = self.workflow_populator.load_workflow(name="test_for_queue") workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow) run_workflow_response = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request, assert_ok=True) invocation_id = run_workflow_response.json()["id"] self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
[docs] def test_workflow_new_autocreated_history(self): workflow = self.workflow_populator.load_workflow(name="test_for_new_autocreated_history") workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow) del workflow_request['history'] # Not passing a history param means asking for a new history to be automatically created run_workflow_dict = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request, assert_ok=True).json() new_history_id = run_workflow_dict["history_id"] assert history_id != new_history_id invocation_id = run_workflow_dict["id"] self.workflow_populator.wait_for_invocation_and_jobs(new_history_id, workflow_id, invocation_id)
[docs] def test_workflow_output_dataset(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow(WORKFLOW_SIMPLE, test_data={"input1": "hello world"}, history_id=history_id) workflow_id = summary.workflow_id invocation_id = summary.invocation_id invocation_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}") self._assert_status_code_is(invocation_response, 200) invocation = invocation_response.json() self._assert_has_keys(invocation, "id", "outputs", "output_collections") assert len(invocation["output_collections"]) == 0 assert len(invocation["outputs"]) == 1 output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset_id=invocation["outputs"]["wf_output_1"]["id"]) assert "hello world" == output_content.strip()
[docs] @skip_without_tool("cat") def test_workflow_output_dataset_collection(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow_with_output_collections(history_id) workflow_id = summary.workflow_id invocation_id = summary.invocation_id invocation_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}") self._assert_status_code_is(invocation_response, 200) invocation = invocation_response.json() self._assert_has_keys(invocation, "id", "outputs", "output_collections") assert len(invocation["output_collections"]) == 1 assert len(invocation["outputs"]) == 0 output_content = self.dataset_populator.get_history_collection_details(history_id, content_id=invocation["output_collections"]["wf_output_1"]["id"]) self._assert_has_keys(output_content, "id", "elements") assert output_content["collection_type"] == "list" elements = output_content["elements"] assert len(elements) == 1 elements0 = elements[0] assert elements0["element_identifier"] == "el1"
def _run_workflow_with_output_collections(self, history_id) -> RunJobsSummary: summary = self._run_workflow(""" class: GalaxyWorkflow inputs: input1: type: data_collection_input collection_type: list outputs: wf_output_1: outputSource: first_cat/out_file1 steps: first_cat: tool_id: cat in: input1: input1 """, test_data=""" input1: collection_type: list name: the_dataset_list elements: - identifier: el1 value: 1.fastq type: File """, history_id=history_id, round_trip_format_conversion=True) return summary def _run_workflow_with_inputs_as_outputs(self, history_id) -> RunJobsSummary: summary = self._run_workflow(""" class: GalaxyWorkflow inputs: input1: data text_input: text outputs: wf_output_1: outputSource: input1 wf_output_param: outputSource: text_input steps: [] """, test_data={"input1": "hello world", "text_input": {"value": "A text variable", "type": "raw"}}, history_id=history_id) return summary
[docs] def test_workflow_input_as_output(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow_with_inputs_as_outputs(history_id) workflow_id = summary.workflow_id invocation_id = summary.invocation_id invocation_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}") self._assert_status_code_is(invocation_response, 200) invocation = invocation_response.json() self._assert_has_keys(invocation, "id", "outputs", "output_collections") assert len(invocation["output_collections"]) == 0 assert len(invocation["outputs"]) == 1 assert len(invocation["output_values"]) == 1 assert "wf_output_param" in invocation["output_values"] assert invocation["output_values"]["wf_output_param"] == "A text variable", invocation["output_values"] output_content = self.dataset_populator.get_history_dataset_content(history_id, content_id=invocation["outputs"]["wf_output_1"]["id"]) assert output_content == "hello world\n"
[docs] def test_subworkflow_output_as_output(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow(""" class: GalaxyWorkflow inputs: input1: data outputs: wf_output_1: outputSource: nested_workflow/inner_output steps: nested_workflow: run: class: GalaxyWorkflow inputs: inner_input: data outputs: inner_output: outputSource: inner_input steps: [] in: inner_input: input1 """, test_data={"input1": "hello world"}, history_id=history_id) workflow_id = summary.workflow_id invocation_id = summary.invocation_id invocation_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}") self._assert_status_code_is(invocation_response, 200) invocation = invocation_response.json() self._assert_has_keys(invocation, "id", "outputs", "output_collections") assert len(invocation["output_collections"]) == 0 assert len(invocation["outputs"]) == 1 output_content = self.dataset_populator.get_history_dataset_content(history_id, content_id=invocation["outputs"]["wf_output_1"]["id"]) assert output_content == "hello world\n"
[docs] @skip_without_tool("cat") def test_workflow_input_mapping(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow(""" class: GalaxyWorkflow inputs: input1: data outputs: wf_output_1: outputSource: first_cat/out_file1 steps: first_cat: tool_id: cat in: input1: input1 """, test_data=""" input1: collection_type: list name: the_dataset_list elements: - identifier: el1 value: 1.fastq type: File - identifier: el2 value: 1.fastq type: File """, history_id=history_id) workflow_id = summary.workflow_id invocation_id = summary.invocation_id invocation_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}") self._assert_status_code_is(invocation_response, 200) invocation = invocation_response.json() self._assert_has_keys(invocation, "id", "outputs", "output_collections") assert len(invocation["output_collections"]) == 1 assert len(invocation["outputs"]) == 0 output_content = self.dataset_populator.get_history_collection_details(history_id, content_id=invocation["output_collections"]["wf_output_1"]["id"]) self._assert_has_keys(output_content, "id", "elements") elements = output_content["elements"] assert len(elements) == 2 elements0 = elements[0] assert elements0["element_identifier"] == "el1"
[docs] @skip_without_tool("collection_creates_pair") def test_workflow_run_input_mapping_with_output_collections(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow(""" class: GalaxyWorkflow inputs: text_input: data outputs: wf_output_1: outputSource: split_up/paired_output steps: split_up: tool_id: collection_creates_pair in: input1: text_input """, test_data=""" text_input: collection_type: list name: the_dataset_list elements: - identifier: el1 value: 1.fastq type: File - identifier: el2 value: 1.fastq type: File """, history_id=history_id) workflow_id = summary.workflow_id invocation_id = summary.invocation_id invocation_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}") self._assert_status_code_is(invocation_response, 200) invocation = invocation_response.json() self._assert_has_keys(invocation, "id", "outputs", "output_collections") assert len(invocation["output_collections"]) == 1 assert len(invocation["outputs"]) == 0 output_content = self.dataset_populator.get_history_collection_details(history_id, content_id=invocation["output_collections"]["wf_output_1"]["id"]) self._assert_has_keys(output_content, "id", "elements") assert output_content["collection_type"] == "list:paired", output_content elements = output_content["elements"] assert len(elements) == 2 elements0 = elements[0] assert elements0["element_identifier"] == "el1" self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) jobs_summary_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}/jobs_summary") self._assert_status_code_is(jobs_summary_response, 200) jobs_summary = jobs_summary_response.json() assert 'states' in jobs_summary invocation_states = jobs_summary['states'] assert invocation_states and 'ok' in invocation_states, jobs_summary assert invocation_states['ok'] == 2, jobs_summary assert jobs_summary['model'] == 'WorkflowInvocation', jobs_summary jobs_summary_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}/step_jobs_summary") self._assert_status_code_is(jobs_summary_response, 200) jobs_summary = jobs_summary_response.json() assert len(jobs_summary) == 1 collection_summary = jobs_summary[0] assert 'states' in collection_summary collection_states = collection_summary['states'] assert collection_states and 'ok' in collection_states, collection_states assert collection_states['ok'] == 2, collection_summary assert collection_summary['model'] == 'ImplicitCollectionJobs', collection_summary
[docs] def test_workflow_run_input_mapping_with_subworkflows(self): with self.dataset_populator.test_history() as history_id: test_data = """ outer_input: collection_type: list name: the_dataset_list elements: - identifier: el1 value: 1.fastq type: File - identifier: el2 value: 1.fastq type: File """ summary = self._run_workflow(WORKFLOW_NESTED_SIMPLE, test_data=test_data, history_id=history_id) workflow_id = summary.workflow_id invocation_id = summary.invocation_id invocation_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}") self._assert_status_code_is(invocation_response, 200) invocation_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}") self._assert_status_code_is(invocation_response, 200) invocation = invocation_response.json() self._assert_has_keys(invocation, "id", "outputs", "output_collections") assert len(invocation["output_collections"]) == 1, invocation assert len(invocation["outputs"]) == 0 output_content = self.dataset_populator.get_history_collection_details(history_id, content_id=invocation["output_collections"]["outer_output"]["id"]) self._assert_has_keys(output_content, "id", "elements") assert output_content["collection_type"] == "list", output_content elements = output_content["elements"] assert len(elements) == 2 elements0 = elements[0] assert elements0["element_identifier"] == "el1"
[docs] @skip_without_tool("cat_list") @skip_without_tool("random_lines1") @skip_without_tool("split") def test_subworkflow_recover_mapping_1(self): # This test case tests an outer workflow continues to scheduling and handle # collection mapping properly after the last step of a subworkflow requires delayed # evaluation. Testing rescheduling and propagating connections within a subworkflow # is handled by the next test case. with self.dataset_populator.test_history() as history_id: self._run_workflow(""" class: GalaxyWorkflow inputs: outer_input: data outputs: outer_output: outputSource: second_cat/out_file1 steps: first_cat: tool_id: cat1 in: input1: outer_input nested_workflow: run: class: GalaxyWorkflow inputs: inner_input: data outputs: workflow_output: outputSource: random_lines/out_file1 steps: random_lines: tool_id: random_lines1 state: num_lines: 2 input: $link: inner_input seed_source: seed_source_selector: set_seed seed: asdf in: inner_input: first_cat/out_file1 split: tool_id: split in: input1: nested_workflow/workflow_output second_cat: tool_id: cat_list in: input1: split/output test_data: outer_input: value: 1.bed type: File """, history_id=history_id, wait=True, round_trip_format_conversion=True) self.assertEqual("chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id))
# self.assertEqual("chr16\t142908\t143003\tCCDS10397.1_cds_0_0_chr16_142909_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("cat_list") @skip_without_tool("random_lines1") @skip_without_tool("split") def test_subworkflow_recover_mapping_2(self): # Like the above test case, this test case tests an outer workflow continues to # schedule and handle collection mapping properly after a subworkflow needs to be # delayed, but this also tests recovering and handling scheduling within the subworkflow # since the delayed step (split) isn't the last step of the subworkflow. with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: outer_input: data outputs: outer_output: outputSource: second_cat/out_file1 steps: first_cat: tool_id: cat1 in: input1: outer_input nested_workflow: run: class: GalaxyWorkflow inputs: inner_input: data outputs: workflow_output: outputSource: inner_cat/out_file1 steps: random_lines: tool_id: random_lines1 in: input: inner_input num_lines: default: 2 seed_source|seed_source_selector: default: set_seed seed_source|seed: default: asdf split: tool_id: split in: input1: random_lines/out_file1 inner_cat: tool_id: cat1 in: input1: split/output in: inner_input: first_cat/out_file1 second_cat: tool_id: cat_list in: input1: nested_workflow/workflow_output """, test_data=""" outer_input: value: 1.bed type: File """, history_id=history_id, wait=True, round_trip_format_conversion=True) self.assertEqual("chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("cat_list") @skip_without_tool("random_lines1") @skip_without_tool("split") def test_recover_mapping_in_subworkflow(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: outer_input: data outputs: outer_output: outputSource: second_cat/out_file1 steps: first_cat: tool_id: cat1 in: input1: outer_input nested_workflow: run: class: GalaxyWorkflow inputs: inner_input: data outputs: workflow_output: outputSource: split/output steps: random_lines: tool_id: random_lines1 state: num_lines: 2 input: $link: inner_input seed_source: seed_source_selector: set_seed seed: asdf split: tool_id: split in: input1: random_lines/out_file1 in: inner_input: first_cat/out_file1 second_cat: tool_id: cat_list in: input1: nested_workflow/workflow_output """, test_data=""" outer_input: value: 1.bed type: File """, history_id=history_id, wait=True, round_trip_format_conversion=True) self.assertEqual("chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("empty_list") @skip_without_tool("count_list") @skip_without_tool("random_lines1") def test_empty_list_mapping(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data outputs: count_list: outputSource: count_list/out_file1 steps: empty_list: tool_id: empty_list in: input1: input1 random_lines: tool_id: random_lines1 state: num_lines: 2 input: $link: empty_list/output seed_source: seed_source_selector: set_seed seed: asdf count_list: tool_id: count_list in: input1: random_lines/out_file1 """, test_data=""" input1: value: 1.bed type: File """, history_id=history_id, wait=True) self.assertEqual("0\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("random_lines1") def test_change_datatype_collection_map_over(self): with self.dataset_populator.test_history() as history_id: jobs_summary = self._run_workflow(""" class: GalaxyWorkflow inputs: text_input1: collection steps: map_over: tool_id: random_lines1 in: input: text_input1 outputs: out_file1: change_datatype: csv """, test_data=""" text_input1: collection_type: "list:paired" """, history_id=history_id) hdca = self.dataset_populator.get_history_collection_details(history_id=jobs_summary.history_id, hid=4) assert hdca['collection_type'] == 'list:paired' assert len(hdca['elements'][0]['object']["elements"]) == 2 forward, reverse = hdca['elements'][0]['object']["elements"] assert forward['object']['file_ext'] == 'csv' assert reverse['object']['file_ext'] == 'csv'
[docs] @skip_without_tool("collection_type_source_map_over") def test_mapping_and_subcollection_mapping(self): with self.dataset_populator.test_history() as history_id: jobs_summary = self._run_workflow(""" class: GalaxyWorkflow inputs: text_input1: collection steps: map_over: tool_id: collection_type_source_map_over in: input_collect: text_input1 """, test_data=""" text_input1: collection_type: "list:paired" """, history_id=history_id) hdca = self.dataset_populator.get_history_collection_details(history_id=jobs_summary.history_id, hid=1) assert hdca['collection_type'] == 'list:paired' assert len(hdca['elements'][0]['object']["elements"]) == 2
[docs] @skip_without_tool("empty_list") @skip_without_tool("count_multi_file") @skip_without_tool("random_lines1") def test_empty_list_reduction(self): with self.dataset_populator.test_history() as history_id: self._run_workflow(""" class: GalaxyWorkflow inputs: input1: data outputs: count_multi_file: outputSource: count_multi_file/out_file1 steps: empty_list: tool_id: empty_list in: input1: input1 random_lines: tool_id: random_lines1 state: num_lines: 2 input: $link: empty_list/output seed_source: seed_source_selector: set_seed seed: asdf count_multi_file: tool_id: count_multi_file in: input1: random_lines/out_file1 """, test_data=""" input1: value: 1.bed type: File """, history_id=history_id, wait=True, round_trip_format_conversion=True) self.assertEqual("0\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("cat") def test_cancel_new_workflow_when_history_deleted(self): with self.dataset_populator.test_history() as history_id: # Invoke a workflow with a pause step. uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id) # There is no pause of anything in here, so likely the invocation is # is still in a new state. If it isn't that is fine, continue with the # test it will just happen to test the same thing as below. # Wait for all the datasets to complete, make sure the workflow invocation # is not complete. self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id) self._delete(f"histories/{history_id}") invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'cancelled') assert invocation_cancelled, "Workflow state is not cancelled..."
[docs] @skip_without_tool("cat") def test_cancel_ready_workflow_when_history_deleted(self): # Same as previous test but make sure invocation isn't a new state before # cancelling. with self.dataset_populator.test_history() as history_id: # Invoke a workflow with a pause step. uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id) # Wait for at least one scheduling step. self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id) # Wait for all the datasets to complete, make sure the workflow invocation # is not complete. self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id) self._delete(f"histories/{history_id}") invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'cancelled') assert invocation_cancelled, "Workflow state is not cancelled..."
[docs] @skip_without_tool("cat") def test_workflow_pause(self): with self.dataset_populator.test_history() as history_id: # Invoke a workflow with a pause step. uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id) # Wait for at least one scheduling step. self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id) # Make sure the history didn't enter a failed state in there. self.dataset_populator.wait_for_history(history_id, assert_ok=True) # Assert the workflow hasn't finished scheduling, we can be pretty sure we # are at the pause step in this case then. self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id) # Review the paused steps to allow the workflow to continue. self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=2, action=True) # Wait for the workflow to finish scheduling and ensure both the invocation # and the history are in valid states. invocation_scheduled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'scheduled') assert invocation_scheduled, "Workflow state is not scheduled..." self.dataset_populator.wait_for_history(history_id, assert_ok=True)
[docs] @skip_without_tool("cat") def test_workflow_pause_cancel(self): with self.dataset_populator.test_history() as history_id: # Invoke a workflow with a pause step. uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id) # Wait for at least one scheduling step. self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id) # Make sure the history didn't enter a failed state in there. self.dataset_populator.wait_for_history(history_id, assert_ok=True) # Assert the workflow hasn't finished scheduling, we can be pretty sure we # are at the pause step in this case then. self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id) # Review the paused workflow and cancel it at the paused step. self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=2, action=False) # Ensure the workflow eventually becomes cancelled. invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'cancelled') assert invocation_cancelled, "Workflow state is not cancelled..."
[docs] @skip_without_tool("head") def test_workflow_map_reduce_pause(self): with self.dataset_populator.test_history() as history_id: workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_map_reduce_pause") uploaded_workflow_id = self.workflow_populator.create_workflow(workflow) hda1 = self.dataset_populator.new_dataset(history_id, content="reviewed\nunreviewed") hdca1 = self.dataset_collection_populator.create_list_in_history(history_id, contents=["1\n2\n3", "4\n5\n6"]).json() index_map = { '0': self._ds_entry(hda1), '1': self._ds_entry(hdca1), } invocation_id = self.__invoke_workflow(uploaded_workflow_id, inputs=index_map, history_id=history_id) # Wait for at least one scheduling step. self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id) # Make sure the history didn't enter a failed state in there. self.dataset_populator.wait_for_history(history_id, assert_ok=True) # Assert the workflow hasn't finished scheduling, we can be pretty sure we # are at the pause step in this case then. self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id) self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=4, action=True) self.workflow_populator.wait_for_invocation_and_jobs(history_id, uploaded_workflow_id, invocation_id) invocation = self._invocation_details(uploaded_workflow_id, invocation_id) assert invocation['state'] == 'scheduled' self.assertEqual("reviewed\n1\nreviewed\n4\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("cat") def test_cancel_workflow_invocation(self): with self.dataset_populator.test_history() as history_id: # Invoke a workflow with a pause step. uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id) # Wait for at least one scheduling step. self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id) # Make sure the history didn't enter a failed state in there. self.dataset_populator.wait_for_history(history_id, assert_ok=True) # Assert the workflow hasn't finished scheduling, we can be pretty sure we # are at the pause step in this case then. self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id) invocation_url = self._api_url(f"workflows/{uploaded_workflow_id}/usage/{invocation_id}", use_key=True) delete_response = delete(invocation_url) self._assert_status_code_is(delete_response, 200) invocation = self._invocation_details(uploaded_workflow_id, invocation_id) assert invocation['state'] == 'cancelled'
[docs] @skip_without_tool("cat") def test_pause_outputs_with_deleted_inputs(self): self._deleted_inputs_workflow(purge=False)
[docs] @skip_without_tool("cat") def test_error_outputs_with_purged_inputs(self): self._deleted_inputs_workflow(purge=True)
def _deleted_inputs_workflow(self, purge): # We run a workflow on a collection with a deleted element. with self.dataset_populator.test_history() as history_id: workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow inputs: input1: type: collection collection_type: list steps: first_cat: tool_id: cat in: input1: input1 second_cat: tool_id: cat in: input1: first_cat/out_file1 """) DELETED = 0 PAUSED_1 = 3 PAUSED_2 = 5 hdca1 = self.dataset_collection_populator.create_list_in_history(history_id, contents=[("sample1-1", "1 2 3")]).json() self.dataset_populator.wait_for_history(history_id, assert_ok=True) deleted_id = hdca1['elements'][DELETED]['object']['id'] r = self._delete(f"histories/{history_id}/contents/{deleted_id}?purge={purge}") label_map = {"input1": self._ds_entry(hdca1)} workflow_request = dict( history=f"hist_id={history_id}", ds_map=self.workflow_populator.build_ds_map(workflow_id, label_map), ) r = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request) self._assert_status_code_is(r, 200) invocation_id = r.json()["id"] # If this starts failing we may have prevented running workflows on collections with deleted members, # in which case we can disable this test. self.workflow_populator.wait_for_invocation_and_jobs(workflow_id, history_id, invocation_id, assert_ok=False) # Why is this sleep needed? -John if not purge: time.sleep(5) contents = self.__history_contents(history_id) assert contents[DELETED]['deleted'] state = 'error' if purge else 'paused' assert contents[PAUSED_1]['state'] == state assert contents[PAUSED_2]['state'] == 'paused'
[docs] def test_run_with_implicit_connection(self): with self.dataset_populator.test_history() as history_id: run_summary = self._run_workflow(""" class: GalaxyWorkflow inputs: test_input: data steps: first_cat: tool_id: cat1 in: input1: test_input the_pause: type: pause in: input: first_cat/out_file1 second_cat: tool_id: cat1 in: input1: the_pause third_cat: tool_id: random_lines1 in: $step: second_cat state: num_lines: 1 input: $link: test_input seed_source: seed_source_selector: set_seed seed: asdf """, test_data={"test_input": "hello world"}, history_id=history_id, wait=False, round_trip_format_conversion=True) history_id = run_summary.history_id workflow_id = run_summary.workflow_id invocation_id = run_summary.invocation_id # Wait for first two jobs to be scheduled - upload and first cat. wait_on(lambda: len(self._history_jobs(history_id)) >= 2 or None, "history jobs") self.dataset_populator.wait_for_history(history_id, assert_ok=True) invocation = self._invocation_details(workflow_id, invocation_id) assert invocation['state'] != 'scheduled', invocation # Expect two jobs - the upload and first cat. randomlines shouldn't run # it is implicitly dependent on second cat. self._assert_history_job_count(history_id, 2) self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True) self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) self._assert_history_job_count(history_id, 4)
[docs] def test_run_with_optional_data_specified_to_multi_data(self): with self.dataset_populator.test_history() as history_id: self._run_workflow(WORKFLOW_OPTIONAL_TRUE_INPUT_DATA, test_data=""" input1: value: 1.bed type: File """, history_id=history_id, wait=True, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) assert "CCDS989.1_cds_0_0_chr1_147962193_r" in content
[docs] def test_run_with_optional_data_unspecified_to_multi_data(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(WORKFLOW_OPTIONAL_TRUE_INPUT_DATA, test_data={}, history_id=history_id, wait=True, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) assert "No input selected" in content
[docs] def test_run_with_non_optional_data_unspecified_fails_invocation(self): with self.dataset_populator.test_history() as history_id: error = self._run_jobs(WORKFLOW_OPTIONAL_FALSE_INPUT_DATA, test_data={}, history_id=history_id, wait=False, assert_ok=False, expected_response=400) self._assert_failed_on_non_optional_input(error, "input1")
[docs] def test_run_with_optional_collection_specified(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION, test_data=""" input1: collection_type: paired name: the_dataset_pair elements: - identifier: forward value: 1.fastq type: File - identifier: reverse value: 1.fastq type: File """, history_id=history_id, wait=True, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) assert "GAATTGATCAGGACATAGGACAACTGTAGGCACCAT" in content
[docs] def test_run_with_optional_collection_unspecified(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION, test_data={}, history_id=history_id, wait=True, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) assert "No input specified." in content
[docs] def test_run_with_non_optional_collection_unspecified_fails_invocation(self): with self.dataset_populator.test_history() as history_id: error = self._run_jobs(WORKFLOW_OPTIONAL_FALSE_INPUT_COLLECTION, test_data={}, history_id=history_id, wait=False, assert_ok=False, expected_response=400) self._assert_failed_on_non_optional_input(error, "input1")
def _assert_failed_on_non_optional_input(self, error, input_name): assert "err_msg" in error err_msg = error["err_msg"] assert input_name in err_msg assert "is not optional and no input" in err_msg
[docs] def test_run_with_validated_parameter_connection_optional(self): with self.dataset_populator.test_history() as history_id: self._run_workflow(""" class: GalaxyWorkflow inputs: text_input: text steps: validation: tool_id: validation_repeat state: r2: - text: $link: text_input """, test_data=""" text_input: value: "abd" type: raw """, history_id=history_id, wait=True, round_trip_format_conversion=True) jobs = self._history_jobs(history_id) assert len(jobs) == 1
[docs] def test_run_with_int_parameter(self): with self.dataset_populator.test_history() as history_id: failed = False try: self._run_jobs(WORKFLOW_PARAMETER_INPUT_INTEGER_REQUIRED, test_data=""" data_input: value: 1.bed type: File """, history_id=history_id, wait=True, assert_ok=True) except AssertionError as e: assert '(int_input) is not optional' in str(e) failed = True assert failed run_response = self._run_workflow(WORKFLOW_PARAMETER_INPUT_INTEGER_REQUIRED, test_data=""" data_input: value: 1.bed type: File int_input: value: 1 type: raw """, history_id=history_id, wait=True, assert_ok=True) # self.dataset_populator.wait_for_history(history_id, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) assert len(content.splitlines()) == 1, content invocation = self.workflow_populator.get_invocation(run_response.invocation_id) assert invocation['input_step_parameters']['int_input']['parameter_value'] == 1 run_response = self._run_workflow(WORKFLOW_PARAMETER_INPUT_INTEGER_OPTIONAL, test_data=""" data_input: value: 1.bed type: File """, history_id=history_id, wait=True, assert_ok=True) invocation = self.workflow_populator.get_invocation(run_response.invocation_id) # Optional step parameter without default value will not be recorded. assert 'int_input' not in invocation['input_step_parameters']
[docs] def test_run_with_int_parameter_nested(self): with self.dataset_populator.test_history() as history_id: workflow = self.workflow_populator.load_workflow_from_resource("test_subworkflow_with_integer_input") workflow_id = self.workflow_populator.create_workflow(workflow) hda: dict = self.dataset_populator.new_dataset(history_id, content="1 2 3") workflow_request = { 'history_id': history_id, 'inputs_by': 'name', 'inputs': json.dumps({ 'input_dataset': {'src': 'hda', 'id': hda['id']}, 'int_parameter': 1, }) } self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request, assert_ok=True)
[docs] def test_run_with_validated_parameter_connection_default_values(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT, test_data=""" data_input: value: 1.bed type: File """, history_id=history_id, wait=True, assert_ok=True) self.dataset_populator.wait_for_history(history_id, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) assert len(content.splitlines()) == 3, content
[docs] def test_run_with_validated_parameter_connection_invalid(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: text_input: text steps: validation: tool_id: validation_repeat state: r2: - text: $link: text_input """, test_data=""" text_input: value: "" type: raw """, history_id=history_id, wait=True, assert_ok=False)
[docs] def test_run_with_text_input_connection(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: data_input: data text_input: text steps: randomlines: tool_id: random_lines1 state: num_lines: 1 input: $link: data_input seed_source: seed_source_selector: set_seed seed: $link: text_input """, test_data=""" data_input: value: 1.bed type: File text_input: value: asdf type: raw """, history_id=history_id) self.dataset_populator.wait_for_history(history_id, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) self.assertEqual("chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", content)
[docs] def test_run_with_numeric_input_connection(self): history_id = self.dataset_populator.new_history() self._run_jobs(""" class: GalaxyWorkflow steps: - label: forty_two tool_id: expression_forty_two state: {} - label: consume_expression_parameter tool_id: cheetah_casting state: floattest: 3.14 inttest: $link: forty_two/out1 test_data: {} """, history_id=history_id) self.dataset_populator.wait_for_history(history_id, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) lines = content.split("\n") assert len(lines) == 4 str_43 = lines[0] str_4point14 = lines[2] assert lines[3] == "" assert int(str_43) == 43 assert abs(float(str_4point14) - 4.14) < .0001
[docs] @skip_without_tool("param_value_from_file") def test_expression_tool_map_over(self): history_id = self.dataset_populator.new_history() self._run_jobs(""" class: GalaxyWorkflow inputs: text_input1: collection steps: - label: param_out tool_id: param_value_from_file in: input1: text_input1 - label: consume_expression_parameter tool_id: validation_default in: input1: param_out/text_param outputs: out_file1: rename: "replaced_param_collection" test_data: text_input1: collection_type: list elements: - identifier: A content: A - identifier: B content: B """, history_id=history_id) history_contents = self._get(f'histories/{history_id}/contents').json() collection = [c for c in history_contents if c['history_content_type'] == 'dataset_collection' and c['name'] == 'replaced_param_collection'][0] collection_details = self._get(collection['url']).json() assert collection_details['element_count'] == 2 elements = collection_details['elements'] assert elements[0]['element_identifier'] == 'A' assert elements[1]['element_identifier'] == 'B' element_a_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=elements[0]['object']) element_b_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=elements[1]['object']) assert element_a_content.strip() == 'A' assert element_b_content.strip() == 'B'
[docs] @skip_without_tool('create_input_collection') def test_workflow_optional_input_text_parameter_reevaluation(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: text_input: type: text optional: true default: '' steps: create_collection: tool_id: create_input_collection nested_workflow: in: inner_input: create_collection/output inner_text_input: text_input run: class: GalaxyWorkflow inputs: inner_input: type: data_collection_input inner_text_input: type: text optional: true default: '' steps: apply: tool_id: __APPLY_RULES__ in: input: inner_input state: rules: rules: - type: add_column_metadata value: identifier0 mapping: - type: list_identifiers columns: [0] echo: cat1: in: input1: apply/output outputs: out_file1: rename: "#{inner_text_input} suffix" """, history_id=history_id)
[docs] @skip_without_tool('cat1') def test_workflow_rerun_with_use_cached_job(self): workflow = self.workflow_populator.load_workflow(name="test_for_run") # We launch a workflow with self.dataset_populator.test_history() as history_id_one, self.dataset_populator.test_history() as history_id_two: workflow_request, _, workflow_id = self._setup_workflow_run(workflow, history_id=history_id_one) invocation_id = self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request, assert_ok=True) invocation_1 = self.workflow_populator.get_invocation(invocation_id) # We copy the workflow inputs to a new history new_workflow_request = workflow_request.copy() new_ds_map = json.loads(new_workflow_request['ds_map']) for key, input_values in invocation_1['inputs'].items(): copy_payload = {"content": input_values['id'], "source": "hda", "type": "dataset"} copy_response = self._post(f"histories/{history_id_two}/contents", data=copy_payload, json=True).json() new_ds_map[key]['id'] = copy_response['id'] new_workflow_request['ds_map'] = json.dumps(new_ds_map, sort_keys=True) new_workflow_request['history'] = f"hist_id={history_id_two}" new_workflow_request['use_cached_job'] = True # We run the workflow again, it should not produce any new outputs new_workflow_response = self.workflow_populator.invoke_workflow_raw(workflow_id, new_workflow_request, assert_ok=True).json() invocation_id = new_workflow_response["id"] self.workflow_populator.wait_for_invocation_and_jobs(history_id_two, workflow_id, invocation_id) # get_history_dataset_details defaults to last item in history, so since we've done # wait_for_invocation_and_jobs - this will be the output of the cat1 job for both histories # (the only job in the loaded workflow). first_wf_output_hda = self.dataset_populator.get_history_dataset_details(history_id=history_id_one) second_wf_output_hda = self.dataset_populator.get_history_dataset_details(history_id=history_id_two) first_wf_output = self._get(f"datasets/{first_wf_output_hda['id']}").json() second_wf_output = self._get(f"datasets/{second_wf_output_hda['id']}").json() assert first_wf_output['file_name'] == second_wf_output['file_name'], \ f"first output:\n{first_wf_output}\nsecond output:\n{second_wf_output}"
[docs] @skip_without_tool('cat1') def test_nested_workflow_rerun_with_use_cached_job(self): with self.dataset_populator.test_history() as history_id_one, self.dataset_populator.test_history() as history_id_two: test_data = """ outer_input: value: 1.bed type: File """ run_jobs_summary = self._run_workflow(WORKFLOW_NESTED_SIMPLE, test_data=test_data, history_id=history_id_one) workflow_id = run_jobs_summary.workflow_id workflow_request = run_jobs_summary.workflow_request # We copy the inputs to a new history and re-run the workflow inputs = json.loads(workflow_request['inputs']) dataset_type = inputs['outer_input']['src'] dataset_id = inputs['outer_input']['id'] copy_payload = {"content": dataset_id, "source": dataset_type, "type": "dataset"} copy_response = self._post(f"histories/{history_id_two}/contents", data=copy_payload, json=True) self._assert_status_code_is(copy_response, 200) new_dataset_id = copy_response.json()['id'] inputs['outer_input']['id'] = new_dataset_id workflow_request['use_cached_job'] = True workflow_request['history'] = f"hist_id={history_id_two}" workflow_request['inputs'] = json.dumps(inputs) self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=run_jobs_summary.workflow_request, assert_ok=True) # Now make sure that the HDAs in each history point to the same dataset instances history_one_contents = self.__history_contents(history_id_one) history_two_contents = self.__history_contents(history_id_two) assert len(history_one_contents) == len(history_two_contents) for i, (item_one, item_two) in enumerate(zip(history_one_contents, history_two_contents)): assert item_one['dataset_id'] == item_two['dataset_id'], \ 'Dataset ids should match, but "%s" and "%s" are not the same for History item %i.' % (item_one['dataset_id'], item_two['dataset_id'], i + 1)
[docs] def test_cannot_run_inaccessible_workflow(self): workflow = self.workflow_populator.load_workflow(name="test_for_run_cannot_access") workflow_request, _, workflow_id = self._setup_workflow_run(workflow) with self._different_user(): run_workflow_response = self._post(f"workflows/{workflow_id}/invocations", data=workflow_request) self._assert_status_code_is(run_workflow_response, 403)
[docs] def test_400_on_invalid_workflow_id(self): workflow = self.workflow_populator.load_workflow(name="test_for_run_does_not_exist") workflow_request, _, _ = self._setup_workflow_run(workflow) run_workflow_response = self._post(f"workflows/{self._random_key()}/invocations", data=workflow_request) self._assert_status_code_is(run_workflow_response, 400)
[docs] def test_cannot_run_against_other_users_history(self): workflow = self.workflow_populator.load_workflow(name="test_for_run_does_not_exist") workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow) with self._different_user(): other_history_id = self.dataset_populator.new_history() workflow_request["history"] = f"hist_id={other_history_id}" run_workflow_response = self._post(f"workflows/{workflow_id}/invocations", data=workflow_request) self._assert_status_code_is(run_workflow_response, 403)
[docs] def test_cannot_run_bootstrap_admin_workflow(self): workflow = self.workflow_populator.load_workflow(name="test_bootstrap_admin_cannot_run") workflow_request, *_ = self._setup_workflow_run(workflow) run_workflow_response = self._post("workflows", data=workflow_request, key=self.master_api_key, json=True) self._assert_status_code_is(run_workflow_response, 400)
[docs] @skip_without_tool("cat") @skip_without_tool("cat_list") def test_workflow_run_with_matching_lists(self): workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_matching_lists") workflow_id = self.workflow_populator.create_workflow(workflow) with self.dataset_populator.test_history() as history_id: hdca1 = self.dataset_collection_populator.create_list_in_history(history_id, contents=[("sample1-1", "1 2 3"), ("sample2-1", "7 8 9")]).json() hdca2 = self.dataset_collection_populator.create_list_in_history(history_id, contents=[("sample1-2", "4 5 6"), ("sample2-2", "0 a b")]).json() self.dataset_populator.wait_for_history(history_id, assert_ok=True) label_map = {"list1": self._ds_entry(hdca1), "list2": self._ds_entry(hdca2)} workflow_request = dict( ds_map=self.workflow_populator.build_ds_map(workflow_id, label_map), ) self.workflow_populator.invoke_workflow_and_wait(workflow_id, history_id=history_id, request=workflow_request, assert_ok=True) self.assertEqual("1 2 3\n4 5 6\n7 8 9\n0 a b\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] def test_workflow_stability(self): # Run this index stability test with following command: # ./run_tests.sh test/api/test_workflows.py:WorkflowsApiTestCase.test_workflow_stability num_tests = 1 for workflow_file in ["test_workflow_topoambigouity", "test_workflow_topoambigouity_auto_laidout"]: workflow = self.workflow_populator.load_workflow_from_resource(workflow_file) last_step_map = self._step_map(workflow) for _ in range(num_tests): uploaded_workflow_id = self.workflow_populator.create_workflow(workflow) downloaded_workflow = self._download_workflow(uploaded_workflow_id) step_map = self._step_map(downloaded_workflow) assert step_map == last_step_map last_step_map = step_map
def _step_map(self, workflow): # Build dict mapping 'tep index to input name. step_map = {} for step_index, step in workflow["steps"].items(): if step["type"] == "data_input": step_map[step_index] = step["inputs"][0]["name"] return step_map
[docs] def test_empty_create(self): response = self._post("workflows") self._assert_status_code_is(response, 400) self._assert_error_code_is(response, error_codes.error_codes_by_name["USER_REQUEST_MISSING_PARAMETER"])
[docs] def test_invalid_create_multiple_types(self): data = { 'shared_workflow_id': '1234567890abcdef', 'from_history_id': '1234567890abcdef' } response = self._post("workflows", data) self._assert_status_code_is(response, 400) self._assert_error_code_is(response, error_codes.error_codes_by_name["USER_REQUEST_INVALID_PARAMETER"])
[docs] @skip_without_tool("cat1") def test_run_with_pja(self): workflow = self.workflow_populator.load_workflow(name="test_for_pja_run", add_pja=True) workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow, inputs_by='step_index') workflow_request["replacement_params"] = dumps(dict(replaceme="was replaced")) run_workflow_response = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request, assert_ok=True) invocation_id = run_workflow_response.json()["id"] self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id, assert_ok=True) content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True) assert content["name"] == "foo was replaced"
[docs] @skip_without_tool("hidden_param") def test_hidden_param_in_workflow(self): with self.dataset_populator.test_history() as history_id: run_object = self._run_workflow(""" class: GalaxyWorkflow steps: step1: tool_id: hidden_param """, test_data={}, history_id=history_id, wait=False) self.workflow_populator.wait_for_invocation_and_jobs(history_id, run_object.workflow_id, run_object.invocation_id) contents = self.__history_contents(history_id) assert len(contents) == 1 okay_dataset = contents[0] assert okay_dataset["state"] == "ok" content = self.dataset_populator.get_history_dataset_content(history_id, hid=1) assert content == '1\n'
[docs] @skip_without_tool("output_filter") def test_optional_workflow_output(self): with self.dataset_populator.test_history() as history_id: run_object = self._run_workflow(""" class: GalaxyWorkflow inputs: [] outputs: wf_output_1: outputSource: output_filter/out_1 steps: output_filter: tool_id: output_filter state: produce_out_1: False filter_text_1: '1' produce_collection: False """, test_data={}, history_id=history_id, wait=False) self.workflow_populator.wait_for_invocation_and_jobs(history_id, run_object.workflow_id, run_object.invocation_id) contents = self.__history_contents(history_id) assert len(contents) == 1 okay_dataset = contents[0] assert okay_dataset["state"] == "ok"
[docs] @skip_without_tool("output_filter_with_input_optional") def test_workflow_optional_input_filtering(self): with self.dataset_populator.test_history() as history_id: test_data = """ input1: collection_type: list elements: - identifier: A content: A """ run_object = self._run_workflow(""" class: GalaxyWorkflow inputs: input1: type: collection collection_type: list outputs: wf_output_1: outputSource: output_filter/out_1 steps: output_filter: tool_id: output_filter_with_input_optional in: input_1: input1 """, test_data=test_data, history_id=history_id, wait=False) self.workflow_populator.wait_for_invocation_and_jobs(history_id, run_object.workflow_id, run_object.invocation_id) contents = self.__history_contents(history_id) assert len(contents) == 4 for content in contents: if content["history_content_type"] == "dataset": assert content["state"] == "ok" else: print(content) assert content["populated_state"] == "ok"
[docs] @skip_without_tool("cat") def test_run_rename_on_mapped_over_collection(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: type: collection collection_type: list steps: first_cat: tool_id: cat in: input1: input1 outputs: out_file1: rename: "my new name" """, test_data=""" input1: collection_type: list name: the_dataset_list elements: - identifier: el1 value: 1.fastq type: File """, history_id=history_id) content = self.dataset_populator.get_history_dataset_details(history_id, hid=4, wait=True, assert_ok=True) name = content["name"] assert name == "my new name", name assert content["history_content_type"] == "dataset" content = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True) name = content["name"] assert content["history_content_type"] == "dataset_collection", content assert name == "my new name", name
[docs] @skip_without_tool("cat") def test_run_rename_based_on_inputs_on_mapped_over_collection(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: type: collection collection_type: list steps: first_cat: tool_id: cat in: input1: input1 outputs: out_file1: rename: "#{input1} suffix" """, test_data=""" input1: collection_type: list name: the_dataset_list elements: - identifier: el1 value: 1.fastq type: File """, history_id=history_id) content = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True) name = content["name"] assert content["history_content_type"] == "dataset_collection", content assert name == "the_dataset_list suffix", name
[docs] @skip_without_tool("collection_creates_pair") def test_run_rename_collection_output(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: - tool_id: collection_creates_pair in: input1: input1 outputs: paired_output: rename: "my new name" """, test_data=""" input1: value: 1.fasta type: File name: fasta1 """, history_id=history_id) details1 = self.dataset_populator.get_history_collection_details(history_id, hid=4, wait=True, assert_ok=True) assert details1['elements'][0]['object']['visible'] is False assert details1["name"] == "my new name", details1 assert details1["history_content_type"] == "dataset_collection"
[docs] @skip_without_tool("__BUILD_LIST__") def test_run_build_list_hide_collection_output(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: - tool_id: __BUILD_LIST__ in: datasets_0|input: input1 state: datasets: - id_cond: id_select: id outputs: output: hide: true """, test_data=""" input1: value: 1.fasta type: File name: fasta1 """, history_id=history_id) details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True) assert details1['elements'][0]['object']['visible'] is False assert details1["name"] == "data 1 (as list)", details1 assert details1["visible"] is False
[docs] @skip_without_tool("__BUILD_LIST__") def test_run_build_list_delete_intermediate_collection_output(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: - tool_id: __BUILD_LIST__ in: datasets_0|input: input1 state: datasets: - id_cond: id_select: id outputs: output: delete_intermediate_datasets: true """, test_data=""" input1: value: 1.fasta type: File name: fasta1 """, history_id=history_id) details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True) assert details1['elements'][0]['object']['visible'] is False assert details1["name"] == "data 1 (as list)", details1 # FIXME: this doesn't work because the workflow is still being scheduled # TODO: Implement a way to run PJAs that couldn't be run during/after the job # after the workflow has run to completion assert details1["deleted"] is False
[docs] @skip_without_tool("__BUILD_LIST__") def test_run_build_list_change_datatype_collection_output(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: - tool_id: __BUILD_LIST__ in: datasets_0|input: input1 state: datasets: - id_cond: id_select: idx outputs: output: change_datatype: txt - tool_id: __BUILD_LIST__ in: datasets_0|input: input1 state: datasets: - id_cond: id_select: idx """, test_data=""" input1: value: 1.fasta type: File file_type: fasta name: fasta1 """, history_id=history_id) details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True) assert details1["name"] == "data 1 (as list)", details1 assert details1['elements'][0]['object']['visible'] is False assert details1['elements'][0]['object']['file_ext'] == 'txt' details2 = self.dataset_populator.get_history_collection_details(history_id, hid=5, wait=True, assert_ok=True) # Also check that we don't overwrite the original HDA's datatype assert details2['elements'][0]['object']['file_ext'] == 'fasta'
[docs] @skip_without_tool("__BUILD_LIST__") def test_run_build_list_rename_collection_output(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: - tool_id: __BUILD_LIST__ in: datasets_0|input: input1 state: datasets: - id_cond: id_select: idx outputs: output: rename: "my new name" """, test_data=""" input1: value: 1.fasta type: File name: fasta1 """, history_id=history_id) details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True) assert details1['elements'][0]['object']['visible'] is False assert details1["name"] == "my new name", details1 assert details1["history_content_type"] == "dataset_collection"
[docs] @skip_without_tool("create_2") def test_run_rename_multiple_outputs(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: [] steps: create_2: tool_id: create_2 state: sleep_time: 0 outputs: out_file1: rename: "my new name" out_file2: rename: "my other new name" """, test_data={}, history_id=history_id) details1 = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=True) details2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2) assert details1["name"] == "my new name" assert details2["name"] == "my other new name"
[docs] @skip_without_tool("cat") def test_run_rename_based_on_input(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(WORKFLOW_RENAME_ON_INPUT, history_id=history_id) content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True) name = content["name"] assert name == "fasta1 suffix", name
[docs] @skip_without_tool("fail_identifier") @skip_without_tool("cat") def test_run_rename_when_resuming_jobs(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: first_fail: tool_id: fail_identifier state: failbool: true input1: $link: input1 outputs: out_file1: rename: "cat1 out" cat: tool_id: cat in: input1: first_fail/out_file1 outputs: out_file1: rename: "#{input1} suffix" """, test_data=""" input1: value: 1.fasta type: File name: fail """, history_id=history_id, wait=True, assert_ok=False) content = self.dataset_populator.get_history_dataset_details(history_id, hid=2, wait=True, assert_ok=False) name = content["name"] assert content['state'] == 'error', content input1 = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=False) job_id = content['creating_job'] inputs = {"input1": {'values': [{'src': 'hda', 'id': input1['id']}] }, "failbool": "false", "rerun_remap_job_id": job_id} self.dataset_populator.run_tool( tool_id='fail_identifier', inputs=inputs, history_id=history_id, ) unpaused_dataset = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=False) assert unpaused_dataset['state'] == 'ok' assert unpaused_dataset['name'] == f"{name} suffix"
[docs] @skip_without_tool("cat") def test_run_rename_based_on_input_recursive(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: first_cat: tool_id: cat in: input1: input1 outputs: out_file1: rename: "#{input1} #{input1 | upper} suffix" """, test_data=""" input1: value: 1.fasta type: File name: '#{input1}' """, history_id=history_id) content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True) name = content["name"] assert name == "#{input1} #{INPUT1} suffix", name
[docs] @skip_without_tool("cat") def test_run_rename_based_on_input_repeat(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data input2: data steps: first_cat: tool_id: cat state: input1: $link: input1 queries: - input2: $link: input2 outputs: out_file1: rename: "#{queries_0.input2| basename} suffix" """, test_data=""" input1: value: 1.fasta type: File name: fasta1 input2: value: 1.fasta type: File name: fasta2 """, history_id=history_id) content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True) name = content["name"] assert name == "fasta2 suffix", name
[docs] @skip_without_tool("mapper2") def test_run_rename_based_on_input_conditional(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: fasta_input: data fastq_input: data steps: mapping: tool_id: mapper2 state: fastq_input: fastq_input_selector: single fastq_input1: $link: fastq_input reference: $link: fasta_input outputs: out_file1: # Wish it was qualified for conditionals but it doesn't seem to be. -John # rename: "#{fastq_input.fastq_input1 | basename} suffix" rename: "#{fastq_input1 | basename} suffix" """, test_data=""" fasta_input: value: 1.fasta type: File name: fasta1 file_type: fasta fastq_input: value: 1.fastqsanger type: File name: fastq1 file_type: fastqsanger """, history_id=history_id) content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True) name = content["name"] assert name == "fastq1 suffix", name
[docs] @skip_without_tool("mapper2") def test_run_rename_based_on_input_collection(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: fasta_input: data fastq_inputs: data steps: mapping: tool_id: mapper2 state: fastq_input: fastq_input_selector: paired_collection fastq_input1: $link: fastq_inputs reference: $link: fasta_input outputs: out_file1: # Wish it was qualified for conditionals but it doesn't seem to be. -John # rename: "#{fastq_input.fastq_input1 | basename} suffix" rename: "#{fastq_input1} suffix" """, test_data=""" fasta_input: value: 1.fasta type: File name: fasta1 file_type: fasta fastq_inputs: collection_type: list name: the_dataset_pair elements: - identifier: forward value: 1.fastq type: File - identifier: reverse value: 1.fastq type: File """, history_id=history_id) content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True) name = content["name"] assert name == "the_dataset_pair suffix", name
[docs] @skip_without_tool("collection_creates_pair") def test_run_hide_on_collection_output(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: create_pair: tool_id: collection_creates_pair state: input1: $link: input1 outputs: paired_output: hide: true """, test_data=""" input1: value: 1.fasta type: File name: fasta1 """, history_id=history_id) details1 = self.dataset_populator.get_history_collection_details(history_id, hid=4, wait=True, assert_ok=True) assert details1["history_content_type"] == "dataset_collection" assert not details1["visible"], details1
[docs] @skip_without_tool("cat") def test_run_hide_on_mapped_over_collection(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: - id: input1 type: data_collection_input collection_type: list steps: first_cat: tool_id: cat in: input1: input1 outputs: out_file1: hide: true """, test_data=""" input1: collection_type: list name: the_dataset_list elements: - identifier: el1 value: 1.fastq type: File """, history_id=history_id) content = self.dataset_populator.get_history_dataset_details(history_id, hid=4, wait=True, assert_ok=True) assert content["history_content_type"] == "dataset" assert not content["visible"] content = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True) assert content["history_content_type"] == "dataset_collection", content assert not content["visible"]
[docs] @skip_without_tool("cat") def test_tag_auto_propagation(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: first_cat: tool_id: cat in: input1: input1 outputs: out_file1: add_tags: - "name:treated1fb" - "group:condition:treated" - "group:type:single-read" - "machine:illumina" second_cat: tool_id: cat in: input1: first_cat/out_file1 """, test_data=""" input1: value: 1.fasta type: File name: fasta1 """, history_id=history_id, round_trip_format_conversion=True) details0 = self.dataset_populator.get_history_dataset_details(history_id, hid=2, wait=True, assert_ok=True) tags = details0["tags"] assert len(tags) == 4, details0 assert "name:treated1fb" in tags, tags assert "group:condition:treated" in tags, tags assert "group:type:single-read" in tags, tags assert "machine:illumina" in tags, tags details1 = self.dataset_populator.get_history_dataset_details(history_id, hid=3, wait=True, assert_ok=True) tags = details1["tags"] assert len(tags) == 1, details1 assert "name:treated1fb" in tags, tags
[docs] @skip_without_tool("collection_creates_pair") def test_run_add_tag_on_collection_output(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: create_pair: tool_id: collection_creates_pair in: input1: input1 outputs: paired_output: add_tags: - "name:foo" """, test_data=""" input1: value: 1.fasta type: File name: fasta1 """, history_id=history_id, round_trip_format_conversion=True) details1 = self.dataset_populator.get_history_collection_details(history_id, hid=4, wait=True, assert_ok=True) assert details1["history_content_type"] == "dataset_collection" assert details1["tags"][0] == "name:foo", details1
[docs] @skip_without_tool("cat") def test_run_add_tag_on_mapped_over_collection(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: type: collection collection_type: list steps: first_cat: tool_id: cat in: input1: input1 outputs: out_file1: add_tags: - "name:foo" """, test_data=""" input1: collection_type: list name: the_dataset_list elements: - identifier: el1 value: 1.fastq type: File """, history_id=history_id, round_trip_format_conversion=True) details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True) assert details1["history_content_type"] == "dataset_collection" assert details1["tags"][0] == "name:foo", details1
[docs] @skip_without_tool("collection_creates_pair") @skip_without_tool("cat") def test_run_remove_tag_on_collection_output(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data steps: first_cat: tool_id: cat in: input1: input1 outputs: out_file1: add_tags: - "name:foo" create_pair: tool_id: collection_creates_pair in: input1: first_cat/out_file1 outputs: paired_output: remove_tags: - "name:foo" """, test_data=""" input1: value: 1.fasta type: File name: fasta1 """, history_id=history_id, round_trip_format_conversion=True) details_dataset_with_tag = self.dataset_populator.get_history_dataset_details(history_id, hid=2, wait=True, assert_ok=True) assert details_dataset_with_tag["history_content_type"] == "dataset", details_dataset_with_tag assert details_dataset_with_tag["tags"][0] == "name:foo", details_dataset_with_tag details_collection_without_tag = self.dataset_populator.get_history_collection_details(history_id, hid=5, wait=True, assert_ok=True) assert details_collection_without_tag["history_content_type"] == "dataset_collection", details_collection_without_tag assert len(details_collection_without_tag["tags"]) == 0, details_collection_without_tag
[docs] @skip_without_tool("cat1") def test_run_with_runtime_pja(self): workflow = self.workflow_populator.load_workflow(name="test_for_pja_runtime") uuid0, uuid1, uuid2 = str(uuid4()), str(uuid4()), str(uuid4()) workflow["steps"]["0"]["uuid"] = uuid0 workflow["steps"]["1"]["uuid"] = uuid1 workflow["steps"]["2"]["uuid"] = uuid2 workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow, inputs_by='step_index') workflow_request["replacement_params"] = dumps(dict(replaceme="was replaced")) pja_map = { "RenameDatasetActionout_file1": dict( action_type="RenameDatasetAction", output_name="out_file1", action_arguments=dict(newname="foo ${replaceme}"), ) } workflow_request["parameters"] = dumps({ uuid2: {"__POST_JOB_ACTIONS__": pja_map} }) self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request, assert_ok=True) content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True) assert content["name"] == "foo was replaced", content["name"] # Test for regression of previous behavior where runtime post job actions # would be added to the original workflow post job actions. downloaded_workflow = self._download_workflow(workflow_id) pjas = list(downloaded_workflow["steps"]["2"]["post_job_actions"].values()) assert len(pjas) == 0, len(pjas)
[docs] @skip_without_tool("cat1") def test_run_with_delayed_runtime_pja(self): workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow inputs: test_input: data steps: first_cat: tool_id: cat1 in: input1: test_input the_pause: type: pause in: input: first_cat/out_file1 second_cat: tool_id: cat1 in: input1: the_pause """, round_trip_format_conversion=True) downloaded_workflow = self._download_workflow(workflow_id) uuid_dict = {int(index): step["uuid"] for index, step in downloaded_workflow["steps"].items()} with self.dataset_populator.test_history() as history_id: hda = self.dataset_populator.new_dataset(history_id, content="1 2 3") self.dataset_populator.wait_for_history(history_id) inputs = { '0': self._ds_entry(hda), } uuid2 = uuid_dict[3] workflow_request = {} workflow_request["replacement_params"] = dumps(dict(replaceme="was replaced")) pja_map = { "RenameDatasetActionout_file1": dict( action_type="RenameDatasetAction", output_name="out_file1", action_arguments=dict(newname="foo ${replaceme}"), ) } workflow_request["parameters"] = dumps({ uuid2: {"__POST_JOB_ACTIONS__": pja_map} }) invocation_id = self.__invoke_workflow(workflow_id, inputs=inputs, request=workflow_request, history_id=history_id) time.sleep(2) self.dataset_populator.wait_for_history(history_id) self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True) self.workflow_populator.wait_for_workflow(workflow_id, invocation_id, history_id) time.sleep(1) content = self.dataset_populator.get_history_dataset_details(history_id) assert content["name"] == "foo was replaced", content["name"]
[docs] @skip_without_tool("cat1") def test_delete_intermediate_datasets_pja_1(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data outputs: wf_output_1: outputSource: third_cat/out_file1 steps: first_cat: tool_id: cat1 in: input1: input1 second_cat: tool_id: cat1 in: input1: first_cat/out_file1 third_cat: tool_id: cat1 in: input1: second_cat/out_file1 outputs: out_file1: delete_intermediate_datasets: true """, test_data={"input1": "hello world"}, history_id=history_id) hda1 = self.dataset_populator.get_history_dataset_details(history_id, hid=1) hda2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2) hda3 = self.dataset_populator.get_history_dataset_details(history_id, hid=3) hda4 = self.dataset_populator.get_history_dataset_details(history_id, hid=4) assert not hda1["deleted"] assert hda2["deleted"] # I think hda3 should be deleted, but the inputs to # steps with workflow outputs are not deleted. # assert hda3["deleted"] print(hda3["deleted"]) assert not hda4["deleted"]
[docs] @skip_without_tool("cat1") def test_validated_post_job_action_validated(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data outputs: wf_output_1: outputSource: first_cat/out_file1 steps: first_cat: tool_id: cat1 in: input1: input1 post_job_actions: ValidateOutputsAction: action_type: ValidateOutputsAction """, test_data={"input1": {"type": "File", "file_type": "fastqsanger", "value": "1.fastqsanger"}}, history_id=history_id) hda2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2) assert hda2["validated_state"] == "ok"
[docs] @skip_without_tool("cat1") def test_validated_post_job_action_unvalidated_default(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(WORKFLOW_SIMPLE, test_data={"input1": {"type": "File", "file_type": "fastqsanger", "value": "1.fastqsanger"}}, history_id=history_id) hda2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2) assert hda2["validated_state"] == "unknown"
[docs] @skip_without_tool("cat1") def test_validated_post_job_action_invalid(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow inputs: input1: data outputs: wf_output_1: outputSource: first_cat/out_file1 steps: first_cat: tool_id: cat1 in: input1: input1 post_job_actions: ValidateOutputsAction: action_type: ValidateOutputsAction """, test_data={"input1": {"type": "File", "file_type": "fastqcssanger", "value": "1.fastqsanger"}}, history_id=history_id) hda2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2) assert hda2["validated_state"] == "invalid"
[docs] def test_value_restriction_with_select_and_text_param(self): workflow_id = self.workflow_populator.upload_yaml_workflow(""" class: GalaxyWorkflow inputs: select_text: type: text restrictOnConnections: true steps: select: tool_id: multi_select in: select_ex: select_text tool_with_text_input: tool_id: param_text_option in: text_param: select_text """) with self.dataset_populator.test_history() as history_id: run_workflow = self._download_workflow(workflow_id, style='run', history_id=history_id) options = run_workflow['steps'][0]['inputs'][0]['options'] assert len(options) == 5 assert options[0] == ['Ex1', '--ex1', False]
[docs] @skip_without_tool("random_lines1") def test_run_replace_params_by_tool(self): workflow_request, history_id, workflow_id = self._setup_random_x2_workflow("test_for_replace_tool_params") workflow_request["parameters"] = dumps(dict(random_lines1=dict(num_lines=5))) self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request, assert_ok=True) # Would be 8 and 6 without modification self.__assert_lines_hid_line_count_is(history_id, 2, 5) self.__assert_lines_hid_line_count_is(history_id, 3, 5)
[docs] @skip_without_tool("random_lines1") def test_run_replace_params_by_uuid(self): workflow_request, history_id, workflow_id = self._setup_random_x2_workflow("test_for_replace_") workflow_request["parameters"] = dumps({ "58dffcc9-bcb7-4117-a0e1-61513524b3b1": dict(num_lines=4), "58dffcc9-bcb7-4117-a0e1-61513524b3b2": dict(num_lines=3), }) self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request, assert_ok=True) # Would be 8 and 6 without modification self.__assert_lines_hid_line_count_is(history_id, 2, 4) self.__assert_lines_hid_line_count_is(history_id, 3, 3)
[docs] @skip_without_tool("cat1") @skip_without_tool("addValue") def test_run_batch(self): workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_batch") workflow_id = self.workflow_populator.create_workflow(workflow) with self.dataset_populator.test_history() as history_id: hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3") hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6") hda3 = self.dataset_populator.new_dataset(history_id, content="7 8 9") hda4 = self.dataset_populator.new_dataset(history_id, content="10 11 12") parameters = { "0": {"input": {"batch": True, "values": [{"id": hda1.get("id"), "hid": hda1.get("hid"), "src": "hda"}, {"id": hda2.get("id"), "hid": hda2.get("hid"), "src": "hda"}, {"id": hda3.get("id"), "hid": hda2.get("hid"), "src": "hda"}, {"id": hda4.get("id"), "hid": hda2.get("hid"), "src": "hda"}]}}, "1": {"input": {"batch": False, "values": [{"id": hda1.get("id"), "hid": hda1.get("hid"), "src": "hda"}]}, "exp": "2"}} workflow_request = { "history_id": history_id, "batch": True, "parameters_normalized": True, "parameters": dumps(parameters), } invocation_response = self._post(f"workflows/{workflow_id}/usage", data=workflow_request) self._assert_status_code_is(invocation_response, 200) time.sleep(5) self.dataset_populator.wait_for_history(history_id, assert_ok=True) r1 = "1 2 3\t1\n1 2 3\t2\n" r2 = "4 5 6\t1\n1 2 3\t2\n" r3 = "7 8 9\t1\n1 2 3\t2\n" r4 = "10 11 12\t1\n1 2 3\t2\n" t1 = self.dataset_populator.get_history_dataset_content(history_id, hid=7) t2 = self.dataset_populator.get_history_dataset_content(history_id, hid=10) t3 = self.dataset_populator.get_history_dataset_content(history_id, hid=13) t4 = self.dataset_populator.get_history_dataset_content(history_id, hid=16) self.assertEqual(r1, t1) self.assertEqual(r2, t2) self.assertEqual(r3, t3) self.assertEqual(r4, t4)
[docs] @skip_without_tool("cat1") @skip_without_tool("addValue") def test_run_batch_inputs(self): workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_batch") workflow_id = self.workflow_populator.create_workflow(workflow) with self.dataset_populator.test_history() as history_id: hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3") hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6") hda3 = self.dataset_populator.new_dataset(history_id, content="7 8 9") hda4 = self.dataset_populator.new_dataset(history_id, content="10 11 12") inputs = { "coolinput": {"batch": True, "values": [{"id": hda1.get("id"), "hid": hda1.get("hid"), "src": "hda"}, {"id": hda2.get("id"), "hid": hda2.get("hid"), "src": "hda"}, {"id": hda3.get("id"), "hid": hda2.get("hid"), "src": "hda"}, {"id": hda4.get("id"), "hid": hda2.get("hid"), "src": "hda"}]} } parameters = { "1": {"input": {"batch": False, "values": [{"id": hda1.get("id"), "hid": hda1.get("hid"), "src": "hda"}]}, "exp": "2"} } workflow_request = { "history_id": history_id, "batch": True, "inputs": dumps(inputs), "inputs_by": "name", "parameters_normalized": True, "parameters": dumps(parameters), } invocation_response = self._post(f"workflows/{workflow_id}/usage", data=workflow_request) self._assert_status_code_is(invocation_response, 200) time.sleep(5) self.dataset_populator.wait_for_history(history_id, assert_ok=True) r1 = "1 2 3\t1\n1 2 3\t2\n" r2 = "4 5 6\t1\n1 2 3\t2\n" r3 = "7 8 9\t1\n1 2 3\t2\n" r4 = "10 11 12\t1\n1 2 3\t2\n" t1 = self.dataset_populator.get_history_dataset_content(history_id, hid=7) t2 = self.dataset_populator.get_history_dataset_content(history_id, hid=10) t3 = self.dataset_populator.get_history_dataset_content(history_id, hid=13) t4 = self.dataset_populator.get_history_dataset_content(history_id, hid=16) self.assertEqual(r1, t1) self.assertEqual(r2, t2) self.assertEqual(r3, t3) self.assertEqual(r4, t4)
[docs] @skip_without_tool("validation_default") def test_parameter_substitution_sanitization(self): substitions = dict(input1="\" ; echo \"moo") run_workflow_response, history_id = self._run_validation_workflow_with_substitions(substitions) self.dataset_populator.wait_for_history(history_id, assert_ok=True) self.assertEqual("__dq__ X echo __dq__moo\n", self.dataset_populator.get_history_dataset_content(history_id, hid=1))
[docs] @skip_without_tool("validation_repeat") def test_parameter_substitution_validation_value_errors_0(self): with self.dataset_populator.test_history() as history_id: workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow steps: validation: tool_id: validation_repeat state: r2: - text: "abd" """) workflow_request = dict( history=f"hist_id={history_id}", parameters=dumps(dict(validation_repeat={"r2_0|text": ""})) ) url = f"workflows/{workflow_id}/invocations" invocation_response = self._post(url, data=workflow_request) # Take a valid stat and make it invalid, assert workflow won't run. self._assert_status_code_is(invocation_response, 400)
[docs] @skip_without_tool("validation_default") def test_parameter_substitution_validation_value_errors_1(self): substitions = dict(select_param="\" ; echo \"moo") run_workflow_response, history_id = self._run_validation_workflow_with_substitions(substitions) self._assert_status_code_is(run_workflow_response, 400)
[docs] @skip_without_tool("validation_repeat") def test_workflow_import_state_validation_1(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(""" class: GalaxyWorkflow steps: validation: tool_id: validation_repeat state: r2: - text: "" """, history_id=history_id, wait=False, expected_response=400, assert_ok=False)
def _run_validation_workflow_with_substitions(self, substitions): workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_validation_1") uploaded_workflow_id = self.workflow_populator.create_workflow(workflow) history_id = self.dataset_populator.new_history() workflow_request = dict( history=f"hist_id={history_id}", workflow_id=uploaded_workflow_id, parameters=dumps(dict(validation_default=substitions)) ) run_workflow_response = self.workflow_populator.invoke_workflow_raw(uploaded_workflow_id, workflow_request) return run_workflow_response, history_id
[docs] @skip_without_tool("random_lines1") def test_run_replace_params_by_steps(self): workflow_request, history_id, workflow_id, steps = self._setup_random_x2_workflow_steps("test_for_replace_step_params") params = dumps({str(steps[1]["id"]): dict(num_lines=5)}) workflow_request["parameters"] = params self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request, assert_ok=True) # Would be 8 and 6 without modification self.__assert_lines_hid_line_count_is(history_id, 2, 8) self.__assert_lines_hid_line_count_is(history_id, 3, 5)
[docs] @skip_without_tool("random_lines1") def test_run_replace_params_nested(self): workflow_request, history_id, workflow_id, steps = self._setup_random_x2_workflow_steps("test_for_replace_step_params_nested") seed_source = dict( seed_source_selector="set_seed", seed="moo", ) params = dumps({str(steps[0]["id"]): dict(num_lines=1, seed_source=seed_source), str(steps[1]["id"]): dict(num_lines=1, seed_source=seed_source)}) workflow_request["parameters"] = params self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request, assert_ok=True) self.assertEqual("2\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("random_lines1") def test_run_replace_params_nested_normalized(self): workflow_request, history_id, workflow_id, steps = self._setup_random_x2_workflow_steps("test_for_replace_step_normalized_params_nested") parameters = { "num_lines": 1, "seed_source|seed_source_selector": "set_seed", "seed_source|seed": "moo", } params = dumps({str(steps[0]["id"]): parameters, str(steps[1]["id"]): parameters}) workflow_request["parameters"] = params workflow_request["parameters_normalized"] = False self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request, assert_ok=True) self.assertEqual("2\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("random_lines1") def test_run_replace_params_over_default(self): with self.dataset_populator.test_history() as history_id: self._run_jobs(WORKFLOW_ONE_STEP_DEFAULT, test_data=""" step_parameters: '1': num_lines: 4 input: value: 1.bed type: File """, history_id=history_id, wait=True, assert_ok=True, round_trip_format_conversion=True) result = self.dataset_populator.get_history_dataset_content(history_id) assert result.count("\n") == 4
[docs] @skip_without_tool("random_lines1") def test_defaults_editor(self): workflow_id = self._upload_yaml_workflow(WORKFLOW_ONE_STEP_DEFAULT, publish=True) workflow_object = self._download_workflow(workflow_id, style="editor") put_response = self._update_workflow(workflow_id, workflow_object) assert put_response.status_code == 200
[docs] @skip_without_tool("random_lines1") def test_run_replace_params_over_default_delayed(self): with self.dataset_populator.test_history() as history_id: run_summary = self._run_workflow(""" class: GalaxyWorkflow inputs: input: data steps: first_cat: tool_id: cat1 in: input1: input the_pause: type: pause in: input: first_cat/out_file1 randomlines: tool_id: random_lines1 in: input: the_pause num_lines: default: 6 """, test_data=""" step_parameters: '3': num_lines: 4 input: value: 1.bed type: File """, history_id=history_id, wait=False) wait_on(lambda: len(self._history_jobs(history_id)) >= 2 or None, "history jobs") self.dataset_populator.wait_for_history(history_id, assert_ok=True) workflow_id = run_summary.workflow_id invocation_id = run_summary.invocation_id self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True) self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) result = self.dataset_populator.get_history_dataset_content(history_id) assert result.count("\n") == 4
[docs] def test_pja_import_export(self): workflow = self.workflow_populator.load_workflow(name="test_for_pja_import", add_pja=True) uploaded_workflow_id = self.workflow_populator.create_workflow(workflow) downloaded_workflow = self._download_workflow(uploaded_workflow_id) self._assert_has_keys(downloaded_workflow["steps"], "0", "1", "2") pjas = list(downloaded_workflow["steps"]["2"]["post_job_actions"].values()) assert len(pjas) == 1, len(pjas) pja = pjas[0] self._assert_has_keys(pja, "action_type", "output_name", "action_arguments")
[docs] def test_invocation_filtering(self): with self._different_user(email=f"{uuid4()}@test.com"): # new user, start with no invocations assert not self._assert_invocation_for_url_is('invocations') self._run_jobs(""" class: GalaxyWorkflow inputs: input: type: data optional: true steps: [] """, wait=False) first_invocation = self._assert_invocation_for_url_is('invocations') new_history_id = self.dataset_populator.new_history() # new history has no invocations assert not self._assert_invocation_for_url_is(f"invocations?history_id={new_history_id}") self._run_jobs(""" class: GalaxyWorkflow inputs: input: type: data optional: true steps: [] """, history_id=new_history_id, wait=False) # new history has one invocation now new_invocation = self._assert_invocation_for_url_is(f"invocations?history_id={new_history_id}") # filter invocation by workflow instance id self._assert_invocation_for_url_is(f"invocations?workflow_id={first_invocation['workflow_id']}&instance=true", first_invocation) # limit to 1, newest invocation first by default self._assert_invocation_for_url_is("invocations?limit=1", target_invocation=new_invocation) # limit to 1, descending sort on date self._assert_invocation_for_url_is("invocations?limit=1&sort_by=create_time&sort_desc=true", target_invocation=new_invocation) # limit to 1, ascending sort on date self._assert_invocation_for_url_is("invocations?limit=1&sort_by=create_time&sort_desc=false", target_invocation=first_invocation) # limit to 1, ascending sort on date, offset 1 self._assert_invocation_for_url_is("invocations?limit=1&sort_by=create_time&sort_desc=false&offset=1", target_invocation=new_invocation)
def _assert_invocation_for_url_is(self, route, target_invocation=None): response = self._get(route) self._assert_status_code_is(response, 200) invocations = response.json() if target_invocation: assert len(invocations) == 1 assert invocations[0]['id'] == target_invocation['id'] if invocations: assert len(invocations) == 1 return invocations[0]
[docs] @skip_without_tool("cat1") def test_only_own_invocations_indexed_and_accessible(self): workflow_id, usage = self._run_workflow_once_get_invocation("test_usage_accessiblity") with self._different_user(): usage_details_response = self._get(f"workflows/{workflow_id}/usage/{usage['id']}") self._assert_status_code_is(usage_details_response, 403) index_response = self._get(f"workflows/{workflow_id}/invocations") self._assert_status_code_is(index_response, 200) assert len(index_response.json()) == 0 invocation_ids = self._all_user_invocation_ids() assert usage["id"] in invocation_ids with self._different_user(): invocation_ids = self._all_user_invocation_ids() assert usage["id"] not in invocation_ids
[docs] @skip_without_tool("cat1") def test_invocation_usage(self): workflow_id, usage = self._run_workflow_once_get_invocation("test_usage") invocation_id = usage["id"] usage_details = self._invocation_details(workflow_id, invocation_id) # Assert some high-level things about the structure of data returned. self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id", "history_id") # Check invocations for this workflow invocation by history and regardless of history. history_invocations_response = self._get("invocations", {"history_id": usage_details["history_id"]}) self._assert_status_code_is(history_invocations_response, 200) assert len(history_invocations_response.json()) == 1 assert history_invocations_response.json()[0]["id"] == invocation_id # Check history invocations for this workflow invocation. invocation_ids = self._all_user_invocation_ids() assert invocation_id in invocation_ids # Wait for the invocation to be fully scheduled, so we have details on all steps. self._wait_for_invocation_state(workflow_id, invocation_id, 'scheduled') usage_details = self._invocation_details(workflow_id, invocation_id) invocation_steps = usage_details["steps"] invocation_input_step, invocation_tool_step = {}, {} for invocation_step in invocation_steps: self._assert_has_keys(invocation_step, "workflow_step_id", "order_index", "id") order_index = invocation_step["order_index"] assert order_index in [0, 1, 2], order_index if order_index == 0: invocation_input_step = invocation_step elif order_index == 2: invocation_tool_step = invocation_step # Tool steps have non-null job_ids (deprecated though they may be) assert invocation_input_step.get("job_id", None) is None job_id = invocation_tool_step.get("job_id", None) assert job_id is not None invocation_tool_step_id = invocation_tool_step["id"] invocation_tool_step_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}/steps/{invocation_tool_step_id}") self._assert_status_code_is(invocation_tool_step_response, 200) self._assert_has_keys(invocation_tool_step_response.json(), "id", "order_index", "job_id") assert invocation_tool_step_response.json()["job_id"] == job_id
[docs] def test_invocation_with_collection_mapping(self): workflow_id, invocation_id = self._run_mapping_workflow() usage_details = self._invocation_details(workflow_id, invocation_id) # Assert some high-level things about the structure of data returned. self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id") invocation_steps = usage_details["steps"] invocation_input_step, invocation_tool_step = None, None for invocation_step in invocation_steps: self._assert_has_keys(invocation_step, "workflow_step_id", "order_index", "id") order_index = invocation_step["order_index"] assert order_index in [0, 1] if invocation_step["order_index"] == 0: assert invocation_input_step is None invocation_input_step = invocation_step else: assert invocation_tool_step is None invocation_tool_step = invocation_step assert invocation_input_step assert invocation_tool_step # Tool steps have non-null job_ids (deprecated though they may be) assert invocation_input_step.get("job_id", None) is None assert invocation_tool_step.get("job_id", None) is None assert invocation_tool_step["state"] == "scheduled" usage_details = self._invocation_details(workflow_id, invocation_id, legacy_job_state="true") # Assert some high-level things about the structure of data returned. self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id") invocation_steps = usage_details["steps"] invocation_input_step = None invocation_tool_steps = [] for invocation_step in invocation_steps: self._assert_has_keys(invocation_step, "workflow_step_id", "order_index", "id") order_index = invocation_step["order_index"] assert order_index in [0, 1] if invocation_step["order_index"] == 0: assert invocation_input_step is None invocation_input_step = invocation_step else: invocation_tool_steps.append(invocation_step) assert len(invocation_tool_steps) == 2 assert invocation_tool_steps[0]["state"] == "ok"
def _run_mapping_workflow(self): history_id = self.dataset_populator.new_history() summary = self._run_workflow(""" class: GalaxyWorkflow inputs: input_c: collection steps: cat1: tool_id: cat1 in: input1: input_c """, test_data=""" input_c: collection_type: list elements: - identifier: i1 content: "0" - identifier: i2 content: "1" """, history_id=history_id, wait=True, assert_ok=True) workflow_id = summary.workflow_id invocation_id = summary.invocation_id return workflow_id, invocation_id
[docs] @skip_without_tool("cat1") def test_invocations_accessible_imported_workflow(self): workflow_id = self.workflow_populator.simple_workflow("test_usage", publish=True) with self._different_user(): other_import_response = self.__import_workflow(workflow_id) self._assert_status_code_is(other_import_response, 200) other_id = other_import_response.json()["id"] workflow_request, history_id, _ = self._setup_workflow_run(workflow_id=other_id) response = self._get(f"workflows/{other_id}/usage") self._assert_status_code_is(response, 200) assert len(response.json()) == 0 run_workflow_response = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request, assert_ok=True) run_workflow_dict = run_workflow_response.json() invocation_id = run_workflow_dict['id'] usage_details_response = self._get(f"workflows/{other_id}/usage/{invocation_id}") self._assert_status_code_is(usage_details_response, 200)
[docs] @skip_without_tool("cat1") def test_invocations_accessible_published_workflow(self): workflow_id = self.workflow_populator.simple_workflow("test_usage", publish=True) with self._different_user(): workflow_request, history_id, _ = self._setup_workflow_run(workflow_id=workflow_id) response = self._get(f"workflows/{workflow_id}/usage") self._assert_status_code_is(response, 200) assert len(response.json()) == 0 run_workflow_response = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request, assert_ok=True) run_workflow_dict = run_workflow_response.json() invocation_id = run_workflow_dict['id'] usage_details_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}") self._assert_status_code_is(usage_details_response, 200)
[docs] @skip_without_tool("cat1") def test_invocations_not_accessible_by_different_user_for_published_workflow(self): workflow_id = self.workflow_populator.simple_workflow("test_usage", publish=True) workflow_request, history_id, _ = self._setup_workflow_run(workflow_id=workflow_id) response = self._get(f"workflows/{workflow_id}/usage") self._assert_status_code_is(response, 200) assert len(response.json()) == 0 run_workflow_response = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request, assert_ok=True) run_workflow_dict = run_workflow_response.json() invocation_id = run_workflow_dict['id'] with self._different_user(): usage_details_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}") self._assert_status_code_is(usage_details_response, 403)
[docs] def test_workflow_publishing(self): workflow_id = self.workflow_populator.simple_workflow("dummy") response = self._show_workflow(workflow_id) assert not response['published'] published_worklow = self._put(f'workflows/{workflow_id}', data={'published': True}, json=True).json() assert published_worklow['published'] unpublished_worklow = self._put(f'workflows/{workflow_id}', data={'published': False}, json=True).json() assert not unpublished_worklow['published']
[docs] def test_workflow_from_path_requires_admin(self): # There are two ways to import workflows from paths, just verify both require an admin. workflow_directory = mkdtemp() try: workflow_path = os.path.join(workflow_directory, "workflow.yml") with open(workflow_path, "w") as f: f.write(WORKFLOW_NESTED_REPLACEMENT_PARAMETER) import_response = self.workflow_populator.import_workflow_from_path_raw(workflow_path) self._assert_status_code_is(import_response, 403) self._assert_error_code_is(import_response, error_codes.error_codes_by_name["ADMIN_REQUIRED"]) path_as_uri = f"file://{workflow_path}" import_data = dict(archive_source=path_as_uri) import_response = self._post("workflows", data=import_data) self._assert_status_code_is(import_response, 403) self._assert_error_code_is(import_response, error_codes.error_codes_by_name["ADMIN_REQUIRED"]) finally: shutil.rmtree(workflow_directory)
def _invoke_paused_workflow(self, history_id): workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_pause") workflow_id = self.workflow_populator.create_workflow(workflow) hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3") index_map = { '0': self._ds_entry(hda1), } invocation_id = self.__invoke_workflow( workflow_id, history_id=history_id, inputs=index_map, ) return workflow_id, invocation_id def _wait_for_invocation_non_new(self, workflow_id, invocation_id): target_state_reached = False for _ in range(50): invocation = self._invocation_details(workflow_id, invocation_id) if invocation['state'] != 'new': target_state_reached = True break time.sleep(.25) return target_state_reached def _assert_invocation_non_terminal(self, workflow_id, invocation_id): invocation = self._invocation_details(workflow_id, invocation_id) assert invocation['state'] in ['ready', 'new'], invocation def _wait_for_invocation_state(self, workflow_id, invocation_id, target_state): target_state_reached = False for _ in range(25): invocation = self._invocation_details(workflow_id, invocation_id) if invocation['state'] == target_state: target_state_reached = True break time.sleep(.5) return target_state_reached def _update_workflow(self, workflow_id, workflow_object): return self.workflow_populator.update_workflow(workflow_id, workflow_object) def _invocation_step_details(self, workflow_id, invocation_id, step_id): invocation_step_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}/steps/{step_id}") self._assert_status_code_is(invocation_step_response, 200) invocation_step_details = invocation_step_response.json() return invocation_step_details def _execute_invocation_step_action(self, workflow_id, invocation_id, step_id, action): raw_url = f"workflows/{workflow_id}/usage/{invocation_id}/steps/{step_id}" url = self._api_url(raw_url, use_key=True) payload = dumps(dict(action=action)) action_response = put(url, data=payload) self._assert_status_code_is(action_response, 200) invocation_step_details = action_response.json() return invocation_step_details def _run_workflow_once_get_invocation(self, name: str): workflow = self.workflow_populator.load_workflow(name=name) workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow) response = self._get(f"workflows/{workflow_id}/usage") self._assert_status_code_is(response, 200) assert len(response.json()) == 0 run_workflow_response = self._post(f"workflows/{workflow_id}/invocations", data=workflow_request) self._assert_status_code_is(run_workflow_response, 200) response = self._get(f"workflows/{workflow_id}/usage") self._assert_status_code_is(response, 200) usages = response.json() assert len(usages) == 1 return workflow_id, usages[0] def _setup_random_x2_workflow_steps(self, name: str): workflow_request, history_id, workflow_id = self._setup_random_x2_workflow(name) random_line_steps = self._random_lines_steps(workflow_request, workflow_id) return workflow_request, history_id, workflow_id, random_line_steps def _random_lines_steps(self, workflow_request: dict, workflow_id: str): workflow_summary_response = self._get(f"workflows/{workflow_id}") self._assert_status_code_is(workflow_summary_response, 200) steps = workflow_summary_response.json()["steps"] return sorted((step for step in steps.values() if step["tool_id"] == "random_lines1"), key=lambda step: step["id"]) def _setup_random_x2_workflow(self, name: str): workflow = self.workflow_populator.load_random_x2_workflow(name) uploaded_workflow_id = self.workflow_populator.create_workflow(workflow) workflow_inputs = self.workflow_populator.workflow_inputs(uploaded_workflow_id) key = next(iter(workflow_inputs.keys())) history_id = self.dataset_populator.new_history() ten_lines = "\n".join(str(_) for _ in range(10)) hda1 = self.dataset_populator.new_dataset(history_id, content=ten_lines) workflow_request = dict( history=f"hist_id={history_id}", ds_map=dumps({ key: self._ds_entry(hda1), }), ) return workflow_request, history_id, uploaded_workflow_id def __review_paused_steps(self, uploaded_workflow_id, invocation_id, order_index, action=True): invocation = self._invocation_details(uploaded_workflow_id, invocation_id) invocation_steps = invocation["steps"] pause_steps = [s for s in invocation_steps if s['order_index'] == order_index] for pause_step in pause_steps: pause_step_id = pause_step['id'] self._execute_invocation_step_action(uploaded_workflow_id, invocation_id, pause_step_id, action=action) def __assert_lines_hid_line_count_is(self, history, hid, lines): contents_url = f"histories/{history}/contents" history_contents = self.__history_contents(history) hda_summary = next(hc for hc in history_contents if hc["hid"] == hid) hda_info_response = self._get(f"{contents_url}/{hda_summary['id']}") self._assert_status_code_is(hda_info_response, 200) self.assertEqual(hda_info_response.json()["metadata_data_lines"], lines) def __history_contents(self, history_id): contents_url = f"histories/{history_id}/contents" history_contents_response = self._get(contents_url) self._assert_status_code_is(history_contents_response, 200) return history_contents_response.json() def __invoke_workflow(self, *args, **kwds): return self.workflow_populator.invoke_workflow(*args, **kwds) def __import_workflow(self, workflow_id, deprecated_route=False): if deprecated_route: route = "workflows/import" import_data = dict( workflow_id=workflow_id, ) else: route = "workflows" import_data = dict( shared_workflow_id=workflow_id, ) return self._post(route, import_data) def _show_workflow(self, workflow_id): show_response = self._get(f"workflows/{workflow_id}") self._assert_status_code_is(show_response, 200) return show_response.json() def _assert_looks_like_instance_workflow_representation(self, workflow): self._assert_has_keys( workflow, 'url', 'owner', 'inputs', 'annotation', 'steps' ) for step in workflow["steps"].values(): self._assert_has_keys( step, 'id', 'type', 'tool_id', 'tool_version', 'annotation', 'tool_inputs', 'input_steps', ) def _all_user_invocation_ids(self): all_invocations_for_user = self._get("invocations") self._assert_status_code_is(all_invocations_for_user, 200) invocation_ids = [i["id"] for i in all_invocations_for_user.json()] return invocation_ids
[docs]class AdminWorkflowsApiTestCase(BaseWorkflowsApiTestCase): require_admin_user = True
[docs] def test_import_export_dynamic_tools(self): workflow_id = self._upload_yaml_workflow(""" class: GalaxyWorkflow steps: - type: input label: input1 - tool_id: cat1 label: first_cat state: input1: $link: 0 - label: embed1 run: class: GalaxyTool command: echo 'hello world 2' > $output1 outputs: output1: format: txt - tool_id: cat1 state: input1: $link: first_cat/out_file1 queries: - input2: $link: embed1/output1 test_data: input1: "hello world" """) downloaded_workflow = self._download_workflow(workflow_id) response = self.workflow_populator.create_workflow_response(downloaded_workflow) workflow_id = response.json()["id"] history_id = self.dataset_populator.new_history() hda1 = self.dataset_populator.new_dataset(history_id, content="Hello World Second!") workflow_request = dict( inputs_by="name", inputs=json.dumps({'input1': self._ds_entry(hda1)}), ) invocation_id = self.workflow_populator.invoke_workflow(workflow_id, history_id=history_id, request=workflow_request, assert_ok=True) self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) self.assertEqual("Hello World Second!\nhello world 2\n", self.dataset_populator.get_history_dataset_content(history_id, hid=4))