"""GoDocker job runner for Galaxy (module ``galaxy.jobs.runners.godocker``)."""

import json
import logging
import time
from datetime import datetime

from galaxy import model
from galaxy.jobs.runners import (
    AsynchronousJobRunner,
    AsynchronousJobState,
)
from galaxy.util import (
    DEFAULT_SOCKET_TIMEOUT,
    requests,
    unicodify,
)

# Module-level logger shared by the GoDocker client and the job runner below.
log = logging.getLogger(__name__)

# Names exported by ``from galaxy.jobs.runners.godocker import *``.
__all__ = ("GodockerJobRunner",)


class Godocker:
    """
    Minimal client for the GoDocker REST API.

    Holds the server base URL, the credentials used to authenticate and the
    bearer token obtained after login. Every ``http_*_request`` helper returns
    the ``requests`` response when the server answers HTTP 200, and ``False``
    on a transport failure or any other status code.
    """

    def __init__(self, server, login, apikey, noCert):
        """
        :param server: base URL of the GoDocker server; API paths are appended to it.
        :param login: GoDocker user name.
        :param apikey: GoDocker API key belonging to ``login``.
        :param noCert: when true, TLS certificates are not verified.
        """
        self.token = None
        self.server = server
        self.login = login
        self.apikey = apikey
        self.noCert = noCert

    def setToken(self, token):
        """Store the bearer token returned by the authentication call."""
        self.token = token

    def _send(self, request_func, query, error_message, **kwargs):
        """Issue one HTTP request against ``self.server + query``.

        Shared implementation of the ``http_*_request`` helpers.

        :param request_func: one of ``requests.get``/``post``/``put``/``delete``.
        :param query: API path appended to the server URL.
        :param error_message: message logged (followed by the exception) on transport failure.
        :param kwargs: extra keyword arguments (``data``, ``headers``) forwarded to ``request_func``.
        :returns: the response on HTTP 200, otherwise ``False``.
        """
        verify_ssl = not self.noCert
        if self.noCert:
            # Certificate checks are deliberately disabled (--no-certificate):
            # silence urllib3's insecure-request warnings only in that case.
            requests.packages.urllib3.disable_warnings()
        url = self.server + query
        try:
            res = request_func(url, verify=verify_ssl, timeout=DEFAULT_SOCKET_TIMEOUT, **kwargs)
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError,
            requests.exceptions.Timeout,
        ) as e:
            # Use a real placeholder: passing ``e`` without one breaks the log record formatting.
            log.error("%s %s", error_message, e)
            return False
        return self.test_status_code(res)

    def http_post_request(self, query, data, header):
        """POST ``data`` to ``query`` with ``header``; return the response on 200, else ``False``."""
        return self._send(
            requests.post,
            query,
            "A transport error occurred in the GoDocker job runner:",
            data=data,
            headers=header,
        )

    def http_get_request(self, query, header):
        """GET ``query`` with ``header``; return the response on 200, else ``False``."""
        return self._send(
            requests.get,
            query,
            "A communication error occurred in the GoDocker job runner:",
            headers=header,
        )

    def http_delete_request(self, query, header):
        """DELETE ``query`` with ``header``; return the response on 200, else ``False``."""
        return self._send(
            requests.delete,
            query,
            "A communication error occurred in the GoDocker job runner:",
            headers=header,
        )

    def http_put_request(self, query, data, header):
        """PUT ``data`` to ``query`` with ``header``; return the response on 200, else ``False``."""
        return self._send(
            requests.put,
            query,
            "A communication error occurred in the GoDocker job runner:",
            data=data,
            headers=header,
        )

    def test_status_code(self, httpresult):
        """Return ``httpresult`` if its status code is 200, otherwise ``False``.

        Status codes 401, 403 and 404 are additionally explained at debug level.
        """
        status = httpresult.status_code
        if status == 200:
            return httpresult
        if status == 401:
            log.debug(
                "Unauthorized : this server could not verify that you are authorized to access the document you requested."
            )
        elif status == 403:
            log.debug("Forbidden : Access was denied to this resource. Not authorized to access this resource.")
        elif status == 404:
            log.debug("Not Found : The resource could not be found.")
        return False


