Source code for galaxy.jobs.splitters.multi

import logging
import os
import shutil

from galaxy import model, util
from galaxy.util.getargspec import getfullargspec


log = logging.getLogger(__name__)


def do_split(job_wrapper):
    """Split the parent job's inputs into per-task working directories.

    Reads the tool's parallelism attributes to decide which inputs are
    *split* (each part goes to its own sub-task) and which are *shared*
    (symlinked whole into every task directory). Exactly one datatype may
    be split, because parts of all split inputs must stay correlated
    (e.g. paired-end reads).

    :param job_wrapper: wrapper for the parent job; provides the job model
        object, the working directory, parallelism settings and dataset
        file names.
    :returns: list of ``model.Task`` objects, one per created ``task_N``
        subdirectory.
    :raises Exception: on conflicting split/shared declarations, inputs
        with no parallelism method, more than one split datatype, or a
        multi-file split input.
    """
    parent_job = job_wrapper.get_job()
    working_directory = os.path.abspath(job_wrapper.working_directory)

    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: split_inputs="input1,input2" shared_inputs="genome"
    # Designates inputs to be split or shared.
    split_inputs = parallel_settings.get("split_inputs")
    if split_inputs is None:
        split_inputs = []
    else:
        split_inputs = [x.strip() for x in split_inputs.split(",")]

    shared_inputs = parallel_settings.get("shared_inputs")
    auto_shared_inputs = False
    if shared_inputs is None:
        # No explicit declaration: treat every non-split input as shared.
        shared_inputs = []
        auto_shared_inputs = True
    else:
        shared_inputs = [x.strip() for x in shared_inputs.split(",")]

    illegal_inputs = [x for x in shared_inputs if x in split_inputs]
    if illegal_inputs:
        raise Exception("Inputs have conflicting parallelism attributes: %s" % str(illegal_inputs))

    subdir_index = 0
    task_dirs = []

    def get_new_working_directory_name():
        # Allocate the next task_N directory; called once per split part
        # by the datatype's split() implementation.
        nonlocal subdir_index
        dir = os.path.join(working_directory, 'task_%d' % subdir_index)
        subdir_index += 1
        if not os.path.exists(dir):
            os.makedirs(dir)
        task_dirs.append(dir)
        return dir

    # For things like paired end alignment, we need two inputs to be split.
    # Since all inputs to all derived subtasks need to be correlated, allow
    # only one input type to be split.
    # If shared_inputs are not specified, we assume all non-split inputs are
    # shared inputs. Each input may be None (optional data inputs left unset),
    # in which case it is dropped from consideration.
    type_to_input_map = {}
    for input in parent_job.input_datasets:
        if input.dataset is None:
            # Optional input left unset: nothing to split or share.
            if input.name in shared_inputs:
                shared_inputs.remove(input.name)
        elif input.name in split_inputs:
            type_to_input_map.setdefault(input.dataset.datatype, []).append(input.name)
        elif input.name in shared_inputs:
            pass  # pass original file name
        elif auto_shared_inputs:
            shared_inputs.append(input.name)
        else:
            log_error = "The input '%s' does not define a method for implementing parallelism" % str(input.name)
            # Not inside an except block, so log.error (log.exception would
            # emit a spurious "NoneType: None" traceback).
            log.error(log_error)
            raise Exception(log_error)

    if len(type_to_input_map) > 1:
        log_error = "The multi splitter does not support splitting inputs of more than one type"
        log.error(log_error)
        raise Exception(log_error)

    # Split the first one to build up the task directories.
    input_datasets = []
    for input in parent_job.input_datasets:
        if input.name in split_inputs:
            this_input_files = job_wrapper.get_input_dataset_fnames(input.dataset)
            if len(this_input_files) > 1:
                log_error = "The input '%s' is composed of multiple files - splitting is not allowed" % str(input.name)
                log.error(log_error)
                raise Exception(log_error)
            input_datasets.append(input.dataset)

    # NOTE(review): raises StopIteration if no split inputs matched —
    # presumably callers guarantee at least one; confirm.
    input_type = next(iter(type_to_input_map.keys()))
    # DBTODO execute an external task to do the splitting, this should happen at refactor.
    # If the number of tasks is sufficiently high, we can use it to calculate
    # job completion % and give a running status.
    try:
        input_type.split(input_datasets, get_new_working_directory_name, parallel_settings)
    except AttributeError:
        log_error = "The type '%s' does not define a method for splitting files" % str(input_type)
        log.error(log_error)
        raise
    log.debug('do_split created %d parts' % len(task_dirs))

    # Next, after we know how many divisions there are, add the shared
    # inputs via soft links into every task directory.
    for input in parent_job.input_datasets:
        if input and input.name in shared_inputs:
            names = job_wrapper.get_input_dataset_fnames(input.dataset)
            for dir in task_dirs:
                for file in names:
                    os.symlink(file, os.path.join(dir, os.path.basename(file)))

    # One Task per directory; the prepare command unpacks dataset parts
    # inside the task's working directory before the tool runs.
    tasks = []
    prepare_files = os.path.join(util.galaxy_directory(), 'extract_dataset_parts.sh') + ' %s'
    for dir in task_dirs:
        task = model.Task(parent_job, dir, prepare_files % dir)
        tasks.append(task)
    return tasks
