Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.queues

"""

All message queues used by Galaxy

"""

import socket
from typing import Optional

from kombu import (
    Connection,
    Exchange,
    Queue,
)

ALL_CONTROL = "control.*"
galaxy_exchange = Exchange("galaxy_core_exchange", type="topic")


[docs]def all_control_queues_for_declare(application_stack): """ For in-memory routing (used by sqlalchemy-based transports), we need to be able to build the entire routing table in producers. """ # Get all active processes and construct queues for each process process_names = ( f"{p.server_name}@{p.hostname}" for p in application_stack.app.database_heartbeat.get_active_processes() ) return [Queue(f"control.{server_name}", galaxy_exchange, routing_key="control.*") for server_name in process_names]
[docs]def control_queues_from_config(config): """ Returns a Queue instance with the correct name and routing key for this galaxy process's config """ hostname = socket.gethostname() process_name = f"{config.server_name}@{hostname}" exchange_queue = Queue(f"control.{process_name}", galaxy_exchange, routing_key="control.*") non_exchange_queue = Queue(f"control.{process_name}", routing_key=f"control.{process_name}") return exchange_queue, non_exchange_queue
[docs]def connection_from_config(config) -> Optional[Connection]: if config.amqp_internal_connection: return Connection(config.amqp_internal_connection) else: return None