Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.godocker
import json
import logging
import time
from datetime import datetime
import requests
from galaxy import model
from galaxy.jobs.runners import (
AsynchronousJobRunner,
AsynchronousJobState,
)
from galaxy.util import (
DEFAULT_SOCKET_TIMEOUT,
unicodify,
)
log = logging.getLogger(__name__)
__all__ = ("GodockerJobRunner",)
class Godocker:
"""
API parameters
"""
def __init__(self, server, login, apikey, noCert):
self.token = None
self.server = server
self.login = login
self.apikey = apikey
self.noCert = noCert
def setToken(self, token):
self.token = token
def http_post_request(self, query, data, header):
"""post request with query"""
verify_ssl = not self.noCert
try:
url = self.server + query
res = requests.post(url, data, headers=header, verify=verify_ssl, timeout=DEFAULT_SOCKET_TIMEOUT)
except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
log.error("A transport error occurred in the GoDocker job runner:", e)
return False
return self.test_status_code(res)
def http_get_request(self, query, header):
"""get request with query, server and header required"""
# remove warnings if using --no-certificate
requests.packages.urllib3.disable_warnings()
verify_ssl = not self.noCert
try:
url = self.server + query
res = requests.get(url, headers=header, verify=verify_ssl, timeout=DEFAULT_SOCKET_TIMEOUT)
except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
log.error("A communication error occurred in the GoDocker job runner:", e)
return False
return self.test_status_code(res)
def http_delete_request(self, query, header):
"""delete request with query, server and header required"""
# remove warnings if using --no-certificate
requests.packages.urllib3.disable_warnings()
verify_ssl = not self.noCert
try:
url = self.server + query
res = requests.delete(url, headers=header, verify=verify_ssl, timeout=DEFAULT_SOCKET_TIMEOUT)
except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
log.error("A communication error occurred in the GoDocker job runner:", e)
return False
return self.test_status_code(res)
def http_put_request(self, query, data, header):
"""put request with query"""
# remove warnings if using --no-certificate
requests.packages.urllib3.disable_warnings()
verify_ssl = not self.noCert
try:
url = self.server + query
res = requests.put(url, data, headers=header, verify=verify_ssl, timeout=DEFAULT_SOCKET_TIMEOUT)
except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
log.error("A communication error occurred in the GoDocker job runner:", e)
return False
return self.test_status_code(res)
def test_status_code(self, httpresult):
"""exit if status code is 401 or 403 or 404 or 200"""
if httpresult.status_code == 401:
log.debug(
"Unauthorized : this server could not verify that you are authorized to access the document you requested."
)
elif httpresult.status_code == 403:
log.debug("Forbidden : Access was denied to this resource. Not authorized to access this resource.")
elif httpresult.status_code == 404:
log.debug("Not Found : The resource could not be found.")
elif httpresult.status_code == 200:
return httpresult
return False
[docs]class GodockerJobRunner(AsynchronousJobRunner):
"""
Job runner backed by a finite pool of worker threads. FIFO scheduling
"""
runner_name = "GodockerJobRunner"
[docs] def __init__(self, app, nworkers, **kwargs):
"""1: Get runner_param_specs from the job config
2: Initialise job runner parent object
3: Login to godocker and store the token
4: Start the worker and monitor threads
"""
runner_param_specs = dict(
godocker_master=dict(map=str), user=dict(map=str), key=dict(map=str), godocker_project=dict(map=str)
)
if "runner_param_specs" not in kwargs:
kwargs["runner_param_specs"] = dict()
kwargs["runner_param_specs"].update(runner_param_specs)
# Start the job runner parent object
super().__init__(app, nworkers, **kwargs)
# godocker API login call
self.auth = self.login(
self.runner_params["key"], self.runner_params["user"], self.runner_params["godocker_master"]
)
[docs] def queue_job(self, job_wrapper):
"""Create job script and submit it to godocker"""
if not self.prepare_job(
job_wrapper, include_metadata=False, include_work_dir_outputs=True, modify_command_for_container=False
):
return
job_destination = job_wrapper.job_destination
# Submit job to godocker
job_id = self.post_task(job_wrapper)
if not job_id:
log.error("Job creation failure. No Response from GoDocker")
job_wrapper.fail("Not submitted")
else:
log.debug(f"Starting queue_job for job {job_id}")
# Create an object of AsynchronousJobState and add it to the monitor queue.
ajs = AsynchronousJobState(
files_dir=job_wrapper.working_directory,
job_wrapper=job_wrapper,
job_id=job_id,
job_destination=job_destination,
)
self.monitor_queue.put(ajs)
[docs] def check_watched_item(self, job_state):
"""Get the job current status from GoDocker
using job_id and update the status in galaxy.
If the job execution is successful, call
mark_as_finished() and return 'None' to galaxy.
else if the job failed, call mark_as_failed()
and return 'None' to galaxy.
else if the job is running or in pending state, simply
return the 'AsynchronousJobState object' (job_state).
"""
# This function is called by check_watched_items() where param job_state
# is an object of AsynchronousJobState.
# Expected return type of this function is None or an
# AsynchronousJobState object with updated running status.
# Get task from GoDocker
job_persisted_state = job_state.job_wrapper.get_state()
job_status_god = self.get_task(job_state.job_id)
log.debug(f"Job ID: {str(job_state.job_id)} Job Status: {str(job_status_god['status']['primary'])}")
if job_status_god["status"]["primary"] == "over" or job_persisted_state == model.Job.states.STOPPED:
job_state.running = False
job_state.job_wrapper.change_state(model.Job.states.OK)
if self.create_log_file(job_state, job_status_god):
self.mark_as_finished(job_state)
else:
self.mark_as_failed(job_state)
"""The function mark_as_finished() executes:
self.work_queue.put((self.finish_job, job_state))
*self.finish_job ->
job_state.job_wrapper.finish( stdout, stderr, exit_code )
job_state.job_wrapper.reclaim_ownership()
job_state.cleanup()
*self.work_queue.put( method , arg ) ->
The run_next() method starts execution on starting worker threads.
This run_next() method executes method(arg)
by using self.work_queue.get()
*Possible outcomes of finish_job(job_state) ->
job_state.job_wrapper.finish( stdout, stderr, exit_code )
job_state.job_wrapper.fail( "Unable to finish job", exception=True)
*Similar workflow is done for mark_as_failed() method.
"""
return None
elif job_status_god["status"]["primary"] == "running":
job_state.running = True
job_state.job_wrapper.change_state(model.Job.states.RUNNING)
return job_state
elif job_status_god["status"]["primary"] == "pending":
return job_state
elif job_status_god["status"]["exitcode"] not in [None, 0] and job_persisted_state != model.Job.states.STOPPED:
job_state.running = False
job_state.job_wrapper.change_state(model.Job.states.ERROR)
self.create_log_file(job_state, job_status_god)
self.mark_as_failed(job_state)
return None
else:
job_state.running = False
self.create_log_file(job_state, job_status_god)
self.mark_as_failed(job_state)
return None
[docs] def stop_job(self, job_wrapper):
"""Attempts to delete a dispatched executing Job in GoDocker"""
# This function is called by fail_job() where
# param job = self.sa_session.query(self.app.model.Job).get(job_state.job_wrapper.job_id)
# No Return data expected
job_id = job_wrapper.job_id
log.debug(f"STOP JOB EXECUTION OF JOB ID: {str(job_id)}")
# Get task status from GoDocker.
job_status_god = self.get_task_status(job_id)
if job_status_god["status"]["primary"] != "over":
# Initiate a delete call,if the job is running in GoDocker.
self.delete_task(job_id)
return None
[docs] def recover(self, job, job_wrapper):
"""Recovers jobs stuck in the queued/running state when Galaxy started"""
# This method is called by Galaxy at startup time.
# Jobs in Running & Queued state in galaxy are put in the monitor_queue
# by creating an AsynchronousJobState object
job_id = job_wrapper.job_id
ajs = AsynchronousJobState(
files_dir=job_wrapper.working_directory,
job_wrapper=job_wrapper,
job_id=job_id,
job_destination=job_wrapper.job_destination,
)
job_wrapper.command_line = job.command_line
if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
log.debug(
f"({job.id}/{job.get_job_runner_external_id()}) is still in {job.state} state, adding to the god queue"
)
ajs.old_state = "R"
ajs.running = True
self.monitor_queue.put(ajs)
elif job.state == model.Job.states.QUEUED:
log.debug(
f"({job.id}/{job.get_job_runner_external_id()}) is still in god queued state, adding to the god queue"
)
ajs.old_state = "Q"
ajs.running = False
self.monitor_queue.put(ajs)
# Helper functions
[docs] def create_log_file(self, job_state, job_status_god):
"""Create log files in galaxy, namely error_file, output_file, exit_code_file
Return true, if all the file creations are successful
"""
path = None
for vol in job_status_god["container"]["volumes"]:
if vol["name"] == "go-docker":
path = str(vol["path"])
if path:
god_output_file = f"{path}/god.log"
god_error_file = f"{path}/god.err"
try:
# Read from GoDocker output_file and write it into galaxy output_file.
f = open(god_output_file)
out_log = f.read()
log_file = open(job_state.output_file, "w")
log_file.write(out_log)
log_file.close()
f.close()
# Read from GoDocker error_file and write it into galaxy error_file.
f = open(god_error_file)
out_log = f.read()
log_file = open(job_state.error_file, "w")
log_file.write(out_log)
log_file.close()
f.close()
# Read from GoDocker exit_code and write it into galaxy exit_code_file.
out_log = str(job_status_god["status"]["exitcode"])
log_file = open(job_state.exit_code_file, "w")
log_file.write(out_log)
log_file.close()
f.close()
log.debug(f"CREATE OUTPUT FILE: {job_state.output_file}")
log.debug(f"CREATE ERROR FILE: {job_state.error_file}")
log.debug(f"CREATE EXIT CODE FILE: {job_state.exit_code_file}")
except OSError as e:
log.error("Could not access task log file: %s", unicodify(e))
log.debug("IO Error occurred when accessing the files.")
return False
return True
# GoDocker API helper functions
[docs] def login(self, apikey, login, server, noCert=False):
"""Login to GoDocker and return the token
Create Login model schema of GoDocker and call the http_post_request method.
"""
log.debug("LOGIN TASK TO BE EXECUTED \n")
log.debug(f"GODOCKER LOGIN: {str(login)}")
data = json.dumps({"user": login, "apikey": apikey})
# Create object of Godocker class
g_auth = Godocker(server, login, apikey, noCert)
auth = g_auth.http_post_request(
"/api/1.0/authenticate", data, {"Content-type": "application/json", "Accept": "application/json"}
)
if not auth:
raise Exception("Authentication failure, GoDocker runner cannot be started")
else:
log.debug("GoDocker authentication successful.")
token = auth.json()["token"]
g_auth.setToken(token)
# Return the object of Godocker class
return g_auth
[docs] def post_task(self, job_wrapper):
"""Sumbit job to GoDocker and return jobid
Create Job model schema of GoDocker and call the http_post_request method.
"""
# Get the params from <destination> tag in job_conf by using job_destination.params[param]
if self.auth.token:
job_destination = job_wrapper.job_destination
try:
docker_cpu = int(job_destination.params["docker_cpu"])
except Exception:
docker_cpu = 1
try:
docker_ram = int(job_destination.params["docker_memory"])
except Exception:
docker_ram = 1
try:
docker_image = self._find_container(job_wrapper).container_id
log.debug("GoDocker runner using container %s.", docker_image)
except Exception:
log.error(f"Unable to find docker_image for job {job_wrapper.job_id}, failing.")
return False
volumes = []
labels = []
tags_tab = ["galaxy", job_wrapper.tool.id]
tasks_depends = []
name = job_wrapper.tool.name
description = "galaxy job"
array = None
project = None
try:
project = str(self.runner_params["godocker_project"])
except KeyError:
log.debug("godocker_project not defined, using default.")
try:
volume = job_destination.params["godocker_volumes"]
volume = volume.split(",")
for i in volume:
temp = dict({"name": i})
volumes.append(temp)
except Exception:
log.debug("godocker_volume not set, using default.")
dt = datetime.now()
# Enable galaxy venv in the docker containers
try:
if job_destination.params["virtualenv"] == "true":
GALAXY_VENV_TEMPLATE = """GALAXY_VIRTUAL_ENV="%s"; if [ "$GALAXY_VIRTUAL_ENV" != "None" -a -z "$VIRTUAL_ENV" -a -f "$GALAXY_VIRTUAL_ENV/bin/activate" ]; then . "$GALAXY_VIRTUAL_ENV/bin/activate"; fi;"""
venv = GALAXY_VENV_TEMPLATE % job_wrapper.galaxy_virtual_env
command = (
f"#!/bin/bash\ncd {job_wrapper.working_directory}\n{venv}\n{job_wrapper.runner_command_line}"
)
else:
command = f"#!/bin/bash\ncd {job_wrapper.working_directory}\n{job_wrapper.runner_command_line}"
except Exception:
command = f"#!/bin/bash\ncd {job_wrapper.working_directory}\n{job_wrapper.runner_command_line}"
# GoDocker Job model schema
job = {
"date": time.mktime(dt.timetuple()),
"meta": {"name": name, "description": description, "tags": tags_tab},
"requirements": {
"cpu": docker_cpu,
"ram": docker_ram,
"array": {"values": array},
"label": labels,
"tasks": tasks_depends,
"tmpstorage": None,
},
"container": {
"image": str(docker_image),
"volumes": volumes,
"network": True,
"id": None,
"meta": None,
"stats": None,
"ports": [],
"root": False,
},
"command": {
"interactive": False,
"cmd": command,
},
"status": {"primary": None, "secondary": None},
}
if project is not None:
job["user"] = {"project": project}
result = self.auth.http_post_request(
"/api/1.0/task",
json.dumps(job),
{
"Authorization": f"Bearer {self.auth.token}",
"Content-type": "application/json",
"Accept": "application/json",
},
)
# Return job_id
return str(result.json()["id"])
[docs] def get_task(self, job_id):
"""Get job details from GoDocker and return the job.
Pass job_id to the http_get_request method.
"""
job = False
if self.auth.token:
result = self.auth.http_get_request(
f"/api/1.0/task/{str(job_id)}", {"Authorization": f"Bearer {self.auth.token}"}
)
job = result.json()
# Return the job
return job
[docs] def task_suspend(self, job_id):
"""Suspend actively running job in galaxy.
Pass job_id to the http_get_request method.
"""
job = False
if self.auth.token:
result = self.auth.http_get_request(
f"/api/1.0/task/{str(job_id)}/suspend", {"Authorization": f"Bearer {self.auth.token}"}
)
job = result.json()
# Return the job
return job
[docs] def get_task_status(self, job_id):
"""Get job status from GoDocker and return the status of job.
Pass job_id to http_get_request method.
"""
job = False
if self.auth.token:
result = self.auth.http_get_request(
f"/api/1.0/task/{str(job_id)}/status", {"Authorization": f"Bearer {self.auth.token}"}
)
job = result.json()
# Return task status
return job
[docs] def delete_task(self, job_id):
"""Delete a suspended task in GoDocker.
Pass job_id to http_delete_request method.
"""
job = False
if self.auth.token:
result = self.auth.http_delete_request(
f"/api/1.0/task/{str(job_id)}", {"Authorization": f"Bearer {self.auth.token}"}
)
job = result.json()
# Return the job
return job