import logging
import os
import shutil

from galaxy import (
    model,
    util,
)

log = logging.getLogger(__name__)


def do_split(job_wrapper):
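    """Split a job into multiple tasks based on its tool's parallelism settings.

    Inputs named in ``split_inputs`` are divided into per-task pieces by their
    datatype's ``split()`` method; inputs named in ``shared_inputs`` (or, by
    default, all remaining inputs) are symlinked whole into every ``task_<n>``
    directory. Returns one ``model.Task`` per task directory.
    """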
    parent_job = job_wrapper.get_job()
    working_directory = os.path.abspath(job_wrapper.working_directory)

    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: split_inputs="input1,input2" shared_inputs="genome"
    # Designates inputs to be split or shared
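    # For illustration, these attributes typically come from the tool's <parallelism>
    # declaration, e.g. (example values only, matching the comment above):
    #   <parallelism method="multi" split_inputs="input1,input2" shared_inputs="genome"/>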
    split_inputs = parallel_settings.get("split_inputs")
    if split_inputs is None:
        split_inputs = []
    else:
        split_inputs = [x.strip() for x in split_inputs.split(",")]

    shared_inputs = parallel_settings.get("shared_inputs")
    auto_shared_inputs = False
    if shared_inputs is None:
        shared_inputs = []
        auto_shared_inputs = True
    else:
        shared_inputs = [x.strip() for x in shared_inputs.split(",")]
    illegal_inputs = [x for x in shared_inputs if x in split_inputs]
    if len(illegal_inputs) > 0:
        raise Exception(f"Inputs have conflicting parallelism attributes: {illegal_inputs}")

    subdir_index = [0]  # one-element list so the nested function can mutate the counter (pre-"nonlocal" idiom)
    task_dirs = []

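    # This directory factory is handed to the datatype's split() call below:
    # each time the splitter needs another part, it creates the next task_<n>
    # subdirectory, records it in task_dirs, and returns its path.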
    def get_new_working_directory_name():
        dir = os.path.join(working_directory, "task_%d" % subdir_index[0])
        subdir_index[0] = subdir_index[0] + 1
        if not os.path.exists(dir):
            os.makedirs(dir)
        task_dirs.append(dir)
        return dir

    # For things like paired-end alignment, we need two inputs to be split. Since all inputs to all
    # derived subtasks need to be correlated, allow only one input type to be split.
    # If shared_inputs are not specified, we assume all non-split inputs are shared inputs.
    # Each input may also be None: with optional arguments, a data input can be left unset.
    type_to_input_map = {}
    for input in parent_job.input_datasets:
        if input.dataset is None:
            if input.name in shared_inputs:
                shared_inputs.remove(input.name)
            else:
                pass
        else:
            if input.name in split_inputs:
                type_to_input_map.setdefault(input.dataset.datatype, []).append(input.name)
            elif input.name in shared_inputs:
                pass  # pass original file name
            elif auto_shared_inputs:
                shared_inputs.append(input.name)
            else:
                log_error = f"The input '{input.name}' does not define a method for implementing parallelism"
                log.exception(log_error)
                raise Exception(log_error)

    if len(type_to_input_map) > 1:
        log_error = "The multi splitter does not support splitting inputs of more than one type"
        log.error(log_error)
        raise Exception(log_error)

    # split the first one to build up the task directories
    input_datasets = []
    for input in parent_job.input_datasets:
        if input.name in split_inputs:
            this_input_files = job_wrapper.job_io.get_input_dataset_fnames(input.dataset)
            if len(this_input_files) > 1:
                log_error = f"The input '{input.name}' is composed of multiple files - splitting is not allowed"
                log.error(log_error)
                raise Exception(log_error)
            input_datasets.append(input.dataset)

    input_type = next(iter(type_to_input_map.keys()))
    # DBTODO execute an external task to do the splitting, this should happen at refactor.
    # If the number of tasks is sufficiently high, we can use it to calculate job completion % and give a running status.
    try:
        input_type.split(input_datasets, get_new_working_directory_name, parallel_settings)
    except AttributeError:
        log_error = f"The type '{input_type}' does not define a method for splitting files"
        log.error(log_error)
        raise
    log.debug("do_split created %d parts" % len(task_dirs))

    # next, after we know how many divisions there are, add the shared inputs via soft links
    for input in parent_job.input_datasets:
        if input and input.name in shared_inputs:
            names = job_wrapper.job_io.get_input_dataset_fnames(input.dataset)
            for dir in task_dirs:
                for file in names:
                    os.symlink(file, os.path.join(dir, os.path.basename(file)))

    tasks = []
    prepare_files = f"'{os.path.join(util.galaxy_directory(), 'extract_dataset_parts.sh')}' %s"
    for dir in task_dirs:
        task = model.Task(parent_job, dir, prepare_files % dir)
        tasks.append(task)
    return tasks


def do_merge(job_wrapper, task_wrappers):
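    """Combine the per-task outputs of a previously split job.

    Outputs named in ``merge_outputs`` are combined with their datatype's
    ``merge()`` method; outputs named in ``pickone_outputs`` are taken from the
    first task directory. Returns a ``(stdout, stderr)`` tuple that also
    aggregates each task's own stdout and stderr.
    """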
    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: merge_outputs="export" pickone_outputs="genomesize"
    # Designates outputs to be merged, or selected from as a representative
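    # For illustration, these sit alongside the split settings in the same tool
    # <parallelism> declaration, e.g. (example values from the comment above):
    #   <parallelism method="multi" ... merge_outputs="export" pickone_outputs="genomesize"/>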
    merge_outputs = parallel_settings.get("merge_outputs")
    if merge_outputs is None:
        merge_outputs = []
    else:
        merge_outputs = [x.strip() for x in merge_outputs.split(",")]
    pickone_outputs = parallel_settings.get("pickone_outputs")
    if pickone_outputs is None:
        pickone_outputs = []
    else:
        pickone_outputs = [x.strip() for x in pickone_outputs.split(",")]

    illegal_outputs = [x for x in merge_outputs if x in pickone_outputs]
    if len(illegal_outputs) > 0:
        return ("Tool file error", f"Outputs have conflicting parallelism attributes: {illegal_outputs}")

    stdout = ""
    stderr = ""
    try:
        working_directory = job_wrapper.working_directory
        task_dirs = [
            os.path.join(working_directory, x) for x in os.listdir(working_directory) if x.startswith("task_")
        ]
        assert task_dirs, "Should be at least one sub-task!"
        # TODO: Output datasets can be very complex. This doesn't handle metadata files
        outputs = job_wrapper.job_io.get_output_hdas_and_fnames()
        output_paths = job_wrapper.job_io.get_output_fnames()
        pickone_done = []
        task_dirs.sort(key=lambda x: int(x.split("task_")[-1]))
        for index, output in enumerate(outputs):
            output_file_name = str(output_paths[index])  # Use false_path if set, else real path.
            base_output_name = os.path.basename(output_file_name)
            if output in merge_outputs:
                output_type = outputs[output][0].datatype
                output_files = [os.path.join(dir, base_output_name) for dir in task_dirs]
                # Only merge files that actually exist; some may be missing if a task failed.
                output_files = [f for f in output_files if os.path.exists(f)]
                if output_files:
                    log.debug(f"files {output_files}")
                    if len(output_files) < len(task_dirs):
                        log.debug(
                            "merging only %i out of expected %i files for %s"
                            % (len(output_files), len(task_dirs), output_file_name)
                        )
                    output_type.merge(output_files, output_file_name)
                    log.debug(f"merge finished: {output_file_name}")
                else:
                    msg = "nothing to merge for %s (expected %i files)" % (output_file_name, len(task_dirs))
                    log.debug(msg)
                    stderr += f"{msg}\n"
            elif output in pickone_outputs:
                # just pick one of them
                if output not in pickone_done:
                    task_file_name = os.path.join(task_dirs[0], base_output_name)
                    shutil.move(task_file_name, output_file_name)
                    pickone_done.append(output)
            else:
                log_error = f"The output '{output}' does not define a method for implementing parallelism"
                log.exception(log_error)
                raise Exception(log_error)
    except Exception as e:
        stdout = "Error merging files"
        log.exception(stdout)
        stderr = util.unicodify(e)

    for tw in task_wrappers:
        # Prevent repetitive output, e.g. "Sequence File Aligned"x20
        # Eventually do a reduce for jobs that output "N reads mapped", combining all N for tasks.
        out = tw.get_task().stdout.strip()
        err = tw.get_task().stderr.strip()
        if len(out) > 0:
            stdout += f"\n{tw.working_directory}:\n{out}"
        if len(err) > 0:
            stderr += f"\n{tw.working_directory}:\n{err}"
    return (stdout, stderr)