Warning

This document is for an in-development version of Galaxy. Alternatively, you can view this page in the latest release, if it exists there, or go to the top of the latest release's documentation.

Source code for galaxy.tool_util.cwl.runtime_actions

import json
import os
import shutil

from galaxy.util import safe_makedirs
from .cwltool_deps import ref_resolver
from .parser import (
    JOB_JSON_FILE,
    load_job_proxy,
)
from .util import (
    SECONDARY_FILES_INDEX_PATH,
    STORE_SECONDARY_FILES_WITH_BASENAME,
)


class FileDescription(object):
    """Placeholder type for file descriptions.

    Defines no attributes or methods of its own. The concrete description
    classes in this module each provide ``write_to(destination)``.
    """


class PathFileDescription(object):
    """Describe a file whose contents already exist at a filesystem path."""

    def __init__(self, path):
        # Path of the existing file that ``write_to`` copies from.
        self.path = path

    def write_to(self, destination):
        """Copy the file at ``self.path`` to ``destination``."""
        # TODO: Move if we can be sure this is in the working directory for instance...
        source = self.path
        shutil.copy(source, destination)


class LiteralFileDescription(object):
    """Describe a file whose contents are held in memory as text."""

    def __init__(self, content):
        # Text that ``write_to`` encodes and writes out.
        self.content = content

    def write_to(self, destination):
        """Write ``self.content`` to ``destination`` as UTF-8 encoded bytes."""
        with open(destination, "wb") as handle:
            payload = self.content.encode("UTF-8")
            handle.write(payload)


def _possible_uri_to_path(location):
    if location.startswith("file://"):
        path = ref_resolver.uri_file_path(location)
    else:
        path = location
    return path


def file_dict_to_description(file_dict):
    """Build a file description object from a ``File`` dictionary.

    ``file_dict`` must have ``class`` equal to ``"File"`` and a ``location``
    entry. A location starting with ``_:`` is treated as a literal and the
    result is a ``LiteralFileDescription`` built from ``file_dict["contents"]``.
    Any other location yields a ``PathFileDescription`` for the path obtained
    via ``_possible_uri_to_path``.
    """
    assert file_dict["class"] == "File", file_dict
    location = file_dict["location"]
    is_literal = location.startswith("_:")
    if not is_literal:
        return PathFileDescription(_possible_uri_to_path(location))
    return LiteralFileDescription(file_dict["contents"])


def _caret_secondary_file_name(output_basename, secondary_file_basename):
    """Express a secondary file's basename relative to the primary basename.

    Extensions are stripped from ``output_basename`` one at a time until the
    secondary basename starts with what is left. The result is one ``^`` per
    stripped extension followed by the remainder of the secondary basename
    (for example ``a.bam`` / ``a.bai`` gives ``^.bai``).

    If the primary basename runs out of extensions without ever becoming a
    prefix, the accumulated carets followed by the full secondary basename
    are returned as a fallback.
    """
    prefix = ""
    while True:
        if secondary_file_basename.startswith(output_basename):
            return prefix + secondary_file_basename[len(output_basename):]
        prefix = "^%s" % prefix
        if "." not in output_basename:
            # Nothing left to strip. The original code read the not-yet-assigned
            # (or stale, from a previous file) ``secondary_file_name`` here.
            return prefix + secondary_file_basename
        output_basename = output_basename.rsplit(".", 1)[0]


