Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.files.uris

import ipaddress
import logging
import os
import socket
import tempfile
from typing import (
    List,
    Optional,
)
from urllib.parse import urlparse

from galaxy.exceptions import (
    AdminRequiredException,
    ConfigDoesNotAllowException,
)
from galaxy.files import (
    ConfiguredFileSources,
    NoMatchingFileSource,
)
from galaxy.files.sources import FilesSourceOptions
from galaxy.util import (
    stream_to_open_named_file,
    unicodify,
)
from galaxy.util.config_parsers import IpAllowedListEntryT

log = logging.getLogger(__name__)


[docs]def stream_url_to_str( path: str, file_sources: Optional["ConfiguredFileSources"] = None, prefix: str = "gx_file_stream" ) -> str: tmp_file = stream_url_to_file(path, file_sources=file_sources, prefix=prefix) try: with open(tmp_file) as f: return f.read() finally: os.remove(tmp_file)
[docs]def stream_url_to_file( url: str, file_sources: Optional["ConfiguredFileSources"] = None, prefix: str = "gx_file_stream", dir: Optional[str] = None, user_context=None, target_path: Optional[str] = None, file_source_opts: Optional[FilesSourceOptions] = None, ) -> str: if file_sources is None: file_sources = ConfiguredFileSources.from_dict(None, load_stock_plugins=True) file_source, rel_path = file_sources.get_file_source_path(url) if file_source: if not target_path: with tempfile.NamedTemporaryFile(prefix=prefix, delete=False, dir=dir) as temp: target_path = temp.name file_source.realize_to(rel_path, target_path, user_context=user_context, opts=file_source_opts) return target_path else: raise NoMatchingFileSource(f"Could not find a matching handler for: {url}")
[docs]def stream_to_file(stream, suffix="", prefix="", dir=None, text=False, **kwd): """Writes a stream to a temporary file, returns the temporary file's name""" fd, temp_name = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir, text=text) return stream_to_open_named_file(stream, fd, temp_name, **kwd)
[docs]def validate_uri_access(uri: str, is_admin: bool, ip_allowlist: List[IpAllowedListEntryT]) -> None: """Perform uniform checks on supplied URIs. - Prevent access to local IPs not found in ip_allowlist. - Don't allow non-admins to access file:// URIs. """ validate_non_local(uri, ip_allowlist) if not is_admin and uri.lstrip().startswith("file://"): raise AdminRequiredException()
[docs]def validate_non_local(uri: str, ip_allowlist: List[IpAllowedListEntryT]) -> str: # If it doesn't look like a URL, ignore it. if not (uri.lstrip().startswith("http://") or uri.lstrip().startswith("https://")): return uri # Strip leading whitespace before passing url to urlparse() url = uri.lstrip() # Extract hostname component parsed_url = urlparse(url).netloc # If credentials are in this URL, we need to strip those. if parsed_url.count("@") > 0: # credentials. parsed_url = parsed_url[parsed_url.rindex("@") + 1 :] # Percent encoded colons and other characters will not be resolved as such # so we don't have to either. # Sometimes the netloc will contain the port which is not desired, so we # need to extract that. port = None # However, it could ALSO be an IPv6 address they've supplied. if ":" in parsed_url: # IPv6 addresses have colons in them already (it seems like always more than two) if parsed_url.count(":") >= 2: # Since IPv6 already use colons extensively, they wrap it in # brackets when there is a port, e.g. http://[2001:db8:1f70::999:de8:7648:6e8]:100/ # However if it ends with a ']' then there is no port after it and # they've wrapped it in brackets just for fun. if "]" in parsed_url and not parsed_url.endswith("]"): # If this +1 throws a range error, we don't care, their url # shouldn't end with a colon. idx = parsed_url.rindex(":") # We parse as an int and let this fail ungracefully if parsing # fails because we desire to fail closed rather than open. port = int(parsed_url[idx + 1 :]) parsed_url = parsed_url[:idx] else: # Plain ipv6 without port pass else: # This should finally be ipv4 with port. It cannot be IPv6 as that # was caught by earlier cases, and it cannot be due to credentials. idx = parsed_url.rindex(":") port = int(parsed_url[idx + 1 :]) parsed_url = parsed_url[:idx] # safe to log out, no credentials/request path, just an IP + port log.debug("parsed url %s, port: %s", parsed_url, port) # Call getaddrinfo to resolve hostname into tuples containing IPs. addrinfo = socket.getaddrinfo(parsed_url, port) # Get the IP addresses that this entry resolves to (uniquely) # We drop: # AF_* family: It will resolve to AF_INET or AF_INET6, getaddrinfo(3) doesn't even mention AF_UNIX, # socktype: We don't care if a stream/dgram/raw protocol # protocol: we don't care if it is tcp or udp. addrinfo_results = {info[4][0] for info in addrinfo} # There may be multiple (e.g. IPv4 + IPv6 or DNS round robin). Any one of these # could resolve to a local addresses (and could be returned by chance), # therefore we must check them all. for raw_ip in addrinfo_results: # Convert to an IP object so we can tell if it is in private space. ip = ipaddress.ip_address(unicodify(raw_ip)) # If this is a private address if ip.is_private: results = [] # If this IP is not anywhere in the allowlist for allowlisted in ip_allowlist: # If it's an IP address range (rather than a single one...) if isinstance(allowlisted, (ipaddress.IPv4Network, ipaddress.IPv6Network)): results.append(ip in allowlisted) else: results.append(ip == allowlisted) if any(results): # If we had any True, then THIS (and ONLY THIS) IP address that # that specific DNS entry resolved to is in allowlisted and # safe to access. But we cannot exit here, we must ensure that # all IPs that that DNS entry resolves to are likewise safe. pass else: # Otherwise, we deny access. raise ConfigDoesNotAllowException("Access to this address in not permitted by server configuration") return url