Source code for galaxy.tools

"""
Classes encapsulating galaxy tools and tool configuration.
"""

import itertools
import json
import logging
import math
import os
import re
import tarfile
import tempfile
from collections.abc import MutableMapping
from pathlib import Path
from typing import (
    Any,
    cast,
    Dict,
    List,
    NamedTuple,
    Optional,
    Set,
    Tuple,
    Type,
    TYPE_CHECKING,
    Union,
)
from urllib.parse import unquote_plus

import webob.exc
from mako.template import Template
from packaging.version import Version
from sqlalchemy import (
    delete,
    func,
    select,
)

from galaxy import (
    exceptions,
    model,
)
from galaxy.exceptions import (
    ToolInputsNotOKException,
    ToolInputsNotReadyException,
)
from galaxy.job_execution import output_collect
from galaxy.metadata import get_metadata_compute_strategy
from galaxy.model import (
    Job,
    StoredWorkflow,
)
from galaxy.model.base import transaction
from galaxy.tool_shed.util.repository_util import get_installed_repository
from galaxy.tool_shed.util.shed_util_common import set_image_paths
from galaxy.tool_util.deps import (
    build_dependency_manager,
    CachedDependencyManager,
    NullDependencyManager,
)
from galaxy.tool_util.fetcher import ToolLocationFetcher
from galaxy.tool_util.loader import (
    imported_macro_paths,
    raw_tool_xml_tree,
    template_macro_params,
)
from galaxy.tool_util.loader_directory import looks_like_a_tool
from galaxy.tool_util.ontologies.ontology_data import (
    biotools_reference,
    expand_ontology_data,
)
from galaxy.tool_util.output_checker import DETECTED_JOB_STATE
from galaxy.tool_util.parser import (
    get_tool_source,
    get_tool_source_from_representation,
    RequiredFiles,
    ToolOutputCollectionPart,
)
from galaxy.tool_util.parser.interface import (
    InputSource,
    PageSource,
    ToolSource,
)
from galaxy.tool_util.parser.xml import (
    XmlPageSource,
    XmlToolSource,
)
from galaxy.tool_util.provided_metadata import parse_tool_provided_metadata
from galaxy.tool_util.toolbox import (
    AbstractToolBox,
    AbstractToolTagManager,
    ToolSection,
)
from galaxy.tool_util.toolbox.views.sources import StaticToolBoxViewSources
from galaxy.tool_util.verify.interactor import ToolTestDescription
from galaxy.tool_util.verify.test_data import TestDataNotFoundError
from galaxy.tool_util.version import (
    LegacyVersion,
    parse_version,
)
from galaxy.tools import expressions
from galaxy.tools.actions import (
    DefaultToolAction,
    ToolAction,
)
from galaxy.tools.actions.data_manager import DataManagerToolAction
from galaxy.tools.actions.data_source import DataSourceToolAction
from galaxy.tools.actions.model_operations import ModelOperationToolAction
from galaxy.tools.cache import ToolDocumentCache
from galaxy.tools.evaluation import global_tool_errors
from galaxy.tools.imp_exp import JobImportHistoryArchiveWrapper
from galaxy.tools.parameters import (
    check_param,
    params_from_strings,
    params_to_incoming,
    params_to_strings,
    populate_state,
    visit_input_values,
)
from galaxy.tools.parameters.basic import (
    BaseURLToolParameter,
    BooleanToolParameter,
    ColumnListParameter,
    DataCollectionToolParameter,
    DataToolParameter,
    HiddenToolParameter,
    ImplicitConversionRequired,
    SelectTagParameter,
    SelectToolParameter,
    ToolParameter,
)
from galaxy.tools.parameters.dataset_matcher import (
    set_dataset_matcher_factory,
    unset_dataset_matcher_factory,
)
from galaxy.tools.parameters.grouping import (
    Conditional,
    ConditionalWhen,
    Group,
    Repeat,
    Section,
    UploadDataset,
)
from galaxy.tools.parameters.input_translation import ToolInputTranslator
from galaxy.tools.parameters.meta import expand_meta_parameters
from galaxy.tools.parameters.workflow_utils import workflow_building_modes
from galaxy.tools.parameters.wrapped_json import json_wrap
from galaxy.tools.test import parse_tests
from galaxy.util import (
    in_directory,
    listify,
    Params,
    parse_xml_string,
    parse_xml_string_to_etree,
    rst_to_html,
    string_as_bool,
    unicodify,
    UNKNOWN,
    XML,
)
from galaxy.util.bunch import Bunch
from galaxy.util.compression_utils import get_fileobj_raw
from galaxy.util.dictifiable import Dictifiable
from galaxy.util.expressions import ExpressionContext
from galaxy.util.form_builder import SelectField
from galaxy.util.json import (
    safe_loads,
    swap_inf_nan,
)
from galaxy.util.rules_dsl import RuleSet
from galaxy.util.template import (
    fill_template,
    refactoring_tool,
)
from galaxy.util.tool_shed.common_util import (
    get_tool_shed_repository_url,
    get_tool_shed_url_from_tool_shed_registry,
)
from galaxy.version import VERSION_MAJOR
from galaxy.work.context import proxy_work_context_for_history
from .execute import (
    execute as execute_job,
    MappingParameters,
)

if TYPE_CHECKING:
    from galaxy.app import UniverseApplication
    from galaxy.managers.jobs import JobSearch
    from galaxy.tools.actions.metadata import SetMetadataToolAction

log = logging.getLogger(__name__)

# Message template with a single ``%s`` placeholder that is filled with the
# tool the message is about.
REQUIRES_JS_RUNTIME_MESSAGE = (
    "The tool [%s] requires a nodejs runtime to execute "
    "but node or nodejs could not be found. Please contact the Galaxy administrator"
)

# Absolute path of the directory that contains this module.
MODEL_TOOLS_PATH = os.path.abspath(os.path.dirname(__file__))
# Tools that require Galaxy's Python environment to be preserved.
# Entries are compared against a tool's ``old_id``; every version of a listed
# tool is treated as a legacy tool.
GALAXY_LIB_TOOLS_UNVERSIONED = [
    "upload1",
    "send_to_cloud",
    "__DATA_FETCH__",
    "directory_uri",
    "export_remote",
    # Legacy tools bundled with Galaxy.
    "laj_1",
    "gff2bed1",
    "gff_filter_by_feature_count",
    "Interval_Maf_Merged_Fasta2",
    "GeneBed_Maf_Fasta2",
    "maf_stats1",
    "Interval2Maf_pairwise1",
    "MAF_To_Interval1",
    "MAF_filter",
    "MAF_To_Fasta1",
    "MAF_Reverse_Complement_1",
    "MAF_split_blocks_by_species1",
    "MAF_Limit_To_Species1",
    "maf_by_block_number1",
    # Converters
    "CONVERTER_bed_to_fli_0",
    "CONVERTER_gff_to_fli_0",
    "CONVERTER_gff_to_interval_index_0",
    "CONVERTER_maf_to_fasta_0",
    "CONVERTER_maf_to_interval_0",
    # Tools improperly migrated to the tool shed (devteam)
    "qualityFilter",
    "pileup_interval",
    "count_gff_features",
    "lastz_paired_reads_wrapper",
    "subRate1",
    "find_diag_hits",
    # Tools improperly migrated using Galaxy (from shed other)
    "column_join",
    "gd_coverage_distributions",  # Genome Diversity tools from miller-lab
    "gd_dpmix",
    "gd_pca",
    "gd_phylogenetic_tree",
    "gd_population_structure",
    "gd_prepare_population_structure",
]
# Tools that needed Galaxy on the PATH in the past but no longer do, mapped
# to the version at which they were fixed. Keys are compared against a tool's
# ``old_id``; a tool only counts as legacy while its own version is lower
# than the version recorded here.
GALAXY_LIB_TOOLS_VERSIONED = {
    "meme_fimo": parse_version("5.0.5"),
    "Extract genomic DNA 1": parse_version("3.0.0"),
    "fetchflank": parse_version("1.0.1"),
    "gops_intersect_1": parse_version("1.0.0"),
    "lastz_wrapper_2": parse_version("1.3"),
    "PEsortedSAM2readprofile": parse_version("1.1.1"),
    "sam_to_bam": parse_version("1.1.3"),
    "sam_pileup": parse_version("1.1.3"),
    "vcf_to_maf_customtrack1": parse_version("1.0.1"),
    "secure_hash_message_digest": parse_version("0.0.2"),
    "join1": parse_version("2.1.3"),
    "wiggle2simple1": parse_version("1.0.1"),
    "CONVERTER_wiggle_to_interval_0": parse_version("1.0.1"),
    "aggregate_scores_in_intervals2": parse_version("1.1.4"),
    "CONVERTER_fastq_to_fqtoc0": parse_version("1.0.1"),
    "CONVERTER_tar_to_directory": parse_version("1.0.1"),
    "tabular_to_dbnsfp": parse_version("1.0.1"),
    "cufflinks": parse_version("2.2.1.3"),
    "Convert characters1": parse_version("1.0.1"),
    "substitutions1": parse_version("1.0.1"),
    "winSplitter": parse_version("1.0.1"),
    "Interval2Maf1": parse_version("1.0.1+galaxy0"),
}

# Required-files specification whose single glob (``**``) includes everything.
REQUIRE_FULL_DIRECTORY = {
    "includes": [{"path": "**", "path_type": "glob"}],
}
# Required-files specifications keyed by tool id, for tools that do not
# declare their required files explicitly. Each entry has a "required"
# specification and may carry a "version" (see the notes inside the mapping).
# NOTE(review): how "version" is compared against a tool's version is decided
# by the consumer of this mapping, which is not part of this section.
IMPLICITLY_REQUIRED_TOOL_FILES: Dict[str, Dict] = {
    "deseq2": {
        "version": parse_version("2.11.40.6"),
        "required": {"includes": [{"path": "*.R", "path_type": "glob"}]},
    },
    # minimum example:
    # "foobar": {"required": REQUIRE_FULL_DIRECTORY}
    # if no version is specified, all versions without explicit RequiredFiles will be selected
    "circos": {"required": REQUIRE_FULL_DIRECTORY},
    "cp_image_math": {"required": {"includes": [{"path": "*.py", "path_type": "glob"}]}},
    "enumerate_charges": {"required": REQUIRE_FULL_DIRECTORY},
    "fasta_compute_length": {"required": {"includes": [{"path": "utils/*", "path_type": "glob"}]}},
    "fasta_concatenate0": {"required": {"includes": [{"path": "utils/*", "path_type": "glob"}]}},
    "filter_tabular": {"required": {"includes": [{"path": "*.py", "path_type": "glob"}]}},
    "flanking_features_1": {"required": {"includes": [{"path": "utils/*", "path_type": "glob"}]}},
    "gops_intersect_1": {"required": {"includes": [{"path": "utils/*", "path_type": "glob"}]}},
    "gops_subtract_1": {"required": {"includes": [{"path": "utils/*", "path_type": "glob"}]}},
    "maxquant": {"required": {"includes": [{"path": "*.py", "path_type": "glob"}]}},
    "maxquant_mqpar": {"required": {"includes": [{"path": "*.py", "path_type": "glob"}]}},
    "query_tabular": {"required": {"includes": [{"path": "*.py", "path_type": "glob"}]}},
    "shasta": {"required": {"includes": [{"path": "configs/*", "path_type": "glob"}]}},
    "sqlite_to_tabular": {"required": {"includes": [{"path": "*.py", "path_type": "glob"}]}},
    "sucos_max_score": {"required": {"includes": [{"path": "*.py", "path_type": "glob"}]}},
}


class safe_update(NamedTuple):
    """Pair of versions bounding a range of workflow-compatible tool versions.

    ``min_version`` is the lower bound and ``current_version`` the upper bound;
    ``get_safe_version`` treats both bounds as inclusive.
    """

    min_version: Union[LegacyVersion, Version]
    current_version: Union[LegacyVersion, Version]
# Tool updates that did not change parameters in a way that requires rebuilding workflows.
# Each row is (tool id, oldest safe version, newest safe version).
WORKFLOW_SAFE_TOOL_VERSION_UPDATES = {
    _tool_id: safe_update(parse_version(_oldest), parse_version(_newest))
    for _tool_id, _oldest, _newest in (
        ("Filter1", "1.1.0", "1.1.1"),
        ("__BUILD_LIST__", "1.0.0", "1.1.0"),
        ("__APPLY_RULES__", "1.0.0", "1.1.0"),
        ("__EXTRACT_DATASET__", "1.0.0", "1.0.1"),
        ("Grep1", "1.0.1", "1.0.4"),
        ("Show beginning1", "1.0.0", "1.0.2"),
        ("Show tail1", "1.0.0", "1.0.1"),
        ("sort1", "1.1.0", "1.2.0"),
        ("CONVERTER_interval_to_bgzip_0", "1.0.1", "1.0.2"),
        ("CONVERTER_Bam_Bai_0", "1.0.0", "1.0.1"),
        ("CONVERTER_cram_to_bam_0", "1.0.1", "1.0.2"),
        ("CONVERTER_fasta_to_fai", "1.0.0", "1.0.1"),
        ("CONVERTER_sam_to_bigwig_0", "1.0.2", "1.0.3"),
        ("CONVERTER_bam_to_coodinate_sorted_bam", "1.0.0", "1.0.1"),
        ("CONVERTER_bam_to_qname_sorted_bam", "1.0.0", "1.0.1"),
    )
}
def get_safe_version(tool: "Tool", requested_tool_version: str) -> Optional[str]:
    """Return the newest loaded version that can safely replace the requested one.

    A replacement is only offered when the tool has an entry in
    ``WORKFLOW_SAFE_TOOL_VERSION_UPDATES``, the tool has a lineage, and
    ``requested_tool_version`` lies inside the registered (inclusive) range.

    :param tool: tool whose ``id`` and ``lineage`` are inspected
    :param requested_tool_version: version string that was asked for
    :returns: a version string taken from the tool's lineage, or ``None`` when
        no safe replacement applies
    """
    if not tool.id:
        return None
    safe_version = WORKFLOW_SAFE_TOOL_VERSION_UPDATES.get(tool.id)
    if not safe_version or not tool.lineage:
        return None
    if not (safe_version.current_version >= parse_version(requested_tool_version) >= safe_version.min_version):
        return None
    # tool versions are sorted from old to new, so check newest version first
    for lineage_version in reversed(tool.lineage.tool_versions):
        if safe_version.current_version >= parse_version(lineage_version) >= safe_version.min_version:
            return lineage_version
    return None
class ToolNotFoundException(Exception):
    """Exception type for a tool that could not be found."""
def create_tool_from_source(app, tool_source, config_file=None, **kwds):
    """Instantiate the tool class that matches ``tool_source``.

    The class is selected in this order:

    1. the module/class pair returned by ``tool_source.parse_tool_module()``,
       imported on demand;
    2. the class registered in ``tool_types`` for the value returned by
       ``tool_source.parse_tool_type()``;
    3. the plain ``Tool`` class.

    :param app: application object handed to the tool constructor
    :param tool_source: parsed tool definition
    :param config_file: path of the tool config, or ``None``
    :param kwds: extra keyword arguments forwarded to the tool constructor
    :returns: the newly constructed tool instance
    """
    # Allow specifying a different tool subclass to instantiate
    if (tool_module := tool_source.parse_tool_module()) is not None:
        module, cls = tool_module
        mod = __import__(module, globals(), locals(), [cls])
        ToolClass = getattr(mod, cls)
    elif tool_type := tool_source.parse_tool_type():
        # Parse the tool type only once and reuse the result for the lookup.
        ToolClass = tool_types.get(tool_type)
    else:
        # Normal tool
        ToolClass = Tool
    return ToolClass(config_file, tool_source, app, **kwds)
def create_tool_from_representation(
    app, raw_tool_source: str, tool_dir: str, tool_source_class="XmlToolSource"
) -> "Tool":
    """Build a tool from a raw tool source string.

    :param app: application object handed to the tool constructor
    :param raw_tool_source: tool definition as text
    :param tool_dir: directory passed on to the tool as its ``tool_dir``
    :param tool_source_class: name of the tool source class used for parsing
    :returns: the tool created by ``create_tool_from_source``
    """
    return create_tool_from_source(
        app,
        tool_source=get_tool_source(tool_source_class=tool_source_class, raw_tool_source=raw_tool_source),
        tool_dir=tool_dir,
    )
class NullToolTagManager(AbstractToolTagManager):
    """Tag manager whose operations do nothing and return ``None``."""

    def reset_tags(self):
        """Do nothing."""

    def handle_tags(self, tool_id, tool_definition_source):
        """Do nothing; both arguments are ignored."""
class PersistentToolTagManager(AbstractToolTagManager):
    """Tag manager that stores tags and tool/tag associations through the app's model session."""

    def __init__(self, app):
        self.app = app
        self.sa_session = app.model.context

    def _commit(self):
        """Commit the session inside a ``transaction`` block."""
        with transaction(self.sa_session):
            self.sa_session.commit()

    def reset_tags(self):
        """Log how many tool tag associations exist, then delete all of them."""
        association_count = self.sa_session.scalar(select(func.count(self.app.model.ToolTagAssociation.id)))
        log.info(f"removing all tool tag associations ({association_count!s})")
        self.sa_session.execute(delete(self.app.model.ToolTagAssociation))
        self._commit()

    def handle_tags(self, tool_id, tool_definition_source):
        """Associate ``tool_id`` with every tag named in the definition's ``tags`` entry.

        ``tags`` is read as a comma-separated string; empty names are skipped.
        Missing tags are created first, and an association is only added when
        the tag does not already list ``tool_id``. Nothing happens unless the
        ``enable_tool_tags`` config option is true.
        """
        if not self.app.config.get_bool("enable_tool_tags", False):
            return
        for tag_name in tool_definition_source.get("tags", "").split(","):
            if tag_name == "":
                continue
            lookup = select(self.app.model.Tag).filter_by(name=tag_name).limit(1)
            tag = self.sa_session.scalars(lookup).first()
            if not tag:
                # New tag: persist it first so that its id is available below.
                tag = self.app.model.Tag(name=tag_name)
                self.sa_session.add(tag)
                self._commit()
                already_associated = False
            else:
                already_associated = any(tagged.tool_id == tool_id for tagged in tag.tagged_tools)
            if already_associated:
                continue
            association = self.app.model.ToolTagAssociation(tool_id=tool_id, tag_id=tag.id)
            self.sa_session.add(association)
            self._commit()
