Warning
This document is for an in-development version of Galaxy. Alternatively, you can view this page in the latest release, if it exists, or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.util.cli
"""
"""
import importlib
import json
import pkgutil
from glob import glob
from os import getcwd
from os.path import (
basename,
join
)
# Shell plugin used when the shell params do not name one explicitly.
DEFAULT_SHELL_PLUGIN = 'LocalShell'

# Messages for the ValueErrors raised when a plugin cannot be resolved.
ERROR_MESSAGE_NO_JOB_PLUGIN = "No job plugin parameter found, cannot create CLI job interface"
ERROR_MESSAGE_NO_SUCH_JOB_PLUGIN = "Failed to find job_plugin of type %s, available types include %s"
ERROR_MESSAGE_NO_SUCH_SHELL_PLUGIN = "Failed to find shell_plugin of type %s, available types include %s"


class CliInterface:
    """
    High-level interface for loading shell and job plugins and matching
    them to specified parameters.
    """

    def __init__(self, code_dir='lib'):
        """
        Discover the available shell and job plugin classes.

        Plugin modules are looked up in the ``shell`` and ``job`` sub-packages
        of the module defining this class (``self.__module__``). Every name a
        plugin module lists in ``__all__`` is registered under that name in
        ``self.cli_shells`` or ``self.cli_job_interfaces`` respectively.

        :param code_dir: directory, relative to the current working directory,
            below which the package sources live. When it is not ``None`` the
            plugin modules are found by globbing for ``*.py`` files; when it
            is ``None`` they are found with ``pkgutil`` on the sub-package's
            ``__path__``.
        :raises TypeError: if an entry of a plugin module's ``__all__`` is not
            a valid attribute name.
        """

        def _load_from_code_dir(module_path):
            # Yield every non-underscore module found as a file below code_dir.
            module_pattern = join(join(getcwd(), code_dir, *module_path.split('.')), '*.py')
            # Sorted so registration order does not depend on directory order.
            for path in sorted(glob(module_pattern)):
                file_name = basename(path)
                if file_name.startswith('_'):
                    continue
                module_name = '{}.{}'.format(module_path, file_name.rsplit('.py', 1)[0])
                # import_module returns the leaf module directly.
                yield importlib.import_module(module_name)

        def _load_from_path(module_path):
            # Yield every sub-module of the already importable package.
            # Unlike the code_dir loader, no name filter is applied here.
            base_module = importlib.import_module(module_path)
            for module_info in pkgutil.iter_modules(base_module.__path__):
                yield importlib.import_module('{}.{}'.format(module_path, module_info.name))

        def _load(module_path, registry):
            # Register everything the plugin modules export into ``registry``.
            if code_dir is not None:
                module_generator = _load_from_code_dir
            else:
                module_generator = _load_from_path
            for module in module_generator(module_path):
                for name in module.__all__:
                    try:
                        registry[name] = getattr(module, name)
                    except TypeError as err:
                        # Wrap ``name`` in a tuple so a tuple-valued entry
                        # cannot break the message formatting itself.
                        raise TypeError("Invalid type for name %s" % (name,)) from err

        self.cli_shells = {}
        self.cli_job_interfaces = {}
        # Instantiated shells, keyed by the JSON dump of their parameters.
        self.active_cli_shells = {}

        module_prefix = self.__module__
        _load('%s.shell' % module_prefix, self.cli_shells)
        _load('%s.job' % module_prefix, self.cli_job_interfaces)

    def get_plugins(self, shell_params, job_params):
        """
        Return shell and job interface defined by and configured via
        specified params.
        """
        shell = self.get_shell_plugin(shell_params)
        job_interface = self.get_job_interface(job_params)
        return shell, job_interface

    def get_shell_plugin(self, shell_params):
        """
        Return the shell plugin instance configured by ``shell_params``.

        The plugin class is selected by the ``plugin`` key (defaulting to
        ``DEFAULT_SHELL_PLUGIN``) and instantiated with all of
        ``shell_params`` as keyword arguments. Instances are cached, so
        calling again with equal parameters returns the same object.

        :raises ValueError: if no shell plugin of the requested type is
            registered.
        """
        shell_plugin = shell_params.get('plugin', DEFAULT_SHELL_PLUGIN)
        requested_shell_settings = json.dumps(shell_params, sort_keys=True)
        if requested_shell_settings not in self.active_cli_shells:
            shell_plugin_class = self.cli_shells.get(shell_plugin)
            if not shell_plugin_class:
                # Report unknown plugins the same way get_job_interface does
                # instead of leaking a bare KeyError from the registry.
                raise ValueError(ERROR_MESSAGE_NO_SUCH_SHELL_PLUGIN % (shell_plugin, list(self.cli_shells.keys())))
            self.active_cli_shells[requested_shell_settings] = shell_plugin_class(**shell_params)
        return self.active_cli_shells[requested_shell_settings]

    def get_job_interface(self, job_params):
        """
        Return a new job interface instance configured by ``job_params``.

        The plugin class is selected by the mandatory ``plugin`` key and
        instantiated with all of ``job_params`` as keyword arguments.

        :raises ValueError: if ``plugin`` is missing or empty, or if no job
            plugin of the requested type is registered.
        """
        job_plugin = job_params.get('plugin')
        if not job_plugin:
            raise ValueError(ERROR_MESSAGE_NO_JOB_PLUGIN)
        job_plugin_class = self.cli_job_interfaces.get(job_plugin)
        if not job_plugin_class:
            raise ValueError(ERROR_MESSAGE_NO_SUCH_JOB_PLUGIN % (job_plugin, list(self.cli_job_interfaces.keys())))
        return job_plugin_class(**job_params)
def split_params(params):
    """
    Partition prefixed parameters into shell and job parameter dicts.

    Keys starting with ``shell_`` go into the first returned dict and keys
    starting with ``job_`` into the second, each with that leading prefix
    removed. Keys with neither prefix are dropped.

    :param params: mapping of prefixed parameter names to values.
    :returns: tuple ``(shell_params, job_params)``.
    """
    shell_prefix = 'shell_'
    job_prefix = 'job_'
    shell_params = {}
    job_params = {}
    for key, value in params.items():
        if key.startswith(shell_prefix):
            shell_params[key[len(shell_prefix):]] = value
        if key.startswith(job_prefix):
            job_params[key[len(job_prefix):]] = value
    return shell_params, job_params