Source code for galaxy.files.sources.posix

import functools
import os
import shutil
from typing import (
    List,
    Optional,
    Tuple,
)

from typing_extensions import Unpack

from galaxy import exceptions
from galaxy.files import OptionalUserContext
from galaxy.util.path import (
    safe_contains,
    safe_path,
    safe_walk,
)
from . import (
    AnyRemoteEntry,
    BaseFilesSource,
    FilesSourceOptions,
    FilesSourceProperties,
)

DEFAULT_ENFORCE_SYMLINK_SECURITY = True
DEFAULT_DELETE_ON_REALIZE = False
DEFAULT_ALLOW_SUBDIR_CREATION = True


class PosixFilesSourceProperties(FilesSourceProperties, total=False):
    root: str
    enforce_symlink_security: bool
    delete_on_realize: bool
    allow_subdir_creation: bool


[docs]class PosixFilesSource(BaseFilesSource): plugin_type = "posix" # If this were a PyFilesystem2FilesSource all that would be needed would be, # but we couldn't enforce security our way I suspect. # def _open_fs(self): # from fs.osfs import OSFS # handle = OSFS(**self._props) # return handle
[docs] def __init__(self, **kwd: Unpack[PosixFilesSourceProperties]): props = self._parse_common_config_opts(kwd) self.root = props.get("root") if not self.root: self.writable = False self.enforce_symlink_security = props.get("enforce_symlink_security", DEFAULT_ENFORCE_SYMLINK_SECURITY) self.delete_on_realize = props.get("delete_on_realize", DEFAULT_DELETE_ON_REALIZE) self.allow_subdir_creation = props.get("allow_subdir_creation", DEFAULT_ALLOW_SUBDIR_CREATION)
def _list( self, path="/", recursive=True, user_context: OptionalUserContext = None, opts: Optional[FilesSourceOptions] = None, limit: Optional[int] = None, offset: Optional[int] = None, query: Optional[str] = None, sort_by: Optional[str] = None, ) -> Tuple[List[AnyRemoteEntry], int]: if not self.root: raise exceptions.ItemAccessibilityException("Listing files at file:// URLs has been disabled.") dir_path = self._to_native_path(path, user_context=user_context) if not self._safe_directory(dir_path): raise exceptions.ObjectNotFound(f"The specified directory does not exist [{dir_path}].") if recursive: res: List[AnyRemoteEntry] = [] effective_root = self._effective_root(user_context) for p, dirs, files in safe_walk(dir_path, allowlist=self._allowlist): rel_dir = os.path.relpath(p, effective_root) to_dict = functools.partial(self._resource_info_to_dict, rel_dir, user_context=user_context) res.extend(map(to_dict, dirs)) res.extend(map(to_dict, files)) return res, len(res) else: res = os.listdir(dir_path) to_dict = functools.partial(self._resource_info_to_dict, path, user_context=user_context) return list(map(to_dict, res)), len(res) def _realize_to( self, source_path: str, native_path: str, user_context: OptionalUserContext = None, opts: Optional[FilesSourceOptions] = None, ): if not self.root and (not user_context or not user_context.is_admin): raise exceptions.ItemAccessibilityException("Writing to file:// URLs has been disabled.") effective_root = self._effective_root(user_context) source_native_path = self._to_native_path(source_path, user_context=user_context) if self.enforce_symlink_security: if not safe_contains(effective_root, source_native_path, allowlist=self._allowlist): raise Exception("Operation not allowed.") else: source_native_path = os.path.normpath(source_native_path) assert source_native_path.startswith(os.path.normpath(effective_root)) if not self.delete_on_realize: shutil.copyfile(source_native_path, native_path) else: shutil.move(source_native_path, native_path) def _write_from( self, target_path: str, native_path: str, user_context: OptionalUserContext = None, opts: Optional[FilesSourceOptions] = None, ): effective_root = self._effective_root(user_context) target_native_path = self._to_native_path(target_path, user_context=user_context) if self.enforce_symlink_security: if not safe_contains(effective_root, target_native_path, allowlist=self._allowlist): raise Exception("Operation not allowed.") else: target_native_path = os.path.normpath(target_native_path) assert target_native_path.startswith(os.path.normpath(effective_root)) target_native_path_parent, target_native_path_name = os.path.split(target_native_path) if not os.path.exists(target_native_path_parent): if self.allow_subdir_creation: os.makedirs(target_native_path_parent) else: raise Exception("Parent directory does not exist.") # Use a temporary name while writing so anything that consumes written files can detect when they've completed, # and identify interrupted writes target_native_path_part = os.path.join(target_native_path_parent, f"_{target_native_path_name}.part") shutil.copyfile(native_path, target_native_path_part) os.rename(target_native_path_part, target_native_path) def _to_native_path(self, source_path: str, user_context: OptionalUserContext = None): source_path = os.path.normpath(source_path) if source_path.startswith("/"): source_path = source_path[1:] return os.path.join(self._effective_root(user_context), source_path) def _effective_root(self, user_context: OptionalUserContext = None): return self._evaluate_prop(self.root or "/", user_context=user_context) def _resource_info_to_dict(self, dir: str, name: str, user_context: OptionalUserContext = None) -> AnyRemoteEntry: rel_path = os.path.normpath(os.path.join(dir, name)) full_path = self._to_native_path(rel_path, user_context=user_context) uri = self.uri_from_path(rel_path) if os.path.isdir(full_path): return {"class": "Directory", "name": name, "uri": uri, "path": rel_path} else: statinfo = os.lstat(full_path) return { "class": "File", "name": name, "size": statinfo.st_size, "ctime": self.to_dict_time(statinfo.st_ctime), "uri": uri, "path": rel_path, } def _safe_directory(self, directory): if self.enforce_symlink_security: if not safe_path(directory, allowlist=self._allowlist): raise exceptions.ConfigDoesNotAllowException( f"directory ({directory}) is a symlink to a location not on the allowlist" ) if not os.path.exists(directory): return False return True def _serialization_props(self, user_context: OptionalUserContext = None) -> PosixFilesSourceProperties: return { # abspath needed because will be used by external Python from # a job working directory "root": os.path.abspath(self._effective_root(user_context)), "enforce_symlink_security": self.enforce_symlink_security, "delete_on_realize": self.delete_on_realize, "allow_subdir_creation": self.allow_subdir_creation, } @property def _allowlist(self): return self._file_sources_config.symlink_allowlist
[docs] def score_url_match(self, url: str): # For security, we need to ensure that a partial match doesn't work. e.g. file://{root}something/myfiles if self.root and ( url.startswith(f"{self.get_uri_root()}://{self.root}/") or url == f"self.get_uri_root()://{self.root}" ): return len(f"self.get_uri_root()://{self.root}") elif self.root and (url.startswith(f"file://{self.root}/") or url == f"file://{self.root}"): return len(f"file://{self.root}") elif not self.root and url.startswith("file://"): return len("file://") else: return super().score_url_match(url)
[docs] def to_relative_path(self, url: str) -> str: if url.startswith(f"file://{self.root}"): return url[len(f"file://{self.root}") :] elif url.startswith("file://"): return url[7:] else: return super().to_relative_path(url)
__all__ = ("PosixFilesSource",)