Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.workflow.extract

""" This module contains functionality to aid in extracting workflows from
histories.
"""

import logging
from typing import Optional

from galaxy import (
    exceptions,
    model,
)
from galaxy.model.base import (
    ensure_object_added_to_session,
    transaction,
)
from galaxy.tool_util.parser import ToolOutputCollectionPart
from galaxy.tools.parameters.basic import (
    DataCollectionToolParameter,
    DataToolParameter,
)
from galaxy.tools.parameters.grouping import (
    Conditional,
    Repeat,
    Section,
)
from galaxy.util import listify
from .steps import (
    attach_ordered_steps,
    order_workflow_steps_with_levels,
)

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Warning text recorded by WorkflowSummary when a dataset whose state is
# "new", "running" or "queued" is skipped while summarizing a history.
WARNING_SOME_DATASETS_NOT_READY = "Some datasets still queued or running were ignored"


def extract_workflow(
    trans,
    user,
    history=None,
    job_ids=None,
    dataset_ids=None,
    dataset_collection_ids=None,
    workflow_name=None,
    dataset_names=None,
    dataset_collection_names=None,
):
    """Extract a workflow from a history and persist it.

    The steps are produced by ``extract_steps``, attached to a new
    ``model.Workflow``, ordered, given canvas positions, and finally wrapped
    in a new ``model.StoredWorkflow`` that is committed through
    ``trans.sa_session``.

    :param trans: transaction object; provides ``sa_session``.
    :param user: assigned as the owner of the stored workflow.
    :param history: history to extract from; forwarded to ``extract_steps``.
    :param job_ids: ids of the jobs to turn into tool steps.
    :param dataset_ids: hids of datasets to turn into data input steps.
    :param dataset_collection_ids: hids of collections to turn into
        collection input steps.
    :param workflow_name: name given to both the workflow and its stored
        wrapper.
    :param dataset_names: optional labels for the dataset input steps.
    :param dataset_collection_names: optional labels for the collection
        input steps.
    :returns: the committed ``model.StoredWorkflow``.
    """
    steps = extract_steps(
        trans,
        history=history,
        job_ids=job_ids,
        dataset_ids=dataset_ids,
        dataset_collection_ids=dataset_collection_ids,
        dataset_names=dataset_names,
        # Forward the caller's labels. A hard-coded None here would make the
        # collection inputs always fall back to the default label.
        dataset_collection_names=dataset_collection_names,
    )
    # Workflow to populate
    workflow = model.Workflow()
    workflow.name = workflow_name
    workflow.steps = steps
    # Order the steps if possible
    attach_ordered_steps(workflow)
    # And let's try to set up some reasonable locations on the canvas
    # (these are pretty arbitrary values): one column per level, one row per
    # step within the level.
    levorder = order_workflow_steps_with_levels(steps)
    base_pos = 10
    for i, steps_at_level in enumerate(levorder):
        for j, index in enumerate(steps_at_level):
            step = steps[index]
            step.position = dict(top=(base_pos + 120 * j), left=(base_pos + 220 * i))
    # Store it
    stored = model.StoredWorkflow()
    stored.user = user
    stored.name = workflow_name
    workflow.stored_workflow = stored
    stored.latest_workflow = workflow
    trans.sa_session.add(stored)
    ensure_object_added_to_session(workflow, session=trans.sa_session)
    with transaction(trans.sa_session):
        trans.sa_session.commit()
    return stored
