# Warning: This document is for an old release of Galaxy. You can alternatively
# view this page in the latest release, if it exists, or view the top of the
# latest release's documentation.
#
# Source code for galaxy.model.tags
import logging
import re
from typing import Dict
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.scoping import scoped_session
from sqlalchemy.sql import select
from sqlalchemy.sql.expression import func
import galaxy.model
from galaxy.util import (
strip_control_characters,
unicodify,
)
log = logging.getLogger(__name__)
# Item-specific information needed to perform tagging.
class ItemTagAssocInfo:
    """Bundle of the three objects needed to tag one kind of item.

    Instances are stored in ``TagHandler.item_tag_assoc_info`` keyed by the
    item class name.
    """

    def __init__(self, item_class, tag_assoc_class, item_id_col):
        # The model class of the taggable item.
        self.item_class = item_class
        # The association class linking that item to a tag.
        self.tag_assoc_class = tag_assoc_class
        # The column of the association that holds the item's id.
        self.item_id_col = item_id_col
class TagHandler:
    """
    Manages CRUD operations related to tagging objects.

    The handler itself knows no taggable classes; subclasses register them in
    ``item_tag_assoc_info`` (class name -> ``ItemTagAssocInfo``).
    """

    def __init__(self, sa_session: "scoped_session") -> None:
        """Store the session and set up the parsing configuration.

        :param sa_session: SQLAlchemy session used for queries, deletes and flushes.
        """
        self.sa_session = sa_session
        # Minimum tag length.
        self.min_tag_len = 1
        # Maximum tag length.
        self.max_tag_len = 255
        # Tag separator.
        self.tag_separators = ',;'
        # Hierarchy separator.
        self.hierarchy_separator = '.'
        # Key-value separator.
        self.key_value_separators = "=:"
        # Initialize with known classes - add to this in subclasses.
        self.item_tag_assoc_info: Dict[str, ItemTagAssocInfo] = {}

    def create_tag_handler_session(self):
        """Return a transient tag handler that avoids repeated flushes."""
        return GalaxyTagHandlerSession(self.sa_session)

    def add_tags_from_list(self, user, item, new_tags_list, flush=True):
        """Add ``new_tags_list`` to the tags ``item`` already has.

        The existing tags are serialized, merged with the new ones and the
        union is re-applied through :meth:`set_tags_from_list`.

        :returns: ``item.tags`` after the update.
        """
        new_tags_set = set(new_tags_list)
        if item.tags:
            # get_tags_str() joins with ", "; strip so existing tags do not
            # show up with a leading blank (matches remove_tags_from_list).
            existing = self.get_tags_str(item.tags).split(',')
            new_tags_set.update(tag.strip() for tag in existing)
        return self.set_tags_from_list(user, item, new_tags_set, flush=flush)

    def remove_tags_from_list(self, user, item, tag_to_remove_list, flush=True):
        """Remove every tag in ``tag_to_remove_list`` from ``item``.

        :returns: ``item.tags`` after the update.
        """
        tag_to_remove_set = set(tag_to_remove_list)
        tags_set = {tag.strip() for tag in self.get_tags_str(item.tags).split(',')}
        if item.tags:
            tags_set -= tag_to_remove_set
        return self.set_tags_from_list(user, item, tags_set, flush=flush)

    def set_tags_from_list(self, user, item, new_tags_list, flush=True):
        """Replace all tags of ``item`` with ``new_tags_list``.

        Preconditions: ``item`` is already security checked against ``user``
        and the incoming tags are sanitized/formatted strings.

        :returns: ``item.tags`` after the update.
        """
        self.delete_item_tags(user, item)
        new_tags_str = ','.join(new_tags_list)
        self.apply_item_tags(user, item, unicodify(new_tags_str, 'utf-8'), flush=flush)
        if flush:
            self.sa_session.flush()
        return item.tags

    def get_tag_assoc_class(self, item_class):
        """Returns tag association class for item class.

        Raises ``KeyError`` if the class name is not registered.
        """
        return self.item_tag_assoc_info[item_class.__name__].tag_assoc_class

    def get_id_col_in_item_tag_assoc_table(self, item_class):
        """Returns item id column in class' item-tag association table.

        Raises ``KeyError`` if the class name is not registered.
        """
        return self.item_tag_assoc_info[item_class.__name__].item_id_col

    def get_community_tags(self, item=None, limit=None):
        """Returns community tags for an item.

        Tags are ordered by descending usage count and looked up one by one
        via :meth:`get_tag_by_id`.

        :param item: the tagged item; ``None`` yields an empty list.
        :param limit: passed through as the query ``limit``.
        """
        # The signature advertises item=None; without this guard the lookup
        # below fails with KeyError('NoneType').
        if item is None:
            return []
        # Get item-tag association class.
        item_class = item.__class__
        item_tag_assoc_class = self.get_tag_assoc_class(item_class)
        if not item_tag_assoc_class:
            return []
        # Build select statement.
        cols_to_select = [item_tag_assoc_class.table.c.tag_id, func.count('*')]
        from_obj = item_tag_assoc_class.table.join(item_class.table).join(galaxy.model.Tag.table)
        where_clause = (self.get_id_col_in_item_tag_assoc_table(item_class) == item.id)
        order_by = [func.count("*").desc()]
        group_by = item_tag_assoc_class.table.c.tag_id
        # Do query and get result set.
        query = select(columns=cols_to_select,
                       from_obj=from_obj,
                       whereclause=where_clause,
                       group_by=group_by,
                       order_by=order_by,
                       limit=limit)
        result_set = self.sa_session.execute(query)
        # Return community tags; the first column of each row is the tag id.
        return [self.get_tag_by_id(row[0]) for row in result_set]

    def get_tool_tags(self):
        """Return the distinct tags referenced by tool-tag associations."""
        query = select(columns=[galaxy.model.ToolTagAssociation.table.c.tag_id],
                       from_obj=galaxy.model.ToolTagAssociation.table).distinct()
        result_set = self.sa_session.execute(query)
        return [self.get_tag_by_id(row[0]) for row in result_set]

    def remove_item_tag(self, user, item, tag_name):
        """Remove a tag from an item.

        :returns: ``True`` if an association was found and removed, else ``False``.
        """
        item_tag_assoc = self._get_item_tag_assoc(user, item, tag_name)
        if not item_tag_assoc:
            return False
        # Delete the association and detach it from the item.
        self.sa_session.delete(item_tag_assoc)
        item.tags.remove(item_tag_assoc)
        return True

    def delete_item_tags(self, user, item):
        """Delete tags from an item."""
        # Delete item-tag associations.
        for tag in item.tags:
            if tag.id:
                # Only can and need to delete tag if tag is persisted
                self.sa_session.delete(tag)
        # Delete tags from item.
        del item.tags[:]

    def item_has_tag(self, user, item, tag):
        """Returns true if item has a given tag.

        :param tag: a tag name, a ``galaxy.model.Tag`` or a
            ``galaxy.model.ItemTagAssociation``. Any other type yields ``False``.
        """
        # Get tag name.
        if isinstance(tag, str):
            tag_name = tag
        elif isinstance(tag, galaxy.model.Tag):
            tag_name = tag.name
        elif isinstance(tag, galaxy.model.ItemTagAssociation):
            tag_name = tag.user_tname
        else:
            # Unsupported type: used to fall through with tag_name unbound
            # and blow up with UnboundLocalError.
            return False
        # Check for an item-tag association to see if item has a given tag.
        return bool(self._get_item_tag_assoc(user, item, tag_name))

    def apply_item_tag(self, user, item, name, value=None, flush=True):
        """Apply a single ``name``/``value`` tag to ``item``.

        A new association is created when none exists for ``name`` or when the
        existing one carries a different value (multiple associations with
        different values are allowed); otherwise the existing one is updated.

        :returns: the association, or ``None`` if ``name`` is ``None`` or the
            tag could not be created.
        """
        if name is None:
            return
        # Use lowercase name for searching/creating tag.
        lc_name = name.lower()
        # Get or create item-tag association.
        item_tag_assoc = self._get_item_tag_assoc(user, item, lc_name)
        if not item_tag_assoc or item_tag_assoc.value != value:
            # Create tag; if None, skip the tag (and log error).
            tag = self._get_or_create_tag(lc_name)
            if not tag:
                log.warning(f"Failed to create tag with name {lc_name}")
                return
            # Create tag association based on item class.
            item_tag_assoc_class = self.get_tag_assoc_class(item.__class__)
            item_tag_assoc = item_tag_assoc_class()
            # Add tag to association.
            item.tags.append(item_tag_assoc)
            item_tag_assoc.tag = tag
            item_tag_assoc.user = user
        # Apply attributes to item-tag association: the user-facing spelling
        # is kept as given, the searchable value is lowercased.
        lc_value = value.lower() if value else None
        item_tag_assoc.user_tname = name
        item_tag_assoc.user_value = value
        item_tag_assoc.value = lc_value
        if flush:
            self.sa_session.flush()
        return item_tag_assoc

    def apply_item_tags(self, user, item, tags_str, flush=True):
        """Apply tags to an item."""
        # Parse tags, then apply each one.
        for name, value in self.parse_tags(tags_str):
            self.apply_item_tag(user, item, name, value, flush=flush)

    def get_tags_str(self, tags):
        """Build a string from an item's tags.

        Each tag is rendered as ``name`` or ``name:value`` and the pieces are
        joined with ``", "``. Returns an empty string if there are no tags.
        """
        if not tags:
            return ""
        tags_str_list = []
        for tag in tags:
            tag_str = tag.user_tname
            if tag.value is not None:
                tag_str += f":{tag.user_value}"
            tags_str_list.append(tag_str)
        return ", ".join(tags_str_list)

    def get_tag_by_id(self, tag_id):
        """Get a Tag object from a tag id."""
        return self.sa_session.query(galaxy.model.Tag).filter_by(id=tag_id).first()

    def get_tag_by_name(self, tag_name):
        """Get a Tag object from a tag name (string); the lookup is lowercased."""
        if tag_name:
            return self.sa_session.query(galaxy.model.Tag).filter_by(name=tag_name.lower()).first()
        return None

    def _create_tag(self, tag_str: str):
        """Create a Tag object from a tag string.

        The string is split on the hierarchy separator; every level is fetched
        or created and chained to its parent. Returns the deepest tag, or
        ``None`` if no level survives scrubbing.
        """
        tag_prefix = ""
        parent_tag = None
        tag = None
        for sub_tag in tag_str.split(self.hierarchy_separator):
            sub_tag_name = self._scrub_tag_name(sub_tag)
            if not sub_tag_name:
                # Nothing usable at this level (e.g. empty segment).
                continue
            # Get or create subtag.
            tag_name = tag_prefix + sub_tag_name
            tag = self._get_tag(tag_name)
            if not tag:
                tag = self._create_tag_instance(tag_name)
            # Set tag parent.
            tag.parent = parent_tag
            # Update parent and tag prefix.
            parent_tag = tag
            tag_prefix = tag.name + self.hierarchy_separator
        return tag

    def _get_tag(self, tag_name):
        """Return the Tag with exactly this name, or ``None``."""
        return self.sa_session.query(galaxy.model.Tag).filter_by(name=tag_name).first()

    def _create_tag_instance(self, tag_name):
        """Create a Tag named ``tag_name`` and return it.

        Without a session the new, unsaved tag is returned directly. Otherwise
        the tag is committed through a separate session on the same bind and
        then re-read via :meth:`_get_tag`.
        """
        # For good performance caller should first check if there's already an appropriate tag
        tag = galaxy.model.Tag(type=0, name=tag_name)
        if not self.sa_session:
            return tag
        Session = sessionmaker(self.sa_session.bind)
        with Session() as separate_session:
            separate_session.add(tag)
            try:
                separate_session.commit()
                separate_session.flush()
            except IntegrityError:
                # tag already exists, get from database
                separate_session.rollback()
        return self._get_tag(tag_name)

    def _get_or_create_tag(self, tag_str):
        """Get or create a Tag object from a tag string."""
        # Scrub tag; if tag is None after being scrubbed, return None.
        scrubbed_tag_str = self._scrub_tag_name(tag_str)
        if not scrubbed_tag_str:
            return None
        # Get item tag.
        tag = self.get_tag_by_name(scrubbed_tag_str)
        # Create tag if necessary.
        if tag is None:
            tag = self._create_tag(scrubbed_tag_str)
        return tag

    def _get_item_tag_assoc(self, user, item, tag_name):
        """
        Return ItemTagAssociation object for a user, item, and tag string; returns None if there is
        no such association.

        The match is on the association's ``user`` and ``user_tname``.
        """
        scrubbed_tag_name = self._scrub_tag_name(tag_name)
        for item_tag_assoc in item.tags:
            if (item_tag_assoc.user == user) and (item_tag_assoc.user_tname == scrubbed_tag_name):
                return item_tag_assoc
        return None

    def parse_tags(self, tag_str):
        """
        Return a list of tag tuples (name, value) pairs derived from a string.

        An empty or ``None`` string yields an empty list.

        >>> th = TagHandler("bridge_of_death")
        >>> assert th.parse_tags("#ARTHUR") == [('name', 'ARTHUR')]
        >>> tags = th.parse_tags("name:Lancelot of Camelot;#Holy Grail;blue")
        >>> assert tags == [('name', 'LancelotofCamelot'), ('name', 'HolyGrail'), ('blue', None)]
        """
        # Gracefully handle None; return the documented list type, not a dict.
        if not tag_str:
            return []
        # Strip unicode control characters
        tag_str = strip_control_characters(tag_str)
        # Split tags based on separators.
        raw_tags = re.split(f"[{self.tag_separators}]", tag_str)
        return self.parse_tags_list(raw_tags)

    def parse_tags_list(self, tags_list):
        """
        Return a list of tag tuples (name, value) pairs derived from a list.
        Method scrubs tag names and values as well.

        >>> th = TagHandler("bridge_of_death")
        >>> tags = th.parse_tags_list(["name:Lancelot of Camelot", "#Holy Grail", "blue"])
        >>> assert tags == [('name', 'LancelotofCamelot'), ('name', 'HolyGrail'), ('blue', None)]
        """
        name_value_pairs = []
        for raw_tag in tags_list:
            raw_name, raw_value = self._get_name_value_pair(raw_tag)
            # Append tag_name, tag_value tuple -- TODO use NamedTuple
            name_value_pairs.append((self._scrub_tag_name(raw_name), self._scrub_tag_value(raw_value)))
        return name_value_pairs

    def _scrub_tag_value(self, value):
        """Scrub a tag value: drop all whitespace; empty/None becomes ``None``."""
        # Gracefully handle None:
        if not value:
            return None
        # Remove whitespace from value.
        return re.sub(r'\s', "", value)

    def _scrub_tag_name(self, name):
        """Scrub a tag name.

        Whitespace is removed and one leading hierarchy separator is dropped.
        Returns ``None`` for empty input or when the result is shorter than
        ``min_tag_len`` or longer than ``max_tag_len``.
        """
        # Gracefully handle None:
        if not name:
            return None
        # Remove whitespace from name.
        scrubbed_name = re.sub(r'\s', "", name)
        # Ignore a leading hierarchy separator ('.' by default).
        if scrubbed_name.startswith(self.hierarchy_separator):
            scrubbed_name = scrubbed_name[1:]
        # If name is too short or too long, return None.
        if len(scrubbed_name) < self.min_tag_len or len(scrubbed_name) > self.max_tag_len:
            return None
        return scrubbed_name

    def _scrub_tag_name_list(self, tag_name_list):
        """Scrub a tag name list."""
        return [self._scrub_tag_name(tag) for tag in tag_name_list]

    def _get_name_value_pair(self, tag_str):
        """Get name, value pair from a tag string.

        ``#foo`` is shorthand for ``name:foo``. The string is split once on the
        first key-value separator; a tag without value gets ``None``.
        """
        if tag_str.startswith('#'):
            tag_str = f"name:{tag_str[1:]}"
        # Use regular expression to parse name, value.
        name_value_pair = re.split(f"[{self.key_value_separators}]", tag_str, 1)
        # Add empty slot if tag does not have value.
        if len(name_value_pair) < 2:
            name_value_pair.append(None)
        return name_value_pair
