Source code for galaxy.dependencies

"""
Determine what optional dependencies are needed.
"""

import os
import sys
from os.path import (
    dirname,
    exists,
    join,
)

import yaml
from dparse import parse

from galaxy.util import (
    asbool,
    etree,
    parse_xml,
    which,
)
from galaxy.util.properties import (
    find_config_file,
    load_app_properties,
)


[docs]class ConditionalDependencies:
[docs] def __init__(self, config_file, config=None): self.config_file = config_file self.job_runners = [] self.authenticators = [] self.object_stores = [] self.file_sources = [] self.conditional_reqs = [] self.container_interface_types = [] self.job_rule_modules = [] self.error_report_modules = [] self.vault_type = None if config is None: self.config = load_app_properties(config_file=self.config_file) else: self.config = config self.parse_configs() self.get_conditional_requirements()
[docs] def parse_configs(self): def load_job_config_dict(job_conf_dict): runners = job_conf_dict.get("runners", {}) for runner in runners.values(): if "load" in runner: self.job_runners.append(runner.get("load")) environments = job_conf_dict.get("execution", {}).get("environments", {}) for env in environments.values(): if "rules_module" in env: self.job_rule_modules.append(env.get("rules_module")) if "job_config" in self.config: load_job_config_dict(self.config.get("job_config")) else: job_conf_path = self.config.get("job_config_file") if not job_conf_path: job_conf_path = join(dirname(self.config_file), "job_conf.yml") if not exists(job_conf_path): job_conf_path = join(dirname(self.config_file), "job_conf.xml") else: job_conf_path = join(dirname(self.config_file), job_conf_path) if ".xml" in job_conf_path: try: job_conf_tree = parse_xml(job_conf_path) plugins_elem = job_conf_tree.find("plugins") if plugins_elem: for plugin in plugins_elem.findall("plugin"): if "load" in plugin.attrib: self.job_runners.append(plugin.attrib["load"]) for plugin in job_conf_tree.findall('.//destination/param[@id="rules_module"]'): self.job_rule_modules.append(plugin.text) except (OSError, etree.ParseError): pass else: try: with open(job_conf_path) as f: job_conf_dict = yaml.safe_load(f) load_job_config_dict(job_conf_dict) except OSError: pass object_store_conf_path = self.config.get( "object_store_config_file", join(dirname(self.config_file), "object_store_conf.xml") ) try: if ".xml" in object_store_conf_path: for store in parse_xml(object_store_conf_path).iter("object_store"): if "type" in store.attrib: self.object_stores.append(store.attrib["type"]) else: with open(object_store_conf_path) as f: job_conf_dict = yaml.safe_load(f) def collect_types(from_dict): if not isinstance(from_dict, dict): return if "type" in from_dict: self.object_stores.append(from_dict["type"]) for value in from_dict.values(): if isinstance(value, list): for val in value: collect_types(val) else: collect_types(value) collect_types(job_conf_dict) except OSError: pass # Parse auth conf auth_conf_xml = self.config.get("auth_config_file", join(dirname(self.config_file), "auth_conf.xml")) try: for auth in parse_xml(auth_conf_xml).findall("authenticator"): auth_type = auth.find("type") if auth_type is not None: self.authenticators.append(auth_type.text) except OSError: pass # Parse oidc_backends_config_file specifically for PKCE support. self.pkce_support = False oidc_backend_conf_xml = self.config.get( "oidc_backends_config_file", join(dirname(self.config_file), "oidc_backends_config.xml") ) try: for pkce_support_element in parse_xml(oidc_backend_conf_xml).iterfind("./provider/pkce_support"): if pkce_support_element.text == "true": self.pkce_support = True break except OSError: pass # Parse error report config error_report_yml = self.config.get("error_report_file", join(dirname(self.config_file), "error_report.yml")) try: with open(error_report_yml) as f: error_reporters = yaml.safe_load(f) self.error_report_modules = [er.get("type", None) for er in error_reporters] except OSError: pass # Parse file sources config file_sources_conf_yml = self.config.get( "file_sources_config_file", join(dirname(self.config_file), "file_sources_conf.yml") ) if exists(file_sources_conf_yml): with open(file_sources_conf_yml) as f: file_sources_conf = yaml.safe_load(f) else: file_sources_conf = [] self.file_sources = [c.get("type", None) for c in file_sources_conf] # Parse vault config vault_conf_yml = self.config.get("vault_config_file", join(dirname(self.config_file), "vault_conf.yml")) if exists(vault_conf_yml): with open(vault_conf_yml) as f: vault_conf = yaml.safe_load(f) else: vault_conf = {} self.vault_type = vault_conf.get("type", "").lower()
[docs] def get_conditional_requirements(self): crfile = join(dirname(__file__), "conditional-requirements.txt") with open(crfile) as fh: dependency_file = parse(fh.read(), file_type="requirements.txt") for dep in dependency_file.dependencies: self.conditional_reqs.append(dep)
[docs] def check(self, name): try: name = name.replace("-", "_").replace(".", "_") return getattr(self, f"check_{name}")() except Exception: return False
[docs] def check_psycopg2_binary(self): return self.config["database_connection"].startswith("postgres")
[docs] def check_mysqlclient(self): return self.config["database_connection"].startswith("mysql")
[docs] def check_drmaa(self): return ( "galaxy.jobs.runners.drmaa:DRMAAJobRunner" in self.job_runners or "galaxy.jobs.runners.slurm:SlurmJobRunner" in self.job_runners or "galaxy.jobs.runners.univa:UnivaJobRunner" in self.job_runners )
[docs] def check_galaxycloudrunner(self): return "galaxycloudrunner.rules" in self.job_rule_modules
[docs] def check_total_perspective_vortex(self): return "tpv.rules" in self.job_rule_modules
[docs] def check_pbs_python(self): return "galaxy.jobs.runners.pbs:PBSJobRunner" in self.job_runners
[docs] def check_pykube_ng(self): return "galaxy.jobs.runners.kubernetes:KubernetesJobRunner" in self.job_runners or which("kubectl")
[docs] def check_chronos_python(self): return "galaxy.jobs.runners.chronos:ChronosJobRunner" in self.job_runners
[docs] def check_boto3_python(self): return "galaxy.jobs.runners.aws:AWSBatchJobRunner" in self.job_runners
[docs] def check_fluent_logger(self): return asbool(self.config["fluent_log"])
[docs] def check_sentry_sdk(self): return self.config.get("sentry_dsn", None) is not None
[docs] def check_statsd(self): return self.config.get("statsd_host", None) is not None
[docs] def check_python_ldap(self): return "ldap" in self.authenticators or "activedirectory" in self.authenticators
[docs] def check_ldap3(self): return "ldap3" in self.authenticators
[docs] def check_python_pam(self): return "PAM" in self.authenticators
[docs] def check_azure_storage(self): return "azure_blob" in self.object_stores
[docs] def check_boto3(self): return "boto3" in self.object_stores
[docs] def check_kamaki(self): return "pithos" in self.object_stores
[docs] def check_python_irodsclient(self): return "irods" in self.object_stores
[docs] def check_fs_dropboxfs(self): return "dropbox" in self.file_sources
[docs] def check_fs_webdavfs(self): return "webdav" in self.file_sources
[docs] def check_fs_anvilfs(self): # pyfilesystem plugin access to terra on anvil return "anvil" in self.file_sources
[docs] def check_fs_sshfs(self): return "ssh" in self.file_sources
[docs] def check_fs_googledrivefs(self): return "googledrive" in self.file_sources
[docs] def check_fs_gcsfs(self): return "googlecloudstorage" in self.file_sources
[docs] def check_google_cloud_storage(self): return "googlecloudstorage" in self.file_sources
[docs] def check_onedatafilerestclient(self): return "onedata" in self.object_stores
[docs] def check_fs_onedatarestfs(self): return "onedata" in self.file_sources
[docs] def check_fs_basespace(self): return "basespace" in self.file_sources
[docs] def check_watchdog(self): install_set = {"auto", "True", "true", "polling", True} return self.config["watch_tools"] in install_set or self.config["watch_tool_data_dir"] in install_set
[docs] def check_python_gitlab(self): return "gitlab" in self.error_report_modules
[docs] def check_pygithub(self): return "github" in self.error_report_modules
[docs] def check_influxdb(self): return "influxdb" in self.error_report_modules
[docs] def check_tensorflow(self): return asbool(self.config["enable_tool_recommendations"])
[docs] def check_weasyprint(self): # See notes in ./conditional-requirements.txt for more information. return os.environ.get("GALAXY_DEPENDENCIES_INSTALL_WEASYPRINT") == "1"
[docs] def check_pydyf(self): # See notes in ./conditional-requirements.txt for more information. return os.environ.get("GALAXY_DEPENDENCIES_INSTALL_WEASYPRINT") == "1"
[docs] def check_custos_sdk(self): return "custos" == self.vault_type
[docs] def check_hvac(self): return "hashicorp" == self.vault_type
[docs] def check_pkce(self): return self.pkce_support
[docs] def check_rucio_clients(self): return "rucio" in self.object_stores and sys.version_info >= (3, 9)
[docs]def optional(config_file=None): if not config_file: config_file = find_config_file(["galaxy", "universe_wsgi"], include_samples=True) if not config_file: print("galaxy.dependencies.optional: no config file found", file=sys.stderr) return [] rval = [] conditional = ConditionalDependencies(config_file) for dependency in conditional.conditional_reqs: if conditional.check(dependency.name): rval.append(dependency.line) return rval