Source code for galaxy.managers.jobs

import json
import logging
from datetime import (
    date,
    datetime,
)
from pathlib import Path
from typing import (
    Any,
    cast,
    Dict,
    Iterable,
    List,
    Optional,
    Set,
    Union,
)

import sqlalchemy
from boltons.iterutils import remap
from pydantic import (
    BaseModel,
    Field,
)
from sqlalchemy import (
    and_,
    exists,
    false,
    func,
    null,
    or_,
    true,
)
from sqlalchemy.orm import aliased
from sqlalchemy.sql import select
from typing_extensions import TypedDict

from galaxy import model
from galaxy.exceptions import (
    ConfigDoesNotAllowException,
    ItemAccessibilityException,
    ObjectNotFound,
    RequestParameterInvalidException,
    RequestParameterMissingException,
)
from galaxy.job_metrics import (
    RawMetric,
    Safety,
)
from galaxy.managers.collections import DatasetCollectionManager
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.datasets import DatasetManager
from galaxy.managers.hdas import HDAManager
from galaxy.managers.lddas import LDDAManager
from galaxy.model import (
    ImplicitCollectionJobs,
    ImplicitCollectionJobsJobAssociation,
    Job,
    JobMetricNumeric,
    JobParameter,
    User,
    Workflow,
    WorkflowInvocation,
    WorkflowInvocationStep,
    WorkflowStep,
    YIELD_PER_ROWS,
)
from galaxy.model.index_filter_util import (
    raw_text_column_filter,
    text_column_filter,
)
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.schema.schema import (
    JobIndexQueryPayload,
    JobIndexSortByEnum,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.structured_app import StructuredApp
from galaxy.tools._types import (
    ToolStateDumpedToJsonInternalT,
    ToolStateJobInstancePopulatedT,
)
from galaxy.util import (
    defaultdict,
    ExecutionTimer,
    listify,
    string_as_bool_or_none,
)
from galaxy.util.search import (
    FilteredTerm,
    parse_filters_structured,
    RawTextTerm,
)

log = logging.getLogger(__name__)

JobStateT = str
JobStatesT = Union[JobStateT, Iterable[JobStateT]]


STDOUT_LOCATION = "outputs/tool_stdout"
STDERR_LOCATION = "outputs/tool_stderr"


class JobLock(BaseModel):
    active: bool = Field(title="Job lock status", description="If active, jobs will not dispatch")

def get_path_key(path_tuple):
    path_key = ""
    tuple_elements = len(path_tuple)
    for i, p in enumerate(path_tuple):
        if isinstance(p, int):
            sep = "_"
        else:
            sep = "|"
        if i == (tuple_elements - 2) and p == "values":
            # dataset inputs are always wrapped in lists. To avoid 'rep_factorName_0|rep_factorLevel_2|countsFile|values_0',
            # we remove the last 2 items of the path tuple (values and list index)
            return path_key
        if path_key:
            path_key = f"{path_key}{sep}{p}"
        else:
            path_key = p
    return path_key

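# Illustrative usage (a sketch, not part of the module): get_path_key flattens a
# tool-state path tuple into a JobParameter-style key, joining repeat indices with
# "_" and other nesting with "|", and dropping the trailing ("values", <index>)
# pair used for dataset inputs.
#
#     >>> get_path_key(("rep_factorName", 0, "rep_factorLevel", 2, "countsFile", "values", 0))
#     'rep_factorName_0|rep_factorLevel_2|countsFile'
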
class JobManager:
    def __init__(self, app: StructuredApp):
        self.app = app
        self.dataset_manager = DatasetManager(app)

    def index_query(self, trans: ProvidesUserContext, payload: JobIndexQueryPayload) -> sqlalchemy.engine.ScalarResult:
        """The caller is responsible for security checks on the resulting job if
        history_id, invocation_id, or implicit_collection_jobs_id is set.
        Otherwise this will only return the user's jobs or all jobs if the requesting
        user is acting as an admin.
        """
        is_admin = trans.user_is_admin
        user_details = payload.user_details
        decoded_user_id = payload.user_id
        history_id = payload.history_id
        workflow_id = payload.workflow_id
        invocation_id = payload.invocation_id
        implicit_collection_jobs_id = payload.implicit_collection_jobs_id
        search = payload.search
        order_by = payload.order_by

        def build_and_apply_filters(stmt, objects, filter_func):
            if objects is not None:
                if isinstance(objects, (str, date, datetime)):
                    stmt = stmt.where(filter_func(objects))
                elif isinstance(objects, list):
                    t = []
                    for obj in objects:
                        t.append(filter_func(obj))
                    stmt = stmt.where(or_(*t))
            return stmt

        def add_workflow_jobs():
            wfi_step = select(WorkflowInvocationStep)
            if workflow_id is not None:
                wfi_step = (
                    wfi_step.join(WorkflowInvocation).join(Workflow).where(Workflow.stored_workflow_id == workflow_id)
                )
            elif invocation_id is not None:
                wfi_step = wfi_step.where(WorkflowInvocationStep.workflow_invocation_id == invocation_id)
            wfi_step_sq = wfi_step.subquery()

            stmt1 = stmt.join(wfi_step_sq)
            stmt2 = stmt.join(ImplicitCollectionJobsJobAssociation).join(
                wfi_step_sq,
                ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id
                == wfi_step_sq.c.implicit_collection_jobs_id,
            )
            # Ensure the result is models, not tuples
            sq = stmt1.union(stmt2).subquery()
            # SQLite won't recognize Job.foo as a valid column for the ORDER BY clause due to the UNION clause,
            # so we'll use the subquery `columns` collection (`sq.c`).
            # Ref: https://github.com/galaxyproject/galaxy/pull/16852#issuecomment-1804676322
            return select(aliased(Job, sq)), sq.c

        def add_search_criteria(stmt):
            search_filters = {
                "tool": "tool",
                "t": "tool",
            }
            if user_details:
                search_filters.update(
                    {
                        "user": "user",
                        "u": "user",
                    }
                )
            if is_admin:
                search_filters.update(
                    {
                        "runner": "runner",
                        "r": "runner",
                        "handler": "handler",
                        "h": "handler",
                    }
                )
            assert search
            parsed_search = parse_filters_structured(search, search_filters)
            for term in parsed_search.terms:
                if isinstance(term, FilteredTerm):
                    key = term.filter
                    if key == "user":
                        stmt = stmt.where(text_column_filter(User.email, term))
                    elif key == "tool":
                        stmt = stmt.where(text_column_filter(Job.tool_id, term))
                    elif key == "handler":
                        stmt = stmt.where(text_column_filter(Job.handler, term))
                    elif key == "runner":
                        stmt = stmt.where(text_column_filter(Job.job_runner_name, term))
                elif isinstance(term, RawTextTerm):
                    columns: List = [Job.tool_id]
                    if user_details:
                        columns.append(User.email)
                    if is_admin:
                        columns.append(Job.handler)
                        columns.append(Job.job_runner_name)
                    stmt = stmt.filter(raw_text_column_filter(columns, term))
            return stmt

        stmt = select(Job)

        if is_admin:
            if decoded_user_id is not None:
                stmt = stmt.where(Job.user_id == decoded_user_id)
            if user_details:
                stmt = stmt.outerjoin(Job.user)
        else:
            if history_id is None and invocation_id is None and implicit_collection_jobs_id is None:
                # If we're not filtering on history, invocation or collection we filter the jobs owned by the current user
                if trans.user:
                    stmt = stmt.where(Job.user_id == trans.user.id)
                elif trans.galaxy_session:
                    stmt = stmt.where(Job.session_id == trans.galaxy_session.id)
                else:
                    raise RequestParameterMissingException("A session is required to list jobs for anonymous users")

        stmt = build_and_apply_filters(stmt, payload.states, lambda s: model.Job.state == s)
        stmt = build_and_apply_filters(stmt, payload.tool_ids, lambda t: model.Job.tool_id == t)
        stmt = build_and_apply_filters(stmt, payload.tool_ids_like, lambda t: model.Job.tool_id.like(t))
        stmt = build_and_apply_filters(stmt, payload.date_range_min, lambda dmin: model.Job.update_time >= dmin)
        stmt = build_and_apply_filters(stmt, payload.date_range_max, lambda dmax: model.Job.update_time <= dmax)

        if history_id is not None:
            stmt = stmt.where(Job.history_id == history_id)

        order_by_columns = Job
        if workflow_id or invocation_id:
            stmt, order_by_columns = add_workflow_jobs()
        elif implicit_collection_jobs_id:
            stmt = (
                stmt.join(ImplicitCollectionJobsJobAssociation, ImplicitCollectionJobsJobAssociation.job_id == Job.id)
                .join(
                    ImplicitCollectionJobs,
                    ImplicitCollectionJobs.id == ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id,
                )
                .where(ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id == implicit_collection_jobs_id)
            )
        if search:
            stmt = add_search_criteria(stmt)

        if order_by == JobIndexSortByEnum.create_time:
            stmt = stmt.order_by(order_by_columns.create_time.desc())
        else:
            stmt = stmt.order_by(order_by_columns.update_time.desc())

        stmt = stmt.offset(payload.offset)
        stmt = stmt.limit(payload.limit)
        return trans.sa_session.scalars(stmt)

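    # Illustrative usage (a sketch; `trans` comes from the API layer and the
    # payload fields shown are assumptions about JobIndexQueryPayload):
    #
    #     payload = JobIndexQueryPayload(states=["error"], limit=50, offset=0)
    #     for job in job_manager.index_query(trans, payload):
    #         print(job.id, job.state, job.tool_id)
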
    def job_lock(self) -> JobLock:
        return JobLock(active=self.app.job_manager.job_lock)

    def update_job_lock(self, job_lock: JobLock):
        self.app.queue_worker.send_control_task(
            "admin_job_lock", kwargs={"job_lock": job_lock.active}, get_response=True
        )
        return self.job_lock()

    def get_accessible_job(self, trans: ProvidesUserContext, decoded_job_id) -> Job:
        job = trans.sa_session.get(Job, decoded_job_id)
        if job is None:
            raise ObjectNotFound()
        belongs_to_user = False
        if trans.user:
            belongs_to_user = job.user_id == trans.user.id
        elif trans.galaxy_session:
            belongs_to_user = job.session_id == trans.galaxy_session.id
        if not trans.user_is_admin and not belongs_to_user:
            # Check access granted via output datasets.
            if not job.output_datasets:
                raise ItemAccessibilityException("Job has no output datasets.")
            for data_assoc in job.output_datasets:
                if not self.dataset_manager.is_accessible(data_assoc.dataset.dataset, trans.user):
                    raise ItemAccessibilityException("You are not allowed to rerun this job.")
        trans.sa_session.refresh(job)
        return job

    def get_job_console_output(
        self, trans, job, stdout_position=-1, stdout_length=0, stderr_position=-1, stderr_length=0
    ):
        if job is None:
            raise ObjectNotFound()
        # Check job destination params to see if stdout reporting is enabled
        dest_params = job.destination_params
        if not string_as_bool_or_none(dest_params.get("live_tool_output_reporting", False)):
            raise ConfigDoesNotAllowException()
        # If stdout_length and stdout_position are good values, then load standard out and add it to status
        console_output = {}
        console_output["state"] = job.state
        if job.state == job.states.RUNNING:
            working_directory = trans.app.object_store.get_filename(
                job, base_dir="job_work", dir_only=True, obj_dir=True
            )
            if stdout_length > -1 and stdout_position > -1:
                try:
                    stdout_path = Path(working_directory) / STDOUT_LOCATION
                    stdout_file = open(stdout_path)
                    stdout_file.seek(stdout_position)
                    console_output["stdout"] = stdout_file.read(stdout_length)
                except Exception as e:
                    log.error("Could not read STDOUT: %s", e)
                    console_output["stdout"] = ""
            if stderr_length > -1 and stderr_position > -1:
                try:
                    stderr_path = Path(working_directory) / STDERR_LOCATION
                    stderr_file = open(stderr_path)
                    stderr_file.seek(stderr_position)
                    console_output["stderr"] = stderr_file.read(stderr_length)
                except Exception as e:
                    log.error("Could not read STDERR: %s", e)
                    console_output["stderr"] = ""
        else:
            console_output["stdout"] = job.tool_stdout
            console_output["stderr"] = job.tool_stderr
        return console_output

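    # Illustrative polling pattern (a sketch; callers are assumed to track how many
    # bytes they have already consumed and pass that back as the next position to
    # read incremental output while the job is running):
    #
    #     position = 0
    #     chunk = job_manager.get_job_console_output(
    #         trans, job, stdout_position=position, stdout_length=10000
    #     )
    #     position += len(chunk.get("stdout", ""))
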
    def stop(self, job, message=None):
        if not job.finished:
            job.mark_deleted(self.app.config.track_jobs_in_database, message)
            session = self.app.model.session
            session.commit()
            self.app.job_manager.stop(job, message=message)
            return True
        else:
            return False

class JobSearch:
    """Search for jobs using tool inputs or other jobs"""

    def __init__(
        self,
        sa_session: galaxy_scoped_session,
        hda_manager: HDAManager,
        dataset_collection_manager: DatasetCollectionManager,
        ldda_manager: LDDAManager,
        id_encoding_helper: IdEncodingHelper,
    ):
        self.sa_session = sa_session
        self.dialect_name = sa_session.get_bind().dialect.name
        self.use_materialized_hint = self.supports_materialized_hint()
        self.hda_manager = hda_manager
        self.dataset_collection_manager = dataset_collection_manager
        self.ldda_manager = ldda_manager
        self.decode_id = id_encoding_helper.decode_id

    def supports_materialized_hint(self) -> bool:
        """
        Checks if the connected PostgreSQL database version supports the MATERIALIZED hint.
        (PostgreSQL 12 and higher support it).
        """
        # session.bind refers to the Engine or Connection the session is bound to
        # dialect provides information about the database being used
        # server_version_info returns a tuple (major, minor, micro, ...)
        # e.g., (12, 5) for PostgreSQL 12.5, (13, 2) for PostgreSQL 13.2
        if self.dialect_name == "postgresql":
            bind = self.sa_session.get_bind()
            server_version_info = bind.dialect and bind.dialect.server_version_info
            if server_version_info:
                return server_version_info[0] >= 12
        return False

    def by_tool_input(
        self,
        user: User,
        tool_id: str,
        tool_version: Optional[str],
        param: ToolStateJobInstancePopulatedT,
        param_dump: ToolStateDumpedToJsonInternalT,
        job_state: Optional[JobStatesT] = (Job.states.OK,),
        require_name_match: bool = True,
    ):
        """Search for jobs producing same results using the 'inputs' part of a tool POST."""
        input_data = defaultdict(list)

        def populate_input_data_input_id(path, key, value):
            """Traverses expanded incoming using remap and collects input_ids and input_data."""
            if key == "id":
                path_key = get_path_key(path[:-2])
                current_case = param_dump
                for p in path:
                    current_case = current_case[p]
                src = current_case.get("src")
                if src is None:
                    # just a parameter named id.
                    # TODO: dispatch on tool parameter type instead of values,
                    # maybe with tool state variant
                    return key, value
                current_case = param
                for i, p in enumerate(path):
                    if p == "values" and i == len(path) - 2:
                        continue
                    if isinstance(current_case, (list, dict)):
                        current_case = current_case[p]
                identifier = getattr(current_case, "element_identifier", None)
                input_data[path_key].append(
                    {
                        "src": src,
                        "id": value,
                        "identifier": identifier,
                    }
                )
                return key, "__id_wildcard__"
            return key, value

        wildcard_param_dump = remap(param_dump, visit=populate_input_data_input_id)
        return self.__search(
            tool_id=tool_id,
            tool_version=tool_version,
            user=user,
            input_data=input_data,
            job_state=job_state,
            param_dump=param_dump,
            wildcard_param_dump=wildcard_param_dump,
            require_name_match=require_name_match,
        )

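    # Illustrative shapes produced by by_tool_input for a single HDA parameter named
    # "input1" (a sketch; the ids are made up):
    #
    #     param_dump          == {"input1": {"values": [{"src": "hda", "id": 42}]}}
    #     input_data          == {"input1": [{"src": "hda", "id": 42, "identifier": None}]}
    #     wildcard_param_dump == {"input1": {"values": [{"src": "hda", "id": "__id_wildcard__"}]}}
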
def __search( self, tool_id: str, tool_version: Optional[str], user: model.User, input_data, job_state: Optional[JobStatesT], param_dump: ToolStateDumpedToJsonInternalT, wildcard_param_dump=None, require_name_match: bool = True, ): search_timer = ExecutionTimer() def replace_dataset_ids(path, key, value): """Exchanges dataset_ids (HDA, LDA, HDCA, not Dataset) in param_dump with dataset ids used in job.""" if key == "id": current_case = param_dump for p in path: current_case = current_case[p] src = current_case.get("src") if src is None: # just a parameter named id. # same workaround as in populate_input_data_input_id return key, value value = job_input_ids[src][value] return key, value return key, value stmt = select(model.Job.id.label("job_id")) data_conditions: List = [] # We now build the stmt filters that relate to the input datasets # that this job uses. We keep track of the requested dataset id in `requested_ids`, # the type (hda, hdca or lda) in `data_types` # and the ids that have been used in the job that has already been run in `used_ids`. requested_ids = [] data_types = [] used_ids: List = [] for k, input_list in input_data.items(): # k will be matched against the JobParameter.name column. This can be prefixed depending on whether # the input is in a repeat, or not (section and conditional) for value_index, type_values in enumerate(input_list): t = type_values["src"] v = type_values["id"] requested_ids.append(v) data_types.append(t) identifier = type_values["identifier"] if t == "hda": stmt = self._build_stmt_for_hda( stmt, data_conditions, used_ids, k, v, identifier, require_name_match=require_name_match, value_index=value_index, ) elif t == "ldda": stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v, value_index=value_index) elif t == "hdca": stmt = self._build_stmt_for_hdca( stmt, data_conditions, used_ids, k, v, user.id, value_index=value_index ) elif t == "dce": stmt = self._build_stmt_for_dce( stmt, data_conditions, used_ids, k, v, user.id, value_index=value_index ) else: log.error("Unknown input data type %s", t) return None stmt = stmt.where(*data_conditions).group_by(model.Job.id, *used_ids) stmt = self._filter_jobs(stmt, tool_id, user.id, tool_version, job_state, wildcard_param_dump) stmt = self._exclude_jobs_with_deleted_outputs(stmt) for job in self.sa_session.execute(stmt): # We found a job that is equal in terms of tool_id, user, state and input datasets, # but to be able to verify that the parameters match we need to modify all instances of # dataset_ids (HDA, LDDA, HDCA) in the incoming param_dump to point to those used by the # possibly equivalent job, which may have been run on copies of the original input data. job_input_ids = {} if len(job) > 1: # We do have datasets to check job_id, current_jobs_data_ids = job[0], job[1:] job_parameter_conditions = [model.Job.id == job_id] for src, requested_id, used_id in zip(data_types, requested_ids, current_jobs_data_ids): if src not in job_input_ids: job_input_ids[src] = {requested_id: used_id} else: job_input_ids[src][requested_id] = used_id new_param_dump = remap(param_dump, visit=replace_dataset_ids) # new_param_dump has its dataset ids remapped to those used by the job. # We now ask if the remapped job parameters match the current job. for k, v in new_param_dump.items(): if v == {"__class__": "RuntimeValue"}: # TODO: verify this is always None. e.g. 
run with runtime input input v = None elif k.endswith("|__identifier__"): # We've taken care of this while constructing the conditions based on ``input_data`` above continue elif k == "chromInfo" and "?.len" in v: continue elif k == "__when_value__": continue a = aliased(model.JobParameter) job_parameter_conditions.append( and_( model.Job.id == a.job_id, a.name == k, a.value == (None if v is None else json.dumps(v, sort_keys=True)), ) ) else: job_parameter_conditions = [model.Job.id == job[0]] job = get_job(self.sa_session, *job_parameter_conditions) if job is None: continue n_parameters = 0 # Verify that equivalent jobs had the same number of job parameters # We skip chrominfo, dbkey, __workflow_invocation_uuid__ and identifer # parameter as these are not passed along when expanding tool parameters # and they can differ without affecting the resulting dataset. for parameter in job.parameters: if parameter.name.startswith("__"): continue if parameter.name in {"chromInfo", "dbkey"} or parameter.name.endswith("|__identifier__"): continue n_parameters += 1 if not n_parameters == sum( 1 for k in param_dump if not k.startswith("__") and not k.endswith("|__identifier__") and k not in {"chromInfo", "dbkey"} ): continue log.info("Found equivalent job %s", search_timer) return job log.info("No equivalent jobs found %s", search_timer) return None def _filter_jobs( self, stmt, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump ): """Build subquery that selects a job with correct job parameters.""" job_ids_materialized_cte = stmt.cte("job_ids_cte") outer_select_columns = [job_ids_materialized_cte.c[col.name] for col in stmt.selected_columns] stmt = select(*outer_select_columns).select_from(job_ids_materialized_cte) stmt = ( stmt.join(model.Job, model.Job.id == job_ids_materialized_cte.c.job_id) .join(model.History, model.Job.history_id == model.History.id) .where( and_( model.Job.tool_id == tool_id, or_( model.Job.user_id == user_id, model.History.published == true(), ), model.Job.copied_from_job_id.is_(None), # Always pick original job ) ) ) if tool_version: stmt = stmt.where(Job.tool_version == str(tool_version)) if job_state is None: job_states: Set[str] = { Job.states.NEW, Job.states.QUEUED, Job.states.WAITING, Job.states.RUNNING, Job.states.OK, } else: if isinstance(job_state, str): job_states = {job_state} else: job_states = {*job_state} if wildcard_param_dump.get("__when_value__") is False: job_states = {Job.states.SKIPPED} stmt = stmt.where(Job.state.in_(job_states)) for k, v in wildcard_param_dump.items(): if v == {"__class__": "RuntimeValue"}: # TODO: verify this is always None. e.g. run with runtime input input v = None elif k.endswith("|__identifier__"): # We've taken care of this while constructing the conditions based on ``input_data`` above continue elif k == "chromInfo" and "?.len" in v: continue elif k == "__when_value__": # TODO: really need to separate this. 
continue value_dump = None if v is None else json.dumps(v, sort_keys=True) wildcard_value = value_dump.replace('"id": "__id_wildcard__"', '"id": %') if value_dump else None a = aliased(JobParameter) if value_dump == wildcard_value: # No wildcard needed, use exact match stmt = stmt.join(a).where( and_( Job.id == a.job_id, a.name == k, a.value == value_dump, ) ) else: stmt = stmt.join(a).where( and_( Job.id == a.job_id, a.name == k, a.value.like(wildcard_value), ) ) return stmt def _exclude_jobs_with_deleted_outputs(self, stmt): subquery_alias = stmt.subquery("filtered_jobs_subquery") outer_select_columns = [subquery_alias.c[col.name] for col in stmt.selected_columns] outer_stmt = select(*outer_select_columns).select_from(subquery_alias) job_id_from_subquery = subquery_alias.c.job_id deleted_collection_exists = exists().where( and_( model.JobToOutputDatasetCollectionAssociation.job_id == job_id_from_subquery, model.JobToOutputDatasetCollectionAssociation.dataset_collection_id == model.HistoryDatasetCollectionAssociation.id, model.HistoryDatasetCollectionAssociation.deleted == true(), ) ) # Subquery for deleted output datasets deleted_dataset_exists = exists().where( and_( model.JobToOutputDatasetAssociation.job_id == job_id_from_subquery, model.JobToOutputDatasetAssociation.dataset_id == model.HistoryDatasetAssociation.id, model.HistoryDatasetAssociation.deleted == true(), ) ) # Exclude jobs where a deleted collection OR a deleted dataset exists outer_stmt = outer_stmt.where( and_( ~deleted_collection_exists, # NOT EXISTS deleted collection ~deleted_dataset_exists, # NOT EXISTS deleted dataset ) ) unordered_results_cte = outer_stmt.cte("unordered_results") if self.use_materialized_hint: # This can be considerable faster with large job tables, # but is only available on postgresql >= 12. unordered_results_cte = unordered_results_cte.prefix_with("MATERIALIZED") final_ordered_stmt = ( select(*unordered_results_cte.c) .select_from(unordered_results_cte) .order_by(unordered_results_cte.c.job_id.desc()) ) return final_ordered_stmt def _build_stmt_for_hda( self, stmt, data_conditions, used_ids, k, v, identifier, value_index, require_name_match=True ): a = aliased(model.JobToInputDatasetAssociation) b = aliased(model.HistoryDatasetAssociation) c = aliased(model.HistoryDatasetAssociation) d = aliased(model.JobParameter) e = aliased(model.HistoryDatasetAssociationHistory) labeled_col = a.dataset_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) used_ids.append(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) # b is the HDA used for the job stmt = stmt.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id) name_condition = [] hda_history_join_conditions = [ e.history_dataset_association_id == b.id, e.extension == c.extension, e._metadata == c._metadata, e.version == a.dataset_version, ] if identifier: stmt = stmt.join(d) data_conditions.append( and_( d.name == f"{k}|__identifier__", d.value == json.dumps(identifier), ) ) elif require_name_match: hda_history_join_conditions.append(e.name == c.name) name_condition.append(b.name == c.name) stmt = stmt.outerjoin(e, and_(*hda_history_join_conditions)) data_conditions.append( and_( a.name == k, c.id == v, # c is the requested job input HDA # We need to make sure that the job we are looking for has been run with identical inputs. 
# Here we deal with 3 requirements: # - the jobs' input dataset (=b) version is 0, meaning the job's input dataset is not yet ready # - b's update_time is older than the job create time, meaning no changes occurred # - the job has a dataset_version recorded, and that versions' metadata matches c's metadata. or_( and_( or_(a.dataset_version.in_([0, b.version]), b.update_time < model.Job.create_time), b.extension == c.extension, b.metadata == c.metadata, *name_condition, ), e.history_dataset_association_id.isnot(None), ), or_(b.deleted == false(), c.deleted == false()), ) ) return stmt def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v, value_index): a = aliased(model.JobToInputLibraryDatasetAssociation) labeled_col = a.ldda_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) data_conditions.append(and_(a.name == k, a.ldda_id == v)) used_ids.append(labeled_col) return stmt
    def agg_expression(self, column):
        if self.dialect_name == "sqlite":
            return func.group_concat(column)
        else:
            return func.array_agg(column, order_by=column)

def _build_stmt_for_hdca( self, stmt, data_conditions, used_ids, k, v, user_id, value_index, require_name_match=True ): # Strategy for efficiently finding equivalent HDCAs: # 1. Determine the structural depth of the target HDCA by its collection_type. # 2. For the target HDCA (identified by 'v'): # a. Dynamically construct Common Table Expressions (CTEs) to traverse its (potentially nested) structure down to individual datasets. # b. Generate a "path signature string" for each dataset element, uniquely identifying its path within the collection. # c. Aggregate these path strings into a canonical, sorted array (the "reference full signature") using array_agg with explicit ordering. # 3. For all candidate HDCAs: # a. Perform a similar dynamic traversal and path signature string generation. # b. Aggregate these into sorted "full signature" arrays for each candidate HDCA. # 4. Finally, identify equivalent HDCAs by comparing their full signature array directly against the reference full signature array. # # This approach is performant because: # - It translates the complex problem of structural collection comparison into efficient array equality checks directly within the database. # - It leverages the power of SQL CTEs and set-based operations, allowing the database query optimizer to find an efficient execution plan. # - Joins required to traverse collection structures are built dynamically based on the actual depth, avoiding unnecessary complexity. # - Signatures are computed and compared entirely on the database side, minimizing data transfer to the application. # # Note: CTEs are uniquely named using 'k' and 'v' to allow this logic to be embedded # within larger queries or loops processing multiple target HDCAs. Aliases are used # extensively to manage dynamic joins based on collection depth. collection_type = self.sa_session.scalar( select(model.DatasetCollection.collection_type) .select_from(model.HistoryDatasetCollectionAssociation) .join(model.DatasetCollection) .where(model.HistoryDatasetCollectionAssociation.id == v) ) depth = collection_type.count(":") if collection_type else 0 a = aliased( model.JobToInputDatasetCollectionAssociation, name=f"job_to_input_dataset_collection_1_{k}_{value_index}" ) hdca_input = aliased( model.HistoryDatasetCollectionAssociation, name=f"history_dataset_collection_association_1_{k}_{value_index}", ) _hdca_target_cte_ref = aliased(model.HistoryDatasetCollectionAssociation, name="_hdca_target_cte_ref") _target_collection_cte_ref = aliased(model.DatasetCollection, name="_target_collection_cte_ref") _dce_cte_ref_list = [ aliased(model.DatasetCollectionElement, name=f"_dce_cte_ref_{i}") for i in range(depth + 1) ] _hda_cte_ref = aliased(model.HistoryDatasetAssociation, name="_hda_cte_ref") # --- NEW CTE: reference_hdca_all_dataset_ids_cte --- # This CTE identifies all distinct dataset IDs that are part of the *reference* # History Dataset Collection Association (HDCA). This is used for an initial, # fast pre-filtering of candidate HDCAs. 
reference_all_dataset_ids_select = ( select(_hda_cte_ref.dataset_id.label("ref_dataset_id_for_overlap")) .select_from(_hdca_target_cte_ref) .join(_target_collection_cte_ref, _target_collection_cte_ref.id == _hdca_target_cte_ref.collection_id) .join(_dce_cte_ref_list[0], _dce_cte_ref_list[0].dataset_collection_id == _target_collection_cte_ref.id) ) for i in range(1, depth + 1): reference_all_dataset_ids_select = reference_all_dataset_ids_select.join( _dce_cte_ref_list[i], _dce_cte_ref_list[i].dataset_collection_id == _dce_cte_ref_list[i - 1].child_collection_id, ) _leaf_cte_ref = _dce_cte_ref_list[-1] reference_all_dataset_ids_select = ( reference_all_dataset_ids_select.join(_hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id) .where(_hdca_target_cte_ref.id == v) .distinct() ) reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{value_index}") # --- END NEW CTE --- # CTE 1: signature_elements_cte (for the reference HDCA) # This CTE generates a unique "path signature string" for each dataset element # within the reference HDCA. This string identifies the element's position # and content within the nested collection structure. signature_elements_select = ( select( func.concat_ws( ";", *[_dce_cte_ref_list[i].element_identifier for i in range(depth + 1)], _hda_cte_ref.dataset_id.cast(sqlalchemy.Text), ).label("path_signature_string") ) .select_from(_hdca_target_cte_ref) .join(_target_collection_cte_ref, _target_collection_cte_ref.id == _hdca_target_cte_ref.collection_id) .join(_dce_cte_ref_list[0], _dce_cte_ref_list[0].dataset_collection_id == _target_collection_cte_ref.id) ) for i in range(1, depth + 1): signature_elements_select = signature_elements_select.join( _dce_cte_ref_list[i], _dce_cte_ref_list[i].dataset_collection_id == _dce_cte_ref_list[i - 1].child_collection_id, ) _leaf_cte_ref = _dce_cte_ref_list[-1] signature_elements_select = signature_elements_select.join( _hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id ) signature_elements_select = signature_elements_select.where(_hdca_target_cte_ref.id == v) signature_elements_cte = signature_elements_select.cte(f"signature_elements_{k}_{value_index}") # CTE 2: reference_full_signature_cte # This CTE aggregates the path signature strings of the reference HDCA into a # canonical, sorted array. This array represents the complete "signature" of the collection. reference_full_signature_cte = ( select(self.agg_expression(signature_elements_cte.c.path_signature_string).label("signature_array")) .select_from(signature_elements_cte) .cte(f"reference_full_signature_{k}_{value_index}") ) candidate_hdca = aliased(model.HistoryDatasetCollectionAssociation, name="candidate_hdca") candidate_hdca_history = aliased(model.History, name="candidate_hdca_history") candidate_root_collection = aliased(model.DatasetCollection, name="candidate_root_collection") candidate_dce_list = [ aliased(model.DatasetCollectionElement, name=f"candidate_dce_{i}") for i in range(depth + 1) ] candidate_hda = aliased(model.HistoryDatasetAssociation, name="candidate_hda") # --- NEW CTE: candidate_hdca_pre_filter_ids_cte (First Pass Candidate Filtering) --- # This CTE performs a quick initial filter on candidate HDCAs. # It checks for: # 1. User permissions (published or owned by the current user). # 2. Whether the candidate HDCA contains any dataset IDs that are also present # in the reference HDCA (an overlap check). This is a broad filter to # reduce the number of candidates before more expensive signature generation. 
candidate_hdca_pre_filter_ids_select = ( select(candidate_hdca.id.label("candidate_hdca_id")) .distinct() .select_from(candidate_hdca) .join(candidate_hdca_history, candidate_hdca_history.id == candidate_hdca.history_id) .join(candidate_root_collection, candidate_root_collection.id == candidate_hdca.collection_id) .join(candidate_dce_list[0], candidate_dce_list[0].dataset_collection_id == candidate_root_collection.id) ) for i in range(1, depth + 1): candidate_hdca_pre_filter_ids_select = candidate_hdca_pre_filter_ids_select.join( candidate_dce_list[i], candidate_dce_list[i].dataset_collection_id == candidate_dce_list[i - 1].child_collection_id, ) _leaf_candidate_dce_pre = candidate_dce_list[-1] candidate_hdca_pre_filter_ids_select = ( candidate_hdca_pre_filter_ids_select.join(candidate_hda, candidate_hda.id == _leaf_candidate_dce_pre.hda_id) .where(or_(candidate_hdca_history.user_id == user_id, candidate_hdca_history.published == true())) .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap))) ) candidate_hdca_pre_filter_ids_cte = candidate_hdca_pre_filter_ids_select.cte( f"cand_hdca_pre_filter_ids_{k}_{value_index}" ) # --- END NEW CTE --- # CTE 3: candidate_signature_elements_cte # This CTE generates the path signature string for each element of the # *pre-filtered candidate* HDCAs. candidate_signature_elements_select = ( select( candidate_hdca.id.label("candidate_hdca_id"), func.concat_ws( ";", *[candidate_dce_list[i].element_identifier for i in range(depth + 1)], candidate_hda.dataset_id.cast(sqlalchemy.Text), ).label("path_signature_string"), ) .select_from(candidate_hdca) # Apply the pre-filter here to limit the candidates for full signature generation .where(candidate_hdca.id.in_(select(candidate_hdca_pre_filter_ids_cte.c.candidate_hdca_id))) .join(candidate_hdca_history, candidate_hdca_history.id == candidate_hdca.history_id) .join(candidate_root_collection, candidate_root_collection.id == candidate_hdca.collection_id) .join(candidate_dce_list[0], candidate_dce_list[0].dataset_collection_id == candidate_root_collection.id) .where(or_(candidate_hdca_history.user_id == user_id, candidate_hdca_history.published == true())) ) for i in range(1, depth + 1): candidate_signature_elements_select = candidate_signature_elements_select.join( candidate_dce_list[i], candidate_dce_list[i].dataset_collection_id == candidate_dce_list[i - 1].child_collection_id, ) _leaf_candidate_dce = candidate_dce_list[-1] candidate_signature_elements_select = candidate_signature_elements_select.join( candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id ) candidate_signature_elements_cte = candidate_signature_elements_select.cte( f"candidate_signature_elements_{k}_{value_index}" ) # CTE 4: candidate_full_signatures_cte # This CTE aggregates the path signature strings for the candidate HDCAs into # ordered arrays, similar to the reference's full signature. candidate_full_signatures_cte = ( select( candidate_signature_elements_cte.c.candidate_hdca_id, self.agg_expression(candidate_signature_elements_cte.c.path_signature_string).label( "full_signature_array" ), ) .select_from(candidate_signature_elements_cte) .group_by(candidate_signature_elements_cte.c.candidate_hdca_id) .cte(f"candidate_full_signatures_{k}_{value_index}") ) # CTE 5: equivalent_hdca_ids_cte # This final CTE identifies the HDCAs that are truly "equivalent" by # comparing their full signature array to the reference HDCA's full signature array. 
equivalent_hdca_ids_cte = ( select(candidate_full_signatures_cte.c.candidate_hdca_id.label("equivalent_id")) .where( candidate_full_signatures_cte.c.full_signature_array == select(reference_full_signature_cte.c.signature_array).scalar_subquery() ) .cte(f"equivalent_hdca_ids_{k}_{value_index}") ) # Main query `stmt` construction # This section joins the base job statement with the associations and then filters # by the HDCAs identified as equivalent in the CTEs. labeled_col = a.dataset_collection_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) stmt = stmt.join( hdca_input, and_( hdca_input.id == a.dataset_collection_id, # Filter the main query to only include HDCAs found in the # 'equivalent_hdca_ids_cte'. hdca_input.id.in_(select(equivalent_hdca_ids_cte.c.equivalent_id)), ), ) used_ids.append(labeled_col) data_conditions.append(a.name == k) return stmt def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id, value_index): dce_root_target = self.sa_session.get_one(model.DatasetCollectionElement, v) # Determine if the target DCE points to an HDA or a child collection if dce_root_target.child_collection_id: # This DCE represents a collection, apply the signature comparison approach target_collection_id = dce_root_target.child_collection_id collection_type = self.sa_session.scalar( select(model.DatasetCollection.collection_type).where( model.DatasetCollection.id == target_collection_id ) ) depth = collection_type.count(":") if collection_type else 0 # Aliases for the target DCE's collection structure _dce_target_root_ref = aliased( model.DatasetCollectionElement, name=f"_dce_target_root_ref_{k}_{value_index}" ) _dce_target_child_collection_ref = aliased( model.DatasetCollection, name=f"_dce_target_child_collection_ref_{k}_{value_index}" ) # List of aliases for each potential nested level of DatasetCollectionElements _dce_target_level_list = [ aliased(model.DatasetCollectionElement, name=f"_dce_target_level_{k}_{value_index}_{i}") for i in range(depth + 1) ] _hda_target_ref = aliased(model.HistoryDatasetAssociation, name=f"_hda_target_ref_{k}_{value_index}") # --- CTE: reference_dce_all_dataset_ids_cte --- # This CTE (Common Table Expression) identifies all distinct dataset IDs # that are part of the *reference* dataset collection (the one we're searching for). # This helps in the initial filtering of candidate collections. 
reference_all_dataset_ids_select = ( select(_hda_target_ref.dataset_id.label("ref_dataset_id_for_overlap")) .select_from(_dce_target_root_ref) .join( _dce_target_child_collection_ref, _dce_target_child_collection_ref.id == _dce_target_root_ref.child_collection_id, ) .join( _dce_target_level_list[0], _dce_target_level_list[0].dataset_collection_id == _dce_target_child_collection_ref.id, ) ) # Dynamically add joins for each nested level of the collection for i in range(1, depth + 1): reference_all_dataset_ids_select = reference_all_dataset_ids_select.join( _dce_target_level_list[i], _dce_target_level_list[i].dataset_collection_id == _dce_target_level_list[i - 1].child_collection_id, ) _leaf_target_dce_ref = _dce_target_level_list[-1] reference_all_dataset_ids_select = ( reference_all_dataset_ids_select.join( _hda_target_ref, _hda_target_ref.id == _leaf_target_dce_ref.hda_id ) .where(_dce_target_root_ref.id == v) .distinct() ) reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{value_index}") # --- CTE: reference_dce_signature_elements_cte --- # This CTE generates a "path signature string" for each individual element # within the *reference* collection. This signature combines identifiers # from all levels of the collection and the dataset ID, providing a unique # identifier for each dataset's position within the collection structure. path_components = [ _dce_target_root_ref.element_identifier, *[_dce_target_level_list[i].element_identifier for i in range(depth + 1)], _hda_target_ref.dataset_id.cast(sqlalchemy.Text), # Ensure type for concat_ws ] reference_dce_signature_elements_select = ( select( func.concat_ws(";", *path_components).label("path_signature_string"), _hda_target_ref.dataset_id.label("raw_dataset_id_for_ordering"), # Keep original type for ordering ) .select_from(_dce_target_root_ref) .join( _dce_target_child_collection_ref, _dce_target_child_collection_ref.id == _dce_target_root_ref.child_collection_id, ) .join( _dce_target_level_list[0], _dce_target_level_list[0].dataset_collection_id == _dce_target_child_collection_ref.id, ) ) for i in range(1, depth + 1): reference_dce_signature_elements_select = reference_dce_signature_elements_select.join( _dce_target_level_list[i], _dce_target_level_list[i].dataset_collection_id == _dce_target_level_list[i - 1].child_collection_id, ) _leaf_target_dce_ref = _dce_target_level_list[-1] reference_dce_signature_elements_select = reference_dce_signature_elements_select.join( _hda_target_ref, _hda_target_ref.id == _leaf_target_dce_ref.hda_id ).where(_dce_target_root_ref.id == v) reference_dce_signature_elements_cte = reference_dce_signature_elements_select.cte( f"ref_dce_sig_els_{k}_{value_index}" ) # --- CTE: reference_full_signature_cte --- # This CTE aggregates the path signatures and dataset IDs of the *reference* # collection into ordered arrays. These arrays form the "full signature" # used for direct comparison with candidate collections. 
reference_full_signature_cte = ( select( self.agg_expression(reference_dce_signature_elements_cte.c.path_signature_string).label( "signature_array" ), self.agg_expression(reference_dce_signature_elements_cte.c.raw_dataset_id_for_ordering).label( "ordered_dataset_id_array" ), func.count(reference_dce_signature_elements_cte.c.path_signature_string).label( "element_count" ), # Count elements based on path_signature_string ) .select_from(reference_dce_signature_elements_cte) .cte(f"ref_dce_full_sig_{k}_{value_index}") ) # --- Aliases for Candidate Dataset Collection Structure --- # These aliases are used to represent potential matching dataset collections # in the database, which will be compared against the reference. candidate_dce_root = aliased(model.DatasetCollectionElement, name=f"candidate_dce_root_{k}_{v}") candidate_dce_child_collection = aliased( model.DatasetCollection, name=f"candidate_dce_child_collection_{k}_{value_index}" ) candidate_dce_level_list = [ aliased(model.DatasetCollectionElement, name=f"candidate_dce_level_{k}_{value_index}_{i}") for i in range(depth + 1) ] candidate_hda = aliased(model.HistoryDatasetAssociation, name=f"candidate_hda_{k}_{value_index}") candidate_history = aliased(model.History, name=f"candidate_history_{k}_{value_index}") # --- CTE: candidate_dce_pre_filter_ids_cte (Initial Candidate Filtering) --- # This CTE performs a first pass to quickly narrow down potential candidate # dataset collections. It checks for: # 1. Existence of a child collection (ensuring it's a collection, not a single HDA). # 2. User permissions (published or owned by the current user). # 3. Overlap in *any* dataset IDs with the reference collection. candidate_dce_pre_filter_ids_select = ( select(candidate_dce_root.id.label("candidate_dce_id")) .distinct() .select_from(candidate_dce_root) .where(candidate_dce_root.child_collection_id.isnot(None)) .join( candidate_dce_child_collection, candidate_dce_child_collection.id == candidate_dce_root.child_collection_id, ) .join( candidate_dce_level_list[0], candidate_dce_level_list[0].dataset_collection_id == candidate_dce_child_collection.id, ) ) for i in range(1, depth + 1): candidate_dce_pre_filter_ids_select = candidate_dce_pre_filter_ids_select.join( candidate_dce_level_list[i], candidate_dce_level_list[i].dataset_collection_id == candidate_dce_level_list[i - 1].child_collection_id, ) _leaf_candidate_dce_pre = candidate_dce_level_list[-1] candidate_dce_pre_filter_ids_select = ( candidate_dce_pre_filter_ids_select.join( candidate_hda, candidate_hda.id == _leaf_candidate_dce_pre.hda_id ) .join(candidate_history, candidate_history.id == candidate_hda.history_id) .where(or_(candidate_history.published == true(), candidate_history.user_id == user_id)) .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap))) ) candidate_dce_pre_filter_ids_cte = candidate_dce_pre_filter_ids_select.cte( f"cand_dce_pre_filter_ids_{k}_{value_index}" ) # --- CTE: candidate_dce_signature_elements_cte --- # This CTE calculates the path signature string and raw dataset ID for each # element within the *pre-filtered candidate* collections. This is similar # to the reference signature elements CTE but for the candidates. 
candidate_path_components_fixed = [ candidate_dce_root.element_identifier, *[candidate_dce_level_list[i].element_identifier for i in range(depth + 1)], candidate_hda.dataset_id.cast(sqlalchemy.Text), # Ensure type for concat_ws ] candidate_dce_signature_elements_select = ( select( candidate_dce_root.id.label("candidate_dce_id"), func.concat_ws(";", *candidate_path_components_fixed).label("path_signature_string"), candidate_hda.dataset_id.label("dataset_id_for_ordered_array"), # This is now Integer ) .select_from(candidate_dce_root) # Apply the initial filter here! .where(candidate_dce_root.id.in_(select(candidate_dce_pre_filter_ids_cte.c.candidate_dce_id))) .where(candidate_dce_root.child_collection_id.isnot(None)) .join( candidate_dce_child_collection, candidate_dce_child_collection.id == candidate_dce_root.child_collection_id, ) .join( candidate_dce_level_list[0], candidate_dce_level_list[0].dataset_collection_id == candidate_dce_child_collection.id, ) ) # Add dynamic joins for nested levels for i in range(1, depth + 1): candidate_dce_signature_elements_select = candidate_dce_signature_elements_select.join( candidate_dce_level_list[i], candidate_dce_level_list[i].dataset_collection_id == candidate_dce_level_list[i - 1].child_collection_id, ) _leaf_candidate_dce = candidate_dce_level_list[-1] candidate_dce_signature_elements_select = ( candidate_dce_signature_elements_select.join( candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id ) .join(candidate_history, candidate_history.id == candidate_hda.history_id) .where(or_(candidate_history.published == true(), candidate_history.user_id == user_id)) ) candidate_dce_signature_elements_cte = candidate_dce_signature_elements_select.cte( f"cand_dce_sig_els_{k}_{value_index}" ) # --- CTE: candidate_pre_signatures_cte (Candidate Aggregation for Comparison) --- # This CTE aggregates the dataset IDs from the candidate collections into # ordered arrays, similar to `reference_full_signature_cte`. It also # counts the elements to ensure size consistency. candidate_pre_signatures_cte = ( select( candidate_dce_signature_elements_cte.c.candidate_dce_id, # Corrected array_agg syntax: pass column directly, use order_by keyword self.agg_expression(candidate_dce_signature_elements_cte.c.dataset_id_for_ordered_array).label( "candidate_ordered_dataset_ids_array" ), func.count(candidate_dce_signature_elements_cte.c.candidate_dce_id).label( "candidate_element_count" ), ) .select_from(candidate_dce_signature_elements_cte) .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) .cte(f"cand_dce_pre_sig_{k}_{value_index}") ) # --- CTE: filtered_cand_dce_by_dataset_ids_cte (Filtering by Element Count and Dataset ID Array) --- # This crucial CTE filters the candidate collections further by comparing: # 1. Their total element count with the reference collection's element count. # 2. Their ordered array of dataset IDs with the reference's ordered array. # This step ensures that candidate collections have the same number of elements # and contain the exact same datasets, in the same logical order (based on path). 
filtered_cand_dce_by_dataset_ids_cte = ( select(candidate_pre_signatures_cte.c.candidate_dce_id) .select_from(candidate_pre_signatures_cte, reference_full_signature_cte) .where( and_( candidate_pre_signatures_cte.c.candidate_element_count == reference_full_signature_cte.c.element_count, candidate_pre_signatures_cte.c.candidate_ordered_dataset_ids_array == reference_full_signature_cte.c.ordered_dataset_id_array, ) ) .cte(f"filtered_cand_dce_{k}_{value_index}") ) # --- CTE: final_candidate_signatures_cte (Final Full Signature Calculation for Matched Candidates) --- # For the candidates that passed the previous filtering, this CTE calculates # their full path signature array. This signature represents the complete # structural and content identity of the collection. final_candidate_signatures_cte = ( select( candidate_dce_signature_elements_cte.c.candidate_dce_id, self.agg_expression(candidate_dce_signature_elements_cte.c.path_signature_string).label( "full_signature_array" ), ) .select_from(candidate_dce_signature_elements_cte) .where( candidate_dce_signature_elements_cte.c.candidate_dce_id.in_( select(filtered_cand_dce_by_dataset_ids_cte.c.candidate_dce_id) ) ) .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id) .cte(f"final_cand_dce_full_sig_{k}_{value_index}") ) # --- Main Query Construction for Dataset Collection Elements --- # This section joins the main `stmt` (representing jobs) with the CTEs # to filter jobs whose input DCE matches the reference DCE's full signature. a = aliased( model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{value_index}", ) labeled_col = a.dataset_collection_element_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) input_dce = aliased(model.DatasetCollectionElement) stmt = stmt.join( input_dce, and_( input_dce.id == a.dataset_collection_element_id, # The final filter: ensure the input DCE's ID is among those candidates # whose full signature array *exactly matches* the reference's signature array. input_dce.id.in_( select(final_candidate_signatures_cte.c.candidate_dce_id).where( final_candidate_signatures_cte.c.full_signature_array == select(reference_full_signature_cte.c.signature_array).scalar_subquery() ) ), ), ) data_conditions.append(a.name == k) used_ids.append(labeled_col) return stmt else: # DCE points directly to an HDA (dce_root_target.hda_id is not None and child_collection_id is None) # For this simple case, the full signature array comparison for nested collections doesn't apply. # We can use a direct comparison of the HDA and element_identifier. # This logic needs to align with how this type of DCE was previously matched. 
# Aliases for the "left" side (job to input DCE path) a = aliased( model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{value_index}", ) dce_left = aliased(model.DatasetCollectionElement, name=f"dce_left_{k}_{value_index}") hda_left = aliased(model.HistoryDatasetAssociation, name=f"hda_left_{k}_{value_index}") # Aliases for the "right" side (target DCE path in the main query) dce_right = aliased(model.DatasetCollectionElement, name=f"dce_right_{k}_{value_index}") hda_right = aliased(model.HistoryDatasetAssociation, name=f"hda_right_{k}_{value_index}") # Start joins from job → input DCE association → first-level DCE (left side) labeled_col = a.dataset_collection_element_id.label(f"{k}_{value_index}") stmt = stmt.add_columns(labeled_col) stmt = stmt.join(a, a.job_id == model.Job.id) stmt = stmt.join(dce_left, dce_left.id == a.dataset_collection_element_id) stmt = stmt.join(hda_left, hda_left.id == dce_left.hda_id) # Join to HDA for left side # Join to target DCE (v) directly (right side) stmt = stmt.join(dce_right, dce_right.id == v) stmt = stmt.join(hda_right, hda_right.id == dce_right.hda_id) # Join to HDA for right side # Compare element identifiers and dataset IDs data_conditions.append( and_( a.name == k, dce_left.element_identifier == dce_right.element_identifier, hda_left.dataset_id == hda_right.dataset_id, # Direct dataset_id comparison ) ) used_ids.append(labeled_col) return stmt
def view_show_job(trans, job: Job, full: bool) -> Dict:
    is_admin = trans.user_is_admin
    job_dict = job.to_dict("element", system_details=is_admin)
    if trans.app.config.expose_dataset_path and "command_line" not in job_dict:
        job_dict["command_line"] = job.command_line
    if full:
        job_dict.update(
            dict(
                tool_stdout=job.tool_stdout,
                tool_stderr=job.tool_stderr,
                job_stdout=job.job_stdout,
                job_stderr=job.job_stderr,
                stderr=job.stderr,
                stdout=job.stdout,
                job_messages=job.job_messages,
                dependencies=job.dependencies,
            )
        )
        if is_admin:
            job_dict["user_email"] = job.get_user_email()
            job_dict["job_metrics"] = summarize_job_metrics(trans, job)
    return job_dict

def invocation_job_source_iter(sa_session, invocation_id):
    # TODO: Handle subworkflows.
    join = model.WorkflowInvocationStep.table.join(model.WorkflowInvocation)
    statement = (
        select(
            model.WorkflowInvocationStep.job_id,
            model.WorkflowInvocationStep.implicit_collection_jobs_id,
            model.WorkflowInvocationStep.state,
        )
        .select_from(join)
        .where(model.WorkflowInvocation.id == invocation_id)
    )
    for row in sa_session.execute(statement):
        if row[0]:
            yield ("Job", row[0], row[2])
        if row[1]:
            yield ("ImplicitCollectionJobs", row[1], row[2])

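# Illustrative yields (a sketch; ids and states are made up):
#
#     ("Job", 101, "scheduled")
#     ("ImplicitCollectionJobs", 12, "scheduled")
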
def get_job_metrics_for_invocation(sa_session: galaxy_scoped_session, invocation_id: int):
    single_job_stmnt = (
        select(WorkflowStep.order_index, Job.id, Job.tool_id, WorkflowStep.label, JobMetricNumeric)
        .join(Job, JobMetricNumeric.job_id == Job.id)
        .join(
            WorkflowInvocationStep,
            and_(
                WorkflowInvocationStep.workflow_invocation_id == invocation_id, WorkflowInvocationStep.job_id == Job.id
            ),
        )
        .join(WorkflowStep, WorkflowStep.id == WorkflowInvocationStep.workflow_step_id)
    )
    collection_job_stmnt = (
        select(WorkflowStep.order_index, Job.id, Job.tool_id, WorkflowStep.label, JobMetricNumeric)
        .join(Job, JobMetricNumeric.job_id == Job.id)
        .join(ImplicitCollectionJobsJobAssociation, Job.id == ImplicitCollectionJobsJobAssociation.job_id)
        .join(
            ImplicitCollectionJobs,
            ImplicitCollectionJobs.id == ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id,
        )
        .join(
            WorkflowInvocationStep,
            and_(
                WorkflowInvocationStep.workflow_invocation_id == invocation_id,
                WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id,
            ),
        )
        .join(WorkflowStep, WorkflowStep.id == WorkflowInvocationStep.workflow_step_id)
    )
    # should be sa_session.execute(single_job_stmnt.union(collection_job_stmnt)).all() but that returns
    # columns instead of the job metrics ORM instance.
    return sorted(
        (*sa_session.execute(single_job_stmnt).all(), *sa_session.execute(collection_job_stmnt).all()),
        key=lambda row: row[0],
    )

def fetch_job_states(sa_session, job_source_ids, job_source_types):
    assert len(job_source_ids) == len(job_source_types)
    job_ids = set()
    implicit_collection_job_ids = set()
    workflow_invocations_job_sources = {}
    workflow_invocation_states = (
        {}
    )  # should be set before we walk step states to be conservative on whether things are done expanding yet

    for job_source_id, job_source_type in zip(job_source_ids, job_source_types):
        if job_source_type == "Job":
            job_ids.add(job_source_id)
        elif job_source_type == "ImplicitCollectionJobs":
            implicit_collection_job_ids.add(job_source_id)
        elif job_source_type == "WorkflowInvocation":
            invocation_state = sa_session.get(model.WorkflowInvocation, job_source_id).state
            workflow_invocation_states[job_source_id] = invocation_state
            workflow_invocation_job_sources = []
            for (
                invocation_step_source_type,
                invocation_step_source_id,
                invocation_step_state,
            ) in invocation_job_source_iter(sa_session, job_source_id):
                workflow_invocation_job_sources.append(
                    (invocation_step_source_type, invocation_step_source_id, invocation_step_state)
                )
                if invocation_step_source_type == "Job":
                    job_ids.add(invocation_step_source_id)
                elif invocation_step_source_type == "ImplicitCollectionJobs":
                    implicit_collection_job_ids.add(invocation_step_source_id)
            workflow_invocations_job_sources[job_source_id] = workflow_invocation_job_sources
        else:
            raise RequestParameterInvalidException(f"Invalid job source type {job_source_type} found.")

    job_summaries = {}
    implicit_collection_jobs_summaries = {}
    for job_id in job_ids:
        job_summaries[job_id] = summarize_jobs_to_dict(sa_session, sa_session.get(Job, job_id))
    for implicit_collection_jobs_id in implicit_collection_job_ids:
        implicit_collection_jobs_summaries[implicit_collection_jobs_id] = summarize_jobs_to_dict(
            sa_session, sa_session.get(model.ImplicitCollectionJobs, implicit_collection_jobs_id)
        )

    rval = []
    for job_source_id, job_source_type in zip(job_source_ids, job_source_types):
        if job_source_type == "Job":
            rval.append(job_summaries[job_source_id])
        elif job_source_type == "ImplicitCollectionJobs":
            rval.append(implicit_collection_jobs_summaries[job_source_id])
        else:
            invocation_state = workflow_invocation_states[job_source_id]
            invocation_job_summaries = []
            invocation_implicit_collection_job_summaries = []
            invocation_step_states = []
            for (
                invocation_step_source_type,
                invocation_step_source_id,
                invocation_step_state,
            ) in workflow_invocations_job_sources[job_source_id]:
                invocation_step_states.append(invocation_step_state)
                if invocation_step_source_type == "Job":
                    invocation_job_summaries.append(job_summaries[invocation_step_source_id])
                else:
                    invocation_implicit_collection_job_summaries.append(
                        implicit_collection_jobs_summaries[invocation_step_source_id]
                    )
            rval.append(
                summarize_invocation_jobs(
                    job_source_id,
                    invocation_job_summaries,
                    invocation_implicit_collection_job_summaries,
                    invocation_state,
                    invocation_step_states,
                )
            )
    return rval

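# Illustrative usage (a sketch; ids are made up). The two lists are parallel and
# the returned summaries preserve their order:
#
#     summaries = fetch_job_states(sa_session, [101, 12], ["Job", "ImplicitCollectionJobs"])
#     # -> [summary for Job 101, summary for ImplicitCollectionJobs 12]
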
def summarize_invocation_jobs(
    invocation_id, job_summaries, implicit_collection_job_summaries, invocation_state, invocation_step_states
):
    states = {}
    if invocation_state == "scheduled":
        all_scheduled = True
        for invocation_step_state in invocation_step_states:
            all_scheduled = all_scheduled and invocation_step_state == "scheduled"
        if all_scheduled:
            populated_state = "ok"
        else:
            populated_state = "new"
    elif invocation_state in ["cancelled", "failed"]:
        populated_state = "failed"
    else:
        # call new, ready => new
        populated_state = "new"

    def merge_states(component_states):
        for key, value in component_states.items():
            if key not in states:
                states[key] = value
            else:
                states[key] += value

    for job_summary in job_summaries:
        merge_states(job_summary["states"])
    for implicit_collection_job_summary in implicit_collection_job_summaries:
        # 'new' (un-populated collections might not yet have a states entry)
        if "states" in implicit_collection_job_summary:
            merge_states(implicit_collection_job_summary["states"])
        component_populated_state = implicit_collection_job_summary["populated_state"]
        if component_populated_state == "failed":
            populated_state = "failed"
        elif component_populated_state == "new" and populated_state != "failed":
            populated_state = "new"
    rval = {
        "id": invocation_id,
        "model": "WorkflowInvocation",
        "states": states,
        "populated_state": populated_state,
    }
    return rval

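# Illustrative return value (a sketch; the id and counts are made up):
#
#     {
#         "id": 5,
#         "model": "WorkflowInvocation",
#         "states": {"ok": 4, "running": 1},
#         "populated_state": "ok",
#     }
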
[docs] class JobsSummary(TypedDict):
    populated_state: str
    states: Dict[str, int]
    model: str
    id: int

[docs] def summarize_jobs_to_dict(sa_session, jobs_source) -> Optional[JobsSummary]:
    """Produce a summary of jobs for job summary endpoints.

    :type   jobs_source: a Job or ImplicitCollectionJobs or None
    :param  jobs_source: the object to summarize

    :rtype:     dict
    :returns:   dictionary containing job summary information
    """
    rval: Optional[JobsSummary] = None
    if jobs_source is None:
        pass
    elif isinstance(jobs_source, model.Job):
        rval = {
            "populated_state": "ok",
            "states": {jobs_source.state: 1},
            "model": "Job",
            "id": jobs_source.id,
        }
    else:
        populated_state = jobs_source.populated_state
        rval = {
            "id": jobs_source.id,
            "populated_state": populated_state,
            "model": "ImplicitCollectionJobs",
            "states": {},
        }
        if populated_state == "ok":
            # produce state summary...
            join = model.ImplicitCollectionJobs.table.join(
                model.ImplicitCollectionJobsJobAssociation.table.join(model.Job)
            )
            statement = (
                select(model.Job.state, func.count())
                .select_from(join)
                .where(model.ImplicitCollectionJobs.id == jobs_source.id)
                .group_by(model.Job.state)
            )
            for row in sa_session.execute(statement):
                rval["states"][row[0]] = row[1]
    return rval

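Illustrative only (not part of the module), assuming `sa_session` is an active SQLAlchemy session and `job_id` / `icj_id` refer to existing rows; the return shapes below mirror the branches above:

job_summary = summarize_jobs_to_dict(sa_session, sa_session.get(model.Job, job_id))
# e.g. {"populated_state": "ok", "states": {"running": 1}, "model": "Job", "id": 7}

icj = sa_session.get(model.ImplicitCollectionJobs, icj_id)
collection_summary = summarize_jobs_to_dict(sa_session, icj)
# e.g. {"id": 3, "populated_state": "ok", "model": "ImplicitCollectionJobs", "states": {"ok": 10}}
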
[docs] def summarize_job_metrics(trans, job):
    """Produce a dict-ified version of job metrics ready for tabular rendering.

    Precondition: the caller has verified the job is accessible to the user
    represented by the trans parameter.
    """
    return summarize_metrics(trans, job.metrics)

[docs] def summarize_metrics(trans: ProvidesUserContext, job_metrics):
    safety_level = Safety.SAFE
    if trans.user_is_admin:
        safety_level = Safety.UNSAFE
    elif trans.app.config.expose_potentially_sensitive_job_metrics:
        safety_level = Safety.POTENTIALLY_SENSITVE
    raw_metrics = [
        RawMetric(
            m.metric_name,
            m.metric_value,
            m.plugin,
        )
        for m in job_metrics
    ]
    dictifiable_metrics = trans.app.job_metrics.dictifiable_metrics(raw_metrics, safety_level)
    return [d.dict() for d in dictifiable_metrics]

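A brief sketch of how this is typically driven (illustrative, assuming `trans` is a ProvidesUserContext and `job` a model.Job the user can access): admins see all metrics, other users only those the job_metrics registry considers safe, or potentially sensitive when the deployment enables expose_potentially_sensitive_job_metrics.

rows = summarize_metrics(trans, job.metrics)
for row in rows:
    print(row)  # each row is a plain dict ready for tabular rendering
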
[docs] def summarize_destination_params(trans, job):
    """Produce a dict-ified version of job destination parameters ready for tabular rendering.

    Precondition: the caller has verified the job is accessible to the user
    represented by the trans parameter.
    """
    destination_params = {
        "Runner": job.job_runner_name,
        "Runner Job ID": job.job_runner_external_id,
        "Handler": job.handler,
    }
    if job_destination_params := job.destination_params:
        destination_params.update(job_destination_params)
    return destination_params

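Illustrative only: the returned mapping mixes the three fixed keys above with whatever runner-specific entries `job.destination_params` carries (assuming `trans` and `job` as in the preceding examples).

params = summarize_destination_params(trans, job)
for name, value in params.items():
    print(f"{name}: {value}")  # e.g. "Runner: slurm", "Handler: handler0", plus destination params
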
[docs] def summarize_job_parameters(trans: ProvidesUserContext, job: Job):
    """Produce a dict-ified version of job parameters ready for tabular rendering.

    Precondition: the caller has verified the job is accessible to the user
    represented by the trans parameter.
    """

    # More client logic here than is ideal but it is hard to reason about
    # tool parameter types on the client relative to the server.
    def inputs_recursive(input_params, param_values, depth=1, upgrade_messages=None):
        if upgrade_messages is None:
            upgrade_messages = {}

        rval = []

        for input in input_params.values():
            if input.name in param_values:
                input_value = param_values[input.name]
                if input.type == "repeat":
                    for i in range(len(input_value)):
                        rval.extend(inputs_recursive(input.inputs, input_value[i], depth=depth + 1))
                elif input.type == "section":
                    # Get the value of the current Section parameter
                    rval.append(dict(text=input.name, depth=depth))
                    rval.extend(
                        inputs_recursive(
                            input.inputs,
                            input_value,
                            depth=depth + 1,
                            upgrade_messages=upgrade_messages.get(input.name),
                        )
                    )
                elif input.type == "conditional":
                    try:
                        current_case = input_value["__current_case__"]
                        is_valid = True
                    except Exception:
                        current_case = None
                        is_valid = False
                    if is_valid:
                        rval.append(
                            dict(
                                text=input.test_param.label or input.test_param.name,
                                depth=depth,
                                value=input.cases[current_case].value,
                            )
                        )
                        rval.extend(
                            inputs_recursive(
                                input.cases[current_case].inputs,
                                input_value,
                                depth=depth + 1,
                                upgrade_messages=upgrade_messages.get(input.name),
                            )
                        )
                    else:
                        rval.append(
                            dict(
                                text=input.name,
                                depth=depth,
                                notes="The previously used value is no longer valid.",
                                error=True,
                            )
                        )
                elif input.type == "upload_dataset":
                    rval.append(
                        dict(
                            text=input.group_title(param_values),
                            depth=depth,
                            value=f"{len(input_value)} uploaded datasets",
                        )
                    )
                elif (
                    input.type == "data"
                    or input.type == "data_collection"
                    or isinstance(input_value, model.HistoryDatasetAssociation)
                ):
                    value: List[Union[Dict[str, Any], None]] = []
                    for element in listify(input_value):
                        if isinstance(element, model.HistoryDatasetAssociation):
                            hda = element
                            value.append({"src": "hda", "id": element.id, "hid": hda.hid, "name": hda.name})
                        elif isinstance(element, model.DatasetCollectionElement):
                            value.append({"src": "dce", "id": element.id, "name": element.element_identifier})
                        elif isinstance(element, model.HistoryDatasetCollectionAssociation):
                            value.append({"src": "hdca", "id": element.id, "hid": element.hid, "name": element.name})
                        elif element is None:
                            value.append(None)
                        else:
                            raise Exception(
                                f"Unhandled data input parameter type encountered {element.__class__.__name__}"
                            )
                    rval.append(dict(text=input.label or input.name, depth=depth, value=value))
                elif input.visible:
                    if hasattr(input, "label") and input.label:
                        label = input.label
                    else:
                        # value for label not required, fallback to input name (same as tool panel)
                        label = input.name
                    rval.append(
                        dict(
                            text=label,
                            depth=depth,
                            value=input.value_to_display_text(input_value),
                            notes=upgrade_messages.get(input.name, ""),
                        )
                    )
            else:
                # Parameter does not have a stored value.
                # Get parameter label.
                if input.type == "conditional":
                    label = input.test_param.label
                else:
                    label = input.label or input.name
                rval.append(
                    dict(text=label, depth=depth, notes="not used (parameter was added after this job was run)")
                )

        return rval

    # Load the tool
    app = trans.app
    toolbox = app.toolbox
    tool_uuid = None
    if dynamic_tool := job.dynamic_tool:
        tool_uuid = dynamic_tool.uuid
    tool = toolbox.get_tool(job.tool_id, job.tool_version, tool_uuid=tool_uuid, user=trans.user)

    params_objects = None
    parameters = []
    upgrade_messages = {}
    has_parameter_errors = False

    # Load parameter objects, if a parameter type has changed, it's possible for the value to no longer be valid
    if tool:
        try:
            params_objects = job.get_param_values(app, ignore_errors=False)
        except Exception:
            params_objects = job.get_param_values(app, ignore_errors=True)
            # use different param_objects in the following line, since we want to display original values as much as possible
            upgrade_messages = tool.check_and_update_param_values(
                job.get_param_values(app, ignore_errors=True), trans, update_values=False
            )
            has_parameter_errors = True

        parameters = inputs_recursive(tool.inputs, params_objects, depth=1, upgrade_messages=upgrade_messages)
    else:
        has_parameter_errors = True

    return {
        "parameters": parameters,
        "has_parameter_errors": has_parameter_errors,
        "outputs": summarize_job_outputs(job=job, tool=tool, params=params_objects),
    }

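A minimal sketch of consuming the summary above (illustrative, assuming `trans` is a ProvidesUserContext and `job` a model.Job the caller may access); each parameter row carries at least "text" and "depth", with "value" and "notes" present where applicable:

summary = summarize_job_parameters(trans, job)
print(summary["has_parameter_errors"])
for row in summary["parameters"]:
    print("  " * row["depth"], row["text"], row.get("value", ""))
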
[docs] def get_output_name(tool, output, params):
    try:
        return tool.tool_action.get_output_name(
            output,
            tool=tool,
            params=params,
        )
    except Exception:
        pass

[docs] def summarize_job_outputs(job: model.Job, tool, params):
    outputs = defaultdict(list)
    output_labels = {}
    possible_outputs = (
        ("hda", "dataset_id", job.output_datasets),
        ("ldda", "ldda_id", job.output_library_datasets),
        ("hdca", "dataset_collection_id", job.output_dataset_collection_instances),
    )
    for src, attribute, output_associations in possible_outputs:
        output_associations = cast(List, output_associations)  # during iteration, mypy sees it as object
        for output_association in output_associations:
            output_name = output_association.name
            if output_name not in output_labels and tool:
                tool_output = tool.output_collections if src == "hdca" else tool.outputs
                output_labels[output_name] = get_output_name(
                    tool=tool, output=tool_output.get(output_name), params=params
                )
            label = output_labels.get(output_name)
            outputs[output_name].append(
                {
                    "label": label,
                    "value": {"src": src, "id": getattr(output_association, attribute)},
                }
            )
    return outputs

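Illustrative only, assuming `sa_session` and `job_id` as in the earlier sketches; passing tool=None is allowed and simply skips label lookup, so labels fall back to None:

job = sa_session.get(model.Job, job_id)
outputs = summarize_job_outputs(job=job, tool=None, params=None)
# e.g. {"out_file1": [{"label": None, "value": {"src": "hda", "id": 42}}]}
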
[docs] def get_jobs_to_check_at_startup(session: galaxy_scoped_session, track_jobs_in_database: bool, config):
    if track_jobs_in_database:
        in_list = (Job.states.QUEUED, Job.states.RUNNING, Job.states.STOPPED)
    else:
        in_list = (Job.states.NEW, Job.states.QUEUED, Job.states.RUNNING)

    stmt = (
        select(Job)
        .execution_options(yield_per=YIELD_PER_ROWS)
        .filter(Job.state.in_(in_list) & (Job.handler == config.server_name))
    )
    if config.user_activation_on:
        # Filter out the jobs of inactive users.
        stmt = stmt.outerjoin(User).filter(or_((Job.user_id == null()), (User.active == true())))

    return session.scalars(stmt).all()

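Illustrative only: `session` is assumed to be a galaxy_scoped_session and `config` any object exposing `server_name` and `user_activation_on`, as used above.

jobs = get_jobs_to_check_at_startup(session, track_jobs_in_database=True, config=config)
log.debug("found %d jobs to recheck on handler '%s'", len(jobs), config.server_name)
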
[docs] def get_job(session: galaxy_scoped_session, *where_clauses):
    stmt = select(Job).where(*where_clauses).limit(1)
    return session.scalars(stmt).first()

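A minimal sketch (not part of the module): the where clauses are ordinary SQLAlchemy filter expressions over the Job model; `session` and the tool id "cat1" are illustrative assumptions.

job = get_job(session, Job.tool_id == "cat1", Job.state == Job.states.RUNNING)
if job is not None:
    log.info("job %s is running on handler %s", job.id, job.handler)
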