class ToolBox(AbstractToolBox):
    """
    A derivative of AbstractToolBox with Galaxy tooling-specific functionality
    and knowledge about Tool internals - how to construct them, action types,
    dependency management, etc.
    """

    def __init__(self, config_filenames, tool_root_dir, app, save_integrated_tool_panel=True):
        """Set up caches and panel views, load the tool configs and the dependency manager.

        :param config_filenames: tool config files handed to ``AbstractToolBox``
        :param tool_root_dir: root directory of the tools; the legacy default
            ``"./tools"`` is replaced by the ``bundled`` directory next to this module
        :param app: application object
        :param save_integrated_tool_panel: forwarded to ``AbstractToolBox``
        """
        self._reload_count = 0
        self.tool_location_fetcher = ToolLocationFetcher()
        self.cache_regions = {}
        # This is here to deal with the old default value, which doesn't make
        # sense in an "installed Galaxy" world.
        # FIXME: ./
        if tool_root_dir == "./tools":
            tool_root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "bundled"))
        view_sources = StaticToolBoxViewSources(
            view_directories=app.config.panel_views_dir,
            view_dicts=app.config.panel_views,
        )
        default_panel_view = app.config.default_panel_view

        super().__init__(
            config_filenames=config_filenames,
            tool_root_dir=tool_root_dir,
            app=app,
            view_sources=view_sources,
            default_panel_view=default_panel_view,
            save_integrated_tool_panel=save_integrated_tool_panel,
        )
        # Load built-in converters
        if app.config.display_builtin_converters:
            self.load_builtin_converters()
        # Reuse the dependency manager of a toolbox already attached to the
        # app, otherwise build a new one.
        if old_toolbox := getattr(app, "toolbox", None):
            self.dependency_manager = old_toolbox.dependency_manager
        else:
            self._init_dependency_manager()

    def tool_tag_manager(self):
        """Return a persistent tag manager if ``enable_tool_tags`` is set, else a null one."""
        if hasattr(self.app.config, "get_bool") and self.app.config.get_bool("enable_tool_tags", False):
            return PersistentToolTagManager(self.app)
        else:
            return NullToolTagManager()

    @property
    def sa_session(self):
        """
        Returns a SQLAlchemy session
        """
        return self.app.model.context

    def reload_dependency_manager(self):
        """Rebuild ``self.dependency_manager`` from the current configuration."""
        self._init_dependency_manager()

    def load_builtin_converters(self):
        """Add a "Built-in Converters" panel section holding every datatype converter tool."""
        section_id = "builtin_converters"
        section = ToolSection({"name": "Built-in Converters", "id": section_id})
        self._tool_panel[section_id] = section

        converters = {
            tool for target in self.app.datatypes_registry.datatype_converters.values() for tool in target.values()
        }
        for tool in converters:
            tool.hidden = False
            section.elems.append_tool(tool)

    def persist_cache(self, register_postfork=False):
        """
        Persists any modified tool cache files to disk.

        Set ``register_postfork`` to stop database thread queue,
        close database connection and register re-open function
        that re-opens the database after forking.
        """
        for region in self.cache_regions.values():
            if not region.disabled:
                region.persist()
                if register_postfork:
                    region.close()
                    self.app.application_stack.register_postfork_function(region.reopen_ro)

    def can_load_config_file(self, config_filename):
        """Return ``False`` only for a default shed tool config that must not be loaded.

        That is the case when ``config_filename`` is the default (not explicitly
        set) ``shed_tool_config_file`` and ``dynamic_confs()`` already reports
        loaded shed configs. If instead ``tool_config_file`` is explicitly set,
        a warning is logged and the file is still loaded.
        """
        if config_filename == self.app.config.shed_tool_config_file and not self.app.config.is_set(
            "shed_tool_config_file"
        ):
            if self.dynamic_confs():
                # Do not load or create a default shed_tool_config_file if another shed_tool_config file has already been loaded
                return False
            elif self.app.config.is_set("tool_config_file"):
                log.warning(
                    "The default shed tool config file (%s) has been added to the tool_config_file option, if this is "
                    "not the desired behavior, please set shed_tool_config_file to your primary shed-enabled tool "
                    "config file",
                    self.app.config.shed_tool_config_file,
                )
        return True

    def has_reloaded(self, other_toolbox):
        """Return ``True`` when this toolbox's reload counter differs from ``other_toolbox``'s."""
        return self._reload_count != other_toolbox._reload_count

    @property
    def all_requirements(self):
        """List of dicts, one per distinct requirement of any tool in the toolbox."""
        reqs = {req for _, tool in self.tools() for req in tool.tool_requirements}
        return [r.to_dict() for r in reqs]

    @property
    def tools_by_id(self):
        # Deprecated method, TODO - eliminate calls to this in test/.
        return self._tools_by_id

    def get_cache_region(self, tool_cache_data_dir):
        """Return the ``ToolDocumentCache`` for ``tool_cache_data_dir``, creating it on first use.

        Returns ``None`` when the tool document cache is disabled or no
        directory is given.
        """
        if self.app.config.enable_tool_document_cache and tool_cache_data_dir:
            if tool_cache_data_dir not in self.cache_regions:
                self.cache_regions[tool_cache_data_dir] = ToolDocumentCache(cache_dir=tool_cache_data_dir)
            return self.cache_regions[tool_cache_data_dir]

    def create_tool(self, config_file, tool_cache_data_dir=None, **kwds):
        """Create a tool from ``config_file``, using the tool document cache for XML tools.

        On a cache hit the expanded document and macro paths come from the
        cache; on a miss the tool source is expanded and stored in the cache.
        """
        cache = self.get_cache_region(tool_cache_data_dir)
        if config_file.endswith(".xml") and cache and not cache.disabled:
            tool_document = cache.get(config_file)
            if tool_document:
                tool_source = self.get_expanded_tool_source(
                    config_file=config_file,
                    xml_tree=parse_xml_string_to_etree(tool_document["document"]),
                    macro_paths=tool_document["macro_paths"],
                )
            else:
                tool_source = self.get_expanded_tool_source(config_file)
                cache.set(config_file, tool_source)
        else:
            tool_source = self.get_expanded_tool_source(config_file)
        return self._create_tool_from_source(tool_source, config_file=config_file, **kwds)

    def get_expanded_tool_source(self, config_file, **kwargs):
        """Parse ``config_file`` into a tool source.

        Any parsing error is recorded in ``global_tool_errors`` and then
        re-raised unchanged.
        """
        try:
            return get_tool_source(
                config_file,
                enable_beta_formats=getattr(self.app.config, "enable_beta_tool_formats", False),
                tool_location_fetcher=self.tool_location_fetcher,
                **kwargs,
            )
        except Exception as e:
            # capture and log parsing errors
            global_tool_errors.add_error(config_file, "Tool XML parsing", e)
            raise

    def _create_tool_from_source(self, tool_source, **kwds):
        """Create a tool from ``tool_source`` for this toolbox's app."""
        return create_tool_from_source(self.app, tool_source, **kwds)

    def create_dynamic_tool(self, dynamic_tool, **kwds):
        """Create a tool from a stored dynamic tool record.

        The new tool gets the record's ``uuid``; a missing tool id falls back to
        the record's ``tool_id`` and a missing name falls back to the tool id.
        """
        tool_format = dynamic_tool.tool_format
        tool_representation = dynamic_tool.value
        tool_source = get_tool_source_from_representation(
            tool_format=tool_format,
            tool_representation=tool_representation,
        )
        kwds["dynamic"] = True
        tool = self._create_tool_from_source(tool_source, **kwds)
        tool.dynamic_tool = dynamic_tool
        tool.uuid = dynamic_tool.uuid
        if not tool.id:
            tool.id = dynamic_tool.tool_id
        if not tool.name:
            tool.name = tool.id
        return tool

    def get_tool_components(self, tool_id, tool_version=None, get_loaded_tools_by_lineage=False, set_selected=False):
        """
        Retrieve all loaded versions of a tool from the toolbox and return a select list enabling
        selection of a different version, the list of the tool's loaded versions, and the specified tool.
        """
        tool_version_select_field = None
        tools = []
        tool = None
        # Backwards compatibility for datasource tools that have default tool_id configured, but which
        # are now using only GALAXY_URL.
        for candidate_id in listify(tool_id):
            if candidate_id.endswith("/"):
                # Some data sources send back redirects ending with `/`, this takes care of that case
                candidate_id = candidate_id[:-1]
            if get_loaded_tools_by_lineage:
                tools = self.get_loaded_tools_by_lineage(candidate_id)
            else:
                tools = self.get_tool(candidate_id, tool_version=tool_version, get_all_versions=True)
            if tools:
                tool = self.get_tool(candidate_id, tool_version=tool_version, get_all_versions=False)
                if len(tools) > 1:
                    tool_version_select_field = self.__build_tool_version_select_field(tools, tool.id, set_selected)
                # The first id that resolves to loaded tools wins.
                break
        return tool_version_select_field, tools, tool

    def _path_template_kwds(self):
        """Return the variables available to path templates."""
        return {
            "model_tools_path": MODEL_TOOLS_PATH,
        }

    def _get_tool_shed_repository(self, tool_shed, name, owner, installed_changeset_revision):
        # Abstract toolbox doesn't have a dependency on the database, so
        # override _get_tool_shed_repository here to provide this information.
        return get_installed_repository(
            self.app,
            tool_shed=tool_shed,
            name=name,
            owner=owner,
            installed_changeset_revision=installed_changeset_revision,
            from_cache=True,
        )

    def _looks_like_a_tool(self, path):
        """Return whether ``path`` looks like a tool, honouring ``enable_beta_tool_formats``."""
        return looks_like_a_tool(path, enable_beta_formats=getattr(self.app.config, "enable_beta_tool_formats", False))

    def _init_dependency_manager(self):
        """Set ``self.dependency_manager`` according to the app configuration.

        A ``NullDependencyManager`` is used when the app disables tool
        dependency resolution.
        """
        use_tool_dependency_resolution = getattr(self.app, "use_tool_dependency_resolution", True)
        if not use_tool_dependency_resolution:
            self.dependency_manager = NullDependencyManager()
            return
        app_config_dict = self.app.config.config_dict
        conf_file = app_config_dict.get("dependency_resolvers_config_file")
        default_tool_dependency_dir = os.path.join(
            self.app.config.data_dir, self.app.config.schema.defaults["tool_dependency_dir"]
        )
        self.dependency_manager = build_dependency_manager(
            app_config_dict=app_config_dict,
            conf_file=conf_file,
            default_tool_dependency_dir=default_tool_dependency_dir,
        )

    def _load_workflow(self, workflow_id):
        """
        Return an instance of 'Workflow' identified by `id`,
        which is encoded in the tool panel.
        """
        decoded_id = self.app.security.decode_id(workflow_id)
        session = self.app.model.context
        stored = session.get(StoredWorkflow, decoded_id)
        return stored.latest_workflow

    def __build_tool_version_select_field(self, tools, tool_id, set_selected):
        """Build a SelectField whose options are the ids for the received list of tools."""
        # Options are listed in the reverse of the order of ``tools``.
        options: List[Tuple[str, str]] = [(tool.version, tool.id) for tool in tools]
        options.reverse()
        select_field = SelectField(name="tool_id")
        for version, option_tool_id in options:
            is_selected = bool(set_selected and option_tool_id == tool_id)
            select_field.add_option(f"version {version}", option_tool_id, selected=is_selected)
        return select_field
class DefaultToolState:
    """
    Keeps track of the state of a users interaction with a tool between requests.
    """

    def __init__(self):
        """Start on page 0 with no inputs and no job to remap."""
        self.inputs = {}
        self.rerun_remap_job_id = None
        self.page = 0

    def initialize(self, trans, tool):
        """
        Create a new `DefaultToolState` for this tool. It will be initialized
        with default values for inputs. Grouping elements are filled in recursively.
        """
        fresh_inputs = {}
        self.inputs = fresh_inputs
        # The context wraps the same dict that is being filled in below.
        context = ExpressionContext(fresh_inputs)
        for tool_input in tool.inputs.values():
            fresh_inputs[tool_input.name] = tool_input.get_initial_value(trans, context)

    def encode(self, tool, app, nested=False):
        """
        Convert the data to a string
        """
        encoded = params_to_strings(tool.inputs, self.inputs, app, nested=nested)
        encoded["__page__"] = self.page
        encoded["__rerun_remap_job_id__"] = self.rerun_remap_job_id
        return encoded

    def decode(self, values, tool, app):
        """
        Restore the state from a string
        """
        decoded = safe_loads(values) or {}
        # Bookkeeping keys are removed before the rest is handed to the
        # parameter decoding.
        if "__page__" in decoded:
            self.page = decoded.pop("__page__")
        else:
            self.page = None
        if "__rerun_remap_job_id__" in decoded:
            self.rerun_remap_job_id = decoded.pop("__rerun_remap_job_id__")
        else:
            self.rerun_remap_job_id = None
        self.inputs = params_from_strings(tool.inputs, decoded, app, ignore_errors=True)

    def copy(self):
        """
        Shallow copy of the state
        """
        duplicate = DefaultToolState()
        duplicate.page = self.page
        duplicate.rerun_remap_job_id = self.rerun_remap_job_id
        # Shallow: the inputs dict is shared with the original state.
        duplicate.inputs = self.inputs
        return duplicate
class _Options(Bunch):
    """``Bunch`` with declared string attributes ``sanitize`` and ``refresh``."""

    sanitize: str
    refresh: str
[docs]class Tool(Dictifiable): """ Represents a computational tool that can be executed through Galaxy. """ job_tool_configurations: list tool_type = "default" requires_setting_metadata = True produces_entry_points = False default_tool_action = DefaultToolAction tool_action: ToolAction tool_type_local = False dict_collection_visible_keys = ["id", "name", "version", "description", "labels"] __help: Optional[Template] job_search: "JobSearch" version: str
def __init__(
    self,
    config_file,
    tool_source: ToolSource,
    app: "UniverseApplication",
    guid=None,
    repository_id=None,
    tool_shed_repository=None,
    allow_code_files=True,
    dynamic=False,
    tool_dir=None,
):
    """Load a tool from the config named by `config_file`

    :param config_file: path of the tool config, or ``None``
    :param tool_source: parsed tool definition that is handed to ``parse``
    :param app: application object the tool belongs to
    :param guid: stored as ``self.guid`` and forwarded to ``parse``
    :param repository_id: stored as ``self.repository_id``
    :param tool_shed_repository: passed to ``populate_tool_shed_info``
    :param allow_code_files: stored as ``self._allow_code_files``
    :param dynamic: forwarded to ``parse``
    :param tool_dir: explicit tool directory; when empty and ``config_file``
        is given, the directory of ``config_file`` is used instead
    :raises Exception: whatever ``parse`` raises; the error is recorded in
        ``global_tool_errors`` before it propagates
    """
    # Determine the full path of the directory where the tool config is
    self.config_file = config_file
    if config_file is not None:
        self.tool_dir = tool_dir or os.path.dirname(config_file)
    else:
        self.tool_dir = tool_dir

    self.app = app
    self.repository_id = repository_id
    self._allow_code_files = allow_code_files
    # setup initial attribute values
    self.stdio_exit_codes: List = []
    self.stdio_regexes: List = []
    self.inputs_by_page: List[Dict] = []
    self.display_by_page: List = []
    self.action: Union[str, Tuple[str, str]] = "/tool_runner/index"
    self.target = "galaxy_main"
    self.method = "post"
    self.labels: List = []
    self.check_values = True
    self.nginx_upload = False
    self.input_required = False
    self.display_interface = True
    self.require_login = False
    self.rerun = False
    # This will be non-None for tools loaded from the database (DynamicTool objects).
    self.dynamic_tool = None
    # Define a place to keep track of all input parameters. These
    # differ from the inputs dictionary in that inputs can be page
    # elements like conditionals, but input_params are basic form
    # parameters like SelectField objects. This enables us to more
    # easily ensure that parameter dependencies like index files or
    # tool_data_table_conf.xml entries exist.
    self.input_params: List[ToolParameter] = []
    # Attributes of tools installed from Galaxy tool sheds.
    self.tool_shed: Optional[str] = None
    self.repository_name = None
    self.repository_owner = None
    self.changeset_revision = None
    self.installed_changeset_revision = None
    self.sharable_url = None
    self.npages = 0
    # The tool.id value will be the value of guid, but we'll keep the
    # guid attribute since it is useful to have.
    self.guid = guid
    self.old_id: Optional[str] = None
    self.python_template_version: Optional[Version] = None
    self._lineage = None
    self.dependencies: List = []
    # populate toolshed repository info, if available
    self.populate_tool_shed_info(tool_shed_repository)
    # add tool resource parameters
    self.populate_resource_parameters(tool_source)
    self.tool_errors = None
    # Parse XML element containing configuration
    self.tool_source = tool_source
    self._is_workflow_compatible = None
    self.__help = None
    self.__tests: Optional[str] = None
    try:
        self.parse(tool_source, guid=guid, dynamic=dynamic)
    except Exception as e:
        # Record the failure, then let the original exception propagate.
        global_tool_errors.add_error(config_file, "Tool Loading", e)
        raise
    # Tool sources may offer an optional ``mem_optimize`` hook; call it once
    # parsing has finished.
    mem_optimize = getattr(self.tool_source, "mem_optimize", None)
    if mem_optimize is not None:
        mem_optimize()
    # The job search is only relevant in a galaxy context, and breaks
    # loading tools into the toolshed for validation.
    if self.app.name == "galaxy":
        self.job_search = self.app.job_search
def remove_from_cache(self):
    """Delete this tool's source path from every cache region of the app's toolbox.

    Nothing happens when the tool source has no (truthy) ``source_path``.
    """
    source_path = self.tool_source.source_path
    if not source_path:
        return
    for region in self.app.toolbox.cache_regions.values():
        region.delete(source_path)
@property
def history_manager(self):
    """Return ``self.app.history_manager``."""
    return self.app.history_manager


@property
def _view(self):
    """Return ``self.app.dependency_resolvers_view``."""
    return self.app.dependency_resolvers_view


@property
def version_object(self):
    """Return ``self.version`` parsed with ``parse_version``."""
    return parse_version(self.version)


@property
def sa_session(self):
    """Returns a SQLAlchemy session"""
    return self.app.model.context


@property
def lineage(self):
    """Return ToolLineage for this tool."""
    return self._lineage


@property
def tool_versions(self):
    """Return the lineage's tool versions as a list (empty without a lineage)."""
    # If we have versions, return them.
    if self.lineage:
        return list(self.lineage.tool_versions)
    else:
        return []


@property
def is_latest_version(self):
    """Return True if there are no known versions or this is the last one."""
    # Read the property once: every access builds a new list.
    tool_versions = self.tool_versions
    return not tool_versions or self.version == tool_versions[-1]


@property
def latest_version(self):
    """Return the tool object for the last version in this tool's lineage."""
    if self.is_latest_version:
        return self
    else:
        return self.app.tool_cache.get_tool_by_id(self.lineage.get_versions()[-1].id)


@property
def is_datatype_converter(self):
    """Return whether this tool is among the datatypes registry's converter tools."""
    return self in self.app.datatypes_registry.converter_tools


@property
def tool_shed_repository(self):
    """Return the installed tool shed repository of this tool, or None."""
    # If this tool is included in an installed tool shed repository, return it.
    if self.tool_shed:
        return get_installed_repository(
            self.app,
            tool_shed=self.tool_shed,
            name=self.repository_name,
            owner=self.repository_owner,
            installed_changeset_revision=self.installed_changeset_revision,
            from_cache=True,
        )
    return None


@property
def produces_collections_with_unknown_structure(self):
    """Return True if any output is a collection with a dynamic structure."""
    return any(output.collection and output.dynamic_structure for output in self.outputs.values())


@property
def valid_input_states(self):
    """Return ``model.Dataset.valid_input_states``."""
    return model.Dataset.valid_input_states


@property
def requires_galaxy_python_environment(self):
    """Indicates this tool's runtime requires Galaxy's Python environment."""
    # All special tool types (data source, history import/export, etc...)
    # seem to require Galaxy's Python.
    # FIXME: the (instantiated) tool class should emit this behavior, and not
    #        use inspection by string check
    if self.tool_type not in ["default", "manage_data", "interactive", "data_source", "data_source_async"]:
        return True

    # Older profiles of these tool types still need Galaxy's environment.
    if self.tool_type == "manage_data" and Version(str(self.profile)) < Version("18.09"):
        return True
    if self.tool_type == "data_source" and Version(str(self.profile)) < Version("21.09"):
        return True
    # ``self.profile`` is a float (set in ``parse``), so compare numerically.
    if self.tool_type == "data_source_async" and self.profile < 24.0:
        return True

    config = self.app.config
    preserve_python_environment = config.preserve_python_environment
    if preserve_python_environment == "always":
        return True
    elif preserve_python_environment == "legacy_and_local" and self.tool_shed is None:
        return True
    else:
        unversioned_legacy_tool = self.old_id in GALAXY_LIB_TOOLS_UNVERSIONED
        versioned_legacy_tool = self.old_id in GALAXY_LIB_TOOLS_VERSIONED
        legacy_tool = unversioned_legacy_tool or (
            versioned_legacy_tool and self.old_id and self.version_object < GALAXY_LIB_TOOLS_VERSIONED[self.old_id]
        )
        return legacy_tool


def __get_job_tool_configuration(self, job_params=None):
    """Generalized method for getting this tool's job configuration.

    :type job_params: dict or None
    :returns: `galaxy.jobs.JobToolConfiguration` -- JobToolConfiguration that matches this `Tool` and the given `job_params`
    """
    rval = None
    if len(self.job_tool_configurations) == 1:
        # If there's only one config, use it rather than wasting time on comparisons
        rval = self.job_tool_configurations[0]
    elif job_params is None:
        # Without job params, pick the first config that defines no params.
        for job_tool_config in self.job_tool_configurations:
            if not job_tool_config.params:
                rval = job_tool_config
                break
    else:
        for job_tool_config in self.job_tool_configurations:
            if job_tool_config.params:
                # There are job params and this config has params defined
                params_match = all(
                    param in job_tool_config.params and job_tool_config.params[param] == value
                    for param, value in job_params.items()
                )
                if params_match:
                    # All params match, use this config
                    rval = job_tool_config
                    break
            else:
                # A config without params is kept as a fallback; the search
                # continues in case a later config matches the params.
                rval = job_tool_config
    assert (
        rval is not None
    ), f"Could not get a job tool configuration for Tool {self.id} with job_params {job_params}, this is a bug"
    return rval
def get_configured_job_handler(self, job_params=None):
    """Get the configured job handler for this `Tool` given the provided `job_params`.

    Unlike the former ``get_job_handler()`` method, this does not perform "preassignment" (random selection of
    a configured handler ID from a tag).

    :param job_params: Any params specific to this job (e.g. the job source)
    :type job_params: dict or None

    :returns: str or None -- The configured handler for a job run of this `Tool`
    """
    job_tool_configuration = self.__get_job_tool_configuration(job_params=job_params)
    return job_tool_configuration.handler
def get_job_destination(self, job_params=None):
    """
    Resolve the destination of the job tool configuration matching `job_params`
    through ``self.app.job_config``.

    :returns: galaxy.jobs.JobDestination -- The destination definition and runner parameters.
    """
    job_tool_configuration = self.__get_job_tool_configuration(job_params=job_params)
    destination_id = job_tool_configuration.destination
    return self.app.job_config.get_destination(destination_id)
def get_panel_section(self):
    """Return what the toolbox's ``get_section_for_tool`` reports for this tool."""
    toolbox = self.app.toolbox
    return toolbox.get_section_for_tool(self)
def allow_user_access(self, user, attempting_access=True):
    """
    Deny access only when the tool requires a login and no user is given.

    `attempting_access` is accepted but not used by this implementation.

    :returns: bool -- Whether the user is allowed to access the tool.
    """
    login_missing = self.require_login and user is None
    return not login_missing
