Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.test

import logging
import os
import os.path

from six import string_types

import galaxy.tools.parameters.basic
import galaxy.tools.parameters.grouping
from galaxy.util import string_as_bool

try:
    from nose.tools import nottest
except ImportError:
    def nottest(x):
        return x

log = logging.getLogger( __name__ )

DEFAULT_FTYPE = 'auto'
DEFAULT_DBKEY = 'hg17'
DEFAULT_INTERACTOR = "api"  # Default mechanism test code uses for interacting with Galaxy instance.
DEFAULT_MAX_SECS = None


[docs]@nottest def parse_tests(tool, tests_source): """ Build ToolTestBuilder objects for each "<test>" elements and return default interactor (if any). """ default_interactor = os.environ.get( 'GALAXY_TEST_DEFAULT_INTERACTOR', DEFAULT_INTERACTOR ) tests_dict = tests_source.parse_tests_to_dict() tests_default_interactor = tests_dict.get( 'interactor', default_interactor ) tests = [] for i, test_dict in enumerate(tests_dict.get('tests', [])): test = ToolTestBuilder( tool, test_dict, i, default_interactor=tests_default_interactor ) tests.append( test ) return tests
[docs]class ToolTestBuilder( object ): """ Encapsulates information about a tool test, and allows creation of a dynamic TestCase class (the unittest framework is very class oriented, doing dynamic tests in this way allows better integration) """
[docs] def __init__( self, tool, test_dict, i, default_interactor ): name = test_dict.get( 'name', 'Test-%d' % (i + 1) ) maxseconds = test_dict.get( 'maxseconds', DEFAULT_MAX_SECS ) if maxseconds is not None: maxseconds = int( maxseconds ) self.tool = tool self.name = name self.maxseconds = maxseconds self.required_files = [] self.inputs = {} self.outputs = [] # By default do not making assertions on number of outputs - but to # test filtering allow explicitly state number of outputs. self.num_outputs = None self.error = False self.exception = None self.__handle_test_dict( test_dict, i, default_interactor )
[docs] def test_data( self ): """ Iterator over metadata representing the required files for upload. """ return test_data_iter( self.required_files )
def __matching_case_for_value( self, cond, declared_value ): test_param = cond.test_param if isinstance(test_param, galaxy.tools.parameters.basic.BooleanToolParameter): if declared_value is None: # No explicit value for param in test case, determine from default query_value = test_param.checked else: query_value = _process_bool_param_value( test_param, declared_value ) def matches_declared_value(case_value): return _process_bool_param_value( test_param, case_value ) == query_value elif isinstance(test_param, galaxy.tools.parameters.basic.SelectToolParameter): if declared_value is not None: # Test case supplied explicit value to check against. def matches_declared_value(case_value): return case_value == declared_value elif test_param.static_options: # No explicit value in test case, not much to do if options are dynamic but # if static options are available can find the one specified as default or # fallback on top most option (like GUI). for (name, value, selected) in test_param.static_options: if selected: default_option = name else: first_option = test_param.static_options[0] first_option_value = first_option[1] default_option = first_option_value def matches_declared_value(case_value): return case_value == default_option else: # No explicit value for this param and cannot determine a # default - give up. Previously this would just result in a key # error exception. msg = "Failed to find test parameter value specification required for conditional %s" % cond.name raise Exception( msg ) # Check the tool's defined cases against predicate to determine # selected or default. for i, case in enumerate( cond.cases ): if matches_declared_value( case.value ): return case else: msg_template = "%s - Failed to find case matching value (%s) for test parameter specification for conditional %s. Remainder of test behavior is unspecified." msg = msg_template % ( self.tool.id, declared_value, cond.name ) log.info( msg ) def __split_if_str( self, value ): split = isinstance(value, string_types) if split: value = value.split(",") return value def __handle_test_dict( self, test_dict, i, default_interactor ): try: # Mechanism test code uses for interacting with Galaxy instance, # until 'api' is the default switch this to API to use its new # features. Once 'api' is the default set to 'twill' to use legacy # features or workarounds. self.interactor = test_dict.get( 'interactor', default_interactor ) self.inputs = self.__process_raw_inputs( self.tool.inputs, test_dict["inputs"] ) self.outputs = test_dict["outputs"] self.output_collections = test_dict["output_collections"] num_outputs = test_dict.get( 'expect_num_outputs', None ) if num_outputs: num_outputs = int( num_outputs ) self.num_outputs = num_outputs self.command_line = test_dict.get("command", None) self.stdout = test_dict.get("stdout", None) self.stderr = test_dict.get("stderr", None) self.expect_exit_code = test_dict.get("expect_exit_code", None) self.expect_failure = test_dict.get("expect_failure", False) self.md5 = test_dict.get("md5", None) except Exception as e: self.inputs = {} self.error = True self.exception = e def __process_raw_inputs( self, tool_inputs, raw_inputs, parent_context=None ): """ Recursively expand flat list of inputs into "tree" form of flat list (| using to nest to new levels) structure and expand dataset information as proceeding to populate self.required_files. """ parent_context = parent_context or RootParamContext() expanded_inputs = {} for key, value in tool_inputs.items(): if isinstance( value, galaxy.tools.parameters.grouping.Conditional ): cond_context = ParamContext( name=value.name, parent_context=parent_context ) case_context = ParamContext( name=value.test_param.name, parent_context=cond_context ) raw_input = case_context.extract_value( raw_inputs ) case_value = raw_input[ 1 ] if raw_input else None case = self.__matching_case_for_value( value, case_value ) if case: for input_name, input_value in case.inputs.items(): case_inputs = self.__process_raw_inputs( { input_name: input_value }, raw_inputs, parent_context=cond_context ) expanded_inputs.update( case_inputs ) if not value.type == "text": expanded_case_value = self.__split_if_str( case.value ) if case_value is not None: # A bit tricky here - we are growing inputs with value # that may be implicit (i.e. not defined by user just # a default defined in tool). So we do not want to grow # expanded_inputs and risk repeat block viewing this # as a new instance with value defined and hence enter # an infinite loop - hence the "case_value is not None" # check. processed_value = _process_simple_value( value.test_param, expanded_case_value ) expanded_inputs[ case_context.for_state() ] = processed_value elif isinstance( value, galaxy.tools.parameters.grouping.Section ): context = ParamContext( name=value.name, parent_context=parent_context ) for r_name, r_value in value.inputs.items(): expanded_input = self.__process_raw_inputs( { context.for_state(): r_value }, raw_inputs, parent_context=context ) if expanded_input: expanded_inputs.update( expanded_input ) elif isinstance( value, galaxy.tools.parameters.grouping.Repeat ): repeat_index = 0 while True: context = ParamContext( name=value.name, index=repeat_index, parent_context=parent_context ) updated = False for r_name, r_value in value.inputs.items(): expanded_input = self.__process_raw_inputs( { context.for_state(): r_value }, raw_inputs, parent_context=context ) if expanded_input: expanded_inputs.update( expanded_input ) updated = True if not updated: break repeat_index += 1 else: context = ParamContext( name=value.name, parent_context=parent_context ) raw_input = context.extract_value( raw_inputs ) if raw_input: (name, param_value, param_extra) = raw_input if not value.type == "text": param_value = self.__split_if_str( param_value ) if isinstance( value, galaxy.tools.parameters.basic.DataToolParameter ): if not isinstance(param_value, list): param_value = [ param_value ] map( lambda v: self.__add_uploaded_dataset( context.for_state(), v, param_extra, value ), param_value ) processed_value = param_value elif isinstance( value, galaxy.tools.parameters.basic.DataCollectionToolParameter ): assert 'collection' in param_extra collection_def = param_extra[ 'collection' ] for ( name, value, extra ) in collection_def.collect_inputs(): require_file( name, value, extra, self.required_files ) processed_value = collection_def else: processed_value = _process_simple_value( value, param_value ) expanded_inputs[ context.for_state() ] = processed_value return expanded_inputs def __add_uploaded_dataset( self, name, value, extra, input_parameter ): if value is None: assert input_parameter.optional, '%s is not optional. You must provide a valid filename.' % name return value return require_file( name, value, extra, self.required_files )
def _process_simple_value( param, param_value ): if isinstance( param, galaxy.tools.parameters.basic.SelectToolParameter ) and hasattr( param, 'static_options' ): # Tests may specify values as either raw value or the value # as they appear in the list - the API doesn't and shouldn't # accept the text value - so we need to convert the text # into the form value. def process_param_value( param_value ): found_value = False value_for_text = None if param.static_options: for (text, opt_value, selected) in param.static_options: if param_value == opt_value: found_value = True if value_for_text is None and param_value == text: value_for_text = opt_value if not found_value and value_for_text is not None: processed_value = value_for_text else: processed_value = param_value return processed_value # Do replacement described above for lists or singleton # values. if isinstance( param_value, list ): processed_value = list( map( process_param_value, param_value ) ) else: processed_value = process_param_value( param_value ) elif isinstance( param, galaxy.tools.parameters.basic.BooleanToolParameter ): # Like above, tests may use the tool define values of simply # true/false. processed_value = _process_bool_param_value( param, param_value ) else: processed_value = param_value return processed_value def _process_bool_param_value( param, param_value ): assert isinstance( param, galaxy.tools.parameters.basic.BooleanToolParameter ) was_list = False if isinstance( param_value, list ): was_list = True param_value = param_value[0] if param.truevalue == param_value: processed_value = True elif param.falsevalue == param_value: processed_value = False else: processed_value = string_as_bool( param_value ) return [ processed_value ] if was_list else processed_value
[docs]@nottest def test_data_iter( required_files ): for fname, extra in required_files: data_dict = dict( fname=fname, metadata=extra.get( 'metadata', [] ), composite_data=extra.get( 'composite_data', [] ), ftype=extra.get( 'ftype', DEFAULT_FTYPE ), dbkey=extra.get( 'dbkey', DEFAULT_DBKEY ), ) edit_attributes = extra.get( 'edit_attributes', [] ) # currently only renaming is supported for edit_att in edit_attributes: if edit_att.get( 'type', None ) == 'name': new_name = edit_att.get( 'value', None ) assert new_name, 'You must supply the new dataset name as the value tag of the edit_attributes tag' data_dict['name'] = new_name else: raise Exception( 'edit_attributes type (%s) is unimplemented' % edit_att.get( 'type', None ) ) yield data_dict
[docs]def require_file( name, value, extra, required_files ): if ( value, extra ) not in required_files: required_files.append( ( value, extra ) ) # these files will be uploaded name_change = [ att for att in extra.get( 'edit_attributes', [] ) if att.get( 'type' ) == 'name' ] if name_change: name_change = name_change[-1].get( 'value' ) # only the last name change really matters value = name_change # change value for select to renamed uploaded file for e.g. composite dataset else: for end in [ '.zip', '.gz' ]: if value.endswith( end ): value = value[ :-len( end ) ] break value = os.path.basename( value ) # if uploading a file in a path other than root of test-data return value
[docs]class ParamContext(object):
[docs] def __init__( self, name, index=None, parent_context=None ): self.parent_context = parent_context self.name = name self.index = None if index is None else int( index )
[docs] def for_state( self ): name = self.name if self.index is None else "%s_%d" % ( self.name, self.index ) parent_for_state = self.parent_context.for_state() if parent_for_state: return "%s|%s" % ( parent_for_state, name ) else: return name
def __str__( self ): return "Context[for_state=%s]" % self.for_state()
[docs] def param_names( self ): for parent_context_param in self.parent_context.param_names(): if self.index is not None: yield "%s|%s_%d" % ( parent_context_param, self.name, self.index ) else: yield "%s|%s" % ( parent_context_param, self.name ) if self.index is not None: yield "%s_%d" % ( self.name, self.index ) else: yield self.name
[docs] def extract_value( self, raw_inputs ): for param_name in self.param_names(): value = self.__raw_param_found( param_name, raw_inputs) if value: return value return None
def __raw_param_found( self, param_name, raw_inputs ): index = None for i, raw_input in enumerate( raw_inputs ): if raw_input[ 0 ] == param_name: index = i if index is not None: raw_input = raw_inputs[ index ] del raw_inputs[ index ] return raw_input else: return None
[docs]class RootParamContext(object):
[docs] def __init__( self ): pass
[docs] def for_state( self ): return ""
[docs] def param_names( self ): return []
[docs] def get_index( self ): return 0