Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.splitters.multi

import logging
import os
import shutil

from galaxy import (
    model,
    util,
)

# Module-level logger, named after this module; used by do_split and do_merge.
log = logging.getLogger(__name__)


def do_split(job_wrapper):
    """Split a job's inputs into per-task directories and build one task per directory.

    The job's parallelism attributes are read for two comma-separated lists of
    input names:

    * ``split_inputs`` -- inputs handed to their datatype's ``split`` method.
      All of them must share a single datatype and consist of a single file.
    * ``shared_inputs`` -- inputs symlinked unchanged into every task
      directory. When this attribute is absent, every input that has a dataset
      and is not split is treated as shared.

    Task directories are named ``task_<n>`` and are created under the job's
    (absolute) working directory by a callback passed to the datatype's
    ``split`` method.

    :param job_wrapper: object providing ``get_job()``, ``working_directory``,
        ``get_parallelism()`` and ``job_io.get_input_dataset_fnames()``.
    :returns: list of ``model.Task`` objects, one per created task directory.
    :raises Exception: if an input is listed as both split and shared, if an
        input is neither split nor shared, if split inputs have more than one
        datatype, if a split input is composed of several files, or if no job
        input matches ``split_inputs``.
    :raises AttributeError: re-raised when raised while calling the
        datatype's ``split``.
    """
    parent_job = job_wrapper.get_job()
    working_directory = os.path.abspath(job_wrapper.working_directory)
    parallel_settings = job_wrapper.get_parallelism().attributes

    # Syntax: split_inputs="input1,input2" shared_inputs="genome"
    # Designates inputs to be split or shared
    raw_split_inputs = parallel_settings.get("split_inputs")
    if raw_split_inputs is None:
        split_inputs = []
    else:
        split_inputs = [name.strip() for name in raw_split_inputs.split(",")]

    raw_shared_inputs = parallel_settings.get("shared_inputs")
    # Without an explicit shared_inputs attribute, all non-split inputs are shared.
    auto_shared_inputs = raw_shared_inputs is None
    if auto_shared_inputs:
        shared_inputs = []
    else:
        shared_inputs = [name.strip() for name in raw_shared_inputs.split(",")]

    illegal_inputs = [name for name in shared_inputs if name in split_inputs]
    if illegal_inputs:
        raise Exception(f"Inputs have conflicting parallelism attributes: {str(illegal_inputs)}")

    task_dirs = []

    def get_new_working_directory_name():
        # Callback handed to the datatype's split(): each call creates and
        # records the next task_<n> directory. The index is simply the number
        # of directories handed out so far.
        task_dir = os.path.join(working_directory, f"task_{len(task_dirs)}")
        os.makedirs(task_dir, exist_ok=True)
        task_dirs.append(task_dir)
        return task_dir

    # For things like paired end alignment, we need two inputs to be split. Since all inputs to all
    # derived subtasks need to be correlated, allow only one input type to be split
    # If shared_inputs are not specified, we assume all non-split inputs are shared inputs.
    # For any input we must consider if each input is None. With optional arguments, a data input
    # could be set to optional
    type_to_input_map = {}
    for input_assoc in parent_job.input_datasets:
        input_name = input_assoc.name
        if input_assoc.dataset is None:
            # An unset input has nothing to link, so drop it from the shared list.
            if input_name in shared_inputs:
                shared_inputs.remove(input_name)
            continue
        if input_name in split_inputs:
            type_to_input_map.setdefault(input_assoc.dataset.datatype, []).append(input_name)
        elif input_name in shared_inputs:
            pass  # pass original file name
        elif auto_shared_inputs:
            shared_inputs.append(input_name)
        else:
            log_error = f"The input '{str(input_name)}' does not define a method for implementing parallelism"
            # No exception is active here, so log.error (not log.exception) is the right call.
            log.error(log_error)
            raise Exception(log_error)

    if len(type_to_input_map) > 1:
        log_error = "The multi splitter does not support splitting inputs of more than one type"
        log.error(log_error)
        raise Exception(log_error)

    # split the first one to build up the task directories
    input_datasets = []
    for input_assoc in parent_job.input_datasets:
        if input_assoc.name in split_inputs:
            this_input_files = job_wrapper.job_io.get_input_dataset_fnames(input_assoc.dataset)
            if len(this_input_files) > 1:
                log_error = (
                    f"The input '{str(input_assoc.name)}' is composed of multiple files - splitting is not allowed"
                )
                log.error(log_error)
                raise Exception(log_error)
            input_datasets.append(input_assoc.dataset)

    if not type_to_input_map:
        # Previously this surfaced as a bare StopIteration from next(iter(...)).
        log_error = "The multi splitter found no input to split; check the split_inputs parallelism attribute"
        log.error(log_error)
        raise Exception(log_error)
    input_type = next(iter(type_to_input_map))

    # DBTODO execute an external task to do the splitting, this should happen at refactor.
    # If the number of tasks is sufficiently high, we can use it to calculate job completion % and
    # give a running status.
    try:
        input_type.split(input_datasets, get_new_working_directory_name, parallel_settings)
    except AttributeError:
        # NOTE(review): this also catches AttributeErrors raised inside an existing
        # split() implementation, so the message may be misleading in that case.
        log_error = f"The type '{str(input_type)}' does not define a method for splitting files"
        log.error(log_error)
        raise
    log.debug("do_split created %d parts", len(task_dirs))

    # next, after we know how many divisions there are, add the shared inputs via soft links
    for input_assoc in parent_job.input_datasets:
        if input_assoc and input_assoc.name in shared_inputs:
            shared_files = job_wrapper.job_io.get_input_dataset_fnames(input_assoc.dataset)
            for task_dir in task_dirs:
                for shared_file in shared_files:
                    os.symlink(shared_file, os.path.join(task_dir, os.path.basename(shared_file)))

    # Build the command directly instead of %-formatting a template, so a '%'
    # in the Galaxy directory path cannot be mistaken for a format specifier.
    prepare_script = os.path.join(util.galaxy_directory(), "extract_dataset_parts.sh")
    return [model.Task(parent_job, task_dir, f"'{prepare_script}' {task_dir}") for task_dir in task_dirs]


