Source code for galaxy.files.sources

import abc
import os
import time
from typing import (
    ClassVar,
    Set,
)

from galaxy.exceptions import (
    ConfigurationError,
    ItemAccessibilityException,
)
from galaxy.util.bool_expressions import (
    BooleanExpressionEvaluator,
    TokenContainedEvaluator,
)
from galaxy.util.template import fill_template

DEFAULT_SCHEME = "gxfiles"
DEFAULT_WRITABLE = False


[docs]class FilesSource(metaclass=abc.ABCMeta): """ """
[docs] @abc.abstractmethod def get_uri_root(self) -> str: """Return a prefix for the root (e.g. gxfiles://prefix/)."""
[docs] @abc.abstractmethod def get_scheme(self) -> str: """Return a prefix for the root (e.g. the gxfiles in gxfiles://prefix/path)."""
[docs] @abc.abstractmethod def get_writable(self): """Return a boolean indicating if this target is writable."""
[docs] @abc.abstractmethod def user_has_access(self, user_context) -> bool: """Return a boolean indicating if the user can access the FileSource."""
# TODO: off-by-default
[docs] @abc.abstractmethod def list(self, source_path="/", recursive=False, user_context=None): """Return dictionary of 'Directory's and 'File's."""
[docs] @abc.abstractmethod def realize_to(self, source_path, native_path, user_context=None): """Realize source path (relative to uri root) to local file system path."""
[docs] @abc.abstractmethod def write_from(self, target_path, native_path, user_context=None): """Write file at native path to target_path (relative to uri root)."""
[docs] @abc.abstractmethod def to_dict(self, for_serialization=False, user_context=None): """Return a dictified representation of this FileSource instance. If ``user_context`` is supplied, properties should be written so user context doesn't need to be present after the plugin is re-hydrated. """
[docs]class BaseFilesSource(FilesSource): plugin_type: ClassVar[str]
[docs] def get_prefix(self): return self.id
[docs] def get_scheme(self): return "gxfiles"
[docs] def get_writable(self): return self.writable
[docs] def user_has_access(self, user_context) -> bool: if user_context is None and self.user_context_required: return False return ( user_context is None or user_context.is_admin or (self._user_has_required_roles(user_context) and self._user_has_required_groups(user_context)) )
@property def user_context_required(self) -> bool: return self.requires_roles is not None or self.requires_groups is not None
[docs] def get_uri_root(self): prefix = self.get_prefix() scheme = self.get_scheme() root = f"{scheme}://" if prefix: root = uri_join(root, prefix) return root
[docs] def uri_from_path(self, path): uri_root = self.get_uri_root() return uri_join(uri_root, path)
def _parse_common_config_opts(self, kwd: dict): self._file_sources_config = kwd.pop("file_sources_config") self.id = kwd.pop("id") self.label = kwd.pop("label", None) or self.id self.doc = kwd.pop("doc", None) self.scheme = kwd.pop("scheme", DEFAULT_SCHEME) self.writable = kwd.pop("writable", DEFAULT_WRITABLE) self.requires_roles = kwd.pop("requires_roles", None) self.requires_groups = kwd.pop("requires_groups", None) self._validate_security_rules() # If coming from to_dict, strip API helper values kwd.pop("uri_root", None) kwd.pop("type", None) return kwd
[docs] def to_dict(self, for_serialization=False, user_context=None): rval = { "id": self.id, "type": self.plugin_type, "uri_root": self.get_uri_root(), "label": self.label, "doc": self.doc, "writable": self.writable, "requires_roles": self.requires_roles, "requires_groups": self.requires_groups, } if for_serialization: rval.update(self._serialization_props(user_context=user_context)) return rval
[docs] def to_dict_time(self, ctime): if ctime is None: return None elif isinstance(ctime, (int, float)): return time.strftime("%m/%d/%Y %I:%M:%S %p", time.localtime(ctime)) else: return ctime.strftime("%m/%d/%Y %I:%M:%S %p")
@abc.abstractmethod def _serialization_props(self, user_context=None): """Serialize properties needed to recover plugin configuration. Used in to_dict method if for_serialization is True. """
[docs] def list(self, path="/", recursive=False, user_context=None): self._check_user_access(user_context) return self._list(path, recursive, user_context)
@abc.abstractmethod def _list(self, path="/", recursive=False, user_context=None): pass
[docs] def write_from(self, target_path, native_path, user_context=None): if not self.get_writable(): raise Exception("Cannot write to a non-writable file source.") self._check_user_access(user_context) self._write_from(target_path, native_path, user_context=user_context)
@abc.abstractmethod def _write_from(self, target_path, native_path, user_context=None): pass
[docs] def realize_to(self, source_path, native_path, user_context=None): self._check_user_access(user_context) self._realize_to(source_path, native_path, user_context)
@abc.abstractmethod def _realize_to(self, source_path, native_path, user_context=None): pass def _check_user_access(self, user_context): """Raises an exception if the given user doesn't have the rights to access this file source. Warning: if the user_context is None, then the check is skipped. This is due to tool executions context not having access to the user_context. The validation will be done when checking the tool parameters. """ if user_context is not None and not self.user_has_access(user_context): raise ItemAccessibilityException(f"User {user_context.username} has no access to file source.") def _evaluate_prop(self, prop_val, user_context): rval = prop_val if isinstance(prop_val, str) and "$" in prop_val: template_context = dict( user=user_context, environ=os.environ, config=self._file_sources_config, ) rval = fill_template(prop_val, context=template_context, futurized=True) return rval def _user_has_required_roles(self, user_context) -> bool: if self.requires_roles: return self._evaluate_security_rules(self.requires_roles, user_context.role_names) return True def _user_has_required_groups(self, user_context) -> bool: if self.requires_groups: return self._evaluate_security_rules(self.requires_groups, user_context.group_names) return True def _evaluate_security_rules(self, rule_expression: str, user_credentials: Set[str]) -> bool: token_evaluator = TokenContainedEvaluator(user_credentials) evaluator = BooleanExpressionEvaluator(token_evaluator) return evaluator.evaluate_expression(rule_expression) def _validate_security_rules(self) -> None: """Checks if the security rules defined in the plugin configuration are valid boolean expressions or raises a ConfigurationError exception otherwise.""" def _get_error_msg_for(rule_name: str) -> str: return f"Invalid boolean expression for '{rule_name}' in {self.label} file source plugin configuration." if self.requires_roles and not BooleanExpressionEvaluator.is_valid_expression(self.requires_roles): raise ConfigurationError(_get_error_msg_for("requires_roles")) if self.requires_groups and not BooleanExpressionEvaluator.is_valid_expression(self.requires_groups): raise ConfigurationError(_get_error_msg_for("requires_groups"))
[docs]def uri_join(*args): # url_join doesn't work with non-standard scheme arg0 = args[0] if "://" in arg0: scheme, path = arg0.split("://", 1) rval = f"{scheme}://{slash_join(path, *args[1:]) if path else slash_join(*args[1:])}" else: rval = slash_join(*args) return rval
[docs]def slash_join(*args): # https://codereview.stackexchange.com/questions/175421/joining-strings-to-form-a-url return "/".join(arg.strip("/") for arg in args)