Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.base.api
import os
import stat
import uuid
from collections.abc import Mapping
from logging import getLogger
from typing import (
Any,
Optional,
TYPE_CHECKING,
)
import anyio
from fastapi import (
FastAPI,
HTTPException,
Request,
status,
)
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from starlette.responses import (
FileResponse,
Response,
StreamingResponse,
)
from starlette_context import context
from starlette_context.middleware import RawContextMiddleware
from starlette_context.plugins import (
Plugin,
RequestIdPlugin,
)
from galaxy.exceptions import MessageException
from galaxy.exceptions.utils import (
api_error_to_dict,
validation_error_to_message_exception,
)
from galaxy.util.path import StrPath
from galaxy.web.framework.base import walk_controller_modules
if TYPE_CHECKING:
from starlette.background import BackgroundTask
from starlette.routing import BaseRoute
from starlette.types import (
Receive,
Scope,
Send,
)
from galaxy.schema.schema import MessageExceptionModel
# Module-level logger named after this module; used by the session-release
# helper, the exception handlers and the access-logging plugin below.
log = getLogger(__name__)
# Copied from https://github.com/tiangolo/fastapi/issues/1240#issuecomment-1055396884
def _get_range_header(range_header: str, file_size: int) -> tuple[int, int]:
def _invalid_range():
return HTTPException(
status.HTTP_416_RANGE_NOT_SATISFIABLE,
detail=f"Invalid request range (Range:{range_header!r})",
)
try:
h = range_header.replace("bytes=", "").rsplit("-", 1)
start = int(h[0]) if h[0] != "" else 0
end = int(h[1]) if h[1] != "" else file_size - 1
except ValueError:
raise _invalid_range()
if start > end or start < 0 or end > file_size - 1:
raise _invalid_range()
return start, end
def _live_request_sessions() -> list:
"""Concrete request-scoped SQLAlchemy Sessions that already exist for this request.
Reads ``galaxy.app.app``'s ``model`` + ``install_model`` scoped registries
directly, keyed by the current request-id (``request_scopefunc``). It never
calls the ``scoped_session`` proxy, which would lazily *create* a session.
Returns ``[]`` when no Galaxy app is bound (the tool shed / reports webapps
reuse this module) or when no session was opened during the request.
"""
try:
from galaxy import app as galaxy_app # lazy: keep base/api.py importable by non-Galaxy webapps
except Exception:
return []
app = getattr(galaxy_app, "app", None)
if app is None:
return []
sessions = []
for mapping in (getattr(app, "model", None), getattr(app, "install_model", None)):
if mapping is None:
continue
existing = mapping.scoped_registry.registry.get(mapping.request_scopefunc())
if existing is not None:
sessions.append(existing)
return sessions
def _release_request_sessions(sessions: list) -> None:
"""Close the captured request-scoped sessions, returning their pooled connections.
``Session.close()`` is idempotent, so the later guarded close in
``get_app_with_request_session``'s teardown is harmless.
"""
for session in sessions:
try:
session.close()
except Exception:
log.warning("Failed to release request-scoped DB session before streaming", exc_info=True)
[docs]
class GalaxyFileResponse(FileResponse):
    """
    Augments starlette FileResponse with x-accel-redirect/x-sendfile and byte-range handling.

    Like :class:`GalaxyStreamingResponse`, it releases the request-scoped DB
    connection(s) before sending the file body — see that class's docstring for
    the rationale and the contract/footgun. File downloads never touch the
    database after the response is constructed.
    """

    # Class-level switches; presumably configured once at application start-up
    # (TODO confirm where). When either is set, the front-end web server
    # (nginx via X-Accel-Redirect, Apache via X-Sendfile) delivers the body and
    # this response sends headers only.
    nginx_x_accel_redirect_base: str | None = None
    apache_xsendfile: bool | None = None

    def __init__(
        self,
        path: StrPath,
        status_code: int = 200,
        headers: Mapping[str, str] | None = None,
        media_type: str | None = None,
        background: Optional["BackgroundTask"] = None,
        filename: str | None = None,
        stat_result: os.stat_result | None = None,
        content_disposition_type: str = "attachment",
    ) -> None:
        """Build the response; all arguments are forwarded to ``FileResponse``.

        Additionally advertises byte-range support and, when configured, adds
        the ``x-accel-redirect`` / ``x-sendfile`` header pointing at the
        absolute *path*.
        """
        super().__init__(
            path=path,
            status_code=status_code,
            headers=headers,
            media_type=media_type,
            background=background,
            filename=filename,
            stat_result=stat_result,
            content_disposition_type=content_disposition_type,
        )
        self.headers["accept-ranges"] = "bytes"
        self.xsendfile = self.nginx_x_accel_redirect_base or self.apache_xsendfile
        if self.nginx_x_accel_redirect_base:
            self.headers["x-accel-redirect"] = self.nginx_x_accel_redirect_base + os.path.abspath(path)
        elif self.apache_xsendfile:
            self.headers["x-sendfile"] = os.path.abspath(path)
        # Capture the live request-scoped session(s) now, while we are in the
        # request's asyncio task (so the request-id ContextVar resolves), to
        # release before streaming the body. See GalaxyStreamingResponse.
        self._sessions_to_release = _live_request_sessions()

    async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
        """Send the file, honouring a single byte ``Range`` request header.

        Diverges from ``FileResponse.__call__`` in three ways:

        * it computes byte ranges itself and answers them with 206;
        * it sends no body for x-accel-redirect/x-sendfile responses or HEAD
          requests;
        * it releases request-scoped DB sessions before the first ``send``.

        :raises RuntimeError: when the path does not exist or is not a regular file.
        :raises HTTPException: 416 from ``_get_range_header`` for an unsatisfiable range.
        """
        if self.stat_result is None:
            try:
                stat_result = await anyio.to_thread.run_sync(os.stat, self.path)
                self.set_stat_headers(stat_result)
            except FileNotFoundError:
                raise RuntimeError(f"File at path {self.path} does not exist.")
            else:
                mode = stat_result.st_mode
                if not stat.S_ISREG(mode):
                    raise RuntimeError(f"File at path {self.path} is not a file.")
        else:
            # A stat result was supplied to the constructor; use it. Without
            # this branch ``stat_result`` stayed unbound and the size lookup
            # below raised UnboundLocalError.
            stat_result = self.stat_result

        # This is where we diverge from the superclass, this adds support for byte range requests
        is_head_request = scope["method"].upper() == "HEAD"
        if not is_head_request and self.xsendfile:
            # The front-end web server sends the body, so this response carries none.
            self.headers["content-length"] = "0"
        send_header_only = self.xsendfile or is_head_request
        start = 0
        end = stat_result.st_size - 1
        http_range = ""
        if not send_header_only:
            for key, value in scope["headers"]:
                if key == b"range":
                    http_range = value.decode("latin-1")
                    start, end = _get_range_header(http_range, stat_result.st_size)
                    self.headers["content-length"] = str(end - start + 1)
                    self.headers["content-range"] = f"bytes {start}-{end}/{stat_result.st_size}"
                    self.status_code = status.HTTP_206_PARTIAL_CONTENT
                    break
        # All DB work for this request is done; the body below is pure file I/O
        # (or a header-only x-accel/x-sendfile/HEAD response). Release the pooled
        # connection(s) before we start sending so a slow/large download doesn't
        # pin them for the whole transfer.
        _release_request_sessions(self._sessions_to_release)
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        if send_header_only:
            await send({"type": "http.response.body", "body": b"", "more_body": False})
        else:
            # This also diverges from the superclass by seeking to start and limiting to end if handling byte range requests
            async with await anyio.open_file(self.path, mode="rb") as file:
                more_body = True
                if start:
                    await file.seek(start)
                while more_body:
                    if http_range:
                        # Never read past the inclusive ``end`` offset; the final
                        # chunk of the range ends the body.
                        pos = await file.tell()
                        read_size = min(self.chunk_size, end + 1 - pos)
                        if pos + read_size == end + 1:
                            more_body = False
                    else:
                        read_size = self.chunk_size
                    chunk = await file.read(read_size)
                    if more_body:
                        # A short read means end of file (or a file that shrank).
                        more_body = len(chunk) == self.chunk_size
                    await send(
                        {
                            "type": "http.response.body",
                            "body": chunk,
                            "more_body": more_body,
                        }
                    )
        if self.background is not None:
            await self.background()
