Source code for galaxy.jobs.splitters.multi

import logging
import os
import shutil

from galaxy import (
    model,
    util,
)

log = logging.getLogger(__name__)


def do_split(job_wrapper):
    parent_job = job_wrapper.get_job()
    working_directory = os.path.abspath(job_wrapper.working_directory)

    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: split_inputs="input1,input2"
    #         shared_inputs="genome"
    # Designates inputs to be split or shared
    split_inputs = parallel_settings.get("split_inputs")
    if split_inputs is None:
        split_inputs = []
    else:
        split_inputs = [x.strip() for x in split_inputs.split(",")]

    shared_inputs = parallel_settings.get("shared_inputs")
    auto_shared_inputs = False
    if shared_inputs is None:
        shared_inputs = []
        auto_shared_inputs = True
    else:
        shared_inputs = [x.strip() for x in shared_inputs.split(",")]
    illegal_inputs = [x for x in shared_inputs if x in split_inputs]
    if len(illegal_inputs) > 0:
        raise Exception(f"Inputs have conflicting parallelism attributes: {str(illegal_inputs)}")

    subdir_index = [0]  # use a list to get around Python 2.x lame closure support
    task_dirs = []

    def get_new_working_directory_name():
        dir = os.path.join(working_directory, "task_%d" % subdir_index[0])
        subdir_index[0] = subdir_index[0] + 1
        if not os.path.exists(dir):
            os.makedirs(dir)
        task_dirs.append(dir)
        return dir

    # For things like paired end alignment, we need two inputs to be split. Since all inputs to all
    # derived subtasks need to be correlated, allow only one input type to be split.
    # If shared_inputs are not specified, we assume all non-split inputs are shared inputs.
    # Any input may be None: with optional arguments, a data input can be left unset.
    type_to_input_map = {}
    for input in parent_job.input_datasets:
        if input.dataset is None:
            if input.name in shared_inputs:
                shared_inputs.remove(input.name)
            else:
                pass
        else:
            if input.name in split_inputs:
                type_to_input_map.setdefault(input.dataset.datatype, []).append(input.name)
            elif input.name in shared_inputs:
                pass  # pass original file name
            elif auto_shared_inputs:
                shared_inputs.append(input.name)
            else:
                log_error = f"The input '{str(input.name)}' does not define a method for implementing parallelism"
                log.exception(log_error)
                raise Exception(log_error)

    if len(type_to_input_map) > 1:
        log_error = "The multi splitter does not support splitting inputs of more than one type"
        log.error(log_error)
        raise Exception(log_error)

    # split the first one to build up the task directories
    input_datasets = []
    for input in parent_job.input_datasets:
        if input.name in split_inputs:
            this_input_files = job_wrapper.job_io.get_input_dataset_fnames(input.dataset)
            if len(this_input_files) > 1:
                log_error = f"The input '{str(input.name)}' is composed of multiple files - splitting is not allowed"
                log.error(log_error)
                raise Exception(log_error)
            input_datasets.append(input.dataset)

    input_type = next(iter(type_to_input_map.keys()))
    # DBTODO execute an external task to do the splitting, this should happen at refactor.
    # If the number of tasks is sufficiently high, we can use it to calculate job completion % and give a running status.
    try:
        input_type.split(input_datasets, get_new_working_directory_name, parallel_settings)
    except AttributeError:
        log_error = f"The type '{str(input_type)}' does not define a method for splitting files"
        log.error(log_error)
        raise
    log.debug("do_split created %d parts" % len(task_dirs))

    # next, after we know how many divisions there are, add the shared inputs via soft links
    for input in parent_job.input_datasets:
        if input and input.name in shared_inputs:
            names = job_wrapper.job_io.get_input_dataset_fnames(input.dataset)
            for dir in task_dirs:
                for file in names:
                    os.symlink(file, os.path.join(dir, os.path.basename(file)))

    tasks = []
    prepare_files = f"'{os.path.join(util.galaxy_directory(), 'extract_dataset_parts.sh')}' %s"
    for dir in task_dirs:
        task = model.Task(parent_job, dir, prepare_files % dir)
        tasks.append(task)
    return tasks
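
A minimal sketch (not part of this module) of how the comma-separated parallelism attributes referenced above, e.g. split_inputs="input1,input2" shared_inputs="genome", are parsed and checked for conflicts; the helper name parse_parallelism_inputs is illustrative only and mirrors the logic at the top of do_split.

# Illustrative only: mirrors do_split's parsing of the parallelism attributes.
def parse_parallelism_inputs(parallel_settings):
    """Turn comma-separated attribute strings into stripped name lists."""

    def as_list(value):
        return [] if value is None else [x.strip() for x in value.split(",")]

    split_inputs = as_list(parallel_settings.get("split_inputs"))
    shared_inputs = as_list(parallel_settings.get("shared_inputs"))
    conflicts = [x for x in shared_inputs if x in split_inputs]
    if conflicts:
        raise Exception(f"Inputs have conflicting parallelism attributes: {conflicts}")
    return split_inputs, shared_inputs


# Example: split_inputs="input1,input2" shared_inputs="genome"
print(parse_parallelism_inputs({"split_inputs": "input1,input2", "shared_inputs": "genome"}))
# (['input1', 'input2'], ['genome'])
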
def do_merge(job_wrapper, task_wrappers):
    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: merge_outputs="export" pickone_outputs="genomesize"
    # Designates outputs to be merged, or selected from as a representative
    merge_outputs = parallel_settings.get("merge_outputs")
    if merge_outputs is None:
        merge_outputs = []
    else:
        merge_outputs = [x.strip() for x in merge_outputs.split(",")]
    pickone_outputs = parallel_settings.get("pickone_outputs")
    if pickone_outputs is None:
        pickone_outputs = []
    else:
        pickone_outputs = [x.strip() for x in pickone_outputs.split(",")]

    illegal_outputs = [x for x in merge_outputs if x in pickone_outputs]
    if len(illegal_outputs) > 0:
        return ("Tool file error", f"Outputs have conflicting parallelism attributes: {str(illegal_outputs)}")

    stdout = ""
    stderr = ""
    try:
        working_directory = job_wrapper.working_directory
        task_dirs = [
            os.path.join(working_directory, x) for x in os.listdir(working_directory) if x.startswith("task_")
        ]
        assert task_dirs, "Should be at least one sub-task!"
        task_dirs.sort(key=lambda x: int(x.split("task_")[-1]))
        # TODO: Output datasets can be very complex. This doesn't handle metadata files
        outputs = job_wrapper.job_io.get_output_hdas_and_fnames()
        output_paths = job_wrapper.job_io.get_output_fnames()
        pickone_done = []
        for index, output in enumerate(outputs):
            output_file_name = str(output_paths[index])  # Use false_path if set, else real path.
            base_output_name = os.path.basename(output_file_name)
            if output in merge_outputs:
                output_type = outputs[output][0].datatype
                output_files = [os.path.join(dir, base_output_name) for dir in task_dirs]
                # Just include those files f in the output list for which the
                # file f exists; some files may not exist if a task fails.
                output_files = [f for f in output_files if os.path.exists(f)]
                if output_files:
                    log.debug(f"files {output_files}")
                    if len(output_files) < len(task_dirs):
                        log.debug(
                            "merging only %i out of expected %i files for %s"
                            % (len(output_files), len(task_dirs), output_file_name)
                        )
                    output_type.merge(output_files, output_file_name)
                    log.debug(f"merge finished: {output_file_name}")
                else:
                    msg = "nothing to merge for %s (expected %i files)" % (output_file_name, len(task_dirs))
                    log.debug(msg)
                    stderr += f"{msg}\n"
            elif output in pickone_outputs:
                # just pick one of them
                if output not in pickone_done:
                    task_file_name = os.path.join(task_dirs[0], base_output_name)
                    shutil.move(task_file_name, output_file_name)
                    pickone_done.append(output)
            else:
                log_error = f"The output '{output}' does not define a method for implementing parallelism"
                log.exception(log_error)
                raise Exception(log_error)
    except Exception as e:
        stdout = "Error merging files"
        log.exception(stdout)
        stderr = util.unicodify(e)

    for tw in task_wrappers:
        # Prevent repetitive output, e.g. "Sequence File Aligned"x20
        # Eventually do a reduce for jobs that output "N reads mapped", combining all N for tasks.
        out = tw.get_task().stdout.strip()
        err = tw.get_task().stderr.strip()
        if len(out) > 0:
            stdout += f"\n{tw.working_directory}:\n{out}"
        if len(err) > 0:
            stderr += f"\n{tw.working_directory}:\n{err}"
    return (stdout, stderr)
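
A minimal sketch (not part of this module) showing why do_merge sorts task directories by their numeric suffix rather than lexicographically: with ten or more tasks, plain string sorting would place "task_10" before "task_2" and the parts would be merged out of order.

# Illustrative only: compares lexicographic order with the numeric sort used in do_merge.
task_dirs = ["task_0", "task_1", "task_2", "task_10"]

print(sorted(task_dirs))
# ['task_0', 'task_1', 'task_10', 'task_2']  <- lexicographic order, wrong for merging

task_dirs.sort(key=lambda x: int(x.split("task_")[-1]))
print(task_dirs)
# ['task_0', 'task_1', 'task_2', 'task_10']  <- parts back in creation order
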