Source code for galaxy.managers.interactivetool
import json
import logging
import sqlite3
from urllib.parse import (
urlsplit,
urlunsplit,
)
from sqlalchemy import (
or_,
select,
)
from galaxy import exceptions
from galaxy.model import (
InteractiveToolEntryPoint,
Job,
)
from galaxy.model.base import transaction
from galaxy.security.idencoding import IdAsLowercaseAlphanumEncodingHelper
from galaxy.util.filelock import FileLock
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Name of the sqlite table that InteractiveToolSqlite creates, queries and deletes from.
DATABASE_TABLE_NAME = "gxitproxy"
class InteractiveToolSqlite:
    """
    Sqlite-backed store mapping (key, key_type) to a token, host, port and
    free-form info string, used for coordinating with external resources.

    Every access takes a ``FileLock`` on the sqlite file and opens a fresh
    connection that is closed again before the method returns.
    """

    def __init__(self, sqlite_filename, encode_id):
        """
        :param sqlite_filename: path of the sqlite database file.
        :param encode_id: callable used to turn an entry point id into the
                          ``key`` stored in the table.
        """
        self.sqlite_filename = sqlite_filename
        self.encode_id = encode_id

    def get(self, key, key_type):
        """
        Look up the row stored for ``key`` / ``key_type``.

        :returns: a dict with ``key``, ``key_type``, ``token``, ``host``,
                  ``port`` and ``info``, or ``None`` (after logging a warning)
                  when no matching row exists.
        """
        with FileLock(self.sqlite_filename):
            conn = sqlite3.connect(self.sqlite_filename)
            try:
                c = conn.cursor()
                # Named ``query`` so the module-level sqlalchemy ``select`` is not shadowed.
                query = f"""SELECT token, host, port, info
                            FROM {DATABASE_TABLE_NAME}
                            WHERE key=? and key_type=?"""
                c.execute(query, (key, key_type))
                row = c.fetchone()
                if row is None:
                    log.warning("get(): invalid key: %s key_type %s", key, key_type)
                    return None
                token, host, port, info = row
                return dict(key=key, key_type=key_type, token=token, host=host, port=port, info=info)
            finally:
                conn.close()

    def save(self, key, key_type, token, host, port, info=None):
        """
        Write out a key, key_type, token, value store that can be used for coordinating
        with external resources.

        Any existing row for ``(key, key_type)`` is replaced. The table is
        created on first use.

        :raises AssertionError: if ``key``, ``key_type`` or ``token`` is empty.
        """
        # NOTE: these asserts are stripped when Python runs with -O; they are kept
        # as asserts so the exception type seen by existing callers does not change.
        assert key, ValueError("A non-zero length key is required.")
        assert key_type, ValueError("A non-zero length key_type is required.")
        assert token, ValueError("A non-zero length token is required.")
        with FileLock(self.sqlite_filename):
            conn = sqlite3.connect(self.sqlite_filename)
            try:
                c = conn.cursor()
                # IF NOT EXISTS makes the "table already there" case a no-op while
                # letting genuine failures (unwritable file, corrupt database, ...)
                # surface instead of being silently swallowed.
                c.execute(
                    f"""CREATE TABLE IF NOT EXISTS {DATABASE_TABLE_NAME}
                        (key text,
                         key_type text,
                         token text,
                         host text,
                         port integer,
                         info text,
                         PRIMARY KEY (key, key_type)
                        )"""
                )
                # Delete-then-insert replaces any previous row for this key pair.
                delete = f"""DELETE FROM {DATABASE_TABLE_NAME} WHERE key=? and key_type=?"""
                c.execute(delete, (key, key_type))
                insert = f"""INSERT INTO {DATABASE_TABLE_NAME}
                             (key, key_type, token, host, port, info)
                             VALUES (?, ?, ?, ?, ?, ?)"""
                c.execute(insert, (key, key_type, token, host, port, info))
                conn.commit()
            finally:
                conn.close()

    def remove(self, **kwd):
        """
        Remove entries from the key, key_type, token, value store that can be used for
        coordinating with external resources. Removes entries that match all provided key=values.

        Keyword names are used verbatim as column names, so they must come from
        trusted code; the values are bound as query parameters. Errors raised
        while executing the DELETE are logged at debug level and not re-raised.

        :raises AssertionError: if no keyword arguments are given.
        """
        assert kwd, ValueError("You must provide some values to key upon")
        conditions = " and ".join(f"{column}=?" for column in kwd)
        delete = f"DELETE FROM {DATABASE_TABLE_NAME} WHERE {conditions}"
        values = tuple(kwd.values())
        with FileLock(self.sqlite_filename):
            conn = sqlite3.connect(self.sqlite_filename)
            try:
                c = conn.cursor()
                try:
                    # Delete entry
                    c.execute(delete, values)
                except Exception as e:
                    # Best effort: e.g. the table may not have been created yet.
                    log.debug("Error removing entry (%s): %s", delete, e)
                conn.commit()
            finally:
                conn.close()

    def save_entry_point(self, entry_point):
        """
        Convenience method to easily save an entry_point.

        The key is the encoded entry point id, the key_type is the lowercased
        class name, and ``info`` is a JSON object carrying the entry point's
        ``requires_path_in_url`` and ``requires_path_in_header_named`` values.
        """
        info = json.dumps(
            {
                "requires_path_in_url": entry_point.requires_path_in_url,
                "requires_path_in_header_named": entry_point.requires_path_in_header_named,
            }
        )
        return self.save(
            self.encode_id(entry_point.id),
            entry_point.__class__.__name__.lower(),
            entry_point.token,
            entry_point.host,
            entry_point.port,
            info,
        )

    def remove_entry_point(self, entry_point):
        """Convenience method to easily remove an entry_point."""
        return self.remove(key=self.encode_id(entry_point.id), key_type=entry_point.__class__.__name__.lower())
