Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.managers.remote_files

import hashlib
import logging
from operator import itemgetter
from typing import (
    Optional,
    Set,
    Tuple,
)

from galaxy import exceptions
from galaxy.files import (
    ConfiguredFileSources,
    FileSourcePath,
    ProvidesFileSourcesUserContext,
)
from galaxy.files.sources import (
    FilesSourceOptions,
    PluginKind,
)
from galaxy.managers.context import ProvidesUserContext
from galaxy.schema.remote_files import (
    AnyRemoteFilesListResponse,
    CreatedEntryResponse,
    CreateEntryPayload,
    RemoteFilesDisableMode,
    RemoteFilesFormat,
    RemoteFilesTarget,
)
from galaxy.structured_app import MinimalManagerApp
from galaxy.util import (
    jstree,
    smart_str,
)

log = logging.getLogger(__name__)


[docs]class RemoteFilesManager: """ Interface/service object for interacting with remote files. """
[docs] def __init__(self, app: MinimalManagerApp): self._app = app
[docs] def index( self, user_ctx: ProvidesUserContext, target: str, format: Optional[RemoteFilesFormat], recursive: Optional[bool], disable: Optional[RemoteFilesDisableMode], writeable: Optional[bool] = False, limit: Optional[int] = None, offset: Optional[int] = None, query: Optional[str] = None, sort_by: Optional[str] = None, ) -> Tuple[AnyRemoteFilesListResponse, int]: """Returns a list of remote files and directories available to the user and the total count of them.""" user_file_source_context = ProvidesFileSourcesUserContext(user_ctx) default_recursive = False default_format = RemoteFilesFormat.uri if "://" in target: uri = target elif target == RemoteFilesTarget.userdir: uri = "gxuserimport://" default_format = RemoteFilesFormat.flat default_recursive = True elif target == RemoteFilesTarget.importdir: uri = "gximport://" default_format = RemoteFilesFormat.flat default_recursive = True elif target in [RemoteFilesTarget.ftpdir, "ftp"]: # legacy, allow both uri = "gxftp://" default_format = RemoteFilesFormat.flat default_recursive = True else: raise exceptions.RequestParameterInvalidException(f"Invalid target parameter supplied [{target}]") if format is None: format = default_format if recursive is None: recursive = default_recursive self._file_sources.validate_uri_root(uri, user_context=user_file_source_context) file_source_path = self._file_sources.get_file_source_path(uri) file_source = file_source_path.file_source opts = FilesSourceOptions() opts.writeable = writeable or False try: index, count = file_source.list( file_source_path.path, recursive=recursive, user_context=user_file_source_context, opts=opts, limit=limit, offset=offset, query=query, sort_by=sort_by, ) except exceptions.MessageException: log.warning(self._get_error_message(file_source_path), exc_info=True) raise except Exception: message = self._get_error_message(file_source_path) log.warning(message, exc_info=True) raise exceptions.InternalServerError(message) if format == RemoteFilesFormat.flat: # rip out directories, ensure sorted by path index = [i for i in index if i["class"] == "File"] index = sorted(index, key=itemgetter("path")) elif format == RemoteFilesFormat.jstree: if disable is None: disable = RemoteFilesDisableMode.folders jstree_paths = [] for ent in index: path = ent["path"] path_hash = hashlib.sha1(smart_str(path)).hexdigest() if ent["class"] == "Directory": path_type = "folder" disabled = True if disable == RemoteFilesDisableMode.folders else False else: path_type = "file" disabled = True if disable == RemoteFilesDisableMode.files else False jstree_paths.append( jstree.Path( path, path_hash, {"type": path_type, "state": {"disabled": disabled}, "li_attr": {"full_path": path}}, ) ) userdir_jstree = jstree.JSTree(jstree_paths) index = userdir_jstree.jsonData() return index, count
def _get_error_message(self, file_source_path: FileSourcePath) -> str: return f"Problem listing file source path {file_source_path.file_source.get_uri_root()}{file_source_path.path}"
[docs] def get_files_source_plugins( self, user_context: ProvidesUserContext, browsable_only: Optional[bool] = True, include_kind: Optional[Set[PluginKind]] = None, exclude_kind: Optional[Set[PluginKind]] = None, ): """Display plugin information for each of the gxfiles:// URI targets available.""" user_file_source_context = ProvidesFileSourcesUserContext(user_context) browsable_only = True if browsable_only is None else browsable_only plugins_dict = self._file_sources.plugins_to_dict( user_context=user_file_source_context, browsable_only=browsable_only, include_kind=include_kind, exclude_kind=exclude_kind, ) return plugins_dict
@property def _file_sources(self) -> ConfiguredFileSources: return self._app.file_sources
[docs] def create_entry(self, user_ctx: ProvidesUserContext, entry_data: CreateEntryPayload) -> CreatedEntryResponse: """Create an entry (directory or record) in a remote files location.""" target = entry_data.target user_file_source_context = ProvidesFileSourcesUserContext(user_ctx) self._file_sources.validate_uri_root(target, user_context=user_file_source_context) file_source_path = self._file_sources.get_file_source_path(target) file_source = file_source_path.file_source try: result = file_source.create_entry(entry_data.dict(), user_context=user_file_source_context) except exceptions.MessageException: log.warning(f"Problem creating entry {entry_data.name} in file source {entry_data.target}", exc_info=True) raise except Exception: message = f"Problem creating entry {entry_data.name} in file source {entry_data.target}" log.warning(message, exc_info=True) raise exceptions.InternalServerError(message) return CreatedEntryResponse( name=result["name"], uri=result["uri"], external_link=result.get("external_link", None), )