Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.workflow.extract
"""This module contains functionality to aid in extracting workflows from
histories.
"""
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import (
Any,
cast,
Literal,
Optional,
)
from galaxy import (
exceptions,
model,
)
from galaxy.managers.context import ProvidesHistoryContext
from galaxy.managers.jobs import JobManager
from galaxy.model import (
History,
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
HistoryItem,
ImplicitCollectionJobs,
Job,
StoredWorkflow,
User,
WorkflowStep,
)
from galaxy.model.base import ensure_object_added_to_session
from galaxy.tool_util.parser import ToolOutputCollectionPart
from galaxy.tools.parameters.basic import (
DataCollectionToolParameter,
DataToolParameter,
)
from galaxy.tools.parameters.grouping import (
Conditional,
Repeat,
Section,
)
from galaxy.util import listify
from .steps import (
attach_ordered_steps,
order_workflow_steps_with_levels,
)
# Type alias for tool input parameter values (param name -> string value)
ToolInputs = dict[str, Any]
# Type alias for data input associations (hid, input_name) pairs linking
# history items to their corresponding tool input parameters
DataInputAssociations = list[tuple[int, str]]
log = logging.getLogger(__name__)
WARNING_SOME_DATASETS_NOT_READY = "Some datasets still queued or running were ignored"
def _skip_output_assoc_name(name: str) -> bool:
"""True for job-output-association names that aren't workflow-visible
outputs (named-collection-part placeholders and discovered-primary-file
rows). Both extraction paths skip these."""
return ToolOutputCollectionPart.is_named_collection_part_name(name) or name.startswith("__new_primary_file")
def _connect(step: WorkflowStep, input_name: str, source: tuple[WorkflowStep, str]) -> None:
"""Wire ``step``'s ``input_name`` to ``source`` (output_step, output_name).
The source is always an earlier step - a job only consumes outputs of jobs
that ran before it. Shared by both extraction paths."""
source_step, source_name = source
conn = model.WorkflowStepConnection()
conn.input_step_input = step.get_or_add_input(input_name)
conn.output_step = source_step
conn.output_name = source_name
[docs]
def extract_workflow(
trans: ProvidesHistoryContext,
user: User,
history: Optional[History] = None,
job_ids: Optional[list[int]] = None,
dataset_ids: Optional[list[int]] = None,
dataset_collection_ids: Optional[list[int]] = None,
workflow_name: Optional[str] = None,
dataset_names: Optional[list[str]] = None,
dataset_collection_names: Optional[list[str]] = None,
) -> StoredWorkflow:
steps = extract_steps(
trans,
history=history,
job_ids=job_ids,
dataset_ids=dataset_ids,
dataset_collection_ids=dataset_collection_ids,
dataset_names=dataset_names,
dataset_collection_names=dataset_collection_names,
)
return _finalize_workflow(trans, user, workflow_name, steps)
def _finalize_workflow(
trans: ProvidesHistoryContext,
user: User,
workflow_name: Optional[str],
steps: list[WorkflowStep],
) -> StoredWorkflow:
workflow = model.Workflow()
workflow.name = workflow_name
workflow.steps = steps
attach_ordered_steps(workflow)
levorder = order_workflow_steps_with_levels(steps)
base_pos = 10
for i, steps_at_level in enumerate(levorder):
for j, index in enumerate(steps_at_level):
step = steps[index]
step.position = dict(top=(base_pos + 120 * j), left=(base_pos + 220 * i))
stored = model.StoredWorkflow()
stored.user = user
stored.name = workflow_name
workflow.stored_workflow = stored
stored.latest_workflow = workflow
trans.sa_session.add(stored)
ensure_object_added_to_session(workflow, session=trans.sa_session)
trans.sa_session.commit()
return stored
def extract_steps(
trans: ProvidesHistoryContext,
history: Optional[History] = None,
job_ids: Optional[list[int]] = None,
dataset_ids: Optional[list[int]] = None,
dataset_collection_ids: Optional[list[int]] = None,
dataset_names: Optional[list[str]] = None,
dataset_collection_names: Optional[list[str]] = None,
) -> list[WorkflowStep]:
# Ensure job_ids and dataset_ids are lists (possibly empty)
job_ids = listify(job_ids)
dataset_ids = listify(dataset_ids)
dataset_collection_ids = listify(dataset_collection_ids)
# Convert both sets of ids to integers
job_ids = [int(_) for _ in job_ids]
dataset_ids = [int(_) for _ in dataset_ids]
dataset_collection_ids = [int(_) for _ in dataset_collection_ids]
# Find each job, for security we (implicitly) check that they are
# associated with a job in the current history.
summary = WorkflowSummary(trans, history)
jobs = summary.jobs
steps = []
step_labels = set()
hid_to_output_pair = {}
# Input dataset steps
for i, input_hid in enumerate(dataset_ids):
step = model.WorkflowStep()
step.type = "data_input"
if dataset_names:
name = dataset_names[i]
else:
name = "Input Dataset"
if name not in step_labels:
step.label = name
step_labels.add(name)
step.tool_inputs = dict(name=name)
hid_to_output_pair[input_hid] = (step, "output")
steps.append(step)
for i, input_hid in enumerate(dataset_collection_ids):
step = model.WorkflowStep()
step.type = "data_collection_input"
if input_hid not in summary.collection_types:
raise exceptions.RequestParameterInvalidException(f"hid {input_hid} does not appear to be a collection")
collection_type = summary.collection_types[input_hid]
if dataset_collection_names:
name = dataset_collection_names[i]
else:
name = "Input Dataset Collection"
if name not in step_labels:
step.label = name
step_labels.add(name)
step.tool_inputs = dict(name=name, collection_type=collection_type)
hid_to_output_pair[input_hid] = (step, "output")
steps.append(step)
# Tool steps
for job_id in job_ids:
if job_id not in summary.job_id2representative_job:
log.warning(f"job_id {job_id} not found in job_id2representative_job {summary.job_id2representative_job}")
raise AssertionError("Attempt to create workflow with job not connected to current history")
job = summary.job_id2representative_job[job_id]
tool_inputs, associations = step_inputs(trans, job)
step = model.WorkflowStep()
step.type = "tool"
step.tool_id = job.tool_id
step.tool_version = job.tool_version
step.tool_inputs = tool_inputs
if job.dynamic_tool_id:
step.dynamic_tool_id = job.dynamic_tool_id
# NOTE: We shouldn't need to do two passes here since only
# an earlier job can be used as an input to a later
# job.
for other_hid, input_name in associations:
if job in summary.implicit_map_jobs:
an_implicit_output_collection = cast(HistoryDatasetCollectionAssociation, jobs[job][0][1])
input_collection = an_implicit_output_collection.find_implicit_input_collection(input_name)
if input_collection:
other_hid = input_collection.hid
else:
log.info(f"Cannot find implicit input collection for {input_name}")
if other_hid in hid_to_output_pair:
_connect(step, input_name, hid_to_output_pair[other_hid])
steps.append(step)
# Store created dataset hids
for assoc in job.output_datasets + job.output_dataset_collection_instances:
assoc_name = assoc.name
if _skip_output_assoc_name(assoc_name):
continue
if job in summary.implicit_map_jobs:
hid: Optional[int] = None
for implicit_pair in jobs[job]:
query_assoc_name, dataset_collection = implicit_pair
if query_assoc_name == assoc_name or assoc_name.startswith(
f"__new_primary_file_{query_assoc_name}|"
):
hid = summary.hid(dataset_collection)
if hid is None:
template = (
"Failed to find matching implicit job - job id is %s, implicit pairs are %s, assoc_name is %s."
)
message = template % (job.id, jobs[job], assoc_name)
log.warning(message)
raise Exception("Failed to extract job.")
else:
if hasattr(assoc, "dataset"):
has_hid = assoc.dataset
else:
has_hid = assoc.dataset_collection_instance
hid = summary.hid(has_hid)
if hid in hid_to_output_pair:
log.warning(f"duplicate hid found in extract_steps [{hid}]")
hid_to_output_pair[hid] = (step, assoc.name)
return steps
class FakeJob:
"""
Fake job object for datasets that have no creating_job_associations,
they will be treated as "input" datasets.
"""
def __init__(self, dataset: HistoryDatasetAssociation) -> None:
self.is_fake = True
self.id = f"fake_{dataset.id}"
self.name = self._guess_name_from_dataset(dataset)
def _guess_name_from_dataset(self, dataset: HistoryDatasetAssociation) -> Optional[str]:
"""Tries to guess the name of the fake job from the dataset associations."""
if dataset.copied_from_history_dataset_association:
return "Import from History"
if dataset.copied_from_library_dataset_dataset_association:
return "Import from Library"
return None
class DatasetCollectionCreationJob:
def __init__(self, dataset_collection: HistoryDatasetCollectionAssociation) -> None:
self.is_fake = True
self.id = f"fake_{dataset_collection.id}"
self.from_jobs: Optional[list[Job]] = None
self.name = "Dataset Collection Creation"
self.disabled_why = "Dataset collection created in a way not compatible with workflows"
def set_jobs(self, jobs: list[Job]) -> None:
assert jobs is not None
self.from_jobs = jobs
[docs]
def summarize(
trans: ProvidesHistoryContext, history: Optional[History] = None
) -> tuple[dict[Any, list[tuple[Optional[str], HistoryItem]]], set[str]]:
"""Return mapping of job description to datasets for active items in
supplied history - needed for building workflow from a history.
Formerly call get_job_dict in workflow web controller.
"""
summary = WorkflowSummary(trans, history)
return summary.jobs, summary.warnings
class BaseWorkflowSummary:
"""Shared helpers for workflow extraction summaries (HID-based and ID-based)."""
def __init__(self, trans: ProvidesHistoryContext) -> None:
self.trans = trans
self.warnings: set[str] = set()
def _check_state(self, hda: HistoryDatasetAssociation) -> Optional[HistoryDatasetAssociation]:
# FIXME: Create "Dataset.is_finished"
if hda.state in ("new", "running", "queued"):
self.warnings.add(WARNING_SOME_DATASETS_NOT_READY)
return None
return hda
class WorkflowSummary(BaseWorkflowSummary):
def __init__(self, trans: ProvidesHistoryContext, history: Optional[History]) -> None:
super().__init__(trans)
if not history:
history = trans.history
assert history is not None
self.history: History = history
self.jobs: dict[Any, list[tuple[Optional[str], HistoryItem]]] = {}
self.job_id2representative_job: dict[int, Job] = {} # map a non-fake job id to its representative job
self.implicit_map_jobs: list[Job] = []
self.collection_types: dict[int, str] = {}
self.hda_hid_in_history: dict[int, int] = {}
self.hdca_hid_in_history: dict[int, int] = {}
self.__summarize()
def hid(self, content: HistoryItem) -> int:
if content.history_content_type == "dataset_collection":
if content.id in self.hdca_hid_in_history:
return self.hdca_hid_in_history[content.id]
elif content.history == self.history:
assert content.hid is not None, f"HDCA {content.id} in history has no hid"
return content.hid
else:
log.warning("extraction issue, using hdca hid from outside current history and unmapped")
assert content.hid is not None, f"HDCA {content.id} from external history has no hid"
return content.hid
else:
if content.id in self.hda_hid_in_history:
return self.hda_hid_in_history[content.id]
elif content.history == self.history:
assert content.hid is not None, f"HDA {content.id} in history has no hid"
return content.hid
else:
log.warning("extraction issue, using hda hid from outside current history and unmapped")
assert content.hid is not None, f"HDA {content.id} from external history has no hid"
return content.hid
def __summarize(self) -> None:
# Make a first pass handle all singleton jobs, input dataset and dataset collections
# just grab the implicitly mapped jobs and handle in second pass. Second pass is
# needed because cannot allow selection of individual datasets from an implicit
# mapping during extraction - you get the collection or nothing.
for content in self.history.visible_contents:
self.__summarize_content(content)
def __summarize_content(self, content: HistoryItem) -> None:
# Update internal state for history content (either an HDA or
# an HDCA).
if content.history_content_type == "dataset_collection":
self.__summarize_dataset_collection(cast(HistoryDatasetCollectionAssociation, content))
else:
self.__summarize_dataset(cast(HistoryDatasetAssociation, content))
def __summarize_dataset_collection(self, dataset_collection: HistoryDatasetCollectionAssociation) -> None:
hid_in_history = dataset_collection.hid
assert hid_in_history is not None, f"HDCA {dataset_collection.id} has no hid"
dataset_collection = _original_hdca(dataset_collection)
self.hdca_hid_in_history[dataset_collection.id] = hid_in_history
hid = dataset_collection.hid
assert hid is not None, f"Original HDCA {dataset_collection.id} has no hid"
self.collection_types[hid] = dataset_collection.collection.collection_type
if cja := dataset_collection.creating_job_associations:
# Use the "first" job to represent all mapped jobs.
representative_assoc = cja[0]
representative_job = representative_assoc.job
if (
representative_job not in self.jobs
or self.jobs[representative_job][0][1].history_content_type == "dataset"
):
self.jobs[representative_job] = [(representative_assoc.name, dataset_collection)]
if dataset_collection.implicit_output_name:
self.implicit_map_jobs.append(representative_job)
else:
self.jobs[representative_job].append((representative_assoc.name, dataset_collection))
for cja_assoc in cja:
cja_job = cja_assoc.job
self.job_id2representative_job[cja_job.id] = representative_job
# Fallback for implicit output collections lacking creating_job_associations
# (e.g. reached via Sentry GALAXY-MAIN-121W / issue #22359). Trace via a leaf
# HDA's creating job instead.
elif dataset_collection.implicit_output_name:
# TODO: Optimize db call
element = dataset_collection.collection.first_dataset_element
dataset_instance = element.hda if element else None
if not dataset_instance:
# Got no dataset instance to walk back up to creating job
# (empty collection, or leaf element is not an HDA - e.g. LDDA).
# TODO track this via tool request model
self.jobs[DatasetCollectionCreationJob(dataset_collection)] = [(None, dataset_collection)]
return
if not self._check_state(dataset_instance):
# Just checking the state of one instance, don't need more but
# makes me wonder if even need this check at all?
return
original_hda = _original_hda(dataset_instance)
if not original_hda.creating_job_associations:
log.warning(
"An implicitly create output dataset collection doesn't have a creating_job_association, should not happen!"
)
self.jobs[DatasetCollectionCreationJob(dataset_collection)] = [(None, dataset_collection)]
for assoc in original_hda.creating_job_associations:
job = assoc.job
if job not in self.jobs or self.jobs[job][0][1].history_content_type == "dataset":
self.jobs[job] = [(assoc.name, dataset_collection)]
self.job_id2representative_job[job.id] = job
self.implicit_map_jobs.append(job)
else:
self.jobs[job].append((assoc.name, dataset_collection))
else:
self.jobs[DatasetCollectionCreationJob(dataset_collection)] = [(None, dataset_collection)]
def __summarize_dataset(self, dataset: HistoryDatasetAssociation) -> None:
if not self._check_state(dataset):
return
hid_in_history = dataset.hid
assert hid_in_history is not None
original_hda = _original_hda(dataset)
self.hda_hid_in_history[original_hda.id] = hid_in_history
if not original_hda.creating_job_associations:
self.jobs[FakeJob(dataset)] = [(None, dataset)]
for assoc in original_hda.creating_job_associations:
job = assoc.job
if job in self.jobs:
self.jobs[job].append((assoc.name, dataset))
else:
self.jobs[job] = [(assoc.name, dataset)]
self.job_id2representative_job[job.id] = job
def step_inputs(trans: ProvidesHistoryContext, job: Job) -> tuple[ToolInputs, DataInputAssociations]:
tool = trans.app.toolbox.tool_for_job(job, user=trans.user)
assert tool is not None, f"Tool {job.tool_id} (version {job.tool_version}) not found"
param_values = tool.get_param_values(
job, ignore_errors=True
) # If a tool was updated and e.g. had a text value changed to an integer, we don't want a traceback here
associations = __cleanup_param_values(tool.inputs, param_values)
tool_inputs = tool.params_to_strings(param_values, trans.app)
return tool_inputs, associations
def _walk_data_param_tree(
inputs: ToolInputs,
values: ToolInputs,
leaf_handler: Callable[[Any, Any, str], None],
) -> None:
"""Walk a tool input tree, invoking ``leaf_handler`` once per
Data/DataCollection leaf with ``(input, value, full_key)``.
Clears the leaf's value in place and removes the deprecated metadata
cruft (``<key>_*`` siblings of the formal input keys) the framework
pushes into the root values dict. The cruft cleanup is shared because
both extraction variants need it; only the per-leaf association
emission differs.
"""
if "dbkey" in values:
del values["dbkey"]
root_values = values
root_input_keys = inputs.keys()
def walk(prefix: str, inputs: ToolInputs, values: ToolInputs) -> None:
for key, input in inputs.items():
if isinstance(input, (DataToolParameter, DataCollectionToolParameter)):
leaf_handler(input, values[key], prefix + key)
values[key] = None
# FIXME: Nested data params leak deprecated metadata into the
# root values dict; scrub anything starting with `<key>_` that
# isn't itself a formal input.
cruft_prefix = f"{prefix + key}_"
for k in list(root_values.keys()):
if k not in root_input_keys and k.startswith(cruft_prefix):
del root_values[k]
elif isinstance(input, Repeat):
if key in values:
group_values = values[key]
for i, rep_values in enumerate(group_values):
rep_index = rep_values["__index__"]
walk(f"{prefix}{key}_{rep_index}|", input.inputs, group_values[i])
elif isinstance(input, Conditional):
# __job_resource is a runtime-only group; strip and stop —
# workflow encoding shouldn't carry resource selections.
if input.name == "__job_resource":
if input.name in values:
del values[input.name]
return
if input.name in values:
group_values = values[input.name]
current_case = group_values["__current_case__"]
walk(f"{prefix}{key}|", input.cases[current_case].inputs, group_values)
elif isinstance(input, Section):
if input.name in values:
walk(f"{prefix}{key}|", input.inputs, values[input.name])
walk("", inputs, values)
def __cleanup_param_values(inputs: ToolInputs, values: ToolInputs) -> DataInputAssociations:
"""HID-keyed Data-leaf scrub: emit ``(hid, key)`` associations."""
associations: DataInputAssociations = []
def emit(input, value, key):
for item in listify(value):
if isinstance(item, model.DatasetCollectionElement):
item = item.first_dataset_instance()
if item: # false for a non-set optional dataset
associations.append((item.hid, key))
_walk_data_param_tree(inputs, values, emit)
return associations
[docs]
def extract_workflow_by_ids(
trans: ProvidesHistoryContext,
user: User,
workflow_name: str,
job_manager: JobManager,
job_ids: Optional[list[int]] = None,
implicit_collection_jobs_ids: Optional[list[int]] = None,
hda_ids: Optional[list[int]] = None,
hdca_ids: Optional[list[int]] = None,
dataset_names: Optional[list[str]] = None,
dataset_collection_names: Optional[list[str]] = None,
output_labels: Optional[list[Any]] = None,
) -> StoredWorkflow:
"""ID-based variant of :func:`extract_workflow`."""
steps = extract_steps_by_ids(
trans,
job_manager=job_manager,
job_ids=job_ids,
implicit_collection_jobs_ids=implicit_collection_jobs_ids,
hda_ids=hda_ids,
hdca_ids=hdca_ids,
dataset_names=dataset_names,
dataset_collection_names=dataset_collection_names,
output_labels=output_labels,
)
return _finalize_workflow(trans, user, workflow_name, steps)
IdKey = tuple[Literal["dataset", "collection"], int]
IdAssociations = list[tuple[IdKey, str]]
OutputLabelKind = Literal["hda", "hdca"]
OutputLabelKey = tuple[OutputLabelKind, int]
OutputStepKey = tuple[Literal["job", "icj"], int, str]
@dataclass(frozen=True)
class OutputLabelTarget:
key: OutputLabelKey
step_key: OutputStepKey
output_name: str
[docs]
def output_label_to_id_key(kind: OutputLabelKind, content_id: int) -> IdKey:
if kind == "hda":
return ("dataset", content_id)
return ("collection", content_id)
[docs]
def normalize_output_label_key(trans: ProvidesHistoryContext, kind: OutputLabelKind, content_id: int) -> OutputLabelKey:
"""Normalize a visible HDA/HDCA output id to the original id used by extraction wiring."""
user = getattr(trans, "user", None)
if kind == "hda":
hda = trans.app.hda_manager.get_accessible(content_id, user)
return ("hda", _original_hda(hda).id)
hdca = trans.app.dataset_collection_manager.get_dataset_collection_instance(trans, "history", content_id)
return ("hdca", _original_hdca(hdca).id)
[docs]
def collect_output_label_targets(
trans: ProvidesHistoryContext,
job_manager: Optional[JobManager] = None,
job_ids: Optional[list[int]] = None,
implicit_collection_jobs_ids: Optional[list[int]] = None,
) -> dict[OutputLabelKey, OutputLabelTarget]:
"""Collect concrete outputs produced by the selected extraction steps."""
job_ids = list(job_ids or [])
implicit_collection_jobs_ids = list(implicit_collection_jobs_ids or [])
targets: dict[OutputLabelKey, OutputLabelTarget] = {}
for job_id in job_ids:
assert job_manager is not None, "job_manager required when job_ids supplied"
job = job_manager.get_accessible_job(trans, job_id)
for hda_assoc in job.output_datasets:
output_name = hda_assoc.name
if _skip_output_assoc_name(output_name):
continue
original_hda = _original_hda(hda_assoc.dataset)
key: OutputLabelKey = ("hda", original_hda.id)
targets[key] = OutputLabelTarget(key=key, step_key=("job", job.id, output_name), output_name=output_name)
for hdca_assoc in job.output_dataset_collection_instances:
output_name = hdca_assoc.name
original_hdca = _original_hdca(hdca_assoc.dataset_collection_instance)
key = ("hdca", original_hdca.id)
targets[key] = OutputLabelTarget(key=key, step_key=("job", job.id, output_name), output_name=output_name)
sa_session = trans.sa_session
for icj_id in implicit_collection_jobs_ids:
icj = sa_session.get(ImplicitCollectionJobs, icj_id)
if icj is None:
continue
seen_output_names: set[str] = set()
for output_hdca in icj.output_dataset_collection_instances:
output_name = output_hdca.implicit_output_name
if not output_name or output_name in seen_output_names:
continue
seen_output_names.add(output_name)
original_hdca = _original_hdca(output_hdca)
key = ("hdca", original_hdca.id)
targets[key] = OutputLabelTarget(key=key, step_key=("icj", icj.id, output_name), output_name=output_name)
return targets
[docs]
def extract_steps_by_ids(
trans: ProvidesHistoryContext,
job_manager: Optional[JobManager] = None,
job_ids: Optional[list[int]] = None,
implicit_collection_jobs_ids: Optional[list[int]] = None,
hda_ids: Optional[list[int]] = None,
hdca_ids: Optional[list[int]] = None,
dataset_names: Optional[list[str]] = None,
dataset_collection_names: Optional[list[str]] = None,
output_labels: Optional[list[Any]] = None,
) -> list[WorkflowStep]:
"""ID-based variant of :func:`extract_steps`.
Inputs are decoded DB ids; each is fetched and access-checked against the
current user via the appropriate manager. Connections are keyed by
``(content_type, db_id)`` of the resolved *original* HDA/HDCA so that
copied datasets and cross-history items map deterministically.
Mapped (map-over) steps are passed as ``implicit_collection_jobs_ids``;
each ICJ becomes a single workflow step whose inputs are wired to the
pre-map input HDCA(s) via ``ImplicitlyCreatedDatasetCollectionInput``.
``job_manager`` may be omitted only when ``job_ids`` is empty (kept
Optional for unit-test ergonomics).
"""
job_ids = list(job_ids or [])
implicit_collection_jobs_ids = list(implicit_collection_jobs_ids or [])
hda_ids = list(hda_ids or [])
hdca_ids = list(hdca_ids or [])
output_labels = list(output_labels or [])
user = getattr(trans, "user", None)
sa_session = trans.sa_session
hda_manager = trans.app.hda_manager
dataset_collection_manager = trans.app.dataset_collection_manager
steps: list[WorkflowStep] = []
step_labels: set[str] = set()
id_to_output_pair: dict[IdKey, tuple[WorkflowStep, str]] = {}
for i, hda_id in enumerate(hda_ids):
hda = hda_manager.get_accessible(hda_id, user)
step = model.WorkflowStep()
step.type = "data_input"
name = dataset_names[i] if dataset_names else "Input Dataset"
if name not in step_labels:
step.label = name
step_labels.add(name)
step.tool_inputs = dict(name=name)
steps.append(step)
original = _original_hda(hda)
id_to_output_pair[("dataset", original.id)] = (step, "output")
for i, hdca_id in enumerate(hdca_ids):
hdca = dataset_collection_manager.get_dataset_collection_instance(trans, "history", hdca_id)
step = model.WorkflowStep()
step.type = "data_collection_input"
name = dataset_collection_names[i] if dataset_collection_names else "Input Dataset Collection"
if name not in step_labels:
step.label = name
step_labels.add(name)
step.tool_inputs = dict(name=name, collection_type=hdca.collection.collection_type)
steps.append(step)
original_hdca = _original_hdca(hdca)
id_to_output_pair[("collection", original_hdca.id)] = (step, "output")
# Build the list of work items: each tuple is (representative_job,
# output_hdcas). For plain jobs output_hdcas is empty; for ICJs it
# contains the ICJ's output HDCAs (used both for access checks and to
# drive input/output wiring without inferring map/over from job state).
# Service-layer validator ensures no job in job_ids has an ICJ
# association, so this branch handles only true plain jobs.
work_items: list[tuple[Job, list[HistoryDatasetCollectionAssociation]]] = []
for job_id in job_ids:
assert job_manager is not None, "job_manager required when job_ids supplied"
job = job_manager.get_accessible_job(trans, job_id)
work_items.append((job, []))
# FIXME: representative-job param read is the only remaining HID-style
# inference here. Swap step_inputs_by_id for a Job.tool_state /
# ToolRequest.request_state reader once that exists; see
# docs/research/Problem - YAML Tool Post-Hoc State Divergence.md.
for icj_id in implicit_collection_jobs_ids:
# Service-layer validator already checked existence, populated_state,
# output-HDCA presence, and per-HDCA accessibility.
icj = sa_session.get(ImplicitCollectionJobs, icj_id)
assert icj is not None, f"ImplicitCollectionJobs {icj_id} not found"
work_items.append((icj.representative_job, icj.output_dataset_collection_instances))
# Job.id is monotonically assigned at submission, so sorting by it
# produces dependency order: a downstream job always has a larger id
# than the jobs whose outputs it consumes.
work_items.sort(key=lambda item: item[0].id)
for job, output_hdcas in work_items:
tool_inputs, associations = step_inputs_by_id(trans, job)
step = model.WorkflowStep()
step.type = "tool"
step.tool_id = job.tool_id
step.tool_version = job.tool_version
step.tool_inputs = tool_inputs
mapped_inputs: dict[str, HistoryDatasetCollectionAssociation] = {}
if output_hdcas:
for icol in output_hdcas[0].implicit_input_collections:
if icol.name and icol.input_dataset_collection is not None:
mapped_inputs[icol.name] = _original_hdca(icol.input_dataset_collection)
for key, input_name in associations:
if input_name in mapped_inputs:
key = ("collection", mapped_inputs[input_name].id)
if key in id_to_output_pair:
_connect(step, input_name, id_to_output_pair[key])
steps.append(step)
if output_hdcas:
seen_names: dict[str, HistoryDatasetCollectionAssociation] = {}
for output_hdca in output_hdcas:
output_name = output_hdca.implicit_output_name
if output_name and output_name not in seen_names:
seen_names[output_name] = output_hdca
for output_name, output_hdca in seen_names.items():
original_output = _original_hdca(output_hdca)
id_to_output_pair[("collection", original_output.id)] = (step, output_name)
else:
for hda_assoc in job.output_datasets:
hda_assoc_name = hda_assoc.name
if _skip_output_assoc_name(hda_assoc_name):
continue
original_hda = _original_hda(hda_assoc.dataset)
id_to_output_pair[("dataset", original_hda.id)] = (step, hda_assoc_name)
for hdca_assoc in job.output_dataset_collection_instances:
original_hdca = _original_hdca(hdca_assoc.dataset_collection_instance)
id_to_output_pair[("collection", original_hdca.id)] = (step, hdca_assoc.name)
for output_label in output_labels:
kind = output_label.kind
content_id = output_label.id
label = output_label.label
normalized_kind, normalized_content_id = normalize_output_label_key(trans, kind, content_id)
id_key = output_label_to_id_key(normalized_kind, normalized_content_id)
output_pair = id_to_output_pair.get(id_key)
if output_pair is None:
raise exceptions.RequestParameterInvalidException(
f"output_labels includes {kind} id {content_id} that was not produced by a selected extraction step"
)
step, output_name = output_pair
step.create_or_update_workflow_output(output_name=output_name, label=label, uuid=None)
return steps
def step_inputs_by_id(trans: ProvidesHistoryContext, job: Job) -> tuple[ToolInputs, IdAssociations]:
"""ID-based variant of :func:`step_inputs`.
Returns associations keyed by ``(content_type, db_id)`` tuples (against
the *original* HDA/HDCA after walking ``copied_from_*``). Collection
and DCE inputs come from ``Job.input_dataset_collections`` /
``Job.input_dataset_collection_elements`` directly rather than from the
param-value walk, which avoids the HID path's flattening of HDCAs to
leaf HDAs and prevents duplicate emission for DCE-as-data-param.
"""
tool = trans.app.toolbox.get_tool(job.tool_id, tool_version=job.tool_version)
assert tool is not None, f"Tool {job.tool_id} (version {job.tool_version}) not found"
param_values = tool.get_param_values(job, ignore_errors=True)
associations: IdAssociations = __cleanup_param_values_by_id(tool.inputs, param_values)
for assoc in job.input_dataset_collections:
original_hdca = _original_hdca(assoc.dataset_collection)
associations.append((("collection", original_hdca.id), assoc.name))
for elem_assoc in job.input_dataset_collection_elements:
dce = elem_assoc.dataset_collection_element
leaf = dce.hda or dce.first_dataset_instance()
if not isinstance(leaf, HistoryDatasetAssociation):
continue
original_hda = _original_hda(leaf)
associations.append((("dataset", original_hda.id), elem_assoc.name))
tool_inputs = tool.params_to_strings(param_values, trans.app)
return tool_inputs, associations
def _original_hda(hda: HistoryDatasetAssociation) -> HistoryDatasetAssociation:
while hda.copied_from_history_dataset_association:
hda = hda.copied_from_history_dataset_association
return hda
def _original_hdca(hdca: HistoryDatasetCollectionAssociation) -> HistoryDatasetCollectionAssociation:
while hdca.copied_from_history_dataset_collection_association:
hdca = hdca.copied_from_history_dataset_collection_association
return hdca
def __cleanup_param_values_by_id(inputs: ToolInputs, values: ToolInputs) -> IdAssociations:
"""ID-keyed Data-leaf scrub.
HDA leaves emit ``("dataset", _original_hda(hda).id)``. DCE values and
``DataCollectionToolParameter`` leaves are scrubbed but emit nothing —
collection / DCE inputs are appended in :func:`step_inputs_by_id` from
typed DB rows so HDCAs aren't lost to ``first_dataset_instance()``
flattening and DCEs aren't double-emitted alongside their typed
``input_dataset_collection_elements`` row.
"""
associations: IdAssociations = []
def emit(input, value, key):
if isinstance(input, DataCollectionToolParameter):
return
for item in listify(value):
if isinstance(item, model.DatasetCollectionElement):
# Covered by job.input_dataset_collection_elements; skip to
# avoid duplicate connections.
continue
if isinstance(item, HistoryDatasetAssociation):
original = _original_hda(item)
associations.append((("dataset", original.id), key))
_walk_data_param_tree(inputs, values, emit)
return associations
__all__ = (
"collect_output_label_targets",
"summarize",
"extract_workflow",
"extract_workflow_by_ids",
"extract_steps_by_ids",
"normalize_output_label_key",
"output_label_to_id_key",
)