Source code for galaxy.tool_util.cwl.runtime_actions

import json
import os
import shutil

from galaxy.util import safe_makedirs
from .cwltool_deps import ref_resolver
from .parser import (
from .util import (

def file_dict_to_description(file_dict):
    """Build a description object for a CWL ``File`` or ``Directory`` dict.

    :param file_dict: dict with at least ``class`` and ``location`` keys;
        literal files (``location`` starting with ``_:``) must also carry
        ``contents``.
    :returns: a ``LiteralFileDescription``, ``PathFileDescription`` or
        ``PathDirectoryDescription`` exposing ``write_to(destination)``.
    :raises AssertionError: if ``class`` is neither ``File`` nor
        ``Directory``, or if a literal (``_:``) entry is not a ``File``.
    """
    output_class = file_dict["class"]
    assert output_class in ["File", "Directory"], file_dict
    location = file_dict["location"]
    if location.startswith("_:"):
        # "_:" locations have no backing path; the content is carried inline.
        assert output_class == "File"
        return LiteralFileDescription(file_dict["contents"])
    elif output_class == "File":
        return PathFileDescription(_possible_uri_to_path(location))
    else:
        # Only "Directory" can reach this branch given the assertion above.
        return PathDirectoryDescription(_possible_uri_to_path(location))

class FileDescription:
    """Empty placeholder type for output descriptions.

    It defines no behavior of its own, and the concrete description classes
    in this module do not inherit from it.
    """

class PathFileDescription:
    """Describe output content that is available as a file at ``path``."""

    def __init__(self, path):
        # Only the location is recorded here; nothing is read until write_to.
        self.path = path

    def write_to(self, destination):
        """Copy the described file to ``destination`` using ``shutil.copy``."""
        # TODO: Move if we can be sure this is in the working directory for instance...
        source = self.path
        shutil.copy(source, destination)

class PathDirectoryDescription:
    """Describe output content that is available as a directory at ``path``."""

    def __init__(self, path):
        # Only the location is recorded here; nothing is read until write_to.
        self.path = path

    def write_to(self, destination):
        """Recursively copy the directory to ``destination`` using ``shutil.copytree``."""
        source_dir = self.path
        shutil.copytree(source_dir, destination)

class LiteralFileDescription:
    """Describe output content that is carried inline rather than on disk."""

    def __init__(self, content):
        # Inline file contents; text is encoded to UTF-8 when written out.
        self.content = content

    def write_to(self, destination):
        """Write the inline content to ``destination``.

        Text content is encoded as UTF-8; ``bytes`` content is written
        unchanged. The file is opened in binary mode so no newline
        translation takes place.
        """
        content = self.content
        data = content if isinstance(content, bytes) else content.encode("UTF-8")
        with open(destination, "wb") as f:
            f.write(data)

def _possible_uri_to_path(location):
    if location.startswith("file://"):
        path = ref_resolver.uri_file_path(location)
        path = location
    return path

def handle_outputs(job_directory=None):
    """Collect CWL outputs for a job and write ``galaxy.json`` metadata.

    Relocates dynamically collected files to pre-determined locations
    registered with ToolOutput objects via from_work_dir handling.

    :param job_directory: the job directory; defaults to the parent of the
        current working directory.
    :returns: ``None``. Returns early, writing nothing, when the job
        directory contains no ``JOB_JSON_FILE`` (i.e. not a CWL job).
    :raises Exception: if ``metadata/params.json`` cannot be opened, or if
        an output dict has a ``class`` other than ``File``/``Directory``.
    :raises NotImplementedError: if a file without an output name carries
        ``secondaryFiles``.

    Note that ``galaxy.json``, the exit code file and JSON files for list
    elements are all addressed relative to the current working directory.
    """
    if job_directory is None:
        job_directory = os.path.join(os.getcwd(), os.path.pardir)
    metadata_directory = os.path.join(job_directory, "metadata")
    metadata_params_path = os.path.join(metadata_directory, "params.json")
    try:
        with open(metadata_params_path) as f:
            metadata_params = json.load(f)
    except OSError as err:
        # Chain the cause so the underlying I/O error stays visible.
        raise Exception("Failed to find params.json from metadata directory [%s]" % metadata_directory) from err

    cwl_job_file = os.path.join(job_directory, JOB_JSON_FILE)
    if not os.path.exists(cwl_job_file):
        # Not a CWL job, just continue
        return

    # So we only need to do strict validation when the tool was loaded,
    # no reason to do it again during job execution - so this shortcut
    # allows us to not need Galaxy's full configuration on job nodes.
    job_proxy = load_job_proxy(job_directory, strict_cwl_validation=False)

    tool_working_directory = os.path.join(job_directory, "working")
    job_id_tag = metadata_params["job_id_tag"]
    # Imported at call time, as in the original implementation.
    from galaxy.job_execution.output_collect import default_exit_code_file, read_exit_code_from
    exit_code_file = default_exit_code_file(".", job_id_tag)
    tool_exit_code = read_exit_code_from(exit_code_file, job_id_tag)
    outputs = job_proxy.collect_outputs(tool_working_directory, tool_exit_code)

    # Build galaxy.json file.
    provided_metadata = {}

    def move_directory(output, target_path, output_name=None):
        """Place a CWL ``Directory`` output at ``target_path``."""
        assert output["class"] == "Directory"
        output_path = _possible_uri_to_path(output["location"])
        if output_path.startswith("_:"):
            # Not a real path, just copy listing to target path.
            safe_makedirs(target_path)
            for listed_file in output["listing"]:
                # TODO: handle directories
                assert listed_file["class"] == "File"
                file_description = file_dict_to_description(listed_file)
                file_description.write_to(os.path.join(target_path, listed_file["basename"]))
        else:
            shutil.move(output_path, target_path)
        return {"created_from_basename": output["basename"]}

    def move_output(output, target_path, output_name=None):
        """Write a CWL ``File`` output and its secondary files into place."""
        assert output["class"] == "File"
        file_description = file_dict_to_description(output)
        file_description.write_to(target_path)

        secondary_files = output.get("secondaryFiles", [])
        if secondary_files:
            order = []
            index_contents = {
                "order": order
            }

            for secondary_file in secondary_files:
                if output_name is None:
                    raise NotImplementedError("secondaryFiles are unimplemented for dynamic list elements")

                # TODO: handle nested files...
                secondary_file_description = file_dict_to_description(secondary_file)
                secondary_file_basename = secondary_file["basename"]

                if not STORE_SECONDARY_FILES_WITH_BASENAME:
                    # Convert to ^ format: one "^" per extension stripped
                    # from the primary basename before it prefixes the
                    # secondary basename.
                    output_basename = output["basename"]
                    prefix = ""
                    while True:
                        if secondary_file_basename.startswith(output_basename):
                            secondary_file_name = prefix + secondary_file_basename[len(output_basename):]
                            break
                        prefix = "^%s" % prefix
                        if "." not in output_basename:
                            # No extension left to strip and still no common
                            # prefix: fall back to the full secondary
                            # basename. (This branch previously read
                            # secondary_file_name before it was assigned.)
                            secondary_file_name = prefix + secondary_file_basename
                            break
                        else:
                            output_basename = output_basename.rsplit(".", 1)[0]
                else:
                    secondary_file_name = secondary_file_basename

                secondary_files_dir = job_proxy.output_secondary_files_dir(
                    output_name, create=True
                )
                extra_target = os.path.join(secondary_files_dir, secondary_file_name)
                secondary_file_description.write_to(extra_target)
                order.append(secondary_file_name)

            # secondary_files is non-empty here, so secondary_files_dir is bound.
            with open(os.path.join(secondary_files_dir, "..", SECONDARY_FILES_INDEX_PATH), "w") as f:
                json.dump(index_contents, f)

        return {"created_from_basename": output["basename"]}

    def handle_known_output(output, output_key, output_name):
        """Move a File/Directory output and record its metadata.

        ``output_key`` is accepted but not used by this helper.
        """
        assert output_name
        if output["class"] == "File":
            target_path = job_proxy.output_path(output_name)
            file_metadata = move_output(output, target_path, output_name=output_name)
        elif output["class"] == "Directory":
            target_path = job_proxy.output_directory_contents_dir(output_name)
            file_metadata = move_directory(output, target_path, output_name=output_name)
        else:
            raise Exception("Unknown output type [%s] encountered" % output)
        provided_metadata[output_name] = file_metadata

    def handle_known_output_json(output, output_name):
        """Serialize a non-file output as JSON at the output's path."""
        target_path = job_proxy.output_path(output_name)
        with open(target_path, "w") as f:
            f.write(json.dumps(output))
        provided_metadata[output_name] = {
            "ext": "expression.json",
        }

    handled_outputs = []
    for output_name, output in outputs.items():
        handled_outputs.append(output_name)
        if isinstance(output, dict) and "location" in output:
            handle_known_output(output, output_name, output_name)
        elif isinstance(output, dict):
            prefix = "%s|__part__|" % output_name
            for record_key, record_value in output.items():
                record_value_output_key = "{}{}".format(prefix, record_key)
                if isinstance(record_value, dict) and "class" in record_value:
                    handle_known_output(record_value, record_value_output_key, output_name)
                else:
                    # param_evaluation_noexpr
                    handle_known_output_json(output, output_name)
        elif isinstance(output, list):
            elements = []
            for index, el in enumerate(output):
                if isinstance(el, dict) and el["class"] == "File":
                    output_path = _possible_uri_to_path(el["location"])
                    elements.append({"name": str(index), "filename": output_path, "created_from_basename": el["basename"]})
                else:
                    # Non-file elements are dumped as JSON to a path
                    # relative to the current working directory.
                    target_path = "{}____{}".format(output_name, str(index))
                    with open(target_path, "w") as f:
                        f.write(json.dumps(el))
                    elements.append({"name": str(index), "filename": target_path, "ext": "expression.json"})
            provided_metadata[output_name] = {"elements": elements}
        else:
            handle_known_output_json(output, output_name)

    # Declared outputs that the job did not produce are recorded as JSON null.
    for output_instance in job_proxy._tool_proxy.output_instances():
        output_name = output_instance.name
        if output_name not in handled_outputs:
            handle_known_output_json(None, output_name)

    with open("galaxy.json", "w") as f:
        json.dump(provided_metadata, f)
# Names exported by ``from ... import *``.
__all__ = ("handle_outputs",)