Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.queues
"""
All message queues used by Galaxy
"""
import datetime
import logging
import socket
from typing import (
Optional,
TYPE_CHECKING,
)
from kombu import (
Connection,
Exchange,
Queue,
)
from sqlalchemy import select
from galaxy.model import WorkerProcess
from galaxy.model.orm.now import now
if TYPE_CHECKING:
from galaxy.web_stack import ApplicationStack
log = logging.getLogger(__name__)
ALL_CONTROL = "control.*"
galaxy_exchange = Exchange("galaxy_core_exchange", type="topic")
DEFAULT_ACTIVE_PROCESS_WINDOW_SECONDS = 120
# Matches WorkerProcess.app_type set by DatabaseHeartbeat for webapp processes.
WEBAPP_APP_TYPE = "webapp"
[docs]
def all_control_queues_for_declare(application_stack: "ApplicationStack", webapp_only: bool = False) -> list[Queue]:
"""
For in-memory routing (used by sqlalchemy-based transports), we need to be able to
build the entire routing table in producers.
Queries ``WorkerProcess`` directly rather than going through
``DatabaseHeartbeat`` so this works from Celery workers too — they have a
``model`` but no heartbeat thread. Without this, a notification created in
a Celery task publishes a ``notify_users`` control task with an empty
``declare`` list, so on the sqlalchemy+sqlite kombu transport the message
never lands in a web worker's queue.
When ``webapp_only`` is True, only returns queues for processes that have
registered themselves with ``app_type='webapp'``. This is what the SSE
dispatcher wants: job handlers and workflow schedulers have no browser
connections, so routing SSE events to them is wasted work.
"""
app = application_stack.app
try:
stmt = select(WorkerProcess).where(
WorkerProcess.update_time > now() - datetime.timedelta(seconds=DEFAULT_ACTIVE_PROCESS_WINDOW_SECONDS)
)
if webapp_only:
stmt = stmt.where(WorkerProcess.app_type == WEBAPP_APP_TYPE)
with app.model.new_session() as session:
processes = session.scalars(stmt).all()
except Exception:
log.debug("Failed to look up active processes for control-queue declare", exc_info=True)
return []
return [Queue(f"control.{p.server_name}@{p.hostname}", galaxy_exchange, routing_key="control.*") for p in processes]
[docs]
def control_queues_from_config(config):
"""
Returns a Queue instance with the correct name and routing key for this
galaxy process's config
"""
hostname = socket.gethostname()
process_name = f"{config.server_name}@{hostname}"
exchange_queue = Queue(f"control.{process_name}", galaxy_exchange, routing_key="control.*")
non_exchange_queue = Queue(f"control.{process_name}", routing_key=f"control.{process_name}")
return exchange_queue, non_exchange_queue
[docs]
def connection_from_config(config) -> Optional[Connection]:
if config.amqp_internal_connection:
return Connection(config.amqp_internal_connection)
else:
return None