[docs]
class GalaxyStreamingResponse(StreamingResponse):
    """``StreamingResponse`` variant that hands request-scoped DB connection(s)
    back to the pool as soon as the body starts streaming.

    Motivation: FastAPI's ``get_app_with_request_session`` yield-dependency
    keeps the request-scoped SQLAlchemy session checked out until the whole
    response body has been sent. That means its pooled DB connection stays
    pinned too. A long-lived stream can last minutes to hours; examples are
    SSE, a large file/archive download, or an upstream proxy. Each in-flight
    stream would therefore hold one pooled connection and could exhaust the
    pool or the server's connection slots. This class closes the session(s)
    when streaming begins, so the connection goes back to the pool while
    bytes are still flowing.

    Contract (read before using this class): it is safe ONLY if the body does
    no database work after the response object is constructed. Every yielded
    byte must come from data that is already materialised. Examples are a
    zipstream over files on disk, a ``BytesIO``, an upstream HTTP proxy, or
    the SSE in-memory queue. Lazily loading an ORM relationship, issuing a
    query or committing from the body hits the pitfall below.

    Pitfall: the request-id ``ContextVar`` that keys the ``scoped_session`` is
    still set while the body streams, because streaming runs in the same
    asyncio task. If the body touches ``app.model.session`` after it has been
    closed here, ``scoped_session`` SILENTLY creates a new session under the
    same request id. That re-pins a connection for the rest of the stream, and
    any writes made on it are a latent correctness bug. Never stream lazily
    loaded, DB-backed data through this class.

    Idempotency: ``Session.close()`` does nothing on an already-closed session,
    and the teardown's ``unset_request_id`` is guarded. The second close at the
    end of the request is therefore harmless. The registry entry is never
    deleted here; removing it is the teardown's responsibility.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Snapshot only sessions that already exist, while we are still in the
        # request's asyncio task and the request-id ContextVar is set. The
        # scoped proxy is deliberately bypassed: calling it would create a
        # fresh session when none exists yet.
        live_sessions = _live_request_sessions()
        self._sessions_to_release = live_sessions

    async def stream_response(self, send: "Send") -> None:
        # Return the pooled connection(s) before the first byte is sent.
        pending_sessions = self._sessions_to_release
        _release_request_sessions(pending_sessions)
        await super().stream_response(send)
[docs]
def add_sentry_middleware(app: FastAPI) -> None:
    """Register Sentry's ASGI middleware on *app*.

    The ``sentry_sdk`` import is deferred to call time, so the package is only
    needed when this function is actually invoked.
    """
    from sentry_sdk.integrations import asgi as sentry_asgi

    app.add_middleware(sentry_asgi.SentryAsgiMiddleware)
[docs]
def get_error_response_for_request(request: Request, exc: MessageException) -> JSONResponse:
    """Render *exc* as a JSON error response suited to *request*'s API.

    Requests to a GA4GH API get the reduced error body that API expects. The
    API is identified by the path segment following a ``ga4gh`` segment:

    * DRS: ``{"status_code": ..., "msg": ...}``;
    * TRS: ``{"code": ..., "message": ...}``.

    Every other request, including other GA4GH services, gets Galaxy's full
    error dictionary. A truthy ``retry_after`` attribute on *exc* is sent as a
    ``Retry-After`` header.
    """
    error_dict = api_error_to_dict(exception=exc)
    status_code = exc.status_code
    content = error_dict
    # Match whole path segments rather than substrings: a TRS URL whose
    # (decoded) tool id happens to contain "drs" must still get the TRS schema,
    # e.g. /api/ga4gh/trs/v2/tools/<owner>/drs_tool/... .
    segments = [segment for segment in request.url.path.split("/") if segment]
    if "ga4gh" in segments:
        service_index = segments.index("ga4gh") + 1
        service = segments[service_index] if service_index < len(segments) else None
        if service == "drs":
            content = {"status_code": status_code, "msg": error_dict["err_msg"]}
        elif service == "trs":
            content = {"code": status_code, "message": error_dict["err_msg"]}
        # Unknown GA4GH schema: keep the full, most informative error dict.
    retry_after: int | None = getattr(exc, "retry_after", None)
    headers: dict[str, str] = {}
    if retry_after:
        headers["Retry-After"] = str(retry_after)
    return JSONResponse(status_code=status_code, content=content, headers=headers)
[docs]
def add_exception_handler(app: FastAPI) -> None:
    """Register handlers that turn request-validation errors and
    ``MessageException`` into JSON error responses via
    ``get_error_response_for_request``.
    """

    @app.exception_handler(RequestValidationError)
    async def validate_exception_middleware(request: Request, exc: RequestValidationError) -> Response:
        # Convert the validation error into Galaxy's MessageException form first.
        message_exception = validation_error_to_message_exception(exc)
        return get_error_response_for_request(request, message_exception)

    @app.exception_handler(MessageException)
    async def message_exception_middleware(request: Request, exc: MessageException) -> Response:
        # Intentionally not logging traceback here as the full context will be
        # dispatched to Sentry if configured. This just makes logs less opaque
        # when one sees a 500.
        if exc.status_code >= 500:
            # Lazy %-formatting: the message is only built if INFO is enabled.
            log.info("MessageException: %s", exc)
        return get_error_response_for_request(request, exc)
[docs]
class AccessLoggingMiddleware(Plugin):
    """starlette-context plugin that emits one DEBUG line per request and one per response.

    The request line holds the method, the full path (root path + path + query
    string) and a random UUID, so the two log lines can be matched up.
    ``process_request`` returns it, and ``enrich_response`` later reads it back
    via ``context.get("access_line")``; presumably starlette-context stores the
    returned value under ``key``.
    """

    key = "access_line"

    async def process_request(self, request):
        """Build, log and return this request's access line."""
        scope = request.scope
        path = scope["root_path"] + scope["path"]
        if scope["query_string"]:
            # latin-1 maps every byte to a code point. A client sending raw
            # non-ASCII bytes in the query string therefore can no longer make
            # this middleware raise UnicodeDecodeError, as ASCII decoding did.
            # This also matches how the Range header is decoded elsewhere in
            # this module.
            path = f"{path}?{scope['query_string'].decode('latin-1')}"
        access_line = f"{scope['method']} {path} {uuid.uuid4()}"
        log.debug("%s", access_line)
        return access_line

    async def enrich_response(self, response) -> None:
        """Log the response status next to the request's access line.

        *response* is read with ``.get("status")``; it is assumed to be the
        ASGI ``http.response.start`` message dict (TODO confirm).
        """
        access_line = context.get("access_line")
        # Named ``status_code`` so the module-level ``status`` import is not shadowed.
        if status_code := response.get("status"):
            log.debug("%s %s", access_line, status_code)
