Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.webapp.model

import logging
import os
import random
import string
import weakref
from datetime import (
    datetime,
    timedelta,
)
from typing import (
    Any,
    Mapping,
    TYPE_CHECKING,
)

from mercurial import (
    hg,
    ui,
)
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    desc,
    ForeignKey,
    Integer,
    not_,
    String,
    Table,
    TEXT,
    true,
    UniqueConstraint,
)
from sqlalchemy.orm import (
    registry,
    relationship,
)

import tool_shed.repository_types.util as rt_util
from galaxy import util
from galaxy.model.custom_types import (
    MutableJSONType,
    TrimmedString,
)
from galaxy.model.orm.now import now
from galaxy.model.orm.util import add_object_to_object_session
from galaxy.security.validate_user_input import validate_password_str
from galaxy.util import unique_id
from galaxy.util.bunch import Bunch
from galaxy.util.dictifiable import Dictifiable
from galaxy.util.hash_util import new_insecure_hash
from tool_shed.dependencies.repository import relation_builder
from tool_shed.util import (
    hg_util,
    metadata_util,
)
from tool_shed.util.hgweb_config import hgweb_config_manager

log = logging.getLogger(__name__)

WEAK_HG_REPO_CACHE: Mapping["Repository", Any] = weakref.WeakKeyDictionary()

if TYPE_CHECKING:
    # Workaround for https://github.com/python/mypy/issues/14182
    from sqlalchemy.orm.decl_api import DeclarativeMeta as _DeclarativeMeta

    class DeclarativeMeta(_DeclarativeMeta, type):
        pass

else:
    from sqlalchemy.orm.decl_api import DeclarativeMeta

mapper_registry = registry()


