Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.workflow.extract
""" This module contains functionality to aid in extracting workflows from
histories.
"""
import logging
from typing import Optional
from galaxy import (
exceptions,
model,
)
from galaxy.model.base import (
ensure_object_added_to_session,
transaction,
)
from galaxy.tool_util.parser import ToolOutputCollectionPart
from galaxy.tools.parameters.basic import (
DataCollectionToolParameter,
DataToolParameter,
)
from galaxy.tools.parameters.grouping import (
Conditional,
Repeat,
Section,
)
from galaxy.util import listify
from .steps import (
attach_ordered_steps,
order_workflow_steps_with_levels,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Warning text recorded by WorkflowSummary when a dataset whose state is
# "new", "queued" or "running" is skipped while summarizing a history.
WARNING_SOME_DATASETS_NOT_READY = "Some datasets still queued or running were ignored"
def extract_workflow(
    trans,
    user,
    history=None,
    job_ids=None,
    dataset_ids=None,
    dataset_collection_ids=None,
    workflow_name=None,
    dataset_names=None,
    dataset_collection_names=None,
):
    """Extract a workflow from history items, persist it and return it.

    The steps are produced by ``extract_steps``; this function wraps them in
    a ``model.Workflow``, orders them, assigns canvas positions, attaches the
    workflow to a new ``model.StoredWorkflow`` owned by ``user`` and commits.

    :param trans: transaction object; its ``sa_session`` is used to persist
        the result and it is forwarded to ``extract_steps``.
    :param user: owner assigned to the new stored workflow.
    :param history: forwarded to ``extract_steps``.
    :param job_ids: forwarded to ``extract_steps`` (jobs that become tool steps).
    :param dataset_ids: forwarded to ``extract_steps`` (dataset input steps).
    :param dataset_collection_ids: forwarded to ``extract_steps``
        (collection input steps).
    :param workflow_name: name given to both the workflow and the stored
        workflow.
    :param dataset_names: optional names for the dataset input steps.
    :param dataset_collection_names: optional names for the collection input
        steps.
    :returns: the committed ``model.StoredWorkflow``.
    """
    steps = extract_steps(
        trans,
        history=history,
        job_ids=job_ids,
        dataset_ids=dataset_ids,
        dataset_collection_ids=dataset_collection_ids,
        dataset_names=dataset_names,
        # Forward the caller's names; this used to be hard-coded to None,
        # which silently discarded any collection input names supplied.
        dataset_collection_names=dataset_collection_names,
    )
    # Workflow to populate
    workflow = model.Workflow()
    workflow.name = workflow_name
    workflow.steps = steps
    # Order the steps if possible
    attach_ordered_steps(workflow)
    # And let's try to set up some reasonable locations on the canvas
    # (these are pretty arbitrary values): one column per level, one row
    # per step within a level.
    levorder = order_workflow_steps_with_levels(steps)
    base_pos = 10
    for i, steps_at_level in enumerate(levorder):
        for j, index in enumerate(steps_at_level):
            step = steps[index]
            step.position = dict(top=(base_pos + 120 * j), left=(base_pos + 220 * i))
    # Store it
    stored = model.StoredWorkflow()
    stored.user = user
    stored.name = workflow_name
    workflow.stored_workflow = stored
    stored.latest_workflow = workflow
    trans.sa_session.add(stored)
    ensure_object_added_to_session(workflow, session=trans.sa_session)
    with transaction(trans.sa_session):
        trans.sa_session.commit()
    return stored
def extract_steps(
    trans,
    history=None,
    job_ids=None,
    dataset_ids=None,
    dataset_collection_ids=None,
    dataset_names=None,
    dataset_collection_names=None,
):
    """Build the list of workflow steps for the selected history items.

    Steps are emitted in this order: one ``data_input`` step per entry of
    ``dataset_ids``, one ``data_collection_input`` step per entry of
    ``dataset_collection_ids``, then one ``tool`` step per entry of
    ``job_ids``. The dataset and collection ids are used as hids when wiring
    tool inputs to the outputs of earlier steps.

    :raises exceptions.RequestParameterInvalidException: a collection hid is
        not known to the history summary as a collection.
    :raises AssertionError: a job id is not connected to the history.
    :raises Exception: an output of an implicitly mapped job cannot be
        matched to one of its output collections.
    :returns: list of ``model.WorkflowStep``.
    """
    # Normalize every id argument to a (possibly empty) list of integers.
    job_ids = [int(raw_id) for raw_id in listify(job_ids)]
    dataset_ids = [int(raw_id) for raw_id in listify(dataset_ids)]
    dataset_collection_ids = [int(raw_id) for raw_id in listify(dataset_collection_ids)]

    # Looking jobs up through the summary is what (implicitly) restricts the
    # extraction to jobs associated with the history.
    summary = WorkflowSummary(trans, history)
    job_outputs = summary.jobs

    steps = []
    used_labels = set()
    hid_to_output_pair = {}

    # Input dataset steps
    for position, hid in enumerate(dataset_ids):
        input_step = model.WorkflowStep()
        input_step.type = "data_input"
        name = dataset_names[position] if dataset_names else "Input Dataset"
        # Only the first step with a given name gets it as its label.
        if name not in used_labels:
            input_step.label = name
            used_labels.add(name)
        input_step.tool_inputs = {"name": name}  # type:ignore[assignment]
        hid_to_output_pair[hid] = (input_step, "output")
        steps.append(input_step)

    # Input dataset collection steps
    for position, hid in enumerate(dataset_collection_ids):
        input_step = model.WorkflowStep()
        input_step.type = "data_collection_input"
        if hid not in summary.collection_types:
            raise exceptions.RequestParameterInvalidException(f"hid {hid} does not appear to be a collection")
        collection_type = summary.collection_types[hid]
        name = dataset_collection_names[position] if dataset_collection_names else "Input Dataset Collection"
        if name not in used_labels:
            input_step.label = name
            used_labels.add(name)
        input_step.tool_inputs = {"name": name, "collection_type": collection_type}  # type:ignore[assignment]
        hid_to_output_pair[hid] = (input_step, "output")
        steps.append(input_step)

    # Tool steps
    for job_id in job_ids:
        if job_id not in summary.job_id2representative_job:
            log.warning(f"job_id {job_id} not found in job_id2representative_job {summary.job_id2representative_job}")
            raise AssertionError("Attempt to create workflow with job not connected to current history")
        job = summary.job_id2representative_job[job_id]
        tool_inputs, associations = step_inputs(trans, job)

        tool_step = model.WorkflowStep()
        tool_step.type = "tool"
        tool_step.tool_id = job.tool_id
        tool_step.tool_version = job.tool_version
        tool_step.tool_inputs = tool_inputs

        is_implicit_map = job in summary.implicit_map_jobs

        # NOTE: A single pass is enough here since only an earlier job can
        #       be used as an input to a later job.
        for source_hid, input_name in associations:
            if is_implicit_map:
                # For mapped-over jobs, connect to the input collection
                # rather than to the individual element that was recorded.
                an_output_collection = job_outputs[job][0][1]
                input_collection = an_output_collection.find_implicit_input_collection(input_name)
                if input_collection:
                    source_hid = input_collection.hid
                else:
                    log.info(f"Cannot find implicit input collection for {input_name}")
            if source_hid not in hid_to_output_pair:
                continue
            step_input = tool_step.get_or_add_input(input_name)
            upstream_step, upstream_output = hid_to_output_pair[source_hid]
            connection = model.WorkflowStepConnection()
            connection.input_step_input = step_input
            # Should always be connected to an earlier step
            connection.output_step = upstream_step
            connection.output_name = upstream_output
        steps.append(tool_step)

        # Store created dataset hids
        for assoc in job.output_datasets + job.output_dataset_collection_instances:
            assoc_name = assoc.name
            if ToolOutputCollectionPart.is_named_collection_part_name(assoc_name) or assoc_name.startswith(
                "__new_primary_file"
            ):
                continue
            if is_implicit_map:
                hid = None
                for query_assoc_name, dataset_collection in job_outputs[job]:
                    if query_assoc_name == assoc_name or assoc_name.startswith(
                        f"__new_primary_file_{query_assoc_name}|"
                    ):
                        hid = summary.hid(dataset_collection)
                if hid is None:
                    template = (
                        "Failed to find matching implicit job - job id is %s, implicit pairs are %s, assoc_name is %s."
                    )
                    log.warning(template % (job.id, job_outputs[job], assoc_name))
                    raise Exception("Failed to extract job.")
            else:
                has_hid = assoc.dataset if hasattr(assoc, "dataset") else assoc.dataset_collection_instance
                hid = summary.hid(has_hid)
            if hid in hid_to_output_pair:
                log.warning(f"duplicate hid found in extract_steps [{hid}]")
            hid_to_output_pair[hid] = (tool_step, assoc_name)
    return steps
class FakeJob:
    """
    Placeholder job for a dataset that has no creating_job_associations;
    such datasets are treated as "input" datasets.
    """

    def __init__(self, dataset):
        self.is_fake = True
        self.id = f"fake_{dataset.id}"
        self.name = self._guess_name_from_dataset(dataset)

    def _guess_name_from_dataset(self, dataset) -> Optional[str]:
        """Derive a display name from where the dataset was copied from, if anywhere."""
        guessed_name = None
        if dataset.copied_from_history_dataset_association:
            guessed_name = "Import from History"
        elif dataset.copied_from_library_dataset_dataset_association:
            guessed_name = "Import from Library"
        return guessed_name
class DatasetCollectionCreationJob:
    """Placeholder job for a dataset collection with no usable creating job.

    It carries a ``disabled_why`` message explaining that the collection was
    created in a way not compatible with workflows.
    """

    def __init__(self, dataset_collection):
        self.name = "Dataset Collection Creation"
        self.disabled_why = "Dataset collection created in a way not compatible with workflows"
        self.id = f"fake_{dataset_collection.id}"
        self.is_fake = True
        self.from_jobs = None

    def set_jobs(self, jobs):
        """Record ``jobs`` on ``from_jobs``; ``jobs`` must not be None."""
        assert jobs is not None
        self.from_jobs = jobs
def summarize(trans, history=None):
    """Return mapping of job description to datasets for active items in
    supplied history - needed for building workflow from a history.

    Formerly called get_job_dict in the workflow web controller.

    :param trans: transaction object; used to look up a history when
        ``history`` is not supplied.
    :param history: history to summarize.
    :returns: tuple ``(jobs, warnings)`` taken from a ``WorkflowSummary``.
    """
    summary = WorkflowSummary(trans, history)
    return summary.jobs, summary.warnings
class WorkflowSummary:
    """Index the active contents of a history by the job that created them.

    After construction:

    - ``jobs`` maps a job (real, ``FakeJob`` or
      ``DatasetCollectionCreationJob``) to a list of
      ``(output name, history item)`` pairs.
    - ``job_id2representative_job`` maps a non-fake job id to its
      representative job.
    - ``implicit_map_jobs`` lists jobs recorded for collections that have an
      ``implicit_output_name``.
    - ``collection_types`` maps the hid of the original collection to its
      collection type.
    - ``hda_hid_in_history`` / ``hdca_hid_in_history`` map the id of the
      original (un-copied) item to the hid it has in this history.
    - ``warnings`` is a set of warning strings gathered while summarizing.
    """

    def __init__(self, trans, history):
        # Fall back to the history attached to the transaction.
        self.history = history or trans.get_history()
        self.warnings = set()
        self.jobs = {}
        self.job_id2representative_job = {}  # map a non-fake job id to its representative job
        self.implicit_map_jobs = []
        self.collection_types = {}
        self.hda_hid_in_history = {}
        self.hdca_hid_in_history = {}
        self.__summarize()

    def hid(self, object):
        """Return the hid to use for ``object`` within the summarized history.

        A hid recorded during summarization wins; otherwise the item's own
        hid is returned, with a warning when it belongs to another history.
        """
        if object.history_content_type == "dataset_collection":
            recorded_hids = self.hdca_hid_in_history
            unmapped_message = "extraction issue, using hdca hid from outside current history and unmapped"
        else:
            recorded_hids = self.hda_hid_in_history
            unmapped_message = "extraction issue, using hda hid from outside current history and unmapped"
        if object.id in recorded_hids:
            return recorded_hids[object.id]
        if object.history == self.history:
            return object.hid
        log.warning(unmapped_message)
        return object.hid

    def __summarize(self):
        # Walk every active item once; datasets and collections are
        # dispatched to their own handlers.
        for content in self.history.active_contents:
            self.__summarize_content(content)

    def __summarize_content(self, content):
        # Update internal state for one history item (either an HDA or an HDCA).
        is_collection = content.history_content_type == "dataset_collection"
        handler = self.__summarize_dataset_collection if is_collection else self.__summarize_dataset
        handler(content)

    def __summarize_dataset_collection(self, dataset_collection):
        hid_in_history = dataset_collection.hid
        # From here on work with the collection this one was copied from.
        dataset_collection = self.__original_hdca(dataset_collection)
        self.hdca_hid_in_history[dataset_collection.id] = hid_in_history
        original_hid = dataset_collection.hid
        self.collection_types[original_hid] = dataset_collection.collection.collection_type

        creating_assocs = dataset_collection.creating_job_associations
        if creating_assocs:
            # Use the "first" job to represent all mapped jobs.
            first_assoc = creating_assocs[0]
            representative_job = first_assoc.job
            already_tracked = (
                representative_job in self.jobs
                and self.jobs[representative_job][0][1].history_content_type != "dataset"
            )
            if already_tracked:
                self.jobs[representative_job].append((first_assoc.name, dataset_collection))
            else:
                # Either unseen, or so far only known through a plain
                # dataset: (re)start the entry with this collection.
                self.jobs[representative_job] = [(first_assoc.name, dataset_collection)]
                if dataset_collection.implicit_output_name:
                    self.implicit_map_jobs.append(representative_job)
            for assoc in creating_assocs:
                self.job_id2representative_job[assoc.job.id] = representative_job
            return

        if not dataset_collection.implicit_output_name:
            placeholder = DatasetCollectionCreationJob(dataset_collection)
            self.jobs[placeholder] = [(None, dataset_collection)]
            return

        # Implicit collection without creating job associations: fall back
        # to walking up from its first dataset element. This path may no
        # longer be needed due to the additional tracking with
        # creating_job_associations.
        # TODO: Optimize db call
        element = dataset_collection.collection.first_dataset_element
        if not element:
            # Got no dataset instance to walk back up to creating job.
            # TODO track this via tool request model
            placeholder = DatasetCollectionCreationJob(dataset_collection)
            self.jobs[placeholder] = [(None, dataset_collection)]
            return
        dataset_instance = element.hda
        if not self.__check_state(dataset_instance):
            # Only the state of this one instance is checked.
            return
        original_hda = self.__original_hda(dataset_instance)
        if not original_hda.creating_job_associations:
            log.warning(
                "An implicitly create output dataset collection doesn't have a creating_job_association, should not happen!"
            )
            placeholder = DatasetCollectionCreationJob(dataset_collection)
            self.jobs[placeholder] = [(None, dataset_collection)]
        for assoc in original_hda.creating_job_associations:
            job = assoc.job
            if job in self.jobs and self.jobs[job][0][1].history_content_type != "dataset":
                self.jobs[job].append((assoc.name, dataset_collection))
            else:
                self.jobs[job] = [(assoc.name, dataset_collection)]
                self.job_id2representative_job[job.id] = job
                self.implicit_map_jobs.append(job)

    def __summarize_dataset(self, dataset):
        if not self.__check_state(dataset):
            return
        hid_in_history = dataset.hid
        original_hda = self.__original_hda(dataset)
        self.hda_hid_in_history[original_hda.id] = hid_in_history

        creating_assocs = original_hda.creating_job_associations
        if not creating_assocs:
            # No creating job: represent the dataset with a fake job.
            self.jobs[FakeJob(dataset)] = [(None, dataset)]
        for assoc in creating_assocs:
            job = assoc.job
            if job not in self.jobs:
                self.jobs[job] = [(assoc.name, dataset)]
                self.job_id2representative_job[job.id] = job
            else:
                self.jobs[job].append((assoc.name, dataset))

    def __original_hdca(self, hdca):
        # Follow the copy chain back to the collection it started from.
        while source := hdca.copied_from_history_dataset_collection_association:
            hdca = source
        return hdca

    def __original_hda(self, hda):
        # If this hda was copied from another, the job that created the
        # original hda is the one we need, so follow the copy chain back.
        while source := hda.copied_from_history_dataset_association:
            hda = source
        return hda

    def __check_state(self, hda):
        # Returns the hda when it is usable, None (plus a recorded warning)
        # while it is still new, running or queued.
        # FIXME: Create "Dataset.is_finished"
        if hda.state not in ("new", "running", "queued"):
            return hda
        self.warnings.add(WARNING_SOME_DATASETS_NOT_READY)
        return None
def step_inputs(trans, job):
    """Return ``(tool_inputs, associations)`` for the tool run by ``job``.

    ``tool_inputs`` is the job's parameter state converted to strings by the
    tool; ``associations`` is what ``__cleanup_param_values`` collected while
    scrubbing data inputs out of that state.
    """
    app = trans.app
    tool = app.toolbox.get_tool(job.tool_id, tool_version=job.tool_version)
    # Errors are ignored on purpose: if a tool was updated and e.g. had a
    # text value changed to an integer, we don't want a traceback here.
    param_values = job.get_param_values(app, ignore_errors=True)
    # The cleanup mutates param_values, so it must run before serialization.
    associations = __cleanup_param_values(tool.inputs, param_values)
    return tool.params_to_strings(param_values, app), associations
def __cleanup_param_values(inputs, values):
    """
    Remove 'Data' values from `values`, along with metadata cruft,
    but track the associations.

    `values` is modified in place: every data / data collection parameter is
    set to None, the framework-injected "dbkey" entry and the
    "__job_resource" conditional are removed, and root-level keys that start
    with "<prefixed data parameter name>_" and are not themselves tool inputs
    are deleted.

    :param inputs: mapping of parameter name to tool parameter / group.
    :param values: parameter state matching `inputs`.
    :returns: list of ``(hid, prefixed parameter name)`` tuples, one per
        dataset found in a data or data collection parameter.
    """
    associations = []
    # dbkey is pushed in by the framework
    if "dbkey" in values:
        del values["dbkey"]
    root_values = values
    root_input_keys = inputs.keys()

    # Recursively clean data inputs and dynamic selects
    def cleanup(prefix, inputs, values):
        for key, param in inputs.items():
            if isinstance(param, (DataToolParameter, DataCollectionToolParameter)):
                items = values[key]
                values[key] = None
                # HACK: Nested associations are not yet working, but we
                #       still need to clean them up so we can serialize
                for item in listify(items):
                    if isinstance(item, model.DatasetCollectionElement):
                        item = item.first_dataset_instance()
                    if item:  # this is false for a non-set optional dataset
                        associations.append((item.hid, prefix + key))
                # Cleanup the other deprecated metadata associated with
                # datasets as well. Worse, for nested datasets all the
                # metadata is being pushed into the root. FIXME: MUST REMOVE SOON
                metadata_prefix = f"{prefix + key}_"
                # Iterate over a snapshot: deleting from a dict while
                # iterating its live key view raises RuntimeError.
                for root_key in list(root_values):
                    if root_key not in root_input_keys and root_key.startswith(metadata_prefix):
                        del root_values[root_key]
            elif isinstance(param, Repeat):
                if key in values:
                    for rep_values in values[key]:
                        rep_index = rep_values["__index__"]
                        cleanup("%s%s_%d|" % (prefix, key, rep_index), param.inputs, rep_values)
            elif isinstance(param, Conditional):
                # Scrub dynamic resource related parameters from workflows,
                # they cause problems and the workflow probably should include
                # their state in workflow encoding.
                if param.name == "__job_resource":
                    if param.name in values:
                        del values[param.name]
                    # Skip only this parameter; returning here would leave
                    # any inputs that follow it at this level uncleaned.
                    continue
                if param.name in values:
                    group_values = values[param.name]
                    current_case = group_values["__current_case__"]
                    cleanup(f"{prefix}{key}|", param.cases[current_case].inputs, group_values)
            elif isinstance(param, Section):
                if param.name in values:
                    cleanup(f"{prefix}{key}|", param.inputs, values[param.name])

    cleanup("", inputs, values)
    return associations
# Names exported by `from galaxy.workflow.extract import *`.
__all__ = ("summarize", "extract_workflow")