def parse(self, tool_source: ToolSource, guid=None, dynamic=False):
    """
    Read tool configuration from `tool_source` and fill in `self`.

    :param tool_source: the tool source whose ``parse_*`` methods are queried.
    :param guid: when not None, used as the tool id instead of the parsed id.
    :param dynamic: when true, a missing id or name is tolerated (the name
        falls back to the id).
    :raises Exception: if a required id, name or version is missing, if the
        tool targets a newer Galaxy than the one running, or if the tool
        action needs a JavaScript runtime that cannot be found.
    """
    self.profile = float(tool_source.parse_profile())
    # Get the UNIQUE id for the tool
    self.old_id = tool_source.parse_id()
    if guid is None:
        self.id = self.old_id
    else:
        self.id = guid

    if not dynamic and not self.id:
        raise Exception(f"Missing tool 'id' for tool at '{tool_source}'")

    profile = Version(str(self.profile))
    if self.app.name == "galaxy" and profile >= Version("16.04") and Version(VERSION_MAJOR) < profile:
        message = f"The tool [{self.id}] targets version {self.profile} of Galaxy, you should upgrade Galaxy to ensure proper functioning of this tool."
        raise Exception(message)

    self.python_template_version = tool_source.parse_python_template_version()
    if self.python_template_version is None:
        # If python_template_version not specified we assume tools with profile versions >= 19.05 are python 3 ready
        if profile >= Version("19.05"):
            self.python_template_version = Version("3.5")
        else:
            self.python_template_version = Version("2.7")

    # Get the (user visible) name of the tool
    self.name = tool_source.parse_name()
    if not self.name and dynamic:
        self.name = self.id
    if not dynamic and not self.name:
        raise Exception(f"Missing tool 'name' for tool with id '{self.id}' at '{tool_source}'")

    self.version = tool_source.parse_version()
    if not self.version:
        if profile < Version("16.04"):
            # For backward compatibility, some tools may not have versions yet.
            self.version = "1.0.0"
        else:
            raise Exception(f"Missing tool 'version' for tool with id '{self.id}' at '{tool_source}'")

    # Legacy feature, ignored by UI.
    self.force_history_refresh = False

    self.display_interface = tool_source.parse_display_interface(default=self.display_interface)

    self.require_login = tool_source.parse_require_login(self.require_login)

    request_param_translation_elem = tool_source.parse_request_param_translation_elem()
    if request_param_translation_elem is not None:
        # Load input translator, used by datasource tools to change names/values of incoming parameters
        self.input_translator = ToolInputTranslator.from_element(request_param_translation_elem)
    else:
        self.input_translator = None

    self.parse_command(tool_source)
    self.environment_variables = self.parse_environment_variables(tool_source)
    self.tmp_directory_vars = tool_source.parse_tmp_directory_vars()

    home_target = tool_source.parse_home_target()
    tmp_target = tool_source.parse_tmp_target()
    # If a tool explicitly sets one of these variables just respect that and turn off
    # explicit processing by Galaxy.
    for environment_variable in self.environment_variables:
        if environment_variable.get("name") == "HOME":
            home_target = None
            continue
        for tmp_directory_var in self.tmp_directory_vars:
            if environment_variable.get("name") == tmp_directory_var:
                tmp_target = None
                break
    self.home_target = home_target
    self.tmp_target = tmp_target

    self.docker_env_pass_through = tool_source.parse_docker_env_pass_through()
    if self.environment_variables:
        if not self.docker_env_pass_through:
            self.docker_env_pass_through = []
        self.docker_env_pass_through.extend(x["name"] for x in self.environment_variables)

    # Parameters used to build URL for redirection to external app
    redirect_url_params = tool_source.parse_redirect_url_params_elem()
    if redirect_url_params is not None and redirect_url_params.text is not None:
        # get rid of leading / trailing white space
        redirect_url_params = redirect_url_params.text.strip()
        # Replace remaining white space with something we can safely split on later
        # when we are building the params
        self.redirect_url_params = redirect_url_params.replace(" ", "**^**")
    else:
        self.redirect_url_params = ""

    # Short description of the tool
    self.description = tool_source.parse_description()

    # Versioning for tools
    self.version_string_cmd = None
    if (version_command := tool_source.parse_version_command()) is not None:
        self.version_string_cmd = version_command.strip()

        version_cmd_interpreter = tool_source.parse_version_command_interpreter()
        if version_cmd_interpreter:
            # Resolve the executable relative to the tool directory and
            # prefix the command with the interpreter.
            executable = self.version_string_cmd.split()[0]
            abs_executable = os.path.abspath(os.path.join(self.tool_dir, executable))
            command_line = self.version_string_cmd.replace(executable, abs_executable, 1)
            self.version_string_cmd = f"{version_cmd_interpreter} {command_line}"

    # Parallelism for tasks, read from tool config.
    self.parallelism = tool_source.parse_parallelism()

    # Get JobToolConfiguration(s) valid for this particular Tool.  At least
    # a 'default' will be provided that uses the 'default' handler and
    # 'default' destination.  I thought about moving this to the
    # job_config, but it makes more sense to store here. -nate
    if self.id:
        self_ids = [self.id.lower()]
        if self.old_id and self.old_id != self.id:
            # Handle toolshed guids
            self_ids = [self.id.lower(), self.id.lower().rsplit("/", 1)[0], self.old_id.lower()]
    else:
        self_ids = []
    self.all_ids = self_ids

    # In the toolshed context, there is no job config.
    if hasattr(self.app, "job_config"):
        # Order of this list must match documentation in job_conf.sample_advanced.yml
        tool_classes = []
        if self.tool_type_local:
            tool_classes.append("local")
        elif self.old_id in ["upload1", "__DATA_FETCH__"]:
            tool_classes.append("local")
        if self.requires_galaxy_python_environment:
            tool_classes.append("requires_galaxy")

        self.job_tool_configurations = self.app.job_config.get_job_tool_configurations(self_ids, tool_classes)

    # Is this a 'hidden' tool (hidden in tool menu)
    self.hidden = tool_source.parse_hidden()
    self.license = tool_source.parse_license()
    self.creator = tool_source.parse_creator()

    self.parse_inputs(self.tool_source)
    self.parse_outputs(self.tool_source)

    self.raw_help = None
    if self.app.is_webapp:
        self.raw_help = self.__get_help_with_images(tool_source.parse_help())

    self.parse_tests()
    self.__parse_legacy_features(tool_source)

    # Load any tool specific options (optional)
    self.options = _Options(
        **dict(
            sanitize=tool_source.parse_sanitize(),
            refresh=tool_source.parse_refresh(),
        )
    )

    # Read in name of galaxy.json metadata file and how to parse it.
    self.provided_metadata_file = tool_source.parse_provided_metadata_file()
    self.provided_metadata_style = tool_source.parse_provided_metadata_style()

    # Parse result handling for tool exit codes and stdout/stderr messages:
    self.parse_stdio(tool_source)

    self.strict_shell = tool_source.parse_strict_shell()

    # Any extra generated config files for the tool
    self.__parse_config_files(tool_source)
    # Action
    action = tool_source.parse_action_module()
    if action is None:
        self.tool_action = self.default_tool_action()
    else:
        module, cls = action
        mod = __import__(module, globals(), locals(), [cls])
        self.tool_action = getattr(mod, cls)()
        if getattr(self.tool_action, "requires_js_runtime", False):
            try:
                expressions.find_engine(self.app.config)
            except Exception:
                # ``%`` binds tighter than ``or``: parenthesize so the
                # fallback id is actually substituted into the message.
                tool_identifier = self.id or getattr(self, "uuid", "unknown tool id")
                message = REQUIRES_JS_RUNTIME_MESSAGE % tool_identifier
                raise Exception(message)
    # Requirements (dependencies)
    requirements, containers, resource_requirements = tool_source.parse_requirements_and_containers()
    self.requirements = requirements
    self.containers = containers
    self.resource_requirements = resource_requirements

    required_files = tool_source.parse_required_files()
    if required_files is None:
        # Fall back to the implicit requirements recorded for this tool id,
        # if any, as long as the tool version is below the recorded bound.
        old_id = self.old_id
        if old_id in IMPLICITLY_REQUIRED_TOOL_FILES:
            lineage_requirement = IMPLICITLY_REQUIRED_TOOL_FILES[old_id]
            lineage_requirement_until = lineage_requirement.get("version")
            if lineage_requirement_until is None or self.version_object < lineage_requirement_until:
                required_files = RequiredFiles.from_dict(lineage_requirement["required"])
    self.required_files = required_files

    self.citations = self._parse_citations(tool_source)
    biotools_metadata_source = getattr(self.app, "biotools_metadata_source", None)
    if biotools_metadata_source:
        ontology_data = expand_ontology_data(
            tool_source,
            self.all_ids,
            biotools_metadata_source,
        )
        self.xrefs = ontology_data.xrefs
        self.edam_operations = ontology_data.edam_operations
        self.edam_topics = ontology_data.edam_topics
    else:
        self.xrefs = []
        self.edam_operations = None
        self.edam_topics = None

    self.__parse_trackster_conf(tool_source)

    # Record macro paths so we can reload a tool if any of its macro has changes
    self._macro_paths = tool_source.macro_paths
    self.ports = tool_source.parse_interactivetool()

    self._is_workflow_compatible = self.check_workflow_compatible(self.tool_source)
def __parse_legacy_features(self, tool_source):
    """Load ``<code>`` hook files and ``<uihints>`` from an XML tool source.

    Always resets ``code_namespace``, ``hook_map`` and ``uihints``; the rest
    is skipped for tool sources without a ``root`` attribute.
    """
    self.code_namespace: Dict[str, str] = {}
    self.hook_map: Dict[str, str] = {}
    self.uihints: Dict[str, str] = {}

    if not hasattr(tool_source, "root"):
        return

    # TODO: Move following logic into XmlToolSource.
    root = tool_source.root
    # Load any tool specific code (optional) Edit: INS 5/29/2007,
    # allow code files to have access to the individual tool's
    # "module" if it has one.  Allows us to reuse code files, etc.
    for code_elem in root.findall("code"):
        for hook_elem in code_elem.findall("hook"):
            # map hook to function
            self.hook_map.update(hook_elem.items())
        code_path = os.path.join(self.tool_dir, code_elem.get("file"))
        if not self._allow_code_files:
            continue
        with open(code_path) as code_file:
            code_string = code_file.read()
        try:
            # NOTE: executes code shipped with the tool inside ``code_namespace``.
            exec(compile(code_string, code_path, "exec"), self.code_namespace)
        except Exception:
            can_translate = (
                refactoring_tool and self.python_template_version and self.python_template_version.release[0] < 3
            )
            if not can_translate:
                raise
            # Could be a code file that uses python 2 syntax
            translated_code = str(refactoring_tool.refactor_string(code_string, name="auto_translated_code_file"))
            exec(compile(translated_code, f"futurized_{code_path}", "exec"), self.code_namespace)

    # User interface hints
    uihints_elem = root.find("uihints")
    if uihints_elem is not None:
        self.uihints.update(uihints_elem.attrib)


def __parse_config_files(self, tool_source):
    """Collect ``(name, filename, content)`` tuples from ``<configfiles>``.

    ``<inputs>`` and ``<file_sources>`` get a dict as content, each
    ``<configfile>`` gets its element text.
    """
    self.config_files = []
    if not hasattr(tool_source, "root"):
        return

    conf_parent_elem = tool_source.root.find("configfiles")
    if conf_parent_elem is None:
        return

    inputs_elem = conf_parent_elem.find("inputs")
    if inputs_elem is not None:
        inputs_content = dict(
            format=inputs_elem.get("format", "json"),
            handle_files=inputs_elem.get("data_style", "skip"),
            type="inputs",
        )
        self.config_files.append((inputs_elem.get("name"), inputs_elem.get("filename", None), inputs_content))

    file_sources_elem = conf_parent_elem.find("file_sources")
    if file_sources_elem is not None:
        self.config_files.append(
            (file_sources_elem.get("name"), file_sources_elem.get("filename", None), dict(type="files"))
        )

    self.config_files.extend(
        (conf_elem.get("name"), conf_elem.get("filename", None), conf_elem.text)
        for conf_elem in conf_parent_elem.findall("configfile")
    )


def __parse_trackster_conf(self, tool_source):
    """Set ``trackster_conf`` from the ``<trackster_conf>`` element, else None."""
    self.trackster_conf = None
    if not hasattr(tool_source, "root"):
        return

    # Trackster configuration.
    trackster_elem = tool_source.root.find("trackster_conf")
    if trackster_elem is not None:
        self.trackster_conf = TracksterConfig.parse(trackster_elem)
def parse_tests(self):
    """Parse the tool's tests and keep them as a JSON string.

    On any parsing failure the stored tests are reset to None and the
    exception is logged. Nothing happens without a (truthy) tool source.
    """
    tests_source = self.tool_source
    if not tests_source:
        return
    try:
        test_dicts = [test.to_dict() for test in parse_tests(self, tests_source)]
        self.__tests = json.dumps(test_dicts, indent=None)
    except Exception:
        self.__tests = None
        log.exception("Failed to parse tool tests for tool '%s'", self.id)
@property
def tests(self):
    """Return the stored tests as ``ToolTestDescription`` objects, or None."""
    if not self.__tests:
        return None
    return [ToolTestDescription(test_dict) for test_dict in json.loads(self.__tests)]


@property
def _repository_dir(self):
    """If tool shed installed tool, the base directory of the repository installed."""
    if not getattr(self, "tool_shed", None):
        return None
    tool_dir = Path(self.tool_dir)
    # Walk from the tool directory upwards, looking for
    # ``<installed_changeset_revision>/<repository_name>``.
    for candidate in (tool_dir, *tool_dir.parents):
        if candidate.name == self.repository_name and candidate.parent.name == self.installed_changeset_revision:
            return str(candidate)
    log.error(f"Problem finding repository dir for tool '{self.id}'")
    return None
def test_data_path(self, filename):
    """Return the path of test data file `filename`, or None if not found.

    The installed repository directory is searched first; without one the
    tool directory is searched (its parent for ``DataManagerTool``
    instances). If nothing is found, the application's test data resolver
    is consulted.
    """
    found = None
    repository_dir = self._repository_dir
    if repository_dir:
        found = self.__walk_test_data(dir=repository_dir, filename=filename)
    elif self.tool_dir:
        search_dir = os.path.dirname(self.tool_dir) if isinstance(self, DataManagerTool) else self.tool_dir
        found = self.__walk_test_data(search_dir, filename=filename)
    if found:
        return found
    # Fallback to Galaxy test data directory for builtin tools, tools
    # under development, and some older ToolShed published tools that
    # used stock test data.
    try:
        return self.app.test_data_resolver.get_filename(filename)
    except TestDataNotFoundError:
        return None
def __walk_test_data(self, dir, filename):
    """Search ``test-data`` directories below `dir` for `filename`.

    Returns the absolute path of the first existing match. Returns None if
    nothing is found, or as soon as `filename` would resolve outside of a
    ``test-data`` directory.
    """
    for current_dir, subdirs, _ in os.walk(dir):
        # Removing the entry in place keeps os.walk out of ``.hg``.
        if ".hg" in subdirs:
            subdirs.remove(".hg")
        if "test-data" not in subdirs:
            continue
        test_data_dir = os.path.join(current_dir, "test-data")
        candidate = os.path.abspath(os.path.join(test_data_dir, filename))
        if not in_directory(candidate, test_data_dir):
            # Don't raise an explicit exception and reveal details about what
            # files are or are not on the path, simply return None and let the
            # API raise a 404.
            return None
        if os.path.exists(candidate):
            return candidate
    return None
def tool_provided_metadata(self, job_wrapper):
    """Parse the tool provided metadata file from the job's tool working directory."""
    working_directory = job_wrapper.tool_working_directory
    meta_file = os.path.join(working_directory, self.provided_metadata_file)
    metadata_style = self.provided_metadata_style
    return parse_tool_provided_metadata(
        meta_file,
        provided_metadata_style=metadata_style,
        job_wrapper=job_wrapper,
    )
def parse_command(self, tool_source):
    """Set ``self.command`` and ``self.interpreter`` from `tool_source`.

    The command is optional for tools that do not invoke a local program;
    in that case it is the empty string and the interpreter is None.
    """
    # Command line (template).
    command = tool_source.parse_command()
    if command is None:
        self.command = ""
        self.interpreter = None
        return
    # get rid of leading whitespace
    self.command = command.lstrip()
    # Must pre-pend this AFTER processing the cheetah command template
    self.interpreter = tool_source.parse_interpreter()
def parse_environment_variables(self, tool_source):
    """Return the environment variables reported by `tool_source`."""
    environment_variables = tool_source.parse_environment_variables()
    return environment_variables
def parse_inputs(self, tool_source: ToolSource):
    """
    Parse the "<inputs>" element and create appropriate `ToolParameter` s.
    This implementation supports multiple pages and grouping constructs.

    Besides ``self.inputs`` this also sets the form properties
    (``check_values``, ``nginx_upload``, ``action``, ``target``, ``method``),
    the per-page lists, ``display``, ``npages``, ``last_page``,
    ``has_multiple_pages``, ``enctype``, ``template_macro_params`` and
    ``input_required``.

    :raises Exception: if a non-default action with URL parameters is
        combined with nginx upload, or if the parameters require
        conflicting enctypes.
    """
    # Load parameters (optional)
    self.inputs: Dict[str, Union[Group, ToolParameter]] = {}
    pages = tool_source.parse_input_pages()
    # Filled in by parse_input_elem / parse_param_elem while parsing.
    enctypes: Set[str] = set()
    if pages.inputs_defined:
        if hasattr(pages, "input_elem"):
            input_elem = pages.input_elem
            # Handle properties of the input form
            self.check_values = string_as_bool(input_elem.get("check_values", self.check_values))
            self.nginx_upload = string_as_bool(input_elem.get("nginx_upload", self.nginx_upload))
            self.action = input_elem.get("action", self.action)
            # If we have an nginx upload, save the action as a tuple instead of
            # a string. The actual action needs to get url_for run to add any
            # prefixes, and we want to avoid adding the prefix to the
            # nginx_upload_path.
            if self.nginx_upload and self.app.config.nginx_upload_path and not isinstance(self.action, tuple):
                if "?" in unquote_plus(self.action):
                    raise Exception(
                        "URL parameters in a non-default tool action can not be used "
                        "in conjunction with nginx upload. Please convert them to "
                        "hidden POST parameters"
                    )
                self.action = (
                    f"{self.app.config.nginx_upload_path}?nginx_redir=",
                    unquote_plus(self.action),
                )
            self.target = input_elem.get("target", self.target)
            self.method = input_elem.get("method", self.method)
        # Parse the actual parameters
        # Handle multiple page case
        for page_source in pages.page_sources:
            inputs = self.parse_input_elem(page_source, enctypes)
            display = page_source.parse_display()
            self.inputs_by_page.append(inputs)
            self.inputs.update(inputs)
            self.display_by_page.append(display)
    else:
        # No inputs defined: a single page sharing the (empty) inputs dict.
        self.inputs_by_page.append(self.inputs)
        self.display_by_page.append(None)
    self.display = self.display_by_page[0]
    self.npages = len(self.inputs_by_page)
    self.last_page = len(self.inputs_by_page) - 1
    # True as soon as there is more than one page.
    self.has_multiple_pages = bool(self.last_page)
    # Determine the needed enctype for the form
    if len(enctypes) == 0:
        self.enctype = "application/x-www-form-urlencoded"
    elif len(enctypes) == 1:
        self.enctype = enctypes.pop()
    else:
        raise Exception(f"Conflicting required enctypes: {str(enctypes)}")
    # Template macro parameters are only read from XML tool sources.
    template_macros = {}
    if isinstance(tool_source, XmlToolSource):
        template_macros = template_macro_params(tool_source.root)
    self.template_macro_params = template_macros
    # Check if the tool either has no parameters or only hidden (and
    # thus hardcoded) parameters. FIXME: hidden parameters aren't
    # parameters at all really, and should be passed in a different
    # way, making this check easier.
    for param in self.inputs.values():
        if not isinstance(param, (HiddenToolParameter, BaseURLToolParameter)):
            self.input_required = True
            break
def parse_outputs(self, tool_source):
    """
    Parse <outputs> elements and fill in self.outputs (keyed by name)
    as well as self.output_collections.
    """
    parsed_outputs = tool_source.parse_outputs(self)
    outputs, output_collections = parsed_outputs
    self.outputs = outputs
    self.output_collections = output_collections
# TODO: Include the tool's name in any parsing warnings.
def parse_stdio(self, tool_source: ToolSource):
    """
    Parse <stdio> element(s) and fill in self.stdio_exit_codes and
    self.stdio_regexes.
    Return codes have a range and an error type (fault or warning).
    Stderr and stdout rules have a regular expression and an error level
    (fault or warning).
    """
    parsed_stdio = tool_source.parse_stdio()
    self.stdio_exit_codes, self.stdio_regexes = parsed_stdio
def _parse_citations(self, tool_source):
    """Parse the ``<citation>`` children of the tool's ``<citations>`` element.

    :param tool_source: tool source; only sources exposing a ``root``
        element are inspected.
    :returns: list of the truthy results of the application's
        ``citations_manager.parse_citation``; empty when there is no
        ``root``, no ``<citations>`` element, or no citations manager.
    """
    # TODO: Move following logic into ToolSource abstraction.
    if not hasattr(tool_source, "root"):
        return []

    citations: List[str] = []
    citations_elem = tool_source.root.find("citations")
    if citations_elem is None:
        return citations

    # The manager does not change between elements, look it up once.
    citations_manager = getattr(self.app, "citations_manager", None)
    if citations_manager is None:
        return citations

    for citation_elem in citations_elem:
        # Skip anything that is not a <citation> element.
        if citation_elem.tag != "citation":
            continue
        citation = citations_manager.parse_citation(citation_elem)
        if citation:
            citations.append(citation)
    return citations
