Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.files.sources.posix

import functools
import os
import shutil
from typing import Any, Dict, List

from galaxy import exceptions
from galaxy.util.path import (
    safe_contains,
    safe_path,
    safe_walk,
)
from ..sources import BaseFilesSource

# Fallback values used by PosixFilesSource.__init__ when the corresponding
# key ("enforce_symlink_security", "delete_on_realize",
# "allow_subdir_creation") is absent from the parsed plugin configuration.
DEFAULT_ENFORCE_SYMLINK_SECURITY = True
DEFAULT_DELETE_ON_REALIZE = False
DEFAULT_ALLOW_SUBDIR_CREATION = True


class PosixFilesSource(BaseFilesSource):
    """File source plugin backed by a directory on the local POSIX filesystem.

    Logical paths handed to this plugin are resolved beneath an "effective
    root" (``root`` evaluated for the requesting user). Every read and write
    is checked to stay inside that root before any data is copied or moved.
    """

    plugin_type = 'posix'

    # If this were a PyFilesystem2FilesSource all that would be needed would be,
    # but we couldn't enforce security our way I suspect.
    # def _open_fs(self):
    #    from fs.osfs import OSFS
    #    handle = OSFS(**self._props)
    #    return handle

    def __init__(self, **kwd):
        """Configure the plugin from keyword options.

        ``root`` is required. ``enforce_symlink_security``,
        ``delete_on_realize`` and ``allow_subdir_creation`` are optional and
        fall back to the module-level ``DEFAULT_*`` constants.
        """
        props = self._parse_common_config_opts(kwd)
        self.root = props["root"]
        self.enforce_symlink_security = props.get("enforce_symlink_security", DEFAULT_ENFORCE_SYMLINK_SECURITY)
        self.delete_on_realize = props.get("delete_on_realize", DEFAULT_DELETE_ON_REALIZE)
        self.allow_subdir_creation = props.get("allow_subdir_creation", DEFAULT_ALLOW_SUBDIR_CREATION)

    def _list(self, path="/", recursive=True, user_context=None):
        """Describe the entries found under ``path``.

        :param path: logical path of the directory to list.
        :param recursive: walk the whole tree with ``safe_walk`` when true,
            otherwise list only the direct children.
        :param user_context: forwarded to root/path resolution.
        :returns: list of dicts as built by ``_resource_info_to_dict``.
        :raises exceptions.ObjectNotFound: if the directory does not exist.
        """
        dir_path = self._to_native_path(path, user_context=user_context)
        if not self._safe_directory(dir_path):
            raise exceptions.ObjectNotFound(f'The specified directory does not exist [{dir_path}].')

        if not recursive:
            to_dict = functools.partial(self._resource_info_to_dict, path, user_context=user_context)
            return [to_dict(name) for name in os.listdir(dir_path)]

        entries: List[Dict[str, Any]] = []
        effective_root = self._effective_root(user_context)
        for (current_dir, dirs, files) in safe_walk(dir_path, allowlist=self._allowlist):
            # Entries are reported relative to the effective root, not to the
            # directory that was requested.
            rel_dir = os.path.relpath(current_dir, effective_root)
            to_dict = functools.partial(self._resource_info_to_dict, rel_dir, user_context=user_context)
            entries.extend(map(to_dict, dirs))
            entries.extend(map(to_dict, files))
        return entries

    def _contained_native_path(self, path, user_context=None):
        """Resolve ``path`` to a native path and verify it lies in the root.

        With symlink security enabled the decision is delegated to
        ``safe_contains``. Otherwise the path is normalized and compared with
        the normalized root on a path-separator boundary, so that a sibling
        such as ``/data/root_evil`` is not mistaken for a child of
        ``/data/root``.

        :returns: the native path (normalized when symlink security is off).
        :raises Exception: ``"Operation not allowed."`` if the path escapes
            the effective root.
        """
        effective_root = self._effective_root(user_context)
        native_path = self._to_native_path(path, user_context=user_context)
        if self.enforce_symlink_security:
            if not safe_contains(effective_root, native_path, allowlist=self._allowlist):
                raise Exception("Operation not allowed.")
            return native_path

        native_path = os.path.normpath(native_path)
        root = os.path.normpath(effective_root)
        root_prefix = root if root.endswith(os.sep) else root + os.sep
        # An explicit raise rather than ``assert``: asserts are removed under
        # ``python -O``, which would silently disable this check.
        if native_path != root and not native_path.startswith(root_prefix):
            raise Exception("Operation not allowed.")
        return native_path

    def _realize_to(self, source_path, native_path, user_context=None):
        """Materialize ``source_path`` from this source at ``native_path``.

        The file is copied, or moved when ``delete_on_realize`` is set.

        :raises Exception: if ``source_path`` resolves outside the root.
        """
        source_native_path = self._contained_native_path(source_path, user_context=user_context)
        if self.delete_on_realize:
            shutil.move(source_native_path, native_path)
        else:
            shutil.copyfile(source_native_path, native_path)

    def _write_from(self, target_path, native_path, user_context=None):
        """Copy the local file ``native_path`` into this source at ``target_path``.

        Missing parent directories are created when ``allow_subdir_creation``
        is set.

        :raises Exception: if ``target_path`` resolves outside the root, or
            if the parent directory is missing and may not be created.
        """
        target_native_path = self._contained_native_path(target_path, user_context=user_context)
        target_native_path_parent = os.path.dirname(target_native_path)
        if not os.path.exists(target_native_path_parent):
            if not self.allow_subdir_creation:
                raise Exception("Parent directory does not exist.")
            # exist_ok tolerates another writer creating the directory
            # between the existence check above and this call.
            os.makedirs(target_native_path_parent, exist_ok=True)
        shutil.copyfile(native_path, target_native_path)

    def _to_native_path(self, source_path, user_context=None):
        """Map a logical path onto a filesystem path under the effective root.

        Every leading slash is removed before joining. ``os.path.normpath``
        keeps a leading ``//`` intact on POSIX, so stripping only one slash
        would leave an absolute path and ``os.path.join`` would then discard
        the root entirely.
        """
        relative_path = os.path.normpath(source_path).lstrip("/")
        return os.path.join(self._effective_root(user_context), relative_path)

    def _effective_root(self, user_context=None):
        """Return ``self.root`` as evaluated by ``_evaluate_prop`` for ``user_context``."""
        return self._evaluate_prop(self.root, user_context=user_context)

    def _resource_info_to_dict(self, dir, name, user_context=None):
        """Build the listing entry for ``name`` located in logical directory ``dir``.

        Directories yield ``class``/``name``/``uri``/``path``. Anything else
        is reported as a ``File`` and also carries ``size`` and ``ctime``
        taken from ``os.lstat``.
        """
        rel_path = os.path.normpath(os.path.join(dir, name))
        full_path = self._to_native_path(rel_path, user_context=user_context)
        uri = self.uri_from_path(rel_path)
        if os.path.isdir(full_path):
            return {"class": "Directory", "name": name, "uri": uri, "path": rel_path}
        statinfo = os.lstat(full_path)
        return {
            "class": "File",
            "name": name,
            "size": statinfo.st_size,
            "ctime": self.to_dict_time(statinfo.st_ctime),
            "uri": uri,
            "path": rel_path,
        }

    def _safe_directory(self, directory):
        """Tell whether ``directory`` exists, after the optional symlink check.

        :returns: ``True`` if the path exists, ``False`` otherwise.
        :raises exceptions.ConfigDoesNotAllowException: if symlink security
            is enabled and ``safe_path`` rejects the directory.
        """
        if self.enforce_symlink_security and not safe_path(directory, allowlist=self._allowlist):
            raise exceptions.ConfigDoesNotAllowException(f'directory ({directory}) is a symlink to a location not on the allowlist')
        return os.path.exists(directory)

    def _serialization_props(self, user_context=None):
        """Return the options needed to re-create this plugin elsewhere."""
        return {
            # abspath needed because will be used by external Python from
            # a job working directory
            "root": os.path.abspath(self._effective_root(user_context)),
            "enforce_symlink_security": self.enforce_symlink_security,
            "delete_on_realize": self.delete_on_realize,
            "allow_subdir_creation": self.allow_subdir_creation,
        }

    @property
    def _allowlist(self):
        """Symlink allowlist read from ``self._file_sources_config``."""
        return self._file_sources_config.symlink_allowlist
# Names exported by a star-import of this module.
__all__ = ('PosixFilesSource',)