Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.files

import logging
import os
from collections import defaultdict
from typing import (
    Any,
    Dict,
    List,
    NamedTuple,
    Optional,
    Set,
)

from galaxy import exceptions
from galaxy.files.sources import (
    BaseFilesSource,
    FilesSourceProperties,
    PluginKind,
)
from galaxy.util import plugin_config
from galaxy.util.config_parsers import parse_allowlist_ips
from galaxy.util.dictifiable import Dictifiable

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)


class FileSourcePath(NamedTuple):
    """Pairing of a file source with a path string.

    Built by ``ConfiguredFileSources.get_file_source_path`` from the file
    source that best matched a URI and the result of that file source's
    ``to_relative_path`` for the same URI.
    """

    # The file source object selected for the URI.
    file_source: BaseFilesSource
    # The path produced by ``file_source.to_relative_path(uri)``.
    path: str
class FileSourceScore(NamedTuple):
    """Pairing of a file source with an integer score."""

    # The file source that was scored.
    file_source: BaseFilesSource
    # The integer score attributed to ``file_source``.
    score: int
class NoMatchingFileSource(Exception):
    """Exception type for the "no matching file source" condition.

    NOTE(review): nothing in this module raises it; presumably it is raised
    or caught by callers elsewhere -- confirm before relying on it.
    """