def parse_input_elem(
    self, page_source: PageSource, enctypes, context=None
) -> Dict[str, Union[Group, ToolParameter]]:
    """
    Parse a parent element whose children are inputs -- these could be
    groups (repeat, conditional) or param elements. Groups will be parsed
    recursively.

    :param page_source: source yielding the input sources to parse.
    :param enctypes: set collecting the enctypes required by parameters;
        passed through to nested calls and ``parse_param_elem``.
    :param context: enclosing ``ExpressionContext`` (or None); a new context
        wrapping the returned dict is chained onto it.
    :returns: dict mapping input names to the parsed groups/parameters.
    """
    rval: Dict[str, Union[Group, ToolParameter]] = {}
    context = ExpressionContext(rval, context)
    for input_source in page_source.parse_input_sources():
        # Dispatch on the kind of input: repeat, conditional, section,
        # upload_dataset or param.
        input_type = input_source.parse_input_type()
        if input_type == "repeat":
            # Repeat group
            group_r = Repeat()
            group_r.name = input_source.get("name")
            group_r.title = input_source.get("title")
            group_r.help = input_source.get("help", None)
            # NOTE: rebinds the `page_source` argument; the iterable of the
            # enclosing loop was already obtained from the original value.
            page_source = input_source.parse_nested_inputs_source()
            group_r.inputs = self.parse_input_elem(page_source, enctypes, context)
            group_r.default = int(input_source.get("default", 0))
            group_r.min = int(input_source.get("min", 0))
            # Use float instead of int so that math.inf can be used for no max
            group_r.max = float(input_source.get("max", math.inf))
            # NOTE(review): this raises AssertionError (carrying a ValueError
            # instance as its message), not ValueError.
            assert group_r.min <= group_r.max, ValueError(
                f"Tool with id '{self.id}': min repeat count must be less-than-or-equal to the max."
            )
            # Force default to be within min-max range
            group_r.default = cast(int, min(max(group_r.default, group_r.min), group_r.max))
            rval[group_r.name] = group_r
        elif input_type == "conditional":
            group_c = Conditional()
            group_c.name = input_source.get("name")
            group_c.value_ref = input_source.get("value_ref", None)
            group_c.value_ref_in_group = input_source.get_bool("value_ref_in_group", True)
            value_from = input_source.get("value_from", None)
            if value_from:
                # `value_from` has the form "<local name>:<dotted attribute path>";
                # the first part is looked up among this function's local
                # variables, so local names here must not be changed lightly.
                value_from = value_from.split(":")
                temp_value_from = locals().get(value_from[0])
                # The test parameter must already have been parsed into rval.
                group_c.test_param = rval[group_c.value_ref]
                assert isinstance(group_c.test_param, ToolParameter)
                group_c.test_param.refresh_on_change = True
                # Resolve the dotted attribute path to a callable.
                for attr in value_from[1].split("."):
                    temp_value_from = getattr(temp_value_from, attr)
                assert callable(temp_value_from)
                group_c.value_from = temp_value_from
                # The callable returns a mapping of case value -> inputs XML text.
                for case_value, case_inputs in group_c.value_from(context, group_c, self).items():
                    case = ConditionalWhen()
                    case.value = case_value
                    if case_inputs:
                        page_source = XmlPageSource(XML(f"<when>{case_inputs}</when>"))
                        case.inputs = self.parse_input_elem(page_source, enctypes, context)
                    else:
                        case.inputs = {}
                    group_c.cases.append(case)
            else:
                # Should have one child "input" which determines the case
                test_param_input_source = input_source.parse_test_input_source()
                group_c.test_param = self.parse_param_elem(test_param_input_source, enctypes, context)
                assert isinstance(group_c.test_param, (BooleanToolParameter, SelectToolParameter))
                if group_c.test_param.optional:
                    log.debug(
                        f"Tool with id '{self.id}': declares a conditional test parameter as optional, this is invalid and will be ignored."
                    )
                    group_c.test_param.optional = False
                # store possible cases, undefined whens will have no inputs
                possible_cases = list(
                    group_c.test_param.legal_values
                )
                # Must refresh when test_param changes
                group_c.test_param.refresh_on_change = True
                # And a set of possible cases
                for value, case_inputs_source in input_source.parse_when_input_sources():
                    case = ConditionalWhen()
                    case.value = value
                    case.inputs = self.parse_input_elem(case_inputs_source, enctypes, context)
                    group_c.cases.append(case)
                    try:
                        possible_cases.remove(case.value)
                    except Exception:
                        # The when value is not among the legal values.
                        log.debug(
                            "Tool with id '%s': a when tag has been defined for '%s (%s) --> %s', but does not appear to be selectable.",
                            self.id,
                            group_c.name,
                            group_c.test_param.name,
                            case.value,
                        )
                # Legal values without a matching <when> get an empty case.
                for unspecified_case in possible_cases:
                    log.warning(
                        "Tool with id '%s': a when tag has not been defined for '%s (%s) --> %s', assuming empty inputs.",
                        self.id,
                        group_c.name,
                        group_c.test_param.name,
                        unspecified_case,
                    )
                    case = ConditionalWhen()
                    case.value = unspecified_case
                    case.inputs = {}
                    group_c.cases.append(case)
            rval[group_c.name] = group_c
        elif input_type == "section":
            group_s = Section()
            group_s.name = input_source.get("name")
            group_s.title = input_source.get("title")
            group_s.help = input_source.get("help", None)
            group_s.expanded = input_source.get_bool("expanded", False)
            page_source = input_source.parse_nested_inputs_source()
            group_s.inputs = self.parse_input_elem(page_source, enctypes, context)
            rval[group_s.name] = group_s
        elif input_type == "upload_dataset":
            elem = input_source.elem()
            group_u = UploadDataset()
            group_u.name = elem.get("name")
            group_u.title = elem.get("title")
            group_u.file_type_name = elem.get("file_type_name", group_u.file_type_name)
            group_u.default_file_type = elem.get("default_file_type", group_u.default_file_type)
            group_u.metadata_ref = elem.get("metadata_ref", group_u.metadata_ref)
            # If the file type parameter was already parsed, changing it must
            # refresh the form.
            file_type_param = rval.get(group_u.file_type_name)
            if file_type_param:
                assert isinstance(file_type_param, ToolParameter)
                file_type_param.refresh_on_change = True
            group_page_source = XmlPageSource(elem)
            group_u.inputs = self.parse_input_elem(group_page_source, enctypes, context)
            rval[group_u.name] = group_u
        elif input_type == "param":
            param = self.parse_param_elem(input_source, enctypes, context)
            rval[param.name] = param
            # These parameter types reference another input through
            # `data_ref`; resolve it in the current context.
            if isinstance(param, (SelectTagParameter, ColumnListParameter)):
                param.ref_input = context[param.data_ref]
            self.input_params.append(param)
    return rval
def parse_param_elem(self, input_source: InputSource, enctypes, context) -> ToolParameter:
    """
    Parse a single "<param>" element and return a ToolParameter instance.
    Also, if the parameter has a 'required_enctype' add it to the set
    enctypes.
    """
    parameter = ToolParameter.build(self, input_source)
    required_enctype = parameter.get_required_enctype()
    if required_enctype:
        enctypes.add(required_enctype)
    # If parameter depends on any other parameters, we must refresh the
    # form when it changes
    for dependency in parameter.get_dependencies():
        # Let it throw exception, but give some hint what the problem might be
        if dependency not in context:
            log.error(
                f"Tool with id '{self.id}': Could not find dependency '{dependency}' of parameter '{parameter.name}'"
            )
        dependency_param = context[dependency]
        dependency_param.refresh_on_change = True
    return parameter
def populate_resource_parameters(self, tool_source):
    """Append the job config's resource parameter XML to the tool's ``<inputs>``.

    Only applies to tool sources with a ``root`` element and applications
    whose ``job_config`` offers ``get_tool_resource_xml``. A missing
    ``<inputs>`` element is created first.
    """
    root = getattr(tool_source, "root", None)
    if root is None:
        return
    if not hasattr(self.app, "job_config"):
        return
    if not hasattr(self.app.job_config, "get_tool_resource_xml"):
        return

    tool_id = root.get("id", "").lower()
    resource_xml = self.app.job_config.get_tool_resource_xml(tool_id, self.tool_type)
    if resource_xml is None:
        return

    inputs = root.find("inputs")
    if inputs is None:
        inputs = parse_xml_string("<inputs/>")
        root.append(inputs)
    inputs.append(resource_xml)
def populate_tool_shed_info(self, tool_shed_repository):
    """Copy the tool shed attributes of `tool_shed_repository` onto the tool.

    Does nothing when `tool_shed_repository` is falsy.
    """
    if not tool_shed_repository:
        return
    repository = tool_shed_repository
    self.tool_shed = repository.tool_shed
    assert self.tool_shed
    self.repository_name = repository.name
    self.repository_owner = repository.owner
    self.changeset_revision = repository.changeset_revision
    self.installed_changeset_revision = repository.installed_changeset_revision
    self.sharable_url = get_tool_shed_repository_url(
        self.app,
        self.tool_shed,
        self.repository_owner,
        self.repository_name,
    )
@property
def help(self) -> Template:
    """Return the tool help rendered from reStructuredText as a Mako template.

    If rendering fails the problem is logged and an empty template is returned.
    """
    try:
        help_html = rst_to_html(self.raw_help)
        return Template(
            help_html,
            input_encoding="utf-8",
            default_filters=["decode.utf8"],
            encoding_errors="replace",
        )
    except Exception:
        log.info("Exception while parsing help for tool with id '%s'", self.id)
        return Template("", input_encoding="utf-8")


@property
def biotools_reference(self) -> Optional[str]:
    """Return a bio.tools ID if external reference to it is found.

    If multiple bio.tools references are found, return just the first one.
    """
    xrefs = self.xrefs
    return biotools_reference(xrefs)


def __get_help_with_images(self, raw_help: Optional[str]):
    """Return `raw_help` with image paths adjusted for tool shed tools.

    The text is returned unchanged (``""`` for None) if it contains no image
    directive, if the tool has neither a tool shed repository nor a
    repository id, or if adjusting the paths fails.
    """
    help_text = raw_help or ""
    try:
        has_images = help_text.find(".. image:: ") >= 0
        if has_images and (self.tool_shed_repository or self.repository_id):
            return set_image_paths(
                self.app,
                help_text,
                encoded_repository_id=self.repository_id,
                tool_shed_repository=self.tool_shed_repository,
                tool_id=self.old_id,
                tool_version=self.version,
            )
    except Exception:
        log.exception(
            "Exception in parse_help, so images may not be properly displayed for tool with id '%s'", self.id
        )
    return help_text
def find_output_def(self, name):
    """Return the output definition for `name`, or None if there is none.

    `name` is a JobToOutputDatasetAssociation name; names of collection
    parts are resolved through the matching output collection.
    """
    # TODO: too defensive, just throw IndexError and catch somewhere
    # up that stack.
    if not ToolOutputCollectionPart.is_named_collection_part_name(name):
        return self.outputs.get(name, None)
    collection_name, part = ToolOutputCollectionPart.split_output_name(name)
    collection_def = self.output_collections.get(collection_name, None)
    if not collection_def:
        return None
    return collection_def.outputs.get(part, None)
@property
def is_workflow_compatible(self):
    """Read-only view of the stored `_is_workflow_compatible` flag."""
    compatible = self._is_workflow_compatible
    return compatible
def check_workflow_compatible(self, tool_source):
    """
    Determine if a tool can be used in workflows. External tools and the
    upload tool are currently not supported by workflows.
    """
    # Multiple page tools are not supported -- we're eliminating most
    # of these anyway. A `data_source*` tool type is probably the best bet
    # for detecting external web tools right now.
    if self.has_multiple_pages or self.tool_type.startswith("data_source"):
        return False
    if hasattr(tool_source, "root"):
        # The tool config may opt out explicitly via `workflow_compatible`.
        declared = tool_source.root.get("workflow_compatible", "True")
        if not string_as_bool(declared):
            return False
    # TODO: Anyway to capture tools that dynamically change their own
    #       outputs?
    return True
def new_state(self, trans):
    """
    Create a new `DefaultToolState` for this tool. It will be initialized
    with default values for inputs. Grouping elements are filled in
    recursively.
    """
    fresh_state = DefaultToolState()
    fresh_state.initialize(trans, self)
    return fresh_state
def get_param(self, key):
    """
    Returns the parameter named `key` or None if there is no such
    parameter.
    """
    tool_inputs = self.inputs
    return tool_inputs.get(key)
def get_hook(self, name):
    """
    Returns an object from the code file referenced by `code_namespace`
    (this will normally be a callable object), or None when there is no
    code namespace or it does not define the hook.
    """
    namespace = self.code_namespace
    if not namespace:
        return None
    # Try to look up hook in self.hook_map, otherwise resort to default
    if name in self.hook_map:
        mapped_name = self.hook_map[name]
        if mapped_name in namespace:
            return namespace[mapped_name]
    if name in namespace:
        return namespace[name]
    return None
def visit_inputs(self, values, callback):
    """
    Call the function `callback` on each parameter of this tool. Visits
    grouping parameters recursively and constructs unique prefixes for
    each nested set of parameters. The callback method is then called as:

    `callback( level_prefix, parameter, parameter_value )`

    Nothing is visited when `self.check_values` is false.
    """
    # HACK: Yet another hack around check_values -- WHY HERE?
    if not self.check_values:
        return
    visit_input_values(self.inputs, values, callback)
def expand_incoming(self, trans, incoming, request_context, input_format="legacy"):
    """
    Expand one request's `incoming` parameters into per-job parameter sets.

    A fixed set of input parameters may correspond to any number of jobs;
    each expanded set is validated and populated into tool state.

    Returns a 4-tuple `(all_params, all_errors, rerun_remap_job_id,
    collection_info)` where `all_params` and `all_errors` hold one entry
    per expanded job.

    Raises `MessageException` when `rerun_remap_job_id` cannot be decoded
    and `RequestParameterInvalidException` when several jobs would be
    created while remapping a job or while using an input translator.
    """
    rerun_remap_job_id = None
    if "rerun_remap_job_id" in incoming:
        try:
            rerun_remap_job_id = trans.app.security.decode_id(incoming["rerun_remap_job_id"])
        except Exception as exception:
            log.error(str(exception))
            # Interpolate the tool id into the message here: passing it as
            # a second positional argument would not format it into the text.
            raise exceptions.MessageException(
                f"Failure executing tool with id '{self.id}' (attempting to rerun invalid job)."
            ) from exception

    set_dataset_matcher_factory(request_context, self)

    # Fixed set of input parameters may correspond to any number of jobs.
    # Expand these out to individual parameters for given jobs (tool executions).
    expanded_incomings, collection_info = expand_meta_parameters(trans, self, incoming)

    # Remapping a single job to many jobs doesn't make sense, so disable
    # remap if multi-runs of tools are being used.
    produces_multiple_jobs = len(expanded_incomings) > 1
    if rerun_remap_job_id and produces_multiple_jobs:
        raise exceptions.RequestParameterInvalidException(
            f"Failure executing tool with id '{self.id}' (cannot create multiple jobs when remapping existing job)."
        )

    if self.input_translator and produces_multiple_jobs:
        raise exceptions.RequestParameterInvalidException(
            f"Failure executing tool with id '{self.id}' (cannot create multiple jobs with this type of data source tool)."
        )

    # Process incoming data
    validation_timer = self.app.execution_timer_factory.get_timer(
        "internals.galaxy.tools.validation",
        "Validated and populated state for tool request",
    )
    all_errors = []
    all_params = []

    for expanded_incoming in expanded_incomings:
        params = {}
        errors: Dict[str, str] = {}
        if self.input_translator:
            self.input_translator.translate(expanded_incoming)
        if not self.check_values:
            # If `self.check_values` is false we don't do any checking or
            # processing on input. This is used to pass raw values
            # through to/from external sites.
            params = expanded_incoming
        else:
            # Update state for all inputs on the current page taking new
            # values from `incoming`.
            populate_state(
                request_context,
                self.inputs,
                expanded_incoming,
                params,
                errors,
                simple_errors=False,
                input_format=input_format,
            )
            # If the tool provides a `validate_input` hook, call it.
            validate_input = self.get_hook("validate_input")
            if validate_input:
                # hooks are so terrible ... this is specifically for https://github.com/galaxyproject/tools-devteam/blob/main/tool_collections/gops/basecoverage/operation_filter.py
                legacy_non_dce_params = {
                    k: v.hda if isinstance(v, model.DatasetCollectionElement) and v.hda else v
                    for k, v in params.items()
                }
                validate_input(request_context, errors, legacy_non_dce_params, self.inputs)
        all_errors.append(errors)
        all_params.append(params)
    unset_dataset_matcher_factory(request_context)

    log.info(validation_timer)
    return all_params, all_errors, rerun_remap_job_id, collection_info
def handle_input(
    self,
    trans,
    incoming,
    history=None,
    use_cached_job=False,
    preferred_object_store_id: Optional[str] = None,
    input_format="legacy",
):
    """
    Process incoming parameters for this tool from the dict `incoming`,
    update the tool state (or create if none existed), and either return
    to the form or execute the tool (only if 'execute' was clicked and
    there were no errors).

    Returns a dict with the keys `out_data`, `num_jobs`, `job_errors`,
    `jobs`, `output_collections` and `implicit_collections`.
    """
    request_context = proxy_work_context_for_history(trans, history=history)
    expanded = self.expand_incoming(
        trans=trans, incoming=incoming, request_context=request_context, input_format=input_format
    )
    all_params, all_errors, rerun_remap_job_id, collection_info = expanded
    # If there were errors, we stay on the same page and display them
    self.handle_incoming_errors(all_errors)

    mapping_params = MappingParameters(incoming, all_params)
    completed_jobs: Dict[int, Optional[model.Job]] = {}
    for index, param in enumerate(all_params):
        cached_job = None
        if use_cached_job:
            # Look for an existing job that was run with the same inputs.
            cached_job = self.job_search.by_tool_input(
                trans=trans,
                tool_id=self.id,
                tool_version=self.version,
                param=param,
                param_dump=self.params_to_strings(param, self.app, nested=True),
                job_state=None,
            )
        completed_jobs[index] = cached_job
    execution_tracker = execute_job(
        trans,
        self,
        mapping_params,
        history=request_context.history,
        rerun_remap_job_id=rerun_remap_job_id,
        preferred_object_store_id=preferred_object_store_id,
        collection_info=collection_info,
        completed_jobs=completed_jobs,
    )
    # Raise an exception if there were jobs to execute and none of them were submitted,
    # if at least one is submitted or there are no jobs to execute - return aggregate
    # information including per-job errors. Arguably we should just always return the
    # aggregate information - we just haven't done that historically.
    if not execution_tracker.successful_jobs and len(all_params) > 0:
        raise exceptions.MessageException(execution_tracker.execution_errors[0])

    return {
        "out_data": execution_tracker.output_datasets,
        "num_jobs": len(execution_tracker.successful_jobs),
        "job_errors": execution_tracker.execution_errors,
        "jobs": execution_tracker.successful_jobs,
        "output_collections": execution_tracker.output_collections,
        "implicit_collections": execution_tracker.implicit_collections,
    }
def handle_incoming_errors(self, all_errors):
    """
    Raise `RequestParameterInvalidException` if any error dict in
    `all_errors` is non-empty; otherwise do nothing.

    The exception carries `err_data` (parameter key -> message string)
    and `param_errors` (parameter key -> dict form of the error).
    """
    if not any(all_errors):
        return
    # simple param_key -> message string for tool form.
    err_data = {}
    for error_map in all_errors:
        for key, value in error_map.items():
            err_data[key] = unicodify(value)
    param_errors = {}
    for error_map in all_errors:
        for key, value in error_map.items():
            # Rich error objects serialize themselves, plain ones are wrapped.
            param_errors[key] = value.to_dict() if hasattr(value, "to_dict") else {"message": unicodify(value)}
    summary = ", ".join(err_data.values())
    raise exceptions.RequestParameterInvalidException(summary, err_data=err_data, param_errors=param_errors)
def handle_single_execution(
    self,
    trans,
    rerun_remap_job_id,
    execution_slice,
    history,
    execution_cache=None,
    completed_job=None,
    collection_info=None,
    job_callback=None,
    preferred_object_store_id=None,
    flush_job=True,
    skip=False,
):
    """
    Execute the tool for a single `execution_slice`.

    Returns `(job, [(output_name, output), ...])` on success, and
    `(False, error)` otherwise, where `error` is a message string or the
    `ToolInputsNotReadyException` that was raised. Redirects and
    `MessageException` are passed up the stack.
    """
    try:
        result = self.execute(
            trans,
            incoming=execution_slice.param_combination,
            history=history,
            rerun_remap_job_id=rerun_remap_job_id,
            execution_cache=execution_cache,
            dataset_collection_elements=execution_slice.dataset_collection_elements,
            completed_job=completed_job,
            collection_info=collection_info,
            job_callback=job_callback,
            preferred_object_store_id=preferred_object_store_id,
            flush_job=flush_job,
            skip=skip,
        )
        job = result[0]
        out_data = result[1]
        if len(result) > 2:
            # A third element replaces the history the slice is bound to.
            execution_slice.history = result[2]
    except (webob.exc.HTTPFound, exceptions.MessageException) as e:
        # if it's a webob redirect exception, pass it up the stack
        raise e
    except ToolInputsNotReadyException as e:
        return False, e
    except Exception as e:
        log.exception("Exception caught while attempting to execute tool with id '%s':", self.id)
        return False, f"Error executing tool with id '{self.id}': {unicodify(e)}"

    if isinstance(out_data, dict):
        return job, list(out_data.items())
    if isinstance(out_data, str):
        # A bare string is the tool's own error message.
        return False, out_data
    return False, f"Failure executing tool with id '{self.id}' (invalid data returned from tool execution)"
@property
def params_with_missing_data_table_entry(self):
    """
    Return all parameters that are dynamically generated select lists whose
    options require an entry not currently in the tool_data_table_conf.xml file.
    """
    flagged = []
    for candidate in self.input_params:
        if not (isinstance(candidate, SelectToolParameter) and candidate.is_dynamic):
            continue
        options = candidate.options
        if options and options.missing_tool_data_table_name and candidate not in flagged:
            flagged.append(candidate)
    return flagged


@property
def params_with_missing_index_file(self):
    """
    Return all parameters that are dynamically generated
    select lists whose options refer to a missing .loc file.
    """
    flagged = []
    for candidate in self.input_params:
        if not (isinstance(candidate, SelectToolParameter) and candidate.is_dynamic):
            continue
        options = candidate.options
        table_lacks_index = options and options.tool_data_table and options.tool_data_table.missing_index_file
        if table_lacks_index and candidate not in flagged:
            flagged.append(candidate)
    return flagged
def get_static_param_values(self, trans):
    """
    Returns a map of parameter names and values built without any user
    input: hidden parameters contribute their value with user properties
    expanded, every other parameter contributes its initial value.

    NOTE(review): the original docstring says this raises if a parameter
    requires input; nothing in this method raises explicitly, so any such
    error would have to come from `get_initial_value` -- confirm.
    """
    args = {}
    for key, param in self.inputs.items():
        # BaseURLToolParameter is now a subclass of HiddenToolParameter, so
        # it must be excluded explicitly from the hidden-parameter handling.
        is_plain_hidden = isinstance(param, HiddenToolParameter) and not isinstance(param, BaseURLToolParameter)
        if is_plain_hidden:
            args[key] = model.User.expand_user_properties(trans.user, param.value)
        else:
            args[key] = param.get_initial_value(trans, None)
    return args
