Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.base.instrument

""" Utilities to help instrument tool tests.

Including structured data nose plugin that allows storing arbitrary structured
data on a per test case basis - used by tool test to store inputs,
output problems, job tests, etc... but could easily be used by other test
types in a different way.
"""

import json
import threading

from nose.plugins import Plugin

# Sentinel meaning "no job data has been registered"; compared by identity so
# that ``None`` (or any other value) remains a legal piece of job data.
NO_JOB_DATA = object()


class _JobDataLocal(threading.local):
    """Thread-local job-data slot with per-thread default values.

    Attributes assigned on a plain ``threading.local()`` instance exist only
    in the thread that assigned them, so defaults set at import time would be
    missing (``AttributeError``) in every other thread. ``threading.local``
    runs a subclass's ``__init__`` separately for each thread that touches the
    object, which gives every thread its own initial state.
    """

    def __init__(self):
        # True while nothing has been registered since the last fetch.
        self.new = True
        # The registered payload, or NO_JOB_DATA when there is none.
        self.data = NO_JOB_DATA


JOB_DATA = _JobDataLocal()


def register_job_data(data):
    """Store ``data`` as the current thread's job data.

    Only the first registration since the last fetch is kept: once the slot
    has been filled (``JOB_DATA.new`` is false) further calls are ignored.
    """
    if JOB_DATA.new:
        JOB_DATA.data = data
        JOB_DATA.new = False
def fetch_job_data():
    """Return the registered job data and mark the slot as empty again.

    Returns ``NO_JOB_DATA`` when nothing has been registered since the
    previous fetch, otherwise whatever was stored in ``JOB_DATA.data``.
    """
    try:
        return JOB_DATA.data if not JOB_DATA.new else NO_JOB_DATA
    finally:
        # Runs on both the normal and the exception path, so the next
        # registration is always accepted.
        JOB_DATA.new = True
class StructuredTestDataPlugin(Plugin):
    """Nose plugin that records per-test structured data in a JSON report.

    For every test outcome (error, failure or success) the plugin fetches the
    job data registered for the test and appends an entry to ``self.tests``;
    ``report`` then dumps all entries to the configured file.
    """

    name = 'structureddata'

    def options(self, parser, env):
        """Add the ``--structured-data-file`` option to ``parser``.

        The default path is read from the ``NOSE_STRUCTURED_DATA`` key of
        ``env`` and falls back to ``structured_test_data.json``.
        """
        super(StructuredTestDataPlugin, self).options(parser, env=env)
        parser.add_option(
            '--structured-data-file',
            action='store',
            dest='structured_data_file',
            metavar="FILE",
            default=env.get('NOSE_STRUCTURED_DATA', 'structured_test_data.json'),
            # The adjacent literals are concatenated, so the first one needs a
            # trailing space to keep the two sentences apart.
            help=("Path to JSON file to store the Galaxy structured data report in. "
                  "Default is structured_test_data.json in the working directory "
                  "[NOSE_STRUCTURED_DATA]"))

    def configure(self, options, conf):
        """Store ``conf`` and, when the plugin is enabled, open the report file.

        ``self.enabled`` is not set in this class; presumably the base
        ``configure`` call sets it (verify against nose).
        """
        super(StructuredTestDataPlugin, self).configure(options, conf)
        self.conf = conf
        if not self.enabled:
            return
        self.tests = []
        # Opened here and closed in report().
        self.structured_data_report_file = open(options.structured_data_file, 'w')

    def finalize(self, result):
        """Do nothing; the report is written by ``report``."""
        pass

    def _handle_result(self, test, *args, **kwds):
        """Append an entry for ``test`` with any job data registered for it.

        Extra positional and keyword arguments are accepted and ignored so
        the same function can serve as ``addError``, ``addFailure`` and
        ``addSuccess``.
        """
        job_data = fetch_job_data()
        test_id = test.id()
        has_data = job_data is not NO_JOB_DATA
        entry = {
            'id': test_id,
            'has_data': has_data,
            'data': job_data if has_data else None,
        }
        self.tests.append(entry)

    addError = _handle_result
    addFailure = _handle_result
    addSuccess = _handle_result

    def report(self, stream):
        """Write the collected entries as JSON and close the report file.

        ``stream`` is unused. The file is closed even when serialisation
        fails (for example on non-JSON-serialisable job data), so the handle
        opened in ``configure`` is never leaked.
        """
        report_obj = {
            'version': '0.1',
            'tests': self.tests,
        }
        try:
            json.dump(report_obj, self.structured_data_report_file)
        finally:
            self.structured_data_report_file.close()