def extract_steps(
    trans,
    history=None,
    job_ids=None,
    dataset_ids=None,
    dataset_collection_ids=None,
    dataset_names=None,
    dataset_collection_names=None,
):
    """Build the list of workflow steps for the selected history items.

    Input steps are created first (datasets, then collections), followed by
    one tool step per requested job. Tool step inputs are connected to
    whichever earlier step produced the matching hid.

    :param trans: transaction object, handed to ``WorkflowSummary`` and
        ``step_inputs``.
    :param history: history to summarize; forwarded to ``WorkflowSummary``.
    :param job_ids: job ids (anything ``listify`` accepts) for tool steps.
    :param dataset_ids: hids for ``data_input`` steps.
    :param dataset_collection_ids: hids for ``data_collection_input`` steps.
    :param dataset_names: optional names, indexed in parallel with
        ``dataset_ids``.
    :param dataset_collection_names: optional names, indexed in parallel with
        ``dataset_collection_ids``.
    :returns: list of ``model.WorkflowStep`` in creation order.
    :raises exceptions.RequestParameterInvalidException: a collection hid is
        not a known collection.
    :raises AssertionError: a job id is not known to the history summary.
    :raises Exception: no output collection matches an implicitly mapped
        job's output.
    """
    # Normalise each id argument into a (possibly empty) list of integers.
    job_ids = [int(raw_id) for raw_id in listify(job_ids)]
    dataset_ids = [int(raw_id) for raw_id in listify(dataset_ids)]
    dataset_collection_ids = [int(raw_id) for raw_id in listify(dataset_collection_ids)]

    # Find each job, for security we (implicitly) check that they are
    # associated with a job in the current history.
    summary = WorkflowSummary(trans, history)
    jobs = summary.jobs

    steps = []
    step_labels = set()
    # hid -> (producing step, output name on that step)
    hid_to_output_pair = {}

    # Input dataset steps
    for position, hid in enumerate(dataset_ids):
        input_step = model.WorkflowStep()
        input_step.type = "data_input"
        name = dataset_names[position] if dataset_names else "Input Dataset"
        # Labels must be unique, so only the first step with a name gets it.
        if name not in step_labels:
            input_step.label = name
            step_labels.add(name)
        input_step.tool_inputs = dict(name=name)  # type:ignore[assignment]
        hid_to_output_pair[hid] = (input_step, "output")
        steps.append(input_step)

    # Input dataset collection steps
    for position, hid in enumerate(dataset_collection_ids):
        collection_step = model.WorkflowStep()
        collection_step.type = "data_collection_input"
        if hid not in summary.collection_types:
            raise exceptions.RequestParameterInvalidException(f"hid {hid} does not appear to be a collection")
        collection_type = summary.collection_types[hid]
        name = dataset_collection_names[position] if dataset_collection_names else "Input Dataset Collection"
        if name not in step_labels:
            collection_step.label = name
            step_labels.add(name)
        collection_step.tool_inputs = dict(name=name, collection_type=collection_type)  # type:ignore[assignment]
        hid_to_output_pair[hid] = (collection_step, "output")
        steps.append(collection_step)

    # Tool steps
    for job_id in job_ids:
        if job_id not in summary.job_id2representative_job:
            log.warning(f"job_id {job_id} not found in job_id2representative_job {summary.job_id2representative_job}")
            raise AssertionError("Attempt to create workflow with job not connected to current history")
        job = summary.job_id2representative_job[job_id]
        tool_inputs, associations = step_inputs(trans, job)

        step = model.WorkflowStep()
        step.type = "tool"
        step.tool_id = job.tool_id
        step.tool_version = job.tool_version
        step.tool_inputs = tool_inputs

        # NOTE: We shouldn't need to do two passes here since only
        #       an earlier job can be used as an input to a later
        #       job.
        for other_hid, input_name in associations:
            if job in summary.implicit_map_jobs:
                # For mapped-over jobs, connect to the input collection
                # rather than to the individual element dataset.
                an_implicit_output_collection = jobs[job][0][1]
                input_collection = an_implicit_output_collection.find_implicit_input_collection(input_name)
                if input_collection:
                    other_hid = input_collection.hid
                else:
                    log.info(f"Cannot find implicit input collection for {input_name}")
            if other_hid not in hid_to_output_pair:
                continue
            step_input = step.get_or_add_input(input_name)
            other_step, other_name = hid_to_output_pair[other_hid]
            conn = model.WorkflowStepConnection()
            conn.input_step_input = step_input
            # Should always be connected to an earlier step
            conn.output_step = other_step
            conn.output_name = other_name
        steps.append(step)

        # Store created dataset hids
        for assoc in job.output_datasets + job.output_dataset_collection_instances:
            assoc_name = assoc.name
            if ToolOutputCollectionPart.is_named_collection_part_name(assoc_name) or assoc_name.startswith(
                "__new_primary_file"
            ):
                continue
            if job in summary.implicit_map_jobs:
                hid = None
                # No early exit: the last matching pair determines the hid.
                for query_assoc_name, dataset_collection in jobs[job]:
                    if query_assoc_name == assoc_name or assoc_name.startswith(
                        f"__new_primary_file_{query_assoc_name}|"
                    ):
                        hid = summary.hid(dataset_collection)
                if hid is None:
                    template = (
                        "Failed to find matching implicit job - job id is %s, implicit pairs are %s, assoc_name is %s."
                    )
                    message = template % (job.id, jobs[job], assoc_name)
                    log.warning(message)
                    raise Exception("Failed to extract job.")
            else:
                has_hid = assoc.dataset if hasattr(assoc, "dataset") else assoc.dataset_collection_instance
                hid = summary.hid(has_hid)
            if hid in hid_to_output_pair:
                log.warning(f"duplicate hid found in extract_steps [{hid}]")
            hid_to_output_pair[hid] = (step, assoc_name)
    return steps


