Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release, if it exists, or view the top of the latest release's documentation.

Source code for galaxy.tool_util.cwl.runtime_actions

import json
import os
import shutil

from galaxy.util import safe_makedirs
from .cwltool_deps import ref_resolver
from .parser import (
    JOB_JSON_FILE,
    load_job_proxy,
)
from .util import (
    SECONDARY_FILES_INDEX_PATH,
    STORE_SECONDARY_FILES_WITH_BASENAME,
)


def file_dict_to_description(file_dict):
    """Wrap a CWL ``File``/``Directory`` dict in an object exposing ``write_to``.

    A location starting with ``_:`` marks a literal file whose text is carried
    inline under ``contents``; any other location is resolved (``file://`` URIs
    included) to a local path.
    """
    kind = file_dict["class"]
    assert kind in ("File", "Directory"), file_dict
    location = file_dict["location"]

    if location.startswith("_:"):
        # Inline literals are only supported for files, never directories.
        assert kind == "File"
        return LiteralFileDescription(file_dict["contents"])

    local_path = _possible_uri_to_path(location)
    if kind == "File":
        return PathFileDescription(local_path)
    return PathDirectoryDescription(local_path)


class FileDescription:
    """Base class for objects that can materialize a CWL File or Directory.

    Subclasses implement :meth:`write_to`, which writes their content to a
    destination path.
    """

    def write_to(self, destination):
        """Write this file or directory to ``destination``."""
        raise NotImplementedError(f"{type(self).__name__} must implement write_to()")


class PathFileDescription(FileDescription):
    """A file that already exists on disk at ``path``."""

    def __init__(self, path):
        self.path = path

    def write_to(self, destination):
        # TODO: Move if we can be sure this is in the working directory for instance...
        shutil.copy(self.path, destination)


class PathDirectoryDescription(FileDescription):
    """A directory that already exists on disk at ``path``."""

    def __init__(self, path):
        self.path = path

    def write_to(self, destination):
        # shutil.copytree fails if ``destination`` already exists.
        shutil.copytree(self.path, destination)


class LiteralFileDescription(FileDescription):
    """A file whose text content is held in memory."""

    def __init__(self, content):
        self.content = content

    def write_to(self, destination):
        # Encode explicitly and write bytes so the file is UTF-8 regardless of
        # the platform's default encoding, with no newline translation.
        with open(destination, "wb") as f:
            f.write(self.content.encode("UTF-8"))

def _possible_uri_to_path(location):
    if location.startswith("file://"):
        path = ref_resolver.uri_file_path(location)
    else:
        path = location
    return path


def _secondary_file_name(output_basename, secondary_file_basename):
    """Express ``secondary_file_basename`` relative to ``output_basename`` in ``^`` notation.

    Extensions are stripped from the end of ``output_basename`` one at a time
    until it is a prefix of ``secondary_file_basename``. The result is one
    ``^`` per stripped extension followed by the remaining suffix, e.g.
    ``("a.bam", "a.bam.bai") -> ".bai"`` and ``("a.bam", "a.bai") -> "^.bai"``.
    If no prefix is found once every extension has been stripped, the full
    secondary basename is returned behind one more ``^`` than the number of
    extensions removed.
    """
    prefix = ""
    while True:
        if secondary_file_basename.startswith(output_basename):
            return prefix + secondary_file_basename[len(output_basename) :]
        prefix = f"^{prefix}"
        if "." not in output_basename:
            # Previously referenced an unbound/stale ``secondary_file_name``;
            # fall back to the secondary file's own basename.
            return prefix + secondary_file_basename
        output_basename = output_basename.rsplit(".", 1)[0]


