Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.web.framework.middleware.remoteuser
"""
Middleware for handling $REMOTE_USER if use_remote_user is enabled.
"""
import logging
import socket
from galaxy.util import safe_str_cmp
log = logging.getLogger(__name__)
errorpage = """
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html lang="en">
<head>
<title>Galaxy</title>
<style type="text/css">
body {
min-width: 500px;
text-align: center;
}
.errormessage {
font: 75%% verdana, "Bitstream Vera Sans", geneva, arial, helvetica, helve, sans-serif;
padding: 10px;
margin: 100px auto;
min-height: 32px;
max-width: 500px;
border: 1px solid #AA6666;
background-color: #FFCCCC;
text-align: left;
}
</style>
</head>
<body>
<div class="errormessage">
<h4>%s</h4>
<p>%s</p>
</div>
</body>
</html>
"""
[docs]class RemoteUser:
[docs] def __init__(
self,
app,
maildomain=None,
display_servers=None,
admin_users=None,
single_user=None,
remote_user_header=None,
remote_user_secret_header=None,
normalize_remote_user_email=False,
):
self.app = app
self.maildomain = maildomain
self.display_servers = display_servers or []
self.admin_users = admin_users or []
self.remote_user_header = remote_user_header or "HTTP_REMOTE_USER"
self.single_user = single_user
self.config_secret_header = remote_user_secret_header
self.normalize_remote_user_email = normalize_remote_user_email
def __call__(self, environ, start_response):
# Allow display servers
if self.display_servers and "REMOTE_ADDR" in environ:
try:
host = socket.gethostbyaddr(environ["REMOTE_ADDR"])[0]
except (OSError, socket.herror, socket.gaierror, socket.timeout):
# in the event of a lookup failure, deny access
host = None
if host in self.display_servers:
environ[self.remote_user_header] = "remote_display_server@%s" % (self.maildomain or "example.org")
return self.app(environ, start_response)
if self.single_user:
assert self.remote_user_header not in environ
environ[self.remote_user_header] = self.single_user
if environ.get(self.remote_user_header, "").startswith("(null)"):
# Throw away garbage headers.
# Apache sets REMOTE_USER to the string '(null)' when using the
# Rewrite* method for passing REMOTE_USER and a user is not authenticated.
# Any other possible values need to go here as well.
log.debug(
"Discarding invalid remote user header %s:%s.",
self.remote_user_header,
environ.get(self.remote_user_header, None),
)
environ.pop(self.remote_user_header)
if self.remote_user_header in environ:
# process remote user with configuration options.
if self.normalize_remote_user_email:
environ[self.remote_user_header] = environ[self.remote_user_header].lower()
if self.maildomain and "@" not in environ[self.remote_user_header]:
environ[self.remote_user_header] = f"{environ[self.remote_user_header]}@{self.maildomain}"
path_info = environ.get("PATH_INFO", "")
# The API handles its own authentication via keys
# Check for API key before checking for header
if path_info.startswith("/api/"):
return self.app(environ, start_response)
# If the secret header is enabled, we expect upstream to send along some key
# in HTTP_GX_SECRET, so we'll need to compare that here to the correct value
#
# This is not an ideal location for this function. The reason being
# that because this check is done BEFORE the REMOTE_USER check, it is
# possible to attack the GX_SECRET key without having correct
# credentials. However, that's why it's not "ideal", but it is "good
# enough". The only users able to exploit this are ones with access to
# the local system (unless Galaxy is listening on 0.0.0.0....). It
# seems improbable that an attacker with access to the server hosting
# Galaxy would not have access to Galaxy itself, and be attempting to
# attack the system
if self.config_secret_header is not None:
if environ.get("HTTP_GX_SECRET") is None:
title = "Access to Galaxy is denied"
message = """
Galaxy is configured to authenticate users via an external
method (such as HTTP authentication in Apache), but
no shared secret key was provided by the
upstream (proxy) server.</p>
<p>Please contact your local Galaxy administrator. The
variable <code>remote_user_secret</code> and
<code>GX_SECRET</code> header must be set before you may
access Galaxy.
"""
return self.error(start_response, title, message)
if not safe_str_cmp(environ.get("HTTP_GX_SECRET", ""), self.config_secret_header):
title = "Access to Galaxy is denied"
message = """
Galaxy is configured to authenticate users via an external
method (such as HTTP authentication in Apache), but an
incorrect shared secret key was provided by the
upstream (proxy) server.</p>
<p>Please contact your local Galaxy administrator. The
variable <code>remote_user_secret</code> and
<code>GX_SECRET</code> header must be set before you may
access Galaxy.
"""
return self.error(start_response, title, message)
if environ.get(self.remote_user_header, None):
if not environ[self.remote_user_header].count("@"):
if self.maildomain is not None:
environ[self.remote_user_header] += f"@{self.maildomain}"
else:
title = "Access to Galaxy is denied"
message = """
Galaxy is configured to authenticate users via an external
method (such as HTTP authentication in Apache), but only a
username (not an email address) was provided by the
upstream (proxy) server. Since Galaxy usernames are email
addresses, a default mail domain must be set.</p>
<p>Please contact your local Galaxy administrator. The
variable <code>remote_user_maildomain</code> must be set
before you may access Galaxy.
"""
return self.error(start_response, title, message)
user_accessible_paths = (
"/users",
"/user/api_key",
"/user/edit_username",
"/user/dbkeys",
"/user/logout",
"/user/toolbox_filters",
"/user/set_default_permissions",
)
admin_accessible_paths = (
"/user/create",
"/user/logout",
"/user/manage_user_info",
"/user/edit_info",
)
if not path_info.startswith("/user"):
# shortcut the following allowlist for non-user-controller
# requests.
pass
elif environ[self.remote_user_header] in self.admin_users and any(
path_info.startswith(prefix) for prefix in admin_accessible_paths
):
# If the user is an admin user, and any of the admin accessible paths match..., allow them to execute that action.
pass
elif any(path_info.startswith(prefix) for prefix in user_accessible_paths):
# If the user is allowed to access the path, pass
pass
elif path_info == "/user" or path_info == "/user/":
pass # We do allow access to the root user preferences page.
elif path_info.startswith("/user"):
# Any other endpoint in the user controller is off limits
title = "Access to Galaxy user controls is disabled"
message = """
User controls are disabled when Galaxy is configured
for external authentication.
"""
return self.error(start_response, title, message)
return self.app(environ, start_response)
else:
log.debug(f"Unable to identify user. {self.remote_user_header} not found")
for k, v in environ.items():
log.debug("%s = %s", k, v)
title = "Access to Galaxy is denied"
message = """
Galaxy is configured to authenticate users via an external
method (such as HTTP authentication in Apache), but a username
was not provided by the upstream (proxy) server. This is
generally due to a misconfiguration in the upstream server.</p>
<p>Please contact your local Galaxy administrator.
"""
return self.error(start_response, title, message)
[docs] def error(self, start_response, title="Access denied", message="Please contact your local Galaxy administrator."):
start_response("403 Forbidden", [("Content-type", "text/html")])
return [errorpage % (title, message)]