Source code for galaxy.tool_util.loader_directory

"""Utilities for loading and reasoning about unparsed tools in directories."""

import fnmatch
import glob
import logging
import os
import re
import sys

import yaml

from galaxy.tool_util import loader
from galaxy.tool_util.parser import get_tool_source
from galaxy.util import checkers

log = logging.getLogger(__name__)

PATH_DOES_NOT_EXIST_ERROR = "Could not load tools from path [%s] - this path does not exist."
PATH_AND_RECURSIVE_ERROR = "Cannot specify a single file and recursive."
LOAD_FAILURE_ERROR = "Failed to load tool with path %s."
TOOL_LOAD_ERROR = object()
TOOL_REGEX = re.compile(r"<tool\s")
DATA_MANAGER_REGEX = re.compile(r"\stool_type=\"manage_data\"")

YAML_EXTENSIONS = [".yaml", ".yml", ".json"]
CWL_EXTENSIONS = YAML_EXTENSIONS + [".cwl"]
EXCLUDE_WALK_DIRS = [".hg", ".git", ".venv"]


def load_exception_handler(path, exc_info):
    """Default exception handler for use by load_tool_elements_from_path."""
    log.warning(LOAD_FAILURE_ERROR % path, exc_info=exc_info)


[docs]def find_possible_tools_from_path( path, recursive=False, enable_beta_formats=False, ): """Walk a directory and find potential tool files.""" possible_tool_files = [] for possible_tool_file in _find_tool_files(path, recursive=recursive, enable_beta_formats=enable_beta_formats): try: does_look_like_a_tool = looks_like_a_tool(possible_tool_file, enable_beta_formats=enable_beta_formats) except OSError: # Some problem reading the tool file, skip. continue if does_look_like_a_tool: possible_tool_files.append(possible_tool_file) return possible_tool_files
[docs]def load_tool_sources_from_path( path, load_exception_handler=load_exception_handler, recursive=False, register_load_errors=False, ): """Walk a directory and ToolSource objects.""" return _load_tools_from_path( path, load_exception_handler=load_exception_handler, recursive=recursive, register_load_errors=register_load_errors, loader_func=get_tool_source, enable_beta_formats=True, )
[docs]def load_tool_elements_from_path( path, load_exception_handler=load_exception_handler, recursive=False, register_load_errors=False, ): """Walk a directory and load tool XML elements.""" return _load_tools_from_path( path, load_exception_handler=load_exception_handler, recursive=recursive, register_load_errors=register_load_errors, loader_func=loader.load_tool, enable_beta_formats=False, )
def _load_tools_from_path( path, load_exception_handler, recursive, register_load_errors, loader_func, enable_beta_formats, ): loaded_objects = [] for possible_tool_file in find_possible_tools_from_path( path, recursive=recursive, enable_beta_formats=enable_beta_formats, ): try: tool_element = loader_func(possible_tool_file) loaded_objects.append((possible_tool_file, tool_element)) except Exception: exc_info = sys.exc_info() load_exception_handler(possible_tool_file, exc_info) if register_load_errors: loaded_objects.append((possible_tool_file, TOOL_LOAD_ERROR)) return loaded_objects
[docs]def is_tool_load_error(obj): """Predicate to determine if object loaded for tool is a tool error.""" return obj is TOOL_LOAD_ERROR
def looks_like_a_tool(path_or_uri_like, invalid_names=None, enable_beta_formats=False): """Quick check to see if a file looks like it may be a tool file. Whether true in a strict sense or not, lets say the intention and purpose of this procedure is to serve as a filter - all valid tools must "looks_like_a_tool" but not everything that looks like a tool is actually a valid tool. invalid_names may be supplied in the context of the tool shed to quickly rule common tool shed XML files. """ invalid_names = invalid_names or [] path = resolved_path(path_or_uri_like) if path is UNRESOLVED_URI: # Assume the path maps to a real tool. return True looks = False if os.path.basename(path) in invalid_names: return False if looks_like_a_tool_xml(path): looks = True if not looks and enable_beta_formats: for tool_checker in BETA_TOOL_CHECKERS.values(): if tool_checker(path): looks = True break return looks def looks_like_xml(path, regex=TOOL_REGEX): full_path = os.path.abspath(path) if not full_path.endswith(".xml"): return False if not os.path.getsize(full_path): return False if ( checkers.check_binary(full_path) or checkers.check_image(full_path) or checkers.is_gzip(full_path) or checkers.is_bz2(full_path) or checkers.is_zip(full_path) ): return False with open(path, encoding="utf-8") as f: try: start_contents = f.read(5 * 1024) except UnicodeDecodeError: return False if regex.search(start_contents): return True return False
[docs]def looks_like_a_tool_xml(path): """Quick check to see if a file looks like it may be a Galaxy XML tool file.""" return looks_like_xml(path=path, regex=TOOL_REGEX)
def looks_like_a_data_manager_xml(path): """Quick check to see if a file looks like it may be a Galaxy data manager XML file.""" return looks_like_xml(path=path, regex=DATA_MANAGER_REGEX) def as_dict_if_looks_like_yaml_or_cwl_with_class(path, classes): """ get a dict from yaml file if it contains a line `class: CLASS`, where CLASS is any string given in CLASSES. must appear in the first 5k and also load properly in total. """ with open(path, encoding="utf-8") as f: try: start_contents = f.read(5 * 1024) except UnicodeDecodeError: return False, None if re.search(rf"^class:\s+{'|'.join(classes)}\s*$", start_contents, re.MULTILINE) is None: return False, None with open(path) as f: try: as_dict = yaml.safe_load(f) except Exception: return False, None if not isinstance(as_dict, dict): return False, None file_class = as_dict.get("class", None) if file_class not in classes: return False, None return True, as_dict
[docs]def is_a_yaml_with_class(path, classes): """Determine if a file is a valid YAML with a supplied ``class`` entry.""" if not _has_extension(path, YAML_EXTENSIONS): return False is_yaml, as_dict = as_dict_if_looks_like_yaml_or_cwl_with_class(path, classes) return is_yaml
[docs]def looks_like_a_tool_yaml(path): """Quick check to see if a file looks like it may be a Galaxy YAML tool file.""" return is_a_yaml_with_class(path, ["GalaxyTool"])
[docs]def looks_like_a_cwl_artifact(path, classes=None): """Quick check to see if a file looks like it may be a CWL artifact.""" if not _has_extension(path, CWL_EXTENSIONS): return False is_yaml, as_dict = as_dict_if_looks_like_yaml_or_cwl_with_class(path, classes) if not is_yaml: return False file_cwl_version = as_dict.get("cwlVersion", None) return file_cwl_version is not None
[docs]def looks_like_a_tool_cwl(path): """Quick check to see if a file looks like it may be a CWL tool.""" return looks_like_a_cwl_artifact(path, classes=["CommandLineTool", "ExpressionTool"])
def _find_tool_files(path_or_uri_like, recursive, enable_beta_formats): path = resolved_path(path_or_uri_like) if path is UNRESOLVED_URI: # Pass the URI through and assume it maps to a real tool. return [path_or_uri_like] is_file = not os.path.isdir(path) if not os.path.exists(path): raise Exception(PATH_DOES_NOT_EXIST_ERROR % path) elif is_file and recursive: raise Exception(PATH_AND_RECURSIVE_ERROR) elif is_file: return [os.path.abspath(path)] else: if enable_beta_formats: if not recursive: files = glob.glob(f"{path}/*") else: files = _find_files(path, "*") else: if not recursive: files = glob.glob(f"{path}/*.xml") else: files = _find_files(path, "*.xml") return [os.path.abspath(_) for _ in files] def _has_extension(path, extensions): return any(path.endswith(e) for e in extensions) def _find_files(directory, pattern="*"): if not os.path.exists(directory): raise ValueError(f"Directory not found {directory}") matches = [] for root, dirnames, filenames in os.walk(directory): # exclude some directories (like .hg) from traversing dirnames[:] = [dir for dir in dirnames if dir not in EXCLUDE_WALK_DIRS] for filename in filenames: full_path = os.path.join(root, filename) if fnmatch.filter([full_path], pattern): matches.append(os.path.join(root, filename)) return matches UNRESOLVED_URI = object() def resolved_path(path_or_uri_like): """If this is a simple file path, return the path else UNRESOLVED_URI.""" if "://" not in path_or_uri_like: return path_or_uri_like elif path_or_uri_like.startswith("file://"): return path_or_uri_like[len("file://") :] else: return UNRESOLVED_URI BETA_TOOL_CHECKERS = { "yaml": looks_like_a_tool_yaml, "cwl": looks_like_a_tool_cwl, } __all__ = ( "find_possible_tools_from_path", "is_a_yaml_with_class", "is_tool_load_error", "load_tool_elements_from_path", "load_tool_sources_from_path", "looks_like_a_cwl_artifact", "looks_like_a_tool_cwl", "looks_like_a_tool_xml", "looks_like_a_tool_yaml", )