Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.files.sources.posix
import functools
import os
import shutil
from typing import (
Any,
Optional,
Union,
)
from galaxy import exceptions
from galaxy.files.models import (
AnyRemoteEntry,
BaseFileSourceConfiguration,
BaseFileSourceTemplateConfiguration,
FilesSourceRuntimeContext,
RemoteDirectory,
RemoteFile,
)
from galaxy.util.config_templates import TemplateExpansion
from galaxy.util.path import (
safe_contains,
safe_path,
safe_walk,
)
from . import BaseFilesSource
# Defaults shared by PosixTemplateConfiguration and PosixConfiguration below.
# When enabled, paths are validated with the safe_* helpers and the symlink allowlist.
DEFAULT_ENFORCE_SYMLINK_SECURITY = True
# When enabled, realizing a file moves it out of the source instead of copying it.
DEFAULT_DELETE_ON_REALIZE = False
# When enabled, a missing parent directory is created when writing a file.
DEFAULT_ALLOW_SUBDIR_CREATION = True
# NOTE(review): not read anywhere in this module; presumably consumed elsewhere - confirm.
DEFAULT_PREFER_LINKS = False
class PosixTemplateConfiguration(BaseFileSourceTemplateConfiguration):
    """Posix template configuration with templating support.

    This is the configuration as declared, before template expansion:
    ``root`` may still be a ``TemplateExpansion`` instead of a plain string.
    """

    # Base directory of the file source; may be a template, a plain string, or unset.
    root: Union[str, TemplateExpansion, None] = None
    # These are not using TemplateExpansion because they are not user-configurable.
    enforce_symlink_security: bool = DEFAULT_ENFORCE_SYMLINK_SECURITY
    delete_on_realize: bool = DEFAULT_DELETE_ON_REALIZE
    allow_subdir_creation: bool = DEFAULT_ALLOW_SUBDIR_CREATION
    prefer_links: bool = DEFAULT_PREFER_LINKS
class PosixConfiguration(BaseFileSourceConfiguration):
    """Posix resolved configuration with proper types.

    Mirrors ``PosixTemplateConfiguration`` field for field, except that
    ``root`` is a plain optional string here.
    """

    # Base directory of the file source; when unset the code falls back to "/".
    root: Optional[str] = None
    # Validate paths with the safe_* helpers and the symlink allowlist.
    enforce_symlink_security: bool = DEFAULT_ENFORCE_SYMLINK_SECURITY
    # Move instead of copy when a file is realized out of this source.
    delete_on_realize: bool = DEFAULT_DELETE_ON_REALIZE
    # Create a missing parent directory when writing into this source.
    allow_subdir_creation: bool = DEFAULT_ALLOW_SUBDIR_CREATION
    # NOTE(review): not read anywhere in this module - confirm where it is consumed.
    prefer_links: bool = DEFAULT_PREFER_LINKS
[docs]
class PosixFilesSource(BaseFilesSource[PosixTemplateConfiguration, PosixConfiguration]):
plugin_type = "posix"
template_config_class = PosixTemplateConfiguration
resolved_config_class = PosixConfiguration
# If this were a PyFilesystem2FilesSource it would be much simpler,
# but we couldn't enforce security our way I suspect.
[docs]
def __init__(self, template_config: PosixTemplateConfiguration):
    """Initialise the plugin and mark it read-only when no root is configured."""
    super().__init__(template_config)
    has_root = bool(self.template_config.root)
    if has_root:
        return
    # Without a root the template configuration is flagged as not writable.
    self.template_config.writable = False
@property
def root(self) -> Optional[str]:
    """Expose the template configuration's ``root`` (kept for backward compatibility)."""
    template_config = self.template_config
    return template_config.root
def _list(
    self,
    context: FilesSourceRuntimeContext[PosixConfiguration],
    path="/",
    recursive=False,
    write_intent: bool = False,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    query: Optional[str] = None,
    sort_by: Optional[str] = None,
) -> tuple[list[AnyRemoteEntry], int]:
    """List the entries found under ``path``.

    ``write_intent``, ``limit``, ``offset``, ``query`` and ``sort_by`` are
    accepted but not used by this implementation.

    Returns a ``(entries, count)`` tuple where ``count`` is the number of
    entries listed.

    Raises ``ItemAccessibilityException`` when no root is configured and
    ``ObjectNotFound`` when the resolved directory does not exist.
    """
    config = context.config
    if not config.root:
        raise exceptions.ItemAccessibilityException("Listing files at file:// URLs has been disabled.")

    native_dir = self._to_native_path(path, config)
    if not self._safe_directory(native_dir, config):
        raise exceptions.ObjectNotFound(f"The specified directory does not exist [{native_dir}].")

    if not recursive:
        # Flat listing: entries are described relative to the requested path.
        names = os.listdir(native_dir)
        entries = [self._resource_info_to_dict(path, name, config=config) for name in names]
        return entries, len(names)

    # Recursive listing: entries are described relative to the effective root.
    base = self._effective_root(config)
    collected: list[AnyRemoteEntry] = []
    for current, subdirs, filenames in safe_walk(native_dir, allowlist=self._allowlist):
        relative_dir = os.path.relpath(current, base)
        # Directories first, then files, for each visited directory.
        for name in (*subdirs, *filenames):
            collected.append(self._resource_info_to_dict(relative_dir, name, config=config))
    return collected, len(collected)
