Source code for galaxy.jobs.runners.pulsar

"""Job runner used to execute Galaxy jobs through Pulsar.

More information on Pulsar can be found at https://pulsar.readthedocs.io/ .
"""

import copy
import errno
import logging
import os
import re
import subprocess
from time import sleep
from typing import (
    Any,
    Dict,
    Optional,
)

import pulsar.core
import yaml
from packaging.version import Version
from pulsar.client import (
    build_client_manager,
    CLIENT_INPUT_PATH_TYPES,
    ClientInput,
    ClientInputs,
    ClientJobDescription,
    ClientOutputs,
    EXTENDED_METADATA_DYNAMIC_COLLECTION_PATTERN,
    finish_job as pulsar_finish_job,
    PathMapper,
    PulsarClientTransportError,
    PulsarOutputs,
    submit_job as pulsar_submit_job,
    url_to_destination_params,
)

# TODO: Perform pulsar release with this included in the client package
from pulsar.client.staging import DEFAULT_DYNAMIC_COLLECTION_PATTERN
from sqlalchemy import select

from galaxy import model
from galaxy.job_execution.compute_environment import (
    ComputeEnvironment,
    dataset_path_to_extra_path,
)
from galaxy.jobs import JobDestination
from galaxy.jobs.command_factory import build_command
from galaxy.jobs.runners import (
    AsynchronousJobRunner,
    AsynchronousJobState,
    JobState,
)
from galaxy.model.base import check_database_connection
from galaxy.tool_util.deps import dependencies
from galaxy.util import (
    galaxy_directory,
    specs,
    string_as_bool_or_none,
    unicodify,
)

log = logging.getLogger(__name__)

__all__ = (
    "PulsarLegacyJobRunner",
    "PulsarRESTJobRunner",
    "PulsarMQJobRunner",
    "PulsarEmbeddedJobRunner",
    "PulsarEmbeddedMQJobRunner",
)

MINIMUM_PULSAR_VERSIONS = {
    "_default_": Version("0.7.0.dev3"),
    "remote_metadata": Version("0.8.0"),
    "remote_container_handling": Version("0.9.1.dev0"),  # probably 0.10 ultimately?
}

NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client configured to set metadata remotely, but remote Pulsar isn't properly configured with a galaxy_home directory."
NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes configuration when setting metadata externally, but Pulsar is not configured with this information. Defaulting to datatypes_conf.xml."
GENERIC_REMOTE_ERROR = "Failed to communicate with remote job server."
FAILED_REMOTE_ERROR = "Remote job server indicated a problem running or monitoring this job."
LOST_REMOTE_ERROR = "Remote job server could not determine this job's state."

UPGRADE_PULSAR_ERROR = "Galaxy is misconfigured, please contact administrator. The target Pulsar server is unsupported, this version of Galaxy requires Pulsar version %s or newer."

# Is there a good way to infer some default for this? Can only use
# url_for from web threads. https://gist.github.com/jmchilton/9098762
DEFAULT_GALAXY_URL = "http://localhost:8080"

PULSAR_PARAM_SPECS = dict(
    transport=dict(map=specs.to_str_or_none, valid=specs.is_in("urllib", "curl", None), default=None),
    transport_timeout=dict(
        map=lambda val: None if val == "None" else int(val),
        default=None,
    ),
    cache=dict(
        map=specs.to_bool_or_none,
        default=None,
    ),
    remote_container_handling=dict(
        map=specs.to_bool,
        default=False,
    ),
    amqp_url=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    amqp_key_prefix=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    galaxy_url=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    secret=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    pulsar_config=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    pulsar_app_config=dict(
        default=None,
    ),
    manager=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    persistence_directory=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    amqp_acknowledge=dict(map=specs.to_bool_or_none, default=None),
    amqp_ack_republish_time=dict(
        map=lambda val: None if val == "None" else int(val),
        default=None,
    ),
    amqp_consumer_timeout=dict(
        map=lambda val: None if val == "None" else float(val),
        default=None,
    ),
    amqp_connect_ssl_ca_certs=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    amqp_connect_ssl_keyfile=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    amqp_connect_ssl_certfile=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    amqp_connect_ssl_cert_reqs=dict(
        map=specs.to_str_or_none,
        default=None,
    ),
    # https://kombu.readthedocs.io/en/latest/reference/kombu.html#kombu.Producer.publish
    amqp_publish_retry=dict(
        map=specs.to_bool,
        default=False,
    ),
    amqp_publish_priority=dict(
        map=int,
        valid=lambda x: 0 <= x and x <= 9,
        default=0,
    ),
    # https://kombu.readthedocs.io/en/latest/reference/kombu.html#kombu.Exchange.delivery_mode
    amqp_publish_delivery_mode=dict(
        map=str,
        valid=specs.is_in("transient", "persistent"),
        default="persistent",
    ),
    amqp_publish_retry_max_retries=dict(
        map=int,
        default=None,
    ),
    amqp_publish_retry_interval_start=dict(
        map=int,
        default=None,
    ),
    amqp_publish_retry_interval_step=dict(
        map=int,
        default=None,
    ),
    amqp_publish_retry_interval_max=dict(
        map=int,
        default=None,
    ),
)


PARAMETER_SPECIFICATION_REQUIRED = object()
PARAMETER_SPECIFICATION_IGNORED = object()


