# Source code for galaxy.jobs.deferred.manual_data_transfer

"""
Generic module for managing manual data transfer jobs using Galaxy's built-in file browser.
This module can be used by various external services that are configured to transfer data manually.
"""
import logging

from .data_transfer import DataTransfer

log = logging.getLogger(__name__)

__all__ = ('ManualDataTransferPlugin', )


class ManualDataTransferPlugin(DataTransfer):
    """Deferred-job plugin for manually selected data transfers.

    Jobs created by this plugin carry a ``params`` dict whose ``'type'`` entry
    is either ``'init_transfer'`` or ``'finish_transfer'``.
    """

    def __init__(self, app):
        """Initialize the plugin by delegating to ``DataTransfer.__init__``.

        :param app: application object passed through to the base class.
        """
        super(ManualDataTransferPlugin, self).__init__(app)

    def create_job(self, trans, **kwd):
        """Create and persist a ``DeferredJob`` describing a manual transfer.

        Two keyword sets are recognised:

        * ``sample``, ``sample_datasets``, ``external_service`` and
          ``external_service_type`` -- creates an ``init_transfer`` job.
        * ``transfer_job_id`` (together with ``sample_id`` and ``result``,
          where ``result`` contains a ``'protocol'`` entry) -- creates a
          ``finish_transfer`` job.

        If neither set is present an error is logged and no job is created.

        :param trans: not used by this method.
        :param kwd: keyword arguments as described above.
        :returns: ``None`` in every case.
        """
        init_keys = ('sample', 'sample_datasets', 'external_service', 'external_service_type')
        if all(key in kwd for key in init_keys):
            sample = kwd['sample']
            sample_datasets = kwd['sample_datasets']
            external_service = kwd['external_service']
            external_service_type = kwd['external_service_type']
            # TODO: is there a better way to store the protocol?
            protocol = next(iter(external_service_type.data_transfer.keys()))
            content = external_service.form_values.content
            host = content['host']
            user_name = content['user_name']
            password = content['password']
            # TODO: In the future, we may want to implement a way for the user to associate a selected file with one of
            # the run outputs configured in the <run_details><results> section of the external service config file.  The
            # following was a first pass at implementing something (the datatype was included in the sample_dataset_dict),
            # but without a way for the user to associate stuff it's useless.  However, allowing the user this ability may
            # open a can of worms, so maybe we shouldn't do it???
            #
            # for run_result_file_name, run_result_file_datatype in external_service_type.run_details[ 'results' ].items():
            #     # external_service_type.run_details[ 'results' ] looks something like: {'dataset1_name': 'dataset1_datatype'}
            #     if run_result_file_datatype in external_service.form_values.content:
            #         datatype = external_service.form_values.content[ run_result_file_datatype ]
            #
            # When the transfer is automatic (the process used in the SMRT Portal plugin), the datasets and datatypes
            # can be matched up to those configured in the <run_details><results> settings in the external service type config
            # (e.g., pacific_biosciences_smrt_portal.xml).  However, that's a bit trickier here since the user is manually
            # selecting files for transfer.
            sample_datasets_dict = {
                sample_dataset.id: dict(sample_id=sample_dataset.sample.id,
                                        name=sample_dataset.name,
                                        file_path=sample_dataset.file_path,
                                        status=sample_dataset.status,
                                        error_msg=sample_dataset.error_msg,
                                        size=sample_dataset.size,
                                        external_service_id=sample_dataset.external_service.id)
                for sample_dataset in sample_datasets
            }
            # NOTE(review): the password is placed in the job params as-is;
            # confirm how DeferredJob params are stored before relying on this.
            params = {'type': 'init_transfer',
                      'sample_id': sample.id,
                      'sample_datasets_dict': sample_datasets_dict,
                      'protocol': protocol,
                      'host': host,
                      'user_name': user_name,
                      'password': password}
        elif 'transfer_job_id' in kwd:
            params = {'type': 'finish_transfer',
                      'protocol': kwd['result']['protocol'],
                      'sample_id': kwd['sample_id'],
                      'result': kwd['result'],
                      'transfer_job_id': kwd['transfer_job_id']}
        else:
            # The message names the keys this method actually tests for.
            log.error('No job was created because kwd does not include "sample", "sample_datasets", '
                      '"external_service" and "external_service_type", or "transfer_job_id".')
            return
        deferred_job = self.app.model.DeferredJob(state=self.app.model.DeferredJob.states.NEW,
                                                  plugin='ManualDataTransferPlugin',
                                                  params=params)
        self.sa_session.add(deferred_job)
        self.sa_session.flush()
        log.debug('Created a deferred job in the ManualDataTransferPlugin of type: %s', params['type'])
        # TODO: error reporting to caller (if possible?)

    def check_job(self, job):
        """Report the state of a deferred job handled by this plugin.

        :param job: deferred job whose ``params`` dict is inspected. For
            ``finish_transfer`` jobs a ``transfer_job`` attribute is attached
            to it on first use.
        :returns: one of ``self.job_states.INVALID``, ``self.job_states.READY``
            or ``self.job_states.WAIT``.
        :raises Exception: if an ``init_transfer`` job uses the ``http`` or
            ``https`` protocol.
        """
        if self._missing_params(job.params, ['type']):
            return self.job_states.INVALID
        job_type = job.params['type']
        if job_type == 'init_transfer':
            # Validate 'protocol' before reading it so that a malformed job is
            # reported as INVALID rather than escaping as a KeyError.
            if self._missing_params(job.params, ['protocol']):
                return self.job_states.INVALID
            protocol = job.params['protocol']
            if protocol in ('http', 'https'):
                raise Exception("Manual data transfer is not yet supported for http(s).")
            if protocol == 'scp':
                required = ['protocol', 'host', 'user_name', 'password', 'sample_id', 'sample_datasets_dict']
                if self._missing_params(job.params, required):
                    return self.job_states.INVALID
                # TODO: what kind of checks do we need here?
                return self.job_states.READY
            # Any other protocol is left waiting.
            return self.job_states.WAIT
        if job_type == 'finish_transfer':
            if self._missing_params(job.params, ['transfer_job_id']):
                return self.job_states.INVALID
            # Get the TransferJob object and add it to the DeferredJob so we only look it up once.
            if not hasattr(job, 'transfer_job'):
                transfer_job_id = int(job.params['transfer_job_id'])
                job.transfer_job = self.sa_session.query(self.app.model.TransferJob).get(transfer_job_id)
            state = self.app.transfer_manager.get_state(job.transfer_job)
            if not state:
                log.error('No state for transfer job id: %s', job.transfer_job.id)
                return self.job_states.WAIT
            if state['state'] in self.app.model.TransferJob.terminal_states:
                return self.job_states.READY
            log.debug("Checked on finish transfer job %s, not done yet.", job.id)
            return self.job_states.WAIT
        log.error('Unknown job type for ManualDataTransferPlugin: %s', str(job_type))
        return self.job_states.INVALID