"""
Contains functionality needed in every web interface
"""
import logging
from collections.abc import Callable
from typing import (
Any,
)
from webob.exc import (
HTTPBadRequest,
HTTPInternalServerError,
HTTPNotImplemented,
)
from galaxy import (
exceptions,
security,
util,
web,
)
from galaxy.datatypes.interval import ChromatinInteractions
from galaxy.managers import (
base as managers_base,
users,
workflows,
)
from galaxy.managers.forms import (
get_filtered_form_definitions_current,
get_form_definitions,
get_form_definitions_current,
)
from galaxy.managers.sharable import (
slug_exists,
SlugBuilder,
)
from galaxy.model import (
Dataset,
ExtendedMetadata,
ExtendedMetadataIndex,
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
LibraryDatasetDatasetAssociation,
LibraryDatasetPermissions,
StoredWorkflow,
)
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.structured_app import BasicSharedApp
from galaxy.util.sanitize_html import sanitize_html
from galaxy.web import (
error,
url_for,
)
from galaxy.web.form_builder import (
AddressField,
CheckboxField,
PasswordField,
)
from galaxy.workflow.modules import WorkflowModuleInjector
log = logging.getLogger(__name__)
# States for passing messages
SUCCESS, INFO, WARNING, ERROR = "done", "info", "warning", "error"
class BaseController:
"""
Base class for Galaxy web application controllers.
"""
def __init__(self, app: BasicSharedApp):
"""Initialize an interface for application 'app'"""
self.app = app
self.sa_session = app.model.context
self.user_manager = users.UserManager(app)
def get_toolbox(self):
"""Returns the application toolbox"""
return self.app.toolbox
def get_class(self, class_name):
"""Returns the class object that a string denotes. Without this method, we'd have to do eval(<class_name>)."""
return managers_base.get_class(class_name)
def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None):
"""
Convenience method to get a model object with the specified checks.
"""
return managers_base.get_object(
trans, id, class_name, check_ownership=check_ownership, check_accessible=check_accessible, deleted=deleted
)
# this should be here - but catching errors from sharable item controllers that *should* have SharableItemMixin
# but *don't* then becomes difficult
# def security_check( self, trans, item, check_ownership=False, check_accessible=False ):
# log.warning( 'BaseController.security_check: %s, %b, %b', str( item ), check_ownership, check_accessible )
# # meant to be overridden in SharableSecurityMixin
# return item
def get_user(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
return self.get_object(trans, id, "User", check_ownership=False, check_accessible=False, deleted=deleted)
def get_group(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
return self.get_object(trans, id, "Group", check_ownership=False, check_accessible=False, deleted=deleted)
def get_role(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
return self.get_object(trans, id, "Role", check_ownership=False, check_accessible=False, deleted=deleted)
# ---- parsing query params
def decode_id(self, id):
return managers_base.decode_id(self.app, id)
def encode_all_ids(self, trans, rval, recursive=False):
"""
Encodes all integer values in the dict rval whose keys are 'id' or end with '_id'
It might be useful to turn this in to a decorator
"""
return trans.security.encode_all_ids(rval, recursive=recursive)
# TODO this will be replaced by lib.galaxy.managers.base.ModelFilterParser.build_filter_params
def parse_filter_params(self, qdict, filter_attr_key="q", filter_value_key="qv", attr_op_split_char="-"):
""" """
# TODO: import DEFAULT_OP from FilterParser
DEFAULT_OP = "eq"
if filter_attr_key not in qdict:
return []
# precondition: attrs/value pairs are in-order in the qstring
attrs = qdict.get(filter_attr_key)
if not isinstance(attrs, list):
attrs = [attrs]
# ops are strings placed after the attr strings and separated by a split char (e.g. 'create_time-lt')
# ops are optional and default to 'eq'
reparsed_attrs = []
ops = []
for attr in attrs:
op = DEFAULT_OP
if attr_op_split_char in attr:
# note: only split the last (e.g. q=community-tags-in&qv=rna yields ( 'community-tags', 'in', 'rna' )
attr, op = attr.rsplit(attr_op_split_char, 1)
ops.append(op)
reparsed_attrs.append(attr)
attrs = reparsed_attrs
values = qdict.get(filter_value_key, [])
if not isinstance(values, list):
values = [values]
# TODO: it may be more helpful to the consumer if we error on incomplete 3-tuples
# (instead of relying on zip to shorten)
return list(zip(attrs, ops, values))
def parse_limit_offset(self, qdict):
""" """
def _parse_pos_int(i):
try:
new_val = int(i)
if new_val >= 0:
return new_val
except (TypeError, ValueError):
pass
return None
limit = _parse_pos_int(qdict.get("limit", None))
offset = _parse_pos_int(qdict.get("offset", None))
return (limit, offset)
Root = BaseController
class BaseUIController(BaseController):
def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None):
try:
return BaseController.get_object(
self,
trans,
id,
class_name,
check_ownership=check_ownership,
check_accessible=check_accessible,
deleted=deleted,
)
except exceptions.MessageException:
raise # handled in the caller
except Exception:
log.exception("Exception in get_object check for %s %s:", class_name, str(id))
raise Exception(f"Server error retrieving {class_name} id ( {str(id)} ).")
def message_exception(self, trans, message, sanitize=True):
trans.response.status = 400
return {"err_msg": util.sanitize_text(message) if sanitize else message}
class BaseAPIController(BaseController):
def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None):
try:
return BaseController.get_object(
self,
trans,
id,
class_name,
check_ownership=check_ownership,
check_accessible=check_accessible,
deleted=deleted,
)
except exceptions.MessageException:
raise
except Exception as e:
log.exception("Exception in get_object check for %s %s.", class_name, str(id))
raise HTTPInternalServerError(comment=util.unicodify(e))
def not_implemented(self, trans, **kwd):
raise HTTPNotImplemented()
def _parse_serialization_params(self, kwd, default_view):
view = kwd.get("view", None)
keys = kwd.get("keys")
if isinstance(keys, str):
keys = keys.split(",")
return dict(view=view, keys=keys, default_view=default_view)
# TODO: this will be replaced by lib.galaxy.schema.FilterQueryParams.build_order_by
def _parse_order_by(self, manager, order_by_string):
if (ORDER_BY_SEP_CHAR := ",") in order_by_string:
return [manager.parse_order_by(o) for o in order_by_string.split(ORDER_BY_SEP_CHAR)]
return manager.parse_order_by(order_by_string)
class Datatype:
"""Used for storing in-memory list of datatypes currently in the datatypes registry."""
def __init__(self, extension, dtype, type_extension, mimetype, display_in_upload):
self.extension = extension
self.dtype = dtype
self.type_extension = type_extension
self.mimetype = mimetype
self.display_in_upload = display_in_upload
#
# -- Mixins for working with Galaxy objects. --
#
class SharableItemSecurityMixin:
"""Mixin for handling security for sharable items."""
def security_check(self, trans, item, check_ownership=False, check_accessible=False):
"""Security checks for an item: checks if (a) user owns item or (b) item is accessible to user."""
return managers_base.security_check(
trans, item, check_ownership=check_ownership, check_accessible=check_accessible
)
class UsesLibraryMixinItems(SharableItemSecurityMixin):
get_object: Callable
def get_library_folder(self, trans, id: int, check_ownership=False, check_accessible=True):
return self.get_object(trans, id, "LibraryFolder", check_ownership=False, check_accessible=check_accessible)
def get_library_dataset_dataset_association(self, trans, id, check_ownership=False, check_accessible=True):
# Deprecated in lieu to galaxy.managers.lddas.LDDAManager.get() but not
# reusing that exactly because of subtle differences in exception handling
# logic (API controller override get_object to be slightly different).
return self.get_object(
trans, id, "LibraryDatasetDatasetAssociation", check_ownership=False, check_accessible=check_accessible
)
def get_library_dataset(self, trans, id, check_ownership=False, check_accessible=True):
return self.get_object(trans, id, "LibraryDataset", check_ownership=False, check_accessible=check_accessible)
# TODO: it makes no sense that I can get roles from a user but not user.is_admin()
# def can_user_add_to_library_item( self, trans, user, item ):
# if not user: return False
# return ( ( user.is_admin() )
# or ( trans.app.security_agent.can_add_library_item( user.all_roles(), item ) ) )
def can_current_user_add_to_library_item(self, trans, item):
if not trans.user:
return False
return trans.user_is_admin or trans.app.security_agent.can_add_library_item(
trans.get_current_user_roles(), item
)
def check_user_can_add_to_library_item(self, trans, item, check_accessible=True):
"""
Raise exception if user cannot add to the specified library item (i.e.
Folder). Can set check_accessible to False if folder was loaded with
this check.
"""
if not trans.user:
raise exceptions.ItemAccessibilityException("Anonymous users cannot add to library items")
current_user_roles = trans.get_current_user_roles()
if trans.user_is_admin:
return True
if check_accessible:
if not trans.app.security_agent.can_access_library_item(current_user_roles, item, trans.user):
raise exceptions.ItemAccessibilityException("You do not have access to the requested item")
if not trans.app.security_agent.can_add_library_item(trans.get_current_user_roles(), item):
# Slight misuse of ItemOwnershipException?
raise exceptions.ItemOwnershipException("User cannot add to library item.")
def _copy_hdca_to_library_folder(self, trans, hda_manager, from_hdca_id: int, folder_id: int, ldda_message=""):
"""
Fetches the collection identified by `from_hcda_id` and dispatches individual collection elements to
_copy_hda_to_library_folder
"""
hdca = trans.sa_session.get(HistoryDatasetCollectionAssociation, from_hdca_id)
if hdca.collection.collection_type != "list":
raise exceptions.NotImplemented(
"Cannot add nested collections to library. Please flatten your collection first."
)
hdas = []
for element in hdca.collection.elements:
hdas.append((element.element_identifier, element.dataset_instance.id))
return [
self._copy_hda_to_library_folder(
trans,
hda_manager=hda_manager,
from_hda_id=hda_id,
folder_id=folder_id,
ldda_message=ldda_message,
element_identifier=element_identifier,
)
for (element_identifier, hda_id) in hdas
]
def _copy_hda_to_library_folder(
self, trans, hda_manager, from_hda_id: int, folder_id: int, ldda_message="", element_identifier=None
):
"""
Copies hda ``from_hda_id`` to library folder ``folder_id``, optionally
adding ``ldda_message`` to the new ldda's ``message``.
``library_contents.create`` will branch to this if called with 'from_hda_id'
in its payload.
"""
log.debug(f"_copy_hda_to_library_folder: {str((from_hda_id, folder_id, ldda_message))}")
# TODO: allow name and other, editable ldda attrs?
if ldda_message:
ldda_message = sanitize_html(ldda_message)
# check permissions on (all three?) resources: hda, library, folder
# TODO: do we really need the library??
hda = hda_manager.get_owned(from_hda_id, trans.user, current_history=trans.history)
hda = hda_manager.error_if_uploading(hda)
folder = self.get_library_folder(trans, folder_id, check_accessible=True)
# TOOD: refactor to use check_user_can_add_to_library_item, eliminate boolean
# can_current_user_add_to_library_item.
if folder.parent_library.deleted:
raise exceptions.ObjectAttributeInvalidException(
"You cannot add datasets into deleted library. Undelete it first."
)
if not self.can_current_user_add_to_library_item(trans, folder):
raise exceptions.InsufficientPermissionsException(
"You do not have proper permissions to add a dataset to this folder,"
)
ldda = self.copy_hda_to_library_folder(
trans, hda, folder, ldda_message=ldda_message, element_identifier=element_identifier
)
# I don't see a reason why hdas copied into libraries should not be visible.
# If there is, refactor `ldda.visible = True` to do this only when adding HDCAs.
ldda.visible = True
ldda.update_parent_folder_update_times()
trans.sa_session.commit()
ldda_dict = ldda.to_dict()
rval = trans.security.encode_dict_ids(ldda_dict)
update_time = ldda.update_time.isoformat()
rval["update_time"] = update_time
return rval
def copy_hda_to_library_folder(
self, trans, hda, library_folder, roles=None, ldda_message="", element_identifier=None
):
# PRECONDITION: permissions for this action on hda and library_folder have been checked
roles = roles or []
# this code was extracted from library_common.add_history_datasets_to_library
# TODO: refactor library_common.add_history_datasets_to_library to use this for each hda to copy
# create the new ldda and apply the folder perms to it
ldda = hda.to_library_dataset_dataset_association(
trans,
target_folder=library_folder,
roles=roles,
ldda_message=ldda_message,
element_identifier=element_identifier,
)
self._apply_library_folder_permissions_to_ldda(trans, library_folder, ldda)
self._apply_hda_permissions_to_ldda(trans, hda, ldda)
# TODO:?? not really clear on how permissions are being traded here
# seems like hda -> ldda permissions should be set in to_library_dataset_dataset_association
# then they get reset in _apply_library_folder_permissions_to_ldda
# then finally, re-applies hda -> ldda for missing actions in _apply_hda_permissions_to_ldda??
return ldda
def _apply_library_folder_permissions_to_ldda(self, trans, library_folder, ldda):
"""
Copy actions/roles from library folder to an ldda (and its library_dataset).
"""
# PRECONDITION: permissions for this action on library_folder and ldda have been checked
security_agent = trans.app.security_agent
security_agent.copy_library_permissions(trans, library_folder, ldda)
security_agent.copy_library_permissions(trans, library_folder, ldda.library_dataset)
return security_agent.get_permissions(ldda)
def _apply_hda_permissions_to_ldda(self, trans, hda, ldda):
"""
Copy actions/roles from hda to ldda.library_dataset (and then ldda) if ldda
doesn't already have roles for the given action.
"""
# PRECONDITION: permissions for this action on hda and ldda have been checked
# Make sure to apply any defined dataset permissions, allowing the permissions inherited from the
# library_dataset to over-ride the same permissions on the dataset, if they exist.
security_agent = trans.app.security_agent
dataset_permissions_dict = security_agent.get_permissions(hda.dataset)
library_dataset = ldda.library_dataset
library_dataset_actions = [permission.action for permission in library_dataset.actions]
# except that: if DATASET_MANAGE_PERMISSIONS exists in the hda.dataset permissions,
# we need to instead apply those roles to the LIBRARY_MANAGE permission to the library dataset
dataset_manage_permissions_action = security_agent.get_action("DATASET_MANAGE_PERMISSIONS").action
library_manage_permissions_action = security_agent.get_action("LIBRARY_MANAGE").action
# TODO: test this and remove if in loop below
# TODO: doesn't handle action.action
# if dataset_manage_permissions_action in dataset_permissions_dict:
# managing_roles = dataset_permissions_dict.pop( dataset_manage_permissions_action )
# dataset_permissions_dict[ library_manage_permissions_action ] = managing_roles
flush_needed = False
for action, dataset_permissions_roles in dataset_permissions_dict.items():
if isinstance(action, security.Action):
action = action.action
# alter : DATASET_MANAGE_PERMISSIONS -> LIBRARY_MANAGE (see above)
if action == dataset_manage_permissions_action:
action = library_manage_permissions_action
# TODO: generalize to util.update_dict_without_overwrite
# add the hda actions & roles to the library_dataset
# NOTE: only apply an hda perm if it's NOT set in the library_dataset perms (don't overwrite)
if action not in library_dataset_actions:
for role in dataset_permissions_roles:
ldps = LibraryDatasetPermissions(action, library_dataset, role)
ldps = [ldps] if not isinstance(ldps, list) else ldps
for ldp in ldps:
trans.sa_session.add(ldp)
flush_needed = True
if flush_needed:
trans.sa_session.commit()
# finally, apply the new library_dataset to its associated ldda (must be the same)
security_agent.copy_library_permissions(trans, library_dataset, ldda)
return security_agent.get_permissions(ldda)
class UsesVisualizationMixin(UsesLibraryMixinItems):
"""
Mixin for controllers that use Visualization objects.
"""
slug_builder = SlugBuilder()
def get_visualization_config(self, trans, visualization):
"""Returns a visualization's configuration."""
latest_revision = visualization.latest_revision
config = latest_revision.config
return config
def get_hda_or_ldda(self, trans, hda_ldda, dataset_id):
"""Returns either HDA or LDDA for hda/ldda and id combination."""
if hda_ldda == "hda":
return self.get_hda(trans, dataset_id, check_ownership=False, check_accessible=True)
else:
return self.get_library_dataset_dataset_association(trans, dataset_id)
def get_hda(self, trans, dataset_id, check_ownership=True, check_accessible=False, check_state=True):
"""
Get an HDA object by id performing security checks using
the current transaction.
Deprecated in lieu to galaxy.managers.hdas.HDAManager.get_accessible(decoded_id, user)
"""
try:
dataset_id = trans.security.decode_id(dataset_id)
except (AttributeError, TypeError):
raise HTTPBadRequest(f"Invalid dataset id: {str(dataset_id)}.")
try:
data = trans.sa_session.get(HistoryDatasetAssociation, int(dataset_id))
except Exception:
raise HTTPBadRequest(f"Invalid dataset id: {str(dataset_id)}.")
if not data:
raise HTTPBadRequest(f"Invalid dataset id: {str(dataset_id)}.")
if check_ownership:
# Verify ownership.
user = trans.get_user()
if not user:
error("Must be logged in to manage Galaxy items")
if data.user != user:
error(f"{data.__class__.__name__} is not owned by current user")
if check_accessible:
current_user_roles = trans.get_current_user_roles()
if not trans.app.security_agent.can_access_dataset(current_user_roles, data.dataset):
error("You are not allowed to access this dataset")
if check_state and data.state == Dataset.states.UPLOAD:
return trans.show_error_message(
"Please wait until this dataset finishes uploading " + "before attempting to view it."
)
return data
def _get_genome_data(self, trans, dataset, dbkey=None):
"""
Returns genome-wide data for dataset if available; if not, message is returned.
"""
rval = None
# Get data sources.
data_sources = dataset.get_datasources(trans)
query_dbkey = dataset.dbkey
if query_dbkey == "?":
query_dbkey = dbkey
chroms_info = self.app.genomes.chroms(trans, dbkey=query_dbkey)
# If there are no messages (messages indicate data is not ready/available), get data.
messages_list = [data_source_dict["message"] for data_source_dict in data_sources.values()]
if message := self._get_highest_priority_msg(messages_list):
rval = message
else:
# HACK: chromatin interactions tracks use data as source.
source = "index"
if isinstance(dataset.datatype, ChromatinInteractions):
source = "data"
data_provider = trans.app.data_provider_registry.get_data_provider(
trans, original_dataset=dataset, source=source
)
# HACK: pass in additional params which are used for only some
# types of data providers; level, cutoffs used for summary tree,
# num_samples for BBI, and interchromosomal used for chromatin interactions.
rval = data_provider.get_genome_data(
chroms_info, level=4, detail_cutoff=0, draw_cutoff=0, num_samples=150, interchromosomal=True
)
return rval
# FIXME: this method probably belongs down in the model.Dataset class.
def _get_highest_priority_msg(self, message_list):
"""
Returns highest priority message from a list of messages.
"""
return_message = None
# For now, priority is: job error (dict), no converter, pending.
for message in message_list:
if message is not None:
if isinstance(message, dict):
return_message = message
break
elif message == "no converter":
return_message = message
elif return_message is None and message == "pending":
return_message = message
return return_message
class UsesStoredWorkflowMixin(SharableItemSecurityMixin, UsesAnnotations):
"""Mixin for controllers that use StoredWorkflow objects."""
slug_builder = SlugBuilder()
def get_stored_workflow(self, trans, id, check_ownership=True, check_accessible=False):
"""Get a StoredWorkflow from the database by id, verifying ownership."""
# Load workflow from database
workflow_contents_manager = workflows.WorkflowsManager(self.app)
workflow = workflow_contents_manager.get_stored_workflow(trans=trans, workflow_id=id)
if not workflow:
error("Workflow not found")
else:
self.security_check(trans, workflow, check_ownership, check_accessible)
# Older workflows may be missing slugs, so set them here.
if not workflow.slug:
self.slug_builder.create_item_slug(trans.sa_session, workflow)
trans.sa_session.commit()
return workflow
def get_stored_workflow_steps(self, trans, stored_workflow: StoredWorkflow):
"""Restores states for a stored workflow's steps."""
module_injector = WorkflowModuleInjector(trans)
workflow = stored_workflow.latest_workflow
module_injector.inject_all(workflow, exact_tools=False, ignore_tool_missing_exception=True)
for step in workflow.steps:
try:
module_injector.compute_runtime_state(step)
except exceptions.ToolMissingException:
pass
def _import_shared_workflow(self, trans, stored: StoredWorkflow):
"""Imports a shared workflow"""
# Copy workflow.
imported_stored = StoredWorkflow()
imported_stored.name = f"imported: {stored.name}"
workflow = stored.latest_workflow.copy(user=trans.user)
workflow.stored_workflow = imported_stored
imported_stored.latest_workflow = workflow
imported_stored.user = trans.user
imported_stored.copy_tags_from(stored.user, stored)
# Save new workflow.
session = trans.sa_session
session.add(imported_stored)
session.commit()
# Copy annotations.
self.copy_item_annotation(session, stored.user, stored, imported_stored.user, imported_stored)
for order_index, step in enumerate(stored.latest_workflow.steps):
self.copy_item_annotation(
session, stored.user, step, imported_stored.user, imported_stored.latest_workflow.steps[order_index]
)
session.commit()
return imported_stored
def _workflow_to_dict(self, trans, stored):
"""
Converts a workflow to a dict of attributes suitable for exporting.
"""
workflow_contents_manager = workflows.WorkflowContentsManager(self.app, self.app.trs_proxy)
return workflow_contents_manager.workflow_to_dict(
trans,
stored,
)
[docs]
class SharableMixin:
"""Mixin for a controller that manages an item that can be shared."""
manager: Any = None
serializer: Any = None
slug_builder = SlugBuilder()
# -- Implemented methods. --
def _is_valid_slug(self, slug):
"""Returns true if slug is valid."""
return SlugBuilder.is_valid_slug(slug)
[docs]
@web.expose
@web.require_login("modify Galaxy items")
def set_slug_async(self, trans, id, new_slug):
item = self.get_item(trans, id)
if item:
# Only update slug if slug is not already in use.
if not slug_exists(trans.sa_session, item.__class__, item.user, new_slug):
item.slug = new_slug
trans.sa_session.commit()
return item.slug
def _make_item_accessible(self, sa_session, item):
"""Makes item accessible--viewable and importable--and sets item's slug.
Does not flush/commit changes, however. Item must have name, user,
importable, and slug attributes."""
item.importable = True
self.slug_builder.create_item_slug(sa_session, item)
# -- Abstract methods. --
[docs]
@web.expose
@web.require_login("share Galaxy items")
def share(self, trans, id=None, email="", **kwd):
"""Handle sharing an item with a particular user."""
raise NotImplementedError()
[docs]
@web.expose
def display_by_username_and_slug(self, trans, username, slug, **kwargs):
"""Display item by username and slug."""
# Ensure slug is in the correct format.
slug = slug.encode("latin1").decode("utf-8")
self._display_by_username_and_slug(trans, username, slug, **kwargs)
def _display_by_username_and_slug(self, trans, username, slug, **kwargs):
raise NotImplementedError()
[docs]
def get_item(self, trans, id):
"""Return item based on id."""
raise NotImplementedError()
[docs]
def sort_by_attr(seq, attr):
"""
Sort the sequence of objects by object's attribute
Arguments:
seq - the list or any sequence (including immutable one) of objects to sort.
attr - the name of attribute to sort by
"""
# Use the "Schwartzian transform"
# Create the auxiliary list of tuples where every i-th tuple has form
# (seq[i].attr, i, seq[i]) and sort it. The second item of tuple is needed not
# only to provide stable sorting, but mainly to eliminate comparison of objects
# (which can be expensive or prohibited) in case of equal attribute values.
intermed = [(getattr(v, attr), i, v) for i, v in enumerate(seq)]
intermed.sort()
return [_[-1] for _ in intermed]
__all__ = (
"HTTPBadRequest",
"SharableMixin",
"sort_by_attr",
"url_for",
"UsesExtendedMetadataMixin",
"UsesFormDefinitionsMixin",
"UsesTagsMixin",
"web",
)