Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.files.sources.ftp
import urllib.parse
try:
from fs.ftpfs import FTPFS
except ImportError:
FTPFS = None # type: ignore[misc,assignment]
from typing import (
cast,
Optional,
Tuple,
)
from . import (
FilesSourceOptions,
FilesSourceProperties,
)
from ._pyfilesystem2 import PyFilesystem2FilesSource
class FTPFilesSourceProperties(FilesSourceProperties, total=False):
host: str
port: int
user: str
passwd: str
[docs]class FtpFilesSource(PyFilesystem2FilesSource):
plugin_type = "ftp"
required_module = FTPFS
required_package = "fs.ftpfs"
def _open_fs(self, user_context=None, opts: Optional[FilesSourceOptions] = None):
props = self._serialization_props(user_context)
extra_props: FTPFilesSourceProperties = cast(FTPFilesSourceProperties, opts.extra_props or {} if opts else {})
handle = FTPFS(**{**props, **extra_props})
return handle
def _realize_to(
self, source_path: str, native_path: str, user_context=None, opts: Optional[FilesSourceOptions] = None
):
extra_props: FTPFilesSourceProperties
if opts and opts.extra_props:
extra_props = cast(FTPFilesSourceProperties, opts.extra_props)
else:
opts = FilesSourceOptions()
extra_props = {}
path, opts.extra_props = self._get_props_and_rel_path(extra_props, source_path)
super()._realize_to(path, native_path, user_context=user_context, opts=opts)
def _write_from(
self, target_path: str, native_path: str, user_context=None, opts: Optional[FilesSourceOptions] = None
):
extra_props: FTPFilesSourceProperties
if opts and opts.extra_props:
extra_props = cast(FTPFilesSourceProperties, opts.extra_props)
else:
opts = FilesSourceOptions()
extra_props = {}
path, opts.extra_props = self._get_props_and_rel_path(extra_props, target_path)
super()._write_from(path, native_path, user_context=user_context, opts=opts)
def _get_props_and_rel_path(
self, extra_props: FTPFilesSourceProperties, url: str
) -> Tuple[str, FTPFilesSourceProperties]:
host = self._props.get("host")
port = self._props.get("port")
user = self._props.get("user")
passwd = self._props.get("passwd")
rel_path = url
if url.startswith(f"ftp://{host or ''}"):
props = self._extract_url_props(url)
extra_props["host"] = host or props["host"]
extra_props["port"] = port or props["port"]
extra_props["user"] = user or props["user"]
extra_props["passwd"] = passwd or props["passwd"]
rel_path = props["path"] or url
return rel_path, extra_props
def _extract_url_props(self, url: str):
result = urllib.parse.urlparse(url)
return {
"host": result.hostname,
"port": result.port or 21,
"user": result.username,
"passwd": result.password,
"path": result.path,
}
[docs] def score_url_match(self, url: str):
host = self._props.get("host")
port = self._props.get("port")
if host and port and url.startswith(f"ftp://{host}:{port}"):
return len(f"ftp://{host}:{port}")
# For security, we need to ensure that a partial match doesn't work e.g. ftp://{host}something/myfiles
elif host and (url.startswith(f"ftp://{host}/") or url == f"ftp://{host}"):
return len(f"ftp://{host}")
elif not host and url.startswith("ftp://"):
return len("ftp://")
else:
return super().score_url_match(url)
__all__ = ("FtpFilesSource",)