Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.cwl.runtime_actions
import json
import os
import shutil
from galaxy.util import safe_makedirs
from .cwltool_deps import ref_resolver
from .parser import (
JOB_JSON_FILE,
load_job_proxy,
)
from .util import (
SECONDARY_FILES_INDEX_PATH,
STORE_SECONDARY_FILES_WITH_BASENAME,
)
class FileDescription(object):
    """Empty marker class; it defines no attributes or methods of its own."""
class PathFileDescription(object):
    """Describe a file by the filesystem path it currently lives at."""

    def __init__(self, path):
        # Source path that ``write_to`` copies from.
        self.path = path

    def write_to(self, destination):
        """Copy the described file to ``destination``; the source is left in place."""
        # TODO: Move if we can be sure this is in the working directory for instance...
        source = self.path
        shutil.copy(source, destination)
class LiteralFileDescription(object):
    """Describe a file whose contents are held in memory as a text string."""

    def __init__(self, content):
        # Text that ``write_to`` encodes and writes out.
        self.content = content

    def write_to(self, destination):
        """Write the content to ``destination`` as UTF-8 encoded bytes."""
        with open(destination, "wb") as handle:
            payload = self.content.encode("UTF-8")
            handle.write(payload)
def _possible_uri_to_path(location):
if location.startswith("file://"):
path = ref_resolver.uri_file_path(location)
else:
path = location
return path
def file_dict_to_description(file_dict):
    """Build a description object for a dictionary whose ``class`` is ``File``.

    A ``location`` starting with ``_:`` yields a ``LiteralFileDescription``
    built from ``file_dict["contents"]``; any other location yields a
    ``PathFileDescription`` for the corresponding path.

    Raises ``AssertionError`` if ``file_dict["class"]`` is not ``"File"``.
    """
    assert file_dict["class"] == "File", file_dict
    location = file_dict["location"]
    is_literal = location.startswith("_:")
    if is_literal:
        return LiteralFileDescription(file_dict["contents"])
    return PathFileDescription(_possible_uri_to_path(location))
def _secondary_file_name(output_basename, secondary_file_basename):
    """Express a secondary file's basename relative to its primary file's basename.

    Extensions are stripped from ``output_basename`` one at a time until what
    remains is a prefix of ``secondary_file_basename``; the result is one
    ``^`` per stripped extension followed by the rest of the secondary
    basename, e.g. ``("reads.bam", "reads.bam.bai") -> ".bai"`` and
    ``("reads.bam", "reads.bai") -> "^.bai"``.

    If no prefix of the primary basename matches, the accumulated carets are
    followed by the full secondary basename.
    """
    prefix = ""
    while True:
        if secondary_file_basename.startswith(output_basename):
            return prefix + secondary_file_basename[len(output_basename):]
        prefix = "^%s" % prefix
        if "." not in output_basename:
            # Nothing left to strip. The original code read an unassigned
            # variable here; falling back to the secondary file's own
            # basename is the assumed intent - TODO confirm.
            return prefix + secondary_file_basename
        output_basename = output_basename.rsplit(".", 1)[0]