class GodockerJobRunner(AsynchronousJobRunner):
    """
    Galaxy job runner that submits jobs to a GoDocker server and polls their status.

    Submission happens in ``queue_job``; submitted jobs are placed on the monitor
    queue and their GoDocker status is checked by ``check_watched_item``.
    """

    runner_name = "GodockerJobRunner"

    # Shell snippet that activates Galaxy's virtualenv inside the container when it exists.
    # It must stay on one line: a newline right after "." would run the bare "." builtin
    # (an error in bash) and then *execute*, rather than source, the activate script.
    _GALAXY_VENV_TEMPLATE = (
        'GALAXY_VIRTUAL_ENV="%s"; '
        'if [ "$GALAXY_VIRTUAL_ENV" != "None" -a -z "$VIRTUAL_ENV" -a -f "$GALAXY_VIRTUAL_ENV/bin/activate" ]; '
        'then . "$GALAXY_VIRTUAL_ENV/bin/activate"; fi;'
    )

    def __init__(self, app, nworkers, **kwargs):
        """Configure the runner and authenticate against GoDocker.

        1: Get runner_param_specs from the job config
        2: Initialise job runner parent object
        3: Login to godocker and store the client (with its token) in ``self.auth``

        NOTE(review): no threads are started here; presumably the parent class
        starts the worker and monitor threads — confirm.

        :raises Exception: if GoDocker authentication fails (raised by ``login``).
        """
        runner_param_specs = dict(
            godocker_master=dict(map=str),
            user=dict(map=str),
            key=dict(map=str),
            godocker_project=dict(map=str),
        )
        kwargs.setdefault("runner_param_specs", {}).update(runner_param_specs)

        # Start the job runner parent object
        super().__init__(app, nworkers, **kwargs)

        # godocker API login call
        self.auth = self.login(
            self.runner_params["key"], self.runner_params["user"], self.runner_params["godocker_master"]
        )

    def queue_job(self, job_wrapper):
        """Create the job script, submit it to GoDocker and put it on the monitor queue.

        If submission fails the job is failed with "Not submitted".
        """
        if not self.prepare_job(
            job_wrapper, include_metadata=False, include_work_dir_outputs=True, modify_command_for_container=False
        ):
            return

        job_destination = job_wrapper.job_destination
        # Submit job to godocker
        job_id = self.post_task(job_wrapper)
        if not job_id:
            log.error("Job creation failure. No Response from GoDocker")
            job_wrapper.fail("Not submitted")
            return

        log.debug(f"Starting queue_job for job {job_id}")
        # NOTE(review): the GoDocker task id is not recorded as the job's external id here,
        # while recover()/stop_job() use job_wrapper.job_id — confirm the ids line up.
        ajs = AsynchronousJobState(
            files_dir=job_wrapper.working_directory,
            job_wrapper=job_wrapper,
            job_id=job_id,
            job_destination=job_destination,
        )
        self.monitor_queue.put(ajs)

    def check_watched_item(self, job_state):
        """Fetch the GoDocker status of a watched job and update it in Galaxy.

        Called by ``check_watched_items()`` with an ``AsynchronousJobState``.

        - task "over" (or Galaxy job STOPPED): copy the logs, then ``mark_as_finished``
          (``mark_as_failed`` if the logs could not be copied); return ``None``.
        - task "running": mark the Galaxy job RUNNING; return ``job_state``.
        - task "pending": return ``job_state`` unchanged.
        - anything else: copy the logs and ``mark_as_failed``; return ``None``.
        - status unavailable (request failed / no token): return ``job_state`` to retry later.

        :returns: ``None`` once the job is no longer watched, otherwise ``job_state``.
        """
        job_wrapper = job_state.job_wrapper
        job_persisted_state = job_wrapper.get_state()
        job_status_god = self.get_task(job_state.job_id)
        if not job_status_god:
            # Indexing a failed (False) reply would raise and abort the whole monitor pass;
            # keep watching this job and try again on the next pass instead.
            log.warning("Could not retrieve the status of GoDocker task %s, will retry.", job_state.job_id)
            return job_state

        status = job_status_god["status"]
        primary = status["primary"]
        log.debug(f"Job ID: {str(job_state.job_id)} Job Status: {str(primary)}")

        if primary == "over" or job_persisted_state == model.Job.states.STOPPED:
            job_state.running = False
            job_wrapper.change_state(model.Job.states.OK)
            # mark_as_finished() / mark_as_failed() put (finish_job|fail_job, job_state) on
            # self.work_queue; a worker thread then runs it (finishing the job wrapper,
            # reclaiming ownership and cleaning up job_state).
            if self.create_log_file(job_state, job_status_god):
                self.mark_as_finished(job_state)
            else:
                self.mark_as_failed(job_state)
            return None

        if primary == "running":
            job_state.running = True
            job_wrapper.change_state(model.Job.states.RUNNING)
            return job_state

        if primary == "pending":
            return job_state

        # Any other status is a failure (STOPPED jobs were handled above).
        job_state.running = False
        if status["exitcode"] not in (None, 0):
            job_wrapper.change_state(model.Job.states.ERROR)
        self.create_log_file(job_state, job_status_god)
        self.mark_as_failed(job_state)
        return None

    def stop_job(self, job_wrapper):
        """Attempt to delete a dispatched, executing job in GoDocker.

        Called by ``fail_job()``; no return value is expected.
        """
        job_id = job_wrapper.job_id
        # NOTE(review): job_wrapper.job_id is Galaxy's job id, whereas queue_job() watches the
        # id returned by GoDocker — confirm GoDocker accepts this id.
        log.debug(f"STOP JOB EXECUTION OF JOB ID: {str(job_id)}")
        # Get task status from GoDocker.
        job_status_god = self.get_task_status(job_id)
        if not job_status_god:
            log.warning("Could not retrieve the status of GoDocker task %s, attempting deletion.", job_id)
        if not job_status_god or job_status_god["status"]["primary"] != "over":
            # Best effort: delete the task unless GoDocker reports it as already over.
            self.delete_task(job_id)
        return None

    def recover(self, job, job_wrapper):
        """Recover jobs stuck in the queued/running state when Galaxy started.

        Called at Galaxy startup. Jobs in RUNNING/STOPPED or QUEUED state are wrapped in an
        ``AsynchronousJobState`` and put back on the monitor queue.
        """
        # NOTE(review): this is Galaxy's job id; queue_job() watches the GoDocker task id — confirm.
        job_id = job_wrapper.job_id
        ajs = AsynchronousJobState(
            files_dir=job_wrapper.working_directory,
            job_wrapper=job_wrapper,
            job_id=job_id,
            job_destination=job_wrapper.job_destination,
        )
        job_wrapper.command_line = job.command_line
        if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
            log.debug(
                f"({job.id}/{job.get_job_runner_external_id()}) is still in {job.state} state, adding to the god queue"
            )
            ajs.old_state = "R"
            ajs.running = True
            self.monitor_queue.put(ajs)
        elif job.state == model.Job.states.QUEUED:
            log.debug(
                f"({job.id}/{job.get_job_runner_external_id()}) is still in god queued state, adding to the god queue"
            )
            ajs.old_state = "Q"
            ajs.running = False
            self.monitor_queue.put(ajs)

    # Helper functions

    def create_log_file(self, job_state, job_status_god):
        """Copy GoDocker's logs into Galaxy's output, error and exit-code files.

        ``god.log`` and ``god.err`` are read from the path of the container volume
        named "go-docker"; the exit code comes from the task status.

        :returns: ``False`` if any file could not be read or written, otherwise ``True``
            (also ``True`` when no "go-docker" volume exists and nothing is copied).
        """
        path = None
        for vol in job_status_god["container"]["volumes"]:
            if vol["name"] == "go-docker":
                path = str(vol["path"])
        if not path:
            log.warning("No go-docker volume found for GoDocker task, no log files were created.")
            return True

        god_output_file = f"{path}/god.log"
        god_error_file = f"{path}/god.err"
        try:
            # Read from the GoDocker files and write them into the Galaxy files.
            # The source is fully read before the destination is opened (and truncated).
            for source, destination in (
                (god_output_file, job_state.output_file),
                (god_error_file, job_state.error_file),
            ):
                with open(source) as src:
                    content = src.read()
                with open(destination, "w") as dst:
                    dst.write(content)
            # Write GoDocker's exit code into Galaxy's exit_code_file.
            with open(job_state.exit_code_file, "w") as exit_code_file:
                exit_code_file.write(str(job_status_god["status"]["exitcode"]))
            log.debug(f"CREATE OUTPUT FILE: {job_state.output_file}")
            log.debug(f"CREATE ERROR FILE: {job_state.error_file}")
            log.debug(f"CREATE EXIT CODE FILE: {job_state.exit_code_file}")
        except OSError as e:
            log.error("Could not access task log file: %s", unicodify(e))
            log.debug("IO Error occurred when accessing the files.")
            return False
        return True

    # GoDocker API helper functions

    def login(self, apikey, login, server, noCert=False):
        """Authenticate against GoDocker and return a ``Godocker`` client holding the token.

        :raises Exception: if the authentication request fails.
        """
        log.debug("LOGIN TASK TO BE EXECUTED \n")
        log.debug(f"GODOCKER LOGIN: {str(login)}")
        data = json.dumps({"user": login, "apikey": apikey})
        g_auth = Godocker(server, login, apikey, noCert)
        auth = g_auth.http_post_request(
            "/api/1.0/authenticate", data, {"Content-type": "application/json", "Accept": "application/json"}
        )
        if not auth:
            raise Exception("Authentication failure, GoDocker runner cannot be started")
        log.debug("GoDocker authentication successful.")
        g_auth.setToken(auth.json()["token"])
        return g_auth

    @staticmethod
    def _int_param(params, key, default):
        """Return ``int(params[key])``, or ``default`` when missing or not an integer."""
        try:
            return int(params[key])
        except (KeyError, TypeError, ValueError):
            return default

    def post_task(self, job_wrapper):
        """Submit a job to GoDocker and return its GoDocker task id.

        Builds the GoDocker Job model from the job wrapper and the ``<destination>``
        params (``docker_cpu``, ``docker_memory``, ``godocker_volumes``, ``virtualenv``)
        and POSTs it with ``http_post_request``.

        :returns: the task id as a string; ``False`` if no container image was found or
            the submission failed; ``None`` if the client holds no token.
        """
        if not self.auth.token:
            return None

        # Get the params from <destination> tag in job_conf by using job_destination.params[param]
        job_destination = job_wrapper.job_destination
        params = job_destination.params
        docker_cpu = self._int_param(params, "docker_cpu", 1)
        docker_ram = self._int_param(params, "docker_memory", 1)
        try:
            docker_image = self._find_container(job_wrapper).container_id
            log.debug("GoDocker runner using container %s.", docker_image)
        except Exception:
            log.error(f"Unable to find docker_image for job {job_wrapper.job_id}, failing.")
            return False

        labels = []
        tags_tab = ["galaxy", job_wrapper.tool.id]
        tasks_depends = []
        name = job_wrapper.tool.name
        description = "galaxy job"
        array = None

        project = None
        try:
            project = str(self.runner_params["godocker_project"])
        except KeyError:
            log.debug("godocker_project not defined, using default.")

        volumes = []
        try:
            volumes = [{"name": volume} for volume in params["godocker_volumes"].split(",")]
        except (KeyError, AttributeError):
            log.debug("godocker_volume not set, using default.")

        # Enable galaxy venv in the docker containers
        command_lines = ["#!/bin/bash", f"cd {job_wrapper.working_directory}"]
        if params.get("virtualenv") == "true":
            # A missing attribute renders as "None", which the shell guard skips.
            venv = self._GALAXY_VENV_TEMPLATE % getattr(job_wrapper, "galaxy_virtual_env", None)
            command_lines.append(venv)
        command_lines.append(job_wrapper.runner_command_line)
        command = "\n".join(command_lines)

        # GoDocker Job model schema
        job = {
            "date": time.mktime(datetime.now().timetuple()),
            "meta": {"name": name, "description": description, "tags": tags_tab},
            "requirements": {
                "cpu": docker_cpu,
                "ram": docker_ram,
                "array": {"values": array},
                "label": labels,
                "tasks": tasks_depends,
                "tmpstorage": None,
            },
            "container": {
                "image": str(docker_image),
                "volumes": volumes,
                "network": True,
                "id": None,
                "meta": None,
                "stats": None,
                "ports": [],
                "root": False,
            },
            "command": {
                "interactive": False,
                "cmd": command,
            },
            "status": {"primary": None, "secondary": None},
        }
        if project is not None:
            job["user"] = {"project": project}

        result = self.auth.http_post_request(
            "/api/1.0/task",
            json.dumps(job),
            {
                "Authorization": f"Bearer {self.auth.token}",
                "Content-type": "application/json",
                "Accept": "application/json",
            },
        )
        if not result:
            # Calling .json() on the False failure marker would raise AttributeError.
            log.error("GoDocker task submission failed for job %s.", job_wrapper.job_id)
            return False
        return str(result.json()["id"])

    def _authorized_json(self, send, query):
        """Call a Godocker HTTP helper with the bearer token and decode the JSON reply.

        :param send: ``self.auth.http_get_request`` or ``self.auth.http_delete_request``.
        :param query: API path, e.g. ``/api/1.0/task/<id>``.
        :returns: the decoded JSON body, or ``False`` when there is no token or the
            request failed (transport error or non-200 status).
        """
        if not self.auth.token:
            return False
        result = send(query, {"Authorization": f"Bearer {self.auth.token}"})
        if not result:
            return False
        return result.json()

    def get_task(self, job_id):
        """Return the GoDocker task details for ``job_id``, or ``False`` on failure."""
        return self._authorized_json(self.auth.http_get_request, f"/api/1.0/task/{str(job_id)}")

    def task_suspend(self, job_id):
        """Ask GoDocker to suspend task ``job_id``; return the JSON reply, or ``False`` on failure."""
        return self._authorized_json(self.auth.http_get_request, f"/api/1.0/task/{str(job_id)}/suspend")

    def get_task_status(self, job_id):
        """Return the GoDocker status of task ``job_id``, or ``False`` on failure."""
        return self._authorized_json(self.auth.http_get_request, f"/api/1.0/task/{str(job_id)}/status")

    def delete_task(self, job_id):
        """Delete task ``job_id`` in GoDocker; return the JSON reply, or ``False`` on failure."""
        return self._authorized_json(self.auth.http_delete_request, f"/api/1.0/task/{str(job_id)}")