class PulsarJobRunner(AsynchronousJobRunner):
    """Base class for pulsar job runners."""

    start_methods = ["_init_worker_threads", "_init_client_manager", "_monitor"]
    runner_name = "PulsarJobRunner"
    default_build_pulsar_app = False
    use_mq = False
    poll = True

    def __init__(self, app, nworkers, **kwds):
        """Start the job runner."""
        super().__init__(app, nworkers, runner_param_specs=PULSAR_PARAM_SPECS, **kwds)
        galaxy_url = self.runner_params.galaxy_url
        if not galaxy_url:
            galaxy_url = app.config.galaxy_infrastructure_url
        if galaxy_url:
            galaxy_url = galaxy_url.rstrip("/")
        self.galaxy_url = galaxy_url

    def _monitor(self):
        if self.use_mq:
            # This is a message queue driven runner, don't monitor
            # just setup required callback.
            self.client_manager.ensure_has_status_update_callback(self.__async_update)
            self.client_manager.ensure_has_ack_consumers()

        if self.poll:
            self._init_monitor_thread()
        else:
            self._init_noop_monitor()

    def _init_client_manager(self):
        pulsar_conf = self.runner_params.get("pulsar_app_config", None)
        pulsar_conf_file = None
        if pulsar_conf is None:
            pulsar_conf_file = self.runner_params.get("pulsar_config", None)
        self.__init_pulsar_app(pulsar_conf, pulsar_conf_file)

        client_manager_kwargs = {}
        for kwd in "manager", "cache", "transport", "persistence_directory":
            client_manager_kwargs[kwd] = self.runner_params[kwd]
        if self.pulsar_app is not None:
            client_manager_kwargs["pulsar_app"] = self.pulsar_app
            # TODO: Hack remove this following line pulsar lib update
            # that includes https://github.com/galaxyproject/pulsar/commit/ce0636a5b64fae52d165bcad77b2caa3f0e9c232
            client_manager_kwargs["file_cache"] = None

        for kwd in self.runner_params.keys():
            if kwd.startswith("amqp_") or kwd.startswith("transport_"):
                client_manager_kwargs[kwd] = self.runner_params[kwd]
        self.client_manager = build_client_manager(**client_manager_kwargs)

    def __init_pulsar_app(self, conf, pulsar_conf_path):
        if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app:
            self.pulsar_app = None
            return
        if conf is None:
            conf = {}
            if pulsar_conf_path is None:
                log.info("Creating a Pulsar app with default configuration (no pulsar_conf specified).")
            else:
                log.info(f"Loading Pulsar app configuration from {pulsar_conf_path}")
                with open(pulsar_conf_path) as f:
                    conf.update(yaml.safe_load(f) or {})
        if "job_metrics_config_file" not in conf:
            conf["job_metrics"] = self.app.job_metrics
        if "staging_directory" not in conf:
            conf["staging_directory"] = os.path.join(self.app.config.data_dir, "pulsar_staging")
        if "persistence_directory" not in conf:
            conf["persistence_directory"] = os.path.join(self.app.config.data_dir, "pulsar_persisted_data")
        if "galaxy_home" not in conf:
            conf["galaxy_home"] = galaxy_directory()
        self.pulsar_app = pulsar.core.PulsarApp(**conf)

    def url_to_destination(self, url):
        """Convert a legacy URL to a job destination."""
        return JobDestination(runner="pulsar", params=url_to_destination_params(url))

    def check_watched_item(self, job_state):
        if self.use_mq:
            # Might still need to check pod IPs.
            job_wrapper = job_state.job_wrapper
            guest_ports = job_wrapper.guest_ports
            if len(guest_ports) > 0:
                persisted_state = job_wrapper.get_state()
                if persisted_state in model.Job.terminal_states + [model.Job.states.DELETING]:
                    log.debug(
                        "(%s) Watched job in terminal state, will stop monitoring: %s",
                        job_state.job_id,
                        persisted_state,
                    )
                    job_state = None
                elif persisted_state == model.Job.states.RUNNING:
                    client = self.get_client_from_state(job_state)
                    job_ip = client.job_ip()
                    if job_ip:
                        ports_dict = {}
                        for guest_port in guest_ports:
                            ports_dict[str(guest_port)] = dict(host=job_ip, port=guest_port, protocol="http")
                        self.app.interactivetool_manager.configure_entry_points(job_wrapper.get_job(), ports_dict)
                        log.debug("(%s) Got ports for entry point: %s", job_state.job_id, str(ports_dict))
                        job_state = None
            else:
                # No need to monitor MQ jobs that have no entry points
                job_state = None
            return job_state
        else:
            return self.check_watched_item_state(job_state)

    def check_watched_item_state(self, job_state):
        try:
            client = self.get_client_from_state(job_state)
            status = client.get_status()
        except PulsarClientTransportError as exc:
            log.error("Communication error with Pulsar server on state check, will retry: %s", exc)
            return job_state
        except Exception:
            # An orphaned job was put into the queue at app startup, so remote server went down
            # either way we are done I guess.
            self.mark_as_finished(job_state)
            return None
        job_state = self._update_job_state_for_status(job_state, status)
        return job_state

    def _update_job_state_for_status(self, job_state, pulsar_status, full_status=None):
        log.debug("(%s) Received status update: %s", job_state.job_id, pulsar_status)
        if pulsar_status in ["complete", "cancelled"]:
            self.mark_as_finished(job_state)
            return None
        if job_state.job_wrapper.get_state() == model.Job.states.STOPPED:
            client = self.get_client_from_state(job_state)
            client.kill()
            self.mark_as_finished(job_state)
            return None
        if pulsar_status in ["failed", "lost"]:
            if pulsar_status == "failed":
                message = FAILED_REMOTE_ERROR
            else:
                message = LOST_REMOTE_ERROR
            if not job_state.job_wrapper.get_job().finished:
                self.fail_job(job_state, message=message, full_status=full_status)
            return None
        if pulsar_status == "running" and not job_state.running:
            job_state.running = True
            job_state.job_wrapper.change_state(model.Job.states.RUNNING)
        return job_state

    def queue_job(self, job_wrapper):
        job_destination = job_wrapper.job_destination
        self._populate_parameter_defaults(job_destination)

        command_line, client, remote_job_config, compute_environment, remote_container = self.__prepare_job(
            job_wrapper, job_destination
        )

        if not command_line:
            return

        try:
            dependencies_description = PulsarJobRunner.__dependencies_description(client, job_wrapper)
            rewrite_paths = not PulsarJobRunner.__rewrite_parameters(client)
            path_rewrites_unstructured = {}
            output_names = []
            if compute_environment:
                path_rewrites_unstructured = compute_environment.path_rewrites_unstructured
                output_names = compute_environment.output_names()

                client_inputs_list = []
                for input_dataset_wrapper in job_wrapper.job_io.get_input_paths():
                    # str here to resolve false_path if set on a DatasetPath object.
                    path = str(input_dataset_wrapper)
                    object_store_ref = {
                        "dataset_id": input_dataset_wrapper.dataset_id,
                        "dataset_uuid": str(input_dataset_wrapper.dataset_uuid),
                        "object_store_id": input_dataset_wrapper.object_store_id,
                    }
                    client_inputs_list.append(
                        ClientInput(path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH, object_store_ref=object_store_ref)
                    )

                for input_extra_path in compute_environment.path_rewrites_input_extra.keys():
                    # TODO: track dataset for object_Store_ref...
                    client_inputs_list.append(
                        ClientInput(input_extra_path, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH)
                    )

                for input_metadata_path in compute_environment.path_rewrites_input_metadata.keys():
                    # TODO: track dataset for object_Store_ref...
                    client_inputs_list.append(
                        ClientInput(input_metadata_path, CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH)
                    )

                input_files = None
                client_inputs = ClientInputs(client_inputs_list)
            else:
                input_files = self.get_input_files(job_wrapper)
                client_inputs = None

            if self.app.config.metadata_strategy == "legacy":
                # Drop this branch in 19.09.
                metadata_directory = job_wrapper.working_directory
            else:
                metadata_directory = os.path.join(job_wrapper.working_directory, "metadata")

            dest_params = job_destination.params
            remote_pulsar_app_config = dest_params.get("pulsar_app_config", {}).copy()
            if "pulsar_app_config_path" in dest_params:
                pulsar_app_config_path = dest_params["pulsar_app_config_path"]
                with open(pulsar_app_config_path) as fh:
                    remote_pulsar_app_config.update(yaml.safe_load(fh))
            job_directory_files = []
            config_files = job_wrapper.extra_filenames
            tool_script = os.path.join(job_wrapper.working_directory, "tool_script.sh")
            if os.path.exists(tool_script):
                log.debug(f"Registering tool_script for Pulsar transfer [{tool_script}]")
                job_directory_files.append(tool_script)
                config_files.append(tool_script)
            # Following is job destination environment variables
            env = client.env
            # extend it with tool defined environment variables
            tool_envs = job_wrapper.environment_variables
            env.extend(tool_envs)
            for tool_env in tool_envs:
                job_directory_path = tool_env.get("job_directory_path")
                if job_directory_path:
                    config_files.append(job_directory_path)
            tool_directory_required_files = job_wrapper.tool.required_files
            client_job_description = ClientJobDescription(
                command_line=command_line,
                input_files=input_files,
                client_inputs=client_inputs,  # Only one of these input defs should be non-None
                client_outputs=self.__client_outputs(client, job_wrapper),
                working_directory=job_wrapper.tool_working_directory,
                metadata_directory=metadata_directory,
                tool=job_wrapper.tool,
                config_files=config_files,
                dependencies_description=dependencies_description,
                env=env,
                rewrite_paths=rewrite_paths,
                arbitrary_files=path_rewrites_unstructured,
                touch_outputs=output_names,
                remote_pulsar_app_config=remote_pulsar_app_config,
                job_directory_files=job_directory_files,
                container=None if not remote_container else remote_container.container_id,
                guest_ports=job_wrapper.guest_ports,
                tool_directory_required_files=tool_directory_required_files,
            )
            external_job_id = pulsar_submit_job(client, client_job_description, remote_job_config)
            log.info(f"Pulsar job submitted with job_id {external_job_id}")
            job = job_wrapper.get_job()
            # Set the job destination here (unlike other runners) because there are likely additional job destination
            # params from the Pulsar client.
            # Flush with change_state.
            job_wrapper.set_job_destination(job_destination, external_id=external_job_id, flush=False, job=job)
            job_wrapper.change_state(model.Job.states.QUEUED, job=job)
        except Exception:
            job_wrapper.fail("failure running job", exception=True)
            log.exception("failure running job %d", job_wrapper.job_id)
            return

        pulsar_job_state = AsynchronousJobState(
            job_wrapper=job_wrapper, job_id=external_job_id, job_destination=job_destination
        )
        pulsar_job_state.old_state = True
        pulsar_job_state.running = False
        self.monitor_job(pulsar_job_state)

    def __needed_features(self, client):
        return {
            "remote_metadata": PulsarJobRunner.__remote_metadata(client),
            "remote_container_handling": PulsarJobRunner.__remote_container_handling(client),
        }

    def __prepare_job(self, job_wrapper, job_destination):
        """Build command-line and Pulsar client for this job."""
        command_line = None
        client = None
        remote_job_config = None
        compute_environment: Optional[PulsarComputeEnvironment] = None
        remote_container = None

        fail_or_resubmit = False
        try:
            client = self.get_client_from_wrapper(job_wrapper)
            tool = job_wrapper.tool
            remote_job_config = client.setup(tool.id, tool.version, tool.requires_galaxy_python_environment)
            remote_container_handling = PulsarJobRunner.__remote_container_handling(client)
            if remote_container_handling:
                # Handle this remotely and don't pass it to build_command
                remote_container = self._find_container(
                    job_wrapper,
                )

            needed_features = self.__needed_features(client)
            PulsarJobRunner.check_job_config(remote_job_config, check_features=needed_features)
            rewrite_parameters = PulsarJobRunner.__rewrite_parameters(client)
            prepare_kwds = {}
            if rewrite_parameters:
                compute_environment = PulsarComputeEnvironment(client, job_wrapper, remote_job_config)
                prepare_kwds["compute_environment"] = compute_environment
            job_wrapper.prepare(**prepare_kwds)
            self.__prepare_input_files_locally(job_wrapper)
            remote_metadata = PulsarJobRunner.__remote_metadata(client)
            dependency_resolution = PulsarJobRunner.__dependency_resolution(client)
            metadata_kwds = self.__build_metadata_configuration(
                client,
                job_wrapper,
                remote_metadata,
                remote_job_config,
                compute_environment=compute_environment,
            )
            remote_working_directory = remote_job_config["working_directory"]
            remote_job_directory = os.path.abspath(os.path.join(remote_working_directory, os.path.pardir))
            remote_tool_directory = os.path.abspath(os.path.join(remote_job_directory, "tool_files"))
            pulsar_version = PulsarJobRunner.pulsar_version(remote_job_config)
            remote_command_params = dict(
                working_directory=remote_job_config["metadata_directory"],
                script_directory=remote_job_directory,
                metadata_kwds=metadata_kwds,
                dependency_resolution=dependency_resolution,
                pulsar_version=pulsar_version,
            )
            rewrite_paths = not PulsarJobRunner.__rewrite_parameters(client)
            if pulsar_version < Version("0.14.999") and rewrite_paths:
                job_wrapper.disable_commands_in_new_shell()
            container = None
            if remote_container is None:
                container = self._find_container(
                    job_wrapper,
                    compute_working_directory=remote_working_directory,
                    compute_tool_directory=remote_tool_directory,
                    compute_job_directory=remote_job_directory,
                )

            # Pulsar handles ``create_tool_working_directory`` and
            # ``include_work_dir_outputs`` details.
            command_line = build_command(
                self,
                job_wrapper=job_wrapper,
                container=container,
                include_metadata=remote_metadata,
                create_tool_working_directory=False,
                include_work_dir_outputs=False,
                remote_command_params=remote_command_params,
                remote_job_directory=remote_job_directory,
            )
        except UnsupportedPulsarException:
            log.exception("failure running job %d, unsupported Pulsar target", job_wrapper.job_id)
            fail_or_resubmit = True
        except PulsarClientTransportError:
            log.exception("failure running job %d, Pulsar connection failed", job_wrapper.job_id)
            fail_or_resubmit = True
        except Exception:
            log.exception("failure running job %d", job_wrapper.job_id)
            fail_or_resubmit = True

        # If we were unable to get a command line, there was problem
        fail_or_resubmit = fail_or_resubmit or not command_line
        if fail_or_resubmit:
            job_state = self._job_state(job_wrapper.get_job(), job_wrapper)
            self.work_queue.put((self.fail_job, job_state))

        return command_line, client, remote_job_config, compute_environment, remote_container

    def __prepare_input_files_locally(self, job_wrapper):
        """Run task splitting commands locally."""
        prepare_input_files_cmds = getattr(job_wrapper, "prepare_input_files_cmds", None)
        if prepare_input_files_cmds is not None:
            for cmd in prepare_input_files_cmds:  # run the commands to stage the input files
                subprocess.check_call(cmd, shell=True)
            job_wrapper.prepare_input_files_cmds = None  # prevent them from being used in-line

    def _populate_parameter_defaults(self, job_destination):
        updated = False
        params = job_destination.params
        for key, value in self.destination_defaults.items():
            if key in params:
                if value is PARAMETER_SPECIFICATION_IGNORED:
                    log.warning(f"Pulsar runner in selected configuration ignores parameter {key}")
                continue
            # if self.runner_params.get( key, None ):
            #    # Let plugin define defaults for some parameters -
            #    # for instance that way jobs_directory can be
            #    # configured next to AMQP url (where it belongs).
            #    params[ key ] = self.runner_params[ key ]
            #    continue

            if not value:
                continue

            if value is PARAMETER_SPECIFICATION_REQUIRED:
                raise Exception(f"Pulsar destination does not define required parameter {key}")
            elif value is not PARAMETER_SPECIFICATION_IGNORED:
                params[key] = value
                updated = True
        return updated

    def get_output_files(self, job_wrapper):
        output_paths = job_wrapper.job_io.get_output_fnames()
        return [str(o) for o in output_paths]  # Force job_path from DatasetPath objects.

    def get_input_files(self, job_wrapper):
        input_paths = job_wrapper.job_io.get_input_paths()
        return [str(i) for i in input_paths]  # Force job_path from DatasetPath objects.

    def get_client_from_wrapper(self, job_wrapper):
        job_id = job_wrapper.job_id
        if hasattr(job_wrapper, "task_id"):
            job_id = f"{job_id}_{job_wrapper.task_id}"
        params = job_wrapper.job_destination.params.copy()
        if user := job_wrapper.get_job().user:
            for key, value in params.items():
                if value and isinstance(value, str):
                    params[key] = model.User.expand_user_properties(user, value)

        env = getattr(job_wrapper.job_destination, "env", [])
        return self.get_client(params, job_id, env)

    def get_client_from_state(self, job_state):
        job_destination_params = job_state.job_destination.params
        job_id = job_state.job_wrapper.job_id  # we want the Galaxy ID here, job_state.job_id is the external one.
        return self.get_client(job_destination_params, job_id)

    def get_client(self, job_destination_params, job_id, env=None):
        # Cannot use url_for outside of web thread.
        # files_endpoint = url_for( controller="job_files", job_id=encoded_job_id )
        if env is None:
            env = []
        encoded_job_id = self.app.security.encode_id(job_id)
        job_key = self.app.security.encode_id(job_id, kind="jobs_files")
        endpoint_base = "%s/api/jobs/%s/files?job_key=%s"
        if self.app.config.nginx_upload_job_files_path:
            endpoint_base = "%s" + self.app.config.nginx_upload_job_files_path + "?job_id=%s&job_key=%s"
        files_endpoint = endpoint_base % (self.galaxy_url, encoded_job_id, job_key)
        secret = job_destination_params.get("job_secret_base", "jobs_token")
        job_key = self.app.security.encode_id(job_id, kind=secret)
        token_endpoint = f"{self.galaxy_url}/api/jobs/{encoded_job_id}/oidc-tokens?job_key={job_key}"
        get_client_kwds = dict(
            job_id=str(job_id), files_endpoint=files_endpoint, token_endpoint=token_endpoint, env=env
        )
        # Turn MutableDict into standard dict for pulsar consumption
        job_destination_params = dict(job_destination_params.items())
        return self.client_manager.get_client(job_destination_params, **get_client_kwds)

    def finish_job(self, job_state: JobState):
        assert isinstance(
            job_state, AsynchronousJobState
        ), f"job_state type is '{type(job_state)}', expected AsynchronousJobState"
        job_wrapper = job_state.job_wrapper
        try:
            client = self.get_client_from_state(job_state)
            run_results = client.full_status()
            remote_metadata_directory = run_results.get("metadata_directory", None)
            tool_stdout = unicodify(run_results.get("stdout", ""), strip_null=True)
            tool_stderr = unicodify(run_results.get("stderr", ""), strip_null=True)
            job_stdout = run_results.get("job_stdout")
            job_stderr = run_results.get("job_stderr")
            exit_code = run_results.get("returncode")
            pulsar_outputs = PulsarOutputs.from_status_response(run_results)
            state = job_wrapper.get_state()
            # Use Pulsar client code to transfer/copy files back
            # and cleanup job if needed.
            completed_normally = state not in [model.Job.states.ERROR, model.Job.states.DELETED]
            if completed_normally and state == model.Job.states.STOPPED:
                # Discard pulsar exit code (probably -9), we know the user stopped the job
                log.debug("Setting exit code for stopped job {job_wrapper.job_id} to 0 (was {exit_code})")
                exit_code = 0
            cleanup_job = job_wrapper.cleanup_job
            client_outputs = self.__client_outputs(client, job_wrapper)
            finish_args = dict(
                client=client,
                job_completed_normally=completed_normally,
                cleanup_job=cleanup_job,
                client_outputs=client_outputs,
                pulsar_outputs=pulsar_outputs,
            )
            failed = pulsar_finish_job(**finish_args)
            if failed:
                job_wrapper.fail(
                    "Failed to find or download one or more job outputs from remote server.", exception=True
                )
        except Exception:
            self.fail_job(job_state, message=GENERIC_REMOTE_ERROR, exception=True)
            log.exception("failure finishing job %d", job_wrapper.job_id)
            return
        if not PulsarJobRunner.__remote_metadata(client):
            # we need an actual exit code file in the job working directory to detect job errors in the metadata script
            with open(
                os.path.join(job_wrapper.working_directory, f"galaxy_{job_wrapper.job_id}.ec"), "w"
            ) as exit_code_file:
                exit_code_file.write(str(exit_code))
            self._handle_metadata_externally(job_wrapper, resolve_requirements=True)
        # Finish the job
        try:
            job_metrics_directory = os.path.join(job_wrapper.working_directory, "metadata")
            # Following check is a hack for jobs started during 19.01 or earlier release
            # and finishing with a 19.05 code base. Eliminate the hack in 19.09 or later
            # along with hacks for legacy metadata compute strategy.
            if not os.path.exists(job_metrics_directory) or not any(
                "__instrument" in f for f in os.listdir(job_metrics_directory)
            ):
                job_metrics_directory = job_wrapper.working_directory
            job_wrapper.finish(
                tool_stdout,
                tool_stderr,
                exit_code,
                job_stdout=job_stdout,
                job_stderr=job_stderr,
                remote_metadata_directory=remote_metadata_directory,
                job_metrics_directory=job_metrics_directory,
            )
        except Exception:
            log.exception("Job wrapper finish method failed")
            job_wrapper.fail("Unable to finish job", exception=True)

    def check_pid(self, pid):
        try:
            os.kill(pid, 0)
            return True
        except OSError as e:
            if e.errno == errno.ESRCH:
                log.debug("check_pid(): PID %d is dead" % pid)
            else:
                log.warning(
                    "check_pid(): Got errno %s when attempting to check PID %d: %s"
                    % (errno.errorcode[e.errno], pid, e.strerror)
                )
            return False

    def stop_job(self, job_wrapper):
        job = job_wrapper.get_job()
        if not job.job_runner_external_id:
            return
        # if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
        client = self.get_client(job.destination_params, job.job_runner_external_id)
        job_ext_output_metadata = job.get_external_output_metadata()
        if not PulsarJobRunner.__remote_metadata(client) and job_ext_output_metadata:
            pid = job_ext_output_metadata[
                0
            ].job_runner_external_pid  # every JobExternalOutputMetadata has a pid set, we just need to take from one of them
            if pid in [None, ""]:
                log.warning(f"stop_job(): {job.id}: no PID in database for job, unable to stop")
                return
            pid = int(pid)
            if not self.check_pid(pid):
                log.warning("stop_job(): %s: PID %d was already dead or can't be signaled" % (job.id, pid))
                return
            for sig in [15, 9]:
                try:
                    os.killpg(pid, sig)
                except OSError as e:
                    log.warning(
                        "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s"
                        % (job.id, errno.errorcode[e.errno], sig, pid, e.strerror)
                    )
                    return  # give up
                sleep(2)
                if not self.check_pid(pid):
                    log.debug("stop_job(): %s: PID %d successfully killed with signal %d" % (job.id, pid, sig))
                    return
                else:
                    log.warning("stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" % (job.id, pid))
        else:
            # Remote kill
            pulsar_url = job.job_runner_name
            job_id = job.job_runner_external_id
            log.debug(f"Attempt remote Pulsar kill of job with url {pulsar_url} and id {job_id}")
            client = self.get_client(job.destination_params, job_id)
            client.kill()

    def recover(self, job, job_wrapper):
        """Recover jobs stuck in the queued/running state when Galaxy started."""
        job_state = self._job_state(job, job_wrapper)
        job_wrapper.command_line = job.get_command_line()
        state = job.get_state()
        if state in [model.Job.states.RUNNING, model.Job.states.QUEUED, model.Job.states.STOPPED]:
            log.debug(f"(Pulsar/{job.id}) is still in {state} state, adding to the Pulsar queue")
            job_state.old_state = True
            job_state.running = state == model.Job.states.RUNNING
            self.monitor_queue.put(job_state)

    def shutdown(self):
        super().shutdown()
        self.client_manager.shutdown()
        if self.pulsar_app:
            self.pulsar_app.shutdown()

    def _job_state(self, job, job_wrapper):
        raw_job_id = job.get_job_runner_external_id() or job_wrapper.job_id
        job_state = AsynchronousJobState(
            job_wrapper=job_wrapper, job_id=raw_job_id, job_destination=job_wrapper.job_destination
        )
        # TODO: Determine why this is set when using normal message queue updates
        # but not CLI submitted MQ updates...
        job_state.runner_url = job_wrapper.get_job_runner_url()
        return job_state

    def __client_outputs(self, client, job_wrapper):
        metadata_directory = os.path.join(job_wrapper.working_directory, "metadata")
        metadata_strategy = job_wrapper.get_destination_configuration("metadata_strategy", None)
        tool = job_wrapper.tool
        tool_provided_metadata_file_path = tool.provided_metadata_file
        tool_provided_metadata_style = tool.provided_metadata_style

        dynamic_outputs = None  # use default
        if metadata_strategy == "extended" and PulsarJobRunner.__remote_metadata(client):
            # if Pulsar is doing remote metadata and the remote metadata is extended,
            # we only need to recover the final model store.
            dynamic_outputs = EXTENDED_METADATA_DYNAMIC_COLLECTION_PATTERN
            output_files = []
            work_dir_outputs = []
        else:
            # otherwise collect everything we might need
            dynamic_outputs = DEFAULT_DYNAMIC_COLLECTION_PATTERN[:]
            # grab discovered outputs...
            dynamic_outputs.extend(job_wrapper.tool.output_discover_patterns)
            # grab tool provided metadata (galaxy.json) also...
            dynamic_outputs.append(re.escape(tool_provided_metadata_file_path))
            output_files = self.get_output_files(job_wrapper)
            work_dir_outputs = self.get_work_dir_outputs(job_wrapper)
        dynamic_file_sources = [
            {
                "path": tool_provided_metadata_file_path,
                "type": "galaxy" if tool_provided_metadata_style == "default" else "legacy_galaxy",
            }
        ]
        client_outputs = ClientOutputs(
            working_directory=job_wrapper.tool_working_directory,
            metadata_directory=metadata_directory,
            work_dir_outputs=work_dir_outputs,
            output_files=output_files,
            version_file=job_wrapper.get_version_string_path(),
            dynamic_outputs=dynamic_outputs,
            dynamic_file_sources=dynamic_file_sources,
        )
        return client_outputs

    @staticmethod
    def pulsar_version(remote_job_config):
        pulsar_version = Version(remote_job_config.get("pulsar_version", "0.6.0"))
        return pulsar_version

    @staticmethod
    def check_job_config(remote_job_config, check_features=None):
        check_features = check_features or {}
        # 0.6.0 was newest Pulsar version that did not report it's version.
        pulsar_version = PulsarJobRunner.pulsar_version(remote_job_config)
        needed_version = Version("0.0.0")
        log.info(f"pulsar_version is {pulsar_version}")
        for feature, needed in list(check_features.items()) + [("_default_", True)]:
            if not needed:
                continue
            if pulsar_version < MINIMUM_PULSAR_VERSIONS[feature]:
                needed_version = max(needed_version, MINIMUM_PULSAR_VERSIONS[feature])
        if pulsar_version < needed_version:
            raise UnsupportedPulsarException(needed_version)

    @staticmethod
    def __dependencies_description(pulsar_client, job_wrapper):
        dependency_resolution = PulsarJobRunner.__dependency_resolution(pulsar_client)
        remote_dependency_resolution = dependency_resolution == "remote"
        if not remote_dependency_resolution:
            return None
        requirements = job_wrapper.tool.requirements
        installed_tool_dependencies = job_wrapper.tool.installed_tool_dependencies
        return dependencies.DependenciesDescription(
            requirements=requirements,
            installed_tool_dependencies=installed_tool_dependencies,
        )

    @staticmethod
    def __dependency_resolution(pulsar_client):
        dependency_resolution = pulsar_client.destination_params.get("dependency_resolution", "remote")
        if dependency_resolution not in ["none", "local", "remote"]:
            raise Exception(f"Unknown dependency_resolution value encountered {dependency_resolution}")
        return dependency_resolution

    @staticmethod
    def __remote_metadata(pulsar_client):
        remote_metadata = string_as_bool_or_none(pulsar_client.destination_params.get("remote_metadata", False))
        return remote_metadata

    @staticmethod
    def __remote_container_handling(pulsar_client):
        remote_container_handling = string_as_bool_or_none(
            pulsar_client.destination_params.get("remote_container_handling", False)
        )
        return remote_container_handling

    @staticmethod
    def __use_remote_datatypes_conf(pulsar_client):
        """Use remote metadata datatypes instead of Galaxy's.

        When setting remote metadata, use integrated datatypes from this
        Galaxy instance or use the datatypes config configured via the remote
        Pulsar.

        Both options are broken in different ways for same reason - datatypes
        may not match. One can push the local datatypes config to the remote
        server - but there is no guarentee these datatypes will be defined
        there. Alternatively, one can use the remote datatype config - but
        there is no guarentee that it will contain all the datatypes available
        to this Galaxy.
        """
        use_remote_datatypes = string_as_bool_or_none(
            pulsar_client.destination_params.get("use_remote_datatypes", False)
        )
        return use_remote_datatypes

    @staticmethod
    def __rewrite_parameters(pulsar_client):
        return string_as_bool_or_none(pulsar_client.destination_params.get("rewrite_parameters", False)) or False

    def __build_metadata_configuration(
        self,
        client,
        job_wrapper,
        remote_metadata,
        remote_job_config,
        compute_environment: Optional["PulsarComputeEnvironment"] = None,
    ):
        metadata_kwds: Dict[str, Any] = {}
        if remote_metadata:
            working_directory = remote_job_config["working_directory"]
            metadata_directory = remote_job_config["metadata_directory"]
            # For metadata calculation, we need to build a list of of output
            # file objects with real path indicating location on Galaxy server
            # and false path indicating location on compute server. Since the
            # Pulsar disables from_work_dir copying as part of the job command
            # line we need to take the list of output locations on the Pulsar
            # server (produced by job_wrapper.job_io.get_output_fnames() and for
            # each work_dir output substitute the effective path on the Pulsar
            # server relative to the remote working directory as the
            # false_path to send the metadata command generation module.
            work_dir_outputs = self.get_work_dir_outputs(job_wrapper, tool_working_directory=working_directory)
            outputs = job_wrapper.job_io.get_output_fnames()
            # copy fixes 'test/integration/test_pulsar_embedded_remote_metadata.py::test_tools[job_properties]'
            real_path_to_output = {o.real_path: copy.copy(o) for o in outputs}
            rewritten_outputs = []
            for pulsar_workdir_path, real_path in work_dir_outputs:
                work_dir_output = real_path_to_output.pop(real_path, None)
                if work_dir_output:
                    work_dir_output.false_path = pulsar_workdir_path
                    rewritten_outputs.append(work_dir_output)

            for output in real_path_to_output.values():
                if compute_environment:
                    output.false_path = compute_environment.path_mapper.remote_output_path_rewrite(str(output))
                rewritten_outputs.append(output)

            metadata_kwds["output_fnames"] = rewritten_outputs
            remote_system_properties = remote_job_config.get("system_properties", {})
            remote_galaxy_home = remote_system_properties.get("galaxy_home")
            if not job_wrapper.use_metadata_binary:
                if not remote_galaxy_home:
                    raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE)
                metadata_kwds["exec_dir"] = remote_galaxy_home
                metadata_kwds["compute_tmp_dir"] = metadata_directory
                metadata_kwds["config_root"] = remote_galaxy_home
                default_config_file = os.path.join(remote_galaxy_home, "config/galaxy.ini")
                metadata_kwds["config_file"] = remote_system_properties.get("galaxy_config_file", default_config_file)
                metadata_kwds["dataset_files_path"] = remote_system_properties.get("galaxy_dataset_files_path", None)
            if PulsarJobRunner.__use_remote_datatypes_conf(client):
                remote_datatypes_config = remote_system_properties.get("galaxy_datatypes_config_file")
                if not remote_datatypes_config:
                    log.warning(NO_REMOTE_DATATYPES_CONFIG)
                    if not remote_galaxy_home:
                        raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE)
                    remote_datatypes_config = os.path.join(remote_galaxy_home, "datatypes_conf.xml")
                metadata_kwds["datatypes_config"] = remote_datatypes_config
            else:
                datatypes_config = os.path.join(job_wrapper.working_directory, "registry.xml")
                self.app.datatypes_registry.to_xml_file(path=datatypes_config)
                # Ensure this file gets pushed out to the remote config dir.
                job_wrapper.extra_filenames.append(datatypes_config)
                metadata_kwds["datatypes_config"] = datatypes_config
        return metadata_kwds

    def __async_update(self, full_status):
        galaxy_job_id = None
        remote_job_id = None
        try:
            check_database_connection(self.sa_session)
            remote_job_id = full_status["job_id"]
            if len(remote_job_id) == 32:
                # It is a UUID - assign_ids = uuid in destination params...
                stmt = select(model.Job.id).filter(model.Job.job_runner_external_id == remote_job_id)
                galaxy_job_id = self.app.model.session.execute(stmt).scalar_one()
            else:
                galaxy_job_id = remote_job_id
            job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)
            job_state = self._job_state(job, job_wrapper)
            self._update_job_state_for_status(job_state, full_status["status"], full_status=full_status)
        except Exception:
            log.exception(f"Failed to update Pulsar job status for job_id ({galaxy_job_id}/{remote_job_id})")
            raise
            # Nothing else to do? - Attempt to fail the job?


