Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.pulsar

"""Job runner used to execute Galaxy jobs through Pulsar.

More information on Pulsar can be found at https://pulsar.readthedocs.io/ .
"""
from __future__ import absolute_import  # Need to import pulsar_client absolutely.

import errno
import logging
import os
import subprocess
from time import sleep

import packaging.version
import pulsar.core
import yaml
from pulsar.client import (
    build_client_manager,
    ClientJobDescription,
    ClientOutputs,
    finish_job as pulsar_finish_job,
    PathMapper,
    PulsarClientTransportError,
    PulsarOutputs,
    submit_job as pulsar_submit_job,
    url_to_destination_params
)

from galaxy import model
from galaxy.jobs import (
    ComputeEnvironment,
    JobDestination
)
from galaxy.jobs.command_factory import build_command
from galaxy.jobs.runners import (
    AsynchronousJobRunner,
    AsynchronousJobState
)
from galaxy.tools.deps import dependencies
from galaxy.util import (
    galaxy_directory,
    specs,
    string_as_bool_or_none
)
from galaxy.util.bunch import Bunch

log = logging.getLogger(__name__)

__all__ = (
    'PulsarLegacyJobRunner',
    'PulsarRESTJobRunner',
    'PulsarMQJobRunner',
    'PulsarEmbeddedJobRunner',
)

MINIMUM_PULSAR_VERSIONS = {
    '_default_': packaging.version.parse("0.7.0.dev3"),
    'remote_metadata': packaging.version.parse("0.8.0"),
}

NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client configured to set metadata remotely, but remote Pulsar isn't properly configured with a galaxy_home directory."
NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes configuration when setting metadata externally, but Pulsar is not configured with this information. Defaulting to datatypes_conf.xml."
GENERIC_REMOTE_ERROR = "Failed to communicate with remote job server."
FAILED_REMOTE_ERROR = "Remote job server indicated a problem running or monitoring this job."
LOST_REMOTE_ERROR = "Remote job server could not determine this job's state."

UPGRADE_PULSAR_ERROR = "Galaxy is misconfigured, please contact administrator. The target Pulsar server is unsupported, this version of Galaxy requires Pulsar version %s or newer."

# Is there a good way to infer some default for this? Can only use
# url_for from web threads. https://gist.github.com/jmchilton/9098762
DEFAULT_GALAXY_URL = "http://localhost:8080"

PULSAR_PARAM_SPECS = dict(
    transport=dict(
        map=specs.to_str_or_none,
        valid=specs.is_in("urllib", "curl", None),
        default=None
    ),
    transport_timeout=dict(
        map=lambda val: None if val == "None" else int(val),
        default=None,
    ),
    cache=dict(
        map=specs.to_bool_or_none,
        default=None,
    ),
    amqp_url=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    galaxy_url=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    pulsar_config=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    manager=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    persistence_directory=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    amqp_acknowledge=dict(
        map=specs.to_bool_or_none,
        default=None
    ),
    amqp_ack_republish_time=dict(
        map=lambda val: None if val == "None" else int(val),
        default=None,
    ),
    amqp_consumer_timeout=dict(
        map=lambda val: None if val == "None" else float(val),
        default=None,
    ),
    amqp_connect_ssl_ca_certs=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    amqp_connect_ssl_keyfile=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    amqp_connect_ssl_certfile=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    amqp_connect_ssl_cert_reqs=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    # https://kombu.readthedocs.io/en/latest/reference/kombu.html#kombu.Producer.publish
    amqp_publish_retry=dict(
        map=specs.to_bool,
        default=False,
    ),
    amqp_publish_priority=dict(
        map=int,
        valid=lambda x: 0 <= x and x <= 9,
        default=0,
    ),
    # https://kombu.readthedocs.io/en/latest/reference/kombu.html#kombu.Exchange.delivery_mode
    amqp_publish_delivery_mode=dict(
        map=str,
        valid=specs.is_in("transient", "persistent"),
        default="persistent",
    ),
    amqp_publish_retry_max_retries=dict(
        map=int,
        default=None,
    ),
    amqp_publish_retry_interval_start=dict(
        map=int,
        default=None,
    ),
    amqp_publish_retry_interval_step=dict(
        map=int,
        default=None,
    ),
    amqp_publish_retry_interval_max=dict(
        map=int,
        default=None,
    ),
)


PARAMETER_SPECIFICATION_REQUIRED = object()
PARAMETER_SPECIFICATION_IGNORED = object()


