Source code for galaxy.tool_util.cwl.runtime_actions
import json
import os
import shutil

from galaxy.util import safe_makedirs

from .cwltool_deps import ref_resolver
from .parser import (
    JOB_JSON_FILE,
    load_job_proxy,
)
from .util import (
    SECONDARY_FILES_INDEX_PATH,
    STORE_SECONDARY_FILES_WITH_BASENAME,
)
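# Runtime (post-job) actions for CWL tools: relocate the outputs collected in
# the tool working directory to the paths Galaxy expects and record dataset
# metadata in the job's galaxy.json-style metadata file.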
def file_dict_to_description(file_dict):
    output_class = file_dict["class"]
    assert output_class in ["File", "Directory"], file_dict
    location = file_dict["location"]
    if location.startswith("_:"):
        assert output_class == "File"
        return LiteralFileDescription(file_dict["contents"])
    elif output_class == "File":
        return PathFileDescription(_possible_uri_to_path(location))
    else:
        return PathDirectoryDescription(_possible_uri_to_path(location))
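# Each *Description class below knows how to materialize one collected CWL
# output at a destination path: an existing file on disk, a directory tree, or
# literal in-memory contents.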
class FileDescription:
    pass


class PathFileDescription:
    def __init__(self, path):
        self.path = path

    def write_to(self, destination):
        # TODO: Move if we can be sure this is in the working directory for instance...
        shutil.copy(self.path, destination)


class PathDirectoryDescription:
    def __init__(self, path):
        self.path = path

    def write_to(self, destination):
        shutil.copytree(self.path, destination)


class LiteralFileDescription:
    def __init__(self, content):
        self.content = content

    def write_to(self, destination):
        with open(destination, "wb") as f:
            f.write(self.content.encode("UTF-8"))


def _possible_uri_to_path(location):
    if location.startswith("file://"):
        path = ref_resolver.uri_file_path(location)
    else:
        path = location
    return path
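# Public entry point, invoked after the tool process has finished: collect the
# CWL outputs for the job and write the metadata that Galaxy's output
# collection code consumes.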
def handle_outputs(job_directory=None):
    # Relocate dynamically collected files to pre-determined locations
    # registered with ToolOutput objects via from_work_dir handling.
    if job_directory is None:
        job_directory = os.path.join(os.getcwd(), os.path.pardir)
    cwl_job_file = os.path.join(job_directory, JOB_JSON_FILE)
    if not os.path.exists(cwl_job_file):
        # Not a CWL job, just continue
        return
    # So we only need to do strict validation when the tool was loaded,
    # no reason to do it again during job execution - so this shortcut
    # allows us to not need Galaxy's full configuration on job nodes.
    job_proxy = load_job_proxy(job_directory, strict_cwl_validation=False)
    tool_working_directory = os.path.join(job_directory, "working")
    cwl_metadata_params_path = os.path.join(job_directory, "cwl_params.json")
    with open(cwl_metadata_params_path) as f:
        cwl_metadata_params = json.load(f)

    job_id_tag = cwl_metadata_params["job_id_tag"]

    from galaxy.job_execution.output_collect import (
        default_exit_code_file,
        read_exit_code_from,
    )

    exit_code_file = default_exit_code_file(".", job_id_tag)
    tool_exit_code = read_exit_code_from(exit_code_file, job_id_tag)
    outputs = job_proxy.collect_outputs(tool_working_directory, tool_exit_code)

    # Build galaxy.json file.
    provided_metadata = {}
    def move_directory(output, target_path, output_name=None):
        assert output["class"] == "Directory"
        output_path = _possible_uri_to_path(output["location"])
        if output_path.startswith("_:"):
            assert "listing" in output, "Do not know how to handle output, no 'listing' found."
            listing = output["listing"]
            # Not a real path, just copy the listing to the target path.
            safe_makedirs(target_path)
            for listed_file in listing:
                # TODO: handle directories
                assert listed_file["class"] == "File"
                file_description = file_dict_to_description(listed_file)
                file_description.write_to(os.path.join(target_path, listed_file["basename"]))
        else:
            shutil.move(output_path, target_path)
        return {"created_from_basename": output["basename"]}
    def move_output(output, target_path, output_name=None):
        assert output["class"] == "File"
        file_description = file_dict_to_description(output)
        file_description.write_to(target_path)
        secondary_files = output.get("secondaryFiles", [])
        if secondary_files:
            order = []
            index_contents = {"order": order}
            for secondary_file in secondary_files:
                if output_name is None:
                    raise NotImplementedError("secondaryFiles are unimplemented for dynamic list elements")

                # TODO: handle nested files...
                secondary_file_description = file_dict_to_description(secondary_file)
                # assert secondary_file_path.startswith(output_path), "[%s] does not start with [%s]" % (secondary_file_path, output_path)
                secondary_file_basename = secondary_file["basename"]

                if not STORE_SECONDARY_FILES_WITH_BASENAME:
                    # Convert to ^ format: drop extensions from the primary output's
                    # basename one at a time, prefixing a "^" for each, until the
                    # secondary file's basename matches.
                    output_basename = output["basename"]
                    prefix = ""
                    while True:
                        if secondary_file_basename.startswith(output_basename):
                            secondary_file_name = prefix + secondary_file_basename[len(output_basename) :]
                            break
                        prefix = f"^{prefix}"
                        if "." not in output_basename:
                            secondary_file_name = prefix + secondary_file_name
                            break
                        else:
                            output_basename = output_basename.rsplit(".", 1)[0]
                else:
                    secondary_file_name = secondary_file_basename

                secondary_files_dir = job_proxy.output_secondary_files_dir(output_name, create=True)
                extra_target = os.path.join(secondary_files_dir, secondary_file_name)
                secondary_file_description.write_to(extra_target)
                order.append(secondary_file_name)

            with open(os.path.join(secondary_files_dir, "..", SECONDARY_FILES_INDEX_PATH), "w") as f:
                json.dump(index_contents, f)
        return {"created_from_basename": output["basename"]}
    def handle_known_output(output, output_key, output_name):
        # if output["class"] != "File":
        #     # This case doesn't seem like it would be reached - why is this here?
        #     provided_metadata[output_name] = {
        #         "ext": "expression.json",
        #     }
        # else:
        assert output_name
        if output["class"] == "File":
            target_path = job_proxy.output_path(output_name)
            file_metadata = move_output(output, target_path, output_name=output_name)
        elif output["class"] == "Directory":
            target_path = job_proxy.output_directory_contents_dir(output_name)
            file_metadata = move_directory(output, target_path, output_name=output_name)
        else:
            raise Exception(f"Unknown output type [{output}] encountered")
        provided_metadata[output_name] = file_metadata
    def handle_known_output_json(output, output_name):
        target_path = job_proxy.output_path(output_name)
        with open(target_path, "w") as f:
            f.write(json.dumps(output))
        provided_metadata[output_name] = {
            "ext": "expression.json",
        }
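    # Dispatch each collected output by shape: a single File/Directory (a dict
    # with a "location"), a record whose values are themselves outputs, a list
    # of dynamic elements, or a plain JSON value stored as expression.json.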
    handled_outputs = []
    for output_name, output in outputs.items():
        handled_outputs.append(output_name)
        if isinstance(output, dict) and "location" in output:
            handle_known_output(output, output_name, output_name)
        elif isinstance(output, dict):
            prefix = f"{output_name}|__part__|"
            for record_key, record_value in output.items():
                record_value_output_key = f"{prefix}{record_key}"
                if isinstance(record_value, dict) and "class" in record_value:
                    handle_known_output(record_value, record_value_output_key, output_name)
                else:
                    # param_evaluation_noexpr
                    handle_known_output_json(output, output_name)
        elif isinstance(output, list):
            elements = []
            for index, el in enumerate(output):
                if isinstance(el, dict) and el["class"] == "File":
                    output_path = _possible_uri_to_path(el["location"])
                    elements.append(
                        {"name": str(index), "filename": output_path, "created_from_basename": el["basename"]}
                    )
                else:
                    target_path = f"{output_name}____{str(index)}"
                    with open(target_path, "w") as f:
                        f.write(json.dumps(el))
                    elements.append({"name": str(index), "filename": target_path, "ext": "expression.json"})
            provided_metadata[output_name] = {"elements": elements}
        else:
            handle_known_output_json(output, output_name)

    for output_instance in job_proxy._tool_proxy.output_instances():
        output_name = output_instance.name
        if output_name not in handled_outputs:
            handle_known_output_json(None, output_name)

    job_metadata = os.path.join(job_directory, cwl_metadata_params["job_metadata"])
    with open(job_metadata, "w") as f:
        json.dump(provided_metadata, f)


__all__ = ("handle_outputs",)
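A minimal usage sketch, assuming the job directory already contains the
JOB_JSON_FILE, cwl_params.json, and working/ subdirectory this module expects
(the explicit path below is purely illustrative):

    from galaxy.tool_util.cwl.runtime_actions import handle_outputs

    # Default: treat the parent of the current working directory as the job directory.
    handle_outputs()

    # Or name the job directory explicitly (hypothetical path):
    # handle_outputs(job_directory="/galaxy/jobs/000/123")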