Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_shed.util.tool_util
import logging
import os
import shutil
import galaxy.tools
from galaxy import util
from galaxy.datatypes.sniff import is_column_based
from galaxy.tool_shed.util import basic_util
from galaxy.util import checkers
from galaxy.util.expressions import ExpressionContext
from galaxy.web.form_builder import SelectField
log = logging.getLogger(__name__)
[docs]def build_shed_tool_conf_select_field(app):
"""Build a SelectField whose options are the keys in app.toolbox.shed_tool_confs."""
options = []
for dynamic_tool_conf_filename in app.toolbox.dynamic_conf_filenames():
if dynamic_tool_conf_filename.startswith('./'):
option_label = dynamic_tool_conf_filename.replace('./', '', 1)
else:
option_label = dynamic_tool_conf_filename
options.append((option_label, dynamic_tool_conf_filename))
select_field = SelectField(name='shed_tool_conf')
for option_tup in options:
select_field.add_option(option_tup[0], option_tup[1])
return select_field
[docs]def build_tool_panel_section_select_field(app):
"""Build a SelectField whose options are the sections of the current in-memory toolbox."""
options = []
for section_id, section_name in app.toolbox.get_sections():
options.append((section_name, section_id))
select_field = SelectField(name='tool_panel_section_id', field_id='tool_panel_section_select')
for option_tup in options:
select_field.add_option(option_tup[0], option_tup[1])
return select_field
[docs]def copy_sample_file(app, filename, dest_path=None):
"""
Copies a sample file at `filename` to `the dest_path`
directory and strips the '.sample' extensions from `filename`.
Returns the path to the copied file (with the .sample extension).
"""
if dest_path is None:
dest_path = os.path.abspath(app.config.tool_data_path)
sample_file_name = basic_util.strip_path(filename)
copied_file = sample_file_name.rsplit('.sample', 1)[0]
full_source_path = os.path.abspath(filename)
full_destination_path = os.path.join(dest_path, sample_file_name)
# Don't copy a file to itself - not sure how this happens, but sometimes it does...
if full_source_path != full_destination_path:
# It's ok to overwrite the .sample version of the file.
shutil.copy(full_source_path, full_destination_path)
# Only create the .loc file if it does not yet exist. We don't overwrite it in case it
# contains stuff proprietary to the local instance.
non_sample_path = os.path.join(dest_path, copied_file)
if not os.path.lexists(non_sample_path):
shutil.copy(full_source_path, os.path.join(dest_path, copied_file))
return non_sample_path
[docs]def copy_sample_files(app, sample_files, tool_path=None, sample_files_copied=None, dest_path=None):
"""
Copy all appropriate files to dest_path in the local Galaxy environment that have not
already been copied. Those that have been copied are contained in sample_files_copied.
The default value for dest_path is ~/tool-data. We need to be careful to copy only
appropriate files here because tool shed repositories can contain files ending in .sample
that should not be copied to the ~/tool-data directory.
"""
filenames_not_to_copy = ['tool_data_table_conf.xml.sample']
sample_files_copied = util.listify(sample_files_copied)
for filename in sample_files:
filename_sans_path = os.path.split(filename)[1]
if filename_sans_path not in filenames_not_to_copy and filename not in sample_files_copied:
if tool_path:
filename = os.path.join(tool_path, filename)
# Attempt to ensure we're copying an appropriate file.
if is_data_index_sample_file(filename):
copy_sample_file(app, filename, dest_path=dest_path)
[docs]def generate_message_for_invalid_tools(app, invalid_file_tups, repository, metadata_dict, as_html=True,
displaying_invalid_tool=False):
if as_html:
new_line = '<br/>'
bold_start = '<b>'
bold_end = '</b>'
else:
new_line = '\n'
bold_start = ''
bold_end = ''
message = ''
if app.name == 'galaxy':
tip_rev = str(repository.changeset_revision)
else:
tip_rev = str(repository.tip())
if not displaying_invalid_tool:
if metadata_dict:
message += "Metadata may have been defined for some items in revision '%s'. " % tip_rev
message += "Correct the following problems if necessary and reset metadata.%s" % new_line
else:
message += "Metadata cannot be defined for revision '%s' so this revision cannot be automatically " % tip_rev
message += "installed into a local Galaxy instance. Correct the following problems and reset metadata.%s" % new_line
for itc_tup in invalid_file_tups:
tool_file, exception_msg = itc_tup
if exception_msg.find('No such file or directory') >= 0:
exception_items = exception_msg.split()
missing_file_items = exception_items[7].split('/')
missing_file = missing_file_items[-1].rstrip('\'')
if missing_file.endswith('.loc'):
sample_ext = '%s.sample' % missing_file
else:
sample_ext = missing_file
correction_msg = "This file refers to a missing file %s%s%s. " % \
(bold_start, str(missing_file), bold_end)
correction_msg += "Upload a file named %s%s%s to the repository to correct this error." % \
(bold_start, sample_ext, bold_end)
else:
if as_html:
correction_msg = exception_msg
else:
correction_msg = exception_msg.replace('<br/>', new_line).replace('<b>', bold_start).replace('</b>', bold_end)
message += "{}{}{} - {}{}".format(bold_start, tool_file, bold_end, correction_msg, new_line)
return message
[docs]def get_tool_path_install_dir(partial_install_dir, shed_tool_conf_dict, tool_dict, config_elems):
for elem in config_elems:
if elem.tag == 'tool':
if elem.get('guid') == tool_dict['guid']:
tool_path = shed_tool_conf_dict['tool_path']
relative_install_dir = os.path.join(tool_path, partial_install_dir)
return tool_path, relative_install_dir
elif elem.tag == 'section':
for section_elem in elem:
if section_elem.tag == 'tool':
if section_elem.get('guid') == tool_dict['guid']:
tool_path = shed_tool_conf_dict['tool_path']
relative_install_dir = os.path.join(tool_path, partial_install_dir)
return tool_path, relative_install_dir
return None, None
[docs]def handle_missing_index_file(app, tool_path, sample_files, repository_tools_tups, sample_files_copied):
"""
Inspect each tool to see if it has any input parameters that are dynamically
generated select lists that depend on a .loc file. This method is not called
from the tool shed, but from Galaxy when a repository is being installed.
"""
for index, repository_tools_tup in enumerate(repository_tools_tups):
tup_path, guid, repository_tool = repository_tools_tup
params_with_missing_index_file = repository_tool.params_with_missing_index_file
for param in params_with_missing_index_file:
options = param.options
missing_file_name = basic_util.strip_path(options.missing_index_file)
if missing_file_name not in sample_files_copied:
# The repository must contain the required xxx.loc.sample file.
for sample_file in sample_files:
sample_file_name = basic_util.strip_path(sample_file)
if sample_file_name == '%s.sample' % missing_file_name:
target_path = copy_sample_file(app, os.path.join(tool_path, sample_file))
if options.tool_data_table and options.tool_data_table.missing_index_file:
options.tool_data_table.handle_found_index_file(target_path)
sample_files_copied.append(target_path)
break
return repository_tools_tups, sample_files_copied
[docs]def is_data_index_sample_file(file_path):
"""
Attempt to determine if a .sample file is appropriate for copying to ~/tool-data when
a tool shed repository is being installed into a Galaxy instance.
"""
# Currently most data index files are tabular, so check that first. We'll assume that
# if the file is tabular, it's ok to copy.
if is_column_based(file_path):
return True
# If the file is any of the following, don't copy it.
if checkers.check_html(file_path):
return False
if checkers.check_image(file_path):
return False
if checkers.check_binary(name=file_path):
return False
if checkers.is_bz2(file_path):
return False
if checkers.is_gzip(file_path):
return False
if checkers.is_zip(file_path):
return False
# Default to copying the file if none of the above are true.
return True
[docs]def new_state(trans, tool, invalid=False):
"""Create a new `DefaultToolState` for the received tool. Only inputs on the first page will be initialized."""
state = galaxy.tools.DefaultToolState()
state.inputs = {}
if invalid:
# We're attempting to display a tool in the tool shed that has been determined to have errors, so is invalid.
return state
try:
# Attempt to generate the tool state using the standard Galaxy-side code
return tool.new_state(trans)
except Exception as e:
# Fall back to building tool state as below
log.debug('Failed to build tool state for tool "%s" using standard method, will try to fall back on custom method: %s', tool.id, e)
inputs = tool.inputs_by_page[0]
context = ExpressionContext(state.inputs, parent=None)
for input in inputs.values():
try:
state.inputs[input.name] = input.get_initial_value(trans, context)
except Exception:
# FIXME: not all values should be an empty list
state.inputs[input.name] = []
return state
[docs]def panel_entry_per_tool(tool_section_dict):
# Return True if tool_section_dict looks like this.
# {<Tool guid> :
# [{ tool_config : <tool_config_file>,
# id: <ToolSection id>,
# version : <ToolSection version>,
# name : <TooSection name>}]}
# But not like this.
# { id: <ToolSection id>, version : <ToolSection version>, name : <TooSection name>}
if not tool_section_dict:
return False
if len(tool_section_dict) != 3:
return True
for k, v in tool_section_dict:
if k not in ['id', 'version', 'name']:
return True
return False