class GalaxyTagHandler(TagHandler):
    """TagHandler with Galaxy's taggable model classes registered."""

    def __init__(self, sa_session: scoped_session):
        """Register an ``ItemTagAssocInfo`` for every taggable Galaxy model."""
        # Imported here rather than at module level, as in the original code.
        from galaxy import model

        TagHandler.__init__(self, sa_session)
        # Keys are the item class names that get_tag_assoc_class() looks up.
        self.item_tag_assoc_info.update({
            "History": ItemTagAssocInfo(
                model.History,
                model.HistoryTagAssociation,
                model.HistoryTagAssociation.history_id,
            ),
            "HistoryDatasetAssociation": ItemTagAssocInfo(
                model.HistoryDatasetAssociation,
                model.HistoryDatasetAssociationTagAssociation,
                model.HistoryDatasetAssociationTagAssociation.history_dataset_association_id,
            ),
            "HistoryDatasetCollectionAssociation": ItemTagAssocInfo(
                model.HistoryDatasetCollectionAssociation,
                model.HistoryDatasetCollectionTagAssociation,
                model.HistoryDatasetCollectionTagAssociation.history_dataset_collection_id,
            ),
            "LibraryDatasetDatasetAssociation": ItemTagAssocInfo(
                model.LibraryDatasetDatasetAssociation,
                model.LibraryDatasetDatasetAssociationTagAssociation,
                model.LibraryDatasetDatasetAssociationTagAssociation.library_dataset_dataset_association_id,
            ),
            "Page": ItemTagAssocInfo(
                model.Page,
                model.PageTagAssociation,
                model.PageTagAssociation.page_id,
            ),
            "StoredWorkflow": ItemTagAssocInfo(
                model.StoredWorkflow,
                model.StoredWorkflowTagAssociation,
                model.StoredWorkflowTagAssociation.stored_workflow_id,
            ),
            "Visualization": ItemTagAssocInfo(
                model.Visualization,
                model.VisualizationTagAssociation,
                model.VisualizationTagAssociation.visualization_id,
            ),
        })
