Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.util.pykube_util
"""Interface layer for pykube library shared between Galaxy and Pulsar."""
import logging
import os
import re
import uuid
# pykube is an optional dependency. Import it defensively so that this module
# can always be imported; callers use ensure_pykube() to get a readable error
# at the point where the library is actually needed.
try:
    from pykube.config import KubeConfig
    from pykube.http import HTTPClient
    from pykube.objects import (
        Job,
        Pod
    )
except ImportError as exc:
    # Keep every pykube name defined (as None) so later references fail in a
    # controlled way rather than with a NameError.
    KubeConfig = None
    HTTPClient = None
    Job = None
    Pod = None
    K8S_IMPORT_MESSAGE = ('The Python pykube package is required to use '
                          'this feature, please install it or correct the '
                          'following error:\nImportError %s' % str(exc))


def ensure_pykube():
    """Raise if the pykube library could not be imported.

    This name is exported through ``__all__``; it lets callers verify the
    optional dependency up front.

    Raises:
        Exception: carrying ``K8S_IMPORT_MESSAGE`` when the guarded import
            above failed (``KubeConfig`` was replaced by ``None``).
    """
    if KubeConfig is None:
        raise Exception(K8S_IMPORT_MESSAGE)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Fallbacks used when the job parameters do not provide these settings.
DEFAULT_JOB_API_VERSION = "batch/v1"
DEFAULT_NAMESPACE = "default"

# Logged when a configured instance id fails validation; the single %s
# placeholder receives the rejected value.
INSTANCE_ID_INVALID_MESSAGE = (
    "Galaxy instance [%s] is either too long (>20 characters) or it "
    "includes non DNS acceptable characters, ignoring it."
)
def pykube_client_from_dict(params):
    """Build a pykube ``HTTPClient`` from a dict of runner parameters.

    A truthy ``k8s_use_service_account`` entry selects the service-account
    configuration. Otherwise a kubeconfig file is loaded, with the path taken
    from (in order) ``params["k8s_config_path"]``, the ``KUBECONFIG``
    environment variable, and finally ``~/.kube/config``.
    """
    if params.get("k8s_use_service_account"):
        return HTTPClient(KubeConfig.from_service_account())

    kubeconfig_path = params.get("k8s_config_path")
    if kubeconfig_path is None:
        kubeconfig_path = os.environ.get('KUBECONFIG')
    if kubeconfig_path is None:
        kubeconfig_path = '~/.kube/config'
    return HTTPClient(KubeConfig.from_file(kubeconfig_path))
def produce_unique_k8s_job_name(app_prefix=None, instance_id=None, job_id=None):
    """Return a unique Kubernetes job name of at most 63 characters.

    The name has the shape ``[<app_prefix>-][<instance_id>-]<job_id>-<uuid4>``.
    Empty or missing ``app_prefix`` / ``instance_id`` are skipped, and a
    random UUID stands in for ``job_id`` when it is ``None``.

    Names longer than 63 characters keep only their trailing 63 characters,
    so the random suffix is always preserved. If that cut leaves a separator
    at the front it is removed, because Kubernetes names and label values
    must begin with an alphanumeric character.
    """
    if job_id is None:
        job_id = str(uuid.uuid4())
    segments = [str(part) for part in (app_prefix, instance_id) if part]
    segments.append(str(job_id))
    segments.append(str(uuid.uuid4()))
    # Truncate from the left first, then drop any separator exposed by the cut.
    return "-".join(segments)[-63:].lstrip("-_.")
def pull_policy(params):
    """Return the image pull policy configured in ``params``, if valid.

    Only ``"Always"``, ``"IfNotPresent"`` and ``"Never"`` are accepted. A
    missing ``k8s_pull_policy`` key and an unrecognised value both yield
    ``None``; no error is raised for the latter.
    """
    if "k8s_pull_policy" not in params:
        return None
    candidate = params['k8s_pull_policy']
    return candidate if candidate in ["Always", "IfNotPresent", "Never"] else None
def find_job_object_by_name(pykube_api, job_name, namespace=None):
    """Return the first ``Job`` labelled ``app=<job_name>``, or ``None``.

    Thin wrapper around ``_find_object_by_name`` bound to the ``Job`` class;
    ``namespace`` is only applied to the query when it is not ``None``.
    """
    job = _find_object_by_name(Job, pykube_api, job_name, namespace=namespace)
    return job
