This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.dataproviders.external

"""
Data providers that iterate over a source that is not in memory
or not in a file.
"""

import gzip
import logging
import subprocess
import tempfile
from urllib.parse import (
    urlencode,
    urlparse,
)
from urllib.request import urlopen

from galaxy.util import DEFAULT_SOCKET_TIMEOUT
from . import (
    base,
    line,
)

# Free-form developer notes kept as a string; nothing in this part of the
# module reads it. The closing delimiter is required so the literal does not
# swallow the logger definition (and everything after it).
_TODO = """
YAGNI: ftp, image, cryptos, sockets
job queue
admin: admin server log rgx/stats, ps aux
"""

# Module-level logger shared by the providers defined below.
log = logging.getLogger(__name__)

# ----------------------------------------------------------------------------- server subprocess / external prog
class SubprocessDataProvider(base.DataProvider):
    """
    Data provider that uses the output from an intermediate program and
    subprocess as its data source.

    The child's stdout becomes this provider's source; the most recently
    polled return code is kept in ``exit_code`` (``None`` while the child is
    still running).
    """

    # TODO: need better ways of checking returncode, stderr for errors and raising

    def __init__(self, *args, **kwargs):
        """
        :param args: the list of strings used to build commands.
        :type args: variadic function args

        ``kwargs`` are handed to :meth:`subprocess` only; they are not passed
        on to the parent provider's constructor.
        """
        self.exit_code = None
        command_list = args
        self.popen = self.subprocess(*command_list, **kwargs)
        # TODO:?? not communicate()?
        super().__init__(self.popen.stdout)
        # usually still None here: the child has only just been started
        self.exit_code = self.popen.poll()

    # NOTE: there's little protection here v. sending a ';' and a dangerous command here
    # but...we're all adults here, right? ...RIGHT?!
    def subprocess(self, *command_list, **kwargs):
        """
        Start the child process with stdout and stderr attached to pipes.

        :param command_list: the list of strings used as commands.
        :type command_list: variadic function args
        :param kwargs: accepted for interface compatibility; currently unused.
        :returns: the ``subprocess.Popen`` handle of the started child.
        :raises OSError: if the program could not be started; the message
            carries the original error text followed by the command line.
        """
        # NOTE(review): stderr is piped but never read by this class; a child
        # that writes a lot to stderr could block on a full pipe - confirm.
        try:
            # how expensive is this?
            popen = subprocess.Popen(command_list, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        except OSError as os_err:
            # Build the message from the arguments we were given: the instance
            # has no ``command`` attribute, so reading one here would replace
            # the real launch failure with an AttributeError.
            command_str = " ".join(str(part) for part in command_list)
            raise OSError(" ".join((str(os_err), ":", command_str))) from os_err
        log.info(f"opened subrocess ({str(command_list)}), PID: {str(popen.pid)}")
        return popen

    def __exit__(self, *args):
        """Record the child's current exit code, then defer to the parent's ``__exit__``."""
        # poll the subprocess for an exit code
        self.exit_code = self.popen.poll()
        log.info(f"{str(self)}.__exit__, exit_code: {str(self.exit_code)}")
        return super().__exit__(*args)

    def __str__(self):
        """Return ``ClassName(pid:returncode)``, or ``ClassName()`` before the child exists."""
        # provide the pid and current return code
        source_str = ""
        # popen is missing if subprocess() raised inside __init__
        if hasattr(self, "popen"):
            source_str = f"{str(self.popen.pid)}:{str(self.popen.poll())}"
        return f"{self.__class__.__name__}({str(source_str)})"
class RegexSubprocessDataProvider(line.RegexLineDataProvider):
    """
    RegexLineDataProvider whose lines come from the output of a
    SubprocessDataProvider.
    """

    # this is a conv. class and not really all that necc...

    def __init__(self, *args, **kwargs):
        """
        :param args: command strings, passed to ``SubprocessDataProvider``.
        :param kwargs: options passed to the ``RegexLineDataProvider`` constructor.
        """
        # the subprocess provider acts as the proxy data source for the
        # filtered line provider; only the positional args reach it
        super().__init__(SubprocessDataProvider(*args), **kwargs)
# ----------------------------------------------------------------------------- other apis
class URLDataProvider(base.DataProvider):
    """
    Data provider that uses the contents of a URL for its data source.

    This can be piped through other providers (column, map, genome region, etc.).
    """

    VALID_METHODS = ("GET", "POST")

    def __init__(self, url, method="GET", data=None, **kwargs):
        """
        :param url: the base URL to open.
        :param method: the HTTP method to use. Optional: defaults to 'GET'
        :param data: any data to pass (either in query for 'GET' or as post data with 'POST')
        :type data: dict
        :raises AssertionError: if the URL scheme is not http, https or ftp.
        :raises ValueError: if ``method`` is neither 'GET' nor 'POST'.
        """
        self.url = url
        self.method = method
        self.data = data or {}
        encoded_data = urlencode(self.data)

        scheme = urlparse(url).scheme
        # Raised explicitly rather than via ``assert`` so the check still runs
        # under ``python -O``; the exception type callers see is unchanged.
        if scheme not in ("http", "https", "ftp"):
            raise AssertionError(f"Invalid URL scheme: {scheme}")

        if method == "GET":
            if encoded_data:
                # extend an existing query string instead of adding a second '?'
                separator = "&" if "?" in url else "?"
                self.url = f"{url}{separator}{encoded_data}"
            # open the URL that carries the query, not the bare base URL,
            # otherwise ``data`` is silently dropped
            opened = urlopen(self.url, timeout=DEFAULT_SOCKET_TIMEOUT)
        elif method == "POST":
            # urlopen requires a bytes body; urlencode() returns str
            opened = urlopen(url, encoded_data.encode("ascii"), timeout=DEFAULT_SOCKET_TIMEOUT)
        else:
            raise ValueError(f"Not a valid method: {method}")

        super().__init__(opened, **kwargs)
        # NOTE: the request object is now accessible as self.source

    def __enter__(self):
        """Return this provider so ``with URLDataProvider(...) as provider`` binds it."""
        return self

    def __exit__(self, *args):
        """Close the opened URL response when leaving the ``with`` block."""
        self.source.close()
# ----------------------------------------------------------------------------- generic compression
class GzipDataProvider(base.DataProvider):
    """
    Data provider that uses g(un)zip on a file as its source.

    This can be piped through other providers (column, map, genome region, etc.).
    """

    def __init__(self, source, **kwargs):
        """
        :param source: the gzip-compressed input: either something
            ``gzip.GzipFile`` accepts as a filename, or an already opened
            binary file object (anything with a ``read`` method).
        :param kwargs: options passed to the parent provider's constructor.
        """
        if hasattr(source, "read"):
            # an open file object has to go in as ``fileobj``; as ``filename``
            # GzipFile would try to open() it and fail
            unzipped = gzip.GzipFile(fileobj=source, mode="rb")
        else:
            unzipped = gzip.GzipFile(source, "rb")
        super().__init__(unzipped, **kwargs)
# NOTE: the GzipFile is now accessible in self.source # ----------------------------------------------------------------------------- intermediate tempfile
class TempfileDataProvider(base.DataProvider):
    """
    Writes the data from the given source to a temp file, allowing
    it to be used as a source where a file_name is needed (e.g. as a parameter
    to a command line tool: samtools view -t <this_provider.source.get_file_name()>)
    """

    def __init__(self, source, **kwargs):
        """
        Not implemented yet: always raises.

        :raises NotImplementedError: unconditionally.
        """
        # TODO:
        raise NotImplementedError()
        # write the file here
        # self.create_file
        # super(TempfileDataProvider, self).__init__(self.tmp_file, **kwargs)

    def create_file(self):
        """
        Create the backing temporary file and remember it as ``self.tmp_file``.

        :returns: the ``tempfile.NamedTemporaryFile`` object that was created.
        """
        self.tmp_file = tempfile.NamedTemporaryFile()
        return self.tmp_file

    def write_to_file(self):
        """
        Write every datum produced by the parent provider's iterator to the
        temporary file, one per line.

        Requires :meth:`create_file` to have been called first.
        """
        parent_gen = super().__iter__()
        # open() needs the path, not the NamedTemporaryFile object itself
        # NOTE(review): re-opening by name while the temp file is still open is
        # not supported on every platform (e.g. Windows) - confirm targets.
        with open(self.tmp_file.name, "w") as open_file:
            open_file.writelines(f"{datum}\n" for datum in parent_gen)