class PulsarJobRunner(AsynchronousJobRunner):
    """Base class for pulsar job runners."""

    runner_name = "PulsarJobRunner"
    default_build_pulsar_app = False

    def __init__(self, app, nworkers, **kwds):
        """Start the job runner."""
        super(PulsarJobRunner, self).__init__(app, nworkers, runner_param_specs=PULSAR_PARAM_SPECS, **kwds)
        self._init_worker_threads()
        galaxy_url = self.runner_params.galaxy_url
        if not galaxy_url:
            galaxy_url = app.config.galaxy_infrastructure_url
        if galaxy_url:
            galaxy_url = galaxy_url.rstrip("/")
        self.galaxy_url = galaxy_url
        self.__init_client_manager()
        self._monitor()

    def _monitor(self):
        # Extension point allow MQ variant to setup callback instead
        self._init_monitor_thread()

    def __init_client_manager(self):
        pulsar_conf = self.runner_params.get('pulsar_config', None)
        self.__init_pulsar_app(pulsar_conf)

        client_manager_kwargs = {}
        for kwd in 'manager', 'cache', 'transport', 'persistence_directory':
            client_manager_kwargs[kwd] = self.runner_params[kwd]
        if self.pulsar_app is not None:
            client_manager_kwargs["pulsar_app"] = self.pulsar_app
            # TODO: Hack remove this following line pulsar lib update
            # that includes https://github.com/galaxyproject/pulsar/commit/ce0636a5b64fae52d165bcad77b2caa3f0e9c232
            client_manager_kwargs["file_cache"] = None

        for kwd in self.runner_params.keys():
            if kwd.startswith('amqp_') or kwd.startswith('transport_'):
                client_manager_kwargs[kwd] = self.runner_params[kwd]
        self.client_manager = build_client_manager(**client_manager_kwargs)

    def __init_pulsar_app(self, pulsar_conf_path):
        if pulsar_conf_path is None and not self.default_build_pulsar_app:
            self.pulsar_app = None
            return
        conf = {}
        if pulsar_conf_path is None:
            log.info("Creating a Pulsar app with default configuration (no pulsar_conf specified).")
        else:
            log.info("Loading Pulsar app configuration from %s" % pulsar_conf_path)
            with open(pulsar_conf_path, "r") as f:
                conf.update(yaml.safe_load(f) or {})
        if "job_metrics_config_file" not in conf:
            conf["job_metrics"] = self.app.job_metrics
        if "staging_directory" not in conf:
            conf["staging_directory"] = "database/pulsar_staging"
        if "persistence_directory" not in conf:
            conf["persistence_directory"] = "database/pulsar_persisted_data"
        if "galaxy_home" not in conf:
            conf["galaxy_home"] = galaxy_directory()
        self.pulsar_app = pulsar.core.PulsarApp(**conf)

    def url_to_destination(self, url):
        """Convert a legacy URL to a job destination."""
        return JobDestination(runner="pulsar", params=url_to_destination_params(url))

    def check_watched_item(self, job_state):
        try:
            client = self.get_client_from_state(job_state)
            status = client.get_status()
        except PulsarClientTransportError as exc:
            log.error("Communication error with Pulsar server on state check, will retry: %s", exc)
            return job_state
        except Exception:
            # An orphaned job was put into the queue at app startup, so remote server went down
            # either way we are done I guess.
            self.mark_as_finished(job_state)
            return None
        job_state = self._update_job_state_for_status(job_state, status)
        return job_state

    def _update_job_state_for_status(self, job_state, pulsar_status, full_status=None):
        if pulsar_status == "complete":
            self.mark_as_finished(job_state)
            return None
        if pulsar_status in ["failed", "lost"]:
            if pulsar_status == "failed":
                message = FAILED_REMOTE_ERROR
            else:
                message = LOST_REMOTE_ERROR
            if not job_state.job_wrapper.get_job().finished:
                self.fail_job(job_state, message, full_status=full_status)
            return None
        if pulsar_status == "running" and not job_state.running:
            job_state.running = True
            job_state.job_wrapper.change_state(model.Job.states.RUNNING)
        return job_state

    def queue_job(self, job_wrapper):
        job_destination = job_wrapper.job_destination
        self._populate_parameter_defaults(job_destination)

        command_line, client, remote_job_config, compute_environment = self.__prepare_job(job_wrapper, job_destination)

        if not command_line:
            return

        try:
            dependencies_description = PulsarJobRunner.__dependencies_description(client, job_wrapper)
            rewrite_paths = not PulsarJobRunner.__rewrite_parameters(client)
            unstructured_path_rewrites = {}
            output_names = []
            if compute_environment:
                unstructured_path_rewrites = compute_environment.unstructured_path_rewrites
                output_names = compute_environment.output_names()

            if self.app.config.metadata_strategy == "legacy":
                # Drop this branch in 19.09.
                metadata_directory = job_wrapper.working_directory
            else:
                metadata_directory = os.path.join(job_wrapper.working_directory, "metadata")

            config_files = job_wrapper.extra_filenames
            tool_script = os.path.join(job_wrapper.working_directory, "tool_script.sh")
            if os.path.exists(tool_script):
                log.debug("Registering tool_script for Pulsar transfer [%s]" % tool_script)
                config_files.append(tool_script)
            client_job_description = ClientJobDescription(
                command_line=command_line,
                input_files=self.get_input_files(job_wrapper),
                client_outputs=self.__client_outputs(client, job_wrapper),
                working_directory=job_wrapper.tool_working_directory,
                metadata_directory=metadata_directory,
                tool=job_wrapper.tool,
                config_files=config_files,
                dependencies_description=dependencies_description,
                env=client.env,
                rewrite_paths=rewrite_paths,
                arbitrary_files=unstructured_path_rewrites,
                touch_outputs=output_names,
            )
            job_id = pulsar_submit_job(client, client_job_description, remote_job_config)
            log.info("Pulsar job submitted with job_id %s" % job_id)
            job_wrapper.set_job_destination(job_destination, job_id)
            job_wrapper.change_state(model.Job.states.QUEUED)
        except Exception:
            job_wrapper.fail("failure running job", exception=True)
            log.exception("failure running job %d", job_wrapper.job_id)
            return

        pulsar_job_state = AsynchronousJobState()
        pulsar_job_state.job_wrapper = job_wrapper
        pulsar_job_state.job_id = job_id
        pulsar_job_state.old_state = True
        pulsar_job_state.running = False
        pulsar_job_state.job_destination = job_destination
        self.monitor_job(pulsar_job_state)

    def __needed_features(self, client):
        return {
            'remote_metadata': PulsarJobRunner.__remote_metadata(client),
        }

    def __prepare_job(self, job_wrapper, job_destination):
        """Build command-line and Pulsar client for this job."""
        command_line = None
        client = None
        remote_job_config = None
        compute_environment = None

        fail_or_resubmit = False
        try:
            client = self.get_client_from_wrapper(job_wrapper)
            tool = job_wrapper.tool
            remote_job_config = client.setup(tool.id, tool.version, tool.requires_galaxy_python_environment)
            needed_features = self.__needed_features(client)
            PulsarJobRunner.check_job_config(remote_job_config, check_features=needed_features)
            rewrite_parameters = PulsarJobRunner.__rewrite_parameters(client)
            prepare_kwds = {}
            if rewrite_parameters:
                compute_environment = PulsarComputeEnvironment(client, job_wrapper, remote_job_config)
                prepare_kwds['compute_environment'] = compute_environment
            job_wrapper.prepare(**prepare_kwds)
            self.__prepare_input_files_locally(job_wrapper)
            remote_metadata = PulsarJobRunner.__remote_metadata(client)
            dependency_resolution = PulsarJobRunner.__dependency_resolution(client)
            metadata_kwds = self.__build_metadata_configuration(client, job_wrapper, remote_metadata, remote_job_config)
            remote_working_directory = remote_job_config['working_directory']
            remote_job_directory = os.path.abspath(os.path.join(remote_working_directory, os.path.pardir))
            remote_tool_directory = os.path.abspath(os.path.join(remote_job_directory, "tool_files"))
            # This should be remote_job_directory ideally, this patch using configs is a workaround for
            # older Pulsar versions that didn't support writing stuff to the job directory natively.
            script_directory = os.path.join(remote_job_directory, "configs")
            remote_command_params = dict(
                working_directory=remote_job_config['metadata_directory'],
                script_directory=script_directory,
                metadata_kwds=metadata_kwds,
                dependency_resolution=dependency_resolution,
            )
            # TODO: Following directories work for Pulsar, always worked for Pulsar - but should be
            # calculated at some other level.
            container = self._find_container(
                job_wrapper,
                compute_working_directory=remote_working_directory,
                compute_tool_directory=remote_tool_directory,
                compute_job_directory=remote_job_directory,
            )
            job_wrapper.disable_commands_in_new_shell()

            # Pulsar handles ``create_tool_working_directory`` and
            # ``include_work_dir_outputs`` details.
            command_line = build_command(
                self,
                job_wrapper=job_wrapper,
                container=container,
                include_metadata=remote_metadata,
                create_tool_working_directory=False,
                include_work_dir_outputs=False,
                remote_command_params=remote_command_params,
                remote_job_directory=remote_job_directory,
            )
        except UnsupportedPulsarException:
            log.exception("failure running job %d, unsupported Pulsar target", job_wrapper.job_id)
            fail_or_resubmit = True
        except PulsarClientTransportError:
            log.exception("failure running job %d, Pulsar connection failed", job_wrapper.job_id)
            fail_or_resubmit = True
        except Exception:
            log.exception("failure running job %d", job_wrapper.job_id)
            fail_or_resubmit = True

        # If we were unable to get a command line, there was problem
        fail_or_resubmit = fail_or_resubmit or not command_line
        if fail_or_resubmit:
            job_state = self._job_state(job_wrapper.get_job(), job_wrapper)
            self.work_queue.put((self.fail_job, job_state))

        return command_line, client, remote_job_config, compute_environment

    def __prepare_input_files_locally(self, job_wrapper):
        """Run task splitting commands locally."""
        prepare_input_files_cmds = getattr(job_wrapper, 'prepare_input_files_cmds', None)
        if prepare_input_files_cmds is not None:
            for cmd in prepare_input_files_cmds:  # run the commands to stage the input files
                subprocess.check_call(cmd, shell=True)
            job_wrapper.prepare_input_files_cmds = None  # prevent them from being used in-line

    def _populate_parameter_defaults(self, job_destination):
        updated = False
        params = job_destination.params
        for key, value in self.destination_defaults.items():
            if key in params:
                if value is PARAMETER_SPECIFICATION_IGNORED:
                    log.warning("Pulsar runner in selected configuration ignores parameter %s" % key)
                continue
            # if self.runner_params.get( key, None ):
            #    # Let plugin define defaults for some parameters -
            #    # for instance that way jobs_directory can be
            #    # configured next to AMQP url (where it belongs).
            #    params[ key ] = self.runner_params[ key ]
            #    continue

            if not value:
                continue

            if value is PARAMETER_SPECIFICATION_REQUIRED:
                raise Exception("Pulsar destination does not define required parameter %s" % key)
            elif value is not PARAMETER_SPECIFICATION_IGNORED:
                params[key] = value
                updated = True
        return updated

    def get_output_files(self, job_wrapper):
        output_paths = job_wrapper.get_output_fnames()
        return [str(o) for o in output_paths]   # Force job_path from DatasetPath objects.

    def get_input_files(self, job_wrapper):
        input_paths = job_wrapper.get_input_paths()
        return [str(i) for i in input_paths]  # Force job_path from DatasetPath objects.

    def get_client_from_wrapper(self, job_wrapper):
        job_id = job_wrapper.job_id
        if hasattr(job_wrapper, 'task_id'):
            job_id = "%s_%s" % (job_id, job_wrapper.task_id)
        params = job_wrapper.job_destination.params.copy()
        user = job_wrapper.get_job().user
        if user:
            for key, value in params.items():
                if value:
                    params[key] = model.User.expand_user_properties(user, value)

        env = getattr(job_wrapper.job_destination, "env", [])
        return self.get_client(params, job_id, env)

    def get_client_from_state(self, job_state):
        job_destination_params = job_state.job_destination.params
        job_id = job_state.job_id
        return self.get_client(job_destination_params, job_id)

    def get_client(self, job_destination_params, job_id, env=[]):
        # Cannot use url_for outside of web thread.
        # files_endpoint = url_for( controller="job_files", job_id=encoded_job_id )

        encoded_job_id = self.app.security.encode_id(job_id)
        job_key = self.app.security.encode_id(job_id, kind="jobs_files")
        endpoint_base = "%s/api/jobs/%s/files?job_key=%s"
        if self.app.config.nginx_upload_job_files_path:
            endpoint_base = "%s" + \
                            self.app.config.nginx_upload_job_files_path + \
                            "?job_id=%s&job_key=%s"
        files_endpoint = endpoint_base % (
            self.galaxy_url,
            encoded_job_id,
            job_key
        )
        get_client_kwds = dict(
            job_id=str(job_id),
            files_endpoint=files_endpoint,
            env=env
        )
        return self.client_manager.get_client(job_destination_params, **get_client_kwds)

    def finish_job(self, job_state):
        stderr = stdout = ''
        job_wrapper = job_state.job_wrapper
        try:
            client = self.get_client_from_state(job_state)
            run_results = client.full_status()
            remote_metadata_directory = run_results.get("metadata_directory", None)
            stdout = run_results.get('stdout', '')
            stderr = run_results.get('stderr', '')
            exit_code = run_results.get('returncode', None)
            pulsar_outputs = PulsarOutputs.from_status_response(run_results)
            # Use Pulsar client code to transfer/copy files back
            # and cleanup job if needed.
            completed_normally = \
                job_wrapper.get_state() not in [model.Job.states.ERROR, model.Job.states.DELETED]
            cleanup_job = job_wrapper.cleanup_job
            client_outputs = self.__client_outputs(client, job_wrapper)
            finish_args = dict(client=client,
                               job_completed_normally=completed_normally,
                               cleanup_job=cleanup_job,
                               client_outputs=client_outputs,
                               pulsar_outputs=pulsar_outputs)
            failed = pulsar_finish_job(**finish_args)
            if failed:
                job_wrapper.fail("Failed to find or download one or more job outputs from remote server.", exception=True)
        except Exception:
            self.fail_job(job_state, message=GENERIC_REMOTE_ERROR, exception=True)
            log.exception("failure finishing job %d", job_wrapper.job_id)
            return
        if not PulsarJobRunner.__remote_metadata(client):
            self._handle_metadata_externally(job_wrapper, resolve_requirements=True)
        # Finish the job
        try:
            job_metrics_directory = os.path.join(job_wrapper.working_directory, "metadata")
            # Following check is a hack for jobs started during 19.01 or earlier release
            # and finishing with a 19.05 code base. Eliminate the hack in 19.09 or later
            # along with hacks for legacy metadata compute strategy.
            if not os.path.exists(job_metrics_directory) or not any(["__instrument" in f for f in os.listdir(job_metrics_directory)]):
                job_metrics_directory = job_wrapper.working_directory
            job_wrapper.finish(
                stdout,
                stderr,
                exit_code,
                remote_metadata_directory=remote_metadata_directory,
                job_metrics_directory=job_metrics_directory,
            )
        except Exception:
            log.exception("Job wrapper finish method failed")
            job_wrapper.fail("Unable to finish job", exception=True)

    def fail_job(self, job_state, message=GENERIC_REMOTE_ERROR, full_status=None, exception=False):
        """Seperated out so we can use the worker threads for it."""
        self.stop_job(job_state.job_wrapper)
        stdout = ""
        stderr = ""
        if full_status:
            stdout = full_status.get("stdout", "")
            stderr = full_status.get("stderr", "")
        self._handle_runner_state('failure', job_state)
        if not job_state.runner_state_handled:
            job_state.job_wrapper.fail(getattr(job_state, "fail_message", message),
                                       tool_stdout=stdout, tool_stderr=stderr, exception=exception)

    def check_pid(self, pid):
        try:
            os.kill(pid, 0)
            return True
        except OSError as e:
            if e.errno == errno.ESRCH:
                log.debug("check_pid(): PID %d is dead" % pid)
            else:
                log.warning("check_pid(): Got errno %s when attempting to check PID %d: %s" % (errno.errorcode[e.errno], pid, e.strerror))
            return False

    def stop_job(self, job_wrapper):
        job = job_wrapper.get_job()
        if not job.job_runner_external_id:
            return
        # if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
        client = self.get_client(job.destination_params, job.job_runner_external_id)
        job_ext_output_metadata = job.get_external_output_metadata()
        if not PulsarJobRunner.__remote_metadata(client) and job_ext_output_metadata:
            pid = job_ext_output_metadata[0].job_runner_external_pid  # every JobExternalOutputMetadata has a pid set, we just need to take from one of them
            if pid in [None, '']:
                log.warning("stop_job(): %s: no PID in database for job, unable to stop" % job.id)
                return
            pid = int(pid)
            if not self.check_pid(pid):
                log.warning("stop_job(): %s: PID %d was already dead or can't be signaled" % (job.id, pid))
                return
            for sig in [15, 9]:
                try:
                    os.killpg(pid, sig)
                except OSError as e:
                    log.warning("stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % (job.id, errno.errorcode[e.errno], sig, pid, e.strerror))
                    return  # give up
                sleep(2)
                if not self.check_pid(pid):
                    log.debug("stop_job(): %s: PID %d successfully killed with signal %d" % (job.id, pid, sig))
                    return
                else:
                    log.warning("stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" % (job.id, pid))
        else:
            # Remote kill
            pulsar_url = job.job_runner_name
            job_id = job.job_runner_external_id
            log.debug("Attempt remote Pulsar kill of job with url %s and id %s" % (pulsar_url, job_id))
            client = self.get_client(job.destination_params, job_id)
            client.kill()

    def recover(self, job, job_wrapper):
        """Recover jobs stuck in the queued/running state when Galaxy started."""
        job_state = self._job_state(job, job_wrapper)
        job_wrapper.command_line = job.get_command_line()
        state = job.get_state()
        if state in [model.Job.states.RUNNING, model.Job.states.QUEUED]:
            log.debug("(Pulsar/%s) is still in running state, adding to the Pulsar queue" % (job.id))
            job_state.old_state = True
            job_state.running = state == model.Job.states.RUNNING
            self.monitor_queue.put(job_state)

    def shutdown(self):
        super(PulsarJobRunner, self).shutdown()
        self.client_manager.shutdown()

    def _job_state(self, job, job_wrapper):
        job_state = AsynchronousJobState()
        # TODO: Determine why this is set when using normal message queue updates
        # but not CLI submitted MQ updates...
        raw_job_id = job.get_job_runner_external_id() or job_wrapper.job_id
        job_state.job_id = str(raw_job_id)
        job_state.runner_url = job_wrapper.get_job_runner_url()
        job_state.job_destination = job_wrapper.job_destination
        job_state.job_wrapper = job_wrapper
        return job_state

    def __client_outputs(self, client, job_wrapper):
        work_dir_outputs = self.get_work_dir_outputs(job_wrapper)
        output_files = self.get_output_files(job_wrapper)
        if self.app.config.metadata_strategy == "legacy":
            # Drop this branch in 19.09.
            metadata_directory = job_wrapper.working_directory
        else:
            metadata_directory = os.path.join(job_wrapper.working_directory, "metadata")
        client_outputs = ClientOutputs(
            working_directory=job_wrapper.tool_working_directory,
            metadata_directory=metadata_directory,
            work_dir_outputs=work_dir_outputs,
            output_files=output_files,
            version_file=job_wrapper.get_version_string_path(),
        )
        return client_outputs

    @staticmethod
    def check_job_config(remote_job_config, check_features=None):
        check_features = check_features or {}
        # 0.6.0 was newest Pulsar version that did not report it's version.
        pulsar_version = packaging.version.parse(remote_job_config.get('pulsar_version', "0.6.0"))
        needed_version = packaging.version.parse("0.0.0")
        log.info("pulsar_version is %s" % pulsar_version)
        for feature in list(check_features.keys()) + ['_default_']:
            if pulsar_version < MINIMUM_PULSAR_VERSIONS[feature]:
                needed_version = max(needed_version, MINIMUM_PULSAR_VERSIONS[feature])
        if pulsar_version < needed_version:
            raise UnsupportedPulsarException(needed_version)

    @staticmethod
    def __dependencies_description(pulsar_client, job_wrapper):
        dependency_resolution = PulsarJobRunner.__dependency_resolution(pulsar_client)
        remote_dependency_resolution = dependency_resolution == "remote"
        if not remote_dependency_resolution:
            return None
        requirements = job_wrapper.tool.requirements or []
        installed_tool_dependencies = job_wrapper.tool.installed_tool_dependencies or []
        return dependencies.DependenciesDescription(
            requirements=requirements,
            installed_tool_dependencies=installed_tool_dependencies,
        )

    @staticmethod
    def __dependency_resolution(pulsar_client):
        dependency_resolution = pulsar_client.destination_params.get("dependency_resolution", "remote")
        if dependency_resolution not in ["none", "local", "remote"]:
            raise Exception("Unknown dependency_resolution value encountered %s" % dependency_resolution)
        return dependency_resolution

    @staticmethod
    def __remote_metadata(pulsar_client):
        remote_metadata = string_as_bool_or_none(pulsar_client.destination_params.get("remote_metadata", False))
        return remote_metadata

    @staticmethod
    def __use_remote_datatypes_conf(pulsar_client):
        """Use remote metadata datatypes instead of Galaxy's.

        When setting remote metadata, use integrated datatypes from this
        Galaxy instance or use the datatypes config configured via the remote
        Pulsar.

        Both options are broken in different ways for same reason - datatypes
        may not match. One can push the local datatypes config to the remote
        server - but there is no guarentee these datatypes will be defined
        there. Alternatively, one can use the remote datatype config - but
        there is no guarentee that it will contain all the datatypes available
        to this Galaxy.
        """
        use_remote_datatypes = string_as_bool_or_none(pulsar_client.destination_params.get("use_remote_datatypes", False))
        return use_remote_datatypes

    @staticmethod
    def __rewrite_parameters(pulsar_client):
        return string_as_bool_or_none(pulsar_client.destination_params.get("rewrite_parameters", False)) or False

    def __build_metadata_configuration(self, client, job_wrapper, remote_metadata, remote_job_config):
        metadata_kwds = {}
        if remote_metadata:
            remote_system_properties = remote_job_config.get("system_properties", {})
            remote_galaxy_home = remote_system_properties.get("galaxy_home", None)
            if not remote_galaxy_home:
                raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE)
            metadata_kwds['exec_dir'] = remote_galaxy_home
            outputs_directory = remote_job_config['outputs_directory']
            working_directory = remote_job_config['working_directory']
            metadata_directory = remote_job_config['metadata_directory']
            # For metadata calculation, we need to build a list of of output
            # file objects with real path indicating location on Galaxy server
            # and false path indicating location on compute server. Since the
            # Pulsar disables from_work_dir copying as part of the job command
            # line we need to take the list of output locations on the Pulsar
            # server (produced by self.get_output_files(job_wrapper)) and for
            # each work_dir output substitute the effective path on the Pulsar
            # server relative to the remote working directory as the
            # false_path to send the metadata command generation module.
            work_dir_outputs = self.get_work_dir_outputs(job_wrapper, tool_working_directory=working_directory)
            outputs = [Bunch(false_path=os.path.join(outputs_directory, os.path.basename(path)), real_path=path) for path in self.get_output_files(job_wrapper)]
            for output in outputs:
                for pulsar_workdir_path, real_path in work_dir_outputs:
                    if real_path == output.real_path:
                        output.false_path = pulsar_workdir_path
            metadata_kwds['output_fnames'] = outputs
            metadata_kwds['compute_tmp_dir'] = metadata_directory
            metadata_kwds['config_root'] = remote_galaxy_home
            default_config_file = os.path.join(remote_galaxy_home, 'config/galaxy.ini')
            metadata_kwds['config_file'] = remote_system_properties.get('galaxy_config_file', default_config_file)
            metadata_kwds['dataset_files_path'] = remote_system_properties.get('galaxy_dataset_files_path', None)
            if PulsarJobRunner.__use_remote_datatypes_conf(client):
                remote_datatypes_config = remote_system_properties.get('galaxy_datatypes_config_file', None)
                if not remote_datatypes_config:
                    log.warning(NO_REMOTE_DATATYPES_CONFIG)
                    remote_datatypes_config = os.path.join(remote_galaxy_home, 'datatypes_conf.xml')
                metadata_kwds['datatypes_config'] = remote_datatypes_config
            else:
                datatypes_config = os.path.join(job_wrapper.working_directory, 'registry.xml')
                self.app.datatypes_registry.to_xml_file(path=datatypes_config)
                # Ensure this file gets pushed out to the remote config dir.
                job_wrapper.extra_filenames.append(datatypes_config)
                metadata_kwds['datatypes_config'] = datatypes_config
        return metadata_kwds