def execute(self, trans, incoming=None, set_output_hid=True, history=None, **kwargs):
    """
    Execute the tool using parameter values in `incoming`. This just
    dispatches to the `ToolAction` instance specified by
    `self.tool_action`. In general this will create a `Job` that
    when run will build the tool's outputs, e.g. `DefaultToolAction`.

    On `ToolExecutionError` the associated job (if any) is marked as
    failed, the failure is logged, and the error is re-raised.
    """
    incoming = {} if incoming is None else incoming
    try:
        return self.tool_action.execute(
            self, trans, incoming=incoming, set_output_hid=set_output_hid, history=history, **kwargs
        )
    except exceptions.ToolExecutionError as exc:
        failed_job = exc.job
        if failed_job is None:
            job_id = UNKNOWN
        else:
            failed_job.mark_failed(info=exc.err_msg, blurb=exc.err_code.default_error_message)
            job_id = failed_job.id
        log.error("Tool execution failed for job: %s", job_id)
        raise
def params_to_strings(self, params, app, nested=False):
    """
    Delegate to the module-level `params_to_strings`, supplying this
    tool's `inputs` as the parameter definitions.
    """
    tool_inputs = self.inputs
    return params_to_strings(tool_inputs, params, app, nested)
def params_from_strings(self, params, app, ignore_errors=False):
    """
    Delegate to the module-level `params_from_strings`, supplying this
    tool's `inputs` as the parameter definitions.
    """
    tool_inputs = self.inputs
    return params_from_strings(tool_inputs, params, app, ignore_errors)
def check_and_update_param_values(self, values, trans, update_values=True, workflow_building_mode=False):
    """
    Check that all parameters have values, and fill in with default
    values where necessary. This could be called after loading values
    from a database in case new parameters have been added.

    Returns a dict mapping prefixed parameter names to messages.
    """
    messages = {}
    request_context = proxy_work_context_for_history(trans, workflow_building_mode=workflow_building_mode)

    def validate_inputs(input, value, error, parent, context, prefixed_name, prefixed_label, **kwargs):
        if not error:
            value, error = check_param(request_context, input, value, context)
        if not error:
            return
        if not update_values or hasattr(input, "data_ref"):
            # Report the problem but keep the stored value.
            messages[prefixed_name] = error
            return
        try:
            previous_value = value
            value = input.get_initial_value(request_context, context)
            if not prefixed_name.startswith("__"):
                if previous_value == value:
                    messages[prefixed_name] = error
                else:
                    messages[prefixed_name] = f"{error} Using default: '{value}'."
            parent[input.name] = value
        except Exception:
            messages[prefixed_name] = f"Attempt to replace invalid value for '{prefixed_label}' failed."

    visit_input_values(self.inputs, values, validate_inputs)
    return messages
def build_dependency_cache(self, **kwds):
    """
    Build the dependency cache for this tool's requirements.

    Does nothing unless the toolbox dependency manager is a
    `CachedDependencyManager`; `kwds` are forwarded to `build_cache`.
    """
    manager = self.app.toolbox.dependency_manager
    if not isinstance(manager, CachedDependencyManager):
        return
    manager.build_cache(
        requirements=self.requirements,
        installed_tool_dependencies=self.installed_tool_dependencies,
        tool_dir=self.tool_dir,
        job_directory=None,
        metadata=False,
        tool_instance=self,
        **kwds,
    )
def build_dependency_shell_commands(self, job_directory=None, metadata=False):
    """
    Return a list of commands to be run to populate the current environment to include this tools requirements.
    """
    manager = self.app.toolbox.dependency_manager
    resolution_context = {
        "requirements": self.requirements,
        "installed_tool_dependencies": self.installed_tool_dependencies,
        "tool_dir": self.tool_dir,
        "job_directory": job_directory,
        "preserve_python_environment": self.requires_galaxy_python_environment,
        "metadata": metadata,
        "tool_instance": self,
    }
    return manager.dependency_shell_commands(**resolution_context)
@property
def installed_tool_dependencies(self):
    """
    Tool dependencies of the tool shed repository that are installed or
    in error, or None when the tool has no tool shed repository.
    """
    if not self.tool_shed_repository:
        return None
    return self.tool_shed_repository.tool_dependencies_installed_or_in_error


@property
def tool_requirements(self):
    """
    Return all requirements of type package
    """
    requirements = self.requirements
    return requirements.packages


@property
def tool_requirements_status(self):
    """
    Return a list of dictionaries for all tool dependencies with their associated status
    """
    requirements_by_tool = {self.id: self.tool_requirements}
    return self._view.get_requirements_status(requirements_by_tool, self.installed_tool_dependencies)


@property
def output_discover_patterns(self):
    """Flat list of the discover patterns of every output of this tool."""
    # patterns to collect for remote job execution
    return [pattern for output in self.outputs.values() for pattern in output.output_discover_patterns]
def build_redirect_url_params(self, param_dict):
    """
    Substitute parameter values into self.redirect_url_params.

    Returns None when the tool defines no `redirect_url_params`.
    """
    if not self.redirect_url_params:
        return
    # Substituting parameter values into the url params
    filled = fill_template(self.redirect_url_params, context=param_dict)
    # Remove newlines
    return filled.replace("\n", " ").replace("\r", " ")
def parse_redirect_url(self, data, param_dict):
    """
    Parse the REDIRECT_URL tool param. Tools that send data to an external
    application via a redirect must include the following 3 tool params:

    1) REDIRECT_URL - the url to which the data is being sent

    2) DATA_URL - the url to which the receiving application will send an
       http post to retrieve the Galaxy data

    3) GALAXY_URL - the url to which the external application may post
       data as a response

    Returns the redirect url with `DATA_URL`, the tool's redirect url
    params and `USERNAME` appended as query parameters. Raises
    `AssertionError` when `DATA_URL` is missing from `param_dict`.
    """
    redirect_url = param_dict.get("REDIRECT_URL")
    redirect_url_params = self.build_redirect_url_params(param_dict)
    # Add the parameters to the redirect url.  We're splitting the param
    # string on '**^**' because the self.parse() method replaced white
    # space with that separator.
    rup_dict = {}
    # build_redirect_url_params() returns None when the tool defines no
    # redirect params; treat that as "no extra parameters".
    for param in (redirect_url_params or "").split("**^**"):
        if not param:
            continue
        # Split on the first '=' only so that values which themselves
        # contain '=' are kept intact; a bare name maps to an empty value.
        p_name, _, p_val = param.partition("=")
        rup_dict[p_name] = p_val
    DATA_URL = param_dict.get("DATA_URL", None)
    assert DATA_URL is not None, "DATA_URL parameter missing in tool config."
    DATA_URL += f"/{str(data.id)}/display"
    redirect_url += f"?DATA_URL={DATA_URL}"
    # Add the redirect_url_params to redirect_url
    for p_name, p_val in rup_dict.items():
        redirect_url += f"&{p_name}={p_val}"
    # Add the current user email to redirect_url
    if data.user:
        USERNAME = str(data.user.email)
    else:
        USERNAME = "Anonymous"
    redirect_url += f"&USERNAME={USERNAME}"
    return redirect_url
def call_hook(self, hook_name, *args, **kwargs):
    """
    Call the custom code hook function identified by 'hook_name' if any,
    and return the results. Returns None when no such hook exists.

    Any exception is re-raised with its args replaced by a message naming
    this tool and the hook.
    """
    try:
        hook = self.get_hook(hook_name)
        if not hook:
            return None
        return hook(*args, **kwargs)
    except Exception as e:
        original_message = e.args[0] if e.args else ""
        e.args = (f"Error in '{self.name}' hook '{hook_name}', original message: {original_message}",)
        raise
def exec_before_job(self, app, inp_data, out_data, param_dict=None):
    """
    No-op in this class; takes the app, input/output data and the
    parameter dict and returns None.

    NOTE(review): presumably an extension point for subclasses -- confirm.
    """
    return None
def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kwds):
    """
    No-op in this class; accepts the app, input/output data, parameter
    dict, optional job and arbitrary keywords, and returns None.

    NOTE(review): presumably an extension point for subclasses -- confirm.
    """
    return None
def job_failed(self, job_wrapper, message, exception=False):
    """
    Called when a job has failed. No-op in this class; returns None.
    """
    return None
def discover_outputs(
    self,
    out_data,
    out_collections,
    tool_provided_metadata,
    tool_working_directory,
    job,
    input_ext,
    input_dbkey,
    inp_data=None,
    final_job_state="ok",
):
    """
    Find any additional datasets generated by a tool and attach (for
    cases where number of outputs is not known in advance).

    Returns whatever `output_collect.collect_primary_datasets` returns.
    """
    # given the job_execution import is the only one, probably makes sense to refactor this out
    # into job_wrapper.
    app = self.app
    job_context = output_collect.JobContext(
        self,
        tool_provided_metadata,
        job,
        tool_working_directory,
        output_collect.PermissionProvider(inp_data, app.security_agent, job),
        output_collect.MetadataSourceProvider(inp_data),
        input_dbkey,
        object_store=app.object_store,
        final_job_state=final_job_state,
        flush_per_n_datasets=app.config.flush_per_n_datasets,
        max_discovered_files=app.config.max_discovered_files,
    )
    collected = output_collect.collect_primary_datasets(job_context, out_data, input_ext)
    output_collect.collect_dynamic_outputs(job_context, out_collections)
    # Return value only used in unit tests. Probably should be returning number of collected
    # bytes instead?
    return collected
def to_archive(self):
    """
    Bundle this tool into a gzipped tarball and return the archive's path.

    The archive holds the tool config, help images found under `static/`
    (with the config rewritten to reference them relatively), the command's
    script when it sits next to the config, externally referenced files,
    a `Dockerfile` if present, test data found under `test-data/`, and
    `.sample` files plus table definitions for tool data tables.

    The caller owns the returned file. Intermediate temporary files are
    always removed, and a partially written archive is removed on failure.
    """
    tool = self
    config_abspath = os.path.abspath(tool.config_file)
    config_name = os.path.split(tool.config_file)[-1]
    tarball_files = []
    temp_files = []
    try:
        with open(config_abspath) as fh1:
            tool_xml = fh1.read()
        # Retrieve tool help images and rewrite the tool's xml into a temporary file with the path
        # modified to be relative to the repository root.
        image_found = False
        help_template = tool.help
        if help_template is not None:
            tool_help = help_template._source
            image_regex = re.compile(r'img alt="[^"]+" src="\${static_path}/([^"]+)"')
            # Check each line of the rendered tool help for an image tag that points to a location under static/
            for help_line in tool_help.split("\n"):
                matches = image_regex.search(help_line)
                if matches is None:
                    continue
                tool_help_image = matches.group(1)
                tarball_path = tool_help_image
                filesystem_path = os.path.abspath(os.path.join(self.app.config.root, "static", tool_help_image))
                if os.path.exists(filesystem_path):
                    tarball_files.append((filesystem_path, tarball_path))
                    image_found = True
                    tool_xml = tool_xml.replace(f"${{static_path}}/{tarball_path}", tarball_path)
        # If one or more tool help images were found, add the modified tool XML to the tarball instead of the original.
        if image_found:
            with tempfile.NamedTemporaryFile(mode="w", suffix=".xml", delete=False) as fh2:
                new_tool_config = fh2.name
                # Register before writing so a failed write is still cleaned up.
                temp_files.append(new_tool_config)
                fh2.write(tool_xml)
            tool_tup = (new_tool_config, config_name)
        else:
            tool_tup = (config_abspath, config_name)
        # Add the tool XML to the list that will be used to populate the tarball.
        tarball_files.append(tool_tup)
        # TODO: This feels hacky.
        tool_command = tool.command.strip().split()[0]
        tool_path = os.path.dirname(config_abspath)
        # Add the command's script when it is shipped next to the tool config.
        if os.path.exists(os.path.join(tool_path, tool_command)):
            tarball_files.append((os.path.join(tool_path, tool_command), tool_command))
        # Find and add macros and code files.
        for external_file in tool.get_externally_referenced_paths(config_abspath):
            external_file_abspath = os.path.abspath(os.path.join(tool_path, external_file))
            tarball_files.append((external_file_abspath, external_file))
        if os.path.exists(os.path.join(tool_path, "Dockerfile")):
            tarball_files.append((os.path.join(tool_path, "Dockerfile"), "Dockerfile"))
        # Find tests, and check them for test data.
        if (tests := tool.tests) is not None:
            for test in tests:
                # Add input file tuples to the list.
                for input_name in test.inputs:
                    for input_value in test.inputs[input_name]:
                        input_filename = str(input_value)
                        input_path = os.path.abspath(os.path.join("test-data", input_filename))
                        if os.path.exists(input_path):
                            tarball_files.append((input_path, os.path.join("test-data", input_filename)))
                # And add output file tuples to the list.
                for _, filename, _ in test.outputs:
                    output_filepath = os.path.abspath(os.path.join("test-data", filename))
                    if os.path.exists(output_filepath):
                        tarball_files.append((output_filepath, os.path.join("test-data", filename)))
        for param in tool.input_params:
            # Check for tool data table definitions.
            param_options = getattr(param, "options", None)
            if param_options is None or not hasattr(param_options, "tool_data_table"):
                continue
            data_table = param_options.tool_data_table
            if not hasattr(data_table, "filenames"):
                continue
            data_table_definitions = []
            for data_table_filename in data_table.filenames:
                file_info = data_table.filenames[data_table_filename]
                # FIXME: from_shed_config seems to always be False.
                if not file_info["from_shed_config"]:
                    tar_file = f"{file_info['filename']}.sample"
                    sample_file = os.path.join(file_info["tool_data_path"], tar_file)
                    # Use the .sample file, if one exists. If not, skip this data table.
                    if os.path.exists(sample_file):
                        tarfile_name = os.path.split(tar_file)[1]
                        tarball_files.append((sample_file, os.path.join("tool-data", tarfile_name)))
                    data_table_definitions.append(data_table.xml_string)
            if data_table_definitions:
                # Put the data table definition XML in a temporary file.
                table_definition = '<?xml version="1.0" encoding="utf-8"?>\n<tables>\n %s</tables>'
                table_definition = table_definition % "\n".join(data_table_definitions)
                with tempfile.NamedTemporaryFile(mode="w", delete=False) as fh3:
                    table_conf = fh3.name
                    temp_files.append(table_conf)
                    fh3.write(table_definition)
                tarball_files.append((table_conf, os.path.join("tool-data", "tool_data_table_conf.xml.sample")))
        # Create the tarball.
        with tempfile.NamedTemporaryFile(suffix=".tgz", delete=False) as fh4:
            tarball_archive = fh4.name
        try:
            with tarfile.open(name=tarball_archive, mode="w:gz") as tarball:
                # Add the files from the previously generated list.
                for fspath, tarpath in tarball_files:
                    tarball.add(fspath, arcname=tarpath)
        except Exception:
            # Do not leave a half-written archive behind.
            if os.path.exists(tarball_archive):
                os.remove(tarball_archive)
            raise
        return tarball_archive
    finally:
        # Delete any temporary files that were generated.
        for temp_file in temp_files:
            if os.path.exists(temp_file):
                os.remove(temp_file)
def to_dict(self, trans, link_details=False, io_details=False, tool_help=False):
    """
    Returns dict of tool.

    `link_details` adds `link`, `min_width` and `target`; `io_details`
    adds serialized `inputs` and `outputs`; `tool_help` adds the rendered
    `help`. `config_file` is only included for admin users.
    """
    # Basic information
    tool_dict = super().to_dict()
    tool_dict.update(
        {
            "edam_operations": self.edam_operations,
            "edam_topics": self.edam_topics,
            "hidden": self.hidden,
            "is_workflow_compatible": self.is_workflow_compatible,
            "xrefs": self.xrefs,
        }
    )

    # Fill in ToolShedRepository info
    if getattr(self, "tool_shed", None):
        tool_dict["tool_shed_repository"] = {
            "name": self.repository_name,
            "owner": self.repository_owner,
            "changeset_revision": self.changeset_revision,
            "tool_shed": self.tool_shed,
        }

    # If an admin user, expose the path to the actual tool config XML file.
    if trans.user_is_admin:
        tool_dict["config_file"] = os.path.abspath(self.config_file) if self.config_file else None

    # Add link details.
    if link_details:
        # Add details for creating a hyperlink to the tool.
        if isinstance(self, DataSourceTool):
            link = self.app.url_for(controller="tool_runner", action="data_source_redirect", tool_id=self.id)
        else:
            link = self.app.url_for(controller="tool_runner", tool_id=self.id)

        # Basic information
        tool_dict["link"] = link
        tool_dict["min_width"] = self.uihints.get("minwidth", -1)
        tool_dict["target"] = self.target

    # Add input and output details.
    if io_details:
        tool_dict["inputs"] = [tool_input.to_dict(trans) for tool_input in self.inputs.values()]
        tool_dict["outputs"] = [tool_output.to_dict(app=self.app) for tool_output in self.outputs.values()]

    section_id, section_name = self.get_panel_section()
    tool_dict["panel_section_id"] = section_id
    tool_dict["panel_section_name"] = section_name

    # FIXME: the Tool class should declare directly, instead of ad hoc inspection
    if self.__class__ == Tool or isinstance(self, (DatabaseOperationTool, InteractiveTool)):
        tool_dict["form_style"] = "regular"
    else:
        tool_dict["form_style"] = "special"

    if tool_help:
        # create tool help
        help_txt = ""
        if self.help:
            rendered = self.help.render(
                static_path=self.app.url_for("/static"), host_url=self.app.url_for("/", qualified=True)
            )
            help_txt = unicodify(rendered)
        tool_dict["help"] = help_txt

    return tool_dict
def to_json(self, trans, kwd=None, job=None, workflow_building_mode=False, history=None):
    """
    Recursively creates a tool dictionary containing repeats, dynamic options and updated states.

    `kwd` holds the incoming parameter values (an empty dict when None).
    When `job` is given, its parameter values are checked, remapped to the
    current history and merged into `kwd`, and a message comparing the
    job's tool version with this tool is included in the result.

    Returns the tool model (the `to_dict` output extended with inputs,
    help, state, errors and form metadata) passed through `swap_inf_nan`.
    Raises `MessageException` when a history is needed but unavailable, or
    when loading the job parameters fails.
    """
    if kwd is None:
        kwd = {}
    if (
        workflow_building_mode is workflow_building_modes.USE_HISTORY
        or workflow_building_mode is workflow_building_modes.DISABLED
    ):
        # We don't need a history when exporting a workflow for the workflow editor or when downloading a workflow
        history = history or trans.get_history()
        if history is None and job is not None:
            history = self.history_manager.get_owned(job.history.id, trans.user, current_history=trans.history)
        # We can show the tool form if the current user is anonymous and doesn't have a history
        user = trans.get_user()
        if history is None and user is not None:
            raise exceptions.MessageException("History unavailable. Please specify a valid history id")

    # build request context
    request_context = proxy_work_context_for_history(trans, history, workflow_building_mode=workflow_building_mode)

    # load job parameters into incoming
    tool_message = ""
    tool_warnings = None
    if job:
        try:
            job_params = job.get_param_values(self.app, ignore_errors=True)
            tool_warnings = self.check_and_update_param_values(job_params, request_context, update_values=True)
            self._map_source_to_history(request_context, self.inputs, job_params)
            tool_message = self._compare_tool_version(job)
            params_to_incoming(kwd, self.inputs, job_params, self.app)
        except Exception as e:
            # Any failure while restoring the job's state is surfaced as a message.
            raise exceptions.MessageException(unicodify(e))

    # create parameter object
    params = Params(kwd, sanitize=False)

    # do param translation here, used by datasource tools
    if self.input_translator:
        self.input_translator.translate(params)

    # The dataset matcher factory is installed only for the duration of
    # state population and model building, and removed again below.
    set_dataset_matcher_factory(request_context, self)
    # create tool state
    state_inputs: Dict[str, str] = {}
    state_errors: Dict[str, str] = {}
    populate_state(request_context, self.inputs, params.__dict__, state_inputs, state_errors)

    # create tool model
    tool_model = self.to_dict(request_context)
    tool_model["inputs"] = []
    self.populate_model(request_context, self.inputs, state_inputs, tool_model["inputs"])
    unset_dataset_matcher_factory(request_context)

    # create tool help
    tool_help = ""
    if self.help:
        tool_help = self.help.render(
            static_path=self.app.url_for("/static"), host_url=self.app.url_for("/", qualified=True)
        )
        tool_help = unicodify(tool_help, "utf-8")

    # A tuple action is a prefix plus a route that still has to be resolved.
    if isinstance(self.action, tuple):
        action = self.action[0] + self.app.url_for(self.action[1])
    else:
        action = self.app.url_for(self.action)

    # update tool model
    tool_model.update(
        {
            "id": self.id,
            "help": tool_help,
            "citations": bool(self.citations),
            "sharable_url": self.sharable_url,
            "message": tool_message,
            "warnings": tool_warnings,
            "versions": self.tool_versions,
            "requirements": [{"name": r.name, "version": r.version} for r in self.requirements],
            "errors": state_errors,
            "tool_errors": self.tool_errors,
            "state_inputs": params_to_strings(self.inputs, state_inputs, self.app, use_security=True, nested=True),
            "job_id": trans.security.encode_id(job.id) if job else None,
            "job_remap": job.remappable() if job else None,
            "history_id": trans.security.encode_id(history.id) if history else None,
            "display": self.display_interface,
            "action": action,
            "license": self.license,
            "creator": self.creator,
            "method": self.method,
            "enctype": self.enctype,
        }
    )
    return swap_inf_nan(tool_model)
