Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.workflow.extract
""" This module contains functionality to aid in extracting workflows from
histories.
"""
import logging
from collections import OrderedDict
from galaxy import exceptions, model
from galaxy.tool_util.parser import ToolOutputCollectionPart
from galaxy.tools.parameters.basic import (
    DataCollectionToolParameter,
    DataToolParameter
)
from galaxy.tools.parameters.grouping import (
    Conditional,
    Repeat,
    Section
)
from galaxy.util import listify
from .steps import (
    attach_ordered_steps,
    order_workflow_steps_with_levels
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Message added to ``WorkflowSummary.warnings`` when a dataset in the 'new',
# 'running' or 'queued' state is skipped while summarizing a history.
WARNING_SOME_DATASETS_NOT_READY = "Some datasets still queued or running were ignored"
def extract_workflow(trans, user, history=None, job_ids=None, dataset_ids=None, dataset_collection_ids=None, workflow_name=None, dataset_names=None, dataset_collection_names=None):
    """Extract a workflow from history items and persist it for ``user``.

    The steps are built by :func:`extract_steps`, attached to a new
    ``model.Workflow``, given canvas positions and wrapped in a new
    ``model.StoredWorkflow`` which is added to ``trans.sa_session`` and
    flushed.

    :param trans: transaction object; provides ``sa_session`` and is passed
                  on to :func:`extract_steps`
    :param user: owner assigned to the stored workflow
    :param history: history to extract from (passed on to
                    :func:`extract_steps`)
    :param job_ids: ids of the jobs to turn into tool steps
    :param dataset_ids: hids of datasets to turn into data input steps
    :param dataset_collection_ids: hids of dataset collections to turn into
                                   collection input steps
    :param workflow_name: name given to both the workflow and the stored
                          workflow
    :param dataset_names: optional names for the data input steps
    :param dataset_collection_names: optional names for the collection input
                                     steps
    :returns: the newly created ``model.StoredWorkflow``
    """
    # Forward dataset_collection_names as given; hard-coding None here would
    # silently discard the names supplied by the caller.
    steps = extract_steps(trans, history=history, job_ids=job_ids, dataset_ids=dataset_ids, dataset_collection_ids=dataset_collection_ids, dataset_names=dataset_names, dataset_collection_names=dataset_collection_names)
    # Workflow to populate
    workflow = model.Workflow()
    workflow.name = workflow_name
    # Order the steps if possible
    attach_ordered_steps(workflow, steps)
    # And let's try to set up some reasonable locations on the canvas
    # (these are pretty arbitrary values)
    levorder = order_workflow_steps_with_levels(steps)
    base_pos = 10
    # Each level becomes a column (220 apart); steps within a level are
    # stacked vertically (120 apart). Level entries are indexes into steps.
    for i, steps_at_level in enumerate(levorder):
        for j, index in enumerate(steps_at_level):
            step = steps[index]
            step.position = dict(top=(base_pos + 120 * j),
                                 left=(base_pos + 220 * i))
    # Store it
    stored = model.StoredWorkflow()
    stored.user = user
    stored.name = workflow_name
    workflow.stored_workflow = stored
    stored.latest_workflow = workflow
    trans.sa_session.add(stored)
    trans.sa_session.flush()
    return stored
def extract_steps(trans, history=None, job_ids=None, dataset_ids=None, dataset_collection_ids=None, dataset_names=None, dataset_collection_names=None):
    """Build the list of workflow steps for the selected history items.

    One ``data_input`` step is created per entry of ``dataset_ids``, one
    ``data_collection_input`` step per entry of ``dataset_collection_ids``
    and one ``tool`` step per entry of ``job_ids``. Tool step inputs are
    connected to previously created steps by matching hids.

    :param trans: passed on to ``WorkflowSummary`` and ``step_inputs``
    :param history: history to summarize (``WorkflowSummary`` falls back to
                    ``trans.get_history()`` when it is falsy)
    :param job_ids: job ids to convert into tool steps
    :param dataset_ids: hids of the datasets to use as data inputs
    :param dataset_collection_ids: hids of the collections to use as
                                   collection inputs
    :param dataset_names: optional step names, indexed in parallel with
                          ``dataset_ids``
    :param dataset_collection_names: optional step names, indexed in parallel
                                     with ``dataset_collection_ids``
    :returns: list of ``model.WorkflowStep`` in creation order (data inputs,
              collection inputs, then tool steps)
    :raises exceptions.RequestParameterInvalidException: if an entry of
        ``dataset_collection_ids`` is not a known collection hid
    :raises AssertionError: if a job id has no representative job in the
        history summary
    :raises Exception: if an output of an implicitly mapped job cannot be
        matched to one of the job's recorded output collections
    """
    # Ensure job_ids and dataset_ids are lists (possibly empty)
    job_ids = listify(job_ids)
    dataset_ids = listify(dataset_ids)
    dataset_collection_ids = listify(dataset_collection_ids)
    # Convert both sets of ids to integers
    job_ids = [int(_) for _ in job_ids]
    dataset_ids = [int(_) for _ in dataset_ids]
    dataset_collection_ids = [int(_) for _ in dataset_collection_ids]
    # Find each job, for security we (implicitly) check that they are
    # associated with a job in the current history.
    summary = WorkflowSummary(trans, history)
    jobs = summary.jobs
    steps = []
    # Maps a hid to the (step, output name) pair that produces it; consulted
    # when wiring the inputs of later tool steps.
    hid_to_output_pair = {}
    # Input dataset steps
    for i, hid in enumerate(dataset_ids):
        step = model.WorkflowStep()
        step.type = 'data_input'
        if dataset_names:
            name = dataset_names[i]
        else:
            name = "Input Dataset"
        step.tool_inputs = dict(name=name)
        hid_to_output_pair[hid] = (step, 'output')
        steps.append(step)
    # Input dataset collection steps
    for i, hid in enumerate(dataset_collection_ids):
        step = model.WorkflowStep()
        step.type = 'data_collection_input'
        # Only hids the summary recorded as collections are accepted.
        if hid not in summary.collection_types:
            raise exceptions.RequestParameterInvalidException("hid %s does not appear to be a collection" % hid)
        collection_type = summary.collection_types[hid]
        if dataset_collection_names:
            name = dataset_collection_names[i]
        else:
            name = "Input Dataset Collection"
        step.tool_inputs = dict(name=name, collection_type=collection_type)
        hid_to_output_pair[hid] = (step, 'output')
        steps.append(step)
    # Tool steps
    for job_id in job_ids:
        if job_id not in summary.job_id2representative_job:
            log.warning("job_id {} not found in job_id2representative_job {}".format(job_id, summary.job_id2representative_job))
            raise AssertionError("Attempt to create workflow with job not connected to current history")
        job = summary.job_id2representative_job[job_id]
        # associations is a list of (hid, input name) pairs for the data
        # inputs of the job.
        tool_inputs, associations = step_inputs(trans, job)
        step = model.WorkflowStep()
        step.type = 'tool'
        step.tool_id = job.tool_id
        step.tool_version = job.tool_version
        step.tool_inputs = tool_inputs
        # NOTE: We shouldn't need to do two passes here since only
        #       an earlier job can be used as an input to a later
        #       job.
        for other_hid, input_name in associations:
            if job in summary.implicit_map_jobs:
                # For an implicitly mapped job, replace the hid of the
                # individual dataset with the hid of the input collection,
                # looked up through the first output collection recorded
                # for the job.
                an_implicit_output_collection = jobs[job][0][1]
                input_collection = an_implicit_output_collection.find_implicit_input_collection(input_name)
                if input_collection:
                    other_hid = input_collection.hid
                else:
                    log.info("Cannot find implicit input collection for %s" % input_name)
            # Inputs whose hid was not produced by an earlier step are left
            # unconnected.
            if other_hid in hid_to_output_pair:
                step_input = step.get_or_add_input(input_name)
                other_step, other_name = hid_to_output_pair[other_hid]
                # NOTE(review): conn is not explicitly stored anywhere here;
                # presumably assigning input_step_input / output_step attaches
                # it through model relationships - confirm against the model.
                conn = model.WorkflowStepConnection()
                conn.input_step_input = step_input
                # Should always be connected to an earlier step
                conn.output_step = other_step
                conn.output_name = other_name
        steps.append(step)
        # Store created dataset hids
        for assoc in (job.output_datasets + job.output_dataset_collection_instances):
            assoc_name = assoc.name
            # Outputs named as parts of a collection are not registered on
            # their own.
            if ToolOutputCollectionPart.is_named_collection_part_name(assoc_name):
                continue
            if job in summary.implicit_map_jobs:
                # Use the hid of the recorded output collection whose name
                # matches this output (directly or via the
                # "__new_primary_file_<name>|" prefix); the last match wins.
                hid = None
                for implicit_pair in jobs[job]:
                    query_assoc_name, dataset_collection = implicit_pair
                    if query_assoc_name == assoc_name or assoc_name.startswith("__new_primary_file_%s|" % query_assoc_name):
                        hid = dataset_collection.hid
                if hid is None:
                    template = "Failed to find matching implicit job - job id is %s, implicit pairs are %s, assoc_name is %s."
                    message = template % (job.id, jobs[job], assoc_name)
                    log.warning(message)
                    raise Exception("Failed to extract job.")
            else:
                # Dataset associations expose ``dataset``; otherwise the
                # association is for a dataset collection instance.
                if hasattr(assoc, "dataset"):
                    hid = assoc.dataset.hid
                else:
                    hid = assoc.dataset_collection_instance.hid
            hid_to_output_pair[hid] = (step, assoc.name)
    return steps
class FakeJob:
    """
    Stand-in job for a dataset that has no creating_job_associations;
    such datasets are treated as "input" datasets.
    """

    def __init__(self, dataset):
        # The id is derived from the dataset id and prefixed so it is
        # recognisable as not belonging to a real job.
        self.id = "fake_{}".format(dataset.id)
        self.is_fake = True
class DatasetCollectionCreationJob:
    """
    Stand-in job for a dataset collection that cannot be tied to a real
    creating job; carries a display name and the reason it is disabled.
    """

    def __init__(self, dataset_collection):
        self.id = "fake_{}".format(dataset_collection.id)
        self.is_fake = True
        self.name = "Dataset Collection Creation"
        self.disabled_why = "Dataset collection created in a way not compatible with workflows"
        # Filled in later through set_jobs().
        self.from_jobs = None

    def set_jobs(self, jobs):
        """Remember ``jobs`` as ``from_jobs``; ``jobs`` must not be None."""
        assert jobs is not None
        self.from_jobs = jobs
def summarize(trans, history=None):
    """ Return mapping of job description to datasets for active items in
    supplied history - needed for building workflow from a history.

    Formerly called get_job_dict in the workflow web controller.

    :param trans: transaction object, used to look up the current history
                  when ``history`` is not supplied
    :param history: history to summarize
    :returns: tuple of the summary's ``jobs`` mapping and its ``warnings``
              set
    """
    summary = WorkflowSummary(trans, history)
    return summary.jobs, summary.warnings
class WorkflowSummary:
    """Group the active contents of a history by the job that created them.

    Attributes populated during construction:

    - ``jobs``: ordered mapping of a job (real or stand-in) to a list of
      ``(output name, history content)`` pairs.
    - ``job_id2representative_job``: maps a non-fake job id to its
      representative job.
    - ``implicit_map_jobs``: jobs recorded for collections that have an
      ``implicit_output_name``.
    - ``collection_types``: maps the hid of each (original) dataset
      collection to its collection type.
    - ``warnings``: set of warning messages gathered while summarizing.
    """

    def __init__(self, trans, history):
        # Fall back to the history of the transaction when none is supplied.
        self.history = history if history else trans.get_history()
        self.warnings = set()
        self.jobs = OrderedDict()
        self.job_id2representative_job = {}  # map a non-fake job id to its representative job
        self.implicit_map_jobs = []
        self.collection_types = {}
        self.__summarize()

    def __summarize(self):
        # Walk the active history contents once, in order; datasets and
        # dataset collections are dispatched to their own handlers.
        for item in self.history.active_contents:
            self.__summarize_content(item)

    def __summarize_content(self, content):
        # History content is either an HDA or an HDCA; update the internal
        # state with the matching handler.
        if content.history_content_type != "dataset_collection":
            self.__summarize_dataset(content)
        else:
            self.__summarize_dataset_collection(content)

    def __starts_new_entry(self, job):
        # A collection replaces the entry of a job that is unknown so far or
        # whose first recorded output is a plain dataset.
        if job not in self.jobs:
            return True
        return self.jobs[job][0][1].history_content_type == "dataset"

    def __summarize_dataset_collection(self, dataset_collection):
        hdca = self.__original_hdca(dataset_collection)
        self.collection_types[hdca.hid] = hdca.collection.collection_type
        creating_assocs = hdca.creating_job_associations
        if creating_assocs:
            self.__record_collection_from_associations(hdca, creating_assocs)
        elif hdca.implicit_output_name:
            # This whole branch may no longer be needed due to additional
            # tracking with creating_job_associations. Will delete at some point.
            self.__record_collection_from_first_instance(hdca)
        else:
            self.jobs[DatasetCollectionCreationJob(hdca)] = [(None, hdca)]

    def __record_collection_from_associations(self, hdca, creating_assocs):
        # Use the "first" job to represent all mapped jobs.
        first_assoc = creating_assocs[0]
        representative_job = first_assoc.job
        if self.__starts_new_entry(representative_job):
            self.jobs[representative_job] = [(first_assoc.name, hdca)]
            if hdca.implicit_output_name:
                self.implicit_map_jobs.append(representative_job)
        else:
            self.jobs[representative_job].append((first_assoc.name, hdca))
        # Every creating job resolves to the representative one.
        for assoc in creating_assocs:
            self.job_id2representative_job[assoc.job.id] = representative_job

    def __record_collection_from_first_instance(self, hdca):
        # TODO: Optimize db call
        first_instance = hdca.collection.dataset_instances[0]
        # Just checking the state of one instance, don't need more but
        # makes me wonder if even need this check at all?
        if not self.__check_state(first_instance):
            return
        original_hda = self.__original_hda(first_instance)
        if not original_hda.creating_job_associations:
            log.warning("An implicitly create output dataset collection doesn't have a creating_job_association, should not happen!")
            self.jobs[DatasetCollectionCreationJob(hdca)] = [(None, hdca)]
        for assoc in original_hda.creating_job_associations:
            job = assoc.job
            if self.__starts_new_entry(job):
                self.jobs[job] = [(assoc.name, hdca)]
                self.job_id2representative_job[job.id] = job
                self.implicit_map_jobs.append(job)
            else:
                self.jobs[job].append((assoc.name, hdca))

    def __summarize_dataset(self, dataset):
        if not self.__check_state(dataset):
            return
        original_hda = self.__original_hda(dataset)
        if not original_hda.creating_job_associations:
            # No creating job: record the dataset under a stand-in job.
            self.jobs[FakeJob(dataset)] = [(None, dataset)]
        for assoc in original_hda.creating_job_associations:
            job = assoc.job
            if job not in self.jobs:
                # First output seen for this job; it represents itself.
                self.jobs[job] = []
                self.job_id2representative_job[job.id] = job
            self.jobs[job].append((assoc.name, dataset))

    def __original_hdca(self, hdca):
        # Follow the copy chain back to the collection that was created first.
        source = hdca.copied_from_history_dataset_collection_association
        while source:
            hdca = source
            source = hdca.copied_from_history_dataset_collection_association
        return hdca

    def __original_hda(self, hda):
        # if this hda was copied from another, we need to find the job that created the original hda
        source = hda.copied_from_history_dataset_association
        while source:
            hda = source
            source = hda.copied_from_history_dataset_association
        return hda

    def __check_state(self, hda):
        # FIXME: Create "Dataset.is_finished"
        # Returns the hda when it is usable, None (plus a warning) otherwise.
        if hda.state not in ('new', 'running', 'queued'):
            return hda
        self.warnings.add(WARNING_SOME_DATASETS_NOT_READY)
        return None
def step_inputs(trans, job):
    """Return the serialized tool inputs of ``job`` and its data associations.

    The result is a ``(tool_inputs, associations)`` tuple: the parameter
    values of the job converted to strings by the tool, and the list of
    associations produced while scrubbing the data values from them.
    """
    app = trans.app
    tool = app.toolbox.get_tool(job.tool_id, tool_version=job.tool_version)
    # If a tool was updated and e.g. had a text value changed to an integer,
    # we don't want a traceback here.
    values = job.get_param_values(app, ignore_errors=True)
    # The values are scrubbed in place before they are serialized.
    associations = __cleanup_param_values(tool.inputs, values)
    return tool.params_to_strings(values, app), associations
def __cleanup_param_values(inputs, values):
    """
    Remove 'Data' values from `values`, along with metadata cruft,
    but track the associations.

    `values` is modified in place: data and data collection parameters are
    set to None, the framework-supplied 'dbkey' and '__job_resource' entries
    are removed, and root-level keys that start with "<data parameter>_" and
    are not tool inputs themselves are dropped.

    :param inputs: mapping of parameter name to tool parameter or grouping
                   (Repeat / Conditional / Section) object
    :param values: nested mapping of parameter values
    :returns: list of ``(hid, prefixed parameter name)`` tuples, one per
              dataset found in a data or data collection parameter
    """
    associations = []
    # dbkey is pushed in by the framework
    if 'dbkey' in values:
        del values['dbkey']
    root_values = values
    root_input_keys = inputs.keys()

    # Recursively clean data inputs and dynamic selects
    def cleanup(prefix, inputs, values):
        for key, param in inputs.items():
            if isinstance(param, (DataToolParameter, DataCollectionToolParameter)):
                items = values[key]
                values[key] = None
                # HACK: Nested associations are not yet working, but we
                #       still need to clean them up so we can serialize
                for item in listify(items):
                    if isinstance(item, model.DatasetCollectionElement):
                        item = item.first_dataset_instance()
                    if item:  # this is false for a non-set optional dataset
                        associations.append((item.hid, prefix + key))
                # Cleanup the other deprecated crap associated with datasets
                # as well. Worse, for nested datasets all the metadata is
                # being pushed into the root. FIXME: MUST REMOVE SOON
                stale_prefix = prefix + key + "_"
                # Iterate over a snapshot of the keys: deleting from a dict
                # while iterating over its live key view raises RuntimeError.
                for k in list(root_values.keys()):
                    if k not in root_input_keys and k.startswith(stale_prefix):
                        del root_values[k]
            elif isinstance(param, Repeat):
                if key in values:
                    for rep_values in values[key]:
                        rep_index = rep_values['__index__']
                        cleanup("%s%s_%d|" % (prefix, key, rep_index), param.inputs, rep_values)
            elif isinstance(param, Conditional):
                # Scrub dynamic resource related parameters from workflows,
                # they cause problems and the workflow probably should include
                # their state in workflow encoding.
                if param.name == '__job_resource':
                    if param.name in values:
                        del values[param.name]
                    # Keep going so that any inputs listed after the job
                    # resource conditional are still cleaned up.
                    continue
                if param.name in values:
                    group_values = values[param.name]
                    current_case = group_values['__current_case__']
                    cleanup("{}{}|".format(prefix, key), param.cases[current_case].inputs, group_values)
            elif isinstance(param, Section):
                if param.name in values:
                    cleanup("{}{}|".format(prefix, key), param.inputs, values[param.name])

    cleanup("", inputs, values)
    return associations
# Public names exported by a star-import of this module.
__all__ = ('summarize', 'extract_workflow')