Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.files.sources.ssh

from io import StringIO
from typing import (
    Optional,
    TYPE_CHECKING,
    Union,
)

try:
    from fsspec.implementations.sftp import SFTPFileSystem
    from paramiko.ecdsakey import ECDSAKey
    from paramiko.ed25519key import Ed25519Key
    from paramiko.rsakey import RSAKey
except ImportError:
    SFTPFileSystem = None
    if TYPE_CHECKING:
        from paramiko.ecdsakey import ECDSAKey
        from paramiko.ed25519key import Ed25519Key
        from paramiko.rsakey import RSAKey

from galaxy.exceptions import AuthenticationFailed
from galaxy.files.models import FilesSourceRuntimeContext
from galaxy.files.sources._fsspec import (
    CacheOptionsDictType,
    FsspecBaseFileSourceConfiguration,
    FsspecBaseFileSourceTemplateConfiguration,
    FsspecFilesSource,
)
from galaxy.util.config_templates import TemplateExpansion


def _parse_private_key(private_key: str, password: Optional[str]):
    """Load an SSH private key from its text form.

    Paramiko cannot autodetect the key type, so each supported key class is
    tried in turn (RSA, then ECDSA, then Ed25519) and the first one that
    loads the key wins.

    :param private_key: The private key material as text.
    :param password: Passphrase forwarded to ``from_private_key``; may be ``None``.
    :returns: Whatever the first successful ``from_private_key`` call returns,
        or ``None`` when every key class fails.
    """
    candidate_key_types = (RSAKey, ECDSAKey, Ed25519Key)
    for key_type in candidate_key_types:
        try:
            key_stream = StringIO(private_key)
            try:
                loaded_key = key_type.from_private_key(key_stream, password=password)
            finally:
                key_stream.close()
        except Exception:
            # Any failure for this key class just means "try the next one".
            continue
        return loaded_key

    return None


class SshFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration):
    # Template-stage configuration for the SSH file source (registered as
    # ``template_config_class`` on SshFilesSource). It declares the same fields
    # as SshFileSourceConfiguration, but each value may also be a
    # TemplateExpansion instead of a concrete value.
    # NOTE(review): presumably the expansions are resolved into a
    # SshFileSourceConfiguration before a connection is opened -- confirm
    # against the base class.

    # Remote host to connect to (required).
    host: Union[str, TemplateExpansion]
    # Login user name; optional.
    user: Optional[Union[str, TemplateExpansion]] = None
    # Login password; also used as the key passphrase when ``pkey`` is set.
    passwd: Optional[Union[str, TemplateExpansion]] = None
    # Private key text; optional.
    pkey: Optional[Union[str, TemplateExpansion]] = None
    # Connection timeout (presumably seconds -- TODO confirm); defaults to 10.
    timeout: Union[int, TemplateExpansion] = 10
    # Remote port; defaults to 22.
    port: Union[int, TemplateExpansion] = 22
    # Whether to enable compression on the connection; off by default.
    compress: Union[bool, TemplateExpansion] = False
    # Base path on the remote host that file source paths are joined onto (required).
    path: Union[str, TemplateExpansion]


class SshFileSourceConfiguration(FsspecBaseFileSourceConfiguration):
    # Resolved configuration for the SSH file source (registered as
    # ``resolved_config_class`` on SshFilesSource). SshFilesSource._open_fs
    # reads these values to build the SFTPFileSystem connection.

    # Remote host, passed to SFTPFileSystem as ``host`` (required).
    host: str
    # Login user name, passed to SFTPFileSystem as ``username``; optional.
    user: Optional[str] = None
    # Login password. When ``pkey`` is set it is used as the key passphrase
    # instead and is not sent as a password.
    passwd: Optional[str] = None
    # Private key text; when set it is parsed by _parse_private_key and used
    # for authentication.
    pkey: Optional[str] = None
    # Connection timeout (presumably seconds -- TODO confirm); defaults to 10.
    timeout: int = 10
    # Remote port; defaults to 22.
    port: int = 22
    # Passed to SFTPFileSystem as ``compress``; off by default.
    compress: bool = False
    # Base path on the remote host; file source paths are joined onto it and
    # it is stripped again from listed entries (required).
    path: str


