Source code for galaxy.jobs.deferred.data_transfer

"""
Module for managing data transfer jobs.
"""
import copy
import logging
import re
import shutil

from sqlalchemy import and_

from galaxy.datatypes import sniff
from galaxy.jobs.actions.post import ActionBox
from galaxy.jobs.deferred import FakeTrans
from galaxy.tools.parameters import visit_input_values
from galaxy.tools.parameters.basic import DataToolParameter
from galaxy.util.odict import odict
from galaxy.workflow.modules import module_factory


log = logging.getLogger(__name__)

__all__ = ('DataTransfer', )


class DataTransfer(object):
    check_interval = 15
    dataset_name_re = re.compile(r'(dataset\d+)_(name)')
    dataset_datatype_re = re.compile(r'(dataset\d+)_(datatype)')
    def __init__(self, app):
        self.app = app
        self.sa_session = app.model.context.current
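    # The two methods below are abstract hooks: concrete transfer plugins (for
    # example the manual_data_transfer plugin referenced in run_job below) are
    # presumably expected to override create_job() and check_job() -- an
    # assumption based on the "Unimplemented Method" exceptions raised here.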
    def create_job(self, trans, **kwd):
        raise Exception("Unimplemented Method")
    def check_job(self, job):
        raise Exception("Unimplemented Method")
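    # Overview of run_job(): it handles two deferred job types, distinguished by
    # job.params['type']:
    #   * 'init_transfer'   - create TransferJob records via the transfer manager
    #                         (http/https or scp) and mark the relevant
    #                         SampleDatasets as IN_QUEUE.
    #   * 'finish_transfer' - move the downloaded file into a new LibraryDataset /
    #                         LibraryDatasetDatasetAssociation, set metadata, and
    #                         then either run the sample's workflow or copy the
    #                         dataset into the provided history.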
    def run_job(self, job):
        if job.params['type'] == 'init_transfer':
            # TODO: don't create new downloads on restart.
            if job.params['protocol'] in ['http', 'https']:
                results = []
                for result in job.params['results'].values():
                    result['transfer_job'] = self.app.transfer_manager.new(protocol=job.params['protocol'],
                                                                           name=result['name'],
                                                                           datatype=result['datatype'],
                                                                           url=result['url'])
                    results.append(result)
            elif job.params['protocol'] == 'scp':
                results = []
                result = {}
                sample_datasets_dict = job.params['sample_datasets_dict']
                # sample_datasets_dict looks something like the following.  The outer dictionary keys are SampleDataset ids.
                # {'7': {'status': 'Not started', 'name': '3.bed', 'file_path': '/tmp/library/3.bed', 'sample_id': 7,
                #        'external_service_id': 2, 'error_msg': '', 'size': '8.0K'}}
                for sample_dataset_id, sample_dataset_info_dict in sample_datasets_dict.items():
                    result = {}
                    result['transfer_job'] = self.app.transfer_manager.new(protocol=job.params['protocol'],
                                                                           host=job.params['host'],
                                                                           user_name=job.params['user_name'],
                                                                           password=job.params['password'],
                                                                           sample_dataset_id=sample_dataset_id,
                                                                           status=sample_dataset_info_dict['status'],
                                                                           name=sample_dataset_info_dict['name'],
                                                                           file_path=sample_dataset_info_dict['file_path'],
                                                                           sample_id=sample_dataset_info_dict['sample_id'],
                                                                           external_service_id=sample_dataset_info_dict['external_service_id'],
                                                                           error_msg=sample_dataset_info_dict['error_msg'],
                                                                           size=sample_dataset_info_dict['size'])
                    results.append(result)
            self.app.transfer_manager.run([r['transfer_job'] for r in results])
            for result in results:
                transfer_job = result.pop('transfer_job')
                self.create_job(None,
                                transfer_job_id=transfer_job.id,
                                result=transfer_job.params,
                                sample_id=job.params['sample_id'])
                # Update the state of the relevant SampleDataset
                new_status = self.app.model.SampleDataset.transfer_status.IN_QUEUE
                self._update_sample_dataset_status(protocol=job.params['protocol'],
                                                   sample_id=job.params['sample_id'],
                                                   result_dict=transfer_job.params,
                                                   new_status=new_status,
                                                   error_msg='')
            job.state = self.app.model.DeferredJob.states.OK
            self.sa_session.add(job)
            self.sa_session.flush()
            # TODO: Error handling: failure executing, or errors returned from the manager
        if job.params['type'] == 'finish_transfer':
            protocol = job.params['protocol']
            # Update the state of the relevant SampleDataset
            new_status = self.app.model.SampleDataset.transfer_status.ADD_TO_LIBRARY
            if protocol in ['http', 'https']:
                result_dict = job.params['result']
                library_dataset_name = result_dict['name']
                extension = result_dict['datatype']
            elif protocol in ['scp']:
                # In this case, job.params will be a dictionary that contains a key named 'result'.  The value
                # of the result key is a dictionary that looks something like:
                # {'sample_dataset_id': '8', 'status': 'Not started', 'protocol': 'scp', 'name': '3.bed',
                #  'file_path': '/data/library/3.bed', 'host': '127.0.0.1', 'sample_id': 8, 'external_service_id': 2,
                #  'local_path': '/tmp/kjl2Ss4', 'password': 'galaxy', 'user_name': 'gvk', 'error_msg': '', 'size': '8.0K'}
                try:
                    tj = self.sa_session.query(self.app.model.TransferJob).get(int(job.params['transfer_job_id']))
                    result_dict = tj.params
                    result_dict['local_path'] = tj.path
                except Exception as e:
                    log.error("Updated transfer result unavailable, using old result.  Error was: %s" % str(e))
                    result_dict = job.params['result']
                library_dataset_name = result_dict['name']
                # Determine the data format (see the relevant TODO item in the manual_data_transfer plugin).
                extension = sniff.guess_ext(result_dict['local_path'], sniff_order=self.app.datatypes_registry.sniff_order)
            self._update_sample_dataset_status(protocol=job.params['protocol'],
                                               sample_id=int(job.params['sample_id']),
                                               result_dict=result_dict,
                                               new_status=new_status,
                                               error_msg='')
            sample = self.sa_session.query(self.app.model.Sample).get(int(job.params['sample_id']))
            ld = self.app.model.LibraryDataset(folder=sample.folder, name=library_dataset_name)
            self.sa_session.add(ld)
            self.sa_session.flush()
            self.app.security_agent.copy_library_permissions(FakeTrans(self.app), sample.folder, ld)
            ldda = self.app.model.LibraryDatasetDatasetAssociation(name=library_dataset_name,
                                                                   extension=extension,
                                                                   dbkey='?',
                                                                   library_dataset=ld,
                                                                   create_dataset=True,
                                                                   sa_session=self.sa_session)
            ldda.message = 'Transferred by the Data Transfer Plugin'
            self.sa_session.add(ldda)
            self.sa_session.flush()
            ldda.state = ldda.states.QUEUED  # flushed in the set property
            ld.library_dataset_dataset_association_id = ldda.id
            self.sa_session.add(ld)
            self.sa_session.flush()
            try:
                # Move the dataset from its temporary location
                shutil.move(job.transfer_job.path, ldda.file_name)
                ldda.init_meta()
                for name, spec in ldda.metadata.spec.items():
                    if name not in ['name', 'info', 'dbkey', 'base_name']:
                        if spec.get('default'):
                            setattr(ldda.metadata, name, spec.unwrap(spec.get('default')))
                self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute(
                    self.app.datatypes_registry.set_external_metadata_tool,
                    FakeTrans(self.app, history=sample.history, user=sample.request.user),
                    incoming={'input1': ldda})
                ldda.state = ldda.states.OK
                # TODO: not sure if this flush is necessary
                self.sa_session.add(ldda)
                self.sa_session.flush()
            except Exception as e:
                log.exception('Failure preparing library dataset for finished transfer job (id: %s) via deferred job (id: %s):'
                              % (str(job.transfer_job.id), str(job.id)))
                ldda.state = ldda.states.ERROR
            if sample.workflow:
                log.debug("\n\nLogging sample mappings as: %s" % sample.workflow['mappings'])
                log.debug("job.params: %s" % job.params)
                # We have a workflow.  Update all mappings to ldda's, and when the final one is done
                # execute_workflow with either the provided history, or a new one.
                sub_done = True
                rep_done = False
                for k, v in sample.workflow['mappings'].items():
                    if 'hda' not in v and v['ds_tag'].startswith('hi|'):
                        sample.workflow['mappings'][k]['hda'] = self.app.security.decode_id(v['ds_tag'][3:])
                for key, value in sample.workflow['mappings'].items():
                    if 'url' in value and value['url'] == job.params['result']['url']:
                        # DBTODO Make sure all ds| mappings get the URL of the dataset, for linking to later.
                        # If this dataset maps to what we just finished, update the ldda id in the sample.
                        sample.workflow['mappings'][key]['ldda'] = ldda.id
                        rep_done = True
                    # DBTODO replace the hi| mappings with the hda here.  Just rip off the first three chars.
                    elif 'ldda' not in value and 'hda' not in value:
                        # We're not done if some mappings still don't have ldda or hda mappings.
                        sub_done = False
                if sub_done and rep_done:
                    if not sample.history:
                        new_history = self.app.model.History(name="New History From %s" % sample.name, user=sample.request.user)
                        self.sa_session.add(new_history)
                        sample.history = new_history
                        self.sa_session.flush()
                    self._execute_workflow(sample)
                # Check the workflow for substitution done-ness
                self.sa_session.add(sample)
                self.sa_session.flush()
            elif sample.history:
                # We don't have a workflow, but a history was provided.
                # No processing, go ahead and chunk everything in the history.
                if ldda.dataset.state in ['new', 'upload', 'queued', 'running', 'empty', 'discarded']:
                    log.error("Cannot import dataset '%s' to user history since its state is '%s'." % (ldda.name, ldda.dataset.state))
                elif ldda.dataset.state in ['ok', 'error']:
                    ldda.to_history_dataset_association(target_history=sample.history, add_to_history=True)
            # Finished
            job.state = self.app.model.DeferredJob.states.OK
            self.sa_session.add(job)
            self.sa_session.flush()
            # Update the state of the relevant SampleDataset
            new_status = self.app.model.SampleDataset.transfer_status.COMPLETE
            self._update_sample_dataset_status(protocol=job.params['protocol'],
                                               sample_id=int(job.params['sample_id']),
                                               result_dict=job.params['result'],
                                               new_status=new_status,
                                               error_msg='')
            if sample.datasets and not sample.untransferred_dataset_files:
                # Update the state of the sample to the sample's request type's final state.
                new_state = sample.request.type.final_sample_state
                self._update_sample_state(sample.id, new_state)
                # Update the state of the request, if possible
                self._update_request_state(sample.request.id)
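    # The private helpers below record state transitions on SampleDataset, Sample
    # and Request objects, and run the sample's workflow once every mapped input
    # has been transferred.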
    def _missing_params(self, params, required_params):
        missing_params = [x for x in required_params if x not in params]
        if missing_params:
            log.error('Job parameters missing required keys: %s' % ', '.join(missing_params))
            return True
        return False

    def _update_sample_dataset_status(self, protocol, sample_id, result_dict, new_status, error_msg=None):
        # result_dict looks something like:
        # {'url': '127.0.0.1/data/filtered_subreads.fa', 'name': 'Filtered reads'}
        # TODO: error checking on valid new_status value
        if protocol in ['http', 'https']:
            sample_dataset = self.sa_session.query(self.app.model.SampleDataset) \
                .filter(and_(self.app.model.SampleDataset.table.c.sample_id == sample_id,
                             self.app.model.SampleDataset.table.c.name == result_dict['name'],
                             self.app.model.SampleDataset.table.c.file_path == result_dict['url'])) \
                .first()
        elif protocol in ['scp']:
            sample_dataset = self.sa_session.query(self.app.model.SampleDataset).get(int(result_dict['sample_dataset_id']))
        sample_dataset.status = new_status
        sample_dataset.error_msg = error_msg
        self.sa_session.add(sample_dataset)
        self.sa_session.flush()

    def _update_sample_state(self, sample_id, new_state, comment=None):
        sample = self.sa_session.query(self.app.model.Sample).get(sample_id)
        if comment is None:
            comment = 'Sample state set to %s' % str(new_state)
        event = self.app.model.SampleEvent(sample, new_state, comment)
        self.sa_session.add(event)
        self.sa_session.flush()

    def _update_request_state(self, request_id):
        request = self.sa_session.query(self.app.model.Request).get(request_id)
        # Make sure all the samples of the current request have the same state
        common_state = request.samples_have_common_state
        if not common_state:
            # If the current request state is complete and one of its samples moved from
            # the final sample state, then move the request state to In-progress
            if request.is_complete:
                message = "At least 1 sample state moved from the final sample state, so now the request's state is (%s)" % request.states.SUBMITTED
                event = self.app.model.RequestEvent(request, request.states.SUBMITTED, message)
                self.sa_session.add(event)
                self.sa_session.flush()
        else:
            request_type_state = request.type.final_sample_state
            if common_state.id == request_type_state.id:
                # Since all the samples are in the final state, change the request state to 'Complete'
                comment = "All samples of this sequencing request are in the final sample state (%s). " % request_type_state.name
                state = request.states.COMPLETE
            else:
                comment = "All samples of this sequencing request are in the (%s) sample state. " % common_state.name
                state = request.states.SUBMITTED
            event = self.app.model.RequestEvent(request, state, comment)
            self.sa_session.add(event)
            self.sa_session.flush()
            # TODO: handle email notification if it is configured to be sent when the samples are in this state.

    def _execute_workflow(self, sample):
        for key, value in sample.workflow['mappings'].items():
            if 'hda' not in value and 'ldda' in value:
                # If HDA is already here, it's an external input, we're not copying anything.
                ldda = self.sa_session.query(self.app.model.LibraryDatasetDatasetAssociation).get(value['ldda'])
                if ldda.dataset.state in ['new', 'upload', 'queued', 'running', 'empty', 'discarded']:
                    log.error("Cannot import dataset '%s' to user history since its state is '%s'." % (ldda.name, ldda.dataset.state))
                elif ldda.dataset.state in ['ok', 'error']:
                    hda = ldda.to_history_dataset_association(target_history=sample.history, add_to_history=True)
                    sample.workflow['mappings'][key]['hda'] = hda.id
                    self.sa_session.add(sample)
                    self.sa_session.flush()
        workflow_dict = sample.workflow
        new_wf_dict = copy.deepcopy(workflow_dict)
        for key in workflow_dict['mappings']:
            if not isinstance(key, int):
                new_wf_dict['mappings'][int(key)] = workflow_dict['mappings'][key]
        workflow_dict = new_wf_dict
        fk_trans = FakeTrans(self.app, history=sample.history, user=sample.request.user)
        workflow = self.sa_session.query(self.app.model.Workflow).get(workflow_dict['id'])
        if not workflow:
            log.error("Workflow mapping failure.")
            return
        if len(workflow.steps) == 0:
            log.error("Workflow cannot be run because it does not have any steps")
            return
        if workflow.has_cycles:
            log.error("Workflow cannot be run because it contains cycles")
            return
        if workflow.has_errors:
            log.error("Workflow cannot be run because of validation errors in some steps")
            return
        # Build the state for each step
        errors = {}
        # Build a fake dictionary prior to execution.
        # Prepare each step
        for step in workflow.steps:
            step.upgrade_messages = {}
            # Construct modules
            if step.type == 'tool' or step.type is None:
                # Restore the tool state for the step
                step.module = module_factory.from_workflow_step(fk_trans, step)
                # Fix any missing parameters
                step.upgrade_messages = step.module.check_and_update_state()
                # Any connected input needs to have value DummyDataset (these
                # are not persisted so we need to do it every time)
                step.module.add_dummy_datasets(connections=step.input_connections)
                # Store state with the step
                step.state = step.module.state
                # Error dict
                if step.tool_errors:
                    errors[step.id] = step.tool_errors
            else:
                # Non-tool specific stuff?
                step.module = module_factory.from_workflow_step(fk_trans, step)
                step.state = step.module.get_runtime_state()
            # Connections by input name
            step.input_connections_by_name = dict((conn.input_name, conn) for conn in step.input_connections)
        for step in workflow.steps:
            step.upgrade_messages = {}
            # Connections by input name
            step.input_connections_by_name = \
                dict((conn.input_name, conn) for conn in step.input_connections)
            # Extract just the arguments for this step by prefix
            step_errors = None
            if step.type == 'tool' or step.type is None:
                module = module_factory.from_workflow_step(fk_trans, step)
                # Fix any missing parameters
                step.upgrade_messages = module.check_and_update_state()
                # Any connected input needs to have value DummyDataset (these
                # are not persisted so we need to do it every time)
                module.add_dummy_datasets(connections=step.input_connections)
                # Get the tool
                tool = module.tool
                # Get the state
                step.state = state = module.state
                if step_errors:
                    errors[step.id] = state.inputs["__errors__"] = step_errors
        # Run each step, connecting outputs to inputs
        workflow_invocation = self.app.model.WorkflowInvocation()
        workflow_invocation.workflow = workflow
        outputs = odict()
        for i, step in enumerate(workflow.steps):
            job = None
            if step.type == 'tool' or step.type is None:
                tool = self.app.toolbox.get_tool(step.tool_id)

                def callback(input, prefixed_name, **kwargs):
                    if isinstance(input, DataToolParameter):
                        if prefixed_name in step.input_connections_by_name:
                            conn = step.input_connections_by_name[prefixed_name]
                            return outputs[conn.output_step.id][conn.output_name]
                visit_input_values(tool.inputs, step.state.inputs, callback)
                job, out_data = tool.execute(fk_trans, step.state.inputs, history=sample.history)
                outputs[step.id] = out_data
                for pja in step.post_job_actions:
                    if pja.action_type in ActionBox.immediate_actions:
                        ActionBox.execute(self.app, self.sa_session, pja, job, replacement_dict=None)
                    else:
                        job.add_post_job_action(pja)
            else:
                job, out_data = step.module.execute(fk_trans, step.state)
                outputs[step.id] = out_data
                if step.id in workflow_dict['mappings']:
                    data = self.sa_session.query(self.app.model.HistoryDatasetAssociation).get(workflow_dict['mappings'][str(step.id)]['hda'])
                    outputs[step.id]['output'] = data
            workflow_invocation_step = self.app.model.WorkflowInvocationStep()
            workflow_invocation_step.workflow_invocation = workflow_invocation
            workflow_invocation_step.workflow_step = step
            workflow_invocation_step.job = job
        self.sa_session.add(workflow_invocation)
        self.sa_session.flush()