def find_pod_object_by_name(pykube_api, pod_name, namespace=None):
    """Return the first ``Pod`` labelled ``app=<pod_name>``, or ``None``.

    Thin wrapper around ``_find_object_by_name`` bound to the ``Pod`` class;
    ``namespace`` is only applied to the query when it is not ``None``.
    """
    pod = _find_object_by_name(Pod, pykube_api, pod_name, namespace=namespace)
    return pod
def _find_object_by_name(clazz, pykube_api, object_name, namespace=None):
    """Query ``clazz`` objects by ``app`` label and wrap the first hit.

    The query uses the selector ``app=<object_name>`` and, when ``namespace``
    is not ``None``, restricts it to that namespace. The first entry of the
    response's ``items`` list is wrapped in ``clazz``; ``None`` is returned
    when the list is empty.
    """
    query_args = {"selector": "app=%s" % object_name}
    if namespace is not None:
        query_args["namespace"] = namespace
    matches = clazz.objects(pykube_api).filter(**query_args)
    items = matches.response['items']
    if len(items) > 0:
        return clazz(pykube_api, items[0])
    return None
def stop_job(job, cleanup="always"):
    """Scale ``job`` down to zero replicas and optionally delete it.

    ``cleanup`` controls deletion:

    * ``"always"`` - delete the job.
    * ``"onsuccess"`` - delete only if the job status has no ``failed`` count
      above zero.
    * anything else - leave the job in place after scaling it down.

    Deletion is requested with a ``Background`` propagation policy and any
    error response is raised through ``job.api.raise_for_status``.
    """
    status = job.obj['status']
    job_failed = 'failed' in status and status['failed'] > 0

    # The scale-down happens regardless of the cleanup mode.
    job.scale(replicas=0)

    if cleanup == "always" or (cleanup == "onsuccess" and not job_failed):
        response = job.api.delete(
            json={
                "apiVersion": "v1",
                "kind": "DeleteOptions",
                "propagationPolicy": "Background",
            },
            **job.api_kwargs()
        )
        job.api.raise_for_status(response)
def job_object_dict(params, job_name, spec):
    """Assemble the dict describing a Kubernetes ``Job`` object.

    ``params`` may override the API version (``k8s_job_api_version``) and the
    namespace (``k8s_namespace``); otherwise the module defaults are used.
    ``job_name`` is used both as the object name and as the value of its
    ``app`` label, and ``spec`` is embedded unchanged.
    """
    api_version = params.get('k8s_job_api_version', DEFAULT_JOB_API_VERSION)
    metadata = {
        # metadata.name is the name of the pod resource created, and must be unique
        # http://kubernetes.io/docs/user-guide/configuring-containers/
        "name": job_name,
        "namespace": params.get('k8s_namespace', DEFAULT_NAMESPACE),
        "labels": {"app": job_name},
    }
    return {
        "apiVersion": api_version,
        "kind": "Job",
        "metadata": metadata,
        "spec": spec,
    }
def galaxy_instance_id(params):
    """Parse and validate the id of the Galaxy instance from supplied dict.

    This will be added to Jobs and Pods names, so it needs to be DNS friendly.
    Accepted values are 1 to 20 characters long, consist only of the lowercase
    ASCII letters 'a' through 'z', the digits '0' through '9' and the minus
    sign ('-'), and neither start nor end with a minus sign.

    It looks for the value set on params['k8s_galaxy_instance_id'], which might
    or might not be set. The idea behind this is to allow the Galaxy instance
    to trust (or not) existing k8s Jobs and Pods that match the setup of a Job
    that is being recovered or restarted after a downtime/reboot.

    Returns the validated id, or None when the key is absent, set to None, or
    invalid (an error is logged in the invalid case).
    """
    raw_value = params.get("k8s_galaxy_instance_id")
    if raw_value is None:
        return None
    # fullmatch (rather than match + "$") rejects a trailing newline, and the
    # explicit 0-9 range rejects non-ASCII digits that "\d" would accept.
    if isinstance(raw_value, str) and re.fullmatch(r"(?!-)[a-z0-9-]{1,20}(?<!-)", raw_value):
        return raw_value
    log.error(INSTANCE_ID_INVALID_MESSAGE, raw_value)
    return None
# Public API of this module, kept in case-insensitive alphabetical order.
# NOTE(review): every name listed here must be defined at module level
# (including ensure_pykube), otherwise ``from ... import *`` fails.
__all__ = (
    "DEFAULT_JOB_API_VERSION",
    "ensure_pykube",
    "find_job_object_by_name",
    "find_pod_object_by_name",
    "galaxy_instance_id",
    "Job",
    "job_object_dict",
    "Pod",
    "produce_unique_k8s_job_name",
    "pull_policy",
    "pykube_client_from_dict",
    "stop_job",
)