class SshFilesSource(FsspecFilesSource[SshFileSourceTemplateConfiguration, SshFileSourceConfiguration]):
    """File source plugin that accesses a remote directory over SSH/SFTP.

    Connections are opened with fsspec's ``SFTPFileSystem``. Paths handed to
    the plugin are joined onto the configured ``path``, and that base is
    stripped again from the paths of returned entries.
    """

    plugin_type = "ssh"
    required_module = SFTPFileSystem
    required_package = "fsspec"
    template_config_class = SshFileSourceTemplateConfiguration
    resolved_config_class = SshFileSourceConfiguration

    def _open_fs(
        self,
        context: FilesSourceRuntimeContext[SshFileSourceConfiguration],
        cache_options: CacheOptionsDictType,
        # Ignored because fsspec's SFTPFileSystem does not support caching options.
    ):
        """Open an ``SFTPFileSystem`` for the resolved configuration.

        When a private key is configured it is used for authentication and
        ``passwd`` is treated as the key passphrase; otherwise ``passwd`` is
        sent as the login password.

        :param context: Runtime context whose ``config`` holds the connection settings.
        :param cache_options: Unused by this file source.
        :returns: The connected ``SFTPFileSystem`` instance.
        :raises AuthenticationFailed: If a private key is configured but cannot be parsed.
        """
        if SFTPFileSystem is None:
            raise self.required_package_exception

        config = context.config
        pkey = None
        password = config.passwd

        if config.pkey:
            pkey = _parse_private_key(config.pkey, config.passwd)
            if pkey is None:
                raise AuthenticationFailed("Invalid or unsupported SSH private key provided.")
            # The password was consumed as the key passphrase; do not also
            # offer it as a login password.
            password = None

        fs = SFTPFileSystem(
            host=config.host,
            username=config.user,
            password=password,
            pkey=pkey,
            port=config.port,
            timeout=config.timeout,
            compress=config.compress,
        )
        return fs

    def _to_filesystem_path(self, path: str, config: SshFileSourceConfiguration) -> str:
        """Map a file source path onto the remote filesystem.

        :param path: Path relative to the file source root; leading slashes are ignored.
        :param config: Resolved configuration providing the base ``path``.
        :returns: ``path`` joined onto the configured base; the base itself
            (or ``"/"`` when the base is empty) for an empty ``path``.
        """
        base = config.path.rstrip("/")
        relative = path.lstrip("/")
        if not relative:
            return base or "/"
        return f"{base}/{relative}"

    def _adapt_entry_path(self, filesystem_path: str, config: SshFileSourceConfiguration) -> str:
        """Map a remote filesystem path back to a path relative to the file source root.

        The configured base is only stripped when ``filesystem_path`` is the
        base itself or lies below it. A bare string-prefix match is not
        enough: with base ``/home/u`` the sibling ``/home/user2/x`` must not
        be rewritten to ``/ser2/x``.

        :param filesystem_path: Path as reported by the remote filesystem.
        :param config: Resolved configuration providing the base ``path``.
        :returns: The path with the base removed (``"/"`` for the base itself),
            or ``filesystem_path`` unchanged when it is not under the base.
        """
        base = config.path.rstrip("/")
        if not base:
            # Base is the filesystem root (or empty): nothing to strip.
            return filesystem_path
        if filesystem_path == base:
            return "/"
        if filesystem_path.startswith(f"{base}/"):
            # Keep the separator so the result always starts with "/".
            return filesystem_path[len(base) :]
        return filesystem_path
# Names exported by ``from <this module> import *``: only the plugin class.
__all__ = ("SshFilesSource",)