This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.job_execution.ports.view

import logging

from galaxy import (
from galaxy.exceptions import (

log = logging.getLogger(__name__)

[docs]class JobPortsView:
[docs] def __init__(self, app): self._app = app
[docs] def register_container_information(self, job_id, **kwd): job = self.__authorize_job_access(job_id, **kwd) container_runtime = kwd.get("container_runtime") log.info(kwd) self._app.interactivetool_manager.configure_entry_points(job, container_runtime) return {"message": "ok"}
# Copy/paste from JobFilesView - TODO: de-duplicate. def __authorize_job_access(self, encoded_job_id, **kwargs): if (key := "job_key") not in kwargs: error_message = f"Job files action requires a valid '{key}'." raise ObjectAttributeMissingException(error_message) job_id = self._security.decode_id(encoded_job_id) job_key = self._security.encode_id(job_id, kind="jobs_files") if not util.safe_str_cmp(kwargs["job_key"], job_key): raise ItemAccessibilityException("Invalid job_key supplied.") # Verify job is active. Don't update the contents of complete jobs. sa_session = self._app.model.session job = sa_session.query(model.Job).get(job_id) if not job.running: error_message = "Attempting to read or modify the files of a job that has already completed." raise ItemAccessibilityException(error_message) return job @property def _security(self): return self._app.security