Source code for galaxy.jobs.splitters.multi
import logging
import os
import shutil

from galaxy import model, util
from galaxy.util.getargspec import getfullargspec

log = logging.getLogger(__name__)
def do_split(job_wrapper):
    parent_job = job_wrapper.get_job()
    working_directory = os.path.abspath(job_wrapper.working_directory)

    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: split_inputs="input1,input2" shared_inputs="genome"
    # Designates inputs to be split or shared
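    # For illustration only (the names and values below are hypothetical, not taken from this file):
    # a tool's <parallelism> tag such as
    #     <parallelism method="multi" split_inputs="input1" shared_inputs="genome" ... />
    # arrives here as a plain attribute dict along the lines of
    #     {"method": "multi", "split_inputs": "input1", "shared_inputs": "genome", ...}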
    split_inputs = parallel_settings.get("split_inputs")
    if split_inputs is None:
        split_inputs = []
    else:
        split_inputs = [x.strip() for x in split_inputs.split(",")]

    shared_inputs = parallel_settings.get("shared_inputs")
    auto_shared_inputs = False
    if shared_inputs is None:
        shared_inputs = []
        auto_shared_inputs = True
    else:
        shared_inputs = [x.strip() for x in shared_inputs.split(",")]
    illegal_inputs = [x for x in shared_inputs if x in split_inputs]
    if len(illegal_inputs) > 0:
        raise Exception(f"Inputs have conflicting parallelism attributes: {illegal_inputs}")
    subdir_index = [0]  # use a list to get around Python 2.x lame closure support
    task_dirs = []

    def get_new_working_directory_name():
        dir = os.path.join(working_directory, 'task_%d' % subdir_index[0])
        subdir_index[0] = subdir_index[0] + 1
        if not os.path.exists(dir):
            os.makedirs(dir)
        task_dirs.append(dir)
        return dir
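    # Successive calls yield working_directory/task_0, task_1, ...; each directory is
    # recorded in task_dirs and becomes the working directory of one child task.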
    # For things like paired-end alignment, two inputs need to be split. Since all inputs to all
    # derived subtasks must stay correlated, only one input type may be split.
    # If shared_inputs is not specified, all non-split inputs are assumed to be shared.
    # Each input may also be None: with optional parameters, a data input can be left unset.
    type_to_input_map = {}
    for input in parent_job.input_datasets:
        if input.dataset is None:
            if input.name in shared_inputs:
                shared_inputs.remove(input.name)
        else:
            if input.name in split_inputs:
                type_to_input_map.setdefault(input.dataset.datatype, []).append(input.name)
            elif input.name in shared_inputs:
                pass  # pass original file name
            elif auto_shared_inputs:
                shared_inputs.append(input.name)
            else:
                log_error = f"The input '{input.name}' does not define a method for implementing parallelism"
                log.error(log_error)
                raise Exception(log_error)

    if len(type_to_input_map) > 1:
        log_error = "The multi splitter does not support splitting inputs of more than one type"
        log.error(log_error)
        raise Exception(log_error)
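    # At this point type_to_input_map has at most one datatype key; its value lists the names
    # of the inputs to split (e.g. both files of a paired-end run, which share a datatype).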
    # split the first one to build up the task directories
    input_datasets = []
    for input in parent_job.input_datasets:
        if input.name in split_inputs:
            this_input_files = job_wrapper.get_input_dataset_fnames(input.dataset)
            if len(this_input_files) > 1:
                log_error = f"The input '{input.name}' is composed of multiple files - splitting is not allowed"
                log.error(log_error)
                raise Exception(log_error)
            input_datasets.append(input.dataset)

    input_type = next(iter(type_to_input_map.keys()))
    # DBTODO execute an external task to do the splitting; this should happen at refactor.
    # If the number of tasks is sufficiently high, we can use it to calculate job completion % and give a running status.
    try:
        input_type.split(input_datasets, get_new_working_directory_name, parallel_settings)
    except AttributeError:
        log_error = f"The type '{input_type}' does not define a method for splitting files"
        log.error(log_error)
        raise
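    # The datatype's split() is duck-typed: it receives the datasets to split, the directory
    # generator above, and the raw parallelism settings, and is expected to write one chunk of
    # each input into every task directory it requests.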
    log.debug('do_split created %d parts' % len(task_dirs))

    # next, after we know how many divisions there are, add the shared inputs via soft links
    for input in parent_job.input_datasets:
        if input and input.name in shared_inputs:
            names = job_wrapper.get_input_dataset_fnames(input.dataset)
            for dir in task_dirs:
                for file in names:
                    os.symlink(file, os.path.join(dir, os.path.basename(file)))

    tasks = []
    prepare_files = f"'{os.path.join(util.galaxy_directory(), 'extract_dataset_parts.sh')}' %s"
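    # Each Task gets "extract_dataset_parts.sh <task_dir>" as its prepare command, which is
    # expected to unpack that task's share of the split inputs before the tool runs there.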
    for dir in task_dirs:
        task = model.Task(parent_job, dir, prepare_files % dir)
        tasks.append(task)

    return tasks

def do_merge(job_wrapper, task_wrappers):
    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: merge_outputs="export" pickone_outputs="genomesize"
    # Designates outputs to be merged, or selected from as a representative
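    # For illustration only (output names are hypothetical): merge_outputs="output1" has each
    # task's copy of that output combined via its datatype's merge(), while
    # pickone_outputs="summary" keeps the first task's copy as the representative result.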
    merge_outputs = parallel_settings.get("merge_outputs")
    if merge_outputs is None:
        merge_outputs = []
    else:
        merge_outputs = [x.strip() for x in merge_outputs.split(",")]
    pickone_outputs = parallel_settings.get("pickone_outputs")
    if pickone_outputs is None:
        pickone_outputs = []
    else:
        pickone_outputs = [x.strip() for x in pickone_outputs.split(",")]

    illegal_outputs = [x for x in merge_outputs if x in pickone_outputs]
    if len(illegal_outputs) > 0:
        return ('Tool file error', f'Outputs have conflicting parallelism attributes: {illegal_outputs}')

    stdout = ''
    stderr = ''
    try:
        working_directory = job_wrapper.working_directory
        task_dirs = [os.path.join(working_directory, x) for x in os.listdir(working_directory) if x.startswith('task_')]
        assert task_dirs, "There should be at least one sub-task!"
        task_dirs.sort(key=lambda x: int(x.split('task_')[-1]))
        # TODO: Output datasets can be very complex. This doesn't handle metadata files
        outputs = job_wrapper.get_output_hdas_and_fnames()
        output_paths = job_wrapper.get_output_fnames()
        pickone_done = []
        for index, output in enumerate(outputs):
            output_file_name = str(output_paths[index])  # Use false_path if set, else the real path.
            base_output_name = os.path.basename(output_file_name)
            if output in merge_outputs:
                output_dataset = outputs[output][0]
                output_type = output_dataset.datatype
                output_files = [os.path.join(dir, base_output_name) for dir in task_dirs]
                # Only include those files f in the output list for which the
                # file f exists; some files may not exist if a task fails.
                output_files = [f for f in output_files if os.path.exists(f)]
                if output_files:
                    log.debug(f'files {output_files}')
                    if len(output_files) < len(task_dirs):
                        log.debug('merging only %i out of expected %i files for %s'
                                  % (len(output_files), len(task_dirs), output_file_name))
                    # The first two args to merge are always output_files and the dataset path.
                    # More complicated merge methods may require extra parameters; set those up here.
                    extra_merge_arg_names = getfullargspec(output_type.merge).args[2:]
                    extra_merge_args = {}
                    if "output_dataset" in extra_merge_arg_names:
                        extra_merge_args["output_dataset"] = output_dataset
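                    # getfullargspec lets merge() implementations that accept an output_dataset
                    # keyword receive it, while plain two-argument implementations keep working.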
                    output_type.merge(output_files, output_file_name, **extra_merge_args)
                    log.debug(f'merge finished: {output_file_name}')
                else:
                    msg = 'nothing to merge for %s (expected %i files)' \
                          % (output_file_name, len(task_dirs))
                    log.debug(msg)
                    stderr += f"{msg}\n"
            elif output in pickone_outputs:
                # just pick one of them
                if output not in pickone_done:
                    task_file_name = os.path.join(task_dirs[0], base_output_name)
                    shutil.move(task_file_name, output_file_name)
                    pickone_done.append(output)
            else:
                log_error = f"The output '{output}' does not define a method for implementing parallelism"
                log.error(log_error)
                raise Exception(log_error)
    except Exception as e:
        stdout = 'Error merging files'
        log.exception(stdout)
        stderr = util.unicodify(e)

    for tw in task_wrappers:
        # Prevent repetitive output, e.g. "Sequence File Aligned"x20
        # Eventually do a reduce for jobs that output "N reads mapped", combining all N for tasks.
        out = tw.get_task().stdout.strip()
        err = tw.get_task().stderr.strip()
        if len(out) > 0:
            stdout += f"\n{tw.working_directory}:\n{out}"
        if len(err) > 0:
            stderr += f"\n{tw.working_directory}:\n{err}"
    return (stdout, stderr)
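
# These two functions are this splitter's interface: a task-splitting job runner is expected
# to call do_split() to fan a parent job out into per-directory Task objects, and do_merge()
# to fold each task's outputs and captured stdout/stderr back into the parent job.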