class FakeJob:
    """
    Stand-in job for datasets that have no creating_job_associations,
    they will be treated as "input" datasets.
    """

    def __init__(self, dataset):
        self.is_fake = True
        self.id = f"fake_{dataset.id}"
        self.name = self._guess_name_from_dataset(dataset)

    def _guess_name_from_dataset(self, dataset) -> Optional[str]:
        """Derive a display name from the dataset's copy provenance, if any."""
        provenance_labels = (
            ("copied_from_history_dataset_association", "Import from History"),
            ("copied_from_library_dataset_dataset_association", "Import from Library"),
        )
        # History provenance is checked first and wins when both are set.
        for attribute, label in provenance_labels:
            if getattr(dataset, attribute):
                return label
        return None


class DatasetCollectionCreationJob:
    """Stand-in job for a dataset collection that has no usable creating job."""

    def __init__(self, dataset_collection):
        self.is_fake = True
        self.id = f"fake_{dataset_collection.id}"
        self.from_jobs = None
        self.name = "Dataset Collection Creation"
        self.disabled_why = "Dataset collection created in a way not compatible with workflows"

    def set_jobs(self, jobs):
        """Record the jobs this collection came from; ``jobs`` must not be None."""
        assert jobs is not None
        self.from_jobs = jobs
def summarize(trans, history=None):
    """Summarize the active items of a history for workflow building.

    Returns a ``(jobs, warnings)`` pair: the mapping of job description to
    the datasets it produced, and the set of warnings gathered while the
    history was summarized. Formerly called get_job_dict in the workflow web
    controller.
    """
    history_summary = WorkflowSummary(trans, history)
    jobs, warnings = history_summary.jobs, history_summary.warnings
    return jobs, warnings
