Source code for galaxy.managers.tags

import logging
import re

from six import string_types
from sqlalchemy.sql import select
from sqlalchemy.sql.expression import func

import galaxy.model
from galaxy.util import (
    strip_control_characters,
    unicodify,
)

log = logging.getLogger(__name__)


# Item-specific information needed to perform tagging.
class ItemTagAssocInfo(object):
    """
    Plain holder tying a taggable item class to its tag-association class
    and to the column of the association table that stores the item id.
    """

    def __init__(self, item_class, tag_assoc_class, item_id_col):
        """
        Store the three pieces of tagging metadata unchanged.

        :param item_class: class of the taggable item
        :param tag_assoc_class: item-tag association class for ``item_class``
        :param item_id_col: item id column of the association table
        """
        (self.item_class,
         self.tag_assoc_class,
         self.item_id_col) = (item_class, tag_assoc_class, item_id_col)
class TagManager(object):
    """
    Manages CRUD operations related to tagging objects.

    Tag strings have the form ``name`` or ``name<sep>value`` where ``<sep>``
    is one of ``key_value_separators``; several tags are separated by one of
    ``tag_separators``.  Subclasses register the taggable item classes in
    ``item_tag_assoc_info`` (keyed by the item class' ``__name__``).
    """

    def __init__(self, sa_session):
        """
        :param sa_session: session object used for queries, deletes and flushes
        """
        self.sa_session = sa_session
        # Minimum tag length.
        self.min_tag_len = 2
        # Maximum tag length.
        self.max_tag_len = 255
        # Tag separator.
        self.tag_separators = ',;'
        # Hierarchy separator.
        self.hierarchy_separator = '.'
        # Key-value separator.
        self.key_value_separators = "=:"
        # Initialize with known classes - add to this in subclasses.
        self.item_tag_assoc_info = {}

    def _get_item_tag_strings(self, item):
        """
        Return the item's current tags as a set of ``name[:value]`` strings.

        ``get_tags_str`` joins tags with ``", "``, so every piece is stripped
        after splitting on ``","``; otherwise all tags but the first would
        carry a leading space and never compare equal to caller-supplied
        tag strings.  Empty pieces (item without tags) are dropped.
        """
        pieces = (piece.strip() for piece in self.get_tags_str(item.tags).split(','))
        return set(piece for piece in pieces if piece)

    def add_tags_from_list(self, user, item, new_tags_list):
        """
        Add the tag strings in ``new_tags_list`` to the tags ``item`` already has.

        :returns: ``item.tags`` after the tags have been rewritten
        """
        new_tags_set = set(new_tags_list)
        if item.tags:
            new_tags_set.update(self._get_item_tag_strings(item))
        return self.set_tags_from_list(user, item, new_tags_set)

    def remove_tags_from_list(self, user, item, tag_to_remove_list):
        """
        Remove the tag strings in ``tag_to_remove_list`` from ``item``.

        Entries are matched against the ``name[:value]`` form produced by
        ``get_tags_str``.

        :returns: ``item.tags`` after the tags have been rewritten
        """
        tags_set = self._get_item_tag_strings(item)
        tags_set -= set(tag_to_remove_list)
        return self.set_tags_from_list(user, item, tags_set)

    def set_tags_from_list(self, user, item, new_tags_list):
        """
        Replace all tags of ``item`` with those in ``new_tags_list``.

        :returns: ``item.tags``
        """
        # precondition: item is already security checked against user
        # precondition: incoming tags is a list of sanitized/formatted strings
        self.delete_item_tags(user, item)
        new_tags_str = ','.join(new_tags_list)
        self.apply_item_tags(user, item, unicodify(new_tags_str, 'utf-8'))
        self.sa_session.flush()
        return item.tags

    def get_tag_assoc_class(self, item_class):
        """
        Returns tag association class for item class.

        :raises KeyError: if ``item_class.__name__`` has not been registered
        """
        return self.item_tag_assoc_info[item_class.__name__].tag_assoc_class

    def get_id_col_in_item_tag_assoc_table(self, item_class):
        """
        Returns item id column in class' item-tag association table.

        :raises KeyError: if ``item_class.__name__`` has not been registered
        """
        return self.item_tag_assoc_info[item_class.__name__].item_id_col

    def get_community_tags(self, item=None, limit=None):
        """
        Returns community tags for an item.

        Tags are ordered by how many associations use them, most used first.

        :param item: the tagged item; ``None`` (the default) yields ``[]``
        :param limit: passed through as the ``limit`` of the select statement
        """
        # Without an item there is nothing to look up; previously the default
        # argument failed with a KeyError for 'NoneType'.
        if item is None:
            return []
        # Get item-tag association class.
        item_class = item.__class__
        item_tag_assoc_class = self.get_tag_assoc_class(item_class)
        if not item_tag_assoc_class:
            return []
        # Build select statement.
        cols_to_select = [item_tag_assoc_class.table.c.tag_id, func.count('*')]
        from_obj = item_tag_assoc_class.table.join(item_class.table).join(galaxy.model.Tag.table)
        where_clause = (self.get_id_col_in_item_tag_assoc_table(item_class) == item.id)
        order_by = [func.count("*").desc()]
        group_by = item_tag_assoc_class.table.c.tag_id
        # Do query and get result set.
        query = select(columns=cols_to_select,
                       from_obj=from_obj,
                       whereclause=where_clause,
                       group_by=group_by,
                       order_by=order_by,
                       limit=limit)
        result_set = self.sa_session.execute(query)
        # Return community tags; the first column of each row is the tag id.
        community_tags = []
        for row in result_set:
            tag_id = row[0]
            community_tags.append(self.get_tag_by_id(tag_id))
        return community_tags

    def get_tool_tags(self):
        """Returns the distinct Tag objects referenced by tool-tag associations."""
        query = select(columns=[galaxy.model.ToolTagAssociation.table.c.tag_id],
                       from_obj=galaxy.model.ToolTagAssociation.table).distinct()
        result_set = self.sa_session.execute(query)
        tags = []
        for row in result_set:
            tag_id = row[0]
            tags.append(self.get_tag_by_id(tag_id))
        return tags

    def remove_item_tag(self, user, item, tag_name):
        """
        Remove a tag from an item.

        :returns: True if an association was found and removed, else False
        """
        # Get item tag association.
        item_tag_assoc = self._get_item_tag_assoc(user, item, tag_name)
        # Remove association.
        if item_tag_assoc:
            # Delete association.
            self.sa_session.delete(item_tag_assoc)
            item.tags.remove(item_tag_assoc)
            return True
        return False

    def delete_item_tags(self, user, item):
        """Delete tags from an item."""
        # Delete item-tag associations.
        for tag in item.tags:
            self.sa_session.delete(tag)
        # Delete tags from item.
        del item.tags[:]

    def item_has_tag(self, user, item, tag):
        """
        Returns True if item has a given tag.

        :param tag: a tag name string, a ``galaxy.model.Tag`` or a
            ``galaxy.model.ItemTagAssociation``; any other type yields False
        """
        # Get tag name.
        if isinstance(tag, string_types):
            tag_name = tag
        elif isinstance(tag, galaxy.model.Tag):
            tag_name = tag.name
        elif isinstance(tag, galaxy.model.ItemTagAssociation):
            tag_name = tag.user_tname
        else:
            # Unsupported tag type: previously this fell through and raised
            # UnboundLocalError on tag_name.
            return False
        # Check for an item-tag association to see if item has a given tag.
        return bool(self._get_item_tag_assoc(user, item, tag_name))

    def apply_item_tag(self, user, item, name, value=None):
        """
        Apply a single tag (optionally with a value) to an item.

        :param name: tag name as entered by the user; ``None`` is ignored
        :param value: optional tag value as entered by the user
        :returns: the item-tag association, or ``None`` if ``name`` is ``None``
            or no tag could be obtained for it
        """
        # parse_tags yields None names for empty/too-short tags; skip those
        # instead of failing on name.lower().
        if name is None:
            return None
        # Use lowercase name for searching/creating tag.
        lc_name = name.lower()
        # Associations store the lower-cased value, so compare like with like.
        lc_value = value.lower() if value else None
        # Get or create item-tag association.
        item_tag_assoc = self._get_item_tag_assoc(user, item, lc_name)
        # If the association does not exist, or if it has a different value, add another.
        # We do allow multiple associations with different values.
        if not item_tag_assoc or item_tag_assoc.value != lc_value:
            # Create tag; if None, skip the tag (and log error).
            tag = self._get_or_create_tag(lc_name)
            if not tag:
                log.warning("Failed to create tag with name %s", lc_name)
                return None
            # Create tag association based on item class.
            item_tag_assoc_class = self.get_tag_assoc_class(item.__class__)
            item_tag_assoc = item_tag_assoc_class()
            # Add tag to association.
            item.tags.append(item_tag_assoc)
            item_tag_assoc.tag = tag
            item_tag_assoc.user = user
        # Apply attributes to item-tag association.
        item_tag_assoc.user_tname = name
        item_tag_assoc.user_value = value
        item_tag_assoc.value = lc_value
        # Need to flush to get an ID. We need an ID to apply multiple tags with the same tname to an object.
        self.sa_session.flush()
        return item_tag_assoc

    def apply_item_tags(self, user, item, tags_str):
        """Apply tags to an item."""
        # Parse tags.
        parsed_tags = self.parse_tags(tags_str)
        # Apply each tag.
        for name, value in parsed_tags:
            self.apply_item_tag(user, item, name, value)

    def get_tags_str(self, tags):
        """
        Build a string from an item's tags.

        Each tag is rendered as ``user_tname`` or ``user_tname:user_value``
        and the pieces are joined with ``", "``.
        """
        # Return empty string if there are no tags.
        if not tags:
            return ""
        # Create string of tags.
        tags_str_list = list()
        for tag in tags:
            tag_str = tag.user_tname
            if tag.value is not None:
                tag_str += ":" + tag.user_value
            tags_str_list.append(tag_str)
        return ", ".join(tags_str_list)

    def get_tag_by_id(self, tag_id):
        """Get a Tag object from a tag id."""
        return self.sa_session.query(galaxy.model.Tag).filter_by(id=tag_id).first()

    def get_tag_by_name(self, tag_name):
        """Get a Tag object from a tag name (string); the lookup is lower-cased."""
        if tag_name:
            return self.sa_session.query(galaxy.model.Tag).filter_by(name=tag_name.lower()).first()
        return None

    def _create_tag(self, tag_str):
        """
        Create a Tag object from a tag string.

        The string is split on the hierarchy separator and one tag per level
        is looked up or created; the tag of the last level is returned.
        Returns ``None`` if any level is not a valid tag name.
        """
        separator = self.hierarchy_separator
        sub_tag_names = [self._scrub_tag_name(sub_tag) for sub_tag in tag_str.split(separator)]
        # Validate every level up front: a level that scrubs to None used to
        # raise TypeError when concatenated with the prefix.
        if any(sub_tag_name is None for sub_tag_name in sub_tag_names):
            return None
        tag_prefix = ""
        parent_tag = None
        tag = None
        for sub_tag_name in sub_tag_names:
            # Get or create subtag.
            tag_name = tag_prefix + sub_tag_name
            tag = self.sa_session.query(galaxy.model.Tag).filter_by(name=tag_name).first()
            if not tag:
                tag = galaxy.model.Tag(type=0, name=tag_name)
            # Set tag parent.
            # NOTE(review): reconstructed at loop level (applies to existing
            # tags too); the original indentation was lost - confirm.
            tag.parent = parent_tag
            # Update parent and tag prefix.
            parent_tag = tag
            tag_prefix = tag.name + separator
        return tag

    def _get_or_create_tag(self, tag_str):
        """Get or create a Tag object from a tag string."""
        # Scrub tag; if tag is None after being scrubbed, return None.
        scrubbed_tag_str = self._scrub_tag_name(tag_str)
        if not scrubbed_tag_str:
            return None
        # Get item tag.
        tag = self.get_tag_by_name(scrubbed_tag_str)
        # Create tag if necessary.
        if tag is None:
            tag = self._create_tag(scrubbed_tag_str)
        return tag

    def _get_item_tag_assoc(self, user, item, tag_name):
        """
        Return ItemTagAssociation object for a user, item, and tag string; returns None if there is
        no such association.
        """
        # NOTE(review): the comparison against user_tname is case-sensitive,
        # while apply_item_tag passes a lower-cased name - confirm intent.
        scrubbed_tag_name = self._scrub_tag_name(tag_name)
        for item_tag_assoc in item.tags:
            if (item_tag_assoc.user == user) and (item_tag_assoc.user_tname == scrubbed_tag_name):
                return item_tag_assoc
        return None

    def parse_tags(self, tag_str):
        """
        Returns a list of raw (tag-name, value) pairs derived from a string; method scrubs tag names and values as well.
        Return value is a list of (tag_name, tag_value) tuples.

        A name or value that does not survive scrubbing is reported as
        ``None``; empty input yields an empty list.
        """
        # Gracefully handle None.
        if not tag_str:
            return []
        # Strip unicode control characters
        tag_str = strip_control_characters(tag_str)
        # Split tags based on separators.
        reg_exp = re.compile('[' + self.tag_separators + ']')
        raw_tags = reg_exp.split(tag_str)
        # Extract name-value pairs.
        name_value_pairs = []
        for raw_tag in raw_tags:
            nv_pair = self._get_name_value_pair(raw_tag)
            scrubbed_name = self._scrub_tag_name(nv_pair[0])
            scrubbed_value = self._scrub_tag_value(nv_pair[1])
            # Append tag_name, tag_value tuple -- TODO use NamedTuple
            name_value_pairs.append((scrubbed_name, scrubbed_value))
        return name_value_pairs

    def _scrub_tag_value(self, value):
        """Scrub a tag value: remove all whitespace; empty/None gives None."""
        # Gracefully handle None:
        if not value:
            return None
        # Remove whitespace from value.
        return re.sub(r'\s', "", value)

    def _scrub_tag_name(self, name):
        """
        Scrub a tag name.

        Removes all whitespace and one leading hierarchy separator; returns
        ``None`` for empty input or if the result is shorter than
        ``min_tag_len`` or longer than ``max_tag_len``.
        """
        # Gracefully handle None:
        if not name:
            return None
        # Remove whitespace from name.
        scrubbed_name = re.sub(r'\s', "", name)
        # Ignore a leading hierarchy separator.
        if scrubbed_name.startswith(self.hierarchy_separator):
            scrubbed_name = scrubbed_name[len(self.hierarchy_separator):]
        # If name is too short or too long, return None.
        if len(scrubbed_name) < self.min_tag_len or len(scrubbed_name) > self.max_tag_len:
            return None
        return scrubbed_name

    def _scrub_tag_name_list(self, tag_name_list):
        """Scrub a tag name list; entries that fail scrubbing become None."""
        return [self._scrub_tag_name(tag) for tag in tag_name_list]

    def _get_name_value_pair(self, tag_str):
        """
        Get name, value pair from a tag string.

        Splits on the first key-value separator only; the value slot is
        ``None`` when the string contains no separator.
        """
        # Use regular expression to parse name, value.
        reg_exp = re.compile("[" + self.key_value_separators + "]")
        name_value_pair = reg_exp.split(tag_str, 1)
        # Add empty slot if tag does not have value.
        if len(name_value_pair) < 2:
            name_value_pair.append(None)
        return name_value_pair
