# Source code for galaxy.webapps.galaxy.api.tools (API operations for tools).

import logging
from json import dumps

import galaxy.queue_worker
from galaxy import exceptions, managers, util, web
from galaxy.managers.collections_util import dictify_dataset_collection_instance
from galaxy.util.json import safe_dumps
from galaxy.util.odict import odict
from galaxy.visualization.genomes import GenomeRegion
from galaxy.web import _future_expose_api as expose_api
from galaxy.web import _future_expose_api_anonymous as expose_api_anonymous
from galaxy.web import _future_expose_api_anonymous_and_sessionless as expose_api_anonymous_and_sessionless
from galaxy.web import _future_expose_api_raw_anonymous_and_sessionless as expose_api_raw_anonymous_and_sessionless
from galaxy.web.base.controller import BaseAPIController
from galaxy.web.base.controller import UsesVisualizationMixin
from ._fetch_util import validate_and_normalize_targets

# Module-level logger; all controller methods log through this.
log = logging.getLogger(__name__)

# Do not allow these tools to be called directly - they (it) enforces extra security and
# provides access via a different API endpoint.
# NOTE(review): the constant itself was dropped by the docs extraction; it is
# referenced by create() below, which rejects direct execution of these tool ids.
PROTECTED_TOOLS = ["__DATA_FETCH__"]
class ToolsController(BaseAPIController, UsesVisualizationMixin):
    """
    RESTful controller for interactions with tools.
    """