[docs]class PulsarLegacyJobRunner(PulsarJobRunner): """Flavor of Pulsar job runner mimicking behavior of old LWR runner.""" destination_defaults = dict( rewrite_parameters="false", dependency_resolution="local", )
[docs]class PulsarMQJobRunner(PulsarJobRunner): """Flavor of Pulsar job runner with sensible defaults for message queue communication.""" destination_defaults = dict( default_file_action="remote_transfer", rewrite_parameters="true", dependency_resolution="remote", jobs_directory=PARAMETER_SPECIFICATION_REQUIRED, url=PARAMETER_SPECIFICATION_IGNORED, private_token=PARAMETER_SPECIFICATION_IGNORED ) def _monitor(self): # This is a message queue driven runner, don't monitor # just setup required callback. self._init_noop_monitor() self.client_manager.ensure_has_status_update_callback(self.__async_update) self.client_manager.ensure_has_ack_consumers() def __async_update(self, full_status): job_id = None try: job_id = full_status["job_id"] job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(job_id) job_state = self._job_state(job, job_wrapper) self._update_job_state_for_status(job_state, full_status["status"], full_status=full_status) except Exception: log.exception("Failed to update Pulsar job status for job_id %s", job_id) raise
# Nothing else to do? - Attempt to fail the job?
[docs]class PulsarRESTJobRunner(PulsarJobRunner): """Flavor of Pulsar job runner with sensible defaults for RESTful usage.""" destination_defaults = dict( default_file_action="transfer", rewrite_parameters="true", dependency_resolution="remote", url=PARAMETER_SPECIFICATION_REQUIRED, )
[docs]class PulsarEmbeddedJobRunner(PulsarJobRunner): """Flavor of Puslar job runnner that runs Pulsar's server code directly within Galaxy. This is an appropriate job runner for when the desire is to use Pulsar staging but their is not need to run a remote service. """ destination_defaults = dict( default_file_action="copy", rewrite_parameters="true", dependency_resolution="remote", ) default_build_pulsar_app = True
class PulsarComputeEnvironment(ComputeEnvironment): def __init__(self, pulsar_client, job_wrapper, remote_job_config): self.pulsar_client = pulsar_client self.job_wrapper = job_wrapper self.local_path_config = job_wrapper.default_compute_environment() self.unstructured_path_rewrites = {} # job_wrapper.prepare is going to expunge the job backing the following # computations, so precalculate these paths. self._wrapper_input_paths = self.local_path_config.input_paths() self._wrapper_output_paths = self.local_path_config.output_paths() self.path_mapper = PathMapper(pulsar_client, remote_job_config, self.local_path_config.working_directory()) self._config_directory = remote_job_config["configs_directory"] self._working_directory = remote_job_config["working_directory"] self._sep = remote_job_config["system_properties"]["separator"] self._tool_dir = remote_job_config["tools_directory"] version_path = self.local_path_config.version_path() new_version_path = self.path_mapper.remote_version_path_rewrite(version_path) if new_version_path: version_path = new_version_path self._version_path = version_path def output_names(self): # Maybe this should use the path mapper, but the path mapper just uses basenames return self.job_wrapper.get_output_basenames() def output_paths(self): local_output_paths = self._wrapper_output_paths results = [] for local_output_path in local_output_paths: wrapper_path = str(local_output_path) remote_path = self.path_mapper.remote_output_path_rewrite(wrapper_path) results.append(self._dataset_path(local_output_path, remote_path)) return results def input_paths(self): local_input_paths = self._wrapper_input_paths results = [] for local_input_path in local_input_paths: wrapper_path = str(local_input_path) # This will over-copy in some cases. For instance in the case of task # splitting, this input will be copied even though only the work dir # input will actually be used. remote_path = self.path_mapper.remote_input_path_rewrite(wrapper_path) results.append(self._dataset_path(local_input_path, remote_path)) return results def _dataset_path(self, local_dataset_path, remote_path): remote_extra_files_path = None if remote_path: remote_extra_files_path = "%s_files" % remote_path[0:-len(".dat")] return local_dataset_path.with_path_for_job(remote_path, remote_extra_files_path) def working_directory(self): return self._working_directory def config_directory(self): return self._config_directory def new_file_path(self): return self.working_directory() # Problems with doing this? def sep(self): return self._sep def version_path(self): return self._version_path def rewriter(self, parameter_value): unstructured_path_rewrites = self.unstructured_path_rewrites if parameter_value in unstructured_path_rewrites: # Path previously mapped, use previous mapping. return unstructured_path_rewrites[parameter_value] if parameter_value in unstructured_path_rewrites.values(): # Path is a rewritten remote path (this might never occur, # consider dropping check...) return parameter_value rewrite, new_unstructured_path_rewrites = self.path_mapper.check_for_arbitrary_rewrite(parameter_value) if rewrite: unstructured_path_rewrites.update(new_unstructured_path_rewrites) return rewrite else: # Did need to rewrite, use original path or value. return parameter_value def unstructured_path_rewriter(self): return self.rewriter def tool_directory(self): return self._tool_dir def home_directory(self): # TODO: revisit and implement this, won't break anything working in the # meantime. return None def tmp_directory(self): # TODO: revisit and implement this, won't break anything working in the # meantime. return None class UnsupportedPulsarException(Exception): def __init__(self, needed): super(UnsupportedPulsarException, self).__init__(UPGRADE_PULSAR_ERROR % needed)