Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.web.framework.middleware.profile
"""
Middleware that profiles the request with cProfile and displays profiling
information at the bottom of each page.
"""
import cProfile
import pstats
import threading
import markupsafe
from paste import response
template = """
<script>
function show_profile_output()
{
var win = window.open("", "win"); // a window object
var doc = win.document;
doc.open("text/html", "replace");
doc.write("<HTML><HEAD><TITLE>Profiler output</TITLE></HEAD><BODY>")
doc.write(document.getElementById( 'profile_output' ).innerHTML)
doc.write("</BODY></HTML>");
doc.close();
}
function show_inline()
{
document.getElementById( 'profile_output' ).style.display="block";
}
</script>
<div style="background-color: #ff9; color: #000; border: 2px solid #000; padding: 5px;">
show profile output: <a href="javascript:show_inline();">inline</a> | <a href="javascript:show_profile_output();">new window</a>
<div id="profile_output" style="display: none">
<hr />
%s
</div>
</div>
"""
[docs]class ProfileMiddleware:
"""
Middleware that profiles all requests.
All HTML pages will have profiling information appended to them.
The data is isolated to that single request, and does not include
data from previous requests.
"""
[docs] def __init__(self, app, global_conf=None, limit=40):
self.app = app
self.lock = threading.Lock()
self.limit = limit
def __call__(self, environ, start_response):
catch_response = []
body = []
def replace_start_response(status, headers, exc_info=None):
catch_response.extend([status, headers])
start_response(status, headers, exc_info)
return body.append
def run_app():
body.extend(self.app(environ, replace_start_response))
# Run in profiler
prof = cProfile.Profile()
prof.runctx("run_app()", globals(), locals())
# Build up body with stats
body = "".join(body)
headers = catch_response[1]
content_type = response.header_value(headers, "content-type")
if not content_type.startswith("text/html"):
# We can't add info to non-HTML output
return [body]
stats = pstats.Stats(prof)
stats.strip_dirs()
stats.sort_stats("time", "calls")
output = pstats_as_html(stats, self.limit)
body += template % output
return [body]
[docs]def pstats_as_html(stats, *sel_list):
"""
Return an HTML representation of a pstats.Stats object.
"""
rval = []
# Number of function calls, primitive calls, total time
rval.append(
f"<div>{stats.total_calls} function calls ({stats.prim_calls} primitive) in {stats.total_tt:0.3f} CPU seconds</div>"
)
# Extract functions that match 'sel_list'
funcs, order_message, select_message = get_func_list(stats, sel_list)
# Deal with any ordering or selection messages
if order_message:
rval.append(f"<div>{markupsafe.escape(order_message)}</div>")
if select_message:
rval.append(f"<div>{markupsafe.escape(select_message)}</div>")
# Build a table for the functions
if funcs:
rval.append("<table>")
# Header
rval.append(
"<tr><th>ncalls</th>"
"<th>tottime</th>"
"<th>percall</th>"
"<th>cumtime</th>"
"<th>percall</th>"
"<th>filename:lineno(function)</th></tr>"
)
for func in funcs:
rval.append("<tr>")
# Calculate each field
cc, nc, tt, ct, callers = stats.stats[func]
# ncalls
ncalls = str(nc)
if nc != cc:
ncalls = f"{ncalls}/{str(cc)}"
rval.append(f"<td>{markupsafe.escape(ncalls)}</td>")
# tottime
rval.append(f"<td>{tt:0.8f}</td>")
# percall
if nc == 0:
percall = ""
else:
percall = f"{tt / nc:0.8f}"
rval.append(f"<td>{markupsafe.escape(percall)}</td>")
# cumtime
rval.append(f"<td>{ct:0.8f}</td>")
# ctpercall
if cc == 0:
ctpercall = ""
else:
ctpercall = f"{ct / cc:0.8f}"
rval.append(f"<td>{markupsafe.escape(ctpercall)}</td>")
# location
rval.append(f"<td>{markupsafe.escape(func_std_string(func))}</td>")
# row complete
rval.append("</tr>")
rval.append("</table>")
# Concatenate result
return "".join(rval)
[docs]def get_func_list(stats, sel_list):
"""
Use 'sel_list' to select a list of functions to display.
"""
# Determine if an ordering was applied
if stats.fcn_list:
list = stats.fcn_list[:]
order_message = f"Ordered by: {stats.sort_type}"
else:
list = list(stats.stats.keys())
order_message = "Random listing order was used"
# Do the selection and accumulate messages
select_message = ""
for selection in sel_list:
list, select_message = stats.eval_print_amount(selection, list, select_message)
# Return the list of functions selected and the message
return list, order_message, select_message
[docs]def func_std_string(func_name):
"""
Match what old profile produced
"""
if func_name[:2] == ("~", 0):
# special case for built-in functions
name = func_name[2]
if name.startswith("<") and name.endswith(">"):
return f"{{{name[1:-1]}}}"
else:
return name
else:
return "{}:{}({})".format(*func_name)