Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.controllers.tag

"""
Tags Controller: handles tagging/untagging of entities
and provides autocomplete support.
"""

from six import text_type
from sqlalchemy.sql import select
from sqlalchemy.sql.expression import and_, func

from galaxy import web
from galaxy.web.base.controller import BaseUIController, UsesTagsMixin

import logging
log = logging.getLogger( __name__ )


[docs]class TagsController ( BaseUIController, UsesTagsMixin ):
[docs] @web.expose @web.require_login( "edit item tags" ) def get_tagging_elt_async( self, trans, item_id, item_class, elt_context="" ): """ Returns HTML for editing an item's tags. """ item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) if not item: return trans.show_error_message( "No item of class %s with id %s " % ( item_class, item_id ) ) return trans.fill_template( "/tagging_common.mako", tag_type="individual", user=trans.user, tagged_item=item, elt_context=elt_context, in_form=False, input_size="22", tag_click_fn="default_tag_click_fn", use_toggle_link=False )
[docs] @web.expose @web.require_login( "add tag to an item" ) def add_tag_async( self, trans, item_id=None, item_class=None, new_tag=None, context=None ): """ Add tag to an item. """ # Apply tag. item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) user = trans.user self.get_tag_handler( trans ).apply_item_tags( user, item, new_tag.encode( 'utf-8' ) ) trans.sa_session.flush() # Log. params = dict( item_id=item.id, item_class=item_class, tag=new_tag ) trans.log_action( user, text_type( "tag" ), context, params )
[docs] @web.expose @web.require_login( "remove tag from an item" ) def remove_tag_async( self, trans, item_id=None, item_class=None, tag_name=None, context=None ): """ Remove tag from an item. """ # Remove tag. item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) user = trans.user self.get_tag_handler( trans ).remove_item_tag( user, item, tag_name.encode( 'utf-8' ) ) trans.sa_session.flush() # Log. params = dict( item_id=item.id, item_class=item_class, tag=tag_name ) trans.log_action( user, text_type( "untag" ), context, params )
# Retag an item. All previous tags are deleted and new tags are applied.
[docs] @web.expose @web.require_login( "Apply a new set of tags to an item; previous tags are deleted." ) def retag_async( self, trans, item_id=None, item_class=None, new_tags=None ): """ Apply a new set of tags to an item; previous tags are deleted. """ # Apply tags. item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) user = trans.user self.get_tag_handler( trans ).delete_item_tags( user, item ) self.get_tag_handler( trans ).apply_item_tags( user, item, new_tags.encode( 'utf-8' ) ) trans.sa_session.flush()
[docs] @web.expose @web.require_login( "get autocomplete data for an item's tags" ) def tag_autocomplete_data( self, trans, q=None, limit=None, timestamp=None, item_id=None, item_class=None ): """ Get autocomplete data for an item's tags. """ # Get item, do security check, and get autocomplete data. item = None if item_id is not None: item = self._get_item( trans, item_class, trans.security.decode_id( item_id ) ) user = trans.user item_class = self.get_class( item_class ) q = '' if q is None else q q = q.encode( 'utf-8' ) if q.find( ":" ) == -1: return self._get_tag_autocomplete_names( trans, q, limit, timestamp, user, item, item_class ) else: return self._get_tag_autocomplete_values( trans, q, limit, timestamp, user, item, item_class )
def _get_tag_autocomplete_names( self, trans, q, limit, timestamp, user=None, item=None, item_class=None ): """ Returns autocomplete data for tag names ordered from most frequently used to least frequently used. """ # Get user's item tags and usage counts. # Get item's class object and item-tag association class. if item is None and item_class is None: raise RuntimeError( "Both item and item_class cannot be None" ) elif item is not None: item_class = item.__class__ item_tag_assoc_class = self.get_tag_handler( trans ).get_tag_assoc_class( item_class ) # Build select statement. cols_to_select = [ item_tag_assoc_class.table.c.tag_id, func.count( '*' ) ] from_obj = item_tag_assoc_class.table.join( item_class.table ).join( trans.app.model.Tag.table ) where_clause = and_( trans.app.model.Tag.table.c.name.like( q + "%" ), item_tag_assoc_class.table.c.user_id == user.id ) order_by = [ func.count( "*" ).desc() ] group_by = item_tag_assoc_class.table.c.tag_id # Do query and get result set. query = select( columns=cols_to_select, from_obj=from_obj, whereclause=where_clause, group_by=group_by, order_by=order_by, limit=limit ) result_set = trans.sa_session.execute( query ) # Create and return autocomplete data. ac_data = "#Header|Your Tags\n" for row in result_set: tag = self.get_tag_handler( trans ).get_tag_by_id( row[0] ) # Exclude tags that are already applied to the item. if ( item is not None ) and ( self.get_tag_handler( trans ).item_has_tag( trans.user, item, tag ) ): continue # Add tag to autocomplete data. Use the most frequent name that user # has employed for the tag. tag_names = self._get_usernames_for_tag( trans, trans.user, tag, item_class, item_tag_assoc_class ) ac_data += tag_names[0] + "|" + tag_names[0] + "\n" return ac_data def _get_tag_autocomplete_values( self, trans, q, limit, timestamp, user=None, item=None, item_class=None ): """ Returns autocomplete data for tag values ordered from most frequently used to least frequently used. """ tag_name_and_value = q.split( ":" ) tag_name = tag_name_and_value[0] tag_value = tag_name_and_value[1] tag = self.get_tag_handler( trans ).get_tag_by_name( tag_name ) # Don't autocomplete if tag doesn't exist. if tag is None: return "" # Get item's class object and item-tag association class. if item is None and item_class is None: raise RuntimeError( "Both item and item_class cannot be None" ) elif item is not None: item_class = item.__class__ item_tag_assoc_class = self.get_tag_handler( trans ).get_tag_assoc_class( item_class ) # Build select statement. cols_to_select = [ item_tag_assoc_class.table.c.value, func.count( '*' ) ] from_obj = item_tag_assoc_class.table.join( item_class.table ).join( trans.app.model.Tag.table ) where_clause = and_( item_tag_assoc_class.table.c.user_id == user.id, trans.app.model.Tag.table.c.id == tag.id, item_tag_assoc_class.table.c.value.like( tag_value + "%" ) ) order_by = [ func.count("*").desc(), item_tag_assoc_class.table.c.value ] group_by = item_tag_assoc_class.table.c.value # Do query and get result set. query = select( columns=cols_to_select, from_obj=from_obj, whereclause=where_clause, group_by=group_by, order_by=order_by, limit=limit ) result_set = trans.sa_session.execute( query ) # Create and return autocomplete data. ac_data = "#Header|Your Values for '%s'\n" % ( tag_name ) tag_uname = self._get_usernames_for_tag( trans, trans.user, tag, item_class, item_tag_assoc_class )[0] for row in result_set: ac_data += tag_uname + ":" + row[0] + "|" + row[0] + "\n" return ac_data def _get_usernames_for_tag( self, trans, user, tag, item_class, item_tag_assoc_class ): """ Returns an ordered list of the user names for a tag; list is ordered from most popular to least popular name. """ # Build select stmt. cols_to_select = [ item_tag_assoc_class.table.c.user_tname, func.count( '*' ) ] where_clause = and_( item_tag_assoc_class.table.c.user_id == user.id, item_tag_assoc_class.table.c.tag_id == tag.id ) group_by = item_tag_assoc_class.table.c.user_tname order_by = [ func.count( "*" ).desc() ] # Do query and get result set. query = select( columns=cols_to_select, whereclause=where_clause, group_by=group_by, order_by=order_by ) result_set = trans.sa_session.execute( query ) user_tag_names = list() for row in result_set: user_tag_names.append( row[0] ) return user_tag_names def _get_item( self, trans, item_class_name, id ): """ Get an item based on type and id. """ item_class = self.get_tag_handler( trans ).item_tag_assoc_info[item_class_name].item_class item = trans.sa_session.query( item_class ).filter( item_class.id == id)[0] return item