Source code for galaxy.web.proxy

import json
import logging
import os
import time
from collections import namedtuple

from galaxy.util import (
    requests,
    sockets,
    sqlite,
    unique_id,
)
from galaxy.util.filelock import FileLock
from galaxy.util.lazy_process import (
    LazyProcess,
    NoOpLazyProcess,
)
from galaxy.web.framework import url_for

log = logging.getLogger(__name__)


DEFAULT_PROXY_TO_HOST = "localhost"
SECURE_COOKIE = "galaxysession"
# Randomly generate a password every launch


[docs]class ProxyManager: valid_update_keys = ( "host", "port", )
[docs] def __init__(self, config): for option in [ "manage_dynamic_proxy", "dynamic_proxy_bind_port", "dynamic_proxy_bind_ip", "dynamic_proxy_debug", "dynamic_proxy_external_proxy", "dynamic_proxy_prefix", "proxy_session_map", "dynamic_proxy", "cookie_path", "dynamic_proxy_golang_noaccess", "dynamic_proxy_golang_clean_interval", "dynamic_proxy_golang_docker_address", "dynamic_proxy_golang_api_key", ]: setattr(self, option, getattr(config, option)) if self.manage_dynamic_proxy: self.lazy_process = self.__setup_lazy_process(config) else: self.lazy_process = NoOpLazyProcess() if self.dynamic_proxy_golang_api_key is None: self.dynamic_proxy_golang_api_key = unique_id() self.proxy_ipc = proxy_ipc(config)
[docs] def shutdown(self): self.lazy_process.shutdown()
[docs] def setup_proxy( self, trans, host=DEFAULT_PROXY_TO_HOST, port=None, proxy_prefix="", route_name="", container_ids=None, container_interface=None, ): if self.manage_dynamic_proxy: log.info("Attempting to start dynamic proxy process") log.debug(f"Cmd: {' '.join(self.lazy_process.command_and_args)}") self.lazy_process.start_process() if container_ids is None: container_ids = [] authentication = AuthenticationToken(trans) proxy_requests = ProxyRequests(host=host, port=port) self.proxy_ipc.handle_requests( authentication, proxy_requests, f"/{route_name}", container_ids, container_interface, ) # TODO: These shouldn't need to be request.host and request.scheme - # though they are reasonable defaults. host = trans.request.host if ":" in host: host = host[0 : host.index(":")] scheme = trans.request.scheme if not self.dynamic_proxy_external_proxy: proxy_url = "%s://%s:%d" % (scheme, host, self.dynamic_proxy_bind_port) else: proxy_url = f"{scheme}://{host}{proxy_prefix}" return { "proxy_url": proxy_url, "proxied_port": proxy_requests.port, "proxied_host": proxy_requests.host, }
[docs] def update_proxy(self, trans, **kwargs): authentication = AuthenticationToken(trans) for k in kwargs.keys(): if k not in self.valid_update_keys: raise Exception(f"Invalid proxy request update key: {k}") return self.proxy_ipc.update_requests(authentication, **kwargs)
[docs] def query_proxy(self, trans): authentication = AuthenticationToken(trans) return self.proxy_ipc.fetch_requests(authentication)
def __setup_lazy_process(self, config): launcher = self.proxy_launcher() command = launcher.launch_proxy_command(config) return LazyProcess(command)
[docs] def proxy_launcher(self): if self.dynamic_proxy == "node": return NodeProxyLauncher() elif self.dynamic_proxy == "golang": return GolangProxyLauncher() else: raise Exception("Unknown proxy type")
[docs]class ProxyLauncher:
[docs] def launch_proxy_command(self, config): raise NotImplementedError()
[docs]class NodeProxyLauncher:
[docs] def launch_proxy_command(self, config): args = [ "--sessions", config.proxy_session_map, "--ip", config.dynamic_proxy_bind_ip, "--port", str(config.dynamic_proxy_bind_port), "--reverseProxy", ] if config.dynamic_proxy_debug: args.append("--verbose") parent_directory = os.path.dirname(__file__) path_to_application = os.path.join(parent_directory, "js", "lib", "main.js") command = [path_to_application] + args return command
[docs]class GolangProxyLauncher:
[docs] def launch_proxy_command(self, config): args = [ "gxproxy", # Must be on path. TODO: wheel? "--listenAddr", "%s:%d" % ( config.dynamic_proxy_bind_ip, config.dynamic_proxy_bind_port, ), "--listenPath", "/".join(((config.cookie_path or url_for("/")), config.dynamic_proxy_prefix)), "--cookieName", "galaxysession", "--storage", config.proxy_session_map.replace(".sqlite", ".xml"), # just in case. "--apiKey", config.dynamic_proxy_golang_api_key, "--noAccess", config.dynamic_proxy_golang_noaccess, "--cleanInterval", config.dynamic_proxy_golang_clean_interval, "--dockerAddr", config.dynamic_proxy_golang_docker_address, ] if config.dynamic_proxy_debug: args.append("--verbose") return args
[docs]class AuthenticationToken:
[docs] def __init__(self, trans): self.cookie_name = SECURE_COOKIE self.cookie_value = trans.get_cookie(self.cookie_name)
[docs]class ProxyRequests:
[docs] def __init__(self, host=None, port=None): if host is None: host = DEFAULT_PROXY_TO_HOST if port is None: port = sockets.unused_port() log.info("Obtained unused port %d" % port) self.host = host self.port = port
[docs]def proxy_ipc(config): proxy_session_map = config.proxy_session_map if config.dynamic_proxy == "node": if proxy_session_map.endswith(".sqlite"): return SqliteProxyIpc(proxy_session_map) else: return JsonFileProxyIpc(proxy_session_map) elif config.dynamic_proxy == "golang": return RestGolangProxyIpc(config)
[docs]class ProxyIpc:
[docs] def handle_requests(self, authentication, proxy_requests, route_name, container_ids, container_interface): raise NotImplementedError()
[docs] def fetch_requests(self, authentication, key): raise NotImplementedError()
[docs]class JsonFileProxyIpc:
[docs] def __init__(self, proxy_session_map): self.proxy_session_map = proxy_session_map
[docs] def handle_requests(self, authentication, proxy_requests, route_name, container_ids, container_interface): key = authentication.cookie_value with FileLock(self.proxy_session_map): if not os.path.exists(self.proxy_session_map): open(self.proxy_session_map, "w").write("{}") json_data = open(self.proxy_session_map).read() session_map = json.loads(json_data) session_map[key] = { "host": proxy_requests.host, "port": proxy_requests.port, "container_ids": container_ids, "container_interface": container_interface, } new_json_data = json.dumps(session_map) open(self.proxy_session_map, "w").write(new_json_data)
[docs] def update_requests(self, authentication, host=None, port=None): key = authentication.cookie_value with FileLock(self.proxy_session_map): session_map = json.load(open(self.proxy_session_map)) session_map[key]["host"] = host session_map[key]["port"] = port new_json_data = json.dumps(session_map) open(self.proxy_session_map, "w").write(new_json_data)
[docs] def fetch_requests(self, authentication): key = authentication.cookie_value try: with open(self.proxy_session_map) as fh: session_map = json.load(fh) m = session_map[key] return ProxyMapping( host=m["host"], port=m["port"], container_ids=m["container_ids"], container_interface=m["container_interface"], ) except (TypeError, KeyError): log.warning("fetch_requests(): invalid key: %s", key) return None
[docs]class SqliteProxyIpc:
[docs] def __init__(self, proxy_session_map): self.proxy_session_map = proxy_session_map
[docs] def handle_requests(self, authentication, proxy_requests, route_name, container_ids, container_interface): key = authentication.cookie_value with FileLock(self.proxy_session_map): conn = sqlite.connect(self.proxy_session_map) try: c = conn.cursor() try: # Create table c.execute( """CREATE TABLE gxproxy2 (key text PRIMARY KEY, host text, port integer, container_ids text, container_interface text)""" ) except Exception: pass delete = """DELETE FROM gxproxy2 WHERE key=?""" c.execute(delete, (key,)) insert = """INSERT INTO gxproxy2 (key, host, port, container_ids, container_interface) VALUES (?, ?, ?, ?, ?)""" c.execute( insert, (key, proxy_requests.host, proxy_requests.port, json.dumps(container_ids), container_interface), ) conn.commit() finally: conn.close()
[docs] def update_requests(self, authentication, host=None, port=None): key = authentication.cookie_value with FileLock(self.proxy_session_map): conn = sqlite.connect(self.proxy_session_map) try: c = conn.cursor() update = """UPDATE gxproxy2 SET host = ?, port = ? WHERE key = ?""" c.execute(update, (host, port, key)) conn.commit() finally: conn.close()
[docs] def fetch_requests(self, authentication): key = authentication.cookie_value with FileLock(self.proxy_session_map): conn = sqlite.connect(self.proxy_session_map) try: c = conn.cursor() select = """SELECT host, port, container_ids, container_interface FROM gxproxy2 WHERE key=?""" c.execute(select, (key,)) try: host, port, container_ids, container_interface = c.fetchone() except TypeError: log.warning("fetch_requests(): invalid key: %s", key) return None return ProxyMapping( host=host, port=port, container_ids=json.loads(container_ids), container_interface=container_interface, ) finally: conn.close()
[docs]class RestGolangProxyIpc:
[docs] def __init__(self, config): self.config = config self.api_url = f"http://127.0.0.1:{self.config.dynamic_proxy_bind_port}/api?api_key={self.config.dynamic_proxy_golang_api_key}"
[docs] def handle_requests(self, authentication, proxy_requests, route_name, container_ids, container_interface, sleep=1): """Make a POST request to the GO proxy to register a route""" values = { "FrontendPath": route_name, "BackendAddr": f"{proxy_requests.host}:{proxy_requests.port}", "AuthorizedCookie": authentication.cookie_value, "ContainerIds": container_ids, } # Sometimes it takes our poor little proxy a second or two to get # going, so if this fails, re-call ourselves with an increased timeout. try: requests.get(self.api_url, headers={"Content-Type": "application/json"}, data=json.dumps(values)) except requests.exceptions.ConnectionError as err: log.exception(err) if sleep > 5: excp = f"Could not contact proxy after {sum(range(sleep + 1))} seconds" raise Exception(excp) time.sleep(sleep) self.handle_requests( authentication, proxy_requests, route_name, container_ids, container_interface, sleep=sleep + 1 )
ProxyMapping = namedtuple("ProxyMapping", ["host", "port", "container_ids", "container_interface"]) # TODO: MQ diven proxy?