Source code for galaxy.managers.interactivetool

import logging
import sqlite3

from sqlalchemy import or_

from galaxy import (
from galaxy.model.base import transaction
from galaxy.util.filelock import FileLock

log = logging.getLogger(__name__)


class InteractiveToolSqlite:
    """SQLite-backed store of ``(key, key_type) -> token, host, port, info`` rows.

    Every operation takes a ``FileLock`` on the database path, opens its own
    connection, and closes that connection before returning.
    """

    def __init__(self, sqlite_filename, encode_id):
        """
        :param sqlite_filename: path of the SQLite database file.
        :param encode_id: callable used to build the ``key`` column value for an
            entry point.
        """
        self.sqlite_filename = sqlite_filename
        self.encode_id = encode_id

    def get(self, key, key_type):
        """Look up the row stored for ``(key, key_type)``.

        :returns: a dict with ``key``, ``key_type``, ``token``, ``host``, ``port``
            and ``info``; ``None`` (after logging a warning) when no row matches.
        """
        select = f"SELECT token, host, port, info FROM {DATABASE_TABLE_NAME} WHERE key=? and key_type=?"
        with FileLock(self.sqlite_filename):
            conn = sqlite3.connect(self.sqlite_filename)
            try:
                row = conn.cursor().execute(select, (key, key_type)).fetchone()
            finally:
                conn.close()
        if row is None:
            log.warning("get(): invalid key: %s key_type %s", key, key_type)
            return None
        token, host, port, info = row
        return dict(key=key, key_type=key_type, token=token, host=host, port=port, info=info)

    def save(self, key, key_type, token, host, port, info=None):
        """Write (or overwrite) the row for ``(key, key_type)``.

        The table is created on first use. Any existing row with the same
        ``(key, key_type)`` is deleted before the new one is inserted.

        :raises ValueError: if ``key``, ``key_type`` or ``token`` is empty.
        """
        # Explicit raises instead of ``assert``: asserts vanish under ``python -O``
        # and surface as AssertionError rather than the ValueError being built.
        if not key:
            raise ValueError("A non-zero length key is required.")
        if not key_type:
            raise ValueError("A non-zero length key_type is required.")
        if not token:
            raise ValueError("A non-zero length token is required.")

        create = (
            f"CREATE TABLE IF NOT EXISTS {DATABASE_TABLE_NAME} "
            "(key text, key_type text, token text, host text, port integer, info text, "
            "PRIMARY KEY (key, key_type))"
        )
        delete = f"DELETE FROM {DATABASE_TABLE_NAME} WHERE key=? and key_type=?"
        insert = (
            f"INSERT INTO {DATABASE_TABLE_NAME} (key, key_type, token, host, port, info) "
            "VALUES (?, ?, ?, ?, ?, ?)"
        )
        with FileLock(self.sqlite_filename):
            conn = sqlite3.connect(self.sqlite_filename)
            try:
                c = conn.cursor()
                # IF NOT EXISTS tolerates the "already created" case without
                # swallowing unrelated database errors.
                c.execute(create)
                c.execute(delete, (key, key_type))
                c.execute(insert, (key, key_type, token, host, port, info))
                conn.commit()
            finally:
                conn.close()

    def remove(self, **kwd):
        """Delete every row matching all of the given ``column=value`` pairs.

        Database errors raised by the DELETE are logged at debug level and
        otherwise ignored (best effort).

        :raises ValueError: if no ``column=value`` pair is given.
        """
        if not kwd:
            raise ValueError("You must provide some values to key upon")
        # NOTE: column names are interpolated into the SQL text (only the values
        # are bound parameters), so keyword names must never come from untrusted input.
        where = " and ".join(f"{column}=?" for column in kwd)
        delete = f"DELETE FROM {DATABASE_TABLE_NAME} WHERE {where}"
        values = tuple(kwd.values())
        with FileLock(self.sqlite_filename):
            conn = sqlite3.connect(self.sqlite_filename)
            try:
                try:
                    conn.cursor().execute(delete, values)
                except sqlite3.Error as e:
                    log.debug("Error removing entry (%s): %s", delete, e)
                conn.commit()
            finally:
                conn.close()

    def save_entry_point(self, entry_point):
        """Convenience method to easily save an entry_point."""
        # NOTE(review): the extracted text of this call had lost the callee, the
        # encode_id() argument and the fourth positional argument. They are restored
        # here as save(), entry_point.id and entry_point.host to line up with
        # save(key, key_type, token, host, port, info) -- confirm against upstream.
        return self.save(
            self.encode_id(entry_point.id),
            entry_point.__class__.__name__.lower(),
            entry_point.token,
            entry_point.host,
            entry_point.port,
            None,
        )

    def remove_entry_point(self, entry_point):
        """Convenience method to easily remove an entry_point."""
        # NOTE(review): the encode_id() argument was missing from the extracted text;
        # entry_point.id is assumed -- confirm against upstream.
        return self.remove(
            key=self.encode_id(entry_point.id),
            key_type=entry_point.__class__.__name__.lower(),
        )
[docs]class InteractiveToolManager: """ Manager for dealing with InteractiveTools """
# Constructor: keeps references to app.model, the session object app.model.context and
# app.job_manager, and starts building an InteractiveToolSqlite "propagator" whose first
# argument is app.config.interactivetools_map.
# NOTE(review): this line is incomplete as extracted. The first assignment has no target,
# one `<target> = <value>` pair between `app.model` and `self.sa_session` is blank, and the
# InteractiveToolSqlite(...) call has lost its second argument and closing parenthesis.
# None of these can be recovered from the visible text; restore them from the upstream module.
[docs] def __init__(self, app): = app self.model = app.model = self.sa_session = app.model.context self.job_manager = app.job_manager self.propagator = InteractiveToolSqlite(app.config.interactivetools_map,
# Creates one self.model.InteractiveToolEntryPoint per item of `entry_points` (falling back
# to tool.ports when `entry_points` is falsy), reading the "port", "url", "name" and
# "requires_domain" keys of each item, and adds each one to self.sa_session. When `flush`
# is true the session is committed inside a transaction() block.
# NOTE(review): a keyword argument is missing after `requires_domain=...` (the text shows
# `,,`). It cannot be recovered from the visible text; restore it from the upstream module.
[docs] def create_entry_points(self, job, tool, entry_points=None, flush=True): entry_points = entry_points or tool.ports for entry in entry_points: ep = self.model.InteractiveToolEntryPoint( job=job, tool_port=entry["port"], entry_url=entry["url"], name=entry["name"], requires_domain=entry["requires_domain"],, ) self.sa_session.add(ep) if flush: with transaction(self.sa_session): self.sa_session.commit()
def configure_entry_point(self, job, tool_port=None, host=None, port=None, protocol=None):
    """Configure a single tool port by delegating to ``configure_entry_points``.

    Wraps the given settings in a one-item mapping keyed by ``tool_port`` and
    returns whatever ``configure_entry_points`` returns.
    """
    settings = {"tool_port": tool_port, "host": host, "port": port, "protocol": protocol}
    return self.configure_entry_points(job, {tool_port: settings})
def configure_entry_points(self, job, ports_dict):
    """Apply host/port/protocol settings to the job's entry points.

    :param job: object exposing ``interactivetool_entry_points``.
    :param ports_dict: mapping of tool port to a dict with ``host``, ``port``
        and ``protocol`` keys.
    :returns: ``dict(not_configured=[...], configured=[...])`` listing the entry
        points that could / could not be matched to an item of ``ports_dict``.
    """
    configured = []
    not_configured = []
    # There can be multiple entry points that reference the same tool port
    # (could have different entry URLs)
    for ep in job.interactivetool_entry_points:
        port_dict = ports_dict.get(str(ep.tool_port))
        if port_dict is None:
            # configure_entry_point() keys its mapping by the tool_port value exactly
            # as passed in, so also try the un-stringified port before giving up.
            port_dict = ports_dict.get(ep.tool_port)
        if port_dict is None:
            log.error("Did not find port to assign to InteractiveToolEntryPoint by tool port: %s.", ep.tool_port)
            not_configured.append(ep)
            continue
        # NOTE(review): the target of this first assignment was missing from the
        # extracted text; ep.host is assumed from the "host" key and the sibling
        # ep.port / ep.protocol assignments -- confirm against upstream.
        ep.host = port_dict["host"]
        ep.port = port_dict["port"]
        ep.protocol = port_dict["protocol"]
        ep.configured = True
        self.sa_session.add(ep)
        self.save_entry_point(ep)
        configured.append(ep)
    if configured:
        with transaction(self.sa_session):
            self.sa_session.commit()
    return dict(not_configured=not_configured, configured=configured)
def save_entry_point(self, entry_point):
    """Hand ``entry_point`` to the propagator so it is written to its store.

    Delegates to ``self.propagator.save_entry_point``; returns ``None``.
    """
    propagator = self.propagator
    propagator.save_entry_point(entry_point)
def create_interactivetool(self, job, tool, entry_points):
    """Create the entry points for a job, provided both job and tool are given.

    When either ``job`` or ``tool`` is falsy nothing is created and a warning
    is logged instead.
    """
    if not (job and tool):
        log.warning(
            "Called InteractiveToolManager.create_interactivetool, but job (%s) or tool (%s) is None", job, tool
        )
        return
    # create from initial job
    self.create_entry_points(job, tool, entry_points)
# Returns [] when the request has neither a user nor a galaxy session. Otherwise it builds a
# `jobs` query on trans.sa_session (one branch for trans.user, one for the galaxy session id),
# narrows it with the nested build_and_apply_filters() helper, and returns a second
# trans.sa_session.query(...) built from those jobs.
# build_and_apply_filters(query, objects, filter_func): leaves `query` untouched when `objects`
# is None; for a str applies filter_func(objects); for a list applies or_() over filter_func
# of each element.
# NOTE(review): this line is incomplete as extracted. Both `trans.sa_session.query(` calls for
# `jobs` lack their query target and the left-hand side of the `==` filter, the second argument
# of build_and_apply_filters(...) is blank, the lambda body lacks its left operand, and the
# final query lacks its target, filter and list-comprehension element. Restore them from the
# upstream module.
[docs] def get_nonterminal_for_user_by_trans(self, trans): if trans.user is None and trans.get_galaxy_session() is None: return [] if trans.user: jobs = trans.sa_session.query( == trans.user) else: jobs = trans.sa_session.query( == trans.get_galaxy_session().id ) def build_and_apply_filters(query, objects, filter_func): if objects is not None: if isinstance(objects, str): query = query.filter(filter_func(objects)) elif isinstance(objects, list): t = [] for obj in objects: t.append(filter_func(obj)) query = query.filter(or_(*t)) return query jobs = build_and_apply_filters( jobs,, lambda s: == s ) return trans.sa_session.query([ for job in jobs]) )
def can_access_job(self, trans, job):
    """Tell whether the requester behind ``trans`` may access ``job``.

    * A falsy ``job`` is never accessible.
    * With no logged-in user, the job must belong to the current galaxy session.
    * With a logged-in user, the job's user must be that user.

    :returns: ``True`` when access is allowed, ``False`` otherwise.
    """
    if not job:
        return False
    if trans.user is None:
        galaxy_session = trans.get_galaxy_session()
        # NOTE(review): the right-hand operand of this comparison was missing from the
        # extracted text; galaxy_session.id is assumed (the job is matched by its
        # session_id) -- confirm against upstream.
        if galaxy_session is None or job.session_id != galaxy_session.id:
            return False
        return True
    return job.user == trans.user
def can_access_entry_point(self, trans, entry_point):
    """Return the ``can_access_job`` verdict for the entry point's job.

    A falsy ``entry_point`` yields ``False`` without consulting the job.
    """
    if not entry_point:
        return False
    return self.can_access_job(trans, entry_point.job)
def can_access_entry_points(self, trans, entry_points):
    """Return ``True`` only if every entry point passes ``can_access_entry_point``.

    Checking stops at the first entry point that fails; an empty iterable
    yields ``True``.
    """
    return all(self.can_access_entry_point(trans, candidate) for candidate in entry_points)
# Stops an entry point: first removes it via self.remove_entry_point(), then, if its job is
# not finished, logs the stop, calls job.mark_stopped(...) and self.job_manager.stop(job),
# adds the job to trans.sa_session and commits inside a transaction() block.
# NOTE(review): the argument(s) and closing parenthesis of `job.mark_stopped(` are missing
# from this line as extracted, and the original `#` comment is now fused into the statement
# stream. The argument cannot be recovered from the visible text; restore it from upstream.
[docs] def stop(self, trans, entry_point): self.remove_entry_point(entry_point) job = entry_point.job if not job.finished: log.debug("Stopping Job: %s for InteractiveToolEntryPoint: %s", job, entry_point) job.mark_stopped( # This self.job_manager.stop(job) does nothing without changing job.state, manually or e.g. with .mark_deleted() self.job_manager.stop(job) trans.sa_session.add(job) with transaction(trans.sa_session): trans.sa_session.commit()
def remove_entry_points(self, entry_points):
    """Remove several entry points and commit once at the end.

    Each entry point goes through ``remove_entry_point`` with ``flush=False``;
    the session is then committed in a single transaction. Nothing happens for
    a falsy ``entry_points``.
    """
    if not entry_points:
        return
    for doomed in entry_points:
        self.remove_entry_point(doomed, flush=False)
    with transaction(self.sa_session):
        self.sa_session.commit()
def remove_entry_point(self, entry_point, flush=True):
    """Flag an entry point as deleted and drop it from the propagator's store.

    Sets ``entry_point.deleted``, adds the entry point to the session, commits
    when ``flush`` is true, and finally calls
    ``self.propagator.remove_entry_point``.
    """
    session = self.sa_session
    entry_point.deleted = True
    session.add(entry_point)
    if flush:
        with transaction(session):
            session.commit()
    self.propagator.remove_entry_point(entry_point)
# Builds the URL string (`rval`) for an entry point that is not deleted. The protocol is the
# part of trans.request.host_url before "//". When entry_point.requires_domain is set, the URL
# is "<protocol>//<subdomain>.<request_host>/" (subdomain from get_entry_point_subdomain), with
# entry_point.entry_url appended if present; otherwise the URL starts from
# get_entry_point_path() and may be prefixed with "<protocol>//<request_host>".
# NOTE(review): this line is incomplete as extracted. The first operand of the opening `if`,
# the value assigned to `request_host`, and the operands of both `if not ... and ...`
# conditions (including the override assigned to `request_host`) are missing. Restore them
# from the upstream module.
[docs] def target_if_active(self, trans, entry_point): if and not entry_point.deleted: request_host = if not and request_host = protocol = trans.request.host_url.split("//", 1)[0] if entry_point.requires_domain: rval = f"{protocol}//{self.get_entry_point_subdomain(trans, entry_point)}.{request_host}/" if entry_point.entry_url: rval = "{}/{}".format(rval.rstrip("/"), entry_point.entry_url.lstrip("/")) else: rval = self.get_entry_point_path(trans, entry_point) if not and rval = f"{protocol}//{request_host}{rval}" return rval
# Builds the subdomain label for an entry point. Two formats are visible:
#   "<encoded_id>-<first 10 chars of token>.<prefix>"   (guarded by an `if`), and
#   "<encoded_id>-<token>.<lowercased class name>.<prefix>"   (the fallback).
# NOTE(review): this line is incomplete as extracted. The values assigned to
# `entry_point_encoded_id` and `entry_point_prefix`, and the condition of the `if` that
# selects the short format, are missing. Restore them from the upstream module.
[docs] def get_entry_point_subdomain(self, trans, entry_point): entry_point_encoded_id = entry_point_class = entry_point.__class__.__name__.lower() entry_point_prefix = entry_point_token = entry_point.token if return f"{entry_point_encoded_id}-{entry_point_token[:10]}.{entry_point_prefix}" return f"{entry_point_encoded_id}-{entry_point_token}.{entry_point_class}.{entry_point_prefix}"
# Builds the URL path for an entry point. Starting from "/", when the entry point does not
# require a domain the path becomes either
#   "/<base>/<prefix>/<encoded_id>/<first 10 chars of token>/"   or
#   "/<base>/<prefix>/access/<lowercased class name>/<encoded_id>/<token>/".
# entry_point.entry_url is appended when set, and the result is normalised to start with
# exactly one "/".
# NOTE(review): this line is incomplete as extracted. The values assigned to
# `entry_point_encoded_id` and `entry_point_prefix`, the expression inside `str(...)` that
# forms <base>, and the condition choosing between the two formats are missing. Restore them
# from the upstream module.
[docs] def get_entry_point_path(self, trans, entry_point): entry_point_encoded_id = entry_point_class = entry_point.__class__.__name__.lower() entry_point_prefix = rval = "/" if not entry_point.requires_domain: rval = str("/").lstrip("/") if rval = f"/{rval}/{entry_point_prefix}/{entry_point_encoded_id}/{entry_point.token[:10]}/" else: rval = f"/{rval}/{entry_point_prefix}/access/{entry_point_class}/{entry_point_encoded_id}/{entry_point.token}/" if entry_point.entry_url: rval = f"{rval.rstrip('/')}/{entry_point.entry_url.lstrip('/')}" rval = "/" + rval.lstrip("/") return rval
# Loads the model.InteractiveToolEntryPoint with the given id from trans.sa_session and,
# behind an access check, returns self.target_if_active(trans, entry_point).
# Raises exceptions.MessageException when the entry point is deleted ("has ended") or
# otherwise not ready ("is not active"), and exceptions.ItemAccessibilityException when the
# access check fails.
# NOTE(review): this line is incomplete as extracted. The callee of the access check
# (`if, entry_point):`) and the condition of the inner `if` that guards the successful return
# are missing. Restore them from the upstream module.
[docs] def access_entry_point_target(self, trans, entry_point_id): entry_point = trans.sa_session.query(model.InteractiveToolEntryPoint).get(entry_point_id) if, entry_point): if return self.target_if_active(trans, entry_point) elif entry_point.deleted: raise exceptions.MessageException("InteractiveTool has ended. You will have to start a new one.") else: raise exceptions.MessageException( "InteractiveTool is not active. If you recently launched this tool it may not be ready yet, please wait a moment and refresh this page." ) raise exceptions.ItemAccessibilityException("You do not have access to this InteractiveTool entry point.")