Source code for galaxy.tools.data_manager.manager
import errno
import json
import logging
import os
from six import string_types
from galaxy import util
from galaxy.queue_worker import (
    send_control_task
)
from galaxy.tools.data import TabularToolDataTable
from galaxy.util.odict import odict
from galaxy.util.template import fill_template
from tool_shed.util import (
    common_util,
    repository_util
)
log = logging.getLogger(__name__)
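# Only data tables of the types in SUPPORTED_DATA_TABLE_TYPES can be written by
# a data manager; a <value_translation> is either a template (the default type)
# or the name of a function registered in VALUE_TRANSLATION_FUNCTIONS.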
SUPPORTED_DATA_TABLE_TYPES = (TabularToolDataTable, )
VALUE_TRANSLATION_FUNCTIONS = dict(abspath=os.path.abspath)
DEFAULT_VALUE_TRANSLATION_TYPE = 'template'
class DataManagers(object):
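    """Registry of DataManager instances keyed by id, plus a mapping of data table names to the managers that populate them."""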
    def __init__(self, app, xml_filename=None):
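        """Load data managers from the configured XML file(s), including the shed data manager config when present."""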
        self.app = app
        self.data_managers = odict()
        self.managed_data_tables = odict()
        self.tool_path = None
        self._reload_count = 0
        self.filename = xml_filename or self.app.config.data_manager_config_file
        for filename in util.listify(self.filename):
            if not filename:
                continue
            self.load_from_xml(filename)
        if self.app.config.shed_data_manager_config_file:
            self.load_from_xml(self.app.config.shed_data_manager_config_file, store_tool_path=True)
    def load_from_xml(self, xml_filename, store_tool_path=True):
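        """Parse a data manager XML configuration file and load each <data_manager> element it defines."""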
        try:
            tree = util.parse_xml(xml_filename)
        except Exception as e:
            log.error('There was an error parsing your Data Manager config file "%s": %s' % (xml_filename, e))
            return  # we are not able to load any data managers
        root = tree.getroot()
        if root.tag != 'data_managers':
            log.error('A data managers configuration must have a "data_managers" tag as the root. "%s" is present' % (root.tag))
            return
        if store_tool_path:
            tool_path = root.get('tool_path', None)
            if tool_path is None:
                tool_path = self.app.config.tool_path
            if not tool_path:
                tool_path = '.'
            self.tool_path = tool_path
        for data_manager_elem in root.findall('data_manager'):
            self.load_manager_from_elem(data_manager_elem, tool_path=self.tool_path)
    def load_manager_from_elem(self, data_manager_elem, tool_path=None, add_manager=True):
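        """Build a DataManager from a <data_manager> element and, unless add_manager is False, register it."""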
        try:
            data_manager = DataManager(self, data_manager_elem, tool_path=tool_path)
        except Exception as e:
            log.error("Error loading data_manager '%s':\n%s" % (e, util.xml_to_string(data_manager_elem)))
            return None
        if add_manager:
            self.add_manager(data_manager)
        log.debug('Loaded Data Manager: %s' % (data_manager.id))
        return data_manager
    def add_manager(self, data_manager):
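        """Register a data manager and record it against each data table it manages."""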
        if data_manager.id in self.data_managers:
            log.warning("A data manager has been defined twice: %s " % (data_manager.id))
        self.data_managers[data_manager.id] = data_manager
        for data_table_name in data_manager.data_tables.keys():
            if data_table_name not in self.managed_data_tables:
                self.managed_data_tables[data_table_name] = []
            self.managed_data_tables[data_table_name].append(data_manager)
    def remove_manager(self, manager_ids):
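        """Remove one or more data managers by id, unloading their tools and dropping tracking for data tables no longer managed by any remaining manager."""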
        if not isinstance(manager_ids, list):
            manager_ids = [manager_ids]
        for manager_id in manager_ids:
            data_manager = self.get_manager(manager_id, None)
            if data_manager is not None:
                del self.data_managers[manager_id]
                # remove tool from toolbox
                if data_manager.tool:
                    self.app.toolbox.remove_tool_by_id(data_manager.tool.id)
                # determine if any data_tables are no longer tracked
                for data_table_name in data_manager.data_tables.keys():
                    remove_data_table_tracking = True
                    for other_data_manager in self.data_managers.values():
                        if data_table_name in other_data_manager.data_tables:
                            remove_data_table_tracking = False
                            break
                    if remove_data_table_tracking and data_table_name in self.managed_data_tables:
                        del self.managed_data_tables[data_table_name]
class DataManager(object):
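    """A single data manager: a hidden tool plus the mapping of its outputs onto tool data table columns."""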
    GUID_TYPE = 'data_manager'
    DEFAULT_VERSION = "0.0.1"
    def __init__(self, data_managers, elem=None, tool_path=None):
        self.data_managers = data_managers
        self.declared_id = None
        self.name = None
        self.description = None
        self.version = self.DEFAULT_VERSION
        self.guid = None
        self.tool = None
        self.data_tables = odict()
        self.output_ref_by_data_table = {}
        self.move_by_data_table_column = {}
        self.value_translation_by_data_table_column = {}
        self.tool_shed_repository_info_dict = None
        self.undeclared_tables = False
        if elem is not None:
            self.load_from_element(elem, tool_path or self.data_managers.tool_path)
    def load_from_element(self, elem, tool_path):
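        """Configure this data manager from a <data_manager> element.

        A locally defined entry looks roughly like the following (illustrative
        example; element and attribute names follow the parsing below)::

            <data_manager tool_file="data_manager/fetch_genome.xml" id="fetch_genome">
                <data_table name="all_fasta">
                    <output>
                        <column name="value" />
                        <column name="path" output_ref="out_file">
                            <move type="directory">
                                <target base="${GALAXY_DATA_MANAGER_DATA_PATH}">${value}</target>
                            </move>
                            <value_translation>${GALAXY_DATA_MANAGER_DATA_PATH}/${value}</value_translation>
                        </column>
                    </output>
                </data_table>
            </data_manager>

        Entries installed from a tool shed use a <tool> child element (with
        tool_shed/repository_name/repository_owner/installed_changeset_revision
        children) instead of the tool_file attribute.
        """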
        assert elem.tag == 'data_manager', 'A data manager configuration must have a "data_manager" tag as the root. "%s" is present' % (elem.tag)
        self.declared_id = elem.get('id', None)
        self.guid = elem.get('guid', None)
        path = elem.get('tool_file', None)
        self.version = elem.get('version', self.version)
        tool_shed_repository_id = None
        tool_guid = None
        if path is None:
            tool_elem = elem.find('tool')
            assert tool_elem is not None, "Error loading tool for data manager. Make sure that a tool_file attribute or a tool tag set has been defined:\n%s" % (util.xml_to_string(elem))
            path = tool_elem.get("file", None)
            tool_guid = tool_elem.get("guid", None)
            # need to determine repository info so that dependencies will work correctly
            if hasattr(self.data_managers.app, 'tool_cache') and tool_guid in self.data_managers.app.tool_cache._tool_paths_by_id:
                path = self.data_managers.app.tool_cache._tool_paths_by_id[tool_guid]
                tool = self.data_managers.app.tool_cache.get_tool(path)
                tool_shed_repository = tool.tool_shed_repository
                self.tool_shed_repository_info_dict = dict(tool_shed=tool_shed_repository.tool_shed,
                                                           name=tool_shed_repository.name,
                                                           owner=tool_shed_repository.owner,
                                                           installed_changeset_revision=tool_shed_repository.installed_changeset_revision)
                tool_shed_repository_id = self.data_managers.app.security.encode_id(tool_shed_repository.id)
                tool_path = ""
            else:
                tool_shed_url = tool_elem.find('tool_shed').text
                # Handle protocol changes.
                tool_shed_url = common_util.get_tool_shed_url_from_tool_shed_registry(self.data_managers.app, tool_shed_url)
                # The protocol is not stored in the database.
                tool_shed = common_util.remove_protocol_from_tool_shed_url(tool_shed_url)
                repository_name = tool_elem.find('repository_name').text
                repository_owner = tool_elem.find('repository_owner').text
                installed_changeset_revision = tool_elem.find('installed_changeset_revision').text
                self.tool_shed_repository_info_dict = dict(tool_shed=tool_shed,
                                                           name=repository_name,
                                                           owner=repository_owner,
                                                           installed_changeset_revision=installed_changeset_revision)
                tool_shed_repository = \
                    repository_util.get_installed_repository(self.data_managers.app,
                                                             tool_shed=tool_shed,
                                                             name=repository_name,
                                                             owner=repository_owner,
                                                             installed_changeset_revision=installed_changeset_revision)
                if tool_shed_repository is None:
                    log.warning('Could not determine tool shed repository from database. This should only ever happen when running tests.')
                    # we'll set tool_path manually here from shed_conf_file
                    tool_shed_repository_id = None
                    try:
                        tool_path = util.parse_xml(elem.get('shed_conf_file')).getroot().get('tool_path', tool_path)
                    except Exception as e:
                        log.error('Error determining tool_path for Data Manager during testing: %s', e)
                else:
                    tool_shed_repository_id = self.data_managers.app.security.encode_id(tool_shed_repository.id)
                # use shed_conf_file to determine tool_path
                shed_conf_file = elem.get("shed_conf_file", None)
                if shed_conf_file:
                    shed_conf = self.data_managers.app.toolbox.get_shed_config_dict_by_filename(shed_conf_file, None)
                    if shed_conf:
                        tool_path = shed_conf.get("tool_path", tool_path)
        assert path is not None, "A tool file path could not be determined:\n%s" % (util.xml_to_string(elem))
        self.load_tool(os.path.join(tool_path, path),
                       guid=tool_guid,
                       data_manager_id=self.id,
                       tool_shed_repository_id=tool_shed_repository_id)
        self.name = elem.get('name', self.tool.name)
        self.description = elem.get('description', self.tool.description)
        self.undeclared_tables = util.asbool(elem.get('undeclared_tables', self.undeclared_tables))
        for data_table_elem in elem.findall('data_table'):
            data_table_name = data_table_elem.get("name")
            assert data_table_name is not None, "A name is required for a data table entry"
            if data_table_name not in self.data_tables:
                self.data_tables[data_table_name] = odict()
            output_elem = data_table_elem.find('output')
            if output_elem is not None:
                for column_elem in output_elem.findall('column'):
                    column_name = column_elem.get('name', None)
                    assert column_name is not None, "Name is required for column entry"
                    data_table_column_name = column_elem.get('data_table_name', column_name)
                    self.data_tables[data_table_name][data_table_column_name] = column_name
                    output_ref = column_elem.get('output_ref', None)
                    if output_ref is not None:
                        if data_table_name not in self.output_ref_by_data_table:
                            self.output_ref_by_data_table[data_table_name] = {}
                        self.output_ref_by_data_table[data_table_name][data_table_column_name] = output_ref
                    value_translation_elems = column_elem.findall('value_translation')
                    if value_translation_elems is not None:
                        for value_translation_elem in value_translation_elems:
                            value_translation = value_translation_elem.text
                            if value_translation is not None:
                                value_translation_type = value_translation_elem.get('type', DEFAULT_VALUE_TRANSLATION_TYPE)
                                if data_table_name not in self.value_translation_by_data_table_column:
                                    self.value_translation_by_data_table_column[data_table_name] = {}
                                if data_table_column_name not in self.value_translation_by_data_table_column[data_table_name]:
                                    self.value_translation_by_data_table_column[data_table_name][data_table_column_name] = []
                                if value_translation_type == 'function':
                                    if value_translation in VALUE_TRANSLATION_FUNCTIONS:
                                        value_translation = VALUE_TRANSLATION_FUNCTIONS[value_translation]
                                    else:
                                        raise ValueError("Unsupported value translation function: '%s'" % (value_translation))
                                else:
                                    assert value_translation_type == DEFAULT_VALUE_TRANSLATION_TYPE, "Unsupported value translation type: '%s'" % (value_translation_type)
                                self.value_translation_by_data_table_column[data_table_name][data_table_column_name].append(value_translation)
                    for move_elem in column_elem.findall('move'):
                        move_type = move_elem.get('type', 'directory')
                        relativize_symlinks = util.asbool(move_elem.get('relativize_symlinks', False))  # TODO: should we instead always relativize links?
                        source_elem = move_elem.find('source')
                        if source_elem is None:
                            source_base = None
                            source_value = ''
                        else:
                            source_base = source_elem.get('base', None)
                            source_value = source_elem.text
                        target_elem = move_elem.find('target')
                        if target_elem is None:
                            target_base = None
                            target_value = ''
                        else:
                            target_base = target_elem.get('base', None)
                            target_value = target_elem.text
                        if data_table_name not in self.move_by_data_table_column:
                            self.move_by_data_table_column[data_table_name] = {}
                        self.move_by_data_table_column[data_table_name][data_table_column_name] = \
                            dict(type=move_type,
                                 source_base=source_base,
                                 source_value=source_value,
                                 target_base=target_base,
                                 target_value=target_value,
                                 relativize_symlinks=relativize_symlinks)
    @property
    def id(self):
        return self.guid or self.declared_id  # if we have a guid, we will use that as the data_manager id
    def load_tool(self, tool_filename, guid=None, data_manager_id=None, tool_shed_repository_id=None):
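        """Load the data manager's tool as a hidden tool and register it in the toolbox's data_manager_tools mapping."""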
        toolbox = self.data_managers.app.toolbox
        tool = toolbox.load_hidden_tool(tool_filename,
                                        guid=guid,
                                        data_manager_id=data_manager_id,
                                        repository_id=tool_shed_repository_id,
                                        use_cached=True)
        self.data_managers.app.toolbox.data_manager_tools[tool.id] = tool
        self.tool = tool
        return tool
    def process_result(self, out_data):
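        """Process the JSON written by a data manager tool's outputs.

        Each output dataset is expected to hold a JSON document whose
        'data_tables' key maps table names to new row values. Rows for declared
        tables are column-remapped, moved, and value-translated before being
        added, and the affected tables are reloaded; rows for undeclared tables
        are only added when undeclared_tables is enabled.
        """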
        data_manager_dicts = {}
        data_manager_dict = {}
        # TODO: fix this merging below
        for output_name, output_dataset in out_data.items():
            try:
                with open(output_dataset.file_name) as fh:
                    output_dict = json.load(fh)
            except Exception as e:
                log.warning('Error reading DataManagerTool json for "%s": %s' % (output_name, e))
                continue
            data_manager_dicts[output_name] = output_dict
            for key, value in output_dict.items():
                if key not in data_manager_dict:
                    data_manager_dict[key] = {}
                data_manager_dict[key].update(value)
            data_manager_dict.update(output_dict)
        data_tables_dict = data_manager_dict.get('data_tables', {})
        for data_table_name in self.data_tables.keys():
            data_table_values = data_tables_dict.pop(data_table_name, None)
            if not data_table_values:
                log.warning('No values for data table "%s" were returned by the data manager "%s".' % (data_table_name, self.id))
                continue  # next data table
            data_table = self.data_managers.app.tool_data_tables.get(data_table_name, None)
            if data_table is None:
                log.error('The data manager "%s" returned an unknown data table "%s" with new entries "%s". These entries will not be created. Please confirm that an entry for "%s" exists in your "%s" file.' % (self.id, data_table_name, data_table_values, data_table_name, 'tool_data_table_conf.xml'))
                continue  # next table name
            if not isinstance(data_table, SUPPORTED_DATA_TABLE_TYPES):
                log.error('The data manager "%s" returned an unsupported data table "%s" with type "%s" with new entries "%s". These entries will not be created. Please confirm that the data table is of a supported type (%s).' % (self.id, data_table_name, type(data_table), data_table_values, SUPPORTED_DATA_TABLE_TYPES))
                continue  # next table name
            output_ref_values = {}
            if data_table_name in self.output_ref_by_data_table:
                for data_table_column, output_ref in self.output_ref_by_data_table[data_table_name].items():
                    output_ref_dataset = out_data.get(output_ref, None)
                    assert output_ref_dataset is not None, "Referenced output '%s' was not found." % (output_ref)
                    output_ref_values[data_table_column] = output_ref_dataset
            if not isinstance(data_table_values, list):
                data_table_values = [data_table_values]
            for data_table_row in data_table_values:
                data_table_value = dict(**data_table_row)  # keep original values here
                for name, value in data_table_row.items():  # FIXME: need to loop through here based upon order listed in data_manager config
                    if name in output_ref_values:
                        self.process_move(data_table_name, name, output_ref_values[name].extra_files_path, **data_table_value)
                        data_table_value[name] = self.process_value_translation(data_table_name, name, **data_table_value)
                data_table.add_entry(data_table_value, persist=True, entry_source=self)
            send_control_task(self.data_managers.app,
                              'reload_tool_data_tables',
                              noop_self=True,
                              kwargs={'table_name': data_table_name})
        if self.undeclared_tables and data_tables_dict:
            # Handle the data move by moving everything out of each output's extra
            # files path; if a target directory already exists, its contents are merged.
            log.debug('Attempting to add entries for undeclared tables: %s.', ', '.join(data_tables_dict.keys()))
            for ref_file in out_data.values():
                if ref_file.extra_files_path_exists():
                    util.move_merge(ref_file.extra_files_path, self.data_managers.app.config.galaxy_data_manager_data_path)
            path_column_names = ['path']
            for data_table_name, data_table_values in data_tables_dict.items():
                data_table = self.data_managers.app.tool_data_tables.get(data_table_name, None)
                if not isinstance(data_table_values, list):
                    data_table_values = [data_table_values]
                for data_table_row in data_table_values:
                    data_table_value = dict(**data_table_row)  # keep original values here
                    for name, value in data_table_row.items():
                        if name in path_column_names:
                            data_table_value[name] = os.path.abspath(os.path.join(self.data_managers.app.config.galaxy_data_manager_data_path, value))
                    data_table.add_entry(data_table_value, persist=True, entry_source=self)
                send_control_task(self.data_managers.app, 'reload_tool_data_tables',
                                  noop_self=True,
                                  kwargs={'table_name': data_table_name})
        else:
            for data_table_name, data_table_values in data_tables_dict.items():
                # tool returned extra data table entries, but data table was not declared in data manager
                # do not add these values, but do provide messages
                log.warning('The data manager "%s" returned an undeclared data table "%s" with new entries "%s". These entries will not be created. Please confirm that an entry for "%s" exists in your "%s" file.' % (self.id, data_table_name, data_table_values, data_table_name, self.data_managers.filename))
    def process_move(self, data_table_name, column_name, source_base_path, relative_symlinks=False, **kwd):
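        """Move the data referenced by a row column according to the column's <move> definition.

        Source and target paths are built from the configured base/value
        templates, filled with the row values and GALAXY_DATA_MANAGER_DATA_PATH.
        Returns True if a move definition existed for the column, False otherwise.
        """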
        if data_table_name in self.move_by_data_table_column and column_name in self.move_by_data_table_column[data_table_name]:
            move_dict = self.move_by_data_table_column[data_table_name][column_name]
            source = move_dict['source_base']
            if source is None:
                source = source_base_path
            else:
                source = fill_template(source, GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd)
            if move_dict['source_value']:
                source = os.path.join(source, fill_template(move_dict['source_value'], GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd))
            target = move_dict['target_base']
            if target is None:
                target = self.data_managers.app.config.galaxy_data_manager_data_path
            else:
                target = fill_template(target, GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd)
            if move_dict['target_value']:
                target = os.path.join(target, fill_template(move_dict['target_value'], GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd))
            if move_dict['type'] == 'file':
                dirs = os.path.split(target)[0]
                try:
                    os.makedirs(dirs)
                except OSError as e:
                    if e.errno != errno.EEXIST:
                        raise e
            # If moving a directory whose target already exists, the contents are merged instead.
            if os.path.exists(source):
                util.move_merge(source, target)
            if move_dict.get('relativize_symlinks', False):
                util.relativize_symlinks(target)
            return True
        return False
    def process_value_translation(self, data_table_name, column_name, **kwd):
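        """Apply the configured value translations for a column and return the resulting value.

        String translations are treated as templates filled with the row values
        and GALAXY_DATA_MANAGER_DATA_PATH; function translations (e.g. abspath)
        are called on the current value.
        """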
        value = kwd.get(column_name)
        if data_table_name in self.value_translation_by_data_table_column and column_name in self.value_translation_by_data_table_column[data_table_name]:
            for value_translation in self.value_translation_by_data_table_column[data_table_name][column_name]:
                if isinstance(value_translation, string_types):
                    value = fill_template(value_translation, GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd)
                else:
                    value = value_translation(value)
        return value