Source code for galaxy.celery.tasks

import json
from concurrent.futures import TimeoutError
from functools import lru_cache
from pathlib import Path
from typing import (
    Any,
    Callable,
    Optional,
)

from sqlalchemy import (
    exists,
    select,
)

from galaxy import model
from galaxy.celery import (
    celery_app,
    galaxy_task,
)
from galaxy.config import GalaxyAppConfiguration
from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry as DatatypesRegistry
from galaxy.jobs import MinimalJobWrapper
from galaxy.managers.collections import DatasetCollectionManager
from galaxy.managers.datasets import (
    DatasetAssociationManager,
    DatasetManager,
)
from galaxy.managers.hdas import HDAManager
from galaxy.managers.lddas import LDDAManager
from galaxy.managers.markdown_util import generate_branded_pdf
from galaxy.managers.model_stores import ModelStoreManager
from galaxy.managers.tool_data import ToolDataImportManager
from galaxy.metadata.set_metadata import set_metadata_portable
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.queue_worker import GalaxyQueueWorker
from galaxy.schema.tasks import (
    ComputeDatasetHashTaskRequest,
    GenerateHistoryContentDownload,
    GenerateHistoryDownload,
    GenerateInvocationDownload,
    GeneratePdfDownload,
    ImportModelStoreTaskRequest,
    MaterializeDatasetInstanceTaskRequest,
    PrepareDatasetCollectionDownload,
    SetupHistoryExportJob,
    WriteHistoryContentTo,
    WriteHistoryTo,
    WriteInvocationTo,
)
from galaxy.structured_app import MinimalManagerApp
from galaxy.tools import create_tool_from_representation
from galaxy.tools.data_fetch import do_fetch
from galaxy.util import galaxy_directory
from galaxy.util.custom_logging import get_logger
from galaxy.web.short_term_storage import ShortTermStorageMonitor

log = get_logger(__name__)


@lru_cache()
def setup_data_table_manager(app):
    app._configure_tool_data_tables(from_shed_config=False)
@lru_cache()
def cached_create_tool_from_representation(app, raw_tool_source):
    return create_tool_from_representation(
        app=app, raw_tool_source=raw_tool_source, tool_dir="", tool_source_class="XmlToolSource"
    )
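
Both helpers above are memoized with functools.lru_cache, so each worker process pays the setup cost only once per distinct (hashable) argument tuple; repeated tasks reusing the same raw tool source get the cached tool back. A minimal standalone illustration of the same idea, with a hypothetical expensive parser standing in for create_tool_from_representation:

    from functools import lru_cache

    @lru_cache()
    def parse_tool(raw_tool_source: str):
        # The expensive parse happens once per distinct source string
        # per process; later calls return the cached object.
        print("parsing...")
        return {"source": raw_tool_source}

    parse_tool("<tool/>")  # prints "parsing..."
    parse_tool("<tool/>")  # cache hit, nothing printed
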
@galaxy_task(ignore_result=True, action="recalculate a user's disk usage") def recalculate_user_disk_usage(session: galaxy_scoped_session, user_id: Optional[int] = None): if user_id: user = session.query(model.User).get(user_id) if user: user.calculate_and_set_disk_usage() log.info(f"New user disk usage is {user.disk_usage}") else: log.error(f"Recalculate user disk usage task failed, user {user_id} not found") else: log.error("Recalculate user disk usage task received without user_id.") @galaxy_task(ignore_result=True, action="purge a history dataset") def purge_hda(hda_manager: HDAManager, hda_id: int): hda = hda_manager.by_id(hda_id) hda_manager._purge(hda) @galaxy_task(ignore_result=True, action="materializing dataset instance") def materialize( hda_manager: HDAManager, request: MaterializeDatasetInstanceTaskRequest, ): """Materialize datasets using HDAManager.""" hda_manager.materialize(request) @galaxy_task(action="set metadata for job") def set_job_metadata( tool_job_working_directory, extended_metadata_collection: bool, job_id: int, sa_session: galaxy_scoped_session, ) -> None: return abort_when_job_stops( set_metadata_portable, session=sa_session, job_id=job_id, tool_job_working_directory=tool_job_working_directory, extended_metadata_collection=extended_metadata_collection, ) @galaxy_task(action="set or detect dataset datatype and updates metadata") def change_datatype( hda_manager: HDAManager, ldda_manager: LDDAManager, datatypes_registry: DatatypesRegistry, sa_session: galaxy_scoped_session, dataset_id: int, datatype: str, model_class: str = "HistoryDatasetAssociation", ): manager = _get_dataset_manager(hda_manager, ldda_manager, model_class) dataset_instance = manager.by_id(dataset_id) can_change_datatype = manager.ensure_can_change_datatype(dataset_instance, raiseException=False) if not can_change_datatype: log.info(f"Changing datatype is not allowed for {model_class} {dataset_instance.id}") return if datatype == "auto": path = dataset_instance.dataset.file_name datatype = sniff.guess_ext(path, datatypes_registry.sniff_order) datatypes_registry.change_datatype(dataset_instance, datatype) sa_session.flush() set_metadata(hda_manager, ldda_manager, sa_session, dataset_id, model_class) @galaxy_task(action="touch update_time of object") def touch(sa_session: galaxy_scoped_session, item_id: int, model_class: str = "HistoryDatasetCollectionAssociation"): if model_class != "HistoryDatasetCollectionAssociation": raise NotImplementedError(f"touch method not implemented for '{model_class}'") item = sa_session.query(model.HistoryDatasetCollectionAssociation).filter_by(id=item_id).one() item.touch() sa_session.flush() @galaxy_task(action="set dataset association metadata") def set_metadata( hda_manager: HDAManager, ldda_manager: LDDAManager, sa_session: galaxy_scoped_session, dataset_id: int, model_class: str = "HistoryDatasetAssociation", overwrite: bool = True, ): manager = _get_dataset_manager(hda_manager, ldda_manager, model_class) dataset_instance = manager.by_id(dataset_id) can_set_metadata = manager.ensure_can_set_metadata(dataset_instance, raiseException=False) if not can_set_metadata: log.info(f"Setting metadata is not allowed for {model_class} {dataset_instance.id}") return try: if overwrite: hda_manager.overwrite_metadata(dataset_instance) dataset_instance.datatype.set_meta(dataset_instance) dataset_instance.set_peek() dataset_instance.dataset.state = dataset_instance.dataset.states.OK except Exception as e: log.info(f"Setting metadata failed on {model_class} 
{dataset_instance.id}: {str(e)}") dataset_instance.dataset.state = dataset_instance.dataset.states.FAILED_METADATA sa_session.flush() def _get_dataset_manager( hda_manager: HDAManager, ldda_manager: LDDAManager, model_class: str = "HistoryDatasetAssociation" ) -> DatasetAssociationManager: if model_class == "HistoryDatasetAssociation": return hda_manager elif model_class == "LibraryDatasetDatasetAssociation": return ldda_manager else: raise NotImplementedError(f"Cannot find manager for model_class {model_class}") @galaxy_task(bind=True) def setup_fetch_data( self, job_id: int, raw_tool_source: str, app: MinimalManagerApp, sa_session: galaxy_scoped_session ): tool = cached_create_tool_from_representation(app=app, raw_tool_source=raw_tool_source) job = sa_session.query(model.Job).get(job_id) # self.request.hostname is the actual worker name given by the `-n` argument, not the hostname as you might think. job.handler = self.request.hostname job.job_runner_name = "celery" # TODO: assert state mini_job_wrapper = MinimalJobWrapper(job=job, app=app, tool=tool) mini_job_wrapper.change_state(model.Job.states.QUEUED, flush=False, job=job) # Set object store after job destination so can leverage parameters... mini_job_wrapper._set_object_store_ids(job) request_json = Path(mini_job_wrapper.working_directory) / "request.json" request_json_value = next(iter(p.value for p in job.parameters if p.name == "request_json")) request_json.write_text(json.loads(request_json_value)) mini_job_wrapper.setup_external_metadata( output_fnames=mini_job_wrapper.job_io.get_output_fnames(), set_extension=True, tmp_dir=mini_job_wrapper.working_directory, # We don't want to overwrite metadata that was copied over in init_meta(), as per established behavior kwds={"overwrite": False}, ) mini_job_wrapper.prepare() return mini_job_wrapper.working_directory, str(request_json), mini_job_wrapper.job_io.file_sources_dict @galaxy_task def finish_job(job_id: int, raw_tool_source: str, app: MinimalManagerApp, sa_session: galaxy_scoped_session): tool = cached_create_tool_from_representation(app=app, raw_tool_source=raw_tool_source) job = sa_session.query(model.Job).get(job_id) # TODO: assert state ? mini_job_wrapper = MinimalJobWrapper(job=job, app=app, tool=tool) mini_job_wrapper.finish("", "")
def is_aborted(session: galaxy_scoped_session, job_id: int):
    return session.execute(
        select(
            exists(model.Job.state).where(
                model.Job.id == job_id,
                model.Job.state.in_(
                    [model.Job.states.DELETED, model.Job.states.DELETED_NEW, model.Job.states.DELETING]
                ),
            )
        )
    ).scalar()
def abort_when_job_stops(function: Callable, session: galaxy_scoped_session, job_id: int, **kwargs) -> Any:
    if not is_aborted(session, job_id):
        future = celery_app.fork_pool.submit(
            function,
            timeout=None,
            **kwargs,
        )
        while True:
            try:
                return future.result(timeout=1)
            except TimeoutError:
                if is_aborted(session, job_id):
                    return
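
The pair above implements a cooperative-abort pattern: the wrapped function runs in a separate pool process while the Celery task polls the database once per second for a deleted/deleting job state, and stops waiting if the job was cancelled. A minimal standalone sketch of the same pattern, assuming a plain ProcessPoolExecutor in place of Galaxy's celery_app.fork_pool (worker_pool and is_cancelled here are hypothetical stand-ins, and ProcessPoolExecutor.submit takes no timeout argument, unlike fork_pool.submit above):

    from concurrent.futures import ProcessPoolExecutor, TimeoutError
    from typing import Any, Callable

    # One long-lived pool per worker process, analogous to celery_app.fork_pool.
    worker_pool = ProcessPoolExecutor(max_workers=1)

    def run_until_cancelled(function: Callable, is_cancelled: Callable[[], bool], **kwargs) -> Any:
        if is_cancelled():
            return None
        future = worker_pool.submit(function, **kwargs)
        while True:
            try:
                # Wake up once per second to re-check the cancellation flag.
                return future.result(timeout=1)
            except TimeoutError:
                if is_cancelled():
                    # Stop waiting; the already-running child call is simply
                    # abandoned, mirroring abort_when_job_stops above.
                    return None
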
def _fetch_data(setup_return):
    tool_job_working_directory, request_path, file_sources_dict = setup_return
    working_directory = Path(tool_job_working_directory) / "working"
    datatypes_registry = DatatypesRegistry()
    datatypes_registry.load_datatypes(
        galaxy_directory,
        config=Path(tool_job_working_directory) / "metadata" / "registry.xml",
        use_build_sites=False,
        use_converters=False,
        use_display_applications=False,
    )
    do_fetch(
        request_path=request_path,
        working_directory=str(working_directory),
        registry=datatypes_registry,
        file_sources_dict=file_sources_dict,
    )
    return tool_job_working_directory


@galaxy_task(action="Run fetch_data")
def fetch_data(
    setup_return,
    job_id: int,
    app: MinimalManagerApp,
    sa_session: galaxy_scoped_session,
) -> str:
    job = sa_session.query(model.Job).get(job_id)
    mini_job_wrapper = MinimalJobWrapper(job=job, app=app)
    mini_job_wrapper.change_state(model.Job.states.RUNNING, flush=True, job=job)
    return abort_when_job_stops(_fetch_data, session=sa_session, job_id=job_id, setup_return=setup_return)


@galaxy_task(ignore_result=True, action="setting up export history job")
def export_history(
    model_store_manager: ModelStoreManager,
    request: SetupHistoryExportJob,
):
    model_store_manager.setup_history_export_job(request)


@galaxy_task(action="preparing compressed file for collection download")
def prepare_dataset_collection_download(
    request: PrepareDatasetCollectionDownload,
    collection_manager: DatasetCollectionManager,
):
    """Create a short term storage file tracked and available for download of target collection."""
    collection_manager.write_dataset_collection(request)


@galaxy_task(action="preparing Galaxy Markdown PDF for download")
def prepare_pdf_download(
    request: GeneratePdfDownload, config: GalaxyAppConfiguration, short_term_storage_monitor: ShortTermStorageMonitor
):
    """Create a short term storage file tracked and available for download of target PDF for Galaxy Markdown."""
    generate_branded_pdf(request, config, short_term_storage_monitor)


@galaxy_task(action="generate and stage a history model store for download")
def prepare_history_download(
    model_store_manager: ModelStoreManager,
    request: GenerateHistoryDownload,
):
    model_store_manager.prepare_history_download(request)


@galaxy_task(action="generate and stage a history content model store for download")
def prepare_history_content_download(
    model_store_manager: ModelStoreManager,
    request: GenerateHistoryContentDownload,
):
    model_store_manager.prepare_history_content_download(request)


@galaxy_task(action="generate and stage a workflow invocation store for download")
def prepare_invocation_download(
    model_store_manager: ModelStoreManager,
    request: GenerateInvocationDownload,
):
    model_store_manager.prepare_invocation_download(request)


@galaxy_task(action="generate and stage a workflow invocation store to file source URI")
def write_invocation_to(
    model_store_manager: ModelStoreManager,
    request: WriteInvocationTo,
):
    model_store_manager.write_invocation_to(request)


@galaxy_task(action="generate and stage a history store to file source URI")
def write_history_to(
    model_store_manager: ModelStoreManager,
    request: WriteHistoryTo,
):
    model_store_manager.write_history_to(request)


@galaxy_task(action="generate and stage a history content model store to file source URI")
def write_history_content_to(
    model_store_manager: ModelStoreManager,
    request: WriteHistoryContentTo,
):
    model_store_manager.write_history_content_to(request)


@galaxy_task(action="import objects from a target model store")
def import_model_store(
    model_store_manager: ModelStoreManager,
    request: ImportModelStoreTaskRequest,
):
    model_store_manager.import_model_store(request)


@galaxy_task(action="compute dataset hash and store in database")
def compute_dataset_hash(
    dataset_manager: DatasetManager,
    request: ComputeDatasetHashTaskRequest,
):
    dataset_manager.compute_hash(request)


@galaxy_task(action="import a data bundle")
def import_data_bundle(
    app: MinimalManagerApp,
    hda_manager: HDAManager,
    ldda_manager: LDDAManager,
    tool_data_import_manager: ToolDataImportManager,
    config: GalaxyAppConfiguration,
    src: str,
    uri: Optional[str] = None,
    id: Optional[int] = None,
    tool_data_file_path: Optional[str] = None,
):
    setup_data_table_manager(app)
    if src == "uri":
        assert uri
        tool_data_import_manager.import_data_bundle_by_uri(config, uri, tool_data_file_path=tool_data_file_path)
    else:
        assert id
        dataset: model.DatasetInstance
        if src == "hda":
            dataset = hda_manager.by_id(id)
        else:
            dataset = ldda_manager.by_id(id)
        tool_data_import_manager.import_data_bundle_by_dataset(config, dataset, tool_data_file_path=tool_data_file_path)
    queue_worker = GalaxyQueueWorker(app)
    queue_worker.send_control_task("reload_tool_data_tables")


@galaxy_task(action="pruning history audit table")
def prune_history_audit_table(sa_session: galaxy_scoped_session):
    """Prune ever growing history_audit table."""
    model.HistoryAudit.prune(sa_session)


@galaxy_task(action="clean up short term storage")
def cleanup_short_term_storage(storage_monitor: ShortTermStorageMonitor):
    """Cleanup short term storage."""
    storage_monitor.cleanup()
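
Nothing on this page dispatches these tasks itself. Assuming galaxy_task wraps each function as a standard Celery task (the ignore_result flag passed through to the decorator suggests Celery's task machinery underneath) and that the manager/session parameters are injected by the decorator at execution time rather than supplied by the caller, code elsewhere in Galaxy could enqueue work with the usual Celery API. A hypothetical dispatch sketch, not taken from Galaxy's actual call sites:

    from galaxy.celery.tasks import prune_history_audit_table, recalculate_user_disk_usage

    # Fire-and-forget: the task is declared with ignore_result=True, so no
    # result-backend round-trip is expected.
    recalculate_user_disk_usage.delay(user_id=42)

    # Signatures compose as usual, e.g. for periodic scheduling via celery beat.
    prune_history_audit_table.si().apply_async()
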