# Contains actions that are used in External Services
import logging
import requests
from galaxy.web import url_for
from galaxy.util.template import fill_template
from result_handlers.basic import ExternalServiceActionResultHandler

log = logging.getLogger(__name__)

class PopulatedExternalServiceAction(object):
    """
    Binds an action to the ``param_dict`` it should run against and caches
    both the raw result of performing it and the handled results.

    Attributes that are not found on this object are looked up on the
    wrapped action.
    """

    def __init__(self, action, param_dict):
        """
        :param action: object providing ``perform_action(param_dict)`` and
                       ``handle_action(result, param_dict, trans)``
        :param param_dict: passed unchanged to the action's methods
        """
        self.action = action
        self.param_dict = param_dict
        self.result = None
        self.handled_results = None

    def __getattr__(self, name):
        """
        Fall back to the wrapped action for attributes missing on this object.

        :raises AttributeError: if ``action`` itself has not been set yet
        """
        # __getattr__ only runs after normal lookup has failed, so being asked
        # for 'action' means the instance was created without __init__ (as
        # copy/pickle do). Delegating via self.action here would recurse forever.
        if name == 'action':
            raise AttributeError(name)
        return getattr(self.action, name)

    def perform_action(self):
        """
        Perform the wrapped action and return its result.

        The result is stored and reused by later calls; the action is run
        again only while the stored result is still ``None``.
        """
        if self.result is None:
            self.result = self.action.perform_action(self.param_dict)
        return self.result

    def handle_results(self, trans):
        """
        Return the handled results, performing the action first if needed.

        :param trans: forwarded unchanged to the action's ``handle_action``
        """
        if self.result is None:
            self.perform_action()
        if self.handled_results is None:
            self.handled_results = self.action.handle_action(self.result, self.param_dict, trans)
        return self.handled_results
class ExternalServiceAction(object):
    """
    Base class for External Service Actions.

    Subclasses set ``type`` and override ``perform_action``.
    """
    type = None

    @classmethod
    def from_elem(cls, elem, parent):
        """
        Build the action whose class is registered in ``action_type_to_class``
        under the element's ``type`` attribute.
        """
        kind = elem.get('type', None)
        assert kind, 'ExternalServiceAction requires a type'
        action_class = action_type_to_class[kind]
        return action_class(elem, parent)

    def __init__(self, elem, parent):
        """
        Read ``name`` and ``label`` (defaulting to the name) from ``elem`` and
        create one result handler per ``result_handler`` child element.
        """
        self.name = elem.get('name', None)
        assert self.name, 'ExternalServiceAction requires a name'
        self.label = elem.get('label', self.name)
        self.parent = parent
        # Filled one at a time so each handler is built against an action
        # that already exposes the handlers created before it.
        self.result_handlers = []
        for handler_elem in elem.findall('result_handler'):
            built = ExternalServiceActionResultHandler.from_elem(handler_elem, self)
            self.result_handlers.append(built)

    def __action_url_id(self, param_dict):
        """
        Return a ``|``-separated id: the service instance id, then the names
        of the enclosing parents (outermost first), then this action's name.
        """
        url_id = self.name
        node = self.parent
        # Stop before the ancestor whose own parent has no ``parent``
        # attribute; that ancestor's name is not included.
        while hasattr(node.parent, 'parent'):
            url_id = "%s|%s" % (node.name, url_id)
            node = node.parent
        return "%s|%s" % (param_dict['service_instance'].id, url_id)

    def populate_action(self, param_dict):
        """Wrap this action and ``param_dict`` in a PopulatedExternalServiceAction."""
        return PopulatedExternalServiceAction(self, param_dict)

    def handle_action(self, completed_action, param_dict, trans):
        """Run every result handler in order and return the list of their results."""
        return [
            handler.handle_result(completed_action, param_dict, trans)
            for handler in self.result_handlers
        ]

    def perform_action(self, param_dict):
        """Abstract; the base implementation always raises."""
        raise Exception('Abstract Method')
class ExternalServiceResult(object):
    """
    Base class for the result of an external service action.

    Stores the result's name and the ``param_dict`` it was produced from;
    subclasses provide ``content``.
    """

    def __init__(self, name, param_dict):
        self.param_dict = param_dict
        self.name = name

    @property
    def content(self):
        """Abstract; the base implementation always raises."""
        raise Exception('Abstract Method')