class GalaxyTagManager(TagManager):
    """
    Tag manager whose ``item_tag_assoc_info`` is filled in for History,
    HistoryDatasetAssociation, HistoryDatasetCollectionAssociation,
    LibraryDatasetDatasetAssociation, Page, StoredWorkflow and Visualization.
    """

    def __init__(self, sa_session):
        """
        :param sa_session: session object handed on to ``TagManager.__init__``
        """
        # The model module is imported at call time rather than at module level.
        from galaxy import model
        TagManager.__init__(self, sa_session)
        # One row per taggable class:
        # (registry key, item class, association class, item id column name).
        registrations = (
            ("History",
             model.History,
             model.HistoryTagAssociation,
             "history_id"),
            ("HistoryDatasetAssociation",
             model.HistoryDatasetAssociation,
             model.HistoryDatasetAssociationTagAssociation,
             "history_dataset_association_id"),
            ("HistoryDatasetCollectionAssociation",
             model.HistoryDatasetCollectionAssociation,
             model.HistoryDatasetCollectionTagAssociation,
             "history_dataset_collection_id"),
            ("LibraryDatasetDatasetAssociation",
             model.LibraryDatasetDatasetAssociation,
             model.LibraryDatasetDatasetAssociationTagAssociation,
             "library_dataset_dataset_association_id"),
            ("Page",
             model.Page,
             model.PageTagAssociation,
             "page_id"),
            ("StoredWorkflow",
             model.StoredWorkflow,
             model.StoredWorkflowTagAssociation,
             "stored_workflow_id"),
            ("Visualization",
             model.Visualization,
             model.VisualizationTagAssociation,
             "visualization_id"),
        )
        for key, item_class, assoc_class, id_col_name in registrations:
            id_col = getattr(assoc_class.table.c, id_col_name)
            self.item_tag_assoc_info[key] = ItemTagAssocInfo(item_class, assoc_class, id_col)
class CommunityTagManager(TagManager):
    """
    Tag manager that registers no item classes of its own; construction
    simply defers to ``TagManager``.
    """

    def __init__(self, sa_session):
        """
        :param sa_session: session object handed on to the base initializer
        """
        super(CommunityTagManager, self).__init__(sa_session)