Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.managers.taggable

"""
Mixins for Taggable model managers and serializers.
"""

# from galaxy import exceptions as galaxy_exceptions

import logging
from typing import Type

from sqlalchemy import (
    func,
    sql,
)

from galaxy import model
from galaxy.model.tags import GalaxyTagHandler
from galaxy.util import unicodify
from .base import (
    ModelValidator,
    raise_filter_err,
)

log = logging.getLogger(__name__)


# TODO: work out the relation between serializers and managers and then fold these into the parent of the two
def _tag_str_gen(item):
    # TODO: which user is this? all?
    for tag in item.tags:
        tag_str = tag.user_tname
        if tag.value is not None:
            tag_str += f":{tag.user_value}"
        yield tag_str


def _tags_to_strings(item):
    if not hasattr(item, "tags"):
        return None
    return sorted(list(_tag_str_gen(item)))


def _tags_from_strings(item, tag_handler, new_tags_list, user=None):
    # TODO: have to assume trans.user here...
    if not user:
        # raise galaxy_exceptions.RequestParameterMissingException( 'User required for tags on ' + str( item ) )
        # TODO: this becomes a 'silent failure' - no tags are set. This is a questionable approach but
        # I haven't found a better one for anon users copying items with tags
        return
    # TODO: duped from tags manager - de-dupe when moved to taggable mixin
    tag_handler.delete_item_tags(user, item)
    new_tags_str = ",".join(new_tags_list)
    tag_handler.apply_item_tags(user, item, unicodify(new_tags_str, "utf-8"))
    # TODO:!! does the creation of new_tags_list mean there are now more and more unused tag rows in the db?


[docs]class TaggableManagerMixin: tag_assoc: Type[model.ItemTagAssociation] tag_handler: GalaxyTagHandler # TODO: most of this can be done by delegating to the GalaxyTagHandler?
[docs] def get_tags(self, item): """ Return a list of tag strings. """ return _tags_to_strings(item)
[docs] def set_tags(self, item, new_tags, user=None): """ Set an `item`'s tags from a list of strings. """ return _tags_from_strings(item, self.tag_handler, new_tags, user=user)
# def tags_by_user( self, user, **kwargs ): # TODO: here or GalaxyTagHandler # pass
[docs]class TaggableSerializerMixin:
[docs] def add_serializers(self): self.serializers["tags"] = self.serialize_tags
[docs] def serialize_tags(self, item, key, **context): """ Return tags as a list of strings. """ return _tags_to_strings(item)
[docs]class TaggableDeserializerMixin: tag_handler: GalaxyTagHandler validate: ModelValidator
[docs] def add_deserializers(self): self.deserializers["tags"] = self.deserialize_tags
[docs] def deserialize_tags(self, item, key, val, user=None, **context): """ Make sure `val` is a valid list of tag strings and assign them. Note: this will erase any previous tags. """ new_tags_list = self.validate.basestring_list(key, val) _tags_from_strings(item, self.tag_handler, new_tags_list, user=user) return item.tags
[docs]class TaggableFilterMixin: valid_ops = ("eq", "contains", "has")
[docs] def create_tag_filter(self, attr, op, val): def _create_tag_filter(model_class=None): if op not in TaggableFilterMixin.valid_ops: raise_filter_err(attr, op, val, "bad op in filter") if model_class is None: return True class_name = model_class.__name__ if class_name == "HistoryDatasetCollectionAssociation": # Unfortunately we were a little inconsistent with our naming scheme class_name = "HistoryDatasetCollection" target_model = getattr(model, f"{class_name}TagAssociation") id_column = f"{target_model.table.name.rsplit('_tag_association')[0]}_id" column = target_model.table.c.user_tname + ":" + target_model.table.c.user_value lower_val = val.lower() # Ignore case if op == "eq": if ":" not in lower_val: # We require an exact match and the tag to look for has no user_value, # so we can't just concatenate user_tname, ':' and user_vale cond = func.lower(target_model.table.c.user_tname) == lower_val else: cond = func.lower(column) == lower_val else: cond = func.lower(column).contains(lower_val, autoescape=True) return sql.expression.and_(model_class.table.c.id == getattr(target_model.table.c, id_column), cond) return _create_tag_filter
def _add_parsers(self): self.orm_filter_parsers.update({"tag": self.create_tag_filter})