[docs]class Base(metaclass=DeclarativeMeta): __abstract__ = True registry = mapper_registry metadata = mapper_registry.metadata __init__ = mapper_registry.constructor @classmethod def __declare_last__(cls): cls.table = cls.__table__
[docs]class APIKeys(Base): __tablename__ = "api_keys" id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) user_id = Column(ForeignKey("galaxy_user.id"), index=True) key = Column(TrimmedString(32), index=True, unique=True) user = relationship("User", back_populates="api_keys") deleted = Column(Boolean, index=True, default=False)
[docs]class User(Base, Dictifiable): __tablename__ = "galaxy_user" id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) email = Column(TrimmedString(255), nullable=False) username = Column(String(255), index=True) password = Column(TrimmedString(40), nullable=False) external = Column(Boolean, default=False) new_repo_alert = Column(Boolean, default=False) deleted = Column(Boolean, index=True, default=False) purged = Column(Boolean, index=True, default=False) active_repositories = relationship( "Repository", primaryjoin=(lambda: (Repository.user_id == User.id) & (not_(Repository.deleted))), # type: ignore[has-type] back_populates="user", order_by=lambda: desc(Repository.name), # type: ignore[has-type] ) galaxy_sessions = relationship( "GalaxySession", back_populates="user", order_by=lambda: desc(GalaxySession.update_time) # type: ignore[has-type] ) api_keys = relationship("APIKeys", back_populates="user", order_by=lambda: desc(APIKeys.create_time)) reset_tokens = relationship("PasswordResetToken", back_populates="user") groups = relationship("UserGroupAssociation", back_populates="user") dict_collection_visible_keys = ["id", "username"] dict_element_visible_keys = ["id", "username"] bootstrap_admin_user = False roles = relationship("UserRoleAssociation", back_populates="user") non_private_roles = relationship( "UserRoleAssociation", viewonly=True, primaryjoin=( lambda: (User.id == UserRoleAssociation.user_id) # type: ignore[has-type] & (UserRoleAssociation.role_id == Role.id) # type: ignore[has-type] & not_(Role.name == User.email) # type: ignore[has-type] ), ) def __init__(self, email=None, password=None): self.email = email self.password = password self.external = False self.deleted = False self.purged = False self.new_repo_alert = False
[docs] def all_roles(self): roles = [ura.role for ura in self.roles] for group in [uga.group for uga in self.groups]: for role in [gra.role for gra in group.roles]: if role not in roles: roles.append(role) return roles
[docs] def check_password(self, cleartext): """Check if 'cleartext' matches 'self.password' when hashed.""" return self.password == new_insecure_hash(text_type=cleartext)
[docs] def get_disk_usage(self, nice_size=False): return 0
@property def nice_total_disk_usage(self): return 0
[docs] def set_disk_usage(self, bytes): pass
total_disk_usage = property(get_disk_usage, set_disk_usage)
[docs] def set_password_cleartext(self, cleartext): message = validate_password_str(cleartext) if message: raise Exception(f"Invalid password: {message}") # Set 'self.password' to the digest of 'cleartext'. self.password = new_insecure_hash(text_type=cleartext)
[docs] def set_random_password(self, length=16): """ Sets user password to a random string of the given length. :return: void """ self.set_password_cleartext( "".join(random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(length)) )
[docs]class PasswordResetToken(Base): __tablename__ = "password_reset_token" token = Column(String(32), primary_key=True, unique=True, index=True) expiration_time = Column(DateTime) user_id = Column(ForeignKey("galaxy_user.id"), index=True) user = relationship("User", back_populates="reset_tokens") def __init__(self, user, token=None): if token: self.token = token else: self.token = unique_id() add_object_to_object_session(self, user) self.user = user self.expiration_time = now() + timedelta(hours=24)
[docs]class Group(Base, Dictifiable): __tablename__ = "galaxy_group" id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) name = Column(String(255), index=True, unique=True) deleted = Column(Boolean, index=True, default=False) roles = relationship("GroupRoleAssociation", back_populates="group") users = relationship("UserGroupAssociation", back_populates="group") dict_collection_visible_keys = ["id", "name"] dict_element_visible_keys = ["id", "name"] def __init__(self, name=None): self.name = name self.deleted = False
[docs]class Role(Base, Dictifiable): __tablename__ = "role" id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) name = Column(String(255), index=True, unique=True) description = Column(TEXT) type = Column(String(40), index=True) deleted = Column(Boolean, index=True, default=False) repositories = relationship("RepositoryRoleAssociation", back_populates="role") groups = relationship("GroupRoleAssociation", back_populates="role") users = relationship("UserRoleAssociation", back_populates="role") dict_collection_visible_keys = ["id", "name"] dict_element_visible_keys = ["id", "name", "description", "type"] private_id = None types = Bunch(PRIVATE="private", SYSTEM="system", USER="user", ADMIN="admin", SHARING="sharing") def __init__(self, name=None, description=None, type=types.SYSTEM, deleted=False): self.name = name self.description = description self.type = type self.deleted = deleted @property def is_repository_admin_role(self): # A repository admin role must always be associated with a repository. The mapper returns an # empty list for those roles that have no repositories. This method will require changes if # new features are introduced that results in more than one role per repository. if self.repositories: return True return False
[docs]class UserGroupAssociation(Base): __tablename__ = "user_group_association" id = Column(Integer, primary_key=True) user_id = Column(ForeignKey("galaxy_user.id"), index=True) group_id = Column(ForeignKey("galaxy_group.id"), index=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) user = relationship("User", back_populates="groups") group = relationship("Group", back_populates="users") def __init__(self, user, group): add_object_to_object_session(self, user) self.user = user self.group = group
[docs]class UserRoleAssociation(Base): __tablename__ = "user_role_association" id = Column(Integer, primary_key=True) user_id = Column(ForeignKey("galaxy_user.id"), index=True) role_id = Column(ForeignKey("role.id"), index=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) user = relationship("User", back_populates="roles") role = relationship("Role", back_populates="users") def __init__(self, user, role): add_object_to_object_session(self, user) self.user = user add_object_to_object_session(self, role) self.role = role
[docs]class GroupRoleAssociation(Base): __tablename__ = "group_role_association" id = Column(Integer, primary_key=True) group_id = Column(ForeignKey("galaxy_group.id"), index=True) role_id = Column(ForeignKey("role.id"), index=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) group = relationship("Group", back_populates="roles") role = relationship("Role", back_populates="groups") def __init__(self, group, role): self.group = group self.role = role
[docs]class RepositoryRoleAssociation(Base): __tablename__ = "repository_role_association" id = Column(Integer, primary_key=True) repository_id = Column(ForeignKey("repository.id"), index=True) role_id = Column(ForeignKey("role.id"), index=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) repository = relationship("Repository", back_populates="roles") role = relationship("Role", back_populates="repositories") def __init__(self, repository, role): add_object_to_object_session(self, repository) self.repository = repository self.role = role
[docs]class GalaxySession(Base): __tablename__ = "galaxy_session" id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) user_id = Column(ForeignKey("galaxy_user.id"), index=True, nullable=True) remote_host = Column(String(255)) remote_addr = Column(String(255)) referer = Column(TEXT) # unique 128 bit random number coerced to a string session_key = Column(TrimmedString(255), index=True, unique=True) is_valid = Column(Boolean, default=False) # saves a reference to the previous session so we have a way to chain them together prev_session_id = Column(Integer) last_action = Column(DateTime) user = relationship("User", back_populates="galaxy_sessions") def __init__(self, is_valid=False, **kwd): super().__init__(**kwd) self.is_valid = is_valid self.last_action = self.last_action or datetime.now()
[docs]class Repository(Base, Dictifiable): __tablename__ = "repository" id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) name = Column(TrimmedString(255), index=True) type = Column(TrimmedString(255), index=True) remote_repository_url = Column(TrimmedString(255)) homepage_url = Column(TrimmedString(255)) description = Column(TEXT) long_description = Column(TEXT) user_id = Column(ForeignKey("galaxy_user.id"), index=True) private = Column(Boolean, default=False) deleted = Column(Boolean, index=True, default=False) email_alerts = Column(MutableJSONType, nullable=True) times_downloaded = Column(Integer) deprecated = Column(Boolean, default=False) categories = relationship("RepositoryCategoryAssociation", back_populates="repository") ratings = relationship( "RepositoryRatingAssociation", order_by=lambda: desc(RepositoryRatingAssociation.update_time), back_populates="repository", ) user = relationship("User", back_populates="active_repositories") downloadable_revisions = relationship( "RepositoryMetadata", primaryjoin=lambda: (Repository.id == RepositoryMetadata.repository_id) & (RepositoryMetadata.downloadable == true()), # type: ignore[attr-defined,has-type] viewonly=True, order_by=lambda: desc(RepositoryMetadata.update_time), # type: ignore[attr-defined] ) metadata_revisions = relationship( "RepositoryMetadata", order_by=lambda: desc(RepositoryMetadata.update_time), # type: ignore[attr-defined] back_populates="repository", ) roles = relationship("RepositoryRoleAssociation", back_populates="repository") dict_collection_visible_keys = [ "id", "name", "type", "remote_repository_url", "homepage_url", "description", "user_id", "private", "deleted", "times_downloaded", "deprecated", "create_time", ] dict_element_visible_keys = [ "id", "name", "type", "remote_repository_url", "homepage_url", "description", "long_description", "user_id", "private", "deleted", "times_downloaded", "deprecated", "create_time", ] file_states = Bunch(NORMAL="n", NEEDS_MERGING="m", MARKED_FOR_REMOVAL="r", MARKED_FOR_ADDITION="a", NOT_TRACKED="?") def __init__(self, private=False, times_downloaded=0, deprecated=False, **kwd): super().__init__(**kwd) self.private = private self.times_downloaded = times_downloaded self.deprecated = deprecated self.name = self.name or "Unnamed repository" @property def hg_repo(self): if not WEAK_HG_REPO_CACHE.get(self): WEAK_HG_REPO_CACHE[self] = hg.cachedlocalrepo(hg.repository(ui.ui(), self.repo_path().encode("utf-8"))) return WEAK_HG_REPO_CACHE[self].fetch()[0] @property def admin_role(self): admin_role_name = f"{str(self.name)}_{str(self.user.username)}_admin" for rra in self.roles: role = rra.role if str(role.name) == admin_role_name: return role raise Exception( "Repository %s owned by %s is not associated with a required administrative role." % (str(self.name), str(self.user.username)) )
[docs] def allow_push(self): hgrc_file = hg_util.get_hgrc_path(self.repo_path()) with open(hgrc_file) as fh: for line in fh.read().splitlines(): if line.startswith("allow_push = "): return line[len("allow_push = ") :] return ""
[docs] def can_change_type(self): # Allow changing the type only if the repository has no contents, has never been installed, or has # never been changed from the default type. if self.is_new(): return True if self.times_downloaded == 0: return True if self.type == rt_util.UNRESTRICTED: return True return False
[docs] def can_change_type_to(self, app, new_type_label): if self.type == new_type_label: return False if self.can_change_type(): new_type = app.repository_types_registry.get_class_by_label(new_type_label) if new_type.is_valid_for_type(self): return True return False
[docs] def get_changesets_for_setting_metadata(self, app): type_class = self.get_type_class(app) return type_class.get_changesets_for_setting_metadata(app, self)
[docs] def get_repository_dependencies(self, app, changeset, toolshed_url): # We aren't concerned with repositories of type tool_dependency_definition here if a # repository_metadata record is not returned because repositories of this type will never # have repository dependencies. However, if a readme file is uploaded, or some other change # is made that does not create a new downloadable changeset revision but updates the existing # one, we still want to be able to get repository dependencies. repository_metadata = metadata_util.get_current_repository_metadata_for_changeset_revision(app, self, changeset) if repository_metadata: metadata = repository_metadata.metadata if metadata: rb = relation_builder.RelationBuilder(app, self, repository_metadata, toolshed_url) repository_dependencies = rb.get_repository_dependencies_for_changeset_revision() if repository_dependencies: return repository_dependencies return None
[docs] def get_type_class(self, app): return app.repository_types_registry.get_class_by_label(self.type)
[docs] def get_tool_dependencies(self, app, changeset_revision): changeset_revision = metadata_util.get_next_downloadable_changeset_revision(app, self, changeset_revision) for downloadable_revision in self.downloadable_revisions: if downloadable_revision.changeset_revision == changeset_revision: return downloadable_revision.metadata.get("tool_dependencies", {}) return {}
[docs] def installable_revisions(self, app, sort_revisions=True): return metadata_util.get_metadata_revisions(app, self, sort_revisions=sort_revisions)
[docs] def is_new(self): tip_rev = self.hg_repo.changelog.tiprev() return tip_rev < 0
[docs] def repo_path(self, app=None): # Keep app argument for compatibility with tool_shed_install Repository model return hgweb_config_manager.get_entry(os.path.join("repos", self.user.username, self.name))
[docs] def revision(self): repo = self.hg_repo tip_ctx = repo[repo.changelog.tip()] return f"{str(tip_ctx.rev())}:{str(tip_ctx)}"
[docs] def set_allow_push(self, usernames, remove_auth=""): allow_push = util.listify(self.allow_push()) if remove_auth: allow_push.remove(remove_auth) else: for username in util.listify(usernames): if username not in allow_push: allow_push.append(username) allow_push = f"{','.join(allow_push)}\n" # Why doesn't the following work? # repo.ui.setconfig('web', 'allow_push', allow_push) repo_dir = self.repo_path() hgrc_file = hg_util.get_hgrc_path(repo_dir) with open(hgrc_file) as fh: lines = fh.readlines() with open(hgrc_file, "w") as fh: for line in lines: if line.startswith("allow_push"): fh.write(f"allow_push = {allow_push}") else: fh.write(line)
[docs] def tip(self): repo = self.hg_repo return str(repo[repo.changelog.tip()])
[docs] def to_dict(self, view="collection", value_mapper=None): rval = super().to_dict(view=view, value_mapper=value_mapper) if "user_id" in rval: rval["owner"] = self.user.username return rval
[docs]class ItemRatingAssociation:
[docs] def __init__(self, id=None, user=None, item=None, rating=0, comment=""): self.id = id self.user = user self.item = item self.rating = rating self.comment = comment
[docs] def set_item(self, item): """Set association's item."""
[docs]class RepositoryRatingAssociation(Base, ItemRatingAssociation): __tablename__ = "repository_rating_association" id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) repository_id = Column(ForeignKey("repository.id"), index=True) user_id = Column(ForeignKey("galaxy_user.id"), index=True) rating = Column(Integer, index=True) comment = Column(TEXT) repository = relationship("Repository", back_populates="ratings") user = relationship("User")
[docs] def set_item(self, repository): self.repository = repository
[docs]class Category(Base, Dictifiable): __tablename__ = "category" id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, default=now, onupdate=now) name = Column(TrimmedString(255), index=True, unique=True) description = Column(TEXT) deleted = Column(Boolean, index=True, default=False) repositories = relationship("RepositoryCategoryAssociation", back_populates="category") dict_collection_visible_keys = ["id", "name", "description", "deleted"] dict_element_visible_keys = ["id", "name", "description", "deleted"] def __init__(self, deleted=False, **kwd): super().__init__(**kwd) self.deleted = deleted
[docs]class RepositoryCategoryAssociation(Base): __tablename__ = "repository_category_association" id = Column(Integer, primary_key=True) repository_id = Column(ForeignKey("repository.id"), index=True) category_id = Column(ForeignKey("category.id"), index=True) category = relationship("Category", back_populates="repositories") repository = relationship("Repository", back_populates="categories") def __init__(self, repository=None, category=None): self.repository = repository self.category = category
[docs]class Tag(Base): __tablename__ = "tag" __table_args__ = (UniqueConstraint("name"),) id = Column(Integer, primary_key=True) type = Column(Integer) parent_id = Column(ForeignKey("tag.id")) name = Column(TrimmedString(255)) children = relationship("Tag", back_populates="parent") parent = relationship("Tag", back_populates="children", remote_side=[id]) def __str__(self): return "Tag(id=%s, type=%i, parent_id=%s, name=%s)" % (self.id, self.type, self.parent_id, self.name)
# The RepositoryMetadata model is mapped imperatively (for details see discussion in PR #12064). # TLDR: a declaratively-mapped class cannot have a .metadata attribute (it is used by SQLAlchemy's DeclarativeBase).
[docs]class RepositoryMetadata(Dictifiable): # Once the class has been mapped, all Column items in this table will be available # as instrumented class attributes on RepositoryMetadata. table = Table( "repository_metadata", mapper_registry.metadata, Column("id", Integer, primary_key=True), Column("create_time", DateTime, default=now), Column("update_time", DateTime, default=now, onupdate=now), Column("repository_id", ForeignKey("repository.id"), index=True), Column("changeset_revision", TrimmedString(255), index=True), Column("numeric_revision", Integer, index=True), Column("metadata", MutableJSONType, nullable=True), Column("tool_versions", MutableJSONType, nullable=True), Column("malicious", Boolean, default=False), Column("downloadable", Boolean, default=True), Column("missing_test_components", Boolean, default=False, index=True), Column("has_repository_dependencies", Boolean, default=False, index=True), Column("includes_datatypes", Boolean, default=False, index=True), Column("includes_tools", Boolean, default=False, index=True), Column("includes_tool_dependencies", Boolean, default=False, index=True), Column("includes_workflows", Boolean, default=False, index=True), ) dict_collection_visible_keys = [ "id", "repository_id", "numeric_revision", "changeset_revision", "malicious", "downloadable", "missing_test_components", "has_repository_dependencies", "includes_datatypes", "includes_tools", "includes_tool_dependencies", "includes_tools_for_display_in_tool_panel", "includes_workflows", ] dict_element_visible_keys = [ "id", "repository_id", "numeric_revision", "changeset_revision", "malicious", "downloadable", "missing_test_components", "has_repository_dependencies", "includes_datatypes", "includes_tools", "includes_tool_dependencies", "includes_tools_for_display_in_tool_panel", "includes_workflows", "repository_dependencies", ] def __init__( self, id=None, repository_id=None, numeric_revision=None, changeset_revision=None, metadata=None, tool_versions=None, malicious=False, downloadable=False, missing_test_components=None, tools_functionally_correct=False, test_install_error=False, has_repository_dependencies=False, includes_datatypes=False, includes_tools=False, includes_tool_dependencies=False, includes_workflows=False, ): self.id = id self.repository_id = repository_id self.numeric_revision = numeric_revision self.changeset_revision = changeset_revision self.metadata = metadata self.tool_versions = tool_versions self.malicious = malicious self.downloadable = downloadable self.missing_test_components = missing_test_components self.has_repository_dependencies = has_repository_dependencies # We don't consider the special case has_repository_dependencies_only_if_compiling_contained_td here. self.includes_datatypes = includes_datatypes self.includes_tools = includes_tools self.includes_tool_dependencies = includes_tool_dependencies self.includes_workflows = includes_workflows @property def includes_tools_for_display_in_tool_panel(self): if self.metadata: tool_dicts = self.metadata.get("tools", []) for tool_dict in tool_dicts: if tool_dict.get("add_to_tool_panel", True): return True return False @property def repository_dependencies(self): if self.has_repository_dependencies: return [ repository_dependency for repository_dependency in self.metadata["repository_dependencies"]["repository_dependencies"] ] return []
# After the map_imperatively statement has been executed, the members of the # properties dictionary (repository) will be available as instrumented # class attributes on RepositoryMetadata. mapper_registry.map_imperatively( RepositoryMetadata, RepositoryMetadata.table, properties=dict( repository=relationship(Repository, back_populates="metadata_revisions"), ), ) # Utility methods
[docs]def sort_by_attr(seq, attr): """ Sort the sequence of objects by object's attribute Arguments: seq - the list or any sequence (including immutable one) of objects to sort. attr - the name of attribute to sort by """ # Use the "Schwartzian transform" # Create the auxiliary list of tuples where every i-th tuple has form # (seq[i].attr, i, seq[i]) and sort it. The second item of tuple is needed not # only to provide stable sorting, but mainly to eliminate comparison of objects # (which can be expensive or prohibited) in case of equal attribute values. intermed = [(getattr(v, attr), i, v) for i, v in enumerate(seq)] intermed.sort() return [_[-1] for _ in intermed]