class WorkflowSummary:
    """Summary of the active contents of a history, grouped by creating job.

    All state is computed once, in the constructor.
    """

    def __init__(self, trans, history):
        """Summarize ``history``, or ``trans.get_history()`` if it is falsy."""
        if not history:
            history = trans.get_history()
        self.history = history
        # Set of warning strings collected while summarizing.
        self.warnings = set()
        # job (real or fake) -> list of (output name, HDA or HDCA) pairs.
        self.jobs = {}
        self.job_id2representative_job = {}  # map a non-fake job id to its representative job
        # Representative jobs whose outputs are implicitly mapped collections.
        self.implicit_map_jobs = []
        # hid of the original HDCA -> collection type.
        self.collection_types = {}
        # id of the original HDA / HDCA -> hid of the item in this history.
        self.hda_hid_in_history = {}
        self.hdca_hid_in_history = {}
        self.__summarize()

    def hid(self, object):
        """Return the hid to use for ``object`` (an HDA or HDCA).

        The hid recorded for this history is preferred. Otherwise the
        object's own hid is returned, with a warning when the object belongs
        to a different history.
        """
        if object.history_content_type == "dataset_collection":
            hid_map, kind = self.hdca_hid_in_history, "hdca"
        else:
            hid_map, kind = self.hda_hid_in_history, "hda"
        if object.id in hid_map:
            return hid_map[object.id]
        if object.history != self.history:
            log.warning(f"extraction issue, using {kind} hid from outside current history and unmapped")
        return object.hid

    def __summarize(self):
        # Make a first pass handle all singleton jobs, input dataset and dataset collections
        # just grab the implicitly mapped jobs and handle in second pass. Second pass is
        # needed because cannot allow selection of individual datasets from an implicit
        # mapping during extraction - you get the collection or nothing.
        for content in self.history.active_contents:
            self.__summarize_content(content)

    def __summarize_content(self, content):
        # Update internal state for history content (either an HDA or
        # an HDCA).
        if content.history_content_type == "dataset_collection":
            self.__summarize_dataset_collection(content)
        else:
            self.__summarize_dataset(content)

    def __summarize_dataset_collection(self, dataset_collection):
        """Record an HDCA under the job (real or fake) that created it."""
        hid_in_history = dataset_collection.hid
        # Walk back to the original so copies map onto the creating job.
        dataset_collection = self.__original_hdca(dataset_collection)
        self.hdca_hid_in_history[dataset_collection.id] = hid_in_history
        hid = dataset_collection.hid
        self.collection_types[hid] = dataset_collection.collection.collection_type
        if cja := dataset_collection.creating_job_associations:
            # Use the "first" job to represent all mapped jobs.
            representative_assoc = cja[0]
            representative_job = representative_assoc.job
            if (
                representative_job not in self.jobs
                or self.jobs[representative_job][0][1].history_content_type == "dataset"
            ):
                # Start a fresh entry; this also replaces an entry whose
                # first output is a plain dataset.
                self.jobs[representative_job] = [(representative_assoc.name, dataset_collection)]
                if dataset_collection.implicit_output_name:
                    self.implicit_map_jobs.append(representative_job)
            else:
                self.jobs[representative_job].append((representative_assoc.name, dataset_collection))
            for assoc in cja:
                job = assoc.job
                self.job_id2representative_job[job.id] = representative_job
        # This whole elif condition may no longer be needed do to additional
        # tracking with creating_job_associations. Will delete at some point.
        elif dataset_collection.implicit_output_name:
            # TODO: Optimize db call
            element = dataset_collection.collection.first_dataset_element
            if not element:
                # Got no dataset instance to walk back up to creating job.
                # TODO track this via tool request model
                job = DatasetCollectionCreationJob(dataset_collection)
                self.jobs[job] = [(None, dataset_collection)]
                return
            else:
                dataset_instance = element.hda
            if not self.__check_state(dataset_instance):
                # Just checking the state of one instance, don't need more but
                # makes me wonder if even need this check at all?
                return

            original_hda = self.__original_hda(dataset_instance)
            if not original_hda.creating_job_associations:
                log.warning(
                    "An implicitly create output dataset collection doesn't have a creating_job_association, should not happen!"
                )
                job = DatasetCollectionCreationJob(dataset_collection)
                self.jobs[job] = [(None, dataset_collection)]

            for assoc in original_hda.creating_job_associations:
                job = assoc.job
                if job not in self.jobs or self.jobs[job][0][1].history_content_type == "dataset":
                    self.jobs[job] = [(assoc.name, dataset_collection)]
                    self.job_id2representative_job[job.id] = job
                    self.implicit_map_jobs.append(job)
                else:
                    self.jobs[job].append((assoc.name, dataset_collection))
        else:
            job = DatasetCollectionCreationJob(dataset_collection)
            self.jobs[job] = [(None, dataset_collection)]

    def __summarize_dataset(self, dataset):
        """Record an HDA under its creating job(s), or under a FakeJob if it has none."""
        if not self.__check_state(dataset):
            return

        hid_in_history = dataset.hid
        original_hda = self.__original_hda(dataset)
        self.hda_hid_in_history[original_hda.id] = hid_in_history

        if not original_hda.creating_job_associations:
            self.jobs[FakeJob(dataset)] = [(None, dataset)]

        for assoc in original_hda.creating_job_associations:
            job = assoc.job
            if job in self.jobs:
                self.jobs[job].append((assoc.name, dataset))
            else:
                self.jobs[job] = [(assoc.name, dataset)]
                self.job_id2representative_job[job.id] = job

    def __original_hdca(self, hdca):
        """Follow the copy chain back to the HDCA that was not copied from another."""
        while hdca.copied_from_history_dataset_collection_association:
            hdca = hdca.copied_from_history_dataset_collection_association
        return hdca

    def __original_hda(self, hda):
        # if this hda was copied from another, we need to find the job that created the original hda
        while hda.copied_from_history_dataset_association:
            hda = hda.copied_from_history_dataset_association
        return hda

    def __check_state(self, hda):
        """Return ``hda`` if it can be used; otherwise record a warning and return None."""
        # FIXME: Create "Dataset.is_finished"
        if hda.state in ("new", "running", "queued"):
            self.warnings.add(WARNING_SOME_DATASETS_NOT_READY)
            return
        return hda