def populate_model(self, request_context, inputs, state_inputs, group_inputs, other_values=None):
    """
    Populates the tool model consumed by the client form builder.

    Walks `inputs`, builds a dict for each one from the matching entry in
    `state_inputs`, and writes it into `group_inputs` at the input's
    position (appending when the list is shorter). Repeat, conditional and
    section inputs are handled by recursing into their nested inputs.
    """
    # Chain the current state onto the enclosing context for nested lookups.
    other_values = ExpressionContext(state_inputs, other_values)
    for input_index, input in enumerate(inputs.values()):
        tool_dict = None
        group_state = state_inputs.get(input.name, {})
        if input.type == "repeat":
            tool_dict = input.to_dict(request_context)
            group_size = len(group_state)
            # One model list per repeat instance currently present in the state.
            tool_dict["cache"] = [None] * group_size
            group_cache: List[List[str]] = tool_dict["cache"]
            for i in range(group_size):
                group_cache[i] = []
                self.populate_model(request_context, input.inputs, group_state[i], group_cache[i], other_values)
        elif input.type == "conditional":
            tool_dict = input.to_dict(request_context)
            if "test_param" in tool_dict:
                test_param = tool_dict["test_param"]
                test_param["value"] = input.test_param.value_to_basic(
                    group_state.get(
                        test_param["name"], input.test_param.get_initial_value(request_context, other_values)
                    ),
                    self.app,
                )
                test_param["text_value"] = input.test_param.value_to_display_text(test_param["value"])
                for i in range(len(tool_dict["cases"])):
                    # Only the currently selected case receives the stored state;
                    # all other cases are populated from an empty state.
                    current_state = {}
                    if i == group_state.get("__current_case__"):
                        current_state = group_state
                    self.populate_model(
                        request_context,
                        input.cases[i].inputs,
                        current_state,
                        tool_dict["cases"][i]["inputs"],
                        other_values,
                    )
        elif input.type == "section":
            tool_dict = input.to_dict(request_context)
            self.populate_model(request_context, input.inputs, group_state, tool_dict["inputs"], other_values)
        else:
            try:
                initial_value = input.get_initial_value(request_context, other_values)
                tool_dict = input.to_dict(request_context, other_values=other_values)
                tool_dict["value"] = input.value_to_basic(
                    state_inputs.get(input.name, initial_value), self.app, use_security=True
                )
                tool_dict["default_value"] = input.value_to_basic(initial_value, self.app, use_security=True)
                tool_dict["text_value"] = input.value_to_display_text(tool_dict["value"])
            except ImplicitConversionRequired:
                tool_dict = input.to_dict(request_context)
                # This hack leads client to display a text field
                tool_dict["textable"] = True
            except Exception:
                # Fall back to the plain parameter dict without value expansion.
                tool_dict = input.to_dict(request_context)
                log.exception("tools::to_json() - Skipping parameter expansion '%s'", input.name)
        if input_index >= len(group_inputs):
            group_inputs.append(tool_dict)
        else:
            group_inputs[input_index] = tool_dict
def _map_source_to_history(self, trans, tool_inputs, params):
    """
    Remap dataset and collection values in ``params`` onto ``trans.history``.

    Builds lookup tables over the history's datasets and dataset collections
    and hands a callback to ``visit_input_values`` that returns, for each data
    or data-collection parameter value, the matching item of the current
    history (or ``None`` when no match exists).
    """
    # Need to remap dataset parameters. Job parameters point to original
    # dataset used; parameter should be the analogous dataset in the
    # current history.
    history = trans.history
    # Create index for hdas.
    # Each HDA is reachable both by its bare dataset id and by "<hid>_<dataset id>".
    hda_source_dict = {}
    for hda in history.datasets:
        key = f"{hda.hid}_{hda.dataset.id}"
        hda_source_dict[hda.dataset.id] = hda_source_dict[key] = hda
    # Ditto for dataset collections.
    hdca_source_dict = {}
    for hdca in history.dataset_collections:
        key = f"{hdca.hid}_{hdca.collection.id}"
        hdca_source_dict[hdca.collection.id] = hdca_source_dict[key] = hdca

    # Map dataset or collection to current history
    def map_to_history(value):
        id = None
        source = None
        if isinstance(value, self.app.model.HistoryDatasetAssociation):
            id = value.dataset.id
            source = hda_source_dict
        elif isinstance(value, self.app.model.HistoryDatasetCollectionAssociation):
            id = value.collection.id
            source = hdca_source_dict
        else:
            return None
        # Prefer the "<hid>_<id>" match; fall back to the bare id.
        if (key := f"{value.hid}_{id}") in source:
            return source[key]
        elif id in source:
            return source[id]
        else:
            return None

    def mapping_callback(input, value, **kwargs):
        if isinstance(input, DataToolParameter):
            if isinstance(value, list):
                # For multi-valued inputs, entries that cannot be mapped are kept as they are.
                values = []
                for val in value:
                    new_val = map_to_history(val)
                    if new_val:
                        values.append(new_val)
                    else:
                        values.append(val)
                return values
            else:
                return map_to_history(value)
        elif isinstance(input, DataCollectionToolParameter):
            return map_to_history(value)

    # presumably visit_input_values substitutes the callback's return value
    # into params - confirm against its implementation.
    visit_input_values(tool_inputs, params, mapping_callback)

def _compare_tool_version(self, job):
    """
    Compares a tool version with the tool version from a job (from ToolRunner).

    Returns a (possibly empty) message string, containing HTML links in the
    tool-shed case, describing how this tool differs from the one recorded on
    ``job``. Any exception raised while building the message is re-raised as
    ``exceptions.MessageException``.
    """
    tool_id = job.tool_id
    tool_version = job.tool_version
    message = ""
    try:
        select_field, tools, tool = self.app.toolbox.get_tool_components(
            tool_id, tool_version=tool_version, get_loaded_tools_by_lineage=False, set_selected=True
        )
        if tool is None:
            raise exceptions.MessageException(
                f"This dataset was created by an obsolete tool ({tool_id}). Can't re-run."
            )
        # Something differs when neither id nor old_id matches the job's tool id,
        # or when the version differs.
        if (self.id != tool_id and self.old_id != tool_id) or self.version != tool_version:
            if self.id == tool_id:
                # Same tool id, different version.
                if tool_version:
                    message = f'This job was run with tool version "{tool_version}", which is not available. '
                    if len(tools) > 1:
                        message += (
                            "You can re-run the job with the selected tool or choose another version of the tool. "
                        )
                    else:
                        message += "You can re-run the job with this tool version, which is a different version of the original tool. "
            else:
                # Different tool id: link both the original and the replacement tool.
                new_tool_shed_url = f"{tool.sharable_url}/{tool.changeset_revision}/"
                old_tool_shed_url = get_tool_shed_url_from_tool_shed_registry(self.app, tool_id.split("/repos/")[0])
                old_tool_shed_url = f"{old_tool_shed_url}/view/{tool.repository_owner}/{tool.repository_name}/"
                message = f'This job was run with <a href="{old_tool_shed_url}" target="_blank">tool id "{tool_id}"</a>, version "{tool_version}", which is not available. '
                if len(tools) > 1:
                    message += f'You can re-run the job with the selected <a href="{new_tool_shed_url}" target="_blank">tool id "{self.id}"</a> or choose another derivation of the tool. '
                else:
                    message += f'You can re-run the job with <a href="{new_tool_shed_url}" target="_blank">tool id "{self.id}"</a>, which is a derivation of the original tool. '
        if not self.is_latest_version:
            message += "There is a newer version of this tool available."
    except Exception as e:
        # Note: this also re-wraps the MessageException raised above for obsolete tools.
        raise exceptions.MessageException(unicodify(e))
    return message
def get_default_history_by_trans(self, trans, create=False):
    """Return the history obtained from ``trans.get_history``.

    ``create`` is forwarded unchanged as the ``create`` keyword argument.
    """
    history = trans.get_history(create=create)
    return history
@classmethod
def get_externally_referenced_paths(cls, path):
    """Return relative paths to externally referenced files by the tool
    described by file at `path`. External components should not assume things
    about the structure of tool xml files (this is the tool's responsibility).

    The result lists the ``file`` attribute of every ``<code>`` child of the
    root element (skipping missing or empty attributes), followed by whatever
    ``imported_macro_paths`` reports for the same root.
    """
    root = raw_tool_xml_tree(path).getroot()
    external_paths = [
        code_file for code_elem in root.findall("code") if (code_file := code_elem.get("file"))
    ]
    external_paths.extend(imported_macro_paths(root))
    # May also need to load external citation files as well at some point.
    return external_paths
class OutputParameterJSONTool(Tool):
    """
    Alternate implementation of Tool that provides parameters and other values
    JSONified within the contents of an output dataset
    """

    tool_type = "output_parameter_json"

    def _prepare_json_list(self, param_list):
        """Return a JSON-serializable copy of ``param_list``.

        Mappings and lists are converted recursively; every other value is
        replaced by ``str(value)``.
        """
        rval = []
        for value in param_list:
            # Use the same mapping test as _prepare_json_param_dict so that
            # non-dict mappings nested in a list are recursed into as well,
            # rather than being flattened to their string representation.
            if isinstance(value, MutableMapping):
                rval.append(self._prepare_json_param_dict(value))
            elif isinstance(value, list):
                rval.append(self._prepare_json_list(value))
            else:
                rval.append(str(value))
        return rval

    def _prepare_json_param_dict(self, param_dict):
        """Return a JSON-serializable ``dict`` built from ``param_dict``.

        Mappings and lists are converted recursively; every other value is
        replaced by ``str(value)``. Keys are kept as they are.
        """
        rval = {}
        for key, value in param_dict.items():
            if isinstance(value, MutableMapping):
                rval[key] = self._prepare_json_param_dict(value)
            elif isinstance(value, list):
                rval[key] = self._prepare_json_list(value)
            else:
                rval[key] = str(value)
        return rval

    def exec_before_job(self, app, inp_data, out_data, param_dict=None):
        """Write the job parameters as JSON into the first output's file.

        The document has three keys: ``param_dict`` (stringified parameters),
        ``output_data`` (one entry per item of ``out_data``) and
        ``job_config``.

        :raises Exception: if ``out_data`` is empty, since there is then no
            file to write to.
        """
        if param_dict is None:
            param_dict = {}
        json_params = {}
        # it would probably be better to store the original incoming parameters
        # here, instead of the Galaxy modified ones?
        json_params["param_dict"] = self._prepare_json_param_dict(param_dict)
        json_params["output_data"] = []
        json_params["job_config"] = dict(
            GALAXY_DATATYPES_CONF_FILE=param_dict.get("GALAXY_DATATYPES_CONF_FILE"),
            GALAXY_ROOT_DIR=param_dict.get("GALAXY_ROOT_DIR"),
            TOOL_PROVIDED_JOB_METADATA_FILE=self.provided_metadata_file,
        )
        json_filename = None
        for out_name, data in out_data.items():
            # use wrapped dataset to access certain values
            wrapped_data = param_dict.get(out_name)
            # allow multiple files to be created
            file_name = str(wrapped_data)
            extra_files_path = str(wrapped_data.files_path)
            data_dict = dict(
                out_data_name=out_name,
                ext=data.ext,
                dataset_id=data.dataset.id,
                hda_id=data.id,
                file_name=file_name,
                extra_files_path=extra_files_path,
            )
            json_params["output_data"].append(data_dict)
            # The first output's file receives the JSON document.
            if json_filename is None:
                json_filename = file_name
        if json_filename is None:
            raise Exception("Must call 'exec_before_job' with 'out_data' containing at least one entry.")
        with open(json_filename, "w") as out:
            out.write(json.dumps(json_params))
class ExpressionTool(Tool):
    """
    Tool implementation for expression tools.

    The job command is ``expressions.EXPRESSION_SCRIPT_CALL``; its input is a
    JSON document written by ``exec_before_job`` that holds the job inputs,
    the expression and the declared outputs.
    """

    requires_js_runtime = True
    tool_type = "expression"
    tool_type_local = True
    # File name (inside the local working directory) of the JSON inputs document.
    EXPRESSION_INPUTS_NAME = "_expression_inputs_.json"

    def parse_command(self, tool_source):
        """Set the fixed command line and remember the stripped expression."""
        self.command = f"cd ../; {expressions.EXPRESSION_SCRIPT_CALL}"
        self.interpreter = None
        self._expression = tool_source.parse_expression().strip()

    def parse_outputs(self, tool_source):
        """Parse outputs, rejecting anything expression tools do not support.

        :raises Exception: if an output collection is declared, or if an
            output has no ``from_expression`` attribute.
        """
        # Setup self.outputs and self.output_collections
        super().parse_outputs(tool_source)
        # Validate these outputs for expression tools.
        if self.output_collections:
            message = "Expression tools may not declare output collections at this time."
            raise Exception(message)
        for output in self.outputs.values():
            if not hasattr(output, "from_expression"):
                message = "Expression tools may not declare output datasets at this time."
                raise Exception(message)

    def exec_before_job(self, app, inp_data, out_data, param_dict=None):
        """Write the evaluation script and the JSON inputs document.

        :raises Exception: if ``param_dict`` is ``None``.
        """
        super().exec_before_job(app, inp_data, out_data, param_dict=param_dict)
        # Validate before the first use of param_dict; the guard used to sit
        # after param_dict had already been subscripted and so never fired.
        if param_dict is None:
            raise Exception("Internal error - param_dict is empty.")
        local_working_directory = param_dict["__local_working_directory__"]
        expression_inputs_path = os.path.join(local_working_directory, ExpressionTool.EXPRESSION_INPUTS_NAME)
        outputs = []
        for out_name in out_data:
            output_def = self.outputs[out_name]
            wrapped_data = param_dict.get(out_name)
            file_name = str(wrapped_data)
            outputs.append(
                dict(
                    name=out_name,
                    from_expression=output_def.from_expression,
                    path=file_name,
                )
            )
        job: Dict[str, str] = {}
        json_wrap(self.inputs, param_dict, self.profile, job, handle_files="OBJECT")
        expression_inputs = {
            "job": job,
            "script": self._expression,
            "outputs": outputs,
        }
        expressions.write_evalute_script(os.path.join(local_working_directory))
        with open(expression_inputs_path, "w") as f:
            json.dump(expression_inputs, f)

    def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kwds):
        """Copy the input dataset selected by the expression into each ``data`` output.

        For every ``data`` output present in ``out_data`` the JSON written by
        the job is read; it must be ``null`` (output skipped) or an object
        whose ``id`` matches one of the input datasets.

        :returns: the ``set_metadata`` task signature when metadata has to be
            regenerated and celery tasks are enabled (processing stops at
            that output); otherwise ``None``.
        :raises exceptions.MessageException: if no input dataset has the
            referenced id.
        """
        for key, val in self.outputs.items():
            if key not in out_data:
                # Skip filtered outputs
                continue
            if val.output_type == "data":
                with open(out_data[key].get_file_name()) as f:
                    src = json.load(f)
                if src is None:
                    continue
                assert isinstance(src, dict), f"Expected dataset 'src' to be a dictionary - actual type is {type(src)}"
                dataset_id = src["id"]
                copy_object = None
                for input_dataset in inp_data.values():
                    if input_dataset and input_dataset.id == dataset_id:
                        copy_object = input_dataset
                        break
                if copy_object is None:
                    raise exceptions.MessageException("Failed to find dataset output.")
                output = out_data[key]
                # if change_datatype PJA is associated with expression tool output the new output already has
                # the desired datatype, so we use it. If the extension is "data" there's no change_dataset PJA and
                # we want to use the existing extension.
                new_ext = output.extension if output.extension != "data" else copy_object.extension
                require_metadata_regeneration = copy_object.extension != new_ext
                output.copy_from(copy_object, include_metadata=not require_metadata_regeneration)
                output.extension = new_ext
                if require_metadata_regeneration:
                    if app.config.enable_celery_tasks:
                        from galaxy.celery.tasks import set_metadata

                        output._state = model.Dataset.states.SETTING_METADATA
                        return set_metadata.si(
                            dataset_id=output.id, task_user_id=output.history.user_id, ensure_can_set_metadata=False
                        )
                    else:
                        # TODO: move exec_after_process into metadata script so this doesn't run on the headnode ?
                        output.init_meta()
                        output.set_meta()
                        output.set_metadata_success_state()

    def parse_environment_variables(self, tool_source):
        """Setup environment variable for inputs file."""
        environment_variables_raw = super().parse_environment_variables(tool_source)
        expression_script_inputs = dict(
            name="GALAXY_EXPRESSION_INPUTS",
            template=ExpressionTool.EXPRESSION_INPUTS_NAME,
        )
        environment_variables_raw.append(expression_script_inputs)
        return environment_variables_raw
class DataSourceTool(OutputParameterJSONTool):
    """
    Alternate implementation of Tool for data_source tools -- those that
    allow the user to query and extract data from another web site.
    """

    tool_type = "data_source"
    default_tool_action = DataSourceToolAction

    @property
    def wants_params_cleaned(self):
        """Indicates whether received, but undeclared request params should be cleaned."""
        # Cleaning applies from profile 24.0 onwards.
        return self.profile >= 24.0

    def _build_GALAXY_URL_parameter(self):
        """Build the ``GALAXY_URL`` parameter pointing back at the tool runner."""
        return ToolParameter.build(
            self, XML(f'<param name="GALAXY_URL" type="baseurl" value="/tool_runner?tool_id={self.id}" />')
        )

    def parse_inputs(self, tool_source):
        """Parse inputs and make sure a ``GALAXY_URL`` parameter exists."""
        super().parse_inputs(tool_source)
        # Open all data_source tools in _top.
        self.target = "_top"
        # data_source tools cannot check param values
        self.check_values = False
        if "GALAXY_URL" not in self.inputs:
            self.inputs["GALAXY_URL"] = self._build_GALAXY_URL_parameter()
            self.inputs_by_page[0]["GALAXY_URL"] = self.inputs["GALAXY_URL"]

    def exec_before_job(self, app, inp_data, out_data, param_dict=None):
        """Apply name/info/dbkey/data_type overrides to outputs and write the JSON parameter file.

        Job-wide values are read from the ``name``, ``info``, ``dbkey`` and
        ``data_type`` parameters; each can be overridden per output through
        ``GALAXY|<output name>|<field>``. The JSON document is written into
        the first output's file.

        :raises Exception: if ``out_data`` is empty.
        """
        if param_dict is None:
            param_dict = {}
        dbkey = param_dict.get("dbkey")
        info = param_dict.get("info")
        data_type = param_dict.get("data_type")
        name = param_dict.get("name")

        json_params = {}
        # it would probably be better to store the original incoming parameters
        # here, instead of the Galaxy modified ones?
        json_params["param_dict"] = self._prepare_json_param_dict(param_dict)
        json_params["output_data"] = []
        json_params["job_config"] = dict(
            GALAXY_DATATYPES_CONF_FILE=param_dict.get("GALAXY_DATATYPES_CONF_FILE"),
            GALAXY_ROOT_DIR=param_dict.get("GALAXY_ROOT_DIR"),
            TOOL_PROVIDED_JOB_METADATA_FILE=self.provided_metadata_file,
        )
        json_filename = None
        for out_name, data in out_data.items():
            # use wrapped dataset to access certain values
            wrapped_data = param_dict.get(out_name)
            # allow multiple files to be created
            cur_base_param_name = f"GALAXY|{out_name}|"
            cur_name = param_dict.get(f"{cur_base_param_name}name", name)
            # The per-output key was historically spelled "dkey", unlike the
            # job-wide "dbkey" and the other per-output fields. Accept the
            # consistent "dbkey" spelling first and keep "dkey" as a fallback
            # so existing senders keep working.
            legacy_dbkey = param_dict.get(f"{cur_base_param_name}dkey", dbkey)
            cur_dbkey = param_dict.get(f"{cur_base_param_name}dbkey", legacy_dbkey)
            cur_info = param_dict.get(f"{cur_base_param_name}info", info)
            cur_data_type = param_dict.get(f"{cur_base_param_name}data_type", data_type)
            if cur_name:
                data.name = cur_name
            # An info value already present on the dataset is never overwritten.
            if not data.info and cur_info:
                data.info = cur_info
            if cur_dbkey:
                data.dbkey = cur_dbkey
            if cur_data_type:
                data.extension = cur_data_type
            file_name = str(wrapped_data)
            extra_files_path = str(wrapped_data.files_path)
            data_dict = dict(
                out_data_name=out_name,
                ext=data.ext,
                dataset_id=data.dataset.id,
                hda_id=data.id,
                file_name=file_name,
                extra_files_path=extra_files_path,
            )
            json_params["output_data"].append(data_dict)
            if json_filename is None:
                json_filename = file_name
        if json_filename is None:
            raise Exception("Must call 'exec_before_job' with 'out_data' containing at least one entry.")
        with open(json_filename, "w") as out:
            out.write(json.dumps(json_params))
class AsyncDataSourceTool(DataSourceTool):
    """Data source tool whose ``GALAXY_URL`` parameter targets ``/async/<tool id>``."""

    tool_type = "data_source_async"

    def _build_GALAXY_URL_parameter(self):
        """Build the ``GALAXY_URL`` parameter for this tool."""
        param_xml = f'<param name="GALAXY_URL" type="baseurl" value="/async/{self.id}" />'
        param_elem = XML(param_xml)
        return ToolParameter.build(self, param_elem)
class DataDestinationTool(Tool):
    """Tool subclass whose ``tool_type`` is ``data_destination``."""

    tool_type = "data_destination"