def do_merge(job_wrapper, task_wrappers):
    """Merge the outputs of the finished sub-tasks back into the job's outputs.

    The job's parallelism attributes are read for two comma-separated lists of
    output names:

    * ``merge_outputs`` -- outputs whose per-task files are combined by the
      output datatype's ``merge`` method, in numeric ``task_<n>`` order.
      Missing per-task files are skipped.
    * ``pickone_outputs`` -- outputs for which the file from the first task
      directory is moved into place.

    Errors raised while merging are not propagated: they are logged and
    reported through the returned stdout/stderr strings instead.

    :param job_wrapper: object providing ``get_parallelism()``,
        ``working_directory``, ``job_io.get_output_hdas_and_fnames()`` and
        ``job_io.get_output_fnames()``.
    :param task_wrappers: iterable of objects providing ``get_task()`` (with
        ``stdout``/``stderr`` attributes) and ``working_directory``.
    :returns: ``(stdout, stderr)`` tuple of strings. Each task's non-empty
        output is appended under a ``<working_directory>:`` header.
    """
    parallel_settings = job_wrapper.get_parallelism().attributes

    # Syntax: merge_outputs="export" pickone_outputs="genomesize"
    # Designates outputs to be merged, or selected from as a representative
    raw_merge_outputs = parallel_settings.get("merge_outputs")
    if raw_merge_outputs is None:
        merge_outputs = []
    else:
        merge_outputs = [name.strip() for name in raw_merge_outputs.split(",")]

    raw_pickone_outputs = parallel_settings.get("pickone_outputs")
    if raw_pickone_outputs is None:
        pickone_outputs = []
    else:
        pickone_outputs = [name.strip() for name in raw_pickone_outputs.split(",")]

    illegal_outputs = [name for name in merge_outputs if name in pickone_outputs]
    if illegal_outputs:
        return ("Tool file error", f"Outputs have conflicting parallelism attributes: {str(illegal_outputs)}")

    stdout = ""
    stderr = ""

    try:
        working_directory = job_wrapper.working_directory
        task_dirs = [
            os.path.join(working_directory, entry)
            for entry in os.listdir(working_directory)
            if entry.startswith("task_")
        ]
        # Explicit raise rather than assert, so the check survives `python -O`.
        if not task_dirs:
            raise Exception("Should be at least one sub-task!")
        # Order numerically by task index so task_10 sorts after task_2.
        task_dirs.sort(key=lambda path: int(path.split("task_")[-1]))

        # TODO: Output datasets can be very complex. This doesn't handle metadata files
        outputs = job_wrapper.job_io.get_output_hdas_and_fnames()
        output_paths = job_wrapper.job_io.get_output_fnames()
        pickone_done = set()
        # NOTE(review): relies on `outputs` iterating in the same order as
        # `output_paths` -- confirm against the job_io implementation.
        for index, output in enumerate(outputs):
            output_file_name = str(output_paths[index])  # Use false_path if set, else real path.
            base_output_name = os.path.basename(output_file_name)
            if output in merge_outputs:
                output_type = outputs[output][0].datatype
                # Just include those files f in the output list for which the
                # file f exists; some files may not exist if a task fails.
                output_files = [
                    candidate
                    for candidate in (os.path.join(task_dir, base_output_name) for task_dir in task_dirs)
                    if os.path.exists(candidate)
                ]
                if output_files:
                    log.debug("files %s ", output_files)
                    if len(output_files) < len(task_dirs):
                        log.debug(
                            "merging only %i out of expected %i files for %s",
                            len(output_files),
                            len(task_dirs),
                            output_file_name,
                        )
                    output_type.merge(output_files, output_file_name)
                    log.debug("merge finished: %s", output_file_name)
                else:
                    msg = f"nothing to merge for {output_file_name} (expected {len(task_dirs)} files)"
                    log.debug(msg)
                    stderr += f"{msg}\n"
            elif output in pickone_outputs:
                # just pick one of them
                if output not in pickone_done:
                    task_file_name = os.path.join(task_dirs[0], base_output_name)
                    shutil.move(task_file_name, output_file_name)
                    pickone_done.add(output)
            else:
                log_error = f"The output '{output}' does not define a method for implementing parallelism"
                # No exception is active yet, so log.error (not log.exception) is the right
                # call; the handler below logs the traceback once it is raised.
                log.error(log_error)
                raise Exception(log_error)
    except Exception as e:
        stdout = "Error merging files"
        log.exception(stdout)
        stderr = util.unicodify(e)

    for tw in task_wrappers:
        # Prevent repetitive output, e.g. "Sequence File Aligned"x20
        # Eventually do a reduce for jobs that output "N reads mapped", combining all N for tasks.
        task = tw.get_task()
        # Treat a missing (None) stream as empty instead of failing on .strip().
        out = (task.stdout or "").strip()
        err = (task.stderr or "").strip()
        if out:
            stdout += f"\n{tw.working_directory}:\n{out}"
        if err:
            stderr += f"\n{tw.working_directory}:\n{err}"
    return (stdout, stderr)