[docs]class PulsarLegacyJobRunner(PulsarJobRunner): """Flavor of Pulsar job runner mimicking behavior of old LWR runner.""" destination_defaults = dict( rewrite_parameters="false", dependency_resolution="local", )
[docs]class PulsarMQJobRunner(PulsarJobRunner): """Flavor of Pulsar job runner with sensible defaults for message queue communication.""" use_mq = True poll = False destination_defaults = dict( default_file_action="remote_transfer", rewrite_parameters="true", dependency_resolution="remote", jobs_directory=PARAMETER_SPECIFICATION_REQUIRED, url=PARAMETER_SPECIFICATION_IGNORED, private_token=PARAMETER_SPECIFICATION_IGNORED, )
DEFAULT_PULSAR_CONTAINER = "galaxy/pulsar-pod-staging:0.15.0.2" COEXECUTION_DESTENTATION_DEFAULTS = { "default_file_action": "remote_transfer", "rewrite_parameters": "true", "jobs_directory": "/pulsar_staging", "pulsar_container_image": DEFAULT_PULSAR_CONTAINER, "remote_container_handling": True, "url": PARAMETER_SPECIFICATION_IGNORED, "private_token": PARAMETER_SPECIFICATION_IGNORED, } class PulsarCoexecutionJobRunner(PulsarMQJobRunner): destination_defaults = COEXECUTION_DESTENTATION_DEFAULTS def _populate_parameter_defaults(self, job_destination): super()._populate_parameter_defaults(job_destination) params = job_destination.params # Set some sensible defaults for Pulsar application that runs in staging container. if "pulsar_app_config" not in params: params["pulsar_app_config"] = {} pulsar_app_config = params["pulsar_app_config"] if "staging_directory" not in pulsar_app_config: # coexecution always uses a fixed path for staging directory pulsar_app_config["staging_directory"] = params.get("jobs_directory") KUBERNETES_DESTINATION_DEFAULTS: Dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTENTATION_DEFAULTS} class PulsarKubernetesJobRunner(PulsarCoexecutionJobRunner): destination_defaults = KUBERNETES_DESTINATION_DEFAULTS poll = True # Poll so we can check API for pod IP for ITs. TES_DESTENTATION_DEFAULTS: Dict[str, Any] = { "tes_url": PARAMETER_SPECIFICATION_REQUIRED, **COEXECUTION_DESTENTATION_DEFAULTS, } class PulsarTesJobRunner(PulsarCoexecutionJobRunner): destination_defaults = TES_DESTENTATION_DEFAULTS
[docs]class PulsarRESTJobRunner(PulsarJobRunner): """Flavor of Pulsar job runner with sensible defaults for RESTful usage.""" destination_defaults = dict( default_file_action="transfer", rewrite_parameters="true", dependency_resolution="remote", url=PARAMETER_SPECIFICATION_REQUIRED, )
[docs]class PulsarEmbeddedJobRunner(PulsarJobRunner): """Flavor of Puslar job runner that runs Pulsar's server code directly within Galaxy. This is an appropriate job runner for when the desire is to use Pulsar staging but their is not need to run a remote service. """ destination_defaults = dict( default_file_action="copy", rewrite_parameters="true", dependency_resolution="remote", ) default_build_pulsar_app = True
[docs]class PulsarEmbeddedMQJobRunner(PulsarMQJobRunner): default_build_pulsar_app = True
class PulsarComputeEnvironment(ComputeEnvironment): def __init__(self, pulsar_client, job_wrapper, remote_job_config): self.pulsar_client = pulsar_client self.job_wrapper = job_wrapper self.local_path_config = job_wrapper.default_compute_environment() self.path_rewrites_unstructured = {} self.path_rewrites_input_extra = {} self.path_rewrites_input_metadata = {} # job_wrapper.prepare is going to expunge the job backing the following # computations, so precalculate these paths. self.path_mapper = PathMapper(pulsar_client, remote_job_config, self.local_path_config.working_directory()) self._config_directory = remote_job_config["configs_directory"] self._working_directory = remote_job_config["working_directory"] self._sep = remote_job_config["system_properties"]["separator"] self._tool_dir = remote_job_config["tools_directory"] self._tmp_dir = remote_job_config.get("tmp_dir") self._shared_home_dir = remote_job_config.get("shared_home_dir") version_path = self.local_path_config.version_path() new_version_path = self.path_mapper.remote_version_path_rewrite(version_path) if new_version_path: version_path = new_version_path self._version_path = version_path def output_names(self): # Maybe this should use the path mapper, but the path mapper just uses basenames return self.job_wrapper.job_io.get_output_basenames() def input_path_rewrite(self, dataset): local_input_path_rewrite = self.local_path_config.input_path_rewrite(dataset) if local_input_path_rewrite is not None: local_input_path = local_input_path_rewrite else: local_input_path = dataset.get_file_name() remote_path = self.path_mapper.remote_input_path_rewrite(local_input_path) return remote_path def output_path_rewrite(self, dataset): local_output_path_rewrite = self.local_path_config.output_path_rewrite(dataset) if local_output_path_rewrite is not None: local_output_path = local_output_path_rewrite else: local_output_path = dataset.get_file_name() remote_path = self.path_mapper.remote_output_path_rewrite(local_output_path) return remote_path def input_extra_files_rewrite(self, dataset): input_path_rewrite = self.input_path_rewrite(dataset) remote_extra_files_path_rewrite = dataset_path_to_extra_path(input_path_rewrite) self.path_rewrites_input_extra[dataset.extra_files_path] = remote_extra_files_path_rewrite return remote_extra_files_path_rewrite def output_extra_files_rewrite(self, dataset): output_path_rewrite = self.output_path_rewrite(dataset) remote_extra_files_path_rewrite = dataset_path_to_extra_path(output_path_rewrite) return remote_extra_files_path_rewrite def input_metadata_rewrite(self, dataset, metadata_val): # May technically be incorrect to not pass through local_path_config.input_metadata_rewrite # first but that adds untested logic that wouln't ever be used. remote_input_path = self.path_mapper.remote_input_path_rewrite( metadata_val, client_input_path_type=CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH ) if remote_input_path: log.info(f"input_metadata_rewrite is {remote_input_path} from {metadata_val}") self.path_rewrites_input_metadata[metadata_val] = remote_input_path return remote_input_path # No rewrite... return None def unstructured_path_rewrite(self, parameter_value): path_rewrites_unstructured = self.path_rewrites_unstructured if parameter_value in path_rewrites_unstructured: # Path previously mapped, use previous mapping. return path_rewrites_unstructured[parameter_value] rewrite, new_unstructured_path_rewrites = self.path_mapper.check_for_arbitrary_rewrite(parameter_value) if rewrite: path_rewrites_unstructured.update(new_unstructured_path_rewrites) return rewrite else: # Did not need to rewrite, use original path or value. return None def working_directory(self): return self._working_directory def env_config_directory(self): return self.config_directory() def config_directory(self): return self._config_directory def new_file_path(self): return self.working_directory() # Problems with doing this? def sep(self): return self._sep def version_path(self): return self._version_path def tool_directory(self): return self._tool_dir def home_directory(self): return self._target_to_directory(self.job_wrapper.home_target) def tmp_directory(self): return self._target_to_directory(self.job_wrapper.tmp_target) def _target_to_directory(self, target): tmp_dir = self._tmp_dir if target is None or (target == "job_tmp_if_explicit" and tmp_dir is None): return None elif target in ["job_tmp", "job_tmp_if_explicit"]: return "$_GALAXY_JOB_TMP_DIR" elif target == "shared_home": return self._shared_home_dir elif target == "job_home": return "$_GALAXY_JOB_HOME_DIR" elif target == "pwd": os.path.join(self.working_directory(), "working") else: raise Exception(f"Unknown target type [{target}]") def galaxy_url(self): return self.job_wrapper.get_destination_configuration("galaxy_infrastructure_url") def get_file_sources_dict(self): return self.job_wrapper.job_io.file_sources_dict class UnsupportedPulsarException(Exception): def __init__(self, needed): super().__init__(UPGRADE_PULSAR_ERROR % needed)