[docs]
def add_raw_context_middlewares(app: FastAPI):
    """Install the request-id and access-logging context plugins on *app*.

    Uvicorn's own ``uvicorn.access`` handlers are removed first. Access lines
    then come only from ``AccessLoggingMiddleware`` (presumably to avoid
    duplicate access logs; confirm).
    """
    uvicorn_access_logger = getLogger("uvicorn.access")
    uvicorn_access_logger.handlers = []
    context_plugins = (
        RequestIdPlugin(force_new_uuid=True),
        AccessLoggingMiddleware(),
    )
    app.add_middleware(RawContextMiddleware, plugins=context_plugins)
[docs]
def add_request_id_middleware(app: FastAPI):
    """Install only the request-id context plugin on *app*.

    The plugin is configured with ``force_new_uuid=True``. No access logging
    is installed here.
    """
    request_id_plugin = RequestIdPlugin(force_new_uuid=True)
    app.add_middleware(RawContextMiddleware, plugins=(request_id_plugin,))
[docs]
def build_route_name_index(app: FastAPI) -> dict[str, list["BaseRoute"]]:
    """Group *app*'s routes by their ``name`` attribute.

    Routes without a name, or with an empty one, are skipped. Within each name,
    routes keep their order in ``app.routes``. The routing table does not change
    after start-up, so callers can build this mapping once and reuse it. A
    lookup by name then usually yields a single candidate instead of requiring
    a scan over every route.
    """
    routes_by_name: dict[str, list[BaseRoute]] = {}
    for candidate in app.routes:
        route_name = getattr(candidate, "name", None)
        if not route_name:
            continue
        if route_name not in routes_by_name:
            routes_by_name[route_name] = []
        routes_by_name[route_name].append(candidate)
    return routes_by_name