class InteractiveToolManager:
    """
    Manager for dealing with InteractiveTools
    """

    def __init__(self, app):
        """
        Keep references to the app's model, security helper, SQLAlchemy
        session and job manager, and build the sqlite propagator from
        ``app.config.interactivetools_map``.
        """
        self.app = app
        self.model = app.model
        self.security = app.security
        self.sa_session = app.model.context
        self.job_manager = app.job_manager
        self.encoder = IdAsLowercaseAlphanumEncodingHelper(app.security)
        self.propagator = InteractiveToolSqlite(app.config.interactivetools_map, self.encoder.encode_id)

    def create_entry_points(self, job, tool, entry_points=None, flush=True):
        """
        Add one InteractiveToolEntryPoint to the session per entry description.

        :param entry_points: iterable of dicts with the keys ``port``, ``url``,
                             ``name``, ``label``, ``requires_domain``,
                             ``requires_path_in_url`` and
                             ``requires_path_in_header_named``; falls back to
                             ``tool.ports`` when empty or ``None``.
        :param flush: commit the session once all entry points were added.
        """
        entry_points = entry_points or tool.ports
        for entry in entry_points:
            ep = self.model.InteractiveToolEntryPoint(
                job=job,
                tool_port=entry["port"],
                entry_url=entry["url"],
                name=entry["name"],
                label=entry["label"],
                requires_domain=entry["requires_domain"],
                requires_path_in_url=entry["requires_path_in_url"],
                requires_path_in_header_named=entry["requires_path_in_header_named"],
            )
            self.sa_session.add(ep)
        if flush:
            with transaction(self.sa_session):
                self.sa_session.commit()

    def configure_entry_point(self, job, tool_port=None, host=None, port=None, protocol=None):
        """
        Configure the entry points of ``job`` that use ``tool_port``.

        Thin wrapper that builds a single-entry ports dict and delegates to
        :meth:`configure_entry_points`; returns its result.
        """
        return self.configure_entry_points(
            job, {tool_port: dict(tool_port=tool_port, host=host, port=port, protocol=protocol)}
        )

    def configure_entry_points(self, job, ports_dict):
        """
        Assign host, port and protocol to the entry points of ``job``.

        :param ports_dict: mapping of tool port to a dict with the keys
                           ``host``, ``port`` and ``protocol``. Keys are
                           compared by their string form, so integer and
                           string ports are treated alike.
        :returns: dict with the lists ``configured`` and ``not_configured``.
        """
        # Entry points are looked up by str(ep.tool_port); normalise the incoming keys the
        # same way so that e.g. an integer tool_port matches. A None key (no tool port
        # given) is dropped so it keeps matching nothing, exactly as before.
        ports_by_str = {str(tool_port): value for tool_port, value in ports_dict.items() if tool_port is not None}
        # There can be multiple entry points that reference the same tool port (could have different entry URLs)
        configured = []
        not_configured = []
        for ep in job.interactivetool_entry_points:
            port_dict = ports_by_str.get(str(ep.tool_port), None)
            if port_dict is None:
                log.error("Did not find port to assign to InteractiveToolEntryPoint by tool port: %s.", ep.tool_port)
                not_configured.append(ep)
            else:
                ep.host = port_dict["host"]
                ep.port = port_dict["port"]
                ep.protocol = port_dict["protocol"]
                ep.configured = True
                self.sa_session.add(ep)
                self.save_entry_point(ep)
                configured.append(ep)
        if configured:
            with transaction(self.sa_session):
                self.sa_session.commit()
        return dict(not_configured=not_configured, configured=configured)

    def save_entry_point(self, entry_point):
        """
        Write out a key, key_type, token, value store that is used for validating access
        """
        self.propagator.save_entry_point(entry_point)

    def create_interactivetool(self, job, tool, entry_points):
        """
        Create the entry points for the initial job; logs a warning and does
        nothing when ``job`` or ``tool`` is falsy.
        """
        # create from initial job
        if job and tool:
            self.create_entry_points(job, tool, entry_points)
        else:
            log.warning(
                "Called InteractiveToolManager.create_interactivetool, but job (%s) or tool (%s) is None", job, tool
            )

    def get_nonterminal_for_user_by_trans(self, trans):
        """
        Return the entry points whose job belongs to the current user (or, for
        anonymous access, the current galaxy session) and is in one of
        ``Job.non_ready_states``.

        :returns: an empty list when there is neither a user nor a session,
                  otherwise the result of ``trans.sa_session.scalars``.
        """
        if trans.user is None and trans.get_galaxy_session() is None:
            return []

        def build_subquery():
            # Restrict to jobs owned by the user, or by the session when anonymous.
            if trans.user:
                stmt = select(Job.id).where(Job.user == trans.user)
            else:
                stmt = select(Job.id).where(Job.session_id == trans.get_galaxy_session().id)
            state_filters = [Job.state == state for state in Job.non_ready_states]
            stmt = stmt.where(or_(*state_filters))
            return stmt.subquery()

        stmt = select(InteractiveToolEntryPoint).where(InteractiveToolEntryPoint.job_id.in_(build_subquery()))
        return trans.sa_session.scalars(stmt)

    def can_access_job(self, trans, job):
        """
        Return True when ``job`` belongs to ``trans.user`` or, for anonymous
        requests, to the current galaxy session; False otherwise (including
        when ``job`` is falsy).
        """
        if not job:
            return False
        if trans.user is None:
            # Anonymous: ownership is decided by the galaxy session.
            galaxy_session = trans.get_galaxy_session()
            if galaxy_session is None or job.session_id != galaxy_session.id:
                return False
            return True
        if job.user != trans.user:
            return False
        return True

    def can_access_entry_point(self, trans, entry_point):
        """Return True when ``entry_point`` is truthy and its job is accessible to ``trans``."""
        if entry_point:
            return self.can_access_job(trans, entry_point.job)
        return False

    def can_access_entry_points(self, trans, entry_points):
        """Return True only if every entry point is accessible (True for an empty iterable)."""
        for ep in entry_points:
            if not self.can_access_entry_point(trans, ep):
                return False
        return True

    def stop(self, trans, entry_point):
        """
        Mark ``entry_point`` as deleted and, if its job has not finished yet,
        mark the job stopped, ask the job manager to stop it and commit.
        """
        self.remove_entry_point(entry_point)
        job = entry_point.job
        if not job.finished:
            log.debug("Stopping Job: %s for InteractiveToolEntryPoint: %s", job, entry_point)
            job.mark_stopped(trans.app.config.track_jobs_in_database)
            # This self.job_manager.stop(job) does nothing without changing job.state, manually or e.g. with .mark_deleted()
            self.job_manager.stop(job)
            trans.sa_session.add(job)
            with transaction(trans.sa_session):
                trans.sa_session.commit()

    def remove_entry_points(self, entry_points):
        """Remove several entry points, committing once after all were processed."""
        if entry_points:
            for entry_point in entry_points:
                self.remove_entry_point(entry_point, flush=False)
            with transaction(self.sa_session):
                self.sa_session.commit()

    def remove_entry_point(self, entry_point, flush=True):
        """
        Flag ``entry_point`` as deleted, optionally commit, and remove it from
        the sqlite propagator.
        """
        entry_point.deleted = True
        self.sa_session.add(entry_point)
        if flush:
            with transaction(self.sa_session):
                self.sa_session.commit()
        self.propagator.remove_entry_point(entry_point)

    def target_if_active(self, trans, entry_point):
        """
        Build the target of ``entry_point`` if it is active and not deleted.

        :returns: ``None`` for an inactive or deleted entry point; a bare URL
                  path for path-based entry points when the configured
                  ``interactivetools_proxy_host`` is not in use; a full URL
                  otherwise.
        """
        if entry_point.active and not entry_point.deleted:
            # Only use the configured proxy host when there is no upstream proxy in front.
            use_it_proxy_host_cfg = (
                not self.app.config.interactivetools_upstream_proxy and self.app.config.interactivetools_proxy_host
            )

            url_parts = urlsplit(trans.request.host_url)
            url_host = self.app.config.interactivetools_proxy_host if use_it_proxy_host_cfg else trans.request.host
            url_path = url_parts.path

            if entry_point.requires_domain:
                url_host = f"{self.get_entry_point_subdomain(trans, entry_point)}.{url_host}"
                if entry_point.entry_url:
                    url_path = f"{url_path.rstrip('/')}/{entry_point.entry_url.lstrip('/')}"
            else:
                url_path = self.get_entry_point_path(trans, entry_point)
                if not use_it_proxy_host_cfg:
                    return url_path

            return urlunsplit((url_parts.scheme, url_host, url_path, "", ""))
        return None

    def _get_entry_point_url_elements(self, trans, entry_point):
        """Return ``(encoded id, class id, configured prefix, token)`` for ``entry_point``."""
        encoder = IdAsLowercaseAlphanumEncodingHelper(trans.security)
        ep_encoded_id = encoder.encode_id(entry_point.id)
        ep_class_id = entry_point.class_id
        ep_prefix = self.app.config.interactivetools_prefix
        ep_token = entry_point.token
        return ep_encoded_id, ep_class_id, ep_prefix, ep_token

    def get_entry_point_subdomain(self, trans, entry_point):
        """Return the subdomain ``<encoded id>-<token>.<class id>.<prefix>`` for ``entry_point``."""
        ep_encoded_id, ep_class_id, ep_prefix, ep_token = self._get_entry_point_url_elements(trans, entry_point)
        return f"{ep_encoded_id}-{ep_token}.{ep_class_id}.{ep_prefix}"

    def get_entry_point_path(self, trans, entry_point):
        """
        Return the URL path for ``entry_point``.

        For path-based entry points (``requires_domain`` false) the path is
        built from the configured base path, the prefix, the class id, the
        encoded id and the token, skipping empty parts. The entry URL, if any,
        is appended in both cases.
        """
        url_path = "/"
        if not entry_point.requires_domain:
            ep_encoded_id, ep_class_id, ep_prefix, ep_token = self._get_entry_point_url_elements(trans, entry_point)
            path_parts = [
                part.strip("/")
                for part in (
                    str(self.app.config.interactivetools_base_path),
                    ep_prefix,
                    ep_class_id,
                    ep_encoded_id,
                    ep_token,
                )
            ]
            url_path += "/".join(part for part in path_parts if part) + "/"
        if entry_point.entry_url:
            url_path += entry_point.entry_url.lstrip("/")
        return url_path

    def access_entry_point_target(self, trans, entry_point_id):
        """
        Return the target for the entry point with ``entry_point_id``.

        :raises exceptions.MessageException: if the entry point is accessible
            but deleted, or not active yet.
        :raises exceptions.ItemAccessibilityException: if the entry point is
            not accessible to ``trans``.
        """
        entry_point = trans.sa_session.get(InteractiveToolEntryPoint, entry_point_id)
        if self.app.interactivetool_manager.can_access_entry_point(trans, entry_point):
            if entry_point.active:
                return self.target_if_active(trans, entry_point)
            elif entry_point.deleted:
                raise exceptions.MessageException("InteractiveTool has ended. You will have to start a new one.")
            else:
                raise exceptions.MessageException(
                    "InteractiveTool is not active. If you recently launched this tool it may not be ready yet, please wait a moment and refresh this page."
                )
        raise exceptions.ItemAccessibilityException("You do not have access to this InteractiveTool entry point.")