Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.web.proxy

import json
import logging
import os
import time
from collections import namedtuple

from galaxy.util import (
    requests,
    sockets,
    sqlite,
    unique_id,
)
from galaxy.util.filelock import FileLock
from galaxy.util.lazy_process import (
    LazyProcess,
    NoOpLazyProcess,
)
from galaxy.web.framework import url_for

log = logging.getLogger(__name__)


# Backend host used for a proxy request when the caller does not supply one.
DEFAULT_PROXY_TO_HOST = "localhost"
# Name of the cookie whose value AuthenticationToken reads from the transaction.
SECURE_COOKIE = "galaxysession"
# Randomly generate a password every launch
# NOTE(review): no statement follows the comment above in this module. The only
# generated secret visible here is the golang proxy API key, which
# ProxyManager.__init__ fills in with unique_id() when it is not configured.


class ProxyManager:
    """Front door for the dynamic proxy: owns the proxy process handle and the
    IPC channel used to register, update and look up proxied routes.
    """

    # Keyword arguments that update_proxy() is willing to forward.
    valid_update_keys = (
        "host",
        "port",
    )

    def __init__(self, config):
        """Copy the proxy-related options from ``config`` onto this instance and
        build the process handle and the IPC backend.
        """
        copied_options = (
            "manage_dynamic_proxy",
            "dynamic_proxy_bind_port",
            "dynamic_proxy_bind_ip",
            "dynamic_proxy_debug",
            "dynamic_proxy_external_proxy",
            "dynamic_proxy_prefix",
            "proxy_session_map",
            "dynamic_proxy",
            "cookie_path",
            "dynamic_proxy_golang_noaccess",
            "dynamic_proxy_golang_clean_interval",
            "dynamic_proxy_golang_docker_address",
            "dynamic_proxy_golang_api_key",
        )
        for name in copied_options:
            setattr(self, name, getattr(config, name))

        # Only build a real process handle when Galaxy manages the proxy itself.
        if not self.manage_dynamic_proxy:
            self.lazy_process = NoOpLazyProcess()
        else:
            self.lazy_process = self.__setup_lazy_process(config)

        # NOTE(review): the generated key is stored on this instance only; the
        # golang launcher and RestGolangProxyIpc both read the key from
        # ``config`` - confirm whether ``config`` should be updated as well.
        if self.dynamic_proxy_golang_api_key is None:
            self.dynamic_proxy_golang_api_key = unique_id()

        self.proxy_ipc = proxy_ipc(config)

    def shutdown(self):
        """Shut down the proxy process handle."""
        self.lazy_process.shutdown()

    def setup_proxy(
        self,
        trans,
        host=DEFAULT_PROXY_TO_HOST,
        port=None,
        proxy_prefix="",
        route_name="",
        container_ids=None,
        container_interface=None,
    ):
        """Register a proxied backend for the current session.

        Starts the managed proxy process if configured to do so, hands the
        request to the IPC backend and returns a dict with the keys
        ``proxy_url``, ``proxied_port`` and ``proxied_host``.
        """
        if self.manage_dynamic_proxy:
            log.info("Attempting to start dynamic proxy process")
            command_line = " ".join(self.lazy_process.command_and_args)
            log.debug(f"Cmd: {command_line}")
            self.lazy_process.start_process()

        container_ids = [] if container_ids is None else container_ids

        token = AuthenticationToken(trans)
        backend = ProxyRequests(host=host, port=port)
        self.proxy_ipc.handle_requests(
            token,
            backend,
            f"/{route_name}",
            container_ids,
            container_interface,
        )

        # TODO: These shouldn't need to be request.host and request.scheme -
        # though they are reasonable defaults.
        # Keep only what precedes the first ":" (drops a port suffix).
        # NOTE(review): a bracketed IPv6 host would be cut at its first colon.
        public_host = trans.request.host.partition(":")[0]
        scheme = trans.request.scheme
        if self.dynamic_proxy_external_proxy:
            proxy_url = f"{scheme}://{public_host}{proxy_prefix}"
        else:
            proxy_url = "%s://%s:%d" % (scheme, public_host, self.dynamic_proxy_bind_port)

        return {
            "proxy_url": proxy_url,
            "proxied_port": backend.port,
            "proxied_host": backend.host,
        }

    def update_proxy(self, trans, **kwargs):
        """Forward ``host``/``port`` updates for the current session to the IPC backend.

        Raises ``Exception`` if any keyword is not listed in ``valid_update_keys``.
        """
        token = AuthenticationToken(trans)
        for name in kwargs:
            if name not in self.valid_update_keys:
                raise Exception(f"Invalid proxy request update key: {name}")
        return self.proxy_ipc.update_requests(token, **kwargs)

    def query_proxy(self, trans):
        """Return whatever the IPC backend has stored for the current session."""
        return self.proxy_ipc.fetch_requests(AuthenticationToken(trans))

    def __setup_lazy_process(self, config):
        """Wrap the launcher's command line in a LazyProcess."""
        return LazyProcess(self.proxy_launcher().launch_proxy_command(config))

    def proxy_launcher(self):
        """Return the launcher matching ``self.dynamic_proxy``.

        Raises ``Exception`` for any value other than ``"node"`` or ``"golang"``.
        """
        kind = self.dynamic_proxy
        if kind == "node":
            return NodeProxyLauncher()
        if kind == "golang":
            return GolangProxyLauncher()
        raise Exception("Unknown proxy type")