def do_merge(job_wrapper, task_wrappers):
    """Combine per-task outputs back into the parent job's outputs.

    Outputs listed in ``merge_outputs`` are merged via the output
    datatype's ``merge()``; outputs listed in ``pickone_outputs`` take the
    file from the first task as representative. Task stdout/stderr are
    concatenated (prefixed with the task directory) onto the returned
    streams.

    :param job_wrapper: wrapper for the parent job.
    :param task_wrappers: wrappers for the completed sub-tasks.
    :returns: ``(stdout, stderr)`` tuple; on a configuration conflict the
        tuple carries the error text instead.
    """
    parallel_settings = job_wrapper.get_parallelism().attributes
    # Syntax: merge_outputs="export" pickone_outputs="genomesize"
    # Designates outputs to be merged, or selected from as a representative.
    merge_outputs = parallel_settings.get("merge_outputs")
    if merge_outputs is None:
        merge_outputs = []
    else:
        merge_outputs = [x.strip() for x in merge_outputs.split(",")]
    pickone_outputs = parallel_settings.get("pickone_outputs")
    if pickone_outputs is None:
        pickone_outputs = []
    else:
        pickone_outputs = [x.strip() for x in pickone_outputs.split(",")]

    illegal_outputs = [x for x in merge_outputs if x in pickone_outputs]
    if illegal_outputs:
        return ('Tool file error', 'Outputs have conflicting parallelism attributes: %s' % str(illegal_outputs))

    stdout = ''
    stderr = ''
    try:
        working_directory = job_wrapper.working_directory
        # Compute the task directory list once (the original recomputed it),
        # sorted numerically so task_10 follows task_9.
        task_dirs = [os.path.join(working_directory, x) for x in os.listdir(working_directory) if x.startswith('task_')]
        assert task_dirs, "Should be at least one sub-task!"
        task_dirs.sort(key=lambda x: int(x.split('task_')[-1]))
        # TODO: Output datasets can be very complex. This doesn't handle metadata files
        outputs = job_wrapper.get_output_hdas_and_fnames()
        output_paths = job_wrapper.get_output_fnames()
        pickone_done = []
        for index, output in enumerate(outputs):
            output_file_name = str(output_paths[index])  # Use false_path if set, else real path.
            base_output_name = os.path.basename(output_file_name)
            if output in merge_outputs:
                output_dataset = outputs[output][0]
                output_type = output_dataset.datatype
                output_files = [os.path.join(dir, base_output_name) for dir in task_dirs]
                # Just include those files f in the output list for which the
                # file f exists; some files may not exist if a task fails.
                output_files = [f for f in output_files if os.path.exists(f)]
                if output_files:
                    log.debug('files %s ' % output_files)
                    if len(output_files) < len(task_dirs):
                        log.debug('merging only %i out of expected %i files for %s'
                                  % (len(output_files), len(task_dirs), output_file_name))
                    # First two args to merge always output_files and path of
                    # dataset. More complicated merge methods may require more
                    # parameters. Set those up here.
                    extra_merge_arg_names = getfullargspec(output_type.merge).args[2:]
                    extra_merge_args = {}
                    if "output_dataset" in extra_merge_arg_names:
                        extra_merge_args["output_dataset"] = output_dataset
                    output_type.merge(output_files, output_file_name, **extra_merge_args)
                    log.debug('merge finished: %s' % output_file_name)
                else:
                    msg = 'nothing to merge for %s (expected %i files)' \
                          % (output_file_name, len(task_dirs))
                    log.debug(msg)
                    stderr += msg + "\n"
            elif output in pickone_outputs:
                # Just pick one of them.
                if output not in pickone_done:
                    task_file_name = os.path.join(task_dirs[0], base_output_name)
                    shutil.move(task_file_name, output_file_name)
                    pickone_done.append(output)
            else:
                log_error = "The output '%s' does not define a method for implementing parallelism" % output
                # Not inside an except block, so log.error (log.exception
                # would emit a spurious "NoneType: None" traceback).
                log.error(log_error)
                raise Exception(log_error)
    except Exception as e:
        stdout = 'Error merging files'
        log.exception(stdout)
        stderr = util.unicodify(e)

    for tw in task_wrappers:
        # Prevent repetitive output, e.g. "Sequence File Aligned"x20
        # Eventually do a reduce for jobs that output "N reads mapped",
        # combining all N for tasks.
        out = tw.get_task().stdout.strip()
        err = tw.get_task().stderr.strip()
        if len(out) > 0:
            stdout += "\n" + tw.working_directory + ':\n' + out
        if len(err) > 0:
            stderr += "\n" + tw.working_directory + ':\n' + err
    return (stdout, stderr)