class SetMetadataTool(Tool):
    """
    Tool implementation for special tool that sets metadata on an existing
    dataset.
    """

    tool_type = "set_metadata"
    requires_setting_metadata = False
    tool_action: "SetMetadataToolAction"

    def regenerate_imported_metadata_if_needed(self, hda, history, user, session_id):
        """Create and enqueue a set-metadata job for ``hda`` if it has metadata files.

        Does nothing when ``hda.has_metadata_files`` is falsy.
        """
        if hda.has_metadata_files:
            job, *_ = self.tool_action.execute_via_app(
                self,
                self.app,
                session_id,
                history.id,
                user,
                incoming={"input1": hda},
                overwrite=False,
            )
            self.app.job_manager.enqueue(job=job, tool=self)

    def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kwds):
        """Load externally computed metadata onto each input dataset and persist the result.

        ``job`` defaults to ``None`` but is dereferenced (``job.id``) for
        every dataset, so it is effectively required whenever ``inp_data``
        is non-empty.

        On the first dataset whose metadata could not be set or loaded, that
        dataset is marked ``FAILED_METADATA``, committed, and the method
        returns without processing the remaining datasets.
        """
        working_directory = app.object_store.get_filename(job, base_dir="job_work", dir_only=True, obj_dir=True)
        for name, dataset in inp_data.items():
            external_metadata = get_metadata_compute_strategy(app.config, job.id, tool_id=self.id)
            sa_session = app.model.context
            metadata_set_successfully = external_metadata.external_metadata_set_successfully(
                dataset, name, sa_session, working_directory=working_directory
            )
            if metadata_set_successfully:
                try:
                    # external_metadata_set_successfully is only an approximation (the metadata json file exists),
                    # things can still go wrong, but we don't want to fail here since it can lead to a resubmission loop
                    external_metadata.load_metadata(dataset, name, sa_session, working_directory=working_directory)
                except Exception:
                    metadata_set_successfully = False
                    log.exception("Exception occured while loading metadata results")
            if not metadata_set_successfully:
                dataset.state = model.DatasetInstance.states.FAILED_METADATA
                self.sa_session.add(dataset)
                with transaction(self.sa_session):
                    self.sa_session.commit()
                return
            # If setting external metadata has failed, how can we inform the
            # user? For now, we'll leave the default metadata and set the state
            # back to its original.
            dataset.datatype.after_setting_metadata(dataset)
            # NOTE(review): this compares job.tool_id against "1.0.0", which
            # looks like a version string rather than a tool id - confirm intent.
            if job and job.tool_id == "1.0.0":
                dataset.state = param_dict.get("__ORIGINAL_DATASET_STATE__")
            else:
                # Revert dataset.state to fall back to dataset.dataset.state
                dataset.set_metadata_success_state()
            # Need to reset the peek, which may rely on metadata
            # TODO: move this into metadata setting, setting the peek requires dataset access,
            # and large chunks of the dataset may be read here.
            try:
                dataset.set_peek()
            except Exception:
                # Best effort: a failing peek is logged and does not abort the update.
                log.exception("Exception occured while setting dataset peek")
            self.sa_session.add(dataset)
            with transaction(self.sa_session):
                self.sa_session.commit()

    def job_failed(self, job_wrapper, message, exception=False):
        """Run ``exec_after_process`` for a failed job, using its input datasets.

        Returns whatever ``exec_after_process`` returns, or ``None`` when the
        job cannot be loaded from the session.
        """
        job = job_wrapper.sa_session.get(Job, job_wrapper.job_id)
        if job:
            inp_data = {}
            for dataset_assoc in job.input_datasets:
                inp_data[dataset_assoc.name] = dataset_assoc.dataset
            return self.exec_after_process(job_wrapper.app, inp_data, {}, job_wrapper.get_param_dict(), job=job)
class ExportHistoryTool(Tool):
    """Tool subclass whose ``tool_type`` is ``export_history``."""

    tool_type = "export_history"
class ImportHistoryTool(Tool):
    """Tool subclass whose ``tool_type`` is ``import_history``."""

    tool_type = "import_history"

    def exec_after_process(self, app, inp_data, out_data, param_dict, job, final_job_state=None, **kwds):
        """Run the base post-processing, then clean up the import archive if the job ended OK."""
        super().exec_after_process(app, inp_data, out_data, param_dict, job=job, **kwds)
        job_ended_ok = final_job_state == DETECTED_JOB_STATE.OK
        if job_ended_ok:
            archive_wrapper = JobImportHistoryArchiveWrapper(self.app, job.id)
            archive_wrapper.cleanup_after_job()
class InteractiveTool(Tool):
    """Tool that produces entry points, which are removed again once its job finishes or fails."""

    tool_type = "interactive"
    produces_entry_points = True

    def __init__(self, config_file, tool_source, app, **kwd):
        """Initialise the tool; asserts that InteractiveTools are enabled in ``app.config``."""
        interactivetools_enabled = app.config.interactivetools_enable
        assert interactivetools_enabled, ValueError(
            "Trying to load an InteractiveTool, but InteractiveTools are not enabled."
        )
        super().__init__(config_file, tool_source, app, **kwd)

    def __remove_interactivetool_by_job(self, job):
        """Remove the entry points attached to ``job``; log a warning when ``job`` is falsy."""
        if not job:
            log.warning("Could not determine job to stop InteractiveTool: %s", job)
            return
        entry_points = job.interactivetool_entry_points
        log.debug("__remove_interactivetool_by_job: %s", entry_points)
        self.app.interactivetool_manager.remove_entry_points(entry_points)

    def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kwds):
        """Run the inherited post-processing, then drop the job's entry points."""
        super().exec_after_process(app, inp_data, out_data, param_dict, job=job, **kwds)
        self.__remove_interactivetool_by_job(job)

    def job_failed(self, job_wrapper, message, exception=False):
        """Run the inherited failure handling, then drop the job's entry points."""
        super().job_failed(job_wrapper, message, exception=exception)
        failed_job = job_wrapper.sa_session.get(Job, job_wrapper.job_id)
        self.__remove_interactivetool_by_job(failed_job)
class DataManagerTool(OutputParameterJSONTool):
    """Tool backing a data manager; access is restricted to admin users."""

    tool_type = "manage_data"
    default_tool_action = DataManagerToolAction

    def __init__(self, config_file, root, app, guid=None, data_manager_id=None, **kwds):
        """Initialise the tool; ``data_manager_id`` falls back to the tool id when not given."""
        self.data_manager_id = data_manager_id
        super().__init__(config_file, root, app, guid=guid, **kwds)
        if self.data_manager_id is None:
            self.data_manager_id = self.id

    def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, final_job_state=None, **kwds):
        """Hand the job outputs to the data manager according to ``__data_manager_mode``.

        Nothing beyond the access check happens unless the job ended OK.
        Modes: ``populate`` (default) processes the result, ``dry_run`` does
        nothing, ``bundle`` writes a bundle and stores each bundle file with
        the corresponding dataset's extra files. Any other mode raises.
        """
        assert self.allow_user_access(job.user), "You must be an admin to access this tool."
        if final_job_state != DETECTED_JOB_STATE.OK:
            return
        # run original exec_after_process
        super().exec_after_process(app, inp_data, out_data, param_dict, job=job, **kwds)
        # process results of tool
        manager_id = job.data_manager_association.data_manager_id
        manager = self.app.data_managers.get_manager(manager_id)
        assert (
            manager is not None
        ), f"Invalid data manager ({manager_id}) requested. It may have been removed before the job completed."
        mode = param_dict.get("__data_manager_mode", "populate")
        if mode == "dry_run":
            return
        if mode == "populate":
            manager.process_result(out_data)
            return
        if mode != "bundle":
            raise Exception("Unknown data manager mode encountered type...")
        bundle = manager.write_bundle(out_data)
        for bundle_path, dataset in bundle.items():
            hda = cast(model.HistoryDatasetAssociation, dataset)
            underlying = hda.dataset
            underlying.object_store.update_from_file(
                hda.dataset,
                extra_dir=hda.dataset.extra_files_path_name,
                file_name=bundle_path,
                alt_name=os.path.basename(bundle_path),
                create=True,
                preserve_symlinks=True,
            )

    def get_default_history_by_trans(self, trans, create=False):
        """Return the user's most recent non-deleted data manager history.

        When the user has none (or all of them are deleted) a new one is
        created if ``create`` is true; otherwise ``None`` is returned.
        """

        def _create_data_manager_history(user):
            new_history = trans.app.model.History(name="Data Manager History (automatically created)", user=user)
            association = trans.app.model.DataManagerHistoryAssociation(user=user, history=new_history)
            trans.sa_session.add_all((new_history, association))
            with transaction(trans.sa_session):
                trans.sa_session.commit()
            return new_history

        user = trans.user
        assert user, "You must be logged in to use this tool."
        assert self.allow_user_access(user), "You must be an admin to access this tool."
        associations = user.data_manager_histories
        if associations:
            # Newest association first; the first live history wins.
            for association in reversed(associations):
                candidate = association.history
                if not candidate.deleted:
                    return candidate
        # Either there is no association at all or every history is deleted.
        if create:
            return _create_data_manager_history(user)
        return None

    def allow_user_access(self, user, attempting_access=True) -> bool:
        """Check user access to this tool.

        :param user: model object representing user.
        :type user: galaxy.model.User
        :param attempting_access: is the user attempting to do something with
            the tool (set false for incidental checks like toolbox listing)
        :type attempting_access: bool
        :returns: Whether the user is allowed to access the tool.

        Data Manager tools are only accessible to admins.
        """
        permitted = super().allow_user_access(user) and self.app.config.is_admin_user(user)
        if permitted:
            return True
        # If this is just an incidental check - do not log the scary message
        # about users attempting to do something problematic.
        if attempting_access:
            user_ref = user.id if user else user
            log.debug("User (%s) attempted to access a data manager tool (%s), but is not an admin.", user_ref, self.id)
        return False
class DatabaseOperationTool(Tool):
    """
    Base class for tools executed through ``ModelOperationToolAction``.

    The ``require_*`` class flags select which input dataset states are
    acceptable; subclasses override them and ``produce_outputs``.
    """

    default_tool_action = ModelOperationToolAction
    require_terminal_states = True
    require_dataset_ok = True
    tool_type_local = True
    require_terminal_or_paused_states = False

    @property
    def valid_input_states(self):
        """Dataset states accepted for inputs, chosen by the ``require_*`` flags.

        The flags are checked in order of strictness: OK only, terminal
        states, terminal states plus PAUSED, then the model-wide default.
        """
        if self.require_dataset_ok:
            return (model.Dataset.states.OK,)
        elif self.require_terminal_states:
            return model.Dataset.terminal_states
        elif self.require_terminal_or_paused_states:
            # ``terminal_states or PAUSED`` always evaluated to terminal_states
            # (a non-empty collection is truthy), silently dropping PAUSED.
            return (*model.Dataset.terminal_states, model.Dataset.states.PAUSED)
        else:
            return model.Dataset.valid_input_states

    @property
    def allow_errored_inputs(self):
        """Errored inputs are allowed exactly when OK datasets are not required."""
        return not self.require_dataset_ok

    def check_inputs_ready(self, input_datasets, input_dataset_collections):
        """Validate the states of all input datasets and collections.

        :raises ToolInputsNotReadyException: if a dataset is in a non-ready
            state while terminal states are required, or a collection is not
            populated yet.
        :raises ToolInputsNotOKException: if OK datasets are required and a
            dataset is in any other state.
        """

        def check_dataset_state(state):
            if self.require_terminal_states and state in model.Dataset.non_ready_states:
                raise ToolInputsNotReadyException("An input dataset is pending.")

            if self.require_dataset_ok:
                if state != model.Dataset.states.OK:
                    # NOTE(review): ``input_dataset`` is the loop variable of
                    # the first loop below. When this check is reached from
                    # the collection loop it refers to the last plain input
                    # dataset (or is unbound if there were none), so the
                    # message and id can point at the wrong dataset. The
                    # correct src/id for that case needs confirming.
                    raise ToolInputsNotOKException(
                        f"Tool requires inputs to be in valid state, but dataset {input_dataset} is in state '{input_dataset.state}'",
                        src="hda",
                        id=input_dataset.id,
                    )

        for input_dataset in input_datasets.values():
            if input_dataset:
                # None is a possible input for optional inputs
                check_dataset_state(input_dataset.state)

        for input_dataset_collection_pairs in input_dataset_collections.values():
            for input_dataset_collection, _ in input_dataset_collection_pairs:
                if not input_dataset_collection.collection.populated_optimized:
                    raise ToolInputsNotReadyException("An input collection is not populated.")

                states, _ = input_dataset_collection.collection.dataset_states_and_extensions_summary
                for state in states:
                    check_dataset_state(state)

    def _add_datasets_to_history(self, history, elements, datasets_visible=False):
        """Stage the dataset-typed ``elements`` for addition to ``history``.

        Elements whose ``history_content_type`` is not ``"dataset"`` are
        ignored; staged datasets get ``visible`` set to ``datasets_visible``.
        """
        for element_object in elements:
            if getattr(element_object, "history_content_type", None) == "dataset":
                element_object.visible = datasets_visible
                history.stage_addition(element_object)

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Default implementation: produce nothing and return ``_outputs_dict()``."""
        return self._outputs_dict()

    def _outputs_dict(self):
        """Return an empty outputs dictionary."""
        return {}
class UnzipCollectionTool(DatabaseOperationTool):
    """Split a ``paired`` collection into ``forward`` and ``reverse`` dataset outputs."""

    tool_type = "unzip_collection"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Copy both members of the paired input into ``out_data``."""
        source = incoming["input"]
        # A DCE exposes ``element_type`` and wraps the collection in ``element_object``;
        # otherwise the input is an HDCA wrapping it in ``collection``.
        collection = source.element_object if hasattr(source, "element_type") else source.collection

        assert collection.collection_type == "paired"
        original_forward, original_reverse = collection.dataset_instances
        forward = original_forward.copy(copy_tags=original_forward.tags, flush=False)
        reverse = original_reverse.copy(copy_tags=original_reverse.tags, flush=False)
        self._add_datasets_to_history(history, [forward, reverse])

        out_data["forward"] = forward
        out_data["reverse"] = reverse
class ZipCollectionTool(DatabaseOperationTool):
    """Combine a forward and a reverse dataset into one output collection."""

    tool_type = "zip_collection"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Copy ``input_forward``/``input_reverse`` and create the ``output`` collection from them."""
        original_forward = incoming["input_forward"]
        original_reverse = incoming["input_reverse"]
        forward = original_forward.copy(copy_tags=original_forward.tags, flush=False)
        reverse = original_reverse.copy(copy_tags=original_reverse.tags, flush=False)
        new_elements = {"forward": forward, "reverse": reverse}
        self._add_datasets_to_history(history, [forward, reverse])

        output_definition = next(iter(self.outputs.values()))
        output_collections.create_collection(
            output_definition, "output", elements=new_elements, propagate_hda_tags=False
        )
class CrossProductFlatCollectionTool(DatabaseOperationTool):
    """Build two flat collections holding every (a, b) pairing of two input collections."""

    tool_type = "cross_product_flat"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create ``output_a`` and ``output_b`` keyed by the joined element identifiers."""
        input_a = incoming["input_a"]
        input_b = incoming["input_b"]
        join_identifier = incoming["join_identifier"]

        def copy_of(dce):
            # Each pairing gets its own copy, tags included.
            return dce.element_object.copy(copy_tags=dce.element_object.tags, flush=False)

        output_a = {}
        output_b = {}
        all_copied_hdas = []
        for dce_a in input_a.collection.elements:
            id_a = dce_a.element_identifier
            for dce_b in input_b.collection.elements:
                id_b = dce_b.element_identifier
                identifier = f"{id_a}{join_identifier}{id_b}"

                copy_a = copy_of(dce_a)
                copy_b = copy_of(dce_b)
                all_copied_hdas.extend((copy_a, copy_b))
                output_a[identifier] = copy_a
                output_b[identifier] = copy_b

        self._add_datasets_to_history(history, all_copied_hdas)
        for label, elements in (("output_a", output_a), ("output_b", output_b)):
            output_collections.create_collection(
                self.outputs[label], label, elements=elements, propagate_hda_tags=False
            )
class CrossProductNestedCollectionTool(DatabaseOperationTool):
    """Build two nested collections holding every (a, b) pairing of two input collections."""

    tool_type = "cross_product_nested"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create ``output_a``/``output_b``: one inner ``list`` per element of ``input_a``."""
        input_a = incoming["input_a"]
        input_b = incoming["input_b"]

        def copy_of(dce):
            return dce.element_object.copy(copy_tags=dce.element_object.tags, flush=False)

        output_a = {}
        output_b = {}
        all_copied_hdas = []
        for dce_a in input_a.collection.elements:
            outer_identifier = dce_a.element_identifier
            inner_a = {}
            inner_b = {}
            for dce_b in input_b.collection.elements:
                inner_identifier = dce_b.element_identifier
                copy_a = copy_of(dce_a)
                copy_b = copy_of(dce_b)
                all_copied_hdas.extend((copy_a, copy_b))
                inner_a[inner_identifier] = copy_a
                inner_b[inner_identifier] = copy_b

            # Inner lists are described as new sub-collections keyed by the outer identifier.
            sub_collection_a: Dict[str, Any] = {
                "src": "new_collection",
                "collection_type": "list",
                "elements": inner_a,
            }
            output_a[outer_identifier] = sub_collection_a
            sub_collection_b: Dict[str, Any] = {
                "src": "new_collection",
                "collection_type": "list",
                "elements": inner_b,
            }
            output_b[outer_identifier] = sub_collection_b

        self._add_datasets_to_history(history, all_copied_hdas)
        for label, elements in (("output_a", output_a), ("output_b", output_b)):
            output_collections.create_collection(
                self.outputs[label], label, elements=elements, propagate_hda_tags=False
            )
class BuildListCollectionTool(DatabaseOperationTool):
    """Build a collection from the datasets supplied in the ``datasets`` repeat."""

    tool_type = "build_list"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Copy every supplied dataset into the ``output`` collection under its chosen identifier."""
        new_elements = {}

        for position, repeat in enumerate(incoming["datasets"]):
            if not repeat["input"]:
                continue
            dataset = repeat["input"]

            try:
                id_select = repeat["id_cond"]["id_select"]
            except KeyError:
                # Prior to tool version 1.2.0
                id_select = "idx"

            if id_select == "idx":
                identifier = str(position)
            elif id_select == "identifier":
                # Fall back to the dataset name when the input carries no element identifier.
                identifier = getattr(dataset, "element_identifier", dataset.name)
            elif id_select == "manual":
                identifier = repeat["id_cond"]["identifier"]

            new_elements[identifier] = dataset.copy(copy_tags=dataset.tags, flush=False)

        self._add_datasets_to_history(history, new_elements.values())
        output_definition = next(iter(self.outputs.values()))
        output_collections.create_collection(
            output_definition, "output", elements=new_elements, propagate_hda_tags=False
        )
class ExtractDatasetCollectionTool(DatabaseOperationTool):
    """Pull a single dataset out of a ``list`` or ``paired`` collection."""

    tool_type = "extract_dataset"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, tags=None, **kwds):
        """Copy the selected element into ``out_data["output"]`` as a visible dataset.

        :raises exceptions.RequestParameterInvalidException: the selection mode is
            unknown, the lookup fails with ``KeyError``, or ``first`` finds no element.
        """
        source = incoming["input"]
        # A DCE exposes ``element_type``; otherwise the input is an HDCA.
        collection = source.element_object if hasattr(source, "element_type") else source.collection

        collection_type = collection.collection_type
        assert collection_type in ["list", "paired"]

        selection = incoming["which"]
        how = selection["which_dataset"]
        if how == "first":
            extracted_element = collection.first_dataset_element
            if not extracted_element:
                raise exceptions.RequestParameterInvalidException("Input collection has no dataset elements.")
        elif how == "by_identifier":
            try:
                extracted_element = collection[selection["identifier"]]
            except KeyError as e:
                raise exceptions.RequestParameterInvalidException(e.args[0])
        elif how == "by_index":
            try:
                extracted_element = collection[int(selection["index"])]
            except KeyError as e:
                raise exceptions.RequestParameterInvalidException(e.args[0])
        else:
            raise exceptions.RequestParameterInvalidException("Invalid tool parameters.")

        extracted = extracted_element.element_object
        # The copy is named after the identifier of the element it came from.
        extracted_o = extracted.copy(
            copy_tags=extracted.tags,
            new_name=extracted_element.element_identifier,
            flush=False,
        )
        self._add_datasets_to_history(history, [extracted_o], datasets_visible=True)

        out_data["output"] = extracted_o
