import json
from concurrent.futures import TimeoutError
from functools import lru_cache
from pathlib import Path
from typing import (
Any,
Callable,
Optional,
)
from sqlalchemy import (
exists,
select,
)
from galaxy import model
from galaxy.celery import (
celery_app,
galaxy_task,
)
from galaxy.config import GalaxyAppConfiguration
from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry as DatatypesRegistry
from galaxy.jobs import MinimalJobWrapper
from galaxy.managers.collections import DatasetCollectionManager
from galaxy.managers.datasets import (
DatasetAssociationManager,
DatasetManager,
)
from galaxy.managers.hdas import HDAManager
from galaxy.managers.lddas import LDDAManager
from galaxy.managers.markdown_util import generate_branded_pdf
from galaxy.managers.model_stores import ModelStoreManager
from galaxy.managers.notification import NotificationManager
from galaxy.managers.tool_data import ToolDataImportManager
from galaxy.metadata.set_metadata import set_metadata_portable
from galaxy.model import (
Job,
User,
)
from galaxy.model.base import transaction
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.objectstore import BaseObjectStore
from galaxy.objectstore.caching import check_caches
from galaxy.queue_worker import GalaxyQueueWorker
from galaxy.schema.notifications import NotificationCreateRequest
from galaxy.schema.tasks import (
ComputeDatasetHashTaskRequest,
GenerateHistoryContentDownload,
GenerateHistoryDownload,
GenerateInvocationDownload,
GeneratePdfDownload,
ImportModelStoreTaskRequest,
MaterializeDatasetInstanceTaskRequest,
PrepareDatasetCollectionDownload,
PurgeDatasetsTaskRequest,
SetupHistoryExportJob,
WriteHistoryContentTo,
WriteHistoryTo,
WriteInvocationTo,
)
from galaxy.short_term_storage import ShortTermStorageMonitor
from galaxy.structured_app import MinimalManagerApp
from galaxy.tools import create_tool_from_representation
from galaxy.tools.data_fetch import do_fetch
from galaxy.util import galaxy_directory
from galaxy.util.custom_logging import get_logger
log = get_logger(__name__)
@lru_cache
def setup_data_table_manager(app):
app._configure_tool_data_tables(from_shed_config=False)
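
@lru_cache
def cached_create_tool_from_representation(app, raw_tool_source):
    # Tool parsing is relatively expensive; memoize per raw tool source so the
    # fetch/finish tasks below don't re-parse the same tool repeatedly.
    return create_tool_from_representation(
        app=app, raw_tool_source=raw_tool_source, tool_source_class="XmlToolSource"
    )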
@galaxy_task(action="recalculate a user's disk usage")
def recalculate_user_disk_usage(
session: galaxy_scoped_session, object_store: BaseObjectStore, task_user_id: Optional[int] = None
):
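    """Recalculate and persist the given user's total disk usage."""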
if task_user_id:
user = session.get(User, task_user_id)
if user:
user.calculate_and_set_disk_usage(object_store)
else:
log.error(f"Recalculate user disk usage task failed, user {task_user_id} not found")
else:
log.error("Recalculate user disk usage task received without user_id.")
@galaxy_task(ignore_result=True, action="purge a history dataset")
def purge_hda(hda_manager: HDAManager, hda_id: int, task_user_id: Optional[int] = None):
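    """Purge a history dataset association (and its underlying dataset, where possible)."""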
hda = hda_manager.by_id(hda_id)
hda_manager._purge(hda)
@galaxy_task(ignore_result=True, action="completely removes a set of datasets from the object_store")
def purge_datasets(
dataset_manager: DatasetManager, request: PurgeDatasetsTaskRequest, task_user_id: Optional[int] = None
):
dataset_manager.purge_datasets(request)
@galaxy_task(ignore_result=True, action="materializing dataset instance")
def materialize(
hda_manager: HDAManager,
request: MaterializeDatasetInstanceTaskRequest,
task_user_id: Optional[int] = None,
):
"""Materialize datasets using HDAManager."""
hda_manager.materialize(request)
@galaxy_task(action="set metadata for job")
def set_job_metadata(
tool_job_working_directory,
extended_metadata_collection: bool,
job_id: int,
sa_session: galaxy_scoped_session,
task_user_id: Optional[int] = None,
) -> None:
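    """Set metadata for a job's outputs, aborting early if the job is stopped."""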
return abort_when_job_stops(
set_metadata_portable,
session=sa_session,
job_id=job_id,
tool_job_working_directory=tool_job_working_directory,
extended_metadata_collection=extended_metadata_collection,
)
@galaxy_task(action="set or detect dataset datatype and updates metadata")
def change_datatype(
hda_manager: HDAManager,
ldda_manager: LDDAManager,
datatypes_registry: DatatypesRegistry,
sa_session: galaxy_scoped_session,
dataset_id: int,
datatype: str,
model_class: str = "HistoryDatasetAssociation",
task_user_id: Optional[int] = None,
):
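    """Set a dataset's datatype (sniffing it when "auto") and refresh its metadata."""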
manager = _get_dataset_manager(hda_manager, ldda_manager, model_class)
dataset_instance = manager.by_id(dataset_id)
can_change_datatype = manager.ensure_can_change_datatype(dataset_instance, raiseException=False)
if not can_change_datatype:
log.info(f"Changing datatype is not allowed for {model_class} {dataset_instance.id}")
return
if datatype == "auto":
path = dataset_instance.dataset.get_file_name()
datatype = sniff.guess_ext(path, datatypes_registry.sniff_order)
datatypes_registry.change_datatype(dataset_instance, datatype)
with transaction(sa_session):
sa_session.commit()
set_metadata(hda_manager, ldda_manager, sa_session, dataset_id, model_class)
@galaxy_task(action="touch update_time of object")
def touch(
sa_session: galaxy_scoped_session,
item_id: int,
model_class: str = "HistoryDatasetCollectionAssociation",
task_user_id: Optional[int] = None,
):
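    """Update the update_time of a history dataset collection association."""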
if model_class != "HistoryDatasetCollectionAssociation":
raise NotImplementedError(f"touch method not implemented for '{model_class}'")
stmt = select(model.HistoryDatasetCollectionAssociation).filter_by(id=item_id)
item = sa_session.execute(stmt).scalar_one()
item.touch()
with transaction(sa_session):
sa_session.commit()
@galaxy_task(action="set dataset association metadata")
def set_metadata(
hda_manager: HDAManager,
ldda_manager: LDDAManager,
sa_session: galaxy_scoped_session,
dataset_id: int,
model_class: str = "HistoryDatasetAssociation",
overwrite: bool = True,
ensure_can_set_metadata: bool = True,
task_user_id: Optional[int] = None,
):
"""
ensure_can_set_metadata can be bypassed for new outputs.
"""
manager = _get_dataset_manager(hda_manager, ldda_manager, model_class)
dataset_instance = manager.by_id(dataset_id)
if ensure_can_set_metadata:
can_set_metadata = manager.ensure_can_set_metadata(dataset_instance, raiseException=False)
if not can_set_metadata:
log.info(f"Setting metadata is not allowed for {model_class} {dataset_instance.id}")
return
try:
if overwrite:
hda_manager.overwrite_metadata(dataset_instance)
dataset_instance.datatype.set_meta(dataset_instance) # type:ignore [arg-type]
dataset_instance.set_peek()
# Reset SETTING_METADATA state so the dataset instance getter picks the dataset state
dataset_instance.set_metadata_success_state()
except Exception as e:
log.info(f"Setting metadata failed on {model_class} {dataset_instance.id}: {str(e)}")
dataset_instance.state = dataset_instance.states.FAILED_METADATA
with transaction(sa_session):
sa_session.commit()
def _get_dataset_manager(
hda_manager: HDAManager, ldda_manager: LDDAManager, model_class: str = "HistoryDatasetAssociation"
) -> DatasetAssociationManager:
if model_class == "HistoryDatasetAssociation":
return hda_manager
elif model_class == "LibraryDatasetDatasetAssociation":
return ldda_manager
else:
raise NotImplementedError(f"Cannot find manager for model_class {model_class}")
@galaxy_task(bind=True)
def setup_fetch_data(
self,
job_id: int,
raw_tool_source: str,
app: MinimalManagerApp,
sa_session: galaxy_scoped_session,
task_user_id: Optional[int] = None,
):
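    """Queue a data-fetch job: write its request.json and prepare the working directory."""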
tool = cached_create_tool_from_representation(app=app, raw_tool_source=raw_tool_source)
job = sa_session.get(Job, job_id)
assert job
    # Despite its name, self.request.hostname holds the worker name passed via the `-n` argument, not the machine's hostname.
job.handler = self.request.hostname
job.job_runner_name = "celery"
# TODO: assert state
mini_job_wrapper = MinimalJobWrapper(job=job, app=app, tool=tool)
mini_job_wrapper.change_state(model.Job.states.QUEUED, flush=False, job=job)
    # Set object store IDs after the job destination is resolved so we can leverage destination parameters.
mini_job_wrapper._set_object_store_ids(job)
request_json = Path(mini_job_wrapper.working_directory) / "request.json"
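    # Job parameter values are stored JSON-encoded, so json.loads() yields the
    # raw request JSON string that gets written out verbatim.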
    request_json_value = next(p.value for p in job.parameters if p.name == "request_json")
request_json.write_text(json.loads(request_json_value))
mini_job_wrapper.setup_external_metadata(
output_fnames=mini_job_wrapper.job_io.get_output_fnames(),
set_extension=True,
tmp_dir=mini_job_wrapper.working_directory,
# We don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
kwds={"overwrite": False},
)
mini_job_wrapper.prepare()
return mini_job_wrapper.working_directory, str(request_json), mini_job_wrapper.job_io.file_sources_dict
@galaxy_task
def finish_job(
job_id: int,
raw_tool_source: str,
app: MinimalManagerApp,
sa_session: galaxy_scoped_session,
task_user_id: Optional[int] = None,
):
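    """Run the standard job-finish logic (output collection, final state) for a Celery-executed job."""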
tool = cached_create_tool_from_representation(app=app, raw_tool_source=raw_tool_source)
job = sa_session.get(Job, job_id)
assert job
# TODO: assert state ?
mini_job_wrapper = MinimalJobWrapper(job=job, app=app, tool=tool)
mini_job_wrapper.finish("", "")
def is_aborted(session: galaxy_scoped_session, job_id: int):
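    """Return whether the job has been deleted or is being deleted."""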
return session.execute(
select(
exists(model.Job.state).where(
model.Job.id == job_id,
model.Job.state.in_([model.Job.states.DELETED, model.Job.states.DELETING]),
)
)
).scalar()
def abort_when_job_stops(function: Callable, session: galaxy_scoped_session, job_id: int, **kwargs) -> Any:
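    """Run function in the fork pool, polling every second and cancelling it if the job is aborted."""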
if not is_aborted(session, job_id):
future = celery_app.fork_pool.submit(
function,
timeout=None,
**kwargs,
)
while True:
try:
return future.result(timeout=1)
except TimeoutError:
if is_aborted(session, job_id):
future.cancel()
return
def _fetch_data(setup_return):
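    """Build a per-job datatypes registry and run the data fetch in the working directory."""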
tool_job_working_directory, request_path, file_sources_dict = setup_return
working_directory = Path(tool_job_working_directory) / "working"
datatypes_registry = DatatypesRegistry()
datatypes_registry.load_datatypes(
        galaxy_directory(),
config=Path(tool_job_working_directory) / "metadata" / "registry.xml",
use_build_sites=False,
use_converters=False,
use_display_applications=False,
)
do_fetch(
request_path=request_path,
working_directory=str(working_directory),
registry=datatypes_registry,
file_sources_dict=file_sources_dict,
)
return tool_job_working_directory
@galaxy_task(action="Run fetch_data")
def fetch_data(
setup_return,
job_id: int,
app: MinimalManagerApp,
sa_session: galaxy_scoped_session,
task_user_id: Optional[int] = None,
) -> str:
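    """Mark the job as running and execute the fetch, aborting if the job is deleted meanwhile."""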
job = sa_session.get(Job, job_id)
assert job
mini_job_wrapper = MinimalJobWrapper(job=job, app=app)
mini_job_wrapper.change_state(model.Job.states.RUNNING, flush=True, job=job)
return abort_when_job_stops(_fetch_data, session=sa_session, job_id=job_id, setup_return=setup_return)
@galaxy_task(ignore_result=True, action="setting up export history job")
def export_history(
model_store_manager: ModelStoreManager,
request: SetupHistoryExportJob,
task_user_id: Optional[int] = None,
):
model_store_manager.setup_history_export_job(request)
@galaxy_task(action="preparing compressed file for collection download")
def prepare_dataset_collection_download(
request: PrepareDatasetCollectionDownload,
collection_manager: DatasetCollectionManager,
task_user_id: Optional[int] = None,
):
"""Create a short term storage file tracked and available for download of target collection."""
collection_manager.write_dataset_collection(request)
@galaxy_task(action="preparing Galaxy Markdown PDF for download")
def prepare_pdf_download(
request: GeneratePdfDownload,
config: GalaxyAppConfiguration,
short_term_storage_monitor: ShortTermStorageMonitor,
task_user_id: Optional[int] = None,
):
"""Create a short term storage file tracked and available for download of target PDF for Galaxy Markdown."""
generate_branded_pdf(request, config, short_term_storage_monitor)
@galaxy_task(action="generate and stage a history model store for download")
def prepare_history_download(
model_store_manager: ModelStoreManager,
request: GenerateHistoryDownload,
task_user_id: Optional[int] = None,
):
model_store_manager.prepare_history_download(request)
@galaxy_task(action="generate and stage a history content model store for download")
def prepare_history_content_download(
model_store_manager: ModelStoreManager,
request: GenerateHistoryContentDownload,
task_user_id: Optional[int] = None,
):
model_store_manager.prepare_history_content_download(request)
@galaxy_task(action="generate and stage a workflow invocation store for download")
def prepare_invocation_download(
model_store_manager: ModelStoreManager,
request: GenerateInvocationDownload,
task_user_id: Optional[int] = None,
):
model_store_manager.prepare_invocation_download(request)
@galaxy_task(action="generate and stage a workflow invocation store to file source URI")
def write_invocation_to(
model_store_manager: ModelStoreManager,
request: WriteInvocationTo,
task_user_id: Optional[int] = None,
):
model_store_manager.write_invocation_to(request)
@galaxy_task(action="generate and stage a history store to file source URI")
def write_history_to(
model_store_manager: ModelStoreManager,
request: WriteHistoryTo,
task_user_id: Optional[int] = None,
):
model_store_manager.write_history_to(request)
@galaxy_task(action="generate and stage a history content model store to file source URI")
def write_history_content_to(
model_store_manager: ModelStoreManager,
request: WriteHistoryContentTo,
task_user_id: Optional[int] = None,
):
model_store_manager.write_history_content_to(request)
@galaxy_task(action="import objects from a target model store")
def import_model_store(
model_store_manager: ModelStoreManager,
request: ImportModelStoreTaskRequest,
task_user_id: Optional[int] = None,
):
model_store_manager.import_model_store(request)
@galaxy_task(action="compute dataset hash and store in database")
def compute_dataset_hash(
dataset_manager: DatasetManager,
request: ComputeDatasetHashTaskRequest,
task_user_id: Optional[int] = None,
):
dataset_manager.compute_hash(request)
@galaxy_task(action="import a data bundle")
def import_data_bundle(
app: MinimalManagerApp,
hda_manager: HDAManager,
ldda_manager: LDDAManager,
tool_data_import_manager: ToolDataImportManager,
config: GalaxyAppConfiguration,
src: str,
uri: Optional[str] = None,
id: Optional[int] = None,
tool_data_file_path: Optional[str] = None,
task_user_id: Optional[int] = None,
):
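    """Import a data bundle from a URI or an existing dataset, then reload tool data tables."""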
setup_data_table_manager(app)
if src == "uri":
assert uri
tool_data_import_manager.import_data_bundle_by_uri(config, uri, tool_data_file_path=tool_data_file_path)
else:
assert id
dataset: model.DatasetInstance
if src == "hda":
dataset = hda_manager.by_id(id)
else:
dataset = ldda_manager.by_id(id)
tool_data_import_manager.import_data_bundle_by_dataset(config, dataset, tool_data_file_path=tool_data_file_path)
queue_worker = GalaxyQueueWorker(app)
queue_worker.send_control_task("reload_tool_data_tables")
@galaxy_task(action="pruning history audit table")
def prune_history_audit_table(sa_session: galaxy_scoped_session):
"""Prune ever growing history_audit table."""
model.HistoryAudit.prune(sa_session)
@galaxy_task(action="clean up short term storage")
def cleanup_short_term_storage(storage_monitor: ShortTermStorageMonitor):
"""Cleanup short term storage."""
storage_monitor.cleanup()
@galaxy_task(action="clean up expired notifications")
def cleanup_expired_notifications(notification_manager: NotificationManager):
"""Cleanup expired notifications."""
result = notification_manager.cleanup_expired_notifications()
log.info(
f"Successfully deleted {result.deleted_notifications_count} notifications and {result.deleted_associations_count} associations."
)
@galaxy_task(action="prune object store cache directories")
def clean_object_store_caches(object_store: BaseObjectStore):
check_caches(object_store.cache_targets())
@galaxy_task(action="send notifications to all recipients")
def send_notification_to_recipients_async(
request: NotificationCreateRequest, notification_manager: NotificationManager
):
"""Send a notification to a list of users."""
_, notifications_sent = notification_manager.send_notification_to_recipients(request=request)
log.info(f"Successfully sent {notifications_sent} notifications.")
@galaxy_task(action="dispatch pending notifications")
def dispatch_pending_notifications(notification_manager: NotificationManager):
"""Dispatch pending notifications."""
count = notification_manager.dispatch_pending_notifications_via_channels()
if count:
log.info(f"Successfully dispatched {count} notifications.")