class ProxyLauncher:
    """Interface for objects that build the command line used to start a proxy."""

    def launch_proxy_command(self, config):
        """Return the proxy launch command for ``config``; this base version
        always raises ``NotImplementedError``.
        """
        raise NotImplementedError
class NodeProxyLauncher:
    """Builds the command line for the Node.js proxy script shipped next to this module."""

    def launch_proxy_command(self, config):
        """Return a list: the path to ``js/lib/main.js`` followed by its flags.

        ``--verbose`` is appended when ``config.dynamic_proxy_debug`` is truthy.
        """
        script = os.path.join(os.path.dirname(__file__), "js", "lib", "main.js")
        command = [
            script,
            "--sessions",
            config.proxy_session_map,
            "--ip",
            config.dynamic_proxy_bind_ip,
            "--port",
            str(config.dynamic_proxy_bind_port),
            "--reverseProxy",
        ]
        if config.dynamic_proxy_debug:
            command.append("--verbose")
        return command
class GolangProxyLauncher:
    """Builds the command line for the ``gxproxy`` Go proxy binary."""

    def launch_proxy_command(self, config):
        """Return the ``gxproxy`` argument list derived from ``config``.

        The ``--noAccess`` and ``--cleanInterval`` values are converted with
        ``str()`` so the result can be joined for logging and handed to a
        process launcher even when the settings are numeric.
        ``--verbose`` is appended when ``config.dynamic_proxy_debug`` is truthy.
        """
        listen_addr = "%s:%d" % (
            config.dynamic_proxy_bind_ip,
            config.dynamic_proxy_bind_port,
        )
        # Fall back to the application root URL when no cookie path is configured.
        listen_path = "/".join(((config.cookie_path or url_for("/")), config.dynamic_proxy_prefix))
        args = [
            "gxproxy",  # Must be on path. TODO: wheel?
            "--listenAddr",
            listen_addr,
            "--listenPath",
            listen_path,
            "--cookieName",
            "galaxysession",
            "--storage",
            config.proxy_session_map.replace(".sqlite", ".xml"),  # just in case.
            "--apiKey",
            config.dynamic_proxy_golang_api_key,
            "--noAccess",
            str(config.dynamic_proxy_golang_noaccess),
            "--cleanInterval",
            str(config.dynamic_proxy_golang_clean_interval),
            "--dockerAddr",
            config.dynamic_proxy_golang_docker_address,
        ]
        if config.dynamic_proxy_debug:
            args.append("--verbose")
        return args
class AuthenticationToken:
    """Pairs the session cookie's name with the value carried by a transaction."""

    def __init__(self, trans):
        """Read the ``SECURE_COOKIE`` cookie from ``trans`` via ``trans.get_cookie``."""
        name = SECURE_COOKIE
        self.cookie_name = name
        self.cookie_value = trans.get_cookie(name)
class ProxyRequests:
    """Backend ``host``/``port`` pair that a proxy route should point at."""

    def __init__(self, host=None, port=None):
        """Store the target, filling in defaults for anything left as ``None``.

        A missing host becomes ``DEFAULT_PROXY_TO_HOST``; a missing port is
        obtained from ``sockets.unused_port()`` and logged.
        """
        self.host = DEFAULT_PROXY_TO_HOST if host is None else host
        if port is None:
            port = sockets.unused_port()
            log.info("Obtained unused port %d" % port)
        self.port = port
