# Source code for galaxy.tools.data_manager.manager
# (extracted from an archived Galaxy documentation page; a newer Galaxy
# release may contain an updated version of this module)
import errno
import json
import logging
import os
from six import string_types
from galaxy import util
from galaxy.queue_worker import (
send_control_task
)
from galaxy.tools.data import TabularToolDataTable
from galaxy.util.odict import odict
from galaxy.util.template import fill_template
from tool_shed.util import (
common_util,
repository_util
)
log = logging.getLogger(__name__)

# Tool data table implementations a data manager may add entries to.
# Kept as a real tuple (note the trailing comma — the original parenthesized
# expression was not a tuple and would break isinstance() checks as soon as a
# second supported type was appended).
SUPPORTED_DATA_TABLE_TYPES = (TabularToolDataTable, )

# Functions that may be referenced by <value_translation type="function"> elements.
VALUE_TRANSLATION_FUNCTIONS = dict(abspath=os.path.abspath)

# <value_translation> elements without an explicit type attribute are treated
# as templates (rendered with fill_template).
DEFAULT_VALUE_TRANSLATION_TYPE = 'template'
class DataManagers(object):
    """Registry of all configured :class:`DataManager` instances.

    Loads ``<data_manager>`` definitions from the data manager XML config
    file(s) and tracks, per tool data table, which managers feed it.
    """

    def __init__(self, app, xml_filename=None):
        """Load data managers for ``app``.

        :param app: the Galaxy application (provides config, toolbox, etc.)
        :param xml_filename: optional explicit config file(s); defaults to
            ``app.config.data_manager_config_file``.
        """
        self.app = app
        self.data_managers = odict()  # data manager id -> DataManager
        self.managed_data_tables = odict()  # data table name -> [DataManager, ...]
        self.tool_path = None
        self._reload_count = 0
        self.filename = xml_filename or self.app.config.data_manager_config_file
        for filename in util.listify(self.filename):
            if not filename:
                continue
            self.load_from_xml(filename)
        # Also load any tool-shed-installed data managers.
        if self.app.config.shed_data_manager_config_file:
            self.load_from_xml(self.app.config.shed_data_manager_config_file, store_tool_path=True)

    def load_from_xml(self, xml_filename, store_tool_path=True):
        """Parse ``xml_filename`` and load every ``<data_manager>`` it defines.

        Parse and schema errors are logged and the file is skipped rather than
        raised, so one bad config file cannot abort loading.
        """
        try:
            tree = util.parse_xml(xml_filename)
        except Exception as e:
            log.error('There was an error parsing your Data Manager config file "%s": %s' % (xml_filename, e))
            return  # we are not able to load any data managers
        root = tree.getroot()
        if root.tag != 'data_managers':
            log.error('A data managers configuration must have a "data_managers" tag as the root. "%s" is present' % (root.tag))
            return
        if store_tool_path:
            # Resolve tool_path: config-file attribute, then app-level setting,
            # then the current working directory.
            tool_path = root.get('tool_path', None)
            if tool_path is None:
                tool_path = self.app.config.tool_path
            if not tool_path:
                tool_path = '.'
            self.tool_path = tool_path
        for data_manager_elem in root.findall('data_manager'):
            self.load_manager_from_elem(data_manager_elem, tool_path=self.tool_path)

    def load_manager_from_elem(self, data_manager_elem, tool_path=None, add_manager=True):
        """Build a :class:`DataManager` from a ``<data_manager>`` element.

        :returns: the new manager, or ``None`` (with the error logged) when
            the element cannot be loaded.  When ``add_manager`` is true the
            manager is also registered with this registry.
        """
        try:
            data_manager = DataManager(self, data_manager_elem, tool_path=tool_path)
        except Exception as e:
            log.error("Error loading data_manager '%s':\n%s" % (e, util.xml_to_string(data_manager_elem)))
            return None
        if add_manager:
            self.add_manager(data_manager)
        log.debug('Loaded Data Manager: %s' % (data_manager.id))
        return data_manager

    def add_manager(self, data_manager):
        """Register ``data_manager`` and index it under each data table it manages."""
        if data_manager.id in self.data_managers:
            log.warning("A data manager has been defined twice: %s " % (data_manager.id))
        self.data_managers[data_manager.id] = data_manager
        for data_table_name in data_manager.data_tables.keys():
            if data_table_name not in self.managed_data_tables:
                self.managed_data_tables[data_table_name] = []
            self.managed_data_tables[data_table_name].append(data_manager)

    def get_manager(self, *args, **kwds):
        """``dict.get``-style lookup of a loaded data manager by id.

        This accessor was missing even though ``remove_manager`` calls it;
        it simply delegates to ``self.data_managers.get``.
        """
        return self.data_managers.get(*args, **kwds)

    def remove_manager(self, manager_ids):
        """Unregister a manager id or a list of manager ids.

        Also removes each manager's tool from the toolbox and drops data table
        tracking entries that no remaining manager still claims.
        """
        if not isinstance(manager_ids, list):
            manager_ids = [manager_ids]
        for manager_id in manager_ids:
            data_manager = self.get_manager(manager_id, None)
            if data_manager is not None:
                del self.data_managers[manager_id]
                # remove tool from toolbox
                if data_manager.tool:
                    self.app.toolbox.remove_tool_by_id(data_manager.tool.id)
                # determine if any data_tables are no longer tracked
                for data_table_name in data_manager.data_tables.keys():
                    remove_data_table_tracking = True
                    for other_data_manager in self.data_managers.values():
                        if data_table_name in other_data_manager.data_tables:
                            remove_data_table_tracking = False
                            break
                    if remove_data_table_tracking and data_table_name in self.managed_data_tables:
                        del self.managed_data_tables[data_table_name]
