Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.webapp.model

import logging
import os
import weakref
from datetime import (
    datetime,
    timedelta
)

import tool_shed.repository_types.util as rt_util
from galaxy import util
from galaxy.model.orm.now import now
from galaxy.security.validate_user_input import validate_password_str
from galaxy.util import unique_id
from galaxy.util.bunch import Bunch
from galaxy.util.dictifiable import Dictifiable
from galaxy.util.hash_util import new_secure_hash
from tool_shed.dependencies.repository import relation_builder
from tool_shed.util import (
    hg_util,
    metadata_util
)
from tool_shed.util.hgweb_config import hgweb_config_manager

log = logging.getLogger(__name__)

WEAK_HG_REPO_CACHE = weakref.WeakKeyDictionary()


[docs]class APIKeys(object): pass
[docs]class User(Dictifiable): dict_collection_visible_keys = ['id', 'username'] dict_element_visible_keys = ['id', 'username'] def __init__(self, email=None, password=None): self.email = email self.password = password self.external = False self.deleted = False self.purged = False self.username = None self.new_repo_alert = False
[docs] def all_roles(self): roles = [ura.role for ura in self.roles] for group in [uga.group for uga in self.groups]: for role in [gra.role for gra in group.roles]: if role not in roles: roles.append(role) return roles
[docs] def check_password(self, cleartext): """Check if 'cleartext' matches 'self.password' when hashed.""" return self.password == new_secure_hash(text_type=cleartext)
[docs] def get_disk_usage(self, nice_size=False): return 0
@property def nice_total_disk_usage(self): return 0
[docs] def set_disk_usage(self, bytes): pass
total_disk_usage = property(get_disk_usage, set_disk_usage)
[docs] def set_password_cleartext(self, cleartext): message = validate_password_str(cleartext) if message: raise Exception("Invalid password: %s" % message) """Set 'self.password' to the digest of 'cleartext'.""" self.password = new_secure_hash(text_type=cleartext)
[docs]class PasswordResetToken(object): def __init__(self, user, token=None): if token: self.token = token else: self.token = unique_id() self.user = user self.expiration_time = now() + timedelta(hours=24)
[docs]class Group(Dictifiable): dict_collection_visible_keys = ['id', 'name'] dict_element_visible_keys = ['id', 'name'] def __init__(self, name=None): self.name = name self.deleted = False
[docs]class Role(Dictifiable): dict_collection_visible_keys = ['id', 'name'] dict_element_visible_keys = ['id', 'name', 'description', 'type'] private_id = None types = Bunch(PRIVATE='private', SYSTEM='system', USER='user', ADMIN='admin', SHARING='sharing') def __init__(self, name="", description="", type="system", deleted=False): self.name = name self.description = description self.type = type self.deleted = deleted @property def is_repository_admin_role(self): # A repository admin role must always be associated with a repository. The mapper returns an # empty list for those roles that have no repositories. This method will require changes if # new features are introduced that results in more than one role per repository. if self.repositories: return True return False
[docs]class UserGroupAssociation(object): def __init__(self, user, group): self.user = user self.group = group
[docs]class UserRoleAssociation(object): def __init__(self, user, role): self.user = user self.role = role
[docs]class GroupRoleAssociation(object): def __init__(self, group, role): self.group = group self.role = role
[docs]class RepositoryRoleAssociation(object): def __init__(self, repository, role): self.repository = repository self.role = role
[docs]class GalaxySession(object): def __init__(self, id=None, user=None, remote_host=None, remote_addr=None, referer=None, current_history=None, session_key=None, is_valid=False, prev_session_id=None, last_action=None): self.id = id self.user = user self.remote_host = remote_host self.remote_addr = remote_addr self.referer = referer self.current_history = current_history self.session_key = session_key self.is_valid = is_valid self.prev_session_id = prev_session_id self.last_action = last_action or datetime.now()
[docs]class Repository(Dictifiable): dict_collection_visible_keys = ['id', 'name', 'type', 'remote_repository_url', 'homepage_url', 'description', 'user_id', 'private', 'deleted', 'times_downloaded', 'deprecated', 'create_time'] dict_element_visible_keys = ['id', 'name', 'type', 'remote_repository_url', 'homepage_url', 'description', 'long_description', 'user_id', 'private', 'deleted', 'times_downloaded', 'deprecated', 'create_time', 'ratings', 'reviews', 'reviewers'] file_states = Bunch(NORMAL='n', NEEDS_MERGING='m', MARKED_FOR_REMOVAL='r', MARKED_FOR_ADDITION='a', NOT_TRACKED='?') def __init__(self, id=None, name=None, type=None, remote_repository_url=None, homepage_url=None, description=None, long_description=None, user_id=None, private=False, deleted=None, email_alerts=None, times_downloaded=0, deprecated=False, create_time=None): self.id = id self.name = name or "Unnamed repository" self.type = type self.remote_repository_url = remote_repository_url self.homepage_url = homepage_url self.description = description self.long_description = long_description self.user_id = user_id self.private = private self.deleted = deleted self.email_alerts = email_alerts self.times_downloaded = times_downloaded self.deprecated = deprecated self.create_time = create_time @property def hg_repo(self): from mercurial import ( hg, ui ) if not WEAK_HG_REPO_CACHE.get(self): WEAK_HG_REPO_CACHE[self] = hg.cachedlocalrepo(hg.repository(ui.ui(), self.repo_path().encode('utf-8'))) return WEAK_HG_REPO_CACHE[self].fetch()[0] @property def admin_role(self): admin_role_name = '%s_%s_admin' % (str(self.name), str(self.user.username)) for rra in self.roles: role = rra.role if str(role.name) == admin_role_name: return role raise Exception('Repository %s owned by %s is not associated with a required administrative role.' % (str(self.name), str(self.user.username)))
[docs] def allow_push(self): hgrc_file = hg_util.get_hgrc_path(self.repo_path()) with open(hgrc_file, 'r') as fh: for line in fh.read().splitlines(): if line.startswith('allow_push = '): return line[len('allow_push = '):] return ''
[docs] def can_change_type(self): # Allow changing the type only if the repository has no contents, has never been installed, or has # never been changed from the default type. if self.is_new(): return True if self.times_downloaded == 0: return True if self.type == rt_util.UNRESTRICTED: return True return False
[docs] def can_change_type_to(self, app, new_type_label): if self.type == new_type_label: return False if self.can_change_type(): new_type = app.repository_types_registry.get_class_by_label(new_type_label) if new_type.is_valid_for_type(self): return True return False
[docs] def get_changesets_for_setting_metadata(self, app): type_class = self.get_type_class(app) return type_class.get_changesets_for_setting_metadata(app, self)
[docs] def get_repository_dependencies(self, app, changeset, toolshed_url): # We aren't concerned with repositories of type tool_dependency_definition here if a # repository_metadata record is not returned because repositories of this type will never # have repository dependencies. However, if a readme file is uploaded, or some other change # is made that does not create a new downloadable changeset revision but updates the existing # one, we still want to be able to get repository dependencies. repository_metadata = metadata_util.get_current_repository_metadata_for_changeset_revision(app, self, changeset) if repository_metadata: metadata = repository_metadata.metadata if metadata: rb = relation_builder.RelationBuilder(app, self, repository_metadata, toolshed_url) repository_dependencies = rb.get_repository_dependencies_for_changeset_revision() if repository_dependencies: return repository_dependencies return None
[docs] def get_type_class(self, app): return app.repository_types_registry.get_class_by_label(self.type)
[docs] def get_tool_dependencies(self, app, changeset_revision): changeset_revision = metadata_util.get_next_downloadable_changeset_revision(app, self, changeset_revision) for downloadable_revision in self.downloadable_revisions: if downloadable_revision.changeset_revision == changeset_revision: return downloadable_revision.metadata.get('tool_dependencies', {}) return {}
[docs] def installable_revisions(self, app, sort_revisions=True): return metadata_util.get_metadata_revisions(app, self, sort_revisions=sort_revisions)
[docs] def is_new(self): tip_rev = self.hg_repo.changelog.tiprev() return tip_rev < 0
[docs] def repo_path(self, app=None): # Keep app argument for compatibility with tool_shed_install Repository model return hgweb_config_manager.get_entry(os.path.join("repos", self.user.username, self.name))
[docs] def revision(self): repo = self.hg_repo tip_ctx = repo[repo.changelog.tip()] return "%s:%s" % (str(tip_ctx.rev()), str(tip_ctx))
[docs] def set_allow_push(self, usernames, remove_auth=''): allow_push = util.listify(self.allow_push()) if remove_auth: allow_push.remove(remove_auth) else: for username in util.listify(usernames): if username not in allow_push: allow_push.append(username) allow_push = '%s\n' % ','.join(allow_push) # Why doesn't the following work? # repo.ui.setconfig('web', 'allow_push', allow_push) repo_dir = self.repo_path() hgrc_file = hg_util.get_hgrc_path(repo_dir) with open(hgrc_file, 'r') as fh: lines = fh.readlines() with open(hgrc_file, 'w') as fh: for line in lines: if line.startswith('allow_push'): fh.write('allow_push = %s' % allow_push) else: fh.write(line)
[docs] def tip(self): repo = self.hg_repo return str(repo[repo.changelog.tip()])
[docs] def to_dict(self, view='collection', value_mapper=None): rval = super(Repository, self).to_dict(view=view, value_mapper=value_mapper) if 'user_id' in rval: rval['owner'] = self.user.username return rval
[docs]class RepositoryMetadata(Dictifiable): dict_collection_visible_keys = ['id', 'repository_id', 'numeric_revision', 'changeset_revision', 'malicious', 'downloadable', 'missing_test_components', 'has_repository_dependencies', 'includes_datatypes', 'includes_tools', 'includes_tool_dependencies', 'includes_tools_for_display_in_tool_panel', 'includes_workflows'] dict_element_visible_keys = ['id', 'repository_id', 'numeric_revision', 'changeset_revision', 'malicious', 'downloadable', 'missing_test_components', 'has_repository_dependencies', 'includes_datatypes', 'includes_tools', 'includes_tool_dependencies', 'includes_tools_for_display_in_tool_panel', 'includes_workflows', 'repository_dependencies'] def __init__(self, id=None, repository_id=None, numeric_revision=None, changeset_revision=None, metadata=None, tool_versions=None, malicious=False, downloadable=False, missing_test_components=None, tools_functionally_correct=False, test_install_error=False, has_repository_dependencies=False, includes_datatypes=False, includes_tools=False, includes_tool_dependencies=False, includes_workflows=False): self.id = id self.repository_id = repository_id self.numeric_revision = numeric_revision self.changeset_revision = changeset_revision self.metadata = metadata self.tool_versions = tool_versions self.malicious = malicious self.downloadable = downloadable self.missing_test_components = missing_test_components self.has_repository_dependencies = has_repository_dependencies # We don't consider the special case has_repository_dependencies_only_if_compiling_contained_td here. self.includes_datatypes = includes_datatypes self.includes_tools = includes_tools self.includes_tool_dependencies = includes_tool_dependencies self.includes_workflows = includes_workflows @property def includes_tools_for_display_in_tool_panel(self): if self.metadata: tool_dicts = self.metadata.get('tools', []) for tool_dict in tool_dicts: if tool_dict.get('add_to_tool_panel', True): return True return False @property def repository_dependencies(self): if self.has_repository_dependencies: return [repository_dependency for repository_dependency in self.metadata['repository_dependencies']['repository_dependencies']] return []
[docs]class RepositoryReview(Dictifiable): dict_collection_visible_keys = ['id', 'repository_id', 'changeset_revision', 'user_id', 'rating', 'deleted'] dict_element_visible_keys = ['id', 'repository_id', 'changeset_revision', 'user_id', 'rating', 'deleted'] approved_states = Bunch(NO='no', YES='yes') def __init__(self, repository_id=None, changeset_revision=None, user_id=None, rating=None, deleted=False): self.repository_id = repository_id self.changeset_revision = changeset_revision self.user_id = user_id self.rating = rating self.deleted = deleted
[docs]class ComponentReview(Dictifiable): dict_collection_visible_keys = ['id', 'repository_review_id', 'component_id', 'private', 'approved', 'rating', 'deleted'] dict_element_visible_keys = ['id', 'repository_review_id', 'component_id', 'private', 'approved', 'rating', 'deleted'] approved_states = Bunch(NO='no', YES='yes', NA='not_applicable') def __init__(self, repository_review_id=None, component_id=None, comment=None, private=False, approved=False, rating=None, deleted=False): self.repository_review_id = repository_review_id self.component_id = component_id self.comment = comment self.private = private self.approved = approved self.rating = rating self.deleted = deleted
[docs]class Component(object): def __init__(self, name=None, description=None): self.name = name self.description = description
[docs]class ItemRatingAssociation(object):
[docs] def __init__(self, id=None, user=None, item=None, rating=0, comment=''): self.id = id self.user = user self.item = item self.rating = rating self.comment = comment
[docs] def set_item(self, item): """ Set association's item. """ pass
[docs]class RepositoryRatingAssociation(ItemRatingAssociation):
[docs] def set_item(self, repository): self.repository = repository
[docs]class Category(Dictifiable): dict_collection_visible_keys = ['id', 'name', 'description', 'deleted'] dict_element_visible_keys = ['id', 'name', 'description', 'deleted'] def __init__(self, name=None, description=None, deleted=False): self.name = name self.description = description self.deleted = deleted
[docs]class RepositoryCategoryAssociation(object): def __init__(self, repository=None, category=None): self.repository = repository self.category = category
[docs]class Tag(object): def __init__(self, id=None, type=None, parent_id=None, name=None): self.id = id self.type = type self.parent_id = parent_id self.name = name def __str__(self): return "Tag(id=%s, type=%i, parent_id=%s, name=%s)" % (self.id, self.type, self.parent_id, self.name)
[docs]class ItemTagAssociation(object):
[docs] def __init__(self, id=None, user=None, item_id=None, tag_id=None, user_tname=None, value=None): self.id = id self.user = user self.item_id = item_id self.tag_id = tag_id self.user_tname = user_tname self.value = None self.user_value = None
[docs]class PostJobAction(object):
[docs] def __init__(self, action_type, workflow_step, output_name=None, action_arguments=None): self.action_type = action_type self.output_name = output_name self.action_arguments = action_arguments self.workflow_step = workflow_step
[docs]class StoredWorkflowAnnotationAssociation(object): pass
[docs]class WorkflowStepAnnotationAssociation(object): pass
[docs]class Workflow(object):
[docs] def __init__(self): self.user = None self.name = None self.has_cycles = None self.has_errors = None self.steps = []
[docs]class WorkflowStep(object):
[docs] def __init__(self): self.id = None self.type = None self.name = None self.tool_id = None self.tool_inputs = None self.tool_errors = None self.position = None self.inputs = [] self.config = None self.label = None
[docs] def get_or_add_input(self, input_name): for step_input in self.inputs: if step_input.name == input_name: return step_input step_input = WorkflowStepInput() step_input.workflow_step = self step_input.name = input_name self.inputs.append(step_input) return step_input
@property def input_connections(self): connections = [_ for step_input in self.inputs for _ in step_input.connections] return connections
[docs]class WorkflowStepInput(object):
[docs] def __init__(self): self.id = None self.name = None self.connections = []
[docs]class WorkflowStepConnection(object):
[docs] def __init__(self): self.output_step = None self.output_name = None self.input_step = None self.input_name = None
# Utility methods
[docs]def sort_by_attr(seq, attr): """ Sort the sequence of objects by object's attribute Arguments: seq - the list or any sequence (including immutable one) of objects to sort. attr - the name of attribute to sort by """ # Use the "Schwartzian transform" # Create the auxiliary list of tuples where every i-th tuple has form # (seq[i].attr, i, seq[i]) and sort it. The second item of tuple is needed not # only to provide stable sorting, but mainly to eliminate comparison of objects # (which can be expensive or prohibited) in case of equal attribute values. intermed = [(getattr(v, attr), i, v) for i, v in enumerate(seq)] intermed.sort() return [_[-1] for _ in intermed]