Source code for galaxy.files.sources.posix

import functools
import os
import shutil
from typing import (
    Any,
    Optional,
    Union,
)

from galaxy import exceptions
from galaxy.files.models import (
    AnyRemoteEntry,
    BaseFileSourceConfiguration,
    BaseFileSourceTemplateConfiguration,
    FilesSourceRuntimeContext,
    RemoteDirectory,
    RemoteFile,
)
from galaxy.util.config_templates import TemplateExpansion
from galaxy.util.path import (
    safe_contains,
    safe_path,
    safe_walk,
)
from . import BaseFilesSource

DEFAULT_ENFORCE_SYMLINK_SECURITY = True
DEFAULT_DELETE_ON_REALIZE = False
DEFAULT_ALLOW_SUBDIR_CREATION = True
DEFAULT_PREFER_LINKS = False


class PosixTemplateConfiguration(BaseFileSourceTemplateConfiguration):
    """Posix template configuration with templating support."""

    root: Union[str, TemplateExpansion, None] = None
    # These are not using TemplateExpansion because they are not user-configurable.
    enforce_symlink_security: bool = DEFAULT_ENFORCE_SYMLINK_SECURITY
    delete_on_realize: bool = DEFAULT_DELETE_ON_REALIZE
    allow_subdir_creation: bool = DEFAULT_ALLOW_SUBDIR_CREATION
    prefer_links: bool = DEFAULT_PREFER_LINKS


class PosixConfiguration(BaseFileSourceConfiguration):
    """Posix resolved configuration with proper types."""

    root: Optional[str] = None
    enforce_symlink_security: bool = DEFAULT_ENFORCE_SYMLINK_SECURITY
    delete_on_realize: bool = DEFAULT_DELETE_ON_REALIZE
    allow_subdir_creation: bool = DEFAULT_ALLOW_SUBDIR_CREATION
    prefer_links: bool = DEFAULT_PREFER_LINKS