class GalaxyTagHandlerSession(GalaxyTagHandler):
    """Like GalaxyTagHandler, but avoids one flush per created tag.

    Tags created through this handler are remembered in ``created_tags`` and
    served from there before the database is queried.
    """

    def __init__(self, sa_session: scoped_session):
        """Set up the handler and an empty cache of created tags."""
        super().__init__(sa_session)
        # tag name -> Tag created via _create_tag_instance(). Must exist before
        # the first _get_tag() call, which reads it unconditionally.
        self.created_tags = {}

    def _get_tag(self, tag_name):
        """Get tag from cache or database."""
        # Avoids creating multiple new tags with the same tag_name, which violates unique key constraint
        return self.created_tags.get(tag_name) or super()._get_tag(tag_name)

    def _create_tag_instance(self, tag_name):
        """Create tag and store in cache."""
        tag = super()._create_tag_instance(tag_name)
        self.created_tags[tag_name] = tag
        return tag
class GalaxySessionlessTagHandler(GalaxyTagHandlerSession):
    """Tag handler variant whose tag lookup never touches the session."""

    def _get_tag(self, tag_name):
        """Get tag from the cache only; returns ``None`` when it is not cached."""
        # Short-circuit session access
        cached_tags = self.created_tags
        return cached_tags.get(tag_name)