Source code for galaxy.files.sources.posix

import functools
import os
import shutil
from typing import (
    Any,
    Dict,
    List,
)

from galaxy import exceptions
from galaxy.util.path import (
    safe_contains,
    safe_path,
    safe_walk,
)
from . import BaseFilesSource

DEFAULT_ENFORCE_SYMLINK_SECURITY = True
DEFAULT_DELETE_ON_REALIZE = False
DEFAULT_ALLOW_SUBDIR_CREATION = True


[docs]class PosixFilesSource(BaseFilesSource): plugin_type = "posix" # If this were a PyFilesystem2FilesSource all that would be needed would be, # but we couldn't enforce security our way I suspect. # def _open_fs(self): # from fs.osfs import OSFS # handle = OSFS(**self._props) # return handle
[docs] def __init__(self, **kwd): props = self._parse_common_config_opts(kwd) self.root = props["root"] self.enforce_symlink_security = props.get("enforce_symlink_security", DEFAULT_ENFORCE_SYMLINK_SECURITY) self.delete_on_realize = props.get("delete_on_realize", DEFAULT_DELETE_ON_REALIZE) self.allow_subdir_creation = props.get("allow_subdir_creation", DEFAULT_ALLOW_SUBDIR_CREATION)
def _list(self, path="/", recursive=True, user_context=None): dir_path = self._to_native_path(path, user_context=user_context) if not self._safe_directory(dir_path): raise exceptions.ObjectNotFound(f"The specified directory does not exist [{dir_path}].") if recursive: res: List[Dict[str, Any]] = [] effective_root = self._effective_root(user_context) for p, dirs, files in safe_walk(dir_path, allowlist=self._allowlist): rel_dir = os.path.relpath(p, effective_root) to_dict = functools.partial(self._resource_info_to_dict, rel_dir, user_context=user_context) res.extend(map(to_dict, dirs)) res.extend(map(to_dict, files)) return res else: res = os.listdir(dir_path) to_dict = functools.partial(self._resource_info_to_dict, path, user_context=user_context) return list(map(to_dict, res)) def _realize_to(self, source_path, native_path, user_context=None): effective_root = self._effective_root(user_context) source_native_path = self._to_native_path(source_path, user_context=user_context) if self.enforce_symlink_security: if not safe_contains(effective_root, source_native_path, allowlist=self._allowlist): raise Exception("Operation not allowed.") else: source_native_path = os.path.normpath(source_native_path) assert source_native_path.startswith(os.path.normpath(effective_root)) if not self.delete_on_realize: shutil.copyfile(source_native_path, native_path) else: shutil.move(source_native_path, native_path) def _write_from(self, target_path, native_path, user_context=None): effective_root = self._effective_root(user_context) target_native_path = self._to_native_path(target_path, user_context=user_context) if self.enforce_symlink_security: if not safe_contains(effective_root, target_native_path, allowlist=self._allowlist): raise Exception("Operation not allowed.") else: target_native_path = os.path.normpath(target_native_path) assert target_native_path.startswith(os.path.normpath(effective_root)) target_native_path_parent = os.path.dirname(target_native_path) if not os.path.exists(target_native_path_parent): if self.allow_subdir_creation: os.makedirs(target_native_path_parent) else: raise Exception("Parent directory does not exist.") shutil.copyfile(native_path, target_native_path) def _to_native_path(self, source_path, user_context=None): source_path = os.path.normpath(source_path) if source_path.startswith("/"): source_path = source_path[1:] return os.path.join(self._effective_root(user_context), source_path) def _effective_root(self, user_context=None): return self._evaluate_prop(self.root, user_context=user_context) def _resource_info_to_dict(self, dir, name, user_context=None): rel_path = os.path.normpath(os.path.join(dir, name)) full_path = self._to_native_path(rel_path, user_context=user_context) uri = self.uri_from_path(rel_path) if os.path.isdir(full_path): return {"class": "Directory", "name": name, "uri": uri, "path": rel_path} else: statinfo = os.lstat(full_path) return { "class": "File", "name": name, "size": statinfo.st_size, "ctime": self.to_dict_time(statinfo.st_ctime), "uri": uri, "path": rel_path, } def _safe_directory(self, directory): if self.enforce_symlink_security: if not safe_path(directory, allowlist=self._allowlist): raise exceptions.ConfigDoesNotAllowException( f"directory ({directory}) is a symlink to a location not on the allowlist" ) if not os.path.exists(directory): return False return True def _serialization_props(self, user_context=None): return { # abspath needed because will be used by external Python from # a job working directory "root": os.path.abspath(self._effective_root(user_context)), "enforce_symlink_security": self.enforce_symlink_security, "delete_on_realize": self.delete_on_realize, "allow_subdir_creation": self.allow_subdir_creation, } @property def _allowlist(self): return self._file_sources_config.symlink_allowlist
__all__ = ("PosixFilesSource",)