Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.managers.interactivetool
import json
import logging
from urllib.parse import (
urlsplit,
urlunsplit,
)
from sqlalchemy import (
Column,
create_engine,
delete,
insert,
Integer,
MetaData,
or_,
select,
String,
Table,
Text,
)
from galaxy import exceptions
from galaxy.model import (
InteractiveToolEntryPoint,
Job,
)
from galaxy.model.base import transaction
from galaxy.security.idencoding import IdAsLowercaseAlphanumEncodingHelper
log = logging.getLogger(__name__)
gxitproxy = Table(
"gxitproxy",
MetaData(),
Column("key", String(16), primary_key=True),
Column("key_type", Text(), primary_key=True),
Column("token", String(32)),
Column("host", Text()),
Column("port", Integer()),
Column("info", Text()),
)
[docs]class InteractiveToolPropagatorSQLAlchemy:
"""
Propagator for InteractiveToolManager implemented using SQLAlchemy.
"""
[docs] def __init__(self, database_url, encode_id):
"""
Constructor that sets up the propagator using a SQLAlchemy database URL.
:param database_url: SQLAlchemy database URL, read more on the SQLAlchemy documentation
https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls.
:param encode_id: A helper class that can encode ids as lowercase alphanumeric strings and vice versa.
"""
self._engine = create_engine(database_url)
self._encode_id = encode_id
[docs] def save(self, key, key_type, token, host, port, info=None):
"""
Write out a key, key_type, token, value store that is can be used for coordinating with external resources.
"""
assert key, ValueError("A non-zero length key is required.")
assert key_type, ValueError("A non-zero length key_type is required.")
assert token, ValueError("A non-zero length token is required.")
with self._engine.connect() as conn:
# create database table if not exists
gxitproxy.create(conn, checkfirst=True)
# delete existing data with same key
stmt_delete = delete(gxitproxy).where(
gxitproxy.c.key == key,
gxitproxy.c.key_type == key_type,
)
conn.execute(stmt_delete)
# save data
stmt_insert = insert(gxitproxy).values(
key=key,
key_type=key_type,
token=token,
host=host,
port=port,
info=info,
)
conn.execute(stmt_insert)
conn.commit()
[docs] def remove(self, **kwd):
"""
Remove entry from a key, key_type, token, value store that is can be used for coordinating
with external resources. Remove entries that match all provided key=values
"""
assert kwd, ValueError("You must provide some values to key upon")
with self._engine.connect() as conn:
stmt = delete(gxitproxy).where(
*(gxitproxy.c[key] == value for key, value in kwd.items()),
)
conn.execute(stmt)
conn.commit()
[docs] def save_entry_point(self, entry_point):
"""Convenience method to easily save an entry_point."""
return self.save(
self._encode_id(entry_point.id),
entry_point.__class__.__name__.lower(),
entry_point.token,
entry_point.host,
entry_point.port,
json.dumps(
{
"requires_path_in_url": entry_point.requires_path_in_url,
"requires_path_in_header_named": entry_point.requires_path_in_header_named,
}
),
)
[docs] def remove_entry_point(self, entry_point):
"""Convenience method to easily remove an entry_point."""
return self.remove(key=self._encode_id(entry_point.id), key_type=entry_point.__class__.__name__.lower())
[docs]class InteractiveToolManager:
"""
Manager for dealing with InteractiveTools
"""
[docs] def __init__(self, app):
self.app = app
self.model = app.model
self.security = app.security
self.sa_session = app.model.context
self.job_manager = app.job_manager
self.encoder = IdAsLowercaseAlphanumEncodingHelper(app.security)
self.propagator = InteractiveToolPropagatorSQLAlchemy(
app.config.interactivetoolsproxy_map or app.config.interactivetools_map,
self.encoder.encode_id,
)
[docs] def create_entry_points(self, job, tool, entry_points=None, flush=True):
entry_points = entry_points or tool.ports
for entry in entry_points:
ep = self.model.InteractiveToolEntryPoint(
job=job,
tool_port=entry["port"],
entry_url=entry["url"],
name=entry["name"],
label=entry["label"],
requires_domain=entry["requires_domain"],
requires_path_in_url=entry["requires_path_in_url"],
requires_path_in_header_named=entry["requires_path_in_header_named"],
)
self.sa_session.add(ep)
if flush:
with transaction(self.sa_session):
self.sa_session.commit()
[docs] def configure_entry_point(self, job, tool_port=None, host=None, port=None, protocol=None):
return self.configure_entry_points(
job, {tool_port: dict(tool_port=tool_port, host=host, port=port, protocol=protocol)}
)
[docs] def configure_entry_points(self, job, ports_dict):
# There can be multiple entry points that reference the same tool port (could have different entry URLs)
configured = []
not_configured = []
for ep in job.interactivetool_entry_points:
port_dict = ports_dict.get(str(ep.tool_port), None)
if port_dict is None:
log.error("Did not find port to assign to InteractiveToolEntryPoint by tool port: %s.", ep.tool_port)
not_configured.append(ep)
else:
ep.host = port_dict["host"]
ep.port = port_dict["port"]
ep.protocol = port_dict["protocol"]
ep.configured = True
self.sa_session.add(ep)
self.save_entry_point(ep)
configured.append(ep)
if configured:
with transaction(self.sa_session):
self.sa_session.commit()
return dict(not_configured=not_configured, configured=configured)
[docs] def save_entry_point(self, entry_point):
"""
Writeout a key, key_type, token, value store that is used validating access
"""
self.propagator.save_entry_point(entry_point)
[docs] def create_interactivetool(self, job, tool, entry_points):
# create from initial job
if job and tool:
self.create_entry_points(job, tool, entry_points)
else:
log.warning(
"Called InteractiveToolManager.create_interactivetool, but job (%s) or tool (%s) is None", job, tool
)
[docs] def get_nonterminal_for_user_by_trans(self, trans):
if trans.user is None and trans.get_galaxy_session() is None:
return []
def build_subquery():
if trans.user:
stmt = select(Job.id).where(Job.user == trans.user)
else:
stmt = select(Job.id).where(Job.session_id == trans.get_galaxy_session().id)
filters = []
for state in Job.non_ready_states:
filters.append(Job.state == state)
stmt = stmt.where(or_(*filters))
return stmt.subquery()
stmt = select(InteractiveToolEntryPoint).where(InteractiveToolEntryPoint.job_id.in_(build_subquery()))
return trans.sa_session.scalars(stmt)
[docs] def can_access_job(self, trans, job):
if job:
if trans.user is None:
galaxy_session = trans.get_galaxy_session()
if galaxy_session is None or job.session_id != galaxy_session.id:
return False
elif job.user != trans.user:
return False
else:
return False
return True
[docs] def can_access_entry_point(self, trans, entry_point):
if entry_point:
return self.can_access_job(trans, entry_point.job)
return False
[docs] def can_access_entry_points(self, trans, entry_points):
for ep in entry_points:
if not self.can_access_entry_point(trans, ep):
return False
return True
[docs] def stop(self, trans, entry_point):
self.remove_entry_point(entry_point)
job = entry_point.job
if not job.finished:
log.debug("Stopping Job: %s for InteractiveToolEntryPoint: %s", job, entry_point)
job.mark_stopped(trans.app.config.track_jobs_in_database)
# This self.job_manager.stop(job) does nothing without changing job.state, manually or e.g. with .mark_deleted()
self.job_manager.stop(job)
trans.sa_session.add(job)
with transaction(trans.sa_session):
trans.sa_session.commit()
[docs] def remove_entry_points(self, entry_points):
if entry_points:
for entry_point in entry_points:
self.remove_entry_point(entry_point, flush=False)
with transaction(self.sa_session):
self.sa_session.commit()
[docs] def remove_entry_point(self, entry_point, flush=True):
entry_point.deleted = True
self.sa_session.add(entry_point)
if flush:
with transaction(self.sa_session):
self.sa_session.commit()
self.propagator.remove_entry_point(entry_point)
[docs] def target_if_active(self, trans, entry_point):
if entry_point.active and not entry_point.deleted:
use_it_proxy_host_cfg = (
not self.app.config.interactivetools_upstream_proxy and self.app.config.interactivetools_proxy_host
)
url_parts = urlsplit(trans.request.host_url)
url_host = self.app.config.interactivetools_proxy_host if use_it_proxy_host_cfg else trans.request.host
url_path = url_parts.path
if entry_point.requires_domain:
url_host = f"{self.get_entry_point_subdomain(trans, entry_point)}.{url_host}"
if entry_point.entry_url:
url_path = f"{url_path.rstrip('/')}/{entry_point.entry_url.lstrip('/')}"
else:
url_path = self.get_entry_point_path(trans, entry_point)
if not use_it_proxy_host_cfg:
return url_path
return urlunsplit((url_parts.scheme, url_host, url_path, "", ""))
def _get_entry_point_url_elements(self, trans, entry_point):
encoder = IdAsLowercaseAlphanumEncodingHelper(trans.security)
ep_encoded_id = encoder.encode_id(entry_point.id)
ep_class_id = entry_point.class_id
ep_prefix = self.app.config.interactivetools_prefix
ep_token = entry_point.token
return ep_encoded_id, ep_class_id, ep_prefix, ep_token
[docs] def get_entry_point_subdomain(self, trans, entry_point):
ep_encoded_id, ep_class_id, ep_prefix, ep_token = self._get_entry_point_url_elements(trans, entry_point)
return f"{ep_encoded_id}-{ep_token}.{ep_class_id}.{ep_prefix}"
[docs] def get_entry_point_path(self, trans, entry_point):
url_path = "/"
if not entry_point.requires_domain:
ep_encoded_id, ep_class_id, ep_prefix, ep_token = self._get_entry_point_url_elements(trans, entry_point)
path_parts = [
part.strip("/")
for part in (
str(self.app.config.interactivetools_base_path),
ep_prefix,
ep_class_id,
ep_encoded_id,
ep_token,
)
]
url_path += "/".join(part for part in path_parts if part) + "/"
if entry_point.entry_url:
url_path += entry_point.entry_url.lstrip("/")
return url_path
[docs] def access_entry_point_target(self, trans, entry_point_id):
entry_point = trans.sa_session.get(InteractiveToolEntryPoint, entry_point_id)
if self.app.interactivetool_manager.can_access_entry_point(trans, entry_point):
if entry_point.active:
return self.target_if_active(trans, entry_point)
elif entry_point.deleted:
raise exceptions.MessageException("InteractiveTool has ended. You will have to start a new one.")
else:
raise exceptions.MessageException(
"InteractiveTool is not active. If you recently launched this tool it may not be ready yet, please wait a moment and refresh this page."
)
raise exceptions.ItemAccessibilityException("You do not have access to this InteractiveTool entry point.")