def handle_outputs(job_directory=None):
    """Relocate the outputs of a CWL job and write ``galaxy.json``.

    :param job_directory: directory of the job; when ``None`` the parent of
        the current working directory is used.

    If ``JOB_JSON_FILE`` does not exist in ``job_directory`` the job is not a
    CWL job and the function returns without doing anything. Otherwise the job
    proxy is loaded, outputs are collected from the ``working`` subdirectory
    and each one is handled according to its shape:

    * a dict with a ``location`` key is a ``File`` or ``Directory`` and is
      written to the path the job proxy reports for that output;
    * any other dict is treated as a record whose values are each handled as
      a ``File``/``Directory``;
    * a list becomes an ``elements`` entry - ``File`` items are referenced by
      their existing path, other items are dumped as JSON to
      ``<output_name>____<index>`` relative to the current directory;
    * anything else is dumped as JSON to the output's path and tagged with
      ``ext`` ``expression.json``.

    The collected metadata is finally written to ``galaxy.json`` in the
    current working directory.

    :raises Exception: if a known output has a ``class`` other than ``File``
        or ``Directory``.
    :raises NotImplementedError: if secondary files are encountered for an
        output that has no output name.
    """
    # Relocate dynamically collected files to pre-determined locations
    # registered with ToolOutput objects via from_work_dir handling.
    if job_directory is None:
        job_directory = os.path.join(os.getcwd(), os.path.pardir)
    cwl_job_file = os.path.join(job_directory, JOB_JSON_FILE)
    if not os.path.exists(cwl_job_file):
        # Not a CWL job, just continue
        return
    # So we only need to do strict validation when the tool was loaded,
    # no reason to do it again during job execution - so this shortcut
    # allows us to not need Galaxy's full configuration on job nodes.
    job_proxy = load_job_proxy(job_directory, strict_cwl_validation=False)

    tool_working_directory = os.path.join(job_directory, "working")
    outputs = job_proxy.collect_outputs(tool_working_directory)

    # Build galaxy.json file.
    provided_metadata = {}

    def move_directory(output, target_path, output_name=None):
        """Materialize a ``Directory`` output at ``target_path``."""
        assert output["class"] == "Directory"
        output_path = _possible_uri_to_path(output["location"])
        if output_path.startswith("_:"):
            # Not a real path, just copy listing to target path.
            safe_makedirs(target_path)
            for listed_file in output["listing"]:
                # TODO: handle directories
                assert listed_file["class"] == "File"
                file_description = file_dict_to_description(listed_file)
                file_description.write_to(os.path.join(target_path, listed_file["basename"]))
        else:
            shutil.move(output_path, target_path)
        return {"cwl_filename": output["basename"]}

    def move_output(output, target_path, output_name=None):
        """Write a ``File`` output, and any secondary files, to their targets."""
        assert output["class"] == "File"
        file_description = file_dict_to_description(output)
        file_description.write_to(target_path)

        secondary_files = output.get("secondaryFiles", [])
        if secondary_files:
            order = []
            index_contents = {
                "order": order
            }

            for secondary_file in secondary_files:
                if output_name is None:
                    raise NotImplementedError("secondaryFiles are unimplemented for dynamic list elements")

                # TODO: handle nested files...
                secondary_file_description = file_dict_to_description(secondary_file)
                # assert secondary_file_path.startswith(output_path), "[%s] does not start with [%s]" % (secondary_file_path, output_path)
                secondary_file_basename = secondary_file["basename"]

                if STORE_SECONDARY_FILES_WITH_BASENAME:
                    secondary_file_name = secondary_file_basename
                else:
                    # Convert to ^ format....
                    secondary_file_name = _caret_secondary_file_name(
                        output["basename"], secondary_file_basename
                    )

                secondary_files_dir = job_proxy.output_secondary_files_dir(
                    output_name, create=True
                )
                extra_target = os.path.join(secondary_files_dir, secondary_file_name)
                secondary_file_description.write_to(extra_target)
                order.append(secondary_file_name)

            # The index recording the order of the secondary files is written
            # one level above the secondary files directory.
            with open(os.path.join(secondary_files_dir, "..", SECONDARY_FILES_INDEX_PATH), "w") as f:
                json.dump(index_contents, f)

        return {"cwl_filename": output["basename"]}

    def handle_known_output(output, output_key, output_name):
        """Relocate one ``File``/``Directory`` value and record its metadata.

        ``output_key`` is accepted but not currently used.
        """
        # if output["class"] != "File":
        #    # This case doesn't seem like it would be reached - why is this here?
        #    provided_metadata[output_name] = {
        #        "ext": "expression.json",
        #    }
        # else:
        assert output_name
        if output["class"] == "File":
            target_path = job_proxy.output_path(output_name)
            file_metadata = move_output(output, target_path, output_name=output_name)
        elif output["class"] == "Directory":
            target_path = job_proxy.output_directory_contents_dir(output_name)
            file_metadata = move_directory(output, target_path, output_name=output_name)
        else:
            raise Exception("Unknown output type [%s] encountered" % output)
        provided_metadata[output_name] = file_metadata

    for output_name, output in outputs.items():
        if isinstance(output, dict) and "location" in output:
            handle_known_output(output, output_name, output_name)
        elif isinstance(output, dict):
            # NOTE(review): every record field is stored under the same
            # ``output_name``, so later fields replace earlier ones in
            # ``provided_metadata`` - confirm this is intended.
            prefix = "%s|__part__|" % output_name
            for record_key, record_value in output.items():
                record_value_output_key = "%s%s" % (prefix, record_key)
                handle_known_output(record_value, record_value_output_key, output_name)
        elif isinstance(output, list):
            elements = []
            for index, el in enumerate(output):
                if isinstance(el, dict) and el["class"] == "File":
                    output_path = _possible_uri_to_path(el["location"])
                    elements.append({"name": str(index), "filename": output_path, "cwl_filename": el["basename"]})
                else:
                    target_path = "%s____%s" % (output_name, str(index))
                    with open(target_path, "w") as f:
                        json.dump(el, f)
                    elements.append({"name": str(index), "filename": target_path, "ext": "expression.json"})
            provided_metadata[output_name] = {"elements": elements}
        else:
            target_path = job_proxy.output_path(output_name)
            with open(target_path, "w") as f:
                json.dump(output, f)
            provided_metadata[output_name] = {
                "ext": "expression.json",
            }

    with open("galaxy.json", "w") as f:
        json.dump(provided_metadata, f)
# Names exported by ``from ... import *``.
__all__ = ("handle_outputs",)