Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.managers.remote_files
import hashlib
import logging
from operator import itemgetter
from typing import (
Optional,
Set,
)
from galaxy import exceptions
from galaxy.files import (
ConfiguredFileSources,
ProvidesUserFileSourcesUserContext,
)
from galaxy.files.sources import (
FilesSourceOptions,
PluginKind,
)
from galaxy.managers.context import ProvidesUserContext
from galaxy.schema.remote_files import (
AnyRemoteFilesListResponse,
CreatedEntryResponse,
CreateEntryPayload,
RemoteFilesDisableMode,
RemoteFilesFormat,
RemoteFilesTarget,
)
from galaxy.structured_app import MinimalManagerApp
from galaxy.util import (
jstree,
smart_str,
)
log = logging.getLogger(__name__)
[docs]class RemoteFilesManager:
"""
Interface/service object for interacting with remote files.
"""
[docs] def index(
self,
user_ctx: ProvidesUserContext,
target: str,
format: Optional[RemoteFilesFormat],
recursive: Optional[bool],
disable: Optional[RemoteFilesDisableMode],
writeable: Optional[bool] = False,
) -> AnyRemoteFilesListResponse:
"""Returns a list of remote files available to the user."""
user_file_source_context = ProvidesUserFileSourcesUserContext(user_ctx)
default_recursive = False
default_format = RemoteFilesFormat.uri
if "://" in target:
uri = target
elif target == RemoteFilesTarget.userdir:
uri = "gxuserimport://"
default_format = RemoteFilesFormat.flat
default_recursive = True
elif target == RemoteFilesTarget.importdir:
uri = "gximport://"
default_format = RemoteFilesFormat.flat
default_recursive = True
elif target in [RemoteFilesTarget.ftpdir, "ftp"]: # legacy, allow both
uri = "gxftp://"
default_format = RemoteFilesFormat.flat
default_recursive = True
else:
raise exceptions.RequestParameterInvalidException(f"Invalid target parameter supplied [{target}]")
if format is None:
format = default_format
if recursive is None:
recursive = default_recursive
self._file_sources.validate_uri_root(uri, user_context=user_file_source_context)
file_source_path = self._file_sources.get_file_source_path(uri)
file_source = file_source_path.file_source
opts = FilesSourceOptions()
opts.writeable = writeable or False
try:
index = file_source.list(
file_source_path.path,
recursive=recursive,
user_context=user_file_source_context,
opts=opts,
)
except exceptions.MessageException:
log.warning(f"Problem listing file source path {file_source_path}", exc_info=True)
raise
except Exception:
message = f"Problem listing file source path {file_source_path}"
log.warning(message, exc_info=True)
raise exceptions.InternalServerError(message)
if format == RemoteFilesFormat.flat:
# rip out directories, ensure sorted by path
index = [i for i in index if i["class"] == "File"]
index = sorted(index, key=itemgetter("path"))
elif format == RemoteFilesFormat.jstree:
if disable is None:
disable = RemoteFilesDisableMode.folders
jstree_paths = []
for ent in index:
path = ent["path"]
path_hash = hashlib.sha1(smart_str(path)).hexdigest()
if ent["class"] == "Directory":
path_type = "folder"
disabled = True if disable == RemoteFilesDisableMode.folders else False
else:
path_type = "file"
disabled = True if disable == RemoteFilesDisableMode.files else False
jstree_paths.append(
jstree.Path(
path,
path_hash,
{"type": path_type, "state": {"disabled": disabled}, "li_attr": {"full_path": path}},
)
)
userdir_jstree = jstree.JSTree(jstree_paths)
index = userdir_jstree.jsonData()
return index
[docs] def get_files_source_plugins(
self,
user_context: ProvidesUserContext,
browsable_only: Optional[bool] = True,
include_kind: Optional[Set[PluginKind]] = None,
exclude_kind: Optional[Set[PluginKind]] = None,
):
"""Display plugin information for each of the gxfiles:// URI targets available."""
user_file_source_context = ProvidesUserFileSourcesUserContext(user_context)
browsable_only = True if browsable_only is None else browsable_only
plugins_dict = self._file_sources.plugins_to_dict(
user_context=user_file_source_context,
browsable_only=browsable_only,
include_kind=include_kind,
exclude_kind=exclude_kind,
)
return plugins_dict
@property
def _file_sources(self) -> ConfiguredFileSources:
return self._app.file_sources
[docs] def create_entry(self, user_ctx: ProvidesUserContext, entry_data: CreateEntryPayload) -> CreatedEntryResponse:
"""Create an entry (directory or record) in a remote files location."""
target = entry_data.target
user_file_source_context = ProvidesUserFileSourcesUserContext(user_ctx)
self._file_sources.validate_uri_root(target, user_context=user_file_source_context)
file_source_path = self._file_sources.get_file_source_path(target)
file_source = file_source_path.file_source
try:
result = file_source.create_entry(entry_data.dict(), user_context=user_file_source_context)
except Exception:
message = f"Problem creating entry {entry_data.name} in file source {entry_data.target}"
log.warning(message, exc_info=True)
raise exceptions.InternalServerError(message)
return CreatedEntryResponse(
name=result["name"],
uri=result["uri"],
external_link=result.get("external_link", None),
)