[docs] class PosixFilesSource(BaseFilesSource[PosixTemplateConfiguration, PosixConfiguration]): plugin_type = "posix" template_config_class = PosixTemplateConfiguration resolved_config_class = PosixConfiguration # If this were a PyFilesystem2FilesSource it would be much simpler, # but we couldn't enforce security our way I suspect.
[docs] def __init__(self, template_config: PosixTemplateConfiguration): super().__init__(template_config) if not self.template_config.root: self.template_config.writable = False
@property def root(self) -> Optional[str]: """Return the root directory for backward compatibility.""" return self.template_config.root def _list( self, context: FilesSourceRuntimeContext[PosixConfiguration], path="/", recursive=False, write_intent: bool = False, limit: Optional[int] = None, offset: Optional[int] = None, query: Optional[str] = None, sort_by: Optional[str] = None, ) -> tuple[list[AnyRemoteEntry], int]: if not context.config.root: raise exceptions.ItemAccessibilityException("Listing files at file:// URLs has been disabled.") dir_path = self._to_native_path(path, context.config) if not self._safe_directory(dir_path, context.config): raise exceptions.ObjectNotFound(f"The specified directory does not exist [{dir_path}].") if recursive: res: list[AnyRemoteEntry] = [] effective_root = self._effective_root(context.config) for p, dirs, files in safe_walk(dir_path, allowlist=self._allowlist): rel_dir = os.path.relpath(p, effective_root) to_dict = functools.partial(self._resource_info_to_dict, rel_dir, config=context.config) res.extend(map(to_dict, dirs)) res.extend(map(to_dict, files)) return res, len(res) else: entry_names = os.listdir(dir_path) to_dict = functools.partial(self._resource_info_to_dict, path, config=context.config) return list(map(to_dict, entry_names)), len(entry_names) def _realize_to(self, source_path: str, native_path: str, context: FilesSourceRuntimeContext[PosixConfiguration]): if not context.config.root and not context.user_data.is_admin: raise exceptions.ItemAccessibilityException("Writing to file:// URLs has been disabled.") effective_root = self._effective_root(context.config) source_native_path = self._to_native_path(source_path, context.config) if context.config.enforce_symlink_security: if not safe_contains(effective_root, source_native_path, allowlist=self._allowlist): raise Exception("Operation not allowed.") else: source_native_path = os.path.normpath(source_native_path) assert source_native_path.startswith(os.path.normpath(effective_root)) if not context.config.delete_on_realize: shutil.copyfile(source_native_path, native_path) else: shutil.move(source_native_path, native_path) def _write_from(self, target_path: str, native_path: str, context: FilesSourceRuntimeContext[PosixConfiguration]): effective_root = self._effective_root(context.config) target_native_path = self._to_native_path(target_path, context.config) if context.config.enforce_symlink_security: if not safe_contains(effective_root, target_native_path, allowlist=self._allowlist): raise Exception("Operation not allowed.") else: target_native_path = os.path.normpath(target_native_path) assert target_native_path.startswith(os.path.normpath(effective_root)) target_native_path_parent, target_native_path_name = os.path.split(target_native_path) if not os.path.exists(target_native_path_parent): if context.config.allow_subdir_creation: os.makedirs(target_native_path_parent) else: raise Exception("Parent directory does not exist.") # Use a temporary name while writing so anything that consumes written files can detect when they've completed, # and identify interrupted writes target_native_path_part = os.path.join(target_native_path_parent, f"_{target_native_path_name}.part") shutil.copyfile(native_path, target_native_path_part) os.rename(target_native_path_part, target_native_path) def _to_native_path(self, source_path: str, config: PosixConfiguration): source_path = os.path.normpath(source_path) if source_path.startswith("/"): source_path = source_path[1:] return os.path.join(self._effective_root(config), source_path) def _effective_root(self, config: PosixConfiguration) -> str: return config.root or "/" def _resource_info_to_dict(self, dir: str, name: str, config: PosixConfiguration) -> AnyRemoteEntry: rel_path = os.path.normpath(os.path.join(dir, name)) full_path = self._to_native_path(rel_path, config) uri = self.uri_from_path(rel_path) if os.path.isdir(full_path): return RemoteDirectory(name=name, uri=uri, path=rel_path) else: file_stat_info = os.lstat(full_path) return RemoteFile( name=name, size=file_stat_info.st_size, ctime=self.to_dict_time(file_stat_info.st_ctime), uri=uri, path=rel_path, ) def _safe_directory(self, directory: str, config: PosixConfiguration) -> bool: if config.enforce_symlink_security: if not safe_path(directory, allowlist=self._allowlist): raise exceptions.ConfigDoesNotAllowException( f"directory ({directory}) is a symlink to a location not on the allowlist" ) if not os.path.exists(directory): return False return True def _serialize_config(self, config: PosixConfiguration) -> dict[str, Any]: # abspath needed because will be used by external Python from # a job working directory abs_root = os.path.abspath(self._effective_root(config)) serialized_config = super()._serialize_config(config) serialized_config.update({"root": abs_root}) return serialized_config @property def _allowlist(self): return self._file_sources_config.symlink_allowlist
[docs] def score_url_match(self, url: str): # We need to use template_config here because this is called before the template is expanded. root = self.template_config.root # For security, we need to ensure that a partial match doesn't work. e.g. file://{root}something/myfiles if root and (url.startswith(f"{self.get_uri_root()}://{root}/") or url == f"self.get_uri_root()://{root}"): return len(f"self.get_uri_root()://{root}") elif root and (url.startswith(f"file://{root}/") or url == f"file://{root}"): return len(f"file://{root}") elif not root and url.startswith("file://"): return len("file://") else: return super().score_url_match(url)
[docs] def to_relative_path(self, url: str) -> str: # We need to use template_config.root here because this is called before the template is expanded. root = self.template_config.root if url.startswith(f"file://{root}"): return url[len(f"file://{root}") :] elif url.startswith("file://"): return url[7:] else: return super().to_relative_path(url)
__all__ = ("PosixFilesSource",)