def proxy_ipc(config):
    """Pick the IPC backend matching ``config.dynamic_proxy``.

    ``"node"`` yields a SQLite backend when ``config.proxy_session_map`` ends
    with ``.sqlite`` and a JSON-file backend otherwise; ``"golang"`` yields the
    REST backend. Any other value yields ``None``.
    """
    session_map = config.proxy_session_map
    kind = config.dynamic_proxy
    if kind == "node":
        backend = SqliteProxyIpc if session_map.endswith(".sqlite") else JsonFileProxyIpc
        return backend(session_map)
    if kind == "golang":
        return RestGolangProxyIpc(config)
    return None
class ProxyIpc:
    """Interface describing the operations an IPC backend offers to ProxyManager.

    Every method here raises ``NotImplementedError``; concrete backends provide
    the real behaviour.
    """

    def handle_requests(self, authentication, proxy_requests, route_name, container_ids, container_interface):
        """Register the backend described by ``proxy_requests`` for the session
        identified by ``authentication``.
        """
        raise NotImplementedError()

    def update_requests(self, authentication, host=None, port=None):
        """Change the stored host and/or port for the session identified by
        ``authentication``.
        """
        raise NotImplementedError()

    def fetch_requests(self, authentication, key=None):
        """Look up what is stored for the session identified by ``authentication``.

        ``key`` is optional so the signature agrees with how callers invoke
        this method (with ``authentication`` only).
        """
        raise NotImplementedError()
class JsonFileProxyIpc:
    """IPC backend that keeps the session-to-backend map in a JSON file.

    The file holds one object keyed by session cookie value; each entry has
    the keys ``host``, ``port``, ``container_ids`` and ``container_interface``.
    """

    def __init__(self, proxy_session_map):
        """Remember the path of the JSON map file."""
        self.proxy_session_map = proxy_session_map

    def _read_session_map(self):
        """Load and return the whole map, closing the file afterwards."""
        with open(self.proxy_session_map) as fh:
            return json.load(fh)

    def _write_session_map(self, session_map):
        """Serialize ``session_map`` and overwrite the map file with it."""
        # Serialize first so a serialization error cannot truncate the file.
        payload = json.dumps(session_map)
        with open(self.proxy_session_map, "w") as fh:
            fh.write(payload)

    def handle_requests(self, authentication, proxy_requests, route_name, container_ids, container_interface):
        """Create or replace the entry for this session under a file lock.

        ``route_name`` is accepted for interface parity and is not stored.
        """
        key = authentication.cookie_value
        with FileLock(self.proxy_session_map):
            # A missing file is treated as an empty map.
            if os.path.exists(self.proxy_session_map):
                session_map = self._read_session_map()
            else:
                session_map = {}
            session_map[key] = {
                "host": proxy_requests.host,
                "port": proxy_requests.port,
                "container_ids": container_ids,
                "container_interface": container_interface,
            }
            self._write_session_map(session_map)

    def update_requests(self, authentication, host=None, port=None):
        """Update the stored host and/or port for this session under a file lock.

        An argument left as ``None`` keeps the stored value instead of
        overwriting it. Raises ``KeyError`` when the session has no entry.
        """
        key = authentication.cookie_value
        with FileLock(self.proxy_session_map):
            session_map = self._read_session_map()
            entry = session_map[key]
            if host is not None:
                entry["host"] = host
            if port is not None:
                entry["port"] = port
            self._write_session_map(session_map)

    def fetch_requests(self, authentication):
        """Return the ``ProxyMapping`` stored for this session, or ``None``
        (after logging a warning) when the key is missing or invalid.
        """
        key = authentication.cookie_value
        try:
            m = self._read_session_map()[key]
            return ProxyMapping(
                host=m["host"],
                port=m["port"],
                container_ids=m["container_ids"],
                container_interface=m["container_interface"],
            )
        except (TypeError, KeyError):
            log.warning("fetch_requests(): invalid key: %s", key)
            return None