class MergeCollectionTool(DatabaseOperationTool):
    """Merge several input collections into one, resolving duplicate identifiers."""

    tool_type = "merge_collection"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create the merged ``output`` collection.

        The duplicate policy comes from ``incoming["advanced"]["conflict"]`` when
        present and defaults to ``keep_first``.

        :raises exceptions.MessageException: policy is ``fail`` and an identifier repeats.
        """
        input_lists = [repeat["input"] for repeat in incoming["inputs"]]

        dupl_actions = "keep_first"
        suffix_pattern = None
        advanced = incoming.get("advanced", None)
        if advanced is not None:
            dupl_actions = advanced["conflict"]["duplicate_options"]
            if dupl_actions in ["suffix_conflict", "suffix_every", "suffix_conflict_rest"]:
                suffix_pattern = advanced["conflict"]["suffix_pattern"]

        # Which inputs does the identifier appear in.
        identifiers_map: Dict[str, List[int]] = {}
        for input_num, input_list in enumerate(input_lists):
            for dce in input_list.collection.elements:
                element_identifier = dce.element_identifier
                seen_in = identifiers_map.get(element_identifier)
                if seen_in is None:
                    seen_in = identifiers_map[element_identifier] = []
                elif dupl_actions == "fail":
                    raise exceptions.MessageException(
                        f"Duplicate collection element identifiers found for [{element_identifier}]"
                    )
                seen_in.append(input_num)

        new_element_structure = {}
        for input_num, input_list in enumerate(input_lists):
            for dce in input_list.collection.elements:
                element = dce.element_object
                element_identifier = dce.element_identifier
                already_placed = element_identifier in new_element_structure
                appearances = identifiers_map[element_identifier]
                is_conflict = len(appearances) > 1

                add_suffix = (
                    dupl_actions == "suffix_every"
                    or (dupl_actions == "suffix_conflict" and is_conflict)
                    or (dupl_actions == "suffix_conflict_rest" and is_conflict and appearances[0] != input_num)
                )

                if dupl_actions == "keep_first" and already_placed:
                    continue

                if add_suffix and suffix_pattern:
                    # ``#`` in the pattern stands for the 1-based input number.
                    suffix = suffix_pattern.replace("#", str(input_num + 1))
                    effective_identifier = f"{element_identifier}{suffix}"
                else:
                    effective_identifier = element_identifier
                new_element_structure[effective_identifier] = element

        # Don't copy until we know everything is fine and we have the structure of the list ready to go.
        new_elements = {}
        for identifier, element in new_element_structure.items():
            if getattr(element, "history_content_type", None) == "dataset":
                new_elements[identifier] = element.copy(copy_tags=element.tags, flush=False)
            else:
                new_elements[identifier] = element.copy(flush=False)

        self._add_datasets_to_history(history, new_elements.values())
        output_definition = next(iter(self.outputs.values()))
        output_collections.create_collection(
            output_definition, "output", elements=new_elements, propagate_hda_tags=False
        )
class FilterDatasetsTool(DatabaseOperationTool):
    """Base for tools that keep only the elements accepted by ``element_is_valid``."""

    require_terminal_states = True
    require_dataset_ok = False

    def _get_new_elements(self, history, elements_to_copy):
        """Return ``{element_identifier: copy}`` for the given elements (``history`` is unused)."""
        new_elements = {}
        for dce in elements_to_copy:
            source = dce.element_object
            if getattr(source, "history_content_type", None) == "dataset":
                new_elements[dce.element_identifier] = source.copy(copy_tags=source.tags, flush=False)
            else:
                new_elements[dce.element_identifier] = source.copy(flush=False)
        return new_elements

    @staticmethod
    def element_is_valid(element: model.DatasetCollectionElement):
        """Return the ``is_ok`` flag of the dataset instance wrapped by ``element``."""
        dataset_instance = element.element_object
        assert isinstance(dataset_instance, model.DatasetInstance)
        return dataset_instance.is_ok

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create the ``output`` collection from the elements that pass ``element_is_valid``.

        For ``list:paired`` inputs an element is kept only when all of its children pass.
        """
        source = incoming["input"]
        # The collection is reachable through ``element_object`` or ``collection``,
        # depending on which kind of wrapper was passed in.
        wrapped = source.element_object if hasattr(source, "element_object") else source.collection
        elements = wrapped.elements
        collection_type = wrapped.collection_type

        # We only process list or list of pair collections. Higher order collection will be mapped over
        assert collection_type in ("list", "list:paired")

        elements_to_copy = []
        for element in elements:
            if collection_type == "list":
                keep = self.element_is_valid(element)
            else:
                # Build the full list first so that every child is checked (no short-circuit).
                child_results = [self.element_is_valid(child) for child in element.child_collection.elements]
                keep = all(child_results)
            if keep:
                elements_to_copy.append(element)

        new_elements = self._get_new_elements(history=history, elements_to_copy=elements_to_copy)
        self._add_datasets_to_history(history, new_elements.values())
        output_definition = next(iter(self.outputs.values()))
        output_collections.create_collection(
            output_definition, "output", elements=new_elements, propagate_hda_tags=False
        )
class FilterFailedDatasetsTool(FilterDatasetsTool):
    """Filter tool registered as ``filter_failed_datasets_collection``."""

    tool_type = "filter_failed_datasets_collection"
    require_dataset_ok = False

    @staticmethod
    def element_is_valid(element: model.DatasetCollectionElement):
        """An element is valid when its dataset instance reports ``is_ok``."""
        dataset_instance = element.element_object
        assert isinstance(dataset_instance, model.DatasetInstance)
        return dataset_instance.is_ok
class KeepSuccessDatasetsTool(FilterDatasetsTool):
    """Filter tool registered as ``keep_success_datasets_collection``; tolerates paused inputs."""

    tool_type = "keep_success_datasets_collection"
    require_terminal_states = False
    require_dataset_ok = False
    require_terminal_or_paused_states = True

    @staticmethod
    def element_is_valid(element: model.DatasetCollectionElement):
        """Return the dataset's ``is_ok`` flag.

        :raises ToolInputsNotReadyException: the dataset is in a non-ready state
            other than PAUSED.
        """
        dataset_instance = element.element_object
        assert isinstance(dataset_instance, model.DatasetInstance)

        is_paused = dataset_instance.state == model.Dataset.states.PAUSED
        if not is_paused and dataset_instance.state in model.Dataset.non_ready_states:
            raise ToolInputsNotReadyException("An input dataset is pending.")

        return dataset_instance.is_ok
class FilterEmptyDatasetsTool(FilterDatasetsTool):
    """Filter tool registered as ``filter_empty_datasets_collection``."""

    tool_type = "filter_empty_datasets_collection"
    require_dataset_ok = False

    @staticmethod
    def element_is_valid(element: model.DatasetCollectionElement):
        """Return ``True`` when at least one byte can be read from the element's file.

        ``has_data()`` alone is not trusted; the file is opened through
        ``get_fileobj_raw`` and probed for a first byte.
        """
        element_object = element.element_object
        assert isinstance(element_object, model.DatasetInstance)
        if not element_object.has_data():
            return False

        # We have data, but it might just be a compressed archive of nothing
        file_name = element_object.get_file_name()
        _, fh = get_fileobj_raw(file_name, mode="rb")
        try:
            return bool(fh.read(1))
        finally:
            # Always release the handle; it was previously left open for every element checked.
            fh.close()
class FlattenTool(DatabaseOperationTool):
    """Flatten a nested collection into one level, joining identifiers along the way."""

    tool_type = "flatten_collection"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create the flat ``output`` collection from copies of every leaf dataset."""
        hdca = incoming["input"]
        join_identifier = incoming["join_identifier"]
        new_elements = {}
        copied_datasets = []

        def flatten_into(collection, prefix=""):
            for dce in collection.elements:
                child = dce.element_object
                if prefix:
                    identifier = f"{prefix}{join_identifier}{dce.element_identifier}"
                else:
                    identifier = dce.element_identifier

                if dce.is_collection:
                    # Descend, carrying the joined identifier as the new prefix.
                    flatten_into(child, prefix=identifier)
                    continue

                leaf_copy = child.copy(copy_tags=child.tags, flush=False)
                new_elements[identifier] = leaf_copy
                copied_datasets.append(leaf_copy)

        flatten_into(hdca.collection)
        self._add_datasets_to_history(history, copied_datasets)
        output_definition = next(iter(self.outputs.values()))
        output_collections.create_collection(
            output_definition, "output", elements=new_elements, propagate_hda_tags=False
        )
class SortTool(DatabaseOperationTool):
    """Reorder a collection alphabetically, numerically, or following a file of identifiers."""

    tool_type = "sort_collection"
    require_terminal_states = True
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create the ``output`` collection with elements in the requested order.

        :raises exceptions.MessageException: unknown sort type, or the sort file does
            not line up with the collection's identifiers.
        """
        hdca = incoming["input"]
        sort_options = incoming["sort_type"]
        sorttype = sort_options["sort_type"]
        elements = hdca.collection.elements

        keyed_elements = None
        if sorttype == "alpha":
            keyed_elements = [(dce.element_identifier, dce) for dce in elements]
        elif sorttype == "numeric":
            # The key is the integer formed by the digits found in the identifier.
            keyed_elements = [(int(re.sub("[^0-9]", "", dce.element_identifier)), dce) for dce in elements]
        elif sorttype == "file":
            hda = sort_options["sort_file"]
            data_lines = hda.metadata.get("data_lines", 0)
            if data_lines != len(elements):
                message = f"Number of lines must match number of list elements ({len(elements)}), but file has {data_lines} lines"
                raise exceptions.MessageException(message)

            by_identifier = {element.element_identifier: element for element in elements}
            try:
                with open(hda.get_file_name()) as fh:
                    sorted_elements = [by_identifier[line.strip()] for line in fh]
            except KeyError:
                hdca_history_name = f"{hdca.hid}: {hdca.name}"
                message = f"List of element identifiers does not match element identifiers in collection '{hdca_history_name}'"
                raise exceptions.MessageException(message)
        else:
            raise exceptions.MessageException(f"Unknown sort_type '{sorttype}'")

        if keyed_elements is not None:
            ordered_pairs = sorted(keyed_elements, key=lambda pair: pair[0])
            sorted_elements = [pair[1] for pair in ordered_pairs]

        new_elements = {}
        for dce in sorted_elements:
            source = dce.element_object
            if getattr(source, "history_content_type", None) == "dataset":
                new_elements[dce.element_identifier] = source.copy(copy_tags=source.tags, flush=False)
            else:
                new_elements[dce.element_identifier] = source.copy(flush=False)

        self._add_datasets_to_history(history, new_elements.values())
        output_definition = next(iter(self.outputs.values()))
        output_collections.create_collection(
            output_definition, "output", elements=new_elements, propagate_hda_tags=False
        )
class HarmonizeTool(DatabaseOperationTool):
    """Reduce two collections to their shared identifiers, ordered as in the first."""

    tool_type = "harmonize_list"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create ``output1`` and ``output2`` containing only identifiers present in both inputs."""
        hdca1 = incoming["input1"]
        hdca2 = incoming["input2"]
        elements1 = hdca1.collection.elements
        elements2 = hdca2.collection.elements

        # Index both collections by element identifier.
        by_identifier1 = {element.element_identifier: element for element in elements1}
        by_identifier2 = {element.element_identifier: element for element in elements2}

        # Shared identifiers, in the order they occur in the first collection.
        final_sorted_identifiers = [
            element.element_identifier for element in elements1 if element.element_identifier in by_identifier2
        ]

        if not final_sorted_identifiers:
            # Nothing in common: both outputs are created as empty collections.
            for output_label in ("output1", "output2"):
                output_collections.create_collection(
                    next(iter(self.outputs.values())), output_label, elements={}, propagate_hda_tags=False
                )
            return

        for lookup, output_label in ((by_identifier1, "output1"), (by_identifier2, "output2")):
            new_elements = {}
            for identifier in final_sorted_identifiers:
                source = lookup[identifier].element_object
                if getattr(source, "history_content_type", None) == "dataset":
                    new_elements[identifier] = source.copy(copy_tags=source.tags, flush=False)
                else:
                    new_elements[identifier] = source.copy(flush=False)

            self._add_datasets_to_history(history, new_elements.values())
            output_collections.create_collection(
                next(iter(self.outputs.values())), output_label, elements=new_elements, propagate_hda_tags=False
            )
class RelabelFromFileTool(DatabaseOperationTool):
    """Give collection elements new identifiers read from a labels file."""

    tool_type = "relabel_from_file"

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create the ``output`` collection with relabeled copies of the input elements.

        :raises exceptions.MessageException: the labels file has the wrong number of
            entries (strict mode), a tabular line is malformed, a label is missing,
            duplicated, or contains characters outside the allowed set.
        """
        hdca = incoming["input"]
        how = incoming["how"]
        how_type = how["how_select"]
        labels_hda = how["labels"]
        strict = string_as_bool(how["strict"])
        new_elements = {}

        def store_copy(new_label, source):
            new_label = new_label.strip()
            if new_label in new_elements:
                raise exceptions.MessageException(
                    f"New identifier [{new_label}] appears twice in resulting collection, these values must be unique."
                )
            if getattr(source, "history_content_type", None) == "dataset":
                new_elements[new_label] = source.copy(copy_tags=source.tags, flush=False)
            else:
                new_elements[new_label] = source.copy(flush=False)

        with open(labels_hda.get_file_name()) as fh:
            new_labels = fh.readlines(1024 * 1000000)

        if strict and len(hdca.collection.elements) != len(new_labels):
            raise exceptions.MessageException("Relabel mapping file contains incorrect number of identifiers")

        if how_type == "tabular":
            # We have a tabular file, where the first column is an existing element identifier,
            # and the second column is the new element identifier.
            label_for = {}
            for line_index, line in enumerate(new_labels):
                columns = line.strip().split("\t")
                if len(columns) != 2:
                    raise exceptions.MessageException(
                        f"Relabel mapping file line {line_index + 1} contains {len(columns)} columns, but 2 are required"
                    )
                label_for[columns[0]] = columns[1]

            for dce in hdca.collection.elements:
                element_identifier = dce.element_identifier
                # Outside strict mode an unmapped element keeps its current identifier.
                fallback = None if strict else element_identifier
                new_label = label_for.get(element_identifier, fallback)
                if not new_label:
                    raise exceptions.MessageException(f"Failed to find new label for identifier [{element_identifier}]")
                store_copy(new_label, dce.element_object)
        else:
            # If new_labels_dataset_assoc is not a two-column tabular dataset we label with the current line of the dataset
            for position, dce in enumerate(hdca.collection.elements):
                store_copy(new_labels[position], dce.element_object)

        for key in new_elements:
            if not re.match(r"^[\w\- \.,]+$", key):
                raise exceptions.MessageException(f"Invalid new collection identifier [{key}]")

        self._add_datasets_to_history(history, new_elements.values())
        output_definition = next(iter(self.outputs.values()))
        output_collections.create_collection(
            output_definition, "output", elements=new_elements, propagate_hda_tags=False
        )
class ApplyRulesTool(DatabaseOperationTool):
    """Restructure a collection according to a ``RuleSet``."""

    tool_type = "apply_rules"

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, tag_handler, **kwds):
        """Create the ``output`` collection from the elements produced by ``apply_rules``."""
        hdca = incoming["input"]
        rule_set = RuleSet(incoming["rules"])
        copied_datasets = []

        def copy_dataset(dataset, tags):
            # Callback handed to ``apply_rules``; every copy is remembered for staging below.
            duplicate = dataset.copy(copy_tags=dataset.tags, flush=False)
            if tags is not None:
                tag_handler.set_tags_from_list(trans.get_user(), duplicate, tags, flush=False)
            duplicate.history_id = history.id
            copied_datasets.append(duplicate)
            return duplicate

        new_elements = self.app.dataset_collection_manager.apply_rules(hdca, rule_set, copy_dataset)
        self._add_datasets_to_history(history, copied_datasets)
        output_definition = next(iter(self.outputs.values()))
        output_collections.create_collection(
            output_definition,
            "output",
            collection_type=rule_set.collection_type,
            elements=new_elements,
            propagate_hda_tags=False,
        )
class TagFromFileTool(DatabaseOperationTool):
    """Copy a collection and tag the copies according to a tabular tags file."""

    tool_type = "tag_from_file"
    # We don't currently discriminate which input has to be in which state
    # so we do need all inputs to be "ok", when in fact only the file input
    # needs to be ok.
    # require_terminal_states = True
    # require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, tag_handler, **kwds):
        """Create the ``output`` collection with tags taken from ``incoming["tags"]``.

        ``incoming["how"]`` selects the mode: ``add`` and ``remove`` combine the file's
        tags with the original element's tags; any other value applies the file's
        tags as given.
        """
        hdca = incoming["input"]
        how = incoming["how"]
        new_tags_dataset_assoc = incoming["tags"]
        new_elements = {}
        new_datasets = []

        def add_copied_value_to_new_elements(new_tags_dict, dce):
            if getattr(dce.element_object, "history_content_type", None) == "dataset":
                copied_value = dce.element_object.copy(copy_tags=dce.element_object.tags, flush=False)
                # copy should never be visible, since part of a collection
                # (this previously assigned to a misspelled ``visble`` attribute)
                copied_value.visible = False
                new_datasets.append(copied_value)
                new_tags = new_tags_dict.get(dce.element_identifier)
                if new_tags:
                    if how in ("add", "remove") and dce.element_object.tags:
                        # We need get the original tags and update them with the new tags
                        old_tags = {tag for tag in tag_handler.get_tags_str(dce.element_object.tags).split(",") if tag}
                        if how == "add":
                            old_tags.update(set(new_tags))
                        elif how == "remove":
                            old_tags = old_tags - set(new_tags)
                        new_tags = old_tags
                    tag_handler.add_tags_from_list(
                        user=history.user,
                        item=copied_value,
                        new_tags_list=new_tags,
                        flush=False,
                    )
            else:
                # We have a collection, and we copy the elements so that we don't manipulate the original tags
                copied_value = dce.element_object.copy(element_destination=history, flush=False)
                for new_element, old_element in zip(copied_value.dataset_elements, dce.element_object.dataset_elements):
                    # TODO: This should be eliminated, but collections created by the collection builder
                    # don't set `visible` to `False` if you don't hide the original elements.
                    new_element.element_object.visible = False
                    new_tags = new_tags_dict.get(new_element.element_identifier)
                    if how in ("add", "remove"):
                        old_tags = {
                            tag for tag in tag_handler.get_tags_str(old_element.element_object.tags).split(",") if tag
                        }
                        if new_tags:
                            if how == "add":
                                old_tags.update(set(new_tags))
                            elif how == "remove":
                                old_tags = old_tags - set(new_tags)
                        # The original tags are carried over even when the file lists none for this element.
                        new_tags = old_tags
                    tag_handler.add_tags_from_list(
                        user=history.user, item=new_element.element_object, new_tags_list=new_tags, flush=False
                    )
            new_elements[dce.element_identifier] = copied_value

        new_tags_path = new_tags_dataset_assoc.get_file_name()
        with open(new_tags_path) as fh:
            new_tags = fh.readlines(1024 * 1000000)
        # We have a tabular file, where the first column is an existing element identifier,
        # and the remaining columns represent new tags.
        source_new_tags = (line.strip().split("\t") for line in new_tags)
        new_tags_dict = {item[0]: item[1:] for item in source_new_tags}
        for dce in hdca.collection.elements:
            add_copied_value_to_new_elements(new_tags_dict, dce)
        self._add_datasets_to_history(history, new_datasets)
        output_collections.create_collection(
            next(iter(self.outputs.values())), "output", elements=new_elements, propagate_hda_tags=False
        )
class FilterFromFileTool(DatabaseOperationTool):
    """Split a collection into kept and discarded elements using a file of identifiers."""

    tool_type = "filter_from_file"

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create the ``output_filtered`` and ``output_discarded`` collections.

        With ``how_filter == "remove_if_absent"`` an element passes when its identifier
        is listed in the filter file; with any other value it passes when it is not.
        """
        hdca = incoming["input"]
        how_filter = incoming["how"]["how_filter"]
        filter_dataset_assoc = incoming["how"]["filter_source"]
        filtered_elements = {}
        discarded_elements = {}

        filtered_path = filter_dataset_assoc.get_file_name()
        with open(filtered_path) as fh:
            # A set keeps the per-element membership test constant-time.
            filtered_identifiers = {line.strip() for line in fh.readlines(1024 * 1000000)}

        # Every element is copied and routed to exactly one of the two outputs.
        for dce in hdca.collection.elements:
            dce_object = dce.element_object
            element_identifier = dce.element_identifier
            in_filter_file = element_identifier in filtered_identifiers
            passes_filter = in_filter_file if how_filter == "remove_if_absent" else not in_filter_file

            if getattr(dce_object, "history_content_type", None) == "dataset":
                copied_value = dce_object.copy(copy_tags=dce_object.tags, flush=False)
            else:
                copied_value = dce_object.copy(flush=False)

            if passes_filter:
                filtered_elements[element_identifier] = copied_value
            else:
                discarded_elements[element_identifier] = copied_value

        self._add_datasets_to_history(history, filtered_elements.values())
        output_collections.create_collection(
            self.outputs["output_filtered"], "output_filtered", elements=filtered_elements, propagate_hda_tags=False
        )
        self._add_datasets_to_history(history, discarded_elements.values())
        output_collections.create_collection(
            self.outputs["output_discarded"],
            "output_discarded",
            elements=discarded_elements,
            propagate_hda_tags=False,
        )
class DuplicateFileToCollectionTool(DatabaseOperationTool):
    """Build a collection out of ``number`` copies of a single dataset."""

    tool_type = "duplicate_file_to_collection"
    require_terminal_states = False
    require_dataset_ok = False

    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
        """Create the ``output`` collection; copies are keyed ``"<element_identifier> <n>"`` from 1."""
        hda = incoming["input"]
        number = int(incoming["number"])
        element_identifier = incoming["element_identifier"]

        elements = {}
        for n in range(1, number + 1):
            elements[f"{element_identifier} {n}"] = hda.copy(copy_tags=hda.tags, flush=False)

        self._add_datasets_to_history(history, elements.values())
        output_definition = next(iter(self.outputs.values()))
        output_collections.create_collection(
            output_definition, "output", elements=elements, propagate_hda_tags=False
        )
# Populate tool_type to ToolClass mappings tool_types = {} TOOL_CLASSES: List[Type[Tool]] = [ Tool, SetMetadataTool, OutputParameterJSONTool, ExpressionTool, InteractiveTool, DataManagerTool, DataSourceTool, AsyncDataSourceTool, UnzipCollectionTool, ZipCollectionTool, MergeCollectionTool, RelabelFromFileTool, FilterFromFileTool, DuplicateFileToCollectionTool, BuildListCollectionTool, ExtractDatasetCollectionTool, DataDestinationTool, ] for tool_class in TOOL_CLASSES: tool_types[tool_class.tool_type] = tool_class # ---- Utility classes to be factored out -----------------------------------
class TracksterConfig:
    """Trackster configuration encapsulation."""

    def __init__(self, actions):
        """Store the list of parsed actions."""
        self.actions = actions

    @staticmethod
    def parse(root):
        """Build a config from every ``action`` child element of ``root``."""
        parsed_actions = [SetParamAction.parse(action_elt) for action_elt in root.findall("action")]
        return TracksterConfig(parsed_actions)
class SetParamAction:
    """Set parameter action."""

    def __init__(self, name, output_name):
        """Record the parameter name and the output name it is tied to."""
        self.name = name
        self.output_name = output_name

    @staticmethod
    def parse(elt):
        """Parse action from element."""
        name = elt.get("name")
        output_name = elt.get("output_name")
        return SetParamAction(name, output_name)
class BadValue:
    """Simple wrapper that holds a single ``value``."""

    def __init__(self, value):
        """Keep ``value`` unchanged on the instance."""
        self.value = value
class InterruptedUpload(Exception):
    """Exception type named for an interrupted upload; adds nothing to ``Exception``."""