class ConfiguredFileSources:
    """Load plugins and resolve Galaxy URIs to FileSource objects."""

    _file_sources: List[BaseFilesSource]

    def __init__(
        self,
        file_sources_config: "ConfiguredFileSourcesConfig",
        conf_file=None,
        conf_dict=None,
        load_stock_plugins=False,
    ):
        """Build the list of file source plugins.

        :param file_sources_config: configuration object handed to every
            plugin (as the ``file_sources_config`` keyword) and consulted for
            the FTP / library import directories.
        :param conf_file: optional path to a plugin configuration file. Takes
            precedence over ``conf_dict`` when both are given.
        :param conf_dict: optional in-memory plugin configuration, used only
            when ``conf_file`` is ``None``.
        :param load_stock_plugins: when true, make sure the stock plugin types
            are present even if the explicit configuration omits them.
        """
        self._file_sources_config = file_sources_config
        self._plugin_classes = self._file_source_plugins_dict()

        file_sources: List[BaseFilesSource] = []
        if conf_file is not None:
            file_sources = self._load_plugins_from_file(conf_file)
        elif conf_dict is not None:
            plugin_source = plugin_config.plugin_source_from_dict(conf_dict)
            file_sources = self._parse_plugin_source(plugin_source)

        # Recorded before stock plugins are merged in, so it reflects only
        # what was explicitly configured.
        custom_sources_configured = len(file_sources) > 0

        if load_stock_plugins:
            # These three types are always wanted; the Galaxy-specific ones
            # only when the matching directory is configured.
            wanted_types = ["http", "base64", "drs"]
            if file_sources_config.ftp_upload_dir is not None:
                wanted_types.append("gxftp")
            if file_sources_config.library_import_dir is not None:
                wanted_types.append("gximport")
            if file_sources_config.user_library_import_dir is not None:
                wanted_types.append("gxuserimport")

            # Only add a stock plugin when no configured plugin already has
            # that plugin type.
            stock_file_source_conf_dict = [
                {"type": plugin_type}
                for plugin_type in wanted_types
                if not any(file_source.plugin_type == plugin_type for file_source in file_sources)
            ]
            if stock_file_source_conf_dict:
                stock_plugin_source = plugin_config.plugin_source_from_dict(stock_file_source_conf_dict)
                # insert at beginning instead of append so FTP and library import appear
                # at the top of the list (presumably the most common options). Admins can insert
                # these explicitly for greater control.
                file_sources = self._parse_plugin_source(stock_plugin_source) + file_sources

        self._file_sources = file_sources
        self.custom_sources_configured = custom_sources_configured

    def _load_plugins_from_file(self, conf_file):
        """Load plugins from the configuration file at ``conf_file``."""
        plugin_source = plugin_config.plugin_source_from_path(conf_file)
        return self._parse_plugin_source(plugin_source)

    def _file_source_plugins_dict(self):
        """Return the plugin classes found in ``galaxy.files.sources``, keyed by ``plugin_type``."""
        # Imported locally, as in the original code, rather than at module level.
        import galaxy.files.sources

        return plugin_config.plugins_dict(galaxy.files.sources, "plugin_type")

    def _parse_plugin_source(self, plugin_source):
        """Instantiate the plugins described by ``plugin_source``.

        Every plugin receives this object's configuration through the
        ``file_sources_config`` keyword.
        """
        extra_kwds = {
            "file_sources_config": self._file_sources_config,
        }
        return plugin_config.load_plugins(
            self._plugin_classes,
            plugin_source,
            extra_kwds,
            dict_to_list_key="id",
        )

    def find_best_match(self, url: str) -> Optional[BaseFilesSource]:
        """Returns the best matching file source for handling a particular url.

        Each filesource scores its own ability to match a particular url, and
        the highest scorer with a score > 0 is selected. When several file
        sources share the highest score, the one listed first wins. Returns
        ``None`` when no file source scores above zero.
        """
        best_source: Optional[BaseFilesSource] = None
        best_score = 0
        # Single pass instead of sorting every score: the strict ``>`` keeps
        # the earliest file source on ties, matching a stable descending sort.
        for file_source in self._file_sources:
            score = file_source.score_url_match(url)
            if score > best_score:
                best_source = file_source
                best_score = score
        return best_source

    def get_file_source_path(self, uri):
        """Parse uri into a FileSource object and a path relative to its base.

        :raises exceptions.RequestParameterInvalidException: if ``uri`` does
            not contain ``://`` or no file source matches it.
        """
        if "://" not in uri:
            raise exceptions.RequestParameterInvalidException(f"Invalid uri [{uri}]")
        file_source = self.find_best_match(uri)
        if not file_source:
            raise exceptions.RequestParameterInvalidException(f"Could not find handler for URI [{uri}]")
        path = file_source.to_relative_path(uri)
        return FileSourcePath(file_source, path)

    def validate_uri_root(self, uri: str, user_context: "ProvidesUserFileSourcesUserContext"):
        """Validate a URI against Galaxy's configuration, environment, and the current user.

        Only the ``gxuserimport://``, ``gximport://`` and ``gxftp://`` schemes
        are checked; any other URI passes through without validation.

        :raises exceptions.ConfigDoesNotAllowException: if the directory
            backing the scheme is not configured.
        :raises exceptions.ObjectNotFound: if the user's import or FTP
            directory does not exist, or the user has no e-mail address to
            derive an import directory from.
        """
        if uri.startswith("gxuserimport://"):
            user_login = user_context.email
            user_base_dir = self._file_sources_config.user_library_import_dir
            if user_base_dir is None:
                raise exceptions.ConfigDoesNotAllowException(
                    "The configuration of this Galaxy instance does not allow upload from user directories."
                )
            if not user_login:
                # Without an e-mail there is no per-user directory to look
                # for, and os.path.join would raise TypeError on None.
                raise exceptions.ObjectNotFound("Your user import directory does not exist.")
            full_import_dir = os.path.join(user_base_dir, user_login)
            if not os.path.exists(full_import_dir):
                raise exceptions.ObjectNotFound("Your user import directory does not exist.")
        elif uri.startswith("gximport://"):
            base_dir = self._file_sources_config.library_import_dir
            if base_dir is None:
                raise exceptions.ConfigDoesNotAllowException(
                    "The configuration of this Galaxy instance does not allow usage of import directory."
                )
        elif uri.startswith("gxftp://"):
            user_ftp_base_dir = self._file_sources_config.ftp_upload_dir
            if user_ftp_base_dir is None:
                raise exceptions.ConfigDoesNotAllowException(
                    "The configuration of this Galaxy instance does not allow upload from FTP directories."
                )
            user_ftp_dir = user_context.ftp_dir
            if not user_ftp_dir or not os.path.exists(user_ftp_dir):
                raise exceptions.ObjectNotFound(
                    "Your FTP directory does not exist, attempting to upload files to it may cause it to be created."
                )

    def looks_like_uri(self, path_or_uri):
        """Return ``True`` if some configured file source matches ``path_or_uri``."""
        # is this string a URI this object understands how to realize
        return bool(self.find_best_match(path_or_uri))

    def plugins_to_dict(
        self,
        for_serialization: bool = False,
        user_context: Optional["FileSourceDictifiable"] = None,
        browsable_only: Optional[bool] = False,
        include_kind: Optional[Set[PluginKind]] = None,
        exclude_kind: Optional[Set[PluginKind]] = None,
    ) -> List[FilesSourceProperties]:
        """Return the ``to_dict`` form of every file source passing the filters.

        :param for_serialization: forwarded to each file source's ``to_dict``.
        :param user_context: forwarded to ``user_has_access`` and ``to_dict``.
        :param browsable_only: when true, skip file sources whose
            ``get_browsable()`` is falsy.
        :param include_kind: when non-empty, keep only these plugin kinds.
        :param exclude_kind: when non-empty, drop these plugin kinds.
        """
        rval: List[FilesSourceProperties] = []
        for file_source in self._file_sources:
            if not file_source.user_has_access(user_context):
                continue
            if include_kind and file_source.plugin_kind not in include_kind:
                continue
            if exclude_kind and file_source.plugin_kind in exclude_kind:
                continue
            if browsable_only and not file_source.get_browsable():
                continue
            rval.append(file_source.to_dict(for_serialization=for_serialization, user_context=user_context))
        return rval

    def to_dict(
        self, for_serialization: bool = False, user_context: Optional["FileSourceDictifiable"] = None
    ) -> Dict[str, Any]:
        """Return a dict with the plugin dicts (``file_sources``) and the config dict (``config``)."""
        return {
            "file_sources": self.plugins_to_dict(for_serialization=for_serialization, user_context=user_context),
            "config": self._file_sources_config.to_dict(),
        }

    @staticmethod
    def from_app_config(config):
        """Build an instance, with stock plugins enabled, from an application config object.

        ``config.file_sources_config_file`` is used when it is set and exists
        on disk; otherwise ``config.file_sources`` is used instead.
        """
        config_file = config.file_sources_config_file
        config_dict = None
        if not config_file or not os.path.exists(config_file):
            config_file = None
            config_dict = config.file_sources
        file_sources_config = ConfiguredFileSourcesConfig.from_app_config(config)
        return ConfiguredFileSources(
            file_sources_config, conf_file=config_file, conf_dict=config_dict, load_stock_plugins=True
        )

    @staticmethod
    def from_dict(as_dict, load_stock_plugins=False):
        """Rebuild an instance from a dict with ``file_sources`` and ``config`` keys.

        ``None`` yields an instance with no configured sources and a default
        configuration.
        """
        if as_dict is not None:
            sources_as_dict = as_dict["file_sources"]
            config_as_dict = as_dict["config"]
            file_sources_config = ConfiguredFileSourcesConfig.from_dict(config_as_dict)
        else:
            sources_as_dict = []
            file_sources_config = ConfiguredFileSourcesConfig()
        return ConfiguredFileSources(
            file_sources_config, conf_dict=sources_as_dict, load_stock_plugins=load_stock_plugins
        )
