Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.splitters.multi
import logging
import os
import shutil
from galaxy import model, util
from galaxy.util.getargspec import getfullargspec
log = logging.getLogger(__name__)
[docs]def do_split(job_wrapper):
    parent_job = job_wrapper.get_job()
    working_directory = os.path.abspath(job_wrapper.working_directory)
    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: split_inputs="input1,input2" shared_inputs="genome"
    # Designates inputs to be split or shared
    split_inputs = parallel_settings.get("split_inputs")
    if split_inputs is None:
        split_inputs = []
    else:
        split_inputs = [x.strip() for x in split_inputs.split(",")]
    shared_inputs = parallel_settings.get("shared_inputs")
    auto_shared_inputs = False
    if shared_inputs is None:
        shared_inputs = []
        auto_shared_inputs = True
    else:
        shared_inputs = [x.strip() for x in shared_inputs.split(",")]
    illegal_inputs = [x for x in shared_inputs if x in split_inputs]
    if len(illegal_inputs) > 0:
        raise Exception("Inputs have conflicting parallelism attributes: %s" % str(illegal_inputs))
    subdir_index = [0]  # use a list to get around Python 2.x lame closure support
    task_dirs = []
    def get_new_working_directory_name():
        dir = os.path.join(working_directory, 'task_%d' % subdir_index[0])
        subdir_index[0] = subdir_index[0] + 1
        if not os.path.exists(dir):
            os.makedirs(dir)
        task_dirs.append(dir)
        return dir
    # For things like paired end alignment, we need two inputs to be split. Since all inputs to all
    # derived subtasks need to be correlated, allow only one input type to be split
    # If shared_inputs are not specified, we assume all non-split inputs are shared inputs.
    # For any input we must consider if each input is None. With optional arguments, a data input could be set to optional
    type_to_input_map = {}
    for input in parent_job.input_datasets:
        if input.dataset is None:
            if input.name in shared_inputs:
                shared_inputs.remove(input.name)
            else:
                pass
        else:
            if input.name in split_inputs:
                type_to_input_map.setdefault(input.dataset.datatype, []).append(input.name)
            elif input.name in shared_inputs:
                pass  # pass original file name
            elif auto_shared_inputs:
                shared_inputs.append(input.name)
            else:
                log_error = "The input '%s' does not define a method for implementing parallelism" % str(input.name)
                log.exception(log_error)
                raise Exception(log_error)
    if len(type_to_input_map) > 1:
        log_error = "The multi splitter does not support splitting inputs of more than one type"
        log.error(log_error)
        raise Exception(log_error)
    # split the first one to build up the task directories
    input_datasets = []
    for input in parent_job.input_datasets:
        if input.name in split_inputs:
            this_input_files = job_wrapper.get_input_dataset_fnames(input.dataset)
            if len(this_input_files) > 1:
                log_error = "The input '%s' is composed of multiple files - splitting is not allowed" % str(input.name)
                log.error(log_error)
                raise Exception(log_error)
            input_datasets.append(input.dataset)
    input_type = next(iter(type_to_input_map.keys()))
    # DBTODO execute an external task to do the splitting, this should happen at refactor.
    # If the number of tasks is sufficiently high, we can use it to calculate job completion % and give a running status.
    try:
        input_type.split(input_datasets, get_new_working_directory_name, parallel_settings)
    except AttributeError:
        log_error = "The type '%s' does not define a method for splitting files" % str(input_type)
        log.error(log_error)
        raise
    log.debug('do_split created %d parts' % len(task_dirs))
    # next, after we know how many divisions there are, add the shared inputs via soft links
    for input in parent_job.input_datasets:
        if input and input.name in shared_inputs:
            names = job_wrapper.get_input_dataset_fnames(input.dataset)
            for dir in task_dirs:
                for file in names:
                    os.symlink(file, os.path.join(dir, os.path.basename(file)))
    tasks = []
    prepare_files = os.path.join(util.galaxy_directory(), 'extract_dataset_parts.sh') + ' %s'
    for dir in task_dirs:
        task = model.Task(parent_job, dir, prepare_files % dir)
        tasks.append(task)
    return tasks