class ExternalServiceWebAPIActionResult(ExternalServiceResult):
    """
    Result whose content is the body text fetched from ``url``.

    The fetch happens the first time ``content`` is read; the text is then
    kept and returned on later reads.
    """

    def __init__(self, name, param_dict, url, method, target):
        ExternalServiceResult.__init__(self, name, param_dict)
        self.url, self.method, self.target = url, method, target
        self._content = None

    @property
    def content(self):
        """Return the response text for ``url``, fetching it on first access."""
        if self._content is not None:
            return self._content
        # NOTE(review): this always issues a GET, whatever ``self.method``
        # holds, and passes no timeout to requests.
        self._content = requests.get(self.url).text
        return self._content
class ExternalServiceValueResult(ExternalServiceResult):
    """Result whose content is a value supplied at construction time."""

    def __init__(self, name, param_dict, value):
        # The base initializer stores ``name`` and ``param_dict``.
        ExternalServiceResult.__init__(self, name, param_dict)
        self.value = value

    @property
    def content(self):
        """Return the stored value unchanged."""
        return self.value
class ExternalServiceWebAPIAction(ExternalServiceAction):
    """
    Action that accesses an external Web API; performing it yields an
    ExternalServiceWebAPIActionResult for the URL built from the request
    template.
    """
    type = 'web_api'

    class ExternalServiceWebAPIActionRequest(object):
        """Holds the ``target``, ``method`` and URL template of a ``request`` element."""

        def __init__(self, elem, parent):
            """
            :param elem: the ``request`` element; ``target`` defaults to
                         ``_blank`` and ``method`` to ``post``
            :param parent: stored as ``self.parent`` and handed to the URL template
            """
            self.parent = parent
            self.target = elem.get('target', '_blank')
            self.method = elem.get('method', 'post')
            self.url = Template(elem.find('url'), parent)

        def get_web_api_action(self, param_dict):
            """
            Fill the URL template with ``param_dict``, strip surrounding
            whitespace and return the result object, named after ``self.parent``.
            """
            return ExternalServiceWebAPIActionResult(
                self.parent.name,
                param_dict,
                self.url.build_template(param_dict).strip(),
                self.method,
                self.target,
            )

    def __init__(self, elem, parent):
        ExternalServiceAction.__init__(self, elem, parent)
        request_elem = elem.find('request')
        # The request object is given this action's ``parent``, not the action.
        self.web_api_request = self.ExternalServiceWebAPIActionRequest(request_elem, parent)

    def perform_action(self, param_dict):
        """Return the web API result built by the request for ``param_dict``."""
        return self.web_api_request.get_web_api_action(param_dict)
class ExternalServiceWebAction(ExternalServiceAction):
    """
    Action that accesses an external web application.

    Records the ``request`` element together with its URL template,
    ``target`` (default ``_blank``) and ``method`` (default ``get``).
    """
    type = 'web'

    def __init__(self, elem, parent):
        ExternalServiceAction.__init__(self, elem, parent)
        request = elem.find('request')
        self.request_elem = request
        self.url = Template(request.find('url'), parent)
        self.target = request.get('target', '_blank')
        self.method = request.get('method', 'get')
class ExternalServiceTemplateAction(ExternalServiceAction):
    """
    Action whose result is the text produced by filling the element's
    ``template`` child with the supplied parameters.
    """
    type = 'template'

    def __init__(self, elem, parent):
        ExternalServiceAction.__init__(self, elem, parent)
        template_elem = elem.find('template')
        self.template = Template(template_elem, parent)

    def perform_action(self, param_dict):
        """Fill the template with ``param_dict`` and wrap the text in a value result."""
        rendered = self.template.build_template(param_dict)
        return ExternalServiceValueResult(self.name, param_dict, rendered)
# Registry used by ExternalServiceAction.from_elem: maps each action class's
# ``type`` string to the class itself.
action_type_to_class = {
    action_class.type: action_class
    for action_class in (
        ExternalServiceWebAction,
        ExternalServiceWebAPIAction,
        ExternalServiceTemplateAction,
    )
}


# utility classes
class Template(object):
    """Keeps the text of an element so it can later be filled with parameters."""

    def __init__(self, elem, parent):
        """
        :param elem: element whose ``text`` is used as the template source
        :param parent: stored as ``self.parent``
        """
        self.parent = parent
        self.text = elem.text

    def build_template(self, param_dict):
        """Return the stored text filled by ``fill_template`` with ``param_dict`` as context."""
        return fill_template(self.text, context=param_dict)