Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.services.jobs

from enum import Enum
from typing import (
    Any,
    Dict,
)

from galaxy import (
    exceptions,
    model,
)
from galaxy.managers import hdas
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.jobs import (
    JobManager,
    JobSearch,
    view_show_job,
)
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.schema import JobIndexQueryPayload


class JobIndexViewEnum(str, Enum):
    """Names of the views that a job index request may ask for.

    The ``str`` mixin makes each member compare equal to its plain string
    value, so a member can be used wherever the raw view name is expected.
    """

    # Default view used by JobIndexPayload.
    collection = "collection"
    # View that JobsService.index only serves to admin users.
    admin_job_list = "admin_job_list"
class JobIndexPayload(JobIndexQueryPayload):
    """Job index query payload extended with the view used to render each job."""

    # View applied to every job in the listing; falls back to ``collection``
    # when the request does not specify one.
    view: JobIndexViewEnum = JobIndexViewEnum.collection
class JobsService:
    """Service object exposing the ``show`` and ``index`` job operations."""

    job_manager: JobManager
    job_search: JobSearch
    hda_manager: hdas.HDAManager

    def __init__(
        self,
        job_manager: JobManager,
        job_search: JobSearch,
        hda_manager: hdas.HDAManager,
    ):
        """Store the collaborating managers on the service instance."""
        self.job_manager = job_manager
        self.job_search = job_search
        self.hda_manager = hda_manager

    def show(
        self,
        trans: ProvidesUserContext,
        id: DecodedDatabaseIdField,
        full: bool = False,
    ) -> Dict[str, Any]:
        """Return the dictionary representation of a single job.

        :param trans: request context passed through to the job manager.
        :param id: decoded database id of the job to show.
        :param full: forwarded (coerced to ``bool``) to ``view_show_job``.
        :returns: whatever ``view_show_job`` produces for the job resolved by
            ``JobManager.get_accessible_job``.
        """
        job = self.job_manager.get_accessible_job(trans, id)
        return view_show_job(trans, job, bool(full))

    def index(
        self,
        trans: ProvidesUserContext,
        payload: JobIndexPayload,
    ):
        """Return the list of jobs matching ``payload``, serialized per view.

        Each job is converted with ``job.to_dict(view, system_details=is_admin)``
        and its ids are encoded with ``trans.security.encode_all_ids``. For the
        ``admin_job_list`` view the raw ``job.id`` is added under
        ``decoded_job_id``; when ``payload.user_details`` is set, the job
        owner's email is added under ``user_email``.

        :param trans: request context supplying ``security`` and
            ``user_is_admin``.
        :param payload: query parameters; ``user_details`` is forced to
            ``True`` for the ``admin_job_list`` view.
        :returns: list of encoded job dictionaries.
        :raises exceptions.AdminRequiredException: if the ``admin_job_list``
            view is requested by a non-admin user.
        """
        security = trans.security
        is_admin = trans.user_is_admin
        if payload.view == JobIndexViewEnum.admin_job_list:
            # Reject unauthorized requests before touching the payload so a
            # failed call leaves the caller's object unmodified.
            if not is_admin:
                raise exceptions.AdminRequiredException("Only admins can use the admin_job_list view")
            payload.user_details = True
        user_details = payload.user_details

        query = self.job_manager.index_query(trans, payload)
        view = payload.view
        # Loop-invariant: decide once whether raw job ids are exposed.
        is_admin_view = view == JobIndexViewEnum.admin_job_list

        out = []
        # yield_per streams rows in batches of model.YIELD_PER_ROWS.
        for job in query.yield_per(model.YIELD_PER_ROWS):
            job_dict = job.to_dict(view, system_details=is_admin)
            encoded = security.encode_all_ids(job_dict, True)
            if is_admin_view:
                encoded["decoded_job_id"] = job.id
            if user_details:
                encoded["user_email"] = job.get_user_email()
            out.append(encoded)
        return out