class DataManager(object):
    """A single data manager: a hidden Galaxy tool plus the mapping rules that
    turn the tool's JSON output into new tool data table entries.

    Instances are configured from a ``<data_manager>`` XML element (see
    :meth:`load_from_element`) and registered with a :class:`DataManagers`
    registry.
    """
    GUID_TYPE = 'data_manager'
    DEFAULT_VERSION = "0.0.1"

    def __init__(self, data_managers, elem=None, tool_path=None):
        """Create a data manager owned by the ``data_managers`` registry.

        When ``elem`` is provided, the manager is populated immediately from
        that XML element; ``tool_path`` falls back to the registry-wide path.
        """
        self.data_managers = data_managers
        self.declared_id = None  # the 'id' attribute from the XML element
        self.name = None
        self.description = None
        self.version = self.DEFAULT_VERSION
        self.guid = None
        self.tool = None  # the loaded (hidden) Galaxy tool backing this manager
        # data table name -> odict of data-table column name -> tool output column name
        self.data_tables = odict()
        self.output_ref_by_data_table = {}  # table -> column -> referenced output name
        self.move_by_data_table_column = {}  # table -> column -> move directive dict
        self.value_translation_by_data_table_column = {}  # table -> column -> [template str or function]
        self.tool_shed_repository_info_dict = None
        self.undeclared_tables = False
        if elem is not None:
            self.load_from_element(elem, tool_path or self.data_managers.tool_path)

    def load_from_element(self, elem, tool_path):
        """Populate this data manager from a ``<data_manager>`` XML element.

        Resolves the backing tool — either directly via a ``tool_file``
        attribute or via a nested ``<tool>`` element describing an installed
        tool shed repository — loads it, and records the per-data-table column
        mappings, output references, value translations and move directives.
        """
        assert elem.tag == 'data_manager', 'A data manager configuration must have a "data_manager" tag as the root. "%s" is present' % (elem.tag)
        self.declared_id = elem.get('id', None)
        self.guid = elem.get('guid', None)
        path = elem.get('tool_file', None)
        self.version = elem.get('version', self.version)
        tool_shed_repository_id = None
        tool_guid = None
        if path is None:
            # No direct tool_file: the tool comes from an installed tool shed repository.
            tool_elem = elem.find('tool')
            assert tool_elem is not None, "Error loading tool for data manager. Make sure that a tool_file attribute or a tool tag set has been defined:\n%s" % (util.xml_to_string(elem))
            path = tool_elem.get("file", None)
            tool_guid = tool_elem.get("guid", None)
            # need to determine repository info so that dependencies will work correctly
            if hasattr(self.data_managers.app, 'tool_cache') and tool_guid in self.data_managers.app.tool_cache._tool_paths_by_id:
                # Fast path: the tool cache already knows this guid.
                path = self.data_managers.app.tool_cache._tool_paths_by_id[tool_guid]
                tool = self.data_managers.app.tool_cache.get_tool(path)
                tool_shed_repository = tool.tool_shed_repository
                self.tool_shed_repository_info_dict = dict(tool_shed=tool_shed_repository.tool_shed,
                                                           name=tool_shed_repository.name,
                                                           owner=tool_shed_repository.owner,
                                                           installed_changeset_revision=tool_shed_repository.installed_changeset_revision)
                tool_shed_repository_id = self.data_managers.app.security.encode_id(tool_shed_repository.id)
                # The cached path is usable as-is, so no tool_path prefix is applied.
                tool_path = ""
            else:
                # Slow path: look the repository up in the database from the
                # <tool_shed>/<repository_name>/<repository_owner>/<installed_changeset_revision> sub-elements.
                tool_shed_url = tool_elem.find('tool_shed').text
                # Handle protocol changes.
                tool_shed_url = common_util.get_tool_shed_url_from_tool_shed_registry(self.data_managers.app, tool_shed_url)
                # The protocol is not stored in the database.
                tool_shed = common_util.remove_protocol_from_tool_shed_url(tool_shed_url)
                repository_name = tool_elem.find('repository_name').text
                repository_owner = tool_elem.find('repository_owner').text
                installed_changeset_revision = tool_elem.find('installed_changeset_revision').text
                self.tool_shed_repository_info_dict = dict(tool_shed=tool_shed,
                                                           name=repository_name,
                                                           owner=repository_owner,
                                                           installed_changeset_revision=installed_changeset_revision)
                tool_shed_repository = \
                    repository_util.get_installed_repository(self.data_managers.app,
                                                             tool_shed=tool_shed,
                                                             name=repository_name,
                                                             owner=repository_owner,
                                                             installed_changeset_revision=installed_changeset_revision)
                if tool_shed_repository is None:
                    log.warning('Could not determine tool shed repository from database. This should only ever happen when running tests.')
                    # we'll set tool_path manually here from shed_conf_file
                    tool_shed_repository_id = None
                    try:
                        tool_path = util.parse_xml(elem.get('shed_conf_file')).getroot().get('tool_path', tool_path)
                    except Exception as e:
                        log.error('Error determining tool_path for Data Manager during testing: %s', e)
                else:
                    tool_shed_repository_id = self.data_managers.app.security.encode_id(tool_shed_repository.id)
                # use shed_conf_file to determine tool_path
                shed_conf_file = elem.get("shed_conf_file", None)
                if shed_conf_file:
                    shed_conf = self.data_managers.app.toolbox.get_shed_config_dict_by_filename(shed_conf_file, None)
                    if shed_conf:
                        tool_path = shed_conf.get("tool_path", tool_path)
        assert path is not None, "A tool file path could not be determined:\n%s" % (util.xml_to_string(elem))
        self.load_tool(os.path.join(tool_path, path),
                       guid=tool_guid,
                       data_manager_id=self.id,
                       tool_shed_repository_id=tool_shed_repository_id)
        # Element attributes win over the tool's own name/description.
        self.name = elem.get('name', self.tool.name)
        self.description = elem.get('description', self.tool.description)
        self.undeclared_tables = util.asbool(elem.get('undeclared_tables', self.undeclared_tables))
        # Record, per declared <data_table>, how tool output columns map onto
        # data table columns and any output refs / translations / moves.
        for data_table_elem in elem.findall('data_table'):
            data_table_name = data_table_elem.get("name")
            assert data_table_name is not None, "A name is required for a data table entry"
            if data_table_name not in self.data_tables:
                self.data_tables[data_table_name] = odict()
            output_elem = data_table_elem.find('output')
            if output_elem is not None:
                for column_elem in output_elem.findall('column'):
                    column_name = column_elem.get('name', None)
                    assert column_name is not None, "Name is required for column entry"
                    # NOTE(review): the "coumn" spelling below is a typo inherited
                    # from the original source; it is a local name only.
                    data_table_coumn_name = column_elem.get('data_table_name', column_name)
                    self.data_tables[data_table_name][data_table_coumn_name] = column_name
                    output_ref = column_elem.get('output_ref', None)
                    if output_ref is not None:
                        if data_table_name not in self.output_ref_by_data_table:
                            self.output_ref_by_data_table[data_table_name] = {}
                        self.output_ref_by_data_table[data_table_name][data_table_coumn_name] = output_ref
                    value_translation_elems = column_elem.findall('value_translation')
                    if value_translation_elems is not None:
                        for value_translation_elem in value_translation_elems:
                            value_translation = value_translation_elem.text
                            if value_translation is not None:
                                value_translation_type = value_translation_elem.get('type', DEFAULT_VALUE_TRANSLATION_TYPE)
                                if data_table_name not in self.value_translation_by_data_table_column:
                                    self.value_translation_by_data_table_column[data_table_name] = {}
                                if data_table_coumn_name not in self.value_translation_by_data_table_column[data_table_name]:
                                    self.value_translation_by_data_table_column[data_table_name][data_table_coumn_name] = []
                                if value_translation_type == 'function':
                                    # Translate via a named function from the module-level registry.
                                    if value_translation in VALUE_TRANSLATION_FUNCTIONS:
                                        value_translation = VALUE_TRANSLATION_FUNCTIONS[value_translation]
                                    else:
                                        raise ValueError("Unsupported value translation function: '%s'" % (value_translation))
                                else:
                                    # Otherwise the text is kept as a template string.
                                    assert value_translation_type == DEFAULT_VALUE_TRANSLATION_TYPE, ValueError("Unsupported value translation type: '%s'" % (value_translation_type))
                                self.value_translation_by_data_table_column[data_table_name][data_table_coumn_name].append(value_translation)
                    for move_elem in column_elem.findall('move'):
                        move_type = move_elem.get('type', 'directory')
                        relativize_symlinks = move_elem.get('relativize_symlinks', False)  # TODO: should we instead always relativize links?
                        source_elem = move_elem.find('source')
                        if source_elem is None:
                            source_base = None
                            source_value = ''
                        else:
                            source_base = source_elem.get('base', None)
                            source_value = source_elem.text
                        target_elem = move_elem.find('target')
                        if target_elem is None:
                            target_base = None
                            target_value = ''
                        else:
                            target_base = target_elem.get('base', None)
                            target_value = target_elem.text
                        if data_table_name not in self.move_by_data_table_column:
                            self.move_by_data_table_column[data_table_name] = {}
                        self.move_by_data_table_column[data_table_name][data_table_coumn_name] = \
                            dict(type=move_type,
                                 source_base=source_base,
                                 source_value=source_value,
                                 target_base=target_base,
                                 target_value=target_value,
                                 relativize_symlinks=relativize_symlinks)

    @property
    def id(self):
        """The effective data manager id (guid when set, else the declared id)."""
        return self.guid or self.declared_id  # if we have a guid, we will use that as the data_manager id

    def load_tool(self, tool_filename, guid=None, data_manager_id=None, tool_shed_repository_id=None):
        """Load the backing tool as a hidden tool, register it in the toolbox's
        data_manager_tools map, store it on ``self.tool`` and return it."""
        toolbox = self.data_managers.app.toolbox
        tool = toolbox.load_hidden_tool(tool_filename,
                                        guid=guid,
                                        data_manager_id=data_manager_id,
                                        repository_id=tool_shed_repository_id,
                                        use_cached=True)
        self.data_managers.app.toolbox.data_manager_tools[tool.id] = tool
        self.tool = tool
        return tool

    def process_result(self, out_data):
        """Consume the JSON written by the data manager tool's output datasets
        and add the resulting rows to the managed tool data tables.

        Declared tables get moves / value translations applied per column;
        undeclared tables are only handled when ``undeclared_tables`` is
        enabled (otherwise they are logged and skipped).  After each added
        entry a control task asks other processes to reload the table.
        """
        # NOTE(review): data_manager_dicts is populated but only the merged
        # data_manager_dict is consumed below.
        data_manager_dicts = {}
        data_manager_dict = {}
        # TODO: fix this merging below
        for output_name, output_dataset in out_data.items():
            try:
                output_dict = json.loads(open(output_dataset.file_name).read())
            except Exception as e:
                log.warning('Error reading DataManagerTool json for "%s": %s' % (output_name, e))
                continue
            data_manager_dicts[output_name] = output_dict
            for key, value in output_dict.items():
                if key not in data_manager_dict:
                    data_manager_dict[key] = {}
                data_manager_dict[key].update(value)
            data_manager_dict.update(output_dict)
        data_tables_dict = data_manager_dict.get('data_tables', {})
        # First pass: tables this manager declared in its config.
        for data_table_name in self.data_tables.keys():
            data_table_values = data_tables_dict.pop(data_table_name, None)
            if not data_table_values:
                log.warning('No values for data table "%s" were returned by the data manager "%s".' % (data_table_name, self.id))
                continue  # next data table
            data_table = self.data_managers.app.tool_data_tables.get(data_table_name, None)
            if data_table is None:
                log.error('The data manager "%s" returned an unknown data table "%s" with new entries "%s". These entries will not be created. Please confirm that an entry for "%s" exists in your "%s" file.' % (self.id, data_table_name, data_table_values, data_table_name, 'tool_data_table_conf.xml'))
                continue  # next table name
            if not isinstance(data_table, SUPPORTED_DATA_TABLE_TYPES):
                log.error('The data manager "%s" returned an unsupported data table "%s" with type "%s" with new entries "%s". These entries will not be created. Please confirm that the data table is of a supported type (%s).' % (self.id, data_table_name, type(data_table), data_table_values, SUPPORTED_DATA_TABLE_TYPES))
                continue  # next table name
            # Resolve any output_ref columns to the actual output datasets.
            output_ref_values = {}
            if data_table_name in self.output_ref_by_data_table:
                for data_table_column, output_ref in self.output_ref_by_data_table[data_table_name].items():
                    output_ref_dataset = out_data.get(output_ref, None)
                    assert output_ref_dataset is not None, "Referenced output was not found."
                    output_ref_values[data_table_column] = output_ref_dataset
            if not isinstance(data_table_values, list):
                data_table_values = [data_table_values]
            for data_table_row in data_table_values:
                data_table_value = dict(**data_table_row)  # keep original values here
                for name, value in data_table_row.items():  # FIXME: need to loop through here based upon order listed in data_manager config
                    if name in output_ref_values:
                        self.process_move(data_table_name, name, output_ref_values[name].extra_files_path, **data_table_value)
                    data_table_value[name] = self.process_value_translation(data_table_name, name, **data_table_value)
                data_table.add_entry(data_table_value, persist=True, entry_source=self)
                send_control_task(self.data_managers.app,
                                  'reload_tool_data_tables',
                                  noop_self=True,
                                  kwargs={'table_name': data_table_name})
        # Second pass: any table names left in data_tables_dict were not declared.
        if self.undeclared_tables and data_tables_dict:
            # We handle the data move, by just moving all the data out of the extra files path
            # moving a directory and the target already exists, we move the contents instead
            log.debug('Attempting to add entries for undeclared tables: %s.', ', '.join(data_tables_dict.keys()))
            for ref_file in out_data.values():
                if os.path.exists(ref_file.extra_files_path):
                    util.move_merge(ref_file.extra_files_path, self.data_managers.app.config.galaxy_data_manager_data_path)
            path_column_names = ['path']
            for data_table_name, data_table_values in data_tables_dict.items():
                data_table = self.data_managers.app.tool_data_tables.get(data_table_name, None)
                if not isinstance(data_table_values, list):
                    data_table_values = [data_table_values]
                for data_table_row in data_table_values:
                    data_table_value = dict(**data_table_row)  # keep original values here
                    for name, value in data_table_row.items():
                        if name in path_column_names:
                            # Rewrite relative paths against the data manager data directory.
                            data_table_value[name] = os.path.abspath(os.path.join(self.data_managers.app.config.galaxy_data_manager_data_path, value))
                    data_table.add_entry(data_table_value, persist=True, entry_source=self)
                    send_control_task(self.data_managers.app, 'reload_tool_data_tables',
                                      noop_self=True,
                                      kwargs={'table_name': data_table_name})
        else:
            for data_table_name, data_table_values in data_tables_dict.items():
                # tool returned extra data table entries, but data table was not declared in data manager
                # do not add these values, but do provide messages
                log.warning('The data manager "%s" returned an undeclared data table "%s" with new entries "%s". These entries will not be created. Please confirm that an entry for "%s" exists in your "%s" file.' % (self.id, data_table_name, data_table_values, data_table_name, self.data_managers.filename))

    def process_move(self, data_table_name, column_name, source_base_path, relative_symlinks=False, **kwd):
        """Apply the configured <move> directive for ``column_name`` of
        ``data_table_name``, filling source/target templates from ``kwd``.

        Returns True when a move actually happened, False otherwise.

        NOTE(review): the ``relative_symlinks`` parameter appears unused; the
        move consults ``move_dict['relativize_symlinks']`` instead — confirm
        against callers before removing.
        """
        if data_table_name in self.move_by_data_table_column and column_name in self.move_by_data_table_column[data_table_name]:
            move_dict = self.move_by_data_table_column[data_table_name][column_name]
            # Source defaults to the referenced output's extra files path.
            source = move_dict['source_base']
            if source is None:
                source = source_base_path
            else:
                source = fill_template(source, GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd)
            if move_dict['source_value']:
                source = os.path.join(source, fill_template(move_dict['source_value'], GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd))
            # Target defaults to the configured data manager data path.
            target = move_dict['target_base']
            if target is None:
                target = self.data_managers.app.config.galaxy_data_manager_data_path
            else:
                target = fill_template(target, GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd)
            if move_dict['target_value']:
                target = os.path.join(target, fill_template(move_dict['target_value'], GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd))
            if move_dict['type'] == 'file':
                # Make sure the target's parent directory exists.
                dirs = os.path.split(target)[0]
                try:
                    os.makedirs(dirs)
                except OSError as e:
                    if e.errno != errno.EEXIST:
                        raise e
            # moving a directory and the target already exists, we move the contents instead
            if os.path.exists(source):
                util.move_merge(source, target)
                if move_dict.get('relativize_symlinks', False):
                    util.relativize_symlinks(target)
                return True
        return False

    def process_value_translation(self, data_table_name, column_name, **kwd):
        """Run the configured value translations (template strings are filled;
        functions are called) for one column and return the translated value."""
        value = kwd.get(column_name)
        if data_table_name in self.value_translation_by_data_table_column and column_name in self.value_translation_by_data_table_column[data_table_name]:
            for value_translation in self.value_translation_by_data_table_column[data_table_name][column_name]:
                if isinstance(value_translation, string_types):
                    value = fill_template(value_translation, GALAXY_DATA_MANAGER_DATA_PATH=self.data_managers.app.config.galaxy_data_manager_data_path, **kwd)
                else:
                    value = value_translation(value)
        return value