
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.godocker

import json
import logging
import time
from datetime import datetime

from galaxy import model
from galaxy.jobs.runners import (
from galaxy.util import (

log = logging.getLogger(__name__)

__all__ = ("GodockerJobRunner",)

class Godocker:
    API parameters

    def __init__(self, server, login, apikey, noCert):
        self.token = None
        self.server = server
        self.login = login
        self.apikey = apikey
        self.noCert = noCert

    def setToken(self, token):
        self.token = token

    def http_post_request(self, query, data, header):
        """post request with query"""

        verify_ssl = not self.noCert
            url = self.server + query
            res = requests.post(url, data, headers=header, verify=verify_ssl, timeout=DEFAULT_SOCKET_TIMEOUT)

        except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
            log.error("A transport error occurred in the GoDocker job runner:", e)
            return False

        return self.test_status_code(res)

    def http_get_request(self, query, header):
        """get request with query, server and header required"""

        # remove warnings if using --no-certificate
        verify_ssl = not self.noCert
            url = self.server + query
            res = requests.get(url, headers=header, verify=verify_ssl, timeout=DEFAULT_SOCKET_TIMEOUT)

        except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
            log.error("A communication error occurred in the GoDocker job runner:", e)
            return False

        return self.test_status_code(res)

    def http_delete_request(self, query, header):
        """delete request with query, server and header required"""

        # remove warnings if using --no-certificate
        verify_ssl = not self.noCert
            url = self.server + query
            res = requests.delete(url, headers=header, verify=verify_ssl, timeout=DEFAULT_SOCKET_TIMEOUT)

        except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
            log.error("A communication error occurred in the GoDocker job runner:", e)
            return False

        return self.test_status_code(res)

    def http_put_request(self, query, data, header):
        """put request with query"""

        # remove warnings if using --no-certificate
        verify_ssl = not self.noCert
            url = self.server + query
            res = requests.put(url, data, headers=header, verify=verify_ssl, timeout=DEFAULT_SOCKET_TIMEOUT)

        except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
            log.error("A communication error occurred in the GoDocker job runner:", e)
            return False

        return self.test_status_code(res)

    def test_status_code(self, httpresult):
        """exit if status code is 401 or 403 or 404 or 200"""
        if httpresult.status_code == 401:
                "Unauthorized : this server could not verify that you are authorized to access the document you requested."

        elif httpresult.status_code == 403:
            log.debug("Forbidden : Access was denied to this resource. Not authorized to access this resource.")

        elif httpresult.status_code == 404:
            log.debug("Not Found : The resource could not be found.")

        elif httpresult.status_code == 200:
            return httpresult

        return False

[docs] class GodockerJobRunner(AsynchronousJobRunner): """ Job runner backed by a finite pool of worker threads. FIFO scheduling """ runner_name = "GodockerJobRunner"
[docs] def __init__(self, app, nworkers, **kwargs): """1: Get runner_param_specs from the job config 2: Initialise job runner parent object 3: Login to godocker and store the token 4: Start the worker and monitor threads """ runner_param_specs = dict( godocker_master=dict(map=str), user=dict(map=str), key=dict(map=str), godocker_project=dict(map=str) ) if "runner_param_specs" not in kwargs: kwargs["runner_param_specs"] = {} kwargs["runner_param_specs"].update(runner_param_specs) # Start the job runner parent object super().__init__(app, nworkers, **kwargs) # godocker API login call self.auth = self.login( self.runner_params["key"], self.runner_params["user"], self.runner_params["godocker_master"] )
[docs] def queue_job(self, job_wrapper): """Create job script and submit it to godocker""" if not self.prepare_job( job_wrapper, include_metadata=False, include_work_dir_outputs=True, modify_command_for_container=False ): return job_destination = job_wrapper.job_destination # Submit job to godocker job_id = self.post_task(job_wrapper) if not job_id: log.error("Job creation failure. No Response from GoDocker") job_wrapper.fail("Not submitted") else: log.debug(f"Starting queue_job for job {job_id}") # Create an object of AsynchronousJobState and add it to the monitor queue. ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_id=job_id, job_destination=job_destination, ) self.monitor_queue.put(ajs)
[docs] def check_watched_item(self, job_state): """Get the job current status from GoDocker using job_id and update the status in galaxy. If the job execution is successful, call mark_as_finished() and return 'None' to galaxy. else if the job failed, call mark_as_failed() and return 'None' to galaxy. else if the job is running or in pending state, simply return the 'AsynchronousJobState object' (job_state). """ # This function is called by check_watched_items() where param job_state # is an object of AsynchronousJobState. # Expected return type of this function is None or an # AsynchronousJobState object with updated running status. # Get task from GoDocker job_persisted_state = job_state.job_wrapper.get_state() job_status_god = self.get_task(job_state.job_id) log.debug(f"Job ID: {str(job_state.job_id)} Job Status: {str(job_status_god['status']['primary'])}") if job_status_god["status"]["primary"] == "over" or job_persisted_state == model.Job.states.STOPPED: job_state.running = False job_state.job_wrapper.change_state(model.Job.states.OK) if self.create_log_file(job_state, job_status_god): self.mark_as_finished(job_state) else: self.mark_as_failed(job_state) """The function mark_as_finished() executes: self.work_queue.put((self.finish_job, job_state)) *self.finish_job -> job_state.job_wrapper.finish( stdout, stderr, exit_code ) job_state.job_wrapper.reclaim_ownership() job_state.cleanup() *self.work_queue.put( method , arg ) -> The run_next() method starts execution on starting worker threads. This run_next() method executes method(arg) by using self.work_queue.get() *Possible outcomes of finish_job(job_state) -> job_state.job_wrapper.finish( stdout, stderr, exit_code ) job_state.job_wrapper.fail( "Unable to finish job", exception=True) *Similar workflow is done for mark_as_failed() method. """ return None elif job_status_god["status"]["primary"] == "running": job_state.running = True job_state.job_wrapper.change_state(model.Job.states.RUNNING) return job_state elif job_status_god["status"]["primary"] == "pending": return job_state elif job_status_god["status"]["exitcode"] not in [None, 0] and job_persisted_state != model.Job.states.STOPPED: job_state.running = False job_state.job_wrapper.change_state(model.Job.states.ERROR) self.create_log_file(job_state, job_status_god) self.mark_as_failed(job_state) return None else: job_state.running = False self.create_log_file(job_state, job_status_god) self.mark_as_failed(job_state) return None
[docs] def stop_job(self, job_wrapper): """Attempts to delete a dispatched executing Job in GoDocker""" # This function is called by fail_job() # No Return data expected job_id = job_wrapper.job_id log.debug(f"STOP JOB EXECUTION OF JOB ID: {str(job_id)}") # Get task status from GoDocker. job_status_god = self.get_task_status(job_id) if job_status_god["status"]["primary"] != "over": # Initiate a delete call,if the job is running in GoDocker. self.delete_task(job_id) return None
[docs] def recover(self, job, job_wrapper): """Recovers jobs stuck in the queued/running state when Galaxy started""" # This method is called by Galaxy at startup time. # Jobs in Running & Queued state in galaxy are put in the monitor_queue # by creating an AsynchronousJobState object job_id = job_wrapper.job_id ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_id=job_id, job_destination=job_wrapper.job_destination, ) job_wrapper.command_line = job.command_line if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED): log.debug( f"({job.id}/{job.get_job_runner_external_id()}) is still in {job.state} state, adding to the god queue" ) ajs.old_state = "R" ajs.running = True self.monitor_queue.put(ajs) elif job.state == model.Job.states.QUEUED: log.debug( f"({job.id}/{job.get_job_runner_external_id()}) is still in god queued state, adding to the god queue" ) ajs.old_state = "Q" ajs.running = False self.monitor_queue.put(ajs)
# Helper functions
[docs] def create_log_file(self, job_state, job_status_god): """Create log files in galaxy, namely error_file, output_file, exit_code_file Return true, if all the file creations are successful """ path = None for vol in job_status_god["container"]["volumes"]: if vol["name"] == "go-docker": path = str(vol["path"]) if path: god_output_file = f"{path}/god.log" god_error_file = f"{path}/god.err" try: # Read from GoDocker output_file and write it into galaxy output_file. f = open(god_output_file) out_log = f.read() log_file = open(job_state.output_file, "w") log_file.write(out_log) log_file.close() f.close() # Read from GoDocker error_file and write it into galaxy error_file. f = open(god_error_file) out_log = f.read() log_file = open(job_state.error_file, "w") log_file.write(out_log) log_file.close() f.close() # Read from GoDocker exit_code and write it into galaxy exit_code_file. out_log = str(job_status_god["status"]["exitcode"]) log_file = open(job_state.exit_code_file, "w") log_file.write(out_log) log_file.close() f.close() log.debug(f"CREATE OUTPUT FILE: {job_state.output_file}") log.debug(f"CREATE ERROR FILE: {job_state.error_file}") log.debug(f"CREATE EXIT CODE FILE: {job_state.exit_code_file}") except OSError as e: log.error("Could not access task log file: %s", unicodify(e)) log.debug("IO Error occurred when accessing the files.") return False return True
# GoDocker API helper functions
[docs] def login(self, apikey, login, server, noCert=False): """Login to GoDocker and return the token Create Login model schema of GoDocker and call the http_post_request method. """ log.debug("LOGIN TASK TO BE EXECUTED \n") log.debug(f"GODOCKER LOGIN: {str(login)}") data = json.dumps({"user": login, "apikey": apikey}) # Create object of Godocker class g_auth = Godocker(server, login, apikey, noCert) auth = g_auth.http_post_request( "/api/1.0/authenticate", data, {"Content-type": "application/json", "Accept": "application/json"} ) if not auth: raise Exception("Authentication failure, GoDocker runner cannot be started") else: log.debug("GoDocker authentication successful.") token = auth.json()["token"] g_auth.setToken(token) # Return the object of Godocker class return g_auth
[docs] def post_task(self, job_wrapper): """Sumbit job to GoDocker and return jobid Create Job model schema of GoDocker and call the http_post_request method. """ # Get the params from <destination> tag in job_conf by using job_destination.params[param] if self.auth.token: job_destination = job_wrapper.job_destination try: docker_cpu = int(job_destination.params["docker_cpu"]) except Exception: docker_cpu = 1 try: docker_ram = int(job_destination.params["docker_memory"]) except Exception: docker_ram = 1 try: docker_image = self._find_container(job_wrapper).container_id log.debug("GoDocker runner using container %s.", docker_image) except Exception: log.error(f"Unable to find docker_image for job {job_wrapper.job_id}, failing.") return False volumes = [] labels = [] tags_tab = ["galaxy", job_wrapper.tool.id] tasks_depends = [] name = job_wrapper.tool.name description = "galaxy job" array = None project = None try: project = str(self.runner_params["godocker_project"]) except KeyError: log.debug("godocker_project not defined, using default.") try: volume = job_destination.params["godocker_volumes"] volume = volume.split(",") for i in volume: temp = {"name": i} volumes.append(temp) except Exception: log.debug("godocker_volume not set, using default.") dt = datetime.now() # Enable galaxy venv in the docker containers try: if job_destination.params["virtualenv"] == "true": GALAXY_VENV_TEMPLATE = """GALAXY_VIRTUAL_ENV="%s"; if [ "$GALAXY_VIRTUAL_ENV" != "None" -a -z "$VIRTUAL_ENV" -a -f "$GALAXY_VIRTUAL_ENV/bin/activate" ]; then . "$GALAXY_VIRTUAL_ENV/bin/activate"; fi;""" venv = GALAXY_VENV_TEMPLATE % job_wrapper.galaxy_virtual_env command = ( f"#!/bin/bash\ncd {job_wrapper.working_directory}\n{venv}\n{job_wrapper.runner_command_line}" ) else: command = f"#!/bin/bash\ncd {job_wrapper.working_directory}\n{job_wrapper.runner_command_line}" except Exception: command = f"#!/bin/bash\ncd {job_wrapper.working_directory}\n{job_wrapper.runner_command_line}" # GoDocker Job model schema job = { "date": time.mktime(dt.timetuple()), "meta": {"name": name, "description": description, "tags": tags_tab}, "requirements": { "cpu": docker_cpu, "ram": docker_ram, "array": {"values": array}, "label": labels, "tasks": tasks_depends, "tmpstorage": None, }, "container": { "image": str(docker_image), "volumes": volumes, "network": True, "id": None, "meta": None, "stats": None, "ports": [], "root": False, }, "command": { "interactive": False, "cmd": command, }, "status": {"primary": None, "secondary": None}, } if project is not None: job["user"] = {"project": project} result = self.auth.http_post_request( "/api/1.0/task", json.dumps(job), { "Authorization": f"Bearer {self.auth.token}", "Content-type": "application/json", "Accept": "application/json", }, ) # Return job_id return str(result.json()["id"])
[docs] def get_task(self, job_id): """Get job details from GoDocker and return the job. Pass job_id to the http_get_request method. """ job = False if self.auth.token: result = self.auth.http_get_request( f"/api/1.0/task/{str(job_id)}", {"Authorization": f"Bearer {self.auth.token}"} ) job = result.json() # Return the job return job
[docs] def task_suspend(self, job_id): """Suspend actively running job in galaxy. Pass job_id to the http_get_request method. """ job = False if self.auth.token: result = self.auth.http_get_request( f"/api/1.0/task/{str(job_id)}/suspend", {"Authorization": f"Bearer {self.auth.token}"} ) job = result.json() # Return the job return job
[docs] def get_task_status(self, job_id): """Get job status from GoDocker and return the status of job. Pass job_id to http_get_request method. """ job = False if self.auth.token: result = self.auth.http_get_request( f"/api/1.0/task/{str(job_id)}/status", {"Authorization": f"Bearer {self.auth.token}"} ) job = result.json() # Return task status return job
[docs] def delete_task(self, job_id): """Delete a suspended task in GoDocker. Pass job_id to http_delete_request method. """ job = False if self.auth.token: result = self.auth.http_delete_request( f"/api/1.0/task/{str(job_id)}", {"Authorization": f"Bearer {self.auth.token}"} ) job = result.json() # Return the job return job