[docs]def do_merge(job_wrapper, task_wrappers):
    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: merge_outputs="export" pickone_outputs="genomesize"
    # Designates outputs to be merged, or selected from as a representative
    merge_outputs = parallel_settings.get("merge_outputs")
    if merge_outputs is None:
        merge_outputs = []
    else:
        merge_outputs = [x.strip() for x in merge_outputs.split(",")]
    pickone_outputs = parallel_settings.get("pickone_outputs")
    if pickone_outputs is None:
        pickone_outputs = []
    else:
        pickone_outputs = [x.strip() for x in pickone_outputs.split(",")]
    illegal_outputs = [x for x in merge_outputs if x in pickone_outputs]
    if len(illegal_outputs) > 0:
        return ('Tool file error', 'Outputs have conflicting parallelism attributes: %s' % str(illegal_outputs))
    stdout = ''
    stderr = ''
    try:
        working_directory = job_wrapper.working_directory
        task_dirs = [os.path.join(working_directory, x) for x in os.listdir(working_directory) if x.startswith('task_')]
        assert task_dirs, "Should be at least one sub-task!"
        # TODO: Output datasets can be very complex. This doesn't handle metadata files
        outputs = job_wrapper.get_output_hdas_and_fnames()
        output_paths = job_wrapper.get_output_fnames()
        pickone_done = []
        task_dirs = [os.path.join(working_directory, x) for x in os.listdir(working_directory) if x.startswith('task_')]
        task_dirs.sort(key=lambda x: int(x.split('task_')[-1]))
        for index, output in enumerate(outputs):
            output_file_name = str(output_paths[index])  # Use false_path if set, else real path.
            base_output_name = os.path.basename(output_file_name)
            if output in merge_outputs:
                output_dataset = outputs[output][0]
                output_type = output_dataset.datatype
                output_files = [os.path.join(dir, base_output_name) for dir in task_dirs]
                # Just include those files f in the output list for which the
                # file f exists; some files may not exist if a task fails.
                output_files = [f for f in output_files if os.path.exists(f)]
                if output_files:
                    log.debug('files %s ' % output_files)
                    if len(output_files) < len(task_dirs):
                        log.debug('merging only %i out of expected %i files for %s'
                                  % (len(output_files), len(task_dirs), output_file_name))
                    # First two args to merge always output_files and path of dataset. More
                    # complicated merge methods may require more parameters. Set those up here.
                    extra_merge_arg_names = getfullargspec(output_type.merge).args[2:]
                    extra_merge_args = {}
                    if "output_dataset" in extra_merge_arg_names:
                        extra_merge_args["output_dataset"] = output_dataset
                    output_type.merge(output_files, output_file_name, **extra_merge_args)
                    log.debug('merge finished: %s' % output_file_name)
                else:
                    msg = 'nothing to merge for %s (expected %i files)' \
                          % (output_file_name, len(task_dirs))
                    log.debug(msg)
                    stderr += msg + "\n"
            elif output in pickone_outputs:
                # just pick one of them
                if output not in pickone_done:
                    task_file_name = os.path.join(task_dirs[0], base_output_name)
                    shutil.move(task_file_name, output_file_name)
                    pickone_done.append(output)
            else:
                log_error = "The output '%s' does not define a method for implementing parallelism" % output
                log.exception(log_error)
                raise Exception(log_error)
    except Exception as e:
        stdout = 'Error merging files'
        log.exception(stdout)
        stderr = util.unicodify(e)
    for tw in task_wrappers:
        # Prevent repetitive output, e.g. "Sequence File Aligned"x20
        # Eventually do a reduce for jobs that output "N reads mapped", combining all N for tasks.
        out = tw.get_task().stdout.strip()
        err = tw.get_task().stderr.strip()
        if len(out) > 0:
            stdout += "\n" + tw.working_directory + ':\n' + out
        if len(err) > 0:
            stderr += "\n" + tw.working_directory + ':\n' + err
    return (stdout, stderr)