class NullConfiguredFileSources(ConfiguredFileSources):
    """A ``ConfiguredFileSources`` built from a default configuration only.

    No configuration file, no configuration dict and no stock plugins are
    passed to the parent constructor.
    """

    def __init__(self):
        default_config = ConfiguredFileSourcesConfig()
        super().__init__(default_config)
class ConfiguredFileSourcesConfig:
    """Configuration values shared by the file source plugins."""

    def __init__(
        self,
        symlink_allowlist=None,
        fetch_url_allowlist=None,
        library_import_dir=None,
        user_library_import_dir=None,
        ftp_upload_dir=None,
        ftp_upload_purge=True,
    ):
        """Store the given settings; falsy allowlists become empty lists."""
        self.symlink_allowlist = symlink_allowlist or []
        self.fetch_url_allowlist = fetch_url_allowlist or []
        self.library_import_dir = library_import_dir
        self.user_library_import_dir = user_library_import_dir
        self.ftp_upload_dir = ftp_upload_dir
        self.ftp_upload_purge = ftp_upload_purge

    @staticmethod
    def from_app_config(config):
        """Build an instance from the relevant attributes of an application config object."""
        # Formalize what we read in from config to create a more clear interface
        # for this component.
        return ConfiguredFileSourcesConfig(
            symlink_allowlist=config.user_library_import_symlink_allowlist,
            fetch_url_allowlist=[str(ip) for ip in config.fetch_url_allowlist_ips],
            library_import_dir=config.library_import_dir,
            user_library_import_dir=config.user_library_import_dir,
            ftp_upload_dir=config.ftp_upload_dir,
            ftp_upload_purge=config.ftp_upload_purge,
        )

    def to_dict(self):
        """Return the settings as a plain dict, in the form ``from_dict`` reads.

        Entries of ``fetch_url_allowlist`` are emitted as strings regardless of
        how they are stored, so the result is the same whether the instance
        came from ``from_app_config`` (which stores strings) or ``from_dict``
        (which stores whatever ``parse_allowlist_ips`` returns).
        """
        return {
            "symlink_allowlist": self.symlink_allowlist,
            # str() is a no-op for entries that are already strings.
            "fetch_url_allowlist": [str(ip) for ip in self.fetch_url_allowlist],
            "library_import_dir": self.library_import_dir,
            "user_library_import_dir": self.user_library_import_dir,
            "ftp_upload_dir": self.ftp_upload_dir,
            "ftp_upload_purge": self.ftp_upload_purge,
        }

    @staticmethod
    def from_dict(as_dict):
        """Rebuild an instance from a dict shaped like the output of ``to_dict``.

        All six keys are required. The ``fetch_url_allowlist`` value is passed
        through ``parse_allowlist_ips`` before being stored.
        """
        return ConfiguredFileSourcesConfig(
            symlink_allowlist=as_dict["symlink_allowlist"],
            fetch_url_allowlist=parse_allowlist_ips(as_dict["fetch_url_allowlist"]),
            library_import_dir=as_dict["library_import_dir"],
            user_library_import_dir=as_dict["user_library_import_dir"],
            ftp_upload_dir=as_dict["ftp_upload_dir"],
            ftp_upload_purge=as_dict["ftp_upload_purge"],
        )
