Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.api.job_files

""" API that asynchronous job running mechanisms can use to fetch or put files
related to running and queued jobs.
"""
import logging
import os
import shutil

from galaxy import (
    exceptions,
    model,
    util
)
from galaxy.web import (
    _future_expose_api_anonymous_and_sessionless as expose_api_anonymous_and_sessionless,
    _future_expose_api_raw_anonymous_and_sessionless as expose_api_raw_anonymous_and_sessionless
)
from galaxy.web.base.controller import BaseAPIController

# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)


class JobFilesAPIController(BaseAPIController):
    """ This job files controller allows remote job running mechanisms to
    read and modify the current state of files for queued and running jobs.
    It is certainly not meant to represent part of Galaxy's stable, user
    facing API.

    Furthermore, even if a user key corresponds to the user running the job,
    it should not be accepted for authorization - this API allows access to
    low-level unfiltered files and such authorization would break Galaxy's
    security model for tool execution.
    """

    @expose_api_raw_anonymous_and_sessionless
    def index(self, trans, job_id, **kwargs):
        """
        index( self, trans, job_id, **kwargs )
        * GET /api/jobs/{job_id}/files
            Get a file required for staging a job (proper datasets, extra
            inputs, task-split inputs, working directory files).

        :type   job_id: str
        :param  job_id: encoded id string of the job
        :type   path: str
        :param  path: Path to file.
        :type   job_key: str
        :param  job_key: A key used to authenticate this request as acting on
                         behalf of a job runner for the specified job.

        .. note::
            This API method is intended only for consumption by job runners,
            not end users.

        :rtype:     binary
        :returns:   contents of file
        """
        # Authorization also guarantees that "path" is present in kwargs.
        self.__authorize_job_access(trans, job_id, **kwargs)
        path = kwargs.get("path", None)
        # The open handle is returned as the raw response body, so it is
        # intentionally not closed here.
        return open(path, 'rb')

    @expose_api_anonymous_and_sessionless
    def create(self, trans, job_id, payload, **kwargs):
        """
        create( self, trans, job_id, payload, **kwargs )
        * POST /api/jobs/{job_id}/files
            Populate an output file (formal dataset, task split part, working
            directory file (such as those related to metadata)). This should be
            a multipart post with a 'file' parameter containing the contents of
            the actual file to create.

        :type   job_id: str
        :param  job_id: encoded id string of the job
        :type   payload:    dict
        :param  payload:    dictionary structure containing::
            'job_key'   = Key authenticating
            'path'      = Path to file to create.

        .. note::
            This API method is intended only for consumption by job runners,
            not end users.

        :rtype:     dict
        :returns:   an okay message
        :raises ObjectAttributeMissingException: if 'path', 'job_key' or the
            uploaded file is missing from the payload.
        :raises ItemAccessibilityException: if the job key is invalid, the job
            has finished, the target path is not writable by this job, or an
            nginx-provided upload path is not acceptable.
        """
        job = self.__authorize_job_access(trans, job_id, **payload)
        path = payload.get("path")
        self.__check_job_can_write_to_path(trans, job, path)

        # Is this writing an unneeded file? Should this just copy in Python?
        if '__file_path' in payload:
            file_path = payload.get('__file_path')
            upload_store = trans.app.config.nginx_upload_job_files_store
            # Explicit raises rather than ``assert``: assertions are removed
            # when Python runs with -O, which would silently drop this check.
            if not upload_store:
                raise exceptions.ItemAccessibilityException(
                    "Request appears to have been processed by"
                    " nginx_upload_module but Galaxy is not"
                    " configured to recognize it")
            # A plain string-prefix test would accept sibling directories and
            # '..' traversal; use the same containment helper as the other
            # path checks in this class.
            if not util.in_directory(file_path, upload_store):
                raise exceptions.ItemAccessibilityException(
                    "Filename provided by nginx (%s) is not in correct"
                    " directory (%s)" % (file_path, upload_store))
            input_file = open(file_path)
        else:
            upload = payload.get("file", payload.get("__file", None))
            input_file = getattr(upload, "file", None)
            if input_file is None:
                raise exceptions.ObjectAttributeMissingException(
                    "Job files action requires a valid 'file'.")
        try:
            shutil.move(input_file.name, path)
        finally:
            try:
                input_file.close()
            except OSError:
                # Fails to close file if not using nginx upload because the
                # tempfile has moved and Python wants to delete it.
                pass
        return {"message": "ok"}

    def __authorize_job_access(self, trans, encoded_job_id, **kwargs):
        """ Authenticate a request as acting on behalf of the given job.

        Requires both 'path' and 'job_key' in ``kwargs``, compares the
        supplied key against the key derived from the decoded job id and
        refuses access to jobs that have already finished.

        :returns: the ``model.Job`` the request is authorized for
        :raises ObjectAttributeMissingException: if 'path' or 'job_key' is
            missing.
        :raises ItemAccessibilityException: if the key does not match or the
            job has finished.
        :raises ObjectNotFound: if no job exists for the decoded id.
        """
        for key in ["path", "job_key"]:
            if key not in kwargs:
                error_message = "Job files action requires a valid '%s'." % key
                raise exceptions.ObjectAttributeMissingException(error_message)

        job_id = trans.security.decode_id(encoded_job_id)
        job_key = trans.security.encode_id(job_id, kind="jobs_files")
        if not util.safe_str_cmp(kwargs["job_key"], job_key):
            raise exceptions.ItemAccessibilityException("Invalid job_key supplied.")

        # Verify job is active. Don't update the contents of complete jobs.
        job = trans.sa_session.query(model.Job).get(job_id)
        if job is None:
            # Previously this fell through to an AttributeError on job.finished.
            raise exceptions.ObjectNotFound("Could not find the job for the supplied job id.")
        if job.finished:
            error_message = "Attempting to read or modify the files of a job that has already completed."
            raise exceptions.ItemAccessibilityException(error_message)
        return job

    def __check_job_can_write_to_path(self, trans, job, path):
        """ Verify an idealized job runner should actually be able to write to
        the specified path - it must be a dataset output, a dataset
        "extra file", or some place in the working directory of this job.

        Would like similar checks for reading, but the unstructured nature of
        loc files makes this very difficult. (See abandoned work here
        https://gist.github.com/jmchilton/9103619.)

        :raises ItemAccessibilityException: if the path is none of the above.
        """
        in_work_dir = self.__in_working_directory(job, path, trans.app)
        allow_temp_dir_file = self.__is_allowed_temp_dir_file(trans.app, job, path)
        if not in_work_dir and not allow_temp_dir_file and not self.__is_output_dataset_path(job, path):
            raise exceptions.ItemAccessibilityException("Job is not authorized to write to supplied path.")

    def __is_allowed_temp_dir_file(self, app, job, path):
        """ Check if path is a ``GALAXY_VERSION_*`` file inside
        ``app.config.new_file_path``.
        """
        # grrr.. need to get away from new_file_path - these should be written
        # to job working directory like metadata files.
        in_temp_dir = util.in_directory(path, app.config.new_file_path)
        return in_temp_dir and os.path.split(path)[-1].startswith("GALAXY_VERSION_")

    def __is_output_dataset_path(self, job, path):
        """ Check if path is an output path for this job or a file in an
        output's extra files path.
        """
        da_lists = [job.output_datasets, job.output_library_datasets]
        for da_list in da_lists:
            for job_dataset_association in da_list:
                dataset = job_dataset_association.dataset
                if not dataset:
                    continue
                if os.path.abspath(dataset.file_name) == os.path.abspath(path):
                    return True
                elif util.in_directory(path, dataset.extra_files_path):
                    return True
        return False

    def __in_working_directory(self, job, path, app):
        """ Check if path lies inside this job's working directory as reported
        by the application's object store.
        """
        working_directory = app.object_store.get_filename(job, base_dir='job_work', dir_only=True, extra_dir=str(job.id))
        return util.in_directory(path, working_directory)