[docs]
def include_all_package_routers(app: FastAPI, package_name: str):
    """Mount the ``router`` of every controller module under *package_name* on *app*.

    Every included router is documented with shared ``4XX`` / ``5XX`` responses
    that use ``MessageExceptionModel``. Afterwards a catch-all ``OPTIONS``
    handler for ``/api/...`` is registered to answer CORS preflight requests.
    """
    error_responses: dict[int | str, dict[str, Any]] = {
        code: {"description": description, "model": MessageExceptionModel}
        for code, description in (("4XX", "Request Error"), ("5XX", "Server Error"))
    }
    for _module_name, controller_module in walk_controller_modules(package_name):
        module_router = getattr(controller_module, "router", None)
        if not module_router:
            continue
        app.include_router(module_router, responses=error_responses)

    # CORS preflight handling, kept in sync with the WSGI behaviour. It is
    # registered last so routes with explicit CORS handling take precedence.
    # The CORS middleware is unaffected because it finishes preflight requests
    # before routing happens.
    @app.options("/api/{rest_of_path:path}", include_in_schema=False)
    async def preflight_handler(request: Request, rest_of_path: str) -> Response:
        preflight = Response()
        preflight.headers["Access-Control-Allow-Headers"] = "*"
        preflight.headers["Access-Control-Max-Age"] = "600"
        return preflight