def handle_outputs(job_directory=None):
    """Relocate a finished CWL job's outputs and write ``galaxy.json``.

    Dynamically collected files are relocated to the pre-determined locations
    registered with ToolOutput objects via ``from_work_dir`` handling, and a
    ``galaxy.json`` describing each output is written to the current working
    directory.

    :param job_directory: Galaxy job directory; defaults to the parent of the
        current working directory.
    :raises Exception: if ``metadata/params.json`` cannot be opened.

    Returns ``None``. If the job directory holds no CWL job file (not a CWL
    job), returns right after reading ``params.json`` without doing anything.
    """
    if job_directory is None:
        job_directory = os.path.join(os.getcwd(), os.path.pardir)
    metadata_directory = os.path.join(job_directory, "metadata")
    metadata_params_path = os.path.join(metadata_directory, "params.json")
    try:
        with open(metadata_params_path) as f:
            metadata_params = json.load(f)
    except OSError as e:
        raise Exception(f"Failed to find params.json from metadata directory [{metadata_directory}]") from e
    cwl_job_file = os.path.join(job_directory, JOB_JSON_FILE)
    if not os.path.exists(cwl_job_file):
        # Not a CWL job, just continue
        return
    # So we only need to do strict validation when the tool was loaded,
    # no reason to do it again during job execution - so this shortcut
    # allows us to not need Galaxy's full configuration on job nodes.
    job_proxy = load_job_proxy(job_directory, strict_cwl_validation=False)

    tool_working_directory = os.path.join(job_directory, "working")
    job_id_tag = metadata_params["job_id_tag"]
    from galaxy.job_execution.output_collect import (
        default_exit_code_file,
        read_exit_code_from,
    )

    exit_code_file = default_exit_code_file(".", job_id_tag)
    tool_exit_code = read_exit_code_from(exit_code_file, job_id_tag)

    outputs = job_proxy.collect_outputs(tool_working_directory, tool_exit_code)

    # Per-output metadata, dumped to galaxy.json at the end.
    provided_metadata = {}

    def move_directory(output, target_path, output_name=None):
        # Materialize a Directory output at target_path.
        assert output["class"] == "Directory"
        output_path = _possible_uri_to_path(output["location"])
        if output_path.startswith("_:"):
            assert "listing" in output, "Do not know how to handle output, no 'listing' found."
            listing = output["listing"]
            # Not a real path, just copy listing to target path.
            safe_makedirs(target_path)
            for listed_file in listing:
                # TODO: handle directories
                assert listed_file["class"] == "File"
                file_description = file_dict_to_description(listed_file)
                file_description.write_to(os.path.join(target_path, listed_file["basename"]))
        else:
            shutil.move(output_path, target_path)
        return {"created_from_basename": output["basename"]}

    def move_output(output, target_path, output_name=None):
        # Write a File output to target_path, plus any secondaryFiles and their index.
        assert output["class"] == "File"
        file_description = file_dict_to_description(output)
        file_description.write_to(target_path)

        secondary_files = output.get("secondaryFiles", [])
        if secondary_files:
            if output_name is None:
                raise NotImplementedError("secondaryFiles are unimplemented for dynamic list elements")
            secondary_files_dir = job_proxy.output_secondary_files_dir(output_name, create=True)
            order = []
            index_contents = {"order": order}

            for secondary_file in secondary_files:
                # TODO: handle nested files...
                secondary_file_description = file_dict_to_description(secondary_file)
                secondary_file_basename = secondary_file["basename"]
                if STORE_SECONDARY_FILES_WITH_BASENAME:
                    secondary_file_name = secondary_file_basename
                else:
                    # Convert to ^ format.
                    secondary_file_name = _secondary_file_name(output["basename"], secondary_file_basename)
                extra_target = os.path.join(secondary_files_dir, secondary_file_name)
                secondary_file_description.write_to(extra_target)
                order.append(secondary_file_name)

            with open(os.path.join(secondary_files_dir, "..", SECONDARY_FILES_INDEX_PATH), "w") as f:
                json.dump(index_contents, f)

        return {"created_from_basename": output["basename"]}

    def handle_known_output(output, output_key, output_name):
        # NOTE(review): output_key is accepted but unused; every target is keyed
        # by output_name, so record fields share one target - confirm intent.
        assert output_name
        if output["class"] == "File":
            target_path = job_proxy.output_path(output_name)
            file_metadata = move_output(output, target_path, output_name=output_name)
        elif output["class"] == "Directory":
            target_path = job_proxy.output_directory_contents_dir(output_name)
            file_metadata = move_directory(output, target_path, output_name=output_name)
        else:
            raise Exception(f"Unknown output type [{output}] encountered")
        provided_metadata[output_name] = file_metadata

    def handle_known_output_json(output, output_name):
        # Serialize a non-file value as JSON into the output's path.
        target_path = job_proxy.output_path(output_name)
        with open(target_path, "w") as f:
            f.write(json.dumps(output))
        provided_metadata[output_name] = {
            "ext": "expression.json",
        }

    handled_outputs = set()
    for output_name, output in outputs.items():
        handled_outputs.add(output_name)
        if isinstance(output, dict) and "location" in output:
            handle_known_output(output, output_name, output_name)
        elif isinstance(output, dict):
            prefix = f"{output_name}|__part__|"
            for record_key, record_value in output.items():
                record_value_output_key = f"{prefix}{record_key}"
                if isinstance(record_value, dict) and "class" in record_value:
                    handle_known_output(record_value, record_value_output_key, output_name)
                else:
                    # param_evaluation_noexpr
                    handle_known_output_json(output, output_name)
        elif isinstance(output, list):
            elements = []
            for index, el in enumerate(output):
                name = str(index)
                # .get: a dict element without "class" is plain JSON data, not a File.
                if isinstance(el, dict) and el.get("class") == "File":
                    output_path = _possible_uri_to_path(el["location"])
                    elements.append({"name": name, "filename": output_path, "created_from_basename": el["basename"]})
                else:
                    target_path = f"{output_name}____{name}"
                    with open(target_path, "w") as f:
                        f.write(json.dumps(el))
                    elements.append({"name": name, "filename": target_path, "ext": "expression.json"})
            provided_metadata[output_name] = {"elements": elements}
        else:
            handle_known_output_json(output, output_name)

    # Outputs declared by the tool but absent from the collected outputs are written as JSON null.
    for output_instance in job_proxy._tool_proxy.output_instances():
        output_name = output_instance.name
        if output_name not in handled_outputs:
            handle_known_output_json(None, output_name)

    with open("galaxy.json", "w") as f:
        json.dump(provided_metadata, f)
# Public API of this module: only handle_outputs is exported by star-imports.
__all__ = ("handle_outputs",)