class SqliteProxyIpc:
    """IPC backend that keeps the session-to-backend map in a SQLite table.

    Rows live in table ``gxproxy2`` keyed by session cookie value;
    ``container_ids`` is stored as a JSON string.
    """

    def __init__(self, proxy_session_map):
        """Remember the path of the SQLite database file."""
        self.proxy_session_map = proxy_session_map

    def handle_requests(self, authentication, proxy_requests, route_name, container_ids, container_interface):
        """Create or replace the row for this session under a file lock.

        ``route_name`` is accepted for interface parity and is not stored.
        """
        key = authentication.cookie_value
        with FileLock(self.proxy_session_map):
            conn = sqlite.connect(self.proxy_session_map)
            try:
                c = conn.cursor()
                # IF NOT EXISTS tolerates an existing table without hiding
                # unrelated database errors.
                c.execute(
                    """CREATE TABLE IF NOT EXISTS gxproxy2
                       (key text PRIMARY KEY,
                        host text,
                        port integer,
                        container_ids text,
                        container_interface text)"""
                )
                delete = """DELETE FROM gxproxy2 WHERE key=?"""
                c.execute(delete, (key,))
                insert = """INSERT INTO gxproxy2
                            (key, host, port, container_ids, container_interface)
                            VALUES (?, ?, ?, ?, ?)"""
                c.execute(
                    insert,
                    (key, proxy_requests.host, proxy_requests.port, json.dumps(container_ids), container_interface),
                )
                conn.commit()
            finally:
                conn.close()

    def update_requests(self, authentication, host=None, port=None):
        """Update the stored host and/or port for this session under a file lock.

        An argument left as ``None`` keeps the stored column value (via
        ``COALESCE``) instead of overwriting it with NULL.
        """
        key = authentication.cookie_value
        with FileLock(self.proxy_session_map):
            conn = sqlite.connect(self.proxy_session_map)
            try:
                c = conn.cursor()
                update = """UPDATE gxproxy2
                            SET host = COALESCE(?, host),
                                port = COALESCE(?, port)
                            WHERE key = ?"""
                c.execute(update, (host, port, key))
                conn.commit()
            finally:
                conn.close()

    def fetch_requests(self, authentication):
        """Return the ``ProxyMapping`` stored for this session, or ``None``
        (after logging a warning) when no row matches.
        """
        key = authentication.cookie_value
        with FileLock(self.proxy_session_map):
            conn = sqlite.connect(self.proxy_session_map)
            try:
                c = conn.cursor()
                select = """SELECT host, port, container_ids, container_interface
                            FROM gxproxy2
                            WHERE key=?"""
                c.execute(select, (key,))
                row = c.fetchone()
                # fetchone() yields None when the session has no row.
                if row is None:
                    log.warning("fetch_requests(): invalid key: %s", key)
                    return None
                host, port, container_ids, container_interface = row
                return ProxyMapping(
                    host=host,
                    port=port,
                    container_ids=json.loads(container_ids),
                    container_interface=container_interface,
                )
            finally:
                conn.close()
class RestGolangProxyIpc:
    """IPC backend that registers routes with the Go proxy over its HTTP API."""

    def __init__(self, config):
        """Keep ``config`` and build the API URL from the proxy bind port and
        the golang API key.
        """
        self.config = config
        self.api_url = f"http://127.0.0.1:{self.config.dynamic_proxy_bind_port}/api?api_key={self.config.dynamic_proxy_golang_api_key}"

    def handle_requests(self, authentication, proxy_requests, route_name, container_ids, container_interface, sleep=1):
        """Send a route registration to the Go proxy, retrying on connection errors.

        The request carries a JSON body with ``FrontendPath``, ``BackendAddr``,
        ``AuthorizedCookie`` and ``ContainerIds``; ``container_interface`` is
        not sent.
        NOTE(review): the request is issued with ``requests.get`` although this
        was previously described as a POST - confirm which verb the proxy expects.

        ``sleep`` is the delay in seconds before the first retry; each further
        retry waits one second longer. Once an attempt fails with a delay
        above 5, ``Exception`` is raised, chained to the connection error.
        """
        values = {
            "FrontendPath": route_name,
            "BackendAddr": f"{proxy_requests.host}:{proxy_requests.port}",
            "AuthorizedCookie": authentication.cookie_value,
            "ContainerIds": container_ids,
        }
        payload = json.dumps(values)

        # Sometimes it takes our poor little proxy a second or two to get
        # going, so if this fails, try again after an increased delay.
        delay = sleep
        while True:
            try:
                requests.get(self.api_url, headers={"Content-Type": "application/json"}, data=payload)
                return
            except requests.exceptions.ConnectionError as err:
                log.exception(err)
                if delay > 5:
                    # Report only the time actually slept between attempts.
                    waited = sum(range(sleep, delay))
                    excp = f"Could not contact proxy after {waited} seconds"
                    raise Exception(excp) from err
            time.sleep(delay)
            delay += 1
# Record returned by the file- and SQLite-backed fetch_requests() implementations.
ProxyMapping = namedtuple("ProxyMapping", ["host", "port", "container_ids", "container_interface"])

# TODO: MQ driven proxy?