Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release, if it exists, or view the top of the latest release's documentation.

Source code for galaxy.jobs.deferred.manual_data_transfer

"""
Generic module for managing manual data transfer jobs using Galaxy's built-in file browser.
This module can be used by various external services that are configured to transfer data manually.
"""
import logging

from .data_transfer import DataTransfer

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Names exported by ``from <this module> import *``.
__all__ = ('ManualDataTransferPlugin', )


class ManualDataTransferPlugin(DataTransfer):
    """Deferred-job plugin for manually initiated data transfers.

    ``create_job`` stores a ``DeferredJob`` whose ``params['type']`` is either
    ``'init_transfer'`` or ``'finish_transfer'``; ``check_job`` maps the
    parameters of such a job to one of the ``self.job_states`` values.
    """

    # Keys that must all be present in ``kwd`` for ``create_job`` to build an
    # ``init_transfer`` job.
    _INIT_TRANSFER_KEYS = ('sample', 'sample_datasets', 'external_service', 'external_service_type')

    def __init__(self, app):
        """Initialise the plugin by delegating to ``DataTransfer.__init__``.

        :param app: forwarded unchanged to the base class.
        """
        super(ManualDataTransferPlugin, self).__init__(app)

    def create_job(self, trans, **kwd):
        """Create and store a ``DeferredJob`` for a manual data transfer.

        The kind of job is selected by the keys present in ``kwd``:

        * ``init_transfer`` -- when ``sample``, ``sample_datasets``,
          ``external_service`` and ``external_service_type`` are all given.
        * ``finish_transfer`` -- otherwise, when ``transfer_job_id`` is given.
          ``result`` (which must contain a ``protocol`` entry) and
          ``sample_id`` are then read from ``kwd`` as well.

        If neither set of keys is present an error is logged and no job is
        created.

        :param trans: not used by this method.
        :param kwd: keyword arguments as described above.
        :returns: ``None`` in every case.
        :raises KeyError: for a ``finish_transfer`` job when ``result``,
            ``result['protocol']`` or ``sample_id`` is missing.
        """
        if all(key in kwd for key in self._INIT_TRANSFER_KEYS):
            params = self._build_init_transfer_params(kwd)
        elif 'transfer_job_id' in kwd:
            params = {'type' : 'finish_transfer',
                      'protocol' : kwd['result']['protocol'],
                      'sample_id' : kwd['sample_id'],
                      'result' : kwd['result'],
                      'transfer_job_id' : kwd['transfer_job_id']}
        else:
            log.error('No job was created because kwd does not include "sample", "sample_datasets", '
                      '"external_service" and "external_service_type", or "transfer_job_id".')
            return
        deferred_job = self.app.model.DeferredJob(state=self.app.model.DeferredJob.states.NEW,
                                                  plugin='ManualDataTransferPlugin',
                                                  params=params)
        self.sa_session.add(deferred_job)
        self.sa_session.flush()
        log.debug('Created a deferred job in the ManualDataTransferPlugin of type: %s', params['type'])
        # TODO: error reporting to caller (if possible?)

    def _build_init_transfer_params(self, kwd):
        """Build the ``params`` dict of an ``init_transfer`` job from ``kwd``.

        :param kwd: mapping containing every key in ``_INIT_TRANSFER_KEYS``.
        :returns: dict with the keys ``type``, ``sample_id``,
            ``sample_datasets_dict``, ``protocol``, ``host``, ``user_name``
            and ``password``.
        """
        sample = kwd['sample']
        external_service = kwd['external_service']
        external_service_type = kwd['external_service_type']
        # TODO: is there a better way to store the protocol?
        # The protocol is the first key of the service type's data_transfer mapping.
        protocol = next(iter(external_service_type.data_transfer.keys()))
        content = external_service.form_values.content
        # TODO: In the future, we may want to implement a way for the user to associate a selected file with one of
        # the run outputs configured in the <run_details><results> section of the external service config file.  The
        # following was a first pass at implementing something (the datatype was included in the sample_dataset_dict),
        # but without a way for the user to associate stuff it's useless.  However, allowing the user this ability may
        # open a can of worms, so maybe we shouldn't do it???
        #
        # for run_result_file_name, run_result_file_datatype in external_service_type.run_details[ 'results' ].items():
        #     # external_service_type.run_details[ 'results' ] looks something like: {'dataset1_name': 'dataset1_datatype'}
        #     if run_result_file_datatype in external_service.form_values.content:
        #         datatype = external_service.form_values.content[ run_result_file_datatype ]
        #
        # When the transfer is automatic (the process used in the SMRT Portal plugin), the datasets and datatypes
        # can be matched up to those configured in the <run_details><results> settings in the external service type config
        # (e.g., pacific_biosciences_smrt_portal.xml).  However, that's a bit trickier here since the user is manually
        # selecting files for transfer.
        sample_datasets_dict = {
            sample_dataset.id: dict(sample_id=sample_dataset.sample.id,
                                    name=sample_dataset.name,
                                    file_path=sample_dataset.file_path,
                                    status=sample_dataset.status,
                                    error_msg=sample_dataset.error_msg,
                                    size=sample_dataset.size,
                                    external_service_id=sample_dataset.external_service.id)
            for sample_dataset in kwd['sample_datasets']
        }
        # NOTE(review): the password is copied into the job params as-is; how
        # DeferredJob stores its params is not visible here -- confirm that
        # this is acceptable.
        return {'type' : 'init_transfer',
                'sample_id' : sample.id,
                'sample_datasets_dict' : sample_datasets_dict,
                'protocol' : protocol,
                'host' : content['host'],
                'user_name' : content['user_name'],
                'password' : content['password']}

    def check_job(self, job):
        """Return the ``self.job_states`` value that applies to ``job``.

        :param job: object whose ``params`` mapping describes the job; its
            ``type`` entry selects the checks that are applied.
        :returns: ``INVALID`` when ``type`` or another required parameter is
            missing or ``type`` is unknown; otherwise ``READY`` or ``WAIT``
            as decided by the type-specific check.
        :raises Exception: for an ``init_transfer`` job whose protocol is
            ``http`` or ``https``.
        """
        if self._missing_params(job.params, ['type']):
            return self.job_states.INVALID
        job_type = job.params['type']
        if job_type == 'init_transfer':
            return self._check_init_transfer_job(job)
        if job_type == 'finish_transfer':
            return self._check_finish_transfer_job(job)
        log.error('Unknown job type for ManualDataTransferPlugin: %s', str(job_type))
        return self.job_states.INVALID

    def _check_init_transfer_job(self, job):
        """Check an ``init_transfer`` job; see ``check_job`` for the contract."""
        # Validate 'protocol' before reading it so that a job without it is
        # reported as INVALID instead of raising KeyError.
        if self._missing_params(job.params, ['protocol']):
            return self.job_states.INVALID
        protocol = job.params['protocol']
        if protocol in ('http', 'https'):
            raise Exception("Manual data transfer is not yet supported for http(s).")
        if protocol == 'scp':
            if self._missing_params(job.params, ['protocol', 'host', 'user_name', 'password', 'sample_id', 'sample_datasets_dict']):
                return self.job_states.INVALID
            # TODO: what kind of checks do we need here?
            return self.job_states.READY
        # Any other protocol is neither rejected nor ready.
        return self.job_states.WAIT

    def _check_finish_transfer_job(self, job):
        """Check a ``finish_transfer`` job; see ``check_job`` for the contract."""
        if self._missing_params(job.params, ['transfer_job_id']):
            return self.job_states.INVALID
        # Get the TransferJob object and add it to the DeferredJob so we only look it up once.
        if not hasattr(job, 'transfer_job'):
            job.transfer_job = self.sa_session.query(self.app.model.TransferJob).get(int(job.params['transfer_job_id']))
        state = self.app.transfer_manager.get_state(job.transfer_job)
        if not state:
            log.error('No state for transfer job id: %s', job.transfer_job.id)
            return self.job_states.WAIT
        if state['state'] in self.app.model.TransferJob.terminal_states:
            return self.job_states.READY
        log.debug("Checked on finish transfer job %s, not done yet.", job.id)
        return self.job_states.WAIT