Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.datatypes.dataproviders.external
"""
Data providers that iterate over a source that is not in memory
or not in a file.
"""
import gzip
import logging
import subprocess
import tempfile
from urllib.parse import (
urlencode,
urlparse,
)
from urllib.request import urlopen
from galaxy.util import DEFAULT_SOCKET_TIMEOUT
from . import (
base,
line,
)
_TODO = """
YAGNI: ftp, image, cryptos, sockets
job queue
admin: admin server log rgx/stats, ps aux
"""
log = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- server subprocess / external prog
[docs]class SubprocessDataProvider(base.DataProvider):
"""
Data provider that uses the output from an intermediate program and
subprocess as its data source.
"""
# TODO: need better ways of checking returncode, stderr for errors and raising
[docs] def __init__(self, *args, **kwargs):
"""
:param args: the list of strings used to build commands.
:type args: variadic function args
"""
self.exit_code = None
command_list = args
self.popen = self.subprocess(*command_list, **kwargs)
# TODO:?? not communicate()?
super().__init__(self.popen.stdout)
self.exit_code = self.popen.poll()
# NOTE: there's little protection here v. sending a ';' and a dangerous command here
# but...we're all adults here, right? ...RIGHT?!
[docs] def subprocess(self, *command_list, **kwargs):
"""
:param args: the list of strings used as commands.
:type args: variadic function args
"""
try:
# how expensive is this?
popen = subprocess.Popen(command_list, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
log.info(f"opened subrocess ({str(command_list)}), PID: {str(popen.pid)}")
except OSError as os_err:
command_str = " ".join(self.command)
raise OSError(" ".join((str(os_err), ":", command_str)))
return popen
def __exit__(self, *args):
# poll the subrocess for an exit code
self.exit_code = self.popen.poll()
log.info(f"{str(self)}.__exit__, exit_code: {str(self.exit_code)}")
return super().__exit__(*args)
def __str__(self):
# provide the pid and current return code
source_str = ""
if hasattr(self, "popen"):
source_str = f"{str(self.popen.pid)}:{str(self.popen.poll())}"
return f"{self.__class__.__name__}({str(source_str)})"
[docs]class RegexSubprocessDataProvider(line.RegexLineDataProvider):
"""
RegexLineDataProvider that uses a SubprocessDataProvider as its data source.
"""
# this is a conv. class and not really all that necc...
[docs] def __init__(self, *args, **kwargs):
# using subprocess as proxy data source in filtered line prov.
subproc_provider = SubprocessDataProvider(*args)
super().__init__(subproc_provider, **kwargs)
# ----------------------------------------------------------------------------- other apis
[docs]class URLDataProvider(base.DataProvider):
"""
Data provider that uses the contents of a URL for its data source.
This can be piped through other providers (column, map, genome region, etc.).
"""
VALID_METHODS = ("GET", "POST")
[docs] def __init__(self, url, method="GET", data=None, **kwargs):
"""
:param url: the base URL to open.
:param method: the HTTP method to use.
Optional: defaults to 'GET'
:param data: any data to pass (either in query for 'GET'
or as post data with 'POST')
:type data: dict
"""
self.url = url
self.method = method
self.data = data or {}
encoded_data = urlencode(self.data)
scheme = urlparse(url).scheme
assert scheme in ("http", "https", "ftp"), f"Invalid URL scheme: {scheme}"
if method == "GET":
self.url += f"?{encoded_data}"
opened = urlopen(url, timeout=DEFAULT_SOCKET_TIMEOUT)
elif method == "POST":
opened = urlopen(url, encoded_data, timeout=DEFAULT_SOCKET_TIMEOUT)
else:
raise ValueError(f"Not a valid method: {method}")
super().__init__(opened, **kwargs)
# NOTE: the request object is now accessible as self.source
def __enter__(self):
pass
def __exit__(self, *args):
self.source.close()
# ----------------------------------------------------------------------------- generic compression
[docs]class GzipDataProvider(base.DataProvider):
"""
Data provider that uses g(un)zip on a file as its source.
This can be piped through other providers (column, map, genome region, etc.).
"""
[docs] def __init__(self, source, **kwargs):
unzipped = gzip.GzipFile(source, "rb")
super().__init__(unzipped, **kwargs)
# NOTE: the GzipFile is now accessible in self.source
# ----------------------------------------------------------------------------- intermediate tempfile
[docs]class TempfileDataProvider(base.DataProvider):
"""
Writes the data from the given source to a temp file, allowing
it to be used as a source where a file_name is needed (e.g. as a parameter
to a command line tool: samtools view -t <this_provider.source.get_file_name()>)
"""
# write the file here
# self.create_file
# super(TempfileDataProvider, self).__init__(self.tmp_file, **kwargs)
[docs] def write_to_file(self):
parent_gen = super().__iter__()
with open(self.tmp_file, "w") as open_file:
for datum in parent_gen:
open_file.write(f"{datum}\n")