def step_inputs(trans, job):
    """Return ``(tool_inputs, associations)`` for the tool run by ``job``.

    ``tool_inputs`` is the job's parameter state with data values stripped
    and serialized by ``tool.params_to_strings``. ``associations`` is the
    list of ``(hid, prefixed input name)`` pairs for the data inputs that
    were removed.
    """
    tool = trans.app.toolbox.get_tool(job.tool_id, tool_version=job.tool_version)
    # If a tool was updated and e.g. had a text value changed to an integer,
    # we don't want a traceback here
    param_values = job.get_param_values(trans.app, ignore_errors=True)
    associations = __cleanup_param_values(tool.inputs, param_values)
    tool_inputs = tool.params_to_strings(param_values, trans.app)
    return tool_inputs, associations


def __cleanup_param_values(inputs, values):
    """
    Remove 'Data' values from `param_values`, along with metadata cruft,
    but track the associations.

    ``values`` is modified in place. Returns the list of
    ``(hid, prefixed input name)`` pairs for every data value removed.
    """
    associations = []
    # dbkey is pushed in by the framework
    values.pop("dbkey", None)
    root_values = values
    root_input_keys = inputs.keys()

    # Recursively clean data inputs and dynamic selects
    def cleanup(prefix, group_inputs, group_state):
        for key, param in group_inputs.items():
            if isinstance(param, (DataToolParameter, DataCollectionToolParameter)):
                items = group_state[key]
                group_state[key] = None
                # HACK: Nested associations are not yet working, but we
                #       still need to clean them up so we can serialize
                # if not( prefix ):
                for item in listify(items):
                    if isinstance(item, model.DatasetCollectionElement):
                        item = item.first_dataset_instance()
                    if item:  # this is false for a non-set optional dataset
                        associations.append((item.hid, prefix + key))

                # Cleanup the other deprecated crap associated with datasets
                # as well. Worse, for nested datasets all the metadata is
                # being pushed into the root. FIXME: MUST REMOVE SOON
                cruft_prefix = f"{prefix + key}_"
                # Collect the keys before deleting: removing entries while
                # iterating the dict raises RuntimeError.
                stale_keys = [k for k in root_values if k not in root_input_keys and k.startswith(cruft_prefix)]
                for stale_key in stale_keys:
                    del root_values[stale_key]
            elif isinstance(param, Repeat):
                if key in group_state:
                    for rep_values in group_state[key]:
                        rep_index = rep_values["__index__"]
                        cleanup("%s%s_%d|" % (prefix, key, rep_index), param.inputs, rep_values)
            elif isinstance(param, Conditional):
                # Scrub dynamic resource related parameters from workflows,
                # they cause problems and the workflow probably should include
                # their state in workflow encoding.
                if param.name == "__job_resource":
                    group_state.pop(param.name, None)
                    # NOTE(review): this ends cleanup of the current group, so
                    # any inputs after __job_resource are not visited. That
                    # presumably relies on it being the last input - confirm.
                    return
                if param.name in group_state:
                    case_state = group_state[param.name]
                    current_case = case_state["__current_case__"]
                    cleanup(f"{prefix}{key}|", param.cases[current_case].inputs, case_state)
            elif isinstance(param, Section):
                if param.name in group_state:
                    cleanup(f"{prefix}{key}|", param.inputs, group_state[param.name])

    cleanup("", inputs, values)
    return associations


__all__ = ("summarize", "extract_workflow")