Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.celery
import os
from functools import (
lru_cache,
wraps,
)
from celery import (
Celery,
shared_task,
)
from kombu import serialization
from lagom import magic_bind_to_container
from galaxy.config import Configuration
from galaxy.main_config import find_config
from galaxy.util.custom_logging import get_logger
from galaxy.util.properties import load_app_properties
from ._serialization import schema_dumps, schema_loads
log = get_logger(__name__)
[docs]@lru_cache(maxsize=1)
def get_galaxy_app():
import galaxy.app
if galaxy.app.app:
return galaxy.app.app
kwargs = get_app_properties()
if kwargs:
kwargs['check_migrate_databases'] = False
galaxy_app = galaxy.app.GalaxyManagerApplication(configure_logging=False, **kwargs)
return galaxy_app
[docs]@lru_cache(maxsize=1)
def get_app_properties():
config_file = os.environ.get("GALAXY_CONFIG_FILE")
galaxy_root_dir = os.environ.get('GALAXY_ROOT_DIR')
if not config_file and galaxy_root_dir:
config_file = find_config(config_file, galaxy_root_dir)
if config_file:
properties = load_app_properties(
config_file=os.path.abspath(config_file),
config_section='galaxy',
)
if galaxy_root_dir:
properties['root_dir'] = galaxy_root_dir
return properties
[docs]@lru_cache(maxsize=1)
def get_config():
kwargs = get_app_properties()
if kwargs:
kwargs['override_tempdir'] = False
return Configuration(**kwargs)
[docs]def get_history_audit_table_prune_interval():
config = get_config()
if config:
return config.history_audit_table_prune_interval
else:
return 3600
broker = get_broker()
celery_app = Celery('galaxy', broker=broker, include=['galaxy.celery.tasks'])
celery_app.set_default()
prune_interval = get_history_audit_table_prune_interval()
if prune_interval > 0:
celery_app.conf.beat_schedule = {
'prune-history-audit-table': {
'task': 'galaxy.celery.tasks.prune_history_audit_table',
'schedule': prune_interval,
},
}
celery_app.conf.timezone = 'UTC'
CELERY_TASKS = []
PYDANTIC_AWARE_SERIALIER_NAME = 'pydantic-aware-json'
serialization.register(
PYDANTIC_AWARE_SERIALIER_NAME,
encoder=schema_dumps,
decoder=schema_loads,
content_type='application/json'
)
[docs]def galaxy_task(*args, **celery_task_kwd):
if 'serializer' not in celery_task_kwd:
celery_task_kwd['serializer'] = PYDANTIC_AWARE_SERIALIER_NAME
def decorate(func):
CELERY_TASKS.append(func.__name__)
@shared_task(**celery_task_kwd)
@wraps(func)
def wrapper(*args, **kwds):
app = get_galaxy_app()
assert app
return magic_bind_to_container(app)(func)(*args, **kwds)
return wrapper
if len(args) == 1 and callable(args[0]):
return decorate(args[0])
else:
return decorate
if __name__ == '__main__':
celery_app.start()