def _realize_to(self, source_path: str, native_path: str, context: FilesSourceRuntimeContext[PosixConfiguration]):
if not context.config.root and not context.user_data.is_admin:
raise exceptions.ItemAccessibilityException("Writing to file:// URLs has been disabled.")
effective_root = self._effective_root(context.config)
source_native_path = self._to_native_path(source_path, context.config)
if context.config.enforce_symlink_security:
if not safe_contains(effective_root, source_native_path, allowlist=self._allowlist):
raise Exception("Operation not allowed.")
else:
source_native_path = os.path.normpath(source_native_path)
assert source_native_path.startswith(os.path.normpath(effective_root))
if not context.config.delete_on_realize:
shutil.copyfile(source_native_path, native_path)
else:
shutil.move(source_native_path, native_path)
def _write_from(self, target_path: str, native_path: str, context: FilesSourceRuntimeContext[PosixConfiguration]):
effective_root = self._effective_root(context.config)
target_native_path = self._to_native_path(target_path, context.config)
if context.config.enforce_symlink_security:
if not safe_contains(effective_root, target_native_path, allowlist=self._allowlist):
raise Exception("Operation not allowed.")
else:
target_native_path = os.path.normpath(target_native_path)
assert target_native_path.startswith(os.path.normpath(effective_root))
target_native_path_parent, target_native_path_name = os.path.split(target_native_path)
if not os.path.exists(target_native_path_parent):
if context.config.allow_subdir_creation:
os.makedirs(target_native_path_parent)
else:
raise Exception("Parent directory does not exist.")
# Use a temporary name while writing so anything that consumes written files can detect when they've completed,
# and identify interrupted writes
target_native_path_part = os.path.join(target_native_path_parent, f"_{target_native_path_name}.part")
shutil.copyfile(native_path, target_native_path_part)
os.rename(target_native_path_part, target_native_path)
def _to_native_path(self, source_path: str, config: PosixConfiguration):
source_path = os.path.normpath(source_path)
if source_path.startswith("/"):
source_path = source_path[1:]
return os.path.join(self._effective_root(config), source_path)
def _effective_root(self, config: PosixConfiguration) -> str:
return config.root or "/"
def _resource_info_to_dict(self, dir: str, name: str, config: PosixConfiguration) -> AnyRemoteEntry:
    """Describe the entry ``name`` located in ``dir`` as a remote file or directory.

    Directories become ``RemoteDirectory``; anything else becomes a
    ``RemoteFile`` carrying the size and ctime reported by ``os.lstat``.
    """
    relative = os.path.normpath(os.path.join(dir, name))
    native = self._to_native_path(relative, config)
    entry_uri = self.uri_from_path(relative)
    if not os.path.isdir(native):
        # lstat does not follow symlinks, so a link reports its own metadata.
        stat_result = os.lstat(native)
        return RemoteFile(
            name=name,
            size=stat_result.st_size,
            ctime=self.to_dict_time(stat_result.st_ctime),
            uri=entry_uri,
            path=relative,
        )
    return RemoteDirectory(name=name, uri=entry_uri, path=relative)
def _safe_directory(self, directory: str, config: PosixConfiguration) -> bool:
if config.enforce_symlink_security:
if not safe_path(directory, allowlist=self._allowlist):
raise exceptions.ConfigDoesNotAllowException(
f"directory ({directory}) is a symlink to a location not on the allowlist"
)
if not os.path.exists(directory):
return False
return True
def _serialize_config(self, config: PosixConfiguration) -> dict[str, Any]:
    """Serialize ``config`` with ``root`` replaced by its absolute path."""
    # The absolute form is required because the serialized configuration is
    # consumed by external Python running from a job working directory.
    absolute_root = os.path.abspath(self._effective_root(config))
    payload = super()._serialize_config(config)
    payload["root"] = absolute_root
    return payload
@property
def _allowlist(self):
return self._file_sources_config.symlink_allowlist
[docs]
def score_url_match(self, url: str):
    """Score how well ``url`` matches this file source.

    Returns the length of the matched prefix, or defers to the base class
    when none of the posix-specific prefixes apply.
    """
    # We need to use template_config here because this is called before the template is expanded.
    root = self.template_config.root
    if root:
        # The URI-root prefix is interpolated here; it was previously the literal
        # text "self.get_uri_root()", so the exact match could never succeed and
        # the returned length was wrong.
        candidate_prefixes = (f"{self.get_uri_root()}://{root}", f"file://{root}")
        for prefix in candidate_prefixes:
            # For security, we need to ensure that a partial match doesn't work.
            # e.g. file://{root}something/myfiles
            if url == prefix or url.startswith(f"{prefix}/"):
                return len(prefix)
    elif url.startswith("file://"):
        return len("file://")
    return super().score_url_match(url)
[docs]
def to_relative_path(self, url: str) -> str:
    """Strip the ``file://`` prefix (including the root, when it matches) from ``url``.

    URLs that do not start with ``file://`` are delegated to the base class.
    """
    # We need to use template_config.root here because this is called before the template is expanded.
    configured_root = self.template_config.root
    # Most specific prefix first, so the root is removed whenever it is present.
    for prefix in (f"file://{configured_root}", "file://"):
        if url.startswith(prefix):
            return url[len(prefix) :]
    return super().to_relative_path(url)
# Only the plugin class is exported; the configuration models stay out of the star-import API.
__all__ = ("PosixFilesSource",)