class FileSourceDictifiable(Dictifiable):
    """Base for user-context objects that can be turned into a dict.

    Subclasses are expected to supply ``role_names``, ``group_names`` and
    ``file_sources``; the versions here only raise ``NotImplementedError``.
    """

    dict_collection_visible_keys = ("email", "username", "ftp_dir", "preferences", "is_admin")

    def to_dict(self, view="collection", value_mapper=None):
        """Return the parent's dict with ``role_names`` and ``group_names`` added as lists."""
        as_dict = super().to_dict(view=view, value_mapper=value_mapper)
        # The properties are annotated as sets; they are stored as lists here.
        as_dict["role_names"] = list(self.role_names)
        as_dict["group_names"] = list(self.group_names)
        return as_dict

    @property
    def role_names(self) -> Set[str]:
        """Role names of the user; must be provided by subclasses."""
        raise NotImplementedError

    @property
    def group_names(self) -> Set[str]:
        """Group names of the user; must be provided by subclasses."""
        raise NotImplementedError

    @property
    def file_sources(self):
        """Return other filesources available in the system, for chained filesource resolution"""
        raise NotImplementedError
class ProvidesUserFileSourcesUserContext(FileSourceDictifiable):
    """Implement a FileSourcesUserContext from a Galaxy ProvidesUserContext (e.g. trans)."""

    def __init__(self, trans, **kwargs):
        # Extra keyword arguments are accepted but not used.
        self.trans = trans

    @property
    def email(self):
        """E-mail of ``trans.user``, or the falsy ``trans.user`` value itself when there is no user."""
        current_user = self.trans.user
        if not current_user:
            return current_user
        return current_user.email

    @property
    def username(self):
        """Username of ``trans.user``, or the falsy ``trans.user`` value itself when there is no user."""
        current_user = self.trans.user
        if not current_user:
            return current_user
        return current_user.username

    @property
    def ftp_dir(self):
        """The ``user_ftp_dir`` attribute of ``trans``."""
        return self.trans.user_ftp_dir

    @property
    def preferences(self):
        """The user's ``extra_preferences``.

        Falls back to a ``defaultdict`` yielding ``None`` for every key when
        there is no user or the preferences are falsy.
        """
        current_user = self.trans.user
        extra_preferences = current_user.extra_preferences if current_user else None
        if extra_preferences:
            return extra_preferences
        return defaultdict(lambda: None)

    @property
    def role_names(self) -> Set[str]:
        """The set of role names of this user."""
        current_user = self.trans.user
        if not current_user:
            return set()
        return {association.role.name for association in current_user.roles}

    @property
    def group_names(self) -> Set[str]:
        """The set of group names to which this user belongs."""
        current_user = self.trans.user
        if not current_user:
            return set()
        return {association.group.name for association in current_user.groups}

    @property
    def is_admin(self):
        """Whether this user is an administrator."""
        return self.trans.user_is_admin

    @property
    def user_vault(self):
        """User vault namespace"""
        vault = self.trans.user_vault
        if vault:
            return vault
        # Stand-in that answers ``None`` for any key.
        return defaultdict(lambda: None)

    @property
    def app_vault(self):
        """App vault namespace"""
        vault = self.trans.app.vault
        if vault:
            return vault
        # Stand-in that answers ``None`` for any key.
        return defaultdict(lambda: None)

    @property
    def file_sources(self):
        """The ``file_sources`` attribute of ``trans.app``."""
        return self.trans.app.file_sources
