Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy_test.base.instrument
""" Utilities to help instrument tool tests.
Including structured data nose plugin that allows storing arbitrary structured
data on a per test case basis - used by tool test to store inputs,
output problems, job tests, etc... but could easily by used by other test
types in a different way.
"""
import json
import threading
from nose.plugins import Plugin
NO_JOB_DATA = object()
JOB_DATA = threading.local()
JOB_DATA.new = True
JOB_DATA.data = NO_JOB_DATA
[docs]def register_job_data(data):
if not JOB_DATA.new:
return
JOB_DATA.data = data
JOB_DATA.new = False
[docs]def fetch_job_data():
try:
if JOB_DATA.new:
return NO_JOB_DATA
else:
return JOB_DATA.data
finally:
JOB_DATA.new = True
[docs]class StructuredTestDataPlugin(Plugin):
name = 'structureddata'
[docs] def options(self, parser, env):
super(StructuredTestDataPlugin, self).options(parser, env=env)
parser.add_option(
'--structured-data-file', action='store',
dest='structured_data_file', metavar="FILE",
default=env.get('NOSE_STRUCTURED_DATA', 'structured_test_data.json'),
help=("Path to JSON file to store the Galaxy structured data report in."
"Default is structured_test_data.json in the working directory "
"[NOSE_STRUCTURED_DATA]"))
[docs] def configure(self, options, conf):
super(StructuredTestDataPlugin, self).configure(options, conf)
self.conf = conf
if not self.enabled:
return
self.tests = []
self.structured_data_report_file = open(options.structured_data_file, 'w')
def _handle_result(self, test, *args, **kwds):
job_data = fetch_job_data()
id = test.id()
has_data = job_data is not NO_JOB_DATA
entry = {
'id': id,
'has_data': has_data,
'data': job_data if has_data else None,
}
self.tests.append(entry)
addError = _handle_result
addFailure = _handle_result
addSuccess = _handle_result
[docs] def report(self, stream):
report_obj = {
'version': '0.1',
'tests': self.tests,
}
json.dump(report_obj, self.structured_data_report_file)
self.structured_data_report_file.close()