Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.util.cli
"""
"""
import json
from glob import glob
from os import getcwd
from os.path import (
basename,
join
)
DEFAULT_SHELL_PLUGIN = 'LocalShell'
ERROR_MESSAGE_NO_JOB_PLUGIN = "No job plugin parameter found, cannot create CLI job interface"
ERROR_MESSAGE_NO_SUCH_JOB_PLUGIN = "Failed to find job_plugin of type %s, available types include %s"
[docs]class CliInterface(object):
"""
High-level interface for loading shell and job plugins and matching
them to specified parameters.
"""
[docs] def __init__(self, code_dir='lib'):
"""
"""
def __load(module_path, d):
module_pattern = join(join(getcwd(), code_dir, *module_path.split('.')), '*.py')
for file in glob(module_pattern):
if basename(file).startswith('_'):
continue
module_name = '%s.%s' % (module_path, basename(file).rsplit('.py', 1)[0])
module = __import__(module_name)
for comp in module_name.split(".")[1:]:
module = getattr(module, comp)
for name in module.__all__:
try:
d[name] = getattr(module, name)
except TypeError:
raise TypeError("Invalid type for name %s" % name)
self.cli_shells = {}
self.cli_job_interfaces = {}
self.active_cli_shells = {}
module_prefix = self.__module__
__load('%s.shell' % module_prefix, self.cli_shells)
__load('%s.job' % module_prefix, self.cli_job_interfaces)
[docs] def get_plugins(self, shell_params, job_params):
"""
Return shell and job interface defined by and configured via
specified params.
"""
shell = self.get_shell_plugin(shell_params)
job_interface = self.get_job_interface(job_params)
return shell, job_interface
[docs] def get_shell_plugin(self, shell_params):
shell_plugin = shell_params.get('plugin', DEFAULT_SHELL_PLUGIN)
requested_shell_settings = json.dumps(shell_params, sort_keys=True)
if requested_shell_settings not in self.active_cli_shells:
self.active_cli_shells[requested_shell_settings] = self.cli_shells[shell_plugin](**shell_params)
return self.active_cli_shells[requested_shell_settings]
[docs] def get_job_interface(self, job_params):
job_plugin = job_params.get('plugin', None)
if not job_plugin:
raise ValueError(ERROR_MESSAGE_NO_JOB_PLUGIN)
job_plugin_class = self.cli_job_interfaces.get(job_plugin, None)
if not job_plugin_class:
raise ValueError(ERROR_MESSAGE_NO_SUCH_JOB_PLUGIN % (job_plugin, list(self.cli_job_interfaces.keys())))
job_interface = job_plugin_class(**job_params)
return job_interface
[docs]def split_params(params):
shell_params = dict((k.replace('shell_', '', 1), v) for k, v in params.items() if k.startswith('shell_'))
job_params = dict((k.replace('job_', '', 1), v) for k, v in params.items() if k.startswith('job_'))
return shell_params, job_params