Source code for galaxy.managers.interactivetool
import logging
import sqlite3
from sqlalchemy import or_
from galaxy import (
exceptions,
model,
)
from galaxy.model.base import transaction
from galaxy.util.filelock import FileLock
log = logging.getLogger(__name__)
DATABASE_TABLE_NAME = "gxitproxy"
[docs]class InteractiveToolSqlite:
[docs] def __init__(self, sqlite_filename, encode_id):
self.sqlite_filename = sqlite_filename
self.encode_id = encode_id
[docs] def get(self, key, key_type):
with FileLock(self.sqlite_filename):
conn = sqlite3.connect(self.sqlite_filename)
try:
c = conn.cursor()
select = f"""SELECT token, host, port, info
FROM {DATABASE_TABLE_NAME}
WHERE key=? and key_type=?"""
c.execute(
select,
(
key,
key_type,
),
)
try:
token, host, port, info = c.fetchone()
except TypeError:
log.warning("get(): invalid key: %s key_type %s", key, key_type)
return None
return dict(key=key, key_type=key_type, token=token, host=host, port=port, info=info)
finally:
conn.close()
[docs] def save(self, key, key_type, token, host, port, info=None):
"""
Writeout a key, key_type, token, value store that is can be used for coordinating
with external resources.
"""
assert key, ValueError("A non-zero length key is required.")
assert key_type, ValueError("A non-zero length key_type is required.")
assert token, ValueError("A non-zero length token is required.")
with FileLock(self.sqlite_filename):
conn = sqlite3.connect(self.sqlite_filename)
try:
c = conn.cursor()
try:
# Create table
c.execute(
"""CREATE TABLE %s
(key text,
key_type text,
token text,
host text,
port integer,
info text,
PRIMARY KEY (key, key_type)
)"""
% (DATABASE_TABLE_NAME)
)
except Exception:
pass
delete = f"""DELETE FROM {DATABASE_TABLE_NAME} WHERE key=? and key_type=?"""
c.execute(
delete,
(
key,
key_type,
),
)
insert = """INSERT INTO %s
(key, key_type, token, host, port, info)
VALUES (?, ?, ?, ?, ?, ?)""" % (
DATABASE_TABLE_NAME
)
c.execute(
insert,
(
key,
key_type,
token,
host,
port,
info,
),
)
conn.commit()
finally:
conn.close()
[docs] def remove(self, **kwd):
"""
Remove entry from a key, key_type, token, value store that is can be used for coordinating
with external resources. Remove entries that match all provided key=values
"""
assert kwd, ValueError("You must provide some values to key upon")
delete = f"DELETE FROM {DATABASE_TABLE_NAME} WHERE"
value_list = []
for i, (key, value) in enumerate(kwd.items()):
if i != 0:
delete += " and"
delete += f" {key}=?"
value_list.append(value)
with FileLock(self.sqlite_filename):
conn = sqlite3.connect(self.sqlite_filename)
try:
c = conn.cursor()
try:
# Delete entry
c.execute(delete, tuple(value_list))
except Exception as e:
log.debug("Error removing entry (%s): %s", delete, e)
conn.commit()
finally:
conn.close()
[docs] def save_entry_point(self, entry_point):
"""Convenience method to easily save an entry_point."""
return self.save(
self.encode_id(entry_point.id),
entry_point.__class__.__name__.lower(),
entry_point.token,
entry_point.host,
entry_point.port,
None,
)
[docs] def remove_entry_point(self, entry_point):
"""Convenience method to easily remove an entry_point."""
return self.remove(key=self.encode_id(entry_point.id), key_type=entry_point.__class__.__name__.lower())
[docs]class InteractiveToolManager:
"""
Manager for dealing with InteractiveTools
"""
[docs] def __init__(self, app):
self.app = app
self.model = app.model
self.security = app.security
self.sa_session = app.model.context
self.job_manager = app.job_manager
self.propagator = InteractiveToolSqlite(app.config.interactivetools_map, app.security.encode_id)
[docs] def create_entry_points(self, job, tool, entry_points=None, flush=True):
entry_points = entry_points or tool.ports
for entry in entry_points:
ep = self.model.InteractiveToolEntryPoint(
job=job,
tool_port=entry["port"],
entry_url=entry["url"],
name=entry["name"],
requires_domain=entry["requires_domain"],
short_token=self.app.config.interactivetools_shorten_url,
)
self.sa_session.add(ep)
if flush:
with transaction(self.sa_session):
self.sa_session.commit()
[docs] def configure_entry_point(self, job, tool_port=None, host=None, port=None, protocol=None):
return self.configure_entry_points(
job, {tool_port: dict(tool_port=tool_port, host=host, port=port, protocol=protocol)}
)
[docs] def configure_entry_points(self, job, ports_dict):
# There can be multiple entry points that reference the same tool port (could have different entry URLs)
configured = []
not_configured = []
for ep in job.interactivetool_entry_points:
port_dict = ports_dict.get(str(ep.tool_port), None)
if port_dict is None:
log.error("Did not find port to assign to InteractiveToolEntryPoint by tool port: %s.", ep.tool_port)
not_configured.append(ep)
else:
ep.host = port_dict["host"]
ep.port = port_dict["port"]
ep.protocol = port_dict["protocol"]
ep.configured = True
self.sa_session.add(ep)
self.save_entry_point(ep)
configured.append(ep)
if configured:
with transaction(self.sa_session):
self.sa_session.commit()
return dict(not_configured=not_configured, configured=configured)
[docs] def save_entry_point(self, entry_point):
"""
Writeout a key, key_type, token, value store that is used validating access
"""
self.propagator.save_entry_point(entry_point)
[docs] def create_interactivetool(self, job, tool, entry_points):
# create from initial job
if job and tool:
self.create_entry_points(job, tool, entry_points)
else:
log.warning(
"Called InteractiveToolManager.create_interactivetool, but job (%s) or tool (%s) is None", job, tool
)
[docs] def get_nonterminal_for_user_by_trans(self, trans):
if trans.user is None and trans.get_galaxy_session() is None:
return []
if trans.user:
jobs = trans.sa_session.query(trans.app.model.Job).filter(trans.app.model.Job.user == trans.user)
else:
jobs = trans.sa_session.query(trans.app.model.Job).filter(
trans.app.model.Job.session_id == trans.get_galaxy_session().id
)
def build_and_apply_filters(query, objects, filter_func):
if objects is not None:
if isinstance(objects, str):
query = query.filter(filter_func(objects))
elif isinstance(objects, list):
t = []
for obj in objects:
t.append(filter_func(obj))
query = query.filter(or_(*t))
return query
jobs = build_and_apply_filters(
jobs, trans.app.model.Job.non_ready_states, lambda s: trans.app.model.Job.state == s
)
return trans.sa_session.query(trans.app.model.InteractiveToolEntryPoint).filter(
trans.app.model.InteractiveToolEntryPoint.job_id.in_([job.id for job in jobs])
)
[docs] def can_access_job(self, trans, job):
if job:
if trans.user is None:
galaxy_session = trans.get_galaxy_session()
if galaxy_session is None or job.session_id != galaxy_session.id:
return False
elif job.user != trans.user:
return False
else:
return False
return True
[docs] def can_access_entry_point(self, trans, entry_point):
if entry_point:
return self.can_access_job(trans, entry_point.job)
return False
[docs] def can_access_entry_points(self, trans, entry_points):
for ep in entry_points:
if not self.can_access_entry_point(trans, ep):
return False
return True
[docs] def stop(self, trans, entry_point):
self.remove_entry_point(entry_point)
job = entry_point.job
if not job.finished:
log.debug("Stopping Job: %s for InteractiveToolEntryPoint: %s", job, entry_point)
job.mark_stopped(trans.app.config.track_jobs_in_database)
# This self.job_manager.stop(job) does nothing without changing job.state, manually or e.g. with .mark_deleted()
self.job_manager.stop(job)
trans.sa_session.add(job)
with transaction(trans.sa_session):
trans.sa_session.commit()
[docs] def remove_entry_points(self, entry_points):
if entry_points:
for entry_point in entry_points:
self.remove_entry_point(entry_point, flush=False)
with transaction(self.sa_session):
self.sa_session.commit()
[docs] def remove_entry_point(self, entry_point, flush=True):
entry_point.deleted = True
self.sa_session.add(entry_point)
if flush:
with transaction(self.sa_session):
self.sa_session.commit()
self.propagator.remove_entry_point(entry_point)
[docs] def target_if_active(self, trans, entry_point):
if entry_point.active and not entry_point.deleted:
request_host = trans.request.host
if not self.app.config.interactivetools_upstream_proxy and self.app.config.interactivetools_proxy_host:
request_host = self.app.config.interactivetools_proxy_host
protocol = trans.request.host_url.split("//", 1)[0]
if entry_point.requires_domain:
rval = f"{protocol}//{self.get_entry_point_subdomain(trans, entry_point)}.{request_host}/"
if entry_point.entry_url:
rval = "{}/{}".format(rval.rstrip("/"), entry_point.entry_url.lstrip("/"))
else:
rval = self.get_entry_point_path(trans, entry_point)
if not self.app.config.interactivetools_upstream_proxy and self.app.config.interactivetools_proxy_host:
rval = f"{protocol}//{request_host}{rval}"
return rval
[docs] def get_entry_point_subdomain(self, trans, entry_point):
entry_point_encoded_id = trans.security.encode_id(entry_point.id)
entry_point_class = entry_point.__class__.__name__.lower()
entry_point_prefix = self.app.config.interactivetools_prefix
entry_point_token = entry_point.token
if self.app.config.interactivetools_shorten_url:
return f"{entry_point_encoded_id}-{entry_point_token[:10]}.{entry_point_prefix}"
return f"{entry_point_encoded_id}-{entry_point_token}.{entry_point_class}.{entry_point_prefix}"
[docs] def get_entry_point_path(self, trans, entry_point):
entry_point_encoded_id = trans.security.encode_id(entry_point.id)
entry_point_class = entry_point.__class__.__name__.lower()
entry_point_prefix = self.app.config.interactivetools_prefix
rval = "/"
if not entry_point.requires_domain:
rval = str(self.app.config.interactivetools_base_path).rstrip("/").lstrip("/")
if self.app.config.interactivetools_shorten_url:
rval = f"/{rval}/{entry_point_prefix}/{entry_point_encoded_id}/{entry_point.token[:10]}/"
else:
rval = f"/{rval}/{entry_point_prefix}/access/{entry_point_class}/{entry_point_encoded_id}/{entry_point.token}/"
if entry_point.entry_url:
rval = f"{rval.rstrip('/')}/{entry_point.entry_url.lstrip('/')}"
rval = "/" + rval.lstrip("/")
return rval
[docs] def access_entry_point_target(self, trans, entry_point_id):
entry_point = trans.sa_session.query(model.InteractiveToolEntryPoint).get(entry_point_id)
if self.app.interactivetool_manager.can_access_entry_point(trans, entry_point):
if entry_point.active:
return self.target_if_active(trans, entry_point)
elif entry_point.deleted:
raise exceptions.MessageException("InteractiveTool has ended. You will have to start a new one.")
else:
raise exceptions.MessageException(
"InteractiveTool is not active. If you recently launched this tool it may not be ready yet, please wait a moment and refresh this page."
)
raise exceptions.ItemAccessibilityException("You do not have access to this InteractiveTool entry point.")