def handle_outputs(job_directory=None):
    """Collect the outputs of a CWL job and write a ``galaxy.json`` file.

    :param job_directory: directory expected to contain the CWL job file
        (``JOB_JSON_FILE``) and a ``working`` sub-directory. Defaults to the
        parent of the current working directory.

    Returns ``None`` without doing anything when the job directory holds no
    CWL job file. Otherwise each collected output is copied, moved or
    serialized to its target location and the gathered metadata is dumped to
    ``galaxy.json`` in the current working directory.
    """
    # Relocate dynamically collected files to pre-determined locations
    # registered with ToolOutput objects via from_work_dir handling.
    if job_directory is None:
        job_directory = os.path.join(os.getcwd(), os.path.pardir)
    cwl_job_file = os.path.join(job_directory, JOB_JSON_FILE)
    if not os.path.exists(cwl_job_file):
        # Not a CWL job, just continue
        return
    # So we only need to do strict validation when the tool was loaded,
    # no reason to do it again during job execution - so this shortcut
    # allows us to not need Galaxy's full configuration on job nodes.
    job_proxy = load_job_proxy(job_directory, strict_cwl_validation=False)
    tool_working_directory = os.path.join(job_directory, "working")
    outputs = job_proxy.collect_outputs(tool_working_directory)

    # Build galaxy.json file.
    provided_metadata = {}

    def move_directory(output, target_path, output_name=None):
        """Place a ``Directory`` output at ``target_path`` and return its metadata."""
        assert output["class"] == "Directory"
        output_path = _possible_uri_to_path(output["location"])
        if output_path.startswith("_:"):
            # Not a real path, just copy listing to target path.
            safe_makedirs(target_path)
            for listed_file in output["listing"]:
                # TODO: handle directories
                assert listed_file["class"] == "File"
                file_description = file_dict_to_description(listed_file)
                file_description.write_to(os.path.join(target_path, listed_file["basename"]))
        else:
            shutil.move(output_path, target_path)
        return {"cwl_filename": output["basename"]}

    def move_output(output, target_path, output_name=None):
        """Write a ``File`` output and its secondary files; return its metadata."""
        assert output["class"] == "File"
        file_description = file_dict_to_description(output)
        file_description.write_to(target_path)

        secondary_files = output.get("secondaryFiles", [])
        if secondary_files:
            if output_name is None:
                raise NotImplementedError("secondaryFiles are unimplemented for dynamic list elements")

            order = []
            index_contents = {
                "order": order
            }

            for secondary_file in secondary_files:
                # TODO: handle nested files...
                secondary_file_description = file_dict_to_description(secondary_file)
                secondary_file_basename = secondary_file["basename"]

                if not STORE_SECONDARY_FILES_WITH_BASENAME:
                    # Convert to ^ format....
                    secondary_file_name = _secondary_file_name(
                        output["basename"], secondary_file_basename
                    )
                else:
                    secondary_file_name = secondary_file_basename

                secondary_files_dir = job_proxy.output_secondary_files_dir(
                    output_name, create=True
                )
                extra_target = os.path.join(secondary_files_dir, secondary_file_name)
                secondary_file_description.write_to(extra_target)
                order.append(secondary_file_name)

            # The index listing the stored names in order is written one
            # level above the secondary files directory.
            with open(os.path.join(secondary_files_dir, "..", SECONDARY_FILES_INDEX_PATH), "w") as f:
                json.dump(index_contents, f)

        return {"cwl_filename": output["basename"]}

    def handle_known_output(output, output_key, output_name):
        """Relocate one ``File``/``Directory`` output and record its metadata.

        ``output_key`` is accepted but not currently used.
        """
        assert output_name
        if output["class"] == "File":
            target_path = job_proxy.output_path(output_name)
            file_metadata = move_output(output, target_path, output_name=output_name)
        elif output["class"] == "Directory":
            target_path = job_proxy.output_directory_contents_dir(output_name)
            file_metadata = move_directory(output, target_path, output_name=output_name)
        else:
            raise Exception("Unknown output type [%s] encountered" % output)
        provided_metadata[output_name] = file_metadata

    for output_name, output in outputs.items():
        if isinstance(output, dict) and "location" in output:
            # A single File or Directory output.
            handle_known_output(output, output_name, output_name)
        elif isinstance(output, dict):
            # A dict without a location: handle each of its values in turn.
            prefix = "%s|__part__|" % output_name
            for record_key, record_value in output.items():
                record_value_output_key = "%s%s" % (prefix, record_key)
                handle_known_output(record_value, record_value_output_key, output_name)
        elif isinstance(output, list):
            elements = []
            for index, el in enumerate(output):
                # ``.get`` so that dict elements without a "class" key are
                # serialized as JSON below instead of raising KeyError.
                if isinstance(el, dict) and el.get("class") == "File":
                    output_path = _possible_uri_to_path(el["location"])
                    elements.append({"name": str(index), "filename": output_path, "cwl_filename": el["basename"]})
                else:
                    target_path = "%s____%s" % (output_name, str(index))
                    with open(target_path, "w") as f:
                        json.dump(el, f)
                    elements.append({"name": str(index), "filename": target_path, "ext": "expression.json"})
            provided_metadata[output_name] = {"elements": elements}
        else:
            # Any other value is stored as JSON at the output's path.
            target_path = job_proxy.output_path(output_name)
            with open(target_path, "w") as f:
                json.dump(output, f)
            provided_metadata[output_name] = {
                "ext": "expression.json",
            }

    with open("galaxy.json", "w") as f:
        json.dump(provided_metadata, f)
# Names exported by ``from <module> import *``.
__all__ = ('handle_outputs',)