Source code for galaxy.webapps.base.controller

"""
Contains functionality needed in every web interface.
"""
import logging

from sqlalchemy import true
from webob.exc import (

from galaxy import (
from galaxy.datatypes.interval import ChromatinInteractions
from galaxy.managers import (
    base as managers_base,
from galaxy.model import (
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.util.dictifiable import Dictifiable
from galaxy.util.sanitize_html import sanitize_html
from galaxy.web import (
from galaxy.web.form_builder import (
from galaxy.workflow.modules import WorkflowModuleInjector

# Module-level logger named after this module.
log = logging.getLogger(__name__)

# States for passing messages
SUCCESS, INFO, WARNING, ERROR = "done", "info", "warning", "error"

class BaseController:
    """
    Base class for Galaxy web application controllers.

    Holds the application object, its SQLAlchemy session and a user manager,
    and offers thin convenience wrappers around ``galaxy.managers.base`` plus
    helpers for parsing common query-string parameters.
    """

    def __init__(self, app):
        """Initialize an interface for application 'app'."""
        self.app = app
        self.sa_session = app.model.context
        self.user_manager = users.UserManager(app)

    def get_toolbox(self):
        """Return the application toolbox."""
        return self.app.toolbox

    def get_class(self, class_name):
        """
        Return the class object that the string `class_name` denotes.

        Without this method, we'd have to do eval(class_name).
        """
        return managers_base.get_class(class_name)

    def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None):
        """
        Convenience method to get a model object with the specified checks.

        All arguments are forwarded unchanged to ``managers_base.get_object``.
        """
        return managers_base.get_object(
            trans,
            id,
            class_name,
            check_ownership=check_ownership,
            check_accessible=check_accessible,
            deleted=deleted,
        )

    # this should be here - but catching errors from sharable item controllers that *should* have
    # SharableItemMixin but *don't* then becomes difficult
    # def security_check( self, trans, item, check_ownership=False, check_accessible=False ):
    #     log.warning( 'BaseController.security_check: %s, %b, %b', str( item ), check_ownership, check_accessible )
    #     # meant to be overridden in SharableSecurityMixin
    #     return item

    def get_user(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
        """
        Fetch a 'User' through `get_object`.

        `check_ownership` and `check_accessible` are accepted but not
        forwarded: both checks are always requested as False.
        """
        return self.get_object(trans, id, 'User', check_ownership=False, check_accessible=False, deleted=deleted)

    def get_group(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
        """
        Fetch a 'Group' through `get_object`.

        `check_ownership` and `check_accessible` are accepted but not
        forwarded: both checks are always requested as False.
        """
        return self.get_object(trans, id, 'Group', check_ownership=False, check_accessible=False, deleted=deleted)

    def get_role(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
        """
        Fetch a 'Role' through `get_object`.

        `check_ownership` and `check_accessible` are accepted but not
        forwarded: both checks are always requested as False.
        """
        return self.get_object(trans, id, 'Role', check_ownership=False, check_accessible=False, deleted=deleted)

    # ---- parsing query params
    def decode_id(self, id):
        """Decode an encoded id via ``managers_base.decode_id``."""
        return managers_base.decode_id(self.app, id)

    def encode_all_ids(self, trans, rval, recursive=False):
        """
        Encodes all integer values in the dict rval whose keys are 'id' or end with '_id'

        It might be useful to turn this in to a decorator
        """
        return trans.security.encode_all_ids(rval, recursive=recursive)

    def parse_filter_params(self, qdict, filter_attr_key='q', filter_value_key='qv', attr_op_split_char='-'):
        """
        Parse filter parameters out of the query dict `qdict`.

        Attributes are read from `filter_attr_key` and values from
        `filter_value_key`; either may be a single item or a list. An
        attribute may carry an operator suffix after the last
        `attr_op_split_char` (e.g. 'create_time-lt'); without one the
        operator is 'eq'.

        Returns a list of ``(attr, op, value)`` tuples, or an empty list when
        `filter_attr_key` is absent. Attributes and values are paired in
        order and the result is truncated to the shorter of the two.
        """
        # TODO: import DEFAULT_OP from FilterParser
        DEFAULT_OP = 'eq'
        if filter_attr_key not in qdict:
            return []

        # precondition: attrs/value pairs are in-order in the qstring
        raw_attrs = qdict.get(filter_attr_key)
        if not isinstance(raw_attrs, list):
            raw_attrs = [raw_attrs]

        names = []
        operators = []
        for raw_attr in raw_attrs:
            if attr_op_split_char in raw_attr:
                # note: only the last split char separates the op
                # (e.g. q=community-tags-in&qv=rna yields ( 'community-tags', 'in', 'rna' )
                name, _, operator = raw_attr.rpartition(attr_op_split_char)
            else:
                name, operator = raw_attr, DEFAULT_OP
            names.append(name)
            operators.append(operator)

        filter_values = qdict.get(filter_value_key, [])
        if not isinstance(filter_values, list):
            filter_values = [filter_values]

        # TODO: it may be more helpful to the consumer if we error on incomplete 3-tuples
        # (instead of relying on zip to shorten)
        return list(zip(names, operators, filter_values))

    def parse_limit_offset(self, qdict):
        """
        Read 'limit' and 'offset' from `qdict`.

        Returns a ``(limit, offset)`` tuple; each entry is a non-negative int,
        or None when the value is missing, negative or not convertible to int.
        """
        def _parse_pos_int(raw):
            try:
                parsed = int(raw)
            except (TypeError, ValueError):
                return None
            return parsed if parsed >= 0 else None

        limit = _parse_pos_int(qdict.get('limit', None))
        offset = _parse_pos_int(qdict.get('offset', None))
        return (limit, offset)


Root = BaseController
class BaseUIController(BaseController):
    """Controller base whose `get_object` wraps unexpected failures in a generic error."""

    def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None):
        """
        Fetch a model object via `BaseController.get_object`.

        `exceptions.MessageException` is re-raised untouched for the caller to
        handle; any other exception is logged and replaced with a generic
        `Exception` naming the class and id.
        """
        try:
            found = BaseController.get_object(
                self,
                trans,
                id,
                class_name,
                check_ownership=check_ownership,
                check_accessible=check_accessible,
                deleted=deleted,
            )
        except exceptions.MessageException:
            raise  # handled in the caller
        except Exception:
            log.exception("Exception in get_object check for %s %s:", class_name, str(id))
            raise Exception('Server error retrieving {} id ( {} ).'.format(class_name, str(id)))
        return found

    def message_exception(self, trans, message, sanitize=True):
        """
        Set the response status to 400 and return an ``{'err_msg': ...}`` dict.

        The message is passed through `util.sanitize_text` unless `sanitize`
        is falsy.
        """
        trans.response.status = 400
        if sanitize:
            err_msg = util.sanitize_text(message)
        else:
            err_msg = message
        return {'err_msg': err_msg}
class BaseAPIController(BaseController):
    """Controller base for API endpoints; maps lookup failures onto HTTP errors."""

    def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None):
        """
        Fetch a model object via `BaseController.get_object`.

        Raises `HTTPBadRequest` for `exceptions.ItemDeletionException` and
        other `exceptions.MessageException` errors; any other exception is
        logged and re-raised as `HTTPInternalServerError`.
        """
        try:
            return BaseController.get_object(self, trans, id, class_name, check_ownership=check_ownership,
                                             check_accessible=check_accessible, deleted=deleted)
        except exceptions.ItemDeletionException as e:
            raise HTTPBadRequest(detail="Invalid {} id ( {} ) specified: {}".format(class_name, str(id), util.unicodify(e)))
        except exceptions.MessageException as e:
            raise HTTPBadRequest(detail=e.err_msg)
        except Exception as e:
            log.exception("Exception in get_object check for %s %s.", class_name, str(id))
            raise HTTPInternalServerError(comment=util.unicodify(e))

    def validate_in_users_and_groups(self, trans, payload):
        """
        For convenience, in_users and in_groups can be encoded IDs or emails/group names in the API.

        Resolves every entry of ``payload['in_users']`` / ``payload['in_groups']``
        to a database id and rewrites both payload entries in place as lists
        of id strings. Raises `Exception` listing every value that could not
        be resolved.
        """
        def get_id(item, model_class, column):
            try:
                return trans.security.decode_id(item)
            except Exception:
                pass  # maybe an email/group name
            # this will raise if the item is invalid
            return trans.sa_session.query(model_class).filter(column == item).first().id

        new_in_users = []
        new_in_groups = []
        invalid = []
        for item in util.listify(payload.get('in_users', [])):
            try:
                new_in_users.append(get_id(item, trans.app.model.User, trans.app.model.User.table.c.email))
            except Exception:
                invalid.append(item)
        for item in util.listify(payload.get('in_groups', [])):
            try:
                new_in_groups.append(get_id(item, trans.app.model.Group, trans.app.model.Group.table.c.name))
            except Exception:
                invalid.append(item)
        if invalid:
            # Payload items are not guaranteed to be strings (e.g. JSON numbers);
            # stringify them so building the message cannot itself fail with a
            # TypeError and hide the real validation error.
            msg = "The following value(s) for associated users and/or groups could not be parsed: %s." % ', '.join(str(item) for item in invalid)
            msg += " Valid values are email addresses of users, names of groups, or IDs of both."
            raise Exception(msg)
        payload['in_users'] = list(map(str, new_in_users))
        payload['in_groups'] = list(map(str, new_in_groups))

    def not_implemented(self, trans, **kwd):
        """Always raise `HTTPNotImplemented`."""
        raise HTTPNotImplemented()

    def _parse_serialization_params(self, kwd, default_view):
        """
        Build the serialization parameter dict from request keywords.

        'keys' given as a comma-separated string is split into a list.
        """
        view = kwd.get('view', None)
        keys = kwd.get('keys')
        if isinstance(keys, str):
            keys = keys.split(',')
        return dict(view=view, keys=keys, default_view=default_view)

    def _parse_order_by(self, manager, order_by_string):
        """
        Parse `order_by_string` with ``manager.parse_order_by``.

        A comma-separated string yields a list with one parsed entry per
        component; otherwise the single parsed result is returned directly.
        """
        ORDER_BY_SEP_CHAR = ','
        if ORDER_BY_SEP_CHAR in order_by_string:
            return [manager.parse_order_by(o) for o in order_by_string.split(ORDER_BY_SEP_CHAR)]
        return manager.parse_order_by(order_by_string)
class JSAppLauncher(BaseUIController):
    """
    A controller that launches JavaScript web applications.
    """

    #: path to js app template
    JS_APP_MAKO_FILEPATH = "/js-app.mako"
    #: window-scoped js function to call to start the app (will be passed options, bootstrapped)
    DEFAULT_ENTRY_FN = "app"
    #: keys used when serializing current user for bootstrapped data
    USER_BOOTSTRAP_KEYS = ('id', 'email', 'username', 'is_admin', 'tags_used', 'total_disk_usage', 'nice_total_disk_usage', 'quota_percent', 'preferences')

    def __init__(self, app):
        """Set up the user manager and the user/config serializers for `app`."""
        super().__init__(app)
        self.user_manager = users.UserManager(app)
        self.user_serializer = users.CurrentUserSerializer(app)
        self.config_serializer = configuration.ConfigSerializer(app)
        self.admin_config_serializer = configuration.AdminConfigSerializer(app)

    def _check_require_login(self, trans):
        """Redirect to the login page when login is required and the user is anonymous."""
        if not self.app.config.require_login:
            return
        if not self.user_manager.is_anonymous(trans.user):
            return
        # TODO: this doesn't properly redirect when login is done
        # (see webapp __ensure_logged_in_user for the initial redirect - not sure why it doesn't redirect to login?)
        login_url = web.url_for(controller="root", action="login")
        trans.response.send_redirect(login_url)

    @web.expose
    def client(self, trans, **kwd):
        """
        Endpoint for clientside routes. This ships the primary SPA client.

        Should not be used with url_for -- see
        (https://github.com/galaxyproject/galaxy/issues/1878) for why.
        """
        self._check_require_login(trans)
        return self._bootstrapped_client(trans, **kwd)

    # This includes contextualized user options in the bootstrapped data; we
    # don't want to cache it.
    @web.do_not_cache
    def _bootstrapped_client(self, trans, app_name='analysis', **kwd):
        """Render the app template with the js options extended by `_get_extended_config`."""
        js_options = self._get_js_options(trans)
        extended_config = self._get_extended_config(trans)
        js_options['config'].update(extended_config)
        return self.template(trans, app_name, options=js_options, **kwd)

    def _get_js_options(self, trans, root=None):
        """
        Return a dictionary of session/site configuration/options to jsonify
        and pass onto the js app.

        Contains the root url (`root`, or ``web.url_for('/')`` when falsy),
        the serialized current user, the site configuration, the request
        params and the session CSRF token.
        """
        js_options = {}
        js_options['root'] = root or web.url_for('/')
        js_options['user'] = self.user_serializer.serialize(trans.user, self.USER_BOOTSTRAP_KEYS, trans=trans)
        js_options['config'] = self._get_site_configuration(trans)
        js_options['params'] = dict(trans.request.params)
        js_options['session_csrf_token'] = trans.session_csrf_token
        return js_options

    def _get_extended_config(self, trans):
        """
        Return additional configuration entries for the client, including the
        toolbox, tool shed urls and the user's stored workflow menu entries
        (one entry per distinct stored workflow id).
        """
        app = trans.app
        config = {
            'active_view': 'analysis',
            'enable_cloud_launch': app.config.get_bool('enable_cloud_launch', False),
            'enable_webhooks': bool(app.webhooks_registry.webhooks),
            'toolbox': app.toolbox.to_dict(trans),
            'message_box_visible': app.config.message_box_visible,
            'show_inactivity_warning': app.config.user_activation_on and trans.user and not trans.user.active,
            'tool_shed_urls': list(app.tool_shed_registry.tool_sheds.values()) if app.tool_shed_registry else [],
            'tool_dynamic_configs': list(app.toolbox.dynamic_conf_filenames()),
        }

        # TODO: move to user
        seen_workflow_ids = set()
        menu_entries = []
        # getattr with a default also covers an anonymous (None) user
        for menu_item in getattr(trans.user, 'stored_workflow_menu_entries', []):
            encoded_id = trans.security.encode_id(menu_item.stored_workflow_id)
            if encoded_id in seen_workflow_ids:
                continue
            seen_workflow_ids.add(encoded_id)
            menu_entries.append({
                'id': encoded_id,
                'name': util.unicodify(menu_item.stored_workflow.name),
            })
        config['stored_workflow_menu_entries'] = menu_entries
        return config

    def _get_site_configuration(self, trans):
        """
        Return a dictionary representing Galaxy's current configuration.

        Uses the admin serializer for admin users. Any exception is logged
        and an empty dict is returned instead.
        """
        try:
            serializer = self.config_serializer
            if self.user_manager.is_admin(trans.user, trans=trans):
                serializer = self.admin_config_serializer
            serialized = serializer.serialize_to_view(self.app.config, view='all')
        except Exception as exc:
            log.exception(exc)
            return {}
        return serialized

    def template(self, trans, app_name, entry_fn='app', options=None, bootstrapped_data=None, masthead=True, **additional_options):
        """
        Render and return the single page mako template that starts the app.

        `app_name` (string): the first portion of the webpack bundle to as the app.
        `entry_fn` (string): the name of the window-scope function that starts the app.
            Defaults to 'app'.
        `bootstrapped_data` (dict): (optional) update containing any more data
            the app may need.
        `masthead` (boolean): (optional, default=True) include masthead elements in
            the initial page dom.
        `additional_options` (kwargs): update to the options sent to the app.
        """
        if not options:
            options = self._get_js_options(trans)
        options.update(additional_options)
        return trans.fill_template(
            self.JS_APP_MAKO_FILEPATH,
            js_app_name=app_name,
            js_app_entry_fn=(entry_fn or self.DEFAULT_ENTRY_FN),
            options=options,
            bootstrapped=(bootstrapped_data or {}),
            masthead=masthead,
        )
class Datatype:
    """Used for storing in-memory list of datatypes currently in the datatypes registry."""

    def __init__(self, extension, dtype, type_extension, mimetype, display_in_upload):
        """Store each constructor argument on the attribute of the same name."""
        self.extension, self.dtype = extension, dtype
        self.type_extension = type_extension
        self.mimetype = mimetype
        self.display_in_upload = display_in_upload
#
# -- Mixins for working with Galaxy objects. --
#
class CreatesApiKeysMixin:
    """
    Mixin centralizing logic for creating API keys for user objects.

    Deprecated - please use api_keys.ApiKeyManager for new development.
    """

    def create_api_key(self, trans, user):
        """Create an API key for `user` through `api_keys.ApiKeyManager`."""
        key_manager = api_keys.ApiKeyManager(trans.app)
        return key_manager.create_api_key(user)
class SharableItemSecurityMixin:
    """Mixin for handling security for sharable items."""

    def security_check(self, trans, item, check_ownership=False, check_accessible=False):
        """
        Security checks for an item: checks if (a) user owns item or (b) item
        is accessible to user.

        Delegates to ``managers_base.security_check`` and returns its result.
        """
        checks = dict(check_ownership=check_ownership, check_accessible=check_accessible)
        return managers_base.security_check(trans, item, **checks)
class ExportsHistoryMixin:
    """Mixin for serving and queueing history export archives."""

    def serve_ready_history_export(self, trans, jeha):
        """
        Set download headers for a ready export `jeha` and return the archive
        opened in binary mode.

        The content type is gzip or tar depending on ``jeha.compressed``.
        """
        assert jeha.ready
        content_type = 'application/x-gzip' if jeha.compressed else 'application/x-tar'
        trans.response.set_content_type(content_type)
        trans.response.headers["Content-Disposition"] = 'attachment; filename="%s"' % jeha.export_name
        archive_path = trans.app.object_store.get_filename(jeha.dataset)
        # The handle is returned open; closing it is left to whatever consumes
        # the return value.
        return open(archive_path, mode='rb')

    def queue_history_export(self, trans, history, gzip=True, include_hidden=False, include_deleted=False):
        """
        Run the '__EXPORT_HISTORY__' tool on `history`.

        String-valued options are converted to booleans first: only 'True',
        'true', 'T' and 't' count as true.
        """
        def _as_bool(option):
            # Convert string options to booleans; leave other values as given.
            if isinstance(option, str):
                return option in ['True', 'true', 'T', 't']
            return option

        export_params = {
            'history_to_export': history,
            'compress': _as_bool(gzip),
            'include_hidden': _as_bool(include_hidden),
            'include_deleted': _as_bool(include_deleted),
        }

        # Run job to do export.
        export_tool = trans.app.toolbox.get_tool('__EXPORT_HISTORY__')
        export_tool.execute(trans, incoming=export_params, history=history, set_output_hid=True)
class ImportsHistoryMixin:
    """Mixin for queueing history imports."""

    def queue_history_import(self, trans, archive_type, archive_source):
        """Run the '__IMPORT_HISTORY__' tool with the given archive source and type."""
        # Run job to do import.
        import_tool = trans.app.toolbox.get_tool('__IMPORT_HISTORY__')
        import_tool.execute(
            trans,
            incoming={
                '__ARCHIVE_SOURCE__': archive_source,
                '__ARCHIVE_TYPE__': archive_type,
            },
        )
class UsesLibraryMixin:
    """Mixin adding library lookup to controllers that provide `get_object`."""

    def get_library(self, trans, id, check_ownership=False, check_accessible=True):
        """
        Fetch the 'Library' with the given id via `self.get_object`.

        When `check_accessible` is true and the current user is neither an
        admin nor allowed by the security agent to access the library,
        `error` is called with an explanatory message. `check_ownership` is
        accepted but not used.
        """
        library = self.get_object(trans, id, 'Library')
        if check_accessible:
            can_access = trans.user_is_admin or trans.app.security_agent.can_access_library(
                trans.get_current_user_roles(), library
            )
            if not can_access:
                # The object checked here is a Library, not a LibraryFolder.
                error("Library is not accessible to the current user")
        return library
class UsesLibraryMixinItems(SharableItemSecurityMixin):
    """Mixin for fetching library items and copying history datasets into library folders."""

    def get_library_folder(self, trans, id, check_ownership=False, check_accessible=True):
        """Fetch a 'LibraryFolder'; ownership is never checked."""
        return self.get_object(trans, id, 'LibraryFolder', check_ownership=False, check_accessible=check_accessible)

    def get_library_dataset_dataset_association(self, trans, id, check_ownership=False, check_accessible=True):
        """Fetch a 'LibraryDatasetDatasetAssociation'; ownership is never checked."""
        # Deprecated in lieu to galaxy.managers.lddas.LDDAManager.get() but not
        # reusing that exactly because of subtle differences in exception handling
        # logic (API controller override get_object to be slightly different).
        return self.get_object(trans, id, 'LibraryDatasetDatasetAssociation', check_ownership=False, check_accessible=check_accessible)

    def get_library_dataset(self, trans, id, check_ownership=False, check_accessible=True):
        """Fetch a 'LibraryDataset'; ownership is never checked."""
        return self.get_object(trans, id, 'LibraryDataset', check_ownership=False, check_accessible=check_accessible)

    # TODO: it makes no sense that I can get roles from a user but not user.is_admin()
    # def can_user_add_to_library_item( self, trans, user, item ):
    #     if not user: return False
    #     return ( ( user.is_admin() )
    #         or ( trans.app.security_agent.can_add_library_item( user.all_roles(), item ) ) )

    def can_current_user_add_to_library_item(self, trans, item):
        """
        Return whether the current user may add to `item`.

        False for anonymous users; otherwise truthy for admins or when the
        security agent allows adding with the user's current roles.
        """
        if not trans.user:
            return False
        if trans.user_is_admin:
            return trans.user_is_admin
        return trans.app.security_agent.can_add_library_item(trans.get_current_user_roles(), item)

    def check_user_can_add_to_library_item(self, trans, item, check_accessible=True):
        """
        Raise exception if user cannot add to the specified library item (i.e.
        Folder). Can set check_accessible to False if folder was loaded with
        this check.

        Returns False for anonymous users and True for admins; for other
        users it raises on failure and otherwise falls through without an
        explicit return value.
        """
        if not trans.user:
            return False

        current_user_roles = trans.get_current_user_roles()
        if trans.user_is_admin:
            return True

        security_agent = trans.app.security_agent
        if check_accessible and not security_agent.can_access_library_item(current_user_roles, item, trans.user):
            raise exceptions.ItemAccessibilityException('You do not have access to the requested item')

        if not trans.app.security_agent.can_add_library_item(trans.get_current_user_roles(), item):
            # Slight misuse of ItemOwnershipException?
            raise exceptions.ItemOwnershipException("User cannot add to library item.")

    def _copy_hdca_to_library_folder(self, trans, hda_manager, from_hdca_id, folder_id, ldda_message=''):
        """
        Fetches the collection identified by `from_hdca_id` and dispatches individual collection elements
        to _copy_hda_to_library_folder

        Only collections of type 'list' are supported. Returns the list of
        per-element results.
        """
        hdca = trans.sa_session.query(trans.app.model.HistoryDatasetCollectionAssociation).get(from_hdca_id)
        if hdca.collection.collection_type != 'list':
            raise exceptions.NotImplemented('Cannot add nested collections to library. Please flatten your collection first.')

        # Collect (identifier, hda id) pairs up front, before any copying starts.
        pairs = [
            (element.element_identifier, element.dataset_instance.id)
            for element in hdca.collection.elements
        ]

        copied = []
        for element_identifier, hda_id in pairs:
            copied.append(self._copy_hda_to_library_folder(
                trans,
                hda_manager=hda_manager,
                from_hda_id=hda_id,
                folder_id=folder_id,
                ldda_message=ldda_message,
                element_identifier=element_identifier,
            ))
        return copied

    def _copy_hda_to_library_folder(self, trans, hda_manager, from_hda_id, folder_id, ldda_message='', element_identifier=None):
        """
        Copies hda ``from_hda_id`` to library folder ``folder_id``, optionally
        adding ``ldda_message`` to the new ldda's ``message``.

        ``library_contents.create`` will branch to this if called with 'from_hda_id'
        in its payload.

        Returns the new ldda as a dict with encoded ids and a formatted
        'update_time'.
        """
        log.debug('_copy_hda_to_library_folder: %s', str((from_hda_id, folder_id, ldda_message)))
        # PRECONDITION: folder_id has already been altered to remove the folder prefix ('F')
        # TODO: allow name and other, editable ldda attrs?
        if ldda_message:
            ldda_message = sanitize_html(ldda_message)

        # check permissions on (all three?) resources: hda, library, folder
        # TODO: do we really need the library??
        hda = hda_manager.get_owned(from_hda_id, trans.user, current_history=trans.history)
        hda = hda_manager.error_if_uploading(hda)
        folder = self.get_library_folder(trans, folder_id, check_accessible=True)

        # TODO: refactor to use check_user_can_add_to_library_item, eliminate boolean
        # can_current_user_add_to_library_item.
        if folder.parent_library.deleted:
            raise exceptions.ObjectAttributeInvalidException('You cannot add datasets into deleted library. Undelete it first.')
        if not self.can_current_user_add_to_library_item(trans, folder):
            raise exceptions.InsufficientPermissionsException('You do not have proper permissions to add a dataset to this folder,')

        ldda = self.copy_hda_to_library_folder(trans, hda, folder, ldda_message=ldda_message, element_identifier=element_identifier)
        # I don't see a reason why hdas copied into libraries should not be visible.
        # If there is, refactor `ldda.visible = True` to do this only when adding HDCAs.
        ldda.visible = True
        ldda.update_parent_folder_update_times()
        trans.sa_session.flush()

        rval = trans.security.encode_dict_ids(ldda.to_dict())
        rval['update_time'] = ldda.update_time.strftime("%Y-%m-%d %I:%M %p")
        return rval

    def copy_hda_to_library_folder(self, trans, hda, library_folder, roles=None, ldda_message='', element_identifier=None):
        """
        Create an ldda from `hda` inside `library_folder`, apply the folder's
        permissions and then the hda's permissions to it, and return it.
        """
        # PRECONDITION: permissions for this action on hda and library_folder have been checked
        if not roles:
            roles = []

        # this code was extracted from library_common.add_history_datasets_to_library
        # TODO: refactor library_common.add_history_datasets_to_library to use this for each hda to copy

        # create the new ldda and apply the folder perms to it
        ldda = hda.to_library_dataset_dataset_association(
            trans,
            target_folder=library_folder,
            roles=roles,
            ldda_message=ldda_message,
            element_identifier=element_identifier,
        )
        self._apply_library_folder_permissions_to_ldda(trans, library_folder, ldda)
        self._apply_hda_permissions_to_ldda(trans, hda, ldda)
        # TODO:?? not really clear on how permissions are being traded here
        #   seems like hda -> ldda permissions should be set in to_library_dataset_dataset_association
        #   then they get reset in _apply_library_folder_permissions_to_ldda
        #   then finally, re-applies hda -> ldda for missing actions in _apply_hda_permissions_to_ldda??
        return ldda

    def _apply_library_folder_permissions_to_ldda(self, trans, library_folder, ldda):
        """
        Copy actions/roles from library folder to an ldda (and its library_dataset).

        Returns the ldda's resulting permissions.
        """
        # PRECONDITION: permissions for this action on library_folder and ldda have been checked
        agent = trans.app.security_agent
        for target in (ldda, ldda.library_dataset):
            agent.copy_library_permissions(trans, library_folder, target)
        return agent.get_permissions(ldda)

    def _apply_hda_permissions_to_ldda(self, trans, hda, ldda):
        """
        Copy actions/roles from hda to ldda.library_dataset (and then ldda) if ldda
        doesn't already have roles for the given action.

        Returns the ldda's resulting permissions.
        """
        # PRECONDITION: permissions for this action on hda and ldda have been checked
        # Make sure to apply any defined dataset permissions, allowing the permissions inherited from the
        #   library_dataset to over-ride the same permissions on the dataset, if they exist.
        security_agent = trans.app.security_agent
        dataset_permissions_dict = security_agent.get_permissions(hda.dataset)
        library_dataset = ldda.library_dataset
        library_dataset_actions = [permission.action for permission in library_dataset.actions]

        # except that: if DATASET_MANAGE_PERMISSIONS exists in the hda.dataset permissions,
        #   we need to instead apply those roles to the LIBRARY_MANAGE permission to the library dataset
        dataset_manage_permissions_action = security_agent.get_action('DATASET_MANAGE_PERMISSIONS').action
        library_manage_permissions_action = security_agent.get_action('LIBRARY_MANAGE').action
        # TODO: test this and remove if in loop below
        # TODO: doesn't handle action.action
        # if dataset_manage_permissions_action in dataset_permissions_dict:
        #     managing_roles = dataset_permissions_dict.pop( dataset_manage_permissions_action )
        #     dataset_permissions_dict[ library_manage_permissions_action ] = managing_roles

        flush_needed = False
        for action, dataset_permissions_roles in dataset_permissions_dict.items():
            if isinstance(action, security.Action):
                action = action.action

            # alter : DATASET_MANAGE_PERMISSIONS -> LIBRARY_MANAGE (see above)
            if action == dataset_manage_permissions_action:
                action = library_manage_permissions_action

            # TODO: generalize to util.update_dict_without_overwrite
            # add the hda actions & roles to the library_dataset
            # NOTE: only apply an hda perm if it's NOT set in the library_dataset perms (don't overwrite)
            if action in library_dataset_actions:
                continue
            for role in dataset_permissions_roles:
                new_permissions = trans.model.LibraryDatasetPermissions(action, library_dataset, role)
                if not isinstance(new_permissions, list):
                    new_permissions = [new_permissions]
                for new_permission in new_permissions:
                    trans.sa_session.add(new_permission)
                    flush_needed = True

        if flush_needed:
            trans.sa_session.flush()

        # finally, apply the new library_dataset to its associated ldda (must be the same)
        security_agent.copy_library_permissions(trans, library_dataset, ldda)
        return security_agent.get_permissions(ldda)
[docs]class UsesVisualizationMixin(UsesLibraryMixinItems): """ Mixin for controllers that use Visualization objects. """
def get_visualization(self, trans, id, check_ownership=True, check_accessible=False):
    """
    Get a Visualization from the database by id, verifying ownership.

    A `TypeError` while decoding/looking up the id is treated as "not found".
    When nothing is found `error` is called; otherwise the result of
    `self.security_check` on the visualization is returned.
    """
    # Load visualization from database
    try:
        query = trans.sa_session.query(trans.model.Visualization)
        visualization = query.get(trans.security.decode_id(id))
    except TypeError:
        visualization = None

    if visualization:
        return self.security_check(trans, visualization, check_ownership, check_accessible)
    error("Visualization not found")
def get_visualizations_by_user(self, trans, user, order_by=None, query_only=False):
    """
    Return query or query results of visualizations filtered by a user.

    Set `order_by` to a column or list of columns to change the order
    returned. Defaults to `DEFAULT_ORDER_BY`.
    Set `query_only` to return just the query for further filtering or
    processing.
    """
    # TODO: move into model (as class attr)
    DEFAULT_ORDER_BY = [model.Visualization.title]
    ordering = order_by if order_by else DEFAULT_ORDER_BY
    if not isinstance(ordering, list):
        ordering = [ordering]

    query = trans.sa_session.query(model.Visualization).filter(model.Visualization.user == user)
    if ordering:
        query = query.order_by(*ordering)
    return query if query_only else query.all()
def get_visualizations_shared_with_user(self, trans, user, order_by=None, query_only=False):
    """
    Return query or query results for visualizations shared with the given user.

    Visualizations owned by `user` are excluded.
    Set `order_by` to a column or list of columns to change the order
    returned. Defaults to `DEFAULT_ORDER_BY`.
    Set `query_only` to return just the query for further filtering or
    processing.
    """
    DEFAULT_ORDER_BY = [model.Visualization.title]
    ordering = order_by if order_by else DEFAULT_ORDER_BY
    if not isinstance(ordering, list):
        ordering = [ordering]

    share_assoc = model.VisualizationUserShareAssociation
    query = trans.sa_session.query(model.Visualization).join(share_assoc)
    query = query.filter(share_assoc.user_id == user.id)
    # remove duplicates when a user shares with themselves?
    query = query.filter(model.Visualization.user_id != user.id)
    if ordering:
        query = query.order_by(*ordering)
    return query if query_only else query.all()
def get_published_visualizations(self, trans, exclude_user=None, order_by=None, query_only=False):
    """
    Return query or query results for published visualizations optionally excluding
    the user in `exclude_user`.

    Set `order_by` to a column or list of columns to change the order
    returned. Defaults to `DEFAULT_ORDER_BY`.
    Set `query_only` to return just the query for further filtering or
    processing.
    """
    DEFAULT_ORDER_BY = [model.Visualization.title]
    ordering = order_by if order_by else DEFAULT_ORDER_BY
    if not isinstance(ordering, list):
        ordering = [ordering]

    query = trans.sa_session.query(model.Visualization).filter(model.Visualization.published == true())
    if exclude_user:
        query = query.filter(model.Visualization.user != exclude_user)
    if ordering:
        query = query.order_by(*ordering)
    return query if query_only else query.all()
# TODO: move into model (to_dict)
def get_visualization_summary_dict(self, visualization):
    """
    Return a set of summary attributes for a visualization in dictionary form.

    The dict holds the visualization's 'id', 'title', 'type' and 'dbkey'.
    NOTE: that encoding ids isn't done here should happen at the caller level.
    """
    # TODO: deleted
    # TODO: importable
    summary = {}
    for attribute in ('id', 'title', 'type', 'dbkey'):
        summary[attribute] = getattr(visualization, attribute)
    return summary
def get_visualization_dict(self, visualization):
    """
    Return a set of detailed attributes for a visualization in dictionary form.
    The visualization's latest_revision is returned in its own sub-dictionary.

    'revisions' lists the ids of all revisions.
    NOTE: that encoding ids isn't done here should happen at the caller level.
    """
    details = {'model_class': 'Visualization'}
    details['id'] = visualization.id
    details['title'] = visualization.title
    details['type'] = visualization.type
    details['user_id'] = visualization.user.id
    details['dbkey'] = visualization.dbkey
    details['slug'] = visualization.slug
    # to_dict only the latest revision (allow older to be fetched elsewhere)
    details['latest_revision'] = self.get_visualization_revision_dict(visualization.latest_revision)
    details['revisions'] = [revision.id for revision in visualization.revisions]
    return details
def get_visualization_revision_dict(self, revision):
    """
    Return a set of detailed attributes for a visualization revision in
    dictionary form.

    NOTE: that encoding ids isn't done here should happen at the caller level.
    """
    revision_dict = dict(model_class='VisualizationRevision')
    revision_dict['id'] = revision.id
    revision_dict['visualization_id'] = revision.visualization.id
    revision_dict['title'] = revision.title
    revision_dict['dbkey'] = revision.dbkey
    revision_dict['config'] = revision.config
    return revision_dict
def import_visualization(self, trans, id, user=None):
    """
    Copy the visualization with the given id and associate the copy
    with the given user (defaults to trans.user).

    Raises `ItemAccessibilityException` if `user` is not passed and
    the current user is anonymous, and if the visualization is not `importable`.
    Raises `ItemDeletionException` if the visualization has been deleted.
    """
    # default to trans.user, error if anon
    if not user:
        user = trans.user
        if not user:
            raise exceptions.ItemAccessibilityException("You must be logged in to import Galaxy visualizations")

    # check accessibility
    source = self.get_visualization(trans, id, check_ownership=False)
    if not source.importable:
        raise exceptions.ItemAccessibilityException("The owner of this visualization has disabled imports via this link.")
    if source.deleted:
        raise exceptions.ItemDeletionException("You can't import this visualization because it has been deleted.")

    # copy vis and alter title
    # TODO: need to handle custom db keys.
    imported_title = "imported: " + source.title
    imported_visualization = source.copy(user=user, title=imported_title)
    session = trans.sa_session
    session.add(imported_visualization)
    session.flush()
    return imported_visualization
def create_visualization(self, trans, type, title="Untitled Visualization", slug=None, dbkey=None, annotation=None, config=None, save=True):
    """
    Create visualization and first revision.

    `config` defaults to a fresh empty dict per call; a dict passed in is
    stored on the revision as given. When `save` is true the revision is
    added to the session and flushed.

    Raises `ValueError` when `self._create_visualization` returns an error
    dict instead of a visualization. Returns the visualization.
    """
    # A `{}` default would be one dict shared by every call (and by every
    # revision created with it), so use a None sentinel instead.
    if config is None:
        config = {}

    visualization = self._create_visualization(trans, title, type, dbkey, slug, annotation, save)
    # TODO: handle this error structure better either in _create or here
    if isinstance(visualization, dict):
        err_dict = visualization
        raise ValueError(err_dict['title_err'] or err_dict['slug_err'])

    # Create and save first visualization revision
    revision = trans.model.VisualizationRevision(visualization=visualization, title=title,
                                                 config=config, dbkey=dbkey)
    visualization.latest_revision = revision

    if save:
        session = trans.sa_session
        session.add(revision)
        session.flush()

    return visualization
def add_visualization_revision(self, trans, visualization, config, title, dbkey):
    """
    Adds a new `VisualizationRevision` to the given `visualization` with
    the given parameters and set its parent visualization's `latest_revision`
    to the new revision.

    The revision is added to the session and flushed, then returned.
    """
    # precondition: only add new revision on owned vis's
    # TODO:?? should we default title, dbkey, config? to which: visualization or latest_revision?
    revision_class = trans.model.VisualizationRevision
    new_revision = revision_class(visualization, title, dbkey, config)

    visualization.latest_revision = new_revision
    # TODO:?? does this automatically add revision to visualization.revisions?
    session = trans.sa_session
    session.add(new_revision)
    session.flush()
    return new_revision
def save_visualization(self, trans, config, type, id=None, title=None, dbkey=None, slug=None, annotation=None):
    """
    Save `config` as a new revision of a visualization.

    Without `id` a new visualization is created through
    `self._create_visualization`; otherwise the visualization with the
    decoded id is loaded. For 'trackster' visualizations the config is
    unpacked into 'view', 'bookmarks' and (if present) 'viewport'; any other
    type stores `config` as is.

    Returns a dict with the encoded visualization id ('vis_id') and its 'url'.
    """
    session = trans.sa_session

    # Create/get visualization.
    if id:
        decoded_id = trans.security.decode_id(id)
        vis = session.query(trans.model.Visualization).get(decoded_id)
        # TODO: security check?
    else:
        # Create new visualization.
        vis = self._create_visualization(trans, title, type, dbkey, slug, annotation)

    # Create new VisualizationRevision that will be attached to the viz
    vis_rev = trans.model.VisualizationRevision()
    vis_rev.visualization = vis
    # do NOT alter the dbkey
    vis_rev.dbkey = vis.dbkey
    # do alter the title and config
    vis_rev.title = title

    # -- Validate config. --

    if vis.type != 'trackster':
        # Default action is to save the config as is with no validation.
        vis_rev.config = config
    else:
        def unpack_track(track_dict):
            """ Unpack a track from its json. """
            dataset_dict = track_dict['dataset']
            return {
                "dataset_id": trans.security.decode_id(dataset_dict['id']),
                "hda_ldda": dataset_dict.get('hda_ldda', 'hda'),
                "track_type": track_dict['track_type'],
                "prefs": track_dict['prefs'],
                "mode": track_dict['mode'],
                "filters": track_dict['filters'],
                "tool_state": track_dict['tool_state'],
            }

        def unpack_collection(collection_json):
            """ Unpack a collection from its json. """
            unpacked_drawables = []
            for drawable_json in collection_json['drawables']:
                # Drawables carrying a 'track_type' are tracks; the rest are nested collections.
                unpack = unpack_track if 'track_type' in drawable_json else unpack_collection
                unpacked_drawables.append(unpack(drawable_json))
            return {
                "obj_type": collection_json['obj_type'],
                "drawables": unpacked_drawables,
                "prefs": collection_json.get('prefs', []),
                "filters": collection_json.get('filters', None),
            }

        # TODO: unpack and validate bookmarks:
        def unpack_bookmarks(bookmarks_json):
            return bookmarks_json

        # Unpack and validate view content.
        view_content = unpack_collection(config['view'])
        bookmarks = unpack_bookmarks(config['bookmarks'])
        vis_rev.config = {"view": view_content, "bookmarks": bookmarks}

        # Viewport from payload
        viewport = config.get('viewport')
        if viewport:
            vis_rev.config["viewport"] = {
                'chrom': viewport['chrom'],
                'start': viewport['start'],
                'end': viewport['end'],
                'overview': viewport['overview'],
            }

    vis.latest_revision = vis_rev
    session.add(vis_rev)
    session.flush()

    encoded_id = trans.security.encode_id(vis.id)
    vis_url = url_for(controller='visualization', action=vis.type, id=encoded_id)
    return {"vis_id": encoded_id, "url": vis_url}
def get_tool_def(self, trans, hda):
    """
    Returns definition of an interactive tool for an HDA.

    Returns None when the HDA has no creating job, when the job's tool
    cannot be found in the toolbox, or when the tool has no Trackster
    configuration. Otherwise returns the tool's dict with job parameter
    values filled into its simple inputs.
    """
    # The first creating-job association, if any, identifies the dataset's job.
    job = next((assoc.job for assoc in hda.creating_job_associations), None)
    if not job:
        return None

    tool = trans.app.toolbox.get_tool(job.tool_id, tool_version=job.tool_version)
    # The tool must exist and must have a Trackster configuration.
    if not tool or not tool.trackster_conf:
        return None

    # -- Get tool definition and add input values from job. --
    tool_dict = tool.to_dict(trans, io_details=True)
    raw_values = {param.name: param.value for param in job.parameters}
    param_values = tool.params_from_strings(raw_values, trans.app, ignore_errors=True)

    # Only get values for simple inputs for now.
    skipped_types = ['data', 'hidden_data', 'conditional']
    simple_inputs = [tool_input for tool_input in tool_dict['inputs'] if tool_input['type'] not in skipped_types]
    for tool_input in simple_inputs:
        if 'name' not in tool_input:
            continue
        input_name = tool_input['name']
        if input_name not in param_values:
            continue
        # Add value to tool.
        input_value = param_values[input_name]
        if isinstance(input_value, Dictifiable):
            input_value = input_value.to_dict()
        tool_input['value'] = input_value

    return tool_dict
def get_visualization_config(self, trans, visualization):
    """
    Returns a visualization's configuration.

    For 'trackster' and 'genome' visualizations the latest revision's stored
    config is packed into a dict with `title`, `vis_id`, `tracks`,
    `bookmarks`, `chrom`, `dbkey` and, when stored, `viewport`. For any
    other type the latest revision's config is returned unaltered.
    """
    if visualization.type not in ['trackster', 'genome']:
        # Default action is to return config unaltered.
        return visualization.latest_revision.config

    # Unpack Trackster config.
    latest_revision = visualization.latest_revision
    stored_config = latest_revision.config
    bookmarks = stored_config.get('bookmarks', [])

    def pack_track(track_dict):
        # Stored ids are decoded; the dataset lookup expects an encoded id.
        hda_ldda = track_dict.get('hda_ldda', 'hda')
        encoded_dataset_id = trans.security.encode_id(track_dict['dataset_id'])
        dataset = self.get_hda_or_ldda(trans, hda_ldda, encoded_dataset_id)
        prefs = track_dict.get('prefs', {})
        track_data_provider = trans.app.data_provider_registry.get_data_provider(
            trans, original_dataset=dataset, source='data'
        )
        return {
            "track_type": dataset.datatype.track_type,
            "dataset": trans.security.encode_dict_ids(dataset.to_dict()),
            "prefs": prefs,
            "mode": track_dict.get('mode', 'Auto'),
            "filters": track_dict.get('filters', {'filters' : track_data_provider.get_filters()}),
            "tool": self.get_tool_def(trans, dataset),
            "tool_state": track_dict.get('tool_state', {})
        }

    def pack_drawable(drawable_dict):
        # A drawable with a 'track_type' is a track; otherwise it is a collection.
        if 'track_type' in drawable_dict:
            return pack_track(drawable_dict)
        return pack_collection(drawable_dict)

    def pack_collection(collection_dict):
        packed_drawables = [pack_drawable(child) for child in collection_dict['drawables']]
        return {
            'obj_type': collection_dict['obj_type'],
            'drawables': packed_drawables,
            'prefs': collection_dict.get('prefs', []),
            'filters': collection_dict.get('filters', {})
        }

    def encode_dbkey(dbkey):
        """
        Encodes dbkey as needed. For now, prepends user's public name
        to custom dbkey keys.
        """
        owner = visualization.user
        if 'dbkeys' in owner.preferences and str(dbkey) in owner.preferences['dbkeys']:
            return "{}:{}".format(owner.username, dbkey)
        return dbkey

    # Set tracks.
    if 'tracks' in stored_config:
        # Legacy code.
        tracks = [pack_track(track_dict) for track_dict in stored_config['tracks']]
    elif 'view' in stored_config:
        tracks = [pack_drawable(drawable_dict) for drawable_dict in stored_config['view']['drawables']]
    else:
        tracks = []

    packed_config = {
        "title": visualization.title,
        "vis_id": trans.security.encode_id(visualization.id) if visualization.id is not None else None,
        "tracks": tracks,
        "bookmarks": bookmarks,
        "chrom": "",
        "dbkey": encode_dbkey(visualization.dbkey),
    }

    if 'viewport' in stored_config:
        packed_config['viewport'] = stored_config['viewport']

    return packed_config
def get_new_track_config(self, trans, dataset):
    """
    Returns track configuration dict for a dataset.

    The dict holds the datatype's track type, the dataset name, the
    id-encoded dataset dict, empty prefs and tool state, the data provider's
    filters, and the tool definition from `get_tool_def`.
    """
    # Get data provider.
    registry = trans.app.data_provider_registry
    track_data_provider = registry.get_data_provider(trans, original_dataset=dataset)

    # Get track definition.
    track_config = {}
    track_config["track_type"] = dataset.datatype.track_type
    track_config["name"] = dataset.name
    track_config["dataset"] = trans.security.encode_dict_ids(dataset.to_dict())
    track_config["prefs"] = {}
    track_config["filters"] = {'filters' : track_data_provider.get_filters()}
    track_config["tool"] = self.get_tool_def(trans, dataset)
    track_config["tool_state"] = {}
    return track_config
def get_hda_or_ldda(self, trans, hda_ldda, dataset_id):
    """
    Returns either HDA or LDDA for hda/ldda and id combination.

    Any `hda_ldda` value other than "hda" is treated as a library dataset.
    HDAs are fetched without an ownership check but with an accessibility
    check.
    """
    if hda_ldda != "hda":
        return self.get_library_dataset_dataset_association(trans, dataset_id)
    return self.get_hda(trans, dataset_id, check_ownership=False, check_accessible=True)
def get_hda(self, trans, dataset_id, check_ownership=True, check_accessible=False, check_state=True):
    """
    Get an HDA object by id performing security checks using
    the current transaction.

    Raises HTTPBadRequest when the id cannot be decoded or no HDA is found.
    When `check_state` is set and the dataset is still uploading, the result
    of `trans.show_error_message` is returned instead of the HDA.

    Deprecated in favor of galaxy.managers.hdas.HDAManager.get_accessible(decoded_id, user)
    """
    def invalid_id():
        # Reads `dataset_id` at call time, so the message shows the decoded
        # id once decoding has succeeded.
        return HTTPBadRequest("Invalid dataset id: %s." % str(dataset_id))

    try:
        dataset_id = trans.security.decode_id(dataset_id)
    except (AttributeError, TypeError):
        raise invalid_id()

    try:
        hda_query = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation)
        data = hda_query.get(int(dataset_id))
    except Exception:
        raise invalid_id()
    if not data:
        raise invalid_id()

    if check_ownership:
        # Verify ownership.
        current_user = trans.get_user()
        if not current_user:
            error("Must be logged in to manage Galaxy items")
        if data.history.user != current_user:
            error("%s is not owned by current user" % data.__class__.__name__)

    if check_accessible:
        roles = trans.get_current_user_roles()
        if not trans.app.security_agent.can_access_dataset(roles, data.dataset):
            error("You are not allowed to access this dataset")

    if check_state:
        if data.state == trans.model.Dataset.states.UPLOAD:
            return trans.show_error_message("Please wait until this dataset finishes uploading "
                                            + "before attempting to view it.")
    return data
# -- Helper functions --

def _create_visualization(self, trans, title, type, dbkey=None, slug=None, annotation=None, save=True):
    """
    Create visualization but not first revision. Returns Visualization object.

    If the title is missing, or the slug is invalid or already used by one
    of the user's non-deleted visualizations, a dict with `title_err` and
    `slug_err` messages is returned instead and nothing is created.
    """
    user = trans.get_user()

    # Error checking.
    title_err = slug_err = ""
    if not title:
        title_err = "visualization name is required"
    elif slug:
        if not managers_base.is_valid_slug(slug):
            slug_err = "visualization identifier must consist of only lowercase letters, numbers, and the '-' character"
        elif trans.sa_session.query(trans.model.Visualization).filter_by(user=user, slug=slug, deleted=False).first():
            slug_err = "visualization identifier must be unique"

    if title_err or slug_err:
        return {'title_err': title_err, 'slug_err': slug_err}

    # Create visualization
    new_vis = trans.model.Visualization(user=user, title=title, dbkey=dbkey, type=type)
    if not slug:
        self.create_item_slug(trans.sa_session, new_vis)
    else:
        new_vis.slug = slug
    if annotation:
        clean_annotation = sanitize_html(annotation)
        # TODO: if this is to stay in the mixin, UsesAnnotations should be added to the superclasses
        #   right now this is depending on the classes that include this mixin to have UsesAnnotations
        self.add_item_annotation(trans.sa_session, trans.user, new_vis, clean_annotation)

    if save:
        db_session = trans.sa_session
        db_session.add(new_vis)
        db_session.flush()

    return new_vis


def _get_genome_data(self, trans, dataset, dbkey=None):
    """
    Returns genome-wide data for dataset if available; if not, message is returned.
    """
    # Get data sources.
    data_sources = dataset.get_datasources(trans)

    # Fall back to the given dbkey when the dataset's own is unspecified ("?").
    dataset_dbkey = dataset.dbkey
    query_dbkey = dbkey if dataset_dbkey == "?" else dataset_dbkey
    chroms_info = self.app.genomes.chroms(trans, dbkey=query_dbkey)

    # If there are no messages (messages indicate data is not ready/available), get data.
    source_messages = [source_info['message'] for source_info in data_sources.values()]
    message = self._get_highest_priority_msg(source_messages)
    if message:
        return message

    # HACK: chromatin interactions tracks use data as source.
    source = 'data' if isinstance(dataset.datatype, ChromatinInteractions) else 'index'
    data_provider = trans.app.data_provider_registry.get_data_provider(
        trans, original_dataset=dataset, source=source
    )
    # HACK: pass in additional params which are used for only some
    # types of data providers; level, cutoffs used for summary tree,
    # num_samples for BBI, and interchromosomal used for chromatin interactions.
    return data_provider.get_genome_data(
        chroms_info,
        level=4,
        detail_cutoff=0,
        draw_cutoff=0,
        num_samples=150,
        interchromosomal=True,
    )


# FIXME: this method probably belongs down in the model.Dataset class.
def _get_highest_priority_msg(self, message_list):
    """
    Returns highest priority message from a list of messages.

    Returns None when the list holds no recognized message.
    """
    # For now, priority is: job error (dict), no converter, pending.
    best = None
    for candidate in message_list:
        if candidate is None:
            continue
        if isinstance(candidate, dict):
            # A job error outranks everything; stop at the first one.
            return candidate
        if candidate == "no converter":
            best = candidate
        elif best is None and candidate == "pending":
            best = candidate
    return best
class UsesStoredWorkflowMixin(SharableItemSecurityMixin, UsesAnnotations):
    """ Mixin for controllers that use StoredWorkflow objects. """

    def get_stored_workflow(self, trans, id, check_ownership=True, check_accessible=False):
        """
        Get a StoredWorkflow from the database by id, verifying ownership.

        Runs `security_check` with the given flags and creates a slug for
        the workflow when it has none.
        """
        # Load workflow from database
        manager = workflows.WorkflowsManager(self.app)
        workflow = manager.get_stored_workflow(trans=trans, workflow_id=id)
        if workflow:
            self.security_check(trans, workflow, check_ownership, check_accessible)

            # Older workflows may be missing slugs, so set them here.
            if not workflow.slug:
                db_session = trans.sa_session
                self.create_item_slug(db_session, workflow)
                db_session.flush()
        else:
            error("Workflow not found")

        return workflow

    def get_stored_workflow_steps(self, trans, stored_workflow):
        """ Restores states for a stored workflow's steps. """
        injector = WorkflowModuleInjector(trans)
        for workflow_step in stored_workflow.latest_workflow.steps:
            try:
                injector.inject(workflow_step, exact_tools=False)
            except exceptions.ToolMissingException:
                # Steps whose tool is missing are left without an injected module.
                pass

    def _import_shared_workflow(self, trans, stored):
        """
        Imports a shared workflow: copies `stored` (latest workflow and
        annotations) into a new StoredWorkflow owned by the current user.
        """
        # Copy workflow.
        imported_stored = model.StoredWorkflow()
        imported_stored.name = "imported: " + stored.name
        workflow_copy = stored.latest_workflow.copy(user=trans.user)
        workflow_copy.stored_workflow = imported_stored
        imported_stored.latest_workflow = workflow_copy
        imported_stored.user = trans.user

        # Save new workflow.
        session = trans.sa_session
        session.add(imported_stored)
        session.flush()

        # Copy annotations.
        self.copy_item_annotation(session, stored.user, stored, imported_stored.user, imported_stored)
        for position, source_step in enumerate(stored.latest_workflow.steps):
            target_step = imported_stored.latest_workflow.steps[position]
            self.copy_item_annotation(session, stored.user, source_step, imported_stored.user, target_step)
        session.flush()
        return imported_stored

    def _workflow_from_dict(self, trans, data, source=None, add_to_menu=False, publish=False, exact_tools=True, fill_defaults=False, from_tool_form=False):
        """
        Creates a workflow from a dict. Created workflow is stored in the database and returned.

        Returns a `(stored_workflow, missing_tools)` pair.
        """
        # TODO: replace this method with direct access to manager.
        contents_manager = workflows.WorkflowContentsManager(self.app)
        raw_description = contents_manager.ensure_raw_description(data)
        build_options = dict(
            source=source,
            add_to_menu=add_to_menu,
            publish=publish,
            exact_tools=exact_tools,
            fill_defaults=fill_defaults,
            from_tool_form=from_tool_form,
        )
        created = contents_manager.build_workflow_from_raw_description(trans, raw_description, **build_options)
        return created.stored_workflow, created.missing_tools

    def _workflow_to_dict(self, trans, stored):
        """
        Converts a workflow to a dict of attributes suitable for exporting.
        """
        contents_manager = workflows.WorkflowContentsManager(self.app)
        return contents_manager.workflow_to_dict(trans, stored)
class UsesFormDefinitionsMixin:
    """Mixin for controllers that use Galaxy form objects."""

    def get_all_forms(self, trans, all_versions=False, filter=None, form_type='All'):
        """
        Return form definitions.

        If all_versions is True, return the query over every version of
        every form (the form_definition table). Otherwise return a list of
        the latest forms from the form_definition_current table, optionally
        narrowed by the `filter` keyword dict and by `form_type`.
        """
        form_model = trans.app.model
        if all_versions:
            return trans.sa_session.query(form_model.FormDefinition)

        current_forms = trans.sa_session.query(form_model.FormDefinitionCurrent)
        if filter:
            current_forms = current_forms.filter_by(**filter)

        latest_forms = [current.latest_form for current in current_forms]
        if form_type == 'All':
            return latest_forms
        return [form for form in latest_forms if form.type == form_type]

    def save_widget_field(self, trans, field_obj, widget_name, **kwd):
        """
        Save a form_builder field object.

        Only `UserAddress` objects are handled: each address attribute is
        read from the `<widget_name>_<suffix>` request parameter, then the
        object is added to the session and flushed.
        """
        params = util.Params(kwd)
        if isinstance(field_obj, trans.model.UserAddress):
            # (attribute on the address, suffix of the request parameter)
            address_fields = (
                ('desc', 'short_desc'),
                ('name', 'name'),
                ('institution', 'institution'),
                ('address', 'address'),
                ('city', 'city'),
                ('state', 'state'),
                ('postal_code', 'postal_code'),
                ('country', 'country'),
                ('phone', 'phone'),
            )
            for attribute, suffix in address_fields:
                param_name = ('%s_' + suffix) % widget_name
                setattr(field_obj, attribute, util.restore_text(params.get(param_name, '')))
            trans.sa_session.add(field_obj)
            trans.sa_session.flush()

    def _address_field_value(self, trans, user, field_name, input_value, kwd):
        """
        Resolve the stored value of an address field: the id of a newly
        saved address for 'new', '' for an empty/none selection, otherwise
        the submitted value as an int.
        """
        selection = util.restore_text(input_value)
        if selection == 'new':
            # Save this new address in the list of this user's addresses
            user_address = trans.model.UserAddress(user=user)
            self.save_widget_field(trans, user_address, field_name, **kwd)
            trans.sa_session.refresh(user)
            return int(user_address.id)
        if selection in ['', 'none', 'None', None]:
            return ''
        return int(selection)

    def get_form_values(self, trans, user, form_definition, **kwd):
        '''
        Returns the name:value dictionary containing all the form values
        '''
        params = util.Params(kwd)
        values = {}
        for field in form_definition.fields:
            field_type = field['type']
            field_name = field['name']
            input_value = params.get(field_name, '')
            if field_type == AddressField.__name__:
                field_value = self._address_field_value(trans, user, field_name, input_value, kwd)
            elif field_type == CheckboxField.__name__:
                field_value = CheckboxField.is_checked(input_value)
            elif field_type == PasswordField.__name__:
                # Passwords are taken from the raw keywords, not from params.
                field_value = kwd.get(field_name, '')
            else:
                field_value = util.restore_text(input_value)
            values[field_name] = field_value
        return values
class SharableMixin:
    """ Mixin for a controller that manages an item that can be shared. """

    # Expected to be provided by the including controller; `sharing` reads
    # `manager.model_class` and calls `serializer.serialize_to_view`.
    manager = None
    serializer = None

    # -- Implemented methods. --

    def _is_valid_slug(self, slug):
        """ Returns true if slug is valid. """
        return managers_base.is_valid_slug(slug)

    @web.expose
    @web.require_login("modify Galaxy items")
    def set_slug_async(self, trans, id, new_slug):
        """
        Set the item's slug to `new_slug` unless the item's user already has
        an item of the same class with that slug. Returns the item's slug.
        """
        item = self.get_item(trans, id)
        if item:
            # Only update slug if slug is not already in use.
            if trans.sa_session.query(item.__class__).filter_by(user=item.user, slug=new_slug).count() == 0:
                item.slug = new_slug
                trans.sa_session.flush()
        return item.slug

    def _make_item_accessible(self, sa_session, item):
        """ Makes item accessible--viewable and importable--and sets item's slug.
            Does not flush/commit changes, however. Item must have name, user,
            importable, and slug attributes. """
        item.importable = True
        self.create_item_slug(sa_session, item)

    def create_item_slug(self, sa_session, item):
        """ Create/set item slug. Slug is unique among user's importable items
            for item's class. Returns true if item's slug was set/changed; false
            otherwise.
        """
        cur_slug = item.slug

        # Setup slug base.
        if cur_slug is None or cur_slug == "":
            # Item can have either a name or a title.
            if hasattr(item, 'name'):
                item_name = item.name
            elif hasattr(item, 'title'):
                item_name = item.title
            slug_base = util.ready_name_for_url(item_name.lower())
        else:
            slug_base = cur_slug

        # Using slug base, find a slug that is not taken. If slug is taken,
        # add integer to end.
        new_slug = slug_base
        count = 1
        item_class = item.__class__
        # Ensure unique across model class and user and don't include this item
        # in the check in case it has previously been assigned a valid slug.
        while sa_session.query(item_class).filter(item_class.user == item.user, item_class.slug == new_slug, item_class.id != item.id).count() != 0:
            # Slug taken; choose a new slug based on count. This approach can
            # handle numerous items with the same name gracefully.
            new_slug = '%s-%i' % (slug_base, count)
            count += 1

        # Set slug and return.
        item.slug = new_slug
        # True only when the slug actually changed, as documented above.
        return item.slug != cur_slug

    @web.legacy_expose_api
    def sharing(self, trans, id, payload=None, **kwd):
        """
        Apply the sharing `action` named in `payload` (if any) to the item
        and return the item's "sharing" view.

        Recognized actions: make_accessible_via_link,
        make_accessible_and_publish, publish, disable_link_access, unpublish,
        disable_link_access_and_unpublish and unshare_user. The returned dict
        also lists `users_shared_with` and carries `skipped: True` when some
        member datasets could not be made public.

        Raises `exceptions.MessageException` when publishing an item that is
        not importable, or when unsharing from a user the item was not shared
        with.
        """
        skipped = False
        class_name = self.manager.model_class.__name__
        item = self.get_object(trans, id, class_name, check_ownership=True, check_accessible=True, deleted=False)
        if payload and payload.get("action"):
            action = payload.get("action")
            if action == "make_accessible_via_link":
                self._make_item_accessible(trans.sa_session, item)
                if hasattr(item, "has_possible_members") and item.has_possible_members and payload.get("make_members_public", False):
                    _, skipped = self._make_members_public(trans, item)
            elif action == "make_accessible_and_publish":
                self._make_item_accessible(trans.sa_session, item)
                if hasattr(item, "has_possible_members") and item.has_possible_members and payload.get("make_members_public", False):
                    _, skipped = self._make_members_public(trans, item)
                item.published = True
            elif action == "publish":
                if item.importable:
                    item.published = True
                    if hasattr(item, "has_possible_members") and item.has_possible_members and payload.get("make_members_public", False):
                        _, skipped = self._make_members_public(trans, item)
                else:
                    raise exceptions.MessageException("%s not importable." % class_name)
            elif action == "disable_link_access":
                item.importable = False
            elif action == "unpublish":
                item.published = False
            elif action == "disable_link_access_and_unpublish":
                item.importable = item.published = False
            elif action == "unshare_user":
                user = trans.sa_session.query(trans.app.model.User).get(self.decode_id(payload.get("user_id")))
                class_name_lc = class_name.lower()
                # The association class and its item attribute are derived from the model class name.
                ShareAssociation = getattr(trans.app.model, "%sUserShareAssociation" % class_name)
                usas = trans.sa_session.query(ShareAssociation).filter_by(**{"user": user, class_name_lc: item}).all()
                if not usas:
                    raise exceptions.MessageException("%s was not shared with user." % class_name)
                for usa in usas:
                    trans.sa_session.delete(usa)
            trans.sa_session.add(item)
            trans.sa_session.flush()
        if item.importable and not item.slug:
            self._make_item_accessible(trans.sa_session, item)
        item_dict = self.serializer.serialize_to_view(item, user=trans.user, trans=trans, default_view="sharing")
        item_dict["users_shared_with"] = [{"id": self.app.security.encode_id(a.user.id), "email": a.user.email} for a in item.users_shared_with]
        if skipped:
            item_dict["skipped"] = True
        return item_dict

    def _make_members_public(self, trans, item):
        """ Make the non-purged datasets in history public
        Performs permissions check.

        Returns `(item, skipped)`, where `skipped` is True if at least one
        dataset could not be made public.
        """
        # TODO eventually we should handle more classes than just History
        skipped = False
        for hda in item.activatable_datasets:
            dataset = hda.dataset
            if not trans.app.security_agent.dataset_is_public(dataset):
                if trans.app.security_agent.can_manage_dataset(trans.user.all_roles(), dataset):
                    try:
                        trans.app.security_agent.make_dataset_public(hda.dataset)
                    except Exception:
                        log.warning("Unable to make dataset with id: %s public", dataset.id)
                        skipped = True
                else:
                    log.warning("User without permissions tried to make dataset with id: %s public", dataset.id)
                    skipped = True
        return item, skipped

    # -- Abstract methods. --

    @web.expose
    @web.require_login("share Galaxy items")
    def share(self, trans, id=None, email="", **kwd):
        """ Handle sharing an item with a particular user. """
        raise NotImplementedError()

    @web.expose
    def display_by_username_and_slug(self, trans, username, slug):
        """ Display item by username and slug. """
        raise NotImplementedError()

    @web.expose
    @web.require_login("get item content asynchronously")
    def get_item_content_async(self, trans, id):
        """ Returns item content in HTML format. """
        raise NotImplementedError()

    def get_item(self, trans, id):
        """ Return item based on id. """
        raise NotImplementedError()
class UsesQuotaMixin:
    """ Mixin for controllers that look up Quota objects. """

    def get_quota(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
        """
        Returns the 'Quota' object for `id` via `get_object`.

        `check_ownership` and `check_accessible` are accepted but not
        forwarded: both checks are always passed as False.
        """
        lookup_options = dict(check_ownership=False, check_accessible=False, deleted=deleted)
        return self.get_object(trans, id, 'Quota', **lookup_options)
class UsesTagsMixin(SharableItemSecurityMixin):
    """ Mixin for controllers that read and modify the tags of items. """

    def get_tag_handler(self, trans):
        """ Returns the application's tag handler. """
        return trans.app.tag_handler

    def _get_user_tags(self, trans, item_class_name, id):
        """ Returns the tags on the item that belong to the current user. """
        user = trans.user
        tagged_item = self._get_tagged_item(trans, item_class_name, id)
        return [tag for tag in tagged_item.tags if tag.user == user]

    def _get_tagged_item(self, trans, item_class_name, id, check_ownership=True):
        """ Returns the item to tag, always checking accessibility. """
        tagged_item = self.get_object(trans, id, item_class_name, check_ownership=check_ownership, check_accessible=True)
        return tagged_item

    def _remove_items_tag(self, trans, item_class_name, id, tag_name):
        """Remove a tag from an item."""
        user = trans.user
        tagged_item = self._get_tagged_item(trans, item_class_name, id)
        deleted = tagged_item and self.get_tag_handler(trans).remove_item_tag(trans, user, tagged_item, tag_name)
        trans.sa_session.flush()
        return deleted

    def _apply_item_tag(self, trans, item_class_name, id, tag_name, tag_value=None):
        """ Apply a tag to an item for the current user; returns the tag association. """
        user = trans.user
        tagged_item = self._get_tagged_item(trans, item_class_name, id)
        tag_assoc = self.get_tag_handler(trans).apply_item_tag(user, tagged_item, tag_name, tag_value)
        trans.sa_session.flush()
        return tag_assoc

    def _get_item_tag_assoc(self, trans, item_class_name, id, tag_name):
        """ Returns the current user's association of `tag_name` with the item. """
        user = trans.user
        tagged_item = self._get_tagged_item(trans, item_class_name, id)
        log.debug("In get_item_tag_assoc with tagged_item %s", tagged_item)
        return self.get_tag_handler(trans)._get_item_tag_assoc(user, tagged_item, tag_name)

    def set_tags_from_list(self, trans, item, new_tags_list, user=None):
        """ Set the item's tags from `new_tags_list` using a fresh GalaxyTagHandler. """
        tag_handler = tags.GalaxyTagHandler(trans.app.model.context)
        return tag_handler.set_tags_from_list(user, item, new_tags_list)

    def get_user_tags_used(self, trans, user=None):
        """
        Return a list of distinct 'user_tname:user_value' strings that the
        given user has used.

        user defaults to trans.user.
        Returns an empty list if no user is given and trans.user is anonymous.
        """
        # TODO: for lack of a UsesUserMixin - placing this here - maybe into UsesTags, tho
        user = user or trans.user
        if not user:
            return []

        # get all the taggable model TagAssociations
        tag_models = [v.tag_assoc_class for v in trans.app.tag_handler.item_tag_assoc_info.values()]
        # create a union of subqueries for each for this user - getting only the tname and user_value
        all_tags_query = None
        for tag_model in tag_models:
            # Filter on the resolved `user`, so an explicitly passed user is
            # honored rather than always using trans.user.
            subq = (trans.sa_session.query(tag_model.user_tname, tag_model.user_value)
                    .filter(tag_model.user == user))
            all_tags_query = subq if all_tags_query is None else all_tags_query.union(subq)

        # if nothing init'd the query, bail
        if all_tags_query is None:
            return []

        # boil the tag tuples down into a sorted list of DISTINCT name:val strings
        tag_rows = all_tags_query.distinct().all()
        return sorted(((name + ':' + val) if val else name) for name, val in tag_rows)
class UsesExtendedMetadataMixin(SharableItemSecurityMixin):
    """ Mixin for getting and setting item extended metadata. """

    def get_item_extended_metadata_obj(self, trans, item):
        """
        Given an item object (such as a LibraryDatasetDatasetAssociation), find the object
        of the associated extended metadata
        """
        return item.extended_metadata if item.extended_metadata else None

    def _assign_extended_metadata(self, trans, item, value, check_writable):
        """
        Store `value` as the extended metadata of a library or history
        dataset association, subject to the permission checks; items of any
        other class are left untouched.
        """
        item_class = item.__class__
        if item_class == LibraryDatasetDatasetAssociation:
            may_modify = not check_writable or trans.app.security_agent.can_modify_library_item(
                trans.get_current_user_roles(), item, trans.user
            )
            if may_modify:
                item.extended_metadata = value
                trans.sa_session.flush()
        if item_class == HistoryDatasetAssociation:
            # Writable access additionally requires ownership of the item.
            history = self.security_check(trans, item, check_ownership=bool(check_writable), check_accessible=True)
            if history:
                item.extended_metadata = value
                trans.sa_session.flush()

    def set_item_extended_metadata_obj(self, trans, item, extmeta_obj, check_writable=False):
        """ Attach `extmeta_obj` to `item` as its extended metadata. """
        self._assign_extended_metadata(trans, item, extmeta_obj, check_writable)

    def unset_item_extended_metadata_obj(self, trans, item, check_writable=False):
        """ Clear the extended metadata of `item`. """
        self._assign_extended_metadata(trans, item, None, check_writable)

    def create_extended_metadata(self, trans, extmeta):
        """
        Create/index an extended metadata object. The returned object is
        not associated with any items
        """
        db_session = trans.sa_session
        ex_meta = ExtendedMetadata(extmeta)
        db_session.add(ex_meta)
        db_session.flush()
        # One index row per leaf value found in the metadata.
        for path, value in self._scan_json_block(extmeta):
            db_session.add(ExtendedMetadataIndex(ex_meta, path, value))
        db_session.flush()
        return ex_meta

    def delete_extended_metadata(self, trans, item):
        """ Delete `item` if it is an ExtendedMetadata object; otherwise do nothing. """
        if item.__class__ != ExtendedMetadata:
            return
        trans.sa_session.delete(item)
        trans.sa_session.flush()

    def _scan_json_block(self, meta, prefix=""):
        """
        Scan a json style data structure, and emit every leaf field and its
        value as a `(path, utf8-encoded string)` pair.

        Example paths

        Data
        { "data" : [ 1, 2, 3 ] }

        Path:
        /data[0] == 1
        /data[1] == 2
        /data[2] == 3
        """
        if isinstance(meta, dict):
            for key in meta:
                yield from self._scan_json_block(meta[key], prefix + "/" + key)
        elif isinstance(meta, list):
            for index, element in enumerate(meta):
                yield from self._scan_json_block(element, prefix + "[%d]" % (index))
        else:
            # BUG: Everything is cast to string, which can lead to false positives
            # for cross type comparisions, ie "True" == True
            yield prefix, ("%s" % (meta)).encode("utf8", errors='replace')
class ControllerUnavailable(Exception):
    """
    Exception subclass that adds no behavior of its own.

    NOTE(review): the previous docstring read "Deprecated: `BaseController`
    used to be available under the name `Root`", which describes an alias
    rather than this exception - confirm the intended meaning.
    """
    pass
# ---- Utility methods -------------------------------------------------------
def sort_by_attr(seq, attr):
    """
    Sort the sequence of objects by object's attribute

    Arguments:
    seq  - the list or any sequence (including immutable one) of objects to sort.
    attr - the name of attribute to sort by

    Returns a new list; `seq` itself is not modified.
    """
    # A keyed sort is stable and compares only the attribute values, so
    # objects with equal attributes keep their input order and are never
    # compared with each other (which can be expensive or prohibited).
    return sorted(seq, key=lambda obj: getattr(obj, attr))