Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.model.orm.engine_factory
import inspect
import logging
import os
import threading
import time
from multiprocessing.util import register_after_fork
from sqlalchemy import (
create_engine,
event,
)
from sqlalchemy.engine import Engine
log = logging.getLogger(__name__)
QUERY_COUNT_LOCAL = threading.local()
WORKING_DIRECTORY = os.getcwd()
[docs]def log_request_query_counts(req_id):
try:
times = QUERY_COUNT_LOCAL.times
if times:
log.info("Executed [{}] SQL requests in for web request [{}] ({:0.3f} ms)".format(len(times), req_id, sum(times) * 1000.))
except AttributeError:
# Didn't record anything so don't worry.
pass
[docs]def stripwd(s):
if s.startswith(WORKING_DIRECTORY):
return s[len(WORKING_DIRECTORY):]
return s
[docs]def pretty_stack():
rval = []
for _, fname, line, funcname, _, _ in inspect.stack()[2:]:
rval.append("%s:%s@%d" % (stripwd(fname), funcname, line))
return rval
[docs]def build_engine(url, engine_options, database_query_profiling_proxy=False, trace_logger=None, slow_query_log_threshold=0, thread_local_log=None, log_query_counts=False):
if database_query_profiling_proxy or slow_query_log_threshold or thread_local_log or log_query_counts:
@event.listens_for(Engine, "before_execute")
def before_execute(conn, clauseelement, multiparams, params):
conn.info.setdefault('query_start_time', []).append(time.time())
if slow_query_log_threshold or thread_local_log or log_query_counts:
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement,
parameters, context, executemany):
total = time.time() - conn.info['query_start_time'].pop(-1)
fragment = 'Slow query: '
if total > slow_query_log_threshold:
log.debug(f"{fragment}{total:f}(s)\n{statement}\nParameters: {parameters}")
if database_query_profiling_proxy:
if trace_logger:
trace_logger.log(
"sqlalchemy_query",
message="Query executed",
statement=statement,
parameters=parameters,
executemany=executemany,
duration=total
)
else:
thread_ident = threading.get_ident()
stack = " > ".join(pretty_stack())
log.debug(f"statement: {statement} parameters: {parameters} executemany: {executemany} duration: {total} stack: {stack} thread: {thread_ident}")
if log_query_counts:
try:
QUERY_COUNT_LOCAL.times.append(total)
except AttributeError:
# Not a web thread.
pass
if thread_local_log is not None:
try:
if thread_local_log.log:
log.debug(f"Request query: {total:f}(s)\n{statement}\nParameters: {parameters}")
except AttributeError:
pass
# Create the database engine
engine = create_engine(url, **engine_options)
register_after_fork(engine, lambda e: e.dispose())
return engine