Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.dependencies
"""
Determine what optional dependencies are needed.
"""
import os
import sys
from os.path import (
dirname,
exists,
join,
)
import yaml
from dparse import parse
from galaxy.config import GalaxyAppConfiguration
from galaxy.util import (
asbool,
etree,
parse_xml,
which,
)
from galaxy.util.properties import (
find_config_file,
load_app_properties,
)
[docs]
class ConditionalDependencies:
    """Decide which conditional (optional) requirements a Galaxy config needs.

    On construction the Galaxy configuration and the auxiliary configuration
    files it points at (job, object store, auth, OIDC backends, error report,
    file sources and vault) are scanned, and the plugin types found there are
    recorded on the instance. Each ``check_<name>`` method then reports
    whether the requirement of that name is needed; :meth:`check` dispatches
    to them by requirement name.
    """

    def __init__(self, config_file, config=None):
        """Load the configuration and collect everything the checks rely on.

        :param config_file: path of the Galaxy configuration file; sibling
            job configuration files are looked up relative to its directory.
        :param config: optional mapping of application properties. When
            ``None`` the properties are loaded from ``config_file`` with
            ``load_app_properties``.
        """
        self.config_file = config_file
        self.job_runners = []
        self.authenticators = []
        self.object_stores = []
        self.file_sources = []
        self.conditional_reqs = []
        self.container_interface_types = []
        self.job_rule_modules = []
        self.error_report_modules = []
        self.vault_type = None
        # Defined up front so the attribute exists even before parse_configs().
        self.pkce_support = False
        if config is None:
            self.config = load_app_properties(config_file=self.config_file)
        else:
            self.config = config
        self.config_object = GalaxyAppConfiguration(config_file=self.config_file, override_tempdir=False, **self.config)
        self.parse_configs()
        self.get_conditional_requirements()

    def parse_configs(self) -> None:
        """Scan every auxiliary configuration source and record what it uses.

        The sections are processed in a fixed order: job config, object store,
        auth, OIDC backends, error report, file sources, vault. Results are
        appended to / stored on the instance attributes set up in ``__init__``.
        """
        self._parse_job_config()
        self._parse_object_store_config()
        self._parse_auth_config()
        self._parse_oidc_backends_config()
        self._parse_error_report_config()
        self._parse_file_sources_config()
        self._parse_vault_config()

    def _load_job_config_dict(self, job_conf_dict) -> None:
        """Record runner plugins and rule modules from a dict-style job config."""
        # An empty YAML document loads as None; there is nothing to record then.
        if not isinstance(job_conf_dict, dict):
            return
        runners = job_conf_dict.get("runners") or {}
        for runner in runners.values():
            if "load" in runner:
                self.job_runners.append(runner.get("load"))
        environments = (job_conf_dict.get("execution") or {}).get("environments") or {}
        for env in environments.values():
            if env.get("runner") == "dynamic_tpv":
                self.job_rule_modules.append("tpv.rules")
            elif "rules_module" in env:
                self.job_rule_modules.append(env.get("rules_module"))

    def _parse_job_config(self) -> None:
        """Collect job runners and rule modules from the job configuration."""
        # An inline ``job_config`` in the main config takes precedence over files.
        if "job_config" in self.config:
            self._load_job_config_dict(self.config.get("job_config"))
            return

        config_dir = dirname(self.config_file)
        job_conf_path = self.config_object.job_config_file
        if job_conf_path:
            job_conf_path = join(config_dir, job_conf_path)
        else:
            # No explicit path: prefer job_conf.yml and fall back to job_conf.xml.
            job_conf_path = join(config_dir, "job_conf.yml")
            if not exists(job_conf_path):
                job_conf_path = join(config_dir, "job_conf.xml")

        if ".xml" in job_conf_path:
            try:
                job_conf_tree = parse_xml(job_conf_path)
                plugins_elem = job_conf_tree.find("plugins")
                # Compare with None: the truth value of an XML element reflects
                # its child count and testing it directly is deprecated.
                if plugins_elem is not None:
                    for plugin in plugins_elem.findall("plugin"):
                        if "load" in plugin.attrib:
                            self.job_runners.append(plugin.attrib["load"])
                for param in job_conf_tree.findall('.//destination/param[@id="rules_module"]'):
                    self.job_rule_modules.append(param.text)
            except (OSError, etree.ParseError):
                # Best effort: a missing or malformed file contributes nothing.
                pass
        else:
            try:
                with open(job_conf_path) as f:
                    job_conf_dict = yaml.safe_load(f)
            except OSError:
                # Best effort: a missing or unreadable file contributes nothing.
                return
            self._load_job_config_dict(job_conf_dict)

    def _collect_object_store_types(self, from_dict) -> None:
        """Recursively record every ``type`` value found in nested dicts/lists."""
        if not isinstance(from_dict, dict):
            return
        if "type" in from_dict:
            self.object_stores.append(from_dict["type"])
        for value in from_dict.values():
            if isinstance(value, list):
                for val in value:
                    self._collect_object_store_types(val)
            else:
                self._collect_object_store_types(value)

    def _parse_object_store_config(self) -> None:
        """Collect object store types from the XML or YAML object store config."""
        object_store_conf_path = self.config_object.object_store_config_file
        try:
            if ".xml" in object_store_conf_path:
                for store in parse_xml(object_store_conf_path).iter("object_store"):
                    if "type" in store.attrib:
                        self.object_stores.append(store.attrib["type"])
            else:
                with open(object_store_conf_path) as f:
                    object_store_conf = yaml.safe_load(f)
                self._collect_object_store_types(object_store_conf)
        except OSError:
            # Best effort: a missing or unreadable file contributes nothing.
            pass

    def _parse_auth_config(self) -> None:
        """Collect the ``type`` text of every authenticator in the auth config."""
        auth_conf_xml = self.config_object.auth_config_file
        try:
            for auth in parse_xml(auth_conf_xml).findall("authenticator"):
                auth_type = auth.find("type")
                if auth_type is not None:
                    self.authenticators.append(auth_type.text)
        except OSError:
            pass

    def _parse_oidc_backends_config(self) -> None:
        """Set ``pkce_support`` if any OIDC provider has ``pkce_support`` of "true"."""
        self.pkce_support = False
        oidc_backend_conf_xml = self.config_object.oidc_backends_config_file
        try:
            for pkce_support_element in parse_xml(oidc_backend_conf_xml).iterfind("./provider/pkce_support"):
                if pkce_support_element.text == "true":
                    self.pkce_support = True
                    break
        except OSError:
            pass

    def _parse_error_report_config(self) -> None:
        """Collect the ``type`` of every entry in the error report config."""
        error_report_yml = self.config_object.error_report_file
        try:
            with open(error_report_yml) as f:
                # An empty YAML file loads as None; treat it as "no reporters".
                error_reporters = yaml.safe_load(f) or []
            self.error_report_modules = [er.get("type", None) for er in error_reporters]
        except OSError:
            pass

    def _parse_file_sources_config(self) -> None:
        """Collect the ``type`` of every entry in the file sources config."""
        file_sources_conf_yml = self.config_object.file_sources_config_file
        if exists(file_sources_conf_yml):
            with open(file_sources_conf_yml) as f:
                # An empty YAML file loads as None; treat it as "no file sources".
                file_sources_conf = yaml.safe_load(f) or []
        else:
            file_sources_conf = []
        self.file_sources = [c.get("type", None) for c in file_sources_conf]

    def _parse_vault_config(self) -> None:
        """Store the lower-cased vault ``type`` ("" when none is configured)."""
        vault_conf_yml = self.config_object.vault_config_file
        if exists(vault_conf_yml):
            with open(vault_conf_yml) as f:
                # An empty YAML file loads as None; treat it as an empty mapping.
                vault_conf = yaml.safe_load(f) or {}
        else:
            vault_conf = {}
        self.vault_type = (vault_conf.get("type") or "").lower()

    def get_conditional_requirements(self) -> None:
        """Append every dependency parsed from ``conditional-requirements.txt``.

        The file is read from the directory containing this module and parsed
        with ``dparse`` as a ``requirements.txt`` file.
        """
        crfile = join(dirname(__file__), "conditional-requirements.txt")
        with open(crfile) as fh:
            dependency_file = parse(fh.read(), file_type="requirements.txt")
        self.conditional_reqs.extend(dependency_file.dependencies)

    def check(self, name):
        """Return the result of the ``check_<name>`` method for a requirement.

        ``-`` and ``.`` in ``name`` are replaced by ``_`` to form the method
        name. If no such method exists, or the method raises, ``False`` is
        returned so that an unknown or failing check never selects a
        requirement.
        """
        try:
            name = name.replace("-", "_").replace(".", "_")
            return getattr(self, f"check_{name}")()
        except Exception:
            return False

    def check_psycopg2_binary(self) -> bool:
        """Return whether ``database_connection`` starts with ``postgres``."""
        # A missing or empty setting means "not postgres" rather than an error.
        connection = self.config.get("database_connection") or ""
        return connection.startswith("postgres")

    def check_drmaa(self) -> bool:
        """Return whether the DRMAA, Slurm or Univa job runner is loaded."""
        return (
            "galaxy.jobs.runners.drmaa:DRMAAJobRunner" in self.job_runners
            or "galaxy.jobs.runners.slurm:SlurmJobRunner" in self.job_runners
            or "galaxy.jobs.runners.univa:UnivaJobRunner" in self.job_runners
        )

    def check_pbs_python(self) -> bool:
        """Return whether the PBS job runner is loaded."""
        return "galaxy.jobs.runners.pbs:PBSJobRunner" in self.job_runners

    def check_pykube_ng(self):
        """Return a truthy value if the Kubernetes runner is loaded or ``kubectl`` is found.

        When the runner is not loaded, the result of ``which("kubectl")`` is
        returned as is.
        """
        return "galaxy.jobs.runners.kubernetes:KubernetesJobRunner" in self.job_runners or which("kubectl")

    def check_chronos_python(self) -> bool:
        """Return whether the Chronos job runner is loaded."""
        return "galaxy.jobs.runners.chronos:ChronosJobRunner" in self.job_runners

    def check_boto3_python(self) -> bool:
        """Return whether the AWS Batch job runner is loaded."""
        return "galaxy.jobs.runners.aws:AWSBatchJobRunner" in self.job_runners

    def check_python_ldap(self) -> bool:
        """Return whether an ``ldap`` or ``activedirectory`` authenticator is configured."""
        return "ldap" in self.authenticators or "activedirectory" in self.authenticators

    def check_webdavclient3(self):
        """Return the result of the ``fs.webdavfs`` check."""
        # fs.webdavfs dependency for which we need an unreleased version
        # NOTE(review): check_fs_webdavfs is not defined in the visible part of
        # this class; if it is missing, check() reports False here - confirm.
        return self.check_fs_webdavfs()

    def check_fs_anvilfs(self) -> bool:
        """Return whether an ``anvil`` file source is configured."""
        # pyfilesystem plugin access to terra on anvil
        return "anvil" in self.file_sources

    def check_watchdog(self) -> bool:
        """Return whether ``watch_tools`` or ``watch_tool_data_dir`` enables watching."""
        install_set = {"auto", "True", "true", "polling", True}
        # Use .get(): a config that sets only one of the two options must not
        # raise KeyError on the other and thereby hide the one that is set.
        return (
            self.config.get("watch_tools") in install_set
            or self.config.get("watch_tool_data_dir") in install_set
        )

    def check_weasyprint(self) -> bool:
        """Return whether ``GALAXY_DEPENDENCIES_INSTALL_WEASYPRINT`` is ``"1"``."""
        # See notes in ./conditional-requirements.txt for more information.
        return os.environ.get("GALAXY_DEPENDENCIES_INSTALL_WEASYPRINT") == "1"

    def check_pydyf(self) -> bool:
        """Return whether ``GALAXY_DEPENDENCIES_INSTALL_WEASYPRINT`` is ``"1"``."""
        # See notes in ./conditional-requirements.txt for more information.
        return os.environ.get("GALAXY_DEPENDENCIES_INSTALL_WEASYPRINT") == "1"
[docs]
def optional(config_file=None):
    """Return the requirement lines of all conditional dependencies that apply.

    :param config_file: path of the Galaxy configuration file. When falsy,
        the file is located with ``find_config_file`` (sample files included).
    :returns: a list with the ``line`` of every conditional requirement whose
        check succeeds; an empty list (after a message on stderr) when no
        configuration file can be found.
    """
    config_file = config_file or find_config_file(["galaxy", "universe_wsgi"], include_samples=True)
    if not config_file:
        print("galaxy.dependencies.optional: no config file found", file=sys.stderr)
        return []
    deps = ConditionalDependencies(config_file)
    return [req.line for req in deps.conditional_reqs if deps.check(req.name)]