[docs] def __init__(self, app): super(ToolsController, self).__init__(app) self.history_manager = managers.histories.HistoryManager(app) self.hda_manager = managers.hdas.HDAManager(app)
[docs] @expose_api_anonymous_and_sessionless def index(self, trans, **kwds): """ GET /api/tools: returns a list of tools defined by parameters:: parameters: in_panel - if true, tools are returned in panel structure, including sections and labels trackster - if true, only tools that are compatible with Trackster are returned q - if present search on the given query will be performed tool_id - if present the given tool_id will be searched for all installed versions """ # Read params. in_panel = util.string_as_bool(kwds.get('in_panel', 'True')) trackster = util.string_as_bool(kwds.get('trackster', 'False')) q = kwds.get('q', '') tool_id = kwds.get('tool_id', '') # Find whether to search. if q: hits = self._search(q) results = [] if hits: for hit in hits: try: tool = self._get_tool(hit, user=trans.user) if tool: results.append( except exceptions.AuthenticationFailed: pass except exceptions.ObjectNotFound: pass return results # Find whether to detect. if tool_id: detected_versions = self._detect(trans, tool_id) return detected_versions # Return everything. try: return, in_panel=in_panel, trackster=trackster) except Exception: raise exceptions.InternalServerError("Error: Could not convert toolbox to dictionary")
[docs] @expose_api_anonymous_and_sessionless def show(self, trans, id, **kwd): """ GET /api/tools/{tool_id} Returns tool information parameters: io_details - if true, parameters and inputs are returned link_details - if true, hyperlink to the tool is returned tool_version - if provided return this tool version """ io_details = util.string_as_bool(kwd.get('io_details', False)) link_details = util.string_as_bool(kwd.get('link_details', False)) tool_version = kwd.get('tool_version') tool = self._get_tool(id, user=trans.user, tool_version=tool_version) return tool.to_dict(trans, io_details=io_details, link_details=link_details)
[docs] @expose_api_anonymous def build(self, trans, id, **kwd): """ GET /api/tools/{tool_id}/build Returns a tool model including dynamic parameters and updated values, repeats block etc. """ if 'payload' in kwd: kwd = kwd.get('payload') tool_version = kwd.get('tool_version', None) tool = self._get_tool(id, tool_version=tool_version, user=trans.user) return tool.to_json(trans, kwd.get('inputs', kwd))
[docs] @expose_api @web.require_admin def test_data_path(self, trans, id, **kwd): """ GET /api/tools/{tool_id}/test_data_path?tool_version={tool_version} """ # TODO: eliminate copy and paste with above code. if 'payload' in kwd: kwd = kwd.get('payload') tool_version = kwd.get('tool_version', None) tool = self._get_tool(id, tool_version=tool_version, user=trans.user) path = tool.test_data_path(kwd.get("filename")) if path: return path else: raise exceptions.ObjectNotFound("Specified test data path not found.")
[docs] @expose_api_anonymous_and_sessionless def tests_summary(self, trans, **kwd): """ GET /api/tools/tests_summary Fetch summary information for each tool and version combination with tool tests defined. This summary information currently includes tool name and a count of the tests. Fetch complete test data for each tool with /api/tools/{tool_id}/test_data?tool_version=<tool_version> """ test_counts_by_tool = {} for id, tool in tests = tool.tests if tests: if not in test_counts_by_tool: test_counts_by_tool[] = {} available_versions = test_counts_by_tool[] available_versions[tool.version] = { "tool_name":, "count": len(tests), } return test_counts_by_tool
[docs] @expose_api_raw_anonymous_and_sessionless def test_data(self, trans, id, **kwd): """ GET /api/tools/{tool_id}/test_data?tool_version={tool_version} This API endpoint is unstable and experimental. In particular the format of the response has not been entirely nailed down (it exposes too many Galaxy internals/Pythonisms in a rough way). If this endpoint is being used from outside of scripts shipped with Galaxy let us know and please be prepared for the response from this API to change its format in some ways. """ # TODO: eliminate copy and paste with above code. if 'payload' in kwd: kwd = kwd.get('payload') tool_version = kwd.get('tool_version', None) tool = self._get_tool(id, tool_version=tool_version, user=trans.user) # Encode in this method to handle odict objects in tool representation. def json_encodeify(obj): if isinstance(obj, odict): return dict(obj) elif isinstance(obj, map): return list(obj) else: return obj result = [t.to_dict() for t in tool.tests] return safe_dumps(result, default=json_encodeify)
[docs] @expose_api @web.require_admin def reload(self, trans, id, **kwd): """ GET /api/tools/{tool_id}/reload Reload specified tool. """ galaxy.queue_worker.send_control_task(, 'reload_tool', noop_self=True, kwargs={'tool_id': id}) message, status = if status == 'error': raise exceptions.MessageException(message) return {'message': message}
[docs] @expose_api @web.require_admin def all_requirements(self, trans, **kwds): """ GET /api/tools/all_requirements Return list of unique requirements for all tools. """ return
[docs] @expose_api @web.require_admin def requirements(self, trans, id, **kwds): """ GET /api/tools/{tool_id}/requirements Return the resolver status for a specific tool id. [{"status": "installed", "name": "hisat2", "versionless": false, "resolver_type": "conda", "version": "2.0.3", "type": "package"}] """ tool = self._get_tool(id) return tool.tool_requirements_status
[docs] @expose_api @web.require_admin def install_dependencies(self, trans, id, **kwds): """ POST /api/tools/{tool_id}/dependencies This endpoint is also available through POST /api/tools/{tool_id}/install_dependencies, but will be deprecated in the future. Attempts to install requirements via the dependency resolver parameters: build_dependency_cache: If true, attempts to cache dependencies for this tool force_rebuild: If true and cache dir exists, attempts to delete cache dir """ tool = self._get_tool(id) tool._view.install_dependencies(tool.requirements) if kwds.get('build_dependency_cache'): tool.build_dependency_cache(**kwds) # TODO: rework resolver install system to log and report what has been done. # _view.install_dependencies should return a dict with stdout, stderr and success status return tool.tool_requirements_status
[docs] @expose_api @web.require_admin def uninstall_dependencies(self, trans, id, **kwds): """ DELETE /api/tools/{tool_id}/dependencies Attempts to uninstall requirements via the dependency resolver """ tool = self._get_tool(id) tool._view.uninstall_dependencies(index=None, requirements=tool.requirements) # TODO: rework resolver install system to log and report what has been done. return tool.tool_requirements_status
[docs] @expose_api @web.require_admin def build_dependency_cache(self, trans, id, **kwds): """ POST /api/tools/{tool_id}/build_dependency_cache Attempts to cache installed dependencies. parameters: force_rebuild: If true and chache dir exists, attempts to delete cache dir """ tool = self._get_tool(id) tool.build_dependency_cache(**kwds) # TODO: Should also have a more meaningful return. return tool.tool_requirements_status
[docs] @expose_api @web.require_admin def diagnostics(self, trans, id, **kwd): """ GET /api/tools/{tool_id}/diagnostics Return diagnostic information to help debug panel and dependency related problems. """ # TODO: Move this into tool. def to_dict(x): return x.to_dict() tool = self._get_tool(id, user=trans.user) if hasattr(tool, 'lineage'): lineage_dict = tool.lineage.to_dict() else: lineage_dict = None tool_shed_dependencies = tool.installed_tool_dependencies if tool_shed_dependencies: tool_shed_dependencies_dict = list(map(to_dict, tool_shed_dependencies)) else: tool_shed_dependencies_dict = None return { "tool_id":, "tool_version": tool.version, "dependency_shell_commands": tool.build_dependency_shell_commands(), "lineage": lineage_dict, "requirements": list(map(to_dict, tool.requirements)), "installed_tool_shed_dependencies": tool_shed_dependencies_dict, "tool_dir": tool.tool_dir, "tool_shed": tool.tool_shed, "repository_name": tool.repository_name, "repository_owner": tool.repository_owner, "installed_changeset_revision": None, "guid": tool.guid, }
def _detect(self, trans, tool_id): """ Detect whether the tool with the given id is installed. :param tool_id: exact id of the tool :type tool_id: str :return: list with available versions "return type: list """ tools =, get_all_versions=True) detected_versions = [] if tools: for tool in tools: if tool and tool.allow_user_access(trans.user): detected_versions.append(tool.version) return detected_versions def _search(self, q): """ Perform the search on the given query. Boosts and numer of results are configurable in galaxy.ini file. :param q: the query to search with :type q: str :return: Dictionary containing the tools' ids of the best hits. :return type: dict """ tool_name_boost ='tool_name_boost', 9) tool_section_boost ='tool_section_boost', 3) tool_description_boost ='tool_description_boost', 2) tool_label_boost ='tool_label_boost', 1) tool_stub_boost ='tool_stub_boost', 5) tool_help_boost ='tool_help_boost', 0.5) tool_search_limit ='tool_search_limit', 20) tool_enable_ngram_search ='tool_enable_ngram_search', False) tool_ngram_minsize ='tool_ngram_minsize', 3) tool_ngram_maxsize ='tool_ngram_maxsize', 4) results =, tool_name_boost=tool_name_boost, tool_section_boost=tool_section_boost, tool_description_boost=tool_description_boost, tool_label_boost=tool_label_boost, tool_stub_boost=tool_stub_boost, tool_help_boost=tool_help_boost, tool_search_limit=tool_search_limit, tool_enable_ngram_search=tool_enable_ngram_search, tool_ngram_minsize=tool_ngram_minsize, tool_ngram_maxsize=tool_ngram_maxsize) return results
[docs] @expose_api_anonymous_and_sessionless def citations(self, trans, id, **kwds): tool = self._get_tool(id, user=trans.user) rval = [] for citation in tool.citations: rval.append(citation.to_dict('bibtex')) return rval
[docs] @web.expose_api_raw @web.require_admin def download(self, trans, id, **kwds): tool_tarball =, id) trans.response.set_content_type('application/x-gzip') download_file = open(tool_tarball, "rb") trans.response.headers["Content-Disposition"] = 'attachment; filename="%s.tgz"' % (id) return download_file
[docs] @expose_api_anonymous def fetch(self, trans, payload, **kwd): """Adapt clean API to tool-constrained API. """ request_version = '1' history_id = payload.pop("history_id") clean_payload = {} files_payload = {} for key, value in payload.items(): if key == "key": continue if key.startswith('files_') or key.startswith('__files_'): files_payload[key] = value continue clean_payload[key] = value validate_and_normalize_targets(trans, clean_payload) clean_payload["check_content"] = request = dumps(clean_payload) create_payload = { 'tool_id': "__DATA_FETCH__", 'history_id': history_id, 'inputs': { 'request_version': request_version, 'request_json': request, 'file_count': str(len(files_payload)) }, } create_payload.update(files_payload) return self._create(trans, create_payload, **kwd)
[docs] @expose_api_anonymous def create(self, trans, payload, **kwd): """ POST /api/tools Executes tool using specified inputs and returns tool's outputs. """ tool_id = payload.get("tool_id") if tool_id in PROTECTED_TOOLS: raise exceptions.RequestParameterInvalidException("Cannot execute tool [%s] directly, must use alternative endpoint." % tool_id) if tool_id is None: raise exceptions.RequestParameterInvalidException("Must specify a valid tool_id to use this endpoint.") return self._create(trans, payload, **kwd)
def _create(self, trans, payload, **kwd): # HACK: for now, if action is rerun, rerun tool. action = payload.get('action', None) if action == 'rerun': return self._rerun_tool(trans, payload, **kwd) # -- Execute tool. -- # Get tool. tool_version = payload.get('tool_version', None) tool =['tool_id'], tool_version) if 'tool_id' in payload else None if not tool or not tool.allow_user_access(trans.user): raise exceptions.MessageException('Tool not found or not accessible.') if if not trans.user: log.warning("Anonymous user attempts to execute tool, but account activation is turned on.") elif not log.warning("User \"%s\" attempts to execute tool, but account activation is turned on and user account is not active." % # Set running history from payload parameters. # History not set correctly as part of this API call for # dataset upload. history_id = payload.get('history_id', None) if history_id: decoded_id = self.decode_id(history_id) target_history = self.history_manager.get_owned(decoded_id, trans.user, current_history=trans.history) else: target_history = None # Set up inputs. inputs = payload.get('inputs', {}) # Find files coming in as multipart file data and add to inputs. for k, v in payload.items(): if k.startswith('files_') or k.startswith('__files_'): inputs[k] = v # for inputs that are coming from the Library, copy them into the history input_patch = {} for k, v in inputs.items(): if isinstance(v, dict) and v.get('src', '') == 'ldda' and 'id' in v: ldda = trans.sa_session.query(['id'])) if trans.user_is_admin() or, ldda.dataset): input_patch[k] = ldda.to_history_dataset_association(target_history, add_to_history=True) for k, v in input_patch.items(): inputs[k] = v # TODO: encode data ids and decode ids. # TODO: handle dbkeys params = util.Params(inputs, sanitize=False) incoming = params.__dict__ # use_cached_job can be passed in via the top-level payload or among the tool inputs. 
# I think it should be a top-level parameter, but because the selector is implemented # as a regular tool parameter we accept both. use_cached_job = payload.get('use_cached_job', False) or util.string_as_bool(inputs.get('use_cached_job', 'false')) vars = tool.handle_input(trans, incoming, history=target_history, use_cached_job=use_cached_job) # TODO: check for errors and ensure that output dataset(s) are available. output_datasets = vars.get('out_data', []) rval = {'outputs': [], 'output_collections': [], 'jobs': [], 'implicit_collections': []} job_errors = vars.get('job_errors', []) if job_errors: # If we are here - some jobs were successfully executed but some failed. rval['errors'] = job_errors outputs = rval['outputs'] # TODO:?? poss. only return ids? for output_name, output in output_datasets: output_dict = output.to_dict() # add the output name back into the output data structure # so it's possible to figure out which newly created elements # correspond with which tool file outputs output_dict['output_name'] = output_name outputs.append(, skip_startswith="metadata_")) for job in vars.get('jobs', []): rval['jobs'].append(self.encode_all_ids(trans, job.to_dict(view='collection'), recursive=True)) for output_name, collection_instance in vars.get('output_collections', []): history = target_history or trans.history output_dict = dictify_dataset_collection_instance(collection_instance,, parent=history) output_dict['output_name'] = output_name rval['output_collections'].append(output_dict) for output_name, collection_instance in vars.get('implicit_collections', {}).items(): history = target_history or trans.history output_dict = dictify_dataset_collection_instance(collection_instance,, parent=history) output_dict['output_name'] = output_name rval['implicit_collections'].append(output_dict) return rval # # -- Helper methods -- # def _get_tool(self, id, tool_version=None, user=None): tool =, tool_version) if not tool: raise exceptions.ObjectNotFound("Could not find 
tool with id '%s'." % id) if not tool.allow_user_access(user): raise exceptions.AuthenticationFailed("Access denied, please login for tool with id '%s'." % id) return tool def _rerun_tool(self, trans, payload, **kwargs): """ Rerun a tool to produce a new output dataset that corresponds to a dataset that a user is currently viewing. """ # # TODO: refactor to use same code as run_tool. # # Run tool on region if region is specificied. run_on_regions = False regions = payload.get('regions', None) if regions: if isinstance(regions, dict): # Regions is a single region. regions = [GenomeRegion.from_dict(regions)] elif isinstance(regions, list): # There is a list of regions. regions = [GenomeRegion.from_dict(r) for r in regions] if len(regions) > 1: # Sort by chrom name, start so that data is not fetched out of order. regions = sorted(regions, key=lambda r: (r.chrom.lower(), r.start)) # Merge overlapping regions so that regions do not overlap # and hence data is not included multiple times. prev = regions[0] cur = regions[1] index = 1 while True: if cur.chrom == prev.chrom and cur.start <= prev.end: # Found overlapping regions, so join them into prev. prev.end = cur.end del regions[index] else: # No overlap, move to next region. prev = cur index += 1 # Get next region or exit. if index == len(regions): # Done. break else: cur = regions[index] run_on_regions = True # Dataset check. decoded_dataset_id = self.decode_id(payload.get('target_dataset_id')) original_dataset = self.hda_manager.get_accessible(decoded_dataset_id, user=trans.user) original_dataset = self.hda_manager.error_if_uploading(original_dataset) msg = self.hda_manager.data_conversion_status(original_dataset) if msg: return msg # Set tool parameters--except non-hidden dataset parameters--using combination of # job's previous parameters and incoming parameters. Incoming parameters # have priority. 
# original_job = self.hda_manager.creating_job(original_dataset) tool = if not tool or not tool.allow_user_access(trans.user): return tool_params = dict([(, p.value) for p in original_job.parameters]) # TODO: rather than set new inputs using dict of json'ed value, unpack parameters and set using set_param_value below. # TODO: need to handle updates to conditional parameters; conditional # params are stored in dicts (and dicts within dicts). new_inputs = payload['inputs'] tool_params.update(dict([(key, dumps(value)) for key, value in new_inputs.items() if key in tool.inputs and value is not None])) tool_params = tool.params_from_strings(tool_params, # # If running tool on region, convert input datasets (create indices) so # that can regions of data can be quickly extracted. # data_provider_registry = messages_list = [] if run_on_regions: for jida in original_job.input_datasets: input_dataset = jida.dataset data_provider = data_provider_registry.get_data_provider(trans, original_dataset=input_dataset, source='data') if data_provider and (not data_provider.converted_dataset or data_provider.converted_dataset.state != # Can convert but no converted dataset yet, so return message about why. data_sources = input_dataset.datatype.data_sources msg = input_dataset.convert_dataset(trans, data_sources['data']) if msg is not None: messages_list.append(msg) # Return any messages generated during conversions. return_message = self._get_highest_priority_msg(messages_list) if return_message: return return_message # # Set target history (the history that tool will use for inputs/outputs). # If user owns dataset, put new data in original dataset's history; if # user does not own dataset (and hence is accessing dataset via sharing), # put new data in user's current history. 
# if original_dataset.history.user == trans.user: target_history = original_dataset.history else: target_history = trans.get_history(create=True) hda_permissions = def set_param_value(param_dict, param_name, param_value): """ Set new parameter value in a tool's parameter dictionary. """ # Recursive function to set param value. def set_value(param_dict, group_name, group_index, param_name, param_value): if group_name in param_dict: param_dict[group_name][group_index][param_name] = param_value return True elif param_name in param_dict: param_dict[param_name] = param_value return True else: # Recursive search. return_val = False for value in param_dict.values(): if isinstance(value, dict): return_val = set_value(value, group_name, group_index, param_name, param_value) if return_val: return return_val return False # Parse parameter name if necessary. if param_name.find("|") == -1: # Non-grouping parameter. group_name = group_index = None else: # Grouping parameter. group, param_name = param_name.split("|") index = group.rfind("_") group_name = group[:index] group_index = int(group[index + 1:]) return set_value(param_dict, group_name, group_index, param_name, param_value) # Set parameters based tool's trackster config. params_set = {} for action in tool.trackster_conf.actions: success = False for joda in original_job.output_datasets: if == action.output_name: set_param_value(tool_params,, joda.dataset) params_set[] = True success = True break if not success: return # # Set input datasets for tool. If running on regions, extract and use subset # when possible. # if run_on_regions: regions_str = ",".join([str(r) for r in regions]) for jida in original_job.input_datasets: # If param set previously by config actions, do nothing. 
if in params_set: continue input_dataset = jida.dataset if input_dataset is None: # optional dataset and dataset wasn't selected tool_params[] = None elif run_on_regions and 'data' in input_dataset.datatype.data_sources: # Dataset is indexed and hence a subset can be extracted and used # as input. # Look for subset. subset_dataset_association = trans.sa_session.query( \ .filter_by(hda=input_dataset, location=regions_str) \ .first() if subset_dataset_association: # Data subset exists. subset_dataset = subset_dataset_association.subset else: # Need to create subset. data_source = input_dataset.datatype.data_sources['data'] input_dataset.get_converted_dataset(trans, data_source) input_dataset.get_converted_dataset_deps(trans, data_source) # Create new HDA for input dataset's subset. new_dataset =, dbkey=input_dataset.dbkey, create_dataset=True, sa_session=trans.sa_session, name="Subset [%s] of data %i" % (regions_str, input_dataset.hid), visible=False) target_history.add_dataset(new_dataset) trans.sa_session.add(new_dataset), hda_permissions) # Write subset of data to new dataset data_provider = data_provider_registry.get_data_provider(trans, original_dataset=input_dataset, source='data') data_provider.write_data_to_file(regions, new_dataset.file_name) # TODO: (a) size not working; (b) need to set peek. new_dataset.set_size() = "Data subset for trackster" new_dataset.set_dataset_state( # Set metadata. # TODO: set meta internally if dataset is small enough?, trans, incoming={'input1': new_dataset}, overwrite=False, job_params={"source": "trackster"}) # Add HDA subset association. subset_association =, subset=new_dataset, location=regions_str) trans.sa_session.add(subset_association) subset_dataset = new_dataset trans.sa_session.flush() # Add dataset to tool's parameters. if not set_param_value(tool_params,, subset_dataset): return {"error": True, "message": "error setting parameter %s" %} # # Execute tool and handle outputs. 
# try: subset_job, subset_job_outputs = tool.execute(trans, incoming=tool_params, history=target_history, job_params={"source": "trackster"}) except Exception as e: # Lots of things can go wrong when trying to execute tool. return {"error": True, "message": e.__class__.__name__ + ": " + str(e)} if run_on_regions: for output in subset_job_outputs.values(): output.visible = False trans.sa_session.flush() # # Return new track that corresponds to the original dataset. # output_name = None for joda in original_job.output_datasets: if joda.dataset == original_dataset: output_name = break for joda in subset_job.output_datasets: if == output_name: output_dataset = joda.dataset dataset_dict = output_dataset.to_dict() dataset_dict['id'] =['id']) dataset_dict['track_config'] = self.get_new_track_config(trans, output_dataset) return dataset_dict