class DictFileSourcesUserContext(FileSourceDictifiable):
    """User context backed by plain keyword arguments.

    Every property is a lookup in the keyword arguments given to the
    constructor; missing keys yield ``None`` (or an empty set for the
    role/group names).
    """

    def __init__(self, **kwd):
        self._kwd = kwd

    @property
    def email(self):
        """Value of the ``email`` keyword, or ``None``."""
        return self._kwd.get("email")

    @property
    def username(self):
        """Value of the ``username`` keyword, or ``None``."""
        return self._kwd.get("username")

    @property
    def ftp_dir(self):
        """Value of the ``user_ftp_dir`` keyword, falling back to ``ftp_dir``.

        ``FileSourceDictifiable.to_dict`` lists this value under the key
        ``ftp_dir``, so a context rebuilt from such a dict would otherwise
        lose it. ``user_ftp_dir`` still takes precedence when it is set.
        """
        ftp_dir = self._kwd.get("user_ftp_dir")
        if ftp_dir is None:
            ftp_dir = self._kwd.get("ftp_dir")
        return ftp_dir

    @property
    def preferences(self):
        """Value of the ``preferences`` keyword, or ``None``."""
        return self._kwd.get("preferences")

    @property
    def role_names(self):
        """Set built from the ``role_names`` keyword; empty when absent."""
        return set(self._kwd.get("role_names", []))

    @property
    def group_names(self):
        """Set built from the ``group_names`` keyword; empty when absent."""
        return set(self._kwd.get("group_names", []))

    @property
    def is_admin(self):
        """Value of the ``is_admin`` keyword, or ``None``."""
        return self._kwd.get("is_admin")

    @property
    def user_vault(self):
        """Value of the ``user_vault`` keyword, or ``None``."""
        return self._kwd.get("user_vault")

    @property
    def app_vault(self):
        """Value of the ``app_vault`` keyword, or ``None``."""
        return self._kwd.get("app_vault")

    @property
    def file_sources(self):
        """Value of the ``file_sources`` keyword, or ``None``."""
        return self._kwd.get("file_sources")